flink sql是否能够支持将confluent schema registry注册的一个avro数据格式 的topic注册成一张table?
|
env.addSource(flinkKafkaConsumer).assignTimestampsAndWatermarks(...)
env.addSource(flinkKafkaConsumer).flatMap(...).assignTimestampsAndWatermarks(...); 使用kafka进行消费,直接设置Watermark和经过flatMap()以后再设置,会产生什么样的区别和影响。 flatMap可能会将数据处理为1-N条。那么在这种情况下,还能够保证kafka的精确一次吗? |
In reply to this post by 陈帅
Hi 陈帅,
目前社区确实不支持confluent schema registry的avro格式,我们内部也是依赖schema registry来做avro schema的管理,所以,我们改动了flink-avro 的源码来支持。 主要涉及到这些地方: org.apache.flink.formats.avro.{AvroRowFormatFactory,AvroRowDeserializationSchema,AvroRowSerializationSchema} 和org.apache.flink.table.descriptors.{Avro,AvroValidator} 使用时在构建Avro时指定以下三个参数即可(见标红部分): tableEnv.connect( new Kafka() .version("universal") .topic(topic) .properties(props) ).withFormat( new Avro() .useRegistry(true) .registryUrl(KAFKA_SCHEMA_REGISTRY_URL_ADDRESS) .registrySubject(subject) .avroSchema(avroSchemaStr) ) 陈帅 <[hidden email]> 于2019年12月18日周三 上午8:26写道: > > flink sql是否能够支持将confluent schema registry注册的一个avro数据格式 的topic注册成一张table? |
In reply to this post by 猫猫
flatmap 逻辑中,你是否对消息记录的时间处理了吗,watermark 的更新的逻辑是比前一次 watermark 的时间截要大同时非空。
猫猫 <[hidden email]> 于2019年12月18日周三 上午9:27写道: > env.addSource(flinkKafkaConsumer).assignTimestampsAndWatermarks(...) > > env.addSource(flinkKafkaConsumer).flatMap(...).assignTimestampsAndWatermarks(...); > > 使用kafka进行消费,直接设置Watermark和经过flatMap()以后再设置,会产生什么样的区别和影响。 > flatMap可能会将数据处理为1-N条。那么在这种情况下,还能够保证kafka的精确一次吗? |
可能是我说的不太明确,我理解事件时间在整个流系统下只有一个。 但是读kafka时,我无法直接拿到业务数据中的event-time。因为数据可能会被拆分为多条。 我只能当做字符串取出,并设置事件时间为kafka的时间。 在进行一次消息转换后,才能获取到数据中的时间。所以我需要在flatmap以后才能正确的设置event-time 但我又需要kafka的精确一次特性。 所以问题转变为如下的问题,我能在流中重设eventTime吗?但对最初的流(kafka提交)不影响。 所以也就是之前提到的问题。 env.addSource(flinkKafkaConsumer).assignTimestampsAndWatermarks(...) env.addSource(flinkKafkaConsumer).flatMap(...).assignTimestampsAndWatermarks(...); 更进一步,我能向下图一样,在不同的流过程中,设置不同的eventTime吗?并且他们能够实现整体提交。 ------------------ 原始邮件 ------------------ 发件人: "LakeShen"<[hidden email]>; 发送时间: 2019年12月18日(星期三) 下午2:10 收件人: "user-zh"<[hidden email]>; 主题: Re: 关于直接设置Watermark和flatmap后再设置的疑问 猫猫 <[hidden email]> 于2019年12月18日周三 上午9:27写道: > env.addSource(flinkKafkaConsumer).assignTimestampsAndWatermarks(...) > > env.addSource(flinkKafkaConsumer).flatMap(...).assignTimestampsAndWatermarks(...); > > 使用kafka进行消费,直接设置Watermark和经过flatMap()以后再设置,会产生什么样的区别和影响。 > flatMap可能会将数据处理为1-N条。那么在这种情况下,还能够保证kafka的精确一次吗? |
图片不能粘贴,放到github上面了。
https://github.com/maobuji/Blog/blob/master/image/changeEventTime.jpg ------------------ 原始邮件 ------------------ 发件人: "猫猫"<[hidden email]>; 发送时间: 2019年12月18日(星期三) 下午4:03 收件人: "user-zh"<[hidden email]>; 主题: 回复: 关于直接设置Watermark和flatmap后再设置的疑问 可能是我说的不太明确,我理解事件时间在整个流系统下只有一个。 但是读kafka时,我无法直接拿到业务数据中的event-time。因为数据可能会被拆分为多条。 我只能当做字符串取出,并设置事件时间为kafka的时间。 在进行一次消息转换后,才能获取到数据中的时间。所以我需要在flatmap以后才能正确的设置event-time 但我又需要kafka的精确一次特性。 所以问题转变为如下的问题,我能在流中重设eventTime吗?但对最初的流(kafka提交)不影响。 所以也就是之前提到的问题。 env.addSource(flinkKafkaConsumer).assignTimestampsAndWatermarks(...) env.addSource(flinkKafkaConsumer).flatMap(...).assignTimestampsAndWatermarks(...); 更进一步,我能向下图一样,在不同的流过程中,设置不同的eventTime吗?并且他们能够实现整体提交。 ------------------ 原始邮件 ------------------ 发件人: "LakeShen"<[hidden email]>; 发送时间: 2019年12月18日(星期三) 下午2:10 收件人: "user-zh"<[hidden email]>; 主题: Re: 关于直接设置Watermark和flatmap后再设置的疑问 flatmap 逻辑中,你是否对消息记录的时间处理了吗,watermark 的更新的逻辑是比前一次 watermark 的时间截要大同时非空。 猫猫 <[hidden email]> 于2019年12月18日周三 上午9:27写道: > env.addSource(flinkKafkaConsumer).assignTimestampsAndWatermarks(...) > > env.addSource(flinkKafkaConsumer).flatMap(...).assignTimestampsAndWatermarks(...); > > 使用kafka进行消费,直接设置Watermark和经过flatMap()以后再设置,会产生什么样的区别和影响。 > flatMap可能会将数据处理为1-N条。那么在这种情况下,还能够保证kafka的精确一次吗? |
kafka的exactly once是通过checkpoint机制保存消费位点来保证的,和event time没关系。在进入时间窗口前提取event
time和设定watermark即可。 On Wed, Dec 18, 2019 at 4:12 PM 猫猫 <[hidden email]> wrote: > 图片不能粘贴,放到github上面了。 > https://github.com/maobuji/Blog/blob/master/image/changeEventTime.jpg > > > > > ------------------ 原始邮件 ------------------ > 发件人: "猫猫"<[hidden email]>; > 发送时间: 2019年12月18日(星期三) 下午4:03 > 收件人: "user-zh"<[hidden email]>; > > 主题: 回复: 关于直接设置Watermark和flatmap后再设置的疑问 > > > > 可能是我说的不太明确,我理解事件时间在整个流系统下只有一个。 > 但是读kafka时,我无法直接拿到业务数据中的event-time。因为数据可能会被拆分为多条。 > 我只能当做字符串取出,并设置事件时间为kafka的时间。 > > > 在进行一次消息转换后,才能获取到数据中的时间。所以我需要在flatmap以后才能正确的设置event-time > 但我又需要kafka的精确一次特性。 > > 所以问题转变为如下的问题,我能在流中重设eventTime吗?但对最初的流(kafka提交)不影响。 > 所以也就是之前提到的问题。 > env.addSource(flinkKafkaConsumer).assignTimestampsAndWatermarks(...) > > env.addSource(flinkKafkaConsumer).flatMap(...).assignTimestampsAndWatermarks(...); > > > 更进一步,我能向下图一样,在不同的流过程中,设置不同的eventTime吗?并且他们能够实现整体提交。 > > > > > > ------------------ 原始邮件 ------------------ > 发件人: "LakeShen"<[hidden email]>; > 发送时间: 2019年12月18日(星期三) 下午2:10 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: 关于直接设置Watermark和flatmap后再设置的疑问 > > > > flatmap 逻辑中,你是否对消息记录的时间处理了吗,watermark 的更新的逻辑是比前一次 watermark 的时间截要大同时非空。 > > 猫猫 <[hidden email]> 于2019年12月18日周三 上午9:27写道: > > > env.addSource(flinkKafkaConsumer).assignTimestampsAndWatermarks(...) > > > > > env.addSource(flinkKafkaConsumer).flatMap(...).assignTimestampsAndWatermarks(...); > > > > 使用kafka进行消费,直接设置Watermark和经过flatMap()以后再设置,会产生什么样的区别和影响。 > > flatMap可能会将数据处理为1-N条。那么在这种情况下,还能够保证kafka的精确一次吗? -- Regards, DinoZhang |
In reply to this post by 朱广彬
谢谢回复,有了schema registry url为何还需要填subject和avroSchemaStr呢?
朱广彬 <[hidden email]> 于2019年12月18日周三 上午10:30写道: > Hi 陈帅, > > 目前社区确实不支持confluent schema registry的avro格式,我们内部也是依赖schema registry来做avro > schema的管理,所以,我们改动了flink-avro 的源码来支持。 > > 主要涉及到这些地方: > > org.apache.flink.formats.avro.{AvroRowFormatFactory,AvroRowDeserializationSchema,AvroRowSerializationSchema} > 和org.apache.flink.table.descriptors.{Avro,AvroValidator} > > 使用时在构建Avro时指定以下三个参数即可(见标红部分): > > tableEnv.connect( > new Kafka() > .version("universal") > .topic(topic) > .properties(props) > ).withFormat( > new Avro() > .useRegistry(true) > .registryUrl(KAFKA_SCHEMA_REGISTRY_URL_ADDRESS) > .registrySubject(subject) > .avroSchema(avroSchemaStr) > ) > > > 陈帅 <[hidden email]> 于2019年12月18日周三 上午8:26写道: > > > > flink sql是否能够支持将confluent schema registry注册的一个avro数据格式 的topic注册成一张table? > |
Hi 陈帅,
这是一个非常合理的需求。我们需要开发一个 Flink ConfluentSchemaRegistryCatalog 完成元数据的获取。社区希望的用户体验是用户只需要给出confluent schema registry的链接,Flink SQL可以通过 ConfluentSchemaRegistryCatalog自动获取读写所需的信息,不再需要用户手动写DDL和format。 社区内部已经开始讨论了,我们应该会在1.11中完成,请关注 https://issues.apache.org/jira/browse/FLINK-12256 On Wed, Dec 18, 2019 at 6:46 AM 陈帅 <[hidden email]> wrote: > 谢谢回复,有了schema registry url为何还需要填subject和avroSchemaStr呢? > > 朱广彬 <[hidden email]> 于2019年12月18日周三 上午10:30写道: > > > Hi 陈帅, > > > > 目前社区确实不支持confluent schema registry的avro格式,我们内部也是依赖schema registry来做avro > > schema的管理,所以,我们改动了flink-avro 的源码来支持。 > > > > 主要涉及到这些地方: > > > > > org.apache.flink.formats.avro.{AvroRowFormatFactory,AvroRowDeserializationSchema,AvroRowSerializationSchema} > > 和org.apache.flink.table.descriptors.{Avro,AvroValidator} > > > > 使用时在构建Avro时指定以下三个参数即可(见标红部分): > > > > tableEnv.connect( > > new Kafka() > > .version("universal") > > .topic(topic) > > .properties(props) > > ).withFormat( > > new Avro() > > .useRegistry(true) > > .registryUrl(KAFKA_SCHEMA_REGISTRY_URL_ADDRESS) > > .registrySubject(subject) > > .avroSchema(avroSchemaStr) > > ) > > > > > > 陈帅 <[hidden email]> 于2019年12月18日周三 上午8:26写道: > > > > > > flink sql是否能够支持将confluent schema registry注册的一个avro数据格式 > 的topic注册成一张table? > > > |
Free forum by Nabble | Edit this page |