Skip to content

fix: sync api improvements #758

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/api/syncapi/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (i *Identity) loadOrGenerateKey() error {
if pubKeyBlock == nil {
return errors.New("no public key found in pem")
}
pubKey, err := x509.ParsePKIXPublicKey(pubKeyBytes)
pubKey, err := x509.ParsePKIXPublicKey(pubKeyBlock.Bytes)
if err != nil {
return fmt.Errorf("parse public key: %w", err)
}
Expand Down
39 changes: 30 additions & 9 deletions internal/api/syncapi/identity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package syncapi

import (
"fmt"
"os"
"path/filepath"
"testing"
)

func TestIdentity(t *testing.T) {
func TestCreateAndLoad(t *testing.T) {
dir := t.TempDir()

// Create a new identity
Expand All @@ -16,14 +15,36 @@ func TestIdentity(t *testing.T) {
t.Fatalf("failed to create identity: %v", err)
}

// Load the identity
loaded, err := NewIdentity("test-instance", filepath.Join(dir, "myidentity.pem"))
if err != nil {
t.Fatalf("failed to load identity: %v", err)
}

// Verify the identity
if !ident.privateKey.Equal(loaded.privateKey) {
t.Fatalf("identities do not match")
}
}

func TestSignatures(t *testing.T) {
dir := t.TempDir()

// Create a new identity
ident, err := NewIdentity("test-instance", filepath.Join(dir, "myidentity.pem"))
if err != nil {
t.Fatalf("failed to create identity: %v", err)
}

// Sign a message
signature, err := ident.SignMessage([]byte("hello world!"))
if err != nil {
t.Fatalf("failed to sign message: %v", err)
}
fmt.Printf("signed message: %x\n", signature)

// Load and print identity file
bytes, _ := os.ReadFile(filepath.Join(dir, "myidentity.pem"))
t.Log(string(bytes))

// Load and print public key file
bytes, _ = os.ReadFile(filepath.Join(dir, "myidentity.pem.pub"))
t.Log(string(bytes))
// verify the signature
if err := ident.VerifySignature([]byte("hello world!"), signature); err != nil {
t.Fatalf("failed to verify signature: %v", err)
}
}
1 change: 0 additions & 1 deletion internal/api/syncapi/syncclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ type SyncClient struct {

// mutable properties
mu sync.Mutex
remoteConfigStore RemoteConfigStore
connectionStatus v1.SyncConnectionState
connectionStatusMessage string
}
Expand Down
46 changes: 33 additions & 13 deletions internal/api/syncapi/synchandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ func (h *BackrestSyncHandler) Sync(ctx context.Context, stream *connect.BidiStre
}
zap.S().Infof("syncserver accepted a connection from client instance ID %q", authorizedClientPeer.InstanceId)

opIDLru, _ := lru.New[int64, int64](128) // original ID -> local ID
flowIDLru, _ := lru.New[int64, int64](128) // original flow ID -> local flow ID
opIDLru, _ := lru.New[int64, int64](2048) // original ID -> local ID
flowIDLru, _ := lru.New[int64, int64](2048) // original flow ID -> local flow ID

insertOrUpdate := func(op *v1.Operation) error {
op.OriginalId = op.Id
Expand All @@ -123,18 +123,38 @@ func (h *BackrestSyncHandler) Sync(ctx context.Context, stream *connect.BidiStre
}
}
if op.FlowId, ok = flowIDLru.Get(op.OriginalFlowId); !ok {
var flowOp *v1.Operation
if err := h.mgr.oplog.Query(oplog.Query{}.
SetOriginalFlowID(op.OriginalFlowId).
SetInstanceID(op.InstanceId), func(o *v1.Operation) error {
flowOp = o
return nil
}); err != nil {
return fmt.Errorf("mapping remote flow ID to local ID: %w", err)
tryFindFlowID := func(q oplog.Query) (int64, error) {
var flowOp *v1.Operation
if err := h.mgr.oplog.Query(q, func(o *v1.Operation) error {
flowOp = o
return nil
}); err != nil {
return 0, fmt.Errorf("mapping remote flow ID to local ID: %w", err)
}
if flowOp != nil {
return flowOp.FlowId, nil
}
return 0, nil
}
if flowOp != nil {
op.FlowId = flowOp.FlowId
flowIDLru.Add(op.OriginalFlowId, flowOp.FlowId)

var err error
var flowId int64
flowId, err = tryFindFlowID(oplog.Query{}.SetSnapshotID(op.SnapshotId))
if err != nil {
return err
}
if flowId == 0 {
flowId, err = tryFindFlowID(oplog.Query{}.
SetOriginalFlowID(op.OriginalFlowId).
SetInstanceID(op.InstanceId))
if err != nil {
return err
}
}

if flowId != 0 {
op.FlowId = flowId
flowIDLru.Add(op.OriginalFlowId, flowId)
}
}

Expand Down
Loading