Hi,
For the first problem: with your current configuration the matching KDC realm apparently cannot be found. You can keep trying to set it manually with System.setProperty, for example:

System.setProperty("java.security.krb5.realm", "xxxx");
System.setProperty("java.security.krb5.kdc", "xxxx");

For the second problem: 'update-mode' = 'append' means the sink only accepts append messages from the upstream operator; it does not mean the file is written in append mode. I think the property you may actually want to set is 'format.write-mode' = 'OVERWRITE'? (A short PyFlink sketch of the manual Kerberos settings follows after the quoted message below.)

> On Feb 2, 2021, at 21:17, 瞿叶奇 <[hidden email]> wrote:
>
> Hello,
> Thank you very much for your previous answer. The jaas.conf configuration no longer reports an error, but new problems have appeared. The program is as follows:
> #!/usr/bin/python3.7
> # -*- coding: UTF-8 -*-
> from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
> from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, CsvTableSink, WriteMode, SqlDialect
> from pyflink.table.descriptors import FileSystem,OldCsv,Schema,Kafka,Json
> from pyflink.java_gateway import get_gateway
> System = get_gateway().jvm.System
> System.setProperty("java.security.auth.login.config", "/opt/client2/Flink/flink/conf/jaas.conf")
> System.setProperty("java.security.krb5.conf", "/root/qyq_user/krb5.conf ")
> s_env = StreamExecutionEnvironment.get_execution_environment()
> s_env.set_parallelism(1)
> s_env.enable_checkpointing(30000)
> st_env = StreamTableEnvironment.create(s_env, TableConfig())
> st_env.use_catalog("default_catalog")
> st_env.use_database("default_database")
> st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect", "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002").property("security.protocol", 'SASL_PLAINTEXT').property("sasl.kerberos.service.name", 'kafka').property("kerberos.domain.name", 'hadoop.hadoop.com').property('group.id' ,'example-group1').property("bootstrap.servers", "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),DataTypes.FIELD("name", DataTypes.STRING())]))).with_schema(Schema().field("id", DataTypes.BIGINT()).field("name", DataTypes.STRING())).create_temporary_table("sourceKafka")
> fieldNames = ["id", "name"]
> fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()]
> csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", ",", 1, WriteMode.OVERWRITE)
> st_env.register_table_sink("csvTableSink", csvSink)
> resultQuery = st_env.sql_query("select id,name from sourceKafka")
> resultQuery = resultQuery.insert_into("csvTableSink")
> st_env.execute("pyflink-kafka-v4")
> New problem 1):
> (error attachment not shown here)
> I searched around but could not find any similar case with a solution.
> New problem 2):
> After creating the HDFS sink table, it writes successfully only once and then fails with a "file already exists" error:
> (error attachment not shown here)
> The table is created as follows:
> CREATE TABLE csvTableSink ( id BIGINT, name STRING) WITH ('connector.path' = 'hdfs://hacluster/flink/qyq_qyq13', 'connector.type' = 'filesystem', 'format.type' = 'csv', 'update-mode' = 'append')
> I would like to ask: what do I need to change so that new data is appended to the file?
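As a rough sketch of the manual Kerberos configuration mentioned above: it could look like the following in PyFlink, reusing the same java_gateway pattern your script already uses. The realm name and KDC host below are placeholders, not values taken from your cluster, so replace them with the ones in your krb5.conf. One small thing worth checking as well: the krb5.conf path in your script contains a trailing space ("/root/qyq_user/krb5.conf "), which by itself could prevent the file from being read.

from pyflink.java_gateway import get_gateway

# Obtain the JVM-side System class through the Py4J gateway, as in your script.
System = get_gateway().jvm.System
System.setProperty("java.security.auth.login.config", "/opt/client2/Flink/flink/conf/jaas.conf")
System.setProperty("java.security.krb5.conf", "/root/qyq_user/krb5.conf")  # note: no trailing space

# If the realm still cannot be resolved from krb5.conf, set it explicitly.
# "HADOOP.COM" and "kdc-host:88" are placeholders for your cluster's realm and KDC address.
System.setProperty("java.security.krb5.realm", "HADOOP.COM")
System.setProperty("java.security.krb5.kdc", "kdc-host:88")

These calls need to run before the Kafka consumer is created, so keep them at the top of the script, before the StreamExecutionEnvironment is set up.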