Skip to content

Commit 4ee5b3e

Browse files
[GOBBLIN-2243]Add Fallback dir for Jar caching (#4160)
* added fallback caching dir * addressed comments * refactor * resolved comments * refactor
1 parent 6619cb1 commit 4ee5b3e

File tree

7 files changed

+435
-16
lines changed

7 files changed

+435
-16
lines changed

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo contain
394394
Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, this.applicationId);
395395
// Used for -SNAPSHOT versions of jars
396396
Path containerJarsUnsharedDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
397-
Path jarCacheDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
397+
Path jarCacheDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs) : appWorkDir;
398398
Path containerJarsCachedDir = new Path(jarCacheDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
399399
LOGGER.info("Container cached jars root dir: " + containerJarsCachedDir);
400400
LOGGER.info("Container execution-private jars root dir: " + containerJarsUnsharedDir);

gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,7 @@ ApplicationId setupAndSubmitApplication() throws IOException, YarnException, Int
608608
amContainerLaunchContext.setCommands(Lists.newArrayList(buildApplicationMasterCommand(applicationId.toString(), resource.getMemory())));
609609

610610
if (this.jarCacheEnabled) {
611-
Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(this.config);
611+
Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs);
612612
// Retain at least the current and last month's jars to handle executions running for ~30 days max
613613
boolean cleanedSuccessfully = YarnHelixUtils.retainKLatestJarCachePaths(jarCachePath.getParent(), 2, this.fs);
614614
if (!cleanedSuccessfully) {
@@ -675,7 +675,7 @@ private Resource prepareContainerResource(GetNewApplicationResponse newApplicati
675675

676676
private Map<String, LocalResource> addAppMasterLocalResources(ApplicationId applicationId) throws IOException {
677677
Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, applicationId.toString());
678-
Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
678+
Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs) : appWorkDir;
679679

680680
Path appMasterWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME);
681681
Path appMasterJarsCacheDir = new Path(jarsRootDir, GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME);
@@ -730,7 +730,7 @@ private Map<String, LocalResource> addAppMasterLocalResources(ApplicationId appl
730730
private void addContainerLocalResources(ApplicationId applicationId) throws IOException {
731731
Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName,
732732
applicationId.toString());
733-
Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
733+
Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs) : appWorkDir;
734734
Path containerWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
735735
Path containerJarsRootDir = new Path(jarsRootDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
736736
LOGGER.info("Configured Container work directory to: {}", containerWorkDir);

gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ public class GobblinYarnConfigurationKeys {
5555

5656
public static final String JAR_CACHE_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.dir";
5757

58+
public static final String JAR_CACHE_ROOT_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.root.dir";
59+
60+
public static final String FALLBACK_JAR_CACHE_ROOT_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.fallback.root.dir";
61+
62+
public static final String JAR_CACHE_SUFFIX = GOBBLIN_YARN_PREFIX + "jar.cache.suffix";
63+
5864
public static final String YARN_APPLICATION_LIB_JAR_LIST = GOBBLIN_YARN_PREFIX + "lib.jar.list";
5965

6066
/**
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.yarn;
19+
20+
import java.io.IOException;
21+
22+
import org.apache.hadoop.fs.FileSystem;
23+
import org.apache.hadoop.fs.Path;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
import com.google.common.annotations.VisibleForTesting;
28+
import com.typesafe.config.Config;
29+
30+
import org.apache.gobblin.util.ConfigUtils;
31+
32+
33+
/**
34+
 * Utility class for resolving the jar cache directory path by checking that the configured root directory exists on the filesystem and applying fallback logic.
35+
*
36+
* <p>Resolution logic:</p>
37+
* <ol>
38+
* <li>If JAR_CACHE_DIR is explicitly configured, uses it as-is (for backward compatibility)</li>
39+
* <li>Otherwise, validates JAR_CACHE_ROOT_DIR exists on filesystem</li>
40+
* <li>If not found, tries FALLBACK_JAR_CACHE_ROOT_DIR</li>
41+
* <li>Combines validated root with JAR_CACHE_SUFFIX (or default suffix) to form final path</li>
42+
* <li>If no valid root found, throws IOException</li>
43+
* </ol>
44+
*/
45+
public class JarCachePathResolver {
46+
private static final Logger LOGGER = LoggerFactory.getLogger(JarCachePathResolver.class);
47+
// Note: Trailing slash will be normalized away by Hadoop Path
48+
private static final String DEFAULT_JAR_CACHE_SUFFIX = ".gobblinCache/gobblin-temporal";
49+
50+
// Private constructor to prevent instantiation
51+
private JarCachePathResolver() {
52+
}
53+
54+
/**
55+
* Resolves the jar cache directory path, applying validation and fallback logic.
56+
*
57+
* @param config the configuration
58+
* @param fs the filesystem to use for validation
59+
* @return the resolved jar cache directory path
60+
* @throws IOException if filesystem operations fail or no valid root directory is found
61+
*/
62+
public static Path resolveJarCachePath(Config config, FileSystem fs) throws IOException {
63+
// If JAR_CACHE_DIR is explicitly set, use it as-is
64+
if (config.hasPath(GobblinYarnConfigurationKeys.JAR_CACHE_DIR)) {
65+
String explicitCacheDir = config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR);
66+
LOGGER.info("Using explicitly configured JAR_CACHE_DIR: {}", explicitCacheDir);
67+
return new Path(explicitCacheDir);
68+
}
69+
70+
// Get suffix from config, or use default if not configured or empty
71+
String suffix = ConfigUtils.getString(config, GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, "");
72+
if (suffix == null || suffix.trim().isEmpty()) {
73+
LOGGER.info("JAR_CACHE_SUFFIX not configured or empty, using default: {}", DEFAULT_JAR_CACHE_SUFFIX);
74+
suffix = DEFAULT_JAR_CACHE_SUFFIX;
75+
}
76+
77+
// Try primary root directory
78+
if (config.hasPath(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR)) {
79+
String rootDir = config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR);
80+
Path resolvedPath = validateAndComputePath(fs, rootDir, suffix, GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR);
81+
if (resolvedPath != null) {
82+
return resolvedPath;
83+
}
84+
}
85+
86+
// Try fallback root directory
87+
if (config.hasPath(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR)) {
88+
String fallbackRootDir = config.getString(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR);
89+
Path resolvedPath = validateAndComputePath(fs, fallbackRootDir, suffix, GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR);
90+
if (resolvedPath != null) {
91+
return resolvedPath;
92+
}
93+
}
94+
95+
// No valid root directory found - fail
96+
throw new IOException("No valid jar cache root directory found. Please configure "
97+
+ GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR + " or "
98+
+ GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR
99+
+ " with a valid directory path, or explicitly set "
100+
+ GobblinYarnConfigurationKeys.JAR_CACHE_DIR);
101+
}
102+
103+
/**
104+
* Validates if the root directory exists and computes the full path with suffix.
105+
*
106+
* @param fs the filesystem to check
107+
* @param rootDir the root directory to validate
108+
* @param suffix the suffix to append
109+
* @param configName the config name for logging
110+
* @return the computed path if valid, null otherwise
111+
* @throws IOException if filesystem operations fail
112+
*/
113+
@VisibleForTesting
114+
static Path validateAndComputePath(FileSystem fs, String rootDir, String suffix, String configName) throws IOException {
115+
Path rootPath = new Path(rootDir);
116+
if (fs.exists(rootPath)) {
117+
// Strip leading '/' from suffix to ensure proper concatenation
118+
// Otherwise, Hadoop Path treats it as absolute path and ignores the parent
119+
String normalizedSuffix = suffix.startsWith("/") ? suffix.substring(1) : suffix;
120+
Path fullPath = new Path(rootPath, normalizedSuffix);
121+
LOGGER.info("{} exists: {}, resolved JAR_CACHE_DIR to: {}", configName, rootDir, fullPath);
122+
return fullPath;
123+
}
124+
LOGGER.warn("Configured {} does not exist: {}", configName, rootDir);
125+
return null;
126+
}
127+
128+
}
129+

gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -209,15 +209,19 @@ public static void setYarnClassPath(Config config, Configuration yarnConfigurati
209209
/**
210210
* Calculate the path of a jar cache on HDFS, which is retained on a monthly basis.
211211
 * Should be used in conjunction with {@link #retainKLatestJarCachePaths(Path, int, FileSystem)} to clean up the cache on a periodic basis
212-
* @param config
213-
* @return
214-
* @throws IOException
212+
* @param config the configuration
213+
* @param fs the filesystem to use for validation
214+
* @return the monthly jar cache path
215+
* @throws IOException if filesystem operations fail
215216
*/
216-
public static Path calculatePerMonthJarCachePath(Config config) throws IOException {
217-
Path jarsCacheDirMonthly = new Path(config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR));
218-
String monthSuffix = new SimpleDateFormat("yyyy-MM").format(config.getLong(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY));
219-
return new Path(jarsCacheDirMonthly, monthSuffix);
220-
217+
public static Path calculatePerMonthJarCachePath(Config config, FileSystem fs) throws IOException {
218+
// Use JarCachePathResolver to resolve the base jar cache directory
219+
Path baseCacheDir = JarCachePathResolver.resolveJarCachePath(config, fs);
220+
221+
// Append monthly suffix
222+
String monthSuffix = new SimpleDateFormat("yyyy-MM").format(
223+
config.getLong(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY));
224+
return new Path(baseCacheDir, monthSuffix);
221225
}
222226

223227
/**

0 commit comments

Comments
 (0)