Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial commit for gcsync command line tool #796

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import com.google.common.primitives.Ints;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.function.Consumer;
import org.checkerframework.checker.index.qual.NonNegative;
import org.slf4j.Logger;
Expand All @@ -30,7 +28,7 @@ public ChecksumGenerator(@NonNegative int blockSize) {
return blockSize;
}

public void generate(Consumer<Checksum> out, ByteSource in) throws IOException {
public void generate(ChecksumConsumer<IOException> out, ByteSource in) throws IOException {
// This deliberately throws if size is not Present.
Optional<Long> dataSizeOptional = in.sizeIfKnown();
if (!dataSizeOptional.isPresent()) {
Expand Down Expand Up @@ -72,10 +70,15 @@ public void generate(Consumer<Checksum> out, ByteSource in) throws IOException {
.setBlockOffset(offset)
.setBlockLength(blockSize)
.setWeakChecksum(weakHashCode)
.setStrongChecksum(strongHashCode.asLong())
.setStrongChecksum(strongHashCode.toString())
.build();
out.accept(c);
}
}
}

public interface ChecksumConsumer<X extends Exception> {

void accept(Checksum checksum) throws X;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private static Instruction newLiteralInstruction(byte[] literalBuffer, int liter
.build();
}

public void generate(Consumer<? super Instruction> out, ByteSource in,
public void generate(InstructionConsumer<? extends IOException> out, ByteSource in,
List<? extends Checksum> checksums) throws IOException {
Int2ObjectMap<Collection<Checksum>> checksumMap = new Int2ObjectOpenHashMap<>(checksums.size());
for (Checksum c : checksums) {
Expand Down Expand Up @@ -74,9 +74,9 @@ public void generate(Consumer<? super Instruction> out, ByteSource in,
// And the wisdom to know the difference.
MATCH:
if (cc != null) {
HashCode strongHashCode = rollingChecksum.getStrongHashCode();
String strongHashCode = rollingChecksum.getStrongHashCode().toString();
for (Checksum c : cc) {
if (c.getStrongChecksum() == strongHashCode.asLong()) {
if (c.getStrongChecksum().equals(strongHashCode)) {
if (literalBufferLength > 0) {
if (DEBUG) {
logger.debug("Emitting pre-match literal");
Expand All @@ -86,8 +86,8 @@ public void generate(Consumer<? super Instruction> out, ByteSource in,
}
Instruction insn = Instruction.newBuilder()
.setBlockLocation(BlockLocation.newBuilder()
.setBlockOffset(c.getBlockOffset())
.setBlockLength(c.getBlockLength()))
.setBlockOffset(c.getBlockOffset())
.setBlockLength(c.getBlockLength()))
.build();
out.accept(insn);
continue STREAM;
Expand Down Expand Up @@ -128,4 +128,13 @@ public void generate(Consumer<? super Instruction> out, ByteSource in,
}
}
}

public interface InstructionConsumer<X extends Exception> {

/**
* Accepts an instruction from the generator
*/
void accept(Instruction instruction) throws X;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
import com.google.common.base.Preconditions;
import com.google.common.io.ByteSource;
import com.google.protobuf.ByteString;
import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import javax.annotation.CheckForSigned;
import javax.annotation.WillCloseWhenClosed;
Expand All @@ -15,9 +18,10 @@

public class InstructionReceiver implements Closeable {

@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(InstructionReceiver.class);

private static final boolean DEBUG = false;

private final OutputStream out;
private final ByteSource in;
@CheckForSigned
Expand All @@ -29,15 +33,6 @@ public InstructionReceiver(@WillCloseWhenClosed OutputStream out, ByteSource in)
this.in = Preconditions.checkNotNull(in, "Input was null.");
}

private void flushCopy() throws IOException {
if (copyStart != -1) {
in.slice(copyStart, copyLength).copyTo(out);
// These two assignments aren't always required, but it's nicer to have them here than belowl
copyStart = -1;
copyLength = 0;
}
}

public void receive(Instruction instruction) throws IOException {
switch (instruction.getBodyCase()) {
case BLOCKLOCATION:
Expand Down Expand Up @@ -69,4 +64,19 @@ public void close() throws IOException {
out.close();
}
}
}

private void flushCopy() throws IOException {
if (copyStart == -1) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does the value -1 mean for copyStart?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It means copyStart is unset

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document this, e.g.

...
// the start index for copy() if >= 0, or -1 if it is unset
@CheckForSigned
private int copyStart = -1
...

return;
}
if (DEBUG) {
logger.info(String.format("Reuse bytes from %d for %d bytes", copyStart, copyLength));
}
in.slice(copyStart, copyLength).copyTo(out);

// Clear pending copy.
copyStart = -1;
copyLength = 0;
}

}
2 changes: 1 addition & 1 deletion dbsync/common/src/main/proto/dbsync.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ message Checksum {
uint64 blockOffset = 1;
uint32 blockLength = 2;
int32 weakChecksum = 3;
int64 strongChecksum = 4;
string strongChecksum = 4;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be safer as bytes - although bytes are really expensive in protobuf, so it might as well be string.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can leave it as string.

}

message Instruction {
Expand Down
35 changes: 35 additions & 0 deletions dbsync/gcsync/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* This file was generated by the Gradle 'init' task.
*/

plugins {
alias libs.plugins.shadow
id 'dbsync.java-application-conventions'
}

dependencies {
implementation project(':dbsync:common')
implementation project(':dbsync:storage-aws')
implementation project(':dbsync:storage-gcs')
implementation libs.jopt.simple
implementation libs.jdiagnostics
implementation libs.commons.lang3
implementation libs.google.cloud.run

// Test dependencies
testImplementation 'junit:junit:4.13.2'
testImplementation 'org.mockito:mockito-core:4.11.0'
testImplementation 'org.mockito:mockito-inline:4.11.0'
}

application {
mainClass = 'com.google.edwmigration.dbsync.gcsync.GcsyncMain'
}

shadowJar {
mergeServiceFiles()
}

test {
useJUnit()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.google.edwmigration.dbsync.gcsync;

public class Constants {

public static final String FILES_TO_RSYNC_FILE_NAME = "filesToRsync.txt";

public static final String JAR_FILE_NAME = "gcsync-all.jar";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this and other constants which are only used in one file, I'd move it to that file.


public static final String CHECK_SUM_FILE_SUFFIX = "checksum";

public static final String INSTRUCTION_FILE_SUFFIX = "instruction";

public static final String TMP_FILE_SUFFIX = "updated";
Copy link
Collaborator

@misolt misolt Mar 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd consider adding an enum instead of the *_FILE_SUFFIX constants.


public static final int BLOCK_SIZE = 4096;

// 10 MiB
public static final long RSYNC_SIZE_THRESHOLD = 10 * 1024 * 1024;

public static final String GENERATE_CHECK_SUM_MAIN = GenerateCheckSumMain.class.getName();

public static final String RECONSTRUCT_FILE_MAIN = ReconstructFilesMain.class.getName();
}
Loading