flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的
1、版本说明 flink版本:1.10.2 kafka版本:1.1.0 2、kafka鉴权说明 仅使用了sasl鉴权方式 在kafka客户端有配置 kafka_server-jass.conf、 server.properties、producer.properties、consumer.properties 3、主要配置参数 sasl.mechanism=PLAIN security.protocol=SASL_PLAINTEXT sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule required username="xx" password="xx-secret"; 当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。 4、用于flink SQL连接的jar包 flink-sql-connector-kafka_2.11-1.10.2.jar flink-jdbc_2.11-1.10.2.jar flink-csv-1.10.2-sql-jar.jar 5、我的思路 类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。 6、启动客户端 ./bin/sql-client.sh embedded -l sql_lib/ 其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包 7、建表语句: create table test_hello ( name string ) with ( ... ... 'connector.properties.sasl.mechanism' = 'PLAIN', 'connector.properties.security.protocol' = 'SASL_PLAINTEXT', 'connector.properties.sasl.jaas.config' = 'org.apache.kafka.comon.security.plain.PlainLoginModule required username="xx" password="xx-secret";', 'format.type' = 'csv' ); 建表没有问题,可以正常建表。 查询表的时候,就会报错,select * from test_hello; 报错如下: could not execute sql statement. Reason: javax.security.auth.login.loginException: unable to find loginModule class: org.apache.kafka.common.security.plain.PlainLoginModule 但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因? kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。 |
我感觉还是jar的问题。如下尝试下,我懒得去试了。
将 org.apache.kafka.common.security.plain.PlainLoginModule 替换为 org.apache.flink.kafka.shaded.org.apache.kafka.common.securi ty.plain.PlainLoginModule 因为你用的是sql-connector-kafka,这个包把kafka-clients的包shade了。 Carmen Free <[hidden email]> 于2021年1月5日周二 下午5:09写道: > flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的 > > 1、版本说明 > flink版本:1.10.2 > kafka版本:1.1.0 > > 2、kafka鉴权说明 > 仅使用了sasl鉴权方式 > 在kafka客户端有配置 kafka_server-jass.conf、 > server.properties、producer.properties、consumer.properties > > 3、主要配置参数 > sasl.mechanism=PLAIN > security.protocol=SASL_PLAINTEXT > sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule > required username="xx" password="xx-secret"; > 当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。 > > 4、用于flink SQL连接的jar包 > flink-sql-connector-kafka_2.11-1.10.2.jar > flink-jdbc_2.11-1.10.2.jar > flink-csv-1.10.2-sql-jar.jar > > > 5、我的思路 > 类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka > table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。 > > 6、启动客户端 > ./bin/sql-client.sh embedded -l sql_lib/ > 其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包 > > > 7、建表语句: > create table test_hello ( > name string > ) with ( > ... > ... > 'connector.properties.sasl.mechanism' = 'PLAIN', > 'connector.properties.security.protocol' = 'SASL_PLAINTEXT', > 'connector.properties.sasl.jaas.config' = > 'org.apache.kafka.comon.security.plain.PlainLoginModule required > username="xx" password="xx-secret";', > 'format.type' = 'csv' > ); > > 建表没有问题,可以正常建表。 > > 查询表的时候,就会报错,select * from test_hello; > 报错如下: > could not execute sql statement. Reason: > javax.security.auth.login.loginException: unable to find loginModule class: > org.apache.kafka.common.security.plain.PlainLoginModule > 但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因? > > kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。 > |
感谢帮忙解决问题,确实包的路径有问题,换成这个包就解决了这个问题。
紧接着我这边出现了新的异常 org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input at [Source:UNKONWN; line: -1, column: -1;] 这个问题的原因,主要是由于kafka消息为空导致的,只要kafka消息不为空,就可以正常消费。 但是如果遇到了kafka消息为空的情况,这边不能处理吗? 赵一旦 <[hidden email]> 于2021年1月5日周二 下午9:18写道: > 我感觉还是jar的问题。如下尝试下,我懒得去试了。 > 将 org.apache.kafka.common.security.plain.PlainLoginModule 替换为 > org.apache.flink.kafka.shaded.org.apache.kafka.common.securi > ty.plain.PlainLoginModule > > 因为你用的是sql-connector-kafka,这个包把kafka-clients的包shade了。 > > Carmen Free <[hidden email]> 于2021年1月5日周二 下午5:09写道: > > > flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的 > > > > 1、版本说明 > > flink版本:1.10.2 > > kafka版本:1.1.0 > > > > 2、kafka鉴权说明 > > 仅使用了sasl鉴权方式 > > 在kafka客户端有配置 kafka_server-jass.conf、 > > server.properties、producer.properties、consumer.properties > > > > 3、主要配置参数 > > sasl.mechanism=PLAIN > > security.protocol=SASL_PLAINTEXT > > sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule > > required username="xx" password="xx-secret"; > > 当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。 > > > > 4、用于flink SQL连接的jar包 > > flink-sql-connector-kafka_2.11-1.10.2.jar > > flink-jdbc_2.11-1.10.2.jar > > flink-csv-1.10.2-sql-jar.jar > > > > > > 5、我的思路 > > 类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka > > table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。 > > > > 6、启动客户端 > > ./bin/sql-client.sh embedded -l sql_lib/ > > 其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包 > > > > > > 7、建表语句: > > create table test_hello ( > > name string > > ) with ( > > ... > > ... > > 'connector.properties.sasl.mechanism' = 'PLAIN', > > 'connector.properties.security.protocol' = 'SASL_PLAINTEXT', > > 'connector.properties.sasl.jaas.config' = > > 'org.apache.kafka.comon.security.plain.PlainLoginModule required > > username="xx" password="xx-secret";', > > 'format.type' = 'csv' > > ); > > > > 建表没有问题,可以正常建表。 > > > > 查询表的时候,就会报错,select * from test_hello; > > 报错如下: > > could not execute sql statement. Reason: > > javax.security.auth.login.loginException: unable to find loginModule > class: > > org.apache.kafka.common.security.plain.PlainLoginModule > > 但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因? > > > > kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。 > > > |
这个的话去看看KafkaConnector相关的参数,比较新的版本支持配置解析错误忽略。
Carmen Free <[hidden email]> 于2021年1月6日周三 上午10:58写道: > 感谢帮忙解决问题,确实包的路径有问题,换成这个包就解决了这个问题。 > > 紧接着我这边出现了新的异常 > > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: > No content to map due to end-of-input at [Source:UNKONWN; line: -1, column: > -1;] > > 这个问题的原因,主要是由于kafka消息为空导致的,只要kafka消息不为空,就可以正常消费。 > > 但是如果遇到了kafka消息为空的情况,这边不能处理吗? > > 赵一旦 <[hidden email]> 于2021年1月5日周二 下午9:18写道: > > > 我感觉还是jar的问题。如下尝试下,我懒得去试了。 > > 将 org.apache.kafka.common.security.plain.PlainLoginModule 替换为 > > org.apache.flink.kafka.shaded.org.apache.kafka.common.securi > > ty.plain.PlainLoginModule > > > > 因为你用的是sql-connector-kafka,这个包把kafka-clients的包shade了。 > > > > Carmen Free <[hidden email]> 于2021年1月5日周二 下午5:09写道: > > > > > flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的 > > > > > > 1、版本说明 > > > flink版本:1.10.2 > > > kafka版本:1.1.0 > > > > > > 2、kafka鉴权说明 > > > 仅使用了sasl鉴权方式 > > > 在kafka客户端有配置 kafka_server-jass.conf、 > > > server.properties、producer.properties、consumer.properties > > > > > > 3、主要配置参数 > > > sasl.mechanism=PLAIN > > > security.protocol=SASL_PLAINTEXT > > > sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule > > > required username="xx" password="xx-secret"; > > > 当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。 > > > > > > 4、用于flink SQL连接的jar包 > > > flink-sql-connector-kafka_2.11-1.10.2.jar > > > flink-jdbc_2.11-1.10.2.jar > > > flink-csv-1.10.2-sql-jar.jar > > > > > > > > > 5、我的思路 > > > 类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka > > > table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。 > > > > > > 6、启动客户端 > > > ./bin/sql-client.sh embedded -l sql_lib/ > > > 其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包 > > > > > > > > > 7、建表语句: > > > create table test_hello ( > > > name string > > > ) with ( > > > ... > > > ... > > > 'connector.properties.sasl.mechanism' = 'PLAIN', > > > 'connector.properties.security.protocol' = 'SASL_PLAINTEXT', > > > 'connector.properties.sasl.jaas.config' = > > > 'org.apache.kafka.comon.security.plain.PlainLoginModule required > > > username="xx" password="xx-secret";', > > > 'format.type' = 'csv' > > > ); > > > > > > 建表没有问题,可以正常建表。 > > > > > > 查询表的时候,就会报错,select * from test_hello; > > > 报错如下: > > > could not execute sql statement. Reason: > > > javax.security.auth.login.loginException: unable to find loginModule > > class: > > > org.apache.kafka.common.security.plain.PlainLoginModule > > > 但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因? > > > > > > kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。 > > > > > > |
好的,非常感谢。
赵一旦 <[hidden email]> 于2021年1月6日周三 下午1:08写道: > 这个的话去看看KafkaConnector相关的参数,比较新的版本支持配置解析错误忽略。 > > Carmen Free <[hidden email]> 于2021年1月6日周三 上午10:58写道: > > > 感谢帮忙解决问题,确实包的路径有问题,换成这个包就解决了这个问题。 > > > > 紧接着我这边出现了新的异常 > > > > > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: > > No content to map due to end-of-input at [Source:UNKONWN; line: -1, > column: > > -1;] > > > > 这个问题的原因,主要是由于kafka消息为空导致的,只要kafka消息不为空,就可以正常消费。 > > > > 但是如果遇到了kafka消息为空的情况,这边不能处理吗? > > > > 赵一旦 <[hidden email]> 于2021年1月5日周二 下午9:18写道: > > > > > 我感觉还是jar的问题。如下尝试下,我懒得去试了。 > > > 将 org.apache.kafka.common.security.plain.PlainLoginModule 替换为 > > > org.apache.flink.kafka.shaded.org.apache.kafka.common.securi > > > ty.plain.PlainLoginModule > > > > > > 因为你用的是sql-connector-kafka,这个包把kafka-clients的包shade了。 > > > > > > Carmen Free <[hidden email]> 于2021年1月5日周二 下午5:09写道: > > > > > > > flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的 > > > > > > > > 1、版本说明 > > > > flink版本:1.10.2 > > > > kafka版本:1.1.0 > > > > > > > > 2、kafka鉴权说明 > > > > 仅使用了sasl鉴权方式 > > > > 在kafka客户端有配置 kafka_server-jass.conf、 > > > > server.properties、producer.properties、consumer.properties > > > > > > > > 3、主要配置参数 > > > > sasl.mechanism=PLAIN > > > > security.protocol=SASL_PLAINTEXT > > > > > sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule > > > > required username="xx" password="xx-secret"; > > > > 当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。 > > > > > > > > 4、用于flink SQL连接的jar包 > > > > flink-sql-connector-kafka_2.11-1.10.2.jar > > > > flink-jdbc_2.11-1.10.2.jar > > > > flink-csv-1.10.2-sql-jar.jar > > > > > > > > > > > > 5、我的思路 > > > > 类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka > > > > table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。 > > > > > > > > 6、启动客户端 > > > > ./bin/sql-client.sh embedded -l sql_lib/ > > > > 其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包 > > > > > > > > > > > > 7、建表语句: > > > > create table test_hello ( > > > > name string > > > > ) with ( > > > > ... > > > > ... > > > > 'connector.properties.sasl.mechanism' = 'PLAIN', > > > > 'connector.properties.security.protocol' = 'SASL_PLAINTEXT', > > > > 'connector.properties.sasl.jaas.config' = > > > > 'org.apache.kafka.comon.security.plain.PlainLoginModule required > > > > username="xx" password="xx-secret";', > > > > 'format.type' = 'csv' > > > > ); > > > > > > > > 建表没有问题,可以正常建表。 > > > > > > > > 查询表的时候,就会报错,select * from test_hello; > > > > 报错如下: > > > > could not execute sql statement. Reason: > > > > javax.security.auth.login.loginException: unable to find loginModule > > > class: > > > > org.apache.kafka.common.security.plain.PlainLoginModule > > > > 但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因? > > > > > > > > kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。 > > > > > > > > > > |
Free forum by Nabble | Edit this page |