*我的代码如下*
其中 updateTime 字段,是时间戳,用的BigInt。如果只查询其他String类型的字段,程序正常。但是加上这个字段,就会报错。 package com.athub.dcpoints.scala.connector.table.hive import com.athub.dcpoints.java.model.KafkaDcpoints import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableEnvironment} import org.apache.flink.table.catalog.hive.HiveCatalog import org.apache.flink.table.descriptors._ /** * @author Nash Cen * @date 2020/9/22 10:33 * @version 1.0 */ object KafkaFlinkHiveScalaApp { def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME","hdfs") // 1. 获取执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build() val tableEnv: TableEnvironment = StreamTableEnvironment.create(env,settings) // 2. 注册 KafkaSource 表 tableEnv.connect(new Kafka() .version("universal") .topic("ods_dcpoints_dev") .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") //.property("group.id", "testGroup") .startFromEarliest() ) .withFormat(new Json()) .withSchema(new Schema() .field("assetSpecId", DataTypes.STRING()) .field("dcnum", DataTypes.STRING()) .field("monitorType", DataTypes.STRING()) .field("tagNo", DataTypes.STRING()) .field("updateTime", DataTypes.BIGINT()) .field("value", DataTypes.STRING()) ) .createTemporaryTable("kafka_source_table") // 3. 查询转换 // 使用 sqlQuery val selectKafkaTable: Table = tableEnv.sqlQuery(s"select assetSpecId,dcnum,monitorType,tagNo,updateTime from kafka_source_table") selectKafkaTable.toAppendStream[(String,String,String,String,BigInt)].print("selectKafka") env.execute("KafkaFlinkHiveScalaApp") } } *运行时报错信息如下:* [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class scala.math.BigInt does not contain a setter for field bigInteger [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class scala.math.BigInt cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. Exception in thread "main" org.apache.flink.table.api.TableException: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type. at org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101) at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95) at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hi,
试一下java的BigInteger呢 ________________________________ 发件人: nashcen <[hidden email]> 发送时间: 2020年9月22日 16:29:41 收件人: [hidden email] 主题: Flink-1.11.1 Kafka Table API BigInt 问题 *我的代码如下* 其中 updateTime 字段,是时间戳,用的BigInt。如果只查询其他String类型的字段,程序正常。但是加上这个字段,就会报错。 package com.athub.dcpoints.scala.connector.table.hive import com.athub.dcpoints.java.model.KafkaDcpoints import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableEnvironment} import org.apache.flink.table.catalog.hive.HiveCatalog import org.apache.flink.table.descriptors._ /** * @author Nash Cen * @date 2020/9/22 10:33 * @version 1.0 */ object KafkaFlinkHiveScalaApp { def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME","hdfs") // 1. 获取执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build() val tableEnv: TableEnvironment = StreamTableEnvironment.create(env,settings) // 2. 注册 KafkaSource 表 tableEnv.connect(new Kafka() .version("universal") .topic("ods_dcpoints_dev") .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") //.property("group.id", "testGroup") .startFromEarliest() ) .withFormat(new Json()) .withSchema(new Schema() .field("assetSpecId", DataTypes.STRING()) .field("dcnum", DataTypes.STRING()) .field("monitorType", DataTypes.STRING()) .field("tagNo", DataTypes.STRING()) .field("updateTime", DataTypes.BIGINT()) .field("value", DataTypes.STRING()) ) .createTemporaryTable("kafka_source_table") // 3. 查询转换 // 使用 sqlQuery val selectKafkaTable: Table = tableEnv.sqlQuery(s"select assetSpecId,dcnum,monitorType,tagNo,updateTime from kafka_source_table") selectKafkaTable.toAppendStream[(String,String,String,String,BigInt)].print("selectKafka") env.execute("KafkaFlinkHiveScalaApp") } } *运行时报错信息如下:* [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class scala.math.BigInt does not contain a setter for field bigInteger [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class scala.math.BigInt cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. Exception in thread "main" org.apache.flink.table.api.TableException: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type. at org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101) at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95) at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |