@@ -190,6 +190,13 @@ public void deletePartitions(String instantTime, List<MetadataPartitionType> par
@Override
protected void preWrite(String instantTime) {
metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
// Every time the `HFile` class is loaded by a classloader, it creates a static `MetricsIO` instance for region server IO and
// registers a metric 'RegionServer,sub=IO' in `DefaultMetricsSystem#newSourceName`. There is no configuration to disable this.
// When the Flink cluster is deployed in session mode and the hudi flink bundle jar is submitted with a SQL or DataStream job,
// multiple user-code context classloaders may register the same region server metric, which leads to a validation exception in
// `DefaultMetricsSystem#newSourceName`. So here we set the init mode for Hadoop metrics to `STANDBY` to disable metric
// registration for the HFile writer.
System.setProperty("hadoop.metrics.init.mode", "STANDBY");
Contributor:

Does the issue exist on master? I guess not, because the HBase jar has been removed? Also, there is no need to set it up on every MDT commit; setting it up once in the coordinator's #initMetadataTable should be enough?

Contributor Author:

Yes, the issue only exists before 1.1.

We could put the setup in #initMetadataTable, but my concern is that hadoop.metrics.init.mode is not only for the HBase metric: it is a Hadoop configuration, and I'm not sure whether it would affect the initialization of other components' metrics, so I'm inclined to delay setting this configuration until HFile writer initialization is actually required.
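
For reference, a minimal sketch of the one-time placement raised in review; this is hypothetical and not what the PR does (the PR sets the property in preWrite, right before an HFile writer may be created). The method and call names are taken from the coordinator diff below; only the property line is moved.

  private void initMetadataTable(HoodieFlinkWriteClient<?> writeClient) {
    // Per the PR comment above, the property must be in place before the HFile
    // class is first loaded, since class loading is what registers the MetricsIO source.
    System.setProperty("hadoop.metrics.init.mode", "STANDBY");
    this.executor.executeSync(writeClient::initMetadataTable, "Init metadata table client");
  }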

}

@Override
@@ -39,6 +39,7 @@
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.sink.meta.CkpMetadataFactory;
import org.apache.hudi.sink.utils.ExplicitClassloaderThreadFactory;
import org.apache.hudi.sink.utils.HiveSyncContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.storage.StorageConfiguration;
@@ -70,6 +71,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN_OR_EQUALS;
@@ -194,12 +196,13 @@ public void start() throws Exception {
// the write client must create after the table creation
this.writeClient = FlinkWriteClients.createWriteClient(conf);
this.ckpMetadata = initCkpMetadata(writeClient.getConfig(), this.conf);
initMetadataTable(this.writeClient);
this.tableState = TableState.create(conf);
// start the executor
this.executor = NonThrownExecutor.builder(LOG)
.exceptionHook((errMsg, t) -> this.context.failJob(new HoodieException(errMsg, t)))
.threadFactory(getThreadFactory("meta-event-handle"))
.waitForTasksFinish(true).build();
initMetadataTable(this.writeClient);
// start the executor if required
if (tableState.syncHive) {
initHiveSync();
@@ -210,6 +213,10 @@
}
}

private ThreadFactory getThreadFactory(String threadName) {
return new ExplicitClassloaderThreadFactory(threadName, context.getUserCodeClassloader());
}

@Override
public void close() throws Exception {
// teardown the resource
@@ -254,9 +261,6 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> r
public void notifyCheckpointComplete(long checkpointId) {
executor.execute(
() -> {
// The executor thread inherits the classloader of the #notifyCheckpointComplete
// caller, which is a AppClassLoader.
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
// for streaming mode, commits the ever received events anyway,
// the stream write task snapshot and flush the data buffer synchronously in sequence,
// so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
@@ -322,7 +326,10 @@ public void subtaskReady(int i, SubtaskGateway subtaskGateway) {
// -------------------------------------------------------------------------

private void initHiveSync() {
this.hiveSyncExecutor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
this.hiveSyncExecutor = NonThrownExecutor.builder(LOG)
.threadFactory(getThreadFactory("hive-sync"))
.waitForTasksFinish(true)
.build();
this.hiveSyncContext = HiveSyncContext.create(conf, this.storageConf);
}

@@ -348,8 +355,8 @@ public void doSyncHive() {
}
}

private static void initMetadataTable(HoodieFlinkWriteClient<?> writeClient) {
writeClient.initMetadataTable();
private void initMetadataTable(HoodieFlinkWriteClient<?> writeClient) {
this.executor.executeSync(writeClient::initMetadataTable, "Init metadata table client");
}

private static CkpMetadata initCkpMetadata(HoodieWriteConfig writeConfig, Configuration conf) throws IOException {
@@ -467,9 +474,6 @@ private void handleEndInputEvent(WriteMetadataEvent event) {
// start to commit the instant.
boolean committed = commitInstant(this.instant);
if (committed) {
// The executor thread inherits the classloader of the #handleEventFromOperator
// caller, which is a AppClassLoader.
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
// sync Hive synchronously if it is enabled in batch mode.
syncHive();
// schedules the compaction or clustering if it is enabled in batch execution mode
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.sink.utils;

import org.apache.flink.util.FatalExitExceptionHandler;

import java.util.concurrent.ThreadFactory;

/**
* A thread factory that creates a single thread with an explicitly specified context classloader.
*/
public class ExplicitClassloaderThreadFactory implements ThreadFactory {
private final String threadName;
private final ClassLoader cl;
private final Thread.UncaughtExceptionHandler errorHandler;

private Thread t;

public ExplicitClassloaderThreadFactory(
final String threadName,
final ClassLoader contextClassLoader) {
this(threadName, contextClassLoader, FatalExitExceptionHandler.INSTANCE);
}

public ExplicitClassloaderThreadFactory(
final String threadName,
final ClassLoader contextClassLoader,
final Thread.UncaughtExceptionHandler errorHandler) {
this.threadName = threadName;
this.cl = contextClassLoader;
this.errorHandler = errorHandler;
}

@Override
public synchronized Thread newThread(Runnable r) {
if (t != null) {
throw new Error(
"This indicates that a fatal error has happened and caused the "
+ "coordinator executor thread to exit. Check the earlier logs"
+ "to see the root cause of the problem.");
}
t = new Thread(r, threadName);
t.setContextClassLoader(cl);
t.setUncaughtExceptionHandler(errorHandler);
return t;
}
}
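
How the new factory is used in this PR, condensed from the coordinator diff above (all names appear in the diff; nothing here is new API): the executor's single worker thread is created with the user-code classloader, so tasks no longer need to reset the context classloader themselves.

  ThreadFactory threadFactory =
      new ExplicitClassloaderThreadFactory("meta-event-handle", context.getUserCodeClassloader());
  NonThrownExecutor executor = NonThrownExecutor.builder(LOG)
      .threadFactory(threadFactory)
      .exceptionHook((errMsg, t) -> context.failJob(new HoodieException(errMsg, t)))
      .waitForTasksFinish(true)
      .build();

Note that newThread is deliberately single-use: a second call throws an Error, matching the single-thread executor the factory backs.
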
@@ -29,6 +29,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

@@ -58,8 +59,8 @@ public class NonThrownExecutor implements AutoCloseable {
private final boolean waitForTasksFinish;

@VisibleForTesting
protected NonThrownExecutor(Logger logger, @Nullable ExceptionHook exceptionHook, boolean waitForTasksFinish) {
this.executor = Executors.newSingleThreadExecutor();
protected NonThrownExecutor(Logger logger, @Nullable ThreadFactory threadFactory, @Nullable ExceptionHook exceptionHook, boolean waitForTasksFinish) {
this.executor = threadFactory == null ? Executors.newSingleThreadExecutor() : Executors.newSingleThreadExecutor(threadFactory);
this.logger = logger;
this.exceptionHook = exceptionHook;
this.waitForTasksFinish = waitForTasksFinish;
@@ -168,6 +169,7 @@ public interface ExceptionHook {
*/
public static class Builder {
private final Logger logger;
private ThreadFactory threadFactory;
private ExceptionHook exceptionHook;
private boolean waitForTasksFinish = false;

@@ -176,7 +178,12 @@ private Builder(Logger logger) {
}

public NonThrownExecutor build() {
return new NonThrownExecutor(logger, exceptionHook, waitForTasksFinish);
return new NonThrownExecutor(logger, threadFactory, exceptionHook, waitForTasksFinish);
}

public Builder threadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
return this;
}

public Builder exceptionHook(ExceptionHook exceptionHook) {
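
The builder change stays backward compatible: when no threadFactory is supplied, construction falls back to Executors.newSingleThreadExecutor(), which the MockCoordinatorExecutor change below relies on by passing null. A brief sketch of the two paths; `userCodeClassLoader` is an illustrative stand-in for context.getUserCodeClassloader().

  // Default path: no factory supplied, plain single-thread executor as before this PR.
  NonThrownExecutor defaults = NonThrownExecutor.builder(LOG).build();

  // Explicit path: the worker thread carries the given context classloader.
  NonThrownExecutor scoped = NonThrownExecutor.builder(LOG)
      .threadFactory(new ExplicitClassloaderThreadFactory("hive-sync", userCodeClassLoader))
      .build();
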
@@ -33,7 +33,7 @@ public class MockCoordinatorExecutor extends NonThrownExecutor {
private static final Logger LOG = LoggerFactory.getLogger(MockCoordinatorExecutor.class);

public MockCoordinatorExecutor(OperatorCoordinator.Context context) {
super(LOG, (errMsg, t) -> context.failJob(new HoodieException(errMsg, t)), true);
super(LOG, null, (errMsg, t) -> context.failJob(new HoodieException(errMsg, t)), true);
}

@Override