近期进行Flink升级,将原来的程序从老的集群(1.8.0运行正常)迁移到新的集群(1.9.1)中。在部署程序的时候发现在1.9.1的集群中,原来运行正常的Flink SQL的程序无法执行,异常如下:
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85) at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:90)
跟踪到反编译的scala文件并设置断点,发现下图中的红框部分没有执行,直接跳过。
功能很简单,就是在某个时间窗对数值求和。测试用例如下:
package org.flowmatrix.isp.traffic.accounting.test; |
Hi,
建议你翻译成英文然后到jira里建个issue。 Best, Kurt On Thu, Dec 12, 2019 at 11:39 PM 李佟 <[hidden email]> wrote: > 近期进行Flink升级,将原来的程序从老的集群(1.8.0运行正常)迁移到新的集群(1.9.1)中。在部署程序的时候发现在1.9.1的集群中,原来运行正常的Flink > SQL的程序无法执行,异常如下: > > > org.apache.flink.table.api.ValidationException: *Window can only be > defined over a time attribute column.* > at > org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85) > > at > org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:90) > > > > 跟踪到反编译的scala文件并设置断点,发现下图中的红框部分没有执行,直接跳过。 > > > 功能很简单,就是在某个时间窗对数值求和。测试用例如下: > > > package org.flowmatrix.isp.traffic.accounting.test; > > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.scala.typeutils.Types; > import org.apache.flink.streaming.api.TimeCharacteristic; > import org.apache.flink.streaming.api.datastream.DataStream; > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import > org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; > import org.apache.flink.streaming.api.watermark.Watermark; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.TableSchema; > import org.apache.flink.table.api.java.StreamTableEnvironment; > import org.apache.flink.table.sinks.CsvTableSink; > import org.apache.flink.table.sinks.TableSink; > import org.apache.flink.table.sources.DefinedRowtimeAttributes; > import org.apache.flink.table.sources.RowtimeAttributeDescriptor; > import org.apache.flink.table.sources.StreamTableSource; > import org.apache.flink.table.sources.tsextractors.ExistingField; > import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps; > import org.apache.flink.types.Row; > import org.junit.Test; > > import javax.annotation.Nullable; > import java.sql.Timestamp; > import java.util.ArrayList; > import java.util.Collections; > import java.util.List; > > public class TestSql { > @Test > public void testAccountingSql() { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > env.setParallelism(1); > > try { > StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(env); > > SimpleTableSource source = new SimpleTableSource(); > Table t = tableEnv.fromTableSource(source); > > String interval = "5"; //5 second > System.out.println("source schema is " + > source.getTableSchema()); > > Table sqlResult = tableEnv.sqlQuery("SELECT " + > " TUMBLE_START(UserActionTime, INTERVAL '" + interval > + "' SECOND) as rowTime, " + > " Username," + > " SUM(Data) as Data " + > " FROM " + t + > " GROUP BY TUMBLE(UserActionTime, INTERVAL '" + > interval + "' SECOND),Username"); > > > String[] fieldNames = { > "rowTime", > "Username", "Data"}; > TypeInformation[] fieldTypes = { > TypeInformation.of(Timestamp.class), > TypeInformation.of(String.class), > TypeInformation.of(Long.class)}; > > TableSink sink1 = new CsvTableSink("/tmp/data.log", ","); > sink1 = sink1.configure(fieldNames, fieldTypes); > tableEnv.registerTableSink("EsSinkTable", sink1); > System.out.println("sql result schema is " + > sqlResult.getSchema()); > > tableEnv.sqlUpdate("insert into EsSinkTable select " + > "rowTime,Username,Data from " + sqlResult + ""); > > env.execute("test"); > } catch (Exception e) { > e.printStackTrace(); > System.err.println("start program error. FlowMatrix > --zookeeper <zookeeperAdress> --config <configpath>" + > " --name <jobName> --interval <intervalInMinute> > --indexName <indexName>"); > System.err.println(e.toString()); > return; > } > } > > public static class SimpleTableSource implements > StreamTableSource<Row>, DefinedRowtimeAttributes { > @Override > public DataStream<Row> getDataStream(StreamExecutionEnvironment > env) { > return > env.fromCollection(genertateData()).assignTimestampsAndWatermarks(new > AssignerWithPunctuatedWatermarks<Row>() { > private long lastWaterMarkMillSecond = -1; > private long waterMarkPeriodMillSecond = 1000; > @Nullable > @Override > public Watermark checkAndGetNextWatermark(Row lastElement, > long extractedTimestamp) { > if(extractedTimestamp - lastWaterMarkMillSecond >= > waterMarkPeriodMillSecond){ > lastWaterMarkMillSecond = extractedTimestamp; > return new Watermark(extractedTimestamp); > } > return null; > } > > @Override > public long extractTimestamp(Row element, long > previousElementTimestamp) { > return ((Long)element.getField(0))*1000; > } > }); > } > > @Override > public TableSchema getTableSchema() { > TableSchema schema = TableSchema.builder() > .field("Username", Types.STRING()) > .field("Data", Types.LONG()) > .field("UserActionTime", Types.SQL_TIMESTAMP()) > .build(); > return schema; > } > > @Override > public TypeInformation<Row> getReturnType() { > String[] names = new String[]{"Username", "Data", > "UserActionTime"}; > TypeInformation[] types = > new TypeInformation[]{Types.STRING(), Types.LONG(), > Types.SQL_TIMESTAMP()}; > return Types.ROW(names, types); > } > > > @Override > public List<RowtimeAttributeDescriptor> > getRowtimeAttributeDescriptors() { > RowtimeAttributeDescriptor rowtimeAttrDescr = new > RowtimeAttributeDescriptor( > "UserActionTime", > new ExistingField("UserActionTime"), > new AscendingTimestamps()); > List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = > Collections.singletonList(rowtimeAttrDescr); > return listRowtimeAttrDescr; > } > > > private static List<Row> genertateData() { > List<Row> rows = new ArrayList<>(); > long startTime = System.currentTimeMillis() / 1000 - 10000; > for (int i = 0; i < 10000; i++) { > rows.add(buildRecord(startTime, i)); > } > return rows; > } > > private static Row buildRecord(long startTime, int i) { > Row row = new Row(3); > row.setField(0, "fox"); //Username > row.setField(1, Math.random()); //Data > row.setField(2, startTime + i); //UserActionTime > return row; > } > } > } > > > |
Free forum by Nabble | Edit this page |