回复:使用flink-sql实现mysql维表的join的ddl和dml的示列

classic Classic list List threaded Threaded
9 messages Options
Reply | Threaded
Open this post in threaded view
|

回复:使用flink-sql实现mysql维表的join的ddl和dml的示列

迎风浪子
select *, PROCTIME() as proctime from source 报异常SQL parse failed. Encountered ")" at ... 
PROCTIME ()这方法好像不支持。


 
---原始邮件---
发件人: "Jark Wu"<[hidden email]&gt;
发送时间: 2019年10月11日(星期五) 晚上7:23
收件人: "user-zh"<[hidden email]&gt;;
主题: Re: 使用flink-sql实现mysql维表的join的ddl和dml的示列


目前 o.proctime 在 DDL 暂时还声明不了,不可以可以使用嵌套 query 生成一个 proctime 字段。

"select ...
&nbsp;from (select *, PROCTIME() as proctime from source)"

On Fri, 11 Oct 2019 at 18:45, 迎风浪子 <[hidden email]&gt; wrote:

&gt; 请问sql ddl如何定义o.proctime呢?
&gt;
&gt;
&gt;
&gt; ---原始邮件---
&gt; 发件人: "Jark Wu"<[hidden email]&amp;gt;
&gt; 发送时间: 2019年9月29日(星期日) 中午11:42
&gt; 收件人: "user-zh"<[hidden email]&amp;gt;;
&gt; 主题: Re: 使用flink-sql实现mysql维表的join的ddl和dml的示列
&gt;
&gt;
&gt; Hi,
&gt;
&gt; mysql 维表的 DDL 和 源表/结果表 的 DDL 是一样的。 例如下面这个 DDL 的声明,rates 在 flink sql 中既可以作为
&gt; source 也可以是 sink,也可以是维表。
&gt;
&gt; CREATE TABLE rates (
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; currency VARCHAR,
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; rate BIGINT
&gt; ) WITH (
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.type' = 'jdbc', -- 使用 jdbc connector
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.url' =
&gt; 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.table' = 'rates', -- 表名
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.username' = 'root', -- 用户名
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.password' = '123456', -- 密码
&gt; )
&gt;
&gt; 如果要用它作为维表的话,那么需要用到 temporal join 的语法。如:
&gt;
&gt; SELECT
&gt; &amp;nbsp; o.amout, o.currency, r.rate, o.amount * r.rate
&gt; FROM
&gt; &amp;nbsp; Orders AS o
&gt; &amp;nbsp; JOIN rates FOR SYSTEM_TIME AS OF o.proctime AS r
&gt; &amp;nbsp; ON r.currency = o.currency
&gt;
&gt; &amp;nbsp;关于维表 join,可以查看官方文档:
&gt;
&gt; https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
&gt;
&gt;
&gt;
&gt;
&gt; On Fri, 27 Sep 2019 at 14:12, yelun <[hidden email]&amp;gt; wrote:
&gt;
&gt; &amp;gt; Hi,各位大佬:
&gt; &amp;gt;
&gt; &amp;gt; 有没有使用flink-sql实现mysql维表的join的ddl和dml的示列的demo能够参考一下,非常感谢。
Reply | Threaded
Open this post in threaded view
|

Re: 使用flink-sql实现mysql维表的join的ddl和dml的示列

yelun
我试了一下云邪大佬的方法是可以的

> 在 2019年10月12日,上午9:16,迎风浪子 <[hidden email]> 写道:
>
> select *, PROCTIME() as proctime from source 报异常SQL parse failed. Encountered ")" at ...&nbsp;
> PROCTIME ()这方法好像不支持。
>
>
>
> ---原始邮件---
> 发件人: "Jark Wu"<[hidden email]&gt;
> 发送时间: 2019年10月11日(星期五) 晚上7:23
> 收件人: "user-zh"<[hidden email]&gt;;
> 主题: Re: 使用flink-sql实现mysql维表的join的ddl和dml的示列
>
>
> 目前 o.proctime 在 DDL 暂时还声明不了,不可以可以使用嵌套 query 生成一个 proctime 字段。
>
> "select ...
> &nbsp;from (select *, PROCTIME() as proctime from source)"
>
> On Fri, 11 Oct 2019 at 18:45, 迎风浪子 <[hidden email]&gt; wrote:
>
> &gt; 请问sql ddl如何定义o.proctime呢?
> &gt;
> &gt;
> &gt;
> &gt; ---原始邮件---
> &gt; 发件人: "Jark Wu"<[hidden email]&amp;gt;
> &gt; 发送时间: 2019年9月29日(星期日) 中午11:42
> &gt; 收件人: "user-zh"<[hidden email]&amp;gt;;
> &gt; 主题: Re: 使用flink-sql实现mysql维表的join的ddl和dml的示列
> &gt;
> &gt;
> &gt; Hi,
> &gt;
> &gt; mysql 维表的 DDL 和 源表/结果表 的 DDL 是一样的。 例如下面这个 DDL 的声明,rates 在 flink sql 中既可以作为
> &gt; source 也可以是 sink,也可以是维表。
> &gt;
> &gt; CREATE TABLE rates (
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; currency VARCHAR,
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; rate BIGINT
> &gt; ) WITH (
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.type' = 'jdbc', -- 使用 jdbc connector
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.url' =
> &gt; 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.table' = 'rates', -- 表名
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.username' = 'root', -- 用户名
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.password' = '123456', -- 密码
> &gt; )
> &gt;
> &gt; 如果要用它作为维表的话,那么需要用到 temporal join 的语法。如:
> &gt;
> &gt; SELECT
> &gt; &amp;nbsp; o.amout, o.currency, r.rate, o.amount * r.rate
> &gt; FROM
> &gt; &amp;nbsp; Orders AS o
> &gt; &amp;nbsp; JOIN rates FOR SYSTEM_TIME AS OF o.proctime AS r
> &gt; &amp;nbsp; ON r.currency = o.currency
> &gt;
> &gt; &amp;nbsp;关于维表 join,可以查看官方文档:
> &gt;
> &gt; https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
> &gt;
> &gt;
> &gt;
> &gt;
> &gt; On Fri, 27 Sep 2019 at 14:12, yelun <[hidden email]&amp;gt; wrote:
> &gt;
> &gt; &amp;gt; Hi,各位大佬:
> &gt; &amp;gt;
> &gt; &amp;gt; 有没有使用flink-sql实现mysql维表的join的ddl和dml的示列的demo能够参考一下,非常感谢。

Reply | Threaded
Open this post in threaded view
|

Re: 使用flink-sql实现mysql维表的join的ddl和dml的示列

yelun
请问这种创建dimtable ddl方式在flink sql-client的可以写吗,或者在注册一个catalog,谢谢。

> 在 2019年10月12日,下午2:12,yelun <[hidden email]> 写道:
>
> 我试了一下云邪大佬的方法是可以的
>
>> 在 2019年10月12日,上午9:16,迎风浪子 <[hidden email]> 写道:
>>
>> select *, PROCTIME() as proctime from source 报异常SQL parse failed. Encountered ")" at ...&nbsp;
>> PROCTIME ()这方法好像不支持。
>>
>>
>>
>> ---原始邮件---
>> 发件人: "Jark Wu"<[hidden email]&gt;
>> 发送时间: 2019年10月11日(星期五) 晚上7:23
>> 收件人: "user-zh"<[hidden email]&gt;;
>> 主题: Re: 使用flink-sql实现mysql维表的join的ddl和dml的示列
>>
>>
>> 目前 o.proctime 在 DDL 暂时还声明不了,不可以可以使用嵌套 query 生成一个 proctime 字段。
>>
>> "select ...
>> &nbsp;from (select *, PROCTIME() as proctime from source)"
>>
>> On Fri, 11 Oct 2019 at 18:45, 迎风浪子 <[hidden email]&gt; wrote:
>>
>> &gt; 请问sql ddl如何定义o.proctime呢?
>> &gt;
>> &gt;
>> &gt;
>> &gt; ---原始邮件---
>> &gt; 发件人: "Jark Wu"<[hidden email]&amp;gt;
>> &gt; 发送时间: 2019年9月29日(星期日) 中午11:42
>> &gt; 收件人: "user-zh"<[hidden email]&amp;gt;;
>> &gt; 主题: Re: 使用flink-sql实现mysql维表的join的ddl和dml的示列
>> &gt;
>> &gt;
>> &gt; Hi,
>> &gt;
>> &gt; mysql 维表的 DDL 和 源表/结果表 的 DDL 是一样的。 例如下面这个 DDL 的声明,rates 在 flink sql 中既可以作为
>> &gt; source 也可以是 sink,也可以是维表。
>> &gt;
>> &gt; CREATE TABLE rates (
>> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; currency VARCHAR,
>> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; rate BIGINT
>> &gt; ) WITH (
>> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.type' = 'jdbc', -- 使用 jdbc connector
>> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.url' =
>> &gt; 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url
>> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.table' = 'rates', -- 表名
>> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.username' = 'root', -- 用户名
>> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.password' = '123456', -- 密码
>> &gt; )
>> &gt;
>> &gt; 如果要用它作为维表的话,那么需要用到 temporal join 的语法。如:
>> &gt;
>> &gt; SELECT
>> &gt; &amp;nbsp; o.amout, o.currency, r.rate, o.amount * r.rate
>> &gt; FROM
>> &gt; &amp;nbsp; Orders AS o
>> &gt; &amp;nbsp; JOIN rates FOR SYSTEM_TIME AS OF o.proctime AS r
>> &gt; &amp;nbsp; ON r.currency = o.currency
>> &gt;
>> &gt; &amp;nbsp;关于维表 join,可以查看官方文档:
>> &gt;
>> &gt; https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>> &gt;
>> &gt;
>> &gt;
>> &gt;
>> &gt; On Fri, 27 Sep 2019 at 14:12, yelun <[hidden email]&amp;gt; wrote:
>> &gt;
>> &gt; &amp;gt; Hi,各位大佬:
>> &gt; &amp;gt;
>> &gt; &amp;gt; 有没有使用flink-sql实现mysql维表的join的ddl和dml的示列的demo能够参考一下,非常感谢。
>
>



Reply | Threaded
Open this post in threaded view
|

Re: 使用flink-sql实现mysql维表的join的ddl和dml的示列

Jark
Administrator
抱歉,SQL CLI 中目前暂不支持 DDL



> 在 2019年10月12日,15:17,yelun <[hidden email]> 写道:
>
> 请问这种创建dimtable ddl方式在flink sql-client的可以写吗,或者在注册一个catalog,谢谢。
>
>> 在 2019年10月12日,下午2:12,yelun <[hidden email]> 写道:
>>
>> 我试了一下云邪大佬的方法是可以的
>>
>>> 在 2019年10月12日,上午9:16,迎风浪子 <[hidden email]> 写道:
>>>
>>> select *, PROCTIME() as proctime from source 报异常SQL parse failed. Encountered ")" at ...&nbsp;
>>> PROCTIME ()这方法好像不支持。
>>>
>>>
>>>
>>> ---原始邮件---
>>> 发件人: "Jark Wu"<[hidden email]&gt;
>>> 发送时间: 2019年10月11日(星期五) 晚上7:23
>>> 收件人: "user-zh"<[hidden email]&gt;;
>>> 主题: Re: 使用flink-sql实现mysql维表的join的ddl和dml的示列
>>>
>>>
>>> 目前 o.proctime 在 DDL 暂时还声明不了,不可以可以使用嵌套 query 生成一个 proctime 字段。
>>>
>>> "select ...
>>> &nbsp;from (select *, PROCTIME() as proctime from source)"
>>>
>>> On Fri, 11 Oct 2019 at 18:45, 迎风浪子 <[hidden email]&gt; wrote:
>>>
>>> &gt; 请问sql ddl如何定义o.proctime呢?
>>> &gt;
>>> &gt;
>>> &gt;
>>> &gt; ---原始邮件---
>>> &gt; 发件人: "Jark Wu"<[hidden email]&amp;gt;
>>> &gt; 发送时间: 2019年9月29日(星期日) 中午11:42
>>> &gt; 收件人: "user-zh"<[hidden email]&amp;gt;;
>>> &gt; 主题: Re: 使用flink-sql实现mysql维表的join的ddl和dml的示列
>>> &gt;
>>> &gt;
>>> &gt; Hi,
>>> &gt;
>>> &gt; mysql 维表的 DDL 和 源表/结果表 的 DDL 是一样的。 例如下面这个 DDL 的声明,rates 在 flink sql 中既可以作为
>>> &gt; source 也可以是 sink,也可以是维表。
>>> &gt;
>>> &gt; CREATE TABLE rates (
>>> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; currency VARCHAR,
>>> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; rate BIGINT
>>> &gt; ) WITH (
>>> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.type' = 'jdbc', -- 使用 jdbc connector
>>> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.url' =
>>> &gt; 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url
>>> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.table' = 'rates', -- 表名
>>> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.username' = 'root', -- 用户名
>>> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 'connector.password' = '123456', -- 密码
>>> &gt; )
>>> &gt;
>>> &gt; 如果要用它作为维表的话,那么需要用到 temporal join 的语法。如:
>>> &gt;
>>> &gt; SELECT
>>> &gt; &amp;nbsp; o.amout, o.currency, r.rate, o.amount * r.rate
>>> &gt; FROM
>>> &gt; &amp;nbsp; Orders AS o
>>> &gt; &amp;nbsp; JOIN rates FOR SYSTEM_TIME AS OF o.proctime AS r
>>> &gt; &amp;nbsp; ON r.currency = o.currency
>>> &gt;
>>> &gt; &amp;nbsp;关于维表 join,可以查看官方文档:
>>> &gt;
>>> &gt; https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>>> &gt;
>>> &gt;
>>> &gt;
>>> &gt;
>>> &gt; On Fri, 27 Sep 2019 at 14:12, yelun <[hidden email]&amp;gt; wrote:
>>> &gt;
>>> &gt; &amp;gt; Hi,各位大佬:
>>> &gt; &amp;gt;
>>> &gt; &amp;gt; 有没有使用flink-sql实现mysql维表的join的ddl和dml的示列的demo能够参考一下,非常感谢。
>>
>>
>
>
>

Reply | Threaded
Open this post in threaded view
|

flink 1.6.1 RichAsyncFunction 异步请求外部系统导致下游数据大量重复

曾耀武
In reply to this post by yelun
大家好,


请教大家一个问题, 我在使用flink 1.6 版本去用异步的方式
访问外部系统的时候发现下游的数据,本来应该只有一条的,结果被重复几十条。


我的异步Function 片段如下。


private static class SimpleAsyncFunction extends RichAsyncFunction<String, String> {
    private static final long serialVersionUID = 2098635244857937717L;

    private transient ExecutorService executorService;
    private  transient Client client ;

    private final long shutdownWaitTS=1000;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        executorService = new ThreadPoolExecutor(10,30,60,TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());
         client= new Client();
    }

    @Override
    public void close() throws Exception {
        super.close();
        ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService);
    }

    @Override
    public void asyncInvoke(final String jsonStr, final ResultFuture<String> resultFuture) {
          result = client.predict(jsonStr);          resultFuture.complete(Collections.singletonList(result));}}
------------------------------
dag构建部分为:
AsyncFunction<String, String> nlpMoaAsyncFunction = new SimpleAsyncFunction();

DataStream<String> source = env.addSource(flinkKafkaConsumer010);

DataStream<String> nlpResult = AsyncDataStream.unorderedWait(
        source,
        nlpMoaAsyncFunction,
        timeout,
        TimeUnit.MILLISECONDS,
        30);

FlinkKafkaProducer010<String> kafkaProducer = new FlinkKafkaProducer010<String>(
        ImmutableConstant.NLP_RESULT_KAFKA_TOPIC,
        new SimpleStringSchema(),
        producerProp
);

nlpResult.addSink(kafkaProducer);

-------------------------整个逻辑其实很简单,程序运行在yarn 上10个taskmanager ,每个tm 一个slot。 有没有哪个碰到类似的问题,还是是bug 或者是我的实现有问题?
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.6.1 RichAsyncFunction 异步请求外部系统导致下游数据大量重复

Jark
Administrator
Hi,

从代码上看,并没有发现什么问题,给一些排查建议:
1)作业在这期间有 failover 过吗?Flink 写入 Kafka 目前默认不是端到端exactly once的。
2) “重复几十条”是怎么判断出来的呢? 会不会 kafka 源中的数据有重复呢?

Best,
Jark

On Sun, 13 Oct 2019 at 12:55, 曾耀武 <[hidden email]> wrote:

> 大家好,
>
>
> 请教大家一个问题, 我在使用flink 1.6 版本去用异步的方式
> 访问外部系统的时候发现下游的数据,本来应该只有一条的,结果被重复几十条。
>
>
> 我的异步Function 片段如下。
>
>
> private static class SimpleAsyncFunction extends RichAsyncFunction<String,
> String> {
>     private static final long serialVersionUID = 2098635244857937717L;
>
>     private transient ExecutorService executorService;
>     private  transient Client client ;
>
>     private final long shutdownWaitTS=1000;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>
>         executorService = new ThreadPoolExecutor(10,30,60,TimeUnit.SECONDS,
>                 new LinkedBlockingQueue<>(1000),
>                 new ThreadPoolExecutor.CallerRunsPolicy());
>          client= new Client();
>     }
>
>     @Override
>     public void close() throws Exception {
>         super.close();
>         ExecutorUtils.gracefulShutdown(shutdownWaitTS,
> TimeUnit.MILLISECONDS, executorService);
>     }
>
>     @Override
>     public void asyncInvoke(final String jsonStr, final
> ResultFuture<String> resultFuture) {
>           result = client.predict(jsonStr);
> resultFuture.complete(Collections.singletonList(result));}}
> ------------------------------
> dag构建部分为:
> AsyncFunction<String, String> nlpMoaAsyncFunction = new
> SimpleAsyncFunction();
>
> DataStream<String> source = env.addSource(flinkKafkaConsumer010);
>
> DataStream<String> nlpResult = AsyncDataStream.unorderedWait(
>         source,
>         nlpMoaAsyncFunction,
>         timeout,
>         TimeUnit.MILLISECONDS,
>         30);
>
> FlinkKafkaProducer010<String> kafkaProducer = new
> FlinkKafkaProducer010<String>(
>         ImmutableConstant.NLP_RESULT_KAFKA_TOPIC,
>         new SimpleStringSchema(),
>         producerProp
> );
>
> nlpResult.addSink(kafkaProducer);
>
> -------------------------整个逻辑其实很简单,程序运行在yarn 上10个taskmanager ,每个tm 一个slot。
> 有没有哪个碰到类似的问题,还是是bug 或者是我的实现有问题?
Reply | Threaded
Open this post in threaded view
|

回复: flink 1.6.1 RichAsyncFunction 异步请求外部系统导致下游数据大量重复

曾耀武

作业是没有failover 。执行是正常的。但是官方文档上不是说那个是保证exactly-once么?
我稍微改了一下代码,还是有同样的重复问题,好像在new task 的时候被重复new 了多次。这个不知道是为何,
而且是在同一个slot里面。

异步函数:
然后我在执行的时候,往kafka 打了一条数据,在yarn 上起了两个  TM,每个TM 一个slot

然后在一个TM 上打印了这个日志: 我的日志是有编号的[0]  这个,一条输入是一个需要外部检测的json。

===create new task :{"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"}
====processing asr data: {"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"}
=== the moa resonse is: {result={s_id=userid, hit_details=操你, words=操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47, status=1, hit_logic=basePoliticsWords}, em=OK, ec=0}
===create new task :{"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"}
====processing asr data: {"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"}
=== the moa resonse is: {result={s_id=userid, hit_details=操你, words=操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47, status=1, hit_logic=basePoliticsWords}, em=OK, ec=0}


有时候这些记录在flink 里经过处理之后, 日志打印了两边,好像是处理了两遍,但是 写到下游的kafka 可能只有1个。
理论期望值是一条输入对应一条输出。

但是在一百万的测试数据上,上游发送肯定是只发送了100万条,带编号递增的。 但是下游去重之后是100万,不去重就有一百多万,可能
会多好几万的样子。比较奇怪。





------------------ 原始邮件 ------------------
发件人: "Jark Wu"<[hidden email]>;
发送时间: 2019年10月14日(星期一) 中午11:23
收件人: "user-zh"<[hidden email]>;
主题: Re: flink 1.6.1 RichAsyncFunction 异步请求外部系统导致下游数据大量重复

Hi,

从代码上看,并没有发现什么问题,给一些排查建议:
1)作业在这期间有 failover 过吗?Flink 写入 Kafka 目前默认不是端到端exactly once的。
2) “重复几十条”是怎么判断出来的呢? 会不会 kafka 源中的数据有重复呢?

Best,
Jark

On Sun, 13 Oct 2019 at 12:55, 曾耀武 <[hidden email]> wrote:

> 大家好,
>
>
> 请教大家一个问题, 我在使用flink 1.6 版本去用异步的方式
> 访问外部系统的时候发现下游的数据,本来应该只有一条的,结果被重复几十条。
>
>
> 我的异步Function 片段如下。
>
>
> private static class SimpleAsyncFunction extends RichAsyncFunction<String,
> String> {
>     private static final long serialVersionUID = 2098635244857937717L;
>
>     private transient ExecutorService executorService;
>     private  transient Client client ;
>
>     private final long shutdownWaitTS=1000;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>
>         executorService = new ThreadPoolExecutor(10,30,60,TimeUnit.SECONDS,
>                 new LinkedBlockingQueue<>(1000),
>                 new ThreadPoolExecutor.CallerRunsPolicy());
>          client= new Client();
>     }
>
>     @Override
>     public void close() throws Exception {
>         super.close();
>         ExecutorUtils.gracefulShutdown(shutdownWaitTS,
> TimeUnit.MILLISECONDS, executorService);
>     }
>
>     @Override
>     public void asyncInvoke(final String jsonStr, final
> ResultFuture<String> resultFuture) {
>           result = client.predict(jsonStr);
> resultFuture.complete(Collections.singletonList(result));}}
> ------------------------------
> dag构建部分为:
> AsyncFunction<String, String> nlpMoaAsyncFunction = new
> SimpleAsyncFunction();
>
> DataStream<String> source = env.addSource(flinkKafkaConsumer010);
>
> DataStream<String> nlpResult = AsyncDataStream.unorderedWait(
>         source,
>         nlpMoaAsyncFunction,
>         timeout,
>         TimeUnit.MILLISECONDS,
>         30);
>
> FlinkKafkaProducer010<String> kafkaProducer = new
> FlinkKafkaProducer010<String>(
>         ImmutableConstant.NLP_RESULT_KAFKA_TOPIC,
>         new SimpleStringSchema(),
>         producerProp
> );
>
> nlpResult.addSink(kafkaProducer);
>
> -------------------------整个逻辑其实很简单,程序运行在yarn 上10个taskmanager ,每个tm 一个slot。
> 有没有哪个碰到类似的问题,还是是bug 或者是我的实现有问题?
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.6.1 RichAsyncFunction 异步请求外部系统导致下游数据大量重复

Kurt Young
这位朋友你的测试数据文明一点呀。。
ps:图片看不到

Best,
Kurt


On Mon, Oct 14, 2019 at 12:11 PM 曾耀武 <[hidden email]> wrote:

>
> 作业是没有failover 。执行是正常的。但是官方文档上不是说那个是保证exactly-once么?
> 我稍微改了一下代码,还是有同样的重复问题,好像在new task 的时候被重复new 了多次。这个不知道是为何,
> 而且是在同一个slot里面。
>
> 异步函数:
> 然后我在执行的时候,往kafka 打了一条数据,在yarn 上起了两个  TM,每个TM 一个slot
>
> 然后在一个TM 上打印了这个日志: 我的日志是有编号的[0]  这个,一条输入是一个需要外部检测的json。
>
> ===create new task :{"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"}
> ====processing asr data: {"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"}
> === the moa resonse is: {result={s_id=userid, hit_details=操你, words=操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47, status=1, hit_logic=basePoliticsWords}, em=OK, ec=0}
> ===create new task :{"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"}
> ====processing asr data: {"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"}
> === the moa resonse is: {result={s_id=userid, hit_details=操你, words=操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47, status=1, hit_logic=basePoliticsWords}, em=OK, ec=0}
>
>
>
> 有时候这些记录在flink 里经过处理之后, 日志打印了两边,好像是处理了两遍,但是 写到下游的kafka 可能只有1个。
> 理论期望值是一条输入对应一条输出。
>
> 但是在一百万的测试数据上,上游发送肯定是只发送了100万条,带编号递增的。 但是下游去重之后是100万,不去重就有一百多万,可能
> 会多好几万的样子。比较奇怪。
>
>
>
>
>
> ------------------ 原始邮件 ------------------
> *发件人:* "Jark Wu"<[hidden email]>;
> *发送时间:* 2019年10月14日(星期一) 中午11:23
> *收件人:* "user-zh"<[hidden email]>;
> *主题:* Re: flink 1.6.1 RichAsyncFunction 异步请求外部系统导致下游数据大量重复
>
> Hi,
>
> 从代码上看,并没有发现什么问题,给一些排查建议:
> 1)作业在这期间有 failover 过吗?Flink 写入 Kafka 目前默认不是端到端exactly once的。
> 2) “重复几十条”是怎么判断出来的呢? 会不会 kafka 源中的数据有重复呢?
>
> Best,
> Jark
>
> On Sun, 13 Oct 2019 at 12:55, 曾耀武 <[hidden email]> wrote:
>
> > 大家好,
> >
> >
> > 请教大家一个问题, 我在使用flink 1.6 版本去用异步的方式
> > 访问外部系统的时候发现下游的数据,本来应该只有一条的,结果被重复几十条。
> >
> >
> > 我的异步Function 片段如下。
> >
> >
> > private static class SimpleAsyncFunction extends
> RichAsyncFunction<String,
> > String> {
> >     private static final long serialVersionUID = 2098635244857937717L;
> >
> >     private transient ExecutorService executorService;
> >     private  transient Client client ;
> >
> >     private final long shutdownWaitTS=1000;
> >
> >     @Override
> >     public void open(Configuration parameters) throws Exception {
> >         super.open(parameters);
> >
> >         executorService = new
> ThreadPoolExecutor(10,30,60,TimeUnit.SECONDS,
> >                 new LinkedBlockingQueue<>(1000),
> >                 new ThreadPoolExecutor.CallerRunsPolicy());
> >          client= new Client();
> >     }
> >
> >     @Override
> >     public void close() throws Exception {
> >         super.close();
> >         ExecutorUtils.gracefulShutdown(shutdownWaitTS,
> > TimeUnit.MILLISECONDS, executorService);
> >     }
> >
> >     @Override
> >     public void asyncInvoke(final String jsonStr, final
> > ResultFuture<String> resultFuture) {
> >           result = client.predict(jsonStr);
> > resultFuture.complete(Collections.singletonList(result));}}
> > ------------------------------
> > dag构建部分为:
> > AsyncFunction<String, String> nlpMoaAsyncFunction = new
> > SimpleAsyncFunction();
> >
> > DataStream<String> source = env.addSource(flinkKafkaConsumer010);
> >
> > DataStream<String> nlpResult = AsyncDataStream.unorderedWait(
> >         source,
> >         nlpMoaAsyncFunction,
> >         timeout,
> >         TimeUnit.MILLISECONDS,
> >         30);
> >
> > FlinkKafkaProducer010<String> kafkaProducer = new
> > FlinkKafkaProducer010<String>(
> >         ImmutableConstant.NLP_RESULT_KAFKA_TOPIC,
> >         new SimpleStringSchema(),
> >         producerProp
> > );
> >
> > nlpResult.addSink(kafkaProducer);
> >
> > -------------------------整个逻辑其实很简单,程序运行在yarn 上10个taskmanager ,每个tm
> 一个slot。
> > 有没有哪个碰到类似的问题,还是是bug 或者是我的实现有问题?
>
Reply | Threaded
Open this post in threaded view
|

回复: flink 1.6.1 RichAsyncFunction 异步请求外部系统导致下游数据大量重复

曾耀武
额, sorry
忽略测试数据这个是线上 ,异常用户检测的,这样才会命中模型。


图片是这个代码:


封装的task:
private static class MoaTask implements Callable<String> {

    private String jsonStr;
    private JudgeClient client;

    public MoaTask(String json, JudgeClient client) {
        this.jsonStr = json;
        this.client= client;
        System.out.println("===create new task :" + json);
    }
    @Override
    public String call() throws Exception {

        JSONObject jsonObject = JSON.parseObject(jsonStr);
        String business = jsonObject.getString("business");
        System.out.println("====processing asr data: " + jsonStr);
        String result = this.client.predict(jsonObject,"test");
        return result;
    }
}异步函数:private static class SimpleAsyncFunction extends RichAsyncFunction<String, String> {
    private static final long serialVersionUID = 2098635244857937717L;
    private transient ExecutorService executorService;
    private  transient JudgeClient moaJudgeClient ;
    private final long shutdownWaitTS=1000;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        executorService = new ThreadPoolExecutor(10,30,60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());
        moaJudgeClient = new JudgeClient();
    }
    @Override
    public void close() throws Exception {
        super.close();
        ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService);
    }
    @Override
    public void asyncInvoke(final String jsonStr, final ResultFuture<String> resultFuture) {
        Future<String> future = executorService.submit(new MoaTask(jsonStr,moaJudgeClient));
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try{
                    return future.get();

                }catch (Exception e){
                    return null;
                }
            }
        }).thenAccept((result)->{
            resultFuture.complete(Collections.singleton(result));
        });
    }
}图构建:AsyncFunction<String, String> nlpMoaAsyncFunction = new SimpleAsyncFunction();
DataStream<String> source = env.addSource(flinkKafkaConsumer010);
DataStream<String> nlpResult = AsyncDataStream.unorderedWait(
        source,
        nlpMoaAsyncFunction,
        timeout,
        TimeUnit.MILLISECONDS,
        30).setParallelism(2);
FlinkKafkaProducer010<String> kafkaProducer = new FlinkKafkaProducer010<String>(
        Constant.topic2,
        new SimpleStringSchema(),
        producerProp
);
nlpResult.addSink(kafkaProducer);


整体就是比较简单。






------------------ 原始邮件 ------------------
发件人: "Kurt Young"<[hidden email]>;
发送时间: 2019年10月14日(星期一) 晚上6:14
收件人: "user-zh"<[hidden email]>;

主题: Re: flink 1.6.1 RichAsyncFunction 异步请求外部系统导致下游数据大量重复



这位朋友你的测试数据文明一点呀。。
ps:图片看不到

Best,
Kurt


On Mon, Oct 14, 2019 at 12:11 PM 曾耀武 <[hidden email]> wrote:

>
> 作业是没有failover 。执行是正常的。但是官方文档上不是说那个是保证exactly-once么?
> 我稍微改了一下代码,还是有同样的重复问题,好像在new task 的时候被重复new 了多次。这个不知道是为何,
> 而且是在同一个slot里面。
>
> 异步函数:
> 然后我在执行的时候,往kafka 打了一条数据,在yarn 上起了两个  TM,每个TM 一个slot
>
> 然后在一个TM 上打印了这个日志: 我的日志是有编号的[0]  这个,一条输入是一个需要外部检测的json。
>
> ===create new task :{"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"}
> ====processing asr data: {"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"}
> === the moa resonse is: {result={s_id=userid, hit_details=操你, words=操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47, status=1, hit_logic=basePoliticsWords}, em=OK, ec=0}
> ===create new task :{"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"}
> ====processing asr data: {"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"}
> === the moa resonse is: {result={s_id=userid, hit_details=操你, words=操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47, status=1, hit_logic=basePoliticsWords}, em=OK, ec=0}
>
>
>
> 有时候这些记录在flink 里经过处理之后, 日志打印了两边,好像是处理了两遍,但是 写到下游的kafka 可能只有1个。
> 理论期望值是一条输入对应一条输出。
>
> 但是在一百万的测试数据上,上游发送肯定是只发送了100万条,带编号递增的。 但是下游去重之后是100万,不去重就有一百多万,可能
> 会多好几万的样子。比较奇怪。
>
>
>
>
>
> ------------------ 原始邮件 ------------------
> *发件人:* "Jark Wu"<[hidden email]>;
> *发送时间:* 2019年10月14日(星期一) 中午11:23
> *收件人:* "user-zh"<[hidden email]>;
> *主题:* Re: flink 1.6.1 RichAsyncFunction 异步请求外部系统导致下游数据大量重复
>
> Hi,
>
> 从代码上看,并没有发现什么问题,给一些排查建议:
> 1)作业在这期间有 failover 过吗?Flink 写入 Kafka 目前默认不是端到端exactly once的。
> 2) “重复几十条”是怎么判断出来的呢? 会不会 kafka 源中的数据有重复呢?
>
> Best,
> Jark
>
> On Sun, 13 Oct 2019 at 12:55, 曾耀武 <[hidden email]> wrote:
>
> > 大家好,
> >
> >
> > 请教大家一个问题, 我在使用flink 1.6 版本去用异步的方式
> > 访问外部系统的时候发现下游的数据,本来应该只有一条的,结果被重复几十条。
> >
> >
> > 我的异步Function 片段如下。
> >
> >
> > private static class SimpleAsyncFunction extends
> RichAsyncFunction<String,
> > String> {
> >     private static final long serialVersionUID = 2098635244857937717L;
> >
> >     private transient ExecutorService executorService;
> >     private  transient Client client ;
> >
> >     private final long shutdownWaitTS=1000;
> >
> >     @Override
> >     public void open(Configuration parameters) throws Exception {
> >         super.open(parameters);
> >
> >         executorService = new
> ThreadPoolExecutor(10,30,60,TimeUnit.SECONDS,
> >                 new LinkedBlockingQueue<>(1000),
> >                 new ThreadPoolExecutor.CallerRunsPolicy());
> >          client= new Client();
> >     }
> >
> >     @Override
> >     public void close() throws Exception {
> >         super.close();
> >         ExecutorUtils.gracefulShutdown(shutdownWaitTS,
> > TimeUnit.MILLISECONDS, executorService);
> >     }
> >
> >     @Override
> >     public void asyncInvoke(final String jsonStr, final
> > ResultFuture<String> resultFuture) {
> >           result = client.predict(jsonStr);
> > resultFuture.complete(Collections.singletonList(result));}}
> > ------------------------------
> > dag构建部分为:
> > AsyncFunction<String, String> nlpMoaAsyncFunction = new
> > SimpleAsyncFunction();
> >
> > DataStream<String> source = env.addSource(flinkKafkaConsumer010);
> >
> > DataStream<String> nlpResult = AsyncDataStream.unorderedWait(
> >         source,
> >         nlpMoaAsyncFunction,
> >         timeout,
> >         TimeUnit.MILLISECONDS,
> >         30);
> >
> > FlinkKafkaProducer010<String> kafkaProducer = new
> > FlinkKafkaProducer010<String>(
> >         ImmutableConstant.NLP_RESULT_KAFKA_TOPIC,
> >         new SimpleStringSchema(),
> >         producerProp
> > );
> >
> > nlpResult.addSink(kafkaProducer);
> >
> > -------------------------整个逻辑其实很简单,程序运行在yarn 上10个taskmanager ,每个tm
> 一个slot。
> > 有没有哪个碰到类似的问题,还是是bug 或者是我的实现有问题?
>