I use a datagen stream table and write it into two Hive tables that have the same schema but different storage formats (one ORC, one Parquet). The Parquet table is written correctly: the _SUCCESS file is generated and the data is queryable from Hive. The ORC table, however, produces no _SUCCESS file and cannot be queried from Hive. I run the job directly in my local IDE; the Hive version is 1.2.1 and the Flink version is 1.11.1. I also noticed that the ORC table's partitions contain more files than the Parquet table's, and the ORC files are not rolled per checkpoint interval (Parquet rolls per checkpoint as expected). In addition, writing ORC requires importing the flink-orc_2.11 jar (Parquet does not); without it I get: Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/orc/TypeDescription. Finally, Parquet prints related logs every checkpoint interval, while ORC produces no logs at all. What could be the reason? I have attached my code.
Hi,
Does the job writing the ORC table report any error, or does it run successfully while Hive just cannot see the data? I can't see the code you attached. Have you set table.exec.hive.fallback-mapred-writer?

--
Best regards!
Rui Li
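For reference, that option can be toggled on the TableConfig before the INSERT statements are submitted. A minimal Scala sketch, assuming the same tableEnv as in the code posted later in this thread; the value false is only illustrative (the default is true):

    // Illustrative only: explicitly setting table.exec.hive.fallback-mapred-writer.
    // false selects Flink's native ORC/Parquet writers, which need the corresponding
    // flink-orc / flink-parquet format dependencies on the classpath.
    tableEnv.getConfig.getConfiguration
      .setBoolean("table.exec.hive.fallback-mapred-writer", false)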
Embarrassing — I'm sure I uploaded the attachment, but it's gone, so I uploaded it again.
1. The ORC write and the Parquet write run in the same job, and there is no error, but Hive cannot query the ORC data. The files do appear in the HDFS directory, yet they are not rolled per checkpoint interval and no _SUCCESS file is generated.
2. table.exec.hive.fallback-mapred-writer is not set.
My questions:
1. Why does Hive streaming need the flink-orc_2.11 jar to write ORC files, while Parquet does not?
2. Where should I set the checkpoint interval for the SQL Client?
Below are screenshots of the HDFS directories (files generated by the ORC table, and files generated by the Parquet table).
I can't add the attachment, so I'll just paste the code directly:
import java.time.Duration

import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableResult}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

/**
 * author dinghh
 * time 2020-08-11 17:03
 */
object WriteHiveStreaming {
  def main(args: Array[String]): Unit = {

    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamEnv.setParallelism(3)

    val tableEnvSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
    tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
    tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))

    val catalogName = "my_catalog"
    val catalog = new HiveCatalog(
      catalogName, // catalog name
      "default",   // default database
      "D:\\ideaspace\\data-integrate-bigdata\\flink-restart\\flink-sql\\src\\main\\resources", // Hive config (hive-site.xml) directory
      "1.1.0"      // Hive version
    )
    tableEnv.registerCatalog(catalogName, catalog)
    tableEnv.useCatalog(catalogName)

    // Drop the stream table
    tableEnv.executeSql(
      """
        |DROP TABLE IF EXISTS `stream_db`.`datagen_user`
      """.stripMargin)

    // Create the stream table
    tableEnv.executeSql(
      """
        |CREATE TABLE `stream_db`.`datagen_user` (
        |    id INT,
        |    name STRING,
        |    dt AS localtimestamp,
        |    WATERMARK FOR dt AS dt
        |) WITH (
        |    'connector' = 'datagen',
        |    'rows-per-second'='10',
        |    'fields.id.kind'='random',
        |    'fields.id.min'='1',
        |    'fields.id.max'='1000',
        |    'fields.name.length'='5'
        |)
      """.stripMargin)

    // Switch to the Hive dialect
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)

    // Drop the Hive ORC table
    tableEnv.executeSql(
      """
        |DROP TABLE IF EXISTS `default`.`hive_user_orc`
      """.stripMargin)

    // Create the Hive ORC table
    tableEnv.executeSql(
      """
        |CREATE TABLE `default`.`hive_user_orc` (
        |    id INT,
        |    name STRING
        |) PARTITIONED BY (ts_dt STRING, ts_hour STRING, ts_minute STRING) STORED AS ORC TBLPROPERTIES (
        |    'partition.time-extractor.timestamp-pattern'='$ts_dt $ts_hour:$ts_minute:00.000',
        |    'sink.partition-commit.trigger'='partition-time',
        |    'sink.partition-commit.delay'='1 min',
        |    'sink.partition-commit.policy.kind'='metastore,success-file'
        |)
      """.stripMargin)

    // Drop the Hive Parquet table
    tableEnv.executeSql(
      """
        |DROP TABLE IF EXISTS `default`.`hive_user_parquet`
      """.stripMargin)

    // Create the Hive Parquet table
    tableEnv.executeSql(
      """
        |CREATE TABLE `default`.`hive_user_parquet` (
        |    id INT,
        |    name STRING
        |) PARTITIONED BY (ts_dt STRING, ts_hour STRING, ts_minute STRING) STORED AS PARQUET TBLPROPERTIES (
        |    'partition.time-extractor.timestamp-pattern'='$ts_dt $ts_hour:$ts_minute:00.000',
        |    'sink.partition-commit.trigger'='partition-time',
        |    'sink.partition-commit.delay'='1 min',
        |    'sink.partition-commit.policy.kind'='metastore,success-file'
        |)
      """.stripMargin)

    // Switch back to the default (Flink) dialect
    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

    // Streaming insert into the ORC table
    tableEnv.executeSql(
      """
        |INSERT INTO `default`.`hive_user_orc`
        |SELECT
        |    id, name,
        |    DATE_FORMAT(dt,'yyyy-MM-dd'),
        |    DATE_FORMAT(dt,'HH'),
        |    DATE_FORMAT(dt,'mm')
        |FROM
        |    stream_db.datagen_user
      """.stripMargin)

    // Streaming insert into the Parquet table
    tableEnv.executeSql(
      """
        |INSERT INTO `default`.`hive_user_parquet`
        |SELECT
        |    id, name,
        |    DATE_FORMAT(dt,'yyyy-MM-dd'),
        |    DATE_FORMAT(dt,'HH'),
        |    DATE_FORMAT(dt,'mm')
        |FROM
        |    stream_db.datagen_user
      """.stripMargin)
  }
}
If you run it in the IDE, tableEnv.executeSql returns immediately and the process then exits. You can wait for the job to finish with something like this:
val tableResult = tEnv.executeSql(insert)
// wait to finish
tableResult.getJobClient.get
  .getJobExecutionResult(Thread.currentThread.getContextClassLoader)
  .get

> Why does Hive streaming need the flink-orc_2.11 jar to write ORC files, while Parquet does not?

This is really a missing ORC dependency. In principle it should only happen when table.exec.hive.fallback-mapred-writer is set to false; I'll fix this later.

> Where should I set the checkpoint interval for the SQL Client?

You can set execution.checkpointing.interval in flink-conf.yaml.
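To illustrate those two answers: the checkpoint interval for SQL Client jobs goes into flink-conf.yaml, and the NoClassDefFoundError points at the ORC format dependency. Both snippets below are sketches — the 20 s interval only mirrors the value used in the code above, and the dependency coordinates assume Flink 1.11.1 with Scala 2.11 as used in this thread:

    # flink-conf.yaml (sketch); the interval value is only an example
    execution.checkpointing.interval: 20s
    execution.checkpointing.mode: EXACTLY_ONCE

    <!-- Maven dependency for the native ORC writer path (assumed coordinates) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-orc_2.11</artifactId>
        <version>1.11.1</version>
    </dependency>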
--
Best regards!
Rui Li
hi
I tested the ORC case on my side. Simply changing STORED AS PARQUET to STORED AS ORC was enough: the _SUCCESS file is generated and the data is queryable in Hive. There is one problem, though: the record count shown in the Flink Web UI is wrong. The "records sent" number on the UI keeps growing even after I stopped writing data to Kafka. The data in Hive is correct — I wrote 30 rows and Hive returns exactly 30 — but the UI already shows 480 and is still increasing.
Best Wishes
JasonLee
Based on my tests in the IDE: when writing to the Parquet table, the program keeps running even without the code you posted, and Parquet-related logs are printed every checkpoint interval. When writing to the ORC table the program also keeps running, but produces no log output at all. I also submitted the same jobs via the SQL Client: the Parquet table still works fine, while the ORC job restarts endlessly with the following error:

java.io.FileNotFoundException: File does not exist: hdfs://nspt-cs/hive/warehouse/hive_user_orc/ts_dt=2020-08-14/ts_hour=17/ts_minute=55/.part-650c3d36-328a-4d8d-8bdd-c170109edfba-0-0.inprogress.398158d9-eaf7-4863-855e-238c7069e298
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
    at org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:54) ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84) ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at StreamExecCalc$21.processElement(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at StreamExecCalc$4.processElement(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1]

The file does exist, but it cannot be closed; then a new in-progress file is started, which also cannot be closed, and this keeps repeating.
Also, when using the SQL Client, the parallelism seems to depend only on the number of files to be read. I have a partitioned table whose query requires a parallelism of 58, but my cluster only has 10 slots, so the query cannot return data. How can I solve this?
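On the parallelism question above: in Flink 1.11 the Hive source infers its parallelism from the number of splits, and that inference can reportedly be disabled or capped. A sketch of how the relevant options might appear in the SQL Client's sql-client-defaults.yaml — the option keys are assumed from the Flink 1.11 Hive documentation, and the cap value of 10 is only an example matching the cluster size mentioned above:

    configuration:
      # Assumed option keys; cap (or disable) split-based parallelism inference for Hive sources.
      table.exec.hive.infer-source-parallelism: true
      table.exec.hive.infer-source-parallelism.max: 10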
This is a bug. It has already been fixed and is pending release.
--
Best,
Jingsong Lee