各位大佬:
有没有遇到过这个问题,Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered. 无论是事件时间还是处理时间,都报这个错;flink和blink的planner报错差不多。 版本如下: <flink.version>1.10.0</flink.version> <scala.version>2.11</scala.version> 代码如下: //获取运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); //创建一个tableEnvironment StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); Schema schema = new Schema() //.field("id", "VARCHAR").from("id") .field("id", "STRING") //.field("name", "VARCHAR") .field("amount", "DOUBLE") .field("proctime", Types.SQL_TIMESTAMP).proctime() //.field("rowtime", Types.SQL_TIMESTAMP) //.rowtime( // new Rowtime() // .timestampsFromField( // "eventtime") // .watermarksPeriodicBounded(2000)) ; // "0.8", "0.9", "0.10", "0.11", and "universal" tableEnv.connect(new Kafka().version("universal") .topic("source0511") .property("zookeeper.connect", "172.16.44.28:7758") .property("bootstrap.servers", "172.16.44.28:9096") .property("group.id", "source0511-group") .startFromEarliest() ) .withFormat(new Csv()) .withSchema(schema) .inAppendMode() .createTemporaryTable("sourceTable"); tableEnv.connect( new Kafka() .version("universal") // "0.8", "0.9", "0.10", "0.11", and "universal" .topic("sink0511") .property("acks", "all") .property("retries", "0") .property("batch.size", "16384") .property("linger.ms", "10") .property("zookeeper.connect", "172.16.44.28:7758") .property("bootstrap.servers", "172.16.44.28:9096") .sinkPartitionerFixed()) .inAppendMode() .withFormat(new Json()) .withSchema( new Schema().field("totalamount", "DOUBLE") //.field("total", "INT") .field("time", Types.SQL_TIMESTAMP) ) .createTemporaryTable("sinkTable"); tableEnv.sqlUpdate("insert into sinkTable" + " select sum(amount),TUMBLE_END(proctime, INTERVAL '5' SECOND) " + "from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND)"); //SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name) // FROM user_actions // GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); env.execute("test"); |
TIMESTAMP(3)时间格式不对吧
------------------ 原始邮件 ------------------ 发件人: "PCL"<[hidden email]>; 发送时间: 2020年5月12日(星期二) 晚上9:43 收件人: "user-zh"<[hidden email]>; 主题: flink10读取kafka报错 各位大佬: 有没有遇到过这个问题,Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered. 无论是事件时间还是处理时间,都报这个错;flink和blink的planner报错差不多。 版本如下: <flink.version>1.10.0</flink.version> <scala.version>2.11</scala.version> 代码如下: //获取运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); //创建一个tableEnvironment StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); Schema schema = new Schema() //.field("id", "VARCHAR").from("id") .field("id", "STRING") //.field("name", "VARCHAR") .field("amount", "DOUBLE") .field("proctime", Types.SQL_TIMESTAMP).proctime() //.field("rowtime", Types.SQL_TIMESTAMP) //.rowtime( // new Rowtime() // .timestampsFromField( // "eventtime") // .watermarksPeriodicBounded(2000)) ; // "0.8", "0.9", "0.10", "0.11", and "universal" tableEnv.connect(new Kafka().version("universal") .topic("source0511") .property("zookeeper.connect", "172.16.44.28:7758") .property("bootstrap.servers", "172.16.44.28:9096") .property("group.id", "source0511-group") .startFromEarliest() ) .withFormat(new Csv()) .withSchema(schema) .inAppendMode() .createTemporaryTable("sourceTable"); tableEnv.connect( new Kafka() .version("universal") // "0.8", "0.9", "0.10", "0.11", and "universal" .topic("sink0511") .property("acks", "all") .property("retries", "0") .property("batch.size", "16384") .property("linger.ms", "10") .property("zookeeper.connect", "172.16.44.28:7758") .property("bootstrap.servers", "172.16.44.28:9096") .sinkPartitionerFixed()) .inAppendMode() .withFormat(new Json()) .withSchema( new Schema().field("totalamount", "DOUBLE") //.field("total", "INT") .field("time", Types.SQL_TIMESTAMP) ) .createTemporaryTable("sinkTable"); tableEnv.sqlUpdate("insert into sinkTable" + " select sum(amount),TUMBLE_END(proctime, INTERVAL '5' SECOND) " + "from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND)"); //SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name) // FROM user_actions // GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); env.execute("test"); |
感谢回复! 这个很神奇的是,执行sqlquery就没问题 /*Table tb1 =tableEnv.sqlQuery("select sum(amount),TUMBLE_END(proctime, INTERVAL '5' SECOND)" + " from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND) "); tb1.printSchema();*/ 放开注释后,打印的schema是 root |-- EXPR$0: DOUBLE |-- EXPR$1: TIMESTAMP(3) 在 2020-05-12 22:36:17,"忝忝向仧" <[hidden email]> 写道: >TIMESTAMP(3)时间格式不对吧 > > >------------------ 原始邮件 ------------------ >发件人: "PCL"<[hidden email]>; >发送时间: 2020年5月12日(星期二) 晚上9:43 >收件人: "user-zh"<[hidden email]>; > >主题: flink10读取kafka报错 > > > >各位大佬: > 有没有遇到过这个问题,Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered. >无论是事件时间还是处理时间,都报这个错;flink和blink的planner报错差不多。 >版本如下: ><flink.version>1.10.0</flink.version> ><scala.version>2.11</scala.version> >代码如下: >//获取运行环境 >StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); >EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >//EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); > //创建一个tableEnvironment >StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); > >Schema schema = new Schema() >//.field("id", "VARCHAR").from("id") >.field("id", "STRING") >//.field("name", "VARCHAR") >.field("amount", "DOUBLE") > .field("proctime", Types.SQL_TIMESTAMP).proctime() >//.field("rowtime", Types.SQL_TIMESTAMP) > //.rowtime( > // new Rowtime() > // .timestampsFromField( > // "eventtime") > // .watermarksPeriodicBounded(2000)) >; > >// "0.8", "0.9", "0.10", "0.11", and "universal" >tableEnv.connect(new Kafka().version("universal") > .topic("source0511") > .property("zookeeper.connect", "172.16.44.28:7758") > .property("bootstrap.servers", "172.16.44.28:9096") > .property("group.id", "source0511-group") > .startFromEarliest() > ) > .withFormat(new Csv()) > .withSchema(schema) > .inAppendMode() > .createTemporaryTable("sourceTable"); > >tableEnv.connect( >new Kafka() > .version("universal") >// "0.8", "0.9", "0.10", "0.11", and "universal" >.topic("sink0511") > .property("acks", "all") > .property("retries", "0") > .property("batch.size", "16384") > .property("linger.ms", "10") > .property("zookeeper.connect", "172.16.44.28:7758") > .property("bootstrap.servers", "172.16.44.28:9096") > .sinkPartitionerFixed()) > .inAppendMode() > .withFormat(new Json()) > .withSchema( >new Schema().field("totalamount", "DOUBLE") >//.field("total", "INT") >.field("time", Types.SQL_TIMESTAMP) > ) > .createTemporaryTable("sinkTable"); > >tableEnv.sqlUpdate("insert into sinkTable" >+ " select sum(amount),TUMBLE_END(proctime, INTERVAL '5' SECOND) " >+ "from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND)"); >//SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name) > // FROM user_actions > // GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); >env.execute("test"); |
昨晚解决了,暂时还没查清原因,解决方式把createTemporaryTable 方法换成之前过时的那个方法registerTableSource,别的不用动。
[hidden email] 发件人: PCL 发送时间: 2020-05-12 23:39 收件人: user-zh 主题: Re:回复:flink10读取kafka报错 感谢回复! 这个很神奇的是,执行sqlquery就没问题 /*Table tb1 =tableEnv.sqlQuery("select sum(amount),TUMBLE_END(proctime, INTERVAL '5' SECOND)" + " from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND) "); tb1.printSchema();*/ 放开注释后,打印的schema是 root |-- EXPR$0: DOUBLE |-- EXPR$1: TIMESTAMP(3) 在 2020-05-12 22:36:17,"忝忝向��" <[hidden email]> 写道: >TIMESTAMP(3)时间格式不对吧 > > >------------------ 原始邮件 ------------------ >发件人: "PCL"<[hidden email]>; >发送时间: 2020年5月12日(星期二) 晚上9:43 >收件人: "user-zh"<[hidden email]>; > >主题: flink10读取kafka报错 > > > >各位大佬: > 有没有遇到过这个问题,Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered. >无论是事件时间还是处理时间,都报这个错;flink和blink的planner报错差不多。 >版本如下: ><flink.version>1.10.0</flink.version> ><scala.version>2.11</scala.version> >代码如下: >//获取运行环境 >StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); >EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >//EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); > //创建一个tableEnvironment >StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); > >Schema schema = new Schema() >//.field("id", "VARCHAR").from("id") >.field("id", "STRING") >//.field("name", "VARCHAR") >.field("amount", "DOUBLE") > .field("proctime", Types.SQL_TIMESTAMP).proctime() >//.field("rowtime", Types.SQL_TIMESTAMP) > //.rowtime( > // new Rowtime() > // .timestampsFromField( > // "eventtime") > // .watermarksPeriodicBounded(2000)) >; > >// "0.8", "0.9", "0.10", "0.11", and "universal" >tableEnv.connect(new Kafka().version("universal") > .topic("source0511") > .property("zookeeper.connect", "172.16.44.28:7758") > .property("bootstrap.servers", "172.16.44.28:9096") > .property("group.id", "source0511-group") > .startFromEarliest() > ) > .withFormat(new Csv()) > .withSchema(schema) > .inAppendMode() > .createTemporaryTable("sourceTable"); > >tableEnv.connect( >new Kafka() > .version("universal") >// "0.8", "0.9", "0.10", "0.11", and "universal" >.topic("sink0511") > .property("acks", "all") > .property("retries", "0") > .property("batch.size", "16384") > .property("linger.ms", "10") > .property("zookeeper.connect", "172.16.44.28:7758") > .property("bootstrap.servers", "172.16.44.28:9096") > .sinkPartitionerFixed()) > .inAppendMode() > .withFormat(new Json()) > .withSchema( >new Schema().field("totalamount", "DOUBLE") >//.field("total", "INT") >.field("time", Types.SQL_TIMESTAMP) > ) > .createTemporaryTable("sinkTable"); > >tableEnv.sqlUpdate("insert into sinkTable" >+ " select sum(amount),TUMBLE_END(proctime, INTERVAL '5' SECOND) " >+ "from sourceTable group by TUMBLE(proctime, INTERVAL '5' SECOND)"); >//SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name) > // FROM user_actions > // GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); >env.execute("test"); |
Free forum by Nabble | Edit this page |