Skip to content

Commit 4792d41

Browse files
use error reason instead of error message and fix caches
1 parent a8d2fa8 commit 4792d41

23 files changed

+523
-422
lines changed

lib/build.gradle

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ plugins {
2323

2424
// v"2.2.5" is recommended, but it uses Java21 which is incompatible with other
2525
// plugins (e.g. "com.github.johnrengelman.shadow"), therefore, using latest compatible version
26-
id("com.google.cloud.artifactregistry.gradle-plugin") version "2.1.5"
26+
id("com.google.cloud.artifactregistry.gradle-plugin") version "2.1.5"
2727
}
2828

2929
googleJavaFormat {
@@ -62,10 +62,10 @@ dependencies {
6262
implementation "com.google.api:gax-httpjson:${gaxHttpJsonVersion}"
6363
implementation "io.grpc:grpc-protobuf:${grpcProtobufVersion}"
6464
implementation "com.google.guava:guava:${guavaVersion}"
65-
65+
6666
// SLF4J logging
6767
implementation "org.slf4j:slf4j-api:${slf4jVersion}"
68-
68+
6969
// Lombok for @Slf4j annotation
7070
compileOnly "org.projectlombok:lombok:${lombokVersion}"
7171
annotationProcessor "org.projectlombok:lombok:${lombokVersion}"
@@ -74,21 +74,17 @@ dependencies {
7474
compileOnly "com.google.auto.value:auto-value-annotations:1.10.1"
7575
annotationProcessor "com.google.auto.value:auto-value:1.10.1"
7676

77-
// AutoValue
78-
compileOnly "com.google.auto.value:auto-value-annotations:1.10.1"
79-
annotationProcessor "com.google.auto.value:auto-value:1.10.1"
80-
8177
// Use JUnit test framework.
8278
testImplementation "junit:junit:${junitVersion}"
8379
testImplementation "com.google.truth:truth:${truthVersion}"
8480
testImplementation "org.mockito:mockito-core:${mockitoVersion}"
8581
testImplementation "com.google.api:gax:${gaxTestLibVersion}:testlib"
8682
testImplementation "com.google.testparameterinjector:test-parameter-injector:${parameterInjectorVersion}"
87-
83+
8884
// Test logging dependencies
8985
testImplementation "ch.qos.logback:logback-classic:${logbackVersion}"
9086
testImplementation "ch.qos.logback:logback-core:${logbackVersion}"
91-
87+
9288
// Test Lombok
9389
testCompileOnly "org.projectlombok:lombok:${lombokVersion}"
9490
testAnnotationProcessor "org.projectlombok:lombok:${lombokVersion}"

lib/src/main/java/com/google/cloud/datalineage/producerclient/CacheOptions.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
5-
// You may not obtain a copy of the License at
5+
// You may obtain a copy of the License at
66
//
77
// https://www.apache.org/licenses/LICENSE-2.0
88
//
@@ -18,13 +18,14 @@
1818
import java.time.Duration;
1919

2020
/**
21-
* Provides an immutable object for Cache initialization. CacheOptions object
22-
* can be created via Builder.
21+
* Provides an immutable object for Cache initialization. CacheOptions object can be created via
22+
* Builder.
2323
*/
2424
public class CacheOptions {
25-
protected static final Duration DEFAULT_DISABLED_TIME = Duration.ofMinutes(10);
25+
26+
protected static final Duration DEFAULT_DISABLED_TIME = Duration.ofMinutes(5);
2627
protected static final int DEFAULT_SIZE = 1000;
27-
protected static final Clock DEFAULT_CLOCK = Clock.systemUTC();
28+
protected static final Clock DEFAULT_CLOCK = Clock.systemDefaultZone();
2829

2930
private final Duration defaultCacheDisabledStatusTime;
3031
private final int cacheSize;
@@ -60,13 +61,23 @@ public CacheOptions.Builder toBuilder() {
6061
return new CacheOptions.Builder(this);
6162
}
6263

64+
65+
@Override
66+
public boolean equals(Object options) {
67+
return options instanceof CacheOptions that
68+
&& that.getDefaultCacheDisabledStatusTime() == this.defaultCacheDisabledStatusTime &&
69+
that.getCacheSize() == this.cacheSize &&
70+
that.clock == this.clock;
71+
}
72+
6373
/**
6474
* * Builder for CacheSettings.
6575
*
6676
* <p>Lets setting `markServiceAsDisabledTime`, `cacheSize`, and `clock`. Can be created by
6777
* CacheOptions.newBuilder method. To create settings object, use build method.
6878
*/
6979
public static class Builder {
80+
7081
protected Duration defaultCacheDisabledStatusTime;
7182
protected int cacheSize;
7283
protected Clock clock;

lib/src/main/java/com/google/cloud/datalineage/producerclient/LineageEnablementCache.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,35 @@
1616

1717
import java.time.Duration;
1818

19-
/** Cache used to store information about whether the API is disabled for a given project. */
19+
/**
20+
* Cache used to store information about whether Lineage is disabled for a given project in
21+
* LineageConfigurations.
22+
*/
2023
public interface LineageEnablementCache {
2124

2225
/**
23-
* Mark service state as disabled for a given project name.
26+
* Mark lineage ingestion state as disabled for a given project name.
2427
*
25-
* @param projectName The project for which to disable the service
28+
* @param projectName The project for which to disable the lineage ingestion
2629
* @see LineageEnablementCache#markLineageAsDisabled(String, Duration)
2730
*/
2831
void markLineageAsDisabled(String projectName);
2932

3033
/**
31-
* Mark service state as disabled for a given project name and duration.
34+
* Mark lineage ingestion state as disabled for a given project name and duration.
3235
*
33-
* @param projectName The project for which to disable the service
36+
* @param projectName The project for which to disable the lineage ingestion
3437
* @param duration - suggests how long the project should be marked as disabled. It is not
35-
* guarantied that cache will indicate service state as disabled for given time. Behaviour
36-
* depends on implementation.
38+
* guarantied that cache will indicate lineage ingestion enablement state as disabled for
39+
* given time. Behaviour depends on implementation.
3740
*/
3841
void markLineageAsDisabled(String projectName, Duration duration);
3942

4043
/**
41-
* Indicates if service with provided projectName is marked as disabled.
44+
* Indicates if lineage ingestion with provided projectName is marked as disabled.
4245
*
43-
* @param projectName The project for which to disable the service
44-
* @return `true` is the service is marked as disabled
46+
* @param projectName The project for which to disable the lineage ingestion
47+
* @return `true` if the lineage ingestion is marked as disabled
4548
*/
4649
boolean isLineageMarkedAsDisabled(String projectName);
4750
}

lib/src/main/java/com/google/cloud/datalineage/producerclient/LineageEnablementCacheFactory.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
// Copyright 2024 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
115
package com.google.cloud.datalineage.producerclient;
216

317
/** A factory that returns ConnectionCache based on LineageEnablementCacheSettings. */

lib/src/main/java/com/google/cloud/datalineage/producerclient/NoOpLineageEnablementCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import java.time.Duration;
1818

19-
/** No-op implementation of the ApiEnablementCache that does nothing. */
19+
/** No-op implementation of the LineageEnablementCache that does nothing. */
2020
public class NoOpLineageEnablementCache implements LineageEnablementCache {
2121

2222
@Override

lib/src/main/java/com/google/cloud/datalineage/producerclient/ProjectStatusCache.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@
2424
/**
2525
* Generic cache to indicate whether a feature is disabled for a given project.
2626
*
27-
* <p>This class is thread-safe.
27+
* <p> This class is thread-safe. There is no eviction guaranteed in case of cache overload.
2828
*/
2929
@Slf4j
3030
public class ProjectStatusCache {
31+
3132
private final Cache<String, LocalDateTime> projectToLockEndTime;
3233
private final Duration defaultCacheDisabledStatusTime;
3334
private final Clock clock;
@@ -41,7 +42,8 @@ public ProjectStatusCache(CacheOptions options, String cacheName) {
4142
options.getDefaultCacheDisabledStatusTime());
4243
this.defaultCacheDisabledStatusTime = options.getDefaultCacheDisabledStatusTime();
4344
this.clock = options.getClock();
44-
this.projectToLockEndTime = CacheBuilder.newBuilder().maximumSize(options.getCacheSize()).build();
45+
this.projectToLockEndTime = CacheBuilder.newBuilder().maximumSize(options.getCacheSize())
46+
.build();
4547
this.cacheName = cacheName;
4648
}
4749

lib/src/main/java/com/google/cloud/datalineage/producerclient/StandardLineageEnablementCache.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@
1717
import java.time.Duration;
1818

1919
/**
20-
* Cache used to indicate whether Lineage is disabled for a given project.
20+
* Cache used to indicate whether Lineage Ingestion is disabled for a given project.
2121
*
2222
* <p>This class is a wrapper around a generic ProjectStatusCache.
2323
*/
2424
public class StandardLineageEnablementCache implements LineageEnablementCache {
25+
2526
private final ProjectStatusCache delegate;
2627

2728
StandardLineageEnablementCache(CacheOptions options) {

lib/src/main/java/com/google/cloud/datalineage/producerclient/helpers/GrpcHelper.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616

1717
import com.google.api.gax.rpc.StatusCode;
1818
import com.google.api.gax.rpc.StatusCode.Code;
19+
import com.google.common.collect.ImmutableList;
20+
import com.google.common.collect.ImmutableSet;
1921
import com.google.protobuf.Any;
2022
import com.google.protobuf.InvalidProtocolBufferException;
2123
import com.google.rpc.ErrorInfo;
2224
import com.google.rpc.Status;
2325
import io.grpc.protobuf.StatusProto;
26+
import java.util.stream.Collectors;
2427
import lombok.extern.slf4j.Slf4j;
2528

2629
/**
@@ -36,18 +39,18 @@ private GrpcHelper() {
3639
}
3740

3841
/**
39-
* Gets reason field from grpc response. ets reason field from grpc response.
42+
* Returns a set of error reasons from <code>com.google.rpc.Status</code> of a gRPC Exception.
4043
*
4144
* @param grpcException - error returned form grpc call
42-
* @return string with value from reason field and null if it is unable to extract reason
45+
* @return a set of strings with reasons extracted from Status if any
4346
*/
44-
public static String getReason(Throwable grpcException) {
45-
log.debug("Extracting reason from gRPC exception: {}", grpcException.getMessage());
47+
public static ImmutableSet<String> getErrorReasons(Throwable grpcException) {
48+
log.debug("Extracting reasons from gRPC exception: {}", grpcException.getMessage());
4649
Status statusProto = StatusProto.fromThrowable(grpcException);
4750
if (statusProto == null) {
48-
log.debug(
51+
log.error(
4952
"Provided throwable is not a gRPC exception: {}", grpcException.getClass().getName());
50-
return null;
53+
throw new IllegalArgumentException("Provided throwable is not a gRPC exception");
5154
}
5255
/* Status is a standard way to represent API error.
5356
* This model consists of code, message and details.
@@ -57,20 +60,18 @@ public static String getReason(Throwable grpcException) {
5760
* - user can introduce new ones.
5861
* That's why in order to get ErrorInfo, we need to iterate over details and check all of them.
5962
*/
60-
for (Any any : statusProto.getDetailsList()) {
61-
if (any.is(ErrorInfo.class)) {
62-
try {
63-
String reason = any.unpack(ErrorInfo.class).getReason();
64-
log.debug("Successfully extracted reason from ErrorInfo: {}", reason);
65-
return reason;
66-
} catch (InvalidProtocolBufferException exception) {
67-
log.warn("Invalid protocol buffer message while extracting ErrorInfo", exception);
68-
return null;
69-
}
70-
}
71-
}
72-
log.debug("Message does not contain ErrorInfo for exception: {}", grpcException.getMessage());
73-
return null;
63+
return statusProto.getDetailsList().stream().filter((detail) -> detail.is(ErrorInfo.class))
64+
.map((errorInfo) -> {
65+
try {
66+
String reason = errorInfo.unpack(
67+
ErrorInfo.class).getReason();
68+
log.debug("Successfully extracted reason from ErrorInfo: {}", reason);
69+
return reason;
70+
} catch (InvalidProtocolBufferException e) {
71+
log.error("Invalid protocol buffer message while extracting ErrorInfo", e);
72+
throw new IllegalArgumentException("Invalid protocol buffer message", e);
73+
}
74+
}).collect(ImmutableSet.toImmutableSet());
7475
}
7576

7677
/**

0 commit comments

Comments
 (0)