Closed
Changes from 26 commits
Commits
31 commits
8a57bc8
V0.1.1 (#58)
tusharchou Oct 30, 2024
d1c0894
0.1.1 BigQuery Release (#46)
tusharchou Oct 30, 2024
72ab669
pypi release through github actions (#60)
tusharchou Oct 30, 2024
0757da8
Create manual.yml
tusharchou Oct 30, 2024
cd2cbbe
Update publish.yml (#62)
tusharchou Oct 30, 2024
80c8286
Update publish.yml (#63)
tusharchou Oct 30, 2024
48efe67
Update pyproject.toml (#64)
tusharchou Oct 30, 2024
d1dbf62
Update publish.yml (#65)
tusharchou Oct 30, 2024
7464490
Brmhastra patch 1 (#67)
brmhastra Oct 30, 2024
773058f
Brmhastra patch 1 (#68)
brmhastra Oct 30, 2024
d3e26b7
Brmhastra patch 1 (#69)
brmhastra Oct 30, 2024
840223e
GitHub workflow for pypi release (#70)
mrutunjay-kinagi Oct 30, 2024
e39db41
GitHub workflow for pypi release (#71)
mrutunjay-kinagi Oct 30, 2024
8ab9414
Update publish.yml (#72)
mrutunjay-kinagi Oct 30, 2024
11143b4
Update publish.yml (#73)
mrutunjay-kinagi Oct 30, 2024
c9d1101
Release v1.1 changes (#74)
mrutunjay-kinagi Oct 31, 2024
c3f2aa1
Github action setup for release v1.1 (#76)
mrutunjay-kinagi Oct 31, 2024
bf783b4
Update publish.yml
redpheonixx Nov 1, 2024
9cb0189
Merge pull request #77 from tusharchou/redpheonixx-patch-1
redpheonixx Nov 1, 2024
81eab88
Merge pull request #61 from tusharchou/tusharchou-patch-4
redpheonixx Nov 1, 2024
00f5009
Update publish.yml (#78)
redpheonixx Nov 2, 2024
28d5787
Update publish.yml
redpheonixx Nov 2, 2024
5735908
Merge pull request #79 from tusharchou/redpheonixx-patch-3
redpheonixx Nov 2, 2024
66b31b0
Update publish.yml (#80)
redpheonixx Nov 3, 2024
6716cfd
Update pyproject.toml (#81)
redpheonixx Nov 4, 2024
f8c994a
Release v1.1 fix (#84)
mrutunjay-kinagi Nov 5, 2024
4980033
0.1.1 Pytest Added for BigQuery Source (#86)
mrutunjay-kinagi Nov 9, 2024
0e7e61d
fix(deps): Stabilize build dependencies and configuration
tusharchou Jul 17, 2025
ea64269
Merge pull request #95 from tusharchou/fix/stabilize-dependencies
redpheonixx Jul 17, 2025
0c4c40f
Add restaurant_data_mart_PRD.md for managing restaurant data with LDP…
sankalpmodi94 Jul 20, 2025
09755aa
feat(dx): Add Makefile and setup guide (#96)
tusharchou Jul 27, 2025
32 changes: 32 additions & 0 deletions .github/workflows/manual.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# This is a basic workflow that is manually triggered

name: Manual workflow

# Controls when the action will run. Workflow runs when manually triggered using the UI
# or API.
on:
workflow_dispatch:
# Inputs the workflow accepts.
inputs:
name:
# Friendly description to be shown in the UI instead of 'name'
description: 'Person to greet'
# Default value if no value is explicitly provided
default: 'World'
# Input has to be provided for the workflow to run
required: true
# The data type of the input
type: string

# A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs:
# This workflow contains a single job called "greet"
greet:
# The type of runner that the job will run on
runs-on: ubuntu-latest

# Steps represent a sequence of tasks that will be executed as part of the job
steps:
# Runs a single command using the runners shell
- name: Send greeting
run: echo "Hello ${{ inputs.name }}"
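The `workflow_dispatch` trigger above can also be fired from the GitHub REST API rather than the UI, by POSTing to the workflow's `/dispatches` endpoint. A minimal sketch, assuming the repo slug and a token in `GITHUB_TOKEN` (the helper names here are illustrative):

```python
import json
import urllib.request


def dispatch_payload(name: str = "World") -> dict:
    # Request body for POST /repos/{owner}/{repo}/actions/workflows/{file}/dispatches:
    # the branch to run on, plus the workflow's declared inputs.
    return {"ref": "main", "inputs": {"name": name}}


def trigger_manual_workflow(owner: str, repo: str, token: str, name: str = "World") -> None:
    url = f"https://api.github.com/repos/{owner}/{repo}/actions/workflows/manual.yml/dispatches"
    req = urllib.request.Request(
        url,
        data=json.dumps(dispatch_payload(name)).encode(),
        headers={
            "Authorization": f"Bearer {token}",
            "Accept": "application/vnd.github+json",
        },
        method="POST",
    )
    urllib.request.urlopen(req)  # the API returns 204 No Content on success
```

The same payload shape works for the `publish.yml` workflow below, since it also declares `workflow_dispatch`.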
42 changes: 42 additions & 0 deletions .github/workflows/publish.yml
@@ -0,0 +1,42 @@
name: Upload Python Package to PyPI when a Release is Created

on:
release:
types: [created]
workflow_dispatch:

jobs:
pypi-publish:
name: Publish release to PyPI
runs-on: ubuntu-latest
environment:
name: production
url: https://pypi.org/p/local-data-platform
permissions:
id-token: write

steps:
- uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.x"

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install setuptools wheel poetry==1.8
cd local-data-platform
poetry install

- name: Build package
run: |
cd local-data-platform
poetry build

- name: Publish package to PyPI
uses: pypa/gh-action-pypi-publish@v1.11.0
with:
packages-dir: local-data-platform/dist/

3 changes: 3 additions & 0 deletions local-data-platform/README.md
@@ -0,0 +1,3 @@
# Local Data Platform
Local Data Platform is a Python library that uses open source tools to orchestrate data platform operations locally for development and testing. <br/>
It provides solutions for every stage from ingestion to reporting, so you can build a data pipeline locally, test it, and easily scale it up to the cloud.
2 changes: 1 addition & 1 deletion local-data-platform/local_data_platform/__init__.py
@@ -39,7 +39,7 @@ class Table(Base):

def __init__(self, name: str, path: Path = os.getcwd()):
self.name = name
self.path = path
self.path = os.getcwd()+path
Review comment (Collaborator):
Why is os.getcwd() used twice, once as the default argument and again in the concatenation? Please check.
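The reviewer's concern is easy to reproduce: when `os.getcwd()` is both the argument default and prepended again in the body, the working directory appears twice in the resulting path. A minimal sketch with hypothetical stand-in classes (not the library's actual `Table`):

```python
import os


class BuggyTable:
    def __init__(self, name: str, path: str = None):
        if path is None:
            path = os.getcwd()          # default is already an absolute path
        self.name = name
        self.path = os.getcwd() + path  # prepends the cwd a second time


class FixedTable:
    def __init__(self, name: str, path: str = None):
        self.name = name
        # os.path.join handles a relative path correctly and avoids doubling
        self.path = os.path.join(os.getcwd(), path or "")
```

With no argument, `BuggyTable("t").path` is the working directory concatenated with itself, while `FixedTable("t", "data")` yields the expected `<cwd>/data`.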


def get(self):
raise TableNotFound(
@@ -30,10 +30,10 @@ class LocalIcebergCatalog(SqlCatalog):

def __init__(self, name: str, path: str, *args, **kwargs):
self.name = name
self.uri = f"sqlite:///{path}/{name}.db"
self.uri = f"sqlite:///{path}/{name}_catalog.db"
self.warehouse = f"file://{path}"
try:
logger.error(f"Initializing LocalIcebergCatalog with {self.uri}")
logger.info(f"Initializing LocalIcebergCatalog with {self.uri}")
super().__init__(*args, **kwargs, **self.__dict__)
except Exception as e:
logger.error(f"Failed to initialize LocalIcebergCatalog {e}")
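The rename to `{name}_catalog.db` keeps the catalog's SQLite file visibly distinct from anything else written under the warehouse path. A sketch of just the URI construction, assuming `path` is a warehouse directory (helper names are illustrative):

```python
def catalog_uri(name: str, path: str) -> str:
    # '<name>_catalog.db' marks the file as catalog metadata, not table data
    return f"sqlite:///{path}/{name}_catalog.db"


def warehouse_uri(path: str) -> str:
    # pyiceberg's SqlCatalog takes the warehouse as a file:// URI
    return f"file://{path}"
```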
@@ -17,17 +17,18 @@ def __init__(self, *args, **kwargs):

def get(self) -> Table:
if not os.path.isfile(self.path):
logger.error(f"This path {self.path} is invalid")
raise FileNotFoundError

logger.info(
f"""
reading CSV from {self.path}
"""
)
df = csv.read_table(self.path)
df = csv.read_csv(self.path)
logger.info(
f"""
df type {type(df)}
df type {type(df)} len {len(df)}
"""
)
if df is not None:
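The guard added above (log the bad path, then raise `FileNotFoundError`) is the standard pattern for a file-backed source. A stdlib sketch of the same behaviour, using Python's built-in `csv` module in place of `pyarrow.csv` purely for illustration:

```python
import csv
import logging
import os

logger = logging.getLogger(__name__)


def read_csv_rows(path: str) -> list:
    # Fail fast with a logged error when the path does not exist
    if not os.path.isfile(path):
        logger.error("This path %s is invalid", path)
        raise FileNotFoundError(path)
    with open(path, newline="") as f:
        rows = list(csv.reader(f))
    logger.info("read %d rows from %s", len(rows), path)
    return rows
```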
28 changes: 19 additions & 9 deletions local-data-platform/local_data_platform/format/iceberg/__init__.py
@@ -4,6 +4,10 @@
from pyiceberg.typedef import Identifier
from pyarrow import Table
from local_data_platform.logger import log
import os

os.environ['PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE'] = 'true'


logger = log()

@@ -21,35 +25,41 @@ class Iceberg(Format):
Methods:
__init__(catalog: str, *args, **kwargs):
Initializes the Iceberg instance with the given catalog and metadata.

put(df: Table) -> Table:
Writes the given data frame to the Iceberg table.

get():
Fetches data from the Iceberg table and returns it as an Arrow table.
"""
def __init__(self, catalog: str, *args, **kwargs):
logger.info(f"Iceberg catalog : {catalog}")
self.catalog_identifier = catalog["identifier"]
def __init__(self, config: str, *args, **kwargs):
logger.info(f"Iceberg catalog : {config}")
self.catalog_identifier = config["identifier"]
self.catalog = LocalIcebergCatalog(
self.catalog_identifier, path=catalog["warehouse_path"]
self.catalog_identifier, path=config["warehouse_path"]
)
self.catalog.create_namespace(self.catalog_identifier)
if not self.catalog._namespace_exists(self.catalog_identifier):
self.catalog.create_namespace(self.catalog_identifier)
self.identifier = f"{self.catalog_identifier}.{kwargs['name']}"
self.metadata = kwargs
logger.info(f"Iceberg created with catalog namespace {self.catalog_identifier}")
logger.info(f"Iceberg initialised with identifier {self.identifier}")
super().__init__(*args, **kwargs)

def put(self, df: Table) -> Table:
if not df:
logger.error("While doing put in Iceberg Format we got df as None")
raise Exception("Got Table as None")
logger.info(f"self.identifier {self.identifier}")
logger.info(
f"""
Writing {len(df)} to Iceberg Table {self.identifier}
Writing type {type(df)} of length {len(df)} to Iceberg Table {self.identifier}
"""
)
table = self.catalog.create_table_if_not_exists(
identifier=self.identifier, schema=df.schema
identifier=self.identifier, schema=df.schema, properties={
"downcast-ns-timestamp-to-us-on-write": True # Set property for downcasting
}
)
table.append(df)
return table
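Both the environment variable and the new table property address the same mismatch: pyarrow timestamps default to nanosecond precision, while Iceberg stores microseconds. The downcast itself is integer division; a pure-Python sketch (no pyiceberg required):

```python
NS_PER_US = 1_000


def downcast_ns_to_us(ts_ns: int) -> int:
    # Iceberg timestamps are microsecond precision; drop sub-microsecond digits
    return ts_ns // NS_PER_US


# 2023-11-14T22:13:20.123456789 UTC as epoch nanoseconds
ns = 1_700_000_000_123_456_789
us = downcast_ns_to_us(ns)  # 1_700_000_000_123_456
```

The write fails without the property because the downcast is lossy, so pyarrow asks for explicit opt-in.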
@@ -16,14 +16,14 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def get(self) -> Table:
if not os.path.isfile(self.path):
raise FileNotFoundError

logger.info(
f"""
reading parquet from {self.path}
"""
)
if not os.path.isfile(self.path):
raise FileNotFoundError

df = parquet.read_table(self.path)
logger.info(
f"""
5 changes: 1 addition & 4 deletions local-data-platform/local_data_platform/logger.py
@@ -8,10 +8,7 @@
def log():
basicConfig(level=INFO, format=
"""
%(filename)s - %(funcName)s
- %(asctime)s - %(name)s
- %(levelname)s
- message : %(message)s
%(message)s
"""
)

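The `log()` change above trims the format string down to the bare message. The effect of a format string can be checked directly with `logging.Formatter`; a sketch comparing the old and new styles (record fields are illustrative):

```python
import logging


def format_record(fmt: str, msg: str) -> str:
    # Build a LogRecord by hand and render it with the given format string
    record = logging.LogRecord(
        name="local_data_platform", level=logging.INFO,
        pathname="logger.py", lineno=1, msg=msg, args=None, exc_info=None,
    )
    return logging.Formatter(fmt).format(record)


# old, verbose style vs the new, message-only style
verbose = format_record("%(name)s - %(levelname)s - message : %(message)s", "hello")
quiet = format_record("%(message)s", "hello")
# verbose → 'local_data_platform - INFO - message : hello'; quiet → 'hello'
```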
2 changes: 0 additions & 2 deletions local-data-platform/local_data_platform/pipeline/__init__.py
@@ -11,7 +11,5 @@ class Pipeline(Flow):

def __init__(self, config: Config, *args, **kwargs):
self.config = config
# self.source = Source(**config.metadata['source'])
# self.target = Target(**config.metadata['target'])
super().__init__(*args, **kwargs)

@@ -36,7 +36,7 @@ def __init__(self, config: Config, *args, **kwargs):
self.source = config.metadata["source"]
self.target = config.metadata["target"]
self.target = CSV(name=self.target["name"], path=self.target["path"])
self.source = Iceberg(name=self.source["name"], catalog=self.source["catalog"])
self.source = Iceberg(name=self.source["name"], config=self.source["catalog"])
logger.info(
f"""
IcebergToCSV initialised with
@@ -1,11 +1,16 @@
from local_data_platform.pipeline import Pipeline
from local_data_platform.logger import log

logger = log()

class Ingestion(Pipeline):

class Ingestion(Pipeline):

def extract(self):
self.source.get()
logger.info("Extracting Source in ingestion pipeline")
return self.source.get()

def load(self):
self.target.put(self.extract())
df = self.extract()
logger.info(f"Loading Source {len(df)} in ingestion pipeline")
self.target.put(df)
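The corrected `Ingestion` above is a thin extract/load pair: the key fix is that `extract()` now returns the source data instead of discarding it. A minimal self-contained sketch with hypothetical stand-in source and target classes:

```python
class ListSource:
    """Stand-in for a real source; get() returns its rows."""
    def __init__(self, rows):
        self.rows = rows

    def get(self):
        return self.rows


class ListTarget:
    """Stand-in for a real target; put() stores what it receives."""
    def __init__(self):
        self.rows = None

    def put(self, df):
        self.rows = df


class Ingestion:
    def __init__(self, source, target):
        self.source = source
        self.target = target

    def extract(self):
        # mirrors the PR fix: return the source data rather than dropping it
        return self.source.get()

    def load(self):
        df = self.extract()
        self.target.put(df)


pipeline = Ingestion(ListSource([1, 2, 3]), ListTarget())
pipeline.load()
# pipeline.target.rows is now [1, 2, 3]
```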
@@ -5,6 +5,7 @@

logger = log()


class CSVToIceberg(Ingestion):
"""
CSVToIceberg is a class responsible for ingesting data from a CSV source and
@@ -39,7 +40,7 @@ def __init__(self, config, *args, **kwargs):
)
self.target = Iceberg(
name=self.target['name'],
catalog=self.target['catalog']
config=self.target['catalog']
)
logger.info(
f"""
@@ -34,7 +34,7 @@ def __init__(self, config: Config, *args, **kwargs):
)
self.target = Iceberg(
name=self.target['name'],
catalog=self.target['catalog']
config=self.target['catalog']
)
logger.info(
f"""
@@ -2,7 +2,7 @@
from pathlib import Path
import json
from local_data_platform import Credentials
from local_data_platform import logger
from local_data_platform.logger import log

logger = log()

4 changes: 2 additions & 2 deletions local-data-platform/pyproject.toml
@@ -1,7 +1,7 @@
[tool.poetry]
name = "local-data-platform"
version = "0.1.0"
description = ""
version = "0.1.1"
description = "Python library for iceberg lake house on your local"
authors = ["Tushar Choudhary <151359025+tusharchou@users.noreply.github.com>"]
readme = "README.md"

@@ -0,0 +1,60 @@
from local_data_platform.pipeline.ingestion.bigquery_to_csv import BigQueryToCSV
from local_data_platform import Config, SupportedFormat, SupportedEngine
from local_data_platform.store.source.json import Json
from local_data_platform.exceptions import PipelineNotFound
import os
from local_data_platform.logger import log


logger = log()


def get_near_transaction_dataset(
dataset="near_transactions",
config_path="/real_world_use_cases/near_data_lake/config/ingestion.json",
):
"""
Retrieves and processes the near transaction dataset based on the provided configuration.

Args:
dataset (str): The name of the dataset to be processed. Defaults to "near_transactions".
config_path (str): The path to the configuration file. Defaults to "/real_world_use_cases/near_data_lake/config/ingestion.json".

Raises:
PipelineNotFound: If the source and target formats specified in the configuration are not supported.

Returns:
None
"""

config = Config(
**Json(
name=dataset,
path=config_path,
).get()
)
print(config)
logger.info(
f"""
We are using the following dictionary as the configuration to generate a monthly trust metric
{config}
"""
)
if (
config.metadata["source"]["format"] == SupportedFormat.JSON.value
and config.metadata["target"]["format"] == SupportedFormat.CSV.value
and config.metadata["source"]["engine"] == SupportedEngine.BIGQUERY.value
):
data_loader = BigQueryToCSV(config=config)
data_loader.load()
else:
raise PipelineNotFound(
f"""
source {config.metadata['source']['format']}
to target {config.metadata['target']['format']}
pipeline is not supported yet
"""
)


get_near_transaction_dataset()
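The source/target/engine check above is a simple dispatch guard: only the JSON-over-BigQuery to CSV combination is wired up, and anything else raises `PipelineNotFound`. A condensed sketch with stand-in enums (names mirror the snippet; the enum values are assumptions):

```python
from enum import Enum


class SupportedFormat(Enum):
    JSON = "json"
    CSV = "csv"


class SupportedEngine(Enum):
    BIGQUERY = "bigquery"


class PipelineNotFound(Exception):
    pass


def select_pipeline(metadata: dict) -> str:
    src, tgt = metadata["source"], metadata["target"]
    # Only one source/target/engine combination is currently supported
    if (
        src["format"] == SupportedFormat.JSON.value
        and tgt["format"] == SupportedFormat.CSV.value
        and src["engine"] == SupportedEngine.BIGQUERY.value
    ):
        return "BigQueryToCSV"
    raise PipelineNotFound(
        f"source {src['format']} to target {tgt['format']} pipeline is not supported yet"
    )
```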