
DLP Inspect initial commit #48

Open · wants to merge 9 commits into base: file_tracker
2 changes: 1 addition & 1 deletion build.gradle
@@ -79,7 +79,7 @@ dependencies {
compile group: 'org.apache.beam', name: 'beam-runners-google-cloud-dataflow-java', version: dataflowBeamVersion
compile group: 'org.apache.beam', name: 'beam-runners-direct-java', version: dataflowBeamVersion
compile group: 'org.slf4j', name: 'slf4j-jdk14', version: '1.7.5'
compile group: 'com.google.cloud', name: 'google-cloud-dlp', version: '1.1.2'
compile group: 'com.google.cloud', name: 'google-cloud-dlp', version: '1.1.4'
compile group: 'com.google.apis', name: 'google-api-services-cloudkms', version: 'v1-rev108-1.25.0'
compile group: 'org.apache.beam', name: 'beam-sdks-java-io-amazon-web-services', version: dataflowBeamVersion
compile "com.google.auto.value:auto-value-annotations:1.6.2"
DLPS3ScannerPipeline.java
@@ -46,18 +46,23 @@ public static void main(String[] args) {
public static PipelineResult run(S3ReaderOptions options) {
Pipeline p = Pipeline.create(options);

PCollection<KV<String, String>> nonInspectedContents =
p.apply(
"File Read Transforrm",
FileReaderTransform.newBuilder().setSubscriber(options.getSubscriber()).build());
PCollectionTuple nonInspectedContents =
p.apply(
"File Read Transform",
FileReaderTransform.newBuilder()
.setSubscriber(options.getSubscriber())
.setBatchSize(options.getBatchSize())
.build());

PCollectionTuple inspectedData =
nonInspectedContents.apply(
"DLPScanner",
DLPTransform.newBuilder()
.setInspectTemplateName(options.getInspectTemplateName())
.setProjectId(options.getProject())
.build());
nonInspectedContents
.get(Util.readRowSuccess)
.apply(
"DLPScanner",
DLPTransform.newBuilder()
.setInspectTemplateName(options.getInspectTemplateName())
.setProjectId(options.getProject())
.build());

PCollection<Row> inspectedContents =
inspectedData.get(Util.inspectData).setRowSchema(Util.bqDataSchema);
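Note: the tuple's failure branch is not consumed in the visible part of this diff. A minimal, untested sketch of draining it, assuming Util.readRowFailure is a TupleTag<KV<String, String>> of file name to error message (matching FileReaderSplitDoFn below) and that the pipeline class has a LOG field:

nonInspectedContents
    .get(Util.readRowFailure)
    .apply(
        "LogReadFailures",
        ParDo.of(
            new DoFn<KV<String, String>, Void>() {
              @ProcessElement
              public void processElement(@Element KV<String, String> failure) {
                // Surface read errors without failing the pipeline.
                LOG.error(
                    "File Read Transform:ReadFile: Failed to read {} - {}",
                    failure.getKey(),
                    failure.getValue());
              }
            }));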
AuditInspectDataTransform.java
@@ -59,7 +59,7 @@ public Row apply(Row input) {
input.getRow("value").getInt64("total_bytes_inspected").longValue(),
Util.INSPECTED)
.build();
LOG.info("Audit Row {}", aggrRow.toString());
LOG.info("FileTrackerTransform:MergePartialStatsRow: Audit Row {}", aggrRow.toString());
Collaborator:
I think this is AuditInspectDataTransform

Author:
Yes, the class name is AuditInspectDataTransform. But for logging I have kept to the transformation names that are displayed on the UI DAG. So here the name of this transformation is FileTrackerTransform and the sub-step is MergePartialStatsRow. I can take MergePartialStatsRow out if it doesn't seem right as a log message.

Collaborator:
OK, makes sense.

return aggrRow;
}
}
@@ -88,6 +88,7 @@ public WriteResult expand(PCollection<Row> input) {
.to(tableSpec())
.withMethod(method())
.useBeamSchema()
.withoutValidation()
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
}
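Note: withoutValidation() skips BigQueryIO's pre-launch check that the destination table exists and matches the schema; with CREATE_NEVER the table is expected to be provisioned out of band, so this presumably avoids a spurious validation failure at pipeline construction time.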
DLPTransform.java
@@ -64,11 +64,12 @@ public PCollectionTuple expand(PCollection<KV<String, String>> input) {
return input.apply(
"DLPInspect",
ParDo.of(new InspectData(projectId(), inspectTemplateName()))
.withOutputTags(Util.inspectData, TupleTagList.of(Util.auditData)));
.withOutputTags(Util.inspectData, TupleTagList.of(Util.auditData).and(Util.apiResponseFailedElements)));
}

public static class InspectData extends DoFn<KV<String, String>, Row> {
private String projectId;
private DlpServiceClient dlpServiceClient;
private String inspectTemplateName;
private InspectContentRequest.Builder requestBuilder;
private final Counter numberOfBytesInspected =
@@ -87,9 +88,27 @@ public void setup() {
.setInspectTemplateName(this.inspectTemplateName);
}

@StartBundle
public void startBundle() {
try {
this.dlpServiceClient = DlpServiceClient.create();

} catch (IOException e) {
LOG.error("DLPTransform:DLPInspect: Failed to create DLP Service Client {}", e.getMessage());
throw new RuntimeException(e);
}
}

@FinishBundle
public void finishBundle() {
if (this.dlpServiceClient != null) {
this.dlpServiceClient.close();
}
}
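Note: creating the DlpServiceClient once per bundle in startBundle and closing it in finishBundle replaces the per-element try-with-resources below, so one client is reused across all elements in a bundle; this presumably saves repeated channel setup on every DLP request.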

@ProcessElement
public void processElement(ProcessContext c) throws IOException {
try (DlpServiceClient dlpServiceClient = DlpServiceClient.create()) {
try {
String fileName = c.element().getKey();

if (!c.element().getValue().isEmpty()) {
@@ -99,7 +118,7 @@ public void processElement(ProcessContext c) throws IOException {
if (this.requestBuilder.build().getSerializedSize() > DLP_PAYLOAD_LIMIT) {
String errorMessage =
String.format(
"Payload Size %s Exceeded Batch Size %s",
"DLPTransform:DLPInspect: Payload Size %s Exceeded Batch Size %s",
Collaborator:
Optional: change the log.error to warn.

Author:
Yes, got that. I will run this through the team and decide accordingly.

this.requestBuilder.build().getSerializedSize(), DLP_PAYLOAD_LIMIT);
LOG.error(errorMessage);
} else {
@@ -109,7 +128,7 @@ public void processElement(ProcessContext c) throws IOException {
long bytesInspected = contentItem.getSerializedSize();
int totalFinding =
Long.valueOf(response.getResult().getFindingsList().stream().count()).intValue();
LOG.debug("bytes inspected {}", bytesInspected);

boolean hasErrors = response.findInitializationErrors().stream().count() > 0;
if (response.hasResult() && !hasErrors) {
response
@@ -128,7 +147,7 @@ public void processElement(ProcessContext c) throws IOException {
finding.getLocation().getCodepointRange().getStart(),
finding.getLocation().getCodepointRange().getEnd())
.build();
LOG.debug("Row {}", row);
LOG.debug("DLPTransform:DLPInspect: Row {}", row);

c.output(Util.inspectData, row);
});
@@ -148,7 +167,9 @@ public void processElement(ProcessContext c) throws IOException {
Row.withSchema(Util.errorSchema)
.addValues(fileName, timeStamp, error.toString())
.build());
LOG.info("DLPTransform:DLPInspect: Initialization error in DLP response - {}", error);
});
// Need to change 0 to 0L
c.output(
Util.auditData,
Row.withSchema(Util.bqAuditSchema)
@@ -157,6 +178,17 @@ public void processElement(ProcessContext c) throws IOException {
}
}
}
else {
LOG.info("DLPTransform:DLPInspect: {} is an empty file | Size of the file in bytes - {}", fileName, c.element().getValue().length());
c.output(
Util.auditData,
Row.withSchema(Util.bqAuditSchema)
.addValues(fileName, Util.getTimeStamp(), 0L, "EMPTY")
.build());
}
Collaborator:
For processElement I think the error is thrown and caught in the class after DLP is called. Can we have a tuple like this? This should make sure an internal error is not going to crash the pipeline.

c.output(apiResponseFailedElements, e.toString());

Author:
Yes, I got it. But just for clarification, the try block is already handling the exception, right? We are just adding the catch block to be sure.

Also, this class is already returning a tuple. I will add this additional TupleTag for the errors. Is there a need to log these in our main DLPS3ScannerPipeline.java file separately, or can we leave it?

Collaborator:
You can leave it. Ideally you will flatten all the errors and write them back somewhere. processElement throws the error, and the catch block is there to output it without crashing the pipeline.

}
catch (Exception e) {
c.output(Util.apiResponseFailedElements, e.toString());
}
}
}
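Following up on the thread above, a hedged sketch of "flatten all the errors and write back somewhere". Util.apiResponseFailedElements is assumed to be a TupleTag<String> (it receives e.toString() in the catch block); formattedReadFailures, the window size, and the output path are illustrative:

// Untested sketch: combine error streams and write them out. A streaming
// pipeline needs windowed writes with an explicit shard count.
PCollection<String> allErrors =
    PCollectionList.of(inspectedData.get(Util.apiResponseFailedElements))
        .and(formattedReadFailures) // hypothetical: readRowFailure KVs rendered as strings
        .apply("FlattenErrors", Flatten.pCollections());

allErrors
    .apply("WindowErrors", Window.into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply(
        "WriteErrors",
        TextIO.write()
            .to("gs://example-bucket/dlp-errors/error") // illustrative path
            .withWindowedWrites()
            .withNumShards(1));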
FileReaderSplitDoFn.java
@@ -16,9 +16,11 @@
package com.google.swarm.tokenization.common;

import com.google.protobuf.ByteString;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.util.Arrays;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
@@ -31,7 +33,11 @@
public class FileReaderSplitDoFn extends DoFn<KV<String, ReadableFile>, KV<String, String>> {
public static final Logger LOG = LoggerFactory.getLogger(FileReaderSplitDoFn.class);
public static Integer SPLIT_SIZE = 900000;
private static Integer BATCH_SIZE = 520000;
public Integer BATCH_SIZE;

public FileReaderSplitDoFn(Integer batchSize) {
this.BATCH_SIZE = batchSize;
}
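Note: the batch size was previously hard-coded as 520000 bytes; it now arrives through the pipeline's getBatchSize() option via this constructor, so chunk sizing is tunable per run.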

@ProcessElement
public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker)
@@ -50,19 +56,19 @@ public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Lon
buffer = ByteString.copyFrom(readBuffer);
readBuffer.clear();
LOG.debug(
"Current Restriction {}, Content Size{}", tracker.currentRestriction(), buffer.size());
c.output(KV.of(fileName, buffer.toStringUtf8().trim()));
"File Read Transform:ReadFile: Current Restriction {}, Content Size{}", tracker.currentRestriction(), buffer.size());
c.output(Util.readRowSuccess, KV.of(fileName, buffer.toStringUtf8().trim()));
}
} catch (Exception e) {

LOG.error(e.getMessage());
c.output(Util.readRowFailure, KV.of(fileName, e.getMessage()));
}
}

@GetInitialRestriction
public OffsetRange getInitialRestriction(@Element KV<String, ReadableFile> file)
throws IOException {
long totalBytes = file.getValue().getMetadata().sizeBytes();

long totalBytes = file.getValue().getMetadata().sizeBytes();
long totalSplit = 0;
if (totalBytes < BATCH_SIZE) {
totalSplit = 2;
@@ -75,10 +81,11 @@ public OffsetRange getInitialRestriction(@Element KV<String, ReadableFile> file)
}

LOG.info(
"Total Bytes {} for File {} -Initial Restriction range from 1 to: {}",
"File Read Transform:ReadFile: Total Bytes {} for File {} -Initial Restriction range from 1 to: {}. Batch size of each chunk: {} ",
totalBytes,
file.getKey(),
totalSplit);
totalSplit,
BATCH_SIZE);
return new OffsetRange(1, totalSplit);
}

@@ -97,14 +104,9 @@ public OffsetRangeTracker newTracker(@Restriction OffsetRange range) {
return new OffsetRangeTracker(new OffsetRange(range.getFrom(), range.getTo()));
}

private static SeekableByteChannel getReader(ReadableFile eventFile) {
private static SeekableByteChannel getReader(ReadableFile eventFile) throws IOException {
SeekableByteChannel channel = null;
try {
channel = eventFile.openSeekable();
} catch (IOException e) {
LOG.error("Failed to Open File {}", e.getMessage());
throw new RuntimeException(e);
}
channel = eventFile.openSeekable();
return channel;
}
}
FileReaderTransform.java
@@ -19,65 +19,134 @@
import com.google.swarm.tokenization.common.CSVFileReaderTransform.Builder;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import com.google.common.collect.ImmutableList;

@AutoValue
public abstract class FileReaderTransform
extends PTransform<PBegin, PCollection<KV<String, String>>> {
public abstract class FileReaderTransform extends PTransform<PBegin, PCollectionTuple> {

public static final Logger LOG = LoggerFactory.getLogger(FileReaderTransform.class);

public abstract String subscriber();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setSubscriber(String subscriber);
public abstract Integer batchSize();

public abstract FileReaderTransform build();
}
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setSubscriber(String subscriber);

public abstract Builder setBatchSize(Integer batchSize);

public abstract FileReaderTransform build();
}

public static Builder newBuilder() {
return new AutoValue_FileReaderTransform.Builder();
}

@Override
public PCollection<KV<String, String>> expand(PBegin input) {
public PCollectionTuple expand(PBegin input) {

return input
.apply(
"ReadFileMetadata",
PubsubIO.readMessagesWithAttributes().fromSubscription(subscriber()))
.apply("ConvertToGCSUri", ParDo.of(new MapPubSubMessage()))
.apply("FindFile", FileIO.matchAll())
.apply("FindFile", FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
.apply(FileIO.readMatches())
.apply("AddFileNameAsKey", ParDo.of(new FileSourceDoFn()))
.apply("ReadFile", ParDo.of(new FileReaderSplitDoFn()));
.apply("ReadFile", ParDo.of(new FileReaderSplitDoFn(batchSize()))
.withOutputTags(Util.readRowSuccess, TupleTagList.of(Util.readRowFailure)));
}
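Note: EmptyMatchTreatment.ALLOW makes FileIO.matchAll() emit zero matches instead of failing when a filepattern matches nothing, which presumably keeps the streaming pipeline alive when a notified file has already been deleted or moved.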

public class MapPubSubMessage extends DoFn<PubsubMessage, String> {

private final Counter numberOfFilesReceived =
Metrics.counter(FileReaderTransform.MapPubSubMessage.class, "NumberOfFilesReceived");

@ProcessElement
public void processElement(ProcessContext c) {

LOG.info("File Read Transform:ConvertToGCSUri: Located File's Metadata : {} ", c.element().getAttributeMap());
numberOfFilesReceived.inc(1L);

String bucket = c.element().getAttribute("bucketId");
String object = c.element().getAttribute("objectId");
String eventType = c.element().getAttribute("eventType");
String file_ts_string = c.element().getAttribute("eventTime");
GcsPath uri = GcsPath.fromComponents(bucket, object);

if (eventType.equalsIgnoreCase(Util.ALLOWED_NOTIFICATION_EVENT_TYPE)) {
LOG.info("File Name {}", uri.toString());
c.output(uri.toString());
} else {
LOG.info("Event Type Not Supported {}", eventType);
}
String file_name = uri.toString();
String prefix;

// Match filenames having extensions
Matcher m1 = Pattern.compile("^gs://([^/]+)/(.*)\\.(.*)$").matcher(file_name);

if (m1.find()) {
prefix = m1.group(2);
} else { // No extension
prefix = object;
}

ImmutableList.Builder<ResourceId> sourceFiles = ImmutableList.builder();
AtomicBoolean should_scan = new AtomicBoolean(true);

if (!file_name.matches(Util.VALID_FILE_PATTERNS)) {
LOG.warn("File Read Transform:ConvertToGCSUri: Unsupported File Format. Skipping: {}", file_name);
should_scan.set(false);
} else if (!eventType.equalsIgnoreCase(Util.ALLOWED_NOTIFICATION_EVENT_TYPE)) {
LOG.warn("File Read Transform:ConvertToGCSUri: Event Type Not Supported: {}. Skipping: {}", eventType,file_name);
should_scan.set(false);
} else {
try {
MatchResult listResult = FileSystems.match("gs://" + bucket + "/" + prefix + ".*.dlp", EmptyMatchTreatment.ALLOW);
listResult.metadata().forEach(metadata -> {
ResourceId resourceId = metadata.resourceId();
Instant file_ts = Instant.parse(file_ts_string);
Instant tf_ts = new Instant(metadata.lastModifiedMillis());
if (resourceId.toString().equals("gs://" + bucket + "/" + prefix + ".rdct.dlp") && file_ts.isBefore(tf_ts)) {
LOG.warn("File Read Transform:ConvertToGCSUri: File has already been redacted. Skipping: {}", file_name);
should_scan.set(false);
} else {
LOG.warn("File Read Transform:ConvertToGCSUri: Deleting old touchfile: {}", resourceId.toString());
sourceFiles.add(resourceId);
}
});
FileSystems.delete(sourceFiles.build(), MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

if (should_scan.get()) {
LOG.info("File Read Transform:ConvertToGCSUri: Valid File Located: {}", file_name);
c.output(file_name);
}
}
Collaborator @santhh (May 22, 2020):
Feel like this logic can be simplified. (Optional)

}
}
}
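On the simplification point above, a hedged and untested sketch of one way to flatten processElement with early returns instead of the should_scan flag. The Util constants, log prefixes, and the .rdct.dlp touchfile convention come from the diff; stripExtension and redactedTouchfileIsNewer are hypothetical helpers, and the IOException is logged instead of going through printStackTrace:

@ProcessElement
public void processElement(ProcessContext c) {
  LOG.info("File Read Transform:ConvertToGCSUri: Located File's Metadata: {}", c.element().getAttributeMap());
  numberOfFilesReceived.inc(1L);

  String bucket = c.element().getAttribute("bucketId");
  String object = c.element().getAttribute("objectId");
  String eventType = c.element().getAttribute("eventType");
  String fileName = GcsPath.fromComponents(bucket, object).toString();

  if (!fileName.matches(Util.VALID_FILE_PATTERNS)) {
    LOG.warn("File Read Transform:ConvertToGCSUri: Unsupported File Format. Skipping: {}", fileName);
    return;
  }
  if (!eventType.equalsIgnoreCase(Util.ALLOWED_NOTIFICATION_EVENT_TYPE)) {
    LOG.warn("File Read Transform:ConvertToGCSUri: Event Type Not Supported: {}. Skipping: {}", eventType, fileName);
    return;
  }

  try {
    Instant fileTs = Instant.parse(c.element().getAttribute("eventTime"));
    if (redactedTouchfileIsNewer(bucket, stripExtension(object), fileTs)) {
      LOG.warn("File Read Transform:ConvertToGCSUri: File has already been redacted. Skipping: {}", fileName);
      return;
    }
  } catch (IOException e) {
    // The diff above only prints the stack trace and still scans; keep that behavior, but log it.
    LOG.error("File Read Transform:ConvertToGCSUri: Touchfile check failed for {} - {}", fileName, e.getMessage());
  }

  LOG.info("File Read Transform:ConvertToGCSUri: Valid File Located: {}", fileName);
  c.output(fileName);
}

// Hypothetical helper: "path/name.ext" -> "path/name"; unchanged when there is no extension.
private static String stripExtension(String object) {
  int dot = object.lastIndexOf('.');
  return dot > 0 ? object.substring(0, dot) : object;
}

// Hypothetical helper: true when <prefix>.rdct.dlp is newer than the notification's
// eventTime. As in the diff, any other matching touchfiles are deleted as stale.
private boolean redactedTouchfileIsNewer(String bucket, String prefix, Instant fileTs)
    throws IOException {
  ImmutableList.Builder<ResourceId> staleFiles = ImmutableList.builder();
  boolean redacted = false;
  MatchResult result =
      FileSystems.match("gs://" + bucket + "/" + prefix + ".*.dlp", EmptyMatchTreatment.ALLOW);
  for (MatchResult.Metadata metadata : result.metadata()) {
    ResourceId resourceId = metadata.resourceId();
    if (resourceId.toString().equals("gs://" + bucket + "/" + prefix + ".rdct.dlp")
        && fileTs.isBefore(new Instant(metadata.lastModifiedMillis()))) {
      redacted = true;
    } else {
      staleFiles.add(resourceId);
    }
  }
  FileSystems.delete(staleFiles.build(), MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
  return redacted;
}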