Flink dual-stream join fails with java.lang.AssertionError


sunfulin
hi,
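I'm using Flink 1.10.1 with the Blink planner. Running the SQL below throws an exception. A and B are both Kafka message streams, and the job uses processing time. If I change the B side of the join from selecting specific columns to select *, the query seems to run, but the fields I get back from B appear to be in the wrong order. Is this a bug?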


select A.recvTime, A.khh, A.live_id, A.fund_code as product_code, A.fund_name as product_name, cast(B.balance as double) as balance
from (
select toLong(behaviorTime, true) as recvTime, user_id,
cast(regexp_extract(btnTitle, 'zbid=\{([^|]*)\}', 1) as int) as live_id,
regexp_extract(btnTitle, 'fundname=\{([^|]*)\}', 1) as fund_name,
regexp_extract(btnTitle, 'fundcode=\{([^|]*)\}', 1) as fund_code, proctime from kafka_zl_etrack_event_stream
where pageId = 'xxxx'
    and eventId = 'click'
    and btnId = 'xxxx'
    and CHARACTER_LENGTH(user_id) > 4
) A
left join
(
select customerNumber, balance, fundCode, lastUpdateTime, proctime
      from lscsp_sc_order_all
       where `status` = '4'
         and businessType IN ('4','5','14','16','17','18')
         and fundCode IS NOT NULL
         and balance IS NOT NULL
         and lastUpdateTime IS NOT NULL
) B
on A.user_id = B.customerNumber and A.fund_code = B.fundCode
group by  A.recvTime, A.user_id, A.live_id, A.fund_code, A.fund_name, cast(B.balance as double)

Exception in thread "main" java.lang.AssertionError
at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.getRootField(SqlToRelConverter.java:4448)
at org.apache.calcite.sql2rel.SqlToRelConverter.adjustInputRef(SqlToRelConverter.java:3765)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:3737)
at org.apache.calcite.sql2rel.SqlToRelConverter.access$2200(SqlToRelConverter.java:217)
at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4796)
at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4092)
at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
at org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.convertExpression(SqlToRelConverter.java:4656)
at org.apache.calcite.sql2rel.StandardConvertletTable.convertCast(StandardConvertletTable.java:522)
at org.apache.calcite.sql2rel.SqlNodeToRexConverterImpl.convertCall(SqlNodeToRexConverterImpl.java:63)

Re: Flink dual-stream join fails with java.lang.AssertionError

sunfulin
hi,
I switched to the latest 1.11 release and ran the same SQL, and no exception was thrown. Is there a related issue for this? I'd like to confirm the cause.

On 2020-07-09 16:53:34, "sunfulin" <[hidden email]> wrote:

Re: Flink dual-stream join fails with java.lang.AssertionError

Jark
Administrator
cc @Danny Chan <[hidden email]>  Danny may know the answer.

On Thu, 9 Jul 2020 at 17:29, sunfulin <[hidden email]> wrote:

Re: Re: Flink dual-stream join fails with java.lang.AssertionError

sunfulin
hi,
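@Danny Chan I did hit this bug in 1.10, and after switching to 1.11 the problem seems to be gone. A brief description of the problem: in the dual-stream join case, the right-hand stream's fields appear to come back in the wrong order when they are read from the join result.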
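
The two symptoms reported in this thread (an AssertionError thrown while Calcite's SqlToRelConverter resolves a column reference, and B's fields coming back in the wrong order) would both be consistent with a column index computed against the wrong row layout. That is an assumption, not something confirmed in the thread. As a rough illustration only, the Java sketch below models how an index into a join row can be mapped back to one of the inputs, assuming the join row is simply A's projected columns followed by B's. The names JoinFieldResolution, Input, exampleJoin and resolve are hypothetical; this is not Calcite's or Flink's actual code, nor the actual fix shipped in 1.11.

```java
import java.util.Arrays;
import java.util.List;

public class JoinFieldResolution {

    /** Hypothetical stand-in for a join input: a name plus its output field names, in order. */
    static final class Input {
        final String name;
        final List<String> fields;

        Input(String name, List<String> fields) {
            this.name = name;
            this.fields = fields;
        }
    }

    /** The two subqueries from the post, reduced to their projected column lists. */
    static List<Input> exampleJoin() {
        Input a = new Input("A", Arrays.asList(
                "recvTime", "user_id", "live_id", "fund_name", "fund_code", "proctime"));
        Input b = new Input("B", Arrays.asList(
                "customerNumber", "balance", "fundCode", "lastUpdateTime", "proctime"));
        return Arrays.asList(a, b);
    }

    /**
     * Maps an index into the concatenated join row back to "input.field"
     * by subtracting each input's width in turn.
     */
    static String resolve(List<Input> inputs, int globalIndex) {
        int offset = globalIndex;
        for (Input input : inputs) {
            if (offset < input.fields.size()) {
                return input.name + "." + input.fields.get(offset);
            }
            offset -= input.fields.size();
        }
        // No input is wide enough: the index was computed against a different row type.
        throw new AssertionError("field index " + globalIndex + " is out of range");
    }

    public static void main(String[] args) {
        List<Input> join = exampleJoin();
        // A has 6 fields, so B.balance (field 1 of B) sits at 6 + 1 = 7.
        System.out.println(resolve(join, 7));
        // An index that is off by one silently resolves to the wrong column.
        System.out.println(resolve(join, 8));
        // An index past the end of the 11-field join row fails outright.
        try {
            resolve(join, 11);
        } catch (AssertionError e) {
            System.out.println("AssertionError: " + e.getMessage());
        }
    }
}
```

Running main prints B.balance, then B.fundCode, then the AssertionError message for index 11: a wrong index either picks a neighbouring column without complaint or overruns the row entirely.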

On 2020-07-13 10:42:12, "Jark Wu" <[hidden email]> wrote: