Skip to content

Commit fc39ee6

Browse files
authored
Update neo4j resource manager (#37310)
* Update neo4j resource manager * Fix a test log spam in it/common * Add back javadoc
1 parent daa0f15 commit fc39ee6

File tree

5 files changed

+62
-35
lines changed

5 files changed

+62
-35
lines changed

it/common/src/main/java/org/apache/beam/it/common/PipelineOperator.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ private static Result finishOrTimeout(
202202
LOG.warn("Error happened when checking for condition", e);
203203
}
204204

205-
LOG.info("Condition was not met yet. Checking if job is finished.");
205+
LOG.debug("Condition was not met yet. Checking if job is finished.");
206206
if (launchFinished) {
207207
LOG.info("Launch was finished, stop checking.");
208208
return Result.LAUNCH_FINISHED;
@@ -212,11 +212,15 @@ private static Result finishOrTimeout(
212212
LOG.info("Detected that launch was finished, checking conditions once more.");
213213
launchFinished = true;
214214
} else {
215-
LOG.info(
216-
"Job not finished and conditions not met. Will check again in {} seconds (total wait: {}s of max {}s)",
217-
config.checkAfter().getSeconds(),
218-
Duration.between(start, Instant.now()).getSeconds(),
219-
config.timeoutAfter().getSeconds());
215+
long checkSec = config.checkAfter().getSeconds();
216+
long waitSec = Duration.between(start, Instant.now()).getSeconds();
217+
if (checkSec > 0 && (waitSec / checkSec) % 5 == 0) { // reduce log spam
218+
LOG.info(
219+
"Job not finished and conditions not met. Will check again in {} seconds (total wait: {}s of max {}s)",
220+
checkSec,
221+
waitSec,
222+
config.timeoutAfter().getSeconds());
223+
}
220224
}
221225
try {
222226
Thread.sleep(config.checkAfter().toMillis());

it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121
import static org.apache.beam.it.neo4j.Neo4jResourceManagerUtils.generateDatabaseName;
2222

2323
import com.google.common.annotations.VisibleForTesting;
24+
import java.util.ArrayList;
2425
import java.util.Collections;
2526
import java.util.List;
2627
import java.util.Map;
28+
import java.util.UUID;
2729
import org.apache.beam.it.common.ResourceManager;
2830
import org.apache.beam.it.testcontainers.TestContainerResourceManager;
2931
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -60,7 +62,8 @@ public class Neo4jResourceManager extends TestContainerResourceManager<Neo4jCont
6062
private static final int NEO4J_BOLT_PORT = 7687;
6163

6264
private final Driver neo4jDriver;
63-
private final String databaseName;
65+
private final @Nullable String databaseName;
66+
private final List<String> newDataBases = new ArrayList<>();
6467
private final DatabaseWaitOption waitOption;
6568
private final String connectionString;
6669
private final boolean usingStaticDatabase;
@@ -95,9 +98,8 @@ private Neo4jResourceManager(Builder builder) {
9598
this.databaseName = builder.databaseName;
9699
this.waitOption = null;
97100
} else {
98-
this.databaseName = generateDatabaseName(builder.testId);
101+
this.databaseName = null;
99102
this.waitOption = builder.waitOption;
100-
createDatabase(databaseName, waitOption);
101103
}
102104
}
103105

@@ -110,11 +112,12 @@ public synchronized String getUri() {
110112
return connectionString;
111113
}
112114

113-
public List<Map<String, Object>> run(String query) {
114-
return this.run(query, Collections.emptyMap());
115+
public List<Map<String, Object>> run(String query, String databaseName) {
116+
return this.run(query, databaseName, Collections.emptyMap());
115117
}
116118

117-
public List<Map<String, Object>> run(String query, Map<String, Object> parameters) {
119+
public List<Map<String, Object>> run(
120+
String query, String databaseName, Map<String, Object> parameters) {
118121
try (Session session =
119122
neo4jDriver.session(SessionConfig.builder().withDatabase(databaseName).build())) {
120123
return session.run(query, parameters).list(record -> record.asMap());
@@ -128,7 +131,7 @@ public List<Map<String, Object>> run(String query, Map<String, Object> parameter
128131
*
129132
* @return the name of the Neo4j Database.
130133
*/
131-
public synchronized String getDatabaseName() {
134+
public synchronized @Nullable String getDatabaseName() {
132135
return databaseName;
133136
}
134137

@@ -140,11 +143,11 @@ public synchronized void cleanupAll() {
140143

141144
// First, delete the database if it was not given as a static argument
142145
try {
143-
if (!usingStaticDatabase) {
144-
dropDatabase(databaseName, waitOption);
146+
if (!newDataBases.isEmpty()) {
147+
dropTestDatabases(waitOption);
145148
}
146149
} catch (Exception e) {
147-
LOG.error("Failed to delete Neo4j database {}.", databaseName, e);
150+
LOG.error("Failed to delete Neo4j databases {}.", newDataBases, e);
148151
producedError = true;
149152
}
150153

@@ -167,28 +170,34 @@ public synchronized void cleanupAll() {
167170
LOG.info("Neo4j manager successfully cleaned up.");
168171
}
169172

170-
private void createDatabase(String databaseName, DatabaseWaitOption waitOption) {
173+
public String createTestDatabase() {
174+
String newDatabaseName =
175+
generateDatabaseName("test" + UUID.randomUUID().toString().substring(0, 4));
171176
try (Session session =
172177
neo4jDriver.session(SessionConfig.builder().withDatabase("system").build())) {
173178
String query =
174179
String.format("CREATE DATABASE $db %s", DatabaseWaitOptions.asCypher(waitOption));
175-
session.run(query, Collections.singletonMap("db", databaseName)).consume();
180+
session.run(query, Collections.singletonMap("db", newDatabaseName)).consume();
176181
} catch (Exception e) {
177182
throw new Neo4jResourceManagerException(
178-
String.format("Error dropping database %s.", databaseName), e);
183+
String.format("Error dropping database %s.", newDatabaseName), e);
179184
}
185+
newDataBases.add(newDatabaseName);
186+
return newDatabaseName;
180187
}
181188

182189
@VisibleForTesting
183-
void dropDatabase(String databaseName, DatabaseWaitOption waitOption) {
190+
void dropTestDatabases(DatabaseWaitOption waitOption) {
184191
try (Session session =
185192
neo4jDriver.session(SessionConfig.builder().withDatabase("system").build())) {
186193
String query =
187194
String.format("DROP DATABASE $db %s", DatabaseWaitOptions.asCypher(waitOption));
188-
session.run(query, Collections.singletonMap("db", databaseName)).consume();
195+
for (String databaseName : newDataBases) {
196+
session.run(query, Collections.singletonMap("db", databaseName)).consume();
197+
}
189198
} catch (Exception e) {
190199
throw new Neo4jResourceManagerException(
191-
String.format("Error dropping database %s.", databaseName), e);
200+
String.format("Error dropping database %s.", newDataBases), e);
192201
}
193202
}
194203

it/neo4j/src/main/java/org/apache/beam/it/neo4j/conditions/Neo4jQueryCheck.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ public abstract class Neo4jQueryCheck extends ConditionCheck {
3434

3535
abstract List<Map<String, Object>> expectedResult();
3636

37+
abstract String databaseName();
38+
3739
abstract String query();
3840

3941
abstract @Nullable Map<String, Object> parameters();
@@ -49,9 +51,9 @@ public String getDescription() {
4951
protected CheckResult check() {
5052
List<Map<String, Object>> actualResult;
5153
if (parameters() != null) {
52-
actualResult = resourceManager().run(query(), parameters());
54+
actualResult = resourceManager().run(query(), databaseName(), parameters());
5355
} else {
54-
actualResult = resourceManager().run(query());
56+
actualResult = resourceManager().run(query(), databaseName());
5557
}
5658
List<Map<String, Object>> expectedResult = expectedResult();
5759
if (actualResult == null) {
@@ -80,6 +82,8 @@ public abstract static class Builder {
8082

8183
public abstract Builder setResourceManager(Neo4jResourceManager resourceManager);
8284

85+
public abstract Builder setDatabaseName(String databaseName);
86+
8387
public abstract Builder setQuery(String query);
8488

8589
public abstract Builder setParameters(Map<String, Object> parameters);

it/neo4j/src/test/java/org/apache/beam/it/neo4j/Neo4jResourceManagerIT.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,23 +37,27 @@
3737
public class Neo4jResourceManagerIT {
3838

3939
private Neo4jResourceManager neo4jResourceManager;
40+
private static final String STATIC_DATABASE_NAME = "neo4j";
4041

4142
@Before
4243
public void setUp() {
4344
neo4jResourceManager =
4445
Neo4jResourceManager.builder("placeholder")
45-
.setDatabaseName("neo4j", DatabaseWaitOptions.waitDatabase())
46+
.setDatabaseName(STATIC_DATABASE_NAME, DatabaseWaitOptions.waitDatabase())
4647
.setAdminPassword("password")
4748
.build();
4849
}
4950

5051
@Test
5152
public void testResourceManagerE2E() {
53+
5254
neo4jResourceManager.run(
53-
"CREATE (:Hello {whom: $whom})", Collections.singletonMap("whom", "world"));
55+
"CREATE (:Hello {whom: $whom})",
56+
STATIC_DATABASE_NAME,
57+
Collections.singletonMap("whom", "world"));
5458

5559
List<Map<String, Object>> results =
56-
neo4jResourceManager.run("MATCH (h:Hello) RETURN h.whom AS whom");
60+
neo4jResourceManager.run("MATCH (h:Hello) RETURN h.whom AS whom", STATIC_DATABASE_NAME);
5761

5862
assertThat(results).hasSize(1);
5963
assertThat(results)

it/neo4j/src/test/java/org/apache/beam/it/neo4j/Neo4jResourceManagerTest.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public void testDatabaseIsCreatedWithNoWaitOptions() {
9696
Neo4jResourceManager.builder(TEST_ID)
9797
.setDatabaseName(STATIC_DATABASE_NAME, DatabaseWaitOptions.noWaitDatabase());
9898
new Neo4jResourceManager(neo4jDriver, container, builder);
99-
99+
String unused = testManager.createTestDatabase();
100100
verify(session).run(and(startsWith("CREATE DATABASE"), endsWith("NOWAIT")), anyMap());
101101
}
102102

@@ -107,35 +107,40 @@ public void testGetUriShouldReturnCorrectValue() {
107107

108108
@Test
109109
public void testGetDatabaseNameShouldReturnCorrectValue() {
110-
assertThat(testManager.getDatabaseName()).matches(TEST_ID + "-\\d{8}-\\d{6}-\\d{6}");
110+
String databaseName = testManager.createTestDatabase();
111+
assertThat(databaseName).matches("test[0-9a-f]{4}-\\d{8}-\\d{6}-\\d{6}");
111112
}
112113

113114
@Test
114115
public void testDropDatabaseShouldThrowErrorIfDriverFailsToRunQuery() {
116+
String unused = testManager.createTestDatabase();
115117
doThrow(ClientException.class).when(session).run(anyString(), anyMap());
116118

117119
assertThrows(
118120
Neo4jResourceManagerException.class,
119-
() -> testManager.dropDatabase(STATIC_DATABASE_NAME, DatabaseWaitOptions.noWaitDatabase()));
121+
() -> testManager.dropTestDatabases(DatabaseWaitOptions.noWaitDatabase()));
120122
}
121123

122124
@Test
123125
public void testRunShouldThrowErrorIfDriverFailsToRunParameterlessQuery() {
126+
String databaseName = testManager.createTestDatabase();
124127
doThrow(ClientException.class).when(session).run(anyString(), anyMap());
125-
126128
assertThrows(
127-
Neo4jResourceManagerException.class, () -> testManager.run("MATCH (n) RETURN n LIMIT 1"));
129+
Neo4jResourceManagerException.class,
130+
() -> testManager.run(databaseName, "MATCH (n) RETURN n LIMIT 1"));
128131
}
129132

130133
@Test
131134
public void testRunShouldThrowErrorIfDriverFailsToRunQuery() {
135+
String databaseName = testManager.createTestDatabase();
132136
doThrow(ClientException.class).when(session).run(anyString(), anyMap());
133-
134137
assertThrows(
135138
Neo4jResourceManagerException.class,
136139
() ->
137140
testManager.run(
138-
"MATCH (n) WHERE n < $val RETURN n LIMIT 1", Collections.singletonMap("val", 2)));
141+
"MATCH (n) WHERE n < $val RETURN n LIMIT 1",
142+
databaseName,
143+
Collections.singletonMap("val", 2)));
139144
}
140145

141146
@Test
@@ -152,6 +157,7 @@ public void testCleanupAllShouldNotDropStaticDatabase() {
152157

153158
@Test
154159
public void testCleanupShouldDropNonStaticDatabase() {
160+
String unused = testManager.createTestDatabase();
155161
when(session.run(anyString(), anyMap())).thenReturn(mock(Result.class));
156162

157163
testManager.cleanupAll();
@@ -162,8 +168,8 @@ public void testCleanupShouldDropNonStaticDatabase() {
162168

163169
@Test
164170
public void testCleanupAllShouldThrowErrorWhenNeo4jDriverFailsToDropDatabase() {
171+
String unused = testManager.createTestDatabase();
165172
doThrow(ClientException.class).when(session).run(anyString(), anyMap());
166-
167173
assertThrows(Neo4jResourceManagerException.class, () -> testManager.cleanupAll());
168174
}
169175

0 commit comments

Comments
 (0)