148 changes: 148 additions & 0 deletions CMV_SUPPORT.md
@@ -0,0 +1,148 @@
# CMV (Continuous Materialized View) Support for little_bigtable

## Overview

Bigtable CMVs allow you to re-key table data for efficient queries on alternate key orderings.
Production Bigtable handles CMV maintenance automatically. This emulator replicates that
behavior: views are created through the standard `CreateMaterializedView` gRPC method and
kept in sync as the source table is written.

## How It Works

### 1. Creating a CMV

There are two ways to create a CMV in the emulator:

#### Option A: gRPC (recommended, matches production)

Use the standard Go admin client, pointed at the emulator:

```go
// With BIGTABLE_EMULATOR_HOST set (for example localhost:9000), the client talks to the emulator.
iac, err := bigtable.NewInstanceAdminClient(ctx, project)
if err != nil {
	log.Fatalf("creating instance admin client: %v", err)
}
defer iac.Close()

err = iac.CreateMaterializedView(ctx, instanceID, &bigtable.MaterializedViewInfo{
	MaterializedViewID: "events_by_account",
	Query: `SELECT
SPLIT(_key, '#')[SAFE_OFFSET(3)] AS region,
SPLIT(_key, '#')[SAFE_OFFSET(4)] AS account_id,
SPLIT(_key, '#')[SAFE_OFFSET(1)] AS ts,
SPLIT(_key, '#')[SAFE_OFFSET(2)] AS typ,
SPLIT(_key, '#')[SAFE_OFFSET(0)] AS item_id,
_key AS src_key,
cf1 AS cf1
FROM ` + "`events`" + `
ORDER BY region, account_id, ts, typ, item_id, src_key`,
})
if err != nil {
	log.Fatalf("creating materialized view: %v", err)
}
```

The emulator parses the SQL to extract the key transformation config. The same code works
against both production Bigtable and the emulator.
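
To make the extraction step concrete, here is a minimal sketch of how a regex-based pass could recover the separator, the key mapping, and the append-source-key flag from a query in the format shown above. The helper `parseKeyMapping` and its three patterns are assumptions made for illustration; they are not the code in `bttest/sql_parse.go`, and they only handle the exact column shapes used in this document.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

var (
	// Matches e.g.: SPLIT(_key, '#')[SAFE_OFFSET(3)] AS region
	splitCol = regexp.MustCompile(`SPLIT\(_key,\s*'([^']*)'\)\[SAFE_OFFSET\((\d+)\)\]\s+AS\s+(\w+)`)
	// Matches e.g.: _key AS src_key
	rawKeyCol = regexp.MustCompile(`\b_key\s+AS\s+(\w+)`)
	// Captures everything after ORDER BY up to the end of the query.
	orderBy = regexp.MustCompile(`(?is)ORDER\s+BY\s+(.+)$`)
)

// parseKeyMapping is a hypothetical helper, not the emulator's parser.
// It maps each ORDER BY column back to the SAFE_OFFSET index of its alias.
func parseKeyMapping(sql string) (sep string, mapping []int, appendSource bool, err error) {
	offsets := map[string]int{} // alias -> SAFE_OFFSET index
	for _, m := range splitCol.FindAllStringSubmatch(sql, -1) {
		n, convErr := strconv.Atoi(m[2])
		if convErr != nil {
			return "", nil, false, convErr
		}
		sep, offsets[m[3]] = m[1], n
	}
	rawAlias := ""
	if m := rawKeyCol.FindStringSubmatch(sql); m != nil {
		rawAlias = m[1]
	}
	ob := orderBy.FindStringSubmatch(sql)
	if ob == nil {
		return "", nil, false, fmt.Errorf("no ORDER BY clause found")
	}
	for _, col := range strings.Split(ob[1], ",") {
		col = strings.TrimSpace(col)
		if idx, ok := offsets[col]; ok {
			mapping = append(mapping, idx)
		} else if rawAlias != "" && col == rawAlias {
			appendSource = true // the raw key alias takes part in the ordering
		} else {
			return "", nil, false, fmt.Errorf("unknown ORDER BY column %q", col)
		}
	}
	return sep, mapping, appendSource, nil
}

func main() {
	sql := `SELECT
SPLIT(_key, '#')[SAFE_OFFSET(3)] AS region,
SPLIT(_key, '#')[SAFE_OFFSET(0)] AS item_id,
_key AS src_key
FROM ` + "`events`" + `
ORDER BY region, item_id, src_key`
	sep, mapping, appendSource, err := parseKeyMapping(sql)
	fmt.Println(sep, mapping, appendSource, err)
}
```

The sketch treats the raw-key alias as a trailing component wherever it appears in the `ORDER BY`, which matches the example query but is a simplification.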

#### Option B: `--cmv-config` flag (local testing convenience)

For local testing without writing client code, pass a JSON config file at startup:

```bash
./little_bigtable --port 9000 --db-file /tmp/lbt.db --cmv-config /path/to/cmv_config.json
```

The JSON file is an array of CMV config objects:

```json
[
  {
    "source_table": "events",
    "view_id": "events_by_account",
    "key_separator": "#",
    "key_mapping": [3, 4, 1, 2, 0],
    "append_source_key": true,
    "include_families": ["cf1"]
  }
]
```

| Field | Description |
|---|---|
| `source_table` | Table ID of the source table (not the fully-qualified name) |
| `view_id` | Table ID to use for the CMV shadow table |
| `key_separator` | Delimiter used in the composite row key |
| `key_mapping` | Ordered list of 0-based source key component indices for the CMV key |
| `append_source_key` | If true, appends the full original source key as the final component |
| `include_families` | Column families to copy; omit or leave empty to include all |
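
Before handing a config file to the emulator, it can help to check it locally. The following is a minimal sketch that decodes the documented fields and applies a few sanity checks. The `viewConfig` type, the `parseViewConfigs` helper, and the checks themselves (required IDs, non-empty separator, non-negative indices, rejection of unknown fields) are assumptions of this sketch, not documented emulator behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// viewConfig mirrors the documented JSON fields; the type name is illustrative.
type viewConfig struct {
	SourceTable     string   `json:"source_table"`
	ViewID          string   `json:"view_id"`
	KeySeparator    string   `json:"key_separator"`
	KeyMapping      []int    `json:"key_mapping"`
	AppendSourceKey bool     `json:"append_source_key"`
	IncludeFamilies []string `json:"include_families"`
}

// parseViewConfigs decodes a JSON array of configs and validates each entry.
func parseViewConfigs(data string) ([]viewConfig, error) {
	var cfgs []viewConfig
	dec := json.NewDecoder(strings.NewReader(data))
	dec.DisallowUnknownFields() // surface typos in field names
	if err := dec.Decode(&cfgs); err != nil {
		return nil, fmt.Errorf("parsing cmv config: %w", err)
	}
	for i, c := range cfgs {
		if c.SourceTable == "" || c.ViewID == "" {
			return nil, fmt.Errorf("config %d: source_table and view_id are required", i)
		}
		if c.KeySeparator == "" {
			return nil, fmt.Errorf("config %d: key_separator must not be empty", i)
		}
		for _, idx := range c.KeyMapping {
			if idx < 0 {
				return nil, fmt.Errorf("config %d: negative key_mapping index %d", i, idx)
			}
		}
	}
	return cfgs, nil
}

func main() {
	const doc = `[{"source_table":"events","view_id":"events_by_account",
		"key_separator":"#","key_mapping":[3,4,1,2,0],
		"append_source_key":true,"include_families":["cf1"]}]`
	cfgs, err := parseViewConfigs(doc)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%d view(s); first is %s over %s\n", len(cfgs), cfgs[0].ViewID, cfgs[0].SourceTable)
}
```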

> **Note:** The `--cmv-config` approach registers CMVs directly without going through SQL
> parsing. It is intended for local testing only. In production (and for full emulator
> fidelity), use `CreateMaterializedView` via the gRPC client.

> **Persistence:** CMV registrations loaded via either approach are in-memory only. If the
> emulator restarts, CMVs must be re-registered (or the `--cmv-config` flag re-supplied).

### 2. Write-time Sync

When data is written to a source table (via MutateRow, MutateRows, CheckAndMutateRow,
or ReadModifyWriteRow), the emulator automatically:

1. Detects if the target table has any registered CMVs
2. Creates the CMV shadow table (if it doesn't exist yet) with matching column families
3. Transforms the source row key using the view's key mapping (derived from the SQL's `ORDER BY`, or taken from `key_mapping` in the JSON config)
4. Writes the re-keyed row to the shadow table
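
The four steps above can be sketched with a toy in-memory store. Everything here (`store`, `view`, `write`, and the single string value per row) is hypothetical and chosen for brevity; the emulator's real rows carry column families and cells, and step 2's copying of column families is not modeled.

```go
package main

import (
	"fmt"
	"strings"
)

// view holds the parts of a CMV definition this sketch needs.
type view struct {
	id      string // shadow table ID
	sep     string // key separator
	mapping []int  // source key component indices, in CMV key order
}

// rekey builds the CMV key; out-of-range indices become empty components.
func (v view) rekey(key string) string {
	parts := strings.Split(key, v.sep)
	out := make([]string, 0, len(v.mapping))
	for _, i := range v.mapping {
		if i >= 0 && i < len(parts) {
			out = append(out, parts[i])
		} else {
			out = append(out, "")
		}
	}
	return strings.Join(out, v.sep)
}

// store is a toy stand-in for the emulator: table ID -> row key -> value.
type store struct {
	tables map[string]map[string]string
	views  map[string][]view // source table ID -> registered views
}

func newStore() *store {
	return &store{tables: map[string]map[string]string{}, views: map[string][]view{}}
}

// table returns the named table, creating it on first use.
func (s *store) table(id string) map[string]string {
	if s.tables[id] == nil {
		s.tables[id] = map[string]string{}
	}
	return s.tables[id]
}

// write stores the row in the source table, then mirrors it into each view.
func (s *store) write(tableID, key, value string) {
	s.table(tableID)[key] = value
	for _, v := range s.views[tableID] { // step 1: views registered on this table
		shadow := s.table(v.id)      // step 2: create the shadow table if missing
		shadow[v.rekey(key)] = value // steps 3 and 4: re-key, then write
	}
}

func main() {
	s := newStore()
	s.views["events"] = []view{{id: "events_by_account", sep: "#", mapping: []int{1, 0}}}
	s.write("events", "item-abc#account-42", "payload")
	fmt.Println(s.tables["events_by_account"])
}
```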

### 3. Delete Propagation

When source rows are deleted (DeleteFromRow mutation, DropRowRange), the emulator
derives the CMV key and deletes the corresponding CMV row.
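
Because the CMV key is a pure function of the source key, a delete can be propagated without reading the source row first. A minimal sketch under that assumption, using plain maps as tables; `swapKey` and `deleteWithPropagation` are hypothetical names, and range deletes such as DropRowRange are not modeled:

```go
package main

import (
	"fmt"
	"strings"
)

// swapKey is a hypothetical two-component re-keying: "a#b" becomes "b#a".
func swapKey(key string) string {
	a, b, found := strings.Cut(key, "#")
	if !found {
		return key
	}
	return b + "#" + a
}

// deleteWithPropagation removes key from the source table and the derived
// key from every shadow table.
func deleteWithPropagation(source map[string]string, shadows []map[string]string, key string, derive func(string) string) {
	delete(source, key)
	cmvKey := derive(key)
	for _, shadow := range shadows {
		delete(shadow, cmvKey)
	}
}

func main() {
	source := map[string]string{"item-abc#account-42": "payload"}
	shadow := map[string]string{"account-42#item-abc": "payload"}
	deleteWithPropagation(source, []map[string]string{shadow}, "item-abc#account-42", swapKey)
	fmt.Println(len(source), len(shadow))
}
```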

### 4. Reading from the CMV

Since the CMV shadow table is a regular table, reads use the standard approach:

```go
table := client.Open("events_by_account")
row, err := table.ReadRow(ctx, "region-a#account-42#...")
```
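
The value of the re-keyed view shows up in range reads: Bigtable stores rows sorted by row key, so once `region` and `account_id` lead the key, all rows for one account form a contiguous prefix range. The sketch below illustrates that idea on a sorted slice of keys rather than a live table; `prefixScan` and the sample keys are made up for the example.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// prefixScan returns the keys that start with prefix.
// sorted must be in ascending order, as row keys are in a table.
func prefixScan(sorted []string, prefix string) []string {
	start := sort.SearchStrings(sorted, prefix) // first key >= prefix
	end := start
	for end < len(sorted) && strings.HasPrefix(sorted[end], prefix) {
		end++
	}
	return sorted[start:end]
}

func main() {
	keys := []string{
		"region-b#account-42#120#click#item-4",
		"region-a#account-42#200#view#item-2",
		"region-a#account-7#150#click#item-3",
		"region-a#account-42#100#click#item-1",
	}
	sort.Strings(keys)
	for _, k := range prefixScan(keys, "region-a#account-42#") {
		fmt.Println(k)
	}
}
```

Against the source table, whose keys start with `item_id`, the same question would require scanning every row.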

## What's Changed

### New Files
- `bttest/cmv.go` — CMV config types, registry, key transformation logic
- `bttest/sql_parse.go` — SQL parser for extracting CMV config from a `CreateMaterializedView` query
- `bttest/cmv_test.go` — Tests for key transformation, write sync, delete propagation
- `bttest/sql_parse_test.go` — Tests for the SQL parser

### Modified Files
- `little_bigtable.go` — Version bump to 0.2.0; added `--cmv-config` flag
- `bttest/inmem.go` — Added `cmvs` field to server struct, CMV registration,
shadow table creation, write-time sync hooks in MutateRow/MutateRows/
CheckAndMutateRow/ReadModifyWriteRow/DropRowRange
- `bttest/instance_server.go` — Implemented CreateMaterializedView, GetMaterializedView,
ListMaterializedViews, UpdateMaterializedView (DeletionProtection only), DeleteMaterializedView

## Known Limitations

- **SQL parser**: CMV SQL is parsed with regex scoped to the standard Bigtable CMV format.
Unusual SQL formatting may fail to parse.
- **GC policy propagation**: The CMV shadow table copies column families from the source
at creation time. If the source table's GC policies change later, the CMV won't update.
- **ModifyColumnFamilies sync**: Column family changes on the source table after CMV
creation are not reflected in the CMV table.
- **Backfill**: Data written to the source table before the CMV is registered is not
retroactively copied.
- **Persistence**: CMV registrations are in-memory only and do not survive a restart.
Re-register via `CreateMaterializedView` or re-supply `--cmv-config` on each startup.

## Example: Key Transformation

Source row key format (5 components):
```
item_id#timestamp#type#region#account_id
```

With `ORDER BY region, account_id, ts, typ, item_id, src_key`, a source key:
```
item-abc#9999999#type-x#region-a#account-42
```
Becomes CMV key:
```
region-a#account-42#9999999#type-x#item-abc#item-abc#9999999#type-x#region-a#account-42
```

The first 5 components are the re-ordered key; the remainder is the full original source
key, appended because `src_key` (the alias for `_key`) appears in the `ORDER BY`.
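
The transformation can be reproduced in a few lines of Go. This is a standalone sketch of the same mapping (`[3, 4, 1, 2, 0]` with the source key appended), written for illustration; the function name `rekey` is an assumption and the code is not copied from the emulator.

```go
package main

import (
	"fmt"
	"strings"
)

// rekey reorders the components of sourceKey according to mapping and,
// if appendSource is set, appends the original key as the last component.
func rekey(sourceKey, sep string, mapping []int, appendSource bool) string {
	parts := strings.Split(sourceKey, sep)
	out := make([]string, 0, len(mapping)+1)
	for _, idx := range mapping {
		if idx >= 0 && idx < len(parts) {
			out = append(out, parts[idx])
		} else {
			out = append(out, "") // a missing component becomes an empty string
		}
	}
	if appendSource {
		out = append(out, sourceKey)
	}
	return strings.Join(out, sep)
}

func main() {
	src := "item-abc#9999999#type-x#region-a#account-42"
	fmt.Println(rekey(src, "#", []int{3, 4, 1, 2, 0}, true))
	// prints: region-a#account-42#9999999#type-x#item-abc#item-abc#9999999#type-x#region-a#account-42
}
```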
192 changes: 192 additions & 0 deletions bttest/cmv.go
@@ -0,0 +1,192 @@
package bttest

import (
"encoding/json"
"fmt"
"log"
"os"
"strings"
"sync"
)

// CMVConfig defines a Continuous Materialized View for the emulator.
// CMVs are created via the CreateMaterializedView gRPC method, or loaded from a
// JSON file via the --cmv-config flag.
type CMVConfig struct {
// SourceTable is the Bigtable table ID that feeds this CMV.
SourceTable string `json:"source_table"`
// ViewID is the materialized view ID (used as the table name for reads).
ViewID string `json:"view_id"`
// KeySeparator is the delimiter used in the source table's composite row key.
KeySeparator string `json:"key_separator"`
// KeyMapping defines how source key components map to CMV key components.
// Each entry is the 0-based index into the SPLIT result of the source key.
// The CMV row key is built by joining the mapped components with KeySeparator.
// Example: [3,4,1,2,0] means CMV key = source[3]#source[4]#source[1]#source[2]#source[0]
KeyMapping []int `json:"key_mapping"`
// IncludeFamilies lists the column families to carry from source to CMV.
// An empty list means all families are included.
IncludeFamilies []string `json:"include_families,omitempty"`
// AppendSourceKey appends the original source row key as the final component.
AppendSourceKey bool `json:"append_source_key,omitempty"`
}

// cmvRegistry maps plain source table IDs to CMV definitions.
// Lookups match by table ID suffix against fully-qualified table names.
// Its own mu protects concurrent reads/writes to configs independently of s.mu.
type cmvRegistry struct {
mu sync.RWMutex
configs map[string][]CMVConfig
}

func newCMVRegistry() *cmvRegistry {
return &cmvRegistry{
configs: make(map[string][]CMVConfig),
}
}

func (r *cmvRegistry) register(cfg CMVConfig) {
r.mu.Lock()
defer r.mu.Unlock()
r.configs[cfg.SourceTable] = append(r.configs[cfg.SourceTable], cfg)
}

func (r *cmvRegistry) deregister(viewID string) {
r.mu.Lock()
defer r.mu.Unlock()
for src, cfgs := range r.configs {
filtered := cfgs[:0]
for _, c := range cfgs {
if c.ViewID != viewID {
filtered = append(filtered, c)
}
}
if len(filtered) == 0 {
delete(r.configs, src)
} else {
r.configs[src] = filtered
}
}
}

// deregisterBySource removes all CMV configs for a given source table and
// returns the view IDs that were registered against it.
func (r *cmvRegistry) deregisterBySource(sourceTable string) []string {
r.mu.Lock()
defer r.mu.Unlock()
cfgs := r.configs[sourceTable]
if len(cfgs) == 0 {
return nil
}
viewIDs := make([]string, len(cfgs))
for i, c := range cfgs {
viewIDs[i] = c.ViewID
}
delete(r.configs, sourceTable)
return viewIDs
}

func (r *cmvRegistry) cmvsForTable(fqTable string) []*cmvInstance {
parent, tableID := splitFQTable(fqTable)
r.mu.RLock()
cfgs, ok := r.configs[tableID]
if !ok {
r.mu.RUnlock()
return nil
}
result := make([]*cmvInstance, len(cfgs))
for i := range cfgs {
result[i] = &cmvInstance{config: cfgs[i], parent: parent}
}
r.mu.RUnlock()
return result
}

// splitFQTable splits "projects/p/instances/i/tables/t" into parent and tableID.
func splitFQTable(fqTable string) (parent, tableID string) {
idx := strings.LastIndex(fqTable, "/tables/")
if idx < 0 {
return "", fqTable
}
return fqTable[:idx], fqTable[idx+len("/tables/"):]
}

type cmvInstance struct {
config CMVConfig
parent string // e.g., projects/p/instances/i
}

func (c *cmvInstance) transformKey(sourceKey string) string {
parts := strings.Split(sourceKey, c.config.KeySeparator)
var newParts []string
for _, idx := range c.config.KeyMapping {
if idx >= 0 && idx < len(parts) { // guard against negative indices from a hand-written config
newParts = append(newParts, parts[idx])
} else {
log.Printf("CMV %q: key_mapping index %d out of bounds for source key %q (%d parts) — check your config",
c.config.ViewID, idx, sourceKey, len(parts))
newParts = append(newParts, "")
}
}
if c.config.AppendSourceKey {
newParts = append(newParts, sourceKey)
}
return strings.Join(newParts, c.config.KeySeparator)
}

// shouldIncludeFamily returns true for all families when IncludeFamilies is empty.
func (c *cmvInstance) shouldIncludeFamily(famName string) bool {
if len(c.config.IncludeFamilies) == 0 {
return true
}
for _, f := range c.config.IncludeFamilies {
if f == famName {
return true
}
}
return false
}

// buildCMVRow builds a re-keyed CMV row by copying all included families from the source row.
func (c *cmvInstance) buildCMVRow(sourceRow *row) *row {
newKey := c.transformKey(sourceRow.key)
cmvRow := newRow(newKey)
for famName, fam := range sourceRow.families {
if !c.shouldIncludeFamily(famName) {
continue
}
newFam := &family{
Name: famName,
Order: fam.Order,
Cells: make(map[string][]cell),
}
newFam.ColNames = make([]string, len(fam.ColNames))
copy(newFam.ColNames, fam.ColNames)
for col, cells := range fam.Cells {
newCells := make([]cell, len(cells))
copy(newCells, cells)
newFam.Cells[col] = newCells
}
cmvRow.families[famName] = newFam
}
return cmvRow
}

// deriveCMVKey returns the CMV row key for a given source key.
func (c *cmvInstance) deriveCMVKey(sourceKey string) string {
return c.transformKey(sourceKey)
}

// LoadCMVConfigs reads a JSON array of CMVConfig from the given file path.
// This is used with the --cmv-config flag for local testing convenience.
func LoadCMVConfigs(path string) ([]CMVConfig, error) {
f, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("opening cmv config: %w", err)
}
defer f.Close()
var configs []CMVConfig
if err := json.NewDecoder(f).Decode(&configs); err != nil {
return nil, fmt.Errorf("parsing cmv config: %w", err)
}
return configs, nil
}