-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathatomic_queue.c
81 lines (68 loc) · 2.83 KB
/
atomic_queue.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/*
* Copyright (C) 2024 Mikhail Burakov. This file is part of receiver.
*
* receiver is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* receiver is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with receiver. If not, see <https://www.gnu.org/licenses/>.
*/
#include "atomic_queue.h"
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
// Returns the smaller of two sizes (either one when they are equal).
static size_t min(size_t a, size_t b) {
  if (a < b) return a;
  return b;
}
// Returns the smallest of three sizes by chaining two pairwise min() calls.
static size_t min3(size_t a, size_t b, size_t c) {
  const size_t smaller_ab = min(a, b);
  return min(smaller_ab, c);
}
// Initializes |atomic_queue| with a freshly allocated backing store of |alloc|
// bytes. Members not named in the initializer below are zero-initialized by
// the compound literal, and the atomic fill counter is then set to zero.
//
// Returns true on success. Returns false, leaving |*atomic_queue| untouched,
// when |alloc| is zero or when the allocation fails. On success the backing
// store is released by AtomicQueueDestroy().
bool AtomicQueueCreate(struct AtomicQueue* atomic_queue, size_t alloc) {
  // A zero-byte queue can never hold data, and both AtomicQueueWrite() and
  // AtomicQueueRead() reduce their cursor modulo |alloc|, which would be a
  // division by zero. malloc(0) is allowed to return a non-null pointer, so
  // the null check below is not enough to catch this case.
  if (!alloc) return false;

  void* buffer = malloc(alloc);
  if (!buffer) return false;
  *atomic_queue = (struct AtomicQueue){
      .buffer = buffer,
      .alloc = alloc,
  };
  atomic_init(&atomic_queue->size, 0);
  return true;
}
// Copies up to |size| bytes from |buffer| into the queue and returns the number
// of bytes actually stored. The result is smaller than |size| when the queue
// does not have enough free space; it is zero when the queue is full.
//
// NOTE(review): the write cursor is accessed here as a plain field and only the
// fill counter goes through explicit atomics, so this looks intended for a
// single writer thread running alongside a reader - confirm against callers.
size_t AtomicQueueWrite(struct AtomicQueue* atomic_queue, const void* buffer,
                        size_t size) {
  // The acquire load pairs with the release decrement in AtomicQueueRead(), so
  // any space counted as free here has already been copied out by the reader.
  const size_t used =
      atomic_load_explicit(&atomic_queue->size, memory_order_acquire);
  const size_t room = atomic_queue->alloc - used;

  const uint8_t* source = (const uint8_t*)buffer;
  uint8_t* storage = (uint8_t*)atomic_queue->buffer;
  const size_t start = atomic_queue->write;

  // First piece: from the write cursor towards the end of the storage, limited
  // by the request size and by the free space.
  const size_t head_len = min3(size, room, atomic_queue->alloc - start);
  memcpy(storage + start, source, head_len);

  // Second piece: the remainder wraps around to the beginning of the storage.
  // It is zero whenever the first piece was limited by |size| or by the free
  // space rather than by the end of the storage.
  const size_t wrap_len = min(size - head_len, room - head_len);
  memcpy(storage, source + head_len, wrap_len);

  const size_t written = head_len + wrap_len;
  atomic_queue->write = (start + written) % atomic_queue->alloc;
  // The release increment publishes the copied bytes together with the new
  // fill level.
  atomic_fetch_add_explicit(&atomic_queue->size, written, memory_order_release);
  return written;
}
// Copies up to |size| bytes out of the queue into |buffer| and returns the
// number of bytes actually delivered. The result is smaller than |size| when
// the queue holds less data than requested; it is zero when the queue is empty.
//
// NOTE(review): the read cursor is accessed here as a plain field and only the
// fill counter goes through explicit atomics, so this looks intended for a
// single reader thread running alongside a writer - confirm against callers.
size_t AtomicQueueRead(struct AtomicQueue* atomic_queue, void* buffer,
                       size_t size) {
  // The acquire load pairs with the release increment in AtomicQueueWrite(),
  // so every byte counted as available here has already been copied in.
  const size_t stored =
      atomic_load_explicit(&atomic_queue->size, memory_order_acquire);

  uint8_t* target = (uint8_t*)buffer;
  const uint8_t* storage = (const uint8_t*)atomic_queue->buffer;
  const size_t start = atomic_queue->read;

  // First piece: from the read cursor towards the end of the storage, limited
  // by the request size and by the amount of stored data.
  const size_t head_len = min3(size, stored, atomic_queue->alloc - start);
  memcpy(target, storage + start, head_len);

  // Second piece: the remainder continues from the beginning of the storage.
  // It is zero whenever the first piece was limited by |size| or by the stored
  // amount rather than by the end of the storage.
  const size_t wrap_len = min(size - head_len, stored - head_len);
  memcpy(target + head_len, storage, wrap_len);

  const size_t consumed = head_len + wrap_len;
  atomic_queue->read = (start + consumed) % atomic_queue->alloc;
  // The release decrement hands the consumed region back only after the
  // copies out of it have been issued.
  atomic_fetch_sub_explicit(&atomic_queue->size, consumed,
                            memory_order_release);
  return consumed;
}
// Releases the backing store that AtomicQueueCreate() allocated. The struct
// itself belongs to the caller and is not freed.
//
// The buffer pointer is cleared afterwards so that an accidental second call
// degrades to a harmless free(NULL) instead of a double free. The queue must
// not be written to or read from after this call.
void AtomicQueueDestroy(struct AtomicQueue* atomic_queue) {
  free(atomic_queue->buffer);
  atomic_queue->buffer = NULL;
}