Skip to content

Commit 961bff0

Browse files
committed
Remove atomic from shmem. Implement mutex for local writer locking
This controls concurrency on the event_detect writer side. The reader takes the risk of a torn read. Note that, following an architectural review of this, there is concurrence that, due to the low update frequency, the chances of a torn read are low, that atomics can't be implemented in C++03 code on some DC systems, and that the overhead of using pthread_mutex_t is not worth it.
1 parent 37b7564 commit 961bff0

File tree

3 files changed

+36
-39
lines changed

3 files changed

+36
-39
lines changed

event_detect.cpp

Lines changed: 15 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -80,20 +80,6 @@ SharedMemoryTimestampExporter g_shmem_exporter(SHMEM_NAME_CONFIG);
8080
//!
8181
std::atomic<bool> g_shm_initialized_successfully = false;
8282

83-
// Note this is to check that the data structures are the same between the std::atomic version of int64_t and the
84-
// raw version, because some DC platforms do not have access to std::atomic on their side, and so will read these
85-
// raw. Note that the chance of a torn read is very low, and on the write side, we are guaranteeing atomic writes
86-
// by the use of std::atomic.
87-
static_assert(sizeof(std::atomic<int64_t>) == sizeof(int64_t),
88-
"Size mismatch between atomic<int64_t> and int64_t");
89-
static_assert(alignof(std::atomic<int64_t>) == alignof(int64_t),
90-
"Alignment mismatch between atomic<int64_t> and int64_t");
91-
// Check the array specifically
92-
static_assert(sizeof(std::atomic<int64_t>[2]) == sizeof(int64_t[2]),
93-
"Size mismatch between atomic<int64_t>[2] and int64_t[2]");
94-
static_assert(alignof(std::atomic<int64_t>[2]) == alignof(int64_t[2]),
95-
"Alignment mismatch between atomic<int64_t>[2] and int64_t[2]");
96-
9783
// Class Monitor
9884

9985
Monitor::Monitor()
@@ -976,6 +962,9 @@ bool SharedMemoryTimestampExporter::CreateOrOpen(mode_t mode) {
976962
debug_log("INFO: %s: Initializing shared memory segment %s for atomic int64_t[2]", __func__, m_shm_name.c_str());
977963

978964
Cleanup(); // Ensure clean state
965+
966+
std::unique_lock<std::mutex> lock(mtx_shmem);
967+
979968
m_is_creator = false;
980969

981970
// 1. Create or open RW
@@ -1028,12 +1017,11 @@ bool SharedMemoryTimestampExporter::CreateOrOpen(mode_t mode) {
10281017
}
10291018

10301019
// 5. Store pointer, initialize if we created/resized it
1031-
m_mapped_ptr = static_cast<std::atomic<int64_t>*>(mapped_mem);
1020+
m_mapped_ptr = static_cast<int64_t*>(mapped_mem);
10321021
if (m_is_creator) {
1033-
int64_t initial_time = GetUnixEpochTime(); // Assumes GetUnixEpochTime() is available
1034-
// Initialize both elements atomically
1035-
m_mapped_ptr[0].store(initial_time, std::memory_order_relaxed); // update_time
1036-
m_mapped_ptr[1].store(initial_time, std::memory_order_relaxed); // last_active_time
1022+
int64_t initial_time = GetUnixEpochTime();
1023+
m_mapped_ptr[0] = initial_time; // update_time
1024+
m_mapped_ptr[1] = initial_time; // last_active_time
10371025
debug_log("INFO: %s: Shared memory segment %s initialized. update=%lld, last_active=%lld",
10381026
__func__, m_shm_name.c_str(), (long long)initial_time, (long long)initial_time);
10391027
} else {
@@ -1045,18 +1033,16 @@ bool SharedMemoryTimestampExporter::CreateOrOpen(mode_t mode) {
10451033
}
10461034

10471035
bool SharedMemoryTimestampExporter::UpdateTimestamps(int64_t update_time, int64_t last_active_time) {
1036+
std::unique_lock<std::mutex> lock(mtx_shmem);
1037+
10481038
if (!m_is_initialized.load() || m_mapped_ptr == nullptr) {
1049-
// Log periodically? Avoid flooding logs.
1050-
// static std::atomic<int> s_log_counter = 0;
1051-
// if (s_log_counter++ % 60 == 0) // Log once a minute maybe
1052-
// error_log("ERROR: %s: Cannot update timestamp, shared memory not initialized.", __func__);
10531039
return false;
10541040
}
10551041
// Use relaxed memory order: assumes readers don't need strict ordering
10561042
// relative to other non-atomic operations in this thread. Atomicity
10571043
// of the store itself is guaranteed.
1058-
m_mapped_ptr[0].store(update_time, std::memory_order_relaxed); // Store update_time at index 0
1059-
m_mapped_ptr[1].store(last_active_time, std::memory_order_relaxed); // Store last_active_time at index 1
1044+
m_mapped_ptr[0] = update_time; // Store update_time at index 0
1045+
m_mapped_ptr[1] = last_active_time; // Store last_active_time at index 1
10601046
return true;
10611047
}
10621048

@@ -1065,6 +1051,8 @@ bool SharedMemoryTimestampExporter::IsInitialized() const {
10651051
}
10661052

10671053
void SharedMemoryTimestampExporter::Cleanup() { // Renamed from Close
1054+
std::unique_lock<std::mutex> lock(mtx_shmem);
1055+
10681056
if (m_mapped_ptr != nullptr) {
10691057
debug_log("DEBUG: %s: Unmapping shared memory %s...", __func__, m_shm_name.c_str());
10701058
if (munmap(m_mapped_ptr, m_size) == -1) { // <-- Use correct size
@@ -1084,6 +1072,8 @@ void SharedMemoryTimestampExporter::Cleanup() { // Renamed from Close
10841072
}
10851073

10861074
bool SharedMemoryTimestampExporter::UnlinkSegment() {
1075+
std::unique_lock<std::mutex> lock(mtx_shmem);
1076+
10871077
debug_log("INFO: %s: Requesting unlink for shared memory %s...", __func__, m_shm_name.c_str());
10881078
errno = 0;
10891079
if (shm_unlink(m_shm_name.c_str()) == -1) {

event_detect.h

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ class IdleDetectMonitor
411411

412412
/**
413413
* @brief Manages a POSIX shared memory segment for exporting timestamps.
414-
* Stores an array of two atomic int64_t: {update_time, last_active_time}.
414+
* Stores an array of two int64_t: {update_time, last_active_time}.
415415
* Handles creation, mapping, updating, and cleanup via RAII.
416416
*/
417417
class SharedMemoryTimestampExporter {
@@ -435,15 +435,15 @@ class SharedMemoryTimestampExporter {
435435

436436
/**
437437
* @brief Creates (if necessary) and opens the shared memory segment,
438-
* sets its size (to sizeof(atomic<int64_t>[2])), and maps it for writing.
438+
* sets its size (to sizeof(int64_t[2])), and maps it for writing.
439439
* Must be called before UpdateTimestamps or IsInitialized.
440440
* @param mode Permissions (e.g., 0666 or 0660) to use if creating the segment.
441441
* @return True on success, false on any failure (shm_open, ftruncate, mmap).
442442
*/
443443
bool CreateOrOpen(mode_t mode = 0666);
444444

445445
/**
446-
* @brief Atomically updates both timestamps in the mapped shared memory.
446+
* @brief Updates both timestamps in the mapped shared memory.
447447
* @param update_time The timestamp of the current update cycle.
448448
* @param last_active_time The calculated overall last active time.
449449
* @return True if updated successfully, false if not initialized or pointer is invalid.
@@ -470,10 +470,19 @@ class SharedMemoryTimestampExporter {
470470
*/
471471
void Cleanup();
472472

473+
//!
474+
//! \brief This protects against multiple threads in the event_detect process from simultaneously accessing
475+
//! and writing to the shared memory segment. It does NOT protect from another process encountering a torn
476+
//! read. I would prefer to use a pthread_mutex_t in the shmem data structure and manage the mutex across
477+
//! the writer and reader, but the BOINC architect does not believe this is necessary in practice given the small
478+
//! size and low frequency of writing and reading.
479+
//!
480+
std::mutex mtx_shmem;
481+
473482
std::string m_shm_name;
474483
int m_shm_fd;
475-
std::atomic<int64_t>* m_mapped_ptr; // Pointer to the start of the atomic int64_t[2] array
476-
const size_t m_size; // Size of the atomic int64_t[2] array
484+
int64_t* m_mapped_ptr; // Pointer to the start of the int64_t[2] array
485+
const size_t m_size; // Size of the int64_t[2] array
477486
bool m_is_creator; // Did this instance create/resize the segment?
478487
std::atomic<bool> m_is_initialized;
479488
};

read_shmem_timestamps.cpp

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
*/
66

77
#include <string>
8-
#include <atomic>
98
#include <cstdint>
109
#include <cstring> // For strerror
1110
#include <iostream> // Keep for final std::cout output
@@ -22,7 +21,7 @@
2221

2322
// Define constants for the shared memory layout
2423
const char* DEFAULT_SHMEM_NAME = "/idle_detect_shmem";
25-
const size_t SHMEM_SIZE = sizeof(std::atomic<int64_t>[2]); // Size of the array
24+
const size_t SHMEM_SIZE = sizeof(int64_t[2]); // Size of the array
2625

2726
int main(int argc, char* argv[]) {
2827
// --- Argument Parsing ---
@@ -53,7 +52,7 @@ int main(int argc, char* argv[]) {
5352
// --- Read Shared Memory ---
5453
int shm_fd = -1;
5554
void* mapped_mem = MAP_FAILED;
56-
std::atomic<int64_t>* shm_atomic_ptr = nullptr;
55+
int64_t* shm_ptr = nullptr;
5756
int64_t update_time = -1;
5857
int64_t last_active_time = -1;
5958
bool read_success = false;
@@ -77,21 +76,20 @@ int main(int argc, char* argv[]) {
7776
}
7877

7978
// --- Access Data ---
80-
shm_atomic_ptr = static_cast<std::atomic<int64_t>*>(mapped_mem);
81-
// Atomically load both values from the array
82-
update_time = shm_atomic_ptr[0].load(std::memory_order_relaxed);
83-
last_active_time = shm_atomic_ptr[1].load(std::memory_order_relaxed);
79+
shm_ptr = static_cast<int64_t*>(mapped_mem);
80+
update_time = shm_ptr[0];
81+
last_active_time = shm_ptr[1];
8482
read_success = true;
8583

8684
// Unmap memory
8785
errno = 0;
88-
if (munmap(mapped_mem, sizeof(std::atomic<int64_t>)) == -1) {
86+
if (munmap(mapped_mem, SHMEM_SIZE) == -1) {
8987
log("WARN: %s: munmap failed for shm '%s': %s (%d)",
9088
__func__, shm_name, strerror(errno), errno);
9189
// Continue as we already have the value
9290
}
9391
mapped_mem = nullptr;
94-
shm_atomic_ptr = nullptr;
92+
shm_ptr = nullptr;
9593

9694
// --- Unmap ---
9795
errno = 0;

0 commit comments

Comments
 (0)