Guide to deploying and scaling Valkey RAG cache in production.
┌─────────────────────────────────────────────────────────────────┐
│ Production Valkey Cluster │
│ (3 Shards, 2 Replicas Each) │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Shard 1 Shard 2 Shard 3 │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Primary │ │ Primary │ │ Primary │ │
│ │ (Write) │ │ (Write) │ │ (Write) │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ ┌───┴───┐ ┌───┴───┐ ┌───┴───┐ │
│ ▼ ▼ ▼ ▼ ▼ ▼ │
│ Replica Replica Replica Replica Replica Replica │
│ (Read) (Read) (Read) (Read) (Read) (Read) │
│ │
└─────────────────────────────────────────────────────────────────┘
Memory = num_vectors × dimensions × 4 bytes × overhead_factor
Where overhead_factor:
FLAT index: 1.1
HNSW index: 1.5-2.0 (depends on M parameter)
| Vectors | Dimensions | Index | Memory/Shard |
|---|---|---|---|
| 100K | 1536 | HNSW | ~1 GB |
| 500K | 1536 | HNSW | ~5 GB |
| 1M | 1536 | HNSW | ~10 GB |
| 5M | 1536 | HNSW | ~50 GB |
| 10M | 1536 | HNSW | ~100 GB |
# valkey.conf for production
maxmemory 32gb
maxmemory-policy volatile-lru
# Memory efficiency
activedefrag yes
active-defrag-ignore-bytes 100mb
active-defrag-threshold-lower 10
active-defrag-threshold-upper 25
aws elasticache create-replication-group \
--replication-group-id rag-cache-prod \
--replication-group-description "RAG Cache Production" \
--engine valkey \
--engine-version 7.2 \
--cache-node-type cache.r7g.xlarge \
--num-node-groups 3 \
--replicas-per-node-group 2 \
--automatic-failover-enabled \
--multi-az-enabled \
--transit-encryption-enabled \
--at-rest-encryption-enabled
AWSTemplateFormatVersion: '2010-09-09'
Description: Valkey RAG Cache Cluster
Parameters:
Environment:
Type: String
Default: production
Resources:
ValkeySubnetGroup:
Type: AWS::ElastiCache::SubnetGroup
Properties:
Description: Subnet group for Valkey
SubnetIds:
- !Ref PrivateSubnet1
- !Ref PrivateSubnet2
- !Ref PrivateSubnet3
ValkeySecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupDescription: Security group for Valkey
VpcId: !Ref VPC
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 6379
ToPort: 6379
SourceSecurityGroupId: !Ref AppSecurityGroup
ValkeyCluster:
Type: AWS::ElastiCache::ReplicationGroup
Properties:
ReplicationGroupDescription: RAG Cache Cluster
Engine: valkey
EngineVersion: '7.2'
CacheNodeType: cache.r7g.xlarge
NumNodeGroups: 3
ReplicasPerNodeGroup: 2
AutomaticFailoverEnabled: true
MultiAZEnabled: true
TransitEncryptionEnabled: true
AtRestEncryptionEnabled: true
CacheSubnetGroupName: !Ref ValkeySubnetGroup
SecurityGroupIds:
- !Ref ValkeySecurityGroup
# values.yaml for valkey helm chart
architecture: replication
master:
resources:
requests:
memory: 8Gi
cpu: 2
limits:
memory: 16Gi
cpu: 4
persistence:
enabled: true
size: 50Gi
storageClass: gp3
replica:
replicaCount: 2
resources:
requests:
memory: 8Gi
cpu: 2
persistence:
enabled: true
size: 50Gi
sentinel:
enabled: true
quorum: 2
metrics:
enabled: true
serviceMonitor:
enabled: true
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: valkey
spec:
serviceName: valkey
replicas: 3
selector:
matchLabels:
app: valkey
template:
metadata:
labels:
app: valkey
spec:
containers:
- name: valkey
image: valkey/valkey:8.0
ports:
- containerPort: 6379
resources:
requests:
memory: "8Gi"
cpu: "2"
limits:
memory: "16Gi"
cpu: "4"
volumeMounts:
- name: data
mountPath: /data
volumeClaimTemplates:
- metadata:
name: data
spec:
accessModes: ["ReadWriteOnce"]
storageClassName: gp3
resources:
requests:
storage: 50Gi
import valkey
from valkey import ConnectionPool

# Create connection pool
# A shared pool caps concurrent connections and reuses sockets across requests.
pool = ConnectionPool(
    host='valkey.example.com',
    port=6379,
    max_connections=50,          # upper bound on simultaneous connections
    socket_timeout=5.0,          # seconds before a blocked read/write fails
    socket_connect_timeout=2.0,  # fail fast on an unreachable host
    retry_on_timeout=True,       # retry a command once after a timeout
    health_check_interval=30,    # re-check idle connections (seconds) before reuse
)
# Use pool
client = valkey.Valkey(connection_pool=pool)
import { createClient } from '@redis/client';
// node-redis-style client pointed at the Valkey endpoint.
const client = createClient({
  url: 'redis://valkey.example.com:6379',
  socket: {
    connectTimeout: 5000, // ms allowed to establish the TCP connection
    keepAlive: 30000,     // TCP keep-alive setting — NOTE(review): confirm units/semantics for this client version
  },
  // Bound the command queue so a dead server cannot grow memory without limit.
  commandsQueueMaxLength: 1000,
});
// Enable auto-reconnect
client.on('error', (err) => console.error('Valkey error:', err));
client.on('reconnecting', () => console.log('Reconnecting...'));
# Route reads to replicas
class HAValkeyClient:
    """Route writes to the primary and reads/searches to replicas (round-robin).

    NOTE(review): the read/write/search methods are declared ``async`` but the
    underlying valkey client calls here are synchronous and will block the
    event loop — switch to ``valkey.asyncio`` clients (and ``await`` the calls)
    for true non-blocking behavior.
    """

    def __init__(self, primary_host, replica_hosts, index_name="idx"):
        """Connect to one primary and a list of replica hosts.

        index_name: FT search index used by search(). (Fix: the original
        referenced an undefined global ``index_name``.)
        """
        self.primary = valkey.Valkey(host=primary_host)
        self.replicas = [valkey.Valkey(host=h) for h in replica_hosts]
        self._replica_index = 0
        self._index_name = index_name

    def _get_replica(self):
        """Return the next replica in round-robin order."""
        replica = self.replicas[self._replica_index]
        self._replica_index = (self._replica_index + 1) % len(self.replicas)
        return replica

    async def read(self, key):
        """Read from replica."""
        return self._get_replica().get(key)

    async def write(self, key, value):
        """Write to primary."""
        return self.primary.set(key, value)

    async def search(self, query):
        """Search from replica."""
        return self._get_replica().ft(self._index_name).search(query)
import time
from enum import Enum
class CircuitState(Enum):
    """States of a circuit breaker."""
    CLOSED = "closed"        # normal operation: calls pass through
    OPEN = "open"            # failing fast: calls rejected until cool-down
    HALF_OPEN = "half_open"  # probing: one success closes, one failure re-opens


class CircuitBreaker:
    """Fail fast after repeated errors from an awaitable call.

    After ``failure_threshold`` consecutive failures the breaker opens and
    rejects calls for ``recovery_timeout`` seconds, then lets one probe
    request through (HALF_OPEN); a successful probe closes the circuit.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=30):
        self.failure_threshold = failure_threshold  # failures before opening
        self.recovery_timeout = recovery_timeout    # seconds open before probing
        self.failures = 0
        self.last_failure = 0
        self.state = CircuitState.CLOSED

    async def call(self, func, *args, **kwargs):
        """Invoke ``await func(*args, **kwargs)`` through the breaker.

        Raises ``Exception("Circuit breaker is open")`` while open, and
        re-raises whatever ``func`` raises (counting it as a failure).
        """
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure > self.recovery_timeout:
                # Cool-down elapsed: allow one probe request through.
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is open")
        try:
            # Keep the try body minimal: only the call itself can fail.
            result = await func(*args, **kwargs)
        except Exception:
            self.failures += 1
            self.last_failure = time.time()
            if self.failures >= self.failure_threshold:
                self.state = CircuitState.OPEN
            raise  # bare raise preserves the original traceback (fix: was `raise e`)
        if self.state == CircuitState.HALF_OPEN:
            # Probe succeeded: close the circuit and reset the failure count.
            self.state = CircuitState.CLOSED
            self.failures = 0
        return result
# Shard index across multiple keys using hash tags
def get_shard_key(doc_id: str, num_shards: int = 16) -> str:
    """Return a cluster key for *doc_id* prefixed with a hash-tag shard.

    Keys sharing the same ``{shardN}`` tag hash to the same cluster slot.
    Fix: the original used builtin ``hash()``, which is randomized per
    process (PYTHONHASHSEED), so the same document would map to different
    shards across restarts. CRC32 is stable across processes and machines.
    """
    import zlib  # stdlib; stable checksum for deterministic shard assignment
    shard = zlib.crc32(doc_id.encode("utf-8")) % num_shards
    return f"{{shard{shard}}}:doc:{doc_id}"
# All keys with same {shardN} go to same slot
async def batch_store(items: list[tuple], batch_size=100):
    """Write (key, value, embedding) tuples in pipelined batches."""
    total = len(items)
    start = 0
    while start < total:
        # One pipeline per batch: a single round-trip for batch_size commands.
        pipe = client.pipeline()
        for key, value, embedding in items[start:start + batch_size]:
            pipe.hset(key, mapping={"value": value, "embedding": embedding})
        await pipe.execute()
        start += batch_size
# Daily RDB snapshots
save 86400 1
# Append-only file for point-in-time recovery
appendonly yes
appendfsync everysec
# Backup script
valkey-cli BGSAVE
aws s3 cp /data/dump.rdb s3://backups/valkey/$(date +%Y%m%d).rdb
# Use Global Datastore for cross-region
aws elasticache create-global-replication-group \
--global-replication-group-id rag-cache-global \
--primary-replication-group-id rag-cache-us-east-1