Difference between batch and stream on bounded data


cxydevelop@163.com
Hi,
I am using Flink Table SQL 1.11.0.
In EnvironmentSettings you can select either batch mode or streaming mode:


EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
//                .inStreamingMode()
                .inBatchMode()
                .build();


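With MySQL as the source, the job runs in both modes and the output is the same. I don't think I understand batch versus streaming well enough yet, and I can't tell what the difference is. Does anyone have an example that makes it easier to understand?
My code: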
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
//                .inStreamingMode()
                .inBatchMode()
                .build();
TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);
tableEnvironment.executeSql("CREATE TABLE mysql_source ( " +
"     id bigint, " +
"  game_id varchar, " +
"  PRIMARY KEY (id) NOT ENFORCED      " +
" )  " +
" with ( " +
"'connector' = 'jdbc',  " +
" 'url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
" 'username' = 'root' , " +
" 'password' = 'root', " +
" 'table-name' = 'mysqlsink' , " +
" 'driver' = 'com.mysql.cj.jdbc.Driver' , " +
" 'sink.buffer-flush.interval' = '2s', " +
" 'sink.buffer-flush.max-rows' = '300' " +
" )");
tableEnvironment.executeSql("CREATE TABLE print_sink ( " +
"     id bigint, " +
"  game_id varchar, " +
"  PRIMARY KEY (id) NOT ENFORCED      " +
" )  " +
" with ( " +
"'connector' = 'print'  " +
" )");
tableEnvironment.executeSql("insert into print_sink select id,game_id from mysql_source");

Re: Difference between batch and stream on bounded data

godfrey he
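Logically, a batch query produces a Table, whereas a streaming query produces a changelog.
In your example the Table result and the changelog result are identical, which is why the two modes look the same to you.
The simplest way to see the difference is to add a GROUP BY to the query and compare the results.
For more on the Table and changelog concepts, see [1].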

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html
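
To make the Table-versus-changelog distinction concrete, here is a minimal plain-Java sketch of a grouped count. It does not use Flink and is not how Flink implements aggregation; the class name, method names, and sample game_id values are hypothetical. The "batch" method reads all rows and returns one final row per key, while the "streaming" method emits a change for every incoming row, using the +I / -U / +U notation of the print connector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not Flink code.
class ChangelogVsTable {

    /** Batch view: consume every row first, then expose one final count per key. */
    static Map<String, Long> batchCounts(List<String> gameIds) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String gameId : gameIds) {
            counts.merge(gameId, 1L, Long::sum);
        }
        return counts;
    }

    /** Streaming view: emit a change for each incoming row as the count evolves. */
    static List<String> streamingChangelog(List<String> gameIds) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<String> changelog = new ArrayList<>();
        for (String gameId : gameIds) {
            Long previous = counts.get(gameId);
            long current = (previous == null) ? 1L : previous + 1L;
            counts.put(gameId, current);
            if (previous == null) {
                // First row for this key: a plain insert.
                changelog.add("+I(" + gameId + "," + current + ")");
            } else {
                // Later rows: retract the old count, then emit the new one.
                changelog.add("-U(" + gameId + "," + previous + ")");
                changelog.add("+U(" + gameId + "," + current + ")");
            }
        }
        return changelog;
    }

    public static void main(String[] args) {
        List<String> gameIds = Arrays.asList("12", "1", "1", "1");
        System.out.println("batch:     " + batchCounts(gameIds));
        System.out.println("streaming: " + streamingChangelog(gameIds));
    }
}
```

Without a GROUP BY, every input row yields exactly one +I entry, which is why a plain SELECT looks the same in both modes.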

In reply to the message from chenxuying <[hidden email]> of Tue, Aug 4, 2020, 11:44 AM.


Re:Re: Difference between batch and stream on bounded data

cxydevelop@163.com
Hello, my modified statement is:
insert into print_sink select game_id,count(id) from mysql_source group by game_id
When I run it in streaming mode, it prints the changelog, as follows:
2> +I(12,1)
5> +I(12555,1)
1> +I(122,1)
3> +I(13,1)
6> +I(1,1)
6> -U(1,1)
6> +U(1,2)
6> -U(1,2)
6> +U(1,3)
6> -U(1,3)
6> +U(1,4)
6> -U(1,4)
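
A changelog like the one printed above can be folded back into a table by applying its entries in order. The following is a rough plain-Java sketch of that idea, not Flink code: the class and method names are hypothetical, the sample entries are made up, and the subtask prefix (such as `6>`) is assumed to have been stripped already.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not Flink code.
class ChangelogReplay {

    /** Applies entries such as "+I(1,1)" or "-U(1,1)" to an initially empty table. */
    static Map<String, Long> replay(List<String> changelog) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (String entry : changelog) {
            String kind = entry.substring(0, 2);   // "+I", "-U", "+U" or "-D"
            String[] fields = entry.substring(3, entry.length() - 1).split(",");
            String gameId = fields[0];
            long count = Long.parseLong(fields[1]);
            if (kind.equals("+I") || kind.equals("+U")) {
                table.put(gameId, count);          // insert, or new value of an update
            } else {
                table.remove(gameId);              // -U / -D retract the previous row
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<String> changelog = Arrays.asList(
                "+I(12,1)", "+I(1,1)", "-U(1,1)", "+U(1,2)", "-U(1,2)", "+U(1,3)");
        System.out.println(replay(changelog));     // prints {12=1, 1=3}
    }
}
```

Once all input has been consumed, the replayed table holds one row per game_id, which is the shape of result a batch query over the same bounded data is expected to produce.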


But when I use batch mode, it fails with an error:
org.apache.flink.util.FlinkException: Error while shutting the TaskExecutor down.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.handleOnStopException(TaskExecutor.java:440)
at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$onStop$2(TaskExecutor.java:425)
...
Caused by: java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.util.JavaGcCleanerWrapper
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
...
at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:422)
... 21 more
Suppressed: org.apache.flink.util.FlinkException: Could not properly shut down the TaskManager services.
at org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:236)
at org.apache.flink.runtime.taskexecutor.TaskExecutor.stopTaskExecutorServices(TaskExecutor.java:462)
at...
... 21 more
Caused by: org.apache.flink.util.FlinkException: Could not close resource.
at org.apache.flink.util.AutoCloseableAsync.close(AutoCloseableAsync.java:42)
at org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:204)
... 37 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.util.JavaGcCleanerWrapper
at org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:94)
at org.apache.flink.runtime.memory.UnsafeMemoryBudget.verifyEmpty(UnsafeMemoryBudget.java:64)
...
at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:422)
... 21 more
[CIRCULAR REFERENCE:java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.util.JavaGcCleanerWrapper]


Do you know what might be causing this?


In reply to the message from "godfrey he" <[hidden email]> of 2020-08-04 12:11:32.


Re: Re: Difference between batch and stream on bounded data

godfrey he
What is your runtime environment? Could you share the relevant configuration?

In reply to the message from chenxuying <[hidden email]> of Tue, Aug 4, 2020, 2:46 PM.


Re:Re: Re: Difference between batch and stream on bounded data

cxydevelop@163.com
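I develop and test on Windows 10 with IntelliJ IDEA. The same project code runs fine on my colleague's machine, so I am not sure whether it is a system issue.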
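
When the same project behaves differently on two machines, one low-effort step is to print the JVM and operating system details on each and compare them. The sketch below uses only standard Java system properties; the class and method names are hypothetical, and nothing here is meant to suggest which property, if any, explains the error in this thread.

```java
// Hypothetical helper for comparing two development machines; not part of Flink.
class EnvReport {

    /** Collects a few standard JVM and OS properties into a single line. */
    static String describe() {
        return "java.version=" + System.getProperty("java.version")
                + ", java.vendor=" + System.getProperty("java.vendor")
                + ", os.name=" + System.getProperty("os.name")
                + ", os.arch=" + System.getProperty("os.arch");
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Running it from the same IDE run configuration as the failing job makes it more likely that the reported JVM is the one the job actually uses.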




In reply to the message from "godfrey he" <[hidden email]> of 2020-08-04 17:19:48.
