Attached is my test program; I hope someone can suggest a fix. During testing I found that only the CSV file's modification time gets updated, but no actual data is written. I suspect the Kafka connection still has a problem.
#!/usr/bin/python3.7
# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, CsvTableSink, WriteMode, SqlDialect
from pyflink.table.descriptors import FileSystem, OldCsv, Schema, Kafka, Json

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
s_env.enable_checkpointing(3000)

st_env = StreamTableEnvironment.create(s_env, TableConfig())
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")

# Kafka source: Kerberos-secured cluster (SASL_PLAINTEXT), JSON records with fields (id, name)
st_env.connect(
    Kafka()
    .version("universal")
    .topic("qyq13")
    .start_from_earliest()
    .property("zookeeper.connect", "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002")
    .property("security.protocol", 'SASL_PLAINTEXT')
    .property("sasl.kerberos.service.name", 'kafka')
    .property("kerberos.domain.name", 'hadoop.hadoop.com')
    .property("bootstrap.servers", "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007")
).with_format(
    Json()
    .fail_on_missing_field(False)
    .schema(DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
                           DataTypes.FIELD("name", DataTypes.STRING())]))
).with_schema(
    Schema()
    .field("id", DataTypes.BIGINT())
    .field("name", DataTypes.STRING())
).register_table_source("sourceKafka")

# CSV sink: comma-delimited single file, overwritten on each run
fieldNames = ["id", "name"]
fieldTypes = [DataTypes.BIGINT(), DataTypes.STRING()]
csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", ",", 1, WriteMode.OVERWRITE)
st_env.register_table_sink("csvTableSink", csvSink)

# Query the Kafka source, emit into the CSV sink, then run the job
resultQuery = st_env.sql_query("select id,name from sourceKafka")
resultQuery.insert_into("csvTableSink")
st_env.execute("pyflink-kafka-v2")
Hi,
Judging from your earlier email, you have put the Kerberos-related settings into some flink-conf.yaml and are running in local mode, right? The PyFlink shell in local mode does not read any flink-conf.yaml on its own. You need to set the FLINK_HOME environment variable and write the settings into $FLINK_HOME/conf/flink-conf.yaml, and that file is only read when the job is actually submitted (flink run, remote mode, or YARN mode).

If you still want to try it in local mode, you can use the following code:

from pyflink.java_gateway import get_gateway
System = get_gateway().jvm.System

to get hold of the Java System object and then set the configuration the same way you would in Java (a sketch of both options follows below).
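A minimal sketch of what that Java-style configuration could look like. These are the standard JVM Kerberos/JAAS system properties rather than anything PyFlink-specific, and every file path below is a placeholder you would replace with your own:

from pyflink.java_gateway import get_gateway

# Grab the JVM's System object through the py4j gateway (local mode only)
System = get_gateway().jvm.System

# Placeholder paths -- point these at your own krb5.conf and JAAS file.
# The JAAS file needs a KafkaClient login section for the Kafka connector.
System.setProperty("java.security.krb5.conf", "/etc/krb5.conf")
System.setProperty("java.security.auth.login.config", "/path/to/kafka_client_jaas.conf")

If you instead submit via flink run / remote / YARN mode as recommended, the usual place for the Kerberos settings is $FLINK_HOME/conf/flink-conf.yaml, for example (keytab path and principal are placeholders):

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/user.keytab
security.kerberos.login.principal: user@HADOOP.COM
security.kerberos.login.contexts: Client,KafkaClient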