pyflink 如何正确设置高速度?(如何提速)

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

pyflink 如何正确设置高速度?(如何提速)

洗你的头
尊敬的开发者您好,
我的需求是这样的,
拥有数据:
现拥有两个表,一个表为出租车起点的经纬度坐标(13782492行),另一个表为交叉口的经纬度坐标(4000多行,每个坐标具备一个id,从0开始的id)
需要做什么?
将将一千多万的起点坐标匹配到距离最近的交叉口上去,返回该匹配的id,设置了一个距离阈值为100m,如果据最近的交叉口仍超过100m,则返回-1。
我现在的代码如下:
import pandas as pd
import numpy as np
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf
import os
import time
# 环境等设置,目前使用的并行数为1,batchsize为10万(我不知道这个有没有用)

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
t_env.get_config().get_configuration().set_string("python.fn-execution.arrow.batch.size", '100000')
# 输出表创建
if os.path.exists('output'):
    os.remove('output')

t_env.connect(FileSystem().path('output')) \
    .with_format(OldCsv()
                 .field('id', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('id', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')
# 交叉口经纬度数据读取
data = pd.read_csv(r'D:\大论文\项目代码\data\trip\graph_data.csv')
coor_o = pd.DataFrame(dict(zip(data['O_ID'], zip(data['O_X'], data['O_Y'])))).T
coor_d = pd.DataFrame(dict(zip(data['D_ID'], zip(data['D_X'], data['D_Y'])))).T
coor = coor_o.append(coor_d).drop_duplicates()
coor.columns = ['lng', 'lat']
coor = coor.sort_index()
coor = coor.to_numpy()
# udf编写与注册



@udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(),
     DataTypes.ARRAY(DataTypes.FLOAT()), DataTypes.ARRAY(DataTypes.FLOAT())], result_type=DataTypes.BIGINT())
def distance_meters(lng1, lat1, lng2=coor[:, 0], lat2=coor[:, 1]):
    temp = (np.sin((lng2-lng1)/2*np.pi/180)**2+ 
        +np.cos(lng1*np.pi/180)*np.cos(lng2*np.pi/180)*np.sin((lat2-lat1)/2*np.pi/180)**2)
    distance = 2*np.arctan2(np.sqrt(temp), np.sqrt(1-temp))
    distance = distance*3958.8*1609.344

    buffer=100
&nbsp;&nbsp;&nbsp; if&nbsp;(distance <=&nbsp;buffer).sum() &gt;&nbsp;0:
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return&nbsp;distance.argmin()
&nbsp;&nbsp;&nbsp; else:
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return&nbsp;-1
# 出行起点数据读取

df =&nbsp;pd.read_csv(r'data\trip\yellow_tripdata_2014-01.csv')
use_data =&nbsp;df[['pickup_longitude', 'pickup_latitude']]
# 处理流程
t_env.from_pandas(use_data) \
&nbsp;&nbsp;&nbsp;&nbsp; .select("distance_meters(pickup_longitude, pickup_latitude)") \
&nbsp;&nbsp;&nbsp;&nbsp; .insert_into('mySink')
# 执行与计时

start_time =&nbsp;time.time()
t_env.execute("tutorial_job")
print(time.time() -&nbsp;start_time)
我电脑的CPU为12核24线程










目前处理一千多万数据所耗费的时间为2607秒(43分钟),我不知道为什么要花这么长的时间,按理来说即使设置并行数为1,批大小为10万,应该要比这个快很多吧.. 我尝试了一下设置并行数为8,但是返现结果会变为8个文件,我就打断了,没有运行完(我需要保持原表的输入顺序,该怎么做呢)
请问,我这种情况应该如何去提速呢?可否解释一下batch.size?
期待您的回答,感谢!
Reply | Threaded
Open this post in threaded view
|

Re: pyflink 如何正确设置高速度?(如何提速)

Xingbo Huang
Hi,

1. from_pandas性能不太好的,不是用在生产上的。你可以直接用flink的csv的connector来读取你的数据呀。
2. arrow.batch.size,表示的是会把多少条数据变成一个pandas.series,然后作为你的udf的一个列传给你

Best,
Xingbo

洗你的头 <[hidden email]> 于2020年10月26日周一 下午4:32写道:

> 尊敬的开发者您好,
> 我的需求是这样的,
> 拥有数据:
> 现拥有两个表,一个表为出租车起点的经纬度坐标(13782492行),另一个表为交叉口的经纬度坐标(4000多行,每个坐标具备一个id,从0开始的id)
> 需要做什么?
> 将将一千多万的起点坐标匹配到距离最近的交叉口上去,返回该匹配的id,设置了一个距离阈值为100m,如果据最近的交叉口仍超过100m,则返回-1。
> 我现在的代码如下:
> import&nbsp;pandas as&nbsp;pd
> import&nbsp;numpy as&nbsp;np
> from&nbsp;pyflink.datastream import&nbsp;StreamExecutionEnvironment
> from&nbsp;pyflink.table import&nbsp;StreamTableEnvironment, DataTypes
> from&nbsp;pyflink.table.descriptors import&nbsp;Schema, OldCsv, FileSystem
> from&nbsp;pyflink.table.udf import&nbsp;udf
> import&nbsp;os
> import&nbsp;time
> # 环境等设置,目前使用的并行数为1,batchsize为10万(我不知道这个有没有用)
>
> env =&nbsp;StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> t_env =&nbsp;StreamTableEnvironment.create(env)
> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
> '80m')
> t_env.get_config().get_configuration().set_string("python.fn-execution.arrow.batch.size",
> '100000')
> # 输出表创建
> if&nbsp;os.path.exists('output'):
> &nbsp;&nbsp;&nbsp; os.remove('output')
>
> t_env.connect(FileSystem().path('output')) \
> &nbsp;&nbsp;&nbsp; .with_format(OldCsv()
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .field('id', DataTypes.BIGINT())) \
> &nbsp;&nbsp;&nbsp; .with_schema(Schema()
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> .field('id', DataTypes.BIGINT())) \
> &nbsp;&nbsp;&nbsp; .create_temporary_table('mySink')
> # 交叉口经纬度数据读取
> data =&nbsp;pd.read_csv(r'D:\大论文\项目代码\data\trip\graph_data.csv')
> coor_o =&nbsp;pd.DataFrame(dict(zip(data['O_ID'], zip(data['O_X'],
> data['O_Y'])))).T
> coor_d =&nbsp;pd.DataFrame(dict(zip(data['D_ID'], zip(data['D_X'],
> data['D_Y'])))).T
> coor =&nbsp;coor_o.append(coor_d).drop_duplicates()
> coor.columns =&nbsp;['lng', 'lat']
> coor =&nbsp;coor.sort_index()
> coor =&nbsp;coor.to_numpy()
> # udf编写与注册
>
>
>
> @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(),
> &nbsp;&nbsp;&nbsp;&nbsp; DataTypes.ARRAY(DataTypes.FLOAT()),
> DataTypes.ARRAY(DataTypes.FLOAT())], result_type=DataTypes.BIGINT())
> def&nbsp;distance_meters(lng1, lat1, lng2=coor[:, 0], lat2=coor[:, 1]):
> &nbsp;&nbsp;&nbsp; temp =&nbsp;(np.sin((lng2-lng1)/2*np.pi/180)**2+&nbsp;
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> +np.cos(lng1*np.pi/180)*np.cos(lng2*np.pi/180)*np.sin((lat2-lat1)/2*np.pi/180)**2)
> &nbsp;&nbsp;&nbsp; distance =&nbsp;2*np.arctan2(np.sqrt(temp),
> np.sqrt(1-temp))
> &nbsp;&nbsp;&nbsp; distance =&nbsp;distance*3958.8*1609.344
>
> &nbsp;&nbsp;&nbsp; buffer=100
> &nbsp;&nbsp;&nbsp; if&nbsp;(distance <=&nbsp;buffer).sum() &gt;&nbsp;0:
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return&nbsp;distance.argmin()
> &nbsp;&nbsp;&nbsp; else:
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return&nbsp;-1
> # 出行起点数据读取
>
> df =&nbsp;pd.read_csv(r'data\trip\yellow_tripdata_2014-01.csv')
> use_data =&nbsp;df[['pickup_longitude', 'pickup_latitude']]
> # 处理流程
> t_env.from_pandas(use_data) \
> &nbsp;&nbsp;&nbsp;&nbsp; .select("distance_meters(pickup_longitude,
> pickup_latitude)") \
> &nbsp;&nbsp;&nbsp;&nbsp; .insert_into('mySink')
> # 执行与计时
>
> start_time =&nbsp;time.time()
> t_env.execute("tutorial_job")
> print(time.time() -&nbsp;start_time)
> 我电脑的CPU为12核24线程
>
>
>
>
>
>
>
>
>
>
> 目前处理一千多万数据所耗费的时间为2607秒(43分钟),我不知道为什么要花这么长的时间,按理来说即使设置并行数为1,批大小为10万,应该要比这个快很多吧..
> 我尝试了一下设置并行数为8,但是返现结果会变为8个文件,我就打断了,没有运行完(我需要保持原表的输入顺序,该怎么做呢)
> 请问,我这种情况应该如何去提速呢?可否解释一下batch.size?
> 期待您的回答,感谢!
Reply | Threaded
Open this post in threaded view
|

回复: pyflink 如何正确设置高速度?(如何提速)

洗你的头
感谢您的解答,原来from_pandas的性能会差点哦,我明天会改一下读取的方式

然后我尝试了设置并行数为8,使用400万数据测试了一下,env.set_parallelism(8),400万的数据耗时耗时12分钟,感觉是比之前快了点


1.但是结果是在output的文件夹内生成8个文件,但是只有文件1有数据,这样是正常的吗?检查了一下,好像顺序没有改变,与原顺序一致,怎样设置可以将其按照原顺序保存为1个文件呢?


2.arrow.batch.size的意思经过您的细心解答我理解了,那么增大arrow.batch.size也是可以加快处理速度吗?


3.我应该如何确定该使用多大的并行数和多大arrow.batch.size呢?还是说这是一个经验的做法?需要多次尝试?


4.我的电脑是12核24线程的CPU,如果我不设置并行数,那么默认就是并行数12吗?

最后,再次感谢您的细心解答,祝您工作顺利,身体健康!我的问题可能比较多,并且比较初级,真的十分感谢您能细心回答,对我的帮助太大了。


------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2020年10月26日(星期一) 晚上8:47
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: pyflink 如何正确设置高速度?(如何提速)



Hi,

1. from_pandas性能不太好的,不是用在生产上的。你可以直接用flink的csv的connector来读取你的数据呀。
2. arrow.batch.size,表示的是会把多少条数据变成一个pandas.series,然后作为你的udf的一个列传给你

Best,
Xingbo

洗你的头 <[hidden email]&gt; 于2020年10月26日周一 下午4:32写道:

&gt; 尊敬的开发者您好,
&gt; 我的需求是这样的,
&gt; 拥有数据:
&gt; 现拥有两个表,一个表为出租车起点的经纬度坐标(13782492行),另一个表为交叉口的经纬度坐标(4000多行,每个坐标具备一个id,从0开始的id)
&gt; 需要做什么?
&gt; 将将一千多万的起点坐标匹配到距离最近的交叉口上去,返回该匹配的id,设置了一个距离阈值为100m,如果据最近的交叉口仍超过100m,则返回-1。
&gt; 我现在的代码如下:
&gt; import&amp;nbsp;pandas as&amp;nbsp;pd
&gt; import&amp;nbsp;numpy as&amp;nbsp;np
&gt; from&amp;nbsp;pyflink.datastream import&amp;nbsp;StreamExecutionEnvironment
&gt; from&amp;nbsp;pyflink.table import&amp;nbsp;StreamTableEnvironment, DataTypes
&gt; from&amp;nbsp;pyflink.table.descriptors import&amp;nbsp;Schema, OldCsv, FileSystem
&gt; from&amp;nbsp;pyflink.table.udf import&amp;nbsp;udf
&gt; import&amp;nbsp;os
&gt; import&amp;nbsp;time
&gt; # 环境等设置,目前使用的并行数为1,batchsize为10万(我不知道这个有没有用)
&gt;
&gt; env =&amp;nbsp;StreamExecutionEnvironment.get_execution_environment()
&gt; env.set_parallelism(1)
&gt; t_env =&amp;nbsp;StreamTableEnvironment.create(env)
&gt; t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
&gt; '80m')
&gt; t_env.get_config().get_configuration().set_string("python.fn-execution.arrow.batch.size",
&gt; '100000')
&gt; # 输出表创建
&gt; if&amp;nbsp;os.path.exists('output'):
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; os.remove('output')
&gt;
&gt; t_env.connect(FileSystem().path('output')) \
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; .with_format(OldCsv()
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; .field('id', DataTypes.BIGINT())) \
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; .with_schema(Schema()
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; .field('id', DataTypes.BIGINT())) \
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; .create_temporary_table('mySink')
&gt; # 交叉口经纬度数据读取
&gt; data =&amp;nbsp;pd.read_csv(r'D:\大论文\项目代码\data\trip\graph_data.csv')
&gt; coor_o =&amp;nbsp;pd.DataFrame(dict(zip(data['O_ID'], zip(data['O_X'],
&gt; data['O_Y'])))).T
&gt; coor_d =&amp;nbsp;pd.DataFrame(dict(zip(data['D_ID'], zip(data['D_X'],
&gt; data['D_Y'])))).T
&gt; coor =&amp;nbsp;coor_o.append(coor_d).drop_duplicates()
&gt; coor.columns =&amp;nbsp;['lng', 'lat']
&gt; coor =&amp;nbsp;coor.sort_index()
&gt; coor =&amp;nbsp;coor.to_numpy()
&gt; # udf编写与注册
&gt;
&gt;
&gt;
&gt; @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(),
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; DataTypes.ARRAY(DataTypes.FLOAT()),
&gt; DataTypes.ARRAY(DataTypes.FLOAT())], result_type=DataTypes.BIGINT())
&gt; def&amp;nbsp;distance_meters(lng1, lat1, lng2=coor[:, 0], lat2=coor[:, 1]):
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; temp =&amp;nbsp;(np.sin((lng2-lng1)/2*np.pi/180)**2+&amp;nbsp;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; +np.cos(lng1*np.pi/180)*np.cos(lng2*np.pi/180)*np.sin((lat2-lat1)/2*np.pi/180)**2)
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; distance =&amp;nbsp;2*np.arctan2(np.sqrt(temp),
&gt; np.sqrt(1-temp))
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; distance =&amp;nbsp;distance*3958.8*1609.344
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; buffer=100
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; if&amp;nbsp;(distance <=&amp;nbsp;buffer).sum() &amp;gt;&amp;nbsp;0:
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; return&amp;nbsp;distance.argmin()
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; else:
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; return&amp;nbsp;-1
&gt; # 出行起点数据读取
&gt;
&gt; df =&amp;nbsp;pd.read_csv(r'data\trip\yellow_tripdata_2014-01.csv')
&gt; use_data =&amp;nbsp;df[['pickup_longitude', 'pickup_latitude']]
&gt; # 处理流程
&gt; t_env.from_pandas(use_data) \
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; .select("distance_meters(pickup_longitude,
&gt; pickup_latitude)") \
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; .insert_into('mySink')
&gt; # 执行与计时
&gt;
&gt; start_time =&amp;nbsp;time.time()
&gt; t_env.execute("tutorial_job")
&gt; print(time.time() -&amp;nbsp;start_time)
&gt; 我电脑的CPU为12核24线程
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt; 目前处理一千多万数据所耗费的时间为2607秒(43分钟),我不知道为什么要花这么长的时间,按理来说即使设置并行数为1,批大小为10万,应该要比这个快很多吧..
&gt; 我尝试了一下设置并行数为8,但是返现结果会变为8个文件,我就打断了,没有运行完(我需要保持原表的输入顺序,该怎么做呢)
&gt; 请问,我这种情况应该如何去提速呢?可否解释一下batch.size?
&gt; 期待您的回答,感谢!
Reply | Threaded
Open this post in threaded view
|

Re: pyflink 如何正确设置高速度?(如何提速)

Xingbo Huang
Hi,
>>>
1.但是结果是在output的文件夹内生成8个文件,但是只有文件1有数据,这样是正常的吗?检查了一下,好像顺序没有改变,与原顺序一致,怎样设置可以将其按照原顺序保存为1个文件呢?
flink的table作业目前没法单独为每一个算子设置并发度,所以你设置并发度为8,就会输出8个文件。我觉得你这数据量不大,本质还是from_pandas的问题,你先把它换了,先用一个并发度玩就行。

>>> 2.arrow.batch.size的意思经过您的细心解答我理解了,那么增大arrow.batch.size也是可以加快处理速度吗?
其实跑pandas
udf的模型是有一个java进程和对应一个Python进程,你的udf在Python进程跑着,数据从Java进程批量发送过去,一次发送多少数据是由python.fn-execution.bundle.size这个配置控制的,对于pandas
udf来说,因为需要把这个数据组织成pandas.series,所以还会有这个配置python.fn-execution.arrow.batch.size。举个例子就是说比如python.fn-execution.bundle.size=6,python.fn-execution.arrow.batch.size=2,那么就是我就会把6条数据,组织称3个pandas.series一次性发送到Python进程,一个pandas.series会调用一次的pandas
udf。所以这里就是调用3次。很明显了,你提高arrow.batch.size的好处是,一个是你组成的pandas.series的数量会更少,很显然每个pandas.series都是有meta信息放在数据头部,越少的pandas.series,那么你传送的数据少一点,通信开销会少一点,另一个好处是你调用udf的次数会减少。当然了你的python.fn-execution.arrow.batch.size是没法超过python.fn-execution.bundle.size,至于说不断增大python.fn-execution.bundle.size是不是就一定是好的也不一定,太大显然你是要buffer数据的,会增大延迟的,而且这时候python进程是空闲的,没有充分调度起来。
关于这两个参数的配置你可以参考文档[1]

>>> 3.我应该如何确定该使用多大的并行数和多大arrow.batch.size呢?还是说这是一个经验的做法?需要多次尝试?
调节参数大小,你要根据你具体作业来调节。一般来说我们提供的默认值都是较优的,不需要调节。

>>> 4.我的电脑是12核24线程的CPU,如果我不设置并行数,那么默认就是并行数12吗?
你这应该是24个。你可以通过env.get_parallelism()拿到

我说的可能有点多,希望对你有所帮助。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_config.html#python-options

Best,
Xingbo



洗你的头 <[hidden email]> 于2020年10月26日周一 下午11:04写道:

> 感谢您的解答,原来from_pandas的性能会差点哦,我明天会改一下读取的方式
>
>
> 然后我尝试了设置并行数为8,使用400万数据测试了一下,env.set_parallelism(8),400万的数据耗时耗时12分钟,感觉是比之前快了点
>
>
>
> 1.但是结果是在output的文件夹内生成8个文件,但是只有文件1有数据,这样是正常的吗?检查了一下,好像顺序没有改变,与原顺序一致,怎样设置可以将其按照原顺序保存为1个文件呢?
>
>
> 2.arrow.batch.size的意思经过您的细心解答我理解了,那么增大arrow.batch.size也是可以加快处理速度吗?
>
>
> 3.我应该如何确定该使用多大的并行数和多大arrow.batch.size呢?还是说这是一个经验的做法?需要多次尝试?
>
>
> 4.我的电脑是12核24线程的CPU,如果我不设置并行数,那么默认就是并行数12吗?
>
> 最后,再次感谢您的细心解答,祝您工作顺利,身体健康!我的问题可能比较多,并且比较初级,真的十分感谢您能细心回答,对我的帮助太大了。
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:
>                                                   "user-zh"
>                                                                     <
> [hidden email]&gt;;
> 发送时间:&nbsp;2020年10月26日(星期一) 晚上8:47
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;Re: pyflink 如何正确设置高速度?(如何提速)
>
>
>
> Hi,
>
> 1. from_pandas性能不太好的,不是用在生产上的。你可以直接用flink的csv的connector来读取你的数据呀。
> 2. arrow.batch.size,表示的是会把多少条数据变成一个pandas.series,然后作为你的udf的一个列传给你
>
> Best,
> Xingbo
>
> 洗你的头 <[hidden email]&gt; 于2020年10月26日周一 下午4:32写道:
>
> &gt; 尊敬的开发者您好,
> &gt; 我的需求是这样的,
> &gt; 拥有数据:
> &gt;
> 现拥有两个表,一个表为出租车起点的经纬度坐标(13782492行),另一个表为交叉口的经纬度坐标(4000多行,每个坐标具备一个id,从0开始的id)
> &gt; 需要做什么?
> &gt;
> 将将一千多万的起点坐标匹配到距离最近的交叉口上去,返回该匹配的id,设置了一个距离阈值为100m,如果据最近的交叉口仍超过100m,则返回-1。
> &gt; 我现在的代码如下:
> &gt; import&amp;nbsp;pandas as&amp;nbsp;pd
> &gt; import&amp;nbsp;numpy as&amp;nbsp;np
> &gt; from&amp;nbsp;pyflink.datastream
> import&amp;nbsp;StreamExecutionEnvironment
> &gt; from&amp;nbsp;pyflink.table import&amp;nbsp;StreamTableEnvironment,
> DataTypes
> &gt; from&amp;nbsp;pyflink.table.descriptors import&amp;nbsp;Schema,
> OldCsv, FileSystem
> &gt; from&amp;nbsp;pyflink.table.udf import&amp;nbsp;udf
> &gt; import&amp;nbsp;os
> &gt; import&amp;nbsp;time
> &gt; # 环境等设置,目前使用的并行数为1,batchsize为10万(我不知道这个有没有用)
> &gt;
> &gt; env =&amp;nbsp;StreamExecutionEnvironment.get_execution_environment()
> &gt; env.set_parallelism(1)
> &gt; t_env =&amp;nbsp;StreamTableEnvironment.create(env)
> &gt;
> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
> &gt; '80m')
> &gt;
> t_env.get_config().get_configuration().set_string("python.fn-execution.arrow.batch.size",
> &gt; '100000')
> &gt; # 输出表创建
> &gt; if&amp;nbsp;os.path.exists('output'):
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; os.remove('output')
> &gt;
> &gt; t_env.connect(FileSystem().path('output')) \
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; .with_format(OldCsv()
> &gt;
> &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; .field('id', DataTypes.BIGINT())) \
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; .with_schema(Schema()
> &gt;
> &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; .field('id', DataTypes.BIGINT())) \
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; .create_temporary_table('mySink')
> &gt; # 交叉口经纬度数据读取
> &gt; data =&amp;nbsp;pd.read_csv(r'D:\大论文\项目代码\data\trip\graph_data.csv')
> &gt; coor_o =&amp;nbsp;pd.DataFrame(dict(zip(data['O_ID'], zip(data['O_X'],
> &gt; data['O_Y'])))).T
> &gt; coor_d =&amp;nbsp;pd.DataFrame(dict(zip(data['D_ID'], zip(data['D_X'],
> &gt; data['D_Y'])))).T
> &gt; coor =&amp;nbsp;coor_o.append(coor_d).drop_duplicates()
> &gt; coor.columns =&amp;nbsp;['lng', 'lat']
> &gt; coor =&amp;nbsp;coor.sort_index()
> &gt; coor =&amp;nbsp;coor.to_numpy()
> &gt; # udf编写与注册
> &gt;
> &gt;
> &gt;
> &gt; @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(),
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> DataTypes.ARRAY(DataTypes.FLOAT()),
> &gt; DataTypes.ARRAY(DataTypes.FLOAT())], result_type=DataTypes.BIGINT())
> &gt; def&amp;nbsp;distance_meters(lng1, lat1, lng2=coor[:, 0],
> lat2=coor[:, 1]):
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; temp
> =&amp;nbsp;(np.sin((lng2-lng1)/2*np.pi/180)**2+&amp;nbsp;
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt;
> +np.cos(lng1*np.pi/180)*np.cos(lng2*np.pi/180)*np.sin((lat2-lat1)/2*np.pi/180)**2)
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; distance
> =&amp;nbsp;2*np.arctan2(np.sqrt(temp),
> &gt; np.sqrt(1-temp))
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; distance
> =&amp;nbsp;distance*3958.8*1609.344
> &gt;
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; buffer=100
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; if&amp;nbsp;(distance
> <=&amp;nbsp;buffer).sum() &amp;gt;&amp;nbsp;0:
> &gt;
> &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> return&amp;nbsp;distance.argmin()
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; else:
> &gt;
> &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> return&amp;nbsp;-1
> &gt; # 出行起点数据读取
> &gt;
> &gt; df =&amp;nbsp;pd.read_csv(r'data\trip\yellow_tripdata_2014-01.csv')
> &gt; use_data =&amp;nbsp;df[['pickup_longitude', 'pickup_latitude']]
> &gt; # 处理流程
> &gt; t_env.from_pandas(use_data) \
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> .select("distance_meters(pickup_longitude,
> &gt; pickup_latitude)") \
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; .insert_into('mySink')
> &gt; # 执行与计时
> &gt;
> &gt; start_time =&amp;nbsp;time.time()
> &gt; t_env.execute("tutorial_job")
> &gt; print(time.time() -&amp;nbsp;start_time)
> &gt; 我电脑的CPU为12核24线程
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> 目前处理一千多万数据所耗费的时间为2607秒(43分钟),我不知道为什么要花这么长的时间,按理来说即使设置并行数为1,批大小为10万,应该要比这个快很多吧..
> &gt; 我尝试了一下设置并行数为8,但是返现结果会变为8个文件,我就打断了,没有运行完(我需要保持原表的输入顺序,该怎么做呢)
> &gt; 请问,我这种情况应该如何去提速呢?可否解释一下batch.size?
> &gt; 期待您的回答,感谢!