你好!
我使用flink sql,从kafka中读取数据,然后进行sql聚合操作,然后再输出到kafka中;当我设置并行度为1时,程序执行正常;当我设置并行度为2,甚至更大时;程序可以执行,但是我的kafka中没有看到有数据输出?请问是什么原因呢? 使用stream api时,我们可以给每个算子设置并行度,那sql api我们是否可以给每条sql设置并行度? |
Hi Asahi Lee
你需要确认下kafka topic的分区数是多少,如果是1,那就需要设置下rebalance,让每个tm都有数据流入 Asahi Lee <[hidden email]> 于2020年9月27日周日 下午6:05写道: > 你好! > 我使用flink > sql,从kafka中读取数据,然后进行sql聚合操作,然后再输出到kafka中;当我设置并行度为1时,程序执行正常;当我设置并行度为2,甚至更大时;程序可以执行,但是我的kafka中没有看到有数据输出?请问是什么原因呢? > 使用stream api时,我们可以给每个算子设置并行度,那sql api我们是否可以给每条sql设置并行度? |
你好! 使用flink SQL,如何设置rebalance呢?------------------ 原始邮件 ------------------
发件人: "zilong&nbsp;xiao"<[hidden email]> 发送时间: 2020年9月27日(星期天) 晚上6:27 收件人: "user-zh"<[hidden email]>; 主题: Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出 |
flink sql似乎不能设置rebalance,在Data Stream API可以设。
一种思路是拆分sql逻辑,在Data Stream API上创建kafka source后再reblance成一张新表,再做后续dml sql。 另一种思路就是kafka topic增加一下分区 Asahi Lee <[hidden email]> 于2020年9月28日周一 下午1:56写道: > 你好! 使用flink > SQL,如何设置rebalance呢?------------------ 原始邮件 ------------------ > 发件人: "zilong&nbsp;xiao"<[hidden email]> > 发送时间: 2020年9月27日(星期天) 晚上6:27 > 收件人: "user-zh"<[hidden email]>; > 主题: Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出 |
In reply to this post by Asahi Lee
这个问题的原因应该是你的kafka partition数量应该是大于1的,并且不是所有partition都有数据导致的。
你可以检查下你的kafka topic。 目前来讲,只要你的每个kafka 的partition都有数据,那么watermark应该是可以正常产生的。跟并行度无关。 Asahi Lee <[hidden email]> 于2020年9月27日周日 下午6:05写道: > 你好! > 我使用flink > sql,从kafka中读取数据,然后进行sql聚合操作,然后再输出到kafka中;当我设置并行度为1时,程序执行正常;当我设置并行度为2,甚至更大时;程序可以执行,但是我的kafka中没有看到有数据输出?请问是什么原因呢? > 使用stream api时,我们可以给每个算子设置并行度,那sql api我们是否可以给每条sql设置并行度? -- Best, Benchao Li |
这个问题我们之前使用sql窗口的时候也遇到过,当时是在1.7版本的tablesource后面加了个rebanlance算子让数据少的kafka分区的subtask watermark均衡下
发送自autohome ________________________________ 发件人: Benchao Li <[hidden email]<mailto:[hidden email]>> 发送时间: 2020-09-29 18:10:42 收件人: user-zh <[hidden email]<mailto:[hidden email]>> 主题: Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出 这个问题的原因应该是你的kafka partition数量应该是大于1的,并且不是所有partition都有数据导致的。 你可以检查下你的kafka topic。 目前来讲,只要你的每个kafka 的partition都有数据,那么watermark应该是可以正常产生的。跟并行度无关。 Asahi Lee <[hidden email]<mailto:[hidden email]>> 于2020年9月27日周日 下午6:05写道: > 你好! > 我使用flink > sql,从kafka中读取数据,然后进行sql聚合操作,然后再输出到kafka中;当我设置并行度为1时,程序执行正常;当我设置并行度为2,甚至更大时;程序可以执行,但是我的kafka中没有看到有数据输出?请问是什么原因呢? > 使用stream api时,我们可以给每个算子设置并行度,那sql api我们是否可以给每条sql设置并行度? -- Best, Benchao Li |
In reply to this post by shizk233
hello,
stenv.fromDataStream(stream, $"") 请教下,如果stream中数据是org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode 类型,field应该如何设置呢? 比如: { a: 1, b: { c: "test" } } Best Wishes. shizk233 <[hidden email]> 于2020年9月28日周一 下午7:15写道: > flink sql似乎不能设置rebalance,在Data Stream API可以设。 > > 一种思路是拆分sql逻辑,在Data Stream API上创建kafka source后再reblance成一张新表,再做后续dml sql。 > > 另一种思路就是kafka topic增加一下分区 > > Asahi Lee <[hidden email]> 于2020年9月28日周一 下午1:56写道: > > > 你好! 使用flink > > SQL,如何设置rebalance呢?------------------ 原始邮件 ------------------ > > 发件人: "zilong&nbsp;xiao"<[hidden email]> > > 发送时间: 2020年9月27日(星期天) 晚上6:27 > > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出 > |
试了下一种解决方案,如下,可以调整sql并行度。
val table1: Table = stenv.sqlQuery("select * from test") val schema = table1.getSchema val table2 = stenv.fromDataStream(table1.toAppendStream[Row].map(item => Row.of(item.getField(0), item.getField(1)))(new RowTypeInfo(schema.getFieldTypes.toList.take(2).toArray, schema.getFieldNames.toList.take(2).toArray)).setParallelism(2)) Peihui He <[hidden email]> 于2020年10月14日周三 上午11:52写道: > hello, > > stenv.fromDataStream(stream, $"") > > 请教下,如果stream中数据是org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode 类型,field应该如何设置呢? > 比如: > { > a: 1, > b: { > c: "test" > } > } > > Best Wishes. > > shizk233 <[hidden email]> 于2020年9月28日周一 下午7:15写道: > >> flink sql似乎不能设置rebalance,在Data Stream API可以设。 >> >> 一种思路是拆分sql逻辑,在Data Stream API上创建kafka source后再reblance成一张新表,再做后续dml sql。 >> >> 另一种思路就是kafka topic增加一下分区 >> >> Asahi Lee <[hidden email]> 于2020年9月28日周一 下午1:56写道: >> >> > 你好! 使用flink >> > SQL,如何设置rebalance呢?------------------ 原始邮件 ------------------ >> > 发件人: "zilong&nbsp;xiao"<[hidden email]> >> > 发送时间: 2020年9月27日(星期天) 晚上6:27 >> > 收件人: "user-zh"<[hidden email]>; >> > 主题: Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出 >> > |
Free forum by Nabble | Edit this page |