Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

classic Classic list List threaded Threaded
10 messages Options
Reply | Threaded
Open this post in threaded view
|

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

Zhao,Yi(SEC)
刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:

    这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。
    此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢?

        stEnv.connect(
                new Kafka()
                        .properties(TestKafkaUtils.getKafkaProperties())
                        .version("universal")
                        .topic("test")
                        .startFromLatest()
        ).withFormat(new Json()
                .failOnMissingField(false)
        ).withSchema(
                new Schema()
                        .field("d", TypeInformation.of(Map.class))
        ).inAppendMode().createTemporaryTable("t");

    其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', 'ANY<java.util.Map>'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
    root
    |-- d: LEGACY('RAW', 'ANY<java.util.Map>')
   

在 2020/8/11 下午4:23,“zhao liang”<[hidden email]> 写入:

    Hi,你图挂了,换个图床试试呢
   
    发件人: Zhao,Yi(SEC) <[hidden email]>
    日期: 星期二, 2020年8月11日 16:04
    收件人: [hidden email] <[hidden email]>
    主题: 关于FLinkSQL如何创建类json无限扩展的表结构问题
    刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:
    [cid:image001.png@01D66FF8.F697E2D0]
    这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。
    此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢?
   
   
    其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', 'ANY<java.util.Map>'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
    root
    |-- d: LEGACY('RAW', 'ANY<java.util.Map>')
   

Reply | Threaded
Open this post in threaded view
|

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

Benchao Li-2
Hi,

目前Json Format的实现就是假设json最外层是一个json object,暂时还无法做到顶层的所有字段无限扩展。

如果是在SQL里面,可以直接定义成map类型就可以,比如:
```SQL
CREATE TABLE source (
  d MAP<VARCHAR, VARCHAR>
) WITH (...)
```

Zhao,Yi(SEC) <[hidden email]> 于2020年8月11日周二 下午4:58写道:

> 刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:
>
>     这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。
>     此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢?
>
>         stEnv.connect(
>                 new Kafka()
>                         .properties(TestKafkaUtils.getKafkaProperties())
>                         .version("universal")
>                         .topic("test")
>                         .startFromLatest()
>         ).withFormat(new Json()
>                 .failOnMissingField(false)
>         ).withSchema(
>                 new Schema()
>                         .field("d", TypeInformation.of(Map.class))
>         ).inAppendMode().createTemporaryTable("t");
>
>     其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW',
> 'ANY<java.util.Map>'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
>     root
>     |-- d: LEGACY('RAW', 'ANY<java.util.Map>')
>
>
> 在 2020/8/11 下午4:23,“zhao liang”<[hidden email]> 写入:
>
>     Hi,你图挂了,换个图床试试呢
>
>     发件人: Zhao,Yi(SEC) <[hidden email]>
>     日期: 星期二, 2020年8月11日 16:04
>     收件人: [hidden email] <[hidden email]>
>     主题: 关于FLinkSQL如何创建类json无限扩展的表结构问题
>     刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:
>     [cid:image001.png@01D66FF8.F697E2D0]
>     这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。
>     此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢?
>
>
>     其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW',
> 'ANY<java.util.Map>'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
>     root
>     |-- d: LEGACY('RAW', 'ANY<java.util.Map>')
>
>
>

--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

chuyuan
 LEGACY('RAW', 'ANY<java.util.Map>')对应sql中数据类型改为:MAP<STRING,STRING>,仍然报错,异常:
org.apache.flink.table.api.TableException: A raw type backed by type
information has no serializable string representation. It needs to be
resolved into a proper raw type.
方便说下具体实现细节吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

Benchao Li-2
Hi chuyuan,

可以详细描述下你遇到的问题么,比如下面这些信息
- 用的是哪个Flink版本
- SQL(包括DDL和query)
- 数据是什么样子的

chuyuan <[hidden email]> 于2020年9月21日周一 下午2:40写道:

>  LEGACY('RAW',
> 'ANY<java.util.Map>')对应sql中数据类型改为:MAP<STRING,STRING>,仍然报错,异常:
> org.apache.flink.table.api.TableException: A raw type backed by type
> information has no serializable string representation. It needs to be
> resolved into a proper raw type.
> 方便说下具体实现细节吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

chuyuan
我本地依赖的是Flink1.11.1的版本,大概业务是Flink消费kafka中json数据,示例:
{
        "properties":{
            "platformType":"APP",
            "$os":"iOS",
            "$screen_width":414,
            "$app_version":"1.0",
            "$is_first_day":false,
            "$model":"x86_64",
            "$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
            "imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3",
            "isLogin":false,
            "zrIdfa":"00000000-0000-0000-0000-000000000000",
            "$network_type":"WIFI",
            "$wifi":true,
            "$timezone_offset":-480,
            "$resume_from_background":false,
            "tdid":"",
            "zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
            "$screen_height":896,
            "$lib_version":"2.0.10",
            "$lib":"iOS",
            "$os_version":"13.4.1",
            "$manufacturer":"Apple",
            "$is_first_time":false,
            "$app_id":"Com.ziroom..ZRSensorsSDK"
        },
        "type":"track",
        "lib":{
            "$lib_version":"2.0.10",
            "$lib":"iOS",
            "$app_version":"1.0",
            "$lib_method":"autoTrack"
        }
    }
其中key为lib和properties的value是Json类型,其中字段可动态追加。

第一步,使用DataStream对象,接收kafka的Json,并实现MapFunction方法把Json串封装为DO,最后转换成DTO(为了把lib和properties的Json值转换为Map<String,String>类型),
        @Data
        public static class CustomBuriedPointDTO {
                /**
                 * 跟踪ID
                 */
                private Long track_id;
                /**
                 * 事件时间
                 */
                private Long event_time;

                /**
                 * 类型
                 */
                private String type;
                /**
                 * 排重后Id
                 */
                private String distinct_id;
                /**
                 * 匿名ID
                 */
                private String anonymous_id;
                /**
                 * 包信息
                 */
                private @DataTypeHint("RAW") Map<String, String> lib;
                /**
                 * 事件
                 */
                private String event;
                /**
                 * 属性
                 */
// private Map<String, String> properties;
                private @DataTypeHint("RAW") Map<String, String> properties;
                /**
                 * 刷新时间
                 */
                private Long flush_time;
                /**
                 * 事件日期
                 */
                private String dt;
       

                /**
                 * 封装数据对象中字段信息
                 */
                public void assembly(CustomBuriedPointDO pointDO) {
                        // 复制DO属性到DTO
                        BeanUtils.copyProperties(pointDO, this);

                        /*
                                转换特殊字段
                         */
                        // 设置分区日期
                        Long eventTimeLong = pointDO.getEvent_time();
                        if (eventTimeLong == null) {
                                eventTimeLong = System.currentTimeMillis();
                        }
                        Date eventTime = new Date(eventTimeLong);
                        DateFormat dateFormatDate = new
SimpleDateFormat("yyyy-MM-dd");
                        this.setDt(dateFormatDate.format(eventTime));

                        // json字段转换为Map类型
                        Map<String, String> propertiesMap = null;
                        if (StringUtils.isNotBlank(pointDO.getProperties()))
{
                                propertiesMap = (Map<String, String>)
JSON.parse(pointDO.getProperties());
                        }
                        this.setProperties(propertiesMap);
                        Map<String, String> libMap = null;
                        if (StringUtils.isNotBlank(pointDO.getLib())) {
                                libMap = (Map<String, String>)
JSON.parse(pointDO.getLib());
                        }
                        this.setLib(libMap);
                }
        }

第二步,把DataStream转成了Hive临时表,最后写入Hive目标表,hive目标表定义如下:
"CREATE TABLE test.test(" +
                                                "  type STRING," +
                                                "  lib MAP<STRING,STRING>,"
+
                                                "  properties
MAP<STRING,STRING>" +
                                                ") PARTITIONED BY (" +
                                                "  dt string" +
                                                " ) stored as orcfile " +
                                                " TBLPROPERTIES" +
                                                " (" +
                                               
"'partition.time-extractor.kind'='custom'," +
                                               
"'partition.time-extractor.timestamp-pattern'='$dt'," +
                                       
"'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor',"
+
                                               
"'sink.partition-commit.trigger'='partition-time'," +
                                               
"'sink.partition-commit.delay'='0s'," +
                                               
"'sink.partition-commit.policy.kind'='metastore'" +
                                                ")");


第三步,把临时表的数据insert into到目标表,此时出现异常:
org.apache.flink.table.api.TableException: A raw type backed by type
information has no serializable string representation. It needs to be
resolved into a proper raw type.

然后打印临时表的数据结构,发现lib和properties在临时表中数据结构被解析为:
 |-- lib: LEGACY('RAW', 'ANY<java.util.Map>')
 |-- properties: LEGACY('RAW', 'ANY<java.util.Map>')
 |-- track_id: BIGINT
 |-- type: STRING
,这说明lib LEGACY('RAW', 'ANY<java.util.Map>')无法匹配hive目标表中lib
MAP<STRING,STRING>数据结构,写入失败,大概流程是这样。



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

chuyuan
In reply to this post by Benchao Li-2
我本地依赖的是Flink1.11.1的版本,大概业务是Flink消费kafka中json数据,示例:
{
        "properties":{
            "platformType":"APP",
            "$os":"iOS",
            "$screen_width":414,
            "$app_version":"1.0",
            "$is_first_day":false,
            "$model":"x86_64",
            "$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
            "imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3",
            "isLogin":false,
            "zrIdfa":"00000000-0000-0000-0000-000000000000",
            "$network_type":"WIFI",
            "$wifi":true,
            "$timezone_offset":-480,
            "$resume_from_background":false,
            "tdid":"",
            "zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
            "$screen_height":896,
            "$lib_version":"2.0.10",
            "$lib":"iOS",
            "$os_version":"13.4.1",
            "$manufacturer":"Apple",
            "$is_first_time":false,
            "$app_id":"Com.ziroom..ZRSensorsSDK"
        },
        "type":"track",
        "lib":{
            "$lib_version":"2.0.10",
            "$lib":"iOS",
            "$app_version":"1.0",
            "$lib_method":"autoTrack"
        }
    }
其中key为lib和properties的value是Json类型,其中字段可动态追加。

第一步,使用DataStream对象,接收kafka的Json,并实现MapFunction方法把Json串封装为DO,最后转换成DTO(为了把lib和properties的Json值转换为Map<String,String>类型),
        @Data
        public static class CustomBuriedPointDTO {
                /**
                 * 跟踪ID
                 */
                private Long track_id;
                /**
                 * 事件时间
                 */
                private Long event_time;

                /**
                 * 类型
                 */
                private String type;
                /**
                 * 排重后Id
                 */
                private String distinct_id;
                /**
                 * 匿名ID
                 */
                private String anonymous_id;
                /**
                 * 包信息
                 */
                private @DataTypeHint("RAW") Map<String, String> lib;
                /**
                 * 事件
                 */
                private String event;
                /**
                 * 属性
                 */
// private Map<String, String> properties;
                private @DataTypeHint("RAW") Map<String, String> properties;
                /**
                 * 刷新时间
                 */
                private Long flush_time;
                /**
                 * 事件日期
                 */
                private String dt;
       

                /**
                 * 封装数据对象中字段信息
                 */
                public void assembly(CustomBuriedPointDO pointDO) {
                        // 复制DO属性到DTO
                        BeanUtils.copyProperties(pointDO, this);

                        /*
                                转换特殊字段
                         */
                        // 设置分区日期
                        Long eventTimeLong = pointDO.getEvent_time();
                        if (eventTimeLong == null) {
                                eventTimeLong = System.currentTimeMillis();
                        }
                        Date eventTime = new Date(eventTimeLong);
                        DateFormat dateFormatDate = new
SimpleDateFormat("yyyy-MM-dd");
                        this.setDt(dateFormatDate.format(eventTime));

                        // json字段转换为Map类型
                        Map<String, String> propertiesMap = null;
                        if (StringUtils.isNotBlank(pointDO.getProperties()))
{
                                propertiesMap = (Map<String, String>)
JSON.parse(pointDO.getProperties());
                        }
                        this.setProperties(propertiesMap);
                        Map<String, String> libMap = null;
                        if (StringUtils.isNotBlank(pointDO.getLib())) {
                                libMap = (Map<String, String>)
JSON.parse(pointDO.getLib());
                        }
                        this.setLib(libMap);
                }
        }

第二步,把DataStream转成了Hive临时表,最后写入Hive目标表,hive目标表定义如下:
"CREATE TABLE test.test(" +
                                                "  type STRING," +
                                                "  lib MAP<STRING,STRING>,"
+
                                                "  properties
MAP<STRING,STRING>" +
                                                ") PARTITIONED BY (" +
                                                "  dt string" +
                                                " ) stored as orcfile " +
                                                " TBLPROPERTIES" +
                                                " (" +
                                               
"'partition.time-extractor.kind'='custom'," +
                                               
"'partition.time-extractor.timestamp-pattern'='$dt'," +
                                       
"'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor',"
+
                                               
"'sink.partition-commit.trigger'='partition-time'," +
                                               
"'sink.partition-commit.delay'='0s'," +
                                               
"'sink.partition-commit.policy.kind'='metastore'" +
                                                ")");


第三步,把临时表的数据insert into到目标表,此时出现异常:
org.apache.flink.table.api.TableException: A raw type backed by type
information has no serializable string representation. It needs to be
resolved into a proper raw type.

然后打印临时表的数据结构,发现lib和properties在临时表中数据结构被解析为:
 |-- lib: LEGACY('RAW', 'ANY<java.util.Map>')
 |-- properties: LEGACY('RAW', 'ANY<java.util.Map>')
 |-- track_id: BIGINT
 |-- type: STRING
,这说明lib LEGACY('RAW', 'ANY<java.util.Map>')无法匹配hive目标表中lib
MAP<STRING,STRING>数据结构,写入失败,大概流程是这样。



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

Benchao Li-2
In reply to this post by chuyuan
为什么要用DataStream解析之后再注册成table呢?
可以尝试下直接用DDL声明一个source,用内置的json format来解析。

chuyuan <[hidden email]> 于2020年9月21日周一 下午4:44写道:

> 我本地依赖的是Flink1.11.1的版本,大概业务是Flink消费kafka中json数据,示例:
> {
>         "properties":{
>             "platformType":"APP",
>             "$os":"iOS",
>             "$screen_width":414,
>             "$app_version":"1.0",
>             "$is_first_day":false,
>             "$model":"x86_64",
>             "$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
>             "imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3",
>             "isLogin":false,
>             "zrIdfa":"00000000-0000-0000-0000-000000000000",
>             "$network_type":"WIFI",
>             "$wifi":true,
>             "$timezone_offset":-480,
>             "$resume_from_background":false,
>             "tdid":"",
>             "zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
>             "$screen_height":896,
>             "$lib_version":"2.0.10",
>             "$lib":"iOS",
>             "$os_version":"13.4.1",
>             "$manufacturer":"Apple",
>             "$is_first_time":false,
>             "$app_id":"Com.ziroom..ZRSensorsSDK"
>         },
>         "type":"track",
>         "lib":{
>             "$lib_version":"2.0.10",
>             "$lib":"iOS",
>             "$app_version":"1.0",
>             "$lib_method":"autoTrack"
>         }
>     }
> 其中key为lib和properties的value是Json类型,其中字段可动态追加。
>
>
> 第一步,使用DataStream对象,接收kafka的Json,并实现MapFunction方法把Json串封装为DO,最后转换成DTO(为了把lib和properties的Json值转换为Map<String,String>类型),
>         @Data
>         public static class CustomBuriedPointDTO {
>                 /**
>                  * 跟踪ID
>                  */
>                 private Long track_id;
>                 /**
>                  * 事件时间
>                  */
>                 private Long event_time;
>
>                 /**
>                  * 类型
>                  */
>                 private String type;
>                 /**
>                  * 排重后Id
>                  */
>                 private String distinct_id;
>                 /**
>                  * 匿名ID
>                  */
>                 private String anonymous_id;
>                 /**
>                  * 包信息
>                  */
>                 private @DataTypeHint("RAW") Map<String, String> lib;
>                 /**
>                  * 事件
>                  */
>                 private String event;
>                 /**
>                  * 属性
>                  */
> // private Map<String, String> properties;
>                 private @DataTypeHint("RAW") Map<String, String>
> properties;
>                 /**
>                  * 刷新时间
>                  */
>                 private Long flush_time;
>                 /**
>                  * 事件日期
>                  */
>                 private String dt;
>
>
>                 /**
>                  * 封装数据对象中字段信息
>                  */
>                 public void assembly(CustomBuriedPointDO pointDO) {
>                         // 复制DO属性到DTO
>                         BeanUtils.copyProperties(pointDO, this);
>
>                         /*
>                                 转换特殊字段
>                          */
>                         // 设置分区日期
>                         Long eventTimeLong = pointDO.getEvent_time();
>                         if (eventTimeLong == null) {
>                                 eventTimeLong = System.currentTimeMillis();
>                         }
>                         Date eventTime = new Date(eventTimeLong);
>                         DateFormat dateFormatDate = new
> SimpleDateFormat("yyyy-MM-dd");
>                         this.setDt(dateFormatDate.format(eventTime));
>
>                         // json字段转换为Map类型
>                         Map<String, String> propertiesMap = null;
>                         if
> (StringUtils.isNotBlank(pointDO.getProperties()))
> {
>                                 propertiesMap = (Map<String, String>)
> JSON.parse(pointDO.getProperties());
>                         }
>                         this.setProperties(propertiesMap);
>                         Map<String, String> libMap = null;
>                         if (StringUtils.isNotBlank(pointDO.getLib())) {
>                                 libMap = (Map<String, String>)
> JSON.parse(pointDO.getLib());
>                         }
>                         this.setLib(libMap);
>                 }
>         }
>
> 第二步,把DataStream转成了Hive临时表,最后写入Hive目标表,hive目标表定义如下:
> "CREATE TABLE test.test(" +
>                                                 "  type STRING," +
>                                                 "  lib MAP<STRING,STRING>,"
> +
>                                                 "  properties
> MAP<STRING,STRING>" +
>                                                 ") PARTITIONED BY (" +
>                                                 "  dt string" +
>                                                 " ) stored as orcfile " +
>                                                 " TBLPROPERTIES" +
>                                                 " (" +
>
> "'partition.time-extractor.kind'='custom'," +
>
> "'partition.time-extractor.timestamp-pattern'='$dt'," +
>
>
> "'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor',"
> +
>
> "'sink.partition-commit.trigger'='partition-time'," +
>
> "'sink.partition-commit.delay'='0s'," +
>
> "'sink.partition-commit.policy.kind'='metastore'" +
>                                                 ")");
>
>
> 第三步,把临时表的数据insert into到目标表,此时出现异常:
> org.apache.flink.table.api.TableException: A raw type backed by type
> information has no serializable string representation. It needs to be
> resolved into a proper raw type.
>
> 然后打印临时表的数据结构,发现lib和properties在临时表中数据结构被解析为:
>  |-- lib: LEGACY('RAW', 'ANY<java.util.Map>')
>  |-- properties: LEGACY('RAW', 'ANY<java.util.Map>')
>  |-- track_id: BIGINT
>  |-- type: STRING
> ,这说明lib LEGACY('RAW', 'ANY<java.util.Map>')无法匹配hive目标表中lib
> MAP<STRING,STRING>数据结构,写入失败,大概流程是这样。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

chuyuan
好勒,谢谢,我试试这种方案,之前注册成table,是为了按条件过滤数据;麻烦问下,直接使用ddl,如何过滤kafka中的数据?



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

Benchao Li-2
可以通过SQL的where条件来过滤吧

chuyuan <[hidden email]> 于2020年9月21日周一 下午6:48写道:

> 好勒,谢谢,我试试这种方案,之前注册成table,是为了按条件过滤数据;麻烦问下,直接使用ddl,如何过滤kafka中的数据?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

chuyuan
好勒,这种方案已经成功了,非常感谢。



--
Sent from: http://apache-flink.147419.n8.nabble.com/