flink sql bigint cannot be cast to mysql Long

Zhou Zach
SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.

Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.

Thu Jun 11 13:18:19 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.

Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 77019acfda2a5ccdc0cfcd28c72b18c1)

at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)

at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)

at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)

at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)

at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)

at org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)

at org.rabbit.streaming.FromJdbcSinkJdbcBySql$.main(FromJdbcSinkJdbcBySql.scala:99)

at org.rabbit.streaming.FromJdbcSinkJdbcBySql.main(FromJdbcSinkJdbcBySql.scala)

Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 77019acfda2a5ccdc0cfcd28c72b18c1)

at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)

at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)

at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)

at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)

at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)

at akka.dispatch.OnComplete.internal(Future.scala:264)

at akka.dispatch.OnComplete.internal(Future.scala:261)

at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)

at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)

at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)

at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)

at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)

at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)

at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)

at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)

at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)

at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)

at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)

at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)

at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)

at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)

at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)

at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)

at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)

at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)

at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)

at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)

at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)

at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)

... 31 more

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy

at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)

at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)

at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)

at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)

at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)

at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)

at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)

at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at akka.actor.Actor$class.aroundReceive(Actor.scala:517)

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

at akka.actor.ActorCell.invoke(ActorCell.scala:561)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

at akka.dispatch.Mailbox.run(Mailbox.scala:225)

at akka.dispatch.Mailbox.exec(Mailbox.scala:235)

... 4 more

Caused by: java.lang.ClassCastException: java.math.BigInteger cannot be cast to java.lang.Long

at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)

at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)

at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)

at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)

at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)

at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)

Query:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Blink planner in streaming mode
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = TableEnvironment.create(settings)

// Source table, read from MySQL through the JDBC connector
tableEnv.sqlUpdate(
  """
    |CREATE TABLE analysis_gift_consume (
    |    id bigint,
    |    times INT,
    |    gid INT,
    |    gname VARCHAR,
    |    counts bigint
    |) WITH (
    |    'connector.type' = 'jdbc',
    |    'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
    |    'connector.table' = 'analysis_gift_consume',
    |    'connector.username' = 'root',
    |    'connector.password' = '123456',
    |    'connector.write.flush.max-rows' = '1'
    |)
    |""".stripMargin)

// Sink table with the same schema, written back to MySQL
tableEnv.sqlUpdate(
  """
    |CREATE TABLE analysis_gift_consume1 (
    |    id bigint,
    |    times INT,
    |    gid INT,
    |    gname VARCHAR,
    |    counts bigint
    |) WITH (
    |    'connector.type' = 'jdbc',
    |    'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
    |    'connector.table' = 'analysis_gift_consume1',
    |    'connector.username' = 'root',
    |    'connector.password' = '123456',
    |    'connector.write.flush.max-rows' = '1'
    |)
    |""".stripMargin)

// Copy every row from the source table into the sink table
tableEnv.sqlUpdate(
  """
    |insert into analysis_gift_consume1
    |select * from analysis_gift_consume
    |""".stripMargin)

// Submit the job
tableEnv.execute("sink mysql")

Re:flink sql bigint cannot be cast to mysql Long

chaojianok
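Check whether the version of the MySQL driver package your project pulls in matches the MySQL version you are running. Alternatively, you can simply convert the data type.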
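
As an illustration of the second suggestion (converting the type), here is a minimal sketch in plain Scala. It is not Flink connector code: `JdbcLongConversion` and `toLongOption` are hypothetical names, and where such a conversion would hook in depends on the Flink and connector versions, which the thread does not state. It only shows how a value that may arrive as `java.lang.Long` or `java.math.BigInteger` could be normalised to a `Long` without a blind cast.

```scala
import java.math.BigInteger
import scala.util.Try

object JdbcLongConversion {
  /** Hypothetical helper: normalise a JDBC-style value to Long, or None if it does not fit. */
  def toLongOption(value: Any): Option[Long] = value match {
    case l: java.lang.Long    => Some(l.longValue)
    case i: java.lang.Integer => Some(i.longValue)
    // longValueExact throws ArithmeticException when the value is outside the Long range
    case b: BigInteger        => Try(b.longValueExact()).toOption
    // Anything else (including null) is treated as not convertible
    case _                    => None
  }

  def main(args: Array[String]): Unit = {
    println(toLongOption(BigInteger.valueOf(42L)))                 // Some(42)
    println(toLongOption(new BigInteger("18446744073709551615"))) // None
  }
}
```

Returning `Option` rather than throwing keeps the out-of-range case explicit, so the caller decides whether to drop the row, fail the job, or widen the column type.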

At 2020-06-11 13:22:07, "Zhou Zach" <[hidden email]> wrote:

>Caused by: java.lang.ClassCastException: java.math.BigInteger cannot be cast to java.lang.Long

Re: flink sql bigint cannot be cast to mysql Long

Leonard Xu
In reply to this post by Zhou Zach
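Hi,
Which Flink version are you using? Are you sure the database column is of type bigint?
> Caused by: java.lang.ClassCastException: java.math.BigInteger cannot be cast to java.lang.Long

java.math.BigInteger has a far larger range than java.lang.Long, so it cannot be cast to Long. Most likely your data types are mapped incorrectly. Could you paste the schema of the MySQL table?


Best,
Leonard Xu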
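
To make the quoted exception concrete, here is a small self-contained sketch in plain Scala. `BigIntegerCastDemo` and `castToLong` are hypothetical names used only for illustration, and nothing here touches Flink or MySQL. It shows that a reference cast from `java.math.BigInteger` to `java.lang.Long` fails whatever the value is, because the two classes are unrelated, and that an unsigned 64-bit maximum does exceed `Long.MaxValue`. MySQL Connector/J returns `java.math.BigInteger` for `BIGINT UNSIGNED` columns, so an unsigned column is one plausible cause, but the thread so far does not confirm the schema.

```scala
import java.math.BigInteger
import scala.util.Try

object BigIntegerCastDemo {
  // Largest value a signed 64-bit Long can hold: 2^63 - 1
  val longMax: BigInteger = BigInteger.valueOf(Long.MaxValue)
  // Largest value of an unsigned 64-bit integer: 2^64 - 1
  val unsignedMax: BigInteger = BigInteger.ONE.shiftLeft(64).subtract(BigInteger.ONE)

  // Mimics a reference cast applied to an untyped field of a row
  def castToLong(field: Any): Try[java.lang.Long] =
    Try(field.asInstanceOf[java.lang.Long])

  def main(args: Array[String]): Unit = {
    // The cast fails even for a tiny value: the runtime class is what matters
    println(castToLong(BigInteger.valueOf(1L)).isFailure)     // true
    // A value that really is a java.lang.Long casts fine
    println(castToLong(java.lang.Long.valueOf(1L)).isSuccess) // true
    // The unsigned maximum does not fit into a signed Long
    println(unsignedMax.compareTo(longMax) > 0)               // true
  }
}
```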

> On Jun 11, 2020, at 13:22, Zhou Zach <[hidden email]> wrote:
>
> SLF4J: Class path contains multiple SLF4J bindings.
>
> SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>
> SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
>
> SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
>
> ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
>
> Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
>
>
> Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
>
> Thu Jun 11 13:18:18 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
>
> Thu Jun 11 13:18:19 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
>
> Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 77019acfda2a5ccdc0cfcd28c72b18c1)
>
> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
>
> at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
>
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>
> at org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
>
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
>
> at org.rabbit.streaming.FromJdbcSinkJdbcBySql$.main(FromJdbcSinkJdbcBySql.scala:99)
>
> at org.rabbit.streaming.FromJdbcSinkJdbcBySql.main(FromJdbcSinkJdbcBySql.scala)
>
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 77019acfda2a5ccdc0cfcd28c72b18c1)
>
> at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>
> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>
> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>
> at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
>
> at akka.dispatch.OnComplete.internal(Future.scala:264)
>
> at akka.dispatch.OnComplete.internal(Future.scala:261)
>
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>
> at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
>
> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>
> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>
> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>
> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>
> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>
> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>
> at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>
> at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>
> at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
>
> ... 31 more
>
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>
> at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>
> at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>
> at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>
> at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
>
> at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
>
> at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
>
> at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
>
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
>
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>
> ... 4 more
>
> Caused by: java.lang.ClassCastException: java.math.BigInteger cannot be cast to java.lang.Long
>
> at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
>
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639)
>
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
>
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
>
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>
> at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
>
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>
>
>
>
>
>
>
>
>
>
>
> query:
>
>
>
>
> val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val tableEnv = TableEnvironment.create(settings)
>
> tableEnv.sqlUpdate(
> """
>    |
>    |CREATE TABLE analysis_gift_consume (
>    |    id  bigint,
>    |    times INT,
>    |    gid INT,
>    |    gname VARCHAR,
>    |    counts bigint
>    |) WITH (
>    |    'connector.type' = 'jdbc',
>    |    'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
>    |    'connector.table' = 'analysis_gift_consume',
>    |    'connector.username' = 'root',
>    |    'connector.password' = '123456',
>    |    'connector.write.flush.max-rows' = '1'
>    |)
>    |""".stripMargin)
>
> tableEnv.sqlUpdate(
> """
>    |
>    |CREATE TABLE analysis_gift_consume1 (
>    |    id  bigint,
>    |    times INT,
>    |    gid INT,
>    |    gname VARCHAR,
>    |    counts bigint
>    |) WITH (
>    |    'connector.type' = 'jdbc',
>    |    'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
>    |    'connector.table' = 'analysis_gift_consume1',
>    |    'connector.username' = 'root',
>    |    'connector.password' = '123456',
>    |    'connector.write.flush.max-rows' = '1'
>    |)
>    |""".stripMargin)
> tableEnv.sqlUpdate(
> """
>    |
>    |insert into analysis_gift_consume1
>    |select * from analysis_gift_consume
>    |
>    |""".stripMargin)
>
>
> tableEnv.execute("sink mysql")


Re:Re: flink sql bigint cannot be cast to mysql Long

Zhou Zach
The Flink version is 1.10.0.
MySQL tables:
CREATE TABLE `analysis_gift_consume` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `times` int(8) NOT NULL COMMENT '时间[yyyyMMdd]',
  `gid` int(4) NOT NULL DEFAULT '0' COMMENT '礼物ID',
  `gname` varchar(100) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '礼物名称',
  `counts` bigint(20) NOT NULL DEFAULT '0' COMMENT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='';




CREATE TABLE `analysis_gift_consume1` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `times` int(8) NOT NULL COMMENT '时间[yyyyMMdd]',
  `gid` int(4) NOT NULL DEFAULT '0' COMMENT '礼物ID',
  `gname` varchar(100) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '礼物名称',
  `counts` bigint(20) NOT NULL DEFAULT '0' COMMENT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='';




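The database columns are indeed of type bigint. Surely there are cases where a MySQL column has to be bigint. If the MySQL column is bigint, which type should I use when declaring the table in Flink SQL?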
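
One detail of the schema above matters here: `id` is declared `bigint(20) unsigned`. MySQL Connector/J returns unsigned BIGINT values as `java.math.BigInteger`, because they can reach 2^64 - 1, which exceeds `Long.MaxValue`. The sketch below only checks that arithmetic; it is an illustration and not Flink code, and the object name `UnsignedBigintRange` is hypothetical. It suggests that such a column needs a type with room for 20 decimal digits, for example `DECIMAL(20, 0)`, rather than the signed `BIGINT`. Whether the JDBC connector in Flink 1.10.0 accepts that mapping is an assumption that would need to be verified.

```scala
import java.math.BigInteger

// Hypothetical helper object used only to compare the two value ranges.
object UnsignedBigintRange {

  // Largest value of a MySQL BIGINT UNSIGNED column: 2^64 - 1.
  val unsignedMax: BigInteger =
    BigInteger.ONE.shiftLeft(64).subtract(BigInteger.ONE)

  // Largest value of a signed 64-bit Long.
  val signedMax: BigInteger = BigInteger.valueOf(Long.MaxValue)

  // Number of decimal digits needed to write a non-negative value.
  def digits(value: BigInteger): Int = value.toString.length
}
```

The `counts` column, declared as a plain signed `bigint(20)`, stays within the `Long` range and is not affected.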





At 2020-06-11 13:42:11, "Leonard Xu" <[hidden email]> wrote:

>Hi,
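>Which Flink version are you using? Are you sure the database column is of type bigint?
>> Caused by: java.lang.ClassCastException: java.math.BigInteger cannot be cast to java.lang.Long
>
>java.math.BigInteger covers a much larger range than java.lang.Long, so it cannot be cast to Long. Your data type mapping is probably wrong. Could you paste the schema of the MySQL table?
>
>
>Best,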
>Leonard Xu

Re:Re:flink sql bigint cannot be cast to mysql Long

Zhou Zach
In reply to this post by chaojianok









The MySQL dependency referenced in the project:
<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <version>5.1.46</version>
</dependency>
The MySQL server version is 5.7.18-log.
If the column in MySQL is bigint, should I declare it as int when creating the table? Wouldn't that risk truncation?
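
The truncation concern can be illustrated with plain Scala. This is a minimal sketch, independent of Flink and MySQL; the object name `NarrowingDemo` and its methods are hypothetical. It shows that narrowing a 64-bit value to `Int` silently wraps around, while the JDK method `Math.toIntExact` raises an `ArithmeticException` instead.

```scala
import scala.util.Try

// Hypothetical helper object illustrating Long-to-Int narrowing.
object NarrowingDemo {

  // Silent narrowing: keeps only the low 32 bits of the value.
  def narrow(value: Long): Int = value.toInt

  // Checked narrowing: None when the value does not fit into an Int.
  def narrowChecked(value: Long): Option[Int] =
    Try(Math.toIntExact(value)).toOption
}
```

For example, `NarrowingDemo.narrow(3000000000L)` gives `-1294967296`, so declaring a bigint column as `INT` is only safe when every stored value is known to fit into 32 bits.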








At 2020-06-11 13:39:18, "chaojianok" <[hidden email]> wrote:

>Check whether the version of the MySQL package in your project matches the version of MySQL you are using. Alternatively, you can simply convert the data type.
>

Re: flink sql bigint cannot be cast to mysql Long

Leonard Xu
In reply to this post by Zhou Zach
Hi,

The JDBC connector previously did not support unsigned types, and an unsigned type can hold larger values than the corresponding signed type.
bigint(20) unsigned (range 0 to 18446744073709551615) exceeds the range of bigint (-9223372036854775808 to 9223372036854775807).
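
To make the range mismatch concrete, here is a small sketch in plain Scala. It assumes, based on the ClassCastException in the original post, that the MySQL driver hands back a java.math.BigInteger for the unsigned column; the object and method names are made up for the example.

```scala
// Sketch only: UnsignedBigintRange and fitsInLong are hypothetical names.
object UnsignedBigintRange {
  // Largest value of a MySQL BIGINT UNSIGNED column: 2^64 - 1.
  val UnsignedMax: BigInt = (BigInt(1) << 64) - 1

  // Largest value of a signed 64-bit Long: 2^63 - 1.
  val SignedMax: BigInt = BigInt(Long.MaxValue)

  // True when the value can be represented as a Long without loss.
  def fitsInLong(value: BigInt): Boolean = value.isValidLong

  def main(args: Array[String]): Unit = {
    println(s"unsigned max = $UnsignedMax")
    println(s"signed max   = $SignedMax")
    println(s"unsigned max fits in Long: ${fitsInLong(UnsignedMax)}")

    // Assumption: the driver returns a BigInteger for the unsigned column.
    // A BigInteger is never an instance of java.lang.Long, however small
    // the number, so a cast to Long fails even for a value like 42.
    val fromDriver: AnyRef = java.math.BigInteger.valueOf(42L)
    println(s"is a java.lang.Long: ${fromDriver.isInstanceOf[java.lang.Long]}")
  }
}
```

Under that assumption the failure does not depend on the stored values: the cast fails on the object type alone, which would explain why the job fails as soon as the source emits its first row.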


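The latest code has already fixed this issue [1]. You can wait for the 1.11 release and try it, or build the latest code yourself; in either case, declare the corresponding column in the Flink table as decimal(20, 0).

Best regards,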
Leonard Xu

[1] https://issues.apache.org/jira/browse/FLINK-17657

> On June 11, 2020, at 13:51, Zhou Zach <[hidden email]> wrote:
>
> bigint(20) unsigned
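
As a rough sketch of the suggestion above, the Flink DDL from the original post could declare the unsigned column as DECIMAL(20, 0). The thread does not say which MySQL column is bigint(20) unsigned, so `id` is only an assumption here, and the object and method names are made up. The block just builds and prints the DDL string so that it runs without Flink on the classpath.

```scala
// Sketch only: DecimalDdlSketch and sourceDdl are hypothetical names, and
// treating `id` as the unsigned column is an assumption, not a fact from the thread.
object DecimalDdlSketch {
  // Builds the source-table DDL with one column declared as DECIMAL(20, 0),
  // which has enough digits for 18446744073709551615 (20 digits).
  def sourceDdl(unsignedColumn: String = "id"): String = {
    val columns = Seq(
      "id" -> "BIGINT",
      "times" -> "INT",
      "gid" -> "INT",
      "gname" -> "VARCHAR",
      "counts" -> "BIGINT"
    )
    val columnSql = columns.map { case (name, tpe) =>
      val declared = if (name == unsignedColumn) "DECIMAL(20, 0)" else tpe
      s"    $name $declared"
    }.mkString(",\n")

    s"""CREATE TABLE analysis_gift_consume (
       |$columnSql
       |) WITH (
       |    'connector.type' = 'jdbc',
       |    'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
       |    'connector.table' = 'analysis_gift_consume',
       |    'connector.username' = 'root',
       |    'connector.password' = '123456'
       |)""".stripMargin
  }

  def main(args: Array[String]): Unit = {
    // In the job from the original post, this string would be passed to
    // tableEnv.sqlUpdate(...) in place of the first CREATE TABLE statement.
    println(sourceDdl())
  }
}
```

The sink table analysis_gift_consume1 would presumably need the same column declaration so that `insert into ... select *` still type-checks. As the reply notes, this relies on the fix in FLINK-17657, so it may not help on releases before 1.11.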


Re:Re: flink sql bigint cannot be cast to mysql Long

Zhou Zach
Thanks.
