flink sql消费kafka sink到mysql问题

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

flink sql消费kafka sink到mysql问题

air23
你好。我这边在跑任务时候 发现使用flink sql消费kafka如果报错了
然后再重启 发现报错的数据 会丢失
采用的scan.startup.mode' = 'group-offsets'
按理说 不是要重新消费 失败的那条数据 开始消费吗?
请问如何配置 可以不丢失数据


CREATE TABLE source1 (
id BIGINT           ,
username STRING     ,
password STRING      ,
AddTime TIMESTAMP      ,
origin_table STRING METADATA FROM 'value.table' VIRTUAL,
origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'plink_canal',
'properties.bootstrap.servers' = '***',
'properties.group.id' = 'canal1',
'scan.startup.mode' = 'group-offsets',
'canal-json.table.include' = 'test.*',
-- 'canal-json.ignore-parse-errors' = 'true', -- 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
'format' = 'canal-json'
);
Reply | Threaded
Open this post in threaded view
|

回复: flink sql消费kafka sink到mysql问题

Evan
flinksql 貌似是目前做不到你说的这样



 
发件人: air23
发送时间: 2021-01-06 12:29
收件人: user-zh
主题: flink sql消费kafka sink到mysql问题
你好。我这边在跑任务时候 发现使用flink sql消费kafka如果报错了
然后再重启 发现报错的数据 会丢失
采用的scan.startup.mode' = 'group-offsets'
按理说 不是要重新消费 失败的那条数据 开始消费吗?
请问如何配置 可以不丢失数据
 
 
CREATE TABLE source1 (
id BIGINT           ,
username STRING     ,
password STRING      ,
AddTime TIMESTAMP      ,
origin_table STRING METADATA FROM 'value.table' VIRTUAL,
origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'plink_canal',
'properties.bootstrap.servers' = '***',
'properties.group.id' = 'canal1',
'scan.startup.mode' = 'group-offsets',
'canal-json.table.include' = 'test.*',
-- 'canal-json.ignore-parse-errors' = 'true', -- 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
'format' = 'canal-json'
);
Reply | Threaded
Open this post in threaded view
|

Re:回复: flink sql消费kafka sink到mysql问题

air23
In reply to this post by air23
发现是flink sql 消费kafka 不管有没有解析成功。先去提交offset到kafka 但是实际 是解析失败了。
在 2021-01-06 14:01:34,"Evan" <[hidden email]> 写道:

>flinksql 貌似是目前做不到你说的这样
>
>
>
>
>发件人: air23
>发送时间: 2021-01-06 12:29
>收件人: user-zh
>主题: flink sql消费kafka sink到mysql问题
>你好。我这边在跑任务时候 发现使用flink sql消费kafka如果报错了
>然后再重启 发现报错的数据 会丢失
>采用的scan.startup.mode' = 'group-offsets'
>按理说 不是要重新消费 失败的那条数据 开始消费吗?
>请问如何配置 可以不丢失数据
>
>
>CREATE TABLE source1 (
>id BIGINT           ,
>username STRING     ,
>password STRING      ,
>AddTime TIMESTAMP      ,
>origin_table STRING METADATA FROM 'value.table' VIRTUAL,
>origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL
>) WITH (
>'connector' = 'kafka',
>'topic' = 'plink_canal',
>'properties.bootstrap.servers' = '***',
>'properties.group.id' = 'canal1',
>'scan.startup.mode' = 'group-offsets',
>'canal-json.table.include' = 'test.*',
>-- 'canal-json.ignore-parse-errors' = 'true', -- 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
>'format' = 'canal-json'
>);