連載:Reactive Extensions(Rx)入門

第1回 Reactive Extensionsの概要と利用方法

河合 宜文
2011/11/01

Page1 Page2

 C# 3.0より導入され、プログラミング・スタイルに大きな変革をもたらしたLINQはご存じだろうか。詳しくは「特集:Road to LINQ」で紹介されているが、データ加工における処理の手間を大きく解消するものである。LINQの特徴として、各種のデータソース(=配列やXMLデータ、SQLデータベースなど)に対して統一的な記法を提供することが挙げられる。

 Reactive Extensions(以降、Rx)は、LINQが適用できるデータソースの概念を「非同期」と「イベント」に広げた、いわば「LINQ to Asynchronous」「LINQ to Events」とでも言うべきものである。それにより、従来では手間のかかった複雑な非同期処理やイベント処理、時間が関係する処理などを、LINQの形で、簡単に、宣言的に記述できるようになる。

 Rxが利用できるプラットフォームは、

  • .NET Framework 3.5
  • .NET Framework 4.0 Client Profile
  • Silverlight 3
  • Silverlight 4
  • Windows Phone 7
  • XNA 4.0

と多岐に渡る。特に、通信に非同期APIしか提供されていないSilverlightやWindows Phone 7で大きな力を発揮するだろう。また、JavaScript向けには「RxJS」というライブラリが別途、用意されており、言語を超えた仕組みとして、JavaScript上でも.NET Framework用と同じように、Rxの非同期やイベント処理に対する強力な機能を使うことが可能だ。

Reactive Extensions(Rx)の歴史

 Rxは、2009年11月18日に、マイクロソフトの開発部門およびMicrosoft Researchの実験的なプロジェクトを公開するポータル・サイト「Microsoft DevLabs」にてページが開設され、その後、1年半に渡り細かく試験的なリリースが続けられてきた。DevLabsの実験的なプロジェクトは、正式な製品に昇格するものもあれば、最終的には打ち切られてしまうものもある*1。Rxは、2011年1月21日に、正式な製品として認められ、以降、プロジェクトのページは*2Data Developer Centerへと移行した。また、正式プロジェクト昇格以前には、Rxでの主要なインターフェイスであるIObservable<T>とIObserver<T>が.NET Framework 4/Silverlight 4にすでに入っているほか、Windows Phone 7 SDKにMicrosoft.Phone.Reactive名前空間として標準でライブラリが搭載されていた。

 インターフェイスがすでに.NETのクラス・ライブラリに搭載されていること、Windows Phone 7に標準搭載されていることなどから、「.NET Framework 4.5に搭載されるのでは?!」という期待も持たれているが、現在、Developer Previewとして公開されているVisual Studio 11に同梱されている.NET Framework 4.5にはRxは入っていない。とはいえ、DevLabsでの実験的プロジェクトではなく、正式なプロジェクトであるため、開発やサポートが打ち切られることはないので、安心して利用していい。

*1 打ち切られたものではSTM.NETなど、正式な製品となったものにはSmall Basicなどが挙げられる。
*2 Data Developer Centerは、言語をja-jpでアクセスすると「データ アクセス デベロッパー センター」として、en-usとはまったく異なる日本語用のページが表示されるが、ja-jpのページでは、現在のところ、Rxに関するページにはアクセスできない。

Reactive Extensions(Rx)が可能にすること

 Rxの扱える内容は多岐に渡る。最初に述べたように非同期とイベントが最たるものだが、ほかにも時間や、LINQ to Objectsで扱う通常のコレクション(=IEnumerable<T>インターフェイスを実装したオブジェクト)も扱うことが可能だ。

 なぜそのようなことが可能なのかというと、例えばMouseClickイベントを思い浮かべてほしい。マウスをクリックする、その1秒後にマウスをクリックする、その3秒後にマウスをクリックする。クリックのたびにMouseEventArgsオブジェクトによって、クリックされた座標などの値が発行される。これは時間という数直線の上に値が乗っかっている、と見ることができる。

 では時間そのもの、例えばTimerイベントはどうだろうか。これは、指定された時間間隔で、イベントが発行されていると見ることができる。

 非同期処理はどうだろうか? この場合は、ある時間によって処理の開始が支持され、処理が終了したときに値が発行される、と見ることができる。

 配列はどうだろう? その場合でも、一瞬のうち、例えば0.0000001秒間隔で値が発行されると考えることが可能なのだ。

 これらを図にまとめると、次のようになる。

イベント/非同期/配列が同じ時間軸に乗るイメージ図

 Rxはさまざまな値を、時間軸に乗る非同期なストリームと考えることにより、そこに乗せられるすべての値を等しくLINQ演算子によって射影・抽出・合成することを可能にしたのである。

LINQであるということ

 Rxの根本を支えるインターフェイスはIObservable<T>だ。これは、従来、LINQで使われているインターフェイスのIEnumerable<T>ではない。しかし、従来のLINQと同じ感覚で、同じメソッドでコードを書くことができるし、クエリ構文を使うことも可能だ(次のコードを参照)。

using System.Reactive.Linq;

// LINQ to Objectsでのクエリの書き方
var ix = from x in Enumerable.Range(1, 10)
         where x % 2 == 0
         select x * x;

// Rxでのクエリの書き方
var rx = from x in Observable.Range(1, 10)
         where x % 2 == 0
         select x * x;
Imports System.Reactive.Linq

' LINQ to Objectsでのクエリの書き方
Dim ix = From x In Enumerable.Range(1, 10)
         Where x Mod 2 = 0
         Select x * x

' Rxでのクエリの書き方
Dim rx = From x In Observable.Range(1, 10)
         Where x Mod 2 = 0
         Select x * x
LINQ to ObjectsとReactive Extensionsで同じ出力結果が得られるクエリ構文の例(上:C#、下:VB)

 より多くのことを扱うため、Rxだけに用意されたメソッドも数多くある。従って、必ずしもすべて同じとはいかないが、従来、使われているメソッドと同じ名前・同じ意味を持つものが多数あることにより、新たな学習コストが抑えられている。

 まったく異なるインターフェイスであるのに、同じようにLINQの演算子を用いることができている。このことを可能にしたのは、IObservable<T>/IObserver<T>インターフェイスの成り立ち自体が、IEnumerable<T>/IEnumerator<T>インターフェイスを反転させて作られたものだからだ。

 以下にIEnumerator<T>インターフェイスを反転させるとIObserver<T>インターフェイスが現れることの証明を記す(C#のみ。Visual Basicは割愛)。

// 単純化したIEnumerator<T>インターフェイス
public interface IEnumerator<T>
{
  T Current { get; }
  bool MoveNext();
  // void Reset(); Resetは現在、通常使われていない
}
// MoveNextメソッドのbool値を分解し、Currentプロパティを集約する
public interface IEnumerator<T>
{
  // MoveNextメソッドが成功したときはT型オブジェクトを返す
  // 失敗したときはもう何も返さない、つまりvoidになる
  // また、例外が発生することもあるので、戻り値は3種類
  T|void|Exception GetNext(void);
}
// 引数と戻り値を反転させる
// Pull型として次の値をGetする、から
// Push型として次の値をGotする、へ
public interface IEnumeratorDual<T>
{
  void GotNext(T|void|Exception);
}
// 3種類の引数を分ける
public interface IEnumeratorDual<T>
{
  void GotNext(T);
  void GotVoid(void); // voidを得るというのは完了したということ
  void GotException(Exception);
}
// それはIObserver<T>インターフェイスとなる
public interface IObserver<T>
{
  void OnNext(T value);
  void OnCompleted();
  void OnError(Exception error);
}
IEnumerator<T>インターフェイスからIObserver<T>インターフェイスを導出する例(C#)

 ここでは記さないが、IEnumerable<T>インターフェイスに関しても、同様の操作を行うことでIObservable<T>インターフェイスが現れることを確認可能だ。以上のことは、IEnumerable<T>インターフェイスとIObservable<T>インターフェイスを、情報の欠落なく、相互に変換することを可能にしている(実際、IEnumerable<T>インターフェイスにToObservable拡張メソッド、IObservable<T>インターフェイスにToEnumerable拡張メソッドがRxを参照することで追加される)。

 なお、次の図に示すとおり、IEnumerable<T>はPull型のインターフェイスであり、IObservable<T>はPush型のインターフェイスである。

Pull型とPush型の比較
図中の「Subscribe」(=サブスクライブ)とは、Push通知のオブザーバ(=IObserver<T>オブジェクト)を登録すること。

 Push型の場合、プログラムは最初にIObserver<T>オブジェクトをIObservable<T>オブジェクトに登録し(=Subscribe)、以降は両者間で値が伝達される。Pull型の場合は、プログラムはIEnumerator<T>オブジェクトを取得し、以降はそれを介して能動的に値を取り出す。

 それらに関する説明は、「特集:Road to LINQのpull型とpush型の説明」を参照してほしい。

イベントに適用する例

 イベントはさまざまな形で現れる。GUIのマウス・イベントなどもそうだし、Windows Phone 7で考えるのならジェスチャやセンサーからのイベントも身近だろう。INotifyPropertyChangedインターフェイスによるオブジェクト自身の通知やデータバインドなどもそうだ。

 Rxはイベントの合成を可能にする。例えばマウス・カーソルのダウン/ムーブ/アップなどのイベントを合成して、1つのドラッグ&ドロップ・イベントを生成する、などということが可能になる。次のコードは、その例である。

using System.Reactive.Linq;

// WindowsフォームのFormクラスでドラッグ(=マウスの左ボタンを
// 押しながら移動している間)の座標イベントを生成
var drag = from down in this.MouseDownAsObservable()
           from move in this.MouseMoveAsObservable().TakeUntil(this.MouseUpAsObservable())
           select new { move.X, move.Y };

// イベントからIObservable<T>オブジェクトへの変換コードを分離
// なお、ここでは簡略化のため文字列によるイベント名の指定を行った
// が、そうでない形式でも可能だ
// このFromEventPatternメソッドに関しては、詳しくは以降の連載で解説
public static class FormExtensions
{
  public static IObservable<MouseEventArgs> MouseMoveAsObservable(this Form form)
  {
    return Observable.FromEventPattern<MouseEventArgs>(form, "MouseMove").Select(e => e.EventArgs);
  }

  public static IObservable<MouseEventArgs> MouseDownAsObservable(this Form form)
  {
    return Observable.FromEventPattern<MouseEventArgs>(form, "MouseDown").Select(e => e.EventArgs);
  }

  public static IObservable<MouseEventArgs> MouseUpAsObservable(this Form form)
  {
    return Observable.FromEventPattern<MouseEventArgs>(form, "MouseUp").Select(e => e.EventArgs);
  }
}
Imports System.Reactive.Linq

' WindowsフォームのFormクラスでドラッグ(=マウスの左ボタンを
' 押しながら移動している間)の座標イベントを生成
Dim drag = From down In Me.MouseDownAsObservable()
           From move In Me.MouseMoveAsObservable().TakeUntil(Me.MouseUpAsObservable())
           Select New With {move.X, move.Y}

' イベントからIObservable<T>オブジェクトへの変換コードを分離
' なお、ここでは簡略化のため文字列によるイベント名の指定を行った
' が、そうでない形式でも可能だ
' このFromEventPatternに関しては、詳しくは以降の連載で解説
Public Module FormExtensions

  <System.Runtime.CompilerServices.Extension()>
  Public Function MouseMoveAsObservable(ByVal form As Form) As IObservable(Of MouseEventArgs)
    Return Observable.FromEventPattern(Of MouseEventArgs)(form, "MouseMove").Select(Function(e) e.EventArgs)
  End Function

  <System.Runtime.CompilerServices.Extension()>
  Public Function MouseDownAsObservable(ByVal form As Form) As IObservable(Of MouseEventArgs)
    Return Observable.FromEventPattern(Of MouseEventArgs)(form, "MouseDown").Select(Function(e) e.EventArgs)
  End Function

  <System.Runtime.CompilerServices.Extension()>
  Public Function MouseUpAsObservable(ByVal form As Form) As IObservable(Of MouseEventArgs)
    Return Observable.FromEventPattern(Of MouseEventArgs)(form, "MouseUp").Select(Function(e) e.EventArgs)
  End Function

End Module
3つのイベントを合成して1つのドラッグイベントを生成する(上:C#、下:VB)

 また、時間を使った値のフィルタリングを可能にする。次のコードはWindowsフォームでの例だ。

// (Windowsフォーム上の)TextBoxコントロールである「textBox」から
// TextChangedイベントを取得
Observable.FromEventPattern<EventArgs>(textBox, "TextChanged")
  .Select(_ => textBox.Text)
  .Throttle(TimeSpan.FromSeconds(1)); // 1秒間待機して別の値が
                                      // 来なければ流す
' (Windowsフォーム上の)TextBoxコントロールである「textBox」から
' TextChangedイベントを取得
Observable.FromEventPattern(Of EventArgs)(textBox, "TextChanged").
    Select(Function(sndr, ev) textBox.Text).
    Throttle(TimeSpan.FromSeconds(1)) ' 1秒間待機して別の値が
                                      ' 来なければ流す
時間を使ったフィルタリング(上:C#、下:VB)

 インクリメンタル・サーチなど、入力にリアルタイムに応じてネットワーク通信をしようとする場合などに、すべての値で通信をしようとするのは無駄が多いだろう。その場合、値を適度に間引きたいはずだ。上記の例はテキストボックスの入力に変化があった場合、1秒間の間にTextChangedイベントが発生して別の値が来たら前の値を無視するようなフィルタリングをする。そうすることにより、無駄なネットワーク通信を抑えられる。

 このような時間を使った操作は、従来ではかなりの手間をかけなければ記述できなかったことだが、Rxならばそういった操作は非常に得意とするところで、メソッド1つ適用するだけで済んでしまう。

非同期処理に適用する例

 Rxは非同期処理において大きな力を発揮する。例えば(WPFで)WebRequestクラス(System.Net名前空間)で非同期通信を行う場合を考えてみよう。Begin-Endで行う非同期処理はラムダ式がネストしがちだ。その場合、複数のリクエストを扱うなど、複雑な処理をするのが難しく、また、例外処理の範囲に注意を払う必要があり、ネストしたラムダ式それぞれに独立してtry-catch文を書く必要がある。UIスレッドへのBeginInvoke(=非同期での処理実行の開始依頼)もかなりの手間だ。

using System.Net;
using System.IO;

var req = WebRequest.Create("http://hoge/");
req.BeginGetResponse(ar =>
{
  try
  {
    var res = req.EndGetResponse(ar);
    var url = new StreamReader(res.GetResponseStream()).ReadToEnd();
    var req2 = WebRequest.Create(url); // 最初のリクエストの結果を元に次のリクエスト
    req2.BeginGetResponse(ar2 =>
    {
      try
      {
        var res2 = req2.EndGetResponse(ar2);
        var str = new StreamReader(res2.GetResponseStream()).ReadToEnd();
        Dispatcher.BeginInvoke(new Action(() => MessageBox.Show(str)));
      }
      catch (WebException e) { Dispatcher.BeginInvoke(new Action(() => MessageBox.Show(e.ToString()))); }
    }, null);
  }
  catch (WebException e)
  {
    Dispatcher.BeginInvoke(new Action(() => MessageBox.Show(e.ToString())));
  }
}, null);
Imports System.Net
Imports System.IO

Dim req = WebRequest.Create("http://hoge/")
req.BeginGetResponse(
  Sub(ar)
    Try
      Dim res = req.EndGetResponse(ar)
      Dim url = New StreamReader(res.GetResponseStream()).ReadToEnd()
      Dim req2 = WebRequest.Create(url) ' 最初のリクエストの結果を元に次のリクエスト
      req2.BeginGetResponse(
        Sub(ar2)
          Try
            Dim res2 = req2.EndGetResponse(ar2)
            Dim str = New StreamReader(res2.GetResponseStream()).ReadToEnd()
            Dispatcher.BeginInvoke(New Action(Sub() MessageBox.Show(str)))
          Catch e As WebException
            Dispatcher.BeginInvoke(New Action(Sub() MessageBox.Show(e.ToString())))
          End Try
        End Sub, Nothing)
    Catch e As WebException
      Dispatcher.BeginInvoke(New Action(Sub() MessageBox.Show(e.ToString())))
    End Try
  End Sub, Nothing)
通常の非同期処理コード(上:C#、下:VB)

 Rxを使うと、同様のコードは以下のようになる。

WebRequest.Create("http://hoge/")
  .DownloadStringAsync()
  .SelectMany(url => WebRequest.Create(url).DownloadStringAsync())
  .ObserveOnDispatcher() // UIスレッドへ戻す
  .Subscribe(
    str => MessageBox.Show(str), // 成功時の挙動
    e => MessageBox.Show(e.ToString())); // 失敗時の挙動

public static class WebRequestExtensions
{
  // Rxを使うと、拡張メソッドの形で処理を分離することができるので、本体をよりスッキリ書ける
  public static IObservable<string> DownloadStringAsync(this WebRequest request)
  {
    return Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)()
      .Select(res =>
      {
        using (var stream = res.GetResponseStream())
        using (var sr = new StreamReader(stream))
        {
          return sr.ReadToEnd();
        }
      });
  }
}
WebRequest.Create("http:'hoge/").
  DownloadStringAsync().
  SelectMany(Function(url As String) WebRequest.Create(url).DownloadStringAsync()).
  ObserveOnDispatcher().
  Subscribe(
    Sub(str)
      MessageBox.Show(str) ' 成功時の挙動
    End Sub,
    Sub(e)
      MessageBox.Show(e.ToString()) ' 失敗時の挙動
    End Sub)

Public Module WebRequestExtensions

  ' Rxを使うと、拡張メソッドの形で処理を分離することができるので、本体をよりスッキリ書ける
  <System.Runtime.CompilerServices.Extension()>
  Public Function DownloadStringAsync(ByVal request As WebRequest) As IObservable(Of String)
    Return Observable.FromAsyncPattern(Of WebResponse)(AddressOf request.BeginGetResponse, AddressOf request.EndGetResponse)().
      Select(
        Function(res)
          Using stream = res.GetResponseStream()
            Using sr = New StreamReader(stream)
              Return sr.ReadToEnd()
            End Using
          End Using
          Return Nothing
        End Function)
  End Function

End Module
Rxを用いた非同期処理コード(上:C#、下:VB)

 Rxを使うことでラムダ式のネストはメソッド・チェーンとなり、また、例外処理やUIスレッドへ戻すことが一箇所に集約されることで、大幅に記述量が減少する。

 拡張メソッドにより一部の処理を外に出しているので、その分だけアンフェアに感じるかもしれないが、このように簡単に小さな部品として分離することが可能だということも、Rxの大きなメリットの1つだ。

 ところで、「特集:.NET開発者のための非同期入門」で紹介されたように、C# 5.0から非同期処理に関する構文が追加される。では、Rxを使って非同期処理を書くのは待った方がいいのだろうか?

 答えは違う。まず、Rxは今すぐ使える技術だ。そして、RxをC# 5.0のasync構文で使うためのTask<T>オブジェクトに変換することもまた容易なので、今書いた非同期処理は決して無駄にならない。

 最後に、RxとC# 5.0のasync構文は、ある程度、領域はかぶるが、全く別の価値を提供する面も少なくないので、決してRx自体の価値は損なわれない。

 このことについて詳しくは、次回以降の非同期処理について説明するときに解説する。

 続いて次のページで、Rxのインストール方法やアセンブリの種類について説明する。


 INDEX
  連載:Reactive Extensions(Rx)入門
  第1回 Reactive Extensionsの概要と利用方法
  1.Rxの歴史/LINQであるということ/イベントや非同期処理に適用する例
    2.Rxのインストール方法/アセンブリの種類/名前空間の分類

インデックス・ページヘ  「連載:Reactive Extensions(Rx)入門」

@IT Special

- PR -

TechTargetジャパン

Insider.NET フォーラム 新着記事
@ITメールマガジン 新着情報やスタッフのコラムがメールで届きます(無料)
- PR -

イベントカレンダー

PickUpイベント

- PR -

アクセスランキング

もっと見る

ホワイトペーパーTechTargetジャパン

注目のテーマ

Insider.NET 記事ランキング

本日 月間
ソリューションFLASH