SDK-Managed SMS MFA — Step-By-Step Implementation Plan
Production-grade, TDD-driven implementation of SDK-owned SMS MFA with recovery codes, applied as a full replacement for the legacy email_otp_enabled surface. This plan is grounded in the existing repo layout under app/ and mirrors the conventions used by the email-OTP flow in app/services/otp_service.py and app/routers/otp.py.
0. Agreed Decisions (locked-in context)
| # | Decision |
|---|----------|
| 1 | All work lives under app/. The sdk/ package is not modified. |
| 2 | LocalSmsSender stores messages in a Redis list per-phone with short TTL. No HTTP debug endpoint. Tests read Redis via a fixture helper. Production config validator hard-rejects MFA__SMS__PROVIDER=local. |
| 3 | Recovery codes: 10 codes × 10 chars, Crockford base32 alphabet with 0 and 1 also excluded (no ILOU01), formatted as XXXXX-XXXXX. |
| 4 | Phone numbers are encrypted at rest via a new mfa.phone_encryption_key. A keyed HMAC phone_lookup_hash column supports uniqueness and lookup. phone_last4 is stored for masked display without decryption. |
| 5 | email_otp_enabled is fully replaced by mfa_enabled + mfa_primary_method in a single migration (drop column, update all call sites). No dual-write period. |
| 6 | Challenge token is a signed JWT (existing pattern), but jti is bound to Redis challenge state. Single-use semantics enforced by deleting the Redis row on verify/expiry. |
| 7 | New OAuth/SAML users default to mfa_enabled=false. No forced enrollment in v1. |
| 8 | Only SmsSender Protocol + LocalSmsSender in v1. Twilio/SNS are extension points, not implemented. |
| 9 | Rate limits are configurable via a new MfaSettings block. Defaults: SMS request 3/hour/user, 5/hour/IP; SMS resend 3/challenge; SMS verify 5/challenge; recovery-code 5 per 15 min/user. |
| 10 | phone_number_e164 is globally unique via unique index on phone_lookup_hash, partial on deleted_at IS NULL. |
| 11 | TDD: per-unit red/green/refactor for service logic, per-router behavior test for endpoints, shared fixtures under tests/. |
1. Target File Layout
New files in bold. Files removed are struck-through.
app/
  config.py  # + MfaSettings, SmsSettings
  core/
    mfa/  **new package**
      __init__.py
      codes.py  # OTP + recovery code generation / hashing
      phone.py  # E.164 validation, masking, encryption, lookup hash
      challenge.py  # Redis challenge store (jti-bound, single-use)
  services/
    ~~otp_service.py~~  **deleted** (replaced by mfa_service.py)
    mfa_service.py  **new** (generic, method-aware)
    sms/  **new package**
      __init__.py
      base.py  # SmsSender Protocol
      local.py  # LocalSmsSender (dev only)
      factory.py  # get_sms_sender() with production guard
  routers/
    ~~otp.py~~  **deleted** (replaced by mfa.py)
    mfa.py  **new** (all MFA endpoints)
    auth.py  # updated: password login branches to MFA
    oauth.py  # updated: callback branches to MFA
    saml.py  # updated: callback branches to MFA
  schemas/
    ~~otp.py~~  **deleted** (replaced by mfa.py)
    mfa.py  **new**
    user.py  # remove email_otp_enabled fields
  models/
    user.py  # remove email_otp_enabled; add mfa_* fields
    recovery_code.py  **new** (UserRecoveryCode ORM)
  services/
    oauth_service.py  # updated: route MFA branch
    saml_service.py  # updated: route MFA branch
    token_service.py  # claim rename: email_otp_enabled -> mfa_enabled
    brute_force_service.py  # reused as-is (record_failed_otp_attempt)
migrations/versions/
  20260424_0014_mfa_core.py  **new** (users.* + user_recovery_codes)
tests/
  unit/
    test_mfa_service.py  **new**
    test_mfa_phone_encryption.py  **new**
    test_mfa_recovery_codes.py  **new**
    test_sms_local_sender.py  **new**
    test_config_mfa_production_guard.py  **new**
  integration/
    test_mfa_password_login_real.py  **new**
    test_mfa_oauth_login_real.py  **new**
    test_mfa_saml_login_real.py  **new**
    test_mfa_enrollment_real.py  **new**
    test_mfa_recovery_codes_real.py  **new**
    test_mfa_rate_limits_real.py  **new**
docs/
  configuration.md  # + MFA/SMS settings
  service-api.md  # + new MFA endpoints
  architecture.md  # + MFA flow description
  mfa-local-testing.md  **new**
  security-review.md  # + SMS risk notes
2. TDD Working Agreement
Red: write one failing test for the smallest observable behavior.
Green: write the minimum production code to pass.
Refactor: clean up, keep tests green, run ruff check + ruff format + mypy (project conventions).
Each release in Section 3 lists its tests before its implementation tasks to enforce this order.
Shared fixtures: extend tests/unit/conftest.py and tests/integration/conftest.py with:
fake_sms_sender — captures sent SMS in-memory
local_sms_fixture — reads Redis-backed SMS list
mfa_enrolled_user — factory producing a user with verified phone + MFA on
recovery_codes_fixture — returns plaintext codes + hashed counterparts
3. Releases
Release 0 — Settings, Config Guard, and Baseline Contracts
Purpose : land new configuration surface and encryption key plumbing first so every subsequent release has a compiled place to anchor into. No behavior change yet.
Tests first (tests/unit/test_config_mfa_production_guard.py , tests/unit/test_config_production_hardening.py ):
test_production_rejects_local_sms_provider
test_production_requires_phone_encryption_key
test_production_requires_non_empty_mfa_rate_limits
test_development_defaults_allow_local_provider
Implementation:
Add to app/config.py (imports shown for completeness):
from typing import Literal

from pydantic import BaseModel, Field, SecretStr


class SmsSettings(BaseModel):
    provider: Literal["local", "twilio", "sns"] = "local"
    local_ttl_seconds: int = Field(default=600, ge=60, le=3600)


class MfaRateLimits(BaseModel):
    sms_request_per_hour_per_user: int = Field(default=3, ge=1)
    sms_request_per_hour_per_ip: int = Field(default=5, ge=1)
    sms_resend_per_challenge: int = Field(default=3, ge=1)
    sms_verify_attempts_per_challenge: int = Field(default=5, ge=1)
    recovery_code_attempts_per_15min: int = Field(default=5, ge=1)


class MfaSettings(BaseModel):
    sms: SmsSettings = SmsSettings()
    rate_limits: MfaRateLimits = MfaRateLimits()
    sms_code_length: int = Field(default=6, ge=4, le=12)
    sms_code_ttl_seconds: int = Field(default=600, ge=60, le=3600)
    sms_max_attempts: int = Field(default=5, ge=1)
    challenge_ttl_seconds: int = Field(default=600, ge=60, le=3600)
    action_token_ttl_seconds: int = Field(default=300, ge=60, le=3600)
    recovery_code_count: int = Field(default=10, ge=6, le=20)
    recovery_code_length: int = Field(default=10, ge=8, le=16)
    # Both keys are optional in development; production requires them.
    phone_encryption_key: SecretStr | None = None
    phone_lookup_hash_key: SecretStr | None = None
Attach to Settings: mfa: MfaSettings = MfaSettings().
Extend Settings.validate_production_constraints (see app/config.py:313 ):
mfa.sms.provider == "local" → raise
mfa.phone_encryption_key is None → raise
mfa.phone_lookup_hash_key is None → raise
Update .env-sample with the new keys and a short comment pointing to docs/mfa-local-testing.md.
Do not reference the new settings elsewhere yet. This release is strictly additive plumbing.
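The production guard described above can be sketched as follows. This is a minimal illustration that uses a flattened dataclass instead of the real Pydantic Settings object; the names MfaConfigSketch and validate_mfa_production_constraints are hypothetical, and the MFA__PHONE_LOOKUP_HASH_KEY variable name is an assumption inferred from the field name.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class MfaConfigSketch:
    """Hypothetical, flattened stand-in for the settings.mfa block."""

    environment: str = "development"
    sms_provider: str = "local"
    phone_encryption_key: Optional[str] = None
    phone_lookup_hash_key: Optional[str] = None


def validate_mfa_production_constraints(cfg: MfaConfigSketch) -> None:
    """Raise ValueError naming every unsafe MFA setting when in production."""
    if cfg.environment != "production":
        return  # development and test keep the permissive defaults

    problems: List[str] = []
    if cfg.sms_provider == "local":
        problems.append("MFA__SMS__PROVIDER=local is not allowed in production")
    if not cfg.phone_encryption_key:
        problems.append("MFA__PHONE_ENCRYPTION_KEY must be set")
    if not cfg.phone_lookup_hash_key:
        # Assumed env var name; the plan only names the settings field.
        problems.append("MFA__PHONE_LOOKUP_HASH_KEY must be set")
    if problems:
        raise ValueError("; ".join(problems))
```

Collecting all problems before raising keeps the startup error actionable, since an operator sees every missing key in one message rather than fixing them one restart at a time.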
Acceptance :
pytest tests/unit/test_config_mfa_production_guard.py green.
Existing test suite still green (no call sites changed).
Release 1 — Data Model + Migration (Atomic Replacement of email_otp_enabled)
Purpose : land the full schema change in one migration, with all code updated in the same commit so no intermediate state exists.
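Tests first:
test_phone_roundtrip_encrypt_decrypt
test_phone_lookup_hash_is_deterministic
test_phone_lookup_hash_changes_with_key
test_invalid_e164_rejected
test_mask_shows_only_last_four
test_alembic_upgrade_and_downgrade_roundtrip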
Implementation :
Create app/core/mfa/phone.py :
PhoneCipher dataclass with encrypt(e164: str) -> bytes, decrypt(blob: bytes) -> str. Use Fernet (already pulled in by cryptography).
PhoneHasher.lookup_hash(e164: str) -> str — keyed HMAC-SHA256 of the canonicalized E.164 string.
normalize_e164(raw: str) -> str — strict validator: ^\+[1-9]\d{7,14}$. Reject anything else.
mask_e164(e164: str) -> str — returns "+" + "*" * (len-4) + last4.
Two reloadable_singleton factories that source keys from settings.mfa.
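A minimal, standard-library sketch of the validation, masking, and lookup-hash helpers listed above (encryption is left out because it depends on the cryptography package). Two details are assumptions rather than decisions from this plan: the mask length is computed over the digits only (excluding the leading "+"), and lookup_hash takes the key as an explicit argument instead of reading it from settings.

```python
import hashlib
import hmac
import re

# re.ASCII keeps \d from matching non-ASCII Unicode digits.
_E164_RE = re.compile(r"^\+[1-9]\d{7,14}$", re.ASCII)


def normalize_e164(raw: str) -> str:
    """Return the number unchanged if it is strict E.164, else raise."""
    if not _E164_RE.fullmatch(raw):
        raise ValueError("phone number is not valid E.164")
    return raw


def mask_e164(e164: str) -> str:
    """Hide every digit except the last four, e.g. for API responses."""
    digits = e164[1:]
    return "+" + "*" * (len(digits) - 4) + digits[-4:]


def lookup_hash(e164: str, key: bytes) -> str:
    """Keyed HMAC-SHA256 of the canonical number, as 64 hex characters."""
    return hmac.new(key, e164.encode("ascii"), hashlib.sha256).hexdigest()
```

The 64-character hex digest fits the String(64) column planned for phone_lookup_hash.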
Create app/models/recovery_code.py :
class UserRecoveryCode(Base, TimestampTenantMixin):
    __tablename__ = "user_recovery_codes"
    __table_args__ = (
        UniqueConstraint("user_id", "code_hash", name="uq_user_recovery_codes_user_hash"),
        Index("ix_user_recovery_codes_user_id_used", "user_id", "used_at"),
    )

    id: Mapped[UUID] = mapped_column(PGUUID(as_uuid=True), primary_key=True, default=uuid4)
    user_id: Mapped[UUID] = mapped_column(
        PGUUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False
    )
    code_hash: Mapped[str] = mapped_column(String(128), nullable=False)
    used_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
Update app/models/user.py :
Remove email_otp_enabled.
Add :
phone_ciphertext: Mapped[bytes | None] (LargeBinary)
phone_last4: Mapped[str | None] (String(4))
phone_lookup_hash: Mapped[str | None] (String(64))
phone_verified: Mapped[bool] (default false)
phone_verified_at: Mapped[datetime | None]
mfa_enabled: Mapped[bool] (default false)
mfa_primary_method: Mapped[str | None] (String(16), CHECK in ('sms',))
Add relationship: recovery_codes: Mapped[list[UserRecoveryCode]].
Create migration migrations/versions/20260424_0014_mfa_core.py :
down_revision = "20260416_0013".
Add the new columns to users with safe defaults.
Drop email_otp_enabled from users.
Create user_recovery_codes table.
Add partial unique index: CREATE UNIQUE INDEX uq_users_phone_lookup_hash ON users (phone_lookup_hash) WHERE deleted_at IS NULL AND phone_lookup_hash IS NOT NULL;
downgrade() reverses cleanly (re-adds email_otp_enabled as false, drops new columns and table).
Sweep and delete every reference to email_otp_enabled across the repo:
Update app/schemas/user.py : drop any response field exposing email_otp_enabled; add mfa_enabled where appropriate.
Acceptance :
alembic upgrade head then alembic downgrade -1 round-trips cleanly against a fresh Postgres.
grep -R "email_otp_enabled" app tests migrations returns zero matches.
Existing non-MFA login flows still pass integration tests.
Release 2 — MFA Core (Challenge Store, OTP Helpers, Recovery Codes)
Purpose : implement the provider-agnostic primitives used by every later release.
Tests first (tests/unit/test_mfa_service.py , tests/unit/test_mfa_recovery_codes.py ):
test_challenge_store_roundtrip_hashes_code
test_challenge_is_single_use_after_verify
test_challenge_expiry_removes_redis_state
test_attempt_counter_caps_at_max_and_deletes_challenge
test_challenge_jti_bound_to_redis_row
test_recovery_code_generator_produces_unique_formatted_codes
test_recovery_code_hash_uses_keyed_hmac
test_recovery_code_verify_is_constant_time
test_recovery_code_regeneration_invalidates_previous
Implementation :
app/core/mfa/codes.py :
generate_sms_otp(length: int) -> str — reuses secrets.randbelow, zero-padded.
hash_sms_otp, verify_sms_otp — delegate to the existing app/core/otp.py HMAC machinery (keep core/otp.py intact as a low-level primitive, just rename the module's public surface to be method-neutral). Update app/core/otp.py docstring accordingly.
generate_recovery_codes(count, length) -> list[str] — Crockford base32, XXXXX-XXXXX formatting.
hash_recovery_code(code: str) -> str — keyed HMAC (same key as SMS OTP).
verify_recovery_code(raw, stored_hash) -> bool.
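The recovery-code helpers above could look roughly like this. It is a sketch under stated assumptions: the alphabet is Crockford base32 with 0 and 1 also removed (per decision 3), the HMAC key is passed explicitly rather than loaded from settings, and input is canonicalized by stripping the dash and upper-casing before hashing.

```python
import hashlib
import hmac
import secrets
from typing import List

# Crockford base32 without I, L, O, U, and additionally without 0 and 1.
ALPHABET = "23456789ABCDEFGHJKMNPQRSTVWXYZ"


def generate_recovery_codes(count: int = 10, length: int = 10) -> List[str]:
    """Return `count` distinct codes, dash-separated at the midpoint."""
    codes: List[str] = []
    half = length // 2
    while len(codes) < count:
        raw = "".join(secrets.choice(ALPHABET) for _ in range(length))
        formatted = f"{raw[:half]}-{raw[half:]}"
        if formatted not in codes:  # collisions are unlikely but cheap to rule out
            codes.append(formatted)
    return codes


def _canonical(code: str) -> str:
    """Normalize user input so 'abcde-fghjk' and 'ABCDEFGHJK' hash equally."""
    return code.strip().replace("-", "").upper()


def hash_recovery_code(code: str, key: bytes) -> str:
    """Keyed HMAC-SHA256 of the canonical code, hex encoded."""
    return hmac.new(key, _canonical(code).encode("utf-8"), hashlib.sha256).hexdigest()


def verify_recovery_code(raw: str, stored_hash: str, key: bytes) -> bool:
    """Constant-time comparison of the recomputed hash with the stored one."""
    return hmac.compare_digest(hash_recovery_code(raw, key), stored_hash)
```

Only the hashes would be persisted; the plaintext list is what enable_mfa hands back to the user.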
app/core/mfa/challenge.py :
MfaChallengePurpose = Literal["login", "action", "phone_verify"]
MfaMethod = Literal["sms"] (extensible)
ChallengeState dataclass: user_id, purpose, method, code_hash, attempt_count, audience, created_at, jti.
MfaChallengeStore (Redis-backed):
store(user_id, purpose, method, *, jti, code, audience, ttl) -> None
load(user_id, purpose) -> ChallengeState | None
increment_attempts(user_id, purpose) -> int
delete(user_id, purpose) -> None
assert_jti_matches(state, claimed_jti) -> None
Key format: mfa:challenge:{purpose}:{user_id}.
All Redis errors translated to the existing OTPServiceError-shaped contract (reuse its pattern but rename the class to MfaServiceError).
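To make the single-use and jti-binding semantics concrete, here is a dict-backed sketch of the challenge store. It is not the Redis implementation: InMemoryChallengeStore is a hypothetical test double, it omits the method and audience fields, and its verify method folds load, assert_jti_matches, increment_attempts, and delete into one call for brevity.

```python
import hashlib
import hmac
import time
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class ChallengeState:
    user_id: str
    purpose: str
    code_hash: str
    jti: str
    attempt_count: int
    expires_at: float


class InMemoryChallengeStore:
    """Dict-backed stand-in for the Redis store; single process only."""

    def __init__(self, hmac_key: bytes, max_attempts: int = 5,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._hmac_key = hmac_key
        self._max_attempts = max_attempts
        self._clock = clock
        self._rows: Dict[str, ChallengeState] = {}

    @staticmethod
    def _row_key(user_id: str, purpose: str) -> str:
        return f"mfa:challenge:{purpose}:{user_id}"

    def _hash(self, code: str) -> str:
        return hmac.new(self._hmac_key, code.encode("utf-8"), hashlib.sha256).hexdigest()

    def store(self, user_id: str, purpose: str, *, jti: str, code: str, ttl: int) -> None:
        # Only the hash of the code is kept, never the code itself.
        self._rows[self._row_key(user_id, purpose)] = ChallengeState(
            user_id, purpose, self._hash(code), jti, 0, self._clock() + ttl)

    def load(self, user_id: str, purpose: str) -> Optional[ChallengeState]:
        key = self._row_key(user_id, purpose)
        state = self._rows.get(key)
        if state is not None and self._clock() >= state.expires_at:
            del self._rows[key]  # expiry removes the state
            return None
        return state

    def verify(self, user_id: str, purpose: str, *, claimed_jti: str, code: str) -> bool:
        key = self._row_key(user_id, purpose)
        state = self.load(user_id, purpose)
        if state is None:
            return False
        # The jti from the signed token must match the stored row.
        if not hmac.compare_digest(state.jti.encode("utf-8"), claimed_jti.encode("utf-8")):
            return False
        if hmac.compare_digest(state.code_hash, self._hash(code)):
            del self._rows[key]  # success consumes the challenge
            return True
        state.attempt_count += 1
        if state.attempt_count >= self._max_attempts:
            del self._rows[key]  # attempt exhaustion also deletes it
        return False
```

In this sketch a jti mismatch does not count as an attempt; whether it should is a policy choice the plan does not specify.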
Add recovery-code repository helpers in app/services/mfa_service.py skeleton:
_persist_recovery_codes(db_session, user_id, hashes) — bulk insert inside an existing outer transaction.
_consume_recovery_code(db_session, user_id, raw_code) -> bool — SELECT ... FOR UPDATE over unused codes, verify, mark used_at.
_invalidate_recovery_codes(db_session, user_id) — bulk set used_at=now() where used_at IS NULL.
Acceptance :
All Release-2 unit tests green.
No integration wiring yet — MFA service is not yet exposed.
Release 3 — SMS Provider Abstraction
Purpose : concrete SmsSender and dev-only LocalSmsSender, production guard enforced by factory.
Tests first (tests/unit/test_sms_local_sender.py ):
test_local_sender_persists_message_with_ttl
test_local_sender_masks_code_when_requested_for_logging
test_local_sender_retains_most_recent_n_messages_per_phone
test_factory_raises_when_provider_local_in_production
test_factory_returns_local_sender_in_development
test_factory_raises_notimplemented_for_twilio_or_sns_in_v1
Implementation :
app/services/sms/base.py :
class SmsSender(Protocol):
    async def send_otp_sms(
        self,
        *,
        to_phone_e164: str,
        code: str,
        expires_in_seconds: int,
        purpose: MfaChallengePurpose,
    ) -> None: ...
app/services/sms/local.py :
LocalSmsSender writes to Redis list sms:local:{phone} with LPUSH + LTRIM 0 9 + EXPIRE.
Payload is JSON: {sent_at, purpose, code, expires_in}.
Never logs the code ; it logs only {"to_last4": ..., "purpose": ..., "expires_in": ...}.
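The LPUSH + LTRIM + EXPIRE pattern described above can be sketched without a running Redis. FakeListRedis below is a hypothetical in-memory stand-in that implements only the three commands the sender needs; the real sender would receive an async Redis client instead. Constructor parameters are assumptions.

```python
import asyncio
import json
import time
from collections import defaultdict
from typing import DefaultDict, Dict, List


class FakeListRedis:
    """Tiny in-memory stand-in for the Redis list commands used below."""

    def __init__(self) -> None:
        self.lists: DefaultDict[str, List[str]] = defaultdict(list)
        self.ttls: Dict[str, int] = {}

    async def lpush(self, key: str, value: str) -> None:
        self.lists[key].insert(0, value)  # newest entry goes to the head

    async def ltrim(self, key: str, start: int, stop: int) -> None:
        self.lists[key] = self.lists[key][start:stop + 1]  # stop is inclusive

    async def expire(self, key: str, seconds: int) -> None:
        self.ttls[key] = seconds


class LocalSmsSender:
    """Dev-only sender: keeps the 10 most recent messages per phone."""

    def __init__(self, redis: FakeListRedis, ttl_seconds: int = 600) -> None:
        self._redis = redis
        self._ttl = ttl_seconds

    async def send_otp_sms(self, *, to_phone_e164: str, code: str,
                           expires_in_seconds: int, purpose: str) -> None:
        key = f"sms:local:{to_phone_e164}"
        payload = json.dumps({
            "sent_at": time.time(),
            "purpose": purpose,
            "code": code,
            "expires_in": expires_in_seconds,
        })
        await self._redis.lpush(key, payload)
        await self._redis.ltrim(key, 0, 9)
        await self._redis.expire(key, self._ttl)
```

A test fixture can then read the head of the list to obtain the most recent code for a phone number.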
app/services/sms/factory.py :
get_sms_sender() @reloadable_singleton:
If settings.app.environment == "production" and settings.mfa.sms.provider == "local" → raise (defense in depth; config guard already catches this at startup).
If provider == "local" → return LocalSmsSender.
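Otherwise raise NotImplementedError naming the configured provider, for example "SMS provider 'twilio' not implemented in v1" (the message must not hard-code 'twilio' when the provider is sns).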
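The factory's branching can be illustrated as a pure function. This sketch takes environment and provider as arguments rather than reading settings, and omits the reloadable_singleton decorator; build_sms_sender and LocalSmsSenderStub are hypothetical names.

```python
class LocalSmsSenderStub:
    """Placeholder for the dev-only sender; the real class writes to Redis."""


def build_sms_sender(environment: str, provider: str) -> LocalSmsSenderStub:
    """Return a sender for the provider, refusing unsafe combinations."""
    # Defense in depth: the startup config guard should already reject this.
    if environment == "production" and provider == "local":
        raise RuntimeError("local SMS provider is not allowed in production")
    if provider == "local":
        return LocalSmsSenderStub()
    # twilio and sns are extension points only in v1.
    raise NotImplementedError(f"SMS provider {provider!r} not implemented in v1")
```

Note that NotImplementedError is a subclass of RuntimeError in Python, so callers that need to tell the two failures apart should catch NotImplementedError first.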
Add a fixture in tests/integration/conftest.py :
async def pop_local_sms(redis, phone_e164) -> dict.
Acceptance :
All Release-3 unit tests green.
Local sender writes readable payloads via the fixture.
Factory raises when the local provider is configured in production, regardless of which code path constructs the sender.
Release 4 — MFAService Core API
Purpose : translate the old OTP service into a method-aware service. No router wiring yet.
Tests first (tests/unit/test_mfa_service.py , extended):
Phone enrollment:
test_request_phone_verification_rejects_invalid_e164
test_request_phone_verification_rejects_already_claimed_phone
test_request_phone_verification_sends_sms_and_stores_challenge
test_verify_phone_marks_verified_and_stores_encrypted
test_verify_phone_fails_on_wrong_code_and_increments_attempts
Enable/disable:
test_enable_mfa_requires_verified_phone
test_enable_mfa_sets_primary_method_and_returns_recovery_codes_once
test_disable_mfa_requires_step_up_and_clears_challenges
Login challenge:
test_start_login_challenge_issues_jwt_bound_to_redis_jti
test_verify_login_with_sms_code_returns_tokens_and_deletes_challenge
test_verify_login_with_recovery_code_succeeds_and_marks_code_used
test_verify_login_rejects_reused_challenge_token
test_verify_login_rejects_reused_recovery_code
Audience preservation:
test_challenge_preserves_requested_audience_for_login
Implementation — app/services/mfa_service.py :
Class skeleton modeled on app/services/otp_service.py :
class MfaService:
    def __init__(self, jwt, signing_keys, tokens, sessions, brute_force,
                 redis, sms_sender, challenge_store, phone_cipher, phone_hasher,
                 settings): ...
Methods (each with precise preconditions and audit-friendly return dataclasses):
request_phone_verification(db, user_id, raw_phone) -> PhoneVerificationRequestResult
verify_phone(db, user_id, code) -> None
enable_mfa(db, user_id, action_token) -> EnableMfaResult (returns plaintext recovery codes; apart from regenerate_recovery_codes, this is the only code path that ever returns them)
disable_mfa(db, user_id, action_token) -> None (requires a valid action token; clears pending challenges and marks remaining recovery codes used)
regenerate_recovery_codes(db, user_id, action_token) -> list[str]
start_login_challenge(db, user, *, requested_audience) -> LoginChallengeResult
verify_login(db, challenge_token, *, code=None, recovery_code=None, client_ip, user_agent) -> LoginVerificationResult
resend_login_code(db, challenge_token) -> None
request_action_code(db, user_id, action) -> ActionRequestResult
verify_action_code(db, user_id, code, action, *, audience) -> ActionVerificationResult
Brute-force integration: reuse app/services/brute_force_service.py — call record_failed_otp_attempt on SMS and recovery-code failures identically.
Challenge JWT binding:
Generate a fresh jti per challenge.
Store jti in the Redis challenge row.
On verify, assert_jti_matches(state, claims["jti"]); delete Redis row atomically on success/attempt-exhausted.
Rate limits:
Enforce mfa.rate_limits.sms_request_per_hour_per_user on phone-verify issuance and login challenges (combined with existing _ensure_issuance_not_blocked).
Per-IP limit requires client-IP plumbing; add client_ip parameter to request_phone_verification and start_login_challenge.
Errors: one MfaServiceError with stable code strings:
phone_invalid, phone_unavailable, phone_not_verified, mfa_not_enabled, mfa_already_enabled,
challenge_expired, invalid_code, invalid_recovery_code, challenge_reused, rate_limited,
action_token_invalid, session_backend_unavailable.
Delete app/services/otp_service.py in the same commit so there is never a compiled state with two services.
Acceptance :
All service-level unit tests green.
No router yet → HTTP surface still inactive. Covered by Release 5.
Release 5 — New MFA Router, Password Login MFA Path
Purpose : expose the new MFA endpoints and wire password login.
Tests first :
Router tests under tests/unit/test_mfa_router.py :
test_request_phone_requires_authentication
test_request_phone_accepts_valid_e164_and_returns_expires_in
test_enable_requires_step_up_or_reauth
test_enable_returns_recovery_codes_once_and_never_again
test_disable_requires_action_token
test_regenerate_recovery_codes_invalidates_previous
Integration tests/integration/test_mfa_password_login_real.py :
test_password_login_without_mfa_returns_tokens_unchanged
test_password_login_with_mfa_returns_challenge_not_tokens
test_password_login_with_mfa_verify_issues_tokens_and_creates_session
test_password_login_with_mfa_verify_accepts_recovery_code
test_password_login_with_mfa_resend_respects_limit
test_challenge_token_is_single_use
test_csrf_enforced_for_cookie_transport
Implementation :
app/schemas/mfa.py :
MfaChallengeResponse (note generic name; mfa_required: True, challenge_token, method, masked_destination, expires_in).
VerifyLoginMfaRequest (one of code or recovery_code; Pydantic model_validator enforces exactly one).
PhoneEnrollmentRequest, PhoneVerifyRequest, EnableMfaResponse (includes recovery_codes: list[str]), RegenerateRecoveryCodesResponse.
RequestActionMfaRequest, VerifyActionMfaRequest, VerifyActionMfaResponse, MfaEnrollmentResponse.
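The "exactly one of code or recovery_code" rule on VerifyLoginMfaRequest can be shown with a plain dataclass. This is a stand-in for illustration only: the real schema would be a Pydantic model using model_validator, and the challenge_token field is an assumption based on the verify_login signature.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class VerifyLoginMfaRequestSketch:
    """Dataclass stand-in for the Pydantic request schema."""

    challenge_token: str
    code: Optional[str] = None
    recovery_code: Optional[str] = None

    def __post_init__(self) -> None:
        # Reject both "neither" and "both" so the service never has to guess
        # which credential the caller intended to use.
        provided = [v for v in (self.code, self.recovery_code) if v is not None]
        if len(provided) != 1:
            raise ValueError("provide exactly one of code or recovery_code")
```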
app/routers/mfa.py mirrors app/routers/otp.py structure:
POST /auth/mfa/phone/request
POST /auth/mfa/phone/verify
POST /auth/mfa/enable
POST /auth/mfa/disable
POST /auth/mfa/recovery-codes/regenerate
POST /auth/mfa/verify/login
POST /auth/mfa/resend/login
POST /auth/mfa/request/action
POST /auth/mfa/verify/action
CSRF + bearer extraction reuses app/core/browser_sessions.py .
Audit emissions mirror the event names in Section 5.
Delete app/routers/otp.py and app/schemas/otp.py in the same commit.
Update app/main.py router registration: replace otp.router with mfa.router.
Update app/routers/auth.py password-login handler:
Replace the email_otp_enabled branch with user.mfa_enabled branch.
If MFA enabled, call mfa_service.start_login_challenge(...) and return MfaChallengeResponse.
Otherwise, issue tokens exactly as today.
Preserve existing audit event names for non-MFA path; for the challenge path emit mfa.login.required and mfa.sms.sent.
Acceptance :
Password login with MFA returns challenge, not tokens.
Non-MFA password login response is byte-identical to the current contract.
Existing password-login integration tests pass unchanged.
Release 6 — OAuth and SAML MFA Branching
Tests first (tests/integration/test_mfa_oauth_login_real.py , tests/integration/test_mfa_saml_login_real.py ):
test_oauth_callback_without_mfa_preserves_current_contract
test_oauth_callback_with_mfa_returns_challenge_and_no_tokens
test_oauth_mfa_verify_preserves_requested_audience
test_oauth_mfa_new_user_defaults_to_mfa_disabled
Same four for SAML.
test_idp_amr_acr_no_longer_gates_session_issuance (negative: IdP-provided MFA evidence alone is insufficient)
Implementation :
Extract a helper in app/services/mfa_service.py :
async def maybe_start_provider_login_challenge(
    self, db, user, *, requested_audience
) -> LoginChallengeResult | None: ...
Returns None when MFA is disabled.
Update app/services/oauth_service.py complete_google_callback:
After resolving the user, call the helper.
If challenge is returned, return a new OAuthCallbackCompletion variant (add mfa_challenge: LoginChallengeResult | None = None field).
Skip session creation + token issuance when MFA challenge is present.
Update app/routers/oauth.py callback handler:
On MFA challenge, return MfaChallengeResponse (preserve redirect semantics: for browser-session transport, issue a short-lived non-auth CSRF cookie + return JSON; this path must NOT set access/refresh cookies).
Mirror the same change in app/services/saml_service.py and app/routers/saml.py .
Remove any code path that uses IdP amr/acr/SAML AuthnContextClassRef as a substitute for local MFA. If these exist only in audit metadata, leave them; they must not influence token/session issuance.
Acceptance :
All Release-6 integration tests green.
Cookie transport path never emits auth cookies when MFA is required.
Requested OAuth audience flows through the challenge into the verify step unchanged.
Release 7 — Sensitive Actions and Admin Hardening
Purpose : route admin/sensitive action OTP through the same MFA service; verify that admin gates read mfa_enabled from the DB, not from stale token claims.
Tests first (tests/integration/test_mfa_enrollment_real.py , tests/unit/test_admin_service.py extensions):
test_admin_gate_reads_db_not_token_claims
test_stale_access_token_with_old_mfa_claim_does_not_bypass_current_state
test_action_token_single_use_preserved
test_recovery_code_for_admin_step_up_is_audited
Implementation :
Audit every admin-sensitive handler in app/routers/admin.py and app/services/admin_service.py . Any check against claims["email_otp_enabled"] (now renamed to mfa_enabled) that is used to decide whether MFA is required must be replaced by a fresh DB read.
Keep X-Action-Token unchanged. Action tokens retain: user-bound, action-bound, short-lived, signed. Add single-use enforcement if not already present:
Store each minted action-token jti in Redis with TTL = mfa.action_token_ttl_seconds.
Reject verifies whose jti is already consumed.
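Single-use enforcement for action tokens reduces to "register the jti at mint time, consume it exactly once". The sketch below uses a dict in place of Redis; ActionTokenJtiRegistry is a hypothetical name, and the pop call stands in for an atomic delete-and-check on the Redis key.

```python
import time
from typing import Callable, Dict


class ActionTokenJtiRegistry:
    """In-memory stand-in for Redis-backed jti tracking."""

    def __init__(self, ttl_seconds: int = 300,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._live: Dict[str, float] = {}  # jti -> expiry timestamp

    def register(self, jti: str) -> None:
        """Record a freshly minted token's jti with the configured TTL."""
        self._live[jti] = self._clock() + self._ttl

    def consume(self, jti: str) -> bool:
        """Return True only on the first use of a known, unexpired jti."""
        expires_at = self._live.pop(jti, None)  # removing it prevents replay
        return expires_at is not None and self._clock() < expires_at
```

Because an unknown jti is rejected, a token whose registry entry has expired or was never written also fails, which keeps the check fail-closed.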
Recovery-code use during admin step-up (if enabled) emits mfa.recovery_code.used + admin.step_up.recovery_code audit events.
Acceptance :
Admin cannot bypass MFA by presenting an access token minted before MFA was enabled.
Action tokens cannot be replayed.
Admin route behavior otherwise unchanged.
Release 8 — Observability, Rate Limits, Documentation
Purpose : operational readiness and docs.
Tests first (tests/integration/test_mfa_rate_limits_real.py ):
test_sms_request_rate_limit_per_user_per_hour
test_sms_request_rate_limit_per_ip_per_hour
test_sms_resend_limit_per_challenge
test_recovery_code_rate_limit_per_user
test_audit_events_contain_no_otp_or_recovery_code_values
test_error_codes_are_stable_across_abuse_paths
Implementation :
Audit events emitted by the MFA router (consistent names; all through app/services/audit_service.py ):
mfa.sms.sent, mfa.login.required, mfa.login.verified, mfa.phone.verified,
mfa.enabled, mfa.disabled, mfa.recovery_code.used, mfa.recovery_codes.regenerated,
mfa.failed, mfa.expired, mfa.excessive_failures.
Hard rule : metadata payloads contain user_id, method, purpose, masked_destination — never raw code or recovery code. Enforced by a dedicated unit test that greps audit records.
Rate limiting:
Per-user/per-IP limiters use Redis counters (mfa:rl:user:{id}:{bucket}, mfa:rl:ip:{ip}:{bucket}) with hourly buckets.
Key eviction via EXPIRE on first increment.
Stable error codes on breach: rate_limited with Retry-After header.
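The hourly-bucket counter scheme above can be sketched as a fixed-window limiter. Everything here is illustrative: InMemoryCounters stands in for Redis INCR and EXPIRE, CounterBackendError is a hypothetical exception representing a Redis failure, and the function returns a retry-after value the router could place in the Retry-After header.

```python
from typing import Dict, Tuple


class CounterBackendError(Exception):
    """Hypothetical error raised when the counter backend is unavailable."""


class InMemoryCounters:
    """Stand-in for Redis INCR / EXPIRE."""

    def __init__(self) -> None:
        self.values: Dict[str, int] = {}
        self.ttls: Dict[str, int] = {}

    def incr(self, key: str) -> int:
        self.values[key] = self.values.get(key, 0) + 1
        return self.values[key]

    def expire(self, key: str, seconds: int) -> None:
        self.ttls[key] = seconds


def check_rate_limit(counters, *, scope: str, subject: str, limit: int,
                     now: float) -> Tuple[bool, int]:
    """Return (allowed, retry_after_seconds) for one request at time `now`."""
    bucket = int(now // 3600)  # hourly fixed window
    key = f"mfa:rl:{scope}:{subject}:{bucket}"
    retry_after = 3600 - int(now % 3600)  # seconds until the next bucket
    try:
        count = counters.incr(key)
        if count == 1:
            counters.expire(key, 3600)  # set eviction on first increment
    except CounterBackendError:
        return False, retry_after  # fail closed: treat as rate limited
    return count <= limit, retry_after
```

A fixed window allows short bursts across a bucket boundary; that trade-off is inherent in the hourly-bucket key format rather than something this sketch adds.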
Documentation:
Acceptance :
Rate-limit integration tests green.
Audit payload test proves no secrets leak.
Docs render cleanly in Markdown.
Final Release — Production Gate
A single commit that flips version and adds the release-gate checklist. No code changes except version bump and any last-mile doc touch-ups.
Release gate (all must pass before tag) :
pytest -q — full unit + integration suite.
alembic upgrade head on fresh database, then alembic downgrade -1 round-trip.
ruff check . + ruff format --check . + mypy app — clean.
Manual smoke (local, MailHog + Redis + Postgres up):
Password login end-to-end with SMS MFA
OAuth login end-to-end with SMS MFA
SAML login end-to-end with SMS MFA
Recovery-code login when phone is unavailable
MFA disable requires action token
Regenerate recovery codes invalidates prior set
Production config validation:
APP__ENVIRONMENT=production with MFA__SMS__PROVIDER=local → startup fails with a specific, actionable error message.
Missing MFA__PHONE_ENCRYPTION_KEY → startup fails.
Final behavior (mirrors source plan):
Password, OAuth, and SAML all use the same SDK-owned MFA flow.
Tokens/sessions are never issued before MFA completion for MFA-enabled users.
SMS is the first supported method; recovery codes are the fallback.
Local SMS provider is development-only, hard-blocked in production.
SmsSender Protocol + MfaMethod Literal are the two extension points for future providers (Twilio/SNS) and methods (TOTP/WebAuthn).
4. Non-Negotiables (Secure Defaults)
Codes and recovery codes are never logged , never returned in audit metadata, never stored plaintext.
Phone numbers are encrypted at rest (Fernet), with a separate keyed HMAC phone_lookup_hash for uniqueness/indexing.
Challenge tokens are single-use via jti-bound Redis state; deletion happens on success, attempt exhaustion, and expiry.
Recovery codes are single-use: consumption selects the user's rows with used_at IS NULL under SELECT ... FOR UPDATE, verifies the code, and sets used_at in the same transaction.
Rate limits fail closed on Redis errors (treat as "rate limited").
Production config guard rejects unsafe SMS provider at startup , with a defense-in-depth check in the factory.
All new endpoints honor the existing CSRF + cookie-transport semantics in app/core/browser_sessions.py .
5. Audit Event Catalog
| Event | Emitted by | Required metadata |
|-------|------------|-------------------|
| mfa.sms.sent | phone/request, resend, login challenge, action request | purpose, method="sms", masked_destination |
| mfa.phone.verified | verify_phone | (none) |
| mfa.enabled | enable_mfa | primary_method |
| mfa.disabled | disable_mfa | (none) |
| mfa.login.required | password/OAuth/SAML handlers | provider |
| mfa.login.verified | verify_login | provider, method_used (sms or recovery_code) |
| mfa.recovery_code.used | verify_login, admin step-up | context |
| mfa.recovery_codes.regenerated | regenerate | count |
| mfa.failed | any verify failure | purpose, reason_code |
| mfa.expired | expired challenge on verify | purpose |
| mfa.excessive_failures | lockout trigger | purpose |
6. Implementation Order (Copy-Paste Checklist)
Release 0 — config settings + production guard
Release 1 — migration + model sweep of email_otp_enabled
Release 2 — mfa/codes, mfa/challenge, recovery-code helpers
Release 3 — sms/base, sms/local, sms/factory
Release 4 — MfaService (no HTTP yet)
Release 5 — schemas/mfa, routers/mfa, password-login wiring, delete otp files
Release 6 — OAuth + SAML MFA branching
Release 7 — admin/action-token hardening
Release 8 — rate limits, audit catalog, documentation
Final — release gate, version bump
Each release ends with: all relevant tests green, full suite green, no dead code, docs updated.
SDK-Managed SMS MFA — Step-By-Step Implementation Plan
Production-grade, TDD-driven implementation of SDK-owned SMS MFA with recovery codes, applied as a full replacement for the legacy
email_otp_enabledsurface. This plan is grounded in the existing repo layout under app/ and mirrors the conventions used by the email-OTP flow in app/services/otp_service.py and app/routers/otp.py.0. Agreed Decisions (locked-in context)
sdk/package is not modified.LocalSmsSenderstores messages in a Redis list per-phone with short TTL. No HTTP debug endpoint. Tests read Redis via a fixture helper. Production config validator hard-rejectsSMS__PROVIDER=local.ILOU01), formatted asXXXXX-XXXXX.mfa.phone_encryption_key. A keyed HMACphone_lookup_hashcolumn supports uniqueness and lookup.phone_last4is stored for masked display without decryption.email_otp_enabledis fully replaced bymfa_enabled+mfa_primary_methodin a single migration (drop column, update all call sites). No dual-write period.jtiis bound to Redis challenge state. Single-use semantics enforced by deleting the Redis row on verify/expiry.mfa_enabled=false. No forced enrollment in v1.SmsSenderProtocol +LocalSmsSenderin v1. Twilio/SNS are extension points, not implemented.MfaSettingsblock. Defaults: SMS request 3/hour/user, 5/hour/IP; SMS resend 3/challenge; SMS verify 5/challenge; recovery-code 5 per 15 min/user.phone_number_e164is globally unique via unique index onphone_lookup_hash, partial ondeleted_at IS NULL.1. Target File Layout
New files in bold. Files removed are struck-through.
2. TDD Working Agreement
ruff check+ruff format+mypy(project conventions).fake_sms_sender— captures sent SMS in-memorylocal_sms_fixture— reads Redis-backed SMS listmfa_enrolled_user— factory producing a user with verified phone + MFA onrecovery_codes_fixture— returns plaintext codes + hashed counterparts3. Releases
Release 0 — Settings, Config Guard, and Baseline Contracts
Purpose: land new configuration surface and encryption key plumbing first so every subsequent release has a compiled place to anchor into. No behavior change yet.
Tests first (tests/unit/test_config_mfa_production_guard.py, tests/unit/test_config_production_hardening.py):
test_production_rejects_local_sms_providertest_production_requires_phone_encryption_keytest_production_requires_non_empty_mfa_rate_limitstest_development_defaults_allow_local_providerImplementation:
Settings:mfa: MfaSettings = MfaSettings().Settings.validate_production_constraints(see app/config.py:313):mfa.sms.provider == "local"→ raisemfa.phone_encryption_key is None→ raisemfa.phone_lookup_hash_key is None→ raisedocs/mfa-local-testing.md.Acceptance:
pytest tests/unit/test_config_mfa_production_guard.pygreen.Release 1 — Data Model + Migration (Atomic Replacement of
email_otp_enabled)Purpose: land the full schema change in one migration, with all code updated in the same commit so no intermediate state exists.
Tests first:
test_phone_roundtrip_encrypt_decrypttest_phone_lookup_hash_is_deterministictest_phone_lookup_hash_changes_with_keytest_invalid_e164_rejectedtest_mask_shows_only_last_fourtest_alembic_upgrade_and_downgrade_roundtripImplementation:
PhoneCipherdataclass withencrypt(e164: str) -> bytes,decrypt(blob: bytes) -> str. Use Fernet (already pulled in bycryptography).PhoneHasher.lookup_hash(e164: str) -> str— keyed HMAC-SHA256 of the canonicalized E.164 string.normalize_e164(raw: str) -> str— strict validator:^\+[1-9]\d{7,14}$. Reject anything else.mask_e164(e164: str) -> str— returns"+" + "*" * (len-4) + last4.reloadable_singletonfactories that source keys fromsettings.mfa.email_otp_enabled.phone_ciphertext: Mapped[bytes | None](LargeBinary)phone_last4: Mapped[str | None](String(4))phone_lookup_hash: Mapped[str | None](String(64))phone_verified: Mapped[bool](default false)phone_verified_at: Mapped[datetime | None]mfa_enabled: Mapped[bool](default false)mfa_primary_method: Mapped[str | None](String(16), CHECK in('sms',))recovery_codes: Mapped[list[UserRecoveryCode]].down_revision = "20260416_0013".userswith safe defaults.email_otp_enabledfromusers.user_recovery_codestable.CREATE UNIQUE INDEX uq_users_phone_lookup_hash ON users (phone_lookup_hash) WHERE deleted_at IS NULL AND phone_lookup_hash IS NOT NULL;downgrade()reverses cleanly (re-addsemail_otp_enabledasfalse, drops new columns and table).email_otp_enabledacross the repo:email_otp_enabled→mfa_enabled, argument name, and docstring.create_login_session._issue_token_pairandcreate_login_session._issue_token_pair.email_otp_enabled.email_otp_enabled; addmfa_enabledwhere appropriate.Acceptance:
alembic upgrade head then alembic downgrade -1 round-trips cleanly against a fresh Postgres.
grep -R "email_otp_enabled" app tests migrations returns zero matches.
Release 2 — MFA Core (Challenge Store, OTP Helpers, Recovery Codes)
Purpose: implement the provider-agnostic primitives used by every later release.
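
One of these primitives, recovery-code generation and verification, can be illustrated with the standard library alone. The sketch below follows decision 3 (10 codes of 10 characters, Crockford base32 without I, L, O, U, 0, 1, formatted XXXXX-XXXXX) and hashes with a keyed HMAC. Two things are assumptions rather than part of the plan: the key is passed explicitly instead of being read from settings, and user input is canonicalized by dropping the hyphen and upper-casing before hashing.

```python
import hashlib
import hmac
import re
import secrets

# Crockford base32 with I, L, O, U and additionally 0 and 1 removed (decision 3).
ALPHABET = "23456789ABCDEFGHJKMNPQRSTVWXYZ"
CODE_RE = re.compile("[%s]{5}-[%s]{5}" % (ALPHABET, ALPHABET))


def generate_recovery_codes(count: int = 10, length: int = 10) -> list[str]:
    """Return `count` distinct codes of `length` symbols, hyphenated in the middle."""
    half = length // 2
    codes: list[str] = []
    while len(codes) < count:
        raw = "".join(secrets.choice(ALPHABET) for _ in range(length))
        code = f"{raw[:half]}-{raw[half:]}"
        if code not in codes:  # collisions are unlikely, but uniqueness is required
            codes.append(code)
    return codes


def _canonical(code: str) -> str:
    # Assumption: input is matched case-insensitively and without the hyphen.
    return code.strip().replace("-", "").upper()


def hash_recovery_code(code: str, key: bytes) -> str:
    """Keyed HMAC-SHA256 over the canonical code, hex encoded."""
    return hmac.new(key, _canonical(code).encode("utf-8"), hashlib.sha256).hexdigest()


def verify_recovery_code(raw: str, stored_hash: str, key: bytes) -> bool:
    """Compare in constant time so timing does not reveal how much of the hash matched."""
    return hmac.compare_digest(hash_recovery_code(raw, key), stored_hash)
```

Only the hashes would be persisted; the plaintext list is what enable_mfa returns to the caller once.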
Tests first (tests/unit/test_mfa_service.py, tests/unit/test_mfa_recovery_codes.py):
test_challenge_store_roundtrip_hashes_code
test_challenge_is_single_use_after_verify
test_challenge_expiry_removes_redis_state
test_attempt_counter_caps_at_max_and_deletes_challenge
test_challenge_jti_bound_to_redis_row
test_recovery_code_generator_produces_unique_formatted_codes
test_recovery_code_hash_uses_keyed_hmac
test_recovery_code_verify_is_constant_time
test_recovery_code_regeneration_invalidates_previous
Implementation:
generate_sms_otp(length: int) -> str: reuses secrets.randbelow, zero-padded.
hash_sms_otp, verify_sms_otp: delegate to the existing app/core/otp.py HMAC machinery (keep core/otp.py intact as a low-level primitive, just rename the module's public surface to be method-neutral). Update app/core/otp.py docstring accordingly.
generate_recovery_codes(count, length) -> list[str]: Crockford base32, XXXXX-XXXXX formatting.
hash_recovery_code(code: str) -> str: keyed HMAC (same key as SMS OTP).
verify_recovery_code(raw, stored_hash) -> bool.
MfaChallengePurpose = Literal["login", "action", "phone_verify"]
MfaMethod = Literal["sms"] (extensible)
ChallengeState dataclass: user_id, purpose, method, code_hash, attempt_count, audience, created_at, jti.
MfaChallengeStore (Redis-backed):
  store(user_id, purpose, method, *, jti, code, audience, ttl) -> None
  load(user_id, purpose) -> ChallengeState | None
  increment_attempts(user_id, purpose) -> int
  delete(user_id, purpose) -> None
  assert_jti_matches(state, claimed_jti) -> None
mfa:challenge:{purpose}:{user_id}.
OTPServiceError-shaped contract (reuse its pattern but rename the class to MfaServiceError).
_persist_recovery_codes(db_session, user_id, hashes): bulk insert inside an existing outer transaction.
_consume_recovery_code(db_session, user_id, raw_code) -> bool: SELECT ... FOR UPDATE over unused codes, verify, mark used_at.
_invalidate_recovery_codes(db_session, user_id): bulk set used_at=now() where used_at IS NULL.
Acceptance:
Release 3 — SMS Provider Abstraction
Purpose: concrete SmsSender and dev-only LocalSmsSender, production guard enforced by factory.
Tests first (tests/unit/test_sms_local_sender.py):
test_local_sender_persists_message_with_ttl
test_local_sender_masks_code_when_requested_for_logging
test_local_sender_retains_most_recent_n_messages_per_phone
test_factory_raises_when_provider_local_in_production
test_factory_returns_local_sender_in_development
test_factory_raises_notimplemented_for_twilio_or_sns_in_v1
Implementation:
LocalSmsSender writes to Redis list sms:local:{phone} with LPUSH + LTRIM 0 9 + EXPIRE.
{sent_at, purpose, code, expires_in}.
{"to_last4": ..., "purpose": ..., "expires_in": ...}.
get_sms_sender() @reloadable_singleton:
  settings.app.environment == "production" and settings.mfa.sms.provider == "local" → raise (defense in depth; config guard already catches this at startup).
  provider == "local" → return LocalSmsSender.
  NotImplementedError("SMS provider 'twilio' not implemented in v1").
async def pop_local_sms(redis, phone_e164) -> dict.
Acceptance:
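
The sender abstraction and factory guard in this release's implementation items could be sketched roughly as follows. This is a dependency-free approximation, not the planned implementation: a bounded deque stands in for the Redis list (appendleft with maxlen=10 mimics LPUSH followed by LTRIM 0 9), the TTL is not modeled, the code is synchronous, and the class name InMemoryLocalSmsSender, the send signature, and the factory arguments are all hypothetical.

```python
import time
from collections import deque
from typing import Protocol


class SmsSender(Protocol):
    """Structural interface that any provider (local, Twilio, SNS) would satisfy."""

    def send(self, to_e164: str, code: str, purpose: str, expires_in: int) -> None: ...


class InMemoryLocalSmsSender:
    """Stand-in for LocalSmsSender: a bounded deque replaces the Redis list."""

    def __init__(self, keep: int = 10) -> None:
        self._keep = keep
        self._inbox: dict[str, deque] = {}

    def send(self, to_e164: str, code: str, purpose: str, expires_in: int) -> None:
        box = self._inbox.setdefault(f"sms:local:{to_e164}", deque(maxlen=self._keep))
        # appendleft on a full deque drops the oldest entry from the right end.
        box.appendleft(
            {"sent_at": time.time(), "purpose": purpose, "code": code, "expires_in": expires_in}
        )

    def pop_latest(self, to_e164: str) -> dict:
        """Return and remove the most recent message for a phone (test helper)."""
        return self._inbox[f"sms:local:{to_e164}"].popleft()


def get_sms_sender(environment: str, provider: str) -> SmsSender:
    """Factory guard: the local provider must never be handed out in production."""
    if environment == "production" and provider == "local":
        raise RuntimeError("SMS provider 'local' is not allowed in production")
    if provider == "local":
        return InMemoryLocalSmsSender()
    raise NotImplementedError(f"SMS provider '{provider}' not implemented in v1")
```

The production check comes first so that a misconfigured deployment fails even if the startup validator was bypassed, which is the defense-in-depth point made above.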
Release 4 — MFAService Core API
Purpose: translate the old OTP service into a method-aware service. No router wiring yet.
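
The central rule this service has to enforce is that a challenge is bound to one jti and can be verified at most once. A rough sketch of that verify path, with a dict standing in for Redis, is shown here. Several details are assumptions made for illustration: the ChallengeState fields are a subset of the planned dataclass plus a hypothetical expires_at, a missing row is reported as challenge_expired while a jti mismatch is reported as challenge_reused, and the helper names hash_code and verify_challenge do not come from the plan. A real Redis-backed version would also need the check-and-delete step to be atomic.

```python
import hashlib
import hmac
import time
from dataclasses import dataclass


class MfaServiceError(Exception):
    def __init__(self, code: str) -> None:
        super().__init__(code)
        self.code = code  # stable, machine-readable error code


@dataclass
class ChallengeState:
    user_id: str
    purpose: str
    code_hash: str
    jti: str
    expires_at: float  # hypothetical field; Redis would use a key TTL instead
    attempt_count: int = 0


def hash_code(code: str, key: bytes) -> str:
    return hmac.new(key, code.encode("utf-8"), hashlib.sha256).hexdigest()


class InMemoryChallengeStore:
    """Dict-backed stand-in for the Redis-backed MfaChallengeStore."""

    def __init__(self) -> None:
        self._rows: dict[str, ChallengeState] = {}

    @staticmethod
    def _key(user_id: str, purpose: str) -> str:
        return f"mfa:challenge:{purpose}:{user_id}"

    def store(self, state: ChallengeState) -> None:
        self._rows[self._key(state.user_id, state.purpose)] = state

    def load(self, user_id: str, purpose: str, now: float):
        state = self._rows.get(self._key(user_id, purpose))
        if state is not None and state.expires_at <= now:
            self.delete(user_id, purpose)  # expiry removes the row
            return None
        return state

    def delete(self, user_id: str, purpose: str) -> None:
        self._rows.pop(self._key(user_id, purpose), None)


def verify_challenge(store, key, *, user_id, purpose, claimed_jti, code,
                     max_attempts=5, now=None) -> None:
    """Return None on success; raise MfaServiceError with a stable code otherwise."""
    now = time.time() if now is None else now
    state = store.load(user_id, purpose, now)
    if state is None:
        raise MfaServiceError("challenge_expired")
    if not hmac.compare_digest(state.jti, claimed_jti):
        raise MfaServiceError("challenge_reused")  # token is not the one bound to this row
    if hmac.compare_digest(state.code_hash, hash_code(code, key)):
        store.delete(user_id, purpose)  # single use: the row is gone after success
        return
    state.attempt_count += 1
    if state.attempt_count >= max_attempts:
        store.delete(user_id, purpose)  # attempts exhausted: force a fresh challenge
    raise MfaServiceError("invalid_code")
```

Deleting the row on success, on attempt exhaustion, and on expiry is what makes a replayed challenge token useless, independent of the JWT's own expiry claim.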
Tests first (tests/unit/test_mfa_service.py, extended):
test_request_phone_verification_rejects_invalid_e164
test_request_phone_verification_rejects_already_claimed_phone
test_request_phone_verification_sends_sms_and_stores_challenge
test_verify_phone_marks_verified_and_stores_encrypted
test_verify_phone_fails_on_wrong_code_and_increments_attempts
test_enable_mfa_requires_verified_phone
test_enable_mfa_sets_primary_method_and_returns_recovery_codes_once
test_disable_mfa_requires_step_up_and_clears_challenges
test_start_login_challenge_issues_jwt_bound_to_redis_jti
test_verify_login_with_sms_code_returns_tokens_and_deletes_challenge
test_verify_login_with_recovery_code_succeeds_and_marks_code_used
test_verify_login_rejects_reused_challenge_token
test_verify_login_rejects_reused_recovery_code
test_challenge_preserves_requested_audience_for_login
Implementation (app/services/mfa_service.py):
request_phone_verification(db, user_id, raw_phone) -> PhoneVerificationRequestResult
verify_phone(db, user_id, code) -> None
enable_mfa(db, user_id, action_token) -> EnableMfaResult (returns plaintext recovery codes; this is the only code path that ever returns them)
disable_mfa(db, user_id, action_token) -> None (requires valid action token; clears challenge + recovery codes marked used)
regenerate_recovery_codes(db, user_id, action_token) -> list[str]
start_login_challenge(db, user, *, requested_audience) -> LoginChallengeResult
verify_login(db, challenge_token, *, code=None, recovery_code=None, client_ip, user_agent) -> LoginVerificationResult
resend_login_code(db, challenge_token) -> None
request_action_code(db, user_id, action) -> ActionRequestResult
verify_action_code(db, user_id, code, action, *, audience) -> ActionVerificationResult
record_failed_otp_attempt on SMS and recovery-code failures identically.
jti per challenge.
jti in the Redis challenge row.
assert_jti_matches(state, claims["jti"]); delete Redis row atomically on success/attempt-exhausted.
mfa.rate_limits.sms_request_per_hour_per_user on phone-verify issuance and login challenges (combined with existing _ensure_issuance_not_blocked).
client_ip parameter to request_phone_verification and start_login_challenge.
MfaServiceError with stable code strings: phone_invalid, phone_unavailable, phone_not_verified, mfa_not_enabled, mfa_already_enabled, challenge_expired, invalid_code, invalid_recovery_code, challenge_reused, rate_limited, action_token_invalid, session_backend_unavailable.
Acceptance:
Release 5 — New MFA Router, Password Login MFA Path
Purpose: expose the new MFA endpoints and wire password login.
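
Two of the schemas this release introduces carry rules that are easy to get subtly wrong: the login-verify request must contain exactly one of code or recovery_code, and the challenge response must always report mfa_required as true. The plan calls for Pydantic models with a model_validator; the sketch below uses plain dataclasses as a dependency-free stand-in to show the same check. The challenge_token field on the request is an assumption about how the token is transported.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class VerifyLoginMfaRequest:
    challenge_token: str  # assumption: the token travels in the request body
    code: Optional[str] = None
    recovery_code: Optional[str] = None

    def __post_init__(self) -> None:
        # Exactly one credential: both missing and both present are rejected.
        if (self.code is None) == (self.recovery_code is None):
            raise ValueError("provide exactly one of code or recovery_code")


@dataclass(frozen=True)
class MfaChallengeResponse:
    challenge_token: str
    method: str
    masked_destination: str
    expires_in: int
    mfa_required: bool = True  # clients branch on this flag instead of expecting tokens


def is_recovery_attempt(request: VerifyLoginMfaRequest) -> bool:
    """Hypothetical helper: tells the service which verification branch to take."""
    return request.recovery_code is not None
```

Comparing the two "is None" results is a compact exclusive-or: the values are equal only when both fields are absent or both are present, which are exactly the two invalid cases.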
Tests first:
test_request_phone_requires_authentication
test_request_phone_accepts_valid_e164_and_returns_expires_in
test_enable_requires_step_up_or_reauth
test_enable_returns_recovery_codes_once_and_never_again
test_disable_requires_action_token
test_regenerate_recovery_codes_invalidates_previous
test_password_login_without_mfa_returns_tokens_unchanged
test_password_login_with_mfa_returns_challenge_not_tokens
test_password_login_with_mfa_verify_issues_tokens_and_creates_session
test_password_login_with_mfa_verify_accepts_recovery_code
test_password_login_with_mfa_resend_respects_limit
test_challenge_token_is_single_use
test_csrf_enforced_for_cookie_transport
Implementation:
MfaChallengeResponse (note generic name; mfa_required: True, challenge_token, method, masked_destination, expires_in).
VerifyLoginMfaRequest (one of code or recovery_code; Pydantic model_validator enforces exactly one).
PhoneEnrollmentRequest, PhoneVerifyRequest, EnableMfaResponse (includes recovery_codes: list[str]), RegenerateRecoveryCodesResponse.
RequestActionMfaRequest, VerifyActionMfaRequest, VerifyActionMfaResponse, MfaEnrollmentResponse.
POST /auth/mfa/phone/request
POST /auth/mfa/phone/verify
POST /auth/mfa/enable
POST /auth/mfa/disable
POST /auth/mfa/recovery-codes/regenerate
POST /auth/mfa/verify/login
POST /auth/mfa/resend/login
POST /auth/mfa/request/action
POST /auth/mfa/verify/action
otp.router with mfa.router.
email_otp_enabled branch with user.mfa_enabled branch.
mfa_service.start_login_challenge(...) and return MfaChallengeResponse.
mfa.login.required and mfa.sms.sent.
Acceptance:
Release 6 — OAuth and SAML MFA Branching
Tests first (tests/integration/test_mfa_oauth_login_real.py, tests/integration/test_mfa_saml_login_real.py):
test_oauth_callback_without_mfa_preserves_current_contract
test_oauth_callback_with_mfa_returns_challenge_and_no_tokens
test_oauth_mfa_verify_preserves_requested_audience
test_oauth_mfa_new_user_defaults_to_mfa_disabled
test_idp_amr_acr_no_longer_gates_session_issuance (negative: IdP-provided MFA evidence alone is insufficient)
Implementation:
None when MFA is disabled.
complete_google_callback:
  OAuthCallbackCompletion variant (add mfa_challenge: LoginChallengeResult | None = None field).
  MfaChallengeResponse (preserve redirect semantics: for browser-session transport, issue a short-lived non-auth CSRF cookie + return JSON; this path must NOT set access/refresh cookies).
amr/acr/SAML AuthnContextClassRef as a substitute for local MFA. If these exist only in audit metadata, leave them; they must not influence token/session issuance.
Acceptance:
Release 7 — Sensitive Actions and Admin Hardening
Purpose: route admin/sensitive action OTP through the same MFA service; verify that admin gates read mfa_enabled from the DB, not from stale token claims.
Tests first (tests/integration/test_mfa_enrollment_real.py, tests/unit/test_admin_service.py extensions):
test_admin_gate_reads_db_not_token_claims
test_stale_access_token_with_old_mfa_claim_does_not_bypass_current_state
test_action_token_single_use_preserved
test_recovery_code_for_admin_step_up_is_audited
Implementation:
claims["email_otp_enabled"] (now renamed to mfa_enabled) that is used to decide whether MFA is required must be replaced by a fresh DB read.
X-Action-Token unchanged. Action tokens retain: user-bound, action-bound, short-lived, signed. Add single-use enforcement if not already present:
  jti in Redis with TTL = mfa.action_token_ttl_seconds.
  jti is already consumed.
mfa.recovery_code.used + admin.step_up.recovery_code audit events.
Acceptance:
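
The single-use enforcement for action tokens described above amounts to recording each jti on first use and rejecting any repeat until the record expires. In Redis this maps naturally onto SET with the NX and EX options; the sketch below imitates that with a dict so it runs without a server. The key prefix, the claim names sub and action, and the function names are hypothetical, and signature and expiry validation of the token are assumed to have happened before this step.

```python
import time
from typing import Optional


class InMemoryJtiRegistry:
    """Stand-in for Redis `SET key value NX EX ttl`: the first writer wins until expiry."""

    def __init__(self) -> None:
        self._consumed: dict[str, float] = {}  # key -> expiry timestamp

    def consume(self, jti: str, ttl_seconds: int, now: Optional[float] = None) -> bool:
        """Return True the first time a jti is seen, False for any replay within the TTL."""
        now = time.time() if now is None else now
        key = f"mfa:action:jti:{jti}"  # hypothetical key name
        expires_at = self._consumed.get(key)
        if expires_at is not None and expires_at > now:
            return False
        self._consumed[key] = now + ttl_seconds
        return True


def require_fresh_action_token(registry, claims: dict, *, user_id: str, action: str,
                               ttl_seconds: int, now: Optional[float] = None) -> None:
    """Reject tokens bound to another user or action, and tokens already consumed."""
    if claims.get("sub") != user_id or claims.get("action") != action:
        raise PermissionError("action_token_invalid")
    if not registry.consume(claims["jti"], ttl_seconds, now):
        raise PermissionError("action_token_invalid")
```

Setting the record's TTL to mfa.action_token_ttl_seconds means the registry never has to remember a jti for longer than the token itself could still be valid.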
Release 8 — Observability, Rate Limits, Documentation
Purpose: operational readiness and docs.
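
The rate limits in this release follow a fixed-window pattern: one counter per user and per IP for each hourly bucket, an expiry set on the first increment, and a rate_limited error carrying a Retry-After value. The following is a minimal sketch of that pattern with an in-memory counter standing in for Redis INCR and EXPIRE. The class and function names are hypothetical, the defaults of 3 per user and 5 per IP come from decision 9, and computing Retry-After as the time remaining in the current bucket is an assumption.

```python
import time

WINDOW_SECONDS = 3600  # hourly buckets, as in the plan


class RateLimited(Exception):
    def __init__(self, retry_after: int) -> None:
        super().__init__("rate_limited")
        self.code = "rate_limited"
        self.retry_after = retry_after  # seconds, suitable for a Retry-After header


class InMemoryCounters:
    """Stand-in for Redis INCR + EXPIRE; expired keys are reset lazily on access."""

    def __init__(self) -> None:
        self._data: dict[str, tuple[int, float]] = {}  # key -> (count, expires_at)

    def incr(self, key: str, ttl: int, now: float) -> int:
        count, expires_at = self._data.get(key, (0, 0.0))
        if count == 0 or expires_at <= now:
            count, expires_at = 0, now + ttl  # expiry is set on the first increment only
        count += 1
        self._data[key] = (count, expires_at)
        return count


def check_sms_request(counters, *, user_id: str, ip: str,
                      user_limit: int = 3, ip_limit: int = 5, now=None) -> None:
    """Count one SMS request against both limits; raise RateLimited when either is exceeded."""
    now = time.time() if now is None else now
    bucket = int(now // WINDOW_SECONDS)
    # Time left in the current bucket, never reported as zero.
    retry_after = int((bucket + 1) * WINDOW_SECONDS - now) or 1
    limits = (
        (f"mfa:rl:user:{user_id}:{bucket}", user_limit),
        (f"mfa:rl:ip:{ip}:{bucket}", ip_limit),
    )
    for key, limit in limits:
        if counters.incr(key, WINDOW_SECONDS, now) > limit:
            raise RateLimited(retry_after)
```

Because the bucket number is part of the key, a new hour starts from a fresh counter without any explicit reset, and the expiry only exists to garbage-collect old buckets.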
Tests first (tests/integration/test_mfa_rate_limits_real.py):
test_sms_request_rate_limit_per_user_per_hour
test_sms_request_rate_limit_per_ip_per_hour
test_sms_resend_limit_per_challenge
test_recovery_code_rate_limit_per_user
test_audit_events_contain_no_otp_or_recovery_code_values
test_error_codes_are_stable_across_abuse_paths
Implementation:
mfa.sms.sent, mfa.login.required, mfa.login.verified, mfa.phone.verified, mfa.enabled, mfa.disabled, mfa.recovery_code.used, mfa.recovery_codes.regenerated, mfa.failed, mfa.expired, mfa.excessive_failures.
user_id, method, purpose, masked_destination: never raw code or recovery code. Enforced by a dedicated unit test that greps audit records.
mfa:rl:user:{id}:{bucket}, mfa:rl:ip:{ip}:{bucket} with hourly buckets.
EXPIRE on first increment.
rate_limited with Retry-After header.
MFA__* and SMS__* env vars.
Acceptance:
Final Release — Production Gate
A single commit that flips version and adds the release-gate checklist. No code changes except version bump and any last-mile doc touch-ups.
Release gate (all must pass before tag):
pytest -q: full unit + integration suite.
alembic upgrade head on fresh database, then alembic downgrade -1 round-trip.
ruff check . + ruff format --check . + mypy app: clean.
APP__ENVIRONMENT=production with MFA__SMS__PROVIDER=local → startup fails with a specific, actionable error message.
MFA__PHONE_ENCRYPTION_KEY → startup fails.
Final behavior (mirrors source plan):
SmsSender Protocol + MfaMethod Literal are the two extension points for future providers (Twilio/SNS) and methods (TOTP/WebAuthn).
4. Non-Negotiables (Secure Defaults)
phone_lookup_hash for uniqueness/indexing.
jti-bound Redis state; deletion happens on success, attempt exhaustion, and expiry.
used_at IS NOT NULL check inside a SELECT ... FOR UPDATE row.
5. Audit Event Catalog
mfa.sms.sent: purpose, method="sms", masked_destination
mfa.phone.verified
mfa.enabled: primary_method
mfa.disabled
mfa.login.required: provider
mfa.login.verified: provider, method_used (sms|recovery_code)
mfa.recovery_code.used: context
mfa.recovery_codes.regenerated: count
mfa.failed: purpose, reason_code
mfa.expired: purpose
mfa.excessive_failures: purpose
6. Implementation Order (Copy-Paste Checklist)
email_otp_enabled
Each release ends with: all relevant tests green, full suite green, no dead code, docs updated.