Skip to content

Commit d2c58b7

Browse files
committed
Handle duplicate tars
1 parent 0761d4b commit d2c58b7

File tree

2 files changed

+92
-25
lines changed

2 files changed

+92
-25
lines changed

zstash/extract.py

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import tarfile
1414
import traceback
1515
from datetime import datetime
16-
from typing import DefaultDict, List, Optional, Tuple
16+
from typing import DefaultDict, List, Optional, Set, Tuple
1717

1818
import _hashlib
1919
import _io
@@ -390,25 +390,75 @@ def multiprocess_extract(
390390

391391

392392
def check_sizes_match(cur, tfname):
393-
match: bool
394393
if cur and tars_table_exists(cur):
395394
logger.info(f"{tfname} exists. Checking expected size matches actual size.")
396-
actual_size = os.path.getsize(tfname)
397-
name_only = os.path.split(tfname)[1]
398-
cur.execute(f"select size from tars where name is '{name_only}';")
399-
expected_size: int = cur.fetchall()[0][0]
395+
actual_size: int = os.path.getsize(tfname)
396+
name_only: str = os.path.split(tfname)[1]
397+
398+
# Get ALL entries for this tar name
399+
cur.execute("SELECT size FROM tars WHERE name = ?", (name_only,))
400+
results = cur.fetchall()
401+
402+
if not results:
403+
# No entries in the tars table for this tar; log the anomaly and assume the sizes match.
404+
logger.error(f"No database entries found for {name_only}")
405+
return True
406+
407+
# Check for multiple entries
408+
if len(results) > 1:
409+
# Extract just the size values
410+
sizes: List[int] = [row[0] for row in results]
411+
unique_sizes: Set[int] = set(sizes)
412+
413+
logger.error(
414+
f"{name_only}: Found {len(results)} database entries for this tar"
415+
)
416+
logger.error(f"{name_only}: Database sizes: {sizes}")
417+
418+
if len(unique_sizes) > 1:
419+
# Multiple entries with different sizes.
420+
for unique_size in unique_sizes:
421+
if unique_size == actual_size:
422+
logger.info(
423+
f"{name_only}: There exists at least one entry with the same size as the actual file size: {unique_size}. "
424+
)
425+
break
426+
error_msg = (
427+
f"{name_only}: Database corruption detected! "
428+
f"Found {len(results)} entries with {len(unique_sizes)} different sizes: {list(unique_sizes)}. "
429+
f"Actual file size: {actual_size}. "
430+
f"Database cleanup required before proceeding."
431+
)
432+
logger.error(error_msg)
433+
raise ValueError(error_msg)
434+
else:
435+
# Multiple entries, but they all have the same size
436+
logger.warning(
437+
f"{name_only}: Found {len(results)} duplicate database entries, "
438+
f"but all have the same size ({sizes[0]}). Consider cleaning up duplicates."
439+
)
440+
expected_size = sizes[0]
441+
else:
442+
# Single entry - normal case
443+
expected_size = results[0][0]
444+
445+
# Now check if actual size matches expected size
400446
if expected_size != actual_size:
401-
logger.info(
402-
f"{name_only}: expected size={expected_size} != {actual_size}=actual_size"
447+
error_msg = (
448+
f"{name_only}: Size mismatch! "
449+
f"Expected={expected_size} != {actual_size}=actual. "
450+
f"Difference={actual_size - expected_size}."
403451
)
404-
match = False
452+
logger.error(error_msg)
453+
return False
405454
else:
406455
# Sizes match
407-
match = True
456+
logger.info(f"{name_only}: Size check passed ({actual_size} bytes)")
457+
return True
408458
else:
409459
# Cannot access size information; assume the sizes match.
410-
match = True
411-
return match
460+
logger.debug("Cannot access tar size information; assuming sizes match")
461+
return True
412462

413463

414464
# FIXME: C901 'extractFiles' is too complex (33)

zstash/hpss_utils.py

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -144,25 +144,16 @@ def add_files(
144144
tar_md5: Optional[str] = tarFileObject.md5()
145145
tarFileObject.close()
146146
logger.info(f"{ts_utc()}: (add_files): Completed archive file {tfname}")
147-
if not skip_tars_md5:
148-
tar_tuple: TupleTarsRowNoId = (tfname, tarsize, tar_md5)
149-
logger.info("tar name={}, tar size={}, tar md5={}".format(*tar_tuple))
150-
if not tars_table_exists(cur):
151-
# Need to create tars table
152-
create_tars_table(cur, con)
153-
cur.execute("insert into tars values (NULL,?,?,?)", tar_tuple)
154-
con.commit()
155147

156148
# Transfer tar to HPSS
157149
if config.hpss is not None:
158150
hpss: str = config.hpss
159151
else:
160152
raise TypeError("Invalid config.hpss={}".format(config.hpss))
161153

162-
# NOTE: These lines could be added under an "if debug" condition
163-
# logger.info(f"{ts_utc()}: CONTENTS of CACHE upon call to hpss_put:")
164-
# process = subprocess.run(["ls", "-l", "zstash"], capture_output=True, text=True)
165-
# print(process.stdout)
154+
logger.info(
155+
f"Contents of the cache prior to `hpss_put`: {os.listdir(os.path.join(cache, tfname))}"
156+
)
166157

167158
logger.info(
168159
f"{ts_utc()}: DIVING: (add_files): Calling hpss_put to dispatch archive file {tfname} [keep, non_blocking] = [{keep}, {non_blocking}]"
@@ -172,7 +163,33 @@ def add_files(
172163
f"{ts_utc()}: SURFACE (add_files): Called hpss_put to dispatch archive file {tfname}"
173164
)
174165

175-
# Update database with files that have been archived
166+
if not skip_tars_md5:
167+
tar_tuple: TupleTarsRowNoId = (tfname, tarsize, tar_md5)
168+
logger.info("tar name={}, tar size={}, tar md5={}".format(*tar_tuple))
169+
if not tars_table_exists(cur):
170+
# Need to create tars table
171+
create_tars_table(cur, con)
172+
# We're done adding files to the tar.
173+
# And we've transferred it to HPSS.
174+
# Now we can insert the tar into the database.
175+
cur.execute("SELECT COUNT(*) FROM tars WHERE name = ?", (tfname,))
176+
if cur.fetchone()[0] == 0:
177+
# Typical case
178+
# Doesn't exist - insert new
179+
cur.execute("INSERT INTO tars VALUES (NULL,?,?,?)", tar_tuple)
180+
else:
181+
# Unusual case
182+
# Exists - update with new size and md5
183+
logger.warning(
184+
f"Possible database corruption. Updated existing tar {tfname} with new size {tarsize}"
185+
)
186+
cur.execute(
187+
"UPDATE tars SET size = ?, md5 = ? WHERE name = ?",
188+
(tarsize, tar_md5, tfname),
189+
)
190+
con.commit()
191+
192+
# Update database with the individual files that have been archived
176193
# Add a row to the "files" table,
177194
# the last 6 columns matching the values of `archived`
178195
cur.executemany("insert into files values (NULL,?,?,?,?,?,?)", archived)

0 commit comments

Comments (0)