hi all,
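Previously, to connect to a remote Flink cluster from the IDE, I could use ExecutionEnvironment.createRemoteEnvironment().

The official documentation builds a Blink batch environment like this:

    val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val bbTableEnv = TableEnvironment.create(bbSettings)

How do I connect to a remote cluster with this setup?

--
Best,
Jun Su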
Hi,
The Blink planner does not support org.apache.flink.table.api.java.BatchTableEnvironment, so it cannot be hooked up to an ExecutionEnvironment.
For now, the Blink planner's batch mode only supports TableEnvironment. As a hack, you can still use a StreamTableEnvironment, but you have to construct StreamTableEnvironmentImpl directly:

    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.createRemoteEnvironment(...);
    // For the constructor arguments, see the implementation of StreamTableEnvironmentImpl#create.
    StreamTableEnvironmentImpl tEnv = new StreamTableEnvironmentImpl(.. execEnv, .., false);

Best,
Jark

On Tue, 19 May 2020 at 15:27, jun su <[hidden email]> wrote:
> How do I connect to a remote cluster?
hi Jark,
Sorry for the late reply. I tried your approach on Flink 1.11.0. I created the StreamTableEnvironmentImpl by following StreamTableEnvironmentImpl#create, and only removed the mode check on settings.isStreamingMode() from that method. It fails with the following error:

Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1563)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2125)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276)
    ... 6 more

On Wed, 20 May 2020 at 14:30, Jark Wu <[hidden email]> wrote:
> As a hack, you can still use a StreamTableEnvironment, but you have to
> construct StreamTableEnvironmentImpl directly.

--
Best,
Jun Su
It turned out to be a dependency problem. It is solved now.
On Mon, 27 Jul 2020 at 14:29, jun su <[hidden email]> wrote:
> I tried your approach on Flink 1.11.0 [...] It fails with
> StreamTaskException: Cannot instantiate user function.

--
Best,
Jun Su
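The thread does not record which dependency was at fault, so the following is only an illustration of what the inner exception means, not a reproduction of the Flink failure. It is a minimal stdlib-only sketch: a value is written with plain Java serialization, one byte of the stream is overwritten, and reading it back fails with the same kind of StreamCorruptedException seen in the stack trace above. The class name CorruptStreamDemo and its helper methods are hypothetical and are not part of Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;

// Hypothetical demo class for illustration only; not part of Flink.
final class CorruptStreamDemo {

    /** Serializes a value with plain Java serialization. */
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /**
     * Tries to read one object back.
     * Returns null on success, or the StreamCorruptedException message on failure.
     */
    static String tryDeserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            in.readObject();
            return null;
        } catch (StreamCorruptedException e) {
            return e.getMessage();
        }
    }

    /**
     * Returns a copy of the stream in which the first type-code byte is zeroed.
     * Bytes 0..3 are the stream header (magic number and version), so byte 4
     * is the type code of the first serialized object.
     */
    static byte[] corruptFirstTypeCode(byte[] data) {
        byte[] copy = data.clone();
        copy[4] = 0x00;
        return copy;
    }

    public static void main(String[] args) throws Exception {
        byte[] good = serialize("operator payload");
        System.out.println("intact:  " + tryDeserialize(good));
        System.out.println("damaged: " + tryDeserialize(corruptFirstTypeCode(good)));
    }
}
```

The reader expects a known type code at that position and rejects 0x00, which is where the "invalid type code: 00" message comes from. In a real job the bytes are more likely to disagree with the reader because the two sides were built against different classes than because a byte was flipped, but that is an assumption here, since the thread only says "dependency problem".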
In reply to this post by jun su
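I am hitting this problem now as well, and I also removed the settings.isStreamingMode() mode check from the method. Why does the source code include this check? It looks like StreamTableEnvironmentImpl was originally able to run batch jobs, and adding the check is what now prevents it. I see that the check is still there in the latest version, 1.12.

-- Sent from: http://apache-flink.147419.n8.nabble.com/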