Skip to content

Commit 7a3ad4f

Browse files
app: storage + cloud: Keep data in storage until cloud confirms sent
Modify storage module to keep data in the backend until cloud confirms that an item has been sent. This allows for better handling of failed sends, as the data will still be available in storage for retrying. Signed-off-by: Trond F. Christiansen <trond.christiansen@nordicsemi.no>
1 parent 3fd73d0 commit 7a3ad4f

8 files changed

Lines changed: 417 additions & 116 deletions

File tree

app/src/modules/cloud/cloud.c

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,27 @@ static int request_storage_batch_data(uint32_t session_id)
525525
return 0;
526526
}
527527

528+
static bool handle_send_error(int err, const struct storage_data_item *item)
529+
{
530+
/* Non-network errors (ENOTSUP, EINVAL, ENODATA) indicate malformed or
531+
* unsupported data that will never succeed.
532+
*/
533+
if (err == -ENOTSUP || err == -EINVAL || err == -ENODATA) {
534+
LOG_ERR("Data error sending storage data (type %d), "
535+
"dropping item: %d",
536+
item->type, err);
537+
538+
return false;
539+
}
540+
541+
/* Network/transport error */
542+
LOG_ERR("Network error sending storage data (type %d), "
543+
"aborting session: %d",
544+
item->type, err);
545+
546+
return true;
547+
}
548+
528549
static void handle_storage_batch_available(const struct storage_msg *msg)
529550
{
530551
int err;
@@ -536,10 +557,22 @@ static void handle_storage_batch_available(const struct storage_msg *msg)
536557
.type = STORAGE_BATCH_CLOSE,
537558
.session_id = session_id,
538559
};
560+
struct storage_msg consume_msg = {
561+
.type = STORAGE_BATCH_CONSUME,
562+
.session_id = session_id,
563+
};
539564
bool session_error = false;
540565

541566
LOG_INF("Processing storage batch: %u items available", items_available);
542567

568+
/* Suppress delivery of storage_chan messages back to cloud_subscriber while we
569+
* are blocking in this loop.
570+
*/
571+
err = zbus_obs_set_chan_notification_mask(&cloud_subscriber, &storage_chan, true);
572+
if (err) {
573+
LOG_WRN("Failed to mask storage_chan for cloud_subscriber, error: %d", err);
574+
}
575+
543576
/* Drain the batch buffer: read until timeout, abort on hard error */
544577
while (!session_error) {
545578
err = storage_batch_read(&item, K_MSEC(500));
@@ -557,14 +590,34 @@ static void handle_storage_batch_available(const struct storage_msg *msg)
557590

558591
err = send_storage_data_to_cloud(&item);
559592
if (err) {
560-
LOG_ERR("Failed to send storage data to cloud, error: %d", err);
593+
session_error = handle_send_error(err, &item);
594+
if (session_error) {
595+
/* Network error, abort */
596+
continue;
597+
}
598+
/* For data errors, fall through to consume the item and prevent retrying
599+
* it in the next session.
600+
*/
601+
} else {
602+
items_processed++;
561603
}
562604

563-
items_processed++;
605+
/* Consume the item: confirms a successful send or skips a malformed item */
606+
consume_msg.data_type = item.type;
607+
err = zbus_chan_pub(&storage_chan, &consume_msg, PUB_TIMEOUT);
608+
if (err) {
609+
LOG_ERR("Failed to consume storage item, error: %d", err);
610+
}
564611
}
565612

566613
LOG_DBG("Processed %u/%u storage items", items_processed, items_available);
567614

615+
/* Re-enable storage_chan notifications to cloud_subscriber */
616+
err = zbus_obs_set_chan_notification_mask(&cloud_subscriber, &storage_chan, false);
617+
if (err) {
618+
LOG_WRN("Failed to unmask storage_chan for cloud_subscriber, error: %d", err);
619+
}
620+
568621
if (!session_error && msg->more_data) {
569622
LOG_DBG("More data available in batch, requesting next batch");
570623

app/src/modules/storage/Kconfig.storage

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ config APP_STORAGE_BATCH_BUFFER_SIZE
8888

8989
config APP_STORAGE_SESSION_TIMEOUT_SECONDS
9090
int "Storage batch session timeout"
91-
default 60
91+
default 30
9292
help
9393
Timeout for an active storage batch session. If no activity
9494
occurs on the session for this duration, it will be automatically

app/src/modules/storage/storage.c

Lines changed: 80 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -480,110 +480,101 @@ static void send_batch_available_response(uint32_t session_id, size_t item_count
480480
send_batch_response(STORAGE_BATCH_AVAILABLE, session_id, item_count, more_available);
481481
}
482482

483-
/* Populate the pipe with all stored data
483+
/* Populate the pipe with the next pending item.
484484
*
485-
* @return 0 on success (all data written or pipe full)
486-
* @return -EIO on data retrieval or size validation error
485+
* Peeks the head item of the first non-empty type and writes it to the pipe.
486+
* The item is NOT removed from the backend; that only happens on STORAGE_BATCH_CONSUME.
487+
*
488+
* @return 0 on success (one item written to pipe)
489+
* @return -ENODATA if no items are available across all types
490+
* @return -EIO on peek or pipe write error
487491
*/
488492
static int populate_pipe(struct storage_state *state_object)
489493
{
490494
const struct storage_backend *backend = storage_backend_get();
491-
size_t total_bytes_sent = 0;
492495

493496
state_object->current_session.more_data = false;
494497

495-
/* Populate pipe with all stored data */
496498
STRUCT_SECTION_FOREACH(storage_data, type) {
497-
int count = backend->count(type);
498-
499-
while (count > 0) {
500-
int ret;
501-
size_t total_size;
502-
uint8_t item_buffer[sizeof(struct storage_pipe_header) +
503-
STORAGE_MAX_DATA_SIZE];
504-
struct storage_pipe_header *header =
505-
(struct storage_pipe_header *)item_buffer;
506-
uint8_t *data = item_buffer + sizeof(struct storage_pipe_header);
507-
508-
/* Peek at size without copying data (data = NULL) */
509-
ret = backend->peek(type, NULL, 0);
510-
if (ret == -EAGAIN) {
511-
/* No more data of this type */
512-
break;
513-
} else if (ret < 0) {
514-
LOG_ERR("Failed to peek %s data size: %d", type->name, ret);
515-
516-
return -EIO;
517-
}
518-
519-
/* Prepare header with actual size */
520-
header->type = (uint8_t)type->data_type;
521-
522-
if ((ret < 0) || (ret > (int)STORAGE_MAX_DATA_SIZE) ||
523-
(ret > UINT16_MAX)) {
524-
LOG_ERR("Invalid data size for header: %d", ret);
525-
526-
return -EIO;
527-
}
499+
int ret;
500+
uint8_t item_buffer[sizeof(struct storage_pipe_header) + STORAGE_MAX_DATA_SIZE];
501+
struct storage_pipe_header *header = (struct storage_pipe_header *)item_buffer;
502+
uint8_t *data = item_buffer + sizeof(struct storage_pipe_header);
503+
size_t total_size;
504+
505+
/* Peek the head item */
506+
ret = backend->peek(type, data, STORAGE_MAX_DATA_SIZE);
507+
if (ret == -EAGAIN) {
508+
/* No items of this type, try next */
509+
continue;
510+
} else if (ret < 0) {
511+
LOG_ERR("Failed to peek %s data: %d", type->name, ret);
512+
return -EIO;
513+
}
528514

529-
header->data_size = (uint16_t)ret;
515+
if (ret > UINT16_MAX) {
516+
LOG_ERR("Peek returned oversized item for %s: %d", type->name, ret);
517+
return -EIO;
518+
}
530519

531-
/* Calculate exact total size needed using actual data size */
532-
total_size = sizeof(struct storage_pipe_header) + ret;
533-
if (total_size > sizeof(item_buffer)) {
534-
LOG_ERR("Combined data too large: %zu > %zu",
535-
total_size, sizeof(item_buffer));
520+
header->type = (uint8_t)type->data_type;
521+
header->data_size = (uint16_t)ret;
522+
total_size = sizeof(struct storage_pipe_header) + (size_t)ret;
536523

537-
return -EIO;
538-
}
524+
ret = pipe_write_all(&storage_pipe, item_buffer, total_size, PUB_TIMEOUT);
525+
if (ret < 0) {
526+
LOG_ERR("Unexpected pipe write failure: %d", ret);
527+
return -EIO;
528+
}
539529

540-
/* Check if exact size fits in remaining pipe buffer space */
541-
if (total_bytes_sent + total_size > CONFIG_APP_STORAGE_BATCH_BUFFER_SIZE) {
542-
/* Pipe buffer full - stop here without consuming data */
543-
LOG_DBG("Pipe buffer full");
530+
state_object->current_session.items_sent++;
544531

545-
state_object->current_session.more_data = true;
532+
LOG_DBG("Pipe populated for session 0x%X: with %s item (%zu bytes)",
533+
state_object->current_session.session_id,
534+
type->name, total_size);
546535

547-
break;
548-
}
536+
return 0;
537+
}
549538

550-
/* Now that we know it fits, retrieve the data from backend */
551-
ret = backend->retrieve(type, data, STORAGE_MAX_DATA_SIZE);
552-
if (ret < 0) {
553-
LOG_ERR("Failed to retrieve %s data after peek: %d",
554-
type->name, ret);
539+
return -ENODATA;
540+
}
555541

556-
return -EIO;
557-
}
542+
/* Consume the confirmed-sent item from the backend and peek the next one into the pipe. */
543+
static bool handle_batch_consume(struct storage_state *state_object,
544+
const struct storage_msg *msg)
545+
{
546+
const struct storage_backend *backend = storage_backend_get();
547+
uint8_t discard[STORAGE_MAX_DATA_SIZE];
548+
int ret;
558549

559-
/* Sanity check: retrieved size should match peeked size */
560-
__ASSERT_NO_MSG(ret == (int)header->data_size);
550+
if (msg->session_id != state_object->current_session.session_id) {
551+
LOG_WRN("CONSUME session ID mismatch: 0x%X (current: 0x%X), ignoring",
552+
msg->session_id, state_object->current_session.session_id);
553+
return false;
554+
}
561555

562-
/* Write combined buffer atomically to pipe */
563-
ret = pipe_write_all(&storage_pipe, item_buffer, total_size, PUB_TIMEOUT);
556+
/* Remove the confirmed-sent item from the backend queue head */
557+
STRUCT_SECTION_FOREACH(storage_data, type) {
558+
if (type->data_type == msg->data_type) {
559+
ret = backend->retrieve(type, discard, sizeof(discard));
564560
if (ret < 0) {
565-
/* This should never happen since we checked space above */
566-
LOG_ERR("Unexpected pipe write failure after space check: %d", ret);
567-
568-
return -EIO;
561+
LOG_ERR("Failed to consume item (type %d): %d", msg->data_type, ret);
562+
} else {
563+
LOG_DBG("Consumed %s item from backend", type->name);
569564
}
570-
571-
__ASSERT_NO_MSG(ret == (int)total_size);
572-
573-
/* Update session progress and byte tracking */
574-
state_object->current_session.items_sent++;
575-
total_bytes_sent += total_size;
576-
577-
count--;
565+
break;
578566
}
579567
}
580568

581-
LOG_DBG("Batch population complete for session 0x%X: %zu/%zu items",
582-
state_object->current_session.session_id,
583-
state_object->current_session.items_sent,
584-
state_object->current_session.total_items);
569+
/* Peek the next item into the pipe for the consumer */
570+
ret = populate_pipe(state_object);
571+
if (ret == -ENODATA) {
572+
LOG_DBG("All items consumed, pipe empty");
573+
} else if (ret < 0) {
574+
LOG_ERR("Failed to populate pipe after consume: %d", ret);
575+
}
585576

586-
return 0;
577+
return true;
587578
}
588579

589580
/* Start a new batch session.
@@ -633,10 +624,10 @@ static int start_batch_session(struct storage_state *state_object,
633624
return err;
634625
}
635626

636-
/* Success - pipe populated (fully or partially) */
627+
/* Success, one item in pipe, report total available to consumer */
637628
send_batch_available_response(request_msg->session_id,
638-
state_object->current_session.items_sent,
639-
state_object->current_session.more_data);
629+
total_items,
630+
false);
640631

641632
LOG_DBG("Started batch session (session_id 0x%X), %zu items in batch (%zu total)",
642633
state_object->current_session.session_id,
@@ -881,6 +872,13 @@ static enum smf_state_result state_buffer_pipe_active_run(void *o)
881872

882873
return SMF_EVENT_HANDLED;
883874

875+
case STORAGE_BATCH_CONSUME:
876+
if (handle_batch_consume(state_object, msg)) {
877+
k_work_reschedule(&state_object->session_timeout_work,
878+
K_SECONDS(STORAGE_SESSION_TIMEOUT_SECONDS));
879+
}
880+
return SMF_EVENT_HANDLED;
881+
884882
case STORAGE_BATCH_REQUEST: {
885883
int err;
886884

app/src/modules/storage/storage.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,13 @@ enum storage_msg_type {
8282

8383
/* Batch is busy - cannot process request at this time. */
8484
STORAGE_BATCH_BUSY,
85+
86+
/* Confirm that a batch item was successfully sent to cloud.
87+
* Must contain `data_type` identifying the type of the item just sent.
88+
* Storage removes the item from the backend queue head and makes the next
89+
* item available in the pipe. Only valid during an active batch session.
90+
*/
91+
STORAGE_BATCH_CONSUME,
8592
};
8693

8794
/**

0 commit comments

Comments
 (0)