A의 골드 감소 이렇게 모두 처리되야 하는데 중간에 실패가 난다면 완전히 거래가 처리 되는것이 아니고 오류가 되어버리는데 이렇게 되면 테이블 데이터가 잘못되게 된다, 강화 하는 경우에도 비슷한 케이스 All or Nothing 이런걸 해결 하기위해 TRANSACTION 이 있다 아무것도 쓰지 않으면 기본 적으로 TRANSACTION 이 있고 그 뒤에 COMMIT 이 있는 것인데 (EX : INSERT INTO ...) BEGIN TRAN; 을 명시하면 뒤에 COMMIT 또는 ROLLBACK 을 적어 처리할지 되돌릴지를 정할 수 있다
INSERT INTO accounts VALUES(1, 'TESET STR', 100, GETUTCDATE());
BEGIN TRAN;
INSERT INTO accounts VALUES(2, 'TESET STR', 100, GETUTCDATE());
ROLLBACK;
결과를 보면 두번째 것은 추가 안된것을 알수 있다 ROLLBACK 되었음으로
INSERT INTO accounts VALUES(1, 'TESET STR', 100, GETUTCDATE());
BEGIN TRAN;
INSERT INTO accounts VALUES(2, 'TESET STR', 100, GETUTCDATE());
COMMIT;
추가된 경우
아래 구문을 실행하기 전 데이터
--TRY CATCH 와 비슷한 구문
BEGIN TRY
BEGIN TRAN;
INSERT INTO accounts VALUES(1, 'T1', 100, GETUTCDATE());
INSERT INTO accounts VALUES(2, 'T2', 100, GETUTCDATE());
COMMIT;
END TRY
BEGIN CATCH
IF @@TRANCOUNT > 0
ROLLBACK;
END CATCH
@@TRANCOUNT 는 TRAN 이 몇개 인지 알수 있는 매크로인데
BEGIN TRAN;
TRAN;
이렇게 TRAN 을 중첩시키면 1이상이 될 수가 있는데 이때 이 개수를 리턴해주는게 @@TRANCOUNT 이다
그리고 INSERT 는 accountId 가 primary key 임으로 이미 존재하는 키가 있을 경우 또다시 추가 하려고 하면 에러가 발생되어 catch 로 잡히게 된다
실행후 상황
아무 영향이 없었다는 걸 알 수 있다
PRINT('TTT') 를 ROLLBACK 구문 쪽에 써서 ROLLBACK 된 이유를 적어줄 수도 있다
중요한건 TRAN 으로 묶어놓은건 한번에 다 실행하거나 아니면 중간에 오류/예외가 발생하면 모두 실행되지 않는 다는 것이다
그리고 TRANSACTION 은 보통 두개 이상의 테이블에 어떤 변경이나 영향을 줄때 사용된다
사용시 주의 할점은
TRAN 안에는 꼭 원자적으로 실행될 애들만 넣어야 한다
즉 성능적으로 문제가 될 수 있기 때문인데
LOCK이 되기 때문
만약
BEGIN TRAN;
INSERT INTO accounts VALUES(2, 'TESET STR', 100, GETUTCDATE());
이렇게 까지 되어 있으면 위코드는 COMMIT 이나 ROLLBACK 을 만나기 전까지 계속 LOCK 리 걸린 상태가 되며
다른 구문에서 accounts 를 조회하려는 구문을 실행한다 해도 select 구문은 실행 되지 않고 계속 대기하게 된다
대기상태에 빠지게 됨 commit 이나 rollback 을 만나지 않는다면 원자적 특성때문에
같은 얘기지만 TRAN 과 COMMIT/ROLLBACK 사이의 구문은 길지 않게 작성하는 것이 좋다
CROSS JOIN (교차 결합) 서로 교차를 하면서 하나씩 결합을 한다는 것 (1,A), (1,B), (1,C), (2,A)... 총 9개
CREATE TABLE testA
(
a INTEGER
)
CREATE TABLE testB
(
B VARCHAR(10)
)
INSERT INTO testA VALUES(1);
INSERT INTO testA VALUES(2);
INSERT INTO testA VALUES(3);
INSERT INTO testB VALUES('A');
INSERT INTO testB VALUES('B');
INSERT INTO testB VALUES('C');
SELECT *
FROM testA;
SELECT *
FROM testB;
--CROSS JOIN (교차 결합)
--서로 교차를 하면서 하나씩 결합을 한다는 것
--(1,A), (1,B), (1,C), (2,A)... 총 9개
SELECT *
FROM testA
CROSS JOIN testB;
SELECT *
FROM testA, testB;
결과화면
기본 데이터 보기
USE BaseballData;
SELECT *
FROM players
ORDER BY playerID;
SELECT *
FROM salaries
ORDER BY playerID;
INNER JOIN(두개의 테이블을 가로로 결합 + 결합 기준을 ON 으로 한다) UNION 은 세로 즉 위아래로 합치는 것이였다면 INNER JOIN 은 옆으로 합치는 것
SELECT *
FROM players AS P
INNER JOIN salaries AS S
ON P.playerID = S.playerID;
players 의 playerID 와 salaries 의 playerID 가 같은 행 끼리 합치는 것
주의 할점은 조건이 = 인데 양쪽 모두에 playerID 가 있어야 붙여지게 되지 한쪽이라도 id 값이 없다면 해당 행은 걸러진다
즉 양쪽에 모두 정보가 있을때만 나온다
결과를 보면 뒤에 추가 된걸 볼 수 있다
inner join 을 한다는 건 새로운 테이블을 만든 것
-- outer join (외부 결합) --어느 한쪽에만 존재하는 데이터가 있을때 정책을 어떻게 할것인지에 대한 것 -- left join 인경우로 예를 들어보면 두개를 조인 할때 왼쪽에만 있고 오른쪽에는 없다면 --왼쪽 정보를 그대로 채워 넣고 없는 오른쪽 정보는 null 로 채워 넣어서 join 을 한다는 것으로 --inner join 과 유사한데 비어 있는 것을 어떻게 처리 할것인가에 대한 내용이다
SELECT *
FROM players AS P
LEFT JOIN salaries AS S
ON P.playerID = S.playerID
ORDER BY P.playerID;
아래는 playerID 로 정렬하여 두 테이블(players과 salaries )을 본것이고
LEFT JOIN 한다음의 모습이다
adairbi99m 의 줄에서 끝을 보면
salaries 에는 adairbi99m 이 없기 때문에 끝에 NULL 로 채워진것을 볼 수 있다
RIGHT JOIN 은 반대의 개념이 된다
정보가 오른쪽에 있다면 표시되고 그 이후에 같은 행에 왼쪽(plyaer)에 없으면 왼쪽 정보는 NULL 로 채워진다
예시 이미지
오른쪽에 정보는 있지만
왼쪽이 null 로 채워진 경우
그런데 left 나 right 나 테이블 순서를 바꿔주면 동일한 효과가 된다
정리하자면
cross join 은 * 이고
inner join, left join right 조인은 같은 행에 추가 하여 테이블을 만드는 것이다
select playerID, AVG(salary)
from salaries
group by playerID
having AVG(salary) >= 3000000
UNION
--12월에 태어난 선수들의 playerID
select playerID, birthMonth
from players
where birthMonth = 12
order by playerID asc
;
아래는 위 결과를 실행했을때의 결과인데 avg 와 birthmonth 때문에 합쳐지지 않아
silvaca01 이 별도 있는 거을 볼 수 있다
union 열을 같게끔 해주면 중복은제거된다
--커리어 평균 연봉이 3000000 이상인 선수들의 playerID
select playerID
from salaries
group by playerID
having AVG(salary) >= 300000
UNION
--12월에 태어난 선수들의 playerID
select playerID
from players
where birthMonth = 12
order by playerID asc
;
이렇게 하면 합쳐진 하나만 나오는 것을 알수 있다 ( 중복 제거 )
union all 은 중복을 허용한다
select playerID
from salaries
group by playerID
having AVG(salary) >= 300000
UNION all
--12월에 태어난 선수들의 playerID
select playerID
from players
where birthMonth = 12
order by playerID asc
;
union 을 쓰게 되면 order by 는 가장 하단에 와야 한다
교집합(intersect) 을 구한다 즉 양쪽 모두 만족하고 존재하는것을 구한다
--교집합(intersect) 을 구한다 즉 양쪽 모두 만족하고 존재하는것을 구한다
--커리어 평균 연봉이 3000000 이상이거나 (&&) 12월에 태어난 선수들
select playerID
from salaries
group by playerID
having AVG(salary) >= 300000
intersect
--12월에 태어난 선수들의 playerID
select playerID
from players
where birthMonth = 12
order by playerID asc
;
--차집합
--커리어 평균 연봉이 3000000 이상이거나 (-) 12월에 태어난 선수들
select playerID
from salaries
group by playerID
having AVG(salary) >= 300000
except
--12월에 태어난 선수들의 playerID
select playerID
from players
where birthMonth = 12
order by playerID asc
;
use BaseballData;
select birthMonth
from players;
select *,
case birthMonth
when 1 then N'겨울'
when 2 then N'봄'
when 3 then N'가을'
when 8 then N'8이다'
else N'그밖에'
end as birthSeason
from players;
switch case 와 비슷한걸 알 수 있다
위에서 end as birthSeason 끝에 새로 추가된 컬럼의 이름을 birthSeason 으로 지정하겠다는 얘기다
case 의 where 조건에 맞춰 문자로 변환되어 추가 된것을 볼 수 있다
아래 처럼 조건문을 추가 하는 구문 또한 있다
select *,
case
when birthMonth <=1 then N'back'
when birthMonth <=3 then N'나이스'
when birthMonth <=6 then N'앜'
when birthMonth <=9 then N'9이하'
when birthMonth <=12 then N'12이하'
else N'그밖에'
end as birthSeason
from players;
위 구문들에서 else 구문이 없다면 else 에 에 해당 하는것ㅇ느 birthSeason 에서 NULL 이 된다
주의 할점 birthMonth = NULL 이렇게 조건문을 쓸 수 없고 birthMont is NULL 이렇게 비교를 해야한다
USE [BaseballData]
GO
INSERT INTO [dbo].[DateTimeTest]
([time])
VALUES
--('20090909')
(CURRENT_TIMESTAMP)
GO
use BaseballData;
SELECT *
FROM DateTimeTest;
-- 은 주석이다
SELECT *
FROM DateTimeTest;
몇번 추가한 모습
조건식으로 비교할때 다음 처럼 할 수 있다 두개의 결과는 같다
use BaseballData;
SELECT *
FROM DateTimeTest
where time >= CAST('20200101' as DATETIME);
SELECT *
FROM DateTimeTest
where time >= '20200101';
command 패턴으로 서버에서 처리할 명령을 바로 처리하지 않고 우선 처리할 명령을 Queue 에 먾어 넣어 모아두었다가
이 방식은 서버가 처리 할수 있을때 처리 하는 방식으로 직접적으로 명령을 다수의 스레드가 한번에 처리 할때 lock 을 걸어야 하는 상황에서의 부하를 줄일 수 있다
설명
이전까지는 클라이언트 세션을 연결되자마자 바로 GameRoom의 List에 넣어주고 있었지만, 이번에는 JobQueue 개념을 이용해 Queue에 일단 GameRoom의 List에 넣어주는 작업을 '예약'해주고 있다.
코드
ServerCore
JobQueue.cs
public interface IJobQueue
{
void Push(Action job);
}
public class JobQueue : IJobQueue
{
Queue<Action> _jobQueue = new Queue<Action>();
object _lock = new object();
bool _flush = false; // 큐에 쌓인걸 '자신이' 실행할건지. 누군가 하고 있으면 자신은 하지 않는다.
public void Push(Action job)
{
bool flush = false;
lock(_lock)
{
_jobQueue.Enqueue(job);
if (_flush == false)
flush = _flush = true;
}
if (flush)
Flush();
}
void Flush()
{
while(true)
{
// 하나씩 꺼내는 와중에도 다른 애가 Push해서 JobQueue에다 넣을 수 있기 때문에 Pop을 할 때 lock을 잡아줘야 한다.
Action action = Pop();
if (action == null)
return;
action.Invoke();
}
}
Action Pop()
{
lock(_lock)
{
if(_jobQueue.Count == 0)
{
_flush = false;
return null;
}
return _jobQueue.Dequeue();
}
}
}
public class Listener
{
Socket _listenSocket;
Func<Session> _sessionFactory;
public void Init(IPEndPoint endPoint, Func<Session> sessionFactory, int register = 10, int backlog = 10)
{
_listenSocket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
_sessionFactory += sessionFactory;
// 문지기 교육
_listenSocket.Bind(endPoint);
// 영업 시작
// backlog : 최대 대기수
_listenSocket.Listen(backlog);
for (int i = 0; i < register; i++)
{
SocketAsyncEventArgs args = new SocketAsyncEventArgs();
args.Completed += new EventHandler<SocketAsyncEventArgs>(OnAcceptCompleted);
RegisterAccept(args);
}
}
...
Server
GameRoom.cs
class GameRoom : IJobQueue
{
List<ClientSession> _sessions = new List<ClientSession>();
JobQueue _jobQueue = new JobQueue();
public void Push(Action job)
{
_jobQueue.Push(job);
}
public void Broadcast(ClientSession session, string chat)
{
// 이 부분은 다른 쓰레드와 공유하고 있지 않음
S_Chat packet = new S_Chat();
packet.playerId = session.SessionId;
packet.chat = $"{chat} 나는 {packet.playerId}";
ArraySegment<byte> segment = packet.Write();
foreach (ClientSession s in _sessions)
s.Send(segment);
}
public void Enter(ClientSession session)
{
_sessions.Add(session);
session.Room = this;
}
public void Leave(ClientSession session)
{
_sessions.Remove(session);
}
}
ClientSession.cs
class ClientSession : PacketSession
{
public int SessionId { get; set; }
public GameRoom Room { get; set; }
public override void OnConnected(EndPoint endPoint)
{
Console.WriteLine($"OnConnected : {endPoint}");
Program.Room.Push(() => Program.Room.Enter(this));
}
public override void OnRecvPacket(ArraySegment<byte> buffer)
{
PacketManager.Instance.OnRecvPacket(this, buffer);
}
public override void OnDisconnected(EndPoint endPoint)
{
SessionManager.Inst.Remove(this);
if(Room != null)
{
Room.Push(() => Room.Leave(this));
Room = null;
}
Console.WriteLine($"OnDisconnected : {endPoint}");
}
public override void OnSend(int numOfBytes)
{
Console.WriteLine($"Transferred bytes: {numOfBytes}");
}
}
PacketHandler.cs
class PacketHandler
{
public static void C_ChatHandler(PacketSession session, IPacket packet)
{
C_Chat chatPacket = packet as C_Chat;
ClientSession clientSession = session as ClientSession;
if (clientSession.Room == null)
return;
GameRoom room = clientSession.Room;
// 할 일을 바로 해주는게 아니라 Push
room.Push(() => room.Broadcast(clientSession, chatPacket.chat));
}
}
Server의 Main에서 Listener를 Init할때 ClientSession 객체를 바로 생성해주는 것이 아닌, SessionManager를 통해 Generate해서 모든 세션을 관리하는 방식을 채택했다. 세션마다 고유 번호를 할당하고 id를 키값으로 Dictionary에 ClientSession을 넣어서 생성, 찾기, 삭제를 할 때마다 lock을 걸어 동시 접근을 차단하는 식으로 구현했다.
서버에는 하나의 GameRoom이 존재하며, 하나의 GameRoom에는 List타입의 _sessions가 존재한다. 그리고 GameRoom에서 어떤 하나의 클라이언트가 메시지를 서버에게 전송하면 서버는 입장 중인 모든 세션 객체에 그 메시지를 Broadcast한다.
하나의 ClientSession이 서버와의 접속이 끊어질 경우, OnDisconnected함수가 호출되어 SessionManager.Remove와 해당 세션이 참여 중인 GameRoom이 있을 경우 GameRoom.Leave() 함수 호출을 통해 퇴장하도록 했다.
10명의 유저가 한 공간에 존재한다고 할때, 한 명의 유저가 메시지를 전송하면, 그 공간 안에 10명의 유저에게 뿌려줘야 한다. 그렇다면 10명의 유저가 동시에 메시지를 하나씩 전송한다면, 10*10=100번의 패킷을 전송해야한다는 말이다. 시간복잡도로 말하자면 O(n^2)가 된다. 그래서 n을 100, 1000으로 늘리면 늘릴수록 서버에 부담이 갈 수 있다.
코드를 그대로 실행해보면 Broadcast 부분에서 수많은 작업자 스레드가 lock에서 대기하고 있는 모습을 볼 수 있다. 하나의 쓰레드가 foreach문을 다 돌 때까지 다른 쓰레드가 대기를 할 수 밖에 없는 상황인데, 최악인 것은 쓰레드가 처리를 못하고 시간을 끌면 작업자 쓰레드를 새로 만들어버리는 악순환이 발생한다는 것이다.
따라서 모든 로직을 lock을 잡아 실행하는 것이 아니라 GameRoom에 Queue를 하나 만들고, 쓰레드들이 일감을 queue에 넣어두고 대기하지 말고 각자 할 일을 하러 가게끔 만들면 된다. 그런 큐를 JobQueue라고 한다.
protobuf 에서 제공하는 컴파일로 컴파일 하게 되면 직렬화하여 넣을 수 있는 구조가 만들어지고
다음 처럼 C++ 에서 사용 하여 데이터를 넣을 수 있다
add_buffs() 는 데이터를 내무적으로 추가하고 포인터를 얻어와 data 포인터를 통해서 데이터를 쓰는 방식이다
sendbuffer 에 패킷 데이터를 밀어 넣기
->Buffer()); //위 코드 중 짤린 부분
S_TEST 패킷을 만들어 데이터를 넣은 모습이다
실제 패킷 보내기처리
다음은 역질렬화방법이다
헤더 만큼은 건너 뛰고 S_TEST::pkt 에 데이터를 담아온다
프로토토콜 버퍼는 구글에서 개발하고 오픈소스로 공개한, 직렬화 데이타 구조 (Serialized Data Structure)이다. C++,C#, Go, Java, Python, Object C, Javascript, Ruby 등 다양한 언어를 지원하며 특히 직렬화 속도가 빠르고 직렬화된 파일의 크기도 작아서 Apache Avro 파일 포맷과 함께 많이 사용된다.
(직렬화란 데이타를 파일로 저장하거나 또는 네트워크로 전송하기 위하여 바이너리 스트림 형태로 저장하는 행위이다.)
특히 GRPC 라는 네트워크 프로토콜의 경우 HTTP 2.0 을 기반으로 하면서, 메세지를 이 프로토콜 버퍼를 이용하여 직렬화하기 때문에, 프로토콜 버퍼를 이해해놓으면 GRPC를 습득하는 것이 상대적으로 쉽다.
프로토콜 버퍼는 하나의 파일에 최대 64M까지 지원할 수 있으며, 재미있는 기능중 하나는 JSON 파일을 프로토콜 버퍼 파일 포맷으로 전환이 가능하고, 반대로 프로토콜 버퍼 파일도 JSON으로 전환이 가능하다.
설치 및 구성
프로토콜 버퍼 개발툴킷은 크게 두가지 부분이 있다. 데이타 포맷 파일을 컴파일 해주는 protoc 와 각 프로그래밍 언어에서 프로토콜 버퍼를 사용하게 해주는 라이브러리 SDK가 있다.
프로토콜 버퍼를 사용하기 위해서는 저장하기 위한 데이타형을 proto file 이라는 형태로 정의한다. 프로토콜 버퍼는 하나의 프로그래밍 언어가 아니라 여러 프로그래밍 언어를 지원하기 때문에, 특정 언어에 종속성이 없는 형태로 데이타 타입을 정의하게 되는데, 이 파일을 proto file이라고 한다.
이렇게 정의된 데이타 타입을 프로그래밍 언어에서 사용하려면, 해당 언어에 맞는 형태의 데이타 클래스로 생성을 해야 하는데, protoc 컴파일러로 proto file을 컴파일하면, 각 언어에 맞는 형태의 데이타 클래스 파일을 생성해준다.
결과 화면인데 패킷중 첫번째를 size로 받아와 size 바이트 만큼 받아 왔으면 그다음 id 그리고 데이터를 순차적으로 읽어온다, 여기서 패킷은 TCP 로 전송 될경우 데이터 한 덩어리중 일부가 지연으로 나중에 올경우를 대비해
패킷 중 사이즈가 2바이트라면 2바이트를 먼저 받고 해당 사이즈를 먼저 추출한다음 전체 바이트 만큼 모두 도착 할때까지 수신을 반복한다, 그리고 모두다 데이터를 수신 받았다면 이후부터 패킷의 내용들을 까서 데이터로 받는 처리를 하게 된다
TCP 는 데이터의 순서가 보장 됨으로 나중에 왔다 하더라고 모두 전송된 데이터의 비트들은 정상적으로 정렬되어 있다
public override void OnRecvPacket(ArraySegment<byte> buffer)
{
int pos = 0;
ushort size = BitConverter.ToUInt16(buffer.Array, buffer.Offset);
pos += 2;
ushort id = BitConverter.ToUInt16(buffer.Array, buffer.Offset + pos);
pos += 2;
// TODO
switch ((PacketID)id)
{
case PacketID.PlayerInfoReq:
{
long playerId = BitConverter.ToInt64(buffer.Array, buffer.Offset + pos);
pos += 8;
}
break;
case PacketID.PlayerInfoOk:
{
int hp = BitConverter.ToInt32(buffer.Array, buffer.Offset + pos);
pos += 4;
int attack = BitConverter.ToInt32(buffer.Array, buffer.Offset + pos);
pos += 4;
}
//Handle_PlayerInfoOk();
break;
default:
break;
}
Console.WriteLine($"RecvPacketId: {id}, Size {size}");
}
Thread Local Storage(TLS)
TLS : 스레드 빌딩 블록으로 스레드 마다 고유한 메모리 영역에 할당되게 된다
public static ThreadLocal<SendBuffer> CurrentBuffer = new ThreadLocal<SendBuffer>(() => { return null; });
코드
ServerCore
Session.cs
namespace ServerCore
{
public abstract class PacketSession : Session
{
public static readonly int HeaderSize = 2;
// size = size 포함한 전체 패킷 크기
// [size(2)][packetId(2)][ ... ][size(2)][packetId(2)][ ... ]
public sealed override int OnRecv(ArraySegment<byte> buffer) // 오버라이드 다시 불가
{
int processLen = 0;
while (true)
{
// 최소한 헤더는 파싱할 수 있는지 확인
if (buffer.Count < HeaderSize)
break;
// 패킷이 완전체로 도착했는지 확인
ushort dataSize = BitConverter.ToUInt16(buffer.Array, buffer.Offset); // ushort
if (buffer.Count < dataSize)
break;
// 여기까지 왔으면 패킷 조립 가능. new 사용했다고 힙에다 할당해주는게 아니라 스택 복사
OnRecvPacket(new ArraySegment<byte>(buffer.Array, buffer.Offset, dataSize));
processLen += dataSize;
// [size(2)][packetId(2)][ ... ] 다음 부분으로 위치 변경
buffer = new ArraySegment<byte>(buffer.Array, buffer.Offset + dataSize, buffer.Count - dataSize);
}
return processLen;
}
public abstract void OnRecvPacket(ArraySegment<byte> buffer);
}
public abstract class Session
{
Socket _socket;
int _disconnected = 0;
RecvBuffer _recvBuffer = new RecvBuffer(1024);
object _lock = new object();
Queue<ArraySegment<byte>> _sendQueue = new Queue<ArraySegment<byte>>();
List<ArraySegment<byte>> _pendingList = new List<ArraySegment<byte>>();
SocketAsyncEventArgs _sendArgs = new SocketAsyncEventArgs();
SocketAsyncEventArgs _recvArgs = new SocketAsyncEventArgs();
public abstract void OnConnected(EndPoint endPoint);
public abstract int OnRecv(ArraySegment<byte> buffer); // 얼마만큼 데이터를 처리했는지 리턴
public abstract void OnSend(int numOfBytes);
public abstract void OnDisconnected(EndPoint endPoint);
public void Start(Socket socket)
{
_socket = socket;
_recvArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnRecvCompleted);
_sendArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnSendCompleted);
RegisterRecv();
}
public void Send(ArraySegment<byte> sendBuff)
{
lock (_lock)
{
_sendQueue.Enqueue(sendBuff);
if (_pendingList.Count == 0)
RegisterSend();
}
}
public void Disconnect()
{
if (Interlocked.Exchange(ref _disconnected, 1) == 1)
return;
OnDisconnected(_socket.RemoteEndPoint);
_socket.Shutdown(SocketShutdown.Both);
_socket.Close();
}
#region 네트워크 통신
void RegisterSend()
{
while (_sendQueue.Count > 0)
{
ArraySegment<byte> buff = _sendQueue.Dequeue();
_pendingList.Add(buff);
}
_sendArgs.BufferList = _pendingList;
bool pending = _socket.SendAsync(_sendArgs);
if (pending == false)
OnSendCompleted(null, _sendArgs);
}
void OnSendCompleted(object sender, SocketAsyncEventArgs args)
{
lock (_lock)
{
if (args.BytesTransferred > 0 && args.SocketError == SocketError.Success)
{
try
{
_sendArgs.BufferList = null;
_pendingList.Clear();
OnSend(_sendArgs.BytesTransferred);
if (_sendQueue.Count > 0)
RegisterSend();
}
catch (Exception e)
{
Console.WriteLine($"OnSendCompleted Failed {e}");
}
}
else
{
Disconnect();
}
}
}
void RegisterRecv()
{
_recvBuffer.Clean(); // 커서가 너무 뒤로 가있는 상태 방지
// 유효한 범위 설정. 다음으로 버퍼를 받을 공간 Set.
ArraySegment<byte> segment = _recvBuffer.WriteSegment;
_recvArgs.SetBuffer(segment.Array, segment.Offset, segment.Count); // count=freesize, 이만큼 받을 수 있다.
bool pending = _socket.ReceiveAsync(_recvArgs);
if (pending == false)
OnRecvCompleted(null, _recvArgs);
}
void OnRecvCompleted(object sender, SocketAsyncEventArgs args)
{
if (args.BytesTransferred > 0 && args.SocketError == SocketError.Success)
{
try
{
// Write 커서 이동
if (_recvBuffer.OnWrite(args.BytesTransferred) == false)
{
Disconnect();
return;
}
// 컨텐츠 쪽으로 데이터를 넘겨주고 얼마나 처리했는지 받는다
int processLen = OnRecv(_recvBuffer.ReadSegment);
if (processLen < 0 || _recvBuffer.DataSize < processLen)
{
Disconnect();
return;
}
// Read 커서 이동
if (_recvBuffer.OnRead(processLen) == false)
{
Disconnect();
return;
}
RegisterRecv();
}
catch (Exception e)
{
Console.WriteLine($"OnRecvCompleted Failed {e}");
}
}
else
{
Disconnect();
}
}
#endregion
}
}
SendBuffer
namespace ServerCore
{
public class SendBufferHelper
{
// 전역이지만 내 쓰레드에서만 사용할 수 있음.
public static ThreadLocal<SendBuffer> CurrentBuffer = new ThreadLocal<SendBuffer>(() => { return null; }); // 처음 만들어질때 null만 리턴하도록
public static int ChunkSize { get; set; } = 4096 * 100;
public static ArraySegment<byte> Open(int reserveSize)
{
if (CurrentBuffer.Value == null) // SendBuffer 한번도 사용 안한 상태
CurrentBuffer.Value = new SendBuffer(ChunkSize);
if (CurrentBuffer.Value.FreeSize < reserveSize)
CurrentBuffer.Value = new SendBuffer(ChunkSize); // 기존 청크 없앤 후 새롭게 할당
return CurrentBuffer.Value.Open(reserveSize);
}
public static ArraySegment<byte> Close(int usedSize)
{
return CurrentBuffer.Value.Close(usedSize);
}
}
public class SendBuffer
{
// RecvBuffer처럼 Clean이 없는 이유는 내가 사용이 끝난 부분 이전 부분을 다른 세션에서 Session클래스의 _sendQueue에 넣어논 상태일 수 있기 때문에, 즉 누군가 앞부분을 참조 중인 상태일 수 있기 때문에 재위치 시킬 수 없다.
// 패킷에 가변인자 들어가면 크기 예측하는게 까다롭기 때문에, 크게 할당 받아놓고 자르는 방식
// [][][][][][][][][u][] : ChunkSize = 4096 * 100
byte[] _buffer;
int _usedSize = 0;
public int FreeSize { get { return _buffer.Length - _usedSize; } }
public SendBuffer(int chunkSize)
{
_buffer = new byte[chunkSize];
}
public ArraySegment<byte> Open(int reserveSize) // 요구할 예약 공간
{
if (reserveSize > FreeSize)
return null;
// 예약공간이기 때문에 usedSize 이동 x
return new ArraySegment<byte>(_buffer, _usedSize, reserveSize); // usedSize 위치 포함해서 reserveSize만큼
}
public ArraySegment<byte> Close(int usedSize) // 예약 사이즈는 3이라도 실제로 2개 사용되었다면 2만큼 범위 리턴
{
ArraySegment<byte> segment = new ArraySegment<byte>(_buffer, _usedSize, usedSize);
_usedSize += usedSize;
return segment;
}
}
}
Server
ClientSession.cs
namespace Server
{
class Packet
{
public ushort size;
public ushort packetId;
}
class PlayerInfoReq : Packet
{
public long playerId;
}
class PlayerInfoOk : Packet
{
public int hp;
public int attack;
}
public enum PacketID
{
PlayerInfoReq = 1,
PlayerInfoOk = 2,
}
class ClientSession : PacketSession
{
public override void OnConnected(EndPoint endPoint)
{
Console.WriteLine($"OnConnected : {endPoint}");
Thread.Sleep(5000);
Disconnect();
}
public override void OnRecvPacket(ArraySegment<byte> buffer)
{
int pos = 0;
ushort size = BitConverter.ToUInt16(buffer.Array, buffer.Offset);
pos += 2;
ushort id = BitConverter.ToUInt16(buffer.Array, buffer.Offset + pos);
pos += 2;
// TODO
switch ((PacketID)id)
{
case PacketID.PlayerInfoReq: // required
{
long playerId = BitConverter.ToInt64(buffer.Array, buffer.Offset + pos);
pos += 8;
}
break;
case PacketID.PlayerInfoOk:
{
int hp = BitConverter.ToInt32(buffer.Array, buffer.Offset + pos);
pos += 4;
int attack = BitConverter.ToInt32(buffer.Array, buffer.Offset + pos);
pos += 4;
}
//Handle_PlayerInfoOk();
break;
default:
break;
}
Console.WriteLine($"RecvPacketId: {id}, Size {size}");
}
// TEMP
public void Handle_PlayerInfoOk(ArraySegment<byte> buffer)
{
}
...
}
DummyClient
ServerSession.cs
namespace DummyClient
{
class Packet
{
public ushort size;
public ushort packetId;
}
class PlayerInfoReq : Packet
{
public long playerId;
}
class PlayerInfoOk : Packet
{
public int hp;
public int attack;
}
public enum PacketID
{
PlayerInfoReq = 1,
PlayerInfoOk = 2,
}
class ServerSession : Session
{
// unsafe : 포인터 조작. 속도가 빠른 장점
static unsafe void ToBytes(byte[] array, int offset, ulong value)
{
fixed (byte* ptr = &array[offset])
*(ulong*)ptr = value;
}
static unsafe void ToBytes<T>(byte[] array, int offset, T value) where T : unmanaged
{
fixed (byte* ptr = &array[offset])
*(T*)ptr = value;
}
public override void OnConnected(EndPoint endPoint)
{
Console.WriteLine($"OnConnected : {endPoint}");
// 패킷의 크기는 아래에서 정해짐
PlayerInfoReq packet = new PlayerInfoReq() { size = 4, packetId = (ushort)PacketID.PlayerInfoReq, playerId = 1001 };
// 보낸다
for (int i = 0; i < 5; i++)
{
ArraySegment<byte> s = SendBufferHelper.Open(4096);
//byte[] size = BitConverter.GetBytes(packet.size);
//byte[] packetId = BitConverter.GetBytes(packet.packetId);
//byte[] playerId = BitConverter.GetBytes(packet.playerId);
ushort size = 0; // int로 만들면 TryWriteBytes의 버전 중 int 버전으로 넘겨줌. ToUInt16
bool success = true;
// destination(Span<byte>)의 공간보다 value의 크기가 크다면 실패
// offset+size부터 offset-size만큼의 범위
size += 2;
success &= BitConverter.TryWriteBytes(new Span<byte>(s.Array, s.Offset + size, s.Count - size), packet.packetId);
size += 2;
success &= BitConverter.TryWriteBytes(new Span<byte>(s.Array, s.Offset + size, s.Count - size), packet.playerId);
size += 8;
success &= BitConverter.TryWriteBytes(new Span<byte>(s.Array, s.Offset, s.Count), size);
ArraySegment<byte> sendBuff = SendBufferHelper.Close(size); // 실질적으로 보내줄 버퍼.
if (success)
Send(sendBuff);
}
}
...
}
현재 보낼 수 있으면 큐에 있는 내용을 보내고 보낼수 없는 상황이라면 큐에 보낼 내용을 담아 놓기만한다
보낼수 있는 상황과 보낼수 없는 상황은
우선 한번 SendAsync 를 할때 동시에 할수는 없고 한번에 하나만 보낼수 있음으로 SendAsync 가 완료 될때까지 다른 스레드가 SendAsync 를 하지 못하게 데이터만 큐에 쌓아 놓게 한뒤 SendAsync 처리가 완료되는 OnSendCompleted 가 호출 되면 SendAsync 처리를 마무리 한다
그런데 OnSendCompleted 가 호출 될때 SendAsync 의 한번의 루틴이 끝나기 전에 데이터가 쌓인게 있다면
다시 한번 RegisterSend 를 호출하여 기존에 쌓여 있는것을 다시 send 해주는데
이렇게 하는 이유는 SocketAsyncEventArg 를 여러개 생성하지 않고 재사용하기 위해 이런 처리를 하는건데
이유는 좀금이라도 send 할때의 비용을 아끼기 위해서이다 (유저가 많아질 수록 이 부하도 점점 쌓이기 때문)
session 코드
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ServerCore
{
class Session
{
Socket _socket;
int _disconnected = 0;
SocketAsyncEventArgs _sendArgs = new SocketAsyncEventArgs();
Queue<byte[]> _sendQueue = new Queue<byte[]>();
bool _pending = false;
object _lock = new object();
public void Start(Socket socket)
{
_socket = socket;
SocketAsyncEventArgs recvArgs = new SocketAsyncEventArgs();
recvArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnRecvCompleted);
//receive 할때 데이터를 받는 버퍼를 만들어준다
recvArgs.SetBuffer(new byte[1024], 0, 1024); //버퍼를 크게 만들어서 인덱스를 지정하여 분리하여 받아 들일 수도 있다
//초기에 한번 receive 되도록 등록해 준다
RegisterRecv(recvArgs);
_sendArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnSendCompleted);
}
public void Send(byte[] sendBuff)
{
lock(_lock)
{
_sendQueue.Enqueue(sendBuff);
if (_pending == false)
{
RegisterSend();
}
}
}
void RegisterSend()
{
_pending = true;
byte[] buff = _sendQueue.Dequeue();
_sendArgs.SetBuffer(buff, 0, buff.Length);
bool pending = _socket.SendAsync(_sendArgs);
if (pending == false)
OnSendCompleted(null, _sendArgs);
}
//다른 소켓에 의해 OnSendCompleted 호출 될 수 있음으로 lock 을 걸어준다
private void OnSendCompleted(object value, SocketAsyncEventArgs args)
{
lock (_lock)
{
if (args.BytesTransferred > 0 && args.SocketError == SocketError.Success)
{
try
{
if(_sendQueue.Count > 0)
{
RegisterSend(); //send를 보내는 처리 과정 사이에 다른 스레드에 의해서 send 가 보내진게 있다면 다시 send 를 보내도록 한다
}
else
{
_pending = false;
}
}
catch (System.Exception ex)
{
Console.WriteLine($"OnSendCompleted Failed {ex}");
}
}
else
{
DisConnect();
}
}
}
void RegisterRecv(SocketAsyncEventArgs args)
{
bool pending = _socket.ReceiveAsync(args);
//pending 이 false 인 경우엔 즉 기다리는것이 없이 바로 처리가 될 경우에는
//OnRecvCompleted를 직접 호출해 줘야 한다, 그 외는 ReceiveAsync 내부에서 나중에 알아서 OnRecvCompleted 를 호출한다
if (pending == false)
{
OnRecvCompleted(null, args);
}
}
private void OnRecvCompleted(object sender, SocketAsyncEventArgs args)
{
//받은 바이트가 0 바이트 이상이고
if(args.BytesTransferred > 0 && args.SocketError == SocketError.Success)
{
try
{
//args.Buffer : recvArgs.SetBuffer 에서 설정한 바이트다
//args.BytesTransferred : 몇바이트를 받았는지 바이트 수
string recvData = Encoding.UTF8.GetString(args.Buffer, args.Offset, args.BytesTransferred);
Console.WriteLine($"[From client] {recvData}");
RegisterRecv(args);
}
catch (Exception e)
{
Console.WriteLine($"OnRecevCompleted Failed {e}");
}
}
else
{
DisConnect();
}
}
public void DisConnect()
{
//멀티 스레드에서 동시에 disconnect를 처리 할 수 있기 때문에 Interlocked를 사용하여 처리한다
//Exchange는 _disconnected 값을 1로 바꾼다, 그리고 오리지널 값을 반환한다
if (Interlocked.Exchange(ref _disconnected, 1) == 1)
{
//즉 리턴 값이 1 이라는 얘기는 이전에 한번 disconnect 가 됐었다는 얘기 임으로
//다시 한번 더 disconnect 를 하려고 하면 리턴처리한다 = > 중복 disconnect 방지
return;
}
_socket.Shutdown(SocketShutdown.Both);
_socket.Close();
}
}
}
서버 시작 부분
using System;
using System.Net.Sockets;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using System.Text;
namespace ServerCore
{
class Program
{
static Listener _listener = new Listener();
//클라이언트로부터 접속이 와서 Accept 되었을때 호출 되는 함수
static void OnAcceptHandler(Socket clientSocket)
{
try
{
Session session = new Session();
session.Start(clientSocket); //안에 receive 가 비동기로 처리됨
//클라이언트로 보내는 처리
byte[] sendBuff = Encoding.UTF8.GetBytes("Welcome to server!");
session.Send(sendBuff);
Thread.Sleep(1000);
session.DisConnect();
//session.DisConnect(); //이렇게 두번 처리 해도 멀티 스레드에 안전하게 처리 했음으로 문제가 되지 않는다
}
catch (System.Exception ex)
{
Console.WriteLine(ex);
}
}
static void Main(string[] args)
{
//DNS : Domain Name System
// 도메인을 하나 등록해서 해당하는 IP 를 찾아오면 관리가 쉬워짐
//www.google.com =>
string host = Dns.GetHostName();
//host = "google.com"; //ipHost.AddressList[0] == {172.217.161.238}
IPHostEntry ipHost = Dns.GetHostEntry(host);
//이렇게 GetHostEntry 로 주소를 얻어오는 건 DNS 서버를 통해서 얻어 올 수 있게 됨
// ipHost.addressList[0] = IPAddress.Parse(" 경우에 따라서 ip 주소는 여러개 일 수도 있다 부하 분산을 위해서 addressList
IPAddress ipAddr = ipHost.AddressList[0];
IPEndPoint endPoint = new IPEndPoint(ipAddr, 7777); //최종 주소
try
{
_listener.Init(endPoint, OnAcceptHandler);
while (true)
{
}
}
catch(Exception e)
{
Console.WriteLine(e);
}
}
}
}
리슨 부분 ( 이전에 있던 Listen 과 유사하다)
using System;
using System.Collections.Generic;
using System.Text;
using System.Net.Sockets;
using System.Net;
namespace ServerCore
{
class Listener
{
Socket _listenSocket;
Action<Socket> _onAcceptHandler;
public void Init(IPEndPoint iPEndPoint, Action<Socket> onAcceptHandler)
{
//AddressFamily ip version 4,6 에 대한 것 , 위에서 자동으로 만들어줌,
//tcp 로 할 경우 stream, tcp 로 설정해준다
//리슨 하는 자체가 소켓을 하나 만들어야 한다
_listenSocket = new Socket(iPEndPoint.AddressFamily , SocketType.Stream, ProtocolType.Tcp);
_onAcceptHandler += onAcceptHandler;
_listenSocket.Bind(iPEndPoint); //소켓에 ip 와 포트 할당
//최대 동시 대기 수, 동시에 들어올대 10명까지만 처리 가능하고 그 위로는 실패가 된다
//Listen() 메서드는 동시에 여러 클라이언트가 접속되었을 때 큐에 몇 개의 클라이언트가
//대기할 수 있는지 지정할 수 있는데, 위의 경우는 예시를 위해 10을 넣었다.
_listenSocket.Listen(10);
//이건 한번 사용하고 재사용이 가능하다
SocketAsyncEventArgs args = new SocketAsyncEventArgs();
args.Completed += new EventHandler<SocketAsyncEventArgs>(OnAcceptCompleted);
//최초 한번은 등록해준다
RegisterAccept(args);
}
void RegisterAccept(SocketAsyncEventArgs args)
{
//재사용이 됨으로 null 로 처리한다
args.AcceptSocket = null;
//SocketAsyncEventArgs 가 하나일때
//AcceptAsync 는 대기 하고 있는 클라 접속중 하나에 대해서만 receive 처리를 하고 동시에 두개를 하진 않는다,
//하나하고 그다음 하나 step by step
bool pending = _listenSocket.AcceptAsync(args); //비동기 임으로 예약만 하고 넘어간다, accdept 완료는 eventHandler 를 통해서 완료된다
if (pending == false) //false 면 pending 없이 바로 완료 됐다는 얘기임
OnAcceptCompleted(null, args);
}
void OnAcceptCompleted(object sender, SocketAsyncEventArgs args)
{
if(args.SocketError == SocketError.Success)
{
//accept 되어 새로 생성된 소켓을
_onAcceptHandler.Invoke(args.AcceptSocket); //넘겨준다
}
else
{
Console.WriteLine(args.SocketError.ToString());
}
//위에 까지 처리가 된것은 accept 가 완료 된것임으로 새로운 accept 를 위해서
//RegisterAccept 를 다시 호출하여 OnAcceptCompleted 이벤트를 받아 들을 수 있는 상태로 만든다
RegisterAccept(args);
}
public Socket Accept()
{
//return _listenSocket.Accept();
//클라의 접속이 있다면 받아오는 처리, 접속이 있을때까지 계속 대기, 즉 다음으로 넘어가지 않는다
//클라로부터 접속이 왔다면 accept 되어 클라와 별도 통실한 socket 이 생성되어 리턴된다
//return _listenSocket.Accept();
//async 는 비동기로 처리 된다
//return _listenSocket.AcceptAsync()
return null;
}
}
}
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace DummyClient
{
class Program
{
static void Main(string[] args)
{
string host = Dns.GetHostName();
IPHostEntry ipHost = Dns.GetHostEntry(host);
IPAddress ipAddr = ipHost.AddressList[0];
IPEndPoint endPoint = new IPEndPoint(ipAddr, 7777);
while (true)
{
try
{
Socket socket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
//소켓으로 서버에 연결한다 , 서버 입장에선 accept 가 된다
socket.Connect(endPoint);
Console.WriteLine($"connected to {socket.RemoteEndPoint.ToString()}");
//서버로 보낸다
for(int i=0;i< 5; ++i)
{
byte[] sendBuff = Encoding.UTF8.GetBytes($"Hello world! {i} ");
int bytesSent = socket.Send(sendBuff);
}
//서버에서 받는다
byte[] recvBuff = new byte[1024];
int recvBytes = socket.Receive(recvBuff);
string recvData = Encoding.UTF8.GetString(recvBuff, 0, recvBytes);
Console.WriteLine($"received : {recvData}");
//서버와 통신을 위해 생성했던 소켓 종료 처리
socket.Shutdown(SocketShutdown.Both);
socket.Close();
}
catch (System.Exception ex)
{
Console.WriteLine(ex);
}
Thread.Sleep(100);
}
}
}
}
클라이언트 더미 코드 또한 동일하다 : 좀 더 다양하게 서버로 접속하게 처리해야 하지만 우선 send 에 만 비동기이면서 멀티스레드로 보낼수 있는 구조로 만드는 것에 초점을 맞춘다
서버코어 부분인데 이 부분에서 listen과 receive 를 하기 위한 초기호및 보내기 데이터 설정 등을 한다
using System;
using System.Net.Sockets;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using System.Text;
namespace ServerCore
{
class Program
{
static Listener _listener = new Listener();
//클라이언트로부터 접속이 와서 Accept 되었을때 호출 되는 함수
static void OnAcceptHandler(Socket clientSocket)
{
try
{
Session session = new Session();
session.Start(clientSocket); //안에 receive 가 비동기로 처리됨
//클라이언트로 보내는 처리
byte[] sendBuff = Encoding.UTF8.GetBytes("Welcome to server!");
session.Send(sendBuff);
Thread.Sleep(1000);
session.DisConnect();
//session.DisConnect(); //이렇게 두번 처리 해도 멀티 스레드에 안전하게 처리 했음으로 문제가 되지 않는다
}
catch (System.Exception ex)
{
Console.WriteLine(ex);
}
}
static void Main(string[] args)
{
//DNS : Domain Name System
// 도메인을 하나 등록해서 해당하는 IP 를 찾아오면 관리가 쉬워짐
//www.google.com =>
string host = Dns.GetHostName();
//host = "google.com"; //ipHost.AddressList[0] == {172.217.161.238}
IPHostEntry ipHost = Dns.GetHostEntry(host);
//이렇게 GetHostEntry 로 주소를 얻어오는 건 DNS 서버를 통해서 얻어 올 수 있게 됨
// ipHost.addressList[0] = IPAddress.Parse(" 경우에 따라서 ip 주소는 여러개 일 수도 있다 부하 분산을 위해서 addressList
IPAddress ipAddr = ipHost.AddressList[0];
IPEndPoint endPoint = new IPEndPoint(ipAddr, 7777); //최종 주소
try
{
_listener.Init(endPoint, OnAcceptHandler);
while (true)
{
}
}
catch(Exception e)
{
Console.WriteLine(e);
}
}
}
}
리슨을 담당하는 코드
Listen() 메서드는 동시에 여러 클라이언트가 접속되었을 때 큐에 몇 개의 클라이언트가 대기할 수 있는지 지정할 수 있는데, 위의 경우는 예시를 위해 10을 넣었다.
using System;
using System.Collections.Generic;
using System.Text;
using System.Net.Sockets;
using System.Net;
namespace ServerCore
{
class Listener
{
Socket _listenSocket;
Action<Socket> _onAcceptHandler;
public void Init(IPEndPoint iPEndPoint, Action<Socket> onAcceptHandler)
{
//AddressFamily ip version 4,6 에 대한 것 , 위에서 자동으로 만들어줌,
//tcp 로 할 경우 stream, tcp 로 설정해준다
//리슨 하는 자체가 소켓을 하나 만들어야 한다
_listenSocket = new Socket(iPEndPoint.AddressFamily , SocketType.Stream, ProtocolType.Tcp);
_onAcceptHandler += onAcceptHandler;
_listenSocket.Bind(iPEndPoint); //소켓에 ip 와 포트 할당
//최대 동시 대기 수, 동시에 들어올대 10명까지만 처리 가능하고 그 위로는 실패가 된다
_listenSocket.Listen(10);
//이건 한번 사용하고 재사용이 가능하다
SocketAsyncEventArgs args = new SocketAsyncEventArgs();
args.Completed += new EventHandler<SocketAsyncEventArgs>(OnAcceptCompleted);
//최초 한번은 등록해준다
RegisterAccept(args);
}
void RegisterAccept(SocketAsyncEventArgs args)
{
//재사용이 됨으로 null 로 처리한다
args.AcceptSocket = null;
bool pending = _listenSocket.AcceptAsync(args); //비동기 임으로 예약만 하고 넘어간다, accdept 완료는 eventHandler 를 통해서 완료된다
if (pending == false) //false 면 pending 없이 바로 완료 됐다는 얘기임
OnAcceptCompleted(null, args);
}
void OnAcceptCompleted(object sender, SocketAsyncEventArgs args)
{
if(args.SocketError == SocketError.Success)
{
//accept 되어 새로 생성된 소켓을
_onAcceptHandler.Invoke(args.AcceptSocket); //넘겨준다
}
else
{
Console.WriteLine(args.SocketError.ToString());
}
//위에 까지 처리가 된것은 accept 가 완료 된것임으로 새로운 accept 를 위해서
//RegisterAccept 를 다시 호출하여 OnAcceptCompleted 이벤트를 받아 들을 수 있는 상태로 만든다
RegisterAccept(args);
}
public Socket Accept()
{
//return _listenSocket.Accept();
//클라의 접속이 있다면 받아오는 처리, 접속이 있을때까지 계속 대기, 즉 다음으로 넘어가지 않는다
//클라로부터 접속이 왔다면 accept 되어 클라와 별도 통실한 socket 이 생성되어 리턴된다
//return _listenSocket.Accept();
//async 는 비동기로 처리 된다
//return _listenSocket.AcceptAsync()
return null;
}
}
}
세션코드로 클라로부터 들어온 데이터를 비동기로 받는 로직이다
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ServerCore
{
class Session
{
Socket _socket;
int _disconnected = 0;
public void Start(Socket socket)
{
_socket = socket;
SocketAsyncEventArgs recvArgs = new SocketAsyncEventArgs();
recvArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnRecvCompleted);
//receive 할때 데이터를 받는 버퍼를 만들어준다
recvArgs.SetBuffer(new byte[1024], 0, 1024); //버퍼를 크게 만들어서 인덱스를 지정하여 분리하여 받아 들일 수도 있다
//초기에 한번 receive 되도록 등록해 준다
RegisterRecv(recvArgs);
}
public void Send(byte[] sendBuff)
{
_socket.Send(sendBuff);
}
void RegisterRecv(SocketAsyncEventArgs args)
{
bool pending = _socket.ReceiveAsync(args);
//pending 이 false 인 경우엔 즉 기다리는것이 없이 바로 처리가 될 경우에는
//OnRecvCompleted를 직접 호출해 줘야 한다, 그 외는 ReceiveAsync 내부에서 나중에 알아서 OnRecvCompleted 를 호출한다
if (pending == false)
{
OnRecvCompleted(null, args);
}
}
private void OnRecvCompleted(object sender, SocketAsyncEventArgs args)
{
//받은 바이트가 0 바이트 이상이고
if(args.BytesTransferred > 0 && args.SocketError == SocketError.Success)
{
try
{
//args.Buffer : recvArgs.SetBuffer 에서 설정한 바이트다
//args.BytesTransferred : 몇바이트를 받았는지 바이트 수
string recvData = Encoding.UTF8.GetString(args.Buffer, args.Offset, args.BytesTransferred);
Console.WriteLine($"[From client] {recvData}");
RegisterRecv(args);
}
catch (Exception e)
{
Console.WriteLine($"OnRecevCompleted Failed {e}");
}
}
else
{
}
}
public void DisConnect()
{
//멀티 스레드에서 동시에 disconnect를 처리 할 수 있기 때문에 Interlocked를 사용하여 처리한다
//Exchange는 _disconnected 값을 1로 바꾼다, 그리고 오리지널 값을 반환한다
if (Interlocked.Exchange(ref _disconnected, 1) == 1)
{
//즉 리턴 값이 1 이라는 얘기는 이전에 한번 disconnect 가 됐었다는 얘기 임으로
//다시 한번 더 disconnect 를 하려고 하면 리턴처리한다 = > 중복 disconnect 방지
return;
}
_socket.Shutdown(SocketShutdown.Both);
_socket.Close();
}
}
}
서버와 클라간의 테스트를 위한 더미 클라이언트 코드
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace DummyClient
{
class Program
{
static void Main(string[] args)
{
string host = Dns.GetHostName();
IPHostEntry ipHost = Dns.GetHostEntry(host);
IPAddress ipAddr = ipHost.AddressList[0];
IPEndPoint endPoint = new IPEndPoint(ipAddr, 7777);
while (true)
{
try
{
Socket socket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
//소켓으로 서버에 연결한다 , 서버 입장에선 accept 가 된다
socket.Connect(endPoint);
Console.WriteLine($"connected to {socket.RemoteEndPoint.ToString()}");
//서버로 보낸다
for(int i=0;i< 5; ++i)
{
byte[] sendBuff = Encoding.UTF8.GetBytes($"Hello world! {i} ");
int bytesSent = socket.Send(sendBuff);
}
//서버에서 받는다
byte[] recvBuff = new byte[1024];
int recvBytes = socket.Receive(recvBuff);
string recvData = Encoding.UTF8.GetString(recvBuff, 0, recvBytes);
Console.WriteLine($"received : {recvData}");
//서버와 통신을 위해 생성했던 소켓 종료 처리
socket.Shutdown(SocketShutdown.Both);
socket.Close();
}
catch (System.Exception ex)
{
Console.WriteLine(ex);
}
Thread.Sleep(100);
}
}
}
}
bool pending = _listenSocket.AcceptAsync(args);
SocketAsyncEventArgs 가 하나일때(즉 n 번 선언해 놓고 Register 를 n 번 한게 아닌 n = 1 일때) AcceptAsync 는 대기 하고 있는 클라 접속중 하나에 대해서만 receive 처리를 하고 동시에 두개를 하진 않는다, 하나하고 그다음 하나 step by step
클라 접속이 이뤄지더라도 AcceptAsync를 명시적으로 호출해줘야 OnAcceptCompleted가 실행이 된다 즉, AcceptAsync는 일종의 입장 허락의 개념인데 그 전에 접속을 희망한 클라들은 대기열(큐)에 입장 대기를 하고 있게 되고 OnAcceptCompleted를 호출해서 입장 관련 처리를 다 끝낸 다음, 마지막에 RegisterAccept를 다시 호출해서 다음 입장을 받아주기 때문에 동시에 여러 쓰레드가 OnAcceptCompleted를 실행할 수 없기도 하다
Accepting connections asynchronously gives you the ability to send and receive data within a separate execution thread. Before calling the AcceptAsync method, you must call the Listen method to listen for and queue incoming connection requests.
결과 화면 : 더미 클라에서 send 를 연달아 다섯번정도하기 때문에 서버에서 비동기 receive 로 5번이 처리 되는것 것을 볼 수 있다, 콘솔 창에서 보면 알 수 있듯이 receive 가 동시에 멀티 스레드 처럼 5번 처리 되는것이 아니고 한번에 한번만 처리 되는 것이라 볼 수 있는데 현재 Recv 코드는 오직 1번에 1개의 쓰레드만 접근 할 수 있다
다시 말해 최초에 RegisterRecv 를 1개만 걸어 놨기 때문에 Register->Completed .. 과정도 오로지 한번에 1개만 일어난다
using System;
using System.Net.Sockets;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using System.Text;
namespace ServerCore
{
class Program
{
static Listener _listener = new Listener();
static void OnAcceptHandler(Socket clientSocket)
{
try
{
//클라이언트로부터 받아오는 처리
byte[] recvBuff = new byte[1024];
int recvBytpes = clientSocket.Receive(recvBuff);
string recvData = Encoding.UTF8.GetString(recvBuff, 0, recvBytpes);
Console.WriteLine($"[From Client] {recvData}");
//클라이언트로 보내는 처리
byte[] sendBuff = Encoding.UTF8.GetBytes("Welcome to server!");
clientSocket.Send(sendBuff);
clientSocket.Shutdown(SocketShutdown.Both); //연결된 소켓 끊기, 듣기와 말하기를 하지 않겠다는 것
clientSocket.Close(); //클라와 서버간의 연결 끊기
}
catch (System.Exception ex)
{
Console.WriteLine(ex);
}
}
static void Main(string[] args)
{
//DNS : Domain Name System
// 도메인을 하나 등록해서 해당하는 IP 를 찾아오면 관리가 쉬워짐
//www.google.com =>
string host = Dns.GetHostName();
//host = "google.com"; //ipHost.AddressList[0] == {172.217.161.238}
IPHostEntry ipHost = Dns.GetHostEntry(host);
//이렇게 GetHostEntry 로 주소를 얻어오는 건 DNS 서버를 통해서 얻어 올 수 있게 됨
// ipHost.addressList[0] = IPAddress.Parse(" 경우에 따라서 ip 주소는 여러개 일 수도 있다 부하 분산을 위해서 addressList
IPAddress ipAddr = ipHost.AddressList[0];
IPEndPoint endPoint = new IPEndPoint(ipAddr, 7777); //최종 주소
try
{
_listener.Init(endPoint, OnAcceptHandler);
while (true)
{
}
}
catch(Exception e)
{
Console.WriteLine(e);
}
}
}
}
리스터 코드 : 비동기 accept
using System;
using System.Collections.Generic;
using System.Text;
using System.Net.Sockets;
using System.Net;
namespace ServerCore
{
class Listener
{
Socket _listenSocket;
Action<Socket> _onAcceptHandler;
public void Init(IPEndPoint iPEndPoint, Action<Socket> onAcceptHandler)
{
//AddressFamily ip version 4,6 에 대한 것 , 위에서 자동으로 만들어줌,
//tcp 로 할 경우 stream, tcp 로 설정해준다
//리슨 하는 자체가 소켓을 하나 만들어야 한다
_listenSocket = new Socket(iPEndPoint.AddressFamily , SocketType.Stream, ProtocolType.Tcp);
_onAcceptHandler += onAcceptHandler;
_listenSocket.Bind(iPEndPoint); //소켓에 ip 와 포트 할당
//최대 동시 대기 수, 동시에 들어올대 10명까지만 처리 가능하고 그 위로는 실패가 된다
_listenSocket.Listen(10);
//이건 한번 사용하고 재사용이 가능하다
SocketAsyncEventArgs args = new SocketAsyncEventArgs();
args.Completed += new EventHandler<SocketAsyncEventArgs>(OnAcceptCompleted);
//최초 한번은 등록해준다
RegisterAccept(args);
}
void RegisterAccept(SocketAsyncEventArgs args)
{
//재사용이 됨으로 null 로 처리한다
args.AcceptSocket = null;
bool pending = _listenSocket.AcceptAsync(args); //비동기 임으로 예약만 하고 넘어간다, accdept 완료는 eventHandler 를 통해서 완료된다
if (pending == false) //false 면 pending 없이 바로 완료 됐다는 얘기임
OnAcceptCompleted(null, args);
}
void OnAcceptCompleted(object sender, SocketAsyncEventArgs args)
{
if(args.SocketError == SocketError.Success)
{
//accept 되어 새로 생성된 소켓을
_onAcceptHandler.Invoke(args.AcceptSocket); //넘겨준다
}
else
{
Console.WriteLine(args.SocketError.ToString());
}
//위에 까지 처리가 된것은 accept 가 완료 된것임으로 새로운 accept 를 위해서
//RegisterAccept 를 다시 호출하여 OnAcceptCompleted 이벤트를 받아 들을 수 있는 상태로 만든다
RegisterAccept(args);
}
public Socket Accept()
{
//return _listenSocket.Accept();
//클라의 접속이 있다면 받아오는 처리, 접속이 있을때까지 계속 대기, 즉 다음으로 넘어가지 않는다
//클라로부터 접속이 왔다면 accept 되어 클라와 별도 통실한 socket 이 생성되어 리턴된다
//return _listenSocket.Accept();
//async 는 비동기로 처리 된다
//return _listenSocket.AcceptAsync()
return null;
}
}
}
더미 클라이언트 코드
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace DummyClient
{
class Program
{
static void Main(string[] args)
{
string host = Dns.GetHostName();
IPHostEntry ipHost = Dns.GetHostEntry(host);
IPAddress ipAddr = ipHost.AddressList[0];
IPEndPoint endPoint = new IPEndPoint(ipAddr, 7777);
while (true)
{
try
{
Socket socket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
//소켓으로 서버에 연결한다 , 서버 입장에선 accept 가 된다
socket.Connect(endPoint);
Console.WriteLine($"connected to {socket.RemoteEndPoint.ToString()}");
//서버로 보낸다
byte[] sendBuff = Encoding.UTF8.GetBytes("Hello world!");
int bytesSent = socket.Send(sendBuff);
//서버에서 받는다
byte[] recvBuff = new byte[1024];
int recvBytes = socket.Receive(recvBuff);
string recvData = Encoding.UTF8.GetString(recvBuff, 0, recvBytes);
Console.WriteLine($"received : {recvData}");
//서버와 통신을 위해 생성했던 소켓 종료 처리
socket.Shutdown(SocketShutdown.Both);
socket.Close();
}
catch (System.Exception ex)
{
Console.WriteLine(ex);
}
Thread.Sleep(100);
}
}
}
}
AcceptAsync : 비동기 방식으로 클라의 접속을 말한다
AcceptAsync 로 처리하면 서버에서 클라의 접속을 받아 Accept() 함수 처럼 accept 하게 될때 해당 라인에서 무한 대기 하지 않고 클라와 통신하기 위한 전용 socket 을 만들어 리턴하고 서버에선 바로 다음 로직을 처리 할 수 있게 된다
using System;
using System.Net.Sockets;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using System.Text;
namespace ServerCore
{
class Program
{
static void Main(string[] args)
{
//DNS : Domain Name System
// 도메인을 하나 등록해서 해당하는 IP 를 찾아오면 관리가 쉬워짐
//www.google.com =>
string host = Dns.GetHostName();
//host = "google.com"; //ipHost.AddressList[0] == {172.217.161.238}
IPHostEntry ipHost = Dns.GetHostEntry(host);
//이렇게 GetHostEntry 로 주소를 얻어오는 건 DNS 서버를 통해서 얻어 올 수 있게 됨
// ipHost.addressList[0] = IPAddress.Parse(" 경우에 따라서 ip 주소는 여러개 일 수도 있다 부하 분산을 위해서 addressList
IPAddress ipAddr = ipHost.AddressList[0];
IPEndPoint endPoint = new IPEndPoint(ipAddr, 7777); //최종 주소
//AddressFamily ip version 4,6 에 대한 것 , 위에서 자동으로 만들어줌,
//tcp 로 할 경우 stream, tcp 로 설정해준다
//리슨 하는 자체가 소켓을 하나 만들어야 한다
Socket listenSocket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
try
{
listenSocket.Bind(endPoint); //소켓에 ip 와 포트 할당
//최대 동시 대기 수, 동시에 들어올대 10명까지만 처리 가능하고 그 위로는 실패가 된다
listenSocket.Listen(10);
while (true)
{
Console.WriteLine("Listening .... ");
//클라의 접속이 있다면 받아오는 처리, 접속이 있을때까지 계속 대기, 즉 다음으로 넘어가지 않는다
//클라로부터 접속이 왔다면 accept 되어 클라와 별도 통실한 socket 이 생성되어 리턴된다
Socket clientSocket = listenSocket.Accept();
//클라이언트로부터 받아오는 처리
byte[] recvBuff = new byte[1024];
int recvBytpes = clientSocket.Receive(recvBuff);
string recvData = Encoding.UTF8.GetString(recvBuff, 0, recvBytpes);
Console.WriteLine($"[From Client] {recvData}");
//클라이언트로 보내는 처리
byte[] sendBuff = Encoding.UTF8.GetBytes("Welcome to server!");
clientSocket.Send(sendBuff);
clientSocket.Shutdown(SocketShutdown.Both); //연결된 소켓 끊기, 듣기와 말하기를 하지 않겠다는 것
clientSocket.Close(); //클라와 서버간의 연결 끊기
}
}
catch(Exception e)
{
Console.WriteLine(e);
}
}
}
}
클라이언트 코드
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace DummyClient
{
class Program
{
static void Main(string[] args)
{
string host = Dns.GetHostName();
IPHostEntry ipHost = Dns.GetHostEntry(host);
IPAddress ipAddr = ipHost.AddressList[0];
IPEndPoint endPoint = new IPEndPoint(ipAddr, 7777);
Socket socket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
//소켓으로 서버에 연결한다 , 서버 입장에선 accept 가 된다
socket.Connect(endPoint);
Console.WriteLine($"connected to {socket.RemoteEndPoint.ToString()}");
//서버로 보낸다
byte[] sendBuff = Encoding.UTF8.GetBytes("Hello world!");
int bytesSent = socket.Send(sendBuff);
//서버에서 받는다
byte[] recvBuff = new byte[1024];
int recvBytes = socket.Receive(recvBuff);
string recvData = Encoding.UTF8.GetString(recvBuff, 0, recvBytes);
Console.WriteLine($"received : {recvData}");
//서버와 통신을 위해 생성했던 소켓 종료 처리
socket.Shutdown(SocketShutdown.Both);
socket.Close();
}
}
}