This guide explains how to use the AppSync APIs:
reqRAGStore(input: [RAGIN]!): AWSJSON!queryRAGs(qs: AWSJSON!): AWSJSON!
to (1) ingest documents with metadata, and (2) query results intelligently using the global universe + per-product filtering approach.
- Documents live in S3 under a per-user directory.
- Indexing happens in a Fargate worker which produces
chunks.jsonin S3. reqRAGStorewrites a per-user doc registry (doc_registry.json) that maps each document S3 key to metadata (pid/categories/version/etc).queryRAGsreadschunks.json, merges doc-registry metadata into each chunk, then filters bypid/categoriesbefore scoring and (optionally) synthesizing an answer.
Per-user directory:
userDir = <cognito-sub OR sanitized-email>
Per-product docs:
s3://$RAG_BUCKET/<userDir>/<pid>/docs/<filename>
Per-product index:
s3://$RAG_BUCKET/<userDir>/<pid>/index/chunks.json
Global index (all products under a user):
s3://$RAG_BUCKET/<userDir>/global/index/chunks.json
Doc registry (metadata sidecar):
s3://$RAG_BUCKET/<userDir>/doc_registry.json
What it does
- Upserts records in
doc_registry.jsonkeyed bydocKey. - The
docKeyis derived from:<userDir>/<pid>/docs/<file>.
Input
Your current schema includes:
input RAGIN {
categories: String!
fid: ID!
file: String!
format: String!
options: AWSJSON!
pid: ID!
version: String!
}
type Mutation {
reqRAGStore(input: [RAGIN]!): AWSJSON!
}Notes
categoriescan be a single string or a delimiter-separated string (commas/|/;).optionsis stored as parsed JSON when possible.fidcan be any stable ID you use for the file.
Example mutation
mutation RegisterDocs($input: [RAGIN]!) {
reqRAGStore(input: $input)
}Variables example:
{
"input": [
{
"pid": "IC_007",
"fid": "ds-001",
"file": "IC_007_datasheet.pdf",
"format": "pdf",
"version": "revC-2026-01",
"categories": "datasheet",
"options": {"source": "vendor", "lang": "en"}
},
{
"pid": "IC_007",
"fid": "an-003",
"file": "IC_007_appnote_motorcontrol.pdf",
"format": "pdf",
"version": "1.0",
"categories": "app_note|motor_control",
"options": {"app_domain": "motor_control"}
}
]
}Return payload is an AWSJSON string (JSON-encoded), for example:
{"success": true, "count": 2, "docKeys": ["<userDir>/IC_007/docs/IC_007_datasheet.pdf", "<userDir>/IC_007/docs/IC_007_appnote_motorcontrol.pdf"]}What it does
- Loads
chunks.json(prefers per-pidindex, falls back toglobalindex). - Merges doc-registry metadata into
chunk.metadata. - Applies filters (
pid,categories) and scores chunks. - Returns
AWSJSON(JSON-encoded) with{ answer, chunks, query, mode }.
Schema
type Query {
queryRAGs(qs: AWSJSON!): AWSJSON!
}Recommended qs structure
{
"query": "How do I configure SPI mode 3?",
"pid": "IC_007",
"categories": "prog_guide|datasheet|errata",
"topK": 8,
"mode": "hybrid"
}pidis optional.- If omitted, you get “product selection mode” (global universe).
categoriesis optional.- Use it to restrict the universe to doc types like
errata,prog_guide, etc.
- Use it to restrict the universe to doc types like
- Upload your files to S3 under
<userDir>/<pid>/docs/…
In the web app, the typical flow is:
ragRequestUploadURLs→ PUT to presigned URLs →ragConfirmUploads
-
Register metadata with
reqRAGStorefor each file. -
Trigger indexing for that product
pid(so the worker builds<pid>/index/chunks.json). -
Query with
queryRAGs({ pid: "IC_007", ... }).
-
Upload docs to multiple product pids.
-
Call
reqRAGStorefor each doc so categories/pid/version are recorded. -
Trigger global indexing by running the worker with
pid=global.
- This produces:
<userDir>/global/index/chunks.json - Each chunk is tagged with
metadata.pidso you can filter later.
- Query without
pidto shortlist candidates, or query withpidto focus.
Example global query:
{
"query": "Which IC supports motor control at 24V and has SPI config registers?",
"topK": 12,
"categories": "datasheet|app_note"
}Then follow up with a focused query:
{
"query": "Show the SPI init sequence and errata caveats",
"pid": "IC_007",
"categories": "prog_guide|errata",
"topK": 10
}Below are plain examples (no framework assumptions). If you already use Amplify or an existing API client, adapt the graphqlRequest() function.
async function graphqlRequest({ url, apiKey, query, variables }) {
const res = await fetch(url, {
method: 'POST',
headers: {
'content-type': 'application/json',
'x-api-key': apiKey,
},
body: JSON.stringify({ query, variables }),
});
const json = await res.json();
if (json.errors?.length) throw new Error(json.errors[0].message);
return json.data;
}
const MUTATION = `mutation RegisterDocs($input: [RAGIN]!) { reqRAGStore(input: $input) }`;
// NOTE: reqRAGStore returns AWSJSON string, so you usually JSON.parse() it.
const data = await graphqlRequest({
url: process.env.APPSYNC_URL,
apiKey: process.env.APPSYNC_API_KEY,
query: MUTATION,
variables: {
input: [
{
pid: 'IC_007',
fid: 'ds-001',
file: 'IC_007_datasheet.pdf',
format: 'pdf',
version: 'revC-2026-01',
categories: 'datasheet',
options: { source: 'vendor' },
},
],
},
});
const result = JSON.parse(data.reqRAGStore);
console.log(result);const QUERY = `query QueryRAGs($qs: AWSJSON!) { queryRAGs(qs: $qs) }`;
const qs = {
query: 'What is the max SPI clock?',
pid: 'IC_007',
categories: 'datasheet|prog_guide|errata',
topK: 8,
};
const data = await graphqlRequest({
url: process.env.APPSYNC_URL,
apiKey: process.env.APPSYNC_API_KEY,
query: QUERY,
variables: { qs: JSON.stringify(qs) },
});
const out = JSON.parse(data.queryRAGs);
console.log(out.answer);
console.table(out.chunks.map(c => ({ score: c.score, source: c.source })));These examples use an API key header for simplicity.
import json
import requests
APPSYNC_URL = "https://<your-appsync-id>.appsync-api.<region>.amazonaws.com/graphql"
API_KEY = "<api-key>"
def gql(query: str, variables: dict):
r = requests.post(
APPSYNC_URL,
headers={"content-type": "application/json", "x-api-key": API_KEY},
data=json.dumps({"query": query, "variables": variables}),
timeout=60,
)
r.raise_for_status()
payload = r.json()
if payload.get("errors"):
raise RuntimeError(payload["errors"][0]["message"])
return payload["data"]
# --- reqRAGStore ---
REGISTER = """
mutation RegisterDocs($input: [RAGIN]!) {
reqRAGStore(input: $input)
}
"""
data = gql(REGISTER, {
"input": [
{
"pid": "IC_007",
"fid": "ds-001",
"file": "IC_007_datasheet.pdf",
"format": "pdf",
"version": "revC-2026-01",
"categories": "datasheet",
"options": {"source": "vendor"},
}
]
})
print(json.loads(data["reqRAGStore"]))
# --- queryRAGs ---
QUERY_RAGS = """
query QueryRAGs($qs: AWSJSON!) {
queryRAGs(qs: $qs)
}
"""
qs = {
"query": "Does IC_007 have errata about ADC saturation?",
"pid": "IC_007",
"categories": "errata|datasheet",
"topK": 10,
}
data = gql(QUERY_RAGS, {"qs": json.dumps(qs)})
out = json.loads(data["queryRAGs"])
print(out.get("answer"))
print("chunks:", len(out.get("chunks", [])))-
queryRAGsreturns no chunks- Ensure indexing has been run and
chunks.jsonexists in S3. - For global mode, ensure
pid=globalindexing was triggered and completed.
- Ensure indexing has been run and
-
Filtering by
piddoesn’t work- Ensure
chunks.jsoncontainsmetadata.pid. (Global indexing attaches this automatically; per-pid indexing will set pid too.)
- Ensure
-
Filtering by
categoriesdoesn’t work- Ensure you called
reqRAGStorefor the docs and categories are present. - Categories are stored in the doc registry and merged into chunk metadata.
- Ensure you called
-
Doc revisions
- Call
reqRAGStoreagain with a newerversion, then re-index.
- Call