Skip to content

Commit 2e70358

Browse files
committed
feat: Declare parquet should respect nullable columns.
1 parent 6388894 commit 2e70358

File tree

4 files changed

+90
-20
lines changed

4 files changed

+90
-20
lines changed

data/sharded/part-0.parquet

2.62 KB
Binary file not shown.

data/sharded/part-1.parquet

2.4 KB
Binary file not shown.

src/DataFrame/Functions.hs

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,11 @@ import Data.Time
4141
import qualified Data.Vector as V
4242
import qualified Data.Vector.Unboxed as VU
4343
import Data.Word
44+
import qualified Data.Set as S
4445
import qualified DataFrame.IO.CSV as CSV
4546
import qualified DataFrame.IO.Parquet as Parquet
4647
import DataFrame.IO.Parquet.Thrift
48+
import DataFrame.IO.Parquet.Types (columnNullCount)
4749
import DataFrame.Internal.Nullable (
4850
BaseType,
4951
NullLift1Op (applyNull1),
@@ -537,33 +539,39 @@ declareColumnsFromCsvFile path = do
537539
-- | Declare column bindings from the metadata of one or more Parquet files.
--
-- When @path@ is a directory, the glob pattern is @path/*.parquet@;
-- otherwise @path@ itself is used as the glob pattern. Matches that are
-- directories are dropped, and the metadata of every remaining file is read
-- with 'Parquet.readMetadataFromPath'.
--
-- A column is declared nullable when at least one column chunk, in any row
-- group of any matched file, reports a null count greater than zero in its
-- statistics. Chunks are identified by the last component of their schema
-- path; chunks with an empty path are ignored.
declareColumnsFromParquetFile :: String -> DecsQ
declareColumnsFromParquetFile path = do
    pathIsDirectory <- liftIO (doesDirectoryExist path)
    let globPattern = if pathIsDirectory then path </> "*.parquet" else path
    candidates <- liftIO (glob globPattern)
    parquetFiles <-
        liftIO (filterM (fmap Prelude.not . doesDirectoryExist) candidates)
    footers <-
        liftIO (mapM (fmap fst . Parquet.readMetadataFromPath) parquetFiles)
    let chunkMetas =
            [ columnMetaData chunk
            | footer <- footers
            , rowGroup <- rowGroups footer
            , chunk <- rowGroupColumns rowGroup
            ]
        nullableCols :: S.Set T.Text
        nullableCols =
            S.fromList
                [ T.pack (last leafPath)
                | chunkMeta <- chunkMetas
                , let leafPath = columnPathInSchema chunkMeta
                , Prelude.not (null leafPath)
                , columnNullCount (columnStatistics chunkMeta) > 0
                ]
        -- Fold one file's schema into the accumulated empty frame.
        appendSchema acc footer =
            acc <> schemaToEmptyDataFrame nullableCols (schema footer)
    declareColumns
        (foldl appendSchema DataFrame.Internal.DataFrame.empty footers)
557561

558-
-- | Build an empty 'DataFrame' with one column per leaf schema element.
--
-- A leaf is any element whose 'numChildren' is zero. @nullableCols@ is
-- forwarded to 'schemaElemToColumn', which decides per column whether the
-- nullable or the plain empty column is produced.
schemaToEmptyDataFrame :: S.Set T.Text -> [SchemaElement] -> DataFrame
schemaToEmptyDataFrame nullableCols elems =
    fromNamedColumns
        [ schemaElemToColumn nullableCols leaf
        | leaf <- elems
        , numChildren leaf == 0
        ]
562566

563-
-- | Pair a schema element's name with an empty column of its type.
--
-- The column comes from 'emptyNullableColumnForType' when the element's name
-- is a member of @nullableCols@, and from 'emptyColumnForType' otherwise.
schemaElemToColumn :: S.Set T.Text -> SchemaElement -> (T.Text, Column)
schemaElemToColumn nullableCols elem = (name, mkEmpty (elementType elem))
  where
    name = elementName elem
    -- Choose the constructor once, then apply it to the element type.
    mkEmpty =
        if S.member name nullableCols
            then emptyNullableColumnForType
            else emptyColumnForType
567575

568576
emptyColumnForType :: TType -> Column
569577
emptyColumnForType = \case
@@ -578,6 +586,19 @@ emptyColumnForType = \case
578586
STRING -> fromList @T.Text []
579587
other -> error $ "Unsupported parquet type for column: " <> show other
580588

589+
-- | Empty column whose element type is the @Maybe@-wrapped Haskell type
-- chosen for the given Parquet type tag.
--
-- 'I96' uses the same element type as 'I64' (@Maybe Int64@). Any tag not
-- listed here raises an 'error'.
emptyNullableColumnForType :: TType -> Column
emptyNullableColumnForType BOOL = fromList @(Maybe Bool) []
emptyNullableColumnForType BYTE = fromList @(Maybe Word8) []
emptyNullableColumnForType I16 = fromList @(Maybe Int16) []
emptyNullableColumnForType I32 = fromList @(Maybe Int32) []
emptyNullableColumnForType I64 = fromList @(Maybe Int64) []
emptyNullableColumnForType I96 = fromList @(Maybe Int64) []
emptyNullableColumnForType FLOAT = fromList @(Maybe Float) []
emptyNullableColumnForType DOUBLE = fromList @(Maybe Double) []
emptyNullableColumnForType STRING = fromList @(Maybe T.Text) []
emptyNullableColumnForType unsupported =
    error $ "Unsupported parquet type for column: " <> show unsupported
601+
581602
declareColumnsFromCsvWithOpts :: CSV.ReadOptions -> String -> DecsQ
582603
declareColumnsFromCsvWithOpts opts path = do
583604
df <- liftIO (CSV.readSeparated opts path)

tests/Parquet.hs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,15 @@ import qualified DataFrame.IO.Parquet as DP
1010

1111
import qualified Data.ByteString as BS
1212
import Data.Int
13+
import qualified Data.Set as S
1314
import Data.Text (Text)
15+
import qualified Data.Text as T
1416
import Data.Time
1517
import Data.Word
18+
import DataFrame.Internal.Column (hasMissing)
19+
import DataFrame.Internal.DataFrame (unsafeGetColumn)
20+
import DataFrame.IO.Parquet.Thrift (columnMetaData, columnPathInSchema, columnStatistics, rowGroupColumns, rowGroups, schema)
21+
import DataFrame.IO.Parquet.Types (columnNullCount)
1622
import DataFrame.Internal.Binary (
1723
littleEndianWord32,
1824
littleEndianWord64,
@@ -1687,6 +1693,46 @@ nationDictMalformed =
16871693
(D.readParquet "./tests/data/nation.dict-malformed.parquet")
16881694
)
16891695

1696+
-- | Reads the metadata of both sharded fixture files, collects the leaf names
-- of every column chunk whose statistics report a null count above zero, and
-- builds the combined empty frame from both schemas. Asserts that
-- 'hasMissing' holds for the @id@, @name@ and @score@ columns of that frame.
shardedNullableSchema :: Test
shardedNullableSchema = TestCase $ do
    footers <-
        mapM
            (fmap fst . DP.readMetadataFromPath)
            ["data/sharded/part-0.parquet", "data/sharded/part-1.parquet"]
    let chunkMetas =
            [ columnMetaData chunk
            | footer <- footers
            , rowGroup <- rowGroups footer
            , chunk <- rowGroupColumns rowGroup
            ]
        nullableCols =
            S.fromList
                [ T.pack (last leafPath)
                | chunkMeta <- chunkMetas
                , let leafPath = columnPathInSchema chunkMeta
                , not (null leafPath)
                , columnNullCount (columnStatistics chunkMeta) > 0
                ]
        appendSchema acc footer =
            acc <> F.schemaToEmptyDataFrame nullableCols (schema footer)
        df = foldl appendSchema D.empty footers
        columnHasMissing colName = hasMissing (unsafeGetColumn colName df)
    assertBool "id should be nullable" (columnHasMissing "id")
    assertBool "name should be nullable" (columnHasMissing "name")
    assertBool "score should be nullable" (columnHasMissing "score")
1717+
1718+
singleShardNoNulls :: Test
1719+
singleShardNoNulls =
1720+
TestCase $ do
1721+
(meta, _) <- DP.readMetadataFromPath "data/sharded/part-0.parquet"
1722+
let nullableCols = S.fromList
1723+
[ last (map T.pack colPath)
1724+
| rg <- rowGroups meta
1725+
, cc <- rowGroupColumns rg
1726+
, let cm = columnMetaData cc
1727+
colPath = columnPathInSchema cm
1728+
, not (null colPath)
1729+
, columnNullCount (columnStatistics cm) > 0
1730+
]
1731+
df = F.schemaToEmptyDataFrame nullableCols (schema meta)
1732+
assertBool "id should NOT be nullable" (not (hasMissing (unsafeGetColumn "id" df)))
1733+
assertBool "name should NOT be nullable" (not (hasMissing (unsafeGetColumn "name" df)))
1734+
assertBool "score should NOT be nullable" (not (hasMissing (unsafeGetColumn "score" df)))
1735+
16901736
tests :: [Test]
16911737
tests =
16921738
[ allTypesPlain
@@ -1780,4 +1826,7 @@ tests =
17801826
, unknownLogicalType
17811827
, -- Group 12: malformed files
17821828
nationDictMalformed
1829+
, -- Group 13: metadata-based null detection
1830+
shardedNullableSchema
1831+
, singleShardNoNulls
17831832
]

0 commit comments

Comments
 (0)