Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import com.google.common.primitives.Ints;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.function.Consumer;
import org.checkerframework.checker.index.qual.NonNegative;
import org.slf4j.Logger;
Expand All @@ -30,7 +28,7 @@ public ChecksumGenerator(@NonNegative int blockSize) {
return blockSize;
}

public void generate(Consumer<Checksum> out, ByteSource in) throws IOException {
public void generate(ChecksumConsumer<IOException> out, ByteSource in) throws IOException {
// This deliberately throws if size is not Present.
Optional<Long> dataSizeOptional = in.sizeIfKnown();
if (!dataSizeOptional.isPresent()) {
Expand Down Expand Up @@ -72,10 +70,15 @@ public void generate(Consumer<Checksum> out, ByteSource in) throws IOException {
.setBlockOffset(offset)
.setBlockLength(blockSize)
.setWeakChecksum(weakHashCode)
.setStrongChecksum(strongHashCode.asLong())
.setStrongChecksum(strongHashCode.toString())
.build();
out.accept(c);
}
}
}

public interface ChecksumConsumer<X extends Exception> {

void accept(Checksum checksum) throws X;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private static Instruction newLiteralInstruction(byte[] literalBuffer, int liter
.build();
}

public void generate(Consumer<? super Instruction> out, ByteSource in,
public void generate(InstructionConsumer<? extends IOException> out, ByteSource in,
List<? extends Checksum> checksums) throws IOException {
Int2ObjectMap<Collection<Checksum>> checksumMap = new Int2ObjectOpenHashMap<>(checksums.size());
for (Checksum c : checksums) {
Expand Down Expand Up @@ -74,9 +74,9 @@ public void generate(Consumer<? super Instruction> out, ByteSource in,
// And the wisdom to know the difference.
MATCH:
if (cc != null) {
HashCode strongHashCode = rollingChecksum.getStrongHashCode();
String strongHashCode = rollingChecksum.getStrongHashCode().toString();
for (Checksum c : cc) {
if (c.getStrongChecksum() == strongHashCode.asLong()) {
if (c.getStrongChecksum().equals(strongHashCode)) {
if (literalBufferLength > 0) {
if (DEBUG) {
logger.debug("Emitting pre-match literal");
Expand All @@ -86,8 +86,8 @@ public void generate(Consumer<? super Instruction> out, ByteSource in,
}
Instruction insn = Instruction.newBuilder()
.setBlockLocation(BlockLocation.newBuilder()
.setBlockOffset(c.getBlockOffset())
.setBlockLength(c.getBlockLength()))
.setBlockOffset(c.getBlockOffset())
.setBlockLength(c.getBlockLength()))
.build();
out.accept(insn);
continue STREAM;
Expand Down Expand Up @@ -128,4 +128,13 @@ public void generate(Consumer<? super Instruction> out, ByteSource in,
}
}
}

public interface InstructionConsumer<X extends Exception> {

/**
* Accepts an instruction from the generator
*/
void accept(Instruction instruction) throws X;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
import com.google.common.base.Preconditions;
import com.google.common.io.ByteSource;
import com.google.protobuf.ByteString;
import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import javax.annotation.CheckForSigned;
import javax.annotation.WillCloseWhenClosed;
Expand All @@ -15,9 +18,10 @@

public class InstructionReceiver implements Closeable {

@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(InstructionReceiver.class);

private static final boolean DEBUG = false;

private final OutputStream out;
private final ByteSource in;
@CheckForSigned
Expand All @@ -30,12 +34,16 @@ public InstructionReceiver(@WillCloseWhenClosed OutputStream out, ByteSource in)
}

private void flushCopy() throws IOException {
if (copyStart != -1) {
in.slice(copyStart, copyLength).copyTo(out);
// These two assignments aren't always required, but it's nicer to have them here than belowl
copyStart = -1;
copyLength = 0;
if (copyStart == -1) {
return;
}
in.slice(copyStart, copyLength).copyTo(out);
// These two assignments aren't always required, but it's nicer to have them here than below
if (DEBUG) {
logger.info(String.format("Reuse bytes from %d for %d bytes", copyStart, copyLength));
}
copyStart = -1;
copyLength = 0;
}

public void receive(Instruction instruction) throws IOException {
Expand Down Expand Up @@ -69,4 +77,4 @@ public void close() throws IOException {
out.close();
}
}
}
}
2 changes: 1 addition & 1 deletion dbsync/common/src/main/proto/dbsync.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ message Checksum {
uint64 blockOffset = 1;
uint32 blockLength = 2;
int32 weakChecksum = 3;
int64 strongChecksum = 4;
string strongChecksum = 4;
}

message Instruction {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void testChecksumGenerator() throws Exception {
assertEquals(i * blockSize, c.getBlockOffset(),"Bad offset in " + c);
ByteSource block = data.slice(c.getBlockOffset(), c.getBlockLength());
HashCode strongHashCode = block.hash(RollingChecksumImpl.STRONG_HASH_FUNCTION);
assertEquals(strongHashCode.asLong(), c.getStrongChecksum(), "Bad hash code in " + c);
assertEquals(strongHashCode.toString(), c.getStrongChecksum(), "Bad hash code in " + c);
}
}
}
35 changes: 35 additions & 0 deletions dbsync/gcsync/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* This file was generated by the Gradle 'init' task.
*/

plugins {
alias libs.plugins.shadow
id 'dbsync.java-application-conventions'
}

dependencies {
implementation project(':dbsync:common')
implementation project(':dbsync:storage-aws')
implementation project(':dbsync:storage-gcs')
implementation libs.jopt.simple
implementation libs.jdiagnostics
implementation libs.commons.lang3
implementation libs.google.cloud.run

// Test dependencies
testImplementation 'junit:junit:4.13.2'
testImplementation 'org.mockito:mockito-core:4.11.0'
testImplementation 'org.mockito:mockito-inline:4.11.0'
}

application {
mainClass = 'com.google.edwmigration.dbsync.gcsync.GcsyncMain'
}

shadowJar {
mergeServiceFiles()
}

test {
useJUnit()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.google.edwmigration.dbsync.gcsync;

public class Constants {

public static final String FILES_TO_RSYNC_FILE_NAME = "filesToRsync.txt";

public static final String JAR_FILE_NAME = "gcsync-all.jar";

public static final String CHECK_SUM_FILE_SUFFIX = "checksum";

public static final String INSTRUCTION_FILE_SUFFIX = "instruction";

public static final String TMP_FILE_SUFFIX = "updated";

public static final int BLOCK_SIZE = 4096;

// 10 MiB
public static final long RSYNC_SIZE_THRESHOLD = 10 * 1024 * 1024;

public static final String GENERATE_CHECK_SUM_MAIN = GenerateCheckSumMain.class.getName();

public static final String RECONSTRUCT_FILE_MAIN = ReconstructFilesMain.class.getName();
}
Loading