feat: add langflow-saas plugin for multi-tenant SaaS capabilities #12884
pachu4u wants to merge 8 commits into langflow-ai:main from
Conversation
Introduces a self-contained `langflow-saas` Python package that plugs into Langflow's existing `langflow.plugins` entry-point group, adding full multi-tenancy on top of any Langflow instance with exactly one 4-line addition to the core (`plugin_routes.py` gains `add_middleware()`).

## What's included

### Core patch (`plugin_routes.py`)

- `_PluginAppWrapper.add_middleware()` — exposes ASGI middleware registration to plugins so the SaaS layer can install rate-limiting and tenant context without modifying any other Langflow file.

### `src/backend/saas/` — the plugin package

**Models & DB (`models.py`, `migrations/`)**

- 9 new `saas_*` tables: plan, organization, user_organization, invitation, team, team_member, subscription, usage_record, audit_log
- Alembic migration (`001saas`) with Free/Pro/Enterprise plan seeds
- `SaasAlembicVersion` SQLModel table so Langflow's migration drift-checker doesn't flag the separate version table as unknown

**Middleware (`middleware.py`)**

- `RateLimitMiddleware` — Redis sliding-window rate limiter (graceful no-op when Redis is unavailable)
- `TenantContextMiddleware` — decodes JWT or API key, resolves the active org from the `X-Org-ID` header or auto-selects, stores `OrgContextData` on `request.state.saas_context`
- `QuotaEnforcementMiddleware` — blocks flow executions when the daily quota is exceeded, records usage after successful runs

**REST API (`api/`)**

- Orgs: CRUD + personal-org protection
- Members: list / role-change / remove with IDOR guard
- Invitations: HMAC-signed tokens, email delivery, accept flow
- Teams: create / list / delete + member management
- Billing: plan listing, Stripe checkout, webhook handler, usage summary
- Audit log: paginated per-org log

**Services & Settings (`services.py`, `settings.py`)**

- `AuditService`, `BillingService`, pluggable email backends (console / SMTP / SendGrid / Resend)
- All config via `SAAS_*` env vars with safe defaults

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
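The registration flow described above can be sketched in a few lines. This is an illustrative stand-in, not Langflow's actual code: the `_FakeApp` wrapper class, the router placeholder, and the `register()` signature are assumptions for the sketch; only the existence of the `langflow.plugins` entry-point group and the new `add_middleware()` hook come from this PR.

```python
class _FakeApp:
    """Stand-in for the host app wrapper (illustration only, not Langflow's class)."""

    def __init__(self):
        self.middleware = []
        self.routers = []

    def add_middleware(self, middleware_class, **kwargs):
        # Mirrors the hook this PR adds to plugin_routes.py.
        self.middleware.append((middleware_class, kwargs))

    def include_router(self, router, prefix=""):
        self.routers.append((router, prefix))


class RateLimitMiddleware:
    """Placeholder for the real ASGI middleware shipped by the plugin."""


def register(app):
    """Hypothetical entry point exposed under the `langflow.plugins` group."""
    app.add_middleware(RateLimitMiddleware, requests_per_minute=60)
    app.include_router("saas_router", prefix="/api/saas/v1")


app = _FakeApp()
register(app)
print(len(app.middleware), len(app.routers))  # -> 1 1
```

The point of the sketch: the plugin stays self-contained, and the only core surface it needs is the wrapper's `add_middleware()` method.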
Important: Review skipped. Auto incremental reviews are disabled on this repository; please check the settings in the CodeRabbit UI or the ⚙️ Run configuration. Configuration used: `.coderabbit.yaml`. Review profile: CHILL. Plan: Pro.
Walkthrough: Introduces a comprehensive SaaS module with organization management, team and member administration, billing integration, and multi-tenant enforcement via middleware. Includes API routes, database models, authentication/authorization, email services, rate limiting, quota enforcement, database migrations, and Langflow plugin integration.
Sequence Diagram(s)
sequenceDiagram
participant Client
participant RateLimitMW as RateLimitMiddleware
participant TenantMW as TenantContextMiddleware
participant QuotaMW as QuotaEnforcementMiddleware
participant Handler as Route Handler
participant UsageDB as UsageRecord DB
participant Redis
Client->>RateLimitMW: POST /api/saas/v1/orgs/{id}/execute
RateLimitMW->>Redis: GET rate:key:minute
alt Redis Available
Redis-->>RateLimitMW: counter value
RateLimitMW->>Redis: INCR with TTL
alt Limit Exceeded
RateLimitMW-->>Client: HTTP 429 + headers
else Within Limit
RateLimitMW->>TenantMW: pass request
end
else Redis Down
RateLimitMW->>TenantMW: fail-open, pass request
end
TenantMW->>TenantMW: extract JWT token
TenantMW->>TenantMW: resolve user_id (no DB)
TenantMW->>TenantMW: select org (X-Org-ID or auto)
TenantMW->>TenantMW: load org/plan from DB
TenantMW->>TenantMW: populate OrgContextData
TenantMW->>QuotaMW: request.state.saas_context
QuotaMW->>UsageDB: SELECT SUM(value) TODAY for FLOW_EXECUTION
alt Usage Within Plan Limit
QuotaMW->>Handler: pass request
Handler->>Handler: execute flow
Handler-->>QuotaMW: 2xx response
QuotaMW->>UsageDB: INSERT UsageRecord (increment quota)
QuotaMW-->>Client: 2xx + quota headers
else Quota Exhausted
QuotaMW-->>Client: HTTP 429 + quota headers
end
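The rate-limit branch in the diagram above (count the request, then fail open when Redis is down) can be condensed into a toy sketch. `FakeRedis` and `DownRedis` are in-memory stand-ins invented for illustration; a real deployment would use redis-py's `INCR` plus a TTL on the window key, as the diagram shows. TTL handling is elided here.

```python
class FakeRedis:
    """In-memory stand-in for Redis INCR (illustration only)."""

    def __init__(self):
        self.counters = {}

    def incr(self, key):
        self.counters[key] = self.counters.get(key, 0) + 1
        return self.counters[key]


class DownRedis:
    """Simulates an unreachable Redis backend."""

    def incr(self, key):
        raise ConnectionError("redis unavailable")


def allow(redis, client_id, window, limit=3):
    """Count requests in the current window; fail open on Redis errors."""
    key = f"rate:{client_id}:{window}"
    try:
        count = redis.incr(key)
    except Exception:
        return True  # "Redis Down" branch: pass the request through
    return count <= limit


r = FakeRedis()
results = [allow(r, "user-1", window=0) for _ in range(5)]
print(results)  # -> [True, True, True, False, False]
print(allow(DownRedis(), "user-1", window=0))  # fail-open -> True
```

The fail-open choice matches the PR's "graceful no-op when Redis is unavailable" behavior: availability is preferred over strict enforcement when the limiter's backing store is down.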
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~55 minutes
Important: Pre-merge checks failed. Please resolve all errors before merging. Addressing warnings is optional.
❌ Failed checks (1 error, 2 warnings, 1 inconclusive)
✅ Passed checks (5 passed)
Two plug-and-play improvements:

1. `TenantContextMiddleware` now auto-creates a personal org + OWNER membership on the first authenticated request for any user that has no org yet. New Langflow signups get their SaaS context without any manual DB seeding or restarting the server.
2. The `001saas` migration skips all `CREATE TABLE` statements when the `saas_plan` table already exists, then upserts the built-in plans via `ON CONFLICT DO NOTHING`. Re-installing the plugin on an existing DB (or any future Langflow upgrade that re-runs migrations) no longer crashes with "table already exists".

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
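The idempotent-seed behavior in point 2 boils down to this pattern, shown here against an in-memory SQLite table (schema simplified to two columns; the real migration targets the full `saas_plan` table):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE saas_plan (slug TEXT PRIMARY KEY, name TEXT)")


def seed_plans(conn):
    """Idempotent seed: re-running never raises on already-present rows."""
    for slug, name in [("free", "Free"), ("pro", "Pro"), ("enterprise", "Enterprise")]:
        conn.execute(
            "INSERT INTO saas_plan (slug, name) VALUES (?, ?) "
            "ON CONFLICT(slug) DO NOTHING",
            (slug, name),
        )


seed_plans(conn)
seed_plans(conn)  # second run is a no-op instead of crashing
count = conn.execute("SELECT COUNT(*) FROM saas_plan").fetchone()[0]
print(count)  # -> 3
```

The same `ON CONFLICT ... DO NOTHING` clause works on PostgreSQL and SQLite ≥ 3.24, which is what makes re-running the migration safe on an existing database.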
Actionable comments posted: 17
Note
Due to the large number of review comments, Critical and Major severity comments were prioritized as inline comments.
🟡 Minor comments (15)
src/backend/saas/langflow_saas/plugin.py-41-70 (1)
41-70: ⚠️ Potential issue | 🟡 Minor
Migration failures are swallowed; SaaS will start with a broken schema.
`_run_migrations` wraps everything in `try/except Exception` and only logs. If `command.upgrade(..., "heads")` fails (for example because a new migration is malformed or the DB is unreachable), Langflow continues to start, the SaaS routes mount, and every subsequent request hits a half-migrated schema with cryptic 500s. That makes the failure mode much harder to diagnose than failing fast. Consider re-raising (or at minimum surfacing a setting like `SAAS_FAIL_FAST_ON_MIGRATION_ERROR=true`, defaulting to true in production) so operators see the failure at boot rather than via runtime errors.
Also note that line 65 forcibly overrides `sqlalchemy.url`, which contradicts the comment in `src/backend/saas/alembic.ini` (lines 5–6) saying `env.py` resolves the URL from `SAAS_DATABASE_URL`. Either drop the override here and let `env.py` do its job, or update the alembic.ini comment to reflect that the plugin path always sets the URL programmatically.
src/backend/saas/langflow_saas/api/orgs.py-47-47 (1)
47-47: ⚠️ Potential issue | 🟡 Minor
Replace EN DASH with hyphen-minus (ruff RUF001 failure).
Static analysis is failing on this string. The character `–` between `3` and `63` is an EN DASH (U+2013).
🔧 Proposed fix
- detail="Slug must be 3–63 lowercase alphanumeric characters or hyphens, cannot start or end with a hyphen.",
+ detail="Slug must be 3-63 lowercase alphanumeric characters or hyphens, cannot start or end with a hyphen.",
src/backend/saas/langflow_saas/migrations/env.py-1-11 (1)
1-11: ⚠️ Potential issue | 🟡 Minor
Docstring contradicts the actual implementation.
The docstring claims migrations "share Langflow's `alembic_version` table via Alembic branch labels so there is no separate version table", but `run_migrations_online()` at line 89 explicitly sets `version_table="saas_alembic_version"`, and `models.py` defines the dedicated `SaasAlembicVersion` table. Please update the docstring (and the inline comment at line 69) to match the actual design, otherwise future maintainers will be misled.
src/backend/saas/langflow_saas/middleware.py-326-340 (1)
326-340: ⚠️ Potential issue | 🟡 Minor
`require_org_header=True` doesn't actually reject requests with multiple orgs and no header.
When the user has multiple memberships and `require_org_header` is set, the middleware just returns without setting `request.state.saas_context`. Langflow's underlying auth still lets the request through (no SaaS context simply means SaaS routes raise 401 via `get_org_context`), but non-SaaS routes execute without any tenant scoping — exactly the failure mode the flag is meant to prevent. Either return a 400/403 response from the middleware here, or document that the flag only protects `/api/saas/...` routes.
Additionally, lines 330–337 issue one `SELECT` per membership to find the personal org — please collapse this into a single query (e.g. `SELECT * FROM saas_organization WHERE id IN (...) AND is_personal = true`).
src/backend/saas/langflow_saas/dependencies.py-7-12 (1)
7-12: ⚠️ Potential issue | 🟡 Minor
Docstring is out of sync with the exported names and behavior.
- Lines 9–11 document `RequireOrgRole`, `RequireOrgAdmin`, `RequireOrgOwner`, but the actual exports (lines 67–69) are `RequireMember`, `RequireAdmin`, `RequireOwner` and the factory is `require_role`.
- Lines 36–38 claim `get_org_context` raises 403 when an authenticated user has no org membership — but the implementation only raises 401 when `saas_context` is missing. The 403 case is never reached in this dependency.
Please update the docstrings so they match the actual API.
Also applies to: 33-46
src/backend/saas/langflow_saas/middleware.py-193-196 (1)
193-196: ⚠️ Potential issue | 🟡 Minor
`X-Forwarded-For` not parsed correctly behind multi-hop proxies.
`X-Forwarded-For` is a comma-separated chain like `client, proxy1, proxy2`; using the whole header as the rate-limit key collapses any client behind the same chain into a single bucket and lets attackers pad the header to bypass the limit. Take only the leftmost (client) hop and strip whitespace.
🛠 Suggested fix
- if not key_id:
-     forwarded_for = request.headers.get("X-Forwarded-For")
-     key_id = forwarded_for or request.client.host if request.client else "unknown"
+ if not key_id:
+     xff = request.headers.get("X-Forwarded-For")
+     client_ip = xff.split(",", 1)[0].strip() if xff else None
+     if not client_ip and request.client:
+         client_ip = request.client.host
+     key_id = client_ip or "unknown"
src/backend/saas/langflow_saas/api/billing.py-85-126 (1)
85-126: ⚠️ Potential issue | 🟡 Minor
`CheckoutRequest` is dead code, and `create_checkout` bypasses Pydantic body validation.
`CheckoutRequest` (lines 85–87) inherits from `PlanRead` and is never referenced anywhere. The handler instead reads `await request.json()` and pulls `stripe_price_id` manually, which:
- Skips automatic validation/422 errors and OpenAPI schema generation,
- Forces clients to know undocumented field names,
- Subclassing `PlanRead` for a request body is also wrong (it pulls in plan-read fields like `id`, `slug`, `max_flows`, etc.).
Define a dedicated request model and bind it as a body parameter.
🛠 Suggested fix
-class CheckoutRequest(PlanRead):
-    stripe_price_id: str
-    billing_cycle: str = "monthly"  # "monthly" | "yearly"
-
-
-@router.post("/orgs/{org_id}/billing/checkout")
-async def create_checkout(org_id: UUID, request: Request, ctx: RequireAdmin):
+class CheckoutRequest(SQLModel):
+    stripe_price_id: str
+    billing_cycle: str = "monthly"  # "monthly" | "yearly"
+
+
+@router.post("/orgs/{org_id}/billing/checkout")
+async def create_checkout(org_id: UUID, body: CheckoutRequest, ctx: RequireAdmin):
     assert_org_match(org_id, ctx)
     settings = get_saas_settings()
     if not settings.billing_enabled:
         raise HTTPException(501, "Billing is not enabled on this instance.")
-
-    body = await request.json()
-    price_id: str = body.get("stripe_price_id", "")
-    if not price_id:
-        raise HTTPException(400, "stripe_price_id is required.")
+    price_id = body.stripe_price_id

(Make sure to import `SQLModel` if not already available.)
src/backend/saas/langflow_saas/api/billing.py-15-15 (1)
15-15: ⚠️ Potential issue | 🟡 Minor
Pipeline failure: Ruff TC003 for `UUID` import.
Ruff is configured strict for this repo and the CI fails on TC003. With `from __future__ import annotations` (line 12) `UUID` is used purely as a type annotation in this file, so the import can move to a `TYPE_CHECKING` block.
🛠 Suggested fix
-from datetime import datetime, timezone
-from uuid import UUID
+from datetime import datetime, timezone
+from typing import TYPE_CHECKING
@@
 from langflow_saas.services import get_billing_service
 from langflow_saas.settings import get_saas_settings
+
+if TYPE_CHECKING:
+    from uuid import UUID
src/backend/saas/langflow_saas/api/teams.py-85-110 (1)
85-110: ⚠️ Potential issue | 🟡 Minor
`target_user_id` is taken from the query string — likely unintentional.
With `target_user_id: UUID` declared as a non-path, non-body parameter in `add_team_member`, FastAPI binds it from the query string (`POST /orgs/.../members?target_user_id=...`). For a state-changing endpoint, the user being added is normally placed in the body. This is also a poor fit for audit logs and tooling. Please decide explicitly via `Body(...)`/a Pydantic model and document it.
Additionally, this handler (and `remove_team_member` below) does not emit an audit log entry, even though `team.created`/`team.deleted` do — that's inconsistent with the PR's audit-log story and with the AI summary which claims add/remove are audited.
🛠 Suggested API + audit fix
-@router.post("/orgs/{org_id}/teams/{team_id}/members", status_code=status.HTTP_201_CREATED)
-async def add_team_member(org_id: UUID, team_id: UUID, target_user_id: UUID, ctx: RequireAdmin):
+class AddTeamMemberBody(SQLModel):
+    user_id: UUID
+
+
+@router.post("/orgs/{org_id}/teams/{team_id}/members", status_code=status.HTTP_201_CREATED)
+async def add_team_member(
+    org_id: UUID, team_id: UUID, body: AddTeamMemberBody, ctx: RequireAdmin, request: Request
+):
     assert_org_match(org_id, ctx)
+    target_user_id = body.user_id
     ...
+    await get_audit_service().log(
+        action="team.member_added",
+        org_id=org_id,
+        user_id=ctx.user_id,
+        resource_type="team",
+        resource_id=str(team_id),
+        ip_address=request.client.host if request.client else None,
+    )
     return {"ok": True}
src/backend/saas/langflow_saas/middleware.py-433-455 (1)
433-455: ⚠️ Potential issue | 🟡 Minor
Add explicit `DateTime(timezone=True)` configuration to the `recorded_at` column for safe timezone handling.
While the current code works because `recorded_at`'s default factory uses `datetime.now(timezone.utc)`, the field lacks explicit SQLAlchemy column configuration. Without `sa_column=Column(..., DateTime(timezone=True))`, there is implicit reliance on SQLModel's type inference, which creates risk if any code path later sets `recorded_at` explicitly to a naive datetime. Add explicit timezone configuration to the column definition to guarantee SQLAlchemy's proper handling of timezone-aware datetimes during storage and retrieval.
src/backend/saas/langflow_saas/api/members.py-131-159 (1)
131-159: ⚠️ Potential issue | 🟡 Minor
`remove_member` lets an admin delete themselves and orphan the org.
A non-owner admin can `DELETE` their own membership row. There's no "self-removal" path that re-checks invariants (e.g., last admin standing, or the actor leaving the org they're still authenticated against). Either disallow `target_user_id == ctx.user_id` here and add a separate "leave org" endpoint, or explicitly handle the self-removal case.
src/backend/saas/langflow_saas/api/members.py-334-336 (1)
334-336: ⚠️ Potential issue | 🟡 Minor
Docstring fails Ruff D205.
Ruff is failing the build on this docstring (`D205: 1 blank line required between summary line and description`).
🛠 Fix
-    """Accept a pending invitation. Caller must be authenticated as the invited email
-    OR an admin can accept on behalf. For simplicity we trust the authenticated user.
-    """
+    """Accept a pending invitation.
+
+    Caller must be authenticated as the invited email OR an admin can accept on behalf.
+    For simplicity we trust the authenticated user.
+    """
src/backend/saas/langflow_saas/api/members.py-254-273 (1)
254-273: ⚠️ Potential issue | 🟡 Minor
Pending invitations past `expires_at` are still listed as PENDING.
There's no background task converting expired `PENDING` invitations to `EXPIRED`, so this endpoint will keep returning rows that `accept_invitation` will reject with 410. Either filter `expires_at >= now` here, or transition the status on read, so admins see an accurate "open" list.
♻️ Suggested filter
 async with session_scope() as db:
+    now = datetime.now(timezone.utc)
     result = await db.exec(
-        select(Invitation).where(Invitation.org_id == org_id, Invitation.status == InvitationStatus.PENDING)
+        select(Invitation).where(
+            Invitation.org_id == org_id,
+            Invitation.status == InvitationStatus.PENDING,
+            Invitation.expires_at >= now,
+        )
     )
src/backend/saas/langflow_saas/api/members.py-46-56 (1)
46-56: ⚠️ Potential issue | 🟡 Minor
Silent `try/except/pass` hides bugs and trips Ruff S110.
Catching `Exception` and swallowing it makes legitimate token-decoding errors invisible in logs, and Ruff is already failing on this hunk. Narrow the catch to the parsing errors you actually expect (`ValueError`) and at least `logger.debug` the failure.
🛠 Suggested fix
 def _verify_token(token: str, secret: str) -> UUID | None:
     """Return the invitation UUID if the token is valid, else None."""
     try:
         id_part, sig_part = token.split("_", 1)
         inv_id = UUID(id_part)
         expected = hmac.new(secret.encode(), str(inv_id).encode(), hashlib.sha256).hexdigest()
         if hmac.compare_digest(expected, sig_part):
             return inv_id
-    except Exception:  # noqa: BLE001
-        pass
+    except (ValueError, AttributeError) as exc:
+        logger.debug("Invalid invitation token: %s", exc)
     return None
src/backend/saas/langflow_saas/services.py-271-281 (1)
271-281: ⚠️ Potential issue | 🟡 Minor
Validate Stripe secret key format to reject invalid values.
The current check only rejects empty strings. While the default is correctly empty (`SecretStr("")`), a developer who manually sets an invalid placeholder in `.env` would still pass validation and make failed Stripe API calls. Stripe secret keys must start with `sk_test_`, `sk_live_`, `rk_test_`, or `rk_live_` — validate against these prefixes to fail loudly on misconfiguration.
🛠 Suggested fix
- key = get_saas_settings().stripe_secret_key.get_secret_value()
- if not key:
-     raise RuntimeError("SAAS_STRIPE_SECRET_KEY is not set.")
+ key = get_saas_settings().stripe_secret_key.get_secret_value()
+ if not key or not key.startswith(("sk_test_", "sk_live_", "rk_test_", "rk_live_")):
+     raise RuntimeError("SAAS_STRIPE_SECRET_KEY is not configured with a valid Stripe key.")
🧹 Nitpick comments (14)
src/backend/base/langflow/plugin_routes.py (1)
99-101: Note the call-time constraint of `add_middleware`.
Starlette finalizes its middleware stack on the first request and `FastAPI.add_middleware` then raises `RuntimeError`. Since `load_plugin_routes()` is invoked at startup this is fine in practice, but a plugin that delays middleware registration to a `startup` handler (or any post-boot path) will fail. Worth calling out in the docstring so plugin authors know it must be invoked synchronously from `register()`.
📝 Suggested docstring tweak
 def add_middleware(self, middleware_class, **kwargs):
-    """Allow plugins to register ASGI middleware on the host app."""
+    """Allow plugins to register ASGI middleware on the host app.
+
+    Must be called synchronously from ``register()`` — Starlette finalizes
+    the middleware stack on the first request, after which
+    ``FastAPI.add_middleware`` raises ``RuntimeError``.
+    """
     self._app.add_middleware(middleware_class, **kwargs)
src/backend/saas/langflow_saas/api/orgs.py (1)
96-106: Avoid reopening `session_scope` solely to render the response. In both `create_org` (lines 96–106) and `update_org` (lines 171–182) you commit the first session, then open a second `session_scope` whose only job is to satisfy `_org_to_read`'s `db` parameter so it can query `Plan`. Two sessions per write is wasteful; load the plan inside the first session before exiting the `async with` block, or have `_org_to_read` open its own short-lived session.
♻️ Sketch — fold the read into the existing session
```diff
 async with session_scope() as db:
     ...
     await db.commit()
     await db.refresh(org)
+    result = await _org_to_read(org, OrgRole.OWNER, db)
 await get_audit_service().log(...)
-
-async with session_scope() as db:
-    return await _org_to_read(org, OrgRole.OWNER, db)
+return result
```

Also applies to: 171-182
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/saas/langflow_saas/api/orgs.py` around lines 96 - 106, The code reopens session_scope just to satisfy _org_to_read(db) after committing; instead, load the Plan (and any other data _org_to_read needs) inside the original async with session_scope block in create_org and update_org and pass those objects to _org_to_read (or call _org_to_read while the original session is still open), eliminating the second session_scope; alternatively modify _org_to_read to accept a db=None and open its own short-lived session when db is not provided—update references to session_scope, create_org, update_org, _org_to_read, and Plan accordingly so only one session is used per request.
src/backend/saas/langflow_saas/settings.py (1)
113-120: Prefer `functools.lru_cache` over a hand-rolled module global. The hand-rolled singleton is correct, but `@lru_cache(maxsize=1)` is the idiomatic, thread-safe equivalent and removes the `global` statement.
🛠 Suggested refactor
```diff
-_settings: SaaSSettings | None = None
-
-
-def get_saas_settings() -> SaaSSettings:
-    global _settings
-    if _settings is None:
-        _settings = SaaSSettings()
-    return _settings
+from functools import lru_cache
+
+
+@lru_cache(maxsize=1)
+def get_saas_settings() -> SaaSSettings:
+    return SaaSSettings()
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/saas/langflow_saas/settings.py` around lines 113 - 120, Replace the hand-rolled module-global singleton for SaaSSettings with functools.lru_cache: remove the _settings global and the manual instantiation in get_saas_settings, and instead decorate get_saas_settings() with `@functools.lru_cache(maxsize=1)` so it returns a cached SaaSSettings() instance; keep the function name get_saas_settings and the SaaSSettings constructor call but eliminate the global and any explicit None checks to make the singleton idiomatic and thread-safe.
src/backend/saas/langflow_saas/dependencies.py (1)
17-17: Move the `UUID` import under `TYPE_CHECKING` to resolve Ruff TC003. Because `from __future__ import annotations` (line 14) is in effect, the only use of `UUID` at line 78 is treated as a string annotation, so it can move into a `TYPE_CHECKING` block to satisfy Ruff and avoid a runtime import.
🛠 Suggested fix
```diff
 from typing import Annotated
-from uuid import UUID
+from typing import TYPE_CHECKING

 from fastapi import Depends, HTTPException, Request, status

 from langflow_saas.middleware import OrgContextData
 from langflow_saas.models import OrgRole
+
+if TYPE_CHECKING:
+    from uuid import UUID
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/saas/langflow_saas/dependencies.py` at line 17, Move the UUID import into a TYPE_CHECKING block: remove the top-level "from uuid import UUID" and add "from typing import TYPE_CHECKING" if not present, then under "if TYPE_CHECKING:" add "from uuid import UUID" so the UUID symbol is only imported for type-checking (it is referenced as a string annotation around line 78); ensure existing "__future__ import annotations" remains unchanged.
src/backend/saas/langflow_saas/models.py (1)
119-122: `__table_args__` placement inconsistent across SQLModel classes. In `UserOrganization` and `TeamMember`, `__table_args__` is declared before the fields, while in `Team`, `UsageRecord`, and `AuditLog` it is declared after. SQLModel handles both, but the inconsistency is a readability/maintenance papercut. Consider standardizing on one placement.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/saas/langflow_saas/models.py` around lines 119 - 122, The __table_args__ attribute is inconsistently placed across SQLModel classes (UserOrganization, TeamMember vs Team, UsageRecord, AuditLog); pick one standard location (e.g., place __table_args__ after the model field declarations) and move the declarations in UserOrganization and TeamMember to match the others (or vice versa if you prefer before fields) so all classes use the same placement for __table_args__ to improve readability and maintenance.
src/backend/saas/langflow_saas/api/billing.py (1)
239-256: Validate pagination inputs and bound offset.
- `limit = -100` slips through `min(limit, 500)` (returns `-100`), which on most engines either errors or returns nothing.
- `offset` is unbounded and not validated; very large values force expensive sequential scans.
- The endpoint silently caps `limit` at 500 — clients get no signal.

Use FastAPI's `Query(..., ge=1, le=500)` / `Query(..., ge=0)` for declarative validation.
🛠 Suggested fix
```diff
-from fastapi import APIRouter, HTTPException, Request, status
+from fastapi import APIRouter, HTTPException, Query, Request, status
@@
-@router.get("/audit")
-async def get_audit_log(
-    ctx: RequireAdmin,
-    limit: int = 100,
-    offset: int = 0,
-):
+@router.get("/audit")
+async def get_audit_log(
+    ctx: RequireAdmin,
+    limit: int = Query(100, ge=1, le=500),
+    offset: int = Query(0, ge=0, le=100_000),
+):
@@
-    .limit(min(limit, 500))
+    .limit(limit)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/saas/langflow_saas/api/billing.py` around lines 239 - 256, The get_audit_log endpoint accepts unvalidated pagination parameters (limit, offset) allowing negative or huge values; update the function signature to use FastAPI's Query to enforce bounds (e.g., limit: int = Query(100, ge=1, le=500) and offset: int = Query(0, ge=0)), remove the manual min(limit, 500) usage in the DB query, and ensure the select(AuditLog)...offset(offset).limit(limit) call uses the validated values so clients receive declarative validation errors instead of silent behavior; adjust any imports to include Query and keep session_scope and AuditLog usage unchanged.
src/backend/saas/langflow_saas/migrations/env.py (1)
46-57: `include_object` may incorrectly include indexes/constraints from non-SaaS tables in autogenerate. When Alembic autogenerates against a DB that already contains Langflow tables, reflected indexes/constraints attached to non-`saas_` tables fall through to the `hasattr(obj, "table")` branch and are correctly excluded. However, the early `return True` fallback on line 57 will accept any non-table object that lacks a `.table` attribute (e.g. schemas). For an additive plugin that only ever owns `saas_*` objects, returning `False` as the default is safer.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/saas/langflow_saas/migrations/env.py` around lines 46 - 57, The include_object function currently defaults to returning True for non-table objects which can mistakenly include non-saas schemas or other DB objects; update include_object (signature: include_object(obj, name, type_, reflected, compare_to)) so that after checking type_ == "table" and the hasattr(obj, "table") branch it returns False instead of True, ensuring only objects belonging to tables starting with "saas_" are included by autogenerate.
src/backend/saas/langflow_saas/migrations/versions/001_saas_foundation.py (1)
220-220: Redundant unique constraint + explicit unique index.
`org_id` (line 220) and `stripe_subscription_id` (line 229) are declared with `unique=True` on the column, which already creates an implicit unique constraint. Then `op.create_index("ix_saas_subscription_org", ..., ["org_id"])` and `ix_saas_subscription_stripe` create non-unique indexes for the same columns. Most engines will then maintain two indexes per column. Drop the explicit `create_index` calls (or pass `unique=True` and remove the column-level `unique=True`) to avoid the duplicate.
Also applies to: 229-229
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/saas/langflow_saas/migrations/versions/001_saas_foundation.py` at line 220, The migration declares org_id and stripe_subscription_id columns with unique=True, and then calls op.create_index for "ix_saas_subscription_org" and "ix_saas_subscription_stripe", causing duplicate indexes; to fix, remove the redundant op.create_index(...) calls for these columns (or alternatively remove column-level unique=True and make the op.create_index(..., unique=True) instead) so only a single unique constraint/index is created for org_id and stripe_subscription_id; update the migration to keep one approach (prefer removing the two op.create_index calls referencing org_id and stripe_subscription_id).
src/backend/saas/langflow_saas/middleware.py (1)
350-360: Dead computation for the per-request `rpm_limit` fallback.

```python
rpm_limit = settings.default_max_executions_per_day // (60 * 24)  # = 1000 // 1440 = 0
rpm_limit = max(rpm_limit, settings.rate_limit_default_rpm)  # always = 60
```

The first line is always dominated by the second (default 60 ≫ 0), so it's confusing dead arithmetic. Simplify to just use `settings.rate_limit_default_rpm` and document any intended relationship between daily quota and per-minute limit elsewhere.
🛠 Suggested fix
```diff
-plan_slug = "free"
-rpm_limit = settings.default_max_executions_per_day // (60 * 24)  # crude default
-rpm_limit = max(rpm_limit, settings.rate_limit_default_rpm)
+plan_slug = "free"
+rpm_limit = settings.rate_limit_default_rpm
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/saas/langflow_saas/middleware.py` around lines 350 - 360, The per-request rpm_limit calculation is dead code because settings.default_max_executions_per_day // (60 * 24) yields 0 and is always overridden by settings.rate_limit_default_rpm; replace the two-line fallback with a single assignment rpm_limit = settings.rate_limit_default_rpm (keep the org.plan_id lookup and plan.rpm_limit override as-is), and add a short comment noting that any mapping from daily quota (settings.default_max_executions_per_day) to per-minute limits should be handled/configured elsewhere if desired; update references to rpm_limit, settings.default_max_executions_per_day, settings.rate_limit_default_rpm, org.plan_id, and Plan accordingly.
src/backend/saas/langflow_saas/api/teams.py (1)
40-49: TOCTOU on uniqueness checks; rely on the DB constraint.
`create_team` and `add_team_member` first `SELECT` to check for an existing row, then `INSERT`. Two concurrent requests can both pass the check, and one will then fail with an `IntegrityError` (which becomes a 500 instead of the intended 409). Catch `IntegrityError` after `commit()` and translate it.
🛠 Suggested pattern
```python
from sqlalchemy.exc import IntegrityError

try:
    db.add(team)
    await db.commit()
except IntegrityError as e:
    await db.rollback()
    raise HTTPException(409, "...") from e
```

Also applies to: 90-108
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/saas/langflow_saas/api/teams.py` around lines 40 - 49, The TOCTOU occurs in create_team (and add_team_member) where you SELECT before INSERT; update the code to catch sqlalchemy.exc.IntegrityError around the db.add(...) / await db.commit() sequence inside the session_scope() so a conflicting insert becomes a 409 instead of a 500: wrap db.add(team) and await db.commit() in a try/except IntegrityError block, call await db.rollback() in the except, and raise HTTPException(409, ...) (you may keep the pre-check SELECT for a fast-fail but must still handle IntegrityError after db.commit to cover concurrent requests). Ensure you reference Team, create_team, add_team_member, session_scope, db.commit, db.rollback, IntegrityError and HTTPException in the change.
src/backend/saas/langflow_saas/api/members.py (1)
184-185: Avoid materializing all members just to count them. `len(count_result.all())` loads every membership row into memory. For larger orgs on shared deployments this is wasteful (and on hot invite paths defeats indexing). Use a `SELECT count(*)`.
♻️ Proposed refactor
```diff
-count_result = await db.exec(select(UserOrganization).where(UserOrganization.org_id == org_id))
-member_count = len(count_result.all())
+from sqlalchemy import func
+count_result = await db.exec(
+    select(func.count()).select_from(UserOrganization).where(UserOrganization.org_id == org_id)
+)
+member_count = count_result.one() or 0
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/saas/langflow_saas/api/members.py` around lines 184 - 185, Replace the current approach that materializes all membership rows (count_result = await db.exec(select(UserOrganization).where(UserOrganization.org_id == org_id)); member_count = len(count_result.all())) with a proper COUNT query: use SQLAlchemy's func.count() in a select over UserOrganization with the same where clause, execute it (e.g., via db.exec(select(func.count()).where(UserOrganization.org_id == org_id))) and read the numeric result with scalar_one() or scalar_one_or_none(); ensure you import func from sqlalchemy so member_count becomes the integer count without loading all rows into memory.
src/backend/saas/langflow_saas/services.py (3)
50-70: Audit logger swallows the exception details. `logger.warning(...)` is called without `exc_info=True`, so when an audit write fails (DB down, model mismatch, etc.) you’ll see only "Failed to write audit log" with no stack trace, making operational debugging hard. Audit failures already “must never surface to callers” — at least preserve the traceback in the log.
♻️ Suggested fix
```diff
-except Exception:  # noqa: BLE001
-    # Audit failures must never surface to callers.
-    logger.warning("Failed to write audit log: action=%s org=%s user=%s", action, org_id, user_id)
+except Exception:  # noqa: BLE001
+    # Audit failures must never surface to callers.
+    logger.exception(
+        "Failed to write audit log: action=%s org=%s user=%s", action, org_id, user_id
+    )
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/saas/langflow_saas/services.py` around lines 50 - 70, The exception handler in the audit write block swallows error details; update the except block in the code that creates AuditLog and uses session_scope (the async with block surrounding db.add(entry) and await db.commit()) to log the full traceback by passing exc_info=True to logger.warning or by using logger.exception, so the log retains the stack trace while still not re-raising the exception.
346-373: Webhook handler has no event-id deduplication. Stripe will retry webhooks aggressively (up to 3 days). `_upsert_subscription` is mostly idempotent, but `_on_payment_failed` flips status to `PAST_DUE` and updates `updated_at` on every redelivery; a late `customer.subscription.updated` event arriving after a newer one will also cause state to flap because there’s no ordering check on `event["created"]`.
Persist processed `event["id"]`s (or compare against `stripe_sub["latest_invoice"]` / `event["created"]`) and short-circuit duplicates. This is operationally important for billing correctness.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/saas/langflow_saas/services.py` around lines 346 - 373, The webhook handler handle_webhook needs deduplication: before dispatching to handlers like _on_payment_failed and _on_subscription_updated, extract event_id = event["id"] and event_ts = event["created"] and check/persist it (e.g., in a processed_events table or by comparing against stripe_sub["latest_invoice"] / stored subscription.updated_at) to short-circuit duplicates or out‑of‑order older events; if already processed or older than the subscription record, return {"processed": False, ...}; if not processed, mark the event_id as processed atomically (or use a DB upsert with unique constraint) and then call _upsert_subscription/_on_payment_failed so repeated deliveries don’t flip status or update updated_at incorrectly.
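A minimal guard for the pattern described above might look like the following sketch. The in-memory set and dict stand in for the dedicated processed-events table the review suggests; `should_process` and the event shape are illustrative assumptions, not existing plugin code.

```python
# Sketch: process each Stripe event id at most once and skip stale updates.
# In production the set/timestamp store would be a DB table with a unique
# constraint on event_id (as the review suggests), not process-local state.
_processed_ids: set[str] = set()
_last_event_ts: dict[str, int] = {}  # subscription id -> newest event["created"] applied


def should_process(event: dict) -> bool:
    """Return True only for unseen events that are newer than the last one applied."""
    event_id = event["id"]
    if event_id in _processed_ids:
        return False  # Stripe redelivery: already handled
    sub_id = event["data"]["object"].get("id", "")
    created = event["created"]
    if created < _last_event_ts.get(sub_id, 0):
        return False  # out-of-order older event: skip to avoid state flapping
    _processed_ids.add(event_id)
    _last_event_ts[sub_id] = created
    return True
```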
197-238: Reuse a module-level `httpx.AsyncClient` to avoid connection overhead, and add retry logic for transient failures.
Each `send_raw()` creates and closes a new `httpx.AsyncClient()`, opening a fresh TLS connection with no pooling and no retries on 5xx/429 responses. For invitation and quota-warning bursts, this adds unnecessary latency.
Create a module-level `httpx.AsyncClient` (initialized lazily in the singleton) and wire cleanup into the app's lifespan context. Add a small bounded retry on transient failures using `httpx.AsyncClient(..., limits=...)` and request retry logic.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/saas/langflow_saas/services.py` around lines 197 - 238, Both send_raw implementations open a new httpx.AsyncClient per call causing TLS/connection overhead and lack retries; replace that by creating a lazily-initialized module-level AsyncClient (e.g., _httpx_client created by init_httpx_client()) with appropriate limits and timeout settings and reuse it inside the send_raw methods on both the SendGrid-related send_raw and ResendEmailService.send_raw (classes inheriting BaseEmailService). Add a small bounded retry loop around the client.post (retry on network errors, 5xx and 429 with backoff and max attempts) and expose a close_httpx_client() to be invoked from the application lifespan shutdown to cleanly close the client. Ensure the send_raw implementations reference the shared _httpx_client and do not construct new AsyncClient instances.
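The bounded-retry idea can be sketched transport-agnostically so it can wrap any shared client. `post_with_retry` and its parameters are assumptions for illustration; in the plugin, `send` would be the `.post` method of one lazily created, module-level `httpx.AsyncClient` closed in the app's lifespan shutdown.

```python
import asyncio


# Sketch: bounded retry around an async HTTP POST. The `send` callable is
# injected so the same loop works with a pooled httpx.AsyncClient in the
# plugin or with a fake in tests.
async def post_with_retry(send, url, json, *, attempts=3, base_delay=0.0):
    """Retry on exceptions and 429/5xx responses with exponential backoff."""
    resp = None
    for i in range(attempts):
        try:
            resp = await send(url, json=json)
        except Exception:
            if i == attempts - 1:
                raise  # out of attempts: surface the network error
            await asyncio.sleep(base_delay * (2**i))
            continue
        if resp.status_code == 429 or resp.status_code >= 500:
            await asyncio.sleep(base_delay * (2**i))
            continue
        return resp
    return resp  # last transient-failure response after exhausting attempts
```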
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: ab132c9d-0089-4805-bc2b-aac3af3e12d8
📒 Files selected for processing (19)
- src/backend/base/langflow/plugin_routes.py
- src/backend/saas/alembic.ini
- src/backend/saas/langflow_saas/__init__.py
- src/backend/saas/langflow_saas/api/__init__.py
- src/backend/saas/langflow_saas/api/billing.py
- src/backend/saas/langflow_saas/api/members.py
- src/backend/saas/langflow_saas/api/orgs.py
- src/backend/saas/langflow_saas/api/router.py
- src/backend/saas/langflow_saas/api/teams.py
- src/backend/saas/langflow_saas/dependencies.py
- src/backend/saas/langflow_saas/middleware.py
- src/backend/saas/langflow_saas/migrations/env.py
- src/backend/saas/langflow_saas/migrations/script.py.mako
- src/backend/saas/langflow_saas/migrations/versions/001_saas_foundation.py
- src/backend/saas/langflow_saas/models.py
- src/backend/saas/langflow_saas/plugin.py
- src/backend/saas/langflow_saas/services.py
- src/backend/saas/langflow_saas/settings.py
- src/backend/saas/pyproject.toml
```python
@router.post("/billing/webhook", status_code=status.HTTP_200_OK)
async def stripe_webhook(request: Request):
    settings = get_saas_settings()
    if not settings.billing_enabled:
        raise HTTPException(501, "Billing not enabled.")

    payload = await request.body()
    sig_header = request.headers.get("stripe-signature", "")

    try:
        result = await get_billing_service().handle_webhook(payload=payload, sig_header=sig_header)
    except Exception as exc:
        raise HTTPException(400, f"Webhook processing failed: {exc}") from exc

    return result
```
Stripe webhook lacks idempotency handling and leaks raw exception messages.
Stripe is documented to deliver events at-least-once and recommends idempotent processing keyed by event.id. The current handler dispatches every payload to BillingService.handle_webhook without any seen-event check, which can cause duplicate Subscription upserts / audit entries. Additionally, surfacing f"Webhook processing failed: {exc}" in the response leaks internal error details to Stripe (and any attacker that can probe the endpoint). Log the exception server-side and return a generic 400.
🛠 Suggested fix
```diff
-    try:
-        result = await get_billing_service().handle_webhook(payload=payload, sig_header=sig_header)
-    except Exception as exc:
-        raise HTTPException(400, f"Webhook processing failed: {exc}") from exc
+    try:
+        result = await get_billing_service().handle_webhook(payload=payload, sig_header=sig_header)
+    except Exception as exc:
+        logger.exception("Stripe webhook processing failed")
+        raise HTTPException(400, "Webhook processing failed.") from exc
```

Idempotency can be added inside `handle_webhook` by recording processed `event.id`s in a small dedicated table and short-circuiting on duplicates.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/saas/langflow_saas/api/billing.py` around lines 217 - 231, The
stripe_webhook endpoint should stop returning raw exception text and should
enforce idempotency by event.id; update stripe_webhook and
BillingService.handle_webhook so that handle_webhook first parses the Stripe
event id, checks a dedicated "processed_events" table/record keyed by event.id
and short-circuits (returning 200) if seen, otherwise records the id before
performing Subscription upserts/audits, and on any exception log the full
exception server-side (use the existing logger) and raise a generic
HTTPException(400, "Webhook processing failed") from stripe_webhook instead of
f-stringing the exception; reference functions: stripe_webhook,
get_billing_service().handle_webhook, and BillingService.handle_webhook when
adding the idempotency check and persistent processed-events recording.
```python
    async with session_scope() as db:
        # Check member cap.
        org_result = await db.exec(select(Organization).where(Organization.id == org_id))
        org = org_result.first()

        count_result = await db.exec(select(UserOrganization).where(UserOrganization.org_id == org_id))
        member_count = len(count_result.all())
        if org and member_count >= settings.default_max_members:
            raise HTTPException(
                status_code=status.HTTP_402_PAYMENT_REQUIRED,
                detail=f"Member limit ({settings.default_max_members}) reached. "
                "Upgrade your plan to invite more members.",
            )
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
# Verify Plan model exposes a per-plan member cap field.
rg -nP -C2 '\b(max_members|member_limit|seats?)\b'

ast-grep --pattern 'class Plan(SQLModel, $$$):
  $$$
'
```

Repository: langflow-ai/langflow
Length of output: 4568
🏁 Script executed:
```shell
# View the actual code in members.py around lines 179-191
sed -n '170,200p' src/backend/saas/langflow_saas/api/members.py | cat -n
```
🏁 Script executed:
```shell
# Check the Organization model for plan_id field
rg -nA10 'class Organization\(SQLModel' src/backend/saas/langflow_saas/models.py | head -30
```
🏁 Script executed:
```shell
# View the Plan model definition in full
sed -n '62,90p' src/backend/saas/langflow_saas/models.py | cat -n
```
Use the organization's plan member limit, not the global default.
The code applies settings.default_max_members to every organization regardless of their tier. Since Organization.plan_id resolves to a Plan with a per-plan max_members field, this should be used instead. Enterprise customers will incorrectly hit the Free tier's 5-member cap.
Fetch the related Plan via org.plan_id and enforce plan.max_members, falling back to settings.default_max_members if the plan is absent.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/saas/langflow_saas/api/members.py` around lines 179 - 191, The
member-cap check currently uses settings.default_max_members for all orgs;
update the logic inside the async with session_scope() block to fetch the org's
plan via Organization.plan_id (load the Plan model by that id), read
plan.max_members and use that limit when comparing against the UserOrganization
count, falling back to settings.default_max_members if no plan or max_members is
present; keep raising the same HTTPException with the limit value substituted
from plan.max_members (or the fallback) and leave the rest of the flow
(session_scope, Organization, UserOrganization) unchanged.
```python
invitation = Invitation(
    org_id=org_id,
    email=body.email,
    role=body.role,
    invited_by=ctx.user_id,
    expires_at=expires_at,
    token_hash="",  # filled below after we have the id
)
db.add(invitation)
await db.flush()

token = _make_token(invitation.id, settings.invitation_secret.get_secret_value())
invitation.token_hash = hashlib.sha256(token.encode()).hexdigest()
db.add(invitation)
await db.commit()
```
Critical: Empty token_hash placeholder breaks the unique constraint on concurrent invites.
Invitation.token_hash is declared unique=True (see models.py snippet, line 186). Here you insert with token_hash="" and only fill the real hash after db.flush() / db.commit(). Two invites being created concurrently — or even sequentially within the same uncommitted unit of work in some setups — will both try to insert "" and the second one will fail with a unique‑constraint violation (and on most DBs, once "" exists, no other invite can be flushed until the first commits).
Generate the token from a pre‑allocated UUID before the first db.add() so the row is inserted with its final, unique hash.
🛠 Suggested fix
```diff
+from uuid import uuid4
+
+invitation_id = uuid4()
+token = _make_token(invitation_id, settings.invitation_secret.get_secret_value())
+token_hash = hashlib.sha256(token.encode()).hexdigest()
+
 expires_at = datetime.now(timezone.utc) + timedelta(hours=settings.invitation_expire_hours)
 invitation = Invitation(
+    id=invitation_id,
     org_id=org_id,
     email=body.email,
     role=body.role,
     invited_by=ctx.user_id,
     expires_at=expires_at,
-    token_hash="",  # filled below after we have the id
+    token_hash=token_hash,
 )
 db.add(invitation)
-await db.flush()
-
-token = _make_token(invitation.id, settings.invitation_secret.get_secret_value())
-invitation.token_hash = hashlib.sha256(token.encode()).hexdigest()
-db.add(invitation)
 await db.commit()
 await db.refresh(invitation)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/saas/langflow_saas/api/members.py` around lines 206 - 220, Create
the invitation token before inserting the Invitation row so token_hash is set
atomically and cannot collide: generate a random token (e.g. uuid.uuid4().hex),
compute its SHA256 hash, set Invitation.token_hash to that hash when
constructing the Invitation, then db.add(invitation) / await db.flush()/commit;
return or keep the plain token for sending. Replace the current flow that calls
_make_token(invitation.id, ...) after flush with pre-generated token/hash (or
adapt _make_token to accept a precomputed nonce) so Invitation.token_hash is
never the empty string and unique constraints cannot fail.
```python
@router.post("/invitations/{token}/accept", status_code=status.HTTP_200_OK)
async def accept_invitation(token: str, ctx: CurrentOrgContext, request: Request):
    """Accept a pending invitation. Caller must be authenticated as the invited email
    OR an admin can accept on behalf. For simplicity we trust the authenticated user.
    """
    settings = get_saas_settings()
    inv_id = _verify_token(token, settings.invitation_secret.get_secret_value())
    if not inv_id:
        raise HTTPException(400, "Invalid invitation token.")
```
Critical: Authenticated user can accept any invitation if they hold the token.
accept_invitation does not verify that ctx’s user identity (or email) matches inv.email. Combined with the public get_invitation_info endpoint that reveals the invitee’s email, anyone with a leaked/guessed token (HMAC, but still a bearer token in URLs that are commonly logged in proxy/access logs, browser history, and referer headers) can join the org under the invited role — including ADMIN.
The docstring explicitly notes “For simplicity we trust the authenticated user,” but this is a real privilege‑escalation vector, not a stylistic shortcut. At minimum, require the authenticated user's email to equal inv.email (case‑insensitive), and consider issuing tokens that are returned only to the recipient (e.g., not echoed by the public info endpoint).
🛡️ Suggested email match check
```diff
 async with session_scope() as db:
     result = await db.exec(select(Invitation).where(Invitation.id == inv_id))
     inv = result.first()
     if not inv:
         raise HTTPException(404, "Invitation not found.")
     if inv.status != InvitationStatus.PENDING:
         raise HTTPException(410, f"Invitation is {inv.status.value}.")
     if inv.expires_at < datetime.now(timezone.utc):
         raise HTTPException(410, "Invitation has expired.")
+
+    # The authenticated caller must be the invited recipient.
+    caller_email = (getattr(ctx, "email", "") or "").strip().lower()
+    if not caller_email or caller_email != inv.email.strip().lower():
+        raise HTTPException(403, "This invitation was issued to a different email.")
```
🧰 Tools
🪛 GitHub Check: Ruff Style Check (3.13)
[failure] 334-336: Ruff (D205)
src/backend/saas/langflow_saas/api/members.py:334:5: D205 1 blank line required between summary line and description
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/saas/langflow_saas/api/members.py` around lines 332 - 340,
accept_invitation currently trusts possession of a token and never checks that
the authenticated user (ctx) matches the invited email; fetch the invitation
record after _verify_token (e.g., via the existing invitation retrieval function
or model using inv_id), compare the authenticated user's email (ctx.user.email
or ctx.email) to inv.email case‑insensitively, and if they do not match then
reject with HTTP 403 unless the caller has an admin role in ctx (allow admins to
accept on behalf). Ensure the check occurs before any organization membership
creation or role assignment so a leaked token cannot be used to escalate
privileges.
```python
@router.delete("/{org_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_org(org_id: UUID, ctx: RequireOwner, request: Request):
    assert_org_match(org_id, ctx)
    from langflow.services.deps import session_scope

    async with session_scope() as db:
        org_result = await db.exec(select(Organization).where(Organization.id == org_id))
        org = org_result.first()
        if not org:
            raise HTTPException(status_code=404, detail="Organization not found.")
        if org.is_personal:
            raise HTTPException(status_code=400, detail="Personal organizations cannot be deleted.")

        await db.delete(org)
        await db.commit()

    await get_audit_service().log(
        action="org.deleted",
        org_id=org_id,
        user_id=ctx.user_id,
        resource_type="organization",
        resource_id=str(org_id),
        ip_address=request.client.host if request.client else None,
    )
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
# Locate FK definitions targeting saas_organization to verify cascade behaviour.
fd -e py . src/backend/saas | xargs rg -nP -C2 '(ForeignKey|foreign_key).*("organization|saas_organization|Organization\.id)'

# Inspect Organization model relationships and cascade options.
rg -nP -C5 'class Organization\b' src/backend/saas
```
🏁 Script executed:
# Check Organization model for is_active field
rg -nP -C10 'class Organization\b' src/backend/saas/langflow_saas/models.py | head -50Repository: langflow-ai/langflow
Length of output: 1064
🏁 Script executed:
# Check list_orgs function at line 121
sed -n '110,135p' src/backend/saas/langflow_saas/api/orgs.pyRepository: langflow-ai/langflow
Length of output: 1122
🏁 Script executed:
# Look for all uses of is_active in the Organization context
rg -n 'is_active' src/backend/saas/langflow_saas/Repository: langflow-ai/langflow
Length of output: 1546
`delete_org` performs a hard DELETE that contradicts the soft-delete architecture.

`Organization` carries an `is_active` flag that is filtered on throughout the codebase (`list_orgs`, middleware). Hard-deleting via `db.delete(org)` will cascade-delete `UserOrganization`, `Team`, `UsageRecord`, and `Subscription` rows, while `AuditLog` rows are preserved (`org_id` set to NULL). However, the presence of the `is_active` pattern indicates the design intent is soft-delete, not hard-delete.

Recommend converting this endpoint to mark `is_active = False`:

🛡️ Suggested change to soft-delete

```diff
-        await db.delete(org)
-        await db.commit()
+        org.is_active = False
+        org.updated_at = datetime.now(timezone.utc)
+        db.add(org)
+        await db.commit()
```

This aligns with the architectural pattern used in `list_orgs` and the middleware, ensuring inactive orgs are filtered consistently across all operations. If hard-delete is genuinely intended, document the cascade behavior and confirm it aligns with product requirements.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/saas/langflow_saas/api/orgs.py` around lines 185 - 208, The
delete_org endpoint currently hard-deletes the Organization via db.delete(org),
which breaks the app's soft-delete pattern (Organization.is_active used by
list_orgs and middleware) and causes cascading removals; change delete_org to
set org.is_active = False (and optionally set a deactivated_at timestamp if
present) and then await db.add(org) / await db.commit() instead of deleting;
keep the existing audit log call (get_audit_service().log) but ensure it records
the deactivation action (e.g., action="org.deactivated") and still references
org_id and user_id so the soft-deleted org remains traceable; touch the same
symbols: delete_org, Organization, session_scope, and get_audit_service() when
making the change.
```python
    async def send_invitation(
        self,
        *,
        to_email: str,
        org_name: str,
        inviter_name: str,
        role: str,
        accept_url: str,
        expire_hours: int,
    ) -> None:
        subject = f"You've been invited to join {org_name} on Langflow"
        html = f"""
        <h2>You're invited!</h2>
        <p><b>{inviter_name}</b> has invited you to join <b>{org_name}</b>
        as a <b>{role}</b>.</p>
        <p>This invitation expires in {expire_hours} hours.</p>
        <p><a href="{accept_url}" style="padding:10px 20px;background:#4f46e5;color:white;
        text-decoration:none;border-radius:4px">Accept Invitation</a></p>
        <p>Or copy this URL: {accept_url}</p>
        """
        text = (
            f"{inviter_name} invited you to join {org_name} as {role}.\n"
            f"Accept here: {accept_url}\n"
            f"Expires in {expire_hours} hours."
        )
        await self.send_raw(to=to_email, subject=subject, html=html, text=text)

    async def send_password_reset(self, *, to_email: str, reset_url: str, expire_hours: int) -> None:
        subject = "Reset your Langflow password"
        html = f"""
        <h2>Password Reset Request</h2>
        <p>We received a request to reset your password.</p>
        <p>This link expires in {expire_hours} hours.</p>
        <p><a href="{reset_url}" style="padding:10px 20px;background:#4f46e5;color:white;
        text-decoration:none;border-radius:4px">Reset Password</a></p>
        <p>If you didn't request this, ignore this email.</p>
        """
        text = f"Reset your password: {reset_url}\nExpires in {expire_hours} hours."
        await self.send_raw(to=to_email, subject=subject, html=html, text=text)

    async def send_quota_warning(self, *, to_email: str, org_name: str, metric: str, used: int, limit: int) -> None:
        pct = int(used / limit * 100) if limit else 0
        subject = f"[{org_name}] Usage alert: {pct}% of {metric} quota used"
        html = f"""
        <h2>Usage Alert for {org_name}</h2>
        <p>Your organization has used <b>{used}/{limit}</b> ({pct}%) of its
        daily <b>{metric}</b> quota.</p>
        <p>Upgrade your plan to increase your limits.</p>
        """
        text = f"[{org_name}] {metric}: {used}/{limit} ({pct}%) used."
        await self.send_raw(to=to_email, subject=subject, html=html, text=text)
```
Critical: HTML email templates are vulnerable to injection.

`org_name`, `inviter_name`, `role`, and `accept_url` are interpolated directly into the HTML body via f-strings. `org_name` and `inviter_name` are user-controlled (org creators, invited users), and `accept_url` is built from `app_base_url`. A malicious org name like `</p><script>…</script>` or `" onmouseover="…` will be rendered verbatim by HTML-capable mail clients. Even with a sanitized `accept_url`, an org name of `"><a href="https://attacker">` allows phishing inside an otherwise-trusted email.

Escape every interpolated value with `html.escape(..., quote=True)` (and validate that `accept_url` is an absolute http(s) URL), or switch to a real template engine with autoescape (`jinja2.Environment(autoescape=True)`).
🛡️ Minimal fix

```diff
+import html as _html
 ...
     async def send_invitation(
         self, *, to_email: str, org_name: str, inviter_name: str,
         role: str, accept_url: str, expire_hours: int,
     ) -> None:
-        subject = f"You've been invited to join {org_name} on Langflow"
-        html = f"""
-        <h2>You're invited!</h2>
-        <p><b>{inviter_name}</b> has invited you to join <b>{org_name}</b>
-        as a <b>{role}</b>.</p>
+        e = _html.escape
+        subject = f"You've been invited to join {org_name} on Langflow"
+        html_body = f"""
+        <h2>You're invited!</h2>
+        <p><b>{e(inviter_name)}</b> has invited you to join <b>{e(org_name)}</b>
+        as a <b>{e(role)}</b>.</p>
         <p>This invitation expires in {expire_hours} hours.</p>
-        <p><a href="{accept_url}" ...>Accept Invitation</a></p>
-        <p>Or copy this URL: {accept_url}</p>
+        <p><a href="{e(accept_url, quote=True)}" ...>Accept Invitation</a></p>
+        <p>Or copy this URL: {e(accept_url)}</p>
         """
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/saas/langflow_saas/services.py` around lines 94 - 144, The HTML
templates in send_invitation (and similarly send_password_reset and
send_quota_warning) directly interpolate user-controlled values (org_name,
inviter_name, role, accept_url) which allows HTML/JS injection; fix by escaping
all interpolated values with html.escape(..., quote=True) before composing the
html string (e.g., escape org_name, inviter_name, role, and the visible
accept_url text) and separately validate/sanitize accept_url to ensure it is an
absolute http(s) URL (or only include it as a plain text link in the text body);
update send_invitation, send_password_reset, and send_quota_warning to use the
escaped values when calling send_raw.
```python
    async def get_or_create_customer(self, *, org_id: UUID, org_name: str, email: str) -> str:
        """Return Stripe customer_id, creating one if absent."""
        import asyncio

        from langflow.services.deps import session_scope
        from sqlmodel import select

        from langflow_saas.models import Organization

        async with session_scope() as db:
            result = await db.exec(select(Organization).where(Organization.id == org_id))
            org = result.first()
            if org and org.stripe_customer_id:
                return org.stripe_customer_id

        # Create new Stripe customer.
        stripe = self._stripe()
        customer = await asyncio.to_thread(
            stripe.Customer.create,
            name=org_name,
            email=email,
            metadata={"langflow_org_id": str(org_id)},
        )
        customer_id: str = customer["id"]

        # Persist the customer_id.
        async with session_scope() as db:
            result = await db.exec(select(Organization).where(Organization.id == org_id))
            org = result.first()
            if org:
                org.stripe_customer_id = customer_id
                org.updated_at = datetime.now(timezone.utc)
                db.add(org)
                await db.commit()

        return customer_id
```
Race condition can create duplicate Stripe customers.

The function reads `org.stripe_customer_id`, exits the session, calls Stripe to create a customer, then re-reads and writes the column. Two concurrent callers (e.g., a user clicks "Subscribe" twice, or a checkout + webhook race) will both observe `None`, both create a Stripe customer, and only the second `commit()` wins — leaving an orphaned customer in Stripe and the org row potentially pointing at the wrong one. There is no row-level lock or unique constraint guarding this.

Either acquire `SELECT … FOR UPDATE` (or pass a Stripe `Idempotency-Key` derived from `org_id`) before calling `Customer.create`, or persist via `INSERT … ON CONFLICT DO NOTHING` and re-read.
🛡️ Mitigation: Stripe idempotency key

```diff
-        customer = await asyncio.to_thread(
-            stripe.Customer.create,
-            name=org_name,
-            email=email,
-            metadata={"langflow_org_id": str(org_id)},
-        )
+        customer = await asyncio.to_thread(
+            stripe.Customer.create,
+            name=org_name,
+            email=email,
+            metadata={"langflow_org_id": str(org_id)},
+            idempotency_key=f"org-customer-{org_id}",
+        )
```

Plus a re-check of `org.stripe_customer_id` under the second session before overwriting.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/saas/langflow_saas/services.py` around lines 283 - 318,
get_or_create_customer has a race where two callers can both see no
stripe_customer_id, each call Stripe.Customer.create, and one overwrite leaves
an orphaned Stripe customer; fix by acquiring a row lock or using Stripe
idempotency plus a guarded re-check before write: inside get_or_create_customer,
when loading Organization (select Organization where Organization.id == org_id)
do a SELECT ... FOR UPDATE (or otherwise lock the row via your
session_scope/ORM) before calling stripe.Customer.create so only one creator
proceeds, or alternatively pass a deterministic Stripe-Idempotency-Key based on
org_id to stripe.Customer.create and then, after the threaded create returns,
open a new session and re-query Organization to check org.stripe_customer_id and
only set/commit if it is still empty (do not blindly overwrite). Ensure the code
references the same Organization model and the get_or_create_customer method and
updates org.stripe_customer_id and commits only under the lock or after the
re-check.
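The locking pattern can be modeled framework-free. In this toy sketch a `threading.Lock` stands in for the database row lock that `SELECT … FOR UPDATE` would take, and `create_customer` stands in for the threaded `stripe.Customer.create` call; none of this is the plugin's actual ORM code:

```python
import threading


class OrgStore:
    """Toy stand-in for the Organization row."""

    def __init__(self) -> None:
        self.lock = threading.Lock()  # plays the role of the row lock
        self.stripe_customer_id: str | None = None


def get_or_create_customer(store: OrgStore, create_customer) -> str:
    # Hold the "row lock" across the check, the external create, and the
    # write, so a second caller re-checks under the lock and sees the
    # winner's id instead of creating a duplicate customer.
    with store.lock:
        if store.stripe_customer_id:
            return store.stripe_customer_id
        store.stripe_customer_id = create_customer()
        return store.stripe_customer_id
```

In a real database the trade-off is that the lock is held across a network call to Stripe; the idempotency-key variant avoids that by letting both callers race while Stripe deduplicates the create.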
```python
        customer_id: str = stripe_sub["customer"]
        stripe_sub_id: str = stripe_sub["id"]
        stripe_price_id: str | None = stripe_sub.get("items", {}).get("data", [{}])[0].get("price", {}).get("id")
        raw_status = status_override or stripe_sub.get("status", "active")

        try:
            stripe_status = SubscriptionStatus(raw_status)
        except ValueError:
            stripe_status = SubscriptionStatus.ACTIVE
```
Fix unsafe fallback to ACTIVE on unknown Stripe statuses and prevent `IndexError` when subscription items data is empty.

The code has two related issues in this slice:

1. `stripe_sub.get("items", {}).get("data", [{}])[0]` — when Stripe sends an empty `items.data` list, the default `[{}]` is bypassed (the key exists with an empty value), and `[0]` raises `IndexError`, causing the webhook handler to crash and Stripe to retry indefinitely.
2. `SubscriptionStatus(raw_status)` falling back to `ACTIVE` on `ValueError` silently marks unknown statuses (e.g., `incomplete_expired`, `unpaid`) as `ACTIVE`, granting access that should be revoked. Default to `PAST_DUE` instead and log the unknown status.

Suggested fix

```diff
-        stripe_price_id: str | None = stripe_sub.get("items", {}).get("data", [{}])[0].get("price", {}).get("id")
+        items_data = (stripe_sub.get("items") or {}).get("data") or [{}]
+        stripe_price_id: str | None = (items_data[0] or {}).get("price", {}).get("id")
         raw_status = status_override or stripe_sub.get("status", "active")
         try:
             stripe_status = SubscriptionStatus(raw_status)
         except ValueError:
-            stripe_status = SubscriptionStatus.ACTIVE
+            logger.warning("Unknown Stripe subscription status %r; marking PAST_DUE", raw_status)
+            stripe_status = SubscriptionStatus.PAST_DUE
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/saas/langflow_saas/services.py` around lines 409 - 417, The
current extraction of stripe_price_id and status handling is unsafe: change the
stripe_price_id logic (where stripe_price_id is derived from
stripe_sub.get("items", {}).get("data", [{}])[0] ) to first fetch items_data =
stripe_sub.get("items", {}).get("data") and ensure it's a non-empty list before
indexing—if items_data is missing or empty set stripe_price_id = None; and
replace the try/except around SubscriptionStatus(raw_status) so that on
ValueError you log the unknown raw_status and set stripe_status =
SubscriptionStatus.PAST_DUE (not ACTIVE) to avoid granting access inadvertently;
update references to customer_id, stripe_sub_id, stripe_price_id, raw_status,
and SubscriptionStatus in the same block.
```python
        period_start = stripe_sub.get("current_period_start")
        period_end = stripe_sub.get("current_period_end")
```
🧩 Analysis chain
🌐 Web query:
Were current_period_start and current_period_end removed from the Stripe Subscription object in the 2025 API version, and where do they live now?
💡 Result:
Yes. In Stripe’s 2025 API version rollout dated 2025-03-31 (“basil”), the fields current_period_start and current_period_end were removed from the top-level Subscription object. They now live on the subscription items instead: items.data[].current_period_start and items.data[].current_period_end (i.e., each SubscriptionItem tracks its own billing period).
Citations:
- 1: https://docs.stripe.com/changelog/basil/2025-03-31/deprecate-subscription-current-period-start-and-end.md
- 2: https://docs.stripe.com/api/subscriptions/object
Pull `current_period_start` and `current_period_end` from subscription items instead of the top-level subscription object.

In Stripe API version 2025-03-31 and later, `current_period_start` and `current_period_end` were moved from the subscription object to per-item fields at `items.data[].current_period_start/end`. The current `.get()` calls on lines 419-420 will silently return `None` for accounts pinned to this API version or later, leaving the subscription's period fields unpopulated in the database.

Since the codebase already accesses `items.data[0]` on line 418 for the price ID, use a similar pattern to pull period values from `items.data[0]` as a fallback when the top-level fields are missing.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/saas/langflow_saas/services.py` around lines 419 - 420, The code
is reading current_period_start/current_period_end from the top-level stripe_sub
but newer Stripe API versions move these into each subscription item; update the
logic around where stripe_sub is processed (the block that already reads
items.data[0] for the price id) to first attempt
stripe_sub.get("current_period_start")/get("current_period_end") and if either
is None, fall back to pulling those values from stripe_sub.get("items",
{}).get("data", [])[0].get("current_period_start") and
.get("current_period_end") respectively (ensure you guard against empty
items.data). Adjust the assignments for period_start and period_end to use this
fallback so subscriptions pinned to API 2025-03-31+ get correct period values.
```python
    invitation_secret: SecretStr = Field(
        default_factory=lambda: SecretStr(os.getenv("SAAS_INVITATION_SECRET", "change-me-in-production")),
        description="Secret for signing invitation tokens.",
    )
```
Insecure default invitation_secret silently degrades token security in production.
If an operator forgets to set SAAS_INVITATION_SECRET, the plugin will sign HMAC invitation tokens with the well-known string "change-me-in-production", allowing anyone to mint valid invitations. Since invitations grant org membership, this is a privilege-escalation risk. Either:
- Refuse to start when this default is in effect and a non-development environment is detected, or
- Auto-generate an ephemeral secret per-process and log a warning (forces tokens to be short-lived but non-forgeable across restarts).
🛠 Example: warn loudly and refuse to issue invites with the default

```python
import os
import secrets
import warnings

from pydantic import SecretStr


def _default_invitation_secret() -> SecretStr:
    explicit = os.getenv("SAAS_INVITATION_SECRET")
    if explicit:
        return SecretStr(explicit)
    warnings.warn(
        "SAAS_INVITATION_SECRET is not set; generating an ephemeral secret. "
        "Set it explicitly in production.",
        stacklevel=2,
    )
    return SecretStr(secrets.token_urlsafe(48))
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/saas/langflow_saas/settings.py` around lines 87 - 90, Replace the
insecure lambda default for invitation_secret with a helper like
_default_invitation_secret that reads SAAS_INVITATION_SECRET and if set returns
SecretStr(value); otherwise either (A) raise/exit when running in
non-development environments to refuse startup, or (B) generate an ephemeral
SecretStr via secrets.token_urlsafe(48) and emit a loud warning via
warnings.warn() (and ensure invite issuance is disabled or tokens are
short-lived). Update the Field default_factory to point to
_default_invitation_secret and reference the existing invitation_secret symbol
so the behavior is enforced at process start.
Multi-tenant data isolation and billing hygiene in one change:

1. `saas_flow_org` shadow table (migration `002saas`)
   Maps Langflow flow UUIDs to orgs without touching Langflow's flow table.
   Idempotent migration — skips DDL if the table already exists.
2. `FlowOwnershipMiddleware`
   Intercepts successful `POST /api/v1/flows`, buffers the JSON response, extracts the new flow id, and inserts a `saas_flow_org` row — all transparently. Failures never surface to the caller.
3. Org-scoped flows API (`/api/saas/v1/orgs/{id}/flows`)
   - `GET` — list all flows owned by the org (joined with Langflow flow rows)
   - `POST /assign` — manually assign an existing flow to an org
   - `DELETE /assign` — unassign (the flow itself is not deleted)
4. Subscription auto-provisioning in `_bootstrap_personal_org`
   Every new personal org now gets a Free `Subscription` row at creation time so plan-specific quota checks have a real row to read.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Intercepts `POST /api/v1/users/` (Langflow's public signup endpoint). On a 201 response, buffers the body, extracts the new user's id + username, and calls `_bootstrap_personal_org()` immediately — before the user makes any other request.

This closes the gap where a new user's first request to a non-SaaS route (e.g. `GET /api/v1/flows`) would succeed without an org context, causing any subsequent SaaS route to return 401 until the lazy bootstrap fired.

Middleware execution order is now: RateLimit → UserRegistration → TenantContext → FlowOwnership → QuotaEnforcement

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
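The response-buffering pattern these middlewares rely on can be sketched in raw ASGI. The class name, the path/status they match, and the `on_created` callback hook are illustrative, not the plugin's actual implementation; the point is that body chunks are collected until `more_body` is false, parsed once, and any failure is swallowed so the caller never sees it:

```python
import asyncio
import json


class CaptureCreatedIdMiddleware:
    """Buffer the response of a matching POST and hand the created id to a callback."""

    def __init__(self, app, path: str, on_created):
        self.app, self.path, self.on_created = app, path, on_created

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http" or scope["path"] != self.path or scope["method"] != "POST":
            await self.app(scope, receive, send)
            return
        body_chunks, status_holder = [], [None]

        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                status_holder[0] = message["status"]
            elif message["type"] == "http.response.body":
                body_chunks.append(message.get("body", b""))
                if not message.get("more_body", False) and status_holder[0] == 201:
                    try:
                        payload = json.loads(b"".join(body_chunks))
                        self.on_created(payload.get("id"))
                    except (json.JSONDecodeError, AttributeError):
                        pass  # failures never surface to the caller
            await send(message)  # response is forwarded unchanged

        await self.app(scope, receive, send_wrapper)
```

Because the wrapper forwards every message verbatim, the client sees exactly the response the inner app produced; the extraction is purely a side effect.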
> hmmm very interesting concept
## Summary

- `src/backend/saas/` Python package (`langflow-saas`) that plugs into Langflow's `langflow.plugins` entry-point group, bringing full multi-tenancy to any Langflow instance
- Minimal core change: `plugin_routes.py` gains `add_middleware()` — every other capability lives in the plugin package
- SaaS state is kept in separate tables inside `langflow.db`

## What changed

### `src/backend/base/langflow/plugin_routes.py` (core)

`_PluginAppWrapper.add_middleware()` added so plugins can register ASGI middleware on the host app without touching any other Langflow file.

### `src/backend/saas/` (new package)

**Data layer**

- New `saas_*` tables: `plan`, `organization`, `user_organization`, `invitation`, `team`, `team_member`, `subscription`, `usage_record`, `audit_log`
- Alembic migration (`001saas`) with Free / Pro / Enterprise plan seeds
- `SaasAlembicVersion` registered as SQLModel so Langflow's migration drift-checker doesn't flag the separate version table

**Middleware (three layers, outermost-first)**

- `RateLimitMiddleware` — Redis sliding-window per user/IP, graceful no-op when Redis is absent
- `TenantContextMiddleware` — resolves JWT / API key → org membership → stores `OrgContextData` on `request.state`
- `QuotaEnforcementMiddleware` — blocks flow-execution endpoints when daily quota is exhausted, records usage asynchronously

**REST API (`/api/saas/v1/`)**

**Config** — all via `SAAS_*` env vars with safe defaults; email backend is `console` by default so zero external dependencies are required

## How to test

## Reviewer notes

- The `saas_alembic_version` table must be present and contain `001saas` for the server to start (Langflow's migration checker validates all tables in `SQLModel.metadata`); `plugin.py` handles this at startup
- `sa.Uuid()` on SQLite stores as 32-char hex without hyphens — any direct DB seeding must use `uuid.hex` format
- `OrgRole` enum values are stored by SQLAlchemy name (`OWNER`, not `owner`) — match this in any fixtures

🤖 Generated with Claude Code
Summary by CodeRabbit
Release Notes
New Features