关于flinksql 与维表mysql的关联问题

classic Classic list List threaded Threaded
12 messages Options
Reply | Threaded
Open this post in threaded view

关于flinksql 与维表mysql的关联问题

dear:    我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
Reply | Threaded
Open this post in threaded view

回复:关于flinksql 与维表mysql的关联问题


------------------ 原始邮件 ------------------
发件人:&nbsp;"小屁孩"<[hidden email]&gt;;
发送时间:&nbsp;2020年6月4日(星期四) 下午2:15
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;关于flinksql 与维表mysql的关联问题

dear:&amp;nbsp; &amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
Reply | Threaded
Open this post in threaded view

回复:关于flinksql 与维表mysql的关联问题

Yichao Yang
In reply to this post by 小屁孩


Yichao Yang

发件人:&nbsp;"小屁孩"<[hidden email]&gt;;
发送时间:&nbsp;2020年6月4日(星期四) 下午2:15
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;关于flinksql 与维表mysql的关联问题

dear:&amp;nbsp; &amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
Reply | Threaded
Open this post in threaded view

Re:关于flinksql 与维表mysql的关联问题

Michael Ran
In reply to this post by 小屁孩
放到open 方法里面可以吗?
在 2020-06-04 14:15:05,"小屁孩" <[hidden email]> 写道:
>dear:&nbsp; &nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
Reply | Threaded
Open this post in threaded view

回复:关于flinksql 与维表mysql的关联问题

您的意思是open 全量预加载吗?我目前的逻辑是自己自定义的source 广播出去

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;

public class GetMysqlDvcId extends RichSourceFunction<Map<String, Integer&gt;&gt; {

&nbsp; &nbsp; private Connection connection = null;
&nbsp; &nbsp; private PreparedStatement ps = null;
&nbsp; &nbsp; private volatile boolean isRunning = true;

&nbsp; &nbsp; @Override
&nbsp; &nbsp; public void open(Configuration parameters) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; super.open(parameters);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String database="db_nssa";
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String host="";
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String password="saa!";
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String port="3306";
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String username="root";

&nbsp; &nbsp; &nbsp; &nbsp; String driver = "com.mysql.jdbc.Driver";
&nbsp; &nbsp; &nbsp; &nbsp; String url = "jdbc:mysql://" + host + ":" + port + "/" + database + "?useUnicode=true&amp;characterEncoding=UTF-8";
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; connection = MySQLUtil.getConnection(driver, url, username, password);

&nbsp; &nbsp; &nbsp; &nbsp; if (this.connection != null) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String sql = "select ip,device_id from sys_device";
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ps =connection.prepareStatement(sql);
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; }

&nbsp; &nbsp; @Override
&nbsp; &nbsp; public void run(SourceContext<Map<String, Integer&gt;&gt; ctx) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; Map<String, Integer&gt; map = new HashMap<&gt;();
&nbsp; &nbsp; &nbsp; &nbsp; while (isRunning) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ResultSet resultSet = ps.executeQuery();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (resultSet.next()) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; map.put(resultSet.getString("ip"),resultSet.getInt("device_id"));
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("=======select alarm notify from mysql, size = {}, map = {}"+ map.size()+ map);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ctx.collect(map);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; map.clear();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Thread.sleep(2000 * 60);
&nbsp; &nbsp; &nbsp; &nbsp; }

&nbsp; &nbsp; }

&nbsp; &nbsp; @Override
&nbsp; &nbsp; public void cancel() {
&nbsp; &nbsp; &nbsp; &nbsp; try {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; super.close();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (connection != null) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; connection.close();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (ps != null) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ps.close();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; } catch (Exception e) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("runException:{}"+e);
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; isRunning = false;
&nbsp; &nbsp; }

发件人:&nbsp;"Michael Ran"<[hidden email]&gt;;
发送时间:&nbsp;2020年6月4日(星期四) 下午5:22
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re:关于flinksql 与维表mysql的关联问题

放到open 方法里面可以吗?
在 2020-06-04 14:15:05,"小屁孩" <[hidden email]&gt; 写道:
&gt;dear:&amp;nbsp; &amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
Reply | Threaded
Open this post in threaded view

Re: 关于flinksql 与维表mysql的关联问题

godfrey he
hi 可以考虑使用 temporal table join [1]



小屁孩 <[hidden email]> 于2020年6月4日周四 下午5:51写道:

> 您的意思是open 全量预加载吗?我目前的逻辑是自己自定义的source 广播出去
> 这是我的source
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
> import java.sql.Connection;
> import java.sql.PreparedStatement;
> import java.sql.ResultSet;
> import java.util.HashMap;
> import java.util.Map;
> public class GetMysqlDvcId extends RichSourceFunction<Map<String,
> Integer&gt;&gt; {
> &nbsp; &nbsp; private Connection connection = null;
> &nbsp; &nbsp; private PreparedStatement ps = null;
> &nbsp; &nbsp; private volatile boolean isRunning = true;
> &nbsp; &nbsp; @Override
> &nbsp; &nbsp; public void open(Configuration parameters) throws Exception {
> &nbsp; &nbsp; &nbsp; &nbsp; super.open(parameters);
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String database="db_nssa";
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String host="";
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String password="saa!";
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String port="3306";
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String username="root";
> &nbsp; &nbsp; &nbsp; &nbsp; String driver = "com.mysql.jdbc.Driver";
> &nbsp; &nbsp; &nbsp; &nbsp; String url = "jdbc:mysql://" + host + ":" +
> port + "/" + database + "?useUnicode=true&amp;characterEncoding=UTF-8";
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; connection =
> MySQLUtil.getConnection(driver, url, username, password);
> &nbsp; &nbsp; &nbsp; &nbsp; if (this.connection != null) {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String sql = "select
> ip,device_id from sys_device";
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ps
> =connection.prepareStatement(sql);
> &nbsp; &nbsp; &nbsp; &nbsp; }
> &nbsp; &nbsp; }
> &nbsp; &nbsp; @Override
> &nbsp; &nbsp; public void run(SourceContext<Map<String, Integer&gt;&gt;
> ctx) throws Exception {
> &nbsp; &nbsp; &nbsp; &nbsp; Map<String, Integer&gt; map = new
> HashMap<&gt;();
> &nbsp; &nbsp; &nbsp; &nbsp; while (isRunning) {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ResultSet resultSet =
> ps.executeQuery();
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (resultSet.next()) {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> map.put(resultSet.getString("ip"),resultSet.getInt("device_id"));
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> System.out.println("=======select alarm notify from mysql, size = {}, map =
> {}"+ map.size()+ map);
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ctx.collect(map);
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; map.clear();
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Thread.sleep(2000 * 60);
> &nbsp; &nbsp; &nbsp; &nbsp; }
> &nbsp; &nbsp; }
> &nbsp; &nbsp; @Override
> &nbsp; &nbsp; public void cancel() {
> &nbsp; &nbsp; &nbsp; &nbsp; try {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; super.close();
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (connection != null) {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; connection.close();
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (ps != null) {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ps.close();
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
> &nbsp; &nbsp; &nbsp; &nbsp; } catch (Exception e) {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> System.out.println("runException:{}"+e);
> &nbsp; &nbsp; &nbsp; &nbsp; }
> &nbsp; &nbsp; &nbsp; &nbsp; isRunning = false;
> &nbsp; &nbsp; }
> }
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Michael Ran"<[hidden email]&gt;;
> 发送时间:&nbsp;2020年6月4日(星期四) 下午5:22
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
> 主题:&nbsp;Re:关于flinksql 与维表mysql的关联问题
> 放到open 方法里面可以吗?
> 在 2020-06-04 14:15:05,"小屁孩" <[hidden email]&gt; 写道:
> &gt;dear:&amp;nbsp; &amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
Reply | Threaded
Open this post in threaded view

Re: 关于flinksql 与维表mysql的关联问题

Px New
In reply to this post by Michael Ran
Hi ,我有一个相关操作的一疑问.
疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?

Michael Ran <[hidden email]> 于2020年6月4日周四 下午5:22写道:

> 放到open 方法里面可以吗?
> 在 2020-06-04 14:15:05,"小屁孩" <[hidden email]> 写道:
> >dear:&nbsp; &nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
Reply | Threaded
Open this post in threaded view

回复: 关于flinksql 与维表mysql的关联问题

Yichao Yang

可以使用open + broadcast的方式解决~

Yichao Yang

发件人:&nbsp;"Px New"<[hidden email]&gt;;
发送时间:&nbsp;2020年6月6日(星期六) 上午9:50
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: 关于flinksql 与维表mysql的关联问题

Hi ,我有一个相关操作的一疑问.
疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?

Michael Ran <[hidden email]&gt; 于2020年6月4日周四 下午5:22写道:

&gt; 放到open 方法里面可以吗?
&gt; 在 2020-06-04 14:15:05,"小屁孩" <[hidden email]&gt; 写道:
&gt; &gt;dear:&amp;nbsp; &amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题
&gt; 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
Reply | Threaded
Open this post in threaded view

Re: 关于flinksql 与维表mysql的关联问题

Px New
好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作?

1048262223 <[hidden email]>于2020年6月7日 周日下午3:57写道:

> Hi
> 可以使用open + broadcast的方式解决~
> Best,
> Yichao Yang
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Px New"<[hidden email]&gt;;
> 发送时间:&nbsp;2020年6月6日(星期六) 上午9:50
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
> 主题:&nbsp;Re: 关于flinksql 与维表mysql的关联问题
> Hi ,我有一个相关操作的一疑问.
> 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?
> Michael Ran <[hidden email]&gt; 于2020年6月4日周四 下午5:22写道:
> &gt; 放到open 方法里面可以吗?
> &gt; 在 2020-06-04 14:15:05,"小屁孩" <[hidden email]&gt; 写道:
> &gt; &gt;dear:&amp;nbsp; &amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联
> 关于mysql更新的问题
> &gt;
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
> &gt;
Reply | Threaded
Open this post in threaded view

回复:关于flinksql 与维表mysql的关联问题

Yichao Yang


Yichao Yang


------------------ 原始邮件 ------------------
发件人: Px New <[hidden email]&gt;
发送时间: 2020年6月7日 19:03
收件人: user-zh <[hidden email]&gt;
主题: 回复:关于flinksql 与维表mysql的关联问题

好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作?

1048262223 <[hidden email]&gt;于2020年6月7日 周日下午3:57写道:

&gt; Hi
&gt; 可以使用open + broadcast的方式解决~
&gt; Best,
&gt; Yichao Yang
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:&amp;nbsp;"Px New"<[hidden email]&amp;gt;;
&gt; 发送时间:&amp;nbsp;2020年6月6日(星期六) 上午9:50
&gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
&gt; 主题:&amp;nbsp;Re: 关于flinksql 与维表mysql的关联问题
&gt; Hi ,我有一个相关操作的一疑问.
&gt; 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?
&gt; Michael Ran <[hidden email]&amp;gt; 于2020年6月4日周四 下午5:22写道:
&gt; &amp;gt; 放到open 方法里面可以吗?
&gt; &amp;gt; 在 2020-06-04 14:15:05,"小屁孩" <[hidden email]&amp;gt; 写道:
&gt; &amp;gt; &amp;gt;dear:&amp;amp;nbsp; &amp;amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联
&gt; 关于mysql更新的问题
&gt; &amp;gt;
&gt; 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
&gt; &amp;gt;
Reply | Threaded
Open this post in threaded view

回复: 关于flinksql 与维表mysql的关联问题

In reply to this post by Px New
hi,目前我就是这样做的 数据在启动时会有数据先后到来的问题

发件人:&nbsp;"Px New"<[hidden email]&gt;;
发送时间:&nbsp;2020年6月7日(星期天) 晚上7:02
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: 关于flinksql 与维表mysql的关联问题

好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作?

1048262223 <[hidden email]&gt;于2020年6月7日 周日下午3:57写道:

&gt; Hi
&gt; 可以使用open + broadcast的方式解决~
&gt; Best,
&gt; Yichao Yang
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:&amp;nbsp;"Px New"<[hidden email]&amp;gt;;
&gt; 发送时间:&amp;nbsp;2020年6月6日(星期六) 上午9:50
&gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
&gt; 主题:&amp;nbsp;Re: 关于flinksql 与维表mysql的关联问题
&gt; Hi ,我有一个相关操作的一疑问.
&gt; 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?
&gt; Michael Ran <[hidden email]&amp;gt; 于2020年6月4日周四 下午5:22写道:
&gt; &amp;gt; 放到open 方法里面可以吗?
&gt; &amp;gt; 在 2020-06-04 14:15:05,"小屁孩" <[hidden email]&amp;gt; 写道:
&gt; &amp;gt; &amp;gt;dear:&amp;amp;nbsp; &amp;amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联
&gt; 关于mysql更新的问题
&gt; &amp;gt;
&gt; 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
&gt; &amp;gt;
Reply | Threaded
Open this post in threaded view

Re: 关于flinksql 与维表mysql的关联问题

Hi ,
延迟维表关联这个特性我觉得还是一个比较通用的特性,目前我们考虑借助 Timer 来实现的,社区如果有这个功能的话,我觉得对于 Flink
我看社区有这样的一个 JIRA 再跟踪了[1],我会持续关注。

[1] https://issues.apache.org/jira/browse/FLINK-19063


小屁孩 <[hidden email]> 于2020年6月8日周一 上午9:28写道:

> hi,目前我就是这样做的 数据在启动时会有数据先后到来的问题
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Px New"<[hidden email]&gt;;
> 发送时间:&nbsp;2020年6月7日(星期天) 晚上7:02
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
> 主题:&nbsp;Re: 关于flinksql 与维表mysql的关联问题
> 好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作?
> 1048262223 <[hidden email]&gt;于2020年6月7日 周日下午3:57写道:
> &gt; Hi
> &gt;
> &gt;
> &gt; 可以使用open + broadcast的方式解决~
> &gt;
> &gt;
> &gt; Best,
> &gt; Yichao Yang
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> &gt; 发件人:&amp;nbsp;"Px New"<[hidden email]&amp;gt;;
> &gt; 发送时间:&amp;nbsp;2020年6月6日(星期六) 上午9:50
> &gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
> &gt;
> &gt; 主题:&amp;nbsp;Re: 关于flinksql 与维表mysql的关联问题
> &gt;
> &gt;
> &gt;
> &gt; Hi ,我有一个相关操作的一疑问.
> &gt; 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?
> &gt;
> &gt; Michael Ran <[hidden email]&amp;gt; 于2020年6月4日周四 下午5:22写道:
> &gt;
> &gt; &amp;gt; 放到open 方法里面可以吗?
> &gt; &amp;gt; 在 2020-06-04 14:15:05,"小屁孩" <[hidden email]&amp;gt; 写道:
> &gt; &amp;gt; &amp;gt;dear:&amp;amp;nbsp; &amp;amp;nbsp;
> 我有个问题想请教下,关于flinksql与mysql维表关联
> &gt; 关于mysql更新的问题
> &gt; &amp;gt;
> &gt;
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
> &gt; &amp;gt;