Iterable Data is a Python library for reading and writing data files row by row through a consistent, iterator-based interface. It provides a unified API for working with various data formats (CSV, JSON, Parquet, XML, etc.), similar to csv.DictReader but supporting many more formats.
This library simplifies data processing and conversion between formats while preserving complex nested data structures (unlike pandas DataFrames, which require flattening).
- Unified API: Single interface for reading/writing multiple data formats
- Automatic Format Detection: Detects file type and compression from filename or content (magic numbers and heuristics)
- Format Capability Reporting: Programmatically query format capabilities (read/write/bulk/totals/streaming/tables)
- Support for Compression: Works seamlessly with compressed files
- Preserves Nested Data: Handles complex nested structures as Python dictionaries
- DuckDB Integration: Optional DuckDB engine for high-performance queries
- Pipeline Processing: Built-in pipeline support for data transformation
- Encoding Detection: Automatic encoding and delimiter detection for text files
- Bulk Operations: Efficient batch reading and writing
- Table Listing: Discover available tables, sheets, and datasets in multi-table formats
- Context Manager Support: Use `with` statements for automatic resource cleanup
- JSON - Standard JSON files
- JSONL/NDJSON - JSON Lines format (one JSON object per line)
- JSON-LD - JSON for Linking Data (RDF format)
- CSV/TSV - Comma and tab-separated values
- Annotated CSV - CSV with type annotations and metadata
- CSVW - CSV on the Web (with metadata)
- PSV/SSV - Pipe and semicolon-separated values
- LTSV - Labeled Tab-Separated Values
- FWF - Fixed Width Format
- XML - XML files with configurable tag parsing
- ZIP XML - XML files within ZIP archives
- HTML - HTML files with table extraction
- BSON - Binary JSON format
- MessagePack - Efficient binary serialization
- CBOR - Concise Binary Object Representation
- UBJSON - Universal Binary JSON
- SMILE - Binary JSON variant
- Bencode - BitTorrent encoding format
- Avro - Apache Avro binary format
- Pickle - Python pickle format
- Parquet - Apache Parquet columnar format
- ORC - Optimized Row Columnar format
- Arrow/Feather - Apache Arrow columnar format
- Lance - Modern columnar format optimized for ML and vector search
- Delta Lake - Delta Lake format
- Iceberg - Apache Iceberg format
- Hudi - Apache Hudi format
- SQLite - SQLite database files
- DBF - dBase/FoxPro database files
- MySQL Dump - MySQL dump files
- PostgreSQL Copy - PostgreSQL COPY format
- DuckDB - DuckDB database files
- SAS - SAS data files
- Stata - Stata data files
- SPSS - SPSS data files
- R Data - R RDS and RData files
- PX - PC-Axis format
- ARFF - Attribute-Relation File Format (Weka format)
- NetCDF - Network Common Data Form for scientific data
- HDF5 - Hierarchical Data Format
- GeoJSON - Geographic JSON format
- GeoPackage - OGC GeoPackage format
- GML - Geography Markup Language
- KML - Keyhole Markup Language
- Shapefile - ESRI Shapefile format
- MVT/PBF - Mapbox Vector Tiles
- TopoJSON - Topology-preserving GeoJSON extension
- JSON-LD - JSON for Linking Data
- RDF/XML - RDF in XML format
- Turtle - Terse RDF Triple Language
- N-Triples - Line-based RDF format
- N-Quads - N-Triples with context
- Atom - Atom Syndication Format
- RSS - Rich Site Summary feed format
- PCAP - Packet Capture format
- PCAPNG - PCAP Next Generation format
- Apache Log - Apache access/error logs
- CEF - Common Event Format
- GELF - Graylog Extended Log Format
- WARC - Web ARChive format
- CDX - Web archive index format
- ILP - InfluxDB Line Protocol
- EML - Email message format
- MBOX - Mailbox format
- MHTML - MIME HTML format
- INI - INI configuration files
- TOML - Tom's Obvious Minimal Language
- YAML - YAML Ain't Markup Language
- HOCON - Human-Optimized Config Object Notation
- EDN - Extensible Data Notation
- XLS/XLSX - Microsoft Excel files
- ODS - OpenDocument Spreadsheet
- DXF - AutoCAD Drawing Exchange Format
- Kafka - Apache Kafka format
- Pulsar - Apache Pulsar format
- Flink - Apache Flink format
- Beam - Apache Beam format
- RecordIO - RecordIO format
- SequenceFile - Hadoop SequenceFile
- TFRecord - TensorFlow Record format
- Protocol Buffers - Google Protocol Buffers
- Cap'n Proto - Cap'n Proto serialization
- FlatBuffers - FlatBuffers serialization
- FlexBuffers - FlexBuffers format
- Thrift - Apache Thrift format
- ASN.1 - ASN.1 encoding format
- Ion - Amazon Ion format
- VCF - Variant Call Format (genomics)
- iCal - iCalendar format
- LDIF - LDAP Data Interchange Format
- TXT - Plain text files
- GZip (.gz)
- BZip2 (.bz2)
- LZMA (.xz, .lzma)
- LZ4 (.lz4)
- ZIP (.zip)
- Brotli (.br)
- ZStandard (.zst, .zstd)
- Snappy (.snappy, .sz)
- LZO (.lzo, .lzop)
- SZIP (.sz)
- 7z (.7z)
Python 3.10+
pip install iterabledata
Or install from source:
git clone https://github.com/datenoio/iterabledata.git
cd iterabledata
pip install .
from iterable.helpers.detect import open_iterable
# Automatically detects format and compression
# Using context manager (recommended)
with open_iterable('data.csv.gz') as source:
    for row in source:
        print(row)
        # Process your data here
# File is automatically closed
# Or manually (still supported)
source = open_iterable('data.csv.gz')
for row in source:
    print(row)
source.close()
from iterable.helpers.detect import open_iterable
# Write compressed JSONL file
# Using context manager (recommended)
with open_iterable('output.jsonl.zst', mode='w') as dest:
    for item in my_data:
        dest.write(item)
# File is automatically closed
# Or manually (still supported)
dest = open_iterable('output.jsonl.zst', mode='w')
for item in my_data:
    dest.write(item)
dest.close()
from iterable.helpers.detect import open_iterable
# Read compressed CSV file (supports .gz, .bz2, .xz, .zst, .lz4, .br, .snappy, .lzo)
source = open_iterable('data.csv.xz')
n = 0
for row in source:
    n += 1
    # Process row data
    if n % 1000 == 0:
        print(f'Processed {n} rows')
source.close()
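As a rough mental model of what opening a compressed CSV file involves, the sketch below picks a standard-library opener from the file suffix and wraps the stream in csv.DictReader. The helper name iter_compressed_csv and the OPENERS table are illustrative assumptions, not part of IterableData, and only three codecs are covered.

```python
import bz2
import csv
import gzip
import lzma
from pathlib import Path

# Hypothetical suffix-to-opener table; covers only codecs in the stdlib.
OPENERS = {".gz": gzip.open, ".bz2": bz2.open, ".xz": lzma.open}


def iter_compressed_csv(path, encoding="utf-8"):
    """Yield each CSV row as a dict, decompressing on the fly."""
    # Unknown suffixes fall back to the built-in open() (no compression).
    opener = OPENERS.get(Path(path).suffix, open)
    # Text mode with newline="" is what the csv module expects.
    with opener(path, mode="rt", encoding=encoding, newline="") as handle:
        yield from csv.DictReader(handle)
```

Because the function is a generator, rows are decompressed and parsed one at a time, which is the same row-by-row access pattern the examples above rely on.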
from iterable.helpers.detect import open_iterable
# Read JSONL file
jsonl_file = open_iterable('data.jsonl')
for row in jsonl_file:
    print(row)
jsonl_file.close()
# Read Parquet file
parquet_file = open_iterable('data.parquet')
for row in parquet_file:
    print(row)
parquet_file.close()
# Read XML file (specify tag name)
xml_file = open_iterable('data.xml', iterableargs={'tagname': 'item'})
for row in xml_file:
    print(row)
xml_file.close()
# Read Excel file
xlsx_file = open_iterable('data.xlsx')
for row in xlsx_file:
    print(row)
xlsx_file.close()
from iterable.helpers.detect import open_iterable, detect_file_type, detect_file_type_from_content
from iterable.helpers.utils import detect_encoding, detect_delimiter
# Detect file type and compression (uses filename extension)
result = detect_file_type('data.csv.gz')
print(f"Type: {result['datatype']}, Codec: {result['codec']}")
# Content-based detection (for files without extensions or streams)
with open('data.unknown', 'rb') as f:
    detected_format = detect_file_type_from_content(f)
    print(f"Detected format: {detected_format}")  # e.g., 'parquet', 'json', 'csv'
# open_iterable() automatically uses content-based detection as fallback
# Works with files without extensions, streams, or incorrect extensions
with open_iterable('data.unknown') as source:  # Detects from content
    for row in source:
        print(row)
# Detect encoding for CSV files
encoding_info = detect_encoding('data.csv')
print(f"Encoding: {encoding_info['encoding']}, Confidence: {encoding_info['confidence']}")
# Detect delimiter for CSV files
delimiter = detect_delimiter('data.csv', encoding=encoding_info['encoding'])
# Open with detected settings
source = open_iterable('data.csv', iterableargs={
    'encoding': encoding_info['encoding'],
    'delimiter': delimiter
})
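Content-based detection of the kind described above usually starts by comparing the first bytes of a file against known signatures. The following is a minimal sketch of that idea, not the library's detection code: sniff_format and MAGIC_NUMBERS are hypothetical names, the table covers only a handful of formats, and the JSON check is a deliberately crude heuristic.

```python
# Leading byte signatures for a few well-known formats.
MAGIC_NUMBERS = [
    (b"\x1f\x8b", "gzip"),
    (b"BZh", "bzip2"),
    (b"\xfd7zXZ\x00", "xz"),
    (b"\x28\xb5\x2f\xfd", "zstd"),
    (b"PK\x03\x04", "zip"),
    (b"PAR1", "parquet"),
]


def sniff_format(head: bytes):
    """Return a format name for the first bytes of a file, or None."""
    for signature, name in MAGIC_NUMBERS:
        if head.startswith(signature):
            return name
    # Heuristic fallback: JSON documents start with '{' or '['.
    stripped = head.lstrip()
    if stripped[:1] in (b"{", b"["):
        return "json"
    return None
```

Text formats such as CSV have no signature, so a real detector needs further heuristics (delimiter counts, line structure) after the magic-number pass; this sketch simply returns None for them.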
IterableData provides a comprehensive exception hierarchy for better error handling:
from iterable.helpers.detect import open_iterable
from iterable.exceptions import (
    FormatDetectionError,
    FormatNotSupportedError,
    FormatParseError,
    CodecError
)
try:
    with open_iterable('data.unknown') as source:
        for row in source:
            process(row)  # process() stands in for your own row handler
except FormatDetectionError as e:
    print(f"Could not detect format: {e.reason}")
    # Try with explicit format or check file content
except FormatNotSupportedError as e:
    print(f"Format '{e.format_id}' not supported: {e.reason}")
    # Install missing dependencies or use different format
except FormatParseError as e:
    print(f"Failed to parse {e.format_id} format")
    if e.position:
        print(f"Error at position: {e.position}")
except CodecError as e:
    print(f"Compression error with {e.codec_name}: {e.message}")
    # Check file integrity or try different codec
except Exception as e:
    print(f"Unexpected error: {e}")
See the Exception Hierarchy documentation for a complete exception reference.
from iterable.helpers.capabilities import (
    get_format_capabilities,
    get_capability,
    list_all_capabilities
)
# Get all capabilities for a format
caps = get_format_capabilities("csv")
print(f"CSV readable: {caps['readable']}")
print(f"CSV writable: {caps['writable']}")
print(f"CSV supports totals: {caps['totals']}")
print(f"CSV supports tables: {caps['tables']}")
# Query a specific capability
is_writable = get_capability("json", "writable")
has_totals = get_capability("parquet", "totals")
supports_tables = get_capability("xlsx", "tables")
# List capabilities for all formats
all_caps = list_all_capabilities()
for format_id, capabilities in all_caps.items():
    if capabilities.get("tables"):
        print(f"{format_id} supports multiple tables")
from iterable.helpers.detect import open_iterable
from iterable.convert.core import convert
# Simple format conversion
convert('input.jsonl.gz', 'output.parquet')
# Convert with options
convert(
    'input.csv.xz',
    'output.jsonl.zst',
    iterableargs={'delimiter': ';', 'encoding': 'utf-8'},
    batch_size=10000
)
# Convert and flatten nested structures
convert(
    'input.jsonl',
    'output.csv',
    is_flatten=True,
    batch_size=50000
)
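Flattening matters when the target format, such as CSV, has no way to hold nested dictionaries. The sketch below shows one common scheme, joining nested keys with a dot. The function name flatten_record and the separator are assumptions for illustration; the key naming that convert() actually produces with is_flatten=True is not specified here.

```python
def flatten_record(record, parent_key="", sep="."):
    """Flatten nested dicts into a single-level dict with joined keys."""
    flat = {}
    for key, value in record.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else str(key)
        if isinstance(value, dict):
            # Recurse into nested dicts, carrying the key prefix along.
            flat.update(flatten_record(value, full_key, sep))
        else:
            # Lists and scalars are kept as-is in this sketch.
            flat[full_key] = value
    return flat
```

With this scheme, a record like {"user": {"name": "Ann"}} becomes a single column named user.name.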
from iterable.helpers.detect import open_iterable
from iterable.pipeline.core import pipeline
source = open_iterable('input.parquet')
destination = open_iterable('output.jsonl.xz', mode='w')
def transform_record(record, state):
    """Transform each record"""
    # Add processing logic
    out = {}
    for key in ['name', 'email', 'age']:
        if key in record:
            out[key] = record[key]
    return out
def progress_callback(stats, state):
    """Called every trigger_on records"""
    print(f"Processed {stats['rec_count']} records, "
          f"Duration: {stats.get('duration', 0):.2f}s")
def final_callback(stats, state):
    """Called when processing completes"""
    print(f"Total records: {stats['rec_count']}")
    print(f"Total time: {stats['duration']:.2f}s")
pipeline(
    source=source,
    destination=destination,
    process_func=transform_record,
    trigger_func=progress_callback,
    trigger_on=1000,
    final_func=final_callback,
    start_state={}
)
source.close()
destination.close()
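To make the callback flow of the pipeline example easier to follow, here is a minimal stand-in that runs the same kind of loop over in-memory lists. It is a sketch under stated assumptions, not the library's pipeline(): run_pipeline is a hypothetical name, the destination is a plain list, and only the rec_count and duration statistics used in the callbacks above are tracked.

```python
import time


def run_pipeline(source, destination, process_func, trigger_func=None,
                 trigger_on=1000, final_func=None, start_state=None):
    """Apply process_func to each record and append results to destination."""
    state = {} if start_state is None else start_state
    stats = {"rec_count": 0, "duration": 0.0}
    started = time.monotonic()
    for record in source:
        result = process_func(record, state)
        if result is not None:
            destination.append(result)
        stats["rec_count"] += 1
        stats["duration"] = time.monotonic() - started
        # Fire the progress callback every trigger_on records.
        if trigger_func and stats["rec_count"] % trigger_on == 0:
            trigger_func(stats, state)
    stats["duration"] = time.monotonic() - started
    # The final callback sees the totals once the source is exhausted.
    if final_func:
        final_func(stats, state)
    return stats
```

The state dictionary is passed to every callback, so it can be used to accumulate counters or lookups across records without global variables.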
from iterable.datatypes.jsonl import JSONLinesIterable
from iterable.datatypes.bsonf import BSONIterable
from iterable.codecs.gzipcodec import GZIPCodec
from iterable.codecs.lzmacodec import LZMACodec
# Read gzipped JSONL
read_codec = GZIPCodec('input.jsonl.gz', mode='r', open_it=True)
reader = JSONLinesIterable(codec=read_codec)
# Write LZMA compressed BSON
write_codec = LZMACodec('output.bson.xz', mode='wb', open_it=False)
writer = BSONIterable(codec=write_codec, mode='w')
for row in reader:
    writer.write(row)
reader.close()
writer.close()
from iterable.helpers.detect import open_iterable
# Use DuckDB engine for CSV, JSON, JSONL files
# Supported formats: csv, jsonl, ndjson, json
# Supported codecs: gz, zstd, zst
source = open_iterable(
    'data.csv.gz',
    engine='duckdb'
)
# DuckDB engine supports totals
total = source.totals()
print(f"Total records: {total}")
for row in source:
    print(row)
source.close()
from iterable.helpers.detect import open_iterable
source = open_iterable('input.jsonl')
destination = open_iterable('output.parquet', mode='w')
# Read and write in batches for better performance
batch = []
for row in source:
    batch.append(row)
    if len(batch) >= 10000:
        destination.write_bulk(batch)
        batch = []
# Write remaining records
if batch:
    destination.write_bulk(batch)
source.close()
destination.close()
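The accumulate-and-flush loop above can be factored into a small reusable generator. The helper below is a sketch using only the standard library; batched is a hypothetical name chosen here (itertools.batched exists only from Python 3.12, while this project targets 3.10+).

```python
from itertools import islice


def batched(iterable, size):
    """Yield lists of up to `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(iterable)
    while True:
        # islice pulls at most `size` items without loading the rest.
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch
```

With such a helper, the copy loop reduces to iterating over batched(source, 10000) and passing each batch to destination.write_bulk(), with no separate step for the final partial batch.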
from iterable.helpers.detect import open_iterable
# Read Excel file (specify sheet or page)
xls_file = open_iterable('data.xlsx', iterableargs={'page': 0})
for row in xls_file:
    print(row)
xls_file.close()
# Read specific sheet in XLSX
xlsx_file = open_iterable('data.xlsx', iterableargs={'page': 'Sheet2'})
for row in xlsx_file:
    print(row)
xlsx_file.close()
from iterable.helpers.detect import open_iterable
# Parse XML with specific tag name
xml_file = open_iterable(
    'data.xml',
    iterableargs={
        'tagname': 'book',
        'prefix_strip': True  # Strip XML namespace prefixes
    }
)
for item in xml_file:
    print(item)
xml_file.close()
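Tag-based XML iteration can be pictured as a streaming parse that emits one record each time the chosen element closes. The sketch below does this with the standard library's ElementTree. It is illustrative only: iter_xml_records and local_name are hypothetical helpers, and the dictionary shape (attributes plus direct child text) is an assumption that may differ from what the library returns.

```python
import io
import xml.etree.ElementTree as ET


def local_name(tag):
    """Drop a '{namespace}' prefix from an ElementTree tag name."""
    return tag.rsplit("}", 1)[-1]


def iter_xml_records(stream, tagname):
    """Yield one dict per element whose local name equals tagname."""
    for _event, elem in ET.iterparse(stream, events=("end",)):
        if local_name(elem.tag) != tagname:
            continue
        # Attributes first, then direct children keyed by local name.
        record = dict(elem.attrib)
        for child in elem:
            record[local_name(child.tag)] = (child.text or "").strip()
        yield record
        elem.clear()  # drop the element's children once it has been emitted


SAMPLE = b"""<catalog xmlns="urn:example">
  <book id="1"><title>Dune</title><year>1965</year></book>
  <book id="2"><title>Emma</title><year>1815</year></book>
</catalog>"""

records = list(iter_xml_records(io.BytesIO(SAMPLE), "book"))
```

Stripping the namespace in local_name plays the same role as the prefix_strip option shown above: the caller can match on book rather than on the fully qualified tag.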
from iterable.datatypes.xml import XMLIterable
from iterable.datatypes.parquet import ParquetIterable
from iterable.codecs.bz2codec import BZIP2Codec
# Read compressed XML
read_codec = BZIP2Codec('data.xml.bz2', mode='r')
reader = XMLIterable(codec=read_codec, tagname='page')
# Write to Parquet with schema adaptation
writer = ParquetIterable(
    'output.parquet',
    mode='w',
    use_pandas=False,
    adapt_schema=True,
    batch_size=10000
)
batch = []
for row in reader:
    batch.append(row)
    if len(batch) >= 10000:
        writer.write_bulk(batch)
        batch = []
if batch:
    writer.write_bulk(batch)
reader.close()
writer.close()
open_iterable() opens a file and returns an iterable object.
Parameters:
- filename (str): Path to the file
- mode (str): File mode ('r' for read, 'w' for write)
- engine (str): Processing engine ('internal' or 'duckdb')
- codecargs (dict): Arguments for codec initialization
- iterableargs (dict): Arguments for iterable initialization
Returns: Iterable object for the detected file type
detect_file_type() detects the file type and compression codec from the filename.
Returns: Dictionary with success, datatype, and codec keys
convert(fromfile, tofile, iterableargs={}, scan_limit=1000, batch_size=50000, silent=True, is_flatten=False)
Converts data between formats.
Parameters:
- fromfile (str): Source file path
- tofile (str): Destination file path
- iterableargs (dict): Options for iterable
- scan_limit (int): Number of records to scan for schema detection
- batch_size (int): Batch size for bulk operations
- silent (bool): Suppress progress output
- is_flatten (bool): Flatten nested structures
All iterable objects support:
- read() - Read single record
- read_bulk(num) - Read multiple records
- write(record) - Write single record
- write_bulk(records) - Write multiple records
- reset() - Reset iterator to beginning
- close() - Close file handles
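To show how these methods fit together, the class below is a small in-memory stand-in that exposes the same method names. It is a sketch, not one of the library's classes: ListIterable is a hypothetical name, the default for num is arbitrary, and raising StopIteration at end of data is an assumption about behavior that is not specified here.

```python
class ListIterable:
    """In-memory stand-in exposing the method names listed above."""

    def __init__(self, records=None):
        self._records = list(records or [])
        self._pos = 0
        self.closed = False

    def read(self):
        # Assumption: exhaustion is signalled with StopIteration.
        if self._pos >= len(self._records):
            raise StopIteration
        record = self._records[self._pos]
        self._pos += 1
        return record

    def read_bulk(self, num=10):
        # Returns fewer than num records near the end, or an empty list.
        chunk = self._records[self._pos:self._pos + num]
        self._pos += len(chunk)
        return chunk

    def write(self, record):
        self._records.append(record)

    def write_bulk(self, records):
        self._records.extend(records)

    def reset(self):
        self._pos = 0

    def close(self):
        self.closed = True

    # Iterator and context-manager protocols built on the methods above.
    def __iter__(self):
        return self

    def __next__(self):
        return self.read()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()
```

A stand-in like this can be handy in unit tests for code that only depends on the read/write interface and should not touch the file system.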
The internal engine uses pure Python implementations for all formats. It supports all file types and compression codecs.
The DuckDB engine provides high-performance querying capabilities for supported formats:
- Formats: CSV, JSONL, NDJSON, JSON
- Codecs: GZIP, ZStandard (.zst)
- Features: Fast querying, totals counting, SQL-like operations
Use engine='duckdb' when opening files:
source = open_iterable('data.csv.gz', engine='duckdb')
See the examples directory for more complete examples:
- simplewiki/ - Processing Wikipedia XML dumps
See the tests directory for comprehensive usage examples and test cases.
IterableData can be integrated with AI platforms and frameworks for intelligent data processing:
- AI Frameworks - Integration with LangChain, CrewAI, and AutoGen
  - Tool creation for data reading and format conversion
  - Schema inference and data quality analysis
  - Multi-agent workflows for data processing
- OpenAI - Direct OpenAI API integration (GPT-4, GPT-3.5, etc.)
  - Function calling and Assistants API
  - Structured outputs for consistent results
  - Natural language data analysis and transformation
- Claude - Anthropic Claude AI integration
  - Claude API integration with tools support
  - Intelligent data analysis and schema inference
  - Format conversion with AI guidance
  - Data quality assessment and documentation
- Gemini - Google Gemini AI integration
  - Natural language data analysis
  - Intelligent format conversion with AI guidance
  - Schema documentation and data quality assessment
  - Function calling integration
These guides provide patterns, examples, and best practices for combining IterableData's unified data interface with AI capabilities.
This library is used in:
- undatum - Command line data processing tool
- datacrafter - Data processing ETL engine
MIT License
Contributions are welcome! Please feel free to submit pull requests or open issues.
See CHANGELOG.md for detailed version history.
- Enhanced Format Detection: Added content-based format detection using magic numbers and heuristics for files without extensions, streams, and files with incorrect extensions
- Exception Hierarchy: Added comprehensive exception hierarchy (IterableDataError, FormatError, CodecError, etc.) for better error handling
- Format Capability Reporting: Added programmatic API to query format capabilities (get_format_capabilities(), list_all_capabilities(), get_capability())
- Table Listing Support: Added list_tables() and has_tables() methods for discovering tables, sheets, and datasets in multi-table formats
- AI Integration Guides: Added comprehensive guides for LangChain, CrewAI, AutoGen, and Google Gemini AI
- Documentation: Added capability matrix and enhanced API documentation
- Development Tools: Added benchmarking and utility scripts
- Code Improvements: Enhanced format detection, codecs, and data type handlers
- Examples: Added ZIP XML processing example
- Major Format Expansion: Added support for 50+ new data formats across multiple categories
- Enhanced Compression: Added LZO, Snappy, and SZIP codec support
- CI/CD: Added GitHub Actions workflows for automated testing and deployment
- Documentation: Complete documentation site with Docusaurus
- Testing: Comprehensive test suite for all formats
- Comprehensive documentation enhancements
- GitHub Actions release workflow
- Improved examples and use cases
- DuckDB engine support
- Enhanced format detection
- Pipeline processing framework
- Bulk operations support