Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/backend/base/langflow/plugin_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ def add_api_route(self, path: str, endpoint, **kwargs):
self._check_and_reserve(path, set(methods))
return self._app.add_api_route(path, endpoint, **kwargs)

def add_middleware(self, middleware_class, **kwargs):
"""Allow plugins to register ASGI middleware on the host app."""
self._app.add_middleware(middleware_class, **kwargs)


def load_plugin_routes(app: FastAPI) -> None:
"""Discover and register additional routers from enterprise plugins.
Expand Down
41 changes: 41 additions & 0 deletions src/backend/saas/alembic.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
[alembic]
# Path to migration scripts, relative to this file.
script_location = langflow_saas/migrations

# sqlalchemy.url is intentionally left blank — the env.py reads
# SAAS_DATABASE_URL (falling back to LANGFLOW_DATABASE_URL) at runtime.
sqlalchemy.url =

[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
1 change: 1 addition & 0 deletions src/backend/saas/langflow_saas/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""langflow-saas: pluggable SaaS / multi-tenancy layer for Langflow."""
Empty file.
270 changes: 270 additions & 0 deletions src/backend/saas/langflow_saas/api/billing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
"""Billing, plans, usage, and Stripe webhook endpoints.

Routes:
GET /api/saas/v1/plans — list active plans (public)
GET /api/saas/v1/orgs/{org_id}/billing — get subscription details
POST /api/saas/v1/orgs/{org_id}/billing/checkout — create Stripe checkout session
GET /api/saas/v1/orgs/{org_id}/usage — get usage summary
POST /api/saas/v1/billing/webhook — Stripe webhook (no auth, HMAC-verified)
GET /api/saas/v1/audit — audit log (admin+)
"""

from __future__ import annotations

from datetime import datetime, timezone
from uuid import UUID

Check failure on line 15 in src/backend/saas/langflow_saas/api/billing.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.13)

Ruff (TC003)

src/backend/saas/langflow_saas/api/billing.py:15:18: TC003 Move standard library import `uuid.UUID` into a type-checking block

from fastapi import APIRouter, HTTPException, Request, status
from sqlmodel import select

from langflow_saas.dependencies import CurrentOrgContext, RequireAdmin, assert_org_match
from langflow_saas.models import (
AuditLog,
Organization,
Plan,
PlanRead,
Subscription,
SubscriptionRead,
UsageMetric,
UsageRecord,
UsageSummary,
)
from langflow_saas.services import get_billing_service
from langflow_saas.settings import get_saas_settings

router = APIRouter(tags=["Billing & Plans"])


# ---------------------------------------------------------------------------
# Plans (public, no auth)
# ---------------------------------------------------------------------------


@router.get("/plans", response_model=list[PlanRead])
async def list_plans():
"""Return all active plans. Safe to call without authentication."""
from langflow.services.deps import session_scope

async with session_scope() as db:
result = await db.exec(select(Plan).where(Plan.is_active == True)) # noqa: E712
return [PlanRead.model_validate(p) for p in result.all()]


# ---------------------------------------------------------------------------
# Subscription info
# ---------------------------------------------------------------------------


@router.get("/orgs/{org_id}/billing", response_model=SubscriptionRead | None)
async def get_subscription(org_id: UUID, ctx: CurrentOrgContext):
assert_org_match(org_id, ctx)
from langflow.services.deps import session_scope

async with session_scope() as db:
sub_result = await db.exec(select(Subscription).where(Subscription.org_id == org_id))
sub = sub_result.first()
if not sub:
return None

plan_result = await db.exec(select(Plan).where(Plan.id == sub.plan_id))
plan = plan_result.first()
if not plan:
return None

return SubscriptionRead(
id=sub.id,
org_id=sub.org_id,
status=sub.status,
plan=PlanRead.model_validate(plan),
current_period_end=sub.current_period_end,
cancel_at_period_end=sub.cancel_at_period_end,
trial_end=sub.trial_end,
)


class CheckoutRequest(PlanRead):
stripe_price_id: str
billing_cycle: str = "monthly" # "monthly" | "yearly"


@router.post("/orgs/{org_id}/billing/checkout")
async def create_checkout(org_id: UUID, request: Request, ctx: RequireAdmin):
"""Create a Stripe Checkout Session and return the redirect URL."""
assert_org_match(org_id, ctx)
settings = get_saas_settings()
if not settings.billing_enabled:
raise HTTPException(501, "Billing is not enabled on this instance.")

body = await request.json()
price_id: str = body.get("stripe_price_id", "")
if not price_id:
raise HTTPException(400, "stripe_price_id is required.")

from langflow.services.deps import session_scope

async with session_scope() as db:
org_result = await db.exec(select(Organization).where(Organization.id == org_id))
org = org_result.first()
if not org:
raise HTTPException(404, "Organization not found.")

# Fetch owner email from Langflow user table.
from langflow.services.database.models.user.model import User

user_result = await db.exec(select(User).where(User.id == org.owner_id))
owner = user_result.first()
owner_email = getattr(owner, "email", "") or f"{org.slug}@noemail.local"

url = await get_billing_service().create_checkout_session(
org_id=org_id,
org_name=org.name,
owner_email=owner_email,
stripe_price_id=price_id,
success_url=f"{settings.app_base_url}/settings/billing?success=1",
cancel_url=f"{settings.app_base_url}/settings/billing?canceled=1",
)
return {"checkout_url": url}


# ---------------------------------------------------------------------------
# Usage summary
# ---------------------------------------------------------------------------


@router.get("/orgs/{org_id}/usage", response_model=UsageSummary)
async def get_usage(org_id: UUID, ctx: CurrentOrgContext):
assert_org_match(org_id, ctx)
settings = get_saas_settings()
from langflow.services.deps import session_scope
from sqlalchemy import func

today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)

async with session_scope() as db:
# Get plan limits.
org_result = await db.exec(select(Organization).where(Organization.id == org_id))
org = org_result.first()
plan: Plan | None = None
if org and org.plan_id:
plan_result = await db.exec(select(Plan).where(Plan.id == org.plan_id))
plan = plan_result.first()

max_flows = plan.max_flows if plan else settings.default_max_flows
max_exec = plan.max_executions_per_day if plan else settings.default_max_executions_per_day
max_storage = plan.max_storage_mb if plan else settings.default_max_storage_mb

# Count executions today.
exec_result = await db.exec(
select(func.sum(UsageRecord.value)).where(
UsageRecord.org_id == org_id,
UsageRecord.metric == UsageMetric.FLOW_EXECUTION,
UsageRecord.recorded_at >= today_start,
)
)
execs_today = int(exec_result.first() or 0)

# Count API calls today.
api_result = await db.exec(
select(func.sum(UsageRecord.value)).where(
UsageRecord.org_id == org_id,
UsageRecord.metric == UsageMetric.API_CALL,
UsageRecord.recorded_at >= today_start,
)
)
api_calls_today = int(api_result.first() or 0)

# Storage (sum of all storage_bytes records for this org).
storage_result = await db.exec(
select(func.sum(UsageRecord.value)).where(
UsageRecord.org_id == org_id, UsageRecord.metric == UsageMetric.STORAGE_BYTES
)
)
storage_bytes = int(storage_result.first() or 0)

# Count flows from Langflow's flows table for org members.
# We aggregate flows belonging to all members of the org.
from langflow.services.database.models.flow.model import Flow

from langflow_saas.models import UserOrganization

member_result = await db.exec(select(UserOrganization.user_id).where(UserOrganization.org_id == org_id))
member_ids = [r for r in member_result.all()]

Check failure on line 191 in src/backend/saas/langflow_saas/api/billing.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.13)

Ruff (C416)

src/backend/saas/langflow_saas/api/billing.py:191:22: C416 Unnecessary list comprehension (rewrite using `list()`)
flow_count = 0
if member_ids:
flow_count_result = await db.exec(
select(func.count(Flow.id)).where(Flow.user_id.in_(member_ids)) # type: ignore[attr-defined]
)
flow_count = int(flow_count_result.first() or 0)

return UsageSummary(
org_id=org_id,
executions_today=execs_today,
executions_limit=max_exec,
flows_count=flow_count,
flows_limit=max_flows,
storage_mb=round(storage_bytes / (1024 * 1024), 2),
storage_limit_mb=max_storage,
api_calls_today=api_calls_today,
plan_slug=plan.slug if plan else "free",
)


# ---------------------------------------------------------------------------
# Stripe Webhook (no auth — Stripe HMAC-verified)
# ---------------------------------------------------------------------------


@router.post("/billing/webhook", status_code=status.HTTP_200_OK)
async def stripe_webhook(request: Request):
settings = get_saas_settings()
if not settings.billing_enabled:
raise HTTPException(501, "Billing not enabled.")

payload = await request.body()
sig_header = request.headers.get("stripe-signature", "")

try:
result = await get_billing_service().handle_webhook(payload=payload, sig_header=sig_header)
except Exception as exc:
raise HTTPException(400, f"Webhook processing failed: {exc}") from exc

return result
Comment on lines +217 to +231
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Stripe webhook lacks idempotency handling and leaks raw exception messages.

Stripe is documented to deliver events at-least-once and recommends idempotent processing keyed by event.id. The current handler dispatches every payload to BillingService.handle_webhook without any seen-event check, which can cause duplicate Subscription upserts / audit entries. Additionally, surfacing f"Webhook processing failed: {exc}" in the response leaks internal error details to Stripe (and any attacker that can probe the endpoint). Log the exception server-side and return a generic 400.

🛠 Suggested fix
-    try:
-        result = await get_billing_service().handle_webhook(payload=payload, sig_header=sig_header)
-    except Exception as exc:
-        raise HTTPException(400, f"Webhook processing failed: {exc}") from exc
+    try:
+        result = await get_billing_service().handle_webhook(payload=payload, sig_header=sig_header)
+    except Exception as exc:
+        logger.exception("Stripe webhook processing failed")
+        raise HTTPException(400, "Webhook processing failed.") from exc

Idempotency can be added inside handle_webhook by recording processed event.ids in a small dedicated table and short-circuiting on duplicates.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/saas/langflow_saas/api/billing.py` around lines 217 - 231, The
stripe_webhook endpoint should stop returning raw exception text and should
enforce idempotency by event.id; update stripe_webhook and
BillingService.handle_webhook so that handle_webhook first parses the Stripe
event id, checks a dedicated "processed_events" table/record keyed by event.id
and short-circuits (returning 200) if seen, otherwise records the id before
performing Subscription upserts/audits, and on any exception log the full
exception server-side (use the existing logger) and raise a generic
HTTPException(400, "Webhook processing failed") from stripe_webhook instead of
f-stringing the exception; reference functions: stripe_webhook,
get_billing_service().handle_webhook, and BillingService.handle_webhook when
adding the idempotency check and persistent processed-events recording.



# ---------------------------------------------------------------------------
# Audit Log
# ---------------------------------------------------------------------------


@router.get("/audit")
async def get_audit_log(
ctx: RequireAdmin,
limit: int = 100,
offset: int = 0,
):
"""Paginated audit log for the current organization."""
from langflow.services.deps import session_scope

async with session_scope() as db:
result = await db.exec(
select(AuditLog)
.where(AuditLog.org_id == ctx.org_id)
.order_by(AuditLog.created_at.desc()) # type: ignore[union-attr]
.offset(offset)
.limit(min(limit, 500))
)
entries = result.all()

return [
{
"id": str(e.id),
"action": e.action,
"user_id": str(e.user_id) if e.user_id else None,
"resource_type": e.resource_type,
"resource_id": e.resource_id,
"metadata": e.log_metadata,
"ip_address": e.ip_address,
"created_at": e.created_at.isoformat(),
}
for e in entries
]
Loading
Loading