|
版本:flink1.11
场景描述:flink sql注册kafka映射表,读取数据实时写入hive
报错:No operators defined in streaming topology. Cannot generate StreamGraph.
具体代码:
// DDL for a Kafka-backed source table named `slog`, registered in whatever catalog/database
// is current when the statement runs. Every column is a string and records are decoded as
// JSON, reading the topic from the earliest offset.
val flinkKafkaSqlSouce: String =
s"""create table slog(
|`f1` string,
|`f2` string,
|`f3` string,
|`f4` string,
|`f5` string,
|collect_date string
|) with (
| 'connector' = 'kafka',
| 'topic' = 'kafka_table',
| 'properties.bootstrap.servers' = '${kafkaHost}',
| 'properties.group.id' = 'DwdSecurityLogToHive',
| 'format' = 'json',
| 'scan.startup.mode' = 'earliest-offset'
|)
|""".stripMargin // fixed: the closing delimiter previously ended with a typographic quote (”)

// The Kafka table uses Flink's own DDL syntax, so make sure the default dialect is active.
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT)
tableEnv.executeSql(flinkKafkaSqlSouce)
println(tableEnv.getCurrentCatalog +"."+tableEnv.getCurrentDatabase)

// In-house utility class that registers the Hive catalog.
// NOTE(review): presumably it also switches the current catalog, which is why the source
// table below is addressed by its fully qualified name -- confirm against HiveUtils.
HiveUtils.initHiveCatalog("tsgz","catalogName", tableEnv)
//tableEnv.registerTable("security_log", kafkaData)
//println(kafkaData.toString)

// fixed: the query used to read from `security_log`, a name that is only registered by the
// commented-out registerTable call above; the table actually created by the DDL is `slog`.
// NOTE(review): in Flink 1.11 executeSql submits the INSERT job by itself. If the surrounding
// code (not visible here) still calls env.execute()/tableEnv.execute() afterwards, that call
// is the likely origin of "No operators defined in streaming topology" -- verify and remove.
// NOTE(review): `insert into table ...` is Hive-style syntax; confirm it parses under the
// default dialect or drop the `table` keyword.
tableEnv.executeSql("insert into table tsgz.dwd_security_log select * from default_catalog.default_database.slog")
|