Skip to content

Commit 40a0bff

Browse files
authored
Create hdfs test dockerized (#4470) (#4479)
* Create hdfs test dockerized (#4470) * Create HDFS test * migrated tests * cleanup * added createPortBindingModifier util * try solve ci error * cleanup * fix tests * retry fix ci * 2nd try * Update HdfsContainerBaseTest.java * Create hdfs test dockerized (#4470) (#4480) * Create hdfs test dockerized (#4470) * Create HDFS test * migrated tests * cleanup * added createPortBindingModifier util * try solve ci error * cleanup * fix tests * retry fix ci * 2nd try * Update HdfsContainerBaseTest.java * removed executeGradleTasks methods
1 parent a215622 commit 40a0bff

File tree

11 files changed

+450
-305
lines changed

11 files changed

+450
-305
lines changed

extended-it/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ dependencies {
6060
testImplementation group: 'org.apache.poi', name: 'poi', version: '5.1.0'
6161
testImplementation group: 'org.apache.poi', name: 'poi-ooxml', version: '5.1.0'
6262
testImplementation 'com.azure:azure-storage-blob:12.22.0'
63+
testImplementation group: 'org.mock-server', name: 'mockserver-client-java', version: '5.6.0'
6364
configurations.all {
6465
exclude group: 'org.slf4j', module: 'slf4j-nop'
6566
exclude group: 'ch.qos.logback', module: 'logback-classic'
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package apoc.hadoop;
2+
3+
import apoc.util.*;
4+
import org.junit.*;
5+
import org.junit.rules.TemporaryFolder;
6+
import org.neo4j.driver.Value;
7+
8+
import java.util.List;
9+
import java.util.concurrent.atomic.AtomicReference;
10+
import java.util.stream.Collectors;
11+
12+
import static apoc.util.CompressionAlgo.BLOCK_LZ4;
13+
import static apoc.util.CompressionAlgo.NONE;
14+
import static apoc.util.MapUtil.map;
15+
import static junit.framework.TestCase.assertTrue;
16+
import static org.junit.Assert.assertEquals;
17+
18+
public class ExportCsvHdfsTest extends HdfsContainerBaseTest {
19+
private static final String EXPECTED = String.format("0,:User:User1,42,,[\"a\",\"b\",\"c\"],true,foo,,,,%n" +
20+
"1,:User,42,,,,bar,,,,%n" +
21+
"2,:User,12,,,,,,,,%n" +
22+
"3,:Address:Address1,,Milano,,,Andrea,Via Garibaldi, 7,,,%n" +
23+
"4,:Address,,,,,Bar Sport,,,,%n" +
24+
"5,:Address,,,,,,via Benni,,,%n" +
25+
",,,,,,,,0,1,KNOWS%n" +
26+
",,,,,,,,3,4,NEXT_DELIVERY");
27+
28+
@ClassRule
29+
public static TemporaryFolder storeDir = new TemporaryFolder();
30+
31+
@ClassRule
32+
public static TemporaryFolder hdfsDir = new TemporaryFolder();
33+
34+
@BeforeClass
35+
public static void setUp() throws Exception {
36+
HdfsContainerBaseTest.setUp();
37+
38+
session.executeWrite(tx -> tx.run("CREATE (f:User1:User {name:'foo',age:42,male:true,kids:['a','b','c']})-[:KNOWS]->(b:User {name:'bar',age:42}),(c:User {age:12})").consume());
39+
session.executeWrite(tx -> tx.run("CREATE (f:Address1:Address {name:'Andrea', city: 'Milano', street:'Via Garibaldi, 7'})-[:NEXT_DELIVERY]->(a:Address {name: 'Bar Sport'}), (b:Address {street: 'via Benni'})").consume());
40+
}
41+
42+
@Test
43+
public void testExportAllCsvHDFS() {
44+
String url = assertHdfsFile(NONE);
45+
46+
String actual = session.executeRead(tx -> tx.run(
47+
"CALL apoc.load.csv($url, $config) YIELD strings",
48+
map("url", url,
49+
"config", map("results", List.of("strings")))
50+
).stream()
51+
.map(i -> String.join(",", i.get("strings").asList(Value::asString)))
52+
.collect(Collectors.joining("\n"))
53+
);
54+
55+
assertEquals(EXPECTED, actual);
56+
}
57+
58+
@Test
59+
public void testExportAllCsvHDFSCompressed() {
60+
assertHdfsFile(BLOCK_LZ4);
61+
}
62+
63+
private String assertHdfsFile(CompressionAlgo compression) {
64+
String fileExt = compression.equals(NONE) ? "" : ".lz4";
65+
66+
AtomicReference<String> url = new AtomicReference<>();
67+
TestContainerUtil.testCall(session, "CALL apoc.export.csv.all($file, $config)",
68+
map("file", hdfsUrl + "/all.csv" + fileExt, "config", map("compression", compression.name())),
69+
(r) -> {
70+
try {
71+
assertEquals(6L, r.get("nodes"));
72+
assertEquals(2L, r.get("relationships"));
73+
assertEquals(12L, r.get("properties"));
74+
assertEquals("database: nodes(6), rels(2)", r.get("source"));
75+
assertEquals("csv", r.get("format"));
76+
url.set( (String) r.get("file") );
77+
assertTrue("Should get time greater than 0",((long) r.get("time")) >= 0);
78+
} catch (Exception e) {
79+
throw new RuntimeException(e);
80+
}
81+
});
82+
83+
return url.get();
84+
}
85+
86+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package apoc.hadoop;
2+
3+
import apoc.util.ExtendedTestContainerUtil;
4+
import apoc.util.Neo4jContainerExtension;
5+
import org.apache.commons.io.filefilter.WildcardFileFilter;
6+
import org.junit.AfterClass;
7+
import org.junit.BeforeClass;
8+
import org.neo4j.driver.Session;
9+
import org.testcontainers.containers.GenericContainer;
10+
import org.testcontainers.containers.Network;
11+
import org.testcontainers.containers.wait.strategy.Wait;
12+
import org.testcontainers.utility.DockerImageName;
13+
import org.testcontainers.utility.MountableFile;
14+
15+
import java.io.File;
16+
import java.nio.file.Files;
17+
import java.util.Arrays;
18+
import java.util.List;
19+
20+
import static apoc.util.ExtendedTestContainerUtil.createPortBindingModifier;
21+
import static apoc.util.TestContainerUtil.*;
22+
23+
/**
 * Create a TestContainer's network equivalent of the `src/test/resources/docker-compose-hadoop.yml`.
 *
 * <p>Spins up a four-node Hadoop cluster (namenode, datanode, resourcemanager, nodemanager)
 * plus a Neo4j container with the APOC plugins mounted, all on one Docker network, and exposes
 * a shared driver {@link Session} for subclass tests. Subclasses typically shadow {@link #setUp()}
 * with their own {@code @BeforeClass} that calls this one first and then seeds test data.
 */
public class HdfsContainerBaseTest {

    public static final String APACHE_HADOOP_IMAGE = "apache/hadoop:3.3.6";
    // HDFS URL as seen from inside the Docker network ("namenode" is a network alias below).
    public static final String hdfsUrl = "hdfs://namenode:8020";

    private static Network hadoopNetwork = Network.newNetwork();

    protected static Neo4jContainerExtension neo4jContainer;
    protected static GenericContainer<?> namenode;
    private static GenericContainer<?> datanode;
    private static GenericContainer<?> resourcemanager;
    private static GenericContainer<?> nodemanager;


    // Start the HDFS cluster with DockerComposeContainer
    protected static Session session;


    /**
     * Builds and starts the whole cluster. The CORE-SITE.XML_* / HDFS-SITE.XML_* env vars are
     * consumed by the apache/hadoop image's entrypoint to generate the Hadoop config files.
     *
     * <p>NOTE(review): container start order below relies on Testcontainers' {@code dependsOn}
     * to start namenode before its dependents even though {@code neo4jContainer.start()} is
     * invoked first — confirm before reordering anything here.
     */
    @BeforeClass
    public static void setUp() throws Exception {
        String rpcAddress = "namenode:8020";

        // Namenode: the single HDFS master; ports 8020 (RPC) and 9870 (web UI) are bound to
        // fixed host ports via createPortBindingModifier.
        namenode = new GenericContainer<>(DockerImageName.parse(APACHE_HADOOP_IMAGE))
                .withNetwork(hadoopNetwork)
                .withNetworkAliases("namenode")
                .withCommand("hdfs", "namenode")
                .withEnv("CORE-SITE.XML_fs.default.name", hdfsUrl)
                .withEnv("CORE-SITE.XML_fs.defaultFS", hdfsUrl)
                .withEnv("HDFS-SITE.XML_dfs.namenode.rpc-address", rpcAddress)
                .withEnv("ENSURE_NAMENODE_DIR", "/tmp/hadoop-root/dfs/name")
                .withEnv("HADOOP_USER_NAME", "hadoop")
                .withCreateContainerCmdModifier(createPortBindingModifier(8020, 9870))
                .waitingFor(Wait.forListeningPort());


        // Datanode: stores the actual blocks; must see the same fs.defaultFS as the namenode.
        datanode = new GenericContainer<>(DockerImageName.parse(APACHE_HADOOP_IMAGE))
                .withNetwork(hadoopNetwork)
                .withCommand("hdfs", "datanode")
                .withEnv("CORE-SITE.XML_fs.default.name", hdfsUrl)
                .withEnv("CORE-SITE.XML_fs.defaultFS", hdfsUrl)
                .withEnv("HDFS-SITE.XML_dfs.namenode.rpc-address", rpcAddress)
                .withEnv("HADOOP_USER_NAME", "hadoop")
                .dependsOn(namenode)
                .withCreateContainerCmdModifier(createPortBindingModifier(9866, 9864));

        // ResourceManager (YARN); not strictly needed for HDFS I/O tests but mirrors the
        // docker-compose topology this class replaces.
        resourcemanager = new GenericContainer<>(DockerImageName.parse(APACHE_HADOOP_IMAGE))
                .withNetwork(hadoopNetwork)
                .withNetworkAliases("resourcemanager")
                .withCommand("yarn", "resourcemanager")
                .withEnv("CORE-SITE.XML_fs.default.name", hdfsUrl)
                .withEnv("CORE-SITE.XML_fs.defaultFS", hdfsUrl)
                .withEnv("HDFS-SITE.XML_dfs.namenode.rpc-address", rpcAddress)
                .withEnv("HADOOP_USER_NAME", "hadoop")
                .dependsOn(namenode)
                .withCreateContainerCmdModifier(createPortBindingModifier(8088, 8088));

        // NodeManager (YARN worker).
        nodemanager = new GenericContainer<>(DockerImageName.parse(APACHE_HADOOP_IMAGE))
                .withNetwork(hadoopNetwork)
                .withCommand("yarn", "nodemanager")
                .withEnv("CORE-SITE.XML_fs.default.name", hdfsUrl)
                .withEnv("CORE-SITE.XML_fs.defaultFS", hdfsUrl)
                .withEnv("HDFS-SITE.XML_dfs.namenode.rpc-address", rpcAddress)
                .withEnv("HADOOP_USER_NAME", "hadoop")
                .dependsOn(namenode, resourcemanager);

        // Neo4j with APOC plugins; HADOOP_USER_NAME is set so APOC's HDFS client
        // authenticates as the same user the Hadoop containers run under.
        neo4jContainer = new Neo4jContainerExtension(neo4jCommunityDockerImageVersion, Files.createTempDirectory("neo4j-logs"))
                .withNetwork(hadoopNetwork)
                .withNetworkAliases("neo4j")
                .withEnv("NEO4J_AUTH", "neo4j/password")
                .withEnv("NEO4J_ACCEPT_LICENSE_AGREEMENT", "yes")
                .withEnv("HADOOP_USER_NAME", "hadoop")
                .withEnv("apoc.export.file.enabled", "true")
                .withEnv("apoc.import.file.enabled", "true")
                .withNeo4jConfig("dbms.security.procedures.unrestricted", "apoc.*")
                .withPlugins(MountableFile.forHostPath(pluginsFolder.toPath()))
                .dependsOn(namenode);

        neo4jContainer.setExposedPorts(List.of(7474, 7687));

        ExtendedTestContainerUtil.addExtraDependencies();
        neo4jContainer.start();
        namenode.start();
        datanode.start();
        resourcemanager.start();
        nodemanager.start();
        session = neo4jContainer.getSession();

        // waiting for hadoop container to be up and running
        // NOTE(review): fixed 20s sleep is a crude readiness wait — a datanode-registration
        // wait strategy would be more reliable; confirm before changing, CI history shows
        // flakiness was tuned around this.
        Thread.sleep(20000);
    }

    /** Tears down all containers; the shared Network is left to Testcontainers cleanup. */
    @AfterClass
    public static void afterClass() throws Exception {
        neo4jContainer.close();
        namenode.close();
        datanode.close();
        resourcemanager.close();
        nodemanager.close();
    }
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package apoc.hadoop;
2+
3+
import apoc.util.Util;
4+
import org.junit.BeforeClass;
5+
import org.junit.Test;
6+
import org.junit.jupiter.api.Assertions;
7+
import org.testcontainers.containers.Container;
8+
9+
import java.util.Map;
10+
11+
import static apoc.util.TestContainerUtil.testCall;
12+
13+
public class LoadHdfsTest extends HdfsContainerBaseTest {
14+
15+
@BeforeClass
16+
public static void setUp() throws Exception {
17+
HdfsContainerBaseTest.setUp();
18+
// copy csv file in hadoop
19+
Container.ExecResult execResult = namenode.execInContainer("hdfs", "dfs", "-mkdir", "/DATA/");
20+
System.out.println("execResult.getStdout() = " + execResult.getStdout());
21+
System.out.println("execResult.getStderr(() = " + execResult.getStderr());
22+
Container.ExecResult execResult1 = namenode.execInContainer("bash", "-c", "echo \"a,b\n1,2\" > data.csv");
23+
System.out.println("execResult1.getStdout() = " + execResult1.getStdout());
24+
System.out.println("execResult1.getStderr(() = " + execResult1.getStderr());
25+
Container.ExecResult execResult2 = namenode.execInContainer("hdfs", "dfs", "-put", "data.csv", "/DATA/");
26+
System.out.println("execResult2.getStdout() = " + execResult2.getStdout());
27+
System.out.println("execResult2.getStderr() = " + execResult2.getStderr());
28+
}
29+
30+
@Test
31+
public void testApocLoadCsvFromHdfs() throws Exception {
32+
// Thread.sleep(3000);
33+
testCall(session, "CALL apoc.load.csv($url) YIELD map",
34+
Util.map("url", hdfsUrl + "/DATA/data.csv"),
35+
r -> {
36+
Map<String, String> expected = Util.map("a", "1", "b", "2");
37+
Assertions.assertEquals(expected, r.get("map"));
38+
});
39+
}
40+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package apoc.hadoop;
2+
3+
import apoc.util.collection.Iterators;
4+
import org.junit.Before;
5+
import org.junit.ClassRule;
6+
import org.junit.Test;
7+
import org.junit.rules.TemporaryFolder;
8+
9+
import java.io.File;
10+
import java.util.Map;
11+
12+
import static apoc.export.parquet.ParquetTest.MAPPING_ALL;
13+
import static apoc.export.parquet.ParquetTestUtil.assertNodeAndLabel;
14+
import static apoc.export.parquet.ParquetUtil.FIELD_SOURCE_ID;
15+
import static apoc.export.parquet.ParquetUtil.FIELD_TARGET_ID;
16+
import static apoc.util.TestContainerUtil.testCall;
17+
import static apoc.util.TestContainerUtil.testResult;
18+
import static org.junit.Assert.*;
19+
20+
/**
 * Round-trip test for the Parquet procedures against HDFS: export the whole graph with
 * {@code apoc.export.parquet.all}, then read it back with both {@code apoc.load.parquet}
 * and {@code apoc.import.parquet}, asserting counts and node labels.
 */
public class ParquetHdfsTest extends HdfsContainerBaseTest {

    // Local scratch directory; created eagerly in the static initializer.
    // NOTE(review): nothing in this class reads `directory` after mkdirs() — possibly a
    // leftover from the non-dockerized test; confirm before removing.
    private static final File directory = new File("target/hdfs-parquet-import");
    static { //noinspection ResultOfMethodCallIgnored
        directory.mkdirs();
    }

    @ClassRule
    public static TemporaryFolder hdfsDir = new TemporaryFolder();


    /** Resets the database and seeds two :User nodes (one KNOWS rel) plus two :Another nodes. */
    @Before
    public void before() {
        session.executeWrite(tx -> tx.run("MATCH (n) DETACH DELETE n").consume());

        session.executeWrite(tx -> tx.run("CREATE (f:User {name:'Adam',age:42,male:true,kids:['Sam','Anna','Grace', 'Qwe'], born:localdatetime('2015-05-18T19:32:24.000'), place:point({latitude: 13.1, longitude: 33.46789, height: 100.0})})-[:KNOWS {since: 1993, bffSince: duration('P5M1.5D')}]->(b:User {name:'Jim',age:42})").consume());
        session.executeWrite(tx -> tx.run("CREATE (:Another {foo:1, listDate: [date('1999'), date('2000')], listInt: [1,2]}), (:Another {bar:'Sam'})").consume());

    }

    @Test
    public void testFileRoundtripParquetAll() {

        // Export everything to a parquet file on HDFS.
        String hdfsFileUrl = hdfsUrl + "/all.parquet";
        String file = session.executeWrite(tx -> tx.run("CALL apoc.export.parquet.all($url) YIELD file", Map.of("url", hdfsFileUrl)).single().get("file").asString());
        // check that file extracted from apoc.export is equals to `hdfs://path/to/file` url
        assertEquals(hdfsFileUrl, file);

        // check load procedure
        final String query = "CALL apoc.load.parquet($file, $config) YIELD value " +
                "RETURN value";

        // The assertions below depend on the row order the load procedure yields:
        // 4 nodes (2 :User then 2 :Another) followed by 1 relationship row.
        testResult(session, query, Map.of("file", file, "config", MAPPING_ALL),
                value -> {
                    Map<String, Object> actual = value.next();
                    assertNodeAndLabel((Map) actual.get("value"), "User");
                    actual = value.next();
                    assertNodeAndLabel((Map) actual.get("value"), "User");
                    actual = value.next();
                    assertNodeAndLabel((Map) actual.get("value"), "Another");
                    actual = value.next();
                    assertNodeAndLabel((Map) actual.get("value"), "Another");
                    actual = value.next();
                    // Last row is the relationship: only its endpoint ids are checked.
                    Map relMap = (Map) actual.get("value");
                    assertTrue(relMap.get(FIELD_SOURCE_ID) instanceof Long);
                    assertTrue(relMap.get(FIELD_TARGET_ID) instanceof Long);
                    assertFalse(value.hasNext());
                });

        // check import procedure
        Map<String, Object> params = Map.of("file", file, "config", MAPPING_ALL);
        // remove current data
        session.executeWrite(tx -> tx.run("MATCH (n) DETACH DELETE n").consume());

        final String queryImport = "CALL apoc.import.parquet($file, $config)";
        testCall(session, queryImport, params,
                r -> {
                    assertEquals(4L, r.get("nodes"));
                    assertEquals(1L, r.get("relationships"));
                });

        // Imported graph must match the seeded one: 1 KNOWS rel between :User nodes...
        testResult(session, "MATCH (start:User)-[rel:KNOWS]->(end:User) RETURN start, rel, end", r -> {
            long count = Iterators.count(r);
            assertEquals(1, count);
        });

        // ...and both :Another nodes restored.
        testResult(session, "MATCH (m:Another) RETURN m", r -> {
            long count = Iterators.count(r);
            assertEquals(2, count);
        });

    }
}

0 commit comments

Comments (0)