
Cross-Thread Tag Propagation

Fishtail edited this page Oct 30, 2025 · 4 revisions

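In a microservice architecture, metadata often needs to be passed along the request chain. This metadata is usually stored in a ThreadLocal, which causes problems with asynchronous programming using CompletableFuture and similar APIs, because the task runs on a different thread that does not see the caller's ThreadLocal values. sct (Spring Cloud Tencent) provides PolarisCompletableFutureUtils and TaskExecutorWrapper to solve this problem.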
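The problem can be reproduced with the JDK alone. The sketch below is a minimal illustration, not sct code: ThreadLocalLossDemo and USER are hypothetical names standing in for a ThreadLocal-backed context holder. A value set on the calling thread is not visible to a task that runs on an executor thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalLossDemo {
    // Hypothetical stand-in for a ThreadLocal-backed request context.
    private static final ThreadLocal<String> USER = new ThreadLocal<>();

    /** Sets a value on the calling thread and returns what an async task reads. */
    static String readFromAsyncTask() {
        // A dedicated executor guarantees the task runs on a different thread.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        USER.set("frank");
        try {
            // The worker thread has its own, still empty, ThreadLocal slot.
            return CompletableFuture.supplyAsync(USER::get, pool).join();
        } finally {
            USER.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("async task sees: " + readFromAsyncTask());
    }
}
```

The async task reads null even though the caller stored a value, which is the gap the two utilities on this page are meant to close.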

PolarisCompletableFutureUtils

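PolarisCompletableFutureUtils is a utility class that propagates the metadata context across threads when running asynchronous tasks. It is used the same way as the standard CompletableFuture, and it addresses the problem that ThreadLocal context is not automatically passed to the new thread when programming asynchronously with CompletableFuture.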
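One common way to achieve this kind of propagation is to capture the context on the submitting thread and reinstate it around the task on the worker thread. The sketch below shows that general pattern under stated assumptions: ContextCapturingSupplier, CONTEXT, and withCallerContext are hypothetical names, and this is not the actual PolarisCompletableFutureUtils source.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class ContextCapturingSupplier {
    // Hypothetical stand-in for the ThreadLocal-backed metadata holder.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    /** Captures the caller's context now and reinstates it around the task later. */
    static <T> Supplier<T> withCallerContext(Supplier<T> task) {
        String captured = CONTEXT.get();          // evaluated on the submitting thread
        return () -> {
            String previous = CONTEXT.get();      // evaluated on the worker thread
            CONTEXT.set(captured);
            try {
                return task.get();
            } finally {
                // Put back whatever the pooled thread held so nothing leaks between tasks.
                if (previous == null) {
                    CONTEXT.remove();
                } else {
                    CONTEXT.set(previous);
                }
            }
        };
    }

    /** Returns the context value observed inside the async task. */
    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CONTEXT.set("trace-123");
        try {
            return CompletableFuture.supplyAsync(withCallerContext(CONTEXT::get), pool).join();
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }
}
```

Restoring the previous value in the finally block matters with pooled threads, since a reused thread would otherwise carry one request's context into the next task.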

Usage examples:

PolarisCompletableFutureUtils.supplyAsync

// Use PolarisCompletableFutureUtils to propagate metadata
@GetMapping("/async/supplyAsync/compare")
public Map<String, Object> supplyAsyncCompare() throws ExecutionException, InterruptedException {
	Map<String, String> customMetadata = new HashMap<>();
	customMetadata.put("user", "frank");
	customMetadata.put("trace-id", "supplyAsync-compare-" + System.currentTimeMillis());
	customMetadata.put("comparison", "polaris-vs-standard");
	MetadataContext metadataContext = MetadataContextHolder.get();
	metadataContext.setTransitiveMetadata(customMetadata);
	// ✅ PolarisCompletableFutureUtils.supplyAsync passes the metadata context to the child thread
	CompletableFuture<Map<String, Object>> polarisFuture = PolarisCompletableFutureUtils.supplyAsync(() -> {
		MetadataContext asyncContext = MetadataContextHolder.get();
		Map<String, String> asyncMetadata = asyncContext.getTransitiveMetadata();
		Map<String, Object> result = new HashMap<>();
		result.put("transitiveMetadata", asyncMetadata);
		result.put("threadName", Thread.currentThread().getName());
		return result;
	});
	// ❌ Standard CompletableFuture.supplyAsync: metadata is not propagated
	CompletableFuture<Map<String, Object>> standardFuture = CompletableFuture.supplyAsync(() -> {
		MetadataContext asyncContext = MetadataContextHolder.get();
		Map<String, String> asyncMetadata = asyncContext.getTransitiveMetadata();
		Map<String, Object> result = new HashMap<>();
		result.put("transitiveMetadata", asyncMetadata);
		result.put("threadName", Thread.currentThread().getName());
		return result;
	});
	Map<String, Object> polarisResult = polarisFuture.get();
	Map<String, Object> standardResult = standardFuture.get();
	Map<String, Object> response = new HashMap<>();
	response.put("polarisResult", polarisResult);
	response.put("standardResult", standardResult);
	return response;
}

PolarisCompletableFutureUtils.runAsync

@GetMapping("/async/runAsync/compare")
public Map<String, Object> runAsyncCompare() throws ExecutionException, InterruptedException {
	Map<String, String> customMetadata = new HashMap<>();
	customMetadata.put("user", "eve");
	customMetadata.put("trace-id", "runAsync-compare-" + System.currentTimeMillis());
	customMetadata.put("comparison", "polaris-vs-standard");
	MetadataContext metadataContext = MetadataContextHolder.get();
	metadataContext.setTransitiveMetadata(customMetadata);
	Map<String, Object> polarisResult = new HashMap<>();
	Map<String, Object> standardResult = new HashMap<>();
	// ✅ PolarisCompletableFutureUtils.runAsync passes the metadata context to the child thread
	CompletableFuture<Void> polarisFuture = PolarisCompletableFutureUtils.runAsync(() -> {
		MetadataContext asyncContext = MetadataContextHolder.get();
		Map<String, String> asyncMetadata = asyncContext.getTransitiveMetadata();
		Map<String, Object> result = new HashMap<>();
		result.put("transitiveMetadata", asyncMetadata);
		result.put("threadName", Thread.currentThread().getName());
		polarisResult.putAll(result);
	});
	// ❌ Standard CompletableFuture.runAsync: metadata is not propagated
	CompletableFuture<Void> standardFuture = CompletableFuture.runAsync(() -> {
		MetadataContext asyncContext = MetadataContextHolder.get();
		Map<String, String> asyncMetadata = asyncContext.getTransitiveMetadata();
		Map<String, Object> result = new HashMap<>();
		result.put("transitiveMetadata", asyncMetadata);
		result.put("threadName", Thread.currentThread().getName());
		standardResult.putAll(result);
	});
	polarisFuture.get();
	standardFuture.get();
	Map<String, Object> response = new HashMap<>();
	response.put("polarisResult", polarisResult);
	response.put("standardResult", standardResult);
	return response;
}

TaskExecutorWrapper

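TaskExecutorWrapper is a wrapper class for a thread-pool executor. It is mainly used to pass and restore thread context information when asynchronous tasks are executed.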

@GetMapping("/async/taskExecutor/demo")
public Map<String, Object> taskExecutorDemo() throws InterruptedException {
	ThreadPoolTaskExecutor wrappedBaseExecutor = new ThreadPoolTaskExecutor();
	wrappedBaseExecutor.initialize();
	TaskExecutor wrappedExecutor = new TaskExecutorWrapper<>(
			wrappedBaseExecutor,
			CROSS_THREAD_METADATA_CONTEXT_SUPPLIER,  // Supplier: reads the MetadataContext of the current thread
			CROSS_THREAD_METADATA_CONTEXT_CONSUMER   // Consumer: sets the MetadataContext on the new thread
	);
	Map<String, String> customMetadata = new HashMap<>();
	customMetadata.put("user", "george");
	customMetadata.put("trace-id", "taskExecutor-demo-" + System.currentTimeMillis());
	customMetadata.put("source", "main-thread");
	MetadataContext metadataContext = MetadataContextHolder.get();
	metadataContext.setTransitiveMetadata(customMetadata);
	final Map<String, Object> wrappedResult = new HashMap<>();
	java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1);
	wrappedExecutor.execute(() -> {
		try {
			MetadataContext asyncContext = MetadataContextHolder.get();
			Map<String, String> asyncMetadata = asyncContext.getTransitiveMetadata();
			wrappedResult.put("transitiveMetadata", asyncMetadata);
		}
		finally {
			latch.countDown();
		}
	});
	latch.await();
	wrappedBaseExecutor.shutdown();
	return wrappedResult;
}
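The wrapper above is built from a Supplier that reads the context on the submitting thread and a Consumer that installs it on the thread running the task. A minimal sketch of that supplier/consumer pattern in plain Java follows. ContextAwareExecutor and CONTEXT are hypothetical names used for illustration, and this is not the actual TaskExecutorWrapper source.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

class ContextAwareExecutor<T> implements Executor {
    private final Executor delegate;
    private final Supplier<T> contextGetter;  // reads the context of the current thread
    private final Consumer<T> contextSetter;  // installs a context on the current thread

    ContextAwareExecutor(Executor delegate, Supplier<T> contextGetter, Consumer<T> contextSetter) {
        this.delegate = delegate;
        this.contextGetter = contextGetter;
        this.contextSetter = contextSetter;
    }

    @Override
    public void execute(Runnable task) {
        T captured = contextGetter.get();          // submitting thread
        delegate.execute(() -> {
            T previous = contextGetter.get();      // worker thread
            contextSetter.accept(captured);
            try {
                task.run();
            } finally {
                contextSetter.accept(previous);    // restore the worker's prior state
            }
        });
    }

    // Hypothetical ThreadLocal-backed context used only by the demo below.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    /** Returns the context value observed by a task run through the wrapper. */
    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Executor wrapped = new ContextAwareExecutor<String>(pool, CONTEXT::get, CONTEXT::set);
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        CONTEXT.set("george");
        try {
            wrapped.execute(() -> {
                seen.set(CONTEXT.get());
                done.countDown();
            });
            done.await();
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }
}
```

Keeping the getter and setter as parameters means the same wrapper can carry any kind of context, not only a single ThreadLocal.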
