와와

[ 씹어먹는 C++ ] 쓰레드 풀 만들기 본문

카테고리 없음

[ 씹어먹는 C++ ] 쓰레드 풀 만들기

정으주 2024. 10. 26. 19:00

https://modoocode.com/285

 

씹어먹는 C++ - <15 - 5. C++ 쓰레드풀(ThreadPool) 만들기>

모두의 코드 씹어먹는 C++ - <15 - 5. C++ 쓰레드풀(ThreadPool) 만들기> 작성일 : 2019-05-19 이 글은 57869 번 읽혔습니다. 이번 강좌에서는에 대해 다룹니다.안녕하세요 여러분! 이번 강좌에서는 여태까지

modoocode.com

 

 

쓰레드풀이란, 쓰레드들을 위한 직업 소개소와 같은 개념이다.

여러 개의 쓰레드들이 대기하고 있다가, 할 일이 들어오게 되면, 대기하고 있던 쓰레드들 중 하나가 이를 받아서 실행하게 된다.

 

 

쓰레드를 보관할 장소

// 총 Worker 쓰레드의 개수.
size_t num_threads_;
// Worker 쓰레드를 보관하는 벡터.
std::vector<std::thread> worker_threads_;

 

 

할 일들을 보관할 장소 ( 큐 사용. 가장 오래 전에 추가된 작업을 쉽게 알아내기 위해 )

// 할일들을 보관하는 job 큐.
std::queue<std::function<void()>> jobs_;

 

 

queue 는 멀티 쓰레드 환경에서 안전하지 않기 때문에 이 queue  race condition 에서 보호할 장치들이 필요하다.

std::condition_variable cv_job_q_;
std::mutex m_job_q_;

 

 

Worker 쓰레드들을 종료시킬 조건을 나타내는 멤버 변수

// 모든 쓰레드 종료
bool stop_all;

 

 

 

생성자

num_threads 만큼의 쓰레드 생성

 이 때 각 쓰레드들은 ThreadPool 에 정의된 WorkerThread 함수를 실행하게 된다.

ThreadPool::ThreadPool(size_t num_threads)
    : num_threads_(num_threads), stop_all(false) {
      worker_threads_.reserve(num_threads_);
      for (size_t i = 0; i < num_threads_; ++i) {
        	worker_threads_.emplace_back([this]() { this->WorkerThread(); });
      }
}

 

 

 ThreadPool::WorkerThread()

반복문으로 job이 생길 때까지 대기하다가 job이 생기면 처리

void ThreadPool::WorkerThread() {
  while (true) {
    std::unique_lock<std::mutex> lock(m_job_q_);
    cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
    if (stop_all && this->jobs_.empty()) {
      return;
    }

    // 맨 앞의 job 을 뺀다.
    std::function<void()> job = std::move(jobs_.front());
    jobs_.pop();
    lock.unlock();

    // 해당 job 을 수행한다 :)
    job();
  }
}

 

이때, wait을 통해 일이 생기거나 stop_all이 될 때까지 대기한다.

일이 생기면 가장 앞의 일을 꺼내 수행

stop_all이라면 종료한다.

 

 

작업 추가

void ThreadPool::EnqueueJob(std::function<void()> job) {
  if (stop_all) {
    throw std::runtime_error("ThreadPool 사용 중지됨");
  }
  {
    std::lock_guard<std::mutex> lock(m_job_q_);
    jobs_.push(std::move(job));
  }
  cv_job_q_.notify_one();
}

 

job을 추가하고 공유 변수를 통해 스레드 하나를 깨운다.

 

 

소멸자

ThreadPool::~ThreadPool() {
  stop_all = true;
  cv_job_q_.notify_all();

  for (auto& t : worker_threads_) {
    t.join();
  }
}

 

stop_all을 켜주고, 모든 스레드들을 꺠운다.

각 스레드에서는 wait의 조건을 다시 검사한 뒤, 아래 조건문을 통해 return 됨

소멸자에서는 모든 스레드들에 대해 join으로 대기한다.

 

 

전체 코드

#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace ThreadPool {
class ThreadPool {
 public:
  ThreadPool(size_t num_threads);
  ~ThreadPool();

  // job 을 추가한다.
  void EnqueueJob(std::function<void()> job);

 private:
  // 총 Worker 쓰레드의 개수.
  size_t num_threads_;
  // Worker 쓰레드를 보관하는 벡터.
  std::vector<std::thread> worker_threads_;
  // 할일들을 보관하는 job 큐.
  std::queue<std::function<void()>> jobs_;
  // 위의 job 큐를 위한 cv 와 m.
  std::condition_variable cv_job_q_;
  std::mutex m_job_q_;

  // 모든 쓰레드 종료
  bool stop_all;

  // Worker 쓰레드
  void WorkerThread();
};

ThreadPool::ThreadPool(size_t num_threads)
    : num_threads_(num_threads), stop_all(false) {
  worker_threads_.reserve(num_threads_);
  for (size_t i = 0; i < num_threads_; ++i) {
    worker_threads_.emplace_back([this]() { this->WorkerThread(); });
  }
}

void ThreadPool::WorkerThread() {
  while (true) {
    std::unique_lock<std::mutex> lock(m_job_q_);
    cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
    if (stop_all && this->jobs_.empty()) {
      return;
    }

    // 맨 앞의 job 을 뺀다.
    std::function<void()> job = std::move(jobs_.front());
    jobs_.pop();
    lock.unlock();

    // 해당 job 을 수행한다 :)
    job();
  }
}

ThreadPool::~ThreadPool() {
  stop_all = true;
  cv_job_q_.notify_all();

  for (auto& t : worker_threads_) {
    t.join();
  }
}

void ThreadPool::EnqueueJob(std::function<void()> job) {
  if (stop_all) {
    throw std::runtime_error("ThreadPool 사용 중지됨");
  }
  {
    std::lock_guard<std::mutex> lock(m_job_q_);
    jobs_.push(std::move(job));
  }
  cv_job_q_.notify_one();
}

}  // namespace ThreadPool

void work(int t, int id) {
  printf("%d start \n", id);
  std::this_thread::sleep_for(std::chrono::seconds(t));
  printf("%d end after %ds\n", id, t);
}

int main() {
  ThreadPool::ThreadPool pool(3);

  for (int i = 0; i < 10; i++) {
    pool.EnqueueJob([i]() { work(i % 3 + 1, i); });
  }
}

 

 

 

임의의 함수 받기

 

packagedtask 사용하여 일을 추가할 때 (EnqueueJob()) future 를 반환하여 저장하고 있다가 값을 돌려받는다.

template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob(
  F f, Args... args) {
  if (stop_all) {
    throw std::runtime_error("ThreadPool 사용 중지됨");
  }

  using return_type = typename std::result_of<F(Args...)>::type;
  std::packaged_task<return_type()> job(std::bind(f, args...));

  std::future<return_type> job_result_future = job.get_future();
  {
    std::lock_guard<std::mutex> lock(m_job_q_);
    jobs_.push([&job]() { job(); });
  }
  cv_job_q_.notify_one();

  return job_result_future;
}

 

 

shared_ptr  packaged_task 를 보관하여 job을 실행하는 시점에서도 packed_task 객체가 계속 살아있도록 한다.

make_shared 를 통해서 shared_ptr 을 생성하였고, 대신에 람다 함수에 shared_ptr 의 복사본을 전달해서 람다 함수 안에서도 packaged_task  shared_ptr 하나를 붙들고 있음

auto job =
  std::make_shared<std::packaged_task<return_type()>>(std::bind(f, args...));
std::future<return_type> job_result_future = job->get_future();
{
  std::lock_guard<std::mutex> lock(m_job_q_);
  jobs_.push([job]() { (*job)(); });
}

 

 

완벽한 전달

 

아래와 같이 인자들의 복사본을 받지 않고

ThreadPool::EnqueueJob(F f, Args... args);

 

인자들을 우측값 레퍼런스로 바꾼다.

template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
  F&& f, Args&&... args);

 

bind 함수에 forward 로 인자를 전달하여 Enqueue 함수에 인자들을 완벽히 전달할 수 있게함

auto job = std::make_shared<std::packaged_task<return_type()>>(
  std::bind(std::forward<F>(f), std::forward<Args>(args)...));

 

 

최최종

#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace ThreadPool {
class ThreadPool {
 public:
  ThreadPool(size_t num_threads);
  ~ThreadPool();

  // job 을 추가한다.
  template <class F, class... Args>
  std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
    F&& f, Args&&... args);

 private:
  // 총 Worker 쓰레드의 개수.
  size_t num_threads_;
  // Worker 쓰레드를 보관하는 벡터.
  std::vector<std::thread> worker_threads_;
  // 할일들을 보관하는 job 큐.
  std::queue<std::function<void()>> jobs_;
  // 위의 job 큐를 위한 cv 와 m.
  std::condition_variable cv_job_q_;
  std::mutex m_job_q_;

  // 모든 쓰레드 종료
  bool stop_all;

  // Worker 쓰레드
  void WorkerThread();
};

ThreadPool::ThreadPool(size_t num_threads)
    : num_threads_(num_threads), stop_all(false) {
  worker_threads_.reserve(num_threads_);
  for (size_t i = 0; i < num_threads_; ++i) {
    worker_threads_.emplace_back([this]() { this->WorkerThread(); });
  }
}

void ThreadPool::WorkerThread() {
  while (true) {
    std::unique_lock<std::mutex> lock(m_job_q_);
    cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
    if (stop_all && this->jobs_.empty()) {
      return;
    }

    // 맨 앞의 job 을 뺀다.
    std::function<void()> job = std::move(jobs_.front());
    jobs_.pop();
    lock.unlock();

    // 해당 job 을 수행한다 :)
    job();
  }
}

ThreadPool::~ThreadPool() {
  stop_all = true;
  cv_job_q_.notify_all();

  for (auto& t : worker_threads_) {
    t.join();
  }
}

template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob(
  F&& f, Args&&... args) {
  if (stop_all) {
    throw std::runtime_error("ThreadPool 사용 중지됨");
  }

  using return_type = typename std::result_of<F(Args...)>::type;
  auto job = std::make_shared<std::packaged_task<return_type()>>(
    std::bind(std::forward<F>(f), std::forward<Args>(args)...));
  std::future<return_type> job_result_future = job->get_future();
  {
    std::lock_guard<std::mutex> lock(m_job_q_);
    jobs_.push([job]() { (*job)(); });
  }
  cv_job_q_.notify_one();

  return job_result_future;
}

}  // namespace ThreadPool

// 사용 예시
int work(int t, int id) {
  printf("%d start \n", id);
  std::this_thread::sleep_for(std::chrono::seconds(t));
  printf("%d end after %ds\n", id, t);
  return t + id;
}

int main() {
  ThreadPool::ThreadPool pool(3);

  std::vector<std::future<int>> futures;
  for (int i = 0; i < 10; i++) {
    futures.emplace_back(pool.EnqueueJob(work, i % 3 + 1, i));
  }
  for (auto& f : futures) {
    printf("result : %d \n", f.get());
  }
}