老师,您好,我想通过Flink消费kafka写本地csv文件,目前遇到的问题是Flink、Kafka都是kerberos认证的集群,而且我是用的是pyflink,现在程序能执行,不报错,但是不消费数据,csv文件没有结果,但是文件日期修改日期一直在更新。怀疑对接kafka 存在问题,希望老师能够给解决疑惑。 1)Kafka生产数据: 2)pyflink 程序 #!/usr/bin/python3.7 # -*- coding: UTF-8 -*- from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, CsvTableSink, WriteMode, SqlDialect from pyflink.table.descriptors import FileSystem,OldCsv,Schema,Kafka,Json s_env = StreamExecutionEnvironment.get_execution_environment() s_env.set_parallelism(1) s_env.enable_checkpointing(3000) st_env = StreamTableEnvironment.create(s_env, TableConfig()) st_env.use_catalog("default_catalog") st_env.use_database("default_database") st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect", "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002").property("security.protocol", 'SASL_PLAINTEXT').property("sasl.kerberos.service.name", 'kafka').property("kerberos.domain.name", 'hadoop.hadoop.com').property("bootstrap.servers", "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),DataTypes.FIELD("name", DataTypes.STRING())]))).with_schema(Schema().field("id", DataTypes.BIGINT()).field("name", DataTypes.STRING())).register_table_source("sourceKafka") fieldNames = ["id", "name"] fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()] csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", ",", 1, WriteMode.OVERWRITE) st_env.register_table_sink("csvTableSink", csvSink) resultQuery = st_env.sql_query("select id,name from sourceKafka") resultQuery = resultQuery.insert_into("csvTableSink") st_env.execute("pyflink-kafka-v2") 3)pyflink-shell.sh local 4)运行结果 在pyflink-shell local运行程序的同时,启用生产者生产数据,查看结果文件如下: 可以看出文件确实在被更新,文件的修改时间在变化,但是里面是空的,一方面希望pyflink可以增加打印到控制台的功能,一方面希望老师能给出对接kerberos认证的kafka的案例,我是陕西国网用电信息采集系统新架构改造的工程师,我们计划使用flink测试kafka-hdfs数据的数据传输。希望老师能给出一个案例,帮助我们完成测试。 |
先看下kafka能否通过命令行消费数据.
命令行检查确保能消费,再使用Flink. 在 2021-01-30 14:25:57,"瞿叶奇" <[hidden email]> 写道: 老师,您好,我想通过Flink消费kafka写本地csv文件,目前遇到的问题是Flink、Kafka都是kerberos认证的集群,而且我是用的是pyflink,现在程序能执行,不报错,但是不消费数据,csv文件没有结果,但是文件日期修改日期一直在更新。怀疑对接kafka 存在问题,希望老师能够给解决疑惑。 1)Kafka生产数据: 2)pyflink 程序 #!/usr/bin/python3.7 # -*- coding: UTF-8 -*- from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, CsvTableSink, WriteMode, SqlDialect from pyflink.table.descriptors import FileSystem,OldCsv,Schema,Kafka,Json s_env = StreamExecutionEnvironment.get_execution_environment() s_env.set_parallelism(1) s_env.enable_checkpointing(3000) st_env = StreamTableEnvironment.create(s_env, TableConfig()) st_env.use_catalog("default_catalog") st_env.use_database("default_database") st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect", "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002").property("security.protocol", 'SASL_PLAINTEXT').property("sasl.kerberos.service.name", 'kafka').property("kerberos.domain.name", 'hadoop.hadoop.com').property("bootstrap.servers", "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),DataTypes.FIELD("name", DataTypes.STRING())]))).with_schema(Schema().field("id", DataTypes.BIGINT()).field("name", DataTypes.STRING())).register_table_source("sourceKafka") fieldNames = ["id", "name"] fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()] csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", ",", 1, WriteMode.OVERWRITE) st_env.register_table_sink("csvTableSink", csvSink) resultQuery = st_env.sql_query("select id,name from sourceKafka") resultQuery = resultQuery.insert_into("csvTableSink") st_env.execute("pyflink-kafka-v2") 3)pyflink-shell.sh local 4)运行结果 在pyflink-shell local运行程序的同时,启用生产者生产数据,查看结果文件如下: 可以看出文件确实在被更新,文件的修改时间在变化,但是里面是空的,一方面希望pyflink可以增加打印到控制台的功能,一方面希望老师能给出对接kerberos认证的kafka的案例,我是陕西国网用电信息采集系统新架构改造的工程师,我们计划使用flink测试kafka-hdfs数据的数据传输。希望老师能给出一个案例,帮助我们完成测试。 |
老师,你好,消费是没有任何问题,可以正常消费。 ------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2021年1月30日(星期六) 下午3:08 收件人: "user-zh"<[hidden email]>; 主题: Re:问题求助(Pyflink) 命令行检查确保能消费,再使用Flink. 在 2021-01-30 14:25:57,"瞿叶奇" <[hidden email]> 写道: 老师,您好,我想通过Flink消费kafka写本地csv文件,目前遇到的问题是Flink、Kafka都是kerberos认证的集群,而且我是用的是pyflink,现在程序能执行,不报错,但是不消费数据,csv文件没有结果,但是文件日期修改日期一直在更新。怀疑对接kafka 存在问题,希望老师能够给解决疑惑。 1)Kafka生产数据: 2)pyflink 程序 #!/usr/bin/python3.7 # -*- coding: UTF-8 -*- from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, CsvTableSink, WriteMode, SqlDialect from pyflink.table.descriptors import FileSystem,OldCsv,Schema,Kafka,Json s_env = StreamExecutionEnvironment.get_execution_environment() s_env.set_parallelism(1) s_env.enable_checkpointing(3000) st_env = StreamTableEnvironment.create(s_env, TableConfig()) st_env.use_catalog("default_catalog") st_env.use_database("default_database") st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect", "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002").property("security.protocol", 'SASL_PLAINTEXT').property("sasl.kerberos.service.name", 'kafka').property("kerberos.domain.name", 'hadoop.hadoop.com').property("bootstrap.servers", "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),DataTypes.FIELD("name", DataTypes.STRING())]))).with_schema(Schema().field("id", DataTypes.BIGINT()).field("name", DataTypes.STRING())).register_table_source("sourceKafka") fieldNames = ["id", "name"] fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()] csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", ",", 1, WriteMode.OVERWRITE) st_env.register_table_sink("csvTableSink", csvSink) resultQuery = st_env.sql_query("select id,name from sourceKafka") resultQuery = resultQuery.insert_into("csvTableSink") st_env.execute("pyflink-kafka-v2") 3)pyflink-shell.sh local 4)运行结果 在pyflink-shell local运行程序的同时,启用生产者生产数据,查看结果文件如下: 可以看出文件确实在被更新,文件的修改时间在变化,但是里面是空的,一方面希望pyflink可以增加打印到控制台的功能,一方面希望老师能给出对接kerberos认证的kafka的案例,我是陕西国网用电信息采集系统新架构改造的工程师,我们计划使用flink测试kafka-hdfs数据的数据传输。希望老师能给出一个案例,帮助我们完成测试。 |
你好,
可以看下source task所在task manager 的日志里看看 consumer 有没有成功获取到kafka partition相关meta信息和认证相关是否成功的信息。 瞿叶奇 <[hidden email]> 于2021年1月30日周六 下午3:14写道: > 老师,你好,消费是没有任何问题,可以正常消费。 > > > > > ------------------ 原始邮件 ------------------ > *发件人:* "user-zh" <[hidden email]>; > *发送时间:* 2021年1月30日(星期六) 下午3:08 > *收件人:* "user-zh"<[hidden email]>; > *主题:* Re:问题求助(Pyflink) > > 先看下kafka能否通过命令行消费数据. > > 命令行检查确保能消费,再使用Flink. > > > > > > > > > > > > > > 在 2021-01-30 14:25:57,"瞿叶奇" <[hidden email]> 写道: > > 老师,您好,我想通过Flink消费kafka写本地csv文件,目前遇到的问题是Flink、Kafka都是kerberos认证的集群,而且我是用的是pyflink,现在程序能执行,不报错,但是不消费数据,csv文件没有结果,但是文件日期修改日期一直在更新。怀疑对接kafka > 存在问题,希望老师能够给解决疑惑。 > 1)Kafka生产数据: > > 2)pyflink 程序 > > > #!/usr/bin/python3.7 > # -*- coding: UTF-8 -*- > from pyflink.datastream import StreamExecutionEnvironment, > CheckpointingMode > from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, > CsvTableSink, WriteMode, SqlDialect > from pyflink.table.descriptors import FileSystem,OldCsv,Schema,Kafka,Json > s_env = StreamExecutionEnvironment.get_execution_environment() > s_env.set_parallelism(1) > s_env.enable_checkpointing(3000) > st_env = StreamTableEnvironment.create(s_env, TableConfig()) > st_env.use_catalog("default_catalog") > st_env.use_database("default_database") > st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect", > "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002").property("security.protocol", > 'SASL_PLAINTEXT').property("sasl.kerberos.service.name", > 'kafka').property("kerberos.domain.name", 'hadoop.hadoop.com').property("bootstrap.servers", > "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id", > DataTypes.BIGINT()),DataTypes.FIELD("name", > DataTypes.STRING())]))).with_schema(Schema().field("id", > DataTypes.BIGINT()).field("name", > DataTypes.STRING())).register_table_source("sourceKafka") > fieldNames = ["id", "name"] > fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()] > csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", ",", > 1, WriteMode.OVERWRITE) > st_env.register_table_sink("csvTableSink", csvSink) > resultQuery = st_env.sql_query("select id,name from sourceKafka") > resultQuery = resultQuery.insert_into("csvTableSink") > st_env.execute("pyflink-kafka-v2") > 3)pyflink-shell.sh local > > 4)运行结果 > 在pyflink-shell local运行程序的同时,启用生产者生产数据,查看结果文件如下: > > > 可以看出文件确实在被更新,文件的修改时间在变化,但是里面是空的,一方面希望pyflink可以增加打印到控制台的功能,一方面希望老师能给出对接kerberos认证的kafka的案例,我是陕西国网用电信息采集系统新架构改造的工程师,我们计划使用flink测试kafka-hdfs数据的数据传输。希望老师能给出一个案例,帮助我们完成测试。 > > > > |
Hi, 按照这个文档, 在flink-conf.yaml里配置了 security.kerberos.login.keytab 和 security.kerberos.login.principal这两个属性了吗? 还有就是jaas.conf文件在各个task manager所在的机器都能访问到吗? 瞿叶奇 <[hidden email]> 于2021年1月30日周六 下午4:15写道:
|
Shuiqiang Chen <[hidden email]> 于2021年1月30日周六 下午4:32写道:
|
Free forum by Nabble | Edit this page |