Skip to content

Commit 44a095c

Browse files
committed
Use an AtomicInt32 to count pendingUnitCount instead of using AsyncQueue
Adding an item to `AsyncQueue<Serial>` is linear in the number of pending queue items, thus adding n items to an `AsyncQueue` before any can execute is in O(n^2). This decision was made intentionally because the primary use case for `AsyncQueue` was to track pending LSP requests, of which we don’t expect to have too many pending requests at any given time. `SourceKitIndexDelegate` was also using `AsyncQueue` to track the number of pending units to be processed and eg. after indexing SourceKit-LSP, I have seen this grow up to ~20,000. With the quadratic behavior, this explodes time-wise. Turns out that we don’t actually need to use a queue here at all, an atomic is sufficient and much faster. Independently, we should consider mitigating the quadratic behavior of `AsyncQueue<Serial>` or `AsyncQueue` in general. Fixes #1541 rdar://130844901
1 parent 8b3275e commit 44a095c

File tree

3 files changed

+68
-29
lines changed

3 files changed

+68
-29
lines changed

Sources/CAtomics/include/CAtomics.h

+26
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,30 @@ static inline void atomic_uint32_destroy(CAtomicUInt32 *_Nonnull atomic) {
4444
free(atomic);
4545
}
4646

47+
typedef struct {
48+
_Atomic(int32_t) value;
49+
} CAtomicInt32;
50+
51+
static inline CAtomicInt32 *_Nonnull atomic_int32_create(int32_t initialValue) {
52+
CAtomicInt32 *atomic = malloc(sizeof(CAtomicInt32));
53+
atomic->value = initialValue;
54+
return atomic;
55+
}
56+
57+
static inline int32_t atomic_int32_get(CAtomicInt32 *_Nonnull atomic) {
58+
return atomic->value;
59+
}
60+
61+
static inline void atomic_int32_set(CAtomicInt32 *_Nonnull atomic, int32_t newValue) {
62+
atomic->value = newValue;
63+
}
64+
65+
static inline int32_t atomic_int32_fetch_and_increment(CAtomicInt32 *_Nonnull atomic) {
66+
return atomic->value++;
67+
}
68+
69+
static inline void atomic_int32_destroy(CAtomicInt32 *_Nonnull atomic) {
70+
free(atomic);
71+
}
72+
4773
#endif // SOURCEKITLSP_CATOMICS_H

Sources/SKSupport/Atomics.swift

+25
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,28 @@ public final class AtomicUInt32: Sendable {
8282
return atomic_uint32_fetch_and_increment(atomic)
8383
}
8484
}
85+
86+
public final class AtomicInt32: Sendable {
87+
private nonisolated(unsafe) let atomic: UnsafeMutablePointer<CAtomicInt32>
88+
89+
public init(initialValue: Int32) {
90+
self.atomic = atomic_int32_create(initialValue)
91+
}
92+
93+
public var value: Int32 {
94+
get {
95+
atomic_int32_get(atomic)
96+
}
97+
set {
98+
atomic_int32_set(atomic, newValue)
99+
}
100+
}
101+
102+
deinit {
103+
atomic_int32_destroy(atomic)
104+
}
105+
106+
public func fetchAndIncrement() -> Int32 {
107+
return atomic_int32_fetch_and_increment(atomic)
108+
}
109+
}

Sources/SourceKitLSP/SourceKitIndexDelegate.swift

+17-29
Original file line numberDiff line numberDiff line change
@@ -19,60 +19,48 @@ import SwiftExtensions
1919

2020
/// `IndexDelegate` for the SourceKit workspace.
2121
actor SourceKitIndexDelegate: IndexDelegate {
22-
23-
let queue = AsyncQueue<Serial>()
24-
2522
/// Registered `MainFilesDelegate`s to notify when main files change.
2623
var mainFilesChangedCallbacks: [@Sendable () async -> Void] = []
2724

2825
/// The count of pending unit events. Whenever this transitions to 0, it represents a time where
2926
/// the index finished processing known events. Of course, that may have already changed by the
3027
/// time we are notified.
31-
var pendingUnitCount: Int = 0
28+
let pendingUnitCount = AtomicInt32(initialValue: 0)
3229

3330
public init() {}
3431

3532
nonisolated public func processingAddedPending(_ count: Int) {
36-
queue.async {
37-
await self.addPending(count)
38-
}
39-
}
40-
41-
private func addPending(_ count: Int) {
42-
pendingUnitCount += count
33+
pendingUnitCount.value += Int32(count)
4334
}
4435

4536
nonisolated public func processingCompleted(_ count: Int) {
46-
queue.async {
47-
await self.processCompleted(count)
48-
}
49-
}
50-
51-
private func processCompleted(_ count: Int) {
52-
pendingUnitCount -= count
53-
if pendingUnitCount == 0 {
54-
indexChanged()
37+
pendingUnitCount.value -= Int32(count)
38+
if pendingUnitCount.value == 0 {
39+
Task {
40+
await indexChanged()
41+
}
5542
}
5643

57-
if pendingUnitCount < 0 {
58-
assertionFailure("pendingUnitCount = \(pendingUnitCount) < 0")
59-
pendingUnitCount = 0
60-
indexChanged()
44+
if pendingUnitCount.value < 0 {
45+
// Technically this is not data race safe because `pendingUnitCount` might change between the check and us setting
46+
// it to 0. But then, this should never happen anyway, so it's fine.
47+
logger.fault("pendingUnitCount dropped below zero: \(self.pendingUnitCount.value)")
48+
pendingUnitCount.value = 0
49+
Task {
50+
await indexChanged()
51+
}
6152
}
6253
}
6354

64-
private func indexChanged() {
55+
private func indexChanged() async {
6556
logger.debug("IndexStoreDB changed")
6657
for callback in mainFilesChangedCallbacks {
67-
queue.async {
68-
await callback()
69-
}
58+
await callback()
7059
}
7160
}
7261

7362
/// Register a delegate to receive notifications when main files change.
7463
public func addMainFileChangedCallback(_ callback: @escaping @Sendable () async -> Void) {
7564
mainFilesChangedCallbacks.append(callback)
7665
}
77-
7866
}

0 commit comments

Comments
 (0)