Skip to content

Commit cee0e76

Browse files
authored
Merge pull request #4853 from gchq/gh-4849-proxy-forwarding-stops
PR for #4849 - stroom-proxy forwarding failure if destination name resolution fails at startup
2 parents 08dc983 + a0e0d66 commit cee0e76

20 files changed

+647
-220
lines changed

stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ForwardException.java

+42-8
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import stroom.meta.api.AttributeMap;
44
import stroom.meta.api.StandardHeaderArguments;
55
import stroom.proxy.StroomStatusCode;
6+
import stroom.proxy.app.handler.HttpSender.ResponseStatus;
67
import stroom.util.NullSafe;
78

89
import java.util.Objects;
@@ -12,15 +13,18 @@ public class ForwardException extends RuntimeException {
1213
private final StroomStatusCode stroomStatusCode;
1314
private final String feedName;
1415
private final boolean isRecoverable;
16+
private final int httpResponseCode; // In the case of UNKNOWN_ERROR, this may differ from 999
1517

1618
private ForwardException(final StroomStatusCode stroomStatusCode,
1719
final AttributeMap attributeMap,
1820
final String message,
21+
final int httpResponseCode,
1922
final boolean isRecoverable,
2023
final Throwable cause) {
2124
super(message, cause);
2225
this.isRecoverable = isRecoverable;
2326
this.stroomStatusCode = stroomStatusCode;
27+
this.httpResponseCode = httpResponseCode;
2428
this.feedName = NullSafe.get(
2529
attributeMap,
2630
attrMap -> attrMap.get(StandardHeaderArguments.FEED));
@@ -30,26 +34,52 @@ public static ForwardException recoverable(final StroomStatusCode stroomStatusCo
3034
final AttributeMap attributeMap,
3135
final String message,
3236
final Throwable cause) {
33-
return new ForwardException(stroomStatusCode, attributeMap, message, true, cause);
37+
return new ForwardException(
38+
stroomStatusCode,
39+
attributeMap,
40+
message,
41+
stroomStatusCode.getHttpCode(),
42+
true,
43+
cause);
3444
}
3545

36-
public static ForwardException recoverable(final StroomStatusCode stroomStatusCode,
46+
public static ForwardException recoverable(final ResponseStatus responseStatus,
3747
final AttributeMap attributeMap) {
38-
Objects.requireNonNull(stroomStatusCode);
39-
return new ForwardException(stroomStatusCode, attributeMap, stroomStatusCode.getMessage(), true, null);
48+
Objects.requireNonNull(responseStatus);
49+
final StroomStatusCode stroomStatusCode = responseStatus.stroomStatusCode();
50+
return new ForwardException(
51+
stroomStatusCode,
52+
attributeMap,
53+
Objects.requireNonNullElse(responseStatus.message(), stroomStatusCode.getMessage()),
54+
responseStatus.httpResponseCode(),
55+
true,
56+
null);
4057
}
4158

4259
public static ForwardException nonRecoverable(final StroomStatusCode stroomStatusCode,
4360
final AttributeMap attributeMap,
4461
final String message,
4562
final Throwable cause) {
46-
return new ForwardException(stroomStatusCode, attributeMap, message, false, cause);
63+
return new ForwardException(
64+
stroomStatusCode,
65+
attributeMap,
66+
message,
67+
stroomStatusCode.getHttpCode(),
68+
false,
69+
cause);
4770
}
4871

49-
public static ForwardException nonRecoverable(final StroomStatusCode stroomStatusCode,
72+
public static ForwardException nonRecoverable(final ResponseStatus responseStatus,
5073
final AttributeMap attributeMap) {
51-
Objects.requireNonNull(stroomStatusCode);
52-
return new ForwardException(stroomStatusCode, attributeMap, stroomStatusCode.getMessage(), false, null);
74+
Objects.requireNonNull(responseStatus);
75+
final StroomStatusCode stroomStatusCode = responseStatus.stroomStatusCode();
76+
return new ForwardException(
77+
stroomStatusCode,
78+
attributeMap,
79+
Objects.requireNonNullElse(responseStatus.message(), stroomStatusCode.getMessage()),
80+
responseStatus.httpResponseCode(),
81+
false,
82+
null);
5383
}
5484

5585
public boolean isRecoverable() {
@@ -59,4 +89,8 @@ public boolean isRecoverable() {
5989
public String getFeedName() {
6090
return feedName;
6191
}
92+
93+
public int getHttpResponseCode() {
94+
return httpResponseCode;
95+
}
6296
}

stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ForwardFileConfig.java

+44-12
Original file line numberDiff line numberDiff line change
@@ -21,29 +21,33 @@ public final class ForwardFileConfig
2121
implements IsProxyConfig, ForwarderConfig {
2222

2323
public static final String PROP_NAME_SUB_PATH_TEMPLATE = "subPathTemplate";
24+
public static final String PROP_NAME_ATOMIC_MOVE_ENABLED = "atomicMoveEnabled";
2425
public static final TemplatingMode DEFAULT_TEMPLATING_MODE = TemplatingMode.REPLACE_UNKNOWN_PARAMS;
2526

2627
private static final String DEFAULT_SUB_PATH_TEMPLATE = "${year}${month}${day}/${feed}";
28+
private static final boolean DEFAULT_ATOMIC_MOVE_ENABLED = true;
2729
private static final LivenessCheckMode DEFAULT_LIVENESS_CHECK_MODE = LivenessCheckMode.READ;
2830

2931
private final boolean enabled;
3032
private final boolean instant;
3133
private final String name;
3234
private final String path;
3335
private final PathTemplateConfig subPathTemplate;
34-
private final ForwardQueueConfig forwardQueueConfig;
36+
private final ForwardFileQueueConfig forwardQueueConfig;
3537
private final String livenessCheckPath;
3638
private final LivenessCheckMode livenessCheckMode;
39+
private final boolean atomicMoveEnabled;
3740

3841
public ForwardFileConfig() {
3942
enabled = true;
4043
instant = false;
4144
name = null;
4245
path = null;
4346
subPathTemplate = null;
44-
forwardQueueConfig = null; // Assume local file forwarder by default, so no queue config needed
47+
forwardQueueConfig = new ForwardFileQueueConfig();
4548
livenessCheckPath = null;
4649
livenessCheckMode = LivenessCheckMode.READ;
50+
atomicMoveEnabled = DEFAULT_ATOMIC_MOVE_ENABLED;
4751
}
4852

4953
@SuppressWarnings("unused")
@@ -53,17 +57,19 @@ public ForwardFileConfig(@JsonProperty("enabled") final boolean enabled,
5357
@JsonProperty("name") final String name,
5458
@JsonProperty("path") final String path,
5559
@JsonProperty(PROP_NAME_SUB_PATH_TEMPLATE) final PathTemplateConfig subPathTemplate,
56-
@JsonProperty("queue") final ForwardQueueConfig forwardQueueConfig,
60+
@JsonProperty("queue") final ForwardFileQueueConfig forwardQueueConfig,
5761
@JsonProperty("livenessCheckPath") final String livenessCheckPath,
58-
@JsonProperty("livenessCheckMode") final LivenessCheckMode livenessCheckMode) {
62+
@JsonProperty("livenessCheckMode") final LivenessCheckMode livenessCheckMode,
63+
@JsonProperty(PROP_NAME_ATOMIC_MOVE_ENABLED) final Boolean atomicMoveEnabled) {
5964
this.enabled = enabled;
6065
this.instant = instant;
6166
this.name = name;
6267
this.path = path;
6368
this.subPathTemplate = Objects.requireNonNullElse(subPathTemplate, PathTemplateConfig.DISABLED);
64-
this.forwardQueueConfig = forwardQueueConfig;
69+
this.forwardQueueConfig = Objects.requireNonNullElseGet(forwardQueueConfig, ForwardFileQueueConfig::new);
6570
this.livenessCheckPath = livenessCheckPath;
6671
this.livenessCheckMode = Objects.requireNonNullElse(livenessCheckMode, DEFAULT_LIVENESS_CHECK_MODE);
72+
this.atomicMoveEnabled = Objects.requireNonNullElse(atomicMoveEnabled, DEFAULT_ATOMIC_MOVE_ENABLED);
6773
}
6874

6975
private ForwardFileConfig(final Builder builder) {
@@ -75,6 +81,7 @@ private ForwardFileConfig(final Builder builder) {
7581
forwardQueueConfig = builder.forwardQueueConfig;
7682
livenessCheckPath = builder.livenessCheckPath;
7783
livenessCheckMode = builder.livenessCheckMode;
84+
atomicMoveEnabled = builder.atomicMoveEnabled;
7885
}
7986

8087
/**
@@ -130,13 +137,14 @@ public PathTemplateConfig getSubPathTemplate() {
130137
return subPathTemplate;
131138
}
132139

140+
@NotNull
133141
@Override
134142
@JsonProperty("queue")
135143
@JsonPropertyDescription("Adds multi-threading and retry control to this forwarder. Can be set to null " +
136144
"for a local file forwarder, but should be populated if the file forwarder is " +
137145
"forwarding to a remote file system that may fail. Defaults to null as a " +
138146
"local file forwarder is assumed.")
139-
public ForwardQueueConfig getForwardQueueConfig() {
147+
public ForwardFileQueueConfig getForwardQueueConfig() {
140148
return forwardQueueConfig;
141149
}
142150

@@ -169,10 +177,24 @@ public String getLivenessCheckPath() {
169177
return livenessCheckPath;
170178
}
171179

180+
@JsonPropertyDescription(
181+
"The type of liveness check to perform (READ|WRITE). " +
182+
"READ will attempt to read the file/dir specified in livenessCheckPath. " +
183+
"WRITE will attempt to touch the file specified in livenessCheckPath.")
172184
public LivenessCheckMode getLivenessCheckMode() {
173185
return livenessCheckMode;
174186
}
175187

188+
@JsonPropertyDescription(
189+
"Stroom-Proxy will attempt to move files onto the forward destination using an atomic move. " +
190+
"This ensures that the move does not happen more than once. If an atomic move is not possible, " +
191+
"e.g. the destination is a remote file system that does not support an atomic move, then it will " +
192+
"fall back to a non-atomic move with the risk of it happening more than once. If you see warnings " +
193+
"in the logs or know the file system will not support atomic moves then set this to false.")
194+
public boolean isAtomicMoveEnabled() {
195+
return atomicMoveEnabled;
196+
}
197+
176198
@Override
177199
public boolean equals(final Object o) {
178200
if (this == o) {
@@ -189,7 +211,8 @@ public boolean equals(final Object o) {
189211
&& Objects.equals(subPathTemplate, that.subPathTemplate)
190212
&& Objects.equals(forwardQueueConfig, that.forwardQueueConfig)
191213
&& Objects.equals(livenessCheckPath, that.livenessCheckPath)
192-
&& livenessCheckMode == that.livenessCheckMode;
214+
&& livenessCheckMode == that.livenessCheckMode
215+
&& atomicMoveEnabled == that.atomicMoveEnabled;
193216
}
194217

195218
@Override
@@ -201,7 +224,8 @@ public int hashCode() {
201224
subPathTemplate,
202225
forwardQueueConfig,
203226
livenessCheckPath,
204-
livenessCheckMode);
227+
livenessCheckMode,
228+
atomicMoveEnabled);
205229
}
206230

207231
@Override
@@ -215,6 +239,7 @@ public String toString() {
215239
", forwardQueueConfig=" + forwardQueueConfig +
216240
", livenessCheckPath='" + livenessCheckPath + '\'' +
217241
", livenessCheckMode=" + livenessCheckMode +
242+
", atomicMoveEnabled=" + atomicMoveEnabled +
218243
'}';
219244
}
220245

@@ -232,6 +257,7 @@ public static Builder builder(final ForwardFileConfig copy) {
232257
builder.forwardQueueConfig = copy.getForwardQueueConfig();
233258
builder.livenessCheckPath = copy.getLivenessCheckPath();
234259
builder.livenessCheckMode = copy.getLivenessCheckMode();
260+
builder.atomicMoveEnabled = copy.isAtomicMoveEnabled();
235261
return builder;
236262
}
237263

@@ -241,14 +267,15 @@ public static Builder builder(final ForwardFileConfig copy) {
241267

242268
public static final class Builder {
243269

244-
public String livenessCheckPath;
245-
public LivenessCheckMode livenessCheckMode;
270+
private String livenessCheckPath;
271+
private LivenessCheckMode livenessCheckMode;
272+
private boolean atomicMoveEnabled;
246273
private boolean enabled;
247274
private boolean instant;
248275
private String name;
249276
private String path;
250277
private PathTemplateConfig subPathTemplate;
251-
private ForwardQueueConfig forwardQueueConfig;
278+
private ForwardFileQueueConfig forwardQueueConfig;
252279

253280
private Builder() {
254281
}
@@ -293,7 +320,7 @@ public Builder withSubPathTemplate(final PathTemplateConfig subPathTemplate) {
293320
return this;
294321
}
295322

296-
public Builder withForwardQueueConfig(final ForwardQueueConfig forwardQueueConfig) {
323+
public Builder withForwardQueueConfig(final ForwardFileQueueConfig forwardQueueConfig) {
297324
this.forwardQueueConfig = forwardQueueConfig;
298325
return this;
299326
}
@@ -308,6 +335,11 @@ public Builder withLivenessCheckMode(final LivenessCheckMode livenessCheckMode)
308335
return this;
309336
}
310337

338+
public Builder withAtomicMoveEnabled(final boolean atomicMoveEnabled) {
339+
this.atomicMoveEnabled = atomicMoveEnabled;
340+
return this;
341+
}
342+
311343
public ForwardFileConfig build() {
312344
return new ForwardFileConfig(this);
313345
}

stroom-proxy/stroom-proxy-app/src/main/java/stroom/proxy/app/handler/ForwardFileDestinationFactoryImpl.java

+12-14
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
import stroom.util.io.PathCreator;
66
import stroom.util.logging.LambdaLogger;
77
import stroom.util.logging.LambdaLoggerFactory;
8+
import stroom.util.logging.LogUtil;
89

910
import jakarta.inject.Inject;
1011
import jakarta.inject.Singleton;
1112

1213
import java.nio.file.Path;
14+
import java.util.Objects;
1315

1416
@Singleton
1517
public class ForwardFileDestinationFactoryImpl implements ForwardFileDestinationFactory {
@@ -58,19 +60,15 @@ public ForwardDestination create(final ForwardFileConfig config) {
5860
private ForwardDestination getWrappedForwardDestination(final ForwardFileConfig config,
5961
final ForwardFileDestinationImpl forwardFileDestination) {
6062
final ForwardQueueConfig forwardQueueConfig = config.getForwardQueueConfig();
61-
final ForwardDestination destination;
62-
if (forwardQueueConfig != null) {
63-
// We have queue config so wrap out ultimate destination with some queue/retry logic
64-
destination = new RetryingForwardDestination(
65-
forwardQueueConfig,
66-
forwardFileDestination,
67-
dataDirProvider,
68-
pathCreator,
69-
dirQueueFactory,
70-
proxyServices);
71-
} else {
72-
destination = forwardFileDestination;
73-
}
74-
return destination;
63+
Objects.requireNonNull(forwardQueueConfig, () -> LogUtil.message(
64+
"No forwardQueueConfig set for destination '{}'", config.getName()));
65+
// We have queue config so wrap out ultimate destination with some queue/retry logic
66+
return new RetryingForwardDestination(
67+
forwardQueueConfig,
68+
forwardFileDestination,
69+
dataDirProvider,
70+
pathCreator,
71+
dirQueueFactory,
72+
proxyServices);
7573
}
7674
}

0 commit comments

Comments
 (0)