1
1
import lzma
2
2
3
- from sc_compression .signatures import Signatures , get_signature
3
+ from sc_compression .exceptions import UnknownFileMagicException
4
+ from sc_compression .signatures import Signatures , get_signature , MAGIC_SC , MAGIC_ZSTD , MAGIC_SCLZ
4
5
from sc_compression .utils .reader import Reader
5
6
6
7
try :
@@ -34,24 +35,29 @@ def decompress(self, buffer: bytes) -> bytes:
34
35
if self .signature == Signatures .SC :
35
36
reader = Reader (buffer , "big" )
36
37
37
- reader .read (2 )
38
+ self .check_magic (MAGIC_SC , reader = reader )
39
+
38
40
self .file_version = reader .read_int32 ()
39
41
if self .file_version == 4 :
40
42
self .file_version = reader .read_int32 ()
41
- elif self .file_version == 0x05000000 :
43
+
44
+ if self .file_version == 0x05000000 :
42
45
reader .endian = "little"
43
46
44
47
metadata_table_offset = reader .read_int32 () # offset to metadata vtable
45
48
reader .read (metadata_table_offset ) # metadata
46
49
else :
47
50
hash_length = reader .read_int32 ()
48
51
self .hash = reader .read (hash_length )
52
+
49
53
return self .decompress (buffer [reader .tell () :])
50
54
elif self .signature == Signatures .SIG :
51
55
return self .decompress (buffer [68 :])
52
56
elif self .signature == Signatures .SCLZ and lzham is not None :
53
57
reader = Reader (buffer , "little" )
54
- reader .read (4 )
58
+
59
+ self .check_magic (MAGIC_SCLZ , reader = reader )
60
+
55
61
dict_size_log2 = reader .read_u_int8 ()
56
62
uncompressed_size = reader .read_int32 ()
57
63
@@ -64,9 +70,21 @@ def decompress(self, buffer: bytes) -> bytes:
64
70
compressed = buffer [:5 ] + b"\xff " * 8 + buffer [9 :]
65
71
return decompressor .decompress (compressed )
66
72
elif self .signature == Signatures .ZSTD and zstandard is not None :
67
- # the decompress.decompress() API errors with zstd.ZstdError: could not
68
- # determine content size in frame header
73
+ self . check_magic ( MAGIC_ZSTD , buffer = buffer )
74
+
69
75
decompressor = zstandard .ZstdDecompressor ()
70
76
return decompressor .decompress (buffer )
71
77
else :
72
78
raise TypeError (self .signature )
79
+
80
+ @staticmethod
81
+ def check_magic (expected_magic : bytes , reader : Reader | None = None , buffer : bytes | None = None ) -> None :
82
+ if reader is not None :
83
+ magic = reader .read (len (expected_magic ))
84
+ elif buffer is not None :
85
+ magic = buffer [:len (expected_magic )]
86
+ else :
87
+ raise Exception ("Cannot find buffer to get magic from." )
88
+
89
+ if magic != expected_magic :
90
+ raise UnknownFileMagicException (magic , magic , expected_magic )
0 commit comments