V0.1.1 : pytest for BigQueryToCSV.extract() #83

Draft pull request: wants to merge 25 commits into base: develop

Changes from all commits (25 commits)
0232ac4
fixed bug warehouse uri
tusharchou Oct 29, 2024
0d4b61a
added logging to CSV.get()
tusharchou Oct 29, 2024
a2474ca
supported big query ts format
tusharchou Oct 29, 2024
4c11eb1
refactored parameter to config from catalog to reduce confusion
tusharchou Oct 29, 2024
8809515
Fixed bug of logger in GCP
tusharchou Oct 29, 2024
02099c6
replaced local path with dynamic path
tusharchou Oct 29, 2024
7c9b0af
replaced local path with dynamic path
tusharchou Oct 29, 2024
8cafca7
replaced local path with dynamic path
tusharchou Oct 29, 2024
929faf7
demo
tusharchou Oct 29, 2024
a01f5c4
demo
tusharchou Oct 29, 2024
e0911aa
added a class implementation for github issue hoping it is useful for…
tusharchou Nov 3, 2024
2ec3528
added an exception for PlanNotFound to ask users to raise issues on th…
tusharchou Nov 3, 2024
4d7783a
Updated overview and milestones. Added directory structure under tech…
tusharchou Nov 4, 2024
224d7fb
Updated components in technical specifications
tusharchou Nov 4, 2024
becddf4
added testing for BigQueryToCSV.extract()
tusharchou Nov 4, 2024
9df5337
added testing for BigQueryToCSV.extract()
tusharchou Nov 4, 2024
70d71a6
added testing for Iceberg.get()
tusharchou Nov 4, 2024
932941a
added testing for table_statistics_from_iceberg_metadata
tusharchou Nov 20, 2024
a2ba091
added table_statistics_from_iceberg_metadata in format.iceberg.metadata
tusharchou Nov 21, 2024
66505ba
added vocab in product
tusharchou Nov 23, 2024
6371648
added dimension in store
tusharchou Nov 23, 2024
6d37b44
tested dimension class without file format
tusharchou Nov 23, 2024
3814e4b
work: doc
tusharchou Nov 28, 2024
3ff6466
work: testing
tusharchou Nov 28, 2024
f372eeb
kafka spark iceberg WIP
tusharchou Jan 18, 2025
122 changes: 122 additions & 0 deletions Makefile
@@ -0,0 +1,122 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


help: ## Display this help
@awk 'BEGIN {FS = ":.*##"; printf "\nUsage:\n make \033[36m\033[0m\n"} /^[a-zA-Z_-]+:.*?##/ { printf " \033[36m%-20s\033[0m %s\n", $$1, $$2 } /^##@/ { printf "\n\033[1m%s\033[0m\n", substr($$0, 5) } ' $(MAKEFILE_LIST)

install-poetry: ## Install poetry if the user has not done that yet.
@if ! command -v poetry &> /dev/null; then \
echo "Poetry could not be found. Installing..."; \
pip install --user poetry==2.0.1; \
else \
echo "Poetry is already installed."; \
fi

install-dependencies: ## Install dependencies including dev, docs, and all extras
poetry install --all-extras

install: | install-poetry install-dependencies

check-license: ## Check license headers
./dev/check-license

lint: ## lint
poetry run pre-commit run --all-files

test: ## Run all unit tests, can add arguments with PYTEST_ARGS="-vv"
poetry run pytest tests/ -m "(unmarked or parametrize) and not integration" ${PYTEST_ARGS}

test-s3: # Run tests marked with s3, can add arguments with PYTEST_ARGS="-vv"
sh ./dev/run-minio.sh
poetry run pytest tests/ -m s3 ${PYTEST_ARGS}


test-kafka: ## Up Kafka container
docker compose -f dev/docker-compose-kafka.yml kill
docker compose -f dev/docker-compose-kafka.yml rm -f
docker compose -f dev/docker-compose-kafka.yml up -d
sleep 10

test-spark:
docker compose -f dev/docker-compose-spark.yml kill
docker compose -f dev/docker-compose-spark.yml rm -f
docker compose -f dev/docker-compose-spark.yml up -d
sleep 10


test-integration: ## Run all integration tests, can add arguments with PYTEST_ARGS="-vv"
docker compose -f dev/docker-compose-integration.yml kill
docker compose -f dev/docker-compose-integration.yml rm -f
docker compose -f dev/docker-compose-integration.yml up -d
sleep 10
docker compose -f dev/docker-compose-integration.yml cp ./dev/provision.py spark-iceberg:/opt/spark/provision.py
docker compose -f dev/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py
poetry run pytest tests/ -v -m integration ${PYTEST_ARGS}

test-integration-rebuild:
docker compose -f dev/docker-compose-integration.yml kill
docker compose -f dev/docker-compose-integration.yml rm -f
docker compose -f dev/docker-compose-integration.yml build --no-cache

test-adls: ## Run tests marked with adls, can add arguments with PYTEST_ARGS="-vv"
sh ./dev/run-azurite.sh
poetry run pytest tests/ -m adls ${PYTEST_ARGS}

test-gcs: ## Run tests marked with gcs, can add arguments with PYTEST_ARGS="-vv"
sh ./dev/run-gcs-server.sh
poetry run pytest tests/ -m gcs ${PYTEST_ARGS}

test-coverage-unit: # Run test with coverage for unit tests, can add arguments with PYTEST_ARGS="-vv"
poetry run coverage run --source=pyiceberg/ --data-file=.coverage.unit -m pytest tests/ -v -m "(unmarked or parametrize) and not integration" ${PYTEST_ARGS}

test-coverage-integration: # Run test with coverage for integration tests, can add arguments with PYTEST_ARGS="-vv"
docker compose -f dev/docker-compose-integration.yml kill
docker compose -f dev/docker-compose-integration.yml rm -f
docker compose -f dev/docker-compose-integration.yml up -d
sh ./dev/run-azurite.sh
sh ./dev/run-gcs-server.sh
sleep 10
docker compose -f dev/docker-compose-integration.yml cp ./dev/provision.py spark-iceberg:/opt/spark/provision.py
docker compose -f dev/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py
poetry run coverage run --source=pyiceberg/ --data-file=.coverage.integration -m pytest tests/ -v -m integration ${PYTEST_ARGS}

test-coverage: | test-coverage-unit test-coverage-integration ## Run all tests with coverage including unit and integration tests
poetry run coverage combine .coverage.unit .coverage.integration
poetry run coverage report -m --fail-under=90
poetry run coverage html
poetry run coverage xml


clean: ## Clean up the project Python working environment
@echo "Cleaning up Cython and Python cached files"
@rm -rf build dist *.egg-info
@find . -name "*.so" -exec echo Deleting {} \; -delete
@find . -name "*.pyc" -exec echo Deleting {} \; -delete
@find . -name "__pycache__" -exec echo Deleting {} \; -exec rm -rf {} +
@find . -name "*.pyd" -exec echo Deleting {} \; -delete
@find . -name "*.pyo" -exec echo Deleting {} \; -delete
@echo "Cleanup complete"

docs-install:
poetry install --with docs

docs-serve:
poetry run mkdocs serve -f mkdocs/mkdocs.yml

docs-build:
poetry run mkdocs build -f mkdocs/mkdocs.yml --strict
316 changes: 188 additions & 128 deletions README.md

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions dev/docker-compose-kafka.yml
@@ -0,0 +1,23 @@
services:
broker:
image: apache/kafka:latest
hostname: broker
container_name: broker
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
102 changes: 102 additions & 0 deletions dev/docker-compose-spark.yml
@@ -0,0 +1,102 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

services:
spark-iceberg:
image: python-integration
container_name: pyiceberg-spark
build: .
networks:
iceberg_net:
depends_on:
- rest
- hive
- minio
volumes:
- ./warehouse:/home/iceberg/warehouse
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
ports:
- 8888:8888
- 8080:8080
links:
- rest:rest
- hive:hive
- minio:minio
rest:
image: apache/iceberg-rest-fixture
container_name: pyiceberg-rest
networks:
iceberg_net:
ports:
- 8181:8181
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
- CATALOG_WAREHOUSE=s3://warehouse/
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- CATALOG_S3_ENDPOINT=http://minio:9000
minio:
image: minio/minio
container_name: pyiceberg-minio
environment:
- MINIO_ROOT_USER=admin
- MINIO_ROOT_PASSWORD=password
- MINIO_DOMAIN=minio
networks:
iceberg_net:
aliases:
- warehouse.minio
ports:
- 9001:9001
- 9000:9000
command: ["server", "/data", "--console-address", ":9001"]
mc:
depends_on:
- minio
image: minio/mc
container_name: pyiceberg-mc
networks:
iceberg_net:
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
entrypoint: >
/bin/sh -c "
until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
/usr/bin/mc mb minio/warehouse;
/usr/bin/mc policy set public minio/warehouse;
tail -f /dev/null
"
hive:
build: hive/
container_name: hive
hostname: hive
networks:
iceberg_net:
ports:
- 9083:9083
environment:
SERVICE_NAME: "metastore"
SERVICE_OPTS: "-Dmetastore.warehouse.dir=s3a://warehouse/hive/"

networks:
iceberg_net:
22 changes: 22 additions & 0 deletions kafka.py
@@ -0,0 +1,22 @@
from confluent_kafka import Consumer

c = Consumer({
'bootstrap.servers': 'localhost',
'group.id': 'my-topic',
'auto.offset.reset': 'earliest'
})

c.subscribe(['my-topic'])

while True:
msg = c.poll(1.0)

if msg is None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue

print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()
Empty file removed local-data-platform/README.md
Empty file.
2 changes: 1 addition & 1 deletion local-data-platform/local_data_platform/__init__.py
@@ -39,7 +39,7 @@ class Table(Base):

def __init__(self, name: str, path: Path = os.getcwd()):
self.name = name
self.path = path
self.path = os.getcwd()+path

def get(self):
raise TableNotFound(
Original file line number Diff line number Diff line change
@@ -30,10 +30,10 @@ class LocalIcebergCatalog(SqlCatalog):

def __init__(self, name: str, path: str, *args, **kwargs):
self.name = name
self.uri = f"sqlite:///{path}/{name}.db"
self.uri = f"sqlite:///{path}/{name}_catalog.db"
self.warehouse = f"file://{path}"
try:
logger.error(f"Initializing LocalIcebergCatalog with {self.uri}")
logger.info(f"Initializing LocalIcebergCatalog with {self.uri}")
super().__init__(*args, **kwargs, **self.__dict__)
except Exception as e:
logger.error(f"Failed to initialize LocalIcebergCatalog {e}")
3 changes: 3 additions & 0 deletions local-data-platform/local_data_platform/exceptions.py
@@ -8,3 +8,6 @@ class PipelineNotFound(Exception):

class EngineNotFound(Exception):
"""Raised when engine is not supported"""

class PlanNotFound(Exception):
"""Raised when issue doesn't have resolution estimate"""
Original file line number Diff line number Diff line change
@@ -17,17 +17,18 @@ def __init__(self, *args, **kwargs):

def get(self) -> Table:
if not os.path.isfile(self.path):
logger.error(f"This path {self.path} is invalid")
raise FileNotFoundError

logger.info(
f"""
reading CSV from {self.path}
"""
)
df = csv.read_table(self.path)
df = csv.read_csv(self.path)
logger.info(
f"""
df type {type(df)}
df type {type(df)} len {len(df)}
"""
)
if df is not None:
28 changes: 19 additions & 9 deletions local-data-platform/local_data_platform/format/iceberg/__init__.py
@@ -4,6 +4,10 @@
from pyiceberg.typedef import Identifier
from pyarrow import Table
from local_data_platform.logger import log
import os

os.environ['PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE'] = 'true'


logger = log()

@@ -21,35 +25,41 @@ class Iceberg(Format):
Methods:
__init__(catalog: str, *args, **kwargs):
Initializes the Iceberg instance with the given catalog and metadata.
put(df: Table) -> Table:
Writes the given data frame to the Iceberg table.
get():
Fetches data from the Iceberg table and returns it as an Arrow table.
"""
def __init__(self, catalog: str, *args, **kwargs):
logger.info(f"Iceberg catalog : {catalog}")
self.catalog_identifier = catalog["identifier"]
def __init__(self, config: str, *args, **kwargs):
logger.info(f"Iceberg catalog : {config}")
self.catalog_identifier = config["identifier"]
self.catalog = LocalIcebergCatalog(
self.catalog_identifier, path=catalog["warehouse_path"]
self.catalog_identifier, path=config["warehouse_path"]
)
self.catalog.create_namespace(self.catalog_identifier)
if not self.catalog._namespace_exists(self.catalog_identifier):
self.catalog.create_namespace(self.catalog_identifier)
self.identifier = f"{self.catalog_identifier}.{kwargs['name']}"
self.metadata = kwargs
logger.info(f"Iceberg created with catalog namespace {self.catalog_identifier}")
logger.info(f"Iceberg initialised with identifier {self.identifier}")
super().__init__(*args, **kwargs)

def put(self, df: Table) -> Table:
if not df:
logger.error(f"While doing put in Iceberg Format we got df as None")
raise Exception(f" Got Table as non")
logger.info(f"self.identifier {self.identifier}")
logger.info(
f"""
Writing {len(df)} to Iceberg Table {self.identifier}
Writing type {type(df)} of length {len(df)} to Iceberg Table {self.identifier}
"""
)
table = self.catalog.create_table_if_not_exists(
identifier=self.identifier, schema=df.schema
identifier=self.identifier, schema=df.schema, properties={
"downcast-ns-timestamp-to-us-on-write": True # Set property for downcasting
}
)
table.append(df)
return table
Original file line number Diff line number Diff line change
@@ -16,14 +16,14 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def get(self) -> Table:
if not os.path.isfile(self.path):
raise FileNotFoundError

logger.info(
f"""
reading parquet from {self.path}
"""
)
if not os.path.isfile(self.path):
raise FileNotFoundError

df = parquet.read_table(self.path)
logger.info(
f"""
92 changes: 92 additions & 0 deletions local-data-platform/local_data_platform/issue/__init__.py
@@ -0,0 +1,92 @@
from .. import Repository
from ..logger import log
import requests
from bs4 import BeautifulSoup
from textwrap import fill
import re

logger = log()

"""
Issue is a Repository that wraps a GitHub issue (title and body scraped from the issue page).
"""


class Issue(Repository):
def __init__(
self,
number: int = 1,
repo_name='local-data-platform',
owner='tusharchou'
) -> None:
logger.debug(
f"""
Loading... Issue Object from {owner} {repo_name} {number}
"""
)
self.num = number
self.project = repo_name
self.owner = owner
self.name, self.desc = self._get_github_issue()

def get(self) -> tuple:
return (self.name, self.desc)

def _get_github_issue(self):
url = f"https://github.com/{self.owner}/{self.project}/issues/{self.num}"
logger.debug(
f"""
Pulling Issue Object from {url}
"""
)

try:
response = requests.get(url)
logger.debug(
f"""
Extracting Issue Object from {response}
"""
)
response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx)

soup = BeautifulSoup(response.text, 'html.parser')

"""
<bdi class="js-issue-title markdown-title">PyIceberg Near-Term Roadmap</bdi>
"""
element = soup.select_one('.js-issue-title').get_text(strip=True)
# element = soup.select_one('.js-issue-title markdown-title')

logger.debug(
f"""
"""
)
# Extract the issue body
text = soup.select_one('.js-comment-body').get_text()
# pattern = re.compile(r'(?<!\s)(?=[A-Z])')
#
# split_text = re.split(pattern=pattern, string=text)
# body = "\n ".join(split_text)
body = text
# body = fill(body_text, width=80)

body = body if body else 'No body text'
logger.info(
f"""
Reading... issue title {element}
Fetching...
Body: {body}
Read More: {url}
"""
)
return (
element,
body
)# Access the 'body' field for the description
except requests.exceptions.RequestException as e:
logger.error(f"Error fetching issue: {e}")
return None


def put(self):
raise Exception
5 changes: 1 addition & 4 deletions local-data-platform/local_data_platform/logger.py
@@ -8,10 +8,7 @@
def log():
basicConfig(level=INFO, format=
"""
%(filename)s - %(funcName)s
- %(asctime)s - %(name)s
- %(levelname)s
- message : %(message)s
%(message)s
"""
)

2 changes: 0 additions & 2 deletions local-data-platform/local_data_platform/pipeline/__init__.py
@@ -11,7 +11,5 @@ class Pipeline(Flow):

def __init__(self, config: Config, *args, **kwargs):
self.config = config
# self.source = Source(**config.metadata['source'])
# self.target = Target(**config.metadata['target'])
super().__init__(*args, **kwargs)

Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@ def __init__(self, config: Config, *args, **kwargs):
self.source = config.metadata["source"]
self.target = config.metadata["target"]
self.target = CSV(name=self.target["name"], path=self.target["path"])
self.source = Iceberg(name=self.source["name"], catalog=self.source["catalog"])
self.source = Iceberg(name=self.source["name"], config=self.source["catalog"])
logger.info(
f"""
IcebergToCSV initialised with
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from local_data_platform.pipeline import Pipeline
from local_data_platform.logger import log

logger = log()

class Ingestion(Pipeline):

class Ingestion(Pipeline):

def extract(self):
self.source.get()
logger.info("Extracting Source in ingestion pipeline")
return self.source.get()

def load(self):
self.target.put(self.extract())
df = self.extract()
logger.info(f"Loading Source {len(df)} in ingestion pipeline")
self.target.put(df)
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@

logger = log()


class CSVToIceberg(Ingestion):
"""
CSVToIceberg is a class responsible for ingesting data from a CSV source and
@@ -39,7 +40,7 @@ def __init__(self, config, *args, **kwargs):
)
self.target = Iceberg(
name=self.target['name'],
catalog=self.target['catalog']
config=self.target['catalog']
)
logger.info(
f"""
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ def __init__(self, config: Config, *args, **kwargs):
)
self.target = Iceberg(
name=self.target['name'],
catalog=self.target['catalog']
config=self.target['catalog']
)
logger.info(
f"""
7 changes: 7 additions & 0 deletions local-data-platform/local_data_platform/product/vocab.py
@@ -0,0 +1,7 @@
"""
Business vocabulary
"""
from enum import Enum

class Vocab(Enum):
LISTINGS = "Listings"
14 changes: 14 additions & 0 deletions local-data-platform/local_data_platform/store/dimension.py
@@ -0,0 +1,14 @@
from . import Store
from typing import List, Dict
from pathlib import Path
from pandas import DataFrame

class Dimension(Store):
def __init__(self, name: str, data: DataFrame):
self.name = name
self.data = data
def get(self):
return self.data

def get_name(self):
return self.name
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@
from pathlib import Path
import json
from local_data_platform import Credentials
from local_data_platform import logger
from local_data_platform.logger import log

logger = log()

Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from local_data_platform.pipeline.ingestion.bigquery_to_csv import BigQueryToCSV
from local_data_platform import Config, SupportedFormat, SupportedEngine
from local_data_platform.store.source.json import Json
from local_data_platform.exceptions import PipelineNotFound
import os
from local_data_platform.logger import log


logger = log()


def get_near_transaction_dataset(
dataset="near_transactions",
config_path="/real_world_use_cases/near_data_lake/config/ingestion.json",
):
"""
Retrieves and processes the near transaction dataset based on the provided configuration.
Args:
dataset (str): The name of the dataset to be processed. Defaults to "near_transactions".
config_path (str): The path to the configuration file. Defaults to "/real_world_use_cases/near_data_lake/config/ingestion.json".
Raises:
PipelineNotFound: If the source and target formats specified in the configuration are not supported.
Returns:
None
"""

config = Config(
**Json(
name=dataset,
path=config_path,
).get()
)
print(config)
logger.info(
f"""
We are using the following dictionary as the configuration to generate a monthly trust metric
{config}
"""
)
if (
config.metadata["source"]["format"] == SupportedFormat.JSON.value
and config.metadata["target"]["format"] == SupportedFormat.CSV.value
and config.metadata["source"]["engine"] == SupportedEngine.BIGQUERY.value
):
data_loader = BigQueryToCSV(config=config)
data_loader.load()
else:
raise PipelineNotFound(
f"""
source {config.metadata['source']['format']}
to target {config.metadata['target']['format']}
pipeline is not supported yet
"""
)


get_near_transaction_dataset()
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from local_data_platform.pipeline.egression.csv_to_iceberg import CSVToIceberg
from local_data_platform.pipeline.ingestion.csv_to_iceberg import CSVToIceberg
from local_data_platform.pipeline.ingestion.bigquery_to_csv import BigQueryToCSV
from local_data_platform import Config, SupportedFormat, SupportedEngine
from local_data_platform.store.source.json import Json
@@ -10,54 +10,6 @@
logger = log()


def get_near_trasaction_dataset(
dataset="near_transactions",
config_path="/real_world_use_cases/near_data_lake/config/ingestion.json",
):
"""
Retrieves and processes the near transaction dataset based on the provided configuration.
Args:
dataset (str): The name of the dataset to be processed. Defaults to "near_transactions".
config_path (str): The path to the configuration file. Defaults to "/real_world_use_cases/near_data_lake/config/ingestion.json".
Raises:
PipelineNotFound: If the source and target formats specified in the configuration are not supported.
Returns:
None
"""

config = Config(
**Json(
name=dataset,
path=os.getcwd() + config_path,
).get()
)
print(config)
logger.info(
f"""
We are using the following dictionary as the configuration to generate a monthly trust metric
{config}
"""
)
if (
config.metadata["source"]["format"] == SupportedFormat.JSON.value
and config.metadata["target"]["format"] == SupportedFormat.CSV.value
and config.metadata["source"]["engine"] == SupportedEngine.BIGQUERY.value
):
data_loader = BigQueryToCSV(config=config)
data_loader.load()
else:
raise PipelineNotFound(
f"""
source {config.metadata['source']['format']}
to target {config.metadata['target']['format']}
pipeline is not supported yet
"""
)


def put_near_trasaction_dataset(
dataset="near_transactions",
config_path="/real_world_use_cases/near_data_lake/config/egression.json",
@@ -75,11 +27,11 @@ def put_near_trasaction_dataset(
Raises:
PipelineNotFound: If the source and target formats specified in the configuration are not supported.
"""
"""
config = Config(
**Json(
name=dataset,
path=os.getcwd() + config_path,
path=config_path,
).get()
)

@@ -105,5 +57,4 @@ def put_near_trasaction_dataset(
)


# get_near_trasaction_dataset();
put_near_trasaction_dataset()
Original file line number Diff line number Diff line change
@@ -11,15 +11,15 @@
"target": {
"name": "rides",
"format": "CSV",
"path": "/Users/tushar/Documents/GitHub/local-data-platform/local-data-platform/yellow_tripdata_2023-01.csv"
"path": "/real_world_use_cases/nyc_yellow_taxi_dataset/data/nyc_yellow_taxi_rides.csv"
},
"source": {
"name": "rides",
"format": "ICEBERG",
"catalog": {
"type": "LocalIceberg",
"identifier": "pyiceberg_catalog_db",
"warehouse_path": "./tmp/warehouse"
"identifier": "nyc_yellow_taxi_dataset",
"warehouse_path": "real_world_use_cases/nyc_yellow_taxi_dataset/data"
}
}
}
Original file line number Diff line number Diff line change
@@ -11,16 +11,16 @@
"source": {
"name": "rides",
"format": "PARQUET",
"path": "/Users/tushar/Documents/GitHub/local-data-platform/local-data-platform/yellow_tripdata_2023-01.parquet"
"path": "/yellow_tripdata_2023-01.parquet"
},
"target": {
"name": "rides",
"format": "ICEBERG",
"path": "yellow_tripdata_2023-01.parquet",
"catalog": {
"type": "LocalIceberg",
"identifier": "pyiceberg_catalog_db",
"warehouse_path": "./tmp/warehouse"
"identifier": "nyc_yellow_taxi_dataset",
"warehouse_path": "real_world_use_cases/nyc_yellow_taxi_dataset/data"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
'''
New York Taxi and Limousine Commission
TLC Trip Record Data

Yellow and green taxi trip records include fields capturing pick-up and drop-off dates/times,
pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types,
and driver-reported passenger counts.

For-Hire Vehicle (“FHV”) trip records include fields capturing the dispatching base license number and the pick-up date,
time, and taxi zone location ID (shape file below).

All files will be stored in the PARQUET format.
Trip data will be published monthly (with two months delay) instead of bi-annually.
HVFHV files will now include 17 more columns (please see High Volume FHV Trips Dictionary for details).
Additional columns will be added to the old files as well.
'''







This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from local_data_platform import Config, SupportedFormat
from local_data_platform.store.source.json import Json
from local_data_platform.exceptions import PipelineNotFound
import os
from local_data_platform.logger import log
from local_data_platform.pipeline.egression.iceberg_to_csv import IcebergToCSV


logger = log()


def get_nyc_yellow_taxi_dataset(
dataset='nyc_taxi',
config_path='/real_world_use_cases/nyc_yellow_taxi_dataset/config/egression.json'
):
logger.info(
"""
We will try to read a ICEBERG table from local catalog
"""
)

config = Config(
**Json(
name=dataset,
path=config_path,
).get()
)

logger.info(
f"""
We are using the following dictionary as the configuration to generate a monthly trust metric
{config}
"""
)
if (
config.metadata['source']['format'] == SupportedFormat.ICEBERG.value and
config.metadata['target']['format'] == SupportedFormat.CSV.value
):
data_loader = IcebergToCSV(config=config)
data_loader.load()
else:
raise PipelineNotFound(
f"""
source {config.metadata['source']['format']}
to target {config.metadata['target']['format']}
pipeline is not supported yet
"""
)


get_nyc_yellow_taxi_dataset()
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from local_data_platform import Config, SupportedFormat
from local_data_platform.store.source.json import Json
from local_data_platform.exceptions import PipelineNotFound
import os
from local_data_platform.logger import log
from local_data_platform.pipeline.ingestion.parquet_to_iceberg import ParquetToIceberg

logger = log()


def put_nyc_yellow_taxi_dataset(
dataset='nyc_taxi',
config_path='/real_world_use_cases/nyc_yellow_taxi_dataset/config/ingestion.json'
):
logger.info(
"""
We will try to read a PARQUET file downloaded from :
https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
"""
)

config = Config(
**Json(
name=dataset,
path=config_path,
).get()
)

logger.info(
f"""
We are using the following dictionary as the configuration to generate a monthly trust metric
{config}
"""
)
if (
config.metadata['source']['format'] == SupportedFormat.PARQUET.value and
config.metadata['target']['format'] == SupportedFormat.ICEBERG.value
):
data_loader = ParquetToIceberg(config=config)
data_loader.load()
else:
raise PipelineNotFound(
f"""
source {config.metadata['source']['format']}
to target {config.metadata['target']['format']}
pipeline is not supported yet
"""
)


put_nyc_yellow_taxi_dataset()
Empty file.
197 changes: 197 additions & 0 deletions local-data-platform/tests/test_counts.py
@@ -0,0 +1,197 @@
# the row count is a common feature in formats like Parquet and Iceberg
import math
from dataclasses import asdict, dataclass
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple, Union

import pyarrow as pa
from pandas import DataFrame, read_parquet
from pyarrow import parquet
from pyarrow import parquet as pq
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.table import DataScan, Table, TableMetadata
from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV1, TableMetadataV2


# Minimal stand-in for the struct fixture assumed by construct_test_table() below;
# its field types are inferred from how the helper instantiates it.
@dataclass
class TestStruct:
    x: Optional[int]
    y: Optional[float]

warehouse_path = "sample_data/warehouse"
catalog = SqlCatalog(
"default",
**{
"uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
"warehouse": f"file://{warehouse_path}",
},
)


def _create_data() -> DataFrame:
"""
to test that any table counts can be calculated from just metadata
we will create a test data List that we can use to test
"""

example_row = [
"nyc,1,1,1"
]
example_data_dictionary = [
"place",
"user_id",
"vendor_id",
"listings_id"
]
# example_data = [column:example_row.split for index,column in enumerate(example_data_dictionary)]
dict_of_column_values = {
"place" : ["nyc", "blr"],
"user_id": [ 1, 2],
"vendor_id": [ 101, 102],
"listings_id": [ 1001, 1002]
}
return DataFrame(dict_of_column_values)

def _create_parquet_metadata() -> parquet.FileMetaData:
df = _create_data()
buf_bytes = df.to_parquet()

buf_stream = BytesIO(buf_bytes)
parquet_file = parquet.ParquetFile(buf_stream)

return parquet_file.metadata


# parquet_metadata = read_parquet(buf_stream)
def construct_test_table(
write_statistics: Union[bool, List[str]] = True,
) -> Tuple[pq.FileMetaData, Union[TableMetadataV1, TableMetadataV2]]:
table_metadata = {
"format-version": 2,
"location": "s3://bucket/test/location",
"last-column-id": 7,
"current-schema-id": 0,
"schemas": [
{
"type": "struct",
"schema-id": 0,
"fields": [
{"id": 1, "name": "strings", "required": False, "type": "string"},
{"id": 2, "name": "floats", "required": False, "type": "float"},
{
"id": 3,
"name": "list",
"required": False,
"type": {"type": "list", "element-id": 6, "element": "long", "element-required": False},
},
{
"id": 4,
"name": "maps",
"required": False,
"type": {
"type": "map",
"key-id": 7,
"key": "long",
"value-id": 8,
"value": "long",
"value-required": False,
},
},
{
"id": 5,
"name": "structs",
"required": False,
"type": {
"type": "struct",
"fields": [
{"id": 9, "name": "x", "required": False, "type": "long"},
{"id": 10, "name": "y", "required": False, "type": "float", "doc": "comment"},
],
},
},
],
},
],
"default-spec-id": 0,
"partition-specs": [{"spec-id": 0, "fields": []}],
"properties": {},
}

table_metadata = TableMetadataUtil.parse_obj(table_metadata)
arrow_schema = schema_to_pyarrow(table_metadata.schemas[0])

_strings = ["zzzzzzzzzzzzzzzzzzzz", "rrrrrrrrrrrrrrrrrrrr", None, "aaaaaaaaaaaaaaaaaaaa"]

_floats = [3.14, math.nan, 1.69, 100]

_list = [[1, 2, 3], [4, 5, 6], None, [7, 8, 9]]

_maps: List[Optional[Dict[int, int]]] = [
{1: 2, 3: 4},
None,
{5: 6},
{},
]

_structs = [
asdict(TestStruct(1, 0.2)),
asdict(TestStruct(None, -1.34)),
None,
asdict(TestStruct(54, None)),
]

table = pa.Table.from_pydict(
{
"strings": _strings,
"floats": _floats,
"list": _list,
"maps": _maps,
"structs": _structs,
},
schema=arrow_schema,
)
metadata_collector: List[Any] = []

with pa.BufferOutputStream() as f:
with pq.ParquetWriter(
f, table.schema, metadata_collector=metadata_collector, write_statistics=write_statistics
) as writer:
writer.write_table(table)

return metadata_collector[0], table_metadata


def test_parquet_count():
metadata = _create_parquet_metadata()
assert metadata.num_rows == 2

def _create_iceberg_metadata() -> DataScan:
df = _create_data()
arrow_table = pa.Table.from_pandas(df)
# catalog.create_namespace("default")

# table = catalog.create_table(
# "default.taxi_dataset",
# schema=arrow_table.schema,
# )
table = catalog.load_table("default.taxi_dataset")
table.append(arrow_table)
return DataScanV2(table.scan())

def test_arrow_count():
df = _create_data()
arrow_table = pa.Table.from_pandas(df)
assert arrow_table.num_rows == 2

def test_iceberg_count():
table = _create_iceberg_metadata()
assert len(table.to_arrow()) == 2

class DataScanV2():
def __init__(self, scan: DataScan):
self.scan = scan
def count(self):
res = 0
tasks = self.scan.plan_files()
for task in tasks:
res+=task.file.record_count
return res

def test_iceberg_metadata_only_count():
table = _create_iceberg_metadata()
assert table.count() == 2
23 changes: 23 additions & 0 deletions local-data-platform/tests/test_dimension.py
@@ -0,0 +1,23 @@
from local_data_platform.store.dimension import Dimension
from local_data_platform.product.vocab import Vocab

def test_get():
Review comment from the owner/author: Love how simple this is 🥇

data = [{
'name': 'baked ham glazed with pineapple and chipotle peppers',
'minutes': 85,
'submitted': '2005-11-28',
'tags': "['ham', 'time-to-make', 'course', 'main-ingredient', 'cuisine', 'preparation', 'occasion', 'north-american', 'lunch', 'main-dish', 'pork', 'american', 'mexican', 'southwestern-united-states', 'tex-mex', 'oven', 'holiday-event', 'easter', 'stove-top', 'spicy', 'christmas', 'meat', 'taste-mood', 'sweet', 'equipment', 'presentation', 'served-hot', '4-hours-or-less']",
'n_steps': 7,
'steps': "['mix cornstarch with a little cold water to dissolve', 'place all ingredients except for ham in a blender and blend smooth , in a small saucepan over medium heat bring to a boil them simmer till thickened', 'preheat oven to 375 f', 'place ham , cut end down , in a large baking pan and score skin', 'bake ham for 15 minutes', 'brush glaze over ham and bake for another hour or until internal temperature reads 140 f', 'baste half way through baking']",
'description': 'sweet, smokey and spicy! go ahead and leave the seeds in if you enjoy the heat.',
'ingredients': "['smoked ham', 'brown sugar', 'crushed pineapple', 'chipotle chile in adobo', 'adobo sauce', 'nutmeg', 'fresh ginger', 'cornstarch', 'salt']",
'n_ingredients': 9,
'average_rating': 5.0,
'votes': 27,
'Score': 4.85275401,
'calories': 712.5,
'category': 'Non-veg'
}]
tbl = Dimension(name=Vocab.LISTINGS,data=data)
assert len(tbl.get()) == 1

53 changes: 53 additions & 0 deletions local-data-platform/tests/test_extract_big_query_to_csv.py
@@ -0,0 +1,53 @@
from local_data_platform.pipeline.ingestion.bigquery_to_csv import BigQueryToCSV
from local_data_platform import Config, SupportedFormat, SupportedEngine
from local_data_platform.store.source.json import Json
from local_data_platform.exceptions import PipelineNotFound
from local_data_platform.logger import log
import pytest


logger = log()


class TestBigQueryToCSV:

def test_config(
self,
dataset="near_transactions",
config_path="/sample_data" + "/real_world_use_cases/near_data_lake/config/ingestion.json",
):

config = Config(
**Json(
name=dataset,
path=config_path,
).get()
)
if (
config.metadata["source"]["format"] == SupportedFormat.JSON.value
and config.metadata["target"]["format"] == SupportedFormat.CSV.value
and config.metadata["source"]["engine"] == SupportedEngine.BIGQUERY.value
):
data_loader = BigQueryToCSV(config=config)
data_loader.load()
Review comment from a collaborator: Todo: Write test cases


assert True
else:
raise AssertionError(
f"""
source {config.metadata['source']['format']}
to target {config.metadata['target']['format']}
pipeline is not supported yet
"""
)
# def test_extract(self):
# def test_schema(table_v2: Table) -> None:
# assert table_v2.schema() == Schema(
# NestedField(field_id=1, name="x", field_type=LongType(), required=True),
# NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"),
# NestedField(field_id=3, name="z", field_type=LongType(), required=True),
# identifier_field_ids=[1, 2],
# )
# assert table_v2.schema().schema_id == 1
#
# assert False
3 changes: 3 additions & 0 deletions local-data-platform/tests/test_format_iceberg_extract.py
@@ -0,0 +1,3 @@
class TestIceberg:
def test_put(self):
assert False
80 changes: 80 additions & 0 deletions local-data-platform/tests/test_metadata.py
@@ -0,0 +1,80 @@
from typing import Optional, List, Union
from local_data_platform.format.iceberg.metadata import (
table_statistics_from_iceberg_metadata,
IcebergTableStatistics
)
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.schema import Schema
from pyiceberg.catalog import Catalog
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.table import Table
from pyiceberg.typedef import Properties
import pyarrow as pa
import pytest
from pyiceberg.catalog.sql import SqlCatalog
from local_data_platform.logger import log


logger = log()

warehouse_path = "sample_data/"
catalog = SqlCatalog(
"default",
**{
"uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
"warehouse": f"file://{warehouse_path}",
},
)

UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0)

def _create_table(
session_catalog: Catalog,
identifier: str,
properties: Properties = {},
data: Optional[List[pa.Table]] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
) -> Table:

try:
session_catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass

data = pa.Table.from_pylist(
[
{
"foo": "foo_val",
"bar": 1,
"baz": False,
"qux": ["x", "y"],
"quux": {"key": {"subkey": 2}},
"location": [{"latitude": 1.1}],
"person": {"name": "some_name", "age": 23},
}
]
)

tbl = session_catalog.create_table_if_not_exists(identifier=identifier, schema=data.schema, properties=properties, partition_spec=partition_spec)

tbl.append(data)

return tbl


def test_table_statistics_from_iceberg_metadata():
identifier = "default.iceberg_table_with_stats"
if not catalog._namespace_exists('default'):
catalog.create_namespace('default')

table = _create_table(
session_catalog=catalog,
identifier = identifier
)
stats = table_statistics_from_iceberg_metadata(
Review comment from the owner/author: #1 I am trying to solve for the statistics here @rakhioza07 @redpheonixx

iceberg_table_io=table.io,
iceberg_metadata=table.metadata
)
logger.info(stats)
assert stats.partition_count == 1
51 changes: 51 additions & 0 deletions spark.py
@@ -0,0 +1,51 @@
import os
from typing import List

from pyspark.sql import SparkSession


def spark() -> "SparkSession":
import importlib.metadata

from pyspark.sql import SparkSession

# Remember to also update `dev/Dockerfile`
spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2])
scala_version = "2.12"
iceberg_version = "1.6.0"

os.environ["PYSPARK_SUBMIT_ARGS"] = (
f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version},"
f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version} pyspark-shell"
)
os.environ["AWS_REGION"] = "us-east-1"
os.environ["AWS_ACCESS_KEY_ID"] = "admin"
os.environ["AWS_SECRET_ACCESS_KEY"] = "password"

spark = (
SparkSession.builder.appName("PyIceberg integration test")
.config("spark.sql.session.timeZone", "UTC")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalog.integration", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.integration.catalog-impl", "org.apache.iceberg.rest.RESTCatalog")
.config("spark.sql.catalog.integration.cache-enabled", "false")
.config("spark.sql.catalog.integration.uri", "http://localhost:8181")
.config("spark.sql.catalog.integration.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
.config("spark.sql.catalog.integration.warehouse", "s3://warehouse/wh/")
.config("spark.sql.catalog.integration.s3.endpoint", "http://localhost:9000")
.config("spark.sql.catalog.integration.s3.path-style-access", "true")
.config("spark.sql.defaultCatalog", "integration")
.config("spark.sql.catalog.hive", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hive.type", "hive")
.config("spark.sql.catalog.hive.uri", "http://localhost:9083")
.config("spark.sql.catalog.hive.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
.config("spark.sql.catalog.hive.warehouse", "s3://warehouse/hive/")
.config("spark.sql.catalog.hive.s3.endpoint", "http://localhost:9000")
.config("spark.sql.catalog.hive.s3.path-style-access", "true")
.config("spark.sql.execution.arrow.pyspark.enabled", "true")
.getOrCreate()
)

return spark

def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None:
for sql in sqls:
spark.sql(sql)
17 changes: 17 additions & 0 deletions streaming.md
@@ -0,0 +1,17 @@
# Streaming

## Kafka

### docker

Generally, Kafka needs to run alongside ZooKeeper. With KRaft, however, we can run it locally without ZooKeeper.

[KRaft docker compose](https://developer.confluent.io/confluent-tutorials/kafka-on-docker/?utm_medium=sem&utm_source=google&utm_campaign=ch.sem_br.nonbrand_tp.prs_tgt.dsa_mt.dsa_rgn.india_lng.eng_dv.all_con.confluent-developer&utm_term=&creative=&device=c&placement=&gad_source=1&gclid=Cj0KCQiAv628BhC2ARIsAIJIiK8Blwt5X8DJdUNeXRBMhTEOS4crsZx5eanHfcb7JWv13rN944nCG4IaAgXAEALw_wcB)

### python kafka

confluent-kafka (Confluent's Python client) is an easy library for producing to and consuming from Kafka topics; the `kafka.py` script in this PR uses its `Consumer`.
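
As a sketch of the producing side (the consuming side already lives in `kafka.py`), the snippet below assumes the KRaft broker from `dev/docker-compose-kafka.yml` is listening on `localhost:9092` and reuses the `my-topic` topic name; it is illustrative rather than part of the pipeline:

```python
from confluent_kafka import Producer

# Assumes the KRaft broker from dev/docker-compose-kafka.yml is up on localhost:9092.
producer = Producer({'bootstrap.servers': 'localhost:9092'})


def delivery_report(err, msg):
    # Invoked once per message after the broker acknowledges (or rejects) it.
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [partition {msg.partition()}]")


for i in range(3):
    producer.produce('my-topic', value=f"message {i}".encode('utf-8'), callback=delivery_report)
    producer.poll(0)  # serve delivery callbacks

producer.flush()  # block until all queued messages are delivered
```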

### spark

Spark can write to Iceberg tables from either streaming or batch jobs.
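
A minimal write sketch, assuming a SparkSession wired to the `integration` REST catalog the way the `spark()` helper in `spark.py` configures it, and a hypothetical `default.events` table:

```python
from pyspark.sql import SparkSession

# Assumes an existing session with an Iceberg catalog named "integration",
# e.g. the one built by spark() in spark.py.
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("nyc", 1), ("blr", 2)],
    ["place", "user_id"],
)

# Batch write: creates (or replaces) the Iceberg table and writes the rows.
df.writeTo("integration.default.events").createOrReplace()

# A streaming DataFrame can target the same table with, for example:
# stream_df.writeStream.format("iceberg").outputMode("append") \
#     .option("checkpointLocation", "/tmp/checkpoints/events") \
#     .toTable("integration.default.events")
```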