非同期処理(その4)

今回は、同時実行するタスクの数を制限してみます。基にするコードは前々回の ContinueWith を用いたものにします。
実行画面は次のとおりで、これは同時実行数を2に制限していますが、3件目のデータを入力したところで、コンソールからの入力処理がブロックされています。
実行画面

プログラムは次のとおりです。

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// 同時実行数を制限する
// 起動しているタスクが同実行数に達すると、UI スレッドがブロックされてしまう。
// もうひと工夫必要

// 別スレッドで走る doSomething メソッド中で例外をキャッチせず、ジョインされたスレッド側で
// キャッチを行なっているため、デバッグ実行を行うと例外の発生で一時停止します。その場合は
// [継続] ボタンをクリックして続行させてください。「デバッグ無しで開始」させると、途中での停止は
// 発生しません。

namespace ManagedTasksTest4
{
    class Program
    {
        private const int ConcurrencyLimit = 2;

        static void Main(string[] args)
        {
            var tasks = new List<Task>();
            var syncObject = new object();
            var cts = new CancellationTokenSource();

            while (true)
            {
                Console.WriteLine("入力してください(終了: \"]\"キー)。");
                var input = Console.ReadLine();

                if (input == "]") break;

                if (tasks.Count >= ConcurrencyLimit)
                {
                    Task.WaitAny(tasks.ToArray());
                }

                var task = new Task<string>(() => doSomething(input, cts.Token));
                task.ContinueWith((s) =>
                {
                    // task は正常終了したか?
                    if (task.Status == TaskStatus.RanToCompletion)
                    {
                        Console.WriteLine(s.Result);
                    }
                    else
                    {
                        Console.WriteLine("非同期処理が正常終了しませんでした。");
                    }
                    lock (syncObject)
                    {
                        tasks.Remove(task);
                    }
                });
                lock (syncObject)
                {
                    tasks.Add(task);
                }
                task.Start();
            }

            // キャンセル要求を伝える
            cts.Cancel();

            Console.WriteLine("非同期処理の終了を待ちます。");
            Task[] taskArray;
            lock (syncObject)
            {
                taskArray = tasks.ToArray();
            }
            try
            {
                Task.WaitAll(taskArray);
            }
            // ジョインされたタスクで発生した例外をキャッチ
            catch (AggregateException ae)
            {
                ae.Handle((ex) =>
                {
                    var result = false;

                    // 例外が OperationCanceledException だったら例外を除去する(ほかの例外は再スローされる)
                    if (ex is OperationCanceledException)
                    {
                        Console.WriteLine(ex.Message);
                        result = true;
                    }
                    return result;
                });
            }

            Console.WriteLine("Enter キー押下で終了します。");
            Console.ReadLine();
        }

        static string doSomething(string s, CancellationToken token)
        {
            Thread.Sleep(2500);
            if (token.IsCancellationRequested)
            {
                var message = string.Format("キャンセルしました: 処理中のデータ({0})", s);
                throw new OperationCanceledException(message);
            }
            Thread.Sleep(2500);

            return string.Format("retrun: {0}", s);
        }
    }
}

同時実行数を制限するために、Main メソッドの while ループ中で実行数が制限値に達していたら WaitAny で終了を待つようにしていますが、この終了待ちでコンソール入力のスレッドがブロックされてしまうわけです。これを解決するにはコンソール入力のスレッドとデータ処理のスレッドを疎結合にする必要があります。

次回は、疎結合にする方法についてです。


非同期処理(インデックス)


コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です