send 함수를 멀티스레드로 보내는데 우선 queue 에 담아 놓고

현재 보낼 수 있으면 큐에 있는 내용을 보내고 보낼수 없는 상황이라면 큐에 보낼 내용을 담아 놓기만한다

 

보낼수 있는 상황과 보낼수 없는 상황은

우선 한번 SendAsync 를 할때 동시에 할수는 없고 한번에 하나만 보낼수 있음으로 SendAsync 가 완료 될때까지 다른 스레드가 SendAsync 를 하지 못하게 데이터만 큐에 쌓아 놓게 한뒤 SendAsync 처리가 완료되는 OnSendCompleted 가 호출 되면 SendAsync  처리를 마무리 한다

 

그런데 OnSendCompleted 가 호출 될때 SendAsync 의 한번의 루틴이 끝나기 전에 데이터가 쌓인게 있다면

다시 한번 RegisterSend 를 호출하여 기존에 쌓여 있는것을 다시 send 해주는데

이렇게 하는 이유는 SocketAsyncEventArg 를 여러개 생성하지 않고 재사용하기 위해 이런 처리를 하는건데

이유는 좀금이라도 send 할때의 비용을 아끼기 위해서이다 (유저가 많아질 수록 이 부하도 점점 쌓이기 때문)

 

 

session 코드

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ServerCore
{
    class Session
    {
        Socket _socket;
        int _disconnected = 0;

        SocketAsyncEventArgs _sendArgs = new SocketAsyncEventArgs();
        Queue<byte[]> _sendQueue = new Queue<byte[]>();
        bool _pending = false;
        object _lock = new object();

        public void Start(Socket socket)
        {
            _socket = socket;

            SocketAsyncEventArgs recvArgs = new SocketAsyncEventArgs();
            recvArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnRecvCompleted);


            //receive 할때 데이터를 받는 버퍼를 만들어준다
            recvArgs.SetBuffer(new byte[1024], 0, 1024);        //버퍼를 크게 만들어서 인덱스를 지정하여 분리하여 받아 들일 수도 있다

            //초기에 한번 receive 되도록 등록해 준다
            RegisterRecv(recvArgs);

            _sendArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnSendCompleted);

        }

        
        public void Send(byte[] sendBuff)
        {
            lock(_lock)
            {
                _sendQueue.Enqueue(sendBuff);
                if (_pending == false)
                {
                    RegisterSend();
                }
            }
           
            
        }


        void RegisterSend()
        {
            _pending = true;
            byte[] buff = _sendQueue.Dequeue();
            
            _sendArgs.SetBuffer(buff, 0, buff.Length);
            bool pending = _socket.SendAsync(_sendArgs);
            if (pending == false)
                OnSendCompleted(null, _sendArgs);
            
        }

        //다른 소켓에 의해 OnSendCompleted 호출 될 수 있음으로 lock 을 걸어준다
        private void OnSendCompleted(object value, SocketAsyncEventArgs args)
        {

            lock (_lock)
            {
                if (args.BytesTransferred > 0 && args.SocketError == SocketError.Success)
                {
                    try
                    {
                        if(_sendQueue.Count > 0)
                        {
                            RegisterSend();     //send를 보내는 처리 과정 사이에 다른 스레드에 의해서 send 가 보내진게 있다면 다시 send 를 보내도록 한다
                        }
                        else
                        {
                            _pending = false;
                        }
                        
                    }
                    catch (System.Exception ex)
                    {
                        Console.WriteLine($"OnSendCompleted Failed {ex}");
                    }
                }
                else
                {
                    DisConnect();
                }
            }

        }

        void RegisterRecv(SocketAsyncEventArgs args)
        {
            bool pending = _socket.ReceiveAsync(args);

            //pending 이  false 인 경우엔 즉 기다리는것이 없이 바로 처리가 될 경우에는
            //OnRecvCompleted를 직접 호출해 줘야 한다, 그 외는 ReceiveAsync 내부에서 나중에 알아서 OnRecvCompleted 를 호출한다
            if (pending == false)       
            {
                OnRecvCompleted(null, args);
            }
        }


        private void OnRecvCompleted(object sender, SocketAsyncEventArgs args)
        {

            //받은 바이트가 0 바이트 이상이고
            if(args.BytesTransferred > 0 &&  args.SocketError == SocketError.Success)
            {
                try
                {
                    //args.Buffer : recvArgs.SetBuffer 에서 설정한 바이트다
                    //args.BytesTransferred : 몇바이트를 받았는지 바이트 수
                    string recvData = Encoding.UTF8.GetString(args.Buffer, args.Offset, args.BytesTransferred);
                    Console.WriteLine($"[From client] {recvData}");
                    RegisterRecv(args);
                }
                catch (Exception e)
                {
                    Console.WriteLine($"OnRecevCompleted Failed {e}");
                }
            }
            else
            {
                DisConnect();
            }
        }

        public void DisConnect()
        {
            //멀티 스레드에서 동시에 disconnect를 처리 할 수 있기 때문에 Interlocked를 사용하여 처리한다
            //Exchange는 _disconnected 값을 1로 바꾼다, 그리고 오리지널 값을 반환한다
            if (Interlocked.Exchange(ref _disconnected, 1) == 1)
            {
                //즉 리턴 값이 1 이라는 얘기는 이전에 한번 disconnect 가 됐었다는 얘기 임으로
                //다시 한번 더 disconnect 를 하려고 하면 리턴처리한다 = > 중복 disconnect 방지
                return;
            }
            _socket.Shutdown(SocketShutdown.Both);
            _socket.Close();
        }
    }
}

 

 

 

서버 시작 부분

using System;
using System.Net.Sockets;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using System.Text;

namespace ServerCore
{
    class Program
    {
        static Listener _listener = new Listener();

        //클라이언트로부터 접속이 와서 Accept 되었을때 호출 되는 함수
        static void OnAcceptHandler(Socket clientSocket)
        {
            try
            {
                
                Session session = new Session();
                session.Start(clientSocket);        //안에 receive 가 비동기로 처리됨

                //클라이언트로 보내는 처리
                byte[] sendBuff = Encoding.UTF8.GetBytes("Welcome to server!");
                session.Send(sendBuff);
                
                Thread.Sleep(1000);
                session.DisConnect();
                //session.DisConnect();     //이렇게 두번 처리 해도 멀티 스레드에 안전하게 처리 했음으로 문제가 되지 않는다

            }
            catch (System.Exception ex)
            {
                Console.WriteLine(ex);
            }
        }

        static void Main(string[] args)
        {
            //DNS : Domain Name System
            // 도메인을 하나 등록해서 해당하는 IP 를 찾아오면 관리가 쉬워짐
            //www.google.com => 

            string host = Dns.GetHostName();
            //host = "google.com";            //ipHost.AddressList[0] == {172.217.161.238}
            IPHostEntry ipHost = Dns.GetHostEntry(host);
            //이렇게 GetHostEntry 로 주소를 얻어오는 건 DNS 서버를 통해서 얻어 올 수 있게 됨

            // ipHost.addressList[0] = IPAddress.Parse("        경우에 따라서 ip 주소는 여러개 일 수도 있다 부하 분산을 위해서 addressList
            IPAddress ipAddr = ipHost.AddressList[0];
            IPEndPoint endPoint = new IPEndPoint(ipAddr, 7777);     //최종 주소

            
            try
            {

                _listener.Init(endPoint, OnAcceptHandler);
                

                while (true)
                {
                }
            }
            catch(Exception e)
            {
                Console.WriteLine(e);
            }
        }
    }
}

 

 

리슨 부분 ( 이전에 있던 Listen 과 유사하다)

using System;
using System.Collections.Generic;
using System.Text;
using System.Net.Sockets;
using System.Net;

namespace ServerCore
{
    class Listener
    {
        Socket _listenSocket;

        Action<Socket> _onAcceptHandler;
        
        public void Init(IPEndPoint iPEndPoint, Action<Socket> onAcceptHandler)
        {
            //AddressFamily ip version 4,6 에 대한 것 , 위에서 자동으로 만들어줌, 
            //tcp 로 할 경우 stream, tcp 로 설정해준다
            //리슨 하는 자체가 소켓을 하나 만들어야 한다
            _listenSocket = new Socket(iPEndPoint.AddressFamily , SocketType.Stream, ProtocolType.Tcp);
            _onAcceptHandler += onAcceptHandler;

            _listenSocket.Bind(iPEndPoint);        //소켓에 ip 와 포트 할당


            //최대 동시 대기 수, 동시에 들어올대 10명까지만 처리 가능하고 그 위로는 실패가 된다
            //Listen() 메서드는 동시에 여러 클라이언트가 접속되었을 때 큐에 몇 개의 클라이언트가
            //대기할 수 있는지 지정할 수 있는데, 위의 경우는 예시를 위해 10을 넣었다.
            _listenSocket.Listen(10);

            //이건 한번 사용하고 재사용이 가능하다
            SocketAsyncEventArgs args = new SocketAsyncEventArgs();
            args.Completed += new EventHandler<SocketAsyncEventArgs>(OnAcceptCompleted);

            //최초 한번은 등록해준다
            RegisterAccept(args);
            
        }

        void RegisterAccept(SocketAsyncEventArgs args)
        {
            //재사용이 됨으로 null 로 처리한다
            args.AcceptSocket = null;

            //SocketAsyncEventArgs 가 하나일때
            //AcceptAsync 는 대기 하고 있는 클라 접속중 하나에 대해서만 receive 처리를 하고 동시에 두개를 하진 않는다,
            //하나하고 그다음 하나 step by step
            bool pending = _listenSocket.AcceptAsync(args);     //비동기 임으로 예약만 하고 넘어간다, accdept 완료는 eventHandler 를 통해서 완료된다
            if (pending == false)   //false 면 pending 없이 바로 완료 됐다는 얘기임
                OnAcceptCompleted(null, args);
        }

        void OnAcceptCompleted(object sender, SocketAsyncEventArgs args)
        {
            if(args.SocketError == SocketError.Success)
            {
                //accept 되어 새로 생성된 소켓을 
                _onAcceptHandler.Invoke(args.AcceptSocket); //넘겨준다
            }
            else
            {
                Console.WriteLine(args.SocketError.ToString());
            }

            //위에 까지 처리가 된것은 accept 가 완료 된것임으로 새로운 accept 를 위해서
            //RegisterAccept 를 다시 호출하여 OnAcceptCompleted 이벤트를 받아 들을 수 있는 상태로 만든다 
            RegisterAccept(args);
        }

        public Socket Accept()
        {

            //return _listenSocket.Accept();    
            //클라의 접속이 있다면 받아오는 처리, 접속이 있을때까지 계속 대기, 즉 다음으로 넘어가지 않는다
            //클라로부터 접속이 왔다면 accept 되어 클라와 별도 통실한 socket 이 생성되어 리턴된다
            //return _listenSocket.Accept();    

            //async 는 비동기로 처리 된다
            //return _listenSocket.AcceptAsync()
            return null;
        }
    }
}

 

 

 

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;

namespace DummyClient
{
    class Program
    {
        static void Main(string[] args)
        {
            string host = Dns.GetHostName();
            IPHostEntry ipHost = Dns.GetHostEntry(host);
            IPAddress ipAddr = ipHost.AddressList[0];
            IPEndPoint endPoint = new IPEndPoint(ipAddr, 7777);     
            
            while (true)
            {
                try
                {
                    Socket socket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);

                    //소켓으로 서버에 연결한다 , 서버 입장에선 accept 가 된다
                    socket.Connect(endPoint);
                    Console.WriteLine($"connected to {socket.RemoteEndPoint.ToString()}");


                    //서버로 보낸다
                    for(int i=0;i< 5; ++i)
                    {
                        byte[] sendBuff = Encoding.UTF8.GetBytes($"Hello world! {i}  ");
                        int bytesSent = socket.Send(sendBuff);
                    }

                    //서버에서 받는다
                    byte[] recvBuff = new byte[1024];
                    int recvBytes = socket.Receive(recvBuff);
                    string recvData = Encoding.UTF8.GetString(recvBuff, 0, recvBytes);
                    Console.WriteLine($"received : {recvData}");

                    //서버와 통신을 위해 생성했던 소켓 종료 처리
                    socket.Shutdown(SocketShutdown.Both);
                    socket.Close();

                }
                catch (System.Exception ex)
                {
                    Console.WriteLine(ex);
                }
                Thread.Sleep(100);
            }
           

        }
    }
}

클라이언트 더미 코드 또한 동일하다 : 좀 더 다양하게 서버로 접속하게 처리해야 하지만 우선 send 에 만 비동기이면서 멀티스레드로 보낼수 있는 구조로 만드는 것에 초점을 맞춘다

 

반응형

+ Recent posts