@@ -480,110 +480,103 @@ static void send_batch_available_response(uint32_t session_id, size_t item_count
480480 send_batch_response (STORAGE_BATCH_AVAILABLE , session_id , item_count , more_available );
481481}
482482
483- /* Populate the pipe with all stored data
483+ /* Populate the pipe with the next pending item.
484484 *
485- * @return 0 on success (all data written or pipe full)
486- * @return -EIO on data retrieval or size validation error
485+ * Peeks the head item of the first non-empty type and writes it to the pipe.
486+ * The item is NOT removed from the backend; that only happens on STORAGE_BATCH_CONSUME.
487+ *
488+ * @return 0 on success (one item written to pipe)
489+ * @return -ENODATA if no items are available across all types
490+ * @return -EIO on peek or pipe write error
487491 */
488492static int populate_pipe (struct storage_state * state_object )
489493{
490494 const struct storage_backend * backend = storage_backend_get ();
491- size_t total_bytes_sent = 0 ;
492495
493496 state_object -> current_session .more_data = false;
494497
495- /* Populate pipe with all stored data */
496498 STRUCT_SECTION_FOREACH (storage_data , type ) {
497- int count = backend -> count (type );
498-
499- while (count > 0 ) {
500- int ret ;
501- size_t total_size ;
502- uint8_t item_buffer [sizeof (struct storage_pipe_header ) +
503- STORAGE_MAX_DATA_SIZE ];
504- struct storage_pipe_header * header =
505- (struct storage_pipe_header * )item_buffer ;
506- uint8_t * data = item_buffer + sizeof (struct storage_pipe_header );
507-
508- /* Peek at size without copying data (data = NULL) */
509- ret = backend -> peek (type , NULL , 0 );
510- if (ret == - EAGAIN ) {
511- /* No more data of this type */
512- break ;
513- } else if (ret < 0 ) {
514- LOG_ERR ("Failed to peek %s data size: %d" , type -> name , ret );
515-
516- return - EIO ;
517- }
518-
519- /* Prepare header with actual size */
520- header -> type = (uint8_t )type -> data_type ;
521-
522- if ((ret < 0 ) || (ret > (int )STORAGE_MAX_DATA_SIZE ) ||
523- (ret > UINT16_MAX )) {
524- LOG_ERR ("Invalid data size for header: %d" , ret );
525-
526- return - EIO ;
527- }
499+ int ret ;
500+ uint8_t item_buffer [sizeof (struct storage_pipe_header ) + STORAGE_MAX_DATA_SIZE ];
501+ struct storage_pipe_header * header = (struct storage_pipe_header * )item_buffer ;
502+ uint8_t * data = item_buffer + sizeof (struct storage_pipe_header );
503+ size_t total_size ;
504+
505+ /* Peek the head item */
506+ ret = backend -> peek (type , data , STORAGE_MAX_DATA_SIZE );
507+ if (ret == - EAGAIN ) {
508+ /* No items of this type, try next */
509+ continue ;
510+ } else if (ret < 0 ) {
511+ LOG_ERR ("Failed to peek %s data: %d" , type -> name , ret );
512+ return - EIO ;
513+ }
528514
529- header -> data_size = (uint16_t )ret ;
515+ if (ret > UINT16_MAX ) {
516+ LOG_ERR ("Peek returned oversized item for %s: %d" , type -> name , ret );
517+ return - EIO ;
518+ }
530519
531- /* Calculate exact total size needed using actual data size */
532- total_size = sizeof (struct storage_pipe_header ) + ret ;
533- if (total_size > sizeof (item_buffer )) {
534- LOG_ERR ("Combined data too large: %zu > %zu" ,
535- total_size , sizeof (item_buffer ));
520+ header -> type = (uint8_t )type -> data_type ;
521+ header -> data_size = (uint16_t )ret ;
522+ total_size = sizeof (struct storage_pipe_header ) + (size_t )ret ;
536523
537- return - EIO ;
538- }
524+ ret = pipe_write_all (& storage_pipe , item_buffer , total_size , PUB_TIMEOUT );
525+ if (ret < 0 ) {
526+ LOG_ERR ("Unexpected pipe write failure: %d" , ret );
527+ return - EIO ;
528+ }
539529
540- /* Check if exact size fits in remaining pipe buffer space */
541- if (total_bytes_sent + total_size > CONFIG_APP_STORAGE_BATCH_BUFFER_SIZE ) {
542- /* Pipe buffer full - stop here without consuming data */
543- LOG_DBG ("Pipe buffer full" );
530+ state_object -> current_session .items_sent ++ ;
544531
545- state_object -> current_session .more_data = true;
532+ LOG_DBG ("Pipe populated for session 0x%X: with %s item (%zu bytes)" ,
533+ state_object -> current_session .session_id ,
534+ type -> name , total_size );
546535
547- break ;
548- }
536+ return 0 ;
537+ }
549538
550- /* Now that we know it fits, retrieve the data from backend */
551- ret = backend -> retrieve (type , data , STORAGE_MAX_DATA_SIZE );
552- if (ret < 0 ) {
553- LOG_ERR ("Failed to retrieve %s data after peek: %d" ,
554- type -> name , ret );
539+ return - ENODATA ;
540+ }
555541
556- return - EIO ;
557- }
542+ /* Consume the confirmed-sent item from the backend and peek the next one into the pipe.
543+ * Returns false if the session_id does not match the active session (message ignored).
544+ */
545+ static bool handle_batch_consume (struct storage_state * state_object ,
546+ const struct storage_msg * msg )
547+ {
548+ const struct storage_backend * backend = storage_backend_get ();
549+ uint8_t discard [STORAGE_MAX_DATA_SIZE ];
550+ int ret ;
558551
559- /* Sanity check: retrieved size should match peeked size */
560- __ASSERT_NO_MSG (ret == (int )header -> data_size );
552+ if (msg -> session_id != state_object -> current_session .session_id ) {
553+ LOG_WRN ("CONSUME session ID mismatch: 0x%X (current: 0x%X), ignoring" ,
554+ msg -> session_id , state_object -> current_session .session_id );
555+ return false;
556+ }
561557
562- /* Write combined buffer atomically to pipe */
563- ret = pipe_write_all (& storage_pipe , item_buffer , total_size , PUB_TIMEOUT );
558+ /* Remove the confirmed-sent item from the backend queue head */
559+ STRUCT_SECTION_FOREACH (storage_data , type ) {
560+ if (type -> data_type == msg -> data_type ) {
561+ ret = backend -> retrieve (type , discard , sizeof (discard ));
564562 if (ret < 0 ) {
565- /* This should never happen since we checked space above */
566- LOG_ERR ("Unexpected pipe write failure after space check: %d" , ret );
567-
568- return - EIO ;
563+ LOG_ERR ("Failed to consume item (type %d): %d" , msg -> data_type , ret );
564+ } else {
565+ LOG_DBG ("Consumed %s item from backend" , type -> name );
569566 }
570-
571- __ASSERT_NO_MSG (ret == (int )total_size );
572-
573- /* Update session progress and byte tracking */
574- state_object -> current_session .items_sent ++ ;
575- total_bytes_sent += total_size ;
576-
577- count -- ;
567+ break ;
578568 }
579569 }
580570
581- LOG_DBG ("Batch population complete for session 0x%X: %zu/%zu items" ,
582- state_object -> current_session .session_id ,
583- state_object -> current_session .items_sent ,
584- state_object -> current_session .total_items );
571+ /* Peek the next item into the pipe for the consumer */
572+ ret = populate_pipe (state_object );
573+ if (ret == - ENODATA ) {
574+ LOG_DBG ("All items consumed, pipe empty" );
575+ } else if (ret < 0 ) {
576+ LOG_ERR ("Failed to populate pipe after consume: %d" , ret );
577+ }
585578
586- return 0 ;
579+ return true ;
587580}
588581
589582/* Start a new batch session.
@@ -633,10 +626,10 @@ static int start_batch_session(struct storage_state *state_object,
633626 return err ;
634627 }
635628
636- /* Success - pipe populated (fully or partially) */
629+ /* Success, one item in pipe, report total available to consumer */
637630 send_batch_available_response (request_msg -> session_id ,
638- state_object -> current_session . items_sent ,
639- state_object -> current_session . more_data );
631+ total_items ,
632+ false );
640633
641634 LOG_DBG ("Started batch session (session_id 0x%X), %zu items in batch (%zu total)" ,
642635 state_object -> current_session .session_id ,
@@ -881,6 +874,13 @@ static enum smf_state_result state_buffer_pipe_active_run(void *o)
881874
882875 return SMF_EVENT_HANDLED ;
883876
877+ case STORAGE_BATCH_CONSUME :
878+ if (handle_batch_consume (state_object , msg )) {
879+ k_work_reschedule (& state_object -> session_timeout_work ,
880+ K_SECONDS (STORAGE_SESSION_TIMEOUT_SECONDS ));
881+ }
882+ return SMF_EVENT_HANDLED ;
883+
884884 case STORAGE_BATCH_REQUEST : {
885885 int err ;
886886
0 commit comments