Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions generate-sfu.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#!/usr/bin/env bash

set -e

REPO_URL="git@github.com:GetStream/protocol.git"
REFERENCE_TYPE="branch"
REFERENCE_VALUE="main"

PROJECT_ROOT="$(dirname "$(realpath "$0")")/"
BUILD_DIR="$PROJECT_ROOT/build"
CLONE_DIR="$BUILD_DIR/protocol-repo"
OUTPUT_CLIENT_PATH="$PROJECT_ROOT/stream-video-android-core/src/main/proto/video/sfu/"

# Step 1: Delete OUTPUT_CLIENT_PATH if exists else create an empty directory
echo "🧹 Preparing output directory: $OUTPUT_CLIENT_PATH"
rm -rf "$OUTPUT_CLIENT_PATH"
mkdir -p "$OUTPUT_CLIENT_PATH"

# Step 2: Clone the repository with shallow depth
echo "🚀 Cloning repository: $REPO_URL (Type: $REFERENCE_TYPE, Value: $REFERENCE_VALUE)..."
rm -rf "$CLONE_DIR"
mkdir -p "$BUILD_DIR"
git clone --depth=1 --branch "$REFERENCE_VALUE" "$REPO_URL" "$CLONE_DIR"

cd "$CLONE_DIR"

# Step 3: Checkout to the correct branch, tag, or commit
if [ "$REFERENCE_TYPE" == "branch" ]; then
git checkout "$REFERENCE_VALUE"
elif [ "$REFERENCE_TYPE" == "tag" ]; then
git fetch --tags
git checkout "tags/$REFERENCE_VALUE"
elif [ "$REFERENCE_TYPE" == "commit" ]; then
git fetch --depth=1 origin "$REFERENCE_VALUE"
git checkout "$REFERENCE_VALUE"
Comment thread
rahul-lohra marked this conversation as resolved.
else
echo "❌ ERROR: Invalid reference type '$REFERENCE_TYPE'. Use 'branch', 'tag', or 'commit'."
exit 1
fi

# Step 4: Copy content from CLONE_DIR/protobuf/video/sfu to OUTPUT_CLIENT_PATH
echo "📦 Copying proto files..."
SOURCE_PROTO_PATH="$CLONE_DIR/protobuf/video/sfu"

if [ ! -d "$SOURCE_PROTO_PATH" ]; then
echo "❌ ERROR: Source proto directory does not exist: $SOURCE_PROTO_PATH"
exit 1
fi

cp -R "$SOURCE_PROTO_PATH/"* "$OUTPUT_CLIENT_PATH"

# Step 5: Run Spotless (from project root, not inside cloned repo)
echo "✨ Running Spotless..."
cd "$PROJECT_ROOT"
./gradlew spotlessApply

# Step 6: Delete CLONE_DIR
echo "🗑 Cleaning up cloned repo..."
rm -rf "$CLONE_DIR"

echo "✅ Done."
205 changes: 186 additions & 19 deletions stream-video-android-core/api/stream-video-android-core.api

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.getstream.video.android.core

import android.app.Notification
import android.os.Bundle
import android.util.Log
import androidx.compose.runtime.Stable
import androidx.core.app.NotificationManagerCompat
import io.getstream.android.video.generated.models.BlockedUserEvent
Expand Down Expand Up @@ -165,6 +166,9 @@ import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import kotlin.collections.map
import kotlin.collections.none
import kotlin.collections.toMutableList
import kotlin.time.Duration
import kotlin.time.DurationUnit
import kotlin.time.toDuration
Expand Down Expand Up @@ -207,6 +211,8 @@ public sealed interface RealtimeConnection {
public data object Disconnected : RealtimeConnection // normal disconnect by the app
}

private typealias SessionId = String

/**
* The CallState class keeps all state for a call
* It's available on every call object
Expand Down Expand Up @@ -308,12 +314,12 @@ public class CallState(
private val _dominantSpeaker: MutableStateFlow<ParticipantState?> = MutableStateFlow(null)
public val dominantSpeaker: StateFlow<ParticipantState?> = _dominantSpeaker

internal val _localPins: MutableStateFlow<Map<String, PinUpdateAtTime>> =
internal val _localPins: MutableStateFlow<Map<SessionId, PinUpdateAtTime>> =
MutableStateFlow(emptyMap())
internal val _serverPins: MutableStateFlow<Map<String, PinUpdateAtTime>> =
internal val _serverPins: MutableStateFlow<Map<SessionId, PinUpdateAtTime>> =
MutableStateFlow(emptyMap())

internal val _pinnedParticipants: StateFlow<Map<String, OffsetDateTime>> =
internal val _pinnedParticipants: StateFlow<Map<SessionId, OffsetDateTime>> =
combine(_localPins, _serverPins) { local, server ->
val combined = mutableMapOf<String, PinUpdateAtTime>()
combined.putAll(local)
Expand All @@ -326,7 +332,7 @@ public class CallState(
/**
* Pinned participants, combined value both from server and local pins.
*/
val pinnedParticipants: StateFlow<Map<String, OffsetDateTime>> = _pinnedParticipants
val pinnedParticipants: StateFlow<Map<SessionId, OffsetDateTime>> = _pinnedParticipants

val stats = CallStats(call, scope)

Expand Down Expand Up @@ -1079,6 +1085,8 @@ public class CallState(
"[ParticipantJoinedEvent] #participants; #debounce; Failed to debounce, processing as usual."
}
getOrCreateParticipant(event.participant)
} finally {
updateServerSidePins(internalParticipants, event)
}
}

Expand All @@ -1096,8 +1104,14 @@ public class CallState(
// Remove any pins for the participant
unpin(sessionId)
}

Log.d(
"Noob",
"[ParticipantLeftEvent], _serverPins: ${_serverPins.value.map { it.key }.joinToString(
",",
) }",
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
if (_serverPins.value.containsKey(sessionId)) {
_serverPins.value = _serverPins.value.filter { it.key != event.participant.session_id }
scope.launch {
call.unpinForEveryone(sessionId, event.participant.user_id)
}
Expand Down Expand Up @@ -1252,10 +1266,19 @@ public class CallState(
}

private fun updateServerSidePins(pins: List<PinUpdate>) {
val internalParticipantsText =
internalParticipants.toList().joinToString(",") {
"[name: ${it.second.name.value}, sessionId: ${it.second.sessionId}]"
}

// Update participants that are still in the call
val pinnedInCall = pins.filter {
internalParticipants.containsKey(it.sessionId)
}
Log.d(
"Noob",
"[updateServerSidePins] 2, pins: $pins, internalParticipantsText: $internalParticipantsText, pinnedInCall: $pinnedInCall",
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
_serverPins.value = pinnedInCall.associate {
Pair(
it.sessionId,
Expand All @@ -1264,6 +1287,27 @@ public class CallState(
}
}

internal fun updateServerSidePins(internalParticipants: Map<SessionId, ParticipantState>, event: ParticipantJoinedEvent) {
val participantSessionId = event.participant.session_id
if (internalParticipants.containsKey(participantSessionId)) {
if (event.isPinned) {
val pinUpdate =
PinUpdate(event.participant.user_id, participantSessionId)
val tempPinUpdateList = _serverPins.value.map { it.value.it }
val participantIsNotPresent =
tempPinUpdateList.none { it.sessionId == participantSessionId }
if (participantIsNotPresent) {
val updatedList = tempPinUpdateList.toMutableList().apply {
add(pinUpdate)
}
updateServerSidePins(updatedList)
}
} else {
_serverPins.value = _serverPins.value.filter { it.key != participantSessionId }
}
}
}

private fun updateRingingState(rejectReason: RejectReason? = null) {
when (ringingState.value) {
RingingState.TimeoutNoAnswer, RingingState.RejectedByAll -> {
Expand Down Expand Up @@ -1458,6 +1502,9 @@ public class CallState(
}

public fun upsertParticipants(participants: List<ParticipantState>) {
val participantNames = participants.joinToString(",") {
"[name:${it.name.value}, sessionId:${it.sessionId}]"
}
Comment thread
rahul-lohra marked this conversation as resolved.
Outdated
val screensharing = mutableListOf<ParticipantState>()
participants.forEach {
internalParticipants[it.sessionId] = it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public object RTCEventMapper {
}

event.participant_joined != null -> with(event.participant_joined) {
ParticipantJoinedEvent(participant!!, call_cid)
ParticipantJoinedEvent(participant!!, call_cid, is_pinned)
}

event.participant_left != null -> with(event.participant_left) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import stream.video.sfu.signal.ICERestartResponse
import stream.video.sfu.signal.ICETrickleResponse
import stream.video.sfu.signal.SendAnswerRequest
import stream.video.sfu.signal.SendAnswerResponse
import stream.video.sfu.signal.SendMetricsRequest
import stream.video.sfu.signal.SendMetricsResponse
import stream.video.sfu.signal.SendStatsRequest
import stream.video.sfu.signal.SendStatsResponse
import stream.video.sfu.signal.SetPublisherRequest
Expand Down Expand Up @@ -97,6 +99,10 @@ internal class SignalLostSignalingServiceDecorator(private val decorated: Signal
}
}

override suspend fun sendMetrics(sendMetricsRequest: SendMetricsRequest): SendMetricsResponse {
return decorated.sendMetrics(sendMetricsRequest)
}

override suspend fun startNoiseCancellation(startNoiseCancellationRequest: StartNoiseCancellationRequest): StartNoiseCancellationResponse {
return decorated.startNoiseCancellation(startNoiseCancellationRequest).onError {
if (error?.code == ErrorCode.ERROR_CODE_PARTICIPANT_SIGNAL_LOST) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ public data class TrackUnpublishedEvent(
) : SfuDataEvent()

public data class ParticipantJoinedEvent(

val participant: Participant,
val callCid: String,
val isPinned: Boolean,
) : SfuDataEvent()

public data class ParticipantLeftEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import stream.video.sfu.signal.ICERestartResponse
import stream.video.sfu.signal.ICETrickleResponse
import stream.video.sfu.signal.SendAnswerRequest
import stream.video.sfu.signal.SendAnswerResponse
import stream.video.sfu.signal.SendMetricsRequest
import stream.video.sfu.signal.SendMetricsResponse
import stream.video.sfu.signal.SendStatsRequest
import stream.video.sfu.signal.SendStatsResponse
import stream.video.sfu.signal.SetPublisherRequest
Expand Down Expand Up @@ -123,6 +125,10 @@ internal class SignalingServiceTracerDecorator<T : SignalServerService>(
return target.sendStats(sendStatsRequest)
}

override suspend fun sendMetrics(sendMetricsRequest: SendMetricsRequest): SendMetricsResponse {
return target.sendMetrics(sendMetricsRequest)
}

override suspend fun startNoiseCancellation(startNoiseCancellationRequest: StartNoiseCancellationRequest): StartNoiseCancellationResponse {
tracer.trace("startNoiseCancellation", startNoiseCancellationRequest.toString())
val response = target.startNoiseCancellation(startNoiseCancellationRequest)
Expand Down
Loading
Loading