Hi all:
Table definition:

    create table `source_table`(
      `SeqNo` varchar,
      `Type` varchar,
      `Table` varchar,
      `ServerId` varchar,
      `Database` varchar,
      `OldData` varchar,
      `GTID` varchar,
      `Offset` varchar,
      `event_ts` as to_timestamp(from_unixtime(Data.`FuiUpdateTime`), 'yyyy-MM-dd HH:mm:ss'),
      WATERMARK FOR event_ts AS event_ts - interval '60' second
    ) with(…)

Query:

    insert into sinkTable select * from source_table;

Error message:

    java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
        at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
        at SinkConversion$51.processElement(Unknown Source)
        ……

After reading the code, I found that for a rowtime field, BaseRowTypeInfo uses SqlTimestampSerializer while RowTypeInfo uses LongSerializer. The upstream and downstream serializers therefore differ: when the upstream uses SqlTimestampSerializer and the downstream uses LongSerializer, this error is thrown.

Can this problem be avoided?
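
The failure mode in this stack trace can be reproduced outside Flink. Below is a minimal sketch in plain Java, not Flink code: `FieldSerializer`, `LongFieldSerializer`, and `SimpleRowSerializer` are hypothetical stand-ins invented for illustration. It only assumes what the trace shows, namely that a row-level `copy` delegates to a per-field serializer typed for `Long` while the field actually holds a `java.sql.Timestamp`.

```java
import java.sql.Timestamp;

class SerializerMismatchDemo {

    /** Hypothetical stand-in for a field serializer; only copy() is modeled. */
    interface FieldSerializer<T> {
        T copy(T from);
    }

    /** Mimics a serializer that expects Long values. */
    static final class LongFieldSerializer implements FieldSerializer<Long> {
        @Override
        public Long copy(Long from) {
            return from; // Long is immutable, so returning it is a valid copy
        }
    }

    /** Mimics a row serializer holding type-erased per-field serializers. */
    static final class SimpleRowSerializer {
        private final FieldSerializer<Object>[] fieldSerializers;

        @SuppressWarnings("unchecked")
        SimpleRowSerializer(FieldSerializer<?>... serializers) {
            // Unchecked cast: the compiler can no longer verify field types.
            this.fieldSerializers = (FieldSerializer<Object>[]) serializers;
        }

        Object[] copy(Object[] row) {
            Object[] out = new Object[row.length];
            for (int i = 0; i < row.length; i++) {
                // The cast to the serializer's type happens here, at runtime.
                out[i] = fieldSerializers[i].copy(row[i]);
            }
            return out;
        }
    }

    /** Returns true if copying a one-field row with this value throws ClassCastException. */
    static boolean copyFails(Object fieldValue) {
        SimpleRowSerializer serializer = new SimpleRowSerializer(new LongFieldSerializer());
        try {
            serializer.copy(new Object[] {fieldValue});
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Long field fails:      " + copyFails(42L));
        System.out.println("Timestamp field fails: " + copyFails(new Timestamp(0L)));
    }
}
```

The point of the sketch is that the declared serializer and the runtime value must agree; the compiler cannot catch the mismatch because the field serializers are handled through erased types.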
Hi lucas,
This looks like a bug in querying the event_time field, caused by TimeIndicatorTypeInfo. If you are on 1.10, you can open a JIRA issue to track it.

Best,
Jingsong Lee

On Fri, Mar 20, 2020 at 11:40 AM lucas.wu <[hidden email]> wrote:
> Hi all:
> [...]
Hi,
Are you using the blink planner? Could you also post the definition of sinkTable?

Best,
Jark
Yes, I am using the blink planner. Because I have done some simple development on top of Flink, I derive the sinkTable schema as follows: I first read Select * from source_table, register the result as a temporary table, and then assign that temporary table's schema to sinkTable. sinkTable also extends RetractStreamTableSink[Row].

This is its operator graph:

Source: KafkaTableSource(SeqNo, Type, Table, ServerId, Database, OldData, GTID, Data, Timestamp, Offset)
-> SourceConversion(table=[default_catalog.default_database.source_table, source: [KafkaTableSource(SeqNo, Type, Table, ServerId, Database, OldData, GTID, Data, Timestamp, Offset)]], fields=[SeqNo, Type, Table, ServerId, Database, OldData, GTID, Data, Timestamp, Offset])
-> Calc(select=[SeqNo, Type, Table, ServerId, Database, OldData, GTID, Data, Timestamp, Offset, from_unixtime((Data.FuiUpdateTime / 1000)) AS FuiUpdateTimeSec, (from_unixtime((Data.FuiUpdateTime / 1000)) TO_TIMESTAMP _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS event_ts])
-> WatermarkAssigner(rowtime=[event_ts], watermark=[(event_ts - 60000:INTERVAL SECOND)])
-> Calc(select=[event_ts])
-> SinkConversionToTuple2
-> Sink: ConsoleTableSink(event_ts)

Judging from the error message, the SinkConversionToTuple2 operator is probably where things go wrong. For this operator:

inTypeInfo is BaseRow(event_ts: TIMESTAMP(3) *ROWTIME*)
outTypeInfo is Java Tuple2<Boolean, Row(event_ts: TimeIndicatorTypeInfo(rowtime))>

These two serialize TimeIndicatorTypeInfo differently. BaseRowSerializer sets the serializer for TimeIndicatorTypeInfo to SqlTimestampSerializer, whereas RowTypeInfo uses LongSerializer. So my guess is that the problem is here.

Original message
From: Jark [hidden email]
To: [hidden email]
Sent: Friday, March 20, 2020, 14:21
Subject: Re: rowtime type serialization problem

> Hi,
> Are you using the blink planner? Could you also post the definition of sinkTable?
> Best, Jark
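
Since the sink schema in this setup is copied from the query result, one direction worth trying is to replace the rowtime indicator with a plain timestamp type while copying the schema, so that the declared type matches the `java.sql.Timestamp` value that actually arrives. This is an assumption, not something confirmed in the thread. The sketch below is plain Java with hypothetical names (`FieldKind`, `normalize`, `copyField`); it models the idea only and does not call any Flink API.

```java
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SinkSchemaNormalizationSketch {

    /** Hypothetical field kinds; ROWTIME_INDICATOR stands in for TimeIndicatorTypeInfo(rowtime). */
    enum FieldKind { STRING, SQL_TIMESTAMP, ROWTIME_INDICATOR }

    /** Copies the query schema, replacing every rowtime indicator with a plain timestamp. */
    static List<FieldKind> normalize(List<FieldKind> querySchema) {
        List<FieldKind> sinkSchema = new ArrayList<>();
        for (FieldKind kind : querySchema) {
            sinkSchema.add(kind == FieldKind.ROWTIME_INDICATOR ? FieldKind.SQL_TIMESTAMP : kind);
        }
        return sinkSchema;
    }

    /** Mimics the copy path: the declared kind decides which class the value is cast to. */
    static Object copyField(FieldKind declared, Object value) {
        switch (declared) {
            case STRING:
                return (String) value;
            case SQL_TIMESTAMP:
                Timestamp source = (Timestamp) value;
                Timestamp copy = new Timestamp(source.getTime());
                copy.setNanos(source.getNanos()); // keep sub-millisecond precision
                return copy;
            case ROWTIME_INDICATOR:
                // Models the Long-based serializer choice described in the post above.
                return (Long) value;
            default:
                throw new IllegalArgumentException("unknown kind: " + declared);
        }
    }

    public static void main(String[] args) {
        List<FieldKind> querySchema = Arrays.asList(FieldKind.ROWTIME_INDICATOR);
        List<FieldKind> sinkSchema = normalize(querySchema);
        Object copied = copyField(sinkSchema.get(0), new Timestamp(0L));
        System.out.println(sinkSchema + " copied a " + copied.getClass().getSimpleName());
    }
}
```

Whether the equivalent substitution is enough in a real job depends on how the sink derives its output type, which this thread does not show.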
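
I am using the same approach as you: I process the data source, create a temporary view, and pass it on to the sink. It involves rowtime, and I ran into the same error. How did you solve it in the end?

-- Sent from: http://apache-flink.147419.n8.nabble.com/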