Skip to content

Commit 2a2eb0f

Browse files
pavanputhraclaude
andcommitted
Port main branch updates to refactored structure
Backport 13 commits from main into the refactored api/common/conserver layout, adapting all file paths and imports from the old server/ structure. Changes ported: - diet link: S3 storage option for dialog bodies with presigned URLs, log redaction for sensitive options (aws_secret_access_key, etc.) - wtf_transcribe link: new link for vfun GPU transcription server, produces WTF-format analysis entries - webhook storage: new post-chain storage backend (parallel to existing conserver/links/webhook chain link) - api: /stats/queue public endpoint for Redis list depth monitoring - api: remove default VCON_REDIS_EXPIRY TTL on ingest; retention now controlled by storage backends - api: index_vcon_parties() extracts party indexing from index_vcon() to avoid redundant Redis reads on ingest path - tests: updated to reflect no-default-TTL and index_vcon_parties changes Also fixes: - pytest.ini pythonpath updated from old 'server' to 'common conserver api' - pyproject.toml dev group: add httpx (required by fastapi TestClient) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent a308ac7 commit 2a2eb0f

11 files changed

Lines changed: 1053 additions & 193 deletions

File tree

api/api.py

Lines changed: 49 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ async def version_endpoint() -> JSONResponse:
250250
)
251251
async def health_check() -> JSONResponse:
252252
"""Health check endpoint.
253-
253+
254254
Returns:
255255
JSONResponse with status and version info
256256
"""
@@ -260,6 +260,24 @@ async def health_check() -> JSONResponse:
260260
})
261261

262262

263+
@app.get(
264+
"/stats/queue",
265+
summary="Get queue depth",
266+
description="Returns the number of items in a Redis list (queue)",
267+
tags=["system"],
268+
)
269+
async def get_queue_depth(
270+
list_name: str = Query(..., description="Name of the Redis list to measure")
271+
) -> JSONResponse:
272+
"""Get the current depth of a Redis list. Public endpoint (no auth) for monitoring and backpressure."""
273+
try:
274+
depth = await redis_async.llen(list_name)
275+
return JSONResponse(content={"list_name": list_name, "depth": depth})
276+
except Exception as e:
277+
logger.error(f"Error getting queue depth for '{list_name}': {str(e)}")
278+
raise HTTPException(status_code=500, detail="Failed to get queue depth")
279+
280+
263281
class Vcon(BaseModel):
264282
"""Pydantic model representing a vCon (Voice Conversation) record.
265283
@@ -631,11 +649,6 @@ async def post_vcon(
631649
Stores the vCon in Redis and indexes it for searching. The vCon is added to a sorted
632650
set for timestamp-based retrieval and indexed by party information for searching.
633651
Optionally adds the vCon UUID to specified ingress lists for immediate processing.
634-
635-
The vCon is stored with a default TTL of VCON_REDIS_EXPIRY seconds (default 3600s/1 hour).
636-
This means vCons will automatically expire from Redis cache unless persisted to a
637-
storage backend or the expiry is updated. Configure VCON_REDIS_EXPIRY environment
638-
variable to change the default expiry time.
639652
640653
Args:
641654
inbound_vcon: The vCon to store
@@ -659,16 +672,12 @@ async def post_vcon(
659672

660673
logger.debug(f"Storing vCon {inbound_vcon.uuid} ({len(dict_vcon)} bytes)")
661674
await redis_async.json().set(key, "$", dict_vcon)
662-
663-
# Set default expiry on newly created vCons
664-
await redis_async.expire(key, VCON_REDIS_EXPIRY)
665-
logger.debug(f"Set TTL of {VCON_REDIS_EXPIRY}s on vCon {inbound_vcon.uuid}")
666-
675+
667676
logger.debug(f"Adding vCon {inbound_vcon.uuid} to sorted set")
668677
await add_vcon_to_set(key, timestamp)
669678

670679
logger.debug(f"Indexing vCon {inbound_vcon.uuid}")
671-
await index_vcon(inbound_vcon.uuid)
680+
await index_vcon_parties(str(inbound_vcon.uuid), dict_vcon["parties"])
672681

673682
# Add to ingress lists if specified
674683
if ingress_lists:
@@ -720,9 +729,7 @@ async def external_ingress_vcon(
720729
- Multiple API keys can be configured for the same ingress list
721730
722731
The submitted vCon is stored, indexed, and automatically queued for processing
723-
in the specified ingress list. The vCon is stored with a default TTL of
724-
VCON_REDIS_EXPIRY seconds (default 3600s/1 hour), after which it will expire
725-
from Redis cache unless persisted to a storage backend.
732+
in the specified ingress list.
726733
727734
Args:
728735
request: FastAPI Request object for accessing headers
@@ -760,16 +767,12 @@ async def external_ingress_vcon(
760767
f"Storing vCon {inbound_vcon.uuid} ({len(dict_vcon)} bytes) via external ingress"
761768
)
762769
await redis_async.json().set(key, "$", dict_vcon)
763-
764-
# Set default expiry on newly created vCons
765-
await redis_async.expire(key, VCON_REDIS_EXPIRY)
766-
logger.debug(f"Set TTL of {VCON_REDIS_EXPIRY}s on vCon {inbound_vcon.uuid}")
767770

768771
logger.debug(f"Adding vCon {inbound_vcon.uuid} to sorted set")
769772
await add_vcon_to_set(key, timestamp)
770773

771774
logger.debug(f"Indexing vCon {inbound_vcon.uuid}")
772-
await index_vcon(inbound_vcon.uuid)
775+
await index_vcon_parties(str(inbound_vcon.uuid), dict_vcon["parties"])
773776

774777
# Always add to the specified ingress list (required for this endpoint)
775778
vcon_uuid_str = str(inbound_vcon.uuid)
@@ -1057,25 +1060,17 @@ async def get_dlq_vcons(
10571060
raise HTTPException(status_code=500, detail="Failed to read DLQ")
10581061

10591062

1060-
async def index_vcon(uuid: UUID) -> None:
1061-
"""Index a vCon for searching.
1063+
async def index_vcon_parties(vcon_uuid: str, parties: list) -> None:
1064+
"""Index a vCon's parties for searching.
10621065
1063-
Adds the vCon to the sorted set and indexes it by party information
1064-
(tel, mailto, name) for searching. All indexed keys will expire after
1065-
VCON_INDEX_EXPIRY seconds.
1066+
Indexes by party information (tel, mailto, name). All indexed keys
1067+
will expire after VCON_INDEX_EXPIRY seconds.
10661068
10671069
Args:
1068-
uuid: UUID of the vCon to index
1070+
vcon_uuid: UUID string of the vCon
1071+
parties: List of party dicts from the vCon
10691072
"""
1070-
key = f"vcon:{uuid}"
1071-
vcon = await redis_async.json().get(key)
1072-
created_at = datetime.fromisoformat(vcon["created_at"])
1073-
timestamp = int(created_at.timestamp())
1074-
vcon_uuid = vcon["uuid"]
1075-
await add_vcon_to_set(key, timestamp)
1076-
1077-
# Index by party information with expiration
1078-
for party in vcon["parties"]:
1073+
for party in parties:
10791074
if party.get("tel"):
10801075
tel_key = f"tel:{party['tel']}"
10811076
await redis_async.sadd(tel_key, vcon_uuid)
@@ -1090,6 +1085,25 @@ async def index_vcon(uuid: UUID) -> None:
10901085
await redis_async.expire(name_key, VCON_INDEX_EXPIRY)
10911086

10921087

1088+
async def index_vcon(uuid: UUID) -> None:
1089+
"""Index a vCon for searching (reads from Redis).
1090+
1091+
Reads the vCon from Redis, adds it to the sorted set, and indexes
1092+
by party information. Used for bulk re-indexing. For the ingest path,
1093+
use index_vcon_parties() directly to avoid redundant Redis reads.
1094+
1095+
Args:
1096+
uuid: UUID of the vCon to index
1097+
"""
1098+
key = f"vcon:{uuid}"
1099+
vcon = await redis_async.json().get(key)
1100+
created_at = datetime.fromisoformat(vcon["created_at"])
1101+
timestamp = int(created_at.timestamp())
1102+
vcon_uuid = vcon["uuid"]
1103+
await add_vcon_to_set(key, timestamp)
1104+
await index_vcon_parties(vcon_uuid, vcon["parties"])
1105+
1106+
10931107
@api_router.get(
10941108
"/index_vcons",
10951109
status_code=200,

common/storage/webhook/__init__.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import time
2+
3+
from lib.vcon_redis import VconRedis
4+
from lib.logging_utils import init_logger
5+
from lib.metrics import record_histogram
6+
7+
import requests
8+
9+
logger = init_logger(__name__)
10+
11+
default_options = {
12+
"webhook-urls": [],
13+
"headers": {},
14+
}
15+
16+
17+
def save(vcon_uuid, opts=default_options):
18+
vcon_redis = VconRedis()
19+
vCon = vcon_redis.get_vcon(vcon_uuid)
20+
21+
json_dict = vCon.to_dict()
22+
23+
if json_dict.get("vcon") == "0.0.1" or "vcon" not in json_dict:
24+
json_dict["vcon"] = "0.3.0"
25+
26+
headers = opts.get("headers", {})
27+
28+
webhook_urls = opts.get("webhook-urls", [])
29+
if not webhook_urls:
30+
logger.warning(
31+
f"webhook storage: no webhook-urls configured for vcon {vcon_uuid}, skipping"
32+
)
33+
return
34+
35+
for url in webhook_urls:
36+
logger.info(
37+
f"webhook storage: posting vcon {vcon_uuid} to webhook url: {url}"
38+
)
39+
webhook_start = time.time()
40+
resp = requests.post(url, json=json_dict, headers=headers)
41+
webhook_duration = round(time.time() - webhook_start, 3)
42+
logger.info(
43+
f"webhook storage response for {vcon_uuid}: {resp.status_code} {resp.text}"
44+
)
45+
record_histogram("conserver.webhook.duration", webhook_duration,
46+
attributes={"status_code": str(resp.status_code)})

common/tests/test_external_ingress.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,20 +39,21 @@ def test_validate_ingress_api_key_function(self):
3939

4040
@patch("config.Configuration.get_ingress_auth")
4141
@patch("api.add_vcon_to_set")
42-
@patch("api.index_vcon")
42+
@patch("api.index_vcon_parties")
4343
def test_successful_submission_single_api_key(
44-
self, mock_index_vcon, mock_add_vcon_to_set, mock_get_ingress_auth
44+
self, mock_index_vcon_parties, mock_add_vcon_to_set, mock_get_ingress_auth
4545
):
4646
"""Test successful vCon submission with single API key configuration."""
4747
# Configure mocks
4848
mock_get_ingress_auth.return_value = {self.ingress_list: self.valid_api_key}
4949

50-
# Mock Redis client properly
50+
# Mock Redis client properly (sadd/expire used by index_vcon_parties)
5151
mock_redis = MagicMock()
5252
mock_json = MagicMock()
5353
mock_json.set = AsyncMock()
5454
mock_redis.json.return_value = mock_json
5555
mock_redis.expire = AsyncMock()
56+
mock_redis.sadd = AsyncMock()
5657
mock_redis.rpush = AsyncMock()
5758

5859
# Set the global redis_async directly in the api module
@@ -77,35 +78,34 @@ def test_successful_submission_single_api_key(
7778

7879
# Verify Redis operations were called
7980
mock_json.set.assert_called_once()
80-
mock_redis.expire.assert_called_once() # Verify expiry was set
8181
mock_redis.rpush.assert_called_once_with(
8282
self.ingress_list, self.test_vcon["uuid"]
8383
)
8484
mock_add_vcon_to_set.assert_called_once()
85-
mock_index_vcon.assert_called_once()
8685

8786
finally:
8887
# Clean up the global variable
8988
api.redis_async = None
9089

9190
@patch("config.Configuration.get_ingress_auth")
9291
@patch("api.add_vcon_to_set")
93-
@patch("api.index_vcon")
92+
@patch("api.index_vcon_parties")
9493
def test_successful_submission_multiple_api_keys(
95-
self, mock_index_vcon, mock_add_vcon_to_set, mock_get_ingress_auth
94+
self, mock_index_vcon_parties, mock_add_vcon_to_set, mock_get_ingress_auth
9695
):
9796
"""Test successful vCon submission with multiple API keys for same ingress."""
9897
# Configure mocks - multiple API keys for same ingress list
9998
mock_get_ingress_auth.return_value = {
10099
self.ingress_list: ["partner-1-key", self.valid_api_key, "partner-3-key"]
101100
}
102101

103-
# Mock Redis client properly
102+
# Mock Redis client properly (sadd/expire used by index_vcon_parties)
104103
mock_redis = MagicMock()
105104
mock_json = MagicMock()
106105
mock_json.set = AsyncMock()
107106
mock_redis.json.return_value = mock_json
108107
mock_redis.expire = AsyncMock()
108+
mock_redis.sadd = AsyncMock()
109109
mock_redis.rpush = AsyncMock()
110110

111111
# Set the global redis_async directly in the api module
@@ -233,6 +233,7 @@ def test_redis_failure_handling(self, mock_get_ingress_auth):
233233
mock_json.set = AsyncMock(side_effect=Exception("Redis connection failed"))
234234
mock_redis.json.return_value = mock_json
235235
mock_redis.expire = AsyncMock()
236+
mock_redis.sadd = AsyncMock()
236237
mock_redis.rpush = AsyncMock()
237238

238239
# Set the global redis_async directly in the api module
@@ -254,9 +255,9 @@ def test_redis_failure_handling(self, mock_get_ingress_auth):
254255

255256
@patch("config.Configuration.get_ingress_auth")
256257
@patch("api.add_vcon_to_set")
257-
@patch("api.index_vcon")
258+
@patch("api.index_vcon_parties")
258259
def test_multiple_ingress_lists_isolation(
259-
self, mock_index_vcon, mock_add_vcon_to_set, mock_get_ingress_auth
260+
self, mock_index_vcon_parties, mock_add_vcon_to_set, mock_get_ingress_auth
260261
):
261262
"""Test that API keys are properly isolated between ingress lists."""
262263
# Configure different API keys for different ingress lists
@@ -266,12 +267,13 @@ def test_multiple_ingress_lists_isolation(
266267
"shared_ingress": ["partner-a-key", "partner-b-key-1", "shared-key"],
267268
}
268269

269-
# Mock Redis client properly
270+
# Mock Redis client properly (sadd/expire used by index_vcon_parties)
270271
mock_redis = MagicMock()
271272
mock_json = MagicMock()
272273
mock_json.set = AsyncMock()
273274
mock_redis.json.return_value = mock_json
274275
mock_redis.expire = AsyncMock()
276+
mock_redis.sadd = AsyncMock()
275277
mock_redis.rpush = AsyncMock()
276278

277279
# Set the global redis_async directly in the api module

0 commit comments

Comments
 (0)