Flink version: 1.11
I am using createStatementSet to ingest data from several Kafka topics into Hive within a single job. Code:

    def main(args: Array[String]): Unit = {
      FlinkUtils.initTable()
      val tableEnv: StreamTableEnvironment = FlinkUtils.tableEnv
      val streamEnv: StreamExecutionEnvironment = FlinkUtils.streamEnv
      streamEnv.disableOperatorChaining()
      streamEnv.setParallelism(1)
      streamEnv.setMaxParallelism(1)
      CheckPointUtils.setCheckPoint(streamEnv, 120000, 60000)
      dealWithOdsDataTohive(tableEnv)

      val sqls: Map[String, String] = ConfigItem.ODS_SQL
      val ODS_TOPIC_SWITCH_ON = ConfigItem.APP_SOURCES.getOrElse("ODS2HIVE", null)
        .map(x => DictClass.logTypeAndTopic.getOrElse(x, "")).toSet

      // Register one Kafka source table per enabled topic; keep the table names.
      val filledAllSqlsTable = sqls.map(x => {
        val hiveMapTopic = hiveTableMapTopic
        val topicName = hiveMapTopic.getOrElse(x._1, null)
        val topic = if (ODS_TOPIC_SWITCH_ON.contains(topicName)) topicName else null
        (x._1, topic, x._2)
      }).filter(x => StringUtils.isNotEmpty(x._2)).map(x => {
        val sql = fillTemplate(x._1, x._2, x._3)
        tableEnv.executeSql(sql)
        x._1
      })

      HiveUtils.initHiveCatalog("tsgz", "catalogName", tableEnv)
      val stmtSet = tableEnv.createStatementSet()
      val allInsertSqls = filledAllSqlsTable.map(table => {
        s"insert into tsgz.${table} select * from default_catalog.default_database.${table}"
      }).toList
      allInsertSqls.foreach(x => {
        stmtSet.addInsertSql(x)
      })

      val insertTaskStatus = stmtSet.execute()
      //insertTaskStatus.print()
      println(insertTaskStatus.getJobClient.get().getJobStatus())
    }

    /**
     * Fill in the Kafka DDL template for one SQL mapping table.
     */
    def fillTemplate(tableName: String, topicName: String, fields: String) = {
      val kafkaHost = ConfigItem.KAFKA_BOOTSTRAP_SERVERS
      val filled = s"create table ${tableName} (${fields}) with (" +
        s"'connector' = 'kafka'," +
        s"'topic' = '${topicName}'," +
        s"'properties.bootstrap.servers' = '${kafkaHost}'," +
        s"'properties.group.id' = 'OdsDataToHive1'," +
        s"'format' = 'json'," +
        s"'scan.startup.mode' = 'latest-offset')"
      filled
    }

Running it fails with:

    Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.
        at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
        at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:703)
        at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
        at com.etl.chaitin.main.OdsDataToHive$.dealWithOdsDataTohive(OdsDataToHive.scala:54)
        at com.etl.chaitin.main.OdsDataToHive$.main(OdsDataToHive.scala:21)
        at com.etl.chaitin.main.OdsDataToHive.main(OdsDataToHive.scala)

The exception points at the line "val insertTaskStatus = stmtSet.execute()".

Reference: https://www.bookstack.cn/read/flink-1.11.1-zh/dc487098ce87ed44.md
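For what it's worth, the same stack trace appears in 1.11 whenever execute() is called on a StatementSet that never received an INSERT, and one plausible way to end up there in the code above is the .filter(...) dropping every table so that filledAllSqlsTable comes out empty. A minimal, standalone sketch of that failure mode (the object name EmptyStatementSetRepro is mine, and it assumes the default blink planner of 1.11; none of the helper classes above are involved):

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

    object EmptyStatementSetRepro {
      def main(args: Array[String]): Unit = {
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        val tableEnv = StreamTableEnvironment.create(streamEnv)

        // No addInsertSql(...) calls: the set holds zero operations,
        // so there is nothing to translate into a StreamGraph.
        val stmtSet = tableEnv.createStatementSet()

        // Throws IllegalStateException:
        // "No operators defined in streaming topology. Cannot generate StreamGraph."
        stmtSet.execute()
      }
    }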
I'm also on Flink 1.11.0 and also go through stmtSet.execute(); it runs fine for me. You could debug and check the SQL statements you are actually submitting.
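One way to do that check, sketched against the names from the quoted code (filledAllSqlsTable, tableEnv; untested against your config): print the generated INSERTs and guard the execute() call, so an empty result from the topic filter is reported instead of failing inside execute().

    // Sketch: print the generated INSERTs and skip execute() when the topic
    // filter left nothing behind, instead of letting execute() fail.
    val allInsertSqls = filledAllSqlsTable.map(table =>
      s"insert into tsgz.${table} select * from default_catalog.default_database.${table}"
    ).toList

    allInsertSqls.foreach(println) // inspect the SQL actually submitted

    if (allInsertSqls.isEmpty) {
      println("No tables passed the ODS topic filter; nothing to submit.")
    } else {
      val stmtSet = tableEnv.createStatementSet()
      allInsertSqls.foreach(stmtSet.addInsertSql)
      stmtSet.execute()
    }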
From: datayangl
Sent: 2021-01-14 16:13
To: user-zh
Subject: flink 1.11 createStatementSet error: No operators defined in streaming topology