Flink SQL batch job reading from Hive fails with "scala.MatchError: MAP"

邹云鹤

Hi,
I submitted a batch job with Flink SQL that reads data from Hive. The Flink version is 1.10.0 with the Blink planner, and the Hive data contains MAP-typed columns. The SQL is as follows:
create view 111
as select * from table1 where event_id = '0103002' and `day`='2020-05-13' and `hour`='13';

create view view_1 
as 
select 
`day`,
a.rtime as itime,
a.uid as uid, 
trim(BOTH a.event.log_1['scene']) as refer_list,
T.s as abflags,
a.hdid as hdid
from 111 as a 
left join LATERAL TABLE(splitByChar(trim(BOTH a.event.log_2['abflag']), ',')) as T(s) on true;
............................

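Submitting the job failed on the client with an error along the lines of "scala.MatchError: MAP ..." (the details were attached as an image).

I then looked at hashCodeForType in CodeGenUtils.scala in the Blink planner and found that it indeed has no case for the MAP type. After I added a case for MAP myself, the job could be submitted (the change was attached as an image).
At runtime, however, the SortAggregate step failed with an exception saying that BaseMap has no method 'compareTo', and the job died.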
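
A minimal sketch of why a missing case surfaces as scala.MatchError rather than as a compile error. TypeRoot and hashCodeFor are hypothetical stand-ins for Flink's LogicalTypeRoot and hashCodeForType, not the real classes; only the shape of the match (no MAP case, no default case) is taken from the description above.

```scala
// Hypothetical stand-in for LogicalTypeRoot; only a few type roots are modeled.
object TypeRoot extends Enumeration {
  val INTEGER, VARCHAR, ARRAY, MAP = Value
}

object MatchErrorSketch {
  import TypeRoot._

  // Same shape as the match described above: no case for MAP and no default case.
  def hashCodeFor(root: TypeRoot.Value, term: String): String = root match {
    case INTEGER => s"Integer.hashCode($term)"
    case VARCHAR => s"$term.hashCode()"
    case ARRAY   => throw new IllegalArgumentException(s"Not support type to hash: $root")
  }

  def main(args: Array[String]): Unit = {
    println(hashCodeFor(INTEGER, "f0"))
    // MAP falls through every case, so the match throws scala.MatchError at runtime.
    try hashCodeFor(MAP, "f1")
    catch { case e: MatchError => println(s"MatchError: ${e.getMessage}") }
  }
}
```

Because the scrutinee is an enumeration value rather than a sealed type, the compiler does not check the match for exhaustiveness, so the gap only shows up when a MAP column actually reaches the code generator.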

Could anyone tell me whether this is a Flink bug, or whether I am using something incorrectly?

Re: Flink SQL batch job reading from Hive fails with "scala.MatchError: MAP"

Rui Li
I can't see the image with the error message. Could you paste it as text instead?

On Tue, May 19, 2020 at 11:15 AM 邹云鹤 <[hidden email]> wrote:

--
Best regards!
Rui Li

Re: Flink SQL batch job reading from Hive fails with "scala.MatchError: MAP"

Jingsong Li
In reply to this post by 邹云鹤
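Hi 云鹤,

The image didn't come through. Could you also send the complete SQL? (I don't see any aggregation in the SQL you posted, but the exception looks like it comes from an aggregation.)

Sorry about that, this looks like a Flink bug.

MAP does not support ORDER BY, but a MAP column can be used in GROUP BY, which means we need to handle both the shuffle (hash) and the sort aggregation (compare) for the MAP type.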
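
To illustrate the hash-versus-compare distinction, here is a rough sketch in plain Scala collections (not Flink code). Hash-based grouping only needs hashCode/equals on the key, while sort-based grouping needs a total order. The ordering over key-sorted entries below is an assumption made for the example, since a map has no natural order.

```scala
object HashVsSortGrouping {
  type Row = (Map[String, String], Int) // (grouping key, value)

  // Hash-based grouping: relies only on hashCode/equals of the map key.
  def hashGroupSum(rows: Seq[Row]): Map[Map[String, String], Int] =
    rows.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

  // Assumed total order for the sketch: compare the key-sorted entry lists
  // element by element, then by length.
  implicit val mapOrdering: Ordering[Map[String, String]] =
    new Ordering[Map[String, String]] {
      def compare(a: Map[String, String], b: Map[String, String]): Int = {
        val (xs, ys) = (a.toSeq.sorted, b.toSeq.sorted)
        xs.zip(ys).collectFirst {
          case (x, y) if x != y => Ordering[(String, String)].compare(x, y)
        }.getOrElse(xs.length.compare(ys.length))
      }
    }

  // Sort-based grouping: sort by key, then merge adjacent rows with equal keys.
  def sortGroupSum(rows: Seq[Row]): Seq[Row] =
    rows.sortBy(_._1).foldLeft(List.empty[Row]) {
      case ((k, acc) :: tail, (key, v)) if k == key => (k, acc + v) :: tail
      case (groups, row)                            => row :: groups
    }.reverse
}
```

Without the implicit ordering, the sortBy call does not compile, which is the plain-Scala analogue of the generated code failing to find compareTo on the map type.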

Could you create a JIRA issue to track this?

Best,
Jingsong Lee

Re: Flink SQL batch job reading from Hive fails with "scala.MatchError: MAP"

邹云鹤
OK, I'll go create an issue in JIRA first. Here are the full details of the problem (the images didn't come through earlier).

The job SQL is:
create view 111
as select * from table1 where event_id = '0103002' and `day`='2020-05-13'
and `hour`='13';

create view view_1
as
select
`day`,
a.rtime as itime,
a.uid as uid,
trim(BOTH a.event.log_1['scene']) as refer_list,
T.s as abflags,
a.hdid as hdid,
a.country as country
from 111 as a
left join LATERAL TABLE(splitByChar(trim(BOTH a.event.log_2['abflag']),
',')) as T(s) on true;

CREATE VIEW view_6 as 
 SELECT
`uid`,
`refer_list`,
`abflag`,
        last_value(country)
 FROM view_1
 where `refer_list` in ('WELOG_NEARBY', 'WELOG_FOLLOW', 'WELOG_POPULAR')
 GROUP BY  `uid`, `refer_list`, abflag;

At submission, the following exception is thrown:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.RuntimeException: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
        at sg.bigo.streaming.sql.StreamingSqlRunner.main(StreamingSqlRunner.java:143)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
        ... 11 more
Caused by: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
        at org.apache.flink.table.planner.codegen.CodeGenUtils$.hashCodeForType(CodeGenUtils.scala:212)
        at org.apache.flink.table.planner.codegen.HashCodeGenerator$.$anonfun$generateCodeBody$1(HashCodeGenerator.scala:97)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)

Looking at CodeGenUtils.scala, I found that hashCodeForType has no case for the MAP type, so I changed one place in the code, as follows:
 def hashCodeForType(
      ctx: CodeGeneratorContext, t: LogicalType, term: String): String = t.getTypeRoot match {
    case BOOLEAN => s"${className[JBoolean]}.hashCode($term)"
    case TINYINT => s"${className[JByte]}.hashCode($term)"
    case SMALLINT => s"${className[JShort]}.hashCode($term)"
    case INTEGER => s"${className[JInt]}.hashCode($term)"
    case BIGINT => s"${className[JLong]}.hashCode($term)"
    case FLOAT => s"${className[JFloat]}.hashCode($term)"
    case DOUBLE => s"${className[JDouble]}.hashCode($term)"
    case VARCHAR | CHAR => s"$term.hashCode()"
    case VARBINARY | BINARY => s"${className[MurmurHashUtil]}.hashUnsafeBytes(" +
      s"$term, $BYTE_ARRAY_BASE_OFFSET, $term.length)"
    case DECIMAL => s"$term.hashCode()"
    case DATE => s"${className[JInt]}.hashCode($term)"
    case TIME_WITHOUT_TIME_ZONE => s"${className[JInt]}.hashCode($term)"
    case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
      s"$term.hashCode()"
    case INTERVAL_YEAR_MONTH => s"${className[JInt]}.hashCode($term)"
    case INTERVAL_DAY_TIME => s"${className[JLong]}.hashCode($term)"
    case ARRAY => throw new IllegalArgumentException(s"Not support type to hash: $t")
    //case MAP => s"${className[BaseMap]}.getHashCode($term)"                            // newly added line
    case ROW =>
      val rowType = t.asInstanceOf[RowType]


With that change the job could be submitted, but after running for a while another exception occurred:
java.lang.RuntimeException: Could not instantiate generated class 'HashAggregateWithKeys$1543'
        at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
        at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:46)
        at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:48)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:156)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
        at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
        at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
        at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:65)
        ... 8 more
Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
        at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
        ... 10 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
        at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
        at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
        ... 13 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 459, Column 57: A method named "compareTo" is not declared in any enclosing class nor any supertype, nor through a static import
        at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
        at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8997)
        at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
        at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
        at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
        at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
        at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)


On May 19, 2020 at 11:31, [hidden email] wrote:

Re: Flink SQL batch job reading from Hive fails with "scala.MatchError: MAP"

Jingsong Li
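After some discussion, using a MAP as an aggregation key is probably not a good idea. The SQL standard does not support it.

Hi, you could work around it with a cast, for example: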
select str_to_map(map_str), ... from (select cast(mapCol as varchar) as
map_str, ... from inputT) group by map_str;
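
The idea behind this workaround can be sketched in plain Scala (not Flink API): turn the map into a string, group on the string, and rebuild the map afterwards. mapToStr and strToMap are hypothetical helpers standing in for the cast and str_to_map; the "k=v" entries joined by "," are an assumed text format, and the text actually produced by the cast in Flink may differ.

```scala
object MapKeyAsString {
  // Deterministic text form: entries sorted by key, "k=v" joined by ",".
  // Assumes keys and values contain neither ',' nor '='.
  def mapToStr(m: Map[String, String]): String =
    m.toSeq.sortBy(_._1).map { case (k, v) => s"$k=$v" }.mkString(",")

  // Inverse of mapToStr, playing the role that str_to_map plays in the query.
  def strToMap(s: String): Map[String, String] =
    if (s.isEmpty) Map.empty
    else s.split(",").map { kv =>
      val Array(k, v) = kv.split("=", 2)
      k -> v
    }.toMap

  // Group on the string key, then rebuild the map for the output rows.
  def countByMap(rows: Seq[Map[String, String]]): Map[Map[String, String], Int] =
    rows.groupBy(mapToStr).map { case (key, group) => strToMap(key) -> group.size }
}
```

The sorting step matters: two equal maps must always produce the same string, otherwise they end up in different groups.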

Best,
Jingsong Lee

On Tue, May 19, 2020 at 12:09 PM 邹云鹤 <[hidden email]> wrote:

Re: Flink SQL batch job reading from Hive fails with "scala.MatchError: MAP"

邹云鹤
OK, I'll give it a try.


On May 19, 2020 at 12:44, Jingsong Li <[hidden email]> wrote: