-
Notifications
You must be signed in to change notification settings - Fork 51
DLP Inspect initial commit #48
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: file_tracker
Are you sure you want to change the base?
DLP Inspect initial commit #48
Conversation
p.apply( | ||
"File Read Transforrm", | ||
FileReaderTransform.newBuilder().setSubscriber(options.getSubscriber()).build()); | ||
PCollection<KV<String, String>> nonInspectedContents = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we please return a tuple? Anywhere you have try/catch you should use multi output with error tag. I think there are quite a few places, you will need this change. Without multi output pipeline will fail to recover.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay sure. I will change this code block to return tuple and will take care about such try-catch blocks.
@@ -59,7 +59,7 @@ public Row apply(Row input) { | |||
input.getRow("value").getInt64("total_bytes_inspected").longValue(), | |||
Util.INSPECTED) | |||
.build(); | |||
LOG.info("Audit Row {}", aggrRow.toString()); | |||
LOG.info("FileTrackerTransform:MergePartialStatsRow: Audit Row {}", aggrRow.toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is AuditInspectDataTransform
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes the class name is AuditInspectDataTransform. But for logging I have maintained the usage of Transformation names that will be displayed on UI DAG. So here the name of this Transformation is FileTrackerTransform and the sub step is MergePartialStatsRow. Can take out MergePartialStatsRow if it doesn't seem to be right as a log message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok makes sense.
@@ -99,7 +99,7 @@ public void processElement(ProcessContext c) throws IOException { | |||
if (this.requestBuilder.build().getSerializedSize() > DLP_PAYLOAD_LIMIT) { | |||
String errorMessage = | |||
String.format( | |||
"Payload Size %s Exceeded Batch Size %s", | |||
"DLPTransform:DLPInspect: Payload Size %s Exceeded Batch Size %s", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optional- Change the log.error to warn
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes got that. Will run this through the team and decide accordingly.
src/main/java/com/google/swarm/tokenization/common/DLPTransform.java
Outdated
Show resolved
Hide resolved
Row.withSchema(Util.bqAuditSchema) | ||
.addValues(fileName, Util.getTimeStamp(),0L, "EMPTY") | ||
.build()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the processElement i think error is thrown and caught in the class after DLP is called. Can we have tuple like this? This should make sure internal error is not going to crash the pipeline.
dlp-dataflow-deidentification/src/main/java/com/google/swarm/tokenization/S3Import.java
Line 340 in 2c9bbd1
c.output(apiResponseFailedElements, e.toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I got it. But just for clarification the try block is already handling the exception, right?
We are just adding the catch block to be sure.
Also, this class is already returning a tuple. I will add this additional TupleTag for the Errors.
Is there a need to log these in our main DLPS3ScannerPipeline.java file separately or can we leave it ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also should we handle this initialization error as follows : https://github.com/GoogleCloudPlatform/dlp-dataflow-deidentification/blob/2c9bbd178ccaaa4422f89ba49fd0e8b4a0e4f26b/src/main/java/com/google/swarm/tokenization/S3Import.java#L333
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can leave it. Ideally you will flatten all the errors and write back somewhere. processElement throws the error cand catch clock is to output without crashing the pipeline.
} | ||
} catch (Exception e) { | ||
c.output(Util.readRowFailure, KV.of(fileName, e.getMessage())); | ||
LOG.error("File Read Transform:ReadFile: Error processing the file "+ fileName +" - " + Arrays.toString(e.getStackTrace())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is where the multi output should be added. First comment added. So this class should output a Tuple.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So should be keep the LOG as a warning here or just take out the log and keep output statement only?
Or log these in our main DLPS3ScannerPipeline.java file separately?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think let's log all in the main class after flatten.
src/main/java/com/google/swarm/tokenization/common/FileReaderSplitDoFn.java
Outdated
Show resolved
Hide resolved
src/main/java/com/google/swarm/tokenization/common/FileReaderSplitDoFn.java
Outdated
Show resolved
Hide resolved
src/main/java/com/google/swarm/tokenization/common/FileReaderTransform.java
Outdated
Show resolved
Hide resolved
src/main/java/com/google/swarm/tokenization/common/FileReaderTransform.java
Outdated
Show resolved
Hide resolved
src/main/java/com/google/swarm/tokenization/common/FileReaderTransform.java
Outdated
Show resolved
Hide resolved
LOG.info("File Read Transform:ConvertToGCSUri: Valid File Located: {}", file_name); | ||
c.output(file_name); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feel like this logic can be simplified. (Optional)
Instant file_ts = Instant.parse(file_ts_string); | ||
Instant tf_ts = new Instant(metadata.lastModifiedMillis()); | ||
LOG.warn(file_ts.toString()); | ||
LOG.warn(tf_ts.toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should have one statement and a meaningful error message. This is not helpful. I feel you have too many logs here. Let's try to summarize and output what will be useful
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes sure. Some parts of the code are added by Robert L. Will look into those parts and take out the unnecessary logs.
public static final String ALLOWED_NOTIFICATION_EVENT_TYPE = String.valueOf("OBJECT_FINALIZE"); | ||
public static String INSPECTED = "INSPECTED"; | ||
public static String FAILED = "FAILED"; | ||
public static TupleTag<Row> inspectData = new TupleTag<Row>() {}; | ||
public static TupleTag<Row> auditData = new TupleTag<Row>() {}; | ||
public static TupleTag<Row> errorData = new TupleTag<Row>() {}; | ||
public static TupleTag<KV<String, String>> readRowSuccess = new TupleTag<KV<String, String>>() {}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needs documentation explaining what the key/value pairs are whenever using such generic structs
@santhh
Please review the code changes. I have pulled and then modified the code for better understanding of the changes.