Re: Problem connecting pyflink to Kerberos-secured Kafka


Re: Problem connecting pyflink to Kerberos-secured Kafka

Wei Zhong
Hi,

For the first problem, it looks like the corresponding KDC realm cannot be found with your current configuration. You can try configuring it manually via System.setProperty, for example:
System.setProperty("java.security.krb5.realm", "xxxx");
System.setProperty("java.security.krb5.kdc","xxxx”);

For the second problem, 'update-mode' = 'append' means the sink only accepts append messages from the upstream operator; it does not mean the output file is written in append mode. I think the property you may actually want to configure is 'format.write-mode' = 'OVERWRITE'?
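
For reference, a sketch of recreating the sink table with that property added (assumptions on my side: 'format.write-mode' = 'OVERWRITE' is the property hinted at above and its support depends on your Flink version, so please verify against the documentation; execute_sql is available since Flink 1.11, older releases use sql_update instead):

# Assumption: 'format.write-mode' = 'OVERWRITE' asks the legacy filesystem
# connector to overwrite an existing file instead of failing on it; verify
# that your Flink version supports this property before relying on it.
st_env.execute_sql("""
    CREATE TABLE csvTableSink (
        id BIGINT,
        name STRING
    ) WITH (
        'connector.type' = 'filesystem',
        'connector.path' = 'hdfs://hacluster/flink/qyq_qyq13',
        'format.type' = 'csv',
        'format.write-mode' = 'OVERWRITE',
        'update-mode' = 'append'
    )
""")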


> On Feb 2, 2021, at 21:17, 瞿叶奇 <[hidden email]> wrote:
>
> Hello, teacher!
> Thank you very much for your previous answer. The jaas.conf configuration no longer reports an error, but new problems have appeared. The program is as follows:
> #!/usr/bin/python3.7
> # -*- coding: UTF-8 -*-
> from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
> from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, CsvTableSink, WriteMode, SqlDialect
> from pyflink.table.descriptors import FileSystem,OldCsv,Schema,Kafka,Json
> from pyflink.java_gateway import get_gateway
> System = get_gateway().jvm.System
> System.setProperty("java.security.auth.login.config", "/opt/client2/Flink/flink/conf/jaas.conf")
> System.setProperty("java.security.krb5.conf", "/root/qyq_user/krb5.conf ")
> s_env = StreamExecutionEnvironment.get_execution_environment()
> s_env.set_parallelism(1)
> s_env.enable_checkpointing(30000)
> st_env = StreamTableEnvironment.create(s_env, TableConfig())
> st_env.use_catalog("default_catalog")
> st_env.use_database("default_database")
> st_env.connect(
>     Kafka()
>         .version("universal")
>         .topic("qyq13")
>         .start_from_earliest()
>         .property("zookeeper.connect", "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002")
>         .property("security.protocol", "SASL_PLAINTEXT")
>         .property("sasl.kerberos.service.name", "kafka")
>         .property("kerberos.domain.name", "hadoop.hadoop.com")
>         .property("group.id", "example-group1")
>         .property("bootstrap.servers", "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007")
> ).with_format(
>     Json()
>         .fail_on_missing_field(False)
>         .schema(DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()), DataTypes.FIELD("name", DataTypes.STRING())]))
> ).with_schema(
>     Schema()
>         .field("id", DataTypes.BIGINT())
>         .field("name", DataTypes.STRING())
> ).create_temporary_table("sourceKafka")
> fieldNames = ["id", "name"]
> fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()]
> csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", ",", 1, WriteMode.OVERWRITE)
> st_env.register_table_sink("csvTableSink", csvSink)
> resultQuery = st_env.sql_query("select id,name from sourceKafka")
> resultQuery = resultQuery.insert_into("csvTableSink")
> st_env.execute("pyflink-kafka-v4")
> New problem 1):
> (error screenshot, not preserved in the archive)
> I searched Baidu but could not find any similar case that would solve it.
> New problem 2):
> After creating the HDFS sink table, it only writes once and then fails with a file-already-exists error. The error is as follows:
> (error screenshot, not preserved in the archive)
> The table is created as follows:
> CREATE TABLE csvTableSink ( id BIGINT,name STRING) WITH ('connector.path'= 'hdfs://hacluster/flink/qyq_qyq13','connector.type'='filesystem','format.type'='csv','update-mode' = 'append')
> May I ask what I need to change so that new data is appended to the file?
>
>