@@ -14,7 +14,10 @@ use cosmic_client_toolkit::{
1414 toplevel_info:: { ToplevelInfo , ToplevelInfoState } ,
1515 workspace:: WorkspaceState ,
1616} ;
17- use futures:: channel:: oneshot;
17+ use futures:: {
18+ channel:: oneshot,
19+ stream:: { FuturesOrdered , Stream , StreamExt } ,
20+ } ;
1821use rustix:: fd:: { FromRawFd , RawFd } ;
1922use std:: {
2023 collections:: HashMap ,
@@ -158,6 +161,8 @@ impl AppData {
158161#[ derive( Default ) ]
159162struct SessionState {
160163 formats : Option < Formats > ,
164+ stopped : bool ,
165+ wakers : Vec < std:: task:: Waker > ,
161166}
162167
163168struct SessionInner {
@@ -175,17 +180,32 @@ impl Session {
175180 }
176181
177182 fn update < F : FnOnce ( & mut SessionState ) > ( & self , f : F ) {
178- f ( & mut self . 0 . state . lock ( ) . unwrap ( ) ) ;
183+ let mut state = self . 0 . state . lock ( ) . unwrap ( ) ;
184+ f ( & mut state) ;
185+ for waker in std:: mem:: take ( & mut state. wakers ) {
186+ waker. wake ( ) ;
187+ }
179188 self . 0 . condvar . notify_all ( ) ;
180189 }
181190
182- fn wait_for_formats < T , F : FnMut ( & Formats ) -> T > ( & self , mut cb : F ) -> T {
183- let data = self
184- . 0
185- . condvar
186- . wait_while ( self . 0 . state . lock ( ) . unwrap ( ) , |data| data. formats . is_none ( ) )
187- . unwrap ( ) ;
188- cb ( data. formats . as_ref ( ) . unwrap ( ) )
191+ /// Wait for the `Formats` to be sent from the compositor for the stream, and run
192+ /// a callback with the state mutex locked.
193+ ///
194+ /// If formats has not been sent, this will wait until it is received. It returns
195+ /// `None` if the server has sent `stopped`.
196+ pub async fn wait_for_formats < T , F : FnMut ( & Formats ) -> T > ( & self , mut cb : F ) -> Option < T > {
197+ std:: future:: poll_fn ( |context| {
198+ let mut state = self . 0 . state . lock ( ) . unwrap ( ) ;
199+ if state. stopped {
200+ std:: task:: Poll :: Ready ( None )
201+ } else if let Some ( formats) = & state. formats {
202+ std:: task:: Poll :: Ready ( Some ( cb ( formats) ) )
203+ } else {
204+ state. wakers . push ( context. waker ( ) . clone ( ) ) ;
205+ std:: task:: Poll :: Pending
206+ }
207+ } )
208+ . await
189209 }
190210
191211 /// Capture to `wl_buffer`, blocking until capture either succeeds or fails
@@ -289,35 +309,32 @@ impl WaylandHelper {
289309 }
290310 }
291311
292- pub async fn capture_output_toplevels_shm (
293- & self ,
312+ pub fn capture_output_toplevels_shm < ' a > (
313+ & ' a self ,
294314 output : & wl_output:: WlOutput ,
295315 overlay_cursor : bool ,
296- ) -> Vec < ShmImage < OwnedFd > > {
316+ ) -> impl Stream < Item = ShmImage < OwnedFd > > + ' a {
297317 // get the active workspace for this output
298318 // get the toplevels for that workspace
299319 // capture each toplevel
300320
301- let Some ( toplevels) = self
321+ let toplevels = self
302322 . inner
303323 . output_toplevels
304324 . lock ( )
305325 . unwrap ( )
306326 . get ( output)
307327 . cloned ( )
308- else {
309- return Vec :: new ( ) ;
310- } ;
328+ . unwrap_or_default ( ) ;
311329
312- // TODO is `FuturesOrdered` more optimal?
313- let mut images = Vec :: new ( ) ;
314- for foreign_toplevel in toplevels. into_iter ( ) {
315- let source = CaptureSource :: Toplevel ( foreign_toplevel. clone ( ) ) ;
316- if let Some ( image) = self . capture_source_shm ( source, overlay_cursor) . await {
317- images. push ( image) ;
318- }
319- }
320- images
330+ toplevels
331+ . into_iter ( )
332+ . map ( |foreign_toplevel| {
333+ let source = CaptureSource :: Toplevel ( foreign_toplevel. clone ( ) ) ;
334+ self . capture_source_shm ( source, overlay_cursor)
335+ } )
336+ . collect :: < FuturesOrdered < _ > > ( )
337+ . filter_map ( |x| async { x } )
321338 }
322339
323340 pub fn capture_source_session ( & self , source : CaptureSource , overlay_cursor : bool ) -> Session {
@@ -364,7 +381,9 @@ impl WaylandHelper {
364381 let session = self . capture_source_session ( source, overlay_cursor) ;
365382
366383 // TODO: Check that format has been advertised in `Formats`
367- let ( width, height) = session. wait_for_formats ( |formats| formats. buffer_size ) ;
384+ let ( width, height) = session
385+ . wait_for_formats ( |formats| formats. buffer_size )
386+ . await ?;
368387
369388 let fd = buffer:: create_memfd ( width, height) ;
370389 let buffer =
@@ -577,8 +596,13 @@ impl ScreencopyHandler for AppData {
577596 }
578597 }
579598
580- fn stopped ( & mut self , _conn : & Connection , _qh : & QueueHandle < Self > , _session : & CaptureSession ) {
581- // TODO
599+ fn stopped ( & mut self , _conn : & Connection , _qh : & QueueHandle < Self > , session : & CaptureSession ) {
600+ if let Some ( session) = Session :: for_session ( session) {
601+ session. update ( |data| {
602+ data. stopped = true ;
603+ } ) ;
604+ }
605+ // TODO signal users of session in some way?
582606 }
583607
584608 fn ready (
0 commit comments