I've run into a problem and would like some advice:
Environment: Flink 1.11, running in IDEA. I followed wuchong's demo and tried to turn the client part into Java. For the first example (counting the number of purchases per hour), the data is read but no result is ever output: writing to Elasticsearch does nothing, and after switching to a print sink there is still nothing.
https://github.com/wuchong/flink-sql-demo/tree/v1.11-CN
Any help would be appreciated. Below are the pom, the code, and the run output.

        // create the execution environment
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        bsEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        // set the StateBackend
        bsEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/chkdir"));
        EnvironmentSettings bsSettings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        // Kafka
        String sourceDDL = "CREATE TABLE user_behavior (" +
                "user_id BIGINT," +
                "item_id BIGINT," +
                "category_id BIGINT," +
                "behavior STRING," +
                "ts TIMESTAMP(3)," +
                "proctime AS PROCTIME()," +
                "WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) " +
                "WITH (" +
                "'connector'='kafka'," +
                "'topic'='user_behavior'," +
                "'scan.startup.mode'='earliest-offset'," +
                "'properties.bootstrap.servers'='localhost:9092'," +
                "'format'='json'" +
                ")";

        // Elasticsearch sink, switched to print
        /* String sinkDDL = "CREATE TABLE buy_cnt_per_hour (" +
                "hour_of_day BIGINT," +
                "buy_cnt BIGINT" +
                ") WITH (" +
                "'connector'='elasticsearch-7'," +
                "'hosts'='http://localhost:9200'," +
                "'index'='buy_cnt_per_hour')"; */
        String sinkDDL = "CREATE TABLE buy_cnt_per_hour (\n" +
                "hour_of_day BIGINT," +
                "buy_cnt BIGINT" +
                ") WITH (\n" +
                " 'connector' = 'print'\n" +
                ")";

        String transformationDDL = "INSERT INTO buy_cnt_per_hour\n" +
                "SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) as hour_of_day, COUNT(*) as buy_cnt\n" +
                "FROM user_behavior\n" +
                "WHERE behavior = 'buy'\n" +
                "GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)";

        // register source and sink
        tEnv.executeSql(sourceDDL);
        tEnv.executeSql(sinkDDL);
        // tableResult.print();

        tEnv.executeSql(transformationDDL);

pom:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

Run output:

01:15:12,358 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka version: unknown
01:15:12,358 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: unknown
01:15:12,358 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1597338912355
01:15:12,361 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-20, groupId=null] Subscribed to partition(s): user_behavior-0
01:15:12,365 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-20, groupId=null] Seeking to EARLIEST offset of partition user_behavior-0
01:15:12,377 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-20, groupId=null] Cluster ID: txkqox8yRL6aWBNsOcS67g
01:15:12,387 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-20, groupId=null] Resetting offset for partition user_behavior-0 to offset 0.
01:15:12,545 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 (type=CHECKPOINT) @ 1597338912539 for job c10220b65246e8269defa48f441a7e09.
01:15:12,709 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job c10220b65246e8269defa48f441a7e09 (14080 bytes in 169 ms).
01:15:17,541 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 (type=CHECKPOINT) @ 1597338917540 for job c10220b65246e8269defa48f441a7e09.
01:15:17,553 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job c10220b65246e8269defa48f441a7e09 (14752 bytes in 11 ms).
01:15:22,546 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 3 (type=CHECKPOINT) @ 1597338922545 for job c10220b65246e8269defa48f441a7e09.
01:15:22,558 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 3 for job c10220b65246e8269defa48f441a7e09 (15004 bytes in 12 ms).

Raw data:

3> +I(999602,4024409,883960,cart,2017-11-27T00:07:36,2020-08-13T17:16:20.440)
3> +I(30616,1693200,4022701,pv,2017-11-27T00:07:36,2020-08-13T17:16:20.440)
3> +I(145183,3533745,1102540,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(323010,3376212,1574064,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(944547,2640409,2465336,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(232939,1976318,411153,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(355996,5161162,1582197,buy,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(443987,3791622,1464116,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi,
This is because executeSql in Flink 1.11 is an asynchronous API: when you run the program in IDEA, the main method returns right away and the process exits. You need to manually keep the TableResult returned by executeSql and then call tableResult.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get() to wait for the job to finish.

Best,
Xingbo
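Putting that suggestion together with the code from the first message, a minimal sketch might look like this (assuming the Flink 1.11 TableResult/JobClient APIs; tEnv, sourceDDL, sinkDDL and transformationDDL are the variables from the original post):

        // register source and sink tables
        tEnv.executeSql(sourceDDL);
        tEnv.executeSql(sinkDDL);

        // executeSql submits the INSERT INTO job asynchronously and returns immediately
        TableResult tableResult = tEnv.executeSql(transformationDDL);

        // block the main thread until the streaming job terminates, so the local
        // process (and therefore the print sink) keeps running
        tableResult.getJobClient()
                .get()
                .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
                .get();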
hi,
I had indeed seen in the mailing list that executeSql is an asynchronous API, but I still have some questions about it.

1. When the INSERT INTO statement is a simple one, I can see output from the code, but once I replace it with the aggregating INSERT INTO SQL from my original post there is no output at all. Why?

Code:

    ...
    tEnv.executeSql(sourceDDL);
    tEnv.executeSql(sinkDDL);
    tEnv.executeSql("INSERT INTO print_sink SELECT user_id, item_id, category_id, behavior, ts, proctime FROM user_behavior");
    ...

Console:

    3> +I(1014646,2869046,4022701,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847)
    3> +I(105950,191177,3975787,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847)
    3> +I(128322,5013356,4066962,buy,2017-11-27T00:38:15,2020-08-14T08:20:23.847)
    3> +I(225652,3487948,2462567,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847)

Aggregating version (source unchanged, sink columns changed accordingly):

> String transformationDDL = "INSERT INTO buy_cnt_per_hour\n" +
>         "SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) as hour_of_day, COUNT(*) as buy_cnt\n" +
>         "FROM user_behavior\n" +
>         "WHERE behavior = 'buy'\n" +
>         "GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)";
>
> // register source and sink
> tEnv.executeSql(sourceDDL);
> tEnv.executeSql(sinkDDL);
> // tableResult.print();
>
> tEnv.executeSql(transformationDDL);

2. I did not quite understand what you meant by "manually keep the TableResult returned by executeSql and then ... wait for the job to finish". I changed the code as follows, but the console still prints no result:

    // register source and sink
    tEnv.executeSql(sourceDDL);
    tEnv.executeSql(sinkDDL);
    TableResult tableResult = tEnv.executeSql(transformationDDL);
    tableResult.getJobClient()
            .get()
            .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
            .get().wait();

Best,
DanielGu

--
Sent from: http://apache-flink.147419.n8.nabble.com/
hi,
Does anyone else have experience debugging Flink 1.11 SQL in IDEA? Any advice would be appreciated. I've been stuck on this for a while and can't get past it. Many thanks.

--
Sent from: http://apache-flink.147419.n8.nabble.com/
> GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)
The result is only emitted once the 1-hour window has closed. To see data earlier you can shrink the window, or enable early firing:

    table.exec.emit.early-fire.enabled=true
    table.exec.emit.early-fire.delay=xx

> manually keep the TableResult returned by executeSql and then ... wait for the job to finish

That is only there to prevent the process from exiting as soon as executeSql returns when you run it in a local IDE, which would force-kill the job as well.
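For illustration, a hedged sketch of how those (experimental Blink-planner) early-fire options could be set from Java before executing the insert; the "10 s" delay is just an example value, not something from the demo:

    // obtain the planner configuration from the TableEnvironment
    // (org.apache.flink.configuration.Configuration)
    Configuration conf = tEnv.getConfig().getConfiguration();
    // emit partial window results before the 1-hour tumbling window closes
    conf.setString("table.exec.emit.early-fire.enabled", "true");
    conf.setString("table.exec.emit.early-fire.delay", "10 s");  // illustrative delay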
hi,
Thanks for the explanation.

> GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)

I do configure that on the source side [1]; the demo follows Jark Wu's [2], and the data is written into Kafka by the generator app at an accelerated rate.

[1] "'scan.startup.mode'='earliest-offset',"
[2] https://github.com/wuchong/flink-sql-demo/tree/v1.11-CN

>> manually keep the TableResult returned by executeSql and then ... wait for the job to finish
> That is only there to prevent the process from exiting as soon as executeSql returns when you run it in a local IDE, which would force-kill the job as well.

So do you mean that as long as I write

    tableResult.getJobClient()
            .get()
            .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
            .get();

and the program no longer exits by itself after running, it is behaving correctly? It does indeed no longer finish immediately now, but the print sink still produces no output.