pyflink1.11 window groupby error


pyflink1.11 window groupby error

刘乘九
Hi all, I'm trying out PyFlink for metric aggregation over time windows. I wrote a demo and found that the Table API group_by call fails with the error below. I searched for related material online but couldn't solve it. Could someone please help me see where I went wrong?

Error message:
py4j.protocol.Py4JJavaError: An error occurred while calling o95.select.
: org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
at org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
at org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)
at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)
at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:781)




Demo program:
from pyflink.datastream import *
from pyflink.table import *
from pyflink.table.descriptors import *
from pyflink.table.descriptors import Json
from pyflink.table.window import *

test_out_put_data_path = r'D:\test_doc\test_result_data.csv'

s_nev = StreamExecutionEnvironment.get_execution_environment()
s_nev.set_parallelism(3)
st_nev = StreamTableEnvironment.create(s_nev, environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())

st_nev.connect(Kafka().version('0.11').topic('gyhWebLog').start_from_earliest().property("zookeeper.connect","cdh3:2181, cdh4:2181, cdh5:2181").property("bootstrap.servers", "cdh3:9092, cdh4:9092, cdh5:9092")) \
    .with_format(Json()
                 .fail_on_missing_field(False)
                 .schema(DataTypes.ROW([DataTypes.FIELD('time', DataTypes.TIMESTAMP(3)),
                                        DataTypes.FIELD('prev_page', DataTypes.STRING()),
                                        DataTypes.FIELD('page', DataTypes.STRING()),
                                        DataTypes.FIELD('app', DataTypes.STRING()),
                                        DataTypes.FIELD('nextApp', DataTypes.STRING()),
                                        DataTypes.FIELD('service', DataTypes.STRING()),
                                        DataTypes.FIELD('userId', DataTypes.BIGINT())])))\
    .with_schema(Schema()
                 .field('prev_page', DataTypes.STRING())
                 .field('page', DataTypes.STRING())
                 .field('app', DataTypes.STRING())
                 .field('nextApp', DataTypes.STRING())
                 .field('service', DataTypes.STRING())
                 .field('userId', DataTypes.BIGINT())
                 .field('time', DataTypes.TIMESTAMP(3))
                 .rowtime(Rowtime()
                          .timestamps_from_field('time')
                          .watermarks_periodic_bounded(60000)))\
    .in_append_mode()\
    .create_temporary_table('raw_web_log_data')


st_nev.connect(FileSystem().path(test_out_put_data_path))\
    .with_format(OldCsv()
                 .field_delimiter(',')
                 .field("userId", DataTypes.BIGINT())
                 .field('dataCount', DataTypes.BIGINT())
                 .field('count_time', DataTypes.TIMESTAMP(3))
                 )\
    .with_schema(Schema()
                 .field('userId', DataTypes.BIGINT())
                 .field('dataCount', DataTypes.BIGINT())
                 .field('count_time', DataTypes.TIMESTAMP(3))
                 )\
    .create_temporary_table('test_out_put')


if __name__ == '__main__':
    st_nev.from_path('raw_web_log_data') \
        .window(Tumble.over('1.hours').on('time').alias('w')) \
        .group_by('userId, w') \
        .select('userId, page.count as d, w.end') \
        .execute_insert('test_out_put')

Re: pyflink1.11 window groupby error

Xingbo Huang
Hello,

The current descriptor API has quite a few bugs, and the community is already refactoring it. For now, you can use DDL [1] to work around the problem.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
Best,
Xingbo
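
For reference, the DDL route can be sketched roughly as follows. The connector options, field names, and watermark interval here are illustrative (adapted from the demo above), not a confirmed working config:

```python
# Sketch of the DDL approach Xingbo suggests. Declaring the WATERMARK in
# the CREATE TABLE statement marks `ts` as an event-time attribute, which
# is exactly what the "group window expects a time attribute" validation
# is asking for. All WITH options below are illustrative.
source_ddl = """
CREATE TABLE raw_web_log_data (
    `userId` BIGINT,
    `page` STRING,
    `ts` TIMESTAMP(3),
    WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTE
) WITH (
    'connector' = 'kafka',
    'topic' = 'gyhWebLog',
    'properties.bootstrap.servers' = 'cdh3:9092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
)
"""

# Then register the table and window on `ts` as before:
# st_nev.execute_sql(source_ddl)
# st_nev.from_path('raw_web_log_data') \
#     .window(Tumble.over('1.hours').on('ts').alias('w')) \
#     .group_by('userId, w') \
#     .select('userId, page.count as d, w.end')
```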

刘乘九 <[hidden email]> wrote on Tue, Sep 29, 2020 at 5:46 PM:

> [quoted message repeated above]

Re: pyflink1.11 window groupby error

anfeng
I used the DDL approach you suggested, but I still get the error:

    kafka_source_ddl = """CREATE TABLE mysource (
                              createTime STRING,
                              type BIGINT,
                              uid STRING,
                              countryId BIGINT,
                              data STRING,
                              rowtime AS TO_TIMESTAMP(createTime),
                              WATERMARK FOR rowtime AS rowtime - INTERVAL '2' SECOND
                          ) WITH (...)


py4j.protocol.Py4JJavaError: An error occurred while calling o21.select.
: org.apache.flink.table.api.ValidationException: A group window expects a
time attribute for grouping in a stream environment.



--
Sent from: http://apache-flink.147419.n8.nabble.com/
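
For context, the windowed aggregation that trips this validation can be sketched in SQL against such a table (column names follow the DDL snippet above; the aggregate and sink are hypothetical). With the WATERMARK declared in the source DDL, `rowtime` is a proper time attribute and TUMBLE over it should pass validation:

```python
# Hypothetical SQL equivalent of the failing windowed group-by.
# TUMBLE/TUMBLE_END take the time attribute and the window size.
window_query = """
SELECT uid,
       COUNT(*) AS dataCount,
       TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS count_time
FROM mysource
GROUP BY uid, TUMBLE(rowtime, INTERVAL '1' HOUR)
"""
# st_env.sql_query(window_query)  # st_env: the StreamTableEnvironment
```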

Re: pyflink1.11 window groupby error

Xingbo Huang
Hi,

What you describe should already be fixed in 1.11 [1]; I couldn't reproduce your problem locally.

[1] https://issues.apache.org/jira/browse/FLINK-17753

Best,
Xingbo

anfeng <[hidden email]> wrote on Tue, Nov 17, 2020 at 5:31 PM:

> [quoted message repeated above]

Re: pyflink1.11 window groupby error

anfeng
I'm running it in the playgrounds environment, though the Apache Flink I checked is 1.11.2.
Could that be related?





Re: pyflink1.11 window groupby error

Xingbo Huang
Hi,

I suspect it's still caused by an unclean environment. Run `pip list | grep
apache-flink` to check whether the version really is what you expect; users often have several Python environments installed on one machine.

Best,
Xingbo
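
A quick way to make this check unambiguous is to query the package version from inside the exact interpreter that runs the job (the `pip list | grep apache-flink` check can hit a different environment than the one executing the script). The helper name below is made up for illustration:

```python
import importlib.metadata as md  # stdlib in Python 3.8+


def installed_flink_version():
    """Return the apache-flink version visible to THIS interpreter, or None."""
    try:
        return md.version("apache-flink")
    except md.PackageNotFoundError:
        return None


# e.g. '1.11.2' in a correct setup, or None if apache-flink is not
# installed in this particular Python environment.
print(installed_flink_version())
```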

anfeng <[hidden email]> wrote on Wed, Nov 18, 2020 at 9:40 AM:

> [quoted message repeated above]