flink1.10.1 写的 SQL 作业, 开始运行3个小时正常, checkpoint也正常. 然后,checkpoint失败了, 作业一直卡在RESTARTING 状态不动. TaskManager 日志: 2020-06-16 20:38:16,640 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-11, groupId=] Discovered group coordinator 172.16.30.165:9092 (id: 2147483645 rack: null) 2020-06-16 23:27:46,026 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10) CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/2) (5c29783b8f7ed8bfb1a7723f5c4216b1). 2020-06-16 23:27:46,027 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10) CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (2/2) (ed41475641eb3c58f7504d4a16a9c19b). 2020-06-16 23:27:46,034 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (1/2) (2d34cabe390aaadb88c2b861250b793a). 2020-06-16 23:27:46,036 INFO org.apache.flink.runtime.taskmanager.Task - Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (1/2) (2d34cabe390aaadb88c2b861250b793a) switched from RUNNING to FAILED. java.lang.Exception: Could not perform checkpoint 329 for operator Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (1/2). at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:785) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:760) at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$429/1681613747.call(Unknown Source) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flink.shaded.netty4.io.netty.util.IllegalReferenceCountException: refCnt: 0 at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1464) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1448) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.readLong(AbstractByteBuf.java:843) at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:964) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870) at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$430/890021961.run(Unknown Source) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843) at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:776) ... 12 more 2020-06-16 23:27:46,045 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[order_id, shop_id, item_id, edit_time], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) -> LookupJoin(table=[JDBCTableSource(id, category_id_first, brand, category)], joinType=[InnerJoin], async=[false], lookup=[id=item_id], where=[brand IS NOT NULL], select=[order_id, shop_id, item_id, edit_time, id, brand]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, brand, order_id]) (1/2) (0527ac291f32f14675f9d5bec8ef5369). 2020-06-16 23:27:46,038 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, item_id, 1 AS $f3], where=[((_change_column jsonHasKey _UTF-16LE'"pay_time"') AND shop_id IS NOT NULL AND item_id IS NOT NULL)]) (2/2) (2d984d0e7df4ed2a6fa1d8892fcccefc). 2020-06-16 23:27:46,038 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, item_id, 1 AS $f3], where=[((_change_column jsonHasKey _UTF-16LE'"pay_time"') AND shop_id IS NOT NULL AND item_id IS NOT NULL)]) (1/2) (32b2422a4619f4900c5a668dfb466ec9). 2020-06-16 23:27:46,037 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (2/2) (65d0693dbe12af0d571601dfafd0ca09). 2020-06-16 23:27:46,050 INFO org.apache.flink.runtime.taskmanager.Task - Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (2/2) (65d0693dbe12af0d571601dfafd0ca09) switched from RUNNING to FAILED. java.lang.Exception: Could not perform checkpoint 329 for operator Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (2/2). at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:785) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:760) at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$429/1681613747.call(Unknown Source) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(8) exceeds writerIndex(0): Buffer 1 (freed) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1451) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.readLong(AbstractByteBuf.java:843) at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:964) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870) at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$430/890021961.run(Unknown Source) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843) at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:776) ... 12 more 2020-06-16 23:27:46,051 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) (2/2) (65d0693dbe12af0d571601dfafd0ca09). 2020-06-16 23:27:46,036 INFO org.apache.flink.runtime.taskmanager.Task - Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10) CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/2) (5c29783b8f7ed8bfb1a7723f5c4216b1) switched from RUNNING to FAILED. java.lang.Exception: Could not perform checkpoint 329 for operator Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, customer_amount, sharing_amount, logistic_type, logistic_pay_type, logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, end_time, is_deleted, creator, editor, create_time, edit_time, _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10) CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/2). at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:785) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:760) at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$429/1681613747.call(Unknown Source) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NullPointerException at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:964) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870) at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$430/890021961.run(Unknown Source) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843) at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:776) ... 12 more 求指点啊.
|
Administrator
|
看起来是一个已知问题: https://issues.apache.org/jira/browse/FLINK-17479
On Wed, 17 Jun 2020 at 11:00, hb <[hidden email]> wrote: > flink1.10.1 写的 SQL 作业, 开始运行3个小时正常, checkpoint也正常. > 然后,checkpoint失败了, 作业一直卡在RESTARTING 状态不动. > > TaskManager 日志: > 2020-06-16 20:38:16,640 INFO org.apache.kafka.clients.consumer.internals. > AbstractCoordinator - [Consumer clientId=consumer-11, groupId=] Discovered > group coordinator 172.16.30.165:9092 (id: 2147483645 rack: null) > 2020-06-16 23:27:46,026 INFO org.apache.flink.runtime.taskmanager.Task - > Attempting to fail task externally Source: KafkaTableSource(id, order_id, > order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, > item_count, origin_single_item_amount, pay_amount, tax_amount, > item_actual_amount, customer_amount, sharing_amount, logistic_type, > logistic_pay_type, logistic_amount, attribute, spu_feature, spec, > spec_desc, item_picture, promotion_attr, order_status, refund_status, > biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, > stock_out_time, delivery_time, end_time, is_deleted, creator, editor, > create_time, edit_time, _change_column, _old_column, _ddl_field, > _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, > source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, > shop_id, supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, > supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10) > CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT > _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS > setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE > '"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/2) (5 > c29783b8f7ed8bfb1a7723f5c4216b1). > 2020-06-16 23:27:46,027 INFO org.apache.flink.runtime.taskmanager.Task - > Attempting to fail task externally Source: KafkaTableSource(id, order_id, > order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, > item_count, origin_single_item_amount, pay_amount, tax_amount, > item_actual_amount, customer_amount, sharing_amount, logistic_type, > logistic_pay_type, logistic_amount, attribute, spu_feature, spec, > spec_desc, item_picture, promotion_attr, order_status, refund_status, > biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, > stock_out_time, delivery_time, end_time, is_deleted, creator, editor, > create_time, edit_time, _change_column, _old_column, _ddl_field, > _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, > source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, > shop_id, supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, > supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10) > CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT > _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS > setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE > '"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (2/2) > (ed41475641eb3c58f7504d4a16a9c19b). > 2020-06-16 23:27:46,034 INFO org.apache.flink.runtime.taskmanager.Task - > Attempting to fail task externally Source: KafkaTableSource(id, order_id, > order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, > item_count, origin_single_item_amount, pay_amount, tax_amount, > item_actual_amount, customer_amount, sharing_amount, logistic_type, > logistic_pay_type, logistic_amount, attribute, spu_feature, spec, > spec_desc, item_picture, promotion_attr, order_status, refund_status, > biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, > stock_out_time, delivery_time, end_time, is_deleted, creator, editor, > create_time, edit_time, _change_column, _old_column, _ddl_field, > _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, > source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, > shop_id, supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, > supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS > the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE > '"pay_time"')]) (1/2) (2d34cabe390aaadb88c2b861250b793a). > 2020-06-16 23:27:46,036 INFO org.apache.flink.runtime.taskmanager.Task - > Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, > shop_id, supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, > source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, > shop_id, supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, > supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS > the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE > '"pay_time"')]) (1/2) (2d34cabe390aaadb88c2b861250b793a) switched from > RUNNING to FAILED. > java.lang.Exception: Could not perform checkpoint 329 for operator Source: > KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, > supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, > source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, > shop_id, supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, > supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS > the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE > '"pay_time"')]) (1/2). > at org.apache.flink.streaming.runtime.tasks.StreamTask > .triggerCheckpoint(StreamTask.java:785) > at org.apache.flink.streaming.runtime.tasks.StreamTask > .lambda$triggerCheckpointAsync$3(StreamTask.java:760) > at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$429/ > 1681613747.call(Unknown Source) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at org.apache.flink.streaming.runtime.tasks. > StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run( > StreamTaskActionExecutor.java:87) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail > .java:78) > at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor > .processMail(MailboxProcessor.java:261) > at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor > .runMailboxLoop(MailboxProcessor.java:186) > at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop( > StreamTask.java:485) > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke( > StreamTask.java:469) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.flink.shaded.netty4.io.netty.util. > IllegalReferenceCountException: refCnt: 0 > at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf > .ensureAccessible(AbstractByteBuf.java:1464) > at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf > .checkReadableBytes0(AbstractByteBuf.java:1448) > at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf > .readLong(AbstractByteBuf.java:843) > at org.apache.flink.streaming.runtime.tasks.StreamTask > .checkpointState(StreamTask.java:964) > at org.apache.flink.streaming.runtime.tasks.StreamTask > .lambda$performCheckpoint$5(StreamTask.java:870) > at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$430/ > 890021961.run(Unknown Source) > at org.apache.flink.streaming.runtime.tasks. > StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing( > StreamTaskActionExecutor.java:94) > at org.apache.flink.streaming.runtime.tasks.StreamTask > .performCheckpoint(StreamTask.java:843) > at org.apache.flink.streaming.runtime.tasks.StreamTask > .triggerCheckpoint(StreamTask.java:776) > ... 12 more > 2020-06-16 23:27:46,045 INFO org.apache.flink.runtime.taskmanager.Task - > Attempting to fail task externally Source: KafkaTableSource(id, order_id, > order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, > item_count, origin_single_item_amount, pay_amount, tax_amount, > item_actual_amount, customer_amount, sharing_amount, logistic_type, > logistic_pay_type, logistic_amount, attribute, spu_feature, spec, > spec_desc, item_picture, promotion_attr, order_status, refund_status, > biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, > stock_out_time, delivery_time, end_time, is_deleted, creator, editor, > create_time, edit_time, _change_column, _old_column, _ddl_field, > _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, > source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, > shop_id, supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, > supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time]) -> Calc(select=[order_id, shop_id, item_id, edit_time], > where=[(_change_column jsonHasKey _UTF-16LE'"pay_time"')]) -> LookupJoin > (table=[JDBCTableSource(id, category_id_first, brand, category)], > joinType=[InnerJoin], async=[false], lookup=[id=item_id], where=[brand IS > NOT NULL], select=[order_id, shop_id, item_id, edit_time, id, brand]) -> > Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS the_day, shop_id, > brand, order_id]) (1/2) (0527ac291f32f14675f9d5bec8ef5369). > 2020-06-16 23:27:46,038 INFO org.apache.flink.runtime.taskmanager.Task - > Attempting to fail task externally Source: KafkaTableSource(id, order_id, > order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, > item_count, origin_single_item_amount, pay_amount, tax_amount, > item_actual_amount, customer_amount, sharing_amount, logistic_type, > logistic_pay_type, logistic_amount, attribute, spu_feature, spec, > spec_desc, item_picture, promotion_attr, order_status, refund_status, > biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, > stock_out_time, delivery_time, end_time, is_deleted, creator, editor, > create_time, edit_time, _change_column, _old_column, _ddl_field, > _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, > source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, > shop_id, supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, > supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS > the_day, shop_id, item_id, 1 AS $f3], where=[((_change_column jsonHasKey > _UTF-16LE'"pay_time"') AND shop_id IS NOT NULL AND item_id IS NOT NULL)]) > (2/2) (2d984d0e7df4ed2a6fa1d8892fcccefc). > 2020-06-16 23:27:46,038 INFO org.apache.flink.runtime.taskmanager.Task - > Attempting to fail task externally Source: KafkaTableSource(id, order_id, > order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, > item_count, origin_single_item_amount, pay_amount, tax_amount, > item_actual_amount, customer_amount, sharing_amount, logistic_type, > logistic_pay_type, logistic_amount, attribute, spu_feature, spec, > spec_desc, item_picture, promotion_attr, order_status, refund_status, > biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, > stock_out_time, delivery_time, end_time, is_deleted, creator, editor, > create_time, edit_time, _change_column, _old_column, _ddl_field, > _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, > source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, > shop_id, supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, > supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS > the_day, shop_id, item_id, 1 AS $f3], where=[((_change_column jsonHasKey > _UTF-16LE'"pay_time"') AND shop_id IS NOT NULL AND item_id IS NOT NULL)]) > (1/2) (32b2422a4619f4900c5a668dfb466ec9). > 2020-06-16 23:27:46,037 INFO org.apache.flink.runtime.taskmanager.Task - > Attempting to fail task externally Source: KafkaTableSource(id, order_id, > order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name, > item_count, origin_single_item_amount, pay_amount, tax_amount, > item_actual_amount, customer_amount, sharing_amount, logistic_type, > logistic_pay_type, logistic_amount, attribute, spu_feature, spec, > spec_desc, item_picture, promotion_attr, order_status, refund_status, > biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, > stock_out_time, delivery_time, end_time, is_deleted, creator, editor, > create_time, edit_time, _change_column, _old_column, _ddl_field, > _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, > source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, > shop_id, supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, > supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS > the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE > '"pay_time"')]) (2/2) (65d0693dbe12af0d571601dfafd0ca09). > 2020-06-16 23:27:46,050 INFO org.apache.flink.runtime.taskmanager.Task - > Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, > shop_id, supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, > source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, > shop_id, supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, > supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS > the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE > '"pay_time"')]) (2/2) (65d0693dbe12af0d571601dfafd0ca09) switched from > RUNNING to FAILED. > java.lang.Exception: Could not perform checkpoint 329 for operator Source: > KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, > supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, > source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, > shop_id, supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, > supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS > the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE > '"pay_time"')]) (2/2). > at org.apache.flink.streaming.runtime.tasks.StreamTask > .triggerCheckpoint(StreamTask.java:785) > at org.apache.flink.streaming.runtime.tasks.StreamTask > .lambda$triggerCheckpointAsync$3(StreamTask.java:760) > at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$429/ > 1681613747.call(Unknown Source) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at org.apache.flink.streaming.runtime.tasks. > StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run( > StreamTaskActionExecutor.java:87) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail > .java:78) > at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor > .processMail(MailboxProcessor.java:261) > at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor > .runMailboxLoop(MailboxProcessor.java:186) > at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop( > StreamTask.java:485) > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke( > StreamTask.java:469) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(8) > exceeds writerIndex(0): Buffer 1 (freed) > at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf > .checkReadableBytes0(AbstractByteBuf.java:1451) > at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf > .readLong(AbstractByteBuf.java:843) > at org.apache.flink.streaming.runtime.tasks.StreamTask > .checkpointState(StreamTask.java:964) > at org.apache.flink.streaming.runtime.tasks.StreamTask > .lambda$performCheckpoint$5(StreamTask.java:870) > at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$430/ > 890021961.run(Unknown Source) > at org.apache.flink.streaming.runtime.tasks. > StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing( > StreamTaskActionExecutor.java:94) > at org.apache.flink.streaming.runtime.tasks.StreamTask > .performCheckpoint(StreamTask.java:843) > at org.apache.flink.streaming.runtime.tasks.StreamTask > .triggerCheckpoint(StreamTask.java:776) > ... 12 more > 2020-06-16 23:27:46,051 INFO org.apache.flink.runtime.taskmanager.Task - > Triggering cancellation of task code Source: KafkaTableSource(id, > order_id, order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, > item_name, item_count, origin_single_item_amount, pay_amount, tax_amount, > item_actual_amount, customer_amount, sharing_amount, logistic_type, > logistic_pay_type, logistic_amount, attribute, spu_feature, spec, > spec_desc, item_picture, promotion_attr, order_status, refund_status, > biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time, > stock_out_time, delivery_time, end_time, is_deleted, creator, editor, > create_time, edit_time, _change_column, _old_column, _ddl_field, > _table_name, _db_name, _op_type, _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, > source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, > shop_id, supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, > supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time]) -> Calc(select=[(edit_time SUBSTRING 0 SUBSTRING 10) AS > the_day, shop_id, trade_id], where=[(_change_column jsonHasKey _UTF-16LE > '"pay_time"')]) (2/2) (65d0693dbe12af0d571601dfafd0ca09). > 2020-06-16 23:27:46,036 INFO org.apache.flink.runtime.taskmanager.Task - > Source: KafkaTableSource(id, order_id, order_no, trade_id, bu_id, > shop_id, supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, > source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, > shop_id, supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, > supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10) > CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT > _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS > setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE > '"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/2) (5c29783b8f7ed8bfb1a7723f5c4216b1) > switched from RUNNING to FAILED. > java.lang.Exception: Could not perform checkpoint 329 for operator Source: > KafkaTableSource(id, order_id, order_no, trade_id, bu_id, shop_id, > supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time) -> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501, > source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id, > shop_id, supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id, > supply_id, sku_id, item_id, item_name, item_count, > origin_single_item_amount, pay_amount, tax_amount, item_actual_amount, > customer_amount, sharing_amount, logistic_type, logistic_pay_type, > logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture, > promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr, > bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time, > end_time, is_deleted, creator, editor, create_time, edit_time, > _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type, > _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10) > CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT > _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS > setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE > '"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/2). > at org.apache.flink.streaming.runtime.tasks.StreamTask > .triggerCheckpoint(StreamTask.java:785) > at org.apache.flink.streaming.runtime.tasks.StreamTask > .lambda$triggerCheckpointAsync$3(StreamTask.java:760) > at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$429/ > 1681613747.call(Unknown Source) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at org.apache.flink.streaming.runtime.tasks. > StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run( > StreamTaskActionExecutor.java:87) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail > .java:78) > at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor > .processMail(MailboxProcessor.java:261) > at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor > .runMailboxLoop(MailboxProcessor.java:186) > at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop( > StreamTask.java:485) > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke( > StreamTask.java:469) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException > at org.apache.flink.streaming.runtime.tasks.StreamTask > .checkpointState(StreamTask.java:964) > at org.apache.flink.streaming.runtime.tasks.StreamTask > .lambda$performCheckpoint$5(StreamTask.java:870) > at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$430/ > 890021961.run(Unknown Source) > at org.apache.flink.streaming.runtime.tasks. > StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing( > StreamTaskActionExecutor.java:94) > at org.apache.flink.streaming.runtime.tasks.StreamTask > .performCheckpoint(StreamTask.java:843) > at org.apache.flink.streaming.runtime.tasks.StreamTask > .triggerCheckpoint(StreamTask.java:776) > ... 12 more > > 求指点啊. > > > > |
Free forum by Nabble | Edit this page |