Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo contain
Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, this.applicationId);
// Used for -SNAPSHOT versions of jars
Path containerJarsUnsharedDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
Path jarCacheDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
Path jarCacheDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs) : appWorkDir;
Path containerJarsCachedDir = new Path(jarCacheDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
LOGGER.info("Container cached jars root dir: " + containerJarsCachedDir);
LOGGER.info("Container execution-private jars root dir: " + containerJarsUnsharedDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ ApplicationId setupAndSubmitApplication() throws IOException, YarnException, Int
amContainerLaunchContext.setCommands(Lists.newArrayList(buildApplicationMasterCommand(applicationId.toString(), resource.getMemory())));

if (this.jarCacheEnabled) {
Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(this.config);
Path jarCachePath = YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs);
// Retain at least the current and last month's jars to handle executions running for ~30 days max
boolean cleanedSuccessfully = YarnHelixUtils.retainKLatestJarCachePaths(jarCachePath.getParent(), 2, this.fs);
if (!cleanedSuccessfully) {
Expand Down Expand Up @@ -675,7 +675,7 @@ private Resource prepareContainerResource(GetNewApplicationResponse newApplicati

private Map<String, LocalResource> addAppMasterLocalResources(ApplicationId applicationId) throws IOException {
Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, applicationId.toString());
Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs) : appWorkDir;

Path appMasterWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME);
Path appMasterJarsCacheDir = new Path(jarsRootDir, GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME);
Expand Down Expand Up @@ -730,7 +730,7 @@ private Map<String, LocalResource> addAppMasterLocalResources(ApplicationId appl
private void addContainerLocalResources(ApplicationId applicationId) throws IOException {
Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName,
applicationId.toString());
Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config, this.fs) : appWorkDir;
Path containerWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
Path containerJarsRootDir = new Path(jarsRootDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
LOGGER.info("Configured Container work directory to: {}", containerWorkDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ public class GobblinYarnConfigurationKeys {

public static final String JAR_CACHE_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.dir";

public static final String JAR_CACHE_ROOT_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.root.dir";

public static final String FALLBACK_JAR_CACHE_ROOT_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.fallback.root.dir";

public static final String JAR_CACHE_SUFFIX = GOBBLIN_YARN_PREFIX + "jar.cache.suffix";

public static final String YARN_APPLICATION_LIB_JAR_LIST = GOBBLIN_YARN_PREFIX + "lib.jar.list";

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.yarn;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.typesafe.config.Config;

import org.apache.gobblin.util.ConfigUtils;


/**
* Utility class for resolving the jar cache directory path by validating filesystem on path existence and applying fallback logic.
*
* <p>Resolution logic:</p>
* <ol>
* <li>If JAR_CACHE_DIR is explicitly configured, uses it as-is (for backward compatibility)</li>
* <li>Otherwise, validates JAR_CACHE_ROOT_DIR exists on filesystem</li>
* <li>If not found, tries FALLBACK_JAR_CACHE_ROOT_DIR</li>
* <li>Combines validated root with JAR_CACHE_SUFFIX (or default suffix) to form final path</li>
* <li>If no valid root found, throws IOException</li>
* </ol>
*/
public class JarCachePathResolver {
private static final Logger LOGGER = LoggerFactory.getLogger(JarCachePathResolver.class);
// Note: Trailing slash will be normalized away by Hadoop Path
private static final String DEFAULT_JAR_CACHE_SUFFIX = ".gobblinCache/gobblin-temporal";

// Private constructor to prevent instantiation
private JarCachePathResolver() {
}

/**
* Resolves the jar cache directory path, applying validation and fallback logic.
*
* @param config the configuration
* @param fs the filesystem to use for validation
* @return the resolved jar cache directory path
* @throws IOException if filesystem operations fail or no valid root directory is found
*/
public static Path resolveJarCachePath(Config config, FileSystem fs) throws IOException {
// If JAR_CACHE_DIR is explicitly set, use it as-is
if (config.hasPath(GobblinYarnConfigurationKeys.JAR_CACHE_DIR)) {
String explicitCacheDir = config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR);
LOGGER.info("Using explicitly configured JAR_CACHE_DIR: {}", explicitCacheDir);
return new Path(explicitCacheDir);
}

// Get suffix from config, or use default if not configured or empty
String suffix = ConfigUtils.getString(config, GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, "");
if (suffix == null || suffix.trim().isEmpty()) {
LOGGER.info("JAR_CACHE_SUFFIX not configured or empty, using default: {}", DEFAULT_JAR_CACHE_SUFFIX);
suffix = DEFAULT_JAR_CACHE_SUFFIX;
}

// Try primary root directory
if (config.hasPath(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR)) {
String rootDir = config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR);
Path resolvedPath = validateAndComputePath(fs, rootDir, suffix, GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR);
if (resolvedPath != null) {
return resolvedPath;
}
}

// Try fallback root directory
if (config.hasPath(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR)) {
String fallbackRootDir = config.getString(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR);
Path resolvedPath = validateAndComputePath(fs, fallbackRootDir, suffix, GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR);
if (resolvedPath != null) {
return resolvedPath;
}
}

// No valid root directory found - fail
throw new IOException("No valid jar cache root directory found. Please configure "
+ GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR + " or "
+ GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR
+ " with a valid directory path, or explicitly set "
+ GobblinYarnConfigurationKeys.JAR_CACHE_DIR);
}

/**
* Validates if the root directory exists and computes the full path with suffix.
*
* @param fs the filesystem to check
* @param rootDir the root directory to validate
* @param suffix the suffix to append
* @param configName the config name for logging
* @return the computed path if valid, null otherwise
* @throws IOException if filesystem operations fail
*/
@VisibleForTesting
static Path validateAndComputePath(FileSystem fs, String rootDir, String suffix, String configName) throws IOException {
Path rootPath = new Path(rootDir);
if (fs.exists(rootPath)) {
// Strip leading '/' from suffix to ensure proper concatenation
// Otherwise, Hadoop Path treats it as absolute path and ignores the parent
String normalizedSuffix = suffix.startsWith("/") ? suffix.substring(1) : suffix;
Path fullPath = new Path(rootPath, normalizedSuffix);
LOGGER.info("{} exists: {}, resolved JAR_CACHE_DIR to: {}", configName, rootDir, fullPath);
return fullPath;
}
LOGGER.warn("Configured {} does not exist: {}", configName, rootDir);
return null;
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -209,15 +209,19 @@ public static void setYarnClassPath(Config config, Configuration yarnConfigurati
/**
* Calculate the path of a jar cache on HDFS, which is retained on a monthly basis.
* Should be used in conjunction with {@link #retainKLatestJarCachePaths(Path, int, FileSystem)}. to clean up the cache on a periodic basis
* @param config
* @return
* @throws IOException
* @param config the configuration
* @param fs the filesystem to use for validation
* @return the monthly jar cache path
* @throws IOException if filesystem operations fail
*/
public static Path calculatePerMonthJarCachePath(Config config) throws IOException {
Path jarsCacheDirMonthly = new Path(config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_DIR));
String monthSuffix = new SimpleDateFormat("yyyy-MM").format(config.getLong(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY));
return new Path(jarsCacheDirMonthly, monthSuffix);

public static Path calculatePerMonthJarCachePath(Config config, FileSystem fs) throws IOException {
// Use JarCachePathResolver to resolve the base jar cache directory
Path baseCacheDir = JarCachePathResolver.resolveJarCachePath(config, fs);

// Append monthly suffix
String monthSuffix = new SimpleDateFormat("yyyy-MM").format(
config.getLong(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY));
return new Path(baseCacheDir, monthSuffix);
}

/**
Expand Down
Loading
Loading