hello,大神们,Flink消费kafka中json数据,示例:
{ "properties":{ "platformType":"APP", "$os":"iOS", "$screen_width":414, "$app_version":"1.0", "$is_first_day":false, "$model":"x86_64", "$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9", "imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3", "isLogin":false, "zrIdfa":"00000000-0000-0000-0000-000000000000", "$network_type":"WIFI", "$wifi":true, "$timezone_offset":-480, "$resume_from_background":false, "tdid":"", "zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9", "$screen_height":896, "$lib_version":"2.0.10", "$lib":"iOS", "$os_version":"13.4.1", "$manufacturer":"Apple", "$is_first_time":false, "$app_id":"Com.ziroom..ZRSensorsSDK" }, "type":"track", "lib":{ "$lib_version":"2.0.10", "$lib":"iOS", "$app_version":"1.0", "$lib_method":"autoTrack" } } 其中key为lib和properties的value是Json类型,其中字段可动态追加, 然后我把Json传封装为DO,最后为了转换lib和properties的value为Map<String,String>,转换成了DTO, @Data public static class CustomBuriedPointDTO { /** * 跟踪ID */ private Long track_id; /** * 事件时间 */ private Long event_time; /** * 类型 */ private String type; /** * 排重后Id */ private String distinct_id; /** * 匿名ID */ private String anonymous_id; /** * 包信息 */ private @DataTypeHint("RAW") Map<String, String> lib; /** * 事件 */ private String event; /** * 属性 */ // private Map<String, String> properties; private @DataTypeHint("RAW") Map<String, String> properties; /** * 刷新时间 */ private Long flush_time; /** * 事件日期 */ private String dt; /** * 封装数据对象中字段信息 */ public void assembly(CustomBuriedPointDO pointDO) { // 复制DO属性到DTO BeanUtils.copyProperties(pointDO, this); /* 转换特殊字段 */ // 设置分区日期 Long eventTimeLong = pointDO.getEvent_time(); if (eventTimeLong == null) { eventTimeLong = System.currentTimeMillis(); } Date eventTime = new Date(eventTimeLong); DateFormat dateFormatDate = new SimpleDateFormat("yyyy-MM-dd"); this.setDt(dateFormatDate.format(eventTime)); // json字段转换为Map类型 Map<String, String> propertiesMap = null; if (StringUtils.isNotBlank(pointDO.getProperties())) { propertiesMap = (Map<String, String>) 
JSON.parse(pointDO.getProperties()); } this.setProperties(propertiesMap); Map<String, String> libMap = null; if (StringUtils.isNotBlank(pointDO.getLib())) { libMap = (Map<String, String>) JSON.parse(pointDO.getLib()); } this.setLib(libMap); } } ,然后把DataStream转成了Hive临时表,最后写入Hive表,hive表定义如下: "CREATE TABLE test.test(" + " type STRING," + " lib MAP<STRING,STRING>," + " properties MAP<STRING,STRING>" + ") PARTITIONED BY (" + " dt string" + " ) stored as orcfile " + " TBLPROPERTIES" + " (" + "'partition.time-extractor.kind'='custom'," + "'partition.time-extractor.timestamp-pattern'='$dt'," + "'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor'," + "'sink.partition-commit.trigger'='partition-time'," + "'sink.partition-commit.delay'='0s'," + "'sink.partition-commit.policy.kind'='metastore'" + ")"); 结果异常: org.apache.flink.table.api.TableException: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type. ,然后打印临时表的数据结构,发现lib和properties在临时表中数据结构为: |-- lib: LEGACY('RAW', 'ANY<java.util.Map>') |-- properties: LEGACY('RAW', 'ANY<java.util.Map>') |-- track_id: BIGINT |-- type: STRING ,说明lib LEGACY('RAW', 'ANY<java.util.Map>')无法匹配lib MAP<STRING,STRING>,写入失败,有遇到类似问题的吗,求解答。 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |