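I am using Flink 1.10 and want to initialize a Savepoint from data queried from a Hive table. The problem I am running into is that I cannot convert a Table into a DataSet.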
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
Table table = tableEnv.sqlQuery("select * from test001");

BootstrapTransformation<CurrencyRate> broadcastTransformation = OperatorTransformation
    .bootstrapWith(currencyDataSet)
    .transform(new CurrencyBootstrapFunction());

Savepoint
    .create(backend, 128)
    .withOperator(ACCOUNT_UID, transformation)
    .withOperator(CURRENCY_UID, broadcastTransformation)
    .write(savepointPath);
Hi,
The Blink planner does not support conversion between the Table API and the DataSet API. The Blink planner is a unified batch/stream architecture; its batch mode is not implemented on top of the DataSet API.

On Wed, May 6, 2020 at 4:41 PM, 王良 wrote:
> I am using Flink 1.10 and want to initialize a Savepoint from data queried from a Hive table. The problem I am running into is that I cannot convert a Table into a DataSet.

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
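For contrast, here is a minimal sketch of where the Table-to-DataSet bridge does exist in Flink 1.10: BatchTableEnvironment, which is created from an ExecutionEnvironment and runs on the legacy planner, not the Blink planner. The in-memory stand-in for test001, its column names, and the class name are assumptions made for illustration. The sketch also assumes flink-table-api-java-bridge and the legacy flink-table-planner are on the classpath. Whether the Hive table itself can be read through this environment is not addressed in this thread.

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class LegacyPlannerToDataSet {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // BatchTableEnvironment is backed by the legacy planner and the DataSet API.
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

        // Hypothetical stand-in for test001; the real table lives in Hive.
        DataSet<Tuple2<String, Double>> input =
            env.fromElements(Tuple2.of("USD", 1.0), Tuple2.of("EUR", 0.92));
        tableEnv.createTemporaryView("test001", input, "currency, rate");

        Table table = tableEnv.sqlQuery("SELECT * FROM test001");

        // This conversion exists only on BatchTableEnvironment,
        // not on a TableEnvironment created with useBlinkPlanner().
        DataSet<Row> rows = tableEnv.toDataSet(table, Row.class);
        rows.print();
    }
}
```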
Hi,
There are plans for Savepoint to support BoundedStream (batch execution built on top of DataStream). For now, as a workaround, perhaps you could first use Blink SQL to write the data to files (Parquet, ORC) and then read it back from those files with the DataSet API?

Best,
Jingsong Lee

On Wed, May 6, 2020 at 7:45 PM Benchao Li wrote:
> The Blink planner does not support conversion between the Table API and the DataSet API.
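The read-back half of that workaround might look roughly like the sketch below. It assumes a separate Blink SQL job has already exported the query result to files. CSV is used here instead of the Parquet/ORC formats suggested above, only because the DataSet API ships a built-in CSV reader; a Parquet or ORC input format would take the place of readCsvFile. The CurrencyRate fields, the operator uid "currency", the MemoryStateBackend, and the two path arguments are all assumptions for illustration.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;

public class BootstrapFromExportedFiles {

    /** Hypothetical POJO; the field names and types are assumptions. */
    public static class CurrencyRate {
        public String currency;
        public Double rate;

        public CurrencyRate() {}
    }

    /** Writes each record into broadcast state keyed by currency code. */
    public static class CurrencyBootstrapFunction
            extends BroadcastStateBootstrapFunction<CurrencyRate> {

        public static final MapStateDescriptor<String, Double> DESCRIPTOR =
            new MapStateDescriptor<>("currency-rates", Types.STRING, Types.DOUBLE);

        @Override
        public void processElement(CurrencyRate value, Context ctx) throws Exception {
            ctx.getBroadcastState(DESCRIPTOR).put(value.currency, value.rate);
        }
    }

    public static void main(String[] args) throws Exception {
        String exportPath = args[0];     // directory written by the Blink SQL job (assumed)
        String savepointPath = args[1];  // target location for the new savepoint (assumed)

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the exported files back as a DataSet of POJOs.
        DataSet<CurrencyRate> currencyDataSet = env
            .readCsvFile(exportPath)
            .pojoType(CurrencyRate.class, "currency", "rate");

        BootstrapTransformation<CurrencyRate> broadcastTransformation = OperatorTransformation
            .bootstrapWith(currencyDataSet)
            .transform(new CurrencyBootstrapFunction());

        Savepoint
            .create(new MemoryStateBackend(), 128)
            .withOperator("currency", broadcastTransformation)
            .write(savepointPath);

        // write() only defines the sink; the batch job runs on execute().
        env.execute("bootstrap savepoint");
    }
}
```

The state descriptor name and types must match what the streaming job that restores from this savepoint declares for the same operator uid.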