Hi,

In Flink Table SQL 1.11.0, EnvironmentSettings lets you choose either batch mode or streaming mode (inBatchMode() / inStreamingMode() in the code below). With MySQL as the source, the job runs in both modes and produces exactly the same result. I don't think I understand batch versus streaming well enough to see the difference. Does anyone have an example that makes it easy to understand?

My code:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
            // .inStreamingMode()
            .inBatchMode()
            .build();
    TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);

    tableEnvironment.executeSql("CREATE TABLE mysql_source ( " +
            " id bigint, " +
            " game_id varchar, " +
            " PRIMARY KEY (id) NOT ENFORCED " +
            " ) " +
            " with ( " +
            "'connector' = 'jdbc', " +
            " 'url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
            " 'username' = 'root' , " +
            " 'password' = 'root', " +
            " 'table-name' = 'mysqlsink' , " +
            " 'driver' = 'com.mysql.cj.jdbc.Driver' , " +
            " 'sink.buffer-flush.interval' = '2s', " +
            " 'sink.buffer-flush.max-rows' = '300' " +
            " )");

    tableEnvironment.executeSql("CREATE TABLE print_sink ( " +
            " id bigint, " +
            " game_id varchar, " +
            " PRIMARY KEY (id) NOT ENFORCED " +
            " ) " +
            " with ( " +
            "'connector' = 'print' " +
            " )");

    tableEnvironment.executeSql("insert into print_sink select id,game_id from mysql_source");
Logically, a batch job produces a Table as its result, while a streaming job produces a changelog.
In your example the Table result and the changelog result happen to be identical, which is why the two modes look the same to you. The simplest way to see the difference is to change the query to one with a GROUP BY and compare the results. For more on the concepts of Tables and changelogs, see [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html

(In reply to chenxuying <[hidden email]>, Tue, Aug 4, 2020, 11:44 AM)
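To make the Table-versus-changelog distinction concrete, here is a small plain-Java model of a `SELECT game_id, COUNT(id) ... GROUP BY game_id` query. It is only a conceptual sketch, not how Flink's planner or runtime works: the class and method names (`BatchVsStreamingSketch`, `batchCount`, `streamingCount`) and the input values are made up for illustration, and the `+I`/`-U`/`+U` strings merely imitate the print connector's row-kind notation. The batch version reads the whole input first and emits one final row per key; the streaming version emits an insert for the first row of a key and a retract/update pair for every later one. With a plain projection and no aggregation, each input row would map to a single `+I` and nothing would ever be retracted, so the changelog and the final table would contain the same rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Conceptual model of a GROUP BY count in batch vs. streaming mode.
 * Not Flink code: names and output format are illustrative only.
 */
public class BatchVsStreamingSketch {

    /** Batch: read the whole (bounded) input first, then emit one final row per key. */
    public static List<String> batchCount(List<String> gameIds) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String id : gameIds) {
            counts.merge(id, 1L, Long::sum);
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            result.add("(" + e.getKey() + "," + e.getValue() + ")");
        }
        return result;
    }

    /** Streaming: update the running count for each row and emit the change immediately. */
    public static List<String> streamingCount(List<String> gameIds) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<String> changelog = new ArrayList<>();
        for (String id : gameIds) {
            Long previous = counts.get(id);
            if (previous == null) {
                // first row for this key: a plain insert
                counts.put(id, 1L);
                changelog.add("+I(" + id + ",1)");
            } else {
                // later rows: retract the old result, then emit the new one
                long updated = previous + 1;
                counts.put(id, updated);
                changelog.add("-U(" + id + "," + previous + ")");
                changelog.add("+U(" + id + "," + updated + ")");
            }
        }
        return changelog;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("1", "12", "1", "1");
        System.out.println("batch:     " + batchCount(input));
        System.out.println("streaming: " + streamingCount(input));
    }
}
```

For the hypothetical input `1, 12, 1, 1`, `batchCount` returns `[(1,3), (12,1)]`, while `streamingCount` returns `[+I(1,1), +I(12,1), -U(1,1), +U(1,2), -U(1,2), +U(1,3)]`: the same final counts, but delivered as a sequence of changes.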
Hi, this is my modified statement:
    insert into print_sink select game_id,count(id) from mysql_source group by game_id

In streaming mode it prints a changelog, like this:

    2> +I(12,1)
    5> +I(12555,1)
    1> +I(122,1)
    3> +I(13,1)
    6> +I(1,1)
    6> -U(1,1)
    6> +U(1,2)
    6> -U(1,2)
    6> +U(1,3)
    6> -U(1,3)
    6> +U(1,4)
    6> -U(1,4)

But in batch mode it throws an error:

    org.apache.flink.util.FlinkException: Error while shutting the TaskExecutor down.
        at org.apache.flink.runtime.taskexecutor.TaskExecutor.handleOnStopException(TaskExecutor.java:440)
        at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$onStop$2(TaskExecutor.java:425)
        ...
    Caused by: java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.util.JavaGcCleanerWrapper
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
        ...
        at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:422)
        ... 21 more
    Suppressed: org.apache.flink.util.FlinkException: Could not properly shut down the TaskManager services.
        at org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:236)
        at org.apache.flink.runtime.taskexecutor.TaskExecutor.stopTaskExecutorServices(TaskExecutor.java:462)
        at...
        ... 21 more
    Caused by: org.apache.flink.util.FlinkException: Could not close resource.
        at org.apache.flink.util.AutoCloseableAsync.close(AutoCloseableAsync.java:42)
        at org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:204)
        ... 37 more
    Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.util.JavaGcCleanerWrapper
        at org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:94)
        at org.apache.flink.runtime.memory.UnsafeMemoryBudget.verifyEmpty(UnsafeMemoryBudget.java:64)
        ...
        at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:422)
        ... 21 more
    [CIRCULAR REFERENCE:java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.util.JavaGcCleanerWrapper]

Do you know what might be causing this?

(In reply to godfrey he <[hidden email]>, 2020-08-04 12:11:32)
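One way to see that the streaming changelog and a batch result describe the same table is to replay the printed lines into a map keyed by `game_id`: `+I` and `+U` write the new version of a row, and `-U` (or `-D`) removes the old one. The sketch below does that for output in the format shown above. The class name `ChangelogReplaySketch`, its regular expression, and the assumption that every line looks like `N> kind(key,count)` are hypothetical and specific to this query's two-column output; the `N>` prefix, which identifies the parallel printing instance, is simply discarded. It illustrates changelog semantics only and is not a Flink API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Replays print-connector-style changelog lines into a key -> count table (illustrative only). */
public class ChangelogReplaySketch {

    // Optional "N> " subtask prefix, then row kind (+I, -U, +U, -D), then "(key,count)".
    private static final Pattern ROW =
            Pattern.compile("(?:\\d+>\\s*)?([+-][IUD])\\(([^,]+),(\\d+)\\)");

    public static Map<String, Long> replay(String... lines) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher m = ROW.matcher(line.trim());
            if (!m.matches()) {
                throw new IllegalArgumentException("unrecognized row: " + line);
            }
            String kind = m.group(1);
            String key = m.group(2);
            long count = Long.parseLong(m.group(3));
            switch (kind) {
                case "+I":
                case "+U":
                    table.put(key, count); // insert, or new version of an updated row
                    break;
                case "-U":
                case "-D":
                    table.remove(key);     // retract the previous version of the row
                    break;
                default:
                    throw new IllegalStateException("unexpected row kind: " + kind);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // The streaming output posted above, up to and including "6> +U(1,4)".
        Map<String, Long> result = replay(
                "2> +I(12,1)", "5> +I(12555,1)", "1> +I(122,1)", "3> +I(13,1)",
                "6> +I(1,1)", "6> -U(1,1)", "6> +U(1,2)", "6> -U(1,2)",
                "6> +U(1,3)", "6> -U(1,3)", "6> +U(1,4)");
        System.out.println(result);
    }
}
```

Replaying the posted output up to `6> +U(1,4)` leaves a count of 4 for `game_id` `1` and a count of 1 for each of `12`, `12555`, `122` and `13`, i.e. the aggregate table as of that point in the stream.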
What is your runtime environment? Could you share the relevant configuration?
(In reply to chenxuying <[hidden email]>, Tue, Aug 4, 2020, 2:46 PM)
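I'm developing and testing on Win10 with IntelliJ IDEA. The same project code runs fine on my colleague's computer, though, so I'm not sure whether it's the system.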
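To compare the failing Win10/IDEA setup with the colleague's machine where the same code runs, it may help to print the Java runtime and OS details on both. The sketch below reads only standard JVM system properties; the class name `EnvironmentReport` is hypothetical, and which of these details (if any) matters for the `JavaGcCleanerWrapper` error is not established in this thread.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Prints the JVM and OS details that identify a runtime environment (illustrative helper). */
public class EnvironmentReport {

    // Standard system properties defined by every Java runtime.
    static final List<String> KEYS = Arrays.asList(
            "java.version", "java.vendor", "java.vm.name", "java.vm.version",
            "os.name", "os.version", "os.arch");

    public static Map<String, String> collect() {
        Map<String, String> report = new LinkedHashMap<>();
        for (String key : KEYS) {
            report.put(key, System.getProperty(key, "<unset>"));
        }
        return report;
    }

    public static void main(String[] args) {
        collect().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Running it from the same IDE run configuration used for the Flink job reports the JVM that the job actually uses, which may differ from the one on the command line.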
(In reply to godfrey he <[hidden email]>, 2020-08-04 17:19:48)