hi all
最近在使用flink cdc的时候,遇到需要用streaming mode来对收取的binlog日志进行后续相对复杂处理的需求。因为生产环境不允许mysql的RELOAD权限,之前在利用sql模式使用cdc的时候应用了 debezium.snap.shot.locking.mode = none的参数。 但在使用streaming mode的时候遇到了困难,不知道应该如何配置可以支持在没有reload权限的时候使用flink cdc。目前使用的配置方法是 Properties properties = new Properties(); properties.setProperty("debezium.snapshot.locking.mode", "none"); SourceFunction<String> sourceFunction = MySQLSource.<String>builder() .hostname("localhost") .port(port) .databaseList("database") .tableList("database.test") .username(“user) .password("password") .debeziumProperties(properties) .deserializer(new StringDebeziumDeserializationSchema()) .build(); 但.debeziumProperties(properties)好像并没有和sql模式一样生效。请问我应该怎么配置sourceFunction,或者Streaming模式有没有提供什么方法解决这个问题呢? |
试试这种写法,在datastreaming API中的,debezium的相关参数应该是不需要加debezium前缀的
public static Properties debeziumProperties(){ Properties properties = new Properties(); properties.setProperty(“xxxx”,”xxxx"); return properties; } SourceFunction<T> sourceFunction = MySQLSource.<T>builder() . . . .debeziumProperties(debeziumProperties()) .build(); | | MOBIN | | [hidden email] | 签名由网易邮箱大师定制 在2021年04月27日 14:46,疾鹰击皓月<[hidden email]> 写道: hi all 最近在使用flink cdc的时候,遇到需要用streaming mode来对收取的binlog日志进行后续相对复杂处理的需求。因为生产环境不允许mysql的RELOAD权限,之前在利用sql模式使用cdc的时候应用了 debezium.snap.shot.locking.mode = none的参数。 但在使用streaming mode的时候遇到了困难,不知道应该如何配置可以支持在没有reload权限的时候使用flink cdc。目前使用的配置方法是 Properties properties = new Properties(); properties.setProperty("debezium.snapshot.locking.mode", "none"); SourceFunction<String> sourceFunction = MySQLSource.<String>builder() .hostname("localhost") .port(port) .databaseList("database") .tableList("database.test") .username(“user) .password("password") .debeziumProperties(properties) .deserializer(new StringDebeziumDeserializationSchema()) .build(); 但.debeziumProperties(properties)好像并没有和sql模式一样生效。请问我应该怎么配置sourceFunction,或者Streaming模式有没有提供什么方法解决这个问题呢? |
谢谢,已经可以正常使用了
------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2021年4月27日(星期二) 下午3:15 收件人: "[hidden email]"<[hidden email]>; 抄送: "user-zh"<[hidden email]>; 主题: 回复:flink cdc 查询mysql binlog时,在 streaming模式下,如果无法获得reload权限应该如何配置参数 试试这种写法,在datastreaming API中的,debezium的相关参数应该是不需要加debezium前缀的 public static Properties debeziumProperties(){ Properties properties = new Properties(); properties.setProperty(“xxxx”,”xxxx"); return properties; } SourceFunction<T> sourceFunction = MySQLSource.<T>builder() . . . .debeziumProperties(debeziumProperties()) .build(); | | MOBIN | | [hidden email] | 签名由网易邮箱大师定制 在2021年04月27日 14:46,疾鹰击皓月<[hidden email]> 写道: hi all 最近在使用flink cdc的时候,遇到需要用streaming mode来对收取的binlog日志进行后续相对复杂处理的需求。因为生产环境不允许mysql的RELOAD权限,之前在利用sql模式使用cdc的时候应用了 debezium.snap.shot.locking.mode = none的参数。 但在使用streaming mode的时候遇到了困难,不知道应该如何配置可以支持在没有reload权限的时候使用flink cdc。目前使用的配置方法是 Properties properties = new Properties(); properties.setProperty("debezium.snapshot.locking.mode", "none"); SourceFunction<String&gt; sourceFunction = MySQLSource.<String&gt;builder() &nbsp; &nbsp; &nbsp; &nbsp; .hostname("localhost") &nbsp; &nbsp; &nbsp; &nbsp; .port(port) &nbsp; &nbsp; &nbsp; &nbsp; .databaseList("database")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .tableList("database.test") &nbsp; &nbsp; &nbsp; &nbsp; .username(“user) &nbsp; &nbsp; &nbsp; &nbsp; .password("password") &nbsp; &nbsp; &nbsp; &nbsp; .debeziumProperties(properties) &nbsp; &nbsp; &nbsp; &nbsp; .deserializer(new StringDebeziumDeserializationSchema()) &nbsp; &nbsp; &nbsp; &nbsp; .build(); 但.debeziumProperties(properties)好像并没有和sql模式一样生效。请问我应该怎么配置sourceFunction,或者Streaming模式有没有提供什么方法解决这个问题呢? |
Free forum by Nabble | Edit this page |