Flink: querying a Hive table to initialize a Savepoint

王良
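I'm using Flink 1.10 and want to initialize a Savepoint from data queried out of a Hive table. The problem I've run into is that I can't convert the Table into a DataSet: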

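// Flink 1.10: Blink planner in batch mode
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
Table table = tableEnv.sqlQuery("select * from test001");

// currencyDataSet is meant to be a DataSet<CurrencyRate> built from `table`;
// that Table -> DataSet conversion is the missing step
BootstrapTransformation<CurrencyRate> broadcastTransformation = OperatorTransformation
        .bootstrapWith(currencyDataSet)
        .transform(new CurrencyBootstrapFunction());

// backend, transformation, ACCOUNT_UID, CURRENCY_UID and savepointPath are defined elsewhere (not shown)
Savepoint
        .create(backend, 128)
        .withOperator(ACCOUNT_UID, transformation)
        .withOperator(CURRENCY_UID, broadcastTransformation)
        .write(savepointPath);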

Re: Flink: querying a Hive table to initialize a Savepoint

Benchao Li
Hi,

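The Blink planner does not support converting between the Table API and the DataSet API.
The Blink planner has a unified batch/streaming architecture; its batch execution is not implemented on top of the DataSet API.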
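For contrast, a minimal sketch of the conversion that the legacy (non-Blink) planner in Flink 1.10 does offer: a `BatchTableEnvironment` created from an `ExecutionEnvironment` can turn a `Table` into a `DataSet` with `toDataSet`. The class name and the in-memory sample rows are hypothetical stand-ins for the Hive table; this thread does not establish that the Hive table itself can be read through the legacy planner.

```java
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

// Hypothetical example class targeting the Flink 1.10 legacy planner API.
public class LegacyPlannerToDataSet {

    static List<Row> run() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Created from an ExecutionEnvironment, this table environment uses the
        // legacy DataSet-based planner, which supports Table <-> DataSet conversion.
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // Hypothetical sample data standing in for the Hive table.
        DataSet<Tuple2<String, Double>> rates =
                env.fromElements(Tuple2.of("USD", 1.0), Tuple2.of("EUR", 1.1));
        Table table = tEnv.fromDataSet(rates, "currency, rate");

        // Concatenating a Table into SQL registers it under a generated name.
        Table result = tEnv.sqlQuery(
                "SELECT currency, rate FROM " + table + " WHERE rate > 1.0");

        // The Table -> DataSet conversion that the Blink planner does not provide.
        DataSet<Row> rows = tEnv.toDataSet(result, Row.class);
        return rows.collect(); // collect() triggers execution of the batch job
    }

    public static void main(String[] args) throws Exception {
        for (Row row : run()) {
            System.out.println(row);
        }
    }
}
```

With the sample rows above, only the `EUR` row passes the filter.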

(In reply to 王良 <[hidden email]>, Wednesday, May 6, 2020, 4:41 PM.)

--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]

Re: Flink: querying a Hive table to initialize a Savepoint

Jingsong Li
Hi,

There are plans for Savepoint to support BoundedStream (batch execution built on top of DataStream) in the future.

For now, as a workaround, you could consider first writing the data to files (Parquet or ORC) with Blink SQL, and then reading them back with the DataSet API.
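The workaround above can be sketched as two Flink 1.10 jobs: a Blink batch job that copies `test001` into an ORC-backed Hive table, then a DataSet job that reads those ORC files and bootstraps broadcast state into a new savepoint. This is a sketch under several assumptions: it relies on `HiveCatalog` (flink-connector-hive) and `OrcRowInputFormat` (flink-orc); the export table `test001_orc`, its two columns `currency`/`rate`, the catalog name, Hive conf directory, Hive version, paths and operator UID are all hypothetical placeholders; and `test001_orc` is assumed to already exist in Hive as `STORED AS ORC`. The bootstrap function follows the shape of the `CurrencyBootstrapFunction` example in the State Processor API documentation, but consumes `Row` instead of `CurrencyRate`.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.orc.OrcRowInputFormat;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

public class HiveToSavepointWorkaround {

    // Hypothetical placeholders: adjust to the actual cluster.
    static final String HIVE_CONF_DIR = "/etc/hive/conf";
    static final String HIVE_VERSION = "2.3.4";
    static final String ORC_DIR = "hdfs:///user/hive/warehouse/test001_orc";
    static final String SAVEPOINT_PATH = "hdfs:///savepoints/bootstrap";
    static final String CURRENCY_UID = "currency-rates-uid";

    // Broadcast state layout: currency -> rate (assumed column types).
    static final MapStateDescriptor<String, Double> RATES =
            new MapStateDescriptor<>("currency-rates", Types.STRING, Types.DOUBLE);

    /** Writes each (currency, rate) row into broadcast state. */
    public static class RowBootstrapFunction extends BroadcastStateBootstrapFunction<Row> {
        @Override
        public void processElement(Row row, Context ctx) throws Exception {
            ctx.getBroadcastState(RATES).put((String) row.getField(0), (Double) row.getField(1));
        }
    }

    /** Step 1: Blink batch SQL copies the Hive table into an ORC-backed Hive table. */
    static void exportToOrc() throws Exception {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        HiveCatalog hive = new HiveCatalog("myhive", "default", HIVE_CONF_DIR, HIVE_VERSION);
        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");
        // test001_orc is assumed to exist already in Hive, STORED AS ORC.
        tableEnv.sqlUpdate("INSERT INTO test001_orc SELECT currency, rate FROM test001");
        tableEnv.execute("export-test001-to-orc");
    }

    /** Step 2: the DataSet API reads the ORC files and bootstraps the savepoint. */
    static void bootstrapSavepoint() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // The schema string is assumed to match the ORC files written in step 1.
        OrcRowInputFormat orc = new OrcRowInputFormat(
                ORC_DIR, "struct<currency:string,rate:double>",
                new org.apache.hadoop.conf.Configuration());
        DataSet<Row> rows = env.createInput(orc);

        BootstrapTransformation<Row> broadcast = OperatorTransformation
                .bootstrapWith(rows)
                .transform(new RowBootstrapFunction());

        // MemoryStateBackend is only suitable for small state such as a rate table.
        Savepoint
                .create(new MemoryStateBackend(), 128)
                .withOperator(CURRENCY_UID, broadcast)
                .write(SAVEPOINT_PATH);
        // write() only defines the sink; the DataSet job still has to be executed.
        env.execute("bootstrap-savepoint");
    }

    public static void main(String[] args) throws Exception {
        exportToOrc();
        bootstrapSavepoint();
    }
}
```

Splitting the work into two jobs keeps the Blink planner (needed for Hive) and the DataSet API (needed by `OperatorTransformation.bootstrapWith` in 1.10) apart, with the ORC files as the hand-off point.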

Best,
Jingsong Lee

(In reply to Benchao Li <[hidden email]>, Wed, May 6, 2020, 7:45 PM.)
