Version 1.11 does not yet support setting a username and password; these two options were added to the new ES connector in 1.12 [1].
[1] https://issues.apache.org/jira/browse/FLINK-18361

Best,
Yangze Guo

On Thu, Oct 22, 2020 at 3:47 PM whh_960101 <[hidden email]> wrote:
>
> Hi everyone: if I need to sink data to an Elasticsearch host that requires authentication, how do I supply the username and password to the elasticsearch connector? I followed the example format from the official docs, but could not find any option for a username or password, and the job fails with Unauthorized.
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html
>
> CREATE TABLE myUserTable (
>   user_id STRING,
>   user_name STRING,
>   uv BIGINT,
>   pv BIGINT,
>   PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'http://localhost:9200',
>   'index' = 'users'
> );
>
> Connector Options
>
> | Option | Required | Default | Type | Description |
> | connector | required | (none) | String | Specify what connector to use; valid values are: elasticsearch-6 (connect to an Elasticsearch 6.x cluster), elasticsearch-7 (connect to an Elasticsearch 7.x and later versions cluster). |
> | hosts | required | (none) | String | One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
> | index | required | (none) | String | Elasticsearch index for every record. Can be a static index (e.g. 'myIndex') or a dynamic index (e.g. 'index-{log_ts|yyyy-MM-dd}'). See the Dynamic Index section for more details. |
> | document-type | required in 6.x | (none) | String | Elasticsearch document type. Not necessary anymore in elasticsearch-7. |
> | document-id.key-delimiter | optional | _ | String | Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs like "KEY1$KEY2$KEY3". |
> | failure-handler | optional | fail | String | Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are: fail (throws an exception if a request fails and thus causes a job failure), ignore (ignores failures and drops the request), retry_rejected (re-adds requests that have failed due to queue capacity saturation), or a custom class name (for failure handling with an ActionRequestFailureHandler subclass). |
> | sink.flush-on-checkpoint | optional | true | Boolean | Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints, and thus does NOT provide any strong guarantees for at-least-once delivery of action requests. |
> | sink.bulk-flush.max-actions | optional | 1000 | Integer | Maximum number of buffered actions per bulk request. Can be set to '0' to disable it. |
> | sink.bulk-flush.max-size | optional | 2mb | MemorySize | Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it. |
> | sink.bulk-flush.interval | optional | 1s | Duration | The interval to flush buffered actions. Can be set to '0' to disable it. Note that both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set, allowing for complete async processing of buffered actions. |
> | sink.bulk-flush.backoff.strategy | optional | DISABLED | String | Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are: DISABLED (no retry performed, i.e. fail after the first request error), CONSTANT (wait for backoff delay between retries), EXPONENTIAL (initially wait for backoff delay and increase exponentially between retries). |
> | sink.bulk-flush.backoff.max-retries | optional | 8 | Integer | Maximum number of backoff retries. |
> | sink.bulk-flush.backoff.delay | optional | 50ms | Duration | Delay between each backoff attempt. For CONSTANT backoff, this is simply the delay between each retry. For EXPONENTIAL backoff, this is the initial base delay. |
> | connection.max-retry-timeout | optional | (none) | Duration | Maximum timeout between retries. |
> | connection.path-prefix | optional | (none) | String | Prefix string to be added to every REST communication, e.g., '/v1'. |
> | format | optional | json | String | The Elasticsearch connector supports specifying a format. The format must produce a valid JSON document. By default the built-in 'json' format is used. Please refer to the JSON Format page for more details. |
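For reference, the connector introduced by FLINK-18361 (available from Flink 1.12 on) exposes the credentials as ordinary connector options. Below is a minimal PyFlink sketch of what that could look like, assuming Flink/PyFlink 1.12, the elasticsearch-7 SQL connector jar on the classpath, and placeholder credentials; the 'username'/'password' option names follow the 1.12 connector and are not available in 1.11:

```python
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# Streaming TableEnvironment with the Blink planner (the usual setup in 1.12).
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

# Sink table with authentication. 'username' and 'password' are the options added by
# FLINK-18361 in the 1.12 connector; the credential values here are placeholders.
t_env.execute_sql("""
    CREATE TABLE es_sink (
        user_id STRING,
        user_name STRING,
        uv BIGINT,
        pv BIGINT,
        PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
        'connector' = 'elasticsearch-7',
        'hosts' = 'http://localhost:9200',
        'index' = 'users',
        'username' = 'elastic',
        'password' = 'changeme'
    )
""")
```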
Can we upgrade to 1.12 now? It does not seem to have been released yet. Is there any other workaround?
On 2020-10-22 16:34:56, "Yangze Guo" <[hidden email]> wrote:
> Version 1.11 does not yet support setting a username and password; these two options were added to the new ES connector in 1.12 [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-18361
>
> Best,
> Yangze Guo
Hi,
You can build and install it from source; see the documentation [1].

[1] https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink

Best,
Xingbo

whh_960101 <[hidden email]> wrote on Thu, Oct 22, 2020 at 6:47 PM:
> Can we upgrade to 1.12 now? It does not seem to have been released yet. Is there any other workaround?
Hi everyone, I'd like to ask: my Flink version is 1.10.0 and my PyFlink version is 1.11.1, and so far PyFlink works without compatibility problems. Once the upcoming Flink 1.12 is officially released, if I only run pip install --upgrade apache-flink==1.12 to upgrade PyFlink to 1.12.0 while keeping Flink at 1.10.0, will there be many compatibility issues?
There will probably be problems; a lot has changed between those versions.
whh_960101 <[hidden email]> wrote on Fri, Oct 23, 2020 at 11:41 AM:
> Hi everyone, I'd like to ask: my Flink version is 1.10.0 and my PyFlink version is 1.11.1, and so far PyFlink works without compatibility problems. Once the upcoming Flink 1.12 is officially released, if I only run pip install --upgrade apache-flink==1.12 to upgrade PyFlink to 1.12.0 while keeping Flink at 1.10.0, will there be many compatibility issues?
Hi everyone, I'd like to ask: when I run flink run -m yarn-cluster -p 4 -py myjob.py, it fails with java.lang.RuntimeException: Found 0 flink-python jar.
	at org.apache.flink.client.program.PackagedProgram.getJobJarAndDependencies(PackagedProgram.java:263)
	at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:140)
	at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:68)
	at org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:590)
	at org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:758)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:250)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:968)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$12(CliFrontend.java:1042)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1042)

No flink-python jar was found. Where can I find this jar: inside site-packages/pyflink, or do I have to build it myself? PyFlink 1.11.1 is already installed on this machine.
Check the directory site-packages/pyflink/opt and see whether it contains a jar named flink-python.
> On Oct 26, 2020, at 4:38 PM, whh_960101 <[hidden email]> wrote:
>
> Hi everyone, I'd like to ask: when I run flink run -m yarn-cluster -p 4 -py myjob.py, it fails with java.lang.RuntimeException: Found 0 flink-python jar. Where can I find this jar: inside site-packages/pyflink, or do I have to build it myself? PyFlink 1.11.1 is already installed on this machine.
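For what it's worth, a quick way to check from Python whether the flink-python jar shipped with the pip-installed PyFlink is actually there; a small sketch (the opt/ location matches the pip-installed PyFlink 1.11 layout, and the glob pattern is an assumption):

```python
import glob
import os

import pyflink

# The pip-installed PyFlink wheel ships its jars under <site-packages>/pyflink/opt and /lib.
pyflink_home = os.path.dirname(os.path.abspath(pyflink.__file__))
candidates = glob.glob(os.path.join(pyflink_home, "opt", "flink-python*.jar"))

if candidates:
    print("Found flink-python jar(s):", candidates)
else:
    print("No flink-python jar under", os.path.join(pyflink_home, "opt"),
          "- the installation may be incomplete.")
```

If the job is submitted with a standalone Flink distribution's flink script rather than the one bundled with the pip install, that distribution's own opt/ directory is presumably where the CLI looks, so the jar would need to be present there as well.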
Is there any other way to supply the username and password? As far as I know, Flink's Java Elasticsearch connector does accept a username and password, and PyFlink runs by calling into Java, so there should be an entry point for this. Could anyone point me in the right direction? Thanks!
On 2020-10-22 16:34:56, "Yangze Guo" <[hidden email]> wrote:
> Version 1.11 does not yet support setting a username and password; these two options were added to the new ES connector in 1.12 [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-18361
>
> Best,
> Yangze Guo
Hi,
PyFlink 1.11 does not support the DataStream API yet; it is only available starting from 1.12.

Best,
Xingbo

whh_960101 <[hidden email]> wrote on Tue, Oct 27, 2020 at 2:58 PM:
> Is there any other way to supply the username and password? As far as I know, Flink's Java Elasticsearch connector does accept a username and password, and PyFlink runs by calling into Java, so there should be an entry point for this. Could anyone point me in the right direction? Thanks!
Hi everyone, when running a PyFlink 1.11 job I get an error from pyflink/pyflink_gateway_server.py, line 193 in launch_gateway_server_process(), line 217 in return Popen(): FileNotFoundError: [Errno 2] No such file or directory: 'java': 'java'. It looks like the java executable cannot be found, but I don't understand the 'java': 'java' part. How can I fix this?
It looks that way: the java executable cannot be found. Try explicitly exporting JAVA_HOME and see if that helps.
> On Nov 13, 2020, at 5:06 PM, whh_960101 <[hidden email]> wrote:
>
> Hi everyone, when running a PyFlink 1.11 job I get an error from pyflink/pyflink_gateway_server.py, line 193 in launch_gateway_server_process(), line 217 in return Popen(): FileNotFoundError: [Errno 2] No such file or directory: 'java': 'java'. It looks like the java executable cannot be found, but I don't understand the 'java': 'java' part. How can I fix this?
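A rough illustration of the suggestion; the JDK path below is a placeholder, and this assumes the gateway launcher picks JAVA_HOME up from the process environment (exporting it in the shell, or putting the JDK's bin directory on PATH, achieves the same thing):

```python
import os

# Placeholder path: point this at the JDK actually installed on the machine
# running the PyFlink driver (and, if applicable, on the YARN nodes).
os.environ.setdefault("JAVA_HOME", "/usr/lib/jvm/java-8-openjdk")

# The JVM gateway is started lazily by the first PyFlink call that needs it
# (e.g. building EnvironmentSettings), so JAVA_HOME has to be set before that point.
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
print("Gateway started; JAVA_HOME =", os.environ.get("JAVA_HOME"))
```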
Hi everyone, with PyFlink 1.11 I submit a PyFlink job to a YARN cluster, and it fails when inserting the processed main_table into the Kafka sink:

  File "/home/cdh272705/poc/T24_parse.py", line 179, in from_kafka_to_oracle_demo
    main_table.execute_insert("sink")#.get_job_client().get_job_execution_result().result()
  File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/table/table.py", line 783, in execute_insert
    return TableResult(self._j_table.executeInsert(table_path, overwrite))
  File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 154, in deco
    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'Failed to execute sql'

org.apache.flink.client.program.ProgramAbortException
	at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

Is this caused by a missing jar? I have already put flink-sql-client_2.11-1.11.1.jar in both the opt and lib directories. What causes 'Failed to execute sql'?
Hi,
I can't tell what the problem is from this error alone. Could you post the source code of the part of the job where the error occurs?

> On Nov 17, 2020, at 16:58, whh_960101 <[hidden email]> wrote:
>
> Hi everyone, with PyFlink 1.11 I submit a PyFlink job to a YARN cluster, and it fails when inserting the processed main_table into the Kafka sink with pyflink.util.exceptions.TableException: 'Failed to execute sql' (org.apache.flink.client.program.ProgramAbortException at org.apache.flink.client.python.PythonDriver.main). Is this caused by a missing jar? I have already put flink-sql-client_2.11-1.11.1.jar in both the opt and lib directories. What causes 'Failed to execute sql'?
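To illustrate the kind of snippet being asked for, here is a hypothetical, stripped-down shape of the code around main_table.execute_insert("sink") in PyFlink 1.11; none of the table names, topics, or fields below are from the actual job. One thing worth noting: flink-sql-client_2.11 does not bundle the Kafka connector, so if the sink is Kafka, the matching flink-sql-connector-kafka_2.11-1.11.x.jar (or equivalent) usually has to be on the classpath as well.

```python
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

# Hypothetical source, standing in for whatever main_table is derived from.
t_env.execute_sql("""
    CREATE TABLE some_source_table (
        record_id STRING,
        payload STRING
    ) WITH (
        'connector' = 'datagen'
    )
""")

# Hypothetical Kafka sink; topic, brokers and schema are placeholders.
t_env.execute_sql("""
    CREATE TABLE sink (
        record_id STRING,
        payload STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'output-topic',
        'properties.bootstrap.servers' = 'broker:9092',
        'format' = 'json'
    )
""")

main_table = t_env.from_path("some_source_table")

# execute_insert() is what line 179 of the failing script calls; the commented-out
# get_job_client().get_job_execution_result().result() chain in the original code
# is how one would block on the job result when running the script directly.
main_table.execute_insert("sink")
```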