Hello everyone,
I'm running a Flink session cluster on Kubernetes (Flink on k8s, session mode). The cluster has 3 TaskManagers with 8 slots each. I submit the job with parallelism 3, but all of its subtasks always end up on a single TaskManager. This causes data skew, and every checkpoint fails. Does Flink on k8s have a parameter similar to yarnslot on YARN that controls how many TaskManagers a job uses?

Flink TaskManager configuration:

jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
high-availability: zookeeper
high-availability.cluster-id: 1
high-availability.storageDir: file:///tmp
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
classloader.resolve-order: parent-first
blob.server.port: 6124
query.server.port: 6125

Flink JobManager configuration:

jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
high-availability: zookeeper
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: 1
high-availability.storageDir: file:///data/ha
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
high-availability.jobmanager.port: 6123
state.backend: filesystem
state.backend.fs.checkpointdir: file:///data/state
web.upload.dir: /data/upload
blob.server.port: 6124
metrics.internal.query-service.port: 6125
classloader.resolve-order: parent-first

flink-taskmanager.yml:

---
apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: kafka
spec:
  replicas: 3
  selector:
    matchLabels:
      app: flink-taskmanager
  template:
    metadata:
      labels:
        app: flink-taskmanager
    spec:
      containers:
      - image: ccr.ccs.tencentyun.com/jttest/flink:1.11.1-scala_2.12-java8
        name: flink-taskmanager
        ports:
        - containerPort: 8081
          name: flink-task
        workingDir: /opt/flink
        args:
        - taskmanager
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager-svc
        - name: TZ
          value: "Asia/Shanghai"
        resources:
          requests:
            cpu: 1200m
            memory: 1024Mi
          limits:
            cpu: 2000m
            memory: 2048Mi
        volumeMounts:
        - name: flink-taskmanager-pv
          mountPath: /opt/flink/conf
        - name: flink-jobmanager-lib-pv
          mountPath: /opt/flink/lib
      volumes:
      - name: flink-taskmanager-pv
        persistentVolumeClaim:
          claimName: flink-taskmanager-pvc
      - name: flink-jobmanager-lib-pv
        persistentVolumeClaim:
          claimName: flink-jobmanager-lib-pvc
      imagePullSecrets:
      - name: registrysecret

Flink code:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Get the Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the checkpointing mode to exactly-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Set the checkpoint interval: 1000*60 ms = 1 minute
env.enableCheckpointing(1000*60);
// Set the interval for generating watermarks (100 ms)
env.getConfig().setAutoWatermarkInterval(100);
// Use event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Restart up to 5 times, waiting 1 minute between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, org.apache.flink.api.common.time.Time.minutes(1)));
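Why parallelism 3 can land entirely on one TaskManager is easier to see with a toy model. The Java sketch below is a hypothetical simulation, not Flink's scheduler code: it places 3 subtasks on 3 TaskManagers with 8 slots each, once with a simplified "take the first free slot" rule and once with a "least-utilized TaskManager first" rule. The class name SlotPlacementSketch and both rules are assumptions made for illustration; the second rule is roughly the behaviour that the Flink 1.11 JobManager option cluster.evenly-spread-out-slots: true (default false) is meant to enable.

```java
import java.util.Arrays;

/**
 * Hypothetical toy model (NOT Flink's actual slot allocation code) of placing
 * subtasks onto TaskManagers that all have the same number of slots.
 */
public class SlotPlacementSketch {

    /** Places `parallelism` subtasks and returns how many slots are used on each TaskManager. */
    public static int[] place(int taskManagers, int slotsPerTm, int parallelism, boolean spreadOut) {
        int[] used = new int[taskManagers];
        for (int subtask = 0; subtask < parallelism; subtask++) {
            int chosen = -1;
            for (int tm = 0; tm < taskManagers; tm++) {
                if (used[tm] >= slotsPerTm) {
                    continue; // this TaskManager has no free slot left
                }
                if (chosen == -1) {
                    chosen = tm; // first TaskManager that still has a free slot
                    if (!spreadOut) {
                        break; // "first free slot" rule: stop at the first match
                    }
                } else if (used[tm] < used[chosen]) {
                    // "spread-out" rule: with equal slot counts, fewer used slots
                    // means lower utilization, so prefer this TaskManager
                    chosen = tm;
                }
            }
            if (chosen == -1) {
                throw new IllegalStateException("not enough free slots");
            }
            used[chosen]++;
        }
        return used;
    }

    public static void main(String[] args) {
        // 3 TaskManagers x 8 slots, parallelism 3, as in the setup above
        System.out.println("first-free: " + Arrays.toString(place(3, 8, 3, false)));
        System.out.println("spread-out: " + Arrays.toString(place(3, 8, 3, true)));
    }
}
```

Running main prints [3, 0, 0] for the first-free rule and [1, 1, 1] for the spread-out rule. In the same model, place(3, 1, 3, false) also yields one subtask per TaskManager, which corresponds to lowering taskmanager.numberOfTaskSlots in the TaskManager configuration, which currently sets it to 8.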