yarn Per-Job Cluster Mode提交任务时 通过cli指定的内存参数无效

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

yarn Per-Job Cluster Mode提交任务时 通过cli指定的内存参数无效

刘海
Hi  Dear All,
   请教各位一个问题,下面是我的集群配置:
1、我现在使用的是flink1.12版本;
2、基于CDH6.3.2搭建的hadoop三个节点的集群,使用CDH自带的yarn集群;
3、flink运行模式:Per-Job Cluster  on yarn(三个节点,没每个节点48核64G内存);
4、以下是我三个节点的 flink-conf.yaml 的配置,三个flink节点除了jobmanager.rpc.address不同外其它配置都一样:


#==============================================================================
# Common 通用设置选项
#==============================================================================
jobmanager.rpc.address: cdh1


# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123
# The total process memory size for the JobManager.
#
# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
jobmanager.memory.process.size: 2048m


# The total process memory size for the TaskManager.
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
taskmanager.memory.process.size: 6144m


# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
# taskmanager.memory.flink.size: 1280m
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
#TaskManager提供的插槽数(默认值:1)。每个插槽可以执行一项任务或管道。TaskManager中具有多个插槽可以帮助
#分摊跨并行任务或管道的某些恒定开销(JVM,应用程序库或网络连接的开销)
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
#当未在任何地方指定并行度时使用的默认并行性(默认值:1)
parallelism.default: 1
#添加如下配置,指定taskmananger的地址,如果是单机部署,指定localhost
#taskmanager.host: 0.0.0.0
# The default file system scheme and authority.
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme


#==============================================================================
# High Availability
#==============================================================================
# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
high-availability: zookeeper
# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...)
high-availability.storageDir: hdfs:///flink/ha/
# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
high-availability.zookeeper.client.acl: open
high-availability.zookeeper.path.root: /flink
#==============================================================================
# Fault tolerance、checkpointing  and  state backends 容错能力、检查点和状态后端
#==============================================================================
state.backend: rocksdb
#选择状态后端是否应创建增量检查点默认false,如果可能对于增量检查点,仅存储与前一个检查点的差异,
#而不存储完整的检查点状态。启用后,显示在Web UI中或从rest API获取的状态大小仅代表增量检查点大小,
#而不是完整的检查点大小。某些状态后端可能不支持增量检查点,因此会忽略此选项
state.backend.incremental: true
#是否为状态后端配置本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖键控状态后端。当前,MemoryStateBackend不支持本地恢复
state.backend.local-recovery: true
#RocksDB中数据块的缓存数量,单位比特。RocksDB的默认块缓存大小为“ 8MB”
state.backend.rocksdb.block.cache-size: 268435456
#这确定了计时器服务状态实现的工厂。对于基于RocksDB的实现,选项可以是HEAP(基于堆)或ROCKSDB
state.backend.rocksdb.timer-service.factory: HEAP
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends. 用于在Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录
state.checkpoints.dir: hdfs:///flink/flink-checkpoints
# Default target directory for savepoints, optional.
#保存点的默认目录。由状态后端用于将保存点写入文件系统
state.savepoints.dir: hdfs:///flink/flink-savepoints
# 要保留的最大已完成检查点数
state.checkpoints.num-retained: 3
#此选项指定作业计算如何从任务失败中恢复。可接受的值为:
#'full':重新启动所有任务以恢复作业。
#“region”:重新启动可能受任务故障影响的所有任务。可以在此处找到更多详细信息。
jobmanager.execution.failover-strategy: region
#==============================================================================
# Advanced
#==============================================================================


# Override the directories for temporary files. If not specified, the
# system-specific Java temporary directory (java.io.tmpdir property) is taken.
#
# For framework setups on Yarn or Mesos, Flink will automatically pick up the
# containers' temp directories without any need for configuration.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
#     /data1/tmp:/data2/tmp:/data3/tmp
#
# Note: Each directory entry is read from and written to by a different I/O
# thread. You can include the same directory multiple times in order to create
# multiple I/O threads against that directory. This is for example relevant for
# high-throughput RAIDs.
#
# io.tmp.dirs: /tmp


# The classloading resolve order. Possible values are 'child-first' (Flink's default)
# and 'parent-first' (Java's default).
#
# Child first classloading allows users to use different dependency/library
# versions in their application than those in the classpath. Switching back
# to 'parent-first' may help with debugging dependency issues.
#
# classloader.resolve-order: child-first


# The amount of memory going to the network stack. These numbers usually need
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, the default max is 1GB.
#
# taskmanager.memory.network.fraction: 0.1
# taskmanager.memory.network.min: 64mb
# taskmanager.memory.network.max: 1gb




#==============================================================================
# YARN Configuration
#==============================================================================
#ApplicationMaster重新启动的次数。默认情况下,该值将设置为1。如果启用了高可用性,则默认值将为2。
#重新启动次数也受YARN限制(通过yarn.resourcemanager.am.max-attempts配置)。请注意,整个Flink群集将重新启动,并且YARN Client将失去连接
yarn.application-attempts: 10
#yarn.container-start-command-template: %java% %jvmmem% %jvmopts% -DyarnContainerId=$CONTAINER_ID %logging% %class% %args% %redirects%
#yarn.maximum-failed-containers: 100
#yarn.tags: flink


#==============================================================================
# HistoryServer
#==============================================================================
heartbeat.timeout: 1800000




请教的问题:
 
通过   ./bin/flink run \
-d -t yarn-per-job \
-yjm 1536  \
-ytm 3072   \
-yD jobmanager.memory.process.size=1.5GB  \
-yD taskmanager.memory.process.size=3GB   \
-yD heartbeat.timeout=1800000   \
/opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.0.jar
这个命令提交运行flink job之后 命令中指定的内存参数没有被使用,在flink webUI里面观察到的使用内存是 flink-conf.yaml 里面配置的大小,cli命令指定的并未起作用,是我使用的不正确吗?




祝好!
刘海






| |
刘海
|
|
[hidden email]
|
签名由网易邮箱大师定制
Reply | Threaded
Open this post in threaded view
|

回复:yarn Per-Job Cluster Mode提交任务时 通过cli指定的内存参数无效

刘海
2021年1月18日 09:15[hidden email] 写道:
Hi  Dear All,
   请教各位一个问题,下面是我的集群配置:
1、我现在使用的是flink1.12版本;
2、基于CDH6.3.2搭建的hadoop三个节点的集群,使用CDH自带的yarn集群;
3、flink运行模式:Per-Job Cluster  on yarn(三个节点,没每个节点48核64G内存);
4、以下是我三个节点的 flink-conf.yaml 的配置,三个flink节点除了jobmanager.rpc.address不同外其它配置都一样:

#==============================================================================
# Common 通用设置选项
#==============================================================================
jobmanager.rpc.address: cdh1

# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123
# The total process memory size for the JobManager.
#
# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
jobmanager.memory.process.size: 2048m

# The total process memory size for the TaskManager.
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
taskmanager.memory.process.size: 6144m

# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
# taskmanager.memory.flink.size: 1280m
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
#TaskManager提供的插槽数(默认值:1)。每个插槽可以执行一项任务或管道。TaskManager中具有多个插槽可以帮助
#分摊跨并行任务或管道的某些恒定开销(JVM,应用程序库或网络连接的开销)
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
#当未在任何地方指定并行度时使用的默认并行性(默认值:1)
parallelism.default: 1
#添加如下配置,指定taskmananger的地址,如果是单机部署,指定localhost
#taskmanager.host: 0.0.0.0
# The default file system scheme and authority.
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme

#==============================================================================
# High Availability
#==============================================================================
# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
high-availability: zookeeper
# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...) 
high-availability.storageDir: hdfs:///flink/ha/
# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
high-availability.zookeeper.client.acl: open
high-availability.zookeeper.path.root: /flink
#==============================================================================
# Fault tolerance、checkpointing  and  state backends 容错能力、检查点和状态后端
#==============================================================================
state.backend: rocksdb
#选择状态后端是否应创建增量检查点默认false,如果可能对于增量检查点,仅存储与前一个检查点的差异,
#而不存储完整的检查点状态。启用后,显示在Web UI中或从rest API获取的状态大小仅代表增量检查点大小,
#而不是完整的检查点大小。某些状态后端可能不支持增量检查点,因此会忽略此选项
state.backend.incremental: true
#是否为状态后端配置本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖键控状态后端。当前,MemoryStateBackend不支持本地恢复
state.backend.local-recovery: true
#RocksDB中数据块的缓存数量,单位比特。RocksDB的默认块缓存大小为“ 8MB”
state.backend.rocksdb.block.cache-size: 268435456
#这确定了计时器服务状态实现的工厂。对于基于RocksDB的实现,选项可以是HEAP(基于堆)或ROCKSDB
state.backend.rocksdb.timer-service.factory: HEAP
# Directory for checkpoints filesystem, when using any of the default bundled 
# state backends. 用于在Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录
state.checkpoints.dir: hdfs:///flink/flink-checkpoints
# Default target directory for savepoints, optional.
#保存点的默认目录。由状态后端用于将保存点写入文件系统
state.savepoints.dir: hdfs:///flink/flink-savepoints
# 要保留的最大已完成检查点数
state.checkpoints.num-retained: 3
#此选项指定作业计算如何从任务失败中恢复。可接受的值为:
#'full':重新启动所有任务以恢复作业。
#“region”:重新启动可能受任务故障影响的所有任务。可以在此处找到更多详细信息。
jobmanager.execution.failover-strategy: region
#==============================================================================
# Advanced
#==============================================================================

# Override the directories for temporary files. If not specified, the
# system-specific Java temporary directory (java.io.tmpdir property) is taken.
#
# For framework setups on Yarn or Mesos, Flink will automatically pick up the
# containers' temp directories without any need for configuration.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
#     /data1/tmp:/data2/tmp:/data3/tmp
#
# Note: Each directory entry is read from and written to by a different I/O
# thread. You can include the same directory multiple times in order to create
# multiple I/O threads against that directory. This is for example relevant for
# high-throughput RAIDs.
#
# io.tmp.dirs: /tmp

# The classloading resolve order. Possible values are 'child-first' (Flink's default)
# and 'parent-first' (Java's default).
#
# Child first classloading allows users to use different dependency/library
# versions in their application than those in the classpath. Switching back
# to 'parent-first' may help with debugging dependency issues.
#
# classloader.resolve-order: child-first

# The amount of memory going to the network stack. These numbers usually need 
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, the default max is 1GB.
# taskmanager.memory.network.fraction: 0.1
# taskmanager.memory.network.min: 64mb
# taskmanager.memory.network.max: 1gb


#==============================================================================
# YARN Configuration
#==============================================================================
#ApplicationMaster重新启动的次数。默认情况下,该值将设置为1。如果启用了高可用性,则默认值将为2。
#重新启动次数也受YARN限制(通过yarn.resourcemanager.am.max-attempts配置)。请注意,整个Flink群集将重新启动,并且YARN Client将失去连接
yarn.application-attempts: 10
#yarn.container-start-command-template: %java% %jvmmem% %jvmopts% -DyarnContainerId=$CONTAINER_ID %logging% %class% %args% %redirects%
#yarn.maximum-failed-containers: 100
#yarn.tags: flink

#==============================================================================
# HistoryServer
#==============================================================================
heartbeat.timeout: 1800000


请教的问题:
 
通过   ./bin/flink run \
-d -t yarn-per-job \ 
-yjm 1536  \
-ytm 3072   \
-yD jobmanager.memory.process.size=1.5GB  \
-yD taskmanager.memory.process.size=3GB   \
-yD heartbeat.timeout=1800000   \
/opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.0.jar
这个命令提交运行flink job之后 命令中指定的内存参数没有被使用,在flink webUI里面观察到的使用内存是 flink-conf.yaml 里面配置的大小,cli命令指定的并未起作用,是我使用的不正确吗?


祝好!
刘海




Reply | Threaded
Open this post in threaded view
|

Re: yarn Per-Job Cluster Mode提交任务时 通过cli指定的内存参数无效

Yangze Guo
Hi, 请使用 -D -tm -jm 不需要加y前缀

Best,
Yangze Guo

Best,
Yangze Guo


On Mon, Jan 18, 2021 at 9:19 AM 刘海 <[hidden email]> wrote:

>
>
> 刘海
> [hidden email]
> 签名由 网易邮箱大师 定制
> 在2021年1月18日 09:15,刘海<[hidden email]> 写道:
>
> Hi  Dear All,
>    请教各位一个问题,下面是我的集群配置:
> 1、我现在使用的是flink1.12版本;
> 2、基于CDH6.3.2搭建的hadoop三个节点的集群,使用CDH自带的yarn集群;
> 3、flink运行模式:Per-Job Cluster  on yarn(三个节点,没每个节点48核64G内存);
> 4、以下是我三个节点的 flink-conf.yaml 的配置,三个flink节点除了jobmanager.rpc.address不同外其它配置都一样:
>
> #==============================================================================
> # Common 通用设置选项
> #==============================================================================
> jobmanager.rpc.address: cdh1
>
> # The RPC port where the JobManager is reachable.
> jobmanager.rpc.port: 6123
> # The total process memory size for the JobManager.
> #
> # Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
> jobmanager.memory.process.size: 2048m
>
> # The total process memory size for the TaskManager.
> # Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
> taskmanager.memory.process.size: 6144m
>
> # To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
> # It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
> # taskmanager.memory.flink.size: 1280m
> # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
> #TaskManager提供的插槽数(默认值:1)。每个插槽可以执行一项任务或管道。TaskManager中具有多个插槽可以帮助
> #分摊跨并行任务或管道的某些恒定开销(JVM,应用程序库或网络连接的开销)
> taskmanager.numberOfTaskSlots: 1
> # The parallelism used for programs that did not specify and other parallelism.
> #当未在任何地方指定并行度时使用的默认并行性(默认值:1)
> parallelism.default: 1
> #添加如下配置,指定taskmananger的地址,如果是单机部署,指定localhost
> #taskmanager.host: 0.0.0.0
> # The default file system scheme and authority.
> # By default file paths without scheme are interpreted relative to the local
> # root file system 'file:///'. Use this to override the default and interpret
> # relative paths relative to a different file system,
> # for example 'hdfs://mynamenode:12345'
> #
> # fs.default-scheme
>
> #==============================================================================
> # High Availability
> #==============================================================================
> # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
> high-availability: zookeeper
> # The path where metadata for master recovery is persisted. While ZooKeeper stores
> # the small ground truth for checkpoint and leader election, this location stores
> # the larger objects, like persisted dataflow graphs.
> # Must be a durable file system that is accessible from all nodes
> # (like HDFS, S3, Ceph, nfs, ...)
> high-availability.storageDir: hdfs:///flink/ha/
> # The list of ZooKeeper quorum peers that coordinate the high-availability
> # setup. This must be a list of the form:
> # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
> high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
> high-availability.zookeeper.client.acl: open
> high-availability.zookeeper.path.root: /flink
> #==============================================================================
> # Fault tolerance、checkpointing  and  state backends 容错能力、检查点和状态后端
> #==============================================================================
> state.backend: rocksdb
> #选择状态后端是否应创建增量检查点默认false,如果可能对于增量检查点,仅存储与前一个检查点的差异,
> #而不存储完整的检查点状态。启用后,显示在Web UI中或从rest API获取的状态大小仅代表增量检查点大小,
> #而不是完整的检查点大小。某些状态后端可能不支持增量检查点,因此会忽略此选项
> state.backend.incremental: true
> #是否为状态后端配置本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖键控状态后端。当前,MemoryStateBackend不支持本地恢复
> state.backend.local-recovery: true
> #RocksDB中数据块的缓存数量,单位比特。RocksDB的默认块缓存大小为“ 8MB”
> state.backend.rocksdb.block.cache-size: 268435456
> #这确定了计时器服务状态实现的工厂。对于基于RocksDB的实现,选项可以是HEAP(基于堆)或ROCKSDB
> state.backend.rocksdb.timer-service.factory: HEAP
> # Directory for checkpoints filesystem, when using any of the default bundled
> # state backends. 用于在Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录
> state.checkpoints.dir: hdfs:///flink/flink-checkpoints
> # Default target directory for savepoints, optional.
> #保存点的默认目录。由状态后端用于将保存点写入文件系统
> state.savepoints.dir: hdfs:///flink/flink-savepoints
> # 要保留的最大已完成检查点数
> state.checkpoints.num-retained: 3
> #此选项指定作业计算如何从任务失败中恢复。可接受的值为:
> #'full':重新启动所有任务以恢复作业。
> #“region”:重新启动可能受任务故障影响的所有任务。可以在此处找到更多详细信息。
> jobmanager.execution.failover-strategy: region
> #==============================================================================
> # Advanced
> #==============================================================================
>
> # Override the directories for temporary files. If not specified, the
> # system-specific Java temporary directory (java.io.tmpdir property) is taken.
> #
> # For framework setups on Yarn or Mesos, Flink will automatically pick up the
> # containers' temp directories without any need for configuration.
> #
> # Add a delimited list for multiple directories, using the system directory
> # delimiter (colon ':' on unix) or a comma, e.g.:
> #     /data1/tmp:/data2/tmp:/data3/tmp
> #
> # Note: Each directory entry is read from and written to by a different I/O
> # thread. You can include the same directory multiple times in order to create
> # multiple I/O threads against that directory. This is for example relevant for
> # high-throughput RAIDs.
> #
> # io.tmp.dirs: /tmp
>
> # The classloading resolve order. Possible values are 'child-first' (Flink's default)
> # and 'parent-first' (Java's default).
> #
> # Child first classloading allows users to use different dependency/library
> # versions in their application than those in the classpath. Switching back
> # to 'parent-first' may help with debugging dependency issues.
> #
> # classloader.resolve-order: child-first
>
> # The amount of memory going to the network stack. These numbers usually need
> # no tuning. Adjusting them may be necessary in case of an "Insufficient number
> # of network buffers" error. The default min is 64MB, the default max is 1GB.
> #
> # taskmanager.memory.network.fraction: 0.1
> # taskmanager.memory.network.min: 64mb
> # taskmanager.memory.network.max: 1gb
>
>
> #==============================================================================
> # YARN Configuration
> #==============================================================================
> #ApplicationMaster重新启动的次数。默认情况下,该值将设置为1。如果启用了高可用性,则默认值将为2。
> #重新启动次数也受YARN限制(通过yarn.resourcemanager.am.max-attempts配置)。请注意,整个Flink群集将重新启动,并且YARN Client将失去连接
> yarn.application-attempts: 10
> #yarn.container-start-command-template: %java% %jvmmem% %jvmopts% -DyarnContainerId=$CONTAINER_ID %logging% %class% %args% %redirects%
> #yarn.maximum-failed-containers: 100
> #yarn.tags: flink
>
> #==============================================================================
> # HistoryServer
> #==============================================================================
> heartbeat.timeout: 1800000
>
>
> 请教的问题:
>
> 通过   ./bin/flink run \
> -d -t yarn-per-job \
> -yjm 1536  \
> -ytm 3072   \
> -yD jobmanager.memory.process.size=1.5GB  \
> -yD taskmanager.memory.process.size=3GB   \
> -yD heartbeat.timeout=1800000   \
> /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.0.jar
> 这个命令提交运行flink job之后 命令中指定的内存参数没有被使用,在flink webUI里面观察到的使用内存是 flink-conf.yaml 里面配置的大小,cli命令指定的并未起作用,是我使用的不正确吗?
>
>
> 祝好!
> 刘海
>
>
>
>
> 刘海
> [hidden email]
> 签名由 网易邮箱大师 定制
Reply | Threaded
Open this post in threaded view
|

回复: yarn Per-Job Cluster Mode提交任务时 通过cli指定的内存参数无效

刘海
你好
 根据你的建议我试了一下
将提交命令改为: ./bin/flink run -d -t yarn-per-job -tm 1536 -jm 3072 -D jobmanager.memory.process.size=1.5GB -D taskmanager.memory.process.size=3GB -D heartbeat.timeout=1800000  /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar


jar包我使用了一个绝对路径: /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar


结果出现找不到jar包的异常:
org.apache.flink.client.cli.CliArgsException: Could not get job jar and dependencies from JAR file: JAR file does not exist: 1536
at org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:259) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_181]
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) [hadoop-common-3.0.0-cdh6.3.2.jar:?]
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) [flink-dist_2.11-1.12.0.jar:1.12.0]
Caused by: java.io.FileNotFoundException: JAR file does not exist: 1536
at org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:793) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:256) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
... 8 more


| |
刘海
|
|
[hidden email]
|
签名由网易邮箱大师定制
在2021年1月18日 10:12,Yangze Guo<[hidden email]> 写道:
Hi, 请使用 -D -tm -jm 不需要加y前缀

Best,
Yangze Guo

Best,
Yangze Guo


On Mon, Jan 18, 2021 at 9:19 AM 刘海 <[hidden email]> wrote:


刘海
[hidden email]
签名由 网易邮箱大师 定制
在2021年1月18日 09:15,刘海<[hidden email]> 写道:

Hi  Dear All,
请教各位一个问题,下面是我的集群配置:
1、我现在使用的是flink1.12版本;
2、基于CDH6.3.2搭建的hadoop三个节点的集群,使用CDH自带的yarn集群;
3、flink运行模式:Per-Job Cluster  on yarn(三个节点,没每个节点48核64G内存);
4、以下是我三个节点的 flink-conf.yaml 的配置,三个flink节点除了jobmanager.rpc.address不同外其它配置都一样:

#==============================================================================
# Common 通用设置选项
#==============================================================================
jobmanager.rpc.address: cdh1

# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123
# The total process memory size for the JobManager.
#
# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
jobmanager.memory.process.size: 2048m

# The total process memory size for the TaskManager.
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
taskmanager.memory.process.size: 6144m

# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
# taskmanager.memory.flink.size: 1280m
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
#TaskManager提供的插槽数(默认值:1)。每个插槽可以执行一项任务或管道。TaskManager中具有多个插槽可以帮助
#分摊跨并行任务或管道的某些恒定开销(JVM,应用程序库或网络连接的开销)
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
#当未在任何地方指定并行度时使用的默认并行性(默认值:1)
parallelism.default: 1
#添加如下配置,指定taskmananger的地址,如果是单机部署,指定localhost
#taskmanager.host: 0.0.0.0
# The default file system scheme and authority.
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme

#==============================================================================
# High Availability
#==============================================================================
# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
high-availability: zookeeper
# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...)
high-availability.storageDir: hdfs:///flink/ha/
# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
high-availability.zookeeper.client.acl: open
high-availability.zookeeper.path.root: /flink
#==============================================================================
# Fault tolerance、checkpointing  and  state backends 容错能力、检查点和状态后端
#==============================================================================
state.backend: rocksdb
#选择状态后端是否应创建增量检查点默认false,如果可能对于增量检查点,仅存储与前一个检查点的差异,
#而不存储完整的检查点状态。启用后,显示在Web UI中或从rest API获取的状态大小仅代表增量检查点大小,
#而不是完整的检查点大小。某些状态后端可能不支持增量检查点,因此会忽略此选项
state.backend.incremental: true
#是否为状态后端配置本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖键控状态后端。当前,MemoryStateBackend不支持本地恢复
state.backend.local-recovery: true
#RocksDB中数据块的缓存数量,单位比特。RocksDB的默认块缓存大小为“ 8MB”
state.backend.rocksdb.block.cache-size: 268435456
#这确定了计时器服务状态实现的工厂。对于基于RocksDB的实现,选项可以是HEAP(基于堆)或ROCKSDB
state.backend.rocksdb.timer-service.factory: HEAP
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends. 用于在Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录
state.checkpoints.dir: hdfs:///flink/flink-checkpoints
# Default target directory for savepoints, optional.
#保存点的默认目录。由状态后端用于将保存点写入文件系统
state.savepoints.dir: hdfs:///flink/flink-savepoints
# 要保留的最大已完成检查点数
state.checkpoints.num-retained: 3
#此选项指定作业计算如何从任务失败中恢复。可接受的值为:
#'full':重新启动所有任务以恢复作业。
#“region”:重新启动可能受任务故障影响的所有任务。可以在此处找到更多详细信息。
jobmanager.execution.failover-strategy: region
#==============================================================================
# Advanced
#==============================================================================

# Override the directories for temporary files. If not specified, the
# system-specific Java temporary directory (java.io.tmpdir property) is taken.
#
# For framework setups on Yarn or Mesos, Flink will automatically pick up the
# containers' temp directories without any need for configuration.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
#     /data1/tmp:/data2/tmp:/data3/tmp
#
# Note: Each directory entry is read from and written to by a different I/O
# thread. You can include the same directory multiple times in order to create
# multiple I/O threads against that directory. This is for example relevant for
# high-throughput RAIDs.
#
# io.tmp.dirs: /tmp

# The classloading resolve order. Possible values are 'child-first' (Flink's default)
# and 'parent-first' (Java's default).
#
# Child first classloading allows users to use different dependency/library
# versions in their application than those in the classpath. Switching back
# to 'parent-first' may help with debugging dependency issues.
#
# classloader.resolve-order: child-first

# The amount of memory going to the network stack. These numbers usually need
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, the default max is 1GB.
#
# taskmanager.memory.network.fraction: 0.1
# taskmanager.memory.network.min: 64mb
# taskmanager.memory.network.max: 1gb


#==============================================================================
# YARN Configuration
#==============================================================================
#ApplicationMaster重新启动的次数。默认情况下,该值将设置为1。如果启用了高可用性,则默认值将为2。
#重新启动次数也受YARN限制(通过yarn.resourcemanager.am.max-attempts配置)。请注意,整个Flink群集将重新启动,并且YARN Client将失去连接
yarn.application-attempts: 10
#yarn.container-start-command-template: %java% %jvmmem% %jvmopts% -DyarnContainerId=$CONTAINER_ID %logging% %class% %args% %redirects%
#yarn.maximum-failed-containers: 100
#yarn.tags: flink

#==============================================================================
# HistoryServer
#==============================================================================
heartbeat.timeout: 1800000


请教的问题:

通过   ./bin/flink run \
-d -t yarn-per-job \
-yjm 1536  \
-ytm 3072   \
-yD jobmanager.memory.process.size=1.5GB  \
-yD taskmanager.memory.process.size=3GB   \
-yD heartbeat.timeout=1800000   \
/opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.0.jar
这个命令提交运行flink job之后 命令中指定的内存参数没有被使用,在flink webUI里面观察到的使用内存是 flink-conf.yaml 里面配置的大小,cli命令指定的并未起作用,是我使用的不正确吗?


祝好!
刘海




刘海
[hidden email]
签名由 网易邮箱大师 定制
Reply | Threaded
Open this post in threaded view
|

Re: yarn Per-Job Cluster Mode提交任务时 通过cli指定的内存参数无效

Yangze Guo
请问这个路径是你本地的路径么?需要client端能根据这个路径找到jar包

Best,
Yangze Guo

On Mon, Jan 18, 2021 at 10:34 AM 刘海 <[hidden email]> wrote:

>
> 你好
>  根据你的建议我试了一下
> 将提交命令改为: ./bin/flink run -d -t yarn-per-job -tm 1536 -jm 3072 -D jobmanager.memory.process.size=1.5GB -D taskmanager.memory.process.size=3GB -D heartbeat.timeout=1800000  /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar
>
>
> jar包我使用了一个绝对路径: /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar
>
>
> 结果出现找不到jar包的异常:
> org.apache.flink.client.cli.CliArgsException: Could not get job jar and dependencies from JAR file: JAR file does not exist: 1536
> at org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:259) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
> at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_181]
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) [hadoop-common-3.0.0-cdh6.3.2.jar:?]
> at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) [flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) [flink-dist_2.11-1.12.0.jar:1.12.0]
> Caused by: java.io.FileNotFoundException: JAR file does not exist: 1536
> at org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:793) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:256) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> ... 8 more
>
>
> | |
> 刘海
> |
> |
> [hidden email]
> |
> 签名由网易邮箱大师定制
> 在2021年1月18日 10:12,Yangze Guo<[hidden email]> 写道:
> Hi, 请使用 -D -tm -jm 不需要加y前缀
>
> Best,
> Yangze Guo
>
> Best,
> Yangze Guo
>
>
> On Mon, Jan 18, 2021 at 9:19 AM 刘海 <[hidden email]> wrote:
>
>
> 刘海
> [hidden email]
> 签名由 网易邮箱大师 定制
> 在2021年1月18日 09:15,刘海<[hidden email]> 写道:
>
> Hi  Dear All,
> 请教各位一个问题,下面是我的集群配置:
> 1、我现在使用的是flink1.12版本;
> 2、基于CDH6.3.2搭建的hadoop三个节点的集群,使用CDH自带的yarn集群;
> 3、flink运行模式:Per-Job Cluster  on yarn(三个节点,没每个节点48核64G内存);
> 4、以下是我三个节点的 flink-conf.yaml 的配置,三个flink节点除了jobmanager.rpc.address不同外其它配置都一样:
>
> #==============================================================================
> # Common 通用设置选项
> #==============================================================================
> jobmanager.rpc.address: cdh1
>
> # The RPC port where the JobManager is reachable.
> jobmanager.rpc.port: 6123
> # The total process memory size for the JobManager.
> #
> # Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
> jobmanager.memory.process.size: 2048m
>
> # The total process memory size for the TaskManager.
> # Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
> taskmanager.memory.process.size: 6144m
>
> # To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
> # It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
> # taskmanager.memory.flink.size: 1280m
> # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
> #TaskManager提供的插槽数(默认值:1)。每个插槽可以执行一项任务或管道。TaskManager中具有多个插槽可以帮助
> #分摊跨并行任务或管道的某些恒定开销(JVM,应用程序库或网络连接的开销)
> taskmanager.numberOfTaskSlots: 1
> # The parallelism used for programs that did not specify and other parallelism.
> #当未在任何地方指定并行度时使用的默认并行性(默认值:1)
> parallelism.default: 1
> #添加如下配置,指定taskmananger的地址,如果是单机部署,指定localhost
> #taskmanager.host: 0.0.0.0
> # The default file system scheme and authority.
> # By default file paths without scheme are interpreted relative to the local
> # root file system 'file:///'. Use this to override the default and interpret
> # relative paths relative to a different file system,
> # for example 'hdfs://mynamenode:12345'
> #
> # fs.default-scheme
>
> #==============================================================================
> # High Availability
> #==============================================================================
> # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
> high-availability: zookeeper
> # The path where metadata for master recovery is persisted. While ZooKeeper stores
> # the small ground truth for checkpoint and leader election, this location stores
> # the larger objects, like persisted dataflow graphs.
> # Must be a durable file system that is accessible from all nodes
> # (like HDFS, S3, Ceph, nfs, ...)
> high-availability.storageDir: hdfs:///flink/ha/
> # The list of ZooKeeper quorum peers that coordinate the high-availability
> # setup. This must be a list of the form:
> # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
> high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
> high-availability.zookeeper.client.acl: open
> high-availability.zookeeper.path.root: /flink
> #==============================================================================
> # Fault tolerance、checkpointing  and  state backends 容错能力、检查点和状态后端
> #==============================================================================
> state.backend: rocksdb
> #选择状态后端是否应创建增量检查点默认false,如果可能对于增量检查点,仅存储与前一个检查点的差异,
> #而不存储完整的检查点状态。启用后,显示在Web UI中或从rest API获取的状态大小仅代表增量检查点大小,
> #而不是完整的检查点大小。某些状态后端可能不支持增量检查点,因此会忽略此选项
> state.backend.incremental: true
> #是否为状态后端配置本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖键控状态后端。当前,MemoryStateBackend不支持本地恢复
> state.backend.local-recovery: true
> #RocksDB中数据块的缓存数量,单位比特。RocksDB的默认块缓存大小为“ 8MB”
> state.backend.rocksdb.block.cache-size: 268435456
> #这确定了计时器服务状态实现的工厂。对于基于RocksDB的实现,选项可以是HEAP(基于堆)或ROCKSDB
> state.backend.rocksdb.timer-service.factory: HEAP
> # Directory for checkpoints filesystem, when using any of the default bundled
> # state backends. 用于在Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录
> state.checkpoints.dir: hdfs:///flink/flink-checkpoints
> # Default target directory for savepoints, optional.
> #保存点的默认目录。由状态后端用于将保存点写入文件系统
> state.savepoints.dir: hdfs:///flink/flink-savepoints
> # 要保留的最大已完成检查点数
> state.checkpoints.num-retained: 3
> #此选项指定作业计算如何从任务失败中恢复。可接受的值为:
> #'full':重新启动所有任务以恢复作业。
> #“region”:重新启动可能受任务故障影响的所有任务。可以在此处找到更多详细信息。
> jobmanager.execution.failover-strategy: region
> #==============================================================================
> # Advanced
> #==============================================================================
>
> # Override the directories for temporary files. If not specified, the
> # system-specific Java temporary directory (java.io.tmpdir property) is taken.
> #
> # For framework setups on Yarn or Mesos, Flink will automatically pick up the
> # containers' temp directories without any need for configuration.
> #
> # Add a delimited list for multiple directories, using the system directory
> # delimiter (colon ':' on unix) or a comma, e.g.:
> #     /data1/tmp:/data2/tmp:/data3/tmp
> #
> # Note: Each directory entry is read from and written to by a different I/O
> # thread. You can include the same directory multiple times in order to create
> # multiple I/O threads against that directory. This is for example relevant for
> # high-throughput RAIDs.
> #
> # io.tmp.dirs: /tmp
>
> # The classloading resolve order. Possible values are 'child-first' (Flink's default)
> # and 'parent-first' (Java's default).
> #
> # Child first classloading allows users to use different dependency/library
> # versions in their application than those in the classpath. Switching back
> # to 'parent-first' may help with debugging dependency issues.
> #
> # classloader.resolve-order: child-first
>
> # The amount of memory going to the network stack. These numbers usually need
> # no tuning. Adjusting them may be necessary in case of an "Insufficient number
> # of network buffers" error. The default min is 64MB, the default max is 1GB.
> #
> # taskmanager.memory.network.fraction: 0.1
> # taskmanager.memory.network.min: 64mb
> # taskmanager.memory.network.max: 1gb
>
>
> #==============================================================================
> # YARN Configuration
> #==============================================================================
> #ApplicationMaster重新启动的次数。默认情况下,该值将设置为1。如果启用了高可用性,则默认值将为2。
> #重新启动次数也受YARN限制(通过yarn.resourcemanager.am.max-attempts配置)。请注意,整个Flink群集将重新启动,并且YARN Client将失去连接
> yarn.application-attempts: 10
> #yarn.container-start-command-template: %java% %jvmmem% %jvmopts% -DyarnContainerId=$CONTAINER_ID %logging% %class% %args% %redirects%
> #yarn.maximum-failed-containers: 100
> #yarn.tags: flink
>
> #==============================================================================
> # HistoryServer
> #==============================================================================
> heartbeat.timeout: 1800000
>
>
> 请教的问题:
>
> 通过   ./bin/flink run \
> -d -t yarn-per-job \
> -yjm 1536  \
> -ytm 3072   \
> -yD jobmanager.memory.process.size=1.5GB  \
> -yD taskmanager.memory.process.size=3GB   \
> -yD heartbeat.timeout=1800000   \
> /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.0.jar
> 这个命令提交运行flink job之后 命令中指定的内存参数没有被使用,在flink webUI里面观察到的使用内存是 flink-conf.yaml 里面配置的大小,cli命令指定的并未起作用,是我使用的不正确吗?
>
>
> 祝好!
> 刘海
>
>
>
>
> 刘海
> [hidden email]
> 签名由 网易邮箱大师 定制
Reply | Threaded
Open this post in threaded view
|

回复: yarn Per-Job Cluster Mode提交任务时 通过cli指定的内存参数无效

刘海
是我本地服务器的路径,需要在三个节点上都上传这个jar包吗?


放在 /opt/flink-1.12.0/examples目录下了


| |
刘海
|
|
[hidden email]
|
签名由网易邮箱大师定制
在2021年1月18日 10:38,Yangze Guo<[hidden email]> 写道:
请问这个路径是你本地的路径么?需要client端能根据这个路径找到jar包

Best,
Yangze Guo

On Mon, Jan 18, 2021 at 10:34 AM 刘海 <[hidden email]> wrote:

你好
根据你的建议我试了一下
将提交命令改为: ./bin/flink run -d -t yarn-per-job -tm 1536 -jm 3072 -D jobmanager.memory.process.size=1.5GB -D taskmanager.memory.process.size=3GB -D heartbeat.timeout=1800000  /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar


jar包我使用了一个绝对路径: /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar


结果出现找不到jar包的异常:
org.apache.flink.client.cli.CliArgsException: Could not get job jar and dependencies from JAR file: JAR file does not exist: 1536
at org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:259) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_181]
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) [hadoop-common-3.0.0-cdh6.3.2.jar:?]
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) [flink-dist_2.11-1.12.0.jar:1.12.0]
Caused by: java.io.FileNotFoundException: JAR file does not exist: 1536
at org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:793) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:256) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
... 8 more


| |
刘海
|
|
[hidden email]
|
签名由网易邮箱大师定制
在2021年1月18日 10:12,Yangze Guo<[hidden email]> 写道:
Hi, 请使用 -D -tm -jm 不需要加y前缀

Best,
Yangze Guo

Best,
Yangze Guo


On Mon, Jan 18, 2021 at 9:19 AM 刘海 <[hidden email]> wrote:


刘海
[hidden email]
签名由 网易邮箱大师 定制
在2021年1月18日 09:15,刘海<[hidden email]> 写道:

Hi  Dear All,
请教各位一个问题,下面是我的集群配置:
1、我现在使用的是flink1.12版本;
2、基于CDH6.3.2搭建的hadoop三个节点的集群,使用CDH自带的yarn集群;
3、flink运行模式:Per-Job Cluster  on yarn(三个节点,没每个节点48核64G内存);
4、以下是我三个节点的 flink-conf.yaml 的配置,三个flink节点除了jobmanager.rpc.address不同外其它配置都一样:

#==============================================================================
# Common 通用设置选项
#==============================================================================
jobmanager.rpc.address: cdh1

# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123
# The total process memory size for the JobManager.
#
# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
jobmanager.memory.process.size: 2048m

# The total process memory size for the TaskManager.
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
taskmanager.memory.process.size: 6144m

# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
# taskmanager.memory.flink.size: 1280m
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
#TaskManager提供的插槽数(默认值:1)。每个插槽可以执行一项任务或管道。TaskManager中具有多个插槽可以帮助
#分摊跨并行任务或管道的某些恒定开销(JVM,应用程序库或网络连接的开销)
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
#当未在任何地方指定并行度时使用的默认并行性(默认值:1)
parallelism.default: 1
#添加如下配置,指定taskmananger的地址,如果是单机部署,指定localhost
#taskmanager.host: 0.0.0.0
# The default file system scheme and authority.
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme

#==============================================================================
# High Availability
#==============================================================================
# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
high-availability: zookeeper
# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...)
high-availability.storageDir: hdfs:///flink/ha/
# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
high-availability.zookeeper.client.acl: open
high-availability.zookeeper.path.root: /flink
#==============================================================================
# Fault tolerance、checkpointing  and  state backends 容错能力、检查点和状态后端
#==============================================================================
state.backend: rocksdb
#选择状态后端是否应创建增量检查点默认false,如果可能对于增量检查点,仅存储与前一个检查点的差异,
#而不存储完整的检查点状态。启用后,显示在Web UI中或从rest API获取的状态大小仅代表增量检查点大小,
#而不是完整的检查点大小。某些状态后端可能不支持增量检查点,因此会忽略此选项
state.backend.incremental: true
#是否为状态后端配置本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖键控状态后端。当前,MemoryStateBackend不支持本地恢复
state.backend.local-recovery: true
#RocksDB中数据块的缓存数量,单位比特。RocksDB的默认块缓存大小为“ 8MB”
state.backend.rocksdb.block.cache-size: 268435456
#这确定了计时器服务状态实现的工厂。对于基于RocksDB的实现,选项可以是HEAP(基于堆)或ROCKSDB
state.backend.rocksdb.timer-service.factory: HEAP
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends. 用于在Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录
state.checkpoints.dir: hdfs:///flink/flink-checkpoints
# Default target directory for savepoints, optional.
#保存点的默认目录。由状态后端用于将保存点写入文件系统
state.savepoints.dir: hdfs:///flink/flink-savepoints
# 要保留的最大已完成检查点数
state.checkpoints.num-retained: 3
#此选项指定作业计算如何从任务失败中恢复。可接受的值为:
#'full':重新启动所有任务以恢复作业。
#“region”:重新启动可能受任务故障影响的所有任务。可以在此处找到更多详细信息。
jobmanager.execution.failover-strategy: region
#==============================================================================
# Advanced
#==============================================================================

# Override the directories for temporary files. If not specified, the
# system-specific Java temporary directory (java.io.tmpdir property) is taken.
#
# For framework setups on Yarn or Mesos, Flink will automatically pick up the
# containers' temp directories without any need for configuration.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
#     /data1/tmp:/data2/tmp:/data3/tmp
#
# Note: Each directory entry is read from and written to by a different I/O
# thread. You can include the same directory multiple times in order to create
# multiple I/O threads against that directory. This is for example relevant for
# high-throughput RAIDs.
#
# io.tmp.dirs: /tmp

# The classloading resolve order. Possible values are 'child-first' (Flink's default)
# and 'parent-first' (Java's default).
#
# Child first classloading allows users to use different dependency/library
# versions in their application than those in the classpath. Switching back
# to 'parent-first' may help with debugging dependency issues.
#
# classloader.resolve-order: child-first

# The amount of memory going to the network stack. These numbers usually need
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, the default max is 1GB.
#
# taskmanager.memory.network.fraction: 0.1
# taskmanager.memory.network.min: 64mb
# taskmanager.memory.network.max: 1gb


#==============================================================================
# YARN Configuration
#==============================================================================
#ApplicationMaster重新启动的次数。默认情况下,该值将设置为1。如果启用了高可用性,则默认值将为2。
#重新启动次数也受YARN限制(通过yarn.resourcemanager.am.max-attempts配置)。请注意,整个Flink群集将重新启动,并且YARN Client将失去连接
yarn.application-attempts: 10
#yarn.container-start-command-template: %java% %jvmmem% %jvmopts% -DyarnContainerId=$CONTAINER_ID %logging% %class% %args% %redirects%
#yarn.maximum-failed-containers: 100
#yarn.tags: flink

#==============================================================================
# HistoryServer
#==============================================================================
heartbeat.timeout: 1800000


请教的问题:

通过   ./bin/flink run \
-d -t yarn-per-job \
-yjm 1536  \
-ytm 3072   \
-yD jobmanager.memory.process.size=1.5GB  \
-yD taskmanager.memory.process.size=3GB   \
-yD heartbeat.timeout=1800000   \
/opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.0.jar
这个命令提交运行flink job之后 命令中指定的内存参数没有被使用,在flink webUI里面观察到的使用内存是 flink-conf.yaml 里面配置的大小,cli命令指定的并未起作用,是我使用的不正确吗?


祝好!
刘海




刘海
[hidden email]
签名由 网易邮箱大师 定制
Reply | Threaded
Open this post in threaded view
|

回复: yarn Per-Job Cluster Mode提交任务时 通过cli指定的内存参数无效

刘海
还有一个问题,我已经有一个job在运行了,当我再次提交一个job运行的时候输出下面这些信息,去yarn查看发现job并未启动起来,有遇到过这个现象吗?


[root@cdh1 flink-1.12.0]# ./bin/flink run -d -t yarn-per-job  -D jobmanager.memory.process.size=1.5GB -D taskmanager.memory.process.size=3GB -D heartbeat.timeout=1800000 /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-1.12.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
[root@cdh1 flink-1.12.0]#




| |
刘海
|
|
[hidden email]
|
签名由网易邮箱大师定制
在2021年1月18日 10:42,刘海<[hidden email]> 写道:
是我本地服务器的路径,需要在三个节点上都上传这个jar包吗?


放在 /opt/flink-1.12.0/examples目录下了


| |
刘海
|
|
[hidden email]
|
签名由网易邮箱大师定制
在2021年1月18日 10:38,Yangze Guo<[hidden email]> 写道:
请问这个路径是你本地的路径么?需要client端能根据这个路径找到jar包

Best,
Yangze Guo

On Mon, Jan 18, 2021 at 10:34 AM 刘海 <[hidden email]> wrote:

你好
根据你的建议我试了一下
将提交命令改为: ./bin/flink run -d -t yarn-per-job -tm 1536 -jm 3072 -D jobmanager.memory.process.size=1.5GB -D taskmanager.memory.process.size=3GB -D heartbeat.timeout=1800000  /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar


jar包我使用了一个绝对路径: /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar


结果出现找不到jar包的异常:
org.apache.flink.client.cli.CliArgsException: Could not get job jar and dependencies from JAR file: JAR file does not exist: 1536
at org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:259) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_181]
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) [hadoop-common-3.0.0-cdh6.3.2.jar:?]
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) [flink-dist_2.11-1.12.0.jar:1.12.0]
Caused by: java.io.FileNotFoundException: JAR file does not exist: 1536
at org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:793) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:256) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
... 8 more


| |
刘海
|
|
[hidden email]
|
签名由网易邮箱大师定制
在2021年1月18日 10:12,Yangze Guo<[hidden email]> 写道:
Hi, 请使用 -D -tm -jm 不需要加y前缀

Best,
Yangze Guo

Best,
Yangze Guo


On Mon, Jan 18, 2021 at 9:19 AM 刘海 <[hidden email]> wrote:


刘海
[hidden email]
签名由 网易邮箱大师 定制
在2021年1月18日 09:15,刘海<[hidden email]> 写道:

Hi  Dear All,
请教各位一个问题,下面是我的集群配置:
1、我现在使用的是flink1.12版本;
2、基于CDH6.3.2搭建的hadoop三个节点的集群,使用CDH自带的yarn集群;
3、flink运行模式:Per-Job Cluster  on yarn(三个节点,没每个节点48核64G内存);
4、以下是我三个节点的 flink-conf.yaml 的配置,三个flink节点除了jobmanager.rpc.address不同外其它配置都一样:

#==============================================================================
# Common 通用设置选项
#==============================================================================
jobmanager.rpc.address: cdh1

# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123
# The total process memory size for the JobManager.
#
# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
jobmanager.memory.process.size: 2048m

# The total process memory size for the TaskManager.
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
taskmanager.memory.process.size: 6144m

# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
# taskmanager.memory.flink.size: 1280m
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
#TaskManager提供的插槽数(默认值:1)。每个插槽可以执行一项任务或管道。TaskManager中具有多个插槽可以帮助
#分摊跨并行任务或管道的某些恒定开销(JVM,应用程序库或网络连接的开销)
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
#当未在任何地方指定并行度时使用的默认并行性(默认值:1)
parallelism.default: 1
#添加如下配置,指定taskmananger的地址,如果是单机部署,指定localhost
#taskmanager.host: 0.0.0.0
# The default file system scheme and authority.
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme

#==============================================================================
# High Availability
#==============================================================================
# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
high-availability: zookeeper
# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...)
high-availability.storageDir: hdfs:///flink/ha/
# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
high-availability.zookeeper.client.acl: open
high-availability.zookeeper.path.root: /flink
#==============================================================================
# Fault tolerance、checkpointing  and  state backends 容错能力、检查点和状态后端
#==============================================================================
state.backend: rocksdb
#选择状态后端是否应创建增量检查点默认false,如果可能对于增量检查点,仅存储与前一个检查点的差异,
#而不存储完整的检查点状态。启用后,显示在Web UI中或从rest API获取的状态大小仅代表增量检查点大小,
#而不是完整的检查点大小。某些状态后端可能不支持增量检查点,因此会忽略此选项
state.backend.incremental: true
#是否为状态后端配置本地恢复。默认情况下,本地恢复处于禁用状态。本地恢复当前仅涵盖键控状态后端。当前,MemoryStateBackend不支持本地恢复
state.backend.local-recovery: true
#RocksDB中数据块的缓存数量,单位比特。RocksDB的默认块缓存大小为“ 8MB”
state.backend.rocksdb.block.cache-size: 268435456
#这确定了计时器服务状态实现的工厂。对于基于RocksDB的实现,选项可以是HEAP(基于堆)或ROCKSDB
state.backend.rocksdb.timer-service.factory: HEAP
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends. 用于在Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录
state.checkpoints.dir: hdfs:///flink/flink-checkpoints
# Default target directory for savepoints, optional.
#保存点的默认目录。由状态后端用于将保存点写入文件系统
state.savepoints.dir: hdfs:///flink/flink-savepoints
# 要保留的最大已完成检查点数
state.checkpoints.num-retained: 3
#此选项指定作业计算如何从任务失败中恢复。可接受的值为:
#'full':重新启动所有任务以恢复作业。
#“region”:重新启动可能受任务故障影响的所有任务。可以在此处找到更多详细信息。
jobmanager.execution.failover-strategy: region
#==============================================================================
# Advanced
#==============================================================================

# Override the directories for temporary files. If not specified, the
# system-specific Java temporary directory (java.io.tmpdir property) is taken.
#
# For framework setups on Yarn or Mesos, Flink will automatically pick up the
# containers' temp directories without any need for configuration.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
#     /data1/tmp:/data2/tmp:/data3/tmp
#
# Note: Each directory entry is read from and written to by a different I/O
# thread. You can include the same directory multiple times in order to create
# multiple I/O threads against that directory. This is for example relevant for
# high-throughput RAIDs.
#
# io.tmp.dirs: /tmp

# The classloading resolve order. Possible values are 'child-first' (Flink's default)
# and 'parent-first' (Java's default).
#
# Child first classloading allows users to use different dependency/library
# versions in their application than those in the classpath. Switching back
# to 'parent-first' may help with debugging dependency issues.
#
# classloader.resolve-order: child-first

# The amount of memory going to the network stack. These numbers usually need
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, the default max is 1GB.
#
# taskmanager.memory.network.fraction: 0.1
# taskmanager.memory.network.min: 64mb
# taskmanager.memory.network.max: 1gb


#==============================================================================
# YARN Configuration
#==============================================================================
#ApplicationMaster重新启动的次数。默认情况下,该值将设置为1。如果启用了高可用性,则默认值将为2。
#重新启动次数也受YARN限制(通过yarn.resourcemanager.am.max-attempts配置)。请注意,整个Flink群集将重新启动,并且YARN Client将失去连接
yarn.application-attempts: 10
#yarn.container-start-command-template: %java% %jvmmem% %jvmopts% -DyarnContainerId=$CONTAINER_ID %logging% %class% %args% %redirects%
#yarn.maximum-failed-containers: 100
#yarn.tags: flink

#==============================================================================
# HistoryServer
#==============================================================================
heartbeat.timeout: 1800000


请教的问题:

通过   ./bin/flink run \
-d -t yarn-per-job \
-yjm 1536  \
-ytm 3072   \
-yD jobmanager.memory.process.size=1.5GB  \
-yD taskmanager.memory.process.size=3GB   \
-yD heartbeat.timeout=1800000   \
/opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.0.jar
这个命令提交运行flink job之后 命令中指定的内存参数没有被使用,在flink webUI里面观察到的使用内存是 flink-conf.yaml 里面配置的大小,cli命令指定的并未起作用,是我使用的不正确吗?


祝好!
刘海




刘海
[hidden email]
签名由 网易邮箱大师 定制