flink cdc 查询mysql binlog时,在 streaming模式下,如果无法获得reload权限应该如何配置参数

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

flink cdc 查询mysql binlog时,在 streaming模式下,如果无法获得reload权限应该如何配置参数

jqwang
hi all

最近在使用flink cdc的时候,遇到需要用streaming mode来对收取的binlog日志进行后续相对复杂处理的需求。因为生产环境不允许mysql的RELOAD权限,之前在利用sql模式使用cdc的时候应用了
debezium.snap.shot.locking.mode = none的参数。


但在使用streaming mode的时候遇到了困难,不知道应该如何配置可以支持在没有reload权限的时候使用flink cdc。目前使用的配置方法是
Properties properties = new Properties();
properties.setProperty("debezium.snapshot.locking.mode", "none");
SourceFunction<String&gt; sourceFunction = MySQLSource.<String&gt;builder()
&nbsp; &nbsp; &nbsp; &nbsp; .hostname("localhost")
&nbsp; &nbsp; &nbsp; &nbsp; .port(port)
&nbsp; &nbsp; &nbsp; &nbsp; .databaseList("database")&nbsp;
&nbsp; &nbsp; &nbsp; &nbsp; .tableList("database.test")
&nbsp; &nbsp; &nbsp; &nbsp; .username(“user)
&nbsp; &nbsp; &nbsp; &nbsp; .password("password")
&nbsp; &nbsp; &nbsp; &nbsp; .debeziumProperties(properties)
&nbsp; &nbsp; &nbsp; &nbsp; .deserializer(new StringDebeziumDeserializationSchema())
&nbsp; &nbsp; &nbsp; &nbsp; .build();

但.debeziumProperties(properties)好像并没有和sql模式一样生效。请问我应该怎么配置sourceFunction,或者Streaming模式有没有提供什么方法解决这个问题呢?
Reply | Threaded
Open this post in threaded view
|

回复:flink cdc 查询mysql binlog时,在 streaming模式下,如果无法获得reload权限应该如何配置参数

MOBIN
试试这种写法,在datastreaming API中的,debezium的相关参数应该是不需要加debezium前缀的
public static Properties debeziumProperties(){
    Properties properties = new Properties();
properties.setProperty(“xxxx”,”xxxx");
    return properties;
}


    SourceFunction<T> sourceFunction = MySQLSource.<T>builder()
            . . .
            .debeziumProperties(debeziumProperties())
            .build();


| |
MOBIN
|
|
[hidden email]
|
签名由网易邮箱大师定制


在2021年04月27日 14:46,疾鹰击皓月<[hidden email]> 写道:
hi all

最近在使用flink cdc的时候,遇到需要用streaming mode来对收取的binlog日志进行后续相对复杂处理的需求。因为生产环境不允许mysql的RELOAD权限,之前在利用sql模式使用cdc的时候应用了
debezium.snap.shot.locking.mode = none的参数。


但在使用streaming mode的时候遇到了困难,不知道应该如何配置可以支持在没有reload权限的时候使用flink cdc。目前使用的配置方法是
Properties properties = new Properties();
properties.setProperty("debezium.snapshot.locking.mode", "none");
SourceFunction<String&gt; sourceFunction = MySQLSource.<String&gt;builder()
&nbsp; &nbsp; &nbsp; &nbsp; .hostname("localhost")
&nbsp; &nbsp; &nbsp; &nbsp; .port(port)
&nbsp; &nbsp; &nbsp; &nbsp; .databaseList("database")&nbsp;
&nbsp; &nbsp; &nbsp; &nbsp; .tableList("database.test")
&nbsp; &nbsp; &nbsp; &nbsp; .username(“user)
&nbsp; &nbsp; &nbsp; &nbsp; .password("password")
&nbsp; &nbsp; &nbsp; &nbsp; .debeziumProperties(properties)
&nbsp; &nbsp; &nbsp; &nbsp; .deserializer(new StringDebeziumDeserializationSchema())
&nbsp; &nbsp; &nbsp; &nbsp; .build();

但.debeziumProperties(properties)好像并没有和sql模式一样生效。请问我应该怎么配置sourceFunction,或者Streaming模式有没有提供什么方法解决这个问题呢?
Reply | Threaded
Open this post in threaded view
|

回复:flink cdc 查询mysql binlog时,在 streaming模式下,如果无法获得reload权限应该如何配置参数

jqwang
谢谢,已经可以正常使用了


------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2021年4月27日(星期二) 下午3:15
收件人:&nbsp;"[hidden email]"<[hidden email]&gt;;
抄送:&nbsp;"user-zh"<[hidden email]&gt;;
主题:&nbsp;回复:flink cdc 查询mysql binlog时,在 streaming模式下,如果无法获得reload权限应该如何配置参数



试试这种写法,在datastreaming API中的,debezium的相关参数应该是不需要加debezium前缀的
public static Properties debeziumProperties(){
&nbsp;&nbsp;&nbsp; Properties properties = new Properties();
properties.setProperty(“xxxx”,”xxxx");
&nbsp;&nbsp;&nbsp; return properties;
}


&nbsp;&nbsp;&nbsp; SourceFunction<T&gt; sourceFunction = MySQLSource.<T&gt;builder()
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; . . .
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .debeziumProperties(debeziumProperties())
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .build();


| |
MOBIN
|
|
[hidden email]
|
签名由网易邮箱大师定制


在2021年04月27日 14:46,疾鹰击皓月<[hidden email]&gt; 写道:
hi all

最近在使用flink cdc的时候,遇到需要用streaming mode来对收取的binlog日志进行后续相对复杂处理的需求。因为生产环境不允许mysql的RELOAD权限,之前在利用sql模式使用cdc的时候应用了
debezium.snap.shot.locking.mode = none的参数。


但在使用streaming mode的时候遇到了困难,不知道应该如何配置可以支持在没有reload权限的时候使用flink cdc。目前使用的配置方法是
Properties properties = new Properties();
properties.setProperty("debezium.snapshot.locking.mode", "none");
SourceFunction<String&amp;gt; sourceFunction = MySQLSource.<String&amp;gt;builder()
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .hostname("localhost")
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .port(port)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .databaseList("database")&amp;nbsp;
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .tableList("database.test")
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .username(“user)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .password("password")
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .debeziumProperties(properties)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .deserializer(new StringDebeziumDeserializationSchema())
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .build();

但.debeziumProperties(properties)好像并没有和sql模式一样生效。请问我应该怎么配置sourceFunction,或者Streaming模式有没有提供什么方法解决这个问题呢?