|
| 1 | +/* |
| 2 | +Copyright 2022 The Vitess Authors. |
| 3 | +
|
| 4 | +Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | +you may not use this file except in compliance with the License. |
| 6 | +You may obtain a copy of the License at |
| 7 | +
|
| 8 | + http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | +
|
| 10 | +Unless required by applicable law or agreed to in writing, software |
| 11 | +distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | +See the License for the specific language governing permissions and |
| 14 | +limitations under the License. |
| 15 | +*/ |
| 16 | + |
| 17 | +/* |
| 18 | +
|
| 19 | +A Vindex which uses a mapping lookup table `placement_map` to set the first `placement_prefix_bytes` of the Keyspace ID |
| 20 | +and another Vindex type `placement_sub_vindex_type` (which must support Hashing) as a sub-Vindex to set the rest. |
| 21 | +This is suitable for regional sharding (like region_json or region_experimental) but does not require a mapping file, |
| 22 | +and can support non-integer types for the sharding column. All parameters are prefixed with `placement_` so as to avoid |
| 23 | +conflict, because the `params` map is passed to the sub-Vindex as well. |
| 24 | +
|
| 25 | +*/ |
| 26 | + |
| 27 | +package vindexes |
| 28 | + |
| 29 | +import ( |
| 30 | + "bytes" |
| 31 | + "context" |
| 32 | + "encoding/binary" |
| 33 | + "fmt" |
| 34 | + "strconv" |
| 35 | + "strings" |
| 36 | + |
| 37 | + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" |
| 38 | + "vitess.io/vitess/go/vt/vterrors" |
| 39 | + |
| 40 | + "vitess.io/vitess/go/sqltypes" |
| 41 | + "vitess.io/vitess/go/vt/key" |
| 42 | +) |
| 43 | + |
| 44 | +var ( |
| 45 | + _ MultiColumn = (*Placement)(nil) |
| 46 | + |
| 47 | + PlacementRequiredParams = []string{ |
| 48 | + "placement_map", |
| 49 | + "placement_prefix_bytes", |
| 50 | + "placement_sub_vindex_type", |
| 51 | + } |
| 52 | +) |
| 53 | + |
| 54 | +func init() { |
| 55 | + Register("placement", NewPlacement) |
| 56 | +} |
| 57 | + |
| 58 | +type PlacementMap map[string]uint64 |
| 59 | + |
| 60 | +type Placement struct { |
| 61 | + name string |
| 62 | + placementMap PlacementMap |
| 63 | + subVindex Vindex |
| 64 | + subVindexType string |
| 65 | + subVindexName string |
| 66 | + prefixBytes int |
| 67 | +} |
| 68 | + |
| 69 | +// Parse a string containing a list of delimited string:integer key-value pairs, e.g. "foo:1,bar:2". |
| 70 | +func parsePlacementMap(s string) (*PlacementMap, error) { |
| 71 | + placementMap := make(PlacementMap) |
| 72 | + for _, entry := range strings.Split(s, ",") { |
| 73 | + if entry == "" { |
| 74 | + continue |
| 75 | + } |
| 76 | + |
| 77 | + kv := strings.Split(entry, ":") |
| 78 | + if len(kv) != 2 { |
| 79 | + return nil, fmt.Errorf("entry: %v; expected key:value", entry) |
| 80 | + } |
| 81 | + if kv[0] == "" { |
| 82 | + return nil, fmt.Errorf("entry: %v; unexpected empty key", entry) |
| 83 | + } |
| 84 | + if kv[1] == "" { |
| 85 | + return nil, fmt.Errorf("entry: %v; unexpected empty value", entry) |
| 86 | + } |
| 87 | + |
| 88 | + value, err := strconv.ParseUint(kv[1], 0, 64) |
| 89 | + if err != nil { |
| 90 | + return nil, fmt.Errorf("entry: %v; %v", entry, err) |
| 91 | + } |
| 92 | + placementMap[kv[0]] = value |
| 93 | + } |
| 94 | + return &placementMap, nil |
| 95 | +} |
| 96 | + |
| 97 | +func NewPlacement(name string, params map[string]string) (Vindex, error) { |
| 98 | + var missingParams []string |
| 99 | + for _, param := range PlacementRequiredParams { |
| 100 | + if params[param] == "" { |
| 101 | + missingParams = append(missingParams, param) |
| 102 | + } |
| 103 | + } |
| 104 | + |
| 105 | + if len(missingParams) > 0 { |
| 106 | + return nil, fmt.Errorf("missing params: %s", strings.Join(missingParams, ", ")) |
| 107 | + } |
| 108 | + |
| 109 | + placementMap, parseError := parsePlacementMap(params["placement_map"]) |
| 110 | + if parseError != nil { |
| 111 | + return nil, fmt.Errorf("malformed placement_map; %v", parseError) |
| 112 | + } |
| 113 | + |
| 114 | + prefixBytes, prefixError := strconv.Atoi(params["placement_prefix_bytes"]) |
| 115 | + if prefixError != nil { |
| 116 | + return nil, prefixError |
| 117 | + } |
| 118 | + |
| 119 | + if prefixBytes < 1 || prefixBytes > 7 { |
| 120 | + return nil, fmt.Errorf("invalid placement_prefix_bytes: %v; expected integer between 1 and 7", prefixBytes) |
| 121 | + } |
| 122 | + |
| 123 | + subVindexType := params["placement_sub_vindex_type"] |
| 124 | + subVindexName := fmt.Sprintf("%s_sub_vindex", name) |
| 125 | + subVindex, createVindexError := CreateVindex(subVindexType, subVindexName, params) |
| 126 | + if createVindexError != nil { |
| 127 | + return nil, fmt.Errorf("invalid placement_sub_vindex_type: %v", createVindexError) |
| 128 | + } |
| 129 | + |
| 130 | + // TODO: Should we support MultiColumn Vindex? |
| 131 | + if _, subVindexSupportsHashing := subVindex.(Hashing); !subVindexSupportsHashing { |
| 132 | + return nil, fmt.Errorf("invalid placement_sub_vindex_type: %v; does not support the Hashing interface", createVindexError) |
| 133 | + } |
| 134 | + |
| 135 | + return &Placement{ |
| 136 | + name: name, |
| 137 | + placementMap: *placementMap, |
| 138 | + subVindex: subVindex, |
| 139 | + subVindexType: subVindexType, |
| 140 | + subVindexName: subVindexName, |
| 141 | + prefixBytes: prefixBytes, |
| 142 | + }, nil |
| 143 | +} |
| 144 | + |
| 145 | +func (p *Placement) String() string { |
| 146 | + return p.name |
| 147 | +} |
| 148 | + |
| 149 | +func (p *Placement) Cost() int { |
| 150 | + return 1 |
| 151 | +} |
| 152 | + |
| 153 | +func (p *Placement) IsUnique() bool { |
| 154 | + return true |
| 155 | +} |
| 156 | + |
| 157 | +func (p *Placement) NeedsVCursor() bool { |
| 158 | + return false |
| 159 | +} |
| 160 | + |
| 161 | +func (p *Placement) PartialVindex() bool { |
| 162 | + return true |
| 163 | +} |
| 164 | + |
| 165 | +func makeDestinationPrefix(value uint64, prefixBytes int) []byte { |
| 166 | + destinationPrefix := make([]byte, 8) |
| 167 | + binary.BigEndian.PutUint64(destinationPrefix, value) |
| 168 | + if prefixBytes < 8 { |
| 169 | + // Shorten the prefix to the desired length. |
| 170 | + destinationPrefix = destinationPrefix[(8 - prefixBytes):] |
| 171 | + } |
| 172 | + |
| 173 | + return destinationPrefix |
| 174 | +} |
| 175 | + |
| 176 | +func (p *Placement) Map(ctx context.Context, vcursor VCursor, rowsColValues [][]sqltypes.Value) ([]key.Destination, error) { |
| 177 | + destinations := make([]key.Destination, 0, len(rowsColValues)) |
| 178 | + |
| 179 | + for _, row := range rowsColValues { |
| 180 | + if len(row) != 1 && len(row) != 2 { |
| 181 | + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of column values were passed: expected 1-2, got %d", len(row)) |
| 182 | + } |
| 183 | + |
| 184 | + // Calculate the destination prefix from the placement key which will be the same whether this is a partial |
| 185 | + // or full usage of the Vindex. |
| 186 | + placementKey := row[0].ToString() |
| 187 | + placementDestinationValue, placementMappingFound := p.placementMap[placementKey] |
| 188 | + if !placementMappingFound { |
| 189 | + destinations = append(destinations, key.DestinationNone{}) |
| 190 | + continue |
| 191 | + } |
| 192 | + |
| 193 | + placementDestinationPrefix := makeDestinationPrefix(placementDestinationValue, p.prefixBytes) |
| 194 | + |
| 195 | + if len(row) == 1 { // Partial Vindex usage with only the placement column provided. |
| 196 | + destinations = append(destinations, NewKeyRangeFromPrefix(placementDestinationPrefix)) |
| 197 | + } else if len(row) == 2 { // Full Vindex usage with the placement column and subVindex column provided. |
| 198 | + subVindexValue, hashingError := p.subVindex.(Hashing).Hash(row[1]) |
| 199 | + if hashingError != nil { |
| 200 | + return nil, hashingError // TODO: Should we be less fatal here and use DestinationNone? |
| 201 | + } |
| 202 | + |
| 203 | + // Concatenate and add to destinations. |
| 204 | + rowDestination := append(placementDestinationPrefix, subVindexValue...) |
| 205 | + destinations = append(destinations, key.DestinationKeyspaceID(rowDestination[0:8])) |
| 206 | + } |
| 207 | + } |
| 208 | + |
| 209 | + return destinations, nil |
| 210 | +} |
| 211 | + |
| 212 | +func (p *Placement) Verify(ctx context.Context, vcursor VCursor, rowsColValues [][]sqltypes.Value, keyspaceIDs [][]byte) ([]bool, error) { |
| 213 | + result := make([]bool, len(rowsColValues)) |
| 214 | + destinations, _ := p.Map(ctx, vcursor, rowsColValues) |
| 215 | + for i, destination := range destinations { |
| 216 | + switch d := destination.(type) { |
| 217 | + case key.DestinationKeyspaceID: |
| 218 | + result[i] = bytes.Equal(d, keyspaceIDs[i]) |
| 219 | + default: |
| 220 | + result[i] = false |
| 221 | + } |
| 222 | + } |
| 223 | + return result, nil |
| 224 | +} |
0 commit comments