Table options do not contain an option key 'connector' for discovering a connector.


Table options do not contain an option key 'connector' for discovering a connector.

Zhou Zach
In Flink 1.11, what should the connector be set to when sinking to a Hive table? I tried
WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='success-file');
and that also fails with an error.
query:
streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
"""
    |
    |
    |CREATE TABLE hive_table (
    |  user_id STRING,
    |  age INT
    |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
    |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
    |  'sink.partition-commit.trigger'='partition-time',
    |  'sink.partition-commit.delay'='1 h',
    |  'sink.partition-commit.policy.kind'='metastore,success-file'
    |)
    |
    |""".stripMargin)

streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
streamTableEnv.executeSql(
"""
    |
    |CREATE TABLE kafka_table (
    |    uid VARCHAR,
    |    -- uid BIGINT,
    |    sex VARCHAR,
    |    age INT,
    |    created_time TIMESTAMP(3),
    |    WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
    |) WITH (
    |    'connector.type' = 'kafka',
    |    'connector.version' = 'universal',
    |     'connector.topic' = 'user',
    |    -- 'connector.topic' = 'user_long',
    |    'connector.startup-mode' = 'latest-offset',
    |    'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |    'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |    'connector.properties.group.id' = 'user_flink',
    |    'format.type' = 'json',
    |    'format.derive-schema' = 'true'
    |)
    |""".stripMargin)
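(Aside: the 'connector.*' keys above are the legacy property style; the new factory stack, which raises the "option key 'connector'" error below, looks for a plain 'connector' key instead. A rough, untested sketch of the same source in the Flink 1.11 option style, reusing the topic and broker addresses from the DDL above:

streamTableEnv.executeSql(
"""
    |CREATE TABLE kafka_table_new_style (
    |    uid VARCHAR,
    |    sex VARCHAR,
    |    age INT,
    |    created_time TIMESTAMP(3),
    |    WATERMARK FOR created_time AS created_time - INTERVAL '3' SECOND
    |) WITH (
    |    'connector' = 'kafka',
    |    'topic' = 'user',
    |    'properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |    'properties.group.id' = 'user_flink',
    |    'scan.startup.mode' = 'latest-offset',
    |    'format' = 'json'
    |)
    |""".stripMargin)
)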



streamTableEnv.executeSql(
"""
    |
    |INSERT INTO hive_table
    |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH')
    |FROM kafka_table
    |
    |""".stripMargin)

streamTableEnv.executeSql(
"""
    |
    |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13'
    |
    |""".stripMargin)
.print()
Error stack:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.hive_table'.

Table options are:

'hive.storage.file-format'='parquet'
'is_generic'='false'
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
'sink.partition-commit.delay'='1 h'
'sink.partition-commit.policy.kind'='metastore,success-file'
'sink.partition-commit.trigger'='partition-time'
        at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
        at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
        at org.rabbit.sql.FromKafkaSinkHive$.main(FromKafkaSinkHive.scala:65)
        at org.rabbit.sql.FromKafkaSinkHive.main(FromKafkaSinkHive.scala)
Caused by: org.apache.flink.table.api.ValidationException: Table options do not contain an option key 'connector' for discovering a connector.
        at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
        at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
        ... 19 more


Re: Table options do not contain an option key 'connector' for discovering a connector.

Jingsong Li
Hi,

Are you using a HiveCatalog? Hive tables (and the Hive dialect) must be used together with a HiveCatalog.
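A minimal sketch of registering a HiveCatalog before running the Hive-dialect DDL above (the catalog name and hive-conf directory are placeholders, not values from this thread):

import org.apache.flink.table.catalog.hive.HiveCatalog

// catalog name, default database and the hive-site.xml directory are assumptions
val hiveCatalog = new HiveCatalog("myhive", "default", "/etc/hive/conf")
streamTableEnv.registerCatalog("myhive", hiveCatalog)
streamTableEnv.useCatalog("myhive")
// from here on, Hive-dialect CREATE TABLE statements go into the Hive metastore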

Otherwise you can only use the filesystem connector. If the filesystem connector also fails, please paste that error message.
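Without a HiveCatalog, a filesystem sink table in the default dialect would look roughly like this (a sketch only; the path is assumed, and such a table is a generic Flink table rather than a Hive table):

streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
streamTableEnv.executeSql(
"""
    |CREATE TABLE fs_table (
    |  user_id STRING,
    |  age INT,
    |  dt STRING,
    |  hr STRING
    |) PARTITIONED BY (dt, hr) WITH (
    |  'connector' = 'filesystem',
    |  'path' = 'hdfs:///tmp/fs_table',
    |  'format' = 'parquet',
    |  'sink.partition-commit.delay' = '1 h',
    |  'sink.partition-commit.policy.kind' = 'success-file'
    |)
    |""".stripMargin)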

Best,
Jingsong


Re:Re: Table options do not contain an option key 'connector' for discovering a connector.

Zhou Zach
Hi,
Following your hint I added a HiveCatalog, and data is now being written to HDFS successfully. But why is there no data when I query the Hive table directly through Hue? Do I have to manually add the partitions to the Hive table? I currently have this option set:
'sink.partition-commit.policy.kind'='metastore'
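For completeness, manually registering the partition in Hive (via beeline or Hue) would look like this, using the partition values from the query above; if the metastore commit policy works as intended, this should not be needed:

ALTER TABLE hive_table ADD IF NOT EXISTS PARTITION (dt='2020-07-13', hr='13');
-- or rediscover every partition already present on HDFS:
MSCK REPAIR TABLE hive_table;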

Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

Jingsong Li
Is checkpointing enabled? What is the delay set to?

The partition is added after checkpoint completion plus the configured delay.
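Concretely, with the DDL above ('partition-time' trigger, pattern '$dt $hr:00:00', delay '1 h'), the partition (dt='2020-07-13', hr='13') should only be committed at the first successful checkpoint after the watermark passes 2020-07-13 13:00:00 plus the one-hour delay. A sketch of the same table with the delay dropped, so the commit happens at the first checkpoint after the watermark passes the extracted partition time:

streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
"""
    |CREATE TABLE hive_table (
    |  user_id STRING,
    |  age INT
    |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
    |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
    |  'sink.partition-commit.trigger'='partition-time',
    |  'sink.partition-commit.delay'='0s',
    |  'sink.partition-commit.policy.kind'='metastore,success-file'
    |)
    |""".stripMargin)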

Best,
Jingsong


Re:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

Zhou Zach
Checkpointing is enabled:
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)
streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)

The interval is 5 s with a 10 s timeout. However, after waiting more than two minutes, a dozen or so files have been written to HDFS, and querying Hive still returns no data.

Re: Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

Jingsong Li
Have you set sink.partition-commit.delay?

Best,
Jingsong


Re: Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

Shuai Xia
Hi,
You have set SINK_PARTITION_COMMIT_DELAY to one hour.



Re: Re: Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

Zhou Zach
Hi,

I have now changed it to:
'sink.partition-commit.delay'='0s'

More than 20 checkpoints have completed and 20-odd files have been produced on HDFS,
but the Hive table still shows no data.
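A quick way to check whether any partition has actually been committed to the metastore is to list the table's partitions from Hive (beeline or Hue), for example:

SHOW PARTITIONS hive_table;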


Re: Re: Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

Jingsong Li
Could you paste the complete program again?

Best,
Jingsong


Re: Re: Re: Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

Zhou Zach
How embarrassing....
I had two projects open and was editing the wrong one. It now successfully queries the data from Hive. Thanks to the community for the warm replies, @Jingsong Li, @夏帅.
Over the last two days I have watched Jingsong's video from the group several times; sincere thanks!
Two more questions:

Question 1:
The kafka_table I created is not visible in either Hive or the Flink SQL client. Also, every time I rerun the program it fails unless I drop hive_table first; after dropping hive_table1 it runs. But I never need to drop kafka_table for the program to run. So is the kafka_table a temporary table, and only hive_table stored in the metastore?

Question 2:
A helpful community member just replied that, instead of the HiveCatalog, the filesystem connector can also be used to create a Hive table. I tried it and got the following error:
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_161]
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_161]
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) ~[?:1.8.0_161]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_161]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_161]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_161]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_161]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_161]
        at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154) [qile-data-flow-1.0.jar:?]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [qile-data-flow-1.0.jar:?]
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [qile-data-flow-1.0.jar:?]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [qile-data-flow-1.0.jar:?]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [qile-data-flow-1.0.jar:?]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [qile-data-flow-1.0.jar:?]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [qile-data-flow-1.0.jar:?]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
        ... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a sink for writing table 'default_catalog.default_database.hive_table1'.

Table options are:

'connector'='filesystem'
'hive.storage.file-format'='parquet'
'is_generic'='false'
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
'sink.partition-commit.delay'='0s'
'sink.partition-commit.policy.kind'='metastore,success-file'
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        ... 10 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.hive_table1'.

Table options are:

'connector'='filesystem'
'hive.storage.file-format'='parquet'
'is_generic'='false'
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
'sink.partition-commit.delay'='0s'
'sink.partition-commit.policy.kind'='metastore,success-file'
        at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[qile-data-flow-1.0.jar:?]
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68) ~[qile-data-flow-1.0.jar:?]
        at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala) ~[qile-data-flow-1.0.jar:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_161]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_161]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_161]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_161]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        ... 10 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='filesystem''.
        at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[qile-data-flow-1.0.jar:?]
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68) ~[qile-data-flow-1.0.jar:?]
        at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala) ~[qile-data-flow-1.0.jar:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_161]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_161]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_161]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_161]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        ... 10 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'filesystem' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

Available factory identifiers are:

blackhole
hbase-1.4
jdbc
kafka
print
        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
        at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[qile-data-flow-1.0.jar:?]
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68) ~[qile-data-flow-1.0.jar:?]
        at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala) ~[qile-data-flow-1.0.jar:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_161]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_161]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_161]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_161]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
        ... 10 more






query:




val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)
    streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)

    val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)



    streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    streamTableEnv.executeSql(
      """
        |
        |
        |CREATE TABLE hive_table (
        |  user_id STRING,
        |  age INT
        |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
        |  'connector'='filesystem',
        |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
        |  'sink.partition-commit.delay'='0s',
        |  'sink.partition-commit.policy.kind'='metastore,success-file'
        |)
        |
        |""".stripMargin)

    streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
    streamTableEnv.executeSql(
      """
        |
        |CREATE TABLE kafka_table (
        |    uid VARCHAR,
        |    -- uid BIGINT,
        |    sex VARCHAR,
        |    age INT,
        |    created_time TIMESTAMP(3),
        |    WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
        |) WITH (
        |    'connector.type' = 'kafka',
        |    'connector.version' = 'universal',
        |     'connector.topic' = 'user',
        |    -- 'connector.topic' = 'user_long',
        |    'connector.startup-mode' = 'latest-offset',
        |    'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181',
        |    'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
        |    'connector.properties.group.id' = 'user_flink',
        |    'format.type' = 'json',
        |    'format.derive-schema' = 'true'
        |)
        |""".stripMargin)


    streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)

    streamTableEnv.executeSql(
      """
        |
        |INSERT INTO hive_table
        |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH')
        |FROM kafka_table
        |
        |""".stripMargin)

    streamTableEnv.executeSql(
      """
        |
        |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='18'
        |
        |""".stripMargin)
      .print()


On 2020-07-13 17:52:54, "Jingsong Li" <[hidden email]> wrote:

>Could you paste the complete program again?
>
>Best,
>Jingsong
>

Re: Re: Re: Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

Jingsong Li
Hi,

Question 1:

As long as the current catalog is the HiveCatalog.
In principle the Kafka table is also stored in the Hive Metastore; if you do not want the error, you can use CREATE TABLE XXX IF NOT EXISTS.

To clarify: what do you mean by "can't see it"? Can you try the Kafka table on its own; does it disappear after a restart?
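
As a minimal sketch (not from this thread; the catalog name, hive-conf-dir and Hive version below are placeholders), registering a HiveCatalog before running the DDL could look like:

    // assumes flink-connector-hive and the Hive client jars are on the classpath
    val hiveCatalog = new org.apache.flink.table.catalog.hive.HiveCatalog(
      "myhive",           // catalog name (placeholder)
      "default",          // default database
      "/etc/hive/conf",   // hive-conf-dir (placeholder)
      "2.1.1")            // Hive version (placeholder)
    streamTableEnv.registerCatalog("myhive", hiveCatalog)
    streamTableEnv.useCatalog("myhive")

With the HiveCatalog as the current catalog, both hive_table and kafka_table should be persisted in the Hive Metastore, and writing the DDL as CREATE TABLE IF NOT EXISTS kafka_table (...) should avoid the failure on re-runs without dropping the table first.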

Question 2:

A table created with the filesystem connector is a filesystem table; it has nothing to do with the Hive metastore. You need to use the DDL syntax for filesystem tables [1].

A filesystem table writes its data directly to the file system. Its format is compatible with Hive, so if the path you write to is the path of some Hive table, the data can be queried from the Hive side.
However, its partition commit does not support the metastore, so there is no default implementation that automatically adds partitions to Hive; you need a custom partition-commit-policy.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
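
As a rough sketch of the filesystem DDL described in [1] (default dialect; the table name fs_table and the path are placeholders, and this assumes the same streamTableEnv as in the program above):

    streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
    streamTableEnv.executeSql(
      """
        |CREATE TABLE fs_table (
        |  user_id STRING,
        |  age INT,
        |  dt STRING,
        |  hr STRING
        |) PARTITIONED BY (dt, hr) WITH (
        |  'connector' = 'filesystem',
        |  'path' = 'hdfs:///user/hive/warehouse/some_db.db/some_table',  -- placeholder: location of the target Hive table
        |  'format' = 'parquet',
        |  'sink.partition-commit.delay' = '0s',
        |  'sink.partition-commit.policy.kind' = 'success-file'
        |)
        |""".stripMargin)

Note that 'metastore' is deliberately left out of sink.partition-commit.policy.kind here; per [1], getting partitions registered in the Hive Metastore automatically from a filesystem table would need 'sink.partition-commit.policy.kind'='custom' together with 'sink.partition-commit.policy.class' pointing at your own policy implementation, as described above.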

Best,
Jingsong



--
Best, Jingsong Lee

Re: Re: Re: Re: Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

Zhou Zach
When creating kafka_table I used the default dialect; after switching to the Hive dialect (HiveCatalog), the WATERMARK and WITH syntax are no longer supported.
If a table is created under the default dialect, is it only valid in the temporary session?
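
The catalog, not the dialect, decides where a table definition is stored. Below is a minimal sketch (not code from this thread; the catalog name, default database and hive-conf directory are placeholders) of registering a HiveCatalog so that tables created under either dialect end up in the Hive metastore:

import org.apache.flink.table.catalog.hive.HiveCatalog

// Sketch only: make a HiveCatalog the current catalog before running any DDL.
// "myhive", "default" and the hive-conf path are placeholders, not values from this thread.
val hiveCatalog = new HiveCatalog("myhive", "default", "/etc/hive/conf")
streamTableEnv.registerCatalog("myhive", hiveCatalog)
streamTableEnv.useCatalog("myhive")
// From here on, CREATE TABLE statements (default or Hive dialect) are persisted in the
// metastore instead of the in-memory default_catalog, so they survive a job restart.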

On 2020-07-13 19:27:44, "Jingsong Li" <[hidden email]> wrote:

>Hi,
>
>Question 1:
>
>It is enough that the current catalog is a HiveCatalog.
>In principle the Kafka table is stored in the Hive Metastore as well; if you don't want the error on re-run, use CREATE TABLE xxx IF NOT EXISTS.
>
>To clarify, what exactly do you mean by "can't see it"? Can you try the Kafka table on its own and check whether it is really gone after a restart?
>
>Question 2:
>
>A table created with the filesystem connector is a filesystem table; it has nothing to do with the Hive
>metastore, and you need to use the DDL syntax for filesystem tables [1].
>
>A filesystem table writes its data directly to the file system in a format compatible with Hive, so if the write path is the location of some Hive table, the data can be queried from the Hive side.
>However, its partition commit does not support the metastore policy, so there is no built-in automatic "add
>partition" to Hive; you need a custom partition-commit-policy.
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
>
>Best,
>Jingsong
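
For reference, a minimal sketch of the filesystem DDL from [1], in contrast to the Hive-dialect TBLPROPERTIES form used earlier in the thread. The table name, path and delay are placeholders, and only the success-file commit policy is kept because the metastore policy is not supported by this connector:

// Sketch of a filesystem-connector table: default dialect, options in WITH, no TBLPROPERTIES.
// The 'path' value is a placeholder; the filesystem table factory must be on the classpath.
streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
streamTableEnv.executeSql(
  """
    |CREATE TABLE fs_table (
    |  user_id STRING,
    |  age INT,
    |  dt STRING,
    |  hr STRING
    |) PARTITIONED BY (dt, hr) WITH (
    |  'connector' = 'filesystem',
    |  'path' = 'hdfs:///path/to/hive/warehouse/hive_table',
    |  'format' = 'parquet',
    |  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
    |  'sink.partition-commit.trigger' = 'partition-time',
    |  'sink.partition-commit.delay' = '0s',
    |  'sink.partition-commit.policy.kind' = 'success-file'
    |)
    |""".stripMargin)

If the path points at an existing Hive table's location, the Parquet files written this way can be queried from Hive, but the partitions have to be registered there by a custom partition-commit policy or added manually.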

Re: Re: Re: Re: Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

Jingsong Li
kafka_table needs to be created under the default dialect.

Regardless of the dialect, the table is saved to the Hive metastore (unless you use the TEMPORARY TABLE syntax).

Best,
Jingsong
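
A short sketch of that distinction, reusing the Kafka DDL already posted in this thread; only the TEMPORARY keyword and the trailing IF NOT EXISTS note are new, the broker and topic values are the ones used above:

// Under the default dialect, a TEMPORARY table lives only in the current session,
// while a plain CREATE TABLE is persisted in the current (Hive) catalog's metastore.
streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
streamTableEnv.executeSql(
  """
    |CREATE TEMPORARY TABLE kafka_table_tmp (
    |  uid VARCHAR,
    |  age INT,
    |  created_time TIMESTAMP(3),
    |  WATERMARK FOR created_time AS created_time - INTERVAL '3' SECOND
    |) WITH (
    |  'connector.type' = 'kafka',
    |  'connector.version' = 'universal',
    |  'connector.topic' = 'user',
    |  'connector.startup-mode' = 'latest-offset',
    |  'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |  'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |  'connector.properties.group.id' = 'user_flink',
    |  'format.type' = 'json',
    |  'format.derive-schema' = 'true'
    |)
    |""".stripMargin)
// For a table that should persist, CREATE TABLE IF NOT EXISTS avoids the
// "table already exists" error when the job is re-run against the same metastore.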

On Mon, Jul 13, 2020 at 7:46 PM Zhou Zach <[hidden email]> wrote:

> When creating kafka_table I used the default dialect; after switching to the Hive dialect (HiveCatalog), the WATERMARK and WITH syntax are no longer supported.
> If a table is created under the default dialect, is it only valid in the temporary session?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-13 19:27:44,"Jingsong Li" <[hidden email]> 写道:
> >Hi,
> >
> >问题一:
> >
> >只要current catalog是HiveCatalog。
> >理论上Kafka也是存到HiveMetastore里面的,如果不想报错,可以用CREATE TABLE XXX IF NOT EXISTS.
> >
> >明确下,看不见是什么意思?可以单独试试Kafka表,重启后就不见了吗?
> >
> >问题二:
> >
> >用filesystem创建出来的是filesystem的表,它和hive
> >metastore是没有关系的,你需要使用创建filesystem表的语法[1]。
> >
> >filesystem的表数据是直接写到 文件系统的,它的格式和hive是兼容的,所以写的路径是hive某张表的路径,就可以在hive端查询了。
> >但是它的partition commit是不支持metastore的,所以不会有自动add
> >partition到hive的默认实现,你需要自定义partition-commit-policy.
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
> >
> >Best,
> >Jingsong
> >
> >On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach <[hidden email]> wrote:
> >
> >> 尴尬。。。。
> >> 我开了两个项目,改错项目了,现在 已经成功从hive查到数据了,感谢社区的热情回复,@Jingsong Li,  @夏帅
> >> 这两天刷了Jingsong在群里的那个视频几遍了,由衷感谢!
> >> 还有两个问题问下,
> >> 问题1:
> >> 创建的kafka_table,在hive和Flink
> >>
> SQL客户端都看不到,而且每次重新运行程序,如果不删除hive_table,就会报错,删除hive_table1,就可以执行,但是每次都不需要删除kafka_table,就可以执行程序,所以,是不是创建的kafka_table,是临时表,只有hive_table是存储在metastore
> >>
> >>
> >>
> >>
> >>
> >>
> >> 问题2:
> >> 刚才有热心社区同学回答,不用hivecatalog,用filesystem connector 也是可以创建hive表,我尝试了一下,报错了:
> >> java.util.concurrent.CompletionException:
> >>
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> >> Could not execute application.
> >>         at
> >>
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >> [?:1.8.0_161]
> >>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >> [?:1.8.0_161]
> >>         at
> >>
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
> >> [qile-data-flow-1.0.jar:?]
> >>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> >> [qile-data-flow-1.0.jar:?]
> >>         at
> >>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> >> [qile-data-flow-1.0.jar:?]
> >>         at
> >> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> [qile-data-flow-1.0.jar:?]
> >>         at
> >>
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> [qile-data-flow-1.0.jar:?]
> >>         at
> >> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> [qile-data-flow-1.0.jar:?]
> >>         at
> >>
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> [qile-data-flow-1.0.jar:?]
> >> Caused by:
> >>
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> >> Could not execute application.
> >>         ... 11 more
> >> Caused by: org.apache.flink.client.program.ProgramInvocationException:
> The
> >> main method caused an error: Unable to create a sink for writing table
> >> 'default_catalog.default_database.hive_table1'.
> >>
> >> Table options are:
> >>
> >> 'connector'='filesystem'
> >> 'hive.storage.file-format'='parquet'
> >> 'is_generic'='false'
> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
> >> 'sink.partition-commit.delay'='0s'
> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
> >>         at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         ... 10 more
> >> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> >> create a sink for writing table
> >> 'default_catalog.default_database.hive_table1'.
> >>
> >> Table options are:
> >>
> >> 'connector'='filesystem'
> >> 'hive.storage.file-format'='parquet'
> >> 'is_generic'='false'
> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
> >> 'sink.partition-commit.delay'='0s'
> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
> >>         at
> >>
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >>
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> ~[?:1.8.0_161]
> >>         at java.lang.reflect.Method.invoke(Method.java:498)
> ~[?:1.8.0_161]
> >>         at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         ... 10 more
> >> Caused by: org.apache.flink.table.api.ValidationException: Cannot
> discover
> >> a connector using option ''connector'='filesystem''.
> >>         at
> >>
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >>
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> ~[?:1.8.0_161]
> >>         at java.lang.reflect.Method.invoke(Method.java:498)
> ~[?:1.8.0_161]
> >>         at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         ... 10 more
> >> Caused by: org.apache.flink.table.api.ValidationException: Could not
> find
> >> any factory for identifier 'filesystem' that implements
> >> 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the
> classpath.
> >>
> >> Available factory identifiers are:
> >>
> >> blackhole
> >> hbase-1.4
> >> jdbc
> >> kafka
> >> print
> >>         at
> >>
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at
> >>
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala)
> >> ~[qile-data-flow-1.0.jar:?]
> >>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> ~[?:1.8.0_161]
> >>         at
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> ~[?:1.8.0_161]
> >>         at java.lang.reflect.Method.invoke(Method.java:498)
> ~[?:1.8.0_161]
> >>         at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         at
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >>         ... 10 more
> >>
> >>
> >>
> >>
> >>
> >>
> >> query:
> >>
> >>
> >>
> >>
> >> val streamExecutionEnv =
> StreamExecutionEnvironment.getExecutionEnvironment
> >>
> >>
> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> >>     streamExecutionEnv.enableCheckpointing(5 * 1000,
> >> CheckpointingMode.EXACTLY_ONCE)
> >>     streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 *
> 1000)
> >>
> >>     val blinkEnvSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >>     val streamTableEnv =
> StreamTableEnvironment.create(streamExecutionEnv,
> >> blinkEnvSettings)
> >>
> >>
> >>
> >>     streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> >>     streamTableEnv.executeSql(
> >>       """
> >>         |
> >>         |
> >>         |CREATE TABLE hive_table (
> >>         |  user_id STRING,
> >>         |  age INT
> >>         |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet
> >> TBLPROPERTIES (
> >>         |  'connector'='filesystem',
> >>         |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
> >>         |  'sink.partition-commit.delay'='0s',
> >>         |  'sink.partition-commit.policy.kind'='metastore,success-file'
> >>         |)
> >>         |
> >>         |""".stripMargin)
> >>
> >>     streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
> >>     streamTableEnv.executeSql(
> >>       """
> >>         |
> >>         |CREATE TABLE kafka_table (
> >>         |    uid VARCHAR,
> >>         |    -- uid BIGINT,
> >>         |    sex VARCHAR,
> >>         |    age INT,
> >>         |    created_time TIMESTAMP(3),
> >>         |    WATERMARK FOR created_time as created_time - INTERVAL '3'
> >> SECOND
> >>         |) WITH (
> >>         |    'connector.type' = 'kafka',
> >>         |    'connector.version' = 'universal',
> >>         |     'connector.topic' = 'user',
> >>         |    -- 'connector.topic' = 'user_long',
> >>         |    'connector.startup-mode' = 'latest-offset',
> >>         |    'connector.properties.zookeeper.connect' =
> >> 'cdh1:2181,cdh2:2181,cdh3:2181',
> >>         |    'connector.properties.bootstrap.servers' =
> >> 'cdh1:9092,cdh2:9092,cdh3:9092',
> >>         |    'connector.properties.group.id' = 'user_flink',
> >>         |    'format.type' = 'json',
> >>         |    'format.derive-schema' = 'true'
> >>         |)
> >>         |""".stripMargin)
> >>
> >>
> >>     streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> >>
> >>     streamTableEnv.executeSql(
> >>       """
> >>         |
> >>         |INSERT INTO hive_table
> >>         |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'),
> >> DATE_FORMAT(created_time, 'HH')
> >>         |FROM kafka_table
> >>         |
> >>         |""".stripMargin)
> >>
> >>     streamTableEnv.executeSql(
> >>       """
> >>         |
> >>         |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='18'
> >>         |
> >>         |""".stripMargin)
> >>       .print()
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-13 17:52:54,"Jingsong Li" <[hidden email]> 写道:
> >> >你把完整的程序再贴下呢
> >> >
> >> >Best,
> >> >Jingsong
> >> >
> >> >On Mon, Jul 13, 2020 at 5:46 PM Zhou Zach <[hidden email]> wrote:
> >> >
> >> >> Hi,
> >> >>
> >> >>
> >> >> 我现在改成了:
> >> >> 'sink.partition-commit.delay'='0s'
> >> >>
> >> >>
> >> >> checkpoint完成了20多次,hdfs文件也产生了20多个,
> >> >> hive表还是查不到数据
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 在 2020-07-13 17:23:34,"夏帅" <[hidden email]> 写道:
> >> >>
> >> >> 你好,
> >> >> 你设置了1个小时的
> >> >> SINK_PARTITION_COMMIT_DELAY
> >> >>
> >> >>
> >> >> ------------------------------------------------------------------
> >> >> 发件人:Zhou Zach <[hidden email]>
> >> >> 发送时间:2020年7月13日(星期一) 17:09
> >> >> 收件人:user-zh <[hidden email]>
> >> >> 主 题:Re:Re: Re: Table options do not contain an option key 'connector'
> >> for
> >> >> discovering a connector.
> >> >>
> >> >>
> >> >> 开了checkpoint,
> >> >> val streamExecutionEnv =
> >> StreamExecutionEnvironment.getExecutionEnvironment
> >> >>
> >> >>
> >>
> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> >> >> streamExecutionEnv.enableCheckpointing(5 * 1000,
> >> >> CheckpointingMode.EXACTLY_ONCE)
> >> >> streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 *
> 1000)
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 间隔5s,超时10s,不过,等了2分多钟,hdfs上写入了10几个文件了,查hive还是没数据
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 在 2020-07-13 16:52:16,"Jingsong Li" <[hidden email]> 写道:
> >> >> >有开checkpoint吧?delay设的多少?
> >> >> >
> >> >> >Add partition 在 checkpoint完成 + delay的时间后
> >> >> >
> >> >> >Best,
> >> >> >Jingsong
> >> >> >
> >> >> >On Mon, Jul 13, 2020 at 4:50 PM Zhou Zach <[hidden email]>
> wrote:
> >> >> >
> >> >> >> Hi,
> >> >> >>
> 根据你的提示,加上HiveCatalog,已经成功写入数据到hdfs了,不过,为什么,直接通过hue查hive表,没数据,必须手动add
> >> >> >> partition到hive表吗,我当前设置了参数
> >> >> >> 'sink.partition-commit.policy.kind'='metastore'
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> At 2020-07-13 15:01:28, "Jingsong Li" <[hidden email]>
> >> wrote:
> >> >> >> >Hi,
> >> >> >> >
> >> >> >> >你用了HiveCatalog了吗?Hive表或Hive方言必须要结合HiveCatalog
> >> >> >> >
> >> >> >> >不然就只能用Filesystem connector,如果你使用filesystem也报错,那就贴下报错信息
> >> >> >> >
> >> >> >> >Best,
> >> >> >> >Jingsong


--
Best, Jingsong Lee

Re: Re: Re: Re: Re: Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

Zhou Zach
OK, thanks for the explanation.

On 2020-07-13 19:49:10, "Jingsong Li" <[hidden email]> wrote:

>kafka_table needs to be created under the default dialect.
>
>Whatever the dialect, the table is saved in the Hive metastore (unless you use the temporary table syntax).
>
>Best,
>Jingsong
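For example, a Kafka source that should only live for the current session can be declared TEMPORARY under the default dialect; this sketch is based on the kafka_table DDL quoted further down, with the connector options abbreviated:

    streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
    streamTableEnv.executeSql(
      """
        |CREATE TEMPORARY TABLE kafka_table (
        |    uid VARCHAR,
        |    age INT,
        |    created_time TIMESTAMP(3),
        |    WATERMARK FOR created_time AS created_time - INTERVAL '3' SECOND
        |) WITH (
        |    'connector.type' = 'kafka',
        |    'connector.version' = 'universal',
        |    'connector.topic' = 'user',
        |    'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
        |    'format.type' = 'json',
        |    'format.derive-schema' = 'true'
        |)
        |""".stripMargin)

A temporary table exists only for the lifetime of the session and is not written to the Hive metastore, so re-running the job does not clash with an existing catalog entry.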
>
>On Mon, Jul 13, 2020 at 7:46 PM Zhou Zach <[hidden email]> wrote:
>
>> When I created kafka_table I was in the default dialect; after switching to the HiveCatalog (Hive dialect), the WATERMARK and WITH syntax were no longer supported.
>> If a table is created under the default dialect, is it only valid for the temporary session?
>>
>> 在 2020-07-13 19:27:44,"Jingsong Li" <[hidden email]> 写道:
>> >Hi,
>> >
>> >问题一:
>> >
>> >只要current catalog是HiveCatalog。
>> >理论上Kafka也是存到HiveMetastore里面的,如果不想报错,可以用CREATE TABLE XXX IF NOT EXISTS.
>> >
>> >明确下,看不见是什么意思?可以单独试试Kafka表,重启后就不见了吗?
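For example, guarding the DDL keeps a re-run from failing when the table already exists in the metastore; this is a sketch of the hive_table DDL from this thread with the table properties left out:

    streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    streamTableEnv.executeSql(
      """
        |CREATE TABLE IF NOT EXISTS hive_table (
        |  user_id STRING,
        |  age INT
        |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet
        |""".stripMargin)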
>> >
>> >问题二:
>> >
>> >用filesystem创建出来的是filesystem的表,它和hive
>> >metastore是没有关系的,你需要使用创建filesystem表的语法[1]。
>> >
>> >filesystem的表数据是直接写到 文件系统的,它的格式和hive是兼容的,所以写的路径是hive某张表的路径,就可以在hive端查询了。
>> >但是它的partition commit是不支持metastore的,所以不会有自动add
>> >partition到hive的默认实现,你需要自定义partition-commit-policy.
>> >
>> >[1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
>> >
>> >Best,
>> >Jingsong
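A sketch of the filesystem DDL described in [1], written under the default dialect; the path is a placeholder for the directory of the target Hive table, and since the metastore policy is not supported here, only a success-file policy is used (publishing the partition to the Hive metastore would need a custom class configured via 'sink.partition-commit.policy.class'):

    streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
    streamTableEnv.executeSql(
      """
        |CREATE TABLE fs_table (
        |  user_id STRING,
        |  age INT,
        |  dt STRING,
        |  hr STRING
        |) PARTITIONED BY (dt, hr) WITH (
        |  'connector' = 'filesystem',
        |  'path' = 'hdfs:///user/hive/warehouse/hive_table',
        |  'format' = 'parquet',
        |  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
        |  'sink.partition-commit.trigger' = 'partition-time',
        |  'sink.partition-commit.delay' = '0s',
        |  'sink.partition-commit.policy.kind' = 'success-file'
        |)
        |""".stripMargin)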
>> >
>> >On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach <[hidden email]> wrote:
>> >
>> >> That's embarrassing....
>> >> I had two projects open and had been editing the wrong one. I can now query the data from Hive successfully. Thanks for the community's enthusiastic replies, @Jingsong Li, @夏帅.
>> >> I have watched Jingsong's video from the group several times over the past two days, thank you very much!
>> >> Two more questions:
>> >> Question 1:
>> >> The kafka_table I created is not visible in either Hive or the Flink SQL client. Also, every time I rerun the program it fails unless I drop hive_table first; after dropping hive_table1 it runs, but I never need to drop kafka_table before rerunning. So is the kafka_table I created a temporary table, and is only hive_table stored in the metastore?
>> >>
>> >> Question 2:
>> >> A helpful community member just replied that a Hive table can also be created with the filesystem connector instead of the HiveCatalog. I tried it and got this error:
>> >> java.util.concurrent.CompletionException:
>> >>
>> org.apache.flink.client.deployment.application.ApplicationExecutionException:
>> >> Could not execute application.
>> >>         at
>> >>
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>> >> ~[?:1.8.0_161]
>> >>         at
>> >>
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>> >> ~[?:1.8.0_161]
>> >>         at
>> >>
>> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
>> >> ~[?:1.8.0_161]
>> >>         at
>> >>
>> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>> >> ~[?:1.8.0_161]
>> >>         at
>> >>
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> >> ~[?:1.8.0_161]
>> >>         at
>> >>
>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>> >> ~[?:1.8.0_161]
>> >>         at
>> >>
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> >> [?:1.8.0_161]
>> >>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> >> [?:1.8.0_161]
>> >>         at
>> >>
>> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
>> >> [qile-data-flow-1.0.jar:?]
>> >>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> >> [qile-data-flow-1.0.jar:?]
>> >>         at
>> >>
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>> >> [qile-data-flow-1.0.jar:?]
>> >>         at
>> >> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >> [qile-data-flow-1.0.jar:?]
>> >>         at
>> >>
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >> [qile-data-flow-1.0.jar:?]
>> >>         at
>> >> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >> [qile-data-flow-1.0.jar:?]
>> >>         at
>> >>
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >> [qile-data-flow-1.0.jar:?]
>> >> Caused by:
>> >>
>> org.apache.flink.client.deployment.application.ApplicationExecutionException:
>> >> Could not execute application.
>> >>         ... 11 more
>> >> Caused by: org.apache.flink.client.program.ProgramInvocationException:
>> The
>> >> main method caused an error: Unable to create a sink for writing table
>> >> 'default_catalog.default_database.hive_table1'.
>> >>
>> >> Table options are:
>> >>
>> >> 'connector'='filesystem'
>> >> 'hive.storage.file-format'='parquet'
>> >> 'is_generic'='false'
>> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
>> >> 'sink.partition-commit.delay'='0s'
>> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
>> >>         at
>> >>
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         ... 10 more
>> >> Caused by: org.apache.flink.table.api.ValidationException: Unable to
>> >> create a sink for writing table
>> >> 'default_catalog.default_database.hive_table1'.
>> >>
>> >> Table options are:
>> >>
>> >> 'connector'='filesystem'
>> >> 'hive.storage.file-format'='parquet'
>> >> 'is_generic'='false'
>> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
>> >> 'sink.partition-commit.delay'='0s'
>> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
>> >>         at
>> >>
>> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> >>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> >> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> >> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> >>
>> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >> ~[?:1.8.0_161]
>> >>         at
>> >>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >> ~[?:1.8.0_161]
>> >>         at
>> >>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >> ~[?:1.8.0_161]
>> >>         at java.lang.reflect.Method.invoke(Method.java:498)
>> ~[?:1.8.0_161]
>> >>         at
>> >>
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         ... 10 more
>> >> Caused by: org.apache.flink.table.api.ValidationException: Cannot
>> discover
>> >> a connector using option ''connector'='filesystem''.
>> >>         at
>> >>
>> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> >>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> >> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> >> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> >>
>> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >> ~[?:1.8.0_161]
>> >>         at
>> >>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >> ~[?:1.8.0_161]
>> >>         at
>> >>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >> ~[?:1.8.0_161]
>> >>         at java.lang.reflect.Method.invoke(Method.java:498)
>> ~[?:1.8.0_161]
>> >>         at
>> >>
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         ... 10 more
>> >> Caused by: org.apache.flink.table.api.ValidationException: Could not
>> find
>> >> any factory for identifier 'filesystem' that implements
>> >> 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the
>> classpath.
>> >>
>> >> Available factory identifiers are:
>> >>
>> >> blackhole
>> >> hbase-1.4
>> >> jdbc
>> >> kafka
>> >> print
>> >>         at
>> >>
>> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> >>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> >> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> >> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
>> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at
>> >>
>> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala)
>> >> ~[qile-data-flow-1.0.jar:?]
>> >>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >> ~[?:1.8.0_161]
>> >>         at
>> >>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >> ~[?:1.8.0_161]
>> >>         at
>> >>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >> ~[?:1.8.0_161]
>> >>         at java.lang.reflect.Method.invoke(Method.java:498)
>> ~[?:1.8.0_161]
>> >>         at
>> >>
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         at
>> >>
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         ... 10 more
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> query:
>> >>
>> >>
>> >>
>> >>
>> >> val streamExecutionEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment
>> >>
>> >>
>> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> >>     streamExecutionEnv.enableCheckpointing(5 * 1000,
>> >> CheckpointingMode.EXACTLY_ONCE)
>> >>     streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 *
>> 1000)
>> >>
>> >>     val blinkEnvSettings =
>> >>
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> >>     val streamTableEnv =
>> StreamTableEnvironment.create(streamExecutionEnv,
>> >> blinkEnvSettings)
>> >>
>> >>
>> >>
>> >>     streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>> >>     streamTableEnv.executeSql(
>> >>       """
>> >>         |
>> >>         |
>> >>         |CREATE TABLE hive_table (
>> >>         |  user_id STRING,
>> >>         |  age INT
>> >>         |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet
>> >> TBLPROPERTIES (
>> >>         |  'connector'='filesystem',
>> >>         |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>> >>         |  'sink.partition-commit.delay'='0s',
>> >>         |  'sink.partition-commit.policy.kind'='metastore,success-file'
>> >>         |)
>> >>         |
>> >>         |""".stripMargin)
>> >>
>> >>     streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
>> >>     streamTableEnv.executeSql(
>> >>       """
>> >>         |
>> >>         |CREATE TABLE kafka_table (
>> >>         |    uid VARCHAR,
>> >>         |    -- uid BIGINT,
>> >>         |    sex VARCHAR,
>> >>         |    age INT,
>> >>         |    created_time TIMESTAMP(3),
>> >>         |    WATERMARK FOR created_time as created_time - INTERVAL '3'
>> >> SECOND
>> >>         |) WITH (
>> >>         |    'connector.type' = 'kafka',
>> >>         |    'connector.version' = 'universal',
>> >>         |     'connector.topic' = 'user',
>> >>         |    -- 'connector.topic' = 'user_long',
>> >>         |    'connector.startup-mode' = 'latest-offset',
>> >>         |    'connector.properties.zookeeper.connect' =
>> >> 'cdh1:2181,cdh2:2181,cdh3:2181',
>> >>         |    'connector.properties.bootstrap.servers' =
>> >> 'cdh1:9092,cdh2:9092,cdh3:9092',
>> >>         |    'connector.properties.group.id' = 'user_flink',
>> >>         |    'format.type' = 'json',
>> >>         |    'format.derive-schema' = 'true'
>> >>         |)
>> >>         |""".stripMargin)
>> >>
>> >>
>> >>     streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>> >>
>> >>     streamTableEnv.executeSql(
>> >>       """
>> >>         |
>> >>         |INSERT INTO hive_table
>> >>         |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'),
>> >> DATE_FORMAT(created_time, 'HH')
>> >>         |FROM kafka_table
>> >>         |
>> >>         |""".stripMargin)
>> >>
>> >>     streamTableEnv.executeSql(
>> >>       """
>> >>         |
>> >>         |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='18'
>> >>         |
>> >>         |""".stripMargin)
>> >>       .print()
>> >>
>> >>
>> >>
>> >> On 2020-07-13 17:52:54, "Jingsong Li" <[hidden email]> wrote:
>> >> >Could you paste the complete program again?
>> >> >
>> >> >Best,
>> >> >Jingsong
>> >> >
>> >> >On Mon, Jul 13, 2020 at 5:46 PM Zhou Zach <[hidden email]> wrote:
>> >> >
>> >> >> Hi,
>> >> >>
>> >> >>
>> >> >> I have now changed it to:
>> >> >> 'sink.partition-commit.delay'='0s'
>> >> >>
>> >> >> More than 20 checkpoints have completed and 20-odd files have been produced on HDFS,
>> >> >> but the Hive table still returns no data.
>> >> >>
>> >> >>
>> >> >>
>> >> >> On 2020-07-13 17:23:34, "夏帅" <[hidden email]> wrote:
>> >> >>
>> >> >> Hi,
>> >> >> You have set SINK_PARTITION_COMMIT_DELAY to 1 hour.
>> >> >>
>> >> >> ------------------------------------------------------------------
>> >> >> From: Zhou Zach <[hidden email]>
>> >> >> Sent: Monday, July 13, 2020 17:09
>> >> >> To: user-zh <[hidden email]>
>> >> >> Subject: Re: Re: Re: Table options do not contain an option key 'connector' for discovering a connector.
>> >> >>
>> >> >>
>> >> >> Checkpointing is enabled:
>> >> >> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> >> >> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> >> >> streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)
>> >> >> streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)
>> >> >>
>> >> >> The interval is 5 s and the timeout is 10 s. Still, after waiting more than two minutes, a dozen or so files have been written to HDFS, but querying Hive returns no data.
>> >> >>
>> >> >> On 2020-07-13 16:52:16, "Jingsong Li" <[hidden email]> wrote:
>> >> >> >Did you enable checkpointing? And what is the delay set to?
>> >> >> >
>> >> >> >The partition is added after the checkpoint completes plus the delay.
>> >> >> >
>> >> >> >Best,
>> >> >> >Jingsong
>> >> >> >
>> >> >> >On Mon, Jul 13, 2020 at 4:50 PM Zhou Zach <[hidden email]> wrote:
>> >> >> >
>> >> >> >> Hi,
>> >> >> >> Following your hint I added the HiveCatalog, and the data is now written to HDFS successfully. But why is there no data when I query the Hive table directly through Hue? Do I have to add the partitions to the Hive table manually? I have already set
>> >> >> >> 'sink.partition-commit.policy.kind'='metastore'
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >> At 2020-07-13 15:01:28, "Jingsong Li" <[hidden email]> wrote:
>> >> >> >> >Hi,
>> >> >> >> >
>> >> >> >> >Are you using a HiveCatalog? Hive tables and the Hive dialect must be used together with a HiveCatalog.
>> >> >> >> >
>> >> >> >> >Otherwise you can only use the Filesystem connector. If the filesystem connector also gives an error, please paste the error message.
>> >> >> >> >
>> >> >> >> >Best,
>> >> >> >> >Jingsong

Re: Re: Re: Re: Re: Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

Jun Zhang
hi, Zhou Zach:
A quick question: if you set the parallelism of your program to 1, can it still read the Hive data correctly?
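For reference, the parallelism of the whole job can be pinned for such a test right after the execution environment is created (streamExecutionEnv being the variable used in the program quoted below):

    streamExecutionEnv.setParallelism(1)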

Zhou Zach <[hidden email]> wrote on Mon, Jul 13, 2020 at 8:17 PM:

> OK, thanks for the explanation.
>
> On 2020-07-13 19:49:10, "Jingsong Li" <[hidden email]> wrote:
> >kafka_table needs to be created under the default dialect.
> >
> >Whatever the dialect, the table is saved in the Hive metastore (unless you use the temporary table syntax).
> >
> >Best,
> >Jingsong
> >
> >On Mon, Jul 13, 2020 at 7:46 PM Zhou Zach <[hidden email]> wrote:
> >
> >> When I created kafka_table I was in the default dialect; after switching to the HiveCatalog (Hive dialect), the WATERMARK and WITH syntax were no longer supported.
> >> If a table is created under the default dialect, is it only valid for the temporary session?
> >>
> >> On 2020-07-13 19:27:44, "Jingsong Li" <[hidden email]> wrote:
> >> >Hi,
> >> >
> >> >Question 1:
> >> >
> >> >It only requires that the current catalog is a HiveCatalog.
> >> >In theory the Kafka table is also stored in the Hive metastore; if you don't want the error on re-run, you can use CREATE TABLE XXX IF NOT EXISTS.
> >> >
> >> >To clarify: what do you mean by "not visible"? Can you try the Kafka table on its own? Does it disappear after a restart?
> >> >
> >> >Question 2:
> >> >
> >> >A table created with the filesystem connector is a filesystem table; it has nothing to do with the Hive metastore, and you need to use the DDL syntax for filesystem tables [1].
> >> >
> >> >A filesystem table writes its data directly to the file system, and its format is compatible with Hive, so if the write path is the directory of some Hive table, the data can be queried from the Hive side.
> >> >But its partition commit does not support the metastore, so there is no default implementation that automatically adds the partition to Hive; you need to provide a custom partition-commit-policy.
> >> >
> >> >[1]
> >> >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
> >> >
> >> >Best,
> >> >Jingsong
> >> >
> >> >On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach <[hidden email]> wrote:
> >> >
> >> >> That's embarrassing....
> >> >> I had two projects open and had been editing the wrong one. I can now query the data from Hive successfully. Thanks for the community's enthusiastic replies, @Jingsong Li, @夏帅.
> >> >> I have watched Jingsong's video from the group several times over the past two days, thank you very much!
> >> >> Two more questions:
> >> >> Question 1:
> >> >> The kafka_table I created is not visible in either Hive or the Flink SQL client. Also, every time I rerun the program it fails unless I drop hive_table first; after dropping hive_table1 it runs, but I never need to drop kafka_table before rerunning. So is the kafka_table I created a temporary table, and is only hive_table stored in the metastore?
> >> >>
> >> >> Question 2:
> >> >> A helpful community member just replied that a Hive table can also be created with the filesystem connector instead of the HiveCatalog. I tried it and got this error:
> >> >> java.util.concurrent.CompletionException:
> >> >>
> >>
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> >> >> Could not execute application.
> >> >>         at
> >> >>
> >>
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> >> >> ~[?:1.8.0_161]
> >> >>         at
> >> >>
> >>
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> >> >> ~[?:1.8.0_161]
> >> >>         at
> >> >>
> >>
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
> >> >> ~[?:1.8.0_161]
> >> >>         at
> >> >>
> >>
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> >> >> ~[?:1.8.0_161]
> >> >>         at
> >> >>
> >>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> >> >> ~[?:1.8.0_161]
> >> >>         at
> >> >>
> >>
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> >> >> ~[?:1.8.0_161]
> >> >>         at
> >> >>
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
> >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
> >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >> >> [?:1.8.0_161]
> >> >>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >> >> [?:1.8.0_161]
> >> >>         at
> >> >>
> >>
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >>         at
> akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >>         at
> >> >>
> >>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >>         at
> >> >> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >>         at
> >> >>
> >>
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >>         at
> >> >> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >>         at
> >> >>
> >>
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >> Caused by:
> >> >>
> >>
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> >> >> Could not execute application.
> >> >>         ... 11 more
> >> >> Caused by:
> org.apache.flink.client.program.ProgramInvocationException:
> >> The
> >> >> main method caused an error: Unable to create a sink for writing
> table
> >> >> 'default_catalog.default_database.hive_table1'.
> >> >>
> >> >> Table options are:
> >> >>
> >> >> 'connector'='filesystem'
> >> >> 'hive.storage.file-format'='parquet'
> >> >> 'is_generic'='false'
> >> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
> >> >> 'sink.partition-commit.delay'='0s'
> >> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
> >> >>         at
> >> >>
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
> >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >>         ... 10 more
> >> >> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> >> >> create a sink for writing table
> >> >> 'default_catalog.default_database.hive_table1'.
> >> >>
> >> >> Table options are:
> >> >>
> >> >> 'connector'='filesystem'
> >> >> 'hive.storage.file-format'='parquet'
> >> >> 'is_generic'='false'
> >> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
> >> >> 'sink.partition-commit.delay'='0s'
> >> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at
> >> >>
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at
> scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at
> >> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at
> >> >> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at
> >> >> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at
> >> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at
> >> >>
> >>
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> >> >> ~[?:1.8.0_161]
> >> >>         at
> >> >>
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> >> ~[?:1.8.0_161]
> >> >>         at
> >> >>
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> >> ~[?:1.8.0_161]
> >> >>         at java.lang.reflect.Method.invoke(Method.java:498)
> >> ~[?:1.8.0_161]
> >> >>         at
> >> >>
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
> >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >>         ... 10 more
> >> >> Caused by: org.apache.flink.table.api.ValidationException: Cannot
> >> discover
> >> >> a connector using option ''connector'='filesystem''.
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at
> >> >>
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at
> scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at
> >> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at
> >> >> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at
> >> >> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at
> >> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at
> >> >>
> >>
> cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala)
> >> >> ~[qile-data-flow-1.0.jar:?]
> >> >>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> >> >> ~[?:1.8.0_161]
> >> >>         at
> >> >>
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> >> ~[?:1.8.0_161]
> >> >>         at
> >> >>
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> >> ~[?:1.8.0_161]
> >> >>         at java.lang.reflect.Method.invoke(Method.java:498)
> >> ~[?:1.8.0_161]
> >> >>         at
> >> >>
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >>         at
> >> >>
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
> >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >>         ... 10 more
> >> >> Caused by: org.apache.flink.table.api.ValidationException: Could not
> >> find
> >> >> any factory for identifier 'filesystem' that implements
> >> >> 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the
> >> classpath.
> >> >>
> >> >> Available factory identifiers are:
> >> >>
> >> >> blackhole
> >> >> hbase-1.4
> >> >> jdbc
> >> >> kafka
> >> >> print
>         at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>         at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>         at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>         at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>         at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>         at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>         at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[qile-data-flow-1.0.jar:?]
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[qile-data-flow-1.0.jar:?]
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[qile-data-flow-1.0.jar:?]
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[qile-data-flow-1.0.jar:?]
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[qile-data-flow-1.0.jar:?]
>         at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>         at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>         at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>         at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>         at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>         at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68) ~[qile-data-flow-1.0.jar:?]
>         at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala) ~[qile-data-flow-1.0.jar:?]
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_161]
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_161]
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_161]
>         at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_161]
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>         at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>         at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>         ... 10 more
>
> query:
>
> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)
> streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)
>
> val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)
>
> streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> streamTableEnv.executeSql(
>   """
>     |CREATE TABLE hive_table (
>     |  user_id STRING,
>     |  age INT
>     |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
>     |  'connector'='filesystem',
>     |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>     |  'sink.partition-commit.delay'='0s',
>     |  'sink.partition-commit.policy.kind'='metastore,success-file'
>     |)
>     |""".stripMargin)
>
> streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
> streamTableEnv.executeSql(
>   """
>     |CREATE TABLE kafka_table (
>     |    uid VARCHAR,
>     |    -- uid BIGINT,
>     |    sex VARCHAR,
>     |    age INT,
>     |    created_time TIMESTAMP(3),
>     |    WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
>     |) WITH (
>     |    'connector.type' = 'kafka',
>     |    'connector.version' = 'universal',
>     |    'connector.topic' = 'user',
>     |    -- 'connector.topic' = 'user_long',
>     |    'connector.startup-mode' = 'latest-offset',
>     |    'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>     |    'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>     |    'connector.properties.group.id' = 'user_flink',
>     |    'format.type' = 'json',
>     |    'format.derive-schema' = 'true'
>     |)
>     |""".stripMargin)
>
> streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>
> streamTableEnv.executeSql(
>   """
>     |INSERT INTO hive_table
>     |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH')
>     |FROM kafka_table
>     |""".stripMargin)
>
> streamTableEnv.executeSql(
>   """
>     |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='18'
>     |""".stripMargin)
>   .print()
>
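(The CREATE TABLE above mixes the Hive dialect with a 'connector'='filesystem' property. A table that is really meant for the generic filesystem connector is normally declared with the default dialect and an explicit path; the sketch below is only illustrative, and the table name, path and option values are placeholders rather than values from the original job. Note that the "Could not find any factory for identifier 'filesystem'" error above says that factory is not visible on the job's classpath, so the DDL alone would not resolve it.)

// Sketch only: a generic filesystem sink table, declared with the DEFAULT dialect.
// 'fs_table' and the path are placeholders.
streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
streamTableEnv.executeSql(
  """
    |CREATE TABLE fs_table (
    |  user_id STRING,
    |  age INT,
    |  dt STRING,
    |  hr STRING
    |) PARTITIONED BY (dt, hr) WITH (
    |  'connector' = 'filesystem',
    |  'path' = 'hdfs:///tmp/fs_table',
    |  'format' = 'parquet',
    |  'sink.partition-commit.delay' = '0s',
    |  'sink.partition-commit.policy.kind' = 'success-file'
    |)
    |""".stripMargin)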
> On 2020-07-13 17:52:54, "Jingsong Li" <[hidden email]> wrote:
> Please paste the complete program again.
>
> Best,
> Jingsong
>
> On Mon, Jul 13, 2020 at 5:46 PM Zhou Zach <[hidden email]> wrote:
>
> Hi,
>
> I have now changed it to:
> 'sink.partition-commit.delay'='0s'
>
> More than 20 checkpoints have completed and 20-odd files have been produced on HDFS,
> but the Hive table still returns no data.
>
> On 2020-07-13 17:23:34, "夏帅" <[hidden email]> wrote:
>
> Hi,
> You have set SINK_PARTITION_COMMIT_DELAY to one hour.
>
> ------------------------------------------------------------------
> From: Zhou Zach <[hidden email]>
> Date: Monday, July 13, 2020, 17:09
> To: user-zh <[hidden email]>
> Subject: Re:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.
>
>
> Checkpointing is enabled:
>
> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)
> streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)
>
> The interval is 5 s and the timeout 10 s. Even so, after waiting more than two minutes, a dozen or so files have been written to HDFS, but querying Hive still returns no data.
>
> On 2020-07-13 16:52:16, "Jingsong Li" <[hidden email]> wrote:
> You do have checkpointing enabled, right? What is the delay set to?
>
> The partition is added after checkpoint completion + the delay.
>
> Best,
> Jingsong
>
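In other words, a partition only becomes visible in Hive once a checkpoint has completed and the configured delay has passed (with the 'partition-time' trigger, once the watermark passes the partition's time plus the delay). A minimal sketch of the pieces that interact, continuing with the streamExecutionEnv and streamTableEnv from the program above; the table name is hypothetical and the option values are examples:

// Commits are driven by checkpoints, so checkpointing must be enabled.
streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)

// 'partition-time' trigger: the dt/hr partition is committed once the watermark
// passes '$dt $hr:00:00' plus the delay; the 'metastore' policy then registers the
// partition in the Hive metastore so plain Hive queries can see it.
streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
  """
    |CREATE TABLE hive_commit_demo (
    |  user_id STRING,
    |  age INT
    |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
    |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
    |  'sink.partition-commit.trigger'='partition-time',
    |  'sink.partition-commit.delay'='0s',
    |  'sink.partition-commit.policy.kind'='metastore,success-file'
    |)
    |""".stripMargin)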
> On Mon, Jul 13, 2020 at 4:50 PM Zhou Zach <[hidden email]> wrote:
>
> Hi,
>
> Following your hint I added a HiveCatalog, and data is now written to HDFS successfully. But why does querying the Hive table directly through Hue return no data? Do I have to add the partitions to the Hive table manually? I currently have the parameter
> 'sink.partition-commit.policy.kind'='metastore'
>
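With the 'metastore' policy the partition should be registered automatically once it is committed, so manual work is normally not needed. If you did want to register an already-written HDFS partition by hand, the statement would look roughly like the sketch below; the partition values are examples, and if the Hive dialect in this Flink version does not accept ADD PARTITION, the same statement can be run directly in Hive (e.g. via beeline or Hue):

// Manual fallback only; the partition spec here is an example.
streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
  "ALTER TABLE hive_table ADD IF NOT EXISTS PARTITION (dt='2020-07-13', hr='13')")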
> At 2020-07-13 15:01:28, "Jingsong Li" <[hidden email]> wrote:
> Hi,
>
> Are you using a HiveCatalog? Hive tables and the Hive dialect must be used together with a HiveCatalog.
>
> Otherwise you can only use the filesystem connector; if the filesystem connector also fails, please post that error message.
>
> Best,
> Jingsong
>
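A minimal sketch of registering a HiveCatalog in the same Scala job; the catalog name, default database, hive-site.xml directory and Hive version below are placeholders, not values from the original setup:

import org.apache.flink.table.catalog.hive.HiveCatalog

// All four arguments are assumptions; replace them with the real environment's values.
val hiveCatalog = new HiveCatalog(
  "myhive",          // catalog name
  "default",         // default database
  "/etc/hive/conf",  // directory containing hive-site.xml
  "2.1.1")           // Hive version
streamTableEnv.registerCatalog("myhive", hiveCatalog)
streamTableEnv.useCatalog("myhive")

// Tables created afterwards with the Hive dialect live in the Hive metastore,
// which is what the streaming Hive sink and the 'metastore' commit policy need.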
>