diff --git a/Makefile b/Makefile index 53640acd3d..5d5199170f 100644 --- a/Makefile +++ b/Makefile @@ -18,6 +18,7 @@ COMPOSE_DIR := local-test KIND_CLUSTER := fusion-cluster +AMORO_LOG_DIR ?= $(CURDIR)/docker/kind/amoro-logs DIST_TAR := $(CURDIR)/dist/target/apache-amoro-0.9-SNAPSHOT-bin.tar.gz RUNTIME_HOME := $(CURDIR)/dist/target/amoro-0.9-SNAPSHOT BIN_HOME := $(CURDIR)/dist/src/main/amoro-bin @@ -118,7 +119,8 @@ stop-deps: start-fusion-docker: @echo "Starting Fusion (Kind cluster + all services)..." - @docker compose -f $(COMPOSE_DIR)/docker-compose.yml --profile prod up -d + @mkdir -p "$(AMORO_LOG_DIR)" + @AMORO_LOG_DIR="$(AMORO_LOG_DIR)" docker compose -f $(COMPOSE_DIR)/docker-compose.yml --profile prod up -d @kind export kubeconfig --name $(KIND_CLUSTER) 2>/dev/null clean-fusion-docker: diff --git a/amoro-ams/pom.xml b/amoro-ams/pom.xml index 5c02eb95fc..590732f6c5 100644 --- a/amoro-ams/pom.xml +++ b/amoro-ams/pom.xml @@ -455,6 +455,11 @@ ${pagehelper.version} + + org.apache.commons + commons-compress + + org.apache.iceberg iceberg-data diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java index e465e65e76..e7eb3c91c5 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java @@ -14,6 +14,8 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * Modified by Datazip Inc. in 2026 */ package org.apache.amoro.server.dashboard; @@ -43,6 +45,7 @@ import org.apache.amoro.server.dashboard.controller.ApiTokenController; import org.apache.amoro.server.dashboard.controller.CatalogController; import org.apache.amoro.server.dashboard.controller.HealthCheckController; +import org.apache.amoro.server.dashboard.controller.LogController; import org.apache.amoro.server.dashboard.controller.LoginController; import org.apache.amoro.server.dashboard.controller.OptimizerController; import org.apache.amoro.server.dashboard.controller.OptimizerGroupController; @@ -90,6 +93,7 @@ public class DashboardServer { private final VersionController versionController; private final OverviewController overviewController; private final ApiTokenController apiTokenController; + private final LogController logController; private final PasswdAuthenticationProvider basicAuthProvider; private final TokenAuthenticationProvider jwtAuthProvider; @@ -120,6 +124,7 @@ public DashboardServer( this.overviewController = new OverviewController(manager); APITokenManager apiTokenManager = new APITokenManager(); this.apiTokenController = new ApiTokenController(apiTokenManager); + this.logController = new LogController(); String authType = serviceConfig.get(AmoroManagementConf.HTTP_SERVER_REST_AUTH_TYPE); this.basicAuthProvider = @@ -138,6 +143,7 @@ public DashboardServer( } private volatile String indexHtml = null; + // read index.html content public String getIndexFileContent() { if (indexHtml == null) { @@ -396,6 +402,15 @@ private EndpointGroup apiGroup() { post("/calculate/signature", apiTokenController::calculateSignature); post("/calculate/encryptString", apiTokenController::getEncryptStringFromQueryParam); }); + + // logs api + path( + "/logs", + () -> { + get("/process/{processId}", logController::getProcessLogs); + get("/process/{processId}/download", logController::downloadProcessLogs); + get("/process/{processId}/file/{fileId}", logController::downloadLogFile); + }); }; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/LogController.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/LogController.java new file mode 100644 index 0000000000..50836d61b6 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/LogController.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified by Datazip Inc. in 2026 + */ + +package org.apache.amoro.server.dashboard.controller; + +import io.javalin.http.Context; +import io.javalin.http.HttpCode; +import org.apache.amoro.server.dashboard.response.OkResponse; +import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedOutputStream; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class LogController { + private static final Logger LOG = LoggerFactory.getLogger(LogController.class); + private static final String LOG_BASE_DIR; + + static { + String envLogDir = System.getenv("LOG_DIR"); + LOG_BASE_DIR = + (envLogDir != null && !envLogDir.isEmpty()) ? envLogDir : "/mnt/amoro-logs/compaction"; + } + + private static final String DRIVER_LOG_FILE = "driver.log"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + /** + * Parses a log file into a list of JSON objects. The file is expected to be NDJSON + * (newline-delimited JSON), but may also contain raw Java stack trace lines that appear after an + * ERROR log entry when the log4j2 PatternLayout did not include the throwable inside the JSON + * pattern. Such raw lines are collected and attached as a {@code stackTrace} field on the + * preceding log entry so the API can deliver them to the UI. + * + * @param logPath Path to the log file + * @return List of parsed log entry objects (as Maps) + */ + private List> parseNDJSONLogFile(Path logPath) { + List> logEntries = new ArrayList<>(); + try (BufferedReader reader = Files.newBufferedReader(logPath)) { + String line; + int lineNumber = 0; + Map lastJsonEntry = null; + StringBuilder stackTraceBuffer = new StringBuilder(); + + while ((line = reader.readLine()) != null) { + lineNumber++; + String trimmed = line.trim(); + if (trimmed.isEmpty()) { + continue; + } + + if (trimmed.startsWith("{")) { + // Flush any accumulated raw stack trace lines to the preceding log entry. + // Only attach if the entry does not already have a non-empty stackTrace field + // (i.e. it was written by an older log4j2 pattern that did not embed the + // throwable). + if (stackTraceBuffer.length() > 0 && lastJsonEntry != null) { + Object existing = lastJsonEntry.get("stackTrace"); + if (existing == null || existing.toString().isEmpty()) { + lastJsonEntry.put("stackTrace", stackTraceBuffer.toString().stripTrailing()); + } + stackTraceBuffer.setLength(0); + } + + try { + JsonNode jsonNode = OBJECT_MAPPER.readTree(trimmed); + Map logEntry = OBJECT_MAPPER.convertValue(jsonNode, Map.class); + logEntries.add(logEntry); + lastJsonEntry = logEntry; + } catch (Exception e) { + LOG.warn( + "Failed to parse JSON at line {} in {}: {}", + lineNumber, + logPath.getFileName(), + e.getMessage()); + stackTraceBuffer.append(line).append("\n"); + } + } else { + // Non-JSON line: part of a Java stack trace that was written outside the JSON + // object. + stackTraceBuffer.append(line).append("\n"); + } + } + + // Flush any stack trace that trails the last JSON entry (no subsequent JSON + // line). + if (stackTraceBuffer.length() > 0 && lastJsonEntry != null) { + Object existing = lastJsonEntry.get("stackTrace"); + if (existing == null || existing.toString().isEmpty()) { + lastJsonEntry.put("stackTrace", stackTraceBuffer.toString().stripTrailing()); + } + } + } catch (IOException e) { + LOG.error("Failed to read log file: {}", logPath, e); + } + return logEntries; + } + + public void getProcessLogs(Context ctx) { + String processId = ctx.pathParam("processId"); + Path processDir = Paths.get(LOG_BASE_DIR, processId); + + Map response = new HashMap<>(); + response.put("processId", processId); + + if (!Files.exists(processDir) || !Files.isDirectory(processDir)) { + response.put("exists", false); + response.put("message", "Process log directory not found"); + response.put("driverLog", null); + response.put("taskLogs", new ArrayList<>()); + ctx.json(OkResponse.of(response)); + return; + } + + response.put("exists", true); + + // reads driver log + Path driverLogPath = processDir.resolve(DRIVER_LOG_FILE); + Map driverLog = new HashMap<>(); + if (Files.exists(driverLogPath)) { + try { + driverLog.put("exists", true); + driverLog.put("content", parseNDJSONLogFile(driverLogPath)); + } catch (Exception e) { + LOG.error("Failed to read driver log: {}", driverLogPath, e); + driverLog.put("exists", true); + driverLog.put("error", "Failed to read: " + e.getMessage()); + } + } else { + driverLog.put("exists", false); + } + response.put("driverLog", driverLog); + + // reads all sub-task logs + List> taskLogs = new ArrayList<>(); + try (DirectoryStream stream = Files.newDirectoryStream(processDir, "*.log")) { + for (Path taskLogPath : stream) { + String fileName = taskLogPath.getFileName().toString(); + if (fileName.equals(DRIVER_LOG_FILE)) { + continue; // skip driver log + } + + String taskId = fileName.replace(".log", ""); + Map taskLog = new HashMap<>(); + taskLog.put("taskId", taskId); + + try { + taskLog.put("exists", true); + taskLog.put("content", parseNDJSONLogFile(taskLogPath)); + taskLogs.add(taskLog); + } catch (Exception e) { + LOG.error("Failed to read task log: {}", taskLogPath, e); + taskLog.put("exists", true); + taskLog.put("error", "Failed to read: " + e.getMessage()); + taskLogs.add(taskLog); + } + } + } catch (IOException e) { + LOG.error("Failed to list task logs in directory: {}", processDir, e); + } + + response.put("taskLogs", taskLogs); + ctx.json(OkResponse.of(response)); + } + + public void downloadLogFile(Context ctx) { + String processId = ctx.pathParam("processId"); + String fileId = ctx.pathParam("fileId"); + // fileId is either "driver" or a taskId like "1", "2", etc. + String fileName = fileId + ".log"; + Path logFile = Paths.get(LOG_BASE_DIR, processId, fileName); + + if (!Files.exists(logFile) || !Files.isRegularFile(logFile)) { + ctx.status(HttpCode.NOT_FOUND).result("Log file not found: " + processId + "/" + fileName); + return; + } + + ctx.header("Content-Type", "text/plain; charset=UTF-8"); + ctx.header("Content-Disposition", "attachment; filename=\"" + fileName + "\""); + + try { + ctx.result(Files.newInputStream(logFile)); + } catch (IOException e) { + LOG.error("Failed to read log file: {}", logFile, e); + ctx.status(HttpCode.INTERNAL_SERVER_ERROR).result("Failed to read log file"); + } + } + + public void downloadProcessLogs(Context ctx) { + String processId = ctx.pathParam("processId"); + Path processDir = Paths.get(LOG_BASE_DIR, processId); + + if (!Files.exists(processDir) || !Files.isDirectory(processDir)) { + ctx.status(HttpCode.NOT_FOUND).result("Process log directory not found: " + processId); + return; + } + + String tarFileName = "compaction-logs-" + processId + ".tar"; + ctx.header("Content-Type", "application/x-tar"); + ctx.header("Content-Disposition", "attachment; filename=\"" + tarFileName + "\""); + + try { + OutputStream out = ctx.res.getOutputStream(); + try (TarArchiveOutputStream tar = new TarArchiveOutputStream(new BufferedOutputStream(out))) { + tar.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX); + + try (DirectoryStream stream = Files.newDirectoryStream(processDir, "*.log")) { + for (Path logFile : stream) { + long size = Files.size(logFile); + TarArchiveEntry entry = new TarArchiveEntry(logFile.getFileName().toString()); + entry.setSize(size); + tar.putArchiveEntry(entry); + Files.copy(logFile, tar); + tar.closeArchiveEntry(); + } + } + + tar.finish(); + } + } catch (IOException e) { + LOG.error("Failed to create tar archive for process {}", processId, e); + } + } +} diff --git a/amoro-common/pom.xml b/amoro-common/pom.xml index fd81dd1bde..70f85743dc 100644 --- a/amoro-common/pom.xml +++ b/amoro-common/pom.xml @@ -152,6 +152,16 @@ cron-utils + + org.apache.logging.log4j + log4j-api + + + + org.apache.logging.log4j + log4j-core + + org.apache.curator diff --git a/amoro-common/src/main/java/org/apache/amoro/log/OptimizingTaskLogContext.java b/amoro-common/src/main/java/org/apache/amoro/log/OptimizingTaskLogContext.java new file mode 100644 index 0000000000..8a94c74e71 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/log/OptimizingTaskLogContext.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified by Datazip Inc. in 2026 + */ + +package org.apache.amoro.log; + +import org.apache.logging.log4j.ThreadContext; + +// manages per-thread logging context using Log4j2 thread context: setting "processId" and "taskId" +// isContextSet: prevent overwriting an already-initialized MDC context +public class OptimizingTaskLogContext { + + public static final String PROCESS_ID_KEY = "processId"; + public static final String TASK_ID_KEY = "taskId"; + public static final String LOG_FILE_PATH_KEY = "logFilePath"; + + // per-thread flag indicating whether a caller has already set up logging context. + private static final ThreadLocal CONTEXT_SET = ThreadLocal.withInitial(() -> false); + + public static void setContext(long processId, int taskId) { + CONTEXT_SET.set(true); + ThreadContext.put(PROCESS_ID_KEY, String.valueOf(processId)); + ThreadContext.put(TASK_ID_KEY, String.valueOf(taskId)); + } + + public static void clearContext() { + CONTEXT_SET.remove(); + ThreadContext.remove(PROCESS_ID_KEY); + ThreadContext.remove(TASK_ID_KEY); + ThreadContext.remove(LOG_FILE_PATH_KEY); + } + + public static boolean isContextSet() { + return CONTEXT_SET.get(); + } +} diff --git a/amoro-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizerExecutor.java b/amoro-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizerExecutor.java index 91eb9a9e56..84098e2da6 100644 --- a/amoro-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizerExecutor.java +++ b/amoro-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizerExecutor.java @@ -14,6 +14,8 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * Modified by Datazip Inc. in 2026 */ package org.apache.amoro.optimizer.spark; @@ -30,6 +32,7 @@ import org.apache.spark.api.java.JavaSparkContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import java.util.List; @@ -53,6 +56,16 @@ protected OptimizingTaskResult executeTask(OptimizingTask task) { OptimizingTaskResult result; String threadName = Thread.currentThread().getName(); long startTime = System.currentTimeMillis(); + + long processId = task.getTaskId().getProcessId(); + int taskId = task.getTaskId().getTaskId(); + String driverFilePath = processId + "/driver"; + + // Set MDC context for Log4j2 routing + // Driver logs go to: //driver.log + MDC.put("processId", String.valueOf(processId)); + MDC.put("logFilePath", driverFilePath); + try { ImmutableList of = ImmutableList.of(task); jsc.setJobDescription(jobDescription(task)); @@ -76,6 +89,9 @@ protected OptimizingTaskResult executeTask(OptimizingTask task) { result = new OptimizingTaskResult(task.getTaskId(), threadId); result.setErrorMessage(ExceptionUtil.getErrorMessage(r, ERROR_MESSAGE_MAX_LENGTH)); return result; + } finally { + MDC.remove("processId"); + MDC.remove("logFilePath"); } } diff --git a/amoro-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizingTaskFunction.java b/amoro-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizingTaskFunction.java index 5bb88648f7..17307c14d6 100644 --- a/amoro-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizingTaskFunction.java +++ b/amoro-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizingTaskFunction.java @@ -14,17 +14,21 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * Modified by Datazip Inc. in 2026 */ package org.apache.amoro.optimizer.spark; import org.apache.amoro.api.OptimizingTask; import org.apache.amoro.api.OptimizingTaskResult; +import org.apache.amoro.log.OptimizingTaskLogContext; import org.apache.amoro.optimizer.common.OptimizerConfig; import org.apache.amoro.optimizer.common.OptimizerExecutor; import org.apache.spark.api.java.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; /** * The {@code SparkOptimizingTaskExecuteFunction} defines the whole processing logic that how to @@ -42,6 +46,32 @@ public SparkOptimizingTaskFunction(OptimizerConfig config, int threadId) { @Override public OptimizingTaskResult call(OptimizingTask task) { - return OptimizerExecutor.executeTask(config, threadId, task, LOG); + // Set MDC context on Spark executor for Log4j2 routing + // Executor logs go to: //.log + long processId = task.getTaskId().getProcessId(); + int taskId = task.getTaskId().getTaskId(); + String logFilePath = processId + "/" + taskId; + + // Set OptimizingTaskLogContext FIRST so AbstractRewriteFilesExecutor.execute() + // sees isContextSet()==true and does NOT override our MDC with its own format. + OptimizingTaskLogContext.setContext(processId, taskId); + + // Override logFilePath to our desired format + MDC.put("processId", String.valueOf(processId)); + MDC.put("taskId", String.valueOf(taskId)); + MDC.put("logFilePath", logFilePath); + + try { + OptimizingTaskResult result = OptimizerExecutor.executeTask(config, threadId, task, LOG); + return result; + } catch (Exception e) { + LOG.error("Task execution failed on executor", e); + throw e; + } finally { + OptimizingTaskLogContext.clearContext(); + MDC.remove("processId"); + MDC.remove("taskId"); + MDC.remove("logFilePath"); + } } } diff --git a/dist/src/main/amoro-bin/conf/optimize/log4j2-routing.xml b/dist/src/main/amoro-bin/conf/optimize/log4j2-routing.xml new file mode 100644 index 0000000000..86953231ea --- /dev/null +++ b/dist/src/main/amoro-bin/conf/optimize/log4j2-routing.xml @@ -0,0 +1,142 @@ + + + + + + ${env:LOG_DIR:-/mnt/amoro-logs/compaction} + ${env:CONSOLE_LOG_LEVEL:-info} + %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p [%t] [%c{1}] [P:%X{processId}|T:%X{taskId}|Table:%X{tableName}|Exec:%X{executorId}] - %m%n + {"level":"%p","time":"%d{yyyy-MM-dd'T'HH:mm:ss.SSS'Z'}{UTC}","processId":"%X{processId}","taskId":"%X{taskId}","logger":"%c{1}","message":"%enc{%m}{JSON}","stackTrace":"%enc{%throwable{full}}{JSON}"}%n + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/docker/amoro/entrypoint.sh b/docker/amoro/entrypoint.sh index 57d8673733..59b9bc6650 100644 --- a/docker/amoro/entrypoint.sh +++ b/docker/amoro/entrypoint.sh @@ -16,6 +16,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# +# Modified by Datazip Inc. in 2026 ############################################################################### args=("$@") @@ -44,6 +46,11 @@ configure_jvm_options() { configure_jvm_options +# Create compaction log directory for Log4j2 routing (mounted as shared volume). +# LOG_DIR is set by docker-compose (default: /mnt/amoro-logs/compaction). +_COMPACTION_LOG_DIR="${LOG_DIR:-/mnt/amoro-logs/compaction}" +mkdir -p "${_COMPACTION_LOG_DIR}" +chmod 777 "${_COMPACTION_LOG_DIR}" if [ $1 == "help" ]; then printf "Usage: $(basename $0) [ams|optimizer] [args]\n" @@ -60,4 +67,4 @@ elif [ "$1" == "optimizer" ]; then fi # Running command in pass-through mode -exec "${args[@]}" \ No newline at end of file +exec "${args[@]}" diff --git a/docker/optimizer-spark/Dockerfile b/docker/optimizer-spark/Dockerfile index 5365dc48d9..708e2cb515 100644 --- a/docker/optimizer-spark/Dockerfile +++ b/docker/optimizer-spark/Dockerfile @@ -15,6 +15,8 @@ # limitations under the License. # # Modified by Datazip Inc. in 2026 +# +# Modified by Datazip Inc. in 2026 ARG SPARK_VERSION=3.5.8 # Use the explicit java17 tag to guarantee Java 17 runtime. @@ -43,4 +45,10 @@ RUN cd $SPARK_HOME/jars \ && chown spark:spark *.jar \ && mkdir -p $SPARK_HOME/usrlib +# Remove Spark's default log4j2 config so our XML config takes priority +RUN rm -f $SPARK_HOME/conf/log4j2.properties $SPARK_HOME/conf/log4j2.properties.template + +# Install our log4j2.xml with Routing appender for per-process/task log files +COPY docker/optimizer-spark/log4j2.xml $SPARK_HOME/conf/log4j2.xml + COPY $OPTIMIZER_JOB $SPARK_HOME/usrlib/optimizer-job.jar diff --git a/docker/optimizer-spark/log4j2.xml b/docker/optimizer-spark/log4j2.xml new file mode 100644 index 0000000000..1c799b1ca3 --- /dev/null +++ b/docker/optimizer-spark/log4j2.xml @@ -0,0 +1,100 @@ + + + + + + ${env:LOG_DIR:-/mnt/amoro-logs/compaction} + %d{yyyy-MM-dd HH:mm:ss.SSS'Z'}{UTC} %-5p [%t] [%c{1}] [P:%X{processId}|T:%X{taskId}] - %m%n + {"level":"%p","time":"%d{yyyy-MM-dd'T'HH:mm:ss.SSS'Z'}{UTC}","processId":"%X{processId}","taskId":"%X{taskId}","logger":"%c{1}","message":"%enc{%m}{JSON}","stackTrace":"%enc{%throwable{full}}{JSON}"}%n + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/local-test/config.yaml b/local-test/config.yaml index f80598a9d5..0ac227c4e6 100644 --- a/local-test/config.yaml +++ b/local-test/config.yaml @@ -180,4 +180,15 @@ containers: spark-conf.spark.kubernetes.driver.secretKeyRef.AWS_ACCESS_KEY_ID: "fusion-s3-credentials:access-key-id" spark-conf.spark.kubernetes.driver.secretKeyRef.AWS_SECRET_ACCESS_KEY: "fusion-s3-credentials:secret-access-key" spark-conf.spark.kubernetes.executor.secretKeyRef.AWS_ACCESS_KEY_ID: "fusion-s3-credentials:access-key-id" - spark-conf.spark.kubernetes.executor.secretKeyRef.AWS_SECRET_ACCESS_KEY: "fusion-s3-credentials:secret-access-key" \ No newline at end of file + spark-conf.spark.kubernetes.executor.secretKeyRef.AWS_SECRET_ACCESS_KEY: "fusion-s3-credentials:secret-access-key" + # Mount shared log volume so driver/executor pods write logs to the same path + # that LogController reads from (via the Kind node hostPath extraMount). + spark-conf.spark.kubernetes.driver.volumes.hostPath.amoro-logs.mount.path: "/mnt/amoro-logs" + spark-conf.spark.kubernetes.driver.volumes.hostPath.amoro-logs.mount.readOnly: "false" + spark-conf.spark.kubernetes.driver.volumes.hostPath.amoro-logs.options.path: "/mnt/amoro-logs" + spark-conf.spark.kubernetes.executor.volumes.hostPath.amoro-logs.mount.path: "/mnt/amoro-logs" + spark-conf.spark.kubernetes.executor.volumes.hostPath.amoro-logs.mount.readOnly: "false" + spark-conf.spark.kubernetes.executor.volumes.hostPath.amoro-logs.options.path: "/mnt/amoro-logs" + # Force Log4j2 to use the routing config (belt-and-suspenders with the Dockerfile COPY) + spark-conf.spark.driver.extraJavaOptions: "-Dlog4j2.configurationFile=file:///opt/spark/conf/log4j2.xml" + spark-conf.spark.executor.extraJavaOptions: "-Dlog4j2.configurationFile=file:///opt/spark/conf/log4j2.xml" diff --git a/local-test/docker-compose.yml b/local-test/docker-compose.yml index 79089ff72b..4136f25019 100644 --- a/local-test/docker-compose.yml +++ b/local-test/docker-compose.yml @@ -42,10 +42,12 @@ services: KUBECONFIG: /root/.kube/config AWS_ACCESS_KEY_ID: admin AWS_SECRET_ACCESS_KEY: password + LOG_DIR: /mnt/amoro-logs/compaction volumes: - ./config.yaml:/usr/local/amoro/conf/config.yaml:ro - kind-kubeconfig:/root/.kube:ro - spark-home:/opt/spark:ro + - ${AMORO_LOG_DIR:-/opt/amoro-logs}:/mnt/amoro-logs command: ["/entrypoint.sh", "ams"] tty: true stdin_open: true @@ -243,6 +245,8 @@ services: - prod networks: iceberg-net: + environment: + AMORO_LOG_DIR: ${AMORO_LOG_DIR:-/opt/amoro-logs} volumes: - /var/run/docker.sock:/var/run/docker.sock - kind-kubeconfig:/output @@ -254,6 +258,12 @@ services: - | set -e + LOG_DIR="$${AMORO_LOG_DIR:-/opt/amoro-logs}" + echo "==> Log directory: $$LOG_DIR" + + echo "==> Generating kind cluster config..." + sed "s|AMORO_LOG_PLACEHOLDER|$$LOG_DIR|g" /kind-config.yaml > /tmp/kind-config-gen.yaml + echo "==> Installing kind..." apk add --no-cache curl >/dev/null 2>&1 curl -sLo /usr/local/bin/kind https://kind.sigs.k8s.io/dl/v0.23.0/kind-linux-amd64 @@ -263,7 +273,7 @@ services: if kind get clusters 2>/dev/null | grep -q '^fusion-cluster$$'; then echo " Cluster already exists, skipping creation." else - kind create cluster --config /kind-config.yaml --wait 120s + kind create cluster --config /tmp/kind-config-gen.yaml --wait 120s fi echo "==> Connecting Kind nodes to iceberg-net..." diff --git a/local-test/kind-config.yaml b/local-test/kind-config.yaml index 24e9649544..5f2a043fdb 100644 --- a/local-test/kind-config.yaml +++ b/local-test/kind-config.yaml @@ -26,6 +26,9 @@ networking: nodes: - role: control-plane image: kindest/node:v1.28.0 + extraMounts: + - hostPath: AMORO_LOG_PLACEHOLDER + containerPath: /mnt/amoro-logs kubeadmConfigPatches: - | kind: InitConfiguration @@ -42,6 +45,9 @@ nodes: - "fusion-cluster-control-plane" - role: worker image: kindest/node:v1.28.0 + extraMounts: + - hostPath: AMORO_LOG_PLACEHOLDER + containerPath: /mnt/amoro-logs kubeadmConfigPatches: - | kind: JoinConfiguration @@ -50,6 +56,9 @@ nodes: max-pods: "110" - role: worker image: kindest/node:v1.28.0 + extraMounts: + - hostPath: AMORO_LOG_PLACEHOLDER + containerPath: /mnt/amoro-logs kubeadmConfigPatches: - | kind: JoinConfiguration