Flink SQL beginner question: RowTime field should not be null, please convert it to a non-null long value

Enzo wang
Could anyone help me figure out what the problem is?

The data flow is:
Apache -> Logstash -> Kafka -> Flink -> ES -> Kibana

The logs are already JSON by the time they reach Kafka, in this format:
{
   "path":"/logs/user_conn_speed.log.1",
   "bytes_received":"8597",
   "ts":"2020-05-25T08:51:15Z",
   "message":"20.228.255.68 183685 2 10701 3 [2020-05-25T08:51:15Z] \"GET /speed.gif HTTP/1.1\" 200 8597",
   "client":"20.228.255.68",
   "uid":"183685",
   "ver_id":"3",
   "status_code":"200",
   "type":"logs",
   "conn_speed_ms":"10701",
   "host":"81b034ef6c72",
   "@timestamp":"2020-05-25T00:51:16.267Z",
   "request":"/speed.gif",
   "@version":"1",
   "device_id":"2",
   "http_ver":"1.1"
}

The Kafka source table DDL in Flink SQL:
CREATE TABLE user_conn_speed_log (
    uid BIGINT,
    device_id INT,
    ver_id INT,
    conn_speed_ms INT,
    client STRING,
    http_ver STRING,
    status_code INT,
    ts TIMESTAMP(3),
    proctime AS PROCTIME(),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_conn_speed_log',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'format.type' = 'json'
);

The ES sink table:
CREATE TABLE log_per_sec (
    window_start VARCHAR,
    window_end VARCHAR,
    log_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch', 
    'connector.version' = '6', 
    'connector.hosts' = 'http://localhost:9200',  
    'connector.index' = 'user_conn_speed_log',  
    'connector.document-type' = 'logs_per_sec', 
    'connector.bulk-flush.max-actions' = '1', 
    'format.type' = 'json', 
    'update-mode' = 'append'
);

The Flink SQL statement:

Flink SQL> INSERT INTO log_per_sec
> SELECT
>   CAST((TUMBLE_START(ts, INTERVAL '1' SECOND)) as VARCHAR) as window_start,
>   CAST((TUMBLE_END(ts, INTERVAL '1' SECOND)) as VARCHAR) as window_end,
>   count(*) as log_cnt
> FROM user_conn_speed_log
> GROUP BY TUMBLE(ts, INTERVAL '1' SECOND);
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 0f8d982d150c9fcb4ea5e78a8d7b2d85

Flink then reports the error:

2020-05-25 08:52:53
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at jdk.internal.reflect.GeneratedMethodAccessor86.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:567)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: RowTime field should not be null, please convert it to a non-null long value.
    at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:105)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at StreamExecCalc$550.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at SourceConversion$538.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)

Screenshot attached: flink.jp2 (651K)

Re: Flink SQL beginner question: RowTime field should not be null, please convert it to a non-null long value

Leonard Xu
Hi,
The error message is fairly self-explanatory: the event time must not be null. Please check whether the ts field in your Kafka data is ever null or missing; if so, you can handle it with a simple UDF that assigns a default value to ts when none is present.
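
As an alternative to a UDF, here is a minimal untested sketch of the same idea done entirely in the DDL: read ts as a plain string (so a missing value becomes NULL rather than failing) and derive a non-null event-time column with COALESCE. It assumes TO_TIMESTAMP returns NULL when the input cannot be parsed and that your Flink version supports watermarks on computed columns:

CREATE TABLE user_conn_speed_log (
    uid BIGINT,
    device_id INT,
    ver_id INT,
    conn_speed_ms INT,
    client STRING,
    http_ver STRING,
    status_code INT,
    ts STRING,  -- keep the raw JSON field as a string; a missing field simply becomes NULL
    -- computed event-time column: fall back to a placeholder timestamp when
    -- ts is missing or cannot be parsed, so the rowtime is never null
    event_time AS COALESCE(
        TO_TIMESTAMP(ts, 'yyyy-MM-dd''T''HH:mm:ss''Z'''),
        TO_TIMESTAMP('1970-01-01 00:00:00')
    ),
    proctime AS PROCTIME(),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_conn_speed_log',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'format.type' = 'json'
);

The windowed query would then group on event_time instead of ts, and you may want a WHERE ts IS NOT NULL filter so the placeholder rows do not pollute the 1970 window.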

Best,
Leonard Xu

Re: Flink SQL beginner question: RowTime field should not be null, please convert it to a non-null long value

Enzo wang
Hi Leonard,

Thanks, you were right. Kafka had some dirty records without a ts field, which caused the problem. Changing

'connector.startup-mode' = 'earliest-offset',

to

'connector.startup-mode' = 'latest-offset',

made it work.

One more small question along the same lines: how can I write the Flink SQL so that it skips Kafka messages that have no ts field?

Cheers,
Enzo

Re: Flink SQL beginner question: RowTime field should not be null, please convert it to a non-null long value

Leonard Xu
Hi,


> One more small question along the same lines: how can I write the Flink SQL so that it skips Kafka messages that have no ts field?

Whether to fail on a parse exception or to skip the offending record: the JSON format has two parameters you can configure:

'format.fail-on-missing-field' = 'true',  -- optional: whether to fail if a field is missing ('false' by default)
'format.ignore-parse-errors' = 'true',    -- optional: skip fields and rows with parse errors instead of failing

These two parameters cannot both be set to true.
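
For example, the source table from the original post would pick up the second option like this (an untested sketch; property names as in the Flink 1.10 JSON format documentation):

CREATE TABLE user_conn_speed_log (
    uid BIGINT,
    device_id INT,
    ver_id INT,
    conn_speed_ms INT,
    client STRING,
    http_ver STRING,
    status_code INT,
    ts TIMESTAMP(3),
    proctime AS PROCTIME(),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_conn_speed_log',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'format.type' = 'json',
    -- skip records with parse errors instead of failing the job; must not be
    -- combined with 'format.fail-on-missing-field' = 'true'
    'format.ignore-parse-errors' = 'true'
);

One caveat: a message that is valid JSON but simply lacks the ts field is not a parse error. With the default 'format.fail-on-missing-field' = 'false' that field is read as NULL, so the rowtime can still be null and the UDF/COALESCE approach described earlier may still be needed.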

Best,
Leonard Xu

