我按官网上的 https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query 例子写的代码 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢? [hidden email] |
完整代码发一下
[hidden email] <[hidden email]> 于2019年8月8日周四 下午7:37写道: > > 我按官网上的 > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query > 例子写的代码 > 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢? > > > > [hidden email] > |
我接入了一个 RocketMQ 的流作为输入。 DataStream<Tuple3<Integer, String, String>> ds = env.addSource(new RocketMQSource( ........ System.out.println(res); return res; } }); tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id, pick_list_no, sku_code"); TableSink<Row> csvSink = new CsvTableSink("D:\\data\\flink",","); String[] fieldNames = {"num"}; TypeInformation[] fieldTypes = {Types.INT}; tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink); tableEnv.sqlUpdate( "INSERT INTO RubberOrders SELECT pick_task_id FROM t_pick_task"); [hidden email] Sender: Alec Chen Send Time: 2019-08-08 21:01 Receiver: user-zh Subject: Re: CsvTableSink 目录没有写入具体的数据 完整代码发一下 [hidden email] <[hidden email]> 于2019年8月8日周四 下午7:37写道: > > 我按官网上的 > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query > 例子写的代码 > 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢? > > > > [hidden email] > |
没数据是因为没有trigger执行, 参考sample code from doc(
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html ) // get a StreamTableEnvironment, works for BatchTableEnvironment equivalentlyStreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // create a TableSinkTableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|"); // register the TableSink with a specific schemaString[] fieldNames = {"a", "b", "c"};TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink); // compute a result Table using Table API operators and/or SQL queriesTable result = ...// emit the result Table to the registered TableSinkresult.insertInto("CsvSinkTable"); // execute the program 加上 tableEnv.execute(); [hidden email] <[hidden email]> 于2019年8月9日周五 上午9:42写道: > > 我接入了一个 RocketMQ 的流作为输入。 > > > DataStream<Tuple3<Integer, String, String>> ds = env.addSource(new > RocketMQSource( > ........ > System.out.println(res); > return res; > } > }); > > > tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id, > pick_list_no, sku_code"); > > TableSink<Row> csvSink = new CsvTableSink("D:\\data\\flink",","); > String[] fieldNames = {"num"}; > TypeInformation[] fieldTypes = {Types.INT}; > tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, > csvSink); > tableEnv.sqlUpdate( > "INSERT INTO RubberOrders SELECT pick_task_id FROM > t_pick_task"); > > > > [hidden email] > > Sender: Alec Chen > Send Time: 2019-08-08 21:01 > Receiver: user-zh > Subject: Re: CsvTableSink 目录没有写入具体的数据 > 完整代码发一下 > > [hidden email] <[hidden email]> 于2019年8月8日周四 下午7:37写道: > > > > > 我按官网上的 > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query > > 例子写的代码 > > 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢? > > > > > > > > [hidden email] > > > |
抱歉,是我搞错了。 实际上是写入数据的。我在 windows 下做测试,刷新下文件的大小始终是 0 , 只有编辑看下那个文件显示的文件大小才会变更。 [hidden email] Sender: Alec Chen Send Time: 2019-08-09 10:17 Receiver: user-zh Subject: Re: Re: CsvTableSink 目录没有写入具体的数据 没数据是因为没有trigger执行, 参考sample code from doc( https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html ) // get a StreamTableEnvironment, works for BatchTableEnvironment equivalentlyStreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // create a TableSinkTableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|"); // register the TableSink with a specific schemaString[] fieldNames = {"a", "b", "c"};TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink); // compute a result Table using Table API operators and/or SQL queriesTable result = ...// emit the result Table to the registered TableSinkresult.insertInto("CsvSinkTable"); // execute the program 加上 tableEnv.execute(); [hidden email] <[hidden email]> 于2019年8月9日周五 上午9:42写道: > > 我接入了一个 RocketMQ 的流作为输入。 > > > DataStream<Tuple3<Integer, String, String>> ds = env.addSource(new > RocketMQSource( > ........ > System.out.println(res); > return res; > } > }); > > > tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id, > pick_list_no, sku_code"); > > TableSink<Row> csvSink = new CsvTableSink("D:\\data\\flink",","); > String[] fieldNames = {"num"}; > TypeInformation[] fieldTypes = {Types.INT}; > tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, > csvSink); > tableEnv.sqlUpdate( > "INSERT INTO RubberOrders SELECT pick_task_id FROM > t_pick_task"); > > > > [hidden email] > > Sender: Alec Chen > Send Time: 2019-08-08 21:01 > Receiver: user-zh > Subject: Re: CsvTableSink 目录没有写入具体的数据 > 完整代码发一下 > > [hidden email] <[hidden email]> 于2019年8月8日周四 下午7:37写道: > > > > > 我按官网上的 > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query > > 例子写的代码 > > 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢? > > > > > > > > [hidden email] > > > |
Free forum by Nabble | Edit this page |