SQL DDL: How to use a Long timestamp as event time

5 messages

SQL DDL: How to use a Long timestamp as event time

云长
Hi all,
    The message timestamps in our Kafka topic are Long values, some in seconds and some in milliseconds.
My questions are:
1. For a Long timestamp field, how should the column be declared in Flink SQL DDL today so that a WATERMARK can be defined on it?
2. If this is not supported yet, is it on the roadmap?

Version tested: Flink 1.10


The SQL tested on Flink 1.10:
CREATE TABLE session_login (
    deal_time TIMESTAMP(3)
    ,aa VARCHAR
    ,WATERMARK FOR deal_time AS deal_time - INTERVAL '60' SECOND
 )WITH(
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'topic',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.group.id' = 'group.id',
    'connector.properties.zookeeper.connect' = 'ip:port',
    'connector.properties.bootstrap.servers' = 'ip:port',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
 );

If deal_time is declared as TIMESTAMP(3), JSON deserialization fails with the following exception:
Caused by: java.io.IOException: Failed to deserialize JSON object.
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Caused by: java.time.format.DateTimeParseException: Text '1589420545' could not be parsed at index 0
at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
... 7 more

If deal_time is declared as BIGINT, SQL validation fails with the following exception:
Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: From line 4, column 33 to line 4, column 64: Cannot apply '-' to arguments of type '<BIGINT> - <INTERVAL SECOND>'. Supported form(s): '<NUMERIC> - <NUMERIC>'
'<DATETIME_INTERVAL> - <DATETIME_INTERVAL>'
'<DATETIME> - <DATETIME_INTERVAL>'
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)
        at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4840)
        at org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:280)
        at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:96)
        at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:130)
        at org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:255)
        at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:668)
        at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:432)
        at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:518)
        at org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144)
        at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5599)
        at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5586)
        at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1690)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1675)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:946)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:929)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.lambda$createTableSchema$8(SqlToOperationConverter.java:509)
        at java.util.Optional.ifPresent(Optional.java:159)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:505)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
        at com.douyu.ocean.flink.streaming.core.Main.callCreateTable(Main.java:169)
        at com.douyu.ocean.flink.streaming.core.Main.callCommand(Main.java:149)
        at com.douyu.ocean.flink.streaming.core.Main.run(Main.java:108)
        at com.douyu.ocean.flink.streaming.core.Main.main(Main.java:47)
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '-' to arguments of type '<BIGINT> - <INTERVAL SECOND>'. Supported form(s): '<NUMERIC> - <NUMERIC>'
'<DATETIME_INTERVAL> - <DATETIME_INTERVAL>'
'<DATETIME> - <DATETIME_INTERVAL>'
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
        at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
        ... 29 more



[hidden email]

Re: SQL DDL: How to use a Long timestamp as event time

Benchao Li
You can add a computed column that calls a UDF to convert the Long into a TIMESTAMP, and define the watermark on that computed column.
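The UDF itself is not shown in the thread. Below is one possible sketch of the conversion logic it would need; the class name, method name, and the seconds-vs-milliseconds cutoff are assumptions, not the posters' actual code. In Flink 1.10 this logic would live in the `eval()` method of a class extending `org.apache.flink.table.functions.ScalarFunction`; it is shown here as a plain static method so the conversion itself is easy to verify.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

class EpochToTimestamp {
    // The thread says both second- and millisecond-valued timestamps appear
    // in the topic. Values below this cutoff (~ year 2286 if read as seconds)
    // are treated as epoch seconds; larger values as epoch milliseconds.
    private static final long MILLIS_CUTOFF = 10_000_000_000L;

    // In a Flink ScalarFunction this would be: public LocalDateTime eval(Long epoch)
    public static LocalDateTime toTimestamp(long epoch) {
        long millis = epoch < MILLIS_CUTOFF ? epoch * 1000L : epoch;
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC);
    }
}
```

Returning `LocalDateTime` (or `java.sql.Timestamp`) lets the planner infer a TIMESTAMP(3) result type, which is what the WATERMARK clause requires. The UTC offset is an assumption; use the zone your data is actually in.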

[hidden email] <[hidden email]> wrote on Thu, May 14, 2020 at 10:02 AM:

> Hi,all
>     kafka消息中存在消息的时间是Long类型,既有也有秒值,毫秒值,
> 我的问题是:
> 1、对于Long类型的时间数据,Flink SQL DDL现在怎样定义,才能使用WATERMARK
> 2、如果没有,后期是否有相应的规划
>
> 测试版本:Flink 1.10
>
>
> 在Flink 1.10中测试SQL如下:
> CREATE TABLE session_login (
>     deal_time TIMESTAMP(3)
>     ,aaVARCHAR
>     ,WATERMARK FOR deal_time AS deal_time - INTERVAL '60' SECOND
>  )WITH(
>     'connector.type' = 'kafka',
>     'connector.version' = 'universal',
>     'connector.topic' = 'topic',
>     'connector.startup-mode' = 'latest-offset',
>     'connector.properties.group.id' = 'group.id',
>     'connector.properties.zookeeper.connect' = 'ip:port',
>     'connector.properties.bootstrap.servers' = 'ip:port',
>     'format.type' = 'json',
>     'format.derive-schema' = 'true'
>  );
>
> 如果将deal_time定义为TIMESTAMP(3),JSON反序列化会提示如下异常:
> Caused by: java.io.IOException: Failed to deserialize JSON object.
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
> Caused by: java.time.format.DateTimeParseException: Text '1589420545'
> could not be parsed at index 0
> at
> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
> ... 7 more
>
> 如果将deal_time定义成BIGINT,SQL校验失败,提示异常如下:
> Exception in thread "main"
> org.apache.calcite.runtime.CalciteContextException: From line 4, column 33
> to line 4, column 64: Cannot apply '-' to arguments of type '<BIGINT> -
> <INTERVAL SECOND>'. Supported form(s): '<NUMERIC> - <NUMERIC>'
> '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>'
> '<DATETIME> - <DATETIME_INTERVAL>'
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>         at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>         at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>         at
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)
>         at
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4840)
>         at
> org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:280)
>         at
> org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:96)
>         at
> org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:130)
>         at
> org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:255)
>         at
> org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:668)
>         at
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:432)
>         at
> org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:518)
>         at
> org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5599)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5586)
>         at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1690)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1675)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:946)
>         at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:929)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.lambda$createTableSchema$8(SqlToOperationConverter.java:509)
>         at java.util.Optional.ifPresent(Optional.java:159)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:505)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
>         at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>         at
> com.douyu.ocean.flink.streaming.core.Main.callCreateTable(Main.java:169)
>         at
> com.douyu.ocean.flink.streaming.core.Main.callCommand(Main.java:149)
>         at com.douyu.ocean.flink.streaming.core.Main.run(Main.java:108)
>         at com.douyu.ocean.flink.streaming.core.Main.main(Main.java:47)
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot
> apply '-' to arguments of type '<BIGINT> - <INTERVAL SECOND>'. Supported
> form(s): '<NUMERIC> - <NUMERIC>'
> '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>'
> '<DATETIME> - <DATETIME_INTERVAL>'
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>         at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>         at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>         at
> org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
>         ... 29 more
>
>
>
> [hidden email]



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]

Re: Re: SQL DDL: How to use a Long timestamp as event time

云长
Many thanks, Benchao Li. Tested successfully with a UDF; example SQL:

CREATE TABLE session_login (
    deal_time BIGINT
    ,deal_time_obj AS DY_FROM_UNIXTIME(deal_time*1000)
    ,WATERMARK FOR deal_time_obj AS deal_time_obj - INTERVAL '60' SECOND
)WITH(
  ......
)

Here DY_FROM_UNIXTIME converts the Long value into a TIMESTAMP.
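For comparison, a UDF-free variant is sometimes possible on Flink 1.10's blink planner, assuming deal_time holds epoch seconds and the built-in FROM_UNIXTIME and TO_TIMESTAMP functions are available in your build (verify both against the docs for your version):

```sql
CREATE TABLE session_login (
    deal_time BIGINT,
    -- FROM_UNIXTIME renders epoch seconds as a 'yyyy-MM-dd HH:mm:ss' string
    -- (in the session time zone); TO_TIMESTAMP parses it back to TIMESTAMP(3)
    deal_time_obj AS TO_TIMESTAMP(FROM_UNIXTIME(deal_time)),
    WATERMARK FOR deal_time_obj AS deal_time_obj - INTERVAL '60' SECOND
) WITH (
  ......
);
```

Note this round-trips through a string and truncates to second precision, so the custom UDF remains the better fit when millisecond values occur in the same topic.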

oliver zhang,云长
[hidden email]
 
From: Benchao Li
Sent: 2020-05-14 10:23
To: user-zh
Subject: Re: SQL DDL: How to use a Long timestamp as event time

Re: SQL DDL: How to use a Long timestamp as event time

fanchuanpo-163




Re: How to use a Long-typed timestamp as event time in SQL DDL

Leonard Xu
In reply to this post by 云长
Hi,
This isn't supported yet; for now you need to write a simple UDF to do the conversion. There is a community issue [1] tracking this.


Best,
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-16889
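The UDF workaround Leonard mentions can be sketched as follows. The class name `LongToTimestamp` and the seconds/milliseconds threshold are illustrative, not from the thread; only the plain conversion helper is shown so the snippet compiles without Flink on the classpath. In a real Flink 1.10 job this logic would live in the `eval` method of a class extending `org.apache.flink.table.functions.ScalarFunction`.

```java
import java.sql.Timestamp;

// Sketch of the conversion logic for a hypothetical scalar UDF that turns a
// Long epoch value (seconds or milliseconds) into a SQL timestamp, handling
// the mixed units described in the original question.
public class LongToTimestamp {
    // Values below this bound are treated as seconds, values at or above it
    // as milliseconds: 1e12 ms is ~Sept 2001, while 1e12 s is ~year 33658,
    // so the heuristic is unambiguous for realistic event times.
    private static final long MILLIS_THRESHOLD = 1_000_000_000_000L;

    public static Timestamp eval(Long epoch) {
        if (epoch == null) {
            return null;
        }
        long millis = (epoch < MILLIS_THRESHOLD) ? epoch * 1000L : epoch;
        return new Timestamp(millis);
    }

    public static void main(String[] args) {
        // Both a second value and its millisecond form map to the same instant.
        System.out.println(eval(1589420545L).getTime());
        System.out.println(eval(1589420545000L).getTime());
    }
}
```

With the method wrapped in a `ScalarFunction` and registered via `tableEnv.registerFunction("long_to_ts", new LongToTimestamp())`, the DDL could then declare the raw field as `BIGINT` plus a computed column such as `deal_time AS long_to_ts(ts)` and put the `WATERMARK` on `deal_time` (function and column names here are assumptions for illustration).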


> On May 14, 2020 at 10:01, [hidden email] wrote:
>
> Hi all,
>     The event time in our Kafka messages is a Long; some values are in seconds and some in milliseconds.
> My questions:
> 1. For Long-typed time values, how should the Flink SQL DDL be defined so that a WATERMARK can be used?
> 2. If this isn't possible yet, is it planned?
>
> Version tested: Flink 1.10
>
>
> The SQL tested on Flink 1.10:
> CREATE TABLE session_login (
>     deal_time TIMESTAMP(3)
>     ,aa VARCHAR
>     ,WATERMARK FOR deal_time AS deal_time - INTERVAL '60' SECOND
>  ) WITH (
>     'connector.type' = 'kafka',
>     'connector.version' = 'universal',
>     'connector.topic' = 'topic',
>     'connector.startup-mode' = 'latest-offset',
>     'connector.properties.group.id' = 'group.id',
>     'connector.properties.zookeeper.connect' = 'ip:port',
>     'connector.properties.bootstrap.servers' = 'ip:port',
>     'format.type' = 'json',
>     'format.derive-schema' = 'true'
>  );
>
> If deal_time is defined as TIMESTAMP(3), JSON deserialization fails with the following exception:
> Caused by: java.io.IOException: Failed to deserialize JSON object.
> at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
> at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
> at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
> at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
> Caused by: java.time.format.DateTimeParseException: Text '1589420545' could not be parsed at index 0
> at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
> at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
> at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
> at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
> at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
> ... 7 more
>
> If deal_time is defined as BIGINT, SQL validation fails with the following exception:
> Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: From line 4, column 33 to line 4, column 64: Cannot apply '-' to arguments of type '<BIGINT> - <INTERVAL SECOND>'. Supported form(s): '<NUMERIC> - <NUMERIC>'
> '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>'
> '<DATETIME> - <DATETIME_INTERVAL>'
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4840)
> at org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:280)
> at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:96)
> at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:130)
> at org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:255)
> at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:668)
> at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:432)
> at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:518)
> at org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144)
> at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5599)
> at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5586)
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1690)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1675)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:946)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:929)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.lambda$createTableSchema$8(SqlToOperationConverter.java:509)
> at java.util.Optional.ifPresent(Optional.java:159)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:505)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> at com.douyu.ocean.flink.streaming.core.Main.callCreateTable(Main.java:169)
> at com.douyu.ocean.flink.streaming.core.Main.callCommand(Main.java:149)
> at com.douyu.ocean.flink.streaming.core.Main.run(Main.java:108)
> at com.douyu.ocean.flink.streaming.core.Main.main(Main.java:47)
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '-' to arguments of type '<BIGINT> - <INTERVAL SECOND>'. Supported form(s): '<NUMERIC> - <NUMERIC>'
> '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>'
> '<DATETIME> - <DATETIME_INTERVAL>'
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
> ... 29 more
>
>
>
> [hidden email]