Skip to content

Commit e66d159

Browse files
committed
improve scanner scanning to ensure only values within the given range are returned
1 parent da7934a commit e66d159

File tree

5 files changed

+37
-4
lines changed

5 files changed

+37
-4
lines changed

clusterless-main-common/src/main/java/clusterless/cls/command/exec/ArcExecCommandOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ public class ArcExecCommandOptions extends CommonCommandOptions {
2323
@CommandLine.Mixin
2424
ArcCommonOptions arcCommonOptions = new ArcCommonOptions();
2525

26+
@CommandLine.Option(
27+
names = "--dry-run",
28+
description = "Do not execute the arc."
29+
)
30+
private boolean dryRun = false;
31+
2632
static class RangeOrLot {
2733
@CommandLine.ArgGroup(
2834
exclusive = false,
@@ -69,6 +75,10 @@ public ArcExecCommandOptions setNames(List<String> names) {
6975
return this;
7076
}
7177

78+
public boolean dryRun() {
79+
return dryRun;
80+
}
81+
7282
public RangeOptions rangeOptions() {
7383
return rangeOrLot.rangeOptions;
7484
}

clusterless-substrate-aws-kernel/src/main/java/clusterless/cls/substrate/aws/report/ArcExec.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,15 +150,15 @@ private void handleScanner(Printer printer, ArcStatusScanner arcStatusScanner, A
150150
}
151151
}
152152

153-
private static void execStepFunction(Printer printer, StepFunction stepFunction, ArcStatusRecord arcStatusRecord, SinkDataset sink, ManifestURI manifestURI, ArcMeta arcMeta, Placement placement) {
153+
private void execStepFunction(Printer printer, StepFunction stepFunction, ArcStatusRecord arcStatusRecord, SinkDataset sink, ManifestURI manifestURI, ArcMeta arcMeta, Placement placement) {
154154
printer.print("starting arc exec for");
155155
printer.print(" placement: " + arcStatusRecord.arcRecord().placement().display());
156156
printer.print(" project: " + arcStatusRecord.arcRecord().project().display());
157157
printer.print(" arc: " + arcStatusRecord.arcRecord().name());
158158
printer.print(" dataset: " + sink.display());
159159
printer.print(" state: " + Strings.nullToEmpty(arcStatusRecord.state()));
160160
printer.print(" lot: " + arcStatusRecord.lotId());
161-
printer.print(" manifest: " + manifestURI.uri());
161+
printer.print(" source manifest: " + manifestURI.uri());
162162
printer.println();
163163

164164
ArcNotifyEvent notifyEvent = ArcNotifyEvent.builder()
@@ -172,6 +172,11 @@ private static void execStepFunction(Printer printer, StepFunction stepFunction,
172172
ArcDeployment arcDeployment = arcMeta.arcDeployment();
173173
String stepFunctionName = arcDeployment.stepFunctionName();
174174

175+
if (arcExecCommandOptions.dryRun()) {
176+
printer.println("dry-run, not executing arc");
177+
return;
178+
}
179+
175180
stepFunction.start(placement.account(), stepFunctionName, payload)
176181
.isSuccessOrThrow(e -> {
177182
throw new IllegalStateException("unable to start step function: " + stepFunctionName + ", " + e.getMessage(), e);

clusterless-substrate-aws-kernel/src/main/java/clusterless/cls/substrate/aws/report/scanner/ArcStatusScanner.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ public ArcStatusScanner(String profile, ArcRecord arcRecord, Moment earliest, Mo
4141
this.arcStateSupplier = arcStateSupplier;
4242
}
4343

44+
@Override
45+
protected String scannerType() {
46+
return "ArcStatus";
47+
}
48+
4449
@Override
4550
protected StateURI<?, ?> createStateURIFrom(ArcRecord record) {
4651
return ArcStateURI.builder()
@@ -56,19 +61,24 @@ protected Stream<ArcStatusRecord> parseUriStreamIntoStatusRec(Stream<String> res
5661
Stream<ArcStatusRecord> arcStatusRecordStream;
5762

5863
arcStatusRecordStream = resultStream.map(ArcStateURI::parse)
64+
.filter(uri -> uri.lotId().compareTo(startLotInclusive) >= 0 && uri.lotId().compareTo(endLotExclusive) < 0)
5965
.map(uri -> new ArcStatusRecord(record, uri.lotId(), uri.state()));
6066

6167
if (fillGaps) {
6268
// there has to be a better way to zip together ordered streams and remove any dupes by a predicate or
6369
// bifunction
70+
LOG.info("filling gaps for inclusive start: {}, exclusive end: {}", startLotInclusive, endLotExclusive);
6471
arcStatusRecordStream = StreamEx.of(LotStream.stream(startLotInclusive, endLotExclusive))
6572
.map(lot -> new ArcStatusRecord(record, lot, null))
6673
.append(arcStatusRecordStream)
6774
.sortedBy(ArcStatusRecord::lotId)
6875
.collapse(
6976
(l, r) -> l.lotId().equals(r.lotId()),
7077
(l, r) -> l.state() == null ? r : l
71-
);
78+
)
79+
.peek(arcStatusRecord -> {
80+
LOG.info("using arc record: {}", arcStatusRecord);
81+
});
7282
}
7383

7484
Optional<Predicate<ArcState>> supplied = arcStateSupplier.get();

clusterless-substrate-aws-kernel/src/main/java/clusterless/cls/substrate/aws/report/scanner/ManifestScanner.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ public ManifestScanner(String profile, DatasetRecord datasetRecord, Moment earli
2525
super(profile, datasetRecord, earliest, latest, false);
2626
}
2727

28+
@Override
29+
protected String scannerType() {
30+
return "ManifestStatus";
31+
}
32+
2833
@Override
2934
protected StateURI<?, ?> createStateURIFrom(DatasetRecord record) {
3035
return ManifestURI.builder()
@@ -36,6 +41,7 @@ public ManifestScanner(String profile, DatasetRecord datasetRecord, Moment earli
3641
@NotNull
3742
protected Stream<DatasetStatusRecord> parseUriStreamIntoStatusRec(Stream<String> resultStream) {
3843
return resultStream.map(ManifestURI::parse)
44+
.filter(uri -> uri.lotId().compareTo(startLotInclusive) >= 0 && uri.lotId().compareTo(endLotExclusive) < 0)
3945
.map(uri -> new DatasetStatusRecord(record, uri));
4046
}
4147

clusterless-substrate-aws-kernel/src/main/java/clusterless/cls/substrate/aws/report/scanner/Scanner.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public Scanner(String profile, Rec record, Moment earliest, Moment latest, boole
4646
this.profile = profile;
4747
this.record = record;
4848
this.fillGaps = fillGaps;
49-
LOG.info("creating scanner for: {}", record.display());
49+
LOG.info("creating {} scanner for: {}", scannerType(), record.display());
5050

5151
this.stateURI = createStateURIFrom(record);
5252
this.temporalUnit = findTemporalKeyFor(this.stateURI, earliest, latest);
@@ -67,6 +67,8 @@ public Scanner(String profile, Rec record, Moment earliest, Moment latest, boole
6767
LOG.info("using lot earliest: {}, latest: {}", startLotInclusive, endLotExclusive);
6868
}
6969

70+
protected abstract String scannerType();
71+
7072
public String profile() {
7173
return profile;
7274
}

0 commit comments

Comments
 (0)