pyflink-udf issue feedback

Re: pyflink 1.11.0: how to pass a username and password to the connector when the Elasticsearch host requires authentication

Yangze Guo
Setting a username and password is not supported in 1.11 yet; these two options were added to the new Elasticsearch connector in 1.12 [1].

[1] https://issues.apache.org/jira/browse/FLINK-18361

Best,
Yangze Guo
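
For reference, a hedged sketch of what this could look like after upgrading to 1.12: the connector added by FLINK-18361 documents 'username' and 'password' options, so a PyFlink job could declare the Elasticsearch sink roughly as below. The host, index and credential values are placeholders, and the elasticsearch-7 SQL connector jar still has to be available on the job's classpath.

    # Hedged sketch, assuming PyFlink/Flink 1.12+ with the new Elasticsearch connector (FLINK-18361).
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    t_env.execute_sql("""
        CREATE TABLE myUserTable (
          user_id STRING,
          user_name STRING,
          uv BIGINT,
          pv BIGINT,
          PRIMARY KEY (user_id) NOT ENFORCED
        ) WITH (
          'connector' = 'elasticsearch-7',
          'hosts' = 'http://localhost:9200',
          'index' = 'users',
          'username' = 'elastic',       -- placeholder; option available from the 1.12 connector on
          'password' = '<password>'     -- placeholder
        )
    """)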

On Thu, Oct 22, 2020 at 3:47 PM whh_960101 <[hidden email]> wrote:

> Hi all, when sinking data to an Elasticsearch host that requires authentication, how can the username and password be passed to the elasticsearch connector? I followed the example from the official documentation but could not find any option for the username and password, and the job fails with an Unauthorized error.
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html
>
> CREATE TABLE myUserTable (
>   user_id STRING,
>   user_name STRING,
>   uv BIGINT,
>   pv BIGINT,
>   PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'http://localhost:9200',
>   'index' = 'users'
> );
>
> Connector Options
> | Option | Required | Default | Type | Description |
> | connector | required | (none) | String | Specify what connector to use; valid values are elasticsearch-6 (connect to Elasticsearch 6.x clusters) and elasticsearch-7 (connect to Elasticsearch 7.x and later clusters). |
> | hosts | required | (none) | String | One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
> | index | required | (none) | String | Elasticsearch index for every record. Can be a static index (e.g. 'myIndex') or a dynamic index (e.g. 'index-{log_ts|yyyy-MM-dd}'). See the Dynamic Index section for more details. |
> | document-type | required in 6.x | (none) | String | Elasticsearch document type. Not necessary anymore in elasticsearch-7. |
> | document-id.key-delimiter | optional | _ | String | Delimiter for composite keys ("_" by default), e.g. "$" would result in IDs like "KEY1$KEY2$KEY3". |
> | failure-handler | optional | fail | String | Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are: fail (throw an exception and fail the job), ignore (ignore failures and drop the request), retry_rejected (re-add requests that failed due to queue capacity saturation), or a custom class name (handle failures with an ActionRequestFailureHandler subclass). |
> | sink.flush-on-checkpoint | optional | true | Boolean | Flush on checkpoint or not. When disabled, the sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints, so it does NOT provide any strong guarantee of at-least-once delivery of action requests. |
> | sink.bulk-flush.max-actions | optional | 1000 | Integer | Maximum number of buffered actions per bulk request. Can be set to '0' to disable it. |
> | sink.bulk-flush.max-size | optional | 2mb | MemorySize | Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it. |
> | sink.bulk-flush.interval | optional | 1s | Duration | The interval at which to flush buffered actions. Can be set to '0' to disable it. Note that both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can be set to '0' while the flush interval is set, allowing fully asynchronous processing of buffered actions. |
> | sink.bulk-flush.backoff.strategy | optional | DISABLED | String | Specify how to perform retries if a flush action fails due to a temporary request error. Valid strategies are: DISABLED (no retry; fail after the first request error), CONSTANT (wait the backoff delay between retries), EXPONENTIAL (initially wait the backoff delay and increase it exponentially between retries). |
> | sink.bulk-flush.backoff.max-retries | optional | 8 | Integer | Maximum number of backoff retries. |
> | sink.bulk-flush.backoff.delay | optional | 50ms | Duration | Delay between each backoff attempt. For CONSTANT backoff, this is simply the delay between retries; for EXPONENTIAL backoff, this is the initial base delay. |
> | connection.max-retry-timeout | optional | (none) | Duration | Maximum timeout between retries. |
> | connection.path-prefix | optional | (none) | String | Prefix string to be added to every REST communication, e.g. '/v1'. |
> | format | optional | json | String | The Elasticsearch connector supports specifying a format. The format must produce a valid JSON document. By default, the built-in 'json' format is used. Please refer to the JSON Format page for more details. |

Re: Re: pyflink 1.11.0: how to pass a username and password to the connector when the Elasticsearch host requires authentication

whh_960101
Can I upgrade to 1.12 now? It does not seem to have been released yet. Is there any other workaround?

On 2020-10-22 16:34:56, "Yangze Guo" <[hidden email]> wrote:

>Setting a username and password is not supported in 1.11 yet; these two options were added to the new Elasticsearch connector in 1.12 [1].
>
>[1] https://issues.apache.org/jira/browse/FLINK-18361
>
>Best,
>Yangze Guo

Re: Re: pyflink 1.11.0: how to pass a username and password to the connector when the Elasticsearch host requires authentication

Xingbo Huang
Hi,

Build and install from source. You can refer to the documentation [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink

Best,
Xingbo

whh_960101 <[hidden email]> wrote on Thu, Oct 22, 2020 at 18:47:

> Can I upgrade to 1.12 now? It does not seem to have been released yet. Is there any other workaround?
>
> On 2020-10-22 16:34:56, "Yangze Guo" <[hidden email]> wrote:
> >Setting a username and password is not supported in 1.11 yet; these two options were added to the new Elasticsearch connector in 1.12 [1].
> >
> >[1] https://issues.apache.org/jira/browse/FLINK-18361
> >
> >Best,
> >Yangze Guo

PyFlink and Flink version compatibility

whh_960101
In reply to this post by Dian Fu
Hi everyone, I'd like to ask: my Flink version is 1.10.0 and my PyFlink version is 1.11.1, and so far I have hit no compatibility problems. Once the upcoming Flink 1.12 is officially released, if I only run pip install --upgrade apache-flink==1.12 to upgrade PyFlink to 1.12.0 while keeping Flink at 1.10.0, will there be many compatibility issues?

Re: PyFlink and Flink version compatibility

zhisheng
There will probably be issues; a lot has changed between those versions.

whh_960101 <[hidden email]> wrote on Fri, Oct 23, 2020 at 11:41:

> Hi everyone, I'd like to ask: my Flink version is 1.10.0 and my PyFlink version is 1.11.1, and so far I have hit no compatibility problems. Once the upcoming Flink
> 1.12 is officially released, if I only run pip install --upgrade apache-flink==1.12
> to upgrade PyFlink to 1.12.0 while keeping Flink at 1.10.0, will there be many compatibility issues?

Error when submitting a PyFlink job to a YARN cluster

whh_960101
In reply to this post by Dian Fu
Hi everyone, I'd like to ask: when I run flink run -m yarn-cluster -p 4 -py myjob.py, it fails with java.lang.RuntimeException: Found 0 flink-python jar.
 at org.apache.flink.client.program.PackagedProgram.getJobJarAndDependencies(PackagedProgram.java:263)
 at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:140)
 at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:68)
 at org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:590)
 at org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:758)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:250)
 at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:968)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$12(CliFrontend.java:1042)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
 at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1042)

No flink-python jar was found. Where can this jar be found: inside site-packages/pyflink, or do I have to build it myself? PyFlink 1.11.1 is already installed on this machine.

Re: Error when submitting a PyFlink job to a YARN cluster

Dian Fu
Check the directory site-packages/pyflink/opt and see whether it contains a jar named flink-python.
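
For what it's worth, a small sketch (not from the original reply) to check this from Python, assuming a pip-installed PyFlink whose package directory ships an opt/ folder:

    import os
    import pyflink

    # List the jars under site-packages/pyflink/opt and look for flink-python*.jar.
    opt_dir = os.path.join(os.path.dirname(os.path.abspath(pyflink.__file__)), "opt")
    print(opt_dir)
    print([name for name in os.listdir(opt_dir) if name.startswith("flink-python")])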

> On Oct 26, 2020, at 16:38, whh_960101 <[hidden email]> wrote:
>
> Hi everyone, I'd like to ask: when I run flink run -m yarn-cluster -p 4 -py myjob.py, it fails with java.lang.RuntimeException: Found 0 flink-python jar.
> at org.apache.flink.client.program.PackagedProgram.getJobJarAndDependencies(PackagedProgram.java:263)
> at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:140)
> at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:68)
> at org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:590)
> at org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:758)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:250)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:968)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$12(CliFrontend.java:1042)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1042)
>
> No flink-python jar was found. Where can this jar be found: inside site-packages/pyflink, or do I have to build it myself? PyFlink 1.11.1 is already installed on this machine.


Re: Re: pyflink 1.11.0: how to pass a username and password to the connector when the Elasticsearch host requires authentication

whh_960101
In reply to this post by Yangze Guo
Is there another way to pass in the username and password? As far as I know, the Java Flink Elasticsearch connector does have an entry point for username and password, and PyFlink executes by calling into Java, so that entry point should be reachable. Could anyone point me in the right direction? Thanks!

On 2020-10-22 16:34:56, "Yangze Guo" <[hidden email]> wrote:

>Setting a username and password is not supported in 1.11 yet; these two options were added to the new Elasticsearch connector in 1.12 [1].
>
>[1] https://issues.apache.org/jira/browse/FLINK-18361
>
>Best,
>Yangze Guo

Re: Re: pyflink 1.11.0: how to pass a username and password to the connector when the Elasticsearch host requires authentication

Xingbo Huang
Hi,

PyFlink 1.11 does not support the DataStream API yet; it is only available from 1.12.

Best,
Xingbo

whh_960101 <[hidden email]> wrote on Tue, Oct 27, 2020 at 14:58:

> Is there another way to pass in the username and password? As far as I know, the Java Flink
> Elasticsearch connector does have an entry point for username and password, and PyFlink executes by calling into Java, so that entry point should be reachable. Could anyone point me in the right direction? Thanks!
>
> On 2020-10-22 16:34:56, "Yangze Guo" <[hidden email]> wrote:
> >Setting a username and password is not supported in 1.11 yet; these two options were added to the new Elasticsearch connector in 1.12 [1].
> >
> >[1] https://issues.apache.org/jira/browse/FLINK-18361
> >
> >Best,
> >Yangze Guo

Error when running a PyFlink 1.11 job

whh_960101
In reply to this post by Dian Fu
Hi everyone, when running a PyFlink 1.11 job I get an error from pyflink/pyflink_gateway_server.py: line 193, launch_gateway_server_process(); line 217, return Popen(); FileNotFoundError: [Errno 2] No such file or directory: 'java': 'java'. It looks like the java executable cannot be found, but I am not sure what the 'java': 'java' part means. How can I fix this?

Re: Error when running a PyFlink 1.11 job

Dian Fu
It looks like it: JAVA_HOME cannot be found. Try explicitly exporting JAVA_HOME?
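
Besides exporting JAVA_HOME in the shell before submitting the job, here is a hedged sketch of setting it from inside the Python script; the JDK path is a placeholder and must point at a real installation, and it has to run before the first PyFlink call that starts the JVM gateway:

    import os

    # Placeholder JDK path; PyFlink's gateway launcher resolves 'java' via JAVA_HOME (or PATH)
    # when the JVM is started, so set it before creating any execution/table environment.
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk"

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)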

> On Nov 13, 2020, at 17:06, whh_960101 <[hidden email]> wrote:
>
> Hi everyone, when running a PyFlink 1.11 job I get an error from pyflink/pyflink_gateway_server.py: line 193, launch_gateway_server_process(); line 217, return Popen(); FileNotFoundError: [Errno 2] No such file or directory: 'java': 'java'. It looks like the java executable cannot be found, but I am not sure what the 'java': 'java' part means. How can I fix this?
>
>
>


Error when running a PyFlink 1.11 job

whh_960101
In reply to this post by Dian Fu
Hi everyone, I submit a PyFlink 1.11 job to a YARN cluster. When the job inserts the processed main_table into the Kafka sink, it fails with the following error:
File "/home/cdh272705/poc/T24_parse.py", line 179, in from_kafka_to_oracle_demo
    main_table.execute_insert("sink")#.get_job_client().get_job_execution_result().result()
  File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/table/table.py", line 783, in execute_insert
    return TableResult(self._j_table.executeInsert(table_path, overwrite))
  File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 154, in deco
    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'Failed to execute sql'
org.apache.flink.client.program.ProgramAbortException
 at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
 at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
 at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
 at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
 at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
 at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Is a jar missing? I have put flink-sql-client_2.11-1.11.1.jar in both the opt and lib directories. What causes the 'Failed to execute sql' error?

Re: Error when running a PyFlink 1.11 job

Wei Zhong
Hi,

From the current error alone it is hard to tell what went wrong. Could you post the source code of the failing part of the job?

> On Nov 17, 2020, at 16:58, whh_960101 <[hidden email]> wrote:
>
> Hi everyone, I submit a PyFlink 1.11 job to a YARN cluster. When the job inserts the processed main_table into the Kafka sink, it fails with the following error:
> File "/home/cdh272705/poc/T24_parse.py", line 179, in from_kafka_to_oracle_demo
>    main_table.execute_insert("sink")#.get_job_client().get_job_execution_result().result()
>  File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/table/table.py", line 783, in execute_insert
>    return TableResult(self._j_table.executeInsert(table_path, overwrite))
>  File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1286, in __call__
>    answer, self.gateway_client, self.target_id, self.name)
>  File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 154, in deco
>    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
> pyflink.util.exceptions.TableException: 'Failed to execute sql'
> org.apache.flink.client.program.ProgramAbortException
> at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
> at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Is a jar missing? I have put flink-sql-client_2.11-1.11.1.jar in both the opt and lib directories. What causes the 'Failed to execute sql' error?
>
>
>
>
>
>
>
>
>
