Skip to content

Commit f16cf4c

Browse files
authored
Initial commit for gcsync command line tool (#796)
* Initial push for gcsync command line tool * Remove unintentional changes * Some updates * Address pr comments * Reimplement gcsbytesource with a cached inputstream * remove unnecessary lines in build.gradle * fix a typo * Remember to close the outputstream * The correct change for last committ * Fix checksumgeneratortest * These lines shouldn't be commented out * Address some comments
1 parent 6383077 commit f16cf4c

File tree

16 files changed

+1017
-62
lines changed

16 files changed

+1017
-62
lines changed

dbsync/common/src/main/java/com/google/edwmigration/dbsync/common/ChecksumGenerator.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
import com.google.common.primitives.Ints;
88
import java.io.IOException;
99
import java.io.InputStream;
10-
import java.net.URI;
11-
import java.net.URISyntaxException;
1210
import java.util.function.Consumer;
1311
import org.checkerframework.checker.index.qual.NonNegative;
1412
import org.slf4j.Logger;
@@ -30,7 +28,7 @@ public ChecksumGenerator(@NonNegative int blockSize) {
3028
return blockSize;
3129
}
3230

33-
public void generate(Consumer<Checksum> out, ByteSource in) throws IOException {
31+
public void generate(ChecksumConsumer<IOException> out, ByteSource in) throws IOException {
3432
// This deliberately throws if size is not Present.
3533
Optional<Long> dataSizeOptional = in.sizeIfKnown();
3634
if (!dataSizeOptional.isPresent()) {
@@ -72,10 +70,15 @@ public void generate(Consumer<Checksum> out, ByteSource in) throws IOException {
7270
.setBlockOffset(offset)
7371
.setBlockLength(blockSize)
7472
.setWeakChecksum(weakHashCode)
75-
.setStrongChecksum(strongHashCode.asLong())
73+
.setStrongChecksum(strongHashCode.toString())
7674
.build();
7775
out.accept(c);
7876
}
7977
}
8078
}
79+
80+
public interface ChecksumConsumer<X extends Exception> {
81+
82+
void accept(Checksum checksum) throws X;
83+
}
8184
}

dbsync/common/src/main/java/com/google/edwmigration/dbsync/common/InstructionGenerator.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ private static Instruction newLiteralInstruction(byte[] literalBuffer, int liter
3838
.build();
3939
}
4040

41-
public void generate(Consumer<? super Instruction> out, ByteSource in,
41+
public void generate(InstructionConsumer<? extends IOException> out, ByteSource in,
4242
List<? extends Checksum> checksums) throws IOException {
4343
Int2ObjectMap<Collection<Checksum>> checksumMap = new Int2ObjectOpenHashMap<>(checksums.size());
4444
for (Checksum c : checksums) {
@@ -74,9 +74,9 @@ public void generate(Consumer<? super Instruction> out, ByteSource in,
7474
// And the wisdom to know the difference.
7575
MATCH:
7676
if (cc != null) {
77-
HashCode strongHashCode = rollingChecksum.getStrongHashCode();
77+
String strongHashCode = rollingChecksum.getStrongHashCode().toString();
7878
for (Checksum c : cc) {
79-
if (c.getStrongChecksum() == strongHashCode.asLong()) {
79+
if (c.getStrongChecksum().equals(strongHashCode)) {
8080
if (literalBufferLength > 0) {
8181
if (DEBUG) {
8282
logger.debug("Emitting pre-match literal");
@@ -86,8 +86,8 @@ public void generate(Consumer<? super Instruction> out, ByteSource in,
8686
}
8787
Instruction insn = Instruction.newBuilder()
8888
.setBlockLocation(BlockLocation.newBuilder()
89-
.setBlockOffset(c.getBlockOffset())
90-
.setBlockLength(c.getBlockLength()))
89+
.setBlockOffset(c.getBlockOffset())
90+
.setBlockLength(c.getBlockLength()))
9191
.build();
9292
out.accept(insn);
9393
continue STREAM;
@@ -128,4 +128,13 @@ public void generate(Consumer<? super Instruction> out, ByteSource in,
128128
}
129129
}
130130
}
131+
132+
public interface InstructionConsumer<X extends Exception> {
133+
134+
/**
135+
* Accepts an instruction from the generator
136+
*/
137+
void accept(Instruction instruction) throws X;
138+
}
139+
131140
}

dbsync/common/src/main/java/com/google/edwmigration/dbsync/common/InstructionReceiver.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55
import com.google.common.base.Preconditions;
66
import com.google.common.io.ByteSource;
77
import com.google.protobuf.ByteString;
8+
import java.io.BufferedInputStream;
89
import java.io.Closeable;
10+
import java.io.EOFException;
911
import java.io.IOException;
12+
import java.io.InputStream;
1013
import java.io.OutputStream;
1114
import javax.annotation.CheckForSigned;
1215
import javax.annotation.WillCloseWhenClosed;
@@ -15,9 +18,10 @@
1518

1619
public class InstructionReceiver implements Closeable {
1720

18-
@SuppressWarnings("unused")
1921
private static final Logger logger = LoggerFactory.getLogger(InstructionReceiver.class);
2022

23+
private static final boolean DEBUG = false;
24+
2125
private final OutputStream out;
2226
private final ByteSource in;
2327
@CheckForSigned
@@ -30,12 +34,16 @@ public InstructionReceiver(@WillCloseWhenClosed OutputStream out, ByteSource in)
3034
}
3135

3236
private void flushCopy() throws IOException {
33-
if (copyStart != -1) {
34-
in.slice(copyStart, copyLength).copyTo(out);
35-
// These two assignments aren't always required, but it's nicer to have them here than belowl
36-
copyStart = -1;
37-
copyLength = 0;
37+
if (copyStart == -1) {
38+
return;
39+
}
40+
in.slice(copyStart, copyLength).copyTo(out);
41+
// These two assignments aren't always required, but it's nicer to have them here than below
42+
if (DEBUG) {
43+
logger.info(String.format("Reuse bytes from %d for %d bytes", copyStart, copyLength));
3844
}
45+
copyStart = -1;
46+
copyLength = 0;
3947
}
4048

4149
public void receive(Instruction instruction) throws IOException {
@@ -69,4 +77,4 @@ public void close() throws IOException {
6977
out.close();
7078
}
7179
}
72-
}
80+
}

dbsync/common/src/main/proto/dbsync.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ message Checksum {
1313
uint64 blockOffset = 1;
1414
uint32 blockLength = 2;
1515
int32 weakChecksum = 3;
16-
int64 strongChecksum = 4;
16+
string strongChecksum = 4;
1717
}
1818

1919
message Instruction {

dbsync/common/src/test/java/com/google/edwmigration/dbsync/common/ChecksumGeneratorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void testChecksumGenerator() throws Exception {
4343
assertEquals(i * blockSize, c.getBlockOffset(),"Bad offset in " + c);
4444
ByteSource block = data.slice(c.getBlockOffset(), c.getBlockLength());
4545
HashCode strongHashCode = block.hash(RollingChecksumImpl.STRONG_HASH_FUNCTION);
46-
assertEquals(strongHashCode.asLong(), c.getStrongChecksum(), "Bad hash code in " + c);
46+
assertEquals(strongHashCode.toString(), c.getStrongChecksum(), "Bad hash code in " + c);
4747
}
4848
}
4949
}

dbsync/gcsync/build.gradle

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* This file was generated by the Gradle 'init' task.
3+
*/
4+
5+
plugins {
6+
alias libs.plugins.shadow
7+
id 'dbsync.java-application-conventions'
8+
}
9+
10+
dependencies {
11+
implementation project(':dbsync:common')
12+
implementation project(':dbsync:storage-aws')
13+
implementation project(':dbsync:storage-gcs')
14+
implementation libs.jopt.simple
15+
implementation libs.jdiagnostics
16+
implementation libs.commons.lang3
17+
implementation libs.google.cloud.run
18+
19+
// Test dependencies
20+
testImplementation 'junit:junit:4.13.2'
21+
testImplementation 'org.mockito:mockito-core:4.11.0'
22+
testImplementation 'org.mockito:mockito-inline:4.11.0'
23+
}
24+
25+
application {
26+
mainClass = 'com.google.edwmigration.dbsync.gcsync.GcsyncMain'
27+
}
28+
29+
shadowJar {
30+
mergeServiceFiles()
31+
}
32+
33+
test {
34+
useJUnit()
35+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.google.edwmigration.dbsync.gcsync;
2+
3+
public class Constants {
4+
5+
public static final String FILES_TO_RSYNC_FILE_NAME = "filesToRsync.txt";
6+
7+
public static final String JAR_FILE_NAME = "gcsync-all.jar";
8+
9+
public static final String CHECK_SUM_FILE_SUFFIX = "checksum";
10+
11+
public static final String INSTRUCTION_FILE_SUFFIX = "instruction";
12+
13+
public static final String TMP_FILE_SUFFIX = "updated";
14+
15+
public static final int BLOCK_SIZE = 4096;
16+
17+
// 10 MiB
18+
public static final long RSYNC_SIZE_THRESHOLD = 10 * 1024 * 1024;
19+
20+
public static final String GENERATE_CHECK_SUM_MAIN = GenerateCheckSumMain.class.getName();
21+
22+
public static final String RECONSTRUCT_FILE_MAIN = ReconstructFilesMain.class.getName();
23+
}

0 commit comments

Comments
 (0)