Ledger Sync uses SQLite (dev) / Neon PostgreSQL (prod) with SQLAlchemy 2.0 ORM (Mapped types). The database stores financial transactions with multi-user support, and includes models for analytics, budgets, anomalies, net worth tracking, user display preferences (including multi-currency settings), salary/tax projection data, and AI assistant configuration (provider, model, encrypted API key).
Models are organized by bounded context in backend/src/ledger_sync/db/_models/ (split into enums.py, user.py, transactions.py, investments.py, analytics.py, planning.py). The models.py file at the top of db/ is a thin re-export facade -- always import from ledger_sync.db.models, never from _models directly.
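A quick illustration of the import convention (model names as documented below):

```python
# Good: import through the stable facade
from ledger_sync.db.models import Transaction, UserPreferences

# Avoid: reaching into the private package; its layout may change
# from ledger_sync.db._models.transactions import Transaction
```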
Represents a single financial transaction (income, expense, or transfer). Uses SHA-256 hash as the primary key for deduplication.
```python
from datetime import datetime
from decimal import Decimal

from sqlalchemy import Boolean, DateTime, Enum, ForeignKey, Integer, Numeric, String, Text
from sqlalchemy.orm import Mapped, mapped_column

# Base and the TransactionType enum are defined alongside the models in
# ledger_sync.db._models and re-exported from ledger_sync.db.models.

class Transaction(Base):
    __tablename__ = "transactions"

    # Primary key is the SHA-256 hash (no separate auto-increment ID)
    transaction_id: Mapped[str] = mapped_column(String(64), primary_key=True)
    user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"), nullable=False, index=True)

    # Core fields
    date: Mapped[datetime] = mapped_column(DateTime, nullable=False, index=True)
    amount: Mapped[Decimal] = mapped_column(Numeric(precision=15, scale=2), nullable=False)
    currency: Mapped[str] = mapped_column(String(10), nullable=False, default="INR")
    type: Mapped[TransactionType] = mapped_column(Enum(TransactionType), nullable=False, index=True)

    # Categorization
    account: Mapped[str] = mapped_column(String(255), nullable=False, index=True)
    category: Mapped[str] = mapped_column(String(255), nullable=False, index=True)
    subcategory: Mapped[str | None] = mapped_column(String(255), nullable=True)

    # Transfer-specific (only used when type=Transfer)
    from_account: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True)
    to_account: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True)

    # Optional
    note: Mapped[str | None] = mapped_column(Text, nullable=True)

    # Metadata
    source_file: Mapped[str] = mapped_column(String(500), nullable=False)
    last_seen_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, index=True)
    is_deleted: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False, index=True)
```

Transaction types (`TransactionType` enum): `Expense`, `Income`, `Transfer`
Table Schema:
| Column | Type | Constraints | Index | Description |
|---|---|---|---|---|
| transaction_id | VARCHAR(64) | PRIMARY KEY | YES | SHA-256 hash (dedup + PK) |
| user_id | INTEGER | FOREIGN KEY | YES | Owning user |
| date | TIMESTAMP | NOT NULL | YES | Transaction date |
| amount | DECIMAL(15,2) | NOT NULL | NO | Transaction amount |
| currency | VARCHAR(10) | DEFAULT INR | NO | Currency code |
| type | ENUM | NOT NULL | YES | Expense / Income / Transfer |
| account | VARCHAR(255) | NOT NULL | YES | Account name |
| category | VARCHAR(255) | NOT NULL | YES | Category |
| subcategory | VARCHAR(255) | NULL | NO | Sub-category |
| from_account | VARCHAR(255) | NULL | YES | Transfer source account |
| to_account | VARCHAR(255) | NULL | YES | Transfer destination account |
| note | TEXT | NULL | NO | Transaction note |
| source_file | VARCHAR(500) | NOT NULL | NO | Source Excel filename |
| last_seen_at | TIMESTAMP | NOT NULL | YES | Last import time |
| is_deleted | BOOLEAN | DEFAULT 0 | YES | Soft delete flag |
Composite Indexes:
- `ix_transactions_date_type` (date, type)
- `ix_transactions_category_subcategory` (category, subcategory)
- `ix_transactions_user_date` (user_id, date)
- `ix_transactions_user_deleted` (user_id, is_deleted)
- `ix_transactions_user_type_deleted` (user_id, type, is_deleted)
- `ix_transactions_user_category` (user_id, category)
- `ix_transactions_user_date_type` (user_id, date, type)

The database also includes these models (see backend/src/ledger_sync/db/models.py):
- User — OAuth authentication (Google, GitHub) with JWT tokens. Fields: `email`, `full_name`, `auth_provider`, `auth_provider_id`, `is_verified`, timestamps
- UserPreferences — fiscal year, essential categories, investment mappings, income classification, budget defaults, display format, anomaly settings, recurring settings, spending rule targets, credit card limits, earning start date, fixed expenses, savings/investment targets, payday, tax regime, excluded accounts, notification preferences, salary/tax projections, and AI assistant config:
  - `salary_structure` (JSON) — FY-keyed salary CTC breakdown (basic, HRA, special allowance, EPF, NPS, professional tax, variable pay)
  - `rsu_grants` (JSON) — array of RSU grants with vesting schedules
  - `growth_assumptions` (JSON) — salary hike %, variable growth %, stock appreciation %, projection years, RSU inclusion flag
  - `ai_provider` (VARCHAR(20), nullable) — `openai`, `anthropic`, or `bedrock`
  - `ai_model` (VARCHAR(100), nullable) — provider-specific model ID; for Bedrock uses a `{model_id}|{region}` composite format to avoid adding a separate region column
  - `ai_api_key_encrypted` (TEXT, nullable) — base64-encoded `salt(16) || nonce(12) || ciphertext`, encrypted with AES-256-GCM using a PBKDF2-derived key (salt is per-ciphertext random); see the sketch after this list
- AccountClassification — User-defined account type mappings (Bank, Investment, Credit Card, etc.)
- DailySummary — Pre-computed daily aggregations (income, expenses, net, counts, top category per day). Unique index on (user_id, date). Used by YearInReview heatmap.
- MonthlySummary — Pre-calculated monthly income/expense/savings aggregations
- CategoryTrend — Category-level trends over time periods
- TransferFlow — Aggregated transfer flows between accounts
- RecurringTransaction — Detected recurring patterns (SIPs, subscriptions, salaries)
- MerchantIntelligence — Extracted merchant data from transaction notes
- NetWorthSnapshot — Point-in-time net worth with asset/liability breakdown
- FYSummary — Fiscal year summaries with YoY changes
- Anomaly — Detected anomalies (high expenses, budget exceeded)
- Budget — User-defined budget limits per category
- FinancialGoal — Savings goals with target amounts and dates
- AuditLog — Operation audit trail
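A minimal decryption sketch for `ai_api_key_encrypted`, assuming the `cryptography` package; the PBKDF2 hash (SHA-256), the iteration count, and the source of the master secret are hypothetical stand-ins for whatever the application config actually defines:

```python
import base64

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

PBKDF2_ITERATIONS = 600_000  # hypothetical; the real count lives in app config

def decrypt_api_key(encoded: str, master_secret: bytes) -> str:
    """Split base64(salt(16) || nonce(12) || ciphertext), then AES-256-GCM decrypt."""
    blob = base64.b64decode(encoded)
    salt, nonce, ciphertext = blob[:16], blob[16:28], blob[28:]
    kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=salt,
                     iterations=PBKDF2_ITERATIONS)
    key = kdf.derive(master_secret)  # 32-byte key -> AES-256
    return AESGCM(key).decrypt(nonce, ciphertext, None).decode()
```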
The database is automatically initialized on application startup. The init_db() function in db/session.py runs when the FastAPI app starts (via the lifespan handler in api/main.py) and creates all tables defined by SQLAlchemy models if they do not already exist. This means you do not need to manually create tables for a fresh install -- simply starting the backend is sufficient. For schema changes after the initial setup, use Alembic migrations (see below).
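A sketch of that startup wiring, assuming the documented locations (the actual handler in api/main.py may differ in detail):

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from ledger_sync.db.session import init_db

@asynccontextmanager
async def lifespan(app: FastAPI):
    init_db()  # creates any missing tables from the SQLAlchemy metadata
    yield      # application serves requests; no teardown needed here

app = FastAPI(lifespan=lifespan)
```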
Insert a new transaction:
```python
from datetime import datetime
from decimal import Decimal

transaction = Transaction(
    transaction_id="abc123...",  # SHA-256 hex digest (see Transaction ID generation below)
    user_id=1,
    date=datetime(2025, 1, 15),
    amount=Decimal("5000.00"),
    type=TransactionType.EXPENSE,
    category="Groceries",
    account="Checking",
    note="Weekly groceries",
    source_file="MoneyManager.xlsx",
    last_seen_at=datetime(2025, 1, 15),
)
session.add(transaction)
session.commit()
```

Get transactions by various criteria:
```python
from sqlalchemy import func

# Get all transactions
all_txns = session.query(Transaction).all()

# Get by date range
txns = session.query(Transaction).filter(
    Transaction.date >= start_date,
    Transaction.date <= end_date,
).all()

# Get by category
category_txns = session.query(Transaction).filter(
    Transaction.category == "Rent"
).all()

# Get non-deleted transactions
active = session.query(Transaction).filter(
    Transaction.is_deleted == False
).all()

# Get by transaction_id (for deduplication check)
existing = session.query(Transaction).filter(
    Transaction.transaction_id == transaction_id
).first()

# Aggregations
total_income = session.query(func.sum(Transaction.amount)).filter(
    Transaction.type == TransactionType.INCOME
).scalar()
```

Update an existing transaction:
```python
transaction = session.query(Transaction).filter(
    Transaction.transaction_id == transaction_id
).first()
if transaction:
    transaction.category = "Groceries"
    transaction.amount = Decimal("5500.00")
    session.commit()
```

Mark a transaction as deleted instead of removing it:
```python
transaction = session.query(Transaction).filter(
    Transaction.transaction_id == transaction_id
).first()
if transaction:
    transaction.is_deleted = True
    session.commit()
```

Generate a migration after changing a model:

```bash
alembic revision --autogenerate -m "Add new field to transactions"
```

This creates a migration file in alembic/versions/.
```bash
# Apply all pending migrations
alembic upgrade head

# Apply specific migration
alembic upgrade abc123def456

# Rollback one migration
alembic downgrade -1

# Rollback all
alembic downgrade base
```

Migration files are Python scripts that define upgrade() and downgrade() functions:
```python
import sqlalchemy as sa
from alembic import op

def upgrade() -> None:
    op.add_column(
        'transactions',
        sa.Column('new_field', sa.String(100), nullable=True),
    )

def downgrade() -> None:
    op.drop_column('transactions', 'new_field')
```

Transaction IDs are generated from a SHA-256 hash of the identifying fields:
```python
import hashlib

# Deterministic: the same four fields always produce the same ID
hash_input = f"{date}|{amount}|{category}|{account}"
transaction_id = hashlib.sha256(hash_input.encode()).hexdigest()
```

Benefits:
- Deterministic: Same transaction always generates same ID
- No central ID service needed
- Collision resistant (SHA-256)
- Enables file re-import without duplicates
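Because IDs are deterministic, re-import reduces to an upsert. A sketch, with a hypothetical helper name:

```python
def upsert_transaction(session, txn: Transaction) -> None:
    """Hypothetical import helper: insert if unseen, else refresh last_seen_at."""
    existing = session.get(Transaction, txn.transaction_id)  # primary-key lookup
    if existing is None:
        session.add(txn)
    else:
        existing.last_seen_at = txn.last_seen_at  # same hash -> same row, no duplicate
```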
Schema constraints:
- Unique `transaction_id` — no duplicate transactions
- Non-null `date` — every transaction must have a date
- Non-null `amount` — every transaction must have an amount
- Non-null `type` — transaction type is required
User-linked child tables reference users.id via foreign keys. Cascade behavior varies by migration — newer tables include ON DELETE CASCADE, but older migrations may not. Verify per table if needed.
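For SQLite, one way to inspect a table's delete action is the `foreign_key_list` pragma (the table name below is an assumed example):

```python
import sqlite3

conn = sqlite3.connect("ledger_sync.db")
# Each row ends with (..., on_update, on_delete, match)
for row in conn.execute("PRAGMA foreign_key_list(user_preferences)"):
    print(row)
```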
Common fields across models:
- `created_at` — when the record was created
- `updated_at` — when the record was last modified
- `is_deleted` — soft delete instead of hard delete
- `last_import_time` — when the transaction was last imported
Key indexes:
- `transaction_id` — O(1) lookup for duplicates
- `date` — fast range queries
- `category` — fast filtering by category
- `account` — fast filtering by account
- `is_deleted` — fast filtering of active records
- `(user_id, category)` — fast per-user category aggregation
- `(user_id, date, type)` — fast per-user time-range + type filtering
Query performance tips:
- Use indexes — always filter by indexed columns
- Avoid full table scans — use WHERE clauses
- Limit results — use LIMIT for pagination
- Batch operations — use bulk inserts for large imports (see the sketch after this list)
- Connection pooling — reuse database connections
- Pre-computed aggregation tables — `daily_summaries`, `monthly_summaries`, and `category_trends` store pre-computed data. Calculation endpoints use a fast path (read from these tables) when no date filter is active, falling back to raw transaction scans when filters are applied.
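A sketch of the batch-insert tip using SQLAlchemy 2.0's executemany-style insert; `parsed_rows` stands in for whatever the import pipeline actually produces:

```python
from sqlalchemy import insert

rows = [
    {
        "transaction_id": txn_id,
        "user_id": 1,
        "date": date,
        "amount": amount,
        "type": txn_type,
        "account": account,
        "category": category,
        "source_file": "MoneyManager.xlsx",
        "last_seen_at": imported_at,
    }
    for (txn_id, date, amount, txn_type, account, category, imported_at) in parsed_rows
]
# One executemany round-trip instead of one INSERT per row
session.execute(insert(Transaction), rows)
session.commit()
```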
An indexed query versus filtering in Python:

```python
# Instead of this (full table scan)
all_txns = session.query(Transaction).all()
rent = [t for t in all_txns if t.category == "Rent"]

# Do this (indexed query)
rent = session.query(Transaction).filter(
    Transaction.category == "Rent",
    Transaction.is_deleted == False,
).all()
```

Backup, restore, and maintenance for the SQLite database:

```bash
# Copy SQLite database file
cp ledger_sync.db ledger_sync.db.backup

# Or with timestamp
cp ledger_sync.db "ledger_sync.db.$(date +%Y%m%d_%H%M%S).backup"

# Restore from backup
cp ledger_sync.db.backup ledger_sync.db

# Online backup via the sqlite3 CLI
sqlite3 ledger_sync.db ".backup ledger_sync.db.backup"

# Check database integrity
sqlite3 ledger_sync.db "PRAGMA integrity_check;"

# Reclaim space and optimize
sqlite3 ledger_sync.db "VACUUM;"

# Update query optimizer statistics
sqlite3 ledger_sync.db "ANALYZE;"
```

PostgreSQL is supported out of the box. The application auto-detects the database type from DATABASE_URL and applies the appropriate configuration:
- SQLite: WAL mode, 64MB cache, NORMAL sync, foreign keys enabled
- PostgreSQL: Connection pooling (pool_size=20, max_overflow=10, pool_pre_ping=True)
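A sketch of what that auto-detection might look like; the real logic lives in db/session.py and the names here are illustrative:

```python
from sqlalchemy import create_engine, event

def make_engine(database_url: str):
    if database_url.startswith("sqlite"):
        engine = create_engine(database_url)

        @event.listens_for(engine, "connect")
        def set_sqlite_pragmas(dbapi_conn, _record):
            cur = dbapi_conn.cursor()
            cur.execute("PRAGMA journal_mode=WAL")
            cur.execute("PRAGMA cache_size=-65536")  # 64MB page cache (KiB units)
            cur.execute("PRAGMA synchronous=NORMAL")
            cur.execute("PRAGMA foreign_keys=ON")
            cur.close()

        return engine

    # PostgreSQL: pooled connections with liveness checks
    return create_engine(database_url, pool_size=20, max_overflow=10, pool_pre_ping=True)
```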
For high-availability production deployments:
- Set `LEDGER_SYNC_DATABASE_URL` to a PostgreSQL connection string
- Add replication for high availability
- Implement partitioning for large tables
```bash
# Change DATABASE_URL
# postgresql://user:password@localhost/ledger_sync
# SQLAlchemy automatically uses PostgreSQL features
```

The User model already exists with full OAuth support (Google, GitHub). Account data is derived from transactions (no separate accounts table). Categories are also derived from transaction data and managed via `UserPreferences.essential_categories`.
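Since accounts are derived rather than stored, listing a user's accounts is just a distinct query; a sketch:

```python
from sqlalchemy import select

# Distinct account names for one user, ignoring soft-deleted rows
accounts = session.execute(
    select(Transaction.account)
    .where(Transaction.user_id == user_id, Transaction.is_deleted == False)
    .distinct()
).scalars().all()
```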
Connection string formats:

```bash
# SQLite (dev)
sqlite:///ledger_sync.db

# PostgreSQL (prod)
postgresql://username:password@localhost:5432/ledger_sync
```
Environment variables (in .env):
```bash
DATABASE_URL=sqlite:///ledger_sync.db
DB_ECHO=False  # Log SQL queries (set to True for debugging)
```

Manual session management:

```python
from ledger_sync.db.session import SessionLocal
def process_transactions(transactions):
    session = SessionLocal()
    try:
        for txn in transactions:
            session.add(txn)
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
```

Or with a context manager:

```python
from contextlib import contextmanager

from ledger_sync.db.session import SessionLocal
@contextmanager
def get_db():
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

# Usage
with get_db() as db:
    db.add(transaction)
```