LockFree - Stack (2) - 삭제 처리
삭제의 핵심은 분리를 미리하고 분리한 대상을 현재 스레드의 스택 메모리에 삭제할 주소를 지역변수로 들고 있다가
다시 자신의 스레드로 돌아왔을때 분리된 삭제할 메모리를 그때서야 삭제 하는 것이다
삭제하는 방식(룰)
- 데이터 분리
- count 체크
- 나 혼자면 삭제
void tryDelete(Node* oldHead)
{
나 외에 누가 있는지 먼저 체크
_popCount 는 atomic 이다 1보다 크면 누군가 tryPop 에 들어와 추가로 pop 을 하려고 하는 상황
if (_popCount == 1)
{
oldHead 를 삭제 할거긴한데 대기 삭제 리스트도 이때 한번 삭제 시도를 해보자는것
_pendingList 은 null 이 되고 _pendingList 포인터를 node 에 넣음
이로써 이 쓰레드에서 데이터를 분리해둔 상태가 되고, 다른스레드에서 이 작업을 한다 해도 별도의 스택영역
지역변수인 node 에 가져오려고 하는것이고 , _pendingList는 이미 현 스레드에서 null 이 됐음으로
현재 스레드의 node만 _pendingList 를 들고 있는 안전한 상태가 된다
즉 이렇게 꺼내도면 다른 쓰레드에서 push 해서 노드를 추가한다고 해도 이미 기존 _pendingList 들을 현재
스레드의 node 에 포인터를 꺼내왔고 _pendingList를 null 로 만들었기 때문에 아무 상관이 없다
Node* node = _pendingList.exchange(nullptr); //한번에 교환 처리 됨 atomic 임으로
atomic 으로 감소가 atomic 하게 한줄로처리 됨, 감소 처리하고 리턴 되는것은
복사 된 스택 임시 값과 == 0 의 비교 임으로 비교 자체는 다른 스레드와 영향 없음
if (--_popCount == 0)
{
그래서 -1 을 감소시켜서 0이라면 중간에 끼어든 애가 없다는 것
끼어든 애가 없음 -> 삭제 진행
만약 if (--_popCount == 0) 이 구문이 지나고 if 문 안으로 들어와서 다른스레드가 _popCount를 증가 시켰어도
이미 미리 node 값으로 _pendingList 을 빼 왔고 지역 변수 node로 스레드 분리된 값으로 pendingList 를 들고
있어서 안전함 => 삭제 가능
그래서 분리 시켜놨기 때문에 이 구문안에서 삭제 처리를 진행 할 수 있다
DeleteNodes(node);
}
else if(node)
{
다른 스레드로 누군가 tryDelete 을 또 호출 했기 때문에 누가 끼어들었으니 다시 갖다 놓아야 함
ChainPendingNodeList(node); //node 를 _pendlingList 에 추가해 놓는 처리
}
혼자서 pop 을 할려고 하는것이긴 한데
이 안에 들어 왔을 때 바로 delete를 수행하는데 만약 if 문 이 구문 안에 현 스레드가 들어 왔을때
tryPop 함수를 다른 스레드에서 또 호출하게 된다면? 상황이 복잡해 질것 같지만
이미 tryDelete 를 하기 전에 oldhead 노드를 따로 삭제할 노드 포인터로 빼놓았기 때문에
즉 현재 삭제하려고 한 oldhead 의 로컬 변수와, 새롭게 진입한 tryPop 스레드의 oldHead 는 서로 다른 로컬 변수이 기 때문에 서로 상관 없다 => 즉 안전하다
그래서 여기서 delete 를 하는게 가능하다
delete oldHead;
...
}
else
{
.....
#pragma once
#include <mutex>
template<typename T>
class LockStack
{
public :
LockStack() = default;
LockStack(const LockStack&) = delete;
LockStack& operator=(const LockStack&) = delete;
void push(T value)
{
lock_guard<mutex> lock(_mutex);
_stack.push( std::move(value));
_conVar.notify_one();
}
//데이터를 꺼낼때 여기에서 따로 구현하게 되는 Empty 함수를 호출 체크하는게 의미가 없는데, Empty 함수안에
// lock_guard 가 있고 Empty 함수가 끝난 이후 Pop 함수로 되돌아 갈때
// 다른 스레드에서 stack 에 데이터를 추가하거나 제거 하게 된다면 현재 스레드에서 pop 을 할때
//스택이 비어 있는지 차 있는지 알수 없음으로 (그 순간 사이에서) 무의미하다
//그래서 pop 할때 empty 함수를 호출하지 않고 내부에 구현한다, 유저 편의상 Empty 함수를 제공해 줄 순 있겠지만..
bool tryPop(T& value)
{
lock_guard<mutex> lock(_mutex);
if (_stack.empty())
{
return false;
}
value = std::move(_stack.top());
_stack.pop();
return true;
}
//pop 할 데이터가 없을때 대기하다가 pop 할 데이터가 생기면 그때 pop하고 끝내는 함수
void waitPop(T& value)
{
unique_lock<mutex> lock(_mutex);
// stack 이 비어있지 않다면(false) == false => true
// stack 이 비어 있다면 (true) == false => false , 조건이 만족하지 않으면 lock 을 풀고 sleep 하게 된다
_conVar.wait(lock, [this] { return _stack.empty() == false; });
value = std::move(_stack.top());
_stack.pop();
}
private :
stack<T> _stack;
mutex _mutex;
condition_variable _conVar;
};
template<typename T>
class LockFreeStack
{
struct Node
{
Node(const T& value) : data(value), next(nullptr){}
T data;
Node* next;
};
public :
/* 1. 새 노드를 만든다
2. 새 노드의 next = nead
3. head = 새노드 */
//node 추가는 head 바로 다음에 새로운 노드가 추가 되는 방식이다
void push(const T& value)
{
Node* node = new Node(value);
node->next = _head;
while (_head.compare_exchange_weak(node->next, node) == false) //atomic 으로 한번에 처리됨
{
//node->next = _head; //없어도 push 로직상 맞아 떨어짐
}
}
/* 1. head 읽기
2. head->next 읽기
3. head = dead->next
4. data 추출해서 반환
5. 추출한 노드를 삭제 */
bool tryPop(T& value)
{
//삭제를 시작하고 있는 스레드 카운트 증가
++_popCount;
Node* oldHead = _head;
/*
if (_head == oldHead)
{
_head = oldHead->next;
return true;
}
else
{
oldHead = _head;
return false;
}
*/
//이 줄이 실행 완료 되면 노드를 스택에서빼낸 상태 oldHead
while (oldHead && _head.compare_exchange_weak(oldHead, oldHead->next) == false) //atomic 으로 한번에 처리가 됨
{
// oldHead = _head;
}
if (oldHead == nullptr)
{
//삭제 할게 없다면 해당 삭제하고 있는 스레드 감소
--_popCount;
return false;
}
value = oldHead->data;
tryDelete(oldHead);
return true;
}
//삭제하는 방식(룰)
//1. 데이터 분리
//2. count 체크
//3 나 혼자면 삭제
void tryDelete(Node* oldHead)
{
//나 외에 누가 있는지 먼저 체크
if (_popCount == 1) //_popCount 는 atomic 이다 1보다 크면 누군가 tryPop 에 들어와 추가로 pop 을 하려고 하는 상황
{
Node* node = _pendingList.exchange(nullptr); //한번에 교환 처리 됨 atomic 임으로
if (--_popCount == 0) //atomic 으로 감소가 atomic 하게 한줄로처리 됨, 감소 처리하고 리턴 되는것은 복사 된 스택 임시 값과 == 0 의 비교 임으로 비교 자체는 다른 스레드와 영향 없음
{
//그래서 -1 을 감소시켜서 0이라면 중간에 끼어든 애가 없다는 것
//끼어든 애가 없음 -> 삭제 진행
//만약 if (--_popCount == 0) 이 구문이 지나고 if 문 안으로 들어와서 다른스레드가 _popCount를 증가 시켰어도
//이미 미리 node 값으로 _pendingList 을 빼 왔고 지역 변수 node로 스레드 분리된 값으로 pendingList 를 들고 있어서 안전함 => 삭제 가능
DeleteNodes(node);
}
else if(node)
{
//누가 끼어들었으니 다시 갖다 놓아야 함
ChainPendingNodeList(node);
}
//혼자서 pop 을 할려고 하는것이긴 한데
//이 안에 들어 왔을 때 바로 delete를 수행하는데 만약 if 문 이 구문 안에 현 스레드가 들어 왔을때
//tryPop 함수를 다른 스레드에서 또 호출하게 된다면? 상황이 복잡해 질것 같지만
// 이미 tryDelete 를 하기 전에 oldhead 노드를 따로 삭제할 노드 포인터로 빼놓았기 때문에
//즉 현재 삭제하려고 한 oldhead 의 로컬 변수와, 새롭게 진입한 tryPop 스레드의 oldHead 는 서로 다른 로컬 변수이기 때문에 서로 상관 없다 => 즉 안전하다
//그래서 여기서 delete 를 하는게 가능하다
delete oldHead;
}
else
{
//누가 있는 상황이니 그럼 지금 삭제 하지 않고, 삭제 예약만 해놓는다 => _pendingList 에 추가해 놓는다
ChainPendingNode(oldHead);
--_popCount;
}
}
void ChainPendingNode(Node* node)
{
ChainPendingNodeList(node, node);
}
//first 노드의 첫번째
//last 노드의 마지막
void ChainPendingNodeList(Node* first, Node* last)
{
//삭제 하는 로직인데 Push 하는것과 동일하게 _pendingList에 삭제할 로직을 뒤에 계속 쌓게 된다
last->next = _pendingList;
while (_pendingList.compare_exchange_weak(last->next, first) == false)
{
}
}
//리스트로 된 스택을 기존 _pendingList에 추가로 넣고 추가된 첫번째 노드를 _pendingList 이 가리키게 한다
void ChainPendingNodeList(Node* node)
{
Node* last = node;
while (last->next)
{
last = last->next;
}
ChainPendingNodeList(node, last);
}
static void DeleteNodes(Node* node)
{
while (node)
{
Node* next = node->next;
delete node;
node = next;
}
}
private :
atomic<Node*> _head; //head 가 다음 노드를 가르키는 방식
atomic<uint32> _popCount = 0; //pop 을 실행중인 스레드 개수
atomic<Node*> _pendingList; //삭제 되어야 할 노드들(첫번째 노드)
};