使用Table&SQL API怎么构造多个sink

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

使用Table&SQL API怎么构造多个sink

yuehan1
Hi,
个人在分析RelNodeBlock逻辑,多个SINK才会拆分并重用公共子树,怎么构造多个sink呢,
文件RelNodeBlock.scala源码里的writeToSink()已经找不到了

// 源码里的多sink例子
val sourceTable = tEnv.scan("test_table").select('a, 'b, 'c)
val leftTable = sourceTable.filter('a > 0).select('a as 'a1, 'b as 'b1)
val rightTable = sourceTable.filter('c.isNotNull).select('b as 'b2, 'c as 'c2)
val joinTable = leftTable.join(rightTable, 'a1 === 'b2)
joinTable.where('a1 >= 70).select('a1, 'b1).writeToSink(sink1)
joinTable.where('a1 < 70 ).select('a1, 'c2).writeToSink(sink2)

谢谢
Reply | Threaded
Open this post in threaded view
|

Re: 使用Table&SQL API怎么构造多个sink

Shengkai Fang
Hi.

可以通过`StatementSet` 指定多个insert,这样子就可以构造出多个sink了。

Best,
Shengkai

Han Han1 Yue <[hidden email]> 于2021年4月28日周三 下午2:30写道:

> Hi,
> 个人在分析RelNodeBlock逻辑,多个SINK才会拆分并重用公共子树,怎么构造多个sink呢,
> 文件RelNodeBlock.scala源码里的writeToSink()已经找不到了
>
> // 源码里的多sink例子
> val sourceTable = tEnv.scan("test_table").select('a, 'b, 'c)
> val leftTable = sourceTable.filter('a > 0).select('a as 'a1, 'b as 'b1)
> val rightTable = sourceTable.filter('c.isNotNull).select('b as 'b2, 'c as
> 'c2)
> val joinTable = leftTable.join(rightTable, 'a1 === 'b2)
> joinTable.where('a1 >= 70).select('a1, 'b1).writeToSink(sink1)
> joinTable.where('a1 < 70 ).select('a1, 'c2).writeToSink(sink2)
>
> 谢谢
>
Reply | Threaded
Open this post in threaded view
|

Re: 使用Table&SQL API怎么构造多个sink

yuehan1
可以了,非常感谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/