Skip to content

Commit a1af4aa

Browse files
authored
Improved atomicity; added additional stress tests & validation for file storage. (#343)
* Adding more stressful tests for debugging. * Remove unused onFinish closure. * Updated references to atomic * Some platform specific fixes. * some linux fixes * Modified directory store to validate prior to rename. * Reduced test size * wait for flushes to finish out ... * Combined linux/tvos/watchos skips.
1 parent 3e8b5b0 commit a1af4aa

17 files changed

+241
-74
lines changed

Examples/other_plugins/IDFACollection.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ extension IDFACollection: iOSLifecycle {
7878
func applicationDidBecomeActive(application: UIApplication?) {
7979
let status = ATTrackingManager.trackingAuthorizationStatus
8080

81-
_alreadyAsked.withValue { alreadyAsked in
81+
_alreadyAsked.mutate { alreadyAsked in
8282
if status == .notDetermined && !alreadyAsked {
8383
// we don't know, so should ask the user.
8484
alreadyAsked = true

Sources/Segment/Analytics.swift

+7-3
Original file line numberDiff line numberDiff line change
@@ -423,12 +423,16 @@ extension Analytics {
423423
}
424424

425425
internal static func addActiveWriteKey(_ writeKey: String) {
426-
Self.activeWriteKeys.append(writeKey)
426+
Self._activeWriteKeys.mutate { keys in
427+
keys.append(writeKey)
428+
}
427429
}
428430

429431
internal static func removeActiveWriteKey(_ writeKey: String) {
430-
Self.activeWriteKeys.removeAll { key in
431-
writeKey == key
432+
Self._activeWriteKeys.mutate { keys in
433+
keys.removeAll { key in
434+
writeKey == key
435+
}
432436
}
433437
}
434438
}

Sources/Segment/Plugins/Platforms/Mac/macOSLifecycleEvents.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ class macOSLifecycleEvents: PlatformPlugin, macOSLifecycle {
2727
func application(didFinishLaunchingWithOptions launchOptions: [String : Any]?) {
2828
// Make sure we aren't double calling application:didFinishLaunchingWithOptions
2929
// by resetting the check at the start
30-
didFinishLaunching = true
30+
_didFinishLaunching.set(true)
3131

3232
if analytics?.configuration.values.trackApplicationLifecycleEvents == false {
3333
return

Sources/Segment/Plugins/Platforms/Vendors/AppleUtils.swift

+3-3
Original file line numberDiff line numberDiff line change
@@ -348,17 +348,17 @@ internal class ConnectionMonitor {
348348
SCNetworkReachabilityCreateWithAddress(nil, zeroSockAddress)
349349
}
350350
}) else {
351-
connectionStatus = .unknown
351+
_connectionStatus.set(.unknown)
352352
return
353353
}
354354

355355
var flags : SCNetworkReachabilityFlags = []
356356
if !SCNetworkReachabilityGetFlags(defaultRouteReachability, &flags) {
357-
connectionStatus = .unknown
357+
_connectionStatus.set(.unknown)
358358
return
359359
}
360360

361-
connectionStatus = ConnectionStatus(reachabilityFlags: flags)
361+
_connectionStatus.set(ConnectionStatus(reachabilityFlags: flags))
362362
}
363363
}
364364

Sources/Segment/Plugins/Platforms/iOS/iOSLifecycleEvents.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class iOSLifecycleEvents: PlatformPlugin, iOSLifecycle {
2828

2929
// Make sure we aren't double calling application:didFinishLaunchingWithOptions
3030
// by resetting the check at the start
31-
didFinishLaunching = true
31+
_didFinishLaunching.set(true)
3232

3333
if analytics?.configuration.values.trackApplicationLifecycleEvents == false {
3434
return
@@ -88,7 +88,7 @@ class iOSLifecycleEvents: PlatformPlugin, iOSLifecycle {
8888
}
8989

9090
func applicationDidEnterBackground(application: UIApplication?) {
91-
didFinishLaunching = false
91+
_didFinishLaunching.set(false)
9292
if analytics?.configuration.values.trackApplicationLifecycleEvents == false {
9393
return
9494
}

Sources/Segment/Plugins/SegmentDestination.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
116116
guard let storage = self.storage else { return }
117117
// Send Event to File System
118118
storage.write(.events, value: event)
119-
self._eventCount.withValue { count in
119+
self._eventCount.mutate { count in
120120
count += 1
121121
}
122122
}
@@ -135,7 +135,7 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
135135
// don't flush if analytics is disabled.
136136
guard analytics.enabled == true else { return }
137137

138-
eventCount = 0
138+
_eventCount.set(0)
139139
cleanupUploads()
140140

141141
let type = storage.dataStore.transactionType

Sources/Segment/Plugins/StartupQueue.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class StartupQueue: Plugin, Subscriber {
4747

4848
extension StartupQueue {
4949
internal func runningUpdate(state: System) {
50-
running = state.running
50+
_running.set(state.running)
5151
if state.running {
5252
replayEvents()
5353
}

Sources/Segment/Utilities/Atomic.swift

+66-20
Original file line numberDiff line numberDiff line change
@@ -7,34 +7,80 @@
77

88
import Foundation
99

10-
// NOTE: Revised from previous implementation which used a struct and NSLock's.
11-
// Thread Sanitizer was *correctly* capturing this issue, which was a little obscure
12-
// given the property wrapper PLUS the semantics of a struct. Moving to `class`
13-
// removes the semantics problem and lets TSan approve of what's happening.
14-
//
15-
// Additionally, moving to a lock free version is just desirable, so moved to a queue.
16-
//
17-
// Also see thread here: https://github.com/apple/swift-evolution/pull/1387
10+
/*
11+
Revised the implementation yet again. Tiziano Coriano noticed that this wrapper
12+
can be misleading about it's atomicity. A single set would be atomic, but a compound
13+
operation like += would cause an atomic read, and a separate atomic write, in which
14+
point another thread could've changed the value we're now working off of.
15+
16+
This implementation removes the ability to set wrappedValue, and callers now must use
17+
the set() or mutate() functions explicitly to ensure a proper atomic mutation.
18+
19+
The use of a dispatch queue was also removed in favor of an unfair lock (yes, it's
20+
implemented correctly).
21+
*/
1822

1923
@propertyWrapper
2024
public class Atomic<T> {
21-
private var value: T
22-
private let queue = DispatchQueue(label: "com.segment.atomic.\(UUID().uuidString)")
23-
25+
#if os(Linux)
26+
let swiftLock: NSLock
27+
#else
28+
internal typealias os_unfair_lock_t = UnsafeMutablePointer<os_unfair_lock_s>
29+
internal var unfairLock: os_unfair_lock_t
30+
#endif
31+
32+
internal var value: T
33+
2434
public init(wrappedValue value: T) {
35+
#if os(Linux)
36+
self.swiftLock = NSLock()
37+
#else
38+
self.unfairLock = UnsafeMutablePointer<os_unfair_lock_s>.allocate(capacity: 1)
39+
self.unfairLock.initialize(to: os_unfair_lock())
40+
#endif
2541
self.value = value
2642
}
27-
43+
44+
deinit {
45+
#if !os(Linux)
46+
unfairLock.deallocate()
47+
#endif
48+
}
49+
2850
public var wrappedValue: T {
29-
get { return queue.sync { return value } }
30-
set { queue.sync { value = newValue } }
51+
get {
52+
lock()
53+
defer { unlock() }
54+
return value
55+
}
56+
// set is not allowed, use set() or mutate()
3157
}
58+
59+
public func set(_ newValue: T) {
60+
mutate { $0 = newValue }
61+
}
62+
63+
public func mutate(_ mutation: (inout T) -> Void) {
64+
lock()
65+
defer { unlock() }
66+
mutation(&value)
67+
}
68+
}
3269

33-
@discardableResult
34-
public func withValue(_ operation: (inout T) -> Void) -> T {
35-
queue.sync {
36-
operation(&self.value)
37-
return self.value
38-
}
70+
extension Atomic {
71+
internal func lock() {
72+
#if os(Linux)
73+
swiftLock.lock()
74+
#else
75+
os_unfair_lock_lock(unfairLock)
76+
#endif
77+
}
78+
79+
internal func unlock() {
80+
#if os(Linux)
81+
swiftLock.unlock()
82+
#else
83+
os_unfair_lock_unlock(unfairLock)
84+
#endif
3985
}
4086
}

Sources/Segment/Utilities/Policies/CountBasedFlushPolicy.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@ public class CountBasedFlushPolicy: FlushPolicy {
3737
}
3838

3939
public func updateState(event: RawEvent) {
40-
_count.withValue { value in
40+
_count.mutate { value in
4141
value += 1
4242
}
4343
}
4444

4545
public func reset() {
46-
count = 0
46+
_count.set(0)
4747
}
4848
}

Sources/Segment/Utilities/QueueTimer.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,15 @@ internal class QueueTimer {
5757
if state == .suspended {
5858
return
5959
}
60-
state = .suspended
60+
_state.set(.suspended)
6161
timer.suspend()
6262
}
6363

6464
func resume() {
6565
if state == .resumed {
6666
return
6767
}
68-
state = .resumed
68+
_state.set(.resumed)
6969
timer.resume()
7070
}
7171
}

Sources/Segment/Utilities/Storage/Storage.swift

-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ internal class Storage: Subscriber {
1313
let userDefaults: UserDefaults
1414
static let MAXFILESIZE = 475000 // Server accepts max 500k per batch
1515

16-
internal var onFinish: ((URL) -> Void)? = nil
1716
internal weak var analytics: Analytics? = nil
1817

1918
internal let dataStore: TransientDB

Sources/Segment/Utilities/Storage/Types/DirectoryStore.swift

+7
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import Foundation
99

1010
public class DirectoryStore: DataStore {
11+
internal static var fileValidator: ((URL) -> Void)? = nil
12+
1113
public typealias StoreConfiguration = Configuration
1214

1315
public struct Configuration {
@@ -174,6 +176,11 @@ extension DirectoryStore {
174176
try? writer.writeLine(fileEnding)
175177

176178
let url = writer.url
179+
180+
// do validation before we rename to prevent the file disappearing out from under us.
181+
DirectoryStore.fileValidator?(url)
182+
183+
// move it to make availble for flushing ...
177184
let newURL = url.appendingPathExtension(Self.tempExtension)
178185
try? FileManager.default.moveItem(at: url, to: newURL)
179186
self.writer = nil

Sources/Segment/Utilities/UserAgent.swift

+7-2
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,18 @@ internal struct UserAgent {
3535
private static let defaultWebKitAppName = ""
3636
#endif
3737

38-
internal static var _value: String = ""
38+
@Atomic internal static var _value: String = ""
39+
internal static let lock = NSLock()
3940

4041
public static var value: String {
42+
lock.lock()
43+
defer { lock.unlock() }
44+
4145
if _value.isEmpty {
42-
_value = value(applicationName: defaultWebKitAppName)
46+
__value.set(value(applicationName: defaultWebKitAppName))
4347
}
4448
return _value
49+
//return "someUserAgent"
4550
}
4651

4752
private static func version() -> String {

Tests/Segment-Tests/Atomic_Tests.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ final class Atomic_Tests: XCTestCase {
1313
// `queue.sync { counter = oldValue + 1 }`
1414
// And the threads are free to suspend in between the two calls to `queue.sync`.
1515

16-
_counter.withValue { value in
16+
_counter.mutate { value in
1717
value += 1
1818
}
1919
}

Tests/Segment-Tests/FlushPolicy_Tests.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ class FlushPolicyTests: XCTestCase {
142142
RunLoop.main.run(until: Date.distantPast)
143143
if analytics.pendingUploads!.count > 0 {
144144
// flush was triggered
145-
flushSent = true
145+
_flushSent.set(true)
146146
}
147147
}
148148

Tests/Segment-Tests/Storage_Tests.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ class StorageTests: XCTestCase {
290290
@Atomic var done = false
291291
analytics.flush {
292292
print("flush completed")
293-
done = true
293+
_done.set(true)
294294
}
295295

296296
while !done {

0 commit comments

Comments
 (0)