
Commit 0ebd398

oom fix
1 parent 4c3b61c commit 0ebd398

3 files changed: +40 -67 lines

src/main/java/com/google/swarm/tokenization/DLPS3ScannerPipeline.java

+37 -47

@@ -15,15 +15,22 @@
  */
 package com.google.swarm.tokenization;
 
+import com.google.swarm.tokenization.common.AuditInspectDataTransform;
+import com.google.swarm.tokenization.common.BQWriteTransform;
 import com.google.swarm.tokenization.common.DLPTransform;
 import com.google.swarm.tokenization.common.FileReaderTransform;
+import com.google.swarm.tokenization.common.RowToJson;
 import com.google.swarm.tokenization.common.S3ReaderOptions;
+import com.google.swarm.tokenization.common.Util;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,7 +40,6 @@ public class DLPS3ScannerPipeline {
   public static void main(String[] args) {
     S3ReaderOptions options =
         PipelineOptionsFactory.fromArgs(args).withValidation().as(S3ReaderOptions.class);
-    // options.setEnableStreamingEngine(true);
     run(options);
   }
 
@@ -48,23 +54,7 @@ public static PipelineResult run(S3ReaderOptions options) {
                 .setDelimeter(options.getDelimeter())
                 .setKeyRange(options.getKeyRange())
                 .build());
-    // .apply(
-    //     "Fixed Window",
-    //     Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
-    //         .triggering(
-    //
-    //             AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO))
-    //         .discardingFiredPanes()
-    //         .withAllowedLateness(Duration.ZERO));
 
-    // nonInspectedContents.apply("Print", ParDo.of(new DoFn<KV<String, String>, String>() {
-    //
-    //   @ProcessElement
-    //   public void processElement(ProcessContext c) {
-    //     c.output(c.element().getValue());
-    //   }
-    // }));
-    //
     PCollectionTuple inspectedData =
         nonInspectedContents.apply(
             "DLPScanner",
@@ -74,37 +64,37 @@ public static PipelineResult run(S3ReaderOptions options) {
                 .setBatchSize(options.getBatchSize())
                 .build());
 
-    // PCollection<Row> inspectedContents =
-    //     inspectedData.get(Util.inspectData).setRowSchema(Util.bqDataSchema);
-    //
-    // PCollection<Row> inspectedStats =
-    //     inspectedData.get(Util.auditData).setRowSchema(Util.bqAuditSchema);
-    //
-    // PCollection<Row> auditData =
-    //     inspectedStats
-    //         .apply("FileTrackerTransform", new AuditInspectDataTransform())
-    //         .setRowSchema(Util.bqAuditSchema);
-    //
-    // auditData.apply(
-    //     "WriteAuditData",
-    //     BigQueryIO.<Row>write()
-    //         .to(options.getAuditTableSpec())
-    //         .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
-    //         .useBeamSchema()
-    //         .withoutValidation()
-    //         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
-    //         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
-    //
-    // auditData
-    //     .apply("RowToJson", new RowToJson())
-    //     .apply("WriteToTopic", PubsubIO.writeStrings().to(options.getTopic()));
+    PCollection<Row> inspectedContents =
+        inspectedData.get(Util.inspectData).setRowSchema(Util.bqDataSchema);
 
-    // inspectedContents.apply(
-    //     "WriteInspectData",
-    //     BQWriteTransform.newBuilder()
-    //         .setTableSpec(options.getTableSpec())
-    //         .setMethod(options.getWriteMethod())
-    //         .build());
+    PCollection<Row> inspectedStats =
+        inspectedData.get(Util.auditData).setRowSchema(Util.bqAuditSchema);
+
+    PCollection<Row> auditData =
+        inspectedStats
+            .apply("FileTrackerTransform", new AuditInspectDataTransform())
+            .setRowSchema(Util.bqAuditSchema);
+
+    auditData.apply(
+        "WriteAuditData",
+        BigQueryIO.<Row>write()
+            .to(options.getAuditTableSpec())
+            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
+            .useBeamSchema()
+            .withoutValidation()
+            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
+            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
+
+    auditData
+        .apply("RowToJson", new RowToJson())
+        .apply("WriteToTopic", PubsubIO.writeStrings().to(options.getTopic()));
+
+    inspectedContents.apply(
+        "WriteInspectData",
+        BQWriteTransform.newBuilder()
+            .setTableSpec(options.getTableSpec())
+            .setMethod(options.getWriteMethod())
+            .build());
     return p.run();
   }
 }

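Note on the change above: the previously commented-out sinks are now live. Audit rows are streamed to BigQuery with a table schema derived from the Beam Row schema and republished to Pub/Sub as JSON, while inspected rows go through BQWriteTransform. For readers unfamiliar with Beam's schema-aware BigQuery writes, the sketch below shows the useBeamSchema() pattern these transforms rely on. It is a minimal illustration only: the schema, field names, row values, and table spec are invented stand-ins, not the project's Util.bqAuditSchema or the pipeline's real options.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class SchemaRowWriteSketch {
  public static void main(String[] args) {
    // Illustrative audit schema; the real pipeline keeps its schemas in Util.
    Schema auditSchema =
        Schema.builder().addStringField("fileName").addInt64Field("bytesInspected").build();

    Pipeline p = Pipeline.create();

    // A single hand-built Row stands in for the audit output of the DLP scan.
    PCollection<Row> auditRows =
        p.apply(
            Create.of(
                    Row.withSchema(auditSchema)
                        .addValues("gs://example-bucket/sample.csv", 1024L)
                        .build())
                .withRowSchema(auditSchema));

    auditRows.apply(
        "WriteAuditData",
        BigQueryIO.<Row>write()
            .to("example-project:inspection.audit") // hypothetical table spec
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            // useBeamSchema() derives the BigQuery schema from the Row schema,
            // so no TableSchema or TableRow conversion code is needed.
            .useBeamSchema()
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            // CREATE_NEVER assumes the destination table already exists.
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

    p.run().waitUntilFinish();
  }
}

Because the audit PCollection carries a schema (set here via withRowSchema, and in the pipeline via setRowSchema), the same rows can also be converted to JSON and published to Pub/Sub, which is what the RowToJson / WriteToTopic steps do.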
src/main/java/com/google/swarm/tokenization/common/DLPTransform.java

+2 -19

@@ -15,10 +15,8 @@
  */
 package com.google.swarm.tokenization.common;
 
-import com.google.api.gax.retrying.RetrySettings;
 import com.google.auto.value.AutoValue;
 import com.google.cloud.dlp.v2.DlpServiceClient;
-import com.google.cloud.dlp.v2.DlpServiceSettings;
 import com.google.privacy.dlp.v2.ContentItem;
 import com.google.privacy.dlp.v2.InspectContentRequest;
 import com.google.privacy.dlp.v2.InspectContentResponse;
@@ -45,17 +43,15 @@
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.threeten.bp.Duration;
 
 @AutoValue
 public abstract class DLPTransform
     extends PTransform<PCollection<KV<String, String>>, PCollectionTuple> {
   public static final Logger LOG = LoggerFactory.getLogger(DLPTransform.class);
 
-  private static Integer DLP_PAYLOAD_LIMIT = 524288;
-
   public abstract String inspectTemplateName();
 
   public abstract String projectId();
@@ -90,8 +86,6 @@ public PCollectionTuple expand(PCollection<KV<String, String>> input) {
   public static class BatchRequest extends DoFn<KV<String, String>, KV<String, Iterable<String>>> {
 
     private static final long serialVersionUID = 1L;
-    // private final Counter numberOfRowsBagged =
-    //     Metrics.counter(BatchRequest.class, "numberOfRowsBagged");
     private Integer batchSize;
 
     public BatchRequest(Integer batchSize) {
@@ -111,8 +105,7 @@ public void process(
         BoundedWindow w,
         @StateId("elementsBag") BagState<KV<String, String>> elementsBag) {
       elementsBag.add(element);
-      // eventTimer.set(w.maxTimestamp());
-      eventTimer.offset(org.joda.time.Duration.standardSeconds(10)).setRelative();
+      eventTimer.offset(Duration.standardSeconds(10)).setRelative();
     }
 
     @OnTimer("eventTimer")
@@ -147,7 +140,6 @@ public void onTimer(
       // must be a better way
       if (!rows.isEmpty()) {
        LOG.debug("Remaining buffer {}, key{}", rows.size(), key);
-        // numberOfRowsBagged.inc(rows.size());
        output.output(KV.of(key, rows));
      }
    }
@@ -176,15 +168,6 @@ public void setup() {
 
    @StartBundle
    public void startBundle() throws IOException {
-
-      // DlpServiceSettings.Builder settingsBuilder = DlpServiceSettings.newBuilder();
-      // settingsBuilder
-      //     .inspectContentSettings()
-      //     .setRetrySettings(
-      //         RetrySettings.newBuilder()
-      //             .setInitialRpcTimeout(Duration.ofSeconds(60))
-      //             .setMaxRpcTimeout(Duration.ofSeconds(60))
-      //             .build());
      dlpServiceClient = DlpServiceClient.create();
    }
 

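Note on the change above: the event-time timer is no longer pinned to the window's maximum timestamp but is re-armed 10 seconds ahead after each element, so the bagged rows are flushed periodically instead of accumulating in state. The sketch below illustrates that bag-state-plus-event-time-timer batching pattern in isolation. It is a minimal sketch assuming Beam's stateful DoFn API; the class, state, and timer names are invented for the example and the real BatchRequest differs in detail.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Buffers values per key and emits them in batches of at most batchSize
// whenever the periodically re-armed event-time timer fires.
public class BufferAndFlushFn extends DoFn<KV<String, String>, KV<String, Iterable<String>>> {

  private static final long serialVersionUID = 1L;
  private final int batchSize;

  public BufferAndFlushFn(int batchSize) {
    this.batchSize = batchSize;
  }

  @StateId("buffer")
  private final StateSpec<BagState<KV<String, String>>> bufferSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @StateId("buffer") BagState<KV<String, String>> buffer,
      @TimerId("flush") Timer flushTimer) {
    buffer.add(element);
    // Re-arm the flush timer roughly 10 seconds ahead in event time; once the
    // watermark passes it, onFlush drains whatever has been buffered.
    flushTimer.offset(Duration.standardSeconds(10)).setRelative();
  }

  @OnTimer("flush")
  public void onFlush(
      @StateId("buffer") BagState<KV<String, String>> buffer,
      OutputReceiver<KV<String, Iterable<String>>> out) {
    String key = null;
    List<String> batch = new ArrayList<>();
    for (KV<String, String> kv : buffer.read()) {
      key = kv.getKey();
      batch.add(kv.getValue());
      if (batch.size() >= batchSize) {
        out.output(KV.of(key, new ArrayList<>(batch))); // copy: the list is reused below
        batch.clear();
      }
    }
    if (key != null && !batch.isEmpty()) {
      out.output(KV.of(key, batch)); // flush the remainder
    }
    buffer.clear();
  }
}

In the actual pipeline the input is keyed by file name (see AddFileNameAsKey in FileReaderTransform), so each file's contents are batched and flushed independently.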
src/main/java/com/google/swarm/tokenization/common/FileReaderTransform.java

+1 -1

@@ -67,7 +67,7 @@ public PCollection<KV<String, String>> expand(PBegin input) {
             "ReadFileMetadata",
             PubsubIO.readMessagesWithAttributes().fromSubscription(subscriber()))
         .apply("ConvertToGCSUri", ParDo.of(new MapPubSubMessage()))
-        .apply("FindFile", FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW))
+        .apply("FindFile", FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
         .apply(FileIO.readMatches())
         .apply("AddFileNameAsKey", ParDo.of(new FileSourceDoFn()))
         .apply("ReadFile", ParDo.of(new FileReaderSplitDoFn(delimeter(), keyRange())));

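Note on the change above: FindFile now tolerates a file pattern that matches nothing instead of failing on it. A minimal illustration of the setting, with an invented bucket and pattern:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.transforms.Create;

public class EmptyMatchSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(Create.of("gs://example-bucket/no-such-prefix-*.csv")) // hypothetical pattern
        // ALLOW: a pattern that matches nothing simply yields zero files.
        // DISALLOW (the previous setting) makes the match step throw instead,
        // which a streaming runner will typically keep retrying.
        .apply("FindFile", FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
        .apply(FileIO.readMatches());
    p.run();
  }
}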