flink-cdc 无法读出binlog,程序也不报错

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

flink-cdc 无法读出binlog,程序也不报错

chenjb
hi,大佬们好,我用写了段java代码,通过cdc读取mysql的数据并通过print-table打印出来,但实际没打印,代码也不报错,一直处于运行状态

*idea中运行信息如下:*
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
details.
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further
details.
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver
class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered
via the SPI and manual loading of the driver class is generally unnecessary.
十二月 02, 2020 9:31:45 下午 com.github.shyiko.mysql.binlog.BinaryLogClient
connect
信息: Connected to localhost:3307 at mysql-bin.000002/2556 (sid:6668, cid:11)




*mysql相关配置:*
binlog_format ROW
log_bin ON
binlog_row_image FULL




*java主要代码如下*

import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


public class mysqlSourceAndSink {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings envSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
envSettings);


        String cdc_user_id = "create table cdc_user_id(\n" +
                "id INT \n" +
                ",pid INT \n" +
                ",PRIMARY KEY (id) NOT ENFORCED \n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '3307',\n" +
                " 'username' = 'flink',\n" +
                " 'password' = '123456',\n" +
                " 'server-id' = '6668',\n" +
//                " 'server-time-zone' = 'UTC',\n" +
                " 'server-time-zone' = 'Asia/Shanghai',\n" +
                " 'database-name' = 'flinktest',\n" +
                " 'table-name' = 'flinktest.user_id'\n" +
                ")";

        String table_print = "create table table_print( \n" +
                "id bigint,\n" +
                "pid bigint\n" +
                ") WITH(\n" +
                "'connector' = 'print',\n" +
                "'print-identifier' = 'pppp',\n" +
                "'standard-error' = 'true'\n" +
                ")";

        tableEnv.executeSql(table_print);
//        tableEnv.executeSql(jdbc_user_id);
        tableEnv.executeSql(cdc_user_id);

       String cdcUserPid2Print = "insert into table_print select id, pid
from cdc_user_id";
        tableEnv.executeSql(cdcUserPid2Print);



*
mysql的err日志中有如下打印*

2020-12-02T13:29:52.412476Z 8 [Note] Start binlog_dump to
master_thread_id(8) slave_server(6668), pos(mysql-bin.000002, 2273)
2020-12-02T13:31:03.416362Z 7 [Note] Aborted connection 7 to db:
'unconnected' user: 'flink' host: 'localhost' (Got an error reading
communication packets)
2020-12-02T13:31:36.146219Z 8 [Note] Aborted connection 8 to db:
'unconnected' user: 'flink' host: 'localhost' (failed on flush_net())
2020-12-02T13:31:45.575431Z 11 [Note] Start binlog_dump to
master_thread_id(11) slave_server(6668), pos(mysql-bin.000002, 2556)



*
账号有相关授权*
create user 'flink'@'%' identified by '123456';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'flink' IDENTIFIED BY '123456';
flush privileges;



另外我用flink的jdbc测试也是类似的,不报错,也没把数据print出来

烦请大佬帮忙看看我这是什么原因,谢谢





--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink-cdc 无法读出binlog,程序也不报错

chenjb
破案了,字段类型没按官网的要求对应起来,对应起来后正常了



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink-cdc 无法读出binlog,程序也不报错

Jark
Administrator
是不是 unsigned int 惹的祸...

On Thu, 3 Dec 2020 at 15:15, chenjb <[hidden email]> wrote:

> 破案了,字段类型没按官网的要求对应起来,对应起来后正常了
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink-cdc 无法读出binlog,程序也不报错

chenjb
谢谢老哥关注,我是int对应成bigint了,原以为bigint范围更大应该可以兼容,没认真看文档,然后mysql有错误日志但程序没报错,注意力就放mysql那边了。这两天试了下flink-cdc-sql发现有些有错误的场景不报错,比如连不上mysql(密码错误)也是程序直接退出exit
0,没任何报错或提示,是还要设置什么吗?我是java小白,这块不是很懂



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink-cdc 无法读出binlog,程序也不报错

Jark
Administrator
这个听起来不太合理。总得报个什么错 作业再失败吧。 或者TaskManager 的日志中有没有什么异常信息?

On Fri, 4 Dec 2020 at 09:23, chenjb <[hidden email]> wrote:

>
> 谢谢老哥关注,我是int对应成bigint了,原以为bigint范围更大应该可以兼容,没认真看文档,然后mysql有错误日志但程序没报错,注意力就放mysql那边了。这两天试了下flink-cdc-sql发现有些有错误的场景不报错,比如连不上mysql(密码错误)也是程序直接退出exit
> 0,没任何报错或提示,是还要设置什么吗?我是java小白,这块不是很懂
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink-cdc 无法读出binlog,程序也不报错

chenjb
老哥,抱歉,后面忘了看这里的消息了,我当时在本地用idea调试的,也没开webui,所以也没看TaskManager的日志,确实在idea里面是没报错就结束了。后面把字段类型按官网的严格对应起来就没这个问题了。多谢老哥回复



--
Sent from: http://apache-flink.147419.n8.nabble.com/