일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- 게임개발
- unorderedset
- photon
- Vector
- NotFoundException: String resource ID #0x0
- 바이너리세마포
- Unity
- 안드로이드스튜디오
- C++
- unorderedmap
- Java
- 광유다
- registerForActivityResult
- mutex
- ARface
- 유니티슈팅게임
- StartActivityForResult
- 포톤
- map
- unityAR
- 동기화
- 유니티
- 뮤텍스
- 스핀락
- 지크슈
- semaphore
- SpinLock
- list
- 세마포
- dependencyResilutionManagement
- Today
- Total
와와
[ 씹어먹는 C++ ] 쓰레드 풀 만들기 본문
씹어먹는 C++ - <15 - 5. C++ 쓰레드풀(ThreadPool) 만들기>
모두의 코드 씹어먹는 C++ - <15 - 5. C++ 쓰레드풀(ThreadPool) 만들기> 작성일 : 2019-05-19 이 글은 57869 번 읽혔습니다. 이번 강좌에서는에 대해 다룹니다.안녕하세요 여러분! 이번 강좌에서는 여태까지
modoocode.com
쓰레드풀이란, 쓰레드들을 위한 직업 소개소와 같은 개념이다.
여러 개의 쓰레드들이 대기하고 있다가, 할 일이 들어오게 되면, 대기하고 있던 쓰레드들 중 하나가 이를 받아서 실행하게 된다.
쓰레드를 보관할 장소
// 총 Worker 쓰레드의 개수.
size_t num_threads_;
// Worker 쓰레드를 보관하는 벡터.
std::vector<std::thread> worker_threads_;
할 일들을 보관할 장소 ( 큐 사용. 가장 오래 전에 추가된 작업을 쉽게 알아내기 위해 )
// 할일들을 보관하는 job 큐.
std::queue<std::function<void()>> jobs_;
queue 는 멀티 쓰레드 환경에서 안전하지 않기 때문에 이 queue 를 race condition 에서 보호할 장치들이 필요하다.
std::condition_variable cv_job_q_;
std::mutex m_job_q_;
Worker 쓰레드들을 종료시킬 조건을 나타내는 멤버 변수
// 모든 쓰레드 종료
bool stop_all;
생성자
num_threads 만큼의 쓰레드 생성
이 때 각 쓰레드들은 ThreadPool 에 정의된 WorkerThread 함수를 실행하게 된다.
ThreadPool::ThreadPool(size_t num_threads)
: num_threads_(num_threads), stop_all(false) {
worker_threads_.reserve(num_threads_);
for (size_t i = 0; i < num_threads_; ++i) {
worker_threads_.emplace_back([this]() { this->WorkerThread(); });
}
}
ThreadPool::WorkerThread()
반복문으로 job이 생길 때까지 대기하다가 job이 생기면 처리
void ThreadPool::WorkerThread() {
while (true) {
std::unique_lock<std::mutex> lock(m_job_q_);
cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
if (stop_all && this->jobs_.empty()) {
return;
}
// 맨 앞의 job 을 뺀다.
std::function<void()> job = std::move(jobs_.front());
jobs_.pop();
lock.unlock();
// 해당 job 을 수행한다 :)
job();
}
}
이때, wait을 통해 일이 생기거나 stop_all이 될 때까지 대기한다.
일이 생기면 가장 앞의 일을 꺼내 수행
stop_all이라면 종료한다.
작업 추가
void ThreadPool::EnqueueJob(std::function<void()> job) {
if (stop_all) {
throw std::runtime_error("ThreadPool 사용 중지됨");
}
{
std::lock_guard<std::mutex> lock(m_job_q_);
jobs_.push(std::move(job));
}
cv_job_q_.notify_one();
}
job을 추가하고 공유 변수를 통해 스레드 하나를 깨운다.
소멸자
ThreadPool::~ThreadPool() {
stop_all = true;
cv_job_q_.notify_all();
for (auto& t : worker_threads_) {
t.join();
}
}
stop_all을 켜주고, 모든 스레드들을 꺠운다.
각 스레드에서는 wait의 조건을 다시 검사한 뒤, 아래 조건문을 통해 return 됨
소멸자에서는 모든 스레드들에 대해 join으로 대기한다.
전체 코드
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
namespace ThreadPool {
class ThreadPool {
public:
ThreadPool(size_t num_threads);
~ThreadPool();
// job 을 추가한다.
void EnqueueJob(std::function<void()> job);
private:
// 총 Worker 쓰레드의 개수.
size_t num_threads_;
// Worker 쓰레드를 보관하는 벡터.
std::vector<std::thread> worker_threads_;
// 할일들을 보관하는 job 큐.
std::queue<std::function<void()>> jobs_;
// 위의 job 큐를 위한 cv 와 m.
std::condition_variable cv_job_q_;
std::mutex m_job_q_;
// 모든 쓰레드 종료
bool stop_all;
// Worker 쓰레드
void WorkerThread();
};
ThreadPool::ThreadPool(size_t num_threads)
: num_threads_(num_threads), stop_all(false) {
worker_threads_.reserve(num_threads_);
for (size_t i = 0; i < num_threads_; ++i) {
worker_threads_.emplace_back([this]() { this->WorkerThread(); });
}
}
void ThreadPool::WorkerThread() {
while (true) {
std::unique_lock<std::mutex> lock(m_job_q_);
cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
if (stop_all && this->jobs_.empty()) {
return;
}
// 맨 앞의 job 을 뺀다.
std::function<void()> job = std::move(jobs_.front());
jobs_.pop();
lock.unlock();
// 해당 job 을 수행한다 :)
job();
}
}
ThreadPool::~ThreadPool() {
stop_all = true;
cv_job_q_.notify_all();
for (auto& t : worker_threads_) {
t.join();
}
}
void ThreadPool::EnqueueJob(std::function<void()> job) {
if (stop_all) {
throw std::runtime_error("ThreadPool 사용 중지됨");
}
{
std::lock_guard<std::mutex> lock(m_job_q_);
jobs_.push(std::move(job));
}
cv_job_q_.notify_one();
}
} // namespace ThreadPool
void work(int t, int id) {
printf("%d start \n", id);
std::this_thread::sleep_for(std::chrono::seconds(t));
printf("%d end after %ds\n", id, t);
}
int main() {
ThreadPool::ThreadPool pool(3);
for (int i = 0; i < 10; i++) {
pool.EnqueueJob([i]() { work(i % 3 + 1, i); });
}
}
임의의 함수 받기
packagedtask 사용하여 일을 추가할 때 (EnqueueJob()) future 를 반환하여 저장하고 있다가 값을 돌려받는다.
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob(
F f, Args... args) {
if (stop_all) {
throw std::runtime_error("ThreadPool 사용 중지됨");
}
using return_type = typename std::result_of<F(Args...)>::type;
std::packaged_task<return_type()> job(std::bind(f, args...));
std::future<return_type> job_result_future = job.get_future();
{
std::lock_guard<std::mutex> lock(m_job_q_);
jobs_.push([&job]() { job(); });
}
cv_job_q_.notify_one();
return job_result_future;
}
shared_ptr 에 packaged_task 를 보관하여 job을 실행하는 시점에서도 packed_task 객체가 계속 살아있도록 한다.
make_shared 를 통해서 shared_ptr 을 생성하였고, 대신에 람다 함수에 shared_ptr 의 복사본을 전달해서 람다 함수 안에서도 packaged_task 의 shared_ptr 하나를 붙들고 있음
auto job =
std::make_shared<std::packaged_task<return_type()>>(std::bind(f, args...));
std::future<return_type> job_result_future = job->get_future();
{
std::lock_guard<std::mutex> lock(m_job_q_);
jobs_.push([job]() { (*job)(); });
}
완벽한 전달
아래와 같이 인자들의 복사본을 받지 않고
ThreadPool::EnqueueJob(F f, Args... args);
인자들을 우측값 레퍼런스로 바꾼다.
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
F&& f, Args&&... args);
bind 함수에 forward 로 인자를 전달하여 Enqueue 함수에 인자들을 완벽히 전달할 수 있게함
auto job = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
최최종
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
namespace ThreadPool {
class ThreadPool {
public:
ThreadPool(size_t num_threads);
~ThreadPool();
// job 을 추가한다.
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
F&& f, Args&&... args);
private:
// 총 Worker 쓰레드의 개수.
size_t num_threads_;
// Worker 쓰레드를 보관하는 벡터.
std::vector<std::thread> worker_threads_;
// 할일들을 보관하는 job 큐.
std::queue<std::function<void()>> jobs_;
// 위의 job 큐를 위한 cv 와 m.
std::condition_variable cv_job_q_;
std::mutex m_job_q_;
// 모든 쓰레드 종료
bool stop_all;
// Worker 쓰레드
void WorkerThread();
};
ThreadPool::ThreadPool(size_t num_threads)
: num_threads_(num_threads), stop_all(false) {
worker_threads_.reserve(num_threads_);
for (size_t i = 0; i < num_threads_; ++i) {
worker_threads_.emplace_back([this]() { this->WorkerThread(); });
}
}
void ThreadPool::WorkerThread() {
while (true) {
std::unique_lock<std::mutex> lock(m_job_q_);
cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
if (stop_all && this->jobs_.empty()) {
return;
}
// 맨 앞의 job 을 뺀다.
std::function<void()> job = std::move(jobs_.front());
jobs_.pop();
lock.unlock();
// 해당 job 을 수행한다 :)
job();
}
}
ThreadPool::~ThreadPool() {
stop_all = true;
cv_job_q_.notify_all();
for (auto& t : worker_threads_) {
t.join();
}
}
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob(
F&& f, Args&&... args) {
if (stop_all) {
throw std::runtime_error("ThreadPool 사용 중지됨");
}
using return_type = typename std::result_of<F(Args...)>::type;
auto job = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> job_result_future = job->get_future();
{
std::lock_guard<std::mutex> lock(m_job_q_);
jobs_.push([job]() { (*job)(); });
}
cv_job_q_.notify_one();
return job_result_future;
}
} // namespace ThreadPool
// 사용 예시
int work(int t, int id) {
printf("%d start \n", id);
std::this_thread::sleep_for(std::chrono::seconds(t));
printf("%d end after %ds\n", id, t);
return t + id;
}
int main() {
ThreadPool::ThreadPool pool(3);
std::vector<std::future<int>> futures;
for (int i = 0; i < 10; i++) {
futures.emplace_back(pool.EnqueueJob(work, i % 3 + 1, i));
}
for (auto& f : futures) {
printf("result : %d \n", f.get());
}
}