interlockedCompareExchange128  의 동작은 아래 글을 참고

 

https://3dmpengines.tistory.com/2236

 

InterlockedCompareExchange128 vs InterlockedCompareExchange64

주요 차이점 리턴 값이 다르다 128 비트를 사용하는가, 64 비트를 사용 하는가가 다르다 BOOLEAN InterlockedCompareExchange128( [in, out] LONG64 volatile *Destination, [in] LONG64 ExchangeHigh, [in] LONG6..

3dmpengines.tistory.com

 

 

 

 

 

아래는 아직 아직 ABA 을 고려하지 않은 상황

데이터안에 다음 노드가 담기는 타입의 스택컨셉입니다

 

데이터 안에 List 를 연결한 포인터를 갖고 있고 이 포인터가 노드들을 이어서 리스트를 이룹니다

즉 데이터가 바깥으로 빠져 있는 형태입니다

  

 

헤더와 데이터가 있다면 데이터와 헤더 사이에 새로 추가된 노드를 끼워 넣는 방식입니다

pop 할때는 반대로 처리 하면 됩니다

 

 

 

위의 방식은 싱글 스레드 방식이라서 이것을 멀티 스레드에서 동작하게 하려면 주의 할것은 ABA problem 이다

 

참고..

https://3dmpengines.tistory.com/2234

 

lock-free aba problem

예전에 게임 서버 포폴 만드는 공부에서 IOCP 구조로 만든 네트워크 스레드(n개)에서 받아들여온 패킷들을 Lock-Free Queue에 넣고, 게임 로직 스레드에서 꺼내다가 로직처리하도록 만들었었다. I have

3dmpengines.tistory.com

 

 

 

 

 

 

노드가 추가 될때를 우선 보면 되는데

아래 같은 경우 ABA problem 이 발생 할 수 있다

이런 경우에 interlockedCompared.. .함수에 들어가기 직전에 header 값이 삭제 됐다가 다시 할당 받았을 경우

우연히 이전 주소와 같은 값일 경우가 발생 할수가 있다 이것을 ABA problem 이라 하는데 이것을 해결 하기 위해서

주소 값만 비교한는 것이 아닌 비교 대상이 이전에 증가한적이 있는지 등의 값을 추가로 넣어 좀 더 큰 비트로 비교하여처리하게 된다

 

 

 

 

 

위와 같은 식으로 인덱스를 증가 시켜는 방식인데

비교 할때 현재 header 의 인덱스와 주소 그리고 expected 의 주소와 인덱스 값들 마저 같다면

인덱스가 증가된 disred 값으로 header 인덱스와 다음 새로운 entry 를 가리키도록 한다

 

 

InterlockedCompareExchaged128 실행 전

 

 

InterlockedCompareExchaged128 실행 이후 (성공했을 경우)

 

다른곳에서 header 가 변경 되지 않았다면 desired 값으로 header 값을 변경한다

interlockedCompareExchange128 값이 변경 됐다면 header 는 depth 과 sequence 가 1씩 증가하고 next 또한 header 의 다음 노드를 가리키게 된다, 만약 interlocked.. 이 실패하면 아무 header 는 변화 하지 않은채 변경 될때까지 시도한다

 



SListHeader 그림에 next 가 있지만
왼쪽 코드 처럼 union 안에 next 가 있다고 생각하면 됩니다

 

위 상황을 그림으로 나타내자면 

이런 상황이 됩니다 위 상황은 interlockedCompareExchange128 이 실행 되기 전까지의 과정입니다

 

interlockedCompareExchange128 이 실행 된 이후의 과정은 다음과 같습니다

 

  • Interlocked…. 128 실행하는 상황에서
    • header 와 expected 가 같다면
    • desired 값을 header 에 복사한다, 그렇지 않다면 복사가 header 에 되지 않음
    • + expected 에는 관계 없이 원래 오리지널 값이 들어갑니다

 

실행 이후의 과정

 

heaer 가 원래가리키던 곳이 아닌 그 다음을 가리킨 것을 알수 있으며 비교할때 인덱스 값들과 한번에 비교하는 연산으로 처리하여 이전에 삭제되어서 동일한 주소로 다시 생성된 것인지 또한 판별할 수 있는 상황이 되었습니다

즉 ABA 문제를 해결

 

 

아래는 코드입니다

 

함수들은 멤버 함수는 아니고 일반 함수입니다

DECLSPEC_ALIGN(16)
struct SListEntry
{
	SListEntry* next;
};

DECLSPEC_ALIGN(16)
struct SListHeader
{
	SListHeader()
	{
		alignment = 0;
		region = 0;
	}

	union
	{
		struct
		{
			uint64 alignment;
			uint64 region;
		} DUMMYSTRUCTNAME;
		struct
		{
			uint64 depth : 16;
			uint64 sequence : 48;
			uint64 reserved : 4;
			uint64 next : 60;
		} HeaderX64;
	};
};

void InitializeHead(SListHeader* header);
void PushEntrySList(SListHeader* header, SListEntry* entry);
SListEntry* PopEntrySList(SListHeader* header);

 

 

 

void InitializeHead(SListHeader* header)
{
	header->alignment = 0;
	header->region = 0;
}

void PushEntrySList(SListHeader* header, SListEntry* entry)
{
	SListHeader expected = {};
	SListHeader desired = {};
	
	// 16 바이트 정렬
	desired.HeaderX64.next = (((uint64)entry) >> 4);

	while (true)
	{
		expected = *header;

		// 이 사이에서 변경 가능성이 존재함

		entry->next = (SListEntry*)(((uint64)expected.HeaderX64.next) << 4);
		desired.HeaderX64.depth = expected.HeaderX64.depth + 1;
		desired.HeaderX64.sequence = expected.HeaderX64.sequence + 1;

		if (::InterlockedCompareExchange128((int64*)header, desired.region, desired.alignment, (int64*)&expected) == 1)
			break;
	}
}

SListEntry* PopEntrySList(SListHeader* header)
{
	SListHeader expected = {};
	SListHeader desired = {};
	SListEntry* entry = nullptr;

	while (true)
	{
		expected = *header;

		entry = (SListEntry*)(((uint64)expected.HeaderX64.next) << 4);
		if (entry == nullptr)
			break;

		// Use-After-Free
		desired.HeaderX64.next = ((uint64)entry->next) >> 4;
		desired.HeaderX64.depth = expected.HeaderX64.depth - 1;
		desired.HeaderX64.sequence = expected.HeaderX64.sequence + 1;

		if (::InterlockedCompareExchange128((int64*)header, desired.region, desired.alignment, (int64*)&expected) == 1)
			break;
	}

	return entry;
}

 

 

 

 

 

 

//테스트 해볼 코드

 

while (true)
{
        Data* pop = nullptr;
        pop = (Data*)PopEntrySList(GHeader);
                    //여기서 pop 한것을 아래에서 삭제 하려고 할때 이미 다른 스레드에 의해서
                    //이 포인터터가 물고 있던 대상이 이미 삭제 되었을 수도 있음, 하지만 다루진 않음

        if (pop)
        {
                cout << pop->_rand << endl;
                delete pop;
        }
        else

        ...

}

 

 

 

테스트할 Data 또한 16 바이트여야 함으로 16 바이트 ALIGN 을 해주고

 

 

ASSERT_CRASH(((uint64)GHeader % 16) == 0);

이 구문으로 해당 바이트가 16 바이트의 배수인지 => 16바이트인지 체크 해볼 수 있습니다

 

DECLSPEC_ALIGN(16)
class Data // : public SListEntry
{
public:
	SListEntry _entry;
	int64 _rand = rand() % 1000;
};

SListHeader* GHeader;

int main()
{
	GHeader = new SListHeader();
	ASSERT_CRASH(((uint64)GHeader % 16) == 0);
	InitializeHead(GHeader);

	for (int32 i = 0; i < 3; i++)
	{
		GThreadManager->Launch([]()
			{
				while (true)
				{
					Data* data = new Data();
					ASSERT_CRASH(((uint64)data % 16) == 0);

					PushEntrySList(GHeader, (SListEntry*)data);
					this_thread::sleep_for(10ms);
				}
			});
	}

	for (int32 i = 0; i < 2; i++)
	{
		GThreadManager->Launch([]()
			{
				while (true)
				{
					Data* pop = nullptr;
					pop = (Data*)PopEntrySList(GHeader);
                    //여기서 pop 한것을 아래에서 삭제 하려고 할때 이미 다른 스레드에 의해서
                    //이 포인터터가 물고 있던 대상이 이미 삭제 되었을 수도 있음, 하지만 다루진 않음

					if (pop)
					{
						cout << pop->_rand << endl;
						delete pop;
					}
					else
					{
						cout << "NONE" << endl;
					}
				}
			});
	}

	GThreadManager->Join();
}

 

 

lock free 는 직접 구현해 쓰기에는 모든 경우의 수를 예측하긴 어렵기 때문에 잘 검증된 것을 사용하는것이 났습니다

lock free 를 사용하려면 안정 검증까진 대규모 테스트 및 많은 수행 착오를 거처야 합니다

 

 

반응형

+ Recent posts