非同期処理(その5)

前回のプロデューサー/コンシューマー パターンを利用して、同時実行するタスクの数を制限できるプログラムを考えてみます。

実行時の画面は次のとおりです。
実行画面
この画面では、コンシューマを3つ動かしていて、時刻の次の ProcessId: で 0 から 2 まで表示されています。4つめの "ddd" の表示は、コンシューマが全て塞がっている -> キューに置かれる -> "aaa" を処理していたコンシューマが空いてキューから取り出される -> "ddd" が処理される という手順を経ることにになり、コンシューマが実行する処理に5秒かかるようにしてあるので、最初の "aaa" の表示の5秒後となっています。

プログラムは次のとおりです。
まずは AsyncProducerConsumerCollection<T> クラスです(特に変更はありません)。

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace ManagedTasksTest5
{
    class AsyncProducerConsumerCollection<T>
    {
        // 渡すデータのキュー
        private readonly Queue<T> _collection;
        // 渡すデータ無かった時に未完了状態で返された TaskCompletionSource オブジェクトのキュー
        private readonly Queue<TaskCompletionSource<T>> _waiting;

        public AsyncProducerConsumerCollection()
        {
            _collection = new Queue<T>();
            _waiting = new Queue<TaskCompletionSource<T>>();
        }

        public void Add(T item)
        {
            TaskCompletionSource<T> tcs = null;
            lock (_collection)
            {
                if (_waiting.Count > 0)
                {
                    tcs = _waiting.Dequeue();
                }
                else
                {
                    _collection.Enqueue(item);
                }
            }
            if (tcs != null)
            {
                // 未完了状態のオブジェクトにセットするとき
                // item のセットともに、タスクの状態を「正常に完了」への遷移を試みる
                tcs.TrySetResult(item);
            }
        }

        public Task<T> Take()
        {
            return Take(default(CancellationToken));
        }

        public Task<T> Take(CancellationToken token)
        {
            if (token.WaitHandle != null)
            {
                token.Register(() => canceled());
            }

            lock (_collection)
            {
                if (_collection.Count > 0)
                {
                    // データのキューが空でなければキューから取り出す
                    return Task.FromResult(_collection.Dequeue());
                }
                else
                {
                    // キューが空だったら、未完了状態の TaskCompletionSource から作成されるタスクを返す
                    var tcs = new TaskCompletionSource<T>();
                    _waiting.Enqueue(tcs);
                    return tcs.Task;
                }
            }
        }

        private void canceled()
        {
            lock (_collection)
            {
                // キューに入っているデータをクリア
                _collection.Clear();

                while (_waiting.Count > 0)
                {
                    // 未完了状態の TaskCompletionSource オブジェクトを取り出し
                    var tcs = _waiting.Dequeue();
                    // Canceld 状態への遷移を試みる
                    tcs.TrySetCanceled();
                }
            }
        }
    }
}

次に Program.cs

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace ManagedTasksTest5
{
    class Program
    {
        private const int ConcurrencyLimit = 3;
        private static AsyncProducerConsumerCollection<string> _queue;

        static void Main(string[] args)
        {
            _queue = new AsyncProducerConsumerCollection<string>();

            var cts = new CancellationTokenSource();
            // コンシューマを ConcurrencyLimit で指定された数立ち上げる
            var taskList = new List<Task>();
            for (var i = 0; i < ConcurrencyLimit; ++i)
            {
                taskList.Add(TaskConsumerAsync(i, cts.Token));
            }

            while (true)
            {
                Console.WriteLine("入力してください(終了: \"]\"キー)。");
                var input = Console.ReadLine();

                if (input == "]") break;

                _queue.Add(input);
            }

            Console.WriteLine("Cancel を通知します。");
            cts.Cancel();

            Console.WriteLine("非同期処理の終了を待ちます。");
            Task.WaitAll(taskList.ToArray());

            Console.WriteLine("Enter キー押下で終了します。");
            Console.ReadLine();
        }

        static async Task TaskConsumerAsync(int id, CancellationToken token)
        {
            try
            {
                while (!token.IsCancellationRequested)
                {
                    var task = _queue.Take(token);
                    var item = await task;
                    var taskDoProcess = DoSomethingAsync(item, id, token);
                    var result = "";
                    try
                    {
                        result = await taskDoProcess;
                    }
                    catch (OperationCanceledException e)
                    {
                        Console.WriteLine(e.Message);
                    }

                    if (taskDoProcess.Status == TaskStatus.RanToCompletion)
                    {
                        var message = string.Format("{0:T} {1}", DateTime.Now, result);
                        Console.WriteLine(message);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("処理データの待機中にキャンセルが要求されました。");
            }
        }

        static async Task<string> DoSomethingAsync(string s, int id, CancellationToken token)
        {
            await Task.Delay(2500);
            if (token.IsCancellationRequested)
            {
                var message = string.Format("キャンセルしました: 処理中のデータ({0})", s);
                throw new OperationCanceledException(message);
            }
            await Task.Delay(2500);
            return string.Format("ProcessId: {0}, Input: {1}", id, s);
        }
    }
}

コンシューマを複数立ち上げて管理するために、19行目で List<Task> 型の taskList を作り、20行目からの for ループでコンシューマを ConcurrencyLimit で指定する数作成しています。

次回は、以前作ったクライアント・サーバー型のスケルトン的なものをプロデューサー/コンシューマー パターンを利用して書きなおしてみます。


非同期処理(インデックス)


非同期処理(その5)」への1件のフィードバック

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です