RxSwiftにおけるマルチスレッドの理解を深める – Schedulerについて

この記事はeureka Native Advent Calendar 2017 – Qiitaの9日目の記事です。

こんにちは。Pairs JP事業部でスクラムマスター & iOS / Webエンジニアをしているです。今回はRxSwiftのSchedulerについてお話しします。Techブログで初めてUIKit以外の記事を書いています。

ことの発端はあるObservableをsubscribeしたとき、subscribeのクロージャは[unowned self]と、[weak self]のどちらにすべきかで社内で議論が起こりました。

API.Me.Get
  .subscribe(onNext: { [unowned self] _ in
    self.someMethod()
  })
  .disposed(by: self.disposeBag)

[weak self]で書けば安全ですが、毎回selfのアンラップをするのはしんどいので、unownedで書けるものは書いた方が良いと個人的に思っています。

そこで問題になってくるのがSchedulerの考えになってきます。本記事ではSchedulerについてわかりやすく体系的にまとめることを目指します。そして最後にdisposeとsubscribe(onNext:の順序、unownedで書ける条件について話したいと思います。想定読者はRxSwiftをすでに使用しており、ある程度の知識があるものとしています。

Versions

RxSwift 4.0
Swift4.0

目次

  • Schedulerとは
  • observeOnとsubscribeOnとは?
  • Schedulerの種類
  • unowned self / weak self
  • さいごに

Schedulerとは

RxSwiftにおいて、Schedulerの役割はObservableやOperator、Subscribeなどの処理をどのスレッド、キューで実行するかを決めることです。

observeOnとsubscribeOnとは?

SchedulerはObservableTypeのobserveOnsubscribeOnというメソッドに渡して使います。ReactiveXのドキュメントがわかりやすいです。以下の図はReactiveXのドキュメントに掲載されている図です。ストリームの横線の色が実行されているSchedulerを示しています。

observeOnはどのSchedulerで次のObserverに通知していくかを決定します。上の図で言うオレンジとピンクの線になります。

observeOnについては、こちらのサンプルコードがわかりやすいと思います。observeOnを呼ぶたびに次のOperatorとSubscribeの処理のスレッドが切り替わります。

someObservable
  .observeOn(MainScheduler.instance) // これはメインスレッドで動く特別なScheduler
  .map { _ in
    // メインスレッドで実行される
  }
  .observeOn(backgroundScheduler) // これはバックグラウンドで動くように自身で作成したScheduler
  .map { _ in
    // バックグラウンドスレッドで実行される
  }
  .subscribe(onNext: { _ in
    // バックグラウンドスレッドで実行される
  })

subscribeOnは、ストリームの元となるObservableがどのSchedulerで処理されるかを決定します。上の図で言うブルーの線になります。ColdなObservableに対して有効です(subscribeされるまでObservableは実行されないため)。

subscribeOnについては、こちらのサンプルコードをご覧ください。とはいえ、subscribeOnはほとんど使われないと思います。

let someObservable = Observable<Int>.create({ (observer) -> Disposable in

  // 下のコードでsubscribeOn(backgroundScheduler)を呼んでいるので、バックグラウンドスレッドで実行される
  observer.on(.next(1))
  observer.on(.completed)

  return Disposables.create()
})

// 1. ここがメインスレッド or バックグラウンドスレッド
someObservable
  .subscribeOn(backgroundScheduler)
  .map { _ in
    // 上記の1と同じスレッドで実行される
  }
  .subscribe(onNext: { _ in
    // 上記の1と同じスレッドで実行される
  })

observeOnで実験をしてみる

// ここはメインスレッド
let someObservable = Observable<Int>.create({ (observer) -> Disposable in

  observer.on(.next(1))
  observer.on(.completed)

  return Disposables.create()
})

DispatchQueue.global(qos: .background).async { // [1]

  _ = someObservable
    .map { n in
      // バックグラウンドスレッド(特に指定していないので[1]の影響)
    }
    .observeOn(MainScheduler.instance) // [2]
    .map { n in
      // メインスレッド([2]の影響)
    }
    .observeOn(backgroundScheduler) // [3]
    .map { n in
      // バックグラウンドスレッド([3]の影響)
    }
    .do(onNext: { _ in
      // バックグラウンドスレッド([3]の影響)
    })
    .observeOn(MainScheduler.instance) // [4]
    .subscribe(onNext: { _ in
      // メインスレッド([4]の影響)
    })
}

Schedulerの種類

RxSwift 4.0では現在8つのSchedulerが存在します。

  • MainScheduler
  • ConcurrentMainScheduler
  • SerialDispatchQueueScheduler
  • ConcurrentDispatchQueueScheduler
  • OperationQueueScheduler
  • CurrentThreadScheduler
  • HistoricalSchedulerTimeConverter
  • VirtualTimeScheduler

HistoricalSchedulerTimeConverterとVirtualTimeSchedulerはRxSwiftのテストコードで使用するSchedulerになるので、今回の記事では省略したいと思います。上の6つのSchedulerについて詳細を説明していきます。

メインスレッド

メインスレッドで実行するSchedulerは2つ存在します。

  • MainScheduler
  • ConcurrentMainScheduler

使い分けとしては、MainSchedulerがobserveOnに最適化されており、ConcurrentMainSchedulerがsubscribeOnに最適化されています。

それぞれシングルトンのインスタンスを持っており、これらを使用します。

  • MainScheduler.instance
  • MainScheduler.asyncInstance(ただのSerialDispatchQueueScheduler)
  • ConcurrentMainScheduler.instance

MainScheduler.instanceMainScheduler.asyncInstanceの違いは、
MainScheduler.instanceはキューイングされているものがない場合、即時実行される
MainScheduler.asyncInstanceは必ずDispatchQueue.mainでディスパッチされる
です。

また、DriverはMainScheduler.instanceでdrive, subscribe, bindのメソッドが実行されます。ControlPropertyはConcurrentMainScheduler.instanceでsubscribeOnされています。

DispatchQueue

SerialDispatchQueueScheduler

MainSchedulerはSerialDispatchQueueSchedulerのサブクラスです。MainScheduler.asyncInstanceもこのSerialDispatchQueueSchedulerになります。

DispatchQoSやDispatchQueueを渡してinitします。渡されたDispatchQueueがConcurrentでも中でSerial Queueが生成されており、必ずSerialになります。

let serialScheduler1 = SerialDispatchQueueScheduler(qos: .background)

let serialScheduler2 = SerialDispatchQueueScheduler(queue: DispatchQueue.main, internalSerialQueueName: "jp.eure.pairs.main.serial")

ConcurrentDispatchQueueScheduler

バックグラウンドスレッドで動作させたい場合は通常こちらのSchedulerを使用します。

let backgroundScheduler = ConcurrentDispatchQueueScheduler(qos: .background)

OperationQueue

あるOperationQueueでまとまった処理をしたい場合は、こちらのOperationQueueSchedulerを使います。特にConcurrentに処理する数を制限したいときにmaxConcurrentOperationCountを設定します。

let operationQueue = OperationQueue()
operationQueue.maxConcurrentOperationCount = 3
let operationScheduler = OperationQueueScheduler(operationQueue: operationQueue)

特に指定しない場合

特に指定がなければ、CurrentThreadSchedulerで動きます。こちらはSubscribeが実行されるスレッドでOperatorやSubscribeの処理が実行されます。

DispatchQueue.main.async {

  _ = someObservable1
    .subscribe(onNext: { _ in
      // メインスレッド
    })
}

DispatchQueue.global(qos: .background).async {

  _ = someObservable2
    .subscribe(onNext: { _ in
      // バックグラウンドスレッド
    })
}

unowned self / weak self

最初の問題に戻りましょう。

API.Me.Get
  .subscribe(onNext: { [unowned self] _ in
    self.someMethod()
  })
  .disposed(by: self.disposeBag)

この問題はdiseposeされた後に、 subscribe(onNext: が呼ばれることがなければ [unowned self]で良いことになります。

disposeの後にonNextが呼ばれないことが保証される場合は、

  • SerialなScheduler(MainScheduler, SerialDispatchQueueScheduler)でobserveOnしている
  • 同じQueueでdisposeされる(disposeBagが同じQueueでdeinit または 明示的にdispose)

となります。それ以外の条件ではdispose後にonNextが呼ばれる可能性があるので、必ず[weak self]にすべきです。
以下の例では基本的にMainSchedulerで動いているので、[unowned self]で良いです。

class ViewController: UIViewController {
  let disposeBag = DisposeBag() // メインスレッドでdeinitされる

  override func viewDidLoad() {
    super.viewDidLoad()

    // メインスレッド
    API.Me.Get
      .subscribe(onNext: { [unowned self] _ in
     // メインスレッド(observeOnが指定されていないので、CurrentThreadScheduler)
        self.someMethod()
      })
      .disposed(by: self.disposeBag)    
  }
}

さいごに

今回はRxSwiftのSchedulerについてまとめました。Schedulerを意識せずにRxSwiftを使用することは可能ですが、意図しない動作をする可能性があるので危険です。また、disposeとonNextのタイミングについても理解をしておくと良いと思います。

  • このエントリーをはてなブックマークに追加

エウレカでは、一緒に働いていただける方を絶賛募集中です。募集中の職種はこちらからご確認ください!皆様のエントリーをお待ちしております!

Recommend

オウンドメディアの古い記事を整理して、トラフィックを約2倍にした話

eureka Go ~PairsのGo言語フルスクラッチの舞台裏~ を開催しました!