你好,
我用你提供的这个DDL没有复现这个问题,有更详细的操作步骤么?另外如果kafka表是通过create table like创建的话有个已知问题: https://issues.apache.org/jira/browse/FLINK-21660 On Thu, Apr 1, 2021 at 4:08 PM HunterXHunter <[hidden email]> wrote: > 当配置好HiveCatalog后, > SQL-Cli 也可以查到hive库表信息 > 创建kafka表: > > create table test.test_kafka( > word VARCHAR > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'xx', > 'scan.startup.mode' = 'latest-offset', > 'properties.bootstrap.servers' = 'xx', > 'properties.group.id' = 'test', > 'format' = 'json', > 'json.ignore-parse-errors' = 'true' > ); > 在 Hive里面可以查到改表 > hive > DESCRIBE FORMATTED test_kafka > ........... > is_generic true > ......... > > 但是我在 Flink SQL > select * from test.test_kafka; > 报错: > org.apache.flink.table.api.ValidationException: Unsupported options found > for connector 'kafka'. > Unsupported options: > is_generic > Supported options: > connector > format > json.fail-on-missing-field > json.ignore-parse-errors > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li |
你好,这个问题已经解决了。
我现在通过官方例子: SET table.sql-dialect=default; create table flink_kafka( sys_time bigint, rt AS TO_TIMESTAMP(FROM_UNIXTIME(sys_time / 1000, 'yyyy-MM-dd HH:mm:ss')), WATERMARK FOR rt AS rt - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'xx', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '', 'properties.group.id' = 'test-sql', 'format' = 'json', 'json.ignore-parse-errors' = 'true' ); SET table.sql-dialect=hive; CREATE TABLE hive_table ( sys_time bigint ) PARTITIONED BY (dt STRING, hr STRING) STORED AS orc TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='process-time', 'sink.partition-commit.delay'='0s', 'sink.partition-commit.policy.kind'='metastore,success-file' ); INSERT INTO hive_table SELECT sys_time, DATE_FORMAT(rt, 'yyyy-MM-dd') as dt, DATE_FORMAT(rt, 'HH') as hr FROM flink_kafka; 发现数据一直无法写入hive。程序没有报错, select * from flink_kafka;是有数据的。 但是hive_table一直没有数据, 我发送各个时间段的数据,watermark应该也是超过了分区时间的,但是hive_table一直没有数据 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by Rui Li
查看hdfs文件:
分区一直是这样的一个文件,没有生成 _SUCCESS文件 .part-40a2c94d-0437-4666-8d43-31c908aaa02e-0-0.inprogress.73dcc10b-44f4-47e3-abac-0c14bd59f9c9 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by Rui Li
你好:
1. 实时通过读KAFKA,然后将数据写入了hive,建一张hive表,format 是 Parquet,是按天、小时、分钟来分区; 2. 通过实时 Pipeline 的手段消费 Hive Table 报java.lang.ArrayIndexOutOfBoundsException: -1 在flink sql client下: 1)直接select 所有字段,是没有问题,可以正常读出所有数据。 执行: select * from ubtCatalog.ubtHive.event_all_dwd /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include'='all', 'streaming-source.monitor-interval'='5s', 'streaming-source.consume-order'='partition-time','streaming-source.consume-start-offset'='2021-01-01') */ ; 2) 在1)基础上加上统计函数,一直报莫名的错,java.lang.ArrayIndexOutOfBoundsException: -1 执行: select count(xubtappid) from ubtCatalog.ubtHive.event_all_dwd /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include'='all', 'streaming-source.monitor-interval'='5s', 'streaming-source.consume-order'='partition-time','streaming-source.consume-start-offset'='2021-01-01') */ ; 具体报错信息如下: 2021-04-02 10:06:26 org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:89) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:240) at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:469) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: HiveSource-ubtHive.event_all_dwd' (operator bc764cd8ddf7a0cff126f51c16239658). at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:466) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237) at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:240) at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:247) at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:44) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.connectors.hive.FlinkHiveException: Failed to enumerate files at org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator.handleNewSplits(ContinuousHiveSplitEnumerator.java:148) at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$4(ExecutorNotifier.java:135) at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42) ... 3 more Caused by: java.lang.ArrayIndexOutOfBoundsException: -1 at org.apache.flink.connectors.hive.util.HivePartitionUtils.toHiveTablePartition(HivePartitionUtils.java:167) at org.apache.flink.connectors.hive.HiveTableSource$HiveContinuousPartitionFetcherContext.toHiveTablePartition(HiveTableSource.java:388) at org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator$PartitionMonitor.call(ContinuousHiveSplitEnumerator.java:224) at org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator$PartitionMonitor.call(ContinuousHiveSplitEnumerator.java:172) at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$5(ExecutorNotifier.java:132) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ... 3 more Best regards! samuel |
In reply to this post by Rui Li
重要: 在流模式下使用 FileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成。如果 Checkpoint
被禁用,部分文件(part file)将永远处于 'in-progress' 或 'pending' 状态,下游系统无法安全地读取。 在官方文档找到了这个,必须要有checkpoint才行,但是我 手动savepoint之后,虽然有sucess文件,但是没有数据 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
但是用process-time是有数据的,目前用partition-time一直没成功写出过数据
-- Sent from: http://apache-flink.147419.n8.nabble.com/ |
用partition-time的话是用watermark与分区字段的timestamp对比来触发提交的,因此还需要你的source有watermark。
On Fri, Apr 16, 2021 at 9:32 AM HunterXHunter <[hidden email]> wrote: > 但是用process-time是有数据的,目前用partition-time一直没成功写出过数据 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li |
在ddl的时候设置了 watermark。在任务页面查看watermark的时候一直没有更新watermark
-- Sent from: http://apache-flink.147419.n8.nabble.com/ |
可以发一下具体的SQL语句么(包括DDL和insert)?
On Wed, Apr 21, 2021 at 5:46 PM HunterXHunter <[hidden email]> wrote: > 在ddl的时候设置了 watermark。在任务页面查看watermark的时候一直没有更新watermark > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best regards! Rui Li |
Free forum by Nabble | Edit this page |