kafka connector: start consuming from a specified timestamp


kafka connector: start consuming from a specified timestamp

Kyle Zhang
Hi,
        Can the kafka connector DDL start consuming from a specified timestamp, the way flinkKafkaConsumer.setStartFromTimestamp(xx) does? The documentation only mentions earliest-offset, latest-offset, group-offsets, and specific-offsets:

CREATE TABLE MyUserTable (
  ...
) WITH (
  'connector.type' = 'kafka',      

  'connector.version' = '0.11',     -- required: valid connector versions are
                                    -- "0.8", "0.9", "0.10", "0.11", and "universal"

  'connector.topic' = 'topic_name', -- required: topic name from which the table is read

  'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: specify the ZooKeeper connection string
  'connector.properties.bootstrap.servers' = 'localhost:9092', -- required: specify the Kafka server connection string
  'connector.properties.group.id' = 'testGroup', -- optional in the DDL, but required by the Kafka consumer; specifies the consumer group
  'connector.startup-mode' = 'earliest-offset',    -- optional: valid modes are "earliest-offset",
                                                   -- "latest-offset", "group-offsets",
                                                   -- or "specific-offsets"

  -- optional: used in case of startup mode with specific offsets
  'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',

  'connector.sink-partitioner' = '...',  -- optional: output partitioning from Flink's partitions
                                         -- into Kafka's partitions; valid values are "fixed"
                                         -- (each Flink partition ends up in at most one Kafka partition),
                                         -- "round-robin" (a Flink partition is distributed to
                                         -- Kafka partitions round-robin), and
                                         -- "custom" (use a custom FlinkKafkaPartitioner subclass)
  -- optional: used in case of sink partitioner custom
  'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',
 
  'format.type' = '...',                 -- required: the Kafka connector requires a format to be specified;
  ...                                    -- the supported formats are 'csv', 'json' and 'avro'.
                                         -- Please refer to the Table Formats section for more details.
)


Re: kafka connector: start consuming from a specified timestamp

Meng Wang
hi, this feature has already been implemented in Flink; see [1]. It is supported starting from 1.11.0.


[1] https://issues.apache.org/jira/browse/FLINK-15220
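For reference, FLINK-15220 adds a timestamp startup mode to the reworked Kafka SQL connector in 1.11, which uses new-style option keys ('connector' instead of 'connector.type', 'scan.startup.mode' instead of 'connector.startup-mode'). A sketch of the 1.11+ DDL, with option names taken from the 1.11 documentation (the timestamp value here is just an illustrative epoch-millisecond placeholder):

```sql
CREATE TABLE MyUserTable (
  ...
) WITH (
  'connector' = 'kafka',                             -- 1.11+ connector identifier
  'topic' = 'topic_name',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'timestamp',                 -- start reading from a given timestamp
  'scan.startup.timestamp-millis' = '1592092800000', -- epoch milliseconds to start from
  'format' = 'json'
)
```

The consumer seeks each partition to the earliest offset whose record timestamp is greater than or equal to the given milliseconds, which matches the behavior of setStartFromTimestamp in the DataStream API.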




---
Best,
Matt Wang


On 06/12/2020 10:37, Kyle Zhang <[hidden email]> wrote:
Hi,
Can the kafka connector DDL start consuming from a specified timestamp, the way flinkKafkaConsumer.setStartFromTimestamp(xx) does? The documentation only mentions earliest-offset, latest-offset, group-offsets, and specific-offsets:
