Skip to content

Commit ad61bbd

Browse files
committed
add OL instrumentation class
Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent d48bb91 commit ad61bbd

File tree

5 files changed

+135
-103
lines changed

5 files changed

+135
-103
lines changed

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

+6
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,11 @@ public static void enter(@Advice.This SparkContext sparkContext) {
4646
sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version());
4747
sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener);
4848
}
49+
50+
@Advice.OnMethodExit(suppress = Throwable.class)
51+
public static void exit(@Advice.This SparkContext sparkContext) {
52+
// At this point all the listeners have been added to the listener bus
53+
AbstractDatadogSparkListener.listener.setupOpenLineage();
54+
}
4955
}
5056
}

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

+6
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,11 @@ public static void enter(@Advice.This SparkContext sparkContext) {
4646
sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version());
4747
sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener);
4848
}
49+
50+
@Advice.OnMethodExit(suppress = Throwable.class)
51+
public static void exit(@Advice.This SparkContext sparkContext) {
52+
// At this point all the listeners have been added to the listener bus
53+
AbstractDatadogSparkListener.listener.setupOpenLineage();
54+
}
4955
}
5056
}

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

+42-79
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,10 @@
3333
import java.util.LinkedHashMap;
3434
import java.util.List;
3535
import java.util.Map;
36-
import java.util.Objects;
3736
import java.util.Optional;
3837
import java.util.Properties;
3938
import java.util.UUID;
4039
import java.util.function.Consumer;
41-
import java.util.stream.Collectors;
4240
import org.apache.spark.ExceptionFailure;
4341
import org.apache.spark.SparkConf;
4442
import org.apache.spark.TaskFailedReason;
@@ -54,6 +52,7 @@
5452
import org.apache.spark.sql.streaming.StateOperatorProgress;
5553
import org.apache.spark.sql.streaming.StreamingQueryListener;
5654
import org.apache.spark.sql.streaming.StreamingQueryProgress;
55+
import org.apache.spark.util.Utils;
5756
import org.slf4j.Logger;
5857
import org.slf4j.LoggerFactory;
5958
import scala.Tuple2;
@@ -71,6 +70,9 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
7170
private static final Logger log = LoggerFactory.getLogger(AbstractDatadogSparkListener.class);
7271
private static final ObjectMapper objectMapper = new ObjectMapper();
7372
public static volatile AbstractDatadogSparkListener listener = null;
73+
public static volatile SparkListenerInterface openLineageSparkListener = null;
74+
public static volatile SparkConf openLineageSparkConf = null;
75+
7476
public static volatile boolean finishTraceOnApplicationEnd = true;
7577
public static volatile boolean isPysparkShell = false;
7678

@@ -113,6 +115,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
113115
private final Map<Long, SparkSQLUtils.AccumulatorWithStage> accumulators =
114116
new RemoveEldestHashMap<>(MAX_ACCUMULATOR_SIZE);
115117

118+
private volatile boolean isStreamingJob = false;
116119
private final boolean isRunningOnDatabricks;
117120
private final String databricksClusterName;
118121
private final String databricksServiceName;
@@ -127,7 +130,6 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
127130
private long availableExecutorTime = 0;
128131

129132
private volatile boolean applicationEnded = false;
130-
private SparkListener openLineageSparkListener = null;
131133

132134
public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sparkVersion) {
133135
tracer = AgentTracer.get();
@@ -156,8 +158,6 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
156158
finishApplication(System.currentTimeMillis(), null, 0, null);
157159
}
158160
}));
159-
initApplicationSpanIfNotInitialized();
160-
loadOlSparkListener();
161161
}
162162

163163
static void setupSparkConf(SparkConf sparkConf) {
@@ -167,34 +167,44 @@ static void setupSparkConf(SparkConf sparkConf) {
167167
sparkConf.set("spark.openlineage.transport.transports.agent.url", getAgentHttpUrl());
168168
sparkConf.set("spark.openlineage.transport.transports.agent.endpoint", AGENT_OL_ENDPOINT);
169169
sparkConf.set("spark.openlineage.transport.transports.agent.compression", "gzip");
170-
}
171-
172-
void setupTrace(SparkConf sc) {
173-
sc.set(
170+
sparkConf.set(
174171
"spark.openlineage.run.tags",
175172
"_dd.trace_id:"
176-
+ applicationSpan.context().getTraceId().toString()
177-
+ ";_dd.intake.emit_spans:false");
173+
+ listener.applicationSpan.context().getTraceId().toString()
174+
+ ";_dd.ol_intake.emit_spans:false");
178175
}
179176

180-
void loadOlSparkListener() {
177+
public void setupOpenLineage() {
178+
log.debug("Setting up OpenLineage-Datadog integration");
179+
if (openLineageSparkListener != null) {
180+
setupSparkConf(openLineageSparkConf);
181+
return;
182+
}
183+
181184
String className = "io.openlineage.spark.agent.OpenLineageSparkListener";
182-
Optional<Class> clazz = loadClass(className);
183-
if (!clazz.isPresent()) {
185+
Class clazz;
186+
try {
187+
try {
188+
clazz = Class.forName(className, true, Thread.currentThread().getContextClassLoader());
189+
} catch (ClassNotFoundException e) {
190+
clazz = Class.forName(className, true, Utils.class.getClassLoader());
191+
}
192+
} catch (ClassNotFoundException e) {
184193
log.info("OpenLineage integration is not present on the classpath");
185194
return;
186195
}
187-
try {
188-
setupSparkConf(sparkConf);
189-
sparkConf.set(
190-
"spark.openlineage.run.tags",
191-
"_dd.trace_id:"
192-
+ applicationSpan.context().getTraceId().toString()
193-
+ ";_dd.ol_intake.emit_spans:false");
194196

197+
openLineageSparkConf = sparkConf;
198+
if (clazz == null) {
199+
log.info("OpenLineage integration is not present on the classpath: class is null");
200+
return;
201+
}
202+
try {
203+
setupSparkConf(openLineageSparkConf);
195204
openLineageSparkListener =
196-
(SparkListener)
197-
clazz.get().getDeclaredConstructor(SparkConf.class).newInstance(sparkConf);
205+
(SparkListenerInterface)
206+
clazz.getConstructor(SparkConf.class).newInstance(openLineageSparkConf);
207+
198208
log.info(
199209
"Created OL spark listener: {}", openLineageSparkListener.getClass().getSimpleName());
200210
} catch (Exception e) {
@@ -223,8 +233,6 @@ void loadOlSparkListener() {
223233
@Override
224234
public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
225235
this.applicationStart = applicationStart;
226-
initApplicationSpanIfNotInitialized();
227-
notifyOl(this.openLineageSparkListener::onApplicationStart, applicationStart);
228236
}
229237

230238
private void initApplicationSpanIfNotInitialized() {
@@ -245,6 +253,8 @@ private void initApplicationSpanIfNotInitialized() {
245253
}
246254
}
247255

256+
notifyOl(x -> this.openLineageSparkListener.onApplicationStart(x), applicationStart);
257+
248258
captureApplicationParameters(builder);
249259
captureOpenlineageContextIfPresent(builder);
250260

@@ -455,6 +465,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
455465
if (sqlSpan != null) {
456466
jobSpanBuilder.asChildOf(sqlSpan.context());
457467
} else if (batchKey != null) {
468+
isStreamingJob = true;
458469
AgentSpan batchSpan =
459470
getOrCreateStreamingBatchSpan(batchKey, jobStart.time(), jobStart.properties());
460471
jobSpanBuilder.asChildOf(batchSpan.context());
@@ -760,7 +771,11 @@ public void onOtherEvent(SparkListenerEvent event) {
760771
}
761772

762773
private <T extends SparkListenerEvent> void notifyOl(Consumer<T> ol, T event) {
763-
if (this.openLineageSparkListener != null) {
774+
if (isRunningOnDatabricks || isStreamingJob) {
775+
log.debug("Not emitting event when running on databricks or on streaming jobs");
776+
return;
777+
}
778+
if (openLineageSparkListener != null) {
764779
log.debug("Notifying with event `{}`", event.getClass().getCanonicalName());
765780
ol.accept(event);
766781
} else {
@@ -816,6 +831,7 @@ private synchronized void updateAdaptiveSQLPlan(SparkListenerEvent event) {
816831
private synchronized void onSQLExecutionStart(SparkListenerSQLExecutionStart sqlStart) {
817832
sqlPlans.put(sqlStart.executionId(), sqlStart.sparkPlanInfo());
818833
sqlQueries.put(sqlStart.executionId(), sqlStart);
834+
notifyOl(x -> openLineageSparkListener.onOtherEvent(x), sqlStart);
819835
}
820836

821837
private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd) {
@@ -1338,57 +1354,4 @@ private static String removeUuidFromEndOfString(String input) {
13381354
return input.replaceAll(
13391355
"_[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$", "");
13401356
}
1341-
1342-
private Optional<Class> loadClass(String className) {
1343-
Class clazz = null;
1344-
List<ClassLoader> availableClassloaders =
1345-
Thread.getAllStackTraces().keySet().stream()
1346-
.map(Thread::getContextClassLoader)
1347-
.filter(Objects::nonNull)
1348-
.collect(Collectors.toList());
1349-
try {
1350-
clazz = Class.forName(className);
1351-
} catch (Exception e) {
1352-
log.debug("Failed to load {} via Class.forName: {}", className, e.toString());
1353-
for (ClassLoader classLoader : availableClassloaders) {
1354-
try {
1355-
clazz = classLoader.loadClass(className);
1356-
log.debug("Loaded {} via classLoader: {}", className, classLoader);
1357-
break;
1358-
} catch (Exception ex) {
1359-
log.debug(
1360-
"Failed to load {} via loadClass via ClassLoader {} - {}",
1361-
className,
1362-
classLoader,
1363-
ex.toString());
1364-
}
1365-
try {
1366-
clazz = classLoader.getParent().loadClass(className);
1367-
log.debug(
1368-
"Loaded {} via parent classLoader: {} for CL {}",
1369-
className,
1370-
classLoader.getParent(),
1371-
classLoader);
1372-
break;
1373-
} catch (Exception ex) {
1374-
log.debug(
1375-
"Failed to load {} via loadClass via parent ClassLoader {} - {}",
1376-
className,
1377-
classLoader.getParent(),
1378-
ex.toString());
1379-
}
1380-
}
1381-
}
1382-
if (clazz == null) {
1383-
try {
1384-
clazz = ClassLoader.getSystemClassLoader().loadClass(className);
1385-
log.debug(
1386-
"Loaded {} via system classLoader: {}", className, ClassLoader.getSystemClassLoader());
1387-
} catch (Exception ex) {
1388-
log.debug(
1389-
"Failed to load {} via loadClass via SystemClassLoader {}", className, ex.toString());
1390-
}
1391-
}
1392-
return Optional.ofNullable(clazz);
1393-
}
13941357
}

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

+1-24
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import datadog.trace.agent.tooling.InstrumenterModule;
1111
import net.bytebuddy.asm.Advice;
1212
import org.apache.spark.deploy.SparkSubmitArguments;
13-
import org.apache.spark.scheduler.SparkListenerInterface;
1413

1514
public abstract class AbstractSparkInstrumentation extends InstrumenterModule.Tracing
1615
implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {
@@ -30,7 +29,7 @@ public String[] knownMatchingTypes() {
3029
"org.apache.spark.SparkContext",
3130
"org.apache.spark.deploy.SparkSubmit",
3231
"org.apache.spark.deploy.yarn.ApplicationMaster",
33-
"org.apache.spark.scheduler.LiveListenerBus",
32+
"org.apache.spark.util.SparkClassUtils"
3433
};
3534
}
3635

@@ -57,14 +56,6 @@ public void methodAdvice(MethodTransformer transformer) {
5756
.and(named("finish"))
5857
.and(isDeclaredBy(named("org.apache.spark.deploy.yarn.ApplicationMaster"))),
5958
AbstractSparkInstrumentation.class.getName() + "$YarnFinishAdvice");
60-
61-
// LiveListenerBus class is used when running in a YARN cluster
62-
transformer.applyAdvice(
63-
isMethod()
64-
.and(named("addToSharedQueue"))
65-
.and(takesArgument(0, named("org.apache.spark.scheduler.SparkListenerInterface")))
66-
.and(isDeclaredBy(named("org.apache.spark.scheduler.LiveListenerBus"))),
67-
AbstractSparkInstrumentation.class.getName() + "$LiveListenerBusAdvice");
6859
}
6960

7061
public static class PrepareSubmitEnvAdvice {
@@ -110,18 +101,4 @@ public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) S
110101
}
111102
}
112103
}
113-
114-
public static class LiveListenerBusAdvice {
115-
@Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
116-
public static boolean enter(@Advice.Argument(0) SparkListenerInterface listener) {
117-
// Skip instantiating OpenLineage listener - we will inject it later with custom config
118-
if (listener == null || listener.getClass().getCanonicalName() == null) {
119-
return false;
120-
}
121-
return listener
122-
.getClass()
123-
.getCanonicalName()
124-
.equals("io.openlineage.spark.agent.OpenLineageSparkListener");
125-
}
126-
}
127104
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package datadog.trace.instrumentation.spark;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import java.lang.reflect.Field;
import net.bytebuddy.asm.Advice;
import org.apache.spark.SparkConf;
import org.apache.spark.scheduler.SparkListenerInterface;
import org.slf4j.LoggerFactory;

/**
 * Instruments {@code io.openlineage.spark.agent.OpenLineageSparkListener} so that, when Spark
 * constructs the listener, the instance and its {@link SparkConf} are captured into {@link
 * AbstractDatadogSparkListener#openLineageSparkListener} and {@link
 * AbstractDatadogSparkListener#openLineageSparkConf}. The Datadog listener later uses these to
 * configure and drive the OpenLineage integration.
 */
@AutoService(InstrumenterModule.class)
public class OpenLineageInstrumentation extends InstrumenterModule.Tracing
    implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {

  public OpenLineageInstrumentation() {
    super("openlineage-spark");
  }

  @Override
  public String[] helperClassNames() {
    // Helper classes injected alongside the advice so it can reference them at runtime.
    return new String[] {
      packageName + ".AbstractDatadogSparkListener",
      packageName + ".DatabricksParentContext",
      packageName + ".OpenlineageParentContext",
      packageName + ".RemoveEldestHashMap",
      packageName + ".SparkAggregatedTaskMetrics",
      packageName + ".SparkConfAllowList",
      packageName + ".SparkSQLUtils",
      packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
      packageName + ".SparkSQLUtils$AccumulatorWithStage",
    };
  }

  @Override
  public boolean defaultEnabled() {
    return true;
  }

  @Override
  public String[] knownMatchingTypes() {
    return new String[] {
      "io.openlineage.spark.agent.OpenLineageSparkListener", "org.apache.spark.util.Utils"
    };
  }

  @Override
  public void methodAdvice(MethodTransformer transformer) {
    // Intercept the OpenLineageSparkListener(SparkConf) constructor so the freshly built
    // listener (and its conf) can be handed to the Datadog listener.
    transformer.applyAdvice(
        isConstructor()
            .and(isDeclaredBy(named("io.openlineage.spark.agent.OpenLineageSparkListener")))
            .and(takesArgument(0, named("org.apache.spark.SparkConf"))),
        OpenLineageInstrumentation.class.getName() + "$OpenLineageSparkListenerAdvice");
  }

  /** Advice inlined at the exit of the OpenLineageSparkListener(SparkConf) constructor. */
  public static class OpenLineageSparkListenerAdvice {
    @Advice.OnMethodExit(suppress = Throwable.class)
    public static void exit(@Advice.This Object self) {
      LoggerFactory.getLogger(Config.class).debug("Checking for OpenLineageSparkListener");
      try {
        // Read the listener's private "conf" field reflectively: the OpenLineage classes are
        // not on the agent's compile-time classpath, so typed access is not possible here.
        Field conf = self.getClass().getDeclaredField("conf");
        conf.setAccessible(true);
        AbstractDatadogSparkListener.openLineageSparkConf = (SparkConf) conf.get(self);
        AbstractDatadogSparkListener.openLineageSparkListener = (SparkListenerInterface) self;
        LoggerFactory.getLogger(Config.class)
            .debug("Detected OpenLineageSparkListener, passed to DatadogSparkListener");
      } catch (NoSuchFieldException | IllegalAccessException e) {
        // Best-effort: the integration simply stays disabled if the field layout changes.
        LoggerFactory.getLogger(Config.class)
            .debug("Failed to pass OpenLineageSparkListener to DatadogSparkListener", e);
      }
    }
  }
}

0 commit comments

Comments
 (0)