What should the connector be set to when sinking into a Hive table with Flink 1.11? I tried setting
WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='success-file');

but that also reported an error.

query:

streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
  """
    |CREATE TABLE hive_table (
    |  user_id STRING,
    |  age INT
    |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
    |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
    |  'sink.partition-commit.trigger'='partition-time',
    |  'sink.partition-commit.delay'='1 h',
    |  'sink.partition-commit.policy.kind'='metastore,success-file'
    |)
    |""".stripMargin)

streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
streamTableEnv.executeSql(
  """
    |CREATE TABLE kafka_table (
    |  uid VARCHAR,
    |  -- uid BIGINT,
    |  sex VARCHAR,
    |  age INT,
    |  created_time TIMESTAMP(3),
    |  WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
    |) WITH (
    |  'connector.type' = 'kafka',
    |  'connector.version' = 'universal',
    |  'connector.topic' = 'user',
    |  -- 'connector.topic' = 'user_long',
    |  'connector.startup-mode' = 'latest-offset',
    |  'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |  'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |  'connector.properties.group.id' = 'user_flink',
    |  'format.type' = 'json',
    |  'format.derive-schema' = 'true'
    |)
    |""".stripMargin)

streamTableEnv.executeSql(
  """
    |INSERT INTO hive_table
    |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH')
    |FROM kafka_table
    |""".stripMargin)

streamTableEnv.executeSql(
  """
    |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13'
    |""".stripMargin)
  .print()

Error stack:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.hive_table'.
Table options are:

'hive.storage.file-format'='parquet'
'is_generic'='false'
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
'sink.partition-commit.delay'='1 h'
'sink.partition-commit.policy.kind'='metastore,success-file'
'sink.partition-commit.trigger'='partition-time'

    at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
    at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
    at org.rabbit.sql.FromKafkaSinkHive$.main(FromKafkaSinkHive.scala:65)
    at org.rabbit.sql.FromKafkaSinkHive.main(FromKafkaSinkHive.scala)
Caused by: org.apache.flink.table.api.ValidationException: Table options do not contain an option key 'connector' for discovering a connector.
    at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
    at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
    ... 19 more
Hi,
Have you used a HiveCatalog? Hive tables and the Hive dialect must be used together with a HiveCatalog.

Otherwise you can only use the Filesystem connector. If the filesystem connector also gives an error, please paste that error message. A minimal sketch of registering a HiveCatalog is below.
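For reference, registering a HiveCatalog in Flink 1.11 looks roughly like this (a sketch only; the catalog name, default database, hive-site.xml directory and Hive version are placeholders for your environment):

import org.apache.flink.table.catalog.hive.HiveCatalog

val catalogName     = "myhive"          // arbitrary catalog name (placeholder)
val defaultDatabase = "default"         // assumed default database
val hiveConfDir     = "/etc/hive/conf"  // placeholder: directory containing hive-site.xml
val hiveVersion     = "2.1.1"           // placeholder: your Hive version

// Register the catalog on the existing table environment and make it the current catalog,
// so that Hive-dialect DDL and the hive_table above are resolved through the Hive metastore.
val hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, hiveVersion)
streamTableEnv.registerCatalog(catalogName, hiveCatalog)
streamTableEnv.useCatalog(catalogName)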
Best,
Jingsong
Hi,
Following your hint I added a HiveCatalog, and data is now successfully written to HDFS. But why is there no data when I query the Hive table directly through Hue? Do I have to manually add partitions to the Hive table? I have currently set the option
'sink.partition-commit.policy.kind'='metastore'
Is checkpointing enabled? What is the delay set to?
The partition is added after the checkpoint completes plus the delay. A rough sketch of that timing is below.
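As an illustration (the interval and delay values here are examples, not taken from your job): with the configuration below a partition should be committed to the metastore at the first completed checkpoint after the watermark passes the partition time, since the delay is 0s.

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Partition commit policies (metastore / success-file) only run when a checkpoint completes,
// so the visible latency is roughly: watermark passing the partition time + delay + up to one checkpoint interval.
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE)

// and in the Hive-dialect DDL:
//   'sink.partition-commit.trigger'='partition-time',
//   'sink.partition-commit.delay'='0s'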
Best,
Jingsong
Checkpointing is enabled:
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)
streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)

The interval is 5s and the timeout is 10s. However, after waiting more than two minutes, a dozen or so files have been written to HDFS, but querying Hive still returns no data.
Have you set sink.partition-commit.delay?
Best,
Jingsong
Hi,
You have set SINK_PARTITION_COMMIT_DELAY to one hour.
Hi,
I have now changed it to:

'sink.partition-commit.delay'='0s'

More than 20 checkpoints have completed and 20-odd files have been produced on HDFS, but the Hive table still shows no data.
""" >> >> | >> >> |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13' >> >> | >> >> |""".stripMargin) >> >> .print() >> >> 错误栈: >> >> Exception in thread "main" >> org.apache.flink.table.api.ValidationException: >> >> Unable to create a sink for writing table >> >> 'default_catalog.default_database.hive_table'. >> >> >> >> Table options are: >> >> >> >> 'hive.storage.file-format'='parquet' >> >> 'is_generic'='false' >> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00' >> >> 'sink.partition-commit.delay'='1 h' >> >> 'sink.partition-commit.policy.kind'='metastore,success-file' >> >> 'sink.partition-commit.trigger'='partition-time' >> >> at >> >> >> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164) >> >> at >> >> >> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) >> >> at >> >> >> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) >> >> at >> >> >> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) >> >> at >> >> >> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) >> >> at >> >> >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >> >> at >> >> >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >> >> at scala.collection.Iterator$class.foreach(Iterator.scala:891) >> >> at >> scala.collection.AbstractIterator.foreach(Iterator.scala:1334) >> >> at >> >> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) >> >> at >> >> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) >> >> at >> scala.collection.AbstractTraversable.map(Traversable.scala:104) >> >> at >> >> >> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) >> >> at >> >> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) >> >> at >> >> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694) >> >> at >> >> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781) >> >> at >> >> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) >> >> at >> >> org.rabbit.sql.FromKafkaSinkHive$.main(FromKafkaSinkHive.scala:65) >> >> at >> org.rabbit.sql.FromKafkaSinkHive.main(FromKafkaSinkHive.scala) >> >> Caused by: org.apache.flink.table.api.ValidationException: Table options >> >> do not contain an option key 'connector' for discovering a connector. >> >> at >> >> >> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321) >> >> at >> >> >> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) >> >> ... 19 more >> >> >> >> >> > >> >-- >> >Best, Jingsong Lee >> > > >-- >Best, Jingsong Lee |
Could you paste the complete program again?
Best,
Jingsong
How embarrassing...
I had two projects open and was editing the wrong one. It works now and I can query the data from Hive. Thanks for the community's enthusiastic replies, @Jingsong Li and @夏帅. I have watched Jingsong's video shared in the group several times over the past two days — sincere thanks!

Two more questions:

Question 1: The kafka_table I create is not visible in Hive or in the Flink SQL client. Also, every time I rerun the program it reports an error if I do not drop hive_table first; after dropping hive_table1 it runs, whereas I never need to drop kafka_table before rerunning. So is kafka_table a temporary table, and only hive_table is stored in the metastore?

Question 2: A helpful community member just replied that, without a HiveCatalog, a Hive table can also be created with the filesystem connector. I tried it and got an error:

java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_161]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_161]
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) ~[?:1.8.0_161]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_161]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_161]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_161]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_161]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_161]
    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154) [qile-data-flow-1.0.jar:?]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [qile-data-flow-1.0.jar:?]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [qile-data-flow-1.0.jar:?]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [qile-data-flow-1.0.jar:?]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [qile-data-flow-1.0.jar:?]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [qile-data-flow-1.0.jar:?]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [qile-data-flow-1.0.jar:?]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    ... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a sink for writing table 'default_catalog.default_database.hive_table1'.
Table options are:
'connector'='filesystem'
'hive.storage.file-format'='parquet'
'is_generic'='false'
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
'sink.partition-commit.delay'='0s'
'sink.partition-commit.policy.kind'='metastore,success-file'
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
    ... 10 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.hive_table1'.
Table options are:
'connector'='filesystem'
'hive.storage.file-format'='parquet'
'is_generic'='false'
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
'sink.partition-commit.delay'='0s'
'sink.partition-commit.policy.kind'='metastore,success-file'
    at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[qile-data-flow-1.0.jar:?]
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68) ~[qile-data-flow-1.0.jar:?]
    at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala) ~[qile-data-flow-1.0.jar:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_161]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_161]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_161]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_161]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
    ... 10 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='filesystem''.
    at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[qile-data-flow-1.0.jar:?]
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68) ~[qile-data-flow-1.0.jar:?]
    at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala) ~[qile-data-flow-1.0.jar:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_161]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_161]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_161]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_161]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
    ... 10 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'filesystem' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
Available factory identifiers are:
blackhole
hbase-1.4
jdbc
kafka
    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
    at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[qile-data-flow-1.0.jar:?]
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
    at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68) ~[qile-data-flow-1.0.jar:?]
    at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala) ~[qile-data-flow-1.0.jar:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_161]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_161]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_161]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_161]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
    ... 10 more

query:

val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)
streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)

val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)

streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
  """
    |CREATE TABLE hive_table (
    |  user_id STRING,
    |  age INT
    |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
    |  'connector'='filesystem',
    |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
    |  'sink.partition-commit.delay'='0s',
    |  'sink.partition-commit.policy.kind'='metastore,success-file'
    |)
    |""".stripMargin)

streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
streamTableEnv.executeSql(
  """
    |CREATE TABLE kafka_table (
    |  uid VARCHAR,
    |  -- uid BIGINT,
    |  sex VARCHAR,
    |  age INT,
    |  created_time TIMESTAMP(3),
    |  WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
    |) WITH (
    |  'connector.type' = 'kafka',
    |  'connector.version' = 'universal',
    |  'connector.topic' = 'user',
    |  -- 'connector.topic' = 'user_long',
    |  'connector.startup-mode' = 'latest-offset',
    |  'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |  'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |  'connector.properties.group.id' = 'user_flink',
    |  'format.type' = 'json',
    |  'format.derive-schema' = 'true'
    |)
    |""".stripMargin)

streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
  """
    |INSERT INTO hive_table
    |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH')
    |FROM kafka_table
    |""".stripMargin)

streamTableEnv.executeSql(
  """
    |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='18'
    |""".stripMargin)
  .print()
Hi,
Question 1:

As long as the current catalog is the HiveCatalog, the Kafka table should, in principle, also be stored in the Hive Metastore. If you do not want the error on rerun, you can use CREATE TABLE IF NOT EXISTS xxx.

To clarify, what exactly do you mean by "not visible"? Could you try the Kafka table on its own: is it gone after a restart?

Question 2:

A table created with filesystem is a filesystem table and has nothing to do with the Hive metastore; you need to use the DDL syntax for creating filesystem tables [1].

A filesystem table writes its data directly to the file system, and the format is compatible with Hive, so if the write path is the path of some Hive table you can query the data from the Hive side. Its partition commit does not support the metastore, though, so there is no default implementation that automatically adds the partition to Hive; you need to implement a custom partition-commit-policy.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html

Best,
Jingsong
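A minimal sketch of the HiveCatalog setup this implies, with CREATE TABLE IF NOT EXISTS so that reruns do not fail on the already-existing table. The catalog name, default database, hive-site.xml directory and Hive version below are placeholder assumptions, not values taken from this thread:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)

// Hypothetical catalog settings: substitute your own catalog name, database,
// hive-site.xml directory and Hive version.
val hiveCatalog = new HiveCatalog("myhive", "default", "/etc/hive/conf", "2.1.1")
streamTableEnv.registerCatalog("myhive", hiveCatalog)
streamTableEnv.useCatalog("myhive")

// With the HiveCatalog as the current catalog, tables created from here on are
// persisted in the Hive Metastore; IF NOT EXISTS keeps reruns from failing when
// the table already exists.
streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
  """
    |CREATE TABLE IF NOT EXISTS hive_table (
    |  user_id STRING,
    |  age INT
    |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
    |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
    |  'sink.partition-commit.trigger'='partition-time',
    |  'sink.partition-commit.delay'='0s',
    |  'sink.partition-commit.policy.kind'='metastore,success-file'
    |)
    |""".stripMargin)

A Kafka table defined while this catalog is current would likewise be stored in the metastore rather than only in the default in-memory catalog.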
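For the filesystem-connector route, a sketch of what the DDL described in [1] might look like. It uses the default dialect and a WITH clause rather than Hive DDL; the path is a made-up placeholder for the HDFS directory of an existing Hive table, and since the metastore commit policy is not supported for this connector the sketch uses success-file only:

// streamTableEnv: a StreamTableEnvironment as in the query above.
streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
streamTableEnv.executeSql(
  """
    |CREATE TABLE fs_table (
    |  user_id STRING,
    |  age INT,
    |  dt STRING,
    |  hr STRING
    |) PARTITIONED BY (dt, hr) WITH (
    |  'connector' = 'filesystem',
    |  -- placeholder path: point it at the warehouse directory of the Hive table
    |  'path' = 'hdfs:///user/hive/warehouse/mydb.db/hive_table',
    |  'format' = 'parquet',
    |  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
    |  'sink.partition-commit.trigger' = 'partition-time',
    |  'sink.partition-commit.delay' = '0s',
    |  'sink.partition-commit.policy.kind' = 'success-file'
    |)
    |""".stripMargin)

And, very roughly, what a custom commit policy could look like if partitions should be added to Hive automatically. The class name and the way it would reach Hive are assumptions for illustration; such a policy would be wired in via 'sink.partition-commit.policy.kind'='custom' and 'sink.partition-commit.policy.class':

import scala.collection.JavaConverters._
import org.apache.flink.table.filesystem.PartitionCommitPolicy

// Hypothetical policy that builds an ALTER TABLE ... ADD PARTITION statement for
// every committed partition. How you actually talk to Hive (JDBC, metastore
// client, ...) is up to you; the println is only a stand-in.
class AddPartitionToHivePolicy extends PartitionCommitPolicy {
  override def commit(context: PartitionCommitPolicy.Context): Unit = {
    val spec = context.partitionKeys().asScala
      .zip(context.partitionValues().asScala)
      .map { case (k, v) => s"$k='$v'" }
      .mkString(", ")
    println(s"ALTER TABLE ${context.tableName()} ADD IF NOT EXISTS PARTITION ($spec) LOCATION '${context.partitionPath()}'")
  }
}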
The kafka_table was created under the default dialect; when I switch to the HiveCatalog, the WATERMARK and WITH syntax are no longer supported.
If a table is created with the default dialect, is it only valid for the temporary session?
>> > >> >> Hi, >> >> >> >> >> >> 我现在改成了: >> >> 'sink.partition-commit.delay'='0s' >> >> >> >> >> >> checkpoint完成了20多次,hdfs文件也产生了20多个, >> >> hive表还是查不到数据 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-07-13 17:23:34,"夏帅" <[hidden email]> 写道: >> >> >> >> 你好, >> >> 你设置了1个小时的 >> >> SINK_PARTITION_COMMIT_DELAY >> >> >> >> >> >> ------------------------------------------------------------------ >> >> 发件人:Zhou Zach <[hidden email]> >> >> 发送时间:2020年7月13日(星期一) 17:09 >> >> 收件人:user-zh <[hidden email]> >> >> 主 题:Re:Re: Re: Table options do not contain an option key 'connector' >> for >> >> discovering a connector. >> >> >> >> >> >> 开了checkpoint, >> >> val streamExecutionEnv = >> StreamExecutionEnvironment.getExecutionEnvironment >> >> >> >> >> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) >> >> streamExecutionEnv.enableCheckpointing(5 * 1000, >> >> CheckpointingMode.EXACTLY_ONCE) >> >> streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000) >> >> >> >> >> >> >> >> >> >> 间隔5s,超时10s,不过,等了2分多钟,hdfs上写入了10几个文件了,查hive还是没数据 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-07-13 16:52:16,"Jingsong Li" <[hidden email]> 写道: >> >> >有开checkpoint吧?delay设的多少? >> >> > >> >> >Add partition 在 checkpoint完成 + delay的时间后 >> >> > >> >> >Best, >> >> >Jingsong >> >> > >> >> >On Mon, Jul 13, 2020 at 4:50 PM Zhou Zach <[hidden email]> wrote: >> >> > >> >> >> Hi, >> >> >> 根据你的提示,加上HiveCatalog,已经成功写入数据到hdfs了,不过,为什么,直接通过hue查hive表,没数据,必须手动add >> >> >> partition到hive表吗,我当前设置了参数 >> >> >> 'sink.partition-commit.policy.kind'='metastore' >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> At 2020-07-13 15:01:28, "Jingsong Li" <[hidden email]> >> wrote: >> >> >> >Hi, >> >> >> > >> >> >> >你用了HiveCatalog了吗?Hive表或Hive方言必须要结合HiveCatalog >> >> >> > >> >> >> >不然就只能用Filesystem connector,如果你使用filesystem也报错,那就贴下报错信息 >> >> >> > >> >> >> >Best, >> >> >> >Jingsong >> >> >> > >> >> >> >On Mon, Jul 13, 2020 at 2:58 PM Zhou Zach <[hidden email]> >> wrote: >> >> >> > >> >> >> >> flink 1.11 sink hive table的connector设置为什么啊,尝试设置 >> >> >> >> >> >> >> >> >> >> WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1 >> >> >> >> h','sink.partition-commit.policy.kind'='success-file'); >> >> >> >> 也报错误 >> >> >> >> query: >> >> >> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE) >> >> >> >> streamTableEnv.executeSql( >> >> >> >> """ >> >> >> >> | >> >> >> >> | >> >> >> >> |CREATE TABLE hive_table ( >> >> >> >> | user_id STRING, >> >> >> >> | age INT >> >> >> >> |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet >> >> >> >> TBLPROPERTIES ( >> >> >> >> | 'partition.time-extractor.timestamp-pattern'='$dt >> $hr:00:00', >> >> >> >> | 'sink.partition-commit.trigger'='partition-time', >> >> >> >> | 'sink.partition-commit.delay'='1 h', >> >> >> >> | >> 'sink.partition-commit.policy.kind'='metastore,success-file' >> >> >> >> |) >> >> >> >> | >> >> >> >> |""".stripMargin) >> >> >> >> >> >> >> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT) >> >> >> >> streamTableEnv.executeSql( >> >> >> >> """ >> >> >> >> | >> >> >> >> |CREATE TABLE kafka_table ( >> >> >> >> | uid VARCHAR, >> >> >> >> | -- uid BIGINT, >> >> >> >> | sex VARCHAR, >> >> >> >> | age INT, >> >> >> >> | created_time TIMESTAMP(3), >> >> >> >> | WATERMARK FOR created_time as created_time - INTERVAL 
'3' >> >> >> SECOND >> >> >> >> |) WITH ( >> >> >> >> | 'connector.type' = 'kafka', >> >> >> >> | 'connector.version' = 'universal', >> >> >> >> | 'connector.topic' = 'user', >> >> >> >> | -- 'connector.topic' = 'user_long', >> >> >> >> | 'connector.startup-mode' = 'latest-offset', >> >> >> >> | 'connector.properties.zookeeper.connect' = >> >> >> >> 'cdh1:2181,cdh2:2181,cdh3:2181', >> >> >> >> | 'connector.properties.bootstrap.servers' = >> >> >> >> 'cdh1:9092,cdh2:9092,cdh3:9092', >> >> >> >> | 'connector.properties.group.id' = 'user_flink', >> >> >> >> | 'format.type' = 'json', >> >> >> >> | 'format.derive-schema' = 'true' >> >> >> >> |) >> >> >> >> |""".stripMargin) >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> streamTableEnv.executeSql( >> >> >> >> """ >> >> >> >> | >> >> >> >> |INSERT INTO hive_table >> >> >> >> |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), >> >> >> >> DATE_FORMAT(created_time, 'HH') >> >> >> >> |FROM kafka_table >> >> >> >> | >> >> >> >> |""".stripMargin) >> >> >> >> >> >> >> >> streamTableEnv.executeSql( >> >> >> >> """ >> >> >> >> | >> >> >> >> |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13' >> >> >> >> | >> >> >> >> |""".stripMargin) >> >> >> >> .print() >> >> >> >> 错误栈: >> >> >> >> Exception in thread "main" >> >> >> org.apache.flink.table.api.ValidationException: >> >> >> >> Unable to create a sink for writing table >> >> >> >> 'default_catalog.default_database.hive_table'. >> >> >> >> >> >> >> >> Table options are: >> >> >> >> >> >> >> >> 'hive.storage.file-format'='parquet' >> >> >> >> 'is_generic'='false' >> >> >> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00' >> >> >> >> 'sink.partition-commit.delay'='1 h' >> >> >> >> 'sink.partition-commit.policy.kind'='metastore,success-file' >> >> >> >> 'sink.partition-commit.trigger'='partition-time' >> >> >> >> at >> >> >> >> >> >> >> >> >> >> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164) >> >> >> >> at >> >> >> >> >> >> >> >> >> >> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) >> >> >> >> at >> >> >> >> >> >> >> >> >> >> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) >> >> >> >> at >> >> >> >> >> >> >> >> >> >> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) >> >> >> >> at >> >> >> >> >> >> >> >> >> >> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) >> >> >> >> at >> >> >> >> >> >> >> >> >> >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >> >> >> >> at >> >> >> >> >> >> >> >> >> >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >> >> >> >> at >> >> scala.collection.Iterator$class.foreach(Iterator.scala:891) >> >> >> >> at >> >> >> scala.collection.AbstractIterator.foreach(Iterator.scala:1334) >> >> >> >> at >> >> >> >> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >> >> >> >> at >> >> scala.collection.AbstractIterable.foreach(Iterable.scala:54) >> >> >> >> at >> >> >> >> >> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) >> >> >> >> at >> >> >> scala.collection.AbstractTraversable.map(Traversable.scala:104) >> >> >> >> at >> >> >> >> >> >> >> >> >> >> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) >> >> >> >> at >> >> >> >> >> >> >> >> >> >> 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) >> >> >> >> at >> >> >> >> >> >> >> >> >> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694) >> >> >> >> at >> >> >> >> >> >> >> >> >> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781) >> >> >> >> at >> >> >> >> >> >> >> >> >> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) >> >> >> >> at >> >> >> >> org.rabbit.sql.FromKafkaSinkHive$.main(FromKafkaSinkHive.scala:65) >> >> >> >> at >> >> >> org.rabbit.sql.FromKafkaSinkHive.main(FromKafkaSinkHive.scala) >> >> >> >> Caused by: org.apache.flink.table.api.ValidationException: Table >> >> options >> >> >> >> do not contain an option key 'connector' for discovering a >> connector. >> >> >> >> at >> >> >> >> >> >> >> >> >> >> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321) >> >> >> >> at >> >> >> >> >> >> >> >> >> >> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) >> >> >> >> ... 19 more >> >> >> >> >> >> >> >> >> >> >> > >> >> >> >-- >> >> >> >Best, Jingsong Lee >> >> >> >> >> > >> >> > >> >> >-- >> >> >Best, Jingsong Lee >> >> >> > >> > >> >-- >> >Best, Jingsong Lee >> > > >-- >Best, Jingsong Lee |
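For contrast with the failing attempt quoted above, where 'connector'='filesystem' was placed inside the TBLPROPERTIES of a Hive-dialect table, the following is a minimal sketch of how the Flink 1.11 filesystem connector is declared the documented way: as a default-dialect table with the options in a WITH clause (see the documentation link in the reply below). It reuses the streamTableEnv and imports from the quoted program; the table name and path are illustrative assumptions, not the original poster's code.

    // Hedged sketch, not the poster's code: fs_table and the path are assumptions.
    streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
    streamTableEnv.executeSql(
      """
        |CREATE TABLE fs_table (
        |  user_id STRING,
        |  age INT,
        |  dt STRING,
        |  hr STRING
        |) PARTITIONED BY (dt, hr) WITH (
        |  -- the filesystem connector is declared in the WITH clause of a
        |  -- default-dialect table, not in TBLPROPERTIES of a Hive-dialect table
        |  'connector' = 'filesystem',
        |  'path' = 'hdfs:///user/hive/warehouse/some_db.db/fs_table',
        |  'format' = 'parquet',
        |  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
        |  'sink.partition-commit.trigger' = 'partition-time',
        |  'sink.partition-commit.delay' = '0s',
        |  -- no 'metastore' policy here: without a HiveCatalog the filesystem sink
        |  -- cannot add partitions to the Hive metastore (see the answer below)
        |  'sink.partition-commit.policy.kind' = 'success-file'
        |)
        |""".stripMargin)

A table declared this way bypasses the Hive metastore entirely, which is why the replies below still recommend a HiveCatalog plus the Hive dialect when the partitions should become visible to Hive automatically.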
The kafka_table needs to be created under the default dialect.
No matter which dialect you use, the table is saved to the Hive metastore (unless you use the temporary table syntax).

Best,
Jingsong

On Mon, Jul 13, 2020 at 7:46 PM Zhou Zach <[hidden email]> wrote:

> When I created kafka_table it was under the default dialect; when I switch to HiveCatalog, the WATERMARK and WITH syntax are no longer supported.
> If a table is created under the default dialect, is it only valid for the current session?
>
>
> On 2020-07-13 19:27:44, "Jingsong Li" <[hidden email]> wrote:
> >Hi,
> >
> >Question 1:
> >
> >You only need the current catalog to be a HiveCatalog.
> >In principle the Kafka table is also stored in the Hive metastore; if you do not want the error, you can use CREATE TABLE XXX IF NOT EXISTS.
> >
> >To clarify, what does "not visible" mean exactly? Can you try the Kafka table on its own - is it gone after a restart?
> >
> >Question 2:
> >
> >A table created with filesystem is a filesystem table; it has nothing to do with the Hive metastore, and you need to use the DDL syntax for filesystem tables [1].
> >
> >The data of a filesystem table is written directly to the file system, and its format is Hive-compatible, so as long as the write path is the path of some Hive table, it can be queried from the Hive side.
> >But its partition commit does not support the metastore, so there is no built-in implementation that automatically adds partitions to Hive - you need a custom partition-commit-policy.
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
> >
> >Best,
> >Jingsong
> >
> >On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach <[hidden email]> wrote:
> >
> >> Embarrassing...
> >> I had two projects open and had been editing the wrong one; the data can now be queried from Hive. Thanks for the community's warm replies, @Jingsong Li, @夏帅
> >> I have watched the video Jingsong shared in the group several times over the last two days - sincere thanks!
> >> Two more questions:
> >> Question 1:
> >> The kafka_table I created is visible neither in Hive nor in the Flink SQL client. Also, every rerun of the program fails unless I drop hive_table first (after dropping hive_table1 it runs), while kafka_table never needs to be dropped before a rerun. So is the kafka_table a temporary table, and is only hive_table stored in the metastore?
> >>
> >> Question 2:
> >> A helpful community member just answered that a Hive table can also be created with the filesystem connector instead of a HiveCatalog; I tried it and got an error:
> >> java.util.concurrent.CompletionException:
> >> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> >> Could not execute application.
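One way to combine the two answers above, as a hedged sketch rather than the poster's actual code: register a HiveCatalog (the catalog name, default database and hive-site.xml directory below are assumptions), keep the Kafka source as a temporary default-dialect table so it is not persisted to the metastore, and create the Hive sink under the Hive dialect with IF NOT EXISTS so reruns do not fail on the existing table. streamTableEnv is the StreamTableEnvironment from the quoted program.

    // Hedged sketch; catalog name, database and hive conf dir are assumptions.
    import org.apache.flink.table.api.SqlDialect
    import org.apache.flink.table.catalog.hive.HiveCatalog

    val catalog = new HiveCatalog("hive", "default", "/etc/hive/conf")
    streamTableEnv.registerCatalog("hive", catalog)
    streamTableEnv.useCatalog("hive")

    // Kafka source under the default dialect. TEMPORARY keeps it out of the Hive
    // metastore; a persistent CREATE TABLE ... IF NOT EXISTS would also avoid the
    // "already exists" error on reruns.
    streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
    streamTableEnv.executeSql(
      """
        |CREATE TEMPORARY TABLE kafka_table (
        |  uid VARCHAR,
        |  sex VARCHAR,
        |  age INT,
        |  created_time TIMESTAMP(3),
        |  WATERMARK FOR created_time AS created_time - INTERVAL '3' SECOND
        |) WITH (
        |  'connector.type' = 'kafka',
        |  'connector.version' = 'universal',
        |  'connector.topic' = 'user',
        |  'connector.startup-mode' = 'latest-offset',
        |  'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181',
        |  'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
        |  'connector.properties.group.id' = 'user_flink',
        |  'format.type' = 'json',
        |  'format.derive-schema' = 'true'
        |)
        |""".stripMargin)

    // Hive sink under the Hive dialect; the partition commit options are the ones
    // already used in the thread, and the metastore policy can take effect because
    // the current catalog is now a HiveCatalog.
    streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    streamTableEnv.executeSql(
      """
        |CREATE TABLE IF NOT EXISTS hive_table (
        |  user_id STRING,
        |  age INT
        |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
        |  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
        |  'sink.partition-commit.trigger' = 'partition-time',
        |  'sink.partition-commit.delay' = '0s',
        |  'sink.partition-commit.policy.kind' = 'metastore,success-file'
        |)
        |""".stripMargin)

With the HiveCatalog as the current catalog, the metastore commit policy can add the partitions automatically after each checkpoint plus the configured delay, which is what made the data visible from the Hive side earlier in the thread.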
OK, thanks for clearing that up.
在 2020-07-13 19:49:10,"Jingsong Li" <[hidden email]> 写道: >创建kafka_table需要在default dialect下。 > >不管什么dialect,都会保存到hive metastore中 (除非使用temporary table的语法) > >Best, >Jingsong > >On Mon, Jul 13, 2020 at 7:46 PM Zhou Zach <[hidden email]> wrote: > >> 创建kafka_table的时候,是default Dialect,改成HiveCatalog时,WATERMARK 和with语法都不支持了, >> 如果是default Dialect创建的表,是不是只是在临时会话有效 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-07-13 19:27:44,"Jingsong Li" <[hidden email]> 写道: >> >Hi, >> > >> >问题一: >> > >> >只要current catalog是HiveCatalog。 >> >理论上Kafka也是存到HiveMetastore里面的,如果不想报错,可以用CREATE TABLE XXX IF NOT EXISTS. >> > >> >明确下,看不见是什么意思?可以单独试试Kafka表,重启后就不见了吗? >> > >> >问题二: >> > >> >用filesystem创建出来的是filesystem的表,它和hive >> >metastore是没有关系的,你需要使用创建filesystem表的语法[1]。 >> > >> >filesystem的表数据是直接写到 文件系统的,它的格式和hive是兼容的,所以写的路径是hive某张表的路径,就可以在hive端查询了。 >> >但是它的partition commit是不支持metastore的,所以不会有自动add >> >partition到hive的默认实现,你需要自定义partition-commit-policy. >> > >> >[1] >> > >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html >> > >> >Best, >> >Jingsong >> > >> >On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach <[hidden email]> wrote: >> > >> >> 尴尬。。。。 >> >> 我开了两个项目,改错项目了,现在 已经成功从hive查到数据了,感谢社区的热情回复,@Jingsong Li, @夏帅 >> >> 这两天刷了Jingsong在群里的那个视频几遍了,由衷感谢! >> >> 还有两个问题问下, >> >> 问题1: >> >> 创建的kafka_table,在hive和Flink >> >> >> SQL客户端都看不到,而且每次重新运行程序,如果不删除hive_table,就会报错,删除hive_table1,就可以执行,但是每次都不需要删除kafka_table,就可以执行程序,所以,是不是创建的kafka_table,是临时表,只有hive_table是存储在metastore >> >> >> >> >> >> >> >> >> >> >> >> >> >> 问题2: >> >> 刚才有热心社区同学回答,不用hivecatalog,用filesystem connector 也是可以创建hive表,我尝试了一下,报错了: >> >> java.util.concurrent.CompletionException: >> >> >> org.apache.flink.client.deployment.application.ApplicationExecutionException: >> >> Could not execute application. >> >> at >> >> >> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) >> >> ~[?:1.8.0_161] >> >> at >> >> >> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) >> >> ~[?:1.8.0_161] >> >> at >> >> >> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) >> >> ~[?:1.8.0_161] >> >> at >> >> >> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) >> >> ~[?:1.8.0_161] >> >> at >> >> >> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) >> >> ~[?:1.8.0_161] >> >> at >> >> >> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) >> >> ~[?:1.8.0_161] >> >> at >> >> >> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245) >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0] >> >> at >> >> >> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199) >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0] >> >> at >> >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >> >> [?:1.8.0_161] >> >> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> >> [?:1.8.0_161] >> >> at >> >> >> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154) >> >> [qile-data-flow-1.0.jar:?] >> >> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) >> >> [qile-data-flow-1.0.jar:?] 
>> >> at >> >> >> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) >> >> [qile-data-flow-1.0.jar:?] >> >> at >> >> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >> >> [qile-data-flow-1.0.jar:?] >> >> at >> >> >> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >> >> [qile-data-flow-1.0.jar:?] >> >> at >> >> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >> >> [qile-data-flow-1.0.jar:?] >> >> at >> >> >> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >> >> [qile-data-flow-1.0.jar:?] >> >> Caused by: >> >> >> org.apache.flink.client.deployment.application.ApplicationExecutionException: >> >> Could not execute application. >> >> ... 11 more >> >> Caused by: org.apache.flink.client.program.ProgramInvocationException: >> The >> >> main method caused an error: Unable to create a sink for writing table >> >> 'default_catalog.default_database.hive_table1'. >> >> >> >> Table options are: >> >> >> >> 'connector'='filesystem' >> >> 'hive.storage.file-format'='parquet' >> >> 'is_generic'='false' >> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00' >> >> 'sink.partition-commit.delay'='0s' >> >> 'sink.partition-commit.policy.kind'='metastore,success-file' >> >> at >> >> >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0] >> >> at >> >> >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0] >> >> at >> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0] >> >> at >> >> >> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0] >> >> ... 10 more >> >> Caused by: org.apache.flink.table.api.ValidationException: Unable to >> >> create a sink for writing table >> >> 'default_catalog.default_database.hive_table1'. >> >> >> >> Table options are: >> >> >> >> 'connector'='filesystem' >> >> 'hive.storage.file-format'='parquet' >> >> 'is_generic'='false' >> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00' >> >> 'sink.partition-commit.delay'='0s' >> >> 'sink.partition-commit.policy.kind'='metastore,success-file' >> >> at >> >> >> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164) >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] >> >> at >> >> >> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] >> >> at >> >> >> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] >> >> at >> >> >> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] >> >> at >> >> >> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) >> >> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] >> >> at >> >> >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >> >> ~[qile-data-flow-1.0.jar:?] 
>> >>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[qile-data-flow-1.0.jar:?]
>> >>         at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68) ~[qile-data-flow-1.0.jar:?]
>> >>         at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile.main(FromKafkaSinkHiveByFile.scala) ~[qile-data-flow-1.0.jar:?]
>> >>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >>         ... 10 more
>> >> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='filesystem''.
>> >>         at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         ... 10 more
>> >> Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'filesystem' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
>> >>
>> >> Available factory identifiers are:
>> >>
>> >> blackhole
>> >> hbase-1.4
>> >> jdbc
>> >> kafka
>> >>         at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> >>         ... 10 more
>> >>
>> >> query:
>> >>
>> >>     val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> >>     streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> >>     streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)
>> >>     streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)
>> >>
>> >>     val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> >>     val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)
>> >>
>> >>     streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>> >>     streamTableEnv.executeSql(
>> >>       """
>> >>         |CREATE TABLE hive_table (
>> >>         |  user_id STRING,
>> >>         |  age INT
>> >>         |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
>> >>         |  'connector'='filesystem',
>> >>         |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>> >>         |  'sink.partition-commit.delay'='0s',
>> >>         |  'sink.partition-commit.policy.kind'='metastore,success-file'
>> >>         |)
>> >>         |""".stripMargin)
>> >>
>> >>     streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
>> >>     streamTableEnv.executeSql(
>> >>       """
>> >>         |CREATE TABLE kafka_table (
>> >>         |  uid VARCHAR,
>> >>         |  -- uid BIGINT,
>> >>         |  sex VARCHAR,
>> >>         |  age INT,
>> >>         |  created_time TIMESTAMP(3),
>> >>         |  WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
>> >>         |) WITH (
>> >>         |  'connector.type' = 'kafka',
>> >>         |  'connector.version' = 'universal',
>> >>         |  'connector.topic' = 'user',
>> >>         |  -- 'connector.topic' = 'user_long',
>> >>         |  'connector.startup-mode' = 'latest-offset',
>> >>         |  'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>> >>         |  'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>> >>         |  'connector.properties.group.id' = 'user_flink',
>> >>         |  'format.type' = 'json',
>> >>         |  'format.derive-schema' = 'true'
>> >>         |)
>> >>         |""".stripMargin)
>> >>
>> >>     streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>> >>     streamTableEnv.executeSql(
>> >>       """
>> >>         |INSERT INTO hive_table
>> >>         |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH')
>> >>         |FROM kafka_table
>> >>         |""".stripMargin)
>> >>
>> >>     streamTableEnv.executeSql(
>> >>       """
>> >>         |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='18'
>> >>         |""".stripMargin)
>> >>       .print()
>> >>
>> >> On 2020-07-13 17:52:54, "Jingsong Li" <[hidden email]> wrote:
>> >> >Please paste the complete program again.
>> >> >
>> >> >Best,
>> >> >Jingsong
>> >> >
>> >> >On Mon, Jul 13, 2020 at 5:46 PM Zhou Zach <[hidden email]> wrote:
>> >> >
>> >> >> Hi,
>> >> >>
>> >> >> I have now changed it to:
>> >> >> 'sink.partition-commit.delay'='0s'
>> >> >>
>> >> >> More than 20 checkpoints have completed and 20-odd files have been produced on HDFS, but the Hive table still shows no data.
>> >> >>
>> >> >> On 2020-07-13 17:23:34, "夏帅" <[hidden email]> wrote:
>> >> >>
>> >> >> Hi,
>> >> >> you have set a one-hour
>> >> >> SINK_PARTITION_COMMIT_DELAY
>> >> >>
>> >> >> ------------------------------------------------------------------
>> >> >> From: Zhou Zach <[hidden email]>
>> >> >> Sent: Monday, July 13, 2020 17:09
>> >> >> To: user-zh <[hidden email]>
>> >> >> Subject: Re:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.
>> >> >>
>> >> >> Checkpointing is enabled:
>> >> >> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> >> >> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> >> >> streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)
>> >> >> streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)
>> >> >>
>> >> >> Interval 5s, timeout 10s. After waiting more than two minutes, a dozen or so files had been written to HDFS, but querying Hive still returned no data.
>> >> >>
>> >> >> On 2020-07-13 16:52:16, "Jingsong Li" <[hidden email]> wrote:
>> >> >> >Is checkpointing enabled? What delay did you set?
>> >> >> >
>> >> >> >The partition is added after checkpoint completion + delay.
>> >> >> >
>> >> >> >Best,
>> >> >> >Jingsong
>> >> >> >
>> >> >> >On Mon, Jul 13, 2020 at 4:50 PM Zhou Zach <[hidden email]> wrote:
>> >> >> >
>> >> >> >> Hi,
>> >> >> >> Following your hint I added a HiveCatalog, and data is now written to HDFS successfully. But why is there no data when I query the Hive table directly through Hue? Do I have to add the partitions to the Hive table manually? I have currently set
>> >> >> >> 'sink.partition-commit.policy.kind'='metastore'
>> >> >> >>
>> >> >> >> At 2020-07-13 15:01:28, "Jingsong Li" <[hidden email]> wrote:
>> >> >> >> >Hi,
>> >> >> >> >
>> >> >> >> >Are you using a HiveCatalog? Hive tables and the Hive dialect must be used together with a HiveCatalog.
>> >> >> >> >
>> >> >> >> >Otherwise you can only use the filesystem connector; if filesystem also gives you an error, please paste the error message.
>> >> >> >> >
>> >> >> >> >Best,
>> >> >> >> >Jingsong
>> >> >> >> >
>> >> >> >> >--
>> >> >> >> >Best, Jingsong Lee
>> >> >> >
>> >> >> >--
>> >> >> >Best, Jingsong Lee
>> >> >
>> >> >--
>> >> >Best, Jingsong Lee
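Jingsong's point in the exchange quoted above is that the Hive dialect and Hive tables only work together with a HiveCatalog. A minimal sketch of what registering one could look like in the program above; the catalog name, default database and hive-conf-dir are assumed values for illustration, not taken from this thread:

    import org.apache.flink.table.catalog.hive.HiveCatalog

    // Assumed values, adjust to the actual environment.
    val catalogName     = "myhive"
    val defaultDatabase = "default"
    val hiveConfDir     = "/etc/hive/conf"   // directory containing hive-site.xml

    // Register the HiveCatalog and make it the current catalog *before*
    // executing the Hive-dialect DDL, so that hive_table is created in the
    // Hive metastore instead of the in-memory default_catalog.
    val hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir)
    streamTableEnv.registerCatalog(catalogName, hiveCatalog)
    streamTableEnv.useCatalog(catalogName)

Since the metastore persists across runs, re-running the job will then fail on a plain CREATE TABLE; CREATE TABLE IF NOT EXISTS, as suggested later in the thread, avoids that.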
hi, Zhou Zach:
A quick question: if you set the parallelism of your program to 1, can it still read the Hive data correctly?

Zhou Zach <[hidden email]> wrote on Mon, Jul 13, 2020 at 20:17:

> OK, thanks for the answers.
>
> On 2020-07-13 19:49:10, "Jingsong Li" <[hidden email]> wrote:
> >The kafka_table needs to be created under the default dialect.
> >
> >Whatever the dialect, the table is saved to the Hive metastore (unless the temporary table syntax is used).
> >
> >Best,
> >Jingsong
> >
> >On Mon, Jul 13, 2020 at 7:46 PM Zhou Zach <[hidden email]> wrote:
> >
> >> When I created kafka_table I was on the default dialect; after switching to the HiveCatalog, the WATERMARK and WITH syntax were no longer supported.
> >> If a table is created under the default dialect, is it only valid for the current session?
> >>
> >> On 2020-07-13 19:27:44, "Jingsong Li" <[hidden email]> wrote:
> >> >Hi,
> >> >
> >> >Question 1:
> >> >
> >> >It works as long as the current catalog is the HiveCatalog.
> >> >In principle the Kafka table is also stored in the Hive metastore; if you do not want the error on re-runs, you can use CREATE TABLE XXX IF NOT EXISTS.
> >> >
> >> >To clarify: what do you mean by "cannot see it"? Can you try the Kafka table on its own; is it really gone after a restart?
> >> >
> >> >Question 2:
> >> >
> >> >A table created with the filesystem connector is a filesystem table and has nothing to do with the Hive metastore; you need to use the DDL syntax for filesystem tables [1] (a sketch follows at the end of this thread).
> >> >
> >> >A filesystem table writes its data straight to the file system in a format that is compatible with Hive, so if the write path is the path of some Hive table, the data can be queried from the Hive side.
> >> >Its partition commit does not support the metastore, however, so there is no built-in implementation that automatically adds partitions to Hive; you need to implement a custom partition-commit-policy.
> >> >
> >> >[1]
> >> >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
> >> >
> >> >Best,
> >> >Jingsong
> >> >
> >> >On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach <[hidden email]> wrote:
> >> >
> >> >> How embarrassing...
> >> >> I had two projects open and had been changing the wrong one. With the right project I can now query the data from Hive. Thanks for the community's quick replies, @Jingsong Li, @夏帅. I have watched Jingsong's video from the group several times over the last two days, much appreciated!
> >> >> Two more questions:
> >> >>
> >> >> Question 1:
> >> >> The kafka_table I created is visible neither in Hive nor in the Flink SQL client. Also, every re-run of the program fails unless hive_table is dropped first (after dropping hive_table1 it runs), yet kafka_table never has to be dropped. So is kafka_table a temporary table, and is only hive_table stored in the metastore?
> >> >>
> >> >> Question 2:
> >> >> A helpful community member just answered that a Hive table can also be created with the filesystem connector rather than the HiveCatalog. I tried it and got an error:
> >> >> java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
> >> >>         at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_161]
> >> >>         at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_161]
> >> >>         at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >>         at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [qile-data-flow-1.0.jar:?]
> >> >> Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
> >> >>         ... 11 more
> >> >> Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a sink for writing table 'default_catalog.default_database.hive_table1'.
> >> >>
> >> >> Table options are:
> >> >>
> >> >> 'connector'='filesystem'
> >> >> 'hive.storage.file-format'='parquet'
> >> >> 'is_generic'='false'
> >> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
> >> >> 'sink.partition-commit.delay'='0s'
> >> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
> >> >>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >>         at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >>         ... 10 more
> >> >> Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.hive_table1'.
> >> >>         at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at cn.ibobei.qile.dataflow.sql.FromKafkaSinkHiveByFile$.main(FromKafkaSinkHiveByFile.scala:68) ~[qile-data-flow-1.0.jar:?]
> >> >>         ... 10 more
> >> >> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='filesystem''.
> >> >>         at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         ... 10 more
> >> >> Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'filesystem' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
> >> >>
> >> >> Available factory identifiers are:
> >> >>
> >> >> blackhole
> >> >> hbase-1.4
> >> >> jdbc
> >> >> kafka
> >> >>         at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> >> >>         ... 10 more
> >> >
> >> >--
> >> >Best, Jingsong Lee
> >>
> >
> >--
> >Best, Jingsong Lee
>
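For the filesystem-connector route Jingsong describes in his answer to question 2, the table has to be declared under the default dialect with the filesystem WITH options from [1], not as a Hive-dialect table carrying 'connector' in TBLPROPERTIES. Below is a rough sketch with assumed values (the table name, HDFS path and policy choice are illustrative only). Note also that the stack trace above shows no 'filesystem' factory on the job's classpath at all, only blackhole, hbase-1.4, jdbc and kafka, so the job's packaging would have to provide that factory regardless of the DDL:

    streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
    streamTableEnv.executeSql(
      """
        |CREATE TABLE fs_table (
        |  user_id STRING,
        |  age INT,
        |  dt STRING,
        |  hr STRING
        |) PARTITIONED BY (dt, hr) WITH (
        |  'connector' = 'filesystem',
        |  -- assumed path; pointing it at an existing Hive table's directory
        |  -- makes the files readable from the Hive side
        |  'path' = 'hdfs:///user/hive/warehouse/mydb.db/fs_table',
        |  'format' = 'parquet',
        |  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
        |  'sink.partition-commit.trigger' = 'partition-time',
        |  'sink.partition-commit.delay' = '0s',
        |  -- filesystem tables cannot commit to the metastore; use success-file,
        |  -- or 'custom' plus 'sink.partition-commit.policy.class' to add the
        |  -- partition to Hive with your own policy
        |  'sink.partition-commit.policy.kind' = 'success-file'
        |)
        |""".stripMargin)

The INSERT INTO in the program above would then write to fs_table instead of hive_table.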