Skip to content

Commit 9a1c2cc

Browse files
committed
use futex for IPC notification
1 parent 74074d6 commit 9a1c2cc

File tree

5 files changed

+106
-79
lines changed

5 files changed

+106
-79
lines changed

SConscript

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ msgq_objects = env.SharedObject([
1212
'msgq/impl_zmq.cc',
1313
'msgq/impl_msgq.cc',
1414
'msgq/impl_fake.cc',
15+
'msgq/futex.cc',
1516
'msgq/msgq.cc',
1617
])
1718
msgq = env.Library('msgq', msgq_objects)

msgq/futex.cc

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#include "msgq/futex.h"
2+
3+
#include <fcntl.h>
4+
#include <limits.h>
5+
#include <linux/futex.h>
6+
#include <stdio.h>
7+
#include <sys/mman.h>
8+
#include <syscall.h>
9+
#include <unistd.h>
10+
11+
#include <cassert>
12+
#include <stdexcept>
13+
14+
Futex::Futex(const std::string &path) {
15+
auto fd = open(path.c_str(), O_RDWR | O_CREAT, 0664);
16+
if (fd < 0) {
17+
throw std::runtime_error("Failed to open file: " + path);
18+
}
19+
20+
if (ftruncate(fd, sizeof(uint32_t)) < 0) {
21+
close(fd);
22+
throw std::runtime_error("Failed to truncate file: " + path);
23+
}
24+
25+
int *mem = (int *)mmap(NULL, sizeof(uint32_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
26+
close(fd);
27+
if (mem == MAP_FAILED) {
28+
throw std::runtime_error("Failed to mmap file: " + path);
29+
}
30+
31+
futex = reinterpret_cast<std::atomic<uint32_t> *>(mem);
32+
}
33+
34+
Futex::~Futex() {
35+
munmap(futex, sizeof(uint32_t));
36+
}
37+
38+
void Futex::broadcast() {
39+
// Increment the futex value to signal waiting threads
40+
futex->fetch_add(1, std::memory_order_relaxed);
41+
42+
// Wake up all threads waiting on the futex
43+
syscall(SYS_futex, futex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
44+
}
45+
46+
bool Futex::wait(uint32_t expected, int timeout_ms) {
47+
if (futex->load(std::memory_order_relaxed) != expected) {
48+
return true; // Already not equal, no need to wait
49+
}
50+
51+
if (timeout_ms <= 0) {
52+
return false; // Timeout immediately
53+
}
54+
55+
// Perform the futex wait syscall
56+
struct timespec ts;
57+
ts.tv_sec = timeout_ms / 1000;
58+
ts.tv_nsec = (timeout_ms % 1000) * 1000 * 1000;
59+
syscall(SYS_futex, futex, FUTEX_WAIT, expected, &ts, nullptr, 0);
60+
61+
return futex->load(std::memory_order_relaxed) != expected;
62+
}

msgq/futex.h

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#pragma once
2+
3+
#include <cstdint>
4+
#include <atomic>
5+
#include <string>
6+
7+
8+
class Futex {
9+
public:
10+
Futex(const std::string &path);
11+
~Futex();
12+
void broadcast();
13+
bool wait(uint32_t expected, int timeout_ms);
14+
inline uint32_t value() const { return futex->load(); }
15+
16+
private:
17+
std::atomic<uint32_t> *futex = nullptr;
18+
};

msgq/impl_msgq.cc

+2-9
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ Message * MSGQSubSocket::receive(bool non_blocking){
8282
}
8383

8484
msgq_msg_t msg;
85-
8685
MSGQMessage *r = NULL;
8786

8887
int rc = msgq_msg_recv(&msg, q);
@@ -93,21 +92,15 @@ Message * MSGQSubSocket::receive(bool non_blocking){
9392
items[0].q = q;
9493

9594
int t = (timeout != -1) ? timeout : 100;
96-
97-
int n = msgq_poll(items, 1, t);
98-
rc = msgq_msg_recv(&msg, q);
99-
100-
// The poll indicated a message was ready, but the receive failed. Try again
101-
if (n == 1 && rc == 0){
102-
continue;
95+
if (msgq_poll(items, 1, t) > 0) {
96+
rc = msgq_msg_recv(&msg, q);
10397
}
10498

10599
if (timeout != -1){
106100
break;
107101
}
108102
}
109103

110-
111104
if (!non_blocking){
112105
std::signal(SIGINT, prev_handler_sigint);
113106
std::signal(SIGTERM, prev_handler_sigterm);

msgq/msgq.cc

+23-70
Original file line numberDiff line numberDiff line change
@@ -3,44 +3,25 @@
33
#include <cerrno>
44
#include <cmath>
55
#include <cstring>
6-
#include <cstdint>
76
#include <chrono>
87
#include <algorithm>
98
#include <cstdlib>
10-
#include <csignal>
119
#include <random>
1210
#include <string>
1311
#include <limits>
14-
15-
#include <poll.h>
16-
#include <sys/ioctl.h>
1712
#include <sys/mman.h>
18-
#include <sys/stat.h>
19-
#include <sys/types.h>
20-
#include <sys/syscall.h>
2113
#include <fcntl.h>
2214
#include <unistd.h>
2315

24-
#include <stdio.h>
25-
16+
#include "msgq/futex.h"
2617
#include "msgq/msgq.h"
2718

28-
void sigusr2_handler(int signal) {
29-
assert(signal == SIGUSR2);
30-
}
19+
Futex g_futex("/dev/shm/msgq_futex");
3120

32-
uint64_t msgq_get_uid(void){
21+
static uint64_t msgq_get_uid(void) {
3322
std::random_device rd("/dev/urandom");
34-
std::uniform_int_distribution<uint64_t> distribution(0, std::numeric_limits<uint32_t>::max());
35-
36-
#ifdef __APPLE__
37-
// TODO: this doesn't work
38-
uint64_t uid = distribution(rd) << 32 | getpid();
39-
#else
40-
uint64_t uid = distribution(rd) << 32 | syscall(SYS_gettid);
41-
#endif
42-
43-
return uid;
23+
std::uniform_int_distribution<uint64_t> distribution(1, std::numeric_limits<uint64_t>::max());
24+
return distribution(rd);
4425
}
4526

4627
int msgq_msg_init_size(msgq_msg_t * msg, size_t size){
@@ -85,7 +66,6 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){
8566

8667
int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
8768
assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes
88-
std::signal(SIGUSR2, sigusr2_handler);
8969

9070
std::string full_path = "/dev/shm/";
9171
const char* prefix = std::getenv("OPENPILOT_PREFIX");
@@ -142,7 +122,6 @@ void msgq_close_queue(msgq_queue_t *q){
142122
}
143123
}
144124

145-
146125
void msgq_init_publisher(msgq_queue_t * q) {
147126
//std::cout << "Starting publisher" << std::endl;
148127
uint64_t uid = msgq_get_uid();
@@ -158,15 +137,6 @@ void msgq_init_publisher(msgq_queue_t * q) {
158137
q->write_uid_local = uid;
159138
}
160139

161-
static void thread_signal(uint32_t tid) {
162-
#ifndef SYS_tkill
163-
// TODO: this won't work for multithreaded programs
164-
kill(tid, SIGUSR2);
165-
#else
166-
syscall(SYS_tkill, tid, SIGUSR2);
167-
#endif
168-
}
169-
170140
void msgq_init_subscriber(msgq_queue_t * q) {
171141
assert(q != NULL);
172142
assert(q->num_readers != NULL);
@@ -185,14 +155,11 @@ void msgq_init_subscriber(msgq_queue_t * q) {
185155

186156
for (size_t i = 0; i < NUM_READERS; i++){
187157
*q->read_valids[i] = false;
188-
189-
uint64_t old_uid = *q->read_uids[i];
190158
*q->read_uids[i] = 0;
191-
192-
// Wake up reader in case they are in a poll
193-
thread_signal(old_uid & 0xFFFFFFFF);
194159
}
195160

161+
// Notify readers
162+
g_futex.broadcast();
196163
continue;
197164
}
198165

@@ -293,10 +260,7 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){
293260
PACK64(*q->write_pointer, write_cycles, new_ptr);
294261

295262
// Notify readers
296-
for (uint64_t i = 0; i < num_readers; i++){
297-
uint64_t reader_uid = *q->read_uids[i];
298-
thread_signal(reader_uid & 0xFFFFFFFF);
299-
}
263+
g_futex.broadcast();
300264

301265
return msg->size;
302266
}
@@ -414,42 +378,31 @@ int msgq_msg_recv(msgq_msg_t * msg, msgq_queue_t * q){
414378
goto start;
415379
}
416380

417-
418381
return msg->size;
419382
}
420383

421-
422-
423-
int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){
384+
int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout) {
424385
int num = 0;
386+
int timeout_ms = (timeout == -1) ? 100 : timeout;
387+
uint32_t current_futex_value = 0;
425388

426-
// Check if messages ready
427-
for (size_t i = 0; i < nitems; i++) {
428-
items[i].revents = msgq_msg_ready(items[i].q);
429-
if (items[i].revents) num++;
430-
}
431-
432-
int ms = (timeout == -1) ? 100 : timeout;
433-
struct timespec ts;
434-
ts.tv_sec = ms / 1000;
435-
ts.tv_nsec = (ms % 1000) * 1000 * 1000;
436-
437-
389+
auto start_time = std::chrono::high_resolution_clock::now();
438390
while (num == 0) {
439-
int ret;
440-
441-
ret = nanosleep(&ts, &ts);
391+
if (g_futex.wait(current_futex_value, timeout_ms)) {
392+
current_futex_value = g_futex.value();
442393

443-
// Check if messages ready
444-
for (size_t i = 0; i < nitems; i++) {
445-
if (items[i].revents == 0 && msgq_msg_ready(items[i].q)){
446-
num += 1;
447-
items[i].revents = 1;
394+
// Check if messages ready
395+
for (size_t i = 0; i < nitems; i++) {
396+
items[i].revents = msgq_msg_ready(items[i].q);
397+
if (items[i].revents) ++num;
448398
}
449399
}
450400

451-
// exit if we had a timeout and the sleep finished
452-
if (timeout != -1 && ret == 0){
401+
// Update the remaining timeout
402+
auto current_time = std::chrono::high_resolution_clock::now();
403+
timeout_ms -= std::chrono::duration_cast<std::chrono::milliseconds>(current_time - start_time).count();
404+
start_time = current_time;
405+
if (timeout_ms <= 0) {
453406
break;
454407
}
455408
}

0 commit comments

Comments
 (0)