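Spark has a spark.yarn.jars parameter that lets a job's dependency jars live directly on HDFS, so they don't have to be uploaded from the local machine and then distributed, which speeds up startup.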
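A minimal sketch of the idea, assuming a comma-separated jar list like the one read from stream.flink.use.jars: jars whose URI already uses the hdfs scheme can be referenced in place, and only the rest would need uploading from the client. JarLocationSplitter and split are hypothetical names, and treating only hdfs:// as "already remote" is a simplifying assumption (other remote schemes such as viewfs are not considered).

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Flink): splits a comma-separated jar list into
// jars already on HDFS (could be registered as YARN local resources in place)
// and jars that would still have to be uploaded from the client first.
final class JarLocationSplitter {

    /** Result holder: remote jars are referenced in place, local jars need uploading. */
    static final class Split {
        final List<String> remote = new ArrayList<>();
        final List<String> local = new ArrayList<>();
    }

    static Split split(String commaSeparatedJars) {
        Split result = new Split();
        if (commaSeparatedJars == null || commaSeparatedJars.trim().isEmpty()) {
            return result;
        }
        for (String raw : commaSeparatedJars.split(",")) {
            String jar = raw.trim();
            if (jar.isEmpty()) {
                continue; // tolerate "a.jar,,b.jar" and trailing commas
            }
            // Assumption: only the hdfs scheme counts as "already remote".
            String scheme = URI.create(jar).getScheme();
            if ("hdfs".equalsIgnoreCase(scheme)) {
                result.remote.add(jar); // already on HDFS: no upload needed
            } else {
                result.local.add(jar);  // no scheme or file:// : upload first
            }
        }
        return result;
    }
}
```

In this sketch the order of the input list is preserved within each group, so a later classpath-assembly step can still sort or reorder the jars as it sees fit.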
YarnClusterDescriptor.java

    // upload and register ship files
    // system jars are resolved from a directory that already lives on HDFS
    String systemJarHdfsDir = configuration.getString("stream.flink.system.jars.dir", "");
    List<String> systemClassPaths = findHdfsJars(fs, systemJarHdfsDir, paths, localResources, envShipFileList);

    // user jars are given as a comma-separated list
    String userJars = configuration.getString("stream.flink.use.jars", "");
    List<String> userClassPaths;
    if (userJars != null && !userJars.isEmpty()) {
        userClassPaths = registerUserJars(fs, userJars.split(","), paths, localResources, envShipFileList);
    } else {
        userClassPaths = Collections.emptyList();
    }

    // ORDER: user jars are merged into the system classpath and sorted with it
    if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
        systemClassPaths.addAll(userClassPaths);
    }

    // normalize classpath by sorting
    Collections.sort(systemClassPaths);
    Collections.sort(userClassPaths);

    // classpath assembler
    StringBuilder classPathBuilder = new StringBuilder();
    if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
        for (String userClassPath : userClassPaths) {
            classPathBuilder.append(userClassPath).append(File.pathSeparator);
        }
    }
    for (String classPath : systemClassPaths) {
        classPathBuilder.append(classPath).append(File.pathSeparator);
    }
    if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
        for (String userClassPath : userClassPaths) {
            classPathBuilder.append(userClassPath).append(File.pathSeparator);
        }
    }

    // Setup jar for ApplicationMaster
    Path remotePathJar = setupFlinkJar("flink.jar", fs, flinkJarPath, localResources);
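The classpath ordering in the excerpt above can be checked in isolation. Below is a stdlib-only restatement of that assembler, offered as a sketch: ClassPathAssembler is a hypothetical name, the UserJarInclusion enum is a stand-in for Flink's YarnConfigOptions.UserJarInclusion, and copying the inputs into ArrayLists is an added safeguard so that addAll and sort also work when a caller passes an immutable list.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ClassPathAssembler {

    // Hypothetical stand-in for Flink's YarnConfigOptions.UserJarInclusion.
    enum UserJarInclusion { DISABLED, FIRST, LAST, ORDER }

    static String assemble(List<String> systemJars, List<String> userJars, UserJarInclusion mode) {
        // Copy into mutable lists so immutable inputs (e.g. Collections.emptyList()) are safe.
        List<String> system = new ArrayList<>(systemJars);
        List<String> user = new ArrayList<>(userJars);

        if (mode == UserJarInclusion.ORDER) {
            // ORDER: user jars are merged into the system list and sorted together.
            system.addAll(user);
        }
        Collections.sort(system);
        Collections.sort(user);

        StringBuilder sb = new StringBuilder();
        if (mode == UserJarInclusion.FIRST) {
            for (String p : user) {
                sb.append(p).append(File.pathSeparator); // user jars before system jars
            }
        }
        for (String p : system) {
            sb.append(p).append(File.pathSeparator);
        }
        if (mode == UserJarInclusion.LAST) {
            for (String p : user) {
                sb.append(p).append(File.pathSeparator); // user jars after system jars
            }
        }
        // DISABLED: user jars are left off this classpath entirely.
        return sb.toString();
    }
}
```

With FIRST the user jars come before the system jars, with LAST after them, and with ORDER they end up interleaved in one alphabetically sorted list.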
I've had this idea too. Tencent has already implemented this internally, and I recall that Yang Wang (in cc) built something similar at Alibaba. Doing it cleanly will probably require tidying up the YarnClusterDescriptor code along the way. I do see this request come up often, so let's try to get it into 1.11. You could also describe the intended behavior in more detail, or implement it yourself and the community will help review it. I don't remember whether there is already a JIRA; you can search for one or just create a new one.

Best,
tison.

melin li <[hidden email]> wrote on Monday, January 20, 2020 at 4:59 PM: