Hi Dear All,
I'd like to ask a question. Here is my cluster setup:
1. I am running Flink 1.12;
2. A three-node Hadoop cluster built on CDH 6.3.2, using the YARN cluster that ships with CDH;
3. Flink run mode: Per-Job Cluster on YARN (three nodes, each with 48 cores and 64 GB of memory);
4. Below is the flink-conf.yaml used on my three nodes; apart from jobmanager.rpc.address, the configuration is identical on all three Flink nodes:

#==============================================================================
# Common options
#==============================================================================
jobmanager.rpc.address: cdh1

# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123

# The total process memory size for the JobManager.
# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
jobmanager.memory.process.size: 2048m

# The total process memory size for the TaskManager.
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
taskmanager.memory.process.size: 6144m

# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
# taskmanager.memory.flink.size: 1280m

# The number of task slots that each TaskManager offers (default: 1). Each slot runs one parallel pipeline.
# Having several slots per TaskManager helps amortize certain fixed overheads (JVM, application libraries,
# network connections) across parallel tasks or pipelines.
taskmanager.numberOfTaskSlots: 1

# The parallelism used for programs that did not specify any other parallelism (default: 1).
parallelism.default: 1

# Uncomment the following to pin the TaskManager address; for a single-machine deployment use localhost.
#taskmanager.host: 0.0.0.0

# The default file system scheme and authority.
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme

#==============================================================================
# High Availability
#==============================================================================
# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
high-availability: zookeeper

# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...)
high-availability.storageDir: hdfs:///flink/ha/

# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
high-availability.zookeeper.quorum: cdh1:2181,cdh2:2181,cdh3:2181
high-availability.zookeeper.client.acl: open
high-availability.zookeeper.path.root: /flink

#==============================================================================
# Fault tolerance, checkpointing and state backends
#==============================================================================
state.backend: rocksdb

# Whether the state backend should create incremental checkpoints, if possible (default: false).
# For incremental checkpoints, only the diff from the previous checkpoint is stored rather than the
# complete checkpoint state. Once enabled, the state size shown in the Web UI or fetched from the
# REST API only represents the incremental checkpoint size, not the full checkpoint size.
# Some state backends may not support incremental checkpoints and will ignore this option.
state.backend.incremental: true

# Whether to configure local recovery for the state backend. Local recovery is disabled by default.
# It currently only covers keyed state backends; the MemoryStateBackend does not support it.
state.backend.local-recovery: true

# Cache size for data blocks in RocksDB, in bytes. RocksDB's default block cache size is 8MB.
state.backend.rocksdb.block.cache-size: 268435456

# The factory for the timer service state implementation. For the RocksDB backend the options
# are HEAP (heap-based) or ROCKSDB.
state.backend.rocksdb.timer-service.factory: HEAP

# Default directory for storing checkpoint data files and metadata in a Flink-supported file system,
# when using any of the default bundled state backends.
state.checkpoints.dir: hdfs:///flink/flink-checkpoints

# Default target directory for savepoints, optional. Used by the state backends to write savepoints
# to a file system.
state.savepoints.dir: hdfs:///flink/flink-savepoints

# Maximum number of completed checkpoints to retain.
state.checkpoints.num-retained: 3

# This option specifies how the job recovers from task failures. Accepted values are:
# 'full': restart all tasks to recover the job.
# 'region': restart only the tasks that may have been affected by the task failure.
jobmanager.execution.failover-strategy: region

#==============================================================================
# Advanced
#==============================================================================

# Override the directories for temporary files. If not specified, the
# system-specific Java temporary directory (java.io.tmpdir property) is taken.
#
# For framework setups on Yarn or Mesos, Flink will automatically pick up the
# containers' temp directories without any need for configuration.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
# /data1/tmp:/data2/tmp:/data3/tmp
#
# Note: Each directory entry is read from and written to by a different I/O
# thread. You can include the same directory multiple times in order to create
# multiple I/O threads against that directory. This is for example relevant for
# high-throughput RAIDs.
#
# io.tmp.dirs: /tmp

# The classloading resolve order. Possible values are 'child-first' (Flink's default)
# and 'parent-first' (Java's default).
#
# Child first classloading allows users to use different dependency/library
# versions in their application than those in the classpath. Switching back
# to 'parent-first' may help with debugging dependency issues.
#
# classloader.resolve-order: child-first

# The amount of memory going to the network stack. These numbers usually need
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, the default max is 1GB.
#
# taskmanager.memory.network.fraction: 0.1
# taskmanager.memory.network.min: 64mb
# taskmanager.memory.network.max: 1gb

#==============================================================================
# YARN Configuration
#==============================================================================
# Number of ApplicationMaster restarts. By default this is 1; with high availability enabled the
# default is 2. The restart count is also limited by YARN (configured via
# yarn.resourcemanager.am.max-attempts). Note that the whole Flink cluster will restart and the
# YARN client will lose the connection.
yarn.application-attempts: 10
#yarn.container-start-command-template: %java% %jvmmem% %jvmopts% -DyarnContainerId=$CONTAINER_ID %logging% %class% %args% %redirects%
#yarn.maximum-failed-containers: 100
#yarn.tags: flink

#==============================================================================
# HistoryServer
#==============================================================================
heartbeat.timeout: 1800000

My question:

After submitting the Flink job with

./bin/flink run \
-d -t yarn-per-job \
-yjm 1536 \
-ytm 3072 \
-yD jobmanager.memory.process.size=1.5GB \
-yD taskmanager.memory.process.size=3GB \
-yD heartbeat.timeout=1800000 \
/opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.0.jar

the memory parameters specified in the command are not used: the memory sizes shown in the Flink Web UI are the ones configured in flink-conf.yaml, and the values given on the CLI take no effect. Am I using this incorrectly?

Best regards,
刘海
[hidden email]
Hi, please use -D, -tm and -jm; the y prefix is not needed.
Best,
Yangze Guo

On Mon, Jan 18, 2021 at 9:19 AM 刘海 <[hidden email]> wrote:
> [...]
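For reference, one way the suggested form could look is sketched below. It keeps only the -D dynamic properties (the generic way of passing configuration to the yarn-per-job target) and reuses the jar path and sizes from the original message; whole-number sizes such as 1536m and 3072m are used here instead of 1.5GB and 3GB:

./bin/flink run -d -t yarn-per-job \
  -Djobmanager.memory.process.size=1536m \
  -Dtaskmanager.memory.process.size=3072m \
  -Dheartbeat.timeout=1800000 \
  /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.0.jar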
Hello,
I tried your suggestion and changed the submit command to:

./bin/flink run -d -t yarn-per-job -tm 1536 -jm 3072 -D jobmanager.memory.process.size=1.5GB -D taskmanager.memory.process.size=3GB -D heartbeat.timeout=1800000 /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar

I used an absolute path for the jar: /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar

The result is an exception saying the jar cannot be found:

org.apache.flink.client.cli.CliArgsException: Could not get job jar and dependencies from JAR file: JAR file does not exist: 1536
    at org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:259) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
    at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_181]
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) [hadoop-common-3.0.0-cdh6.3.2.jar:?]
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) [flink-dist_2.11-1.12.0.jar:1.12.0]
Caused by: java.io.FileNotFoundException: JAR file does not exist: 1536
    at org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:793) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.client.cli.CliFrontend.getJobJarAndDependencies(CliFrontend.java:256) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    ... 8 more

刘海
[hidden email]

On Jan 18, 2021, at 10:12, Yangze Guo <[hidden email]> wrote:
> Hi, please use -D, -tm and -jm; the y prefix is not needed.
> [...]
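A side note on the error above: the exception names "1536", not the jar path, as the missing JAR, which suggests the run action did not consume -tm/-jm in this form and instead took the value 1536 as the program jar argument. The y-prefixed shortcuts from the first command (-yjm/-ytm) appear to be tied to the legacy yarn-cluster mode rather than to the -t yarn-per-job target, so if those shortcuts are preferred over -D properties, a sketch of that older style (assuming the same jar and sizes, with units written out explicitly) might be:

./bin/flink run -d -m yarn-cluster \
  -yjm 1536m -ytm 3072m \
  /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar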
May I ask whether this path is a local path on your machine? The client needs to be able to locate the jar through this path.
Best,
Yangze Guo

On Mon, Jan 18, 2021 at 10:34 AM 刘海 <[hidden email]> wrote:
> [...]
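As a quick check of that point, listing the file from the same node where ./bin/flink run is executed would confirm the client can see it, e.g.:

ls -l /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar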
It is a path on my local server. Do I need to upload this jar to all three nodes?
I have placed it under the /opt/flink-1.12.0/examples directory.

刘海
[hidden email]

On Jan 18, 2021, at 10:38, Yangze Guo <[hidden email]> wrote:
> May I ask whether this path is a local path on your machine? The client needs to be able to locate the jar through this path.
> [...]
One more question: I already have one job running, and when I submit another job the output below is printed, but checking YARN shows the new job has not started. Have you run into this before?
[root@cdh1 flink-1.12.0]# ./bin/flink run -d -t yarn-per-job -D jobmanager.memory.process.size=1.5GB -D taskmanager.memory.process.size=3GB -D heartbeat.timeout=1800000 /opt/flink-1.12.0/examples/myProject/bi-wxfx-fqd-1.0.1.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-1.12.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
[root@cdh1 flink-1.12.0]#

刘海
[hidden email]

On Jan 18, 2021, at 10:42, 刘海 <[hidden email]> wrote:
> It is a path on my local server. Do I need to upload this jar to all three nodes?
> [...]
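Since the output above ends without printing a YARN application id, two things that may help narrow it down (a sketch; the client log file name depends on the user and host, so the name below is only an assumed example):

# see whether a second application was actually submitted to YARN
yarn application -list

# the Flink CLI writes a client-side log under the log/ directory of the Flink distribution
tail -n 200 /opt/flink-1.12.0/log/flink-root-client-cdh1.log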