select *, PROCTIME() as proctime from source 报异常SQL parse failed. Encountered ")" at ...
PROCTIME ()这方法好像不支持。 ---原始邮件--- 发件人: "Jark Wu"<[hidden email]> 发送时间: 2019年10月11日(星期五) 晚上7:23 收件人: "user-zh"<[hidden email]>; 主题: Re: 使用flink-sql实现mysql维表的join的ddl和dml的示列 目前 o.proctime 在 DDL 暂时还声明不了,不可以可以使用嵌套 query 生成一个 proctime 字段。 "select ... from (select *, PROCTIME() as proctime from source)" On Fri, 11 Oct 2019 at 18:45, 迎风浪子 <[hidden email]> wrote: > 请问sql ddl如何定义o.proctime呢? > > > > ---原始邮件--- > 发件人: "Jark Wu"<[hidden email]&gt; > 发送时间: 2019年9月29日(星期日) 中午11:42 > 收件人: "user-zh"<[hidden email]&gt;; > 主题: Re: 使用flink-sql实现mysql维表的join的ddl和dml的示列 > > > Hi, > > mysql 维表的 DDL 和 源表/结果表 的 DDL 是一样的。 例如下面这个 DDL 的声明,rates 在 flink sql 中既可以作为 > source 也可以是 sink,也可以是维表。 > > CREATE TABLE rates ( > &nbsp;&nbsp;&nbsp; currency VARCHAR, > &nbsp;&nbsp;&nbsp; rate BIGINT > ) WITH ( > &nbsp;&nbsp;&nbsp; 'connector.type' = 'jdbc', -- 使用 jdbc connector > &nbsp;&nbsp;&nbsp; 'connector.url' = > 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url > &nbsp;&nbsp;&nbsp; 'connector.table' = 'rates', -- 表名 > &nbsp;&nbsp;&nbsp; 'connector.username' = 'root', -- 用户名 > &nbsp;&nbsp;&nbsp; 'connector.password' = '123456', -- 密码 > ) > > 如果要用它作为维表的话,那么需要用到 temporal join 的语法。如: > > SELECT > &nbsp; o.amout, o.currency, r.rate, o.amount * r.rate > FROM > &nbsp; Orders AS o > &nbsp; JOIN rates FOR SYSTEM_TIME AS OF o.proctime AS r > &nbsp; ON r.currency = o.currency > > &nbsp;关于维表 join,可以查看官方文档: > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table > > > > > On Fri, 27 Sep 2019 at 14:12, yelun <[hidden email]&gt; wrote: > > &gt; Hi,各位大佬: > &gt; > &gt; 有没有使用flink-sql实现mysql维表的join的ddl和dml的示列的demo能够参考一下,非常感谢。 |
我试了一下云邪大佬的方法是可以的
> 在 2019年10月12日,上午9:16,迎风浪子 <[hidden email]> 写道: > > select *, PROCTIME() as proctime from source 报异常SQL parse failed. Encountered ")" at ... > PROCTIME ()这方法好像不支持。 > > > > ---原始邮件--- > 发件人: "Jark Wu"<[hidden email]> > 发送时间: 2019年10月11日(星期五) 晚上7:23 > 收件人: "user-zh"<[hidden email]>; > 主题: Re: 使用flink-sql实现mysql维表的join的ddl和dml的示列 > > > 目前 o.proctime 在 DDL 暂时还声明不了,不可以可以使用嵌套 query 生成一个 proctime 字段。 > > "select ... > from (select *, PROCTIME() as proctime from source)" > > On Fri, 11 Oct 2019 at 18:45, 迎风浪子 <[hidden email]> wrote: > > > 请问sql ddl如何定义o.proctime呢? > > > > > > > > ---原始邮件--- > > 发件人: "Jark Wu"<[hidden email]&gt; > > 发送时间: 2019年9月29日(星期日) 中午11:42 > > 收件人: "user-zh"<[hidden email]&gt;; > > 主题: Re: 使用flink-sql实现mysql维表的join的ddl和dml的示列 > > > > > > Hi, > > > > mysql 维表的 DDL 和 源表/结果表 的 DDL 是一样的。 例如下面这个 DDL 的声明,rates 在 flink sql 中既可以作为 > > source 也可以是 sink,也可以是维表。 > > > > CREATE TABLE rates ( > > &nbsp;&nbsp;&nbsp; currency VARCHAR, > > &nbsp;&nbsp;&nbsp; rate BIGINT > > ) WITH ( > > &nbsp;&nbsp;&nbsp; 'connector.type' = 'jdbc', -- 使用 jdbc connector > > &nbsp;&nbsp;&nbsp; 'connector.url' = > > 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url > > &nbsp;&nbsp;&nbsp; 'connector.table' = 'rates', -- 表名 > > &nbsp;&nbsp;&nbsp; 'connector.username' = 'root', -- 用户名 > > &nbsp;&nbsp;&nbsp; 'connector.password' = '123456', -- 密码 > > ) > > > > 如果要用它作为维表的话,那么需要用到 temporal join 的语法。如: > > > > SELECT > > &nbsp; o.amout, o.currency, r.rate, o.amount * r.rate > > FROM > > &nbsp; Orders AS o > > &nbsp; JOIN rates FOR SYSTEM_TIME AS OF o.proctime AS r > > &nbsp; ON r.currency = o.currency > > > > &nbsp;关于维表 join,可以查看官方文档: > > > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table > > > > > > > > > > On Fri, 27 Sep 2019 at 14:12, yelun <[hidden email]&gt; wrote: > > > > &gt; Hi,各位大佬: > > &gt; > > &gt; 有没有使用flink-sql实现mysql维表的join的ddl和dml的示列的demo能够参考一下,非常感谢。 |
请问这种创建dimtable ddl方式在flink sql-client的可以写吗,或者在注册一个catalog,谢谢。
> 在 2019年10月12日,下午2:12,yelun <[hidden email]> 写道: > > 我试了一下云邪大佬的方法是可以的 > >> 在 2019年10月12日,上午9:16,迎风浪子 <[hidden email]> 写道: >> >> select *, PROCTIME() as proctime from source 报异常SQL parse failed. Encountered ")" at ... >> PROCTIME ()这方法好像不支持。 >> >> >> >> ---原始邮件--- >> 发件人: "Jark Wu"<[hidden email]> >> 发送时间: 2019年10月11日(星期五) 晚上7:23 >> 收件人: "user-zh"<[hidden email]>; >> 主题: Re: 使用flink-sql实现mysql维表的join的ddl和dml的示列 >> >> >> 目前 o.proctime 在 DDL 暂时还声明不了,不可以可以使用嵌套 query 生成一个 proctime 字段。 >> >> "select ... >> from (select *, PROCTIME() as proctime from source)" >> >> On Fri, 11 Oct 2019 at 18:45, 迎风浪子 <[hidden email]> wrote: >> >> > 请问sql ddl如何定义o.proctime呢? >> > >> > >> > >> > ---原始邮件--- >> > 发件人: "Jark Wu"<[hidden email]&gt; >> > 发送时间: 2019年9月29日(星期日) 中午11:42 >> > 收件人: "user-zh"<[hidden email]&gt;; >> > 主题: Re: 使用flink-sql实现mysql维表的join的ddl和dml的示列 >> > >> > >> > Hi, >> > >> > mysql 维表的 DDL 和 源表/结果表 的 DDL 是一样的。 例如下面这个 DDL 的声明,rates 在 flink sql 中既可以作为 >> > source 也可以是 sink,也可以是维表。 >> > >> > CREATE TABLE rates ( >> > &nbsp;&nbsp;&nbsp; currency VARCHAR, >> > &nbsp;&nbsp;&nbsp; rate BIGINT >> > ) WITH ( >> > &nbsp;&nbsp;&nbsp; 'connector.type' = 'jdbc', -- 使用 jdbc connector >> > &nbsp;&nbsp;&nbsp; 'connector.url' = >> > 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url >> > &nbsp;&nbsp;&nbsp; 'connector.table' = 'rates', -- 表名 >> > &nbsp;&nbsp;&nbsp; 'connector.username' = 'root', -- 用户名 >> > &nbsp;&nbsp;&nbsp; 'connector.password' = '123456', -- 密码 >> > ) >> > >> > 如果要用它作为维表的话,那么需要用到 temporal join 的语法。如: >> > >> > SELECT >> > &nbsp; o.amout, o.currency, r.rate, o.amount * r.rate >> > FROM >> > &nbsp; Orders AS o >> > &nbsp; JOIN rates FOR SYSTEM_TIME AS OF o.proctime AS r >> > &nbsp; ON r.currency = o.currency >> > >> > &nbsp;关于维表 join,可以查看官方文档: >> > >> > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table >> > >> > >> > >> > >> > On Fri, 27 Sep 2019 at 14:12, yelun <[hidden email]&gt; wrote: >> > >> > &gt; Hi,各位大佬: >> > &gt; >> > &gt; 有没有使用flink-sql实现mysql维表的join的ddl和dml的示列的demo能够参考一下,非常感谢。 > > |
Administrator
|
抱歉,SQL CLI 中目前暂不支持 DDL
> 在 2019年10月12日,15:17,yelun <[hidden email]> 写道: > > 请问这种创建dimtable ddl方式在flink sql-client的可以写吗,或者在注册一个catalog,谢谢。 > >> 在 2019年10月12日,下午2:12,yelun <[hidden email]> 写道: >> >> 我试了一下云邪大佬的方法是可以的 >> >>> 在 2019年10月12日,上午9:16,迎风浪子 <[hidden email]> 写道: >>> >>> select *, PROCTIME() as proctime from source 报异常SQL parse failed. Encountered ")" at ... >>> PROCTIME ()这方法好像不支持。 >>> >>> >>> >>> ---原始邮件--- >>> 发件人: "Jark Wu"<[hidden email]> >>> 发送时间: 2019年10月11日(星期五) 晚上7:23 >>> 收件人: "user-zh"<[hidden email]>; >>> 主题: Re: 使用flink-sql实现mysql维表的join的ddl和dml的示列 >>> >>> >>> 目前 o.proctime 在 DDL 暂时还声明不了,不可以可以使用嵌套 query 生成一个 proctime 字段。 >>> >>> "select ... >>> from (select *, PROCTIME() as proctime from source)" >>> >>> On Fri, 11 Oct 2019 at 18:45, 迎风浪子 <[hidden email]> wrote: >>> >>> > 请问sql ddl如何定义o.proctime呢? >>> > >>> > >>> > >>> > ---原始邮件--- >>> > 发件人: "Jark Wu"<[hidden email]&gt; >>> > 发送时间: 2019年9月29日(星期日) 中午11:42 >>> > 收件人: "user-zh"<[hidden email]&gt;; >>> > 主题: Re: 使用flink-sql实现mysql维表的join的ddl和dml的示列 >>> > >>> > >>> > Hi, >>> > >>> > mysql 维表的 DDL 和 源表/结果表 的 DDL 是一样的。 例如下面这个 DDL 的声明,rates 在 flink sql 中既可以作为 >>> > source 也可以是 sink,也可以是维表。 >>> > >>> > CREATE TABLE rates ( >>> > &nbsp;&nbsp;&nbsp; currency VARCHAR, >>> > &nbsp;&nbsp;&nbsp; rate BIGINT >>> > ) WITH ( >>> > &nbsp;&nbsp;&nbsp; 'connector.type' = 'jdbc', -- 使用 jdbc connector >>> > &nbsp;&nbsp;&nbsp; 'connector.url' = >>> > 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url >>> > &nbsp;&nbsp;&nbsp; 'connector.table' = 'rates', -- 表名 >>> > &nbsp;&nbsp;&nbsp; 'connector.username' = 'root', -- 用户名 >>> > &nbsp;&nbsp;&nbsp; 'connector.password' = '123456', -- 密码 >>> > ) >>> > >>> > 如果要用它作为维表的话,那么需要用到 temporal join 的语法。如: >>> > >>> > SELECT >>> > &nbsp; o.amout, o.currency, r.rate, o.amount * r.rate >>> > FROM >>> > &nbsp; Orders AS o >>> > &nbsp; JOIN rates FOR SYSTEM_TIME AS OF o.proctime AS r >>> > &nbsp; ON r.currency = o.currency >>> > >>> > &nbsp;关于维表 join,可以查看官方文档: >>> > >>> > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table >>> > >>> > >>> > >>> > >>> > On Fri, 27 Sep 2019 at 14:12, yelun <[hidden email]&gt; wrote: >>> > >>> > &gt; Hi,各位大佬: >>> > &gt; >>> > &gt; 有没有使用flink-sql实现mysql维表的join的ddl和dml的示列的demo能够参考一下,非常感谢。 >> >> > > > |
In reply to this post by yelun
大家好,
请教大家一个问题, 我在使用flink 1.6 版本去用异步的方式 访问外部系统的时候发现下游的数据,本来应该只有一条的,结果被重复几十条。 我的异步Function 片段如下。 private static class SimpleAsyncFunction extends RichAsyncFunction<String, String> { private static final long serialVersionUID = 2098635244857937717L; private transient ExecutorService executorService; private transient Client client ; private final long shutdownWaitTS=1000; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); executorService = new ThreadPoolExecutor(10,30,60,TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy()); client= new Client(); } @Override public void close() throws Exception { super.close(); ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService); } @Override public void asyncInvoke(final String jsonStr, final ResultFuture<String> resultFuture) { result = client.predict(jsonStr); resultFuture.complete(Collections.singletonList(result));}} ------------------------------ dag构建部分为: AsyncFunction<String, String> nlpMoaAsyncFunction = new SimpleAsyncFunction(); DataStream<String> source = env.addSource(flinkKafkaConsumer010); DataStream<String> nlpResult = AsyncDataStream.unorderedWait( source, nlpMoaAsyncFunction, timeout, TimeUnit.MILLISECONDS, 30); FlinkKafkaProducer010<String> kafkaProducer = new FlinkKafkaProducer010<String>( ImmutableConstant.NLP_RESULT_KAFKA_TOPIC, new SimpleStringSchema(), producerProp ); nlpResult.addSink(kafkaProducer); -------------------------整个逻辑其实很简单,程序运行在yarn 上10个taskmanager ,每个tm 一个slot。 有没有哪个碰到类似的问题,还是是bug 或者是我的实现有问题? |
Administrator
|
Hi,
从代码上看,并没有发现什么问题,给一些排查建议: 1)作业在这期间有 failover 过吗?Flink 写入 Kafka 目前默认不是端到端exactly once的。 2) “重复几十条”是怎么判断出来的呢? 会不会 kafka 源中的数据有重复呢? Best, Jark On Sun, 13 Oct 2019 at 12:55, 曾耀武 <[hidden email]> wrote: > 大家好, > > > 请教大家一个问题, 我在使用flink 1.6 版本去用异步的方式 > 访问外部系统的时候发现下游的数据,本来应该只有一条的,结果被重复几十条。 > > > 我的异步Function 片段如下。 > > > private static class SimpleAsyncFunction extends RichAsyncFunction<String, > String> { > private static final long serialVersionUID = 2098635244857937717L; > > private transient ExecutorService executorService; > private transient Client client ; > > private final long shutdownWaitTS=1000; > > @Override > public void open(Configuration parameters) throws Exception { > super.open(parameters); > > executorService = new ThreadPoolExecutor(10,30,60,TimeUnit.SECONDS, > new LinkedBlockingQueue<>(1000), > new ThreadPoolExecutor.CallerRunsPolicy()); > client= new Client(); > } > > @Override > public void close() throws Exception { > super.close(); > ExecutorUtils.gracefulShutdown(shutdownWaitTS, > TimeUnit.MILLISECONDS, executorService); > } > > @Override > public void asyncInvoke(final String jsonStr, final > ResultFuture<String> resultFuture) { > result = client.predict(jsonStr); > resultFuture.complete(Collections.singletonList(result));}} > ------------------------------ > dag构建部分为: > AsyncFunction<String, String> nlpMoaAsyncFunction = new > SimpleAsyncFunction(); > > DataStream<String> source = env.addSource(flinkKafkaConsumer010); > > DataStream<String> nlpResult = AsyncDataStream.unorderedWait( > source, > nlpMoaAsyncFunction, > timeout, > TimeUnit.MILLISECONDS, > 30); > > FlinkKafkaProducer010<String> kafkaProducer = new > FlinkKafkaProducer010<String>( > ImmutableConstant.NLP_RESULT_KAFKA_TOPIC, > new SimpleStringSchema(), > producerProp > ); > > nlpResult.addSink(kafkaProducer); > > -------------------------整个逻辑其实很简单,程序运行在yarn 上10个taskmanager ,每个tm 一个slot。 > 有没有哪个碰到类似的问题,还是是bug 或者是我的实现有问题? |
作业是没有failover 。执行是正常的。但是官方文档上不是说那个是保证exactly-once么? 我稍微改了一下代码,还是有同样的重复问题,好像在new task 的时候被重复new 了多次。这个不知道是为何, 而且是在同一个slot里面。 异步函数: 然后我在执行的时候,往kafka 打了一条数据,在yarn 上起了两个 TM,每个TM 一个slot 然后在一个TM 上打印了这个日志: 我的日志是有编号的[0] 这个,一条输入是一个需要外部检测的json。 ===create new task :{"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"} ====processing asr data: {"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"} === the moa resonse is: {result={s_id=userid, hit_details=操你, words=操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47, status=1, hit_logic=basePoliticsWords}, em=OK, ec=0} ===create new task :{"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"} ====processing asr data: {"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"} === the moa resonse is: {result={s_id=userid, hit_details=操你, words=操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47, status=1, hit_logic=basePoliticsWords}, em=OK, ec=0} 有时候这些记录在flink 里经过处理之后, 日志打印了两边,好像是处理了两遍,但是 写到下游的kafka 可能只有1个。 理论期望值是一条输入对应一条输出。 但是在一百万的测试数据上,上游发送肯定是只发送了100万条,带编号递增的。 但是下游去重之后是100万,不去重就有一百多万,可能 会多好几万的样子。比较奇怪。 ------------------ 原始邮件 ------------------ 发件人: "Jark Wu"<[hidden email]>; 发送时间: 2019年10月14日(星期一) 中午11:23 收件人: "user-zh"<[hidden email]>; 主题: Re: flink 1.6.1 RichAsyncFunction 异步请求外部系统导致下游数据大量重复 从代码上看,并没有发现什么问题,给一些排查建议: 1)作业在这期间有 failover 过吗?Flink 写入 Kafka 目前默认不是端到端exactly once的。 2) “重复几十条”是怎么判断出来的呢? 会不会 kafka 源中的数据有重复呢? Best, Jark On Sun, 13 Oct 2019 at 12:55, 曾耀武 <[hidden email]> wrote: > 大家好, > > > 请教大家一个问题, 我在使用flink 1.6 版本去用异步的方式 > 访问外部系统的时候发现下游的数据,本来应该只有一条的,结果被重复几十条。 > > > 我的异步Function 片段如下。 > > > private static class SimpleAsyncFunction extends RichAsyncFunction<String, > String> { > private static final long serialVersionUID = 2098635244857937717L; > > private transient ExecutorService executorService; > private transient Client client ; > > private final long shutdownWaitTS=1000; > > @Override > public void open(Configuration parameters) throws Exception { > super.open(parameters); > > executorService = new ThreadPoolExecutor(10,30,60,TimeUnit.SECONDS, > new LinkedBlockingQueue<>(1000), > new ThreadPoolExecutor.CallerRunsPolicy()); > client= new Client(); > } > > @Override > public void close() throws Exception { > super.close(); > ExecutorUtils.gracefulShutdown(shutdownWaitTS, > TimeUnit.MILLISECONDS, executorService); > } > > @Override > public void asyncInvoke(final String jsonStr, final > ResultFuture<String> resultFuture) { > result = client.predict(jsonStr); > resultFuture.complete(Collections.singletonList(result));}} > ------------------------------ > dag构建部分为: > AsyncFunction<String, String> nlpMoaAsyncFunction = new > SimpleAsyncFunction(); > > DataStream<String> source = env.addSource(flinkKafkaConsumer010); > > DataStream<String> nlpResult = AsyncDataStream.unorderedWait( > source, > nlpMoaAsyncFunction, > timeout, > TimeUnit.MILLISECONDS, > 30); > > FlinkKafkaProducer010<String> kafkaProducer = new > FlinkKafkaProducer010<String>( > ImmutableConstant.NLP_RESULT_KAFKA_TOPIC, > new SimpleStringSchema(), > producerProp > ); > > nlpResult.addSink(kafkaProducer); > > -------------------------整个逻辑其实很简单,程序运行在yarn 上10个taskmanager ,每个tm 一个slot。 > 有没有哪个碰到类似的问题,还是是bug 或者是我的实现有问题? |
这位朋友你的测试数据文明一点呀。。
ps:图片看不到 Best, Kurt On Mon, Oct 14, 2019 at 12:11 PM 曾耀武 <[hidden email]> wrote: > > 作业是没有failover 。执行是正常的。但是官方文档上不是说那个是保证exactly-once么? > 我稍微改了一下代码,还是有同样的重复问题,好像在new task 的时候被重复new 了多次。这个不知道是为何, > 而且是在同一个slot里面。 > > 异步函数: > 然后我在执行的时候,往kafka 打了一条数据,在yarn 上起了两个 TM,每个TM 一个slot > > 然后在一个TM 上打印了这个日志: 我的日志是有编号的[0] 这个,一条输入是一个需要外部检测的json。 > > ===create new task :{"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"} > ====processing asr data: {"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"} > === the moa resonse is: {result={s_id=userid, hit_details=操你, words=操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47, status=1, hit_logic=basePoliticsWords}, em=OK, ec=0} > ===create new task :{"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"} > ====processing asr data: {"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"} > === the moa resonse is: {result={s_id=userid, hit_details=操你, words=操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47, status=1, hit_logic=basePoliticsWords}, em=OK, ec=0} > > > > 有时候这些记录在flink 里经过处理之后, 日志打印了两边,好像是处理了两遍,但是 写到下游的kafka 可能只有1个。 > 理论期望值是一条输入对应一条输出。 > > 但是在一百万的测试数据上,上游发送肯定是只发送了100万条,带编号递增的。 但是下游去重之后是100万,不去重就有一百多万,可能 > 会多好几万的样子。比较奇怪。 > > > > > > ------------------ 原始邮件 ------------------ > *发件人:* "Jark Wu"<[hidden email]>; > *发送时间:* 2019年10月14日(星期一) 中午11:23 > *收件人:* "user-zh"<[hidden email]>; > *主题:* Re: flink 1.6.1 RichAsyncFunction 异步请求外部系统导致下游数据大量重复 > > Hi, > > 从代码上看,并没有发现什么问题,给一些排查建议: > 1)作业在这期间有 failover 过吗?Flink 写入 Kafka 目前默认不是端到端exactly once的。 > 2) “重复几十条”是怎么判断出来的呢? 会不会 kafka 源中的数据有重复呢? > > Best, > Jark > > On Sun, 13 Oct 2019 at 12:55, 曾耀武 <[hidden email]> wrote: > > > 大家好, > > > > > > 请教大家一个问题, 我在使用flink 1.6 版本去用异步的方式 > > 访问外部系统的时候发现下游的数据,本来应该只有一条的,结果被重复几十条。 > > > > > > 我的异步Function 片段如下。 > > > > > > private static class SimpleAsyncFunction extends > RichAsyncFunction<String, > > String> { > > private static final long serialVersionUID = 2098635244857937717L; > > > > private transient ExecutorService executorService; > > private transient Client client ; > > > > private final long shutdownWaitTS=1000; > > > > @Override > > public void open(Configuration parameters) throws Exception { > > super.open(parameters); > > > > executorService = new > ThreadPoolExecutor(10,30,60,TimeUnit.SECONDS, > > new LinkedBlockingQueue<>(1000), > > new ThreadPoolExecutor.CallerRunsPolicy()); > > client= new Client(); > > } > > > > @Override > > public void close() throws Exception { > > super.close(); > > ExecutorUtils.gracefulShutdown(shutdownWaitTS, > > TimeUnit.MILLISECONDS, executorService); > > } > > > > @Override > > public void asyncInvoke(final String jsonStr, final > > ResultFuture<String> resultFuture) { > > result = client.predict(jsonStr); > > resultFuture.complete(Collections.singletonList(result));}} > > ------------------------------ > > dag构建部分为: > > AsyncFunction<String, String> nlpMoaAsyncFunction = new > > SimpleAsyncFunction(); > > > > DataStream<String> source = env.addSource(flinkKafkaConsumer010); > > > > DataStream<String> nlpResult = AsyncDataStream.unorderedWait( > > source, > > nlpMoaAsyncFunction, > > timeout, > > TimeUnit.MILLISECONDS, > > 30); > > > > FlinkKafkaProducer010<String> kafkaProducer = new > > FlinkKafkaProducer010<String>( > > ImmutableConstant.NLP_RESULT_KAFKA_TOPIC, > > new SimpleStringSchema(), > > producerProp > > ); > > > > nlpResult.addSink(kafkaProducer); > > > > -------------------------整个逻辑其实很简单,程序运行在yarn 上10个taskmanager ,每个tm > 一个slot。 > > 有没有哪个碰到类似的问题,还是是bug 或者是我的实现有问题? > |
额, sorry
忽略测试数据这个是线上 ,异常用户检测的,这样才会命中模型。 图片是这个代码: 封装的task: private static class MoaTask implements Callable<String> { private String jsonStr; private JudgeClient client; public MoaTask(String json, JudgeClient client) { this.jsonStr = json; this.client= client; System.out.println("===create new task :" + json); } @Override public String call() throws Exception { JSONObject jsonObject = JSON.parseObject(jsonStr); String business = jsonObject.getString("business"); System.out.println("====processing asr data: " + jsonStr); String result = this.client.predict(jsonObject,"test"); return result; } }异步函数:private static class SimpleAsyncFunction extends RichAsyncFunction<String, String> { private static final long serialVersionUID = 2098635244857937717L; private transient ExecutorService executorService; private transient JudgeClient moaJudgeClient ; private final long shutdownWaitTS=1000; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); executorService = new ThreadPoolExecutor(10,30,60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy()); moaJudgeClient = new JudgeClient(); } @Override public void close() throws Exception { super.close(); ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService); } @Override public void asyncInvoke(final String jsonStr, final ResultFuture<String> resultFuture) { Future<String> future = executorService.submit(new MoaTask(jsonStr,moaJudgeClient)); CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try{ return future.get(); }catch (Exception e){ return null; } } }).thenAccept((result)->{ resultFuture.complete(Collections.singleton(result)); }); } }图构建:AsyncFunction<String, String> nlpMoaAsyncFunction = new SimpleAsyncFunction(); DataStream<String> source = env.addSource(flinkKafkaConsumer010); DataStream<String> nlpResult = AsyncDataStream.unorderedWait( source, nlpMoaAsyncFunction, timeout, TimeUnit.MILLISECONDS, 30).setParallelism(2); FlinkKafkaProducer010<String> kafkaProducer = new FlinkKafkaProducer010<String>( Constant.topic2, new SimpleStringSchema(), producerProp ); nlpResult.addSink(kafkaProducer); 整体就是比较简单。 ------------------ 原始邮件 ------------------ 发件人: "Kurt Young"<[hidden email]>; 发送时间: 2019年10月14日(星期一) 晚上6:14 收件人: "user-zh"<[hidden email]>; 主题: Re: flink 1.6.1 RichAsyncFunction 异步请求外部系统导致下游数据大量重复 这位朋友你的测试数据文明一点呀。。 ps:图片看不到 Best, Kurt On Mon, Oct 14, 2019 at 12:11 PM 曾耀武 <[hidden email]> wrote: > > 作业是没有failover 。执行是正常的。但是官方文档上不是说那个是保证exactly-once么? > 我稍微改了一下代码,还是有同样的重复问题,好像在new task 的时候被重复new 了多次。这个不知道是为何, > 而且是在同一个slot里面。 > > 异步函数: > 然后我在执行的时候,往kafka 打了一条数据,在yarn 上起了两个 TM,每个TM 一个slot > > 然后在一个TM 上打印了这个日志: 我的日志是有编号的[0] 这个,一条输入是一个需要外部检测的json。 > > ===create new task :{"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"} > ====processing asr data: {"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"} > === the moa resonse is: {result={s_id=userid, hit_details=操你, words=操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47, status=1, hit_logic=basePoliticsWords}, em=OK, ec=0} > ===create new task :{"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"} > ====processing asr data: {"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47","sid":"test"} > === the moa resonse is: {result={s_id=userid, hit_details=操你, words=操你妈的傻逼卧槽 [0] 2019-10-14 17:57:47, status=1, hit_logic=basePoliticsWords}, em=OK, ec=0} > > > > 有时候这些记录在flink 里经过处理之后, 日志打印了两边,好像是处理了两遍,但是 写到下游的kafka 可能只有1个。 > 理论期望值是一条输入对应一条输出。 > > 但是在一百万的测试数据上,上游发送肯定是只发送了100万条,带编号递增的。 但是下游去重之后是100万,不去重就有一百多万,可能 > 会多好几万的样子。比较奇怪。 > > > > > > ------------------ 原始邮件 ------------------ > *发件人:* "Jark Wu"<[hidden email]>; > *发送时间:* 2019年10月14日(星期一) 中午11:23 > *收件人:* "user-zh"<[hidden email]>; > *主题:* Re: flink 1.6.1 RichAsyncFunction 异步请求外部系统导致下游数据大量重复 > > Hi, > > 从代码上看,并没有发现什么问题,给一些排查建议: > 1)作业在这期间有 failover 过吗?Flink 写入 Kafka 目前默认不是端到端exactly once的。 > 2) “重复几十条”是怎么判断出来的呢? 会不会 kafka 源中的数据有重复呢? > > Best, > Jark > > On Sun, 13 Oct 2019 at 12:55, 曾耀武 <[hidden email]> wrote: > > > 大家好, > > > > > > 请教大家一个问题, 我在使用flink 1.6 版本去用异步的方式 > > 访问外部系统的时候发现下游的数据,本来应该只有一条的,结果被重复几十条。 > > > > > > 我的异步Function 片段如下。 > > > > > > private static class SimpleAsyncFunction extends > RichAsyncFunction<String, > > String> { > > private static final long serialVersionUID = 2098635244857937717L; > > > > private transient ExecutorService executorService; > > private transient Client client ; > > > > private final long shutdownWaitTS=1000; > > > > @Override > > public void open(Configuration parameters) throws Exception { > > super.open(parameters); > > > > executorService = new > ThreadPoolExecutor(10,30,60,TimeUnit.SECONDS, > > new LinkedBlockingQueue<>(1000), > > new ThreadPoolExecutor.CallerRunsPolicy()); > > client= new Client(); > > } > > > > @Override > > public void close() throws Exception { > > super.close(); > > ExecutorUtils.gracefulShutdown(shutdownWaitTS, > > TimeUnit.MILLISECONDS, executorService); > > } > > > > @Override > > public void asyncInvoke(final String jsonStr, final > > ResultFuture<String> resultFuture) { > > result = client.predict(jsonStr); > > resultFuture.complete(Collections.singletonList(result));}} > > ------------------------------ > > dag构建部分为: > > AsyncFunction<String, String> nlpMoaAsyncFunction = new > > SimpleAsyncFunction(); > > > > DataStream<String> source = env.addSource(flinkKafkaConsumer010); > > > > DataStream<String> nlpResult = AsyncDataStream.unorderedWait( > > source, > > nlpMoaAsyncFunction, > > timeout, > > TimeUnit.MILLISECONDS, > > 30); > > > > FlinkKafkaProducer010<String> kafkaProducer = new > > FlinkKafkaProducer010<String>( > > ImmutableConstant.NLP_RESULT_KAFKA_TOPIC, > > new SimpleStringSchema(), > > producerProp > > ); > > > > nlpResult.addSink(kafkaProducer); > > > > -------------------------整个逻辑其实很简单,程序运行在yarn 上10个taskmanager ,每个tm > 一个slot。 > > 有没有哪个碰到类似的问题,还是是bug 或者是我的实现有问题? > |
Free forum by Nabble | Edit this page |