Skip to content

Commit 37e2ead

Browse files
google-genai-botcopybara-github
authored andcommitted
Add an abstract service to enable us to override code for bidi streaming features.
PiperOrigin-RevId: 813631432
1 parent d7a2dd9 commit 37e2ead

13 files changed

+541
-150
lines changed

src/app/app.component.spec.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import {GRAPH_SERVICE, GraphService} from './core/services/graph.service';
3434
import {SAFE_VALUES_SERVICE} from './core/services/interfaces/safevalues';
3535
import {STRING_TO_COLOR_SERVICE} from './core/services/interfaces/string-to-color';
3636
import {SESSION_SERVICE, SessionService,} from './core/services/session.service';
37+
import {STREAM_CHAT_SERVICE} from './core/services/stream-chat.service';
3738
import {MockAgentService} from './core/services/testing/mock-agent.service';
3839
import {MockArtifactService} from './core/services/testing/mock-artifact.service';
3940
import {MockAudioService} from './core/services/testing/mock-audio.service';
@@ -44,6 +45,7 @@ import {MockFeatureFlagService} from './core/services/testing/mock-feature-flag.
4445
import {MockGraphService} from './core/services/testing/mock-graph.service';
4546
import {MockSafeValuesService} from './core/services/testing/mock-safevalues.service';
4647
import {MockSessionService} from './core/services/testing/mock-session.service';
48+
import {MockStreamChatService} from './core/services/testing/mock-stream-chat.service';
4749
import {MockStringToColorService} from './core/services/testing/mock-string-to-color.service';
4850
import {MockTraceService} from './core/services/testing/mock-trace.service';
4951
import {MockVideoService} from './core/services/testing/mock-video.service';
@@ -62,6 +64,7 @@ describe('AppComponent', () => {
6264
const audioService = new MockAudioService();
6365
const webSocketService = new MockWebSocketService();
6466
const videoService = new MockVideoService();
67+
const streamChatService = new MockStreamChatService();
6568
const eventService = new MockEventService();
6669
const downloadService = new MockDownloadService();
6770
const evalService = new MockEvalService();
@@ -111,6 +114,10 @@ describe('AppComponent', () => {
111114
provide: VIDEO_SERVICE,
112115
useValue: videoService,
113116
},
117+
{
118+
provide: STREAM_CHAT_SERVICE,
119+
useValue: streamChatService,
120+
},
114121
{
115122
provide: EVENT_SERVICE,
116123
useValue: eventService,

src/app/components/chat/chat.component.spec.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import {GRAPH_SERVICE, GraphService} from '../../core/services/graph.service';
4040
import {SAFE_VALUES_SERVICE} from '../../core/services/interfaces/safevalues';
4141
import {STRING_TO_COLOR_SERVICE} from '../../core/services/interfaces/string-to-color';
4242
import {SESSION_SERVICE, SessionService,} from '../../core/services/session.service';
43+
import {STREAM_CHAT_SERVICE} from '../../core/services/stream-chat.service';
4344
import {MockAgentService} from '../../core/services/testing/mock-agent.service';
4445
import {MockArtifactService} from '../../core/services/testing/mock-artifact.service';
4546
import {MockAudioService} from '../../core/services/testing/mock-audio.service';
@@ -50,6 +51,7 @@ import {MockFeatureFlagService} from '../../core/services/testing/mock-feature-f
5051
import {MockGraphService} from '../../core/services/testing/mock-graph.service';
5152
import {MockSafeValuesService} from '../../core/services/testing/mock-safevalues.service';
5253
import {MockSessionService} from '../../core/services/testing/mock-session.service';
54+
import {MockStreamChatService} from '../../core/services/testing/mock-stream-chat.service';
5355
import {MockStringToColorService} from '../../core/services/testing/mock-string-to-color.service';
5456
import {MockTraceService} from '../../core/services/testing/mock-trace.service';
5557
import {MockVideoService} from '../../core/services/testing/mock-video.service';
@@ -108,6 +110,7 @@ describe('ChatComponent', () => {
108110
let mockAudioService: MockAudioService;
109111
let mockWebSocketService: MockWebSocketService;
110112
let mockVideoService: MockVideoService;
113+
let mockStreamChatService: MockStreamChatService;
111114
let mockEventService: MockEventService;
112115
let mockDownloadService: MockDownloadService;
113116
let mockEvalService: MockEvalService;
@@ -129,6 +132,7 @@ describe('ChatComponent', () => {
129132
mockAudioService = new MockAudioService();
130133
mockWebSocketService = new MockWebSocketService();
131134
mockVideoService = new MockVideoService();
135+
mockStreamChatService = new MockStreamChatService();
132136
mockEventService = new MockEventService();
133137
mockDownloadService = new MockDownloadService();
134138
mockEvalService = new MockEvalService();
@@ -197,6 +201,7 @@ describe('ChatComponent', () => {
197201
{provide: WEBSOCKET_SERVICE, useValue: mockWebSocketService},
198202
{provide: VIDEO_SERVICE, useValue: mockVideoService},
199203
{provide: EVENT_SERVICE, useValue: mockEventService},
204+
{provide: STREAM_CHAT_SERVICE, useValue: mockStreamChatService},
200205
{provide: DOWNLOAD_SERVICE, useValue: mockDownloadService},
201206
{provide: EVAL_SERVICE, useValue: mockEvalService},
202207
{provide: TRACE_SERVICE, useValue: mockTraceService},

src/app/components/chat/chat.component.ts

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ import {Session, SessionState} from '../../core/models/Session';
4747
import {Event as AdkEvent} from '../../core/models/types';
4848
import {AGENT_SERVICE, AgentService} from '../../core/services/agent.service';
4949
import {ARTIFACT_SERVICE, ArtifactService} from '../../core/services/artifact.service';
50-
import {AUDIO_SERVICE, AudioService} from '../../core/services/audio.service';
5150
import {DOWNLOAD_SERVICE, DownloadService} from '../../core/services/download.service';
5251
import {EVAL_SERVICE, EvalService} from '../../core/services/eval.service';
5352
import {EVENT_SERVICE, EventService} from '../../core/services/event.service';
@@ -56,9 +55,8 @@ import {GRAPH_SERVICE, GraphService} from '../../core/services/graph.service';
5655
import {SAFE_VALUES_SERVICE, SafeValuesService} from '../../core/services/interfaces/safevalues';
5756
import {STRING_TO_COLOR_SERVICE} from '../../core/services/interfaces/string-to-color';
5857
import {SESSION_SERVICE, SessionService} from '../../core/services/session.service';
58+
import {STREAM_CHAT_SERVICE, StreamChatService} from '../../core/services/stream-chat.service';
5959
import {TRACE_SERVICE, TraceService} from '../../core/services/trace.service';
60-
import {VIDEO_SERVICE, VideoService} from '../../core/services/video.service';
61-
import {WEBSOCKET_SERVICE, WebSocketService} from '../../core/services/websocket.service';
6260
import {ResizableBottomDirective} from '../../directives/resizable-bottom.directive';
6361
import {ResizableDrawerDirective} from '../../directives/resizable-drawer.directive';
6462
import {ArtifactTabComponent, getMediaTypeFromMimetype, MediaType} from '../artifact-tab/artifact-tab.component';
@@ -265,9 +263,7 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
265263
constructor(
266264
@Inject(SESSION_SERVICE) private sessionService: SessionService,
267265
@Inject(ARTIFACT_SERVICE) private artifactService: ArtifactService,
268-
@Inject(AUDIO_SERVICE) private audioService: AudioService,
269-
@Inject(WEBSOCKET_SERVICE) private webSocketService: WebSocketService,
270-
@Inject(VIDEO_SERVICE) private videoService: VideoService,
266+
@Inject(STREAM_CHAT_SERVICE) private streamChatService: StreamChatService,
271267
private dialog: MatDialog,
272268
@Inject(EVENT_SERVICE) private eventService: EventService,
273269
private route: ActivatedRoute,
@@ -297,7 +293,7 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
297293
this.syncSelectedAppFromUrl();
298294
this.updateSelectedAppUrl();
299295

300-
this.webSocketService.onCloseReason().subscribe((closeReason) => {
296+
this.streamChatService.onStreamClose().subscribe((closeReason) => {
301297
const error =
302298
'Please check server log for full details: \n' + closeReason;
303299
this.openSnackBar(error, 'OK');
@@ -951,7 +947,7 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
951947
}
952948

953949
ngOnDestroy(): void {
954-
this.webSocketService.closeConnection();
950+
this.streamChatService.closeStream();
955951
}
956952

957953
onAppSelection(event: any) {
@@ -980,11 +976,11 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
980976
}
981977

982978
this.isAudioRecording = true;
983-
const protocol = window.location.protocol === 'https:' ? 'wss' : 'ws';
984-
this.webSocketService.connect(
985-
`${protocol}://${URLUtil.getWSServerUrl()}/run_live?app_name=${this.appName}&user_id=${this.userId}&session_id=${this.sessionId}`,
986-
);
987-
this.audioService.startRecording();
979+
this.streamChatService.startAudioChat({
980+
appName: this.appName,
981+
userId: this.userId,
982+
sessionId: this.sessionId,
983+
});
988984
this.messages.update(
989985
messages =>
990986
[...messages,
@@ -995,8 +991,7 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
995991
}
996992

997993
stopAudioRecording() {
998-
this.audioService.stopRecording();
999-
this.webSocketService.closeConnection();
994+
this.streamChatService.stopAudioChat();
1000995
this.isAudioRecording = false;
1001996
}
1002997

@@ -1015,12 +1010,12 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
10151010
return;
10161011
}
10171012
this.isVideoRecording = true;
1018-
const protocol = window.location.protocol === 'https:' ? 'wss' : 'ws';
1019-
this.webSocketService.connect(
1020-
`${protocol}://${URLUtil.getWSServerUrl()}/run_live?app_name=${this.appName}&user_id=${this.userId}&session_id=${this.sessionId}`,
1021-
);
1022-
this.videoService.startRecording(videoContainer);
1023-
this.audioService.startRecording();
1013+
this.streamChatService.startVideoChat({
1014+
appName: this.appName,
1015+
userId: this.userId,
1016+
sessionId: this.sessionId,
1017+
videoContainer,
1018+
});
10241019
this.messages.update(
10251020
messages => [...messages, {role: 'user', text: 'Speaking...'}]);
10261021
this.sessionHasUsedBidi.add(this.sessionId);
@@ -1031,9 +1026,7 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
10311026
if (!videoContainer) {
10321027
return;
10331028
}
1034-
this.audioService.stopRecording();
1035-
this.videoService.stopRecording(videoContainer);
1036-
this.webSocketService.closeConnection();
1029+
this.streamChatService.stopVideoChat(videoContainer);
10371030
this.isVideoRecording = false;
10381031
}
10391032

src/app/core/services/audio.service.spec.ts

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717

1818
import {fakeAsync, TestBed, tick} from '@angular/core/testing';
1919

20-
import {AudioService} from './audio.service';
20+
import {AUDIO_WORKLET_MODULE_PATH, AudioService} from './audio.service';
2121
import {MockWebSocketService} from './testing/mock-websocket.service';
22-
import {WebSocketService} from './websocket.service';
22+
import {WEBSOCKET_SERVICE, WebSocketService} from './websocket.service';
2323

2424
const AUDIO_PROCESSOR_PATH = './assets/audio-processor.js';
2525
const AUDIO_PROCESSOR_NAME = 'audio-processor';
@@ -69,7 +69,8 @@ describe('AudioService', () => {
6969
TestBed.configureTestingModule({
7070
providers: [
7171
AudioService,
72-
{provide: WebSocketService, useValue: webSocketServiceSpy},
72+
{provide: WEBSOCKET_SERVICE, useValue: webSocketServiceSpy},
73+
{provide: AUDIO_WORKLET_MODULE_PATH, useValue: AUDIO_PROCESSOR_PATH},
7374
],
7475
});
7576
service = TestBed.inject(AudioService);
@@ -125,16 +126,6 @@ describe('AudioService', () => {
125126
mockAudioContext.destination,
126127
);
127128
});
128-
129-
it('should set an interval to send buffered audio', fakeAsync(async () => {
130-
await service.startRecording();
131-
// Manually trigger onmessage to add data to buffer
132-
const audioData = new Float32Array([0.1, 0.2]);
133-
mockWorkletNode.port.onmessage({data: audioData});
134-
tick(250);
135-
expect(webSocketServiceSpy.sendMessage).toHaveBeenCalled();
136-
service.stopRecording();
137-
}));
138129
});
139130

140131
describe('stopRecording', () => {
@@ -155,16 +146,5 @@ describe('AudioService', () => {
155146
service.stopRecording();
156147
expect(mockAudioContext.close).toHaveBeenCalled();
157148
});
158-
159-
it('should clear interval', fakeAsync(async () => {
160-
await service.startRecording();
161-
const audioData = new Float32Array([0.1, 0.2]);
162-
mockWorkletNode.port.onmessage({data: audioData});
163-
tick(250);
164-
expect(webSocketServiceSpy.sendMessage).toHaveBeenCalledTimes(1);
165-
service.stopRecording();
166-
tick(250);
167-
expect(webSocketServiceSpy.sendMessage).toHaveBeenCalledTimes(1);
168-
}));
169149
});
170150
});

src/app/core/services/audio.service.ts

Lines changed: 32 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
* limitations under the License.
1616
*/
1717

18-
import {Injectable, InjectionToken} from '@angular/core';
18+
import {Inject, Injectable, InjectionToken} from '@angular/core';
19+
1920
import {LiveRequest} from '../models/LiveRequest';
20-
import {WebSocketService} from './websocket.service';
2121

2222
export const AUDIO_SERVICE = new InjectionToken<AudioService>('AudioService');
23+
export const AUDIO_WORKLET_MODULE_PATH =
24+
new InjectionToken<string>('AudioWorkletModulePath');
25+
2326

2427
@Injectable({
2528
providedIn: 'root',
@@ -31,23 +34,23 @@ export class AudioService {
3134
private source!: MediaStreamAudioSourceNode;
3235
private processor!: ScriptProcessorNode;
3336
private audioBuffer: Uint8Array[] = [];
34-
private audioIntervalId: any = null;
3537

36-
constructor(private wsService: WebSocketService) {}
38+
constructor(
39+
@Inject(AUDIO_WORKLET_MODULE_PATH)
40+
private readonly audioWorkletModulePath: string) {}
3741

3842
async startRecording() {
3943
try {
4044
this.stream = await navigator.mediaDevices.getUserMedia({audio: true});
4145

4246
this.audioContext = new AudioContext();
4347
await this.audioContext.audioWorklet.addModule(
44-
'./assets/audio-processor.js',
45-
);
48+
this.audioWorkletModulePath);
4649

4750
this.source = this.audioContext.createMediaStreamSource(this.stream);
4851
const workletNode = new AudioWorkletNode(
49-
this.audioContext,
50-
'audio-processor',
52+
this.audioContext,
53+
'audio-processor',
5154
);
5255

5356
workletNode.port.onmessage = (event) => {
@@ -58,37 +61,11 @@ export class AudioService {
5861

5962
this.source.connect(workletNode);
6063
workletNode.connect(this.audioContext.destination);
61-
this.audioIntervalId = setInterval(() => this.sendBufferedAudio(), 250);
6264
} catch (error) {
6365
console.error('Error accessing microphone:', error);
6466
}
6567
}
6668

67-
private sendBufferedAudio() {
68-
if (this.audioBuffer.length === 0) return;
69-
// Concatenate all accumulated chunks into one Uint8Array
70-
const totalLength = this.audioBuffer.reduce(
71-
(sum, chunk) => sum + chunk.length,
72-
0,
73-
);
74-
const combinedBuffer = new Uint8Array(totalLength);
75-
76-
let offset = 0;
77-
for (const chunk of this.audioBuffer) {
78-
combinedBuffer.set(chunk, offset);
79-
offset += chunk.length;
80-
}
81-
82-
const request: LiveRequest = {
83-
blob: {
84-
mime_type: 'audio/pcm',
85-
data: combinedBuffer,
86-
},
87-
};
88-
this.wsService.sendMessage(request);
89-
this.audioBuffer = [];
90-
}
91-
9269
stopRecording() {
9370
if (this.processor) {
9471
this.processor.disconnect();
@@ -102,10 +79,28 @@ export class AudioService {
10279
if (this.stream) {
10380
this.stream.getTracks().forEach((track) => track.stop());
10481
}
105-
if (this.audioIntervalId) {
106-
clearInterval(this.audioIntervalId);
107-
this.audioIntervalId = null;
82+
}
83+
84+
getCombinedAudioBuffer(): Uint8Array|void {
85+
if (this.audioBuffer.length === 0) return;
86+
// Concatenate all accumulated chunks into one Uint8Array
87+
const totalLength = this.audioBuffer.reduce(
88+
(sum, chunk) => sum + chunk.length,
89+
0,
90+
);
91+
const combinedBuffer = new Uint8Array(totalLength);
92+
93+
let offset = 0;
94+
for (const chunk of this.audioBuffer) {
95+
combinedBuffer.set(chunk, offset);
96+
offset += chunk.length;
10897
}
98+
99+
return combinedBuffer;
100+
}
101+
102+
cleanAudioBuffer() {
103+
this.audioBuffer = [];
109104
}
110105

111106
private float32ToPCM(audioData: Float32Array): Uint8Array {

0 commit comments

Comments
 (0)