Skip to content

Commit 516f410

Browse files
authored
add more tracing spans (#189)
1 parent b5b4b0c commit 516f410

File tree

4 files changed

+42
-53
lines changed

4 files changed

+42
-53
lines changed

src/main/java/no/ssb/dlp/pseudo/service/pseudo/PseudoController.java

Lines changed: 19 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,7 @@ public class PseudoController {
6666
@Produces(MediaType.APPLICATION_JSON)
6767
@Post(value = "/pseudonymize/field", consumes = MediaType.APPLICATION_JSON)
6868
@ExecuteOn(TaskExecutors.BLOCKING)
69-
public HttpResponse<Flowable<byte[]>> pseudonymizeField(
70-
@SpanAttribute("pseudonymize column request") @Schema(implementation = PseudoFieldRequest.class) String request
71-
) {
69+
public HttpResponse<Flowable<byte[]>> pseudonymizeField(@SpanAttribute("pseudonymize column request") @Schema(implementation = PseudoFieldRequest.class) String request) {
7270
PseudoFieldRequest req = Json.toObject(PseudoFieldRequest.class, request);
7371
Span currentSpan = Span.current();
7472
if (currentSpan.getSpanContext().isValid() && req != null) {
@@ -82,16 +80,14 @@ public HttpResponse<Flowable<byte[]>> pseudonymizeField(
8280
final String correlationId = MDC.get("CorrelationID");
8381

8482
return HttpResponse.ok(
85-
pseudoField.process(
86-
pseudoConfigSplitter,
87-
recordProcessorFactory,
88-
req.values,
89-
PseudoOperation.PSEUDONYMIZE,
90-
correlationId
91-
)
92-
.map(o -> o.getBytes(StandardCharsets.UTF_8))
93-
)
94-
.characterEncoding(StandardCharsets.UTF_8);
83+
pseudoField.process(
84+
pseudoConfigSplitter,
85+
recordProcessorFactory,
86+
req.values,
87+
PseudoOperation.PSEUDONYMIZE,
88+
correlationId
89+
).map(o -> o.getBytes(StandardCharsets.UTF_8))
90+
).characterEncoding(StandardCharsets.UTF_8);
9591
} catch (Exception e) {
9692
return HttpResponse.serverError(Flowable.error(e));
9793
}
@@ -110,9 +106,7 @@ public HttpResponse<Flowable<byte[]>> pseudonymizeField(
110106
@Secured({PseudoServiceRole.ADMIN})
111107
@Post(value = "/depseudonymize/field", consumes = MediaType.APPLICATION_JSON)
112108
@ExecuteOn(TaskExecutors.BLOCKING)
113-
public HttpResponse<Flowable<byte[]>> depseudonymizeField(
114-
@SpanAttribute("depseudonymize column request") @Schema(implementation = DepseudoFieldRequest.class) String request
115-
) {
109+
public HttpResponse<Flowable<byte[]>> depseudonymizeField(@SpanAttribute("depseudonymize column request") @Schema(implementation = DepseudoFieldRequest.class) String request) {
116110
DepseudoFieldRequest req = Json.toObject(DepseudoFieldRequest.class, request);
117111
Span currentSpan = Span.current();
118112
if (currentSpan.getSpanContext().isValid() && req != null) {
@@ -126,10 +120,7 @@ public HttpResponse<Flowable<byte[]>> depseudonymizeField(
126120

127121
final String correlationId = MDC.get("CorrelationID");
128122

129-
return HttpResponse.ok(pseudoField.process(
130-
pseudoConfigSplitter, recordProcessorFactory,req.values, PseudoOperation.DEPSEUDONYMIZE, correlationId)
131-
.map(o -> o.getBytes(StandardCharsets.UTF_8)))
132-
.characterEncoding(StandardCharsets.UTF_8);
123+
return HttpResponse.ok(pseudoField.process(pseudoConfigSplitter, recordProcessorFactory, req.values, PseudoOperation.DEPSEUDONYMIZE, correlationId).map(o -> o.getBytes(StandardCharsets.UTF_8))).characterEncoding(StandardCharsets.UTF_8);
133124
} catch (Exception e) {
134125
return HttpResponse.serverError(Flowable.error(e));
135126
}
@@ -146,20 +137,15 @@ public HttpResponse<Flowable<byte[]>> depseudonymizeField(
146137
@Secured({PseudoServiceRole.ADMIN})
147138
@Post(value = "/repseudonymize/field", consumes = MediaType.APPLICATION_JSON)
148139
@ExecuteOn(TaskExecutors.BLOCKING)
149-
public HttpResponse<Flowable<byte[]>> repseudonymizeField(
150-
@Schema(implementation = RepseudoFieldRequest.class) String request
151-
) {
140+
public HttpResponse<Flowable<byte[]>> repseudonymizeField(@Schema(implementation = RepseudoFieldRequest.class) String request) {
152141
RepseudoFieldRequest req = Json.toObject(RepseudoFieldRequest.class, request);
153142
log.info(Strings.padEnd(String.format("*** Repseudonymize field: %s ", req.getName()), 80, '*'));
154143
PseudoField sourcePseudoField = new PseudoField(req.getName(), req.getPattern(), req.getSourcePseudoFunc(), req.getSourceKeyset());
155144
PseudoField targetPseudoField = new PseudoField(req.getName(), req.getPattern(), req.getTargetPseudoFunc(), req.getTargetKeyset());
156145
try {
157146

158147
final String correlationId = MDC.get("CorrelationID");
159-
return HttpResponse.ok(
160-
sourcePseudoField.process(recordProcessorFactory, req.values, targetPseudoField, correlationId)
161-
.map(o -> o.getBytes(StandardCharsets.UTF_8)))
162-
.characterEncoding(StandardCharsets.UTF_8);
148+
return HttpResponse.ok(sourcePseudoField.process(recordProcessorFactory, req.values, targetPseudoField, correlationId).map(o -> o.getBytes(StandardCharsets.UTF_8))).characterEncoding(StandardCharsets.UTF_8);
163149
} catch (Exception e) {
164150
return HttpResponse.serverError(Flowable.error(e));
165151
}
@@ -209,34 +195,29 @@ public static class RepseudoFieldRequest {
209195

210196
@Error
211197
public HttpResponse<JsonError> unknownPseudoKeyError(HttpRequest request, NoSuchPseudoKeyException e) {
212-
JsonError error = new JsonError(e.getMessage())
213-
.link(Link.SELF, Link.of(request.getUri()));
198+
JsonError error = new JsonError(e.getMessage()).link(Link.SELF, Link.of(request.getUri()));
214199
return HttpResponse.<JsonError>badRequest().body(error);
215200
}
216201

217202
@Error
218203
public HttpResponse<JsonError> sidIndexUnavailable(HttpRequest request, SidIndexUnavailableException e) {
219-
JsonError error = new JsonError(e.getMessage())
220-
.link(Link.SELF, Link.of(request.getUri()));
204+
JsonError error = new JsonError(e.getMessage()).link(Link.SELF, Link.of(request.getUri()));
221205
return HttpResponse.<JsonError>serverError().status(HttpStatus.SERVICE_UNAVAILABLE).body(error);
222206
}
223207

224208
@Error
225209
public HttpResponse<JsonError> illegalArgument(HttpRequest request, IllegalArgumentException e) {
226-
JsonError error = new JsonError(e.getMessage())
227-
.link(Link.SELF, Link.of(request.getUri()));
210+
JsonError error = new JsonError(e.getMessage()).link(Link.SELF, Link.of(request.getUri()));
228211
return HttpResponse.<JsonError>badRequest().body(error);
229212
}
230213

231214
@Error
232215
public HttpResponse<JsonError> sidVersionInvalid(HttpRequest request, PseudoFuncFactory.PseudoFuncInitException e) {
233-
if (e.getCause() instanceof InvocationTargetException && e.getCause().getCause() instanceof InvalidSidSnapshotDateException){
234-
JsonError error = new JsonError(e.getCause().getCause().getMessage())
235-
.link(Link.SELF, Link.of(request.getUri()));
216+
if (e.getCause() instanceof InvocationTargetException && e.getCause().getCause() instanceof InvalidSidSnapshotDateException) {
217+
JsonError error = new JsonError(e.getCause().getCause().getMessage()).link(Link.SELF, Link.of(request.getUri()));
236218
return HttpResponse.<JsonError>badRequest().body(error);
237219
}
238-
JsonError error = new JsonError(e.getMessage())
239-
.link(Link.SELF, Link.of(request.getUri()));
220+
JsonError error = new JsonError(e.getMessage()).link(Link.SELF, Link.of(request.getUri()));
240221
return HttpResponse.<JsonError>serverError().status(HttpStatus.SERVICE_UNAVAILABLE).body(error);
241222
}
242223
}

src/main/java/no/ssb/dlp/pseudo/service/pseudo/PseudoField.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package no.ssb.dlp.pseudo.service.pseudo;
22

33
import com.google.common.base.Stopwatch;
4+
import io.opentelemetry.instrumentation.annotations.AddingSpanAttributes;
5+
import io.opentelemetry.instrumentation.annotations.SpanAttribute;
6+
import io.opentelemetry.instrumentation.annotations.WithSpan;
47
import io.reactivex.Completable;
58
import io.reactivex.Flowable;
69
import lombok.AccessLevel;
@@ -75,21 +78,23 @@ public PseudoField(String name, String pattern, String pseudoFunc, EncryptedKeys
7578
/**
7679
* Creates a Flowable that processes each value of the field, by applying the configured pseudo rules using a recordMapProcessor.
7780
* This variant of the process() method is intended for "pseudonymize" and "depseudonymize" operations.
81+
*
7882
* @param pseudoConfigSplitter The PseudoConfigSplitter instance to use for splitting pseudo configurations.
7983
* @param recordProcessorFactory The RecordMapProcessorFactory instance to use for creating a new PseudonymizeRecordProcessor.
8084
* @param values The values to be processed.
8185
* @return A Flowable stream that processes the field values by applying the configured pseudo rules, and returns them as a lists of strings.
8286
*/
83-
public Flowable<String> process(PseudoConfigSplitter pseudoConfigSplitter,
84-
RecordMapProcessorFactory recordProcessorFactory,
85-
List<String> values,
86-
PseudoOperation pseudoOperation,
87-
String correlationId) {
87+
@WithSpan
88+
public Flowable<String> process(@SpanAttribute("pseudoConfigSplitter") PseudoConfigSplitter pseudoConfigSplitter,
89+
@SpanAttribute("recordProcessorFactory") RecordMapProcessorFactory recordProcessorFactory,
90+
@SpanAttribute("values") List<String> values,
91+
@SpanAttribute("pseudoOperation") PseudoOperation pseudoOperation,
92+
String correlationId) {
8893
Stopwatch stopwatch = Stopwatch.createStarted();
8994
List<PseudoConfig> pseudoConfigs = pseudoConfigSplitter.splitIfNecessary(this.getPseudoConfig());
9095

9196
RecordMapProcessor<PseudoMetadataProcessor> recordMapProcessor;
92-
switch (pseudoOperation){
97+
switch (pseudoOperation) {
9398
case PSEUDONYMIZE -> recordMapProcessor = recordProcessorFactory.
9499
newPseudonymizeRecordProcessor(pseudoConfigs, correlationId);
95100
case DEPSEUDONYMIZE -> recordMapProcessor = recordProcessorFactory.
@@ -124,14 +129,15 @@ public Flowable<String> process(PseudoConfigSplitter pseudoConfigSplitter,
124129
/**
125130
* Creates a Flowable that processes each value of the field, by applying the configured pseudo rules using a recordMapProcessor.
126131
* This variant of the process() method is intended for the "repseudonymize" operation.
132+
*
127133
* @param recordProcessorFactory The RecordMapProcessorFactory instance to use for creating a new PseudonymizeRecordProcessor.
128134
* @param values The values to be processed.
129135
* @return A Flowable stream that processes the field values by applying the configured pseudo rules, and returns them as a lists of strings.
130136
*/
131137
public Flowable<String> process(RecordMapProcessorFactory recordProcessorFactory,
132-
List<String> values,
133-
PseudoField targetPseudoField,
134-
String correlationId) {
138+
List<String> values,
139+
PseudoField targetPseudoField,
140+
String correlationId) {
135141
Stopwatch stopwatch = Stopwatch.createStarted();
136142
PseudoConfig targetPseudoConfig = targetPseudoField.getPseudoConfig();
137143
RecordMapProcessor<PseudoMetadataProcessor> recordMapProcessor = recordProcessorFactory.

src/main/java/no/ssb/dlp/pseudo/service/pseudo/RecordMapProcessorFactory.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import com.github.benmanes.caffeine.cache.LoadingCache;
44
import com.google.crypto.tink.Aead;
5-
import io.opentelemetry.instrumentation.annotations.AddingSpanAttributes;
5+
import io.opentelemetry.instrumentation.annotations.WithSpan;
66
import jakarta.inject.Singleton;
77
import lombok.RequiredArgsConstructor;
88
import lombok.extern.slf4j.Slf4j;
@@ -17,7 +17,6 @@
1717
import no.ssb.dlp.pseudo.core.PseudoException;
1818
import no.ssb.dlp.pseudo.core.PseudoKeyset;
1919
import no.ssb.dlp.pseudo.core.PseudoOperation;
20-
import no.ssb.dlp.pseudo.core.PseudoSecret;
2120
import no.ssb.dlp.pseudo.core.field.FieldDescriptor;
2221
import no.ssb.dlp.pseudo.core.field.ValueInterceptorChain;
2322
import no.ssb.dlp.pseudo.core.func.PseudoFuncDeclaration;
@@ -48,13 +47,12 @@ public class RecordMapProcessorFactory {
4847
private final PseudoSecrets pseudoSecrets;
4948
private final LoadingCache<String, Aead> aeadCache;
5049

51-
@AddingSpanAttributes
50+
@WithSpan
5251
public RecordMapProcessor<PseudoMetadataProcessor> newPseudonymizeRecordProcessor(List<PseudoConfig> pseudoConfigs, String correlationId) {
5352
ValueInterceptorChain chain = new ValueInterceptorChain();
5453
PseudoMetadataProcessor metadataProcessor = new PseudoMetadataProcessor(correlationId);
5554

5655
for (PseudoConfig config : pseudoConfigs) {
57-
List<PseudoSecret> secrets = pseudoSecrets.resolve();
5856
for (PseudoKeyset keyset : config.getKeysets()) {
5957
log.info(keyset.getKekUri().toString());
6058
}
@@ -66,7 +64,7 @@ public RecordMapProcessor<PseudoMetadataProcessor> newPseudonymizeRecordProcesso
6664
return new RecordMapProcessor<>(chain, metadataProcessor);
6765
}
6866

69-
@AddingSpanAttributes
67+
@WithSpan
7068
public RecordMapProcessor<PseudoMetadataProcessor> newDepseudonymizeRecordProcessor(List<PseudoConfig> pseudoConfigs, String correlationId) {
7169
ValueInterceptorChain chain = new ValueInterceptorChain();
7270
PseudoMetadataProcessor metadataProcessor = new PseudoMetadataProcessor(correlationId);
@@ -81,7 +79,7 @@ public RecordMapProcessor<PseudoMetadataProcessor> newDepseudonymizeRecordProces
8179
return new RecordMapProcessor<>(chain, metadataProcessor);
8280
}
8381

84-
@AddingSpanAttributes
82+
@WithSpan
8583
public RecordMapProcessor<PseudoMetadataProcessor> newRepseudonymizeRecordProcessor(PseudoConfig sourcePseudoConfig,
8684
PseudoConfig targetPseudoConfig, String correlationId) {
8785
final PseudoFuncs fieldDepseudonymizer = newPseudoFuncs(sourcePseudoConfig.getRules(),

src/main/java/no/ssb/dlp/pseudo/service/pseudo/metadata/PseudoMetadataProcessor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package no.ssb.dlp.pseudo.service.pseudo.metadata;
22

3+
import io.opentelemetry.instrumentation.annotations.WithSpan;
34
import io.reactivex.processors.ReplayProcessor;
45
import lombok.Value;
56
import no.ssb.dlp.pseudo.core.util.Json;
@@ -34,12 +35,15 @@ public void addLog(String log) {
3435
public void addMetric(FieldMetric fieldMetric) {
3536
metrics.onNext(fieldMetric);
3637
}
38+
@WithSpan
3739
public Publisher<String> getMetadata() {
3840
return datadocMetadata.map(FieldMetadata::toDatadocVariable).map(Json::from);
3941
}
42+
@WithSpan
4043
public Publisher<String> getLogs() {
4144
return logs.map(Json::from);
4245
}
46+
@WithSpan
4347
public Publisher<String> getMetrics() {
4448
return metrics.groupBy(FieldMetric::name)
4549
.flatMapSingle(group ->

0 commit comments

Comments
 (0)