連載:Reactive Extensions(Rx)入門

第2回 イベント・プログラミングとRx

河合 宜文
2012/01/06
Page1 Page2 Page3

 本連載の第1回では、Reactive Extensions(以降、Rx)の概要とインストール方法を解説した。今回からは具体的な使い方を見ていこう。

 まずはRxを利用する際の基本的な流れを見ていく。次に、Rxの持つ代表的な2つの性質であるイベントと非同期のうち、イベントを中心的に取り上げる。

基本的な記述方法

 最初に、シンプルなRx(Observableオブジェクト)のコードと、foreach文(Enumerableオブジェクト)のコードの対比を見てみよう。

using System.Linq;
using System.Reactive.Linq;

// Observableオブジェクト(Rxのコード)
Observable.Range(1, 5)
  .Subscribe(x => Console.WriteLine(x));

// Enumerableオブジェクト
foreach (var x in Enumerable.Range(1, 5))
  Console.WriteLine(x);
Imports System.Reactive.Linq

' Observableオブジェクト(Rxのコード)
Observable.Range(1, 5).
  Subscribe(Sub(x) Console.WriteLine(x))

' Enumerableオブジェクト
For Each x In Enumerable.Range(1, 5)
  Console.WriteLine(x)
Next
同じ実行結果のRxとforeach文のコード(上:C#、下:VB)
System.Reactiveアセンブリを参照する必要がある。

 実行結果はともに「1」から「5」が出力される。Enumerableオブジェクト(以降、「Enumerable」と略す)はPull型のシーケンス(=一連の処理の流れ)、Observableオブジェクト(以降、「Observable」と略す)はPush型のシーケンスを表す*1が、どちらも同じように記述できる。Enumerableではシーケンス(Range)があり、それを消費するもの(foreach)がある。Rxにおいてもそれは変わらず、シーケンスがあり、foreach文と同様の感覚、同じ位置でSubscribeメソッドを用いるというのが基本的な流れになる。

*1 Enumerableはデザイン・パターンでいうところのIteratorパターン、ObservableはObserverパターンを表している。

 それでは、詳細を知るためにインターフェイスを見てみよう。Rxでの主要なインターフェイスは2つ、IObservable<T>とIObserver<T>だ。次のコードはそのコード内容である(C#の場合)。

public interface IObservable<out T>
{
  IDisposable Subscribe(IObserver<T> observer);
}

public interface IObserver<in T>
{
  void OnCompleted();
  void OnError(Exception error);
  void OnNext(T value);
}
IObservable<T>インターフェイスとIObserver<T>インターフェイス(C#の場合)

 最初に挙げたシンプルな例の中でも、この2つのインターフェイスが利用されている。IObservable<T>オブジェクトがシーケンスを表し、IObserver<T>オブジェクトが消費するものを表している。つまり、「Observable.Range」というコードから得られるオブジェクトはIObservable<int>型だ。しかし、IObserver<T>オブジェクトは見当たらない。実はIObserver<T>オブジェクトは、Subscribeメソッドに渡しているラムダ式の中で自動的に作られている。その詳細を明かすと、中では以下のようになっている。

var observer = Observer.Create<int>(
  x => Console.WriteLine(x), // onNextパラメータ(デリゲート)
  ex => { throw ex; }, // onErrorパラメータ(デリゲート)
  () => { });  // onCompletedパラメータ(デリゲート)

Observable.Range(1, 5).Subscribe(observer);
Dim observer_ = Observer.Create(Of Integer)(
  Sub(x) Console.WriteLine(x),
  Sub(ex) Throw ex,
  Sub() End)

Observable.Range(1, 5).Subscribe(observer_)
Subscribeメソッドの引数に記述したラムダ式により生成されるIObserver<int>オブジェクトの詳細(上:C#、下:VB)

 Observer.Createメソッドには3つのデリゲートを受け取るパラメータがあり、順に、IObserver<T>インターフェイスのOnNext/OnError/OnCompletedメソッドのシグネチャと同じデリゲート型になっている。このため、その場でIObserver<T>インターフェイスを実装したインスタンスを生成できるようになっている。

 しかし、それでもわざわざObserver.CreateメソッドでIObserver<T>オブジェクトを作成するのは手間だろう。そこで、IObservable<T>インターフェイスには、IObserver<T>オブジェクトのみを受け取るSubcribeメソッドだけでなく、各種のデリゲートを受け取る、複数のオーバーロードのSubcribeメソッドが、拡張メソッドによって追加されている*2。これにより、Enumerableでのforeach文と同じ感覚で利用することが可能になっているのだ。

*2 ObservableExtensionsクラス(System名前空間)に、onNext/onError/onCompletedパラメータの組み合わせパターンによる5つのメソッドが定義されている。

IObserver<T>オブジェクトの詳細

 最初の「同じ実行結果のRxとforeach文のコード」の例ではIObserver<T>インターフェイスのメンバのうち、OnNextメソッドのデリゲートしか記述していなかった。このOnNextは、値がオブザーバに届くたびに実行されるメソッドで、最も利用されるものだろう。残りの2つのうち、OnErrorは例外発生時に実行されるメソッド、OnCompletedはシーケンスが完了後に実行されるメソッドを表す。これら3つのメソッドのデリゲートを指定する場合の、Subcribeメソッドのコード例を示す。

// 正常終了する場合
// 実行結果:1, 2, 3, 4, 5, Completed
Observable.Range(1, 5)
  .Subscribe(
    x => Console.WriteLine(x),
    ex => Console.WriteLine("Error"),
    () => Console.WriteLine("Completed"));

// 途中で例外が発生する場合
// 実行結果:1, 2, Error
Observable.Range(1, 5)
  .Do(x => { if (x == 3) throw new Exception(); })
  .Subscribe(
    x => Console.WriteLine(x),
    ex => Console.WriteLine("Error"),
    () => Console.WriteLine("Completed"));
' 正常終了する場合
' 実行結果:1, 2, 3, 4, 5, Completed
Observable.Range(1, 5).
  Subscribe(
    Sub(x) Console.WriteLine(x),
    Sub(ex) Console.WriteLine("Error"),
    Sub() Console.WriteLine("Completed"))

' 途中で例外が発生する場合
' 実行結果:1, 2, Error
Observable.Range(1, 5).
  Do(Sub(x)
      If (x = 3) Then
        Throw New Exception()
      End If
    End Sub).
  Subscribe(
    Sub(x) Console.WriteLine(x),
    Sub(ex) Console.WriteLine("Error"),
    Sub() Console.WriteLine("Completed"))
OnError/OnCompletedメソッドのデリゲートを含めたSubcribeメソッドのコードの実行例(上:C#、下:VB)
Doメソッドはパイプラインを通過する値に処理を加えて、元の値はそのまま後続へ渡す。今回は値が「3」のときに例外を発生させるようにした。

 上記のコードを実行した場合、Rangeメソッドにより「1」〜「5」の値の生成が完了すると、OnCompletedメソッドが呼ばれる、もしくは途中で例外が発生するとOnErrorメソッドが呼ばれるのが確認できるだろう。なお、OnErrorメソッド、もしくはOnCompletedメソッドは、どちらか片方しか呼ばれることはない。つまり、OnErrorメソッドが呼ばれた後にOnCompletedメソッドが呼ばれたりすることはない。また、OnErrorメソッド、もしくはOnCompletedメソッドが呼ばれた後にOnNextメソッドが呼ばれることもない*3

*3 この法則はRxに標準で用意されているメソッドではすべて守られている。また、メソッドを自作する場合でもObservable.CreateメソッドといったRx標準で用意されている生成メソッドを用いれば守られる。さらにSubscirbeメソッドの呼び出し時にもRxで用意されているObserver(前述のSystem.ObservableExtensionsクラスによる拡張メソッド、もしくはObserver.Createメソッド)を使えば自動的に守られるようになっている。このように、幾重にも渡って法則が厳守されるようになっているため、ほぼ100%この法則が破られることはないが、IObservable<T>/IObserver<T>オブジェクトをすべて一から自前で実装する場合は、この法則に反する挙動をする可能性もある。ただし、そのような実装はすべきではない。安全のためにも、IObservable<T>インターフェイスやIObserver<T>インターフェイスを実装する際は、極力、Rxに用意されているObservable.Create/Observer.Createメソッドを利用して実装すべきだろう。どうしてもそれらが利用できない場合は、法則を守るように注意深く実装する必要がある。

 なぜこのような挙動になっているのか、そうでなければならないのかというと、foreach文で考えてみると分かりやすい。

foreach文上にOnNext/OnError/OnCompletedメソッドを当てはめたイメージ

 foreach文での列挙がすべて完了(=ループを抜ける)したら、再度、foreach文のループに入ることはない。また、列挙の最中に例外が発生したら、再度、foreach文のループに入ることもないし、列挙完了とは違うループの抜け方をするため、列挙完了の部分に入ることもない。

 このような見方が可能なのは、連載の第1回で「IObservable<T>/IObserver<T>インターフェイスの成り立ち自体が、IEnumerable<T>/IEnumerator<T>インターフェイスを反転させて作られた」と説明したように、IObservable<T>オブジェクトはEnumerable<T>オブジェクトの性質をすべて守るようにできているし、また、そうでなければならないからである。

“Dispose”の必要性

 Subscribeメソッドの戻り値はIDisposableオブジェクトであるが、上記の例では変数に受けることもなく無視している。これは、RxにおいてIDisposableオブジェクトの意味が、従来使われている「解放する必要があるリソースを抱えている可能性がある」とは異なるからだ。

 Rxでは元ソースがイベントの場合はイベントの「デタッチ」、タイマーの場合は「中止」、非同期の場合は「キャンセル」の意味で使われる。いずれにせよ「終了」の役割を果たす(ただしこの場合、OnCompletedメソッドは実行されない)が、それらは全て、必ずしも呼び出さなければならないものではない。むしろ、呼び出さないケースの方が多いかもしれない。よって、Subscribeメソッドの戻り値は必要がなければ無視しても問題ないのである。

Observableオブジェクトの生成子

 Enumerableにおいて、シーケンス、つまりIEnumerableインターフェイスを実装するものは、配列やList、Dictionaryなど、至る所に存在する。片や、Rxにおけるシーケンス、IObservable<T>インターフェイスを実装したものは、通常ではどこにも存在しない。そこで、生成するためのメソッドが大量に用意されている。

 以下にObservableの生成子をまとめる。なお、この表はRxのStable(安定)版に用意されている生成子のみで、Experimental(実験)版のRxには、さらに追加されている。

メソッド名 機能
Create 任意のObservableを作成
Defer Subscribeされるまで、中のファクトリ・メソッド実行を遅延
Empty OnCompletedメソッドのみを実行
FromAsyncPattern Begin-Endパターンから作成
FromEvent *4 Actionデリゲートのイベントから作成
FromEventPattern EventHandlerデリゲートのイベントから作成
Generate for文を模した値の発行
Interval 一定時間ごとに値を発行
Never 何も発行しない
Range 指定範囲の整数を発行
Repeat 指定回数、同一の値を発行
Return 1つのみの値を発行
Start 指定スケジューラ*5上で即座に実行し、完了後に1つのみの値を返す
Throw OnErrorメソッドのみを実行
Timer 一定時間後に一度のみ、もしくは一定時間後に一定周期で値を発行
ToAsync 指定スケジューラ*5上で実行し、完了後に1つのみの値を返すObservableを内包したFuncデリゲートを返す
Using 完了後、指定したリソースをDisposeするファクトリ・メソッドを作成
Observableオブジェクト生成のためのObservableクラスの静的メソッド
*4 Windows Phone 7に同梱されているMicrosoft.Phone.Reactiveアセンブリと、Data Developer Centerで配布されているSystem.Reactiveアセンブリでは機能が異なる。その詳細は後で詳しく述べる。
*5 デフォルトではスレッドプール(ThreadPool)上で実行され、Thread.StartメソッドやTask.Factory.StartNewメソッドなどの、Rxでの代替となる。スケジューラに関しては次回以降の連載で詳しく述べる。

メソッド名 機能
ToObservable IEnumerable<T>オブジェクトをObservableに変換
IEnumerable<T>オブジェクトへの拡張メソッド

メソッド名 機能
ToObservable TaskオブジェクトをObservableに変換
Taskオブジェクトへの拡張メソッド
「System.Reactive.Threading.Tasks」の「using」/「Imports」が必要。

 これらを最初のソースとして、WhereメソッドやSelectメソッドなどおなじみのクエリ演算子や、Rxだけにある演算子を適用させていき、値を変形させ、最後にSubscribeメソッドで値を渡す、というのがRxの基本的な流れとなる。

 例えば下記のコードを実行した場合、その下の図のような流れとなる。

// 実行結果:4, 16
Observable.Range(1, 5)
  .Where(x => x % 2 == 0)
  .Select(x => x * x)
  .Subscribe(Console.WriteLine);
' 実行結果:4, 16
Observable.Range(1, 5).
  Where(Function(x) x Mod 2 = 0).
  Select(Function(x) x * x).
  Subscribe(AddressOf Console.WriteLine)
演算子を適用させたメソッド・チェーンの例(上:C#、下:VB)

メソッド・チェーンで流れる値のイメージ
「Range」の一番右の四角はOnCompletedメソッドの呼び出しを表す。

 この例ではWhereメソッドで2の倍数のみに値をフィルタリングし、Selectメソッドで値を二乗している。

 それでは次のページから、本稿の主題である「イベントをRxで扱う方法」を説明しよう。


 INDEX
  [連載]Reactive Extensions(Rx)入門
  第2回 イベント・プログラミングとRx
  1.基本的な記述方法/IObserver<T>オブジェクトの詳細/“Dispose”の必要性/Observableオブジェクトの生成子
    2.イベントとは何か? イベントをRxで扱うことの利点/FromEventメソッド&FromEventPatternメソッド
    3.合成のためのメソッド

インデックス・ページヘ  「連載:Reactive Extensions(Rx)入門」


Insider.NET フォーラム 新着記事
  • 第2回 簡潔なコーディングのために (2017/7/26)
     ラムダ式で記述できるメンバの増加、throw式、out変数、タプルなど、C# 7には以前よりもコードを簡潔に記述できるような機能が導入されている
  • 第1回 Visual Studio Codeデバッグの基礎知識 (2017/7/21)
     Node.jsプログラムをデバッグしながら、Visual Studio Codeに統合されているデバッグ機能の基本の「キ」をマスターしよう
  • 第1回 明瞭なコーディングのために (2017/7/19)
     C# 7で追加された新機能の中から、「数値リテラル構文の改善」と「ローカル関数」を紹介する。これらは分かりやすいコードを記述するのに使える
  • Presentation Translator (2017/7/18)
     Presentation TranslatorはPowerPoint用のアドイン。プレゼンテーション時の字幕の付加や、多言語での質疑応答、スライドの翻訳を行える
@ITメールマガジン 新着情報やスタッフのコラムがメールで届きます(無料)
- PR -

注目のテーマ

Insider.NET 記事ランキング

本日 月間
ソリューションFLASH