RxSwift调度者scheduler

news/2024/7/6 6:31:50 标签: RxSwift, 调度者

RxSwift的核心非常简单,无非就是以下四点:

  • 可观察序列 Observalbe
  • 观察者 Observer
  • 调度者 Scheduler
  • 销毁者 Dispose

可观察序列、观察者,在《RxSwift核心源码探索》中有讲,下面就来看看调度者Scheduler具体做了哪些事情。调度者scheduler主要用于控制任务在哪个线程或队列运行,而scheduler是对GCD的封装,GCD我们很熟悉,通过GCD创建队列,开启线程,开发中所有动作都等价于任务+队列

任务:

  • 异步任务
  • 同步任务

队列:

  • 主队列
  • 全局队列
  • 并行队列
  • 串行队列

参见:《GCD部分总结》

scheduler中调度队列如下:

  • MainScheduler主线程,与UI相关的任务均在该线程下执行
  • SerialDispatchQueueScheduler相当于GCD对应的串行队列
  • ConcurrentDispatchQueueScheduler相当于GCD并行队列
  • OperationQueueScheduler相当于NSOperationQueue管理者可以设置并发数
  • CurrentThreadScheduler-当前线程

scheduler.png

以上几种类型,通过代码能够发现实际上就是对GCD队列创建的封装,以及Operation的封装。

1、MainScheduler

表示为主线程。开发中需要执行一些和UI相关的任务,则需要我们切换到该Scheduler上执行。点击进入MainScheduler类,如下:

public final class MainScheduler : SerialDispatchQueueScheduler {

    private let _mainQueue: DispatchQueue

    var numberEnqueued = AtomicInt(0)

    /// Initializes new instance of `MainScheduler`.
    public init() {
        self._mainQueue = DispatchQueue.main
        super.init(serialQueue: self._mainQueue)
    }
}

MainScheduler继承了SerialDispatchQueueScheduler串行队列类,当然这里不难理解,因为主队列就是一个特殊的串行队列。在该类中,能够清楚的看到,在初始化对象时,确定了队列类型为主队列self._mainQueue = DispatchQueue.main

2、SerialDispatchQueueScheduler

串行队列,有需要串行执行的任务,都需要切换至该scheduler下。

public class SerialDispatchQueueScheduler : SchedulerType {
    public typealias TimeInterval = Foundation.TimeInterval
    public typealias Time = Date
    
    /// - returns: Current time.
    public var now : Date {
        return Date()
    }

    let configuration: DispatchQueueConfiguration
    
    /**
    Constructs new `SerialDispatchQueueScheduler` that wraps `serialQueue`.

    - parameter serialQueue: Target dispatch queue.
    - parameter leeway: The amount of time, in nanoseconds, that the system will defer the timer.
    */
    init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway)
    }

    public convenience init(internalSerialQueueName: String, serialQueueConfiguration: ((DispatchQueue) -> Void)? = nil, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        let queue = DispatchQueue(label: internalSerialQueueName, attributes: [])
        serialQueueConfiguration?(queue)
        self.init(serialQueue: queue, leeway: leeway)
    }
}

同样初始化的时候,通过DispatchQueue对attributes属性置空操作设定了队列为串行队列。

3、ConcurrentDispatchQueueScheduler

并行队列,如下载任务,我们需要多个任务同时进行,则需要切换到当前scheduler

public class ConcurrentDispatchQueueScheduler: SchedulerType {
    public typealias TimeInterval = Foundation.TimeInterval
    public typealias Time = Date
    
    public var now : Date {
        return Date()
    }

    let configuration: DispatchQueueConfiguration
     
    public init(queue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        self.configuration = DispatchQueueConfiguration(queue: queue, leeway: leeway)
    }
    @available(iOS 8, OSX 10.10, *)
    public convenience init(qos: DispatchQoS, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        self.init(queue: DispatchQueue(
            label: "rxswift.queue.\(qos)",
            qos: qos,
            attributes: [DispatchQueue.Attributes.concurrent],
            target: nil),
            leeway: leeway
        )
    }
}

和我们的串行队列的配置方法参数,一样,不同的是对队列类型attributes的设置,该处置空为串行,concurrent为并行。

4、OperationQueueScheduler

用来获取当前线程,看到名称我们应该就能猜到该类就是对OperationQueue的封装。

public class OperationQueueScheduler: ImmediateSchedulerType {
    public let operationQueue: OperationQueue
    public let queuePriority: Operation.QueuePriority

    public init(operationQueue: OperationQueue, queuePriority: Operation.QueuePriority = .normal) {
        self.operationQueue = operationQueue
        self.queuePriority = queuePriority
    }
}

5、CurrentThreadScheduler

表示当前线程,默认就在当前调度上。

public class CurrentThreadScheduler : ImmediateSchedulerType {
    typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>

    /// The singleton instance of the current thread scheduler.
    public static let instance = CurrentThreadScheduler()

    private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
        let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
        defer {
#if swift(>=4.1)
            key.deallocate()
#else
            key.deallocate(capacity: 1)
#endif
        }

    static var queue : ScheduleQueue? {
        get {
            return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
        }
        set {
            Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
        }
    }

    /// Gets a value that indicates whether the caller must call a `schedule` method.
    public static fileprivate(set) var isScheduleRequired: Bool {
        get {
            return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
        }
        set(isScheduleRequired) {
            if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
                rxFatalError("pthread_setspecific failed")
            }
        }
    }
}
  • isScheduleRequired:用来表示是否必须调用schedule方法
  • 通过queue方法的set方法将当前你线程绑定到相应的key上,get方法通过key获取当前线程,queue是在schedule<StateType>中调用的

schedule方法:

public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    if CurrentThreadScheduler.isScheduleRequired {
        CurrentThreadScheduler.isScheduleRequired = false

        let disposable = action(state)

        defer {
            CurrentThreadScheduler.isScheduleRequired = true
            CurrentThreadScheduler.queue = nil
        }

        guard let queue = CurrentThreadScheduler.queue else {
            return disposable
        }

        while let latest = queue.value.dequeue() {
            if latest.isDisposed {
                continue
            }
            latest.invoke()
        }

        return disposable
    }

    let existingQueue = CurrentThreadScheduler.queue

    let queue: RxMutableBox<Queue<ScheduledItemType>>
    if let existingQueue = existingQueue {
        queue = existingQueue
    }
    else {
        queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
        CurrentThreadScheduler.queue = queue
    }

    let scheduledItem = ScheduledItem(action: action, state: state)
    queue.value.enqueue(scheduledItem)

    return scheduledItem
}

通过Thread.setThreadLocalStorageValue方法看一下内部做了哪些工作,代码如下:

extension Thread {
    static func setThreadLocalStorageValue<T: AnyObject>(_ value: T?, forKey key: NSCopying) {
        let currentThread = Thread.current
        let threadDictionary = currentThread.threadDictionary

        if let newValue = value {
            threadDictionary[key] = newValue
        }
        else {
            threadDictionary[key] = nil
        }
    }

    static func getThreadLocalStorageValueForKey<T>(_ key: NSCopying) -> T? {
        let currentThread = Thread.current
        let threadDictionary = currentThread.threadDictionary
        
        return threadDictionary[key] as? T
    }
}
  • threadDictionary一个可变类型的字典,
  • setThreadLocalStorageValue绑定当前线程到key
  • getThreadLocalStorageValueForKey通过key获取绑定的线程

调度者scheduler使用

模拟一个异步线程处理数据,完成后在主线程展示。

GCD实现:

DispatchQueue.init(label: "label",qos: .default,attributes:.concurrent).async {
    var num = 0
    for i in 0...100{
        num += I
    }
    DispatchQueue.main.sync {
        print("num:\(num)  \(Thread.current)")
    }
}
  • DispatchQueue.init:初始化一个队列
  • label:队列标识
  • qos:设置队列优先级
  • DispatchQueue.main.sync:返回主线程
  • attributes:设置队列类型,不设置默认为串行

RxSwift实现:

Observable<Any>.create { (observer) -> Disposable in
        var num = 0
        for i in 0...100{
            num += I
        }
        observer.onNext(num)
        return Disposables.create()
    }
    .subscribeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "yahibo"))
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { (val) in
        print(val)
    }).disposed(by: disposeBag)
  • 创建一个序列,在内部处理耗时操作
  • subscribeOn:决定序列的构造函数在哪个Scheduler上运行,使用SerialDispatchQueueScheduler设置为串行队列
  • ObserverOn:决定在哪个Scheduler上监听序列,使用MainScheduler设置为主线程队列中观察

还是我们熟悉的编码方式,通过点语法来设置序列所在线程。

无论是对序列元素的观察,UI绑定,还是多线程,在RxSwift中,统一处理成这种链式的形式,函数与函数之间没有强依赖性,使用灵活,降低了编码的复杂度。


http://www.niftyadmin.cn/n/1737794.html

相关文章

RxSwift-MVVM

MVVM核心在于数据与UI的双向绑定&#xff0c;数据的变化会更新UI&#xff0c;UI变化会更新我们的数据。那这种绑定操作谁来做呢&#xff1f;当然是我们的RxSwift。学习RxSwift框架以来&#xff0c;似乎并没有真正使用过这个框架&#xff0c;下面就来看看&#xff0c;RxSwift具体…

Alamofire-初探

对于iOS开发者来说&#xff0c;AFNetworking是我们大家所熟知的&#xff0c;而Alamofire呢&#xff1f;Alamofire框架其实就是AFNetworking兄弟&#xff0c;出自于同一个作者。既是同一个作者&#xff0c;那么他们的使用方法&#xff0c;框架结构上应该也是保持一致的。AFNetwo…

Linux中的同步机制 -- Futex

Linux中的同步机制(一)--Futex引子在编译2.6内核的时候&#xff0c;你会在编译选项中看到[*] Enable futex support这一项&#xff0c;上网查&#xff0c;有的资料会告诉你"不选这个内核不一定能正确的运行使用glibc的程序"&#xff0c;那futex是什么&#xff1f;和g…

Alamofire-后台下载

上一篇文章提到了后台下载&#xff0c;下面看看在Alamofire中是如何处理后台下载的。首先使用原生写法来实现一个后台下载任务&#xff0c;在使用Alamofire来实现&#xff0c;通过对比来看看Alamofire的优势。 数据源地址&#xff1a;http://testapi.onapp.top/public/videos/v…

64位的linux服务器上面安装apache服务器的问题定位

一&#xff0c;下载apache的源码包&#xff1a;httpd-2.2.4.tar.gz 二&#xff0c; 解压包到当前目录&#xff1a; tar zxvf httpd-2.2.4.tar.gz 三&#xff0c; 解压完毕&#xff0c;进入解压目录&#xff1a; cd httpd-2.2.4 四&#xff0c; 执行configure ./configure \…

如何把一个命令加入到某个用户sudo的列表中

有时候我们只需要执行一条root权限的命令也要su到root&#xff0c;是不是有些不方便&#xff1f;这时可以用sudo代替。默认新建的用户不在sudo组&#xff0c;需要编辑/etc/sudoers文件将用户加入&#xff0c;该文件建议使用visudo命令进行编辑&#xff08;其使用方法跟vim基本一…

Alamofire-Request

一、简述 在Alamofire中为了方便管理&#xff0c;明确分工&#xff0c;Alamofire对整个请求过程做了明确划分&#xff0c;并统一交由SessionManager来管理。SessionManager负责SessionDelegate、URLSession、URLRequest等对象创建与管理。先看一段请求示例&#xff1a; let u…

各浏览中对 MAP 和 AREA 元素的事件处理行为不同

from : http://w3help.org/zh-cn/causes/HO2009 标准参考 MAP 和 AREA 元素通常组合起来使用为图片要设置一个超链接区域。使用时将 IMG 元素的 "usemap" 属性1关联到一个 MAP 元素上&#xff0c;这个 MAP 元素的 "name" 属性值要与 IMG 元素的 "use…