Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import com.google.common.primitives.Ints;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.function.Consumer;
import org.checkerframework.checker.index.qual.NonNegative;
import org.slf4j.Logger;
Expand Down Expand Up @@ -72,7 +70,7 @@ public void generate(Consumer<Checksum> out, ByteSource in) throws IOException {
.setBlockOffset(offset)
.setBlockLength(blockSize)
.setWeakChecksum(weakHashCode)
.setStrongChecksum(strongHashCode.asLong())
.setStrongChecksum(strongHashCode.toString())
.build();
out.accept(c);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void generate(Consumer<? super Instruction> out, ByteSource in,
if (cc != null) {
HashCode strongHashCode = rollingChecksum.getStrongHashCode();
for (Checksum c : cc) {
if (c.getStrongChecksum() == strongHashCode.asLong()) {
if (c.getStrongChecksum().equals(strongHashCode.toString())) {
if (literalBufferLength > 0) {
if (DEBUG) {
LOG.debug("Emitting pre-match literal");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
import com.google.common.base.Preconditions;
import com.google.common.io.ByteSource;
import com.google.protobuf.ByteString;
import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import javax.annotation.CheckForSigned;
import javax.annotation.WillCloseWhenClosed;
Expand All @@ -18,24 +21,22 @@ public class InstructionReceiver implements Closeable {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(InstructionReceiver.class);

private static final boolean DEBUG = false;

private final OutputStream out;
private final ByteSource in;
private InputStream sourceStream;
private long currentSourceOffset = 0;
@CheckForSigned
private long copyStart = -1;
private long copyLength = 0;

public InstructionReceiver(@WillCloseWhenClosed OutputStream out, ByteSource in) {
public InstructionReceiver(@WillCloseWhenClosed OutputStream out, ByteSource in)
throws IOException {
this.out = Preconditions.checkNotNull(out, "Output was null.");
this.in = Preconditions.checkNotNull(in, "Input was null.");
}

private void flushCopy() throws IOException {
if (copyStart != -1) {
in.slice(copyStart, copyLength).copyTo(out);
// These two assignments aren't always required, but it's nicer to have them here than belowl
copyStart = -1;
copyLength = 0;
}
this.sourceStream = in.openStream();
}

public void receive(Instruction instruction) throws IOException {
Expand Down Expand Up @@ -67,6 +68,64 @@ public void close() throws IOException {
flushCopy();
} finally {
out.close();
sourceStream.close();
}
}

private void flushCopy() throws IOException {
if (copyStart == -1) {
return;
}
LOG.info(String.format("Reuse bytes from %d for %d bytes", copyStart, copyLength));

// If the requested block is before our current position,
// we must re-open the stream from the beginning.
if (copyStart < currentSourceOffset) {
if (DEBUG) {
LOG.info(String.format("Target offset %d smaller than current offset %d, reopen stream",
copyStart, currentSourceOffset));
}
sourceStream.close();
sourceStream = in.openStream();
currentSourceOffset = 0;
}
// Skip forward to the desired start.
long toSkip = copyStart - currentSourceOffset;
skip(toSkip);
copy(copyLength);

// Clear pending copy.
copyStart = -1;
copyLength = 0;
}

private void copy(long copyLength) throws IOException {
// Now copy exactly copyLength bytes.
byte[] buffer = new byte[8192];
long remaining = copyLength;
while (remaining > 0) {
int bytesToRead = (int) Math.min(buffer.length, remaining);
int read = sourceStream.read(buffer, 0, bytesToRead);
if (read == -1) {
throw new EOFException("Unexpected end of stream while copying " + copyLength
+ " bytes starting at offset " + copyStart);
}
out.write(buffer, 0, read);
remaining -= read;
currentSourceOffset += read;
}
}
}

private void skip(long length) throws IOException {
long remaining = length;
while (remaining > 0) {
long skipped = sourceStream.skip(remaining);
if (skipped < remaining) {
throw new EOFException("Unexpected end of stream while skipping.");
}
remaining -= skipped;
currentSourceOffset += skipped;
}
}

}
2 changes: 1 addition & 1 deletion dbsync/common/src/main/proto/dbsync.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ message Checksum {
uint64 blockOffset = 1;
uint32 blockLength = 2;
int32 weakChecksum = 3;
int64 strongChecksum = 4;
string strongChecksum = 4;
}

message Instruction {
Expand Down
47 changes: 47 additions & 0 deletions dbsync/gcsync/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* This file was generated by the Gradle 'init' task.
*/

plugins {
alias libs.plugins.shadow
id 'dbsync.java-application-conventions'
}

repositories {
// Use Maven Central for resolving dependencies.
mavenCentral()
}


configurations {
serverClasspath {
canBeConsumed = false
}
}

dependencies {
implementation project(':dbsync:common')
implementation project(':dbsync:storage-aws')
implementation project(':dbsync:storage-gcs')
implementation libs.jopt.simple
implementation libs.jdiagnostics
implementation libs.commons.lang3
implementation libs.google.cloud.run

// Test dependencies
testImplementation 'junit:junit:4.13.2'
testImplementation 'org.mockito:mockito-core:4.11.0'
testImplementation 'org.mockito:mockito-inline:4.11.0'
}

application {
mainClass = 'com.google.edwmigration.dbsync.gcsync.GcsyncMain'
}

shadowJar {
mergeServiceFiles()
}

test {
useJUnit()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.google.edwmigration.dbsync.gcsync;

public class Constants {

public static final String FILES_TO_RSYNC_FILE_NAME = "filesToRsync.txt";

public static final String JAR_FILE_NAME = "gcsync-all.jar";

public static final String CHECK_SUM_FILE_SUFFIX = "checksum";

public static final String INSTRUCTION_FILE_SUFFIX = "instruction";

public static final String TMP_FILE_SUFFIX = "updated";

public static final int BLOCK_SIZE = 4096;

// 10 MiB
public static final long RSYNC_SIZE_THRESHOLD = 10 * 1024 * 1024;

public static final String GENERATE_CHECK_SUM_MAIN = GenerateCheckSumMain.class.getName();

public static final String RECONSTRUCT_FILE_MAIN = ReconstructFilesMain.class.getName();
}
Loading