プロデューサー/コンシューマー パターン

前回及び前々回の「うまく書けない・気持ち悪いことになる」のは、キーボード入力の受け取りと受け取ったデータの処理が密接に結合していていることが原因となっています。そこで、この2つを疎結合とする方法として、プロデューサー/コンシューマー パターンを適用します。

データの受取部と処理部を切り離す方法として真っ先に思いつくのは、先入れ/先出しのキューを利用する方法だと思います。ただ、単純にキューを介してデータを渡す方法では、「キューが空の時は処理部を待機させて、データが入ってきた時には待機を解除させる」ことができません。この問題を解決するプロデューサー/コンシューマー パターンの C# 実装が MSDN のタスク ベースの非同期パターンの利用の(長い説明文の)最後に書かれています。

MSDN の記事には .NET Framework 4.5 では NuGet で System.Threading.Tasks.Dataflow 名前空間の BufferBlock クラスが取得できると書いてありますが、まだまだ Visual Studio 2010 & .NET Framework 4 の利用も多いと思うので、書かれている AsyncProducerConsumerCollection クラスを Visual Studio 2010 & .NET Framework 4 で使えるようにするとともに、キャンセルにも対応させてみます。

実行画面は次のとおりです。
実行画面

プログラムは次のとおりです。
AsyncProducerConsumerCollection クラス

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncProducerConsumerCollection
{
    // see. http://msdn.microsoft.com/ja-jp/library/hh873173.aspx AsyncProducerConsumerCollection
    class AsyncProducerConsumerCollection<T>
    {
        // 渡すデータのキュー
        private readonly Queue<T> _collection;
        // 渡すデータ無かった時に未完了状態で返した TaskCompletionSource オブジェクトのキュー
        private readonly Queue<TaskCompletionSource<T>> _waiting;

        public AsyncProducerConsumerCollection()
        {
            _collection = new Queue<T>();
            _waiting = new Queue<TaskCompletionSource<T>>();
        }

        public void Add(T item)
        {
            TaskCompletionSource<T> tcs = null;
            lock (_collection)
            {
                if (_waiting.Count > 0)
                {
                    tcs = _waiting.Dequeue();
                }
                else
                {
                    _collection.Enqueue(item);
                }
            }
            if (tcs != null)
            {
                // 未完了状態のオブジェクトにセットするとき
                // item のセットともに、タスクの状態を「正常に完了」への遷移を試みる
                tcs.TrySetResult(item);
            }
        }

        public Task<T> Take()
        {
            return Take(default(CancellationToken));
        }

        public Task<T> Take(CancellationToken token)
        {
            if (token.WaitHandle != null)
            {
                token.Register(() => canceled());
            }

            lock (_collection)
            {
                if (_collection.Count > 0)
                {
                    // データのキューが空でなければキューから取り出す
                    //return Task.FromResult(_collection.Dequeue()); // .NET Framework 4.0 には FromResult メソッドがない
                    return Task<T>.Factory.StartNew(() => _collection.Dequeue());
                }
                else
                {
                    // キューが空だったら、未完了状態の TaskCompletionSource から作成されるタスクを返す
                    var tcs = new TaskCompletionSource<T>();
                    _waiting.Enqueue(tcs);
                    return tcs.Task;
                }
            }
        }

        private void canceled()
        {
            lock (_collection)
            {
                // キューに入っているデータをクリア
                _collection.Clear();

                while (_waiting.Count > 0)
                {
                    // 未完了状態の TaskCompletionSource オブジェクトを取り出し
                    var tcs = _waiting.Dequeue();
                    // Canceld 状態への遷移を試みる
                    tcs.TrySetCanceled();
                }
            }
        }
    }
}

Program.cs

using System;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncProducerConsumerCollection
{
    class Program
    {
        private static AsyncProducerConsumerCollection<int> _queue;

        static void Main(string[] args)
        {
            _queue = new AsyncProducerConsumerCollection<int>();

            var cts = new CancellationTokenSource();
            var task = Task.Factory.StartNew(() => Consumer(cts.Token));

            while (true)
            {
                Console.WriteLine("入力してください(終了: \"]\"キー)。");
                var input = Console.ReadLine();

                if (input == "]") break;

                int inputNum;

                if (!int.TryParse(input, out inputNum))
                {
                    Console.WriteLine("数字を入力してください");
                    continue;
                }
                _queue.Add(inputNum);
            }

            Console.WriteLine("Cancel を通知します。");
            cts.Cancel();

            Console.WriteLine("非同期処理の終了を待ちます。");
            task.Wait();

            Console.WriteLine("Enter キー押下で終了します。");
            Console.ReadLine();
        }

        static void Consumer(CancellationToken token)
        {
            try
            {
                while (!token.IsCancellationRequested)
                {
                    var task = _queue.Take(token);
                    var item = task.Result; // Task<TResult>.Result プロパティは完了を待機します
                    var result = "";
                    try
                    {
                        result = doProcess(item, token);
                        Console.WriteLine(result);
                    }
                    catch (OperationCanceledException e)
                    {
                        Console.WriteLine(e.Message);
                    }

                }
            }
            catch (AggregateException ae)
            {
                ae.Handle((ex) =>
                {
                    var result = false;
                    if (ex is OperationCanceledException)
                    {
                        Console.WriteLine("処理データの待機中にキャンセルが要求されました。");
                        result = true;
                    }
                    return result;
                });
            }
        }

        static string doProcess(int item,CancellationToken token)
        {
            Thread.Sleep(2000);
            token.ThrowIfCancellationRequested();
            Thread.Sleep(2000);
            return string.Format("Input number: {0}", item);
        }
    }
}

AsyncProducerConsumerCollection クラスは、処理待ちのデータのキューと未完了状態のタスクのキューを持ちます。

処理スレッドからデータの取り出し要求を受けると、処理待ちのキューにセットされている要素があるかを調べ、要素があるときにはキューから要素を取り出して当該要素をセットした完了タスクを返却し、要素がないときには未完了状態のタスクを作成して未完了状態のタスクのキューに登録するとともに当該タスクを返却します。

データのセット要求を受けると、未完了状態のタスクのキューにセットされている要素があるかを調べ、要素があるときにはキューから要素を取り出して当該要素(未完了状態のタスク)にセット要求のあったアイテムをセットしたうえでタスクの状態を「正常に完了」へ移行させ、要素がないときには処理待ちのキューにセット要求のあったアイテムを登録します。

キャンセル対応は、未完了状態のタスクを取り出してキャンセル状態へ遷移させ、キューに入っている処理待ちデータについては単純にクリアしています。単純クリアでは問題があるときには、何らかの対応が必要です。

Program.cs の Consumer メソッドでは var item = task.Result; で処理するデータを受け取ります。Task.Result プロパティはタスクの完了を待つので、var task = _queue.Take(token); で未完了状態のタスクを受け取った場合には、当該タスクの完了待ち(つまり、データがセットされるまで待機)になります。

次回は、今回の非同期処理のまとめとして、プロデューサー/コンシューマー パターンを適用して同時実行するタスクの数を制限するプログラムです(まぁ、今回のも同時実行数:1 なわけですが)。


非同期処理(インデックス)


コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です