pool

包括了sql连接池和执行任务的线程池。

SqlConnRAII

这里使用RAII来管理和获取pool池资源,使得在构造时获取资源,在析构时释放资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

#ifndef SQLCONNRAII_H
#define SQLCONNRAII_H
#include "sqlconnpool.h"
#include <cassert>
#include <mysql/mysql.h>

class SqlConnRAII {
public:
SqlConnRAII(MYSQL **sql, SqlConnPool *connPool) {
assert(connPool);
//不能直接将指针赋值给另一个指针,因为指针本身就是一个值,而不是一个引用。
*sql = connPool->GetConn();
sql_ = *sql;
connpool_ = connPool;
}
~SqlConnRAII() {
if (sql_) {
connpool_->FreeConn(sql_);
}
}

private:
MYSQL *sql_;
SqlConnPool *connpool_;
};

#endif

SqlConnpool.h

这里的单例模式保证只有一个sql连接队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

#ifndef SQLCONNPOOL_H
#define SQLCONNPOOL_H

#include <mutex>
#include <queue>
#include <string>
#include <thread>

#include <mysql/mysql.h>
#include <semaphore.h> //POSIX信号量库

#include "../log/log.h"

class SqlConnPool {
public:
static SqlConnPool *Instance();

MYSQL *GetConn();
void FreeConn(MYSQL *conn);
int GetFreeConnCount();
void Init(const char *host, int port, const char *user, const char *pwd,
const char *dbName, int connSize);
void ClosePool();

private:
SqlConnPool();
~SqlConnPool();

private:
int MAX_CONN_;
int useCount_;
int freeCount_;

std::queue<MYSQL *> connQue_;
std::mutex mtx_;
sem_t semId_;
};

#endif

SqlConnpool.cpp

因为DB操作往往较慢,且频繁连接与断开数据库池不仅会引起大量开销,还可能引发异常。因此这次我们采用队列来建立数据库连接池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

#include "sqlconnpool.h"


SqlConnPool *SqlConnPool::Instance() {
static SqlConnPool connPool;
return &connPool;
}

MYSQL *SqlConnPool::GetConn() {
MYSQL *sql = nullptr;
if (connQue_.empty()) {
LOG_ERROR("SqlConnPool busy!");
return nullptr;
}
sem_wait(&semId_);
{
std::lock_guard<std::mutex> locker(mtx_);
sql = connQue_.front();
connQue_.pop();
}
return sql;
}
void SqlConnPool::FreeConn(MYSQL *conn) {
assert(conn);
std::lock_guard<std::mutex> locker(mtx_);
connQue_.push(conn);
sem_post(&semId_);
}
int SqlConnPool::GetFreeConnCount() {
std::lock_guard<std::mutex> locker(mtx_);
return connQue_.size();
}
void SqlConnPool::Init(const char *host, int port, const char *user,
const char *pwd, const char *dbName, int connSize = 10) {
assert(connSize > 0);
for (int i = 0; i < connSize; i++) {
MYSQL *sql = nullptr;
sql = mysql_init(sql);
if (!sql) {
LOG_ERROR("MySql init error!");
assert(sql); //因为无法初始化就无法正常执行数据库连接,因此需要输出错误位置结束程序
}
sql = mysql_real_connect(sql, host, user, pwd, dbName, port, nullptr, 0);
if (!sql) {
LOG_ERROR("MySql Connect error!");
}
connQue_.push(sql);
}
MAX_CONN_ = connSize;
sem_init(&semId_, 0, MAX_CONN_);
}
void SqlConnPool::ClosePool() {
std::lock_guard<std::mutex> locker(mtx_);
while (!connQue_.empty()) {
auto item = connQue_.front();
connQue_.pop();
mysql_close(item);
}
mysql_library_end(); //释放mysql资源,防止内存泄露
}

SqlConnPool::SqlConnPool() : useCount_(0), freeCount_(0) {}

SqlConnPool::~SqlConnPool() { ClosePool(); }

threadpool.cpp

  • 主线程通过将读写的操作交由线程来提升效率。

  • make_sharedshared_ptr的区别

    • make_shared 通常更高效,因为它减少了内存分配的次数。

    • make_shared 提供了异常安全的保障,当构造对象和 shared_ptr 的过程中抛出异常时,不会产生内存泄漏。

    • shared_ptr 可以管理任何动态分配的内存,而 make_shared 用于在栈上自动创建对象。

    • 使用 shared_ptr 时,可以更灵活地选择内存分配器和自定义删除器,而 make_shared 则没有这些灵活性。

  • 这里我们通过使用移动语义来避免复制操作,这篇文章详细解释了什么是万能引用以及完美转发是如何实现与万能引用的联动引用折叠和完美转发 - 知乎 (zhihu.com)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

#include <assert.h>
#include <utility>

class ThreadPool {
public:
explicit ThreadPool(size_t threadCount = 8)
: pool_(std::make_shared<Pool>()) {
assert(threadCount > 0);
for (size_t i = 0; i < threadCount; i++) {
std::thread([pool = pool_] {
std::unique_lock<std::mutex> locker(pool->mtx);
while (true) {
if (!pool->tasks.empty()) {
auto task = std::move(pool->tasks.front());
pool->tasks.pop();
locker.unlock();
task();
locker.lock();
} else if (pool->isClosed)
break;
else {
pool->cond.wait(locker);
}
}
}).detach();
}
}

//生成默认构造函数和移动构造函数
ThreadPool() = default;
ThreadPool(ThreadPool &&) = default;

~ThreadPool() {
if (pool_ != nullptr) {
std::lock_guard<std::mutex> locker(pool_->mtx);
pool_->isClosed = true;
}
pool_->cond.notify_all();
}

template <class F> void AddTask(F &&task) {
{
std::lock_guard<std::mutex> locker(pool_->mtx);
pool_->tasks.emplace(std::forward<F>(task));
}
pool_->cond.notify_one();
}

private:
struct Pool {
std::mutex mtx;
std::condition_variable cond;
bool isClosed;
std::queue<std::function<void()>> tasks;
};
std::shared_ptr<Pool> pool_;
};

#endif