Garbled Chinese when writing from Flink SQL to MySQL


casel.chen
My Flink SQL job is as follows:


SELECT
  product_name,
  window_start,
  window_end,
  CAST(SUM(trans_amt) AS DECIMAL(24, 2)) AS trans_amt,
  CAST(COUNT(order_no) AS BIGINT) AS trans_cnt,
  -- LOCALTIMESTAMP AS insert_time,
  '微支付事业部' AS bus_name
FROM (


The MySQL sink table is defined as follows:
CREATE TABLE XXX (
) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT CHARSET=utf8mb4;


After the job runs, the Chinese text written to the MySQL table comes out garbled as ??????



Looking at the job's runtime logs, I found it uses the UTF-16LE character set. Is there a way to make it use utf8mb4 instead?
2021-05-17 18:02:25,010 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task GroupAggregate(groupBy=[product_name, window_start, window_end], select=[product_name, window_start, window_end, SUM_RETRACT(trans_amt) AS $f3, COUNT_RETRACT(order_no) AS $f4]) -> Calc(select=[CAST(product_name) AS product_name, (CAST(window_start) DATE_FORMAT _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS window_start, (CAST(window_end) DATE_FORMAT _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS window_end, CAST($f3) AS trans_amt, CAST($f4) AS trans_cnt, CAST(()) AS insert_time, _UTF-16LE'??????????????????':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS bus_name]) -> Sink: Sink(table=[default_catalog.default_database.all_trans_5m_new], fields=[product_name, window_start, window_end, trans_amt, trans_cnt, insert_time, bus_name]) (1/1)#0 (1b5f26dcd9a6071f36753b93a0ea9bea), deploy into slot with allocation id 9f4c7d45bdf429f89158e2f8451663e0.
2021-05-17 18:02:25,013 INFO org.apache.flink.runtime.taskmanager.Task [] - GroupAggregate(groupBy=[product_name, window_start, window_end, id, data_type, mer_cust_id, order_no, trans_date], select=[product_name, window_start, window_end, id, data_type, mer_cust_id, order_no, trans_date, MAX_RETRACT(trans_amt) AS trans_amt]) -> Calc(select=[product_name, window_start, window_end, trans_amt, order_no]) (1/1)#0 (ef6b0a94e75cc1665e4ce3d40e74ab0c) switched from CREATED to DEPLOYING.
Re: Garbled Chinese when writing from Flink SQL to MySQL

王炳焱
When connecting Flink SQL to the MySQL table, configure the URL as jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8, like this:

CREATE TABLE jdbc_sink(
    id INT COMMENT '订单id',
    goods_name VARCHAR(128) COMMENT '商品名称',
    price DECIMAL(32,2) COMMENT '商品价格',
    user_name VARCHAR(64) COMMENT '用户名称'
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8',
   'username' = 'mysqluser',
   'password' = 'mysqluser',
   'table-name' = 'jdbc_sink'
)
On 2021-05-18 11:55:46, "casel.chen" <[hidden email]> wrote:

Re: Re: Garbled Chinese when writing from Flink SQL to MySQL

casel.chen
My JDBC URL already includes useUnicode=true&characterEncoding=UTF-8, but the output is still garbled.
On 2021-05-18 17:21:12, "王炳焱" <[hidden email]> wrote:

Re: Re: Re: Garbled Chinese when writing from Flink SQL to MySQL

王炳焱
Then you need to check the character encoding of every column in your database table. Have you adjusted the column encodings? It works fine on my end.
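As a quick way to do that check (a sketch; `jdbc_sink` is the example table name from the earlier reply, and `database` the example schema — substitute your own), the per-column charsets can be listed with standard MySQL statements:

```sql
-- Show charset/collation for every column of the sink table
SHOW FULL COLUMNS FROM jdbc_sink;

-- Or query information_schema directly for the string columns
SELECT column_name, character_set_name, collation_name
FROM information_schema.columns
WHERE table_schema = 'database'   -- your schema name
  AND table_name   = 'jdbc_sink';
```

Note that a column created before the table default was switched to utf8mb4 keeps its original charset, so per-column values can differ from the table's DEFAULT CHARSET.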
On 2021-05-18 18:19:31, "casel.chen" <[hidden email]> wrote:

Re: Re: Re: Garbled Chinese when writing from Flink SQL to MySQL

Michael Ran
In reply to this post by casel.chen



Check the character encoding of the database columns.
On 2021-05-18 18:19:31, "casel.chen" <[hidden email]> wrote:

Re: Re: Re: Re: Garbled Chinese when writing from Flink SQL to MySQL

casel.chen
The database character-set settings are as follows:


character_set_client,utf8mb4
character_set_connection,utf8mb4
character_set_database,utf8mb4
character_set_filesystem,binary
character_set_results,utf8mb4
character_set_server,utf8
character_set_system,utf8
character_sets_dir,/u01/mysql57_20200229/share/charsets/
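(For reference, this list matches what MySQL reports for its charset variables; it was presumably captured with the standard command:)

```sql
SHOW VARIABLES LIKE 'character_set%';
```

Note that character_set_server is utf8 rather than utf8mb4 here; that only affects databases created without an explicit charset, but it is worth aligning with the table's utf8mb4 default.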


The client connection string is:
jdbc:mysql://host:3306/datav_test?useUnicode=true&characterEncoding=utf8


Running the Flink SQL job locally, Chinese characters are inserted and displayed correctly; as soon as it is deployed to the test server, the output is garbled. Any suggestions for a fix? Thanks!
On 2021-05-19 17:52:01, "Michael Ran" <[hidden email]> wrote:

Re: Garbled Chinese when writing from Flink SQL to MySQL

casel.chen



I noticed in the Flink docs on Table API & SQL that the printed SQL execution plan uses the UTF-16LE character set. Why not UTF-8? Could the garbling be related to this?
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/common/#%e8%a7%a3%e9%87%8a%e8%a1%a8



The result of the example there is:

```text
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])

== Optimized Logical Plan ==
DataStreamUnion(all=[true], union all=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
    DataStreamScan(id=[1], fields=[count, word])
  DataStreamScan(id=[2], fields=[count, word])

== Physical Execution Plan ==
Stage 1 : Data Source
  content : collect elements with CollectionInputFormat
```
On 2021-05-25 10:40:46, "casel.chen" <[hidden email]> wrote:

Re: Garbled Chinese when writing from Flink SQL to MySQL

Jason Lee


We hit the same problem: in the TaskManager logs the execution vertex shows CAST(_UTF-16LE ...). In our case we were writing to Kafka, and the data written was garbled.


We then tried (1) adding env.java.opts: "-Dfile.encoding=UTF-8" to flink-conf.yaml on the cluster machines, and (2) using DECODE()/ENCODE() to re-encode the strings as UTF-8 or GBK. Neither worked.


Both approaches still left the Chinese garbled. What have you tried? Did anything solve it for you?


李闯
[hidden email]


On 2021-05-25 23:31, casel.chen <[hidden email]> wrote:



Re: Re: Garbled Chinese when writing from Flink SQL to MySQL

casel.chen
I solved it by adding the parameter -Dfile.encoding=UTF-8 to the JVM options when starting the TaskManagers.
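One way to pass that flag (a sketch; the exact configuration key depends on your Flink version and deployment mode) is through flink-conf.yaml:

```yaml
# flink-conf.yaml — set the JVM default charset for all Flink processes
env.java.opts: "-Dfile.encoding=UTF-8"

# Or, on newer Flink versions, scope it to the TaskManagers only
env.java.opts.taskmanager: "-Dfile.encoding=UTF-8"
```

This works because the planner serializes string literals using the JVM default charset; when the server's default locale is not UTF-8, literals like '微支付事业部' are mangled before they ever reach the sink, which also explains why the same job behaved correctly on a local machine.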
On 2021-06-09 21:47:48, "Jason Lee" <[hidden email]> wrote:
