FlinkKafkaProducer没有写入多个topic的功能

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

FlinkKafkaProducer没有写入多个topic的功能

丁浩浩
我有一个需求是通过读取一个kafka的主题的数据经过flink处理再写入到多个kafka的主题中(写入的主题是动态的,数据中能解析到需要写入到的目的地主题),
但是FlinkKafkaProducer好像只能写入一个主题里面?



[hidden email]
Reply | Threaded
Open this post in threaded view
|

回复:FlinkKafkaProducer没有写入多个topic的功能

Shuai Xia
你好,可以尝试自定义KafkaSerializationSchema来实现你的业务场景
class DemoSerializationSchema extends KafkaSerializationSchema[DemoBean] {
  override def serialize(element: DemoBean, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    new ProducerRecord[Array[Byte], Array[Byte]](element.getTopic, element.getValue)
  }
}
------------------------------------------------------------------
发件人:[hidden email] <[hidden email]>
发送时间:2020年7月8日(星期三) 10:59
收件人:user-zh <[hidden email]>
主 题:FlinkKafkaProducer没有写入多个topic的功能

我有一个需求是通过读取一个kafka的主题的数据经过flink处理再写入到多个kafka的主题中(写入的主题是动态的,数据中能解析到需要写入到的目的地主题),
但是FlinkKafkaProducer好像只能写入一个主题里面?



[hidden email]

Reply | Threaded
Open this post in threaded view
|

Re:回复:FlinkKafkaProducer没有写入多个topic的功能

丁浩浩
兄弟,感谢
















在 2020-07-08 11:04:40,"夏帅" <[hidden email]> 写道:

你好,可以尝试自定义KafkaSerializationSchema来实现你的业务场景
class DemoSerializationSchema extends KafkaSerializationSchema[DemoBean] {
override def serialize(element: DemoBean, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord[Array[Byte], Array[Byte]](element.getTopic, element.getValue)
  }
}
------------------------------------------------------------------
发件人:[hidden email] <[hidden email]>
发送时间:2020年7月8日(星期三) 10:59
收件人:user-zh <[hidden email]>
主 题:FlinkKafkaProducer没有写入多个topic的功能


我有一个需求是通过读取一个kafka的主题的数据经过flink处理再写入到多个kafka的主题中(写入的主题是动态的,数据中能解析到需要写入到的目的地主题),
但是FlinkKafkaProducer好像只能写入一个主题里面?



[hidden email]


Reply | Threaded
Open this post in threaded view
|

Re: FlinkKafkaProducer没有写入多个topic的功能

Leonard Xu
Hi,

夏帅的方案是ok的,因为Kafka 默认支持写入topic不存在时自动创建[1], 这个配置是默认开启的,所以只用实现下自定义KafkaSerializationSchema就可以满足你的需求。

祝好,
Leonard Xu

[1] https://docs.confluent.io/current/installation/configuration/broker-configs.html#auto.create.topics.enable <https://docs.confluent.io/current/installation/configuration/broker-configs.html#auto.create.topics.enable>

> 在 2020年7月8日,11:08,flink小猪 <[hidden email]> 写道:
>
> 兄弟,感谢
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-08 11:04:40,"夏帅" <[hidden email]> 写道:
>
> 你好,可以尝试自定义KafkaSerializationSchema来实现你的业务场景
> class DemoSerializationSchema extends KafkaSerializationSchema[DemoBean] {
> override def serialize(element: DemoBean, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
> new ProducerRecord[Array[Byte], Array[Byte]](element.getTopic, element.getValue)
>  }
> }
> ------------------------------------------------------------------
> 发件人:[hidden email] <[hidden email]>
> 发送时间:2020年7月8日(星期三) 10:59
> 收件人:user-zh <[hidden email]>
> 主 题:FlinkKafkaProducer没有写入多个topic的功能
>
>
> 我有一个需求是通过读取一个kafka的主题的数据经过flink处理再写入到多个kafka的主题中(写入的主题是动态的,数据中能解析到需要写入到的目的地主题),
> 但是FlinkKafkaProducer好像只能写入一个主题里面?
>
>
>
> [hidden email]
>
>