我这边因为混用 StreamTableEnvironment.executeSql 和 StreamExecutionEnvironment.execute 导致了这个报错.
代码逻辑里 executeSql 和 DataStream 会生成独立不相关的sink.
是这么解决这个报错的.
1. 将含有 insert into 语句的StreamTableEnvironment.executeSql 改为:
final StreamStatementSet statementSet = tEnv.createStatementSet();
statementSet.addInsertSql("insert into xxx");
statementSet.attachAsDataStream();
2. 最后统一用StreamExecutionEnvironment.execute执行:
env.execute("DataStreamJob");
旧版本有blink planner 应该就不会有这个问题, 因为根据文档:
The Blink planner will optimize multiple-sinks into one DAG (supported only on TableEnvironment, not on StreamTableEnvironment). The old planner will always optimize each sink into a new DAG, where all DAGs are independent of each other.
https://nightlies.apache.org/flink/flink-docs-release-1.10/dev/table/common.html