Hi,all
kafka消息中存在消息的时间是Long类型,既有也有秒值,毫秒值, 我的问题是: 1、对于Long类型的时间数据,Flink SQL DDL现在怎样定义,才能使用WATERMARK 2、如果没有,后期是否有相应的规划 测试版本:Flink 1.10 在Flink 1.10中测试SQL如下: CREATE TABLE session_login ( deal_time TIMESTAMP(3) ,aaVARCHAR ,WATERMARK FOR deal_time AS deal_time - INTERVAL '60' SECOND )WITH( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'topic', 'connector.startup-mode' = 'latest-offset', 'connector.properties.group.id' = 'group.id', 'connector.properties.zookeeper.connect' = 'ip:port', 'connector.properties.bootstrap.servers' = 'ip:port', 'format.type' = 'json', 'format.derive-schema' = 'true' ); 如果将deal_time定义为TIMESTAMP(3),JSON反序列化会提示如下异常: Caused by: java.io.IOException: Failed to deserialize JSON object. at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133) at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76) at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196) Caused by: java.time.format.DateTimeParseException: Text '1589420545' could not be parsed at index 0 at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949) at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777) at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366) at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236) at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439) at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418) at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236) at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131) ... 7 more 如果将deal_time定义成BIGINT,SQL校验失败,提示异常如下: Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: From line 4, column 33 to line 4, column 64: Cannot apply '-' to arguments of type '<BIGINT> - <INTERVAL SECOND>'. Supported form(s): '<NUMERIC> - <NUMERIC>' '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' '<DATETIME> - <DATETIME_INTERVAL>' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4840) at org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:280) at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:96) at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:130) at org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:255) at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:668) at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:432) at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:518) at org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5599) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5586) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1690) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1675) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:946) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:929) at org.apache.flink.table.planner.operations.SqlToOperationConverter.lambda$createTableSchema$8(SqlToOperationConverter.java:509) at java.util.Optional.ifPresent(Optional.java:159) at org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:505) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) at com.douyu.ocean.flink.streaming.core.Main.callCreateTable(Main.java:169) at com.douyu.ocean.flink.streaming.core.Main.callCommand(Main.java:149) at com.douyu.ocean.flink.streaming.core.Main.run(Main.java:108) at com.douyu.ocean.flink.streaming.core.Main.main(Main.java:47) Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '-' to arguments of type '<BIGINT> - <INTERVAL SECOND>'. Supported form(s): '<NUMERIC> - <NUMERIC>' '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' '<DATETIME> - <DATETIME_INTERVAL>' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) ... 29 more [hidden email] |
你可以通过计算列写一个udf把long转成timestamp类型,在这个计算列上定义watermark就可以了。
[hidden email] <[hidden email]> 于2020年5月14日周四 上午10:02写道: > Hi,all > kafka消息中存在消息的时间是Long类型,既有也有秒值,毫秒值, > 我的问题是: > 1、对于Long类型的时间数据,Flink SQL DDL现在怎样定义,才能使用WATERMARK > 2、如果没有,后期是否有相应的规划 > > 测试版本:Flink 1.10 > > > 在Flink 1.10中测试SQL如下: > CREATE TABLE session_login ( > deal_time TIMESTAMP(3) > ,aaVARCHAR > ,WATERMARK FOR deal_time AS deal_time - INTERVAL '60' SECOND > )WITH( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'topic', > 'connector.startup-mode' = 'latest-offset', > 'connector.properties.group.id' = 'group.id', > 'connector.properties.zookeeper.connect' = 'ip:port', > 'connector.properties.bootstrap.servers' = 'ip:port', > 'format.type' = 'json', > 'format.derive-schema' = 'true' > ); > > 如果将deal_time定义为TIMESTAMP(3),JSON反序列化会提示如下异常: > Caused by: java.io.IOException: Failed to deserialize JSON object. > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76) > at > org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196) > Caused by: java.time.format.DateTimeParseException: Text '1589420545' > could not be parsed at index 0 > at > java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949) > at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131) > ... 7 more > > 如果将deal_time定义成BIGINT,SQL校验失败,提示异常如下: > Exception in thread "main" > org.apache.calcite.runtime.CalciteContextException: From line 4, column 33 > to line 4, column 64: Cannot apply '-' to arguments of type '<BIGINT> - > <INTERVAL SECOND>'. Supported form(s): '<NUMERIC> - <NUMERIC>' > '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' > '<DATETIME> - <DATETIME_INTERVAL>' > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) > at > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4840) > at > org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:280) > at > org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:96) > at > org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:130) > at > org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:255) > at > org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:668) > at > org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:432) > at > org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:518) > at > org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5599) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5586) > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1690) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1675) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:946) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:929) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.lambda$createTableSchema$8(SqlToOperationConverter.java:509) > at java.util.Optional.ifPresent(Optional.java:159) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:505) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > at > com.douyu.ocean.flink.streaming.core.Main.callCreateTable(Main.java:169) > at > com.douyu.ocean.flink.streaming.core.Main.callCommand(Main.java:149) > at com.douyu.ocean.flink.streaming.core.Main.run(Main.java:108) > at com.douyu.ocean.flink.streaming.core.Main.main(Main.java:47) > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot > apply '-' to arguments of type '<BIGINT> - <INTERVAL SECOND>'. Supported > form(s): '<NUMERIC> - <NUMERIC>' > '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' > '<DATETIME> - <DATETIME_INTERVAL>' > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at > org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) > ... 29 more > > > > [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
非常感谢Benchao Li,使用UDF测试通过,SQL示例如下:
CREATE TABLE session_login ( ,deal_time BIGINT ,deal_time_obj as DY_FROM_UNIXTIME(deal_time*1000) ,WATERMARK FOR deal_time_obj AS deal_time_obj - INTERVAL '60' SECOND )WITH( ...... ) 其中DY_FROM_UNIXTIME负责将long转成timestamp类型 oliver zhang,云长 [hidden email] 发件人: Benchao Li 发送时间: 2020-05-14 10:23 收件人: user-zh 主题: Re: SQL DDL怎样使用Long类型的时间戳作为事件时间 你可以通过计算列写一个udf把long转成timestamp类型,在这个计算列上定义watermark就可以了。 [hidden email] <[hidden email]> 于2020年5月14日周四 上午10:02写道: > Hi,all > kafka消息中存在消息的时间是Long类型,既有也有秒值,毫秒值, > 我的问题是: > 1、对于Long类型的时间数据,Flink SQL DDL现在怎样定义,才能使用WATERMARK > 2、如果没有,后期是否有相应的规划 > > 测试版本:Flink 1.10 > > > 在Flink 1.10中测试SQL如下: > CREATE TABLE session_login ( > deal_time TIMESTAMP(3) > ,aaVARCHAR > ,WATERMARK FOR deal_time AS deal_time - INTERVAL '60' SECOND > )WITH( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'topic', > 'connector.startup-mode' = 'latest-offset', > 'connector.properties.group.id' = 'group.id', > 'connector.properties.zookeeper.connect' = 'ip:port', > 'connector.properties.bootstrap.servers' = 'ip:port', > 'format.type' = 'json', > 'format.derive-schema' = 'true' > ); > > 如果将deal_time定义为TIMESTAMP(3),JSON反序列化会提示如下异常: > Caused by: java.io.IOException: Failed to deserialize JSON object. > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76) > at > org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196) > Caused by: java.time.format.DateTimeParseException: Text '1589420545' > could not be parsed at index 0 > at > java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949) > at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236) > at > org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131) > ... 7 more > > 如果将deal_time定义成BIGINT,SQL校验失败,提示异常如下: > Exception in thread "main" > org.apache.calcite.runtime.CalciteContextException: From line 4, column 33 > to line 4, column 64: Cannot apply '-' to arguments of type '<BIGINT> - > <INTERVAL SECOND>'. Supported form(s): '<NUMERIC> - <NUMERIC>' > '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' > '<DATETIME> - <DATETIME_INTERVAL>' > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) > at > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4840) > at > org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:280) > at > org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:96) > at > org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:130) > at > org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:255) > at > org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:668) > at > org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:432) > at > org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:518) > at > org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5599) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5586) > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1690) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1675) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:946) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:929) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.lambda$createTableSchema$8(SqlToOperationConverter.java:509) > at java.util.Optional.ifPresent(Optional.java:159) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:505) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > at > com.douyu.ocean.flink.streaming.core.Main.callCreateTable(Main.java:169) > at > com.douyu.ocean.flink.streaming.core.Main.callCommand(Main.java:149) > at com.douyu.ocean.flink.streaming.core.Main.run(Main.java:108) > at com.douyu.ocean.flink.streaming.core.Main.main(Main.java:47) > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot > apply '-' to arguments of type '<BIGINT> - <INTERVAL SECOND>'. Supported > form(s): '<NUMERIC> - <NUMERIC>' > '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' > '<DATETIME> - <DATETIME_INTERVAL>' > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at > org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) > ... 29 more > > > > [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
> 在 2020年5月14日,上午11:12,[hidden email] 写道: > > 非常感谢Benchao Li,使用UDF测试通过,SQL示例如下: > > CREATE TABLE session_login ( > ,deal_time BIGINT > ,deal_time_obj as DY_FROM_UNIXTIME(deal_time*1000) > ,WATERMARK FOR deal_time_obj AS deal_time_obj - INTERVAL '60' SECOND > )WITH( > ...... > ) > > 其中DY_FROM_UNIXTIME负责将long转成timestamp类型 > > oliver zhang,云长 > [hidden email] > > 发件人: Benchao Li > 发送时间: 2020-05-14 10:23 > 收件人: user-zh > 主题: Re: SQL DDL怎样使用Long类型的时间戳作为事件时间 > 你可以通过计算列写一个udf把long转成timestamp类型,在这个计算列上定义watermark就可以了。 > > [hidden email] <[hidden email]> 于2020年5月14日周四 上午10:02写道: > >> Hi,all >> kafka消息中存在消息的时间是Long类型,既有也有秒值,毫秒值, >> 我的问题是: >> 1、对于Long类型的时间数据,Flink SQL DDL现在怎样定义,才能使用WATERMARK >> 2、如果没有,后期是否有相应的规划 >> >> 测试版本:Flink 1.10 >> >> >> 在Flink 1.10中测试SQL如下: >> CREATE TABLE session_login ( >> deal_time TIMESTAMP(3) >> ,aaVARCHAR >> ,WATERMARK FOR deal_time AS deal_time - INTERVAL '60' SECOND >> )WITH( >> 'connector.type' = 'kafka', >> 'connector.version' = 'universal', >> 'connector.topic' = 'topic', >> 'connector.startup-mode' = 'latest-offset', >> 'connector.properties.group.id' = 'group.id', >> 'connector.properties.zookeeper.connect' = 'ip:port', >> 'connector.properties.bootstrap.servers' = 'ip:port', >> 'format.type' = 'json', >> 'format.derive-schema' = 'true' >> ); >> >> 如果将deal_time定义为TIMESTAMP(3),JSON反序列化会提示如下异常: >> Caused by: java.io.IOException: Failed to deserialize JSON object. >> at >> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133) >> at >> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76) >> at >> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45) >> at >> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140) >> at >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715) >> at >> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) >> at >> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) >> at >> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196) >> Caused by: java.time.format.DateTimeParseException: Text '1589420545' >> could not be parsed at index 0 >> at >> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949) >> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777) >> at >> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366) >> at >> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236) >> at >> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439) >> at >> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418) >> at >> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236) >> at >> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131) >> ... 7 more >> >> 如果将deal_time定义成BIGINT,SQL校验失败,提示异常如下: >> Exception in thread "main" >> org.apache.calcite.runtime.CalciteContextException: From line 4, column 33 >> to line 4, column 64: Cannot apply '-' to arguments of type '<BIGINT> - >> <INTERVAL SECOND>'. Supported form(s): '<NUMERIC> - <NUMERIC>' >> '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' >> '<DATETIME> - <DATETIME_INTERVAL>' >> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native >> Method) >> at >> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) >> at >> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >> at java.lang.reflect.Constructor.newInstance(Constructor.java:423) >> at >> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) >> at >> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) >> at >> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4840) >> at >> org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:280) >> at >> org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:96) >> at >> org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:130) >> at >> org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:255) >> at >> org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:668) >> at >> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:432) >> at >> org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:518) >> at >> org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5599) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5586) >> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1690) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1675) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:946) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:929) >> at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.lambda$createTableSchema$8(SqlToOperationConverter.java:509) >> at java.util.Optional.ifPresent(Optional.java:159) >> at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:505) >> at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177) >> at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130) >> at >> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) >> at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) >> at >> com.douyu.ocean.flink.streaming.core.Main.callCreateTable(Main.java:169) >> at >> com.douyu.ocean.flink.streaming.core.Main.callCommand(Main.java:149) >> at com.douyu.ocean.flink.streaming.core.Main.run(Main.java:108) >> at com.douyu.ocean.flink.streaming.core.Main.main(Main.java:47) >> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot >> apply '-' to arguments of type '<BIGINT> - <INTERVAL SECOND>'. Supported >> form(s): '<NUMERIC> - <NUMERIC>' >> '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' >> '<DATETIME> - <DATETIME_INTERVAL>' >> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native >> Method) >> at >> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) >> at >> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >> at java.lang.reflect.Constructor.newInstance(Constructor.java:423) >> at >> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) >> at >> org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) >> ... 29 more >> >> >> >> [hidden email] > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] |
In reply to this post by 云长
Hi
目前还不支持,需要自己写个简单的udf转换下, 社区有个issue[1]在跟这个问题了 Best, Leonard Xu [1]https://issues.apache.org/jira/browse/FLINK-16889 <https://issues.apache.org/jira/browse/FLINK-16889> > 在 2020年5月14日,10:01,[hidden email] 写道: > > Hi,all > kafka消息中存在消息的时间是Long类型,既有也有秒值,毫秒值, > 我的问题是: > 1、对于Long类型的时间数据,Flink SQL DDL现在怎样定义,才能使用WATERMARK > 2、如果没有,后期是否有相应的规划 > > 测试版本:Flink 1.10 > > > 在Flink 1.10中测试SQL如下: > CREATE TABLE session_login ( > deal_time TIMESTAMP(3) > ,aaVARCHAR > ,WATERMARK FOR deal_time AS deal_time - INTERVAL '60' SECOND > )WITH( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'topic', > 'connector.startup-mode' = 'latest-offset', > 'connector.properties.group.id' = 'group.id', > 'connector.properties.zookeeper.connect' = 'ip:port', > 'connector.properties.bootstrap.servers' = 'ip:port', > 'format.type' = 'json', > 'format.derive-schema' = 'true' > ); > > 如果将deal_time定义为TIMESTAMP(3),JSON反序列化会提示如下异常: > Caused by: java.io.IOException: Failed to deserialize JSON object. > at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133) > at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76) > at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45) > at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140) > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715) > at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196) > Caused by: java.time.format.DateTimeParseException: Text '1589420545' could not be parsed at index 0 > at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949) > at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777) > at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366) > at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236) > at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439) > at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418) > at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236) > at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131) > ... 7 more > > 如果将deal_time定义成BIGINT,SQL校验失败,提示异常如下: > Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: From line 4, column 33 to line 4, column 64: Cannot apply '-' to arguments of type '<BIGINT> - <INTERVAL SECOND>'. Supported form(s): '<NUMERIC> - <NUMERIC>' > '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' > '<DATETIME> - <DATETIME_INTERVAL>' > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) > at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4840) > at org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:280) > at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:96) > at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:130) > at org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:255) > at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:668) > at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:432) > at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:518) > at org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144) > at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5599) > at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5586) > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) > at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1690) > at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1675) > at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:946) > at org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:929) > at org.apache.flink.table.planner.operations.SqlToOperationConverter.lambda$createTableSchema$8(SqlToOperationConverter.java:509) > at java.util.Optional.ifPresent(Optional.java:159) > at org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:505) > at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177) > at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130) > at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) > at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > at com.douyu.ocean.flink.streaming.core.Main.callCreateTable(Main.java:169) > at com.douyu.ocean.flink.streaming.core.Main.callCommand(Main.java:149) > at com.douyu.ocean.flink.streaming.core.Main.run(Main.java:108) > at com.douyu.ocean.flink.streaming.core.Main.main(Main.java:47) > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '-' to arguments of type '<BIGINT> - <INTERVAL SECOND>'. Supported form(s): '<NUMERIC> - <NUMERIC>' > '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' > '<DATETIME> - <DATETIME_INTERVAL>' > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) > ... 29 more > > > > [hidden email] |
Free forum by Nabble | Edit this page |