String ddl =
"CREATE TABLE orders (\n"
+ " user_id INT,\n"
+ " product STRING,\n"
+ " amount INT,\n"
+ " `time` bigint,\n"
+ " `ts` AS TO_TIMESTAMP(FROM_UNIXTIME(`time`)),\n"
+ " WATERMARK FOR ts AS ts - INTERVAL '3' SECOND\n"
+ ") WITH (\n"
+ " 'connector' = 'kafka',\n"
+ " 'topic' = 'test',\n"
+ " 'properties.group.id' = 'flink_ods1',\n"
+ " 'properties.bootstrap.servers' = ‘172.21.11.11:9092‘\n"
+ " 'format' = 'csv',\n"
+ " 'csv.ignore-parse-errors' = 'true',\n"
+ " 'csv.field-delimiter' = ',',\n"
+ " 'scan.startup.mode' = 'group-offsets'\n"
+ ")";
String ddl2 = "CREATE TABLE print_check2 (\n"
+ " `window_start` STRING,\n"
+ " `order_num` BIGINT,\n"
+ " `total_amount` BIGINT,\n"
+ " `unique_products` BIGINT\n"
+ ") WITH (\n"
+ " 'connector' = 'print'\n"
+ ")";
String ddl3 =
"insert into print_check2 SELECT\n"
+ " CAST(TUMBLE_START(ts, INTERVAL '5' SECOND) AS
STRING) window_start,\n"
+ " COUNT(*) order_num,\n"
+ " SUM(amount) total_amount,\n"
+ " COUNT(DISTINCT product) unique_products\n"
+ "FROM orders\n"
+ "GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)";
执行上述语句后,在1.12中print_check2表没有结果输出,而在1.11.3中则有结果输出。上述的对比在相同的kafka输入数据环境下。
<
https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1&name=bigjar3344&uid=bigjar3344%40gmail.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fqiyelogo%2FdefaultAvatar.png&items=%5B%22bigjar3344%40gmail.com%22%5D>