Hi,
我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下 .process(new ProcessFunction<RatioValue, RatioValue>() { @Override public void processElement(RatioValuevalue, Context ctx, Collector<RatioValue> out) throws Exception { out.collect(value); ctx.output(ratioOutputTag, value); } }); sideStream.addSink(new FlinkKafkaProducer<>( "ratio_value", new RatioValueSerializationSchema(suffix), PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL), tool.get(SCHEMA_REGISTRY_URL)), FlinkKafkaProducer.Semantic.EXACTLY_ONCE)); DataStream<RatioValue> ratioSideStream = sideStream.getSideOutput(ratioOutputTag); ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool)); 在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。 用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。 想问下这种情况是否有什么排查手段? [hidden email] |
一个朴素的思路,数据量是多少,有没有考虑到数据库的写入压力呢?
去掉kafka sink ,看下 写入效果。 再对比下 加入kafka 后的效果。 一个通道,连接了两个sink,一个落水快,一个落水慢。落水快的很快消化了,落水慢 可能无法控速,就跪了, 进而导致整个通道 跪了 [hidden email] <[hidden email]> 于2020年12月18日周五 下午2:01写道: > Hi, > > 我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下 > .process(new ProcessFunction<RatioValue, RatioValue>() { > @Override > public void processElement(RatioValuevalue, Context ctx, > Collector<RatioValue> out) throws Exception { > out.collect(value); > ctx.output(ratioOutputTag, value); > } > }); > sideStream.addSink(new FlinkKafkaProducer<>( > "ratio_value", > new RatioValueSerializationSchema(suffix), > PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL), > tool.get(SCHEMA_REGISTRY_URL)), > FlinkKafkaProducer.Semantic.EXACTLY_ONCE)); > DataStream<RatioValue> ratioSideStream = > sideStream.getSideOutput(ratioOutputTag); > ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool)); > 在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。 > 用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。 > 想问下这种情况是否有什么排查手段? > > > [hidden email] > |
Hi,你这个绕太多弯路了吧。
Flink任务构建的就是一个DAG,本身一个DataStream就可以分拆出多条数据流,不需要sideoutput。 SideOutput的作用是当你需要根据“一定逻辑”输出2类不同结果时使用,你这里是相同的一份数据输出到kafka和mysql,是不需要sideoutput的。这样只会多了一个流程,影响性能。 按照你的代码,应该如下写: sideStream.addSink(new FlinkKafkaProducer<>( "ratio_value", new RatioValueSerializationSchema(suffix), PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL), tool.get(SCHEMA_REGISTRY_URL)), FlinkKafkaProducer.Semantic.EXACTLY_ONCE)); sideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool)); 如上,针对sideStream直接添加2个sink即可。 r pp <[hidden email]> 于2020年12月19日周六 下午12:15写道: > 一个朴素的思路,数据量是多少,有没有考虑到数据库的写入压力呢? > 去掉kafka sink ,看下 写入效果。 > 再对比下 加入kafka 后的效果。 > > 一个通道,连接了两个sink,一个落水快,一个落水慢。落水快的很快消化了,落水慢 可能无法控速,就跪了, 进而导致整个通道 跪了 > > [hidden email] <[hidden email]> 于2020年12月18日周五 下午2:01写道: > > > Hi, > > > > 我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下 > > .process(new ProcessFunction<RatioValue, RatioValue>() { > > @Override > > public void processElement(RatioValuevalue, Context ctx, > > Collector<RatioValue> out) throws Exception { > > out.collect(value); > > ctx.output(ratioOutputTag, value); > > } > > }); > > sideStream.addSink(new FlinkKafkaProducer<>( > > "ratio_value", > > new RatioValueSerializationSchema(suffix), > > PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL), > > tool.get(SCHEMA_REGISTRY_URL)), > > FlinkKafkaProducer.Semantic.EXACTLY_ONCE)); > > DataStream<RatioValue> ratioSideStream = > > sideStream.getSideOutput(ratioOutputTag); > > > ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool)); > > 在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。 > > 用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。 > > 想问下这种情况是否有什么排查手段? > > > > > > [hidden email] > > > |
确实可行,多谢指点。
[hidden email] 发件人: 赵一旦 发送时间: 2020-12-20 23:24 收件人: user-zh 主题: Re: jdbc sink无法插入数据 Hi,你这个绕太多弯路了吧。 Flink任务构建的就是一个DAG,本身一个DataStream就可以分拆出多条数据流,不需要sideoutput。 SideOutput的作用是当你需要根据“一定逻辑”输出2类不同结果时使用,你这里是相同的一份数据输出到kafka和mysql,是不需要sideoutput的。这样只会多了一个流程,影响性能。 按照你的代码,应该如下写: sideStream.addSink(new FlinkKafkaProducer<>( "ratio_value", new RatioValueSerializationSchema(suffix), PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL), tool.get(SCHEMA_REGISTRY_URL)), FlinkKafkaProducer.Semantic.EXACTLY_ONCE)); sideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool)); 如上,针对sideStream直接添加2个sink即可。 r pp <[hidden email]> 于2020年12月19日周六 下午12:15写道: > 一个朴素的思路,数据量是多少,有没有考虑到数据库的写入压力呢? > 去掉kafka sink ,看下 写入效果。 > 再对比下 加入kafka 后的效果。 > > 一个通道,连接了两个sink,一个落水快,一个落水慢。落水快的很快消化了,落水慢 可能无法控速,就跪了, 进而导致整个通道 跪了 > > [hidden email] <[hidden email]> 于2020年12月18日周五 下午2:01写道: > > > Hi, > > > > 我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下 > > .process(new ProcessFunction<RatioValue, RatioValue>() { > > @Override > > public void processElement(RatioValuevalue, Context ctx, > > Collector<RatioValue> out) throws Exception { > > out.collect(value); > > ctx.output(ratioOutputTag, value); > > } > > }); > > sideStream.addSink(new FlinkKafkaProducer<>( > > "ratio_value", > > new RatioValueSerializationSchema(suffix), > > PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL), > > tool.get(SCHEMA_REGISTRY_URL)), > > FlinkKafkaProducer.Semantic.EXACTLY_ONCE)); > > DataStream<RatioValue> ratioSideStream = > > sideStream.getSideOutput(ratioOutputTag); > > > ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool)); > > 在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。 > > 用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。 > > 想问下这种情况是否有什么排查手段? > > > > > > [hidden email] > > > |
In reply to this post by nobleyd
原因找到了,因为在JdbcSink.sink里使用了默认的JdbcExecutionOptions,里面默认的batchSize是5000,就是说数据量到这个数了再批量入库。
因为这个任务数据量少,很久才能凑到5000条,导致看起来数据没有入库的假象。 [hidden email] 发件人: [hidden email] 发送时间: 2020-12-21 09:05 收件人: user-zh 主题: Re: Re: jdbc sink无法插入数据 确实可行,多谢指点。 [hidden email] 发件人: 赵一旦 发送时间: 2020-12-20 23:24 收件人: user-zh 主题: Re: jdbc sink无法插入数据 Hi,你这个绕太多弯路了吧。 Flink任务构建的就是一个DAG,本身一个DataStream就可以分拆出多条数据流,不需要sideoutput。 SideOutput的作用是当你需要根据“一定逻辑”输出2类不同结果时使用,你这里是相同的一份数据输出到kafka和mysql,是不需要sideoutput的。这样只会多了一个流程,影响性能。 按照你的代码,应该如下写: sideStream.addSink(new FlinkKafkaProducer<>( "ratio_value", new RatioValueSerializationSchema(suffix), PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL), tool.get(SCHEMA_REGISTRY_URL)), FlinkKafkaProducer.Semantic.EXACTLY_ONCE)); sideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool)); 如上,针对sideStream直接添加2个sink即可。 r pp <[hidden email]> 于2020年12月19日周六 下午12:15写道: > 一个朴素的思路,数据量是多少,有没有考虑到数据库的写入压力呢? > 去掉kafka sink ,看下 写入效果。 > 再对比下 加入kafka 后的效果。 > > 一个通道,连接了两个sink,一个落水快,一个落水慢。落水快的很快消化了,落水慢 可能无法控速,就跪了, 进而导致整个通道 跪了 > > [hidden email] <[hidden email]> 于2020年12月18日周五 下午2:01写道: > > > Hi, > > > > 我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下 > > .process(new ProcessFunction<RatioValue, RatioValue>() { > > @Override > > public void processElement(RatioValuevalue, Context ctx, > > Collector<RatioValue> out) throws Exception { > > out.collect(value); > > ctx.output(ratioOutputTag, value); > > } > > }); > > sideStream.addSink(new FlinkKafkaProducer<>( > > "ratio_value", > > new RatioValueSerializationSchema(suffix), > > PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL), > > tool.get(SCHEMA_REGISTRY_URL)), > > FlinkKafkaProducer.Semantic.EXACTLY_ONCE)); > > DataStream<RatioValue> ratioSideStream = > > sideStream.getSideOutput(ratioOutputTag); > > > ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool)); > > 在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。 > > 用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。 > > 想问下这种情况是否有什么排查手段? > > > > > > [hidden email] > > > |
Free forum by Nabble | Edit this page |