Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 94 additions & 101 deletions omi/firmware/devkit/src/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -499,46 +499,54 @@ static struct ring_buf ring_buf;

static bool write_to_tx_queue(uint8_t *data, size_t size)
{
if (size > CODEC_OUTPUT_MAX_BYTES) {
return false;
if(size <= CODEC_OUTPUT_MAX_BYTES) {
size_t remained_space = ring_buf_space_get(&ring_buf);

if(remained_space >= (CODEC_OUTPUT_MAX_BYTES + RING_BUFFER_HEADER_SIZE)) {
// Copy data (TODO: Avoid this copy)
tx_buffer_2[0] = size & 0xFF;
tx_buffer_2[1] = (size >> 8) & 0xFF;
memcpy(tx_buffer_2 + RING_BUFFER_HEADER_SIZE, data, size);

// Write to ring buffer
int written =
ring_buf_put(&ring_buf,
tx_buffer_2,
(CODEC_OUTPUT_MAX_BYTES + RING_BUFFER_HEADER_SIZE)); // It always fits completely or not at all
if (written != CODEC_OUTPUT_MAX_BYTES + RING_BUFFER_HEADER_SIZE) {
return false;
} else {
return true;
}
} else {
LOG_ERR("RingBuf Overflow %d|%d", remained_space, size);
}
}

// Copy data (TODO: Avoid this copy)
tx_buffer_2[0] = size & 0xFF;
tx_buffer_2[1] = (size >> 8) & 0xFF;
memcpy(tx_buffer_2 + RING_BUFFER_HEADER_SIZE, data, size);

// Write to ring buffer
int written =
ring_buf_put(&ring_buf,
tx_buffer_2,
(CODEC_OUTPUT_MAX_BYTES + RING_BUFFER_HEADER_SIZE)); // It always fits completely or not at all
if (written != CODEC_OUTPUT_MAX_BYTES + RING_BUFFER_HEADER_SIZE) {
return false;
} else {
return true;
}
return false;
}

static bool read_from_tx_queue()
{
static bool read_from_tx_queue() {
// Check RingBuf available before getting it
if(ring_buf_size_get(&ring_buf) >= (CODEC_OUTPUT_MAX_BYTES + RING_BUFFER_HEADER_SIZE)) {
// Read from ring buffer
tx_buffer_size =
ring_buf_get(&ring_buf,
tx_buffer,
(CODEC_OUTPUT_MAX_BYTES + RING_BUFFER_HEADER_SIZE)); // It always fits completely or not at all
if (tx_buffer_size != (CODEC_OUTPUT_MAX_BYTES + RING_BUFFER_HEADER_SIZE)) {
LOG_ERR("Failed to read from ring buffer. not enough data %d", tx_buffer_size);
return false;
}

// Read from ring buffer
// memset(tx_buffer, 0, sizeof(tx_buffer));
tx_buffer_size =
ring_buf_get(&ring_buf,
tx_buffer,
(CODEC_OUTPUT_MAX_BYTES + RING_BUFFER_HEADER_SIZE)); // It always fits completely or not at all
if (tx_buffer_size != (CODEC_OUTPUT_MAX_BYTES + RING_BUFFER_HEADER_SIZE)) {
LOG_ERR("Failed to read from ring buffer. not enough data %d", tx_buffer_size);
return false;
// Adjust size
tx_buffer_size = tx_buffer[0] + (tx_buffer[1] << 8);
return true;
} else {
LOG_DBG("Waiting for enough data");
}

// Adjust size
tx_buffer_size = tx_buffer[0] + (tx_buffer[1] << 8);
// LOG_PRINTK("tx_buffer_size %d\n",tx_buffer_size);

return true;
return false;
}

//
Expand All @@ -550,101 +558,86 @@ K_THREAD_STACK_DEFINE(pusher_stack, 4096);
static struct k_thread pusher_thread;
static uint16_t packet_next_index = 0;
static uint8_t pusher_temp_data[CODEC_OUTPUT_MAX_BYTES + NET_BUFFER_HEADER_SIZE];
static uint8_t pusher_temp_data_len = 0;

static bool flush_gatt_buffer(struct bt_conn *conn)
{
if (pusher_temp_data_len == 0) {
return true; // nothing to send
}

uint32_t id = packet_next_index++;
static uint8_t index = 0;

// Fill header
pusher_temp_data[0] = id & 0xFF;
pusher_temp_data[1] = (id >> 8) & 0xFF;
pusher_temp_data[2] = index++;

uint32_t send_len = pusher_temp_data_len + NET_BUFFER_HEADER_SIZE;

LOG_INF("push_to_gatt %d | mtu %d", send_len, current_mtu);

int err = bt_gatt_notify(conn, &audio_service.attrs[1],
pusher_temp_data, send_len);

if (err) {
LOG_WRN("bt_gatt_notify failed (%d)", err);
return false; // do not clear buffer → retry later
}

// Only clear on success
pusher_temp_data_len = 0;
return true;
}

static bool push_to_gatt(struct bt_conn *conn)
{
// Read data from ring buffer
// Read one chunk from ring buffer into tx_buffer
if (!read_from_tx_queue()) {
return false;
}

// Push each frame
uint8_t *buffer = tx_buffer + RING_BUFFER_HEADER_SIZE;
uint32_t offset = 0;
uint8_t index = 0;
int retry_count = 0;
const int max_retries = 3;
uint8_t *payload = tx_buffer + RING_BUFFER_HEADER_SIZE;
// Actual len already update so we don't need subtract with HEADER
uint32_t payload_len = tx_buffer_size;
uint32_t max_payload = current_mtu - NET_BUFFER_HEADER_SIZE;

while (offset < tx_buffer_size) {
// Recombine packet
uint32_t id = packet_next_index++;
uint32_t packet_size = MIN(current_mtu - NET_BUFFER_HEADER_SIZE, tx_buffer_size - offset);
pusher_temp_data[0] = id & 0xFF;
pusher_temp_data[1] = (id >> 8) & 0xFF;
pusher_temp_data[2] = index;
memcpy(pusher_temp_data + NET_BUFFER_HEADER_SIZE, buffer + offset, packet_size);

offset += packet_size;
index++;

retry_count = 0;
while (retry_count < max_retries) {
// Try send notification
int err =
bt_gatt_notify(conn, &audio_service.attrs[1], pusher_temp_data, packet_size + NET_BUFFER_HEADER_SIZE);

// Log failure
if (err) {
LOG_DBG("bt_gatt_notify failed (err %d)", err);
LOG_DBG("MTU: %d, packet_size: %d", current_mtu, packet_size + NET_BUFFER_HEADER_SIZE);
k_sleep(K_MSEC(1));
retry_count++;
continue;
}
// If incoming chunk does not fit, flush what we have now
if (pusher_temp_data_len + payload_len > max_payload) {
if (!flush_gatt_buffer(conn)) {
return false; // preserve data and retry later
}
}

// Try to send more data if possible
if (err == -EAGAIN || err == -ENOMEM) {
retry_count++;
continue;
}
// Now append data safely (guaranteed to fit)
memcpy(pusher_temp_data + NET_BUFFER_HEADER_SIZE + pusher_temp_data_len,
payload, payload_len);

// Break if success
break;
}
pusher_temp_data_len += payload_len;

if (retry_count >= max_retries) {
LOG_ERR("Failed to send packet after %d retries", max_retries);
return false;
}
//If full, need to flush immediately
if (pusher_temp_data_len >= max_payload) {
return flush_gatt_buffer(conn);
}

return true;
}

#define OPUS_PREFIX_LENGTH 1
#define OPUS_PADDED_LENGTH 80
#define MAX_WRITE_SIZE 440
static uint8_t storage_temp_data[MAX_WRITE_SIZE];
static uint32_t offset = 0;
static uint16_t buffer_offset = 0;
// bool write_to_storage(void)
// {
// if (!read_from_tx_queue())
// {
// return false;
// }

// uint8_t *buffer = tx_buffer+2;
// const uint32_t packet_size = tx_buffer_size;
// //load into write at 400 bytes at a time. is faster
// memcpy(storage_temp_data + OPUS_PREFIX_LENGTH + buffer_offset, buffer, packet_size);
// storage_temp_data[buffer_offset] = (uint8_t)tx_buffer_size;

// buffer_offset = buffer_offset+OPUS_PADDED_LENGTH;
// if(buffer_offset >= OPUS_PADDED_LENGTH*5) {
// uint8_t *write_ptr = (uint8_t*)storage_temp_data;
// write_to_file(write_ptr,OPUS_PADDED_LENGTH*5);

// buffer_offset = 0;
// }

// return true;
// }

// for improving ble bandwidth
bool write_to_storage(void)
{ // max possible packing
if (!read_from_tx_queue()) {
return false;
}
LOG_DBG("write_to_storage %d bytes", tx_buffer_size);

uint8_t *buffer = tx_buffer + 2;
uint8_t packet_size = (uint8_t) (tx_buffer_size + OPUS_PREFIX_LENGTH);
Expand Down
5 changes: 4 additions & 1 deletion omi/firmware/devkit/src/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,8 @@
LOG_ERR("Error at %s:%d:%d", __FILE__, __LINE__, result); \
return -1; \
}

#define ASSERT_DEV(result) \
if (!result) { \
__asm("bkpt"); \
}
#endif