Buffer

这里的Buffer类是参考陈硕大佬的Linux多线程服务端编程

首先,我们为什么要设计buffer:

  1. 减少系统调用,频繁从用户态切换到内核态,会浪费大量的cpu时间。
  2. 流量控制,使得网络通信的速度和处理速度之间能够更好地匹配。
  3. 数据完整性在网络传输中,数据可能会分多次到达。缓冲区可以将这些分片的数据收集完整,然后再通知应用程序处理完整的数据。

那么,我们要尽可能的在一次系统调用中读取最多的数据,减少系统调用。同时,我们还需要考虑内存问题,让buffer缓冲区又不至于占用过多内存。

由于调用Buffer的用户是线程安全或只有一个线程对Buffer进行操作,且buffer内的操作不会产生竞态。也就是说我们的Buffer不是线程安全的,线程安全性的责任被传递给了调用者

关键图片,Buffer的构造

muduo库里,由bufferextrabuffer组成接收数据的缓冲区,其中extrabuffer是局部变量,用于存储额外的数据,尽可能读取更多数据。当extrabuffer有数据时,我们把他append到vector里。

muduo库buffer的快速了解,建议认真阅读muduo学习笔记:net部分之实现TCP网络编程库-Buffer_muduo::net::buffer-CSDN博客

buffer.h

为了锻炼自己的编程能力,我们作为cv工程师就直接先来写.h头文件,再自己实现.cpp文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#ifndef BUFFER_H
#define BUFFER_H

#include<iostream>
//#include<string>
#include<cstring> //perror
#include<vector>
#include<atomic>


#include<unistd.h>
#include<sys/uio.h> //iovec
#include<assert.h>

class Buffer {
public:
Buffer(int InitBufferSize = 1024);
~Buffer() = default;
//采用下标而不是指针,因为缓冲区重新分配会使vector重新分配内存
size_t WritableBytes() const;
size_t ReadableBytes() const;
size_t PrependableBytes() const;

const char* Peek() const; //返回可读的实际地址
void EnsureWriteable(size_t len);
void HasWritten(size_t len);

void Retrieve(size_t len);
void RetrieveUntil(const char* end);
void RetrieveAll();
std::string RetrieveAllToStr();

//有两个版本
const char* BeginWriteConst() const;
char * BeginWrite();

void Append(const std::string& str);
void Append(const char* str, size_t len);
void Append(const void* data, size_t len);
void Append(const Buffer& buff);

ssize_t ReadFd(int fd, int* SaveErrno); //从socket读取数据
ssize_t WriteFd(int fd, int* SaveErrno); //写入数据socket

private:
char* BeginPtr_();
const char* BeginPtr_() const;
void MakeSpace_(size_t len);

std::vector<char> buffer_;
std::atomic<std::size_t> readPos_;
std::atomic<std::size_t> writePos_;
};


#endif

buffer.c

在编写.cpp文件时:

  • 大部分函数声明 =

    Buffer+作用域符(::)

    {
    看buffer构造图写代码;

    }

  • 记得assert()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169

#include "buffer.h"

Buffer::Buffer(int InitBufferSize)
: buffer_(InitBufferSize), readPos_(0), writePos_(0) {}

size_t Buffer::WritableBytes() const
{
return buffer_.size() - Buffer::writePos_;
}
size_t Buffer::ReadableBytes() const
{
return writePos_ - readPos_;
}
size_t Buffer::PrependableBytes() const
{
return readPos_;
}

const char *Buffer::Peek() const
{
return BeginPtr_() + readPos_;
}

void Buffer::EnsureWriteable(size_t len)
{
if (WritableBytes() < len)
MakeSpace_(len);
// 写下断言确保分配成功
assert(WritableBytes() >= len);
}
void Buffer::HasWritten(size_t len)
{
writePos_ = writePos_ + len;
}

void Buffer::Retrieve(size_t len)
{
assert(len <= ReadableBytes());
readPos_ = readPos_ + len;
}
void Buffer::RetrieveUntil(const char *end)
{
// assert(end -Peek() <= ReadableBytes()); //原项目有这个,但是这里应该是不用的,因为我们可以判断可读和可写的具体数据在内存中的位置。
// readPos_ += end -Peek();
assert(Peek() <= end);
Retrieve(end - Peek());
}
void Buffer::RetrieveAll()
{
// bzero(&buffer_[0], buffer_.size());
readPos_ = writePos_ = 0;
}

std::string Buffer::RetrieveAllToStr()
{
// std::string str(Peek(), writePos_ - readPos_);
// readPos_ = writePos_ = 0;
std::string str(Peek(), ReadableBytes());
RetrieveAll();
return str;
}

const char *Buffer::BeginWriteConst() const
{
return BeginPtr_() + writePos_;
}
char *Buffer::BeginWrite()
{
return BeginPtr_() + writePos_;
}

void Buffer::Append(const std::string &str)
{
Append(str.data(), str.length());
// c_str() 函数返回的指针可以确保指向以 null 结尾的字符串,适用于传递给需要以 C 风格字符串为参数的函数,而 data() 函数返回的指针可能不以 null 结尾,适用于需要访问字符串数据但不需要 null 结尾的场景。
// 一般来说data() 比 c_str() 更快,因为data()不需要创建副本,在buffer这种追求性能的场景中,我们使用data();
}

void Buffer::Append(const char *str, size_t len)
{
assert(str);
EnsureWriteable(len);
std::copy(str, str + len, BeginWrite());
HasWritten(len);
}

void Buffer::Append(const void *data, size_t len)
{
// std::copy(static_cast<const char *>(data),Peek(), len);
// HasWritten(len);
assert(data);
Append(static_cast<const char *>(data), len);
}

void Buffer::Append(const Buffer &buff)
{
// std::copy(buff.Peek() + readPos_,Peek(),buff.writePos_ - buff.readPos_);
// HasWritten(buff.writePos_ - buff.readPos_);
Append(buff.Peek(), buff.ReadableBytes());
}

ssize_t Buffer::ReadFd(int fd, int *SaveErrno) // 从socket读取数据
{
char buff[65535];
struct iovec iov[2]; // iovec 是一个在 POSIX 兼容系统中的结构体,用于在一次系统调用中读写多个非连续的内存区域。
const size_t writable = WritableBytes();
iov[0].iov_base = BeginPtr_() + writePos_;
iov[0].iov_len = writable;
iov[1].iov_base = &buff;
iov[1].iov_len = sizeof(buff);

const ssize_t len = readv(fd, iov, 2);
if (len <= 0)
{
*SaveErrno = errno; // 保存错误码,防止错误码被覆盖
}
else if (static_cast<size_t>(len) <= writable)
{
HasWritten(len);
}
else
{
writePos_ = buffer_.size();
Buffer::Append(buff, len - writable);
}
return len;
}

ssize_t Buffer::WriteFd(int fd, int *SaveErrno) // 写入数据socket
{

ssize_t len = write(fd, Peek(), ReadableBytes());
if (len <= 0)
{
*SaveErrno = errno; // 保存错误码,防止错误码被覆盖
return len;
}
Retrieve(len);
return len;
}

char *Buffer::BeginPtr_()
{
return &*buffer_.begin();
}
const char *Buffer::BeginPtr_() const
{
return &*buffer_.begin();
}
void Buffer::MakeSpace_(size_t len)
{ // 分为移动readable数据能找到空闲空间和一定需要分配空间两种情况
if (WritableBytes() + PrependableBytes() < len)
{
buffer_.resize(writePos_ + len + 1); // vector从0开始,要+1
}
else
{
size_t readable = ReadableBytes();
std::copy(Peek(), Peek() + readable, BeginPtr_());
readPos_ = 0;
writePos_ = readPos_ + readable;
assert(readable == ReadableBytes());
}
}
// std::vector<char> buffer_;
// std::atomic<std::size_t> readPos_;
// std::atomic<std::size_t> writePos_;