admin管理员组文章数量:1584216
文章目录
- 一、为什么用户层需要设计网络缓冲区
- 1)设计读缓冲区的必要
- 2)设计写缓冲区的必要
- 3)普通buf+pos的问题
- 4)ringbuffer(只解决了普通buf移动频繁的问题)
- (1)头文件
- (2).c文件
- 二、ringbuffer以及多线程换将下数据安全思考
- 1)1读1写->内存屏障
- 2) 多线程(多读多写->需要加锁且加内存屏障)
- 3)不需要加内存屏障的情况(加锁加了pthread_spinlock,自旋锁就不加)
- 三、libevent中数据缓冲Bufferevent的设计
- 1)背景介绍(bufferevent\evbuffer\evbuffer_chain)
- 2)解释
- 3)源码展示
- 使用场景举例
kFIFO、 无锁队列、网络缓冲区设计
一、为什么用户层需要设计网络缓冲区
1)设计读缓冲区的必要
因为TCP无法判定从内核缓冲区读出来的数据包是一个完整的数据包,得用特殊字符来分隔数据包
2)设计写缓冲区的必要
如果写缓冲区数据满了(linux内核发送数据比我们往缓冲区写数据慢),写失败,那就要把数据在用户层缓存起来
3)普通buf+pos的问题
1)数据移动比较频繁
2)用户层缓冲区设置的是16k(固定分配的内存,基本要求:消费者速度必须得快于生产者)
4)ringbuffer(只解决了普通buf移动频繁的问题)
(1)头文件
#ifndef _RINGBUFFER_H__
#define _RINGBUFFER_H__
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <stdio.h>
#define min(lth , rth) ((lth) < (rth) ? (lth): (rth))
typedef struct
{
uint8_t* buf; //buffer
uint32_t size; //大小
uint32_t read_pos; //读的位置,也是循环的缓冲区,从0到2^32-1
uint32_t write_pos; //写的位置
}ringbuffer_t;
//接口总结:1.初始化,2.判断是否为空,3.判断满,4.释放内存
//基础接口
void rb_init(ringbuffer_t* t ,uint32_t sz);
int32_t rb_is_empty(ringbuffer_t* t);
int32_t rb_is_full(ringbuffer_t* t);
void rb_free(ringbuffer_t* t);
uint32_t rb_length(ringbuffer_t* t);
uint32_t rb_reamin(ringbuffer_t* t);
//读写数据接口
uint32_t rb_write(ringbuffer_t* dst_t,void* src_buf,uint32_t src_sz);
uint32_t rb_read(ringbuffer_t* src_t,void* dst_buf,uint32_t dst_sz);
//转换成2的n次幂
static inline uint32_t roundup_power_of_two(uint32_t sz)
{
if(sz < 2) return sz = 2;
//5 -> 8 101 -> 1000
int i = 0;
for(i = 0;sz != 0;++i)
sz>>1;
return 1 << i;
}
//判断是否是2的n次幂
static inline int32_t is_power_of_two(uint32_t sz)
{
if(sz < 2) return -1;
//m%n = m & (n-1)
//m%n = m - n * floor(m/n)
return (sz & (sz - 1)) == 0;
}
#endif
(2).c文件
#include "ringbuffer.h"
int32_t rb_is_empty(ringbuffer_t* t)
{
return t->read_pos == t->write_pos;
}
int32_t rb_is_full(ringbuffer_t* t)
{
//优化前
//return (t->read_pos+t->size)%t->size == t->write_pos;
//优化前提:
//1)缓冲区的大小必须是2的n次幂 (UInt3_t大小为2^32-1,范围是0到2^32 -1)
return t->size == t->write_pos - t->read_pos;
}
void rb_init(ringbuffer_t* t ,uint32_t sz)
{
t->buf = (uint8_t*)malloc(sz*sizeof(uint8_t*));
t->size = sz;
t->read_pos = t->write_pos = UINT32_MAX -2;
return;
}
void rb_free(ringbuffer_t* t)
{
if(t->buf)
{
free(t->buf);
t->buf = NULL;
}
t->read_pos = t->write_pos = t->size = 0;
return;
}
uint32_t rb_length(ringbuffer_t* t)
{
return t->write_pos - t->read_pos;
}
uint32_t rb_reamin(ringbuffer_t* t)
{
return t->size - rb_length(t);
}
uint32_t rb_write(ringbuffer_t* dst_t,void* src_buf,uint32_t src_sz)
{
if(src_sz > rb_reamin(dst_t))
return -1;
//dst_t->write_pos & (dst_t->size - 1) 把写坐标弄到这个圆环的范围之内
//看哪个剩余空间最小
//安全的拷贝
uint32_t i = min(src_sz,dst_t->size - dst_t->write_pos & (dst_t->size - 1));
//先拷贝后面的,再拷贝前面的
memcpy(dst_t->buf + (dst_t->write_pos & (dst_t->size - 1)), src_buf,i);
memcpy(dst_t->buf,src_buf + i,src_sz - i);//这里src_sz - i是负数是不会拷贝的
dst_t->write_pos += src_sz;
return src_sz;
}
uint32_t rb_read(ringbuffer_t* src_t,void* dst_buf,uint32_t dst_sz)
{
if(rb_is_empty(src_t))
return -1;
uint32_t i = 0;
dst_sz = min(dst_sz,rb_length(src_t) );
//rb_length(src_t)为实际的大小
//dst_sz为预期的大小
i = min(dst_sz,(src_t->size - src_t->read_pos & (src_t->size - 1)));
//src_t->read_pos & (src_t->size - 1))表示read_pos在size的位置
//src_t->size - src_t->read_pos & (src_t->size - 1)表示求出右边空余位置的大小
memcpy(dst_buf,src_t->buf + (src_t->read_pos & (src_t->size - 1)),i);
memcpy(dst_buf + i,src_t->buf,dst_sz - i);
src_t->read_pos +=dst_sz;
return dst_sz;
}
int main()
{
ringbuffer_t rb;
rb_init(&rb,9);
uint32_t put1 = rb_write(&rb,(void*)("mark"),4);
printf("put1 = %u,wr = %u,rd = %u,length = %u\n",put1,rb.write_pos,rb.read_pos,rb_length(&rb));
return 0;
}
二、ringbuffer以及多线程换将下数据安全思考
1)1读1写->内存屏障
uint32_t rb_write(ringbuffer_t* dst_t,void* src_buf,uint32_t src_sz)
{
if(src_sz > rb_reamin(dst_t))
return -1;
//内存屏障
#ifdef USE_MEMPROTECT
//确保开始移动buffer的时候,其他线程可以读到最新的read_pos
smp_mb(); //atomic_thread_fence()
#endif
//dst_t->write_pos & (dst_t->size - 1) 把写坐标弄到这个圆环的范围之内
//看哪个剩余空间最小
//安全的拷贝
uint32_t i = min(src_sz,dst_t->size - dst_t->write_pos & (dst_t->size - 1));
//先拷贝后面的,再拷贝前面的
memcpy(dst_t->buf + (dst_t->write_pos & (dst_t->size - 1)), src_buf,i);
memcpy(dst_t->buf,src_buf + i,src_sz - i);//这里src_sz - i是负数是不会拷贝的
#ifdef USE_MEMPROTECT
//确保write_pos不会被优化到前面去
smp_wmb();//放在dst_t->write_pos += src_sz;之前,确保memcpy正确拷贝
#endif
dst_t->write_pos += src_sz;
return src_sz;
}
uint32_t rb_read(ringbuffer_t* src_t,void* dst_buf,uint32_t dst_sz)
{
if(rb_is_empty(src_t))
return -1;
#ifdef USE_MEMPROTECT
smp_mb(); //atomic_thread_fence()
#endif
uint32_t i = 0;
dst_sz = min(dst_sz,rb_length(src_t) );
//rb_length(src_t)为实际的大小
//dst_sz为预期的大小
i = min(dst_sz,(src_t->size - src_t->read_pos & (src_t->size - 1)));
//src_t->read_pos & (src_t->size - 1))表示read_pos在size的位置
//src_t->size - src_t->read_pos & (src_t->size - 1)表示求出右边空余位置的大小
memcpy(dst_buf,src_t->buf + (src_t->read_pos & (src_t->size - 1)),i);
memcpy(dst_buf + i,src_t->buf,dst_sz - i);
#ifdef USE_MEMPROTECT
smp_wmb();//放在dst_t->write_pos += src_sz;之前,确保memcpy正确拷贝
#endif
src_t->read_pos +=dst_sz;
return dst_sz;
}
2) 多线程(多读多写->需要加锁且加内存屏障)
3)不需要加内存屏障的情况(加锁加了pthread_spinlock,自旋锁就不加)
三、libevent中数据缓冲Bufferevent的设计
1)背景介绍(bufferevent\evbuffer\evbuffer_chain)
- 基础概念
每个 bufferevent 都有一个输入缓冲区和一个输出缓冲区 ,它们的类型都是“struct evbuffer”。 有数据要写入到 bufferevent 时,添加数据到输出缓冲区 ;bufferevent 中有数据供读取的时候,从输入缓冲区抽取(drain)数据。evbuffer 接口支持很多种操作,后面的章节将讨论这些操作。
2)解释
结构体bufferevent下面的evbuffer是个链表结构体,这个链表的每个节点都是挂在不通缓冲区里面,链表的每个节点是struct evbuffer_chain,每个节点的unsigned char *buffer;指向分配的空间
3)源码展示
- bufferevent结构体
struct bufferevent {
/** Event base for which this bufferevent was created. */
struct event_base *ev_base;
/** Pointer to a table of function pointers to set up how this
bufferevent behaves. */
const struct bufferevent_ops *be_ops;
/** A read event that triggers when a timeout has happened or a socket
is ready to read data. Only used by some subtypes of
bufferevent. */
struct event ev_read;
/** A write event that triggers when a timeout has happened or a socket
is ready to write data. Only used by some subtypes of
bufferevent. */
struct event ev_write;
/** An input buffer. Only the bufferevent is allowed to add data to
this buffer, though the user is allowed to drain it. */
struct evbuffer *input;
/** An input buffer. Only the bufferevent is allowed to drain data
from this buffer, though the user is allowed to add it. */
struct evbuffer *output;
struct event_watermark wm_read;
struct event_watermark wm_write;
bufferevent_data_cb readcb;
bufferevent_data_cb writecb;
/* This should be called 'eventcb', but renaming it would break
* backward compatibility */
bufferevent_event_cb errorcb;
void *cbarg;
struct timeval timeout_read;
struct timeval timeout_write;
/** Events that are currently enabled: currently EV_READ and EV_WRITE
are supported. */
short enabled;
};
- evbuffer结构体
struct evbuffer {
/** The first chain in this buffer's linked list of chains. */
struct evbuffer_chain *first;
/** The last chain in this buffer's linked list of chains. */
struct evbuffer_chain *last;
/** Pointer to the next pointer pointing at the 'last_with_data' chain.
*
* To unpack:
*
* The last_with_data chain is the last chain that has any data in it.
* If all chains in the buffer are empty, it is the first chain.
* If the buffer has no chains, it is NULL.
*
* The last_with_datap pointer points at _whatever 'next' pointer_
* pointing at the last_with_data chain. If the last_with_data chain
* is the first chain, or it is NULL, then the last_with_datap pointer
* is &buf->first.
*/
struct evbuffer_chain **last_with_datap;
/** Total amount of bytes stored in all chains.*/
size_t total_len;
/** Number of bytes we have added to the buffer since we last tried to
* invoke callbacks. */
size_t n_add_for_cb;
/** Number of bytes we have removed from the buffer since we last
* tried to invoke callbacks. */
size_t n_del_for_cb;
#ifndef EVENT__DISABLE_THREAD_SUPPORT
/** A lock used to mediate access to this buffer. */
void *lock;
#endif
/** True iff we should free the lock field when we free this
* evbuffer. */
unsigned own_lock : 1;
/** True iff we should not allow changes to the front of the buffer
* (drains or prepends). */
unsigned freeze_start : 1;
/** True iff we should not allow changes to the end of the buffer
* (appends) */
unsigned freeze_end : 1;
/** True iff this evbuffer's callbacks are not invoked immediately
* upon a change in the buffer, but instead are deferred to be invoked
* from the event_base's loop. Useful for preventing enormous stack
* overflows when we have mutually recursive callbacks, and for
* serializing callbacks in a single thread. */
unsigned deferred_cbs : 1;
#ifdef _WIN32
/** True iff this buffer is set up for overlapped IO. */
unsigned is_overlapped : 1;
#endif
/** Zero or more EVBUFFER_FLAG_* bits */
ev_uint32_t flags;
/** Used to implement deferred callbacks. */
struct event_base *cb_queue;
/** A reference count on this evbuffer. When the reference count
* reaches 0, the buffer is destroyed. Manipulated with
* evbuffer_incref and evbuffer_decref_and_unlock and
* evbuffer_free. */
int refcnt;
/** A struct event_callback handle to make all of this buffer's callbacks
* invoked from the event loop. */
struct event_callback deferred;
/** A doubly-linked-list of callback functions */
LIST_HEAD(evbuffer_cb_queue, evbuffer_cb_entry) callbacks;
/** The parent bufferevent object this evbuffer belongs to.
* NULL if the evbuffer stands alone. */
struct bufferevent *parent;
};
- 结构体evbuffer_chain
struct evbuffer_chain {
/** points to next buffer in the chain */
struct evbuffer_chain *next;
/** total allocation available in the buffer field. */
size_t buffer_len;
/** unused space at the beginning of buffer or an offset into a
* file for sendfile buffers. */
ev_misalign_t misalign;
/** Offset into buffer + misalign at which to start writing.
* In other words, the total number of bytes actually stored
* in buffer. */
size_t off;
/** Set if special handling is required for this chain */
unsigned flags;
#define EVBUFFER_FILESEGMENT 0x0001 /**< A chain used for a file segment */
#define EVBUFFER_SENDFILE 0x0002 /**< a chain used with sendfile */
#define EVBUFFER_REFERENCE 0x0004 /**< a chain with a mem reference */
#define EVBUFFER_IMMUTABLE 0x0008 /**< read-only chain */
/** a chain that mustn't be reallocated or freed, or have its contents
* memmoved, until the chain is un-pinned. */
#define EVBUFFER_MEM_PINNED_R 0x0010
#define EVBUFFER_MEM_PINNED_W 0x0020
#define EVBUFFER_MEM_PINNED_ANY (EVBUFFER_MEM_PINNED_R|EVBUFFER_MEM_PINNED_W)
/** a chain that should be freed, but can't be freed until it is
* un-pinned. */
#define EVBUFFER_DANGLING 0x0040
/** a chain that is a referenced copy of another chain */
#define EVBUFFER_MULTICAST 0x0080
/** number of references to this chain */
int refcnt;
/** Usually points to the read-write memory belonging to this
* buffer allocated as part of the evbuffer_chain allocation.
* For mmap, this can be a read-only buffer and
* EVBUFFER_IMMUTABLE will be set in flags. For sendfile, it
* may point to NULL.
*/
unsigned char *buffer;
};
版权声明:本文标题:服务器正文16:缓冲区设计(ringbuffer、用libevent配合ringbuffer做无锁队列) 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/dianzi/1727934810a1138805.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论