Hi,
kafka connector ddl能不能像flinkKafkaConsumer.setStartFromTimestamp(xx)一样从指定timestamp开始消费,我看文档里只提到了earliest-offset,latest-offset,group-offsets,specific-offsets CREATE TABLE MyUserTable ( ... ) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.11', -- required: valid connector versions are -- "0.8", "0.9", "0.10", "0.11", and "universal" 'connector.topic' = 'topic_name', -- required: topic name from which the table is read 'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: specify the ZooKeeper connection string 'connector.properties.bootstrap.servers' = 'localhost:9092', -- required: specify the Kafka server connection string 'connector.properties.group.id' = 'testGroup', --optional: required in Kafka consumer, specify consumer group 'connector.startup-mode' = 'earliest-offset', -- optional: valid modes are "earliest-offset", -- "latest-offset", "group-offsets", -- or "specific-offsets" -- optional: used in case of startup mode with specific offsets 'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300', 'connector.sink-partitioner' = '...', -- optional: output partitioning from Flink's partitions -- into Kafka's partitions valid are "fixed" -- (each Flink partition ends up in at most one Kafka partition), -- "round-robin" (a Flink partition is distributed to -- Kafka partitions round-robin) -- "custom" (use a custom FlinkKafkaPartitioner subclass) -- optional: used in case of sink partitioner custom 'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner', 'format.type' = '...', -- required: Kafka connector requires to specify a format, ... -- the supported formats are 'csv', 'json' and 'avro'. -- Please refer to Table Formats section for more details. ) |
hi,这个功能目前已经在 Flink 中实现了,参考 [1],1.11.0 开始支持
[1]. https://issues.apache.org/jira/browse/FLINK-15220; --- Best, Matt Wang On 06/12/2020 10:37,Kyle Zhang<[hidden email]> wrote: Hi, kafka connector ddl能不能像flinkKafkaConsumer.setStartFromTimestamp(xx)一样从指定timestamp开始消费,我看文档里只提到了earliest-offset,latest-offset,group-offsets,specific-offsets CREATE TABLE MyUserTable ( ... ) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.11', -- required: valid connector versions are -- "0.8", "0.9", "0.10", "0.11", and "universal" 'connector.topic' = 'topic_name', -- required: topic name from which the table is read 'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: specify the ZooKeeper connection string 'connector.properties.bootstrap.servers' = 'localhost:9092', -- required: specify the Kafka server connection string 'connector.properties.group.id' = 'testGroup', --optional: required in Kafka consumer, specify consumer group 'connector.startup-mode' = 'earliest-offset', -- optional: valid modes are "earliest-offset", -- "latest-offset", "group-offsets", -- or "specific-offsets" -- optional: used in case of startup mode with specific offsets 'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300', 'connector.sink-partitioner' = '...', -- optional: output partitioning from Flink's partitions -- into Kafka's partitions valid are "fixed" -- (each Flink partition ends up in at most one Kafka partition), -- "round-robin" (a Flink partition is distributed to -- Kafka partitions round-robin) -- "custom" (use a custom FlinkKafkaPartitioner subclass) -- optional: used in case of sink partitioner custom 'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner', 'format.type' = '...', -- required: Kafka connector requires to specify a format, ... -- the supported formats are 'csv', 'json' and 'avro'. -- Please refer to Table Formats section for more details. ) |
Free forum by Nabble | Edit this page |