Skip to content

Commit 4dc2b67

Browse files
committed
wip
1 parent c4c37ba commit 4dc2b67

File tree

14 files changed

+169
-290
lines changed

14 files changed

+169
-290
lines changed

packages/client-proxy/internal/edge/api/api.gen.go

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/client-proxy/internal/edge/api/spec.gen.go

Lines changed: 26 additions & 26 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 37 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,90 +1,57 @@
11
package handlers
22

33
import (
4+
"context"
5+
"errors"
6+
"net/http"
7+
"time"
8+
49
"github.com/gin-gonic/gin"
10+
"go.uber.org/zap"
11+
12+
"github.com/e2b-dev/infra/packages/proxy/internal/edge/orchestrators"
13+
orchestratorinfo "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator-info"
514
)
615

716
func (a *APIStore) V1ServiceDiscoveryNodeDrain(c *gin.Context, nodeId string) {
8-
/*
9-
findCtx, findCtxCancel := context.WithTimeout(c, 5*time.Second)
10-
defer findCtxCancel()
11-
12-
logger := a.logger.With(zap.String("node_id", nodeId))
13-
14-
node, err := a.serviceDiscovery.GetNodeById(findCtx, nodeId)
15-
if err != nil {
16-
logger.Error("failed to get node by id", zap.Error(err))
17-
18-
if errors.Is(err, service_discovery.NodeNotFoundErr) {
19-
a.sendAPIStoreError(c, http.StatusNotFound, "node not found")
20-
} else {
21-
a.sendAPIStoreError(c, http.StatusInternalServerError, "failed to get node by id")
22-
}
23-
24-
return
25-
}
17+
err := a.sendNodeRequest(c, nodeId, orchestratorinfo.ServiceInfoStatus_OrchestratorDraining)
18+
if err != nil {
19+
a.sendAPIStoreError(c, http.StatusBadRequest, "Error when calling service discovery node")
20+
return
21+
}
2622

27-
currentNodeId := a.serviceDiscovery.GetSelfNodeId()
28-
if nodeId != currentNodeId {
29-
logger.Info("sending draining node request to neighbor", zap.String("node_ip", node.NodeIp))
23+
c.Status(http.StatusOK)
24+
}
3025

31-
err := a.serviceDiscoveryCallNeighborNodes(c, nodeId, node.NodeIp, node.NodePort, "drain")
32-
if err != nil {
33-
logger.Error("failed to call node drain", zap.Error(err))
34-
a.sendAPIStoreError(c, http.StatusInternalServerError, "failed to call neighbor node")
35-
return
36-
}
26+
func (a *APIStore) sendNodeRequest(ctx context.Context, nodeId string, status orchestratorinfo.ServiceInfoStatus) error {
27+
findCtx, findCtxCancel := context.WithTimeout(ctx, 5*time.Second)
28+
defer findCtxCancel()
3729

38-
logger.Info("cluster node drain request delivered")
39-
c.Status(http.StatusOK)
40-
return
41-
}
30+
logger := a.logger.With(zap.String("node_id", nodeId))
4231

43-
// handle self drain process
44-
if a.selfDrainHandler == nil {
45-
logger.Error("self drain handler is not set")
46-
a.sendAPIStoreError(c, http.StatusInternalServerError, "self drain handler is not configured")
47-
return
32+
// try to find orchestrator node first
33+
o, err := a.orchestratorPool.GetOrchestrator(nodeId)
34+
if err != nil {
35+
if !errors.Is(err, orchestrators.ErrOrchestratorNotFound) {
36+
logger.Warn("Failed to get orchestrator", zap.Error(err))
37+
return errors.New("Error when getting orchestrator node")
4838
}
39+
}
4940

50-
logger.Info("starting self-drain process")
41+
if o != nil {
42+
logger.Info("found orchestrator node, calling drain request")
43+
_, err = o.Client.Info.ServiceStatusOverride(
44+
findCtx, &orchestratorinfo.ServiceStatusChangeRequest{ServiceStatus: status},
45+
)
5146

52-
err = (*a.selfDrainHandler)()
5347
if err != nil {
54-
logger.Error("failed to start self drain process", zap.Error(err))
55-
a.sendAPIStoreError(c, http.StatusInternalServerError, "failed to start self drain process")
56-
return
48+
logger.Error("failed to drain orchestrator node", zap.Error(err))
49+
return errors.New("Failed to drain orchestrator node")
5750
}
5851

59-
a.healthStatus = api.Draining
60-
c.Status(http.StatusOK)
61-
62-
*/
63-
}
64-
65-
/*
66-
func (a *APIStore) serviceDiscoveryCallNeighborNodes(ctx context.Context, nodeId string, nodeIp string, nodePort int, callMethod string) error {
67-
// todo: add authorization when implemented
68-
// update / drain
69-
reqUrl := fmt.Sprintf("http://%s:%d/v1/service-discovery/nodes/%s/%s", nodeIp, nodePort, nodeId, callMethod)
70-
req, err := http.NewRequest(http.MethodPost, reqUrl, nil)
71-
if err != nil {
72-
return err
73-
}
74-
75-
reqCtx, reqCtxCancel := context.WithTimeout(ctx, 5*time.Second)
76-
defer reqCtxCancel()
77-
78-
req.WithContext(reqCtx)
79-
res, err := http.DefaultClient.Do(req)
80-
if err != nil {
81-
return err
82-
}
83-
84-
if res.StatusCode != http.StatusOK {
85-
return fmt.Errorf("failed to call neighbor node: %s", res.Status)
52+
return nil
8653
}
8754

88-
return nil
55+
// todo: call edge api node
56+
return errors.New("Failed to find node, it must be edge api node")
8957
}
90-
*/
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package handlers
2+
3+
import (
4+
"net/http"
5+
6+
"github.com/gin-gonic/gin"
7+
8+
orchestratorinfo "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator-info"
9+
)
10+
11+
func (a *APIStore) V1ServiceDiscoveryNodeKill(c *gin.Context, nodeId string) {
12+
err := a.sendNodeRequest(c, nodeId, orchestratorinfo.ServiceInfoStatus_OrchestratorUnhealthy)
13+
if err != nil {
14+
a.sendAPIStoreError(c, http.StatusBadRequest, "Error when calling service discovery node")
15+
return
16+
}
17+
18+
c.Status(http.StatusOK)
19+
}

packages/client-proxy/internal/edge/handlers/service-discovery-node-update.go

Lines changed: 0 additions & 63 deletions
This file was deleted.

packages/client-proxy/internal/edge/orchestrators/orchestrator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ func (o *Orchestrator) sync() {
106106
select {
107107
case <-o.ctx.Done():
108108
zap.L().Info("context done", zap.String("orchestrator sync id", o.ServiceId))
109+
return
109110
case <-ticker.C:
110111
o.syncRun()
111112
}

0 commit comments

Comments
 (0)