반응형

lock free 에 대한 이해가 부족하다면

지난 lockFree 1,2 를 참고해야 이 글을 볼 수 있습니다

 

https://3dmpengines.tistory.com/2210?category=511467 

 

LockFree - Stack (1)

경우의 수는 이것보다 훨씬 많을 수 있지만 몇개만 추려보았고 크래쉬기 나지 않고 Lock free를 시도 하려는 중간 과정이라 보면 되겠다 스택이기 때문에 끝에서 추가되고 삭제 된다는것과 다른

3dmpengines.tistory.com

 

https://3dmpengines.tistory.com/2212?category=511467 

 

LockFree - Stack (2) - 삭제 처리

삭제의 핵심은 분리를 미리하고 분리한 대상을 현재 스레드의 스택 메모리에 삭제할 주소를 지역변수로 들고 있다가 다시 자신의 스레드로 돌아왔을때 분리된 삭제할 메모리를 그때서야 삭제

3dmpengines.tistory.com

 

 

 

void push(const T& value)

 

push 로직에선 기존과 크게 다를건 없고 

전반적으로 shared_ptr 로 Node 를 관리하는것이 아니고 Data 부분만 shaed_ptr 로 구현하고 

Node 자체는 CountedNodePtr 구조체로 래핑 시켜서 노드를 참조(비교하자면 shared_ptr 의 ref count 부분)하고 있는 참조카운팅 externalCount 변수로 참조 횟수를 관리한다

그래서 생성시(push) externalCount  = 1 로 만든다

 

 

 

이후 tryPop 이 실행 되면서 해당 노드에 접근할때 IncreaseHeadCount 이 함수로

접근 가능한 상태에 대한 참조권 획득을(+1, externalCount 에 +1) 을 하게 된다

IncreaseHeadCount 는 while(true) 으로 되어 있기 때문에

다른스레드가 개입 되더라도 현재 스레드가 참조 권을 획득 할때까지 무한히 루프를 돌면서 대기하게 된다

=> 즉 참조권을 얻어야 진행 가능하다는 얘기

 

 

 

if (_head.compare_exchange_strong(oldHead, ptr->next))

를 통해 참조권을 획득한 (externalCount ==2 인 oldHead)와 _head 가 같은 스레드며 

현재 oldHead 가 참조권을 획득한 것이고 if 문 안으로 들어간다

 

여기서 리턴할 데이터를 미리 빼놓고 

 

 

그다음 삭제가 남았는데
if (_head.compare_exchange_strong(oldHead, ptr->next))  

 

삭제시 고려해봐야 하는 점은 이 구문을 들어오는게 하나의 스레드가 아닌

여러개의 스레드가 들어 올 수도 있다


그래서 삭제처리 또한 다른 스레드가 삭제를 시도하려는지를 살펴보고 삭제를 해야 해서

internalCount를 통해 최종 마지막에 접근하게 되어 internalCount 이 0 이 되는 스레드에서 

해당 노드를 삭제 처리를 해줘야 한다 => shared_ptr 처럼 (이렇게 하려는 것이 의도 임으로)

 

이에 대한 구현은 아래 와 같다

즉 externalCount  를 통해 여러 스레드가 해당 노드에 얼마나 접근 하려고 하는지를 체크하는 것이고

internalCount 를 통해 실제 노드를 삭제해도 되는지 를 보려는 것이다 shared_ptr 의 ref count 처럼 동작하게 하기 위해서

 

 

 

	struct CountedNodePtr
	{
		int32 externalCount = 0;				//Node 포인터 참조 횟수
		Node* ptr = nullptr;					//Node 포인터
	};

	struct Node 
	{
		Node(const T& value) : data(make_shared<T>(value)){}
		shared_ptr<T> data;
		atomic<int32> internalCount = 0;
		CountedNodePtr next;
	};

 

 

#pragma once
#include <mutex>



template<typename T>
class LockFreeStack
{
	struct Node;
	//이렇게 그냥 쓰면 lock free 라 안될 수도 있지만 externalCount 를 16 bit, ptr 을 48 bit 등으로 쪼개어 64 비트 맞출 수도 있다 => 이렇게 되면 atomic 조건이 됨
	struct CountedNodePtr
	{
		int32 externalCount = 0;				//Node 포인터 참조 횟수
		Node* ptr = nullptr;					//Node 포인터
	};

	struct Node 
	{
		Node(const T& value) : data(make_shared<T>(value)){}
		shared_ptr<T> data;
		atomic<int32> internalCount = 0;
		CountedNodePtr next;
	};

public :

	//node 추가는 head 바로 다음에 새로운 노드가 추가 되는 방식이다
	void push(const T& value)
	{
		CountedNodePtr  node;
		node.ptr = new Node(value);
		node.externalCount = 1;

		node.ptr->next = _head;
		while (_head.compare_exchange_weak(node.ptr->next, node) == false)
		{

		}
	}

	shared_ptr<T> tryPop()
	{
		
		CountedNodePtr oldHead = _head;

		//참조권을 획득해야 함
		while (true)
		{
			//oldHead 의 externalCount 가 1 증가 된다
			//이게 실행 되고 나면 최소 externalCount >= 2 상태가 됨으로 삭제는 바로 할 수 없는 즉 안전하게 접근 할수 있는 상태가 된것이다
			IncreaseHeadCount(oldHead);		


			//여기에서 참조권을 얻오온것은 맞는데 다른 스레드도 동시에 실행 됐다면 
			//다른 스레드도 externalCount 를 증가 시킨체 참조권을 마찬가지로 획득한 상태가 될 수 있다
			Node* ptr = oldHead.ptr;	

			//데이터 없음
			if (ptr == nullptr)
			{
				return shared_ptr<T>();
			}


			//그래서 externalCount 가 또 다른 스레드에 의해서 증가 된상태일 수 있음으로 소유권까지 획득해야 정확해진다
			//소유권 획득(ptr->next 로 head를 바꿔치기 한 애가 이김)

			//만약 이게(compare_exchange_strong) 참이라면 소유권을 획득했다는 것이고 아까 꺼내온 oldHead과 _head의 externalCount 까지 일치한다는 얘기가 되는데
			//만약 다른 스레드가 나중에 externalCount를 증가 시켰다면 현 스레드(지금 비교 하려는)의 _head와 oldHead 의 externalCount 
			// 는 같지 않기 떄문에 소유권을 획득하지 못한다는 것
			if (_head.compare_exchange_strong(oldHead, ptr->next))
			{
				//ptr를 삭제할 것임으로 ptr 의 다음노드가 head 가 되게 처리한다음

				//여기에 들어오면 소유권을 획득한 것임으로 
				shared_ptr<T> res;
				res.swap(ptr->data);
                
                //경우의 수 .1
				//external : 1 -> 2(+1)  => 최초 기본적인 상황에선 external 이 2가 됨
				//internal : 0				=> 최초에는 internal 은 0 이됨
				//그렇다면 삭제가 된다

		//경우의 수 .2
				//만약 이곳(if문)에 들어오기 전에 참조권이 여러명에게 있었다고 가정해보자 
				//현재 스레드외에 두개의 스레드가 더 있어서 참조권을 다른 스레드들에 의해서 2가 증가 됐다고 가정
				//external : 1 -> 2(+1)  -> 4(나 +2, 다른 스레드 +2)

				//이 상황에선 countIncrease == 2가 되고 아직 internalCount fetch_add 이 실행 되기 이전엔 0 임으로 
				//0 == -2 같지 않아서 삭제처리를 못하고
				//그 다음에 바로 internalCount == 2 가 된다 , external == 4
				//이후 다른 스레드에서 봤을때 if (_head.compare_exchange_strong(oldHead, ptr->next)) 은 실패가 됨으로 => 왜냐면 여기에 먼저 들어온 쓰레드가 _head를 다음 노드인 ptr->next 로 바꿨기 때문에
				//여기로 들어온 스레드가 internalCount 를 2 증가 시켰음으로 실패한 스레드에서 1씩 감소 시키면서 (동일한 ptr 주소까지 들고 있는 곳까지 모두 실행했었다는 가정하에 : 스택변수임으로 그당시에 들고 있는 주소를 들고 있음)
				//internalCount를 최종적으로 0으로 (정확히는 0이 되기직전의 1) 만드는 스레드에서 ptr 을제거처리한다



                //아래는 어떤 경우의 수든 통용되는 설명
				//지금 삭제 하려고 하는것 말고 다른애들이 삭제하려고 하는 shared_ptr 을 가리키고 있을수 있기 때문에 삭제를 시도하는 처리가 있어야 한다
				//참조하는 것이 나 외에 다른 애들이 있는가?
				const int32 countIncrease = oldHead.externalCount - 2;		//현재 스레드로 인해서 externalCount 가 2 만큼 증가했으니 그 수만큼 빼줘서 0 이면 다른곳에서도 참조하고 있는게 없는 것임

				//나밖에 없다면 countIncrease == 0 이 된다
				//그런데 fetch_add 는 더하기 이전의 결과를 리턴한다 => 이때도 최초에는 0 인건 마찬가지
				//countIncrease 를 internalCount 에 더하는 이유는 현재 스레드 말고 다른스레드에서 자원을 참조하고 있다면 해당 레퍼런스 만큼 더해주기 위해서 
                //즉 현재 스레드에서 처리를 지연시킴 (shared_ptr 의 ref count 처럼)
				if (ptr->internalCount.fetch_add(countIncrease) == -countIncrease) 
				{
					//삭제 예정이였던 노드 삭제 처리
					delete ptr;
				}

				return res;
			}
			//_head과 oldHead 이 같지 않다면   ptr->internalCount 카운트를 1 감소시킴
			// 즉 _head==oldHead 가 같은 쓰레드 이외의 다른 스레드에서 실행 되는 거면  소유권 카운트를 1 감소 시킨다
			//fetch_sub 리턴은 감소하기 전 값을 리턴한다
			else if(ptr->internalCount.fetch_sub(1)==1)
			{
				//internalCount 이 0 이 됐다는 건 더이상 참조하고 있는것이 없기 때문에 삭제처리 한다
				//참조권은 얻었으나, 소유권은 실패 
				delete ptr;
			}

		}

	}

	//head 의 externalCount를 1 증가 시키려는 목적
	void IncreaseHeadCount(CountedNodePtr& oldCounter)
	{
		while (true)
		{
			CountedNodePtr newCounter = oldCounter;
			newCounter.externalCount++;

			//newCounter.externalCount++  1 증가 한것을 _head = newCounter.externalCount; 이렇게 바로 넣어 버리는 것은 위험한데, 
			//이 구간에서 병합이 일어날 수 있기 때문이다 if .. else...

			//IncreaseHeadCount 이 함수 안에 들어왔다는 것은 어쨋든 _head의 externalCount 를 1 증가 시키려는 것이 목적이니
			//현재 _head 와 이전 head즉 oldCounter 이 같다면(=다른 스레드가 영향을 주지 않아 변화가 없다면) 
			//1 증가시킨 것(newCounter)을 _head에 넣어 _head를 증가 시켜라 라는 것임
			//즉 중간에 스레드 개입이 있어 이미 _head 가 증가 되어 있다면 _head 가 증가 됐음을 oldCounter 에게 알리고 
			//oldCounter 를 현재 _head.externalCount 로 맞춘다 
			//그리고
			//compare_exchange_strong 가 실패하면 어쨌든 다른 스레드에서 externalCount 를 1 증가 시킨 상태임으로 이 카운트를 맞춰 주기 위해서
			//compare_exchange_strong 가 false 일때 oldCounter = _head ; 이 내부적으로 실행되어 현재 head 의  externalCount 를 맞추게 된다, 그리고 while 을 통해 다시 증가 시도
			if (_head.compare_exchange_strong(oldCounter, newCounter))	
			{
				//여기 까지 성공하면 oldHead는 참조권을 획득한 상태가 되는것
				//위 함수가 성공 하면 oldCounter 쪽의 externalCount 도 변해야 함으로 같이 변경해준다
				oldCounter.externalCount = newCounter.externalCount;
				break;
			}

		}
	}

private :
	atomic<CountedNodePtr> _head;

	atomic<uint32> _popCount = 0;	//pop 을 실행중인 스레드 개수
	atomic<Node*> _pendingList;		//삭제 되어야 할 노드들(첫번째 노드)
	
	
};

 

 

 

 

 

 

실행 main 함수 로직

LockFreeStack<int> s;

void push()
{
	int i = 0;
	while (true)
	{
		s.push(i++);
		this_thread::sleep_for(10ms);
	}
}

void pop()
{
	while (true)
	{
		auto popData = s.tryPop();
 		if (popData != nullptr)
 		{
 			cout << *popData << endl;
 		}
	}
}

 

 

 

 

경우의 수는 간단하게 만 보면

 

경우의 수 1.

최초 실행 될때그리고 바로 삭제 될때

 

external : 1 -> 2(+1)  => 최초 기본적인 상황에선 external 이 2가 됨
internal : 0 => 최초에는 internal 은 0 이됨

 

생성시 external 은 1이 되고 IncreaseHeadCount 를 통해 external  은 2가 된다


const int32 countIncrease = oldHead.externalCount - 2;

현재 스레드로 인해서 externalCount 가 2 만큼 증가했으니

그 수만큼 빼줘서 0 이면 다른곳에서도 참조하고 있는게 없는 것임

countIncrease  == 0이 된다

 

나밖에 없다면 countIncrease == 0 이 되는데

이후 if (ptr->internalCount.fetch_add(countIncrease) == -countIncrease) 

fetch_add 는 더하기 이전의 결과를 리턴한다 => 이때도 최초에는 internal  0 이였음으로 

 

이 if 문은 참이 되어 삭제 처리를 하게 된다

if (ptr->internalCount.fetch_add(countIncrease) == -countIncrease) 
{
    //삭제 예정이였던 노드 삭제 처리
delete ptr;
}

 

 

 

경우의 수 2.

 

만약 이곳(if문)에 들어오기 전에 참조권이 여러명에게 있었다고 가정해보자 
현재 스레드외에 두개의 스레드가 더 있어서 참조권을 다른 스레드들에 의해서 2가 증가 됐다고 가정
external : 1 -> 2(+1)  -> 4(나 +2, 다른 스레드 +2)

 

 

이 상황에선 countIncrease == 2가 되고 아직 internalCount fetch_add 이 실행 되기 이전엔 0 임으로 
0 == -2 같지 않아서 삭제처리를 못하고
그 다음에 바로 internalCount == 2 가 된다 , external == 4

 

이후 다른 스레드에서 봤을때

if (_head.compare_exchange_strong(oldHead, ptr->next)) 은 실패가 됨으로

=> 왜냐면 여기에 먼저 들어온 쓰레드가 _head를 다음 노드인 ptr->next 로 바꿨기 때문에


여기로 들어온 스레드가 internalCount 를 2 증가 시켰음으로 실패한 스레드에서 1씩 감소 시키게 된다

(동일한 ptr 주소까지 들고 있는 곳까지 모두 실행했었다는 가정하에 : 스택변수임으로 그당시에 들고 있는 주소를 들고 있음)


결국 다른 스레드들에 의해서 internalCount를 최종적으로 0으로 (정확히는 0이 되기직전의 1) 만드는 스레드에서 ptr 을제거처리한다

 

 

실행 결과는 아래와 같으며

 

 

push 스레드에서 this_thread::sleep_for(10ms); 로 추가 되는 시간을 지연시켜놔서 메모리 또한 얼추 일정 사태를 유지한다

 

만약 sleep_for 를 걸지 않으면 추가 되는것이 삭제 되는것보다 많기 때문에 (삭제시 병합 과정때문에 느려짐으로..)

아래 처럼 메모리가 치솟는 것을 볼 수 있다

 

 

 lock-free 가 이름만 들어선 좋아 보이지만 lock 만 쓰지 않다 뿐이지 병합이 일어나는 구간에선

여전히 cpu 를 갉아 먹으면서 대기해야 하는 부분은 여전하고 lock-free 특성상 해당 로직이 정상적으로 실행 되려면 이전에 실행했던것이 실행 되지 않고 처음부터 다시 실행하는 상황이 발생하는 경우가 대부분이라 함

 

위으 코드 중  tryPop 코드를보면 tryPop 함수내부가 전체 while(true ) 로 감싸져 있는 것을 볼 수 있다

즉 실행되지 못한다면 계속 대기 하면서 실행 가능 할때까지 처음부터다시 실행해야 한다는 것..

 

 

 

또한 사람의 장시간의 이해가 필요로 하는 부분이기 때문에 (중간 중간 변칙적인 부분들) 사람이 모든걸 예측해서 구현하기는 투여 시간대비 효율이 높다고 하긴 어려운 부분이 있다

 

성능이 앞도적으로 높은것도 아닌 얼추 엇비슷하다고 한다

 

그래서 lock-free 를 쓰는것이 긍정적으로 평가 되진않는다고 함 => 연구들은 계속 진행 중..

반응형

+ Recent posts