你好!在 Flink 1.12 版本之后,Flink 实现了真正的流批一体模式,推荐使用 DataStream API,并通过将运行模式设置为 RuntimeExecutionMode.BATCH 来执行批处理。
如下程序示例,这两种批模式的运行有着不同的方式和性能,请问在流批一体的模式下,如何做到真正的批处理?

package com.meritdata.cloud.tempo.dw.flink.test.bug;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Reproduction case comparing two ways of running the same aggregate query in "batch" mode:
 * {@link #test()} builds a batch-mode {@link TableEnvironment} from {@link EnvironmentSettings},
 * while {@link #test1()} builds a {@link StreamTableEnvironment} on top of a
 * {@link StreamExecutionEnvironment} whose runtime mode is set to
 * {@link RuntimeExecutionMode#BATCH}.
 *
 * <p>Both variants register the same JDBC-backed table and print the result of the same
 * GROUP BY query; the output each one produced for the author is recorded on the method.
 */
public class JDBCTest {

    /** DDL registering table {@code ab} through the JDBC connector; shared by both variants. */
    private static final String CREATE_TABLE_DDL =
            "CREATE TABLE ab ("
                    + " a STRING, "
                    + " b INT "
                    + ") WITH ("
                    + " 'connector' = 'jdbc',"
                    + " 'url' = 'jdbc:mysql://localhost:3306/a?serverTimezone=UTC',"
                    + " 'username' = 'root',"
                    + " 'password' = 'root',"
                    + " 'table-name' = 'ab'"
                    + " )";

    /** Aggregate query executed by both variants. */
    private static final String AGGREGATE_QUERY = "select a, count(b) from ab group by a";

    /**
     * Runs the batch-environment variant. Swap the call for {@code test1()} to run the
     * DataStream-API variant instead.
     */
    public static void main(String[] args) {
        test();
        // test1();
    }

    /**
     * Variant 1: batch table environment created via
     * {@code EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode()}.
     *
     * <p>Output reported for this variant (one final row per key):
     * <pre>
     * +---+--------+
     * | a | EXPR$1 |
     * +---+--------+
     * | 2 |      1 |
     * | 3 |      2 |
     * | 1 |      2 |
     * | 4 |      1 |
     * +---+--------+
     * </pre>
     */
    public static void test() {
        EnvironmentSettings batchSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment batchTableEnv = TableEnvironment.create(batchSettings);

        batchTableEnv.executeSql(CREATE_TABLE_DDL);
        batchTableEnv.sqlQuery(AGGREGATE_QUERY).execute().print();
    }

    /**
     * Variant 2: DataStream API with the runtime mode set to {@code RuntimeExecutionMode.BATCH},
     * wrapped in a {@code StreamTableEnvironment}.
     *
     * <p>Output reported for this variant (a changelog with insert and update rows rather than
     * one final row per key):
     * <pre>
     * +----+---+--------+
     * | op | a | EXPR$1 |
     * +----+---+--------+
     * | +I | 2 |      1 |
     * | +I | 1 |      1 |
     * | +I | 4 |      1 |
     * | -U | 1 |      1 |
     * | +U | 1 |      2 |
     * | +I | 3 |      1 |
     * | -U | 3 |      1 |
     * | +U | 3 |      2 |
     * +----+---+--------+
     * </pre>
     *
     * <p>NOTE(review): presumably the table environment created from the stream environment
     * still plans the query in streaming mode in this Flink version, regardless of the runtime
     * mode set on the stream environment -- confirm against the Flink documentation.
     */
    public static void test1() {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        tableEnv.executeSql(CREATE_TABLE_DDL);
        tableEnv.sqlQuery(AGGREGATE_QUERY).execute().print();
    }
}
Free forum by Nabble | Edit this page |