diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index d4be868..8a0f961 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,4 +1,4 @@ # Code owners file. # This file controls who is tagged for review for any given pull request. -* @midamkina @ngorchakova @gabrysiaolsz +* @midamkina @ngorchakova @gabrysiaolsz @chanter @KonstantinMi diff --git a/.github/workflows/pr-checks.yml b/.github/workflows/pr-checks.yml new file mode 100644 index 0000000..9712f6a --- /dev/null +++ b/.github/workflows/pr-checks.yml @@ -0,0 +1,79 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: PR Checks + +on: + pull_request: + branches: main + +jobs: + pr-checks: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up JDK 11 + uses: actions/setup-java@v4 + with: + java-version: '11' + distribution: 'temurin' + + - name: Cache Gradle packages + uses: actions/cache@v4 + with: + path: | + ~/.gradle/caches + ~/.gradle/wrapper + key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }} + restore-keys: | + ${{ runner.os }}-gradle- + + - name: Make gradlew executable + run: chmod +x gradlew + + - name: Run Google Java Format verification + run: ./gradlew verGJF + + - name: Run Checkstyle + id: run-checkstyle + run: ./gradlew checkstyleMain checkstyleTest + + - name: Build project + run: ./gradlew build shadowJar + + - name: Run tests + id: run-tests + run: ./gradlew test + + - name: Run additional checks + run: ./gradlew check + + - name: Upload test results + uses: actions/upload-artifact@v4 + if: always() && (steps.run-tests.outcome == 'success' || steps.run-tests.outcome == 'failure') + with: + name: test-results + path: | + lib/build/reports/tests/ + lib/build/test-results/ + + - name: Upload checkstyle results + uses: actions/upload-artifact@v4 + if: always() && (steps.run-checkstyle.outcome == 'success' || steps.run-checkstyle.outcome == 'failure') + with: + name: checkstyle-results + path: lib/build/reports/checkstyle/ diff --git a/README.md b/README.md index d3a2262..923d2b3 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,56 @@ Or, for a shadow jar: ``` The output will be located at `lib/build/libs`. + +## Logging + +This library uses [SLF4J](https://www.slf4j.org/) for logging. To see log output, you need to include an SLF4J binding in your application dependencies. + +### Adding a logging implementation + +For Logback (recommended): +```xml + + ch.qos.logback + logback-classic + 1.4.14 + +``` + +For Log4j2: +```xml + + org.apache.logging.log4j + log4j-slf4j2-impl + 2.20.0 + +``` + +### Configuring log levels + +The library logs at different levels: +- **INFO**: Client creation, cache initialization, shutdown events +- **WARN**: Service disruptions, graceful shutdown timeouts, API errors +- **DEBUG**: Detailed API call information, cache operations + +Example Logback configuration (`logback.xml`): +```xml + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + +``` + ## Performing calls To perform a call, you need to: 1. Create a client diff --git a/build.sh b/build.sh index f6b7d05..a350fe3 100755 --- a/build.sh +++ b/build.sh @@ -17,4 +17,4 @@ # Fail on any error set -e -./gradlew build \ No newline at end of file +./gradlew build shadowJar \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 41d9927..d64cd49 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/kokoro/gcp_ubuntu/continuous.cfg b/kokoro/gcp_ubuntu/continuous.cfg deleted file mode 100644 index 1ad02a5..0000000 --- a/kokoro/gcp_ubuntu/continuous.cfg +++ /dev/null @@ -1,5 +0,0 @@ -# -*- protobuffer -*- -# proto-file: google3/devtools/kokoro/config/proto/build.proto -# proto-message: BuildConfig - -build_file: "data-lineage-producer-java-library/kokoro/gcp_ubuntu/kokoro_build.sh" \ No newline at end of file diff --git a/kokoro/gcp_ubuntu/kokoro_build.sh b/kokoro/gcp_ubuntu/kokoro_build.sh deleted file mode 100755 index ac2c86a..0000000 --- a/kokoro/gcp_ubuntu/kokoro_build.sh +++ /dev/null @@ -1,25 +0,0 @@ -#!/bin/bash - -# Copyright 2024 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Fail on any error -set -e - -sudo update-java-alternatives --set java-1.11.0-openjdk-amd64 -JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64 - -cd "${KOKORO_ARTIFACTS_DIR}/git/data-lineage-producer-java-library" -./tests.sh -./build.sh diff --git a/kokoro/gcp_ubuntu/presubmit.cfg b/kokoro/gcp_ubuntu/presubmit.cfg deleted file mode 100644 index b83f143..0000000 --- a/kokoro/gcp_ubuntu/presubmit.cfg +++ /dev/null @@ -1,5 +0,0 @@ -# -*- protobuffer -*- -# proto-file: google3/devtools/kokoro/config/proto/build.proto -# proto-message: BuildConfig - -build_file: "data-lineage-producer-java-library/kokoro/gcp_ubuntu/kokoro_build.sh" diff --git a/lib/build.gradle b/lib/build.gradle index 063a166..b2d53d8 100755 --- a/lib/build.gradle +++ b/lib/build.gradle @@ -14,16 +14,38 @@ plugins { id 'java-library' +// TODO upgrade this to https://plugins.gradle.org/plugin/com.gradleup.shadow id 'com.github.johnrengelman.shadow' version '7.1.2' id 'com.github.sherter.google-java-format' version '0.9' id 'checkstyle' + id 'maven-publish' + id 'signing' + +// v"2.2.5" is recommended, but it uses Java21 which is incompatible with other +// plugins (e.g. "com.github.johnrengelman.shadow"), therefore, using latest compatible version + id("com.google.cloud.artifactregistry.gradle-plugin") version "2.1.5" } googleJavaFormat { toolVersion = '1.7' } -version = '1.0.0' +ext { + cloudBomVersion = '26.37.0' + protobufVersion = '3.23.0' + grpcProtobufVersion = '1.54.1' + gaxVersion = '2.29.0' + gaxHttpJsonVersion = '0.103.7' + gaxTestLibVersion = '2.12.2' + guavaVersion = '33.2.1-jre' + junitVersion = '4.13.2' + truthVersion = '1.1.3' + mockitoVersion = '3.+' + parameterInjectorVersion = '1.8' + slf4jVersion = '2.0.9' + logbackVersion = '1.4.14' + lombokVersion = '1.18.30' +} repositories { // Use Maven Central for resolving dependencies. @@ -31,21 +53,41 @@ repositories { } dependencies { - implementation platform('com.google.cloud:libraries-bom:26.37.0') - implementation "com.google.cloud:google-cloud-datalineage" - - implementation "com.google.protobuf:protobuf-java-util:3.23.0" - implementation 'com.google.api:gax:2.29.0' - implementation 'com.google.api:gax-grpc:2.29.0' - implementation 'com.google.api:gax-httpjson:0.103.7' - implementation 'io.grpc:grpc-protobuf:1.54.1' - implementation 'com.google.guava:guava:31.1-jre' + implementation platform("com.google.cloud:libraries-bom:${cloudBomVersion}") + implementation "com.google.cloud:google-cloud-datalineage:0.60.0" + + implementation "com.google.protobuf:protobuf-java-util:${protobufVersion}" + implementation "com.google.api:gax:${gaxVersion}" + implementation "com.google.api:gax-grpc:${gaxVersion}" + implementation "com.google.api:gax-httpjson:${gaxHttpJsonVersion}" + implementation "io.grpc:grpc-protobuf:${grpcProtobufVersion}" + implementation "com.google.guava:guava:${guavaVersion}" + + // SLF4J logging + implementation "org.slf4j:slf4j-api:${slf4jVersion}" + + // Lombok for @Slf4j annotation + compileOnly "org.projectlombok:lombok:${lombokVersion}" + annotationProcessor "org.projectlombok:lombok:${lombokVersion}" + + // AutoValue + compileOnly "com.google.auto.value:auto-value-annotations:1.10.1" + annotationProcessor "com.google.auto.value:auto-value:1.10.1" + // Use JUnit test framework. - testImplementation 'junit:junit:4.13.2' - testImplementation 'com.google.truth:truth:1.1.3' - testImplementation 'org.mockito:mockito-core:3.+' - testImplementation 'com.google.api:gax:2.12.2:testlib' - testImplementation 'com.google.testparameterinjector:test-parameter-injector:1.8' + testImplementation "junit:junit:${junitVersion}" + testImplementation "com.google.truth:truth:${truthVersion}" + testImplementation "org.mockito:mockito-core:${mockitoVersion}" + testImplementation "com.google.api:gax:${gaxTestLibVersion}:testlib" + testImplementation "com.google.testparameterinjector:test-parameter-injector:${parameterInjectorVersion}" + + // Test logging dependencies + testImplementation "ch.qos.logback:logback-classic:${logbackVersion}" + testImplementation "ch.qos.logback:logback-core:${logbackVersion}" + + // Test Lombok + testCompileOnly "org.projectlombok:lombok:${lombokVersion}" + testAnnotationProcessor "org.projectlombok:lombok:${lombokVersion}" } test { @@ -54,14 +96,126 @@ test { shadowJar { archiveBaseName.set('data-lineage-producer-java-library') - relocate 'com.google.protobuf', 'datalineage.shaded.com.google.protobuf' - relocate 'com.google.guava', 'datalineage.shaded.com.google.guava' - relocate 'com.google.api', 'datalineage.shaded.com.google.api' - relocate 'com.google.auth', 'datalineage.shaded.com.google.auth' - relocate 'io.grpc', 'datalineage.shaded.io.grpc' - relocate 'com.google.common', 'datalineage.shaded.com.google.common' - relocate 'com.google.gson', 'datalineage.shaded.com.google.gson' - relocate 'META-INF/native/libio_grpc_netty_shaded_netty', 'META-INF/native/libdatalineage_shaded_io_grpc_netty_shaded_netty' - relocate 'META-INF/native/io_grpc_netty_shaded_netty', 'META-INF/native/datalineage_shaded_io_grpc_netty_shaded_netty' + archiveClassifier.set('') + + def excludedPrefixes = [ + 'android/', 'javax/', 'io/opentelemetry', 'com/google/', 'opencensus/', 'grpc/', 'google/', 'org/' + ] + + def includedPrefixes = [ + 'com/google/api', 'com/google/auth', 'com/google/common', + 'com/google/gson', 'com/google/protobuf', 'com/google/longrunning', + 'com/google/rpc', 'com/google/cloud/datalineage', + 'com/google/cloud/datacatalog', 'org/threeten' + ] + + def includedExactPaths = [ + 'com/google/cloud', 'org' + ] + + // exclude from shaded jar all packages starting with excludedPrefixes + // but keep those starting with includedPrefixes or containing exact paths from includedExactPaths + exclude { FileTreeElement fte -> + def isExcluded = excludedPrefixes.any { fte.path.startsWith(it) } + def isIncluded = includedPrefixes.any { fte.path.startsWith(it) } + || includedExactPaths.contains(fte.path) + return isExcluded && !isIncluded + } + + [ + 'com.google.guava', + 'com.google.longrunning', + 'com.google.rpc', + 'io.grpc', + 'io.opencensus', + 'io.perfmark', + 'com.google.common', + 'com.google.gson', + 'org.threeten.bp' + ].each { packageName -> + relocate packageName, "datalineage.shaded.${packageName}" + } + [ + 'META-INF/native/libio_grpc_netty_shaded_netty', + 'META-INF/native/io_grpc_netty_shaded_netty' + ].each { path -> + relocate path, path.replaceFirst(/[^\/]+$/, { "datalineage_shaded_$it" }) + } + mergeServiceFiles() } + +java { + withJavadocJar() + withSourcesJar() +} + +publishing { + publications { + mavenJava(MavenPublication) { + version = '1.1.0' + group = 'com.google.cloud.datalineage' + artifactId = 'producerclient' + + artifact shadowJar + artifact javadocJar + artifact sourcesJar + + pom { + name = 'Java producer library for Data Lineage.' + description = 'Library that provides synchronous and asynchronous clients for interacting with the Google Cloud Data Lineage API.' + url = 'https://github.com/GoogleCloudPlatform/data-lineage-producer-java-library' + properties = [:] + licenses { + license { + name = 'The Apache License, Version 2.0' + url = 'http://www.apache.org/licenses/LICENSE-2.0.txt' + } + } + developers { + developer { + id = 'midamkina' + name = 'Mary Idamkina' + email = 'midamkina@google.com' + } + developer { + id = 'gabrysiaolsz' + name = 'Gabriela Olszewska' + email = 'gabrysiaolsz@google.com' + } + developer { + id = 'ngorchakova' + name = 'Natalia Gorchakova' + email = 'ngorchakova@google.com' + } + developer { + id = 'mikhalevich' + name = 'Konstantin Mikhalevich' + email = 'mikhalevich@google.com' + } + developer { + id = 'wojtekl' + name = 'Wojciech Ɓowiec' + email = 'wojtekl@google.com' + } + } + scm { + connection = 'scm:git:https://github.com/GoogleCloudPlatform/data-lineage-producer-java-library.git' + developerConnection = 'scm:git:ssh://git@github.com/GoogleCloudPlatform/data-lineage-producer-java-library.git' + url = 'https://github.com/GoogleCloudPlatform/data-lineage-producer-java-library' + } + } + } + } + repositories { + maven { + url "artifactregistry://us-central1-maven.pkg.dev/dataplex-gob-louhi-releaser/cloud-dataplex-maven-releases" + } + } +} + +javadoc { + if (JavaVersion.current().isJava9Compatible()) { + options.addBooleanOption('html5', true) + } +} diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/ApiEnablementCache.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/ApiEnablementCache.java index 76960c7..a579ab9 100755 --- a/lib/src/main/java/com/google/cloud/datalineage/producerclient/ApiEnablementCache.java +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/ApiEnablementCache.java @@ -19,18 +19,29 @@ /** Cache used to store information about whether the API is disabled for a given project. */ public interface ApiEnablementCache { - /** @see ApiEnablementCache#markServiceAsDisabled(String, Duration) */ - void markServiceAsDisabled(String project); - /** * Mark service state as disabled for a given project name. * - * @param offset - suggests how long given project should be marked as disabled. It is not + * @param projectName The project for which to disable the service + * @see ApiEnablementCache#markServiceAsDisabled(String, Duration) + */ + void markServiceAsDisabled(String projectName); + + /** + * Mark service state as disabled for a given project name and duration. + * + * @param projectName The project for which to disable the service + * @param duration - suggests how long the project should be marked as disabled. It is not * guarantied that cache will indicate service state as disabled for given time. Behaviour * depends on implementation. */ - void markServiceAsDisabled(String projectName, Duration offset); + void markServiceAsDisabled(String projectName, Duration duration); - /** Indicates if service with provided projectName is marked as disabled. */ + /** + * Indicates if service with provided projectName is marked as disabled. + * + * @param projectName The project for which to disable the service + * @return `true` is the service is marked as disabled + */ boolean isServiceMarkedAsDisabled(String projectName); } diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/ApiEnablementCacheFactory.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/ApiEnablementCacheFactory.java index 22931b2..63c64ad 100644 --- a/lib/src/main/java/com/google/cloud/datalineage/producerclient/ApiEnablementCacheFactory.java +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/ApiEnablementCacheFactory.java @@ -18,7 +18,10 @@ public class ApiEnablementCacheFactory { private static volatile ApiEnablementCache commonInstance; - public static ApiEnablementCache get(ApiEnablementCacheSettings settings) { + /** Make the factory class non-instantiable */ + private ApiEnablementCacheFactory() {} + + public static ApiEnablementCache get(CacheSettings settings) { if (!settings.getEnabled()) { return new NoOpApiEnablementCache(); } diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/ApiEnablementCacheSettings.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/ApiEnablementCacheSettings.java deleted file mode 100755 index 0887a8f..0000000 --- a/lib/src/main/java/com/google/cloud/datalineage/producerclient/ApiEnablementCacheSettings.java +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2024 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.cloud.datalineage.producerclient; - -/** Provides an immutable object for storing connection cache settings. */ -public final class ApiEnablementCacheSettings { - - /** Disables connection cache feature. */ - public static ApiEnablementCacheSettings getDisabledInstance() { - return new ApiEnablementCacheSettings( - false, false, ApiEnablementCacheOptions.getDefaultInstance()); - } - - /** Uses common instance. If there is no such instance, creates one with default settings. */ - public static ApiEnablementCacheSettings getCommonInstance() { - return new ApiEnablementCacheSettings( - true, true, ApiEnablementCacheOptions.getDefaultInstance()); - } - - /** Uses common instance. If there is no such instance, creates one using provided settings. */ - public static ApiEnablementCacheSettings getCommonInstance( - ApiEnablementCacheOptions fallbackSettings) { - if (fallbackSettings == null) { - throw new IllegalArgumentException("defaultSettings cannot be null"); - } - return new ApiEnablementCacheSettings(true, true, fallbackSettings); - } - - /** Uses stand-alone instance with default settings. */ - public static ApiEnablementCacheSettings getStandAloneInstance() { - return new ApiEnablementCacheSettings( - true, false, ApiEnablementCacheOptions.getDefaultInstance()); - } - - /** Uses stand-alone instance with provided settings. */ - public static ApiEnablementCacheSettings getStandAloneInstance( - ApiEnablementCacheOptions settings) { - if (settings == null) { - throw new IllegalArgumentException("settings cannot be null"); - } - return new ApiEnablementCacheSettings(true, false, settings); - } - - private final boolean enabled; - private final boolean useCommonInstance; - private final ApiEnablementCacheOptions options; - - private ApiEnablementCacheSettings( - boolean enabled, boolean useCommonInstance, ApiEnablementCacheOptions options) { - this.enabled = enabled; - this.useCommonInstance = useCommonInstance; - this.options = options; - } - - public boolean getEnabled() { - return enabled; - } - - public boolean getUseCommonInstance() { - return useCommonInstance; - } - - public ApiEnablementCacheOptions getOptions() { - return options; - } -} diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/ApiEnablementCacheOptions.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/CacheOptions.java old mode 100755 new mode 100644 similarity index 61% rename from lib/src/main/java/com/google/cloud/datalineage/producerclient/ApiEnablementCacheOptions.java rename to lib/src/main/java/com/google/cloud/datalineage/producerclient/CacheOptions.java index b440ab5..99e6359 --- a/lib/src/main/java/com/google/cloud/datalineage/producerclient/ApiEnablementCacheOptions.java +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/CacheOptions.java @@ -1,4 +1,4 @@ -// Copyright 2024 Google LLC +// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -18,10 +18,11 @@ import java.time.Duration; /** - * Provides an immutable object for ConnectionCache initialization. ApiEnablementCacheOptions object - * can be created via Builder. + * Provides an immutable object for Cache initialization. CacheOptions object can be created via + * Builder. */ -public class ApiEnablementCacheOptions { +public class CacheOptions { + protected static final Duration DEFAULT_DISABLED_TIME = Duration.ofMinutes(5); protected static final int DEFAULT_SIZE = 1000; protected static final Clock DEFAULT_CLOCK = Clock.systemDefaultZone(); @@ -30,18 +31,18 @@ public class ApiEnablementCacheOptions { private final int cacheSize; private final Clock clock; - protected ApiEnablementCacheOptions(ApiEnablementCacheOptions.Builder settingsBuilder) { + protected CacheOptions(CacheOptions.Builder settingsBuilder) { defaultCacheDisabledStatusTime = settingsBuilder.defaultCacheDisabledStatusTime; cacheSize = settingsBuilder.cacheSize; clock = settingsBuilder.clock; } - public static ApiEnablementCacheOptions.Builder newBuilder() { - return ApiEnablementCacheOptions.Builder.createDefault(); + public static CacheOptions.Builder newBuilder() { + return CacheOptions.Builder.createDefault(); } - public static ApiEnablementCacheOptions getDefaultInstance() { - return ApiEnablementCacheOptions.Builder.createDefault().build(); + public static CacheOptions getDefaultInstance() { + return CacheOptions.Builder.createDefault().build(); } public Duration getDefaultCacheDisabledStatusTime() { @@ -56,22 +57,40 @@ public Clock getClock() { return clock; } - public ApiEnablementCacheOptions.Builder toBuilder() { - return new ApiEnablementCacheOptions.Builder(this); + public CacheOptions.Builder toBuilder() { + return new CacheOptions.Builder(this); + } + + /** + * Returns true if the other object is also a CacheOptions and has the same values for all fields. + */ + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof CacheOptions)) { + return false; + } + CacheOptions that = (CacheOptions) obj; + return this.cacheSize == that.cacheSize + && this.defaultCacheDisabledStatusTime.equals(that.defaultCacheDisabledStatusTime) + && this.clock.equals(that.clock); } /** - * * Builder for ApiEnablementCacheSettings. + * * Builder for CacheSettings. * *

Lets setting `markServiceAsDisabledTime`, `cacheSize`, and `clock`. Can be created by - * ApiEnablementCacheOptions.newBuilder method. To create settings object, use build method. + * CacheOptions.newBuilder method. To create settings object, use build method. */ public static class Builder { + protected Duration defaultCacheDisabledStatusTime; protected int cacheSize; protected Clock clock; - protected Builder(ApiEnablementCacheOptions settings) { + protected Builder(CacheOptions settings) { defaultCacheDisabledStatusTime = settings.defaultCacheDisabledStatusTime; cacheSize = settings.cacheSize; clock = settings.clock; @@ -83,12 +102,11 @@ protected Builder(Duration defaultCacheDisabledStatusTime, int cacheSize, Clock this.clock = clock; } - private static ApiEnablementCacheOptions.Builder createDefault() { - return new ApiEnablementCacheOptions.Builder( - DEFAULT_DISABLED_TIME, DEFAULT_SIZE, DEFAULT_CLOCK); + private static CacheOptions.Builder createDefault() { + return new CacheOptions.Builder(DEFAULT_DISABLED_TIME, DEFAULT_SIZE, DEFAULT_CLOCK); } - public ApiEnablementCacheOptions.Builder setDefaultCacheDisabledStatusTime( + public CacheOptions.Builder setDefaultCacheDisabledStatusTime( Duration defaultCacheDisabledStatusTime) { if (defaultCacheDisabledStatusTime.isNegative()) { throw new IllegalArgumentException("Duration cannot be negative"); @@ -97,7 +115,7 @@ public ApiEnablementCacheOptions.Builder setDefaultCacheDisabledStatusTime( return this; } - public ApiEnablementCacheOptions.Builder setCacheSize(int cacheSize) { + public CacheOptions.Builder setCacheSize(int cacheSize) { if (cacheSize < 0) { throw new IllegalArgumentException("Limit cannot be negative"); } @@ -105,13 +123,13 @@ public ApiEnablementCacheOptions.Builder setCacheSize(int cacheSize) { return this; } - public ApiEnablementCacheOptions.Builder setClock(Clock clock) { + public CacheOptions.Builder setClock(Clock clock) { this.clock = clock; return this; } - public ApiEnablementCacheOptions build() { - return new ApiEnablementCacheOptions(this); + public CacheOptions build() { + return new CacheOptions(this); } } } diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/CacheSettings.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/CacheSettings.java new file mode 100644 index 0000000..3ec3add --- /dev/null +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/CacheSettings.java @@ -0,0 +1,95 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient; + +/** Provides an immutable object for storing cache settings. */ +public final class CacheSettings { + + /** + * Disables cache feature. + * + * @return The requested cache settings. + */ + public static CacheSettings getDisabledInstance() { + return new CacheSettings(false, false, CacheOptions.getDefaultInstance()); + } + + /** + * Uses common instance. If there is no such instance, creates one with default cache options. + * + * @return The requested cache settings. + */ + public static CacheSettings getCommonInstance() { + return new CacheSettings(true, true, CacheOptions.getDefaultInstance()); + } + + /** + * Uses common instance. If there is no such instance, creates one using the provided cache + * options. + * + * @param fallbackOptions The fallback cache options. + * @return The requested cache settings. + */ + public static CacheSettings getCommonInstance(CacheOptions fallbackOptions) { + if (fallbackOptions == null) { + throw new IllegalArgumentException("defaultSettings cannot be null"); + } + return new CacheSettings(true, true, fallbackOptions); + } + + /** + * Uses stand-alone instance with default cache options. + * + * @return The requested cache settings. + */ + public static CacheSettings getStandAloneInstance() { + return new CacheSettings(true, false, CacheOptions.getDefaultInstance()); + } + + /** + * Uses stand-alone instance with provided settings. + * + * @param options The cache options + * @return The requested cache settings. + */ + public static CacheSettings getStandAloneInstance(CacheOptions options) { + if (options == null) { + throw new IllegalArgumentException("settings cannot be null"); + } + return new CacheSettings(true, false, options); + } + + private final boolean enabled; + private final boolean useCommonInstance; + private final CacheOptions options; + + private CacheSettings(boolean enabled, boolean useCommonInstance, CacheOptions options) { + this.enabled = enabled; + this.useCommonInstance = useCommonInstance; + this.options = options; + } + + public boolean getEnabled() { + return enabled; + } + + public boolean getUseCommonInstance() { + return useCommonInstance; + } + + public CacheOptions getOptions() { + return options; + } +} diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/LineageEnablementCache.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/LineageEnablementCache.java new file mode 100644 index 0000000..30deeed --- /dev/null +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/LineageEnablementCache.java @@ -0,0 +1,50 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient; + +import java.time.Duration; + +/** + * Cache used to store information about whether Lineage ingestion is disabled for a given project + * in LineageConfigurations. + */ +public interface LineageEnablementCache { + + /** + * Mark lineage ingestion state as disabled for a given project name. + * + * @param projectName The project for which to disable the lineage ingestion + * @see LineageEnablementCache#markLineageAsDisabled(String, Duration) + */ + void markLineageAsDisabled(String projectName); + + /** + * Mark lineage ingestion state as disabled for a given project name and duration. + * + * @param projectName The project for which to disable the lineage ingestion + * @param duration - suggests how long the project should be marked as disabled. It is not + * guarantied that cache will indicate lineage ingestion enablement state as disabled for + * given time. Behaviour depends on implementation. + */ + void markLineageAsDisabled(String projectName, Duration duration); + + /** + * Indicates if lineage ingestion with provided projectName is marked as disabled. + * + * @param projectName The project for which to disable the lineage ingestion + * @return `true` if the lineage ingestion is marked as disabled + */ + boolean isLineageMarkedAsDisabled(String projectName); +} diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/LineageEnablementCacheFactory.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/LineageEnablementCacheFactory.java new file mode 100644 index 0000000..0f5a566 --- /dev/null +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/LineageEnablementCacheFactory.java @@ -0,0 +1,45 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient; + +/** A factory that returns ConnectionCache based on LineageEnablementCacheSettings. */ +public class LineageEnablementCacheFactory { + private static volatile LineageEnablementCache commonInstance; + + /** Make the factory class non-instantiable */ + private LineageEnablementCacheFactory() {} + + public static LineageEnablementCache get(CacheSettings settings) { + if (!settings.getEnabled()) { + return new NoOpLineageEnablementCache(); + } + + if (!settings.getUseCommonInstance()) { + return new StandardLineageEnablementCache(settings.getOptions()); + } + + if (commonInstance != null) { + return commonInstance; + } + + synchronized (LineageEnablementCacheFactory.class) { + if (commonInstance == null) { + commonInstance = new StandardLineageEnablementCache(settings.getOptions()); + } + } + + return commonInstance; + } +} diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/NoOpApiEnablementCache.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/NoOpApiEnablementCache.java index fa33a20..ba58861 100755 --- a/lib/src/main/java/com/google/cloud/datalineage/producerclient/NoOpApiEnablementCache.java +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/NoOpApiEnablementCache.java @@ -21,7 +21,7 @@ public class NoOpApiEnablementCache implements ApiEnablementCache { public void markServiceAsDisabled(String project) {} - public void markServiceAsDisabled(String project, Duration offset) {} + public void markServiceAsDisabled(String project, Duration duration) {} public boolean isServiceMarkedAsDisabled(String project) { return false; diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/NoOpLineageEnablementCache.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/NoOpLineageEnablementCache.java new file mode 100644 index 0000000..9470bd4 --- /dev/null +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/NoOpLineageEnablementCache.java @@ -0,0 +1,32 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient; + +import java.time.Duration; + +/** No-op implementation of the LineageEnablementCache that does nothing. */ +public class NoOpLineageEnablementCache implements LineageEnablementCache { + + @Override + public void markLineageAsDisabled(String projectName) {} + + @Override + public void markLineageAsDisabled(String projectName, Duration duration) {} + + @Override + public boolean isLineageMarkedAsDisabled(String projectName) { + return false; + } +} diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/ProjectStatusCache.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/ProjectStatusCache.java new file mode 100644 index 0000000..532ebd2 --- /dev/null +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/ProjectStatusCache.java @@ -0,0 +1,82 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import java.time.Clock; +import java.time.Duration; +import java.time.LocalDateTime; +import lombok.extern.slf4j.Slf4j; + +/** + * Generic cache to indicate whether a feature is disabled for a given project. + * + *

This class is thread-safe. There is no eviction guaranteed in case of cache overload. + */ +@Slf4j +public class ProjectStatusCache { + + private final Cache projectToLockEndTime; + private final Duration defaultCacheDisabledStatusTime; + private final Clock clock; + private final String cacheName; + + public ProjectStatusCache(CacheOptions options, String cacheName) { + log.debug( + "Initializing ProjectStatusCache '{}' with cache size: {}, default disabled duration: {}", + cacheName, + options.getCacheSize(), + options.getDefaultCacheDisabledStatusTime()); + this.defaultCacheDisabledStatusTime = options.getDefaultCacheDisabledStatusTime(); + this.clock = options.getClock(); + this.projectToLockEndTime = + CacheBuilder.newBuilder().maximumSize(options.getCacheSize()).build(); + this.cacheName = cacheName; + } + + public synchronized void markProjectAsDisabled(String project) { + markProjectAsDisabled(project, defaultCacheDisabledStatusTime); + } + + public synchronized void markProjectAsDisabled(String projectName, Duration duration) { + log.warn( + "Marking project '{}' as disabled in cache '{}' for duration: {}", + projectName, + cacheName, + duration); + projectToLockEndTime.put(projectName, LocalDateTime.now(clock).plus(duration)); + } + + public synchronized boolean isProjectDisabled(String projectName) { + LocalDateTime maybeTime = projectToLockEndTime.getIfPresent(projectName); + if (maybeTime == null) { + log.debug("No cache entry found for project '{}' in cache '{}'", projectName, cacheName); + return false; + } + boolean isDisabled = !maybeTime.isBefore(LocalDateTime.now(clock)); + if (isDisabled) { + log.debug( + "Project '{}' is marked as disabled in cache '{}' until {}", + projectName, + cacheName, + maybeTime); + } else { + log.debug( + "Project disability has expired for project '{}' in cache '{}'", projectName, cacheName); + } + return isDisabled; + } +} diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/StandardApiEnablementCache.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/StandardApiEnablementCache.java index 0df1ff5..b22c194 100755 --- a/lib/src/main/java/com/google/cloud/datalineage/producerclient/StandardApiEnablementCache.java +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/StandardApiEnablementCache.java @@ -14,60 +14,32 @@ package com.google.cloud.datalineage.producerclient; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import java.time.Clock; import java.time.Duration; -import java.time.LocalDateTime; /** - * Cache used to indicate whether API is disabled for given project. + * Cache used to indicate whether the Data Lineage API is disabled for a given project. * - *

Class lets specify default cache size, default duration of service disability and Clock. - * Structure is thread-safe. There is no eviction guaranteed in case of cache overload. + *

This class is a wrapper around a generic ProjectStatusCache. */ public class StandardApiEnablementCache implements ApiEnablementCache { - private final Cache projectToLockEndTime; - private final Duration defaultCacheDisabledStatusTime; - private final Clock clock; + private final ProjectStatusCache delegate; - StandardApiEnablementCache(ApiEnablementCacheOptions options) { - defaultCacheDisabledStatusTime = options.getDefaultCacheDisabledStatusTime(); - clock = options.getClock(); - - projectToLockEndTime = CacheBuilder.newBuilder().maximumSize(options.getCacheSize()).build(); + StandardApiEnablementCache(CacheOptions options) { + this.delegate = new ProjectStatusCache(options, "API Enablement"); } - /** - * Defaults Duration to value specified by constructor. - * - * @see StandardApiEnablementCache#markServiceAsDisabled(String, Duration) - */ - public synchronized void markServiceAsDisabled(String project) { - markServiceAsDisabled(project, defaultCacheDisabledStatusTime); + @Override + public void markServiceAsDisabled(String project) { + delegate.markProjectAsDisabled(project); } - /** - * Sets service state for given project as disabled from current timestamp to current timestamp - * increased by duration time. - * - *

It is not guarantied that cache will indicate service state as disabled up to calculated - * value. Specified entry may be deleted if cache is overloaded. - */ - public synchronized void markServiceAsDisabled(String projectName, Duration offset) { - projectToLockEndTime.put(projectName, LocalDateTime.now(clock).plus(offset)); + @Override + public void markServiceAsDisabled(String projectName, Duration duration) { + delegate.markProjectAsDisabled(projectName, duration); } - /** - * Indicates if service is disabled for given project. - * - * @return A boolean that indicates if service is disabled for given project - */ - public synchronized boolean isServiceMarkedAsDisabled(String projectName) { - LocalDateTime maybeTime = projectToLockEndTime.getIfPresent(projectName); - if (maybeTime == null) { - return false; - } - return !maybeTime.isBefore(LocalDateTime.now(clock)); + @Override + public boolean isServiceMarkedAsDisabled(String projectName) { + return delegate.isProjectDisabled(projectName); } } diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/StandardLineageEnablementCache.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/StandardLineageEnablementCache.java new file mode 100644 index 0000000..68c82c5 --- /dev/null +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/StandardLineageEnablementCache.java @@ -0,0 +1,46 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient; + +import java.time.Duration; + +/** + * Cache used to indicate whether Lineage Ingestion is disabled for a given project. + * + *

This class is a wrapper around a generic ProjectStatusCache. + */ +public class StandardLineageEnablementCache implements LineageEnablementCache { + + private final ProjectStatusCache delegate; + + StandardLineageEnablementCache(CacheOptions options) { + this.delegate = new ProjectStatusCache(options, "Lineage Enablement"); + } + + @Override + public void markLineageAsDisabled(String project) { + delegate.markProjectAsDisabled(project); + } + + @Override + public void markLineageAsDisabled(String projectName, Duration duration) { + delegate.markProjectAsDisabled(projectName, duration); + } + + @Override + public boolean isLineageMarkedAsDisabled(String projectName) { + return delegate.isProjectDisabled(projectName); + } +} diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/helpers/GrpcHelper.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/helpers/GrpcHelper.java index 551bb46..137a94d 100644 --- a/lib/src/main/java/com/google/cloud/datalineage/producerclient/helpers/GrpcHelper.java +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/helpers/GrpcHelper.java @@ -16,25 +16,33 @@ import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.StatusCode.Code; -import com.google.protobuf.Any; +import com.google.common.collect.ImmutableSet; import com.google.protobuf.InvalidProtocolBufferException; import com.google.rpc.ErrorInfo; import com.google.rpc.Status; import io.grpc.protobuf.StatusProto; +import lombok.extern.slf4j.Slf4j; /** Set of helpers for Grpc handling. */ +@Slf4j public class GrpcHelper { + /** Make this helper class non-instantiable */ + private GrpcHelper() {} + /** - * Lets getting reason field from grpc response. + * Returns a set of error reasons from com.google.rpc.Status of a gRPC Exception. * * @param grpcException - error returned form grpc call - * @return string with value from reason field + * @return a set of strings with reasons extracted from Status if any */ - public static String getReason(Throwable grpcException) { + public static ImmutableSet getErrorReasons(Throwable grpcException) { + log.debug("Extracting reasons from gRPC exception: {}", grpcException.getMessage()); Status statusProto = StatusProto.fromThrowable(grpcException); if (statusProto == null) { - throw new IllegalArgumentException("Provided throwable is not a Grpc exception"); + log.error( + "Provided throwable is not a gRPC exception: {}", grpcException.getClass().getName()); + throw new IllegalArgumentException("Provided throwable is not a gRPC exception"); } /* Status is a standard way to represent API error. * This model consists of code, message and details. @@ -44,16 +52,20 @@ public static String getReason(Throwable grpcException) { * - user can introduce new ones. * That's why in order to get ErrorInfo, we need to iterate over details and check all of them. */ - for (Any any : statusProto.getDetailsList()) { - if (any.is(ErrorInfo.class)) { - try { - return any.unpack(ErrorInfo.class).getReason(); - } catch (InvalidProtocolBufferException exception) { - throw new IllegalArgumentException("Invalid protocol buffer message", exception); - } - } - } - throw new IllegalArgumentException("Message does not contain ErrorInfo", grpcException); + return statusProto.getDetailsList().stream() + .filter((detail) -> detail.is(ErrorInfo.class)) + .map( + (errorInfo) -> { + try { + String reason = errorInfo.unpack(ErrorInfo.class).getReason(); + log.debug("Successfully extracted reason from ErrorInfo: {}", reason); + return reason; + } catch (InvalidProtocolBufferException e) { + log.error("Invalid protocol buffer message while extracting ErrorInfo", e); + throw new IllegalArgumentException("Invalid protocol buffer message", e); + } + }) + .collect(ImmutableSet.toImmutableSet()); } /** diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/helpers/NamesHelper.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/helpers/NamesHelper.java index 308aa50..ff654a6 100644 --- a/lib/src/main/java/com/google/cloud/datalineage/producerclient/helpers/NamesHelper.java +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/helpers/NamesHelper.java @@ -24,6 +24,9 @@ public class NamesHelper { Pattern.compile( "^(?<" + PROJECT_NAME_AND_LOCATION_GROUP_NAME + ">projects/[^/]+/locations/[^/]+).*$"); + /** Make this helper class non-instantiable */ + private NamesHelper() {} + public static String getProjectNameWithLocationFromResourceName(String resourceName) { Matcher matcher = RESOURCE_PATTERN.matcher(resourceName); if (!matcher.matches()) { diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/helpers/OpenLineageHelper.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/helpers/OpenLineageHelper.java index 586d41e..50af7d6 100644 --- a/lib/src/main/java/com/google/cloud/datalineage/producerclient/helpers/OpenLineageHelper.java +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/helpers/OpenLineageHelper.java @@ -21,7 +21,13 @@ /** Helper class for working with OpenLineage messages */ public class OpenLineageHelper { - /** Converts a valid JSON String to a protobuf Struct */ + /** + * Converts a JSON string to a protobuf Struct. + * + * @param json The JSON string to be converted + * @return A Struct object representing the input JSON + * @throws InvalidProtocolBufferException If the input JSON is invalid or cannot be parsed + */ public static Struct jsonToStruct(String json) throws InvalidProtocolBufferException { Struct.Builder message = Struct.newBuilder(); JsonFormat.parser().ignoringUnknownFields().merge(json, message); diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageClient.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageClient.java index 3f127d0..2ee7030 100755 --- a/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageClient.java +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageClient.java @@ -54,6 +54,7 @@ public interface AsyncLineageClient extends BackgroundResource { * } * * @param request Required. The request object that will be used to execute API call. + * @return An OperationFuture that represents the long-running operation. * @throws com.google.api.gax.rpc.ApiException if the remote call fails */ OperationFuture deleteProcess(DeleteProcessRequest request); @@ -73,6 +74,7 @@ public interface AsyncLineageClient extends BackgroundResource { * } * * @param request Required. The request object that will be used to execute API call. + * @return An OperationFuture that represents the long-running operation. * @throws com.google.api.gax.rpc.ApiException if the remote call fails */ OperationFuture deleteRun(DeleteRunRequest request); @@ -93,6 +95,7 @@ public interface AsyncLineageClient extends BackgroundResource { * } * * @param request Required. The request object that will be used to execute API call. + * @return ApiFuture that represents the asynchronous operation. * @throws com.google.api.gax.rpc.ApiException if the remote call fails */ ApiFuture deleteLineageEvent(DeleteLineageEventRequest request); @@ -111,6 +114,7 @@ public interface AsyncLineageClient extends BackgroundResource { * } * * @param request Required. The request object that will be used to execute API call. + * @return ApiFuture that represents the asynchronous operation. * @throws com.google.api.gax.rpc.ApiException if the remote call fails */ ApiFuture getProcess(GetProcessRequest request); @@ -129,6 +133,7 @@ public interface AsyncLineageClient extends BackgroundResource { * } * * @param request Required. The request object that will be used to execute API call. + * @return ApiFuture that represents the asynchronous operation. * @throws com.google.api.gax.rpc.ApiException if the remote call fails */ ApiFuture getRun(GetRunRequest request); @@ -148,6 +153,7 @@ public interface AsyncLineageClient extends BackgroundResource { * } * * @param request Required. The request object that will be used to execute API call. + * @return ApiFuture that represents the asynchronous operation. * @throws com.google.api.gax.rpc.ApiException if the remote call fails */ ApiFuture getLineageEvent(GetLineageEventRequest request); @@ -169,6 +175,7 @@ public interface AsyncLineageClient extends BackgroundResource { * } * * @param request Required. The request object that will be used to execute API call. + * @return ApiFuture that represents the asynchronous operation. * @throws com.google.api.gax.rpc.ApiException if the remote call fails */ ApiFuture listProcesses(ListProcessesRequest request); @@ -190,6 +197,7 @@ public interface AsyncLineageClient extends BackgroundResource { * } * * @param request Required. The request object that will be used to execute API call. + * @return ApiFuture that represents the asynchronous operation. * @throws com.google.api.gax.rpc.ApiException if the remote call fails */ ApiFuture listRuns(ListRunsRequest request); @@ -211,6 +219,7 @@ public interface AsyncLineageClient extends BackgroundResource { * } * * @param request Required. The request object that will be used to execute API call. + * @return ApiFuture that represents the asynchronous operation. * @throws com.google.api.gax.rpc.ApiException if the remote call fails */ ApiFuture listLineageEvents(ListLineageEventsRequest request); @@ -229,6 +238,7 @@ public interface AsyncLineageClient extends BackgroundResource { * } * * @param request Required. The request object that will be used to execute API call. + * @return ApiFuture that represents the asynchronous operation. * @throws com.google.api.gax.rpc.ApiException if the remote call fails */ ApiFuture processOpenLineageRunEvent( diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageProducerClient.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageProducerClient.java index 4c0576e..733c699 100755 --- a/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageProducerClient.java +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageProducerClient.java @@ -35,9 +35,13 @@ import com.google.cloud.datacatalog.lineage.v1.ProcessOpenLineageRunEventRequest; import com.google.cloud.datacatalog.lineage.v1.ProcessOpenLineageRunEventResponse; import com.google.cloud.datacatalog.lineage.v1.Run; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.Empty; import java.io.IOException; import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.threeten.bp.Duration; +import org.threeten.bp.Instant; /** * * Async producer client. @@ -45,92 +49,127 @@ *

Implements AsyncLineageClient using client library. This implementation also provides support * for connection cache. */ +@Slf4j public final class AsyncLineageProducerClient implements BackgroundResource, AsyncLineageClient { public static AsyncLineageProducerClient create() throws IOException { - return create(AsyncLineageProducerClientSettings.newBuilder().build()); + return create(AsyncLineageProducerClientSettings.defaultInstance()); } public static AsyncLineageProducerClient create(AsyncLineageProducerClientSettings settings) throws IOException { + log.debug( + "Creating AsyncLineageProducerClient with graceful shutdown duration: {}", + settings.getGracefulShutdownDuration()); return new AsyncLineageProducerClient(settings); } + @VisibleForTesting static AsyncLineageProducerClient create(BasicLineageClient basicClient) throws IOException { - return new AsyncLineageProducerClient(basicClient); + return create(basicClient, AsyncLineageProducerClientSettings.defaultInstance()); + } + + @VisibleForTesting + static AsyncLineageProducerClient create( + BasicLineageClient basicClient, AsyncLineageProducerClientSettings settings) + throws IOException { + return new AsyncLineageProducerClient(basicClient, settings); } private final InternalClient client; + private final Duration gracefulShutdownDuration; private AsyncLineageProducerClient(AsyncLineageProducerClientSettings settings) throws IOException { client = InternalClient.create(settings); + this.gracefulShutdownDuration = settings.getGracefulShutdownDuration(); } - private AsyncLineageProducerClient(BasicLineageClient basicClient) throws IOException { + private AsyncLineageProducerClient( + BasicLineageClient basicClient, AsyncLineageProducerClientSettings settings) + throws IOException { client = InternalClient.create(basicClient); + this.gracefulShutdownDuration = settings.getGracefulShutdownDuration(); } @Override public ApiFuture deleteLineageEvent(DeleteLineageEventRequest request) { + log.debug("Deleting lineage event: {}", request.getName()); return client.deleteLineageEvent(request); } @Override public OperationFuture deleteProcess(DeleteProcessRequest request) { + log.debug("Deleting process: {}", request.getName()); return client.deleteProcess(request); } @Override public OperationFuture deleteRun(DeleteRunRequest request) { + log.debug("Deleting run: {}", request.getName()); return client.deleteRun(request); } @Override public ApiFuture getLineageEvent(GetLineageEventRequest request) { + log.debug("Getting lineage event: {}", request.getName()); return client.getLineageEvent(request); } @Override public ApiFuture getProcess(GetProcessRequest request) { + log.debug("Getting process: {}", request.getName()); return client.getProcess(request); } @Override public ApiFuture getRun(GetRunRequest request) { + log.debug("Getting run: {}", request.getName()); return client.getRun(request); } @Override public ApiFuture listLineageEvents( ListLineageEventsRequest request) { + log.debug("Listing lineage events for parent: {}", request.getParent()); return client.listLineageEvents(request); } @Override public ApiFuture listProcesses(ListProcessesRequest request) { + log.debug("Listing processes for parent: {}", request.getParent()); return client.listProcesses(request); } @Override public ApiFuture listRuns(ListRunsRequest request) { + log.debug("Listing runs for parent: {}", request.getParent()); return client.listRuns(request); } @Override public ApiFuture processOpenLineageRunEvent( ProcessOpenLineageRunEventRequest request) { + log.debug("Processing OpenLineage run event: {}", request.getOpenLineage()); return client.processOpenLineageRunEvent(request); } @Override public void close() throws Exception { + Instant start = Instant.now(); client.close(); + gracefulShutdown(start); } @Override public void shutdown() { + Instant start = Instant.now(); client.shutdown(); + try { + gracefulShutdown(start); + } catch (InterruptedException e) { + log.warn("Interrupted during shutdown", e); + } } @Override @@ -152,4 +191,26 @@ public void shutdownNow() { public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException { return client.awaitTermination(duration, unit); } + + private void gracefulShutdown(Instant shutdownStartedAt) throws InterruptedException { + if (gracefulShutdownDuration.isZero()) { + log.warn( + "AsyncLineageProducerClient graceful shutdown duration was set to zero. " + + "This effectively means hard shutdown with potential data loss"); + return; + } + log.debug("Starting graceful shutdown with duration: {}", gracefulShutdownDuration); + boolean terminated = + awaitTermination( + gracefulShutdownDuration + .minus(Duration.between(shutdownStartedAt, Instant.now())) + .toNanos(), + TimeUnit.NANOSECONDS); + if (!terminated) { + log.warn( + "AsyncLineageProducerClient did not terminate within the " + + "graceful shutdown duration: {}", + gracefulShutdownDuration); + } + } } diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageProducerClientSettings.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageProducerClientSettings.java index e18ae5f..ddfad7a 100755 --- a/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageProducerClientSettings.java +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageProducerClientSettings.java @@ -22,7 +22,7 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.WatchdogProvider; import com.google.cloud.datacatalog.lineage.v1.stub.LineageStubSettings; -import com.google.cloud.datalineage.producerclient.ApiEnablementCacheSettings; +import com.google.cloud.datalineage.producerclient.CacheSettings; import java.io.IOException; import javax.annotation.Nullable; import org.threeten.bp.Duration; @@ -33,6 +33,9 @@ */ public final class AsyncLineageProducerClientSettings extends LineageBaseSettings { + public static final Duration DEFAULT_GRACEFUL_SHUTDOWN_DURATION = Duration.ofSeconds(30); + private final Duration gracefulShutdownDuration; + public static Builder newBuilder() { return Builder.createDefault(); } @@ -47,6 +50,11 @@ public static AsyncLineageProducerClientSettings defaultInstance() throws IOExce private AsyncLineageProducerClientSettings(Builder settingsBuilder) throws IOException { super(settingsBuilder); + this.gracefulShutdownDuration = settingsBuilder.gracefulShutdownDuration; + } + + public Duration getGracefulShutdownDuration() { + return gracefulShutdownDuration; } /** @@ -57,6 +65,7 @@ private AsyncLineageProducerClientSettings(Builder settingsBuilder) throws IOExc * method. */ public static final class Builder extends LineageBaseSettings.Builder { + private Duration gracefulShutdownDuration = DEFAULT_GRACEFUL_SHUTDOWN_DURATION; private static Builder createDefault() { return new Builder(LineageStubSettings.newBuilder()); @@ -68,6 +77,7 @@ private static Builder createDefault() { Builder(AsyncLineageProducerClientSettings settings) { super(settings); + this.gracefulShutdownDuration = settings.gracefulShutdownDuration; } Builder(LineageStubSettings.Builder stubSettings) { @@ -80,9 +90,22 @@ public AsyncLineageProducerClientSettings build() throws IOException { } @Override - public SyncLineageProducerClientSettings.Builder setConnectionCacheSettings( - ApiEnablementCacheSettings settings) { - return (SyncLineageProducerClientSettings.Builder) super.setConnectionCacheSettings(settings); + public Builder setApiEnablementCacheSettings(CacheSettings settings) { + return (Builder) super.setApiEnablementCacheSettings(settings); + } + + @Override + public Builder setLineageEnablementCacheSettings(CacheSettings settings) { + return (Builder) super.setLineageEnablementCacheSettings(settings); + } + + public Builder setGracefulShutdownDuration(Duration gracefulShutdownDuration) { + this.gracefulShutdownDuration = gracefulShutdownDuration; + return this; + } + + public Duration getGracefulShutdownDuration() { + return gracefulShutdownDuration; } @Override diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/InternalClient.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/InternalClient.java index 303c0d3..2491075 100755 --- a/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/InternalClient.java +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/InternalClient.java @@ -41,17 +41,24 @@ import com.google.cloud.datacatalog.lineage.v1.Run; import com.google.cloud.datalineage.producerclient.ApiEnablementCache; import com.google.cloud.datalineage.producerclient.ApiEnablementCacheFactory; +import com.google.cloud.datalineage.producerclient.LineageEnablementCache; +import com.google.cloud.datalineage.producerclient.LineageEnablementCacheFactory; import com.google.cloud.datalineage.producerclient.helpers.GrpcHelper; import com.google.cloud.datalineage.producerclient.helpers.NamesHelper; +import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Empty; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Wraps standard lineage client and provides common functionalities. */ final class InternalClient implements AsyncLineageClient { + private static final Logger logger = LoggerFactory.getLogger(AsyncLineageProducerClient.class); + static InternalClient create() throws IOException { return create(LineageBaseSettings.defaultInstance()); } @@ -66,9 +73,12 @@ static InternalClient create(BasicLineageClient client) throws IOException { private final BasicLineageClient client; private final ApiEnablementCache apiEnablementCache; + private final LineageEnablementCache lineageEnablementCache; private InternalClient(LineageBaseSettings settings, BasicLineageClient basicLineageClient) { - apiEnablementCache = ApiEnablementCacheFactory.get(settings.getConnectionCacheSettings()); + apiEnablementCache = ApiEnablementCacheFactory.get(settings.getApiEnablementCacheSettings()); + lineageEnablementCache = + LineageEnablementCacheFactory.get(settings.getLineageEnablementCacheSettings()); client = basicLineageClient; } @@ -171,7 +181,17 @@ private , T> F handleCall(Supplier call, String resour throw ApiExceptionFactory.createException( "Data Lineage API is disabled in project " + projectName - + ". Please enable the API and try again after a few minutes.", + + ". Please enable the API and try again later.", + null, + GrpcHelper.getStatusCodeFromCode(Code.PERMISSION_DENIED), + false); + } + + if (lineageEnablementCache.isLineageMarkedAsDisabled(projectName)) { + throw ApiExceptionFactory.createException( + "Lineage is not enabled in Lineage Configurations for project " + + projectName + + ". Please enable Lineage in Lineage Configurations and try again later", null, GrpcHelper.getStatusCodeFromCode(Code.PERMISSION_DENIED), false); @@ -183,13 +203,25 @@ private , T> F handleCall(Supplier call, String resour new ApiFutureCallback<>() { @Override public void onFailure(Throwable exception) { - if (GrpcHelper.getReason(exception).equals("SERVICE_DISABLED")) { + logger.error( + "Failed to call API for resource {}: {}", + resourceName, + exception.getMessage(), + exception); + ImmutableSet reasons = GrpcHelper.getErrorReasons(exception); + if (reasons.contains("SERVICE_DISABLED")) { apiEnablementCache.markServiceAsDisabled(projectName); + } else if (reasons.contains("LINEAGE_INGESTION_DISABLED")) { + lineageEnablementCache.markLineageAsDisabled(projectName); } } @Override - public void onSuccess(Object result) {} + public void onSuccess(Object result) { + if (logger.isDebugEnabled()) { + logger.debug("Successfully called API for resource: {}", resourceName); + } + } }, MoreExecutors.directExecutor()); diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/LineageBaseSettings.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/LineageBaseSettings.java index 831e227..fd78907 100755 --- a/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/LineageBaseSettings.java +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/LineageBaseSettings.java @@ -19,7 +19,7 @@ import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.datacatalog.lineage.v1.LineageSettings; import com.google.cloud.datacatalog.lineage.v1.stub.LineageStubSettings; -import com.google.cloud.datalineage.producerclient.ApiEnablementCacheSettings; +import com.google.cloud.datalineage.producerclient.CacheSettings; import com.google.common.collect.ImmutableSet; import java.io.IOException; import org.threeten.bp.Duration; @@ -42,17 +42,23 @@ public static LineageBaseSettings defaultInstance() throws IOException { return newBuilder().build(); } - private final ApiEnablementCacheSettings apiEnablementCacheSettings; + private final CacheSettings apiEnablementCacheSettings; + private final CacheSettings lineageEnablementCacheSettings; protected LineageBaseSettings(Builder settingsBuilder) throws IOException { super(settingsBuilder); this.apiEnablementCacheSettings = settingsBuilder.apiEnablementCacheSettings; + this.lineageEnablementCacheSettings = settingsBuilder.lineageEnablementCacheSettings; } - public ApiEnablementCacheSettings getConnectionCacheSettings() { + public CacheSettings getApiEnablementCacheSettings() { return apiEnablementCacheSettings; } + public CacheSettings getLineageEnablementCacheSettings() { + return lineageEnablementCacheSettings; + } + @Override public Builder toBuilder() { return new Builder(this); @@ -86,36 +92,46 @@ private static Builder createDefault() { return new Builder(LineageStubSettings.newBuilder()); } - private ApiEnablementCacheSettings apiEnablementCacheSettings; + private CacheSettings apiEnablementCacheSettings; + private CacheSettings lineageEnablementCacheSettings; protected Builder() throws IOException { super(); - apiEnablementCacheSettings = ApiEnablementCacheSettings.getCommonInstance(); + apiEnablementCacheSettings = CacheSettings.getCommonInstance(); + lineageEnablementCacheSettings = CacheSettings.getCommonInstance(); applyDefaultRetryPolicy(); } protected Builder(ClientContext clientContext) { super(clientContext); - apiEnablementCacheSettings = ApiEnablementCacheSettings.getCommonInstance(); + apiEnablementCacheSettings = CacheSettings.getCommonInstance(); + lineageEnablementCacheSettings = CacheSettings.getCommonInstance(); applyDefaultRetryPolicy(); } protected Builder(LineageBaseSettings settings) { super(settings); this.apiEnablementCacheSettings = settings.apiEnablementCacheSettings; + this.lineageEnablementCacheSettings = settings.lineageEnablementCacheSettings; } protected Builder(LineageStubSettings.Builder stubSettings) { super(stubSettings); - apiEnablementCacheSettings = ApiEnablementCacheSettings.getCommonInstance(); + apiEnablementCacheSettings = CacheSettings.getCommonInstance(); + lineageEnablementCacheSettings = CacheSettings.getCommonInstance(); applyDefaultRetryPolicy(); } - public Builder setConnectionCacheSettings(ApiEnablementCacheSettings settings) { + public Builder setApiEnablementCacheSettings(CacheSettings settings) { apiEnablementCacheSettings = settings; return this; } + public Builder setLineageEnablementCacheSettings(CacheSettings settings) { + lineageEnablementCacheSettings = settings; + return this; + } + @Override public LineageBaseSettings build() throws IOException { return new LineageBaseSettings(this); diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/SyncLineageClient.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/SyncLineageClient.java index 1513ed2..e5373fd 100755 --- a/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/SyncLineageClient.java +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/SyncLineageClient.java @@ -51,6 +51,8 @@ public interface SyncLineageClient extends BackgroundResource { * } * * @param request Required. The request object that will be used to execute API call. + * @throws InterruptedException if the thread is interrupted while waiting + * @throws ExecutionException if the computation threw an exception * @throws com.google.api.gax.rpc.ApiException if the remote call fails */ void deleteProcess(DeleteProcessRequest request) throws ExecutionException, InterruptedException; @@ -70,6 +72,8 @@ public interface SyncLineageClient extends BackgroundResource { * } * * @param request Required. The request object that will be used to execute API call. + * @throws InterruptedException if the thread is interrupted while waiting + * @throws ExecutionException if the computation threw an exception * @throws com.google.api.gax.rpc.ApiException if the remote call fails */ void deleteRun(DeleteRunRequest request) throws ExecutionException, InterruptedException; @@ -108,6 +112,7 @@ public interface SyncLineageClient extends BackgroundResource { * } * * @param request Required. The request object that will be used to execute API call. + * @return The requested process object * @throws com.google.api.gax.rpc.ApiException if the remote call fails */ Process getProcess(GetProcessRequest request); @@ -126,6 +131,7 @@ public interface SyncLineageClient extends BackgroundResource { * } * * @param request Required. The request object that will be used to execute API call. + * @return The requested run object * @throws com.google.api.gax.rpc.ApiException if the remote call fails */ Run getRun(GetRunRequest request); @@ -145,6 +151,7 @@ public interface SyncLineageClient extends BackgroundResource { * } * * @param request Required. The request object that will be used to execute API call. + * @return The requested lineage event object * @throws com.google.api.gax.rpc.ApiException if the remote call fails */ LineageEvent getLineageEvent(GetLineageEventRequest request); @@ -166,6 +173,7 @@ public interface SyncLineageClient extends BackgroundResource { * } * * @param request Required. The request object that will be used to execute API call. + * @return Paged response with the requested processes * @throws com.google.api.gax.rpc.ApiException if the remote call fails */ ListProcessesPagedResponse listProcesses(ListProcessesRequest request); @@ -187,6 +195,7 @@ public interface SyncLineageClient extends BackgroundResource { * } * * @param request Required. The request object that will be used to execute API call. + * @return The paged response with the requested runs * @throws com.google.api.gax.rpc.ApiException if the remote call fails */ ListRunsPagedResponse listRuns(ListRunsRequest request); @@ -208,6 +217,7 @@ public interface SyncLineageClient extends BackgroundResource { * } * * @param request Required. The request object that will be used to execute API call. + * @return The paged response with the requested lineage events * @throws com.google.api.gax.rpc.ApiException if the remote call fails */ ListLineageEventsPagedResponse listLineageEvents(ListLineageEventsRequest request); @@ -226,6 +236,7 @@ public interface SyncLineageClient extends BackgroundResource { * } * * @param request Required. The request object that will be used to execute API call. + * @return The API call response with details about the processed event(s). * @throws com.google.api.gax.rpc.ApiException if the remote call fails */ ProcessOpenLineageRunEventResponse processOpenLineageRunEvent( diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/SyncLineageProducerClient.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/SyncLineageProducerClient.java index a7a4b27..8873fc4 100755 --- a/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/SyncLineageProducerClient.java +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/SyncLineageProducerClient.java @@ -35,6 +35,7 @@ import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; /** * * Sync lineage producer client. @@ -42,6 +43,7 @@ *

Implements SyncLineageClient using client library. This implementation also provides support * for connection cache. */ +@Slf4j public final class SyncLineageProducerClient implements SyncLineageClient { public static SyncLineageProducerClient create() throws IOException { @@ -69,53 +71,63 @@ private SyncLineageProducerClient(BasicLineageClient basicClient) throws IOExcep @Override public void deleteLineageEvent(DeleteLineageEventRequest request) { + log.debug("Deleting lineage event: {}", request.getName()); ApiExceptions.callAndTranslateApiException(client.deleteLineageEvent(request)); } @Override public void deleteProcess(DeleteProcessRequest request) throws ExecutionException, InterruptedException { + log.debug("Deleting process: {}", request.getName()); client.deleteProcess(request).get(); } @Override public void deleteRun(DeleteRunRequest request) throws ExecutionException, InterruptedException { + log.debug("Deleting run: {}", request.getName()); client.deleteRun(request).get(); } @Override public LineageEvent getLineageEvent(GetLineageEventRequest request) { + log.debug("Getting lineage event: {}", request.getName()); return ApiExceptions.callAndTranslateApiException(client.getLineageEvent(request)); } @Override public Process getProcess(GetProcessRequest request) { + log.debug("Getting process: {}", request.getName()); return ApiExceptions.callAndTranslateApiException(client.getProcess(request)); } @Override public Run getRun(GetRunRequest request) { + log.debug("Getting run: {}", request.getName()); return ApiExceptions.callAndTranslateApiException(client.getRun(request)); } @Override public ListLineageEventsPagedResponse listLineageEvents(ListLineageEventsRequest request) { + log.debug("Listing lineage events for parent: {}", request.getParent()); return ApiExceptions.callAndTranslateApiException(client.listLineageEvents(request)); } @Override public ListProcessesPagedResponse listProcesses(ListProcessesRequest request) { + log.debug("Listing processes for parent: {}", request.getParent()); return ApiExceptions.callAndTranslateApiException(client.listProcesses(request)); } @Override public ListRunsPagedResponse listRuns(ListRunsRequest request) { + log.debug("Listing runs for parent: {}", request.getParent()); return ApiExceptions.callAndTranslateApiException(client.listRuns(request)); } @Override public ProcessOpenLineageRunEventResponse processOpenLineageRunEvent( ProcessOpenLineageRunEventRequest request) { + log.debug("Processing OpenLineage run event: {}", request.getOpenLineage()); return ApiExceptions.callAndTranslateApiException(client.processOpenLineageRunEvent(request)); } diff --git a/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/SyncLineageProducerClientSettings.java b/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/SyncLineageProducerClientSettings.java index 818726a..c653a08 100755 --- a/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/SyncLineageProducerClientSettings.java +++ b/lib/src/main/java/com/google/cloud/datalineage/producerclient/v1/SyncLineageProducerClientSettings.java @@ -22,7 +22,7 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.WatchdogProvider; import com.google.cloud.datacatalog.lineage.v1.stub.LineageStubSettings; -import com.google.cloud.datalineage.producerclient.ApiEnablementCacheSettings; +import com.google.cloud.datalineage.producerclient.CacheSettings; import java.io.IOException; import javax.annotation.Nullable; import org.threeten.bp.Duration; @@ -80,8 +80,15 @@ public SyncLineageProducerClientSettings build() throws IOException { } @Override - public Builder setConnectionCacheSettings(ApiEnablementCacheSettings settings) { - return (Builder) super.setConnectionCacheSettings(settings); + public Builder setApiEnablementCacheSettings(CacheSettings settings) { + return (Builder) super.setApiEnablementCacheSettings(settings); + } + + @Override + public AsyncLineageProducerClientSettings.Builder setLineageEnablementCacheSettings( + CacheSettings settings) { + return (AsyncLineageProducerClientSettings.Builder) + super.setLineageEnablementCacheSettings(settings); } @Override diff --git a/lib/src/test/java/com/google/cloud/datalineage/producerclient/ApiEnablementCacheFactoryTest.java b/lib/src/test/java/com/google/cloud/datalineage/producerclient/ApiEnablementCacheFactoryTest.java new file mode 100755 index 0000000..d874124 --- /dev/null +++ b/lib/src/test/java/com/google/cloud/datalineage/producerclient/ApiEnablementCacheFactoryTest.java @@ -0,0 +1,49 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient; + +import static com.google.common.truth.Truth.assertThat; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test suite for ApiEnablementCacheFactory. */ +@RunWith(JUnit4.class) +public class ApiEnablementCacheFactoryTest { + + @Test + public void get_returnsSameInstance() { + CacheSettings settings = CacheSettings.getCommonInstance(); + ApiEnablementCache firstInstance = ApiEnablementCacheFactory.get(settings); + ApiEnablementCache secondInstance = ApiEnablementCacheFactory.get(settings); + assertThat(firstInstance).isSameInstanceAs(secondInstance); + } + + @Test + public void getDisabled_returnsNoOpInstance() { + CacheSettings settings = CacheSettings.getDisabledInstance(); + ApiEnablementCache cache = ApiEnablementCacheFactory.get(settings); + assertThat(cache).isInstanceOf(NoOpApiEnablementCache.class); + } + + @Test + public void getStandAlone_returnsNewInstance() { + CacheSettings settings = CacheSettings.getStandAloneInstance(); + ApiEnablementCache firstInstance = ApiEnablementCacheFactory.get(settings); + ApiEnablementCache secondInstance = ApiEnablementCacheFactory.get(settings); + assertThat(firstInstance).isNotSameInstanceAs(secondInstance); + } +} diff --git a/lib/src/test/java/com/google/cloud/datalineage/producerclient/CacheOptionsTest.java b/lib/src/test/java/com/google/cloud/datalineage/producerclient/CacheOptionsTest.java new file mode 100644 index 0000000..f616d88 --- /dev/null +++ b/lib/src/test/java/com/google/cloud/datalineage/producerclient/CacheOptionsTest.java @@ -0,0 +1,92 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import java.time.Clock; +import java.time.Duration; +import java.time.ZoneId; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test suite for CacheOptions. */ +@RunWith(JUnit4.class) +public class CacheOptionsTest { + + @Test + public void newBuilder_setsDefaultValues() { + CacheOptions options = CacheOptions.newBuilder().build(); + assertThat(options.getCacheSize()).isEqualTo(1000); + assertThat(options.getDefaultCacheDisabledStatusTime()).isEqualTo(Duration.ofMinutes(5)); + assertThat(options.getClock()).isEqualTo(Clock.systemDefaultZone()); + } + + @Test + public void setters_changeValues() { + Clock clock = Clock.systemDefaultZone(); + CacheOptions options = + CacheOptions.newBuilder() + .setCacheSize(500) + .setDefaultCacheDisabledStatusTime(Duration.ofMinutes(5)) + .setClock(clock) + .build(); + + assertThat(options.getCacheSize()).isEqualTo(500); + assertThat(options.getDefaultCacheDisabledStatusTime()).isEqualTo(Duration.ofMinutes(5)); + assertThat(options.getClock()).isEqualTo(clock); + } + + @Test + public void setCacheSize_negative_throwsIllegalArgumentException() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, () -> CacheOptions.newBuilder().setCacheSize(-1)); + assertThat(exception).hasMessageThat().contains("Limit cannot be negative"); + } + + @Test + public void setDefaultCacheDisabledStatusTime_negative_throwsIllegalArgumentException() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + CacheOptions.newBuilder() + .setDefaultCacheDisabledStatusTime(Duration.ofMinutes(-1))); + assertThat(exception).hasMessageThat().contains("Duration cannot be negative"); + } + + @Test + public void toBuilder_preserveOptions() { + Duration disabledTime = Duration.ofSeconds(30); + int size = 500; + Clock clock = Clock.fixed(Clock.systemDefaultZone().instant(), ZoneId.of("UTC")); + + CacheOptions options = + CacheOptions.newBuilder() + .setDefaultCacheDisabledStatusTime(disabledTime) + .setCacheSize(size) + .setClock(clock) + .build(); + + CacheOptions newOptions = options.toBuilder().setCacheSize(1000).build(); + + assertThat(newOptions.getCacheSize()).isEqualTo(1000); + assertThat(newOptions.getClock()).isEqualTo(clock); + assertThat(newOptions.getDefaultCacheDisabledStatusTime()).isEqualTo(disabledTime); + } +} diff --git a/lib/src/test/java/com/google/cloud/datalineage/producerclient/CacheSettingsTest.java b/lib/src/test/java/com/google/cloud/datalineage/producerclient/CacheSettingsTest.java new file mode 100644 index 0000000..ef89eac --- /dev/null +++ b/lib/src/test/java/com/google/cloud/datalineage/producerclient/CacheSettingsTest.java @@ -0,0 +1,82 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test suite for CacheSettings. */ +@RunWith(JUnit4.class) +public class CacheSettingsTest { + + @Test + public void getDisabledInstance_returnsDisabledSettings() { + CacheSettings settings = CacheSettings.getDisabledInstance(); + + assertThat(settings.getEnabled()).isFalse(); + assertThat(settings.getUseCommonInstance()).isFalse(); + assertThat(settings.getOptions()).isEqualTo(CacheOptions.getDefaultInstance()); + } + + @Test + public void getCommonInstance_returnsEnabledAndCommon() { + CacheSettings settings = CacheSettings.getCommonInstance(); + + assertThat(settings.getEnabled()).isTrue(); + assertThat(settings.getUseCommonInstance()).isTrue(); + assertThat(settings.getOptions()).isEqualTo(CacheOptions.getDefaultInstance()); + } + + @Test + public void getCommonInstanceWithFallback_returnsCorrectSettings() { + CacheOptions fallbackOptions = CacheOptions.newBuilder().setCacheSize(50).build(); + CacheSettings settings = CacheSettings.getCommonInstance(fallbackOptions); + + assertThat(settings.getEnabled()).isTrue(); + assertThat(settings.getUseCommonInstance()).isTrue(); + assertThat(settings.getOptions()).isEqualTo(fallbackOptions); + } + + @Test + public void getCommonInstanceWithFallback_withNullFallback_throwsException() { + assertThrows(IllegalArgumentException.class, () -> CacheSettings.getCommonInstance(null)); + } + + @Test + public void getStandAloneInstance_returnsEnabledAndNotCommon() { + CacheSettings settings = CacheSettings.getStandAloneInstance(); + assertThat(settings.getEnabled()).isTrue(); + assertThat(settings.getUseCommonInstance()).isFalse(); + assertThat(settings.getOptions()).isEqualTo(CacheOptions.getDefaultInstance()); + } + + @Test + public void getStandAloneInstanceWithFallback_returnsEnabledAndNotCommon() { + CacheOptions fallbackOptions = CacheOptions.newBuilder().setCacheSize(50).build(); + CacheSettings settings = CacheSettings.getStandAloneInstance(fallbackOptions); + assertThat(settings.getEnabled()).isTrue(); + assertThat(settings.getUseCommonInstance()).isFalse(); + assertThat(settings.getOptions()).isEqualTo(fallbackOptions); + } + + @Test + public void getStandAloneInstanceWithFallback_withNullFallback_throwsException() { + assertThrows(IllegalArgumentException.class, () -> CacheSettings.getStandAloneInstance(null)); + } +} diff --git a/lib/src/test/java/com/google/cloud/datalineage/producerclient/LineageEnablementCacheFactoryTest.java b/lib/src/test/java/com/google/cloud/datalineage/producerclient/LineageEnablementCacheFactoryTest.java new file mode 100644 index 0000000..8d99fcc --- /dev/null +++ b/lib/src/test/java/com/google/cloud/datalineage/producerclient/LineageEnablementCacheFactoryTest.java @@ -0,0 +1,49 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient; + +import static com.google.common.truth.Truth.assertThat; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test suite for LineageEnablementCacheFactory. */ +@RunWith(JUnit4.class) +public class LineageEnablementCacheFactoryTest { + + @Test + public void get_returnsSameInstance() { + CacheSettings settings = CacheSettings.getCommonInstance(); + LineageEnablementCache firstInstance = LineageEnablementCacheFactory.get(settings); + LineageEnablementCache secondInstance = LineageEnablementCacheFactory.get(settings); + assertThat(firstInstance).isSameInstanceAs(secondInstance); + } + + @Test + public void getDisabled_returnsNoOpInstance() { + CacheSettings settings = CacheSettings.getDisabledInstance(); + LineageEnablementCache cache = LineageEnablementCacheFactory.get(settings); + assertThat(cache).isInstanceOf(NoOpLineageEnablementCache.class); + } + + @Test + public void getStandAlone_returnsNewInstance() { + CacheSettings settings = CacheSettings.getStandAloneInstance(); + LineageEnablementCache firstInstance = LineageEnablementCacheFactory.get(settings); + LineageEnablementCache secondInstance = LineageEnablementCacheFactory.get(settings); + assertThat(firstInstance).isNotSameInstanceAs(secondInstance); + } +} diff --git a/lib/src/test/java/com/google/cloud/datalineage/producerclient/NoOpApiEnablementCacheTest.java b/lib/src/test/java/com/google/cloud/datalineage/producerclient/NoOpApiEnablementCacheTest.java new file mode 100755 index 0000000..455b7cf --- /dev/null +++ b/lib/src/test/java/com/google/cloud/datalineage/producerclient/NoOpApiEnablementCacheTest.java @@ -0,0 +1,47 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient; + +import static org.junit.Assert.assertFalse; + +import java.time.Duration; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test suite for NoOpApiEnablementCache. */ +@RunWith(JUnit4.class) +public class NoOpApiEnablementCacheTest { + + @Test + public void markServiceAsDisabled_serviceIsEnabled() { + NoOpApiEnablementCache cache = new NoOpApiEnablementCache(); + cache.markServiceAsDisabled("testProject"); + assertFalse(cache.isServiceMarkedAsDisabled("testProject")); + } + + @Test + public void markServiceAsDisabledWithOffset_serviceIsEnabled() { + NoOpApiEnablementCache cache = new NoOpApiEnablementCache(); + cache.markServiceAsDisabled("testProject", Duration.ofMinutes(5)); + assertFalse(cache.isServiceMarkedAsDisabled("testProject")); + } + + @Test + public void isServiceMarkedAsDisabled_returnsDefaultFalse() { + NoOpApiEnablementCache cache = new NoOpApiEnablementCache(); + assertFalse(cache.isServiceMarkedAsDisabled("testProject")); + } +} diff --git a/lib/src/test/java/com/google/cloud/datalineage/producerclient/NoOpLineageEnablementCacheTest.java b/lib/src/test/java/com/google/cloud/datalineage/producerclient/NoOpLineageEnablementCacheTest.java new file mode 100644 index 0000000..04d1d72 --- /dev/null +++ b/lib/src/test/java/com/google/cloud/datalineage/producerclient/NoOpLineageEnablementCacheTest.java @@ -0,0 +1,49 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient; + +import static org.junit.Assert.assertFalse; + +import java.time.Duration; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test suite for NoOpLineageEnablementCache. */ +@RunWith(JUnit4.class) +public class NoOpLineageEnablementCacheTest { + + private static final String PROJECT_ID = "project-id"; + + @Test + public void isServiceMarkedAsDisabled_returnsFalse() { + NoOpLineageEnablementCache cache = new NoOpLineageEnablementCache(); + cache.markLineageAsDisabled(PROJECT_ID); + assertFalse(cache.isLineageMarkedAsDisabled(PROJECT_ID)); + } + + @Test + public void markServiceAsDisabledWithOffset_serviceIsEnabled() { + NoOpLineageEnablementCache cache = new NoOpLineageEnablementCache(); + cache.markLineageAsDisabled("testProject", Duration.ofMinutes(5)); + assertFalse(cache.isLineageMarkedAsDisabled("testProject")); + } + + @Test + public void isServiceMarkedAsDisabled_returnsDefaultFalse() { + NoOpLineageEnablementCache cache = new NoOpLineageEnablementCache(); + assertFalse(cache.isLineageMarkedAsDisabled("testProject")); + } +} diff --git a/lib/src/test/java/com/google/cloud/datalineage/producerclient/ProjectStatusCacheLoggingTest.java b/lib/src/test/java/com/google/cloud/datalineage/producerclient/ProjectStatusCacheLoggingTest.java new file mode 100644 index 0000000..af09c70 --- /dev/null +++ b/lib/src/test/java/com/google/cloud/datalineage/producerclient/ProjectStatusCacheLoggingTest.java @@ -0,0 +1,164 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient; + +import static com.google.common.truth.Truth.assertThat; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.LoggerContext; +import com.google.cloud.datalineage.producerclient.test.TestLogAppender; +import java.time.Clock; +import java.time.Duration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.LoggerFactory; + +/** Tests logging functionality in ProjectStatusCache. */ +@RunWith(JUnit4.class) +public class ProjectStatusCacheLoggingTest { + + private static final String CACHE_NAME = "Test Cache"; + private TestLogAppender testAppender; + private Logger logger; + private ProjectStatusCache cache; + + @Before + public void setUp() { + // Set up logging capture + LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); + logger = loggerContext.getLogger(ProjectStatusCache.class); + + testAppender = new TestLogAppender(); + testAppender.setContext(loggerContext); + testAppender.start(); + + logger.addAppender(testAppender); + logger.setLevel(Level.DEBUG); // Enable debug logging for tests + + CacheOptions options = + CacheOptions.newBuilder() + .setCacheSize(100) + .setDefaultCacheDisabledStatusTime(Duration.ofMinutes(5)) + .setClock(Clock.systemDefaultZone()) + .build(); + + cache = new ProjectStatusCache(options, CACHE_NAME); + } + + @After + public void tearDown() { + if (logger != null && testAppender != null) { + logger.detachAppender(testAppender); + } + if (testAppender != null) { + testAppender.stop(); + } + } + + @Test + public void testCacheInitializationLogging() { + // Verify that cache initialization is logged + assertThat(testAppender.getMessagesAtLevel(Level.DEBUG)) + .contains( + "Initializing ProjectStatusCache '" + + CACHE_NAME + + "' with cache size: 100, " + + "default disabled duration: PT5M"); + } + + @Test + public void testMarkServiceAsDisabledLogging() { + testAppender.clear(); // Clear logs from setup + + String projectName = "test-project"; + Duration duration = Duration.ofMinutes(10); + + cache.markProjectAsDisabled(projectName, duration); + + // Verify that marking service as disabled is logged + assertThat(testAppender.getMessagesAtLevel(Level.WARN)) + .contains( + "Marking project 'test-project' as disabled in cache '" + + CACHE_NAME + + "' for duration: PT10M"); + } + + @Test + public void testIsServiceMarkedAsDisabledLogging_NotFound() { + testAppender.clear(); // Clear logs from setup + + String projectName = "non-existent-project"; + + boolean result = cache.isProjectDisabled(projectName); + + assertThat(result).isFalse(); + assertThat(testAppender.getMessagesAtLevel(Level.DEBUG)) + .contains( + "No cache entry found for project 'non-existent-project' in cache '" + + CACHE_NAME + + "'"); + } + + @Test + public void testIsServiceMarkedAsDisabledLogging_Found() { + // First mark the service as disabled + cache.markProjectAsDisabled("test-project", Duration.ofMinutes(5)); + + // Clear logs to focus on the check operation + testAppender.clear(); + + boolean result = cache.isProjectDisabled("test-project"); + + assertThat(result).isTrue(); + boolean found = + testAppender.getMessagesAtLevel(Level.DEBUG).stream() + .anyMatch( + log -> + log.contains( + "Project 'test-project' is marked as disabled in cache '" + + CACHE_NAME + + "' until")); + assertThat(found).isTrue(); + } + + @Test + public void testIsServiceMarkedAsDisabledLogging_Expired() { + // Mark service as disabled for a very short duration + cache.markProjectAsDisabled("test-project", Duration.ofNanos(1)); + + // Wait a bit to ensure expiration + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // Clear logs to focus on the check operation + testAppender.clear(); + + boolean result = cache.isProjectDisabled("test-project"); + + assertThat(result).isFalse(); + assertThat(testAppender.getMessagesAtLevel(Level.DEBUG)) + .contains( + "Project disability has expired for project 'test-project' in cache '" + + CACHE_NAME + + "'"); + } +} diff --git a/lib/src/test/java/com/google/cloud/datalineage/producerclient/ProjectStatusCacheTest.java b/lib/src/test/java/com/google/cloud/datalineage/producerclient/ProjectStatusCacheTest.java new file mode 100644 index 0000000..5451fe2 --- /dev/null +++ b/lib/src/test/java/com/google/cloud/datalineage/producerclient/ProjectStatusCacheTest.java @@ -0,0 +1,141 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.ImmutableList; +import java.time.Clock; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; + +/** Test suite for ProjectStatusCache. */ +@RunWith(JUnit4.class) +public class ProjectStatusCacheTest { + + private static final String CACHE_NAME = "Test Cache"; + private static final String PROJECT_NAME = "test-project"; + + private static final LocalDateTime BASE_DATE = LocalDateTime.of(1989, 1, 13, 0, 0); + + private ProjectStatusCache cache; + private Clock clock; + + @Before + public void init() { + clock = Mockito.mock(Clock.class); + cache = new ProjectStatusCache(CacheOptions.newBuilder().setClock(clock).build(), CACHE_NAME); + setupTime(BASE_DATE); + } + + @Test + public void isProjectDisabled_withoutMarking_returnsFalse() { + assertThat(cache.isProjectDisabled(PROJECT_NAME)).isFalse(); + } + + @Test + public void isProjectDisabled_markDisabled_withoutMarking_returnsFalse() { + cache.markProjectAsDisabled(PROJECT_NAME, Duration.ZERO); + setupTime(BASE_DATE.plus(Duration.ofMillis(1))); + assertThat(cache.isProjectDisabled(PROJECT_NAME)).isFalse(); + } + + @Test + public void isProjectDisabled_respectsDuration() { + String projectId = "test-project"; + Duration expireDuration = Duration.ofMillis(10); + + cache.markProjectAsDisabled(projectId, expireDuration); + + assertNoStateChangeAtTime(projectId, BASE_DATE); + assertStateChangedAtTime(projectId, BASE_DATE.plus(expireDuration)); + } + + @Test + public void isProjectDisabled_respectsDefaultDuration() { + + String projectId = "test-project"; + + cache.markProjectAsDisabled(projectId); + + assertNoStateChangeAtTime(projectId, BASE_DATE); + assertStateChangedAtTime(projectId, BASE_DATE.plus(Duration.ofMinutes(5))); + } + + @Test + public void isProjectDisabled_onlyLastEntryInCacheIsRespected() { + String projectId = "test-project"; + Duration oldDuration = Duration.ofMillis(10); + + cache.markProjectAsDisabled(projectId, oldDuration); + Duration newDuration = Duration.ofMillis(5); + cache.markProjectAsDisabled(projectId, newDuration); + + assertNoStateChangeAtTime(projectId, BASE_DATE); + assertStateChangedAtTime(projectId, BASE_DATE.plus(newDuration)); + assertNoStateChangeAtTime(projectId, BASE_DATE.plus(oldDuration)); + } + + @Test + public void markProjectAsDisabled_respectsSize() { + ImmutableList projects = ImmutableList.of("project1", "project2", "project3"); + int cacheSize = projects.size() - 1; + cache = + new ProjectStatusCache( + CacheOptions.newBuilder().setCacheSize(cacheSize).build(), CACHE_NAME); + + projects.forEach(p -> cache.markProjectAsDisabled(p)); + + long projectsInCache = projects.stream().filter(p -> cache.isProjectDisabled(p)).count(); + assertThat(projectsInCache).isEqualTo(cacheSize); + } + + /** + * Asserts that there was no change in state for a project before and after a given point in time. + */ + private void assertNoStateChangeAtTime(String projectId, LocalDateTime time) { + setupTime(time); + boolean before = cache.isProjectDisabled(projectId); + + setupTime(time.plus(Duration.ofMillis(1))); + boolean after = cache.isProjectDisabled(projectId); + assertEquals(before, after); + } + + /** Asserts that the state was flipped for a project at a given point in time. */ + private void assertStateChangedAtTime(String projectId, LocalDateTime time) { + setupTime(time); + boolean before = cache.isProjectDisabled(projectId); + + setupTime(time.plus(Duration.ofMillis(1))); + boolean after = cache.isProjectDisabled(projectId); + assertEquals(before, !after); + } + + private void setupTime(LocalDateTime time) { + Clock fixedClock = + Clock.fixed(time.toInstant(OffsetDateTime.now().getOffset()), ZoneId.systemDefault()); + Mockito.doReturn(fixedClock.instant()).when(clock).instant(); + Mockito.doReturn(fixedClock.getZone()).when(clock).getZone(); + } +} diff --git a/lib/src/test/java/com/google/cloud/datalineage/producerclient/StandardApiEnablementCacheTest.java b/lib/src/test/java/com/google/cloud/datalineage/producerclient/StandardApiEnablementCacheTest.java old mode 100755 new mode 100644 index b7e291f..254b42b --- a/lib/src/test/java/com/google/cloud/datalineage/producerclient/StandardApiEnablementCacheTest.java +++ b/lib/src/test/java/com/google/cloud/datalineage/producerclient/StandardApiEnablementCacheTest.java @@ -14,136 +14,54 @@ package com.google.cloud.datalineage.producerclient; +import static com.google.common.truth.Truth.assertThat; + import java.time.Clock; import java.time.Duration; -import java.time.LocalDateTime; -import java.time.OffsetDateTime; +import java.time.Instant; import java.time.ZoneId; -import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mockito; -/** * Test suite for StandardApiEnablementCache */ +/** Test suite for StandardApiEnablementCache */ @RunWith(JUnit4.class) public class StandardApiEnablementCacheTest { - private static final LocalDateTime BASE_DATE = LocalDateTime.of(1989, 1, 13, 0, 0); - - private StandardApiEnablementCache standardApiEnablementCache; - private Clock clock; - - @Before - public void init() { - clock = Mockito.mock(Clock.class); - standardApiEnablementCache = - new StandardApiEnablementCache( - ApiEnablementCacheOptions.newBuilder().setClock(clock).build()); - setupTime(BASE_DATE); - } - - @Test - public void baseCase_allowsToMarkAndCheckServiceDisabilityForGivenProject() { - String projectId1 = "[project1]"; - String projectId2 = "[project2]"; - String projectId3 = "[project3]"; - Duration project1DurationExpired = Duration.ofMillis(10); - Duration project2DurationExpired = Duration.ofHours(12); - Duration project3DurationExpired = Duration.ofDays(10); - standardApiEnablementCache.markServiceAsDisabled(projectId1, project1DurationExpired); - standardApiEnablementCache.markServiceAsDisabled(projectId2, project2DurationExpired); - standardApiEnablementCache.markServiceAsDisabled(projectId3, project3DurationExpired); - - assertNoStateChange(projectId1, BASE_DATE, true); - assertNoStateChange(projectId2, BASE_DATE, true); - assertNoStateChange(projectId3, BASE_DATE, true); - - assertServiceEnablingTime(projectId1, BASE_DATE.plus(project1DurationExpired)); - assertNoStateChange(projectId2, BASE_DATE.plus(project1DurationExpired), true); - assertNoStateChange(projectId3, BASE_DATE.plus(project1DurationExpired), true); - assertNoStateChange(projectId1, BASE_DATE.plus(project2DurationExpired), false); - assertServiceEnablingTime(projectId2, BASE_DATE.plus(project2DurationExpired)); - assertNoStateChange(projectId3, BASE_DATE.plus(project2DurationExpired), true); - - assertNoStateChange(projectId1, BASE_DATE.plus(project3DurationExpired), false); - assertNoStateChange(projectId2, BASE_DATE.plus(project3DurationExpired), false); - assertServiceEnablingTime(projectId3, BASE_DATE.plus(project3DurationExpired)); - } + private static final String PROJECT_ID = "test-project"; @Test - public void defaultsTimeOfServerDisabilityTo5Minutes() { - String projectId = "[project]"; - standardApiEnablementCache.markServiceAsDisabled(projectId); - - assertServiceEnablingTime(projectId, BASE_DATE.plus(Duration.ofMinutes(5))); + public void isServiceMarkedAsDisabled_afterMarking_returnsTrue() { + StandardApiEnablementCache cache = + new StandardApiEnablementCache(CacheOptions.newBuilder().build()); + cache.markServiceAsDisabled(PROJECT_ID); + assertThat(cache.isServiceMarkedAsDisabled(PROJECT_ID)).isTrue(); } @Test - public void storesOnlyLastSetLockTime() { - String projectId = "[project]"; - Duration longerDurationExpired = Duration.ofMinutes(10); - Duration shorterDurationExpired = Duration.ofMinutes(3); - - setupTime(BASE_DATE); - standardApiEnablementCache.markServiceAsDisabled(projectId, longerDurationExpired); - assertServiceEnablingTime(projectId, BASE_DATE.plus(longerDurationExpired)); - assertNoStateChange(projectId, BASE_DATE.plus(shorterDurationExpired), true); - - setupTime(BASE_DATE); - standardApiEnablementCache.markServiceAsDisabled(projectId, shorterDurationExpired); - assertNoStateChange(projectId, BASE_DATE.plus(longerDurationExpired), false); - assertServiceEnablingTime(projectId, BASE_DATE.plus(shorterDurationExpired)); + public void isServiceMarkedAsDisabled_withoutMarking_returnsFalse() { + StandardApiEnablementCache cache = + new StandardApiEnablementCache(CacheOptions.newBuilder().build()); + assertThat(cache.isServiceMarkedAsDisabled(PROJECT_ID)).isFalse(); } @Test - public void cachesLimitedNumberOfProjectsAndAllowsToSetALimit() { - int limit = 5; - ApiEnablementCacheOptions options = - ApiEnablementCacheOptions.newBuilder() - .setDefaultCacheDisabledStatusTime(Duration.ofMinutes(5)) + public void isServiceMarkedAsDisabled_afterDurationPasses_returnsFalse() { + Clock clock = Clock.fixed(Instant.now(), ZoneId.systemDefault()); + Duration duration = Duration.ofMinutes(5); + CacheOptions options = + CacheOptions.newBuilder() + .setDefaultCacheDisabledStatusTime(duration) .setClock(clock) - .setCacheSize(limit) .build(); - standardApiEnablementCache = new StandardApiEnablementCache(options); - - for (int i = 0; i < limit + 1; i++) { - standardApiEnablementCache.markServiceAsDisabled("[project" + i + "]"); - } - - int presentProjects = 0; - for (int i = 0; i < limit + 1; i++) { - presentProjects += - standardApiEnablementCache.isServiceMarkedAsDisabled("[project" + i + "]") ? 1 : 0; - } - Assert.assertTrue(presentProjects <= limit); - } - - private void setupTime(LocalDateTime time) { - Clock fixedClock = - Clock.fixed(time.toInstant(OffsetDateTime.now().getOffset()), ZoneId.systemDefault()); - Mockito.doReturn(fixedClock.instant()).when(clock).instant(); - Mockito.doReturn(fixedClock.getZone()).when(clock).getZone(); - } + StandardApiEnablementCache cache = new StandardApiEnablementCache(options); - private void assertServiceEnablingTime(String projectId, LocalDateTime changeTime) { - setupTime(changeTime); - boolean result = standardApiEnablementCache.isServiceMarkedAsDisabled(projectId); - Assert.assertTrue(result); - - setupTime(changeTime.plus(Duration.ofMillis(1))); - result = standardApiEnablementCache.isServiceMarkedAsDisabled(projectId); - Assert.assertFalse(result); - } + cache.markServiceAsDisabled(PROJECT_ID); - private void assertNoStateChange(String projectId, LocalDateTime changeTime, boolean state) { - setupTime(changeTime); - boolean result = standardApiEnablementCache.isServiceMarkedAsDisabled(projectId); - Assert.assertEquals(result, state); + // Advance the clock + options = options.toBuilder().setClock(Clock.offset(clock, duration)).build(); + cache = new StandardApiEnablementCache(options); - setupTime(changeTime.plus(Duration.ofMillis(1))); - result = standardApiEnablementCache.isServiceMarkedAsDisabled(projectId); - Assert.assertEquals(result, state); + assertThat(cache.isServiceMarkedAsDisabled(PROJECT_ID)).isFalse(); } } diff --git a/lib/src/test/java/com/google/cloud/datalineage/producerclient/StandardLineageEnablementCacheTest.java b/lib/src/test/java/com/google/cloud/datalineage/producerclient/StandardLineageEnablementCacheTest.java new file mode 100644 index 0000000..6382c91 --- /dev/null +++ b/lib/src/test/java/com/google/cloud/datalineage/producerclient/StandardLineageEnablementCacheTest.java @@ -0,0 +1,63 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient; + +import static com.google.common.truth.Truth.assertThat; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import org.junit.Test; + +/** Test suite for StandardLineageEnablementCache. */ +public class StandardLineageEnablementCacheTest { + private static final String PROJECT_ID = "project-id"; + + @Test + public void isLineageMarkedAsDisabled_afterMarking_returnsTrue() { + StandardLineageEnablementCache cache = + new StandardLineageEnablementCache(CacheOptions.newBuilder().build()); + cache.markLineageAsDisabled(PROJECT_ID); + assertThat(cache.isLineageMarkedAsDisabled(PROJECT_ID)).isTrue(); + } + + @Test + public void isLineageMarkedAsDisabled_withoutMarking_returnsFalse() { + StandardLineageEnablementCache cache = + new StandardLineageEnablementCache(CacheOptions.newBuilder().build()); + assertThat(cache.isLineageMarkedAsDisabled(PROJECT_ID)).isFalse(); + } + + @Test + public void isLineageMarkedAsDisabled_afterDurationPasses_returnsFalse() { + Clock clock = Clock.fixed(Instant.now(), ZoneId.systemDefault()); + Duration duration = Duration.ofMinutes(5); + CacheOptions options = + CacheOptions.newBuilder() + .setDefaultCacheDisabledStatusTime(duration) + .setClock(clock) + .build(); + StandardLineageEnablementCache cache = new StandardLineageEnablementCache(options); + + cache.markLineageAsDisabled(PROJECT_ID); + + // Advance the clock + options = options.toBuilder().setClock(Clock.offset(clock, duration)).build(); + cache = new StandardLineageEnablementCache(options); + + assertThat(cache.isLineageMarkedAsDisabled(PROJECT_ID)).isFalse(); + } +} diff --git a/lib/src/test/java/com/google/cloud/datalineage/producerclient/helpers/GrpcHelperLoggingTest.java b/lib/src/test/java/com/google/cloud/datalineage/producerclient/helpers/GrpcHelperLoggingTest.java new file mode 100644 index 0000000..3d597d6 --- /dev/null +++ b/lib/src/test/java/com/google/cloud/datalineage/producerclient/helpers/GrpcHelperLoggingTest.java @@ -0,0 +1,141 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient.helpers; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.LoggerContext; +import com.google.cloud.datalineage.producerclient.test.TestLogAppender; +import com.google.common.collect.ImmutableSet; +import com.google.protobuf.Any; +import com.google.rpc.ErrorInfo; +import com.google.rpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.protobuf.StatusProto; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.LoggerFactory; + +/** Tests logging functionality in GrpcHelper. */ +@RunWith(JUnit4.class) +public class GrpcHelperLoggingTest { + + private TestLogAppender testAppender; + private Logger logger; + + @Before + public void setUp() { + // Set up logging capture + LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); + logger = loggerContext.getLogger(GrpcHelper.class); + + testAppender = new TestLogAppender(); + testAppender.setContext(loggerContext); + testAppender.start(); + + logger.addAppender(testAppender); + logger.setLevel(Level.DEBUG); // Enable debug logging for tests + } + + @After + public void tearDown() { + if (logger != null && testAppender != null) { + logger.detachAppender(testAppender); + } + if (testAppender != null) { + testAppender.stop(); + } + } + + @Test + public void testGetReasons_validErrorInfo_logsSuccess() { + testAppender.clear(); // Clear any existing logs + + // Create an ErrorInfo with a reason + ErrorInfo errorInfo = + ErrorInfo.newBuilder().setReason("API_DISABLED").setDomain("googleapis.com").build(); + + // Create a Status with the ErrorInfo + Status status = + Status.newBuilder() + .setCode(com.google.rpc.Code.FAILED_PRECONDITION_VALUE) + .setMessage("API is disabled") + .addDetails(Any.pack(errorInfo)) + .build(); + + // Create a gRPC exception from the status + StatusRuntimeException exception = StatusProto.toStatusRuntimeException(status); + + // Call the method + ImmutableSet reason = GrpcHelper.getErrorReasons(exception); + + // Verify the result + assertThat(reason).containsExactly("API_DISABLED"); + + // Verify debug logging + assertThat(testAppender.getMessagesAtLevel(Level.DEBUG)) + .contains("Successfully extracted reason from ErrorInfo: API_DISABLED"); + } + + @Test + public void testGetReasons_invalidProtocolBuffer_logsWarnAndReturnsNull() { + testAppender.clear(); // Clear any existing logs + + // Create a malformed Any that can't be unpacked + Any invalidAny = + Any.newBuilder() + .setTypeUrl("type.googleapis.com/google.rpc.ErrorInfo") + .setValue(com.google.protobuf.ByteString.copyFromUtf8("invalid-data")) + .build(); + + Status status = + Status.newBuilder() + .setCode(com.google.rpc.Code.FAILED_PRECONDITION_VALUE) + .setMessage("API is disabled") + .addDetails(invalidAny) + .build(); + + StatusRuntimeException exception = StatusProto.toStatusRuntimeException(status); + + IllegalArgumentException thrown = + assertThrows(IllegalArgumentException.class, () -> GrpcHelper.getErrorReasons(exception)); + + assertThat(thrown).hasMessageThat().contains("Invalid protocol buffer message"); + assertThat(testAppender.getMessagesAtLevel(Level.ERROR)) + .contains("Invalid protocol buffer message while extracting ErrorInfo"); + } + + @Test + public void testGetReasons_nonGrpcException_returnsNull() { + testAppender.clear(); // Clear any existing logs + + // Create a non-gRPC exception + RuntimeException nonGrpcException = new RuntimeException("Not a gRPC exception"); + + // Call the method + assertThrows( + IllegalArgumentException.class, () -> GrpcHelper.getErrorReasons(nonGrpcException)); + + // Verify debug logging + assertThat(testAppender.getMessagesAtLevel(Level.ERROR)) + .contains("Provided throwable is not a gRPC exception: java.lang.RuntimeException"); + } +} diff --git a/lib/src/test/java/com/google/cloud/datalineage/producerclient/helpers/GrpcHelperTest.java b/lib/src/test/java/com/google/cloud/datalineage/producerclient/helpers/GrpcHelperTest.java new file mode 100755 index 0000000..ee47421 --- /dev/null +++ b/lib/src/test/java/com/google/cloud/datalineage/producerclient/helpers/GrpcHelperTest.java @@ -0,0 +1,73 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient.helpers; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import com.google.protobuf.Any; +import com.google.rpc.ErrorInfo; +import com.google.rpc.Status; +import io.grpc.protobuf.StatusProto; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test suite for GrpcHelper */ +@RunWith(JUnit4.class) +public class GrpcHelperTest { + + @Test + public void getReasons_withNonGrpcException_throws() { + Throwable nonGrpcException = new IllegalArgumentException("This is not a gRPC exception"); + + IllegalArgumentException thrown = + assertThrows( + IllegalArgumentException.class, () -> GrpcHelper.getErrorReasons(nonGrpcException)); + assertThat(thrown).hasMessageThat().contains("Provided throwable is not a gRPC exception"); + } + + @Test + public void getReasons_withGrpcExceptionWithoutErrorInfo_returnsEmptySet() { + Throwable grpcException = StatusProto.toStatusRuntimeException(Status.newBuilder().build()); + + assertThat(GrpcHelper.getErrorReasons(grpcException)).isEmpty(); + } + + @Test + public void getReasons_withGrpcExceptionWithErrorInfo_returnsReason() { + String expectedReason = "test-reason"; + ErrorInfo errorInfo = ErrorInfo.newBuilder().setReason(expectedReason).build(); + Throwable grpcException = + StatusProto.toStatusRuntimeException( + Status.newBuilder().addDetails(Any.pack(errorInfo)).build()); + + assertThat(GrpcHelper.getErrorReasons(grpcException)).containsExactly(expectedReason); + } + + @Test + public void getReasons_withGrpcExceptionWithMultipleErrorInfo_returnsReasons() { + ErrorInfo errorInfoOne = ErrorInfo.newBuilder().setReason("reason1").build(); + ErrorInfo errorInfoTwo = ErrorInfo.newBuilder().setReason("reason2").build(); + Throwable grpcException = + StatusProto.toStatusRuntimeException( + Status.newBuilder() + .addDetails(Any.pack(errorInfoOne)) + .addDetails(Any.pack(errorInfoTwo)) + .build()); + + assertThat(GrpcHelper.getErrorReasons(grpcException)).containsExactly("reason1", "reason2"); + } +} diff --git a/lib/src/test/java/com/google/cloud/datalineage/producerclient/test/TestLogAppender.java b/lib/src/test/java/com/google/cloud/datalineage/producerclient/test/TestLogAppender.java new file mode 100644 index 0000000..e3ebba6 --- /dev/null +++ b/lib/src/test/java/com/google/cloud/datalineage/producerclient/test/TestLogAppender.java @@ -0,0 +1,56 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient.test; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.AppenderBase; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** Test appender for capturing log messages during tests. */ +public class TestLogAppender extends AppenderBase { + private final List events = Collections.synchronizedList(new ArrayList<>()); + + @Override + protected void append(ILoggingEvent event) { + events.add(event); + } + + public List getEvents() { + return new ArrayList<>(events); + } + + public List getMessages() { + return events.stream() + .map(ILoggingEvent::getFormattedMessage) + .collect(ArrayList::new, ArrayList::add, ArrayList::addAll); + } + + public List getMessagesAtLevel(ch.qos.logback.classic.Level level) { + return events.stream() + .filter(event -> event.getLevel().equals(level)) + .map(ILoggingEvent::getFormattedMessage) + .collect(ArrayList::new, ArrayList::add, ArrayList::addAll); + } + + public void clear() { + events.clear(); + } + + public int size() { + return events.size(); + } +} diff --git a/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageProducerClientLoggingTest.java b/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageProducerClientLoggingTest.java new file mode 100644 index 0000000..a121590 --- /dev/null +++ b/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageProducerClientLoggingTest.java @@ -0,0 +1,229 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.cloud.datalineage.producerclient.v1; + +import static com.google.common.truth.Truth.assertThat; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.LoggerContext; +import com.google.cloud.datacatalog.lineage.v1.DeleteLineageEventRequest; +import com.google.cloud.datacatalog.lineage.v1.GetLineageEventRequest; +import com.google.cloud.datacatalog.lineage.v1.ListProcessesRequest; +import com.google.cloud.datacatalog.lineage.v1.ProcessOpenLineageRunEventRequest; +import com.google.cloud.datalineage.producerclient.test.TestLogAppender; +import java.io.IOException; +import java.util.List; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; +import org.slf4j.LoggerFactory; +import org.threeten.bp.Duration; + +/** Tests logging functionality in AsyncLineageProducerClient. */ +@RunWith(JUnit4.class) +public class AsyncLineageProducerClientLoggingTest { + private TestLogAppender testAppender; + private Logger logger; + private final BasicLineageClient basicLineageClient = Mockito.mock(BasicLineageClient.class); + private AsyncLineageProducerClient client; + + @Before + public void setUp() throws IOException { + // Set up logging capture + LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); + logger = loggerContext.getLogger(AsyncLineageProducerClient.class); + + testAppender = new TestLogAppender(); + testAppender.setContext(loggerContext); + testAppender.start(); + + logger.addAppender(testAppender); + logger.setLevel(Level.DEBUG); // Enable debug logging for tests + client = AsyncLineageProducerClient.create(basicLineageClient); + testAppender.clear(); // Clear logs from setup + } + + @After + public void tearDown() { + if (client != null) { + client.shutdownNow(); + } + if (logger != null && testAppender != null) { + logger.detachAppender(testAppender); + } + if (testAppender != null) { + testAppender.stop(); + } + } + + @Test + public void testDeleteLineageEventLogging() { + DeleteLineageEventRequest request = + DeleteLineageEventRequest.newBuilder() + .setName( + "projects/test-project/locations/us-central1/processes/test-process/runs/test-run/lineageEvents/test-event") + .build(); + try { + client.deleteLineageEvent(request); + } catch (Exception e) { + // Expected to fail in test environment, we're just testing logging + } + // Verify the debug log was created + assertThat(testAppender.getMessagesAtLevel(Level.DEBUG)) + .contains( + "Deleting lineage event: projects/test-project/locations/us-central1/processes/test-process/runs/test-run/lineageEvents/test-event"); + } + + @Test + public void testGetLineageEventLogging() { + GetLineageEventRequest request = + GetLineageEventRequest.newBuilder() + .setName( + "projects/test-project/locations/us-central1/processes/test-process/runs/test-run/lineageEvents/test-event") + .build(); + try { + client.getLineageEvent(request); + } catch (Exception e) { + // Expected to fail in test environment, we're just testing logging + } + // Verify the debug log was created + assertThat(testAppender.getMessagesAtLevel(Level.DEBUG)) + .contains( + "Getting lineage event: projects/test-project/locations/us-central1/" + + "processes/test-process/runs/test-run/lineageEvents/test-event"); + } + + @Test + public void testProcessOpenLineageRunEventLogging() { + ProcessOpenLineageRunEventRequest request = + ProcessOpenLineageRunEventRequest.newBuilder() + .setParent("projects/test-project/locations/us-central1") + .build(); + try { + client.processOpenLineageRunEvent(request); + } catch (Exception e) { + // Expected to fail in test environment, we're just testing logging + } + // Verify the debug log was created + assertThat(testAppender.getMessagesAtLevel(Level.DEBUG)) + .contains("Processing OpenLineage run event: "); + } + + @Test + public void testListProcessesBasicLogging() { + ListProcessesRequest request = + ListProcessesRequest.newBuilder() + .setParent("projects/test-project/locations/us-central1") + .build(); + try { + client.listProcesses(request); + } catch (Exception e) { + // Expected to fail in test environment, we're just testing logging + } + // Verify the debug log was created + assertThat(testAppender.getMessagesAtLevel(Level.DEBUG)) + .contains("Listing processes for parent: " + "projects/test-project/locations/us-central1"); + } + + @Test + public void testListProcessesWithParametersLogging() { + ListProcessesRequest request = + ListProcessesRequest.newBuilder() + .setParent("projects/test-project/locations/us-central1") + .setPageSize(25) + .setPageToken("advanced-page-token") + .build(); + try { + client.listProcesses(request); + } catch (Exception e) { + // Expected to fail in test environment, we're just testing logging + } + // Verify the debug log includes parent (parameters are not logged individually) + assertThat(testAppender.getMessagesAtLevel(Level.DEBUG)) + .contains("Listing processes for parent: projects/test-project/locations/us-central1"); + } + + @Test + public void testDefaultGracefulShutdownLogging() throws Exception { + // Test graceful shutdown logging + client.close(); + // Verify shutdown logging + assertThat(testAppender.getMessagesAtLevel(Level.DEBUG)) + .contains("Starting graceful shutdown with duration: PT30S"); + } + + @Test + public void testHardShutdownLogging() throws Exception { + // Create a client with zero timeout to trigger the warning + AsyncLineageProducerClientSettings zeroTimeoutSettings = + AsyncLineageProducerClientSettings.newBuilder() + .setGracefulShutdownDuration(Duration.ZERO) + .build(); + AsyncLineageProducerClient hardShutdownClient = + AsyncLineageProducerClient.create(basicLineageClient, zeroTimeoutSettings); + + hardShutdownClient.close(); + + // Verify hard-shutdown message is printed + assertThat(testAppender.getMessagesAtLevel(Level.WARN)) + .contains( + "AsyncLineageProducerClient graceful shutdown duration was set to zero. " + + "This effectively means hard shutdown with potential data loss"); + } + + @Test + public void testShutdownTimeoutLogging() throws Exception { + // Create a client with very short timeout to trigger timeout warning + AsyncLineageProducerClientSettings shortTimeoutSettings = + AsyncLineageProducerClientSettings.newBuilder() + .setGracefulShutdownDuration(Duration.ofMillis(1)) // Very short timeout + .build(); + AsyncLineageProducerClient shortTimeoutClient = + AsyncLineageProducerClient.create(basicLineageClient, shortTimeoutSettings); + + // Start some background operations to create work that needs shutdown + // This creates background threads and operations that need time to shut down + for (int i = 0; i < 10; i++) { + try { + GetLineageEventRequest request = + GetLineageEventRequest.newBuilder() + .setName( + "projects/test-project/locations/us-central1/processes/test-process/runs/test-run/lineageEvents/test-event-" + + i) + .build(); + shortTimeoutClient.getLineageEvent(request); + } catch (Exception e) { + // Ignore exceptions, we just want to create background work + } + } + + try { + shortTimeoutClient.close(); + } catch (Exception e) { + // Ignore any exceptions, we're testing logging + } + + // Verify timeout warning was logged + List warnings = testAppender.getMessagesAtLevel(Level.WARN); + assertThat(warnings).hasSize(1); + assertThat(warnings.get(0)) + .contains( + "AsyncLineageProducerClient did not terminate within the graceful shutdown duration"); + shortTimeoutClient.shutdownNow(); + } +} diff --git a/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageProducerClientSettingsTest.java b/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageProducerClientSettingsTest.java new file mode 100644 index 0000000..92326f1 --- /dev/null +++ b/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageProducerClientSettingsTest.java @@ -0,0 +1,214 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient.v1; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import com.google.api.core.ApiClock; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.rpc.HeaderProvider; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.api.gax.rpc.WatchdogProvider; +import org.junit.Test; +import org.mockito.Mockito; +import org.threeten.bp.Duration; + +/** * Test suite for AsyncLineageProducerClientSettingsTest */ +public class AsyncLineageProducerClientSettingsTest { + + @Test + public void newBuilder_notNull() { + AsyncLineageProducerClientSettings.Builder builder = + AsyncLineageProducerClientSettings.newBuilder(); + assertNotNull(builder); + } + + @Test + public void defaultInstance_notNull() throws Exception { + AsyncLineageProducerClientSettings settings = + AsyncLineageProducerClientSettings.defaultInstance(); + assertNotNull(settings); + } + + @Test + public void builderWithSettings_initializesCorrectSettings() throws Exception { + AsyncLineageProducerClientSettings initialSettings = + AsyncLineageProducerClientSettings.defaultInstance(); + + AsyncLineageProducerClientSettings.Builder builder = + new AsyncLineageProducerClientSettings.Builder(initialSettings); + AsyncLineageProducerClientSettings newSettings = builder.build(); + + assertNotNull(builder); + assertEquals(initialSettings.getEndpoint(), newSettings.getEndpoint()); + assertEquals(initialSettings.getClock(), newSettings.getClock()); + assertEquals( + initialSettings.getBackgroundExecutorProvider(), + newSettings.getBackgroundExecutorProvider()); + assertEquals(initialSettings.getCredentialsProvider(), newSettings.getCredentialsProvider()); + assertEquals(initialSettings.getHeaderProvider(), newSettings.getHeaderProvider()); + assertEquals(initialSettings.getQuotaProjectId(), newSettings.getQuotaProjectId()); + assertEquals( + initialSettings.getTransportChannelProvider(), newSettings.getTransportChannelProvider()); + assertEquals(initialSettings.getWatchdogProvider(), newSettings.getWatchdogProvider()); + assertEquals( + initialSettings.getWatchdogCheckInterval(), newSettings.getWatchdogCheckInterval()); + } + + @Test + public void builder_setEndpoint() { + AsyncLineageProducerClientSettings.Builder builder = + AsyncLineageProducerClientSettings.newBuilder(); + String testEndpoint = "test-endpoint"; + + AsyncLineageProducerClientSettings.Builder resultBuilder = builder.setEndpoint(testEndpoint); + + assertNotNull(resultBuilder); + assertEquals(testEndpoint, resultBuilder.getEndpoint()); + } + + @Test + public void builder_setClock() throws Exception { + AsyncLineageProducerClientSettings.Builder builder = + AsyncLineageProducerClientSettings.newBuilder(); + ApiClock mockClock = Mockito.mock(ApiClock.class); + + AsyncLineageProducerClientSettings.Builder resultBuilder = builder.setClock(mockClock); + + assertNotNull(resultBuilder); + assertEquals(mockClock, resultBuilder.build().getClock()); + } + + @Test + @Deprecated + public void builder_setExecutorProvider() { + AsyncLineageProducerClientSettings.Builder builder = + AsyncLineageProducerClientSettings.newBuilder(); + ExecutorProvider mockExecutorProvider = Mockito.mock(ExecutorProvider.class); + + AsyncLineageProducerClientSettings.Builder resultBuilder = + builder.setExecutorProvider(mockExecutorProvider); + + assertNotNull(resultBuilder); + assertEquals(mockExecutorProvider, resultBuilder.getExecutorProvider()); + } + + @Test + public void builder_setWatchdogCheckInterval() { + AsyncLineageProducerClientSettings.Builder builder = + AsyncLineageProducerClientSettings.newBuilder(); + Duration checkInterval = Duration.ofSeconds(30); + + AsyncLineageProducerClientSettings.Builder resultBuilder = + builder.setWatchdogCheckInterval(checkInterval); + + assertNotNull(resultBuilder); + assertEquals(checkInterval, resultBuilder.getWatchdogCheckInterval()); + } + + @Test + public void builder_setWatchdogProvider() { + AsyncLineageProducerClientSettings.Builder builder = + AsyncLineageProducerClientSettings.newBuilder(); + WatchdogProvider mockWatchdogProvider = Mockito.mock(WatchdogProvider.class); + + AsyncLineageProducerClientSettings.Builder resultBuilder = + builder.setWatchdogProvider(mockWatchdogProvider); + + assertNotNull(resultBuilder); + assertEquals(mockWatchdogProvider, resultBuilder.getWatchdogProvider()); + } + + @Test + public void builder_setTransportChannelProvider() { + AsyncLineageProducerClientSettings.Builder builder = + AsyncLineageProducerClientSettings.newBuilder(); + TransportChannelProvider mockTransportChannelProvider = + Mockito.mock(TransportChannelProvider.class); + + AsyncLineageProducerClientSettings.Builder resultBuilder = + builder.setTransportChannelProvider(mockTransportChannelProvider); + + assertNotNull(resultBuilder); + assertEquals(mockTransportChannelProvider, resultBuilder.getTransportChannelProvider()); + } + + @Test + public void builder_setQuotaProjectId() { + AsyncLineageProducerClientSettings.Builder builder = + AsyncLineageProducerClientSettings.newBuilder(); + String quotaProjectId = "test-quota-project-id"; + + AsyncLineageProducerClientSettings.Builder resultBuilder = + builder.setQuotaProjectId(quotaProjectId); + + assertNotNull(resultBuilder); + assertEquals(quotaProjectId, resultBuilder.getQuotaProjectId()); + } + + @Test + public void builder_setHeaderProvider() { + AsyncLineageProducerClientSettings.Builder builder = + AsyncLineageProducerClientSettings.newBuilder(); + HeaderProvider mockHeaderProvider = Mockito.mock(HeaderProvider.class); + + AsyncLineageProducerClientSettings.Builder resultBuilder = + builder.setHeaderProvider(mockHeaderProvider); + + assertNotNull(resultBuilder); + assertEquals(mockHeaderProvider, resultBuilder.getHeaderProvider()); + } + + @Test + public void builder_setCredentialsProvider() { + AsyncLineageProducerClientSettings.Builder builder = + AsyncLineageProducerClientSettings.newBuilder(); + CredentialsProvider mockCredentialsProvider = Mockito.mock(CredentialsProvider.class); + + AsyncLineageProducerClientSettings.Builder resultBuilder = + builder.setCredentialsProvider(mockCredentialsProvider); + + assertNotNull(resultBuilder); + assertEquals(mockCredentialsProvider, resultBuilder.getCredentialsProvider()); + } + + @Test + public void builder_setBackgroundExecutorProvider() { + AsyncLineageProducerClientSettings.Builder builder = + AsyncLineageProducerClientSettings.newBuilder(); + ExecutorProvider mockExecutorProvider = Mockito.mock(ExecutorProvider.class); + + AsyncLineageProducerClientSettings.Builder resultBuilder = + builder.setBackgroundExecutorProvider(mockExecutorProvider); + + assertNotNull(resultBuilder); + assertEquals(mockExecutorProvider, resultBuilder.getBackgroundExecutorProvider()); + } + + @Test + public void builder_setGracefulShutdownDuration() { + AsyncLineageProducerClientSettings.Builder builder = + AsyncLineageProducerClientSettings.newBuilder(); + Duration gracefulShutdownDuration = Duration.ofSeconds(1); // 1 second + + AsyncLineageProducerClientSettings.Builder resultBuilder = + builder.setGracefulShutdownDuration(gracefulShutdownDuration); + + assertNotNull(resultBuilder); + assertEquals(gracefulShutdownDuration, resultBuilder.getGracefulShutdownDuration()); + } +} diff --git a/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageProducerClientTest.java b/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageProducerClientTest.java index 6d165e3..83a0eba 100644 --- a/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageProducerClientTest.java +++ b/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/AsyncLineageProducerClientTest.java @@ -14,10 +14,15 @@ package com.google.cloud.datalineage.producerclient.v1; +import static com.google.cloud.datalineage.producerclient.v1.AsyncLineageProducerClientSettings.DEFAULT_GRACEFUL_SHUTDOWN_DURATION; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.threeten.bp.Duration.ofSeconds; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; @@ -32,12 +37,17 @@ import com.google.rpc.Code; import com.google.rpc.ErrorInfo; import com.google.rpc.Status; +import io.grpc.StatusException; import io.grpc.protobuf.StatusProto; import java.io.IOException; +import java.util.Arrays; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.threeten.bp.Duration; /** Tests for AsyncLineageProducerClient. */ public class AsyncLineageProducerClientTest { @@ -98,6 +108,45 @@ public void apiDisabled_doesNotRetry() { .contains("Data Lineage API is disabled in project")); } + @Test + public void lineageDisabled_doesNotRetry() { + returnLineageDisabledFromMocker(); + ProcessOpenLineageRunEventRequest request = + createProcessOpenLineageRunEventRequest( + "projects/test-lineage-disabled-async/locations/test"); + + // For the first call, throw a PERMISSION_DENIED exception + assertThrows(ExecutionException.class, () -> client.processOpenLineageRunEvent(request).get()); + // Not attempt the second call + ApiException exception = + assertThrows(ApiException.class, () -> client.processOpenLineageRunEvent(request)); + assertThat(exception.getMessage()) + .contains("Lineage is not enabled in Lineage Configurations for project"); + } + + @Test + public void gracefulShutdown_awaitsTerminationByDefault() throws Exception { + // objects passed to lambda must be final or effectively final, so we use arrays to store the + // values + long[] resultAwaitTerminationTime = new long[1]; + AsyncLineageProducerClient asyncLineageProducerClient = + AsyncLineageProducerClient.create(basicLineageClient); + doAnswer( + invocation -> { + resultAwaitTerminationTime[0] = invocation.getArgument(0); + return true; + }) + .when(basicLineageClient) + .awaitTermination(anyLong(), any(TimeUnit.class)); + + asyncLineageProducerClient.close(); + // Verify that the awaitTermination was called with the expected values + // we cannot get the resultAwaitTerminationTime exactly, so we check reasonable scope + assertThat(Duration.ofNanos(resultAwaitTerminationTime[0])).isAtLeast(Duration.ZERO); + assertThat(Duration.ofNanos(resultAwaitTerminationTime[0])) + .isAtMost(DEFAULT_GRACEFUL_SHUTDOWN_DURATION.plus(ofSeconds(1))); + } + private static Struct someOpenLineage() { return Struct.newBuilder().build(); } @@ -125,16 +174,39 @@ private void returnServiceDisabledFromMocker() { @Override public ApiFuture futureCall( ProcessOpenLineageRunEventRequest request, ApiCallContext context) { - Status.Builder statusBuilder = com.google.rpc.Status.newBuilder(); - statusBuilder.setCode(Code.PERMISSION_DENIED.getNumber()); - ErrorInfo.Builder errorInfoBuilder = - ErrorInfo.newBuilder().setReason("SERVICE_DISABLED"); - statusBuilder.addDetails(Any.pack(errorInfoBuilder.build())); + return ApiFutures.immediateFailedFuture( + createStatusExceptionWithReasons("SERVICE_DISABLED", "SOME_OTHER_REASON")); + } + }); + } + + /** + * Configure the BasicLineageClient mocker to return an exception indicating Lineage is not + * enabled. + */ + private void returnLineageDisabledFromMocker() { + when(basicLineageClient.processOpenLineageRunEventCallable()) + .thenReturn( + new UnaryCallable<>() { + @Override + public ApiFuture futureCall( + ProcessOpenLineageRunEventRequest request, ApiCallContext context) { return ApiFutures.immediateFailedFuture( - StatusProto.toStatusException(statusBuilder.build())); + createStatusExceptionWithReasons( + "LINEAGE_INGESTION_DISABLED", "SOME_OTHER_REASON")); } }); } + + private StatusException createStatusExceptionWithReasons(String... reasons) { + Status.Builder statusBuilder = com.google.rpc.Status.newBuilder(); + statusBuilder.setCode(Code.PERMISSION_DENIED.getNumber()); + statusBuilder.addAllDetails( + Arrays.stream(reasons) + .map(r -> Any.pack(ErrorInfo.newBuilder().setReason(r).build())) + .collect(Collectors.toList())); + return StatusProto.toStatusException(statusBuilder.build()); + } } diff --git a/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/LineageBaseSettingsTest.java b/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/LineageBaseSettingsTest.java new file mode 100755 index 0000000..c90621d --- /dev/null +++ b/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/LineageBaseSettingsTest.java @@ -0,0 +1,85 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient.v1; + +import static org.junit.Assert.assertEquals; + +import com.google.cloud.datalineage.producerclient.CacheSettings; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** * Test suite for LineageBaseSettings */ +@RunWith(JUnit4.class) +public class LineageBaseSettingsTest { + + @Test + public void toBuilder_returnsBuilderWithProvidedSettings() throws Exception { + LineageBaseSettings baseSettings = LineageBaseSettings.defaultInstance(); + + LineageBaseSettings.Builder builder = baseSettings.toBuilder(); + + assertSettingsAreEqual(baseSettings, builder.build()); + } + + @Test + public void builder_initializesDefaultSettings() throws Exception { + LineageBaseSettings.Builder builder = LineageBaseSettings.newBuilder(); + + assertSettingsAreEqual(LineageBaseSettings.defaultInstance(), builder.build()); + } + + @Test + public void setApiEnablementCacheSettings_updatesSettings() throws Exception { + LineageBaseSettings.Builder builder = LineageBaseSettings.newBuilder(); + CacheSettings newCacheSettings = CacheSettings.getDisabledInstance(); + + LineageBaseSettings.Builder returnedBuilder = + builder.setApiEnablementCacheSettings(newCacheSettings); + LineageBaseSettings settings = returnedBuilder.build(); + + assertEquals(newCacheSettings, settings.getApiEnablementCacheSettings()); + } + + @Test + public void setLineageEnablementCacheSettings_updatesSettings() throws Exception { + LineageBaseSettings.Builder builder = LineageBaseSettings.newBuilder(); + CacheSettings newCacheSettings = CacheSettings.getDisabledInstance(); + + LineageBaseSettings.Builder returnedBuilder = + builder.setLineageEnablementCacheSettings(newCacheSettings); + LineageBaseSettings settings = returnedBuilder.build(); + + assertEquals(newCacheSettings, settings.getLineageEnablementCacheSettings()); + } + + private void assertSettingsAreEqual(LineageBaseSettings expected, LineageBaseSettings actual) { + assertEquals( + expected.getApiEnablementCacheSettings().getEnabled(), + actual.getApiEnablementCacheSettings().getEnabled()); + assertEquals( + expected.getApiEnablementCacheSettings().getUseCommonInstance(), + actual.getApiEnablementCacheSettings().getUseCommonInstance()); + assertEquals( + expected.getApiEnablementCacheSettings().getOptions().getDefaultCacheDisabledStatusTime(), + actual.getApiEnablementCacheSettings().getOptions().getDefaultCacheDisabledStatusTime()); + assertEquals( + expected.getApiEnablementCacheSettings().getOptions().getClock(), + actual.getApiEnablementCacheSettings().getOptions().getClock()); + assertEquals( + expected.getApiEnablementCacheSettings().getOptions().getCacheSize(), + actual.getApiEnablementCacheSettings().getOptions().getCacheSize()); + } +} diff --git a/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/SyncLineageProducerClientSettingsTest.java b/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/SyncLineageProducerClientSettingsTest.java new file mode 100644 index 0000000..8f59b10 --- /dev/null +++ b/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/SyncLineageProducerClientSettingsTest.java @@ -0,0 +1,201 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.datalineage.producerclient.v1; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import com.google.api.core.ApiClock; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.rpc.HeaderProvider; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.api.gax.rpc.WatchdogProvider; +import org.junit.Test; +import org.mockito.Mockito; +import org.threeten.bp.Duration; + +/** * Test suite for SyncLineageProducerClientSettingsTest */ +public class SyncLineageProducerClientSettingsTest { + + @Test + public void newBuilder_notNull() { + SyncLineageProducerClientSettings.Builder builder = + SyncLineageProducerClientSettings.newBuilder(); + assertNotNull(builder); + } + + @Test + public void defaultInstance_notNull() throws Exception { + SyncLineageProducerClientSettings settings = + SyncLineageProducerClientSettings.defaultInstance(); + assertNotNull(settings); + } + + @Test + public void builderWithSettings_initializesCorrectSettings() throws Exception { + SyncLineageProducerClientSettings initialSettings = + SyncLineageProducerClientSettings.defaultInstance(); + + SyncLineageProducerClientSettings.Builder builder = + new SyncLineageProducerClientSettings.Builder(initialSettings); + SyncLineageProducerClientSettings newSettings = builder.build(); + + assertNotNull(builder); + assertEquals(initialSettings.getEndpoint(), newSettings.getEndpoint()); + assertEquals(initialSettings.getClock(), newSettings.getClock()); + assertEquals( + initialSettings.getBackgroundExecutorProvider(), + newSettings.getBackgroundExecutorProvider()); + assertEquals(initialSettings.getCredentialsProvider(), newSettings.getCredentialsProvider()); + assertEquals(initialSettings.getHeaderProvider(), newSettings.getHeaderProvider()); + assertEquals(initialSettings.getQuotaProjectId(), newSettings.getQuotaProjectId()); + assertEquals( + initialSettings.getTransportChannelProvider(), newSettings.getTransportChannelProvider()); + assertEquals(initialSettings.getWatchdogProvider(), newSettings.getWatchdogProvider()); + assertEquals( + initialSettings.getWatchdogCheckInterval(), newSettings.getWatchdogCheckInterval()); + } + + @Test + public void builder_setEndpoint() { + SyncLineageProducerClientSettings.Builder builder = + SyncLineageProducerClientSettings.newBuilder(); + String testEndpoint = "test-endpoint"; + + SyncLineageProducerClientSettings.Builder resultBuilder = builder.setEndpoint(testEndpoint); + + assertNotNull(resultBuilder); + assertEquals(testEndpoint, resultBuilder.getEndpoint()); + } + + @Test + public void builder_setClock() throws Exception { + SyncLineageProducerClientSettings.Builder builder = + SyncLineageProducerClientSettings.newBuilder(); + ApiClock mockClock = Mockito.mock(ApiClock.class); + + SyncLineageProducerClientSettings.Builder resultBuilder = builder.setClock(mockClock); + + assertNotNull(resultBuilder); + assertEquals(mockClock, resultBuilder.build().getClock()); + } + + @Test + @Deprecated + public void builder_setExecutorProvider() { + SyncLineageProducerClientSettings.Builder builder = + SyncLineageProducerClientSettings.newBuilder(); + ExecutorProvider mockExecutorProvider = Mockito.mock(ExecutorProvider.class); + + SyncLineageProducerClientSettings.Builder resultBuilder = + builder.setExecutorProvider(mockExecutorProvider); + + assertNotNull(resultBuilder); + assertEquals(mockExecutorProvider, resultBuilder.getExecutorProvider()); + } + + @Test + public void builder_setWatchdogCheckInterval() { + SyncLineageProducerClientSettings.Builder builder = + SyncLineageProducerClientSettings.newBuilder(); + Duration checkInterval = Duration.ofSeconds(30); + + SyncLineageProducerClientSettings.Builder resultBuilder = + builder.setWatchdogCheckInterval(checkInterval); + + assertNotNull(resultBuilder); + assertEquals(checkInterval, resultBuilder.getWatchdogCheckInterval()); + } + + @Test + public void builder_setWatchdogProvider() { + SyncLineageProducerClientSettings.Builder builder = + SyncLineageProducerClientSettings.newBuilder(); + WatchdogProvider mockWatchdogProvider = Mockito.mock(WatchdogProvider.class); + + SyncLineageProducerClientSettings.Builder resultBuilder = + builder.setWatchdogProvider(mockWatchdogProvider); + + assertNotNull(resultBuilder); + assertEquals(mockWatchdogProvider, resultBuilder.getWatchdogProvider()); + } + + @Test + public void builder_setTransportChannelProvider() { + SyncLineageProducerClientSettings.Builder builder = + SyncLineageProducerClientSettings.newBuilder(); + TransportChannelProvider mockTransportChannelProvider = + Mockito.mock(TransportChannelProvider.class); + + SyncLineageProducerClientSettings.Builder resultBuilder = + builder.setTransportChannelProvider(mockTransportChannelProvider); + + assertNotNull(resultBuilder); + assertEquals(mockTransportChannelProvider, resultBuilder.getTransportChannelProvider()); + } + + @Test + public void builder_setQuotaProjectId() { + SyncLineageProducerClientSettings.Builder builder = + SyncLineageProducerClientSettings.newBuilder(); + String quotaProjectId = "test-quota-project-id"; + + SyncLineageProducerClientSettings.Builder resultBuilder = + builder.setQuotaProjectId(quotaProjectId); + + assertNotNull(resultBuilder); + assertEquals(quotaProjectId, resultBuilder.getQuotaProjectId()); + } + + @Test + public void builder_setHeaderProvider() { + SyncLineageProducerClientSettings.Builder builder = + SyncLineageProducerClientSettings.newBuilder(); + HeaderProvider mockHeaderProvider = Mockito.mock(HeaderProvider.class); + + SyncLineageProducerClientSettings.Builder resultBuilder = + builder.setHeaderProvider(mockHeaderProvider); + + assertNotNull(resultBuilder); + assertEquals(mockHeaderProvider, resultBuilder.getHeaderProvider()); + } + + @Test + public void builder_setCredentialsProvider() { + SyncLineageProducerClientSettings.Builder builder = + SyncLineageProducerClientSettings.newBuilder(); + CredentialsProvider mockCredentialsProvider = Mockito.mock(CredentialsProvider.class); + + SyncLineageProducerClientSettings.Builder resultBuilder = + builder.setCredentialsProvider(mockCredentialsProvider); + + assertNotNull(resultBuilder); + assertEquals(mockCredentialsProvider, resultBuilder.getCredentialsProvider()); + } + + @Test + public void builder_setBackgroundExecutorProvider() { + SyncLineageProducerClientSettings.Builder builder = + SyncLineageProducerClientSettings.newBuilder(); + ExecutorProvider mockExecutorProvider = Mockito.mock(ExecutorProvider.class); + + SyncLineageProducerClientSettings.Builder resultBuilder = + builder.setBackgroundExecutorProvider(mockExecutorProvider); + + assertNotNull(resultBuilder); + assertEquals(mockExecutorProvider, resultBuilder.getBackgroundExecutorProvider()); + } +} diff --git a/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/SyncLineageProducerClientTest.java b/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/SyncLineageProducerClientTest.java index 82e1caf..b382594 100644 --- a/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/SyncLineageProducerClientTest.java +++ b/lib/src/test/java/com/google/cloud/datalineage/producerclient/v1/SyncLineageProducerClientTest.java @@ -33,8 +33,11 @@ import com.google.rpc.Code; import com.google.rpc.ErrorInfo; import com.google.rpc.Status; +import io.grpc.StatusException; import io.grpc.protobuf.StatusProto; import java.io.IOException; +import java.util.Arrays; +import java.util.stream.Collectors; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -96,6 +99,23 @@ public void apiDisabled_doesNotRetry() { .contains("Data Lineage API is disabled in project")); } + @Test + public void lineageDisabled_doesNotRetry() { + returnLineageDisabledFromMocker(); + ProcessOpenLineageRunEventRequest request = + createProcessOpenLineageRunEventRequest( + "projects/test-lineage-disabled-sync/locations/test"); + + // For the first call, throw a PERMISSION_DENIED exception + assertThrows( + UncheckedExecutionException.class, () -> client.processOpenLineageRunEvent(request)); + // Not attempt the second call + assertThat( + assertThrows(ApiException.class, () -> client.processOpenLineageRunEvent(request)) + .getMessage() + .contains("Lineage is not enabled in Lineage Configurations for project")); + } + private static Struct someOpenLineage() { return Struct.newBuilder().build(); } @@ -123,16 +143,39 @@ private void returnServiceDisabledFromMocker() { @Override public ApiFuture futureCall( ProcessOpenLineageRunEventRequest request, ApiCallContext context) { - Status.Builder statusBuilder = com.google.rpc.Status.newBuilder(); - statusBuilder.setCode(Code.PERMISSION_DENIED.getNumber()); - ErrorInfo.Builder errorInfoBuilder = - ErrorInfo.newBuilder().setReason("SERVICE_DISABLED"); - statusBuilder.addDetails(Any.pack(errorInfoBuilder.build())); + return ApiFutures.immediateFailedFuture( + createStatusExceptionWithReasons("SERVICE_DISABLED", "SOME_OTHER_REASON")); + } + }); + } + + /** + * Configure the BasicLineageClient mocker to return an exception indicating Lineage is not + * enabled. + */ + private void returnLineageDisabledFromMocker() { + when(basicLineageClient.processOpenLineageRunEventCallable()) + .thenReturn( + new UnaryCallable<>() { + @Override + public ApiFuture futureCall( + ProcessOpenLineageRunEventRequest request, ApiCallContext context) { return ApiFutures.immediateFailedFuture( - StatusProto.toStatusException(statusBuilder.build())); + createStatusExceptionWithReasons( + "LINEAGE_INGESTION_DISABLED", "SOME_OTHER_REASON")); } }); } + + private StatusException createStatusExceptionWithReasons(String... reasons) { + Status.Builder statusBuilder = com.google.rpc.Status.newBuilder(); + statusBuilder.setCode(Code.PERMISSION_DENIED.getNumber()); + statusBuilder.addAllDetails( + Arrays.stream(reasons) + .map(r -> Any.pack(ErrorInfo.newBuilder().setReason(r).build())) + .collect(Collectors.toList())); + return StatusProto.toStatusException(statusBuilder.build()); + } } diff --git a/lib/src/test/resources/logback-test.xml b/lib/src/test/resources/logback-test.xml new file mode 100644 index 0000000..61ae174 --- /dev/null +++ b/lib/src/test/resources/logback-test.xml @@ -0,0 +1,15 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + +