diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index bdc25dcf0fe5..6a0c67fd4a82 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -295,6 +295,9 @@ def read(self, num_bytes: Optional[int] = None) -> bytes: if not self._decompressor: raise ValueError('decompressor not initialized') + if num_bytes is None: + num_bytes = DEFAULT_READ_BUFFER_SIZE + self._fetch_to_internal_buffer(num_bytes) return self._read_from_internal_buffer( lambda: self._read_buffer.read(num_bytes)) diff --git a/sdks/python/apache_beam/io/filesystem_test.py b/sdks/python/apache_beam/io/filesystem_test.py index a4d456a366da..ff53648d692a 100644 --- a/sdks/python/apache_beam/io/filesystem_test.py +++ b/sdks/python/apache_beam/io/filesystem_test.py @@ -470,6 +470,21 @@ def test_read_and_seek_back_to_beginning(self): self.assertEqual(first_pass, second_pass) + def test_read(self): + for compression_type in [CompressionTypes.BZIP2, + CompressionTypes.DEFLATE, + CompressionTypes.GZIP, + CompressionTypes.ZSTD, + CompressionTypes.LZMA]: + file_name = self._create_compressed_file(compression_type, self.content) + with open(file_name, 'rb') as f: + compressed_fd = CompressedFile( + f, compression_type, read_size=self.read_block_size) + + data = compressed_fd.read() + + self.assertEqual(data, self.content) + def test_tell(self): lines = [b'line%d\n' % i for i in range(10)] tmpfile = self._create_temp_file()