DFS Deadlock Detection 

 

데드락을 DFS 를 통해서 미리 판별할수 있다는 컨셉으로 접근하는 방식

 

 

 

 

 

인접행렬 3,4 : 3에서 4로 가는 경우가 있으니 3번째줄에서 오른쪽으로 4번이 1인걸 알수 있다

 

 

 

그래프에서 노드마다 각각 lock 을 갖고 있다고 가정한다

 

이때 노드 방문시 역방향이 일어나면 데드락이 발생 하는데

예를 들어 0번 노드에서 1번으로 간다음 아직 그래프 탐색이 끝나지 않은 상태에서 1번에서

다시 0번으로 가는 경우를 말한다

 

그런데 만약 노란색 선이 있었다 가정하면

0과 3이 역방향으로 성립되어 사이클이 형성 되지만

3->0->1->3 번 순으로 사이클이 도는것을 알 수 있고 이럴때도 데드락이 발생한다고 할수 있다

무한히 돌아가는 상황이 발생

그래서 데드락을 DFS 를 통해서 런타임시 판단하는 방식인데 생각보다 빠르게 판별할수 있다

 

 

먼저 그래프는 3가지의 간성이 있을수 있는데

 

1. 순방향 간선 : 최초 0번을 방문 한다음 1번노드나 3번 노드를 방문 할떄와 같은 상황을 말한다

2. 교차 간선 : 3번 노드부터 방문을 시작할때 다음처럼 방문이 끝나게 된다

이때 5번 노드부터 탐색을 시작 할대 이미 3과 4가 하나의 그룹으로 탐색이 완료된 상태에서 5번이 완료된 곳으로 진입하게 될때 이때의 5에서 4로 가는 간석을 교차간선이라 한다

 

3. 역방향 간선 : 노드탐색이 완료된 위 노란색박스 상태가 아닌 상태에서 즉 아직 탐색이 끝나지 않은 상태에서 역으로 가는 상태가 나타날때를 역방향 간선이라고 한다 예를 들어 0번 노드부터 시작을 하면 0번 노드에 방문을 표시를 한다음
1과 3 노드에 방문하게 되는데 1번 노드에서 부터 다시 재귀적으로 DFS 를 돌게 될것이고 1번 노드에서 0으로 (빨간색 화살표)로 방문을 하게되어 아직 완료 되지 않은 상태에서 역방향 사이클이 형성 되는것을 알 수 있다

 

사이클이 일어나면 데드락상황이 된다

 

각 노드들에는 Lock 이 있다 가정하고 이것을 감지하는 방식은 Lock 을 write 할때와 풀어줄때는 생성자-파괴자에 lock 감지와 풀어주는 코드를 넣어주고 이것을 활용하여 순환이 되는지를 체크해보면 된다

 

 

 

 

int main()
{
	GThreadManager->Launch([=]
		{
			while (true)
			{
				cout << "PlayerThenAccount" << endl;
				GPlayerManager.PlayerThenAccount();
				this_thread::sleep_for(100ms);
			}
		});

	GThreadManager->Launch([=]
		{
			while (true)
			{
				cout << "AccountThenPlayer" << endl;
				GAccountManager.AccountThenPlayer();
				this_thread::sleep_for(100ms);
			}
		});

	GThreadManager->Join();
}

 

아래와 같이 Daed Lock 상황이 발생한다 가정해보자

각각 스레드가 다음 처럼 돌아간다 

 


B 에서 A-2 로 진입시 A 의 lock 이 풀릴때까지 A-2 진입 대기
바로 지금 상황에서 A 의 lock이 풀리는지 A 에서 흐름을 따라가보면

A 에서 B-2 로 진입시 B 의 lock 이 풀릴때까지 B-2 진입 대기

하게 된다 B가 이전에 소유되었음으로 이래서 서로 풀지 못하는 Dead Lock 상황이 된다

 

=> 추적을 위해 dead lock 상황을 만든 상태이다

그리고 write 또는 read 시작 하는 순간 lock을 하는 순간 writeLockGuard 에서 typeid 로 현재 클래스의 이름을 넘겨주는데 이것이 dead lock 이 발생하는 곳을 좀더 알기 쉽게 발견하기 위한 이름이 된다

 

 

lock 을 잡는 순서를 따라 가다보면 아래와 같은 그림을 그릴 수 있다

이렇게 A 의 lock 과 B 의 lock 간의 싸이클이 형성 된다는 것을 알 수 있다

 

 

 

DFS Deadlock Detection 특징

  1. 이런 사이클이 일어나는 현상을 감지하여 Dead lock 일 발생하고 있음을 DFS 로 추적하여 알려주면 된다
  2. 거의 실행 직후 바로 감지 할 수 있다는 특징이 있다
  3. 보통 Debug 모드에서만 감지처리를 하고 shipping 모드에선 제거 하기 때문에 검출을 위한 성능 저하는 없다

 

 

 

코드는 DFS 방문 특징을 먼저 이해하는 것이 수월합니다

 

가장먼저 DFS 를 실행 하게 되는 발생 원인은

 

_history : prevId 이 전노드 기준 현재노드까지 간 내역을 history 에 담고 있게 되는 것인데 만약

만약 새로운 간선이 추가 된다면 

역순환이 있는지(DeadLock) 확인하기 위해 DFS 실행 하는 것입니다

 

_history 에는 이전 prevId 기준에서 현재  노드(lockNode_Id )가 방문 가능한 노드 중에 방문하지 못했던

노드라면 _history 에 lockNode_Id 추가하고 이렇게 되면 새로운 간선이 추가 된거이기 떄문에

DFS 실행 하여 역순환 확인을 하는 것입니다

 

CheckCycle_startDFS : DFS 시작 함수

 

 

 

알고리즘 특징

( _lockHistory(인접행렬) , _nameToId, _idToName) 는 날리지 않고 계속 보유한다

 

CheckCycle_startDFS  순환 체크 검사를 시작하면 초기화 되는 변수들은

CheckCycle_startDFS 함수 실행시 초기화 됩니다

_discoveredCount = 0;

_vecDiscoveredOrder { -1, -1 }  : 방문할때 here 인덱스에는 순서 값이 들어감

_vecParent {-1, -1}

_vecFinished {0, 0}

 

 

 

아래 그림은 기억해야 될 변수 3개와 사이클 체크시 초기화 되는 변수들을 나열하여 변화를 체크한 것입니다

 

변화 과정

 

변화 과정을 추적하다보면 문제가 발생하는 상황을 알수 있습니다

 

 

 

전체적인 흐름은

 

  1. 기존에 발견되지 않은 간선히스토리가 생기게 된다면(_lockHistory)
    DFS 를 처음노드부터 끝 노드까지 돌면서 역 순환(데드락)이 발생하는지 확인한다
  2. 역순환이 있는지 확인하기 위해 DFS 실행 CheckCycle_startDFS(); //Dfs 시작
    이때 첫 노드부터 끝 노드까지 모두다 조사한다 (비어있는 5번 노드 같은게 있을수 있기 때문에)
    1. lockCount(lock을 잡는 수=노드 수) 만큼, 즉 노드 개수 만큼 Dfs 를 재귀적으로 돌린다
    2. 상황에 따라 5번과 같은 노드가 있을수 있기 때문에(대부분 한번 DFS 가 실행 되고 난 이후엔 연결된 대부분의 노드는 한번씩 방문했다 가정한다면 방문했던걸 다시 또 방문하진 않게 된다고 이해하면 수월합니다)

 

_vecDiscoveredOrder 변수 같은 경우는 CheckCycle_startDFS 함수에서 미리 한번 초기화 한다음 dfs 로 들어가는 변수로 방문했던 적이 있었던 노드인지를 판별하며 만약 방문했었다면 dfs 탐색을 중지하는 역할을 합니다

dfs 특성상 한쪽으로 쭉 내려가는 특징이 있기 때문에 한쪽으로 쭉 방문하면서 발견 순서를 저장하게 되고

만약 탐색이 아직 끝나지 않은 노드를 방문했을때 방문순서 전과 후의 상황을 보고 역방향인지 판별합니다

 

// here가 there보다 먼저 발견되었다면, there는 here의 후손이다. (순방향 간선)
if (_vecDiscoveredOrder[here] < _vecDiscoveredOrder[there])
{

이때는 순방향 간선

continue;

}

 

// 순방향이 아니고, Dfs(there)가 아직 종료하지 않았다면, there는 here의 선조이다. (역방향 간선)
if (_vecFinished[there] == false)
{

이곳에 들어오면 역방향 간선이다

메세지 발생!

}

 

 

class DeadLockProfiler
{
public:

	//name : lock 이름
	void PushLock(const char* name);
	void PopLock(const char* name);

	//사이클이 일어나는지 판별
	void CheckCycle_startDFS();

private:
	void Dfs(int32 index);

private:

	//name 을 id로 매핑
	unordered_map<const char*, int32>	_nameToId;

	//id를 name 으로 매핑
	unordered_map<int32, const char*>	_idToName;

	//lock 이 실행되는것을 추적하기 위한 것
	stack<int32>						_lockStack;

	//n번 노드에서 n+1 번노드로 가는  간선으로 이동하는 내역
	//어떤 락이 어떤 락을 잡았는지에 대한 간선 히스토리
	map<int32, set<int32>>				_lockHistory;

	Mutex _lock;

	//사이클을 판별하기 위한 vector로 checkCycle() 에서 사용되는 벡터이다
	//방문 순서를 추적 : 노드가 발견되는 순서를 기록하는 배열
	vector<int32>	_vecDiscoveredOrder; 

	//노드가 발견된 순서를 위한 카운팅
	int32			_discoveredCount = 0; 

	//dfs(index)가 종료 되었는지 여부
	vector<bool>	_vecFinished; 

	//내가 발견된 부모가 누군지 추적을 위한 벡터
	vector<int32>	_vecParent;
};

 

void DeadLockProfiler::PushLock(const char* name)
{
	//아이디를 찾거나 발급한다
	//검출 목적이라 그냥 mutex를 사용한다, debug 시 실행되지 않을 구문이기 때문에
	LockGuard guard(_lock);		

	//현재 노드의 id 를 찾거나 발급한다.
	int32 lockNode_Id = 0;

	auto findIt = _nameToId.find(name);
	if (findIt == _nameToId.end())
	{
		//새로운 아이디를 할당한다
		lockNode_Id = static_cast<int32>(_nameToId.size());
		_nameToId[name] = lockNode_Id;	//이름을 아이디와 매핑
		_idToName[lockNode_Id] = name;	//아이디를 이름과 매핑
	}
	else
	{
		lockNode_Id = findIt->second;
	}

	// 잡고 있는 락이 있었다면
	if (_lockStack.empty() == false)
	{

		// 기존에 발견되지 않은 간선히스토리가 생기게 된다면(_lockHistory)
		//DFS 를 처음노드부터 끝 노드까지 돌면서 역 순환(데드락)이 발생하는지 확인한다

		const int32 prevId = _lockStack.top();
		if (lockNode_Id != prevId)
		{
			set<int32>& history = _lockHistory[prevId];
			if (history.find(lockNode_Id) == history.end())
			{
				//history : prevId 이 전노드 기준 현재노드까지 간 내역을 history 에 담고 있게 되는 것
				//prevId 에서 방문 가능한 노드 중에 방문하지 못했던 노드라면 현재 history 에 lockNode_Id 추가 
				//새로운 간선이 추가 된것!!
				history.insert(lockNode_Id);
				

				//그럼으로 역순환이 있는지 확인하기 위해 DFS 실행
				CheckCycle_startDFS(); //Dfs 시작
			}
		}
	}


	//현재(lockNode_Id) 와 이전 node id 를 비교하기위해 한타이밍 늦게 추가한다
	_lockStack.push(lockNode_Id);
}


//Dfs 시작 하면서 역순환이 있는지 확인한다
//첫 노드부터 끝 노드까지 모두다 조사한다
void DeadLockProfiler::CheckCycle_startDFS()
{
	const int32 lockCount = static_cast<int32>(_nameToId.size());

	//vector 인  _discoveredOrder 를 lockCount 개수 만큼 -1로 초기화
	_vecDiscoveredOrder = vector<int32>(lockCount, -1);
	_discoveredCount = 0;
	_vecFinished = vector<bool>(lockCount, false);
	_vecParent = vector<int32>(lockCount, -1);


	//lockCount 만큼, 즉 노드 개수 만큼 Dfs 를 재귀적으로 돌린다
	//혹시나 5번과 같은 노드가 있을까봐, 대부분 한번 DFS 가 실행 되고 난 이후엔 대부분의 노드는 한번씩 방문했기 마련이다(꼭 그런건 아니지만 이해를 위해)
	for (int32 lockId = 0; lockId < lockCount; lockId++)
	{
		Dfs(lockId);
	}
		

	// 연산이 끝났으면 정리한다.
	_vecDiscoveredOrder.clear();
	_vecFinished.clear();
	_vecParent.clear();
}

void DeadLockProfiler::Dfs(int32 here)
{
	//"아직 탐색이 끝나지 않은 상태"에서 발견했었던 노드라면 발견했던 노드들의 자식들은 모두 봤음으로 해당 노드는 중지시킨다
	//현재 노드가 발견 됐었던 노드라면 dfs 중지시킨다
	if (_vecDiscoveredOrder[here] != -1)
	{
		//방문했던 적이 있던 노드라면 역추적 검사 하지 않는다
		return;
	}
		
	//아직 탐색이 끝나지 않은 상태에서 발견되지 않았던 노드라면 _vecDiscoveredOrder에 현재 노드에 대한 발견 순서를 기록한다
	_vecDiscoveredOrder[here] = _discoveredCount++;

	//현재 노드 기준으로 모든 인접한 정점을 순회한다.
	auto findIt = _lockHistory.find(here);
	if (findIt == _lockHistory.end())
	{
		//현재 노드기준으로 방문했던적 이력이 아무것도 없다면 현재 노드를 처음 방문한것임으로
		//현재 노드를  방문 완료
		_vecFinished[here] = true;
		return;
	}


	//방문했던 이력이 있다면 사이클링 조사 시작
	set<int32>& nextSet = findIt->second;
	for (int32 there : nextSet)
	{
		// 아직 방문한 적이 없다면 방문한다.
		if (_vecDiscoveredOrder[there] == -1)
		{
			_vecParent[there] = here;
			Dfs(there);	//노드에 연결된 노드들 돌아가기(자식 간선들)
			continue;
		}

		//노드를 방문했던 적이 있는 노드라면 서로 순서를 따진다

		// here가 there보다 먼저 발견되었다면, there는 here의 후손이다. (순방향 간선)
		if (_vecDiscoveredOrder[here] < _vecDiscoveredOrder[there])
		{
			continue;
		}

		// 순방향이 아니고, Dfs(there)가 아직 종료하지 않았다면, there는 here의 선조이다. (역방향 간선)
		if (_vecFinished[there] == false)
		{
			printf("%s -> %s\n", _idToName[here], _idToName[there]);

			int32 now = here;
			while (true)
			{
				printf("%s -> %s\n", _idToName[_vecParent[now]], _idToName[now]);
				now = _vecParent[now];
				if (now == there)
					break;
			}

			CRASH("DEADLOCK_DETECTED");
		}
	}

	_vecFinished[here] = true;
}



void DeadLockProfiler::PopLock(const char* name)
{
	LockGuard guard(_lock);

	if (_lockStack.empty())
		CRASH("MULTIPLE_UNLOCK");

	int32 lockId = _nameToId[name];
	if (_lockStack.top() != lockId)
		CRASH("INVALID_UNLOCK");

	_lockStack.pop();
}

 

 

lock 코드에 아래 처럼 코드를 추가하면 됩니다

void Lock::WriteLock(const char* name)
{
#if _DEBUG
	GDeadLockProfiler->PushLock(name);
#endif



void Lock::WriteUnlock(const char* name)
{
#if _DEBUG
	GDeadLockProfiler->PopLock(name);
#endif


void Lock::ReadLock(const char* name)
{
#if _DEBUG
	GDeadLockProfiler->PushLock(name);
#endif



void Lock::ReadUnlock(const char* name)
{
#if _DEBUG
	GDeadLockProfiler->PopLock(name);
#endif

 

 

 

 

 

반응형

+ Recent posts