Could anyone help me figure out what the problem is?

The data flow is as follows:
Apache -> Logstash -> Kafka -> Flink -> ES -> Kibana

By the time the logs reach Kafka they are already JSON, in this format:

{
    "path":"/logs/user_conn_speed.log.1",
    "bytes_received":"8597",
    "ts":"2020-05-25T08:51:15Z",
    "message":"20.228.255.68 183685 2 10701 3 [2020-05-25T08:51:15Z] \"GET /speed.gif HTTP/1.1\" 200 8597",
    "client":"20.228.255.68",
    "uid":"183685",
    "ver_id":"3",
    "status_code":"200",
    "type":"logs",
    "conn_speed_ms":"10701",
    "host":"81b034ef6c72",
    "@timestamp":"2020-05-25T00:51:16.267Z",
    "request":"/speed.gif",
    "@version":"1",
    "device_id":"2",
    "http_ver":"1.1"
}

Kafka source table DDL in Flink SQL:

CREATE TABLE user_conn_speed_log (
    uid BIGINT,
    device_id INT,
    ver_id INT,
    conn_speed_ms INT,
    client STRING,
    http_ver STRING,
    status_code INT,
    ts TIMESTAMP(3),
    proctime AS PROCTIME(),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_conn_speed_log',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'format.type' = 'json'
);

ES table:

CREATE TABLE log_per_sec (
    window_start VARCHAR,
    window_end VARCHAR,
    log_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'user_conn_speed_log',
    'connector.document-type' = 'logs_per_sec',
    'connector.bulk-flush.max-actions' = '1',
    'format.type' = 'json',
    'update-mode' = 'append'
);

Flink SQL statement:

Flink SQL> INSERT INTO log_per_sec
> SELECT
>   CAST((TUMBLE_START(ts, INTERVAL '1' SECOND)) AS VARCHAR) AS window_start,
>   CAST((TUMBLE_END(ts, INTERVAL '1' SECOND)) AS VARCHAR) AS window_end,
>   COUNT(*) AS log_cnt
> FROM user_conn_speed_log
> GROUP BY TUMBLE(ts, INTERVAL '1' SECOND);
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 0f8d982d150c9fcb4ea5e78a8d7b2d85

Flink error:

2020-05-25 08:52:53
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at jdk.internal.reflect.GeneratedMethodAccessor86.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:567)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: RowTime field should not be null, please convert it to a non-null long value.
    at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:105)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at StreamExecCalc$550.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at SourceConversion$538.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)

Screenshot attached: flink.jp2 (651K)
Hi,
The error message is fairly self-explanatory: the event-time field must not be null. Please check whether the ts field in the Kafka data is ever null, or whether some records are missing the field entirely. If so, a simple UDF can assign a default value to ts whenever it is absent (a sketch of the idea follows below this message).

Best,
Leonard Xu
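A minimal sketch of this idea. Instead of a custom UDF it uses the built-in COALESCE in a computed column, which amounts to the same thing; the column name rowtime and the epoch default are illustrative choices, not from the thread, and the sketch assumes the JSON format leaves a missing ts as NULL ('format.fail-on-missing-field' is false by default):

CREATE TABLE user_conn_speed_log (
    uid BIGINT,
    device_id INT,
    ver_id INT,
    conn_speed_ms INT,
    client STRING,
    http_ver STRING,
    status_code INT,
    ts TIMESTAMP(3),
    -- Fall back to the epoch when ts is null, so the rowtime field is never null.
    rowtime AS COALESCE(ts, CAST('1970-01-01 00:00:00' AS TIMESTAMP(3))),
    proctime AS PROCTIME(),
    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_conn_speed_log',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'format.type' = 'json'
);

The INSERT query would then window on TUMBLE(rowtime, INTERVAL '1' SECOND) instead of ts; records with a missing ts land in a window at the epoch rather than failing the job.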
Hi Leonard,
Thanks, you were right. Kafka contained some earlier dirty data without a ts field, which caused the problem. Changing

'connector.startup-mode' = 'earliest-offset',

to

'connector.startup-mode' = 'latest-offset',

made it work.

One more small question along the same lines: how do I write the Flink SQL so that it skips Kafka messages that have no ts field?

Cheers,
Enzo
Hi,
> One more small question along the same lines: how do I write the Flink SQL so that it skips Kafka messages that have no ts field?

Whether a parse problem should fail the job or the offending record should be skipped is controlled by two options of the JSON format:

'format.fail-on-missing-field' = 'true',  -- optional: flag whether to fail if a field is missing or not; 'false' by default
'format.ignore-parse-errors' = 'true',    -- optional: skip fields and rows with parse errors instead of failing

The two options cannot both be true at the same time. (A sketch of the source DDL with the second option applied follows below.)

Best,
Leonard Xu
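For reference, a sketch of the original source DDL with the skip option from above added. Whether a record that merely lacks the ts field counts as a parse error may depend on the Flink version, so treat this as an untested illustration rather than a verified configuration:

CREATE TABLE user_conn_speed_log (
    uid BIGINT,
    device_id INT,
    ver_id INT,
    conn_speed_ms INT,
    client STRING,
    http_ver STRING,
    status_code INT,
    ts TIMESTAMP(3),
    proctime AS PROCTIME(),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_conn_speed_log',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'format.type' = 'json',
    -- Skip records with parse errors instead of failing the job.
    'format.ignore-parse-errors' = 'true'
);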