pool
包括了sql
连接池和执行任务的线程
池。
SqlConnRAII
这里使用RAII来管理和获取pool池资源,使得在构造时获取资源,在析构时释放资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| #ifndef SQLCONNRAII_H #define SQLCONNRAII_H #include "sqlconnpool.h" #include <cassert> #include <mysql/mysql.h>
class SqlConnRAII { public: SqlConnRAII(MYSQL **sql, SqlConnPool *connPool) { assert(connPool); *sql = connPool->GetConn(); sql_ = *sql; connpool_ = connPool; } ~SqlConnRAII() { if (sql_) { connpool_->FreeConn(sql_); } }
private: MYSQL *sql_; SqlConnPool *connpool_; };
#endif
|
SqlConnpool.h
这里的单例模式保证只有一个sql连接队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| #ifndef SQLCONNPOOL_H #define SQLCONNPOOL_H
#include <mutex> #include <queue> #include <string> #include <thread>
#include <mysql/mysql.h> #include <semaphore.h>
#include "../log/log.h"
class SqlConnPool { public: static SqlConnPool *Instance();
MYSQL *GetConn(); void FreeConn(MYSQL *conn); int GetFreeConnCount(); void Init(const char *host, int port, const char *user, const char *pwd, const char *dbName, int connSize); void ClosePool();
private: SqlConnPool(); ~SqlConnPool();
private: int MAX_CONN_; int useCount_; int freeCount_;
std::queue<MYSQL *> connQue_; std::mutex mtx_; sem_t semId_; };
#endif
|
SqlConnpool.cpp
因为DB操作往往较慢,且频繁连接与断开数据库池不仅会引起大量开销,还可能引发异常。因此这次我们采用队列来建立数据库连接池。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| #include "sqlconnpool.h"
SqlConnPool *SqlConnPool::Instance() { static SqlConnPool connPool; return &connPool; }
MYSQL *SqlConnPool::GetConn() { MYSQL *sql = nullptr; if (connQue_.empty()) { LOG_ERROR("SqlConnPool busy!"); return nullptr; } sem_wait(&semId_); { std::lock_guard<std::mutex> locker(mtx_); sql = connQue_.front(); connQue_.pop(); } return sql; } void SqlConnPool::FreeConn(MYSQL *conn) { assert(conn); std::lock_guard<std::mutex> locker(mtx_); connQue_.push(conn); sem_post(&semId_); } int SqlConnPool::GetFreeConnCount() { std::lock_guard<std::mutex> locker(mtx_); return connQue_.size(); } void SqlConnPool::Init(const char *host, int port, const char *user, const char *pwd, const char *dbName, int connSize = 10) { assert(connSize > 0); for (int i = 0; i < connSize; i++) { MYSQL *sql = nullptr; sql = mysql_init(sql); if (!sql) { LOG_ERROR("MySql init error!"); assert(sql); } sql = mysql_real_connect(sql, host, user, pwd, dbName, port, nullptr, 0); if (!sql) { LOG_ERROR("MySql Connect error!"); } connQue_.push(sql); } MAX_CONN_ = connSize; sem_init(&semId_, 0, MAX_CONN_); } void SqlConnPool::ClosePool() { std::lock_guard<std::mutex> locker(mtx_); while (!connQue_.empty()) { auto item = connQue_.front(); connQue_.pop(); mysql_close(item); } mysql_library_end(); }
SqlConnPool::SqlConnPool() : useCount_(0), freeCount_(0) {}
SqlConnPool::~SqlConnPool() { ClosePool(); }
|
threadpool.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| #ifndef THREADPOOL_H #define THREADPOOL_H
#include <condition_variable> #include <functional> #include <mutex> #include <queue> #include <thread>
#include <assert.h> #include <utility>
class ThreadPool { public: explicit ThreadPool(size_t threadCount = 8) : pool_(std::make_shared<Pool>()) { assert(threadCount > 0); for (size_t i = 0; i < threadCount; i++) { std::thread([pool = pool_] { std::unique_lock<std::mutex> locker(pool->mtx); while (true) { if (!pool->tasks.empty()) { auto task = std::move(pool->tasks.front()); pool->tasks.pop(); locker.unlock(); task(); locker.lock(); } else if (pool->isClosed) break; else { pool->cond.wait(locker); } } }).detach(); } }
ThreadPool() = default; ThreadPool(ThreadPool &&) = default;
~ThreadPool() { if (pool_ != nullptr) { std::lock_guard<std::mutex> locker(pool_->mtx); pool_->isClosed = true; } pool_->cond.notify_all(); }
template <class F> void AddTask(F &&task) { { std::lock_guard<std::mutex> locker(pool_->mtx); pool_->tasks.emplace(std::forward<F>(task)); } pool_->cond.notify_one(); }
private: struct Pool { std::mutex mtx; std::condition_variable cond; bool isClosed; std::queue<std::function<void()>> tasks; }; std::shared_ptr<Pool> pool_; };
#endif
|