flink 1.11: createStatementSet fails with "No operators defined in streaming topology"


datayangl
Flink version: 1.11
I am using createStatementSet to write data from multiple Kafka topics into Hive in a single job.
The code:
  def main(args: Array[String]): Unit = {
    FlinkUtils.initTable()
    val tableEnv: StreamTableEnvironment = FlinkUtils.tableEnv
    val streamEnv: StreamExecutionEnvironment = FlinkUtils.streamEnv
    streamEnv.disableOperatorChaining()
    streamEnv.setParallelism(1)
    streamEnv.setMaxParallelism(1)
    CheckPointUtils.setCheckPoint(streamEnv, 120000, 60000)
    dealWithOdsDataTohive(tableEnv)

    val sqls: Map[String, String] = ConfigItem.ODS_SQL

    val ODS_TOPIC_SWITCH_ON = ConfigItem.APP_SOURCES.getOrElse("ODS2HIVE", null)
      .map(x => DictClass.logTypeAndTopic.getOrElse(x, "")).toSet

    val filledAllSqlsTable = sqls.map(x => {
      val hiveMapTopic = hiveTableMapTopic
      val topicName = hiveMapTopic.getOrElse(x._1, null)
      val topic = if (ODS_TOPIC_SWITCH_ON.contains(topicName)) topicName else null
      (x._1, topic, x._2)
    }).filter(x => StringUtils.isNotEmpty(x._2)).map(x => {
      val sql = fillTemplate(x._1, x._2, x._3)
      tableEnv.executeSql(sql)
      x._1
    })

    HiveUtils.initHiveCatalog("tsgz", "catalogName", tableEnv)
    val stmtSet = tableEnv.createStatementSet()
    val allInsertSqls = filledAllSqlsTable.map(table => {
      s"insert into tsgz.${table} select * from default_catalog.default_database.${table}"
    }).toList
    allInsertSqls.foreach(x => {
      stmtSet.addInsertSql(x)
    })
    val insertTaskStatus = stmtSet.execute()
    //insertTaskStatus.print()
    println(insertTaskStatus.getJobClient.get().getJobStatus())
  }
  /**
   * Fill in the DDL template for the Kafka-backed mapping table
   * */
  def fillTemplate(tableName: String, topicName: String, fields: String) = {
    val kafkaHost = ConfigItem.KAFKA_BOOTSTRAP_SERVERS
    val filled = s"create table ${tableName} (${fields}) with (" +
      s"'connector' = 'kafka','topic' = '${topicName}'," +
      s"'properties.bootstrap.servers' = '${kafkaHost}'," +
      s"'properties.group.id' = 'OdsDataToHive1','format' = 'json'," +
      s"'scan.startup.mode' = 'latest-offset')"

    filled
  }

Running it produces the following error:
Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.
        at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
        at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:703)
        at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
        at com.etl.chaitin.main.OdsDataToHive$.dealWithOdsDataTohive(OdsDataToHive.scala:54)
        at com.etl.chaitin.main.OdsDataToHive$.main(OdsDataToHive.scala:21)
        at com.etl.chaitin.main.OdsDataToHive.main(OdsDataToHive.scala)

The error is reported at the line val insertTaskStatus = stmtSet.execute().
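One hedged observation: the stack trace shows StatementSetImpl.execute being reached from dealWithOdsDataTohive (OdsDataToHive.scala:54), and this exception is typically thrown when execute() runs with no INSERT statement registered. In the code above that would happen if every entry is filtered out and filledAllSqlsTable ends up empty. A Flink-free sketch of the same filtering logic, with hypothetical table and topic names (t_login, ods_login):

```scala
// Flink-free sketch of the topic-filtering step above.
// All table/topic names here are hypothetical placeholders.
object FilterSketch {
  // table name -> field list, standing in for ConfigItem.ODS_SQL
  val sqls: Map[String, String] = Map("t_login" -> "id STRING, ts BIGINT")
  // table name -> kafka topic, standing in for hiveTableMapTopic
  val hiveTableMapTopic: Map[String, String] = Map("t_login" -> "ods_login")

  // Tables that survive the filter for a given set of enabled topics.
  def enabledTables(switchOn: Set[String]): List[String] =
    sqls.toList
      .map { case (table, fields) =>
        val topicName = hiveTableMapTopic.getOrElse(table, "")
        val topic = if (switchOn.contains(topicName)) topicName else null
        (table, topic, fields)
      }
      .filter { case (_, topic, _) => topic != null && topic.nonEmpty }
      .map(_._1)
}
```

If enabledTables returns Nil, nothing is ever passed to addInsertSql, so logging the size of the insert list before calling execute() is a cheap sanity check.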


Reference: https://www.bookstack.cn/read/flink-1.11.1-zh/dc487098ce87ed44.md




--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink 1.11: createStatementSet fails with "No operators defined in streaming topology"

Evan
Mine is also Flink 1.11.0, and it also uses stmtSet.execute(); it runs fine. You could debug and check the SQL statements you are executing.
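Following the suggestion above, the generated DDL can be inspected without Flink at all, since the template filling is pure string building. A standalone sketch of that step (the host, topic, and field values below are placeholders, not the poster's real config):

```scala
// Standalone copy of the fillTemplate string building, so the DDL can be
// printed and eyeballed before it is handed to tableEnv.executeSql.
// kafkaHost is taken as a parameter here instead of ConfigItem.
object TemplateSketch {
  def fillTemplate(tableName: String, topicName: String, fields: String,
                   kafkaHost: String): String =
    s"create table ${tableName} (${fields}) with (" +
      s"'connector' = 'kafka','topic' = '${topicName}'," +
      s"'properties.bootstrap.servers' = '${kafkaHost}'," +
      s"'properties.group.id' = 'OdsDataToHive1','format' = 'json'," +
      s"'scan.startup.mode' = 'latest-offset')"
}
```

Printing TemplateSketch.fillTemplate("t_login", "ods_login", "id STRING", "broker:9092") shows the exact CREATE TABLE statement the job would execute, which makes quoting or field-list mistakes easy to spot.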



 
From: datayangl
Sent: 2021-01-14 16:13
To: user-zh
Subject: flink 1.11: createStatementSet fails with "No operators defined in streaming topology"