flink 1.11 SQL debugging in IDEA: no data and no errors

flink 1.11 SQL debugging in IDEA: no data and no errors

DanielGu
I've run into a problem and would like to ask for advice:
Environment: Flink 1.11, IDEA
Following wuchong's demo, I tried to turn the client into Java code; the first example counts the number of purchases per hour.
The data is read fine, but no result is ever output: writing to Elasticsearch produced nothing, and after switching to a print sink there was still nothing.
https://github.com/wuchong/flink-sql-demo/tree/v1.11-CN
Any help would be appreciated.



Below are the pom, the code, and the run output.

        // create the execution environment
        StreamExecutionEnvironment bsEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        bsEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        // set the StateBackend
        bsEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/chkdir"));
        EnvironmentSettings bsSettings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        // Kafka
        String sourceDDL = "CREATE TABLE user_behavior (" +
                "user_id BIGINT," +
                "item_id BIGINT," +
                "category_id BIGINT," +
                "behavior STRING," +
                "ts TIMESTAMP(3)," +
                "proctime AS PROCTIME()," +
                "WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) " +
                "WITH (" +
                "'connector'='kafka'," +
                "'topic'='user_behavior'," +
                "'scan.startup.mode'='earliest-offset'," +
                "'properties.bootstrap.servers'='localhost:9092'," +
                "'format'='json'" +
                ")";


        // Elasticsearch sink, switched to print for debugging
/*        String sinkDDL = "CREATE TABLE buy_cnt_per_hour (" +
                "hour_of_day BIGINT," +
                "buy_cnt BIGINT" +
                ") WITH (" +
                "'connector'='elasticsearch-7'," +
                "'hosts'='http://localhost:9200'," +
                "'index'='buy_cnt_per_hour')";*/
        String sinkDDL = "CREATE TABLE buy_cnt_per_hour (\n" +
                 "hour_of_day BIGINT," +
                 "buy_cnt BIGINT" +
                ") WITH (\n" +
                " 'connector' = 'print'\n" +
                ")";


        String transformationDDL = "INSERT INTO buy_cnt_per_hour\n" +
                "SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) as hour_of_day, COUNT(*) as buy_cnt\n" +
                "FROM user_behavior\n" +
                "WHERE behavior = 'buy'\n" +
                "GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)";



        // register source and sink
        tEnv.executeSql(sourceDDL);
        tEnv.executeSql(sinkDDL);
//        tableResult.print();

       tEnv.executeSql(transformationDDL);

pom
<dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
</dependencies>

Run output
01:15:12,358 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser  - Kafka version: unknown
01:15:12,358 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: unknown
01:15:12,358 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1597338912355
01:15:12,361 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-20, groupId=null] Subscribed to partition(s): user_behavior-0
01:15:12,365 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-20, groupId=null] Seeking to EARLIEST offset of partition user_behavior-0
01:15:12,377 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata  - [Consumer clientId=consumer-20, groupId=null] Cluster ID: txkqox8yRL6aWBNsOcS67g
01:15:12,387 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-20, groupId=null] Resetting offset for partition user_behavior-0 to offset 0.
01:15:12,545 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 1 (type=CHECKPOINT) @ 1597338912539 for job c10220b65246e8269defa48f441a7e09.
01:15:12,709 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 1 for job c10220b65246e8269defa48f441a7e09 (14080 bytes in 169 ms).
01:15:17,541 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 2 (type=CHECKPOINT) @ 1597338917540 for job c10220b65246e8269defa48f441a7e09.
01:15:17,553 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 2 for job c10220b65246e8269defa48f441a7e09 (14752 bytes in 11 ms).
01:15:22,546 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 3 (type=CHECKPOINT) @ 1597338922545 for job c10220b65246e8269defa48f441a7e09.
01:15:22,558 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 3 for job c10220b65246e8269defa48f441a7e09 (15004 bytes in 12 ms).


Raw source data (printed from the source table)

3> +I(999602,4024409,883960,cart,2017-11-27T00:07:36,2020-08-13T17:16:20.440)
3> +I(30616,1693200,4022701,pv,2017-11-27T00:07:36,2020-08-13T17:16:20.440)
3> +I(145183,3533745,1102540,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(323010,3376212,1574064,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(944547,2640409,2465336,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(232939,1976318,411153,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(355996,5161162,1582197,buy,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(443987,3791622,1464116,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)




Re: flink 1.11 SQL debugging in IDEA: no data and no errors

Xingbo Huang
Hi,
This is because in Flink 1.11 executeSql is an asynchronous API, so when you run it inside IDEA the process simply finishes right away. You need to hold on to the TableResult returned by executeSql and then call

    tableResult.getJobClient().get()
      .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
      .get()

to wait for the job to finish.
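
A minimal Java sketch of that pattern, applied to the code from the original post (tEnv and transformationDDL are the variables defined there; the enclosing main is assumed to declare throws Exception):

        // requires: import org.apache.flink.table.api.TableResult;
        // Keep the TableResult of the INSERT job and block until the job ends,
        // so the local JVM does not exit while the streaming job is still running.
        TableResult tableResult = tEnv.executeSql(transformationDDL);
        tableResult.getJobClient()
                .get()  // the INSERT statement submits a job, so a JobClient is present
                .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
                .get(); // CompletableFuture.get(); for an unbounded Kafka source this blocks until the job is cancelled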

Best,
Xingbo


Re: flink 1.11 SQL debugging in IDEA: no data and no errors

DanielGu
hi,

When browsing the mailing list earlier I did see executeSql described as an asynchronous API in several places, but I still have some doubts about this part.


1. When the INSERT INTO uses simple logic, the code does print output, but replacing it with the aggregating INSERT INTO SQL from my first mail produces no output at all. Why?
Code
...
        tEnv.executeSql(sourceDDL);
        tEnv.executeSql(sinkDDL);

        tEnv.executeSql("INSERT INTO print_sink SELECT user_id, item_id, category_id, behavior, ts, proctime FROM user_behavior");
...
Console output
3> +I(1014646,2869046,4022701,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847)
3> +I(105950,191177,3975787,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847)
3> +I(128322,5013356,4066962,buy,2017-11-27T00:38:15,2020-08-14T08:20:23.847)
3> +I(225652,3487948,2462567,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847)

Aggregation logic code (the source is unchanged; the sink columns are changed accordingly):

>         String transformationDDL = "INSERT INTO buy_cnt_per_hour\n" +
>                 "SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) as hour_of_day, COUNT(*) as buy_cnt\n" +
>                 "FROM user_behavior\n" +
>                 "WHERE behavior = 'buy'\n" +
>                 "GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)";
>
>
>
>         // register source and sink
>         tEnv.executeSql(sourceDDL);
>         tEnv.executeSql(sinkDDL);
> //        tableResult.print();
>
>        tEnv.executeSql(transformationDDL);

2. I didn't quite understand your suggestion to manually take the TableResult returned by executeSql and then ... wait for the job to finish.
I changed the code as follows, but the console still prints no result:
        // register source and sink
        tEnv.executeSql(sourceDDL);
        tEnv.executeSql(sinkDDL);

        TableResult tableResult = tEnv.executeSql(transformationDDL);

        tableResult.getJobClient()
                .get()
                .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
                .get().wait();

Best,
DanielGu




Re: flink 1.11 SQL debugging in IDEA: no data and no errors

DanielGu
hi,
Does anyone else have experience debugging Flink 1.11 SQL in IDEA? Any advice is welcome.
I've been stuck on this for a while and can't quite get past it.
Many thanks.




Re: flink 1.11 SQL debugging in IDEA: no data and no errors

godfrey he
> GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)
The result is only emitted once the 1-hour window closes.
You can see data earlier by shrinking the window, or by enabling early-fire:
table.exec.emit.early-fire.enabled=true
table.exec.emit.early-fire.delay=xx
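
One way to apply this in the code from the original post is to set the two options on the TableEnvironment's TableConfig before executing the INSERT (a sketch; tEnv is the StreamTableEnvironment from the original post, and the 10-second delay is only an illustrative value):

        // requires: import org.apache.flink.configuration.Configuration;
        // Experimental early-fire options named above: emit partial window results
        // periodically instead of only when the 1-hour tumbling window closes.
        Configuration conf = tEnv.getConfig().getConfiguration();
        conf.setString("table.exec.emit.early-fire.enabled", "true");
        conf.setString("table.exec.emit.early-fire.delay", "10 s");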

> manually take the TableResult returned by executeSql and then ... wait for the job to finish
That is to prevent the process from exiting as soon as executeSql returns when running in a local IDE, which would forcibly terminate the job as well.


Re: flink 1.11 SQL debugging in IDEA: no data and no errors

Daniel51
hi,
Thanks for the explanation.
> GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)
I do set [1] in the source; the demo follows Jark Wu's [2], and the data is written into Kafka by the app at an accelerated rate.

[1] "'scan.startup.mode'='earliest-offset',"
[2] https://github.com/wuchong/flink-sql-demo/tree/v1.11-CN

>> manually take the TableResult returned by executeSql and then ... wait for the job to finish
> that is to prevent the process from exiting right after executeSql returns, which would forcibly terminate the job
So you mean that as long as


tableResult.getJobClient()
        .get()
        .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
        .get();

is there and the program does not exit by itself after running, it is working as expected? It indeed no longer finishes right away now, but the print sink still produces no output.