flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

Asahi Lee
你好!
     我使用flink sql,从kafka中读取数据,然后进行sql聚合操作,然后再输出到kafka中;当我设置并行度为1时,程序执行正常;当我设置并行度为2,甚至更大时;程序可以执行,但是我的kafka中没有看到有数据输出?请问是什么原因呢?
     使用stream api时,我们可以给每个算子设置并行度,那sql api我们是否可以给每条sql设置并行度?
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

zilong xiao
Hi Asahi Lee
 你需要确认下kafka topic的分区数是多少,如果是1,那就需要设置下rebalance,让每个tm都有数据流入

Asahi Lee <[hidden email]> 于2020年9月27日周日 下午6:05写道:

> 你好!
> &nbsp; &nbsp; &nbsp;我使用flink
> sql,从kafka中读取数据,然后进行sql聚合操作,然后再输出到kafka中;当我设置并行度为1时,程序执行正常;当我设置并行度为2,甚至更大时;程序可以执行,但是我的kafka中没有看到有数据输出?请问是什么原因呢?
> &nbsp; &nbsp; &nbsp;使用stream api时,我们可以给每个算子设置并行度,那sql api我们是否可以给每条sql设置并行度?
Reply | Threaded
Open this post in threaded view
|

回复: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

Asahi Lee
你好!     使用flink SQL,如何设置rebalance呢?------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;&quot;zilong&amp;nbsp;xiao&quot;<[hidden email]&gt;
发送时间:&nbsp;2020年9月27日(星期天) 晚上6:27
收件人:&nbsp;&quot;user-zh&quot;<[hidden email]&gt;;
主题:&nbsp;Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

shizk233
flink sql似乎不能设置rebalance,在Data Stream API可以设。

一种思路是拆分sql逻辑,在Data Stream API上创建kafka source后再reblance成一张新表,再做后续dml sql。

另一种思路就是kafka topic增加一下分区

Asahi Lee <[hidden email]> 于2020年9月28日周一 下午1:56写道:

> 你好!     使用flink
> SQL,如何设置rebalance呢?------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;&quot;zilong&amp;nbsp;xiao&quot;<[hidden email]&gt;
> 发送时间:&nbsp;2020年9月27日(星期天) 晚上6:27
> 收件人:&nbsp;&quot;user-zh&quot;<[hidden email]&gt;;
> 主题:&nbsp;Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

Benchao Li-2
In reply to this post by Asahi Lee
这个问题的原因应该是你的kafka partition数量应该是大于1的,并且不是所有partition都有数据导致的。
你可以检查下你的kafka topic。
目前来讲,只要你的每个kafka 的partition都有数据,那么watermark应该是可以正常产生的。跟并行度无关。

Asahi Lee <[hidden email]> 于2020年9月27日周日 下午6:05写道:

> 你好!
> &nbsp; &nbsp; &nbsp;我使用flink
> sql,从kafka中读取数据,然后进行sql聚合操作,然后再输出到kafka中;当我设置并行度为1时,程序执行正常;当我设置并行度为2,甚至更大时;程序可以执行,但是我的kafka中没有看到有数据输出?请问是什么原因呢?
> &nbsp; &nbsp; &nbsp;使用stream api时,我们可以给每个算子设置并行度,那sql api我们是否可以给每条sql设置并行度?



--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

王刚
这个问题我们之前使用sql窗口的时候也遇到过,当时是在1.7版本的tablesource后面加了个rebanlance算子让数据少的kafka分区的subtask watermark均衡下

发送自autohome
________________________________
发件人: Benchao Li <[hidden email]<mailto:[hidden email]>>
发送时间: 2020-09-29 18:10:42
收件人: user-zh <[hidden email]<mailto:[hidden email]>>
主题: Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

这个问题的原因应该是你的kafka partition数量应该是大于1的,并且不是所有partition都有数据导致的。
你可以检查下你的kafka topic。
目前来讲,只要你的每个kafka 的partition都有数据,那么watermark应该是可以正常产生的。跟并行度无关。

Asahi Lee <[hidden email]<mailto:[hidden email]>> 于2020年9月27日周日 下午6:05写道:

> 你好!
> &nbsp; &nbsp; &nbsp;我使用flink
> sql,从kafka中读取数据,然后进行sql聚合操作,然后再输出到kafka中;当我设置并行度为1时,程序执行正常;当我设置并行度为2,甚至更大时;程序可以执行,但是我的kafka中没有看到有数据输出?请问是什么原因呢?
> &nbsp; &nbsp; &nbsp;使用stream api时,我们可以给每个算子设置并行度,那sql api我们是否可以给每条sql设置并行度?



--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

Peihui He
In reply to this post by shizk233
hello,

stenv.fromDataStream(stream, $"")
请教下,如果stream中数据是org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
类型,field应该如何设置呢?
比如:
{
 a: 1,
 b: {
  c: "test"
 }
}

Best Wishes.

shizk233 <[hidden email]> 于2020年9月28日周一 下午7:15写道:

> flink sql似乎不能设置rebalance,在Data Stream API可以设。
>
> 一种思路是拆分sql逻辑,在Data Stream API上创建kafka source后再reblance成一张新表,再做后续dml sql。
>
> 另一种思路就是kafka topic增加一下分区
>
> Asahi Lee <[hidden email]> 于2020年9月28日周一 下午1:56写道:
>
> > 你好!     使用flink
> > SQL,如何设置rebalance呢?------------------&nbsp;原始邮件&nbsp;------------------
> > 发件人:&nbsp;&quot;zilong&amp;nbsp;xiao&quot;<[hidden email]&gt;
> > 发送时间:&nbsp;2020年9月27日(星期天) 晚上6:27
> > 收件人:&nbsp;&quot;user-zh&quot;<[hidden email]&gt;;
> > 主题:&nbsp;Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出
>
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

Peihui He
试了下一种解决方案,如下,可以调整sql并行度。

val table1: Table = stenv.sqlQuery("select * from test")
val schema = table1.getSchema

val table2 = stenv.fromDataStream(table1.toAppendStream[Row].map(item
=> Row.of(item.getField(0), item.getField(1)))(new
RowTypeInfo(schema.getFieldTypes.toList.take(2).toArray,
schema.getFieldNames.toList.take(2).toArray)).setParallelism(2))


Peihui He <[hidden email]> 于2020年10月14日周三 上午11:52写道:

> hello,
>
> stenv.fromDataStream(stream, $"")
>
> 请教下,如果stream中数据是org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode 类型,field应该如何设置呢?
> 比如:
> {
>  a: 1,
>  b: {
>   c: "test"
>  }
> }
>
> Best Wishes.
>
> shizk233 <[hidden email]> 于2020年9月28日周一 下午7:15写道:
>
>> flink sql似乎不能设置rebalance,在Data Stream API可以设。
>>
>> 一种思路是拆分sql逻辑,在Data Stream API上创建kafka source后再reblance成一张新表,再做后续dml sql。
>>
>> 另一种思路就是kafka topic增加一下分区
>>
>> Asahi Lee <[hidden email]> 于2020年9月28日周一 下午1:56写道:
>>
>> > 你好!     使用flink
>> > SQL,如何设置rebalance呢?------------------&nbsp;原始邮件&nbsp;------------------
>> > 发件人:&nbsp;&quot;zilong&amp;nbsp;xiao&quot;<[hidden email]&gt;
>> > 发送时间:&nbsp;2020年9月27日(星期天) 晚上6:27
>> > 收件人:&nbsp;&quot;user-zh&quot;<[hidden email]&gt;;
>> > 主题:&nbsp;Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出
>>
>