forked from alibaba/transmittable-thread-local
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDistributedTracerUseDemo.kt
150 lines (115 loc) · 4.87 KB
/
DistributedTracerUseDemo.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package com.alibaba.demo.distributed_tracer.refcount
import com.alibaba.ttl.TransmittableThreadLocal
import com.alibaba.ttl.threadpool.TtlExecutors
import java.lang.Thread.sleep
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
/**
* DistributedTracer(DT) use demo.
*
* @author Jerry Lee (oldratlee at gmail dot com)
*/
fun main() {
rpcInvokeIn()
sleep(100)
}
private fun rpcInvokeIn() {
////////////////////////////////////////////////
// DistributedTracer Framework Code
////////////////////////////////////////////////
// Get Trace Id and Span Id from RPC Context
val traceId = "traceId_XXXYYY"
val baseSpanId = "1.1"
transferInfo.set(DtTransferInfo(traceId, baseSpanId))
traceId2LeafSpanIdInfo[traceId] = LeafSpanIdInfo()
increaseSpanIdRefCount()
////////////////////////////////////////////////
// Biz Code
////////////////////////////////////////////////
syncMethod()
////////////////////////////////////////////////
// DistributedTracer Framework Code
////////////////////////////////////////////////
decreaseSpanIdRefCount()
}
private val executorService = Executors.newFixedThreadPool(1) { r: Runnable ->
Thread(r, "Executors").apply { isDaemon = true }
}.let { TtlExecutors.getTtlExecutorService(it) }!!
private fun syncMethod() {
// async call by TTL Executor, Test OK!
executorService.submit { asyncMethod() }
// async call by new Thread
// FIXME Bug!! 没有 Increase/Decrease reference counter操作!
thread(name = "Thread-by-new") { syncMethod_ByNewThread() }
invokeServerWithRpc("server 1")
}
private fun asyncMethod() {
invokeServerWithRpc("server 2")
}
private fun syncMethod_ByNewThread() {
invokeServerWithRpc("server 3")
}
// RPC invoke
private fun invokeServerWithRpc(server: String) {
////////////////////////////////////////////////
// DistributedTracer Framework Code
////////////////////////////////////////////////
val leafSpanCurrent = increaseLeafSpanCurrentAndReturn()
// Set RpcContext
// Mocked, should use RPC util to get Rpc Context instead
val rpcContext = ConcurrentHashMap<String, String>()
rpcContext["traceId"] = transferInfo.get()!!.traceId
rpcContext["spanId"] = transferInfo.get()!!.baseSpanId + "." + leafSpanCurrent
// Do Rpc
// ...
System.out.printf("Do Rpc invocation to server %s with %s%n", server, rpcContext)
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////
internal class DtTransferInfo(val traceId: String, val baseSpanId: String)
internal class LeafSpanIdInfo(val current: AtomicInteger = AtomicInteger(1),
val refCounter: AtomicInteger = AtomicInteger(0))
private val transferInfo = object : TransmittableThreadLocal<DtTransferInfo>() {
/*
@Override
protected DtTransferInfo childValue(DtTransferInfo parentValue) {
// **注意**:
// 新建线程时,从父线程继承值时,计数加1
// 对应线程结束时,没有回调以清理ThreadLocal中的Context!,Bug!!
// InheritableThreadLocal 没有提供 对应的拦截方法。。。 计数不配对了。。。
// 但是一个线程就一个Context没清,线程数有限,Context占用内存一般很小,可以接受。
increaseSpanIdRefCount();
return super.childValue(parentValue);
}
*/
override fun beforeExecute() {
super.beforeExecute()
increaseSpanIdRefCount()
}
override fun afterExecute() {
decreaseSpanIdRefCount()
}
}
private val traceId2LeafSpanIdInfo = ConcurrentHashMap<String, LeafSpanIdInfo>()
private fun increaseSpanIdRefCount() {
val traceId = transferInfo.get().traceId
val refCounter = traceId2LeafSpanIdInfo[traceId]!!.refCounter.incrementAndGet()
System.out.printf("DEBUG: Increase reference counter(%s) for traceId %s in thread %s%n", refCounter, traceId, Thread.currentThread().name)
}
private fun decreaseSpanIdRefCount() {
val traceId = transferInfo.get().traceId
val leafSpanIdInfo = traceId2LeafSpanIdInfo[traceId]
val refCounter = leafSpanIdInfo!!.refCounter.decrementAndGet()
System.out.printf("DEBUG: Decrease reference counter(%s) for traceId %s in thread %s%n", refCounter, traceId, Thread.currentThread().name)
if (refCounter == 0) {
traceId2LeafSpanIdInfo.remove(traceId)
System.out.printf("DEBUG: Clear traceId2LeafSpanIdInfo for traceId %s in thread %s%n", traceId, Thread.currentThread().name)
} else if (refCounter < 0) {
throw IllegalStateException("Leaf Span Id Info Reference counter has Bug!!")
}
}
private fun increaseLeafSpanCurrentAndReturn(): Int {
val traceId = transferInfo.get()!!.traceId
return traceId2LeafSpanIdInfo[traceId]!!.current.getAndIncrement()
}