Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ The materialized status is written to `presence:{account}:status` and broadcast
- Add a third **external** layer (Teams) to the presence aggregation, stored at `presence:{account}:azure` and read atomically in the recompute Lua (same `{account}` hash-tag → same cluster slot).
- A new **one-shot cron binary** (`user-presence-service/sync`) reconciles Teams call state into that layer each run, writing directly to Valkey and publishing `PresenceState` changes itself.
- To let a separate binary share the recompute Lua + key layout, **extract** the Valkey store from the daemon's `package main` into an importable `user-presence-service/presencestore` package, consumed by both the daemon and the sync.
- Extend `pkg/msgraph` with app-only `GetUserByPrincipalName` (targeted single-user lookup) and an ROPC (delegated) `PresenceClient.GetPresencesByUserId`.
- Extend `pkg/msgraph` with app-only `GetUsersByAccounts` (batched, domain-agnostic prefix lookup) and an ROPC (delegated) `PresenceClient.GetPresencesByUserId`.

## 4. Decisions (resolved during brainstorming)

Expand All @@ -43,10 +43,10 @@ The materialized status is written to `presence:{account}:status` and broadcast
| D5 | Runtime | **One-shot** binary, triggered by an external cron (K8s CronJob / pipeline schedule); runs once, exits. |
| D6 | Teams→in-call mapping | **Call/meeting activities only**: activity ∈ {`InACall`, `InAConferenceCall`, `Presenting`}. |
| D7 | Graph auth | **ROPC** service account for presence (`Presence.Read.All`, delegated) + **app-only** for `/users` (`User.Read.All`). |
| D8 | User matching | `account + "@" + TEAMS_EMAIL_DOMAIN` used as the UPN for a targeted `GET /users/{upn}` lookup (case-insensitive in AAD). |
| D9 | Graph client location | **Extend `pkg/msgraph`** (app-only `GetUserByPrincipalName`; new ROPC `PresenceClient`). |
| D8 | User matching | Match by `startsWith(userPrincipalName,'account@')` (domain-agnostic — accounts may live under different domains); map results back by UPN local-part, case-insensitively, first match wins. No fixed `TEAMS_EMAIL_DOMAIN`. |
| D9 | Graph client location | **Extend `pkg/msgraph`** (app-only `GetUsersByAccounts`; new ROPC `PresenceClient`). Hand-rolled `net/http`, no Graph SDK. |
| D10 | Directory layout | `user-presence-service/presencestore/` (shared) + `user-presence-service/sync/` (`package main` + `deploy/`). |
| D11 | id→account mapping | **Permanent** `account → azureObjectID` cache in Valkey (`presence:idmap:azure`, no TTL — the mapping is immutable); Graph is queried (targeted `GetUserByPrincipalName` per account) only to fill accounts missing from the cache. |
| D11 | id→account mapping | **Permanent** `account → azureObjectID` cache in Valkey (`presence:idmap:azure`, no TTL — the mapping is immutable); Graph is queried (batched `GetUsersByAccounts`, chunked ≤15 startsWith clauses/query) only to fill accounts missing from the cache. |
| D12 | work-list scope | Reconcile only **active** accounts (live connection), sourced from the reused `presence:sweep` zset via `ActiveAccounts()` — not all site users from Mongo. Disconnected users are offline regardless of Teams, so the sync has no Mongo dependency. |

## 5. Aggregation / precedence (the core change)
Expand Down Expand Up @@ -75,15 +75,15 @@ This splits today's single manual overlay into a **high tier** (away / appear_of
| `presence:{account}:azure` | string | presencestore | External (Teams) status: `"in-call"` or absent. Read as `KEYS[4]` in recompute. Written with a **TTL safety-net** (~`EXTERNAL_TTL`, default 5m) so a dead sync self-heals. |
| `presence:sweep` | zset | presencestore (daemon) | **Reused, not new.** Members are accounts with ≥1 live connection (an account is `ZREM`'d when it fully disconnects). The sync sources its work-list from here via `ActiveAccounts()` — only connected users can be shown in-call. |
| `presence:status:index:azure` | set | sync | Accounts currently marked in-call. Lets a run compute `toClear = prev − current`. Single key (own slot); updated in a pipeline (not atomic with per-account keys — acceptable for a reconciler). |
| `presence:idmap:azure` | hash | sync | **Permanent** `account → azureObjectID` cache (no TTL — the mapping is immutable). Read every run; accounts missing from it are filled lazily via a targeted per-account Graph lookup (see §7). |
| `presence:idmap:azure` | hash | sync | **Permanent** `account → azureObjectID` cache (no TTL — the mapping is immutable). Read every run; accounts missing from it are filled lazily via a batched Graph prefix lookup (see §7). |

## 7. The sync service

**Type:** `user-presence-service/sync` — `package main`, its own `main.go` + `deploy/Dockerfile`, triggered by an external cron. Runs one reconcile and exits (non-zero on a job-level failure).

**Reconcile flow (per run):**
1. **Active accounts:** `ActiveAccounts()` = `ZRANGE presence:sweep` → accounts with a live connection. A disconnected user is offline regardless of Teams (the §5 invariant), so checking them is wasted work — we scope to active users only. No Mongo involved.
2. **Resolve ids:** `HMGET presence:idmap:azure <active accounts>` → found ids + a not-found list. For each **missing** account, look it up targeted via Graph `GetUserByPrincipalName(account@domain)` (case-insensitive UPN path; no tenant-wide enumeration) and `HSET` the results **permanently**. Steady state with no new users makes zero Graph calls. A single account's lookup failure is logged and skipped.
2. **Resolve ids:** `HMGET presence:idmap:azure <active accounts>` → found ids + a not-found list. For the **missing** accounts, batch a Graph `GetUsersByAccounts` call — `startsWith(userPrincipalName,'account@')` OR'd across accounts, chunked ≤15/query (Graph rejects overly complex filters), domain-agnostic — map results back by UPN local-part (case-insensitive, first match wins), and `HSET` them **permanently**. Steady state with no new users makes zero Graph calls; a lookup failure is logged and yields nothing (non-fatal).
3. Graph ROPC `GetPresencesByUserId(ids)` (batched ≤650/req).
4. `current` = accounts whose presence `isInCall` (activity ∈ {InACall, InAConferenceCall, Presenting}).
5. `prev` = `SMEMBERS presence:status:index:azure`.
Expand All @@ -97,14 +97,14 @@ Full status reconciliation each run means a missed/crashed run self-corrects nex

## 8. `pkg/msgraph` extensions

- **App-only** (existing client, reuses `accessToken`): `GetUserByPrincipalName(ctx, upn) (*GraphUser, error)` — targeted `/users/{upn}` lookup, `(nil,nil)` on 404; `GraphUser{ID, Mail, UserPrincipalName}`.
- **App-only** (existing client, reuses `accessToken`): `GetUsersByAccounts(ctx, accounts) ([]GraphUser, error)` — `startsWith(userPrincipalName,'account@')` OR'd + chunked (`ConsistencyLevel: eventual`, `$count=true`); `GraphUser{ID, Mail, UserPrincipalName}`.
- **ROPC** (new `PresenceClient`, separate token cache, `grant_type=password`): `GetPresencesByUserId(ctx, ids) ([]Presence, error)`; `Presence{ID, Availability, Activity}`; batches at Graph's 650-id cap. Behind a `PresenceReader` interface for mocking.

## 9. Code structure & impact

```text
pkg/model/presence.go + StatusInCall
pkg/msgraph/msgraph.go + GetUserByPrincipalName, GraphUser
pkg/msgraph/msgraph.go + GetUsersByAccounts, GraphUser
pkg/msgraph/presence.go NEW ROPC PresenceClient, GetPresencesByUserId

user-presence-service/presencestore/ NEW (moved from store_valkey.go)
Expand All @@ -131,7 +131,7 @@ The daemon's runtime behavior is unchanged except the new precedence in the shar

## 11. Testing strategy

- **Unit:** model round-trip (`StatusInCall`); `pkg/msgraph` `GetUserByPrincipalName` (found/404/error) + ROPC `GetPresencesByUserId` (httptest, grant-type/username asserted, token never logged); sync `isInCall`/`accountFromEmail` table tests; sync `reconcile.run` with mocked Graph/store/index/publisher (cache hit/miss, set + clear, per-account-failure paths).
- **Unit:** model round-trip (`StatusInCall`); `pkg/msgraph` `GetUsersByAccounts` (batch/chunk/eventual-header) + ROPC `GetPresencesByUserId` (httptest, grant-type/username asserted, token never logged); sync `isInCall`/`localPart` table tests; sync `reconcile.run` with mocked Graph/store/index/publisher (cache hit/miss, case-insensitive match, set + clear, per-account-failure paths).
- **Integration (testcontainers Valkey, `pkg/testutil`):** precedence matrix (away/appear_offline beat in-call; in-call beats online/busy; offline invariant; clear restores connection-derived) and `SetExternal` in `presencestore`.
- **Gates:** `make test`, `make lint`, `make sast` (no medium+), `make test-integration SERVICE=user-presence-service`. ≥80% coverage on new packages.

Expand Down
108 changes: 86 additions & 22 deletions pkg/msgraph/msgraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,23 @@ type Client interface {
// concurrent or repeated calls with the same ExternalID return the same
// meeting — Graph itself is the idempotency source of truth.
CreateOnlineMeeting(ctx context.Context, req CreateOnlineMeetingRequest) (*OnlineMeeting, error)
}

// DirectoryReader resolves accounts to directory users. Kept separate from
// Client so consumers that only need meetings (room-service) don't depend on
// the user-lookup surface. App-only (User.Read.All).
type DirectoryReader interface {
// GetUsersByAccounts resolves account local-parts to users by matching
// startsWith(userPrincipalName,'account@') — domain-agnostic, so accounts
// under different domains still resolve. Accounts are batched into chunked
// $filter queries.
GetUsersByAccounts(ctx context.Context, accounts []string) ([]GraphUser, error)
}

// GetUserByPrincipalName looks up a single user by userPrincipalName
// (account@domain). Returns (nil, nil) when no such user exists. App-only
// (User.Read.All).
GetUserByPrincipalName(ctx context.Context, upn string) (*GraphUser, error)
// NewDirectoryClient returns an app-only directory reader (shares the graph
// client used for meetings; New always returns a *graphClient).
func NewDirectoryClient(cfg Config, opts ...Option) DirectoryReader {
return New(cfg, opts...).(*graphClient)
}

// GraphUser is the subset of a Graph user resource the presence sync needs.
Expand Down Expand Up @@ -198,41 +210,93 @@ func (g *graphClient) accessToken(ctx context.Context) (string, error) {
return g.token, nil
}

// GetUserByPrincipalName resolves a single user by userPrincipalName. The
// /users/{upn} path lookup resolves case-insensitively in Azure AD, so the
// caller need not try multiple casings. Returns (nil, nil) on 404 so a
// non-Teams account is simply skipped.
func (g *graphClient) GetUserByPrincipalName(ctx context.Context, upn string) (*GraphUser, error) {
// maxUserFilterClauses caps startsWith clauses per $filter query. Microsoft
// Graph rejects overly complex filters, so accounts are looked up in chunks.
const maxUserFilterClauses = 15

// maxAccountsPerQuery bounds accounts per query. Each account emits both a
// lower- and upper-cased startsWith clause (see casedVariants), so up to two
// clauses each — keep the total within maxUserFilterClauses.
const maxAccountsPerQuery = maxUserFilterClauses / 2

// GetUsersByAccounts resolves account local-parts to users, matching
// startsWith(userPrincipalName,'account@') so any domain resolves. Both the
// lower- and upper-cased account are matched (rather than relying on Graph's
// case-insensitivity). Accounts are batched into chunked $filter queries.
func (g *graphClient) GetUsersByAccounts(ctx context.Context, accounts []string) ([]GraphUser, error) {
if len(accounts) == 0 {
return nil, nil
}
token, err := g.accessToken(ctx)
if err != nil {
return nil, fmt.Errorf("acquire graph token: %w", err)
}
endpoint := g.baseURL + "/users/" + url.PathEscape(upn) + "?$select=id,mail,userPrincipalName"
var out []GraphUser
for start := 0; start < len(accounts); start += maxAccountsPerQuery {
end := min(start+maxAccountsPerQuery, len(accounts))
batch, err := g.usersByAccountChunk(ctx, token, accounts[start:end])
if err != nil {
return nil, err
}
out = append(out, batch...)
}
return out, nil
}

// casedVariants returns the lower- and upper-cased forms of s (deduped when the
// value has no cased letters), so the startsWith filter matches accounts stored
// in either case.
func casedVariants(s string) []string {
lower, upper := strings.ToLower(s), strings.ToUpper(s)
if lower == upper {
return []string{lower}
}
return []string{lower, upper}
}

func (g *graphClient) usersByAccountChunk(ctx context.Context, token string, chunk []string) ([]GraphUser, error) {
clauses := make([]string, 0, len(chunk)*2)
for _, a := range chunk {
for _, variant := range casedVariants(a) {
// Escape single quotes for the OData string literal.
esc := strings.ReplaceAll(variant, "'", "''")
clauses = append(clauses, fmt.Sprintf("startsWith(userPrincipalName,'%s@')", esc))
}
}
q := url.Values{}
q.Set("$filter", strings.Join(clauses, " or "))
q.Set("$select", "id,mail,userPrincipalName")
q.Set("$count", "true")
q.Set("$top", "999")
endpoint := g.baseURL + "/users?" + q.Encode()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
if err != nil {
return nil, fmt.Errorf("build get-user request: %w", err)
return nil, fmt.Errorf("build get-users request: %w", err)
}
req.Header.Set("Authorization", "Bearer "+token)
// startsWith on userPrincipalName is an advanced query — Graph requires
// eventual consistency (paired with $count).
req.Header.Set("ConsistencyLevel", "eventual")
resp, err := g.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("get user: %w", err)
return nil, fmt.Errorf("get users: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<22))
if err != nil {
return nil, fmt.Errorf("read get-user response: %w", err)
}
if resp.StatusCode == http.StatusNotFound {
return nil, nil
return nil, fmt.Errorf("read get-users response: %w", err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("get user: graph returned status %d", resp.StatusCode)
return nil, fmt.Errorf("get users: graph returned status %d", resp.StatusCode)
}
var page struct {
Value []GraphUser `json:"value"`
}
var u GraphUser
if err := json.Unmarshal(body, &u); err != nil {
return nil, fmt.Errorf("decode get-user response: %w", err)
if err := json.Unmarshal(body, &page); err != nil {
return nil, fmt.Errorf("decode get-users response: %w", err)
}
return &u, nil
return page.Value, nil
}

// onlineMeetingPayload is the Graph createOrGet-onlineMeeting request body.
Expand Down
78 changes: 50 additions & 28 deletions pkg/msgraph/msgraph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package msgraph
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"strings"
Expand All @@ -21,6 +22,15 @@ func newTestClient(tokenURL, baseURL string) Client {
)
}

// newTestDirectory wires a DirectoryReader at the given token + graph servers.
func newTestDirectory(tokenURL, baseURL string) DirectoryReader {
return NewDirectoryClient(
Config{TenantID: "t", ClientID: "c", ClientSecret: "s"},
WithTokenURL(tokenURL),
WithBaseURL(baseURL),
)
}

func TestCreateOnlineMeeting_Success(t *testing.T) {
var tokenCalls, meetingCalls int

Expand Down Expand Up @@ -185,58 +195,70 @@ func TestNew_TLSInsecureSkipVerify(t *testing.T) {
assert.True(t, tr.TLSClientConfig.InsecureSkipVerify)
}

func TestGetUserByPrincipalName_Found(t *testing.T) {
func TestGetUsersByAccounts_BatchesAndReturns(t *testing.T) {
tokenSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_ = json.NewEncoder(w).Encode(tokenResponse{AccessToken: "tok", ExpiresIn: 3600}) // #nosec G117 -- test mock OAuth token
}))
defer tokenSrv.Close()

graphSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "Bearer tok", r.Header.Get("Authorization"))
// Targeted single-user path lookup (escaped @), not a tenant-wide list.
assert.True(t, strings.Contains(r.URL.Path, "/users/alice%40corp.com") ||
strings.Contains(r.URL.Path, "/users/alice@corp.com"), "path %s", r.URL.Path)
_ = json.NewEncoder(w).Encode(GraphUser{ID: "ida", Mail: "alice@corp.com", UserPrincipalName: "alice@corp.com"})
assert.Equal(t, "eventual", r.Header.Get("ConsistencyLevel"), "startsWith needs advanced query")
filter := r.URL.Query().Get("$filter")
// Domain-agnostic prefix match; both lower- and upper-cased variants of
// each account OR'd into one query.
assert.Contains(t, filter, "startsWith(userPrincipalName,'alice@')")
assert.Contains(t, filter, "startsWith(userPrincipalName,'ALICE@')")
assert.Contains(t, filter, "startsWith(userPrincipalName,'bob@')")
assert.Contains(t, filter, "startsWith(userPrincipalName,'BOB@')")
assert.Contains(t, filter, " or ")
_ = json.NewEncoder(w).Encode(map[string]any{"value": []GraphUser{
{ID: "ida", UserPrincipalName: "alice@corp.com"},
{ID: "idb", UserPrincipalName: "bob@partner.io"}, // different domain
}})
}))
defer graphSrv.Close()

c := newTestClient(tokenSrv.URL, graphSrv.URL)
u, err := c.GetUserByPrincipalName(context.Background(), "alice@corp.com")
c := newTestDirectory(tokenSrv.URL, graphSrv.URL)
users, err := c.GetUsersByAccounts(context.Background(), []string{"alice", "bob"})
require.NoError(t, err)
require.NotNil(t, u)
assert.Equal(t, "ida", u.ID)
require.Len(t, users, 2)
assert.Equal(t, "ida", users[0].ID)
assert.Equal(t, "bob@partner.io", users[1].UserPrincipalName)
}

func TestGetUserByPrincipalName_NotFound(t *testing.T) {
func TestGetUsersByAccounts_ChunksLargeInput(t *testing.T) {
tokenSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_ = json.NewEncoder(w).Encode(tokenResponse{AccessToken: "tok", ExpiresIn: 3600}) // #nosec G117 -- test mock OAuth token
}))
defer tokenSrv.Close()

var calls int
graphSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNotFound)
calls++
_ = json.NewEncoder(w).Encode(map[string]any{"value": []GraphUser{}})
}))
defer graphSrv.Close()

c := newTestClient(tokenSrv.URL, graphSrv.URL)
u, err := c.GetUserByPrincipalName(context.Background(), "ghost@corp.com")
accounts := make([]string, maxAccountsPerQuery+1) // one over a chunk -> 2 requests
for i := range accounts {
accounts[i] = fmt.Sprintf("u%d", i)
}
c := newTestDirectory(tokenSrv.URL, graphSrv.URL)
_, err := c.GetUsersByAccounts(context.Background(), accounts)
require.NoError(t, err)
assert.Nil(t, u, "404 -> (nil, nil) so the account is skipped")
assert.Equal(t, 2, calls, "accounts beyond one chunk trigger a second query")
}

func TestGetUserByPrincipalName_Error(t *testing.T) {
tokenSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_ = json.NewEncoder(w).Encode(tokenResponse{AccessToken: "tok", ExpiresIn: 3600}) // #nosec G117 -- test mock OAuth token
}))
defer tokenSrv.Close()

graphSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusForbidden)
}))
defer graphSrv.Close()
func TestCasedVariants(t *testing.T) {
assert.Equal(t, []string{"alice", "ALICE"}, casedVariants("alice"))
assert.Equal(t, []string{"alice", "ALICE"}, casedVariants("Alice"))
assert.Equal(t, []string{"123"}, casedVariants("123"), "caseless value -> single clause")
}

c := newTestClient(tokenSrv.URL, graphSrv.URL)
_, err := c.GetUserByPrincipalName(context.Background(), "alice@corp.com")
require.Error(t, err)
assert.Contains(t, err.Error(), "403")
func TestGetUsersByAccounts_Empty(t *testing.T) {
c := NewDirectoryClient(Config{TenantID: "t"})
users, err := c.GetUsersByAccounts(context.Background(), nil)
require.NoError(t, err)
assert.Empty(t, users)
}
Loading
Loading