Re: flink 1.12.2 sql-cli: is_generic error when writing to Hive


Re: flink 1.12.2 sql-cli: is_generic error when writing to Hive

Rui Li
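Hello,

I could not reproduce this problem with the DDL you provided. Could you share more detailed steps? Also, if the Kafka table was created with CREATE TABLE LIKE, there is a known issue: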
https://issues.apache.org/jira/browse/FLINK-21660

On Thu, Apr 1, 2021 at 4:08 PM HunterXHunter <[hidden email]> wrote:

> After configuring the HiveCatalog,
> the SQL CLI can see the Hive databases and tables.
> I created a Kafka table:
>
> create table test.test_kafka(
> word VARCHAR
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'xx',
>     'scan.startup.mode' = 'latest-offset',
>     'properties.bootstrap.servers' = 'xx',
>     'properties.group.id' = 'test',
>     'format' = 'json',
>     'json.ignore-parse-errors' = 'true'
> );
> The table shows up in Hive:
> hive > DESCRIBE FORMATTED test_kafka
>        ...........
>         is_generic              true
>        .........
>
> But when I run the following at the Flink SQL prompt:
> Flink SQL> select * from test.test_kafka;
> it fails with:
> org.apache.flink.table.api.ValidationException: Unsupported options found
> for connector 'kafka'.
> Unsupported options:
> is_generic
> Supported options:
> connector
> format
> json.fail-on-missing-field
> json.ignore-parse-errors
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


--
Best regards!
Rui Li

Re: flink 1.12.2 sql-cli: is_generic error when writing to Hive

HunterXHunter
Hello, that problem has been solved.
I am now following the official example:


SET table.sql-dialect=default;

create table  flink_kafka(
sys_time bigint,
rt  AS TO_TIMESTAMP(FROM_UNIXTIME(sys_time / 1000, 'yyyy-MM-dd HH:mm:ss')),
WATERMARK FOR rt AS rt - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'xx',
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = '',
    'properties.group.id' = 'test-sql',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true'
);

SET table.sql-dialect=hive;

CREATE TABLE hive_table (
  sys_time bigint
) PARTITIONED BY (dt STRING, hr STRING) STORED AS orc TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='process-time',
  'sink.partition-commit.delay'='0s',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);


INSERT INTO hive_table SELECT sys_time, DATE_FORMAT(rt, 'yyyy-MM-dd') as dt,
DATE_FORMAT(rt, 'HH') as hr  FROM flink_kafka;

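No data is ever written to Hive, although the job reports no errors.
select * from flink_kafka; does return data,
but hive_table stays empty.
I sent records covering various time ranges, so the watermark should have passed the partition time, yet hive_table still has no data.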





Re: flink 1.12.2 sql-cli: is_generic error when writing to Hive

HunterXHunter
In reply to this post by Rui Li
Looking at the files on HDFS:
the partition only ever contains a single file like the one below, and no _SUCCESS file is generated.
.part-40a2c94d-0437-4666-8d43-31c908aaa02e-0-0.inprogress.73dcc10b-44f4-47e3-abac-0c14bd59f9c9




Help needed: consuming a Hive table as a real-time pipeline throws java.lang.ArrayIndexOutOfBoundsException: -1. Thanks!

samuel.qiu@ubtrobot.com
In reply to this post by Rui Li
Hello,

1. A streaming job reads from Kafka and writes the data into a Hive table. The table format is Parquet, and it is partitioned by day, hour, and minute.

2. Consuming that Hive table as a real-time pipeline fails with java.lang.ArrayIndexOutOfBoundsException: -1.
   In the Flink SQL client:
   1) Selecting all columns directly works, and all data is read correctly.
      Query:
      select *
      from ubtCatalog.ubtHive.event_all_dwd
      /*+ OPTIONS('streaming-source.enable'='true',
                  'streaming-source.partition.include'='all',
                  'streaming-source.monitor-interval'='5s',
                  'streaming-source.consume-order'='partition-time',
                  'streaming-source.consume-start-offset'='2021-01-01') */
      ;

   2) Adding an aggregate function to the query in 1) always fails with java.lang.ArrayIndexOutOfBoundsException: -1.
      Query:
      select count(xubtappid)
      from ubtCatalog.ubtHive.event_all_dwd
      /*+ OPTIONS('streaming-source.enable'='true',
                  'streaming-source.partition.include'='all',
                  'streaming-source.monitor-interval'='5s',
                  'streaming-source.consume-order'='partition-time',
                  'streaming-source.consume-start-offset'='2021-01-01') */
      ;

The full error message is:
2021-04-02 10:06:26
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:89)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:240)
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:469)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: HiveSource-ubtHive.event_all_dwd' (operator bc764cd8ddf7a0cff126f51c16239658).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:466)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:240)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:247)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:44)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.connectors.hive.FlinkHiveException: Failed to enumerate files
    at org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator.handleNewSplits(ContinuousHiveSplitEnumerator.java:148)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$4(ExecutorNotifier.java:135)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
    ... 3 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.flink.connectors.hive.util.HivePartitionUtils.toHiveTablePartition(HivePartitionUtils.java:167)
    at org.apache.flink.connectors.hive.HiveTableSource$HiveContinuousPartitionFetcherContext.toHiveTablePartition(HiveTableSource.java:388)
    at org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator$PartitionMonitor.call(ContinuousHiveSplitEnumerator.java:224)
    at org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator$PartitionMonitor.call(ContinuousHiveSplitEnumerator.java:172)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$5(ExecutorNotifier.java:132)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    ... 3 more







Best regards!
samuel

Re: flink 1.12.2 sql-cli: is_generic error when writing to Hive

HunterXHunter
In reply to this post by Rui Li
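"Important: checkpointing needs to be enabled when using the FileSink in STREAMING mode. Part files are finalized each time a checkpoint completes. If checkpointing is disabled, part files will forever stay in the 'in-progress' or 'pending' state and cannot be safely read by downstream systems."

I found this in the official documentation, so checkpointing is required. However, after I manually triggered a savepoint, a success file appeared but there was still no data.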
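
For reference, a minimal sketch of turning on periodic checkpointing before re-submitting the INSERT from the earlier post. It assumes the SQL client forwards Flink configuration keys given via SET, written in the same unquoted SET style used earlier in this thread; whether the 1.12.2 client applies the key this way is an assumption, and the 1min interval is only an illustrative value. `execution.checkpointing.interval` is the Flink option that controls the checkpoint interval.

```sql
-- Assumption: the SQL client applies Flink configuration keys passed via SET.
-- The interval (1min) is illustrative and not taken from this thread.
SET execution.checkpointing.interval=1min;

SET table.sql-dialect=hive;

-- Re-submit the streaming INSERT so that the new job runs with checkpointing.
INSERT INTO hive_table
SELECT sys_time, DATE_FORMAT(rt, 'yyyy-MM-dd') AS dt, DATE_FORMAT(rt, 'HH') AS hr
FROM flink_kafka;
```

If the client does not pick the setting up, the same key can be set in flink-conf.yaml before starting the SQL client.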




Re: flink 1.12.2 sql-cli: is_generic error when writing to Hive

HunterXHunter
With process-time, though, data does get written. With partition-time I have not yet managed to write any data out.




Re: flink 1.12.2 sql-cli: is_generic error when writing to Hive

Rui Li
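With partition-time, the commit is triggered by comparing the watermark against the timestamp extracted from the partition fields, so your source also needs to have a watermark.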
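
As a sketch of what this looks like on the sink side, the table properties below switch the commit trigger to partition-time. The table name hive_table_pt and the delay of 1 h are illustrative assumptions; the property keys are the ones already used in this thread. With this trigger, a partition is committed once the watermark passes the timestamp extracted from the partition values plus the delay.

```sql
SET table.sql-dialect=hive;

-- hive_table_pt is a hypothetical table name; the 1 h delay is an assumed value
-- chosen to match hourly partitions.
-- The timestamp pattern turns partition values into a timestamp such as
-- '2021-04-16 09:00:00', which is then compared with the watermark.
CREATE TABLE hive_table_pt (
  sys_time bigint
) PARTITIONED BY (dt STRING, hr STRING) STORED AS orc TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);
```

If the source never emits a watermark, this condition is never met and no partition is committed, regardless of how much data arrives.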

On Fri, Apr 16, 2021 at 9:32 AM HunterXHunter <[hidden email]> wrote:

> With process-time, though, data does get written. With partition-time I have not yet managed to write any data out.


--
Best regards!
Rui Li

Re: flink 1.12.2 sql-cli: is_generic error when writing to Hive

HunterXHunter
I did define a watermark in the DDL. But on the job page, the watermark is never updated.
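
One way to narrow this down is to look at the computed rowtime column directly, since the watermark in the flink_kafka DDL posted earlier is derived from rt, and rt in turn divides sys_time by 1000. The query below is only a diagnostic sketch against that table; it does not identify the cause by itself.

```sql
SET table.sql-dialect=default;

-- Diagnostic sketch: check that rt is populated and close to the current time.
-- The DDL divides sys_time by 1000, which presumes that sys_time holds epoch
-- milliseconds (an assumption to verify against the actual Kafka records).
SELECT sys_time, rt FROM flink_kafka;
```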




Re: flink 1.12.2 sql-cli: is_generic error when writing to Hive

Rui Li
Could you post the exact SQL statements (including the DDL and the INSERT)?

On Wed, Apr 21, 2021 at 5:46 PM HunterXHunter <[hidden email]> wrote:

> I did define a watermark in the DDL. But on the job page, the watermark is never updated.


--
Best regards!
Rui Li