|
| 1 | +--- |
| 2 | +title: "*FalkorDB* connector" |
| 3 | +toc_max_heading_level: 4 |
| 4 | +description: > |
| 5 | + Write to FalkorDB — a Redis-backed graph database — with support for node |
| 6 | + tables, relationship tables (edges), per-graph multitenancy, and vector |
| 7 | + indexes with cosine / euclidean / inner-product distances. |
| 8 | +--- |
| 9 | + |
| 10 | +The `falkordb` connector writes records to FalkorDB, a Cypher-compatible graph database that runs as a Redis module. It supports node tables (labels), relationship tables (edge types), per-graph multitenancy (one Redis instance, many isolated graphs), and vector indexes. |
| 11 | + |
| 12 | +```python |
| 13 | +from cocoindex.connectors import falkordb |
| 14 | +``` |
| 15 | + |
| 16 | +:::note[Dependencies] |
| 17 | +This connector requires additional dependencies. Install with: |
| 18 | + |
| 19 | +```bash |
| 20 | +pip install cocoindex[falkordb] |
| 21 | +``` |
| 22 | + |
| 23 | +::: |
| 24 | + |
| 25 | +## Connection setup |
| 26 | + |
| 27 | +Create a `ConnectionFactory` and provide it via a `ContextKey`. The factory holds the FalkorDB URI plus the target graph name, and yields a graph handle on demand. |
| 28 | + |
| 29 | +:::note |
| 30 | +The key name is load-bearing across runs — it's the stable identity CocoIndex uses to track managed rows. See [ContextKey as stable identity](../programming_guide/context#contextkey-as-stable-identity) before renaming. |
| 31 | +::: |
| 32 | + |
| 33 | +```python |
| 34 | +from collections.abc import AsyncIterator |
| 35 | +from cocoindex.connectors import falkordb |
| 36 | +import cocoindex as coco |
| 37 | + |
| 38 | +KG_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("kg_db") |
| 39 | + |
| 40 | +@coco.lifespan |
| 41 | +async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]: |
| 42 | + builder.provide( |
| 43 | + KG_DB, |
| 44 | + falkordb.ConnectionFactory( |
| 45 | + uri="falkor://localhost:6379", |
| 46 | + graph="knowledge_graph", |
| 47 | + ), |
| 48 | + ) |
| 49 | + yield |
| 50 | +``` |
| 51 | + |
| 52 | +### Multitenancy |
| 53 | + |
| 54 | +A single Redis instance can host many fully isolated graphs. Pair each graph with its own `ContextKey` and `ConnectionFactory(graph=...)`: |
| 55 | + |
| 56 | +```python |
| 57 | +KG_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("kg_db") |
| 58 | +APIS_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("apis_db") |
| 59 | + |
| 60 | +@coco.lifespan |
| 61 | +async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]: |
| 62 | + uri = "falkor://localhost:6379" |
| 63 | + builder.provide(KG_DB, falkordb.ConnectionFactory(uri=uri, graph="knowledge_graph")) |
| 64 | + builder.provide(APIS_DB, falkordb.ConnectionFactory(uri=uri, graph="apis_graph")) |
| 65 | + yield |
| 66 | +``` |
| 67 | + |
| 68 | +Different `ContextKey`s with different graph names produce fully separate target-state trees — changes to one never spill into the other. |
| 69 | + |
| 70 | +## As target |
| 71 | + |
| 72 | +The `falkordb` connector provides target state APIs for writing records to node tables and relation tables. CocoIndex tracks what records should exist and automatically handles upserts and deletions. |
| 73 | + |
| 74 | +Each `graph.query` call against FalkorDB is its own atomic unit (FalkorDB does not expose multi-statement transactions); the connector orders writes within a batch as **node upserts → relation upserts → relation deletes → node deletes** so dependent edges always see their endpoints. |
| 75 | + |
| 76 | +### Declaring target states |
| 77 | + |
| 78 | +#### Node tables (parent state) |
| 79 | + |
| 80 | +Declares a node label as a target state. Returns a `TableTarget` for declaring records. |
| 81 | + |
| 82 | +```python |
| 83 | +def declare_table_target( |
| 84 | + db: ContextKey, |
| 85 | + table_name: str, |
| 86 | + table_schema: TableSchema[RowT] | None = None, |
| 87 | + *, |
| 88 | + primary_key: str = "id", |
| 89 | + managed_by: Literal["system", "user"] = "system", |
| 90 | +) -> TableTarget[RowT, coco.PendingS] |
| 91 | +``` |
| 92 | + |
| 93 | +**Parameters:** |
| 94 | + |
| 95 | +- `db` — A `ContextKey[falkordb.ConnectionFactory]` for the FalkorDB connection. |
| 96 | +- `table_name` — The Cypher node label (e.g. `"Document"`). |
| 97 | +- `table_schema` — Optional schema definition (see [Table Schema](#table-schema-from-python-class)). FalkorDB does not enforce per-property types server-side, so the schema participates in CocoIndex's fingerprint (so two flows declaring the same label must agree) but no per-column DDL is emitted. |
| 98 | +- `primary_key` — Single property name used as the node's primary key. Defaults to `"id"`. Compound primary keys are not supported in v1.0. |
| 99 | +- `managed_by` — Whether CocoIndex manages the table lifecycle (`"system"`) or assumes it exists (`"user"`). |
| 100 | + |
| 101 | +**Returns:** A pending `TableTarget`. Use `await falkordb.mount_table_target(KG_DB, ...)` to get a resolved target. |
| 102 | + |
| 103 | +#### Records (child states) |
| 104 | + |
| 105 | +Once a `TableTarget` is resolved, declare records to be upserted (translated to `MERGE (n:Label {pk: $key_0}) SET n += $props`): |
| 106 | + |
| 107 | +```python |
| 108 | +def TableTarget.declare_record( |
| 109 | + self, |
| 110 | + *, |
| 111 | + row: RowT, |
| 112 | +) -> None |
| 113 | +``` |
| 114 | + |
| 115 | +**Parameters:** |
| 116 | + |
| 117 | +- `row` — A row object (dict, dataclass, NamedTuple, or Pydantic model). Must include the `primary_key` field declared above. |
| 118 | + |
| 119 | +`declare_row` is an alias for `declare_record`, for compatibility with Postgres and other RDBMS targets. |
| 120 | + |
| 121 | +#### Relation tables (parent state) |
| 122 | + |
| 123 | +Declares a relationship type as a target state. Returns a `RelationTarget` for declaring edges. |
| 124 | + |
| 125 | +```python |
| 126 | +def declare_relation_target( |
| 127 | + db: ContextKey, |
| 128 | + table_name: str, |
| 129 | + from_table: TableTarget, |
| 130 | + to_table: TableTarget, |
| 131 | + table_schema: TableSchema[RowT] | None = None, |
| 132 | + *, |
| 133 | + primary_key: str = "id", |
| 134 | + managed_by: Literal["system", "user"] = "system", |
| 135 | +) -> RelationTarget[RowT, coco.PendingS] |
| 136 | +``` |
| 137 | + |
| 138 | +**Parameters:** |
| 139 | + |
| 140 | +- `db` — A `ContextKey[falkordb.ConnectionFactory]` for the FalkorDB connection. |
| 141 | +- `table_name` — The Cypher relationship type (e.g. `"MENTION"`). |
| 142 | +- `from_table` — The `TableTarget` whose nodes are the *source* endpoints of edges in this relationship. |
| 143 | +- `to_table` — The `TableTarget` whose nodes are the *target* endpoints of edges in this relationship. |
| 144 | +- `table_schema` — Optional schema for the relationship's own properties (see [Table Schema](#table-schema-from-python-class)). The relationship's `primary_key` field uniquely identifies each edge. |
| 145 | +- `primary_key` — Single property name used as the edge's primary key. Defaults to `"id"`. |
| 146 | +- `managed_by` — Whether CocoIndex manages the relationship lifecycle (`"system"`) or assumes it exists (`"user"`). |
| 147 | + |
| 148 | +**Returns:** A pending `RelationTarget`. Use `await falkordb.mount_relation_target(KG_DB, ...)` to get a resolved target. |
| 149 | + |
| 150 | +#### Relations (child states) |
| 151 | + |
| 152 | +Once a `RelationTarget` is resolved, declare edges. Each declaration produces a triple-MERGE: source endpoint, target endpoint, then the relationship. |
| 153 | + |
| 154 | +```python |
| 155 | +def RelationTarget.declare_relation( |
| 156 | + self, |
| 157 | + *, |
| 158 | + from_id: Any, |
| 159 | + to_id: Any, |
| 160 | + record: RowT | None = None, |
| 161 | +) -> None |
| 162 | +``` |
| 163 | + |
| 164 | +**Parameters:** |
| 165 | + |
| 166 | +- `from_id` — The source node's primary-key value. The connector MERGEs `(s:FromLabel {pk: $from_id})` so endpoints are auto-created if absent. |
| 167 | +- `to_id` — The target node's primary-key value. Same MERGE behavior. |
| 168 | +- `record` — Optional row object whose fields populate the relationship's properties. Must include the relationship's `primary_key` field if provided. |
| 169 | + |
| 170 | +If `record` is omitted, the connector derives a deterministic edge id from `(from_label, from_id, to_label, to_id)`. This is convenient when an edge has no properties of its own. |
| 171 | + |
| 172 | +#### Vector indexes (attachment) |
| 173 | + |
| 174 | +Declares a vector index on a column of a node table. Vector indexes are an [attachment](../advanced_topics/custom_target_connector#implementing-attachment-providers) to a `TableTarget`: |
| 175 | + |
| 176 | +```python |
| 177 | +def TableTarget.declare_vector_index( |
| 178 | + self, |
| 179 | + *, |
| 180 | + name: str | None = None, |
| 181 | + field: str, |
| 182 | + metric: Literal["cosine", "euclidean", "ip"] = "cosine", |
| 183 | + dimension: int, |
| 184 | +) -> None |
| 185 | +``` |
| 186 | + |
| 187 | +**Parameters:** |
| 188 | + |
| 189 | +- `name` — Optional logical name for the index. Defaults to `f"idx_{table_name}__{field}"`. |
| 190 | +- `field` — The node property holding the vector. |
| 191 | +- `metric` — Similarity metric: `"cosine"`, `"euclidean"`, or `"ip"` (inner product). Translated to FalkorDB's `similarityFunction` option. |
| 192 | +- `dimension` — The vector's dimension. Required. |
| 193 | + |
| 194 | +The connector emits `CREATE VECTOR INDEX FOR (e:Label) ON (e.field) OPTIONS {dimension: N, similarityFunction: '...'}`. Vectors are float32 only — wider vector dtypes are not supported. |
| 195 | + |
| 196 | +### Table schema: from Python class |
| 197 | + |
| 198 | +Build a `TableSchema` by introspecting a record type: |
| 199 | + |
| 200 | +```python |
| 201 | +@classmethod |
| 202 | +async def TableSchema.from_class( |
| 203 | + cls, |
| 204 | + record_type: type[RowT], |
| 205 | + *, |
| 206 | + primary_key: str = "id", |
| 207 | + column_overrides: dict[str, FalkorType | VectorSchemaProvider] | None = None, |
| 208 | +) -> TableSchema[RowT] |
| 209 | +``` |
| 210 | + |
| 211 | +**Parameters:** |
| 212 | + |
| 213 | +- `record_type` — A dataclass, NamedTuple, or Pydantic model. |
| 214 | +- `primary_key` — Field name to use as the table's primary key. Defaults to `"id"`. |
| 215 | +- `column_overrides` — Optional dict mapping field names to `FalkorType` or `VectorSchemaProvider` to override the default Python-to-FalkorDB type mapping. |
| 216 | + |
| 217 | +**Returns:** A `TableSchema[RowT]` populated from the class's fields. |
| 218 | + |
| 219 | +#### Default Python → FalkorDB type mapping |
| 220 | + |
| 221 | +| Python type | FalkorDB type | Notes | |
| 222 | +|---|---|---| |
| 223 | +| `bool` | `boolean` | | |
| 224 | +| `int`, NumPy integer scalars | `integer` | | |
| 225 | +| `float`, NumPy float scalars | `float` | | |
| 226 | +| `decimal.Decimal` | `string` | Encoded via `str()` — FalkorDB has no decimal type. | |
| 227 | +| `str` | `string` | | |
| 228 | +| `bytes` | `string` | Encoded as base64. | |
| 229 | +| `uuid.UUID` | `string` | Encoded via `str()`. | |
| 230 | +| `datetime.date` / `datetime.datetime` / `datetime.time` | `string` | Encoded via `.isoformat()`. | |
| 231 | +| `datetime.timedelta` | `integer` | Encoded as milliseconds (`int(td.total_seconds() * 1000)`). | |
| 232 | +| `numpy.ndarray` (with `VectorSchema` annotation) | `vector<float32, N>` | Encoded as `list[float]`. | |
| 233 | +| `dict`, list, nested record, `Any` | `map` / `array` | Passed through native parameter binding. | |
| 234 | + |
| 235 | +#### FalkorType |
| 236 | + |
| 237 | +Override the default mapping for a single column with `FalkorType`: |
| 238 | + |
| 239 | +```python |
| 240 | +class FalkorType(NamedTuple): |
| 241 | + falkor_type: str |
| 242 | + encoder: ValueEncoder | None = None |
| 243 | +``` |
| 244 | + |
| 245 | +Use with `typing.Annotated`: |
| 246 | + |
| 247 | +```python |
| 248 | +from typing import Annotated |
| 249 | +from dataclasses import dataclass |
| 250 | +from cocoindex.connectors.falkordb import FalkorType |
| 251 | + |
| 252 | +@dataclass |
| 253 | +class Row: |
| 254 | + id: str |
| 255 | + score: Annotated[float, FalkorType("decimal", encoder=str)] |
| 256 | +``` |
| 257 | + |
| 258 | +The `falkor_type` string is metadata-only — it participates in the schema fingerprint (so two flows declaring the same table must agree) but no DDL is emitted from it. |
| 259 | + |
| 260 | +#### VectorSchemaProvider |
| 261 | + |
| 262 | +For NumPy `ndarray` columns, attach a `VectorSchema` annotation to specify dtype + dimension. See [VectorSchema](../common_resources/vector_schema) for details. |
| 263 | + |
| 264 | +### Table schema: explicit column definitions |
| 265 | + |
| 266 | +Build a `TableSchema` directly from a dict of column definitions when the row type is dynamic: |
| 267 | + |
| 268 | +```python |
| 269 | +from cocoindex.connectors.falkordb import TableSchema, ColumnDef |
| 270 | + |
| 271 | +schema = TableSchema( |
| 272 | + columns={ |
| 273 | + "filename": ColumnDef(type="string"), |
| 274 | + "title": ColumnDef(type="string"), |
| 275 | + "summary": ColumnDef(type="string", nullable=True), |
| 276 | + }, |
| 277 | + primary_key="filename", |
| 278 | +) |
| 279 | +``` |
| 280 | + |
| 281 | +`ColumnDef` fields: |
| 282 | + |
| 283 | +- `type` — The FalkorDB type string (metadata only; see table above). |
| 284 | +- `nullable` — Whether the column may be `None`. Defaults to `True`. |
| 285 | +- `encoder` — Optional `Callable[[Any], Any]` applied to non-`None` values before they're sent to FalkorDB. |
| 286 | + |
| 287 | +### DDL: indexes and constraints |
| 288 | + |
| 289 | +For each managed table, the connector creates the supporting Cypher index on the primary key field on first run: |
| 290 | + |
| 291 | +- For node tables: `CREATE INDEX FOR (e:Label) ON (e.<pk>)`. |
| 292 | +- For relation tables: `CREATE INDEX FOR ()-[e:RelType]-() ON (e.<pk>)`. |
| 293 | + |
| 294 | +It then attempts a uniqueness constraint via the `GRAPH.CONSTRAINT CREATE` Redis command (best-effort — failures are logged but do not abort). Indexes and constraints are dropped on `cocoindex drop` or when the table is no longer declared. |
| 295 | + |
| 296 | +When `managed_by="user"` is set, the connector skips DDL entirely — you're responsible for creating and dropping the schema. Record-level upserts and deletes still work. |
| 297 | + |
| 298 | +### Example: Node tables |
| 299 | + |
| 300 | +```python |
| 301 | +from collections.abc import AsyncIterator |
| 302 | +from dataclasses import dataclass |
| 303 | +import cocoindex as coco |
| 304 | +from cocoindex.connectors import falkordb |
| 305 | + |
| 306 | +KG_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("kg_db") |
| 307 | + |
| 308 | + |
| 309 | +@dataclass |
| 310 | +class Document: |
| 311 | + filename: str |
| 312 | + title: str |
| 313 | + summary: str |
| 314 | + |
| 315 | + |
| 316 | +@coco.lifespan |
| 317 | +async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]: |
| 318 | + builder.provide(KG_DB, falkordb.ConnectionFactory( |
| 319 | + uri="falkor://localhost:6379", graph="knowledge_graph", |
| 320 | + )) |
| 321 | + yield |
| 322 | + |
| 323 | + |
| 324 | +@coco.fn |
| 325 | +async def app_main() -> None: |
| 326 | + schema = await falkordb.TableSchema.from_class(Document, primary_key="filename") |
| 327 | + documents = await falkordb.mount_table_target( |
| 328 | + KG_DB, "Document", schema, primary_key="filename", |
| 329 | + ) |
| 330 | + documents.declare_record( |
| 331 | + row=Document( |
| 332 | + filename="overview.md", |
| 333 | + title="Overview", |
| 334 | + summary="An overview of CocoIndex...", |
| 335 | + ) |
| 336 | + ) |
| 337 | + |
| 338 | + |
| 339 | +app = coco.App(coco.AppConfig(name="docs_to_falkordb"), app_main) |
| 340 | +``` |
| 341 | + |
| 342 | +### Example: Relation tables (knowledge graph) |
| 343 | + |
| 344 | +```python |
| 345 | +@dataclass |
| 346 | +class Entity: |
| 347 | + value: str |
| 348 | + |
| 349 | + |
| 350 | +@dataclass |
| 351 | +class RelationshipRow: |
| 352 | + id: str |
| 353 | + predicate: str |
| 354 | + |
| 355 | + |
| 356 | +@coco.fn |
| 357 | +async def kg_app_main() -> None: |
| 358 | + documents = await falkordb.mount_table_target( |
| 359 | + KG_DB, "Document", |
| 360 | + await falkordb.TableSchema.from_class(Document, primary_key="filename"), |
| 361 | + primary_key="filename", |
| 362 | + ) |
| 363 | + entities = await falkordb.mount_table_target( |
| 364 | + KG_DB, "Entity", |
| 365 | + await falkordb.TableSchema.from_class(Entity, primary_key="value"), |
| 366 | + primary_key="value", |
| 367 | + ) |
| 368 | + relationships = await falkordb.mount_relation_target( |
| 369 | + KG_DB, "RELATIONSHIP", |
| 370 | + entities, entities, |
| 371 | + await falkordb.TableSchema.from_class(RelationshipRow, primary_key="id"), |
| 372 | + primary_key="id", |
| 373 | + ) |
| 374 | + |
| 375 | + # populate ... |
| 376 | + documents.declare_record(row=Document(filename="overview.md", title="Overview", summary="...")) |
| 377 | + entities.declare_record(row=Entity(value="CocoIndex")) |
| 378 | + entities.declare_record(row=Entity(value="FalkorDB")) |
| 379 | + relationships.declare_relation( |
| 380 | + from_id="CocoIndex", |
| 381 | + to_id="FalkorDB", |
| 382 | + record=RelationshipRow(id="rel-1", predicate="writes_to"), |
| 383 | + ) |
| 384 | + |
| 385 | + |
| 386 | +kg_app = coco.App(coco.AppConfig(name="kg_app"), kg_app_main) |
| 387 | +``` |
| 388 | + |
| 389 | +The `Entity` table is declared up-front (via `mount_table_target`) so its index and constraint are reconciled before any `RELATIONSHIP` edge MERGEs entity endpoints. The relationship's three-MERGE pattern (source endpoint → target endpoint → edge) means missing endpoints are auto-created — but it's good practice to declare them explicitly so deletion-cascade behavior stays predictable. |
0 commit comments