반응형

 

삭제의 핵심은 분리를 미리하고 분리한 대상을 현재 스레드의 스택 메모리에 삭제할 주소를 지역변수로 들고 있다가

다시 자신의 스레드로 돌아왔을때 분리된 삭제할 메모리를 그때서야 삭제 하는 것이다

 

 

 

삭제하는 방식(룰)

  1. 데이터 분리
  2. count 체크
  3. 나 혼자면 삭제

 

void tryDelete(Node* oldHead)
{

 

   나 외에 누가 있는지 먼저 체크

   _popCount 는 atomic 이다 1보다 크면 누군가 tryPop 에 들어와 추가로 pop 을 하려고 하는 상황
   if (_popCount == 1) 
   {

   oldHead 를 삭제 할거긴한데 대기 삭제 리스트도 이때 한번 삭제 시도를 해보자는것
   _pendingList 은 null 이 되고 _pendingList 포인터를 node 에 넣음
   이로써 이 쓰레드에서 데이터를 분리해둔 상태가 되고, 다른스레드에서 이 작업을 한다 해도 별도의 스택영역
   지역변수인 node 에 가져오려고 하는것이고 , _pendingList는 이미 현 스레드에서 null 이 됐음으로

   현재 스레드의 node만  _pendingList 를 들고 있는 안전한 상태가 된다
   즉 이렇게 꺼내도면 다른 쓰레드에서 push 해서 노드를 추가한다고 해도 이미 기존 _pendingList 들을 현재
   스레드의 node 에 포인터를 꺼내왔고 _pendingList를 null 로 만들었기 때문에 아무 상관이 없다


   Node* node = _pendingList.exchange(nullptr); //한번에 교환 처리 됨 atomic 임으로

 

 

 

 

   atomic 으로  감소가  atomic 하게 한줄로처리 됨,  감소 처리하고 리턴 되는것은

   복사 된 스택 임시 값과 == 0 의 비교 임으로 비교 자체는 다른 스레드와 영향 없음
   if (--_popCount == 0) 

   {

 

     그래서 -1 을 감소시켜서 0이라면 중간에 끼어든 애가 없다는 것
     끼어든 애가 없음 -> 삭제 진행
     만약 if (--_popCount == 0) 이 구문이 지나고 if 문 안으로 들어와서 다른스레드가 _popCount를 증가 시켰어도
      이미 미리 node 값으로 _pendingList 을 빼 왔고 지역 변수 node로 스레드 분리된 값으로 pendingList 를 들고

     있어서 안전함 => 삭제 가능

 

      그래서 분리 시켜놨기 때문에 이 구문안에서 삭제 처리를 진행 할 수 있다
     DeleteNodes(node);
}
else if(node)
{

 

   다른 스레드로 누군가 tryDelete 을 또 호출 했기 때문에 누가 끼어들었으니 다시 갖다 놓아야 함
   ChainPendingNodeList(node); //node 를 _pendlingList 에 추가해 놓는 처리
}


   혼자서 pop 을 할려고 하는것이긴 한데
   이 안에 들어 왔을 때 바로 delete를 수행하는데 만약 if 문 이 구문 안에 현 스레드가 들어 왔을때
   tryPop 함수를 다른 스레드에서 또 호출하게 된다면? 상황이 복잡해 질것 같지만
   이미 tryDelete 를 하기 전에 oldhead 노드를 따로 삭제할 노드 포인터로 빼놓았기 때문에 
   즉 현재 삭제하려고 한 oldhead 의 로컬 변수와, 새롭게 진입한 tryPop 스레드의 oldHead 는 서로 다른 로컬 변수이   기 때문에 서로 상관 없다 => 즉 안전하다
   그래서 여기서 delete 를 하는게 가능하다
   delete oldHead;

...

}

else

{

.....

 

 

 

#pragma once
#include <mutex>


template<typename T>
class LockStack
{
public :

	LockStack() = default;

	LockStack(const LockStack&) = delete;
	LockStack& operator=(const LockStack&) = delete;

	void push(T value) 
	{
		lock_guard<mutex> lock(_mutex);
		_stack.push( std::move(value));
		_conVar.notify_one();
	}

	//데이터를 꺼낼때 여기에서 따로 구현하게 되는 Empty 함수를 호출 체크하는게 의미가 없는데,  Empty 함수안에
	// lock_guard 가 있고 Empty 함수가 끝난 이후 Pop 함수로 되돌아 갈때 
	// 다른 스레드에서 stack 에 데이터를 추가하거나 제거 하게 된다면 현재 스레드에서 pop 을 할때 
	//스택이 비어 있는지 차 있는지 알수 없음으로 (그 순간 사이에서) 무의미하다
	//그래서 pop 할때 empty 함수를 호출하지 않고 내부에 구현한다, 유저 편의상 Empty 함수를 제공해 줄 순 있겠지만.. 

	bool tryPop(T& value)
	{
		lock_guard<mutex> lock(_mutex);
		if (_stack.empty())
		{
			return false;
		}
		 value =  std::move(_stack.top());
		 _stack.pop();
		 return true;
	}


	//pop 할 데이터가 없을때 대기하다가 pop 할 데이터가 생기면 그때 pop하고 끝내는 함수
	void waitPop(T& value)
	{
		unique_lock<mutex> lock(_mutex);

		// stack 이 비어있지 않다면(false) == false => true
		// stack 이 비어 있다면 (true) == false => false  , 조건이 만족하지 않으면 lock 을 풀고 sleep 하게 된다
		_conVar.wait(lock, [this] { return _stack.empty() == false; }); 
		value = std::move(_stack.top());
		_stack.pop();

	}

private :
	stack<T> _stack;
	mutex _mutex;
	condition_variable _conVar;
};



template<typename T>
class LockFreeStack
{
	struct Node 
	{
		Node(const T& value) : data(value), next(nullptr){}
		T data;
		Node* next;
	};

public :

/*	1. 새 노드를 만든다
	2. 새 노드의 next = nead
	3. head = 새노드  */
	//node 추가는 head 바로 다음에 새로운 노드가 추가 되는 방식이다
	void push(const T& value)
	{
		Node* node = new Node(value);
		node->next = _head;		
		
		while (_head.compare_exchange_weak(node->next, node) == false)			//atomic 으로 한번에 처리됨
		{
			//node->next = _head; //없어도 push 로직상 맞아 떨어짐
		}
	}

/*	1. head 읽기
	2. head->next 읽기
	3. head = dead->next
	4. data 추출해서 반환
	5. 추출한 노드를 삭제	*/
	bool tryPop(T& value)
	{
		//삭제를 시작하고 있는 스레드 카운트 증가
		++_popCount;

		Node* oldHead = _head;
		/*
		if (_head == oldHead)
		{
			_head = oldHead->next;
			return true;
		}
		else
		{
			oldHead = _head;
			return false;
		}
		*/

		//이 줄이 실행 완료 되면 노드를 스택에서빼낸 상태 oldHead
		while (oldHead && _head.compare_exchange_weak(oldHead, oldHead->next) == false)			//atomic 으로 한번에 처리가 됨
		{
			// oldHead = _head;
		}

		if (oldHead == nullptr)
		{
			//삭제 할게 없다면 해당 삭제하고 있는 스레드 감소
			--_popCount;
			return false;
		}

		value = oldHead->data;
		tryDelete(oldHead);
		return true;
	}

	//삭제하는 방식(룰)
	//1. 데이터 분리
	//2. count 체크
	//3 나 혼자면 삭제
	void tryDelete(Node* oldHead)
	{
		//나 외에 누가 있는지 먼저 체크
		if (_popCount == 1)		//_popCount 는 atomic 이다 1보다 크면 누군가 tryPop 에 들어와 추가로 pop 을 하려고 하는 상황
		{

			
			Node* node = _pendingList.exchange(nullptr);		//한번에 교환 처리 됨 atomic 임으로

			if (--_popCount == 0) //atomic 으로  감소가  atomic 하게 한줄로처리 됨,  감소 처리하고 리턴 되는것은 복사 된 스택 임시 값과 == 0 의 비교 임으로 비교 자체는 다른 스레드와 영향 없음
			{
				//그래서 -1 을 감소시켜서 0이라면 중간에 끼어든 애가 없다는 것
				//끼어든 애가 없음 -> 삭제 진행
				//만약 if (--_popCount == 0) 이 구문이 지나고 if 문 안으로 들어와서 다른스레드가 _popCount를 증가 시켰어도
				//이미 미리 node 값으로 _pendingList 을 빼 왔고 지역 변수 node로 스레드 분리된 값으로 pendingList 를 들고 있어서 안전함 => 삭제 가능

				DeleteNodes(node);
			}
			else if(node)
			{
				//누가 끼어들었으니 다시 갖다 놓아야 함
				ChainPendingNodeList(node);
			}



			//혼자서 pop 을 할려고 하는것이긴 한데
			//이 안에 들어 왔을 때 바로 delete를 수행하는데 만약 if 문 이 구문 안에 현 스레드가 들어 왔을때
			//tryPop 함수를 다른 스레드에서 또 호출하게 된다면? 상황이 복잡해 질것 같지만
			// 이미 tryDelete 를 하기 전에 oldhead 노드를 따로 삭제할 노드 포인터로 빼놓았기 때문에 
			//즉 현재 삭제하려고 한 oldhead 의 로컬 변수와, 새롭게 진입한 tryPop 스레드의 oldHead 는 서로 다른 로컬 변수이기 때문에 서로 상관 없다 => 즉 안전하다
			//그래서 여기서 delete 를 하는게 가능하다
			delete oldHead;

		}
		else
		{
			//누가 있는 상황이니 그럼 지금 삭제 하지 않고, 삭제 예약만 해놓는다 => _pendingList 에 추가해 놓는다
			ChainPendingNode(oldHead);
			--_popCount;
		}
	}
	
	void ChainPendingNode(Node* node)
	{
		ChainPendingNodeList(node, node);
	}

	//first 노드의 첫번째
	//last 노드의 마지막
	void ChainPendingNodeList(Node* first, Node* last)
	{

		//삭제 하는 로직인데 Push 하는것과 동일하게 _pendingList에 삭제할 로직을 뒤에 계속 쌓게 된다
		last->next = _pendingList;
		while (_pendingList.compare_exchange_weak(last->next, first) == false)
		{

		}
	}

	//리스트로 된 스택을 기존 _pendingList에 추가로 넣고 추가된 첫번째 노드를 _pendingList 이 가리키게 한다
	void ChainPendingNodeList(Node* node)
	{
		Node* last = node;
		while (last->next)
		{
			last = last->next;
		}
		ChainPendingNodeList(node, last);
	}




	static void DeleteNodes(Node* node)
	{
		while (node)
		{
			Node* next = node->next;
			delete node;
			node = next;
		}
	}

private :
	atomic<Node*> _head;		//head 가 다음 노드를 가르키는 방식

	atomic<uint32> _popCount = 0;	//pop 을 실행중인 스레드 개수
	atomic<Node*> _pendingList;		//삭제 되어야 할 노드들(첫번째 노드)
	
};

 

반응형

'운영체제 & 병렬처리 > Multithread' 카테고리의 다른 글

shared_ptr 로 구현한 lock-free 은 lock-free 가 아니다 (3)  (0) 2022.09.22
Shared_ptr 와 atomic  (0) 2022.09.22
LockFree - Stack (1)  (0) 2022.09.15
LockBase Queue  (0) 2022.09.14
Lock base Stack  (0) 2022.09.14

+ Recent posts