Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private static Result finishOrTimeout(
LOG.warn("Error happened when checking for condition", e);
}

LOG.info("Condition was not met yet. Checking if job is finished.");
LOG.debug("Condition was not met yet. Checking if job is finished.");
if (launchFinished) {
LOG.info("Launch was finished, stop checking.");
return Result.LAUNCH_FINISHED;
Expand All @@ -212,11 +212,15 @@ private static Result finishOrTimeout(
LOG.info("Detected that launch was finished, checking conditions once more.");
launchFinished = true;
} else {
LOG.info(
"Job not finished and conditions not met. Will check again in {} seconds (total wait: {}s of max {}s)",
config.checkAfter().getSeconds(),
Duration.between(start, Instant.now()).getSeconds(),
config.timeoutAfter().getSeconds());
long checkSec = config.checkAfter().getSeconds();
long waitSec = Duration.between(start, Instant.now()).getSeconds();
if (checkSec > 0 && (waitSec / checkSec) % 5 == 0) { // reduce log spam
LOG.info(
"Job not finished and conditions not met. Will check again in {} seconds (total wait: {}s of max {}s)",
checkSec,
waitSec,
config.timeoutAfter().getSeconds());
}
}
try {
Thread.sleep(config.checkAfter().toMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import static org.apache.beam.it.neo4j.Neo4jResourceManagerUtils.generateDatabaseName;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.it.common.ResourceManager;
import org.apache.beam.it.testcontainers.TestContainerResourceManager;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -37,14 +39,6 @@
import org.testcontainers.containers.Neo4jContainer;
import org.testcontainers.utility.DockerImageName;

/**
Copy link
Collaborator

@derrickaw derrickaw Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Why delete or can we update? Thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops this was a copy paste error. Added back.

* Client for managing Neo4j resources.
*
* <p>The database name is formed using testId. The database name will be "{testId}-{ISO8601 time,
* microsecond precision}", with additional formatting.
*
* <p>The class is thread-safe.
*/
public class Neo4jResourceManager extends TestContainerResourceManager<Neo4jContainer<?>>
implements ResourceManager {

Expand All @@ -60,7 +54,8 @@ public class Neo4jResourceManager extends TestContainerResourceManager<Neo4jCont
private static final int NEO4J_BOLT_PORT = 7687;

private final Driver neo4jDriver;
private final String databaseName;
private final @Nullable String databaseName;
private final List<String> newDataBases = new ArrayList<>();
private final DatabaseWaitOption waitOption;
private final String connectionString;
private final boolean usingStaticDatabase;
Expand Down Expand Up @@ -95,9 +90,8 @@ private Neo4jResourceManager(Builder builder) {
this.databaseName = builder.databaseName;
this.waitOption = null;
} else {
this.databaseName = generateDatabaseName(builder.testId);
this.databaseName = null;
this.waitOption = builder.waitOption;
createDatabase(databaseName, waitOption);
}
}

Expand All @@ -110,11 +104,12 @@ public synchronized String getUri() {
return connectionString;
}

public List<Map<String, Object>> run(String query) {
return this.run(query, Collections.emptyMap());
public List<Map<String, Object>> run(String query, String databaseName) {
return this.run(query, databaseName, Collections.emptyMap());
}

public List<Map<String, Object>> run(String query, Map<String, Object> parameters) {
public List<Map<String, Object>> run(
String query, String databaseName, Map<String, Object> parameters) {
try (Session session =
neo4jDriver.session(SessionConfig.builder().withDatabase(databaseName).build())) {
return session.run(query, parameters).list(record -> record.asMap());
Expand All @@ -128,7 +123,7 @@ public List<Map<String, Object>> run(String query, Map<String, Object> parameter
*
* @return the name of the Neo4j Database.
*/
public synchronized String getDatabaseName() {
public synchronized @Nullable String getDatabaseName() {
return databaseName;
}

Expand All @@ -140,11 +135,11 @@ public synchronized void cleanupAll() {

// First, delete the database if it was not given as a static argument
try {
if (!usingStaticDatabase) {
dropDatabase(databaseName, waitOption);
if (!newDataBases.isEmpty()) {
dropTestDatabases(waitOption);
}
} catch (Exception e) {
LOG.error("Failed to delete Neo4j database {}.", databaseName, e);
LOG.error("Failed to delete Neo4j databases {}.", newDataBases, e);
producedError = true;
}

Expand All @@ -167,28 +162,34 @@ public synchronized void cleanupAll() {
LOG.info("Neo4j manager successfully cleaned up.");
}

private void createDatabase(String databaseName, DatabaseWaitOption waitOption) {
public String createTestDatabase() {
String newDatabaseName =
generateDatabaseName("test" + UUID.randomUUID().toString().substring(0, 4));
try (Session session =
neo4jDriver.session(SessionConfig.builder().withDatabase("system").build())) {
String query =
String.format("CREATE DATABASE $db %s", DatabaseWaitOptions.asCypher(waitOption));
session.run(query, Collections.singletonMap("db", databaseName)).consume();
session.run(query, Collections.singletonMap("db", newDatabaseName)).consume();
} catch (Exception e) {
throw new Neo4jResourceManagerException(
String.format("Error dropping database %s.", databaseName), e);
String.format("Error dropping database %s.", newDatabaseName), e);
}
newDataBases.add(newDatabaseName);
return newDatabaseName;
}

@VisibleForTesting
void dropDatabase(String databaseName, DatabaseWaitOption waitOption) {
void dropTestDatabases(DatabaseWaitOption waitOption) {
try (Session session =
neo4jDriver.session(SessionConfig.builder().withDatabase("system").build())) {
String query =
String.format("DROP DATABASE $db %s", DatabaseWaitOptions.asCypher(waitOption));
session.run(query, Collections.singletonMap("db", databaseName)).consume();
for (String databaseName : newDataBases) {
session.run(query, Collections.singletonMap("db", databaseName)).consume();
}
} catch (Exception e) {
throw new Neo4jResourceManagerException(
String.format("Error dropping database %s.", databaseName), e);
String.format("Error dropping database %s.", newDataBases), e);
}
}

Expand Down Expand Up @@ -222,7 +223,7 @@ private Builder(String testId) {
* <p>Note: if a database name is set, and a static Neo4j server is being used
* (useStaticContainer() is also called on the builder), then a database will be created on the
* static server if it does not exist, and it will not be removed when cleanupAll() is called on
* the Neo4jResourceManager.
* the PatchNeo4jResourceManager.
*
* @param databaseName The database name.
* @return this builder object with the database name set.
Expand All @@ -238,7 +239,7 @@ public Builder setDatabaseName(String databaseName) {
* <p>Note: if a database name is set, and a static Neo4j server is being used
* (useStaticContainer() is also called on the builder), then a database will be created on the
* static server if it does not exist, and it will not be removed when cleanupAll() is called on
* the Neo4jResourceManager.
* the PatchNeo4jResourceManager.
*
* <p>{@link DatabaseWaitOptions} exposes all configurable wait options
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public abstract class Neo4jQueryCheck extends ConditionCheck {

abstract List<Map<String, Object>> expectedResult();

abstract String databaseName();

abstract String query();

abstract @Nullable Map<String, Object> parameters();
Expand All @@ -49,9 +51,9 @@ public String getDescription() {
protected CheckResult check() {
List<Map<String, Object>> actualResult;
if (parameters() != null) {
actualResult = resourceManager().run(query(), parameters());
actualResult = resourceManager().run(query(), databaseName(), parameters());
} else {
actualResult = resourceManager().run(query());
actualResult = resourceManager().run(query(), databaseName());
}
List<Map<String, Object>> expectedResult = expectedResult();
if (actualResult == null) {
Expand Down Expand Up @@ -80,6 +82,8 @@ public abstract static class Builder {

public abstract Builder setResourceManager(Neo4jResourceManager resourceManager);

public abstract Builder setDatabaseName(String databaseName);

public abstract Builder setQuery(String query);

public abstract Builder setParameters(Map<String, Object> parameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,27 @@
public class Neo4jResourceManagerIT {

private Neo4jResourceManager neo4jResourceManager;
private static final String STATIC_DATABASE_NAME = "neo4j";

@Before
public void setUp() {
neo4jResourceManager =
Neo4jResourceManager.builder("placeholder")
.setDatabaseName("neo4j", DatabaseWaitOptions.waitDatabase())
.setDatabaseName(STATIC_DATABASE_NAME, DatabaseWaitOptions.waitDatabase())
.setAdminPassword("password")
.build();
}

@Test
public void testResourceManagerE2E() {

neo4jResourceManager.run(
"CREATE (:Hello {whom: $whom})", Collections.singletonMap("whom", "world"));
"CREATE (:Hello {whom: $whom})",
STATIC_DATABASE_NAME,
Collections.singletonMap("whom", "world"));

List<Map<String, Object>> results =
neo4jResourceManager.run("MATCH (h:Hello) RETURN h.whom AS whom");
neo4jResourceManager.run("MATCH (h:Hello) RETURN h.whom AS whom", STATIC_DATABASE_NAME);

assertThat(results).hasSize(1);
assertThat(results)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void testDatabaseIsCreatedWithNoWaitOptions() {
Neo4jResourceManager.builder(TEST_ID)
.setDatabaseName(STATIC_DATABASE_NAME, DatabaseWaitOptions.noWaitDatabase());
new Neo4jResourceManager(neo4jDriver, container, builder);

String unused = testManager.createTestDatabase();
verify(session).run(and(startsWith("CREATE DATABASE"), endsWith("NOWAIT")), anyMap());
}

Expand All @@ -107,35 +107,40 @@ public void testGetUriShouldReturnCorrectValue() {

@Test
public void testGetDatabaseNameShouldReturnCorrectValue() {
assertThat(testManager.getDatabaseName()).matches(TEST_ID + "-\\d{8}-\\d{6}-\\d{6}");
String databaseName = testManager.createTestDatabase();
assertThat(databaseName).matches("test[0-9a-f]{4}-\\d{8}-\\d{6}-\\d{6}");
}

@Test
public void testDropDatabaseShouldThrowErrorIfDriverFailsToRunQuery() {
String unused = testManager.createTestDatabase();
doThrow(ClientException.class).when(session).run(anyString(), anyMap());

assertThrows(
Neo4jResourceManagerException.class,
() -> testManager.dropDatabase(STATIC_DATABASE_NAME, DatabaseWaitOptions.noWaitDatabase()));
() -> testManager.dropTestDatabases(DatabaseWaitOptions.noWaitDatabase()));
}

@Test
public void testRunShouldThrowErrorIfDriverFailsToRunParameterlessQuery() {
String databaseName = testManager.createTestDatabase();
doThrow(ClientException.class).when(session).run(anyString(), anyMap());

assertThrows(
Neo4jResourceManagerException.class, () -> testManager.run("MATCH (n) RETURN n LIMIT 1"));
Neo4jResourceManagerException.class,
() -> testManager.run(databaseName, "MATCH (n) RETURN n LIMIT 1"));
}

@Test
public void testRunShouldThrowErrorIfDriverFailsToRunQuery() {
String databaseName = testManager.createTestDatabase();
doThrow(ClientException.class).when(session).run(anyString(), anyMap());

assertThrows(
Neo4jResourceManagerException.class,
() ->
testManager.run(
"MATCH (n) WHERE n < $val RETURN n LIMIT 1", Collections.singletonMap("val", 2)));
"MATCH (n) WHERE n < $val RETURN n LIMIT 1",
databaseName,
Collections.singletonMap("val", 2)));
}

@Test
Expand All @@ -152,6 +157,7 @@ public void testCleanupAllShouldNotDropStaticDatabase() {

@Test
public void testCleanupShouldDropNonStaticDatabase() {
String unused = testManager.createTestDatabase();
when(session.run(anyString(), anyMap())).thenReturn(mock(Result.class));

testManager.cleanupAll();
Expand All @@ -162,8 +168,8 @@ public void testCleanupShouldDropNonStaticDatabase() {

@Test
public void testCleanupAllShouldThrowErrorWhenNeo4jDriverFailsToDropDatabase() {
String unused = testManager.createTestDatabase();
doThrow(ClientException.class).when(session).run(anyString(), anyMap());

assertThrows(Neo4jResourceManagerException.class, () -> testManager.cleanupAll());
}

Expand Down
Loading