Skip to content

Commit b597955

Browse files
committed
use futex for IPC notification
1 parent 74074d6 commit b597955

File tree

5 files changed

+103
-67
lines changed

5 files changed

+103
-67
lines changed

SConscript

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ msgq_objects = env.SharedObject([
1212
'msgq/impl_zmq.cc',
1313
'msgq/impl_msgq.cc',
1414
'msgq/impl_fake.cc',
15+
'msgq/futex.cc',
1516
'msgq/msgq.cc',
1617
])
1718
msgq = env.Library('msgq', msgq_objects)

msgq/futex.cc

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#include "msgq/futex.h"
2+
3+
#include <fcntl.h>
4+
#include <limits.h>
5+
#include <linux/futex.h>
6+
#include <stdio.h>
7+
#include <sys/mman.h>
8+
#include <syscall.h>
9+
#include <unistd.h>
10+
11+
#include <cassert>
12+
#include <stdexcept>
13+
14+
// Opens (creating if necessary) the file at `path`, sizes it to hold one
// 32-bit word and maps that word shared and read/write. The mapped word is
// the counter used by broadcast(), wait() and value().
//
// Throws std::runtime_error if the file cannot be opened, resized or mapped.
Futex::Futex(const std::string &path) {
  // The mapping is exactly sizeof(uint32_t) bytes and is accessed both through
  // std::atomic<uint32_t> and by the futex syscall, which operates on a 32-bit
  // word. Both only work if the atomic is a plain lock-free 32-bit word.
  static_assert(sizeof(std::atomic<uint32_t>) == sizeof(uint32_t),
                "std::atomic<uint32_t> must be exactly one 32-bit word");
  static_assert(ATOMIC_INT_LOCK_FREE == 2,
                "32-bit atomics must be lock-free to live in a shared mapping");

  const int fd = open(path.c_str(), O_RDWR | O_CREAT, 0664);
  if (fd < 0) {
    throw std::runtime_error("Failed to open file: " + path);
  }

  if (ftruncate(fd, sizeof(uint32_t)) < 0) {
    close(fd);
    throw std::runtime_error("Failed to truncate file: " + path);
  }

  void *mem = mmap(nullptr, sizeof(uint32_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  // The descriptor is no longer needed once mmap has been attempted; an
  // established mapping remains valid after close().
  close(fd);
  if (mem == MAP_FAILED) {
    throw std::runtime_error("Failed to mmap file: " + path);
  }

  futex = static_cast<std::atomic<uint32_t> *>(mem);
}
33+
34+
// Unmaps the 32-bit word mapped by the constructor. The backing file is left
// in place; nothing here removes it.
Futex::~Futex() {
  void *const mapping = futex;
  munmap(mapping, sizeof(uint32_t));
}
37+
38+
// Signals every waiter: bumps the counter, then wakes all threads blocked in
// FUTEX_WAIT on it. The syscall result is ignored (best effort).
void Futex::broadcast() {
  // Release ordering publishes everything this thread wrote before the call
  // (e.g. the data the waiters are about to look for) ahead of the counter
  // change. With a relaxed increment a waiter on a weakly ordered CPU could
  // observe the new counter value, not yet see that data, and then go back to
  // sleep on the already-updated value, missing this notification.
  futex->fetch_add(1, std::memory_order_release);

  // INT_MAX asks the kernel to wake every thread waiting on this word.
  syscall(SYS_futex, futex, FUTEX_WAKE, INT_MAX, nullptr, nullptr, 0);
}
45+
46+
bool Futex::wait(uint32_t expected, int timeout_ms) {
47+
if (futex->load(std::memory_order_relaxed) != expected) {
48+
return true; // Already not equal, no need to wait
49+
}
50+
51+
if (timeout_ms <= 0) {
52+
return false; // Timeout immediately
53+
}
54+
55+
// Perform the futex wait syscall
56+
struct timespec ts;
57+
ts.tv_sec = timeout_ms / 1000;
58+
ts.tv_nsec = (timeout_ms % 1000) * 1000 * 1000;
59+
syscall(SYS_futex, futex, FUTEX_WAIT, expected, &ts, nullptr, 0);
60+
61+
return futex->load(std::memory_order_relaxed) != expected;
62+
}

msgq/futex.h

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#pragma once
2+
3+
#include <cstdint>
4+
#include <atomic>
5+
#include <string>
6+
7+
8+
// Wrapper around a 32-bit counter that is waited on and signalled with the
// Linux futex syscall. The counter is reached through a pointer that the
// constructor sets up from `path` and the destructor releases.
class Futex {
public:
  // Sets up the counter backed by the file at `path`.
  Futex(const std::string &path);
  ~Futex();

  // The object owns the resource behind `futex`; a copy would release the
  // same resource a second time in its destructor, so copying is disabled.
  Futex(const Futex &) = delete;
  Futex &operator=(const Futex &) = delete;

  // Changes the counter and wakes all waiters.
  void broadcast();

  // Waits up to `timeout_ms` milliseconds for the counter to differ from
  // `expected`; returns true if it does.
  bool wait(uint32_t expected, int timeout_ms);

  // Current counter value.
  inline uint32_t value() const { return futex->load(); }

private:
  std::atomic<uint32_t> *futex = nullptr;
};

msgq/impl_msgq.cc

+2-9
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ Message * MSGQSubSocket::receive(bool non_blocking){
8282
}
8383

8484
msgq_msg_t msg;
85-
8685
MSGQMessage *r = NULL;
8786

8887
int rc = msgq_msg_recv(&msg, q);
@@ -93,21 +92,15 @@ Message * MSGQSubSocket::receive(bool non_blocking){
9392
items[0].q = q;
9493

9594
int t = (timeout != -1) ? timeout : 100;
96-
97-
int n = msgq_poll(items, 1, t);
98-
rc = msgq_msg_recv(&msg, q);
99-
100-
// The poll indicated a message was ready, but the receive failed. Try again
101-
if (n == 1 && rc == 0){
102-
continue;
95+
if (msgq_poll(items, 1, t) > 0) {
96+
rc = msgq_msg_recv(&msg, q);
10397
}
10498

10599
if (timeout != -1){
106100
break;
107101
}
108102
}
109103

110-
111104
if (!non_blocking){
112105
std::signal(SIGINT, prev_handler_sigint);
113106
std::signal(SIGTERM, prev_handler_sigterm);

msgq/msgq.cc

+20-58
Original file line numberDiff line numberDiff line change
@@ -3,31 +3,21 @@
33
#include <cerrno>
44
#include <cmath>
55
#include <cstring>
6-
#include <cstdint>
76
#include <chrono>
87
#include <algorithm>
98
#include <cstdlib>
10-
#include <csignal>
119
#include <random>
1210
#include <string>
1311
#include <limits>
14-
15-
#include <poll.h>
16-
#include <sys/ioctl.h>
1712
#include <sys/mman.h>
18-
#include <sys/stat.h>
19-
#include <sys/types.h>
2013
#include <sys/syscall.h>
2114
#include <fcntl.h>
2215
#include <unistd.h>
2316

24-
#include <stdio.h>
25-
17+
#include "msgq/futex.h"
2618
#include "msgq/msgq.h"
2719

28-
void sigusr2_handler(int signal) {
29-
assert(signal == SIGUSR2);
30-
}
20+
Futex g_futex("/dev/shm/msgq_futex");
3121

3222
uint64_t msgq_get_uid(void){
3323
std::random_device rd("/dev/urandom");
@@ -85,7 +75,6 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){
8575

8676
int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
8777
assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes
88-
std::signal(SIGUSR2, sigusr2_handler);
8978

9079
std::string full_path = "/dev/shm/";
9180
const char* prefix = std::getenv("OPENPILOT_PREFIX");
@@ -142,7 +131,6 @@ void msgq_close_queue(msgq_queue_t *q){
142131
}
143132
}
144133

145-
146134
void msgq_init_publisher(msgq_queue_t * q) {
147135
//std::cout << "Starting publisher" << std::endl;
148136
uint64_t uid = msgq_get_uid();
@@ -158,15 +146,6 @@ void msgq_init_publisher(msgq_queue_t * q) {
158146
q->write_uid_local = uid;
159147
}
160148

161-
static void thread_signal(uint32_t tid) {
162-
#ifndef SYS_tkill
163-
// TODO: this won't work for multithreaded programs
164-
kill(tid, SIGUSR2);
165-
#else
166-
syscall(SYS_tkill, tid, SIGUSR2);
167-
#endif
168-
}
169-
170149
void msgq_init_subscriber(msgq_queue_t * q) {
171150
assert(q != NULL);
172151
assert(q->num_readers != NULL);
@@ -185,14 +164,11 @@ void msgq_init_subscriber(msgq_queue_t * q) {
185164

186165
for (size_t i = 0; i < NUM_READERS; i++){
187166
*q->read_valids[i] = false;
188-
189-
uint64_t old_uid = *q->read_uids[i];
190167
*q->read_uids[i] = 0;
191-
192-
// Wake up reader in case they are in a poll
193-
thread_signal(old_uid & 0xFFFFFFFF);
194168
}
195169

170+
// Notify readers
171+
g_futex.broadcast();
196172
continue;
197173
}
198174

@@ -293,10 +269,7 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){
293269
PACK64(*q->write_pointer, write_cycles, new_ptr);
294270

295271
// Notify readers
296-
for (uint64_t i = 0; i < num_readers; i++){
297-
uint64_t reader_uid = *q->read_uids[i];
298-
thread_signal(reader_uid & 0xFFFFFFFF);
299-
}
272+
g_futex.broadcast();
300273

301274
return msg->size;
302275
}
@@ -414,42 +387,31 @@ int msgq_msg_recv(msgq_msg_t * msg, msgq_queue_t * q){
414387
goto start;
415388
}
416389

417-
418390
return msg->size;
419391
}
420392

421-
422-
423-
int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){
393+
int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout) {
424394
int num = 0;
395+
int timeout_ms = (timeout == -1) ? 100 : timeout;
396+
uint32_t current_futex_value = 0;
425397

426-
// Check if messages ready
427-
for (size_t i = 0; i < nitems; i++) {
428-
items[i].revents = msgq_msg_ready(items[i].q);
429-
if (items[i].revents) num++;
430-
}
431-
432-
int ms = (timeout == -1) ? 100 : timeout;
433-
struct timespec ts;
434-
ts.tv_sec = ms / 1000;
435-
ts.tv_nsec = (ms % 1000) * 1000 * 1000;
436-
437-
398+
auto start_time = std::chrono::high_resolution_clock::now();
438399
while (num == 0) {
439-
int ret;
440-
441-
ret = nanosleep(&ts, &ts);
400+
if (g_futex.wait(current_futex_value, timeout_ms)) {
401+
current_futex_value = g_futex.value();
442402

443-
// Check if messages ready
444-
for (size_t i = 0; i < nitems; i++) {
445-
if (items[i].revents == 0 && msgq_msg_ready(items[i].q)){
446-
num += 1;
447-
items[i].revents = 1;
403+
// Check if messages ready
404+
for (size_t i = 0; i < nitems; i++) {
405+
items[i].revents = msgq_msg_ready(items[i].q);
406+
if (items[i].revents) ++num;
448407
}
449408
}
450409

451-
// exit if we had a timeout and the sleep finished
452-
if (timeout != -1 && ret == 0){
410+
// Update the remaining timeout
411+
auto current_time = std::chrono::high_resolution_clock::now();
412+
timeout_ms -= std::chrono::duration_cast<std::chrono::milliseconds>(current_time - start_time).count();
413+
start_time = current_time;
414+
if (timeout_ms <= 0) {
453415
break;
454416
}
455417
}

0 commit comments

Comments
 (0)