Flink on k8s: how to specify the number of TaskManagers when submitting a job

dtygfn@163.com
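Hi everyone,
     I'm running Flink on k8s as a session cluster. The cluster has 3 TaskManagers with 8 slots each. When I submit a job with parallelism 3, all of its subtasks always end up on the same TaskManager. This skews the load, and every checkpoint fails. Does Flink on k8s have a parameter, like yarnslots on YARN, that controls how many TaskManagers a job uses?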
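To make the symptom concrete, here is a minimal sketch of two slot-placement policies, assuming a simplified model where each TaskManager exposes a fixed number of slots and each subtask takes exactly one slot. fillFirst mimics the behaviour described above (slots are taken from one TaskManager until it is full); spreadOut always picks the least-loaded TaskManager, which is roughly what the cluster.evenly-spread-out-slots option of recent Flink versions aims for. The class and method names are hypothetical; this is not Flink's scheduler code.

```java
import java.util.Arrays;

// Hypothetical model: counts how many subtasks land on each TaskManager.
final class SlotSpread {

    // Take slots from TaskManager 0 until it is full, then TaskManager 1, and so on.
    static int[] fillFirst(int parallelism, int taskManagers, int slotsPerTm) {
        int[] used = new int[taskManagers];
        for (int subtask = 0; subtask < parallelism; subtask++) {
            int tm = subtask / slotsPerTm;
            if (tm >= taskManagers) {
                throw new IllegalArgumentException("not enough slots");
            }
            used[tm]++;
        }
        return used;
    }

    // Least-loaded first: pick the TaskManager with the fewest occupied slots that still has room.
    static int[] spreadOut(int parallelism, int taskManagers, int slotsPerTm) {
        int[] used = new int[taskManagers];
        for (int subtask = 0; subtask < parallelism; subtask++) {
            int best = -1;
            for (int tm = 0; tm < taskManagers; tm++) {
                if (used[tm] < slotsPerTm && (best < 0 || used[tm] < used[best])) {
                    best = tm;
                }
            }
            if (best < 0) {
                throw new IllegalArgumentException("not enough slots");
            }
            used[best]++;
        }
        return used;
    }

    public static void main(String[] args) {
        // 3 TaskManagers x 8 slots, job parallelism 3 (the setup in this post)
        System.out.println(Arrays.toString(fillFirst(3, 3, 8)));  // [3, 0, 0]
        System.out.println(Arrays.toString(spreadOut(3, 3, 8)));  // [1, 1, 1]
    }
}
```

In a real cluster the equivalent switch would be cluster.evenly-spread-out-slots: true in the flink-conf.yaml read by the JobManager; check the configuration reference of your exact Flink version before relying on it.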


Flink TaskManager configuration
jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
high-availability: zookeeper
high-availability.cluster-id: 1
high-availability.storageDir: file:///tmp
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
classloader.resolve-order: parent-first
blob.server.port: 6124
query.server.port: 6125
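The TaskManager configuration above is a flat "key: value" file. As a sketch, assuming the simplified format shown here (one pair per line, no nesting), the hypothetical helper below parses such text and computes how many slots the session cluster offers with the 3 TaskManager replicas from the Deployment below. Flink itself loads this file with its own GlobalConfiguration loader, not with this code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified reader for flink-conf.yaml-style "key: value" lines.
final class FlinkConfSketch {

    static Map<String, String> parse(String text) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : text.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            int colon = trimmed.indexOf(": ");
            if (colon < 0) {
                continue; // not a "key: value" line
            }
            conf.put(trimmed.substring(0, colon).trim(), trimmed.substring(colon + 2).trim());
        }
        return conf;
    }

    // Total slots = TaskManager replicas x taskmanager.numberOfTaskSlots (Flink's default is 1).
    static int totalSlots(Map<String, String> conf, int taskManagerReplicas) {
        int slotsPerTm = Integer.parseInt(conf.getOrDefault("taskmanager.numberOfTaskSlots", "1"));
        return taskManagerReplicas * slotsPerTm;
    }

    public static void main(String[] args) {
        String tmConf = String.join("\n",
                "taskmanager.numberOfTaskSlots: 8",
                "parallelism.default: 1",
                "high-availability.zookeeper.quorum: zookeeper:2181");
        Map<String, String> conf = parse(tmConf);
        System.out.println(totalSlots(conf, 3)); // slots available to a job with parallelism 3
    }
}
```

With 24 slots and a job parallelism of 3, only an eighth of the slots are in use, so nothing forces the scheduler onto more than one TaskManager.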


Flink JobManager configuration


jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
high-availability: zookeeper
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: 1
high-availability.storageDir: file:///data/ha
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
high-availability.jobmanager.port: 6123
state.backend: filesystem
state.backend.fs.checkpointdir: file:///data/state
web.upload.dir: /data/upload
blob.server.port: 6124
metrics.internal.query-service.port: 6125
classloader.resolve-order: parent-first
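The JobManager and TaskManager files above differ in several keys; for example, taskmanager.numberOfTaskSlots is 1 here and 8 in the TaskManager file. Assuming both files have already been read into maps, a small hypothetical helper like the one below lists keys whose values differ or exist on only one side, which makes such mismatches easy to spot.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper: report keys whose values differ between two configurations.
final class ConfDiff {

    // Returns key -> "jmValue | tmValue"; "<unset>" marks a key missing on one side.
    static Map<String, String> diff(Map<String, String> jm, Map<String, String> tm) {
        Map<String, String> result = new TreeMap<>();
        TreeSet<String> keys = new TreeSet<>(jm.keySet());
        keys.addAll(tm.keySet());
        for (String key : keys) {
            String a = jm.getOrDefault(key, "<unset>");
            String b = tm.getOrDefault(key, "<unset>");
            if (!a.equals(b)) {
                result.put(key, a + " | " + b);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> jm = Map.of(
                "taskmanager.numberOfTaskSlots", "1",
                "high-availability.storageDir", "file:///data/ha",
                "blob.server.port", "6124");
        Map<String, String> tm = Map.of(
                "taskmanager.numberOfTaskSlots", "8",
                "high-availability.storageDir", "file:///tmp",
                "blob.server.port", "6124");
        diff(jm, tm).forEach((k, v) -> System.out.println(k + ": " + v));
    }
}
```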


flink-taskmanager.yml


---
apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: kafka
spec:
  replicas: 3
  selector:
    matchLabels:
      app: flink-taskmanager
  template:
    metadata:
      labels:
        app: flink-taskmanager
    spec:
      containers:
      - image: ccr.ccs.tencentyun.com/jttest/flink:1.11.1-scala_2.12-java8
        name: flink-taskmanager
        ports:
        - containerPort: 8081
          name: flink-task
        workingDir: /opt/flink
        args:
        - taskmanager
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager-svc
        - name: TZ
          value: "Asia/Shanghai"
        resources:
          requests:
            cpu: 1200m
            memory: 1024Mi
          limits:
            cpu: 2000m
            memory: 2048Mi
        volumeMounts:
        - name: flink-taskmanager-pv
          mountPath: /opt/flink/conf
        - name: flink-jobmanager-lib-pv
          mountPath: /opt/flink/lib
      volumes:
      - name: flink-taskmanager-pv
        persistentVolumeClaim:
          claimName: flink-taskmanager-pvc
      - name: flink-jobmanager-lib-pv
        persistentVolumeClaim:
          claimName: flink-jobmanager-lib-pvc
      imagePullSecrets:
        - name: registrysecret
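Each TaskManager pod above requests 1200m CPU (limit 2000m) and runs 8 slots. Flink slots separate memory, not CPU, so all slots of a pod share its CPU. As a rough sketch, assuming that CPU is shared evenly among the slots, the hypothetical helper below converts Kubernetes millicore strings and shows the CPU share per slot. When all 3 subtasks sit in one pod, they compete for that pod's CPU while the other two pods stay idle.

```java
// Hypothetical helper: CPU available per slot, assuming slots share a pod's CPU evenly.
final class SlotCpu {

    // Parse a Kubernetes CPU quantity such as "1200m" or "2" into millicores.
    static long toMillicores(String quantity) {
        if (quantity.endsWith("m")) {
            return Long.parseLong(quantity.substring(0, quantity.length() - 1));
        }
        return Math.round(Double.parseDouble(quantity) * 1000);
    }

    // Cores per slot = pod CPU (in cores) / number of slots in that TaskManager.
    static double coresPerSlot(String cpu, int slotsPerTm) {
        return toMillicores(cpu) / 1000.0 / slotsPerTm;
    }

    public static void main(String[] args) {
        // Values from flink-taskmanager.yml and taskmanager.numberOfTaskSlots: 8
        System.out.println("request: " + coresPerSlot("1200m", 8) + " cores per slot");
        System.out.println("limit:   " + coresPerSlot("2000m", 8) + " cores per slot");
    }
}
```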


Flink code
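import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Job {
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 60 s with exactly-once semantics
        env.enableCheckpointing(1000 * 60);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Use event time; in Flink 1.11 this call resets the watermark interval to 200 ms,
        // so the 100 ms interval must be set after it
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(100);
        // Restart at most 5 times, waiting 1 minute between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, org.apache.flink.api.common.time.Time.minutes(1)));
        // ... sources, transformations and sinks go here, followed by env.execute(...)
    }
}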
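RestartStrategies.fixedDelayRestart(5, Time.minutes(1)) lets the job restart at most 5 times, waiting 1 minute before each attempt, after which the job fails. A minimal sketch of that policy, written as a hypothetical stdlib model rather than Flink's implementation, shows the resulting schedule. In this model the attempt counter is never reset.

```java
// Hypothetical model of a fixed-delay restart policy such as
// RestartStrategies.fixedDelayRestart(5, Time.minutes(1)); not Flink's own code.
final class FixedDelayModel {

    private final int maxAttempts;
    private final long delaySeconds;
    private int attempts = 0;

    FixedDelayModel(int maxAttempts, long delaySeconds) {
        this.maxAttempts = maxAttempts;
        this.delaySeconds = delaySeconds;
    }

    // Called on each job failure: returns the wait before restarting, or -1 if the job fails for good.
    long onFailure() {
        if (attempts >= maxAttempts) {
            return -1;
        }
        attempts++;
        return delaySeconds;
    }

    public static void main(String[] args) {
        FixedDelayModel policy = new FixedDelayModel(5, 60);
        for (int failure = 1; failure <= 6; failure++) {
            long wait = policy.onFailure();
            System.out.println("failure " + failure + ": "
                    + (wait < 0 ? "job fails permanently" : "restart after " + wait + " s"));
        }
    }
}
```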