Hi all, I have a question:
I create a Kafka source in PyFlink with a SQL DDL, as follows:

kafka_source_ddl = """
CREATE TABLE kafka_source_tb (
    name VARCHAR,
    number INT,
    msgtime TIMESTAMP,
    WATERMARK FOR msgtime AS msgtime
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'mytopic',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
)
"""
st_env.sql_update(kafka_source_ddl)

When I apply a window I get an error. The code is:

st_env.from_path("kafka_source_tb") \
    .window(Slide.over("10.secends").every("1.secends").on("msgtime").alias("msgtime")) \
    .group_by("msgtime") \
    .select("msgtime.start as b, msgtime.end as c, msgtime.rowtime as d") \

The error is:

org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
    at org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
    at org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
    at org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
    at org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:243)
    at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:762)
    at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:747)

I saw people say that using the Table API's rowtime function directly has a bug, so I chose the DDL approach instead. Is this error related to that, or did I write something wrong?

Could you please take a look? Thanks!
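For comparison, here is a sketch of the same pipeline with the interval spelling corrected ("seconds" rather than "secends"), an explicit TIMESTAMP(3) rowtime column, and a distinct window alias. These adjustments are assumptions for illustration only; whether they avoid the error also depends on the bug discussed in the reply below.

from pyflink.table.window import Slide

kafka_source_ddl = """
CREATE TABLE kafka_source_tb (
    name VARCHAR,
    number INT,
    msgtime TIMESTAMP(3),
    -- hypothetical 5-second watermark delay, chosen only for illustration
    WATERMARK FOR msgtime AS msgtime - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'mytopic',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
)
"""
st_env.sql_update(kafka_source_ddl)

# st_env is the StreamTableEnvironment from the question above.
# Sliding window of 10 seconds, evaluated every 1 second, on the msgtime attribute.
st_env.from_path("kafka_source_tb") \
    .window(Slide.over("10.seconds").every("1.seconds").on("msgtime").alias("w")) \
    .group_by("w") \
    .select("w.start as b, w.end as c, w.rowtime as d")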
hi 元灵,
This is a known bug: https://issues.apache.org/jira/browse/FLINK-17753. It is currently being fixed.

Bests,
Godfrey
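Until the fix is released, one thing that might be worth trying is to express the sliding window directly in SQL with HOP instead of the Table API Slide window. This is only a sketch against the kafka_source_tb table from the question, with COUNT(*) added as an example aggregate; whether it actually sidesteps FLINK-17753 would need to be verified.

# Sliding (hopping) window in SQL: HOP(time_attr, slide, size).
hop_sql = """
SELECT
    HOP_START(msgtime, INTERVAL '1' SECOND, INTERVAL '10' SECOND) AS b,
    HOP_END(msgtime, INTERVAL '1' SECOND, INTERVAL '10' SECOND) AS c,
    COUNT(*) AS cnt
FROM kafka_source_tb
GROUP BY HOP(msgtime, INTERVAL '1' SECOND, INTERVAL '10' SECOND)
"""
result_table = st_env.sql_query(hop_sql)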
Thanks for the reply. I'll try again once 1.11.0 is released.