Skip to content

Commit 6388894

Browse files
authored
refactor: Refactor Parquet reader to avoid loading entire file in memory at once (#184)
* refactor: Refactor Parquet reader to avoid whole-file loading in memory. Read Parquet metadata from the footer and fetch column chunk bytes by seek instead of loading the entire file into memory up front. This keeps the current page decoding path intact while reducing peak memory usage for normal file reads, ensuring that only the column chunks needed are loaded into memory. Column chunks are read one at a time, so extra memory is bounded by the size of a single column chunk. This is also the first step towards a streaming reader. * Use Map.Strict to reduce the possibility of unevaluated thunk leaks * Remove forceNonSeekable from ReaderOpts, use functions + partial application to inject testing behavior while preserving readParquetWithOpts API
1 parent 727c579 commit 6388894

File tree

6 files changed

+216
-26
lines changed

6 files changed

+216
-26
lines changed

dataframe.cabal

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ library
9191
DataFrame.IO.Parquet.Compression,
9292
DataFrame.IO.Parquet.Encoding,
9393
DataFrame.IO.Parquet.Page,
94+
DataFrame.IO.Parquet.Seeking,
9495
DataFrame.IO.Parquet.Time,
9596
DataFrame.IO.Parquet.Types,
9697
DataFrame.Lazy.IO.CSV,
@@ -143,6 +144,8 @@ library
143144
stm >= 2.5 && < 3,
144145
filepath >= 1.4 && < 2,
145146
Glob >= 0.10 && < 1,
147+
streamly-core,
148+
streamly-bytestring,
146149

147150
hs-source-dirs: src
148151
c-sources: cbits/process_csv.c

src/DataFrame/IO/Parquet.hs

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import Data.Either
1313
import Data.IORef
1414
import Data.Int
1515
import qualified Data.List as L
16-
import qualified Data.Map as M
16+
import qualified Data.Map.Strict as M
1717
import qualified Data.Set as S
1818
import qualified Data.Text as T
1919
import Data.Text.Encoding
@@ -37,7 +37,9 @@ import DataFrame.IO.Parquet.Types
3737
import System.Directory (doesDirectoryExist)
3838

3939
import qualified Data.Vector.Unboxed as VU
40+
import DataFrame.IO.Parquet.Seeking
4041
import System.FilePath ((</>))
42+
import System.IO (IOMode (ReadMode))
4143

4244
-- Options -----------------------------------------------------------------
4345

@@ -130,8 +132,13 @@ cleanColPath nodes path = go nodes path False
130132
p : go (sChildren n) ps False
131133

132134
readParquetWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame
133-
readParquetWithOpts opts path = do
134-
(fileMetadata, contents) <- readMetadataFromPath path
135+
readParquetWithOpts = _readParquetWithOpts Nothing
136+
137+
-- | Internal function to pass testing parameters
138+
_readParquetWithOpts ::
139+
ForceNonSeekable -> ParquetReadOptions -> FilePath -> IO DataFrame
140+
_readParquetWithOpts extraConfig opts path = withFileBufferedOrSeekable extraConfig path ReadMode $ \file -> do
141+
fileMetadata <- readMetadataFromHandle file
135142
let columnPaths = getColumnPaths (drop 1 $ schema fileMetadata)
136143
let columnNames = map fst columnPaths
137144
let leafNames = map (last . T.splitOn ".") columnNames
@@ -204,7 +211,11 @@ readParquetWithOpts opts path = do
204211
else colDataPageOffset
205212
let colLength = columnTotalCompressedSize metadata
206213

207-
let columnBytes = BSO.take (fromIntegral colLength) (BSO.drop (fromIntegral colStart) contents)
214+
columnBytes <-
215+
seekAndReadBytes
216+
(Just (AbsoluteSeek, fromIntegral colStart))
217+
(fromIntegral colLength)
218+
file
208219

209220
pages <- readAllPages (columnCodec metadata) columnBytes
210221

@@ -236,13 +247,13 @@ readParquetWithOpts opts path = do
236247
Nothing -> do
237248
mc <- DI.newMutableColumn totalRows column
238249
DI.copyIntoMutableColumn mc 0 column
239-
modifyIORef colMutMap (M.insert colFullName mc)
240-
modifyIORef colOffMap (M.insert colFullName (DI.columnLength column))
250+
modifyIORef' colMutMap (M.insert colFullName mc)
251+
modifyIORef' colOffMap (M.insert colFullName (DI.columnLength column))
241252
Just mc -> do
242253
off <- (M.! colFullName) <$> readIORef colOffMap
243254
DI.copyIntoMutableColumn mc off column
244-
modifyIORef colOffMap (M.adjust (+ DI.columnLength column) colFullName)
245-
modifyIORef lTypeMap (M.insert colFullName lType)
255+
modifyIORef' colOffMap (M.adjust (+ DI.columnLength column) colFullName)
256+
modifyIORef' lTypeMap (M.insert colFullName lType)
246257

247258
finalMutMap <- readIORef colMutMap
248259
finalColMap <-
@@ -321,24 +332,35 @@ applyReadOptions opts =
321332

322333
-- File and metadata parsing -----------------------------------------------
323334

335+
-- | Read the whole file into memory at once, validate the magic string, and return the parsed metadata together with the entire file ByteString
324336
readMetadataFromPath :: FilePath -> IO (FileMetadata, BSO.ByteString)
325337
readMetadataFromPath path = do
326338
contents <- BSO.readFile path
327-
let (size, magicString) = contents `seq` readMetadataSizeFromFooter contents
339+
let (size, magicString) = readMetadataSizeFromFooter contents
328340
when (magicString /= "PAR1") $ error "Invalid Parquet file"
329341
meta <- readMetadata contents size
330342
pure (meta, contents)
331343

332-
readMetadataSizeFromFooter :: BSO.ByteString -> (Int, BSO.ByteString)
333-
readMetadataSizeFromFooter contents =
344+
-- | Read only the footer from the end of the file, validate the magic string, and return the parsed file metadata
345+
readMetadataFromHandle :: FileBufferedOrSeekable -> IO FileMetadata
346+
readMetadataFromHandle sh = do
347+
footerBs <- readLastBytes (fromIntegral footerSize) sh
348+
let (size, magicString) = readMetadataSizeFromFooterSlice footerBs
349+
when (magicString /= "PAR1") $ error "Invalid Parquet file"
350+
readMetadataByHandleMetaSize sh size
351+
352+
-- | Takes the last 8 bytes of the file and parses the metadata size and magic string
353+
readMetadataSizeFromFooterSlice :: BSO.ByteString -> (Int, BSO.ByteString)
354+
readMetadataSizeFromFooterSlice contents =
334355
let
335-
footerOffSet = BSO.length contents - 8
336-
footer = BSO.drop footerOffSet contents
337-
size = fromIntegral (littleEndianWord32 footer)
338-
magicString = BSO.take 4 (BSO.drop 4 footer)
356+
size = fromIntegral (littleEndianWord32 contents)
357+
magicString = BSO.take 4 (BSO.drop 4 contents)
339358
in
340359
(size, magicString)
341360

361+
readMetadataSizeFromFooter :: BSO.ByteString -> (Int, BSO.ByteString)
362+
readMetadataSizeFromFooter = readMetadataSizeFromFooterSlice . BSO.takeEnd 8
363+
342364
-- Schema navigation -------------------------------------------------------
343365

344366
getColumnPaths :: [SchemaElement] -> [(T.Text, Int)]

src/DataFrame/IO/Parquet/Binary.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ readAndAdvance :: IORef Int -> BS.ByteString -> IO Word8
6262
readAndAdvance bufferPos buffer = do
6363
pos <- readIORef bufferPos
6464
let b = BS.index buffer pos
65-
modifyIORef bufferPos (+ 1)
65+
modifyIORef' bufferPos (+ 1)
6666
return b
6767

6868
readVarIntFromBuffer :: (Integral a) => BS.ByteString -> IORef Int -> IO a
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
{- | This module contains low-level utilities around file seeking
2+
3+
potentially also contains all Streamly related low-level utilities.
4+
5+
later this module can be renamed / moved to an internal module.
6+
-}
7+
module DataFrame.IO.Parquet.Seeking (
8+
SeekableHandle (getSeekableHandle),
9+
SeekMode (..),
10+
FileBufferedOrSeekable (..),
11+
ForceNonSeekable,
12+
advanceBytes,
13+
mkFileBufferedOrSeekable,
14+
mkSeekableHandle,
15+
readLastBytes,
16+
seekAndReadBytes,
17+
seekAndStreamBytes,
18+
withFileBufferedOrSeekable,
19+
) where
20+
21+
import Control.Monad
22+
import Control.Monad.IO.Class
23+
import qualified Data.ByteString as BS
24+
import Data.IORef
25+
import Data.Int
26+
import Data.Word
27+
import Streamly.Data.Stream (Stream)
28+
import qualified Streamly.Data.Stream as S
29+
import qualified Streamly.External.ByteString as SBS
30+
import qualified Streamly.FileSystem.Handle as SHandle
31+
import System.IO
32+
33+
{- | This handle carries a proof that it must be seekable.
34+
Note: Handle and SeekableHandle are not thread safe, should not be
35+
shared across threads; be aware of this when running parallel/concurrent code.
36+
37+
Not seekable:
38+
- stdin / stdout
39+
- pipes / FIFOs
40+
41+
But regular files are always seekable. Parquet fundamentally wants random
42+
access; a non-seekable source will not support efficient access without
43+
buffering the entire file.
44+
-}
45+
newtype SeekableHandle = SeekableHandle {getSeekableHandle :: Handle}
46+
47+
{- | If we truly want to support non-seekable files, we need to also consider the case
48+
to buffer the entire file in memory.
49+
50+
Not thread safe, contains mutable reference (as Handle already is).
51+
52+
If we need concurrent / parallel parsing or something, we need to read into ByteString
53+
first, not sharing the same handle.
54+
-}
55+
data FileBufferedOrSeekable
56+
= FileBuffered !(IORef Int64) !BS.ByteString
57+
| FileSeekable !SeekableHandle
58+
59+
-- | Smart constructor for SeekableHandle
60+
mkSeekableHandle :: Handle -> IO (Maybe SeekableHandle)
61+
mkSeekableHandle h = do
62+
seekable <- hIsSeekable h
63+
pure $ if seekable then Just (SeekableHandle h) else Nothing
64+
65+
-- | For testing only
66+
type ForceNonSeekable = Maybe Bool
67+
68+
{- | Smart constructor for FileBufferedOrSeekable; stays in the seekable case
69+
if possible.
70+
-}
71+
mkFileBufferedOrSeekable ::
72+
ForceNonSeekable -> Handle -> IO FileBufferedOrSeekable
73+
mkFileBufferedOrSeekable forceNonSeek h = do
74+
seekable <- hIsSeekable h
75+
if not seekable || forceNonSeek == Just True
76+
then FileBuffered <$> newIORef 0 <*> BS.hGetContents h
77+
else pure $ FileSeekable $ SeekableHandle h
78+
79+
{- | With / bracket pattern for FileBufferedOrSeekable
80+
81+
Warning: do not return the FileBufferedOrSeekable outside the scope of the action as
82+
it will be closed.
83+
-}
84+
withFileBufferedOrSeekable ::
85+
ForceNonSeekable ->
86+
FilePath ->
87+
IOMode ->
88+
(FileBufferedOrSeekable -> IO a) ->
89+
IO a
90+
withFileBufferedOrSeekable forceNonSeek path ioMode action = withFile path ioMode $ \h -> do
91+
fbos <- mkFileBufferedOrSeekable forceNonSeek h
92+
action fbos
93+
94+
-- | Read from the end, useful for reading metadata without loading entire file
95+
readLastBytes :: Integer -> FileBufferedOrSeekable -> IO BS.ByteString
96+
readLastBytes n (FileSeekable sh) = do
97+
let h = getSeekableHandle sh
98+
hSeek h SeekFromEnd (negate n)
99+
S.fold SBS.write (SHandle.read h)
100+
readLastBytes n (FileBuffered i bs) = do
101+
writeIORef i (fromIntegral $ BS.length bs)
102+
when (n > fromIntegral (BS.length bs)) $ error "lastBytes: n > length bs"
103+
pure $ BS.drop (BS.length bs - fromIntegral n) bs
104+
105+
-- | Note: this does not guarantee n bytes (if it ends early)
106+
advanceBytes :: Int -> FileBufferedOrSeekable -> IO BS.ByteString
107+
advanceBytes = seekAndReadBytes Nothing
108+
109+
-- | Note: this does not guarantee n bytes (if it ends early)
110+
seekAndReadBytes ::
111+
Maybe (SeekMode, Integer) -> Int -> FileBufferedOrSeekable -> IO BS.ByteString
112+
seekAndReadBytes mSeek len f = seekAndStreamBytes mSeek len f >>= S.fold SBS.write
113+
114+
{- | Warning: the stream produced by this function accesses the mutable handle.
115+
If multiple streams are pulled from the same handle at the same time, chaos happens.
116+
Make sure there is only one stream running at one time for each SeekableHandle,
117+
and streams are not read again when they are not used anymore.
118+
-}
119+
seekAndStreamBytes ::
120+
(MonadIO m) =>
121+
Maybe (SeekMode, Integer) -> Int -> FileBufferedOrSeekable -> m (Stream m Word8)
122+
seekAndStreamBytes mSeek len f = do
123+
liftIO $
124+
case mSeek of
125+
Nothing -> pure ()
126+
Just (seekMode, seekTo) -> fSeek f seekMode seekTo
127+
pure $ S.take len $ fRead f
128+
129+
fSeek :: FileBufferedOrSeekable -> SeekMode -> Integer -> IO ()
130+
fSeek (FileSeekable (SeekableHandle h)) seekMode seekTo = hSeek h seekMode seekTo
131+
fSeek (FileBuffered i bs) AbsoluteSeek seekTo = writeIORef i (fromIntegral seekTo)
132+
fSeek (FileBuffered i bs) RelativeSeek seekTo = modifyIORef' i (+ fromIntegral seekTo)
133+
fSeek (FileBuffered i bs) SeekFromEnd seekTo = writeIORef i (fromIntegral $ BS.length bs + fromIntegral seekTo)
134+
135+
fRead :: (MonadIO m) => FileBufferedOrSeekable -> Stream m Word8
136+
fRead (FileSeekable (SeekableHandle h)) = SHandle.read h
137+
fRead (FileBuffered i bs) = S.concatEffect $ do
138+
pos <- liftIO $ readIORef i
139+
pure $
140+
S.mapM
141+
( \x -> do
142+
liftIO (modifyIORef' i (+ 1))
143+
pure x
144+
)
145+
(S.unfold SBS.reader (BS.drop (fromIntegral pos) bs))

src/DataFrame/IO/Parquet/Thrift.hs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import qualified Data.Vector as V
2121
import qualified Data.Vector.Unboxed as VU
2222
import Data.Word
2323
import DataFrame.IO.Parquet.Binary
24+
import DataFrame.IO.Parquet.Seeking
2425
import DataFrame.IO.Parquet.Types
2526
import qualified DataFrame.Internal.Column as DI
2627
import DataFrame.Internal.DataFrame (DataFrame, unsafeGetColumn)
@@ -329,6 +330,17 @@ skipList buf pos = do
329330
let elemType = toTType sizeAndType
330331
replicateM_ sizeOnly (skipFieldData elemType buf pos)
331332

333+
{- | This avoids reading entire bytestring at once: it uses the seekable handle
334+
and seeks to the end of the file to read the metadata
335+
-}
336+
readMetadataByHandleMetaSize :: FileBufferedOrSeekable -> Int -> IO FileMetadata
337+
readMetadataByHandleMetaSize sh metaSize = do
338+
let lastFieldId = 0
339+
bs <- readLastBytes (fromIntegral $ metaSize + footerSize) sh
340+
bufferPos <- newIORef 0
341+
readFileMetaData defaultMetadata bs bufferPos lastFieldId
342+
343+
-- | metadata starts from (L - 8 - meta_size) to L - 8 - 1.
332344
readMetadata :: BS.ByteString -> Int -> IO FileMetadata
333345
readMetadata contents size = do
334346
let metadataStartPos = BS.length contents - footerSize - size

tests/Parquet.hs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ module Parquet where
66
import Assertions (assertExpectException)
77
import qualified DataFrame as D
88
import qualified DataFrame.Functions as F
9+
import qualified DataFrame.IO.Parquet as DP
910

1011
import qualified Data.ByteString as BS
1112
import Data.Int
@@ -61,13 +62,20 @@ allTypes =
6162
)
6263
]
6364

65+
testBothReadParquetPaths :: ((FilePath -> IO D.DataFrame) -> Test) -> Test
66+
testBothReadParquetPaths test =
67+
TestList
68+
[ test D.readParquet
69+
, test (DP._readParquetWithOpts (Just True) D.defaultParquetReadOptions)
70+
]
71+
6472
allTypesPlain :: Test
65-
allTypesPlain =
73+
allTypesPlain = testBothReadParquetPaths $ \readParquet ->
6674
TestCase
6775
( assertEqual
6876
"allTypesPlain"
6977
allTypes
70-
(unsafePerformIO (D.readParquet "./tests/data/alltypes_plain.parquet"))
78+
(unsafePerformIO (readParquet "./tests/data/alltypes_plain.parquet"))
7179
)
7280

7381
allTypesTinyPagesDimensions :: Test
@@ -163,7 +171,7 @@ tinyPagesLast10 =
163171
]
164172

165173
allTypesTinyPagesLastFew :: Test
166-
allTypesTinyPagesLastFew =
174+
allTypesTinyPagesLastFew = testBothReadParquetPaths $ \readParquet ->
167175
TestCase
168176
( assertEqual
169177
"allTypesTinyPages dimensions"
@@ -172,27 +180,27 @@ allTypesTinyPagesLastFew =
172180
-- Excluding doubles because they are weird to compare.
173181
( fmap
174182
(D.takeLast 10 . D.exclude ["double_col"])
175-
(D.readParquet "./tests/data/alltypes_tiny_pages.parquet")
183+
(readParquet "./tests/data/alltypes_tiny_pages.parquet")
176184
)
177185
)
178186
)
179187

180188
allTypesPlainSnappy :: Test
181-
allTypesPlainSnappy =
189+
allTypesPlainSnappy = testBothReadParquetPaths $ \readParquet ->
182190
TestCase
183191
( assertEqual
184192
"allTypesPlainSnappy"
185193
(D.filter (F.col @Int32 "id") (`elem` [6, 7]) allTypes)
186-
(unsafePerformIO (D.readParquet "./tests/data/alltypes_plain.snappy.parquet"))
194+
(unsafePerformIO (readParquet "./tests/data/alltypes_plain.snappy.parquet"))
187195
)
188196

189197
allTypesDictionary :: Test
190-
allTypesDictionary =
198+
allTypesDictionary = testBothReadParquetPaths $ \readParquet ->
191199
TestCase
192200
( assertEqual
193201
"allTypesPlainSnappy"
194202
(D.filter (F.col @Int32 "id") (`elem` [0, 1]) allTypes)
195-
(unsafePerformIO (D.readParquet "./tests/data/alltypes_dictionary.parquet"))
203+
(unsafePerformIO (readParquet "./tests/data/alltypes_dictionary.parquet"))
196204
)
197205

198206
selectedColumnsWithOpts :: Test
@@ -468,12 +476,12 @@ transactions =
468476
]
469477

470478
transactionsTest :: Test
471-
transactionsTest =
479+
transactionsTest = testBothReadParquetPaths $ \readParquet ->
472480
TestCase
473481
( assertEqual
474482
"transactions"
475483
transactions
476-
(unsafePerformIO (D.readParquet "./tests/data/transactions.parquet"))
484+
(unsafePerformIO (readParquet "./tests/data/transactions.parquet"))
477485
)
478486

479487
mtCarsDataset :: D.DataFrame

0 commit comments

Comments
 (0)