[3.2] Fix the consistency issue of listFrom API
Rong Zeng committed Jan 22, 2025
1 parent 43e7bfa commit 99cae01
Showing 3 changed files with 125 additions and 7 deletions.
io/delta/storage/BaseExternalLogStore.java

@@ -242,7 +242,9 @@ public void write(
resolvedPath.getName(),
tempPath,
false, // not complete
null // no expireTime
null, // no expireTime
fs.getDefaultBlockSize(),
System.currentTimeMillis()
);

// Step 2.1: Create temp file T(N)
@@ -455,7 +457,7 @@ private void copyFile(FileSystem fs, Path src, Path dst) throws IOException {
/**
* Returns the path with user info stripped.
*/
private Path stripUserInfo(Path path) {
protected Path stripUserInfo(Path path) {
final URI uri = path.toUri();

try {
io/delta/storage/ExternalCommitEntry.java

@@ -51,17 +51,31 @@ public final class ExternalCommitEntry {
*/
public final Long expireTime;

/**
* File size in bytes
*/
public final Long fileSize;

/**
* File modification time in milliseconds
*/
public final Long modificationTime;

public ExternalCommitEntry(
Path tablePath,
String fileName,
String tempPath,
boolean complete,
Long expireTime) {
Long expireTime,
Long fileSize,
Long modificationTime) {
this.tablePath = tablePath;
this.fileName = fileName;
this.tempPath = tempPath;
this.complete = complete;
this.expireTime = expireTime;
this.fileSize = fileSize;
this.modificationTime = modificationTime;
}

/**
@@ -73,7 +87,9 @@ public ExternalCommitEntry asComplete(long expirationDelaySeconds) {
this.fileName,
this.tempPath,
true,
System.currentTimeMillis() / 1000L + expirationDelaySeconds
System.currentTimeMillis() / 1000L + expirationDelaySeconds,
this.fileSize,
this.modificationTime
);
}
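
For context, a minimal usage sketch of the extended entry (illustration only, not part of this commit; the table path, file names, and size below are hypothetical):

    // The entry starts incomplete during write(); expireTime is null while pending.
    ExternalCommitEntry entry = new ExternalCommitEntry(
        new Path("s3a://bucket/table/_delta_log"), // hypothetical table path
        "00000000000000000001.json",               // commit file name
        "temp/00000000000000000001.json",          // hypothetical temp file
        false,                                     // not complete
        null,                                      // no expireTime yet
        1024L,                                     // new field: file size in bytes
        System.currentTimeMillis()                 // new field: modification time (millis)
    );

    // Once the commit file is in place, mark the entry complete; fileSize and
    // modificationTime are carried over, and expireTime is set in epoch seconds.
    ExternalCommitEntry completed = entry.asComplete(24 * 60 * 60);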

io/delta/storage/S3DynamoDBLogStore.java

@@ -1,3 +1,4 @@

/*
* Copyright (2021) The Delta Lake Project Authors.
*
@@ -15,21 +16,31 @@
*/

package io.delta.storage;

import io.delta.storage.internal.S3LogStoreUtil;
import io.delta.storage.utils.ReflectionUtils;
import org.apache.hadoop.fs.Path;
import com.google.common.collect.Lists;

import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.conf.Configuration;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
@@ -87,7 +98,6 @@ public class S3DynamoDBLogStore extends BaseExternalLogStore {
public static final String DDB_CLIENT_CREDENTIALS_PROVIDER = "credentials.provider";
public static final String DDB_CREATE_TABLE_RCU = "provisionedThroughput.rcu";
public static final String DDB_CREATE_TABLE_WCU = "provisionedThroughput.wcu";

// WARNING: setting this value too low can cause data loss. Defaults to a duration of 1 day.
public static final String TTL_SECONDS = "ddb.ttl";
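
A hedged configuration sketch for this knob (the full key prefix is an assumption taken from the Delta documentation, not shown in this diff); the value is in seconds:

    Configuration conf = new Configuration();
    // Assumed prefix "spark.io.delta.storage.S3DynamoDBLogStore."; setting the
    // TTL below the one-day default risks data loss, per the warning above.
    conf.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.ttl", "172800"); // 2 days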

@@ -100,6 +110,8 @@ public class S3DynamoDBLogStore extends BaseExternalLogStore {
private static final String ATTR_COMPLETE = "complete";
private static final String ATTR_EXPIRE_TIME = "expireTime";

private static final String ATTR_FILE_SIZE = "fileSize";
private static final String ATTR_MODI_TIME = "modificationTime";

/**
* Member fields
*/
@@ -139,6 +151,84 @@ public S3DynamoDBLogStore(Configuration hadoopConf) throws IOException {
tryEnsureTableExists(hadoopConf);
}

private Iterator<FileStatus> mergeFileLists(
List<FileStatus> list,
List<FileStatus> listWithPrecedence) {
final Map<Path, FileStatus> fileStatusMap = new HashMap<>();

// insert all elements from `listWithPrecedence` (highest priority)
// and then insert elements from `list` if and only if that key doesn't already exist
Stream.concat(listWithPrecedence.stream(), list.stream())
.forEach(fs -> fileStatusMap.putIfAbsent(fs.getPath(), fs));

return fileStatusMap
.values()
.stream()
.sorted(Comparator.comparing(a -> a.getPath().getName()))
.iterator();
}
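
To make the precedence concrete, a hedged illustration (paths and sizes are made up; the FileStatus arguments are length, isdir, replication, block size, modification time, path):

    List<FileStatus> cached = Arrays.asList(
        new FileStatus(100L, false, 1, 128L, 1000L, new Path("s3a://b/t/_delta_log/001.json")));
    List<FileStatus> fromFs = Arrays.asList(
        new FileStatus(90L, false, 1, 128L, 900L, new Path("s3a://b/t/_delta_log/000.json")),
        new FileStatus(100L, false, 1, 128L, 2000L, new Path("s3a://b/t/_delta_log/001.json")));

    // mergeFileLists(cached, fromFs) yields 000.json, then the 001.json entry
    // taken from `fromFs` (the listWithPrecedence argument), sorted by name.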

/**
* Returns the latest entry from the external cache whose file name is at or
* after `resolvedPath`, as a list with at most one FileStatus (the query is
* descending with limit 1); the entry is included only if it is complete.
*/
private List<FileStatus> listFromCache(
FileSystem fs,
Path resolvedPath) {
final Path pathKey = super.stripUserInfo(resolvedPath);
final Path tablePath = getTablePath(resolvedPath);

final Map<String, Condition> conditions = new ConcurrentHashMap<>();
conditions.put(
ATTR_TABLE_PATH,
new Condition()
.withComparisonOperator(ComparisonOperator.EQ)
.withAttributeValueList(new AttributeValue(tablePath.toString()))
);
conditions.put(
ATTR_FILE_NAME,
new Condition()
.withComparisonOperator(ComparisonOperator.GE)
.withAttributeValueList(new AttributeValue(pathKey.getName()))
);

final List<Map<String, AttributeValue>> items = client.query(
new QueryRequest(tableName)
.withConsistentRead(true)
.withScanIndexForward(false)
.withLimit(1)
.withKeyConditions(conditions)
).getItems();

final List<FileStatus> statuses = new ArrayList<>();
items.forEach(item -> {
ExternalCommitEntry entry = dbResultToCommitEntry(item);
if (entry.complete) {
// Fall back to defaults for entries that lack these attributes.
long fileSize = entry.fileSize != null ? entry.fileSize : 0L;
long modificationTime =
entry.modificationTime != null ? entry.modificationTime : System.currentTimeMillis();
statuses.add(
new FileStatus(
fileSize, // length, using the null-safe value
false, // isdir
1, // block replication
fs.getDefaultBlockSize(resolvedPath), // block size
modificationTime,
entry.absoluteFilePath()
));
}
});
return statuses;
}

@Override
public Iterator<FileStatus> listFrom(Path path, Configuration hadoopConf) throws IOException {
final FileSystem fs = path.getFileSystem(hadoopConf);
final Path resolvedPath = stripUserInfo(fs.makeQualified(path));

final List<FileStatus> listedFromFs = Lists.newArrayList(super.listFrom(path, hadoopConf));

// Also list the completed entry from the external cache, in case the
// file system cannot provide strong read-after-write consistency.
final List<FileStatus> listedFromCache = listFromCache(fs, resolvedPath);

return mergeFileLists(listedFromCache, listedFromFs);
}
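
A hedged caller's-eye sketch (hadoopConf and the paths are assumed for illustration): on a file system without strong listing consistency the raw listing can lag the newest commit, while the DynamoDB query above reads with withConsistentRead(true), so the merged iterator still surfaces it:

    S3DynamoDBLogStore store = new S3DynamoDBLogStore(hadoopConf);
    Iterator<FileStatus> it = store.listFrom(
        new Path("s3a://bucket/table/_delta_log/00000000000000000010.json"),
        hadoopConf);
    while (it.hasNext()) {
        // includes the latest commit even if the FS listing has not caught up
        System.out.println(it.next().getPath().getName());
    }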

@Override
public CloseableIterator<String> read(Path path, Configuration hadoopConf) throws IOException {
// With many concurrent readers/writers, there's a chance that concurrent 'recovery'
@@ -222,11 +312,16 @@ protected Optional<ExternalCommitEntry> getLatestExternalEntry(Path tablePath) {
*/
private ExternalCommitEntry dbResultToCommitEntry(Map<String, AttributeValue> item) {
final AttributeValue expireTimeAttr = item.get(ATTR_EXPIRE_TIME);
final AttributeValue fileSizeAttr = item.get(ATTR_FILE_SIZE);
final AttributeValue modiTimeAttr = item.get(ATTR_MODI_TIME);

return new ExternalCommitEntry(
new Path(item.get(ATTR_TABLE_PATH).getS()),
item.get(ATTR_FILE_NAME).getS(),
item.get(ATTR_TEMP_PATH).getS(),
item.get(ATTR_COMPLETE).getS().equals("true"),
expireTimeAttr != null ? Long.parseLong(expireTimeAttr.getN()) : null,
// items without these attributes map to null; callers supply defaults
fileSizeAttr != null ? Long.parseLong(fileSizeAttr.getN()) : null,
modiTimeAttr != null ? Long.parseLong(modiTimeAttr.getN()) : null
);
}
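
For reference, a hedged round-trip sketch of the mapping (attribute names come from the constants above; the values are made up):

    Map<String, AttributeValue> item = new HashMap<>();
    item.put("tablePath", new AttributeValue("s3a://bucket/table/_delta_log"));
    item.put("fileName", new AttributeValue("00000000000000000001.json"));
    item.put("tempPath", new AttributeValue("temp/00000000000000000001.json"));
    item.put("complete", new AttributeValue().withS("true"));
    item.put("fileSize", new AttributeValue().withN("1024"));
    item.put("modificationTime", new AttributeValue().withN("1700000000000"));
    // expireTime is absent here, so it maps to null; the new fileSize and
    // modificationTime attributes are parsed from their numeric fields.
    ExternalCommitEntry entry = dbResultToCommitEntry(item);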
@@ -236,6 +331,11 @@ private PutItemRequest createPutItemRequest(ExternalCommitEntry entry, boolean o
attributes.put(ATTR_TABLE_PATH, new AttributeValue(entry.tablePath.toString()));
attributes.put(ATTR_FILE_NAME, new AttributeValue(entry.fileName));
attributes.put(ATTR_TEMP_PATH, new AttributeValue(entry.tempPath));
attributes.put(ATTR_FILE_SIZE, new AttributeValue().withN(
String.valueOf(entry.fileSize != null ? entry.fileSize : 0L)));
attributes.put(ATTR_MODI_TIME, new AttributeValue().withN(
String.valueOf(entry.modificationTime != null ? entry.modificationTime : System.currentTimeMillis())
));
attributes.put(
ATTR_COMPLETE,
new AttributeValue().withS(Boolean.toString(entry.complete))
