1. Program

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

String sourceTableDDL = "CREATE TABLE fs_table (" +
        "  user_id STRING," +
        "  order_amount DOUBLE," +
        "  dt TIMESTAMP(3)," +
        "  pt AS PROCTIME() " +
        " ) WITH (" +
        "  'connector'='filesystem'," +
        "  'path'='D:\\Program Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv'," +
        "  'format'='csv'" +
        " )";

bsTableEnv.executeSql(sourceTableDDL);
bsTableEnv.executeSql("select * from fs_table").print();

2. CSV file
order.csv
zhangsan,12.34,2020-08-03 12:23:50
lisi,234.67,2020-08-03 12:25:50
wangwu,57.6,2020-08-03 12:25:50
zhaoliu,345,2020-08-03 12:28:50

3. Error
- Source: FileSystemTableSource(user_id, order_amount, dt, pt) -> Calc(select=[user_id, order_amount, dt, PROCTIME_MATERIALIZE(()) AS pt]) -> SinkConversionToRow (4/6) (9ee0383d676a190b0a62d206039db26c) switched from RUNNING to FAILED.
java.io.IOException: Failed to deserialize CSV row.
	at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:299)
	at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:210)
	at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: java.lang.RuntimeException: Row length mismatch. 4 fields expected but was 3.
	at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.validateArity(CsvRowDataDeserializationSchema.java:441)
	at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.lambda$createRowConverter$1ca9c073$1(CsvRowDataDeserializationSchema.java:244)
	at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:293)
	... 5 more
Hi, Asahi
This is a known bug [1]: the filesystem connector has a problem handling computed columns. A PR is already open, and the fix will ship in 1.11.2 and 1.12.

Best
Leonard Xu

[1] https://issues.apache.org/jira/browse/FLINK-18665
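In the meantime, since the "Row length mismatch. 4 fields expected but was 3" error shows the computed column being counted against the three physical CSV columns, one possible workaround is to declare only the physical columns in the DDL and derive the processing time in the query instead. This is an untested sketch, assuming PROCTIME() is accepted in a plain projection on the Blink planner:

// Untested workaround sketch: declare only the three physical CSV columns,
// so the declared schema matches the file, and attach processing time in the
// query rather than as a computed column.
String sourceTableDDL = "CREATE TABLE fs_table (" +
        "  user_id STRING," +
        "  order_amount DOUBLE," +
        "  dt TIMESTAMP(3)" +    // no 'pt AS PROCTIME()' here
        ") WITH (" +
        "  'connector'='filesystem'," +
        "  'path'='D:\\Program Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv'," +
        "  'format'='csv'" +
        ")";
bsTableEnv.executeSql(sourceTableDDL);

// Assumes PROCTIME() can be called in a projection; if your build rejects it,
// upgrading to a release that contains the FLINK-18665 fix is the real answer.
bsTableEnv.executeSql("SELECT user_id, order_amount, dt, PROCTIME() AS pt FROM fs_table").print();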
When I use the filesystem connector to read the CSV as a source in a streaming environment, why does my program stop as soon as it runs, instead of waiting for appends to the file and continuing the computation?
Or can the filesystem connector only be used for batch jobs?
Hi,
The filesystem connector supports streaming writes; streaming reads are not supported yet, so the job stops once the file has been read through. Judging from the documentation [1], support for streaming reads appears to be planned.

Best
Leonard Xu

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/filesystem.html
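If continuous reading is needed before SQL-level support lands, the DataStream API can already monitor a path with readFile() in PROCESS_CONTINUOUSLY mode. A minimal, untested sketch, reusing the order.csv path from the original post; note the documented caveat that a modified file is re-read in its entirety, so appending to the CSV re-emits the old rows too:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ContinuousCsvRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String path = "D:\\Program Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv";
        TextInputFormat format = new TextInputFormat(new Path(path));

        // Re-scan the path every 10 seconds; the job keeps running instead of finishing.
        DataStream<String> lines =
                env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 10_000L);

        lines.print();
        env.execute("continuous csv read");
    }
}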
When combined with Hive, the filesystem does support streaming reads; see [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/hive/hive_streaming.html#streaming-reading
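For reference, the linked page enables streaming reads of a Hive table through dynamic table options. A rough sketch under assumptions: 'hive_table' is a placeholder name, a HiveCatalog is already registered, and the option keys are taken from the 1.11 docs above:

// Rough sketch based on the hive_streaming docs linked above; 'hive_table' is
// a placeholder and a HiveCatalog is assumed to be registered already.
// The /*+ OPTIONS(...) */ hint requires dynamic table options to be enabled.
bsTableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);

bsTableEnv.executeSql(
        "SELECT * FROM hive_table /*+ OPTIONS(" +
        "'streaming-source.enable'='true'," +                      // switch the source to streaming read
        "'streaming-source.monitor-interval'='1 min'," +           // how often to check for new partitions/files
        "'streaming-source.consume-start-offset'='2020-07-23'" +   // partition offset to start consuming from
        ") */"
).print();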