大家好:
我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢 我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。 public static void main(String[] args) throws Exception{ StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); bsEnv.enableCheckpointing(10000); bsEnv.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv); String sqlSource = "CREATE TABLE source_kafka (\n" + " appName STRING,\n" + " appVersion STRING,\n" + " uploadTime STRING\n" + ") WITH (\n" + " 'connector.type' = 'kafka', \n" + " 'connector.version' = '0.10',\n" + " 'connector.topic' = 'mytest',\n" + " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + " 'connector.properties.group.id' = 'testGroup',\n" + " 'format.type'='json',\n" + " 'update-mode' = 'append' )"; tEnv.executeSql(sqlSource); String sql = "CREATE TABLE fs_table (\n" + " appName STRING,\n" + " appVersion STRING,\n" + " uploadTime STRING,\n" + " dt STRING," + " h string" + ") PARTITIONED BY (dt,h) WITH (\n" + " 'connector'='filesystem',\n" + " 'path'='hdfs://localhost/tmp/',\n" + " 'sink.partition-commit.policy.kind' = 'success-file', " + " 'format'='orc'\n" + ")"; tEnv.executeSql(sql); String insertSql = "insert into fs_table SELECT appName ,appVersion,uploadTime, " + " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'), DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka"; tEnv.executeSql(insertSql); } |
Hi,
默认情况下,对ORC来说,理论上一旦有正式数据文件的生成,就会有对应SUCCESS文件产生,你是怎么确认没有SUCCESS文件的呢? 我用同样SQL在我的环境是有的。 Best, Jingsong On Fri, Jul 10, 2020 at 9:07 AM Jun Zhang <[hidden email]> wrote: > 大家好: > 我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢 > > > 我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。 > > public static void main(String[] args) throws Exception{ > StreamExecutionEnvironment bsEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > bsEnv.enableCheckpointing(10000); > bsEnv.setParallelism(1); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv); > > String sqlSource = "CREATE TABLE source_kafka (\n" + > " appName STRING,\n" + > " appVersion STRING,\n" + > " uploadTime STRING\n" + > ") WITH (\n" + > " 'connector.type' = 'kafka', \n" + > " 'connector.version' = '0.10',\n" + > " 'connector.topic' = 'mytest',\n" + > " 'connector.properties.zookeeper.connect' = > 'localhost:2181',\n" + > " 'connector.properties.bootstrap.servers' = > 'localhost:9092',\n" + > " 'connector.properties.group.id' = 'testGroup',\n" + > " 'format.type'='json',\n" + > " 'update-mode' = 'append' )"; > > tEnv.executeSql(sqlSource); > > String sql = "CREATE TABLE fs_table (\n" + > " appName STRING,\n" + > " appVersion STRING,\n" + > " uploadTime STRING,\n" + > " dt STRING," + > " h string" + > ") PARTITIONED BY (dt,h) WITH (\n" + > " 'connector'='filesystem',\n" + > " 'path'='hdfs://localhost/tmp/',\n" + > " 'sink.partition-commit.policy.kind' = > 'success-file', " + > " 'format'='orc'\n" + > ")"; > tEnv.executeSql(sql); > > String insertSql = "insert into fs_table SELECT appName > ,appVersion,uploadTime, " + > " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'), > DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka"; > > tEnv.executeSql(insertSql); > > } > -- Best, Jingsong Lee |
hi,jinsong:
在我的测试环境下,对于我贴出来的那个代码。 1.如果source使用的有界的数据,比如bsEnv.fromElements(...),这样会有success文件生成,如果是kafka数据,就不行。 2.如果设置程序的并行度是大于1,那么也会有success生成。 3.如果我写入的是local file,比如 file:///tmp/aaa ,而不是hdfs,也会有success文件生成。 综上,在并行度设置为1,消费的是kafka的永不停止的数据,写入的是hdfs,我的checkpoint设置是10s,这种情况下,我测试了好多遍,都没有success文件生成。 Jingsong Li <[hidden email]> 于2020年7月10日周五 下午2:54写道: > Hi, > > 默认情况下,对ORC来说,理论上一旦有正式数据文件的生成,就会有对应SUCCESS文件产生,你是怎么确认没有SUCCESS文件的呢? > 我用同样SQL在我的环境是有的。 > > Best, > Jingsong > > On Fri, Jul 10, 2020 at 9:07 AM Jun Zhang <[hidden email]> > wrote: > > > 大家好: > > 我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢 > > > > > > > 我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。 > > > > public static void main(String[] args) throws Exception{ > > StreamExecutionEnvironment bsEnv = > > StreamExecutionEnvironment.getExecutionEnvironment(); > > bsEnv.enableCheckpointing(10000); > > bsEnv.setParallelism(1); > > StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv); > > > > String sqlSource = "CREATE TABLE source_kafka (\n" + > > " appName STRING,\n" + > > " appVersion STRING,\n" + > > " uploadTime STRING\n" + > > ") WITH (\n" + > > " 'connector.type' = 'kafka', \n" + > > " 'connector.version' = '0.10',\n" + > > " 'connector.topic' = 'mytest',\n" + > > " 'connector.properties.zookeeper.connect' = > > 'localhost:2181',\n" + > > " 'connector.properties.bootstrap.servers' = > > 'localhost:9092',\n" + > > " 'connector.properties.group.id' = 'testGroup',\n" + > > " 'format.type'='json',\n" + > > " 'update-mode' = 'append' )"; > > > > tEnv.executeSql(sqlSource); > > > > String sql = "CREATE TABLE fs_table (\n" + > > " appName STRING,\n" + > > " appVersion STRING,\n" + > > " uploadTime STRING,\n" + > > " dt STRING," + > > " h string" + > > ") PARTITIONED BY (dt,h) WITH (\n" + > > " 'connector'='filesystem',\n" + > > " 'path'='hdfs://localhost/tmp/',\n" + > > " 'sink.partition-commit.policy.kind' = > > 'success-file', " + > > " 'format'='orc'\n" + > > ")"; > > tEnv.executeSql(sql); > > > > String insertSql = "insert into fs_table SELECT appName > > ,appVersion,uploadTime, " + > > " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'), > > DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka"; > > > > tEnv.executeSql(insertSql); > > > > } > > > > > -- > Best, Jingsong Lee > |
Hi, Jun,
非常感谢详细充分的测试~ 接下来我复现排查下~ Best, Jingsong On Fri, Jul 10, 2020 at 3:09 PM Jun Zhang <[hidden email]> wrote: > hi,jinsong: > > 在我的测试环境下,对于我贴出来的那个代码。 > > 1.如果source使用的有界的数据,比如bsEnv.fromElements(...),这样会有success文件生成,如果是kafka数据,就不行。 > 2.如果设置程序的并行度是大于1,那么也会有success生成。 > 3.如果我写入的是local file,比如 file:///tmp/aaa ,而不是hdfs,也会有success文件生成。 > > > 综上,在并行度设置为1,消费的是kafka的永不停止的数据,写入的是hdfs,我的checkpoint设置是10s,这种情况下,我测试了好多遍,都没有success文件生成。 > > Jingsong Li <[hidden email]> 于2020年7月10日周五 下午2:54写道: > > > Hi, > > > > 默认情况下,对ORC来说,理论上一旦有正式数据文件的生成,就会有对应SUCCESS文件产生,你是怎么确认没有SUCCESS文件的呢? > > 我用同样SQL在我的环境是有的。 > > > > Best, > > Jingsong > > > > On Fri, Jul 10, 2020 at 9:07 AM Jun Zhang <[hidden email]> > > wrote: > > > > > 大家好: > > > 我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢 > > > > > > > > > > > > 我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。 > > > > > > public static void main(String[] args) throws Exception{ > > > StreamExecutionEnvironment bsEnv = > > > StreamExecutionEnvironment.getExecutionEnvironment(); > > > bsEnv.enableCheckpointing(10000); > > > bsEnv.setParallelism(1); > > > StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv); > > > > > > String sqlSource = "CREATE TABLE source_kafka (\n" + > > > " appName STRING,\n" + > > > " appVersion STRING,\n" + > > > " uploadTime STRING\n" + > > > ") WITH (\n" + > > > " 'connector.type' = 'kafka', \n" + > > > " 'connector.version' = '0.10',\n" + > > > " 'connector.topic' = 'mytest',\n" + > > > " 'connector.properties.zookeeper.connect' = > > > 'localhost:2181',\n" + > > > " 'connector.properties.bootstrap.servers' = > > > 'localhost:9092',\n" + > > > " 'connector.properties.group.id' = > 'testGroup',\n" + > > > " 'format.type'='json',\n" + > > > " 'update-mode' = 'append' )"; > > > > > > tEnv.executeSql(sqlSource); > > > > > > String sql = "CREATE TABLE fs_table (\n" + > > > " appName STRING,\n" + > > > " appVersion STRING,\n" + > > > " uploadTime STRING,\n" + > > > " dt STRING," + > > > " h string" + > > > ") PARTITIONED BY (dt,h) WITH (\n" + > > > " 'connector'='filesystem',\n" + > > > " 'path'='hdfs://localhost/tmp/',\n" + > > > " 'sink.partition-commit.policy.kind' = > > > 'success-file', " + > > > " 'format'='orc'\n" + > > > ")"; > > > tEnv.executeSql(sql); > > > > > > String insertSql = "insert into fs_table SELECT appName > > > ,appVersion,uploadTime, " + > > > " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'), > > > DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka"; > > > > > > tEnv.executeSql(insertSql); > > > > > > } > > > > > > > > > -- > > Best, Jingsong Lee > > > -- Best, Jingsong Lee |
此外,如果设置并行度是大于1,虽然可以生成success文件,但是貌似不是第一次checkpoint结束的时候就生成了,我反复测试之后,好像是不固定的时间,比如可能是第5次,也可能是第10次checkpoint之后才生成的。
Jingsong Li <[hidden email]> 于2020年7月10日周五 下午3:35写道: > Hi, Jun, > > 非常感谢详细充分的测试~ > > 接下来我复现排查下~ > > Best, > Jingsong > > On Fri, Jul 10, 2020 at 3:09 PM Jun Zhang <[hidden email]> > wrote: > > > hi,jinsong: > > > > 在我的测试环境下,对于我贴出来的那个代码。 > > > > > 1.如果source使用的有界的数据,比如bsEnv.fromElements(...),这样会有success文件生成,如果是kafka数据,就不行。 > > 2.如果设置程序的并行度是大于1,那么也会有success生成。 > > 3.如果我写入的是local file,比如 file:///tmp/aaa ,而不是hdfs,也会有success文件生成。 > > > > > > > 综上,在并行度设置为1,消费的是kafka的永不停止的数据,写入的是hdfs,我的checkpoint设置是10s,这种情况下,我测试了好多遍,都没有success文件生成。 > > > > Jingsong Li <[hidden email]> 于2020年7月10日周五 下午2:54写道: > > > > > Hi, > > > > > > 默认情况下,对ORC来说,理论上一旦有正式数据文件的生成,就会有对应SUCCESS文件产生,你是怎么确认没有SUCCESS文件的呢? > > > 我用同样SQL在我的环境是有的。 > > > > > > Best, > > > Jingsong > > > > > > On Fri, Jul 10, 2020 at 9:07 AM Jun Zhang <[hidden email]> > > > wrote: > > > > > > > 大家好: > > > > 我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢 > > > > > > > > > > > > > > > > > > 我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。 > > > > > > > > public static void main(String[] args) throws Exception{ > > > > StreamExecutionEnvironment bsEnv = > > > > StreamExecutionEnvironment.getExecutionEnvironment(); > > > > bsEnv.enableCheckpointing(10000); > > > > bsEnv.setParallelism(1); > > > > StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv); > > > > > > > > String sqlSource = "CREATE TABLE source_kafka (\n" + > > > > " appName STRING,\n" + > > > > " appVersion STRING,\n" + > > > > " uploadTime STRING\n" + > > > > ") WITH (\n" + > > > > " 'connector.type' = 'kafka', \n" + > > > > " 'connector.version' = '0.10',\n" + > > > > " 'connector.topic' = 'mytest',\n" + > > > > " 'connector.properties.zookeeper.connect' = > > > > 'localhost:2181',\n" + > > > > " 'connector.properties.bootstrap.servers' = > > > > 'localhost:9092',\n" + > > > > " 'connector.properties.group.id' = > > 'testGroup',\n" + > > > > " 'format.type'='json',\n" + > > > > " 'update-mode' = 'append' )"; > > > > > > > > tEnv.executeSql(sqlSource); > > > > > > > > String sql = "CREATE TABLE fs_table (\n" + > > > > " appName STRING,\n" + > > > > " appVersion STRING,\n" + > > > > " uploadTime STRING,\n" + > > > > " dt STRING," + > > > > " h string" + > > > > ") PARTITIONED BY (dt,h) WITH (\n" + > > > > " 'connector'='filesystem',\n" + > > > > " 'path'='hdfs://localhost/tmp/',\n" + > > > > " 'sink.partition-commit.policy.kind' = > > > > 'success-file', " + > > > > " 'format'='orc'\n" + > > > > ")"; > > > > tEnv.executeSql(sql); > > > > > > > > String insertSql = "insert into fs_table SELECT appName > > > > ,appVersion,uploadTime, " + > > > > " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'), > > > > DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka"; > > > > > > > > tEnv.executeSql(insertSql); > > > > > > > > } > > > > > > > > > > > > > -- > > > Best, Jingsong Lee > > > > > > > > -- > Best, Jingsong Lee > |
你好,
我这边同样的代码,并没有出现类似的问题 是本地跑么,可以提供下日志信息么? |
hi,夏帅:
抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。 你测试没有问题的情况并行度是 1 吗?写入hdfs? 夏帅 <[hidden email]> 于2020年7月10日周五 下午5:39写道: > 你好, > 我这边同样的代码,并没有出现类似的问题 > 是本地跑么,可以提供下日志信息么? > > |
hi,jinsong:
这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。 Jun Zhang <[hidden email]> 于2020年7月23日周四 上午11:15写道: > hi,夏帅: > > 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。 > > 你测试没有问题的情况并行度是 1 吗?写入hdfs? > > 夏帅 <[hidden email]> 于2020年7月10日周五 下午5:39写道: > >> 你好, >> 我这边同样的代码,并没有出现类似的问题 >> 是本地跑么,可以提供下日志信息么? >> >> |
hi,jinsong:
这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。 Jun Zhang <[hidden email]> 于2020年7月23日周四 上午11:34写道: > hi,jinsong: > > 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。 > > Jun Zhang <[hidden email]> 于2020年7月23日周四 上午11:15写道: > >> hi,夏帅: >> >> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。 >> >> 你测试没有问题的情况并行度是 1 吗?写入hdfs? >> >> 夏帅 <[hidden email]> 于2020年7月10日周五 下午5:39写道: >> >>> 你好, >>> 我这边同样的代码,并没有出现类似的问题 >>> 是本地跑么,可以提供下日志信息么? >>> >>> |
In reply to this post by Jun Zhang
相同操作我也没有复现。。是可以成功执行的
你的HDFS是什么版本?是否可以考虑换个来测试下 On Thu, Jul 23, 2020 at 11:34 AM Jun Zhang <[hidden email]> wrote: > hi,jinsong: > > 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。 > > Jun Zhang <[hidden email]> 于2020年7月23日周四 上午11:15写道: > >> hi,夏帅: >> >> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。 >> >> 你测试没有问题的情况并行度是 1 吗?写入hdfs? >> >> 夏帅 <[hidden email]> 于2020年7月10日周五 下午5:39写道: >> >>> 你好, >>> 我这边同样的代码,并没有出现类似的问题 >>> 是本地跑么,可以提供下日志信息么? >>> >>> -- Best, Jingsong Lee |
hi,jinsong
我们生产环境hdfs是cdh 2.6的,我换了一个hadoop 3 版本的hdfs,还真没问题了,不知道是哪里出问题了。 Jingsong Li <[hidden email]> 于2020年7月23日周四 上午11:45写道: > 相同操作我也没有复现。。是可以成功执行的 > > 你的HDFS是什么版本?是否可以考虑换个来测试下 > > On Thu, Jul 23, 2020 at 11:34 AM Jun Zhang <[hidden email]> > wrote: > >> hi,jinsong: >> >> 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。 >> >> Jun Zhang <[hidden email]> 于2020年7月23日周四 上午11:15写道: >> >>> hi,夏帅: >>> >>> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。 >>> >>> 你测试没有问题的情况并行度是 1 吗?写入hdfs? >>> >>> 夏帅 <[hidden email]> 于2020年7月10日周五 下午5:39写道: >>> >>>> 你好, >>>> 我这边同样的代码,并没有出现类似的问题 >>>> 是本地跑么,可以提供下日志信息么? >>>> >>>> > > -- > Best, Jingsong Lee > |
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html
应该是这个原因 General Important Note 1: When using Hadoop < 2.7, please use the OnCheckpointRollingPolicy which rolls part files on every checkpoint. The reason is that if part files “traverse” the checkpoint interval, then, upon recovery from a failure the StreamingFileSink may use the truncate() method of the filesystem to discard uncommitted data from the in-progress file. This method is not supported by pre-2.7 Hadoop versions and Flink will throw an exception. 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用 发件人: Jun Zhang<mailto:[hidden email]> 发送时间: 2020年7月23日 12:55 收件人: Jingsong Li<mailto:[hidden email]> 抄送: user-zh<mailto:[hidden email]> 主题: Re: flink 1.11 使用sql写入hdfs无法自动提交分区 hi,jinsong 我们生产环境hdfs是cdh 2.6的,我换了一个hadoop 3 版本的hdfs,还真没问题了,不知道是哪里出问题了。 Jingsong Li <[hidden email]> 于2020年7月23日周四 上午11:45写道: > 相同操作我也没有复现。。是可以成功执行的 > > 你的HDFS是什么版本?是否可以考虑换个来测试下 > > On Thu, Jul 23, 2020 at 11:34 AM Jun Zhang <[hidden email]> > wrote: > >> hi,jinsong: >> >> 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。 >> >> Jun Zhang <[hidden email]> 于2020年7月23日周四 上午11:15写道: >> >>> hi,夏帅: >>> >>> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。 >>> >>> 你测试没有问题的情况并行度是 1 吗?写入hdfs? >>> >>> 夏帅 <[hidden email]> 于2020年7月10日周五 下午5:39写道: >>> >>>> 你好, >>>> 我这边同样的代码,并没有出现类似的问题 >>>> 是本地跑么,可以提供下日志信息么? >>>> >>>> > > -- > Best, Jingsong Lee > |
Free forum by Nabble | Edit this page |