【问题标题】:How create thread safe variable in Swift 3?如何在 Swift 3 中创建线程安全变量?
【发布时间】:2017-03-15 11:58:24
【问题描述】:

我有单身课

class DeviceController:NSObject, CocoaMQTTDelegate {
     static let sharedInstance = DeviceController()
     var deviceOnArray:[String] = []
     var deviceOffArray:[String] = []
     private override init() {

        clientID = "xyz-" + String(ProcessInfo().processIdentifier)
        mqtt = CocoaMQTT(clientID: clientID, host: "device_controller.xyz.net", port: 1883)
        mqtt.username = "username"
        mqtt.password = "password"
        mqtt.willMessage = CocoaMQTTWill(topic: "/will", message: "dieout")
        mqtt.keepAlive = 30
        mqtt.cleanSession = true
        DeviceController.isConnecting = true
        super.init()
        mqtt.delegate = self
        mqtt.connect()
        self.registerBackgroundTask()
    }
   func sendArm(topic:String){
      // add device to deviceOnArray
   }
   func sendDisarm(topic:String){
      // remove device from deviceOnArray if exist here if I check by code that device is in array it returns false but on console if print it contains the device, It only heppens when I call sendArm and sendDisarm with a second.
     let lockQueue = DispatchQueue(label: "com.test.LockQueue")
        lockQueue.sync() {
           // all code now inside this
         }
     // I also used above code but it's not working
   }


}

如果您阅读了代码,那么您就会知道我在从 deviceOnArray/deviceOffArray 读取正确值时遇到了问题,我不知道如何解释这个问题,但我认为我需要的是 Obj-C 原子线程安全多变的。知道如何创建一个吗?

【问题讨论】:

  • 我会使用串行调度队列
  • 你能告诉我怎么做吗?

标签: ios swift multithreading thread-safety atomic


【解决方案1】:

详情

  • Swift 5.1,Xcode 11.3.1

解决方案

import Foundation
class AtomicValue<T> {

    private var _value: T
    private var accessQueue: DispatchQueue!

    init (value: T) {
        _value = value
        let address = Unmanaged.passUnretained(self).toOpaque()
        let label = "accessQueue.\(type(of: self)).\(address)"
        accessQueue = DispatchQueue(label: label)
    }
}

// MARK: Get/Set with synchronization in current queue

extension AtomicValue {
    func waitSet(lockValueAccessWhile closure: ((_ currentValue: T) -> T)?) {
        accessQueue.sync { [weak self] in
            guard let self = self, let value = closure?(self._value) else { return }
            self._value = value
        }
    }

    func waitGet(lockValueAccessWhile closure: ((_ currentValue: T) -> Void)?) {
        accessQueue.sync { [weak self] in
            guard let self = self else { return }
            closure?(self._value)
        }
    }

    func waitUpdate(lockValueAccessWhile closure: ((_ currentValue: inout T) -> Void)?) {
        accessQueue.sync { [weak self] in
            guard let self = self else { return }
            closure?(&self._value)
        }
    }

    func waitMap<V>(lockValueAccessWhile closure: ((_ currentValue: T) -> V)?) -> NotQueueSafeValue<V> {
        var value: V!
        waitGet { value = closure?($0) }
        return NotQueueSafeValue(notQueueSafeValue: value)
    }

    // Be careful with func waitGet() -> NotQueueSafeValue<T>. It is ONLY for WRITE (SAVE).
    // BAD CODE: atomicValue.waitGet().notQueueSafeValue.doSometing().
    //      it is possible that atomicValue._value could be changed while func doSometing() is performing
    // GOOD CODE: atomicValue.waitGet { $0.doSometing() }.
    //      atomicValue._value will never changed while func doSometing() is performing

    struct NotQueueSafeValue<T> { let notQueueSafeValue: T }

    func waitGet() -> NotQueueSafeValue<T> {
        var value: T!
        waitGet { value = $0 }
        return NotQueueSafeValue(notQueueSafeValue: value)
    }

    func waitSet(value: T) { waitSet { _ in return value } }
}

// MARK: Get/Set in current queue

extension AtomicValue {
    func set(waitAccessIn queue: DispatchQueue, closure: ((_ currentValue: T) -> T)?) {
        queue.async { [weak self] in self?.waitSet(lockValueAccessWhile: closure) }
    }

    func get(waitAccessIn queue: DispatchQueue, closure: ((_ currentValue: T) -> Void)?) {
        queue.async { [weak self] in self?.waitGet(lockValueAccessWhile: closure) }
    }

    func update(waitAccessIn queue: DispatchQueue, closure: ((_ currentValue: inout T) -> Void)?) {
        queue.async { [weak self] in self?.waitUpdate(lockValueAccessWhile: closure) }
    }
}

用法

// Usage
let atomicValue = AtomicValue(value: 0)

atomicValue.waitSet { currentValue -> Int in
    // Multiple sequential (sync) actions
    return currentValue + 1
}

atomicValue.waitGet { currentValue in
    // Multiple sequential (sync) actions
    print("\(currentValue)")
}

atomicValue.waitUpdate { currentValue in
    // Multiple sequential (sync) actions
    currentValue += 1
}

atomicValue.set(waitAccessIn: .global(qos: .default)) { currentValue -> Int in
    // Multiple sequential (sync) actions
    return currentValue + 1
}

atomicValue.get(waitAccessIn: .global(qos: .default)) { currentValue in
    // Multiple sequential (sync) actions
    print("\(currentValue)")
}

atomicValue.update(waitAccessIn: .global(qos: .background)) { currentValue in
    // Multiple sequential (sync) actions
    currentValue += 1
}

print(atomicValue.waitGet().notQueueSafeValue)
atomicValue.waitSet(value: 2)

print("\(atomicValue.waitMap { "value: \($0)" }.notQueueSafeValue )")

示例

不要忘记在此处粘贴解决方案代码。

class Test {
    private let atomicValue = AtomicValue(value: 0)
    private(set) var count = 0
    private var usecDelay: UInt32 = 1000
    private let range = (0..<100)

    private func _set(currentValue: Int, dispatchGroup: DispatchGroup? = nil) -> Int {
        let newValue = currentValue + 1
        //print("\(queue.label) queue: \(newValue)")
        count += 1
        usleep(usecDelay)
        dispatchGroup?.leave()
        return newValue
    }

    private func _get(value: Int, queueLabel: String, dispatchGroup: DispatchGroup? = nil) {
        print("  get \(queueLabel) queue: \(value)")
        usleep(usecDelay)
        dispatchGroup?.leave()
    }

    func test1(queue: DispatchQueue, completion: (() -> Void)?) {
        queue.async { [weak self] in
            guard let self = self else { return }
            self.range.forEach { _ in
                self.atomicValue.waitSet { [weak self] in
                    return self?._set(currentValue: $0) ?? $0
                }
                self.atomicValue.waitGet { [weak self] in
                    self?._get(value: $0, queueLabel: queue.label)
                }
            }
            print("test1.work at \(queue.label) queue completed")
            completion?()
        }
    }

    func test2(queue: DispatchQueue, completion: (() -> Void)?) {
        let dispatchGroup = DispatchGroup()
        range.forEach { _ in
            dispatchGroup.enter()
            dispatchGroup.enter()
        }
        queue.async { [weak self] in
            guard let self = self else { return }
            self.range.forEach { _ in
                self.atomicValue.waitUpdate { [weak self] in
                    guard let value = self?._set(currentValue: $0, dispatchGroup: dispatchGroup) else { return }
                    $0 = value
                }
                self.atomicValue.waitGet { [weak self] in
                    self?._get(value: $0, queueLabel: queue.label, dispatchGroup: dispatchGroup)
                }
            }
        }
        dispatchGroup.notify(queue: queue) {
            print("test2.work at \(queue.label) queue completed")
            completion?()
        }
    }


    func test3(queue: DispatchQueue, waitIn waitQueue: DispatchQueue, completion: (() -> Void)?) {
        let dispatchGroup = DispatchGroup()
        range.forEach { _ in
            dispatchGroup.enter()
            dispatchGroup.enter()
            dispatchGroup.enter()
        }
        queue.async { [weak self] in
            guard let self = self else { return }
            self.range.forEach { _ in
                self.atomicValue.set(waitAccessIn: waitQueue) { [weak self] in
                    return self?._set(currentValue: $0, dispatchGroup: dispatchGroup) ?? $0
                }
                self.atomicValue.get(waitAccessIn: waitQueue)  { [weak self] in
                    self?._get(value: $0, queueLabel: queue.label, dispatchGroup: dispatchGroup)
                }
                self.atomicValue.update(waitAccessIn: waitQueue) { [weak self] in
                    guard let value = self?._set(currentValue: $0, dispatchGroup: dispatchGroup) else { return }
                    $0 = value
                }
            }
        }
        dispatchGroup.notify(queue: queue) {
            print("test3.work at \(queue.label) queue completed")
            completion?()
        }
    }

    func run() {
        let dspatchGroup = DispatchGroup()
        dspatchGroup.enter()
        test1(queue: .global(qos: .utility)) { dspatchGroup.leave() }                                           // result: count += range.max()
        dspatchGroup.enter()
        test1(queue: .global(qos: .unspecified))  { dspatchGroup.leave() }                                      // result: count += range.max()

        dspatchGroup.enter()
        test2(queue: .main) { dspatchGroup.leave() }                                                            // result: count += range.max()
        dspatchGroup.enter()
        test2(queue: .global(qos: .unspecified)) { dspatchGroup.leave() }                                       // result: count += range.max()

        dspatchGroup.enter()
        test3(queue: .global(qos: .default), waitIn: .global(qos: .background)) { dspatchGroup.leave() }        // result: count += 2*range.max()
        dspatchGroup.enter()
        test3(queue: .main, waitIn: .global(qos: .userInteractive)) { dspatchGroup.leave() }                    // result: count += 2*range.max()

        dspatchGroup.notify(queue: .main) { print("End. Count: \(self.count)") }
    }
}

Test().run()

结果(日志)

  get com.apple.root.default-qos queue: 3
  get com.apple.root.default-qos queue: 4
  get com.apple.root.default-qos queue: 4
  get com.apple.root.utility-qos queue: 5
  get com.apple.root.default-qos queue: 6
  get com.apple.root.default-qos queue: 7
  get com.apple.root.default-qos queue: 8
  get com.apple.root.default-qos queue: 9
  get com.apple.root.default-qos queue: 10
  get com.apple.root.default-qos queue: 11
  get com.apple.root.default-qos queue: 12
  get com.apple.root.default-qos queue: 13
  get com.apple.root.default-qos queue: 14
test3.work at com.apple.root.default-qos queue completed
  get com.apple.main-thread queue: 15
  get com.apple.root.default-qos queue: 17
  get com.apple.root.default-qos queue: 18
  get com.apple.root.utility-qos queue: 18
  get com.apple.main-thread queue: 19
  get com.apple.root.default-qos queue: 21
  get com.apple.root.default-qos queue: 22
  get com.apple.root.utility-qos queue: 22
  get com.apple.main-thread queue: 23
  get com.apple.root.default-qos queue: 25
  get com.apple.root.default-qos queue: 26
  get com.apple.root.utility-qos queue: 26
  get com.apple.main-thread queue: 27
  get com.apple.root.default-qos queue: 29
  get com.apple.root.default-qos queue: 30
  get com.apple.root.utility-qos queue: 30
  get com.apple.main-thread queue: 31
  get com.apple.root.default-qos queue: 33
  get com.apple.root.default-qos queue: 34
  get com.apple.root.utility-qos queue: 34
  get com.apple.main-thread queue: 35
  get com.apple.root.default-qos queue: 37
  get com.apple.root.default-qos queue: 38
  get com.apple.root.utility-qos queue: 38
  get com.apple.main-thread queue: 39
  get com.apple.root.default-qos queue: 41
  get com.apple.root.default-qos queue: 42
  get com.apple.root.utility-qos queue: 42
  get com.apple.main-thread queue: 43
  get com.apple.root.default-qos queue: 45
  get com.apple.root.default-qos queue: 46
  get com.apple.root.utility-qos queue: 46
  get com.apple.main-thread queue: 47
  get com.apple.root.default-qos queue: 49
test1.work at com.apple.root.default-qos queue completed
  get com.apple.root.default-qos queue: 50
test2.work at com.apple.root.default-qos queue completed
  get com.apple.root.utility-qos queue: 50
  get com.apple.main-thread queue: 50
test1.work at com.apple.root.utility-qos queue completed
  get com.apple.main-thread queue: 51
test2.work at com.apple.main-thread queue completed
  get com.apple.main-thread queue: 52
  get com.apple.main-thread queue: 53
  get com.apple.main-thread queue: 54
  get com.apple.main-thread queue: 55
  get com.apple.main-thread queue: 56
  get com.apple.main-thread queue: 57
  get com.apple.main-thread queue: 58
  get com.apple.main-thread queue: 59
  get com.apple.main-thread queue: 60
test3.work at com.apple.main-thread queue completed
End. Count: 60

【讨论】:

  • 这太完美了!我将它用作结构(将mutating 添加到func set() - 就像一个魅力。谢谢。
  • atomicValue.value += 1(先读后写操作)发生并且另一个线程尝试设置atomicValue.value时会出现问题吗?如果是这样,我认为我们应该删除valueset 并改用set(closure:)
  • @Cyber​​Mew 是的,你说得对。我稍后会更新这个类。我有几个新想法。
  • 如果在主线程上调用这个原子值会导致死锁。所以你必须在其他队列上使用它,比如全局。我建议在 AtomicValue 类中使用私有串行队列,并通过队列串行处理 setter 和 getter。 @VasilyBodnarchuk
  • @Mehrdad 是的,这可以捕获死锁。更多信息:stackoverflow.com/a/54104259
【解决方案2】:

您可以使用串行调度队列来确保数组仅以线程安全的方式更新。

最好将您的deviceOnArray 属性更改为私有,以确保它不能被其他对象访问。如果您需要将此数组公开给其他对象,请通过计算属性执行此操作。例如

class DeviceController:NSObject, CocoaMQTTDelegate {
     static let sharedInstance = DeviceController()
     private var deviceOnArray:[String] = []
     var deviceOn: [String] {
         return self.deviceOnArray
     }

     var deviceOffArray:[String] = []
     private let dispatchQueue = DispatchQueue(label:"DeviceControllerQueue")

     private override init() {

        clientID = "xyz-" + String(ProcessInfo().processIdentifier)
        mqtt = CocoaMQTT(clientID: clientID, host: "device_controller.xyz.net", port: 1883)
        mqtt.username = "username"
        mqtt.password = "password"
        mqtt.willMessage = CocoaMQTTWill(topic: "/will", message: "dieout")
        mqtt.keepAlive = 30
        mqtt.cleanSession = true
        DeviceController.isConnecting = true
        super.init()
        mqtt.delegate = self
        mqtt.connect()
        self.registerBackgroundTask()
    }
   func sendArm(topic:String){
      // add device to deviceOnArray
       self.dispatchQueue.sync {
           deviceOnArray.append(topic)
       }
   }

   func sendDisarm(topic:String){
      // remove device from deviceOnArray if exist here.
       self.dispatchQueue.sync {
           if let index = self.deviceOnArray.index(of: topic) {
               self.deviceOnArray.remove(at: index)
           }
       }
   }
}

【讨论】:

  • return self.deviceOnArray线上显示错误Value of type '(NSObject) -&gt; () -&gt; DeviceController' has no member 'deviceOnArray'
  • 我原来的代码有错误;但我现在已经编辑了。
  • 如果 sendDisarm() 被两个不同的线程调用会阻止吗?
  • 这段代码不会阻止sendDisarm被调用,但它确保sendDisarm中的代码在被两个不同的线程调用时不会同时执行,因为它是同步分派到串行上的调度队列。
【解决方案3】:

原子属性在这里帮不了你。它们旨在同步属性的分配作为一个整体,而不是内部(例如,它们不同步插入/删除元素到数组)。它们几乎只确保正确的保留/释放/自动释放调用,以防止您的程序崩溃/泄漏。

您需要Dispatch​Semaphore 或类似的东西(或者可能是更多本地的东西,posix pthread_mutex 的东西)来确保互斥访问。

【讨论】:

  • 我按照你的建议使用了let lockQueue = DispatchQueue(label: "com.test.LockQueue") lockQueue.sync() {},但它不起作用,我还编辑了问题以显示我是如何做到的
  • 现在是否正常工作,还是您仍有同步问题?
猜你喜欢
  • 1970-01-01
  • 2015-03-27
  • 1970-01-01
  • 2019-03-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多