> tabEnv.createTemporaryView("test_table", result,
我看你这不是注册进去了么? 有报什么错么?
最后提交作业执行记得调用 StreamExecutionEnvironment.execute()
Best,
Jark
On Tue, 8 Dec 2020 at 14:54, Tianwang Li <
[hidden email]> wrote:
> Flink版本:1.10.2
>
> 使用RichAsyncFunction 异步IO 操作,结果DataStream 不能注册为table。
>
> 本地测试的结果是一直重复输出数据。
>
> 请问一下DataStream 处理之后,怎么才能注册为 Table。
>
> -------------------------------
> 代码如下:
>
> // 异步redis处理
> RedisAsyncFunction asyncFunction = new RedisAsyncFunction(node,
> aggProcessorArgs);
>
> // 获取异步处理流
> DataStream<Row> result = AsyncDataStream.orderedWait(
> dataStream,
> asyncFunction,
> 60L,
> TimeUnit.SECONDS,
> 100).returns(outRowTypeInfo);
>
> // 注册为临时 table
> tabEnv.createTemporaryView("test_table", result,
> outRowFields.stream().collect(Collectors.joining(",")));
>
> //
> result.print("out_table>>");
>
> Table test_table = tabEnv.sqlQuery("select * from test_table");
> // 查询临时table
> tabEnv.toAppendStream(test_table, Row.class).print("test_table");
>
>
>
> --
> **************************************
> tili
> **************************************
>