Flink es7 connector认证问题

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink es7 connector认证问题

李宇彬
各位好,
    请教一个问题
    我们生产环境的es7是有用户名密码认证的,使用如下代码启动后会报,这段代码调用了es rest client api,单独使用是没问题的,不过放到 flink 里就报错了
        org.elasticsearch.client.ResponseException: method [HEAD], host [xxx], URI [/], status line [HTTP/1.1 401 Unauthorized]
ParameterTool pt = ParameterTool.fromArgs(args);
String confFile = pt.get("confFile");
pt = ParameterTool.fromPropertiesFile(new File(confFile));
provider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(pt.get("es.user.name"), pt.get("es.user.password")));

esSinkBuilder.setRestClientFactory(
(RestClientBuilder restClientBuilder) ->
restClientBuilder
.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder.setSocketTimeout(180000)
.setConnectionRequestTimeout(10000)
)
.setHttpClientConfigCallback(httpClientBuilder ->
{
httpClientBuilder.disableAuthCaching(); //禁用 preemptive 身份验证
return httpClientBuilder.setDefaultCredentialsProvider(provider);
}
)
);
Reply | Threaded
Open this post in threaded view
|

Re: Flink es7 connector认证问题

Yangze Guo
Hi,

请问您有检查过pt.get("es.user.name"),
pt.get("es.user.password")这两个参数读出来是否都是正确的,另外更完整的错误栈方便提供下么?

Best,
Yangze Guo

On Mon, Jul 13, 2020 at 3:10 PM 李宇彬 <[hidden email]> wrote:

>
> 各位好,
>     请教一个问题
>     我们生产环境的es7是有用户名密码认证的,使用如下代码启动后会报,这段代码调用了es rest client api,单独使用是没问题的,不过放到 flink 里就报错了
>         org.elasticsearch.client.ResponseException: method [HEAD], host [xxx], URI [/], status line [HTTP/1.1 401 Unauthorized]
> ParameterTool pt = ParameterTool.fromArgs(args);
> String confFile = pt.get("confFile");
> pt = ParameterTool.fromPropertiesFile(new File(confFile));
> provider.setCredentials(AuthScope.ANY,
>                 new UsernamePasswordCredentials(pt.get("es.user.name"), pt.get("es.user.password")));
>
> esSinkBuilder.setRestClientFactory(
> (RestClientBuilder restClientBuilder) ->
> restClientBuilder
> .setRequestConfigCallback(requestConfigBuilder ->
> requestConfigBuilder.setSocketTimeout(180000)
> .setConnectionRequestTimeout(10000)
> )
> .setHttpClientConfigCallback(httpClientBuilder ->
> {
> httpClientBuilder.disableAuthCaching(); //禁用 preemptive 身份验证
> return httpClientBuilder.setDefaultCredentialsProvider(provider);
> }
> )
> );
Reply | Threaded
Open this post in threaded view
|

回复: Flink es7 connector认证问题

李宇彬
感谢,已找到问题原因,这个provider变量应该放到setHttpClientConfigCallback内部,之前是作为私有成员变量transient声明的,会导致taskmanager无法拿到认证信息
        String user = pt.get("es.user.name");
String password = pt.get("es.user.password");
esSinkBuilder.setRestClientFactory(
                (RestClientBuilder restClientBuilder) ->
                        restClientBuilder
                                .setHttpClientConfigCallback(httpClientBuilder ->
                                        {
                                            CredentialsProvider provider = new BasicCredentialsProvider();
provider.setCredentials(AuthScope.ANY,
                                                    new UsernamePasswordCredentials(user, password));
httpClientBuilder.disableAuthCaching(); //禁用 preemptive 身份验证
return httpClientBuilder.setDefaultCredentialsProvider(provider);
}
                                )
        );
在2020年7月13日 15:33,Yangze Guo<[hidden email]> 写道:
Hi,

请问您有检查过pt.get("es.user.name"),
pt.get("es.user.password")这两个参数读出来是否都是正确的,另外更完整的错误栈方便提供下么?

Best,
Yangze Guo

On Mon, Jul 13, 2020 at 3:10 PM 李宇彬 <[hidden email]> wrote:

各位好,
请教一个问题
我们生产环境的es7是有用户名密码认证的,使用如下代码启动后会报,这段代码调用了es rest client api,单独使用是没问题的,不过放到 flink 里就报错了
org.elasticsearch.client.ResponseException: method [HEAD], host [xxx], URI [/], status line [HTTP/1.1 401 Unauthorized]
ParameterTool pt = ParameterTool.fromArgs(args);
String confFile = pt.get("confFile");
pt = ParameterTool.fromPropertiesFile(new File(confFile));
provider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(pt.get("es.user.name"), pt.get("es.user.password")));

esSinkBuilder.setRestClientFactory(
(RestClientBuilder restClientBuilder) ->
restClientBuilder
.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder.setSocketTimeout(180000)
.setConnectionRequestTimeout(10000)
)
.setHttpClientConfigCallback(httpClientBuilder ->
{
httpClientBuilder.disableAuthCaching(); //禁用 preemptive 身份验证
return httpClientBuilder.setDefaultCredentialsProvider(provider);
}
)
);