プロデューサー/コンシューマー パターンを利用したサーバー側接続処理

前回プロデューサー/コンシューマー パターンを利用したコンソールアプリケーションを書いてみましたが、今回は以前作成したクライアント・サーバーシステムをプロデューサー/コンシューマー パターンを利用して書きなおしてみます。

クライアント・サーバー型のシステムに適用するということで、次のような方針を取ります。

  • プロデューサ側で接続待ちを行い、接続された TcpClient オブジェクトをキューへ登録する。
  • キャンセル要求が行われた際にキューに未処理のアイテムがある場合、当該アイテムが IDisposable インターフェイスを実装していれば、当該アイテムの Dispose() を行う。
  • キューへの未処理のアイテムの登録数に上限を持つことができるようにする。

2点目は、渡された TcpClinet オブジェクトの Dispose メソッドを呼び出すことで、ソケット接続を閉じることが目的です。
3点目は、クライアント・サーバー型ということで、クライアントからの要求をすべて Accept してキューに入れてしまう(Accept しなければ TcpListener のキューの最大数の制限が有効になり、最大数を超える接続要求は拒否されます)のはサーバー資源の管理上問題があるので、制限できるようにしています。ただし、制限することで、制限値に達した際には動作がブロックされることになります。

プロジェクト・ファイルをダウンロードしたい方は、[MultiThreadAcceptTest2.zip] をダウンロードして下さい。

動作させたときのキューへの登録数を表示する Trace 出力は次のとおりです。
Trace 出力
この画像はサーバー側でキュー登録の上限を2としコンシューマを2つ動作させ、クライアントを5つ立ち上げたときのものです。3つめ、4つめがキューに入り、5つめがキュー登録待ちとなり、1つめの処理が終了して3つめがキューから取り出されると、5つめがキューに登録されています。

それでは、プログラムです。
まず、AsyncProducerConsumerCollection クラスです。

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace MultiThreadAcceptTest2.Server
{
    class AsyncProducerConsumerCollection<T>
    {
        // キューへの投入を待機させるための TaskCompletionSource オブジェクト(TResult の bool 型はダミー)
        private TaskCompletionSource<bool> _queueLimitTask;
        // 渡すデータのキュー
        private readonly Queue<T> _collection;
        // 渡すデータが無かった時に未完了状態で返した TaskCompletionSource オブジェクトのキュー
        private readonly Queue<TaskCompletionSource<T>> _waiting;

        // queueEntriesLimit が 0 のときにはキューへの投入制限を行わない(デフォルト値 0)
        public AsyncProducerConsumerCollection(int queueEntriesLimit = 0)
        {
            _collection = new Queue<T>();
            _waiting = new Queue<TaskCompletionSource<T>>();
            _queueEntriesLimit = queueEntriesLimit;
            _queueLimitTask = null;
        }

        private readonly int _queueEntriesLimit;
        public int QueueEntriesLimit
        {
            get { return _queueEntriesLimit; }
        }

        public void Add(T item)
        {
            TaskCompletionSource<T> tcs = null;
            lock (_collection)
            {
                if (_waiting.Count > 0)
                {
                    tcs = _waiting.Dequeue();
                }
                else
                {
                    _collection.Enqueue(item);
                    System.Diagnostics.Trace.WriteLine(
                        string.Format("{0} データキューの数: {1}",
                        DateTime.Now.ToLongTimeString(), _collection.Count));
                    // キューへの投入制限値に達していた場合、待機用の TaskCompletionSource オブジェクトを生成
                    if (QueueEntriesLimit != 0 &&
                        _collection.Count >= QueueEntriesLimit)
                    {
                        _queueLimitTask = new TaskCompletionSource<bool>();
                    }
                }
            }
            if (tcs != null)
            {
                // 未完了状態のオブジェクトにセットするとき
                // item のセットとともに、タスクの状態を「正常に完了」への遷移を試みる
                tcs.TrySetResult(item);
            }
            // キューへの投入待機用オブジェクトが生成されていた場合には完了まで待機する
            if (_queueLimitTask != null)
            {
                _queueLimitTask.Task.Wait();
                _queueLimitTask = null;
            }
        }

        public Task<T> Take()
        {
            return Take(default(CancellationToken));
        }

        public Task<T> Take(CancellationToken token)
        {
            if (token.WaitHandle != null)
            {
                token.Register(() => canceled());
            }

            lock (_collection)
            {
                if (_collection.Count > 0)
                {
                    // データのキューが空でなければキューから取り出す
                    //return Task.FromResult(_collection.Dequeue()); // .NET Framework 4.0 には FromResult メソッドがない
                    return Task<T>.Factory.StartNew(() =>
                    {
                        var item = _collection.Dequeue();
                        System.Diagnostics.Trace.WriteLine(
                            string.Format("{0} データキューの数: {1}",
                            DateTime.Now.ToLongTimeString(), _collection.Count));
                        // キューへの投入待機用オブジェクトが生成されていた場合には「正常に完了」への遷移を試みる
                        if (_queueLimitTask != null)
                        {
                            _queueLimitTask.TrySetResult(true);
                        }
                        return item;
                    });
                }
                else
                {
                    // キューが空だったら、未完了状態の TaskCompletionSource から作成されるタスクを返す
                    var tcs = new TaskCompletionSource<T>();
                    _waiting.Enqueue(tcs);
                    return tcs.Task;
                }
            }
        }

        private void canceled()
        {
            lock (_collection)
            {
                // キューへの投入待機用オブジェクトが生成されていた場合には「正常に完了」への遷移を試みる
                if (_queueLimitTask != null)
                {
                    _queueLimitTask.TrySetResult(true);
                }

                // キューに入っているデータをクリア
                while (_collection.Count > 0)
                {
                    var item = _collection.Dequeue();
                    if (item as IDisposable != null)
                    {
                        (item as IDisposable).Dispose();
                    }
                }

                while (_waiting.Count > 0)
                {
                    // 未完了状態の TaskCompletionSource オブジェクトを取り出し
                    var tcs = _waiting.Dequeue();
                    // Canceld 状態への遷移を試みる
                    tcs.TrySetCanceled();
                }
            }
        }
    }
}

Add メソッドの中で _queueLimitTask を利用してキューへの投入制限を行なっています。
canceled メソッドの中で(122行目から)キューに残っているアイテムを取り出し、そのアイテムが IDisposable インターフェイスを実装していたら Dispose メソッドを実行しています。

次に、列挙型の ServerError と ServerServiceException クラスです。

using System;

namespace MultiThreadAcceptTest2.Server
{
    public enum ServerError
    {
        IpAddressError = 0, PortOutOfRange, EncodingNotSet, ConnectionClose
    }

    public class ServerServiceException : ApplicationException
    {
        public ServerServiceException(ServerError error, string message) : base(message)
        {
            Error = error;
        }

        public ServerError Error { get; private set; }
    }
}

次に、ServerService クラスです。

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MultiThreadAcceptTest2.Server
{
    class ServerService : IDisposable
    {
        private int _concurrencyLimit;
        private Encoding _encoding;
        private AsyncProducerConsumerCollection<TcpClient> _queue;
        private List<Task> _tasks;
        private TcpListener _listener;
        private bool _disposed = false;
        private const int QueueLimit = 2; // キュー投入の制限数

        // スレッドで例外が発生した場合に発生するイベント
        public event ThreadExceptionEventHandler ThreadExceptionOccurred;

        public ServerService(int concurrencyLimit, Encoding encoding)
        {
            _concurrencyLimit = concurrencyLimit;
            _encoding = encoding;
            _queue = new AsyncProducerConsumerCollection<TcpClient>(QueueLimit);
            _tasks = new List<Task>();
            _listener = null;
        }

        ~ServerService()
        {
            Dispose(false);
        }

        private Action cancel;

        public void StartService(string hostAddress, int portNo)
        {
            try
            {
                var ipAddress = checkHostAddress(hostAddress);
                checkPortNo(portNo);

                var cts = new CancellationTokenSource();
                cancel = () => cts.Cancel();

                // コンシューマを ConcurrencyLimit で指定された数立ち上げる
                for (var i = 0; i < _concurrencyLimit; ++i)
                {
                    _tasks.Add(TaskConsumerAsync(cts.Token));
                }

                if (_listener == null)
                {
                    _tasks.Add(TaskProducerAsync(ipAddress, portNo, cts.Token));
                }
            }
            catch (ServerServiceException e)
            {
                raiseThreadExceptionOccurred(e);
            }
        }

        public void StopService()
        {
            if (cancel != null)
            {
                cancel();
                cancel = null;
            }

            if (_listener != null)
            {
                _listener.Stop();
                _listener = null;
            }
        }

        public void WaitTasksEnd()
        {
            Task.WaitAll(_tasks.ToArray());
        }

        private IPAddress checkHostAddress(string hostAddress)
        {
            if (string.IsNullOrEmpty(hostAddress))
            {
                throw new ServerServiceException(ServerError.IpAddressError,
                    "IP アドレスが設定されていません。");
            }

            var hostName = Dns.GetHostName();
            var addresses = Dns.GetHostAddresses(hostName);
            IPAddress ipAddress;
            if (!IPAddress.TryParse(hostAddress, out ipAddress))
            {
                throw new ServerServiceException(ServerError.IpAddressError,
                    "無効なアドレスが設定されています。");
            }
            if (!IPAddress.IsLoopback(ipAddress) && !addresses.Contains(ipAddress))
            {
                throw new ServerServiceException(ServerError.IpAddressError,
                    "設定されたアドレスは自ホストのアドレスではありません。");
            }

            return ipAddress;
        }

        private void checkPortNo(int portNo)
        {
            if (portNo < 0 || portNo > 65535)
            {
                throw new ServerServiceException(ServerError.PortOutOfRange,
                    "無効なポート番号が設定されています。");
            }
            if (portNo < 1024)
            {
                throw new ServerServiceException(ServerError.PortOutOfRange,
                    "ポート番号に予約ポートが設定されています。");
            }
        }

        private async Task TaskConsumerAsync(CancellationToken token)
        {
            try
            {
                while (!token.IsCancellationRequested)
                {
                    var client = await _queue.Take(token);
                    try
                    {
                        await doProcessAsync(client, token);
                    }
                    catch (IOException e)
                    {
                        if (e.InnerException.GetType() == typeof(SocketException) &&
                            (e.InnerException as SocketException).ErrorCode == 10054)
                        {
                            Console.WriteLine("{0} [コンシューマ] クライアントが切断していたためデータを送信できませんでした。",
                                DateTime.Now.ToLongTimeString());
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        Console.WriteLine("{0} [コンシューマ] 処理中にキャンセルが要求されました。",
                            DateTime.Now.ToLongTimeString());
                    }
                    finally
                    {
                        client.Close();
                    }
                }
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("{0} [コンシューマ] 処理データの待機中にキャンセルが要求されました。",
                    DateTime.Now.ToLongTimeString());
            }
        }

        private async Task doProcessAsync(TcpClient client, CancellationToken token)
        {
            using (var stream = client.GetStream())
            {
                Console.WriteLine("{0} [S] Server ready", DateTime.Now.ToLongTimeString());
                await sendDataAsync(stream, _encoding.GetBytes("Server ready"));

                var rData = await receiveDataAsync(stream);
                Console.WriteLine("{0} [C] {1}", DateTime.Now.ToLongTimeString(), _encoding.GetString(rData));

                token.ThrowIfCancellationRequested();

                // サーバー側処理の代わりに15秒待つ
                //(並行処理の様子の観察に都合が良い)
                await Task.Delay(5000);
                token.ThrowIfCancellationRequested();

                await Task.Delay(5000);
                token.ThrowIfCancellationRequested();

                await Task.Delay(5000);
                token.ThrowIfCancellationRequested();

                Console.WriteLine("{0} [S] {1}", DateTime.Now.ToLongTimeString(), _encoding.GetString(rData));
                await sendDataAsync(stream, rData);
            }
        }

        private async Task TaskProducerAsync(IPAddress ipAddress, int portNo, CancellationToken token)
        {
            try
            {
                _listener = new TcpListener(ipAddress, portNo);
                _listener.Start();
                Console.WriteLine("{0} {1}/Port[{2}]の Listen を開始しました。",
                    DateTime.Now.ToLongTimeString(), ipAddress.ToString(), portNo);

                while (!token.IsCancellationRequested)
                {
                    var client = await _listener.AcceptTcpClientAsync();
                    Console.WriteLine("{0} クライアントが接続しました。",
                        DateTime.Now.ToLongTimeString());
                    // 接続してきたクライアントをキューにセットする
                    _queue.Add(client);

                }
            }
            catch (SocketException e)
            {
                switch (e.ErrorCode)
                {
                    case 10004:
                        Console.WriteLine("{0} Port[{1}]の Listen が強制終了されました。",
                            DateTime.Now.ToLongTimeString(), portNo);
                        break;
                    default:
                        Console.WriteLine("{0} Socket exception: {1}, errCode({2})",
                            DateTime.Now.ToLongTimeString(), e.Message, e.ErrorCode);
                        break;
                }
            }
            catch (ObjectDisposedException)
            {
                Console.WriteLine("{0} Port[{1}]の Listen が破棄されました。",
                    DateTime.Now.ToLongTimeString(), portNo);
            }
            finally
            {
                if (_listener != null)
                {
                    _listener.Stop();
                }
            }
        }

        private async Task<byte[]> receiveDataAsync(NetworkStream stream)
        {
            byte[] result;
            using (var memStream = new MemoryStream())
            {
                var rBuff = new byte[1024];
                int rSize;
                do
                {
                    // データの一部を受信する
                    rSize = await stream.ReadAsync(rBuff, 0, rBuff.Length);
                    // rSize が 0 のときにはクライアントが切断したと判断
                    if (rSize == 0)
                    {
                        Console.WriteLine("クライアントが切断しました。");
                        throw new ServerServiceException(ServerError.ConnectionClose,
                            "ストリームの読み出しの際、クライアントが切断していました。");
                    }
                    // 受信したデータを蓄積する
                    memStream.Write(rBuff, 0, rSize);
                } while (stream.DataAvailable);

                result = memStream.ToArray();
            }
            Console.WriteLine("RecieveData(hex):{0}", bytes2Hex(result));
            return result;
        }

        private async Task sendDataAsync(NetworkStream stream, byte[] message)
        {
            await stream.WriteAsync(message, 0, message.Length);
            Console.WriteLine("SendData(hex):{0}", bytes2Hex(message));
        }

        private static string bytes2Hex(byte[] bytes)
        {
            var val = new StringBuilder();
            foreach (var n in bytes)
            {
                val.Append(string.Format("{0:X2} ", n));
            }
            return val.ToString();
        }

        // ThreadExceptionOccurred イベントを発火させる
        private void raiseThreadExceptionOccurred(Exception e)
        {
            var handler = ThreadExceptionOccurred;
            if (handler != null)
            {
                handler(this, new ThreadExceptionEventArgs(e));
            }
        }

        #region IDisposable

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (_disposed) return;

            _disposed = true;

            if (disposing)
            {
                // マネージ リソースの解放処理
            }
            // アンマネージ リソースの解放処理
            StopService();
        }

        #endregion
    }
}

TaskProducerAsync メソッドの 例外処理で「SocketException」「ObjectDisposedException」の2つをキャッチしていますが、これは接続待ちの際にリスナの Stop メソッドでリスナを停止したときの振る舞いが「AcceptTcpClient」メソッドと「AcceptTcpClientAsync」メソッドで異なっているためです。前者は SocketException(ErrorCode: 10004)を投げてくるのに対して、後者は ObjectDisposedException を投げてきます。

次に、Server クラスです。

using System;
using System.Text;
using System.Threading;

namespace MultiThreadAcceptTest2.Server
{
    class Server
    {
        private const string HostAddress = "127.0.0.1";
        private const int PortNo = 10050;
        private const int ConcurrencyLimit = 2;
        private static readonly Encoding ServerEncoding = Encoding.UTF8;

        static void Main(string[] args)
        {
            var service = new ServerService(ConcurrencyLimit, ServerEncoding);
            service.ThreadExceptionOccurred += onException;
            service.StartService(HostAddress, PortNo);

            Console.WriteLine("Enter キー押下で Listen を終了します。");
            Console.ReadLine();
            service.StopService();

            Console.WriteLine("タスクの終了を待ちます。");
            service.WaitTasksEnd(); ;

            Console.WriteLine("Enter キー押下で終了します。");
            Console.ReadLine();
        }

        static void onException(object sender, ThreadExceptionEventArgs e)
        {
            Console.WriteLine("例外が発生しました。: {0}, {1}",
                e.Exception.GetType().Name, e.Exception.Message);
        }
    }
}

サーバー側のプログラムは以上です。
長くなったので、クライアント側は掲載しません(以前のものとほぼ同じですし)。プログラムを動かしてみたい方は、先頭付近の Trace 出力画像の上にあるリンクから zip ファイルをダウンロードして下さい。動かす方法は、サーバー側をデバッグ実行(あるいは「デバッグ無しで開始」)して、クライアント側はコンパイル後の実行ファイルをエクスプローラーから起動します。

次回は、IPv4 と IPv6 で接続待ち行うプログラムにプロデューサー/コンシューマー パターンを利用したものを作ってみます。


非同期処理(インデックス)


1 thought on “プロデューサー/コンシューマー パターンを利用したサーバー側接続処理

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です