[flink-SQL-API]如何将kafka中的json数据注册为表?

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[flink-SQL-API]如何将kafka中的json数据注册为表?

猫猫
我在kafka中有如下的数据存储结构,我希望将一些字段放到表中,使用sql方式进行处理。


我需要将其转换为数组流,然后再注册吗?
还是我可以配置字段为:"key.database,value.data.user_id,value.data.org_id"这种方式直接注册成表?


也就是问,我需要装换json吗?


{
        "key": {
                "database": "test",
                "table": "jhi_user_org",
                "pk.user_id": 9522636,
                "pk.org_id": 10065286
        },
        "value": {
                "database": "test",
                "table": "jhi_user_org",
                "type": "insert",
                "ts": 1576465904,
                "xid": 539057,
                "commit": true,
                "data": {
                        "user_id": 9522636,
                        "org_id": 10065286
                }
        },
        "metadata": {
                "offset": 760,
                "topic": "TEST_BINLOG_ALL",
                "partition": 0
        }
}