flink sql hive streaming报错,不知道是不是bug

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

flink sql hive streaming报错,不知道是不是bug

Level1accelerator
版本:1.11.2
报错日志:java.lang.IllegalArgumentException: The ContinuousFileMonitoringFunction has already restored from a previous Flink version.
    at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:176)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Reply | Threaded
Open this post in threaded view
|

回复:flink sql hive streaming报错,不知道是不是bug

Level1accelerator
应用场景:hive table read
      blinkStreamTableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);
      Table table=blinkStreamTableEnv.sqlQuery("SELECT * FROM test.table_config /*+ OPTIONS('streaming-source.enable'='true','streaming-source.monitor-interval' = '30 min')*/");


报错的位置代码flink源码:
                        // given that the parallelism of the function is 1, we can only have 1 or 0 retrieved items.
                        // the 0 is for the case that we are migrating from a previous Flink version.
 
                        Preconditions.checkArgument(retrievedStates.size() <= 1,
                                getClass().getSimpleName() + " retrieved invalid state.");
 
                        if (retrievedStates.size() == 1 &amp;&amp; globalModificationTime != Long.MIN_VALUE) {
                                // this is the case where we have both legacy and new state.
                                // The two should be mutually exclusive for the operator, thus we throw the exception.
 
                                throw new IllegalArgumentException(
                                        "The " + getClass().getSimpleName() + " has already restored from a previous Flink version.");
 
                        } else if (retrievedStates.size() == 1) {
                                this.globalModificationTime = retrievedStates.get(0);
                                if (LOG.isDebugEnabled()) {
                                        LOG.debug("{} retrieved a global mod time of {}.",
                                                getClass().getSimpleName(), globalModificationTime);
                                }
                        }



------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "Excalibur"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2020年11月16日(星期一) 上午10:54
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;flink sql hive streaming报错,不知道是不是bug



版本:1.11.2
报错日志:java.lang.IllegalArgumentException: The&nbsp;ContinuousFileMonitoringFunction&nbsp;has already restored from&nbsp;a previous Flink&nbsp;version.
&nbsp; &nbsp; at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:176)
&nbsp; &nbsp; at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
&nbsp; &nbsp; at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
&nbsp; &nbsp; at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
&nbsp; &nbsp; at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
&nbsp; &nbsp; at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
&nbsp; &nbsp; at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
&nbsp; &nbsp; at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
&nbsp; &nbsp; at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
&nbsp; &nbsp; at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
&nbsp; &nbsp; at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
&nbsp; &nbsp; at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
&nbsp; &nbsp; at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
&nbsp; &nbsp; at java.lang.Thread.run(Thread.java:748)