[flink-1.10.2] 异步IO结果DataStream 该如何注册为table??

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

[flink-1.10.2] 异步IO结果DataStream 该如何注册为table??

Tianwang Li
Flink版本:1.10.2

使用RichAsyncFunction 异步IO 操作,结果DataStream 不能注册为table。

本地测试的结果是一直重复输出数据。

请问一下DataStream 处理之后,怎么才能注册为 Table。

-------------------------------
代码如下:

// 异步redis处理
RedisAsyncFunction asyncFunction = new RedisAsyncFunction(node,
aggProcessorArgs);

// 获取异步处理流
DataStream<Row> result = AsyncDataStream.orderedWait(
        dataStream,
        asyncFunction,
        60L,
        TimeUnit.SECONDS,
        100).returns(outRowTypeInfo);

// 注册为临时 table
tabEnv.createTemporaryView("test_table", result,
outRowFields.stream().collect(Collectors.joining(",")));

//
result.print("out_table>>");

Table test_table = tabEnv.sqlQuery("select * from test_table");
// 查询临时table
tabEnv.toAppendStream(test_table, Row.class).print("test_table");



--
**************************************
 tili
**************************************
Reply | Threaded
Open this post in threaded view
|

Re: [flink-1.10.2] 异步IO结果DataStream 该如何注册为table??

Jark
Administrator
> tabEnv.createTemporaryView("test_table", result,

我看你这不是注册进去了么? 有报什么错么?

最后提交作业执行记得调用 StreamExecutionEnvironment.execute()

Best,
Jark

On Tue, 8 Dec 2020 at 14:54, Tianwang Li <[hidden email]> wrote:

> Flink版本:1.10.2
>
> 使用RichAsyncFunction 异步IO 操作,结果DataStream 不能注册为table。
>
> 本地测试的结果是一直重复输出数据。
>
> 请问一下DataStream 处理之后,怎么才能注册为 Table。
>
> -------------------------------
> 代码如下:
>
> // 异步redis处理
> RedisAsyncFunction asyncFunction = new RedisAsyncFunction(node,
> aggProcessorArgs);
>
> // 获取异步处理流
> DataStream<Row> result = AsyncDataStream.orderedWait(
>         dataStream,
>         asyncFunction,
>         60L,
>         TimeUnit.SECONDS,
>         100).returns(outRowTypeInfo);
>
> // 注册为临时 table
> tabEnv.createTemporaryView("test_table", result,
> outRowFields.stream().collect(Collectors.joining(",")));
>
> //
> result.print("out_table>>");
>
> Table test_table = tabEnv.sqlQuery("select * from test_table");
> // 查询临时table
> tabEnv.toAppendStream(test_table, Row.class).print("test_table");
>
>
>
> --
> **************************************
>  tili
> **************************************
>