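Hi all,
All the flink-connector-kudu examples I have seen use DataStream, but I want to use DataSet for point lookups. Judging by the error message, that does not seem to be supported. Is there any way to work around this?
The code is as follows: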
// Streaming environment and its table environments
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
StreamTableEnvironment tEnv2 = StreamTableEnvironment.create(env);
// Batch (DataSet) environment and the legacy batch table environment
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv);
// Register the same Kudu catalog in both table environments
String KUDU_MASTERS = "192.168.248.4:7051";
KuduCatalog catalog = new KuduCatalog(KUDU_MASTERS);
tEnv2.registerCatalog("kudu", catalog);
tEnv2.useCatalog("kudu");
oldBatchTableEnv.registerCatalog("kudu", catalog);
oldBatchTableEnv.useCatalog("kudu");
// Batch read: toDataSet is the call that throws the TableException in the stack trace
Table oldTable = oldBatchTableEnv.sqlQuery("select * from users");
DataSet<Row> dsRow = oldBatchTableEnv.toDataSet(oldTable, Row.class);
dsRow.print();
// Streaming read of the same table
Table table = tEnv2.sqlQuery("select * from users");
tEnv2.toAppendStream(table, Row.class).print();
The error is as follows:
ERROR StatusLogger No Log4j 2 configuration file found. Using default configuration (logging only errors to the console), or user programmatically provided configurations. Set system property 'log4j2.debug' to show Log4j 2 internal initialization logging. See
https://logging.apache.org/log4j/2.x/manual/configuration.html for instructions on how to configure Log4j 2
Exception in thread "main" org.apache.flink.table.api.TableException: Only BatchTableSource and InputFormatTableSource are supported in BatchTableEnvironment.
at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:116)
at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:580)
at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:555)
at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:537)
at org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:101)
at com.hujiang.bi.order.OrderMasterJob.main(OrderMasterJob.java:44)
So it looks like flink-connector-kudu does not support batch reads...