dear: 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
|
Hi:
我的理解是想达到延迟JOIN的目的吗,可以考虑利用WaterMark的maxoutoforderness ------------------ 原始邮件 ------------------ 发件人: "小屁孩"<[hidden email]>; 发送时间: 2020年6月4日(星期四) 下午2:15 收件人: "user-zh"<[hidden email]>; 主题: 关于flinksql 与维表mysql的关联问题 dear:&nbsp; &nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据 |
In reply to this post by 小屁孩
Hi
你是会有job重启操作吗,job取消时做savepoint重启时应该不会有这个问题? Best, Yichao Yang ------------------ 原始邮件 ------------------ 发件人: "小屁孩"<[hidden email]>; 发送时间: 2020年6月4日(星期四) 下午2:15 收件人: "user-zh"<[hidden email]>; 主题: 关于flinksql 与维表mysql的关联问题 dear:&nbsp; &nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据 |
In reply to this post by 小屁孩
放到open 方法里面可以吗?
在 2020-06-04 14:15:05,"小屁孩" <[hidden email]> 写道: >dear: 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据 |
您的意思是open 全量预加载吗?我目前的逻辑是自己自定义的source 广播出去
这是我的source import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.HashMap; import java.util.Map; public class GetMysqlDvcId extends RichSourceFunction<Map<String, Integer>> { private Connection connection = null; private PreparedStatement ps = null; private volatile boolean isRunning = true; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); String database="db_nssa"; String host="212.21.12.12"; String password="saa!"; String port="3306"; String username="root"; String driver = "com.mysql.jdbc.Driver"; String url = "jdbc:mysql://" + host + ":" + port + "/" + database + "?useUnicode=true&characterEncoding=UTF-8"; connection = MySQLUtil.getConnection(driver, url, username, password); if (this.connection != null) { String sql = "select ip,device_id from sys_device"; ps =connection.prepareStatement(sql); } } @Override public void run(SourceContext<Map<String, Integer>> ctx) throws Exception { Map<String, Integer> map = new HashMap<>(); while (isRunning) { ResultSet resultSet = ps.executeQuery(); while (resultSet.next()) { map.put(resultSet.getString("ip"),resultSet.getInt("device_id")); } System.out.println("=======select alarm notify from mysql, size = {}, map = {}"+ map.size()+ map); ctx.collect(map); map.clear(); Thread.sleep(2000 * 60); } } @Override public void cancel() { try { super.close(); if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } } catch (Exception e) { System.out.println("runException:{}"+e); } isRunning = false; } } ------------------ 原始邮件 ------------------ 发件人: "Michael Ran"<[hidden email]>; 发送时间: 2020年6月4日(星期四) 下午5:22 收件人: "user-zh"<[hidden email]>; 主题: Re:关于flinksql 与维表mysql的关联问题 放到open 方法里面可以吗? 在 2020-06-04 14:15:05,"小屁孩" <[hidden email]> 写道: >dear:&nbsp; &nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据 |
hi 可以考虑使用 temporal table join [1]
Best, Godfrey [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#join-with-a-temporal-table 小屁孩 <[hidden email]> 于2020年6月4日周四 下午5:51写道: > 您的意思是open 全量预加载吗?我目前的逻辑是自己自定义的source 广播出去 > 这是我的source > > > import org.apache.flink.configuration.Configuration; > import org.apache.flink.streaming.api.functions.source.RichSourceFunction; > > > import java.sql.Connection; > import java.sql.PreparedStatement; > import java.sql.ResultSet; > import java.util.HashMap; > import java.util.Map; > > > public class GetMysqlDvcId extends RichSourceFunction<Map<String, > Integer>> { > > > private Connection connection = null; > private PreparedStatement ps = null; > private volatile boolean isRunning = true; > > > > > @Override > public void open(Configuration parameters) throws Exception { > super.open(parameters); > String database="db_nssa"; > String host="212.21.12.12"; > String password="saa!"; > String port="3306"; > String username="root"; > > > > > String driver = "com.mysql.jdbc.Driver"; > String url = "jdbc:mysql://" + host + ":" + > port + "/" + database + "?useUnicode=true&characterEncoding=UTF-8"; > connection = > MySQLUtil.getConnection(driver, url, username, password); > > > > > if (this.connection != null) { > String sql = "select > ip,device_id from sys_device"; > ps > =connection.prepareStatement(sql); > } > } > > > @Override > public void run(SourceContext<Map<String, Integer>> > ctx) throws Exception { > Map<String, Integer> map = new > HashMap<>(); > while (isRunning) { > ResultSet resultSet = > ps.executeQuery(); > while (resultSet.next()) { > > map.put(resultSet.getString("ip"),resultSet.getInt("device_id")); > } > > System.out.println("=======select alarm notify from mysql, size = {}, map = > {}"+ map.size()+ map); > ctx.collect(map); > map.clear(); > Thread.sleep(2000 * 60); > } > > > } > > > > > @Override > public void cancel() { > try { > super.close(); > if (connection != null) { > connection.close(); > } > if (ps != null) { > ps.close(); > } > } catch (Exception e) { > > System.out.println("runException:{}"+e); > } > isRunning = false; > } > } > > ------------------ 原始邮件 ------------------ > 发件人: "Michael Ran"<[hidden email]>; > 发送时间: 2020年6月4日(星期四) 下午5:22 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re:关于flinksql 与维表mysql的关联问题 > > > > 放到open 方法里面可以吗? > 在 2020-06-04 14:15:05,"小屁孩" <[hidden email]> 写道: > >dear:&nbsp; &nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 > 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据 |
In reply to this post by Michael Ran
Hi ,我有一个相关操作的一疑问.
疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢? Michael Ran <[hidden email]> 于2020年6月4日周四 下午5:22写道: > 放到open 方法里面可以吗? > 在 2020-06-04 14:15:05,"小屁孩" <[hidden email]> 写道: > >dear: 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 > 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据 > |
Hi
可以使用open + broadcast的方式解决~ Best, Yichao Yang ------------------ 原始邮件 ------------------ 发件人: "Px New"<[hidden email]>; 发送时间: 2020年6月6日(星期六) 上午9:50 收件人: "user-zh"<[hidden email]>; 主题: Re: 关于flinksql 与维表mysql的关联问题 Hi ,我有一个相关操作的一疑问. 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢? Michael Ran <[hidden email]> 于2020年6月4日周四 下午5:22写道: > 放到open 方法里面可以吗? > 在 2020-06-04 14:15:05,"小屁孩" <[hidden email]> 写道: > >dear:&nbsp; &nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 > 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据 > |
好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作?
1048262223 <[hidden email]>于2020年6月7日 周日下午3:57写道: > Hi > > > 可以使用open + broadcast的方式解决~ > > > Best, > Yichao Yang > > > > > > ------------------ 原始邮件 ------------------ > 发件人: "Px New"<[hidden email]>; > 发送时间: 2020年6月6日(星期六) 上午9:50 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: 关于flinksql 与维表mysql的关联问题 > > > > Hi ,我有一个相关操作的一疑问. > 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢? > > Michael Ran <[hidden email]> 于2020年6月4日周四 下午5:22写道: > > > 放到open 方法里面可以吗? > > 在 2020-06-04 14:15:05,"小屁孩" <[hidden email]> 写道: > > >dear:&nbsp; &nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 > 关于mysql更新的问题 > > > 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据 > > |
Hi
是的。 Best, Yichao Yang 发自我的iPhone ------------------ 原始邮件 ------------------ 发件人: Px New <[hidden email]> 发送时间: 2020年6月7日 19:03 收件人: user-zh <[hidden email]> 主题: 回复:关于flinksql 与维表mysql的关联问题 好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作? 1048262223 <[hidden email]>于2020年6月7日 周日下午3:57写道: > Hi > > > 可以使用open + broadcast的方式解决~ > > > Best, > Yichao Yang > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"Px New"<[hidden email]&gt;; > 发送时间:&nbsp;2020年6月6日(星期六) 上午9:50 > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > 主题:&nbsp;Re: 关于flinksql 与维表mysql的关联问题 > > > > Hi ,我有一个相关操作的一疑问. > 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢? > > Michael Ran <[hidden email]&gt; 于2020年6月4日周四 下午5:22写道: > > &gt; 放到open 方法里面可以吗? > &gt; 在 2020-06-04 14:15:05,"小屁孩" <[hidden email]&gt; 写道: > &gt; &gt;dear:&amp;nbsp; &amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 > 关于mysql更新的问题 > &gt; > 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据 > &gt; |
In reply to this post by Px New
hi,目前我就是这样做的 数据在启动时会有数据先后到来的问题
------------------ 原始邮件 ------------------ 发件人: "Px New"<[hidden email]>; 发送时间: 2020年6月7日(星期天) 晚上7:02 收件人: "user-zh"<[hidden email]>; 主题: Re: 关于flinksql 与维表mysql的关联问题 好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作? 1048262223 <[hidden email]>于2020年6月7日 周日下午3:57写道: > Hi > > > 可以使用open + broadcast的方式解决~ > > > Best, > Yichao Yang > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"Px New"<[hidden email]&gt;; > 发送时间:&nbsp;2020年6月6日(星期六) 上午9:50 > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > 主题:&nbsp;Re: 关于flinksql 与维表mysql的关联问题 > > > > Hi ,我有一个相关操作的一疑问. > 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢? > > Michael Ran <[hidden email]&gt; 于2020年6月4日周四 下午5:22写道: > > &gt; 放到open 方法里面可以吗? > &gt; 在 2020-06-04 14:15:05,"小屁孩" <[hidden email]&gt; 写道: > &gt; &gt;dear:&amp;nbsp; &amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 > 关于mysql更新的问题 > &gt; > 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据 > &gt; |
Hi ,
延迟维表关联这个特性我觉得还是一个比较通用的特性,目前我们考虑借助 Timer 来实现的,社区如果有这个功能的话,我觉得对于 Flink 使用方会有很大帮助的。 我看社区有这样的一个 JIRA 再跟踪了[1],我会持续关注。 [1] https://issues.apache.org/jira/browse/FLINK-19063 Best, LakeShen 小屁孩 <[hidden email]> 于2020年6月8日周一 上午9:28写道: > hi,目前我就是这样做的 数据在启动时会有数据先后到来的问题 > > > > > ------------------ 原始邮件 ------------------ > 发件人: "Px New"<[hidden email]>; > 发送时间: 2020年6月7日(星期天) 晚上7:02 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: 关于flinksql 与维表mysql的关联问题 > > > > 好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作? > > 1048262223 <[hidden email]>于2020年6月7日 周日下午3:57写道: > > > Hi > > > > > > 可以使用open + broadcast的方式解决~ > > > > > > Best, > > Yichao Yang > > > > > > > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > 发件人:&nbsp;"Px New"<[hidden email]&gt;; > > 发送时间:&nbsp;2020年6月6日(星期六) 上午9:50 > > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > > > 主题:&nbsp;Re: 关于flinksql 与维表mysql的关联问题 > > > > > > > > Hi ,我有一个相关操作的一疑问. > > 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢? > > > > Michael Ran <[hidden email]&gt; 于2020年6月4日周四 下午5:22写道: > > > > &gt; 放到open 方法里面可以吗? > > &gt; 在 2020-06-04 14:15:05,"小屁孩" <[hidden email]&gt; 写道: > > &gt; &gt;dear:&amp;nbsp; &amp;nbsp; > 我有个问题想请教下,关于flinksql与mysql维表关联 > > 关于mysql更新的问题 > > &gt; > > > 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据 > > &gt; |
Free forum by Nabble | Edit this page |