Skip to content

Commit 2f9d3f1

Browse files
authored
Queue optimizations (#29)
* QUEUE_NO_LOCK * Respect flush requests when QUEUE_PEEK_THRESHOLD mode
1 parent 27eacba commit 2f9d3f1

File tree

7 files changed

+297
-147
lines changed

7 files changed

+297
-147
lines changed

examples/tokio_demo/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ const CALPAGE1: CalPage1 = CalPage1 {
9393
// Stop after 5s
9494
// Trigger a global event when any task starts or stops
9595
// Trigger a thread local event, in each loop
96-
// Once the A2L registry is created on XCP client connect, tli events and variable instances are fixed and addional instances are not visible
96+
// Once the A2L registry is created on XCP client connect, tli events and variable instances are fixed and additional instances are not visible
9797
// Tokio occasionally creates new worker threads and destroys old ones very late, so the number of instances may change
9898

9999
#[allow(dead_code)]
@@ -227,7 +227,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
227227
counter = 0;
228228
}
229229

230-
// Triger the measurement event "task"
230+
// Trigger the measurement event "task"
231231
// The measurement event timestamp is taken here and captured data is sent to CANape
232232
event.trigger();
233233
}

tests/test_multi_thread.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ use xcp_test_executor::test_executor;
2525
const TEST_CAL: xcp_test_executor::TestModeCal = xcp_test_executor::TestModeCal::Cal; // Execute calibration tests: Cal or None
2626
const TEST_DAQ: xcp_test_executor::TestModeDaq = xcp_test_executor::TestModeDaq::DaqMultiThread; // Execute measurement tests: MultiThreadDAQ or None
2727

28-
const TEST_TASK_COUNT: usize = 50; // Number of test tasks to create
28+
const TEST_TASK_COUNT: usize = 50; // Number of test tasks (threads) to create
2929
const TEST_SIGNAL_COUNT: usize = 32; // Number of signals is TEST_SIGNAL_COUNT + 5 for each task
3030
const TEST_DURATION_MS: u64 = 10 * 1000; // Stop after TEST_DURATION_MS milliseconds
31-
const TEST_CYCLE_TIME_US: u32 = 250; // Cycle time in microseconds
32-
const TEST_QUEUE_SIZE: u32 = 1024 * 1024; // Size of the XCP server transmit queue in Bytes
31+
const TEST_CYCLE_TIME_US: u32 = 100; // Cycle time in microseconds
32+
const TEST_QUEUE_SIZE: u32 = 2 * 1024 * 1024; // Size of the XCP server transmit queue in Bytes
3333

3434
//-----------------------------------------------------------------------------
3535
// Calibration Segment

xcplib/src/a2l.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
#include "xcp_cfg.h" // for XCP_xxx
2828
#include "xcptl_cfg.h" // for XCPTL_xxx
2929

30+
MUTEX gA2lMutex;
31+
3032
static FILE *gA2lFile = NULL;
3133

3234
static bool gA2lUseTCP = false;
@@ -1048,6 +1050,7 @@ bool A2lInit(const char *a2l_filename, const char *a2l_projectname, const uint8_
10481050
memcpy(&gA2lOptionBindAddr, addr, 4);
10491051
gA2lOptionPort = port;
10501052
gA2lUseTCP = useTCP;
1053+
mutexInit(&gA2lMutex, false, 1000);
10511054

10521055
// Check if A2L file already exists and rename it to 'name.old' if it does
10531056
if (file_exists(a2l_filename)) {

xcplib/src/platform.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ bool socketStartup(void) {
404404
WSADATA wsaData;
405405

406406
// @@@@ TODO: Workaround for Windows
407-
mutexInit(&gWinMutex, true, 0);
407+
mutexInit(&gWinMutex, false, 1000);
408408

409409
// Init Winsock2
410410
wsaVersionRequested = MAKEWORD(2, 2);

xcplib/src/xcpEthServer.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ extern void *XcpServerTransmitThread(void *par)
185185
gXcpServer.TransmitThreadRunning = true;
186186
while (gXcpServer.TransmitThreadRunning) {
187187

188-
// Transmit all commmited messages from the transmit queue
188+
// Transmit all committed messages from the transmit queue
189189
n = XcpTlHandleTransmitQueue();
190190
if (n < 0) {
191191
DBG_PRINT_ERROR("XcpTlHandleTransmitQueue failed!\n");

xcplib/src/xcpQueue32.c

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ Transport Layer segment, message, packet:
5454
#define CACHE_LINE_SIZE 64u // Cache line size, used to align the queue entries and the queue header
5555

5656
typedef struct {
57-
uint16_t dlc; // lenght
57+
uint16_t dlc; // length
5858
uint16_t ctr; // message counter
5959
uint8_t packet[]; // packet
6060
} tXcpMessage;
@@ -63,7 +63,7 @@ static_assert(sizeof(tXcpMessage) == XCPTL_TRANSPORT_LAYER_HEADER_SIZE, "tXcpMes
6363

6464
typedef struct {
6565
uint32_t magic; // Magic number to identify the segment buffer
66-
uint16_t uncommited; // Number of uncommited messages in this segment
66+
uint16_t uncommitted; // Number of uncommitted messages in this segment
6767
uint16_t size; // Number of overall bytes in this segment
6868
uint8_t msg_buffer[XCPTL_MAX_SEGMENT_SIZE]; // Segment/UDP MTU - concatenated transport layer messages tXcpMessage
6969
} tXcpSegmentBuffer;
@@ -103,7 +103,7 @@ static void newSegmentBuffer(tQueue *queue) {
103103
i -= queue->queue_size;
104104
b = &queue->queue[i];
105105
b->size = 0;
106-
b->uncommited = 0;
106+
b->uncommitted = 0;
107107
queue->msg_ptr = b;
108108
queue->queue_len++;
109109
assert(queue->msg_ptr->magic == 0x12345678); // Check magic number
@@ -147,13 +147,13 @@ tQueueHandle QueueInit(uint32_t queue_buffer_size) {
147147
assert(queue->queue != NULL);
148148
for (uint32_t i = 0; i < queue->queue_size; i++) {
149149
queue->queue[i].magic = 0x12345678; // Magic number to identify the segment buffer
150-
queue->queue[i].uncommited = 0; // No uncommited messages
150+
queue->queue[i].uncommitted = 0; // No uncommitted messages
151151
queue->queue[i].size = 0; // No data in this segment
152152
}
153153

154154
DBG_PRINTF4("QueueInit: queue_buffer_size=%" PRIu32 ", queue_size=%" PRIu32 " (%" PRIu32 " Bytes)\n", queue->queue_buffer_size, queue->queue_size, queue->queue_buffer_size);
155155

156-
mutexInit(&queue->Mutex_Queue, 0, 1000);
156+
mutexInit(&queue->Mutex_Queue, false, 1000);
157157

158158
mutexLock(&queue->Mutex_Queue);
159159
queue->queue_rp = 0;
@@ -176,7 +176,6 @@ void QueueDeinit(tQueueHandle queueHandle) {
176176

177177
clearQueue(queue); // Clear the queue
178178

179-
//_aligned_free(queue->queue);
180179
free(queue->queue);
181180
queue->queue = NULL;
182181
queue->queue_buffer_size = 0;
@@ -224,8 +223,8 @@ tQueueBuffer QueueAcquire(tQueueHandle queueHandle, uint16_t packet_size) {
224223
p->ctr = 0xEEEE; // Reserved value, indicates that this message is not yet commited
225224
p->dlc = (uint16_t)packet_size;
226225
b->size = (uint16_t)(b->size + msg_size);
227-
b->uncommited++;
228-
DBG_PRINTF5("QueueAcquire: size=%" PRIu16 ", uncommited=%" PRIu16 "\n", b->size, b->uncommited);
226+
b->uncommitted++;
227+
DBG_PRINTF5("QueueAcquire: size=%" PRIu16 ", uncommitted=%" PRIu16 "\n", b->size, b->uncommitted);
229228
} else {
230229
// No segment buffer available, queue overflow
231230
queue->packets_lost++;
@@ -253,11 +252,11 @@ void QueuePush(tQueueHandle queueHandle, tQueueBuffer *const queueBuffer, bool f
253252

254253
tQueue *queue = (tQueue *)queueHandle;
255254

256-
DBG_PRINTF5("QueuePush: size=%" PRIu16 ", uncommited=%" PRIu16 "\n", queueBuffer->size, ((tXcpSegmentBuffer *)queueBuffer->handle)->uncommited);
255+
DBG_PRINTF5("QueuePush: size=%" PRIu16 ", uncommitted=%" PRIu16 "\n", queueBuffer->size, ((tXcpSegmentBuffer *)queueBuffer->handle)->uncommitted);
257256

258257
mutexLock(&queue->Mutex_Queue);
259258

260-
((tXcpSegmentBuffer *)queueBuffer->handle)->uncommited--;
259+
((tXcpSegmentBuffer *)queueBuffer->handle)->uncommitted--;
261260

262261
tXcpMessage *p = (tXcpMessage *)(queueBuffer->buffer - XCPTL_TRANSPORT_LAYER_HEADER_SIZE);
263262
assert(p->dlc > 0 && p->dlc <= XCPTL_MAX_DTO_SIZE);
@@ -317,7 +316,7 @@ tQueueBuffer QueuePeek(tQueueHandle queueHandle, bool flush, uint32_t *packets_l
317316
}
318317

319318
// Return tail segment buffer if it is not empty, fully committed and there are more segments in the queue
320-
if (!(queue->queue_len > 1 && b->uncommited == 0 && b->size > 0)) {
319+
if (!(queue->queue_len > 1 && b->uncommitted == 0 && b->size > 0)) {
321320
b = NULL;
322321
}
323322
}
@@ -366,7 +365,7 @@ void QueueRelease(tQueueHandle queueHandle, tQueueBuffer *const queueBuffer) {
366365

367366
DBG_PRINTF5("QueueRelease: size=%" PRIu16 "\n", queueBuffer->size);
368367

369-
// Free this segment buffer when succesfully sent
368+
// Free this segment buffer when successfully sent
370369
mutexLock(&queue->Mutex_Queue);
371370
if (++queue->queue_rp >= queue->queue_size)
372371
queue->queue_rp = 0;

0 commit comments

Comments
 (0)