Watermark not generated

Watermark not generated

沉醉寒風
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(3);
        Map<String,String> map = new HashMap<>();
        map.put("table.exec.source.idle-timeout","1000 ms");
        executionEnvironment.getConfig().setGlobalJobParameters(ParameterTool.fromMap(map));
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(executionEnvironment, settings);
        streamTableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1));
        String catalogName  = "cat1";
        streamTableEnv.registerCatalog(catalogName,new GenericInMemoryCatalog(catalogName,"db1"));
        streamTableEnv.useCatalog(catalogName);
        streamTableEnv.executeSql("CREATE TABLE kafka_table2 (\n" +
                        "  user_id STRING,\n" +
                        "  order_amount DOUBLE,\n" +
                        "  log_ts TIMESTAMP(3),\n" +
                        "  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND\n"+
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'tp1',\n" +
                "  'properties.bootstrap.servers' = 'host1:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'csv'\n" +
                ")");
        streamTableEnv.executeSql("select user_id," +
                "sum(order_amount) as amt," +
                "tumble_start(log_ts,INTERVAL '5' SECOND) as tumbleStart,"+
                "tumble_end(log_ts,INTERVAL '5' SECOND) as tumbleEnd " +
                "from kafka_table2 group by user_id,tumble(log_ts,INTERVAL '5' SECOND)").print();
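The Kafka topic has three partitions. During testing I sent data to only one partition and found that none of the 3 parallel subtasks generated a watermark. Any ideas?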
Re: Watermark not generated

Jessica J.Wang
Check whether the WatermarkAssigner node has data flowing into it.



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Watermark not generated

沉醉寒風
Yes, it does.

The Kafka topic has 3 partitions, but only one partition has data; the Flink job runs with parallelism 3.
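
One possible explanation for this setup, sketched as a toy model rather than Flink's actual code: an operator downstream of several parallel inputs advances its watermark to the minimum over all inputs that are not marked idle. If two of the three source subtasks never see data and are never marked idle, the minimum would stay at the initial value and the tumbling windows would never fire. The class `WatermarkDemo`, the method `combine`, and the sample values are hypothetical names chosen for illustration.

```java
/** Toy model (not Flink code) of how a downstream operator combines input watermarks. */
public class WatermarkDemo {
    /** Stands for "no watermark seen yet"; assumed to mirror Flink's initial Long.MIN_VALUE. */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /**
     * Returns the minimum watermark over all inputs that are not marked idle.
     * In this simplified model, NO_WATERMARK is returned when every input is idle.
     */
    static long combine(long[] inputWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, inputWatermarks[i]);
            }
        }
        return anyActive ? min : NO_WATERMARK;
    }

    public static void main(String[] args) {
        // Subtask 0 reads the partition with data; subtasks 1 and 2 have seen nothing.
        long[] watermarks = {10_000L, NO_WATERMARK, NO_WATERMARK};

        // No input marked idle: the empty subtasks hold the combined watermark back.
        long stuck = combine(watermarks, new boolean[] {false, false, false});
        System.out.println(stuck == NO_WATERMARK); // prints "true"

        // Empty subtasks marked idle: the watermark follows the active subtask.
        long advancing = combine(watermarks, new boolean[] {false, true, true});
        System.out.println(advancing); // prints "10000"
    }
}
```

If this is what is happening here, the thing to check is whether the idle timeout actually reaches the planner. As far as I know, `table.exec.source.idle-timeout` is read from the table configuration, for example `streamTableEnv.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "1000 ms")`, and not from the global job parameters set on the `StreamExecutionEnvironment`; please verify this against the documentation for your Flink version.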

------------------ Original message ------------------
From: "user-zh" <[hidden email]>;
Sent: Friday, January 29, 2021, 5:30 PM
To: "user-zh" <[hidden email]>;
Subject: Re: Watermark not generated

Check whether the WatermarkAssigner node has data flowing into it.