連載
» 2018年01月31日 05時00分 公開

.NET TIPS:キューを利用するには?[C#/VB]

.NET Frameworkが提供するQueue<T>クラスの基本的な使い方と、マルチスレッドで同期を取りながらQueue<T>クラスを利用する方法を解説する。

[山本康彦,BluewaterSoft/Microsoft MVP for Windows Development]
「.NET TIPS」のインデックス

連載目次

 キュー(待ち行列)とは入れたものが入れた順に出てくるコレクションのことだ。「先入れ先出し」(FIFO、"First In, First Out")のコレクションともいう。キューを実現するには、配列やList<T>コレクション(System.Collections.Generic名前空間)などを使って実装することも可能だが、.NET Framework 2.0以降ではQueue<T>コレクション(System.Collections.Generic名前空間)を利用するとよい。

 本稿では、Queue<T>コレクションの基本的な使い方とマルチスレッドに対応する方法を解説する。

POINT Queue<T>コレクションの基本的な使い方

Queue<T>コレクションの基本的な使い方まとめ Queue<T>コレクションの基本的な使い方まとめ


 特定のトピックをすぐに知りたいという方は以下のリンクを活用してほしい。

 なお、本稿に掲載したサンプルコードをそのまま試すにはVisual Studio 2015以降が必要である。サンプルコードはコンソールアプリの一部であり、コードの冒頭に以下の宣言が必要となる。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using static System.Console;

Imports System.Console

本稿のサンプルコードに必要な宣言(上:C#、下:VB)

Queueの基本的な使い方とは?

 キューに対する基本的な操作は、「キューにデータを入れる」(enqueue)/「キューからデータを取り出す」(dequeue)/「次のデータを見る」(peek)の3つだ。Queue<T>コレクションは、それらを次の3つのメソッドとして実装している。

  • Enqueueメソッド:キューの末尾にデータを追加する
  • Dequeueメソッド:キューの先頭からデータを取り出す
  • Peekメソッド:キューの先頭のデータを読み取る(取り出さない)

 Enqueueメソッドでデータをキューの末尾に追加するとき、キューに空きがなければ自動的にキューのサイズが拡張される。Dequeueメソッドで先頭のデータを取り出すと、残りのデータは先頭に向かって1つずつ移動する。

 この他に、Queue<T>コレクションにはキューに入っているデータの数を返すCountプロパティもある。キューにデータが入っていないときにDequeue/Peekメソッドを呼び出すと例外が出るので、呼び出す前にCountプロパティをチェックすべきである。また、Queue<T>コレクションはIEnumerable<T>インタフェース(System.Collections.Generic名前空間)を実装しているので、LINQ拡張メソッドも利用できる。

 Queue<T>コレクションの基本的な使い方の例を示す(次のコード)。

static void Main(string[] args)
{
  // 整数を格納するキュー
  var que = new Queue<int>();

  // キューに1から3まで順に投入
  for (int i = 1; i < 4; i++)
    que.Enqueue(i);

  // キューから全てを取り出す
  while (que.Count > 0)
  {
    WriteLine($"キューの先頭を見る:{que.Peek()}");
    WriteLine($"キューに入っている個数:{que.Count}");

    int n = que.Dequeue();
    WriteLine($"キューから取り出し:{n}");
    WriteLine($"キューに入っている個数:{que.Count}");
  }
  // 出力:
  // キューの先頭を見る:1
  // キューに入っている個数:3
  // キューから取り出し:1
  // キューに入っている個数:2
  // キューの先頭を見る:2
  // キューに入っている個数:2
  // キューから取り出し:2
  // キューに入っている個数:1
  // キューの先頭を見る:3
  // キューに入っている個数:1
  // キューから取り出し:3
  // キューに入っている個数:0

#if DEBUG
  ReadKey();
#endif
}

Sub Main()
  ' 整数を格納するキュー
  Dim que = New Queue(Of Integer)()

  ' キューに1から3まで順に投入
  For i As Integer = 1 To 3
    que.Enqueue(i)
  Next

  ' キューから全てを取り出す
  While (que.Count > 0)
    WriteLine($"キューの先頭を見る:{que.Peek()}")
    WriteLine($"キューに入っている個数:{que.Count}")

    Dim n As Integer = que.Dequeue()
    WriteLine($"キューから取り出し:{n}")
    WriteLine($"キューに入っている個数:{que.Count}")
  End While
  ' 出力:
  ' キューの先頭を見る:1
  ' キューに入っている個数:3
  ' キューから取り出し:1
  ' キューに入っている個数:2
  ' キューの先頭を見る:2
  ' キューに入っている個数:2
  ' キューから取り出し:2
  ' キューに入っている個数:1
  ' キューの先頭を見る:3
  ' キューに入っている個数:1
  ' キューから取り出し:3
  ' キューに入っている個数:0

#If DEBUG Then
  ReadKey()
#End If
End Sub

Queue<T>コレクションの基本的な使い方の例(上:C#、下:VB)
Enqueueメソッドを使って整数の1/2/3を順にキューに投入している。その後、Dequeueメソッドを使ってキューから1つずつ取り出すと、投入したときと同じ順で1/2/3が出てくる。なお、Peekメソッドは、得られる値はDequeueメソッドと同じだが、Dequeueメソッドとは違ってそのデータはキューに残ったままになる(キューに入っている個数が変化しない)。

 Queue<T>コレクションのIEnumerable<T>インタフェースとLINQ拡張メソッドを利用する例を示す(次のコード)。Queue<T>コレクションには最後の要素を読み取るメソッドは用意されていないが、LINQのLast拡張メソッドを使えばよいのである。

// 整数を格納するキュー
var que = new Queue<int>();
// キューに1から3まで順に投入
for (int i = 1; i < 4; i++)
  que.Enqueue(i);

WriteLine($"IEnumerable<int>:キューの内容:{string.Join(", ", que)}");
// 出力:IEnumerable<int>:キューの内容:1, 2, 3
WriteLine($"LINQ:キューの最後の要素:{que.Last()}");
// 出力:LINQ:キューの最後の要素:3
WriteLine($"LINQ:キューの合計:{que.Sum()}");
// 出力:LINQ:キューの合計:6
WriteLine($"LINQ:奇数の個数:{que.Count(n => (n % 2 != 0))}");
// 出力:LINQ:奇数の個数:2

' 整数を格納するキュー
Dim que = New Queue(Of Integer)()
' キューに1から3まで順に投入
For i As Integer = 1 To 3
  que.Enqueue(i)
Next

WriteLine($"IEnumerable<int>:キューの内容:{String.Join(", ", que)}")
' 出力:IEnumerable<int>:キューの内容:1, 2, 3
WriteLine($"LINQ:キューの最後の要素:{que.Last()}")
' 出力:LINQ:キューの最後の要素:3
WriteLine($"LINQ:キューの合計:{que.Sum()}")
' 出力:LINQ:キューの合計:6
WriteLine($"LINQ:奇数の個数:{que.AsEnumerable().Count(Function(n) n Mod 2 <> 0)}")
' 出力:LINQ:奇数の個数:2

Queue<T>コレクションのIEnumerable<T>インタフェースを利用する例(上:C#、下:VB)
IEnumerable<T>インタフェースを実装しているので、stringクラスのJoinメソッドの引数として使える。もちろんforeach(C#)/For Each(VB)ステートメントを使ってキューの内容を順に読み取ることも可能だ。
LINQ拡張メソッドが使えるので、キューの末尾を読み取ったり、キューの全要素の合計を求めたり、特定の条件を満たす要素の数を求めたりなど、LINQ拡張の多様な機能も利用できる。
なお、VBでは、同じ名前のプロパティと拡張メソッドがある場合には、プロパティしか呼び出せない。この例ではAsEnumerable拡張メソッドを使ってCountプロパティを持たないIEnumerable<T>型に変換することで、Count拡張メソッドを呼び出せるようにしている。

マルチスレッドでQueueを使うには?

 Queue<T>コレクションはスレッドセーフではない。非同期/並列処理でQueue<T>コレクションにアクセスするにはスレッド間の排他処理が必要になる。

 .NET Framework 4.0以降であれば、スレッドセーフなConcurrentQueue<T>コレクションやBlockingCollection<T>コレクション(ともにSystem.Collections.Concurrent名前空間)が用意されているので、Queueコレクションではなくそちらを使うべきである。以下、.NET Framework 4.0未満の場合にQueueコレクションでマルチスレッドに対応する方法を説明しよう。

 キューはマルチスレッドで使われることが多い。有名なものに「Producer−Consumerパターン」(生産者−消費者パターン)がある。Producerが処理すべきデータを作ってどんどんキューに投入していく。一方、時間がかかるデータの処理は、別の複数のスレッドで動作しているConsumerがキューからデータを取り出して行うというものだ。

 Producer−Consumerパターンを実装するには、次のような注意点がある。

  • キューに対するアクセスを排他にする
  • キューのサイズを制限する(制限しないとメモリを使い果たす可能性がある)
  • 生産または消費の速度が間に合わないときは、他方の処理を一時的に止める必要がある

 ここでは、スレッド間の排他にはlock構文を使おう(Monitorクラスを使うコードにコンパイルされる)。また、処理を一時的に止めるにはMonitorクラス(System.Threading名前空間)のWaitメソッド/PulseAllメソッドを使うことにする。Queueコレクションはサイズ固定にはできないが、インスタンス化時にキューのサイズを設定し、以降はそれを超えないようにコーディングしよう。

 Queue<T>コレクションを使ってそのようなProducer−Consumerパターンを実装した例を示す(次のコード)。全体を簡潔に書くためにTPL(タスク並列ライブラリ)を使っているが、Producer/Consumerのそれぞれの処理は.NET Framework 2.0の範囲内で作成してある。

const int MAX_QUE = 3; // キューに保持する最大個数
var que = new Queue<int>(MAX_QUE);
bool runningFlag = true; // falseにした後、キューが空になるとConsumerが止まる

// Producerを定義して実行開始
var produceTask = Task.Run(() => {
  // 1から5まで順にデータを投入していく
  for (int i = 1; i <= 5; i++)
  {
    // データ投入が間に合わない場合をシミュレート
    if (i == 5)
    {
      WriteLine($"stop enqueue");
      System.Threading.Thread.Sleep(1000);
      WriteLine($"restart enqueue");
    }

    // 投入データを作成する処理をシミュレート
    System.Threading.Thread.Sleep(10);

    // キューへデータを投入する
    lock (que) // 排他ロックを取得
    {
      while (que.Count >= MAX_QUE)
      {
        // MAX_QUEを超えないように投入を待機する
        WriteLine($"wait enqueue");
        System.Threading.Monitor.Wait(que); // 待機する(他でMonitor.PulseAllされると再開)
      }

      que.Enqueue(i);
      WriteLine($"Enqueue({i}), count={que.Count}");
      System.Threading.Monitor.PulseAll(que); // 待機中のスレッドがあったら、再開させる
    } // 排他ロックを解放
  }
  runningFlag = false; // データ投入完了を通知
  WriteLine($"end enqueue");
});

// Consumerを定義
Action<string> consume = (header) => {
  while (true)
  {
    int? data = null;
    lock (que) // 排他ロックを取得
    {
      while (que.Count == 0 && runningFlag)
      {
        // キューが空の間は待機
        WriteLine($"{header} wait dequeue");
        System.Threading.Monitor.Wait(que); // 待機する(他でMonitor.PulseAllされると再開)
      }

      // キューからデータを取り出す
      if (que.Count > 0)
      {
        data = que.Dequeue();
        WriteLine($"{header} Dequeue({data.Value}), count={que.Count}");
        System.Threading.Monitor.PulseAll(que); // 待機中のスレッドがあったら、再開させる
      }
      else // キューが空で、runningFlag==falseのとき
        break; // 無限ループを抜ける
    } // 排他ロックを解放
    // データの処理はロックを外してから行う
    if (data.HasValue)
    {
      // キューから取り出したデータを使う処理をシミュレート
      WriteLine($"{header} start process({data.Value})");
      System.Threading.Thread.Sleep(300);
      WriteLine($"{header} end process({data.Value})");
    }
  }
  WriteLine($"{header} EXIT");
};

System.Threading.Thread.Sleep(100);

// Consumerの実行を開始(2スレッド)
var consumeTask1 = Task.Run(() => consume("  [1]"));
var consumeTask2 = Task.Run(() => consume("    [2]"));

// 実行終了を待機
Task.WaitAll(produceTask, consumeTask1, consumeTask2);
WriteLine("COMPLETED");

Const MAX_QUE As Integer = 3 ' キューに保持する最大個数
Dim que = New Queue(Of Integer)(MAX_QUE)
Dim runningFlag As Boolean = True ' falseにした後、キューが空になるとConsumerが止まる

' Producerを定義して実行開始
Dim produceTask = Task.Run(
  Sub()
    ' 1から5まで順にデータを投入していく
    For i As Integer = 1 To 5
      ' データ投入が間に合わない場合をシミュレート
      If (i = 5) Then
        WriteLine($"stop enqueue")
        System.Threading.Thread.Sleep(1000)
        WriteLine($"restart enqueue")
      End If

      ' 投入データを作成する処理をシミュレート
      System.Threading.Thread.Sleep(10)

      ' キューへデータを投入する
      SyncLock que ' 排他ロックを取得
        While (que.Count >= MAX_QUE)
          ' MAX_QUEを超えないように投入を待機する
          WriteLine($"wait enqueue")
          System.Threading.Monitor.Wait(que) ' 待機する(他でMonitor.PulseAllされると再開)
        End While

        que.Enqueue(i)
        WriteLine($"Enqueue({i}), count={que.Count}")
        System.Threading.Monitor.PulseAll(que) ' 待機中のスレッドがあったら、再開させる
      End SyncLock '排他ロックを解放
    Next
    runningFlag = False ' データ投入完了を通知
    WriteLine($"end enqueue")
  End Sub)

' Consumerを定義
Dim consume As Action(Of String) =
  Sub(header)
    While (True)
      Dim data As Integer? = Nothing
      SyncLock que '排他ロックを取得
        While (que.Count = 0 AndAlso runningFlag)
          ' キューが空の間は待機
          WriteLine($"{header} wait dequeue")
          System.Threading.Monitor.Wait(que) ' 待機する(他でMonitor.PulseAllされると再開)
        End While

        ' キューからデータを取り出す
        If (que.Count > 0) Then
          data = que.Dequeue()
          WriteLine($"{header} Dequeue({data.Value}), count={que.Count}")
          System.Threading.Monitor.PulseAll(que) ' 待機中のスレッドがあったら、再開させる
        Else ' キューが空で、runningFlag==falseのとき
          Exit While ' 無限ループを抜ける
        End If
      End SyncLock ' 排他ロックを解放
      ' データの処理はロックを外してから行う
      If (data.HasValue) Then
        ' キューから取り出したデータを使う処理をシミュレート
        WriteLine($"{header} start process({data.Value})")
        System.Threading.Thread.Sleep(300)
        WriteLine($"{header} end process({data.Value})")
      End If
    End While
    WriteLine($"{header} EXIT")
  End Sub

System.Threading.Thread.Sleep(100)

' Consumerの実行を開始(2スレッド)
Dim consumeTask1 = Task.Run(Sub() consume("  [1]"))
Dim consumeTask2 = Task.Run(Sub() consume("    [2]"))

' 実行終了を待機
Task.WaitAll(produceTask, consumeTask1, consumeTask2)
WriteLine("COMPLETED")

Queueコレクションを使ってProducer−Consumerパターンを実装した例(上:C#、下:VB)
本文にも記したが、.NET Framework 4.0以降であればConcurrentQueue<T>コレクションやBlockingCollection<T>コレクションを利用すべきである。
Producer−Consumerパターンでは、キューにアクセスするときの排他ロックの管理だけでなく、キューが空のときや満杯のときに一時的に処理を止める管理も行わねばならない。ちなみに、BlockingCollection<T>コレクションではそういった処理が簡潔に書けるようになっている。

 上のコードの実行例を次の画像に示す。

Producer−Consumerパターンの実行例 Producer−Consumerパターンの実行例
インデントなしの出力がProducer、1段インデントした出力は1つ目のConsumer、そして2段インデントした出力が2つ目のConsumerからのものである。
出力を見ると、データの投入や処理が間に合わないときに他方のスレッドが止まっているのが分かる。このようにして、複数のProducer(データ生成)とConsumer(データ処理)のスレッドが協調しながら可能な限り効率よく処理をしていく。

 ちなみに余談ではあるが、ジェネリック版ではないQueueコレクション(System.Collections名前空間)にあったSynchronizedメソッドは、Queue<T>コレクションでは削除された。Synchronizedメソッドで得られたラッパークラスのオブジェクトといえども、次のコードに示すような場合にはスレッド間の排他ロックが必要なのだが、それを忘れてコーディングしてしまう例が頻発したため、Synchronizedメソッドを廃止したということらしい。

var que = new Queue();
var syncQue = Queue.Synchronized(que); 

……省略……

Object obj = null;
// 次のコードは正しくない
if (syncQue.Count > 0) {
  // ↑↓この間に他スレッドでキューが空にされたら……
  obj = syncQue.Dequeue(); // ←ここで例外が出てしまう
}

// 正しくは次のようにロックしなければならない
lock (syncQue.SyncRoot) { 
  if (syncQue.Count > 0)
    obj = syncQue.Dequeue();
}

ノンジェネリック版Queueコレクションの誤った非同期コーディングの例(C#)
ジェネリック版でSynchronizedメソッドを廃止した経緯はマイクロソフトのBCL Team Blog「Synchronization in Generic Collections [Brian Grunkemeyer]」(2005/3/15)に記述されている。

まとめ

 キュー(待ち行列)は、処理待ちのデータを一時的に蓄えておくのに適したコレクションだ。シングルスレッドの場合は、本稿で解説したQueue<T>クラスを使えばよい。マルチスレッドの場合、.NET Framework 4.0未満ではQueue<T>クラスを使うことになるが、本稿で示したように少々複雑になる(.NET Framework 4.0以降ならばBlockingCollection<T>クラスを使った方がよい)。

利用可能バージョン:.NET Framework 2.0以降
カテゴリ:クラス・ライブラリ 処理対象:コレクション
使用ライブラリ:Queueクラス(System.Collections.Generic名前空間)
関連TIPS:構文:クラス名を書かずに静的メソッドを呼び出すには?[C# 6.0]
関連TIPS:VB.NETでクラス名を省略してメソッドや定数を利用するには?
関連TIPS:数値を右詰めや0埋めで文字列化するには?[C#、VB]
関連TIPS:Visual Studioでコンソール・アプリケーションのデバッグ実行時にコマンド・プロンプトを閉じないようにするには?


「.NET TIPS」のインデックス

.NET TIPS

Copyright© 1999-2018 Digital Advantage Corp. All Rights Reserved.

RSSについて

アイティメディアIDについて

メールマガジン登録

@ITのメールマガジンは、 もちろん、すべて無料です。ぜひメールマガジンをご購読ください。