I've confirmed the source has data; the full code is below, but no data ever lands in Hive.
package com.hive;

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

import java.time.Duration;

public class HiveTest {
    private static final String path = "hdfs_path";

    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "work");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // allow at most one checkpoint at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        env.setStateBackend(new FsStateBackend(path));
        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20));

        String name = "myhive";
        String defaultDatabase = "situation";
        String hiveConfDir = "/load/data/hive/hive-conf"; // a local path
        String version = "1.2.1";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        tableEnv.registerCatalog("myhive", hive);

        // set the HiveCatalog as the current catalog of the session
        tableEnv.useCatalog("myhive");
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS situation");
        tableEnv.executeSql("DROP TABLE IF EXISTS situation.source_table");

        tableEnv.executeSql("CREATE TABLE situation.source_table (\n" +
                "\thost STRING,\n" +
                "\turl STRING,\n" +
                "\tpublic_date STRING\n" +
                ") WITH (\n" +
                "\t'connector.type' = 'kafka',\n" +
                "\t'connector.version' = 'universal',\n" +
                "\t'connector.startup-mode' = 'latest-offset',\n" +
                "\t'connector.topic' = 'sendMessage',\n" +
                "\t'connector.properties.group.id' = 'domain_testGroup',\n" +
                "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
                "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
                "\t'update-mode' = 'append',\n" +
                "\t'format.type' = 'json',\n" +
                "\t'format.derive-schema' = 'true'\n" +
                ")");

        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("DROP TABLE IF EXISTS situation.fs_table");

        String hiveSql = "CREATE TABLE situation.fs_table (\n" +
                "  host STRING,\n" +
                "  url STRING,\n" +
                "  public_date STRING\n" +
                ") PARTITIONED BY (\n" +
                "  ts_date STRING,\n" +
                "  ts_hour STRING,\n" +
                "  ts_minute STRING\n" +
                ") STORED AS PARQUET\n" +
                "TBLPROPERTIES (\n" +
                "  'sink.partition-commit.trigger' = 'process time',\n" +
                "  'sink.partition-commit.delay' = '1 min',\n" +
                "  'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
                "  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'\n" +
                ")";
        tableEnv.executeSql(hiveSql);

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        tableEnv.executeSql("INSERT INTO situation.fs_table SELECT host, url, public_date, " +
                "DATE_FORMAT(public_date,'yyyy-MM-dd'), DATE_FORMAT(public_date,'HH'), DATE_FORMAT(public_date,'mm') " +
                "FROM situation.source_table");
    }
}
Are there any exception messages?

On 2020-07-29 14:07:26, "kcz" <[hidden email]> wrote:
> I've confirmed the source has data; the full code is below, but no data ever lands in Hive.
> ...
Hi, kcz
The connector properties are still in the 1.10 format; try switching to the 1.11 options [1].

> On 2020-07-29 15:07, kcz <[hidden email]> wrote:
>
> tableEnv.executeSql("CREATE TABLE situation.source_table (\n" +
>         "\thost STRING,\n" +
>         "\turl STRING,\n" +
>         "\tpublic_date STRING\n" +
>         ") WITH (\n" +
>         "\t'connector.type' = 'kafka',\n" +
>         "\t'connector.version' = 'universal',\n" +
>         "\t'connector.startup-mode' = 'latest-offset',\n" +
>         "\t'connector.topic' = 'sendMessage',\n" +
>         "\t'connector.properties.group.id' = 'domain_testGroup',\n" +
>         "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
>         "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
>         "\t'update-mode' = 'append',\n" +
>         "\t'format.type' = 'json',\n" +
>         "\t'format.derive-schema' = 'true'\n" +
>         ")");

Best
Leonard

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#connector-options
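[Editor's note: for reference, a sketch of the same source table in the 1.11-style DDL described by [1]; the topic, group id, and broker address are copied from the original post, and in 1.11 the universal Kafka connector no longer needs a ZooKeeper address:]

    CREATE TABLE situation.source_table (
        host STRING,
        url STRING,
        public_date STRING
    ) WITH (
        -- 1.11-style option keys per the Kafka connector docs [1]
        'connector' = 'kafka',
        'topic' = 'sendMessage',
        'properties.bootstrap.servers' = '127.0.0.1:9092',
        'properties.group.id' = 'domain_testGroup',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )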
Sorry — I had broken the log4j setup in IDEA, so no error message was showing. The 'process-time' below had been written as 'process time'. After fixing the log configuration, there is now a clear error message.
------------------ Original message ------------------
From: "user-zh" <[hidden email]>
Sent: Thursday, 2020-07-30, 00:14
To: "user-zh" <[hidden email]>
Subject: Re: flink-1.11 hive-1.2.1 DDL cannot write data

> Hi, kcz
> The connector properties are still in the 1.10 format; try switching to the 1.11 options [1].
> ...
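[Editor's note: to make the fix concrete, here is a sketch of the corrected sink DDL — identical to the original post except for the hyphenated trigger value; in Flink 1.11 the valid values for 'sink.partition-commit.trigger' are 'process-time' and 'partition-time':]

    CREATE TABLE situation.fs_table (
        host STRING,
        url STRING,
        public_date STRING
    ) PARTITIONED BY (
        ts_date STRING,
        ts_hour STRING,
        ts_minute STRING
    ) STORED AS PARQUET
    TBLPROPERTIES (
        -- 'process time' (with a space) is rejected; the value must be 'process-time'
        'sink.partition-commit.trigger' = 'process-time',
        'sink.partition-commit.delay' = '1 min',
        'sink.partition-commit.policy.kind' = 'metastore,success-file',
        'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
    )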