Hi all, I want to try PyFlink for computing metrics over time windows. I wrote a demo and found that the Table API group call fails. Searching online for related material didn't solve it, so could someone please take a look at what I got wrong?
Error message:

py4j.protocol.Py4JJavaError: An error occurred while calling o95.select.
: org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
    at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
    at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
    at org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
    at org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)
    at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)
    at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:781)

Demo program:

from pyflink.datastream import *
from pyflink.table import *
from pyflink.table.descriptors import *
from pyflink.table.descriptors import Json
from pyflink.table.window import *

test_out_put_data_path = r'D:\test_doc\test_result_data.csv'

s_nev = StreamExecutionEnvironment.get_execution_environment()
s_nev.set_parallelism(3)
st_nev = StreamTableEnvironment.create(
    s_nev,
    environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())

st_nev.connect(Kafka()
               .version('0.11')
               .topic('gyhWebLog')
               .start_from_earliest()
               .property("zookeeper.connect", "cdh3:2181, cdh4:2181, cdh5:2181")
               .property("bootstrap.servers", "cdh3:9092, cdh4:9092, cdh5:9092")) \
    .with_format(Json()
                 .fail_on_missing_field(False)
                 .schema(DataTypes.ROW([DataTypes.FIELD('time', DataTypes.TIMESTAMP(3)),
                                        DataTypes.FIELD('prev_page', DataTypes.STRING()),
                                        DataTypes.FIELD('page', DataTypes.STRING()),
                                        DataTypes.FIELD('app', DataTypes.STRING()),
                                        DataTypes.FIELD('nextApp', DataTypes.STRING()),
                                        DataTypes.FIELD('service', DataTypes.STRING()),
                                        DataTypes.FIELD('userId', DataTypes.BIGINT())]))) \
    .with_schema(Schema()
                 .field('prev_page', DataTypes.STRING())
                 .field('page', DataTypes.STRING())
                 .field('app', DataTypes.STRING())
                 .field('nextApp', DataTypes.STRING())
                 .field('service', DataTypes.STRING())
                 .field('userId', DataTypes.BIGINT())
                 .field('time', DataTypes.TIMESTAMP(3))
                 .rowtime(Rowtime()
                          .timestamps_from_field('time')
                          .watermarks_periodic_bounded(60000))) \
    .in_append_mode() \
    .create_temporary_table('raw_web_log_data')

st_nev.connect(FileSystem().path(test_out_put_data_path)) \
    .with_format(OldCsv()
                 .field_delimiter(',')
                 .field('userId', DataTypes.BIGINT())
                 .field('dataCount', DataTypes.BIGINT())
                 .field('count_time', DataTypes.TIMESTAMP(3))) \
    .with_schema(Schema()
                 .field('userId', DataTypes.BIGINT())
                 .field('dataCount', DataTypes.BIGINT())
                 .field('count_time', DataTypes.TIMESTAMP(3))) \
    .create_temporary_table('test_out_put')

if __name__ == '__main__':
    st_nev.from_path('raw_web_log_data') \
        .window(Tumble.over('1.hours').on('time').alias('w')) \
        .group_by('userId, w') \
        .select('userId, page.count as d, w.end') \
        .execute_insert('test_out_put')  # the select here raises the ValidationException above
Hello,
The descriptor approach currently has a lot of bugs, and the community is already reworking it. For now you can work around the problem by using DDL [1] instead.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
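Roughly, a DDL version of your Kafka source could look like the sketch below. It is only a sketch derived from your descriptor config: I'm assuming the Flink 1.11 'kafka-0.11' SQL connector and JSON format options, so adjust the WITH clause to your setup.

kafka_source_ddl = """
CREATE TABLE raw_web_log_data (
    prev_page STRING,
    page STRING,
    app STRING,
    nextApp STRING,
    service STRING,
    userId BIGINT,
    `time` TIMESTAMP(3),
    -- the WATERMARK clause is what turns `time` into a rowtime attribute,
    -- which is exactly what the group window validation asks for
    -- (60 s bounded delay, matching watermarks_periodic_bounded(60000))
    WATERMARK FOR `time` AS `time` - INTERVAL '60' SECOND
) WITH (
    'connector' = 'kafka-0.11',
    'topic' = 'gyhWebLog',
    'properties.bootstrap.servers' = 'cdh3:9092,cdh4:9092,cdh5:9092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false'
)
"""
st_nev.execute_sql(kafka_source_ddl)

Note that `time` needs the backticks because it is a SQL keyword.

Best,
Xingbo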
I used the DDL approach you suggested, but it still fails:
kafka_source_ddl = """CREATE TABLE mysource (
    createTime STRING,
    type BIGINT,
    uid STRING,
    countryId BIGINT,
    data STRING,
    rowtime AS TO_TIMESTAMP(createTime),
    WATERMARK FOR rowtime AS rowtime - INTERVAL '2' SECOND
) WITH (...)

py4j.protocol.Py4JJavaError: An error occurred while calling o21.select.
: org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
Hi,
That one should already be fixed in 1.11 [1]; I could not reproduce your problem locally.

[1] https://issues.apache.org/jira/browse/FLINK-17753
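On 1.11 the Table API window should accept the rowtime column declared in your DDL directly. As a sketch against your mysource table (assuming your table environment is st_nev as in the first mail; the grouping key and the count on data are just illustrative):

from pyflink.table.window import Tumble

# 'rowtime' must be the computed column covered by the WATERMARK clause;
# windowing on a plain TIMESTAMP column raises the same ValidationException
result = st_nev.from_path('mysource') \
    .window(Tumble.over('1.hours').on('rowtime').alias('w')) \
    .group_by('uid, w') \
    .select('uid, data.count as dataCount, w.end')

Best,
Xingbo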
I'm running this in the playgrounds environment, but the apache-flink version I checked is 1.11.2. Could that be related?
Hi,
I suspect your environment is still not clean. Run pip list | grep apache-flink and check whether the version really is 1.11.2; users often have several Python environments installed on the same machine.
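You can also check from inside the exact interpreter that launches the job, for example with a snippet like this (assuming the pyflink.version module that the pip package ships):

# run this with the same interpreter that executes the Flink job
from pyflink.version import __version__
print(__version__)  # should print 1.11.2 if the right env is active

Best,
Xingbo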