pyflink读取csv源表时,如何跳过标题行?如何选取特定的列?

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

pyflink读取csv源表时,如何跳过标题行?如何选取特定的列?

洗你的头
尊敬的开发者您好,
我读取csv数据的源代码如下:
t_env.register_table_source("mySource",
                               CsvTableSource(r'data\trip\yellow_tripdata_2014-01.csv',
                               ['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'passenger_count',
                                'trip_distance', 'pickup_longitude', 'pickup_latitude', 'rate_code',
                                'store_and_fwd_flag', 'dropoff_longitude', 'dropoff_latitude',
                                'payment_type', 'fare_amount', 'surcharge', 'mta_tax', 'tip_amount',
                                'tolls_amount', 'total_amount'],                                   
                               [DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.BIGINT(),
                                DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.BIGINT(),
                                DataTypes.STRING(), DataTypes.FLOAT(), DataTypes.FLOAT(),
                                DataTypes.STRING(), DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.FLOAT(),
                                DataTypes.FLOAT(), DataTypes.FLOAT()])
)
我这里使用的CsvTableSource的方法,该如何跳过原数据中的标题行呢?同时我只想读取'pickup_longitude', 'pickup_latitude','dropoff_longitude', 'dropoff_latitude'这四列,该如何操作?
该种方法与 connect的OldCsv和Schema方法有什么区别?如果使用connect的方法应该怎样跳过标题行,并选取特定的列呢?
还是说只能在保存原数据表的时候去掉标题行?
期待您的解答。
Reply | Threaded
Open this post in threaded view
|

Re: pyflink读取csv源表时,如何跳过标题行?如何选取特定的列?

Xingbo Huang
Hi,
1. CsvTableSource的构造方法里面有参数ignore_first_line帮你跳过首行的标题,你可以查看一下。
2.
只想读取那四列应该没办法,主要在于你那几个列不是头部的几个列,比如10列的数据,你要前四列,那是可以的,因为正常读一行数据进来,我解析完前四列就行了,剩下可以不解析,可是要是你的列是1,3,5,7,9这样的,你不指定2,4,6,8列的类型,根本没法帮你把一行的数据给解析出来。

Best,
XIngbo

洗你的头 <[hidden email]> 于2020年10月27日周二 下午2:36写道:

> 尊敬的开发者您好,
> 我读取csv数据的源代码如下:
> t_env.register_table_source("mySource",
>
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;CsvTableSource(r'data\trip\yellow_tripdata_2014-01.csv',
>
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;['vendor_id',&nbsp;'pickup_datetime',&nbsp;'dropoff_datetime',&nbsp;'passenger_count',
>
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;'trip_distance',&nbsp;'pickup_longitude',&nbsp;'pickup_latitude',&nbsp;'rate_code',
>
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;'store_and_fwd_flag',&nbsp;'dropoff_longitude',&nbsp;'dropoff_latitude',
>
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;'payment_type',&nbsp;'fare_amount',&nbsp;'surcharge',&nbsp;'mta_tax',&nbsp;'tip_amount',
>
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;'tolls_amount',&nbsp;'total_amount'],&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[DataTypes.STRING(),&nbsp;DataTypes.STRING(),&nbsp;DataTypes.STRING(),&nbsp;DataTypes.BIGINT(),
>
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;DataTypes.FLOAT(),&nbsp;DataTypes.FLOAT(),&nbsp;DataTypes.FLOAT(),&nbsp;DataTypes.BIGINT(),
>
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;DataTypes.STRING(),&nbsp;DataTypes.FLOAT(),&nbsp;DataTypes.FLOAT(),
>
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;DataTypes.STRING(),&nbsp;DataTypes.FLOAT(),&nbsp;DataTypes.FLOAT(),&nbsp;DataTypes.FLOAT(),&nbsp;DataTypes.FLOAT(),
>
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;DataTypes.FLOAT(),&nbsp;DataTypes.FLOAT()])
> )
>
> 我这里使用的CsvTableSource的方法,该如何跳过原数据中的标题行呢?同时我只想读取'pickup_longitude',&nbsp;'pickup_latitude','dropoff_longitude',&nbsp;'dropoff_latitude'这四列,该如何操作?
> 该种方法与 connect的OldCsv和Schema方法有什么区别?如果使用connect的方法应该怎样跳过标题行,并选取特定的列呢?
> 还是说只能在保存原数据表的时候去掉标题行?
> 期待您的解答。