This repository was archived by the owner on Sep 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgrpc.go
63 lines (56 loc) · 2.22 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package cirrus
import (
"fmt"
pb "github.com/hanakoa/cirrus/pb"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"log"
"net"
"time"
)
// GrpcServer implements the heartbeat gRPC service: apps send it
// "heartbeats" in order to request a node ID.
type GrpcServer struct {
	// port is presumably the TCP port for the server.
	// NOTE(review): run builds its listen address from the package-level
	// GrpcPort and does not read this field — confirm whether it is still used.
	port int
}
// Heartbeat is a gRPC endpoint to allow apps to request a node ID.
//
// If the request carries a non-zero node ID whose last recorded heartbeat is
// less than HeartbeatPeriodicity old, the recorded time is refreshed and the
// same ID is returned. Otherwise (no ID supplied, or the supplied ID is no
// longer valid) a new ID is received from AvailableNodeIds and returned.
//
// Waiting for a free ID is bounded by ctx: if the caller cancels or its
// deadline passes before an ID becomes available, ctx.Err() is returned
// instead of blocking the handler goroutine indefinitely.
func (service *GrpcServer) Heartbeat(ctx context.Context, in *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error) {
	log.Printf("[gRPC] -- Processing heartbeat for app %s", in.AppID)

	if nodeID := in.GetNodeID(); nodeID != 0 {
		// TODO do we need to lock Heartbeats, since the prune goroutine is deleting from it?
		// NOTE(review): presumably Heartbeats is a map keyed by node ID, so an
		// unknown ID yields the zero time and falls through as expired — confirm.
		lastSeen := Heartbeats[int(nodeID)]
		if time.Now().Before(lastSeen.Add(HeartbeatPeriodicity)) {
			log.Printf("[gRPC] -- App %s already has a valid node ID... Extending ID lifetime...", in.AppID)
			Heartbeats[int(nodeID)] = time.Now()
			return &pb.HeartbeatResponse{AppID: in.AppID, NodeID: nodeID}, nil
		}
		log.Printf("[gRPC] -- App %s has expired node ID %d", in.AppID, nodeID)
	}

	// If every node ID is currently handed out, the receive below would block.
	// Racing it against ctx.Done() lets the handler give up as soon as the
	// client cancels or times out, rather than hanging forever.
	select {
	case nodeID := <-AvailableNodeIds:
		log.Printf("[gRPC] -- Granting node ID %d to app %s", nodeID, in.AppID)
		Heartbeats[nodeID] = time.Now()
		return &pb.HeartbeatResponse{AppID: in.AppID, NodeID: int32(nodeID)}, nil
	case <-ctx.Done():
		log.Printf("[gRPC] -- No node ID became available for app %s: %v", in.AppID, ctx.Err())
		return nil, ctx.Err()
	}
}
// run binds a TCP listener on the port given by the package-level GrpcPort,
// registers the heartbeat service and the gRPC reflection service on a new
// gRPC server, and then serves on that listener. A failure to listen or to
// serve is reported through log.Fatalf.
func (service *GrpcServer) run() {
	addr := fmt.Sprintf(":%d", GrpcPort)
	log.Printf("Listening on %s", addr)

	listener, listenErr := net.Listen("tcp", addr)
	if listenErr != nil {
		log.Fatalf("Failed to listen: %v", listenErr)
	}

	log.Println("Starting grpc server...")
	srv := grpc.NewServer()

	// Heartbeat service first, then reflection.
	pb.RegisterHeartbeatServiceServer(srv, service)
	reflection.Register(srv)
	log.Println("Registered grpc services...")

	serveErr := srv.Serve(listener)
	if serveErr != nil {
		log.Fatalf("Failed to serve: %v", serveErr)
	}
}