29
29
urllib_error = attempt_import ('urllib.error' )[0 ]
30
30
ssl = attempt_import ('ssl' )[0 ]
31
31
zipfile = attempt_import ('zipfile' )[0 ]
32
+ tarfile = attempt_import ('tarfile' )[0 ]
32
33
gzip = attempt_import ('gzip' )[0 ]
33
34
distro , distro_available = attempt_import ('distro' )
34
35
@@ -371,7 +372,7 @@ def get_zip_archive(self, url, dirOffset=0):
371
372
# Simple sanity checks
372
373
for info in zip_file .infolist ():
373
374
f = info .filename
374
- if f [0 ] in '\\ /' or '..' in f :
375
+ if f [0 ] in '\\ /' or '..' in f or os . path . isabs ( f ) :
375
376
logger .error (
376
377
"malformed (potentially insecure) filename (%s) "
377
378
"found in zip archive. Skipping file." % (f ,)
@@ -387,6 +388,61 @@ def get_zip_archive(self, url, dirOffset=0):
387
388
info .filename = target [- 1 ] + '/' if f [- 1 ] == '/' else target [- 1 ]
388
389
zip_file .extract (f , os .path .join (self ._fname , * tuple (target [dirOffset :- 1 ])))
389
390
391
def get_tar_archive(self, url, dirOffset=0):
    """Download the tar archive at ``url`` and unpack it into the target directory.

    The archive is fetched with ``self.retrieve_url(url)`` and opened from
    memory.  Every member is screened before extraction; a member that
    fails a check is logged and skipped (no exception is raised for it).

    Parameters
    ----------
    url:
        Location of the archive; passed unchanged to ``self.retrieve_url``.
    dirOffset:
        Number of leading path components to strip from each member name.
        Members that have no components left after stripping are skipped.

    Raises
    ------
    DeveloperError
        If the destination (``self._fname``) has not been set.
    RuntimeError
        If the destination exists but is not a directory.
    """
    if self._fname is None:
        raise DeveloperError(
            "target file name has not been initialized "
            "with set_destination_filename"
        )
    if os.path.exists(self._fname) and not os.path.isdir(self._fname):
        raise RuntimeError(
            "Target directory (%s) exists, but is not a directory" % (self._fname,)
        )

    # Resolve the destination once, before the filter closure that reads
    # it is defined, so the closure never depends on a later assignment.
    dest = os.path.realpath(self._fname)

    def is_inside_dest(path):
        # commonpath() raises ValueError for paths that have nothing in
        # common (notably, files on different drives on Windows); such a
        # path is by definition not inside the destination.
        try:
            return os.path.commonpath([path, dest]) == dest
        except ValueError:
            return False

    def filter_fcn(info):
        # This mocks up the `tarfile` extraction filter introduced in
        # Python 3.12 and backported to later releases of Python (e.g.,
        # 3.8.17, 3.9.17, 3.10.12, and 3.11.4)
        f = info.name
        if os.path.isabs(f) or '..' in f or f.startswith(('/', os.sep)):
            logger.error(
                "malformed or potentially insecure filename (%s). "
                "Skipping file." % (f,)
            )
            return False
        target = self._splitpath(f)
        if len(target) <= dirOffset:
            if not info.isdir():
                logger.warning(
                    "Skipping file (%s) in tar archive due to dirOffset." % (f,)
                )
            return False
        info.name = f = '/'.join(target[dirOffset:])
        # realpath() follows any symlinks already present on disk, so a
        # member routed through an existing link is judged by where it
        # would really be written.
        if not is_inside_dest(os.path.realpath(os.path.join(dest, f))):
            logger.error(
                "potentially insecure filename (%s) resolves outside target "
                "directory. Skipping file." % (f,)
            )
            return False
        # Device nodes and FIFOs have no place in a downloaded archive.
        if info.isdev():
            logger.error(
                "special file (%s) found in tar archive. Skipping file." % (f,)
            )
            return False
        # A link is only as safe as what it points at: a symlink target is
        # interpreted relative to the directory holding the link, a hard
        # link target relative to the extraction root.  An absolute
        # linkname replaces the base in os.path.join() and is rejected by
        # the containment test.
        if info.issym() or info.islnk():
            if info.issym():
                link_base = os.path.realpath(
                    os.path.join(dest, os.path.dirname(f))
                )
            else:
                link_base = dest
            link_target = os.path.realpath(os.path.join(link_base, info.linkname))
            if not is_inside_dest(link_target):
                logger.error(
                    "potentially insecure link (%s -> %s) resolves outside "
                    "target directory. Skipping file." % (f, info.linkname)
                )
                return False
        # Strip high bits & group/other write bits
        info.mode &= 0o755
        return True

    with tarfile.open(fileobj=io.BytesIO(self.retrieve_url(url))) as TAR:
        # filter() is lazy; NOTE(review): this presumably lets each member
        # be checked after earlier members were written -- confirm against
        # the tarfile implementation in use.
        TAR.extractall(dest, filter(filter_fcn, TAR.getmembers()))
445
+
390
446
def get_gzipped_binary_file (self , url ):
391
447
if self ._fname is None :
392
448
raise DeveloperError (
0 commit comments