Log

日志系统是各大项目的基石,可以帮我们进行调试、错误定位。当然,看着日志系统产生一行行输出也是一种享受。

今天发现代码补全有点问题,经过一番搜索,发现在wsl2上使用clangd比自带的补全强多了,强推(虽然最后发现是模板写错了)!

阻塞队列

  • 同步变量

    • 线程在某些条件得到满足之前挂起
    • notify_one:随机选择一个线程唤醒。
    • notify_all:唤醒所有在条件变量上等待的线程,可能会引起惊群效应
  • 互斥锁:防止多个线程同时访问共享资源。

  • C++11多线程之互斥量(mutex)与条件变量(condition_variable)_c++ 条件变量和互斥-CSDN博客

  • 生产者消费者模型:生产者线程通过 push_backpush_front 向队列中添加元素,消费者线程通过 pop 从队列中取出元素。条件变量的使用确保了生产者在队列已满时会等待,消费者在队列为空时会等待,并且在有元素可取时唤醒。

  • 模板:通过使用模板在编译时生成代码,使得各种数据结构都能使用该阻塞队列。

  • 阻塞队列是一种非常有用的工具,它可以简化生产者-消费者模式的实现,提高系统资源的利用率,并实现线程同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200

#ifndef BLOCKQUEUE_H
#define BLOCKQUEUE_H

#include <chrono>
#include <mutex> //互斥量
#include <deque> //双端队列
#include <condition_variable> //条件变量
#include <sys/time.h>
#include <assert.h>
//template class<T> //写错了,找了半天代码补全的问题
template <class T>
class BlockDeque{
public:
explicit BlockDeque(size_t MaxCapacity = 1000);

~BlockDeque();

void clear();

bool empty();

bool full();

void Close();

ssize_t size();

ssize_t capacity();

T front();

T back();

void push_back(const T &item);

void push_front(const T &item);

bool pop(T &item);

bool pop(T &item, int timeout);

void flush();

private:
std::deque<T> deq_;

size_t capacity_;

std::mutex mtx_;

bool isClose_;

std::condition_variable condConsumer_;

std::condition_variable condProducer_;
};

template<class T>
BlockDeque<T>::BlockDeque(size_t MaxCapacity)
:capacity_(MaxCapacity)
{
assert(MaxCapacity > 0);
isClose_ = false;
}

template<class T>
BlockDeque<T>::~BlockDeque()
{
Close();
}

template<class T>
void BlockDeque<T>::clear()
{
std::lock_guard<std::mutex> locker(mtx_);
deq_.clear();
}

template<class T>
bool BlockDeque<T>::empty()
{
std::lock_guard<std::mutex> locker(mtx_);
return deq_.empty();
}

template<class T>
bool BlockDeque<T>::full()
{
std::lock_guard<std::mutex> locker(mtx_);
// if(deq_.size()==capacity_)
// return true;
// return false;
return deq_.size() >= capacity_;
}

template<class T>
void BlockDeque<T>::Close()
{
{
std::lock_guard<std::mutex> locker(mtx_);
deq_.clear();
isClose_ = true;
}
condProducer_.notify_all();
condConsumer_.notify_all();
}

template<class T>
ssize_t BlockDeque<T>::size()
{
std::lock_guard<std::mutex> locker(mtx_);
return deq_.size();
}

template<class T>
ssize_t BlockDeque<T>::capacity()
{
std::lock_guard<std::mutex> locker(mtx_);
return capacity_;
}

template<class T>
T BlockDeque<T>::front()
{
std::lock_guard<std::mutex> locker(mtx_);
return deq_.front();
}

template<class T>
T BlockDeque<T>::back()
{
std::lock_guard<std::mutex> locker(mtx_);
return deq_.back();
}

template<class T>
void BlockDeque<T>::push_back(const T &item)
{
//考虑容量问题
std::unique_lock<std::mutex> locker(mtx_);
while(deq_.size() >= capacity_)
{
condProducer_.wait(locker);
}
deq_.push_back(item);
condConsumer_.notify_one();
}

template<class T>
void BlockDeque<T>::push_front(const T &item)
{
std::unique_lock<std::mutex> locker(mtx_);
while(deq_.size() >= capacity_)
{
condProducer_.wait(locker);
}
deq_.push_front(item);
condConsumer_.notify_one();
}

template<class T>
bool BlockDeque<T>::pop(T &item)
{
std::unique_lock<std::mutex> locker(mtx_);
while(deq_.empty())
{
condConsumer_.wait(locker);
if(isClose_) return false;
}
item = deq_.front(); //注意先取队列再弹出;
deq_.pop_front();
condProducer_.notify_one();
return true;
}

template<class T>
bool BlockDeque<T>::pop(T &item, int timeout)
{
std::unique_lock<std::mutex> locker(mtx_);
while(deq_.empty())
{
if(condConsumer_.wait_for(locker,std::chrono::seconds(timeout))
== std::cv_status::timeout) return false;

if(isClose_) return false;
}
item = deq_.front(); //注意先取队列再弹出;
deq_.pop_front();
condProducer_.notify_one();
return true;
}

template<class T>
void BlockDeque<T>::flush()
{
condConsumer_.notify_one();
}

#endif

LoG

  • 这里我们使用阻塞队列,由于日志通常是异步执行,且日志操作由于涉及IO操作往往较慢。当日志产生速度远远大于日志写入速度时,阻塞队列可以作为一个缓冲区,暂时存储大量的日志消息。

  • 由于有时候我们需要自定义格式化字符串,传入的参数不定,那么,我们就需要在如cpp实现可变参数:

    • cpp方法(C++11),使用可变参数模板。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    #include <iostream>
    template <typename... Args>
    void print(const Args&... args)
    {
    (std::cout << ... << args) << std::endl;;
    }
    int main()
    {
    print(1,2,3,4,5);
    return 0;
    }
  • c语言方法,使用va_list机制来实现可变参数。Log同时使用了c语言的这两种方法。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    #include <stdio.h>
    #include <stdarg.h>

    //通过宏实现
    #define MACRO_LOG(format, ...) printf("[MACRO_LOG] "); printf(format, ##__VA_ARGS__)

    //通过va_list实现
    void valist_log(const char* format,...)
    {
    va_list args;
    va_start(args, format);
    printf("[valist_log] ");
    vprintf(format,args);
    va_end(args);
    }

    int main() {
    int x = 10;
    double y = 3.14;
    char *str = "Hello, World!";

    MACRO_LOG("x = %d, y = %.2f, str = %s\n", x, y, str);


    valist_log("x = %d, y = %.2f, str = %s\n", x, y, str);

    return 0;
    }
  • 为什么pop要检查close变量而push不用?
    当调用close时,会清空队列。push进的元素马上就被消费者吃掉。而pop会因为队列无元素而产生异常。

  • snprintfvsnprintf

    • 使用 snprintf 时,你需要提前定义好格式化字符串,并将其作为固定参数传递给 snprintf
    • 使用 vsnprintf 时,你可以直接在调用时传递格式化字符串和可变参数列表,不需要提前定义格式化字符串。

LOG_H

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80

#ifndef LOG_H
#define LOG_H

#include <bits/types/FILE.h>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

#include <string.h>
#include <sys/time.h>
#include <stdarg.h> //提供了C标准库中的宏va_start, va_end等,用于处理可变参数函数。
#include <assert.h>
#include <sys/stat.h> //linux文件系统头文件

#include "../buffer/buffer.h"
#include "./blockqueue.h"

class Log{
public:
void init(int level = 1, const char* path = "./log",
const char* suffix = ".log",
int maxQueueCapacity = 1024);
static Log* Instance(); //单例模式
static void FlushLogThread();

void write(int level, const char *format, ...);
void flush();

int GetLevel();
void SetLevel(int level);
bool IsOpen() { return isOpen_; }
private:
Log();
void AppendLogLevelTitle_(int level);
virtual ~Log(); //TODO
void AsyncWrite_();

private:
static const int LOG_PATH_LEN = 256;
static const int LOG_NAME_LEN = 256;
static const int MAX_LINES = 50000;

const char* path_;
const char* suffix_;

int MAX_LINES_;

int lineCount_;
int toDay_;

bool isOpen_;

Buffer buff_;
int level_;
bool isAsync_;

FILE* fp_;
std::unique_ptr<BlockDeque<std::string>> deque_;
std::unique_ptr<std::thread> writeThread_;
std::mutex mtx_;
};

#define LOG_BASE(level, format, ...) \
do {\
Log* log = Log::Instance();\
if (log->IsOpen() && log->GetLevel() <= level) {\
log->write(level, format, ##__VA_ARGS__); \
log->flush();\
}\
} while(0);

//TODO
#define LOG_DEBUG(format, ...) do {LOG_BASE(0, format, ##__VA_ARGS__)} while(0);
#define LOG_INFO(format, ...) do {LOG_BASE(1, format, ##__VA_ARGS__)} while(0);
#define LOG_WARN(format, ...) do {LOG_BASE(2, format, ##__VA_ARGS__)} while(0);
#define LOG_ERROR(format, ...) do {LOG_BASE(3, format, ##__VA_ARGS__)} while(0);

#endif

LOG_CPP

  • 这里使用了移动操作来提高效率
1
2
3
4
5
6
7
8
std::unique_ptr<BlockDeque<std::string>>	newDequeue(new BlockDeque<std::string>);
deque_ = move(newDequeue); //move避免不必要的对象复制
std::unique_ptr<std::thread> newThread(new std::thread(FlushLogThread)); // 创建一个新的线程对象并使用函数 FlushLogThread 初始化它。
writeThread_ = move(newThread); //move避免不必要的对象复制

//下面是错误的,原来的资源并未释放
//std::unique_ptr<BlockDeque<std::string>> deque_(new BlockDeque<std::string>);
//std::unique_ptr<std::thread> writeThread_(new std::thread(FlushLogThread));
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199

#include "log.h"


void Log::init(int level, const char *path, const char *suffix,
int maxQueueCapacity) {
isOpen_ = true;
level_ = level;
//处理是同步日志还是异步日志
if (maxQueueCapacity > 0) {
isAsync_ = true;
if (!deque_) {
std::unique_ptr<BlockDeque<std::string>> newDequeue(
new BlockDeque<std::string>);
deque_ = move(newDequeue);
std::unique_ptr<std::thread> newThread(new std::thread(FlushLogThread));
writeThread_ = move(newThread);
}
} else {
isAsync_ = false;
}

lineCount_ = 0;
time_t timer = std::time(nullptr);
struct tm *sysTime = localtime(&timer);
struct tm t = *sysTime;
path_ = path;
suffix_ = suffix;

char fileName[LOG_NAME_LEN] = {0};
snprintf(fileName, LOG_NAME_LEN, "%s/%04d_%02d_%02d%s", path_,
t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, suffix_);
toDay_ = t.tm_mday;

{
std::lock_guard<std::mutex> locker(mtx_);
buff_.RetrieveAll(); //清空缓冲区
if (fp_) //关闭上次未关闭的文件描述符
{
flush();
fclose(fp_);
}

fp_ = fopen(fileName, "a");
if (fp_ == nullptr) {
mkdir(path_, 0777);
fp_ = fopen(fileName, "a");
}

assert(fp_ != nullptr);
}
}


Log *Log::Instance() {
static Log inst;
return &inst;
}

void Log::FlushLogThread() {
//切换为同步模式写完日志
Log::Instance()->AsyncWrite_();
}

void Log::write(int level, const char *format, ...) {
struct timeval now = {0, 0};
//通过将 now 初始化为 {0, 0},表示当前时间被设置为 Epoch(1970-01-01 00:00:00
//UTC)。
gettimeofday(&now, nullptr);
time_t tSec = now.tv_sec;
struct tm *systime = localtime(&tSec);
struct tm t = *systime;

va_list vaList;

//若日志日期或日志长度超过,我们需要重建文件
if (toDay_ != t.tm_mday || (lineCount_ && (lineCount_ % MAX_LINES == 0))) {
std::unique_lock<std::mutex> locker(mtx_);
locker.unlock();
char newfileName[LOG_NAME_LEN];
char tail[36] = {0};
snprintf(tail, 36, "%04d_%02d_%02d", t.tm_year + 1900, t.tm_mon + 1,
t.tm_mday);

if (toDay_ != t.tm_mday) {
snprintf(newfileName, LOG_NAME_LEN - 72, "%s/%s%s", path_, tail, suffix_);
toDay_ = t.tm_mday;
lineCount_ = 0;
} else {
snprintf(newfileName, LOG_NAME_LEN - 72, "%s/%s-%d%s", path_, tail,
(lineCount_ / MAX_LINES), suffix_);
}

locker.lock();
flush();
fclose(fp_);
fp_ = fopen(newfileName, "a");
assert(fp_ != nullptr);
}

{
std::unique_lock<std::mutex> locker(mtx_);
lineCount_++;
int n =
snprintf(buff_.BeginWrite(), 128, "%d-%02d-%02d %02d:%02d:%02d.%06ld",
t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min,
t.tm_sec, now.tv_usec);
buff_.HasWritten(n);
AppendLogLevelTitle_(level_);

va_start(vaList, format);
int m =
vsnprintf(buff_.BeginWrite(), buff_.WritableBytes(), format, vaList);
va_end(vaList);
buff_.HasWritten(m);
buff_.Append("\n\0", 2);
if (isAsync_ && deque_ && !deque_->full()) {
deque_->push_back(buff_.RetrieveAllToStr());
} else {
fputs(buff_.Peek(), fp_);

}
buff_.RetrieveAll();
}

// TODO这里频繁开关文件的原因
//及时更新日志,防止webserver意外终止时日志丢失
}
void Log::flush() {
if (isAsync_) {
deque_->flush();
}
fflush(fp_); //清空流
}

int Log::GetLevel() {
std::unique_lock<std::mutex> locker(mtx_);
return level_;
}
void Log::SetLevel(int level) {
std::unique_lock<std::mutex> locker(mtx_);
level_ = level;
}

// bool IsOpen() { return isOpen_; }

Log::Log() {
lineCount_ = 0;
toDay_ = 0;
isAsync_ = false;
fp_ = nullptr;
deque_ = nullptr;
writeThread_ = nullptr;
}

void Log::AppendLogLevelTitle_(int level) {

switch (level) {
case 0:
buff_.Append("[debug]: ", 9);
break;
case 1:
buff_.Append("[info] : ", 9);
break;
case 2:
buff_.Append("[warn] : ", 9);
break;
case 3:
buff_.Append("[error]: ", 9);
break;
default:
buff_.Append("[info] : ", 9);
break;
}
}

Log::~Log() {
if (writeThread_ && writeThread_->joinable()) {
std::lock_guard<std::mutex> locker(mtx_);
while (!deque_->empty()) {
deque_->flush();
}
deque_->Close();
writeThread_->join();
}
if (fp_) {
std::lock_guard<std::mutex> locker(mtx_);
flush();
fclose(fp_);
}
}
void Log::AsyncWrite_() {
std::string str = "";
while (deque_->pop(str)) {
std::lock_guard<std::mutex> locker(mtx_);
fputs(str.c_str(), fp_);
}
}