Hi! Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink [aggregationTableSink] do not match. SQL = SELECT count(*) as cnt, TUMBLE_START(rowtime, INTERVAL '10' SECOND) as tumTime FROM abc GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND) 使用table.sqlQuery(SQL),返回的table schema 是 Query result schema: [cnt: Long, tumTime: Timestamp]。 而使用 JsonRowSchemaConverter.convert("{" + " type:'object'," + " properties:{" + " cnt: {" + " type: 'number'" + " }," + " tumTime:{" + " type:'string'," + " format:'date-time'" + " }" + " }" + “}"); 创建Elasticsearch6UpsertTableSink table schema 是 TableSink schema: [cnt: BigDecimal, tumTime: Timestamp] 而且我看了 JsonRowSchemaConverter.convert 所有的数字类型都被转成BigDecimal,导致SQL返回的schema 和 json定义的schema无法匹配。 请问是我使用的问题还是说框架存在这个问题? 附上源代码: public class AggregationFunction { public static void main(String[] args) { String sql = "SELECT count(*) as cnt, TUMBLE_START(rowtime, INTERVAL '10' SECOND) as tumTime FROM abc GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND)"; StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(); senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); StreamTableEnvironment tenv = StreamTableEnvironment.create(senv); DataStream<User> source = senv.addSource(new SourceFunction<User>() { @Override public void run(SourceContext<User> sourceContext) throws Exception { int i = 1000; String[] names = {"Hanmeimei", "Lilei"}; while (i > 1) { sourceContext.collect(new User(names[i%2], i, new Timestamp(System.currentTimeMillis()))); Thread.sleep(10); i--; } } @Override public void cancel() { } }); tenv.registerDataStream("abc", source, "name, age, timestamp, rowtime.rowtime"); Table table = tenv.sqlQuery(sql); List<Host> hosts = Arrays.asList(new Host("10.20.128.210", 19201, "http")); TypeInformation<Row> typeInformation = JsonRowSchemaConverter.convert("{" + " type:'object'," + " properties:{" + " cnt: {" + " type: 'number'" + " }," + " tumTime:{" + " type:'string'," + " format:'date-time'" + " }" + " }" + "}"); RowTypeInfo typeInfo = (RowTypeInfo) typeInformation; TypeInformation<?>[] typeInformations = typeInfo.getFieldTypes(); String[] fieldNames = typeInfo.getFieldNames(); TableSchema.Builder builder = TableSchema.builder(); for (int i = 0; i < typeInformations.length; i ++) { builder.field(fieldNames[i], typeInformations[i]); } Elasticsearch6UpsertTableSink establesink = new Elasticsearch6UpsertTableSink( true, builder.build(), hosts, "aggregation", "data", "$", "n/a", new JsonRowSerializationSchema.Builder(typeInformation).build(), XContentType.JSON, new IgnoringFailureHandler(), new HashMap<>() ); tenv.registerTableSink("aggregationTableSink", establesink); table.insertInto("aggregationTableSink"); } @Data @AllArgsConstructor @NoArgsConstructor public static class User { private String name; private Integer age; private Timestamp timestamp; } } best wish! |
Free forum by Nabble | Edit this page |