Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
4 changes: 3 additions & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
41 changes: 28 additions & 13 deletions gradlew

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 9 additions & 6 deletions gradlew.bat

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.cloud.datacatalog.lineage.v1.ProcessOpenLineageRunEventRequest;
import com.google.cloud.datacatalog.lineage.v1.ProcessOpenLineageRunEventResponse;
import com.google.cloud.datacatalog.lineage.v1.Run;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Empty;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
Expand All @@ -52,7 +53,7 @@
public final class AsyncLineageProducerClient implements BackgroundResource, AsyncLineageClient {

public static AsyncLineageProducerClient create() throws IOException {
return create(AsyncLineageProducerClientSettings.newBuilder().build());
return create(AsyncLineageProducerClientSettings.defaultInstance());
}

public static AsyncLineageProducerClient create(AsyncLineageProducerClientSettings settings)
Expand All @@ -63,13 +64,15 @@ public static AsyncLineageProducerClient create(AsyncLineageProducerClientSettin
return new AsyncLineageProducerClient(settings);
}

@VisibleForTesting
static AsyncLineageProducerClient create(BasicLineageClient basicClient) throws IOException {
return new AsyncLineageProducerClient(basicClient, Duration.ZERO);
return create(basicClient, AsyncLineageProducerClientSettings.defaultInstance());
}

@VisibleForTesting
static AsyncLineageProducerClient create(
BasicLineageClient basicClient, Duration gracefulShutdownDuration) throws IOException {
return new AsyncLineageProducerClient(basicClient, gracefulShutdownDuration);
BasicLineageClient basicClient, AsyncLineageProducerClientSettings settings) throws IOException {
return new AsyncLineageProducerClient(basicClient, settings);
}

private final InternalClient client;
Expand All @@ -81,10 +84,9 @@ private AsyncLineageProducerClient(AsyncLineageProducerClientSettings settings)
this.gracefulShutdownDuration = settings.getGracefulShutdownDuration();
}

private AsyncLineageProducerClient(
BasicLineageClient basicClient, Duration gracefulShutdownDuration) throws IOException {
private AsyncLineageProducerClient(BasicLineageClient basicClient, AsyncLineageProducerClientSettings settings) throws IOException {
client = InternalClient.create(basicClient);
this.gracefulShutdownDuration = gracefulShutdownDuration;
this.gracefulShutdownDuration = settings.getGracefulShutdownDuration();
}

@Override
Expand Down Expand Up @@ -187,19 +189,23 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted
return client.awaitTermination(duration, unit);
}

private void gracefulShutdown(Instant start) throws InterruptedException {
if (!gracefulShutdownDuration.isZero()) {
log.debug("Starting graceful shutdown with duration: {}", gracefulShutdownDuration);
boolean terminated =
awaitTermination(
gracefulShutdownDuration.minus(Duration.between(start, Instant.now())).toNanos(),
TimeUnit.NANOSECONDS);
if (!terminated) {
log.warn(
"AsyncLineageProducerClient did not terminate within the "
+ "graceful shutdown duration: {}",
gracefulShutdownDuration);
}
private void gracefulShutdown(Instant shutdownStartedAt) throws InterruptedException {
if (gracefulShutdownDuration.isZero()) {
log.warn(
"AsyncLineageProducerClient graceful shutdown duration was set to zero. This effectively means hard shutdown");
return;
}
log.debug("Starting graceful shutdown with duration: {}", gracefulShutdownDuration);
boolean terminated =
awaitTermination(
gracefulShutdownDuration.minus(Duration.between(shutdownStartedAt, Instant.now()))
.toNanos(),
TimeUnit.NANOSECONDS);
if (!terminated) {
log.warn(
"AsyncLineageProducerClient did not terminate within the "
+ "graceful shutdown duration: {}",
gracefulShutdownDuration);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
* via Builder.
*/
public final class AsyncLineageProducerClientSettings extends LineageBaseSettings {

public static final Duration DEFAULT_GRACEFUL_SHUTDOWN_DURATION = Duration.ofSeconds(30);
private final Duration gracefulShutdownDuration;

public static Builder newBuilder() {
Expand Down Expand Up @@ -63,7 +65,7 @@ public Duration getGracefulShutdownDuration() {
* method.
*/
public static final class Builder extends LineageBaseSettings.Builder {
private Duration gracefulShutdownDuration = Duration.ZERO;
private Duration gracefulShutdownDuration = DEFAULT_GRACEFUL_SHUTDOWN_DURATION;

private static Builder createDefault() {
return new Builder(LineageStubSettings.newBuilder());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,9 @@ public void tearDown() {
@Test
public void testCacheInitializationLogging() {
// Verify that cache initialization is logged
boolean found =
testAppender.getMessagesAtLevel(Level.DEBUG).stream()
.anyMatch(
log ->
log.contains("Initializing StandardApiEnablementCache with cache size: 100")
&& log.contains("default disabled duration: PT5M"));
assertThat(found).isTrue();
assertThat(testAppender.getMessagesAtLevel(Level.DEBUG)).contains(
"Initializing StandardApiEnablementCache with cache size: 100, "
+ "default disabled duration: PT5M");
}

@Test
Expand All @@ -92,14 +88,9 @@ public void testMarkServiceAsDisabledLogging() {
cache.markServiceAsDisabled(projectName, duration);

// Verify that marking service as disabled is logged
boolean found =
testAppender.getMessagesAtLevel(Level.WARN).stream()
.anyMatch(
log ->
log.contains(
"Marking service as disabled for project 'test-project'"
+ " for duration: PT10M"));
assertThat(found).isTrue();
assertThat(testAppender.getMessagesAtLevel(Level.WARN)).contains(
"Marking service as disabled for project 'test-project'"
+ " for duration: PT10M");
}

@Test
Expand All @@ -111,11 +102,8 @@ public void testIsServiceMarkedAsDisabledLogging_NotFound() {
boolean result = cache.isServiceMarkedAsDisabled(projectName);

assertThat(result).isFalse();
boolean found =
testAppender.getMessagesAtLevel(Level.DEBUG).stream()
.anyMatch(
log -> log.contains("No cache entry found for project: non-existent-project"));
assertThat(found).isTrue();
assertThat(testAppender.getMessagesAtLevel(Level.DEBUG)).contains(
"No cache entry found for project: non-existent-project");
}

@Test
Expand Down Expand Up @@ -155,10 +143,7 @@ public void testIsServiceMarkedAsDisabledLogging_Expired() {
boolean result = cache.isServiceMarkedAsDisabled("test-project");

assertThat(result).isFalse();
boolean found =
testAppender.getMessagesAtLevel(Level.DEBUG).stream()
.anyMatch(
log -> log.contains("Service disability has expired for project: test-project"));
assertThat(found).isTrue();
assertThat(testAppender.getMessagesAtLevel(Level.DEBUG)).contains(
"Service disability has expired for project: test-project");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,8 @@ public void testGetReason_ValidErrorInfo_LogsSuccess() {
assertThat(reason).isEqualTo("API_DISABLED");

// Verify debug logging
boolean found =
testAppender.getMessagesAtLevel(Level.DEBUG).stream()
.anyMatch(
log -> log.contains("Successfully extracted reason from ErrorInfo: API_DISABLED"));
assertThat(found).isTrue();
assertThat(testAppender.getMessagesAtLevel(Level.DEBUG)).contains(
"Successfully extracted reason from ErrorInfo: API_DISABLED");
}

@Test
Expand Down Expand Up @@ -124,11 +121,8 @@ public void testGetReason_InvalidProtocolBuffer_LogsError() {
assertThat(thrown.getMessage()).contains("Invalid protocol buffer message");

// Verify error logging
boolean found =
testAppender.getMessagesAtLevel(Level.ERROR).stream()
.anyMatch(
log -> log.contains("Invalid protocol buffer message while extracting ErrorInfo"));
assertThat(found).isTrue();
assertThat(testAppender.getMessagesAtLevel(Level.ERROR)).contains(
"Invalid protocol buffer message while extracting ErrorInfo");
}

@Test
Expand All @@ -151,14 +145,9 @@ public void testGetReason_NoErrorInfo_LogsWarning() {
assertThat(thrown.getMessage()).contains("Message does not contain ErrorInfo");

// Verify warning logging
boolean found =
testAppender.getMessagesAtLevel(Level.WARN).stream()
.anyMatch(
log ->
log.contains(
"Message does not contain ErrorInfo for exception:"
+ " FAILED_PRECONDITION: API is disabled"));
assertThat(found).isTrue();
assertThat(testAppender.getMessagesAtLevel(Level.WARN)).contains(
"Message does not contain ErrorInfo for exception:"
+ " FAILED_PRECONDITION: API is disabled");
}

@Test
Expand Down
Loading
Loading