无法生成rowtime导致在window失败

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

无法生成rowtime导致在window失败

naturalfree
大家好
       在使用窗口的过程中遇到一个问题,麻烦大家帮忙看下!
       简单描述下情况:我们是从kafka获取数据,在flink做一些相关处理后sink到elasticsearch中。没有使用window的时候没有问题,可以成功完成流程。使用窗口后报错:Exception in thread "main" org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
 
       下边是我的详细流程的相关片段
----------------------------------------------------------------------------
1. 我们使用的jar包是flink-xx_2.12:1.10.0 / kafka版本为0.11
2. kafka的数据格式为{"acct":"acct1234", "evtime":1593396391819}
3. 使用descriptor的方式连接kafka,代码为:
    StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv);


    fsTableEnv.connect(new Kafka()
                                      .version("universal")
                                      .topic("jes_topic_evtime")
                                      .property("zookeeper.connect", "172.xx.xx.xxx:2181")
                                      .property("bootstrap.servers", "172.xx.xx.xxx:9092")
                                      .property("group.id", "grp1")
                                      .startFromEarliest()
                                    ).withFormat(new Json()
                                                  .failOnMissingField(false).deriveSchema())
                                           .withSchema(new Schema().field("acct", "STRING").field("evtime", "LONG").field("logictime","TIMESTAMP(3)").rowTime(new Rowtime().timestampsFromField("evtime").watermarksPeriodicBounded(5000)))
                                         .inAppendMode().createTemporaryTable("testTableName");


           Table testTab = fsTableEnv.sqlQuery("SELECT acct, evtime, logictime FROM testTableName")
                           .window(Tumble.over("5.seconds").on("logictime").as("w1"))
                           .groupBy("w1, acct")
                           .select("w1.rowtime, acctno");




测试发现在descriptor连接kafka时定义schema时,定义的rowtime字段和使用from的方式重命名字段好像都无法成功。测试时使用from方式重命名字段返回的值是null
Reply | Threaded
Open this post in threaded view
|

Re: 无法生成rowtime导致在window失败

Leonard Xu
Hi,

> field("logictime","TIMESTAMP(3)”)
 报错的原因这个字段在你原始的表中不存在的,理解你的需求是你想用 field evitime(Long型)生成一个新的 field logictime(TIMESTAMP(3)),这个可以用计算列解决,Table API上还不支持计算列,1.12 已经在开发中了。你可以用 DDL 加计算列完成满足你的需求,参考[1]

create table test (
 acct STRING,
 evitime BIGINT,
 logictime as TO_TIMESTAMP(FROM_UNIXTIME(evitime)),
 WATERMARK FOR logictime AS logictime - INTERVAL ‘5’ SECOND,
) with(
...
)
 
   
祝好
Leonard
[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/create.html <https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/create.html>