kafka相关问题

classic Classic list List threaded Threaded
14 messages Options
Reply | Threaded
Open this post in threaded view
|

kafka相关问题

咿咿呀呀
各位大佬好,请教一个问题:
利用kafka作为流式源,注册内部的flink的动态表,如下面的table_ddl ,是否由于'update-mode' = 'append',所以随着kafka消息逐步的推送,会导致table_ddl这个表的数据不断在增大,但是我想要的是kafka每推送一条数据,table_ddl就会相应的是这一条数据,请问这个如何实现呢?(或者有没有啥办法,可以直接从table_ddl中取最新的kafka数据呢)。


table_ddl = """
CREATE TABLE table_ddl (
 trck_id VARCHAR
) WITH (
 'connector.type' = 'kafka',
 'connector.version' = 'universal',    
 'connector.topic' = 'w',    
 'connector.startup-mode' = 'group-offsets',
 'connector.properties.group.id' = 'trck_w', 
 'update-mode' = 'append',
 'connector.properties.zookeeper.connect' = '*',
 'connector.properties.bootstrap.servers' = '%#',
 'format.type' = 'json',     
 'format.derive-schema' = 'true' 
)
"""
Reply | Threaded
Open this post in threaded view
|

Re: kafka相关问题

Shengkai Fang
我认为创建一张表并不会触发从source读取的操作,除非你使用select * from table_ddl这类的操作。
至于表的增大,显然是的。 我觉得你可能关注的是否会flush数据进入存储系统,保证不会out-of-memory这类的问题吗?
我个人猜可能有两种方案:
1.考虑到kafka是一种已经实现持久化的数据源,因此我们并非需要将所有的数据缓存到保存到本地,可以只记录读取到的数据位置,下次读取从记录的位置开始;
2.定期向文件系统写入数据。


小学生 <[hidden email]> 于2020年6月10日周三 下午2:48写道:

> 各位大佬好,请教一个问题:
> 利用kafka作为流式源,注册内部的flink的动态表,如下面的table_ddl&nbsp;,是否由于'update-mode' =
> 'append',所以随着kafka消息逐步的推送,会导致table_ddl这个表的数据不断在增大,但是我想要的是kafka每推送一条数据,table_ddl就会相应的是这一条数据,请问这个如何实现呢?(或者有没有啥办法,可以直接从table_ddl中取最新的kafka数据呢)。
>
>
> table_ddl = """
> CREATE TABLE table_ddl&nbsp;(
> &nbsp;trck_id VARCHAR
> ) WITH (
> &nbsp;'connector.type' = 'kafka',
> &nbsp;'connector.version' = 'universal',&nbsp; &nbsp;&nbsp;
> &nbsp;'connector.topic' = 'w',&nbsp; &nbsp;&nbsp;
> &nbsp;'connector.startup-mode' = 'group-offsets',
> &nbsp;'connector.properties.group.id' = 'trck_w',&nbsp;
> &nbsp;'update-mode' = 'append',
> &nbsp;'connector.properties.zookeeper.connect' = '*',
> &nbsp;'connector.properties.bootstrap.servers' = '%#',
> &nbsp;'format.type' = 'json',&nbsp; &nbsp; &nbsp;
> &nbsp;'format.derive-schema' = 'true'&nbsp;
> )
> """
Reply | Threaded
Open this post in threaded view
|

Re: kafka相关问题

咿咿呀呀
您好,我是通过select * from table_ddl这个去触发的,但是就是因为table_ddl的不断增大,脱离我的业务需要,我业务需要的就是触发select * from table_ddl这个时候,只取最新的一条数据(就是保证table_ddl表总只有一条数据)
Reply | Threaded
Open this post in threaded view
|

Re: kafka相关问题

Shengkai Fang
那你可以调整sql语句,例如使用limit关键字, topN 或使用自定义的方法,具体的你可以根据你的sql语句不断调整.
如有错误欢迎指正

小学生 <[hidden email]> 于2020年6月10日周三 下午3:26写道:

> 您好,我是通过select * from
> table_ddl这个去触发的,但是就是因为table_ddl的不断增大,脱离我的业务需要,我业务需要的就是触发select * from
> table_ddl这个时候,只取最新的一条数据(就是保证table_ddl表总只有一条数据)
Reply | Threaded
Open this post in threaded view
|

Re: kafka相关问题

咿咿呀呀
limit 没有用呀。有没有切实可行的方案呢,pyflink下。
Reply | Threaded
Open this post in threaded view
|

Re: kafka相关问题

Shengkai Fang
那你有没有尝试过修改connector中property中connector.startup-mode
设置为latest-offset,这样子每次从kafka读取都是读取最新的消息。
另外,我想问一下 你的sql是一直运行的吗?
我给的limit方案是一个upersert流。

小学生 <[hidden email]> 于2020年6月10日周三 下午5:31写道:

> limit 没有用呀。有没有切实可行的方案呢,pyflink下。
Reply | Threaded
Open this post in threaded view
|

Re: kafka相关问题

咿咿呀呀
改为最新的table_ddl表的容量也是不断上升的;select * from table_ddl这个是一直开着的
Reply | Threaded
Open this post in threaded view
|

Re: kafka相关问题

Jingsong Li
In reply to this post by Shengkai Fang
Hi, 小学生

你可以仔细描述下你的业务场景吗?然后再描述下问题,没懂到底是想要什么。

Best,
Jingsong Lee

On Wed, Jun 10, 2020 at 3:46 PM 方盛凯 <[hidden email]> wrote:

> 那你可以调整sql语句,例如使用limit关键字, topN 或使用自定义的方法,具体的你可以根据你的sql语句不断调整.
> 如有错误欢迎指正
>
> 小学生 <[hidden email]> 于2020年6月10日周三 下午3:26写道:
>
> > 您好,我是通过select * from
> > table_ddl这个去触发的,但是就是因为table_ddl的不断增大,脱离我的业务需要,我业务需要的就是触发select * from
> > table_ddl这个时候,只取最新的一条数据(就是保证table_ddl表总只有一条数据)
>


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: kafka相关问题

咿咿呀呀
业务场景是我需要实时的kafka一条消息记录,从里边取一个字段的值,根据这个值去mysql的表中去筛选出这个值下的记录数,类似与where的条件,所以每次我只希望能得到kafka实时消息里的记录值。
kcz
Reply | Threaded
Open this post in threaded view
|

回复:kafka相关问题

kcz
你这个表达,实时kafka的一条记录,你要最新的那个是吧,你最新的判断标准是什么?根据什么特性来,表达清楚一点哇。





------------------ 原始邮件 ------------------
发件人: 小学生 <[hidden email]&gt;
发送时间: 2020年6月10日 18:15
收件人: user-zh <[hidden email]&gt;
主题: 回复:kafka相关问题



业务场景是我需要实时的kafka一条消息记录,从里边取一个字段的值,根据这个值去mysql的表中去筛选出这个值下的记录数,类似与where的条件,所以每次我只希望能得到kafka实时消息里的记录值。
Reply | Threaded
Open this post in threaded view
|

Re: kafka相关问题

咿咿呀呀
In reply to this post by Jingsong Li
最新一条的标准不就是kafka实时推送的那一条吗,推送时间
Reply | Threaded
Open this post in threaded view
|

Re: kafka相关问题

359502980@qq.com
In reply to this post by Shengkai Fang
hi,可以取最新的一条数据:

select id, last_value(value) over (partition by id order by id range between 1 prodding and current row ) as cur_value  from table_ddl
通过分组分组获取最新的一条数据。
具体可参考:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html>

> 在 2020年6月10日,下午3:18,方盛凯 <[hidden email]> 写道:
>
> 我认为创建一张表并不会触发从source读取的操作,除非你使用select * from table_ddl这类的操作。
> 至于表的增大,显然是的。 我觉得你可能关注的是否会flush数据进入存储系统,保证不会out-of-memory这类的问题吗?
> 我个人猜可能有两种方案:
> 1.考虑到kafka是一种已经实现持久化的数据源,因此我们并非需要将所有的数据缓存到保存到本地,可以只记录读取到的数据位置,下次读取从记录的位置开始;
> 2.定期向文件系统写入数据。
>
>
> 小学生 <[hidden email]> 于2020年6月10日周三 下午2:48写道:
>
>> 各位大佬好,请教一个问题:
>> 利用kafka作为流式源,注册内部的flink的动态表,如下面的table_ddl&nbsp;,是否由于'update-mode' =
>> 'append',所以随着kafka消息逐步的推送,会导致table_ddl这个表的数据不断在增大,但是我想要的是kafka每推送一条数据,table_ddl就会相应的是这一条数据,请问这个如何实现呢?(或者有没有啥办法,可以直接从table_ddl中取最新的kafka数据呢)。
>>
>>
>> table_ddl = """
>> CREATE TABLE table_ddl&nbsp;(
>> &nbsp;trck_id VARCHAR
>> ) WITH (
>> &nbsp;'connector.type' = 'kafka',
>> &nbsp;'connector.version' = 'universal',&nbsp; &nbsp;&nbsp;
>> &nbsp;'connector.topic' = 'w',&nbsp; &nbsp;&nbsp;
>> &nbsp;'connector.startup-mode' = 'group-offsets',
>> &nbsp;'connector.properties.group.id' = 'trck_w',&nbsp;
>> &nbsp;'update-mode' = 'append',
>> &nbsp;'connector.properties.zookeeper.connect' = '*',
>> &nbsp;'connector.properties.bootstrap.servers' = '%#',
>> &nbsp;'format.type' = 'json',&nbsp; &nbsp; &nbsp;
>> &nbsp;'format.derive-schema' = 'true'&nbsp;
>> )
>> """

Reply | Threaded
Open this post in threaded view
|

Re: kafka相关问题

Jark
Administrator
In reply to this post by Shengkai Fang
Hi,

我的理解是你想要获取 kafka 里面的最新一条数据,然后就结束?
类似于 kafka 的命令?
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic xxx
--max-messages 1

在 Flink 里面表达出来就是 select * from kafka limit 1 的批处理结果,只不过现在这个 query
会一直运行(流模式),不会结束。

Best,
Jarrk

On Wed, 10 Jun 2020 at 21:44, Mikey <[hidden email]> wrote:

> hi,可以取最新的一条数据:
>
> select id, last_value(value) over (partition by id order by id range
> between 1 prodding and current row ) as cur_value  from table_ddl
> 通过分组分组获取最新的一条数据。
> 具体可参考:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html
> >
>
> > 在 2020年6月10日,下午3:18,方盛凯 <[hidden email]> 写道:
> >
> > 我认为创建一张表并不会触发从source读取的操作,除非你使用select * from table_ddl这类的操作。
> > 至于表的增大,显然是的。 我觉得你可能关注的是否会flush数据进入存储系统,保证不会out-of-memory这类的问题吗?
> > 我个人猜可能有两种方案:
> >
> 1.考虑到kafka是一种已经实现持久化的数据源,因此我们并非需要将所有的数据缓存到保存到本地,可以只记录读取到的数据位置,下次读取从记录的位置开始;
> > 2.定期向文件系统写入数据。
> >
> >
> > 小学生 <[hidden email]> 于2020年6月10日周三 下午2:48写道:
> >
> >> 各位大佬好,请教一个问题:
> >> 利用kafka作为流式源,注册内部的flink的动态表,如下面的table_ddl&nbsp;,是否由于'update-mode' =
> >>
> 'append',所以随着kafka消息逐步的推送,会导致table_ddl这个表的数据不断在增大,但是我想要的是kafka每推送一条数据,table_ddl就会相应的是这一条数据,请问这个如何实现呢?(或者有没有啥办法,可以直接从table_ddl中取最新的kafka数据呢)。
> >>
> >>
> >> table_ddl = """
> >> CREATE TABLE table_ddl&nbsp;(
> >> &nbsp;trck_id VARCHAR
> >> ) WITH (
> >> &nbsp;'connector.type' = 'kafka',
> >> &nbsp;'connector.version' = 'universal',&nbsp; &nbsp;&nbsp;
> >> &nbsp;'connector.topic' = 'w',&nbsp; &nbsp;&nbsp;
> >> &nbsp;'connector.startup-mode' = 'group-offsets',
> >> &nbsp;'connector.properties.group.id' = 'trck_w',&nbsp;
> >> &nbsp;'update-mode' = 'append',
> >> &nbsp;'connector.properties.zookeeper.connect' = '*',
> >> &nbsp;'connector.properties.bootstrap.servers' = '%#',
> >> &nbsp;'format.type' = 'json',&nbsp; &nbsp; &nbsp;
> >> &nbsp;'format.derive-schema' = 'true'&nbsp;
> >> )
> >> """
>
>
Reply | Threaded
Open this post in threaded view
|

Re: kafka相关问题

咿咿呀呀
谢谢Mikey大佬的解答,我试试看