Skip to content

Commit ec5453e

Browse files
committed
feat: Read Parquet files from Hugging Face.
1 parent 1afa69e commit ec5453e

File tree

5 files changed

+262
-32
lines changed

5 files changed

+262
-32
lines changed

dataframe.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ library
145145
stm >= 2.5 && < 3,
146146
filepath >= 1.4 && < 2,
147147
Glob >= 0.10 && < 1,
148+
http-conduit >= 2.3 && < 3,
148149
streamly-core,
149150
streamly-bytestring,
150151

src/DataFrame.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,7 @@ import DataFrame.Internal.Row as Row (
301301
toRowVector,
302302
)
303303
import DataFrame.Internal.Schema as Schema (
304+
makeSchema,
304305
schemaType,
305306
)
306307
import DataFrame.Operations.Aggregation as Aggregation (

src/DataFrame/IO/Parquet.hs

Lines changed: 198 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
module DataFrame.IO.Parquet where
88

9-
import Control.Exception (throw)
9+
import Control.Exception (throw, try)
1010
import Control.Monad
1111
import qualified Data.ByteString as BSO
1212
import Data.Either
@@ -27,14 +27,27 @@ import DataFrame.Internal.Expression (Expr, getColumns)
2727
import qualified DataFrame.Operations.Core as DI
2828
import DataFrame.Operations.Merge ()
2929
import qualified DataFrame.Operations.Subset as DS
30-
import System.FilePath.Glob (glob)
30+
import System.FilePath.Glob (compile, glob, match)
3131

32+
import Data.Aeson (FromJSON (..), eitherDecodeStrict, withObject, (.:))
3233
import DataFrame.IO.Parquet.Dictionary
3334
import DataFrame.IO.Parquet.Levels
3435
import DataFrame.IO.Parquet.Page
3536
import DataFrame.IO.Parquet.Thrift
3637
import DataFrame.IO.Parquet.Types
37-
import System.Directory (doesDirectoryExist)
38+
import Network.HTTP.Simple (
39+
getResponseBody,
40+
getResponseStatusCode,
41+
httpBS,
42+
parseRequest,
43+
setRequestHeader,
44+
)
45+
import System.Directory (
46+
doesDirectoryExist,
47+
getHomeDirectory,
48+
getTemporaryDirectory,
49+
)
50+
import System.Environment (lookupEnv)
3851

3952
import qualified Data.Vector.Unboxed as VU
4053
import DataFrame.IO.Parquet.Seeking
@@ -132,7 +145,13 @@ cleanColPath nodes path = go nodes path False
132145
p : go (sChildren n) ps False
133146

134147
readParquetWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame
135-
readParquetWithOpts = _readParquetWithOpts Nothing
148+
readParquetWithOpts opts path
149+
| isHFUri path = do
150+
paths <- fetchHFParquetFiles path
151+
let optsNoRange = opts{rowRange = Nothing}
152+
dfs <- mapM (_readParquetWithOpts Nothing optsNoRange) paths
153+
pure (applyRowRange opts (mconcat dfs))
154+
| otherwise = _readParquetWithOpts Nothing opts path
136155

137156
-- | Internal function to pass testing parameters
138157
_readParquetWithOpts ::
@@ -292,23 +311,29 @@ ghci| "./tests/data/alltypes_plain*.parquet"
292311
@
293312
-}
294313
readParquetFilesWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame
295-
readParquetFilesWithOpts opts path = do
296-
isDir <- doesDirectoryExist path
297-
298-
let pat = if isDir then path </> "*.parquet" else path
299-
300-
matches <- glob pat
301-
302-
files <- filterM (fmap not . doesDirectoryExist) matches
303-
304-
case files of
305-
[] ->
306-
error $
307-
"readParquetFiles: no parquet files found for " ++ path
308-
_ -> do
309-
let optsWithoutRowRange = opts{rowRange = Nothing}
310-
dfs <- mapM (readParquetWithOpts optsWithoutRowRange) files
311-
pure (applyRowRange opts (mconcat dfs))
314+
readParquetFilesWithOpts opts path
315+
| isHFUri path = do
316+
files <- fetchHFParquetFiles path
317+
let optsWithoutRowRange = opts{rowRange = Nothing}
318+
dfs <- mapM (_readParquetWithOpts Nothing optsWithoutRowRange) files
319+
pure (applyRowRange opts (mconcat dfs))
320+
| otherwise = do
321+
isDir <- doesDirectoryExist path
322+
323+
let pat = if isDir then path </> "*.parquet" else path
324+
325+
matches <- glob pat
326+
327+
files <- filterM (fmap not . doesDirectoryExist) matches
328+
329+
case files of
330+
[] ->
331+
error $
332+
"readParquetFiles: no parquet files found for " ++ path
333+
_ -> do
334+
let optsWithoutRowRange = opts{rowRange = Nothing}
335+
dfs <- mapM (readParquetWithOpts optsWithoutRowRange) files
336+
pure (applyRowRange opts (mconcat dfs))
312337

313338
-- Options application -----------------------------------------------------
314339

@@ -599,3 +624,155 @@ unitDivisor TIME_UNIT_UNKNOWN = 1
599624
applyScale :: Int32 -> Int32 -> Double
600625
applyScale scale rawValue =
601626
fromIntegral rawValue / (10 ^ scale)
627+
628+
-- HuggingFace support -----------------------------------------------------
629+
630+
data HFRef = HFRef
631+
{ hfOwner :: T.Text
632+
, hfDataset :: T.Text
633+
, hfGlob :: T.Text
634+
}
635+
636+
data HFParquetFile = HFParquetFile
637+
{ hfpUrl :: T.Text
638+
, hfpConfig :: T.Text
639+
, hfpSplit :: T.Text
640+
, hfpFilename :: T.Text
641+
}
642+
deriving (Show)
643+
644+
instance FromJSON HFParquetFile where
645+
parseJSON = withObject "HFParquetFile" $ \o ->
646+
HFParquetFile
647+
<$> o .: "url"
648+
<*> o .: "config"
649+
<*> o .: "split"
650+
<*> o .: "filename"
651+
652+
newtype HFParquetResponse = HFParquetResponse {hfParquetFiles :: [HFParquetFile]}
653+
654+
instance FromJSON HFParquetResponse where
655+
parseJSON = withObject "HFParquetResponse" $ \o ->
656+
HFParquetResponse <$> o .: "parquet_files"
657+
658+
isHFUri :: FilePath -> Bool
659+
isHFUri = L.isPrefixOf "hf://"
660+
661+
parseHFUri :: FilePath -> Either String HFRef
662+
parseHFUri path =
663+
let stripped = drop (length ("hf://datasets/" :: String)) path
664+
in case T.splitOn "/" (T.pack stripped) of
665+
(owner : dataset : rest)
666+
| not (null rest) ->
667+
Right $ HFRef owner dataset (T.intercalate "/" rest)
668+
_ ->
669+
Left $ "Invalid hf:// URI (expected hf://datasets/owner/dataset/glob): " ++ path
670+
671+
getHFToken :: IO (Maybe BSO.ByteString)
672+
getHFToken = do
673+
envToken <- lookupEnv "HF_TOKEN"
674+
case envToken of
675+
Just t -> pure (Just (encodeUtf8 (T.pack t)))
676+
Nothing -> do
677+
home <- getHomeDirectory
678+
let tokenPath = home </> ".cache" </> "huggingface" </> "token"
679+
result <- try (BSO.readFile tokenPath) :: IO (Either IOError BSO.ByteString)
680+
case result of
681+
Right bs -> pure (Just (BSO.takeWhile (/= 10) bs))
682+
Left _ -> pure Nothing
683+
684+
{- | Extract the repo-relative path from a HuggingFace download URL.
685+
URL format: https://huggingface.co/datasets/{owner}/{dataset}/resolve/{ref}/{path}
686+
Returns the {path} portion (e.g. "data/train-00000-of-00001.parquet").
687+
-}
688+
hfUrlRepoPath :: HFParquetFile -> String
689+
hfUrlRepoPath f =
690+
case T.breakOn "/resolve/" (hfpUrl f) of
691+
(_, rest)
692+
| not (T.null rest) ->
693+
-- Drop "/resolve/", then drop the ref component (up to and including "/")
694+
T.unpack $ T.drop 1 $ T.dropWhile (/= '/') $ T.drop (T.length "/resolve/") rest
695+
_ ->
696+
T.unpack (hfpConfig f) </> T.unpack (hfpSplit f) </> T.unpack (hfpFilename f)
697+
698+
matchesGlob :: T.Text -> HFParquetFile -> Bool
699+
matchesGlob g f = match (compile (T.unpack g)) (hfUrlRepoPath f)
700+
701+
resolveHFUrls :: Maybe BSO.ByteString -> HFRef -> IO [HFParquetFile]
702+
resolveHFUrls mToken ref = do
703+
let dataset = hfOwner ref <> "/" <> hfDataset ref
704+
let apiUrl = "https://datasets-server.huggingface.co/parquet?dataset=" ++ T.unpack dataset
705+
req0 <- parseRequest apiUrl
706+
let req = case mToken of
707+
Nothing -> req0
708+
Just tok -> setRequestHeader "Authorization" ["Bearer " <> tok] req0
709+
resp <- httpBS req
710+
let status = getResponseStatusCode resp
711+
when (status /= 200) $
712+
ioError $
713+
userError $
714+
"HuggingFace API returned status "
715+
++ show status
716+
++ " for dataset "
717+
++ T.unpack dataset
718+
case eitherDecodeStrict (getResponseBody resp) of
719+
Left err -> ioError $ userError $ "Failed to parse HF API response: " ++ err
720+
Right hfResp -> pure $ filter (matchesGlob (hfGlob ref)) (hfParquetFiles hfResp)
721+
722+
downloadHFFiles :: Maybe BSO.ByteString -> [HFParquetFile] -> IO [FilePath]
723+
downloadHFFiles mToken files = do
724+
tmpDir <- getTemporaryDirectory
725+
forM files $ \f -> do
726+
-- Derive a collision-resistant temp name from the URL path components
727+
let fname = case (hfpConfig f, hfpSplit f) of
728+
(c, s) | T.null c && T.null s -> T.unpack (hfpFilename f)
729+
(c, s) -> T.unpack c <> "_" <> T.unpack s <> "_" <> T.unpack (hfpFilename f)
730+
let destPath = tmpDir </> fname
731+
req0 <- parseRequest (T.unpack (hfpUrl f))
732+
let req = case mToken of
733+
Nothing -> req0
734+
Just tok -> setRequestHeader "Authorization" ["Bearer " <> tok] req0
735+
resp <- httpBS req
736+
let status = getResponseStatusCode resp
737+
when (status /= 200) $
738+
ioError $
739+
userError $
740+
"Failed to download " ++ T.unpack (hfpUrl f) ++ " (HTTP " ++ show status ++ ")"
741+
BSO.writeFile destPath (getResponseBody resp)
742+
pure destPath
743+
744+
-- | True when the path contains glob wildcard characters.
745+
hasGlob :: T.Text -> Bool
746+
hasGlob = T.any (\c -> c == '*' || c == '?' || c == '[')
747+
748+
{- | Build the direct HF repo download URL for a path with no wildcards.
749+
Format: https://huggingface.co/datasets/{owner}/{dataset}/resolve/main/{path}
750+
-}
751+
directHFUrl :: HFRef -> T.Text
752+
directHFUrl ref =
753+
"https://huggingface.co/datasets/"
754+
<> hfOwner ref
755+
<> "/"
756+
<> hfDataset ref
757+
<> "/resolve/main/"
758+
<> hfGlob ref
759+
760+
fetchHFParquetFiles :: FilePath -> IO [FilePath]
761+
fetchHFParquetFiles uri = do
762+
ref <- case parseHFUri uri of
763+
Left err -> ioError (userError err)
764+
Right r -> pure r
765+
mToken <- getHFToken
766+
if hasGlob (hfGlob ref)
767+
then do
768+
hfFiles <- resolveHFUrls mToken ref
769+
when (null hfFiles) $
770+
ioError $
771+
userError $
772+
"No parquet files found for " ++ uri
773+
downloadHFFiles mToken hfFiles
774+
else do
775+
-- Direct repo file download — no datasets-server needed
776+
let url = directHFUrl ref
777+
let filename = last $ T.splitOn "/" (hfGlob ref)
778+
downloadHFFiles mToken [HFParquetFile url "" "" filename]

src/DataFrame/Internal/Schema.hs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,3 +96,15 @@ newtype Schema = Schema
9696
-}
9797
}
9898
deriving (Show, Eq)
99+
100+
{- | Construct a 'Schema' from a list of @(columnName, schemaType)@ pairs.
101+
102+
==== __Example__
103+
>>> :set -XTypeApplications
104+
>>> import qualified Data.Text as T
105+
>>> let s = makeSchema [("name", schemaType @T.Text), ("age", schemaType @Int)]
106+
>>> M.member "age" (elements s)
107+
True
108+
-}
109+
makeSchema :: [(T.Text, SchemaType)] -> Schema
110+
makeSchema = Schema . M.fromList

src/DataFrame/Lazy/Internal/Executor.hs

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -459,26 +459,65 @@ Column projection and predicate pushdown are forwarded to 'readParquetWithOpts'
459459
via 'ParquetReadOptions'.
460460
-}
461461
executeParquetScan :: FilePath -> ScanConfig -> IO Stream
462-
executeParquetScan path cfg = do
463-
isDir <- doesDirectoryExist path
464-
let pat = if isDir then path </> "*" else path
465-
matches <- glob pat
466-
files <- filterM (fmap not . doesDirectoryExist) matches
467-
when (null files) $
468-
error ("executeParquetScan: no parquet files found for " ++ path)
462+
executeParquetScan path cfg
463+
| Parquet.isHFUri path = executeHFParquetScan path cfg
464+
| otherwise = do
465+
isDir <- doesDirectoryExist path
466+
let pat = if isDir then path </> "*" else path
467+
matches <- glob pat
468+
files <- filterM (fmap not . doesDirectoryExist) matches
469+
when (null files) $
470+
error ("executeParquetScan: no parquet files found for " ++ path)
471+
let opts =
472+
Parquet.defaultParquetReadOptions
473+
{ Parquet.selectedColumns = Just (M.keys (elements (scanSchema cfg)))
474+
, Parquet.predicate = scanPushdownPredicate cfg
475+
}
476+
ref <- newIORef files
477+
return . Stream $ do
478+
fs <- readIORef ref
479+
case fs of
480+
[] -> return Nothing
481+
(f : rest) -> do
482+
writeIORef ref rest
483+
Just <$> Parquet.readParquetWithOpts opts f
484+
485+
{- | HuggingFace Parquet scan. Files are resolved once (API call or direct URL)
486+
then downloaded one at a time as the stream is pulled — so only one file's worth
487+
of data is in memory at a time, regardless of dataset size.
488+
-}
489+
490+
-- TODO: mchavinda - this should be a more general online file scanner.
491+
executeHFParquetScan :: FilePath -> ScanConfig -> IO Stream
492+
executeHFParquetScan path cfg = do
493+
ref <- case Parquet.parseHFUri path of
494+
Left err -> error err
495+
Right r -> pure r
496+
mToken <- Parquet.getHFToken
497+
hfFiles <-
498+
if Parquet.hasGlob (Parquet.hfGlob ref)
499+
then Parquet.resolveHFUrls mToken ref
500+
else do
501+
let url = Parquet.directHFUrl ref
502+
filename = last $ T.splitOn "/" (Parquet.hfGlob ref)
503+
pure [Parquet.HFParquetFile url "" "" filename]
504+
when (null hfFiles) $
505+
error ("executeParquetScan: no HF parquet files found for " ++ path)
469506
let opts =
470507
Parquet.defaultParquetReadOptions
471508
{ Parquet.selectedColumns = Just (M.keys (elements (scanSchema cfg)))
472509
, Parquet.predicate = scanPushdownPredicate cfg
473510
}
474-
ref <- newIORef files
511+
filesRef <- newIORef hfFiles
475512
return . Stream $ do
476-
fs <- readIORef ref
513+
fs <- readIORef filesRef
477514
case fs of
478515
[] -> return Nothing
479516
(f : rest) -> do
480-
writeIORef ref rest
481-
Just <$> Parquet.readParquetWithOpts opts f
517+
writeIORef filesRef rest
518+
-- Download a single file, read it, then return the batch.
519+
[localPath] <- Parquet.downloadHFFiles mToken [f]
520+
Just <$> Parquet.readParquetWithOpts opts localPath
482521

483522
-- ---------------------------------------------------------------------------
484523
-- CSV scan implementation

0 commit comments

Comments
 (0)