jdbc sink无法插入数据

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

jdbc sink无法插入数据

Lucas
Hi,

我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下
        .process(new ProcessFunction<RatioValue, RatioValue>() {
            @Override
            public void processElement(RatioValuevalue, Context ctx, Collector<RatioValue> out) throws Exception {
                out.collect(value);
                ctx.output(ratioOutputTag, value);
            }
        });
sideStream.addSink(new FlinkKafkaProducer<>(
        "ratio_value",
        new RatioValueSerializationSchema(suffix),
        PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL), tool.get(SCHEMA_REGISTRY_URL)),
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
DataStream<RatioValue> ratioSideStream = sideStream.getSideOutput(ratioOutputTag);
ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。
用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。
想问下这种情况是否有什么排查手段?


[hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: jdbc sink无法插入数据

r pp
一个朴素的思路,数据量是多少,有没有考虑到数据库的写入压力呢?
去掉kafka sink ,看下 写入效果。
再对比下 加入kafka 后的效果。

一个通道,连接了两个sink,一个落水快,一个落水慢。落水快的很快消化了,落水慢 可能无法控速,就跪了, 进而导致整个通道 跪了

[hidden email] <[hidden email]> 于2020年12月18日周五 下午2:01写道:

> Hi,
>
> 我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下
>         .process(new ProcessFunction<RatioValue, RatioValue>() {
>             @Override
>             public void processElement(RatioValuevalue, Context ctx,
> Collector<RatioValue> out) throws Exception {
>                 out.collect(value);
>                 ctx.output(ratioOutputTag, value);
>             }
>         });
> sideStream.addSink(new FlinkKafkaProducer<>(
>         "ratio_value",
>         new RatioValueSerializationSchema(suffix),
>         PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
> tool.get(SCHEMA_REGISTRY_URL)),
>         FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
> DataStream<RatioValue> ratioSideStream =
> sideStream.getSideOutput(ratioOutputTag);
> ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
> 在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。
> 用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。
> 想问下这种情况是否有什么排查手段?
>
>
> [hidden email]
>
Reply | Threaded
Open this post in threaded view
|

Re: jdbc sink无法插入数据

nobleyd
Hi,你这个绕太多弯路了吧。
Flink任务构建的就是一个DAG,本身一个DataStream就可以分拆出多条数据流,不需要sideoutput。
SideOutput的作用是当你需要根据“一定逻辑”输出2类不同结果时使用,你这里是相同的一份数据输出到kafka和mysql,是不需要sideoutput的。这样只会多了一个流程,影响性能。
按照你的代码,应该如下写:

sideStream.addSink(new FlinkKafkaProducer<>(
        "ratio_value",
        new RatioValueSerializationSchema(suffix),
        PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
tool.get(SCHEMA_REGISTRY_URL)),
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

sideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));

如上,针对sideStream直接添加2个sink即可。


r pp <[hidden email]> 于2020年12月19日周六 下午12:15写道:

> 一个朴素的思路,数据量是多少,有没有考虑到数据库的写入压力呢?
> 去掉kafka sink ,看下 写入效果。
> 再对比下 加入kafka 后的效果。
>
> 一个通道,连接了两个sink,一个落水快,一个落水慢。落水快的很快消化了,落水慢 可能无法控速,就跪了, 进而导致整个通道 跪了
>
> [hidden email] <[hidden email]> 于2020年12月18日周五 下午2:01写道:
>
> > Hi,
> >
> > 我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下
> >         .process(new ProcessFunction<RatioValue, RatioValue>() {
> >             @Override
> >             public void processElement(RatioValuevalue, Context ctx,
> > Collector<RatioValue> out) throws Exception {
> >                 out.collect(value);
> >                 ctx.output(ratioOutputTag, value);
> >             }
> >         });
> > sideStream.addSink(new FlinkKafkaProducer<>(
> >         "ratio_value",
> >         new RatioValueSerializationSchema(suffix),
> >         PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
> > tool.get(SCHEMA_REGISTRY_URL)),
> >         FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
> > DataStream<RatioValue> ratioSideStream =
> > sideStream.getSideOutput(ratioOutputTag);
> >
> ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
> > 在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。
> > 用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。
> > 想问下这种情况是否有什么排查手段?
> >
> >
> > [hidden email]
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: jdbc sink无法插入数据

Lucas
确实可行,多谢指点。



[hidden email]
 
发件人: 赵一旦
发送时间: 2020-12-20 23:24
收件人: user-zh
主题: Re: jdbc sink无法插入数据
Hi,你这个绕太多弯路了吧。
Flink任务构建的就是一个DAG,本身一个DataStream就可以分拆出多条数据流,不需要sideoutput。
SideOutput的作用是当你需要根据“一定逻辑”输出2类不同结果时使用,你这里是相同的一份数据输出到kafka和mysql,是不需要sideoutput的。这样只会多了一个流程,影响性能。
按照你的代码,应该如下写:
 
sideStream.addSink(new FlinkKafkaProducer<>(
        "ratio_value",
        new RatioValueSerializationSchema(suffix),
        PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
tool.get(SCHEMA_REGISTRY_URL)),
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
 
sideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
 
如上,针对sideStream直接添加2个sink即可。
 
 
r pp <[hidden email]> 于2020年12月19日周六 下午12:15写道:
 

> 一个朴素的思路,数据量是多少,有没有考虑到数据库的写入压力呢?
> 去掉kafka sink ,看下 写入效果。
> 再对比下 加入kafka 后的效果。
>
> 一个通道,连接了两个sink,一个落水快,一个落水慢。落水快的很快消化了,落水慢 可能无法控速,就跪了, 进而导致整个通道 跪了
>
> [hidden email] <[hidden email]> 于2020年12月18日周五 下午2:01写道:
>
> > Hi,
> >
> > 我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下
> >         .process(new ProcessFunction<RatioValue, RatioValue>() {
> >             @Override
> >             public void processElement(RatioValuevalue, Context ctx,
> > Collector<RatioValue> out) throws Exception {
> >                 out.collect(value);
> >                 ctx.output(ratioOutputTag, value);
> >             }
> >         });
> > sideStream.addSink(new FlinkKafkaProducer<>(
> >         "ratio_value",
> >         new RatioValueSerializationSchema(suffix),
> >         PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
> > tool.get(SCHEMA_REGISTRY_URL)),
> >         FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
> > DataStream<RatioValue> ratioSideStream =
> > sideStream.getSideOutput(ratioOutputTag);
> >
> ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
> > 在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。
> > 用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。
> > 想问下这种情况是否有什么排查手段?
> >
> >
> > [hidden email]
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: jdbc sink无法插入数据

Lucas
In reply to this post by nobleyd
原因找到了,因为在JdbcSink.sink里使用了默认的JdbcExecutionOptions,里面默认的batchSize是5000,就是说数据量到这个数了再批量入库。
因为这个任务数据量少,很久才能凑到5000条,导致看起来数据没有入库的假象。



[hidden email]
 
发件人: [hidden email]
发送时间: 2020-12-21 09:05
收件人: user-zh
主题: Re: Re: jdbc sink无法插入数据
确实可行,多谢指点。
 
 
 
[hidden email]
发件人: 赵一旦
发送时间: 2020-12-20 23:24
收件人: user-zh
主题: Re: jdbc sink无法插入数据
Hi,你这个绕太多弯路了吧。
Flink任务构建的就是一个DAG,本身一个DataStream就可以分拆出多条数据流,不需要sideoutput。
SideOutput的作用是当你需要根据“一定逻辑”输出2类不同结果时使用,你这里是相同的一份数据输出到kafka和mysql,是不需要sideoutput的。这样只会多了一个流程,影响性能。
按照你的代码,应该如下写:
sideStream.addSink(new FlinkKafkaProducer<>(
        "ratio_value",
        new RatioValueSerializationSchema(suffix),
        PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
tool.get(SCHEMA_REGISTRY_URL)),
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
sideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
如上,针对sideStream直接添加2个sink即可。
r pp <[hidden email]> 于2020年12月19日周六 下午12:15写道:

> 一个朴素的思路,数据量是多少,有没有考虑到数据库的写入压力呢?
> 去掉kafka sink ,看下 写入效果。
> 再对比下 加入kafka 后的效果。
>
> 一个通道,连接了两个sink,一个落水快,一个落水慢。落水快的很快消化了,落水慢 可能无法控速,就跪了, 进而导致整个通道 跪了
>
> [hidden email] <[hidden email]> 于2020年12月18日周五 下午2:01写道:
>
> > Hi,
> >
> > 我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下
> >         .process(new ProcessFunction<RatioValue, RatioValue>() {
> >             @Override
> >             public void processElement(RatioValuevalue, Context ctx,
> > Collector<RatioValue> out) throws Exception {
> >                 out.collect(value);
> >                 ctx.output(ratioOutputTag, value);
> >             }
> >         });
> > sideStream.addSink(new FlinkKafkaProducer<>(
> >         "ratio_value",
> >         new RatioValueSerializationSchema(suffix),
> >         PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
> > tool.get(SCHEMA_REGISTRY_URL)),
> >         FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
> > DataStream<RatioValue> ratioSideStream =
> > sideStream.getSideOutput(ratioOutputTag);
> >
> ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
> > 在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。
> > 用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。
> > 想问下这种情况是否有什么排查手段?
> >
> >
> > [hidden email]
> >
>