在FlinkKafkaProducer获取sink表的建表key

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

在FlinkKafkaProducer获取sink表的建表key

Jimmy Zhang
Hi!大家好。
目前有一个需求,需要获取Kafka sink表的所有建表字段,而且需要在FlinkKafkaProducer中进行操作,看了源码,没有找到获取这个信息的接口,大家有知道的吗?非常感谢!
例如:CREATE TABLE kafkaTable (

 user_id BIGINT,
 item_id BIGINT,
 category_id BIGINT,
 behavior STRING,
 ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'csv',
 'scan.startup.mode' = 'earliest-offset'
)
想获取到   user_id, item_id ,category_id ,behavior这四个字段。


| |
Jimmy Zhang
|
|
[hidden email]
|
签名由网易邮箱大师定制
Reply | Threaded
Open this post in threaded view
|

Re: 在FlinkKafkaProducer获取sink表的建表key

Qishang Zhong
Hi Jimmy.
FlinkKafkaProducer 里面是没有的,可以试着从  KafkaDynamicSink 里面传到 FlinkKafkaProducer
中,org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink#physicalDataType
这个里面可以拿到

Jimmy Zhang <[hidden email]> 于2021年3月18日周四 上午10:40写道:

> Hi!大家好。
> 目前有一个需求,需要获取Kafka
> sink表的所有建表字段,而且需要在FlinkKafkaProducer中进行操作,看了源码,没有找到获取这个信息的接口,大家有知道的吗?非常感谢!
> 例如:CREATE TABLE kafkaTable (
>
>  user_id BIGINT,
>  item_id BIGINT,
>  category_id BIGINT,
>  behavior STRING,
>  ts TIMESTAMP(3)
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'csv',
>  'scan.startup.mode' = 'earliest-offset'
> )
> 想获取到   user_id, item_id ,category_id ,behavior这四个字段。
>
>
> | |
> Jimmy Zhang
> |
> |
> [hidden email]
> |
> 签名由网易邮箱大师定制
Reply | Threaded
Open this post in threaded view
|

回复:在FlinkKafkaProducer获取sink表的建表key

小鲲鹏
退订


| |
小鲲鹏
|
|
邮箱:[hidden email]
|

签名由 网易邮箱大师 定制

在2021年03月29日 11:26,Qishang 写道:
Hi Jimmy.
FlinkKafkaProducer 里面是没有的,可以试着从  KafkaDynamicSink 里面传到 FlinkKafkaProducer
中,org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink#physicalDataType
这个里面可以拿到

Jimmy Zhang <[hidden email]> 于2021年3月18日周四 上午10:40写道:

> Hi!大家好。
> 目前有一个需求,需要获取Kafka
> sink表的所有建表字段,而且需要在FlinkKafkaProducer中进行操作,看了源码,没有找到获取这个信息的接口,大家有知道的吗?非常感谢!
> 例如:CREATE TABLE kafkaTable (
>
>  user_id BIGINT,
>  item_id BIGINT,
>  category_id BIGINT,
>  behavior STRING,
>  ts TIMESTAMP(3)
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'csv',
>  'scan.startup.mode' = 'earliest-offset'
> )
> 想获取到   user_id, item_id ,category_id ,behavior这四个字段。
>
>
> | |
> Jimmy Zhang
> |
> |
> [hidden email]
> |
> 签名由网易邮箱大师定制