I'm currently using Flink CDC to read the MySQL binlog and send the changes to Kafka, then reading the Kafka messages with Flink and finally writing them into Hive. But when I submit the job to YARN, I see two jobs submitted, and both of them execute insert into kafka.order_info; insert into ods.order_info never runs, and the program reports no errors at all! The code is below. Am I submitting the job the wrong way, or is it something else?

Submit command:

flink run -m yarn-client -ynm mysql-cdc-2-hive -ys 3 -yjm 4g -ytm 8g \
  -c com.zallsteel.flink.app.log.MySQLCDC2HiveApp \
  -d /opt/tools/flink-1.12.0/zallsteel-realtime-etl-1.0-SNAPSHOT.jar

package com.zallsteel.flink.app.log;

import com.google.gson.Gson;
import com.zallsteel.flink.entity.ChangelogVO;
import com.zallsteel.flink.utils.ConfigUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.text.ParseException;
import java.time.Duration;
import java.util.Properties;

/**
 * flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
 */
public class MySQLCDC2HiveApp {
    public static void main(String[] args) {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(6);
        // Enable checkpointing
        env.enableCheckpointing(60000);
        env.getConfig().setAutoWatermarkInterval(200);
        // Set up the Flink SQL environment
        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        // Create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
        // Checkpointing mode
        tableEnv.getConfig().getConfiguration()
                .set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        // Checkpoint interval
        tableEnv.getConfig().getConfiguration()
                .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));
        // Catalog name
        String catalogName = "devHive";
        // Create the HiveCatalog
        HiveCatalog hiveCatalog = new HiveCatalog(
                catalogName,
                "default",
                "/etc/hive/conf"
        );
        // Register the Hive catalog
        tableEnv.registerCatalog(catalogName, hiveCatalog);
        // Use the Hive catalog
        tableEnv.useCatalog(catalogName);

        // Create the MySQL CDC source table
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
        tableEnv.executeSql("DROP TABLE IF EXISTS cdc.order_info");
        tableEnv.executeSql("CREATE TABLE cdc.order_info(\n" +
                "  id BIGINT,\n" +
                "  user_id BIGINT,\n" +
                "  create_time TIMESTAMP,\n" +
                "  operate_time TIMESTAMP,\n" +
                "  province_id INT,\n" +
                "  order_status STRING,\n" +
                "  total_amount DECIMAL(10, 5)\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'hdp-xxx-dev-node01',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'phkC4DE4dM28$PUD',\n" +
                "  'database-name' = 'cdc_test',\n" +
                "  'table-name' = 'order_info'\n" +
                ")");

        // Create the Kafka table
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
        tableEnv.executeSql("DROP TABLE IF EXISTS kafka.order_info");
        tableEnv.executeSql("CREATE TABLE kafka.order_info (\n" +
                "  id BIGINT,\n" +
                "  user_id BIGINT,\n" +
                "  create_time TIMESTAMP,\n" +
                "  operate_time TIMESTAMP,\n" +
                "  province_id INT,\n" +
                "  order_status STRING,\n" +
                "  total_amount DECIMAL(10, 5)\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'order_info',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'properties.bootstrap.servers' = 'hdp-xxx-dev-node03:9092',\n" +
                "  'format' = 'changelog-json'\n" +
                ")");

        // Write the CDC stream into the Kafka table
        tableEnv.executeSql("INSERT INTO kafka.order_info\n" +
                "SELECT id, user_id, create_time, operate_time, province_id, order_status, total_amount\n" +
                "FROM cdc.order_info");

        // Build a custom stream that carries the op field
        Properties kafkaConfig = ConfigUtils.getKafkaConfig();
        FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
                "order_info",
                new SimpleStringSchema(),
                kafkaConfig
        ).setStartFromEarliest();
        DataStreamSource<String> streamSource = env.addSource(consumer);
        String[] fieldNames = {"id", "user_id", "create_time", "operate_time", "province_id", "order_status", "total_amount", "op"};
        TypeInformation[] types = {Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.INT, Types.INT, Types.DOUBLE, Types.STRING};
        SingleOutputStreamOperator<Row> ds2 = streamSource.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String value) throws Exception {
                Gson gson = new Gson();
                ChangelogVO changelogVO = gson.fromJson(value, ChangelogVO.class);
                String op = changelogVO.getOp();
                int arity = fieldNames.length;
                Row row = new Row(arity);
                row.setField(0, changelogVO.getData().getId());
                row.setField(1, changelogVO.getData().getUserId());
                row.setField(2, changelogVO.getData().getCreateTime());
                row.setField(3, changelogVO.getData().getOperateTime());
                row.setField(4, changelogVO.getData().getProviceId());
                row.setField(5, changelogVO.getData().getOrderStatus());
                row.setField(6, changelogVO.getData().getTotalAmount());
                String operation = getOperation(op);
                row.setField(7, operation);
                return row;
            }

            private String getOperation(String op) {
                String operation = "INSERT";
                for (RowKind rk : RowKind.values()) {
                    if (rk.shortString().equals(op)) {
                        switch (rk) {
                            case UPDATE_BEFORE:
                                operation = "UPDATE-BEFORE";
                                break;
                            case UPDATE_AFTER:
                                operation = "UPDATE-AFTER";
                                break;
                            case DELETE:
                                operation = "DELETE";
                                break;
                            case INSERT:
                            default:
                                operation = "INSERT";
                                break;
                        }
                        break;
                    }
                }
                return operation;
            }
        }, new RowTypeInfo(types, fieldNames));

        // Assign watermarks
        SingleOutputStreamOperator<Row> ds3 = ds2.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Row>() {
                            @Override
                            public long extractTimestamp(Row element, long recordTimestamp) {
                                String create_time = (String) element.getField(2);
                                FastDateFormat dateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
                                try {
                                    return dateFormat.parse(create_time).getTime();
                                } catch (ParseException e) {
                                    e.printStackTrace();
                                }
                                return 0;
                            }
                        })
        );
        tableEnv.createTemporaryView("merged_order_info", ds3);

        // Create the Hive sink table
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
        tableEnv.executeSql("DROP TABLE IF EXISTS ods.order_info");
        tableEnv.executeSql("CREATE TABLE ods.order_info (\n" +
                "  id BIGINT,\n" +
                "  user_id BIGINT,\n" +
                "  create_time STRING,\n" +
                "  operate_time STRING,\n" +
                "  province_id INT,\n" +
                "  order_status INT,\n" +
                "  total_amount DOUBLE,\n" +
                "  op STRING\n" +
                ") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (\n" +
                "  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',\n" +
                "  'sink.partition-commit.trigger'='partition-time',\n" +
                "  'sink.partition-commit.delay'='1 min',\n" +
                "  'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
                ")");

        // Write into the Hive table
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("INSERT INTO ods.order_info\n" +
                "SELECT\n" +
                "  id,\n" +
                "  user_id,\n" +
                "  create_time,\n" +
                "  operate_time,\n" +
                "  province_id,\n" +
                "  order_status,\n" +
                "  total_amount,\n" +
                "  op,\n" +
                "  DATE_FORMAT(TO_TIMESTAMP(create_time,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd') as dt,\n" +
                "  DATE_FORMAT(TO_TIMESTAMP(create_time,'yyyy-MM-dd HH:mm:ss'),'HH') as hr\n" +
                "FROM merged_order_info"
        );
    }
}
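The ChangelogVO entity referenced above isn't included in the post. Roughly speaking it is just a Gson POJO that mirrors the {"data": {...}, "op": "+I"} envelope the changelog-json format writes to Kafka; the sketch below is an assumption for illustration (field names, annotations, and getter names are guessed to match the calls in the mapper above), not the actual class:

package com.zallsteel.flink.entity;

import com.google.gson.annotations.SerializedName;

// Hypothetical sketch of ChangelogVO: a plain Gson POJO for the changelog-json
// envelope { "data": { ...row fields... }, "op": "+I" | "-U" | "+U" | "-D" }.
public class ChangelogVO {
    private Data data;
    private String op; // matches RowKind.shortString(): "+I", "-U", "+U", "-D"

    public Data getData() { return data; }
    public String getOp() { return op; }

    public static class Data {
        private Long id;
        @SerializedName("user_id")
        private Long userId;
        @SerializedName("create_time")
        private String createTime;
        @SerializedName("operate_time")
        private String operateTime;
        @SerializedName("province_id")
        private Integer proviceId;      // getter kept as getProviceId() to match the mapper above
        @SerializedName("order_status")
        private Integer orderStatus;
        @SerializedName("total_amount")
        private Double totalAmount;

        public Long getId() { return id; }
        public Long getUserId() { return userId; }
        public String getCreateTime() { return createTime; }
        public String getOperateTime() { return operateTime; }
        public Integer getProviceId() { return proviceId; }
        public Integer getOrderStatus() { return orderStatus; }
        public Double getTotalAmount() { return totalAmount; }
    }
}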
As far as I know, Flink SQL job submission is triggered by INSERT INTO statements run through executeSql(), and by StatementSet.execute(). I originally submitted the two INSERT INTO statements as two separate executeSql() calls, which produced the behavior described above; after switching to StatementSet.execute() I got the result I wanted. I'm not sure yet what the difference between the two is, so I'm just recording it here for later analysis (see the small comparison sketch after the code). Learning Flink as a beginner really comes with a lot of pitfalls.

package com.zallsteel.flink.app.log;

import com.google.gson.Gson;
import com.zallsteel.flink.entity.ChangelogVO;
import com.zallsteel.flink.utils.ConfigUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.text.ParseException;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;

/**
 * @author Jackie Zhu
 * @time 2021-01-13 16:50:18
 * @desc Test MySQLCDC to Hive
 *
 * flink run \
 *   -m yarn-cluster \
 *   -ys 2 \
 *   -yjm 2g \
 *   -ytm 4g \
 *   -c com.zallsteel.flink.app.log.MySQLCDC2HiveApp \
 *   -d \
 *   /opt/tools/flink-1.12.0/zallsteel-realtime-etl-1.0-SNAPSHOT.jar
 */
public class MySQLCDC2HiveApp {
    public static void main(String[] args) {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(6);
        // Enable checkpointing
        env.enableCheckpointing(60000);
        env.getConfig().setAutoWatermarkInterval(200);
        // Set up the Flink SQL environment
        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        // Create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
        // Checkpointing mode
        tableEnv.getConfig().getConfiguration()
                .set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        // Checkpoint interval
        tableEnv.getConfig().getConfiguration()
                .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));
        // Catalog name
        String catalogName = "devHive";
        // Create the HiveCatalog
        HiveCatalog hiveCatalog = new HiveCatalog(
                catalogName,
                "default",
                "/etc/hive/conf",
                "/etc/hadoop/conf",
                "3.1.2"
        );
        // Register the Hive catalog
        tableEnv.registerCatalog(catalogName, hiveCatalog);
        // Use the Hive catalog
        tableEnv.useCatalog(catalogName);

        // Create the MySQL CDC source table
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
        tableEnv.executeSql("DROP TABLE IF EXISTS cdc.order_info");
        tableEnv.executeSql("CREATE TABLE cdc.order_info(\n" +
                "  id BIGINT,\n" +
                "  user_id BIGINT,\n" +
                "  create_time TIMESTAMP,\n" +
                "  operate_time TIMESTAMP,\n" +
                "  province_id INT,\n" +
                "  order_status STRING,\n" +
                "  total_amount DECIMAL(10, 5)\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'hdp-xxx-dev-node01',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'xxx',\n" +
                "  'password' = 'xxxx',\n" +
                "  'database-name' = 'cdc_test',\n" +
                "  'table-name' = 'order_info'\n" +
                ")");

        // Create the Kafka table
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
        tableEnv.executeSql("DROP TABLE IF EXISTS kafka.order_info");
        tableEnv.executeSql("CREATE TABLE kafka.order_info (\n" +
                "  id BIGINT,\n" +
                "  user_id BIGINT,\n" +
                "  create_time TIMESTAMP,\n" +
                "  operate_time TIMESTAMP,\n" +
                "  province_id INT,\n" +
                "  order_status STRING,\n" +
                "  total_amount DECIMAL(10, 5)\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'order_info',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'properties.bootstrap.servers' = 'hdp-xxx-dev-node03:9092,hdp-xxx-dev-node04:9092,hdp-xxx-dev-node05:9092',\n" +
                "  'format' = 'changelog-json'\n" +
                ")");

        // Insert into the Kafka table (moved into the StatementSet below)
        // tableEnv.executeSql("INSERT INTO kafka.order_info\n" +
        //         "SELECT id, user_id, create_time, operate_time, province_id, order_status, total_amount\n" +
        //         "FROM cdc.order_info");

        // Build a custom stream that carries the op field
        Properties kafkaConfig = ConfigUtils.getKafkaConfig();
        FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
                "order_info",
                new SimpleStringSchema(),
                kafkaConfig
        ).setStartFromEarliest();
        DataStreamSource<String> streamSource = env.addSource(consumer);
        String[] fieldNames = {"id", "user_id", "create_time", "operate_time", "province_id", "order_status", "total_amount", "op"};
        TypeInformation[] types = {Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.INT, Types.INT, Types.DOUBLE, Types.STRING};
        SingleOutputStreamOperator<Row> ds2 = streamSource.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String value) throws Exception {
                Gson gson = new Gson();
                ChangelogVO changelogVO = gson.fromJson(value, ChangelogVO.class);
                String op = changelogVO.getOp();
                int arity = fieldNames.length;
                Row row = new Row(arity);
                row.setField(0, changelogVO.getData().getId());
                row.setField(1, changelogVO.getData().getUserId());
                row.setField(2, changelogVO.getData().getCreateTime());
                row.setField(3, changelogVO.getData().getOperateTime());
                row.setField(4, changelogVO.getData().getProviceId());
                row.setField(5, changelogVO.getData().getOrderStatus());
                row.setField(6, changelogVO.getData().getTotalAmount());
                String operation = getOperation(op);
                row.setField(7, operation);
                return row;
            }

            private String getOperation(String op) {
                String operation = "INSERT";
                for (RowKind rk : RowKind.values()) {
                    if (rk.shortString().equals(op)) {
                        switch (rk) {
                            case UPDATE_BEFORE:
                                operation = "UPDATE-BEFORE";
                                break;
                            case UPDATE_AFTER:
                                operation = "UPDATE-AFTER";
                                break;
                            case DELETE:
                                operation = "DELETE";
                                break;
                            case INSERT:
                            default:
                                operation = "INSERT";
                                break;
                        }
                        break;
                    }
                }
                return operation;
            }
        }, new RowTypeInfo(types, fieldNames));

        // Assign watermarks
        SingleOutputStreamOperator<Row> ds3 = ds2.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Row>() {
                            @Override
                            public long extractTimestamp(Row element, long recordTimestamp) {
                                String create_time = (String) element.getField(2);
                                FastDateFormat dateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
                                try {
                                    return dateFormat.parse(create_time).getTime();
                                } catch (ParseException e) {
                                    e.printStackTrace();
                                }
                                return 0;
                            }
                        })
        );
        tableEnv.createTemporaryView("merged_order_info", ds3);

        // Create the Hive sink table
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
        tableEnv.executeSql("DROP TABLE IF EXISTS ods.order_info");
        tableEnv.executeSql("CREATE TABLE ods.order_info (\n" +
                "  id BIGINT,\n" +
                "  user_id BIGINT,\n" +
                "  create_time STRING,\n" +
                "  operate_time STRING,\n" +
                "  province_id INT,\n" +
                "  order_status INT,\n" +
                "  total_amount DOUBLE,\n" +
                "  op STRING\n" +
                ") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (\n" +
                "  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',\n" +
                "  'sink.partition-commit.trigger'='partition-time',\n" +
                "  'sink.partition-commit.delay'='1 min',\n" +
                "  'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
                ")");

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Key change: submit both INSERTs through a single StatementSet
        StatementSet statementSet = tableEnv.createStatementSet();
        statementSet.addInsertSql("INSERT INTO kafka.order_info\n" +
                "SELECT id, user_id, create_time, operate_time, province_id, order_status, total_amount\n" +
                "FROM cdc.order_info")
                .addInsertSql("INSERT INTO ods.order_info\n" +
                        "SELECT " +
                        "id," +
                        "user_id," +
                        "create_time," +
                        "operate_time," +
                        "province_id," +
                        "order_status," +
                        "total_amount," +
                        "op," +
                        "DATE_FORMAT(TO_TIMESTAMP(create_time,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd') as dt," +
                        "DATE_FORMAT(TO_TIMESTAMP(create_time,'yyyy-MM-dd HH:mm:ss'),'HH') as hr " +
                        "FROM merged_order_info");

        CompletableFuture<JobStatus> jobStatus = statementSet.execute().getJobClient().get().getJobStatus();
        System.out.println(jobStatus);

        // TableResult tableResult = tableEnv.executeSql(
        // );
    }
}
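To make the difference easier to compare later, here is a minimal standalone sketch of the two submission styles as I currently understand them (the datagen/print tables and the class name below are made up purely for illustration and are not part of the job above):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class ExecuteSqlVsStatementSet {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Throwaway tables just to have something to read from and write to
        tEnv.executeSql("CREATE TABLE src (x INT) WITH ('connector'='datagen','rows-per-second'='1')");
        tEnv.executeSql("CREATE TABLE sink_a (x INT) WITH ('connector'='print')");
        tEnv.executeSql("CREATE TABLE sink_b (x INT) WITH ('connector'='print')");

        // Style 1: every executeSql("INSERT ...") submits its own job as soon as it is
        // called, so these two lines would deploy two independent jobs.
        // tEnv.executeSql("INSERT INTO sink_a SELECT x FROM src");
        // tEnv.executeSql("INSERT INTO sink_b SELECT x FROM src");

        // Style 2: addInsertSql() only buffers the statements; execute() submits them
        // together as one job whose graph contains both sinks.
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO sink_a SELECT x FROM src");
        set.addInsertSql("INSERT INTO sink_b SELECT x FROM src");
        set.execute();
    }
}

If that understanding is correct, each executeSql() INSERT in my original program was deployed as its own detached job, and bundling both INSERTs into one StatementSet is why a single job containing both sinks shows up now.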