Flink消费kafka中json数据,其中有个value是Json类型,写入Hive表Map结构异常

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Flink消费kafka中json数据,其中有个value是Json类型,写入Hive表Map结构异常

chuyuan
hello,大婶们,Flink消费kafka中json数据,示例:
{
        "properties":{
            "platformType":"APP",
            "$os":"iOS",
            "$screen_width":414,
            "$app_version":"1.0",
            "$is_first_day":false,
            "$model":"x86_64",
            "$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
            "imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3",
            "isLogin":false,
            "zrIdfa":"00000000-0000-0000-0000-000000000000",
            "$network_type":"WIFI",
            "$wifi":true,
            "$timezone_offset":-480,
            "$resume_from_background":false,
            "tdid":"",
            "zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
            "$screen_height":896,
            "$lib_version":"2.0.10",
            "$lib":"iOS",
            "$os_version":"13.4.1",
            "$manufacturer":"Apple",
            "$is_first_time":false,
            "$app_id":"Com.ziroom..ZRSensorsSDK"
        },
        "type":"track",
        "lib":{
            "$lib_version":"2.0.10",
            "$lib":"iOS",
            "$app_version":"1.0",
            "$lib_method":"autoTrack"
        }
    }
其中key为lib和properties的value是Json类型,其中字段可动态追加,
然后我把Json传封装为DO,最后为了转换lib和properties的value为Map<String,String>,转换成了DTO,
        @Data
        public static class CustomBuriedPointDTO {
                /**
                 * 跟踪ID
                 */
                private Long track_id;
                /**
                 * 事件时间
                 */
                private Long event_time;

                /**
                 * 类型
                 */
                private String type;
                /**
                 * 排重后Id
                 */
                private String distinct_id;
                /**
                 * 匿名ID
                 */
                private String anonymous_id;
                /**
                 * 包信息
                 */
                private @DataTypeHint("RAW") Map<String, String> lib;
                /**
                 * 事件
                 */
                private String event;
                /**
                 * 属性
                 */
// private Map<String, String> properties;
                private @DataTypeHint("RAW") Map<String, String> properties;
                /**
                 * 刷新时间
                 */
                private Long flush_time;
                /**
                 * 事件日期
                 */
                private String dt;
       

                /**
                 * 封装数据对象中字段信息
                 */
                public void assembly(CustomBuriedPointDO pointDO) {
                        // 复制DO属性到DTO
                        BeanUtils.copyProperties(pointDO, this);

                        /*
                                转换特殊字段
                         */
                        // 设置分区日期
                        Long eventTimeLong = pointDO.getEvent_time();
                        if (eventTimeLong == null) {
                                eventTimeLong = System.currentTimeMillis();
                        }
                        Date eventTime = new Date(eventTimeLong);
                        DateFormat dateFormatDate = new SimpleDateFormat("yyyy-MM-dd");
                        this.setDt(dateFormatDate.format(eventTime));

                        // json字段转换为Map类型
                        Map<String, String> propertiesMap = null;
                        if (StringUtils.isNotBlank(pointDO.getProperties())) {
                                propertiesMap = (Map<String, String>)
JSON.parse(pointDO.getProperties());
                        }
                        this.setProperties(propertiesMap);
                        Map<String, String> libMap = null;
                        if (StringUtils.isNotBlank(pointDO.getLib())) {
                                libMap = (Map<String, String>) JSON.parse(pointDO.getLib());
                        }
                        this.setLib(libMap);
                }
        }
,然后把DataStream转成了Hive临时表,最后写入Hive表,hive表定义如下:
"CREATE TABLE test.test(" +
                                                "  type STRING," +
                                                "  lib MAP<STRING,STRING>," +
                                                "  properties MAP<STRING,STRING>" +
                                                ") PARTITIONED BY (" +
                                                "  dt string" +
                                                " ) stored as orcfile " +
                                                " TBLPROPERTIES" +
                                                " (" +
                                                "'partition.time-extractor.kind'='custom'," +
                                                "'partition.time-extractor.timestamp-pattern'='$dt'," +
                                       
"'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor',"
+
                                                "'sink.partition-commit.trigger'='partition-time'," +
                                                "'sink.partition-commit.delay'='0s'," +
                                                "'sink.partition-commit.policy.kind'='metastore'" +
                                                ")");

结果异常:
org.apache.flink.table.api.TableException: A raw type backed by type
information has no serializable string representation. It needs to be
resolved into a proper raw type.
,然后打印临时表的数据结构,发现lib和properties在临时表中数据结构为:
 |-- lib: LEGACY('RAW', 'ANY<java.util.Map>')
 |-- properties: LEGACY('RAW', 'ANY<java.util.Map>')
 |-- track_id: BIGINT
 |-- type: STRING
,说明lib LEGACY('RAW', 'ANY<java.util.Map>')无法匹配lib
MAP<STRING,STRING>,写入失败,有遇到类似问题的吗,求解答。



--
Sent from: http://apache-flink.147419.n8.nabble.com/