Fw: Flink on k8s: how to specify the number of TaskManagers when submitting a job


dtygfn@163.com




2021-01-08 09:47:31,636 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 9 of job 5e953fb772f9030c728e7c0498555ae2 expired before completing.
2021-01-08 09:47:31,637 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1626) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1603) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:90) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1736) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_265]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_265]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_265]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_265]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_265]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_265]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]


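-------- Forwarded message --------
From: "旧城以西" <[hidden email]>
Sent: 2021-01-08 10:12:45
To: "[hidden email]" <[hidden email]>
Subject: Flink on k8s: how to specify the number of TaskManagers when submitting a job
Hi all,
     I am running a Flink cluster deployed in session mode on k8s. The cluster has 3 TaskManagers, and each TaskManager has 8 slots. When I submit a job with parallelism set to 3, the job always ends up on a single TaskManager. This causes data skew, and every checkpoint fails. Is there a parameter for Flink on k8s, similar to yarnslot on YARN, that controls how many TaskManagers a job uses?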
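
To make the placement problem concrete, here is a minimal sketch in plain Java. It is a toy model, not Flink's scheduler: the class `SlotSpreadSketch` and its methods are hypothetical names introduced only for illustration. It compares a "fill one TaskManager first" placement, which matches the behaviour described above, with a "spread out" placement for 3 TaskManagers, 8 slots each, and parallelism 3. Flink has a `cluster.evenly-spread-out-slots` option that aims at the second behaviour; whether it resolves this particular case is an assumption worth checking against the configuration documentation for 1.11.

```java
import java.util.Arrays;

/** Toy model of slot placement; hypothetical, not Flink's actual scheduler. */
final class SlotSpreadSketch {

    /** Fills TaskManagers in order: uses up one TaskManager's slots before moving to the next. */
    static int[] fillFirst(int taskManagers, int slotsPerTm, int parallelism) {
        requireCapacity(taskManagers, slotsPerTm, parallelism);
        int[] used = new int[taskManagers];
        int remaining = parallelism;
        for (int tm = 0; tm < taskManagers && remaining > 0; tm++) {
            used[tm] = Math.min(slotsPerTm, remaining);
            remaining -= used[tm];
        }
        return used;
    }

    /** Places each subtask on the TaskManager that currently has the fewest used slots. */
    static int[] spreadOut(int taskManagers, int slotsPerTm, int parallelism) {
        requireCapacity(taskManagers, slotsPerTm, parallelism);
        int[] used = new int[taskManagers];
        for (int subtask = 0; subtask < parallelism; subtask++) {
            int target = 0;
            for (int tm = 1; tm < taskManagers; tm++) {
                if (used[tm] < used[target]) {
                    target = tm;
                }
            }
            used[target]++;
        }
        return used;
    }

    /** Rejects requests that need more slots than the cluster offers. */
    private static void requireCapacity(int taskManagers, int slotsPerTm, int parallelism) {
        if (taskManagers <= 0 || slotsPerTm <= 0 || parallelism < 0
                || parallelism > taskManagers * slotsPerTm) {
            throw new IllegalArgumentException("not enough slots for the requested parallelism");
        }
    }

    public static void main(String[] args) {
        // Numbers from the message above: 3 TaskManagers, 8 slots each, parallelism 3.
        System.out.println(Arrays.toString(fillFirst(3, 8, 3))); // [3, 0, 0]
        System.out.println(Arrays.toString(spreadOut(3, 8, 3))); // [1, 1, 1]
    }
}
```

With fill-first placement all three subtasks share one TaskManager's CPU and memory, while the other two pods sit idle; with spread-out placement each pod runs one subtask.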

Flink TaskManager configuration
jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
high-availability: zookeeper
high-availability.cluster-id: 1
high-availability.storageDir: file:///tmp
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
classloader.resolve-order: parent-first
blob.server.port: 6124
query.server.port: 6125

Flink JobManager configuration

jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
high-availability: zookeeper
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: 1
high-availability.storageDir: file:///data/ha
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
high-availability.jobmanager.port: 6123
state.backend: filesystem
state.backend.fs.checkpointdir: file:///data/state
web.upload.dir: /data/upload
blob.server.port: 6124
metrics.internal.query-service.port: 6125
classloader.resolve-order: parent-first

flink-taskmanager.yml

---
apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: kafka
spec:
  replicas: 3
  selector:
    matchLabels:
      app: flink-taskmanager
  template:
    metadata:
      labels:
        app: flink-taskmanager
    spec:
      containers:
      - image: ccr.ccs.tencentyun.com/jttest/flink:1.11.1-scala_2.12-java8
        name: flink-taskmanager
        ports:
        - containerPort: 8081
          name: flink-task
        workingDir: /opt/flink
        args:
        - taskmanager
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager-svc
        - name: TZ
          value: "Asia/Shanghai"
        resources:
          requests:
            cpu: 1200m
            memory: 1024Mi
          limits:
            cpu: 2000m
            memory: 2048Mi
        volumeMounts:
        - name: flink-taskmanager-pv
          mountPath: /opt/flink/conf
        - name: flink-jobmanager-lib-pv
          mountPath: /opt/flink/lib
      volumes:
      - name: flink-taskmanager-pv
        persistentVolumeClaim:
          claimName: flink-taskmanager-pvc
      - name: flink-jobmanager-lib-pv
        persistentVolumeClaim:
          claimName: flink-jobmanager-lib-pvc
      imagePullSecrets:
        - name: registrysecret

Flink code
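import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Get the Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the checkpointing mode
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Set the checkpoint interval (60 seconds)
env.enableCheckpointing(1000 * 60);
// Set the watermark generation interval (100 ms)
env.getConfig().setAutoWatermarkInterval(100);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Restart up to 5 times, waiting 1 minute between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, org.apache.flink.api.common.time.Time.minutes(1)));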