各位大佬好,请教一个问题:
利用kafka作为流式源,注册内部的flink的动态表,如下面的table_ddl ,是否由于'update-mode' = 'append',所以随着kafka消息逐步的推送,会导致table_ddl这个表的数据不断在增大,但是我想要的是kafka每推送一条数据,table_ddl就会相应的是这一条数据,请问这个如何实现呢?(或者有没有啥办法,可以直接从table_ddl中取最新的kafka数据呢)。 table_ddl = """ CREATE TABLE table_ddl ( trck_id VARCHAR ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'w', 'connector.startup-mode' = 'group-offsets', 'connector.properties.group.id' = 'trck_w', 'update-mode' = 'append', 'connector.properties.zookeeper.connect' = '*', 'connector.properties.bootstrap.servers' = '%#', 'format.type' = 'json', 'format.derive-schema' = 'true' ) """ |
我认为创建一张表并不会触发从source读取的操作,除非你使用select * from table_ddl这类的操作。
至于表的增大,显然是的。 我觉得你可能关注的是否会flush数据进入存储系统,保证不会out-of-memory这类的问题吗? 我个人猜可能有两种方案: 1.考虑到kafka是一种已经实现持久化的数据源,因此我们并非需要将所有的数据缓存到保存到本地,可以只记录读取到的数据位置,下次读取从记录的位置开始; 2.定期向文件系统写入数据。 小学生 <[hidden email]> 于2020年6月10日周三 下午2:48写道: > 各位大佬好,请教一个问题: > 利用kafka作为流式源,注册内部的flink的动态表,如下面的table_ddl ,是否由于'update-mode' = > 'append',所以随着kafka消息逐步的推送,会导致table_ddl这个表的数据不断在增大,但是我想要的是kafka每推送一条数据,table_ddl就会相应的是这一条数据,请问这个如何实现呢?(或者有没有啥办法,可以直接从table_ddl中取最新的kafka数据呢)。 > > > table_ddl = """ > CREATE TABLE table_ddl ( > trck_id VARCHAR > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'w', > 'connector.startup-mode' = 'group-offsets', > 'connector.properties.group.id' = 'trck_w', > 'update-mode' = 'append', > 'connector.properties.zookeeper.connect' = '*', > 'connector.properties.bootstrap.servers' = '%#', > 'format.type' = 'json', > 'format.derive-schema' = 'true' > ) > """ |
您好,我是通过select * from table_ddl这个去触发的,但是就是因为table_ddl的不断增大,脱离我的业务需要,我业务需要的就是触发select * from table_ddl这个时候,只取最新的一条数据(就是保证table_ddl表总只有一条数据)
|
那你可以调整sql语句,例如使用limit关键字, topN 或使用自定义的方法,具体的你可以根据你的sql语句不断调整.
如有错误欢迎指正 小学生 <[hidden email]> 于2020年6月10日周三 下午3:26写道: > 您好,我是通过select * from > table_ddl这个去触发的,但是就是因为table_ddl的不断增大,脱离我的业务需要,我业务需要的就是触发select * from > table_ddl这个时候,只取最新的一条数据(就是保证table_ddl表总只有一条数据) |
limit 没有用呀。有没有切实可行的方案呢,pyflink下。
|
那你有没有尝试过修改connector中property中connector.startup-mode
设置为latest-offset,这样子每次从kafka读取都是读取最新的消息。 另外,我想问一下 你的sql是一直运行的吗? 我给的limit方案是一个upersert流。 小学生 <[hidden email]> 于2020年6月10日周三 下午5:31写道: > limit 没有用呀。有没有切实可行的方案呢,pyflink下。 |
改为最新的table_ddl表的容量也是不断上升的;select * from table_ddl这个是一直开着的
|
In reply to this post by Shengkai Fang
Hi, 小学生
你可以仔细描述下你的业务场景吗?然后再描述下问题,没懂到底是想要什么。 Best, Jingsong Lee On Wed, Jun 10, 2020 at 3:46 PM 方盛凯 <[hidden email]> wrote: > 那你可以调整sql语句,例如使用limit关键字, topN 或使用自定义的方法,具体的你可以根据你的sql语句不断调整. > 如有错误欢迎指正 > > 小学生 <[hidden email]> 于2020年6月10日周三 下午3:26写道: > > > 您好,我是通过select * from > > table_ddl这个去触发的,但是就是因为table_ddl的不断增大,脱离我的业务需要,我业务需要的就是触发select * from > > table_ddl这个时候,只取最新的一条数据(就是保证table_ddl表总只有一条数据) > -- Best, Jingsong Lee |
业务场景是我需要实时的kafka一条消息记录,从里边取一个字段的值,根据这个值去mysql的表中去筛选出这个值下的记录数,类似与where的条件,所以每次我只希望能得到kafka实时消息里的记录值。
|
你这个表达,实时kafka的一条记录,你要最新的那个是吧,你最新的判断标准是什么?根据什么特性来,表达清楚一点哇。
------------------ 原始邮件 ------------------ 发件人: 小学生 <[hidden email]> 发送时间: 2020年6月10日 18:15 收件人: user-zh <[hidden email]> 主题: 回复:kafka相关问题 业务场景是我需要实时的kafka一条消息记录,从里边取一个字段的值,根据这个值去mysql的表中去筛选出这个值下的记录数,类似与where的条件,所以每次我只希望能得到kafka实时消息里的记录值。 |
In reply to this post by Jingsong Li
最新一条的标准不就是kafka实时推送的那一条吗,推送时间
|
In reply to this post by Shengkai Fang
hi,可以取最新的一条数据:
select id, last_value(value) over (partition by id order by id range between 1 prodding and current row ) as cur_value from table_ddl 通过分组分组获取最新的一条数据。 具体可参考:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html> > 在 2020年6月10日,下午3:18,方盛凯 <[hidden email]> 写道: > > 我认为创建一张表并不会触发从source读取的操作,除非你使用select * from table_ddl这类的操作。 > 至于表的增大,显然是的。 我觉得你可能关注的是否会flush数据进入存储系统,保证不会out-of-memory这类的问题吗? > 我个人猜可能有两种方案: > 1.考虑到kafka是一种已经实现持久化的数据源,因此我们并非需要将所有的数据缓存到保存到本地,可以只记录读取到的数据位置,下次读取从记录的位置开始; > 2.定期向文件系统写入数据。 > > > 小学生 <[hidden email]> 于2020年6月10日周三 下午2:48写道: > >> 各位大佬好,请教一个问题: >> 利用kafka作为流式源,注册内部的flink的动态表,如下面的table_ddl ,是否由于'update-mode' = >> 'append',所以随着kafka消息逐步的推送,会导致table_ddl这个表的数据不断在增大,但是我想要的是kafka每推送一条数据,table_ddl就会相应的是这一条数据,请问这个如何实现呢?(或者有没有啥办法,可以直接从table_ddl中取最新的kafka数据呢)。 >> >> >> table_ddl = """ >> CREATE TABLE table_ddl ( >> trck_id VARCHAR >> ) WITH ( >> 'connector.type' = 'kafka', >> 'connector.version' = 'universal', >> 'connector.topic' = 'w', >> 'connector.startup-mode' = 'group-offsets', >> 'connector.properties.group.id' = 'trck_w', >> 'update-mode' = 'append', >> 'connector.properties.zookeeper.connect' = '*', >> 'connector.properties.bootstrap.servers' = '%#', >> 'format.type' = 'json', >> 'format.derive-schema' = 'true' >> ) >> """ |
Administrator
|
In reply to this post by Shengkai Fang
Hi,
我的理解是你想要获取 kafka 里面的最新一条数据,然后就结束? 类似于 kafka 的命令? ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic xxx --max-messages 1 在 Flink 里面表达出来就是 select * from kafka limit 1 的批处理结果,只不过现在这个 query 会一直运行(流模式),不会结束。 Best, Jarrk On Wed, 10 Jun 2020 at 21:44, Mikey <[hidden email]> wrote: > hi,可以取最新的一条数据: > > select id, last_value(value) over (partition by id order by id range > between 1 prodding and current row ) as cur_value from table_ddl > 通过分组分组获取最新的一条数据。 > 具体可参考: > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html > < > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html > > > > > 在 2020年6月10日,下午3:18,方盛凯 <[hidden email]> 写道: > > > > 我认为创建一张表并不会触发从source读取的操作,除非你使用select * from table_ddl这类的操作。 > > 至于表的增大,显然是的。 我觉得你可能关注的是否会flush数据进入存储系统,保证不会out-of-memory这类的问题吗? > > 我个人猜可能有两种方案: > > > 1.考虑到kafka是一种已经实现持久化的数据源,因此我们并非需要将所有的数据缓存到保存到本地,可以只记录读取到的数据位置,下次读取从记录的位置开始; > > 2.定期向文件系统写入数据。 > > > > > > 小学生 <[hidden email]> 于2020年6月10日周三 下午2:48写道: > > > >> 各位大佬好,请教一个问题: > >> 利用kafka作为流式源,注册内部的flink的动态表,如下面的table_ddl ,是否由于'update-mode' = > >> > 'append',所以随着kafka消息逐步的推送,会导致table_ddl这个表的数据不断在增大,但是我想要的是kafka每推送一条数据,table_ddl就会相应的是这一条数据,请问这个如何实现呢?(或者有没有啥办法,可以直接从table_ddl中取最新的kafka数据呢)。 > >> > >> > >> table_ddl = """ > >> CREATE TABLE table_ddl ( > >> trck_id VARCHAR > >> ) WITH ( > >> 'connector.type' = 'kafka', > >> 'connector.version' = 'universal', > >> 'connector.topic' = 'w', > >> 'connector.startup-mode' = 'group-offsets', > >> 'connector.properties.group.id' = 'trck_w', > >> 'update-mode' = 'append', > >> 'connector.properties.zookeeper.connect' = '*', > >> 'connector.properties.bootstrap.servers' = '%#', > >> 'format.type' = 'json', > >> 'format.derive-schema' = 'true' > >> ) > >> """ > > |
Free forum by Nabble | Edit this page |