ループをParallelクラスで並列処理にするには?[C#/VB].NET TIPS

Parallelクラスを使って、複数の処理を並列に実行する方法を解説する。また、PLINQを使ったコード例や例外処理を行う上での注意点なども取り上げる。

» 2017年06月21日 05時00分 公開
[山本康彦BluewaterSoft/Microsoft MVP for Windows Development]
「.NET TIPS」のインデックス

連載目次

 何らかの処理を現在のスレッドとは別のスレッドで非同期に実行開始するには、Task.Runメソッドを使えばよい。では、複数の処理を並列に実行するにはどうしたらよいだろうか? Task.Runメソッドをそのまま使っても可能だが(後述)、.NET Framework 4.0で導入されたParallelクラス(System.Threading.Tasks名前空間)を使うと、簡単に実現できる。本稿では、Parallelクラスを使ってループを並列実行する方法を主に解説する。

 なお、本稿のサンプルコードにはParallelクラスより新しい内容も含んでいる。サンプルコードをそのまま試すには、Visual Studio 2015(またはそれ以降)が必要である。

Parallelクラスを使って並列処理を記述するには?

 ループを並列処理するには、ParallelクラスのForメソッドかForEachメソッドを使う(次のコード)。

using System.Threading.Tasks;

……省略……

// Forメソッド
Parallel.For(startIndex, endIndex, i => {
  // カウンター変数iを使ったループ内の処理
});

// ForEachメソッド
Parallel.ForEach(collection, item => {
  // コレクションの各要素itemを使ったループ内の処理
});

' Forメソッド
Parallel.For(startIndex, endIndex,
              Sub(i)
                ' カウンター変数iを使ったループ内の処理
              End Sub)

' ForEachメソッド
WriteLine("Parallel.ForEach()")
Parallel.ForEach(collection,
                  Sub(item)
                    ' コレクションの各要素itemを使ったループ内の処理
                  End Sub)

Parallelクラスを使ってループを並列化する(上:C#、下:VB)
Parallel.Forメソッドのカウンター変数「i」は「startIndex」から始まり「endIndex」の1つ手前までである。VBでは間違えやすいので注意してほしい。
なお、Parallel.ForEachメソッドを使う方法は、PLINQでも書ける(後述)。

 また、ループではない複数の処理を並列に実行するには、Invokeメソッドを使う(次のコード)。

using System.Threading.Tasks;

……省略……

Parallel.Invoke(
  () => /* 処理A */,
  () => /* 処理B */,
  () => /* 処理C */
  );

Parallel.Invoke(
  Sub() ……省略(処理A)……,
  Sub() ……省略(処理B)……,
  Sub() ……省略(処理C)……
  )

Parallel.Invokeで複数の処理を並列に実行する(上:C#、下:VB)

 ただし、Parallelクラスで並列化できるのは、それぞれの処理が独立している場合である。ループ内の処理がその前のループの処理結果に依存していたり、複数の処理の間に依存関係があったりすると並列化できない。

 なお、同時に使用するスレッド数や実行を開始する順序などは、実行時に自動的に決定される。また、例外をキャッチする方法は後述する。

実際の例

 ループを並列処理するコンソールアプリの例を示そう(次のコード)。比較のために通常のforループとParallel.Forメソッドを使っている。

using System;
using System.Linq;
using System.Threading.Tasks;
using static System.Console;

class Program
{
  // 並列処理させたいメソッド
  static int Work(int n)
  {
    string startTime = $"{DateTimeOffset.Now:ss.fff}";
    WriteLine($"n={n}, TaskID={Task.CurrentId}, start:{startTime}");

    int delayCount = (new Random()).Next(10000, 1000000);
    System.Threading.Thread.SpinWait(delayCount); // ランダムな時間を待機する

    if (n == 7) // この例外をキャッチする方法は後述する
      throw new ApplicationException($"{nameof(n)}={n}は計算できません");

    string endTime = $"{DateTimeOffset.Now:ss.fff}";
    WriteLine($"n={n}, TaskID={Task.CurrentId},   end:{endTime}");

    return n;
  }

  static void Main(string[] args)
  {
    WriteLine("通常のfor文");
    for (int n = 1; n < 6; n++)
    {
      Work(n);
    }
    // 出力例:
    // 通常のfor文
    // n=1, TaskID=, start:29.283
    // n=1, TaskID=,   end:29.334
    // n=2, TaskID=, start:29.334
    // n=2, TaskID=,   end:29.433
    // n=3, TaskID=, start:29.433
    // n=3, TaskID=,   end:29.519
    // n=4, TaskID=, start:29.522
    // n=4, TaskID=,   end:29.532
    // n=5, TaskID=, start:29.533
    // n=5, TaskID=,   end:29.594

    WriteLine("Parallel.For()");
    Parallel.For(1, 6, n => {
      Work(n);
    });
    WriteLine("Parallel.For()終了");
    // 出力例:
    // Parallel.For()
    // n=1, TaskID=1, start:29.637
    // n=2, TaskID=2, start:29.640
    // n=2, TaskID=2,   end:29.643
    // n=3, TaskID=2, start:29.644
    // n=1, TaskID=1,   end:29.647
    // n=4, TaskID=1, start:29.699
    // n=3, TaskID=2,   end:29.665
    // n=5, TaskID=3, start:29.700
    // n=5, TaskID=3,   end:29.799
    // n=4, TaskID=1,   end:29.799
    // Parallel.For()終了

#if DEBUG
    ReadKey();
#endif
  }
}

Imports System.Console

Module Module1
  ' 並列処理させたいメソッド
  Function Work(n As Integer) As Integer
    Dim startTime As String = $"{DateTimeOffset.Now:ss.fff}"
    WriteLine($"n={n}, TaskID={Task.CurrentId}, start:{startTime}")

    Dim delayCount As Integer = (New Random()).Next(10000, 1000000)
    System.Threading.Thread.SpinWait(delayCount) ' ランダムな時間を待機する

    If (n = 7) Then ' この例外をキャッチする方法は後述する
      Throw New ApplicationException($"{NameOf(n)}={n}は計算できません")
    End If

    Dim endTime As String = $"{DateTimeOffset.Now:ss.fff}"
    WriteLine($"n={n}, TaskID={Task.CurrentId},   end:{endTime}")

    Return n
  End Function

  Sub Main()

    WriteLine("通常のfor文")
    For n As Integer = 1 To 5
      Work(n)
    Next
    ' 出力例:
    ' 通常のfor文
    ' n=1, TaskID=, start:51.381
    ' n=1, TaskID=,   end:51.811
    ' n=2, TaskID=, start:51.816
    ' n=2, TaskID=,   end:51.887
    ' n=3, TaskID=, start:51.887
    ' n=3, TaskID=,   end:51.919
    ' n=4, TaskID=, start:51.920
    ' n=4, TaskID=,   end:51.993
    ' n=5, TaskID=, start:51.993
    ' n=5, TaskID=,   end:52.033

    WriteLine("Parallel.For()")
    Parallel.For(1, 6,
                 Sub(n)
                   Work(n)
                 End Sub)
    WriteLine("Parallel.For()終了")
    ' 出力例:
    ' Parallel.For()
    ' n=2, TaskID=1, start:52.037
    ' n=1, TaskID=3, start:52.038
    ' n=1, TaskID=3,   end:52.066
    ' n=3, TaskID=3, start:52.086
    ' n=2, TaskID=1,   end:52.052
    ' n=4, TaskID=1, start:52.088
    ' n=5, TaskID=2, start:52.089
    ' n=5, TaskID=2,   end:52.115
    ' n=3, TaskID=3,   end:52.131
    ' n=4, TaskID=1,   end:52.155
    ' Parallel.For()終了

#If DEBUG Then
    ReadKey()
#End If
  End Sub
End Module

Parallel.Forでループを並列化するコンソールアプリの例(上:C#、下:VB)
並列処理させたい「Work」メソッドは、その内部では非同期処理を行わない普通のメソッドである。そのため、通常のforループでは逐次実行されている。このときの「Work」メソッドはメインスレッドで動いているため、出力のTaskIDは空欄になっている。
Parallel.Forメソッドの方では、実行するたびにその出力結果が変わる。上に示した出力例を見ると、「start」と「end」がオーバーラップしていて同時に複数の処理が実行されていることが分かる。また、TaskIDが「1」「2」「3」の3種類あることから、この実行例では3つのスレッドが使われていると分かる。
また、「Parallel.For()終了」という表示は、ループ内の全ての処理が終わってから出力されている。TaskクラスのRunメソッドと異なり、Parallelクラスは全ての処理が終わるまで待機するのである(ループ内の処理でさらに別の非同期処理を起動すると、それは待機できない)。

 また、Parallel.ForEachメソッドの例を次のコードに示す。それと等価なPLINQのコードも載せてある。

WriteLine("Parallel.ForEach()");
Parallel.ForEach(Enumerable.Range(1, 5), n => {
  Work(n);
});
// 出力例:
// Parallel.ForEach()
// n=1, TaskID=9, start:47.834
// n=2, TaskID=10, start:47.836
// n=3, TaskID=11, start:47.836
// n=2, TaskID=10,   end:47.868
// n=3, TaskID=11,   end:47.872
// n=4, TaskID=12, start:47.849
// n=5, TaskID=13, start:47.851
// n=1, TaskID=9,   end:47.910
// n=5, TaskID=13,   end:48.031
// n=4, TaskID=12,   end:48.056

WriteLine("PLINQ:Parallel.ForEach()と同じ");
Enumerable.Range(1, 5).AsParallel().ForAll(n => {
  Work(n);
});
// 出力例:
// PLINQ:Parallel.ForEach()と同じ
// n=1, TaskID=21, start:48.080
// n=2, TaskID=19, start:48.080
// n=4, TaskID=20, start:48.088
// n=1, TaskID=21,   end:48.135
// n=5, TaskID=21, start:48.173
// n=3, TaskID=18, start:48.081
// n=3, TaskID=18,   end:48.231
// n=2, TaskID=19,   end:48.207
// n=4, TaskID=20,   end:48.193
// n=5, TaskID=21,   end:48.308

WriteLine("Parallel.ForEach()")
Parallel.ForEach(Enumerable.Range(1, 5),
                  Sub(n)
                    Work(n)
                  End Sub)
' 出力例:
' Parallel.ForEach()
' n=1, TaskID=7, start:05.898
' n=5, TaskID=11, start:05.904
' n=2, TaskID=8, start:05.898
' n=3, TaskID=10, start:05.898
' n=3, TaskID=10,   end:05.940
' n=4, TaskID=9, start:05.898
' n=5, TaskID=11,   end:05.958
' n=4, TaskID=9,   end:06.007
' n=1, TaskID=7,   end:06.022
' n=2, TaskID=8,   end:05.976

WriteLine("PLINQ:Parallel.ForEach()と同じ")
Enumerable.Range(1, 5).AsParallel().ForAll(
  Sub(n)
    Work(n)
  End Sub)
' 出力例:
' PLINQ:Parallel.ForEach()と同じ
' n=1, TaskID=17, start:06.069
' n=2, TaskID=14, start:06.091
' n=2, TaskID=14,   end:06.101
' n=5, TaskID=14, start:06.149
' n=4, TaskID=16, start:06.091
' n=1, TaskID=17,   end:06.134
' n=3, TaskID=15, start:06.091
' n=5, TaskID=14,   end:06.210
' n=4, TaskID=16,   end:06.227
' n=3, TaskID=15,   end:06.270

Parallel.ForEachでループを並列化する例(上:C#、下:VB)
ループ内で呼び出している「Work」メソッドは、前出のサンプルと同じである。
Parallel.ForEachメソッドを使ったコードは、Parallel LINQ(PLINQ)で書き換えることもできる。AsParallel拡張メソッド以降は並列実行される。今回のように何らかの処理を並列実行したいだけ(処理結果をコレクションにしない)ときは、ForAll拡張メソッドの中に処理を書けばよい。
なお、出力例のTaskIDが1番から始まっていないのは、前掲のサンプルと連続して実行しているからだ。

例外をキャッチするには?

 Parallelクラスを使ってループを並列実行しているときに発生した例外は、普通にループの中でキャッチしてもよいし、ループの外でまとめてキャッチしてもよい(次のコード)。

 ただし、ループの外でキャッチする場合は、発生した例外がAggregateException例外(System名前空間)にラップされて渡される。実際に発生した例外を得るには、AggregateException例外のInnerExceptionsプロパティを見る必要がある。

WriteLine("例外の捕まえ方-その1(ループ内)");
Parallel.For(6, 11, n => { // n=7のとき例外が出る
  try
  {
    Work(n);
  }
  catch (Exception ex)
  {
    WriteLine($"n={n}で例外:{ex.GetType().Name}({ex.Message})");
    WriteLine(ex.StackTrace);
  }
});
// 出力例:
// 例外の捕まえ方-その1(ループ内)
// n=6, TaskID=17, start:01.816
// n=7, TaskID=18, start:01.828
// n=8, TaskID=19, start:01.828
// n=9, TaskID=20, start:01.832
// n=6, TaskID=17,   end:01.890
// n=9, TaskID=20,   end:01.905
// n=10, TaskID=21, start:01.852
// n=8, TaskID=19,   end:01.923
// n=7で例外:ApplicationException(n=7は計算できません)
// n=10, TaskID=21,   end:01.995
//    場所 Program.Work(Int32 n) 場所 C:\……省略……\Program.cs:行 17
//    場所 Program.<>c.<Main>b__1_3(Int32 n) 場所……省略……\Program.cs:行 112

WriteLine("例外の捕まえ方-その2(ループ外)");
try
{
  Parallel.For(6, 11, n => { // n=7のとき例外が出る
    Work(n);
  });
}
catch (AggregateException aex)
{
  foreach (var ex in aex.InnerExceptions)
  {
    WriteLine($"ループ内で例外:{ex.GetType().Name}({ex.Message})");
    WriteLine(ex.StackTrace);
  }
}
// 出力例:
// 例外の捕まえ方-その2(ループ外)
// n=6, TaskID=26, start:32.594
// n=7, TaskID=27, start:32.599
// n=8, TaskID=28, start:32.618
// n=10, TaskID=30, start:32.646
// n=9, TaskID=29, start:32.627
// n=10, TaskID=30,   end:32.712
// n=6, TaskID=26,   end:32.630
// n=9, TaskID=29,   end:32.754
// n=8, TaskID=28,   end:32.671
// ループ内で例外:ApplicationException(n=7は計算できません)
//    場所 Program.Work(Int32 n) 場所 C:\……省略……\Program.cs:行 17
//    場所 Program.<>c.<Main>b__1_4(Int32 n) 場所……省略……\Program.cs:行 130
//    場所 System.Threading.Tasks.Parallel.<>……省略…….<ForWorker>b__1()
//    場所 System.Threading.Tasks.Task.InnerInvoke()
//    場所 System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
//    場所 System.Threading.Tasks.Task.<>……省略…….<ExecuteSelfReplicating>b__0(Object )

WriteLine("例外の捕まえ方-その1(ループ内)")
Parallel.For(6, 11,
             Sub(n) ' n = 7のとき例外が出る
               Try
                 Work(n)
               Catch ex As Exception
                 WriteLine($"n={n}で例外:{ex.GetType().Name}({ex.Message})")
                 WriteLine(ex.StackTrace)
               End Try
             End Sub)
' 出力例:
' 例外の捕まえ方-その1(ループ内)
' n=6, TaskID=18, start:09.326
' n=7, TaskID=17, start:09.326
' n=10, TaskID=21, start:09.362
' n=6, TaskID=18,   end:09.348
' n=9, TaskID=20, start:09.341
' n=8, TaskID=19, start:09.335
' n=10, TaskID=21,   end:09.433
' n=7で例外:ApplicationException(n=7は計算できません)
'    場所 dotNetTips1195VB.Module1.Work(Int32 n) 場所……省略……\Module1.vb:行 14
'    場所 ……省略…….Module1._Closure$……省略…… 場所……省略……\Module1.vb:行 109
' n=9, TaskID=20,   end:09.488
' n=8, TaskID=19,   end:09.564

WriteLine("例外の捕まえ方-その2(ループ外)")
Try
  Parallel.For(6, 11,
               Sub(n) ' n = 7のとき例外が出る
                 Work(n)
               End Sub)
Catch aex As AggregateException
  For Each ex In aex.InnerExceptions
    WriteLine($"ループ内で例外:{ex.GetType().Name}({ex.Message})")
    WriteLine(ex.StackTrace)
  Next
End Try
' 出力例:
' 例外の捕まえ方-その2(ループ外)
' n=6, TaskID=26, start:09.615
' n=7, TaskID=27, start:09.616
' n=9, TaskID=29, start:09.633
' n=10, TaskID=30, start:09.642
' n=6, TaskID=26,   end:09.625
' n=10, TaskID=30,   end:09.737
' n=9, TaskID=29,   end:09.792
' n=8, TaskID=28, start:09.624
' n=8, TaskID=28,   end:09.869
' ループ内で例外:ApplicationException(n=7は計算できません)
'    場所 ……省略……Module1.Work(Int32 n) 場所……省略……\Module1.vb:行 14
'    場所 ……省略……Module1._Closure$……省略…… 場所……省略……\Module1.vb:行 121
'    場所 System.Threading.Tasks.Parallel.<>……省略…….<ForWorker>b__1()
'    場所 System.Threading.Tasks.Task.InnerInvoke()
'    場所 System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
'    場所 ……省略…….Task.<……省略…….<ExecuteSelfReplicating>b__0(Object )

ループを並列実行しているときの例外をキャッチする例(上:C#、下:VB)
Parallel.Forメソッドでの例を示す。Parallel.ForEachメソッドでも同様である。
C#の行17/VBの行14が、「Work」メソッド内で例外を投げている行である。
ループ内、あるいは、呼び出しているメソッド内でのキャッチの仕方は、普通と同じである。ループ外でキャッチする場合は、このコードの後半に示したようにAggregateException例外をキャッチして、そのInnerExceptionsプロパティに格納されている例外を見る(ループ外でキャッチする場合、Visual Studio IDEからデバッグ実行すると、未処理の例外があったものとして、一度デバッグ実行が中断されることがある。デバッグを続行すれば、上の出力例のようにキャッチされる)。
なお、ループ内でキャッチした場合は、そのcatch句も並列実行される。上のC#の出力例を見てもらうと、n=7のときのcatch句での出力をしている途中でn=10の終了が出力されているので、並列実行されていると分かる(VBでは、n=7のときのcatch句が終わってから、n=9とn=8の終了が出力されている)。メインスレッドに戻ってからcatch句を実行したいときは、ループ外でAggregateException例外をキャッチすればよい。

参考:Parallelクラスを使わずに並列実行する

 Parallelクラスを使わなくても、Task.Runメソッドで並列実行できる。その場合は、全ての処理が完了するまで待つ処理も自前で書かねばならない。

 「例外の捕まえ方-その2」をTask.Runメソッドで書き直した例を次のコードに示す(C#のみ)。比べてみれば、Parallelクラスで簡単に書けるようになったことが分かるだろう。

WriteLine("Task.Runで並列実行する例");
var taskList = new List<Task>(); // 複数の非同期処理を管理するためのコレクション
for (int n = 6; n < 11; n++)
{
  int i = n; // ループ変数はキャッシュしてから非同期処理に渡さねばならない
  taskList.Add(Task.Run(() => Work(i)));
}
WriteLine("並列実行中…");
try
{
  Task.WaitAll(taskList.ToArray()); // 全ての処理が終わるまで待機する
}
catch (AggregateException aex)
{
  foreach (var ex in aex.InnerExceptions)
  {
    WriteLine($"ループ内で例外:{ex.GetType().Name}({ex.Message})");
    WriteLine(ex.StackTrace);
  }
}
WriteLine($"end of Main:{DateTimeOffset.Now:ss.fff}");
// 出力例:
// Task.Runで並列実行する例
// 並列実行中…
// n=7, TaskID=2, start:33.590
// n=8, TaskID=3, start:33.596
// n=9, TaskID=4, start:33.598
// n=6, TaskID=1, start:33.603
// n=8, TaskID=3,   end:33.635
// n=6, TaskID=1,   end:33.662
// n=10, TaskID=5, start:33.678
// n=9, TaskID=4,   end:33.628
// n=10, TaskID=5,   end:33.692
// ループ内で例外:ApplicationException(n=7は計算できません)
//    場所 Program.Work(Int32 n) 場所 ……省略……\Program.cs:行 17
//    場所 Program.……省略…….<Main>b__0() 場所 ……省略……\Program.cs: 行 32
//    場所 System.Threading.Tasks.Task`1.InnerInvoke()
//    場所 System.Threading.Tasks.Task.Execute()
// end of Main:33.763

Parallelクラスを使わずに並列実行を記述する例(C#)
Task.Runメソッドを繰り返し呼び出せば、並列実行を開始できる。
ただし、全ての処理の完了を待機するために、Task.Runメソッドの返値(Sytem.Threading.Tasks名前空間のTaskオブジェクト)をコレクションに保持しておき、Task.WaitAllメソッドを使う。
また、ループ変数はローカル変数にキャッシュしてから非同期処理に渡さねばならない。そうしないと、非同期処理を実際に開始する時点のループ変数の値が使われてしまう。出力例では「並列実行中…」と出力されてからWorkメソッドの処理が始まっている。つまり、ループを抜けてから非同期処理が開始されているのである。その時点のループ変数の値とは、このコード例では11である。

まとめ

 干渉しないループは、Parallel.For/ForEachメソッドで簡単に並列処理できる。例外をループの外でキャッチするときはAggregateException例外になることだけは覚えておこう。

 なお、本稿では扱わなかったが、ループを途中で打ち切ることもできる。詳しくはMSDN「方法: Parallel.For ループを停止または中断する」を参照していただきたい。

「.NET TIPS」のインデックス

.NET TIPS

Copyright© Digital Advantage Corp. All Rights Reserved.

RSSについて

アイティメディアIDについて

メールマガジン登録

@ITのメールマガジンは、 もちろん、すべて無料です。ぜひメールマガジンをご購読ください。