send 함수를 멀티스레드로 보내는데 우선 queue 에 담아 놓고
현재 보낼 수 있으면 큐에 있는 내용을 보내고 보낼수 없는 상황이라면 큐에 보낼 내용을 담아 놓기만한다
보낼수 있는 상황과 보낼수 없는 상황은
우선 한번 SendAsync 를 할때 동시에 할수는 없고 한번에 하나만 보낼수 있음으로 SendAsync 가 완료 될때까지 다른 스레드가 SendAsync 를 하지 못하게 데이터만 큐에 쌓아 놓게 한뒤 SendAsync 처리가 완료되는 OnSendCompleted 가 호출 되면 SendAsync 처리를 마무리 한다
그런데 OnSendCompleted 가 호출 될때 SendAsync 의 한번의 루틴이 끝나기 전에 데이터가 쌓인게 있다면
다시 한번 RegisterSend 를 호출하여 기존에 쌓여 있는것을 다시 send 해주는데
이렇게 하는 이유는 SocketAsyncEventArg 를 여러개 생성하지 않고 재사용하기 위해 이런 처리를 하는건데
이유는 좀금이라도 send 할때의 비용을 아끼기 위해서이다 (유저가 많아질 수록 이 부하도 점점 쌓이기 때문)
session 코드
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ServerCore
{
class Session
{
Socket _socket;
int _disconnected = 0;
SocketAsyncEventArgs _sendArgs = new SocketAsyncEventArgs();
Queue<byte[]> _sendQueue = new Queue<byte[]>();
bool _pending = false;
object _lock = new object();
public void Start(Socket socket)
{
_socket = socket;
SocketAsyncEventArgs recvArgs = new SocketAsyncEventArgs();
recvArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnRecvCompleted);
//receive 할때 데이터를 받는 버퍼를 만들어준다
recvArgs.SetBuffer(new byte[1024], 0, 1024); //버퍼를 크게 만들어서 인덱스를 지정하여 분리하여 받아 들일 수도 있다
//초기에 한번 receive 되도록 등록해 준다
RegisterRecv(recvArgs);
_sendArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnSendCompleted);
}
public void Send(byte[] sendBuff)
{
lock(_lock)
{
_sendQueue.Enqueue(sendBuff);
if (_pending == false)
{
RegisterSend();
}
}
}
void RegisterSend()
{
_pending = true;
byte[] buff = _sendQueue.Dequeue();
_sendArgs.SetBuffer(buff, 0, buff.Length);
bool pending = _socket.SendAsync(_sendArgs);
if (pending == false)
OnSendCompleted(null, _sendArgs);
}
//다른 소켓에 의해 OnSendCompleted 호출 될 수 있음으로 lock 을 걸어준다
private void OnSendCompleted(object value, SocketAsyncEventArgs args)
{
lock (_lock)
{
if (args.BytesTransferred > 0 && args.SocketError == SocketError.Success)
{
try
{
if(_sendQueue.Count > 0)
{
RegisterSend(); //send를 보내는 처리 과정 사이에 다른 스레드에 의해서 send 가 보내진게 있다면 다시 send 를 보내도록 한다
}
else
{
_pending = false;
}
}
catch (System.Exception ex)
{
Console.WriteLine($"OnSendCompleted Failed {ex}");
}
}
else
{
DisConnect();
}
}
}
void RegisterRecv(SocketAsyncEventArgs args)
{
bool pending = _socket.ReceiveAsync(args);
//pending 이 false 인 경우엔 즉 기다리는것이 없이 바로 처리가 될 경우에는
//OnRecvCompleted를 직접 호출해 줘야 한다, 그 외는 ReceiveAsync 내부에서 나중에 알아서 OnRecvCompleted 를 호출한다
if (pending == false)
{
OnRecvCompleted(null, args);
}
}
private void OnRecvCompleted(object sender, SocketAsyncEventArgs args)
{
//받은 바이트가 0 바이트 이상이고
if(args.BytesTransferred > 0 && args.SocketError == SocketError.Success)
{
try
{
//args.Buffer : recvArgs.SetBuffer 에서 설정한 바이트다
//args.BytesTransferred : 몇바이트를 받았는지 바이트 수
string recvData = Encoding.UTF8.GetString(args.Buffer, args.Offset, args.BytesTransferred);
Console.WriteLine($"[From client] {recvData}");
RegisterRecv(args);
}
catch (Exception e)
{
Console.WriteLine($"OnRecevCompleted Failed {e}");
}
}
else
{
DisConnect();
}
}
public void DisConnect()
{
//멀티 스레드에서 동시에 disconnect를 처리 할 수 있기 때문에 Interlocked를 사용하여 처리한다
//Exchange는 _disconnected 값을 1로 바꾼다, 그리고 오리지널 값을 반환한다
if (Interlocked.Exchange(ref _disconnected, 1) == 1)
{
//즉 리턴 값이 1 이라는 얘기는 이전에 한번 disconnect 가 됐었다는 얘기 임으로
//다시 한번 더 disconnect 를 하려고 하면 리턴처리한다 = > 중복 disconnect 방지
return;
}
_socket.Shutdown(SocketShutdown.Both);
_socket.Close();
}
}
}
서버 시작 부분
using System;
using System.Net.Sockets;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using System.Text;
namespace ServerCore
{
class Program
{
static Listener _listener = new Listener();
//클라이언트로부터 접속이 와서 Accept 되었을때 호출 되는 함수
static void OnAcceptHandler(Socket clientSocket)
{
try
{
Session session = new Session();
session.Start(clientSocket); //안에 receive 가 비동기로 처리됨
//클라이언트로 보내는 처리
byte[] sendBuff = Encoding.UTF8.GetBytes("Welcome to server!");
session.Send(sendBuff);
Thread.Sleep(1000);
session.DisConnect();
//session.DisConnect(); //이렇게 두번 처리 해도 멀티 스레드에 안전하게 처리 했음으로 문제가 되지 않는다
}
catch (System.Exception ex)
{
Console.WriteLine(ex);
}
}
static void Main(string[] args)
{
//DNS : Domain Name System
// 도메인을 하나 등록해서 해당하는 IP 를 찾아오면 관리가 쉬워짐
//www.google.com =>
string host = Dns.GetHostName();
//host = "google.com"; //ipHost.AddressList[0] == {172.217.161.238}
IPHostEntry ipHost = Dns.GetHostEntry(host);
//이렇게 GetHostEntry 로 주소를 얻어오는 건 DNS 서버를 통해서 얻어 올 수 있게 됨
// ipHost.addressList[0] = IPAddress.Parse(" 경우에 따라서 ip 주소는 여러개 일 수도 있다 부하 분산을 위해서 addressList
IPAddress ipAddr = ipHost.AddressList[0];
IPEndPoint endPoint = new IPEndPoint(ipAddr, 7777); //최종 주소
try
{
_listener.Init(endPoint, OnAcceptHandler);
while (true)
{
}
}
catch(Exception e)
{
Console.WriteLine(e);
}
}
}
}
리슨 부분 ( 이전에 있던 Listen 과 유사하다)
using System;
using System.Collections.Generic;
using System.Text;
using System.Net.Sockets;
using System.Net;
namespace ServerCore
{
class Listener
{
Socket _listenSocket;
Action<Socket> _onAcceptHandler;
public void Init(IPEndPoint iPEndPoint, Action<Socket> onAcceptHandler)
{
//AddressFamily ip version 4,6 에 대한 것 , 위에서 자동으로 만들어줌,
//tcp 로 할 경우 stream, tcp 로 설정해준다
//리슨 하는 자체가 소켓을 하나 만들어야 한다
_listenSocket = new Socket(iPEndPoint.AddressFamily , SocketType.Stream, ProtocolType.Tcp);
_onAcceptHandler += onAcceptHandler;
_listenSocket.Bind(iPEndPoint); //소켓에 ip 와 포트 할당
//최대 동시 대기 수, 동시에 들어올대 10명까지만 처리 가능하고 그 위로는 실패가 된다
//Listen() 메서드는 동시에 여러 클라이언트가 접속되었을 때 큐에 몇 개의 클라이언트가
//대기할 수 있는지 지정할 수 있는데, 위의 경우는 예시를 위해 10을 넣었다.
_listenSocket.Listen(10);
//이건 한번 사용하고 재사용이 가능하다
SocketAsyncEventArgs args = new SocketAsyncEventArgs();
args.Completed += new EventHandler<SocketAsyncEventArgs>(OnAcceptCompleted);
//최초 한번은 등록해준다
RegisterAccept(args);
}
void RegisterAccept(SocketAsyncEventArgs args)
{
//재사용이 됨으로 null 로 처리한다
args.AcceptSocket = null;
//SocketAsyncEventArgs 가 하나일때
//AcceptAsync 는 대기 하고 있는 클라 접속중 하나에 대해서만 receive 처리를 하고 동시에 두개를 하진 않는다,
//하나하고 그다음 하나 step by step
bool pending = _listenSocket.AcceptAsync(args); //비동기 임으로 예약만 하고 넘어간다, accdept 완료는 eventHandler 를 통해서 완료된다
if (pending == false) //false 면 pending 없이 바로 완료 됐다는 얘기임
OnAcceptCompleted(null, args);
}
void OnAcceptCompleted(object sender, SocketAsyncEventArgs args)
{
if(args.SocketError == SocketError.Success)
{
//accept 되어 새로 생성된 소켓을
_onAcceptHandler.Invoke(args.AcceptSocket); //넘겨준다
}
else
{
Console.WriteLine(args.SocketError.ToString());
}
//위에 까지 처리가 된것은 accept 가 완료 된것임으로 새로운 accept 를 위해서
//RegisterAccept 를 다시 호출하여 OnAcceptCompleted 이벤트를 받아 들을 수 있는 상태로 만든다
RegisterAccept(args);
}
public Socket Accept()
{
//return _listenSocket.Accept();
//클라의 접속이 있다면 받아오는 처리, 접속이 있을때까지 계속 대기, 즉 다음으로 넘어가지 않는다
//클라로부터 접속이 왔다면 accept 되어 클라와 별도 통실한 socket 이 생성되어 리턴된다
//return _listenSocket.Accept();
//async 는 비동기로 처리 된다
//return _listenSocket.AcceptAsync()
return null;
}
}
}
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace DummyClient
{
class Program
{
static void Main(string[] args)
{
string host = Dns.GetHostName();
IPHostEntry ipHost = Dns.GetHostEntry(host);
IPAddress ipAddr = ipHost.AddressList[0];
IPEndPoint endPoint = new IPEndPoint(ipAddr, 7777);
while (true)
{
try
{
Socket socket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
//소켓으로 서버에 연결한다 , 서버 입장에선 accept 가 된다
socket.Connect(endPoint);
Console.WriteLine($"connected to {socket.RemoteEndPoint.ToString()}");
//서버로 보낸다
for(int i=0;i< 5; ++i)
{
byte[] sendBuff = Encoding.UTF8.GetBytes($"Hello world! {i} ");
int bytesSent = socket.Send(sendBuff);
}
//서버에서 받는다
byte[] recvBuff = new byte[1024];
int recvBytes = socket.Receive(recvBuff);
string recvData = Encoding.UTF8.GetString(recvBuff, 0, recvBytes);
Console.WriteLine($"received : {recvData}");
//서버와 통신을 위해 생성했던 소켓 종료 처리
socket.Shutdown(SocketShutdown.Both);
socket.Close();
}
catch (System.Exception ex)
{
Console.WriteLine(ex);
}
Thread.Sleep(100);
}
}
}
}
클라이언트 더미 코드 또한 동일하다 : 좀 더 다양하게 서버로 접속하게 처리해야 하지만 우선 send 에 만 비동기이면서 멀티스레드로 보낼수 있는 구조로 만드는 것에 초점을 맞춘다
반응형
'서버(Server) > Server' 카테고리의 다른 글
TCP, UDP 차이 (0) | 2022.12.27 |
---|---|
send 시 데이터를 취합해 한번에 보내기 (0) | 2022.12.25 |
_socket.ReceiveAsync 비동기 처리 (0) | 2022.12.23 |
비동기 accept 처리 : _listenSocket.AcceptAsync (0) | 2022.12.23 |
기본적인 소켓 프로그래밍 (0) | 2022.12.23 |