Skip to content

Commit d797da8

Browse files
committed
fix: return and copy only max files
1 parent 921d74e commit d797da8

File tree

6 files changed

+261
-151
lines changed

6 files changed

+261
-151
lines changed

docker-compose-ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ services:
1111
- "6622:22"
1212
command: "foo:pass*+=:1001:1001:upload"
1313
volumes:
14-
- ${PWD}/src/test/resources/ssh/id_rsa.pub:/home/foo/.ssh/authorized_keys
14+
- ${PWD}/src/test/resources/ssh/id_ed25519.pub:/home/foo/.ssh/authorized_keys
1515

1616
# https://github.com/fauria/docker-vsftpd/issues/41
1717
ftp:

src/main/java/io/kestra/plugin/fs/nfs/Trigger.java

Lines changed: 53 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,18 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface,
9898
)
9999
private Property<Integer> maxFiles = Property.ofValue(25);
100100

101+
private static class PendingFile {
102+
private final Path path;
103+
private final Entry candidate;
104+
private final ChangeType changeType;
105+
106+
private PendingFile(Path path, Entry candidate, ChangeType changeType) {
107+
this.path = path;
108+
this.candidate = candidate;
109+
this.changeType = changeType;
110+
}
111+
}
112+
101113
@Override
102114
public Duration getInterval() {
103115
return this.interval;
@@ -117,7 +129,7 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
117129
Optional<Duration> rStateTtl = runContext.render(stateTtl).as(Duration.class);
118130

119131
Map<String, StatefulTriggerService.Entry> state = readState(runContext, rStateKey, rStateTtl);
120-
List<TriggeredFile> toFire = new ArrayList<>();
132+
List<PendingFile> pendingFiles = new ArrayList<>();
121133

122134
logger.debug("Evaluating trigger for path: {}", fromPath);
123135

@@ -127,49 +139,57 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
127139
.filter(path -> rRegExp == null || path.toString().matches(rRegExp))
128140
.toList();
129141

130-
for(Path path: paths) {
131-
try {
132-
BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class);
133-
var modifiedAt = attrs.lastModifiedTime().toInstant();
134-
var key = path.toUri().toString();
135-
var version = String.format("%d_%d", modifiedAt.toEpochMilli(), attrs.size());
136-
137-
var candidate = StatefulTriggerService.Entry.candidate(key, version, modifiedAt);
138-
var change = computeAndUpdateState(state, candidate, rOn);
139-
140-
if (change.fire()) {
141-
var changeType = change.isNew() ? ChangeType.CREATE : ChangeType.UPDATE;
142-
143-
io.kestra.plugin.fs.nfs.List.File fileModel = mapToFile(path);
144-
toFire.add(TriggeredFile.builder()
145-
.file(fileModel)
146-
.changeType(changeType)
147-
.build());
148-
logger.info("File {} detected with change type: {}", path, changeType);
149-
}
150-
} catch (IOException e) {
151-
logger.warn("Error processing path {}: {}", path, e.getMessage(), e);
152-
}
142+
for (Path path : paths) {
143+
try {
144+
BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class);
145+
var modifiedAt = attrs.lastModifiedTime().toInstant();
146+
var key = path.toUri().toString();
147+
var version = String.format("%d_%d", modifiedAt.toEpochMilli(), attrs.size());
148+
var candidate = StatefulTriggerService.Entry.candidate(key, version, modifiedAt);
149+
var prev = state.get(key);
150+
151+
if (!shouldFire(prev, version, rOn)) {
152+
computeAndUpdateState(state, candidate, rOn);
153+
continue;
154+
}
155+
156+
var changeType = prev == null ? ChangeType.CREATE : ChangeType.UPDATE;
157+
pendingFiles.add(new PendingFile(path, candidate, changeType));
158+
} catch (IOException e) {
159+
logger.warn("Error processing path {}: {}", path, e.getMessage(), e);
160+
}
153161
}
154162
} catch (IOException e) {
155163
logger.error("Error walking/listing path {}: {}", fromPath, e.getMessage(), e);
156164
return Optional.empty();
157165
}
158166

159-
writeState(runContext, rStateKey, state, rStateTtl);
160-
161-
if (toFire.isEmpty()) {
162-
logger.debug("No new or updated files found.");
163-
return Optional.empty();
167+
int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
168+
if (pendingFiles.size() > rMaxFiles) {
169+
logger.warn("Too many files to process ({}), limiting to {}", pendingFiles.size(), rMaxFiles);
170+
pendingFiles = pendingFiles.subList(0, Math.min(rMaxFiles, pendingFiles.size()));
164171
}
165172

166-
int rMaxFiles = runContext.render(this.maxFiles).as(Integer.class).orElse(25);
167-
if (toFire.size() > rMaxFiles) {
168-
logger.warn("Too many files to process ({}), limiting to {}", toFire.size(), rMaxFiles);
169-
toFire = toFire.subList(0, Math.min(rMaxFiles, toFire.size()));
173+
List<TriggeredFile> toFire = new ArrayList<>();
174+
175+
for (PendingFile pending : pendingFiles) {
176+
try {
177+
io.kestra.plugin.fs.nfs.List.File fileModel = mapToFile(pending.path);
178+
computeAndUpdateState(state, pending.candidate, rOn);
179+
toFire.add(TriggeredFile.builder()
180+
.file(fileModel)
181+
.changeType(pending.changeType)
182+
.build());
183+
logger.info("File {} detected with change type: {}", pending.path, pending.changeType);
184+
} catch (IOException e) {
185+
logger.warn("Error processing path {}: {}", pending.path, e.getMessage(), e);
186+
}
170187
}
171188

189+
writeState(runContext, rStateKey, state, rStateTtl);
190+
172191
if (toFire.isEmpty()) {
192+
logger.debug("No new or updated files found.");
173193
return Optional.empty();
174194
}
175195

0 commit comments

Comments
 (0)