Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF


sunfulin
Hi, guys
I am running a Flink 1.10 SQL job with an UpsertStreamSink to Elasticsearch. In my SQL logic, I use a UDF named ts2Date to format date fields from the stream. However, when I add `env.enableCheckpointing(time)`, the job fails to submit and throws the exception below. This is really weird, because when I remove the UDF the job submits successfully. Any suggestion is highly appreciated. My SQL logic is:



INSERT INTO realtime_product_sell
select U.sor_pty_id,
       U.entrust_date,
       U.entrust_time,
       U.product_code,
       U.business_type,
       sum(cast(U.balance as double)) as balance,
       COALESCE(C.cust_name, '--') as cust_name,
       COALESCE(C.open_comp_name, '--') AS open_comp_name,
       COALESCE(C.open_comp_id, '--') as open_comp_id,
       COALESCE(C.org_name,'--') as org_name,
       COALESCE(C.org_id,'--') as org_id,
       COALESCE(C.comp_name, '--') AS comp_name,
       COALESCE(C.comp_id,'--') as comp_id,
       COALESCE(C.mng_name,'--') as mng_name,
       COALESCE(C.mng_id,'--') as mng_id,
       COALESCE(C.is_tg,'--') as is_tg,
       COALESCE(C.cust_type,'--') as cust_type,
       COALESCE(C.avg_tot_aset_y365, 0.00) as avg_tot_aset_y365,
       COALESCE(C.avg_aset_create_y, 0.00) as avg_aset_create_y
from
(select customerNumber as sor_pty_id,
        ts2Date(`lastUpdateTime`, true) as entrust_date,      -- the UDF
        ts2Date(`lastUpdateTime`, false) as entrust_time,     -- the UDF
        fundCode as product_code,
        businessType as business_type,
        balance,
        proctime
      from lscsp_sc_order_all where fundCode in ('007118','007117') and businessType in ('5') ) as U
left join
dim_app_cust_info FOR SYSTEM_TIME AS OF U.proctime AS C
on U.sor_pty_id = C.cust_id
group by sor_pty_id,
        entrust_date,
        entrust_time,
        product_code,
        business_type,
        COALESCE(C.cust_name, '--'),
        COALESCE(C.open_comp_name, '--'),
        COALESCE(C.open_comp_id, '--'),
        COALESCE(C.org_name,'--'),
        COALESCE(C.org_id,'--'),
        COALESCE(C.comp_name, '--'),
        COALESCE(C.comp_id,'--'),
        COALESCE(C.mng_name,'--'),
        COALESCE(C.mng_id,'--'),
        COALESCE(C.is_tg,'--'),
        COALESCE(C.cust_type,'--'),
        COALESCE(C.avg_tot_aset_y365, 0.00),
        COALESCE(C.avg_aset_create_y, 0.00)
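
For reference, the job is wired up roughly like this (a simplified sketch: the source/dimension table definitions and the Elasticsearch upsert sink are omitted, and the checkpoint interval, the insertSql variable, and the job name are just placeholders):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L);   // adding this line is what breaks the submission

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

tableEnv.registerFunction("ts2Date", new ts2Date());   // the UDF used in the query
tableEnv.sqlUpdate(insertSql);                         // the INSERT INTO statement above
tableEnv.execute("realtime_product_sell");

And the exception: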




2020-03-01 17:22:06,504 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled exception.
org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
at org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57)
at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
... 16 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
... 19 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 30: Cannot determine simple type name "com"
at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$24.getType(UnitCompiler.java:8184)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6786)
at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6412)
at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6407)
at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4299)
at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768)
at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410)
at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407)
at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213)
at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
... 25 more


Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

Benchao Li
Hi fulin,

It seems like a bug in the code generation.

Could you provide us with more information?
1. Which planner are you using, the Blink planner or the legacy planner?
2. How do you register your UDF?
3. Does this have any relation to checkpointing? What happens if you enable
checkpointing but do not use your UDF, and if you disable checkpointing and use the UDF?

--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]

Re:Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

sunfulin
Hi Benchao,
Thanks for the reply.

Could you provide us with more information?
1. Which planner are you using, the Blink planner or the legacy planner?
I am using the Blink planner. I have not tested with the legacy planner, because my program depends on a lot of new features of the Blink planner.
2. How do you register your UDF?
Just this code: tableEnv.registerFunction("ts2Date", new ts2Date()); where tableEnv is a StreamTableEnvironment.
3. Does this have any relation to checkpointing? What happens if you enable checkpointing but do not use your UDF, and if you disable checkpointing and use the UDF?
I don't think this is related to checkpointing itself. If I enable checkpointing and do not use the UDF, I see no exception and the job submits successfully. If I disable checkpointing and use the UDF, the job also submits successfully. Only the combination of checkpointing plus the UDF fails.

I have dug into this exception quite a bit. Maybe it is related to some classloader issue. Hoping for your suggestions.
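
To summarize the combinations I tried:

    checkpointing | UDF used | result
    --------------+----------+----------------------------------------
    enabled       | no       | submits successfully
    disabled      | yes      | submits successfully
    enabled       | yes      | fails: "Table program cannot be compiled"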

Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

Benchao Li
Could you show how your UDF `ts2Date` is implemented?


Re:Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

sunfulin
Below is the code. The function transforms the original timeStr field, e.g. "2020-03-01 12:01:00.234", into the target time string according to dayTag.



import org.apache.flink.table.functions.ScalarFunction;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class ts2Date extends ScalarFunction {

    public ts2Date() {
    }

    public String eval(String timeStr, boolean dayTag) {
        if (timeStr == null) {
            return null;
        }
        // Parse the source timestamp, e.g. "2020-03-01 12:01:00.234".
        SimpleDateFormat ortSf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        Date date;
        try {
            date = ortSf.parse(timeStr);
        } catch (ParseException e) {
            e.printStackTrace();
            return null;
        }
        if (dayTag) {
            // Day granularity, e.g. "2020-03-01".
            SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
            return sf.format(date);
        } else {
            // Minute-truncated timestamp with a fixed +0800 offset,
            // e.g. "2020-03-01T12:01:00.000+0800".
            SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:00.000+0800");
            return sf.format(date);
        }
    }
}
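
For example (assuming the input parses), ts2Date("2020-03-01 12:01:00.234", true) returns "2020-03-01", and ts2Date("2020-03-01 12:01:00.234", false) returns "2020-03-01T12:01:00.000+0800".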







At 2020-03-01 18:14:30, "Benchao Li" <[hidden email]> wrote:

Could you show how your UDF `ts2Date` is implemented?


sunfulin <[hidden email]> 于2020年3月1日周日 下午6:05写道:

Hi, Benchao,
Thanks for the reply.


Could you provide us more information?
1. what planner are you using? blink or legacy planner?
I am using Blink Planner. Not test with legacy planner because my program depend a lot of new feature based on blink planner.
2. how do you register your UDF?
Just use the code :  tableEnv.registerFunction ("ts2Date", new ts2Date());    tableEnv is a StreamTableEnvironment.
3. does this has a relation with checkpointing? what if you enable checkpointing and not use your udf? and disable checkpointing and use udf?
I don't think this is related with checkpoint. If I enable checkpointing and not use my udf, I did not see any exception and submit job successfully. If I disable checkpointing and use udf, the job can submit successfully too.


I dive a lot with this exception. Maybe it is related with some classloader issue. Hope for your suggestion.












Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

Benchao Li
The UDF looks good. Could you also paste your DDL? Then we can reproduce your bug easily.



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]

Re:Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

sunfulin



CREATE TABLE realtime_product_sell (
  sor_pty_id varchar,
  entrust_date varchar,
  entrust_time varchar,
  product_code varchar,
  business_type varchar,
  balance double,
  cust_name varchar,
  open_comp_name varchar,
  open_comp_id varchar,
  org_name varchar,
  org_id varchar,
  comp_name varchar,
  comp_id varchar,
  mng_name varchar,
  mng_id varchar,
  is_tg varchar,
  cust_type varchar,
  avg_tot_aset_y365 double,
  avg_aset_create_y double
) WITH (
  'connector.type' = 'elasticsearch',
  'connector.version' = '<version>',
  'connector.hosts' = '<host_port>',
  'connector.index' = 'realtime_product_sell_007118',
  'connector.document-type' = '_doc',
  'update-mode' = 'upsert',
  'connector.key-delimiter' = '$',
  'connector.key-null-literal' = 'n/a',
  'connector.bulk-flush.interval' = '1000',
  'format.type' = 'json'
)
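Assuming the job wires this DDL in the way Flink 1.10 supports for DDL strings, a minimal sketch would look like the following (variable names are illustrative and not from the original job; insertSql stands for the INSERT INTO statement from earlier in the thread):

// Hypothetical wiring, not the original job code:
String sinkDdl = "CREATE TABLE realtime_product_sell ( ... ) WITH ( ... )"; // the DDL above
tableEnv.sqlUpdate(sinkDdl);    // registers the Elasticsearch upsert sink in the catalog
tableEnv.sqlUpdate(insertSql);  // the INSERT INTO realtime_product_sell SELECT ...
tableEnv.execute("realtime_product_sell_job");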











At 2020-03-01 21:08:08, "Benchao Li" <[hidden email]> wrote:

>The UDF looks good. Could you also paste your DDL? Then we can reproduce your bug easily.

Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

Benchao Li
Could you also provide us the DDLs for lscsp_sc_order_all
and dim_app_cust_info?

sunfulin <[hidden email]> wrote on Sun, Mar 1, 2020 at 9:22 PM:

>
> *CREATE TABLE **realtime_product_sell *(
>   sor_pty_id *varchar*,
>   entrust_date *varchar*,
>   entrust_time *varchar*,
>   product_code *varchar *,
>   business_type *varchar *,
>   balance *double *,
>   cust_name *varchar *,
>   open_comp_name *varchar *,
>   open_comp_id *varchar *,
>   org_name *varchar *,
>   org_id *varchar *,
>   comp_name *varchar *,
>   comp_id *varchar *,
>   mng_name *varchar *,
>   mng_id *varchar *,
>   is_tg *varchar *,
>   cust_type *varchar *,
>   avg_tot_aset_y365 *double *,
>   avg_aset_create_y
> *double*) *WITH *(
> *'connector.type' *= *'elasticsearch'*,
> *'connector.version' *= *'<version>'*,
> *'connector.hosts' *= *'<host_port>'*,
> *'connector.index' *= *'realtime_product_sell_007118'*,
> *'connector.document-type' *= *'_doc'*,
> *'update-mode' *= *'upsert'*,
> *'connector.key-delimiter' *= *'$'*,
> *'connector.key-null-literal' *= *'n/a'*,
> *'connector.bulk-flush.interval' *= *'1000'*,
> *'format.type' *=
> *'json'*)
> >>>> 2020-03-01 17:22:06,504 ERROR
> >>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled
> >>>> exception.
> >>>> org.apache.flink.util.FlinkRuntimeException:
> >>>> org.apache.flink.api.common.InvalidProgramException: Table program cannot
> >>>> be compiled. This is a bug. Please file an issue.
> >>>> at
> >>>> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
> >>>> at
> >>>> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
> >>>> at
> >>>> org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
> >>>> at
> >>>> org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
> >>>> at
> >>>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
> >>>> at
> >>>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
> >>>> at
> >>>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
> >>>> at
> >>>> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
> >>>> at
> >>>> org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
> >>>> at
> >>>> org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
> >>>> at
> >>>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57)
> >>>> at
> >>>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
> >>>> at
> >>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
> >>>> at
> >>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> >>>> at
> >>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >>>> at
> >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >>>> at java.lang.Thread.run(Thread.java:748)
> >>>> Caused by:
> >>>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
> >>>> org.apache.flink.api.common.InvalidProgramException: Table program cannot
> >>>> be compiled. This is a bug. Please file an issue.
> >>>> at
> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
> >>>> at
> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> >>>> at
> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> >>>> at
> >>>> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
> >>>> ... 16 more
> >>>> Caused by: org.apache.flink.api.common.InvalidProgramException: Table
> >>>> program cannot be compiled. This is a bug. Please file an issue.
> >>>> at
> >>>> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
> >>>> at
> >>>> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
> >>>> at
> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
> >>>> at
> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
> >>>> at
> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
> >>>> at
> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
> >>>> at
> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
> >>>> ... 19 more
> >>>> Caused by: org.codehaus.commons.compiler.CompileException: Line 17,
> >>>> Column 30: Cannot determine simple type name "com"
> >>>> at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> >>>> at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
> >>>> at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
> >>>> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
> >>>> at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
> >>>> at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
> >>>> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
> >>>> at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
> >>>> at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
> >>>> at org.codehaus.janino.UnitCompiler$24.getType(UnitCompiler.java:8184)
> >>>> at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6786)
> >>>> at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:215)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6412)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6407)
> >>>> at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4299)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
> >>>> at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
> >>>> at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
> >>>> at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
> >>>> at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768)
> >>>> at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407)
> >>>> at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
> >>>> at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
> >>>> at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
> >>>> at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
> >>>> at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
> >>>> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
> >>>> at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
> >>>> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
> >>>> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
> >>>> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
> >>>> at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
> >>>> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
> >>>> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
> >>>> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
> >>>> at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
> >>>> at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
> >>>> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
> >>>> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
> >>>> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
> >>>> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> >>>> at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> >>>> at
> >>>> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
> >>>> at
> >>>> org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
> >>>> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
> >>>> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
> >>>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> >>>> at
> >>>> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> >>>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> >>>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> >>>> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> >>>> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> >>>> at
> >>>> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
> >>>> ... 25 more
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>
> >>>
> >>> --
> >>>
> >>> Benchao Li
> >>> School of Electronics Engineering and Computer Science, Peking University
> >>> Tel:+86-15650713730
> >>> Email: [hidden email]; [hidden email]
> >>>
> >>>
> >>>
> >>>
> >>>
> >>
> >>
> >> --
> >>
> >> Benchao Li
> >> School of Electronics Engineering and Computer Science, Peking University
> >> Tel:+86-15650713730
> >> Email: [hidden email]; [hidden email]
> >>
> >>
> >>
> >>
> >>
> >
> >
> >--
> >
> >Benchao Li
> >School of Electronics Engineering and Computer Science, Peking University
> >Tel:+86-15650713730
> >Email: [hidden email]; [hidden email]
>
>
>
>
>


--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]

Re: Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

sunfulin

create table lscsp_sc_order_all (
  amount varchar  ,
  argType varchar,
  balance varchar,
  branchNo varchar  ,
  businessType varchar ,
  channelType varchar ,
  counterOrderNo varchar  ,
  counterRegisteredDate varchar,
  custAsset varchar  ,
  customerNumber varchar,
  customerType varchar,
  discountId varchar,
  doubleRecordFlag varchar,
  doubleRecordType varchar,
  exceedFlag varchar,
  fundAccount varchar,
  fundCode varchar,
  fundCompany varchar,
  fundName varchar,
  fundRecruitmentFlag varchar,
  id varchar,
  lastUpdateTime varchar,
  opBranchNo varchar,
  opStation varchar,
  orderNo varchar,
  orgEntrustNo varchar,
  orgOrderNo varchar,
  prodId varchar,
  prodInvestorType varchar,
  prodLeafType varchar,
  prodRisk varchar,
  prodRiskFlag varchar,
  prodRootType varchar,
  prodTerm varchar,
  prodVariety varchar,
  quaInvestorFlag varchar,
  quaInvestorSource varchar,
  quickPurchaseFlag varchar,
  remark varchar,
  remark1 varchar,
  remark2 varchar,
  remark3 varchar,
  riskFlag varchar,
  scRcvTime varchar,
  scSendTime varchar,
  signId varchar,
  signSpecialRiskFlag varchar,
  source varchar,
  status varchar,
  subRiskFlag varchar,
  sysNodeId varchar,
  taSerialNo varchar,
  termFlag varchar,
  token varchar,
  tradeConfirmDate varchar,
  transFundCode varchar,
  transProdId varchar,
  varietyFlag varchar,
  zlcftProdType varchar,
  proctime as PROCTIME()   -- computed column that provides a processing-time attribute
)
with
(
  'connector.type' = 'kafka',  -- use the kafka connector
  'connector.version' = '0.10',  -- kafka version; 'universal' supports 0.11 and above
  'connector.topic' = '<topic>',  -- kafka topic
  'connector.startup-mode' = 'group-offsets',  -- resume from the group's committed offsets
  'connector.properties.zookeeper.connect' = '<zk_connect>',  -- zookeeper address
  'connector.properties.bootstrap.servers' = '<broker_server>',  -- kafka broker address
  'connector.properties.group.id' = 'acrm-realtime-saleorder-consumer-1',
  'format.type' = 'json'  -- source format is json
)

CREATE TABLE dim_app_cust_info (
    cust_id varchar ,
    open_comp_name varchar ,
    open_comp_id varchar ,
    org_name varchar ,
    org_id varchar,
    comp_name varchar ,
    comp_id varchar ,
    mng_name varchar ,
    mng_id varchar ,
    is_tg varchar ,
    cust_name varchar ,
    cust_type varchar,
    avg_tot_aset_y365 double ,
    avg_aset_create_y double
) WITH (
'connector.type' = 'jdbc',
'connector.url' = '<jdbc_url>',
'connector.table' = 'app_cust_serv_rel_info',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'admin',
'connector.password' = 'Windows7',
'connector.lookup.cache.max-rows' = '8000',
'connector.lookup.cache.ttl' = '30min',
'connector.lookup.max-retries' = '3'
)
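
For completeness, the driver wires these together roughly as follows. This is
a minimal sketch, not the exact production code: the checkpoint interval is
illustrative, and the String constants stand for the DDL/DML statements
already posted in this thread.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class SaleOrderJob {

    // Stand-ins for the statements shown above and earlier in the thread.
    private static final String SOURCE_DDL = "create table lscsp_sc_order_all (...)";
    private static final String DIM_DDL    = "CREATE TABLE dim_app_cust_info (...)";
    private static final String SINK_DDL   = "CREATE TABLE realtime_product_sell (...)";
    private static final String INSERT_SQL = "INSERT INTO realtime_product_sell select ...";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);  // adding this line is what makes submission fail

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // the UDF registration described earlier in the thread
        tableEnv.registerFunction("ts2Date", new ts2Date());

        tableEnv.sqlUpdate(SOURCE_DDL);
        tableEnv.sqlUpdate(DIM_DDL);
        tableEnv.sqlUpdate(SINK_DDL);
        tableEnv.sqlUpdate(INSERT_SQL);

        tableEnv.execute("acrm-realtime-saleorder");
    }
}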

At 2020-03-02 09:16:05, "Benchao Li" <[hidden email]> wrote:

>Could you also provide us the DDLs for lscsp_sc_order_all
>and dim_app_cust_info?

Re: Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

Benchao Li
Hi fulin,

I cannot reproduce your exception on current master using your SQLs. I
searched the error message; this issue[1] looks similar to yours, but the
current compile util does not seem to have that problem.

BTW, are you using 1.10?

[1] https://issues.apache.org/jira/browse/FLINK-7490
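
The "Cannot determine simple type name "com"" at the bottom of your stack
trace is the usual Janino symptom when generated code references a class that
the compiling classloader cannot resolve. Below is a minimal sketch of that
failure mode; the UDF package name is hypothetical, and the classloader setup
only approximates what Flink's CompileUtils does.

import org.codehaus.janino.SimpleCompiler;

public class JaninoSymptom {
    public static void main(String[] args) throws Exception {
        SimpleCompiler compiler = new SimpleCompiler();
        // The parent classloader determines which classes the generated
        // code can see when it is compiled.
        compiler.setParentClassLoader(JaninoSymptom.class.getClassLoader());
        // If com.example.udf.ts2Date is not visible to that classloader,
        // cook() fails with: Cannot determine simple type name "com"
        compiler.cook(
            "public class Generated {\n"
          + "  public Object run() { return new com.example.udf.ts2Date(); }\n"
          + "}\n");
    }
}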



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]

Re: Re: Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

sunfulin

Hi,
Yep, I am using 1.10.
Did you submit the job to a cluster, or just run it in your IDE? I can also run it successfully in my IDE, but it fails when submitted to the cluster as a shaded jar. So I suspect the problem is related to the classpath inside the shaded jar, though I am not sure about that.

If you can submit the job to a cluster as a shaded jar, could you share your project's pom settings and some sample test code?
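
In case it helps, the kind of classpath check I have in mind is below. This is
just a diagnostic sketch; "com.example.udf.ts2Date" is a placeholder for
wherever the UDF class actually lives.

// Probe whether the UDF class is visible to the context classloader at the
// point where the job graph is generated. Substitute the real package name.
ClassLoader cl = Thread.currentThread().getContextClassLoader();
try {
    Class.forName("com.example.udf.ts2Date", false, cl);
    System.out.println("ts2Date visible to: " + cl);
} catch (ClassNotFoundException e) {
    System.out.println("ts2Date NOT visible to: " + cl);
}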

At 2020-03-02 20:36:06, "Benchao Li" <[hidden email]> wrote:

>Hi fulin,
>
>I cannot reproduce your exception on current master using your SQLs. I
>searched the error message; this issue[1] looks similar to yours, but the
>current compile util does not seem to have that problem.
>
>BTW, are you using 1.10?
>
>[1] https://issues.apache.org/jira/browse/FLINK-7490
>
>sunfulin <[hidden email]> 于2020年3月2日周一 上午11:17写道:
>
>> At 2020-03-02 09:16:05, "Benchao Li" <[hidden email]> wrote:
>> >Could you also provide us the DDL for lscsp_sc_order_all
>> >and dim_app_cust_info ?
>> >
>> >sunfulin <[hidden email]> 于2020年3月1日周日 下午9:22写道:
>> >
>> >>
>> >> *CREATE TABLE **realtime_product_sell *(
>> >>   sor_pty_id *varchar*,
>> >>   entrust_date *varchar*,
>> >>   entrust_time *varchar*,
>> >>   product_code *varchar *,
>> >>   business_type *varchar *,
>> >>   balance *double *,
>> >>   cust_name *varchar *,
>> >>   open_comp_name *varchar *,
>> >>   open_comp_id *varchar *,
>> >>   org_name *varchar *,
>> >>   org_id *varchar *,
>> >>   comp_name *varchar *,
>> >>   comp_id *varchar *,
>> >>   mng_name *varchar *,
>> >>   mng_id *varchar *,
>> >>   is_tg *varchar *,
>> >>   cust_type *varchar *,
>> >>   avg_tot_aset_y365 *double *,
>> >>   avg_aset_create_y
>> >> *double*) *WITH *(
>> >> *'connector.type' *= *'elasticsearch'*,
>> >> *'connector.version' *= *'<version>'*,
>> >> *'connector.hosts' *= *'<host_port>'*,
>> >> *'connector.index' *= *'realtime_product_sell_007118'*,
>> >> *'connector.document-type' *= *'_doc'*,
>> >> *'update-mode' *= *'upsert'*,
>> >> *'connector.key-delimiter' *= *'$'*,
>> >> *'connector.key-null-literal' *= *'n/a'*,
>> >> *'connector.bulk-flush.interval' *= *'1000'*,
>> >> *'format.type' *=
>> >> *'json'*)
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> At 2020-03-01 21:08:08, "Benchao Li" <[hidden email]> wrote:
>> >> >The UDF looks good. Could you also paste your DDL? Then we can produce your
>> >> >bug easily.
>> >> >
>> >> >sunfulin <[hidden email]> 于2020年3月1日周日 下午6:39写道:
>> >> >
>> >> >> Below is the code. The function trans origin field timeStr "2020-03-01
>> >> >> 12:01:00.234" to target timeStr accroding to dayTag.
>> >> >>
>> >> >> *public class *ts2Date *extends *ScalarFunction {
>> >> >>     *public *ts2Date() {
>> >> >>
>> >> >>     }
>> >> >>
>> >> >>
>> >> >>     *public *String eval (String timeStr, *boolean *dayTag) {
>> >> >>
>> >> >>     *if*(timeStr == *null*) {
>> >> >>         *return null*;
>> >> >>     }
>> >> >>     SimpleDateFormat ortSf = *new *SimpleDateFormat(*"yyyy-MM-dd
>> >> >> HH:mm:ss.SSS"*);
>> >> >>     Date date = *new *Date();
>> >> >>     *try *{
>> >> >>         date = ortSf.parse(timeStr);
>> >> >>     } *catch *(ParseException e) {
>> >> >>         e.printStackTrace();
>> >> >>         *return null*;
>> >> >>     }
>> >> >>     *if *(dayTag) {
>> >> >>         String format = *"yyyy-MM-dd"*;
>> >> >>         SimpleDateFormat sf = *new *SimpleDateFormat(format);
>> >> >>         *return *sf.format(date);
>> >> >>     } *else *{
>> >> >>         String format = *"yyyy-MM-dd**\'**T**\'**HH:mm:00.000+0800"*;
>> >> >>         SimpleDateFormat sf = *new *SimpleDateFormat(format);
>> >> >>         *return *sf.format(date);
>> >> >>     }
>> >> >> }
>> >> >> }
>> >> >>
>> >> >>
>> >> >>
>> >> >> At 2020-03-01 18:14:30, "Benchao Li" <[hidden email]> wrote:
>> >> >>
>> >> >> Could you show how your UDF `ts2Date` is implemented?
>> >> >>
>> >> >> sunfulin <[hidden email]> 于2020年3月1日周日 下午6:05写道:
>> >> >>
>> >> >>> Hi, Benchao,
>> >> >>> Thanks for the reply.
>> >> >>>
>> >> >>> Could you provide us more information?
>> >> >>> 1. what planner are you using? blink or legacy planner?
>> >> >>> I am using Blink Planner. Not test with legacy planner because my program
>> >> >>> depend a lot of new feature based on blink planner.
>> >> >>> 2. how do you register your UDF?
>> >> >>> Just use the code :  tableEnv.registerFunction ("ts2Date", new
>> >> >>> ts2Date());    tableEnv is a StreamTableEnvironment.
>> >> >>> 3. does this has a relation with checkpointing? what if you enable
>> >> >>> checkpointing and not use your udf? and disable checkpointing and use udf?
>> >> >>> I don't think this is related with checkpoint. If I enable checkpointing
>> >> >>> and not use my udf, I did not see any exception and submit job
>> >> >>> successfully. If I disable checkpointing and use udf, the job can submit
>> >> >>> successfully too.
>> >> >>>
>> >> >>> I dive a lot with this exception. Maybe it is related with some
>> >> >>> classloader issue. Hope for your suggestion.
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>> 在 2020-03-01 17:54:03,"Benchao Li" <[hidden email]> 写道:
>> >> >>>
>> >> >>> Hi fulin,
>> >> >>>
>> >> >>> It seems like a bug in the code generation.
>> >> >>>
>> >> >>> Could you provide us more information?
>> >> >>> 1. what planner are you using? blink or legacy planner?
>> >> >>> 2. how do you register your UDF?
>> >> >>> 3. does this has a relation with checkpointing? what if you enable
>> >> >>> checkpointing and not use your udf? and disable checkpointing and use udf?
>> >> >>>
>> >> >>> sunfulin <[hidden email]> 于2020年3月1日周日 下午5:41写道:
>> >> >>>
>> >> >>>> Hi, guys
>> >> >>>> I am running Flink 1.10 SQL job with UpsertStreamSink to ElasticSearch.
>> >> >>>> In my sql logic, I am using a UDF like ts2Date to handle date format stream
>> >> >>>> fields. However, when I add the `env.enableCheckpointing(time)`, my job
>> >> >>>> failed to submit and throws exception like following. This is really weird,
>> >> >>>> cause when I remove the UDF, the job can submit successfully. Any
>> >> >>>> suggestion is highly appreciated. Besides, my sql logic is like :
>> >> >>>>
>> >> >>>> *INSERT INTO *realtime_product_sell
>> >> >>>> *select *U.sor_pty_id,
>> >> >>>>        U.entrust_date,
>> >> >>>>        U.entrust_time,
>> >> >>>>        U.product_code,
>> >> >>>>        U.business_type,
>> >> >>>>        sum(*cast*(U.balance *as double*)) *as *balance,
>> >> >>>>        COALESCE(C.cust_name, *'--'*) *as *cust_name,
>> >> >>>>        COALESCE(C.open_comp_name, *'--'*) *AS *open_comp_name,
>> >> >>>>        COALESCE(C.open_comp_id, *'--'*) *as *open_comp_id,
>> >> >>>>        COALESCE(C.org_name,*'--'*) *as *org_name,
>> >> >>>>        COALESCE(C.org_id,*'--'*) *as *comp_name,
>> >> >>>>        COALESCE(C.comp_name, *'--'*) *AS *comp_name,
>> >> >>>>        COALESCE(C.comp_id,*'--'*) *as *comp_id,
>> >> >>>>        COALESCE(C.mng_name,*'--'*) *as *mng_name,
>> >> >>>>        COALESCE(C.mng_id,*'--'*) *as *mng_id,
>> >> >>>>        COALESCE(C.is_tg,*'--'*) *as *is_tg,
>> >> >>>>        COALESCE(C.cust_type,*'--'*) *as *cust_type,
>> >> >>>>        COALESCE(C.avg_tot_aset_y365, 0.00) *as *avg_tot_aset_y365,
>> >> >>>>        COALESCE(C.avg_aset_create_y, 0.00) *as *avg_aset_create_y
>> >> >>>>
>> >> >>>> *from*(*select *customerNumber *as *sor_pty_id,
>> >> >>>>         ts2Date(`lastUpdateTime`, *true*) *as *entrust_date,     -- the
>> >> >>>> UDF
>> >> >>>>        ts2Date(`lastUpdateTime`, *false*) *as *entrust_time,     -- the
>> >> >>>> UDF
>> >> >>>>         fundCode *as *product_code,
>> >> >>>>         businessType *as *business_type,
>> >> >>>>         balance,
>> >> >>>>         proctime
>> >> >>>>       *from **lscsp_sc_order_all **where *fundCode *in *(*'007118'*,
>> >> >>>> *'007117'*) *and *businessType *in *(*'5'*) ) *as *U
>> >> >>>>
>> >> >>>> *left join**dim_app_cust_info **FOR *SYSTEM_TIME *AS OF *U.proctime *AS
>> >> >>>> *C
>> >> >>>> *on **U*.sor_pty_id = *C*.cust_id
>> >> >>>> *group by *sor_pty_id,
>> >> >>>>         entrust_date,
>> >> >>>>         entrust_time,
>> >> >>>>         product_code,
>> >> >>>>         business_type,
>> >> >>>>         COALESCE(C.cust_name, *'--'*),
>> >> >>>>         COALESCE(C.open_comp_name, *'--'*),
>> >> >>>>         COALESCE(C.open_comp_id, *'--'*),
>> >> >>>>         COALESCE(C.org_name,*'--'*),
>> >> >>>>         COALESCE(C.org_id,*'--'*),
>> >> >>>>         COALESCE(C.comp_name, *'--'*),
>> >> >>>>         COALESCE(C.comp_id,*'--'*),
>> >> >>>>         COALESCE(C.mng_name,*'--'*),
>> >> >>>>         COALESCE(C.mng_id,*'--'*),
>> >> >>>>         COALESCE(C.is_tg,*'--'*),
>> >> >>>>         COALESCE(C.cust_type,*'--'*),
>> >> >>>>         COALESCE(C.avg_tot_aset_y365, 0.00),
>> >> >>>>         COALESCE(C.avg_aset_create_y, 0.00)
>> >> >>>>
>> >> >>>> 2020-03-01 17:22:06,504 ERROR
>> >> >>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled
>> >> >>>> exception.
>> >> >>>> org.apache.flink.util.FlinkRuntimeException:
>> >> >>>> org.apache.flink.api.common.InvalidProgramException: Table program cannot
>> >> >>>> be compiled. This is a bug. Please file an issue.
>> >> >>>> at
>> >> >>>> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
>> >> >>>> at
>> >> >>>> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
>> >> >>>> at
>> >> >>>> org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
>> >> >>>> at
>> >> >>>> org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
>> >> >>>> at
>> >> >>>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
>> >> >>>> at
>> >> >>>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
>> >> >>>> at
>> >> >>>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
>> >> >>>> at
>> >> >>>> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
>> >> >>>> at
>> >> >>>> org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
>> >> >>>> at
>> >> >>>> org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
>> >> >>>> at
>> >> >>>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57)
>> >> >>>> at
>> >> >>>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
>> >> >>>> at
>> >> >>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
>> >> >>>> at
>> >> >>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>> >> >>>> at
>> >> >>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> >> >>>> at
>> >> >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> >> >>>> at java.lang.Thread.run(Thread.java:748)
>> >> >>>> Caused by:
>> >> >>>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
>> >> >>>> org.apache.flink.api.common.InvalidProgramException: Table program cannot
>> >> >>>> be compiled. This is a bug. Please file an issue.
>> >> >>>> at
>> >> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
>> >> >>>> at
>> >> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
>> >> >>>> at
>> >> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
>> >> >>>> at
>> >> >>>> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
>> >> >>>> ... 16 more
>> >> >>>> Caused by: org.apache.flink.api.common.InvalidProgramException: Table
>> >> >>>> program cannot be compiled. This is a bug. Please file an issue.
>> >> >>>> at
>> >> >>>> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
>> >> >>>> at
>> >> >>>> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
>> >> >>>> at
>> >> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
>> >> >>>> at
>> >> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
>> >> >>>> at
>> >> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
>> >> >>>> at
>> >> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
>> >> >>>> at
>> >> >>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
>> >> >>>> ... 19 more
>> >> >>>> Caused by: org.codehaus.commons.compiler.CompileException: Line 17,
>> >> >>>> Column 30: Cannot determine simple type name "com"
>> >> >>>> at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>> >> >>>> at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
>> >> >>>> at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
>> >> >>>> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
>> >> >>>> at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
>> >> >>>> at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
>> >> >>>> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
>> >> >>>> at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
>> >> >>>> at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
>> >> >>>> at org.codehaus.janino.UnitCompiler$24.getType(UnitCompiler.java:8184)
>> >> >>>> at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6786)
>> >> >>>> at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:215)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6412)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6407)
>> >> >>>> at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4299)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
>> >> >>>> at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
>> >> >>>> at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
>> >> >>>> at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
>> >> >>>> at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768)
>> >> >>>> at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407)
>> >> >>>> at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
>> >> >>>> at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
>> >> >>>> at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
>> >> >>>> at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
>> >> >>>> at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
>> >> >>>> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
>> >> >>>> at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
>> >> >>>> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
>> >> >>>> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>> >> >>>> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
>> >> >>>> at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
>> >> >>>> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
>> >> >>>> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
>> >> >>>> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
>> >> >>>> at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
>> >> >>>> at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
>> >> >>>> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
>> >> >>>> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
>> >> >>>> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
>> >> >>>> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
>> >> >>>> at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
>> >> >>>> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
>> >> >>>> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
>> >> >>>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
>> >> >>>> at
>> >> >>>> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
>> >> >>>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
>> >> >>>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
>> >> >>>> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>> >> >>>> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
>> >> >>>> at
>> >> >>>> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
>> >> >>>> ... 25 more
>> >> >>>>
>> >> >>>>
>> >> >>>>
>> >> >>>>
>> >> >>>>
>> >> >>>
>> >> >>>
>> >> >>> --
>> >> >>>
>> >> >>> Benchao Li
>> >> >>> School of Electronics Engineering and Computer Science, Peking University
>> >> >>> Tel:+86-15650713730
>> >> >>> Email: [hidden email]; [hidden email]
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>
>> >> >>
>> >> >> --
>> >> >>
>> >> >> Benchao Li
>> >> >> School of Electronics Engineering and Computer Science, Peking University
>> >> >> Tel:+86-15650713730
>> >> >> Email: [hidden email]; [hidden email]
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >
>> >> >
>> >> >--
>> >> >
>> >> >Benchao Li
>> >> >School of Electronics Engineering and Computer Science, Peking University
>> >> >Tel:+86-15650713730
>> >> >Email: [hidden email]; [hidden email]
>> >>
>> >>
>> >>
>> >>
>> >>
>> >
>> >
>> >--
>> >
>> >Benchao Li
>> >School of Electronics Engineering and Computer Science, Peking University
>> >Tel:+86-15650713730
>> >Email: [hidden email]; [hidden email]
>>
>>
>>
>>
>>
>
>
>--
>
>Benchao Li
>School of Electronics Engineering and Computer Science, Peking University
>Tel:+86-15650713730
>Email: [hidden email]; [hidden email]

Re: Re: Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

Benchao Li
I just ran it in my IDE.
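For anyone trying to reproduce this, the IDE-side setup described in the thread boils down to roughly the sketch below (Flink 1.10, blink planner, checkpointing enabled, UDF registered on a StreamTableEnvironment). The class name and the abbreviated DDL/DML strings are placeholders for the full statements posted earlier, so treat it as a template rather than a complete program.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class ReproJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // the call whose presence/absence flips the behavior reported in this thread
        env.enableCheckpointing(60_000);

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // register the UDF the same way as in this thread
        tEnv.registerFunction("ts2Date", new ts2Date());

        // stand-ins for the full DDL/DML posted earlier in the thread
        tEnv.sqlUpdate("CREATE TABLE lscsp_sc_order_all (...)");
        tEnv.sqlUpdate("CREATE TABLE dim_app_cust_info (...)");
        tEnv.sqlUpdate("CREATE TABLE realtime_product_sell (...)");
        tEnv.sqlUpdate("INSERT INTO realtime_product_sell select ...");

        tEnv.execute("udf-checkpointing-repro");
    }
}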

sunfulin <[hidden email]> wrote on Mon, Mar 2, 2020, 9:04 PM:

>
> Hi,
> Yep, I am using 1.10.
> Did you submit the job to a cluster, or just run it in your IDE? I can
> also run it successfully in my IDE, but it fails when I submit it to the
> cluster as a shaded jar. So I suspect the problem is related to the Maven
> shading / jar classpath, but I am not sure.
>
> If you can submit the job to a cluster with a shaded jar, could you share
> your project's pom settings and a sample test program?


--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]