Skip to content

Commit 542757a

Browse files
authored
Changes to avoid breaking in-flight queries during avro2orc conversion (#2002)
1 parent 0a093ce commit 542757a

File tree

2 files changed

+201
-13
lines changed

2 files changed

+201
-13
lines changed

gobblin-data-management/src/main/java/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java

+73-13
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package gobblin.data.management.conversion.hive.converter;
1818

1919
import java.io.IOException;
20+
import java.util.Arrays;
2021
import java.util.Collections;
2122
import java.util.HashMap;
2223
import java.util.LinkedHashMap;
@@ -39,6 +40,7 @@
3940
import org.apache.hadoop.hive.metastore.api.FieldSchema;
4041
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
4142
import org.apache.hadoop.hive.metastore.api.Table;
43+
import org.apache.hadoop.hive.ql.metadata.HiveException;
4244
import org.apache.hadoop.hive.ql.metadata.Partition;
4345
import org.apache.thrift.TException;
4446

@@ -135,6 +137,14 @@ public String getConfigPrefix() {
135137
public static final String HIVE_DATASET_DESTINATION_SKIP_SETGROUP = "hive.dataset.destination.skip.setGroup";
136138
public static final boolean DEFAULT_HIVE_DATASET_DESTINATION_SKIP_SETGROUP = false;
137139

140+
/**
141+
* If the property is set to true then partition dir is overwritten,
142+
* else a new time-stamped partition dir is created to avoid breaking in-flight queries
143+
* Check gobblin.data.management.retention.Avro2OrcStaleDatasetCleaner to clean stale directories
144+
*/
145+
public static final String HIVE_DATASET_PARTITION_OVERWRITE = "hive.dataset.partition.overwrite";
146+
public static final boolean DEFAULT_HIVE_DATASET_PARTITION_OVERWRITE = true;
147+
138148
/**
139149
* If set to true, a set format DDL will be separate from add partition DDL
140150
*/
@@ -459,31 +469,35 @@ public Iterable<QueryBasedHiveConversionEntity> convertRecord(Schema outputAvroS
459469
// Step:
460470
// A.2.3, B.2.3: If partitioned table, move partitions from staging to final table; for all partitions:
461471

462-
// Step:
463-
// A.2.3.1, B.2.3.1: Drop if exists partition in final table
464-
List<String> dropPartitionsDDL =
465-
HiveAvroORCQueryGenerator.generateDropPartitionsDDL(orcTableDatabase,
466-
orcTableName,
467-
partitionsDMLInfo);
468-
log.debug("Drop partitions if exist in final table: " + dropPartitionsDDL);
469-
publishQueries.addAll(dropPartitionsDDL);
470-
471472
// Step:
472473
// A.2.3.2, B.2.3.2: Move partition directory
473474
// Move: orcStagingDataPartitionLocation to: orcFinalDataPartitionLocation
474475
String orcFinalDataPartitionLocation = orcDataLocation + Path.SEPARATOR + orcStagingDataPartitionDirName;
475-
log.info("Partition directory to move: " + orcStagingDataPartitionLocation + " to: " + orcFinalDataPartitionLocation);
476+
Optional<Path> destPartitionLocation = getDestinationPartitionLocation(destinationTableMeta, workUnit,
477+
conversionEntity.getHivePartition().get().getName());
478+
orcFinalDataPartitionLocation =
479+
updatePartitionLocation(orcFinalDataPartitionLocation, workUnit, destPartitionLocation);
480+
log.info(
481+
"Partition directory to move: " + orcStagingDataPartitionLocation + " to: " + orcFinalDataPartitionLocation);
476482
publishDirectories.put(orcStagingDataPartitionLocation, orcFinalDataPartitionLocation);
483+
// Step:
484+
// A.2.3.1, B.2.3.1: Drop if exists partition in final table
477485

478486
// Step:
487+
// If destination partition already exists, alter the partition location
479488
// A.2.3.3, B.2.3.3: Create partition with location (and update storage format if not in ORC already)
480-
String orcDataPartitionLocation = orcDataLocation + Path.SEPARATOR + orcStagingDataPartitionDirName;
489+
List<String> dropPartitionsDDL =
490+
HiveAvroORCQueryGenerator.generateDropPartitionsDDL(orcTableDatabase,
491+
orcTableName,
492+
partitionsDMLInfo);
493+
log.debug("Drop partitions if exist in final table: " + dropPartitionsDDL);
494+
publishQueries.addAll(dropPartitionsDDL);
481495
if (workUnit.getPropAsBoolean(HIVE_CONVERSION_SETSERDETOAVROEXPLICITELY,
482496
DEFAULT_HIVE_CONVERSION_SETSERDETOAVROEXPLICITELY)) {
483497
List<String> createFinalPartitionDDL =
484498
HiveAvroORCQueryGenerator.generateCreatePartitionDDL(orcTableDatabase,
485499
orcTableName,
486-
orcDataPartitionLocation,
500+
orcFinalDataPartitionLocation,
487501
partitionsDMLInfo,
488502
Optional.<String>absent());
489503

@@ -503,7 +517,7 @@ public Iterable<QueryBasedHiveConversionEntity> convertRecord(Schema outputAvroS
503517
List<String> createFinalPartitionDDL =
504518
HiveAvroORCQueryGenerator.generateCreatePartitionDDL(orcTableDatabase,
505519
orcTableName,
506-
orcDataPartitionLocation,
520+
orcFinalDataPartitionLocation,
507521
partitionsDMLInfo,
508522
Optional.fromNullable(ORC_FORMAT));
509523

@@ -747,4 +761,50 @@ private Pair<Optional<Table>, Optional<List<Partition>>> getDestinationTableMeta
747761

748762
return ImmutablePair.of(table, partitions);
749763
}
764+
765+
/**
766+
* If partition already exists then new partition location will be a separate time stamp dir
767+
* If partition location is /a/b/c/<oldTimeStamp> then new partition location is /a/b/c/<currentTimeStamp>
768+
* If partition location is /a/b/c/ then new partition location is /a/b/c/<currentTimeStamp>
769+
**/
770+
private String updatePartitionLocation(String orcDataPartitionLocation, WorkUnitState workUnitState,
771+
Optional<Path> destPartitionLocation)
772+
throws DataConversionException {
773+
774+
if (workUnitState.getPropAsBoolean(HIVE_DATASET_PARTITION_OVERWRITE, DEFAULT_HIVE_DATASET_PARTITION_OVERWRITE)) {
775+
return orcDataPartitionLocation;
776+
}
777+
if (!destPartitionLocation.isPresent()) {
778+
return orcDataPartitionLocation;
779+
}
780+
long timeStamp = System.currentTimeMillis();
781+
return StringUtils.join(Arrays.asList(orcDataPartitionLocation, timeStamp), '/');
782+
}
783+
784+
/**
 * Looks up the current data location of {@code partitionName} in the destination (ORC) table
 * via the Hive metastore.
 *
 * @param table destination table metadata, if the destination table already exists
 * @param state work unit state; its job properties locate the metastore (HIVE_METASTORE_URI_KEY)
 * @param partitionName name of the partition to look up in the destination table
 * @return the partition's data location, or absent when the table is absent or no location was resolved
 * @throws DataConversionException if fetching the partition metadata fails
 */
private Optional<Path> getDestinationPartitionLocation(Optional<Table> table, WorkUnitState state,
    String partitionName)
    throws DataConversionException {
  Optional<org.apache.hadoop.hive.metastore.api.Partition> partitionOptional =
      Optional.<org.apache.hadoop.hive.metastore.api.Partition>absent();
  // No destination table yet: there can be no existing partition to relocate around.
  if (!table.isPresent()) {
    return Optional.<Path>absent();
  }
  try {
    HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(state.getJobState().getProperties(),
        Optional.fromNullable(state.getJobState().getProp(HiveDatasetFinder.HIVE_METASTORE_URI_KEY)));
    // Borrow a metastore client only for the duration of the lookup; the
    // try-with-resources returns it to the pool.
    try (AutoReturnableObject<IMetaStoreClient> client = pool.getClient()) {
      partitionOptional =
          Optional.of(client.get().getPartition(table.get().getDbName(), table.get().getTableName(), partitionName));
    }
    if (partitionOptional.isPresent()) {
      // Wrap the thrift-level Table/Partition in ql-level objects to resolve the data location.
      org.apache.hadoop.hive.ql.metadata.Table qlTable = new org.apache.hadoop.hive.ql.metadata.Table(table.get());
      org.apache.hadoop.hive.ql.metadata.Partition qlPartition =
          new org.apache.hadoop.hive.ql.metadata.Partition(qlTable, partitionOptional.get());
      return Optional.of(qlPartition.getDataLocation());
    }
  } catch (IOException | TException | HiveException e) {
    // NOTE(review): a partition that does not exist typically surfaces as
    // NoSuchObjectException, which is a TException and therefore lands here as a
    // DataConversionException rather than returning absent — confirm that is the
    // intended behavior when a partition is published for the first time.
    throw new DataConversionException("Could not fetch destination table metadata", e);
  }
  return Optional.<Path>absent();
}
750810
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package gobblin.data.management.retention;
2+
3+
import java.io.IOException;
4+
import java.util.Arrays;
5+
import java.util.HashSet;
6+
import java.util.Iterator;
7+
import java.util.Properties;
8+
import java.util.Set;
9+
import java.util.concurrent.TimeUnit;
10+
11+
import org.apache.hadoop.conf.Configuration;
12+
import org.apache.hadoop.fs.FileStatus;
13+
import org.apache.hadoop.fs.FileSystem;
14+
import org.apache.hadoop.fs.Path;
15+
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
16+
import org.apache.hadoop.hive.ql.metadata.Partition;
17+
import org.apache.log4j.Logger;
18+
19+
import com.google.common.base.Optional;
20+
import com.typesafe.config.Config;
21+
import com.typesafe.config.ConfigFactory;
22+
23+
import azkaban.jobExecutor.AbstractJob;
24+
25+
import gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
26+
import gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDatasetFinder;
27+
import gobblin.data.management.conversion.hive.events.EventConstants;
28+
import gobblin.data.management.conversion.hive.validation.ValidationJob;
29+
import gobblin.data.management.copy.hive.HiveDataset;
30+
import gobblin.data.management.copy.hive.HiveDatasetFinder;
31+
import gobblin.data.management.copy.hive.HiveUtils;
32+
import gobblin.instrumented.Instrumented;
33+
import gobblin.metrics.MetricContext;
34+
import gobblin.metrics.event.EventSubmitter;
35+
import gobblin.util.AutoReturnableObject;
36+
import gobblin.util.ConfigUtils;
37+
38+
39+
public class Avro2OrcStaleDatasetCleaner extends AbstractJob {
40+
private static final Logger log = Logger.getLogger(ValidationJob.class);
41+
private static final String HIVE_PARTITION_DELETION_GRACE_TIME_IN_DAYS = "hive.partition.deletion.graceTime.inDays";
42+
private static final String DEFAULT_HIVE_PARTITION_DELETION_GRACE_TIME_IN_DAYS = "2";
43+
private final MetricContext metricContext;
44+
private final EventSubmitter eventSubmitter;
45+
private final ConvertibleHiveDatasetFinder datasetFinder;
46+
private static final String HIVE_DATASET_CONFIG_AVRO_PREFIX = "hive.conversion.avro";
47+
private final FileSystem fs;
48+
private final long graceTimeInMillis;
49+
50+
/**
 * Creates the stale-dataset cleaner job.
 *
 * @param jobId Azkaban job id, passed to {@code AbstractJob}
 * @param props job properties; mutated to scope dataset discovery to the avro conversion prefix
 * @throws IOException if the file system cannot be instantiated
 */
public Avro2OrcStaleDatasetCleaner(String jobId, Properties props)
    throws IOException {
  super(jobId, log);
  // Scope the dataset finder to the avro-conversion config subtree.
  props.setProperty(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY, HIVE_DATASET_CONFIG_AVRO_PREFIX);
  // Directories younger than this are never deleted (default: 2 days).
  this.graceTimeInMillis = TimeUnit.DAYS.toMillis(Long.parseLong(props
      .getProperty(HIVE_PARTITION_DELETION_GRACE_TIME_IN_DAYS, DEFAULT_HIVE_PARTITION_DELETION_GRACE_TIME_IN_DAYS)));
  Config config = ConfigFactory.parseProperties(props);
  // Uses the default Hadoop configuration from the classpath.
  this.fs = FileSystem.newInstance(new Configuration());
  // NOTE(review): the metric context is registered under ValidationJob.class, not this
  // class — looks copied from ValidationJob; confirm whether that attribution is intended.
  this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), ValidationJob.class);
  this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, EventConstants.CONVERSION_NAMESPACE).build();
  this.datasetFinder = new ConvertibleHiveDatasetFinder(this.fs, props, this.eventSubmitter);
}
62+
63+
@Override
public void run()
    throws Exception {
  // For every convertible hive dataset: find partitions whose data directory name is a
  // millisecond timestamp (i.e. partitions published in non-overwrite mode by
  // AbstractAvroToOrcConverter), then delete the *sibling* timestamp directories that are
  // no longer the partition's current location and are past the grace period.
  Iterator<HiveDataset> iterator = this.datasetFinder.getDatasetsIterator();
  while (iterator.hasNext()) {
    ConvertibleHiveDataset hiveDataset = (ConvertibleHiveDataset) iterator.next();
    try (AutoReturnableObject<IMetaStoreClient> client = hiveDataset.getClientPool().getClient()) {
      Set<Partition> sourcePartitions =
          new HashSet<>(HiveUtils.getPartitions(client.get(), hiveDataset.getTable(), Optional.<String>absent()));

      // Only partitions living in a time-stamped directory participate.
      sourcePartitions.parallelStream().filter(partition -> isUnixTimeStamp(partition.getDataLocation().getName()))
          .forEach(partition -> {
            // Candidates: everything under the partition's parent dir except the
            // directory the partition currently points at.
            Arrays.stream(listFiles(partition.getDataLocation().getParent())).filter(
                fileStatus -> !fileStatus.getPath().toString()
                    .equalsIgnoreCase(partition.getDataLocation().toString())).forEach(fileStatus -> {
              // Recursive delete, guarded by the configured grace time.
              deletePath(fileStatus, this.graceTimeInMillis, true);
            });
          });
    }
  }
}
84+
85+
private FileStatus[] listFiles(Path path) {
86+
try {
87+
return this.fs.listStatus(path);
88+
} catch (IOException e) {
89+
log.error("Unalbe to list files for directory " + path, e);
90+
return new FileStatus[0];
91+
}
92+
}
93+
94+
private void deletePath(FileStatus fileStatus, long graceTimeInMillis, boolean recursively) {
95+
long modificationTime = fileStatus.getModificationTime();
96+
long currentTime = System.currentTimeMillis();
97+
if ((currentTime - modificationTime) < 0) {
98+
log.error("Modification time cannot be greater than current time: " + fileStatus.getPath());
99+
return;
100+
}
101+
if ((currentTime - modificationTime) < graceTimeInMillis) {
102+
log.info("Modification time is still within grace time for deletion: " + fileStatus.getPath());
103+
return;
104+
}
105+
try {
106+
this.fs.delete(fileStatus.getPath(), recursively);
107+
log.info("Deleted path " + fileStatus.getPath());
108+
} catch (IOException e) {
109+
log.error("Unable to delete directory " + fileStatus.getPath(), e);
110+
}
111+
}
112+
113+
/**
114+
* Check if a given string is a valid unixTimeStamp
115+
*/
116+
private static boolean isUnixTimeStamp(String timeStamp) {
117+
int TIME_STAMP_LENGTH = 13;
118+
if (timeStamp.length() != TIME_STAMP_LENGTH) {
119+
return false;
120+
}
121+
try {
122+
Long.parseLong(timeStamp);
123+
return true;
124+
} catch (NumberFormatException e) {
125+
return false;
126+
}
127+
}
128+
}

0 commit comments

Comments
 (0)