Skip to content

Commit 0e60696

Browse files
committed
Address PR comments
1 parent bb67a0e commit 0e60696

File tree

8 files changed

+148
-139
lines changed

8 files changed

+148
-139
lines changed

dbsync/common/src/main/java/com/google/edwmigration/dbsync/common/ChecksumGenerator.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public ChecksumGenerator(@NonNegative int blockSize) {
2828
return blockSize;
2929
}
3030

31-
public void generate(Consumer<Checksum> out, ByteSource in) throws IOException {
31+
public void generate(ChecksumConsumer<IOException> out, ByteSource in) throws IOException {
3232
// This deliberately throws if size is not Present.
3333
Optional<Long> dataSizeOptional = in.sizeIfKnown();
3434
if (!dataSizeOptional.isPresent()) {
@@ -76,4 +76,9 @@ public void generate(Consumer<Checksum> out, ByteSource in) throws IOException {
7676
}
7777
}
7878
}
79+
80+
public interface ChecksumConsumer<X extends Exception> {
81+
82+
void accept(Checksum checksum) throws X;
83+
}
7984
}

dbsync/common/src/main/java/com/google/edwmigration/dbsync/common/InstructionGenerator.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ private static Instruction newLiteralInstruction(byte[] literalBuffer, int liter
3838
.build();
3939
}
4040

41-
public void generate(Consumer<? super Instruction> out, ByteSource in,
41+
public void generate(InstructionConsumer<? extends IOException> out, ByteSource in,
4242
List<? extends Checksum> checksums) throws IOException {
4343
Int2ObjectMap<Collection<Checksum>> checksumMap = new Int2ObjectOpenHashMap<>(checksums.size());
4444
for (Checksum c : checksums) {
@@ -74,9 +74,9 @@ public void generate(Consumer<? super Instruction> out, ByteSource in,
7474
// And the wisdom to know the difference.
7575
MATCH:
7676
if (cc != null) {
77-
HashCode strongHashCode = rollingChecksum.getStrongHashCode();
77+
String strongHashCode = rollingChecksum.getStrongHashCode().toString();
7878
for (Checksum c : cc) {
79-
if (c.getStrongChecksum().equals(strongHashCode.toString())) {
79+
if (c.getStrongChecksum().equals(strongHashCode)) {
8080
if (literalBufferLength > 0) {
8181
if (DEBUG) {
8282
LOG.debug("Emitting pre-match literal");
@@ -86,8 +86,8 @@ public void generate(Consumer<? super Instruction> out, ByteSource in,
8686
}
8787
Instruction insn = Instruction.newBuilder()
8888
.setBlockLocation(BlockLocation.newBuilder()
89-
.setBlockOffset(c.getBlockOffset())
90-
.setBlockLength(c.getBlockLength()))
89+
.setBlockOffset(c.getBlockOffset())
90+
.setBlockLength(c.getBlockLength()))
9191
.build();
9292
out.accept(insn);
9393
continue STREAM;
@@ -128,4 +128,13 @@ public void generate(Consumer<? super Instruction> out, ByteSource in,
128128
}
129129
}
130130
}
131+
132+
public interface InstructionConsumer<X extends Exception> {
133+
134+
/**
135+
* Accepts an instruction from the generator.
136+
*/
137+
void accept(Instruction instruction) throws X;
138+
}
139+
131140
}

dbsync/common/src/main/java/com/google/edwmigration/dbsync/common/InstructionReceiver.java

Lines changed: 4 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,13 @@ public class InstructionReceiver implements Closeable {
2525

2626
private final OutputStream out;
2727
private final ByteSource in;
28-
private InputStream sourceStream;
29-
private long currentSourceOffset = 0;
3028
@CheckForSigned
3129
private long copyStart = -1;
3230
private long copyLength = 0;
3331

34-
public InstructionReceiver(@WillCloseWhenClosed OutputStream out, ByteSource in)
35-
throws IOException {
32+
public InstructionReceiver(@WillCloseWhenClosed OutputStream out, ByteSource in) {
3633
this.out = Preconditions.checkNotNull(out, "Output was null.");
3734
this.in = Preconditions.checkNotNull(in, "Input was null.");
38-
39-
this.sourceStream = in.openStream();
4035
}
4136

4237
public void receive(Instruction instruction) throws IOException {
@@ -68,64 +63,21 @@ public void close() throws IOException {
6863
flushCopy();
6964
} finally {
7065
out.close();
71-
sourceStream.close();
7266
}
7367
}
7468

7569
private void flushCopy() throws IOException {
7670
if (copyStart == -1) {
7771
return;
7872
}
79-
LOG.info(String.format("Reuse bytes from %d for %d bytes", copyStart, copyLength));
80-
81-
// If the requested block is before our current position,
82-
// we must re-open the stream from the beginning.
83-
if (copyStart < currentSourceOffset) {
84-
if (DEBUG) {
85-
LOG.info(String.format("Target offset %d smaller than current offset %d, reopen stream",
86-
copyStart, currentSourceOffset));
87-
}
88-
sourceStream.close();
89-
sourceStream = in.openStream();
90-
currentSourceOffset = 0;
73+
if (DEBUG) {
74+
LOG.info(String.format("Reuse bytes from %d for %d bytes", copyStart, copyLength));
9175
}
92-
// Skip forward to the desired start.
93-
long toSkip = copyStart - currentSourceOffset;
94-
skip(toSkip);
95-
copy(copyLength);
76+
in.slice(copyStart, copyLength).copyTo(out);
9677

9778
// Clear pending copy.
9879
copyStart = -1;
9980
copyLength = 0;
10081
}
10182

102-
private void copy(long copyLength) throws IOException {
103-
// Now copy exactly copyLength bytes.
104-
byte[] buffer = new byte[8192];
105-
long remaining = copyLength;
106-
while (remaining > 0) {
107-
int bytesToRead = (int) Math.min(buffer.length, remaining);
108-
int read = sourceStream.read(buffer, 0, bytesToRead);
109-
if (read == -1) {
110-
throw new EOFException("Unexpected end of stream while copying " + copyLength
111-
+ " bytes starting at offset " + copyStart);
112-
}
113-
out.write(buffer, 0, read);
114-
remaining -= read;
115-
currentSourceOffset += read;
116-
}
117-
}
118-
119-
private void skip(long length) throws IOException {
120-
long remaining = length;
121-
while (remaining > 0) {
122-
long skipped = sourceStream.skip(remaining);
123-
if (skipped < remaining) {
124-
throw new EOFException("Unexpected end of stream while skipping.");
125-
}
126-
remaining -= skipped;
127-
currentSourceOffset += skipped;
128-
}
129-
}
130-
13183
}

dbsync/gcsync/src/main/java/com/google/edwmigration/dbsync/gcsync/GcsyncClient.java

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.google.cloud.storage.Blob;
1414
import com.google.common.base.Preconditions;
1515
import com.google.common.hash.Hashing;
16+
import com.google.common.io.ByteSink;
1617
import com.google.common.io.ByteSource;
1718
import com.google.edwmigration.dbsync.common.InstructionGenerator;
1819
import com.google.edwmigration.dbsync.proto.Checksum;
@@ -21,6 +22,7 @@
2122
import java.io.IOException;
2223
import java.io.InputStream;
2324
import java.io.OutputStream;
25+
import java.io.OutputStreamWriter;
2426
import java.net.URI;
2527
import java.net.URISyntaxException;
2628
import java.nio.file.DirectoryStream;
@@ -148,18 +150,16 @@ private void uploadJar() throws URISyntaxException, IOException {
148150
}
149151

150152
private void uploadFilesToRsyncList() throws URISyntaxException, IOException {
151-
Path tempFile = Files.createTempFile("filesToRsyncTmp", ".txt");
152-
try (BufferedWriter writer = Files.newBufferedWriter(tempFile)) {
153+
ByteSink byteSink = gcsStorage.newByteSink(
154+
new URI(tmpBucket).resolve(Constants.FILES_TO_RSYNC_FILE_NAME));
155+
156+
try (BufferedWriter writer = new BufferedWriter(
157+
new OutputStreamWriter(byteSink.openBufferedStream()))) {
153158
for (Path file : filesToRsync) {
154159
writer.write(file.getFileName().toString());
155160
writer.newLine();
156161
}
157162
}
158-
159-
gcsStorage.uploadFile(tempFile,
160-
new URI(tmpBucket).resolve(Constants.FILES_TO_RSYNC_FILE_NAME));
161-
162-
Files.deleteIfExists(tempFile);
163163
}
164164

165165
private void executeMainOnCloudRun(String mainClassPath)
@@ -183,39 +183,30 @@ private void downloadCheckSumFiles() throws URISyntaxException, IOException {
183183
ByteSource byteSource = gcsStorage.newByteSource(
184184
new URI(tmpBucket).resolve(checksumFileName));
185185
byteSource.copyTo(
186-
com.google.common.io.Files.asByteSink(getTemporaryCheckSumFileName(file).toFile()));
186+
com.google.common.io.Files.asByteSink(Util.getTemporaryCheckSumFilePath(file).toFile()));
187187
}
188188
}
189189

190190
private void sendRsyncInstructions() throws IOException, URISyntaxException {
191191
for (Path file : filesToRsync) {
192-
Path tmpCheckSumFile = getTemporaryCheckSumFileName(file);
192+
Path tmpCheckSumFile = Util.getTemporaryCheckSumFilePath(file);
193193
try (OutputStream instructionSink = gcsStorage.newByteSink(
194194
new URI(tmpBucket).resolve(Util.getInstructionFileName(file.getFileName().toString())))
195195
.openBufferedStream()) {
196196
try (InputStream inputStream = Files.newInputStream(tmpCheckSumFile)) {
197197
List<Checksum> checksums = getChecksumsFromFile(inputStream);
198198
ByteSource fileInput = com.google.common.io.Files.asByteSource(file.toFile());
199199

200-
instructionGenerator.generate(instruction -> {
201-
try {
202-
instruction.writeDelimitedTo(instructionSink);
203-
} catch (IOException e) {
204-
throw new RuntimeException(e);
205-
}
206-
}, fileInput, checksums);
200+
instructionGenerator.generate(instruction ->
201+
instruction.writeDelimitedTo(instructionSink)
202+
, fileInput, checksums);
207203
}
208204
}
209205

210206
Files.deleteIfExists(tmpCheckSumFile);
211207
}
212208
}
213209

214-
private static Path getTemporaryCheckSumFileName(Path file) {
215-
return file.resolveSibling(
216-
String.format("%s.%s", file.getFileName(), Constants.CHECK_SUM_FILE_SUFFIX));
217-
}
218-
219210
private void reconStructFiles()
220211
throws IOException, ExecutionException, InterruptedException, URISyntaxException {
221212
executeMainOnCloudRun(Constants.RECONSTRUCT_FILE_MAIN);

dbsync/gcsync/src/main/java/com/google/edwmigration/dbsync/gcsync/GenerateCheckSumMain.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,8 @@ public static void main(String[] args) throws Exception {
4141
ByteSink byteSink = gcsStorage.newByteSink(
4242
new URI(tmpBucket).resolve(Util.getCheckSumFileName(file)));
4343
try (OutputStream bufferedOutputStream = byteSink.openBufferedStream()) {
44-
checksumGenerator.generate(checksum -> {
45-
try {
46-
checksum.writeDelimitedTo(bufferedOutputStream);
47-
} catch (IOException e) {
48-
throw new RuntimeException("Failed to generate checksum", e);
49-
}
50-
}, byteSource);
44+
checksumGenerator.generate(checksum ->
45+
checksum.writeDelimitedTo(bufferedOutputStream), byteSource);
5146
logger.log(Level.INFO,
5247
String.format("Finished generating check sum for: %s", file));
5348
}

dbsync/gcsync/src/main/java/com/google/edwmigration/dbsync/gcsync/Util.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.io.BufferedReader;
55
import java.io.IOException;
66
import java.io.InputStreamReader;
7+
import java.nio.file.Path;
78
import java.util.ArrayList;
89
import java.util.List;
910

@@ -22,6 +23,10 @@ public static List<String> getListOfFiles(ByteSource byteSource) throws IOExcept
2223
return files;
2324
}
2425

26+
public static Path getTemporaryCheckSumFilePath(Path file) {
27+
return file.resolveSibling(getCheckSumFileName(file.getFileName().toString()));
28+
}
29+
2530
public static String getCheckSumFileName(String fileName) {
2631
return String.format("%s.%s", fileName, Constants.CHECK_SUM_FILE_SUFFIX);
2732
}

dbsync/gcsync/src/test/java/com/google/edwmigration/dbsync/gcsync/GcsyncClientTest.java

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -165,31 +165,6 @@ public void tearDown() {
165165
// No special cleanup required; TemporaryFolder is auto-cleaned.
166166
}
167167

168-
@Test
169-
public void testSyncFiles_LargeFileMd5Mismatch_RunsJobAndDeletesJob()
170-
throws Exception {
171-
// Suppose the large file on GCS has an MD5 mismatch
172-
Blob mockBlob = mockGcsStorage.getBlob(TARGET_BUCKET, "large.txt");
173-
reset(mockBlob);
174-
when(mockBlob.getMd5()).thenReturn("some-other-md5");
175-
176-
try {
177-
clientUnderTest.syncFiles();
178-
} catch (Exception e) {
179-
// It's supposed to fail at instruction generation step
180-
}
181-
182-
List<Path> rsyncFiles = getPrivateList(clientUnderTest, "filesToRsync");
183-
assertTrue("large.txt should be in the rsync list", rsyncFiles.contains(largeFile.toPath()));
184-
185-
verify(mockJobsClient, times(1)).createJobAsync(any(CreateJobRequest.class));
186-
187-
verify(mockJobsClient.runJobOperationCallable(), times(1))
188-
.futureCall(any(RunJobRequest.class));
189-
190-
verify(mockJobsClient, times(1)).deleteJobAsync(any(JobName.class));
191-
}
192-
193168
@Test
194169
public void testSyncFiles_LargeFileMd5Matches_NoCloudRun() throws Exception {
195170
clientUnderTest.syncFiles();

0 commit comments

Comments
 (0)