On 2021-05-18 11:55, casel.chen wrote:

My Flink SQL job is as follows:

SELECT
  product_name,
  window_start,
  window_end,
  CAST(SUM(trans_amt) AS DECIMAL(24,2)) trans_amt,
  CAST(COUNT(order_no) AS BIGINT) trans_cnt,
  -- LOCALTIMESTAMP AS insert_time,
  '微支付事业部' AS bus_name
FROM (

The MySQL sink table is defined as follows:

CREATE TABLE XXX (
) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT CHARSET=utf8mb4;

After the job runs, the Chinese text written to the MySQL table comes out garbled (??????).

Looking at the job's run log, I found that it uses the UTF-16LE character set. Is there any way to make it use utf8mb4 instead?

2021-05-17 18:02:25,010 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task GroupAggregate(groupBy=[product_name, window_start, window_end], select=[product_name, window_start, window_end, SUM_RETRACT(trans_amt) AS $f3, COUNT_RETRACT(order_no) AS $f4]) -> Calc(select=[CAST(product_name) AS product_name, (CAST(window_start) DATE_FORMAT _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS window_start, (CAST(window_end) DATE_FORMAT _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS window_end, CAST($f3) AS trans_amt, CAST($f4) AS trans_cnt, CAST(()) AS insert_time, _UTF-16LE'??????????????????':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS bus_name]) -> Sink: Sink(table=[default_catalog.default_database.all_trans_5m_new], fields=[product_name, window_start, window_end, trans_amt, trans_cnt, insert_time, bus_name]) (1/1)#0 (1b5f26dcd9a6071f36753b93a0ea9bea), deploy into slot with allocation id 9f4c7d45bdf429f89158e2f8451663e0.
2021-05-17 18:02:25,013 INFO org.apache.flink.runtime.taskmanager.Task [] - GroupAggregate(groupBy=[product_name, window_start, window_end, id, data_type, mer_cust_id, order_no, trans_date], select=[product_name, window_start, window_end, id, data_type, mer_cust_id, order_no, trans_date, MAX_RETRACT(trans_amt) AS trans_amt]) -> Calc(select=[product_name, window_start, window_end, trans_amt, order_no]) (1/1)#0 (ef6b0a94e75cc1665e4ce3d40e74ab0c) switched from CREATED to DEPLOYING.
On 2021-05-18 17:21, 王炳焱 wrote:

When connecting Flink SQL to the MySQL table, configure the URL as jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8, like this:

CREATE TABLE jdbc_sink (
  id INT COMMENT '订单id',
  goods_name VARCHAR(128) COMMENT '商品名称',
  price DECIMAL(32,2) COMMENT '商品价格',
  user_name VARCHAR(64) COMMENT '用户名称'
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8',
  'username' = 'mysqluser',
  'password' = 'mysqluser',
  'table-name' = 'jdbc_sink'
)
On 2021-05-18 18:19, casel.chen wrote:

My URL connection string already uses useUnicode=true&characterEncoding=UTF-8, but the output is still garbled.
Then you need to check what character encoding each column of your database table uses. Have you adjusted the column encodings? It works correctly on my side.
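To inspect the per-column character sets (which can differ from the table and server defaults), standard MySQL statements like the following should work. The table name here is taken from the sink in the log above and may need adjusting:

```sql
-- Show the table DDL, including any per-column CHARACTER SET clauses:
SHOW CREATE TABLE all_trans_5m_new;

-- Or query the information schema for the text columns directly:
SELECT COLUMN_NAME, CHARACTER_SET_NAME, COLLATION_NAME
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
  AND TABLE_NAME = 'all_trans_5m_new';

-- If any text column is not utf8mb4, convert the whole table:
ALTER TABLE all_trans_5m_new CONVERT TO CHARACTER SET utf8mb4;
```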
数据库的字段字符编码 在 2021-05-18 18:19:31,"casel.chen" <[hidden email]> 写道: >我的URL连接串已经使用了 useUnicode=true&characterEncoding=UTF-8 结果还是会有乱码 > > > > > > > > > > > > > > > > > >在 2021-05-18 17:21:12,"王炳焱" <[hidden email]> 写道: >>你在flinkSQL连接mysql表的时候配置url=jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8,像这样<br/>CREATE TABLE jdbc_sink(<br/> id INT COMMENT '订单id',<br/> goods_name VARCHAR(128) COMMENT '商品名称',<br/> price DECIMAL(32,2) COMMENT '商品价格',<br/> user_name VARCHAR(64) COMMENT '用户名称'<br/>) WITH (<br/> 'connector' = 'jdbc',<br/> 'url' = 'jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8',<br/> 'username' = 'mysqluser',<br/> 'password' = 'mysqluser',<br/> 'table-name' = 'jdbc_sink'<br/>) >>在 2021-05-18 11:55:46,"casel.chen" <[hidden email]> 写道: >>>我的flink sql作业如下 >>> >>> >>>SELECT >>>product_name, >>>window_start, >>>window_end, >>>CAST(SUM(trans_amt)ASDECIMAL(24,2)) trans_amt, >>>CAST(COUNT(order_no)ASBIGINT) trans_cnt, >>>-- LOCALTIMESTAMP AS insert_time, >>>'微支付事业部'AS bus_name >>>FROM( >>> >>> >>>mysql sink表的定义如下 >>>CREATE TABLE XXX ( >>>) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT CHARSET=utf8mb4; >>> >>> >>>运行起来后写入mysql表的数据带有中文乱码 ?????? >>> >>> >>> >>>查看作业运行日志后发现其使用了 UTF-16LE 字符集,有什么办法可以让其使用 utf8mb4 字符集么? 
On 2021-05-25 10:40, casel.chen wrote:

The database character-set settings are as follows:

character_set_client      utf8mb4
character_set_connection  utf8mb4
character_set_database    utf8mb4
character_set_filesystem  binary
character_set_results     utf8mb4
character_set_server      utf8
character_set_system      utf8
character_sets_dir        /u01/mysql57_20200229/share/charsets/

The client connection string is:

jdbc:mysql://host:3306/datav_test?useUnicode=true&characterEncoding=utf8

When I run the Flink SQL job locally, inserted Chinese text displays correctly; as soon as the job is deployed to the test server, the Chinese comes out garbled. Any suggestions for a fix? Thanks!
On 2021-05-25 23:31, casel.chen wrote:

Looking at the Table API & SQL section of the Flink documentation, I see that the printed SQL execution plans also use the UTF-16LE character set. Why not UTF-8? Could the garbling be related to this?
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/common/#%e8%a7%a3%e9%87%8a%e8%a1%a8

The result of the example there is:

```text
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])

== Optimized Logical Plan ==
DataStreamUnion(all=[true], union all=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
    DataStreamScan(id=[1], fields=[count, word])
  DataStreamScan(id=[2], fields=[count, word])

== Physical Execution Plan ==
Stage 1 : Data Source
  content : collect elements with CollectionInputFormat
```
On 2021-06-09 21:47, Jason Lee (李闯) wrote:

I ran into the same problem: the execution vertex in the TaskManager log shows CAST(_UTF-16LE ...), and when we write the data to Kafka it comes out garbled.

I have tried two things: (1) adding env.java.opts: "-Dfile.encoding=UTF-8" to flink-conf.yaml on the cluster machines, and (2) using DECODE()/ENCODE() to re-encode the strings as UTF-8 or GBK. Neither works; the Chinese is still garbled. What approaches have you tried? Did you manage to solve it?
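For reference, the DECODE()/ENCODE() workaround mentioned in (2) would look roughly like this with Flink's built-in functions (the column and table names are illustrative; as noted, this attempt did not resolve the garbling):

```sql
-- ENCODE(string, charset) produces bytes, DECODE(bytes, charset) turns
-- them back into a string; round-tripping through UTF-8 was the attempt:
SELECT DECODE(ENCODE(bus_name, 'UTF-8'), 'UTF-8') AS bus_name
FROM source_table;
```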
I solved this by adding the parameter -Dfile.encoding=UTF-8 when starting the TaskManager.
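In flink-conf.yaml this can be scoped to individual components; in recent Flink versions the following keys should be available (verify the exact key names against your Flink version's configuration documentation):

```yaml
# Pass -Dfile.encoding=UTF-8 to the TaskManager JVMs so string constants
# in the generated code are converted to bytes as UTF-8:
env.java.opts.taskmanager: "-Dfile.encoding=UTF-8"
# Optionally do the same for the JobManager:
env.java.opts.jobmanager: "-Dfile.encoding=UTF-8"
```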