Skip to content

Commit 2b9bf6b

Browse files
committed
Address comments #2
1 parent 0e60696 commit 2b9bf6b

File tree

15 files changed

+175
-118
lines changed

15 files changed

+175
-118
lines changed

dbsync/common/src/main/java/com/google/edwmigration/dbsync/common/InstructionReceiver.java

+4-7
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
package com.google.edwmigration.dbsync.common;
22

3+
import com.google.edwmigration.dbsync.common.storage.DbsyncByteSource;
34
import com.google.edwmigration.dbsync.proto.BlockLocation;
45
import com.google.edwmigration.dbsync.proto.Instruction;
56
import com.google.common.base.Preconditions;
6-
import com.google.common.io.ByteSource;
77
import com.google.protobuf.ByteString;
8-
import java.io.BufferedInputStream;
98
import java.io.Closeable;
10-
import java.io.EOFException;
119
import java.io.IOException;
12-
import java.io.InputStream;
1310
import java.io.OutputStream;
1411
import javax.annotation.CheckForSigned;
1512
import javax.annotation.WillCloseWhenClosed;
@@ -24,12 +21,12 @@ public class InstructionReceiver implements Closeable {
2421
private static final boolean DEBUG = false;
2522

2623
private final OutputStream out;
27-
private final ByteSource in;
24+
private final DbsyncByteSource in;
2825
@CheckForSigned
2926
private long copyStart = -1;
3027
private long copyLength = 0;
3128

32-
public InstructionReceiver(@WillCloseWhenClosed OutputStream out, ByteSource in) {
29+
public InstructionReceiver(@WillCloseWhenClosed OutputStream out, DbsyncByteSource in) {
3330
this.out = Preconditions.checkNotNull(out, "Output was null.");
3431
this.in = Preconditions.checkNotNull(in, "Input was null.");
3532
}
@@ -73,7 +70,7 @@ private void flushCopy() throws IOException {
7370
if (DEBUG) {
7471
LOG.info(String.format("Reuse bytes from %d for %d bytes", copyStart, copyLength));
7572
}
76-
in.slice(copyStart, copyLength).copyTo(out);
73+
in.copy(copyStart, copyLength, out);
7774

7875
// Clear pending copy.
7976
copyStart = -1;

dbsync/common/src/main/java/com/google/edwmigration/dbsync/common/storage/AbstractRemoteByteSource.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import javax.annotation.CheckReturnValue;
88
import org.checkerframework.checker.nullness.qual.Nullable;
99

10-
public abstract class AbstractRemoteByteSource extends ByteSource {
10+
public abstract class AbstractRemoteByteSource extends DbsyncByteSource {
1111

1212
private final Slice slice;
1313

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package com.google.edwmigration.dbsync.common.storage;
2+
3+
import com.google.common.io.ByteSource;
4+
import java.io.EOFException;
5+
import java.io.IOException;
6+
import java.io.InputStream;
7+
import java.io.OutputStream;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
/**
12+
* A {@link ByteSource} that provides an extra method for copying bytes used in Dbsync's
13+
* "InstructionReceiver". It provides an implementation based on holding an internal
14+
* {@link InputStream} and avoid reopening it unless copying a block priori than the current offset
15+
* is needed which should be rare in rsync. Subclasses can override the implementation.
16+
*/
17+
public abstract class DbsyncByteSource extends ByteSource {
18+
19+
private static final boolean DEBUG = false;
20+
21+
private static final Logger LOG = LoggerFactory.getLogger(DbsyncByteSource.class);
22+
23+
private long currentSourceOffset = 0;
24+
private InputStream sourceStream;
25+
26+
public long copy(long offset, long copyLength, OutputStream out) throws IOException {
27+
if (sourceStream == null) {
28+
sourceStream = newStream();
29+
}
30+
31+
if (offset < currentSourceOffset) {
32+
if (DEBUG) {
33+
LOG.info(String.format("Target offset %d smaller than current offset %d, reopen stream",
34+
offset, currentSourceOffset));
35+
}
36+
sourceStream.close();
37+
sourceStream = newStream();
38+
currentSourceOffset = 0;
39+
}
40+
41+
// Skip forward to the desired start.
42+
long toSkip = offset - currentSourceOffset;
43+
skip(toSkip);
44+
return copyBytes(copyLength, out);
45+
}
46+
47+
private void skip(long copyLength) throws IOException {
48+
long remaining = copyLength;
49+
while (remaining > 0) {
50+
long skipped = sourceStream.skip(remaining);
51+
if (skipped < remaining) {
52+
throw new EOFException("Unexpected end of stream while skipping.");
53+
}
54+
remaining -= skipped;
55+
currentSourceOffset += skipped;
56+
}
57+
}
58+
59+
private long copyBytes(long copyLength, OutputStream out) throws IOException {
60+
// Now copy exactly copyLength bytes.
61+
byte[] buffer = new byte[8192];
62+
long remaining = copyLength;
63+
while (remaining > 0) {
64+
int bytesToRead = (int) Math.min(buffer.length, remaining);
65+
int read = sourceStream.read(buffer, 0, bytesToRead);
66+
if (read == -1) {
67+
throw new EOFException("Unexpected end of stream while copying " + copyLength
68+
+ " bytes starting at offset " + currentSourceOffset);
69+
}
70+
out.write(buffer, 0, read);
71+
remaining -= read;
72+
currentSourceOffset += read;
73+
}
74+
75+
// As we always read until there's no remaining bytes or throw.
76+
return copyLength;
77+
}
78+
79+
public abstract InputStream newStream() throws IOException;
80+
}

dbsync/common/src/main/java/com/google/edwmigration/dbsync/jdbc/JdbcByteSource.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.google.edwmigration.dbsync.jdbc;
22

33
import com.google.common.base.Throwables;
4-
import com.google.common.io.ByteSource;
4+
import com.google.edwmigration.dbsync.common.storage.DbsyncByteSource;
55
import java.io.IOException;
66
import java.io.InputStream;
77
import java.io.OutputStream;
@@ -17,11 +17,13 @@
1717
import org.slf4j.Logger;
1818
import org.slf4j.LoggerFactory;
1919

20-
public class JdbcByteSource extends ByteSource {
20+
public class JdbcByteSource extends DbsyncByteSource {
21+
2122
@SuppressWarnings("unused")
2223
private static final Logger LOG = LoggerFactory.getLogger(JdbcByteSource.class);
2324

2425
private class JdbcThread extends Thread {
26+
2527
private final OutputStream out;
2628
private final AtomicReference<Throwable> throwable = new AtomicReference<>();
2729

@@ -76,4 +78,9 @@ public String toString() {
7678
thread.start();
7779
return in;
7880
}
81+
82+
@Override
83+
public InputStream newStream() throws IOException {
84+
return openStream();
85+
}
7986
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.google.edwmigration.dbsync.test;
2+
3+
import com.google.edwmigration.dbsync.common.storage.DbsyncByteSource;
4+
import java.io.ByteArrayInputStream;
5+
import java.io.InputStream;
6+
7+
public class DummyDbsyncBytesCopier extends DbsyncByteSource {
8+
9+
private byte[] bytes;
10+
11+
private DummyDbsyncBytesCopier(byte[] bytes) {
12+
this.bytes = bytes;
13+
}
14+
15+
@Override
16+
public InputStream openStream() {
17+
throw new UnsupportedOperationException("DummyDbsyncByteSource doesn't support this operation");
18+
}
19+
20+
public static DummyDbsyncBytesCopier wrap(byte[] bytes) {
21+
return new DummyDbsyncBytesCopier(bytes);
22+
}
23+
24+
@Override
25+
public InputStream newStream() {
26+
return new ByteArrayInputStream(bytes);
27+
}
28+
}

dbsync/common/src/testFixtures/java/com/google/edwmigration/dbsync/test/RsyncTestRunner.java

+17-9
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.slf4j.LoggerFactory;
2323

2424
public class RsyncTestRunner {
25+
2526
@SuppressWarnings("unused")
2627
private static final Logger LOG = LoggerFactory.getLogger(RsyncTestRunner.class);
2728

@@ -64,34 +65,40 @@ public boolean isFlag(Flag flag) {
6465
return flags.contains(flag);
6566
}
6667

67-
private List<Checksum> checksum() throws Exception{
68-
if (isFlag(Flag.PrintServerRaw))
68+
private List<Checksum> checksum() throws Exception {
69+
if (isFlag(Flag.PrintServerRaw)) {
6970
LOG.debug("Server (initial) data is " + Arrays.toString(serverData.read()));
71+
}
7072
ChecksumGenerator generator = new ChecksumGenerator(blockSize);
7173
List<Checksum> checksums = new ArrayList<>();
7274
generator.generate(checksums::add, serverData);
73-
if (isFlag(Flag.PrintChecksums))
75+
if (isFlag(Flag.PrintChecksums)) {
7476
LOG.debug("Server checksums are " + checksums);
77+
}
7578
return checksums;
7679
}
7780

78-
private List<Instruction> instruct(List<Checksum> checksums) throws Exception {
79-
if (isFlag(Flag.PrintClientRaw))
81+
private List<Instruction> instruct(List<Checksum> checksums) throws Exception {
82+
if (isFlag(Flag.PrintClientRaw)) {
8083
LOG.debug("Client data is " + Arrays.toString(clientData.read()));
84+
}
8185
List<Instruction> instructions = new ArrayList<>();
8286
InstructionGenerator matcher = new InstructionGenerator(blockSize);
8387
matcher.generate(instructions::add, clientData, checksums);
84-
if (isFlag(Flag.PrintInstructions))
88+
if (isFlag(Flag.PrintInstructions)) {
8589
LOG.debug("Client instructions are\n" + Joiner.on('\n').join(instructions));
90+
}
8691
int instructionSize = 0;
87-
for (Instruction instruction : instructions)
92+
for (Instruction instruction : instructions) {
8893
instructionSize += instruction.getSerializedSize();
94+
}
8995
LOG.debug("Client instructions size is " + instructionSize);
9096
return instructions;
9197
}
9298

9399
private void reconstruct(OutputStream out, List<Instruction> instructions) throws Exception {
94-
try (InstructionReceiver receiver = new InstructionReceiver(out, serverData)) {
100+
try (InstructionReceiver receiver = new InstructionReceiver(out,
101+
DummyDbsyncBytesCopier.wrap(serverData.read()))) {
95102
for (Instruction instruction : instructions) {
96103
// LOG.info("Instruction: " + instruction.toPrettyString());
97104
receiver.receive(instruction);
@@ -103,8 +110,9 @@ private byte[] reconstruct(List<Instruction> instructions) throws Exception {
103110
ByteArrayOutputStream out = new ByteArrayOutputStream();
104111
reconstruct(out, instructions);
105112
byte[] serverDataNew = out.toByteArray();
106-
if (isFlag(Flag.PrintServerNewRaw))
113+
if (isFlag(Flag.PrintServerNewRaw)) {
107114
LOG.debug("Server (result) data is " + Arrays.toString(serverDataNew));
115+
}
108116
LOG.debug("Server (result) data size is " + serverDataNew.length);
109117
assertArrayEquals(clientData.read(), serverDataNew, name + " got mismatched data.");
110118
return serverDataNew;

dbsync/gcsync/src/main/java/com/google/edwmigration/dbsync/gcsync/GcsyncMain.java

-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212

1313
public class GcsyncMain {
1414

15-
// --source_directory /usr/local/google/home/tonyxiaowei/tony-work
1615
public static void main(String[] args) throws IOException {
1716
Arguments arguments = new Arguments(args);
1817
String project = arguments.getOptions().valueOf(arguments.projectOptionSpec);

dbsync/gcsync/src/main/java/com/google/edwmigration/dbsync/gcsync/ReconstructFilesMain.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
import static com.google.edwmigration.dbsync.gcsync.Util.getTempFileName;
77

88
import com.google.common.io.ByteSink;
9-
import com.google.common.io.ByteSource;
109
import com.google.edwmigration.dbsync.common.DefaultArguments;
1110
import com.google.edwmigration.dbsync.common.InstructionReceiver;
11+
import com.google.edwmigration.dbsync.common.storage.DbsyncByteSource;
1212
import com.google.edwmigration.dbsync.proto.Instruction;
1313
import com.google.edwmigration.dbsync.storage.gcs.GcsStorage;
1414
import java.io.InputStream;
@@ -39,7 +39,7 @@ public static void main(String[] args) throws Exception {
3939
try (InputStream instructionsSource = gcsStorage.newByteSource(
4040
new URI(tmpBucket).resolve(getInstructionFileName(file))).openBufferedStream()) {
4141
URI sourceFile = new URI(targetBucket).resolve(file);
42-
ByteSource baseFileSource = gcsStorage.newByteSource(sourceFile);
42+
DbsyncByteSource baseFileSource = gcsStorage.newByteSource(sourceFile);
4343

4444
// Create a new file as a temp file and then swap it
4545
URI tempFile = new URI(targetBucket).resolve(getTempFileName(file));

dbsync/gcsync/src/test/java/com/google/edwmigration/dbsync/gcsync/GcsyncClientTest.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88
import static org.mockito.Mockito.doNothing;
99
import static org.mockito.Mockito.mock;
1010
import static org.mockito.Mockito.never;
11-
import static org.mockito.Mockito.reset;
12-
import static org.mockito.Mockito.times;
1311
import static org.mockito.Mockito.verify;
1412
import static org.mockito.Mockito.when;
1513

@@ -23,9 +21,9 @@
2321
import com.google.cloud.run.v2.RunJobRequest;
2422
import com.google.cloud.storage.Blob;
2523
import com.google.common.io.ByteSink;
26-
import com.google.common.io.ByteSource;
2724
import com.google.common.io.Files;
2825
import com.google.edwmigration.dbsync.common.InstructionGenerator;
26+
import com.google.edwmigration.dbsync.common.storage.DbsyncByteSource;
2927
import com.google.edwmigration.dbsync.storage.gcs.GcsStorage;
3028
import java.io.BufferedOutputStream;
3129
import java.io.File;
@@ -106,7 +104,7 @@ public void setUp() throws Exception {
106104
String localMd5 = generateMd5(largeFile);
107105
when(mockBlob.getMd5()).thenReturn(localMd5);
108106

109-
when(mockGcsStorage.newByteSource(any())).thenReturn(mock(ByteSource.class));
107+
when(mockGcsStorage.newByteSource(any())).thenReturn(mock(DbsyncByteSource.class));
110108
ByteSink mockByteSink = mock(ByteSink.class);
111109
when(mockGcsStorage.newByteSink(any())).thenReturn(mockByteSink);
112110
when(mockByteSink.openBufferedStream()).thenReturn(mock(BufferedOutputStream.class));

dbsync/server/src/main/java/com/google/edwmigration/dbsync/server/GCSTarget.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
import com.google.common.io.ByteSink;
44
import com.google.common.io.ByteSource;
5+
import com.google.edwmigration.dbsync.common.storage.DbsyncByteSource;
56
import com.google.edwmigration.dbsync.storage.gcs.GcsStorage;
67
import java.net.URI;
7-
import org.checkerframework.checker.units.qual.C;
88

99
public class GCSTarget implements RsyncTarget {
1010

@@ -42,7 +42,7 @@ public ByteSink getStagingByteSink() {
4242
}
4343

4444
@Override
45-
public ByteSource getTargetByteSource() {
45+
public DbsyncByteSource getTargetByteSource() {
4646
return storage.newByteSource(targetUri);
4747
}
4848

dbsync/server/src/main/java/com/google/edwmigration/dbsync/server/RsyncServer.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.edwmigration.dbsync.common.ChecksumGenerator;
44
import com.google.edwmigration.dbsync.common.InstructionReceiver;
5+
import com.google.edwmigration.dbsync.common.storage.DbsyncByteSource;
56
import com.google.edwmigration.dbsync.proto.Instruction;
67
import java.io.IOException;
78
import java.io.InputStream;
@@ -18,7 +19,7 @@ public RsyncServer(RsyncTarget target) {
1819
}
1920

2021
public void generate() throws IOException{
21-
ByteSource source = target.getTargetByteSource();
22+
DbsyncByteSource source = target.getTargetByteSource();
2223
ByteSink checksumSink = target.getChecksumByteSink();
2324
try (OutputStream checksumStream = checksumSink.openBufferedStream()) {
2425
ChecksumGenerator generator = new ChecksumGenerator(CHECKSUM_BLOCK_SIZE);
@@ -35,7 +36,7 @@ public void generate() throws IOException{
3536

3637
public void reconstruct() throws IOException {
3738
ByteSource instructionSource = target.getInstructionsByteSource();
38-
ByteSource baseData = target.getTargetByteSource();
39+
DbsyncByteSource baseData = target.getTargetByteSource();
3940
ByteSink stagingsink = target.getStagingByteSink();
4041

4142
try (OutputStream targetStream = stagingsink.openBufferedStream();

dbsync/server/src/main/java/com/google/edwmigration/dbsync/server/RsyncTarget.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22

33
import com.google.common.io.ByteSink;
44
import com.google.common.io.ByteSource;
5+
import com.google.edwmigration.dbsync.common.storage.DbsyncByteSource;
56

67
public interface RsyncTarget {
78
ByteSource getChecksumByteSource();
89
ByteSink getChecksumByteSink();
910
ByteSource getInstructionsByteSource();
1011
ByteSink getInstructionsByteSink();
11-
ByteSource getTargetByteSource();
12+
DbsyncByteSource getTargetByteSource();
1213
ByteSink getStagingByteSink();
1314
}

0 commit comments

Comments
 (0)