Skip to content

Commit d31c00b

Browse files
authored
Merge pull request #9 from co-cddo/s3_vectors
S3_vectors
2 parents 8f6aeb0 + d3bd297 commit d31c00b

13 files changed

Lines changed: 2697 additions & 470 deletions

File tree

pyproject.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
[project]
22
name = "gds-idea-cdk-constructs"
3-
version = "0.3.0"
3+
version = "0.4.0"
44
description = "A repo for commonly used constructs in the team."
55
readme = "README.md"
66
authors = [
7-
{ name = "David Gillespie", email = "david.gillespie@digital.cabinet-office.gov.uk" }
7+
{ name = "David Gillespie", email = "david.gillespie@digital.cabinet-office.gov.uk" },
8+
{ name = "Jose Orjales", email = "jose.orjales@digital.cabinet-office.gov.uk" },
89
]
910
requires-python = ">=3.11"
1011
dependencies = [
11-
"aws-cdk-lib>=2.180.0",
12+
"aws-cdk-lib>=2.243.0",
1213
"boto3>=1.35.0",
1314
]
1415

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
"""Reusable CDK construct for Bedrock Knowledge Bases."""
2+
3+
from .props import (
4+
ChunkingConfig,
5+
ChunkingStrategy,
6+
EmbeddingModel,
7+
KnowledgeBaseProps,
8+
StorageType,
9+
)
10+
from .stack import KnowledgeBase
11+
12+
__all__ = [
13+
"KnowledgeBase",
14+
"KnowledgeBaseProps",
15+
"ChunkingConfig",
16+
"ChunkingStrategy",
17+
"StorageType",
18+
"EmbeddingModel",
19+
]
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
"""Storage strategy pattern for knowledge base vector backends
2+
3+
This module defines the ``IStorageStrategy`` ABC and concrete implementations
4+
for each supported :class:`~.props.StorageType`. The stack selects a strategy
5+
via :data:`STORAGE_STRATEGY_MAP` — consumers never interact with strategy
6+
classes directly.
7+
8+
To add a new storage backend:
9+
1. Implement :class:`IStorageStrategy`.
10+
2. Add a :class:`~.props.StorageType` enum member.
11+
3. Register the class in :data:`STORAGE_STRATEGY_MAP`.
12+
"""
13+
14+
from abc import ABC, abstractmethod
15+
16+
from aws_cdk import (
17+
CfnResource,
18+
RemovalPolicy,
19+
aws_bedrock as bedrock,
20+
aws_iam as iam,
21+
aws_s3vectors as s3vectors,
22+
)
23+
from constructs import Construct
24+
25+
from .props import KnowledgeBaseProps, StorageType
26+
27+
28+
class IStorageStrategy(ABC):
    """Abstract base for knowledge base vector storage backends.

    A concrete strategy provisions the backend-specific infrastructure
    (e.g., an S3 Vector Bucket plus Index) and exposes what the Bedrock
    ``CfnKnowledgeBase`` needs in order to reference it: the storage
    configuration, the IAM statements for the service role, and the
    resources that have to exist first.

    Args:
        scope: The CDK construct scope.
        kb_props: Knowledge base configuration properties.
    """

    def __init__(self, scope: Construct, kb_props: KnowledgeBaseProps) -> None:
        # Kept on the instance so concrete strategies can use them later.
        self.scope = scope
        self.kb_props = kb_props

    @abstractmethod
    def create_storage_resources(
        self, app_prefix: str, env_name: str, retain: bool
    ) -> None:
        """Provision the backend-specific storage resources.

        Args:
            app_prefix: Naming prefix derived from the application name.
            env_name: Deployment environment name (e.g., ``"development"``).
            retain: Whether to apply the RETAIN removal policy.
        """

    @abstractmethod
    def get_storage_configuration(
        self,
    ) -> bedrock.CfnKnowledgeBase.StorageConfigurationProperty:
        """Return the Bedrock storage configuration property.

        Returns:
            A ``StorageConfigurationProperty`` telling Bedrock where to
            store and retrieve vectors.
        """

    @abstractmethod
    def get_iam_policy_statements(self) -> list[iam.PolicyStatement]:
        """Return the IAM policy statements the KB service role needs.

        Returns:
            A list of ``PolicyStatement`` objects granting the KB role
            access to the storage backend.
        """

    @abstractmethod
    def get_resource_dependencies(self) -> list[CfnResource]:
        """Return the CloudFormation resources the KB must depend on.

        Returns:
            A list of ``CfnResource`` objects that must be created before
            the Knowledge Base.
        """
85+
86+
class S3VectorsStorageStrategy(IStorageStrategy):
87+
"""S3 Vectors storage backend — serverless vector storage built on S3.
88+
89+
Creates:
90+
- ``CfnVectorBucket`` for vector storage
91+
- ``CfnIndex`` for the vector index (float32, configurable dimensions
92+
and distance metric)
93+
"""
94+
95+
def __init__(self, scope: Construct, kb_props: KnowledgeBaseProps) -> None:
96+
super().__init__(scope, kb_props)
97+
self._vector_bucket: s3vectors.CfnVectorBucket | None = None
98+
self._vector_index: s3vectors.CfnIndex | None = None
99+
100+
def create_storage_resources(
101+
self, app_prefix: str, env_name: str, retain: bool
102+
) -> None:
103+
"""Create S3 Vector Bucket and Index.
104+
105+
Args:
106+
app_prefix: Naming prefix derived from the application name.
107+
env_name: Deployment environment name.
108+
retain: Whether to apply RETAIN removal policy.
109+
"""
110+
removal_policy = RemovalPolicy.RETAIN if retain else RemovalPolicy.DESTROY
111+
112+
self._vector_bucket = s3vectors.CfnVectorBucket(
113+
self.scope,
114+
"VectorBucket",
115+
vector_bucket_name=f"{app_prefix}-vectors-{env_name}",
116+
)
117+
self._vector_bucket.apply_removal_policy(removal_policy)
118+
119+
self._vector_index = s3vectors.CfnIndex(
120+
self.scope,
121+
"VectorIndex",
122+
index_name=f"{app_prefix}-index-{env_name}",
123+
vector_bucket_name=self._vector_bucket.vector_bucket_name,
124+
data_type="float32",
125+
dimension=self.kb_props.resolved_embedding_dimensions(),
126+
distance_metric=self.kb_props.distance_metric,
127+
metadata_configuration=s3vectors.CfnIndex.MetadataConfigurationProperty(
128+
non_filterable_metadata_keys=["content"],
129+
),
130+
)
131+
self._vector_index.add_dependency(self._vector_bucket)
132+
133+
def get_storage_configuration(
134+
self,
135+
) -> bedrock.CfnKnowledgeBase.StorageConfigurationProperty:
136+
"""Return S3 Vectors storage configuration for Bedrock.
137+
138+
Returns:
139+
A ``StorageConfigurationProperty`` pointing to the S3 Vector
140+
Bucket and Index.
141+
142+
Raises:
143+
RuntimeError: If called before ``create_storage_resources``.
144+
"""
145+
if self._vector_bucket is None or self._vector_index is None:
146+
raise RuntimeError(
147+
"create_storage_resources() must be called before "
148+
"get_storage_configuration()"
149+
)
150+
151+
return bedrock.CfnKnowledgeBase.StorageConfigurationProperty(
152+
type="S3_VECTORS",
153+
s3_vectors_configuration=bedrock.CfnKnowledgeBase.S3VectorsConfigurationProperty(
154+
vector_bucket_arn=self._vector_bucket.attr_vector_bucket_arn,
155+
index_name=self._vector_index.index_name,
156+
),
157+
)
158+
159+
def get_iam_policy_statements(self) -> list[iam.PolicyStatement]:
160+
"""Return S3 Vectors IAM permissions for the KB service role.
161+
162+
Returns:
163+
A list containing a single ``PolicyStatement`` granting
164+
read/write access to the vector bucket and index.
165+
166+
Raises:
167+
RuntimeError: If called before ``create_storage_resources``.
168+
"""
169+
if self._vector_bucket is None or self._vector_index is None:
170+
raise RuntimeError(
171+
"create_storage_resources() must be called before "
172+
"get_iam_policy_statements()"
173+
)
174+
175+
return [
176+
iam.PolicyStatement(
177+
sid="S3VectorsAccess",
178+
actions=[
179+
"s3vectors:CreateIndex",
180+
"s3vectors:GetIndex",
181+
"s3vectors:PutVectors",
182+
"s3vectors:GetVectors",
183+
"s3vectors:DeleteVectors",
184+
"s3vectors:QueryVectors",
185+
"s3vectors:ListVectors",
186+
"s3vectors:GetVectorBucket",
187+
"s3vectors:ListVectorBuckets",
188+
],
189+
resources=[
190+
self._vector_bucket.attr_vector_bucket_arn,
191+
self._vector_index.attr_index_arn,
192+
],
193+
)
194+
]
195+
196+
def get_resource_dependencies(self) -> list[CfnResource]:
197+
"""Return the CfnIndex as a dependency for the Knowledge Base.
198+
199+
Returns:
200+
A list containing the ``CfnIndex`` (which transitively depends
201+
on the ``CfnVectorBucket``).
202+
203+
Raises:
204+
RuntimeError: If called before ``create_storage_resources``.
205+
"""
206+
if self._vector_index is None:
207+
raise RuntimeError(
208+
"create_storage_resources() must be called before "
209+
"get_resource_dependencies()"
210+
)
211+
212+
return [self._vector_index]
213+
214+
215+
# Registry mapping each storage type to the strategy class that implements
# it. New backends are registered by adding an entry here.
STORAGE_STRATEGY_MAP: dict[StorageType, type[IStorageStrategy]] = {
    StorageType.S3_VECTORS: S3VectorsStorageStrategy,
}

src/gds_idea_cdk_constructs/knowledge_base/lambda_handlers/__init__.py

Whitespace-only changes.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
"""SQS-triggered Lambda handler for Bedrock Knowledge Base sync
2+
3+
Invoked by an SQS queue that receives S3 event notifications. The queue
4+
is configured with a batching window (default 20 minutes) so that many
5+
S3 object events are collected into a single Lambda invocation. This
6+
handler then fires **one** ``StartIngestionJob`` call regardless of how
7+
many files changed, avoiding API throttling.
8+
9+
Environment variables:
10+
KNOWLEDGE_BASE_ID: Bedrock Knowledge Base ID.
11+
DATA_SOURCE_ID: Bedrock Data Source ID.
12+
"""
13+
14+
import os
15+
16+
import boto3
17+
18+
# Read once at import time; an empty string means "not configured".
KNOWLEDGE_BASE_ID = os.environ.get("KNOWLEDGE_BASE_ID", "")
DATA_SOURCE_ID = os.environ.get("DATA_SOURCE_ID", "")


def handler(event, context):
    """Start a Bedrock KB ingestion job.

    The SQS batch may contain many S3 event records — the individual
    records are intentionally ignored and a single data-source sync is
    triggered instead. The records are only counted for the log line.

    Args:
        event: The Lambda event; expected to be a dict with a ``"Records"``
            list. Any other shape is tolerated and counted as zero records.
        context: The Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` and ``body``: 400 when the required
        environment variables are missing, 200 when the job was started,
        500 when starting the job raised.
    """
    if not KNOWLEDGE_BASE_ID or not DATA_SOURCE_ID:
        print(
            "ERROR: KNOWLEDGE_BASE_ID and DATA_SOURCE_ID must be set. "
            f"KNOWLEDGE_BASE_ID={KNOWLEDGE_BASE_ID!r}, "
            f"DATA_SOURCE_ID={DATA_SOURCE_ID!r}"
        )
        return {"statusCode": 400, "body": "Missing environment variables"}

    client = boto3.client("bedrock-agent")

    # The count only feeds the log message, so a malformed event (non-dict,
    # or a null "Records") must not raise here: an exception escaping the
    # handler would make SQS redeliver the batch.
    records = event.get("Records") if isinstance(event, dict) else None
    num_records = len(records) if records else 0
    print(
        f"Starting ingestion job for KB={KNOWLEDGE_BASE_ID}, "
        f"DataSource={DATA_SOURCE_ID} "
        f"(triggered by {num_records} SQS message(s))"
    )

    try:
        response = client.start_ingestion_job(
            knowledgeBaseId=KNOWLEDGE_BASE_ID,
            dataSourceId=DATA_SOURCE_ID,
        )
        ingestion_job = response.get("ingestionJob", {})
        job_id = ingestion_job.get("ingestionJobId", "unknown")
        status = ingestion_job.get("status", "unknown")
        print(f"Ingestion job started: jobId={job_id}, status={status}")

        return {"statusCode": 200, "body": f"Ingestion job {job_id} started"}

    except Exception as e:
        # Log but do NOT re-raise — a raised exception causes SQS to retry,
        # which would start duplicate ingestion jobs.
        print(f"ERROR starting ingestion job: {e}")
        return {"statusCode": 500, "body": str(e)}

0 commit comments

Comments
 (0)