|
// Flink SQL job: 5-second tumbling-window sum of order_amount per user_id, read from Kafka topic tp1.
//
// Why no watermark showed up when only one Kafka partition received data:
// the job runs with parallelism 3, so the Kafka source has 3 subtasks. Subtasks whose partition
// gets no records never emit a watermark. The downstream window operator uses the MINIMUM
// watermark over all of its input channels, so it never advances and no window fires.
//
// The fix is to let idle source subtasks be marked idle, so they are excluded from that minimum.
// "table.exec.source.idle-timeout" is a Table API option and is read only from the TableConfig.
// Registering it as a DataStream global job parameter (ExecutionConfig#setGlobalJobParameters)
// has no effect on the SQL planner.
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(3);
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(executionEnvironment, settings);
streamTableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1));
// A source subtask that produces no records for 1 s is marked idle, so it no longer holds back
// the watermark seen by the window operator.
// NOTE(review): some Flink 1.12.x releases reportedly ignored this option when the watermark
// is pushed down into the Kafka source (FLINK-20947). Confirm the Flink version in use.
// If the option still has no effect, alternatives are to send data to every partition or to
// run the source with parallelism 1 while testing.
streamTableEnv.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "1000 ms");
String catalogName = "cat1";
streamTableEnv.registerCatalog(catalogName, new GenericInMemoryCatalog(catalogName, "db1"));
streamTableEnv.useCatalog(catalogName);
// The watermark lags the largest log_ts seen by 5 s. A 5 s window ending at T therefore fires
// only after an event with log_ts of roughly T + 5 s or later has been read.
streamTableEnv.executeSql("CREATE TABLE kafka_table2 (\n" +
        " user_id STRING,\n" +
        " order_amount DOUBLE,\n" +
        " log_ts TIMESTAMP(3),\n" +
        " WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND\n" +
        ") WITH (\n" +
        " 'connector' = 'kafka',\n" +
        " 'topic' = 'tp1',\n" +
        " 'properties.bootstrap.servers' = 'host1:9092',\n" +
        " 'properties.group.id' = 'testGroup',\n" +
        " 'scan.startup.mode' = 'latest-offset',\n" +
        " 'format' = 'csv'\n" +
        ")");
// print() consumes the (unbounded) result and blocks the calling thread.
streamTableEnv.executeSql("select user_id," +
        "sum(order_amount) as amt," +
        "tumble_start(log_ts,INTERVAL '5' SECOND) as tumbleStart," +
        "tumble_end(log_ts,INTERVAL '5' SECOND) as tumbleEnd " +
        "from kafka_table2 group by user_id,tumble(log_ts,INTERVAL '5' SECOND)").print();
Kafka topic 有 3 个分区，测试时只向其中一个分区发送数据，结果发现 3 个并行实例都没有生成水位线（窗口不触发）。请问原因是什么，该如何解决？
|