diff --git a/CMV_SUPPORT.md b/CMV_SUPPORT.md new file mode 100644 index 0000000..ed7dde1 --- /dev/null +++ b/CMV_SUPPORT.md @@ -0,0 +1,148 @@ +# CMV (Continuous Materialized View) Support for little_bigtable + +## Overview + +Bigtable CMVs allow you to re-key table data for efficient queries on alternate key orderings. +Production Bigtable handles CMV maintenance automatically. This emulator replicates that +behavior via the standard `CreateMaterializedView` gRPC method. + +## How It Works + +### 1. Creating a CMV + +There are two ways to create a CMV in the emulator: + +#### Option A: gRPC (recommended, matches production) + +Use the standard Go admin client, pointed at the emulator: + +```go +iac, err := bigtable.NewInstanceAdminClient(ctx, project) +err = iac.CreateMaterializedView(ctx, instanceID, &bigtable.MaterializedViewInfo{ + MaterializedViewID: "events_by_account", + Query: `SELECT + SPLIT(_key, '#')[SAFE_OFFSET(3)] AS region, + SPLIT(_key, '#')[SAFE_OFFSET(4)] AS account_id, + SPLIT(_key, '#')[SAFE_OFFSET(1)] AS ts, + SPLIT(_key, '#')[SAFE_OFFSET(2)] AS typ, + SPLIT(_key, '#')[SAFE_OFFSET(0)] AS item_id, + _key AS src_key, + cf1 AS cf1 +FROM ` + "`events`" + ` +ORDER BY region, account_id, ts, typ, item_id, src_key`, +}) +``` + +The emulator parses the SQL to extract the key transformation config. The same code works +against both production Bigtable and the emulator. 
+ +#### Option B: `--cmv-config` flag (local testing convenience) + +For local testing without writing client code, pass a JSON config file at startup: + +```bash +./little_bigtable --port 9000 --db-file /tmp/lbt.db --cmv-config /path/to/cmv_config.json +``` + +The JSON file is an array of CMV config objects: + +```json +[ + { + "source_table": "events", + "view_id": "events_by_account", + "key_separator": "#", + "key_mapping": [3, 4, 1, 2, 0], + "append_source_key": true, + "include_families": ["cf1"] + } +] +``` + +| Field | Description | +|---|---| +| `source_table` | Table ID of the source table (not the fully-qualified name) | +| `view_id` | Table ID to use for the CMV shadow table | +| `key_separator` | Delimiter used in the composite row key | +| `key_mapping` | Ordered list of 0-based source key component indices for the CMV key | +| `append_source_key` | If true, appends the full original source key as the final component | +| `include_families` | Column families to copy; omit or leave empty to include all | + +> **Note:** The `--cmv-config` approach registers CMVs directly without going through SQL +> parsing. It is intended for local testing only. In production (and for full emulator +> fidelity), use `CreateMaterializedView` via the gRPC client. + +> **Persistence:** CMV registrations loaded via either approach are in-memory only. If the +> emulator restarts, CMVs must be re-registered (or the `--cmv-config` flag re-supplied). + +### 2. Write-time Sync + +When data is written to a source table (via MutateRow, MutateRows, CheckAndMutateRow, +or ReadModifyWriteRow), the emulator automatically: + +1. Detects if the target table has any registered CMVs +2. Creates the CMV shadow table (if it doesn't exist yet) with matching column families +3. Transforms the source row key per the CMV's key mapping (derived from the SQL's `ORDER BY` +   for Option A, or taken from `key_mapping` / `append_source_key` for Option B) +4. Writes the re-keyed row to the shadow table + +### 3. 
Delete Propagation + +When source rows are deleted (DeleteFromRow mutation, DropRowRange), the emulator +derives the CMV key and deletes the corresponding CMV row. + +### 4. Reading from the CMV + +Since the CMV shadow table is a regular table, reads use the standard approach: + +```go +table := client.Open("events_by_account") +row, err := table.ReadRow(ctx, "region-a#account-42#...") +``` + +## What's Changed + +### New Files +- `bttest/cmv.go` — CMV config types, registry, key transformation logic +- `bttest/sql_parse.go` — SQL parser for extracting CMV config from a `CreateMaterializedView` query +- `bttest/cmv_test.go` — Tests for key transformation, write sync, delete propagation +- `bttest/sql_parse_test.go` — Tests for the SQL parser + +### Modified Files +- `little_bigtable.go` — Version bump to 0.2.0; added `--cmv-config` flag +- `bttest/inmem.go` — Added `cmvs` field to server struct, CMV registration, + shadow table creation, write-time sync hooks in MutateRow/MutateRows/ + CheckAndMutateRow/ReadModifyWriteRow/DropRowRange +- `bttest/instance_server.go` — Implemented CreateMaterializedView, GetMaterializedView, + ListMaterializedViews, UpdateMaterializedView (DeletionProtection only), DeleteMaterializedView + +## Known Limitations + +- **SQL parser**: CMV SQL is parsed with regex scoped to the standard Bigtable CMV format. + Unusual SQL formatting may fail to parse. +- **GC policy propagation**: The CMV shadow table copies column families from the source + at creation time. If the source table's GC policies change later, the CMV won't update. +- **ModifyColumnFamilies sync**: Column family changes on the source table after CMV + creation are not reflected in the CMV table. +- **Backfill**: Data written to the source table before the CMV is registered is not + retroactively copied. +- **Persistence**: CMV registrations are in-memory only and do not survive a restart. + Re-register via `CreateMaterializedView` or re-supply `--cmv-config` on each startup. 
+ +## Example: Key Transformation + +Source row key format (5 components): +``` +item_id#timestamp#type#region#account_id +``` + +With `ORDER BY region, account_id, timestamp, type, item_id, src_key`, a source key: +``` +item-abc#9999999#type-x#region-a#account-42 +``` +Becomes CMV key: +``` +region-a#account-42#9999999#type-x#item-abc#item-abc#9999999#type-x#region-a#account-42 +``` + +The first 5 components are the re-ordered key; the remainder is the full original source +key appended because `_key AS src_key` appears in the `ORDER BY`. diff --git a/bttest/cmv.go b/bttest/cmv.go new file mode 100644 index 0000000..e86aa32 --- /dev/null +++ b/bttest/cmv.go @@ -0,0 +1,192 @@ +package bttest + +import ( + "encoding/json" + "fmt" + "log" + "os" + "strings" + "sync" +) + +// CMVConfig defines a Continuous Materialized View for the emulator. +// CMVs are created via the CreateMaterializedView gRPC method. +type CMVConfig struct { + // SourceTable is the Bigtable table ID that feeds this CMV. + SourceTable string `json:"source_table"` + // ViewID is the materialized view ID (used as the table name for reads). + ViewID string `json:"view_id"` + // KeySeparator is the delimiter used in the source table's composite row key. + KeySeparator string `json:"key_separator"` + // KeyMapping defines how source key components map to CMV key components. + // Each entry is the 0-based index into the SPLIT result of the source key. + // The CMV row key is built by joining the mapped components with KeySeparator. + // Example: [3,4,1,2,0] means CMV key = source[3]#source[4]#source[1]#source[2]#source[0] + KeyMapping []int `json:"key_mapping"` + // IncludeFamilies lists the column families to carry from source to CMV. + // An empty list means all families are included. + IncludeFamilies []string `json:"include_families,omitempty"` + // AppendSourceKey appends the original source row key as the final component. 
+ AppendSourceKey bool `json:"append_source_key,omitempty"` +} + +// cmvRegistry maps plain source table IDs to CMV definitions. +// Lookups match by table ID suffix against fully-qualified table names. +// Its own mu protects concurrent reads/writes to configs independently of s.mu. +type cmvRegistry struct { + mu sync.RWMutex + configs map[string][]CMVConfig +} + +func newCMVRegistry() *cmvRegistry { + return &cmvRegistry{ + configs: make(map[string][]CMVConfig), + } +} + +func (r *cmvRegistry) register(cfg CMVConfig) { + r.mu.Lock() + defer r.mu.Unlock() + r.configs[cfg.SourceTable] = append(r.configs[cfg.SourceTable], cfg) +} + +func (r *cmvRegistry) deregister(viewID string) { + r.mu.Lock() + defer r.mu.Unlock() + for src, cfgs := range r.configs { + filtered := cfgs[:0] + for _, c := range cfgs { + if c.ViewID != viewID { + filtered = append(filtered, c) + } + } + if len(filtered) == 0 { + delete(r.configs, src) + } else { + r.configs[src] = filtered + } + } +} + +// deregisterBySource removes all CMV configs for a given source table and +// returns the view IDs that were registered against it. +func (r *cmvRegistry) deregisterBySource(sourceTable string) []string { + r.mu.Lock() + defer r.mu.Unlock() + cfgs := r.configs[sourceTable] + if len(cfgs) == 0 { + return nil + } + viewIDs := make([]string, len(cfgs)) + for i, c := range cfgs { + viewIDs[i] = c.ViewID + } + delete(r.configs, sourceTable) + return viewIDs +} + +func (r *cmvRegistry) cmvsForTable(fqTable string) []*cmvInstance { + parent, tableID := splitFQTable(fqTable) + r.mu.RLock() + cfgs, ok := r.configs[tableID] + if !ok { + r.mu.RUnlock() + return nil + } + result := make([]*cmvInstance, len(cfgs)) + for i := range cfgs { + result[i] = &cmvInstance{config: cfgs[i], parent: parent} + } + r.mu.RUnlock() + return result +} + +// splitFQTable splits "projects/p/instances/i/tables/t" into parent and tableID. 
+func splitFQTable(fqTable string) (parent, tableID string) { + idx := strings.LastIndex(fqTable, "/tables/") + if idx < 0 { + return "", fqTable + } + return fqTable[:idx], fqTable[idx+len("/tables/"):] +} + +type cmvInstance struct { + config CMVConfig + parent string // e.g., projects/p/instances/i +} + +func (c *cmvInstance) transformKey(sourceKey string) string { + parts := strings.Split(sourceKey, c.config.KeySeparator) + var newParts []string + for _, idx := range c.config.KeyMapping { + if idx < len(parts) { + newParts = append(newParts, parts[idx]) + } else { + log.Printf("CMV %q: key_mapping index %d out of bounds for source key %q (%d parts) — check your config", + c.config.ViewID, idx, sourceKey, len(parts)) + newParts = append(newParts, "") + } + } + if c.config.AppendSourceKey { + newParts = append(newParts, sourceKey) + } + return strings.Join(newParts, c.config.KeySeparator) +} + +// shouldIncludeFamily returns true for all families when IncludeFamilies is empty. +func (c *cmvInstance) shouldIncludeFamily(famName string) bool { + if len(c.config.IncludeFamilies) == 0 { + return true + } + for _, f := range c.config.IncludeFamilies { + if f == famName { + return true + } + } + return false +} + +// buildCMVRow builds a re-keyed CMV row by copying all included families from the source row. 
+func (c *cmvInstance) buildCMVRow(sourceRow *row) *row { + newKey := c.transformKey(sourceRow.key) + cmvRow := newRow(newKey) + for famName, fam := range sourceRow.families { + if !c.shouldIncludeFamily(famName) { + continue + } + newFam := &family{ + Name: famName, + Order: fam.Order, + Cells: make(map[string][]cell), + } + newFam.ColNames = make([]string, len(fam.ColNames)) + copy(newFam.ColNames, fam.ColNames) + for col, cells := range fam.Cells { + newCells := make([]cell, len(cells)) + copy(newCells, cells) + newFam.Cells[col] = newCells + } + cmvRow.families[famName] = newFam + } + return cmvRow +} + +// deriveCMVKey returns the CMV row key for a given source key. +func (c *cmvInstance) deriveCMVKey(sourceKey string) string { + return c.transformKey(sourceKey) +} + +// LoadCMVConfigs reads a JSON array of CMVConfig from the given file path. +// This is used with the --cmv-config flag for local testing convenience. +func LoadCMVConfigs(path string) ([]CMVConfig, error) { + f, err := os.Open(path) + if err != nil { + return nil, fmt.Errorf("opening cmv config: %w", err) + } + defer f.Close() + var configs []CMVConfig + if err := json.NewDecoder(f).Decode(&configs); err != nil { + return nil, fmt.Errorf("parsing cmv config: %w", err) + } + return configs, nil +} diff --git a/bttest/cmv_test.go b/bttest/cmv_test.go new file mode 100644 index 0000000..0fc4414 --- /dev/null +++ b/bttest/cmv_test.go @@ -0,0 +1,500 @@ +package bttest + +import ( + "context" + "database/sql" + "fmt" + "strings" + "testing" + + btapb "cloud.google.com/go/bigtable/admin/apiv2/adminpb" + btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/fieldmaskpb" +) + +func newTestServerWithCMV(t *testing.T, configs []CMVConfig) *server { + dbFilename := newDBFile(t) + db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?cache=shared", dbFilename)) + require.NoError(t, err) + 
t.Cleanup(func() { db.Close() }) + db.SetMaxOpenConns(1) + CreateTables(context.Background(), db) + + s := &server{ + tables: make(map[string]*table), + materializedViews: make(map[string]*btapb.MaterializedView), + db: db, + tableBackend: NewSqlTables(db), + cmvs: newCMVRegistry(), + } + for _, cfg := range configs { + s.cmvs.register(cfg) + } + return s +} + +func TestCMVTransformKey(t *testing.T) { + // key_mapping [2,3,1,0] + append_source_key: + // CMV key = parts[2]#parts[3]#parts[1]#parts[0]# + inst := &cmvInstance{ + config: CMVConfig{ + SourceTable: "sensor_readings", + ViewID: "readings_by_region", + KeySeparator: "#", + KeyMapping: []int{2, 3, 1, 0}, + AppendSourceKey: true, + }, + } + + sourceKey := "device-1#ts-100#us-east#user-42" + got := inst.transformKey(sourceKey) + // parts: [0]device-1 [1]ts-100 [2]us-east [3]user-42 + // mapped: parts[2]#parts[3]#parts[1]#parts[0] + full source key + want := "us-east#user-42#ts-100#device-1#device-1#ts-100#us-east#user-42" + assert.Equal(t, want, got) +} + +func TestCMVTransformKeyNoAppend(t *testing.T) { + inst := &cmvInstance{ + config: CMVConfig{ + SourceTable: "test_table", + ViewID: "test_cmv", + KeySeparator: "#", + KeyMapping: []int{2, 0, 1}, + AppendSourceKey: false, + }, + } + + sourceKey := "a#b#c" + got := inst.transformKey(sourceKey) + assert.Equal(t, "c#a#b", got) +} + +func TestCMVWriteSync(t *testing.T) { + ctx := context.Background() + configs := []CMVConfig{{ + SourceTable: "src_table", + ViewID: "src_table_by_other", + KeySeparator: "#", + KeyMapping: []int{1, 0}, + IncludeFamilies: []string{"cf1"}, + AppendSourceKey: false, + }} + s := newTestServerWithCMV(t, configs) + + parent := "projects/test/instances/test" + + // Create source table with column family cf1. 
+ _, err := s.CreateTable(ctx, &btapb.CreateTableRequest{ + Parent: parent, + TableId: "src_table", + Table: &btapb.Table{ + ColumnFamilies: map[string]*btapb.ColumnFamily{ + "cf1": {}, + }, + }, + }) + require.NoError(t, err) + + fqSrc := parent + "/tables/src_table" + fqCMV := parent + "/tables/src_table_by_other" + + // Write a row to the source table. + _, err = s.MutateRow(ctx, &btpb.MutateRowRequest{ + TableName: fqSrc, + RowKey: []byte("alpha#beta"), + Mutations: []*btpb.Mutation{{ + Mutation: &btpb.Mutation_SetCell_{ + SetCell: &btpb.Mutation_SetCell{ + FamilyName: "cf1", + ColumnQualifier: []byte("col"), + TimestampMicros: 1000, + Value: []byte("hello"), + }, + }, + }}, + }) + require.NoError(t, err) + + // Verify CMV shadow table was created and contains the re-keyed row. + s.mu.Lock() + cmvTbl, ok := s.tables[fqCMV] + s.mu.Unlock() + require.True(t, ok, "CMV shadow table should be created") + + // The CMV key should be beta#alpha (mapping [1, 0]). + cmvRow := cmvTbl.rows.Get(btreeKey("beta#alpha")) + require.NotNil(t, cmvRow) + r := cmvRow.(*row) + assert.Equal(t, "beta#alpha", r.key) + assert.Contains(t, r.families, "cf1") + assert.Contains(t, r.families["cf1"].Cells, "col") + assert.Equal(t, []byte("hello"), r.families["cf1"].Cells["col"][0].Value) +} + +func TestCMVDeleteSync(t *testing.T) { + ctx := context.Background() + configs := []CMVConfig{{ + SourceTable: "src_table", + ViewID: "src_cmv", + KeySeparator: "#", + KeyMapping: []int{1, 0}, + }} + s := newTestServerWithCMV(t, configs) + + parent := "projects/test/instances/test" + fqSrc := parent + "/tables/src_table" + fqCMV := parent + "/tables/src_cmv" + + _, err := s.CreateTable(ctx, &btapb.CreateTableRequest{ + Parent: parent, + TableId: "src_table", + Table: &btapb.Table{ + ColumnFamilies: map[string]*btapb.ColumnFamily{ + "cf1": {}, + }, + }, + }) + require.NoError(t, err) + + // Write a row. 
+ _, err = s.MutateRow(ctx, &btpb.MutateRowRequest{ + TableName: fqSrc, + RowKey: []byte("x#y"), + Mutations: []*btpb.Mutation{{ + Mutation: &btpb.Mutation_SetCell_{ + SetCell: &btpb.Mutation_SetCell{ + FamilyName: "cf1", + ColumnQualifier: []byte("c"), + TimestampMicros: 1000, + Value: []byte("v"), + }, + }, + }}, + }) + require.NoError(t, err) + + // Confirm CMV has the row. + s.mu.Lock() + cmvTbl := s.tables[fqCMV] + s.mu.Unlock() + require.NotNil(t, cmvTbl) + assert.Equal(t, 1, cmvTbl.rows.Len()) + + // Delete the row from source. + _, err = s.MutateRow(ctx, &btpb.MutateRowRequest{ + TableName: fqSrc, + RowKey: []byte("x#y"), + Mutations: []*btpb.Mutation{{ + Mutation: &btpb.Mutation_DeleteFromRow_{ + DeleteFromRow: &btpb.Mutation_DeleteFromRow{}, + }, + }}, + }) + require.NoError(t, err) + + // CMV row should be gone. + assert.Equal(t, 0, cmvTbl.rows.Len()) +} + +func TestCMVDropRowRangeAll(t *testing.T) { + ctx := context.Background() + configs := []CMVConfig{{ + SourceTable: "src_table", + ViewID: "src_cmv", + KeySeparator: "#", + KeyMapping: []int{1, 0}, + }} + s := newTestServerWithCMV(t, configs) + + parent := "projects/test/instances/test" + fqSrc := parent + "/tables/src_table" + fqCMV := parent + "/tables/src_cmv" + + _, err := s.CreateTable(ctx, &btapb.CreateTableRequest{ + Parent: parent, + TableId: "src_table", + Table: &btapb.Table{ + ColumnFamilies: map[string]*btapb.ColumnFamily{ + "cf1": {}, + }, + }, + }) + require.NoError(t, err) + + // Write multiple rows. 
+ for _, key := range []string{"a#b", "c#d", "e#f"} { + _, err = s.MutateRow(ctx, &btpb.MutateRowRequest{ + TableName: fqSrc, + RowKey: []byte(key), + Mutations: []*btpb.Mutation{{ + Mutation: &btpb.Mutation_SetCell_{ + SetCell: &btpb.Mutation_SetCell{ + FamilyName: "cf1", + ColumnQualifier: []byte("c"), + TimestampMicros: 1000, + Value: []byte("v"), + }, + }, + }}, + }) + require.NoError(t, err) + } + + s.mu.Lock() + cmvTbl := s.tables[fqCMV] + s.mu.Unlock() + require.NotNil(t, cmvTbl) + assert.Equal(t, 3, cmvTbl.rows.Len()) + + // Drop all rows from source. + _, err = s.DropRowRange(ctx, &btapb.DropRowRangeRequest{ + Name: fqSrc, + Target: &btapb.DropRowRangeRequest_DeleteAllDataFromTable{DeleteAllDataFromTable: true}, + }) + require.NoError(t, err) + + assert.Equal(t, 0, cmvTbl.rows.Len()) +} + +func TestCMVTransformKeyOutOfBounds(t *testing.T) { + inst := &cmvInstance{ + config: CMVConfig{ + SourceTable: "my_table", + ViewID: "my_view", + KeySeparator: "#", + KeyMapping: []int{0, 99}, // index 99 is out of bounds + }, + } + // Should not panic; out-of-bounds index produces an empty component. 
+ got := inst.transformKey("only#two#parts") + assert.Equal(t, "only#", got) +} + +func TestCreateMaterializedViewRPC(t *testing.T) { + ctx := context.Background() + dbFilename := newDBFile(t) + db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?cache=shared", dbFilename)) + require.NoError(t, err) + t.Cleanup(func() { db.Close() }) + db.SetMaxOpenConns(1) + CreateTables(ctx, db) + + s := &server{ + tables: make(map[string]*table), + materializedViews: make(map[string]*btapb.MaterializedView), + db: db, + tableBackend: NewSqlTables(db), + cmvs: newCMVRegistry(), + } + + parent := "projects/test/instances/test" + mvSQL := "SELECT\n" + + " SPLIT(_key, '#')[SAFE_OFFSET(2)] AS region,\n" + + " SPLIT(_key, '#')[SAFE_OFFSET(3)] AS user_id,\n" + + " SPLIT(_key, '#')[SAFE_OFFSET(1)] AS ts,\n" + + " SPLIT(_key, '#')[SAFE_OFFSET(0)] AS device_id,\n" + + " _key AS src_key,\n" + + " data AS data\n" + + "FROM `sensor_readings`\n" + + "ORDER BY region, user_id, ts, device_id, src_key" + + op, err := s.CreateMaterializedView(ctx, &btapb.CreateMaterializedViewRequest{ + Parent: parent, + MaterializedViewId: "readings_by_region", + MaterializedView: &btapb.MaterializedView{Query: mvSQL}, + }) + require.NoError(t, err) + assert.True(t, op.Done) + + mv, err := s.GetMaterializedView(ctx, &btapb.GetMaterializedViewRequest{ + Name: parent + "/materializedViews/readings_by_region", + }) + require.NoError(t, err) + assert.Equal(t, "readings_by_region", mv.Name[strings.LastIndex(mv.Name, "/")+1:]) + assert.Equal(t, mvSQL, mv.Query) + + list, err := s.ListMaterializedViews(ctx, &btapb.ListMaterializedViewsRequest{Parent: parent}) + require.NoError(t, err) + assert.Len(t, list.MaterializedViews, 1) + + // CMV should fire on writes to the source table. 
+ _, err = s.CreateTable(ctx, &btapb.CreateTableRequest{ + Parent: parent, + TableId: "sensor_readings", + Table: &btapb.Table{ColumnFamilies: map[string]*btapb.ColumnFamily{"data": {}}}, + }) + require.NoError(t, err) + + _, err = s.MutateRow(ctx, &btpb.MutateRowRequest{ + TableName: parent + "/tables/sensor_readings", + RowKey: []byte("device-1#ts-100#us-east#user-42"), + Mutations: []*btpb.Mutation{{ + Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{ + FamilyName: "data", ColumnQualifier: []byte("temp"), Value: []byte("72"), + }}, + }}, + }) + require.NoError(t, err) + + fqCMV := parent + "/tables/readings_by_region" + cmvTbl := s.tables[fqCMV] + require.NotNil(t, cmvTbl, "CMV shadow table should have been auto-created") + + // DeleteMaterializedView should remove the view. + _, err = s.DeleteMaterializedView(ctx, &btapb.DeleteMaterializedViewRequest{ + Name: parent + "/materializedViews/readings_by_region", + }) + require.NoError(t, err) + list, err = s.ListMaterializedViews(ctx, &btapb.ListMaterializedViewsRequest{Parent: parent}) + require.NoError(t, err) + assert.Len(t, list.MaterializedViews, 0) +} + +func TestCMVDropRowRangePrefix(t *testing.T) { + ctx := context.Background() + // Key mapping [1,0]: CMV key = source[1]#source[0] (swap two components). + configs := []CMVConfig{{ + SourceTable: "src_table", + ViewID: "src_cmv", + KeySeparator: "#", + KeyMapping: []int{1, 0}, + }} + s := newTestServerWithCMV(t, configs) + + parent := "projects/test/instances/test" + fqSrc := parent + "/tables/src_table" + fqCMV := parent + "/tables/src_cmv" + + _, err := s.CreateTable(ctx, &btapb.CreateTableRequest{ + Parent: parent, + TableId: "src_table", + Table: &btapb.Table{ + ColumnFamilies: map[string]*btapb.ColumnFamily{ + "cf1": {}, + }, + }, + }) + require.NoError(t, err) + + // Write three rows: two share the prefix "alpha#" and one does not. 
+ for _, key := range []string{"alpha#one", "alpha#two", "beta#three"} { + _, err = s.MutateRow(ctx, &btpb.MutateRowRequest{ + TableName: fqSrc, + RowKey: []byte(key), + Mutations: []*btpb.Mutation{{ + Mutation: &btpb.Mutation_SetCell_{ + SetCell: &btpb.Mutation_SetCell{ + FamilyName: "cf1", + ColumnQualifier: []byte("c"), + TimestampMicros: 1000, + Value: []byte("v"), + }, + }, + }}, + }) + require.NoError(t, err) + } + + s.mu.Lock() + cmvTbl := s.tables[fqCMV] + s.mu.Unlock() + require.NotNil(t, cmvTbl) + assert.Equal(t, 3, cmvTbl.rows.Len()) + + // Drop source rows with prefix "alpha#". + _, err = s.DropRowRange(ctx, &btapb.DropRowRangeRequest{ + Name: fqSrc, + Target: &btapb.DropRowRangeRequest_RowKeyPrefix{RowKeyPrefix: []byte("alpha#")}, + }) + require.NoError(t, err) + + // CMV should now have only 1 row: "three#beta" (from "beta#three"). + assert.Equal(t, 1, cmvTbl.rows.Len()) + // The remaining CMV row should be the transformed "beta#three" → "three#beta". + cmvRow := cmvTbl.rows.Get(btreeKey("three#beta")) + assert.NotNil(t, cmvRow, "CMV row for non-deleted source should still exist") +} + +func newTestInstanceServer(t *testing.T) (*server, string) { + t.Helper() + ctx := context.Background() + db, err := sql.Open("sqlite3", ":memory:") + require.NoError(t, err) + t.Cleanup(func() { db.Close() }) + db.SetMaxOpenConns(1) + CreateTables(ctx, db) + s := &server{ + tables: make(map[string]*table), + materializedViews: make(map[string]*btapb.MaterializedView), + db: db, + tableBackend: NewSqlTables(db), + cmvs: newCMVRegistry(), + } + return s, "projects/test/instances/test" +} + +func TestDeletionProtection_BlocksDelete(t *testing.T) { + ctx := context.Background() + s, parent := newTestInstanceServer(t) + + mvSQL := "SELECT SPLIT(_key, '#')[SAFE_OFFSET(0)] AS a FROM `src` ORDER BY a" + _, err := s.CreateMaterializedView(ctx, &btapb.CreateMaterializedViewRequest{ + Parent: parent, + MaterializedViewId: "protected_view", + MaterializedView: 
&btapb.MaterializedView{Query: mvSQL, DeletionProtection: true}, + }) + require.NoError(t, err) + + // Delete should fail while DeletionProtection is enabled. + _, err = s.DeleteMaterializedView(ctx, &btapb.DeleteMaterializedViewRequest{ + Name: parent + "/materializedViews/protected_view", + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "protected against deletion") + + // Verify the view still exists. + mv, err := s.GetMaterializedView(ctx, &btapb.GetMaterializedViewRequest{ + Name: parent + "/materializedViews/protected_view", + }) + require.NoError(t, err) + assert.True(t, mv.DeletionProtection) +} + +func TestDeletionProtection_UpdateThenDelete(t *testing.T) { + ctx := context.Background() + s, parent := newTestInstanceServer(t) + + mvSQL := "SELECT SPLIT(_key, '#')[SAFE_OFFSET(0)] AS a FROM `src` ORDER BY a" + _, err := s.CreateMaterializedView(ctx, &btapb.CreateMaterializedViewRequest{ + Parent: parent, + MaterializedViewId: "protected_view", + MaterializedView: &btapb.MaterializedView{Query: mvSQL, DeletionProtection: true}, + }) + require.NoError(t, err) + + // Unprotect via UpdateMaterializedView. + _, err = s.UpdateMaterializedView(ctx, &btapb.UpdateMaterializedViewRequest{ + MaterializedView: &btapb.MaterializedView{ + Name: parent + "/materializedViews/protected_view", + DeletionProtection: false, + }, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"deletion_protection"}}, + }) + require.NoError(t, err) + + // Confirm the flag was cleared. + mv, err := s.GetMaterializedView(ctx, &btapb.GetMaterializedViewRequest{ + Name: parent + "/materializedViews/protected_view", + }) + require.NoError(t, err) + assert.False(t, mv.DeletionProtection) + + // Delete should now succeed. 
+ _, err = s.DeleteMaterializedView(ctx, &btapb.DeleteMaterializedViewRequest{ + Name: parent + "/materializedViews/protected_view", + }) + require.NoError(t, err) +} diff --git a/bttest/inmem.go b/bttest/inmem.go index e340f07..3af13a6 100644 --- a/bttest/inmem.go +++ b/bttest/inmem.go @@ -94,6 +94,9 @@ type server struct { gcc chan int // set when gcloop starts, closed when server shuts down db *sql.DB tableBackend *SqlTables + cmvs *cmvRegistry + + materializedViews map[string]*btapb.MaterializedView // keyed by full resource name // Any unimplemented methods will cause a panic. btapb.BigtableTableAdminServer @@ -115,10 +118,12 @@ func NewServer(laddr string, db *sql.DB, opt ...grpc.ServerOption) (*Server, err l: l, srv: grpc.NewServer(opt...), s: &server{ - tables: make(map[string]*table), - instances: make(map[string]*btapb.Instance), - db: db, - tableBackend: NewSqlTables(db), + tables: make(map[string]*table), + instances: make(map[string]*btapb.Instance), + materializedViews: make(map[string]*btapb.MaterializedView), + db: db, + tableBackend: NewSqlTables(db), + cmvs: newCMVRegistry(), }, } opsServer := &operationsServer{ @@ -135,6 +140,17 @@ func NewServer(laddr string, db *sql.DB, opt ...grpc.ServerOption) (*Server, err return s, nil } +// RegisterCMVs pre-loads CMV configs for local testing (used with --cmv-config). +// This is equivalent to calling CreateMaterializedView for each config but does +// not require a project/instance path and bypasses SQL parsing. +func (s *Server) RegisterCMVs(configs []CMVConfig) { + s.s.mu.Lock() + defer s.s.mu.Unlock() + for _, cfg := range configs { + s.s.cmvs.register(cfg) + } +} + // Close shuts down the server. func (s *Server) Close() { s.s.mu.Lock() @@ -154,6 +170,113 @@ func (s *server) LoadTables() { } } +// ensureCMVTable creates the shadow table for a CMV if it doesn't already exist. +// Copies column families from the source table, filtered by IncludeFamilies. +// Must be called with s.mu held. 
+func (s *server) ensureCMVTable(cmv *cmvInstance, sourceTbl *table) { + fqView := cmv.parent + "/tables/" + cmv.config.ViewID + if _, exists := s.tables[fqView]; exists { + return + } + + fams := make(map[string]*columnFamily) + var c uint64 + sourceTbl.mu.RLock() + for id, cf := range sourceTbl.families { + if cmv.shouldIncludeFamily(id) { + fams[id] = &columnFamily{ + Name: fqView + "/columnFamilies/" + id, + Order: c, + GCRule: cf.GCRule, + } + c++ + } + } + sourceTbl.mu.RUnlock() + + viewTbl := &table{ + parent: cmv.parent, + tableId: cmv.config.ViewID, + families: fams, + counter: c, + rows: NewSqlRows(s.db, cmv.parent, cmv.config.ViewID), + } + s.tables[fqView] = viewTbl + s.tableBackend.Save(viewTbl) + log.Printf("created CMV shadow table %q for source %q", cmv.config.ViewID, cmv.config.SourceTable) +} + +// syncCMVRow writes/updates the CMV shadow row for a given source row mutation. +// Must NOT be called with s.mu held (acquires its own locks). +func (s *server) syncCMVRow(fqSourceTable string, sourceRow *row) { + cmvs := s.cmvs.cmvsForTable(fqSourceTable) + if len(cmvs) == 0 { + return + } + + s.mu.Lock() + sourceTbl := s.tables[fqSourceTable] + for _, cmv := range cmvs { + s.ensureCMVTable(cmv, sourceTbl) + fqView := cmv.parent + "/tables/" + cmv.config.ViewID + viewTbl := s.tables[fqView] + if viewTbl == nil { + continue + } + cmvRow := cmv.buildCMVRow(sourceRow) + viewTbl.mu.Lock() + viewTbl.rows.ReplaceOrInsert(cmvRow) + viewTbl.mu.Unlock() + } + s.mu.Unlock() +} + +// deleteCMVRow removes the corresponding CMV row when a source row is deleted. +// Must NOT be called with s.mu held (acquires its own locks). 
+func (s *server) deleteCMVRow(fqSourceTable string, sourceKey string) { + cmvs := s.cmvs.cmvsForTable(fqSourceTable) + if len(cmvs) == 0 { + return + } + + s.mu.Lock() + for _, cmv := range cmvs { + fqView := cmv.parent + "/tables/" + cmv.config.ViewID + viewTbl := s.tables[fqView] + if viewTbl == nil { + continue + } + cmvKey := cmv.deriveCMVKey(sourceKey) + viewTbl.mu.Lock() + viewTbl.rows.Delete(btreeKey(cmvKey)) + viewTbl.mu.Unlock() + } + s.mu.Unlock() +} + +// dropCMVAllRows clears all rows from CMV shadow tables when the source table is +// fully truncated via DropRowRange(DeleteAllDataFromTable=true). +// Must NOT be called with s.mu held (acquires its own locks). +func (s *server) dropCMVAllRows(fqSourceTable string) { + cmvs := s.cmvs.cmvsForTable(fqSourceTable) + if len(cmvs) == 0 { + return + } + + s.mu.Lock() + for _, cmv := range cmvs { + fqView := cmv.parent + "/tables/" + cmv.config.ViewID + viewTbl := s.tables[fqView] + if viewTbl == nil { + continue + } + viewTbl.mu.Lock() + viewTbl.rows.DeleteAll() + viewTbl.mu.Unlock() + } + s.mu.Unlock() +} + func (s *server) CreateTable(ctx context.Context, req *btapb.CreateTableRequest) (*btapb.Table, error) { tbl := req.Parent + "/tables/" + req.TableId @@ -249,12 +372,33 @@ func (s *server) UpdateTable(ctx context.Context, req *btapb.UpdateTableRequest) func (s *server) DeleteTable(ctx context.Context, req *btapb.DeleteTableRequest) (*emptypb.Empty, error) { s.mu.Lock() defer s.mu.Unlock() - if tbl, ok := s.tables[req.Name]; !ok { + tbl, ok := s.tables[req.Name] + if !ok { return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name) - } else { - s.tableBackend.Delete(tbl) - tbl.rows.DeleteAll() - delete(s.tables, req.Name) + } + s.tableBackend.Delete(tbl) + tbl.rows.DeleteAll() + delete(s.tables, req.Name) + + // Clean up any CMVs that use this table as their source. 
+ idx := strings.LastIndex(req.Name, "/tables/") + if idx >= 0 { + parent := req.Name[:idx] + tableID := req.Name[idx+len("/tables/"):] + for _, viewID := range s.cmvs.deregisterBySource(tableID) { + fqShadow := parent + "/tables/" + viewID + if shadowTbl, exists := s.tables[fqShadow]; exists { + s.tableBackend.Delete(shadowTbl) + shadowTbl.rows.DeleteAll() + delete(s.tables, fqShadow) + } + for mvName := range s.materializedViews { + if strings.HasSuffix(mvName, "/materializedViews/"+viewID) { + delete(s.materializedViews, mvName) + break + } + } + } } return &emptypb.Empty{}, nil } @@ -323,14 +467,17 @@ func (s *server) DropRowRange(ctx context.Context, req *btapb.DropRowRangeReques return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name) } + deleteAll := req.GetDeleteAllDataFromTable() + var deletedKeys []string // populated for prefix case; nil for deleteAll + tbl.mu.Lock() - defer tbl.mu.Unlock() - if req.GetDeleteAllDataFromTable() { + if deleteAll { tbl.rows.DeleteAll() } else { // Delete rows by prefix. prefixBytes := req.GetRowKeyPrefix() if prefixBytes == nil { + tbl.mu.Unlock() return nil, fmt.Errorf("missing row key prefix") } prefix := string(prefixBytes) @@ -349,8 +496,20 @@ func (s *server) DropRowRange(ctx context.Context, req *btapb.DropRowRangeReques }) for _, r := range rowsToDelete { tbl.rows.Delete(r) + deletedKeys = append(deletedKeys, r.key) + } + } + tbl.mu.Unlock() + + // Propagate to CMV shadow tables. + if deleteAll { + s.dropCMVAllRows(req.Name) + } else { + for _, key := range deletedKeys { + s.deleteCMVRow(req.Name, key) } } + return &emptypb.Empty{}, nil } @@ -943,10 +1102,19 @@ func (s *server) MutateRow(ctx context.Context, req *btpb.MutateRowRequest) (*bt } fs := tbl.columnFamilies() + // Check if any mutation is a full row delete (for CMV propagation). 
+ hasDeleteFromRow := false + for _, mut := range req.Mutations { + if _, ok := mut.Mutation.(*btpb.Mutation_DeleteFromRow_); ok { + hasDeleteFromRow = true + break + } + } + tbl.mu.Lock() - defer tbl.mu.Unlock() r := tbl.mutableRow(string(req.RowKey)) if err := applyMutations(tbl, r, req.Mutations, fs); err != nil { + tbl.mu.Unlock() return nil, err } // JIT per-row GC @@ -959,6 +1127,16 @@ func (s *server) MutateRow(ctx context.Context, req *btpb.MutateRowRequest) (*bt } tbl.rows.ReplaceOrInsert(r) + rowCopy := r.copy() + tbl.mu.Unlock() + + // Propagate to CMV shadow tables. + if hasDeleteFromRow || rowCopy.isEmpty() { + s.deleteCMVRow(req.TableName, string(req.RowKey)) + } else { + s.syncCMVRow(req.TableName, rowCopy) + } + return &btpb.MutateRowResponse{}, nil } @@ -981,9 +1159,15 @@ func (s *server) MutateRows(req *btpb.MutateRowsRequest, stream btpb.Bigtable_Mu } res := &btpb.MutateRowsResponse{Entries: make([]*btpb.MutateRowsResponse_Entry, len(req.Entries))} + type cmvAction struct { + key string + rowCopy *row + deleted bool + } + var cmvActions []cmvAction + cfs := tbl.columnFamilies() tbl.mu.Lock() - defer tbl.mu.Unlock() for i, entry := range req.Entries { r := tbl.mutableRow(string(entry.RowKey)) code, msg := int32(codes.OK), "" @@ -1003,7 +1187,33 @@ func (s *server) MutateRows(req *btpb.MutateRowsRequest, stream btpb.Bigtable_Mu } } tbl.rows.ReplaceOrInsert(r) + + if code == int32(codes.OK) { + deleted := false + for _, mut := range entry.Mutations { + if _, ok := mut.Mutation.(*btpb.Mutation_DeleteFromRow_); ok { + deleted = true + break + } + } + cmvActions = append(cmvActions, cmvAction{ + key: string(entry.RowKey), + rowCopy: r.copy(), + deleted: deleted || r.isEmpty(), + }) + } } + tbl.mu.Unlock() + + // Propagate to CMV shadow tables after releasing the source table lock. 
+ for _, action := range cmvActions { + if action.deleted { + s.deleteCMVRow(req.TableName, action.key) + } else { + s.syncCMVRow(req.TableName, action.rowCopy) + } + } + return stream.Send(res) } @@ -1018,7 +1228,6 @@ func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutate cfs := tbl.columnFamilies() tbl.mu.Lock() - defer tbl.mu.Unlock() r := tbl.mutableRow(string(req.RowKey)) // Figure out which mutation to apply. @@ -1033,6 +1242,7 @@ func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutate match, err := filterRow(req.PredicateFilter, nr) if err != nil { + tbl.mu.Unlock() return nil, err } whichMut = match && !nr.isEmpty() @@ -1044,6 +1254,7 @@ func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutate } if err := applyMutations(tbl, r, muts, cfs); err != nil { + tbl.mu.Unlock() return nil, err } r.gc(tbl.gcRulesNoLock()) @@ -1054,6 +1265,25 @@ func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutate } } tbl.rows.ReplaceOrInsert(r) + + // Determine if this was a delete-type operation. + hasDelete := false + for _, mut := range muts { + if _, ok := mut.Mutation.(*btpb.Mutation_DeleteFromRow_); ok { + hasDelete = true + break + } + } + rowCopy := r.copy() + tbl.mu.Unlock() + + // Propagate to CMV shadow tables. + if hasDelete || rowCopy.isEmpty() { + s.deleteCMVRow(req.TableName, string(req.RowKey)) + } else { + s.syncCMVRow(req.TableName, rowCopy) + } + return res, nil } @@ -1184,7 +1414,6 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri cfs := tbl.columnFamilies() tbl.mu.Lock() - defer tbl.mu.Unlock() rowKey := string(req.RowKey) r := tbl.mutableRow(rowKey) resultRow := newRow(rowKey) // copy of updated cells @@ -1193,6 +1422,7 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri // TODO(dsymonds): Verify this assumption and document it in the proto. 
for _, rule := range req.Rules { if _, ok := cfs[rule.FamilyName]; !ok { + tbl.mu.Unlock() return nil, fmt.Errorf("unknown family %q", rule.FamilyName) } @@ -1216,6 +1446,7 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri switch rule := rule.Rule.(type) { default: + tbl.mu.Unlock() return nil, fmt.Errorf("unknown RMW rule oneof %T", rule) case *btpb.ReadModifyWriteRule_AppendValue: newCell = cell{Ts: ts, Value: append(prevCell.Value, rule.AppendValue...)} @@ -1224,6 +1455,7 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri if !isEmpty { prevVal := prevCell.Value if len(prevVal) != 8 { + tbl.mu.Unlock() return nil, fmt.Errorf("increment on non-64-bit value") } v = int64(binary.BigEndian.Uint64(prevVal)) @@ -1250,6 +1482,11 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri } } tbl.rows.ReplaceOrInsert(r) + rowCopy := r.copy() + tbl.mu.Unlock() + + // Propagate to CMV shadow tables. + s.syncCMVRow(req.TableName, rowCopy) // Build the response using the result row res := &btpb.Row{ diff --git a/bttest/inmem_test.go b/bttest/inmem_test.go index 7908f73..9bba5ef 100644 --- a/bttest/inmem_test.go +++ b/bttest/inmem_test.go @@ -78,6 +78,7 @@ func newTestServer(t *testing.T) *server { tables: make(map[string]*table), db: db, tableBackend: NewSqlTables(db), + cmvs: newCMVRegistry(), } return s } diff --git a/bttest/instance_server.go b/bttest/instance_server.go index f8892fa..3aeca79 100644 --- a/bttest/instance_server.go +++ b/bttest/instance_server.go @@ -16,7 +16,10 @@ package bttest import ( "context" + "fmt" "regexp" + "strings" + "time" btapb "cloud.google.com/go/bigtable/admin/apiv2/adminpb" "cloud.google.com/go/iam/apiv1/iampb" @@ -24,6 +27,7 @@ import ( "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/anypb" ) var _ btapb.BigtableInstanceAdminServer = (*server)(nil) @@ 
-139,3 +143,154 @@ func (s *server) TestIamPermissions(ctx context.Context, req *iampb.TestIamPermi func (s *server) ListHotTablets(ctx context.Context, req *btapb.ListHotTabletsRequest) (*btapb.ListHotTabletsResponse, error) { return nil, errUnimplemented } + +// CreateMaterializedView parses the SQL query in the request, registers a CMV +// config on the server, and stores the view metadata for later retrieval. +func (s *server) CreateMaterializedView(ctx context.Context, req *btapb.CreateMaterializedViewRequest) (*longrunning.Operation, error) { + if req.MaterializedViewId == "" { + return nil, status.Errorf(codes.InvalidArgument, "materialized_view_id is required") + } + mv := req.GetMaterializedView() + if mv == nil || mv.Query == "" { + return nil, status.Errorf(codes.InvalidArgument, "materialized_view.query is required") + } + + cfg, err := ParseCMVConfigFromSQL(req.MaterializedViewId, mv.Query) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "invalid materialized view query: %v", err) + } + + name := req.Parent + "/materializedViews/" + req.MaterializedViewId + + s.mu.Lock() + defer s.mu.Unlock() + + if _, exists := s.materializedViews[name]; exists { + return nil, status.Errorf(codes.AlreadyExists, "materialized view %q already exists", name) + } + + s.cmvs.register(*cfg) + stored := &btapb.MaterializedView{ + Name: name, + Query: mv.Query, + DeletionProtection: mv.DeletionProtection, + } + s.materializedViews[name] = stored + + respAny, err := anypb.New(stored) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to wrap result: %v", err) + } + return &longrunning.Operation{ + Name: fmt.Sprintf("operations/op-%d", time.Now().UnixNano()), + Done: true, + Result: &longrunning.Operation_Response{Response: respAny}, + }, nil +} + +func (s *server) GetMaterializedView(ctx context.Context, req *btapb.GetMaterializedViewRequest) (*btapb.MaterializedView, error) { + s.mu.Lock() + defer s.mu.Unlock() + + mv, ok := 
s.materializedViews[req.Name] + if !ok { + return nil, status.Errorf(codes.NotFound, "materialized view %q not found", req.Name) + } + return mv, nil +} + +func (s *server) ListMaterializedViews(ctx context.Context, req *btapb.ListMaterializedViewsRequest) (*btapb.ListMaterializedViewsResponse, error) { + s.mu.Lock() + defer s.mu.Unlock() + + var views []*btapb.MaterializedView + for name, mv := range s.materializedViews { + if strings.HasPrefix(name, req.Parent+"/") { + views = append(views, mv) + } + } + return &btapb.ListMaterializedViewsResponse{MaterializedViews: views}, nil +} + +// UpdateMaterializedView supports toggling DeletionProtection. Query changes +// are not supported since CMV queries are immutable after creation. +func (s *server) UpdateMaterializedView(ctx context.Context, req *btapb.UpdateMaterializedViewRequest) (*longrunning.Operation, error) { + mv := req.GetMaterializedView() + if mv == nil { + return nil, status.Errorf(codes.InvalidArgument, "materialized_view is required") + } + + s.mu.Lock() + defer s.mu.Unlock() + + stored, ok := s.materializedViews[mv.Name] + if !ok { + return nil, status.Errorf(codes.NotFound, "materialized view %q not found", mv.Name) + } + + for _, path := range req.GetUpdateMask().GetPaths() { + switch path { + case "deletion_protection": + stored.DeletionProtection = mv.DeletionProtection + default: + return nil, status.Errorf(codes.InvalidArgument, "unsupported update field: %q", path) + } + } + + respAny, err := anypb.New(stored) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to wrap result: %v", err) + } + return &longrunning.Operation{ + Name: fmt.Sprintf("operations/op-%d", time.Now().UnixNano()), + Done: true, + Result: &longrunning.Operation_Response{Response: respAny}, + }, nil +} + +func (s *server) DeleteMaterializedView(ctx context.Context, req *btapb.DeleteMaterializedViewRequest) (*empty.Empty, error) { + s.mu.Lock() + defer s.mu.Unlock() + + mv, ok := 
s.materializedViews[req.Name] + if !ok { + return nil, status.Errorf(codes.NotFound, "materialized view %q not found", req.Name) + } + if mv.DeletionProtection { + return nil, status.Errorf(codes.FailedPrecondition, "materialized view %q is protected against deletion", req.Name) + } + + // Extract parent and view ID from the full resource name. + parts := strings.Split(mv.Name, "/materializedViews/") + if len(parts) == 2 { + s.cmvs.deregister(parts[1]) + fqShadow := parts[0] + "/tables/" + parts[1] + if shadowTbl, exists := s.tables[fqShadow]; exists { + s.tableBackend.Delete(shadowTbl) + shadowTbl.rows.DeleteAll() + delete(s.tables, fqShadow) + } + } + delete(s.materializedViews, req.Name) + return new(empty.Empty), nil +} + +func (s *server) CreateLogicalView(ctx context.Context, req *btapb.CreateLogicalViewRequest) (*longrunning.Operation, error) { + return nil, errUnimplemented +} + +func (s *server) GetLogicalView(ctx context.Context, req *btapb.GetLogicalViewRequest) (*btapb.LogicalView, error) { + return nil, errUnimplemented +} + +func (s *server) ListLogicalViews(ctx context.Context, req *btapb.ListLogicalViewsRequest) (*btapb.ListLogicalViewsResponse, error) { + return nil, errUnimplemented +} + +func (s *server) UpdateLogicalView(ctx context.Context, req *btapb.UpdateLogicalViewRequest) (*longrunning.Operation, error) { + return nil, errUnimplemented +} + +func (s *server) DeleteLogicalView(ctx context.Context, req *btapb.DeleteLogicalViewRequest) (*empty.Empty, error) { + return nil, errUnimplemented +} diff --git a/bttest/sql_parse.go b/bttest/sql_parse.go new file mode 100644 index 0000000..b512b3b --- /dev/null +++ b/bttest/sql_parse.go @@ -0,0 +1,150 @@ +package bttest + +import ( + "fmt" + "regexp" + "strconv" + "strings" +) + +// Compiled patterns for CMV SQL parsing. Bigtable CMV queries follow a +// restricted GoogleSQL subset using SPLIT(_key, sep)[SAFE_OFFSET(n)] AS alias +// to extract row key components. 
+var ( + reFrom = regexp.MustCompile("(?i)FROM\\s+`([^`]+)`") + reGroupBy = regexp.MustCompile(`(?i)\bGROUP\s+BY\b`) + + // Matches: SPLIT(_key, '#')[SAFE_OFFSET(3)] AS alias + reSplitOffset = regexp.MustCompile( + `(?i)SPLIT\(_key,\s*'([^']+)'\)\[SAFE_OFFSET\((\d+)\)\]\s+AS\s+(\w+)`) + + reKeyAlias = regexp.MustCompile(`(?i)\b_key\s+AS\s+(\w+)`) + reOrderBy = regexp.MustCompile(`(?i)ORDER\s+BY\s+(.+)$`) +) + +// ParseCMVConfigFromSQL extracts a CMVConfig from a Bigtable CMV SQL query. +// +// Only ORDER BY (secondary index) queries are supported. GROUP BY +// (aggregation) queries require maintaining running aggregates and are +// not implemented by the emulator. +// +// WHERE clauses are silently ignored; the emulator propagates all source +// writes to the CMV regardless of any filter predicate. +// +// Supported SELECT column forms: +// +// SPLIT(_key, '')[SAFE_OFFSET()] AS +// _key AS → sets AppendSourceKey when alias is in ORDER BY +// AS → adds to IncludeFamilies +// +// The ORDER BY clause determines the CMV key component ordering. +func ParseCMVConfigFromSQL(viewID, query string) (*CMVConfig, error) { + if reGroupBy.MatchString(query) { + return nil, fmt.Errorf("GROUP BY (aggregation) queries are not supported; only ORDER BY (key re-mapping) CMVs are emulated") + } + + cfg := &CMVConfig{ViewID: viewID} + + sourceTable, err := parseSourceTable(query) + if err != nil { + return nil, err + } + cfg.SourceTable = sourceTable + + colMap, sep, err := parseSplitColumns(query) + if err != nil { + return nil, err + } + cfg.KeySeparator = sep + + sourceKeyAlias := parseSourceKeyAlias(query) + + keyMapping, appendSourceKey, err := parseOrderBy(query, colMap, sourceKeyAlias) + if err != nil { + return nil, err + } + cfg.KeyMapping = keyMapping + cfg.AppendSourceKey = appendSourceKey + + cfg.IncludeFamilies = parseFamilies(query, colMap) + + return cfg, nil +} + +// parseSourceTable extracts the table name from "FROM `table_name`". 
+func parseSourceTable(query string) (string, error) { + m := reFrom.FindStringSubmatch(query) + if m == nil { + return "", fmt.Errorf("could not parse FROM clause in CMV query") + } + return m[1], nil +} + +// parseSplitColumns extracts SPLIT(_key, sep)[SAFE_OFFSET(n)] AS alias +// expressions and returns a map of alias → offset index plus the separator. +func parseSplitColumns(query string) (colMap map[string]int, sep string, err error) { + matches := reSplitOffset.FindAllStringSubmatch(query, -1) + if len(matches) == 0 { + return nil, "", fmt.Errorf("could not parse SPLIT/SAFE_OFFSET expressions in CMV query") + } + colMap = make(map[string]int, len(matches)) + for _, m := range matches { + sep = m[1] + idx, _ := strconv.Atoi(m[2]) + colMap[m[3]] = idx + } + return colMap, sep, nil +} + +// parseSourceKeyAlias detects "_key AS " in the SELECT clause. +// Returns the alias if found, empty string otherwise. +func parseSourceKeyAlias(query string) string { + if m := reKeyAlias.FindStringSubmatch(query); m != nil { + return m[1] + } + return "" +} + +// parseOrderBy processes the ORDER BY clause to build the key mapping. +// Columns that reference SPLIT aliases become key mapping entries; +// _key or its alias sets appendSourceKey. 
+func parseOrderBy(query string, colMap map[string]int, sourceKeyAlias string) (keyMapping []int, appendSourceKey bool, err error) { + m := reOrderBy.FindStringSubmatch(strings.TrimSpace(query)) + if m == nil { + return nil, false, fmt.Errorf("could not parse ORDER BY clause in CMV query") + } + for _, col := range strings.Split(m[1], ",") { + col = strings.TrimSpace(col) + if col == "_key" || (sourceKeyAlias != "" && col == sourceKeyAlias) { + appendSourceKey = true + continue + } + idx, ok := colMap[col] + if !ok { + return nil, false, fmt.Errorf("ORDER BY column %q not found in SELECT", col) + } + keyMapping = append(keyMapping, idx) + } + if len(keyMapping) == 0 { + return nil, false, fmt.Errorf("no key mapping columns found in ORDER BY") + } + return keyMapping, appendSourceKey, nil +} + +// parseFamilies extracts column family inclusions from the SELECT clause. +// A family is identified by the pattern " AS " where both sides +// match and the name is not a SPLIT column alias or "_key". 
+func parseFamilies(query string, colMap map[string]int) []string { + famRe := regexp.MustCompile(`(?:,\s*)(\w+)\s+AS\s+(\w+)`) + matches := famRe.FindAllStringSubmatch(query, -1) + var families []string + for _, m := range matches { + src, alias := m[1], m[2] + if src == alias && src != "_key" { + if _, isCol := colMap[src]; !isCol { + families = append(families, src) + } + } + } + return families +} diff --git a/bttest/sql_parse_test.go b/bttest/sql_parse_test.go new file mode 100644 index 0000000..9201cf0 --- /dev/null +++ b/bttest/sql_parse_test.go @@ -0,0 +1,126 @@ +package bttest + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParseCMVConfigFromSQL(t *testing.T) { + tests := []struct { + query string + expectErr string + expectCfg *CMVConfig + }{ + { + query: `SELECT + SPLIT(_key, '#')[SAFE_OFFSET(2)] AS region, + SPLIT(_key, '#')[SAFE_OFFSET(3)] AS user_id, + SPLIT(_key, '#')[SAFE_OFFSET(1)] AS ts, + SPLIT(_key, '#')[SAFE_OFFSET(0)] AS device_id, + _key AS src_key, + info AS info, + stats AS stats +FROM ` + "`sensor_readings`" + ` +ORDER BY region, user_id, ts, device_id, src_key`, + expectCfg: &CMVConfig{ + SourceTable: "sensor_readings", + KeySeparator: "#", + KeyMapping: []int{2, 3, 1, 0}, + IncludeFamilies: []string{"info", "stats"}, + AppendSourceKey: true, + }, + }, + { + query: `SELECT + SPLIT(_key, '#')[SAFE_OFFSET(2)] AS c, + SPLIT(_key, '#')[SAFE_OFFSET(0)] AS a, + SPLIT(_key, '#')[SAFE_OFFSET(1)] AS b, + cf1 AS cf1 +FROM ` + "`my_table`" + ` +ORDER BY c, a, b`, + expectCfg: &CMVConfig{ + SourceTable: "my_table", + KeySeparator: "#", + KeyMapping: []int{2, 0, 1}, + IncludeFamilies: []string{"cf1"}, + }, + }, + { + query: `SELECT + SPLIT(_key, '#')[SAFE_OFFSET(1)] AS b, + SPLIT(_key, '#')[SAFE_OFFSET(0)] AS a, + meta AS meta, + logs AS logs, + tags AS tags, + raw AS raw +FROM ` + "`source`" + ` +ORDER BY b, a`, + expectCfg: &CMVConfig{ + SourceTable: "source", + 
KeySeparator: "#", + KeyMapping: []int{1, 0}, + IncludeFamilies: []string{"meta", "logs", "tags", "raw"}, + }, + }, + { + query: `SELECT SPLIT(_key, '#')[SAFE_OFFSET(0)] AS a ORDER BY a`, + expectErr: "FROM", + }, + { + query: "SELECT a FROM `t` ORDER BY a", + expectErr: "SPLIT", + }, + { + query: "SELECT SPLIT(_key, '#')[SAFE_OFFSET(0)] AS a FROM `t`", + expectErr: "ORDER BY", + }, + { + query: `SELECT + SPLIT(_key, '#')[SAFE_OFFSET(0)] AS a +FROM ` + "`t`" + ` +ORDER BY a, unknown_col`, + expectErr: "unknown_col", + }, + { + query: `SELECT + SPLIT(_key, '|')[SAFE_OFFSET(1)] AS b, + SPLIT(_key, '|')[SAFE_OFFSET(0)] AS a, + cf AS cf +FROM ` + "`t`" + ` +ORDER BY b, a`, + expectCfg: &CMVConfig{ + SourceTable: "t", + KeySeparator: "|", + KeyMapping: []int{1, 0}, + IncludeFamilies: []string{"cf"}, + }, + }, + { + // GROUP BY (aggregation) queries are not supported — the emulator only handles + // ORDER BY (key re-mapping) CMVs. Aggregation requires maintaining running state + // across writes, which is a fundamentally different execution model. 
+ query: "SELECT _key, count(fam1['col1']) as count FROM `t1` GROUP BY _key", + expectErr: "GROUP BY", + }, + } + + for i, tc := range tests { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + cfg, err := ParseCMVConfigFromSQL("", tc.query) + + if tc.expectErr != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), tc.expectErr) + return + } + + require.NoError(t, err) + require.NotNil(t, tc.expectCfg) + require.NotNil(t, cfg) + assert.Equal(t, *tc.expectCfg, *cfg) + }) + } +} diff --git a/little_bigtable.go b/little_bigtable.go index 29cb539..d29da16 100644 --- a/little_bigtable.go +++ b/little_bigtable.go @@ -19,13 +19,14 @@ import ( const ( maxMsgSize = 256 * 1024 * 1024 // 256 MiB - version = "0.1.1" + version = "0.2.0" ) func main() { host := flag.String("host", "localhost", "the address to bind to on the local machine") port := flag.Int("port", 9000, "the port number to bind to on the local machine") dbFile := flag.String("db-file", "little_bigtable.db", "path to data file") + cmvConfig := flag.String("cmv-config", "", "optional path to a JSON file pre-loading CMV configs for local testing") showVersion := flag.Bool("version", false, "show version") ctx := context.Background() @@ -62,6 +63,15 @@ func main() { log.Fatalf("failed to start emulator: %v", err) } + if *cmvConfig != "" { + configs, err := bttest.LoadCMVConfigs(*cmvConfig) + if err != nil { + log.Fatalf("failed to load cmv config: %v", err) + } + srv.RegisterCMVs(configs) + log.Printf("loaded %d CMV config(s) from %s", len(configs), *cmvConfig) + } + log.Printf("\"little\" Bigtable emulator running. DB:%s Connect with environment variable BIGTABLE_EMULATOR_HOST=%q", *dbFile, srv.Addr) select {} }