Skip to content

Commit 891b208

Browse files
author
Raphael V Rosa
authored
Adds heartbeat implementation to inventory client/server (#243)
1 parent cc6f634 commit 891b208

10 files changed

Lines changed: 593 additions & 248 deletions

File tree

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ license: $(VENV_DIR) ## Check licensing with the reuse tool
4141
build: ## build in all subprojects
4242
for dir in $(SUBPROJECTS); do $(MAKE) -C $$dir build; done
4343

44-
DOCKER_PROJECTS := api exporters-inventory inventory tenant-controller
44+
DOCKER_PROJECTS := api apiv2 exporters-inventory inventory tenant-controller
4545
docker-build: ## build all docker containers
4646
for dir in $(DOCKER_PROJECTS); do $(MAKE) -C $$dir $@; done
4747

inventory/api/inventory/v1/inventory.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ service InventoryService {
7474

7575
// Deletes all resources of given kind for tenant.
7676
rpc DeleteAllResources(DeleteAllResourcesRequest) returns (DeleteAllResourcesResponse) {}
77+
78+
// Custom RPC to establish clients heartbeat and subscription verification.
79+
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {}
7780
}
7881

7982
enum ClientKind {
@@ -505,3 +508,10 @@ message DeleteAllResourcesRequest {
505508
}
506509

507510
message DeleteAllResourcesResponse {}
511+
512+
message HeartbeatRequest {
513+
// The UUID of the client.
514+
string client_uuid = 1 [(buf.validate.field).string = {uuid: true}];
515+
}
516+
517+
message HeartbeatResponse {}

inventory/docs/api/inventory.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@
112112
- [GetTreeHierarchyResponse](#inventory-v1-GetTreeHierarchyResponse)
113113
- [GetTreeHierarchyResponse.Node](#inventory-v1-GetTreeHierarchyResponse-Node)
114114
- [GetTreeHierarchyResponse.TreeNode](#inventory-v1-GetTreeHierarchyResponse-TreeNode)
115+
- [HeartbeatRequest](#inventory-v1-HeartbeatRequest)
116+
- [HeartbeatResponse](#inventory-v1-HeartbeatResponse)
115117
- [ListInheritedTelemetryProfilesRequest](#inventory-v1-ListInheritedTelemetryProfilesRequest)
116118
- [ListInheritedTelemetryProfilesRequest.InheritBy](#inventory-v1-ListInheritedTelemetryProfilesRequest-InheritBy)
117119
- [ListInheritedTelemetryProfilesResponse](#inventory-v1-ListInheritedTelemetryProfilesResponse)
@@ -1868,6 +1870,31 @@ resource ID, generated by inventory on Create |
18681870

18691871

18701872

1873+
<a name="inventory-v1-HeartbeatRequest"></a>
1874+
1875+
### HeartbeatRequest
1876+
1877+
1878+
1879+
| Field | Type | Label | Description |
1880+
| ----- | ---- | ----- | ----------- |
1881+
| client_uuid | [string](#string) | | The UUID of the client. |
1882+
1883+
1884+
1885+
1886+
1887+
1888+
<a name="inventory-v1-HeartbeatResponse"></a>
1889+
1890+
### HeartbeatResponse
1891+
1892+
1893+
1894+
1895+
1896+
1897+
18711898
<a name="inventory-v1-ListInheritedTelemetryProfilesRequest"></a>
18721899

18731900
### ListInheritedTelemetryProfilesRequest
@@ -2163,6 +2190,7 @@ Client (API, Resource Manager, etc) registration and event streaming
21632190
| GetTreeHierarchy | [GetTreeHierarchyRequest](#inventory-v1-GetTreeHierarchyRequest) | [GetTreeHierarchyResponse](#inventory-v1-GetTreeHierarchyResponse) | Returns the upstream tree hierarchy given the resource ID in the request. The response contains a list of adjacent nodes, from which the tree can be reconstructed. |
21642191
| GetSitesPerRegion | [GetSitesPerRegionRequest](#inventory-v1-GetSitesPerRegionRequest) | [GetSitesPerRegionResponse](#inventory-v1-GetSitesPerRegionResponse) | Returns a list of the number of sites per region ID given the list of region IDs in the request. The response contains a list of objects with a region ID associated to the total amount of sites under it. The sites under a region account for all the sites under its child regions recursively, respecting the max-depth of parent relationships among regions. |
21652192
| DeleteAllResources | [DeleteAllResourcesRequest](#inventory-v1-DeleteAllResourcesRequest) | [DeleteAllResourcesResponse](#inventory-v1-DeleteAllResourcesResponse) | Deletes all resources of given kind for tenant. |
2193+
| Heartbeat | [HeartbeatRequest](#inventory-v1-HeartbeatRequest) | [HeartbeatResponse](#inventory-v1-HeartbeatResponse) | Custom RPC to establish clients heartbeat and subscription verification. |
21662194

21672195

21682196

inventory/internal/inventory/inventory.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,21 @@ func (srv *InventorygRPCServer) Authorize(ctx context.Context, request interface
101101
return nil
102102
}
103103

104+
func (srv *InventorygRPCServer) Heartbeat(
105+
_ context.Context,
106+
in *inv_v1.HeartbeatRequest,
107+
) (*inv_v1.HeartbeatResponse, error) {
108+
// Check if the client is already registered.
109+
// if the client is already registered, send an ack.
110+
if _, ok := srv.CR.ClientRegistrationMap().Load(in.GetClientUuid()); !ok {
111+
err := errors.Errorfr(errors.Reason_UNKNOWN_CLIENT,
112+
"client with UUID %s not found", in.GetClientUuid(),
113+
)
114+
return nil, errors.Wrap(err)
115+
}
116+
return &inv_v1.HeartbeatResponse{}, nil
117+
}
118+
104119
func (srv *InventorygRPCServer) SubscribeEvents(
105120
in *inv_v1.SubscribeEventsRequest,
106121
stream inv_v1.InventoryService_SubscribeEventsServer,

inventory/pkg/api/inventory/v1/inventory.pb.go

Lines changed: 370 additions & 246 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

inventory/pkg/api/inventory/v1/inventory_constants.pb.go

Lines changed: 6 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

inventory/pkg/api/inventory/v1/inventory_grpc.pb.go

Lines changed: 38 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

inventory/pkg/client/client.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package client
66
import (
77
"context"
88
"errors"
9+
"flag"
910
"io"
1011
"sync"
1112
"time"
@@ -28,6 +29,7 @@ import (
2829
inv_errors "github.com/open-edge-platform/infra-core/inventory/v2/pkg/errors"
2930
"github.com/open-edge-platform/infra-core/inventory/v2/pkg/logging"
3031
"github.com/open-edge-platform/infra-core/inventory/v2/pkg/metrics"
32+
"github.com/open-edge-platform/infra-core/inventory/v2/pkg/tenant"
3133
"github.com/open-edge-platform/infra-core/inventory/v2/pkg/tracing"
3234
"github.com/open-edge-platform/infra-core/inventory/v2/pkg/util"
3335
"github.com/open-edge-platform/infra-core/inventory/v2/pkg/util/filters"
@@ -63,6 +65,21 @@ const (
6365
InvCacheStaleTimeoutOffsetDescription = "Parameter to set the timeout offset percentage for the Inventory UUID cache"
6466
)
6567

68+
const (
69+
defaultHeatbeatBackoffRetries = 3
70+
defaultHeatbeatBackoffInterval = time.Second * 1
71+
defaultHeartbeatInterval = time.Second * 30
72+
)
73+
74+
var (
75+
heatbeatBackoffRetries = flag.Uint64(
76+
"heatbeatBackoffRetries", defaultHeatbeatBackoffRetries, "The number of retries the heartbeat backoff will attempt")
77+
heatbeatBackoffInterval = flag.Duration(
78+
"heatbeatBackoffInterval", defaultHeatbeatBackoffInterval, "The interval between heartbeat backoff retries")
79+
heartbeatInterval = flag.Duration(
80+
"heartbeatInterval", defaultHeartbeatInterval, "The interval between heartbeats")
81+
)
82+
6683
type WatchEvents struct {
6784
Ctx context.Context
6885
Event *inv_v1.SubscribeEventsResponse
@@ -576,6 +593,43 @@ func NewTenantAwareInventoryClient(
576593
return cl, nil
577594
}
578595

596+
// Set up the heartbeat ticker to keep the client connection alive.
597+
func (client *inventoryClient) heartbeat(clientUUID string) error {
598+
heartbeetReq := &inv_v1.HeartbeatRequest{
599+
ClientUuid: clientUUID,
600+
}
601+
602+
ticker := time.NewTicker(*heartbeatInterval)
603+
defer ticker.Stop()
604+
for {
605+
select {
606+
case <-ticker.C:
607+
clientCtxDeadline, cancel := context.WithDeadline(client.streamCtx, time.Now().Add(*heatbeatBackoffInterval))
608+
clientCtx := tenant.AddTenantIDToContext(clientCtxDeadline, clientUUID)
609+
610+
err := backoff.Retry(func() error {
611+
zlog.Debug().Msgf("Heartbeat client UUID: %s", clientUUID)
612+
_, errHearbeat := client.invAPI.Heartbeat(clientCtx, heartbeetReq)
613+
// If the error is an unknown client, we return a permanent error
614+
// to stop the heartbeat retry loop.
615+
if errHearbeat != nil && inv_errors.IsUnKnownClient(errHearbeat) {
616+
return backoff.Permanent(errHearbeat)
617+
}
618+
return errHearbeat
619+
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(*heatbeatBackoffInterval), *heatbeatBackoffRetries))
620+
if err != nil {
621+
zlog.InfraErr(err).Msgf("failed to heartbeat client UUID: %s", clientUUID)
622+
cancel() // cancel the context to avoid leaking resources
623+
return err
624+
}
625+
cancel() // cancel the context to avoid leaking resources
626+
case <-client.stream.Context().Done():
627+
zlog.InfraSec().Info().Msgf("finished to heartbeat client UUID: %s", clientUUID)
628+
return nil
629+
}
630+
}
631+
}
632+
579633
// register registers the inventory client on a name and a list of resource kinds.
580634
// It is meant to be used by any register retry go routine that can be called
581635
// once the subscriptions stream context is closed by any unexpected reasons.
@@ -624,6 +678,16 @@ func (client *inventoryClient) register() error {
624678
zlog.InfraSec().Info().Msgf("Registered inventory client with UUID: %s", resp.ClientUuid)
625679
client.uuidMutex.Unlock()
626680

681+
// Start the heartbeat ticker to keep the client connection alive.
682+
go func(clientUUID string) {
683+
if err := client.heartbeat(clientUUID); err != nil {
684+
// heartbeat failed, close the stream and connection.
685+
zlog.InfraSec().InfraErr(err).Msgf("heartbeat stopped for client UUID: %s", clientUUID)
686+
client.Close()
687+
zlog.Fatal().Msgf("failed to heartbeat client UUID: %s", clientUUID)
688+
}
689+
}(resp.ClientUuid)
690+
627691
return nil
628692
}
629693

inventory/pkg/mocks/inventory_mock.go

Lines changed: 35 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

inventory/python/infra_inventory/inventory/v1.py

Lines changed: 26 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)