Flink SQL 应用情况请教

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink SQL 应用情况请教

占英华
我们在使用Flink SQL 统计一分钟内各个股票的交易量,SQL代码如下:
CREATE TABLE t_stock_match_p_1(
  id VARCHAR,
  stkcode INT,
  volume INT,
  matchtime TIMESTAMP,
  WATERMARK  FOR matchtime as matchtime
 ) WITH (
  'connector' = 'kafka-0.10',
  'topic' = 'stock_match_p_1',
  'scan.startup.mode' = 'latest-offset',
  'properties.group.id' = 'stock_consume1',
  'properties.bootstrap.servers' = 'sdp-10-88-100-93:6668',
  'properties.key.deserializer' = 'org.apache.kafka.common.serialization.LongDeserializer',
  'properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
  'format' = 'csv',
  'csv.field-delimiter' = ','
);
CREATE TABLE t_stock_match_1(
  stkcode int,
  pd TIMESTAMP,
  volume  INT  
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.88.100.96:3306/flink_test?characterEncoding=utf8&autoReconnect=true&useSSL=false&serverTimezone=UTC',
    'table-name' = 't_stock_match',
    'username' = 'root',
    'driver' = 'com.mysql.jdbc.Driver',
    'password' = '[hidden email]',
    'sink.buffer-flush.max-rows' = '1',
    'sink.buffer-flush.interval' = '1s'
);

INSERT INTO t_stock_match_1 SELECT stkcode,TUMBLE_END(matchtime, INTERVAL '1' MINUTE) as pd, sum(volume) FROM t_stock_match_p_1 GROUP BY TUMBLE(matchtime, INTERVAL '1' MINUTE),stkcode;

现在遇到问题时:任务提交后,正常运行,日志也无异常,但是没看到数据落入到mysql中,从监控界面看window中有数据流入,但是没有sent数据,监控指标如下,我特意让一支股票连续成交超过了1分钟,也没触动窗口计算,mysql中没有数据,监控指标中sink的Bytes Received  Records Received 指标项有数据,但是Bytes Sent 和 Records Sent的指标为0,由于没有错误产生,我们无法分析原因,麻烦各位帮忙指点下可能哪里有问题?
Name
Status
Bytes Received
Records Received
Bytes Sent
Records Sent
Parallelism
Start Time
Duration
End Time
Tasks
Source: TableSourceScan(table=[[cpp_flink_catalog, default, t_stock_match_p_1]], fields=[id, stkcode, volume, matchtime]) -> WatermarkAssigner(rowtime=[matchtime], watermark=[matchtime]) -> Calc(select=[matchtime, stkcode, volume])
RUNNING
0 B01.14 27,87822021-02-26 13:48:1844m 3s-
2
GroupWindowAggregate(groupBy=[stkcode], window=[TumblingGroupWindow('w$, matchtime, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[stkcode, SUM(volume) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) -> Calc(select=[stkcode, CAST(w$end) AS pd, EXPR$2 AS volume]) -> Sink: Sink(table=[cpp_flink_catalog.default.t_stock_match_1], fields=[stkcode, pd, volume])
RUNNING
1.15 MB27,8780 B022021-02-26 13:48:1844m 3s-





[hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL 应用情况请教

Smile
你好,

关于指标的问题,可以进到具体的算子里面的 Metrics 页面看看每个算子的 numRecordsIn 和
numRecordsOut,看是哪个算子开始有输入没输出的。
上面贴的指标看起来是 Overview 页面上的,这个地方展示的指标是对整个 Chain 起来的整体算的。

GroupWindowAggregate(groupBy=[stkcode], window=[TumblingGroupWindow('w$,
matchtime, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime],
select=[stkcode, SUM(volume) AS EXPR$2, start('w$) AS w$start, end('w$) AS
w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
 -> Calc(select=[stkcode, CAST(w$end) AS pd, EXPR$2 AS volume])
 -> Sink: Sink(table=[cpp_flink_catalog.default.t_stock_match_1],
fields=[stkcode, pd, volume])

对于这一串而言,Records Received 实际上就是 GroupWindowAggregate 这个算子接收到的数据条数,Records
Sent 是 Sink 下发的数据条数,但是注意这个“下发”并不是指 invoke 方法执行的次数,而是类似 collector.collect()
这种在 Flink 框架内上下游的发送。所以 sink 算子的 Records Sent 应该一直是 0,无论是否成功写数据到 MySQL
了。一般我们更关心的是 Sink 算子有没有 numRecordsIn(Records Received),如果有,说明上游算子没问题,是写 MySQL
失败了,可以检查下写 MySQL 那部分的配置信息;如果 Sink 算子 numRecordsIn(Records Received) 也是
0,那说明是上游有问题,没发数据到 Sink 上,可以依次检查再之前的算子的情况。

比如这个是我的作业的截图(第一次插图片不知道能不能显示出来,不能的话我再想办法……),Sink 是 print,一直有在打印数据,但是 overview
页面指标一直是 0 的。
<http://apache-flink.147419.n8.nabble.com/file/t1268/SourceIn0.png>
<http://apache-flink.147419.n8.nabble.com/file/t1268/SinkIn.png>

祝好~
Smile






--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL 应用情况请教

占英华
不是指标显示问题,是数据一直没写到mysql中,也没啥错误日志,然后今天早上我把任务重启了下,数据就全部写入到mysql中了

> 在 2021年2月26日,15:02,Smile <[hidden email]> 写道:
>
> 你好,
>
> 关于指标的问题,可以进到具体的算子里面的 Metrics 页面看看每个算子的 numRecordsIn 和
> numRecordsOut,看是哪个算子开始有输入没输出的。
> 上面贴的指标看起来是 Overview 页面上的,这个地方展示的指标是对整个 Chain 起来的整体算的。
>
> GroupWindowAggregate(groupBy=[stkcode], window=[TumblingGroupWindow('w$,
> matchtime, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime],
> select=[stkcode, SUM(volume) AS EXPR$2, start('w$) AS w$start, end('w$) AS
> w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
> -> Calc(select=[stkcode, CAST(w$end) AS pd, EXPR$2 AS volume])
> -> Sink: Sink(table=[cpp_flink_catalog.default.t_stock_match_1],
> fields=[stkcode, pd, volume])
>
> 对于这一串而言,Records Received 实际上就是 GroupWindowAggregate 这个算子接收到的数据条数,Records
> Sent 是 Sink 下发的数据条数,但是注意这个“下发”并不是指 invoke 方法执行的次数,而是类似 collector.collect()
> 这种在 Flink 框架内上下游的发送。所以 sink 算子的 Records Sent 应该一直是 0,无论是否成功写数据到 MySQL
> 了。一般我们更关心的是 Sink 算子有没有 numRecordsIn(Records Received),如果有,说明上游算子没问题,是写 MySQL
> 失败了,可以检查下写 MySQL 那部分的配置信息;如果 Sink 算子 numRecordsIn(Records Received) 也是
> 0,那说明是上游有问题,没发数据到 Sink 上,可以依次检查再之前的算子的情况。
>
> 比如这个是我的作业的截图(第一次插图片不知道能不能显示出来,不能的话我再想办法……),Sink 是 print,一直有在打印数据,但是 overview
> 页面指标一直是 0 的。
> <http://apache-flink.147419.n8.nabble.com/file/t1268/SourceIn0.png>
> <http://apache-flink.147419.n8.nabble.com/file/t1268/SinkIn.png>
>
> 祝好~
> Smile
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL 应用情况请教

占英华
这个问题不知道是不是这个原因导致的,我在Flink的webUI监控界面source和sink任务中都没看到watermark的值,其中source的Watermarks显示No Data,sink显示的是No Watermark
我的SQL语句如下:
CREATE TABLE t_stock_match_p_1(
  id VARCHAR,
  stkcode INT,
  volume INT,
  matchtime BIGINT,
  ts as TO_TIMESTAMP(FROM_UNIXTIME(matchtime/1000,'yyyy-MM-dd HH:mm:ss')),
  WATERMARK  FOR ts AS ts - INTERVAL '1' SECOND
 ) WITH (
  'connector' = 'kafka-0.10',
  'topic' = 'stock_match_p_zyh',
  'scan.startup.mode' = 'latest-offset',
  'properties.group.id' = 'stock_match_p_zyh',
  'properties.bootstrap.servers' = 'sdp-10-88-100-101:6668',
  'properties.security.protocol' = 'SASL_SDP',
  'properties.sasl.mechanism' = 'SDP',
  'properties.key.deserializer' = 'org.apache.kafka.common.serialization.LongDeserializer',
  'properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
  'format' = 'csv',
  'csv.field-delimiter' = ','
);

CREATE TABLE t_stock_match_1 (
  stkcode int,
  pd TIMESTAMP,
  volume  INT
) WITH (
 'connector' = 'print'
);

INSERT INTO t_stock_match_1 SELECT stkcode,TUMBLE_END(ts, INTERVAL '1' MINUTE) as pd, sum(volume) FROM t_stock_match_p_1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),stkcode;

然后kafka的stock_match_p_zyh的topic中我手工输入了如下几条数据,Flink的WebUI的任务的stdout中没有数据输出,日志中也无错误产生,不知道问题发生在什么地方?
1503,600001,15811,1614405166858
1504,600001,15813,1614405333871
1505,600001,15814,1614405544862
1506,600001,15814,1614405673863





 

> 在 2021年2月26日,15:02,Smile <[hidden email]> 写道:
>
> 你好,
>
> 关于指标的问题,可以进到具体的算子里面的 Metrics 页面看看每个算子的 numRecordsIn 和
> numRecordsOut,看是哪个算子开始有输入没输出的。
> 上面贴的指标看起来是 Overview 页面上的,这个地方展示的指标是对整个 Chain 起来的整体算的。
>
> GroupWindowAggregate(groupBy=[stkcode], window=[TumblingGroupWindow('w$,
> matchtime, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime],
> select=[stkcode, SUM(volume) AS EXPR$2, start('w$) AS w$start, end('w$) AS
> w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
> -> Calc(select=[stkcode, CAST(w$end) AS pd, EXPR$2 AS volume])
> -> Sink: Sink(table=[cpp_flink_catalog.default.t_stock_match_1],
> fields=[stkcode, pd, volume])
>
> 对于这一串而言,Records Received 实际上就是 GroupWindowAggregate 这个算子接收到的数据条数,Records
> Sent 是 Sink 下发的数据条数,但是注意这个“下发”并不是指 invoke 方法执行的次数,而是类似 collector.collect()
> 这种在 Flink 框架内上下游的发送。所以 sink 算子的 Records Sent 应该一直是 0,无论是否成功写数据到 MySQL
> 了。一般我们更关心的是 Sink 算子有没有 numRecordsIn(Records Received),如果有,说明上游算子没问题,是写 MySQL
> 失败了,可以检查下写 MySQL 那部分的配置信息;如果 Sink 算子 numRecordsIn(Records Received) 也是
> 0,那说明是上游有问题,没发数据到 Sink 上,可以依次检查再之前的算子的情况。
>
> 比如这个是我的作业的截图(第一次插图片不知道能不能显示出来,不能的话我再想办法……),Sink 是 print,一直有在打印数据,但是 overview
> 页面指标一直是 0 的。
> <http://apache-flink.147419.n8.nabble.com/file/t1268/SourceIn0.png>
> <http://apache-flink.147419.n8.nabble.com/file/t1268/SinkIn.png>
>
> 祝好~
> Smile
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink SQL 应用情况请教

xgskj@126.com
1503,600001,15811,1614405166858
1504,600001,15813,1614405333871
1505,600001,15814,1614405544862
1506,600001,15814,1614405673863
就这几条数据,并行度设置为1


 
发件人: [hidden email]
发送时间: 2021-02-27 14:23
收件人: user-zh
主题: Re: Flink SQL 应用情况请教
这个问题不知道是不是这个原因导致的,我在Flink的webUI监控界面source和sink任务中都没看到watermark的值,其中source的Watermarks显示No Data,sink显示的是No Watermark
我的SQL语句如下:
CREATE TABLE t_stock_match_p_1(
  id VARCHAR,
  stkcode INT,
  volume INT,
  matchtime BIGINT,
  ts as TO_TIMESTAMP(FROM_UNIXTIME(matchtime/1000,'yyyy-MM-dd HH:mm:ss')),
  WATERMARK  FOR ts AS ts - INTERVAL '1' SECOND
) WITH (
  'connector' = 'kafka-0.10',
  'topic' = 'stock_match_p_zyh',
  'scan.startup.mode' = 'latest-offset',
  'properties.group.id' = 'stock_match_p_zyh',
  'properties.bootstrap.servers' = 'sdp-10-88-100-101:6668',
  'properties.security.protocol' = 'SASL_SDP',
  'properties.sasl.mechanism' = 'SDP',
  'properties.key.deserializer' = 'org.apache.kafka.common.serialization.LongDeserializer',
  'properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
  'format' = 'csv',
  'csv.field-delimiter' = ','
);
 
CREATE TABLE t_stock_match_1 (
  stkcode int,
  pd TIMESTAMP,
  volume  INT
) WITH (
'connector' = 'print'
);
 
INSERT INTO t_stock_match_1 SELECT stkcode,TUMBLE_END(ts, INTERVAL '1' MINUTE) as pd, sum(volume) FROM t_stock_match_p_1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),stkcode;
 
然后kafka的stock_match_p_zyh的topic中我手工输入了如下几条数据,Flink的WebUI的任务的stdout中没有数据输出,日志中也无错误产生,不知道问题发生在什么地方?
1503,600001,15811,1614405166858
1504,600001,15813,1614405333871
1505,600001,15814,1614405544862
1506,600001,15814,1614405673863
 
 
 
 
 

> 在 2021年2月26日,15:02,Smile <[hidden email]> 写道:
>
> 你好,
>
> 关于指标的问题,可以进到具体的算子里面的 Metrics 页面看看每个算子的 numRecordsIn 和
> numRecordsOut,看是哪个算子开始有输入没输出的。
> 上面贴的指标看起来是 Overview 页面上的,这个地方展示的指标是对整个 Chain 起来的整体算的。
>
> GroupWindowAggregate(groupBy=[stkcode], window=[TumblingGroupWindow('w$,
> matchtime, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime],
> select=[stkcode, SUM(volume) AS EXPR$2, start('w$) AS w$start, end('w$) AS
> w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
> -> Calc(select=[stkcode, CAST(w$end) AS pd, EXPR$2 AS volume])
> -> Sink: Sink(table=[cpp_flink_catalog.default.t_stock_match_1],
> fields=[stkcode, pd, volume])
>
> 对于这一串而言,Records Received 实际上就是 GroupWindowAggregate 这个算子接收到的数据条数,Records
> Sent 是 Sink 下发的数据条数,但是注意这个“下发”并不是指 invoke 方法执行的次数,而是类似 collector.collect()
> 这种在 Flink 框架内上下游的发送。所以 sink 算子的 Records Sent 应该一直是 0,无论是否成功写数据到 MySQL
> 了。一般我们更关心的是 Sink 算子有没有 numRecordsIn(Records Received),如果有,说明上游算子没问题,是写 MySQL
> 失败了,可以检查下写 MySQL 那部分的配置信息;如果 Sink 算子 numRecordsIn(Records Received) 也是
> 0,那说明是上游有问题,没发数据到 Sink 上,可以依次检查再之前的算子的情况。
>
> 比如这个是我的作业的截图(第一次插图片不知道能不能显示出来,不能的话我再想办法……),Sink 是 print,一直有在打印数据,但是 overview
> 页面指标一直是 0 的。
> <http://apache-flink.147419.n8.nabble.com/file/t1268/SourceIn0.png>
> <http://apache-flink.147419.n8.nabble.com/file/t1268/SinkIn.png>
>
> 祝好~
> Smile
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: Flink SQL 应用情况请教

占英华
感谢您的答复,刚才看到您的答复后,紧急远程连接跑了下,stdout还真的有数据出来了,周一上班时间再好好测试下,万分感谢!














在 2021-02-27 19:08:25,"[hidden email]" <[hidden email]> 写道:

>1503,600001,15811,1614405166858
>1504,600001,15813,1614405333871
>1505,600001,15814,1614405544862
>1506,600001,15814,1614405673863
>就这几条数据,并行度设置为1
>
>
>
>发件人: [hidden email]
>发送时间: 2021-02-27 14:23
>收件人: user-zh
>主题: Re: Flink SQL 应用情况请教
>这个问题不知道是不是这个原因导致的,我在Flink的webUI监控界面source和sink任务中都没看到watermark的值,其中source的Watermarks显示No Data,sink显示的是No Watermark
>我的SQL语句如下:
>CREATE TABLE t_stock_match_p_1(
>  id VARCHAR,
>  stkcode INT,
>  volume INT,
>  matchtime BIGINT,
>  ts as TO_TIMESTAMP(FROM_UNIXTIME(matchtime/1000,'yyyy-MM-dd HH:mm:ss')),
>  WATERMARK  FOR ts AS ts - INTERVAL '1' SECOND
>) WITH (
>  'connector' = 'kafka-0.10',
>  'topic' = 'stock_match_p_zyh',
>  'scan.startup.mode' = 'latest-offset',
>  'properties.group.id' = 'stock_match_p_zyh',
>  'properties.bootstrap.servers' = 'sdp-10-88-100-101:6668',
>  'properties.security.protocol' = 'SASL_SDP',
>  'properties.sasl.mechanism' = 'SDP',
>  'properties.key.deserializer' = 'org.apache.kafka.common.serialization.LongDeserializer',
>  'properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
>  'format' = 'csv',
>  'csv.field-delimiter' = ','
>);
>
>CREATE TABLE t_stock_match_1 (
>  stkcode int,
>  pd TIMESTAMP,
>  volume  INT
>) WITH (
>'connector' = 'print'
>);
>
>INSERT INTO t_stock_match_1 SELECT stkcode,TUMBLE_END(ts, INTERVAL '1' MINUTE) as pd, sum(volume) FROM t_stock_match_p_1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),stkcode;
>
>然后kafka的stock_match_p_zyh的topic中我手工输入了如下几条数据,Flink的WebUI的任务的stdout中没有数据输出,日志中也无错误产生,不知道问题发生在什么地方?
>1503,600001,15811,1614405166858
>1504,600001,15813,1614405333871
>1505,600001,15814,1614405544862
>1506,600001,15814,1614405673863
>
>
>
>
>
>> 在 2021年2月26日,15:02,Smile <[hidden email]> 写道:
>>
>> 你好,
>>
>> 关于指标的问题,可以进到具体的算子里面的 Metrics 页面看看每个算子的 numRecordsIn 和
>> numRecordsOut,看是哪个算子开始有输入没输出的。
>> 上面贴的指标看起来是 Overview 页面上的,这个地方展示的指标是对整个 Chain 起来的整体算的。
>>
>> GroupWindowAggregate(groupBy=[stkcode], window=[TumblingGroupWindow('w$,
>> matchtime, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime],
>> select=[stkcode, SUM(volume) AS EXPR$2, start('w$) AS w$start, end('w$) AS
>> w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
>> -> Calc(select=[stkcode, CAST(w$end) AS pd, EXPR$2 AS volume])
>> -> Sink: Sink(table=[cpp_flink_catalog.default.t_stock_match_1],
>> fields=[stkcode, pd, volume])
>>
>> 对于这一串而言,Records Received 实际上就是 GroupWindowAggregate 这个算子接收到的数据条数,Records
>> Sent 是 Sink 下发的数据条数,但是注意这个“下发”并不是指 invoke 方法执行的次数,而是类似 collector.collect()
>> 这种在 Flink 框架内上下游的发送。所以 sink 算子的 Records Sent 应该一直是 0,无论是否成功写数据到 MySQL
>> 了。一般我们更关心的是 Sink 算子有没有 numRecordsIn(Records Received),如果有,说明上游算子没问题,是写 MySQL
>> 失败了,可以检查下写 MySQL 那部分的配置信息;如果 Sink 算子 numRecordsIn(Records Received) 也是
>> 0,那说明是上游有问题,没发数据到 Sink 上,可以依次检查再之前的算子的情况。
>>
>> 比如这个是我的作业的截图(第一次插图片不知道能不能显示出来,不能的话我再想办法……),Sink 是 print,一直有在打印数据,但是 overview
>> 页面指标一直是 0 的。
>> <http://apache-flink.147419.n8.nabble.com/file/t1268/SourceIn0.png>
>> <http://apache-flink.147419.n8.nabble.com/file/t1268/SinkIn.png>
>>
>> 祝好~
>> Smile
>>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/