Constructing a Remote Env with the Blink Planner

Constructing a Remote Env with the Blink Planner

jun su
hi all,

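In the past, to connect to a remote Flink cluster from the IDE, you could call ExecutionEnvironment.createRemoteEnvironment().

The official documentation builds a Blink environment like this: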

val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)


How do I connect to a remote cluster with this setup?

--
Best,
Jun Su
Re: Constructing a Remote Env with the Blink Planner

Jark
Administrator
Hi,

The Blink planner does not support org.apache.flink.table.api.java.BatchTableEnvironment, so it cannot be hooked up to ExecutionEnvironment.
In batch mode the Blink planner currently supports only TableEnvironment. As a hack, you can still use StreamTableEnvironment, but you have to construct StreamTableEnvironmentImpl directly:

StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.createRemoteEnvironment(...);
// For the constructor arguments, see the implementation of StreamTableEnvironmentImpl#create.
StreamTableEnvironmentImpl tEnv = new StreamTableEnvironmentImpl(.., execEnv, .., false);

Best,
Jark

On Tue, 19 May 2020 at 15:27, jun su <[hidden email]> wrote:


Re: Constructing a Remote Env with the Blink Planner

jun su
hi Jark,

Sorry for the late reply. I tried your approach on Flink 1.11.0.
I created the StreamTableEnvironmentImpl following StreamTableEnvironmentImpl#create,
removing only the mode check settings.isStreamingMode() from that method, and got the following error:

Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1563)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2125)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276)
    ... 6 more
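
A note for anyone reading this trace: the innermost exception is raised by plain JDK object deserialization (java.io.ObjectInputStream), which the trace shows being called while the task reads the operator factory back from its config. The sketch below does not reproduce the Flink failure and says nothing about its root cause. It uses only the JDK to show what the message "invalid type code: 00" means: the reader expected the start of a serialized object and found a 0x00 byte instead. The class name InvalidTypeCodeDemo and its helper methods are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
import java.io.UncheckedIOException;

public class InvalidTypeCodeDemo {

    /** Serializes a value with standard Java serialization. */
    public static byte[] serialize(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Builds a stream that has a valid header but a 0x00 byte where an object should start. */
    public static byte[] headerThenZeroByte() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Creating the ObjectOutputStream writes the 4-byte stream header (AC ED 00 05).
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        buffer.write(0x00); // 0x00 is not a valid serialization type code
        return buffer.toByteArray();
    }

    /** Returns the StreamCorruptedException message, or null if deserialization succeeds. */
    public static String tryDeserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            in.readObject();
            return null;
        } catch (StreamCorruptedException e) {
            return e.getMessage();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(tryDeserialize(serialize("hello")));   // prints "null"
        System.out.println(tryDeserialize(headerThenZeroByte())); // prints "invalid type code: 00"
    }
}
```

In other words, the bytes being read did not match what the reading side expected at that position in the stream.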


On Wed, 20 May 2020 at 14:30, Jark Wu <[hidden email]> wrote:



--
Best,
Jun Su

Re: Constructing a Remote Env with the Blink Planner

jun su
It turned out to be a dependency problem; it is resolved now.

On Mon, 27 Jul 2020 at 14:29, jun su <[hidden email]> wrote:



--
Best,
Jun Su

Re: Constructing a Remote Env with the Blink Planner

莫失莫忘
In reply to this post by jun su
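I have now run into this problem as well, and I also removed the mode check settings.isStreamingMode() from the method. Why does the source code add this check? It seems that
StreamTableEnvironmentImpl was originally able to run batch jobs, and adding this check is what prevents it. I see the check is still present in the latest version, 1.12.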



--
Sent from: http://apache-flink.147419.n8.nabble.com/