StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.getConfig().addConfiguration(
        new Configuration()
                .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(30)));
tEnv.executeSql("CREATE TEMPORARY FUNCTION TestFunca AS 'org.example.flink.TestFunca' LANGUAGE JAVA");
tEnv.executeSql("CREATE TABLE datagen (\n" +
        " name STRING,\n" +
        " pass STRING,\n" +
        " type1 INT,\n" +
        " t1 STRING,\n" +
        " t2 STRING,\n" +
        " ts AS localtimestamp,\n" +
        " WATERMARK FOR ts AS ts\n" +
        ") WITH (\n" +
        " 'connector' = 'datagen',\n" +
        " 'rows-per-second'='1',\n" +
        " 'fields.type1.min'='1',\n" +
        " 'fields.type1.max'='10'\n" +
        ")");

tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.executeSql("CREATE TABLE hive_table (\n" +
        " user_id STRING,\n" +
        " order_amount STRING\n" +
        ") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (\n" +
        " 'sink.partition-commit.trigger'='partition-time',\n" +
        " 'sink.partition-commit.delay'='1 h',\n" +
        " 'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
        ")");

tEnv.executeSql("insert into hive_table select t1,t2,TestFunca(type1),TestFunca(type1) from datagen");

Caused by: org.apache.flink.table.api.ValidationException: Table options do not contain an option key 'connector' for discovering a connector.
    at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
    at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
    ... 18 more

Sent from Mail for Windows 10
Hi McClone:
You should register a Hive catalog first; the Hive table factory only works through a Hive catalog. You can refer to HiveTableSinkITCase#testStreamingWrite (HiveTableSinkITCase.java). Hope this helps!

Best,
Hailong Wang
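For example, a minimal sketch of registering a HiveCatalog before running the Hive-dialect DDL could look like the following. The catalog name, default database, and hive-conf directory here are placeholders, not values from the original program:

import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Placeholders - adjust to your environment.
String catalogName = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive-conf";   // directory containing hive-site.xml

// Register the Hive catalog and make it the current catalog, so the
// subsequent Hive-dialect CREATE TABLE is stored in the Hive metastore
// instead of being resolved through a 'connector' option.
HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
tEnv.registerCatalog(catalogName, hiveCatalog);
tEnv.useCatalog(catalogName);

tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// ... then run the CREATE TABLE hive_table DDL and the INSERT as before.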