1+ {-# LANGUAGE GADTs #-}
2+ {-# LANGUAGE FlexibleContexts #-}
3+ {-# LANGUAGE InstanceSigs #-}
4+ {-# LANGUAGE ExistentialQuantification #-}
5+ {-# LANGUAGE AllowAmbiguousTypes #-}
6+ {-# LANGUAGE NumericUnderscores #-}
17module DataFrame.Lazy.Internal.DataFrame where
28
9+ import Control.Monad (forM_ )
310import Data.IORef
11+ import Data.Kind
412import qualified Data.Map as M
513import qualified Data.Text as T
614import qualified Data.Vector as V
7- import qualified DataFrame.Lazy.Internal.Column as C
15+ import qualified DataFrame.Internal.DataFrame as D
16+ import qualified DataFrame.Internal.Column as C
17+ import qualified DataFrame.Internal.Expression as E
18+ import qualified DataFrame.Operations.Core as D
19+ import qualified DataFrame.Operations.Subset as D
20+ import qualified DataFrame.Operations.Transformations as D
21+ import qualified DataFrame.IO.CSV as D
822import System.FilePath
923
10- data DataFrame = DataFrame
11- { columns :: V. Vector (Maybe C. Column )
12- , columnIndices :: ! (M. Map T. Text Int )
13- , freeIndices :: ! (IORef [Int ])
14- , dataframeDims :: ! (IORef (Int , Int )) -- (rows , cols)
15- , memBudgetBytes :: ! Int -- e.g. 512 * 1024 * 1024
16- , liveMemBytes :: ! (IORef Int ) -- updated atomically
17- , chunkRowTarget :: ! Int -- e.g. 100_000
18- , spillDir :: ! FilePath
19- }
-- | One deferred step of a lazy query. Values of this type only describe
-- work; 'eval' is what turns a step into an actual data frame transformation.
data LazyOperation
  = -- Column name paired with the expression handed to the eager @derive@.
    -- The expression's element type is hidden behind the existential.
    forall a. C.Columnable a => Derive T.Text (E.Expr a)
  | -- Column names handed to the eager @select@.
    Select [T.Text]
  | -- Boolean expression handed to the eager @filterWhere@.
    Filter (E.Expr Bool)
28+
-- | Textual rendering of a recorded operation, used when a lazy frame's
-- plan is shown.
instance Show LazyOperation where
  show :: LazyOperation -> String
  show op = case op of
    Derive name expr -> concat [T.unpack name, " := ", show expr]
    Select columns -> concat [" select(", show columns, " )"]
    Filter expr -> concat [" filter(", show expr, " )"]
34+
-- | Format tag for the file a lazy frame reads from. CSV is the only
-- constructor at the moment.
data InputType
  = ICSV
  deriving (Show)
36+
-- | A data frame that has not been materialised yet: where to read it from
-- plus the steps to run once it is read.
--
-- Field order matters: the constructor is also applied positionally.
data LazyDataFrame = LazyDataFrame
  { inputPath :: FilePath
  -- ^ Path of the input file.
  , inputType :: InputType
  -- ^ Format of the input file.
  , operations :: [LazyOperation]
  -- ^ Recorded steps; new steps are appended at the end and the list is
  -- folded from the left when the frame is run.
  , batchSize :: Int
  -- ^ Step used to split the input into row batches.
  }
  deriving (Show)
43+
-- | Interpret a single recorded operation against an in-memory data frame by
-- delegating to the matching eager function.
eval :: LazyOperation -> D.DataFrame -> D.DataFrame
eval op frame = case op of
  Derive name expr -> D.derive name expr frame
  Select columns -> D.select columns frame
  Filter expr -> D.filterWhere expr frame
48+
-- | Execute a lazy frame batch by batch.
--
-- For every batch the input file is read with a row range, all recorded
-- operations are applied in order, and the result is printed unless it has
-- zero rows. Batch results are not accumulated: the returned frame is always
-- 'D.empty'.
--
-- NOTE(review): the type variable @a@ does not appear in the argument or
-- result types, so call sites presumably have to pick it with a type
-- application. It is kept only so existing callers keep compiling.
runDataFrame :: forall a. (C.Columnable a) => LazyDataFrame -> IO D.DataFrame
runDataFrame df = do
  let path = inputPath df
      rowsPerBatch = batchSize df
      -- Stand-in for the real row count until the disabled count below is
      -- wired in; rows past this bound are never requested.
      assumedTotalRows = 1_000_000 :: Int
  -- totalRows <- D.countRows ',' path
  forM_ (batchRanges assumedTotalRows rowsPerBatch) $ \(start, _end) -> do
    -- TODO: implement specific read operations for batching that returns a seek instead of re-reading everything.
    -- NOTE(review): the second component given to rowRange is the batch size,
    -- not the end of the range; confirm rowRange expects (offset, count).
    let readOptions = D.defaultOptions{D.rowRange = Just (start, rowsPerBatch)}
    sdf <- D.readSeparated ',' readOptions path
    let rdf = foldl' (\d op -> eval op d) sdf (operations df)
    if fst (D.dimensions rdf) == 0 then return () else print rdf
  return D.empty
60+
-- | Split the half-open interval @[0, n)@ into consecutive @(start, end)@
-- ranges that each span at most @inc@.
--
-- Every range but the last spans exactly @inc@; the last one is clipped to
-- @n@. A non-positive @n@ or @inc@ yields no ranges, so a zero step cannot
-- produce an endless list and no empty @(n, n)@ range is emitted when @inc@
-- divides @n@.
--
-- >>> batchRanges 10 3
-- [(0,3),(3,6),(6,9),(9,10)]
-- >>> batchRanges 10 5
-- [(0,5),(5,10)]
batchRanges :: Int -> Int -> [(Int, Int)]
batchRanges n inc
  | n <= 0 || inc <= 0 = []
  | otherwise = [(start, min n (start + inc)) | start <- [0, inc .. n - 1]]
67+
-- | Begin a lazy frame over a CSV file. The path is converted to a
-- 'FilePath'; no operations are recorded yet and the batch size is 1024.
scanCsv :: T.Text -> LazyDataFrame
scanCsv path =
  LazyDataFrame
    { inputPath = T.unpack path
    , inputType = ICSV
    , operations = []
    , batchSize = 1024
    }
70+
-- | Record an operation as the final step of a lazy frame, leaving the
-- previously recorded steps in place ahead of it.
addOperation :: LazyOperation -> LazyDataFrame -> LazyDataFrame
addOperation op df = df{operations = extended}
  where
    extended = operations df ++ [op]
73+
-- | Queue a 'Derive' step carrying the given column name and expression.
derive :: C.Columnable a => T.Text -> E.Expr a -> LazyDataFrame -> LazyDataFrame
derive name expr df = addOperation step df
  where
    step = Derive name expr
76+
-- | Queue a 'Select' step carrying the given column names.
--
-- NOTE(review): the @Columnable a@ constraint mentions a type variable that
-- appears nowhere else in the signature; it looks unintended but is kept so
-- the interface stays as it was.
select :: C.Columnable a => [T.Text] -> LazyDataFrame -> LazyDataFrame
select = addOperation . Select
79+
-- | Queue a 'Filter' step carrying the given boolean expression.
--
-- NOTE(review): the @Columnable a@ constraint mentions a type variable that
-- appears nowhere else in the signature; it looks unintended but is kept so
-- the interface stays as it was. This name also coincides with the Prelude's
-- @filter@, so importers presumably need a qualified import.
filter :: C.Columnable a => E.Expr Bool -> LazyDataFrame -> LazyDataFrame
filter = addOperation . Filter
0 commit comments