Hi,
Does Flink SQL support reading the timestamp that Kafka itself attaches to each record when a Kafka source is created with DDL? We have a use case where we want to use the Kafka record timestamp, because the message payload may not carry any time attribute at all. If this is supported, could you share the exact syntax? I tried the SQL below and it fails:

CREATE TABLE user_behavior (
    test_time TIMESTAMP(3),
    user_id STRING,
    item_id STRING,
    category_id STRING,
    behavior STRING,
    ts STRING,
    proctime AS PROCTIME()  -- processing-time attribute via a computed column
) WITH (
    'connector.type' = 'kafka',                  -- use the kafka connector
    'connector.version' = '0.10',                -- kafka version; 'universal' covers 0.11 and later
    'connector.topic' = 'test',                  -- kafka topic
    'connector.startup-mode' = 'latest-offset',  -- start reading from the latest offset
    --'connector.properties.group.id' = 'mytest',
    'connector.properties.zookeeper.connect' = '168.61.113.170:2181',  -- zookeeper address
    'connector.properties.bootstrap.servers' = '168.61.113.170:9092',  -- kafka broker address
    'format.type' = 'json',                      -- source format is json
    'schema.0.rowtime.timestamps.type' = 'from-source',
    'schema.0.rowtime.watermarks.type' = 'periodic-ascending',
    'schema.0.rowtime.watermarks.delay' = '5000'
)

The exception is:

java.lang.UnsupportedOperationException: empty.max
    at scala.collection.TraversableOnce.max(TraversableOnce.scala:228)
    at scala.collection.TraversableOnce.max$(TraversableOnce.scala:226)
    at scala.collection.mutable.ArrayOps$ofInt.max(ArrayOps.scala:242)
    at org.apache.flink.table.planner.sources.TableSourceUtil$.createSchemaRelNode(TableSourceUtil.scala:310)
    at org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:297)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.$anonfun$translateToPlanInternal$1(StreamExecTableSourceScan.scala:130)
    at scala.Option.map(Option.scala:146)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:125)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
    at org.apache.flink.table.planner.plan.no
Jark Wu wrote:
Accessing all of the data on a Kafka record (timestamp, partition, key, and so on) is a very important feature, and the community recognized the need for it a long time ago. There is already a FLIP [1] under discussion; it is expected to be supported in 1.12.

Best,
Jark

[1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
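For reference, a rough sketch of the kind of DDL FLIP-107 is aiming for. This is only a proposal at this point, so the METADATA column syntax, the exact data type of the timestamp column, and the new-style connector options used below are assumptions taken from the FLIP discussion and may differ from what finally ships:

CREATE TABLE user_behavior (
    user_id STRING,
    item_id STRING,
    category_id STRING,
    behavior STRING,
    -- expose the timestamp stored on the Kafka record itself as a column (proposed syntax)
    kafka_ts TIMESTAMP(3) METADATA FROM 'timestamp',
    -- use it as the event-time attribute of the table
    WATERMARK FOR kafka_ts AS kafka_ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = '168.61.113.170:9092',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
)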
Thanks for the reply, Jark. It looks like adding 'timestamp.field' = 'timestamp' to the properties should be what I need. I also noticed that timestampFromSource can currently be configured via Java code. Is that feature supposed to give me the source's timestamp? When I tested it, the parsed value seemed to be empty. My Kafka version is 0.10.2.
Jark Wu wrote:
As far as I know, no connector currently supports `timestampFromSource`...
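Until FLIP-107 lands, a common workaround is to take the event time from a field in the message body instead of from the Kafka record itself. Below is a minimal sketch under that assumption: the JSON payload carries an event-time field (here test_time, reusing the column name from the question) in a format the JSON format can parse as TIMESTAMP(3), e.g. '2020-06-05T12:00:00Z'. The WATERMARK clause declares it as the rowtime attribute with a 5 second delay:

CREATE TABLE user_behavior (
    user_id STRING,
    item_id STRING,
    category_id STRING,
    behavior STRING,
    -- event-time field assumed to be carried inside the JSON payload
    test_time TIMESTAMP(3),
    proctime AS PROCTIME(),
    -- declare test_time as the rowtime attribute and generate watermarks 5s behind it
    WATERMARK FOR test_time AS test_time - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = '0.10',
    'connector.topic' = 'test',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.zookeeper.connect' = '168.61.113.170:2181',
    'connector.properties.bootstrap.servers' = '168.61.113.170:9092',
    'format.type' = 'json'
)

This does not read the timestamp that Kafka stores on the record; until the FLIP-107 metadata columns are available, event time in SQL has to come from the message payload.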