Re: Suspected ParquetTableSource Filter Pushdown bug

jun su
Adding the code as text:
// Imports added for completeness; they assume Flink 1.9/1.10 with the Scala Table API
// and the flink-parquet / parquet-avro modules on the classpath.
import org.apache.flink.api.scala._
import org.apache.flink.formats.parquet.ParquetTableSource
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object ParquetFilterPushdownRepro { // object name is illustrative, not from the original post

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tableEnv = StreamTableEnvironment.create(env)

    // Avro schema of the Parquet files (every field is nullable)
    val schema = "{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"log_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"log_from\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"data_source\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"is_scan\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"result\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timelong\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"is_sec\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"time_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"device\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"occur_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"row_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"

    // Build the Parquet source from the Avro schema, converted to a Parquet MessageType
    val parquetTableSource: ParquetTableSource = ParquetTableSource
      .builder
      .forParquetSchema(new org.apache.parquet.avro.AvroSchemaConverter().convert(
        org.apache.avro.Schema.parse(schema, true)))
      .path("/Users/sujun/Documents/tmp/login_data")
      .build

    tableEnv.registerTableSource("source", parquetTableSource)

    // First filter (on city), applied directly to the Parquet source
    val t1 = tableEnv.sqlQuery("select log_id,city from source where city = '274' ")
    tableEnv.registerTable("t1", t1)

    // Second filter (on log_id) on top of t1; note that t4 is never used, t1 is what gets printed
    val t4 = tableEnv.sqlQuery("select * from t1 where log_id='5927070661978133'")
    t1.toAppendStream[Row].print()

    env.execute()
  }
}

On Wed, Jan 8, 2020 at 4:59 PM jun su <[hidden email]> wrote:
Hi,
While using ParquetTableSource I ran into a problem that looks like a bug in ParquetTableSource's filter pushdown. Here are the code and a description:

[image: 1578473593933.jpg]

While debugging, I found that execution gets stuck in org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp: its while (true) loop never exits, until the whole program runs out of memory (OOM).

[image: 1.jpg]

After I removed the filter pushdown code from ParquetTableSource, the main program runs. I suspect that Calcite's optimizer never exits while it iterates to find the lowest-cost plan.
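
A toy model of that suspicion, for illustration only: it is plain Scala with no Flink or Calcite dependencies, and Scan, PushdownToy, applyRule, explore, respectGuard and maxSteps are all hypothetical names, not Calcite's VolcanoPlanner or Flink's actual pushdown rule. The idea it sketches is that a rule-driven planner stops once rule applications stop producing expressions it has not seen before. If a filter pushdown rule can fire again on its own output, and each output looks new (a different digest), the search never reaches that fixpoint. Flink's legacy FilterableTableSource interface has an isFilterPushedDown() method which, as we understand it, exists so that the pushdown rule fires only once per source; respectGuard mimics that check.

```scala
import scala.collection.mutable

// Hypothetical scan node that may carry pushed-down predicates (toy model, not Flink code).
final case class Scan(predicates: List[String], filterPushedDown: Boolean) {
  // The digest identifies a node in the memo: equal digests are treated as the same node.
  def digest: String = {
    val preds = predicates.mkString(",")
    s"Scan(preds=$preds, pushed=$filterPushedDown)"
  }
}

object PushdownToy {
  // Hypothetical pushdown rule. With respectGuard = true it fires only while the scan has
  // not had filters pushed yet; with respectGuard = false it fires on every scan and appends
  // the predicate again, so every application yields a node with a new digest.
  def applyRule(s: Scan, predicate: String, respectGuard: Boolean): Option[Scan] =
    if (respectGuard && s.filterPushedDown) None
    else Some(Scan(s.predicates :+ predicate, filterPushedDown = true))

  // Explores until no unseen digests remain (a fixpoint) or until maxSteps dequeues.
  // Returns the number of distinct nodes registered in the memo.
  def explore(predicate: String, respectGuard: Boolean, maxSteps: Int): Int = {
    val memo = mutable.LinkedHashSet.empty[String]
    val queue = mutable.Queue(Scan(Nil, filterPushedDown = false))
    var steps = 0
    while (queue.nonEmpty && steps < maxSteps) {
      val node = queue.dequeue()
      // Only nodes never seen before are registered and get the rule applied to them.
      if (memo.add(node.digest)) {
        applyRule(node, predicate, respectGuard).foreach(queue.enqueue(_))
      }
      steps += 1
    }
    memo.size
  }
}
```

With the guard, the memo stops at two nodes (the original scan and the scan with the pushed-down predicate). Without it, every step registers one more node, and only the artificial maxSteps cap, which exists just so the toy terminates, stops the loop; that unbounded growth resembles the endless loop and OOM described above.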

Kurt Young
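If the optimizer gets stuck and never exits, that is almost certainly a bug. Please open an issue and upload this information there; we will investigate what is causing it.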

Best,
Kurt


On Wed, Jan 8, 2020 at 5:12 PM jun su <[hidden email]> wrote:

> 添加代码文字:
>
> def main(args: Array[String]): Unit = {
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val tableEnv = StreamTableEnvironment.create(env)
>
>     val schema = "{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"log_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"log_from\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"data_source\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"is_scan\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"result\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timelong\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"is_sec\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"time_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"device\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"occur_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"row_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"
>     val parquetTableSource: ParquetTableSource = ParquetTableSource
>             .builder
>             .forParquetSchema(new org.apache.parquet.avro.AvroSchemaConverter().convert(
>                 org.apache.avro.Schema.parse(schema, true)))
>             .path("/Users/sujun/Documents/tmp/login_data")
>             .build
>
>     tableEnv.registerTableSource("source",parquetTableSource)
>
>
>     val t1 = tableEnv.sqlQuery("select log_id,city from source where city = '274' ")
>     tableEnv.registerTable("t1",t1)
>
>     val t4 = tableEnv.sqlQuery("select * from t1 where log_id='5927070661978133'")
>     t1.toAppendStream[Row].print()
>
>     env.execute()
>
> }
>
>
> jun su <[hidden email]> 于2020年1月8日周三 下午4:59写道:
>
>> 你好:
>>        我在使用ParquetTableSource时, 发现一些问题, 疑似是ParquetTableSource Filter
>> Pushdown的Bug, 以下是代码和描述:
>>
>> [image: 1578473593933.jpg]
>>
>> debug发现,
>> 代码卡在了: org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp方法, while
>> true循环一直出不来, 知道整合程序OOM
>>
>> [image: 1.jpg]
>>
>> 将ParquetTableSource的filter pushdown代码去掉后 , 主程序可以执行.
>> 怀疑是calcite的优化器在迭代找代价最小的plan时一直无法退出导致的
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: 疑似ParquetTableSource Filter Pushdown bug

jun su
I've created the issue: https://issues.apache.org/jira/browse/FLINK-15563

On Wed, Jan 8, 2020 at 5:33 PM Kurt Young <[hidden email]> wrote: