Skip to content

Commit f768b68

Browse files
[CELEBORN-2275][CIP-14] Add C++ merge-write and Java-read hybrid integration test
Add a new C++ test client that exercises the mergeData/pushMergedData write path and validates data integrity by reading the data back with the Java ShuffleClient. This complements the existing pushData-based hybrid test by covering the merge-write path.
- Add DataSumWithMergeWriterClient.cpp and its CMake build target
- Add CppMergeWriteJavaReadTest entry points for the NONE, LZ4, and ZSTD compression codecs
- Add testCppMergeWriteJavaRead and runCppMergeWriteJavaRead to JavaCppHybridReadWriteTestBase
- Update the cpp_integration CI workflow to run the new tests
1 parent 391ef4b commit f768b68

7 files changed

Lines changed: 314 additions & 0 deletions

File tree

.github/workflows/cpp_integration.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,25 @@ jobs:
126126
test-compile exec:java \
127127
-Dexec.classpathScope="test" \
128128
-Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.CppWriteJavaReadTestWithZSTD" \
129+
-Dexec.args="-XX:MaxDirectMemorySize=2G"
130+
# Runs the CppMergeWriteJavaReadTestWithNONE main class from the worker
# module's test classpath (test-compile, then exec:java).
# NOTE(review): -Dexec.args looks like it is handed to the main class as
# program arguments rather than to the JVM - confirm that the
# MaxDirectMemorySize setting actually takes effect here.
- name: Run Cpp-MergeWrite Java-Read Hybrid Integration Test (NONE Compression)
131+
run: |
132+
build/mvn -pl worker \
133+
test-compile exec:java \
134+
-Dexec.classpathScope="test" \
135+
-Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.CppMergeWriteJavaReadTestWithNONE" \
136+
-Dexec.args="-XX:MaxDirectMemorySize=2G"
137+
# Same invocation as the step above, but runs the LZ4 entry point
# (CppMergeWriteJavaReadTestWithLZ4).
- name: Run Cpp-MergeWrite Java-Read Hybrid Integration Test (LZ4 Compression)
138+
run: |
139+
build/mvn -pl worker \
140+
test-compile exec:java \
141+
-Dexec.classpathScope="test" \
142+
-Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.CppMergeWriteJavaReadTestWithLZ4" \
143+
-Dexec.args="-XX:MaxDirectMemorySize=2G"
144+
# Same invocation as the steps above, but runs the ZSTD entry point
# (CppMergeWriteJavaReadTestWithZSTD).
- name: Run Cpp-MergeWrite Java-Read Hybrid Integration Test (ZSTD Compression)
145+
run: |
146+
build/mvn -pl worker \
147+
test-compile exec:java \
148+
-Dexec.classpathScope="test" \
149+
-Dexec.mainClass="org.apache.celeborn.service.deploy.cluster.CppMergeWriteJavaReadTestWithZSTD" \
129150
-Dexec.args="-XX:MaxDirectMemorySize=2G"

cpp/celeborn/tests/CMakeLists.txt

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,28 @@ target_link_libraries(
6161
add_executable(cppDataSumWithWriterClient DataSumWithWriterClient.cpp)
6262

6363
target_link_libraries(cppDataSumWithWriterClient dataSumWithWriterClient)
64+
65+
# Library target built from the merge-write test client source file.
add_library(
66+
dataSumWithMergeWriterClient
67+
DataSumWithMergeWriterClient.cpp)
68+
69+
# Libraries linked into dataSumWithMergeWriterClient: the plain names
# (memory ... client) are presumably in-tree targets - confirm against the
# top-level build; the ${...} entries are variables set elsewhere.
target_link_libraries(
70+
dataSumWithMergeWriterClient
71+
memory
72+
utils
73+
conf
74+
proto
75+
network
76+
protocol
77+
client
78+
${WANGLE}
79+
${FIZZ}
80+
${LIBSODIUM_LIBRARY}
81+
${FOLLY_WITH_DEPENDENCIES}
82+
${GLOG}
83+
${GFLAGS_LIBRARIES}
84+
)
85+
86+
# Executable target cppDataSumWithMergeWriterClient.
# NOTE(review): DataSumWithMergeWriterClient.cpp is listed both here and in the
# library target above, so the same source is compiled for both targets.
add_executable(cppDataSumWithMergeWriterClient DataSumWithMergeWriterClient.cpp)
87+
88+
# The executable gets its link dependencies through the library target.
target_link_libraries(cppDataSumWithMergeWriterClient dataSumWithMergeWriterClient)
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include <folly/init/Init.h>
19+
#include <cstdio>
20+
#include <fstream>
21+
#include <iostream>
22+
23+
#include <celeborn/client/ShuffleClient.h>
24+
25+
int main(int argc, char** argv) {
26+
folly::init(&argc, &argv, false);
27+
assert(argc == 10);
28+
std::string lifecycleManagerHost = argv[1];
29+
int lifecycleManagerPort = std::atoi(argv[2]);
30+
std::string appUniqueId = argv[3];
31+
int shuffleId = std::atoi(argv[4]);
32+
int attemptId = std::atoi(argv[5]);
33+
int numMappers = std::atoi(argv[6]);
34+
int numPartitions = std::atoi(argv[7]);
35+
std::string resultFile = argv[8];
36+
std::string compressCodec = argv[9];
37+
std::cout << "lifecycleManagerHost = " << lifecycleManagerHost
38+
<< ", lifecycleManagerPort = " << lifecycleManagerPort
39+
<< ", appUniqueId = " << appUniqueId
40+
<< ", shuffleId = " << shuffleId << ", attemptId = " << attemptId
41+
<< ", numMappers = " << numMappers
42+
<< ", numPartitions = " << numPartitions
43+
<< ", resultFile = " << resultFile
44+
<< ", compressCodec = " << compressCodec << std::endl;
45+
46+
auto conf = std::make_shared<celeborn::conf::CelebornConf>();
47+
conf->registerProperty(
48+
celeborn::conf::CelebornConf::kShuffleCompressionCodec, compressCodec);
49+
auto clientEndpoint =
50+
std::make_shared<celeborn::client::ShuffleClientEndpoint>(conf);
51+
auto shuffleClient = celeborn::client::ShuffleClientImpl::create(
52+
appUniqueId, conf, *clientEndpoint);
53+
shuffleClient->setupLifecycleManagerRef(
54+
lifecycleManagerHost, lifecycleManagerPort);
55+
56+
long maxData = 1000000;
57+
size_t numData = 1000;
58+
std::vector<long> result(numPartitions, 0);
59+
std::vector<size_t> dataCnt(numPartitions, 0);
60+
for (int mapId = 0; mapId < numMappers; mapId++) {
61+
for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
62+
for (size_t i = 0; i < numData; i++) {
63+
int data = std::rand() % maxData;
64+
result[partitionId] += data;
65+
dataCnt[partitionId]++;
66+
std::string dataStr = "-" + std::to_string(data);
67+
shuffleClient->mergeData(
68+
shuffleId,
69+
mapId,
70+
attemptId,
71+
partitionId,
72+
reinterpret_cast<const uint8_t*>(dataStr.c_str()),
73+
0,
74+
dataStr.size(),
75+
numMappers,
76+
numPartitions);
77+
}
78+
}
79+
shuffleClient->pushMergedData(shuffleId, mapId, attemptId);
80+
shuffleClient->mapperEnd(shuffleId, mapId, attemptId, numMappers);
81+
}
82+
for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
83+
std::cout << "partition " << partitionId
84+
<< " sum result = " << result[partitionId]
85+
<< ", dataCnt = " << dataCnt[partitionId] << std::endl;
86+
}
87+
88+
remove(resultFile.c_str());
89+
std::ofstream of(resultFile);
90+
for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
91+
of << result[partitionId] << std::endl;
92+
}
93+
of.close();
94+
95+
return 0;
96+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.celeborn.service.deploy.cluster
19+
20+
import org.apache.celeborn.common.protocol.CompressionCodec
21+
22+
/** Command-line entry point for the C++ merge-write / Java-read test using the LZ4 codec. */
object CppMergeWriteJavaReadTestWithLZ4 extends JavaCppHybridReadWriteTestBase {

  /** Runs the test once; `args` is not used. */
  def main(args: Array[String]): Unit =
    testCppMergeWriteJavaRead(CompressionCodec.LZ4)
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.celeborn.service.deploy.cluster
19+
20+
import org.apache.celeborn.common.protocol.CompressionCodec
21+
22+
/** Command-line entry point for the C++ merge-write / Java-read test using the NONE codec. */
object CppMergeWriteJavaReadTestWithNONE extends JavaCppHybridReadWriteTestBase {

  /** Runs the test once; `args` is not used. */
  def main(args: Array[String]): Unit =
    testCppMergeWriteJavaRead(CompressionCodec.NONE)
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.celeborn.service.deploy.cluster
19+
20+
import org.apache.celeborn.common.protocol.CompressionCodec
21+
22+
/** Command-line entry point for the C++ merge-write / Java-read test using the ZSTD codec. */
object CppMergeWriteJavaReadTestWithZSTD extends JavaCppHybridReadWriteTestBase {

  /** Runs the test once; `args` is not used. */
  def main(args: Array[String]): Unit =
    testCppMergeWriteJavaRead(CompressionCodec.ZSTD)
}

worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/JavaCppHybridReadWriteTestBase.scala

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,4 +244,95 @@ trait JavaCppHybridReadWriteTestBase extends AnyFunSuite
244244
shuffleClient.shutdown()
245245
}
246246

247+
/**
 * Runs the C++ merge-write / Java-read check for `codec`, bracketed by
 * `beforeAll()` and `afterAll()`; `afterAll()` runs even when the check throws.
 */
def testCppMergeWriteJavaRead(codec: CompressionCodec): Unit = {
  beforeAll()
  try runCppMergeWriteJavaRead(codec)
  finally afterAll()
}
255+
256+
/**
 * Writes shuffle data with the C++ merge-write client and verifies it with the
 * Java ShuffleClient.
 *
 * Steps:
 *  1. Start a LifecycleManager and a Java ShuffleClientImpl configured with `codec`.
 *  2. Run the `cppDataSumWithMergeWriterClient` binary, which writes the data and
 *     stores one sum per partition in a result file.
 *  3. Read every partition back through the Java client, summing the
 *     '-'-separated decimal values found in the stream.
 *  4. Assert that the sums match the C++ result file line by line and that the
 *     file has exactly one line per partition.
 *
 * The lifecycle manager and shuffle client are always shut down, even when the
 * command or an assertion fails.
 *
 * @param codec compression codec used by both the C++ writer and the Java reader
 */
def runCppMergeWriteJavaRead(codec: CompressionCodec): Unit = {
  val appUniqueId = "test-app"
  val shuffleId = 0
  val attemptId = 0
  val numMappers = 2
  val numPartitions = 2

  val clientConf = new CelebornConf()
    .set(CelebornConf.MASTER_ENDPOINTS.key, s"localhost:$masterPort")
    .set(CelebornConf.SHUFFLE_COMPRESSION_CODEC.key, codec.name)
    .set(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key, "true")
    .set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K")
    .set(CelebornConf.READ_LOCAL_SHUFFLE_FILE, false)
    .set("celeborn.data.io.numConnectionsPerPeer", "1")
  val lifecycleManager = new LifecycleManager(appUniqueId, clientConf)
  val shuffleClient =
    new ShuffleClientImpl(appUniqueId, clientConf, UserIdentifier("mock", "mock"))

  try {
    shuffleClient.setupLifecycleManagerRef(lifecycleManager.self)

    val cppResultFile = "/tmp/celeborn-cpp-merge-writer-result.txt"
    val lifecycleManagerHost = lifecycleManager.getHost
    val lifecycleManagerPort = lifecycleManager.getPort
    val projectDirectory = new File(new File(".").getAbsolutePath)
    val cppBinRelativeDirectory = "cpp/build/celeborn/tests/"
    val cppBinFileName = "cppDataSumWithMergeWriterClient"
    val cppBinFilePath = s"$projectDirectory/$cppBinRelativeDirectory/$cppBinFileName"
    val cppCodec = codec.name()
    val command = {
      s"$cppBinFilePath $lifecycleManagerHost $lifecycleManagerPort $appUniqueId $shuffleId $attemptId $numMappers $numPartitions $cppResultFile $cppCodec"
    }
    println(s"run command: $command")
    val commandOutput = runCommand(command)
    println(s"command output: $commandOutput")

    // Metrics are irrelevant for this check, so the callback is a no-op.
    val metricsCallback = new MetricsCallback {
      override def incBytesRead(bytesWritten: Long): Unit = {}
      override def incReadTime(time: Long): Unit = {}
    }

    val sums = new Array[Long](numPartitions)
    for (partitionId <- 0 until numPartitions) {
      val inputStream = shuffleClient.readPartition(
        shuffleId,
        partitionId,
        attemptId,
        0,
        0,
        Integer.MAX_VALUE,
        metricsCallback)
      var data: Long = 0
      var dataCnt = 0
      try {
        var c = inputStream.read()
        while (c != -1) {
          if (c == '-') {
            // '-' starts a new value: commit the one parsed so far.
            sums(partitionId) += data
            data = 0
            dataCnt += 1
          } else {
            assert(c >= '0' && c <= '9')
            data *= 10
            data += c - '0'
          }
          c = inputStream.read()
        }
      } finally {
        inputStream.close()
      }
      // The last value has no trailing separator, so add it after the loop.
      sums(partitionId) += data
      println(s"partition $partitionId sum result = ${sums(partitionId)}, dataCnt = $dataCnt")
    }

    // Read the C++ result file eagerly so the handle can be closed right away.
    val resultSource = Source.fromFile(cppResultFile, "utf-8")
    val resultLines =
      try resultSource.getLines().toList
      finally resultSource.close()

    var lineCount = 0
    for (line <- resultLines) {
      val data = line.toLong
      Assert.assertEquals(data, sums(lineCount))
      lineCount += 1
    }
    Assert.assertEquals(numPartitions, lineCount)
  } finally {
    // Shut both components down even if stopping the first one throws.
    try lifecycleManager.stop()
    finally shuffleClient.shutdown()
  }
}
337+
247338
}

0 commit comments

Comments
 (0)