Skip to content

Commit c885428

Browse files
committed
error handling
1 parent ab0accc commit c885428

File tree

4 files changed

+29
-26
lines changed

4 files changed

+29
-26
lines changed

src/main/java/com/google/swarm/tokenization/DLPS3ScannerPipeline.java

+9-8
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,6 @@
2727
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
2828
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
2929
import org.apache.beam.sdk.options.PipelineOptionsFactory;
30-
import org.apache.beam.sdk.values.KV;
3130
import org.apache.beam.sdk.values.PCollection;
3231
import org.apache.beam.sdk.values.PCollectionTuple;
3332
import org.apache.beam.sdk.values.Row;
@@ -46,18 +45,20 @@ public static void main(String[] args) {
4645
public static PipelineResult run(S3ReaderOptions options) {
4746
Pipeline p = Pipeline.create(options);
4847

49-
PCollection<KV<String, String>> nonInspectedContents =
48+
PCollectionTuple nonInspectedContents =
5049
p.apply(
5150
"File Read Transforrm",
5251
FileReaderTransform.newBuilder().setSubscriber(options.getSubscriber()).build());
5352

5453
PCollectionTuple inspectedData =
55-
nonInspectedContents.apply(
56-
"DLPScanner",
57-
DLPTransform.newBuilder()
58-
.setInspectTemplateName(options.getInspectTemplateName())
59-
.setProjectId(options.getProject())
60-
.build());
54+
nonInspectedContents
55+
.get(Util.readRowSuccess)
56+
.apply(
57+
"DLPScanner",
58+
DLPTransform.newBuilder()
59+
.setInspectTemplateName(options.getInspectTemplateName())
60+
.setProjectId(options.getProject())
61+
.build());
6162

6263
PCollection<Row> inspectedContents =
6364
inspectedData.get(Util.inspectData).setRowSchema(Util.bqDataSchema);

src/main/java/com/google/swarm/tokenization/common/FileReaderSplitDoFn.java

+7-11
Original file line number | Diff line number | Diff line change
@@ -51,18 +51,18 @@ public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Lon
5151
readBuffer.clear();
5252
LOG.debug(
5353
"Current Restriction {}, Content Size{}", tracker.currentRestriction(), buffer.size());
54-
c.output(KV.of(fileName, buffer.toStringUtf8().trim()));
54+
c.output(Util.readRowSuccess, KV.of(fileName, buffer.toStringUtf8().trim()));
5555
}
5656
} catch (Exception e) {
57-
58-
LOG.error(e.getMessage());
57+
c.output(Util.readRowFailure, KV.of(fileName, e.getMessage()));
5958
}
6059
}
6160

6261
@GetInitialRestriction
6362
public OffsetRange getInitialRestriction(@Element KV<String, ReadableFile> file)
6463
throws IOException {
65-
long totalBytes = file.getValue().getMetadata().sizeBytes();
64+
65+
long totalBytes = file.getValue().getMetadata().sizeBytes();
6666
long totalSplit = 0;
6767
if (totalBytes < BATCH_SIZE) {
6868
totalSplit = 2;
@@ -97,14 +97,10 @@ public OffsetRangeTracker newTracker(@Restriction OffsetRange range) {
9797
return new OffsetRangeTracker(new OffsetRange(range.getFrom(), range.getTo()));
9898
}
9999

100-
private static SeekableByteChannel getReader(ReadableFile eventFile) {
100+
private static SeekableByteChannel getReader(ReadableFile eventFile) throws IOException {
101101
SeekableByteChannel channel = null;
102-
try {
103-
channel = eventFile.openSeekable();
104-
} catch (IOException e) {
105-
LOG.error("Failed to Open File {}", e.getMessage());
106-
throw new RuntimeException(e);
107-
}
102+
LOG.info("event File Channel {}",eventFile.getMetadata().resourceId().getFilename());
103+
channel = eventFile.openSeekable();
108104
return channel;
109105
}
110106
}

src/main/java/com/google/swarm/tokenization/common/FileReaderTransform.java

+10-7
Original file line number | Diff line number | Diff line change
@@ -19,20 +19,20 @@
1919
import com.google.swarm.tokenization.common.CSVFileReaderTransform.Builder;
2020
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
2121
import org.apache.beam.sdk.io.FileIO;
22+
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
2223
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
2324
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
2425
import org.apache.beam.sdk.transforms.DoFn;
2526
import org.apache.beam.sdk.transforms.PTransform;
2627
import org.apache.beam.sdk.transforms.ParDo;
27-
import org.apache.beam.sdk.values.KV;
2828
import org.apache.beam.sdk.values.PBegin;
29-
import org.apache.beam.sdk.values.PCollection;
29+
import org.apache.beam.sdk.values.PCollectionTuple;
30+
import org.apache.beam.sdk.values.TupleTagList;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
3233

3334
@AutoValue
34-
public abstract class FileReaderTransform
35-
extends PTransform<PBegin, PCollection<KV<String, String>>> {
35+
public abstract class FileReaderTransform extends PTransform<PBegin, PCollectionTuple> {
3636

3737
public static final Logger LOG = LoggerFactory.getLogger(FileReaderTransform.class);
3838

@@ -50,17 +50,20 @@ public static Builder newBuilder() {
5050
}
5151

5252
@Override
53-
public PCollection<KV<String, String>> expand(PBegin input) {
53+
public PCollectionTuple expand(PBegin input) {
5454

5555
return input
5656
.apply(
5757
"ReadFileMetadata",
5858
PubsubIO.readMessagesWithAttributes().fromSubscription(subscriber()))
5959
.apply("ConvertToGCSUri", ParDo.of(new MapPubSubMessage()))
60-
.apply("FindFile", FileIO.matchAll())
60+
.apply("FindFile", FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
6161
.apply(FileIO.readMatches())
6262
.apply("AddFileNameAsKey", ParDo.of(new FileSourceDoFn()))
63-
.apply("ReadFile", ParDo.of(new FileReaderSplitDoFn()));
63+
.apply(
64+
"ReadFile",
65+
ParDo.of(new FileReaderSplitDoFn())
66+
.withOutputTags(Util.readRowSuccess, TupleTagList.of(Util.readRowFailure)));
6467
}
6568

6669
public class MapPubSubMessage extends DoFn<PubsubMessage, String> {

src/main/java/com/google/swarm/tokenization/common/Util.java

+3
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@
3939
import org.apache.beam.sdk.options.ValueProvider;
4040
import org.apache.beam.sdk.schemas.Schema;
4141
import org.apache.beam.sdk.schemas.Schema.FieldType;
42+
import org.apache.beam.sdk.values.KV;
4243
import org.apache.beam.sdk.values.Row;
4344
import org.apache.beam.sdk.values.TupleTag;
4445
import org.apache.commons.csv.CSVFormat;
@@ -67,6 +68,8 @@ public class Util {
6768
public static TupleTag<Row> inspectData = new TupleTag<Row>() {};
6869
public static TupleTag<Row> auditData = new TupleTag<Row>() {};
6970
public static TupleTag<Row> errorData = new TupleTag<Row>() {};
71+
public static TupleTag<KV<String, String>> readRowSuccess = new TupleTag<KV<String, String>>() {};
72+
public static TupleTag<KV<String, String>> readRowFailure = new TupleTag<KV<String, String>>() {};
7073

7174
public static String parseBucketName(String value) {
7275
return value.substring(5, value.length() - 1);

0 commit comments

Comments
 (0)