Skip to content

Commit d987fc9

Browse files
committed
stream implementation for combined client
Pull Request: #3245
1 parent c42ee79 commit d987fc9

File tree

8 files changed

+148
-9
lines changed

8 files changed

+148
-9
lines changed

.github/actions/setup-nix/action.yml

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ inputs:
1111
description: "Enable sccache with GitHub Actions cache backend"
1212
required: false
1313
default: "false"
14+
gc-max-store-size:
15+
description: "Max Nix store size before garbage collection (e.g. 5G, 10G). Only applies on WarpBuild runners."
16+
required: false
17+
default: "8G"
1418
runs:
1519
using: "composite"
1620
steps:
@@ -19,17 +23,99 @@ runs:
1923
github_access_token: ${{ inputs.github-token }}
2024
extra_nix_config: |
2125
accept-flake-config = true
22-
- name: Nix store cache (WarpBuild)
26+
- name: Check Nix store cache (WarpBuild)
27+
id: nix-cache-check
28+
if: startsWith(runner.name, 'warp-')
29+
uses: WarpBuilds/cache/restore@v1
30+
with:
31+
path: |
32+
/nix/store
33+
/nix/var/nix/db
34+
/nix/var/nix/profiles
35+
key: nix-store-${{ runner.os }}-${{ runner.arch }}-${{ hashFiles('flake.lock') }}-${{ github.job }}-${{ github.run_id }}
36+
restore-keys: |
37+
nix-store-${{ runner.os }}-${{ runner.arch }}-${{ hashFiles('flake.lock') }}-${{ github.job }}-
38+
nix-store-${{ runner.os }}-${{ runner.arch }}-${{ hashFiles('flake.lock') }}-
39+
nix-store-${{ runner.os }}-${{ runner.arch }}-
40+
lookup-only: true
41+
- name: Prepare Nix directories for cache restore
42+
if: startsWith(runner.name, 'warp-') && steps.nix-cache-check.outputs.cache-matched-key != ''
43+
run: |
44+
# Clear store so cache restore doesn't conflict with paths the
45+
# Nix installer already created. The cached archive includes
46+
# Nix itself, so it will be fully restored from cache.
47+
sudo rm -rf /nix/store /nix/var/nix/db /nix/var/nix/profiles
48+
# Recreate directories owned by runner user so tar can
49+
# write during restore and read during save
50+
sudo mkdir -p /nix/store /nix/var/nix/db /nix/var/nix/profiles
51+
sudo chown $USER /nix/store /nix/var/nix/db /nix/var/nix/profiles
52+
shell: bash
53+
- name: Restore Nix store cache (WarpBuild)
2354
if: startsWith(runner.name, 'warp-')
2455
uses: WarpBuilds/cache@v1
2556
with:
2657
path: |
2758
/nix/store
2859
/nix/var/nix/db
2960
/nix/var/nix/profiles
30-
key: nix-store-${{ runner.os }}-${{ runner.arch }}-${{ hashFiles('flake.lock') }}
61+
key: nix-store-${{ runner.os }}-${{ runner.arch }}-${{ hashFiles('flake.lock') }}-${{ github.job }}-${{ github.run_id }}
3162
restore-keys: |
63+
nix-store-${{ runner.os }}-${{ runner.arch }}-${{ hashFiles('flake.lock') }}-${{ github.job }}-
64+
nix-store-${{ runner.os }}-${{ runner.arch }}-${{ hashFiles('flake.lock') }}-
3265
nix-store-${{ runner.os }}-${{ runner.arch }}-
66+
- name: Fix Nix directory ownership for cache save
67+
if: startsWith(runner.name, 'warp-') && steps.nix-cache-check.outputs.cache-matched-key == ''
68+
run: |
69+
# On cache miss the store is still root-owned from the installer.
70+
# Fix ownership so the post-step cache save can read all files.
71+
sudo chown -R $USER /nix/store /nix/var/nix/db /nix/var/nix/profiles
72+
shell: bash
73+
- name: Register post-step Nix store GC
74+
if: startsWith(runner.name, 'warp-')
75+
uses: webiny/action-post-run@3.1.0
76+
with:
77+
run: bash ${{ github.workspace }}/.github/scripts/nix-store-gc.sh ${{ inputs.gc-max-store-size }}
78+
- name: Write Nix GC script
79+
if: startsWith(runner.name, 'warp-')
80+
run: |
81+
mkdir -p "${{ github.workspace }}/.github/scripts"
82+
cat > "${{ github.workspace }}/.github/scripts/nix-store-gc.sh" << 'NIXGCEOF'
83+
#!/usr/bin/env bash
84+
set -euo pipefail
85+
parse_size() {
86+
local val="${1%[GgMmKk]*}"
87+
local unit="${1: -1}"
88+
case "$unit" in
89+
G|g) echo $(( val * 1073741824 )) ;;
90+
M|m) echo $(( val * 1048576 )) ;;
91+
K|k) echo $(( val * 1024 )) ;;
92+
*) echo "$1" ;;
93+
esac
94+
}
95+
THRESHOLD="${1:-8G}"
96+
THRESHOLD_BYTES=$(parse_size "$THRESHOLD")
97+
if du -sb /nix/store >/dev/null 2>&1; then
98+
STORE_BYTES=$(du -sb /nix/store | cut -f1)
99+
else
100+
STORE_BYTES=$(( $(du -sk /nix/store | cut -f1) * 1024 ))
101+
fi
102+
STORE_MB=$(( STORE_BYTES / 1048576 ))
103+
echo "Nix store size: ${STORE_MB}MB, threshold: $THRESHOLD"
104+
if [ "$STORE_BYTES" -gt "$THRESHOLD_BYTES" ]; then
105+
echo "Store exceeds threshold, running garbage collection..."
106+
nix store gc 2>&1 || true
107+
if du -sb /nix/store >/dev/null 2>&1; then
108+
NEW_BYTES=$(du -sb /nix/store | cut -f1)
109+
else
110+
NEW_BYTES=$(( $(du -sk /nix/store | cut -f1) * 1024 ))
111+
fi
112+
echo "Store size after GC: $(( NEW_BYTES / 1048576 ))MB"
113+
else
114+
echo "Store is within limit, skipping GC"
115+
fi
116+
NIXGCEOF
117+
chmod +x "${{ github.workspace }}/.github/scripts/nix-store-gc.sh"
118+
shell: bash
33119
- uses: cachix/cachix-action@v16
34120
if: ${{ inputs.cachix-auth-token != '' }}
35121
with:

crates/xmtp_api_d14n/src/queries/combined.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use crate::d14n::FetchD14nCutover;
1515
use crate::protocol::CursorStore;
1616
use crate::protocol::FullXmtpApiArc;
1717

18+
mod streams;
19+
1820
type XmtpApiClient = FullXmtpApiArc<ApiClientError<GrpcError>>;
1921

2022
#[derive(Clone)]
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use crate::CombinedD14nClient;
2+
use crate::protocol::CursorStore;
3+
4+
use xmtp_api_grpc::error::GrpcError;
5+
use xmtp_proto::api::{ApiClientError, Client};
6+
use xmtp_proto::api_client::{BoxedGroupS, BoxedWelcomeS, XmtpMlsStreams};
7+
use xmtp_proto::types::{GroupId, InstallationId, TopicCursor};
8+
9+
#[xmtp_common::async_trait]
10+
impl<C, Store> XmtpMlsStreams for CombinedD14nClient<C, Store>
11+
where
12+
C: Client<Error = GrpcError>,
13+
<C as Client>::Stream: 'static,
14+
Store: CursorStore + Clone,
15+
{
16+
type Error = ApiClientError<GrpcError>;
17+
18+
type GroupMessageStream = BoxedGroupS<ApiClientError<GrpcError>>;
19+
20+
type WelcomeMessageStream = BoxedWelcomeS<ApiClientError<GrpcError>>;
21+
22+
async fn subscribe_group_messages(
23+
&self,
24+
group_ids: &[&GroupId],
25+
) -> Result<Self::GroupMessageStream, Self::Error> {
26+
Ok(self
27+
.choose_client()
28+
.await?
29+
.subscribe_group_messages(group_ids)
30+
.await?)
31+
}
32+
33+
async fn subscribe_group_messages_with_cursors(
34+
&self,
35+
topics: &TopicCursor,
36+
) -> Result<Self::GroupMessageStream, Self::Error> {
37+
Ok(self
38+
.choose_client()
39+
.await?
40+
.subscribe_group_messages_with_cursors(topics)
41+
.await?)
42+
}
43+
44+
async fn subscribe_welcome_messages(
45+
&self,
46+
installations: &[&InstallationId],
47+
) -> Result<Self::WelcomeMessageStream, Self::Error> {
48+
Ok(self
49+
.choose_client()
50+
.await?
51+
.subscribe_welcome_messages(installations)
52+
.await?)
53+
}
54+
}

crates/xmtp_api_grpc/src/streams.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ pub use try_from_item::*;
1818
mod fake_empty;
1919
pub use fake_empty::*;
2020

21+
mod multiplexed;
22+
pub use multiplexed::*;
23+
2124
use prost::bytes::Bytes;
2225
use tonic::{Response, Status, Streaming};
2326

crates/xmtp_mls/src/subscriptions/stream_utils/multiplexed.rs renamed to crates/xmtp_api_grpc/src/streams/multiplexed.rs

File renamed without changes.

crates/xmtp_mls/src/subscriptions/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ pub mod process_welcome;
2121
mod stream_all;
2222
mod stream_conversations;
2323
pub mod stream_messages;
24-
mod stream_utils;
2524

2625
#[cfg(any(test, feature = "test-utils"))]
2726
use crate::subscriptions::stream_messages::stream_stats::{StreamStatsWrapper, StreamWithStats};

crates/xmtp_mls/src/subscriptions/stream_conversations.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use super::{LocalEvents, Result, SubscribeError, process_welcome::ProcessWelcomeResult};
22
use crate::subscriptions::StreamKind;
3-
use crate::subscriptions::stream_utils::{MultiplexedStream, multiplexed};
43
use crate::{
54
context::XmtpSharedContext, groups::MlsGroup,
65
subscriptions::process_welcome::ProcessWelcomeFuture,
76
};
7+
use xmtp_api_grpc::streams::{MultiplexedStream, multiplexed};
88
use xmtp_common::task::JoinSet;
99
use xmtp_db::{consent_record::ConsentState, group::ConversationType};
1010

crates/xmtp_mls/src/subscriptions/stream_utils.rs

Lines changed: 0 additions & 5 deletions
This file was deleted.

0 commit comments

Comments
 (0)