Hi,

I submitted a batch job with Flink SQL that reads data from Hive. The version is 1.10.0 with the Blink planner, and the Hive data contains MAP-typed columns. The SQL is as follows:

    create view 111 as
    select * from table1
    where event_id = '0103002' and `day`='2020-05-13' and `hour`='13';

    create view view_1 as
    select
      `day`,
      a.rtime as itime,
      a.uid as uid,
      trim(BOTH a.event.log_1['scene']) as refer_list,
      T.s as abflags,
      a.hdid as hdid
    from 111 as a
    left join LATERAL TABLE(splitByChar(trim(BOTH a.event.log_2['abflag']), ',')) as T(s) on true;
    ............................

When the job is submitted, the client reports an error. The message is: scala method error: map......, as shown below:

I then looked at hashCodeForType in CodeGenUtils.scala in the Blink planner and found that it indeed has no case for the MAP type. After I added a line matching MAP myself, the job could be submitted, as shown below:

However, at runtime, during the sort aggregate, the exception "BaseMap not found method 'compareTo'" appeared and the job failed.

Could anyone tell me whether this is a Flink bug, or whether it is caused by incorrect usage on my side?
I can't see the image with the error message. Could you paste it as text instead?
Best regards!
Rui Li
In reply to this post by 邹云鹤
Hi 云鹤,
The image is broken. Also, could you send the complete SQL? (I don't see the aggregation in the SQL you posted, and the exception looks like it is thrown from an aggregation.)

Sorry, this should be a Flink bug.

MAP does not support ORDER BY, but a MAP type can be used in GROUP BY, which means we need to handle both shuffle (hash) and sort aggregation (compare) for the MAP type.

Could you create a JIRA issue to track this?

Best,
Jingsong Lee
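The two requirements named in this reply (a hash for the shuffle, a comparison for sort aggregation) can be illustrated with a small standalone sketch. This is plain Java and not Flink's generated code; `GroupKeyDemo`, `partitionFor`, and `sortAggregateCount` are hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Illustration only: hypothetical helper, not part of Flink.
class GroupKeyDemo {

    // Hash-based shuffle: the target partition is derived from the key's hash,
    // so every grouping key type needs a usable hash code.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    // Sort-based aggregation: sort the keys, then count runs of equal
    // neighbours. This only works for key types that can be compared.
    static <K extends Comparable<K>> Map<K, Integer> sortAggregateCount(List<K> keys) {
        List<K> sorted = new ArrayList<>(keys);
        Collections.sort(sorted);
        Map<K, Integer> counts = new LinkedHashMap<>();
        K previous = null;
        for (K key : sorted) {
            if (previous != null && previous.compareTo(key) == 0) {
                counts.merge(previous, 1, Integer::sum); // same run as before
            } else {
                counts.put(key, 1);                      // a new run starts
                previous = key;
            }
        }
        return counts;
    }
}
```

`java.util.Map` has a `hashCode()` but does not implement `Comparable`, so `partitionFor` accepts a map key while `sortAggregateCount` would not even compile for a list of maps. That is similar in spirit to the two failures in this thread: first a missing hash case, then a missing `compareTo`.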
OK, I'll create an issue in JIRA first. The full details of the problem are below (the images were broken earlier).

The job SQL is:

    create view 111 as
    select * from table1
    where event_id = '0103002' and `day`='2020-05-13' and `hour`='13';

    create view view_1 as
    select
      `day`,
      a.rtime as itime,
      a.uid as uid,
      trim(BOTH a.event.log_1['scene']) as refer_list,
      T.s as abflags,
      a.hdid as hdid,
      a.country as country
    from 111 as a
    left join LATERAL TABLE(splitByChar(trim(BOTH a.event.log_2['abflag']), ',')) as T(s) on true;

    CREATE VIEW view_6 as
    SELECT
      `uid`,
      `refer_list`,
      `abflag`,
      last_value(country)
    FROM view_1
    where `refer_list` in ('WELOG_NEARBY', 'WELOG_FOLLOW', 'WELOG_POPULAR')
    GROUP BY `uid`, `refer_list`, abflag;

On submission, the following exception is thrown:
    org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
    Caused by: java.lang.RuntimeException: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
        at sg.bigo.streaming.sql.StreamingSqlRunner.main(StreamingSqlRunner.java:143)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
        ... 11 more
    Caused by: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
        at org.apache.flink.table.planner.codegen.CodeGenUtils$.hashCodeForType(CodeGenUtils.scala:212)
        at org.apache.flink.table.planner.codegen.HashCodeGenerator$.$anonfun$generateCodeBody$1(HashCodeGenerator.scala:97)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)

Looking at CodeGenUtils.scala, I found that the hashCodeForType method has no case for the MAP type, so I changed one place in the code, as follows:

    def hashCodeForType(
        ctx: CodeGeneratorContext, t: LogicalType, term: String): String = t.getTypeRoot match {
      case BOOLEAN => s"${className[JBoolean]}.hashCode($term)"
      case TINYINT => s"${className[JByte]}.hashCode($term)"
      case SMALLINT => s"${className[JShort]}.hashCode($term)"
      case INTEGER => s"${className[JInt]}.hashCode($term)"
      case BIGINT => s"${className[JLong]}.hashCode($term)"
      case FLOAT => s"${className[JFloat]}.hashCode($term)"
      case DOUBLE => s"${className[JDouble]}.hashCode($term)"
      case VARCHAR | CHAR => s"$term.hashCode()"
      case VARBINARY | BINARY =>
        s"${className[MurmurHashUtil]}.hashUnsafeBytes(" +
          s"$term, $BYTE_ARRAY_BASE_OFFSET, $term.length)"
      case DECIMAL => s"$term.hashCode()"
      case DATE => s"${className[JInt]}.hashCode($term)"
      case TIME_WITHOUT_TIME_ZONE => s"${className[JInt]}.hashCode($term)"
      case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
        s"$term.hashCode()"
      case INTERVAL_YEAR_MONTH => s"${className[JInt]}.hashCode($term)"
      case INTERVAL_DAY_TIME => s"${className[JLong]}.hashCode($term)"
      case ARRAY => throw new IllegalArgumentException(s"Not support type to hash: $t")
      //case MAP => s"${className[BaseMap]}.getHashCode($term)" // the newly added line
      case ROW =>
        val rowType = t.asInstanceOf[RowType]

After that the job could be submitted, but after running for a while another exception appeared:

    java.lang.RuntimeException: Could not instantiate generated class 'HashAggregateWithKeys$1543'
        at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
        at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:46)
        at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:48)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:156)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
        at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
        at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
        at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:65)
        ... 8 more
    Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
        at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
        ... 10 more
    Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
        at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
        at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
        ... 13 more
    Caused by: org.codehaus.commons.compiler.CompileException: Line 459, Column 57: A method named "compareTo" is not declared in any enclosing class nor any supertype, nor through a static import
        at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
        at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8997)
        at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
        at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
        at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
        at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
        at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
After some discussion, using a MAP as an aggregation key is probably not a good idea. The SQL standard does not support it.
Hi,

You could consider working around it with a cast, for example:

    select str_to_map(map_str), ...
    from (select cast(mapCol as varchar) as map_str, ... from inputT)
    group by map_str;

Best,
Jingsong Lee
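The idea behind this workaround, grouping on a text form of the map instead of on the map itself, can be sketched outside of SQL. The following is a minimal standalone Java illustration. It does not reproduce how Flink's `cast(... as varchar)` renders a MAP, and `MapKeyWorkaround`, `canonical`, and `countByMap` are hypothetical names. The sketch sorts the entries by key so that logically equal maps give the same string; whether the string produced by the SQL cast is stable in that way is an assumption worth checking on your own data.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: hypothetical helper, not part of Flink.
class MapKeyWorkaround {

    // Canonical text form: entries ordered by key, so two maps with the
    // same content always produce the same string.
    static String canonical(Map<String, String> map) {
        return new TreeMap<>(map).toString();
    }

    // Group rows by the string form of their map column and count each group.
    // Strings can be both hashed and compared, unlike the maps themselves.
    static Map<String, Integer> countByMap(List<Map<String, String>> rows) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map<String, String> row : rows) {
            counts.merge(canonical(row), 1, Integer::sum);
        }
        return counts;
    }
}
```

Because the grouping key is now a plain string, both hash-based and sort-based aggregation can handle it. The original map can be rebuilt from the string afterwards, which is the role `str_to_map` plays in the SQL above.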
OK, I'll give it a try.
邹云鹤