Problem writing ORC files with Hive streaming


Problem writing ORC files with Hive streaming

丁浩浩

I use a datagen source as a streaming table and write it into two Hive tables that have the same schema but different storage formats (one ORC, one Parquet). The Parquet table is written correctly: a _SUCCESS file is generated and the data can be queried from Hive. The ORC table, however, gets no _SUCCESS file and its data cannot be queried from Hive. I am running the job directly in my local IDE; the Hive version is 1.2.1 and the Flink version is 1.11.1.

I also noticed that the ORC table's partitions contain more files than the Parquet table's, and that the ORC files are not rolled per checkpoint interval (the Parquet files do follow the checkpoint interval). In addition, ORC requires importing the flink-orc_2.11 jar (Parquet does not); otherwise the job fails with: Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/orc/TypeDescription. Parquet also prints Parquet-related log output at every checkpoint interval, while ORC produces no logs at all. What could be the reason? My code is attached.
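
For reference, the ORC dependency mentioned above would typically be declared in the build, for example as an sbt coordinate like the one below (illustrative only; the Scala 2.11 / Flink 1.11.1 versions are assumed from the description above, and a Maven dependency on the same artifact is equivalent):

// ORC writer support ships as a separate module from the Hive connector
libraryDependencies += "org.apache.flink" % "flink-orc_2.11" % "1.11.1"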


Re: Problem writing ORC files with Hive streaming

Rui Li
Hi,

Does the job that writes the ORC table report any error, or does it run successfully while Hive simply can't query the data?
I can't see the code you attached. Have you set table.exec.hive.fallback-mapred-writer?
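
For reference, that option is set on the table config, e.g. (a sketch only; the false value is purely illustrative and would switch from Hive's mapred record writer to Flink's native ORC/Parquet writers):

tableEnv.getConfig.getConfiguration
  .setBoolean("table.exec.hive.fallback-mapred-writer", false)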



--
Best regards!
Rui Li

Re: Re: Problem writing ORC files with Hive streaming

丁浩浩


Embarrassing: I'm sure I uploaded the attachment, but it can't be found, so I've uploaded it again.


1. The ORC write and the Parquet write run in the same job, and there is no error. However, the ORC data cannot be queried from Hive: the files do exist in the HDFS directory, but they are not rolled per checkpoint interval and no _SUCCESS file is generated.
2. table.exec.hive.fallback-mapred-writer is not set.

My questions:
1. Why does generating ORC files with Hive streaming require importing the flink-orc_2.11 jar, while Parquet does not?
2. For the SQL client, where should I set the checkpoint interval?

Below are screenshots of the HDFS directories:
[screenshot: files generated for the ORC table]
[screenshot: files generated for the Parquet table]




Re: Re: Re: Problem writing ORC files with Hive streaming

丁浩浩
I can't attach files, so I'm pasting the code directly:

import java.time.Duration


import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableResult}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog




/**
  * author dinghh
  * time 2020-08-11 17:03
  */
object WriteHiveStreaming {
    def main(args: Array[String]): Unit = {


        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        streamEnv.setParallelism(3)


        val tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build()
        val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
        tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
        tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))






        val catalogName = "my_catalog"
        val catalog = new HiveCatalog(
            catalogName,              // catalog name
            "default",                // default database
            "D:\\ideaspace\\data-integrate-bigdata\\flink-restart\\flink-sql\\src\\main\\resources",  // Hive config (hive-site.xml) directory
            "1.1.0"                   // Hive version
        )
        tableEnv.registerCatalog(catalogName, catalog)
        tableEnv.useCatalog(catalogName)




        // Drop the streaming source table if it exists
        tableEnv.executeSql(
            """
              |DROP TABLE IF EXISTS `stream_db`.`datagen_user`
            """.stripMargin)


        // Create the datagen streaming source table
        tableEnv.executeSql(
            """
              |CREATE TABLE `stream_db`.`datagen_user` (
              | id INT,
              | name STRING,
              | dt AS localtimestamp,
              | WATERMARK FOR dt AS dt
              |) WITH (
              | 'connector' = 'datagen',
              | 'rows-per-second'='10',
              | 'fields.id.kind'='random',
              | 'fields.id.min'='1',
              | 'fields.id.max'='1000',
              | 'fields.name.length'='5'
              |)
            """.stripMargin)


        // Switch to the Hive SQL dialect
        tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)


        // Drop the Hive ORC table if it exists
        tableEnv.executeSql(
            """
              |DROP TABLE IF EXISTS `default`.`hive_user_orc`
              |
            """.stripMargin)


        // Create the Hive ORC table
        tableEnv.executeSql(
            """
              |CREATE TABLE `default`.`hive_user_orc` (
              |  id INT,
              |  name STRING
              |) PARTITIONED BY (ts_dt STRING, ts_hour STRING,ts_minute STRING ) STORED AS ORC TBLPROPERTIES (
              |  'partition.time-extractor.timestamp-pattern'='$ts_dt $ts_hour:$ts_minute:00.000',
              |  'sink.partition-commit.trigger'='partition-time',
              |  'sink.partition-commit.delay'='1 min',
              |  'sink.partition-commit.policy.kind'='metastore,success-file'
              |)
            """.stripMargin)


        // Drop the Hive Parquet table if it exists
        tableEnv.executeSql(
            """
              |DROP TABLE IF EXISTS `default`.`hive_user_parquet`
            """.stripMargin)
        // Create the Hive Parquet table
        tableEnv.executeSql(
            """
              |CREATE TABLE `default`.`hive_user_parquet` (
              |  id INT,
              |  name STRING
              |) PARTITIONED BY (ts_dt STRING, ts_hour STRING,ts_minute STRING) STORED AS PARQUET TBLPROPERTIES (
              |  'partition.time-extractor.timestamp-pattern'='$ts_dt $ts_hour:$ts_minute:00.000',
              |  'sink.partition-commit.trigger'='partition-time',
              |  'sink.partition-commit.delay'='1 min',
              |  'sink.partition-commit.policy.kind'='metastore,success-file'
              |)
            """.stripMargin)
        // Switch back to the default (Flink) SQL dialect
        tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
        // Streaming insert into the ORC table
        tableEnv.executeSql(
            """
              |INSERT INTO `default`.`hive_user_orc`
              |SELECT
              |    id,name,
              |    DATE_FORMAT(dt,'yyyy-MM-dd'),
              |    DATE_FORMAT(dt,'HH'),
              |    DATE_FORMAT(dt,'mm')
              |FROM
              |    stream_db.datagen_user
            """.stripMargin)
        // Streaming insert into the Parquet table
        tableEnv.executeSql(
            """
              |INSERT INTO `default`.`hive_user_parquet`
              |SELECT
              |    id,name,
              |    DATE_FORMAT(dt,'yyyy-MM-dd'),
              |    DATE_FORMAT(dt,'HH'),
              |    DATE_FORMAT(dt,'mm')
              |FROM
              |    stream_db.datagen_user
            """.stripMargin)


    }


}

Re: Re: Re: Problem writing ORC files with Hive streaming

Rui Li
If you run this in the IDE, tableEnv.executeSql returns immediately and the program then exits right away. You can wait for the job to finish with something like this:

val tableResult = tEnv.executeSql(insert)
// wait to finish
tableResult.getJobClient.get
  .getJobExecutionResult(Thread.currentThread.getContextClassLoader)
  .get
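
Applied to the posted program, which issues two streaming INSERTs, one possible variant (a sketch only, assuming the Flink 1.11 StatementSet API; not verified against this exact setup) is to bundle both statements into a single job and then block on its result so the local process stays alive:

val stmtSet = tableEnv.createStatementSet()
stmtSet.addInsertSql(
  """
    |INSERT INTO `default`.`hive_user_orc`
    |SELECT id, name,
    |    DATE_FORMAT(dt,'yyyy-MM-dd'), DATE_FORMAT(dt,'HH'), DATE_FORMAT(dt,'mm')
    |FROM stream_db.datagen_user
  """.stripMargin)
stmtSet.addInsertSql(
  """
    |INSERT INTO `default`.`hive_user_parquet`
    |SELECT id, name,
    |    DATE_FORMAT(dt,'yyyy-MM-dd'), DATE_FORMAT(dt,'HH'), DATE_FORMAT(dt,'mm')
    |FROM stream_db.datagen_user
  """.stripMargin)
// execute() submits both inserts as one job; blocking here keeps main() from exiting
stmtSet.execute()
  .getJobClient.get
  .getJobExecutionResult(Thread.currentThread.getContextClassLoader)
  .get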

> Why does generating ORC files with Hive streaming require importing the flink-orc_2.11 jar, while Parquet does not?

That is actually a missing ORC dependency. In principle it should only happen when table.exec.hive.fallback-mapred-writer is set to false; I'll fix this later.

> For the SQL client, where should I set the checkpoint interval?

You can set execution.checkpointing.interval in flink-conf.yaml.
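
For example (an illustrative flink-conf.yaml entry; the 20-second value simply mirrors the interval used in the posted program):

execution.checkpointing.interval: 20 s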


--
Best regards!
Rui Li

Re: Problem writing ORC files with Hive streaming

JasonLee
In reply to this post by 丁浩浩
hi

I tested ORC on my side. I only needed to change "stored as parquet" to "stored as orc": the _SUCCESS file is generated and the data shows up in Hive. There is one problem though: the numbers shown in the Flink Web UI are wrong. The "records sent" counter on the UI keeps increasing even after I stopped writing data to Kafka, while the data in Hive is correct. I wrote 30 records and Hive returns exactly 30, but the UI already shows 480 and it is still growing.



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Best Wishes
JasonLee

Re: Re: Re: Re: Problem writing ORC files with Hive streaming

丁浩浩
In reply to this post by Rui Li
Based on my tests in the IDE: when writing to the Parquet table, even without adding the code you sent, the program keeps running and prints Parquet-related logs every checkpoint interval; when writing to the ORC table, the program also keeps running but produces no log output at all. I also submitted the same jobs via the SQL client: the Parquet table still works without any problem, but the ORC job restarts endlessly and reports the following error:

java.io.FileNotFoundException: File does not exist: hdfs://nspt-cs/hive/warehouse/hive_user_orc/ts_dt=2020-08-14/ts_hour=17/ts_minute=55/.part-650c3d36-328a-4d8d-8bdd-c170109edfba-0-0.inprogress.398158d9-eaf7-4863-855e-238c7069e298
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317) ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
    at org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:54) ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84) ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at StreamExecCalc$21.processElement(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at StreamExecCalc$4.processElement(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
The file does exist, but it cannot be closed; then a new file is started, which also cannot be closed, and this keeps repeating.
Also, when using the SQL client, the source parallelism seems to depend only on the number of files that have to be read. I have a partitioned table whose query requires a parallelism of 58, while my cluster only has 10 slots, so the query returns no data. How can I solve this?
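
For reference, a possibly related knob (an assumption, not confirmed anywhere in this thread) is Flink 1.11's Hive source parallelism inference, whose upper bound can be lowered on the table config, e.g.:

// hedged sketch: cap the parallelism inferred from the number of splits/files
tableEnv.getConfig.getConfiguration
  .setInteger("table.exec.hive.infer-source-parallelism.max", 10)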

Re: Re: Re: Re: Problem writing ORC files with Hive streaming

Jingsong Li
This is a bug. It has already been fixed and is pending release.



--
Best, Jingsong Lee