CsvTableSink 目录没有写入具体的数据

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

CsvTableSink 目录没有写入具体的数据

wanglei2@geekplus.com.cn

我按官网上的 https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query  例子写的代码
但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?



[hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: CsvTableSink 目录没有写入具体的数据

Alec Chen
完整代码发一下

[hidden email] <[hidden email]> 于2019年8月8日周四 下午7:37写道:

>
> 我按官网上的
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> 例子写的代码
> 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?
>
>
>
> [hidden email]
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: CsvTableSink 目录没有写入具体的数据

wanglei2@geekplus.com.cn
       
我接入了一个 RocketMQ 的流作为输入。


 DataStream<Tuple3<Integer, String, String>> ds = env.addSource(new RocketMQSource(
........
                        System.out.println(res);
                        return res;
                    }
                });


        tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id, pick_list_no, sku_code");

        TableSink<Row> csvSink = new CsvTableSink("D:\\data\\flink",",");
        String[] fieldNames = {"num"};
        TypeInformation[] fieldTypes = {Types.INT};
        tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink);
        tableEnv.sqlUpdate(
                "INSERT INTO RubberOrders SELECT pick_task_id FROM t_pick_task");



[hidden email]
 
Sender: Alec Chen
Send Time: 2019-08-08 21:01
Receiver: user-zh
Subject: Re: CsvTableSink 目录没有写入具体的数据
完整代码发一下
 
[hidden email] <[hidden email]> 于2019年8月8日周四 下午7:37写道:
 

>
> 我按官网上的
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> 例子写的代码
> 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?
>
>
>
> [hidden email]
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: CsvTableSink 目录没有写入具体的数据

Alec Chen
没数据是因为没有trigger执行,  参考sample code from doc(
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html
)

// get a StreamTableEnvironment, works for BatchTableEnvironment
equivalentlyStreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
// create a TableSinkTableSink sink = new
CsvTableSink("/path/to/file", fieldDelim = "|");
// register the TableSink with a specific schemaString[] fieldNames =
{"a", "b", "c"};TypeInformation[] fieldTypes = {Types.INT,
Types.STRING, Types.LONG};tableEnv.registerTableSink("CsvSinkTable",
fieldNames, fieldTypes, sink);
// compute a result Table using Table API operators and/or SQL
queriesTable result = ...// emit the result Table to the registered
TableSinkresult.insertInto("CsvSinkTable");
// execute the program

加上 tableEnv.execute();


[hidden email] <[hidden email]> 于2019年8月9日周五 上午9:42写道:

>
> 我接入了一个 RocketMQ 的流作为输入。
>
>
>  DataStream<Tuple3<Integer, String, String>> ds = env.addSource(new
> RocketMQSource(
> ........
>                         System.out.println(res);
>                         return res;
>                     }
>                 });
>
>
>         tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id,
> pick_list_no, sku_code");
>
>         TableSink<Row> csvSink = new CsvTableSink("D:\\data\\flink",",");
>         String[] fieldNames = {"num"};
>         TypeInformation[] fieldTypes = {Types.INT};
>         tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes,
> csvSink);
>         tableEnv.sqlUpdate(
>                 "INSERT INTO RubberOrders SELECT pick_task_id FROM
> t_pick_task");
>
>
>
> [hidden email]
>
> Sender: Alec Chen
> Send Time: 2019-08-08 21:01
> Receiver: user-zh
> Subject: Re: CsvTableSink 目录没有写入具体的数据
> 完整代码发一下
>
> [hidden email] <[hidden email]> 于2019年8月8日周四 下午7:37写道:
>
> >
> > 我按官网上的
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> > 例子写的代码
> > 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?
> >
> >
> >
> > [hidden email]
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: CsvTableSink 目录没有写入具体的数据

wanglei2@geekplus.com.cn

抱歉,是我搞错了。
实际上是写入数据的。我在 windows 下做测试,刷新下文件的大小始终是 0 , 只有编辑看下那个文件显示的文件大小才会变更。



[hidden email]
 
Sender: Alec Chen
Send Time: 2019-08-09 10:17
Receiver: user-zh
Subject: Re: Re: CsvTableSink 目录没有写入具体的数据
没数据是因为没有trigger执行,  参考sample code from doc(
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html
)
 
// get a StreamTableEnvironment, works for BatchTableEnvironment
equivalentlyStreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
// create a TableSinkTableSink sink = new
CsvTableSink("/path/to/file", fieldDelim = "|");
// register the TableSink with a specific schemaString[] fieldNames =
{"a", "b", "c"};TypeInformation[] fieldTypes = {Types.INT,
Types.STRING, Types.LONG};tableEnv.registerTableSink("CsvSinkTable",
fieldNames, fieldTypes, sink);
// compute a result Table using Table API operators and/or SQL
queriesTable result = ...// emit the result Table to the registered
TableSinkresult.insertInto("CsvSinkTable");
// execute the program
 
加上 tableEnv.execute();
 
 
[hidden email] <[hidden email]> 于2019年8月9日周五 上午9:42写道:
 

>
> 我接入了一个 RocketMQ 的流作为输入。
>
>
>  DataStream<Tuple3<Integer, String, String>> ds = env.addSource(new
> RocketMQSource(
> ........
>                         System.out.println(res);
>                         return res;
>                     }
>                 });
>
>
>         tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id,
> pick_list_no, sku_code");
>
>         TableSink<Row> csvSink = new CsvTableSink("D:\\data\\flink",",");
>         String[] fieldNames = {"num"};
>         TypeInformation[] fieldTypes = {Types.INT};
>         tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes,
> csvSink);
>         tableEnv.sqlUpdate(
>                 "INSERT INTO RubberOrders SELECT pick_task_id FROM
> t_pick_task");
>
>
>
> [hidden email]
>
> Sender: Alec Chen
> Send Time: 2019-08-08 21:01
> Receiver: user-zh
> Subject: Re: CsvTableSink 目录没有写入具体的数据
> 完整代码发一下
>
> [hidden email] <[hidden email]> 于2019年8月8日周四 下午7:37写道:
>
> >
> > 我按官网上的
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> > 例子写的代码
> > 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?
> >
> >
> >
> > [hidden email]
> >
>