diff --git a/.github/workflows/build-deploy-app.yml b/.github/workflows/build-deploy-app.yml index cd9bdae4..1fb4db29 100644 --- a/.github/workflows/build-deploy-app.yml +++ b/.github/workflows/build-deploy-app.yml @@ -3,7 +3,7 @@ on: types: [ published ] push: branches: - - master + - vibecoded-test paths-ignore: - "**/*.md" - "Makefile" @@ -131,9 +131,6 @@ jobs: echo "nais_config_path=.nais/test/nais.yaml" >> "$GITHUB_OUTPUT" fi - - - deploy: name: Deploy to NAIS needs: build-push diff --git a/.gitignore b/.gitignore index e26dad9a..2a518ea8 100755 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,5 @@ http-client.private.env.json .direnv/ .envrc +.tools/ +.metals/ \ No newline at end of file diff --git a/.metals/.reports/metals-full/2026-02-18/r_empty-definition_14-57-39-956.md b/.metals/.reports/metals-full/2026-02-18/r_empty-definition_14-57-39-956.md new file mode 100644 index 00000000..8a01596f --- /dev/null +++ b/.metals/.reports/metals-full/2026-02-18/r_empty-definition_14-57-39-956.md @@ -0,0 +1,309 @@ +error id: file:///src/test/java/no/ssb/dlp/pseudo/service/performance/PseudoServiceDaeadPerformanceTest.java: +file:///src/test/java/no/ssb/dlp/pseudo/service/performance/PseudoServiceDaeadPerformanceTest.java +empty definition using pc, found symbol in pc: +empty definition using semanticdb +empty definition using fallback +non-local guesses: + +offset: 3633 +uri: file:///src/test/java/no/ssb/dlp/pseudo/service/performance/PseudoServiceDaeadPerformanceTest.java +text: +```scala +package no.ssb.dlp.pseudo.service.performance; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.crypto.tink.Aead; +import com.google.crypto.tink.JsonKeysetWriter; +import com.google.crypto.tink.KeyTemplates; +import com.google.crypto.tink.KeysetHandle; +import com.google.crypto.tink.aead.AeadConfig; +import com.google.crypto.tink.daead.DeterministicAeadConfig; +import no.ssb.dlp.pseudo.core.util.Json; +import no.ssb.dlp.pseudo.core.tink.model.EncryptedKeysetWrapper; +import no.ssb.dlp.pseudo.service.pseudo.PseudoConfigSplitter; +import no.ssb.dlp.pseudo.service.pseudo.PseudoController; +import no.ssb.dlp.pseudo.service.pseudo.PseudoSecrets; +import no.ssb.dlp.pseudo.service.pseudo.RecordMapProcessorFactory; +import no.ssb.dlp.pseudo.service.pseudo.StreamProcessorFactory; +import no.ssb.dlp.pseudo.service.secrets.MockSecretService; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.SplittableRandom; +import java.util.function.Supplier; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class PseudoServiceDaeadPerformanceTest { + + private static final String ENABLED_PROP = "pseudo.performance.enabled"; + private static final String BATCH_SIZE_PROP = "pseudo.performance.batchSize"; + private static final String WARMUP_ROUNDS_PROP = "pseudo.performance.warmupRounds"; + private static final String MEASURE_ROUNDS_PROP = "pseudo.performance.measureRounds"; + + private static final URI TEST_KEK_URI = URI.create("test-kek://local/master-key"); + + private static PseudoController controller; + private static EncryptedKeysetWrapper keyset; + + @BeforeAll + static void setUp() throws Exception { + AeadConfig.register(); + DeterministicAeadConfig.register(); + + Aead masterAead = KeysetHandle.generateNew(KeyTemplates.get("AES256_GCM")) + .getPrimitive(Aead.class); + + keyset = createWrappedDaeadKeyset(masterAead, TEST_KEK_URI); + + LoadingCache aeadCache = Caffeine.newBuilder() + .maximumSize(100) + .build(uri -> { + if (!TEST_KEK_URI.toString().equals(uri)) { + throw new IllegalArgumentException("Unknown KEK URI in test: " + uri); + } + return masterAead; + }); + + PseudoSecrets pseudoSecrets = new PseudoSecrets(new MockSecretService(), Map.of()); + RecordMapProcessorFactory recordMapProcessorFactory = + new RecordMapProcessorFactory(pseudoSecrets, aeadCache); + + controller = new PseudoController( + new StreamProcessorFactory(), + recordMapProcessorFactory, + new PseudoConfigSplitter() + ); + } + + @Test + void benchmarkDaeadFieldEndpoints() throws Exception { + Assumptions.assumeTrue(Boolean.getBoolean(ENABLED_PROP), + () -> "Skipping performance benchmark. Run with -D" + ENABLED_PROP + "=true"); + + final int batchSize = Integer.getInteger(BATCH_SIZE_PROP, 10_000); + final int warmupRounds = Integer.getInteger(WARMUP_ROUNDS_PROP, 2); + final int@@ measureRounds = Integer.getInteger(MEASURE_ROUNDS_PROP, 8); + + List inputValues = generateValues(batchSize, 42L); + String daeadFunc = "daead(keyId=" + keyset.primaryKeyId() + ")"; + + PseudoController.PseudoFieldRequest pseudoReq = new PseudoController.PseudoFieldRequest(); + pseudoReq.setName("fnr"); + pseudoReq.setPattern("**"); + pseudoReq.setPseudoFunc(daeadFunc); + pseudoReq.setKeyset(keyset); + pseudoReq.setValues(inputValues); + + String pseudoReqJson = Json.from(pseudoReq); + + BenchmarkResult pseudonymizeResult = benchmark( + "pseudonymize", + warmupRounds, + measureRounds, + () -> { + String responseJson = callPseudonymize(pseudoReqJson); + List data = extractDataValues(responseJson); + assertEquals(inputValues.size(), data.size()); + return data; + } + ); + + List pseudonymizedValues = callPseudonymizeAndExtractData(pseudoReqJson); + + PseudoController.DepseudoFieldRequest depseudoReq = new PseudoController.DepseudoFieldRequest(); + depseudoReq.setName("fnr"); + depseudoReq.setPattern("**"); + depseudoReq.setPseudoFunc(daeadFunc); + depseudoReq.setKeyset(keyset); + depseudoReq.setValues(pseudonymizedValues); + + String depseudoReqJson = Json.from(depseudoReq); + + BenchmarkResult depseudonymizeResult = benchmark( + "depseudonymize", + warmupRounds, + measureRounds, + () -> { + String responseJson = callDepseudonymize(depseudoReqJson); + List restored = extractDataValues(responseJson); + assertEquals(inputValues, restored); + return restored; + } + ); + + Map report = new LinkedHashMap<>(); + report.put("timestamp", Instant.now().toString()); + report.put("batchSize", batchSize); + report.put("warmupRounds", warmupRounds); + report.put("measureRounds", measureRounds); + report.put("pseudonymize", pseudonymizeResult.toMap()); + report.put("depseudonymize", depseudonymizeResult.toMap()); + + Path reportPath = Path.of("target", "performance", "pseudo-service-daead-field.json"); + Files.createDirectories(reportPath.getParent()); + Files.writeString(reportPath, Json.prettyFrom(report), StandardCharsets.UTF_8); + + System.out.println("DAEAD benchmark report written to: " + reportPath.toAbsolutePath()); + System.out.println(Json.prettyFrom(report)); + } + + private static BenchmarkResult benchmark(String name, + int warmupRounds, + int measureRounds, + Supplier> call) { + for (int i = 0; i < warmupRounds; i++) { + call.get(); + } + + List elapsedMillis = new ArrayList<>(measureRounds); + int itemCount = -1; + for (int i = 0; i < measureRounds; i++) { + long start = System.nanoTime(); + List data = call.get(); + long end = System.nanoTime(); + if (itemCount < 0) { + itemCount = data.size(); + } + elapsedMillis.add((end - start) / 1_000_000d); + } + + return BenchmarkResult.of(name, itemCount, elapsedMillis); + } + + private static String callPseudonymize(String requestJson) { + return collectBody( + controller.pseudonymizeField(requestJson).body() + ); + } + + private static List callPseudonymizeAndExtractData(String requestJson) { + return extractDataValues(callPseudonymize(requestJson)); + } + + private static String callDepseudonymize(String requestJson) { + return collectBody( + controller.depseudonymizeField(requestJson).body() + ); + } + + private static String collectBody(io.reactivex.Flowable body) { + StringBuilder sb = new StringBuilder(); + for (byte[] part : body.blockingIterable()) { + sb.append(new String(part, StandardCharsets.UTF_8)); + } + return sb.toString(); + } + + private static List extractDataValues(String responseJson) { + Map payload = Json.toGenericMap(responseJson); + Object dataObj = payload.get("data"); + assertNotNull(dataObj, "Expected response to contain data array"); + + List raw = (List) dataObj; + List values = new ArrayList<>(raw.size()); + for (Object value : raw) { + values.add(value == null ? null : String.valueOf(value)); + } + return values; + } + + private static List generateValues(int size, long seed) { + SplittableRandom random = new SplittableRandom(seed); + List values = new ArrayList<>(size); + final String alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + + for (int i = 0; i < size; i++) { + int len = 10 + random.nextInt(11); + StringBuilder sb = new StringBuilder(len); + for (int c = 0; c < len; c++) { + sb.append(alphabet.charAt(random.nextInt(alphabet.length()))); + } + values.add(sb.toString()); + } + + return values; + } + + private static EncryptedKeysetWrapper createWrappedDaeadKeyset(Aead masterAead, URI kekUri) throws Exception { + KeysetHandle dataKeyset = KeysetHandle.generateNew(KeyTemplates.get("AES256_SIV")); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + dataKeyset.write(JsonKeysetWriter.withOutputStream(baos), masterAead); + + EncryptedKeysetWrapper wrapper = Json.toObject( + EncryptedKeysetWrapper.class, + baos.toString(StandardCharsets.UTF_8) + ); + wrapper.setKekUri(kekUri); + return wrapper; + } + + private record BenchmarkResult( + String name, + int itemCount, + double minMs, + double maxMs, + double avgMs, + double p50Ms, + double p95Ms, + double throughputPerSec + ) { + private static BenchmarkResult of(String name, int itemCount, List elapsedMillis) { + List sorted = new ArrayList<>(elapsedMillis); + Collections.sort(sorted); + + double min = sorted.get(0); + double max = sorted.get(sorted.size() - 1); + double sum = sorted.stream().mapToDouble(Double::doubleValue).sum(); + double avg = sum / sorted.size(); + double p50 = percentile(sorted, 0.50); + double p95 = percentile(sorted, 0.95); + double throughput = (itemCount * 1000d) / avg; + + return new BenchmarkResult(name, itemCount, min, max, avg, p50, p95, throughput); + } + + private static double percentile(List sorted, double p) { + if (sorted.size() == 1) { + return sorted.get(0); + } + int idx = (int) Math.ceil(p * sorted.size()) - 1; + idx = Math.max(0, Math.min(idx, sorted.size() - 1)); + return sorted.get(idx); + } + + private Map toMap() { + Map map = new LinkedHashMap<>(); + map.put("name", name); + map.put("items", itemCount); + map.put("minMs", minMs); + map.put("maxMs", maxMs); + map.put("avgMs", avgMs); + map.put("p50Ms", p50Ms); + map.put("p95Ms", p95Ms); + map.put("throughputItemsPerSec", throughputPerSec); + return map; + } + } +} + +``` + + +#### Short summary: + +empty definition using pc, found symbol in pc: \ No newline at end of file diff --git a/.nais/test/nais.yaml b/.nais/test/nais.yaml index d276ba66..e9068b22 100644 --- a/.nais/test/nais.yaml +++ b/.nais/test/nais.yaml @@ -1,7 +1,7 @@ apiVersion: nais.io/v1alpha1 kind: Application metadata: - name: pseudo-service + name: pseudo-service-develop namespace: {{team}} labels: team: {{team}} @@ -14,8 +14,8 @@ spec: port: 10210 terminationGracePeriodSeconds: 180 replicas: - max: 5 - min: 2 + max: 1 + min: 1 resources: requests: cpu: 100m @@ -24,8 +24,8 @@ spec: memory: 6Gi ingresses: - - https://pseudo-service.intern.test.ssb.no - - https://pseudo-service.test.ssb.no + - https://pseudo-service-develop.intern.test.ssb.no + - https://pseudo-service-develop.test.ssb.no accessPolicy: outbound: diff --git a/.tools/async-profiler-4.3-macos.zip b/.tools/async-profiler-4.3-macos.zip new file mode 100644 index 00000000..77f5428d Binary files /dev/null and b/.tools/async-profiler-4.3-macos.zip differ diff --git a/.tools/async-profiler-4.3-macos/LICENSE b/.tools/async-profiler-4.3-macos/LICENSE new file mode 100644 index 00000000..8dada3ed --- /dev/null +++ b/.tools/async-profiler-4.3-macos/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/.tools/async-profiler-4.3-macos/README.md b/.tools/async-profiler-4.3-macos/README.md new file mode 100644 index 00000000..b3c41131 --- /dev/null +++ b/.tools/async-profiler-4.3-macos/README.md @@ -0,0 +1,117 @@ +# Async-profiler + +This project is a low overhead sampling profiler for Java +that does not suffer from the [Safepoint bias problem](http://psy-lob-saw.blogspot.ru/2016/02/why-most-sampling-java-profilers-are.html). +It features HotSpot-specific API to collect stack traces +and to track memory allocations. The profiler works with +OpenJDK and other Java runtimes based on the HotSpot JVM. + +Unlike traditional Java profilers, async-profiler monitors non-Java threads +(e.g., GC and JIT compiler threads) and shows native and kernel frames in stack traces. + +What can be profiled: + +- CPU time +- Allocations in Java Heap +- Native memory allocations and leaks +- Contended locks +- Hardware and software performance counters like cache misses, page faults, context switches +- and [more](docs/ProfilingModes.md). + +See our [3 hours playlist](https://www.youtube.com/playlist?list=PLNCLTEx3B8h4Yo_WvKWdLvI9mj1XpTKBr) +to learn about more features. + +# Download + +### Stable release: [4.2.1](https://github.com/async-profiler/async-profiler/releases/tag/v4.2.1) + +- Linux x64: [async-profiler-4.2.1-linux-x64.tar.gz](https://github.com/async-profiler/async-profiler/releases/download/v4.2.1/async-profiler-4.2.1-linux-x64.tar.gz) +- Linux arm64: [async-profiler-4.2.1-linux-arm64.tar.gz](https://github.com/async-profiler/async-profiler/releases/download/v4.2.1/async-profiler-4.2.1-linux-arm64.tar.gz) +- macOS arm64/x64: [async-profiler-4.2.1-macos.zip](https://github.com/async-profiler/async-profiler/releases/download/v4.2.1/async-profiler-4.2.1-macos.zip) +- Profile converters: [jfr-converter.jar](https://github.com/async-profiler/async-profiler/releases/download/v4.2.1/jfr-converter.jar) + +### Nightly builds + +[The most recent binaries](https://github.com/async-profiler/async-profiler/releases/tag/nightly) corresponding +to the latest successful commit in `master`. + +For a build corresponding to one of the previous commits, go to +[Nightly Builds](https://github.com/async-profiler/async-profiler/actions/workflows/test-and-publish-nightly.yml), +click the desired build and scroll down to the artifacts section. These binaries are kept for 30 days. + +# Quick start + +In a typical use case, profiling a Java application is just a matter of a running `asprof` with a PID of a +running Java process. + +``` +$ asprof -d 30 -f flamegraph.html +``` + +The above command translates to: run profiler for 30 seconds and save results to `flamegraph.html` +as an interactive [Flame Graph](docs/FlamegraphInterpretation.md) that can be viewed in a browser. + +[![FlameGraph](/.assets/images/flamegraph.png)](https://htmlpreview.github.io/?https://github.com/async-profiler/async-profiler/blob/master/.assets/html/flamegraph.html) + +Find more details in the [Getting started guide](docs/GettingStarted.md). + +# Building + +### Build status + +[![Build Status](https://github.com/async-profiler/async-profiler/actions/workflows/test-and-publish-nightly.yml/badge.svg?branch=master)](https://github.com/async-profiler/async-profiler/actions/workflows/test-and-publish-nightly.yml) + +### Minimum requirements + +- make +- GCC 7.5.0+ or Clang 7.0.0+ +- Static version of libstdc++ (e.g. on Amazon Linux 2023: `yum install libstdc++-static`) +- JDK 11+ + +### How to build + +Make sure `gcc`, `g++` and `java` are available on the `PATH`. +Navigate to the root directory with async-profiler sources and run `make`. +async-profiler launcher will be available at `build/bin/asprof`. + +Other Makefile targets: + +- `make test` - run unit and integration tests; +- `make release` - package async-profiler binaries as `.tar.gz` (Linux) or `.zip` (macOS). + +### Supported platforms + +| | Officially maintained builds | Other available ports | +| --------- | ---------------------------- | ----------------------------------------- | +| **Linux** | x64, arm64 | x86, arm32, ppc64le, riscv64, loongarch64 | +| **macOS** | x64, arm64 | | + +# Documentation + +## Basic usage + +- [Getting Started](docs/GettingStarted.md) +- [Profiler Options](docs/ProfilerOptions.md) +- [Profiling Modes](docs/ProfilingModes.md) +- [Integrating async-profiler](docs/IntegratingAsyncProfiler.md) +- [Profiling In Container](docs/ProfilingInContainer.md) + +## Profiler output + +- [Output Formats](docs/OutputFormats.md) +- [FlameGraph Interpretation](docs/FlamegraphInterpretation.md) +- [JFR Visualization](docs/JfrVisualization.md) +- [Converter Usage](docs/ConverterUsage.md) +- [Heatmap](docs/Heatmap.md) + +## Advanced usage + +- [CPU Sampling Engines](docs/CpuSamplingEngines.md) +- [Stack Walking Modes](docs/StackWalkingModes.md) +- [Advanced Stacktrace Features](docs/AdvancedStacktraceFeatures.md) +- [Profiling Non-Java Applications](docs/ProfilingNonJavaApplications.md) + +## Troubleshooting + +For known issues faced while running async-profiler and their detailed troubleshooting, +please refer [here](docs/Troubleshooting.md). diff --git a/.tools/async-profiler-4.3-macos/bin/asprof b/.tools/async-profiler-4.3-macos/bin/asprof new file mode 100755 index 00000000..d7b87f20 Binary files /dev/null and b/.tools/async-profiler-4.3-macos/bin/asprof differ diff --git a/.tools/async-profiler-4.3-macos/bin/jfrconv b/.tools/async-profiler-4.3-macos/bin/jfrconv new file mode 100755 index 00000000..9491cd38 Binary files /dev/null and b/.tools/async-profiler-4.3-macos/bin/jfrconv differ diff --git a/.tools/async-profiler-4.3-macos/include/asprof.h b/.tools/async-profiler-4.3-macos/include/asprof.h new file mode 100644 index 00000000..3f6cbfdc --- /dev/null +++ b/.tools/async-profiler-4.3-macos/include/asprof.h @@ -0,0 +1,106 @@ +/* + * Copyright The async-profiler authors + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef _ASPROF_H +#define _ASPROF_H + +#include +#include + +#ifdef __clang__ +# define DLLEXPORT __attribute__((visibility("default"))) +#else +# define DLLEXPORT __attribute__((visibility("default"),externally_visible)) +#endif + +#define WEAK __attribute__((weak)) + +#ifdef __cplusplus +extern "C" { +#endif + + +typedef const char* asprof_error_t; +typedef void (*asprof_writer_t)(const char* buf, size_t size); + +// Should be called once prior to any other API functions +DLLEXPORT void asprof_init(); +typedef void (*asprof_init_t)(); + +// Returns an error message for the given error code or NULL if there is no error +DLLEXPORT const char* asprof_error_str(asprof_error_t err); +typedef const char* (*asprof_error_str_t)(asprof_error_t err); + +// Executes async-profiler command using output_callback as an optional sink +// for the profiler output. Returning an error code or NULL on success. +DLLEXPORT asprof_error_t asprof_execute(const char* command, asprof_writer_t output_callback); +typedef asprof_error_t (*asprof_execute_t)(const char* command, asprof_writer_t output_callback); + +// This API is UNSTABLE and might change or be removed in the next version of async-profiler. +typedef struct { + // A thread-local sample counter, which increments (not necessarily by 1) every time a + // stack profiling sample is taken using a profiling signal. + // + // The counter might be initialized lazily, only starting counting from 0 the first time + // `asprof_get_thread_local_data` is called on a given thread. Further calls to + // `asprof_get_thread_local_data` on a given thread will of course not reset the counter. + volatile uint64_t sample_counter; +} asprof_thread_local_data; + +// This API is UNSTABLE and might change or be removed in the next version of async-profiler. +// +// Gets a pointer to asprof's thread-local data structure, see `asprof_thread_local_data`'s +// documentation for the details of each field. This function might lazily initialize that +// structure. +// +// This function can return NULL either if the profiler is not yet initializer, or in +// case of an allocation failure. +// +// This function is *not* async-signal-safe. However, it is safe to call concurrently +// with async-profiler operations, including initialization. +DLLEXPORT asprof_thread_local_data* asprof_get_thread_local_data(void); +typedef asprof_thread_local_data* (*asprof_get_thread_local_data_t)(void); + + +typedef int asprof_jfr_event_key; + +// This API is UNSTABLE and might change or be removed in the next version of async-profiler. +// +// Return a asprof_jfr_event_key identifier for a user-defined JFR key. +// That identifier can then be used in `asprof_emit_jfr_event` +// +// The name is required to be valid (since it's a C string, NUL-free) UTF-8. +// +// Returns -1 on failure. +DLLEXPORT asprof_jfr_event_key asprof_register_jfr_event(const char* name); +typedef asprof_jfr_event_key (*asprof_register_jfr_event_t)(const char* name); + + +#define ASPROF_MAX_JFR_EVENT_LENGTH 2048 + +// This API is UNSTABLE and might change or be removed in the next version of async-profiler. +// +// Emits a custom, user-defined JFR event. The key should be created via `asprof_register_jfr_event`. +// The data can be arbitrary binary data, with size <= ASPROF_MAX_JFR_EVENT_LENGTH. +// +// User-defined events are included in the JFR under a `profiler.UserEvent` event type. That type will contain +// (at least) the following fields: +// 1. `startTime` [Long] - the emitted event's time in ticks. +// 2. `eventThread` [java.lang.Thread] - the thread that emitted the events. +// 3. `type` [profiler.types.UserEventType] - the event's type, +// where `profiler.types.UserEventType` is an indexed string from the JFR constant pool. +// 4. `data` [String] - the event data. This is the Latin-1 encoded version of the inputted data. +// The Latin-1 encoding is used as a way to stuff the arbitrary byte input into something +// that JFR supports (JFR technically supports byte arrays, but `jfr print` doesn't). +// +// Returns an error code or NULL on success. +DLLEXPORT asprof_error_t asprof_emit_jfr_event(asprof_jfr_event_key type, const uint8_t* data, size_t len); +typedef asprof_error_t (*asprof_emit_jfr_event_t)(asprof_jfr_event_key type, const uint8_t* data, size_t len); + +#ifdef __cplusplus +} +#endif + +#endif // _ASPROF_H diff --git a/.tools/async-profiler-4.3-macos/lib/libasyncProfiler.dylib b/.tools/async-profiler-4.3-macos/lib/libasyncProfiler.dylib new file mode 100644 index 00000000..a0fe4630 Binary files /dev/null and b/.tools/async-profiler-4.3-macos/lib/libasyncProfiler.dylib differ diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..e72490fb --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "files.watcherExclude": { + "**/target": true + } +} \ No newline at end of file diff --git a/doc/performance-profiling.md b/doc/performance-profiling.md new file mode 100644 index 00000000..07cd0b70 --- /dev/null +++ b/doc/performance-profiling.md @@ -0,0 +1,50 @@ +# Profiling pseudonymization hot path + +This project includes two profiler helpers for the DAEAD field benchmark: + +- `scripts/perf/run-jfr-benchmark.sh` +- `scripts/perf/run-async-profiler.sh` + +## 1) JFR (built into JDK) + +Run: + +```bash +./scripts/perf/run-jfr-benchmark.sh +``` + +Output: + +- `target/performance/pseudo-service-benchmark.jfr` + +Open this recording in Java Mission Control. + +## 2) async-profiler flamegraph + +Run wall-clock profile: + +```bash +./scripts/perf/run-async-profiler.sh wall 15 +``` + +Run CPU profile: + +```bash +./scripts/perf/run-async-profiler.sh cpu 15 +``` + +Output: + +- `target/performance/flamegraph-.html` +- `target/performance/flamegraph-.collapsed` +- `target/performance/benchmark-under-profiler.log` + +The script auto-downloads async-profiler into `.tools/` on first run. + +## 3) Quick collapsed-stack analysis + +```bash +./scripts/perf/analyze-collapsed.py target/performance/flamegraph-cpu.collapsed +``` + +This prints top sampled frames and top frames under service/core/tink packages. diff --git a/pom.xml b/pom.xml index 9b242c27..a40d6e02 100644 --- a/pom.xml +++ b/pom.xml @@ -180,7 +180,7 @@ no.ssb.dapla.dlp.pseudo dapla-dlp-pseudo-core - ${dapla-dlp-pseudo-core.version} + 2.0.9 jakarta.inject diff --git a/scripts/perf/analyze-collapsed.py b/scripts/perf/analyze-collapsed.py new file mode 100755 index 00000000..58602373 --- /dev/null +++ b/scripts/perf/analyze-collapsed.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import sys +from collections import defaultdict +from pathlib import Path + + +def main() -> int: + if len(sys.argv) != 2: + print("Usage: analyze-collapsed.py ") + return 2 + + path = Path(sys.argv[1]) + if not path.exists(): + print(f"File not found: {path}") + return 2 + + total = 0 + top_frame = defaultdict(int) + top_service_frame = defaultdict(int) + top_core_frame = defaultdict(int) + top_tink_frame = defaultdict(int) + + with path.open("r", encoding="utf-8", errors="ignore") as fh: + for line in fh: + line = line.strip() + if not line: + continue + try: + stack, value = line.rsplit(" ", 1) + samples = int(value) + except ValueError: + continue + frames = stack.split(";") + if not frames: + continue + + total += samples + leaf = frames[-1] + top_frame[leaf] += samples + + for frame in reversed(frames): + if "no/ssb/dlp/pseudo/service" in frame: + top_service_frame[frame] += samples + break + for frame in reversed(frames): + if "no/ssb/dlp/pseudo/core" in frame: + top_core_frame[frame] += samples + break + for frame in reversed(frames): + if "com/google/crypto/tink" in frame or "no/ssb/crypto/tink" in frame: + top_tink_frame[frame] += samples + break + + if total == 0: + print("No samples found") + return 1 + + def print_top(title: str, data: dict[str, int], limit: int = 12) -> None: + print(f"\n{title}") + for frame, count in sorted(data.items(), key=lambda x: x[1], reverse=True)[:limit]: + pct = 100.0 * count / total + print(f" {pct:6.2f}% {frame}") + + print(f"Total samples: {total}") + print_top("Top leaf frames", top_frame) + print_top("Top service frames", top_service_frame) + print_top("Top core frames", top_core_frame) + print_top("Top tink frames", top_tink_frame) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/perf/run-async-profiler.sh b/scripts/perf/run-async-profiler.sh new file mode 100755 index 00000000..ec14dfbc --- /dev/null +++ b/scripts/perf/run-async-profiler.sh @@ -0,0 +1,68 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "$0")/../.." && pwd)" +TOOLS_DIR="$ROOT_DIR/.tools" +OUT_DIR="$ROOT_DIR/target/performance" +ASYNC_PROFILER_VERSION="4.3" +ASYNC_PROFILER_ZIP="async-profiler-${ASYNC_PROFILER_VERSION}-macos.zip" +ASYNC_PROFILER_URL="https://github.com/async-profiler/async-profiler/releases/download/v${ASYNC_PROFILER_VERSION}/${ASYNC_PROFILER_ZIP}" +ASYNC_PROFILER_DIR="$TOOLS_DIR/async-profiler-${ASYNC_PROFILER_VERSION}-macos" +ASPROF_BIN="$ASYNC_PROFILER_DIR/bin/asprof" + +EVENT="${1:-wall}" +DURATION_SECONDS="${2:-20}" + +mkdir -p "$TOOLS_DIR" "$OUT_DIR" + +if [[ ! -x "$ASPROF_BIN" ]]; then + echo "Installing async-profiler ${ASYNC_PROFILER_VERSION} into $TOOLS_DIR ..." + curl -L --fail "$ASYNC_PROFILER_URL" -o "$TOOLS_DIR/$ASYNC_PROFILER_ZIP" + rm -rf "$ASYNC_PROFILER_DIR" + unzip -q "$TOOLS_DIR/$ASYNC_PROFILER_ZIP" -d "$TOOLS_DIR" +fi + +SVG_FILE="$OUT_DIR/flamegraph-${EVENT}.html" +COLLAPSED_FILE="$OUT_DIR/flamegraph-${EVENT}.collapsed" + +echo "Starting benchmark JVM..." +"$ROOT_DIR/mvnw" \ + -Dtest=PseudoServiceDaeadPerformanceTest \ + -Dsurefire.forkCount=1 \ + -Dsurefire.reuseForks=false \ + -Dpseudo.performance.enabled=true \ + -Dpseudo.performance.batchSize=10000 \ + -Dpseudo.performance.warmupRounds=5 \ + -Dpseudo.performance.measureRounds=240 \ + test > "$OUT_DIR/benchmark-under-profiler.log" 2>&1 & + +MVN_PID=$! +trap 'kill "$MVN_PID" 2>/dev/null || true' EXIT + +echo "Waiting for surefire JVM PID..." +TARGET_PID="" +for _ in {1..120}; do + CHILD_PID="$(pgrep -P "$MVN_PID" | head -n1 || true)" + if [[ -n "$CHILD_PID" ]]; then + TARGET_PID="$(pgrep -P "$CHILD_PID" | head -n1 || true)" + fi + if [[ -n "$TARGET_PID" ]]; then + break + fi + sleep 0.5 +done + +if [[ -z "$TARGET_PID" ]]; then + echo "Could not find surefire JVM. See $OUT_DIR/benchmark-under-profiler.log" + exit 1 +fi + +echo "Profiling PID $TARGET_PID for ${DURATION_SECONDS}s ($EVENT) ..." +"$ASPROF_BIN" -d "$DURATION_SECONDS" -e "$EVENT" -f "$SVG_FILE" "$TARGET_PID" +"$ASPROF_BIN" -d "$DURATION_SECONDS" -e "$EVENT" -o collapsed -f "$COLLAPSED_FILE" "$TARGET_PID" + +wait "$MVN_PID" +trap - EXIT + +echo "Flamegraph written to: $SVG_FILE" +echo "Collapsed stacks written to: $COLLAPSED_FILE" diff --git a/scripts/perf/run-jfr-benchmark.sh b/scripts/perf/run-jfr-benchmark.sh new file mode 100755 index 00000000..af794f21 --- /dev/null +++ b/scripts/perf/run-jfr-benchmark.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "$0")/../.." && pwd)" +OUT_DIR="$ROOT_DIR/target/performance" +JFR_FILE="$OUT_DIR/pseudo-service-benchmark.jfr" + +mkdir -p "$OUT_DIR" + +echo "Running benchmark with JFR recording..." +MAVEN_OPTS="-XX:StartFlightRecording=filename=$JFR_FILE,dumponexit=true,settings=profile" \ + "$ROOT_DIR/mvnw" \ + -Dtest=PseudoServiceDaeadPerformanceTest \ + -Dpseudo.performance.enabled=true \ + -Dpseudo.performance.batchSize=10000 \ + -Dpseudo.performance.warmupRounds=5 \ + -Dpseudo.performance.measureRounds=30 \ + test + +echo "JFR written to: $JFR_FILE" diff --git a/src/main/java/no/ssb/dlp/pseudo/service/pseudo/PseudoController.java b/src/main/java/no/ssb/dlp/pseudo/service/pseudo/PseudoController.java index 2a7fc34b..10900039 100644 --- a/src/main/java/no/ssb/dlp/pseudo/service/pseudo/PseudoController.java +++ b/src/main/java/no/ssb/dlp/pseudo/service/pseudo/PseudoController.java @@ -84,7 +84,9 @@ public HttpResponse> pseudonymizeField( @Schema(implementation = PseudoFieldRequest.class) String request ) { PseudoFieldRequest req = Json.toObject(PseudoFieldRequest.class, request); - log.info(Strings.padEnd(String.format("*** Pseudonymize field: %s ", req.getName()), 80, '*')); + if (log.isDebugEnabled()) { + log.debug(Strings.padEnd(String.format("*** Pseudonymize field: %s ", req.getName()), 80, '*')); + } PseudoField pseudoField = new PseudoField(req.getName(), req.getPattern(), req.getPseudoFunc(), req.getKeyset()); try { final String correlationId = MDC.get("CorrelationID"); @@ -105,6 +107,38 @@ public HttpResponse> pseudonymizeField( } } + @Operation(summary = "Pseudonymize field fast", description = "Pseudonymize a single field with optimized path.") + @Produces(MediaType.APPLICATION_JSON) + @Post(value = "/pseudonymize/field-fast", consumes = MediaType.APPLICATION_JSON) + @ExecuteOn(TaskExecutors.BLOCKING) + public HttpResponse> pseudonymizeFieldFast( + @Schema(implementation = PseudoFieldRequest.class) String request, + @QueryValue(defaultValue = "false") boolean minimalMetricsMode + ) { + PseudoFieldRequest req = Json.toObject(PseudoFieldRequest.class, request); + if (log.isDebugEnabled()) { + log.debug(Strings.padEnd(String.format("*** Pseudonymize field fast: %s ", req.getName()), 80, '*')); + } + PseudoField pseudoField = new PseudoField(req.getName(), req.getPattern(), req.getPseudoFunc(), req.getKeyset()); + try { + final String correlationId = MDC.get("CorrelationID"); + + return HttpResponse.ok( + pseudoField.processFastPseudonymize( + pseudoConfigSplitter, + recordProcessorFactory, + req.values, + correlationId, + minimalMetricsMode + ) + .map(o -> o.getBytes(StandardCharsets.UTF_8)) + ) + .characterEncoding(StandardCharsets.UTF_8); + } catch (Exception e) { + return HttpResponse.serverError(Flowable.error(e)); + } + } + /** * Depseudonymizes a field. * @@ -120,7 +154,9 @@ public HttpResponse> depseudonymizeField( @Schema(implementation = DepseudoFieldRequest.class) String request ) { DepseudoFieldRequest req = Json.toObject(DepseudoFieldRequest.class, request); - log.info(Strings.padEnd(String.format("*** Depseudonymize field: %s ", req.getName()), 80, '*')); + if (log.isDebugEnabled()) { + log.debug(Strings.padEnd(String.format("*** Depseudonymize field: %s ", req.getName()), 80, '*')); + } PseudoField pseudoField = new PseudoField(req.getName(), req.getPattern(), req.getPseudoFunc(), req.getKeyset()); try { @@ -150,7 +186,9 @@ public HttpResponse> repseudonymizeField( @Schema(implementation = RepseudoFieldRequest.class) String request ) { RepseudoFieldRequest req = Json.toObject(RepseudoFieldRequest.class, request); - log.info(Strings.padEnd(String.format("*** Repseudonymize field: %s ", req.getName()), 80, '*')); + if (log.isDebugEnabled()) { + log.debug(Strings.padEnd(String.format("*** Repseudonymize field: %s ", req.getName()), 80, '*')); + } PseudoField sourcePseudoField = new PseudoField(req.getName(), req.getPattern(), req.getSourcePseudoFunc(), req.getSourceKeyset()); PseudoField targetPseudoField = new PseudoField(req.getName(), req.getPattern(), req.getTargetPseudoFunc(), req.getTargetKeyset()); try { diff --git a/src/main/java/no/ssb/dlp/pseudo/service/pseudo/PseudoField.java b/src/main/java/no/ssb/dlp/pseudo/service/pseudo/PseudoField.java index d712fde9..55756ec0 100644 --- a/src/main/java/no/ssb/dlp/pseudo/service/pseudo/PseudoField.java +++ b/src/main/java/no/ssb/dlp/pseudo/service/pseudo/PseudoField.java @@ -17,6 +17,7 @@ import no.ssb.dlp.pseudo.service.pseudo.metadata.PseudoMetadataProcessor; import java.util.List; +import java.util.ArrayList; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -106,17 +107,19 @@ public Flowable process(PseudoConfigSplitter pseudoConfigSplitter, Flowable result = preprocessor.andThen(Flowable.fromIterable(values.stream() .map(v -> mapOptional(v, recordMapProcessor, metadataProcessor)).toList() - )) + ) .map(v -> v.map(Json::from).orElse("null")) .doOnError(throwable -> { log.error("Response failed", throwable); recordMapProcessor.getMetadataProcessor().onErrorAll(throwable); }) .doOnComplete(() -> { - log.info("{} took {}", pseudoOperation, stopwatch.stop().elapsed()); + if (log.isDebugEnabled()) { + log.debug("{} took {}", pseudoOperation, stopwatch.stop().elapsed()); + } // Signal the metadataProcessor to stop collecting metadata recordMapProcessor.getMetadataProcessor().onCompleteAll(); - }); + })); return PseudoResponseSerializer.serialize(result, metadata, logs, metrics); } @@ -145,17 +148,19 @@ public Flowable process(RecordMapProcessorFactory recordProcessorFactory Flowable result = preprocessor.andThen(Flowable.fromIterable(values.stream() .map(v -> mapOptional(v, recordMapProcessor, metadataProcessor)).toList() - )) + ) .map(v -> v.map(Json::from).orElse("null")) .doOnError(throwable -> { log.error("Response failed", throwable); metadataProcessor.onErrorAll(throwable); }) .doOnComplete(() -> { - log.info("{} took {}", PseudoOperation.REPSEUDONYMIZE, stopwatch.stop().elapsed()); + if (log.isDebugEnabled()) { + log.debug("{} took {}", PseudoOperation.REPSEUDONYMIZE, stopwatch.stop().elapsed()); + } // Signal the metadataProcessor to stop collecting metadata metadataProcessor.onCompleteAll(); - }); + })); return PseudoResponseSerializer.serialize(result, metadata, logs, metrics); } @@ -171,13 +176,71 @@ private Optional mapOptional(String v, RecordMapProcessor values, RecordMapProcessor recordMapProcessor) { if (recordMapProcessor.hasPreprocessors()) { - return Completable.fromPublisher(Flowable.fromIterable(values.stream() - .filter(Objects::nonNull) - .map(v -> recordMapProcessor.init(Map.of(this.getName(), v))) - .toList()) + return Completable.fromPublisher( + Flowable.fromIterable(values.stream() + .filter(Objects::nonNull) + .toList()) + .map(v -> recordMapProcessor.init(Map.of(this.getName(), v))) ); } else { return Completable.complete(); } } -} \ No newline at end of file + + public Flowable processFastPseudonymize(PseudoConfigSplitter pseudoConfigSplitter, + RecordMapProcessorFactory recordProcessorFactory, + List values, + String correlationId, + boolean minimalMetricsMode) { + Stopwatch stopwatch = Stopwatch.createStarted(); + List pseudoConfigs = pseudoConfigSplitter.splitIfNecessary(this.getPseudoConfig()); + RecordMapProcessorFactory.SingleFieldProcessor processor = + recordProcessorFactory.newPseudonymizeSingleFieldProcessor( + pseudoConfigs, + this.getName(), + correlationId, + minimalMetricsMode + ); + + Completable preprocessor = Completable.fromAction(() -> { + for (String value : values) { + if (value != null) { + processor.init(value); + } + } + } + ); + + final PseudoMetadataProcessor metadataProcessor = processor.metadataProcessor(); + final Flowable metadata = minimalMetricsMode + ? Flowable.empty() + : Flowable.fromPublisher(metadataProcessor.getMetadata()); + final Flowable logs = minimalMetricsMode + ? Flowable.empty() + : Flowable.fromPublisher(metadataProcessor.getLogs()); + final Flowable metrics = minimalMetricsMode + ? Flowable.empty() + : Flowable.fromPublisher(metadataProcessor.getMetrics()); + + Flowable result = preprocessor.andThen(Flowable.defer(() -> { + List serialized = new ArrayList<>(values.size()); + for (String value : values) { + String transformed = processor.pseudonymize(value); + serialized.add(transformed == null ? "null" : Json.from(transformed)); + } + return Flowable.fromIterable(serialized); + })) + .doOnError(throwable -> { + log.error("Response failed", throwable); + metadataProcessor.onErrorAll(throwable); + }) + .doOnComplete(() -> { + if (log.isDebugEnabled()) { + log.debug("{} took {}", PseudoOperation.PSEUDONYMIZE, stopwatch.stop().elapsed()); + } + metadataProcessor.onCompleteAll(); + }); + + return PseudoResponseSerializer.serialize(result, metadata, logs, metrics); + } +} diff --git a/src/main/java/no/ssb/dlp/pseudo/service/pseudo/RecordMapProcessorFactory.java b/src/main/java/no/ssb/dlp/pseudo/service/pseudo/RecordMapProcessorFactory.java index 8b87b69a..69031333 100644 --- a/src/main/java/no/ssb/dlp/pseudo/service/pseudo/RecordMapProcessorFactory.java +++ b/src/main/java/no/ssb/dlp/pseudo/service/pseudo/RecordMapProcessorFactory.java @@ -1,5 +1,7 @@ package no.ssb.dlp.pseudo.service.pseudo; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.crypto.tink.Aead; import jakarta.inject.Singleton; @@ -7,6 +9,7 @@ import lombok.extern.slf4j.Slf4j; import no.ssb.dapla.dlp.pseudo.func.PseudoFuncInput; import no.ssb.dapla.dlp.pseudo.func.PseudoFuncOutput; +import no.ssb.dapla.dlp.pseudo.func.PseudoFunc; import no.ssb.dapla.dlp.pseudo.func.TransformDirection; import no.ssb.dapla.dlp.pseudo.func.fpe.FpeFunc; import no.ssb.dapla.dlp.pseudo.func.map.MapFailureStrategy; @@ -16,7 +19,6 @@ import no.ssb.dlp.pseudo.core.PseudoException; import no.ssb.dlp.pseudo.core.PseudoKeyset; import no.ssb.dlp.pseudo.core.PseudoOperation; -import no.ssb.dlp.pseudo.core.PseudoSecret; import no.ssb.dlp.pseudo.core.field.FieldDescriptor; import no.ssb.dlp.pseudo.core.field.ValueInterceptorChain; import no.ssb.dlp.pseudo.core.func.PseudoFuncDeclaration; @@ -31,9 +33,12 @@ import no.ssb.dlp.pseudo.service.pseudo.metadata.PseudoMetadataProcessor; import java.util.Collection; +import java.util.HashSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import static no.ssb.dlp.pseudo.core.PseudoOperation.DEPSEUDONYMIZE; import static no.ssb.dlp.pseudo.core.PseudoOperation.PSEUDONYMIZE; @@ -46,20 +51,44 @@ public class RecordMapProcessorFactory { private final PseudoSecrets pseudoSecrets; private final LoadingCache aeadCache; + private final Cache pseudoFuncsCache = Caffeine.newBuilder() + .maximumSize(1000) + .build(); + + public SingleFieldProcessor newPseudonymizeSingleFieldProcessor(List pseudoConfigs, + String fieldName, + String correlationId, + boolean minimalMetricsMode) { + PseudoMetadataProcessor metadataProcessor = new PseudoMetadataProcessor(correlationId); + Set metadataAdded = new HashSet<>(); + FieldDescriptor fieldDescriptor = FieldDescriptor.from(fieldName); + + List contexts = pseudoConfigs.stream().map(config -> new PseudoFuncContext( + newPseudoFuncs(config.getRules(), pseudoKeysetsOf(config.getKeysets())), + pseudoFuncDeclarationsOf(config.getRules()), + new HashMap<>()) + ).toList(); + + return new SingleFieldProcessor(fieldDescriptor, contexts, metadataProcessor, metadataAdded, minimalMetricsMode); + } public RecordMapProcessor newPseudonymizeRecordProcessor(List pseudoConfigs, String correlationId) { ValueInterceptorChain chain = new ValueInterceptorChain(); PseudoMetadataProcessor metadataProcessor = new PseudoMetadataProcessor(correlationId); + Set metadataAdded = new HashSet<>(); for (PseudoConfig config : pseudoConfigs) { - List secrets = pseudoSecrets.resolve(); + Map> fieldMatchCache = new HashMap<>(); for (PseudoKeyset keyset : config.getKeysets()) { - log.info(keyset.getKekUri().toString()); + if (log.isDebugEnabled()) { + log.debug("Using keyset KEK URI: {}", keyset.getKekUri()); + } } final PseudoFuncs fieldPseudonymizer = newPseudoFuncs(config.getRules(), pseudoKeysetsOf(config.getKeysets())); - chain.preprocessor((f, v) -> init(fieldPseudonymizer, TransformDirection.APPLY, f, v)); - chain.register((f, v) -> process(PSEUDONYMIZE, fieldPseudonymizer, f, v, metadataProcessor)); + final Map funcDeclarations = pseudoFuncDeclarationsOf(config.getRules()); + chain.preprocessor((f, v) -> init(fieldPseudonymizer, fieldMatchCache, TransformDirection.APPLY, f, v)); + chain.register((f, v) -> process(PSEUDONYMIZE, fieldPseudonymizer, fieldMatchCache, funcDeclarations, metadataAdded, f, v, metadataProcessor)); } return new RecordMapProcessor<>(chain, metadataProcessor); } @@ -69,10 +98,12 @@ public RecordMapProcessor newDepseudonymizeRecordProces PseudoMetadataProcessor metadataProcessor = new PseudoMetadataProcessor(correlationId); for (PseudoConfig config : pseudoConfigs) { + Map> fieldMatchCache = new HashMap<>(); final PseudoFuncs fieldDepseudonymizer = newPseudoFuncs(config.getRules(), pseudoKeysetsOf(config.getKeysets())); - chain.preprocessor((f, v) -> init(fieldDepseudonymizer, TransformDirection.RESTORE, f, v)); - chain.register((f, v) -> process(DEPSEUDONYMIZE, fieldDepseudonymizer, f, v, metadataProcessor)); + final Map funcDeclarations = pseudoFuncDeclarationsOf(config.getRules()); + chain.preprocessor((f, v) -> init(fieldDepseudonymizer, fieldMatchCache, TransformDirection.RESTORE, f, v)); + chain.register((f, v) -> process(DEPSEUDONYMIZE, fieldDepseudonymizer, fieldMatchCache, funcDeclarations, null, f, v, metadataProcessor)); } return new RecordMapProcessor<>(chain, metadataProcessor); @@ -82,25 +113,62 @@ public RecordMapProcessor newRepseudonymizeRecordProces PseudoConfig targetPseudoConfig, String correlationId) { final PseudoFuncs fieldDepseudonymizer = newPseudoFuncs(sourcePseudoConfig.getRules(), pseudoKeysetsOf(sourcePseudoConfig.getKeysets())); + final Map sourceDeclarations = pseudoFuncDeclarationsOf(sourcePseudoConfig.getRules()); final PseudoFuncs fieldPseudonymizer = newPseudoFuncs(targetPseudoConfig.getRules(), pseudoKeysetsOf(targetPseudoConfig.getKeysets())); + final Map targetDeclarations = pseudoFuncDeclarationsOf(targetPseudoConfig.getRules()); PseudoMetadataProcessor metadataProcessor = new PseudoMetadataProcessor(correlationId); + Set metadataAdded = new HashSet<>(); + Map> sourceFieldMatchCache = new HashMap<>(); + Map> targetFieldMatchCache = new HashMap<>(); return new RecordMapProcessor<>( new ValueInterceptorChain() - .preprocessor((f, v) -> init(fieldDepseudonymizer, TransformDirection.RESTORE, f, v)) - .register((f, v) -> process(DEPSEUDONYMIZE, fieldDepseudonymizer, f, v, metadataProcessor)) - .register((f, v) -> process(PSEUDONYMIZE, fieldPseudonymizer, f, v, metadataProcessor)), + .preprocessor((f, v) -> init(fieldDepseudonymizer, sourceFieldMatchCache, TransformDirection.RESTORE, f, v)) + .register((f, v) -> process(DEPSEUDONYMIZE, fieldDepseudonymizer, sourceFieldMatchCache, sourceDeclarations, null, f, v, metadataProcessor)) + .register((f, v) -> process(PSEUDONYMIZE, fieldPseudonymizer, targetFieldMatchCache, targetDeclarations, metadataAdded, f, v, metadataProcessor)), metadataProcessor); } protected PseudoFuncs newPseudoFuncs(Collection rules, Collection keysets) { - return new PseudoFuncs(rules, pseudoSecrets.resolve(), keysets, aeadCache); + if (containsStatefulFunc(rules)) { + return new PseudoFuncs(rules, pseudoSecrets.resolve(), keysets, aeadCache); + } + + PseudoFuncsCacheKey cacheKey = new PseudoFuncsCacheKey( + rules.stream().map(PseudoFuncRule::getFunc).toList(), + keysets.stream().map(this::pseudoKeysetSignature).toList() + ); + + return pseudoFuncsCache.get(cacheKey, + key -> new PseudoFuncs(rules, pseudoSecrets.resolve(), keysets, aeadCache)); + } + + private static boolean containsStatefulFunc(Collection rules) { + return rules.stream() + .map(PseudoFuncRule::getFunc) + .map(PseudoFuncDeclaration::fromString) + .map(PseudoFuncDeclaration::getFuncName) + .anyMatch(funcName -> funcName.equals(PseudoFuncNames.MAP_SID) + || funcName.equals(PseudoFuncNames.MAP_SID_FF31) + || funcName.equals(PseudoFuncNames.MAP_SID_DAEAD)); } - private String init(PseudoFuncs pseudoFuncs, TransformDirection direction, FieldDescriptor field, String varValue) { + private String pseudoKeysetSignature(PseudoKeyset keyset) { + return String.join("|", + String.valueOf(keyset.primaryKeyId()), + String.valueOf(keyset.getKekUri()), + String.valueOf(keyset.toJson()) + ); + } + + private String init(PseudoFuncs pseudoFuncs, + Map> fieldMatchCache, + TransformDirection direction, + FieldDescriptor field, + String varValue) { if (varValue != null) { - pseudoFuncs.findPseudoFunc(field).ifPresent(pseudoFunc -> + findPseudoFunc(pseudoFuncs, fieldMatchCache, field).ifPresent(pseudoFunc -> pseudoFunc.getFunc().init(PseudoFuncInput.of(varValue), direction)); } return varValue; @@ -108,10 +176,13 @@ private String init(PseudoFuncs pseudoFuncs, TransformDirection direction, Field private String process(PseudoOperation operation, PseudoFuncs func, + Map> fieldMatchCache, + Map funcDeclarations, + Set metadataAdded, FieldDescriptor field, String varValue, PseudoMetadataProcessor metadataProcessor) { - PseudoFuncRuleMatch match = func.findPseudoFunc(field).orElse(null); + PseudoFuncRuleMatch match = findPseudoFunc(func, fieldMatchCache, field).orElse(null); if (match == null) { return varValue; @@ -124,7 +195,7 @@ private String process(PseudoOperation operation, return varValue; } try { - PseudoFuncDeclaration funcDeclaration = PseudoFuncDeclaration.fromString(match.getRule().getFunc()); + PseudoFuncDeclaration funcDeclaration = funcDeclarations.get(match.getRule()); // FPE requires minimum two bytes/chars to perform encryption and minimum four bytes in case of Unicode. if (varValue.length() < 4 && ( @@ -153,15 +224,19 @@ private String process(PseudoOperation operation, } else if (isSidMapping) { metadataProcessor.addMetric(FieldMetric.MAPPED_SID); } - metadataProcessor.addMetadata(FieldMetadata.builder() - .shortName(field.getName()) - .dataElementPath(normalizePath(field.getPath())) // Skip leading slash and use dot as separator - .encryptionKeyReference(funcDeclaration.getArgs().getOrDefault(KEY_REFERENCE, null)) - .encryptionAlgorithm(match.getFunc().getAlgorithm()) - .stableIdentifierVersion(sidSnapshotDate) - .stableIdentifierType(isSidMapping) - .encryptionAlgorithmParameters(funcDeclaration.getArgs()) - .build()); + String path = normalizePath(field.getPath()); + String metadataKey = path + "|" + match.getRule().getFunc() + "|" + sidSnapshotDate; + if (metadataAdded == null || metadataAdded.add(metadataKey)) { + metadataProcessor.addMetadata(FieldMetadata.builder() + .shortName(field.getName()) + .dataElementPath(path) + .encryptionKeyReference(funcDeclaration.getArgs().getOrDefault(KEY_REFERENCE, null)) + .encryptionAlgorithm(match.getFunc().getAlgorithm()) + .stableIdentifierVersion(sidSnapshotDate) + .stableIdentifierType(isSidMapping) + .encryptionAlgorithmParameters(funcDeclaration.getArgs()) + .build()); + } return mappedValue; } else if (operation == DEPSEUDONYMIZE) { @@ -201,6 +276,155 @@ protected static List pseudoKeysetsOf(List .toList(); } + private static Map pseudoFuncDeclarationsOf(Collection rules) { + return rules.stream().collect(java.util.stream.Collectors.toMap( + rule -> rule, + rule -> PseudoFuncDeclaration.fromString(rule.getFunc()) + )); + } + + private static Optional findPseudoFunc(PseudoFuncs pseudoFuncs, + Map> fieldMatchCache, + FieldDescriptor field) { + return fieldMatchCache.computeIfAbsent(field.getPath(), p -> pseudoFuncs.findPseudoFunc(field)); + } + + private record PseudoFuncsCacheKey(List funcs, List keysetSignatures) {} + + private record PseudoFuncContext(PseudoFuncs funcs, + Map declarations, + Map> fieldMatchCache) {} + + public class SingleFieldProcessor { + private final FieldDescriptor fieldDescriptor; + private final List steps; + private final PseudoMetadataProcessor metadataProcessor; + private final Set metadataAdded; + private final String normalizedPath; + private final boolean minimalMetricsMode; + + private SingleFieldProcessor(FieldDescriptor fieldDescriptor, + List contexts, + PseudoMetadataProcessor metadataProcessor, + Set metadataAdded, + boolean minimalMetricsMode) { + this.fieldDescriptor = fieldDescriptor; + this.steps = contexts.stream() + .map(context -> findPseudoFunc(context.funcs, context.fieldMatchCache, fieldDescriptor) + .map(match -> new SingleFieldStep(match, context.declarations.get(match.getRule()))) + .orElse(null)) + .filter(java.util.Objects::nonNull) + .toList(); + this.metadataProcessor = metadataProcessor; + this.metadataAdded = metadataAdded; + this.normalizedPath = normalizePath(fieldDescriptor.getPath()); + this.minimalMetricsMode = minimalMetricsMode; + } + + public String init(String varValue) { + String current = varValue; + for (SingleFieldStep step : steps) { + if (current != null) { + step.func.init(PseudoFuncInput.of(current), TransformDirection.APPLY); + } + } + return current; + } + + public String pseudonymize(String varValue) { + if (varValue == null) { + if (!minimalMetricsMode) { + metadataProcessor.addMetric(FieldMetric.NULL_VALUE); + } + return null; + } + String current = varValue; + for (SingleFieldStep step : steps) { + if (current == null) { + if (!minimalMetricsMode && !(step.func instanceof MapFunc)) { + metadataProcessor.addMetric(FieldMetric.NULL_VALUE); + } + continue; + } + + if (current.length() < 4 && step.fpeLimited) { + if (!minimalMetricsMode) { + metadataProcessor.addMetric(FieldMetric.FPE_LIMITATION); + } + current = step.mapFailureStrategy == MapFailureStrategy.RETURN_ORIGINAL ? current : null; + continue; + } + + PseudoFuncOutput output = step.func.apply(PseudoFuncInput.of(current)); + + if (!minimalMetricsMode) { + output.getWarnings().forEach(metadataProcessor::addLog); + } + String sidSnapshotDate = output.getMetadata().getOrDefault(MapFuncConfig.Param.SNAPSHOT_DATE, null); + String mapFailureMetadata = output.getMetadata().getOrDefault(MAP_FAILURE_METADATA, null); + current = output.getValue(); + + if (!minimalMetricsMode) { + if (step.isSidMapping && mapFailureMetadata != null) { + metadataProcessor.addMetric(FieldMetric.MISSING_SID); + } else if (step.isSidMapping) { + metadataProcessor.addMetric(FieldMetric.MAPPED_SID); + } + + if (step.isSidMapping) { + addMetadata(step, sidSnapshotDate); + } else if (!step.metadataAddedOnce) { + addMetadata(step, null); + step.metadataAddedOnce = true; + } + } + } + return current; + } + + private void addMetadata(SingleFieldStep step, String sidSnapshotDate) { + String metadataKey = normalizedPath + "|" + step.declaration.getFuncName() + "|" + sidSnapshotDate; + if (metadataAdded.add(metadataKey)) { + metadataProcessor.addMetadata(FieldMetadata.builder() + .shortName(fieldDescriptor.getName()) + .dataElementPath(normalizedPath) + .encryptionKeyReference(step.declaration.getArgs().getOrDefault(KEY_REFERENCE, null)) + .encryptionAlgorithm(step.func.getAlgorithm()) + .stableIdentifierVersion(sidSnapshotDate) + .stableIdentifierType(step.isSidMapping) + .encryptionAlgorithmParameters(step.declaration.getArgs()) + .build()); + } + } + + public PseudoMetadataProcessor metadataProcessor() { + return metadataProcessor; + } + } + + private class SingleFieldStep { + private final PseudoFunc func; + private final PseudoFuncDeclaration declaration; + private final boolean isSidMapping; + private final boolean fpeLimited; + private final MapFailureStrategy mapFailureStrategy; + private boolean metadataAddedOnce; + + private SingleFieldStep(PseudoFuncRuleMatch match, PseudoFuncDeclaration declaration) { + this.func = match.getFunc(); + this.declaration = declaration; + this.isSidMapping = declaration.getFuncName().equals(PseudoFuncNames.MAP_SID) + || declaration.getFuncName().equals(PseudoFuncNames.MAP_SID_FF31) + || declaration.getFuncName().equals(PseudoFuncNames.MAP_SID_DAEAD); + this.fpeLimited = func instanceof FpeFunc + || func instanceof TinkFpeFunc + || declaration.getFuncName().equals(PseudoFuncNames.MAP_SID) + || declaration.getFuncName().equals(PseudoFuncNames.MAP_SID_FF31); + this.mapFailureStrategy = getMapFailureStrategy(declaration.getArgs()); + this.metadataAddedOnce = false; + } + } + private MapFailureStrategy getMapFailureStrategy(Map config) { return Optional.ofNullable( config.getOrDefault(MapFuncConfig.Param.MAP_FAILURE_STRATEGY, null) diff --git a/src/main/java/no/ssb/dlp/pseudo/service/pseudo/metadata/PseudoMetadataProcessor.java b/src/main/java/no/ssb/dlp/pseudo/service/pseudo/metadata/PseudoMetadataProcessor.java index e731f90f..ea4476c8 100644 --- a/src/main/java/no/ssb/dlp/pseudo/service/pseudo/metadata/PseudoMetadataProcessor.java +++ b/src/main/java/no/ssb/dlp/pseudo/service/pseudo/metadata/PseudoMetadataProcessor.java @@ -9,15 +9,19 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; @Value public class PseudoMetadataProcessor { + private static final int LOG_LIMIT = 100; + String correlationId; Map> uniqueMetadataPaths = new LinkedHashMap<>(); ReplayProcessor datadocMetadata = ReplayProcessor.create(); ReplayProcessor logs = ReplayProcessor.create(); ReplayProcessor metrics = ReplayProcessor.create(); + AtomicInteger logCount = new AtomicInteger(0); public PseudoMetadataProcessor(String correlationId) { this.correlationId = correlationId; @@ -29,7 +33,9 @@ public void addMetadata(final FieldMetadata metadata) { } } public void addLog(String log) { - logs.onNext(log); + if (logCount.getAndIncrement() < LOG_LIMIT) { + logs.onNext(log); + } } public void addMetric(FieldMetric fieldMetric) { metrics.onNext(fieldMetric); diff --git a/src/main/java/no/ssb/dlp/pseudo/service/security/CloudRunEndpointFilter.java b/src/main/java/no/ssb/dlp/pseudo/service/security/CloudRunEndpointFilter.java index a4baf2b0..223b80e5 100644 --- a/src/main/java/no/ssb/dlp/pseudo/service/security/CloudRunEndpointFilter.java +++ b/src/main/java/no/ssb/dlp/pseudo/service/security/CloudRunEndpointFilter.java @@ -34,10 +34,10 @@ public class CloudRunEndpointFilter implements HttpServerFilter { @Override public Publisher> doFilter(HttpRequest request, ServerFilterChain chain) { String path = request.getUri().getPath(); - if (path.equals("/pseudonymize/file") || path.equals("/pseudonymize/field")) { + if (path.equals("/pseudonymize/file") || path.equals("/pseudonymize/field") || path.equals("/pseudonymize/field-fast")) { return chain.proceed(request); } else { return Flowable.just(HttpResponse.notFound()); } } -} \ No newline at end of file +} diff --git a/src/main/java/no/ssb/dlp/pseudo/service/sid/SidMapper.java b/src/main/java/no/ssb/dlp/pseudo/service/sid/SidMapper.java index db5e3129..6786a9be 100644 --- a/src/main/java/no/ssb/dlp/pseudo/service/sid/SidMapper.java +++ b/src/main/java/no/ssb/dlp/pseudo/service/sid/SidMapper.java @@ -83,27 +83,12 @@ private PseudoFuncOutput mapTo(String identifier, boolean isFnr) { if (identifier == null) { return PseudoFuncOutput.of(null); } - // Execute the bulk request if necessary - if (bulkRequest.isEmpty()) { - // Split fnrs or snrs into chunks of BULK_SIZE - for (List bulkIdentifiers : Lists.partition(List.copyOf(identifiers), partitionSize)) { - log.info("Execute SID-mapping bulk request"); - final ObservableSubscriber> subscriber; - - if (isFnr) { - subscriber = ObservableSubscriber.subscribe( - sidService.lookupFnr(bulkIdentifiers, getSnapshot())); - } else { - subscriber = ObservableSubscriber.subscribe( - sidService.lookupSnr(bulkIdentifiers, getSnapshot())); - } - - for (String id : bulkIdentifiers) { - bulkRequest.put(id, subscriber); - } - } + ensureBulkRequestContains(identifier, isFnr); + ObservableSubscriber> subscriber = bulkRequest.get(identifier); + if (subscriber == null) { + throw new RuntimeException("SID subscriber not found for identifier"); } - SidInfo result = bulkRequest.get(identifier).awaitResult() + SidInfo result = subscriber.awaitResult() .orElseThrow(() -> new RuntimeException("SID service did not respond")) .get(identifier); @@ -112,6 +97,48 @@ private PseudoFuncOutput mapTo(String identifier, boolean isFnr) { } + private void ensureBulkRequestContains(String identifier, boolean isFnr) { + if (bulkRequest.containsKey(identifier)) { + return; + } + + synchronized (this) { + if (bulkRequest.containsKey(identifier)) { + return; + } + + // Defensive fallback: include the current identifier even if init() was not called + identifiers.add(identifier); + + // Initial request: use pre-collected identifiers for efficient bulk lookup. + if (bulkRequest.isEmpty()) { + for (List bulkIdentifiers : Lists.partition(List.copyOf(identifiers), partitionSize)) { + log.info("Execute SID-mapping bulk request"); + final ObservableSubscriber> subscriber; + + if (isFnr) { + subscriber = ObservableSubscriber.subscribe( + sidService.lookupFnr(bulkIdentifiers, getSnapshot())); + } else { + subscriber = ObservableSubscriber.subscribe( + sidService.lookupSnr(bulkIdentifiers, getSnapshot())); + } + + for (String id : bulkIdentifiers) { + bulkRequest.put(id, subscriber); + } + } + } else { + // Late/missing identifier: fallback to a tiny lookup to avoid NullPointerException. + log.info("Execute SID-mapping fallback request for missing identifier"); + ObservableSubscriber> subscriber = isFnr + ? ObservableSubscriber.subscribe(sidService.lookupFnr(List.of(identifier), getSnapshot())) + : ObservableSubscriber.subscribe(sidService.lookupSnr(List.of(identifier), getSnapshot())); + bulkRequest.put(identifier, subscriber); + } + } + } + private PseudoFuncOutput createMappingLogsAndOutput(SidInfo sidInfo, boolean isFnr, String identifier) { //Mapping for fnr @@ -281,4 +308,4 @@ private ObservableSubscriber await() { return this; } } -} \ No newline at end of file +} diff --git a/src/test/java/no/ssb/dlp/pseudo/service/performance/PseudoServiceDaeadPerformanceTest.java b/src/test/java/no/ssb/dlp/pseudo/service/performance/PseudoServiceDaeadPerformanceTest.java new file mode 100644 index 00000000..eb0d456e --- /dev/null +++ b/src/test/java/no/ssb/dlp/pseudo/service/performance/PseudoServiceDaeadPerformanceTest.java @@ -0,0 +1,304 @@ +package no.ssb.dlp.pseudo.service.performance; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.crypto.tink.Aead; +import com.google.crypto.tink.JsonKeysetWriter; +import com.google.crypto.tink.KeyTemplates; +import com.google.crypto.tink.KeysetHandle; +import com.google.crypto.tink.aead.AeadConfig; +import com.google.crypto.tink.daead.DeterministicAeadConfig; +import no.ssb.dlp.pseudo.core.util.Json; +import no.ssb.dlp.pseudo.core.tink.model.EncryptedKeysetWrapper; +import no.ssb.dlp.pseudo.service.pseudo.PseudoConfigSplitter; +import no.ssb.dlp.pseudo.service.pseudo.PseudoController; +import no.ssb.dlp.pseudo.service.pseudo.PseudoSecrets; +import no.ssb.dlp.pseudo.service.pseudo.RecordMapProcessorFactory; +import no.ssb.dlp.pseudo.service.pseudo.StreamProcessorFactory; +import no.ssb.dlp.pseudo.service.secrets.MockSecretService; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.SplittableRandom; +import java.util.function.Supplier; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class PseudoServiceDaeadPerformanceTest { + + private static final String ENABLED_PROP = "pseudo.performance.enabled"; + private static final String BATCH_SIZE_PROP = "pseudo.performance.batchSize"; + private static final String WARMUP_ROUNDS_PROP = "pseudo.performance.warmupRounds"; + private static final String MEASURE_ROUNDS_PROP = "pseudo.performance.measureRounds"; + + private static final URI TEST_KEK_URI = URI.create("test-kek://local/master-key"); + + private static PseudoController controller; + private static EncryptedKeysetWrapper keyset; + + @BeforeAll + static void setUp() throws Exception { + AeadConfig.register(); + DeterministicAeadConfig.register(); + + Aead masterAead = KeysetHandle.generateNew(KeyTemplates.get("AES256_GCM")) + .getPrimitive(Aead.class); + + keyset = createWrappedDaeadKeyset(masterAead, TEST_KEK_URI); + + LoadingCache aeadCache = Caffeine.newBuilder() + .maximumSize(100) + .build(uri -> { + if (!TEST_KEK_URI.toString().equals(uri)) { + throw new IllegalArgumentException("Unknown KEK URI in test: " + uri); + } + return masterAead; + }); + + PseudoSecrets pseudoSecrets = new PseudoSecrets(new MockSecretService(), Map.of()); + RecordMapProcessorFactory recordMapProcessorFactory = + new RecordMapProcessorFactory(pseudoSecrets, aeadCache); + + controller = new PseudoController( + new StreamProcessorFactory(), + recordMapProcessorFactory, + new PseudoConfigSplitter() + ); + } + + @Test + void benchmarkDaeadFieldEndpoints() throws Exception { + Assumptions.assumeTrue(Boolean.getBoolean(ENABLED_PROP), + () -> "Skipping performance benchmark. Run with -D" + ENABLED_PROP + "=true"); + + final int batchSize = Integer.getInteger(BATCH_SIZE_PROP, 10_000); + final int warmupRounds = Integer.getInteger(WARMUP_ROUNDS_PROP, 2); + final int measureRounds = Integer.getInteger(MEASURE_ROUNDS_PROP, 8); + + List inputValues = generateValues(batchSize, 42L); + String daeadFunc = "daead(keyId=" + keyset.primaryKeyId() + ")"; + + PseudoController.PseudoFieldRequest pseudoReq = new PseudoController.PseudoFieldRequest(); + pseudoReq.setName("fnr"); + pseudoReq.setPattern("**"); + pseudoReq.setPseudoFunc(daeadFunc); + pseudoReq.setKeyset(keyset); + pseudoReq.setValues(inputValues); + + String pseudoReqJson = Json.from(pseudoReq); + + BenchmarkResult pseudonymizeResult = benchmark( + "pseudonymize_full", + warmupRounds, + measureRounds, + () -> { + String responseJson = callPseudonymize(pseudoReqJson, false); + List data = extractDataValues(responseJson); + assertEquals(inputValues.size(), data.size()); + return data; + } + ); + + BenchmarkResult pseudonymizeMinimalResult = benchmark( + "pseudonymize_minimal_metrics", + warmupRounds, + measureRounds, + () -> { + String responseJson = callPseudonymize(pseudoReqJson, true); + List data = extractDataValues(responseJson); + assertEquals(inputValues.size(), data.size()); + return data; + } + ); + + List pseudonymizedValues = callPseudonymizeAndExtractData(pseudoReqJson, false); + + PseudoController.DepseudoFieldRequest depseudoReq = new PseudoController.DepseudoFieldRequest(); + depseudoReq.setName("fnr"); + depseudoReq.setPattern("**"); + depseudoReq.setPseudoFunc(daeadFunc); + depseudoReq.setKeyset(keyset); + depseudoReq.setValues(pseudonymizedValues); + + String depseudoReqJson = Json.from(depseudoReq); + + BenchmarkResult depseudonymizeResult = benchmark( + "depseudonymize", + warmupRounds, + measureRounds, + () -> { + String responseJson = callDepseudonymize(depseudoReqJson); + List restored = extractDataValues(responseJson); + assertEquals(inputValues, restored); + return restored; + } + ); + + Map report = new LinkedHashMap<>(); + report.put("timestamp", Instant.now().toString()); + report.put("batchSize", batchSize); + report.put("warmupRounds", warmupRounds); + report.put("measureRounds", measureRounds); + report.put("pseudonymizeFull", pseudonymizeResult.toMap()); + report.put("pseudonymizeMinimalMetrics", pseudonymizeMinimalResult.toMap()); + report.put("depseudonymize", depseudonymizeResult.toMap()); + + Path reportPath = Path.of("target", "performance", "pseudo-service-daead-field.json"); + Files.createDirectories(reportPath.getParent()); + Files.writeString(reportPath, Json.prettyFrom(report), StandardCharsets.UTF_8); + + System.out.println("DAEAD benchmark report written to: " + reportPath.toAbsolutePath()); + System.out.println(Json.prettyFrom(report)); + } + + private static BenchmarkResult benchmark(String name, + int warmupRounds, + int measureRounds, + Supplier> call) { + for (int i = 0; i < warmupRounds; i++) { + call.get(); + } + + List elapsedMillis = new ArrayList<>(measureRounds); + int itemCount = -1; + for (int i = 0; i < measureRounds; i++) { + long start = System.nanoTime(); + List data = call.get(); + long end = System.nanoTime(); + if (itemCount < 0) { + itemCount = data.size(); + } + elapsedMillis.add((end - start) / 1_000_000d); + } + + return BenchmarkResult.of(name, itemCount, elapsedMillis); + } + + private static String callPseudonymize(String requestJson, boolean minimalMetricsMode) { + return collectBody( + controller.pseudonymizeFieldFast(requestJson, minimalMetricsMode).body() + ); + } + + private static List callPseudonymizeAndExtractData(String requestJson, boolean minimalMetricsMode) { + return extractDataValues(callPseudonymize(requestJson, minimalMetricsMode)); + } + + private static String callDepseudonymize(String requestJson) { + return collectBody( + controller.depseudonymizeField(requestJson).body() + ); + } + + private static String collectBody(io.reactivex.Flowable body) { + StringBuilder sb = new StringBuilder(); + for (byte[] part : body.blockingIterable()) { + sb.append(new String(part, StandardCharsets.UTF_8)); + } + return sb.toString(); + } + + private static List extractDataValues(String responseJson) { + Map payload = Json.toGenericMap(responseJson); + Object dataObj = payload.get("data"); + assertNotNull(dataObj, "Expected response to contain data array"); + + List raw = (List) dataObj; + List values = new ArrayList<>(raw.size()); + for (Object value : raw) { + values.add(value == null ? null : String.valueOf(value)); + } + return values; + } + + private static List generateValues(int size, long seed) { + SplittableRandom random = new SplittableRandom(seed); + List values = new ArrayList<>(size); + final String alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + + for (int i = 0; i < size; i++) { + int len = 10 + random.nextInt(11); + StringBuilder sb = new StringBuilder(len); + for (int c = 0; c < len; c++) { + sb.append(alphabet.charAt(random.nextInt(alphabet.length()))); + } + values.add(sb.toString()); + } + + return values; + } + + private static EncryptedKeysetWrapper createWrappedDaeadKeyset(Aead masterAead, URI kekUri) throws Exception { + KeysetHandle dataKeyset = KeysetHandle.generateNew(KeyTemplates.get("AES256_SIV")); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + dataKeyset.write(JsonKeysetWriter.withOutputStream(baos), masterAead); + + EncryptedKeysetWrapper wrapper = Json.toObject( + EncryptedKeysetWrapper.class, + baos.toString(StandardCharsets.UTF_8) + ); + wrapper.setKekUri(kekUri); + return wrapper; + } + + private record BenchmarkResult( + String name, + int itemCount, + double minMs, + double maxMs, + double avgMs, + double p50Ms, + double p95Ms, + double throughputPerSec + ) { + private static BenchmarkResult of(String name, int itemCount, List elapsedMillis) { + List sorted = new ArrayList<>(elapsedMillis); + Collections.sort(sorted); + + double min = sorted.get(0); + double max = sorted.get(sorted.size() - 1); + double sum = sorted.stream().mapToDouble(Double::doubleValue).sum(); + double avg = sum / sorted.size(); + double p50 = percentile(sorted, 0.50); + double p95 = percentile(sorted, 0.95); + double throughput = (itemCount * 1000d) / avg; + + return new BenchmarkResult(name, itemCount, min, max, avg, p50, p95, throughput); + } + + private static double percentile(List sorted, double p) { + if (sorted.size() == 1) { + return sorted.get(0); + } + int idx = (int) Math.ceil(p * sorted.size()) - 1; + idx = Math.max(0, Math.min(idx, sorted.size() - 1)); + return sorted.get(idx); + } + + private Map toMap() { + Map map = new LinkedHashMap<>(); + map.put("name", name); + map.put("items", itemCount); + map.put("minMs", minMs); + map.put("maxMs", maxMs); + map.put("avgMs", avgMs); + map.put("p50Ms", p50Ms); + map.put("p95Ms", p95Ms); + map.put("throughputItemsPerSec", throughputPerSec); + return map; + } + } +}