Flink SQL使用Kafka自带的timestamp作为事件时间

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink SQL使用Kafka自带的timestamp作为事件时间

sunfulin
Hi,
想问下Flink SQL在使用DDL创建Kafka Source时,支持设置获取到Kafka自带的timestamp么?我们有场景想使用Kafka带的timestamp,这种情况下消息流中可能并不存在时间属性.
如果支持的话,能否分享下具体写法哈?我尝试使用下面的SQL报错:


CREATE TABLE user_behavior (
test_time TIMESTAMP(3),
user_id STRING ,
item_id STRING ,
category_id STRING ,
behavior STRING,
ts STRING,
proctime as PROCTIME() -- 通过计算列产生一个处理时间列
) WITH (
'connector.type' = 'kafka', -- 使用 kafka connector
'connector.version' = '0.10', -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'test', -- kafka topic
'connector.startup-mode' = 'latest-offset', -- 从起始 offset 开始读取
--'connector.properties.group.id' = 'mytest',
'connector.properties.zookeeper.connect' = '168.61.113.170:2181', -- zookeeper 地址
'connector.properties.bootstrap.servers' = '168.61.113.170:9092', -- kafka broker 地址
'format.type' = 'json' -- 数据源格式为 json
,'schema.0.rowtime.timestamps.type' = 'from-source',
'schema.0.rowtime.watermarks.type' = 'periodic-ascending',
'schema.0.rowtime.watermarks.delay' = '5000'
)




异常为:


 java.lang.UnsupportedOperationException: empty.max
 at scala.collection.TraversableOnce.max(TraversableOnce.scala:228)
 at scala.collection.TraversableOnce.max$(TraversableOnce.scala:226)
 at scala.collection.mutable.ArrayOps$ofInt.max(ArrayOps.scala:242)
 at org.apache.flink.table.planner.sources.TableSourceUtil$.createSchemaRelNode(TableSourceUtil.scala:310)
 at org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:297)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.$anonfun$translateToPlanInternal$1(StreamExecTableSourceScan.scala:130)
 at scala.Option.map(Option.scala:146)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:125)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
 at org.apache.flink.table.planner.plan.no
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL使用Kafka自带的timestamp作为事件时间

Jark
Administrator
访问 Kafka 消息上的所有数据(timestamp, partition, key, 等等)是一个非常重要的功能,社区也很早就意识到了。
目前已经有一个 FLIP [1] 在讨论中,预计 1.12 会支持。

Best,
Jark

[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records

On Fri, 5 Jun 2020 at 19:19, sunfulin <[hidden email]> wrote:

> Hi,
> 想问下Flink SQL在使用DDL创建Kafka
> Source时,支持设置获取到Kafka自带的timestamp么?我们有场景想使用Kafka带的timestamp,这种情况下消息流中可能并不存在时间属性.
> 如果支持的话,能否分享下具体写法哈?我尝试使用下面的SQL报错:
>
>
> CREATE TABLE user_behavior (
> test_time TIMESTAMP(3),
> user_id STRING ,
> item_id STRING ,
> category_id STRING ,
> behavior STRING,
> ts STRING,
> proctime as PROCTIME() -- 通过计算列产生一个处理时间列
> ) WITH (
> 'connector.type' = 'kafka', -- 使用 kafka connector
> 'connector.version' = '0.10', -- kafka 版本,universal 支持 0.11 以上的版本
> 'connector.topic' = 'test', -- kafka topic
> 'connector.startup-mode' = 'latest-offset', -- 从起始 offset 开始读取
> --'connector.properties.group.id' = 'mytest',
> 'connector.properties.zookeeper.connect' = '168.61.113.170:2181', --
> zookeeper 地址
> 'connector.properties.bootstrap.servers' = '168.61.113.170:9092', --
> kafka broker 地址
> 'format.type' = 'json' -- 数据源格式为 json
> ,'schema.0.rowtime.timestamps.type' = 'from-source',
> 'schema.0.rowtime.watermarks.type' = 'periodic-ascending',
> 'schema.0.rowtime.watermarks.delay' = '5000'
> )
>
>
>
>
> 异常为:
>
>
>  java.lang.UnsupportedOperationException: empty.max
>  at scala.collection.TraversableOnce.max(TraversableOnce.scala:228)
>  at scala.collection.TraversableOnce.max$(TraversableOnce.scala:226)
>  at scala.collection.mutable.ArrayOps$ofInt.max(ArrayOps.scala:242)
>  at
> org.apache.flink.table.planner.sources.TableSourceUtil$.createSchemaRelNode(TableSourceUtil.scala:310)
>  at
> org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:297)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.$anonfun$translateToPlanInternal$1(StreamExecTableSourceScan.scala:130)
>  at scala.Option.map(Option.scala:146)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:125)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>  at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>  at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>  at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>  at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>  at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>  at org.apache.flink.table.planner.plan.no
Reply | Threaded
Open this post in threaded view
|

Re:Re: Flink SQL使用Kafka自带的timestamp作为事件时间

sunfulin



谢谢Jark老大的回复。看起来在属性里增加   'timestamp.field' = 'timestamp'  应该是我需要的。我注意到目前通过Java代码可以获取timestampFromSource,这个功能是可以拿到source的timestamp么?我测试了下貌似解析出来为空。我的Kafka版本是0.10.2。














在 2020-06-05 19:31:37,"Jark Wu" <[hidden email]> 写道:

>访问 Kafka 消息上的所有数据(timestamp, partition, key, 等等)是一个非常重要的功能,社区也很早就意识到了。
>目前已经有一个 FLIP [1] 在讨论中,预计 1.12 会支持。
>
>Best,
>Jark
>
>[1]:
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
>
>On Fri, 5 Jun 2020 at 19:19, sunfulin <[hidden email]> wrote:
>
>> Hi,
>> 想问下Flink SQL在使用DDL创建Kafka
>> Source时,支持设置获取到Kafka自带的timestamp么?我们有场景想使用Kafka带的timestamp,这种情况下消息流中可能并不存在时间属性.
>> 如果支持的话,能否分享下具体写法哈?我尝试使用下面的SQL报错:
>>
>>
>> CREATE TABLE user_behavior (
>> test_time TIMESTAMP(3),
>> user_id STRING ,
>> item_id STRING ,
>> category_id STRING ,
>> behavior STRING,
>> ts STRING,
>> proctime as PROCTIME() -- 通过计算列产生一个处理时间列
>> ) WITH (
>> 'connector.type' = 'kafka', -- 使用 kafka connector
>> 'connector.version' = '0.10', -- kafka 版本,universal 支持 0.11 以上的版本
>> 'connector.topic' = 'test', -- kafka topic
>> 'connector.startup-mode' = 'latest-offset', -- 从起始 offset 开始读取
>> --'connector.properties.group.id' = 'mytest',
>> 'connector.properties.zookeeper.connect' = '168.61.113.170:2181', --
>> zookeeper 地址
>> 'connector.properties.bootstrap.servers' = '168.61.113.170:9092', --
>> kafka broker 地址
>> 'format.type' = 'json' -- 数据源格式为 json
>> ,'schema.0.rowtime.timestamps.type' = 'from-source',
>> 'schema.0.rowtime.watermarks.type' = 'periodic-ascending',
>> 'schema.0.rowtime.watermarks.delay' = '5000'
>> )
>>
>>
>>
>>
>> 异常为:
>>
>>
>>  java.lang.UnsupportedOperationException: empty.max
>>  at scala.collection.TraversableOnce.max(TraversableOnce.scala:228)
>>  at scala.collection.TraversableOnce.max$(TraversableOnce.scala:226)
>>  at scala.collection.mutable.ArrayOps$ofInt.max(ArrayOps.scala:242)
>>  at
>> org.apache.flink.table.planner.sources.TableSourceUtil$.createSchemaRelNode(TableSourceUtil.scala:310)
>>  at
>> org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:297)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.$anonfun$translateToPlanInternal$1(StreamExecTableSourceScan.scala:130)
>>  at scala.Option.map(Option.scala:146)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:125)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>  at org.apache.flink.table.planner.plan.no
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink SQL使用Kafka自带的timestamp作为事件时间

Jark
Administrator
据我所知  `timestampFromSource` 目前没有 connector 支持。。。

On Fri, 5 Jun 2020 at 22:29, sunfulin <[hidden email]> wrote:

>
>
>
> 谢谢Jark老大的回复。看起来在属性里增加   'timestamp.field' = 'timestamp'
> 应该是我需要的。我注意到目前通过Java代码可以获取timestampFromSource,这个功能是可以拿到source的timestamp么?我测试了下貌似解析出来为空。我的Kafka版本是0.10.2。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-06-05 19:31:37,"Jark Wu" <[hidden email]> 写道:
> >访问 Kafka 消息上的所有数据(timestamp, partition, key, 等等)是一个非常重要的功能,社区也很早就意识到了。
> >目前已经有一个 FLIP [1] 在讨论中,预计 1.12 会支持。
> >
> >Best,
> >Jark
> >
> >[1]:
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
> >
> >On Fri, 5 Jun 2020 at 19:19, sunfulin <[hidden email]> wrote:
> >
> >> Hi,
> >> 想问下Flink SQL在使用DDL创建Kafka
> >>
> Source时,支持设置获取到Kafka自带的timestamp么?我们有场景想使用Kafka带的timestamp,这种情况下消息流中可能并不存在时间属性.
> >> 如果支持的话,能否分享下具体写法哈?我尝试使用下面的SQL报错:
> >>
> >>
> >> CREATE TABLE user_behavior (
> >> test_time TIMESTAMP(3),
> >> user_id STRING ,
> >> item_id STRING ,
> >> category_id STRING ,
> >> behavior STRING,
> >> ts STRING,
> >> proctime as PROCTIME() -- 通过计算列产生一个处理时间列
> >> ) WITH (
> >> 'connector.type' = 'kafka', -- 使用 kafka connector
> >> 'connector.version' = '0.10', -- kafka 版本,universal 支持 0.11 以上的版本
> >> 'connector.topic' = 'test', -- kafka topic
> >> 'connector.startup-mode' = 'latest-offset', -- 从起始 offset 开始读取
> >> --'connector.properties.group.id' = 'mytest',
> >> 'connector.properties.zookeeper.connect' = '168.61.113.170:2181', --
> >> zookeeper 地址
> >> 'connector.properties.bootstrap.servers' = '168.61.113.170:9092', --
> >> kafka broker 地址
> >> 'format.type' = 'json' -- 数据源格式为 json
> >> ,'schema.0.rowtime.timestamps.type' = 'from-source',
> >> 'schema.0.rowtime.watermarks.type' = 'periodic-ascending',
> >> 'schema.0.rowtime.watermarks.delay' = '5000'
> >> )
> >>
> >>
> >>
> >>
> >> 异常为:
> >>
> >>
> >>  java.lang.UnsupportedOperationException: empty.max
> >>  at scala.collection.TraversableOnce.max(TraversableOnce.scala:228)
> >>  at scala.collection.TraversableOnce.max$(TraversableOnce.scala:226)
> >>  at scala.collection.mutable.ArrayOps$ofInt.max(ArrayOps.scala:242)
> >>  at
> >>
> org.apache.flink.table.planner.sources.TableSourceUtil$.createSchemaRelNode(TableSourceUtil.scala:310)
> >>  at
> >>
> org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:297)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.$anonfun$translateToPlanInternal$1(StreamExecTableSourceScan.scala:130)
> >>  at scala.Option.map(Option.scala:146)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:125)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
> >>  at
> >>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
> >>  at org.apache.flink.table.planner.plan.no
>