Multithreading your game has the potential to massively increase performance. You are getting an entire new thread to work with, so you can offload heavy tasks to your worker thread while your game thread keeps running at a high framerate.

Getting started with multithreading is daunting, at least it was for me, so I'm writing this to help out any fellow devs who might be interested in using this for your own project.

First off: is multithreading the right solution for you?The use case is quite limited, since there are a lot of things you can't do on the other thread. You can't read or write data that lives on your game thread, this includes modifying any UObjects. Doing so will result in undefined behavior - usually access violation errors.

So, what can you use it for? Think of it like an isolated part of your code, you can give it inputs, and then it will go off on it's own and calculate something only using the inputs you gave it. You can only (easily) multithread parts of your code that can be decoupled from the rest of the program.

For example, the way I used it was to calculate a custom lightmap using a recursive flood fill algorithm, the result is a dynamic 2D lightmap like the one in Terraria. First, the game thread prepares the data for the worker thread; it gets the details of every light on screen. Then, it gives the data to the worker thread, which performs the expensive calculations. Finally, the worker thread gives back the finished lightmap, and the cycle continues.

If you think your expensive task can be performed like this, you're in luck, because this guide will show you exactly how to implement it - without worrying about locking variables and mutexes.

Flip flop threading paradigm

I have no idea if that's a real paradigm, but that's what I'm going to call this system I created. This paradigm can be summarized by one video:

 

The game thread is the person flicking the switch, and the worker thread is the thing that switches it off. The switch itself is a boolean.

Basically, when the switch is off, the worker thread does nothing but check if the switch is on. This is when the game thread reads and writes to the worker thread. Once the game thread has finished, it flips the switch to on. When the worker thread sees that it's on, it will perform all of the calculations using the fresh data is received, and flips the switch off once it has finished.

Let's see how that looks in code.

First, let's lay out the foundations to get a FRunnable thread up and running. It won't do anything yet, other than log messages so we can see it working.

Header File

// Copyright notice

#pragma once

#include "CoreMinimal.h"
#include "HAL/Runnable.h"

/**
 *
 */
class GAMENAME_API FMyWorker : public FRunnable
{
public:

	// Constructor, create the thread by calling this
	FMyWorker();

	// Destructor
	virtual ~FMyWorker() override;


	// Overriden from FRunnable
	// Do not call these functions youself, that will happen automatically
	bool Init() override; // Do your setup here, allocate memory, ect.
	uint32 Run() override; // Main data processing happens here
	void Stop() override; // Clean up any memory you allocated here


private:

	// Thread handle. Control the thread using this, with operators like Kill and Suspend
	FRunnableThread* Thread;

	// Used to know when the thread should exit, changed in Stop(), read in Run()
	bool bRunThread;
};

C++ File

// Copyright notice

#include "MyWorker.h" // Change this to reference the header file above


#pragma region Main Thread Code
// This code will be run on the thread that invoked this thread (i.e. game thread)


FMyWorker::FMyWorker(/* You can pass in inputs here */)
{
	// Constructs the actual thread object. It will begin execution immediately
	// If you've passed in any inputs, set them up before calling this.
	Thread = FRunnableThread::Create(this, TEXT("Give your thread a good name"));
}


FMyWorker::~FMyWorker()
{
	if (Thread)
	{
		// Kill() is a blocking call, it waits for the thread to finish.
		// Hopefully that doesn't take too long
		Thread->Kill();
		delete Thread;
	}
}


#pragma endregion
// The code below will run on the new thread.


bool FMyWorker::Init()
{
	UE_LOG(LogTemp, Warning, TEXT("My custom thread has been initialized"))

	// Return false if you want to abort the thread
	return true;
}


uint32 FMyWorker::Run()
{
	// Peform your processor intensive task here. In this example, a neverending
	// task is created, which will only end when Stop is called.
	while (bRunThread)
	{
		UE_LOG(LogTemp, Warning, TEXT("My custom thread is running!"))
		FPlatformProcess::Sleep(1.0f);
	}

	return 0;
}


// This function is NOT run on the new thread!
void FMyWorker::Stop()
{
	// Clean up memory usage here, and make sure the Run() function stops soon
	// The main thread will be stopped until this finishes!

	// For this example, we just need to terminate the while loop
	// It will finish in <= 1 sec, due to the Sleep()
	bRunThread = false;
}

Creating and controlling the thread

This part is easy, just create an instance of the class above using regular C++:

FMyWorker* Worker = new FMyWorker();

The example is set up to run straight away, so if you want to pass it some data, you can either use the constructor, or move the Create() command to another function you can invoke when ready.

Remember to hold onto the pointer so you can delete it when no longer needed.

When finished, use the normal delete command:

delete Worker;

When you've done this, you should have a new thread that announces when it has been created, and will print a message to the log every second. The code should be stable and crash free.

Implementing the Flip Flop

Now that we have a basic thread, we can give it more functionality.

We need to create the boolean that acts as the switch, and the loop that reads this bool.

It's fairly straightforward, here is the code you need to add:

Header File

class GAMENAME_API FMyWorker : public FRunnable
{
public:

	[...]

	// ----------


	// The boolean that acts as the main switch
	// When this is false, inputs and outputs can be safely read from game thread
	bool bInputReady = false;


	// Declare the variables that are the inputs and outputs here.
	// You can have as many as you want. Remember not to use pointers, this must be
	// plain old data
	// For example:
	int ExampleIntInput = 0;
	float ExampleFloatOutput = 0.0f;


private:

	[...]
};

C++ File

uint32 FMyWorker::Run()
{
	// Peform your processor intensive task here. In this example, a neverending
	// task is created, which will only end when Stop is called.
	while (bRunThread)
	{
		if (bInputReady)
		{
			// Do your intensive tasks here. You can safely modify all input variables.
			// For the example, I'm just going to convert the input int to a float
			ExampleFloatOutput = ExampleIntInput;
			FPlatformProcess::Sleep(1.0f); // Simulate a heavy workload

			// Do this once you've finished using the input/output variables
			// From this point onwards, do not touch them!
			bInputReady = false;

			// I hear that it's good to let the thread sleep a bit, so the OS can utilise it better or something.
			FPlatformProcess::Sleep(0.01f);
		}

	}

	return 0;
}

That's all you need on the thread side of things. To access it from the game thread, do something like this:

 

// You need a function like Tick. You can also use timers
void AMyClass::Tick(float DeltaTime)
{
	if (Worker)
	{
		if (!Worker->bInputReady)
		{
			// Read and write the variables here
			Worker->ExampleIntInput = 3141592;
			UE_LOG(LogTemp, Warning, TEXT("Game thread: value gotten from worker thread: %f"), Worker->ExampleFloatOutput)


			// Flick the switch to let the worker thread run
			// Do not read/write variables from thread from here on!
			Worker->bInputReady = true;
		}
	}
}

Note: the variable Worker needs to be initialized before, like in BeginPlay(). Scroll up a bit to see how to initialize it.

 

ref : https://unrealcommunity.wiki/multithreading-with-frunnable-2a4xuf68

 

반응형

 

 

게임, 전화, 디스플레이, 네트워크 데이터 등등 각각의 프로세스별로 레디큐를 두어 처리 하는 방식이다

(게임 ,전화 등등이 우선순위가 하나의 기준으로 우선순위가 처리되지 않음으로)

각 우선순위를 위한 분리된 큐

 

 

 

위에 있는 것일 수록 우선순위가 높음

각각 레디큐가 별도고 가장 높은 우선순위의 큐가 비게 되면 그다음 system process 의 큐를 실행하고

system process 도 다 끝나면 그다음 interactive process (예를 들어 ui같은) 를 그다음 순위로 처리하는 방식이다

 

그런데 이때 문제점이 있는데

가장 상단의 우선순위가 높기 때문에 아래 있는 것일 수록 실행 되지 않는 기아상태가 발생할 수도 있기 때문에

 

그래서 우선순위가 가장 높은 큐는 퀀텀을 8 을 줘서 짧게 실행하게 하고 나가고 그다음 우순위에선 퀀텀 시간을 16을 줘서 좀 더 실행하게 하고 그다음 후순위는 FCFS 로 아래로 갈 수록 더 많은 CPU Burst time 을 할당 받기 때문에 아래 있는 것도 우선순위가 낮아 대기는 하지만 한번 실행될때 대기 한 만큼 좀 더 실행이 되도록 하여 어느정도 균형을 좀 맞출수 있는 형태가 된다

 

 

 

그런데 현대적 컴퓨터에선 프로세스 스케줄링을 하지 않고 스레드 스케줄링을 한다

그런데 지금까지 본것은 프로세스 스케줄링인데 스레드 스케줄링은 프로세스 스케줄링에 견주어 생각해보면 된다

그리고 프로세스에는 커널스레드와 유저스레드가 있는데 이 중에서도 커널 스레드만 스케줄링 하면 되는데

유저 스레드는 스레드 라이브러리에 의해서 관리 되는 소프트웨어적인 측면이기 때문에 커널 유저스레드에 대해 모른다

즉 커널 스레드가 스케룰이 되면 커널스레드는 유저스레드를 서비스만 해주면 된다

 

즉 지금까지 본것은 커널스레드를 스케줄링한다고 생각하면 된다

 

 

 

반응형

기본적으로 SJF 인데 비교 대상이 Burst Time 이 아니가 Priority 를 두어 이것을 기반으로

먼저 처리 할것을 선정하는 방식이다

 

이때 발생하는 문제가 실행시간이 짧은 프로세스가 있는데 우선순위가 먼저 처리되야 하는게 계속 들어오게 되면 실행되지 못하고 게속 대기해야 하는 기아(starvation) 이 발생하게 되는 문제가 있다

 

 

 

그래서 이 문제를 해결 하기 위해서 하는 방법은 프로세스에 aging 숫자를 도입해 오래 된 프로세스일 수록 우선순위를 높여주는 처리를 하여 이 문제를 막을 수 있다

 

 

 

 

좀더 나은 대안으로 RR 과 우선순위 기반의 스케줄링 두개를 섞어서 사용하는 방식도 있다

즉 우선순위를 먼저 처리하고 RR 로 돌아가면서 실행되도록

 

먼저 우선순위 높은걸 처리한다음 동일한 우선순위의 프로세스에 대해선 RR 로 처리한다

 

처음 P4 우선순위가 높기 때문에 P4 가 7초 동안 실행된 다음 P2, P3 우선순위가 같기 때문에 time quantum 즉 시간을 분할 하여 2초 마다 P2, 와 P3  환원큐 방식으로 돌아가면서 처리 되도록 하는 형태인걸 알수 있다

 

 

반응형

 

선행글 : https://3dmpengines.tistory.com/2340

 

이렇게 남아 있는 시간중에 더 짧은 시간인것이 먼저 처리 되게 하는 방식을

SRTF : 즉 Preemptive SJF 스케줄링 이라 한다 , R 은 Remaining

 

0초에 P1 이 도착

1 초에 P2 가 도착

2 초에 P3 가 도착

3 초에 P4 가 도착

 

위 처럼 뭉처 있게 되는데

 

실행순서를 보면 처음 P1 은 레디큐에 아무것도 없기 때문에 바로 실행된다

 

이때 1초가 지났을때 즉 절대 시간 1초에서 보면 

 

P1 의 Burst Time 은 8초에서 이제 7초가 남았고 

P2 가 이제 들어오게 되는데 P2 의 Burst Time 이 4 라서 P1 에 남아 있는 시간

7초보다 작으니 P2가 선점되어 실행되게 된다

 

그다음 절대 시간 3초에서 보면 P2의 남은 시간은 3초고 P3가 이제 도착 했는데 Burst Time 이 9초 임으로

P2 가 계속 실행된다 

 

절대시간 4초에서 보면 P2의 남아 있는 시간은 2초고 P4의 Burst Time 이 5초 임으로 마찬가지로 P2 가 계속 실행된다

 

절대시간 5초에서 보면 P2 는 끝났고 이제 남아 있는 P1의 시간과 나머지 P2, P4 와 마찬가지로 시간 경합을 하여 실행하게 된다

 

최종적으로 실행된 결과는 다음 처럼 된다

사실 위에 있는 이미지와 동일한데 한번더 쓴것

 

 

 

이것은 SJF 하고 비교하면 SRTF 가 평균대기 시간이 좀더 짧을 수 있다

실제 계산해보면 SJF 는 평균 대기 시간이 7.75가 된다

 

 

반응형

SRTF의 경우 짧은 시간을 먼저 실행하도록 계속 변경하다보면

만약  긴 CPU Burst time 동안 실행 되는게 있을때 계속 바뀌면 유저가 끊기는 불편을 느낄 수도 있다

게임이 끊겨서 버벅 거리는 것 과 같이 

 

 

 

RR 은 FCFS 를 돌면서 처리 하는 것인데 

FCFS 에서 시간을 쪼개서( time Quantum 또는time slice )실행하게 한다음 빠져나가게 처리 하는 방식 

그런데 이 쪼개는 시간 단위를 아주 작은 시간단위로 준다 10 ~ 100 밀리 세컨드의 길이로 분할한다

 

이렇게 되면 Ready Queue 에서는 환원큐로 구현하고

 

P1, P2, P3, P4 그다음 다시 P1 앞의 것이 실행 되도록 시간을 분할하여 돌아가며 실행되는 형태이다

 

 

 

그런데 만약 time quantum 시간 보다 프로세스가 처리 되는 시간이 짧다면?

레디큐에서 다음 실행할 프로세스를 바로 실행시키고

1 time quantum 시간 보다 길다면 OS 에서 시간이 됐을때 인터럽트를 발생시켜서 컨텍스트스위치가 일어나게 되고 다음 프로세스 실행시킨다

RR 은 평균 대기 시간이 조금 더 길수는 있다

RR은 time quantum 을 얼마는 주는지에 따라 성능이 확확 달라진다

quantum 시간을 짧게 주면 줄수록 즉 잘게 짜를 수록 컨텍스트 스위칭이 자주 발생하게 되어

전환시간 또한 높아지게 된다(dispatch latency)

quantum 을 최대로 길게하면 FCFS 가 된다

 

 

 

반응형

'운영체제 & 병렬처리 > 시스템프로그래밍' 카테고리의 다른 글

스케줄링 시간 : Priority-Base 과 RR  (0) 2023.02.26
스케줄링 시간 : SRTF  (0) 2023.02.25
스케줄링 시간 : SJF  (0) 2023.02.23
스케줄링 시간 : FCFS  (0) 2023.02.22
스케줄링  (0) 2023.02.21

SJF(Shortest-Job-First)

수행시간이 적은 것을 먼저 처리 하는 방식

만약 다음 처리 시간이 같다면 FCFS 로처리

 

 

 

Process 가 Ready Queue 에 있고 Burst Time 이 주어질때

 

Burst Time 이 가장 작은것 먼저 실행 시키면 위 처럼 간트차트가 된다

 

총 대기시간 28초

평균대기시간은 7초

 

턴어라운드 타임은 프로세스당 앞에 기다린 시간과 해당 프로세스가 처리 되는 시간까지 임으로

 

토탈 턴어라운드 타임은 52

평균 턴어라운드 타임은 13이 된다

 

 

최적이 증명가능하다

평균 대기 시간이 왜 최소로 갖느냐면 짧은게 앞에 있으니 대기시간이 짤방질 수 밖에 없다

 

그런데 이건 구현이 불가능한데, 다음 CPU Burst 타임을 알수가 없기 때문이다

대신 근사적으로 구하는건 예측을 통해서 구현할수 있는데 

예측은 이 프로세스가 과거에 얼마만큼 시간을 썼는지를 보고 판단하게 된다

 

시간은 지수적 시간을 적용하는데 그 이유는 최근에 많이 썼을 수록 가중치를 더 줘서 CPU burst 시간을 조정한다

 

10*1/2 + 6*1/2 = 8

타우식은 CPU Burst 타임이 타우 그래프에 따라 파란색 처럼 이동 평균선 역할을 해준다

 

그런데 이 전 사용시간에 대해서도 저장처리를 해야 하기 때문에 이론적으론 옵티멀한데 실제 사용하긴 어렵다

 

 

SJF 는 Preemtpvie 할 수도 있고 Non-Preemptive 할 수도 있다

앞서서 5초로 CPU burst 이 계산된게 있는데 새로 계산된 프로세스의 시간이 1 이라면 

이때 1초가 먼저 실행 되면 선점형이고 기다렸다 처리되면 비선점형이 된다

그런데 이렇게 할려면 해당  프로세스의 남은 CUP burst Time 을 알아야 하기 때문에 이것 또한 쉽지 않다

 

 

 

 

 

반응형

 

선행글 : https://3dmpengines.tistory.com/2338

 

 

 

시간 0 에서 위 프로세스들이 도착 했다고 가정 했을때 CPU 사용 시간이 24, 3, 3 이라 가정했을때

FCFS 는 다음과 같을 것이다 (아래 그림을 간츠차트라 한다)

FCFS : non-preemptive 로 비선점형이다

 

도착은 P1, P2, P3 순으로 도착하게 되고 P1 은 24 초에 끝나고 P2는 27초에 끝나게 된다

 

대기 시간을 보면

 

P1은 대기 시간이 0 

P2는 24

P3는 27이 된다

 

그래서 총 대기 시간은 0 + 24 + 27 = 51 초가 된다

평균은 51/3 = 17초가 된다

 

이때 턴어라운드 타임은 = p1=24, p2=27, p3=30  이 되고 

총 턴어라운드 타입은 = 24+27+30 = 81

평균 턴어라운드 타임은 = 81/3 = 27 이 된다

 

 

 

이때 평균 대기 시간을 줄일수 있는가? 에 대한 질문이 나오게 된다

여기서 선수를 바꾸면

이렇게 되고

 

그래서 총 대기 시간은 9 초가 된다

평균은 3 초가 되어 줄어들게 된다

 

이때 턴어라운드 타임은 = p1=3, p2=6, p3=30  이 되고

총 턴어라운드 타입은 = 3+6+30 = 39

평균 턴어라운드 타임은 = 39/3 = 13 이 된다

 

결론 : FCFS 같은 경우 프로세스의 처리 시간과 순서에 따라 평균 대기 시간이 확달라진다

FCFS 로는 좋은 효율을 얻기에는 어렵다

 

 

반응형

CPU 에 현재 프로세스가 끝난다음 이후 프로세스를 어떤거롤 선택할 것인가?

 

FIFO 는 큐형태를 말하는 것

우선순위큐는 우선순위가 높은것이 먼저 돌아간다

 

선점형 비선점형으로 우선순위가 갈리는데

 

Preemptive : 쫓아내는 것 , 돌아가고 있는 프로세스 쫓아내는 것

Non-preemptive : 못 쫓아내는 것 , 이것은 프로세스가 cpu 를 선점하고 나면 그 프로세스가 릴리즈 끝날때까지 기다린다는 것

 

 

 

 

1,4번의 경우를 보면 자발적으로 wait 으로 가거나 terminate 상태로 간다 즉 non-preemptive 하기 때문에 non-preemptive  나 preemptive를 고민할 필요가 없다

 

2,3 번의 경우는 2번은 돌다가 레디로 가니 쫓아내어지게 되는 것이고 3번은 대기하고 있는것이 준비가 갔을때 준비로간 프로세스 우선순위가 높다면 바로 실행으로 가기 때문에 쫓아내는 형태가 될수 있다

 

 

디스패처(Dispatcher) 란? : 컨텍스트 스위치를 해주는 모듈을 디스패처라고 한다

디스패처는 당연히 빨라야 한다

 

P0, P1 이 컨텍스트 스위칭 할때

 

PCB 란 운영체제가  프로세스를 제어하기 위해 정보를 저장해 놓은 곳으로 프로세스의 상태 정보를 저장하는 구조체이다

Context Switching 할대 필요하다

PCB 는 프로세스 생성 시 만들어지며 주기억장치에 유지된다

 

PCB0 번 블럭과 PCB1 번 블럭이 교체 되면서 컨텍스트 스위칭이 일어난다

그리고 이렇게 교체 되는 딜레이 시간을 dispatch latency 라고 한다 그리고 이 시간은 가금적 짧아야 한다

 


-------PCB에서 유지되는 정보----------
● PID : 프로세스의 고유 번호
● 상태 : 준비, 대기, 실행 등의 상태
● 포인터 : 다음 실행될 프로세스의 포인터
● Register save area : 레지스터 관련 정보
● Priority : 스케줄링 및 프로세스 우선순위
● 할당된 자원 정보
● Account : CPU 사용시간, 실제 사용된 시간
● 입출력 상태 정보
-------------------------------------------




CPU의 레지스터들의 값이 다른 것로 바뀌기 전에 어딘가에 저장을 하고 나가야해요.

이때 PCB에 CPU에서 수행되던 레지스터 값들이 저장이 됩니다. 내가 수행하던 프로세스가
어디까지 수행됐는지(프로그램 카운터), stack pointer의 위치가 어디인지, 그 외 register들의
집합 정보들을 잠시 저장한다는거죠. 어디에? PCB에!

그림을 보면 executing되다가 다른 프로세스 P1)을 수행시키기 위해 PCB0에다가
P0프로세스 정보를 저장하네요. 그리고 레지스터에 PCB1에 저장되어있던 process 1 정보를
가져와 P1을 수행시킵니다.


이런 저장 공간을 PCB라고 하고 사실 이렇게 수행중인 프로세스를 변경할 때 레지스터에
프로세스의 정보가 바뀌는 것을 Context Switching 문맥교환이라고 합니다.

 

 

위 그림은 컨텍스트 스위치가 얼마나 자주일어나는지 알수 있는 내용이다

vmstat 1 3 은 1초에 3번 확인한것

1초에 72번 1초에 73번 1초에 81번은 CW 가 일어났다는 것

 

 

스케줄링 기준

CPU utilization : cup 가 놀지 않고 최대한 CPU 를 돌리는 방식을 말한다

Throughput : 이 수치를 높이는 것인데 단위시간 내에 프로세스의 완결되는 숫자를 높이는 것

Turnaround time : 프로세스가 실행하면서 종료 될때까지 즉 실행에서 종료되는 시간까지의 시간을 최소화 시키는 것

Waiting time : 어떤 프로세스가 Ready Queue 에서 대기 시간을 최소화 시켜주는 것으로 Ready queue  에 있는 대기 시간의 합을 최소화 시켜주는 방식

 

Response time : 응답 시간을 최소화 하는 것 (UI 같이 반응이 즉각 일어 나는 것들의 반응 시간을 최소화 하는것)

 

 

스케줄링 프로블럼 : Ready Queue 에 있는 프로세스 중에서 어떤걸 CPU 에 올릴것인지에 대한 문제 

 

 

해결방법

 

FCFS : 처음 들어온걸 먼저 올려준다

SJF : SRTF  : 의 방식은 짧은 잡을 먼저 올린다

이렇게 둘은 전체의 해당 프로세스의 시간이 있을때 그 시간 전체를 모두다 할당해 주는 방식이다

 

RR :Round-Robin : 시분할로 시간을 분할하여  정해진 시간 만큼 실행되고 해당 시간이 종료되면 다른것이 올라와 실행 되는 방식

 

Priority-based : RR 을 쓰는데 우선순위를 부여하여 선점하는 방식

MLQ : Multi-Level Queue :  경우에 따라서 위의 방식들 중에서 스케줄링 방식을 선택하는 것

MLFQ :  MLQ를 하는데 피드백을 줘서 상황에 따라 더 좋은 걸 선택하려는 방식

 

 

 

 

 

반응형

 

Freeze and thaw thread execution

You can freeze and thaw, or suspend and resume, threads to control the order in which the threads perform work. Freezing and thawing threads can help you resolve concurrency issues, such as deadlocks and race conditions.

 Tip

To follow a single thread without freezing other threads, which is also a common debugging scenario, see Get started debugging multithreaded applications.

To freeze and unfreeze threads:

  1. In the Threads window, right-click any thread and then select Freeze. A Pause icon in the Current Thread column indicates that the thread is frozen.
  2. Select Columns in the Threads window toolbar, and then select Suspended Count to display the Suspended Count column. The suspended count value for the frozen thread is 1.
  3. Right-click the frozen thread and select Thaw.
  4. The Pause icon disappears, and the Suspended Count value changes to 0.

 

 

Switch to another thread

You may see a The application is in break mode window when you try to switch to another thread. This window tells you that the thread does not have any code that the current debugger can display. For example, you may be debugging managed code, but the thread is native code. The window offers suggestions for resolving the issue.

To switch to another thread:

  1. In the Threads window, make a note of the current thread ID, which is the thread with a yellow arrow in the Current Thread column. You'll want to switch back to this thread to continue your app.
  2. Right-click a different thread and select Switch To Thread from the context menu.
  3. Observe that the yellow arrow location has changed in the Threads window. The original current thread marker also remains, as an outline.
  4. Look at the tooltip on the thread marker in the code source editor, and the list in the Thread dropdown on the Debug Location toolbar. Observe that the current thread has also changed there.
  5. On the Debug Location toolbar, select a different thread from the Thread list. Note that the current thread changes in the other two locations also.
  6. In the source code editor, right-click a thread marker, point to Switch To Thread, and select another thread from the list. Observe that the current thread changes in all three locations.

With the thread marker in source code, you can switch only to threads that are stopped at that location. By using the Threads window and Debug Location toolbar, you can switch to any thread.

 

 

 

 

https://learn.microsoft.com/en-us/visualstudio/debugger/how-to-use-the-threads-window?view=vs-2022&tabs=csharp 

 

Debug a multithreaded app - Visual Studio (Windows)

Debug using the Threads window and the Debug Location toolbar in Visual Studio

learn.microsoft.com

 

반응형

 

 

 

You can flag a thread that you want to give special attention by marking it with an icon in the Threads, Parallel Stacks (thread view), Parallel Watch, and GPU Threads windows. This icon can help you and others distinguish flagged threads from other threads.

Flagged threads also receive special treatment in the Thread list on the Debug Location toolbar and in the other multithreaded debugging windows. You can show all threads or only flagged threads in the Thread list or in the other windows.

To flag or unflag a thread

  • In the Threads or Parallel Watch window, find the thread you are interested in and click the flag icon to select or clear the flag.
  • In the Parallel Stacks window, right-click on a thread or group of threads and select Flag / <thread> or Unflag / <thread>.

To unflag all threads

  • In the Threads window, right-click any thread and then click Unflag All Threads.
  • In the Parallel Watch window, select all flagged threads, then right-click and select Unflag.

To display only flagged threads

  • Choose the Show Flagged Threads Only button in one of the multithreaded debugging windows.

To flag Just My Code

  1. On the toolbar at the top of the Threads window, click the flag icon.
  2. In the drop-down list, click Flag Just My Code.

To flag threads that are associated with selected modules

  1. On the toolbar of the Threads window, click the flag icon.
  2. In the drop-down list, click Flag Custom Module Selection.
  3. In the Select Modules dialog box, select the modules that you want.
  4. (Optional) In the Search box, type a string to search for specific modules.
  5. Click OK.

 

 

ref : https://learn.microsoft.com/en-us/visualstudio/debugger/how-to-flag-and-unflag-threads?view=vs-2022 

 

Flag and Unflag Threads - Visual Studio (Windows)

Learn to flag or unflag threads in Visual Studio. Flag or unflag a thread, several threads, or all threads. Flag just your code or ones associated with a module.

learn.microsoft.com

 

반응형

atomic 을 사용한다고 가시성이 해결 되지 않고 단지

 

동일한 객체에 대해서 동일한 수정순서를 관찰 할 수가 있다

수정 순서란 : 이전에  B 값에서 C 값으로 바뀐 순서를 말하며 B  로 수정순서가 완료됐었을때는 B 이전의 상태인 A 로 되돌아가지 못한다 ( A 가 과거의 값이였다는 가정하에)

 

그래서 atomic 을 사용한다고 해서 가시성과 코드 재배치의 문제가 해결 되지 않는다

하지만...

 

 

 

B  에서 C로 아직 바뀌지 않은 상태는 현재 cpu 에서 캐쉬쪽은 최신 값일순 있어도 아직 메모리에는 이전 값이 있게 되는 현상을 말한다

즉 atomic 을 사용했어도 이 현상이 있기 때문에

 

가시성, 코드 재배치 문제의 해결이 되지 않음 단지 동일한 객체에 대해서 다른스레드 들에서 동일한 수정순서는 관찰 할수 있다

 

 

그런데 atomic 을 사용할대 뒤의 인자 memory_order:.... 를 어떤걸 쓰느냐에 따라

가시성과 코드 재배치의 문제가 해결이 되느냐 안되느냐가 결정이 된다

 

ex : memory_order::memory_order_seq_cst

 

1. Sequentially consistent  (가장 엄격, 최적화 여지 적음)  seq_cst  , default 버전

     가시성, 코드 재배치 문제가 해결된다 , 그냥 atomic 을 쓰면 default 인자로  memory_order::memory_order_seq_cst         이 들어가기 때문에 가시성과 코드 재배치 문제가 해결된다

 

2. Acquire-Release ( acquire , release) : 두개를 써주는데  이 경우에는

 

memory_order::memory_order_release  : 

release명령 이전 메모리 명령들이, 해당 명령 이후로 재배치 되어 들어가는것을 막아준다

즉 release 위에 있는 것들음 메모리 명령 재배치가 일어날 수는 있다는 것

 

그리고 acquire 로 같은 변수를 읽는 스레드가 있다면

memory_order::memory_order_acquire

release 이전의 명령들이 acquire 하는 순간에 관찰 가능 해진다 (즉 가시성이 보장이 된다는 것)

 

acquire 아래 있는 명령들이 acqurie 위로 올라가는 재배치를 막아 준다는 것이면서 

release 이전의 내용들이 acquire 이후에는 모두다 메모리에서 제대로 보여지는 상태가 보장이 되어

가시성이 보장이 된다는 것이다

release 이전의 value = 10 이 acqure 이후에는 value 에서는 가시성이 보장이 acquire 로 되었음으로 

acquire 이후로 정확하게 가시성을 보장받아 value== 10 이 된다

 

그래서 1,3 주간 단계라 할 수 있다

 

 

3) Released (relaxed) : memory_order::memory_order_relaxed

최적화 여지가 많음

     코드 재배치와 가시성이 해결되지 못하고,
     단지 동일객체에 대한  동일 수정순서만 보증해준다 (즉 현재 값 B 과 변경 예상인 값C 들 보다 더 과거 값이(A) 되진 않는다는 것 )

 

 

 

 

 

정리하자면 1,2 번을 쓰고 왠만하면 seq_cst 를 쓰자

 

그런데 Intel, AMD 에서는 seq_cst 를 자체적으로 보장해줘서 seq_cst 를 기본으로 써도 성능차이는 거의 나지 않는다 하지만 ARM  에서는 차이가 발생한다

 

자바에선 volatile키워드로 가시성 문제를 해결하는데 c++ 은 단지 코드 최적화만 막아준다

 


 

 

 

exchange 가 있는 이유는 이 함수로 바꿔야지 prev 에 데이터가 flag 에 변경되어진 값을 스레드 세이프하게 받을수 있음

그렇지 않고 

bool prev = flag ; 이렇게 받으면 prev 의 값과 flag 값이 스레드 타이밍상 이 순간에서조차 다른 값일 수 있다

 

 

 

원자적으로 바꾸는 다른 예시..

compare_change-strong 이 함수는 아래  if 처럼 동작한다

 

반응형

이글을 먼저 보신 후 보시면 되겠습니다

 

https://3dmpengines.tistory.com/2233

 

MemoryPool

메모리 풀의 전체 구조는 아래와 같습니다 컨셉은 다양한 크기의 메모리를 메모리 풀화시켜서 사용 하는 방식입니다 주요 클래스는 다음과 같습니다 오른쪽 하단 부터 위쪽으로 화살표 방향대

3dmpengines.tistory.com

 

위 Memory Poll 에서 ms 로 변경 처리를 하면 다음 처럼 바뀝니다 (주석에 설명이 들어 있음)

왠만하면 기본제공해 주는걸 사용하는게 좋습니다

 

 

 

 

 

MeoryHeader 부분

 : 16바이트 단위로 메모리를 정렬, SLIST_ENTRY 을 상속 받으면 메모리가 SLIST_ENTRY 이 먼저 배치됨
    그래서 next 포인터가 먼저 존재하게 됨

    그리고 나머지들도 16바이트 단위로 메모리로 정렬

#pragma once

enum
{
	SLIST_ALIGNMENT = 16
};

//16바이트 단위로 메모리를 정렬, SLIST_ENTRY 을 상속 받으면 메모리가 SLIST_ENTRY 이 먼저 배치됨
//그래서 next 포인터가 먼저 존재하게 됨
DECLSPEC_ALIGN(SLIST_ALIGNMENT)
struct MemoryHeader : SLIST_ENTRY
{
	// [MemoryHeader][Data]
	MemoryHeader(int32 size) : allocSize(size) { }

	static void* AttachHeader(MemoryHeader* header, int32 size)
	{
		new(header)MemoryHeader(size); // placement new
		return reinterpret_cast<void*>(++header);
	}

	static MemoryHeader* DetachHeader(void* ptr)
	{
		MemoryHeader* header = reinterpret_cast<MemoryHeader*>(ptr) - 1;
		return header;
	}

	int32 allocSize;
	// TODO : 필요한 추가 정보
};



//16바이트 단위로 메모리를 정렬
DECLSPEC_ALIGN(SLIST_ALIGNMENT)
class MemoryPool
{
public:
	MemoryPool(int32 allocSize);
	~MemoryPool();

	void			Push(MemoryHeader* ptr);
	MemoryHeader*	Pop();

private:

	//lock 과 queue 를제거하고 SLIST_HEADER 를 추가하여 queue를 대체한다
	SLIST_HEADER _header;

	int32 _allocSize = 0;
	atomic<int32> _allocCount = 0;

	//USE_LOCK;
	//queue<MemoryHeader*> _queue;
};

 

 

cpp 부분

 

 

기존 push 와 pop 을 ms 함수로 대체한 코드입니다

동작은 동일합니다

#include "pch.h"
#include "MemoryPool.h"


MemoryPool::MemoryPool(int32 allocSize) : _allocSize(allocSize)
{
	//헤드를 초기화 해준다 
	::InitializeSListHead(&_header);
}

MemoryPool::~MemoryPool()
{
	/*
	* 이 부분은 더이상 사용하지 않고, 아래 구문으로 대체
	while (_queue.empty() == false)
	{
		MemoryHeader* header = _queue.front();
		_queue.pop();
		::free(header);
	}
	*/
	while (MemoryHeader* next = static_cast<MemoryHeader*>(::InterlockedPopEntrySList(&_header)))
	{
		//16바이트 정렬 할당을 aligned malloc 을 했으니 이에 맞춰 _aligned_free 로 해제처리 해준다
		::_aligned_free(next);
	}
	
}

void MemoryPool::Push(MemoryHeader* ptr)
{
	//lock 은 안쓰니 제거
	//WRITE_LOCK;
	ptr->allocSize = 0;

	//queue 부분 제거
	// Pool에 메모리 반납
	//_queue.push(ptr);

	//대신 ms 걸 쓴다
	::InterlockedPushEntrySList(&_header, static_cast<PSLIST_ENTRY>(ptr));


	_allocCount.fetch_sub(1);
}

MemoryHeader* MemoryPool::Pop()
{
	//pop 할때도 기존 것은 제거하고 ms 걸로 대체한다
	//static_cast<MemoryHeader*> 캐스팅 하는 이유는 _header 를 통해 꺼내온  
	//PSLIST_ENTRY 는  MemoryHeader 가 SLIST_ENTRY를 상속받고 있음으로 가능하다
	MemoryHeader* newNext =  static_cast<MemoryHeader*>(::InterlockedPopEntrySList(&_header));
	/* 기존 queue 부분 제거
	{
		WRITE_LOCK;
		// Pool에 여분이 있는지?
		if (_queue.empty() == false)
		{
			// 있으면 하나 꺼내온다
			header = _queue.front();
			_queue.pop();
		}
	}
	*/

	// 없으면 새로 만들다
	if (newNext == nullptr)
	{
		//header = reinterpret_cast<MemoryHeader*>(::malloc(_allocSize));
		//그냥 malloc 하면 안되고 MemoryHeader에 대한 메모리 할당임으로 16 바이트로 정렬된 메모리를 할당 받기 위해서 _aligned_malloc 사용한다
		newNext = reinterpret_cast<MemoryHeader*>(::_aligned_malloc(_allocSize, SLIST_ALIGNMENT));
	}
	else
	{
		ASSERT_CRASH(newNext->allocSize == 0);
	}

	_allocCount.fetch_add(1);

	return newNext;
}

 

 

 

 

 

 

이부분은 동일..

#pragma once
#include "Allocator.h"

class MemoryPool;


class Memory
{
	enum
	{
		// ~1024까지 32단위, ~2048까지 128단위, ~4096까지 256단위
		POOL_COUNT = (1024 / 32) + (1024 / 128) + (2048 / 256),
		MAX_ALLOC_SIZE = 4096
	};

public:
	Memory();
	~Memory();

	void*	Allocate(int32 size);
	void	Release(void* ptr);

private:
	vector<MemoryPool*> _pools;

	// 메모리 크기 <-> 메모리 풀
	// O(1) 빠르게 찾기 위한 테이블
	MemoryPool* _poolTable[MAX_ALLOC_SIZE + 1];
};


template<typename Type, typename... Args>
Type* xnew(Args&&... args)
{
	Type* memory = static_cast<Type*>(xalloc_(sizeof(Type)));
	new(memory)Type(forward<Args>(args)...); // placement new
	return memory;
}

template<typename Type>
void xdelete(Type* obj)
{
	obj->~Type();
	xrelease_(obj);
}

 

 

 

cpp

: 메모리 정렬 부분들이 추가 되었습니다

#include "pch.h"
#include "Memory.h"
#include "MemoryPool.h"


Memory::Memory()
{
	int32 size = 0;
	int32 tableIndex = 0;

	for (size = 32; size <= 1024; size += 32)
	{
		MemoryPool* pool = new MemoryPool(size);
		_pools.push_back(pool);

		while (tableIndex <= size)
		{
			_poolTable[tableIndex] = pool;
			tableIndex++;
		}
	}

	for (; size <= 2048; size += 128)
	{
		MemoryPool* pool = new MemoryPool(size);
		_pools.push_back(pool);

		while (tableIndex <= size)
		{
			_poolTable[tableIndex] = pool;
			tableIndex++;
		}
	}

	for (; size <= 4096; size += 256)
	{
		MemoryPool* pool = new MemoryPool(size);
		_pools.push_back(pool);

		while (tableIndex <= size)
		{
			_poolTable[tableIndex] = pool;
			tableIndex++;
		}
	}
}

Memory::~Memory()
{
	for (MemoryPool* pool : _pools)
		delete pool;

	_pools.clear();
}

void* Memory::Allocate(int32 size)
{
	MemoryHeader* header = nullptr;
	const int32 allocSize = size + sizeof(MemoryHeader);

	if (allocSize > MAX_ALLOC_SIZE)
	{
		// 메모리 풀링 최대 크기를 벗어나면 일반 할당
		header = reinterpret_cast<MemoryHeader*>(::_aligned_malloc(allocSize, SLIST_ALIGNMENT));
	}
	else
	{
		// 메모리 풀에서 꺼내온다
		header = _poolTable[allocSize]->Pop();
	}

	return MemoryHeader::AttachHeader(header, allocSize);
}

void Memory::Release(void* ptr)
{
	MemoryHeader* header = MemoryHeader::DetachHeader(ptr);

	const int32 allocSize = header->allocSize;
	ASSERT_CRASH(allocSize > 0);

	if (allocSize > MAX_ALLOC_SIZE)
	{
		// 메모리 풀링 최대 크기를 벗어나면 일반 해제
		::_aligned_free(header);
	}
	else
	{
		// 메모리 풀에 반납한다
		_poolTable[allocSize]->Push(header);
	}
}

 

반응형

 

이 글을 먼저 참고한 이후 보시면 됩니다

https://3dmpengines.tistory.com/2237

 

MemoryPool 2 (포인터와 인덱스 값으로 ABA 문제 해결방안 128bit)

interlockedCompareExchange128 의 동작은 아래 글을 참고 https://3dmpengines.tistory.com/2236 InterlockedCompareExchange128 vs InterlockedCompareExchange64 주요 차이점 리턴 값이 다르다 128 비트를 사..

3dmpengines.tistory.com

 

 

위 글하고 같은건데 이것은 MS 에서 제공하는 내장 함수를 사용하는 안전한 방식 입니다

(이전 코드에는 약간의 테스트코드에서 하자가 있음) => 기본 제공 하는 MS 방식이 검증된것으로 안전한 방식이기도..

 

 

 

 

제공되는 SLIST_HEADER 는 아래와 같습니다 (이전 코드와 같은 걸 알 수 있습니다)

typedef union DECLSPEC_ALIGN(16) _SLIST_HEADER {
    struct {  // original struct
        ULONGLONG Alignment;
        ULONGLONG Region;
    } DUMMYSTRUCTNAME;
    struct {  // x64 16-byte header
        ULONGLONG Depth:16;
        ULONGLONG Sequence:48;
        ULONGLONG Reserved:4;
        ULONGLONG NextEntry:60; // last 4 bits are always 0's
    } HeaderX64;
} SLIST_HEADER, *PSLIST_HEADER;

 

 

SLIST_ENTRY 또한 다음 노드만 가리키는 형태입니다

typedef struct DECLSPEC_ALIGN(16) _SLIST_ENTRY {
    struct _SLIST_ENTRY *Next;
} SLIST_ENTRY, *PSLIST_ENTRY;

 

 

아래는 MS 것을 사용한 테스크 코드입니다

 

헤더(SLIST_HEADER)가 있고 그 이후로 SLIST_ENTRY 를 추가하거나 삭제 할수 있습니다, 멀티스레드 환경에서, 스택처럼

 

lock-free stack 은 다음 처럼 사용하면 됩니다

DECLSPEC_ALIGN(16)
class Data // : public SListEntry
{
public:
	SLIST_ENTRY _entry;
	int64 _rand = rand() % 1000;
};

SLIST_HEADER* GHeader;

int main()
{
	GHeader = new SLIST_HEADER();
	ASSERT_CRASH(((uint64)GHeader % 16) == 0);
	//헤드 초기화
	InitializeSListHead(GHeader);

	for (int32 i = 0; i < 3; i++)
	{
		GThreadManager->Launch([]()
			{
				while (true)
				{
					Data* data = new Data();
					ASSERT_CRASH(((uint64)data % 16) == 0);

					//헤드에 다음 노드 추가
					::InterlockedPushEntrySList(GHeader, (SLIST_ENTRY*)data);
					this_thread::sleep_for(10ms);
				}
			});
	}

	for (int32 i = 0; i < 2; i++)
	{
		GThreadManager->Launch([]()
			{
				while (true)
				{
					Data* pop = nullptr;
					pop = (Data*)::InterlockedPopEntrySList(GHeader);

					if (pop)
					{
						cout << pop->_rand << endl;
						delete pop;
					}
					else
					{
						cout << "NONE" << endl;
					}
				}
			});
	}

	GThreadManager->Join();
}

 

 

추가적으로 

 

DECLSPEC_ALIGN(16)
class Data // : public SLIST_ENTRY 
{
public:
SLIST_ENTRY _entry;
int64 _rand = rand() % 1000;

}

 

 

Data  부분은 SLIST_ENTRY  를 상속 받거나 멤버변수에 첫번째로 넣어 16 바이트 정렬을 맞춰 준 다음

포인터 연산을 가능하게 해주면 됩니다

 

 

반응형

 

 

 

interlockedCompareExchange128  의 동작은 아래 글을 참고

 

https://3dmpengines.tistory.com/2236

 

InterlockedCompareExchange128 vs InterlockedCompareExchange64

주요 차이점 리턴 값이 다르다 128 비트를 사용하는가, 64 비트를 사용 하는가가 다르다 BOOLEAN InterlockedCompareExchange128( [in, out] LONG64 volatile *Destination, [in] LONG64 ExchangeHigh, [in] LONG6..

3dmpengines.tistory.com

 

 

 

 

 

아래는 아직 아직 ABA 을 고려하지 않은 상황

데이터안에 다음 노드가 담기는 타입의 스택컨셉입니다

 

데이터 안에 List 를 연결한 포인터를 갖고 있고 이 포인터가 노드들을 이어서 리스트를 이룹니다

즉 데이터가 바깥으로 빠져 있는 형태입니다

  

 

헤더와 데이터가 있다면 데이터와 헤더 사이에 새로 추가된 노드를 끼워 넣는 방식입니다

pop 할때는 반대로 처리 하면 됩니다

 

 

 

위의 방식은 싱글 스레드 방식이라서 이것을 멀티 스레드에서 동작하게 하려면 주의 할것은 ABA problem 이다

 

참고..

https://3dmpengines.tistory.com/2234

 

lock-free aba problem

예전에 게임 서버 포폴 만드는 공부에서 IOCP 구조로 만든 네트워크 스레드(n개)에서 받아들여온 패킷들을 Lock-Free Queue에 넣고, 게임 로직 스레드에서 꺼내다가 로직처리하도록 만들었었다. I have

3dmpengines.tistory.com

 

 

 

 

 

 

노드가 추가 될때를 우선 보면 되는데

아래 같은 경우 ABA problem 이 발생 할 수 있다

이런 경우에 interlockedCompared.. .함수에 들어가기 직전에 header 값이 삭제 됐다가 다시 할당 받았을 경우

우연히 이전 주소와 같은 값일 경우가 발생 할수가 있다 이것을 ABA problem 이라 하는데 이것을 해결 하기 위해서

주소 값만 비교한는 것이 아닌 비교 대상이 이전에 증가한적이 있는지 등의 값을 추가로 넣어 좀 더 큰 비트로 비교하여처리하게 된다

 

 

 

 

 

위와 같은 식으로 인덱스를 증가 시켜는 방식인데

비교 할때 현재 header 의 인덱스와 주소 그리고 expected 의 주소와 인덱스 값들 마저 같다면

인덱스가 증가된 disred 값으로 header 인덱스와 다음 새로운 entry 를 가리키도록 한다

 

 

InterlockedCompareExchaged128 실행 전

 

 

InterlockedCompareExchaged128 실행 이후 (성공했을 경우)

 

다른곳에서 header 가 변경 되지 않았다면 desired 값으로 header 값을 변경한다

interlockedCompareExchange128 값이 변경 됐다면 header 는 depth 과 sequence 가 1씩 증가하고 next 또한 header 의 다음 노드를 가리키게 된다, 만약 interlocked.. 이 실패하면 아무 header 는 변화 하지 않은채 변경 될때까지 시도한다

 



SListHeader 그림에 next 가 있지만
왼쪽 코드 처럼 union 안에 next 가 있다고 생각하면 됩니다

 

위 상황을 그림으로 나타내자면 

이런 상황이 됩니다 위 상황은 interlockedCompareExchange128 이 실행 되기 전까지의 과정입니다

 

interlockedCompareExchange128 이 실행 된 이후의 과정은 다음과 같습니다

 

  • Interlocked…. 128 실행하는 상황에서
    • header 와 expected 가 같다면
    • desired 값을 header 에 복사한다, 그렇지 않다면 복사가 header 에 되지 않음
    • + expected 에는 관계 없이 원래 오리지널 값이 들어갑니다

 

실행 이후의 과정

 

heaer 가 원래가리키던 곳이 아닌 그 다음을 가리킨 것을 알수 있으며 비교할때 인덱스 값들과 한번에 비교하는 연산으로 처리하여 이전에 삭제되어서 동일한 주소로 다시 생성된 것인지 또한 판별할 수 있는 상황이 되었습니다

즉 ABA 문제를 해결

 

 

아래는 코드입니다

 

함수들은 멤버 함수는 아니고 일반 함수입니다

DECLSPEC_ALIGN(16)
struct SListEntry
{
	SListEntry* next;
};

DECLSPEC_ALIGN(16)
struct SListHeader
{
	SListHeader()
	{
		alignment = 0;
		region = 0;
	}

	union
	{
		struct
		{
			uint64 alignment;
			uint64 region;
		} DUMMYSTRUCTNAME;
		struct
		{
			uint64 depth : 16;
			uint64 sequence : 48;
			uint64 reserved : 4;
			uint64 next : 60;
		} HeaderX64;
	};
};

void InitializeHead(SListHeader* header);
void PushEntrySList(SListHeader* header, SListEntry* entry);
SListEntry* PopEntrySList(SListHeader* header);

 

 

 

void InitializeHead(SListHeader* header)
{
	header->alignment = 0;
	header->region = 0;
}

void PushEntrySList(SListHeader* header, SListEntry* entry)
{
	SListHeader expected = {};
	SListHeader desired = {};
	
	// 16 바이트 정렬
	desired.HeaderX64.next = (((uint64)entry) >> 4);

	while (true)
	{
		expected = *header;

		// 이 사이에서 변경 가능성이 존재함

		entry->next = (SListEntry*)(((uint64)expected.HeaderX64.next) << 4);
		desired.HeaderX64.depth = expected.HeaderX64.depth + 1;
		desired.HeaderX64.sequence = expected.HeaderX64.sequence + 1;

		if (::InterlockedCompareExchange128((int64*)header, desired.region, desired.alignment, (int64*)&expected) == 1)
			break;
	}
}

SListEntry* PopEntrySList(SListHeader* header)
{
	SListHeader expected = {};
	SListHeader desired = {};
	SListEntry* entry = nullptr;

	while (true)
	{
		expected = *header;

		entry = (SListEntry*)(((uint64)expected.HeaderX64.next) << 4);
		if (entry == nullptr)
			break;

		// Use-After-Free
		desired.HeaderX64.next = ((uint64)entry->next) >> 4;
		desired.HeaderX64.depth = expected.HeaderX64.depth - 1;
		desired.HeaderX64.sequence = expected.HeaderX64.sequence + 1;

		if (::InterlockedCompareExchange128((int64*)header, desired.region, desired.alignment, (int64*)&expected) == 1)
			break;
	}

	return entry;
}

 

 

 

 

 

 

//테스트 해볼 코드

 

while (true)
{
        Data* pop = nullptr;
        pop = (Data*)PopEntrySList(GHeader);
                    //여기서 pop 한것을 아래에서 삭제 하려고 할때 이미 다른 스레드에 의해서
                    //이 포인터터가 물고 있던 대상이 이미 삭제 되었을 수도 있음, 하지만 다루진 않음

        if (pop)
        {
                cout << pop->_rand << endl;
                delete pop;
        }
        else

        ...

}

 

 

 

테스트할 Data 또한 16 바이트여야 함으로 16 바이트 ALIGN 을 해주고

 

 

ASSERT_CRASH(((uint64)GHeader % 16) == 0);

이 구문으로 해당 바이트가 16 바이트의 배수인지 => 16바이트인지 체크 해볼 수 있습니다

 

DECLSPEC_ALIGN(16)
class Data // : public SListEntry
{
public:
	SListEntry _entry;
	int64 _rand = rand() % 1000;
};

SListHeader* GHeader;

int main()
{
	GHeader = new SListHeader();
	ASSERT_CRASH(((uint64)GHeader % 16) == 0);
	InitializeHead(GHeader);

	for (int32 i = 0; i < 3; i++)
	{
		GThreadManager->Launch([]()
			{
				while (true)
				{
					Data* data = new Data();
					ASSERT_CRASH(((uint64)data % 16) == 0);

					PushEntrySList(GHeader, (SListEntry*)data);
					this_thread::sleep_for(10ms);
				}
			});
	}

	for (int32 i = 0; i < 2; i++)
	{
		GThreadManager->Launch([]()
			{
				while (true)
				{
					Data* pop = nullptr;
					pop = (Data*)PopEntrySList(GHeader);
                    //여기서 pop 한것을 아래에서 삭제 하려고 할때 이미 다른 스레드에 의해서
                    //이 포인터터가 물고 있던 대상이 이미 삭제 되었을 수도 있음, 하지만 다루진 않음

					if (pop)
					{
						cout << pop->_rand << endl;
						delete pop;
					}
					else
					{
						cout << "NONE" << endl;
					}
				}
			});
	}

	GThreadManager->Join();
}

 

 

lock free 는 직접 구현해 쓰기에는 모든 경우의 수를 예측하긴 어렵기 때문에 잘 검증된 것을 사용하는것이 났습니다

lock free 를 사용하려면 안정 검증까진 대규모 테스트 및 많은 수행 착오를 거처야 합니다

 

 

반응형

 

주요 차이점 

  1. 리턴 값이 다르다
  2. 128 비트를 사용하는가, 64 비트를 사용 하는가가 다르다

 

BOOLEAN InterlockedCompareExchange128(
  [in, out] LONG64 volatile *Destination,
  [in]      LONG64          ExchangeHigh,
  [in]      LONG64          ExchangeLow,
  [in, out] LONG64          *ComparandResult
);

 

Parameters

[in, out] Destination

비교할 값

 

[in] ExchangeHigh

덮어쓸 상위 비트

 

[in] ExchangeLow

덮어쓸 하위비트

 

[in, out] ComparandResult

비교 대상이 되는 값

 

 

설명 :

Destination과 ComparandResult 를 비교해서 이 둘이 같다면

ExchangeHigh:ExchangeLow 이 값을 Destination에 덮어 씁니다

 

Return value

InterlockedCompareExchange128

 

이 함수 자체의 리턴 값은 Destination과 ComparandResult 를 비교하여 같다면 1 을 리턴하고 같지 않다면 0을 리턴합니다

 

 

Remarks

 

주의할 점은 같다면 설명 : 처럼 동작하는데 그렇지 않다면 Destination 은 수정 되지 않은 체 그대로 남아 있게됩니다

그리고 이때  ComparandResult 에는 original Destination value(에 첫번째 인자인 destination 값)  값으로 저장 됩니다

 

파라미터들은 16 바이트 aligned 된 상태여야 합니다, 그렇지 않으면 예측되지 않은 동작을 하게 됩니다

See _aligned_malloc.

 

이 함수는 (당연히) atomic 하게 동작하고 컴파일러 내장을 하용하면서 x64 시스템에서 동작합니다

 

이 함수는 메모리 장벽(memory barrier 또는 fence) 를 사용합니다 , 연산의 순서적 처리를 보장하기 위해서

 

 

 

 

#include <stdio.h>
#include <intrin.h>
#include <iostream>


#include <Windows.h>
#include <winnt.h>
#include <mutex>
#include <atomic>
#define WIN32_LEAN_AND_MEAN // 거의 사용되지 않는 내용을 Windows 헤더에서 제외합니다.

using namespace std;

typedef struct t_node
{
	volatile __int64 arr[2];
}node;

int main()
{

	node* a = new node();

	a->arr[0] = 100;
	a->arr[1] = 1;

	__int64 i = 200;
	__int64 j = 500;


	volatile __int64 arr2[2] = {300,1};


	char r = ::InterlockedCompareExchange128((__int64*)a->arr, i, j, (__int64*)(arr2));

	cout << endl << "Interlocked Compare Res: " << r;

	cin.get();
	return 0;
}

 

 

 

 

 

LONG64 InterlockedCompareExchange64(
  [in, out] LONG64 volatile *Destination,
  [in]      LONG64          ExChange,
  [in]      LONG64          Comperand
);

 

 

이건 위에 것 보다 쉬운데

 

Destination 과 Comperand

두개를 비교해서 같다면 ExChange 를 Destination 에 저장합니다

만약 같지 않다면 아무 동작을 수행하진 않습니다

 

이것을 사용하려면 64비트 로 정렬 되어 있어야 합니다 만약 그렇지 않으면 예측 되지 않은 동작을 하며 x86 멀티 프로세스 그리고 x86 시스템이 아닌 어떤 곳에서 예측되지 않은 동작을 하게 됩니다 

Return value

리턴 값은 Destination  의 초기 값을 리턴합니다

 

 

 

https://learn.microsoft.com/en-us/windows/win32/api/winnt/nf-winnt-interlockedcompareexchange64

https://learn.microsoft.com/en-us/windows/win32/api/winnt/nf-winnt-interlockedcompareexchange128

반응형

예전에 게임 서버 포폴 만드는 공부에서 IOCP 구조로 만든 네트워크 스레드(n개)에서 받아들여온 패킷들을 Lock-Free Queue에 넣고, 게임 로직 스레드에서 꺼내다가 로직처리하도록 만들었었다.
I have developed my game server portfolio using IOCP structure. That server receives packets from network threads and pushes the lock-free queue. Then game logic thread pop a message from the lock-free queue and perform a suitable process.

그 때 겪었던 문제 중 하나가 ABA problem이었는데, stack을 가지고

간단하게 설명 글을 만들어보고자 한다.
One of the problems is the ABA problem. So, I explain the problem using the stack.


아 일단은 stack에 대해 알고 있다는 가정하에 설명하고 있다.

일단 MrRobot이라는 클래스를 보관하는 stack 이 있다고 가정하겠다.

스택이니까 간단하게 푸시를 먼저 해보자.

아래의 0x100, 0x200, 0x300은 MrRobot 객체가 할당된 메모리 주소를 예시로 든 것이다.

First I am explaining on the assumption that you know about the stack.

We have a stack that stores object of `MrRobot` class. The first step, push an object of 'MrRobot' class. The following 0x100, 0x200, 0x300 are examples of memory addresses assigned to MrRobot objects.

최초 스택 상태1개 Push한 상태2개 Push한 상태3개 Push한 상태


그럼 이제 pop하는 과정을 살펴보자. 일단은 간단한 pop 코드는 아래와 같다.

물론 일부러 ABA problem 문제가 발생하는 코드와 과정을 예시로 들 것이다.

Now let's take a look at the pop process. My sample pop code is following.
Of course, I will deliberately present code that has ABA problem.

  class Stack
   {
       volatile MrRobot* topPtr_;

       MrRobot* Pop()
       {
           while(1)
           {
               MrRobot* _retPtr = topPtr_;      
                                           
               if (nullptr == _retPtr) 
               {
                   return NULL;                                              
               }

               MrRobot* _nextPtr = _retPtr->next;   
                         
               if(_InterlockedCompareExchange( topPtr_, _nextPtr, _retPtr ))
                   return _retPtr;
           }
       }
   }

   class MrRobot
   {
   public:
      int var_;
   }​


위의 코드에서 _InterlockedCompare() 함수에 대해 간략히 설명하면 아래와 같다.
첫 번째 인자: 비교할 값의 메모리 주소

A description of the _InterlockedCompare() function is shown below.

first parameter



 : compare the value's memory address.


두 번째 인자: 첫 번째 인자와 세 번째 인자의 비교 결과가 같을 경우 첫 번째 인자값을 이 값으로 교체한다.

second parameter

 : If the comparison results of the first and third parameters are the same, replace the first parameter with this value.


세 번째 인자: 첫 번째 인자와 비교할 대상

third parameter

 : compare with first parameter.


아래는 해당 함수에 대한 msdn 링크다.

Interlocked.ComapreExchange method



간단한 샘플 코드는 요렇다.

Simply sample code is below to use _InterlockedCompareExchange function.

//예시 코드
int main()
{
    volatile long *pLong = new long(5);

    // pLong이 가리키는 값이 5 -> 3으로 변경된다.
    _InterlockedCompareExchange(pLong, 3, 5);
}
 


동시에 다른 스레드에서 push와 pop하는 과정도 살펴볼 것이다.우리가 중점적으로 살펴볼 스레드는 1번이다.

And at the same time, we will look at the push and pop process on multi-threads.

we will focus on the first thread.

4번 스레드의 빨간 글씨와 1번 스레드의 마지막 부분을 자세히 살펴보면

아래와 같은 내용을 알 수 있다.


0x200은 이미 pop되고 없는데?
게다가 0x300번지는 1번 스레드에서 pop했을 때와 이미 다른 정보다.

반납한 메모리가 금새 다시 재할당되서 들어올 수 있냐고 하는데,
전혀 가능성이 없는 일이 아니다.

시스템에서 제공하는 메모리 관련 연산자를 사용해서 할당하던,
개인적으로 만든 메모리 풀을 사용하던 가능성이 있는 일은 배제해야되니까..

정리해보면 ABA 문제는 멀티스레드 환경에서 동기화 과정에서
현재 진행 중인 스레드(위의 예시에서는 1번 스레드)에서 두 번의 읽기 사이에
다른 스레드가 실행되어 값을 변경하게 되는데, 첫 번째 스레드에서는
아무것도 변하지 않았다고 인식하게 되어 속는 문제를 말한다.

 

 

ref : http://cutup9999.blogspot.com/2018/08/lock-free-aba-problem.html

반응형

 

메모리 풀의 전체 구조는 아래와 같습니다

컨셉은 다양한 크기의 메모리를 메모리 풀화시켜서 사용 하는 방식입니다

 

 

 

 

주요 클래스는 다음과 같습니다

 

오른쪽 하단 부터 위쪽으로 화살표 방향대로 메모리를 생성하는 방향입니다

 

 

 

 

 

 Thread-safe 한 형태

 

 

STL 에 사용할 Allocator

template<typename T>
class StlAllocator
{
public:
	using value_type = T;

	StlAllocator() { }

	template<typename Other>
	StlAllocator(const StlAllocator<Other>&) { }

	T* allocate(size_t count)
	{
		const int32 size = static_cast<int32>(count * sizeof(T));
		//return static_cast<T*>(xalloc__(size));
		return static_cast<T*>(PoolAllocator::Alloc(size));
	}

	void deallocate(T* ptr, size_t count)
	{
		//xrelease__(ptr);
		PoolAllocator::Release(ptr);
	}
};

 

 

PoolAllocator : Memory 클래스를 경유하기 위한 중간 클래스

class PoolAllocator
{
public:
	static void*	Alloc(int32 size);
	static void		Release(void* ptr);
};


//어딘가에 있으면 됨
Memory*				GMemory = nullptr;

void* PoolAllocator::Alloc(int32 size)
{
	return GMemory->Allocate(size);
}

void PoolAllocator::Release(void* ptr)
{
	GMemory->Release(ptr);
}

 

 

 

 

 

Memory : 메모리 풀 전체를 관장하는 메니저 유사한 것

메모리에는 메모리풀을 여러개 갖고 있다

//메모리에는 메모리풀을 여러개 갖고 있다
class Memory
{
	enum
	{
		// ~1024까지 32단위, ~2048까지 128단위, ~4096까지 256단위
		POOL_COUNT = (1024 / 32) + (1024 / 128) + (2048 / 256),
		MAX_ALLOC_SIZE = 4096
	};

public:
	Memory();
	~Memory();

	void*	Allocate(int32 size);
	void	Release(void* ptr);

private:
	vector<MemoryPool*> _pools;

	// 메모리 크기 <-> 메모리 풀
	// O(1) 빠르게 찾기 위한 테이블
	MemoryPool* _poolTable[MAX_ALLOC_SIZE + 1];
};

 

 

 

바이트가 적은 메모리를 보통 많이 사용하기 때문에 보다 적은 메모리를 사용 할경우에는

더 많이 사용 할 수 있도록 하고(queue개수 자체가 많아짐 queue.size() 를 말하는 것이 아님)

 

더 큰 공간을 사용할때는 개수 자체를 좀 줄이도록 하는데 큰 메모리를 pooling 할 일이 많진 않음으로 일반적으로

Memory::Memory()
{
	int32 size = 0;
	int32 tableIndex = 0;

	for (size = 32; size <= 1024; size += 32)
	{
		MemoryPool* pool = new MemoryPool(size);
		_pools.push_back(pool);

		while (tableIndex <= size)
		{
			_poolTable[tableIndex] = pool;
			tableIndex++;
		}
	}

	for (; size <= 2048; size += 128)
	{
		MemoryPool* pool = new MemoryPool(size);
		_pools.push_back(pool);

		while (tableIndex <= size)
		{
			_poolTable[tableIndex] = pool;
			tableIndex++;
		}
	}

	for (; size <= 4096; size += 256)
	{
		MemoryPool* pool = new MemoryPool(size);
		_pools.push_back(pool);

		while (tableIndex <= size)
		{
			_poolTable[tableIndex] = pool;
			tableIndex++;
		}
	}
}

Memory::~Memory()
{
	for (MemoryPool* pool : _pools)
		delete pool;

	_pools.clear();
}

void* Memory::Allocate(int32 size)
{
	MemoryHeader* header = nullptr;
	const int32 allocSize = size + sizeof(MemoryHeader);

	if (allocSize > MAX_ALLOC_SIZE)
	{
		// 메모리 풀링 최대 크기를 벗어나면 일반 할당
		header = reinterpret_cast<MemoryHeader*>(::malloc(allocSize));
	}
	else
	{
		// 메모리 풀에서 꺼내온다
		//Pop() : 메모리를 할당 받거나, queue 에서 꺼내올 수 있는 MemoryHeader 가 있다면 꺼내온다
		header = _poolTable[allocSize]->Pop();
	}

	//= 할당할 메모리 가장 앞 부분을 MemoryHeader 영역으로 사용하고 그다음 MemoryHeader 이 사이즈 만큼 증가 시킨다음 
	//실제 vector 에서  사용할 메모리 번지를 void* 로 리턴 시킴
	return MemoryHeader::AttachHeader(header, allocSize);
}

//해제 할때는 free 로 해제 하기 때문에 MemoryHeader가 시작되는 첫 주소를 역으로 받아와서 해제 하거나 다시 queue 에 넣는다
void Memory::Release(void* ptr)
{
	MemoryHeader* header = MemoryHeader::DetachHeader(ptr);

	const int32 allocSize = header->allocSize;
	ASSERT_CRASH(allocSize > 0);

	if (allocSize > MAX_ALLOC_SIZE)
	{
		// 메모리 풀링 최대 크기를 벗어나면 일반 해제
		::free(header);
	}
	else
	{
		// 메모리 풀에 반납한다
		_poolTable[allocSize]->Push(header);
	}
}

 

 

 

 

 

MemoryPool : 실제 메모리 풀을 관리하는 클래스 

class MemoryPool
{
public:
	MemoryPool(int32 allocSize);
	~MemoryPool();

	void			Push(MemoryHeader* ptr);
	MemoryHeader*	Pop();

private:
	int32 _allocSize = 0;
	atomic<int32> _allocCount = 0;

	USE_LOCK;
	queue<MemoryHeader*> _queue;
};




MemoryPool::MemoryPool(int32 allocSize) : _allocSize(allocSize)
{
}

MemoryPool::~MemoryPool()
{
	while (_queue.empty() == false)
	{
		MemoryHeader* header = _queue.front();
		_queue.pop();
		::free(header);
	}
}

void MemoryPool::Push(MemoryHeader* ptr)
{
	WRITE_LOCK;
	ptr->allocSize = 0;

	// Pool에 메모리 반납
	_queue.push(ptr);

	_allocCount.fetch_sub(1);
}

MemoryHeader* MemoryPool::Pop()
{
	MemoryHeader* header = nullptr;

	{
		WRITE_LOCK;
		// Pool에 여분이 있는지?
		if (_queue.empty() == false)
		{
			// 있으면 하나 꺼내온다
			header = _queue.front();
			_queue.pop();
		}
	}

	// 없으면 새로 만들다
	if (header == nullptr)
	{
		header = reinterpret_cast<MemoryHeader*>(::malloc(_allocSize));
	}
	else
	{
		ASSERT_CRASH(header->allocSize == 0);
	}

	_allocCount.fetch_add(1);

	return header;
}

 

 

 

 

 

MemoryHeader : 사용할 메모리 앞에 붙는 헤더로 부가적인 정보를 붙일때 사용 할 수 있다

struct MemoryHeader
{
	// [MemoryHeader][Data]
	MemoryHeader(int32 size) : allocSize(size) { }

	static void* AttachHeader(MemoryHeader* header, int32 size)
	{
		new(header)MemoryHeader(size); // placement new
		return reinterpret_cast<void*>(++header);
	}

	static MemoryHeader* DetachHeader(void* ptr)
	{
		MemoryHeader* header = reinterpret_cast<MemoryHeader*>(ptr) - 1;
		return header;
	}

	int32 allocSize;
	// TODO : 필요한 추가 정보
};
반응형

먼저 선행해서 보면 좋은 부분 memory pool 을 하기 위한 new 오버로딩

https://3dmpengines.tistory.com/1399

 

placement new , 생성자를 원하는 시점에 호출하기

operator new, operator delete 연산자 C++에서 동적 객체를 생성하기 위해 일반적으로 new 연산자를 사용하는데, new 연산자는 2가지의 동작단계를 지니고 있습니다. 요청한 타입의 객체를 담을수 있는 크

3dmpengines.tistory.com

 

 

메모리가 오염되어 잘못 건드릴때나 엉뚱한 곳의 메모리를 건드려 나중에(몇시간, 또는 며칠) 시간이 한참 지나서 크래쉬가 나는현상을 격을 수 있는데 그럴때의 대비책이다

 

 

 


 

 

 

++ 기본 new와 delete, malloc, free를 쓰면 해제된 메모리에 접근했는데도 crash가 발생하지 않는 문제가 있다.

엉뚱한 메모리를 오염시키고 결국 오류가 나는 곳은 문제의 원인과 멀리 떨어져있는 경우가 많아서 디버그 하기도 쉽지않다.

// Case 1
Knight* k1 = new Knight();
k1->_hp = 200;
k1->_mp = 50;
delete k1;
k1->_hp = 100; // User-After-Free

// Case 2
vector<int32> v{ 1,2,3,4,5 };
for (int32 i = 0; i < 5; i++)
{
	int32 value = v[i];
	// TODO
	if (value == 3)
	{
		v.clear(); // 이후부터 유효하지 않은데, 또 접근.. 메모리 오염
	}
}

// Case 3
Player* p = new Player();
Knight* k = static_cast<Knight*>(p); // casting 오류 ,, dynamic_cast를 사용하고 nullptr를 체크하면 되지만 느림.
k->_hp = 100; // 메모리 오염

 

OS는 가상메모리를 사용한다. 프로그램에서 메모리를 요구하면 실제 메모리를 할당한 후, 물리 주소를 반환하는 게 아니라 물리주소에 매핑된 가상메모리를 반환한다. 따라서 다른 프로그램에서 같은 메모리를 주소를 참조한다고 하여도 실제 물리 주소는 다르다. 

 

운영체제는 메모리 할당 최소단위가 있어 아무리 잘게 메모리 할당을 요청해도 최소단위 만큼 메모리를 할당한다. 최소단위는 페이자 불리는 동일한 크기의 여러 구역으로 나뉜다. 이러한 정책을 페이징이라고 한다. 각 페이지마다 보안정책을 설정할 수 있다.

 

4KB = 4 * 1024 : 16진수로 0x1000 과 같음

4KB = 65536Byte = 64 * 1024 : 16 진수로 0x10000 과 같음

 

아래 함수에서 virtual alloc 으로 메모리를 잡을때 페이지 단위 크기로 잡는다(allocSize 을 4로 해놨어도)

즉 메모리를 페이지 단위로 관리해준다

::VirtualAlloc(NULL, allocSize, MEM_RESERVE | MEM_COMMIT, PAGE_READWRITE);

 

 

2GB 면 64KB 로 페이지 잡는다고 할떄 32,768 개수 만큼을 잡을 수 있다

32,768 * 64 = 2,097,152 KB = 2GB

 

 

// 2GB [                      ]
// 2GB [ooxxooxxoxooooxoxoooos]
// 페이징 정책을 쓴다 4Kb[r][w][rw][][][][][] 페이지에 보안정책을 설정할 수 있다.

SYSTEM_INFO info;
::GetSystemInfo(&info);

info.dwPageSize; // 4KB (0x1000)
info.dwAllocationGranularity; // 64KB (0x10000) 메모리 할당할 때 이 숫자의 배수로 할당한다.

즉 페이지를 64kb 배수로 할당 하겠다는 것이고 메모리를 이 페이지 단위로 관리된다

dwAllocationGranularity = 64KB

 

그리고 이 페이지에 실제 메모리에 대한 권한 w,r, rw 등의 권한을 주어 보안 관리를 할 수 있고

dwAllocationGranularity 의 목적은 너무 세분화 되면 효율이 떨어지기 때문에 이것을 막기위해 64KB 로 설정된다

(이것은 운영체제마다 달라질 수 있겠지만)

 

 

운영체제에 직접 메모리 할당, 반환을 요청하는 API가 있는데 이를 이용하면 잘못된 주소를 참조할 때 바로 crash가 발생한다.

 

윈도우에서 할당 요청 함수는 VirtualAlloc, 반환 함수는 VirtualFree이다. 이를 이용해 StompAllocator를 만들어보자.

Allocator.h, Allocator.cpp

/*---------------------
	StompAllocator
---------------------*/

class StompAllocator
{
	enum { PAGE_SIZE = 0x1000 };

public:
	static void* Alloc(int32 size);
	static void Release(void* ptr);
};
/*---------------------
	StompAllocator
---------------------*/

void* StompAllocator::Alloc(int32 size)
{
	const int64 pageCount = (size + PAGE_SIZE - 1) / PAGE_SIZE;
	const int64 dataOffset = pageCount * PAGE_SIZE - size;

	void* baseAddress = ::VirtualAlloc(NULL, pageCount * PAGE_SIZE, MEM_RESERVE | MEM_COMMIT, PAGE_READWRITE);
	return static_cast<void*>(static_cast<int8*>(baseAddress) + dataOffset);
}

void StompAllocator::Release(void* ptr)
{
	const int64 address = reinterpret_cast<int64>(ptr);
	const int64 baseAddress = address - (address % PAGE_SIZE);
	::VirtualFree(reinterpret_cast<void*>(baseAddress), 0, MEM_RELEASE);
}

pageCount는 가장 작은 PAGE_SIZE의 배수를 찾는다. pageCount * PAGE_SIZE가 요청할 메모리 크기이다.

dataOffset은 메모리 overflow를 찾기 위해 필요하다. offset없이 할당하면

[[       ]                 ] 와 같이 할당된다. 실제 사용메모리 부분을 할당 메모리 끝쪽에 위치시키면

overflow가 났을 때 잡아준다. [                [       ]]

 

메모리 반환하기 위해서 다시 원래 할당 주소를 찾아야 한다. address % PAGE_SIZE으로 offset을 구하고 그만큼 앞으로 이동한 포인터를 해제한다. 

 

메모리 alloc release 매크로를 바꾸면 모든 코드에 적용되어 편리하게 사용할 수 있다.

CoreMacro.h

	/*----------------------
			Memory
	----------------------*/
#ifdef _DEBUG
	#define x_alloc(size) StompAllocator::Alloc(size)
	#define x_release(ptr) StompAllocator::Release(ptr)
#else
	#define x_alloc(size) BaseAllocator::Alloc(size)
	#define x_release(ptr) BaseAllocator::Release(ptr)
#endif
// overflow 문제 잡기
//[                         [    ]]
Knight* knight = (Knight*)xnew<Player>();

knight->_hp = 100;
xdelete(knight);

User-After-Free 뿐 아니라 메모리 overflow 상황도 대응가능하다.

 

 

 

 


Custom해서 만든 allocator를 stl에서도 쓸 수 있다. 약간의 코드 작업이 필요하다. 먼저 stl에서 기대하는 API를 구현한 allocator가 필요하다.

Allocator.h

/*---------------------
	STL Allocator
---------------------*/

template<typename T>
class StlAllocator
{
public:
	using value_type = T;

	StlAllocator() {}

	template<typename Other>
	StlAllocator(const StlAllocator<Other>&) {}

	T* allocate(size_t count)
	{
		const int32 size = static_cast<int32>(count * sizeof(T));
		return static_cast<T*>( x_alloc (size));
	}

	void deallocate(T* ptr, size_t count)
	{
		x_release (ptr);
	}
};

 

allocate, deallocate에서 count는 메모리 사이즈가 아니라 할당할 메모리 개수이다.

 

StlAllocator를 사용한 stl container들을 재정의한다.

CorePch.h

#pragma once
...
#include "Container.h"
...

Container.h

#pragma once

#include "Types.h"
#include "Allocator.h"

#include <vector>
#include <list>
#include <queue>
#include <stack>
#include <map>
#include <set>
#include <unordered_map>
#include <unordered_set>
using namespace std;

template<typename Type>
using Vector = vector<Type, StlAllocator<Type>>;

template<typename Type>
using List = list<Type, StlAllocator<Type>>;

template<typename Key, typename Type, typename Pred = less<Key>>
using Map = map<Key, Type, Pred, StlAllocator<pair<const Key, Type>>>;

template<typename Key, typename Pred = less<Key>>
using Set = set<Key, Pred, StlAllocator<Key>>;

template<typename Type>
using Deque = deque<Type, StlAllocator<Type>>;

template<typename Type, typename Container = Deque<Type>>
using Queue = queue<Type, Container>;

template<typename Type, typename Container = Deque<Type>>
using Stack = stack<Type, Container>;

template<typename Type, typename Container = Vector<Type>, typename Pred = less<typename Container::value_type>>
using PriorityQueue = priority_queue<Type, Container, Pred>;

using String = basic_string<char, char_traits<char>, StlAllocator<char>>;

using WString = basic_string<wchar_t, char_traits<wchar_t>, StlAllocator<wchar_t>>;

template<typename Key, typename Type, typename Hasher = hash<Key>, typename KeyEq = equal_to<Key>>
using HashMap = unordered_map<Key, Type, Hasher, KeyEq, StlAllocator<pair<const Key, Type>>>;

template<typename Key, typename Hasher = hash<Key>, typename KeyEq = equal_to<Key>>
using HashSet = unordered_set<Key, Hasher, KeyEq, StlAllocator<Key>>;

 

아래와 같이 사용하면 된다.

int main()
{
	Vector<Knight> v;
	HashMap<int32, Knight> map;
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

ref : https://dockdocklife.tistory.com/entry/STL-allocator?category=985040 

ref : https://dockdocklife.tistory.com/entry/Stomp-allocator

반응형

본 내용은 프라우드넷에 국한된 내용이 아닌 일반적인 프로그래밍의 이야기입니다. 어쨌건 프라우드넷을 개발하다가 튀어나온 이슈인지라 여기에 적어봅니다.

 

프라우드넷 오래전 버전 내부에서는 shared_ptr을 거의 쓰지를 않았습니다. 내부적으로는 처리속도를 우선하기 위해 shared_ptr이나 여타 smart ptr 객체를 안 썼습니다. 그 댓가로, 직접 포인터를 관리하는 방식이었죠. 네. 수동 new delete 말입니다.

하지만 내부 코드를 유지보수하는 것이 힘이 들대로 들어서, shared_ptr을 중간에 도입을 했었습니다.

 

그런데 이렇게 하고 났더니, 스트레스 테스트 결과 성능이 평소보다 다섯배 이상으로 하락하는 것이 발견됐었습니다. 결국 해결했습니다... 어떻게 해결했는지 소개하겠습니다.

문제의 원인은 shared_ptr의 복사 연산자였습니다.

 

코드로 예를 들게요.

 

shared_ptr<X> a = xxx;

shared_ptr<X> b = a; // [1]

 

여기서 [1]의 속도가 어마무시하게 느렸다는거죠.

이 코드가 항상 느린 것은 아닙니다. 일반적인 경우 느리게 작동 하지 않습니다. 둘 이상의 스레드가 [1]에서 경합할 때만 엄청나게 느렸죠.

 

프라우드넷은 리눅스, 윈도 모두 멀티코어 활용을 전제로 하고 있습니다. 포인터가 가리키는 객체에 대한 액세스 즉 off the pointer는 격리를 시키지만 정작 이 객체들을 가리키는 포인터 변수에 자체에 대해서는 격리되어 있지 않는 경우들이 많습니다. (멀티스레드 플밍하면 당연한 얘기죠.)

 

결국 이 문제는 두가지로 해결했습니다.

  1. move semantics.
  2. 파라메터 전달을 할 때 byval 대신 const byref를 하기.

 

이 두가지를 적용하고 나니 원래 속도에 근접하게 되돌아왔습니다. 물론 shared_ptr을 쓰면서 발생하는 약간의 성능 저하는 그냥 감수하기로 했고요. shared_ptr을 도입한 후부터 유지보수하기가 훨씬 좋아졌는데 어떻게 버리겠어요. ㅎㅎ

 

문제의 원인은 shared_ptr의 복사 연산자 내부에서 atomic operation을 하는 부분이 심각하게 느리다는데 있었습니다. 네. InterlockedIncrement나 gcc builtin atomic operations 자체가 엄청나게 느렸다는데 있었죠.

 

shared_ptr의 복사 연산자 안에서는 reference count를 1 증가시킵니다. 그리고 shared_ptr 변수가 사라질 때는 1 감소하고요. 이 증가와 감소 연산 말고도 다른 뭔가를 하는 것들이 있습니다.

shared_ptr 안에서는 이 증가와 감소를 atomic operation으로 하고 있습니다. 기계어로 풀어보면

lock xchg xxx xxx

딱 한줄입니다.

 

이 기계어 명령 한줄은 경합이 없으면 non atomic보다 2배 느린 수준에서 끝납니다. 그러나  경합이 있으면 이 기계어 한줄은 40배 정도 느려집니다.

 

그런데 byref나 move semantics를 쓰면 atomic op 자체를 건너뜁니다. 따라서 저 극악의 성능 저하가 예방되는거죠.

 

mutex나 critical section을 lock 하는 것보다 아토믹 연산이 훨씬 빠르다고 알려져 있습니다. 그러나 아토믹 연산도 결국 경합 앞에서는 엄청나게 느립니다. 다만 소프트웨어로 경합을 중재하는 것보다 훨씬 빠를 뿐 경합이 전혀 없을 때 보다는 훨씬 느리게 작동을 합니다. 안타깝게도 이 문제는 하드웨어 엔지니어 선에서 해결해야 합니다.

 

글만 쭈욱 적었는데, 예시 코드로 보여드립니다.

 

void func1()

{

shared_ptr<X> x;

func2(x);

func3(x);

}

 

void func2(shared_ptr<X> a) // byval

{

a->xxx();

}

 

void func3(const shared_ptr<X>& a) // const byref

{

a->xxx();

}

 

func2와 func3의 속도 차이는 평소에는 별로 없습니다. func2와 func3을 여러 스레드에서 병렬 실행을 해도 별 차이가 안납니다. 그러나 둘 이상의 스레드 안에서 동일한 shared_ptr 객체에 대해 func2를 동시에 실행할때와 func3를 동시에 실행하게 되면, func2 쪽이 훨씬 느리게 작동합니다. 이게 shared_ptr에 대한 byval과 byref의 차이입니다.

 

 

ref : http://lab.gamecodi.com/board/zboard.php?id=GAMECODILAB_Proudnet_Lec&page=1&sn1=&divpage=1&sn=off&ss=on&sc=on&select_arrange=headnum&desc=asc&no=14

반응형

 

std::shared_ptr은 reference counter resource로 구성되어 있습니다.

reference counter 자체는 스레드로부터 '안전' 합니다.

그러나 resource의 접근은 안전하지 않습니다. 

즉, reference counter 수정 원자적(atomic)작업이며, resource가 정확히 한 번 삭제된다는 보장이 있습니다.

 


 

 

 

#include "shared_ptr.h"
#include "shared_ptr.hpp"

using namespace std;

shared_ptr<int> global_ptr;
void thread1()
{
	while (true)
	{
		shared_ptr<int> temp;
		temp = global_ptr;
		if (1 != (*temp)) cout << "Error \n";
	}
}

int main(void)
{
	global_ptr = shared_ptr<int>(new int(1));
	thread th(thread1);

	while (true)
	{
		shared_ptr<int> _newP = shared_ptr<int>(new int(1));
		global_ptr = _newP;
	}

	system("pause");
	return 0;
}

 

 

 

왜 이런 현상이 발생하는지 알아보면,

thread1 에서 

shared_ptr<int> temp; 
temp = global_ptr;


바로 이구간, temp에 글로벌 shared_ptr을 넣는 부분에서 컨텍스트 스위칭이 일어나며 mainThread로 옮겨져 

eb_shared_ptr<int> _newP = eb_shared_ptr<int>(new int(1)); 
global_ptr = _newP;



이 코드가 실행되었다면, thread1에 가지고있는 temp는 값이 사라지게 됩니다. 
따라서 Error텍스트가 노출되고, 스코프를 벗어나며 해제된 메모리에 접근하여 _refCount를 가져와서 크래시까지 발생하게 되는 것입니다. 

 


이 방법을 해결하기 위해서는 두가지 방법이 있습니다. 


1. atomic_load를 사용한다. 

atomic_load를 사용하면 깔끔한 구현이 만들어집니다. 

shared_ptr의 생성과 소멸이 잦은 경우가 아니라면 성능에 큰 지장이 있지도 않을 것 같습니다. 
atomic_load를 살펴보면 내부에 전역 spin lock이 구현되어 있습니다. 
atomic_load을 사용하는 객체는 타입이 다른 shraed_ptr인 경우에도 모두 단 한개의 lock만을 이용하게 될 것입니다. 생성과 소멸이 매우 잦은 경우라면 문제가 생길수도 있다고 생각합니다. 


단점이 있다면 atomic_load를 강제할 수 있도록 만들어 놓지 않는다면 빼먹고 그냥 make_shared를 사용하여 로드를 할 수도 있겠죠.. 

 

 

 

2. c++20에 있는 atomic_shared_ptr을 사용한다. 

 

c++20에서는 atomic_shared_ptr을 사용할 수 있습니다.
이 atomic_shared_ptr은 LockFree로 구현되어 있어 성능상 매우 좋을것으로 생각됩니다. 

 

ref : https://openmynotepad.tistory.com/90

ref : https://agh2o.tistory.com/45

반응형

1과 자기 자신 이외의 다른 양의 정수로 나누어지지 않는 1보다 큰 정수라고 하였습니다.

소수는 양의 약수로 1과 자신만을 가진 자연수 = 2개

 

 

 

주의 : 로직에는 소수의 누적 개수를 제대로 구했는지에 대한 확인 코드가 같이 들어있습니다

 

필자의 pc 에서 hardware_concurrency 값은24

 

vector<int> retV;

vector<vector<int>> range;

bool isPrime(int num)
{
	if (num == 0 || num == 1)
	{
		return false;
	}
	else if ( num == 2)
	{
		return true;
	}

	//int maxNum=sqrtl(num);
	for (int i=2; i*i <= num;++i)
	{
		if (num % i == 0)
		{
			return false;
		}
	}
	return true;
}



void primeCount(int start, int end, int i)
{
	int count = 0;
	for (int i=start;i<end;++i)
	{
		count +=isPrime(i);
	}

	retV[i] = count;
}


int verifyPrimeCount(int start, int end)
{
	int count = 0;

	for (int i = start; i < end; ++i)
	{
		if (isPrime(i))
		{
			count += 1;
		}
	}
	return count;
}




int main()
{


	//int count = 100'0000;	//=>	78498
	int count = 1'0000;		//=> 1229
	//int count = 1000;			//168
	//int count = 24 * 2 + 1;			
	//int count = 3;			
	//필자의 pc 코어는 
	//int count = 10;			//=> 4

	count += 1;		//해당 숫자 까지 임으로
	

	int jj = 0;
	int total = 0;
	int verifyCount = 0;

	retV.reserve(thread::hardware_concurrency());

	while (jj <= count)
	{

		int32 threadCount = thread::hardware_concurrency();
		verifyCount = verifyPrimeCount(0, count);

		//스레드가 구할 개수보다 더 많으면 분할 수치에 오류가 생김으로
		//코어가 많아봐야 24*2 정도 => 가정짐 기준
		//또는 hardware_concurrency 이 0 을 리턴할 수도 있다
		if (threadCount * 2 >= count)
		{
			threadCount = 1;
		}

		//int32 threadCount = 3;
// 		range.resize(threadCount + 1);
// 		for (auto& elem : range)
// 		{
// 			elem.resize(2);
// 		}

		retV.resize(threadCount + 1);

		vector<thread> threads;

		int divi = count / threadCount;
		for (int i = 0; i <= threadCount; ++i)
		{
			threads.push_back(thread([i, count, divi]()
			{
				//확인을 위한 분할 값
				//range[i][0] = divi * i;
				//range[i][1] = min(count, divi * (i + 1));

				primeCount(divi * i, min(count, divi * (i + 1)), i);
			}));
		}


		for (auto& th : threads)
		{
			th.join();
		}

		total = 0;
		total = std::accumulate(retV.begin(), retV.end(), 0);

		//총 소수개수
		//cout << total << std::endl;

		if (verifyCount != total)
		{
			int different = 0;
			different = 1;
		}

		++jj;
	}
    return 0;
}

이거보다 더 개선할 방향들은 있을 겁니다

 

반응형

DFS Deadlock Detection 

 

데드락을 DFS 를 통해서 미리 판별할수 있다는 컨셉으로 접근하는 방식

 

 

 

 

 

인접행렬 3,4 : 3에서 4로 가는 경우가 있으니 3번째줄에서 오른쪽으로 4번이 1인걸 알수 있다

 

 

 

그래프에서 노드마다 각각 lock 을 갖고 있다고 가정한다

 

이때 노드 방문시 역방향이 일어나면 데드락이 발생 하는데

예를 들어 0번 노드에서 1번으로 간다음 아직 그래프 탐색이 끝나지 않은 상태에서 1번에서

다시 0번으로 가는 경우를 말한다

 

그런데 만약 노란색 선이 있었다 가정하면

0과 3이 역방향으로 성립되어 사이클이 형성 되지만

3->0->1->3 번 순으로 사이클이 도는것을 알 수 있고 이럴때도 데드락이 발생한다고 할수 있다

무한히 돌아가는 상황이 발생

그래서 데드락을 DFS 를 통해서 런타임시 판단하는 방식인데 생각보다 빠르게 판별할수 있다

 

 

먼저 그래프는 3가지의 간성이 있을수 있는데

 

1. 순방향 간선 : 최초 0번을 방문 한다음 1번노드나 3번 노드를 방문 할떄와 같은 상황을 말한다

2. 교차 간선 : 3번 노드부터 방문을 시작할때 다음처럼 방문이 끝나게 된다

이때 5번 노드부터 탐색을 시작 할대 이미 3과 4가 하나의 그룹으로 탐색이 완료된 상태에서 5번이 완료된 곳으로 진입하게 될때 이때의 5에서 4로 가는 간석을 교차간선이라 한다

 

3. 역방향 간선 : 노드탐색이 완료된 위 노란색박스 상태가 아닌 상태에서 즉 아직 탐색이 끝나지 않은 상태에서 역으로 가는 상태가 나타날때를 역방향 간선이라고 한다 예를 들어 0번 노드부터 시작을 하면 0번 노드에 방문을 표시를 한다음
1과 3 노드에 방문하게 되는데 1번 노드에서 부터 다시 재귀적으로 DFS 를 돌게 될것이고 1번 노드에서 0으로 (빨간색 화살표)로 방문을 하게되어 아직 완료 되지 않은 상태에서 역방향 사이클이 형성 되는것을 알 수 있다

 

사이클이 일어나면 데드락상황이 된다

 

각 노드들에는 Lock 이 있다 가정하고 이것을 감지하는 방식은 Lock 을 write 할때와 풀어줄때는 생성자-파괴자에 lock 감지와 풀어주는 코드를 넣어주고 이것을 활용하여 순환이 되는지를 체크해보면 된다

 

 

 

 

int main()
{
	GThreadManager->Launch([=]
		{
			while (true)
			{
				cout << "PlayerThenAccount" << endl;
				GPlayerManager.PlayerThenAccount();
				this_thread::sleep_for(100ms);
			}
		});

	GThreadManager->Launch([=]
		{
			while (true)
			{
				cout << "AccountThenPlayer" << endl;
				GAccountManager.AccountThenPlayer();
				this_thread::sleep_for(100ms);
			}
		});

	GThreadManager->Join();
}

 

아래와 같이 Daed Lock 상황이 발생한다 가정해보자

각각 스레드가 다음 처럼 돌아간다 

 


B 에서 A-2 로 진입시 A 의 lock 이 풀릴때까지 A-2 진입 대기
바로 지금 상황에서 A 의 lock이 풀리는지 A 에서 흐름을 따라가보면

A 에서 B-2 로 진입시 B 의 lock 이 풀릴때까지 B-2 진입 대기

하게 된다 B가 이전에 소유되었음으로 이래서 서로 풀지 못하는 Dead Lock 상황이 된다

 

=> 추적을 위해 dead lock 상황을 만든 상태이다

그리고 write 또는 read 시작 하는 순간 lock을 하는 순간 writeLockGuard 에서 typeid 로 현재 클래스의 이름을 넘겨주는데 이것이 dead lock 이 발생하는 곳을 좀더 알기 쉽게 발견하기 위한 이름이 된다

 

 

lock 을 잡는 순서를 따라 가다보면 아래와 같은 그림을 그릴 수 있다

이렇게 A 의 lock 과 B 의 lock 간의 싸이클이 형성 된다는 것을 알 수 있다

 

 

 

DFS Deadlock Detection 특징

  1. 이런 사이클이 일어나는 현상을 감지하여 Dead lock 일 발생하고 있음을 DFS 로 추적하여 알려주면 된다
  2. 거의 실행 직후 바로 감지 할 수 있다는 특징이 있다
  3. 보통 Debug 모드에서만 감지처리를 하고 shipping 모드에선 제거 하기 때문에 검출을 위한 성능 저하는 없다

 

 

 

코드는 DFS 방문 특징을 먼저 이해하는 것이 수월합니다

 

가장먼저 DFS 를 실행 하게 되는 발생 원인은

 

_history : prevId 이 전노드 기준 현재노드까지 간 내역을 history 에 담고 있게 되는 것인데 만약

만약 새로운 간선이 추가 된다면 

역순환이 있는지(DeadLock) 확인하기 위해 DFS 실행 하는 것입니다

 

_history 에는 이전 prevId 기준에서 현재  노드(lockNode_Id )가 방문 가능한 노드 중에 방문하지 못했던

노드라면 _history 에 lockNode_Id 추가하고 이렇게 되면 새로운 간선이 추가 된거이기 떄문에

DFS 실행 하여 역순환 확인을 하는 것입니다

 

CheckCycle_startDFS : DFS 시작 함수

 

 

 

알고리즘 특징

( _lockHistory(인접행렬) , _nameToId, _idToName) 는 날리지 않고 계속 보유한다

 

CheckCycle_startDFS  순환 체크 검사를 시작하면 초기화 되는 변수들은

CheckCycle_startDFS 함수 실행시 초기화 됩니다

_discoveredCount = 0;

_vecDiscoveredOrder { -1, -1 }  : 방문할때 here 인덱스에는 순서 값이 들어감

_vecParent {-1, -1}

_vecFinished {0, 0}

 

 

 

아래 그림은 기억해야 될 변수 3개와 사이클 체크시 초기화 되는 변수들을 나열하여 변화를 체크한 것입니다

 

변화 과정

 

변화 과정을 추적하다보면 문제가 발생하는 상황을 알수 있습니다

 

 

 

전체적인 흐름은

 

  1. 기존에 발견되지 않은 간선히스토리가 생기게 된다면(_lockHistory)
    DFS 를 처음노드부터 끝 노드까지 돌면서 역 순환(데드락)이 발생하는지 확인한다
  2. 역순환이 있는지 확인하기 위해 DFS 실행 CheckCycle_startDFS(); //Dfs 시작
    이때 첫 노드부터 끝 노드까지 모두다 조사한다 (비어있는 5번 노드 같은게 있을수 있기 때문에)
    1. lockCount(lock을 잡는 수=노드 수) 만큼, 즉 노드 개수 만큼 Dfs 를 재귀적으로 돌린다
    2. 상황에 따라 5번과 같은 노드가 있을수 있기 때문에(대부분 한번 DFS 가 실행 되고 난 이후엔 연결된 대부분의 노드는 한번씩 방문했다 가정한다면 방문했던걸 다시 또 방문하진 않게 된다고 이해하면 수월합니다)

 

_vecDiscoveredOrder 변수 같은 경우는 CheckCycle_startDFS 함수에서 미리 한번 초기화 한다음 dfs 로 들어가는 변수로 방문했던 적이 있었던 노드인지를 판별하며 만약 방문했었다면 dfs 탐색을 중지하는 역할을 합니다

dfs 특성상 한쪽으로 쭉 내려가는 특징이 있기 때문에 한쪽으로 쭉 방문하면서 발견 순서를 저장하게 되고

만약 탐색이 아직 끝나지 않은 노드를 방문했을때 방문순서 전과 후의 상황을 보고 역방향인지 판별합니다

 

// here가 there보다 먼저 발견되었다면, there는 here의 후손이다. (순방향 간선)
if (_vecDiscoveredOrder[here] < _vecDiscoveredOrder[there])
{

이때는 순방향 간선

continue;

}

 

// 순방향이 아니고, Dfs(there)가 아직 종료하지 않았다면, there는 here의 선조이다. (역방향 간선)
if (_vecFinished[there] == false)
{

이곳에 들어오면 역방향 간선이다

메세지 발생!

}

 

 

class DeadLockProfiler
{
public:

	//name : lock 이름
	void PushLock(const char* name);
	void PopLock(const char* name);

	//사이클이 일어나는지 판별
	void CheckCycle_startDFS();

private:
	void Dfs(int32 index);

private:

	//name 을 id로 매핑
	unordered_map<const char*, int32>	_nameToId;

	//id를 name 으로 매핑
	unordered_map<int32, const char*>	_idToName;

	//lock 이 실행되는것을 추적하기 위한 것
	stack<int32>						_lockStack;

	//n번 노드에서 n+1 번노드로 가는  간선으로 이동하는 내역
	//어떤 락이 어떤 락을 잡았는지에 대한 간선 히스토리
	map<int32, set<int32>>				_lockHistory;

	Mutex _lock;

	//사이클을 판별하기 위한 vector로 checkCycle() 에서 사용되는 벡터이다
	//방문 순서를 추적 : 노드가 발견되는 순서를 기록하는 배열
	vector<int32>	_vecDiscoveredOrder; 

	//노드가 발견된 순서를 위한 카운팅
	int32			_discoveredCount = 0; 

	//dfs(index)가 종료 되었는지 여부
	vector<bool>	_vecFinished; 

	//내가 발견된 부모가 누군지 추적을 위한 벡터
	vector<int32>	_vecParent;
};

 

void DeadLockProfiler::PushLock(const char* name)
{
	//아이디를 찾거나 발급한다
	//검출 목적이라 그냥 mutex를 사용한다, debug 시 실행되지 않을 구문이기 때문에
	LockGuard guard(_lock);		

	//현재 노드의 id 를 찾거나 발급한다.
	int32 lockNode_Id = 0;

	auto findIt = _nameToId.find(name);
	if (findIt == _nameToId.end())
	{
		//새로운 아이디를 할당한다
		lockNode_Id = static_cast<int32>(_nameToId.size());
		_nameToId[name] = lockNode_Id;	//이름을 아이디와 매핑
		_idToName[lockNode_Id] = name;	//아이디를 이름과 매핑
	}
	else
	{
		lockNode_Id = findIt->second;
	}

	// 잡고 있는 락이 있었다면
	if (_lockStack.empty() == false)
	{

		// 기존에 발견되지 않은 간선히스토리가 생기게 된다면(_lockHistory)
		//DFS 를 처음노드부터 끝 노드까지 돌면서 역 순환(데드락)이 발생하는지 확인한다

		const int32 prevId = _lockStack.top();
		if (lockNode_Id != prevId)
		{
			set<int32>& history = _lockHistory[prevId];
			if (history.find(lockNode_Id) == history.end())
			{
				//history : prevId 이 전노드 기준 현재노드까지 간 내역을 history 에 담고 있게 되는 것
				//prevId 에서 방문 가능한 노드 중에 방문하지 못했던 노드라면 현재 history 에 lockNode_Id 추가 
				//새로운 간선이 추가 된것!!
				history.insert(lockNode_Id);
				

				//그럼으로 역순환이 있는지 확인하기 위해 DFS 실행
				CheckCycle_startDFS(); //Dfs 시작
			}
		}
	}


	//현재(lockNode_Id) 와 이전 node id 를 비교하기위해 한타이밍 늦게 추가한다
	_lockStack.push(lockNode_Id);
}


//Dfs 시작 하면서 역순환이 있는지 확인한다
//첫 노드부터 끝 노드까지 모두다 조사한다
void DeadLockProfiler::CheckCycle_startDFS()
{
	const int32 lockCount = static_cast<int32>(_nameToId.size());

	//vector 인  _discoveredOrder 를 lockCount 개수 만큼 -1로 초기화
	_vecDiscoveredOrder = vector<int32>(lockCount, -1);
	_discoveredCount = 0;
	_vecFinished = vector<bool>(lockCount, false);
	_vecParent = vector<int32>(lockCount, -1);


	//lockCount 만큼, 즉 노드 개수 만큼 Dfs 를 재귀적으로 돌린다
	//혹시나 5번과 같은 노드가 있을까봐, 대부분 한번 DFS 가 실행 되고 난 이후엔 대부분의 노드는 한번씩 방문했기 마련이다(꼭 그런건 아니지만 이해를 위해)
	for (int32 lockId = 0; lockId < lockCount; lockId++)
	{
		Dfs(lockId);
	}
		

	// 연산이 끝났으면 정리한다.
	_vecDiscoveredOrder.clear();
	_vecFinished.clear();
	_vecParent.clear();
}

void DeadLockProfiler::Dfs(int32 here)
{
	//"아직 탐색이 끝나지 않은 상태"에서 발견했었던 노드라면 발견했던 노드들의 자식들은 모두 봤음으로 해당 노드는 중지시킨다
	//현재 노드가 발견 됐었던 노드라면 dfs 중지시킨다
	if (_vecDiscoveredOrder[here] != -1)
	{
		//방문했던 적이 있던 노드라면 역추적 검사 하지 않는다
		return;
	}
		
	//아직 탐색이 끝나지 않은 상태에서 발견되지 않았던 노드라면 _vecDiscoveredOrder에 현재 노드에 대한 발견 순서를 기록한다
	_vecDiscoveredOrder[here] = _discoveredCount++;

	//현재 노드 기준으로 모든 인접한 정점을 순회한다.
	auto findIt = _lockHistory.find(here);
	if (findIt == _lockHistory.end())
	{
		//현재 노드기준으로 방문했던적 이력이 아무것도 없다면 현재 노드를 처음 방문한것임으로
		//현재 노드를  방문 완료
		_vecFinished[here] = true;
		return;
	}


	//방문했던 이력이 있다면 사이클링 조사 시작
	set<int32>& nextSet = findIt->second;
	for (int32 there : nextSet)
	{
		// 아직 방문한 적이 없다면 방문한다.
		if (_vecDiscoveredOrder[there] == -1)
		{
			_vecParent[there] = here;
			Dfs(there);	//노드에 연결된 노드들 돌아가기(자식 간선들)
			continue;
		}

		//노드를 방문했던 적이 있는 노드라면 서로 순서를 따진다

		// here가 there보다 먼저 발견되었다면, there는 here의 후손이다. (순방향 간선)
		if (_vecDiscoveredOrder[here] < _vecDiscoveredOrder[there])
		{
			continue;
		}

		// 순방향이 아니고, Dfs(there)가 아직 종료하지 않았다면, there는 here의 선조이다. (역방향 간선)
		if (_vecFinished[there] == false)
		{
			printf("%s -> %s\n", _idToName[here], _idToName[there]);

			int32 now = here;
			while (true)
			{
				printf("%s -> %s\n", _idToName[_vecParent[now]], _idToName[now]);
				now = _vecParent[now];
				if (now == there)
					break;
			}

			CRASH("DEADLOCK_DETECTED");
		}
	}

	_vecFinished[here] = true;
}



void DeadLockProfiler::PopLock(const char* name)
{
	LockGuard guard(_lock);

	if (_lockStack.empty())
		CRASH("MULTIPLE_UNLOCK");

	int32 lockId = _nameToId[name];
	if (_lockStack.top() != lockId)
		CRASH("INVALID_UNLOCK");

	_lockStack.pop();
}

 

 

lock 코드에 아래 처럼 코드를 추가하면 됩니다

void Lock::WriteLock(const char* name)
{
#if _DEBUG
	GDeadLockProfiler->PushLock(name);
#endif



void Lock::WriteUnlock(const char* name)
{
#if _DEBUG
	GDeadLockProfiler->PopLock(name);
#endif


void Lock::ReadLock(const char* name)
{
#if _DEBUG
	GDeadLockProfiler->PushLock(name);
#endif



void Lock::ReadUnlock(const char* name)
{
#if _DEBUG
	GDeadLockProfiler->PopLock(name);
#endif

 

 

 

 

 

반응형

Reader-Writer lock 은 쓰거나 읽거나 둘중 하나만 동작하게 하여 동작하게 하는 방식인데

 

보통 주로 읽는 용도로 사용되다가 어쩌다 한번 쓰게 되는용도로 사용 될때 즉

보통 읽는 처리가 대부분인 상황에서 어쩌다가 간혹가다가 쓸때가 있다면

 

Reader-Writer lock 을 고려해 볼 수 있고 이것은 상호배타적으로 동작한다

쓸때 읽는게 안된다거나 읽을때 쓰는게 안된다거나..

 

여기서 병합영역을 최소화 하는것이 중요한데

뮤택스를 사용하지 않고 빠르게 처리하기 위해 CAS 를 사용해 non block thread 로 처리를 할 수있다

 

Reader-Writer lock

정리하자면 대부분 읽는 처리를 할건데 어쩌다 한번 쓰는것 때문에 뮤택스를 써서 읽을때마다 병합을 처리해야 하는 비용을 줄인다는것

 

방식은 spinlock 방식으로 사용한다 : 잠시만 데이터를 참조했다가 놔주는 형태가 될것임으로

 

 

 

Lock 컨셉 설명

Lock 를 커스터마이징하여 사용하는데

_lockFlag 를 상위 16 bit는 write 할 스레드 ID 를 저장하는 용도이고 하위 16 bit 는 Read 할때 Read 된 스레드 만큼 카운트를 증가 하는 방식이다 

 

단 write 할때의 상위 16 비트는 한번에 한 쓰레드만이 쓸수 있으며 누적해서 쓸때의 카운트는 _lockFlag  에 누적시키는 것이 아닌 별도의 카운트 변수로 사용한다

 

 

* _lockFlag :  상위를 Write 상태인지, 하위를 Read 상태인지로 보겠다는 것
* 비어 있다면 끼어들어서 Lock 을 차지 할수 있고 상위에 자신의 threadID 를 넣어 획득/소유권을 얻을 수 있다
* [WWWWWWWW][wwwwwwww][RRRRRRRR][rrrrrrrr])
* W : writeFlag (Exclusive Lock Onwer THreadID   //독립적인 스레드 ID TLS 에 아이디 생성
* R : ReadingFlag (Shared Lock Count)

 

 

 

 

 

lock 함수 설명 요약

 

 

WriteLock()

  • 쓸때는 _lockFlag 에 쓰레드 ID 를 쓰는데 현재 _lockFlag를 소유한 스레드만이 쓰기를 할수 있고 다른 쓰레드는 
    현재 쓰레드가 소유권을 포기 하기 전까지 소유권 획득 불가로 소유권은 한 스레드만 갖게 한다
  • 동일한 스레드가 소유권을 갖으려고 할대 별도의 변수 _writeCount를 두어 이 값이 증가하게 처리한다
    즉 동일 스레드가 재귀적으로 획득 하려고 할때 

writeUnlock() :

  • 소유했던 스래드의 일처리가 끝난다음 _writeCount 를 하나씩 감소시키고 
    _writeCount 가 0 이 되면(코드상0이 되기 직전에) _lockFlag   값을 0 으로 밀어준다

 

readLock()

  • 아무도 소유하고 있지 안을때(다시말해 write 를 하고 있지 안을때)는 경합해서 공유 카운트를 올리게 한다
  • 동일한 스레드가 소유하고 있다면 _lockFlag(공유 카운트)   값을 증가시켜준다
    => CAS 를 사용하게 될텐데 이때 비교는 _lockFlag 에 누구도 write 하지 않은 상태에서(소유권이 없는 상태에서)
    readCount(_lockFlag의 하위 16비트) 가 같다면 _lockFlag 값을 1 증가 시키겠다는 얘기

readUnlock() :

  • _lockFlag.fetch_sub(1) 을 하여 하위 16비트에서 1 씩을 빼준다
  • _lockFlag 이 되면 다른 스레드에서 소유권을 획득하여 쓰기를 하거나 또는 읽기스레드가 먼저 읽어 카운트를 올리거나 할 수 있다

 

 

 

 

 

 

 

/*
* _lockFlag :  상위를 Write 상태인지, 하위를 Read 상태인지로 보겠다는 것
* 비어 있다면 끼어들어서 Lock 을 차지 할수 있고 상위에 자신의 threadID 를 넣어 획득/소유권을 얻을 수 있다
* [WWWWWWWW][wwwwwwww][RRRRRRRR][rrrrrrrr])
* W : writeFlag (Exclusive Lock Onwer THreadID   //독립적인 스레드 ID TLS 에 아이디 생성
* R : ReadingFlag (Shared Lock Count)
*/

class Lock
{
public :

	enum : uint32
	{
		EMPTY_FLAG = 0x0000'0000,
		ACQUIRE_TIMEOUT_TICK = 10000,			//최대로 기다려줄 tick 시간
		MAX_SPIN_COUNT = 5000,						//스핀 카운트를 최대 몇번돌것인가
		WRITE_THREAD_MASK = 0xFFFF'0000,			//비트 플래그에서 상위 16 를 뽑아오기 위한 마스크
		READ_COUNT_MASK = 0X0000'FFFF
	};


public :

	void writeLock();
	void writeUnlock();
	void readLock();
	void readUnlock();

private :
	atomic<uint32> _lockFlag = EMPTY_FLAG;
	uint16 _writeCount = 0;
};

 

 

 

 


한가지 정책사항이 있는데 

동일한 스레드 내에서 즉  write 스래드내에서 다시 write 하는 경우에는 ok 즉 
 write -> write 하는 경우 OK
 Write -> read 하는경우도 OK

 

 하지만 동일한 스레드내에서 
 Read -> Write 하는 경우는 막는다

 

현 예제에선 나오지 않지만 구문을 작성하다보면 쓰면서 읽거나 읽으면서 쓰거나를 해야 되는 상황이 발생할수 있는데 이Read->Write 상황은 허용하지 않겠다는것

 

 



코드 설명

  • _lockFlag에 자신의 아이디를 썼을때(소유)  _lockFlag 를 해제 하기 전까지 다중 스레드의 Read 불가
    _lockFlag 가 아무도 소유되지 않은 상위 16바이트가 0 일때 , 즉 아무도 안소유한 상태일때, _lockFlag 다중 스레드들로부터 Read 가 가능
  • 아무도 소유하고 있지 않고 다중스레드로부터 Read 하는 _lockFlag 가 증가하고 있는데, 이떼 write 쓰레드가 들어와서 쓸때는 _lockFlag 에 쓰레드 ID 를 쓰는데 현재 _lockFlag를 소유한 스레드만이 쓰기를 할수 있고 다른 쓰레드는 현재 쓰레드가 소유권을 포기 하기 전까지 소유권 획득 불가 
//쓸때는 _lockFlag 에 쓰레드 ID 를 쓰는데 현재 _lockFlag를 소유한 스레드만이 쓰기를 할수 있고 다른 쓰레드는 
//현재 쓰레드가 소유권을 포기 하기 전까지 소유권 획득 불가
void Lock::writeLock()
{
	//동일한 스레드가 송유하고 있다면 write count 를 증가시켜준다
	const uint32 lockThreadID = (_lockFlag.load() & WRITE_THREAD_MASK) >> 16;
	if (LThreadId == lockThreadID)
	{
		++_writeCount;	
		//이때 _lockFlag 에는 더하지 않는다
		return;
	}
	 
	//아무도 소유 및 공유하고 있지 않다면 경합해서 소유권을 얻는다
	/*
	* 하지칸 코드를 이렇게 작성하면 현재 스레드에서 비교를 하려고 if 문을 방금 통과 했는데 다른 스레드가 치고 나가서 if문 비교뿐만 아니라 _lockFlag 에 값을 써버린다면
	* 현 기존 _lockFlag 그가 망가짐 그래서 아래 처럼 쓰면 안되고 CAS 연산으로 비교 및 할당을 한줄로 처리해야함 원자적 연산으로
	if (_lockFlag == EMPTY_FLAG)
	{
		const uint32 desired = ((LThreadId << 16) & 16 & WRITE_THREAD_MASK);
		_lockFlag = desired;
	}
	*/


	const int64 startTick = ::GetTickCount64();


	

	//desired 변수 상위 16 비트에 현재 스레드 ID를 세팅
	const uint32 desired = ((LThreadId << 16) & WRITE_THREAD_MASK);
	while (true)
	{ 
		for (uint32 spintCount = 0; spintCount < MAX_SPIN_COUNT; ++spintCount)
		{
			//_lockFlag 가 비어 있다면 현재 쓰래드 ID 를  _lockFlag의 상위  WWWWWWWWwwwwwwww 에 write 플래그쪽으로 ID를 넣어준다 => 쓰기가 시작돼다는 기
			//즉 이것은 write 또는 read 둘중 하나만 있는 상태가 된다

			//exptedted 를 while 바깥으로 빼면 안된다 왜냐면이 값은 compare_exchange_strong 가 실행된다음  return 이 false 면 
			//expected 값을 _lockFlag 로 바꾸기 때문에
			uint32 expected = EMPTY_FLAG;	
			//_lockFlag 가 아무도 소유하고 있지 않은 상태에서만 쓰기가 한 쓰레드만 허용 된다는 것
			if (_lockFlag.compare_exchange_strong(expected, desired))
			{
				//이곳은 한 스레드만 들어올 수 있음
				++_writeCount;		//증가 시키는 이유는 재귀적을 write Lock 을 호출하면 증가 시켜주기 위해서
				//이때 _lockFlag 에는 더하지 않는다
				return;
			}
		}

		//소유권 선점 시간이 너무 오래 걸리면 크래쉬를 낸다 => 이것도 문제가 될 수 있기 때문에(dead lock 일 수 있음)
		if (::GetTickCount64() - startTick >= ACQUIRE_TIMEOUT_TICK)
		{
			CRASH("Lock_TimeOUT");
		}

		//실패하면 소유권 선점을 잠시 쉰다
		this_thread::yield();
	}


}

void Lock::writeUnlock()
{
	//ReadLock  다 풀기 전에넌 write unlock 은 불가능하게 할것임
	if ((_lockFlag.load() & READ_COUNT_MASK) != 0)	//write 상태인지 확인 =>wite 상태여야 unlock 하는게 가능
	{
		CRASH("INVALID_writeUnlock");
	}

	const int32 lockCount = --_writeCount;
	if (lockCount == 0)
	{
		_lockFlag.store(EMPTY_FLAG);
	}

}


//TheadID 

//동일한 스레드가 소유하고 있다면 성공하게 할것인데 
//아무도 소유하고 있지 안을때(write 를 하고 있지 안을때)는 경합해서 공유 카운트를 올리게 한다
void Lock::readLock()
{
	//동일한 스레드가 소유하고 있다면 _lockFlag 자체에 count를 증가시켜주고 끝낸다
	const uint32 lockThreadID = (_lockFlag.load() & WRITE_THREAD_MASK) >> 16;
	if (LThreadId == lockThreadID)
	{
		_lockFlag.fetch_add(1);
		return;
	}

	const int64 startTick = ::GetTickCount64();
	while(true)
	{
		for (uint32 spinCount = 0;spinCount < MAX_SPIN_COUNT; ++spinCount)
		{
			uint32 expected = (_lockFlag.load() & READ_COUNT_MASK);
			//Read 부분만 읽어와서 리드 부분이 _lockFlag 과 같다면 +1 증가 , 즉 write 상태가 아니라는 것을 의미
			//=> 실패하게 되는 케이스
			//1. 누군가가 write 로 _lockFlag 을 잡고 있는 경우 => 이땐 증가 불가
			//.2. 이 코드 중간에 다른 스레드가 들어와서 _lockFlag 중 Read 부분의 카운트를 다른 스레드가 증가시킨경우 => 대기해서 다음에 증가 시도처리

			//여기서의 비교는 누구도 write 하지 않은 상태에서 readCount 가 같다면 증가 시키겠다는 얘기
			if (_lockFlag.compare_exchange_strong(expected, expected + 1))
			{
				return;
			}
		}

		//리드 시도 하려는 것이 너무 오래 걸리면 크래쉬를 낸다 => 이것도 문제가 될 수 있기 때문에(dead lock 일 수 있음)
		if (::GetTickCount64() - startTick >= ACQUIRE_TIMEOUT_TICK)
		{
			CRASH("Lock_TimeOUT");

		}
		this_thread::yield();

	}

}

void Lock::readUnlock()
{
	//fetch_sub 은 1을 빽 이전 값을 리턴한다
	if ((_lockFlag.fetch_sub(1) & READ_COUNT_MASK) == 0)
	{
		//만약 이곳에 들어온다면 이전의 read 카운트가 이미 0 이였기 때문에 여기서 더  빼면 안되는 상황이라
		//안전상 크래쉬를 걸어준다
		CRASH("MULTIPLE_UNLOCK");
	}
		
}

 

 

 

 

 

 

ReadLockGuard, WriteLockGuard

Lock 을 std::lock_guard 처럼 사용하기 위한 중간자라고 생각하면 됩니다

 

class ReadLockGuard
{
public :

	ReadLockGuard(Lock& lock) : _lock(lock)
	{
		_lock.readLock();
	}
	~ReadLockGuard()
	{
		_lock.readUnlock();
	}

private :
	Lock& _lock;
};


class WriteLockGuard
{
public:

	WriteLockGuard(Lock& lock) : _lock(lock)
	{
		_lock.writeLock();
	}
	~WriteLockGuard()
	{
		_lock.writeUnlock();
	}

private:
	Lock& _lock;
};

 

 

 

 

 

 

 

 

 

 

 

main 실행 구문

 

ThreadManager 로 스레드를 관리하는데 자세한 사항은 이것을 참고하면 됩니다

https://3dmpengines.tistory.com/2220

 

Threadmanager

thread 생성 하면서 TLS 초기화를 해주는 스레드 메니저 클래스이고 대단한 기능이 있는건 아니고 관리적인 측면의 클래스라 보면 됩니다 tls 는 이글을 참고하면 됩니다 https://3dmpengines.tistory.com/220

3dmpengines.tistory.com

 

 

TestLock 클래스내에 Lock 을 사용하고 있느 이 lock 을 WriteLockGuard 나 ReadLockGuard 를 사용하여 NonBlocking thread 방식으로 처리 하는것을 볼수 있으며 그래서 뮤택스가 없다는 것을 알 수 있습니다

const int32 lockCount = 1;

const int32 lockIndex = 0;

class TestLock
{
	
	Lock _locks[lockCount];

public:
	int32 testRead()
	{
		ReadLockGuard readLockGuard0(_locks[lockIndex]);

		if (_queue.empty())
		{
			return -1;
		}
		return _queue.front();
		
	}

	void testPush()
	{
		WriteLockGuard writeLockGuard0(_locks[lockIndex]);

		_queue.push(rand() % 100);

	}

	void testPop()
	{
		WriteLockGuard writeLockGuard0(_locks[lockIndex]);

		if (_queue.empty() == false)
		{
			_queue.pop();
		}
	}

private :
	queue<int32> _queue;
};


TestLock testLock;



void threadWrite() 
{
	while (true)
	{
		testLock.testPush();
		this_thread::sleep_for(1ms);
		testLock.testPop();
	}
}


void threadRead()
{
	while (true)
	{
		int32 value = testLock.testRead();
		cout << value << endl;
		this_thread::sleep_for(1ms);
	}
}


int main()
{

	for (int32 i = 0; i < 2; ++i)
	{
		GThreadManager->launch(threadWrite);
	}


	//this_thread::sleep_for(1ms);


	for (int32 i = 0; i < 5; ++i)
	{
		GThreadManager->launch(threadRead);
	}
	

	GThreadManager->Join();

	return 0;
}

 

 

실행 결과  

실행결과 창에 뜬느 값의 의미가 크게 있진않습니다

 

 

 

한가지 주의사항

 

while(...) 반복문
{
	uint32 expected = EMPTY_FLAG;
	if (_lockFlag.compare_exchange_strong(expected, desired))
}

 

이런 구문이 있다고 했을때 반복문 안의 uint32 expected = EMPTY_FLAG; 이 값을 반복문 바끝으로 빼면

굳이 필요 없는 일을 안해도 될텐데 라는 생각을 할 수도 있는데, 이는 오산입니다

왜냐면 CAS 연산하면서 즉 compare_exchange_strong 이 함수가 실행되고 나면 실패시 expected  값을 내부적으로 바꿔주기 대문에 반복문 안에 저렇게 초기화 해주는것이 로직상 의미를 갖을 수 있기 때문입니다

 

반응형

std::this_thread::yield

 

다른 스레드가 실행 될 수 있게 허용하기 위해서 스레드의 실행을 재스케줄링하기 위한 구현 힌트를 주는 함수이다

 

정확한 동작은 구현에 의존되는데, 특히 OS 스케줄러의 머신과 시스템의 상태에 따라 그렇다

 

예를 들어 처음 들어온것이 처음 나가는 리얼타임 스케쥴러 (리눅스에서 SCHED_FIFO)는 현재 스레드를 지연 시킬수 있고 실행 준비된 동일 우선순위의 스레드의 큐 뒤에 추가될수도 있다

 

그리고 만약 동일한 우선순위의 다른 스레드들이 없다면 yield() 는 어떤 영향도 주지 못한다

 

#include <iostream>
#include <chrono>
#include <thread>
 
// "busy sleep" while suggesting that other threads run 
// for a small amount of time
void little_sleep(std::chrono::microseconds us)
{
    auto start = std::chrono::high_resolution_clock::now();
    auto end = start + us;
    do {
        std::this_thread::yield();
    } while (std::chrono::high_resolution_clock::now() < end);
}
 
int main()
{
    auto start = std::chrono::high_resolution_clock::now();
 
    little_sleep(std::chrono::microseconds(100));
 
    auto elapsed = std::chrono::high_resolution_clock::now() - start;
    std::cout << "waited for "
              << std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count()
              << " microseconds\n";
}
Possible output:

waited for 128 microseconds

 

 

 

 

ref : https://en.cppreference.com/w/cpp/thread/yield

반응형

'운영체제 & 병렬처리 > Multithread' 카테고리의 다른 글

DFS Deadlock Detection  (0) 2022.10.02
Reader-Writer lock  (0) 2022.10.02
제프리 리처의 유저모드 동기화들  (0) 2022.09.28
Threadmanager  (0) 2022.09.27
Lock-Free Queue  (0) 2022.09.26

 

 

아래 설명 되는 유저모드 동기화(또는 유사한것) 외에는  커널모드 동기화가 되겠지요

 

1. 원자적 접근 : Interlocked 함수들

2. 캐시 라인( 유저 모드 동기화라기 보단.. 캐시 히트를 높이자, 가시성에 주의 할것)

3. 건너 띔

4. Critical Section

5. Slim Reader Writer lock

6. Condition Variable

 

 

InterlockedExchaged 는  스핀락을 구현해야 하는 경우에 유용하게 사용 될 수 있다

 

 

유저모드 동기화 : atomic , interlock..., Critical Section,  (Slim) Reader-Wirter lock,   condition variable

 

 

 

 

 

 

 

 

캐시 미스가 적게 일어나도록 신경 써야 한다

 

 

 

 

 

다른 캐쉬에 있는걸 인지하지 못한다는 것인데 c++ atomic 을 사용하면 이문제를 해결 할 수 있다

 

 

 

 

 

 

 

 

 

 

 

 

 

 

SRW (슬림 리더-라이터 락 )

 

 

read 를 주로 하는 구조에서 어쩌다 데이터를 한번 쓸때 

ex) 운영 툴로 고정 데이터를 어쩌다가 한번 변경 줘야 할때

 

하지만 주의할것은 크리티컬 섹션과 다르게 한 스레드가 여러번 락을 걸 수 없다

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Tip

 

락을 장시간 점유하지 말 것!

장시간 작업이 필요할 땐 리소스를 복사해서 사용

 

 

 

 

ref : https://www.slideshare.net/hiscale/120128-windows-viacpp8

반응형

'운영체제 & 병렬처리 > Multithread' 카테고리의 다른 글

Reader-Writer lock  (0) 2022.10.02
std::this_thread::yield 스케줄러에게 지연 힌트를 줌  (0) 2022.09.28
Threadmanager  (0) 2022.09.27
Lock-Free Queue  (0) 2022.09.26
atomic 가시성, 원자성  (0) 2022.09.24

 

 

 

 

 

thread 생성 하면서 TLS 초기화를 해주는 스레드 메니저 클래스이고

대단한 기능이 있는건 아니고 관리적인 측면의 클래스라 보면 됩니다

 

 

tls 는 이글을 참고하면 됩니다

https://3dmpengines.tistory.com/2206?category=511467 

 

TLS(Thread Local Storage) : thread_local

쓰레드를 잘 배분한다 했어도 작업에 따라 한곳으로 요청이 몰릴 수가 있는데 Headp, Data 영역은 쓰레드들 끼리 공유해서 사용하게 되어 lock 등으로 제어를 해줘야 한다 TLS 라는게 스택과 (Heap,Data

3dmpengines.tistory.com

 

#include <thread>
#include <functional>

class Threadmanager
{

public :

	Threadmanager();
	~Threadmanager();


	void launch(function<void(void)> callback);
	void Join();

	static void InitTLS();
	static void DestroyTLS();


private :
	mutex _lock;
	vector<thread> _threads;
};

 

 

 

#include "pch.h"
#include "Threadmanager.h"

Threadmanager::Threadmanager()
{
	//main 함수에 대한 thread id 에 해당
	InitTLS();
}

Threadmanager::~Threadmanager()
{
	Join();
}

void Threadmanager::launch(function<void(void)> callback)
{
	lock_guard<mutex> guard(_lock);

	_threads.push_back( thread ( [=]() 
	{
		InitTLS();		//새로 생성된 thread id 에 해당
		callback();
		DestroyTLS();
	})
	);

}

void Threadmanager::Join()
{
	for (thread& t : _threads)
	{
		if (t.joinable())
		{
			t.join();
		}
	}
}

void Threadmanager::InitTLS()
{
	static atomic<uint32> SThreadId = 1;
	LThreadId = SThreadId.fetch_add(1);

}

void Threadmanager::DestroyTLS()
{
	//동적으로 생성된게 있다면 여기서 삭제 처리 함
}

 

 

 

 

 

Threadmanager* GThreadManager = nullptr;

CoreGlobal::CoreGlobal()
{
	GThreadManager = new Threadmanager;
}

CoreGlobal::~CoreGlobal()
{
	delete GThreadManager;
}

스레드 메니저의 생성은 global 변수로 생성 해놓고 CoreGlobal 클래스를 전역으로 생성하면 CoreGlobal 클래스의 생성자에 의해서 연달아 메니저가 순서를 고려하여 생성하도록 만든 것으로 이역시 자주 볼 수 있는 구조입니다

 

 

 

 

 

실행 코드

CoreGlobal core;



void threadMain() {
	while (true)
	{
		cout << "Thread ID : " << LThreadId << endl;
		this_thread::sleep_for(1s);
	}
}



int main()
{

	for (int32 i = 0; i < 5; ++i)
	{
		GThreadManager->launch(threadMain);
	}
	

	GThreadManager->Join();

	return 0;
}

 

 

실행 결과

 

 

스레드 특성상 글자가 엇나가는 것처럼 보이는건 감안하고 보면 되겠습니다

 

 

반응형

 

이글을 읽기 위해서 이 글들에 대한 선행 이해가 필요합니다

https://3dmpengines.tistory.com/2210?category=511467 

 

LockFree - Stack (1)

경우의 수는 이것보다 훨씬 많을 수 있지만 몇개만 추려보았고 크래쉬기 나지 않고 Lock free를 시도 하려는 중간 과정이라 보면 되겠다 스택이기 때문에 끝에서 추가되고 삭제 된다는것과 다른

3dmpengines.tistory.com

https://3dmpengines.tistory.com/2212?category=511467 

 

LockFree - Stack (2) - 삭제 처리

삭제의 핵심은 분리를 미리하고 분리한 대상을 현재 스레드의 스택 메모리에 삭제할 주소를 지역변수로 들고 있다가 다시 자신의 스레드로 돌아왔을때 분리된 삭제할 메모리를 그때서야 삭제

3dmpengines.tistory.com

 

 

[Queue] 컨셉

 

 

기본 적인 Queue 의 컨셉은

[data][data][value][dummy]
[head][tail]

 

 

 

 

head 와 tail 이 있고 추가는 tail 에서 추가를 하며

제거는 head 에서 처리한다

 

그런데 최초에는 

 

[dummy]
[head][tail]

아무것도 빈 노드가 아닌 

 

최초에 head 와 tail 은 dummy 노드를 가리키고 있는 상태에서 시작된다

 

push 노드에 추가 할떄는 dummy 에 데이터를 넣고 그다음 tail의 next 노드에

새로운 더미를 만들어 추가해 나가는 방식이다

제거 반대로 head 에서 제거 해 나가는 방식

 

 

[Lock-Free Queue]

  • lock free stack 과 동일하게 노드는 shared_ptr 로 구현하지 않는다 ref count 는 수동으로 구현, shared_ptr 자체가 lock free 가 아님으로

  • Non blocking thread 는 여러 쓰레드에서 공용이 되는 데이터를 쓸려고 할때 atomic  compare-and-swap (CAS)  로 연산을 해서 다른 스레드에 영향을 주지 않고 한방에 연산 처리 하는부분이 중요하다

  • 그런 부분들에서 문제가 생길 수 있기 때문에 경합 지점이 있는 부분에선 CAS 함수인 atomic.compare_exchange_strong  함수로 해결을 해야한다 (atomic 은 가시성, 원자성 해결)

  • 그리고 아무도 참조 하고 있지 않은 데이터를 shared_ptr 을 사용하는건 아니지만 ref count를 수동으로 구현하였기에 이 수치가 0이 되면 Node 를 제거하는 방식으로 처리가 되어 있다

  • push 쪽은 여러 스래드가 동시에 실행된다 가정 했을 때 참조권을 먼저 획득하고 그다음 실제 노드에 대한 소유권을 획득한 스레드가 tail 에 새로운 dummy 노드를 추가하여 노드를 연결해 나가는 방식으로 진행 된다

  • 찰나의 순간에 추가된 노드가 바로 삭제될 수도 있기에 그 부분을 위한 처리가 필요하다

 

 

 

 

[설명] Push

 

* 큐이다 보니 head 오 tail 이 있고 head 와 tail 사이에 데이터를 넣는 방식
* 데이터를 넣을때는 tail 즉 data 의 끝에 추가하고
* 빼올때는 head 데이터의 제일 앞에서 빼온다

[data][data][value][]
[head][tail]

그런데 최초에 head 와 tail 은 dummy [] 노드를 가르키는 방식으로 처리한다
그래서 head 와 tail 이 같은 노드를 가리키고 있다면 비어 있는 것으로 판단할 것이다


[]
[head][tail]

 

void Push(const T& value)
{

문제는 head 에서 삭제를 하고 tail 쪽에서 양쪽에서는 삭제를 하기 때문에 양쪽에서 변화가 생길 수 있다는 것

좀더 정확히는 tryPop 에서 삭제 하는건 맞는데 push 에서 찰나의 순간때문에 push 된 데이터가 바로 삭제 되는 경우가 발생 할수 있기에 push 에도 삭제 가능성이 있는 로직이 들어가게 된다 (설명은 이후에..)

 

 

먼저 tail 쪽 먼저 보자..

 

 

새로운 데이터를 준비한다
unique_ptr<T> newData = make_unique<T>(value);
CountedNodePtr dummy;
dummy.ptr = new Node;
dummy.externalCount = 1;

 

최초에 _tail 을 가져오긴 하지만

최초의 _tail 은 dummy node 이다 
CountedNodePtr oldTail = _tail.load(); 

 

queue 방식이 tail 에다가 계속 추가하는 방식이라 dummy 이기도함
oldTail 변수는 스레드별로 별도로 존재 (스택에 존재하는 변수 임으로)

 



while (true)
{

 

[참조권 획득 과정]


     참조권 획득 => externalCount를 현시점 기준 +1 한 스레드가 참조권을 획득한다 
     현재 스레드는 IncreaseExternalCount 함수 내에서 참조 권을 획득하지 못하면 획득 할때까지

     참조권을 획득하기 위해서 루프를 돌며 반복 시도 함

     IncreaseExternalCount(_tail, oldTail);

     => 결국 여기선 참조권을 획득한 스레드가 내려옴, 하지만 데이터에 대한 소육권은 아님


하지만 간발의 차이로 두 스레드가 참조권을 획득 할 수도 있다는 로직상 가정을 해야 한다

 

 

// 소유권 획득 (data를 먼저를 교환한 애가 이김)
T* oldData = nullptr;

 

여기서 가정을 하면서 본다
=>모든 스레드가 T* oldData = nullptr; 까지 실행 됐다는 가정 하에..

 

 

하단 로직에서 현재 스레드는 data 가 null 이면 if 안을 실행하겠지만

다른 스레드들에선 

oldTail.ptr->data 가 아직 null 이라면 if 문에서 _tail 의 ptr->data 가 null 이 안되어 다른 스레드가 
if 문 안쪽으로 실행 되지 못하고 if 문을 벗어나서 루프를 돌게 된다

* tail은 새로운 dummy 가 되기 전까지 oldTail 각 스레드별도의 스텍 상에 있는것이기 때문에 
다른 스레드는 oldTail = _tail.exchange(dummy); 이 실행 되기 전까지 계속 이전과 동일한 노드를 가리키게 되어 => 위에서의 변화 가능성이 있지만 우선 이런 경우라 가정

현 스래드의 oldTail.ptr->data 이 뀌어서 newData.get() 가 들어갔다면   

다른 스레드에서는 oldTail.ptr->data은 null 이 안되게 된다

 

//oldTail = _tail.exchange(dummy); 이 실행 된 이후 IncreaseExternalCount(_tail, oldTail);  이 함수가 다시 실행 되면서 
// 다른 스레드들의 oldTail 이 dummy 를 가리키기 전까지 계속 while 을 돌게 된다


 

[소유권 획득 과정]

oldTail 은 dummy 라 데이터를 넣어주는 시도를 먼저 함, 데이터를 넣지 못한 다른 스레드는 이 if 를 건너 뛰게 됨
성공하면 데이터를 먼저 새로운 dummy 노드인 oldTail.ptr->data 포인터가 새로운 데이터를 가리키게함
if (oldTail.ptr->data.compare_exchange_strong(oldData, newData.get()))
{
     

      여기에 들어오게 되면 실제 소유권을 획득하게 된것, 다른 스레드는 data 가 null 이 아니기 때문에 못들어옴

      oldTail.ptr->next = dummy;


tip : exchange() 리턴은 바뀌기전 _tail 값을 리턴한다
oldTail = _tail.exchange(dummy); //_tail을 새로운 dummy 로 바꿔준다, 함수 실행 후 oldTail 은 이전 _tail 값이 된다
//이렇게 호출을 하게 되면  IncreaseExternalCount(_tail, oldTail); 새로운노드가 추가 됐어도 에서의 _tail 은 현재 것이 되고 oldTail 은 마찬가지로 이전것이 된다

 

데이터를 바꾸고 다음 노드를 세팅하게 되면 externalCount를 원상태로 돌려줌 =>  감소 처리해서 메모리를 삭제해야 된다는 뜻 


그런데 push 에서도 삭제가 있는것인데 push 가 완료 되기 전 찰나에 pop 이 실행되어 노드를 삭제해야 하는 상황에 발생 할수 있는데,
정리하자면 나의 순간에 pop 이 push 보다 먼저 돌게 되면  push 에서는 노드가 없기 때문에 push 처리 도중 문제가 생길 수 있어서 push  완료 된다음 삭제를 하기 위해 externalCountRemaining 이걸로 지워도 되는지 판단하고
 문제가 없다면 push 에서도 삭제하게끔 처리를 해주기 위해 push 에서도 삭제가 들어가게 된다
FreeExternalCount(oldTail); //externalCount 를 줄여는 함수
newData.release(); //해제없이 소유권을 놓기만 함, 나중에 delete 는 알아서 해줘야 함
break;
}

 

 

 

[소유권 경쟁 실패]

현재 스레드에서 소유권까지 획득한 다음 자기자신 외에 동시에 실행되는 스레드 개수 만큼 
새로운 dummy 로 바뀌기전의  _tail의 internalCount 가 올라 가는데 
즉 현재 스레드 외에 나머지 스레드에선 소유권을 획득하지 못하였음으로 internalCount 의 값을 1 씩 낮춰준다, 마지막  internalCount == 0이 되는 스레드에서 삭제 처리를 한다
oldTail.ptr->ReleaseRef();


}
}

 

 

 

 

 

 

 

[설명] TryPop

 

// [data][data][ ]
// [head][tail]
shared_ptr<T> TryPop()
{


CountedNodePtr oldHead = _head.load();

while (true)
{

[참조권 획득 과정]

(externalCount를 현시점 기준 +1 한 애가 이김), push 와 동일


//push 에서 _tail 은 같은 곳을 바로보고 있긴 하지만 oldTail 이 지역변수를 통해 별도의 externalCount를 우선 유지한다
// [data][data][ ]
// [head][tail]
// 

 

복습 및 정리 차원에서 써놓음
push 할때 노드가  tail 스레드에서 externalCount를 증가 시켜도 IncreaseExternalCount 의 함수에서 data 노드쪽의 externalCount를 증가시켜야 적요 되는것임으로 head 쪽의 externalCount 이 바로 적용 되는것은 아니다

 

그리고 만약 서로 같은 노드를 head, tail 이 두개가 같은 노드를 가리킨다 할지라도  externalCountRemaining 이 값을 통해 지워도 되는지를 한번더 판단하게 되어 push 작업이 마무리 되기 전 먼저 삭제 처리 하는 것을 막게 되기도 한다

 

 

//IncreaseExternalCount 이 함수내에서 실제 적용 된것이 참조권을 갖는다 
IncreaseExternalCount(_head, oldHead);

Node* ptr = oldHead.ptr;

 

//[dummy]
//[head][tail]
이 부분은 쉽게 최초의 경우를 생각해보면 head와 tail 은 같은 dummy 를 가리키고 있음으로 
둘다 같은 곳을 가리킬때에는 삭제 처리 할것이 없다 이럴때는 리턴하고 종료처리하는 부분
if (ptr == _tail.load().ptr)
{
ptr->ReleaseRef();
return shared_ptr<T>();
}


여기까지 실행 됐다면 참조권은 획득한 상태

[소유권 획득 과정]

//(head = ptr->next)
if (_head.compare_exchange_strong(oldHead, ptr->next))
{
소유권이 획득 되면 리턴해서 pop 스레드 종료처리 해버림
여러 스레드가 돌아간다는 것을 상기하고자 하는 차원에서 적자면

=> 어차피 pop 하는 거라 현대 스레드에서 종료 해도 
다른 스레드에서 pop 이 돌고 있다고해도 다른 다른 스레드의 pop 들은 이 if 문들을 들어오지 않는 이상 계속 돌고 있음

데이터를 리턴했으면 pop 스레드 역할은 다 한것임으로 리턴처리 한다


//data 포인터를 얻어온다
T* res = ptr->data.load(); // exchange(nullptr); 로 하면 버그 있음!
FreeExternalCount(oldHead); //externalCount 를 줄여는 함수
return shared_ptr<T>(res);
}

ptr->ReleaseRef();
}
}

 

 

 

LockFreeQueue 코드...

#pragma once
#include <mutex>


template<typename T>
class LockFreeQueue
{
	struct Node;

	struct CountedNodePtr
	{
		int32 externalCount; // 참조권
		Node* ptr = nullptr;
	};

	struct NodeCounter
	{
		uint32 internalCount : 30; // 참조권 반환 관련
		uint32 externalCountRemaining : 2; // Push & Pop 다중 참조권 관련
	};

	struct Node
	{
		Node()
		{
			NodeCounter newCount;
			newCount.internalCount = 0;
			newCount.externalCountRemaining = 2;
			count.store(newCount);

			next.ptr = nullptr;
			next.externalCount = 0;
		}

		void ReleaseRef()
		{
			NodeCounter oldCounter = count.load();

			while (true)
			{
				NodeCounter newCounter = oldCounter;
				newCounter.internalCount--;

				// 끼어들 수 있음
				if (count.compare_exchange_strong(oldCounter, newCounter))
				{
					if (newCounter.internalCount == 0 && newCounter.externalCountRemaining == 0)
						delete this;

					break;
				}
			}
		}

		atomic<T*> data;
		atomic<NodeCounter> count;
		CountedNodePtr next;
	};

public:
	LockFreeQueue()
	{
		//초기니깐 head 와 tail 이 같은 노드를 가리키도록 한다
		CountedNodePtr node;
		node.ptr = new Node;
		node.externalCount = 1;

		_head.store(node);
		_tail.store(node);
	}

	LockFreeQueue(const LockFreeQueue&) = delete;
	LockFreeQueue& operator=(const LockFreeQueue&) = delete;
	
	/*
* 큐이다 보니 head 오 tail 이 있고 head 와 tail 사이에 데이터를 넣는 방식
* 데이터를 넣을때는 tail 즉 data 의 끝에 추가하고
* 빼올때는 head 데이터의 제일 앞에서 빼온다
*/
//[data][data][value][]
//[head][tail]

//그런데 최초에 head 와 tail 은 dummy [] 노드를 가르키는 방식으로 처리한다
//그래서 head 와 tail 이 같은 노드를 가리키고 있다면 비어 있는 것으로 판단할 것이다
//[]
//[head][tail]
	void Push(const T& value)
	{

		//문제는 head 에서 삭제를 하고 tail 쪽에서 양쪽에서는 삭제를 하기 때문에 양쪽에서 변화가 생길 수 있다는 것

		unique_ptr<T> newData = make_unique<T>(value);

		CountedNodePtr dummy;
		dummy.ptr = new Node;
		dummy.externalCount = 1;

		CountedNodePtr oldTail = _tail.load(); //최초에 _tail 은 dummy node 이다 이다, queue 방식이 tail 에다가 계속 추가하는 방식이라 dummy 이기도함
		

		while (true)
		{
			//[참조권 획득]
			// 참조권 획득 => externalCount를 현시점 기준 +1 한 스레드가 참조권을 획득한다 
			// 현재 스레드는 IncreaseExternalCount 함수 내에서 참조 권을 획득하지 못하면 획득 할때까지 참조권을 획득하기 위해서 시도를 반복적으로 함

			IncreaseExternalCount(_tail, oldTail);			

			//결국 여기선 참조권을 획득한 스레드가 내려옴, 
			//하지만 간발의 차이로 두 스레드가 참조권을 획득 할 수도 있다는 로직상 가정을 해야 한다
			// 소유권 획득 (data를 먼저를 교환한 애가 이김)
			T* oldData = nullptr;



			//모든 스레드가 T* oldData = nullptr; 까지 실행 됐다는 가정 하에..

			//oldTail.ptr->data 가 아직 null 이라면 여기 그낭하고 if 문안으로 들어가면서 _tail 의 ptr->data 가 null 이 안되어 다른 스레드가 
			//접근하지 못하고 다음 루프를 돌게 된다
			// 
			// tail은 새로운 dummy 가 되기 전까지 oldTail 각 스레드별도의 스텍 상에 있는것이기 때문에 
			// 다른 스레드는 oldTail = _tail.exchange(dummy); 이 실행 되기 전까지 계속 이전과 동일한 노드를 가리키게 되어 => 위에서의 변화 가능성이 있지만 우선 이런 경우라 가정
			// 현 스래드의 oldTail.ptr->data 이 뀌어서 newData.get() 가 들어갔다면 다른 스레드에서는 oldTail.ptr->data은 null 이 안되게 된다
			// oldTail = _tail.exchange(dummy); 이 실행 된 이후 IncreaseExternalCount(_tail, oldTail);	 이 함수가 다시 실행 되면서 
			// 다른 스레드들의 oldTail 이 dummy 를 가리키기 전까지 계속 while 을 돌게 된다
			

			//[소유권 획득]
			//oldTail 은 dummy 라 데이터를 넣어주는 시도를 먼저 함, 데이터를 넣지 못한 다른 스레드는 이 if 를 건너 뛰게 됨
			//성공하면 데이터를 먼저 새로운 dummy 노드인 oldTail.ptr->data 포인터 가 새로운 데이터를 가리키게함
			if (oldTail.ptr->data.compare_exchange_strong(oldData, newData.get()))
			{
				//여기에 들어오게 되면 실제 소유권을 획득하게 된것

				oldTail.ptr->next = dummy;
				//exchange 리턴은 바뀌기전 _tail 값을 리턴한다
				oldTail = _tail.exchange(dummy);			//_tail을 새로운 dummy 로 바꿔준다, 함수 실행 후 oldTail 은 이전 _tail 값이 된다
				//이렇게 호출을 하게 되면 	IncreaseExternalCount(_tail, oldTail);		에서의 _tail 은 현재 것이 되고 oldTail 은 마찬가지로 이전것이 된다, 새로운노드가 추가 됐어도

				//데이터를 바꾸고 다음 노드를 세팅하게 되면 externalCount를 원상태로 돌려줌
				//push 에서도 삭제가 있는것인데 push 가 완료 되기 전 찰나에 pop 이 실행되어 노드를 삭제해야 하는 상황에 발생 할수 있는데 
				// 이때 찰나의 순간에 pop 이 push 보다 먼저 돌게 되면  push 서 는 노드가 없기 때문에 push 처리 도중 문제가 생길 수 있어서 
				//push  완료 된다음 삭제를 하기 위해 externalCountRemaining 이걸로 지워도 되는지 판단하고
				//문제가 없다면 push 에서도 삭제하게끔 처리를 해주기 위해 push 에서도 삭제가 들어가게 된다
				FreeExternalCount(oldTail);	//externalCount 를 줄여는 함수
				newData.release();		//해제없이 소유권을 놓기만 함, 나중에 delete 는 알아서 해줘야 함
				break;
			}

			//소유권 경쟁 실패
			//현재 스레드에서 소유권까지 획득한 다음 자기자신 외에 동시에 실행되는 스레드 개수 만큼 
			//새로운 dummy 로 바뀌기전의  _tail의 internalCount 가 올라 가는데 
			//즉 현재 스레드 외에 나머지 스레드에선 소유권을 획득하지 못하였음으로 internalCount 의 값을 1 씩 낮춰준다, 마지막  internalCount == 0이 되는 스레드에서 삭제 처리를 한다
			oldTail.ptr->ReleaseRef();
		}
	}

	// [data][data][ ]
	// [head][tail]
	shared_ptr<T> TryPop()
	{
		

		CountedNodePtr oldHead = _head.load();

		while (true)
		{
			// 참조권 획득 (externalCount를 현시점 기준 +1 한 애가 이김)
			//push 에서 _tail 은 같은 곳을 바로보고 있긴 하지만 oldTail 이 지역변수를 통해 변도의 externalCount를 유지했다가 
			// [data][data][ ]
			// [head][tail]
			// 
			//복습 및 정리 차원에서 써놓음
			//push 할때 노드가  tail 스레드에서 externalCount를 증가 시켜도 IncreaseExternalCount 의 함수에서 data 노드쪽의 externalCount를 증가시켜야 적요 되는것임으로
			//head 쪽의 externalCount 이 바로 적용 되는것은 아니다
			// 그리고 만약 서로 같은 노드를 head, tail 이 두개가 같은 노드를 가리킨다 할지라도  externalCountRemaining 이 값을 통해 지워도 되는지를 
			// 한번더 판단하게 되어 push 작업이 마무리 되기 전 먼저 삭제 처리 하는 것을 막게 되기도 한다
			//IncreaseExternalCount 이 함수내에서 실제 적용 된것이 참조권을 갖는다 
			IncreaseExternalCount(_head, oldHead);

			Node* ptr = oldHead.ptr;

			//[dummy]
			//[head][tail]
			//쉽게 최초의 경우를 생각해보면 head와 tail 은 같은 dummy 를 가리키고 있음으로 
			//둘다 같은 곳을 가리킬때에는 삭제 처리 할것이 없다 이럴때는 리턴하고 종료처리
			if (ptr == _tail.load().ptr)	
			{
				ptr->ReleaseRef();
				return shared_ptr<T>();
			}


			//여기까지 실행 됐다면 참조권은 획득한 상태

			// 소유권 획득 (head = ptr->next)
			if (_head.compare_exchange_strong(oldHead, ptr->next))
			{
				//소유권이 획득 되면 리턴해서 pop 스레드 종료처리 해버림
				//여러 스레드가 돌아간다는 것을 상기하고자 하는 차원에서 적자면 => 어차피 pop 하는 거라 현대 스레드에서 종료 해도 
				//다른 스레드에서 pop 이 돌고 있다고해도 다른 다른 스레드의 pop 들은 이 if 문들을 들어오지 않는 이상 계속 돌고 있음

				//data 포인터를 얻어온다
				T* res = ptr->data.load(); // exchange(nullptr); 로 하면 버그 있음!
				FreeExternalCount(oldHead);	//externalCount 를 줄여는 함수
				return shared_ptr<T>(res);
			}

			ptr->ReleaseRef();
		}
	}

private:
	//head 삭제 나 ,tail  추가 되기 때문에 문제가 생길수 있는 부분을 방지하기 위해 atomic<...>& 으로 처리한다
	//참조 권을 획득하지 못하면 획득 할때까지 계속 루프를 돈다
	static void IncreaseExternalCount(atomic<CountedNodePtr>& counter, CountedNodePtr& oldCounter)
	{
		while (true)
		{
			CountedNodePtr newCounter = oldCounter;
			newCounter.externalCount++;			
			//1을 증가 시킨애를 막는 스레드가 참조권을 획득하게 되는것이다
			//성공 할때까지 계속 루프를 돌게  => 다른 스레드가 push 를 이미 했다면 현재 스레드는 실패하게 되고 
			//실패하면 _tail 을 와 oldTail 에 넣게 됨으로 둘은 같은 상태가 우선 된다음 다음 루프에서 참조권 획득을 시도할것이다 
			if (counter.compare_exchange_strong(oldCounter, newCounter))				
			{
				oldCounter.externalCount = newCounter.externalCount;
				break;
			}
		}
	}

	//externalCount 를 줄여는 함수
	static void FreeExternalCount(CountedNodePtr& oldNodePtr)
	{
		Node* ptr = oldNodePtr.ptr;
		const int32 countIncrease = oldNodePtr.externalCount - 2;			//참조권을 획득하면 externalCount 은 2 가 됨으로

		NodeCounter oldCounter = ptr->count.load();

		while (true)
		{
			NodeCounter newCounter = oldCounter;
			//push 쪽에서 데이터 사용하고 지나간 노드임으로 -1 시켜줌, 그럼 나머지 1이 남는데 그건 pop 할때 -1 을 한번 더 해주고 그렇다면
			//아래 if 문에서 삭제와 추가가 서로 겹쳐지는 문제를 externalCountRemaining 값이 0 이 아니라는 값을 판단 할수 있게 되고 externalCountRemaining ==0 이면 삭제 가능한 상태라는 걸 알 수 잇게 된다
			newCounter.externalCountRemaining--;			
			newCounter.internalCount += countIncrease;

			if (ptr->count.compare_exchange_strong(oldCounter, newCounter))
			{
				//newCounter.internalCount == 0 이 면 삭제가 되야 하는 상태 && externalCountRemaining == 0 추가 삭제 한번씩 거친 노드를 의미한다
				//externalCountRemaining 은 dummy 노드를  push 해서 사용하고 있는데 pop 스레드에서 해당 노드를 바로 삭제 해버리는 것을 막기 위함
				//즉 push 할때 push 작업이 완료 되기도 전에 pop이 실행 되어 _tail 을 삭제 하려고 하면 문제가 발생하기 때문
				if (newCounter.internalCount == 0 && newCounter.externalCountRemaining == 0)
					delete ptr;

				break;
			}
		}
	}

private:
	// [data][data][]
	// [head][tail]
	atomic<CountedNodePtr> _head;
	atomic<CountedNodePtr> _tail;
};

 

 

 

 

 

 

main 코드

LockFreeQueue<int32> q;

void Push()
{
	while (true)
	{
		int32 value = rand() % 100;
		q.Push(value);

		this_thread::sleep_for(1ms);
	}
}

void Pop()
{
	while (true)
	{
		auto data = q.TryPop();
		if (data != nullptr)
			cout << (*data) << endl;
	}
}


int main()
{
	thread t1(Push);
	thread t2(Pop);
	thread t3(Pop);

	t1.join();
	t2.join();
	t3.join();
}

 

 

결과 화면..

 

 

모든 경우의 수를 다 따져가져 이해하기 보다 전체적인 흐름을 이해해보도록 하고 구현대비 성능이 그리 뛰어나진 않다, 별 차이 없음 mutex 가 들어가는 것과

CAS 를 구현하려면 while 이 쓰여지게 되고 이러다보면 스래드들끼리 병합이 일어날때 대기를 계속 해야 하는건 마찬가지이기 때문..

 

 

 

 

반응형

 

 

c++ 의 atomic 은 두가지를 해결해 준다 원자성(명령어 쪼개져서 여러쓰래드의 간섭 방지), 가시성(캐쉬 연산 얽힘 방지)

=? 양자역학의 얽힘을 말하는 것이 아님 ㅋ

캐쉬 연산 얽힘 이란 용어는 있는것이 아니지만 뭔가 한단어로 표현할게 없을까 생각해봤을때 떠오르는 것이 이단어면 괜찮을것 같다는..

 

 Atomic 변수를 다른 스레드에서 같은 변수를 사용한다고 하면 한번에 하나만 실행 되는건 맞는데, 약간 병목현상이 발생 하게 됨


C++

std::atmoic

원자성 : 

atomic은 원자라는 의미를 가지며 원자적 연산(더 이상 쪼개질 수 없는 연산)을 진행한다. 

원자적 연산은 처리하는 중간에 다른 스레드가 끼어들 여지를 주지 않으며 전부 처리하거나 or 아무것도 하지 못했다. 이 두 가지 상황만이 존재하는 연산. 즉 한번에 일어나는 연산.

이것은 싱글 쓰레드 환경에서는 중요치 않지만, 멀티 쓰레드 환경에서는 중요한 개념.

 -> 둘 이상의 스레드가 공유 자원에 접근할 때 발생할 수 있는 문제를 경쟁 상태(Race condition)이라 한다.

 -> 경쟁 상태는 mutex와 같은 상호 배제 객체로도 해결이 가능하지만, atomic연산을 통해서도 위의 문제를 방지할 수 있다.

 

 -> 경쟁 상태가 문제가 되는 이유는 운영체제의 콘텍스트 스위칭(Context Swiching, 문맥 교환)때문.

 -> C++11이전에는 Interlocked(인터락) 함수를 사용하였음.

 -> std::mutex와 같이 c++11에 추가된 상호 배제 객체는 유저 모드 동기화. 내부적으로 원자적 연산(CAS)을 통해 처리된 결과를 반환.

 

가시성 :

또한 atomic연산은 메모리 가시성 문제또한 해결한다. 가시성이란 멀티 코어의 캐시(Cache) 간 불일치의 문제를 말하는데, 하나의 스레드에서 특정한 변수의 값을 수정하였는데, 다른 스레드에서 그 수정된 값을 제대로 읽어 들인다는 보장이 없다.

(참고로 C++의 volatile은 JAVA와는 달리 가시성 문제를 해결해주지 않는다. 단순히 컴파일러 최적화를 막기만 함)

CPU에서 메모리에 선언된 변수를 읽어 들일 때 최적화를 위해 해당 메모리와 주위의 메모리를 가져가 캐시에 저장한다.

 

 

std::atomic at; 은 이 걸 사용하는 명령어 한줄이 원자적으로 실행된다

at.load()

at.store()

 

이런 명령 외에 앞뒤로 명령어들이 atomic 이 아닌것들이 있다면 이들 사이의 명령어 순서의 보장은 보장하지 못한다

 

 

 


Java 

(참고로 C++의 volatile은 JAVA와는 달리 가시성 문제를 해결해주지 않는다. 단순히 컴파일러 최적화를 막기만 함)

CPU에서 메모리에 선언된 변수를 읽어 들일 때 최적화를 위해 해당 메모리와 주위의 메모리를 가져가 캐시에 저장한다.

 

C++ 과 votaile 이 하는 역할이 다르긴 하지만 개념 설명이 좋아서 덧붙인다

 

 

 

 


 

멀티 스레드를 다루는 과정의 기초가 되는 가시성과 원자성을 정의해볼 것

사실 가시성과 원자성이라고 하는 단어는 문제를 해결하기 위한 원칙이다.

바꿔 말해 Multi Thread를 구성하다보니, 비 가시성, 비 원자성 문제가 발생했고, Multi Thread를 문제없이 사용하려면 가시성과 원자성을 확보해야한다.

여기서 ~해야한다는 비단 개발자의 노력만으로 이루어질 수 없음을 말한다. (CPU, OS, JVM 등이 지원해야한다.)

참고로 가시성과 원자성은 여러 Thread가 동시에 접근 가능한 공유 변수에 대한 이야기다.

공유변수에 대한 Thread간 경합 (race condition)

Thread 내에서 생성된 변수 또는 로직적으로 잘 격리된 멤버변수 접근 (비경합)

 

 

 

 

프롤로그

가시성과 이번에 다루려는 원자성은 Multi Thread 환경에서 Thread간 공유 메모리 이슈를 발생시킨다는 점에서 공통분모를 가지고 있고, 서로간의 상호작용을 잘 염두에 두어야 한다는 것은 사실이다.

하지만 시스템 관점에서 보면 이 두 개념은 조금 다른 곳에 존재한다.

  • 가시성 : CPU - Cache - Memory 관계상의 개념.
  • 원자성 : 한줄의 프로그램 statement(문장)가 컴파일러에 의해 기계어로 변경되면서, 이를 기계가 순차적으로 처리하기 위한 여러 개의 machine instruction이 만들어져 실행되기 때문에 일어나는 현상을 설명하는 용어.

 

 

 

원자성이 뭔가 간단히 알고 먼저 가시성을 보도록 한다

원자단위 연산의 이해

원자성 즉, 연산의 원자 단위를 이해하기 위해, 이전에 다루었던 i++를 원자 연산으로 분해해보도록 하겠다.

프로그램 언어 문장 -> machine instruction 변환

i++이 위 그림과 같이 캐싱을 배제하더라도 읽고(READ) > 연산하고(MODIFY) > 저장(WRITE)하는 총 3가지의 instruction이 수행된다.

이 원자단위의 연산을, 수행중에는 다른 Thread에 의해서 컨트롤되는 타 CPU의 개입이 있을 수 없는 최소 단위의 연산이라고 이해하면 된다.

 

 

 

 

가시성(Visibility)

가시성, 그러니까 잘 보임의 대상은 무엇일까?

위에서 이미 거론했지만, 우리는 공유하는 변수에 대해 다루고 있다.

이 변수가 HW에서는 Main Memoruy에 적재된다고 알고있다. 맞다. 헌데 int i = 10;이렇게 하면 그냥 메모리에 딱 하고 들어가면 좋겠는데, 성능이니 뭐니 하면서 시스템은 아주 복잡한 설계를 해놨다.

Thread는 동작하는 시점에 하나의 CPU(요즘에는 core라고 해야 맞다)를 점유하고 동작을 한다.

선언한 변수의 값이 Memory에만 존재하는 것이 아니라, CPU cahce라고 하는 영역에도 존재한다. 이는 CPU가 Memory에서 값을 읽어들여오고 다시 쓰고 하는 시간을 아끼기 위함이다. 더 큰 문제는 CPU cache에 값이 Memory에 언제 옮겨갈지도 모른다는 것이다. 이를 해결하는 것을 가시성이라고 한다.

각 Thread가 각기 바라보는 CPU의 cache

CPU1은 7까지 증가했으나, CPU2의 cache는 아직도 0으로 알고있다.

 

가시성과 원자성 두가지를 이해하면 Multi Thread 프로그램을 작성할 때 무엇을 주의해야하는지 명확해진다.

또 다른 표현으로 설명하자면 단일 Thread 프로그램에서는 가시성과 원자성을 고려하지 않아도 프로그램 작성하는데 문제가 없다.

그렇다고 가시성과 원자성이 Multi Thread를 할 때만 나타나는 개념은 아니라는 점은 명확히 밝혀둔다. 이는 원래 하드웨어를 설계한 사람들에 의해 만들어진 컴퓨터 내의 구조에 관한 이야기이다.

비 가시성 (가시성 이슈)

각기 다른 Thread 2개는 CPU1과 CPU2를 할당 받아 공유자원에 해당하는 변수를 연산한다. 이때 메인 메모리에서 값을 읽어들여서 연산에 사용하는 것이 아니라, CPU에 존재하는 Cache에 옮겨놓고 연산을 한다.

그 사이에 다른 쓰레드에서 같은 변수를 대상으로 연산을 한다. 이때 Thread는 타 Thread가 사용하고 있는 CPU의 Cache값을 알지 못한다. 언제 Cache의 값이 메인 메모리로 쓰여질지도 모른다. 그래서 아래 코드를 돌려보면 각각의 Thread가 100회씩 변수를 증가연산을 실시했는데, 200에 훨씬 못미치는 103~105쯤이 나타난다. (이 실험 결과는 하드웨어 성능에 따라 상이할 수 있다.)

만약 동시에 시작하는 Thread가 아닌 순차처리라면 200이 나와야하는 현상이다.

Cache에 담아서 연산을 하더라도 바로바로 메모리에 적용했으면 200이 아니더라도 180~190 정도는 나올 것 같은데 너무나도 100에 가까운 결과에 놀라는 독자도 있을 것이다.

 

 

 

Java  - Volatile을 이용한 가시성 확보

visibility(가시성)에 이어 volatile(변덕스러운) 용어가 나왔다.

가시성은 이제 좀 와닿는데 아직도 왜 volatile이라는 단어를 사용하는지 이해를 못한다고 원글 쓰신분은 말씀하신다.

암튼 Java에서 위에서 설명한 비 가시성 이슈를 해결하기 위해 Java 1.4부터 volatile을 지원하기 시작했다.

변수를 선언할 때 해당 단어를 앞에 써주기만 하면 된다. 이렇게만해도 테스트 코드가 "거의" 200을 반환한다. (사실 이 결과는 미신같은 이야기다. 정확히 이야기하면 200이거나 200보다 작은 값을 반환한다.)

원리는 이렇다. volatile로 선언된 변수를 CPU에서 연산을 하면 바로 메모리에 쓴다. (Cache에서 메모리로 값이 이동하는 것을 다른 책이나 문서에서는 flush라고 표현한다.)

그러니 운이 좋게 Thread 두개가 주거니 받거니 하면서 증가를 시키면 200에 가까운 결과를 얻어내는 것이다.

하지만 개발자라면 이 미신같은 결과에 흡족해하면 안된다.

원글 작성자의 컴퓨터 기준으로 각 Thread당 100회가 아닌 1000회 정도 연산을 시키면 2000이 아닌 1998같은 결과를 얻어낸다.

이 이야기인 즉, 가시성이 확보되더라도 원자성 문제 (동시에 같은 값을 읽어다 증가시키고 flush하는...)로 인해 이와 같은 문제가 생긴다.

 

 

 

 

 

 

 

 

가시성을 확보하더라도 원자성이 문제되는 경우

 

 

 

 

 

 

 

원자성 (Atomicity)

가시성이라는 용어도 그렇지만, 원자성이라는 이 단어 역시 상당히 은유적이다.

가시성 역시 메모리 가시성이라고 하면 좀더 쉽게 와닿을 것이고, 원자성 역시 연산의 원자성이라고 하면 좀 더 이해가 쉽다.

관련 도서나 자료에서는 이를 연산의 단일성 혹은 단일 연산이라고 부르기도 한다.

공유되는 변수를 변경할 때, 기존의 값을 기반으로 하여 새로운 값이 결정되는 과정에서 여러 Thread가 이를 동시에 수행할 때 생기는 이슈를 부르는 명칭이기도 하다.

다수의 책이나 자료에서 다루는 예가 i++연산이다.

자연어 입장에서는 하나의 문장이지만, 이를 CPU가 수행하기 위해서는 총 3가지 instruction이 발생한다.

  • i의 기존 값을 읽는다. (READ)
  • i에 1을 더한다. (MODIFY)
  • i의 값을 변수에 할당한다. (WRITE)

이를 두개 Thread가 동시에 100회 수행한다고 했을때, 만약 i++이 원자성을 가지고 있는 연산이라고 하면 결과가 200이어야 하겠지만, 실제로는 200보다 작은 값이 도출된다.

원인은 다시한번 이야기하지만, i++이 3개의 instruction(READ-MODITY-WRITE)로 구성되어있기 때문이다. Thread-1이 값을 읽어 i+1을 하기 직전에 Thread-2가 i를 읽어 i+1을 수행하고 반영하는 동작을 수행한다면 후자의 연산은 무효가 되는 현상이 발생한다.

가시성 문제를 해결하더라도 원자성이 확보되지 못해 원치 않는 결과가 도출

 

 

원자단위 연산의 이해

원자성 즉, 연산의 원자 단위를 이해하기 위해, 이전에 다루었던 i++를 원자 연산으로 분해해보도록 하겠다.

프로그램 언어 문장 -> machine instruction 변환

i++이 위 그림과 같이 캐싱을 배제하더라도 읽고(READ) > 연산하고(MODIFY) > 저장(WRITE)하는 총 3가지의 instruction이 수행된다.

이 원자단위의 연산을, 수행중에는 다른 Thread에 의해서 컨트롤되는 타 CPU의 개입이 있을 수 없는 최소 단위의 연산이라고 이해하면 된다.

여기에 Multi Thread를 개입시켜보자. 각 instruction이 수행되는 사이에는 다른 Thread가 공유 메모리(변수)에 접근이 가능하여, 소위 값이 꼬이는 현상이 생기게 된다. 이러한 점을 예방하는 방법을 알아보자.

 

Java 동기화 처리를 통한 Thread 안정성 확보

하나의 원자 연산이 이루어지는 동안, 이 연산은 다른 Thread - CPU에 의해 간섭받을 수 없다. 이것은 시스템이 이렇게 구현되어있는 것이다.

헌데, 우리가 다루고자하는 연산은 원자단위로 하기에는 너무 복잡하다. 각종 조건, 반복, 보조장치로부터의 데이터 읽기와 쓰기 등이 복합적으로 이루어지는 연산이 섞여있다. 이러한 복잡한 연산을 i++도 한번에 못하는 시스템한테 한번에 시킨다는 것은 불가능하다. (시스템의 발달로 i++를 원자단위로 할 방법은 있다.)

개발자가 이를 쉽게 해결할 수 있는 방법이 바로 임계영역(Critical Section) 지정이다. 동시에 처리되면 문제가 되어 상호 배타적인(Mutual Exclusion) 영역을 설정하는 것이다. 여기서 말하는 영역은 공간적인 영역이 아닌 statement(명령문)의 블럭이다.

 

 

 

 

 

 

ref : https://velog.io/@syleemk/Java-Concurrent-Programming-%EA%B0%80%EC%8B%9C%EC%84%B1%EA%B3%BC-%EC%9B%90%EC%9E%90%EC%84%B1

ref : https://marmelo12.tistory.com/320

반응형

lock free 에 대한 이해가 부족하다면

지난 lockFree 1,2 를 참고해야 이 글을 볼 수 있습니다

 

https://3dmpengines.tistory.com/2210?category=511467 

 

LockFree - Stack (1)

경우의 수는 이것보다 훨씬 많을 수 있지만 몇개만 추려보았고 크래쉬기 나지 않고 Lock free를 시도 하려는 중간 과정이라 보면 되겠다 스택이기 때문에 끝에서 추가되고 삭제 된다는것과 다른

3dmpengines.tistory.com

 

https://3dmpengines.tistory.com/2212?category=511467 

 

LockFree - Stack (2) - 삭제 처리

삭제의 핵심은 분리를 미리하고 분리한 대상을 현재 스레드의 스택 메모리에 삭제할 주소를 지역변수로 들고 있다가 다시 자신의 스레드로 돌아왔을때 분리된 삭제할 메모리를 그때서야 삭제

3dmpengines.tistory.com

 

 

 

void push(const T& value)

 

push 로직에선 기존과 크게 다를건 없고 

전반적으로 shared_ptr 로 Node 를 관리하는것이 아니고 Data 부분만 shaed_ptr 로 구현하고 

Node 자체는 CountedNodePtr 구조체로 래핑 시켜서 노드를 참조(비교하자면 shared_ptr 의 ref count 부분)하고 있는 참조카운팅 externalCount 변수로 참조 횟수를 관리한다

그래서 생성시(push) externalCount  = 1 로 만든다

 

 

 

이후 tryPop 이 실행 되면서 해당 노드에 접근할때 IncreaseHeadCount 이 함수로

접근 가능한 상태에 대한 참조권 획득을(+1, externalCount 에 +1) 을 하게 된다

IncreaseHeadCount 는 while(true) 으로 되어 있기 때문에

다른스레드가 개입 되더라도 현재 스레드가 참조 권을 획득 할때까지 무한히 루프를 돌면서 대기하게 된다

=> 즉 참조권을 얻어야 진행 가능하다는 얘기

 

 

 

if (_head.compare_exchange_strong(oldHead, ptr->next))

를 통해 참조권을 획득한 (externalCount ==2 인 oldHead)와 _head 가 같은 스레드며 

현재 oldHead 가 참조권을 획득한 것이고 if 문 안으로 들어간다

 

여기서 리턴할 데이터를 미리 빼놓고 

 

 

그다음 삭제가 남았는데
if (_head.compare_exchange_strong(oldHead, ptr->next))  

 

삭제시 고려해봐야 하는 점은 이 구문을 들어오는게 하나의 스레드가 아닌

여러개의 스레드가 들어 올 수도 있다


그래서 삭제처리 또한 다른 스레드가 삭제를 시도하려는지를 살펴보고 삭제를 해야 해서

internalCount를 통해 최종 마지막에 접근하게 되어 internalCount 이 0 이 되는 스레드에서 

해당 노드를 삭제 처리를 해줘야 한다 => shared_ptr 처럼 (이렇게 하려는 것이 의도 임으로)

 

이에 대한 구현은 아래 와 같다

즉 externalCount  를 통해 여러 스레드가 해당 노드에 얼마나 접근 하려고 하는지를 체크하는 것이고

internalCount 를 통해 실제 노드를 삭제해도 되는지 를 보려는 것이다 shared_ptr 의 ref count 처럼 동작하게 하기 위해서

 

 

 

	struct CountedNodePtr
	{
		int32 externalCount = 0;				//Node 포인터 참조 횟수
		Node* ptr = nullptr;					//Node 포인터
	};

	struct Node 
	{
		Node(const T& value) : data(make_shared<T>(value)){}
		shared_ptr<T> data;
		atomic<int32> internalCount = 0;
		CountedNodePtr next;
	};

 

 

#pragma once
#include <mutex>



template<typename T>
class LockFreeStack
{
	struct Node;
	//이렇게 그냥 쓰면 lock free 라 안될 수도 있지만 externalCount 를 16 bit, ptr 을 48 bit 등으로 쪼개어 64 비트 맞출 수도 있다 => 이렇게 되면 atomic 조건이 됨
	struct CountedNodePtr
	{
		int32 externalCount = 0;				//Node 포인터 참조 횟수
		Node* ptr = nullptr;					//Node 포인터
	};

	struct Node 
	{
		Node(const T& value) : data(make_shared<T>(value)){}
		shared_ptr<T> data;
		atomic<int32> internalCount = 0;
		CountedNodePtr next;
	};

public :

	//node 추가는 head 바로 다음에 새로운 노드가 추가 되는 방식이다
	void push(const T& value)
	{
		CountedNodePtr  node;
		node.ptr = new Node(value);
		node.externalCount = 1;

		node.ptr->next = _head;
		while (_head.compare_exchange_weak(node.ptr->next, node) == false)
		{

		}
	}

	shared_ptr<T> tryPop()
	{
		
		CountedNodePtr oldHead = _head;

		//참조권을 획득해야 함
		while (true)
		{
			//oldHead 의 externalCount 가 1 증가 된다
			//이게 실행 되고 나면 최소 externalCount >= 2 상태가 됨으로 삭제는 바로 할 수 없는 즉 안전하게 접근 할수 있는 상태가 된것이다
			IncreaseHeadCount(oldHead);		


			//여기에서 참조권을 얻오온것은 맞는데 다른 스레드도 동시에 실행 됐다면 
			//다른 스레드도 externalCount 를 증가 시킨체 참조권을 마찬가지로 획득한 상태가 될 수 있다
			Node* ptr = oldHead.ptr;	

			//데이터 없음
			if (ptr == nullptr)
			{
				return shared_ptr<T>();
			}


			//그래서 externalCount 가 또 다른 스레드에 의해서 증가 된상태일 수 있음으로 소유권까지 획득해야 정확해진다
			//소유권 획득(ptr->next 로 head를 바꿔치기 한 애가 이김)

			//만약 이게(compare_exchange_strong) 참이라면 소유권을 획득했다는 것이고 아까 꺼내온 oldHead과 _head의 externalCount 까지 일치한다는 얘기가 되는데
			//만약 다른 스레드가 나중에 externalCount를 증가 시켰다면 현 스레드(지금 비교 하려는)의 _head와 oldHead 의 externalCount 
			// 는 같지 않기 떄문에 소유권을 획득하지 못한다는 것
			if (_head.compare_exchange_strong(oldHead, ptr->next))
			{
				//ptr를 삭제할 것임으로 ptr 의 다음노드가 head 가 되게 처리한다음

				//여기에 들어오면 소유권을 획득한 것임으로 
				shared_ptr<T> res;
				res.swap(ptr->data);
                
                //경우의 수 .1
				//external : 1 -> 2(+1)  => 최초 기본적인 상황에선 external 이 2가 됨
				//internal : 0				=> 최초에는 internal 은 0 이됨
				//그렇다면 삭제가 된다

		//경우의 수 .2
				//만약 이곳(if문)에 들어오기 전에 참조권이 여러명에게 있었다고 가정해보자 
				//현재 스레드외에 두개의 스레드가 더 있어서 참조권을 다른 스레드들에 의해서 2가 증가 됐다고 가정
				//external : 1 -> 2(+1)  -> 4(나 +2, 다른 스레드 +2)

				//이 상황에선 countIncrease == 2가 되고 아직 internalCount fetch_add 이 실행 되기 이전엔 0 임으로 
				//0 == -2 같지 않아서 삭제처리를 못하고
				//그 다음에 바로 internalCount == 2 가 된다 , external == 4
				//이후 다른 스레드에서 봤을때 if (_head.compare_exchange_strong(oldHead, ptr->next)) 은 실패가 됨으로 => 왜냐면 여기에 먼저 들어온 쓰레드가 _head를 다음 노드인 ptr->next 로 바꿨기 때문에
				//여기로 들어온 스레드가 internalCount 를 2 증가 시켰음으로 실패한 스레드에서 1씩 감소 시키면서 (동일한 ptr 주소까지 들고 있는 곳까지 모두 실행했었다는 가정하에 : 스택변수임으로 그당시에 들고 있는 주소를 들고 있음)
				//internalCount를 최종적으로 0으로 (정확히는 0이 되기직전의 1) 만드는 스레드에서 ptr 을제거처리한다



                //아래는 어떤 경우의 수든 통용되는 설명
				//지금 삭제 하려고 하는것 말고 다른애들이 삭제하려고 하는 shared_ptr 을 가리키고 있을수 있기 때문에 삭제를 시도하는 처리가 있어야 한다
				//참조하는 것이 나 외에 다른 애들이 있는가?
				const int32 countIncrease = oldHead.externalCount - 2;		//현재 스레드로 인해서 externalCount 가 2 만큼 증가했으니 그 수만큼 빼줘서 0 이면 다른곳에서도 참조하고 있는게 없는 것임

				//나밖에 없다면 countIncrease == 0 이 된다
				//그런데 fetch_add 는 더하기 이전의 결과를 리턴한다 => 이때도 최초에는 0 인건 마찬가지
				//countIncrease 를 internalCount 에 더하는 이유는 현재 스레드 말고 다른스레드에서 자원을 참조하고 있다면 해당 레퍼런스 만큼 더해주기 위해서 
                //즉 현재 스레드에서 처리를 지연시킴 (shared_ptr 의 ref count 처럼)
				if (ptr->internalCount.fetch_add(countIncrease) == -countIncrease) 
				{
					//삭제 예정이였던 노드 삭제 처리
					delete ptr;
				}

				return res;
			}
			//_head과 oldHead 이 같지 않다면   ptr->internalCount 카운트를 1 감소시킴
			// 즉 _head==oldHead 가 같은 쓰레드 이외의 다른 스레드에서 실행 되는 거면  소유권 카운트를 1 감소 시킨다
			//fetch_sub 리턴은 감소하기 전 값을 리턴한다
			else if(ptr->internalCount.fetch_sub(1)==1)
			{
				//internalCount 이 0 이 됐다는 건 더이상 참조하고 있는것이 없기 때문에 삭제처리 한다
				//참조권은 얻었으나, 소유권은 실패 
				delete ptr;
			}

		}

	}

	//head 의 externalCount를 1 증가 시키려는 목적
	void IncreaseHeadCount(CountedNodePtr& oldCounter)
	{
		while (true)
		{
			CountedNodePtr newCounter = oldCounter;
			newCounter.externalCount++;

			//newCounter.externalCount++  1 증가 한것을 _head = newCounter.externalCount; 이렇게 바로 넣어 버리는 것은 위험한데, 
			//이 구간에서 병합이 일어날 수 있기 때문이다 if .. else...

			//IncreaseHeadCount 이 함수 안에 들어왔다는 것은 어쨋든 _head의 externalCount 를 1 증가 시키려는 것이 목적이니
			//현재 _head 와 이전 head즉 oldCounter 이 같다면(=다른 스레드가 영향을 주지 않아 변화가 없다면) 
			//1 증가시킨 것(newCounter)을 _head에 넣어 _head를 증가 시켜라 라는 것임
			//즉 중간에 스레드 개입이 있어 이미 _head 가 증가 되어 있다면 _head 가 증가 됐음을 oldCounter 에게 알리고 
			//oldCounter 를 현재 _head.externalCount 로 맞춘다 
			//그리고
			//compare_exchange_strong 가 실패하면 어쨌든 다른 스레드에서 externalCount 를 1 증가 시킨 상태임으로 이 카운트를 맞춰 주기 위해서
			//compare_exchange_strong 가 false 일때 oldCounter = _head ; 이 내부적으로 실행되어 현재 head 의  externalCount 를 맞추게 된다, 그리고 while 을 통해 다시 증가 시도
			if (_head.compare_exchange_strong(oldCounter, newCounter))	
			{
				//여기 까지 성공하면 oldHead는 참조권을 획득한 상태가 되는것
				//위 함수가 성공 하면 oldCounter 쪽의 externalCount 도 변해야 함으로 같이 변경해준다
				oldCounter.externalCount = newCounter.externalCount;
				break;
			}

		}
	}

private :
	atomic<CountedNodePtr> _head;

	atomic<uint32> _popCount = 0;	//pop 을 실행중인 스레드 개수
	atomic<Node*> _pendingList;		//삭제 되어야 할 노드들(첫번째 노드)
	
	
};

 

 

 

 

 

 

실행 main 함수 로직

LockFreeStack<int> s;

void push()
{
	int i = 0;
	while (true)
	{
		s.push(i++);
		this_thread::sleep_for(10ms);
	}
}

void pop()
{
	while (true)
	{
		auto popData = s.tryPop();
 		if (popData != nullptr)
 		{
 			cout << *popData << endl;
 		}
	}
}

 

 

 

 

경우의 수는 간단하게 만 보면

 

경우의 수 1.

최초 실행 될때그리고 바로 삭제 될때

 

external : 1 -> 2(+1)  => 최초 기본적인 상황에선 external 이 2가 됨
internal : 0 => 최초에는 internal 은 0 이됨

 

생성시 external 은 1이 되고 IncreaseHeadCount 를 통해 external  은 2가 된다


const int32 countIncrease = oldHead.externalCount - 2;

현재 스레드로 인해서 externalCount 가 2 만큼 증가했으니

그 수만큼 빼줘서 0 이면 다른곳에서도 참조하고 있는게 없는 것임

countIncrease  == 0이 된다

 

나밖에 없다면 countIncrease == 0 이 되는데

이후 if (ptr->internalCount.fetch_add(countIncrease) == -countIncrease) 

fetch_add 는 더하기 이전의 결과를 리턴한다 => 이때도 최초에는 internal  0 이였음으로 

 

이 if 문은 참이 되어 삭제 처리를 하게 된다

if (ptr->internalCount.fetch_add(countIncrease) == -countIncrease) 
{
    //삭제 예정이였던 노드 삭제 처리
delete ptr;
}

 

 

 

경우의 수 2.

 

만약 이곳(if문)에 들어오기 전에 참조권이 여러명에게 있었다고 가정해보자 
현재 스레드외에 두개의 스레드가 더 있어서 참조권을 다른 스레드들에 의해서 2가 증가 됐다고 가정
external : 1 -> 2(+1)  -> 4(나 +2, 다른 스레드 +2)

 

 

이 상황에선 countIncrease == 2가 되고 아직 internalCount fetch_add 이 실행 되기 이전엔 0 임으로 
0 == -2 같지 않아서 삭제처리를 못하고
그 다음에 바로 internalCount == 2 가 된다 , external == 4

 

이후 다른 스레드에서 봤을때

if (_head.compare_exchange_strong(oldHead, ptr->next)) 은 실패가 됨으로

=> 왜냐면 여기에 먼저 들어온 쓰레드가 _head를 다음 노드인 ptr->next 로 바꿨기 때문에


여기로 들어온 스레드가 internalCount 를 2 증가 시켰음으로 실패한 스레드에서 1씩 감소 시키게 된다

(동일한 ptr 주소까지 들고 있는 곳까지 모두 실행했었다는 가정하에 : 스택변수임으로 그당시에 들고 있는 주소를 들고 있음)


결국 다른 스레드들에 의해서 internalCount를 최종적으로 0으로 (정확히는 0이 되기직전의 1) 만드는 스레드에서 ptr 을제거처리한다

 

 

실행 결과는 아래와 같으며

 

 

push 스레드에서 this_thread::sleep_for(10ms); 로 추가 되는 시간을 지연시켜놔서 메모리 또한 얼추 일정 사태를 유지한다

 

만약 sleep_for 를 걸지 않으면 추가 되는것이 삭제 되는것보다 많기 때문에 (삭제시 병합 과정때문에 느려짐으로..)

아래 처럼 메모리가 치솟는 것을 볼 수 있다

 

 

 lock-free 가 이름만 들어선 좋아 보이지만 lock 만 쓰지 않다 뿐이지 병합이 일어나는 구간에선

여전히 cpu 를 갉아 먹으면서 대기해야 하는 부분은 여전하고 lock-free 특성상 해당 로직이 정상적으로 실행 되려면 이전에 실행했던것이 실행 되지 않고 처음부터 다시 실행하는 상황이 발생하는 경우가 대부분이라 함

 

위으 코드 중  tryPop 코드를보면 tryPop 함수내부가 전체 while(true ) 로 감싸져 있는 것을 볼 수 있다

즉 실행되지 못한다면 계속 대기 하면서 실행 가능 할때까지 처음부터다시 실행해야 한다는 것..

 

 

 

또한 사람의 장시간의 이해가 필요로 하는 부분이기 때문에 (중간 중간 변칙적인 부분들) 사람이 모든걸 예측해서 구현하기는 투여 시간대비 효율이 높다고 하긴 어려운 부분이 있다

 

성능이 앞도적으로 높은것도 아닌 얼추 엇비슷하다고 한다

 

그래서 lock-free 를 쓰는것이 긍정적으로 평가 되진않는다고 함 => 연구들은 계속 진행 중..

반응형
#include <iostream>
#include <mutex>
#include <queue>
#include <windows.h>

using namespace std;

mutex m;
queue<__int32> q;
HANDLE handle;

void producer()
{
	while (true)
	{
		{
			unique_lock<mutex> lock(m);
			q.push(100);
			
		}
		::SetEvent(handle);
		this_thread::sleep_for(1000ms);
	}
}

void consumer()
{
	while (true)
	{
		::WaitForSingleObject(handle, INFINITE);

		cout << "consu" << endl;
		/*
		unique_lock<mutex> lock(m);
		if (q.empty() == false)
		{
			__int32 data = q.front();
			q.pop();
			cout << q.size() << endl;
		}
		*/
	}
}

int main()
{
	handle = ::CreateEvent(NULL, FALSE, FALSE, NULL);

	thread t1(producer);
	thread t2(consumer);

	t1.join();
	t2.join();

	return 0;
}

 

위 코드는 1초마다  "consu" 가 뜨는 로직이다 

여기서 주목할 것은 ::SetEvent(handle); 를 통해서 ::WaitForSingleObject(handle, INFINITE); 이것 또한 깨어난다는 것이다

 

 

 

그리고 다음로직을 보면 q 에 추가된 개수가 1개 추가되고 1개 소비하는 것이 아닌  계속 증가하는 것을 볼수 있다

void producer()
{
	while (true)
	{
		{
			unique_lock<mutex> lock(m);
			q.push(100);
		}
		::SetEvent(handle);
		//this_thread::sleep_for(1000ms);
	}
}

void consumer()
{
	while (true)
	{
		::WaitForSingleObject(handle, INFINITE);
		unique_lock<mutex> lock(m);
		if (q.empty() == false)
		{
			__int32 data = q.front();
			q.pop();
			cout << q.size() << endl;
		}
		
	}
}

 

문제는 

::WaitForSingleObject(handle, INFINITE);

----- 절취 ------
unique_lock<mutex> lock(m);

 

----- 절취 ------ 사이에 producer 스레드가 돌아가서 lock 을 먼저 잡을수 있고 q 에 데이터를 먼저 추가 할수 있기 때문에 개수가 증가하는 문제가 발생하는 것 

 

 

 

 

bool expected = false;
bool desired = true;
flag.compare_exchange_weak(expected, desired);

compare_exchange_weak 동작은 compare_exchange_strong 과 같은데 spruious failure 가 발생 할 수 있다
즉 이 함수 내(compare_exchange_weak )에서 다른 스레드의 Interruption 을 받아서 값 변경이 중간에 실패 할 수 있다

비교 당시 flag. 와 expected 가  같았지만 내부 처리상황으로 인해(spurious failure) return false 가 리턴 될 수 있다

 

 

그래서 이때 wait 함수 안에 lock 을 소유하고 있는지 한번더 판단해주는 동작이 들어가 있어서 이걸 lock 이 다른 스레드에 의해서 소유됐는지 아닌지를 판단 할 수 가 있게 된다

compare_exchange_weak 이게 기본적인 형태인데 만약에 이렇게 실패가 일어나면

프로그래머가 작성해 놓은 while에 compare_exchange_strong 를 놓아서 계속 돌려

성공할때까지 시도할 수 있다 이때 쓰는 것이 compare_exchange_strong 이 된다 

 

compare_exchange_weak 도 사용할떄는 while 루프를 사용해서 처리하는건 마찬가지이다

 

그런데 compare_exchange_strong 그래서 이게 좀 더 부하가 있을 수 있다곤 하지만

성능 차이가 엄청 크게 차이 나진 않는다

 


 

Spurious wakeup(가짜 기상) 

//condition_variable 은 커널오브젝트인 event 와 유사한것 같지만
//유저모드 레벨 오브젝라는 점이 다르다 즉, 동일한 프로그램 내에서만 사용 가능하다
//표준 mutex 와 짝지어 사용 가능하다
condition_variable cv;

 

q가 비어 있으면 결과는 (true == false) => false  인데 wait(false) 로 들어오면 lock 풀고 대기 한다
q가 비어 있지 않으면 결과는 (false == false) => true wait(true) 로 들어오면 빠져나와서 다음 코드를 진행한다

wait 에 false 가 들어가면 lock풀고 대기
wait 에 true 가 들어가면 다음 코드 진행

q가 비어 있다면 => (true==false) => false , wait(false) =>결과적으로 lock 풀고 대기 한다
q가 차 있다면 => (false==false) => true , wait(true) => 결과적으로 다음 실행해서 데이터 꺼내서 출력

 

 

 

unique_lock<mutex> lock(m); //현재 스레드가 이구문을 먼저 들어오게 되어 lock을 하게 되면 다른 스레드는 풀릴때까지 대기
//이 라인에선 lock 이 잡힌상태

cv.wait(lock, []() { return q.empty() == false; });

wait 에 대한 케이스를 보면 다음과 같다
case 1 : 결과적으로 return 하는 값이 true 이면 현재 스레드 대기 즉 락을 풀고 현재 스레드는 멈춘다
case 2 : 결과적으로 return 이 false 를 리턴하면 위에서 잡혀 있던 lock 을 그대로 유지한체 아래 구문을 계속 수행한다

그런데 case 1 : 에서 결과적으로 return 하는 값이 true 일때 락을 풀고 현재 스레드는 대기 했었으니 
위에서 notice_one 을 하게 되면 case 1: 일때 로직이 아직 wait( 이있는 로직 라인이 실행순서임으로 wait 라인을 통과 할지 말지 즉 다시 lock 을 잡을지 아니면 다시 lock 을 풀고 현재 스레드를 대기 시킬지를 결정하게 된다

가짜기상의 가능성 : 위에서 cv.notify_one(); 를 했으면 무조건 조건식이 만족하는것이 아닌가? 라는 생각을 할수 있지만
cv.notify_one(); 가 실행되고 난 이후 cv.wait(lock, []() { return q.empty() == false; }); 이 구문 안에서 lock 을 소유하기 전에
다른 스레드에서 데이터를 q 에서 꺼낼 수가 있기 때문에 이럴땐 cv.notify_one(); 한다고 해서
항상 []() { return q.empty() == false; } 이 조건을 만족하게 되진 않는다는 것이 가짜기상이라 한다
그래서 notify_one 할때 즉 가짜 기상에 의해서 무조건 wait 구문에서 lock을 잡게 되는 것이 아니라는 것이다
=> 못잡게 될 가능성도 생각해야 한다

 

그래서 이 가짜 기상을 방지하기 위해서 wait 에다가 조건도(람다) 같이 넣어줘서

가짜 기상이일어나 다른 곳에서 q 의 데이터를 먼저 다 제거 한후라면 cunsumer 에서의 wait 은 실행되면 안됨으로 조건을 추가하여 다시 스레드가 대기하도록 하여 가짜 기상을 방지해 해놓은것

 

정리하자면 : 꺠어나긴 했는데 다른 스레드에 의해 데이터가 삭제되서 현재 스레드가 실행되지 않고(가짜기상) 조건문에 의해서 또 다시 처리를 하게 되는 것 또한  wait 조건문 이 자체가 가짜 기상에 대한 오류를 방지하는 연결상황이 된다

 

mutex m;
queue<__int32> q;
HANDLE handle;
condition_variable cv;

void producer()
{
	while (true)
	{
		{
			unique_lock<mutex> lock(m);
			q.push(100);
		}

		//wait 중인 스레드가 있다면 1개만 깨운다
		cv.notify_one();
	}
}

void consumer()
{
	while (true)
	{
		unique_lock<mutex> lock(m);		//현재 스레드가 이구문을 먼저 들어오게 되어 lock을 하게 되면 다른 스레드는 풀릴때까지 대기
		//이 라인에선 lock 이 잡힌상태

		cv.wait(lock, []() { return q.empty() == false; });		

		if (q.empty() == false)
		{
			__int32 data = q.front();
			q.pop();
			cout << q.size() << endl;
		}
	}
}

int main()
{
	handle = ::CreateEvent(NULL, FALSE, FALSE, NULL);

	thread t1(producer);
	thread t2(consumer);

	t1.join();
	t2.join();

	return 0;
}

 

 

 

 

https://3dmpengines.tistory.com/2187

 

condition_variable 예제 (Produce, Consumer)

condition_variable 은 전통적인 CreateEvent, SetEvet 를 대체하는 방식으로 볼 수 있다 condition_variable 은 멀티 플랫폼에서도 작동하는 표준으로 들어가 있다 condition_variable 은 커널오브젝트인 event..

3dmpengines.tistory.com

 

반응형

+ Recent posts