サーバー側接続待ちの別スレッド化の試作(サーバー)

クライアント・サーバー型システムの試作(サーバー)」の投稿でマルチスレッド化に触れたので、ちょっとサーバー接続部分のマルチスレッド化のコードを書いてみました。

サーバーは、クライアントが接続してくると応答を返し、その後クライアントから送られてきた文字列を受信の15秒後にクライアントへ送り返すだけとします(15秒待つのは並行処理の様子を観察するため)。

— 2012/01/26 22:30 追記
メインスレッドが子タスクより先に終了した場合、子タスクの後始末が行われないことを確認したため、端折っていた子タスク管理と子タスクへのキャンセル通知をきちんと行うように修正しました(それに伴いコンソールの画像も差し替えています)。

IPv4 と IPv6 双方で接続待ちを行うプログラムも書いてみました。興味ある方はご覧下さい。

サーバー側のマルチスレッド化をプロデューサー/コンシューマー パターンを利用して書きなおしたものを投稿したので、興味のある方はどうぞ[2013/04/04]。

プロジェクト・ファイルをダウンロードしたい方は、[MultiThreadAcceptTest.zip] をダウンロードして下さい(ダウンロードページを作るのがめんどくさいので、ここからダウンロードするようにしました)。

ちなみに、並行処理の最大数を3にして、4つのクライアントから要求を出したときのコンソールへの表示内容は次のようになります。

サーバーの表示
server

クライアント1の表示
client1

クライアント2の表示
client2

クライアント3の表示
client3

クライアント4の表示
client4

4つめの接続要求がキューに入っている状態で Enter キーを押下して Listen を終了させたときのコンソールへの表示内容は次のようになります。
server cancel

まずは Main メソッドから。
ServerService クラスの ServiceStart メソッドを使用することでクライアントへのサービスを開始し、ServiceEnd メソッドを使用することでクライアントへのサービスを終了します。

//
        static void Main(string[] args)
        {
            // 文字コードを UTF-8 とする
            var enc = System.Text.Encoding.UTF8;

            // ローカルアドレスで listen を開始する
            var host = "localhost";
            var port = 2001;

            var service = new ServerService()
            {
                HostName = host,
                PortNo = port,
                Encoding = enc
            };

            service.ServiceStart();

            Console.WriteLine("Enter キー押下で Listen を終了します。");
            Console.ReadLine();

            service.ServiceEnd();
            Console.WriteLine("{0} Listen スレッドの 終了を検知しました。Enter キー押下で終了します。",
                DateTime.Now.ToString());
            Console.ReadLine();
        }

で、その ServerService クラスです。
プロパティ等やコンストラクタ、ServiceStart メソッド、ServiceEnd メソッドです。
ServiceStart メソッドで、Listen 等を行う startListen メソッドを別スレッドで開始します。
ServiceEnd メソッドで、Action デリゲート型の notifyCancel にセットされたキャンセル通知をキックし、続けて StartListen メソッド終了の契機となる endListen メソッドを呼び出してから、StartListen メソッドの終了待ちを行なっています。

//
        public string HostName { get; set; }
        public int PortNo { get; set; }
        public System.Text.Encoding Encoding { get; set; }

        private static System.Threading.Semaphore pool;
        private System.Threading.Tasks.Task taskListen;
        private bool isCreatedListner = false;
        private System.Net.Sockets.TcpListener listner = null;
        private bool isServiceEnd = false;
        private Action notifyCancel;

        public ServerService()
        {
        }

        public void ServiceStart()
        {
            taskListen = System.Threading.Tasks.Task.Factory.StartNew(() =>
            {
                startListen();
            });
        }

        public void ServiceEnd()
        {
            isServiceEnd = true;
            if (notifyCancel != null)
                notifyCancel();
            endListen();
            taskListen.Wait();
        }

次は startListen メソッドです。マルチスレッド化の肝の部分になります。
LIMIT_CONCURRENCY に並行処理する最大スレッド数をセットしています。そして、セマフォを利用することで、並行処理するスレッドの数をコントロールしています。
クライアントが接続してきたら、当該クライアントとの間の処理を行う doProcess メソッドを別スレッドで開始します。
メソッドが終了する前に、すべての子タスクの終了待ちを行なっています。

//
        private void startListen()
        {
            const int LIMIT_CONCURRENCY = 3;

            var lockObj = new object();
            var taskList = new List<System.Threading.Tasks.Task>();
            var tokenSource = new System.Threading.CancellationTokenSource();
            var token = tokenSource.Token;
            notifyCancel = new Action(() => tokenSource.Cancel());

            try
            {
                var ipAddress = System.Net.Dns.GetHostAddresses(HostName).First();

                listner = new System.Net.Sockets.TcpListener(ipAddress, PortNo);
                isCreatedListner = true;
                listner.Start();
                Console.WriteLine("{0} Port[{1}]の Listen を開始しました。",
                    DateTime.Now.ToString(), PortNo);

                // セマフォを生成
                pool = new System.Threading.Semaphore(LIMIT_CONCURRENCY, LIMIT_CONCURRENCY);

                while (true)
                {
                    pool.WaitOne();     // セマフォ待機

                    if (isServiceEnd)     // 終了時にセマフォ待機が解けて進入してきた場合への対応
                        break;

                    // 接続要求の受け入れ
                    var client = listner.AcceptTcpClient();     // 接続要求が来るまでブロック
                    Console.WriteLine("{0} クライアントが接続しました。",
                        DateTime.Now.ToString());

                    // 別スレッドで doProcess() を動かすようにセット
                    var task = new System.Threading.Tasks.Task(() => doProcess(client, token), token);
                    lock (lockObj)
                    {
                        taskList.Add(task);
                        Console.WriteLine("task数: {0}", taskList.Count);
                    }
                    // スレッド終了時に TcpClient のクローズとセマフォの開放を行うようにセット
                    task.ContinueWith((t) =>
                    {
                        client.Close();
                        Console.WriteLine("{0} 切断しました。", DateTime.Now.ToString());

                        if (!isServiceEnd)
                            pool.Release();
                        lock (lockObj)
                        {
                            taskList.Remove(task);
                            Console.WriteLine("task数: {0}", taskList.Count);
                        }
                    });
                    // スレッド開始
                    task.Start();
                }
            }
            catch (System.Net.Sockets.SocketException e)
            {
                switch (e.ErrorCode)
                {
                    case 10004:
                        Console.WriteLine("{0} Port[{1}]の Listen が強制終了されました。",
                            DateTime.Now.ToString(), PortNo);
                        break;
                    default:
                        Console.WriteLine("{0} Socket exception: {1}, errCode({2})",
                            DateTime.Now.ToString(), e.Message, e.ErrorCode);
                        break;
                }
            }
            finally
            {
                endListen();
                if (pool != null)
                    pool.Close();
                if (tokenSource != null)
                    tokenSource.Dispose();
            }

            System.Threading.Tasks.Task[] tasks;
            lock (lockObj)
            {
                tasks = taskList.ToArray();
            }
            System.Threading.Tasks.Task.WaitAll(tasks);
        }

次は endListen メソッドです。

//
        private void endListen()
        {
            if (isCreatedListner)
            {
                Console.WriteLine("{0} Port[{1}]の Listen を終了します。",
                    DateTime.Now.ToString(), PortNo);
                listner.Stop();
                if (pool != null)    // ループ開始前に Exception が発生したときにはセマフォ操作を行わない
                    pool.Release();
                isCreatedListner = false;
            }
        }

次は doProcess メソッドです。
クライアントからデータを受信した後の15秒の待ちは、5秒ごとに3分割してキャンセルが通知されているか確認を行うようにしています。キャンセルが通知されていたら、OperationCanceledException を投げて処理を中断します。

//
        private void doProcess(System.Net.Sockets.TcpClient client, System.Threading.CancellationToken token)
        {
            // ネットワークストリームを取得
            System.Net.Sockets.NetworkStream stream = null;
            try
            {
                stream = client.GetStream();
                sendData(stream, "Server ready");

                var rString = receiveData(stream);

                token.ThrowIfCancellationRequested();   // キャンセル通知の確認 & キャンセル実行

                // サーバー側処理の代わりに15秒待つ
                //(並行処理の様子の観察に都合が良い)
                System.Threading.Thread.Sleep(5000);
                token.ThrowIfCancellationRequested();   // キャンセル通知の確認 & キャンセル実行

                System.Threading.Thread.Sleep(5000);
                token.ThrowIfCancellationRequested();   // キャンセル通知の確認 & キャンセル実行

                System.Threading.Thread.Sleep(5000);
                token.ThrowIfCancellationRequested();   // キャンセル通知の確認 & キャンセル実行

                sendData(stream, rString);
            }
            catch (System.IO.IOException e)
            {
                if (e.InnerException.GetType() == typeof(System.Net.Sockets.SocketException) &&
                    (e.InnerException as System.Net.Sockets.SocketException).ErrorCode == 10054)
                {
                    Console.WriteLine("{0} クライアントが切断していたためデータを送信できませんでした。",
                        DateTime.Now.ToString());
                }
            }
            catch (OperationCanceledException)
            {   // キャンセル実施をキャッチ
                Console.WriteLine("{0} 子タスクをキャンセルしました。", DateTime.Now.ToString());
            }
            finally
            {
                if (stream != null)
                    stream.Close();
            }
        }

次は receiveData メソッドと sendData メソッドです。

//
        private string receiveData(System.IO.Stream stream)
        {
            var receiveMessage = "";
            using (var memStream = new System.IO.MemoryStream())
            {
                var rBuff = new byte[256];
                int rSize;
                do 
                {
                    // データの一部を受信する
                    rSize = stream.Read(rBuff, 0, rBuff.Length);
                    // rSize が 0 のときにはクライアントが切断したと判断
                    if (rSize == 0)
                    {
                        Console.WriteLine("クライアントが切断しました。");
                        throw new ConnectionCloseException("ストリームの読み出しの際、クライアントが切断していました。");
                    }
                    // 受信したデータを蓄積する
                    memStream.Write(rBuff, 0, rSize);
                } while ((stream as System.Net.Sockets.NetworkStream).DataAvailable);

                // 受信したデータを文字列に変換
                receiveMessage = Encoding.GetString(memStream.ToArray());
                memStream.Close();
            }

            Console.WriteLine("{0} [C] {1}", DateTime.Now.ToString(), receiveMessage);
            return receiveMessage;
        }

        private void sendData(System.IO.Stream stream, string str)
        {
            // 文字列をバイト配列へ
            var data = Encoding.GetBytes(str);
            stream.Write(data, 0, data.Length);
            Console.WriteLine("{0} [S] {1}", DateTime.Now.ToString(), str);
        }

そして、ConnectionCloseException クラスです。

//
    public class ConnectionCloseException : ApplicationException
    {
        public ConnectionCloseException(string message) : base(message) { }
    }

長くなったので、クライアント側を見たい方はプロジェクトファイルをダウンロードして見てください 😉


サーバー側接続待ちの別スレッド化の試作(サーバー)」への1件のフィードバック

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です