Support a flink.yarn.jars parameter

Support a flink.yarn.jars parameter

melin
Spark has a spark.yarn.jars parameter: the jars a job depends on are placed directly on HDFS, which avoids uploading them from the local machine and then distributing them, and speeds up job startup. It would be good for Flink to support the same thing.
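
For comparison, the Spark side is typically configured as below in spark-defaults.conf, and the keys used in the snippet that follows could be set analogously in flink-conf.yaml. Note that the stream.flink.* keys are only the ones proposed here, not existing Flink options, and the HDFS paths are placeholders:

# spark-defaults.conf: dependency jars are read straight from HDFS, nothing is uploaded
spark.yarn.jars  hdfs:///spark/jars/*.jar

# flink-conf.yaml (proposed, keys taken from the snippet below)
stream.flink.system.jars.dir: hdfs:///flink/system-jars
stream.flink.use.jars: hdfs:///flink/jobs/my-job.jar,hdfs:///flink/jobs/my-udf.jar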

YarnClusterDescriptor.java

// upload and register ship files
String systemJarHdfsDir =
  configuration.getString("stream.flink.system.jars.dir", "");
List<String> systemClassPaths = findHdfsJars(fs, systemJarHdfsDir, paths,
  localResources, envShipFileList);

String userJars = configuration.getString("stream.flink.use.jars", "");
List<String> userClassPaths;
if (userJars != null && !"".equals(userJars)) {
  userClassPaths = registerUserJars(fs, userJars.split(","), paths,
    localResources, envShipFileList);
} else {
  userClassPaths = Collections.emptyList();
}

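// UserJarInclusion (yarn.per-job-cluster.include-user-jar) semantics as used here:
// ORDER merges the user jars into the system class paths and sorts everything together,
// while FIRST/LAST keep them separate and place them before/after the system class paths below.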
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
  systemClassPaths.addAll(userClassPaths);
}

// normalize classpath by sorting
Collections.sort(systemClassPaths);
Collections.sort(userClassPaths);

// classpath assembler
StringBuilder classPathBuilder = new StringBuilder();
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
  for (String userClassPath : userClassPaths) {
    classPathBuilder.append(userClassPath).append(File.pathSeparator);
  }
}
for (String classPath : systemClassPaths) {
  classPathBuilder.append(classPath).append(File.pathSeparator);
}
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
  for (String userClassPath : userClassPaths) {
    classPathBuilder.append(userClassPath).append(File.pathSeparator);
  }
}

// Setup jar for ApplicationMaster
Path remotePathJar = setupFlinkJar("flink.jar", fs, flinkJarPath,
  localResources);
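
For what it's worth, here is a minimal sketch of what a helper like findHdfsJars could look like. findHdfsJars, paths and envShipFileList are the names from the snippet above, not existing Flink API, and the registration details (resource visibility, the envShipFileList format) are assumptions:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.util.Records;

// Sketch only: register jars that already live on HDFS as YARN local resources,
// so the client never uploads them. URL.fromPath requires Hadoop 2.8+.
private static List<String> findHdfsJars(
    FileSystem fs,
    String hdfsJarDir,
    List<Path> paths,
    Map<String, LocalResource> localResources,
    StringBuilder envShipFileList) throws IOException {

  List<String> classPaths = new ArrayList<>();
  if (hdfsJarDir == null || hdfsJarDir.isEmpty()) {
    return classPaths;
  }

  for (FileStatus status : fs.listStatus(new Path(hdfsJarDir))) {
    Path jarPath = status.getPath();
    if (!jarPath.getName().endsWith(".jar")) {
      continue;
    }

    // Point the LocalResource at the existing HDFS file instead of copying it.
    LocalResource resource = Records.newRecord(LocalResource.class);
    resource.setResource(URL.fromPath(jarPath));
    resource.setSize(status.getLen());
    resource.setTimestamp(status.getModificationTime());
    resource.setType(LocalResourceType.FILE);
    resource.setVisibility(LocalResourceVisibility.APPLICATION);

    String name = jarPath.getName();
    localResources.put(name, resource);
    paths.add(jarPath);
    envShipFileList.append(name).append('=').append(jarPath).append(',');
    classPaths.add(name);
  }
  return classPaths;
}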

Re: Support a flink.yarn.jars parameter

tison
We have had this idea as well. Tencent has already implemented this functionality internally, and I recall Yang Wang (in cc) built something similar at Alibaba. To do it cleanly we would probably also need to tidy up the YarnClusterDescriptor code along the way. This requirement does come up frequently, so let's try to get it into 1.11.

You could also describe the desired behavior in more detail, or implement it yourself and the community will help review it. I don't remember whether there is already a JIRA for this; you can search for one or simply create one.

Best,
tison.

