From e75f0b90e78b2ca7a01a3d109b1ed73591fa7ad7 Mon Sep 17 00:00:00 2001 From: Brandon Sneed Date: Thu, 9 May 2024 10:50:09 -0700 Subject: [PATCH] Updated references to atomic --- Sources/Segment/Analytics.swift | 10 ++- .../Platforms/Vendors/AppleUtils.swift | 6 +- .../Platforms/iOS/iOSLifecycleEvents.swift | 4 +- .../Segment/Plugins/SegmentDestination.swift | 4 +- Sources/Segment/Plugins/StartupQueue.swift | 2 +- Sources/Segment/Utilities/Atomic.swift | 68 +++++++++++++------ .../Policies/CountBasedFlushPolicy.swift | 4 +- Sources/Segment/Utilities/QueueTimer.swift | 4 +- Sources/Segment/Utilities/UserAgent.swift | 9 ++- Tests/Segment-Tests/Atomic_Tests.swift | 2 +- Tests/Segment-Tests/FlushPolicy_Tests.swift | 2 +- Tests/Segment-Tests/Storage_Tests.swift | 2 +- Tests/Segment-Tests/StressTests.swift | 68 ++++++------------- 13 files changed, 98 insertions(+), 87 deletions(-) diff --git a/Sources/Segment/Analytics.swift b/Sources/Segment/Analytics.swift index 85dd400c..031ae6a4 100644 --- a/Sources/Segment/Analytics.swift +++ b/Sources/Segment/Analytics.swift @@ -423,12 +423,16 @@ extension Analytics { } internal static func addActiveWriteKey(_ writeKey: String) { - Self.activeWriteKeys.append(writeKey) + Self._activeWriteKeys.mutate { keys in + keys.append(writeKey) + } } internal static func removeActiveWriteKey(_ writeKey: String) { - Self.activeWriteKeys.removeAll { key in - writeKey == key + Self._activeWriteKeys.mutate { keys in + keys.removeAll { key in + writeKey == key + } } } } diff --git a/Sources/Segment/Plugins/Platforms/Vendors/AppleUtils.swift b/Sources/Segment/Plugins/Platforms/Vendors/AppleUtils.swift index 901b5ace..0c5106a2 100644 --- a/Sources/Segment/Plugins/Platforms/Vendors/AppleUtils.swift +++ b/Sources/Segment/Plugins/Platforms/Vendors/AppleUtils.swift @@ -348,17 +348,17 @@ internal class ConnectionMonitor { SCNetworkReachabilityCreateWithAddress(nil, zeroSockAddress) } }) else { - connectionStatus = .unknown + _connectionStatus.set(.unknown) return } var flags : SCNetworkReachabilityFlags = [] if !SCNetworkReachabilityGetFlags(defaultRouteReachability, &flags) { - connectionStatus = .unknown + _connectionStatus.set(.unknown) return } - connectionStatus = ConnectionStatus(reachabilityFlags: flags) + _connectionStatus.set(ConnectionStatus(reachabilityFlags: flags)) } } diff --git a/Sources/Segment/Plugins/Platforms/iOS/iOSLifecycleEvents.swift b/Sources/Segment/Plugins/Platforms/iOS/iOSLifecycleEvents.swift index edfb91d7..dcc340e5 100644 --- a/Sources/Segment/Plugins/Platforms/iOS/iOSLifecycleEvents.swift +++ b/Sources/Segment/Plugins/Platforms/iOS/iOSLifecycleEvents.swift @@ -28,7 +28,7 @@ class iOSLifecycleEvents: PlatformPlugin, iOSLifecycle { // Make sure we aren't double calling application:didFinishLaunchingWithOptions // by resetting the check at the start - didFinishLaunching = true + _didFinishLaunching.set(true) if analytics?.configuration.values.trackApplicationLifecycleEvents == false { return @@ -88,7 +88,7 @@ class iOSLifecycleEvents: PlatformPlugin, iOSLifecycle { } func applicationDidEnterBackground(application: UIApplication?) { - didFinishLaunching = false + _didFinishLaunching.set(false) if analytics?.configuration.values.trackApplicationLifecycleEvents == false { return } diff --git a/Sources/Segment/Plugins/SegmentDestination.swift b/Sources/Segment/Plugins/SegmentDestination.swift index 1ab2260f..8a741dcf 100644 --- a/Sources/Segment/Plugins/SegmentDestination.swift +++ b/Sources/Segment/Plugins/SegmentDestination.swift @@ -116,7 +116,7 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion guard let storage = self.storage else { return } // Send Event to File System storage.write(.events, value: event) - self._eventCount.withValue { count in + self._eventCount.mutate { count in count += 1 } } @@ -135,7 +135,7 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion // don't flush if analytics is disabled. guard analytics.enabled == true else { return } - eventCount = 0 + _eventCount.set(0) cleanupUploads() let type = storage.dataStore.transactionType diff --git a/Sources/Segment/Plugins/StartupQueue.swift b/Sources/Segment/Plugins/StartupQueue.swift index 8f316f8e..6e7a3479 100644 --- a/Sources/Segment/Plugins/StartupQueue.swift +++ b/Sources/Segment/Plugins/StartupQueue.swift @@ -47,7 +47,7 @@ public class StartupQueue: Plugin, Subscriber { extension StartupQueue { internal func runningUpdate(state: System) { - running = state.running + _running.set(state.running) if state.running { replayEvents() } diff --git a/Sources/Segment/Utilities/Atomic.swift b/Sources/Segment/Utilities/Atomic.swift index 1dc6c07e..922c4819 100644 --- a/Sources/Segment/Utilities/Atomic.swift +++ b/Sources/Segment/Utilities/Atomic.swift @@ -7,34 +7,62 @@ import Foundation -// NOTE: Revised from previous implementation which used a struct and NSLock's. -// Thread Sanitizer was *correctly* capturing this issue, which was a little obscure -// given the property wrapper PLUS the semantics of a struct. Moving to `class` -// removes the semantics problem and lets TSan approve of what's happening. -// -// Additionally, moving to a lock free version is just desirable, so moved to a queue. -// -// Also see thread here: https://github.com/apple/swift-evolution/pull/1387 +/* + Revised the implementation yet again. Tiziano Coriano noticed that this wrapper + can be misleading about it's atomicity. A single set would be atomic, but a compound + operation like += would cause an atomic read, and a separate atomic write, in which + point another thread could've changed the value we're now working off of. + + This implementation removes the ability to set wrappedValue, and callers now must use + the set() or mutate() functions explicitly to ensure a proper atomic mutation. + + The use of a dispatch queue was also removed in favor of an unfair lock (yes, it's + implemented correctly). + */ @propertyWrapper public class Atomic { - private var value: T - private let queue = DispatchQueue(label: "com.segment.atomic.\(UUID().uuidString)") - + internal typealias os_unfair_lock_t = UnsafeMutablePointer + internal var unfairLock: os_unfair_lock_t + + internal var value: T + public init(wrappedValue value: T) { + self.unfairLock = UnsafeMutablePointer.allocate(capacity: 1) + self.unfairLock.initialize(to: os_unfair_lock()) self.value = value } - + + deinit { + unfairLock.deallocate() + } + public var wrappedValue: T { - get { return queue.sync { return value } } - set { queue.sync { value = newValue } } + get { + lock() + defer { unlock() } + return value + } + // set is not allowed, use set() or mutate() } + + public func set(_ newValue: T) { + mutate { $0 = newValue } + } + + public func mutate(_ mutation: (inout T) -> Void) { + lock() + defer { unlock() } + mutation(&value) + } +} - @discardableResult - public func withValue(_ operation: (inout T) -> Void) -> T { - queue.sync { - operation(&self.value) - return self.value - } +extension Atomic { + internal func lock() { + os_unfair_lock_lock(unfairLock) + } + + internal func unlock() { + os_unfair_lock_unlock(unfairLock) } } diff --git a/Sources/Segment/Utilities/Policies/CountBasedFlushPolicy.swift b/Sources/Segment/Utilities/Policies/CountBasedFlushPolicy.swift index 0a07edf1..d5be3d0d 100644 --- a/Sources/Segment/Utilities/Policies/CountBasedFlushPolicy.swift +++ b/Sources/Segment/Utilities/Policies/CountBasedFlushPolicy.swift @@ -37,12 +37,12 @@ public class CountBasedFlushPolicy: FlushPolicy { } public func updateState(event: RawEvent) { - _count.withValue { value in + _count.mutate { value in value += 1 } } public func reset() { - count = 0 + _count.set(0) } } diff --git a/Sources/Segment/Utilities/QueueTimer.swift b/Sources/Segment/Utilities/QueueTimer.swift index da6d97fd..4f01a700 100644 --- a/Sources/Segment/Utilities/QueueTimer.swift +++ b/Sources/Segment/Utilities/QueueTimer.swift @@ -57,7 +57,7 @@ internal class QueueTimer { if state == .suspended { return } - state = .suspended + _state.set(.suspended) timer.suspend() } @@ -65,7 +65,7 @@ internal class QueueTimer { if state == .resumed { return } - state = .resumed + _state.set(.resumed) timer.resume() } } diff --git a/Sources/Segment/Utilities/UserAgent.swift b/Sources/Segment/Utilities/UserAgent.swift index 60ab8a5e..6530f2b9 100644 --- a/Sources/Segment/Utilities/UserAgent.swift +++ b/Sources/Segment/Utilities/UserAgent.swift @@ -35,13 +35,18 @@ internal struct UserAgent { private static let defaultWebKitAppName = "" #endif - internal static var _value: String = "" + @Atomic internal static var _value: String = "" + internal static let lock = NSLock() public static var value: String { + lock.lock() + defer { lock.unlock() } + if _value.isEmpty { - _value = value(applicationName: defaultWebKitAppName) + __value.set(value(applicationName: defaultWebKitAppName)) } return _value + //return "someUserAgent" } private static func version() -> String { diff --git a/Tests/Segment-Tests/Atomic_Tests.swift b/Tests/Segment-Tests/Atomic_Tests.swift index 6e1d1216..d6b420b0 100644 --- a/Tests/Segment-Tests/Atomic_Tests.swift +++ b/Tests/Segment-Tests/Atomic_Tests.swift @@ -13,7 +13,7 @@ final class Atomic_Tests: XCTestCase { // `queue.sync { counter = oldValue + 1 }` // And the threads are free to suspend in between the two calls to `queue.sync`. - _counter.withValue { value in + _counter.mutate { value in value += 1 } } diff --git a/Tests/Segment-Tests/FlushPolicy_Tests.swift b/Tests/Segment-Tests/FlushPolicy_Tests.swift index 0de096c0..636a5792 100644 --- a/Tests/Segment-Tests/FlushPolicy_Tests.swift +++ b/Tests/Segment-Tests/FlushPolicy_Tests.swift @@ -142,7 +142,7 @@ class FlushPolicyTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) if analytics.pendingUploads!.count > 0 { // flush was triggered - flushSent = true + _flushSent.set(true) } } diff --git a/Tests/Segment-Tests/Storage_Tests.swift b/Tests/Segment-Tests/Storage_Tests.swift index d4c89894..4d6cb7e7 100644 --- a/Tests/Segment-Tests/Storage_Tests.swift +++ b/Tests/Segment-Tests/Storage_Tests.swift @@ -290,7 +290,7 @@ class StorageTests: XCTestCase { @Atomic var done = false analytics.flush { print("flush completed") - done = true + _done.set(true) } while !done { diff --git a/Tests/Segment-Tests/StressTests.swift b/Tests/Segment-Tests/StressTests.swift index 22331f4a..f4553621 100644 --- a/Tests/Segment-Tests/StressTests.swift +++ b/Tests/Segment-Tests/StressTests.swift @@ -24,19 +24,11 @@ class StressTests: XCTestCase { // register our network blocker guard URLProtocol.registerClass(BlockNetworkCalls.self) else { XCTFail(); return } - let analytics = Analytics(configuration: Configuration(writeKey: "stressTest").errorHandler({ error in + let analytics = Analytics(configuration: Configuration(writeKey: "stressTest2").errorHandler({ error in XCTFail("Storage Error: \(error)") })) + analytics.purgeStorage() analytics.storage.hardReset(doYouKnowHowToUseThis: true) - analytics.storage.onFinish = { url in - // check that each one is valid json - do { - let json = try Data(contentsOf: url) - _ = try JSONSerialization.jsonObject(with: json) - } catch { - XCTFail("\(error) in \(url)") - } - } DirectoryStore.fileValidator = { url in do { @@ -83,22 +75,22 @@ class StressTests: XCTestCase { // schedule a bunch of events to go out for i in 0..<1_000_000 { - let randomInt = Int.random(in: 0..<30) - let queue = queues[randomInt] - group.enter() - queue.async { - writeBlock(randomInt) - group.leave() - } + let randomInt = Int.random(in: 0..<30) + let queue = queues[randomInt] + group.enter() + queue.async { + writeBlock(randomInt) + group.leave() + } } group.notify(queue: DispatchQueue.main) { - ready = false + _ready.set(false) print("\(eventsWritten) events written, across 30 queues.") print("all queues finished.") } - ready = true + _ready.set(true) group.leave() @@ -117,15 +109,6 @@ class StressTests: XCTestCase { XCTFail("Storage Error: \(error)") })) analytics.storage.hardReset(doYouKnowHowToUseThis: true) - analytics.storage.onFinish = { url in - // check that each one is valid json - do { - let json = try Data(contentsOf: url) - _ = try JSONSerialization.jsonObject(with: json) - } catch { - XCTFail("\(error) in \(url)") - } - } DirectoryStore.fileValidator = { url in do { @@ -175,7 +158,7 @@ class StressTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) } print("queue 1 wrote \(eventsWritten) events.") - queue1Done = true + _queue1Done.set(true) } writeQueue2.async { @@ -189,7 +172,7 @@ class StressTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) } print("queue 2 wrote \(eventsWritten) events.") - queue2Done = true + _queue2Done.set(true) } writeQueue3.async { @@ -203,7 +186,7 @@ class StressTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) } print("queue 3 wrote \(eventsWritten) events.") - queue3Done = true + _queue3Done.set(true) } writeQueue4.async { @@ -217,7 +200,7 @@ class StressTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) } print("queue 4 wrote \(eventsWritten) events.") - queue4Done = true + _queue4Done.set(true) } flushQueue.async { @@ -233,10 +216,10 @@ class StressTests: XCTestCase { counter += 1 } print("flushed \(counter) times.") - ready = false + _ready.set(false) } - ready = true + _ready.set(true) while (ready) { RunLoop.main.run(until: Date.distantPast) @@ -257,15 +240,6 @@ class StressTests: XCTestCase { XCTFail("Storage Error: \(error)") })) analytics.storage.hardReset(doYouKnowHowToUseThis: true) - analytics.storage.onFinish = { url in - // check that each one is valid json - do { - let json = try Data(contentsOf: url) - _ = try JSONSerialization.jsonObject(with: json) - } catch { - XCTFail("\(error) in \(url)") - } - } waitUntilStarted(analytics: analytics) @@ -302,7 +276,7 @@ class StressTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) } print("queue 1 wrote \(eventsWritten) events.") - queue1Done = true + _queue1Done.set(true) } writeQueue2.async { @@ -316,7 +290,7 @@ class StressTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) } print("queue 2 wrote \(eventsWritten) events.") - queue2Done = true + _queue2Done.set(true) } flushQueue.async { @@ -332,10 +306,10 @@ class StressTests: XCTestCase { counter += 1 } print("flushed \(counter) times.") - ready = false + _ready.set(false) } - ready = true + _ready.set(true) while (ready) { RunLoop.main.run(until: Date.distantPast)