关于配置关联初始化方案的实现问题

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

关于配置关联初始化方案的实现问题

javenjiangfsof
Hi 社区的各位
  最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc + broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案:
  1.初始化通过jdbc获取,通过fromCollection处理后,union cdc过来的配置流,这个经过测试确实可以解决,但是会有疑惑:通过fromCollection处理的内存的数据一定会比kafka,cdc过来的数据快吗?为什么…
  2.通过ListState(有看到keyed流的解决方案,但是这里我并不需要keyby,keyby之后的数据很不均匀,还会产生很多partition,还会有shuffle损耗等问题),但是这会涉及到何时触发缓存中数据的触发关联,以及清理状态的问题??(如果能给出例子是最好的…)
  3.更好的方案???


  目前来看我更倾向于与第一种方案,实现也很简单但是那个问题我无法确定,或许我应该考虑union+ liststate,jdbc过来的数据最后添加标志位做触发关联,会不会多此一举?
  希望能看到各位的回复,感谢
Reply | Threaded
Open this post in threaded view
|

Re: 关于配置关联初始化方案的实现问题

nobleyd
FlinkSQL ?

javenjiangfsof <[hidden email]> 于2021年2月1日周一 上午11:40写道:

> Hi 社区的各位
>
> 最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc
> +
> broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案:
>   1.初始化通过jdbc获取,通过fromCollection处理后,union
> cdc过来的配置流,这个经过测试确实可以解决,但是会有疑惑:通过fromCollection处理的内存的数据一定会比kafka,cdc过来的数据快吗?为什么…
>
> 2.通过ListState(有看到keyed流的解决方案,但是这里我并不需要keyby,keyby之后的数据很不均匀,还会产生很多partition,还会有shuffle损耗等问题),但是这会涉及到何时触发缓存中数据的触发关联,以及清理状态的问题??(如果能给出例子是最好的…)
>   3.更好的方案???
>
>
>   目前来看我更倾向于与第一种方案,实现也很简单但是那个问题我无法确定,或许我应该考虑union+
> liststate,jdbc过来的数据最后添加标志位做触发关联,会不会多此一举?
>   希望能看到各位的回复,感谢
Reply | Threaded
Open this post in threaded view
|

Re: 关于配置关联初始化方案的实现问题

javenjiangfsof
In reply to this post by javenjiangfsof
DataStream API,像下面这样
```
val list = ...   //i use jdbc to get the init data
val dimensionInitStream = env.fromCollection(list)
val dimension =
dimensionStream.union(dimensionInitStream).broadcast(descriptor)
mainStream.connect(dimensionStream)
...
```
注:我怕没描述清楚问题:问题在于主流和广播流关联的时候,广播流不一定会先于主流到,导致关联不到配置

在 2021年2月1日 13:30,赵一旦<[hidden email]> 写道:


FlinkSQL ? javenjiangfsof <[hidden email]> 于2021年2月1日周一 上午11:40写道: > Hi 社区的各位 > > 最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc > + > broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案: > 1.初始化通过jdbc获取,通过fromCollection处理后,union > cdc过来的配置流,这个经过测试确实可以解决,但是会有疑惑:通过fromCollection处理的内存的数据一定会比kafka,cdc过来的数据快吗?为什么… > > 2.通过ListState(有看到keyed流的解决方案,但是这里我并不需要keyby,keyby之后的数据很不均匀,还会产生很多partition,还会有shuffle损耗等问题),但是这会涉及到何时触发缓存中数据的触发关联,以及清理状态的问题??(如果能给出例子是最好的…) > 3.更好的方案??? > > > 目前来看我更倾向于与第一种方案,实现也很简单但是那个问题我无法确定,或许我应该考虑union+ > liststate,jdbc过来的数据最后添加标志位做触发关联,会不会多此一举? > 希望能看到各位的回复,感谢
Reply | Threaded
Open this post in threaded view
|

Re: 关于配置关联初始化方案的实现问题

nobleyd
我没发现官网没看到DataStream有cdc的connector貌似,你是怎么搞的呢。


javenjiangfsof <[hidden email]> 于2021年2月1日周一 下午1:40写道:

> DataStream API,像下面这样
> ```
> val list = ...   //i use jdbc to get the init data
> val dimensionInitStream = env.fromCollection(list)
> val dimension =
> dimensionStream.union(dimensionInitStream).broadcast(descriptor)
> mainStream.connect(dimensionStream)
> ...
> ```
> 注:我怕没描述清楚问题:问题在于主流和广播流关联的时候,广播流不一定会先于主流到,导致关联不到配置
>
> 在 2021年2月1日 13:30,赵一旦<[hidden email]> 写道:
>
>
> FlinkSQL ? javenjiangfsof <[hidden email]> 于2021年2月1日周一
> 上午11:40写道: > Hi 社区的各位 > >
> 最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc
> > + >
> broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案:
> > 1.初始化通过jdbc获取,通过fromCollection处理后,union >
> cdc过来的配置流,这个经过测试确实可以解决,但是会有疑惑:通过fromCollection处理的内存的数据一定会比kafka,cdc过来的数据快吗?为什么…
> > >
> 2.通过ListState(有看到keyed流的解决方案,但是这里我并不需要keyby,keyby之后的数据很不均匀,还会产生很多partition,还会有shuffle损耗等问题),但是这会涉及到何时触发缓存中数据的触发关联,以及清理状态的问题??(如果能给出例子是最好的…)
> > 3.更好的方案??? > > > 目前来看我更倾向于与第一种方案,实现也很简单但是那个问题我无法确定,或许我应该考虑union+ >
> liststate,jdbc过来的数据最后添加标志位做触发关联,会不会多此一举? > 希望能看到各位的回复,感谢
Reply | Threaded
Open this post in threaded view
|

Re: 关于配置关联初始化方案的实现问题

javenjiangfsof
In reply to this post by javenjiangfsof
官网上没有,在github上https://github.com/ververica/flink-cdc-connectors
```
SourceFunction<String> sourceFunction = MySQLSource.<String>builder() .hostname("localhost") .port(3306) .databaseList("inventory") // monitor all tables under inventory database .username("flinkuser") .password("flinkpw") .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String .build();
```

在 2021年2月1日 14:07,赵一旦<[hidden email]> 写道:


我没发现官网没看到DataStream有cdc的connector貌似,你是怎么搞的呢。 javenjiangfsof <[hidden email]> 于2021年2月1日周一 下午1:40写道: > DataStream API,像下面这样 > ``` > val list = ... //i use jdbc to get the init data > val dimensionInitStream = env.fromCollection(list) > val dimension = > dimensionStream.union(dimensionInitStream).broadcast(descriptor) > mainStream.connect(dimensionStream) > ... > ``` > 注:我怕没描述清楚问题:问题在于主流和广播流关联的时候,广播流不一定会先于主流到,导致关联不到配置 > > 在 2021年2月1日 13:30,赵一旦<[hidden email]> 写道: > > > FlinkSQL ? javenjiangfsof <[hidden email]> 于2021年2月1日周一 > 上午11:40写道: > Hi 社区的各位 > > > 最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc > > + > > broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案: > > 1.初始化通过jdbc获取,通过fromCollection处理后,union > > cdc过来的配置流,这个经过测试确实可以解决,但是会有疑惑:通过fromCollection处理的内存的数据一定会比kafka,cdc过来的数据快吗?为什么… > > > > 2.通过ListState(有看到keyed流的解决方案,但是这里我并不需要keyby,keyby之后的数据很不均匀,还会产生很多partition,还会有shuffle损耗等问题),但是这会涉及到何时触发缓存中数据的触发关联,以及清理状态的问题??(如果能给出例子是最好的…) > > 3.更好的方案??? > > > 目前来看我更倾向于与第一种方案,实现也很简单但是那个问题我无法确定,或许我应该考虑union+ > > liststate,jdbc过来的数据最后添加标志位做触发关联,会不会多此一举? > 希望能看到各位的回复,感谢