Hi Flink group,
While working through the Flink/Hive integration today I ran into a couple of problems and would appreciate your help.

Documentation I followed: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/hive_catalog.html
Versions and table details: https://gist.github.com/r0c/e244622d66447dfc85a512e75fc2159b

Problem 1: Flink SQL fails to read the Hive table pokes

Flink SQL> select * from pokes;
2020-05-26 16:12:11,439 INFO  org.apache.hadoop.mapred.FileInputFormat - Total input paths to process : 4
[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-138389591-172.20.0.4-1590471581703:blk_1073741825_1001 file=/user/hive/warehouse/pokes/kv1.txt

Problem 2: Flink SQL fails to write to the Hive table pokes

Flink SQL> insert into pokes select 12,'tom';
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.

Cheers,
Enzo
Problem 1:
For org.apache.hadoop.hdfs.BlockMissingException, use the hadoop fs commands to check whether that datanode can be reached.

Problem 2:
Writing to Hive needs batch mode: set execution.type=batch;

--
Best,
wldd
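A minimal sketch of the two suggestions above, using the table name and file path from the error messages in this thread (run the HDFS commands from the machine where the Flink SQL client runs; exact paths and options may need adjusting to the local setup):

# Problem 1: check whether the file behind the missing block is readable over HDFS at all
hadoop fs -cat /user/hive/warehouse/pokes/kv1.txt | head
hdfs fsck /user/hive/warehouse/pokes -files -blocks -locations

# Problem 2: switch the SQL client to batch execution before the INSERT
Flink SQL> set execution.type=batch;
Flink SQL> insert into pokes select 12,'tom';

If the hadoop fs read fails with the same BlockMissingException outside of Flink, the problem is on the HDFS/networking side rather than in the Flink Hive connector.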
Hi Wldd,

Thanks for the reply.

1. The datanode is available:

❯ docker-compose exec namenode hadoop fs -ls /tmp
Found 1 items
drwx-wx-wx - root supergroup 0 2020-05-26 05:40 /tmp/hive

The namenode web UI shows it as well (screenshot not included here).

2. After setting set execution.type=batch; the insert fails with the following error:

Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0 could only be replicated to 0 nodes instead of minReplication (=1). There are 1 datanode(s) running and 1 node(s) are excluded in this operation.

Full error: https://gist.github.com/r0c/f95ec650fec0a16055787ac0d63f4673
Hi Enzo wang,

The images don't load and the GitHub links aren't accessible from here. Could you check whether Hive itself can read and write the table normally?

--
Best,
wldd
Hi Wldd,
Hive is accessible. I've pasted the contents of the two earlier gists into this email; they include the results of querying the data through Hive. Let me know if you need any more information.

======== flink insert into hive error ========

org.apache.flink.table.api.TableException: Exception in close
    at org.apache.flink.table.filesystem.FileSystemOutputFormat.close(FileSystemOutputFormat.java:131)
    at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.close(OutputFormatSinkFunction.java:97)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:109)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:635)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$afterInvoke$1(StreamTask.java:515)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:513)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:478)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.base/java.lang.Thread.run(Thread.java:830)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0 could only be replicated to 0 nodes instead of minReplication (=1). There are 1 datanode(s) running and 1 node(s) are excluded in this operation.
    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1628)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3121)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3045)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:493)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213)

    at org.apache.hadoop.ipc.Client.call(Client.java:1476)
    at org.apache.hadoop.ipc.Client.call(Client.java:1413)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy22.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:567)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy23.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1588)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1373)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:554)

======== Flink 1.10.0 lib directory ========

mysql-connector-java-5.1.48.jar
slf4j-log4j12-1.7.15.jar
log4j-1.2.17.jar
flink-table_2.11-1.10.0.jar
flink-table-blink_2.11-1.10.0.jar
flink-dist_2.11-1.10.0.jar
flink-jdbc_2.11-1.10.0.jar
flink-sql-connector-elasticsearch6_2.11-1.10.0.jar
flink-sql-connector-kafka_2.11-1.10.0.jar
flink-json-1.10.0.jar
flink-connector-hive_2.11-1.10.0.jar
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
hive-exec-2.3.7.jar
flink-csv-1.10.1.jar

======== Hive table "pokes" ========

❯ docker-compose exec hive-server bash
root@53082ed70ecd:/opt# /opt/hive/bin/beeline -u jdbc:hive2://localhost:10000
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Connecting to jdbc:hive2://localhost:10000
Connected to: Apache Hive (version 2.3.2)
Driver: Hive JDBC (version 2.3.2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 2.3.2 by Apache Hive

0: jdbc:hive2://localhost:10000> describe formatted pokes;
+-------------------------------+----------------------------------------------------+-----------------------+
| col_name | data_type | comment |
+-------------------------------+----------------------------------------------------+-----------------------+
| # col_name | data_type | comment |
| | NULL | NULL |
| foo | int | |
| bar | string | |
| | NULL | NULL |
| # Detailed Table Information | NULL | NULL |
| Database: | default | NULL |
| Owner: | root | NULL |
| CreateTime: | Tue May 26 05:42:30 UTC 2020 | NULL |
| LastAccessTime: | UNKNOWN | NULL |
| Retention: | 0 | NULL |
| Location: | hdfs://namenode:8020/user/hive/warehouse/pokes | NULL |
| Table Type: | MANAGED_TABLE | NULL |
| Table Parameters: | NULL | NULL |
| | numFiles | 4 |
| | numRows | 0 |
| | rawDataSize | 0 |
| | totalSize | 5839 |
| | transient_lastDdlTime | 1590480090 |
| | NULL | NULL |
| # Storage Information | NULL | NULL |
| SerDe Library: | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL |
| InputFormat: | org.apache.hadoop.mapred.TextInputFormat | NULL |
| OutputFormat: | org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat | NULL |
| Compressed: | No | NULL |
| Num Buckets: | -1 | NULL |
| Bucket Columns: | [] | NULL |
| Sort Columns: | [] | NULL |
| Storage Desc Params: | NULL | NULL |
| | serialization.format | 1 |
+-------------------------------+----------------------------------------------------+-----------------------+
30 rows selected (0.328 seconds)

0: jdbc:hive2://localhost:10000> select * from pokes limit 10;
+------------+------------+
| pokes.foo | pokes.bar |
+------------+------------+
| 25 | Tommy |
| 26 | Tommy |
| 27 | Tommy |
| 238 | val_238 |
| 86 | val_86 |
| 311 | val_311 |
| 27 | val_27 |
| 165 | val_165 |
| 409 | val_409 |
| 255 | val_255 |
+------------+------------+
10 rows selected (0.622 seconds)

======== Hive table "pokes" in Flink ========

Flink SQL> describe pokes;
root
 |-- foo: INT
 |-- bar: STRING

======== hadoop/hive environment (docker-compose) ========

version: "3"

services:
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
    volumes:
      - namenode:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=test
    env_file:
      - ./hadoop-hive.env
    ports:
      - "50070:50070"
      - "8020:8020"
  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
    volumes:
      - datanode:/hadoop/dfs/data
    env_file:
      - ./hadoop-hive.env
    environment:
      SERVICE_PRECONDITION: "namenode:50070"
    ports:
      - "50075:50075"
  hive-server:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - ./hadoop-hive.env
    environment:
      HIVE_CORE_CONF_javax_jdo_option_ConnectionURL: "jdbc:postgresql://hive-metastore/metastore"
      SERVICE_PRECONDITION: "hive-metastore:9083"
    ports:
      - "10000:10000"
  hive-metastore:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - ./hadoop-hive.env
    command: /opt/hive/bin/hive --service metastore
    environment:
      SERVICE_PRECONDITION: "namenode:50070 datanode:50075 hive-metastore-postgresql:5432"
    ports:
      - "9083:9083"
  hive-metastore-postgresql:
    image: bde2020/hive-metastore-postgresql:2.3.0
    ports:
      - "5432:5432"
  presto-coordinator:
    image: shawnzhu/prestodb:0.181
    ports:
      - "8080:8080"

volumes:
  namenode:
  datanode:

======== hive-site.xml ========

<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:postgresql://localhost/metastore?createDatabaseIfNotExist=true</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.postgresql.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hive</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
  </property>
  <property>
    <name>hive.metastore.schema.verification</name>
    <value>true</value>
  </property>
</configuration>

======== sql-client-defaults.yaml ========

################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# This file defines the default environment for Flink's SQL Client.
# Defaults might be overwritten by a session specific environment.

# See the Table API & SQL documentation for details about supported properties.

#==============================================================================
# Tables
#==============================================================================

# Define tables here such as sources, sinks, views, or temporal tables.

tables: [] # empty list
# A typical table source definition looks like:
# - name: ...
#   type: source-table
#   connector: ...
#   format: ...
#   schema: ...

# A typical view definition looks like:
# - name: ...
#   type: view
#   query: "SELECT ..."

# A typical temporal table definition looks like:
# - name: ...
#   type: temporal-table
#   history-table: ...
#   time-attribute: ...
#   primary-key: ...

#==============================================================================
# User-defined functions
#==============================================================================

# Define scalar, aggregate, or table functions here.

functions: [] # empty list
# A typical function definition looks like:
# - name: ...
#   from: class
#   class: ...
#   constructor: ...

#==============================================================================
# Catalogs
#==============================================================================

# Define catalogs here.

catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /Users/enzow/code/flink-sql-demo/flink-1.10.0/conf
    hive-version: 2.3.2

#==============================================================================
# Modules
#==============================================================================

# Define modules here.

#modules: # note the following modules will be of the order they are specified
#  - name: core
#    type: core

#==============================================================================
# Execution properties
#==============================================================================

# Properties that change the fundamental execution behavior of a table program.

execution:
  # select the implementation responsible for planning table programs
  # possible values are 'blink' (used by default) or 'old'
  planner: blink
  # 'batch' or 'streaming' execution
  type: streaming
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 1000000
  # parallelism of the program
  parallelism: 1
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 0
  # maximum idle state retention in ms
  max-idle-state-retention: 0
  # current catalog ('default_catalog' by default)
  current-catalog: default_catalog
  # current database of the current catalog (default database of the catalog by default)
  current-database: default_database
  # controls how table programs are restarted in case of a failures
  restart-strategy:
    # strategy type
    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
    type: fallback

#==============================================================================
# Configuration options
#==============================================================================

# Configuration options for adjusting and tuning table programs.

# A full list of options and their default values can be found
# on the dedicated "Configuration" web page.

# A configuration can look like:
# configuration:
#   table.exec.spill-compression.enabled: true
#   table.exec.spill-compression.block-size: 128kb
#   table.optimizer.join-reorder-enabled: true

#==============================================================================
# Deployment properties
#==============================================================================

# Properties that describe the cluster to which table programs are submitted to.

deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000
  # (optional) address from cluster to gateway
  gateway-address: ""
  # (optional) port from cluster to gateway
  gateway-port: 0

Cheers,
Enzo
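One check not yet in the thread that would narrow this down (a sketch, assuming the Flink SQL client runs on the host machine rather than inside the compose network, and that a Hadoop client with the same core-site.xml/hdfs-site.xml is available there): the checks so far were run inside the containers, so they show HDFS is healthy inside the Docker network but not that the Flink process itself can reach the datanode. A plain HDFS write from the Flink host separates the two:

echo probe > /tmp/hdfs-probe.txt
hadoop fs -put /tmp/hdfs-probe.txt /tmp/hdfs-probe.txt
hadoop fs -cat /tmp/hdfs-probe.txt
hadoop fs -rm /tmp/hdfs-probe.txt

If the -put fails with the same "could only be replicated to 0 nodes" message, the problem sits between the Flink host and the datanode, not in Flink or Hive.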
Have you tested writing data through Hive itself? Based on the exception you provided, this looks like an HDFS problem.
--
Best,
wldd
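A sketch of how the HDFS side could be confirmed, matching the docker-compose setup shown earlier in the thread (run inside the namenode container; output will differ per cluster):

docker-compose exec namenode hdfs dfsadmin -report
docker-compose exec namenode hdfs fsck /user/hive/warehouse/pokes -files -blocks -locations

dfsadmin -report shows whether the single datanode is registered and has capacity; if it is healthy, the "1 node(s) are excluded in this operation" part of the error usually points at the writing client (here the Flink job) being unable to open a connection to that datanode, rather than at HDFS itself being broken.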
Hi Wldd,
Writing through Hive works fine:

0: jdbc:hive2://localhost:10000> select count(*) from pokes;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
+------+
| _c0  |
+------+
| 504  |
+------+
1 row selected (41.794 seconds)

0: jdbc:hive2://localhost:10000> INSERT INTO Pokes values( 200,'Kitty');
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
No rows affected (2.523 seconds)

0: jdbc:hive2://localhost:10000> select count(*) from pokes;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
+------+
| _c0  |
+------+
| 505  |
+------+
1 row selected (1.7 seconds)

Cheers,
Enzo
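The thread ends here without a resolution, so the following is only a hedged sketch of one common cause that fits these symptoms (Hive-on-MR inside the Docker network writes fine, while the Flink client outside it gets "could only be replicated to 0 nodes ... 1 node(s) are excluded"): the namenode hands the client the datanode's Docker-internal address, and the client on the host cannot connect to it for the block transfer. Whether that is what happens in this particular setup is an assumption; if it is, one standard HDFS client option is to address datanodes by hostname, via an hdfs-site.xml visible to the Flink SQL client (for example in the directory referenced by hive-conf-dir / HADOOP_CONF_DIR):

<configuration>
  <!-- Assumption: let the HDFS client connect to datanodes by hostname
       instead of the Docker-internal IP returned by the namenode -->
  <property>
    <name>dfs.client.use.datanode.hostname</name>
    <value>true</value>
  </property>
</configuration>

This also requires the hostname "datanode" to resolve from the host (e.g. an /etc/hosts entry) and the datanode's data-transfer port (50010 by default in Hadoop 2.x) to be published by docker-compose in addition to the 50075 HTTP port shown above.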