
Commit dc70755

Fixed an edge case for incremental run failures on cloud file-systems (#1185)
1 parent 72d1113

8 files changed (+176, -149 lines)

pipelines/common/src/main/java/com/google/fhir/analytics/DwhFiles.java

Lines changed: 65 additions & 32 deletions
@@ -36,6 +36,7 @@
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -72,6 +73,8 @@ public class DwhFiles {
 
   private static final String INCREMENTAL_DIR = "incremental_run";
 
+  static final String TIMESTAMP_PREFIX = "_TIMESTAMP_";
+
   // TODO: It is probably better if we build all DWH files related operations using Beam's
   // filesystem API such that when a new filesystem is registered, it automatically works
   // everywhere in our code. Note that currently we have hardcoded the valid schema in some places,
@@ -121,6 +124,10 @@ static DwhFiles forRoot(String dwhRoot, FhirContext fhirContext) {
     return new DwhFiles(dwhRoot, fhirContext);
   }
 
+  public static String safeTimestampSuffix() {
+    return Instant.now().toString().replace(":", "-").replace("-", "_").replace(".", "_");
+  }
+
   public String getRoot() {
     return dwhRoot;
   }
@@ -145,50 +152,76 @@ public String getFilePattern(String resourceType) {
         "%s*%s", getResourcePath(resourceType).toString(), ParquetUtil.PARQUET_EXTENSION);
   }
 
+  // TODO: Move this to a util class and make it non-static.
   /**
-   * This returns the default incremental run path; each incremental run is relative to a full path,
-   * hence we put this directory under the full-run root.
+   * Returns all the child directories under the given base directory which are 1-level deep. Note
+   * in many cloud/distributed file-systems, we do not have "directories"; there are only buckets
+   * and files in those buckets. We use file-separators (e.g., `/`) to simulate the concept of
+   * directories. So for example, this method returns an empty set if `baseDir` is `bucket/test` and
+   * the only file in that bucket is `bucket/test/dir1/dir2/file.txt`. If `baseDir` is
+   * `bucket/test/dir1`, in the above example, `dir2` is returned.
    *
-   * @return the default incremental run path
+   * @param baseDir the path under which "directories" are looked for.
+   * @return The list of all child directories under the base directory
+   * @throws IOException
    */
-  public ResourceId getIncrementalRunPath() {
-    return FileSystems.matchNewResource(getRoot(), true)
-        .resolve(INCREMENTAL_DIR, StandardResolveOptions.RESOLVE_DIRECTORY);
-  }
-
-  /** This is used when we want to keep a backup of the old incremental run output. */
-  public ResourceId getIncrementalRunPathWithTimestamp() {
-    return FileSystems.matchNewResource(getRoot(), true)
-        .resolve(
-            String.format("%s_old_%d", INCREMENTAL_DIR, System.currentTimeMillis()),
-            StandardResolveOptions.RESOLVE_DIRECTORY);
+  static Set<ResourceId> getAllChildDirectories(String baseDir) throws IOException {
+    String fileSeparator = getFileSeparatorForDwhFiles(baseDir);
+    // Avoid using ResourceId.resolve(..) method to resolve the files when the path contains glob
+    // expressions with multiple special characters like **, */* etc as this api only supports
+    // single special characters like `*` or `..`. Rather use the FileSystems.match(..) if the path
+    // contains glob expressions.
+    List<MatchResult> matchResultList =
+        FileSystems.match(
+            List.of(
+                getPathEndingWithFileSeparator(baseDir, fileSeparator)
+                    + "*"
+                    + fileSeparator
+                    + "*"));
+    Set<ResourceId> childDirectories = new HashSet<>();
+    for (MatchResult matchResult : matchResultList) {
+      if (matchResult.status() == Status.OK && !matchResult.metadata().isEmpty()) {
+        for (Metadata metadata : matchResult.metadata()) {
+          childDirectories.add(metadata.resourceId().getCurrentDirectory());
+        }
+      } else if (matchResult.status() == Status.ERROR) {
+        String errorMessage = String.format("Error matching files under directory %s", baseDir);
+        log.error(errorMessage);
+        throw new IOException(errorMessage);
+      }
+    }
+    log.info("Child directories of {} are {}", baseDir, childDirectories);
+    return childDirectories;
   }
 
   /**
-   * Similar to {@link #getIncrementalRunPath} but also checks if that directory exists and if so,
-   * moves it to {@link #getIncrementalRunPathWithTimestamp()}.
+   * Also see {@link #newIncrementalRunPath()}
    *
-   * @return same as {@link #getIncrementalRunPath()}
-   * @throws IOException if the directory move fails
+   * @return the current incremental run path if one is found; null otherwise.
    */
-  public ResourceId newIncrementalRunPath() throws IOException {
-    ResourceId incPath = getIncrementalRunPath();
-    if (hasIncrementalDir()) {
-      ResourceId movePath = getIncrementalRunPathWithTimestamp();
-      log.info("Moving the old {} directory to {}", INCREMENTAL_DIR, movePath);
-      FileSystems.rename(Collections.singletonList(incPath), Collections.singletonList(movePath));
-    }
-    return incPath;
+  @Nullable
+  public ResourceId getLatestIncrementalRunPath() throws IOException {
+    List<ResourceId> dirs =
+        getAllChildDirectories(getRoot()).stream()
+            .filter(dir -> dir.getFilename().contains(INCREMENTAL_DIR + TIMESTAMP_PREFIX))
+            .collect(Collectors.toList());
+    if (dirs.isEmpty()) return null;
+
+    Collections.sort(dirs, Comparator.comparing(ResourceId::toString));
+    return dirs.get(dirs.size() - 1);
   }
 
   /**
-   * @return true iff there is already an incremental run subdirectory in this DWH.
+   * This returns a new incremental-run path based on the current timestamp. Note that each
+   * incremental-run is relative to a full-run, hence we put this directory under the full-run root.
+   *
+   * @return a new incremental run path based on the current timestamp.
    */
-  public boolean hasIncrementalDir() throws IOException {
-    List<MatchResult> matches =
-        FileSystems.matchResources(Collections.singletonList(getIncrementalRunPath()));
-    MatchResult matchResult = Iterables.getOnlyElement(matches);
-    return matchResult.status() == Status.OK;
+  public ResourceId newIncrementalRunPath() {
+    return FileSystems.matchNewResource(getRoot(), true)
+        .resolve(
+            String.format("%s%s%s", INCREMENTAL_DIR, TIMESTAMP_PREFIX, safeTimestampSuffix()),
+            StandardResolveOptions.RESOLVE_DIRECTORY);
   }
 
   public Set<String> findNonEmptyResourceDirs() throws IOException {
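For context, the following is a minimal, self-contained sketch (illustrative only; the class name and printed values are not part of this commit) of the naming scheme the new methods rely on: `safeTimestampSuffix()` makes an ISO-8601 instant path- and Hive-safe by replacing `:`, `-`, and `.` with `_`, `newIncrementalRunPath()` appends it after `incremental_run_TIMESTAMP_`, and `getLatestIncrementalRunPath()` picks the most recent run by sorting the resulting names.

import java.time.Instant;
import java.util.Comparator;
import java.util.List;

public class IncrementalRunNamingSketch {

  // Mirrors DwhFiles.safeTimestampSuffix() from the diff above.
  static String safeTimestampSuffix() {
    return Instant.now().toString().replace(":", "-").replace("-", "_").replace(".", "_");
  }

  public static void main(String[] args) throws InterruptedException {
    String run1 = "incremental_run" + "_TIMESTAMP_" + safeTimestampSuffix();
    Thread.sleep(10); // so the two suffixes (almost certainly) differ
    String run2 = "incremental_run" + "_TIMESTAMP_" + safeTimestampSuffix();

    // e.g. incremental_run_TIMESTAMP_2024_01_15T10_20_30_123456Z
    System.out.println(run1);
    System.out.println(run2);

    // getLatestIncrementalRunPath() filters directory names on this prefix and
    // returns the lexicographically greatest one, i.e. run2 here.
    String latest = List.of(run1, run2).stream().max(Comparator.naturalOrder()).orElseThrow();
    System.out.println("latest: " + latest);
  }
}

The literal `_TIMESTAMP_` stands in for `DwhFiles.TIMESTAMP_PREFIX`; the printed names have exactly the prefix that the updated `newIncrementalRunPath` tests below assert with `startsWith`.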

pipelines/common/src/test/java/com/google/fhir/analytics/GcsDwhFilesTest.java

Lines changed: 41 additions & 1 deletion
@@ -17,6 +17,7 @@
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.startsWith;
 
 import ca.uhn.fhir.context.FhirContext;
 import com.google.api.services.storage.model.Objects;
@@ -92,7 +93,9 @@ public void newIncrementalRunPathTest() throws IOException {
     Mockito.when(mockGcsUtil.getObjects(Mockito.anyList())).thenReturn(items);
 
     ResourceId resourceId = dwhFiles.newIncrementalRunPath();
-    assertThat(resourceId.toString(), equalTo("gs://testbucket/testdirectory/incremental_run/"));
+    assertThat(
+        resourceId.toString(),
+        startsWith("gs://testbucket/testdirectory/incremental_run" + DwhFiles.TIMESTAMP_PREFIX));
   }
 
   @Test
@@ -244,4 +247,41 @@ private StorageObject createStorageObject(String gcsFilename, long fileSize) {
         .setName(gcsPath.getObject())
         .setSize(size);
   }
+
+  @Test
+  public void testGetAllChildDirectoriesOneLevelDeep() throws IOException {
+    Objects modelObjects = new Objects();
+    List<StorageObject> items = new ArrayList<>();
+    // Files within the directory
+    items.add(
+        createStorageObject(
+            "gs://testbucket/testdirectory/Patient/patient.parquet", 1L /* fileSize */));
+    items.add(
+        createStorageObject(
+            "gs://testbucket/testdirectory/Observation/observation.parquet", 2L /* fileSize */));
+    // This is not returned in this case of GCS because there are no files right "under" TEST1.
+    // Note in GCS we do not have "directories"; we are just simulating them by `/` separators.
+    items.add(
+        createStorageObject(
+            "gs://testbucket/testdirectory/TEST1/TEST2/file.txt", 2L /* fileSize */));
+    modelObjects.setItems(items);
+
+    Mockito.when(
+            mockGcsUtil.listObjects(
+                Mockito.eq("testbucket"), Mockito.anyString(), Mockito.isNull()))
+        .thenReturn(modelObjects);
+
+    Set<ResourceId> childDirectories =
+        DwhFiles.getAllChildDirectories("gs://testbucket/testdirectory");
+
+    assertThat(childDirectories.size(), equalTo(2));
+    assertThat(
+        childDirectories.contains(
+            FileSystems.matchNewResource("gs://testbucket/testdirectory/Patient", true)),
+        equalTo(true));
+    assertThat(
+        childDirectories.contains(
+            FileSystems.matchNewResource("gs://testbucket/testdirectory/Observation", true)),
+        equalTo(true));
+  }
 }
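Because the test above mocks `GcsUtil`, the glob that `getAllChildDirectories` builds (effectively `gs://testbucket/testdirectory/*/*` here) never reaches a real filesystem. The sketch below is illustrative only and not from the repository; it assumes Beam's default local filesystem registration and runs the same one-level-deep matching directly with `FileSystems.match` against a local temporary directory.

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;

public class ChildDirGlobSketch {
  public static void main(String[] args) throws Exception {
    Path base = Files.createTempDirectory("glob_sketch");
    Files.createDirectories(base.resolve("Patient"));
    Files.writeString(base.resolve("Patient/patient.parquet"), "x");
    Files.createDirectories(base.resolve("TEST1/TEST2"));
    // Two levels below `base`: too deep for the `*/*` glob, so this file is not matched.
    Files.writeString(base.resolve("TEST1/TEST2/file.txt"), "x");

    // `<base>/*/*` matches files sitting directly inside a first-level subdirectory of
    // `base`; the parent directory of each matched file is what getAllChildDirectories collects.
    List<MatchResult> results = FileSystems.match(List.of(base + "/*/*"));
    for (MatchResult result : results) {
      if (result.status() == MatchResult.Status.OK) {
        result.metadata().forEach(m -> System.out.println(m.resourceId().getCurrentDirectory()));
      }
    }
  }
}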

pipelines/common/src/test/java/com/google/fhir/analytics/LocalDwhFilesTest.java

Lines changed: 53 additions & 1 deletion
@@ -17,6 +17,7 @@
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.startsWith;
 
 import ca.uhn.fhir.context.FhirContext;
 import com.google.common.io.Resources;
@@ -33,6 +34,8 @@
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.commons.lang3.SystemUtils;
 import org.junit.Assert;
@@ -54,12 +57,31 @@ public void getResourcePathTestWindows() {
     assertThat(dwhFiles.getResourcePath("Patient").toString(), equalTo("C:\\tmp\\Patient\\"));
   }
 
+  @Test
+  public void getIncrementalRunPathTest() throws IOException {
+    Assume.assumeFalse(SystemUtils.IS_OS_WINDOWS);
+    DwhFiles instance = new DwhFiles("/tmp", FhirContext.forR4Cached());
+    ResourceId incrementalRunPath1 = instance.newIncrementalRunPath();
+    ResourceId file1 =
+        incrementalRunPath1.resolve("file1.txt", StandardResolveOptions.RESOLVE_FILE);
+    FileSystems.create(file1, "test");
+    ResourceId incrementalRunPath2 = instance.newIncrementalRunPath();
+    ResourceId file2 =
+        incrementalRunPath2.resolve("file2.txt", StandardResolveOptions.RESOLVE_FILE);
+    FileSystems.create(file2, "test");
+    // making sure that the last incremental path is returned
+    assertThat(
+        instance.getLatestIncrementalRunPath().toString(), equalTo(incrementalRunPath2.toString()));
+  }
+
   @Test
   public void newIncrementalRunPathTestNonWindows() throws IOException {
     Assume.assumeFalse(SystemUtils.IS_OS_WINDOWS);
     DwhFiles instance = new DwhFiles("/tmp", FhirContext.forR4Cached());
     ResourceId incrementalRunPath = instance.newIncrementalRunPath();
-    assertThat(incrementalRunPath.toString(), equalTo("/tmp/incremental_run/"));
+    assertThat(
+        incrementalRunPath.toString(),
+        startsWith("/tmp/incremental_run" + DwhFiles.TIMESTAMP_PREFIX));
   }
 
   @Test
@@ -242,4 +264,34 @@ public void passWindowsLocalPathDwhRootPrefix_returnsFileSeparator() {
   private void createFile(Path path, byte[] bytes) throws IOException {
     Files.write(path, bytes);
   }
+
+  @Test
+  public void testGetAllChildDirectoriesOneLevelDeep() throws IOException {
+    Path rootDir = Files.createTempDirectory("DWH_FILES_TEST");
+    Path childDir1 = Paths.get(rootDir.toString(), "childDir1");
+    Files.createDirectories(childDir1);
+    Path fileAtChildDir1 = Path.of(childDir1.toString(), "file1.txt");
+    createFile(fileAtChildDir1, "SAMPLE TEXT".getBytes(StandardCharsets.UTF_8));
+    Path childDir2 = Paths.get(rootDir.toString(), "childDir2");
+    Files.createDirectories(childDir2);
+    Path fileAtChildDir2 = Path.of(childDir2.toString(), "file2.txt");
+    createFile(fileAtChildDir2, "SAMPLE TEXT".getBytes(StandardCharsets.UTF_8));
+
+    // The following directory should not appear in the results of `getAllChildDirectories`
+    // because only dirs at one-level deep should be returned.
+    Path childDir21 = Paths.get(childDir2.toString(), "childDir21");
+    Files.createDirectories(childDir21);
+    Path fileAtChildDir21 = Path.of(childDir21.toString(), "file3.txt");
+    createFile(fileAtChildDir21, "SAMPLE TEXT".getBytes(StandardCharsets.UTF_8));
+
+    Set<ResourceId> childDirectories = DwhFiles.getAllChildDirectories(rootDir.toString());
+
+    assertThat(childDirectories.size(), equalTo(2));
+    assertThat(
+        childDirectories.contains(FileSystems.matchNewResource(childDir1.toString(), true)),
+        equalTo(true));
+    assertThat(
+        childDirectories.contains(FileSystems.matchNewResource(childDir2.toString(), true)),
+        equalTo(true));
+  }
 }

pipelines/controller/src/main/java/com/google/fhir/analytics/DataProperties.java

Lines changed: 3 additions & 7 deletions
@@ -20,7 +20,6 @@
 import com.google.common.base.Strings;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
@@ -53,8 +52,6 @@ public class DataProperties {
 
   private static final Logger logger = LoggerFactory.getLogger(DataProperties.class.getName());
 
-  static final String TIMESTAMP_PREFIX = "_TIMESTAMP_";
-
   private static final String GET_PREFIX = "get";
 
   private static final Set<String> EXCLUDED_ARGS =
@@ -212,17 +209,16 @@ PipelineConfig createBatchOptions() {
     }
 
     // Using underscore for suffix as hyphens are discouraged in hive table names.
-    String timestampSuffix =
-        Instant.now().toString().replace(":", "-").replace("-", "_").replace(".", "_");
-    options.setOutputParquetPath(dwhRootPrefix + TIMESTAMP_PREFIX + timestampSuffix);
+    String timestampSuffix = DwhFiles.safeTimestampSuffix();
+    options.setOutputParquetPath(dwhRootPrefix + DwhFiles.TIMESTAMP_PREFIX + timestampSuffix);
 
     PipelineConfig.PipelineConfigBuilder pipelineConfigBuilder = addFlinkOptions(options);
 
     // Get hold of thrift server parquet directory from dwhRootPrefix config.
     String thriftServerParquetPathPrefix =
         dwhRootPrefix.substring(dwhRootPrefix.lastIndexOf("/") + 1, dwhRootPrefix.length());
     pipelineConfigBuilder.thriftServerParquetPath(
-        thriftServerParquetPathPrefix + TIMESTAMP_PREFIX + timestampSuffix);
+        thriftServerParquetPathPrefix + DwhFiles.TIMESTAMP_PREFIX + timestampSuffix);
     pipelineConfigBuilder.timestampSuffix(timestampSuffix);
 
     return pipelineConfigBuilder.build();
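To make the resulting layout concrete, here is a tiny illustrative sketch (the bucket name, prefix, and timestamp value are hypothetical; `_TIMESTAMP_` stands in for `DwhFiles.TIMESTAMP_PREFIX`) of how `createBatchOptions()` composes the Parquet output path and the Thrift-server path from `dwhRootPrefix` and the shared suffix:

public class DwhPathCompositionSketch {
  public static void main(String[] args) {
    String dwhRootPrefix = "gs://my-bucket/dwh/controller_DWH"; // hypothetical config value
    String timestampSuffix = "2024_01_15T10_20_30_123456Z"; // shape of DwhFiles.safeTimestampSuffix()

    // Full DWH snapshot root, e.g.
    // gs://my-bucket/dwh/controller_DWH_TIMESTAMP_2024_01_15T10_20_30_123456Z
    String outputParquetPath = dwhRootPrefix + "_TIMESTAMP_" + timestampSuffix;
    System.out.println(outputParquetPath);

    // The Thrift-server path keeps only the last path segment of the prefix, e.g.
    // controller_DWH_TIMESTAMP_2024_01_15T10_20_30_123456Z
    String thriftServerParquetPath =
        dwhRootPrefix.substring(dwhRootPrefix.lastIndexOf('/') + 1) + "_TIMESTAMP_" + timestampSuffix;
    System.out.println(thriftServerParquetPath);
  }
}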

pipelines/controller/src/main/java/com/google/fhir/analytics/DwhFilesManager.java

Lines changed: 2 additions & 38 deletions
@@ -140,7 +140,7 @@ private void purgeDwhFiles() {
     try {
       String prefix = getPrefix(dwhRootPrefix);
       List<ResourceId> paths =
-          getAllChildDirectories(baseDir).stream()
+          DwhFiles.getAllChildDirectories(baseDir).stream()
              .filter(dir -> dir.getFilename().startsWith(prefix))
              .collect(Collectors.toList());
 
@@ -336,7 +336,7 @@ String getPrefix(String dwhRootPrefix) {
   }
 
   List<String> findExistingResources(String dwhRoot) throws IOException {
-    Set<ResourceId> childPaths = getAllChildDirectories(dwhRoot);
+    Set<ResourceId> childPaths = DwhFiles.getAllChildDirectories(dwhRoot);
     Set<String> configuredSet =
         new HashSet<>(Arrays.asList(dataProperties.getResourceList().split(",")));
     return childPaths.stream()
@@ -345,42 +345,6 @@ List<String> findExistingResources(String dwhRoot) throws IOException {
         .collect(Collectors.toList());
   }
 
-  /**
-   * Returns all the child directories under the given base directory which are 1-level deep.
-   *
-   * @param baseDir
-   * @return The list of all child directories under the base directory
-   * @throws IOException
-   */
-  Set<ResourceId> getAllChildDirectories(String baseDir) throws IOException {
-    String fileSeparator = DwhFiles.getFileSeparatorForDwhFiles(baseDir);
-    // Avoid using ResourceId.resolve(..) method to resolve the files when the path contains glob
-    // expressions with multiple special characters like **, */* etc as this api only supports
-    // single special characters like `*` or `..`. Rather use the FileSystems.match(..) if the path
-    // contains glob expressions.
-    List<MatchResult> matchResultList =
-        FileSystems.match(
-            List.of(
-                DwhFiles.getPathEndingWithFileSeparator(baseDir, fileSeparator)
-                    + "*"
-                    + fileSeparator
-                    + "*"));
-    Set<ResourceId> childDirectories = new HashSet<>();
-    for (MatchResult matchResult : matchResultList) {
-      if (matchResult.status() == Status.OK && !matchResult.metadata().isEmpty()) {
-        for (Metadata metadata : matchResult.metadata()) {
-          childDirectories.add(metadata.resourceId().getCurrentDirectory());
-        }
-      } else if (matchResult.status() == Status.ERROR) {
-        String errorMessage = String.format("Error matching files under directory %s", baseDir);
-        logger.error(errorMessage);
-        throw new IOException(errorMessage);
-      }
-    }
-    logger.info("Child directories : {}", childDirectories);
-    return childDirectories;
-  }
-
   private int getLastIndexOfSlash(String dwhRootPrefix) {
     CloudPath cloudPath = DwhFiles.parsePath(dwhRootPrefix);
     int index = -1;
