我们在使用Flink SQL 统计一分钟内各个股票的交易量,SQL代码如下:
CREATE TABLE t_stock_match_p_1( id VARCHAR, stkcode INT, volume INT, matchtime TIMESTAMP, WATERMARK FOR matchtime as matchtime ) WITH ( 'connector' = 'kafka-0.10', 'topic' = 'stock_match_p_1', 'scan.startup.mode' = 'latest-offset', 'properties.group.id' = 'stock_consume1', 'properties.bootstrap.servers' = 'sdp-10-88-100-93:6668', 'properties.key.deserializer' = 'org.apache.kafka.common.serialization.LongDeserializer', 'properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer', 'format' = 'csv', 'csv.field-delimiter' = ',' ); CREATE TABLE t_stock_match_1( stkcode int, pd TIMESTAMP, volume INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://10.88.100.96:3306/flink_test?characterEncoding=utf8&autoReconnect=true&useSSL=false&serverTimezone=UTC', 'table-name' = 't_stock_match', 'username' = 'root', 'driver' = 'com.mysql.jdbc.Driver', 'password' = '[hidden email]', 'sink.buffer-flush.max-rows' = '1', 'sink.buffer-flush.interval' = '1s' ); INSERT INTO t_stock_match_1 SELECT stkcode,TUMBLE_END(matchtime, INTERVAL '1' MINUTE) as pd, sum(volume) FROM t_stock_match_p_1 GROUP BY TUMBLE(matchtime, INTERVAL '1' MINUTE),stkcode; 现在遇到问题时:任务提交后,正常运行,日志也无异常,但是没看到数据落入到mysql中,从监控界面看window中有数据流入,但是没有sent数据,监控指标如下,我特意让一支股票连续成交超过了1分钟,也没触动窗口计算,mysql中没有数据,监控指标中sink的Bytes Received Records Received 指标项有数据,但是Bytes Sent 和 Records Sent的指标为0,由于没有错误产生,我们无法分析原因,麻烦各位帮忙指点下可能哪里有问题? Name Status Bytes Received Records Received Bytes Sent Records Sent Parallelism Start Time Duration End Time Tasks Source: TableSourceScan(table=[[cpp_flink_catalog, default, t_stock_match_p_1]], fields=[id, stkcode, volume, matchtime]) -> WatermarkAssigner(rowtime=[matchtime], watermark=[matchtime]) -> Calc(select=[matchtime, stkcode, volume]) RUNNING 0 B01.14 27,87822021-02-26 13:48:1844m 3s- 2 GroupWindowAggregate(groupBy=[stkcode], window=[TumblingGroupWindow('w$, matchtime, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[stkcode, SUM(volume) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) -> Calc(select=[stkcode, CAST(w$end) AS pd, EXPR$2 AS volume]) -> Sink: Sink(table=[cpp_flink_catalog.default.t_stock_match_1], fields=[stkcode, pd, volume]) RUNNING 1.15 MB27,8780 B022021-02-26 13:48:1844m 3s- [hidden email] |
你好,
关于指标的问题,可以进到具体的算子里面的 Metrics 页面看看每个算子的 numRecordsIn 和 numRecordsOut,看是哪个算子开始有输入没输出的。 上面贴的指标看起来是 Overview 页面上的,这个地方展示的指标是对整个 Chain 起来的整体算的。 GroupWindowAggregate(groupBy=[stkcode], window=[TumblingGroupWindow('w$, matchtime, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[stkcode, SUM(volume) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) -> Calc(select=[stkcode, CAST(w$end) AS pd, EXPR$2 AS volume]) -> Sink: Sink(table=[cpp_flink_catalog.default.t_stock_match_1], fields=[stkcode, pd, volume]) 对于这一串而言,Records Received 实际上就是 GroupWindowAggregate 这个算子接收到的数据条数,Records Sent 是 Sink 下发的数据条数,但是注意这个“下发”并不是指 invoke 方法执行的次数,而是类似 collector.collect() 这种在 Flink 框架内上下游的发送。所以 sink 算子的 Records Sent 应该一直是 0,无论是否成功写数据到 MySQL 了。一般我们更关心的是 Sink 算子有没有 numRecordsIn(Records Received),如果有,说明上游算子没问题,是写 MySQL 失败了,可以检查下写 MySQL 那部分的配置信息;如果 Sink 算子 numRecordsIn(Records Received) 也是 0,那说明是上游有问题,没发数据到 Sink 上,可以依次检查再之前的算子的情况。 比如这个是我的作业的截图(第一次插图片不知道能不能显示出来,不能的话我再想办法……),Sink 是 print,一直有在打印数据,但是 overview 页面指标一直是 0 的。 <http://apache-flink.147419.n8.nabble.com/file/t1268/SourceIn0.png> <http://apache-flink.147419.n8.nabble.com/file/t1268/SinkIn.png> 祝好~ Smile -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
不是指标显示问题,是数据一直没写到mysql中,也没啥错误日志,然后今天早上我把任务重启了下,数据就全部写入到mysql中了
> 在 2021年2月26日,15:02,Smile <[hidden email]> 写道: > > 你好, > > 关于指标的问题,可以进到具体的算子里面的 Metrics 页面看看每个算子的 numRecordsIn 和 > numRecordsOut,看是哪个算子开始有输入没输出的。 > 上面贴的指标看起来是 Overview 页面上的,这个地方展示的指标是对整个 Chain 起来的整体算的。 > > GroupWindowAggregate(groupBy=[stkcode], window=[TumblingGroupWindow('w$, > matchtime, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime], > select=[stkcode, SUM(volume) AS EXPR$2, start('w$) AS w$start, end('w$) AS > w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) > -> Calc(select=[stkcode, CAST(w$end) AS pd, EXPR$2 AS volume]) > -> Sink: Sink(table=[cpp_flink_catalog.default.t_stock_match_1], > fields=[stkcode, pd, volume]) > > 对于这一串而言,Records Received 实际上就是 GroupWindowAggregate 这个算子接收到的数据条数,Records > Sent 是 Sink 下发的数据条数,但是注意这个“下发”并不是指 invoke 方法执行的次数,而是类似 collector.collect() > 这种在 Flink 框架内上下游的发送。所以 sink 算子的 Records Sent 应该一直是 0,无论是否成功写数据到 MySQL > 了。一般我们更关心的是 Sink 算子有没有 numRecordsIn(Records Received),如果有,说明上游算子没问题,是写 MySQL > 失败了,可以检查下写 MySQL 那部分的配置信息;如果 Sink 算子 numRecordsIn(Records Received) 也是 > 0,那说明是上游有问题,没发数据到 Sink 上,可以依次检查再之前的算子的情况。 > > 比如这个是我的作业的截图(第一次插图片不知道能不能显示出来,不能的话我再想办法……),Sink 是 print,一直有在打印数据,但是 overview > 页面指标一直是 0 的。 > <http://apache-flink.147419.n8.nabble.com/file/t1268/SourceIn0.png> > <http://apache-flink.147419.n8.nabble.com/file/t1268/SinkIn.png> > > 祝好~ > Smile > > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
这个问题不知道是不是这个原因导致的,我在Flink的webUI监控界面source和sink任务中都没看到watermark的值,其中source的Watermarks显示No Data,sink显示的是No Watermark
我的SQL语句如下: CREATE TABLE t_stock_match_p_1( id VARCHAR, stkcode INT, volume INT, matchtime BIGINT, ts as TO_TIMESTAMP(FROM_UNIXTIME(matchtime/1000,'yyyy-MM-dd HH:mm:ss')), WATERMARK FOR ts AS ts - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka-0.10', 'topic' = 'stock_match_p_zyh', 'scan.startup.mode' = 'latest-offset', 'properties.group.id' = 'stock_match_p_zyh', 'properties.bootstrap.servers' = 'sdp-10-88-100-101:6668', 'properties.security.protocol' = 'SASL_SDP', 'properties.sasl.mechanism' = 'SDP', 'properties.key.deserializer' = 'org.apache.kafka.common.serialization.LongDeserializer', 'properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer', 'format' = 'csv', 'csv.field-delimiter' = ',' ); CREATE TABLE t_stock_match_1 ( stkcode int, pd TIMESTAMP, volume INT ) WITH ( 'connector' = 'print' ); INSERT INTO t_stock_match_1 SELECT stkcode,TUMBLE_END(ts, INTERVAL '1' MINUTE) as pd, sum(volume) FROM t_stock_match_p_1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),stkcode; 然后kafka的stock_match_p_zyh的topic中我手工输入了如下几条数据,Flink的WebUI的任务的stdout中没有数据输出,日志中也无错误产生,不知道问题发生在什么地方? 1503,600001,15811,1614405166858 1504,600001,15813,1614405333871 1505,600001,15814,1614405544862 1506,600001,15814,1614405673863 > 在 2021年2月26日,15:02,Smile <[hidden email]> 写道: > > 你好, > > 关于指标的问题,可以进到具体的算子里面的 Metrics 页面看看每个算子的 numRecordsIn 和 > numRecordsOut,看是哪个算子开始有输入没输出的。 > 上面贴的指标看起来是 Overview 页面上的,这个地方展示的指标是对整个 Chain 起来的整体算的。 > > GroupWindowAggregate(groupBy=[stkcode], window=[TumblingGroupWindow('w$, > matchtime, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime], > select=[stkcode, SUM(volume) AS EXPR$2, start('w$) AS w$start, end('w$) AS > w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) > -> Calc(select=[stkcode, CAST(w$end) AS pd, EXPR$2 AS volume]) > -> Sink: Sink(table=[cpp_flink_catalog.default.t_stock_match_1], > fields=[stkcode, pd, volume]) > > 对于这一串而言,Records Received 实际上就是 GroupWindowAggregate 这个算子接收到的数据条数,Records > Sent 是 Sink 下发的数据条数,但是注意这个“下发”并不是指 invoke 方法执行的次数,而是类似 collector.collect() > 这种在 Flink 框架内上下游的发送。所以 sink 算子的 Records Sent 应该一直是 0,无论是否成功写数据到 MySQL > 了。一般我们更关心的是 Sink 算子有没有 numRecordsIn(Records Received),如果有,说明上游算子没问题,是写 MySQL > 失败了,可以检查下写 MySQL 那部分的配置信息;如果 Sink 算子 numRecordsIn(Records Received) 也是 > 0,那说明是上游有问题,没发数据到 Sink 上,可以依次检查再之前的算子的情况。 > > 比如这个是我的作业的截图(第一次插图片不知道能不能显示出来,不能的话我再想办法……),Sink 是 print,一直有在打印数据,但是 overview > 页面指标一直是 0 的。 > <http://apache-flink.147419.n8.nabble.com/file/t1268/SourceIn0.png> > <http://apache-flink.147419.n8.nabble.com/file/t1268/SinkIn.png> > > 祝好~ > Smile > > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
1503,600001,15811,1614405166858
1504,600001,15813,1614405333871 1505,600001,15814,1614405544862 1506,600001,15814,1614405673863 就这几条数据,并行度设置为1 发件人: [hidden email] 发送时间: 2021-02-27 14:23 收件人: user-zh 主题: Re: Flink SQL 应用情况请教 这个问题不知道是不是这个原因导致的,我在Flink的webUI监控界面source和sink任务中都没看到watermark的值,其中source的Watermarks显示No Data,sink显示的是No Watermark 我的SQL语句如下: CREATE TABLE t_stock_match_p_1( id VARCHAR, stkcode INT, volume INT, matchtime BIGINT, ts as TO_TIMESTAMP(FROM_UNIXTIME(matchtime/1000,'yyyy-MM-dd HH:mm:ss')), WATERMARK FOR ts AS ts - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka-0.10', 'topic' = 'stock_match_p_zyh', 'scan.startup.mode' = 'latest-offset', 'properties.group.id' = 'stock_match_p_zyh', 'properties.bootstrap.servers' = 'sdp-10-88-100-101:6668', 'properties.security.protocol' = 'SASL_SDP', 'properties.sasl.mechanism' = 'SDP', 'properties.key.deserializer' = 'org.apache.kafka.common.serialization.LongDeserializer', 'properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer', 'format' = 'csv', 'csv.field-delimiter' = ',' ); CREATE TABLE t_stock_match_1 ( stkcode int, pd TIMESTAMP, volume INT ) WITH ( 'connector' = 'print' ); INSERT INTO t_stock_match_1 SELECT stkcode,TUMBLE_END(ts, INTERVAL '1' MINUTE) as pd, sum(volume) FROM t_stock_match_p_1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),stkcode; 然后kafka的stock_match_p_zyh的topic中我手工输入了如下几条数据,Flink的WebUI的任务的stdout中没有数据输出,日志中也无错误产生,不知道问题发生在什么地方? 1503,600001,15811,1614405166858 1504,600001,15813,1614405333871 1505,600001,15814,1614405544862 1506,600001,15814,1614405673863 > 在 2021年2月26日,15:02,Smile <[hidden email]> 写道: > > 你好, > > 关于指标的问题,可以进到具体的算子里面的 Metrics 页面看看每个算子的 numRecordsIn 和 > numRecordsOut,看是哪个算子开始有输入没输出的。 > 上面贴的指标看起来是 Overview 页面上的,这个地方展示的指标是对整个 Chain 起来的整体算的。 > > GroupWindowAggregate(groupBy=[stkcode], window=[TumblingGroupWindow('w$, > matchtime, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime], > select=[stkcode, SUM(volume) AS EXPR$2, start('w$) AS w$start, end('w$) AS > w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) > -> Calc(select=[stkcode, CAST(w$end) AS pd, EXPR$2 AS volume]) > -> Sink: Sink(table=[cpp_flink_catalog.default.t_stock_match_1], > fields=[stkcode, pd, volume]) > > 对于这一串而言,Records Received 实际上就是 GroupWindowAggregate 这个算子接收到的数据条数,Records > Sent 是 Sink 下发的数据条数,但是注意这个“下发”并不是指 invoke 方法执行的次数,而是类似 collector.collect() > 这种在 Flink 框架内上下游的发送。所以 sink 算子的 Records Sent 应该一直是 0,无论是否成功写数据到 MySQL > 了。一般我们更关心的是 Sink 算子有没有 numRecordsIn(Records Received),如果有,说明上游算子没问题,是写 MySQL > 失败了,可以检查下写 MySQL 那部分的配置信息;如果 Sink 算子 numRecordsIn(Records Received) 也是 > 0,那说明是上游有问题,没发数据到 Sink 上,可以依次检查再之前的算子的情况。 > > 比如这个是我的作业的截图(第一次插图片不知道能不能显示出来,不能的话我再想办法……),Sink 是 print,一直有在打印数据,但是 overview > 页面指标一直是 0 的。 > <http://apache-flink.147419.n8.nabble.com/file/t1268/SourceIn0.png> > <http://apache-flink.147419.n8.nabble.com/file/t1268/SinkIn.png> > > 祝好~ > Smile > > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
感谢您的答复,刚才看到您的答复后,紧急远程连接跑了下,stdout还真的有数据出来了,周一上班时间再好好测试下,万分感谢!
在 2021-02-27 19:08:25,"[hidden email]" <[hidden email]> 写道: >1503,600001,15811,1614405166858 >1504,600001,15813,1614405333871 >1505,600001,15814,1614405544862 >1506,600001,15814,1614405673863 >就这几条数据,并行度设置为1 > > > >发件人: [hidden email] >发送时间: 2021-02-27 14:23 >收件人: user-zh >主题: Re: Flink SQL 应用情况请教 >这个问题不知道是不是这个原因导致的,我在Flink的webUI监控界面source和sink任务中都没看到watermark的值,其中source的Watermarks显示No Data,sink显示的是No Watermark >我的SQL语句如下: >CREATE TABLE t_stock_match_p_1( > id VARCHAR, > stkcode INT, > volume INT, > matchtime BIGINT, > ts as TO_TIMESTAMP(FROM_UNIXTIME(matchtime/1000,'yyyy-MM-dd HH:mm:ss')), > WATERMARK FOR ts AS ts - INTERVAL '1' SECOND >) WITH ( > 'connector' = 'kafka-0.10', > 'topic' = 'stock_match_p_zyh', > 'scan.startup.mode' = 'latest-offset', > 'properties.group.id' = 'stock_match_p_zyh', > 'properties.bootstrap.servers' = 'sdp-10-88-100-101:6668', > 'properties.security.protocol' = 'SASL_SDP', > 'properties.sasl.mechanism' = 'SDP', > 'properties.key.deserializer' = 'org.apache.kafka.common.serialization.LongDeserializer', > 'properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer', > 'format' = 'csv', > 'csv.field-delimiter' = ',' >); > >CREATE TABLE t_stock_match_1 ( > stkcode int, > pd TIMESTAMP, > volume INT >) WITH ( >'connector' = 'print' >); > >INSERT INTO t_stock_match_1 SELECT stkcode,TUMBLE_END(ts, INTERVAL '1' MINUTE) as pd, sum(volume) FROM t_stock_match_p_1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),stkcode; > >然后kafka的stock_match_p_zyh的topic中我手工输入了如下几条数据,Flink的WebUI的任务的stdout中没有数据输出,日志中也无错误产生,不知道问题发生在什么地方? >1503,600001,15811,1614405166858 >1504,600001,15813,1614405333871 >1505,600001,15814,1614405544862 >1506,600001,15814,1614405673863 > > > > > >> 在 2021年2月26日,15:02,Smile <[hidden email]> 写道: >> >> 你好, >> >> 关于指标的问题,可以进到具体的算子里面的 Metrics 页面看看每个算子的 numRecordsIn 和 >> numRecordsOut,看是哪个算子开始有输入没输出的。 >> 上面贴的指标看起来是 Overview 页面上的,这个地方展示的指标是对整个 Chain 起来的整体算的。 >> >> GroupWindowAggregate(groupBy=[stkcode], window=[TumblingGroupWindow('w$, >> matchtime, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime], >> select=[stkcode, SUM(volume) AS EXPR$2, start('w$) AS w$start, end('w$) AS >> w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) >> -> Calc(select=[stkcode, CAST(w$end) AS pd, EXPR$2 AS volume]) >> -> Sink: Sink(table=[cpp_flink_catalog.default.t_stock_match_1], >> fields=[stkcode, pd, volume]) >> >> 对于这一串而言,Records Received 实际上就是 GroupWindowAggregate 这个算子接收到的数据条数,Records >> Sent 是 Sink 下发的数据条数,但是注意这个“下发”并不是指 invoke 方法执行的次数,而是类似 collector.collect() >> 这种在 Flink 框架内上下游的发送。所以 sink 算子的 Records Sent 应该一直是 0,无论是否成功写数据到 MySQL >> 了。一般我们更关心的是 Sink 算子有没有 numRecordsIn(Records Received),如果有,说明上游算子没问题,是写 MySQL >> 失败了,可以检查下写 MySQL 那部分的配置信息;如果 Sink 算子 numRecordsIn(Records Received) 也是 >> 0,那说明是上游有问题,没发数据到 Sink 上,可以依次检查再之前的算子的情况。 >> >> 比如这个是我的作业的截图(第一次插图片不知道能不能显示出来,不能的话我再想办法……),Sink 是 print,一直有在打印数据,但是 overview >> 页面指标一直是 0 的。 >> <http://apache-flink.147419.n8.nabble.com/file/t1268/SourceIn0.png> >> <http://apache-flink.147419.n8.nabble.com/file/t1268/SinkIn.png> >> >> 祝好~ >> Smile >> >> >> >> >> >> >> -- >> Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |