刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:
这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。 此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢? stEnv.connect( new Kafka() .properties(TestKafkaUtils.getKafkaProperties()) .version("universal") .topic("test") .startFromLatest() ).withFormat(new Json() .failOnMissingField(false) ).withSchema( new Schema() .field("d", TypeInformation.of(Map.class)) ).inAppendMode().createTemporaryTable("t"); 其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', 'ANY<java.util.Map>'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢? root |-- d: LEGACY('RAW', 'ANY<java.util.Map>') 在 2020/8/11 下午4:23,“zhao liang”<[hidden email]> 写入: Hi,你图挂了,换个图床试试呢 发件人: Zhao,Yi(SEC) <[hidden email]> 日期: 星期二, 2020年8月11日 16:04 收件人: [hidden email] <[hidden email]> 主题: 关于FLinkSQL如何创建类json无限扩展的表结构问题 刚刚进一步发现一个方法可以做动态扩展的类型。代码如下: [cid:image001.png@01D66FF8.F697E2D0] 这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。 此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢? 其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', 'ANY<java.util.Map>'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢? root |-- d: LEGACY('RAW', 'ANY<java.util.Map>') |
Hi,
目前Json Format的实现就是假设json最外层是一个json object,暂时还无法做到顶层的所有字段无限扩展。 如果是在SQL里面,可以直接定义成map类型就可以,比如: ```SQL CREATE TABLE source ( d MAP<VARCHAR, VARCHAR> ) WITH (...) ``` Zhao,Yi(SEC) <[hidden email]> 于2020年8月11日周二 下午4:58写道: > 刚刚进一步发现一个方法可以做动态扩展的类型。代码如下: > > 这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。 > 此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢? > > stEnv.connect( > new Kafka() > .properties(TestKafkaUtils.getKafkaProperties()) > .version("universal") > .topic("test") > .startFromLatest() > ).withFormat(new Json() > .failOnMissingField(false) > ).withSchema( > new Schema() > .field("d", TypeInformation.of(Map.class)) > ).inAppendMode().createTemporaryTable("t"); > > 其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', > 'ANY<java.util.Map>'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢? > root > |-- d: LEGACY('RAW', 'ANY<java.util.Map>') > > > 在 2020/8/11 下午4:23,“zhao liang”<[hidden email]> 写入: > > Hi,你图挂了,换个图床试试呢 > > 发件人: Zhao,Yi(SEC) <[hidden email]> > 日期: 星期二, 2020年8月11日 16:04 > 收件人: [hidden email] <[hidden email]> > 主题: 关于FLinkSQL如何创建类json无限扩展的表结构问题 > 刚刚进一步发现一个方法可以做动态扩展的类型。代码如下: > [cid:image001.png@01D66FF8.F697E2D0] > 这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。 > 此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢? > > > 其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', > 'ANY<java.util.Map>'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢? > root > |-- d: LEGACY('RAW', 'ANY<java.util.Map>') > > > -- Best, Benchao Li |
LEGACY('RAW', 'ANY<java.util.Map>')对应sql中数据类型改为:MAP<STRING,STRING>,仍然报错,异常:
org.apache.flink.table.api.TableException: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type. 方便说下具体实现细节吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hi chuyuan,
可以详细描述下你遇到的问题么,比如下面这些信息 - 用的是哪个Flink版本 - SQL(包括DDL和query) - 数据是什么样子的 chuyuan <[hidden email]> 于2020年9月21日周一 下午2:40写道: > LEGACY('RAW', > 'ANY<java.util.Map>')对应sql中数据类型改为:MAP<STRING,STRING>,仍然报错,异常: > org.apache.flink.table.api.TableException: A raw type backed by type > information has no serializable string representation. It needs to be > resolved into a proper raw type. > 方便说下具体实现细节吗? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li |
我本地依赖的是Flink1.11.1的版本,大概业务是Flink消费kafka中json数据,示例:
{ "properties":{ "platformType":"APP", "$os":"iOS", "$screen_width":414, "$app_version":"1.0", "$is_first_day":false, "$model":"x86_64", "$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9", "imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3", "isLogin":false, "zrIdfa":"00000000-0000-0000-0000-000000000000", "$network_type":"WIFI", "$wifi":true, "$timezone_offset":-480, "$resume_from_background":false, "tdid":"", "zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9", "$screen_height":896, "$lib_version":"2.0.10", "$lib":"iOS", "$os_version":"13.4.1", "$manufacturer":"Apple", "$is_first_time":false, "$app_id":"Com.ziroom..ZRSensorsSDK" }, "type":"track", "lib":{ "$lib_version":"2.0.10", "$lib":"iOS", "$app_version":"1.0", "$lib_method":"autoTrack" } } 其中key为lib和properties的value是Json类型,其中字段可动态追加。 第一步,使用DataStream对象,接收kafka的Json,并实现MapFunction方法把Json串封装为DO,最后转换成DTO(为了把lib和properties的Json值转换为Map<String,String>类型), @Data public static class CustomBuriedPointDTO { /** * 跟踪ID */ private Long track_id; /** * 事件时间 */ private Long event_time; /** * 类型 */ private String type; /** * 排重后Id */ private String distinct_id; /** * 匿名ID */ private String anonymous_id; /** * 包信息 */ private @DataTypeHint("RAW") Map<String, String> lib; /** * 事件 */ private String event; /** * 属性 */ // private Map<String, String> properties; private @DataTypeHint("RAW") Map<String, String> properties; /** * 刷新时间 */ private Long flush_time; /** * 事件日期 */ private String dt; /** * 封装数据对象中字段信息 */ public void assembly(CustomBuriedPointDO pointDO) { // 复制DO属性到DTO BeanUtils.copyProperties(pointDO, this); /* 转换特殊字段 */ // 设置分区日期 Long eventTimeLong = pointDO.getEvent_time(); if (eventTimeLong == null) { eventTimeLong = System.currentTimeMillis(); } Date eventTime = new Date(eventTimeLong); DateFormat dateFormatDate = new SimpleDateFormat("yyyy-MM-dd"); this.setDt(dateFormatDate.format(eventTime)); // json字段转换为Map类型 Map<String, String> propertiesMap = null; if (StringUtils.isNotBlank(pointDO.getProperties())) { propertiesMap = (Map<String, String>) JSON.parse(pointDO.getProperties()); } this.setProperties(propertiesMap); Map<String, String> libMap = null; if (StringUtils.isNotBlank(pointDO.getLib())) { libMap = (Map<String, String>) JSON.parse(pointDO.getLib()); } this.setLib(libMap); } } 第二步,把DataStream转成了Hive临时表,最后写入Hive目标表,hive目标表定义如下: "CREATE TABLE test.test(" + " type STRING," + " lib MAP<STRING,STRING>," + " properties MAP<STRING,STRING>" + ") PARTITIONED BY (" + " dt string" + " ) stored as orcfile " + " TBLPROPERTIES" + " (" + "'partition.time-extractor.kind'='custom'," + "'partition.time-extractor.timestamp-pattern'='$dt'," + "'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor'," + "'sink.partition-commit.trigger'='partition-time'," + "'sink.partition-commit.delay'='0s'," + "'sink.partition-commit.policy.kind'='metastore'" + ")"); 第三步,把临时表的数据insert into到目标表,此时出现异常: org.apache.flink.table.api.TableException: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type. 然后打印临时表的数据结构,发现lib和properties在临时表中数据结构被解析为: |-- lib: LEGACY('RAW', 'ANY<java.util.Map>') |-- properties: LEGACY('RAW', 'ANY<java.util.Map>') |-- track_id: BIGINT |-- type: STRING ,这说明lib LEGACY('RAW', 'ANY<java.util.Map>')无法匹配hive目标表中lib MAP<STRING,STRING>数据结构,写入失败,大概流程是这样。 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by Benchao Li-2
我本地依赖的是Flink1.11.1的版本,大概业务是Flink消费kafka中json数据,示例:
{ "properties":{ "platformType":"APP", "$os":"iOS", "$screen_width":414, "$app_version":"1.0", "$is_first_day":false, "$model":"x86_64", "$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9", "imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3", "isLogin":false, "zrIdfa":"00000000-0000-0000-0000-000000000000", "$network_type":"WIFI", "$wifi":true, "$timezone_offset":-480, "$resume_from_background":false, "tdid":"", "zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9", "$screen_height":896, "$lib_version":"2.0.10", "$lib":"iOS", "$os_version":"13.4.1", "$manufacturer":"Apple", "$is_first_time":false, "$app_id":"Com.ziroom..ZRSensorsSDK" }, "type":"track", "lib":{ "$lib_version":"2.0.10", "$lib":"iOS", "$app_version":"1.0", "$lib_method":"autoTrack" } } 其中key为lib和properties的value是Json类型,其中字段可动态追加。 第一步,使用DataStream对象,接收kafka的Json,并实现MapFunction方法把Json串封装为DO,最后转换成DTO(为了把lib和properties的Json值转换为Map<String,String>类型), @Data public static class CustomBuriedPointDTO { /** * 跟踪ID */ private Long track_id; /** * 事件时间 */ private Long event_time; /** * 类型 */ private String type; /** * 排重后Id */ private String distinct_id; /** * 匿名ID */ private String anonymous_id; /** * 包信息 */ private @DataTypeHint("RAW") Map<String, String> lib; /** * 事件 */ private String event; /** * 属性 */ // private Map<String, String> properties; private @DataTypeHint("RAW") Map<String, String> properties; /** * 刷新时间 */ private Long flush_time; /** * 事件日期 */ private String dt; /** * 封装数据对象中字段信息 */ public void assembly(CustomBuriedPointDO pointDO) { // 复制DO属性到DTO BeanUtils.copyProperties(pointDO, this); /* 转换特殊字段 */ // 设置分区日期 Long eventTimeLong = pointDO.getEvent_time(); if (eventTimeLong == null) { eventTimeLong = System.currentTimeMillis(); } Date eventTime = new Date(eventTimeLong); DateFormat dateFormatDate = new SimpleDateFormat("yyyy-MM-dd"); this.setDt(dateFormatDate.format(eventTime)); // json字段转换为Map类型 Map<String, String> propertiesMap = null; if (StringUtils.isNotBlank(pointDO.getProperties())) { propertiesMap = (Map<String, String>) JSON.parse(pointDO.getProperties()); } this.setProperties(propertiesMap); Map<String, String> libMap = null; if (StringUtils.isNotBlank(pointDO.getLib())) { libMap = (Map<String, String>) JSON.parse(pointDO.getLib()); } this.setLib(libMap); } } 第二步,把DataStream转成了Hive临时表,最后写入Hive目标表,hive目标表定义如下: "CREATE TABLE test.test(" + " type STRING," + " lib MAP<STRING,STRING>," + " properties MAP<STRING,STRING>" + ") PARTITIONED BY (" + " dt string" + " ) stored as orcfile " + " TBLPROPERTIES" + " (" + "'partition.time-extractor.kind'='custom'," + "'partition.time-extractor.timestamp-pattern'='$dt'," + "'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor'," + "'sink.partition-commit.trigger'='partition-time'," + "'sink.partition-commit.delay'='0s'," + "'sink.partition-commit.policy.kind'='metastore'" + ")"); 第三步,把临时表的数据insert into到目标表,此时出现异常: org.apache.flink.table.api.TableException: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type. 然后打印临时表的数据结构,发现lib和properties在临时表中数据结构被解析为: |-- lib: LEGACY('RAW', 'ANY<java.util.Map>') |-- properties: LEGACY('RAW', 'ANY<java.util.Map>') |-- track_id: BIGINT |-- type: STRING ,这说明lib LEGACY('RAW', 'ANY<java.util.Map>')无法匹配hive目标表中lib MAP<STRING,STRING>数据结构,写入失败,大概流程是这样。 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by chuyuan
为什么要用DataStream解析之后再注册成table呢?
可以尝试下直接用DDL声明一个source,用内置的json format来解析。 chuyuan <[hidden email]> 于2020年9月21日周一 下午4:44写道: > 我本地依赖的是Flink1.11.1的版本,大概业务是Flink消费kafka中json数据,示例: > { > "properties":{ > "platformType":"APP", > "$os":"iOS", > "$screen_width":414, > "$app_version":"1.0", > "$is_first_day":false, > "$model":"x86_64", > "$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9", > "imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3", > "isLogin":false, > "zrIdfa":"00000000-0000-0000-0000-000000000000", > "$network_type":"WIFI", > "$wifi":true, > "$timezone_offset":-480, > "$resume_from_background":false, > "tdid":"", > "zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9", > "$screen_height":896, > "$lib_version":"2.0.10", > "$lib":"iOS", > "$os_version":"13.4.1", > "$manufacturer":"Apple", > "$is_first_time":false, > "$app_id":"Com.ziroom..ZRSensorsSDK" > }, > "type":"track", > "lib":{ > "$lib_version":"2.0.10", > "$lib":"iOS", > "$app_version":"1.0", > "$lib_method":"autoTrack" > } > } > 其中key为lib和properties的value是Json类型,其中字段可动态追加。 > > > 第一步,使用DataStream对象,接收kafka的Json,并实现MapFunction方法把Json串封装为DO,最后转换成DTO(为了把lib和properties的Json值转换为Map<String,String>类型), > @Data > public static class CustomBuriedPointDTO { > /** > * 跟踪ID > */ > private Long track_id; > /** > * 事件时间 > */ > private Long event_time; > > /** > * 类型 > */ > private String type; > /** > * 排重后Id > */ > private String distinct_id; > /** > * 匿名ID > */ > private String anonymous_id; > /** > * 包信息 > */ > private @DataTypeHint("RAW") Map<String, String> lib; > /** > * 事件 > */ > private String event; > /** > * 属性 > */ > // private Map<String, String> properties; > private @DataTypeHint("RAW") Map<String, String> > properties; > /** > * 刷新时间 > */ > private Long flush_time; > /** > * 事件日期 > */ > private String dt; > > > /** > * 封装数据对象中字段信息 > */ > public void assembly(CustomBuriedPointDO pointDO) { > // 复制DO属性到DTO > BeanUtils.copyProperties(pointDO, this); > > /* > 转换特殊字段 > */ > // 设置分区日期 > Long eventTimeLong = pointDO.getEvent_time(); > if (eventTimeLong == null) { > eventTimeLong = System.currentTimeMillis(); > } > Date eventTime = new Date(eventTimeLong); > DateFormat dateFormatDate = new > SimpleDateFormat("yyyy-MM-dd"); > this.setDt(dateFormatDate.format(eventTime)); > > // json字段转换为Map类型 > Map<String, String> propertiesMap = null; > if > (StringUtils.isNotBlank(pointDO.getProperties())) > { > propertiesMap = (Map<String, String>) > JSON.parse(pointDO.getProperties()); > } > this.setProperties(propertiesMap); > Map<String, String> libMap = null; > if (StringUtils.isNotBlank(pointDO.getLib())) { > libMap = (Map<String, String>) > JSON.parse(pointDO.getLib()); > } > this.setLib(libMap); > } > } > > 第二步,把DataStream转成了Hive临时表,最后写入Hive目标表,hive目标表定义如下: > "CREATE TABLE test.test(" + > " type STRING," + > " lib MAP<STRING,STRING>," > + > " properties > MAP<STRING,STRING>" + > ") PARTITIONED BY (" + > " dt string" + > " ) stored as orcfile " + > " TBLPROPERTIES" + > " (" + > > "'partition.time-extractor.kind'='custom'," + > > "'partition.time-extractor.timestamp-pattern'='$dt'," + > > > "'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor'," > + > > "'sink.partition-commit.trigger'='partition-time'," + > > "'sink.partition-commit.delay'='0s'," + > > "'sink.partition-commit.policy.kind'='metastore'" + > ")"); > > > 第三步,把临时表的数据insert into到目标表,此时出现异常: > org.apache.flink.table.api.TableException: A raw type backed by type > information has no serializable string representation. It needs to be > resolved into a proper raw type. > > 然后打印临时表的数据结构,发现lib和properties在临时表中数据结构被解析为: > |-- lib: LEGACY('RAW', 'ANY<java.util.Map>') > |-- properties: LEGACY('RAW', 'ANY<java.util.Map>') > |-- track_id: BIGINT > |-- type: STRING > ,这说明lib LEGACY('RAW', 'ANY<java.util.Map>')无法匹配hive目标表中lib > MAP<STRING,STRING>数据结构,写入失败,大概流程是这样。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li |
好勒,谢谢,我试试这种方案,之前注册成table,是为了按条件过滤数据;麻烦问下,直接使用ddl,如何过滤kafka中的数据?
-- Sent from: http://apache-flink.147419.n8.nabble.com/ |
可以通过SQL的where条件来过滤吧
chuyuan <[hidden email]> 于2020年9月21日周一 下午6:48写道: > 好勒,谢谢,我试试这种方案,之前注册成table,是为了按条件过滤数据;麻烦问下,直接使用ddl,如何过滤kafka中的数据? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best, Benchao Li |
Free forum by Nabble | Edit this page |