Hi, I've run into a problem. In my code I use Flink SQL 1.11 to insert data from Kafka into Hive tables. The process is as follows:
First, I read JSON array strings from Kafka, e.g.

[{"pid":"a", "val":1, "data_type": 1, "app_type": 2}, {"pid":"a", "val":1, "data_type": 1, "app_type": 2}]

Then I use flatMap to turn each array into individual objects, giving a stream runDataStream of elements like {"pid":"a", "val":1, "data_type": 1, "app_type": 2}.

I write runDataStream to Redis:

runDataStream.addSink(new CustomRedisSink())

Then I register a temporary view, e.g.:

tableEnv.createTemporaryView("kafkaT1", runDataSingleOutputStreamOperator, $("pid"), $("val"), $("app_type"), $("data_type"));

Next I define the different SQL statements, e.g.:

String sql1 = "insert into ods_data_10 select pid, val from kafkaT1 where data_type = 1 and app_type = 0";
String sql2 = "insert into ods_data_11 select pid, val from kafkaT1 where data_type = 1 and app_type = 1";
String sql3 = "insert into ods_data_01 select pid, val from kafkaT1 where data_type = 0 and app_type = 1";
String sql4 = "insert into ods_data_00 select pid, val from kafkaT1 where data_type = 0 and app_type = 0";

and run them through a StatementSet:

StatementSet ss = tableEnv.createStatementSet();
ss.addInsertSql(sql1);
ss.addInsertSql(sql2);
ss.addInsertSql(sql3);
ss.addInsertSql(sql4);

Finally I execute the job:

env.execute(jobName);

Everything runs fine and no errors are reported, but the web UI shows that two jobs were submitted, see:

<http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150820%402x.png>

The job "EconStreamingToHiveHbaseRedisJob" should correspond to the Redis writes (call its ID jobA), and the job "insert-into_myhive.dw.ods_analog_sems *******" should correspond to the writes into the four tables (call its ID jobB), see:

<http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150928%402x.png>

The topmost operator chain of jobB is defined as follows:

Source: Custom Source -> Map -> Flat Map -> Filter ->
SourceConversion(table=[myhive.dw.kafkaT1], fields=[pid, dqf, val, et, run_data_type]) ->
(Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH') AS EXPR$5], where=[((run_data_type = 0) AND NOT((pid LIKE _UTF-16LE'BP.%')))]) -> StreamingFileWriter,
 Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH') AS EXPR$5], where=[((run_data_type = 1) AND NOT((pid LIKE _UTF-16LE'BP.%')))]) -> StreamingFileWriter,
 Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH') AS EXPR$5], where=[((run_data_type = 0) AND (pid LIKE _UTF-16LE'BP.%'))]) -> StreamingFileWriter,
 Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH') AS EXPR$5], where=[((run_data_type = 1) AND (pid LIKE _UTF-16LE'BP.%'))]) -> StreamingFileWriter)

When I want to stop these jobs, e.g. "./bin/flink stop -m xxxx:8081 jobA", a savepoint is generated, e.g. "Suspending job "395c1f468e65b6e29abb58c27cb80bdc" with a savepoint." Stopping jobB likewise generates its own savepoint.

My question is: is there a required order for stopping jobA and jobB, and which savepoint should I use to restart the job smoothly?
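To make the overall flow easier to follow, here is a condensed sketch of the pipeline described above. RunData, the Kafka properties, and the topic name are placeholders; print() stands in for the CustomRedisSink used in the real job, the JSON parsing inside the flatMap is omitted, and the Hive catalog setup for the ods_data_* tables is not shown:

import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;

public class EconStreamingJobSketch {

    // Placeholder POJO for one element of the JSON array.
    public static class RunData {
        public String pid;
        public Double val;
        public Integer data_type;
        public Integer app_type;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // (Hive catalog registration omitted; the ods_data_* tables live there.)

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        kafkaProps.setProperty("group.id", "econ-streaming");      // placeholder

        // 1. Read the JSON array strings from Kafka.
        DataStream<String> raw = env.addSource(
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), kafkaProps));

        // 2. Flatten each JSON array into individual RunData records.
        DataStream<RunData> runDataStream = raw.flatMap(new FlatMapFunction<String, RunData>() {
            @Override
            public void flatMap(String jsonArray, Collector<RunData> out) {
                // JSON parsing omitted in this sketch; the real job emits one
                // RunData per array element via out.collect(...).
            }
        });

        // 3. DataStream sink: write each record out
        //    (print() stands in for addSink(new CustomRedisSink())).
        runDataStream.print();

        // 4. Register the stream as a temporary view for SQL.
        tableEnv.createTemporaryView("kafkaT1", runDataStream,
                $("pid"), $("val"), $("app_type"), $("data_type"));

        // 5. Fan out to the four Hive tables with one StatementSet.
        StatementSet ss = tableEnv.createStatementSet();
        ss.addInsertSql("insert into ods_data_10 select pid, val from kafkaT1 where data_type = 1 and app_type = 0");
        ss.addInsertSql("insert into ods_data_11 select pid, val from kafkaT1 where data_type = 1 and app_type = 1");
        ss.addInsertSql("insert into ods_data_01 select pid, val from kafkaT1 where data_type = 0 and app_type = 1");
        ss.addInsertSql("insert into ods_data_00 select pid, val from kafkaT1 where data_type = 0 and app_type = 0");

        // 6. Submit -- this is the only execute() call in my current code.
        env.execute("EconStreamingToHiveHbaseRedisJob");
    }
}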
Hi 大罗,
Try this method: org.apache.flink.table.api.StatementSet#execute

ss.execute();
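Roughly like this, reusing the variables from the original post (in 1.11, addInsertSql only buffers the statements; the StatementSet has to be executed explicitly):

StatementSet ss = tableEnv.createStatementSet();
ss.addInsertSql(sql1);
ss.addInsertSql(sql2);
ss.addInsertSql(sql3);
ss.addInsertSql(sql4);

// Submit the four INSERTs as one Table/SQL job.
ss.execute();

// The DataStream part (Kafka -> flatMap -> Redis) still needs its own submission.
env.execute(jobName);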
Hi,
At the moment a DataStream pipeline and a StatementSet cannot be submitted as a single job. The community is aware of this issue, see FLINK-18840 [1]; it will be supported as part of FLIP-136 [2].

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-18840
[2]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API?src=contextnavpagetreemode
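For illustration only -- my reading of the current 1.11 behaviour, building on the variables from the original post: the Table/SQL part and the DataStream part are two independent submissions, so each gets its own job ID, and each is stopped and restored with its own savepoint.

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.TableResult;

// Submission 1: the StatementSet -> "insert-into_myhive.dw.ods_analog_sems ..." (jobB).
TableResult sqlResult = ss.execute();
sqlResult.getJobClient().ifPresent(c -> System.out.println("SQL job id: " + c.getJobID()));

// Submission 2: the DataStream graph with the Redis sink -> "EconStreamingToHiveHbaseRedisJob" (jobA).
JobClient streamClient = env.executeAsync("EconStreamingToHiveHbaseRedisJob");
System.out.println("DataStream job id: " + streamClient.getJobID());

// Since the two jobs are independent (each has its own Kafka source and state, as the
// operator chain in the original post shows), "flink stop" produces a separate savepoint
// per job: restore jobA from jobA's savepoint and jobB from jobB's savepoint; the stop
// order should not matter to Flink itself.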