flink 读写hive问题

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

flink 读写hive问题

Enzo wang
Hi Flink group,

今天再看Flink与Hive集成的部分遇到了几个问题,麻烦大家帮忙看看。


问题1:Flink SQL 读Hive 表pokes 失败

Flink SQL> select * from pokes;
2020-05-26 16:12:11,439 INFO  org.apache.hadoop.mapred.FileInputFormat                      - Total input paths to process : 4
[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-138389591-172.20.0.4-1590471581703:blk_1073741825_1001 file=/user/hive/warehouse/pokes/kv1.txt



问题2:Flink SQL 写Hive 表pokes 失败

Flink SQL> insert into pokes select 12,'tom';
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.



Cheers,
Enzo
Reply | Threaded
Open this post in threaded view
|

Re:flink 读写hive问题

wldd
问题1:

org.apache.hadoop.hdfs.BlockMissingException,可以用hadoop fs 命令看看那个datanode能不能访问


问题2:
写hive,需要用batch模式,set execution.type=batch;







在 2020-05-26 16:42:12,"Enzo wang" <[hidden email]> 写道:

Hi Flink group,


今天再看Flink与Hive集成的部分遇到了几个问题,麻烦大家帮忙看看。
参考的网址:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/hive_catalog.html


版本、表结构信息见这里: https://gist.github.com/r0c/e244622d66447dfc85a512e75fc2159b


问题1:Flink SQL 读Hive 表pokes 失败


Flink SQL> select * from pokes;
2020-05-26 16:12:11,439 INFO  org.apache.hadoop.mapred.FileInputFormat                      - Total input paths to process : 4
[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-138389591-172.20.0.4-1590471581703:blk_1073741825_1001 file=/user/hive/warehouse/pokes/kv1.txt







问题2:Flink SQL 写Hive 表pokes 失败


Flink SQL> insert into pokes select 12,'tom';
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.







Cheers,
Enzo
Reply | Threaded
Open this post in threaded view
|

Re: flink 读写hive问题

Enzo wang
Hi Wldd,

谢谢回复。

1.  datanode 是可用的

❯ docker-compose exec namenode hadoop fs -ls /tmp
Found 1 items
drwx-wx-wx   - root supergroup          0 2020-05-26 05:40 /tmp/hive

namenode 的webui 也可以看到:


2.  设置set execution.type=batch; 以后,执行报错,错误如下
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0 could only be replicated to 0 nodes instead of minReplication (=1). There are 1 datanode(s) running and 1 node(s) are excluded in this operation.

完整错误见:

On Tue, 26 May 2020 at 16:52, wldd <[hidden email]> wrote:
问题1:

org.apache.hadoop.hdfs.BlockMissingException,可以用hadoop fs 命令看看那个datanode能不能访问


问题2:
写hive,需要用batch模式,set execution.type=batch;







在 2020-05-26 16:42:12,"Enzo wang" <[hidden email]> 写道:

Hi Flink group,


今天再看Flink与Hive集成的部分遇到了几个问题,麻烦大家帮忙看看。
参考的网址:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/hive_catalog.html


版本、表结构信息见这里: https://gist.github.com/r0c/e244622d66447dfc85a512e75fc2159b


问题1:Flink SQL 读Hive 表pokes 失败


Flink SQL> select * from pokes;
2020-05-26 16:12:11,439 INFO  org.apache.hadoop.mapred.FileInputFormat                      - Total input paths to process : 4
[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-138389591-172.20.0.4-1590471581703:blk_1073741825_1001 file=/user/hive/warehouse/pokes/kv1.txt







问题2:Flink SQL 写Hive 表pokes 失败


Flink SQL> insert into pokes select 12,'tom';
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.







Cheers,
Enzo
Reply | Threaded
Open this post in threaded view
|

Re:Re: flink 读写hive问题

wldd
Hi,Enzo wang
图片无法加载,github地址也无法访问,你可以试一下hive可以正常读写表么













--

Best,
wldd




在 2020-05-26 17:01:32,"Enzo wang" <[hidden email]> 写道:

Hi Wldd,


谢谢回复。


1.  datanode 是可用的


❯ docker-compose exec namenode hadoop fs -ls /tmp
Found 1 items
drwx-wx-wx   - root supergroup          0 2020-05-26 05:40 /tmp/hive


namenode 的webui 也可以看到:




2.  设置set execution.type=batch; 以后,执行报错,错误如下
Causedby: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0 could only be replicated to 0 nodes instead of minReplication (=1). There are 1 datanode(s) running and1 node(s) are excluded inthis operation.


完整错误见:
https://gist.github.com/r0c/f95ec650fec0a16055787ac0d63f4673



On Tue, 26 May 2020 at 16:52, wldd <[hidden email]> wrote:

问题1:

org.apache.hadoop.hdfs.BlockMissingException,可以用hadoop fs 命令看看那个datanode能不能访问


问题2:
写hive,需要用batch模式,set execution.type=batch;







在 2020-05-26 16:42:12,"Enzo wang" <[hidden email]> 写道:

Hi Flink group,


今天再看Flink与Hive集成的部分遇到了几个问题,麻烦大家帮忙看看。
参考的网址:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/hive_catalog.html


版本、表结构信息见这里: https://gist.github.com/r0c/e244622d66447dfc85a512e75fc2159b


问题1:Flink SQL 读Hive 表pokes 失败


Flink SQL> select * from pokes;
2020-05-26 16:12:11,439 INFO  org.apache.hadoop.mapred.FileInputFormat                      - Total input paths to process : 4
[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-138389591-172.20.0.4-1590471581703:blk_1073741825_1001 file=/user/hive/warehouse/pokes/kv1.txt







问题2:Flink SQL 写Hive 表pokes 失败


Flink SQL> insert into pokes select 12,'tom';
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.







Cheers,
Enzo
Reply | Threaded
Open this post in threaded view
|

Re: Re: flink 读写hive问题

Enzo wang
Hi Wldd,

Hive 是可以访问的,我把之前的2个gist内容贴在这个邮件里,你看一下,里面有hive查询数据的返回结果。

还需要什么信息我再提供。



========  flink insert into hive error ========

org.apache.flink.table.api.TableException: Exception in close
        at org.apache.flink.table.filesystem.FileSystemOutputFormat.close(FileSystemOutputFormat.java:131)
        at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.close(OutputFormatSinkFunction.java:97)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:109)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:635)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$afterInvoke$1(StreamTask.java:515)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:513)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:478)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.base/java.lang.Thread.run(Thread.java:830)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
File /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0
could only be replicated to 0 nodes instead of minReplication (=1).
There are 1 datanode(s) running and 1 node(s) are excluded in this
operation.
        at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1628)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3121)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3045)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:493)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213)

        at org.apache.hadoop.ipc.Client.call(Client.java:1476)
        at org.apache.hadoop.ipc.Client.call(Client.java:1413)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
        at com.sun.proxy.$Proxy22.addBlock(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:567)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at com.sun.proxy.$Proxy23.addBlock(Unknown Source)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1588)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1373)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:554)

======== Flink 1.10.0 的lib目录 ========

mysql-connector-java-5.1.48.jar
slf4j-log4j12-1.7.15.jar
log4j-1.2.17.jar
flink-table_2.11-1.10.0.jar
flink-table-blink_2.11-1.10.0.jar
flink-dist_2.11-1.10.0.jar
flink-jdbc_2.11-1.10.0.jar
flink-sql-connector-elasticsearch6_2.11-1.10.0.jar
flink-sql-connector-kafka_2.11-1.10.0.jar
flink-json-1.10.0.jar
flink-connector-hive_2.11-1.10.0.jar
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
hive-exec-2.3.7.jar
flink-csv-1.10.1.jar


======== Hive table "pokes" ========

❯ docker-compose exec hive-server bash
root@53082ed70ecd:/opt# /opt/hive/bin/beeline -u jdbc:hive2://localhost:10000
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/opt/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/hadoop-2.7.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Connecting to jdbc:hive2://localhost:10000
Connected to: Apache Hive (version 2.3.2)
Driver: Hive JDBC (version 2.3.2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 2.3.2 by Apache Hive
0: jdbc:hive2://localhost:10000> describe formatted pokes;
+-------------------------------+----------------------------------------------------+-----------------------+
|           col_name            |                     data_type
              |        comment        |
+-------------------------------+----------------------------------------------------+-----------------------+
| # col_name                    | data_type
              | comment               |
|                               | NULL
              | NULL                  |
| foo                           | int
              |                       |
| bar                           | string
              |                       |
|                               | NULL
              | NULL                  |
| # Detailed Table Information  | NULL
              | NULL                  |
| Database:                     | default
              | NULL                  |
| Owner:                        | root
              | NULL                  |
| CreateTime:                   | Tue May 26 05:42:30 UTC 2020
              | NULL                  |
| LastAccessTime:               | UNKNOWN
              | NULL                  |
| Retention:                    | 0
              | NULL                  |
| Location:                     |
hdfs://namenode:8020/user/hive/warehouse/pokes     | NULL
    |
| Table Type:                   | MANAGED_TABLE
              | NULL                  |
| Table Parameters:             | NULL
              | NULL                  |
|                               | numFiles
              | 4                     |
|                               | numRows
              | 0                     |
|                               | rawDataSize
              | 0                     |
|                               | totalSize
              | 5839                  |
|                               | transient_lastDdlTime
              | 1590480090            |
|                               | NULL
              | NULL                  |
| # Storage Information         | NULL
              | NULL                  |
| SerDe Library:                |
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL
    |
| InputFormat:                  |
org.apache.hadoop.mapred.TextInputFormat           | NULL
    |
| OutputFormat:                 |
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat | NULL
            |
| Compressed:                   | No
              | NULL                  |
| Num Buckets:                  | -1
              | NULL                  |
| Bucket Columns:               | []
              | NULL                  |
| Sort Columns:                 | []
              | NULL                  |
| Storage Desc Params:          | NULL
              | NULL                  |
|                               | serialization.format
              | 1                     |
+-------------------------------+----------------------------------------------------+-----------------------+
30 rows selected (0.328 seconds)
0: jdbc:hive2://localhost:10000>

0: jdbc:hive2://localhost:10000> select * from pokes limit 10;
+------------+------------+
| pokes.foo  | pokes.bar  |
+------------+------------+
| 25         | Tommy      |
| 26         | Tommy      |
| 27         | Tommy      |
| 238        | val_238    |
| 86         | val_86     |
| 311        | val_311    |
| 27         | val_27     |
| 165        | val_165    |
| 409        | val_409    |
| 255        | val_255    |
+------------+------------+
10 rows selected (0.622 seconds)
0: jdbc:hive2://localhost:10000>


======== Hive table "pokes" in Flink ========

Flink SQL> describe pokes;
root
 |-- foo: INT
 |-- bar: STRING


======== hadoop/hive 环境 ========

version: "3"

services:
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
    volumes:
      - namenode:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=test
    env_file:
      - ./hadoop-hive.env
    ports:
      - "50070:50070"
      - "8020:8020"
  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
    volumes:
      - datanode:/hadoop/dfs/data
    env_file:
      - ./hadoop-hive.env
    environment:
      SERVICE_PRECONDITION: "namenode:50070"
    ports:
      - "50075:50075"
  hive-server:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - ./hadoop-hive.env
    environment:
      HIVE_CORE_CONF_javax_jdo_option_ConnectionURL:
"jdbc:postgresql://hive-metastore/metastore"
      SERVICE_PRECONDITION: "hive-metastore:9083"
    ports:
      - "10000:10000"
  hive-metastore:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - ./hadoop-hive.env
    command: /opt/hive/bin/hive --service metastore
    environment:
      SERVICE_PRECONDITION: "namenode:50070 datanode:50075
hive-metastore-postgresql:5432"
    ports:
      - "9083:9083"
  hive-metastore-postgresql:
    image: bde2020/hive-metastore-postgresql:2.3.0
    ports:
      - "5432:5432"
  presto-coordinator:
    image: shawnzhu/prestodb:0.181
    ports:
      - "8080:8080"

volumes:
  namenode:
  datanode:


======== hive-site.xml ========

<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://localhost:9083</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:postgresql://localhost/metastore?createDatabaseIfNotExist=true</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>org.postgresql.Driver</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>hive</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>hive</value>
    </property>
   <property>
       <name>hive.metastore.schema.verification</name>
       <value>true</value>
   </property>
</configuration>


======== sql-client-defaults.yaml ========

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################


# This file defines the default environment for Flink's SQL Client.
# Defaults might be overwritten by a session specific environment.


# See the Table API & SQL documentation for details about supported properties.


#==============================================================================
# Tables
#==============================================================================

# Define tables here such as sources, sinks, views, or temporal tables.

tables: [] # empty list
# A typical table source definition looks like:
# - name: ...
#   type: source-table
#   connector: ...
#   format: ...
#   schema: ...

# A typical view definition looks like:
# - name: ...
#   type: view
#   query: "SELECT ..."

# A typical temporal table definition looks like:
# - name: ...
#   type: temporal-table
#   history-table: ...
#   time-attribute: ...
#   primary-key: ...


#==============================================================================
# User-defined functions
#==============================================================================

# Define scalar, aggregate, or table functions here.

functions: [] # empty list
# A typical function definition looks like:
# - name: ...
#   from: class
#   class: ...
#   constructor: ...


#==============================================================================
# Catalogs
#==============================================================================

# Define catalogs here.

catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /Users/enzow/code/flink-sql-demo/flink-1.10.0/conf
    hive-version: 2.3.2


#==============================================================================
# Modules
#==============================================================================

# Define modules here.

#modules: # note the following modules will be of the order they are specified
#  - name: core
#    type: core

#==============================================================================
# Execution properties
#==============================================================================

# Properties that change the fundamental execution behavior of a table program.

execution:
  # select the implementation responsible for planning table programs
  # possible values are 'blink' (used by default) or 'old'
  planner: blink
  # 'batch' or 'streaming' execution
  type: streaming
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 1000000
  # parallelism of the program
  parallelism: 1
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 0
  # maximum idle state retention in ms
  max-idle-state-retention: 0
  # current catalog ('default_catalog' by default)
  current-catalog: default_catalog
  # current database of the current catalog (default database of the
catalog by default)
  current-database: default_database
  # controls how table programs are restarted in case of a failures
  restart-strategy:
    # strategy type
    # possible values are "fixed-delay", "failure-rate", "none", or
"fallback" (default)
    type: fallback

#==============================================================================
# Configuration options
#==============================================================================

# Configuration options for adjusting and tuning table programs.

# A full list of options and their default values can be found
# on the dedicated "Configuration" web page.

# A configuration can look like:
# configuration:
#   table.exec.spill-compression.enabled: true
#   table.exec.spill-compression.block-size: 128kb
#   table.optimizer.join-reorder-enabled: true

#==============================================================================
# Deployment properties
#==============================================================================

# Properties that describe the cluster to which table programs are submitted to.

deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000
  # (optional) address from cluster to gateway
  gateway-address: ""
  # (optional) port from cluster to gateway
  gateway-port: 0





Cheers,
Enzo


On Tue, 26 May 2020 at 17:15, wldd <[hidden email]> wrote:

> Hi,Enzo wang
> 图片无法加载,github地址也无法访问,你可以试一下hive可以正常读写表么
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
>
> Best,
> wldd
>
>
>
>
> 在 2020-05-26 17:01:32,"Enzo wang" <[hidden email]> 写道:
>
> Hi Wldd,
>
>
> 谢谢回复。
>
>
> 1.  datanode 是可用的
>
>
> ❯ docker-compose exec namenode hadoop fs -ls /tmp
> Found 1 items
> drwx-wx-wx   - root supergroup          0 2020-05-26 05:40 /tmp/hive
>
>
> namenode 的webui 也可以看到:
>
>
>
>
> 2.  设置set execution.type=batch; 以后,执行报错,错误如下
> Causedby: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
> File
> /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0
> could only be replicated to 0 nodes instead of minReplication (=1). There
> are 1 datanode(s) running and1 node(s) are excluded inthis operation.
>
>
> 完整错误见:
> https://gist.github.com/r0c/f95ec650fec0a16055787ac0d63f4673
>
>
>
> On Tue, 26 May 2020 at 16:52, wldd <[hidden email]> wrote:
>
> 问题1:
>
> org.apache.hadoop.hdfs.BlockMissingException,可以用hadoop fs
> 命令看看那个datanode能不能访问
>
>
> 问题2:
> 写hive,需要用batch模式,set execution.type=batch;
>
>
>
>
>
>
>
> 在 2020-05-26 16:42:12,"Enzo wang" <[hidden email]> 写道:
>
> Hi Flink group,
>
>
> 今天再看Flink与Hive集成的部分遇到了几个问题,麻烦大家帮忙看看。
> 参考的网址:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/hive_catalog.html
>
>
> 版本、表结构信息见这里: https://gist.github.com/r0c/e244622d66447dfc85a512e75fc2159b
>
>
> 问题1:Flink SQL 读Hive 表pokes 失败
>
>
> Flink SQL> select * from pokes;
> 2020-05-26 16:12:11,439 INFO  org.apache.hadoop.mapred.FileInputFormat
>                   - Total input paths to process : 4
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block:
> BP-138389591-172.20.0.4-1590471581703:blk_1073741825_1001
> file=/user/hive/warehouse/pokes/kv1.txt
>
>
>
>
>
>
>
> 问题2:Flink SQL 写Hive 表pokes 失败
>
>
> Flink SQL> insert into pokes select 12,'tom';
> [INFO] Submitting SQL update statement to the cluster...
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Stream Tables can only be
> emitted by AppendStreamTableSink, RetractStreamTableSink, or
> UpsertStreamTableSink.
>
>
>
>
>
>
>
> Cheers,
> Enzo
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: flink 读写hive问题

wldd
hive写数据测了么,按照你提供的异常信息,显示的是hdfs的问题










--

Best,
wldd





在 2020-05-26 17:49:56,"Enzo wang" <[hidden email]> 写道:

>Hi Wldd,
>
>Hive 是可以访问的,我把之前的2个gist内容贴在这个邮件里,你看一下,里面有hive查询数据的返回结果。
>
>还需要什么信息我再提供。
>
>
>
>========  flink insert into hive error ========
>
>org.apache.flink.table.api.TableException: Exception in close
> at org.apache.flink.table.filesystem.FileSystemOutputFormat.close(FileSystemOutputFormat.java:131)
> at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.close(OutputFormatSinkFunction.java:97)
> at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:109)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:635)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$afterInvoke$1(StreamTask.java:515)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:513)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:478)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.base/java.lang.Thread.run(Thread.java:830)
>Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
>File /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0
>could only be replicated to 0 nodes instead of minReplication (=1).
>There are 1 datanode(s) running and 1 node(s) are excluded in this
>operation.
> at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1628)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3121)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3045)
> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:493)
> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213)
>
> at org.apache.hadoop.ipc.Client.call(Client.java:1476)
> at org.apache.hadoop.ipc.Client.call(Client.java:1413)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> at com.sun.proxy.$Proxy22.addBlock(Unknown Source)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>Method)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:567)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy23.addBlock(Unknown Source)
> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1588)
> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1373)
> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:554)
>
>======== Flink 1.10.0 的lib目录 ========
>
>mysql-connector-java-5.1.48.jar
>slf4j-log4j12-1.7.15.jar
>log4j-1.2.17.jar
>flink-table_2.11-1.10.0.jar
>flink-table-blink_2.11-1.10.0.jar
>flink-dist_2.11-1.10.0.jar
>flink-jdbc_2.11-1.10.0.jar
>flink-sql-connector-elasticsearch6_2.11-1.10.0.jar
>flink-sql-connector-kafka_2.11-1.10.0.jar
>flink-json-1.10.0.jar
>flink-connector-hive_2.11-1.10.0.jar
>flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
>hive-exec-2.3.7.jar
>flink-csv-1.10.1.jar
>
>
>======== Hive table "pokes" ========
>
>❯ docker-compose exec hive-server bash
>root@53082ed70ecd:/opt# /opt/hive/bin/beeline -u jdbc:hive2://localhost:10000
>SLF4J: Class path contains multiple SLF4J bindings.
>SLF4J: Found binding in
>[jar:file:/opt/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>SLF4J: Found binding in
>[jar:file:/opt/hadoop-2.7.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
>SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
>Connecting to jdbc:hive2://localhost:10000
>Connected to: Apache Hive (version 2.3.2)
>Driver: Hive JDBC (version 2.3.2)
>Transaction isolation: TRANSACTION_REPEATABLE_READ
>Beeline version 2.3.2 by Apache Hive
>0: jdbc:hive2://localhost:10000> describe formatted pokes;
>+-------------------------------+----------------------------------------------------+-----------------------+
>|           col_name            |                     data_type
>              |        comment        |
>+-------------------------------+----------------------------------------------------+-----------------------+
>| # col_name                    | data_type
>              | comment               |
>|                               | NULL
>              | NULL                  |
>| foo                           | int
>              |                       |
>| bar                           | string
>              |                       |
>|                               | NULL
>              | NULL                  |
>| # Detailed Table Information  | NULL
>              | NULL                  |
>| Database:                     | default
>              | NULL                  |
>| Owner:                        | root
>              | NULL                  |
>| CreateTime:                   | Tue May 26 05:42:30 UTC 2020
>              | NULL                  |
>| LastAccessTime:               | UNKNOWN
>              | NULL                  |
>| Retention:                    | 0
>              | NULL                  |
>| Location:                     |
>hdfs://namenode:8020/user/hive/warehouse/pokes     | NULL
>    |
>| Table Type:                   | MANAGED_TABLE
>              | NULL                  |
>| Table Parameters:             | NULL
>              | NULL                  |
>|                               | numFiles
>              | 4                     |
>|                               | numRows
>              | 0                     |
>|                               | rawDataSize
>              | 0                     |
>|                               | totalSize
>              | 5839                  |
>|                               | transient_lastDdlTime
>              | 1590480090            |
>|                               | NULL
>              | NULL                  |
>| # Storage Information         | NULL
>              | NULL                  |
>| SerDe Library:                |
>org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL
>    |
>| InputFormat:                  |
>org.apache.hadoop.mapred.TextInputFormat           | NULL
>    |
>| OutputFormat:                 |
>org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat | NULL
>            |
>| Compressed:                   | No
>              | NULL                  |
>| Num Buckets:                  | -1
>              | NULL                  |
>| Bucket Columns:               | []
>              | NULL                  |
>| Sort Columns:                 | []
>              | NULL                  |
>| Storage Desc Params:          | NULL
>              | NULL                  |
>|                               | serialization.format
>              | 1                     |
>+-------------------------------+----------------------------------------------------+-----------------------+
>30 rows selected (0.328 seconds)
>0: jdbc:hive2://localhost:10000>
>
>0: jdbc:hive2://localhost:10000> select * from pokes limit 10;
>+------------+------------+
>| pokes.foo  | pokes.bar  |
>+------------+------------+
>| 25         | Tommy      |
>| 26         | Tommy      |
>| 27         | Tommy      |
>| 238        | val_238    |
>| 86         | val_86     |
>| 311        | val_311    |
>| 27         | val_27     |
>| 165        | val_165    |
>| 409        | val_409    |
>| 255        | val_255    |
>+------------+------------+
>10 rows selected (0.622 seconds)
>0: jdbc:hive2://localhost:10000>
>
>
>======== Hive table "pokes" in Flink ========
>
>Flink SQL> describe pokes;
>root
> |-- foo: INT
> |-- bar: STRING
>
>
>======== hadoop/hive 环境 ========
>
>version: "3"
>
>services:
>  namenode:
>    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
>    volumes:
>      - namenode:/hadoop/dfs/name
>    environment:
>      - CLUSTER_NAME=test
>    env_file:
>      - ./hadoop-hive.env
>    ports:
>      - "50070:50070"
>      - "8020:8020"
>  datanode:
>    image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
>    volumes:
>      - datanode:/hadoop/dfs/data
>    env_file:
>      - ./hadoop-hive.env
>    environment:
>      SERVICE_PRECONDITION: "namenode:50070"
>    ports:
>      - "50075:50075"
>  hive-server:
>    image: bde2020/hive:2.3.2-postgresql-metastore
>    env_file:
>      - ./hadoop-hive.env
>    environment:
>      HIVE_CORE_CONF_javax_jdo_option_ConnectionURL:
>"jdbc:postgresql://hive-metastore/metastore"
>      SERVICE_PRECONDITION: "hive-metastore:9083"
>    ports:
>      - "10000:10000"
>  hive-metastore:
>    image: bde2020/hive:2.3.2-postgresql-metastore
>    env_file:
>      - ./hadoop-hive.env
>    command: /opt/hive/bin/hive --service metastore
>    environment:
>      SERVICE_PRECONDITION: "namenode:50070 datanode:50075
>hive-metastore-postgresql:5432"
>    ports:
>      - "9083:9083"
>  hive-metastore-postgresql:
>    image: bde2020/hive-metastore-postgresql:2.3.0
>    ports:
>      - "5432:5432"
>  presto-coordinator:
>    image: shawnzhu/prestodb:0.181
>    ports:
>      - "8080:8080"
>
>volumes:
>  namenode:
>  datanode:
>
>
>======== hive-site.xml ========
>
><configuration>
>    <property>
>        <name>hive.metastore.uris</name>
>        <value>thrift://localhost:9083</value>
>    </property>
>    <property>
>        <name>javax.jdo.option.ConnectionURL</name>
>        <value>jdbc:postgresql://localhost/metastore?createDatabaseIfNotExist=true</value>
>    </property>
>    <property>
>        <name>javax.jdo.option.ConnectionDriverName</name>
>        <value>org.postgresql.Driver</value>
>    </property>
>    <property>
>        <name>javax.jdo.option.ConnectionPassword</name>
>        <value>hive</value>
>    </property>
>    <property>
>        <name>javax.jdo.option.ConnectionUserName</name>
>        <value>hive</value>
>    </property>
>   <property>
>       <name>hive.metastore.schema.verification</name>
>       <value>true</value>
>   </property>
></configuration>
>
>
>======== sql-client-defaults.yaml ========
>
>################################################################################
>#  Licensed to the Apache Software Foundation (ASF) under one
>#  or more contributor license agreements.  See the NOTICE file
>#  distributed with this work for additional information
>#  regarding copyright ownership.  The ASF licenses this file
>#  to you under the Apache License, Version 2.0 (the
>#  "License"); you may not use this file except in compliance
>#  with the License.  You may obtain a copy of the License at
>#
>#      http://www.apache.org/licenses/LICENSE-2.0
>#
>#  Unless required by applicable law or agreed to in writing, software
>#  distributed under the License is distributed on an "AS IS" BASIS,
>#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>#  See the License for the specific language governing permissions and
># limitations under the License.
>################################################################################
>
>
># This file defines the default environment for Flink's SQL Client.
># Defaults might be overwritten by a session specific environment.
>
>
># See the Table API & SQL documentation for details about supported properties.
>
>
>#==============================================================================
># Tables
>#==============================================================================
>
># Define tables here such as sources, sinks, views, or temporal tables.
>
>tables: [] # empty list
># A typical table source definition looks like:
># - name: ...
>#   type: source-table
>#   connector: ...
>#   format: ...
>#   schema: ...
>
># A typical view definition looks like:
># - name: ...
>#   type: view
>#   query: "SELECT ..."
>
># A typical temporal table definition looks like:
># - name: ...
>#   type: temporal-table
>#   history-table: ...
>#   time-attribute: ...
>#   primary-key: ...
>
>
>#==============================================================================
># User-defined functions
>#==============================================================================
>
># Define scalar, aggregate, or table functions here.
>
>functions: [] # empty list
># A typical function definition looks like:
># - name: ...
>#   from: class
>#   class: ...
>#   constructor: ...
>
>
>#==============================================================================
># Catalogs
>#==============================================================================
>
># Define catalogs here.
>
>catalogs:
>  - name: myhive
>    type: hive
>    hive-conf-dir: /Users/enzow/code/flink-sql-demo/flink-1.10.0/conf
>    hive-version: 2.3.2
>
>
>#==============================================================================
># Modules
>#==============================================================================
>
># Define modules here.
>
>#modules: # note the following modules will be of the order they are specified
>#  - name: core
>#    type: core
>
>#==============================================================================
># Execution properties
>#==============================================================================
>
># Properties that change the fundamental execution behavior of a table program.
>
>execution:
>  # select the implementation responsible for planning table programs
>  # possible values are 'blink' (used by default) or 'old'
>  planner: blink
>  # 'batch' or 'streaming' execution
>  type: streaming
>  # allow 'event-time' or only 'processing-time' in sources
>  time-characteristic: event-time
>  # interval in ms for emitting periodic watermarks
>  periodic-watermarks-interval: 200
>  # 'changelog' or 'table' presentation of results
>  result-mode: table
>  # maximum number of maintained rows in 'table' presentation of results
>  max-table-result-rows: 1000000
>  # parallelism of the program
>  parallelism: 1
>  # maximum parallelism
>  max-parallelism: 128
>  # minimum idle state retention in ms
>  min-idle-state-retention: 0
>  # maximum idle state retention in ms
>  max-idle-state-retention: 0
>  # current catalog ('default_catalog' by default)
>  current-catalog: default_catalog
>  # current database of the current catalog (default database of the
>catalog by default)
>  current-database: default_database
>  # controls how table programs are restarted in case of a failures
>  restart-strategy:
>    # strategy type
>    # possible values are "fixed-delay", "failure-rate", "none", or
>"fallback" (default)
>    type: fallback
>
>#==============================================================================
># Configuration options
>#==============================================================================
>
># Configuration options for adjusting and tuning table programs.
>
># A full list of options and their default values can be found
># on the dedicated "Configuration" web page.
>
># A configuration can look like:
># configuration:
>#   table.exec.spill-compression.enabled: true
>#   table.exec.spill-compression.block-size: 128kb
>#   table.optimizer.join-reorder-enabled: true
>
>#==============================================================================
># Deployment properties
>#==============================================================================
>
># Properties that describe the cluster to which table programs are submitted to.
>
>deployment:
>  # general cluster communication timeout in ms
>  response-timeout: 5000
>  # (optional) address from cluster to gateway
>  gateway-address: ""
>  # (optional) port from cluster to gateway
>  gateway-port: 0
>
>
>
>
>
>Cheers,
>Enzo
>
>
>On Tue, 26 May 2020 at 17:15, wldd <[hidden email]> wrote:
>
>> Hi,Enzo wang
>> 图片无法加载,github地址也无法访问,你可以试一下hive可以正常读写表么
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
>>
>> Best,
>> wldd
>>
>>
>>
>>
>> 在 2020-05-26 17:01:32,"Enzo wang" <[hidden email]> 写道:
>>
>> Hi Wldd,
>>
>>
>> 谢谢回复。
>>
>>
>> 1.  datanode 是可用的
>>
>>
>> ❯ docker-compose exec namenode hadoop fs -ls /tmp
>> Found 1 items
>> drwx-wx-wx   - root supergroup          0 2020-05-26 05:40 /tmp/hive
>>
>>
>> namenode 的webui 也可以看到:
>>
>>
>>
>>
>> 2.  设置set execution.type=batch; 以后,执行报错,错误如下
>> Causedby: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
>> File
>> /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0
>> could only be replicated to 0 nodes instead of minReplication (=1). There
>> are 1 datanode(s) running and1 node(s) are excluded inthis operation.
>>
>>
>> 完整错误见:
>> https://gist.github.com/r0c/f95ec650fec0a16055787ac0d63f4673
>>
>>
>>
>> On Tue, 26 May 2020 at 16:52, wldd <[hidden email]> wrote:
>>
>> 问题1:
>>
>> org.apache.hadoop.hdfs.BlockMissingException,可以用hadoop fs
>> 命令看看那个datanode能不能访问
>>
>>
>> 问题2:
>> 写hive,需要用batch模式,set execution.type=batch;
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-05-26 16:42:12,"Enzo wang" <[hidden email]> 写道:
>>
>> Hi Flink group,
>>
>>
>> 今天再看Flink与Hive集成的部分遇到了几个问题,麻烦大家帮忙看看。
>> 参考的网址:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/hive_catalog.html
>>
>>
>> 版本、表结构信息见这里: https://gist.github.com/r0c/e244622d66447dfc85a512e75fc2159b
>>
>>
>> 问题1:Flink SQL 读Hive 表pokes 失败
>>
>>
>> Flink SQL> select * from pokes;
>> 2020-05-26 16:12:11,439 INFO  org.apache.hadoop.mapred.FileInputFormat
>>                   - Total input paths to process : 4
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block:
>> BP-138389591-172.20.0.4-1590471581703:blk_1073741825_1001
>> file=/user/hive/warehouse/pokes/kv1.txt
>>
>>
>>
>>
>>
>>
>>
>> 问题2:Flink SQL 写Hive 表pokes 失败
>>
>>
>> Flink SQL> insert into pokes select 12,'tom';
>> [INFO] Submitting SQL update statement to the cluster...
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.flink.table.api.TableException: Stream Tables can only be
>> emitted by AppendStreamTableSink, RetractStreamTableSink, or
>> UpsertStreamTableSink.
>>
>>
>>
>>
>>
>>
>>
>> Cheers,
>> Enzo
Reply | Threaded
Open this post in threaded view
|

Re: Re: Re: flink 读写hive问题

Enzo wang
Hi Wldd,

Hive 写测试了,没问题。

0: jdbc:hive2://localhost:10000> select count(*) from pokes;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the
future versions. Consider using a different execution engine (i.e. spark,
tez) or using Hive 1.X releases.
+------+
| _c0  |
+------+
| 504  |
+------+
1 row selected (41.794 seconds)

0: jdbc:hive2://localhost:10000> INSERT INTO Pokes values( 200,'Kitty');
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the
future versions. Consider using a different execution engine (i.e. spark,
tez) or using Hive 1.X releases.
No rows affected (2.523 seconds)

0: jdbc:hive2://localhost:10000> select count(*) from pokes;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the
future versions. Consider using a different execution engine (i.e. spark,
tez) or using Hive 1.X releases.
+------+
| _c0  |
+------+
| 505  |
+------+
1 row selected (1.7 seconds)

Cheers,
Enzo

On Tue, 26 May 2020 at 18:14, wldd <[hidden email]> wrote:

> hive写数据测了么,按照你提供的异常信息,显示的是hdfs的问题
>
>
>
>
>
>
>
>
>
>
> --
>
> Best,
> wldd
>
>
>
>
>
> 在 2020-05-26 17:49:56,"Enzo wang" <[hidden email]> 写道:
> >Hi Wldd,
> >
> >Hive 是可以访问的,我把之前的2个gist内容贴在这个邮件里,你看一下,里面有hive查询数据的返回结果。
> >
> >还需要什么信息我再提供。
> >
> >
> >
> >========  flink insert into hive error ========
> >
> >org.apache.flink.table.api.TableException: Exception in close
> >       at
> org.apache.flink.table.filesystem.FileSystemOutputFormat.close(FileSystemOutputFormat.java:131)
> >       at
> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.close(OutputFormatSinkFunction.java:97)
> >       at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> >       at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:109)
> >       at
> org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:635)
> >       at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$afterInvoke$1(StreamTask.java:515)
> >       at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> >       at
> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:513)
> >       at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:478)
> >       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> >       at java.base/java.lang.Thread.run(Thread.java:830)
> >Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
> >File
> /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0
> >could only be replicated to 0 nodes instead of minReplication (=1).
> >There are 1 datanode(s) running and 1 node(s) are excluded in this
> >operation.
> >       at
> org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1628)
> >       at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3121)
> >       at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3045)
> >       at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
> >       at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:493)
> >       at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> >       at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> >       at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> >       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
> >       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
> >       at java.security.AccessController.doPrivileged(Native Method)
> >       at javax.security.auth.Subject.doAs(Subject.java:422)
> >       at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
> >       at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213)
> >
> >       at org.apache.hadoop.ipc.Client.call(Client.java:1476)
> >       at org.apache.hadoop.ipc.Client.call(Client.java:1413)
> >       at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> >       at com.sun.proxy.$Proxy22.addBlock(Unknown Source)
> >       at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
> >       at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> >Method)
> >       at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >       at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >       at java.base/java.lang.reflect.Method.invoke(Method.java:567)
> >       at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> >       at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> >       at com.sun.proxy.$Proxy23.addBlock(Unknown Source)
> >       at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1588)
> >       at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1373)
> >       at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:554)
> >
> >======== Flink 1.10.0 的lib目录 ========
> >
> >mysql-connector-java-5.1.48.jar
> >slf4j-log4j12-1.7.15.jar
> >log4j-1.2.17.jar
> >flink-table_2.11-1.10.0.jar
> >flink-table-blink_2.11-1.10.0.jar
> >flink-dist_2.11-1.10.0.jar
> >flink-jdbc_2.11-1.10.0.jar
> >flink-sql-connector-elasticsearch6_2.11-1.10.0.jar
> >flink-sql-connector-kafka_2.11-1.10.0.jar
> >flink-json-1.10.0.jar
> >flink-connector-hive_2.11-1.10.0.jar
> >flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
> >hive-exec-2.3.7.jar
> >flink-csv-1.10.1.jar
> >
> >
> >======== Hive table "pokes" ========
> >
> >❯ docker-compose exec hive-server bash
> >root@53082ed70ecd:/opt# /opt/hive/bin/beeline -u
> jdbc:hive2://localhost:10000
> >SLF4J: Class path contains multiple SLF4J bindings.
> >SLF4J: Found binding in
>
> >[jar:file:/opt/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> >SLF4J: Found binding in
>
> >[jar:file:/opt/hadoop-2.7.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> >SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> >SLF4J: Actual binding is of type
> [org.apache.logging.slf4j.Log4jLoggerFactory]
> >Connecting to jdbc:hive2://localhost:10000
> >Connected to: Apache Hive (version 2.3.2)
> >Driver: Hive JDBC (version 2.3.2)
> >Transaction isolation: TRANSACTION_REPEATABLE_READ
> >Beeline version 2.3.2 by Apache Hive
> >0: jdbc:hive2://localhost:10000> describe formatted pokes;
>
> >+-------------------------------+----------------------------------------------------+-----------------------+
> >|           col_name            |                     data_type
> >              |        comment        |
>
> >+-------------------------------+----------------------------------------------------+-----------------------+
> >| # col_name                    | data_type
> >              | comment               |
> >|                               | NULL
> >              | NULL                  |
> >| foo                           | int
> >              |                       |
> >| bar                           | string
> >              |                       |
> >|                               | NULL
> >              | NULL                  |
> >| # Detailed Table Information  | NULL
> >              | NULL                  |
> >| Database:                     | default
> >              | NULL                  |
> >| Owner:                        | root
> >              | NULL                  |
> >| CreateTime:                   | Tue May 26 05:42:30 UTC 2020
> >              | NULL                  |
> >| LastAccessTime:               | UNKNOWN
> >              | NULL                  |
> >| Retention:                    | 0
> >              | NULL                  |
> >| Location:                     |
> >hdfs://namenode:8020/user/hive/warehouse/pokes     | NULL
> >    |
> >| Table Type:                   | MANAGED_TABLE
> >              | NULL                  |
> >| Table Parameters:             | NULL
> >              | NULL                  |
> >|                               | numFiles
> >              | 4                     |
> >|                               | numRows
> >              | 0                     |
> >|                               | rawDataSize
> >              | 0                     |
> >|                               | totalSize
> >              | 5839                  |
> >|                               | transient_lastDdlTime
> >              | 1590480090            |
> >|                               | NULL
> >              | NULL                  |
> >| # Storage Information         | NULL
> >              | NULL                  |
> >| SerDe Library:                |
> >org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL
> >    |
> >| InputFormat:                  |
> >org.apache.hadoop.mapred.TextInputFormat           | NULL
> >    |
> >| OutputFormat:                 |
> >org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat | NULL
> >            |
> >| Compressed:                   | No
> >              | NULL                  |
> >| Num Buckets:                  | -1
> >              | NULL                  |
> >| Bucket Columns:               | []
> >              | NULL                  |
> >| Sort Columns:                 | []
> >              | NULL                  |
> >| Storage Desc Params:          | NULL
> >              | NULL                  |
> >|                               | serialization.format
> >              | 1                     |
>
> >+-------------------------------+----------------------------------------------------+-----------------------+
> >30 rows selected (0.328 seconds)
> >0: jdbc:hive2://localhost:10000>
> >
> >0: jdbc:hive2://localhost:10000> select * from pokes limit 10;
> >+------------+------------+
> >| pokes.foo  | pokes.bar  |
> >+------------+------------+
> >| 25         | Tommy      |
> >| 26         | Tommy      |
> >| 27         | Tommy      |
> >| 238        | val_238    |
> >| 86         | val_86     |
> >| 311        | val_311    |
> >| 27         | val_27     |
> >| 165        | val_165    |
> >| 409        | val_409    |
> >| 255        | val_255    |
> >+------------+------------+
> >10 rows selected (0.622 seconds)
> >0: jdbc:hive2://localhost:10000>
> >
> >
> >======== Hive table "pokes" in Flink ========
> >
> >Flink SQL> describe pokes;
> >root
> > |-- foo: INT
> > |-- bar: STRING
> >
> >
> >======== hadoop/hive 环境 ========
> >
> >version: "3"
> >
> >services:
> >  namenode:
> >    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
> >    volumes:
> >      - namenode:/hadoop/dfs/name
> >    environment:
> >      - CLUSTER_NAME=test
> >    env_file:
> >      - ./hadoop-hive.env
> >    ports:
> >      - "50070:50070"
> >      - "8020:8020"
> >  datanode:
> >    image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
> >    volumes:
> >      - datanode:/hadoop/dfs/data
> >    env_file:
> >      - ./hadoop-hive.env
> >    environment:
> >      SERVICE_PRECONDITION: "namenode:50070"
> >    ports:
> >      - "50075:50075"
> >  hive-server:
> >    image: bde2020/hive:2.3.2-postgresql-metastore
> >    env_file:
> >      - ./hadoop-hive.env
> >    environment:
> >      HIVE_CORE_CONF_javax_jdo_option_ConnectionURL:
> >"jdbc:postgresql://hive-metastore/metastore"
> >      SERVICE_PRECONDITION: "hive-metastore:9083"
> >    ports:
> >      - "10000:10000"
> >  hive-metastore:
> >    image: bde2020/hive:2.3.2-postgresql-metastore
> >    env_file:
> >      - ./hadoop-hive.env
> >    command: /opt/hive/bin/hive --service metastore
> >    environment:
> >      SERVICE_PRECONDITION: "namenode:50070 datanode:50075
> >hive-metastore-postgresql:5432"
> >    ports:
> >      - "9083:9083"
> >  hive-metastore-postgresql:
> >    image: bde2020/hive-metastore-postgresql:2.3.0
> >    ports:
> >      - "5432:5432"
> >  presto-coordinator:
> >    image: shawnzhu/prestodb:0.181
> >    ports:
> >      - "8080:8080"
> >
> >volumes:
> >  namenode:
> >  datanode:
> >
> >
> >======== hive-site.xml ========
> >
> ><configuration>
> >    <property>
> >        <name>hive.metastore.uris</name>
> >        <value>thrift://localhost:9083</value>
> >    </property>
> >    <property>
> >        <name>javax.jdo.option.ConnectionURL</name>
> >
> <value>jdbc:postgresql://localhost/metastore?createDatabaseIfNotExist=true</value>
> >    </property>
> >    <property>
> >        <name>javax.jdo.option.ConnectionDriverName</name>
> >        <value>org.postgresql.Driver</value>
> >    </property>
> >    <property>
> >        <name>javax.jdo.option.ConnectionPassword</name>
> >        <value>hive</value>
> >    </property>
> >    <property>
> >        <name>javax.jdo.option.ConnectionUserName</name>
> >        <value>hive</value>
> >    </property>
> >   <property>
> >       <name>hive.metastore.schema.verification</name>
> >       <value>true</value>
> >   </property>
> ></configuration>
> >
> >
> >======== sql-client-defaults.yaml ========
> >
>
> >################################################################################
> >#  Licensed to the Apache Software Foundation (ASF) under one
> >#  or more contributor license agreements.  See the NOTICE file
> >#  distributed with this work for additional information
> >#  regarding copyright ownership.  The ASF licenses this file
> >#  to you under the Apache License, Version 2.0 (the
> >#  "License"); you may not use this file except in compliance
> >#  with the License.  You may obtain a copy of the License at
> >#
> >#      http://www.apache.org/licenses/LICENSE-2.0
> >#
> >#  Unless required by applicable law or agreed to in writing, software
> >#  distributed under the License is distributed on an "AS IS" BASIS,
> >#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> >#  See the License for the specific language governing permissions and
> ># limitations under the License.
>
> >################################################################################
> >
> >
> ># This file defines the default environment for Flink's SQL Client.
> ># Defaults might be overwritten by a session specific environment.
> >
> >
> ># See the Table API & SQL documentation for details about supported
> properties.
> >
> >
>
> >#==============================================================================
> ># Tables
>
> >#==============================================================================
> >
> ># Define tables here such as sources, sinks, views, or temporal tables.
> >
> >tables: [] # empty list
> ># A typical table source definition looks like:
> ># - name: ...
> >#   type: source-table
> >#   connector: ...
> >#   format: ...
> >#   schema: ...
> >
> ># A typical view definition looks like:
> ># - name: ...
> >#   type: view
> >#   query: "SELECT ..."
> >
> ># A typical temporal table definition looks like:
> ># - name: ...
> >#   type: temporal-table
> >#   history-table: ...
> >#   time-attribute: ...
> >#   primary-key: ...
> >
> >
>
> >#==============================================================================
> ># User-defined functions
>
> >#==============================================================================
> >
> ># Define scalar, aggregate, or table functions here.
> >
> >functions: [] # empty list
> ># A typical function definition looks like:
> ># - name: ...
> >#   from: class
> >#   class: ...
> >#   constructor: ...
> >
> >
>
> >#==============================================================================
> ># Catalogs
>
> >#==============================================================================
> >
> ># Define catalogs here.
> >
> >catalogs:
> >  - name: myhive
> >    type: hive
> >    hive-conf-dir: /Users/enzow/code/flink-sql-demo/flink-1.10.0/conf
> >    hive-version: 2.3.2
> >
> >
>
> >#==============================================================================
> ># Modules
>
> >#==============================================================================
> >
> ># Define modules here.
> >
> >#modules: # note the following modules will be of the order they are
> specified
> >#  - name: core
> >#    type: core
> >
>
> >#==============================================================================
> ># Execution properties
>
> >#==============================================================================
> >
> ># Properties that change the fundamental execution behavior of a table
> program.
> >
> >execution:
> >  # select the implementation responsible for planning table programs
> >  # possible values are 'blink' (used by default) or 'old'
> >  planner: blink
> >  # 'batch' or 'streaming' execution
> >  type: streaming
> >  # allow 'event-time' or only 'processing-time' in sources
> >  time-characteristic: event-time
> >  # interval in ms for emitting periodic watermarks
> >  periodic-watermarks-interval: 200
> >  # 'changelog' or 'table' presentation of results
> >  result-mode: table
> >  # maximum number of maintained rows in 'table' presentation of results
> >  max-table-result-rows: 1000000
> >  # parallelism of the program
> >  parallelism: 1
> >  # maximum parallelism
> >  max-parallelism: 128
> >  # minimum idle state retention in ms
> >  min-idle-state-retention: 0
> >  # maximum idle state retention in ms
> >  max-idle-state-retention: 0
> >  # current catalog ('default_catalog' by default)
> >  current-catalog: default_catalog
> >  # current database of the current catalog (default database of the
> >catalog by default)
> >  current-database: default_database
> >  # controls how table programs are restarted in case of a failures
> >  restart-strategy:
> >    # strategy type
> >    # possible values are "fixed-delay", "failure-rate", "none", or
> >"fallback" (default)
> >    type: fallback
> >
>
> >#==============================================================================
> ># Configuration options
>
> >#==============================================================================
> >
> ># Configuration options for adjusting and tuning table programs.
> >
> ># A full list of options and their default values can be found
> ># on the dedicated "Configuration" web page.
> >
> ># A configuration can look like:
> ># configuration:
> >#   table.exec.spill-compression.enabled: true
> >#   table.exec.spill-compression.block-size: 128kb
> >#   table.optimizer.join-reorder-enabled: true
> >
>
> >#==============================================================================
> ># Deployment properties
>
> >#==============================================================================
> >
> ># Properties that describe the cluster to which table programs are
> submitted to.
> >
> >deployment:
> >  # general cluster communication timeout in ms
> >  response-timeout: 5000
> >  # (optional) address from cluster to gateway
> >  gateway-address: ""
> >  # (optional) port from cluster to gateway
> >  gateway-port: 0
> >
> >
> >
> >
> >
> >Cheers,
> >Enzo
> >
> >
> >On Tue, 26 May 2020 at 17:15, wldd <[hidden email]> wrote:
> >
> >> Hi,Enzo wang
> >> 图片无法加载,github地址也无法访问,你可以试一下hive可以正常读写表么
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> --
> >>
> >> Best,
> >> wldd
> >>
> >>
> >>
> >>
> >> 在 2020-05-26 17:01:32,"Enzo wang" <[hidden email]> 写道:
> >>
> >> Hi Wldd,
> >>
> >>
> >> 谢谢回复。
> >>
> >>
> >> 1.  datanode 是可用的
> >>
> >>
> >> ❯ docker-compose exec namenode hadoop fs -ls /tmp
> >> Found 1 items
> >> drwx-wx-wx   - root supergroup          0 2020-05-26 05:40 /tmp/hive
> >>
> >>
> >> namenode 的webui 也可以看到:
> >>
> >>
> >>
> >>
> >> 2.  设置set execution.type=batch; 以后,执行报错,错误如下
> >> Causedby: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
> >> File
> >>
> /user/hive/warehouse/pokes/.staging_1590483224500/cp-0/task-0/cp-0-task-0-file-0
> >> could only be replicated to 0 nodes instead of minReplication (=1).
> There
> >> are 1 datanode(s) running and1 node(s) are excluded inthis operation.
> >>
> >>
> >> 完整错误见:
> >> https://gist.github.com/r0c/f95ec650fec0a16055787ac0d63f4673
> >>
> >>
> >>
> >> On Tue, 26 May 2020 at 16:52, wldd <[hidden email]> wrote:
> >>
> >> 问题1:
> >>
> >> org.apache.hadoop.hdfs.BlockMissingException,可以用hadoop fs
> >> 命令看看那个datanode能不能访问
> >>
> >>
> >> 问题2:
> >> 写hive,需要用batch模式,set execution.type=batch;
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-05-26 16:42:12,"Enzo wang" <[hidden email]> 写道:
> >>
> >> Hi Flink group,
> >>
> >>
> >> 今天再看Flink与Hive集成的部分遇到了几个问题,麻烦大家帮忙看看。
> >> 参考的网址:
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/hive_catalog.html
> >>
> >>
> >> 版本、表结构信息见这里:
> https://gist.github.com/r0c/e244622d66447dfc85a512e75fc2159b
> >>
> >>
> >> 问题1:Flink SQL 读Hive 表pokes 失败
> >>
> >>
> >> Flink SQL> select * from pokes;
> >> 2020-05-26 16:12:11,439 INFO  org.apache.hadoop.mapred.FileInputFormat
> >>                   - Total input paths to process : 4
> >> [ERROR] Could not execute SQL statement. Reason:
> >> org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block:
> >> BP-138389591-172.20.0.4-1590471581703:blk_1073741825_1001
> >> file=/user/hive/warehouse/pokes/kv1.txt
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 问题2:Flink SQL 写Hive 表pokes 失败
> >>
> >>
> >> Flink SQL> insert into pokes select 12,'tom';
> >> [INFO] Submitting SQL update statement to the cluster...
> >> [ERROR] Could not execute SQL statement. Reason:
> >> org.apache.flink.table.api.TableException: Stream Tables can only be
> >> emitted by AppendStreamTableSink, RetractStreamTableSink, or
> >> UpsertStreamTableSink.
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> Cheers,
> >> Enzo
>