와와

[ 씹어먹는 C++ ] C++의 멀티스레딩 본문

개발/C++

[ 씹어먹는 C++ ] C++의 멀티스레딩

정으주 2024. 10. 13. 01:28

C++의 멀티스레딩

 

 

1. 프로세스와 스레드

 

프로세스는 실행 중인 프로그램의 인스턴스이다. 각 프로세스는 독립된 메모리 공간을 가집니다.

반면 스레드는 프로세스 내에서 실행되는 작업의 단위로, 같은 프로세스 내의 스레드들은 메모리를 공유한다.

 

2. 왜 멀티스레드 프로그래밍을 할까?

  • 병렬화 가능한 작업: 예를 들어, 1부터 10000까지의 합을 계산할 때, 이를 여러 스레드로 나누어 계산하면 더 빠르게 결과를 얻을 수 있다.
  • 대기 시간이 긴 작업: 네트워크 요청과 같이 대기 시간이 긴 작업을 별도의 스레드에서 처리하면, 메인 스레드가 블로킹되지 않고 다른 작업을 계속할 수 있다.

 

3. C++에서 스레드 생성하기

C++11 부터 표준 라이브러리에 thread가 추가되어 쉽게 스레드를 생성할 수 있다.

#include <iostream>
#include <thread>

void func1() {
  for (int i = 0; i < 10; i++) {
    std::cout << "스레드 1 작동중! \n";
  }
}

int main() {
  std::thread t1(func1);
  t1.join();
  return 0;
}
  • 스레드 객체 생성: std::thread 생성자가 호출되는 순간, 새로운 스레드가 생성되고 실행 시작
  • 즉시 실행: 생성자에 전달된 함수가 새 스레드에서 즉시 실행됨
  • 비동기 실행: 메인 스레드는 새 스레드를 생성한 후 즉시 다음 코드 라인으로 넘어감. 즉, 새 스레드의 실행을 기다리지 않다.

 

4. join vs detach

위의 스레드 실행 코드에서 볼 수 있듯이

t1.join()은 t1 스레드가 종료될 때까지 메인 스레드가 기다리게 한다.

detach()는 쓰레드를 실행 시킨 후, 잊어버린다. 대신 쓰레드는 알아서 백그라운드에서 돌아가다가 프로세스가 종료되면 종료된다.

 

 

5. 스레드에 인자 전달하기

스레드 생성 시 함수에 인자를 전달할 수 있다.

// 예제: 1부터 9999까지의 합을 4개의 스레드로 나누어 계산

void worker(std::vector<int>::iterator start, std::vector<int>::iterator end, int* result) {
  int sum = 0;
  for (auto itr = start; itr < end; ++itr) {
    sum += *itr;
  }
  *result = sum;
}

int main() {
  std::vector<int> data(10000);
  for (int i = 0; i < 10000; i++) {
    data[i] = i;
  }

  std::vector<int> partial_sums(4);
  std::vector<std::thread> workers;

  for (int i = 0; i < 4; i++) {
    workers.push_back(std::thread(worker, data.begin() + i * 2500,
                                  data.begin() + (i + 1) * 2500, &partial_sums[i]));
  }

  for (auto& t : workers) {
    t.join();
  }

  int total = 0;
  for (int sum : partial_sums) {
    total += sum;
  }
  std::cout << "전체 합 : " << total << std::endl;
}

 

 

주의사항

멀티스레드 프로그래밍에서는 여러 스레드가 동시에 같은 메모리에 접근할 때 주의가 필요하다.

 

 

 


 

 

뮤텍스와 조건 변수

 

 

1. Race Condition

Race Condition은 여러 스레드가 공유 데이터에 동시에 접근할 때 발생할 수 있는 문제다. 스레드의 실행 순서나 타이밍에 따라 프로그램의 결과가 달라질 수 있는 상황을 말한다.

 

#include <iostream>
#include <thread>
#include <vector>

void worker(int& counter) {
  for (int i = 0; i < 10000; i++) {
    counter += 1;
  }
}

int main() {
  int counter = 0;

  std::vector<std::thread> workers;
  for (int i = 0; i < 4; i++) {
    workers.push_back(std::thread(worker, std::ref(counter)));
  }

  for (int i = 0; i < 4; i++) {
    workers[i].join();
  }

  std::cout << "Counter 최종 값 : " << counter << std::endl;
}

 

 

이 코드에서는 4개의 스레드가 동시에 counter를 증가시키려 한다. 이상적으로는 counter의 최종 값이 40000이 되어야 하지만, 실제로는 그보다 작은 값이 나온다.

 

 

왜일까?!

 

이는 counter += 1 연산이 아토믹하지 않기 때문이다. 이 연산은 실제로 다음과 같은 단계로 이루어진다:

  1. counter의 현재 값을 읽는다.
  2. 읽은 값에 1을 더한다.
  3. 결과를 다시 counter에 저장한다.

여러 스레드가 동시에 이 과정을 수행하면, 어떤 스레드의 작업 결과가 다른 스레드에 의해 덮어씌워질 수 있다.



2. 뮤텍스 (Mutex)

뮤텍스(Mutual Exclusion)는 Race Condition을 해결하기 위한 동기화 도구다. 뮤텍스는 한 번에 하나의 스레드만 특정 코드 영역(임계 영역)에 접근할 수 있도록 보장한다.

#include <mutex>

std::mutex m;

void worker(int& result, std::mutex& m) {
  for (int i = 0; i < 10000; i++) {
    m.lock();
    result += 1;
    m.unlock();
  }
}

 

이 코드에서 m.lock()과 m.unlock() 사이의 영역이 임계 영역이 된다. 한 스레드가 이 영역에 있을 때 다른 스레드는 이 영역에 진입할 수 없다.

 

- lock_guard

void worker(int& result, std::mutex& m) {
  for (int i = 0; i < 10000; i++) {
    std::lock_guard<std::mutex> lock(m);
    result += 1;
  }
}

 

lock()과 unlock()을 직접 호출하는 것은 실수의 여지가 있다. C++에서는 RAII(Resource Acquisition Is Initialization) 원칙에 따라 lock_guard를 제공한다.

 

lock_guard는 생성될 때 뮤텍스를 잠그고, 소멸될 때 자동으로 뮤텍스를 해제한다.

 

 

 

3. 데드락 (Deadlock)

데드락은 두 개 이상의 스레드가 서로가 가진 자원을 기다리며 무한히 대기하는 상태를 말한다. 이는 프로그램이 더 이상 진행되지 못하고 멈추는 심각한 문제를 야기한다.

 

void worker1(std::mutex& m1, std::mutex& m2) {
  std::lock_guard<std::mutex> lock1(m1);
  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  std::lock_guard<std::mutex> lock2(m2);
  // Do something
}

void worker2(std::mutex& m1, std::mutex& m2) {
  std::lock_guard<std::mutex> lock2(m2);
  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  std::lock_guard<std::mutex> lock1(m1);
  // Do something
}

 

이 코드에서 worker1과 worker2가 동시에 실행되면, 각각 하나의 뮤텍스를 잠근 후 다른 뮤텍스를 기다리게 되어 데드락이 발생할 수 있다.

 

데드락 방지 방법...

  1. 중첩된 Lock을 사용하는 것을 피해라
  2. Lock을 소유하고 있을 때 유저 코드를 호출하는 것을 피해라
  3. Lock들을 언제나 정해진 순서로 획득해라

 

 

4. 생산자-소비자 패턴

생산자-소비자 패턴은 멀티스레딩 프로그래밍에서 자주 사용되는 디자인 패턴이다.

생산자 스레드는 데이터를 생성하고, 소비자 스레드는 그 데이터를 처리한다.

 

#include <queue>
#include <mutex>
#include <condition_variable>

std::queue<std::string> downloaded_pages;
std::mutex m;
std::condition_variable cv;

void producer(std::queue<std::string>* downloaded_pages, std::mutex* m,
              int index, std::condition_variable* cv) {
  for (int i = 0; i < 5; i++) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100 * index));
    std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" +
                          std::to_string(index) + ")\n";

    m->lock();
    downloaded_pages->push(content);
    m->unlock();

    cv->notify_one();
  }
}

void consumer(std::queue<std::string>* downloaded_pages, std::mutex* m,
              int* num_processed, std::condition_variable* cv) {
  while (*num_processed < 25) {
    std::unique_lock<std::mutex> lk(*m);

    cv->wait(
        lk, [&] { return !downloaded_pages->empty() || *num_processed == 25; });

    if (*num_processed == 25) {
      lk.unlock();
      return;
    }

    std::string content = downloaded_pages->front();
    downloaded_pages->pop();

    (*num_processed)++;
    lk.unlock();

    std::cout << content;
    std::this_thread::sleep_for(std::chrono::milliseconds(80));
  }
}

 

 

- condition_variable 사용

condition_variable은 스레드 간 통신을 위해 사용된다. 생산자는 데이터를 생성한 후 notify_one()을 호출하여 대기 중인 소비자 중 하나를 깨운다. 소비자는 wait()을 통해 데이터가 있을 때까지 대기한다.

 

- unique_lock 사용

unique_lock은 lock_guard보다 더 유연한 잠금 메커니즘을 제공한다. condition_variable의 wait()은 unique_lock을 인자로 받아, 대기 중에는 잠금을 해제하고 깨어날 때 다시 잠금을 획득한다.

 

 


 

atomic 객체와 명령어 재배치

 

1. CPU와 메모리

 

메모리 접근 속도

CPU가 메모리에 접근하는 데 걸리는 시간은 생각보다 길다. 예를 들어, 인텔의 i7-6700 CPU의 경우 메모리에서 데이터를 읽어오는 데 최소 42 사이클이 걸린다. 이는 CPU가 42번의 덧셈을 수행할 수 있는 시간이다.

 

CPU 캐시

이런 문제를 해결하기 위해 CPU는 캐시(Cache)를 사용한다. 캐시는 CPU 칩 안에 있는 작은 메모리로, 주 메모리보다 훨씬 빠르게 접근할 수 있다.

캐시는 보통 여러 레벨로 구성된다:

  • L1 캐시: 가장 빠르지만 크기가 작음 (예: 32KB)
  • L2 캐시: L1보다 조금 느리지만 더 큼 (예: 256KB)
  • L3 캐시: 가장 크지만 가장 느림 (예: 8MB)

L1 캐시 접근은 4 사이클, L2는 12 사이클, L3는 36 사이클 정도 걸린다.

 

캐시의 작동 방식

CPU는 메모리를 읽을 때 해당 데이터를 캐시에 저장한다. 만약 캐시가 가득 찼다면, 가장 오래된 데이터를 제거하고 새 데이터를 저장한다. 이를 LRU(Least Recently Used) 방식이라고 한다.

예를 들어, 다음 두 코드 중 어느 것이 더 빠를까?

// 첫번째
for (int i = 0; i < 10000; i++) {
  for (int j = 0; j < 10000; j++) {
    s += data[j];
  }
}

// 두번째
for (int j = 0; j < 10000; j++) {
  for (int i = 0; i < 10000; i++) {
    s += data[j];
  }
}

 

두 번째 코드가 더 빠르다. 첫 번째 코드는 매 반복마다 새로운 데이터를 읽어와야 하지만, 두 번째 코드는 같은 데이터를 계속 사용하므로 캐시 히트율이 높다.

 

 

2. 컴파일러와 CPU의 최적화

 

컴파일러의 최적화

컴파일러는 코드의 성능을 높이기 위해 다양한 최적화를 수행한다. 이 과정에서 코드의 실행 순서가 바뀔 수 있다.

예를 들어

int a = 0;
int b = 0;

void foo() {
  a = b + 1;
  b = 1;
}

 

이 코드는 다음과 같이 최적화될 수 있다

mov rax, qword ptr [rbp - 8]
mov ecx, dword ptr [rax]
add ecx, 1
mov dword ptr [rax], ecx

 

무려 b = 1가 a = b + 1보다 먼저! 실행된다.

 

CPU의 최적화

CPU도 명령어 실행 순서를 변경할 수 있다. 

a = 1;  // 캐시에 없음
b = 1;  // 캐시에 있음

 

이런 경우 CPU는 b = 1을 먼저 실행할 수 있다. 왜냐하면 b가 이미 캐시에 있어 빠르게 처리할 수 있기 때문이다.

 

3. 원자성(Atomicity)

멀티스레드 환경에서는 여러 스레드가 동시에 같은 데이터에 접근할 수 있다. 이때 데이터의 일관성을 유지하기 위해 원자적 연산이 필요하다.

 

std::atomic

C++에서는 std::atomic을 통해 원자적 연산을 제공한다.

#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

void worker(std::atomic<int>& counter) {
  for (int i = 0; i < 10000; i++) {
    counter++;
  }
}

int main() {
  std::atomic<int> counter(0);

  std::vector<std::thread> workers;
  for (int i = 0; i < 4; i++) {
    workers.push_back(std::thread(worker, std::ref(counter)));
  }

  for (int i = 0; i < 4; i++) {
    workers[i].join();
  }

  std::cout << "Counter 최종 값 : " << counter << std::endl;
}

 

이 코드는 락 없이도 정확히 40000을 출력한다.

 

4. 메모리 순서(Memory Order)

memory_order_relaxed

가장 느슨한 메모리 순서다. 다른 메모리 접근과의 순서를 보장하지 않는다.

std::atomic<int> counter(0);
counter.fetch_add(1, std::memory_order_relaxed);

 

 

memory_order_acquire와 memory_order_release

release는 이전의 모든 메모리 접근이 이후로 재배치되는 것을 막고, acquire는 이후의 모든 메모리 접근이 이전으로 재배치되는 것을 막는다.

std::atomic<bool> is_ready(false);
int data;

// Thread 1
data = 42;
is_ready.store(true, std::memory_order_release);

// Thread 2
while (!is_ready.load(std::memory_order_acquire)) {}
// 여기서 data는 반드시 42

 

memory_order_acq_rel

acquire와 release를 모두 수행한다.

 

memory_order_seq_cst

가장 강력한 메모리 순서로, 모든 스레드에서 동일한 순서를 관찰할 수 있다.

atomic 객체를 사용할 때,  memory_order를 지정해주지 않는다면 디폴트로 memory_order_seq_cst가 지정이 된다. 비용이 큰 연산이니 필요할 때만 사용하는 것이 좋다.

 

 


 

비동기 연산을 위한 도구들

 

1. 동기 vs 비동기

 

동기적 실행은 한 작업이 끝날 때까지 다음 작업으로 넘어가지 않는 반면, 비동기적 실행은 여러 작업을 동시에 진행할 수 있게 해준다.

 

2. std::promise & std::future

C++에서 비동기 프로그래밍을 위한 기본 도구로 std::promise와 std::future를 제공한다.

 

#include <future>
#include <iostream>
#include <thread>
using std::string;

void worker(std::promise<string>* p) {
  p->set_value("some data");
}

int main() {
  std::promise<string> p;
  std::future<string> data = p.get_future();
  std::thread t(worker, &p);
  
  data.wait();
  std::cout << "받은 데이터 : " << data.get() << std::endl;
  t.join();
}

 

promise는 결과를 설정하고, future는 그 결과를 받아온다. future.wait()은 결과가 준비될 때까지 기다린다.

future.get() 도 결과를 기다렸다가 가져옴

 

 

- 예외 전달

promise와 future는 예외도 전달할 수 있다.

void worker(std::promise<string>* p) {
  try {
    throw std::runtime_error("Some Error!");
  } catch (...) {
    p->set_exception(std::current_exception());
  }
}

int main() {
  std::promise<string> p;
  std::future<string> data = p.get_future();
  std::thread t(worker, &p);
  
  try {
    data.get();
  } catch (const std::exception& e) {
    std::cout << "예외 : " << e.what() << std::endl;
  }
  t.join();
}

 

 

3. std::shared_future

shared_future를 사용하면 여러 스레드에서 동일한 결과를 기다릴 수 있다.

void runner(std::shared_future<void> start) {
  start.get();
  std::cout << "출발!" << std::endl;
}

int main() {
  std::promise<void> p;
  std::shared_future<void> start = p.get_future();

  thread t1(runner, start);
  thread t2(runner, start);
  thread t3(runner, start);
  thread t4(runner, start);

  p.set_value();

  t1.join(); t2.join(); t3.join(); t4.join();
}

 

4.  std::packaged_task

packaged_task는 함수의 실행과 결과 처리를 캡슐화한다

int some_task(int x) { return 10 + x; }

int main() {
  std::packaged_task<int(int)> task(some_task);
  std::future<int> start = task.get_future();
  std::thread t(std::move(task), 5);
  std::cout << "결과값 : " << start.get() << std::endl;
  t.join();
}

 

전달된 함수를 실행해서, 그 함수의 리턴값을 promise에 설정한다.

참고로 packaged_task는 복사 생성이 불가능하므로 명시적으로 move 해줘야만 한다.

굳이 promise를 전달하지 않아도 알아서 함수의 리턴값을 처리해줘서 매우 편리함

 

5. std::async

std::async는 함수를 전달하면 쓰레드를 알아서 만든 뒤 해당 함수를 비동기적으로 실행하고, 그 결과값을 future에 전달한다.

int sum(const std::vector<int>& v, int start, int end) {
  int total = 0;
  for (int i = start; i < end; ++i) {
    total += v[i];
  }
  return total;
}

int parallel_sum(const std::vector<int>& v) {
  std::future<int> lower_half_future =
    std::async(std::launch::async, sum, std::cref(v), 0, v.size() / 2);

  int upper_half = sum(v, v.size() / 2, v.size());

  return lower_half_future.get() + upper_half;
}

 

첫번째 인자로 어떤 형태로 함수를 실행할지를 전달하는데

1. std::launch::async : 바로 쓰레드를 생성해서 인자로 전달된 함수를 실행한다.

2. std::launch::deferred : future의 get 함수가 호출되었을 때 실행한다. (당장 비동기적으로 실행할 필요가 없다면)