How do I connect PyFlink to a Kerberos-authenticated Kafka cluster?


How do I connect PyFlink to a Kerberos-authenticated Kafka cluster?

瞿叶奇
The test program is attached below; I hope someone can suggest a fix. During testing I found that only the modification time of the CSV file gets updated, but no data is actually written. I suspect there is still a problem with the Kafka connection.
#!/usr/bin/python3.7
# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, CsvTableSink, WriteMode
from pyflink.table.descriptors import Schema, Kafka, Json

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
s_env.enable_checkpointing(3000)
st_env = StreamTableEnvironment.create(s_env, TableConfig())
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")

# Kafka source: JSON records from the Kerberos-secured (SASL_PLAINTEXT) cluster.
st_env.connect(
    Kafka().version("universal").topic("qyq13").start_from_earliest()
        .property("zookeeper.connect", "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002")
        .property("security.protocol", "SASL_PLAINTEXT")
        .property("sasl.kerberos.service.name", "kafka")
        .property("kerberos.domain.name", "hadoop.hadoop.com")
        .property("bootstrap.servers", "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007")
).with_format(
    Json().fail_on_missing_field(False)
        .schema(DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
                               DataTypes.FIELD("name", DataTypes.STRING())]))
).with_schema(
    Schema().field("id", DataTypes.BIGINT()).field("name", DataTypes.STRING())
).register_table_source("sourceKafka")

# CSV sink for the query result.
fieldNames = ["id", "name"]
fieldTypes = [DataTypes.BIGINT(), DataTypes.STRING()]
csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", ",", 1, WriteMode.OVERWRITE)
st_env.register_table_sink("csvTableSink", csvSink)

resultQuery = st_env.sql_query("select id,name from sourceKafka")
resultQuery.insert_into("csvTableSink")
st_env.execute("pyflink-kafka-v2")

Re: How do I connect PyFlink to a Kerberos-authenticated Kafka cluster?

Wei Zhong
Hi,

From your earlier emails, it looks like you currently put the Kerberos-related configuration into some flink-conf.yaml and then run the job in local mode, is that right?

However, the PyFlink shell in local mode does not read any flink-conf.yaml on its own. You need to set the FLINK_HOME environment variable and write the relevant configuration into $FLINK_HOME/conf/flink-conf.yaml, and the contents of flink-conf.yaml are only read when the job is actually submitted (via flink run, remote mode, or YARN mode).
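For reference, a minimal sketch of what the Kerberos entries in $FLINK_HOME/conf/flink-conf.yaml could look like; the keytab path and principal below are placeholders to replace with your own values:

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/user.keytab
security.kerberos.login.principal: your_principal@HADOOP.COM
security.kerberos.login.contexts: Client,KafkaClient

With that in place, the job still has to go through submission for the file to be picked up, e.g. flink run -py test.py (the script name here is only an example).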

If you still want to try it in local mode, you can use the following code:
from pyflink.java_gateway import get_gateway
System = get_gateway().jvm.System
to get hold of the Java System object, and then do the configuration the same way you would in Java.
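As an illustration, a minimal sketch of that approach; the krb5.conf and jaas.conf paths are placeholders, and the two property names are the standard JVM/Kafka client Kerberos settings rather than anything PyFlink-specific:

from pyflink.java_gateway import get_gateway

# Get the JVM System object through the Py4J gateway and set the
# Kerberos-related JVM properties before any Kafka connector code runs.
# Both paths below are placeholders for your own files.
System = get_gateway().jvm.System
System.setProperty("java.security.krb5.conf", "/path/to/krb5.conf")
System.setProperty("java.security.auth.login.config", "/path/to/jaas.conf")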

> On Jan 30, 2021, at 13:58, 瞿叶奇 <[hidden email]> wrote:
>
> The test program is attached below; I hope someone can suggest a fix. During testing I found that only the modification time of the CSV file gets updated, but no data is actually written. I suspect there is still a problem with the Kafka connection.