This repository was archived by the owner on Feb 21, 2024. It is now read-only.

Commit 6784ab8

Merge pull request #1785 from travisturner/cluster-resize-fix
Cluster resize fix
2 parents: 047b587 + ca22417

10 files changed: +297 −156 lines changed


api.go
Lines changed: 20 additions & 1 deletion

@@ -559,6 +559,23 @@ func (api *API) FragmentBlocks(ctx context.Context, indexName, fieldName, viewNa
     return blocks, nil
 }
 
+// FragmentData returns all data in the specified fragment.
+func (api *API) FragmentData(ctx context.Context, indexName, fieldName, viewName string, shard uint64) (io.WriterTo, error) {
+    span, _ := tracing.StartSpanFromContext(ctx, "API.FragmentData")
+    defer span.Finish()
+
+    if err := api.validate(apiFragmentData); err != nil {
+        return nil, errors.Wrap(err, "validating api method")
+    }
+
+    // Retrieve fragment from holder.
+    f := api.holder.fragment(indexName, fieldName, viewName, shard)
+    if f == nil {
+        return nil, ErrFragmentNotFound
+    }
+    return f, nil
+}
+
 // Hosts returns a list of the hosts in the cluster including their ID,
 // URL, and which is the coordinator.
 func (api *API) Hosts(ctx context.Context) []*Node {

@@ -1203,6 +1220,7 @@ const (
     apiExportCSV
     apiFragmentBlockData
     apiFragmentBlocks
+    apiFragmentData
     apiField
     apiFieldAttrDiff
     //apiHosts // not implemented

@@ -1232,7 +1250,8 @@ var methodsCommon = map[apiMethod]struct{}{
 }
 
 var methodsResizing = map[apiMethod]struct{}{
-    apiResizeAbort: {},
+    apiFragmentData: {},
+    apiResizeAbort:  {},
 }
 
 var methodsNormal = map[apiMethod]struct{}{
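Aside: API.FragmentData returns the fragment as an io.WriterTo rather than a materialized byte slice, so callers can stream it straight into a response body or file. Below is a minimal, self-contained sketch of that pattern; fakeFragment and its payload are hypothetical stand-ins, not Pilosa types.

package main

import (
    "bytes"
    "fmt"
    "io"
    "os"
)

// fakeFragment is a hypothetical stand-in for a fragment; the only thing it
// shares with the real type is that it implements io.WriterTo.
type fakeFragment struct {
    data []byte
}

// WriteTo streams the fragment's bytes into w, satisfying io.WriterTo.
func (f *fakeFragment) WriteTo(w io.Writer) (int64, error) {
    n, err := w.Write(f.data)
    return int64(n), err
}

func main() {
    var f io.WriterTo = &fakeFragment{data: []byte("roaring-encoded shard data")}

    // A caller can stream the fragment anywhere an io.Writer is accepted:
    // an HTTP response body, a file, or (as here) an in-memory buffer.
    var buf bytes.Buffer
    if _, err := f.WriteTo(&buf); err != nil {
        fmt.Fprintln(os.Stderr, "stream failed:", err)
        return
    }
    fmt.Println("streamed", buf.Len(), "bytes")
}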

client.go
Lines changed: 2 additions & 2 deletions

@@ -52,7 +52,7 @@ type InternalClient interface {
     ColumnAttrDiff(ctx context.Context, uri *URI, index string, blks []AttrBlock) (map[uint64]map[string]interface{}, error)
     RowAttrDiff(ctx context.Context, uri *URI, index, field string, blks []AttrBlock) (map[uint64]map[string]interface{}, error)
     SendMessage(ctx context.Context, uri *URI, msg []byte) error
-    RetrieveShardFromURI(ctx context.Context, index, field string, shard uint64, uri URI) (io.ReadCloser, error)
+    RetrieveShardFromURI(ctx context.Context, index, field, view string, shard uint64, uri URI) (io.ReadCloser, error)
     ImportRoaring(ctx context.Context, uri *URI, index, field string, shard uint64, remote bool, req *ImportRoaringRequest) error
 }
 

@@ -149,6 +149,6 @@ func (n nopInternalClient) RowAttrDiff(ctx context.Context, uri *URI, index, fie
 func (n nopInternalClient) SendMessage(ctx context.Context, uri *URI, msg []byte) error {
     return nil
 }
-func (n nopInternalClient) RetrieveShardFromURI(ctx context.Context, index, field string, shard uint64, uri URI) (io.ReadCloser, error) {
+func (n nopInternalClient) RetrieveShardFromURI(ctx context.Context, index, field, view string, shard uint64, uri URI) (io.ReadCloser, error) {
     return nil, nil
 }

cluster.go
Lines changed: 48 additions & 6 deletions

@@ -1218,7 +1218,7 @@ func (c *cluster) unprotectedGenerateResizeJobByAction(nodeAction nodeAction) (*
             Node:          toCluster.unprotectedNodeByID(id),
             Coordinator:   c.unprotectedCoordinatorNode(),
             Sources:       sources,
-            Schema:        &Schema{Indexes: c.holder.Schema()}, // Include the schema to ensure it's in sync on the receiving node.
+            NodeStatus:    c.nodeStatus(), // Include the NodeStatus in order to ensure that schema and availableShards are in sync on the receiving node.
             ClusterStatus: c.unprotectedStatus(),
         }
         j.Instructions = append(j.Instructions, instr)

@@ -1277,12 +1277,30 @@ func (c *cluster) followResizeInstruction(instr *ResizeInstruction) error {
     span, ctx := tracing.StartSpanFromContext(context.Background(), "Cluster.followResizeInstruction")
     defer span.Finish()
 
-    // Sync the schema received in the resize instruction.
+    // Sync the NodeStatus received in the resize instruction.
+    // Sync schema.
     c.logger.Debugf("holder applySchema")
-    if err := c.holder.applySchema(instr.Schema); err != nil {
+    if err := c.holder.applySchema(instr.NodeStatus.Schema); err != nil {
         return errors.Wrap(err, "applying schema")
     }
 
+    // Sync available shards.
+    for _, is := range instr.NodeStatus.Indexes {
+        for _, fs := range is.Fields {
+            f := c.holder.Field(is.Name, fs.Name)
+
+            // if we don't know about a field locally, log an error because
+            // fields should be created and synced prior to shard creation
+            if f == nil {
+                c.logger.Printf("local field not found: %s/%s", is.Name, fs.Name)
+                continue
+            }
+            if err := f.AddRemoteAvailableShards(fs.AvailableShards); err != nil {
+                return errors.Wrap(err, "adding remote available shards")
+            }
+        }
+    }
+
     // Request each source file in ResizeSources.
     for _, src := range instr.Sources {
         c.logger.Printf("get shard %d for index %s from host %s", src.Shard, src.Index, src.Node.URI)

@@ -1309,7 +1327,7 @@ func (c *cluster) followResizeInstruction(instr *ResizeInstruction) error {
 
         // Stream shard from remote node.
         c.logger.Printf("retrieve shard %d for index %s from host %s", src.Shard, src.Index, src.Node.URI)
-        rd, err := c.InternalClient.RetrieveShardFromURI(ctx, src.Index, src.Field, src.Shard, srcURI)
+        rd, err := c.InternalClient.RetrieveShardFromURI(ctx, src.Index, src.Field, src.View, src.Shard, srcURI)
         if err != nil {
             // For now it is an acceptable error if the fragment is not found
             // on the remote node. This occurs when a shard has been skipped and

@@ -1318,7 +1336,7 @@ func (c *cluster) followResizeInstruction(instr *ResizeInstruction) error {
             // TODO: figure out a way to distinguish from "fragment not found" errors
             // which are true errors and which simply mean the fragment doesn't have data.
             if err == ErrFragmentNotFound {
-                return nil
+                continue
             }
             return errors.Wrap(err, "retrieving shard")
         } else if rd == nil {

@@ -1817,6 +1835,30 @@ func (c *cluster) nodeLeave(nodeID string) error {
     return nil
 }
 
+func (c *cluster) nodeStatus() *NodeStatus {
+    ns := &NodeStatus{
+        Node:   c.Node,
+        Schema: &Schema{Indexes: c.holder.Schema()},
+    }
+    var availableShards *roaring.Bitmap
+    for _, idx := range ns.Schema.Indexes {
+        is := &IndexStatus{Name: idx.Name}
+        for _, f := range idx.Fields {
+            if field := c.holder.Field(idx.Name, f.Name); field != nil {
+                availableShards = field.AvailableShards()
+            } else {
+                availableShards = roaring.NewBitmap()
+            }
+            is.Fields = append(is.Fields, &FieldStatus{
+                Name:            f.Name,
+                AvailableShards: availableShards,
+            })
+        }
+        ns.Indexes = append(ns.Indexes, is)
+    }
+    return ns
+}
+
 func (c *cluster) mergeClusterStatus(cs *ClusterStatus) error {
     c.mu.Lock()
     defer c.mu.Unlock()

@@ -1918,7 +1960,7 @@ type ResizeInstruction struct {
     Node          *Node
     Coordinator   *Node
     Sources       []*ResizeSource
-    Schema        *Schema
+    NodeStatus    *NodeStatus
     ClusterStatus *ClusterStatus
 }
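The NodeStatus exchange above boils down to a per-field union of available shards: the receiving node keeps every shard it already knew about and adds any the coordinator reports. Here is a simplified sketch of that merge using a plain map-backed set in place of the roaring bitmaps the real code uses; shardSet and its method are illustrative only.

package main

import "fmt"

// shardSet is a simplified, hypothetical stand-in for *roaring.Bitmap.
type shardSet map[uint64]struct{}

// addRemoteAvailableShards merges the remote set into the local one,
// mirroring the intent of Field.AddRemoteAvailableShards in the diff above.
func (s shardSet) addRemoteAvailableShards(remote shardSet) {
    for shard := range remote {
        s[shard] = struct{}{}
    }
}

func main() {
    local := shardSet{0: {}, 1: {}}
    remote := shardSet{1: {}, 5: {}}

    local.addRemoteAvailableShards(remote)
    fmt.Println(len(local), "shards available locally") // prints: 3 shards available locally
}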

encoding/proto/proto.go
Lines changed: 3 additions & 3 deletions

@@ -457,7 +457,7 @@ func encodeResizeInstruction(m *pilosa.ResizeInstruction) *internal.ResizeInstru
         Node:          encodeNode(m.Node),
         Coordinator:   encodeNode(m.Coordinator),
         Sources:       encodeResizeSources(m.Sources),
-        Schema:        encodeSchema(m.Schema),
+        NodeStatus:    encodeNodeStatus(m.NodeStatus),
         ClusterStatus: encodeClusterStatus(m.ClusterStatus),
     }
 }

@@ -737,8 +737,8 @@ func decodeResizeInstruction(ri *internal.ResizeInstruction, m *pilosa.ResizeIns
     decodeNode(ri.Coordinator, m.Coordinator)
     m.Sources = make([]*pilosa.ResizeSource, len(ri.Sources))
     decodeResizeSources(ri.Sources, m.Sources)
-    m.Schema = &pilosa.Schema{}
-    decodeSchema(ri.Schema, m.Schema)
+    m.NodeStatus = &pilosa.NodeStatus{}
+    decodeNodeStatus(ri.NodeStatus, m.NodeStatus)
     m.ClusterStatus = &pilosa.ClusterStatus{}
     decodeClusterStatus(ri.ClusterStatus, m.ClusterStatus)
 }

http/client.go
Lines changed: 3 additions & 8 deletions

@@ -705,24 +705,19 @@ func (c *InternalClient) exportNodeCSV(ctx context.Context, node *pilosa.Node, i
     return nil
 }
 
-func (c *InternalClient) RetrieveShardFromURI(ctx context.Context, index, field string, shard uint64, uri pilosa.URI) (io.ReadCloser, error) {
+func (c *InternalClient) RetrieveShardFromURI(ctx context.Context, index, field, view string, shard uint64, uri pilosa.URI) (io.ReadCloser, error) {
     span, ctx := tracing.StartSpanFromContext(ctx, "InternalClient.RetrieveShardFromURI")
     defer span.Finish()
 
     node := &pilosa.Node{
         URI: uri,
     }
-    return c.backupShardNode(ctx, index, field, shard, node)
-}
-
-func (c *InternalClient) backupShardNode(ctx context.Context, index, field string, shard uint64, node *pilosa.Node) (io.ReadCloser, error) {
-    span, ctx := tracing.StartSpanFromContext(ctx, "InternalClient.backupShardNode")
-    defer span.Finish()
 
-    u := nodePathToURL(node, "/fragment/data")
+    u := nodePathToURL(node, "/internal/fragment/data")
     u.RawQuery = url.Values{
         "index": {index},
         "field": {field},
+        "view":  {view},
         "shard": {strconv.FormatUint(shard, 10)},
     }.Encode()
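With backupShardNode folded into RetrieveShardFromURI, the client now issues a single GET to /internal/fragment/data with index, field, view, and shard as query parameters. A standalone sketch of that URL construction using only the standard library follows; the host and parameter values are hypothetical.

package main

import (
    "fmt"
    "net/url"
    "strconv"
)

func main() {
    // Hypothetical target node and fragment coordinates.
    u := url.URL{
        Scheme: "http",
        Host:   "node1.example.com:10101",
        Path:   "/internal/fragment/data",
    }
    u.RawQuery = url.Values{
        "index": {"my-index"},
        "field": {"my-field"},
        "view":  {"standard"},
        "shard": {strconv.FormatUint(3, 10)},
    }.Encode()

    // Prints the kind of URL the internal client requests during a resize.
    fmt.Println(u.String())
}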

http/handler.go
Lines changed: 23 additions & 0 deletions

@@ -192,6 +192,7 @@ func (h *Handler) populateValidators() {
     h.validators["PostClusterMessage"] = queryValidationSpecRequired()
     h.validators["GetFragmentBlockData"] = queryValidationSpecRequired()
     h.validators["GetFragmentBlocks"] = queryValidationSpecRequired("index", "field", "view", "shard")
+    h.validators["GetFragmentData"] = queryValidationSpecRequired("index", "field", "view", "shard")
     h.validators["GetFragmentNodes"] = queryValidationSpecRequired("shard", "index")
     h.validators["PostIndexAttrDiff"] = queryValidationSpecRequired()
     h.validators["PostFieldAttrDiff"] = queryValidationSpecRequired()

@@ -262,6 +263,7 @@ func newRouter(handler *Handler) *mux.Router {
     router.HandleFunc("/internal/cluster/message", handler.handlePostClusterMessage).Methods("POST").Name("PostClusterMessage")
     router.HandleFunc("/internal/fragment/block/data", handler.handleGetFragmentBlockData).Methods("GET").Name("GetFragmentBlockData")
     router.HandleFunc("/internal/fragment/blocks", handler.handleGetFragmentBlocks).Methods("GET").Name("GetFragmentBlocks")
+    router.HandleFunc("/internal/fragment/data", handler.handleGetFragmentData).Methods("GET").Name("GetFragmentData")
     router.HandleFunc("/internal/fragment/nodes", handler.handleGetFragmentNodes).Methods("GET").Name("GetFragmentNodes")
     router.HandleFunc("/internal/index/{index}/attr/diff", handler.handlePostIndexAttrDiff).Methods("POST").Name("PostIndexAttrDiff")
     router.HandleFunc("/internal/index/{index}/field/{field}/attr/diff", handler.handlePostFieldAttrDiff).Methods("POST").Name("PostFieldAttrDiff")

@@ -1214,6 +1216,27 @@ type getFragmentBlocksResponse struct {
     Blocks []pilosa.FragmentBlock `json:"blocks"`
 }
 
+// handleGetFragmentData handles GET /internal/fragment/data requests.
+func (h *Handler) handleGetFragmentData(w http.ResponseWriter, r *http.Request) {
+    // Read shard parameter.
+    q := r.URL.Query()
+    shard, err := strconv.ParseUint(q.Get("shard"), 10, 64)
+    if err != nil {
+        http.Error(w, "shard required", http.StatusBadRequest)
+        return
+    }
+    // Retrieve fragment data from holder.
+    f, err := h.api.FragmentData(r.Context(), q.Get("index"), q.Get("field"), q.Get("view"), shard)
+    if err != nil {
+        http.Error(w, err.Error(), http.StatusNotFound)
+        return
+    }
+    // Stream fragment to response body.
+    if _, err := f.WriteTo(w); err != nil {
+        h.logger.Printf("error streaming fragment data: %s", err)
+    }
+}
+
 // handleGetVersion handles /version requests.
 func (h *Handler) handleGetVersion(w http.ResponseWriter, r *http.Request) {
     if !validHeaderAcceptJSON(r.Header) {
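For context, the handler's shape is: validate the shard query parameter, look up something that can stream itself, and copy it to the response body. The sketch below exercises that shape in-process with httptest; lookupFragment and the request values are hypothetical stand-ins for api.FragmentData and a real fragment.

package main

import (
    "fmt"
    "io"
    "net/http"
    "net/http/httptest"
    "strconv"
    "strings"
)

// lookupFragment is a hypothetical stand-in for api.FragmentData: it returns
// something that can stream itself to an io.Writer.
func lookupFragment(index, field, view string, shard uint64) (io.WriterTo, error) {
    payload := fmt.Sprintf("data for %s/%s/%s shard %d", index, field, view, shard)
    return strings.NewReader(payload), nil
}

// handleGetFragmentData mirrors the shape of the new handler: validate the
// shard parameter, fetch the fragment, then stream it to the response body.
func handleGetFragmentData(w http.ResponseWriter, r *http.Request) {
    q := r.URL.Query()
    shard, err := strconv.ParseUint(q.Get("shard"), 10, 64)
    if err != nil {
        http.Error(w, "shard required", http.StatusBadRequest)
        return
    }
    f, err := lookupFragment(q.Get("index"), q.Get("field"), q.Get("view"), shard)
    if err != nil {
        http.Error(w, err.Error(), http.StatusNotFound)
        return
    }
    if _, err := f.WriteTo(w); err != nil {
        fmt.Println("error streaming fragment data:", err)
    }
}

func main() {
    // Exercise the handler in-process with httptest.
    req := httptest.NewRequest("GET", "/internal/fragment/data?index=i&field=f&view=standard&shard=2", nil)
    rec := httptest.NewRecorder()
    handleGetFragmentData(rec, req)
    fmt.Println(rec.Code, rec.Body.String())
}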
