Skip to content

Commit bb67a0e

Browse files
committed
Some updates
1 parent 950a0c2 commit bb67a0e

File tree

4 files changed

+154
-72
lines changed

4 files changed

+154
-72
lines changed

dbsync/common/src/main/java/com/google/edwmigration/dbsync/common/InstructionReceiver.java

Lines changed: 48 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ public class InstructionReceiver implements Closeable {
2121
@SuppressWarnings("unused")
2222
private static final Logger LOG = LoggerFactory.getLogger(InstructionReceiver.class);
2323

24+
private static final boolean DEBUG = false;
25+
2426
private final OutputStream out;
2527
private final ByteSource in;
2628
private InputStream sourceStream;
@@ -37,21 +39,67 @@ public InstructionReceiver(@WillCloseWhenClosed OutputStream out, ByteSource in)
3739
this.sourceStream = in.openStream();
3840
}
3941

42+
public void receive(Instruction instruction) throws IOException {
43+
switch (instruction.getBodyCase()) {
44+
case BLOCKLOCATION:
45+
BlockLocation match = instruction.getBlockLocation();
46+
if (copyStart + copyLength == match.getBlockOffset()) {
47+
// We have a consecutive copy. This cannot happen if copyStart == -1.
48+
copyLength += match.getBlockLength();
49+
} else {
50+
// We have either no copy, or a non-consecutive copy.
51+
flushCopy();
52+
copyStart = match.getBlockOffset();
53+
copyLength = match.getBlockLength();
54+
}
55+
break;
56+
case DATA:
57+
flushCopy();
58+
ByteString data = instruction.getData();
59+
data.writeTo(out);
60+
break;
61+
default:
62+
throw new IllegalArgumentException("Unknown instruction type " + instruction.getClass());
63+
}
64+
}
65+
66+
public void close() throws IOException {
67+
try {
68+
flushCopy();
69+
} finally {
70+
out.close();
71+
sourceStream.close();
72+
}
73+
}
74+
4075
private void flushCopy() throws IOException {
4176
if (copyStart == -1) {
4277
return;
4378
}
79+
LOG.info(String.format("Reuse bytes from %d for %d bytes", copyStart, copyLength));
80+
4481
// If the requested block is before our current position,
4582
// we must re-open the stream from the beginning.
4683
if (copyStart < currentSourceOffset) {
84+
if (DEBUG) {
85+
LOG.info(String.format("Target offset %d smaller than current offset %d, reopen stream",
86+
copyStart, currentSourceOffset));
87+
}
4788
sourceStream.close();
4889
sourceStream = in.openStream();
4990
currentSourceOffset = 0;
5091
}
5192
// Skip forward to the desired start.
5293
long toSkip = copyStart - currentSourceOffset;
5394
skip(toSkip);
95+
copy(copyLength);
5496

97+
// Clear pending copy.
98+
copyStart = -1;
99+
copyLength = 0;
100+
}
101+
102+
private void copy(long copyLength) throws IOException {
55103
// Now copy exactly copyLength bytes.
56104
byte[] buffer = new byte[8192];
57105
long remaining = copyLength;
@@ -66,9 +114,6 @@ private void flushCopy() throws IOException {
66114
remaining -= read;
67115
currentSourceOffset += read;
68116
}
69-
// Clear pending copy.
70-
copyStart = -1;
71-
copyLength = 0;
72117
}
73118

74119
private void skip(long length) throws IOException {
@@ -83,35 +128,4 @@ private void skip(long length) throws IOException {
83128
}
84129
}
85130

86-
public void receive(Instruction instruction) throws IOException {
87-
switch (instruction.getBodyCase()) {
88-
case BLOCKLOCATION:
89-
BlockLocation match = instruction.getBlockLocation();
90-
if (copyStart + copyLength == match.getBlockOffset()) {
91-
// We have a consecutive copy. This cannot happen if copyStart == -1.
92-
copyLength += match.getBlockLength();
93-
} else {
94-
// We have either no copy, or a non-consecutive copy.
95-
flushCopy();
96-
copyStart = match.getBlockOffset();
97-
copyLength = match.getBlockLength();
98-
}
99-
break;
100-
case DATA:
101-
flushCopy();
102-
ByteString data = instruction.getData();
103-
data.writeTo(out);
104-
break;
105-
default:
106-
throw new IllegalArgumentException("Unknown instruction type " + instruction.getClass());
107-
}
108-
}
109-
110-
public void close() throws IOException {
111-
try {
112-
flushCopy();
113-
} finally {
114-
out.close();
115-
}
116-
}
117131
}

dbsync/gcsync/src/main/java/com/google/edwmigration/dbsync/gcsync/GcsyncClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class GcsyncClient {
5858
private static final Logger logger = Logger.getLogger("rsync");
5959

6060
public GcsyncClient(String project, String tmpBucket, String targetBucket,
61-
String location, JobsClient jobsClient, String sourceDirectory, GcsStorage gcsStorage,
61+
String location, String sourceDirectory, JobsClient jobsClient, GcsStorage gcsStorage,
6262
InstructionGenerator instructionGenerator) {
6363
this.project = project;
6464
this.tmpBucket = tmpBucket;

dbsync/gcsync/src/main/java/com/google/edwmigration/dbsync/gcsync/GcsyncMain.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ public static void main(String[] args) throws IOException {
2020
project,
2121
arguments.getOptions().valueOf(arguments.tmpBucketOptionSpec),
2222
arguments.getOptions().valueOf(arguments.targetOptionSpec),
23+
arguments.getOptions().valueOf(arguments.locationOptionSpec),
2324
arguments.getOptions().valueOf(arguments.sourceDirectoryOptionSpec),
2425
JobsClient.create(),
25-
arguments.getOptions().valueOf(arguments.locationOptionSpec),
2626
new GcsStorage(project),
2727
new InstructionGenerator(Constants.BLOCK_SIZE));
2828
try {

dbsync/gcsync/src/test/java/com/google/edwmigration/dbsync/gcsync/GcsyncClientTest.java

Lines changed: 104 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,34 @@
33
import static org.junit.Assert.assertFalse;
44
import static org.junit.Assert.assertTrue;
55
import static org.mockito.ArgumentMatchers.any;
6+
import static org.mockito.ArgumentMatchers.eq;
67
import static org.mockito.Mockito.atLeastOnce;
78
import static org.mockito.Mockito.doNothing;
8-
import static org.mockito.Mockito.eq;
99
import static org.mockito.Mockito.mock;
1010
import static org.mockito.Mockito.never;
11+
import static org.mockito.Mockito.reset;
12+
import static org.mockito.Mockito.times;
1113
import static org.mockito.Mockito.verify;
1214
import static org.mockito.Mockito.when;
1315

16+
import com.google.api.gax.longrunning.OperationFuture;
17+
import com.google.api.gax.rpc.OperationCallable;
1418
import com.google.cloud.run.v2.CreateJobRequest;
19+
import com.google.cloud.run.v2.Execution;
20+
import com.google.cloud.run.v2.Job;
21+
import com.google.cloud.run.v2.JobName;
1522
import com.google.cloud.run.v2.JobsClient;
23+
import com.google.cloud.run.v2.RunJobRequest;
1624
import com.google.cloud.storage.Blob;
25+
import com.google.common.io.ByteSink;
26+
import com.google.common.io.ByteSource;
1727
import com.google.common.io.Files;
1828
import com.google.edwmigration.dbsync.common.InstructionGenerator;
1929
import com.google.edwmigration.dbsync.storage.gcs.GcsStorage;
30+
import java.io.BufferedOutputStream;
2031
import java.io.File;
2132
import java.io.IOException;
33+
import java.io.OutputStream;
2234
import java.nio.file.Path;
2335
import java.util.Base64;
2436
import java.util.List;
@@ -28,7 +40,6 @@
2840
import org.junit.Test;
2941
import org.junit.rules.TemporaryFolder;
3042

31-
@SuppressWarnings("unchecked")
3243
public class GcsyncClientTest {
3344

3445
private static final String PROJECT = "dummy-project";
@@ -50,7 +61,7 @@ public class GcsyncClientTest {
5061
private GcsyncClient clientUnderTest;
5162

5263
// For test simplicity, define a smaller threshold for "rsync" logic
53-
private static final long RSYNC_SIZE_THRESHOLD = Constants.RSYNC_SIZE_THRESHOLD;
64+
private static final long RSYNC_SIZE_THRESHOLD = 1024; // e.g. 1KB
5465

5566
@Before
5667
public void setUp() throws Exception {
@@ -79,21 +90,74 @@ public void setUp() throws Exception {
7990
TMP_BUCKET,
8091
TARGET_BUCKET,
8192
LOCATION,
82-
mockJobsClient,
8393
sourceDir.getAbsolutePath(),
94+
mockJobsClient,
8495
mockGcsStorage,
85-
mockInstructionGenerator
96+
mockInstructionGenerator// The key new argument
8697
);
8798

8899
// Stub GcsStorage calls:
89-
// - If "small.txt" does not exist on GCS => returns null
100+
// - "small.txt" does not exist on GCS => returns null => triggers upload
90101
when(mockGcsStorage.getBlob(eq(TARGET_BUCKET), eq("small.txt"))).thenReturn(null);
91-
// - If "large.txt" exists on GCS => return a mock Blob
102+
103+
// - "large.txt" => return a mock Blob (we'll set its MD5 dynamically in each test)
92104
Blob mockBlob = mock(Blob.class);
93105
when(mockGcsStorage.getBlob(eq(TARGET_BUCKET), eq("large.txt"))).thenReturn(mockBlob);
106+
String localMd5 = generateMd5(largeFile);
107+
when(mockBlob.getMd5()).thenReturn(localMd5);
94108

95-
// Also stub out any uploading just to verify calls (optional)
109+
when(mockGcsStorage.newByteSource(any())).thenReturn(mock(ByteSource.class));
110+
ByteSink mockByteSink = mock(ByteSink.class);
111+
when(mockGcsStorage.newByteSink(any())).thenReturn(mockByteSink);
112+
when(mockByteSink.openBufferedStream()).thenReturn(mock(BufferedOutputStream.class));
113+
when(mockByteSink.openStream()).thenReturn(mock(OutputStream.class));
114+
115+
// Stub out any uploading just to verify calls (optional)
96116
doNothing().when(mockGcsStorage).uploadFile(any(Path.class), any());
117+
118+
// --------------------------------------------------------------------------------
119+
// Setup Mocks for the JobsClient calls that happen in "executeMainOnCloudRun(...)"
120+
// so they don't actually hit GCP.
121+
122+
// 1) createJobAsync(...) => returns an OperationFuture<Job, OperationMetadata>
123+
@SuppressWarnings("unchecked")
124+
OperationFuture<Job, Job> mockCreateFuture = mock(OperationFuture.class);
125+
126+
// We'll stub .get() to return a dummy Job so we don't throw an exception
127+
Job dummyJob = Job.newBuilder()
128+
.setName("projects/dummy-project/locations/us-central1/jobs/dummyJob").build();
129+
try {
130+
when(mockCreateFuture.get()).thenReturn(dummyJob);
131+
} catch (Exception e) {
132+
// ignored
133+
}
134+
135+
@SuppressWarnings("unchecked")
136+
OperationFuture<Execution, Execution> mockRunFuture = mock(OperationFuture.class);
137+
Execution dummyExec = Execution.newBuilder().setName("dummyExecution").build();
138+
try {
139+
when(mockRunFuture.get()).thenReturn(dummyExec);
140+
} catch (Exception e) {
141+
// ignored
142+
}
143+
144+
@SuppressWarnings("unchecked")
145+
OperationFuture<Job, Job> mockDeleteFuture = mock(OperationFuture.class);
146+
try {
147+
when(mockDeleteFuture.get()).thenReturn(dummyJob);
148+
} catch (Exception e) {
149+
// ignored
150+
}
151+
152+
OperationCallable<RunJobRequest, Execution, Execution> operationCallable = mock(
153+
OperationCallable.class);
154+
155+
// Now wire them all up
156+
when(mockJobsClient.createJobAsync(any(CreateJobRequest.class))).thenReturn(mockCreateFuture);
157+
when(mockJobsClient.runJobOperationCallable()).thenReturn(operationCallable);
158+
when(operationCallable.futureCall(any(RunJobRequest.class))).thenReturn(
159+
mockRunFuture);
160+
when(mockJobsClient.deleteJobAsync(any(JobName.class))).thenReturn(mockDeleteFuture);
97161
}
98162

99163
@After
@@ -102,56 +166,60 @@ public void tearDown() {
102166
}
103167

104168
@Test
105-
public void testSyncFiles_LargeFileRsync_TriggersChecksumAndRunJob()
169+
public void testSyncFiles_LargeFileMd5Mismatch_RunsJobAndDeletesJob()
106170
throws Exception {
107171
// Suppose the large file on GCS has an MD5 mismatch
108172
Blob mockBlob = mockGcsStorage.getBlob(TARGET_BUCKET, "large.txt");
173+
reset(mockBlob);
109174
when(mockBlob.getMd5()).thenReturn("some-other-md5");
110175

111-
// Now calling syncFiles() should eventually do:
112-
// 1) Mark large.txt for rsync
113-
// 2) call computeCheckSum() => executeMainOnCloudRun() => jobsClient calls
114-
clientUnderTest.syncFiles();
176+
try {
177+
clientUnderTest.syncFiles();
178+
} catch (Exception e) {
179+
// It's supposed to fail at instruction generation step
180+
}
115181

116-
// verify that large.txt is in "filesToRsync"
117182
List<Path> rsyncFiles = getPrivateList(clientUnderTest, "filesToRsync");
118-
assertTrue("large.txt should be in rsync list", rsyncFiles.contains(largeFile.toPath()));
183+
assertTrue("large.txt should be in the rsync list", rsyncFiles.contains(largeFile.toPath()));
184+
185+
verify(mockJobsClient, times(1)).createJobAsync(any(CreateJobRequest.class));
186+
187+
verify(mockJobsClient.runJobOperationCallable(), times(1))
188+
.futureCall(any(RunJobRequest.class));
119189

120-
// We know from the code that computeCheckSum() calls "runCloudRunJob(...)"
121-
// So let's verify that we used the mockJobsClient to create & run a job:
122-
verify(mockJobsClient, atLeastOnce()).createJobAsync(any(CreateJobRequest.class));
123-
verify(mockJobsClient, atLeastOnce()).runJobOperationCallable();
190+
verify(mockJobsClient, times(1)).deleteJobAsync(any(JobName.class));
124191
}
125192

126193
@Test
127-
public void testSyncFiles_SmallFileUpload_NoCloudRunJob()
128-
throws Exception {
129-
// getBlob(...) for "small.txt" => null => we upload it, but do *not* run Cloud Run job
194+
public void testSyncFiles_LargeFileMd5Matches_NoCloudRun() throws Exception {
130195
clientUnderTest.syncFiles();
131196

132-
// Check that small.txt is in "filesToUpload"
133-
List<Path> uploadFiles = getPrivateList(clientUnderTest, "filesToUpload");
134-
assertTrue("small.txt should be in upload list", uploadFiles.contains(smallFile.toPath()));
197+
List<Path> rsyncFiles = getPrivateList(clientUnderTest, "filesToRsync");
198+
assertFalse("large.txt should NOT be in the rsync list if MD5 matches",
199+
rsyncFiles.contains(largeFile.toPath()));
135200

136-
// Because there's no large file mismatch, we do not run any job
201+
// No job calls
137202
verify(mockJobsClient, never()).createJobAsync(any(CreateJobRequest.class));
138-
verify(mockJobsClient, never()).runJobOperationCallable();
203+
verify(mockJobsClient.runJobOperationCallable(), never()).futureCall(any(RunJobRequest.class));
204+
verify(mockJobsClient, never()).deleteJobAsync(any(JobName.class));
139205
}
140206

141207
@Test
142-
public void testSyncFiles_LargeFileMatchesMd5_NoCloudRun() throws Exception {
143-
// If large.txt on GCS has the same MD5 => no rsync => no job
144-
Blob mockBlob = mockGcsStorage.getBlob(TARGET_BUCKET, "large.txt");
145-
String localMd5 = generateMd5(largeFile);
146-
when(mockBlob.getMd5()).thenReturn(localMd5);
147-
208+
public void testSyncFiles_SmallFileNotOnGcs_UploadsItButNoJob() throws Exception {
209+
// small.txt not on GCS => we upload it
210+
// But no job because there's no large file mismatch
148211
clientUnderTest.syncFiles();
149212

150-
List<Path> rsyncFiles = getPrivateList(clientUnderTest, "filesToRsync");
151-
assertFalse("large.txt should not be rsync-ed", rsyncFiles.contains(largeFile.toPath()));
213+
// check that small file is in "filesToUpload"
214+
List<Path> uploadFiles = getPrivateList(clientUnderTest, "filesToUpload");
215+
assertTrue("small.txt should be in upload list", uploadFiles.contains(smallFile.toPath()));
216+
217+
verify(mockGcsStorage, atLeastOnce()).uploadFile(eq(smallFile.toPath()), any());
152218

219+
// No Cloud Run job triggered
153220
verify(mockJobsClient, never()).createJobAsync(any(CreateJobRequest.class));
154-
verify(mockJobsClient, never()).runJobOperationCallable();
221+
verify(mockJobsClient.runJobOperationCallable(), never()).futureCall(any(RunJobRequest.class));
222+
verify(mockJobsClient, never()).deleteJobAsync(any(JobName.class));
155223
}
156224

157225
// ----------------------------------------------------------------------------------

0 commit comments

Comments
 (0)