Flink SQL 作业, 开始运行正常, checkpoint也正常, 没有大状态(状态19.7KB),
但是执行一段时间后,开始报错, 然后恢复执行个几秒,也马上报这个错,然后报错恢复不停循环. 哪位帮忙看看,不胜感激. 2020-06-05 14:13:19,791 INFO org.apache.flink.runtime.taskmanager.Task - Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) -> SourceConversion(table=[default_catalog.default_database.user_visit_trace, source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], fields=[userId, utp, utrp, extendFields, requestTime]) -> Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime], where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') > _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS NOT NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)], joinType=[InnerJoin], async=[false], lookup=[user_id=userId], where=[shop_id IS NOT NULL], select=[item_id, userId, requestTime, shop_id, user_id]) -> Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') CONCAT _UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT _UTF-16LE':' CONCAT item_id) AS redisKey, requestTime AS fieldName]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/8) (ad7a2d51beccbb39f83ac2c16b923bd0) switched from RUNNING to FAILED. java.lang.Exception: Could not perform checkpoint 401 for operator Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) -> SourceConversion(table=[default_catalog.default_database.user_visit_trace, source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], fields=[userId, utp, utrp, extendFields, requestTime]) -> Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime], where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') > _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS NOT NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)], joinType=[InnerJoin], async=[false], lookup=[user_id=userId], where=[shop_id IS NOT NULL], select=[item_id, userId, requestTime, shop_id, user_id]) -> Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') CONCAT _UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT _UTF-16LE':' CONCAT item_id) AS redisKey, requestTime AS fieldName]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/8). at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777) at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$595/1311782208.call(Unknown Source) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NullPointerException at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411) at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887) at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$597/1708409807.run(Unknown Source) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860) at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793) ... 12 more 2020-06-05 14:13:19,795 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) -> SourceConversion(table=[default_catalog.default_database.user_visit_trace, source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], fields=[userId, utp, utrp, extendFields, requestTime]) -> Calc(select=[(_UTF-16LE'search_rt:' CONCAT ((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') CONCAT _UTF-16LE':traffic:set:' CONCAT getItemId(extendFields)) AS redisKey, requestTime AS fieldName], where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') > _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL)]) -> SinkConversionToTuple2 -> Sink: Unnamed (3/8) (e0452995af60f6ed941b8dedd078def3). |
hi 请问你用的flink是哪个版本?StreamTask这个类里报了NPE,感觉是bug。
hb <[hidden email]> 于2020年6月5日周五 下午3:07写道: > Flink SQL 作业, 开始运行正常, checkpoint也正常, 没有大状态(状态19.7KB), > 但是执行一段时间后,开始报错, 然后恢复执行个几秒,也马上报这个错,然后报错恢复不停循环. > 哪位帮忙看看,不胜感激. > > > 2020-06-05 14:13:19,791 INFO org.apache.flink.runtime.taskmanager.Task - > Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) -> > SourceConversion(table=[default_catalog.default_database.user_visit_trace, > source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], > fields=[userId, utp, utrp, extendFields, requestTime]) -> > Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime], > where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") > AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') > > _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS > NOT NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)], > joinType=[InnerJoin], async=[false], lookup=[user_id=userId], > where=[shop_id IS NOT NULL], select=[item_id, userId, requestTime, shop_id, > user_id]) -> Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) / > 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') CONCAT > _UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT > _UTF-16LE':' CONCAT item_id) AS redisKey, requestTime AS fieldName]) -> > SinkConversionToTuple2 -> Sink: Unnamed (1/8) > (ad7a2d51beccbb39f83ac2c16b923bd0) switched from RUNNING to FAILED. > java.lang.Exception: Could not perform checkpoint 401 for operator Source: > KafkaTableSource(userId, utp, utrp, extendFields, requestTime) -> > SourceConversion(table=[default_catalog.default_database.user_visit_trace, > source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], > fields=[userId, utp, utrp, extendFields, requestTime]) -> > Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime], > where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") > AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') > > _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS > NOT NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)], > joinType=[InnerJoin], async=[false], lookup=[user_id=userId], > where=[shop_id IS NOT NULL], select=[item_id, userId, requestTime, shop_id, > user_id]) -> Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) / > 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') CONCAT > _UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT > _UTF-16LE':' CONCAT item_id) AS redisKey, requestTime AS fieldName]) -> > SinkConversionToTuple2 -> Sink: Unnamed (1/8). > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$595/1311782208.call(Unknown > Source) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87) > at > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$597/1708409807.run(Unknown > Source) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793) > ... 12 more > 2020-06-05 14:13:19,795 INFO org.apache.flink.runtime.taskmanager.Task - > Attempting to fail task externally Source: KafkaTableSource(userId, utp, > utrp, extendFields, requestTime) -> > SourceConversion(table=[default_catalog.default_database.user_visit_trace, > source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], > fields=[userId, utp, utrp, extendFields, requestTime]) -> > Calc(select=[(_UTF-16LE'search_rt:' CONCAT ((CAST(requestTime) / 1000) > FROM_UNIXTIME _UTF-16LE'yyyyMMdd') CONCAT _UTF-16LE':traffic:set:' CONCAT > getItemId(extendFields)) AS redisKey, requestTime AS fieldName], > where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") > AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') > > _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL)]) -> > SinkConversionToTuple2 -> Sink: Unnamed (3/8) > (e0452995af60f6ed941b8dedd078def3). > > |
In reply to this post by hb
我也遇到过这个问题,这个可能是 checkpoint 的 bug,我修改了下 flink 源码后不报错了,可以参考下这个patch https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
在 2020-06-05 15:06:48,"hb" <[hidden email]> 写道: >Flink SQL 作业, 开始运行正常, checkpoint也正常, 没有大状态(状态19.7KB), >但是执行一段时间后,开始报错, 然后恢复执行个几秒,也马上报这个错,然后报错恢复不停循环. >哪位帮忙看看,不胜感激. > > >2020-06-05 14:13:19,791 INFO org.apache.flink.runtime.taskmanager.Task - Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) -> SourceConversion(table=[default_catalog.default_database.user_visit_trace, source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], fields=[userId, utp, utrp, extendFields, requestTime]) -> Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime], where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') > _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS NOT NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)], joinType=[InnerJoin], async=[false], lookup=[user_id=userId], where=[shop_id IS NOT NULL], select=[item_id, userId, requestTime, shop_id, user_id]) -> Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') CONCAT _UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT _UTF-16LE':' CONCAT item_id) AS redisKey, requestTime AS fieldName]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/8) (ad7a2d51beccbb39f83ac2c16b923bd0) switched from RUNNING to FAILED. >java.lang.Exception: Could not perform checkpoint 401 for operator Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) -> SourceConversion(table=[default_catalog.default_database.user_visit_trace, source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], fields=[userId, utp, utrp, extendFields, requestTime]) -> Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime], where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') > _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS NOT NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)], joinType=[InnerJoin], async=[false], lookup=[user_id=userId], where=[shop_id IS NOT NULL], select=[item_id, userId, requestTime, shop_id, user_id]) -> Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') CONCAT _UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT _UTF-16LE':' CONCAT item_id) AS redisKey, requestTime AS fieldName]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/8). > at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802) > at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777) > at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$595/1311782208.call(Unknown Source) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) > at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261) > at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) > at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) > at java.lang.Thread.run(Thread.java:745) >Caused by: java.lang.NullPointerException > at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411) > at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991) > at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887) > at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$597/1708409807.run(Unknown Source) > at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) > at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860) > at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793) > ... 12 more >2020-06-05 14:13:19,795 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) -> SourceConversion(table=[default_catalog.default_database.user_visit_trace, source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], fields=[userId, utp, utrp, extendFields, requestTime]) -> Calc(select=[(_UTF-16LE'search_rt:' CONCAT ((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') CONCAT _UTF-16LE':traffic:set:' CONCAT getItemId(extendFields)) AS redisKey, requestTime AS fieldName], where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') > _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL)]) -> SinkConversionToTuple2 -> Sink: Unnamed (3/8) (e0452995af60f6ed941b8dedd078def3). > |
hi chenkaibit
欢迎将fix贡献回社区 chenkaibit <[hidden email]> 于2020年6月9日周二 上午10:34写道: > 我也遇到过这个问题,这个可能是 checkpoint 的 bug,我修改了下 flink 源码后不报错了,可以参考下这个patch > https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19 > > > 在 2020-06-05 15:06:48,"hb" <[hidden email]> 写道: > >Flink SQL 作业, 开始运行正常, checkpoint也正常, 没有大状态(状态19.7KB), > >但是执行一段时间后,开始报错, 然后恢复执行个几秒,也马上报这个错,然后报错恢复不停循环. > >哪位帮忙看看,不胜感激. > > > > > >2020-06-05 14:13:19,791 INFO org.apache.flink.runtime.taskmanager.Task - > Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) -> > SourceConversion(table=[default_catalog.default_database.user_visit_trace, > source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], > fields=[userId, utp, utrp, extendFields, requestTime]) -> > Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime], > where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") > AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') > > _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS > NOT NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)], > joinType=[InnerJoin], async=[false], lookup=[user_id=userId], > where=[shop_id IS NOT NULL], select=[item_id, userId, requestTime, shop_id, > user_id]) -> Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) / > 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') CONCAT > _UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT > _UTF-16LE':' CONCAT item_id) AS redisKey, requestTime AS fieldName]) -> > SinkConversionToTuple2 -> Sink: Unnamed (1/8) > (ad7a2d51beccbb39f83ac2c16b923bd0) switched from RUNNING to FAILED. > >java.lang.Exception: Could not perform checkpoint 401 for operator > Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) -> > SourceConversion(table=[default_catalog.default_database.user_visit_trace, > source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], > fields=[userId, utp, utrp, extendFields, requestTime]) -> > Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime], > where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") > AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') > > _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS > NOT NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)], > joinType=[InnerJoin], async=[false], lookup=[user_id=userId], > where=[shop_id IS NOT NULL], select=[item_id, userId, requestTime, shop_id, > user_id]) -> Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) / > 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') CONCAT > _UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT > _UTF-16LE':' CONCAT item_id) AS redisKey, requestTime AS fieldName]) -> > SinkConversionToTuple2 -> Sink: Unnamed (1/8). > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$595/1311782208.call(Unknown > Source) > > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87) > > at > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) > > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261) > > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) > > at java.lang.Thread.run(Thread.java:745) > >Caused by: java.lang.NullPointerException > > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$597/1708409807.run(Unknown > Source) > > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793) > > ... 12 more > >2020-06-05 14:13:19,795 INFO org.apache.flink.runtime.taskmanager.Task - > Attempting to fail task externally Source: KafkaTableSource(userId, utp, > utrp, extendFields, requestTime) -> > SourceConversion(table=[default_catalog.default_database.user_visit_trace, > source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]], > fields=[userId, utp, utrp, extendFields, requestTime]) -> > Calc(select=[(_UTF-16LE'search_rt:' CONCAT ((CAST(requestTime) / 1000) > FROM_UNIXTIME _UTF-16LE'yyyyMMdd') CONCAT _UTF-16LE':traffic:set:' CONCAT > getItemId(extendFields)) AS redisKey, requestTime AS fieldName], > where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") > AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') > > _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL)]) -> > SinkConversionToTuple2 -> Sink: Unnamed (3/8) > (e0452995af60f6ed941b8dedd078def3). > > > |
Free forum by Nabble | Edit this page |