linux后台开发之海量、高并发场景下,定时器如何设计
5种红黑树的场景,从Linux内核谈到Nginx源码,听完醍醐灌顶
网络程序需要处理的第三类事件是定时事件,比如定期检测一个客户连接的活动状态。服务器程序通常管理着众多定时事件,因此有效组织定时事件,使之能在预期的时间点被触发且不影响服务器的主要逻辑。
将每个定时事件分别封装成定时器,并使用某种容器类数据结构,比如链表、排序链表和时间轮,将所有定时器串联起来,以实现对定时事件的统一管理。
定时定时是指在一段时间之后触发某段代码的机制。Linux提供了三种定时方法:
socket选项SO_RCVTIMEO和SO_SNDTIMEOSIGALRM信号I/O复用系统调用的超时参数socket选项SO_RCVTIMEO和SO_SNDTIMEO
socket选项SO_RCVTIMEO和SO_SNDTIMEO分别用来设置socket接收数据超时时间和发送数据超时时间。这两个选项仅对与数据接收和发送相关的socket专用系统调用有效,这些系统调用包括send、sendmsg、recv、recvmsg、accept和connect。可以根据系统调用的返回值以及errno来判断超时时间是否已到,进而决定是否开始处理定时任务。
1#include <sys/types.h> 2 #include <sys/socket.h> 3 #include <netinet/in.h> 4 #include <arpa/inet.h> 5 #include <stdlib.h> 6 #include <assert.h> 7 #include <stdio.h> 8 #include <errno.h> 9 #include <fcntl.h> 10 #include <unistd.h> 11 #include <string.h> 12 //超时连接函数 13 int timeout_connect(const char* ip, int port, int time); 14 int main(int argc, char* argv[]) 15 { 16 if(argc <= 2) 17 { 18 printf("usage: %s ip_address port_number\n", basename(argv[0])); 19 return 1; 20 } 21 const char* ip = argv[1]; 22 int port = atoi(argv[2]); 23 int sockfd = timeout_connect(ip,port,10); 24 if(sockfd < 0) 25 { 26 return 1; 27 } 28 return 0; 29 } 30 int timeout_connect(const char* ip, int port, int time) 31 { 32 int ret = 0; 33 struct sockaddr_in address; 34 bzero(&address,sizeof(address)); 35 address.sin_family = AF_INET; 36 inet_pton(AF_INET,ip,&address.sin_addr); 37 address.sin_port = htons(port); 38 int sockfd = socket(PF_INET,SOCK_STREAM,0); 39 assert(sockfd>=0); 40 //通过选项SO_RCVTIMEO和SO_SNDTIMEO所设置的超时时间的类型是timeval,这和select系统 41 //调用的超时参数类型相同 42 struct timeval timeout; 43 timeout.tv_sec = time; 44 timeout.tv_usec = 0; 45 socklen_t len = sizeof(timeout); 46 ret = setsockopt(sockfd,SOL_SOCKET, SO_SNDTIMEO, &timeout, len); 47 assert(ret!=-1); 48 49 ret = connect(sockfd,(struct sockaddr*)&address, sizeof(address)); 50 if( ret == -1 ) 51 { 52 //超时对应的错误号是EINPROGRESS 53 if(errno == EINPROGRESS) 54 { 55 printf("connecting timeout, process timeout logic\n"); 56 return -1; 57 } 58 printf("error occur when connecting to server\n"); 59 return -1; 60 } 61 return sockfd; 62 }
SIGALRM信号
由alarm和setitimer函数设置的实时闹钟一旦超时,将触发SIGALRM信号。我们可以利用该信号的信号处理函数来处理定时任务。但是,如果要处理多个定时任务,我们就需要不断地触发SIGALRM信号,并在其信号处理函数中执行到期的任务。一般而言,SIGALRM信号按照固定的频率生成,即由alarm或setitimer函数设置的定时周期T保持不变。如果某个定时任务的超时时间不是T的整数倍,那么它实际被执行的时间和预期的时间将略有偏差。因此定时周期T反映了定时的精度。
举例:
基于升序链表的定时器-》简单的定时器实现 添加定时器的时间复杂度是O(n),删除定时器的时间复杂度是O(1),执行定时任务的时间复杂度是O(1)
定时器通常至少要包含两个成员:一个超时时间(相对时间或绝对时间)和一个任务回调函数。有时候还可能包含回调函数被执行时需要传入的参数,以及是否重启定时器等信息。
服务器程序通常要定期处理非活动连接:给客户端发一个重连请求,或者关闭该连接,或者其他。Linux在内核中提供了对连接是否处于活动状态的定期检查机制,我们可以通过socket选项KEEPALIVE来激活它。不过使用这种方式将使得应用程序对连接的管理变得复杂。因此,考虑在应用层实现类似于KEEPALIVE的机制,以管理所有长时间处于非活动状态的连接。
利用alarm函数周期性地触发SIGALRM信号,该信号的信号处理函数利用管道通知主循环执行定时器链表上的定时任务-关闭非活动的连接。
timelink.h
1 #ifndef LST_TIMER 2 #define LST_TIMER 3 4 #include <time.h> 5 #define BUFFER_SIZE 64 6 class util_timer; //前向声明 7 8 //用户数据结构:客户端socket地址、socket文件描述符、读缓存和定时器 9 struct client_data 10 { 11 sockaddr_in address; 12 int sockfd; 13 char buf[BUFFER_SIZE]; 14 util_timer* timer; 15 }; 16 17 //定时器类 18 class util_timer 19 { 20 public: 21 util_timer():prev(NULL),next(NULL){} 22 23 time_t expire; //任务的超时时间,这里使用绝对时间 24 void (*cb_func)(client_data*); //任务回调函数,输入参数是客户数据 25 client_data* user_data; //指向用户数据 26 util_timer* prev; //指向前一个定时器 27 util_timer* next; //指向下一个定时器 28 }; 29 30 //定时器链表,它是一个升序、双向链表,且带有头结点和尾结点 31 class sort_timer_lst 32 { 33 public: 34 sort_timer_lst():head(NULL),tail(NULL){} 35 ~sort_timer_lst() 36 { 37 util_timer* tmp = head; 38 while(tmp) 39 { 40 head = tmp->next; 41 delete tmp; 42 tmp = head; 43 } 44 } 45 46 //将目标定时器timer添加到链表中 47 void add_timer(util_timer* timer) 48 { 49 if(!timer) 50 { 51 return; 52 } 53 if(!head) 54 { 55 head = tail = timer; 56 return; 57 } 58 //如果目标定时器的超时时间小于当前链表中所有定时器的超时时间,则把该定时器插 入链表头部 59 if(timer->expire < head->expire) 60 { 61 timer->next = head; 62 head->prev = timer; 63 head = timer; 64 return; 65 } 66 add_timer(timer,head); 67 } 68 69 70 //当某个定时任务发生变化时,调整对应的定时器在链表中的位置 71 //这个函数只考虑被调整的定时器的超时时间延长的情况 72 void adjust_timer(util_timer* timer) 73 { 74 if(!timer) 75 { 76 return; 77 } 78 util_timer* tmp = timer->next; 79 //如果被调整的目标定时器在链表尾部,或者该定时器新的超时值仍然小于下一个定时 器超时值,则不用调整 80 if(!tmp||(timer->expire < tmp->expire)) 81 { 82 return; 83 } 84 //如果目标定时器是链表的头结点,则将该定时器从链表中取出并重新插入链表 85 if(timer == head) 86 { 87 head = head->next; 88 head->prev = NULL; 89 timer->next = NULL; 90 add_timer(timer,head); 91 }else{ 92 //如果目标定时器不是链表的头结点,则将该定时器从链表中取出,然后插入其原来所 在位置之后的部分链表 93 timer->prev->next = timer->next; 94 timer->next->prev = timer->prev; 95 add_timer(timer,timer->next); 96 } 97 } 98 99 //将目标定时器从链表中删除100 void del_timer(util_timer* timer)101 {102 if(!timer)103 {104 return;105 }106 //链表中只有一个定时器107 if((timer == head)&&(timer == tail))108 {109 delete timer;110 head = NULL;111 tail = NULL;112 return;113 }114 //如果链表中至少有两个定时器,且目标定时器是链表头结点115 if(timer==head)116 {117 head = head->next;118 head->prev = NULL;119 delete timer;120 return;121 }122 //如果链表中至少有两个定时器,且目标定时器是链表的尾结点123 if(timer==tail)124 {125 tail = tail->prev;126 tail->next = NULL;127 delete timer;128 return;129 }130 //如果目标定时器位于链表的中间131 timer->prev->next = timer->next;132 timer->next->prev = timer->prev;133 delete timer;134 }135 136 //SIGALRM信号每次被触发就在其信号处理哈桑农户中执行一次tick函数,以处理链表上到期的> 任务137 void tick()138 {139 if(!head)140 {141 return;142 }143 printf("timer tick\n");144 time_t cur = time(NULL); //获得系统当前的时间145 util_timer* tmp = head;146 //从头结点开始依次处理每个定时器,直到遇到一个尚未到期的定时器,这就是定时器 的核心逻辑147 while(tmp)148 {149 //比较定时器的超时值和系统当前时间150 if(cur<tmp->expire)151 {152 break;153 }154 155 //调用定时器的回调函数,以执行定时任务156 tmp->cb_func(tmp->user_data);157 158 //执行完定时器中的定时任务后,将它从链表中删除159 head = tmp->next;160 if(head)161 {162 head->prev = NULL;163 }164 delete tmp;165 tmp = head;166 }167 }168 private:169 //私有重载定时器添加函数,添加到结点lst_head之后的链表中170 void add_timer(util_timer* timer, util_timer* lst_head)171 {172 util_timer* prev = lst_head;173 util_timer* tmp = prev->next;174 //遍历lst_head节点之后的部分链表,直到找到一个超时时间大于目标定时器的超时时 间的结点,并将目标定时器插入该结点之前175 while(tmp)176 {177 if(timer->expire < tmp->expire)178 {179 prev->next = timer;180 timer->next = tmp;181 tmp->prev = timer;182 timer->prev = prev;183 break;184 }185 prev = tmp;186 tmp = tmp->next;187 }188 //如果遍历完lst_head结点之后的部分链表,仍未找到超时时间大于目标定时器时间的 结点,则将目标定时器插入链表尾部,并把它设置为新的尾结点189 if(!tmp)190 {191 prev->next = timer;192 timer->prev = prev;193 timer->next = NULL;194 tail = timer;195 }196 }197 private:198 util_timer* head;199 util_timer* tail;200 };201 202 #endif
【文章福利】需要C/C Linux服务器架构师学习资料加群812855908(资料包括C/C ,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等)
closelink.cpp
1 #include <sys/types.h> 2 #include <sys/socket.h> 3 #include <netinet/in.h> 4 #include <arpa/inet.h> 5 #include <assert.h> 6 #include <stdio.h> 7 #include <signal.h> 8 #include <unistd.h> 9 #include <errno.h> 10 #include <string.h> 11 #include <fcntl.h> 12 #include <stdlib.h> 13 #include <sys/epoll.h> 14 #include <pthread.h> 15 #include "timelink.h" 16 17 #define FD_LIMIT 65535 18 #define MAX_EVENT_NUMBER 1024 19 #define TIMESLOT 5 20 static int pipefd[2]; 21 static sort_timer_lst timer_lst; 22 static int epollfd = 0; 23 24 int setnonblocking(int fd) 25 { 26 int old_option = fcntl(fd,F_GETFL); 27 int new_option = old_option | O_NONBLOCK; 28 fcntl(fd,F_SETFL,new_option); 29 return old_option; 30 } 31 void addfd(int epollfd, int fd) 32 { 33 epoll_event event; 34 event.data.fd = fd; 35 event.events = EPOLLIN|EPOLLET; 36 epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event); 37 setnonblocking(fd); 38 } 39 void sig_handler(int sig) 40 { 41 int save_errno = errno; 42 int msg = sig; 43 send(pipefd[1],(char*)&msg,1,0); 44 errno = save_errno; 45 } 46 void addsig(int sig) 47 { 48 struct sigaction sa; 49 memset(&sa,'\0',sizeof(sa)); 50 sa.sa_handler = sig_handler; 51 sa.sa_flags |= SA_RESTART; 52 sigfillset(&sa.sa_mask); 53 assert(sigaction(sig,&sa,NULL)!=-1); 54 } 55 //定时处理任务,实际上就是调用tick函数 56 void timer_handler() 57 { 58 timer_lst.tick(); 59 //因为一次alarm调用只会引起一次SIGALRM信号,需要重新定义,以不断触发SIGALRM信号 60 alarm(TIMESLOT); 61 } 62 //定时器回调函数,它删除非活动连接socket上的注册事件,并关闭之 63 void cb_func(client_data* user_data) 64 { 65 assert(user_data); 66 epoll_ctl(epollfd,EPOLL_CTL_DEL,user_data->sockfd,0); 67 close(user_data->sockfd); 68 printf("close fd %d\n",user_data->sockfd); 69 } 70 int main(int argc, char* argv[]) 71 { 72 if( argc <= 2) 73 { 74 printf("usage: %s ip_address port_number\n",basename(argv[0])); 75 return 1; 76 } 77 const char* ip = argv[1]; 78 int port = atoi(argv[2]); 79 int ret = 0; 80 struct sockaddr_in address; 81 bzero(&address,sizeof(address)); 82 address.sin_family = AF_INET; 83 inet_pton(AF_INET, ip, &address.sin_addr); 84 address.sin_port = htons(port); 85 int listenfd = socket(PF_INET,SOCK_STREAM,0); 86 assert(listenfd>=0); 87 ret = bind(listenfd,(struct sockaddr*)&address,sizeof(address)); 88 assert(ret!=-1); 89 ret = listen(listenfd,5); 90 assert(ret!=-1); 91 epoll_event events[MAX_EVENT_NUMBER]; 92 int epollfd = epoll_create(5); 93 assert(epollfd!=-1); 94 addfd(epollfd,listenfd); 95 ret = socketpair(PF_UNIX,SOCK_STREAM,0,pipefd); 96 assert(ret!=-1); 97 setnonblocking(pipefd[1]); 98 addfd(epollfd,pipefd[0]); 99 100 //设置信号处理函数101 addsig(SIGALRM);102 addsig(SIGTERM);103 bool stop_server = false;104 105 client_data* users = new client_data[FD_LIMIT];106 bool timeout = false;107 alarm(TIMESLOT); //定时108 109 while(!stop_server)110 {111 int number = epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);112 if((number<0)&&(errno!=EINTR))113 {114 printf("epoll failure\n");115 break;116 }117 for(int i = 0; i < number; i )118 {119 int sockfd = events[i].data.fd;120 if(sockfd==listenfd)121 {122 //处理新到的客户连接123 struct sockaddr_in client_address;124 socklen_t client_addrlength = sizeof(client_address);125 int connfd = accept(listenfd,(struct sockaddr*)&client_addre ss,&client_addrlength);126 addfd(epollfd,connfd);127 users[connfd].address = client_address;128 users[connfd].sockfd = connfd;129 //创建定时器,设置其回调函数与超时时间,然后绑定定时器与用户数据,最后将定时器添加到链表timer_lst中130 util_timer* timer = new util_timer;131 timer->user_data = &users[connfd];132 timer->cb_func = cb_func;133 time_t cur = time(NULL);134 timer->expire = cur 3*TIMESLOT;135 users[connfd].timer = timer;136 timer_lst.add_timer(timer);137 }else if((sockfd==pipefd[0])&&(events[i].events & EPOLLIN)){138 //处理信号139 int sig;140 char signals[1024];141 ret = recv(pipefd[0],signals,sizeof(signals),0);142 if(ret==-1)143 {144 //处理错误145 continue;146 }else if(ret == 0)147 {148 continue;149 }else{150 for(int i = 0; i < ret; i)151 {152 switch(signals[i])153 {154 case SIGALRM:155 //用timeout标记有定时任务需> 要处理156 {timeout = true; break;}157 case SIGTERM:158 {stop_server = true;}159 }160 }161 }162 }else if(events[i].events & EPOLLIN)163 {164 //处理客户连接上接收到的数据165 memset(users[sockfd].buf, '\0', BUFFER_SIZE);166 ret = recv(sockfd,users[sockfd].buf,BUFFER_SIZE-1,0);167 168 util_timer* timer = users[sockfd].timer;169 if(ret <0)170 { //如果发生读错误,则关闭连接,并移除其对应的定时器171 if(errno!=EAGAIN)172 {173 cb_func(&users[sockfd]);174 if(timer)175 {176 timer_lst.del_timer(timer);177 }178 }179 }else if(ret == 0)180 { //如果对方关闭连接,则服务器也关闭连接,并移除对应的 定时器181 cb_func(&users[sockfd]);182 if(timer)183 {184 timer_lst.del_timer(timer);185 }186 }else{ //如果某个客户连接上有数据,则要调整该连接对应的定时> 器,以延迟该连接被关闭的时间187 printf("get %d bytes of client data %s from %d\n",re t,users[sockfd].buf,sockfd);188 if(timer)189 {190 time_t cur = time(NULL);191 timer->expire = cur 3*TIMESLOT;192 printf("adjust timer once\n");193 timer_lst.adjust_timer(timer);194 }195 }196 }else{197 //others198 }199 }200 //最后处理定时事件,因为I/O事件有更高优先级,这样会导致定时任务不能按照预期> 时间执行201 if(timeout)202 {203 timer_handler();204 timeout = false;205 }206 }207 close(listenfd);208 close(pipefd[1]);209 close(pipefd[0]);210 delete[] users;211 return 0;212 }
I/O复用系统调用的超时参数
Linux下的3组I/O复用系统调用都带有超时参数,因此它们不仅能统一处理信号和I/O事件,也能统一处理定时事件。但是由于I/O复用系统调用可能在超时时间到期之前就返回(有I/O事件发生),所以如果要利用它们来定时,就需要不断更新定时参数以反映剩余的时间。
#define TIMEOUT 5000int timeout = TIMEOUT;time_t start = time(NULL);time_t end = time(NULL);while(1){printf("the timeout is now %d mil-seconds\n",timeout);start = time(NULL);int number = epoll_wait(epollfd,events,MAX_EVENT_NUMBER,timeout);if((number<0)&&(errno!=EINTR)){printf("epoll failure\n");break;}//如果epoll_wait成功返回0,则说明超时时间到,此时便可处理定时任务,并重置定时时间if(number==0){timeout = TIMEOUT;continue;}end = time(NULL);//如果epoll_wait的返回值大于0,则本次epoll_wait调用持续的时间是(end-start)*1000ms,我们需要将定时时间timeout减去这段时间,以获得下次epoll_wait调用的超时参数timeout -= (end-start)*1000;//重新计算后timeout有可能等于0,说明本次epoll_wait调用返回时,不仅有文件描述符就绪,而且其超时时间也刚好到达,此时我们也要处理定时任务,并重置定时时间if(timeout <= 0){timeout = TIMEOUT;}//handle connections}
时间轮
在时间轮内,(实线)指针指向轮子上的一个槽(slot),它以恒定的速度顺时针转动,每转动一步就指向下一个槽(虚线指针指向的槽),每次转动称为一个滴答(tick)。一个滴答的时间称为时间轮的槽间隔si(slot interval),它实际上就是心搏时间。该时间轮共有N个槽,因此它运转一周的时间是Nsi。每个槽指向一条定时器链表,每条链表上的定时器具有相同的特征:它们的定时时间相差Nsi的整数倍。时间轮正是利用这个关系将定时器散列到不同的链表中。假如现在指针指向槽cs,我们要添加一个定时时间为ti的定时器,则该定时器将被插入槽ts(timer slot)对应的链表中:ts=(cs (ti/si))%N
基于排序链表的定时器使用唯一的一条链表来管理所有定时器,所以插入操作的效率随着定时器数目的增多而降低。而时间轮私用哈希表的思想,将定时器散列到不同的链表上。这样每条链表上的定时器数目都将明显少于原来的排序链表上的定时器数目,插入操作的效率基本不受定时器数目的影响。对于时间轮而言,添加一个定时器的时间复杂度是O(1),删除一个定时器的时间复杂度也是O(1),指向一个定时器的时间复杂度是O(n)。但实际上执行一个定时器任务的效率要比O(n)好得多,因为时间轮将所有的定时器散列到了不同的链表上。时间轮的槽越多,等价于散列表的入口越多,从而每条链表上的定时器数量越少。
timeround.h
1 #ifndef TIME_WHEEL_TIMER 2 #define TIME_WHEEL_TIMER 3 #include <time.h> 4 #include <netinet/in.h> 5 #include <stdio.h> 6 #define BUFFER_SIZE 64 7 class tw_timer; 8 struct client_data 9 { 10 sockaddr_in address; 11 int sockfd; 12 char buf[BUFFER_SIZE]; 13 tw_timer* timer; 14 }; 15 class tw_timer 16 { 17 pubilc: 18 tw_timer(int rot, int ts):rotation(rot),time_slot(ts),next(NULL),prev(NULL){} 19 int rotation; //记录定时器在时间轮转多少圈后生效 20 int time_slot; //记录定时器属于时间轮上哪个槽 21 void (*cb_func)(client_data*); //定时器回调函数 22 client_data* user_data; //客户数据 23 tw_timer* next; //指向下一个定时器 24 tw_timer* prev; //指向前一个定时器 25 }; 26 27 class time_wheel 28 { 29 public: 30 time_wheel():cur_slot(0) 31 { 32 for(int i = 0; i < N; i) 33 { 34 slots[i] = NULL; 35 } 36 } 37 ~time_wheel() 38 { 39 for(int i = 0; i < N; i) 40 { 41 tw_timer* tmp = slots[i]; 42 while(tmp) 43 { 44 slots[i] = tmp->next; 45 delete tmp; 46 tmp = slots[i]; 47 } 48 } 49 } 50 51 //根据定时值timeout创建一个定时器,并把它插入何时的槽中 52 tw_timer* add_timer(int timeout) 53 { 54 if(timeout < 0) 55 { 56 return NULL; 57 } 58 int ticks = 0; 59 //下面根据待插入定时器的超时值计算它将在时间轮转动多少个滴答后被触发 60 //并将该滴答数存储于变量ticks中。如果待插入定时器的超时值小于时间轮 61 //的槽间隔SI,则将ticks向上折合为1,否则就将ticks向下折合为timeout/SI 62 if(timeout < SI) 63 { 64 ticks = 1; 65 }else{ 66 ticks = timeout / SI; 67 } 68 //计算待插入的定时器在时间轮转动多少圈后被触发 69 int rotation = ticks/N; 70 //计算待插入的定时器应该被插入哪个槽中 71 int ts = (cur_slot (ticks%N))%N; 72 //创建新的定时器,它在时间轮转动rotation圈之后被触发,且位于第ts个槽 73 tw_timer* timer = new tw_timer(rotation,ts); 74 //如果第ts个槽中尚无任何定时器,则把新建的定时器插入其中,并将该定时器设置为该槽的头 结点 75 if(!slots[ts]) 76 { 77 printf("add timer, rotation is %d, ts is %d, cur_slot is %d\n",rotation,ts,c ur_slot); 78 slots[ts] = timer; 79 }else{ 80 //否则将定时器插入第ts个槽中 81 timer->next = slots[ts]; 82 slots[ts]->prev = timer; 83 slots[ts] = timer; 84 } 85 return timer; 86 } 87 88 //删除目标定时器timer 89 void del_timer(tw_timer* timer) 90 { 91 if(!timer) 92 { 93 return; 94 } 95 int ts = timer->time_slot; 96 //slots[ts]是目标定时器所在槽的头结点 97 if(timer==slots[ts]) 98 { 99 slots[ts] = slots[ts]->next;100 if(slots[ts])101 {102 slots[ts]->prev = NULL;103 }104 delete timer;105 }else{106 timer->prev->next = timer->next;107 if(timer->next)108 {109 timer->next->prev = timer->prev;110 }111 delete timer;112 }113 }114 115 //SI时间到后,调用该函数,时间轮向前滚动一个槽的间隔116 void tick()117 {118 tw_timer* tmp = slots[cur_slot]; //取得时间轮上当前槽的头结点119 printf("current slot is %d\n",cur_slot);120 while(tmp)121 {122 printf("tick the timer once\n");123 //如果定时器的rotation值大于0,则它在这一轮不起作用124 if(tmp->rotation>0)125 {126 tmp->rotation--;127 tmp = tmp->next;128 }else{129 //定时器已经到期,于是执行定时任务,然后删除该定时器130 tmp->cb_func(tmp->user_data);131 if(tmp==slots[cur_slot])132 {133 printf("delete header in cur_slot\n");134 slots[cur_slot] = tmp->next;135 delete tmp;136 if(slots[cur_slot])137 {138 slots[cur_slot]->prev = NULL;139 }140 tmp = slots[cur_slot];141 }else{142 tmp->prev->next = tmp->next;143 if(tmp->next)144 {145 tmp->next->prev = tmp->prev;146 }147 tw_timer* tmp2 = tmp->next;148 delete tmp;149 tmp = tmp2;150 }151 152 }153 }154 //更新时间轮的当前槽155 cur_slot = cur_slot % N;156 }157 private:158 static const int N = 60; //时间轮上槽的数目159 static const int SI = 1; //每1s时间轮转动一次,即槽间隔为1s160 tw_timer* slots[N]; //时间轮的槽,其中每个元素指向一个定时器链表,链表无序161 int cur_slot; //时间轮的当前槽162 163 };164 165 #endif
时间堆
前面讨论的定时方案都是以固定的频率调用心搏tick,并在其中依次检测到期的定时器,然后执行到期定时器上的回调函数。设计定时器的另外一种思路是:将所有定时器中超时时间最小的一个定时器的超时值作为心搏间隔。这样,一旦心搏tick被调用,超时时间最小的定时器必然到期,我们就可以在tick函数中处理该定时器。然后,再次从剩余的定时器中找出超时时间最小的一个,并将这段最小时间设置为下一次心搏间隔。如此反复,就实现了较为精确的定时。
最小堆是指每个节点的值都小于或等于其子节点的值的完全二叉树。树的基本操作是插入节点和删除节点。为了将一个元素X插入最小堆,我们可以在树的下一个空闲位置创建一个空穴。如果X可以放在空穴中而不破坏堆序,则插入完成。否则就执行上虑操作,即交换空穴和它的父节点上的元素。不断执行上述过程,直到X可以被放入空穴,则插入操作完成。
最小堆的删除操作指的是删除其根节点上的元素,并且不被坏堆序性质。执行删除操作时,我们需要先在根节点处创建一个空穴。由于堆现在少了一个元素,因此我们可以把堆的最后一个元素X移动到该堆的某个地方。如果X可以被放入空穴,则删除操作完成。否则就执行下虑操作,即交换空穴和它的两个儿子节点中的较小者。不断进行上述过程,直到X可以被放入空穴,则删除操作完成。
由于最小堆是一种完全二叉树,所以可以用数组来组织其中的元素。比如第一幅图所示的最小堆可以用下图表示。对于数组中的任意一个位置i上的元素,其左儿子节点在位置2i 1上,其右儿子节点在位置2i 2上,其父节点则在位置[(i-1)/2] (i>0)上。与用链表来表示堆相比,用数组表示堆不仅节省空间,而且更容易实现堆的插入、删除等操作。
假设我们已经有一个包含N个元素的数组,现在要把它初始化为一个最小堆。那么最简单的方法是:初始化一个空堆,然后将数组中的每个元素插入该堆中。不过这样做效率偏低。实际上,我们只需要对数组中的第[(N-1)/2] ~ 0个元素执行下虑操作,即可保证该数组构成一个最小堆。这是因为对包含N个元素的完全二叉树而言,它具有[(N-1)/2]个非叶子节点,这些非叶子节点正是该完全二叉树的第0 ~ [(N-1)/2]个节点。我们只要确保这些非叶子节点构成的子树都具有堆序性质,整个树就具有堆序性质。
timestack.h
1 #ifndef MIN_HEAP 2 #define MIN_HEAP 3 #include <iostream> 4 #include <netinet/in.h> 5 #include <time.h> 6 using std::exception; 7 #define BUFFER_SIZE 64 8 9 class heap_timer; 10 struct client_data 11 { 12 sockaddr_in address; 13 int sockfd; 14 char buf[BUFFER_SIZE]; 15 heap_timer* timer; 16 }; 17 class heap_timer 18 { 19 public: 20 heap_timer(int delay) { expire = time(NULL) delay; } 21 time_t expire; //定时器生效的绝对时间 22 void (*cb_func)(client_data*); //定时器的回调函数 23 client_data* user_data; //用户数据 24 }; 25 26 class time_heap 27 { 28 public: 29 time_heap(int cap) throw(std::exception):capacity(cap),cur_size(0) 30 { 31 array = new heap_timer* [capacity]; 32 if(!array) 33 { 34 throw std::exception(); 35 } 36 for(int i = 0; i < capacity; i) 37 { 38 array[i] = NULL; 39 } 40 } 41 time_heap(heap_timer** init_array, int size, int capacity) throw(std::exception): 42 capacity(capacity),cur_size(size) 43 { 44 if(capacity<size) 45 { 46 throw std::exception(); 47 } 48 array = new heap_timer* [capacity]; 49 if(!array) 50 { 51 throw std::exception(); 52 } 53 for(int i = 0; i < capacity; i) 54 { 55 array[i] = NULL; 56 } 57 if(size!=0) 58 { 59 for(int i = 0; i < size; i) 60 { 61 array[i] = init_array[i]; 62 } 63 for(int i = (cur_size-1)/2; i >=0; --i) 64 { //对数组中的第[(cur_size-1)/2]~0个元素执行下虑操作 65 percolate_down(i); 66 } 67 } 68 } 69 ~time_heap() 70 { 71 for(int i =0; i < cur_size; i) 72 { 73 delete array[i]; 74 } 75 delete [] array; 76 } 77 78 //添加目标定时器timer 79 void add_timer(heap_timer* timer) throw(std::exception) 80 { 81 if(!timer) 82 { 83 return; 84 } 85 if(cur_size>=capacity) 86 { 87 resize(); 88 } 89 //新插入一个元素,当前堆大小加1,hole是新建空穴的位置 90 int hole = cur_size ; 91 int parent = 0; 92 //对从空穴到梗节点的路径上的所有节点执行上虑操作 93 for(;hole>0;hole=parent) 94 { 95 parent = (hole-1)/2; 96 if(array[parent]->expire <= timer->expire) 97 { 98 break; 99 }100 array[hole] = array[parent];101 }102 array[hole] = timer;103 }104 105 //删除目标定时器timer106 void del_timer(heap_timer* timer)107 {108 if(!timer)109 {110 return;111 }112 //仅将目标定时器的回调函数设置为空,即所谓的延迟销毁113 //这节省真正删除该定时器造成的开销,但容易使堆数组膨胀114 timer->cb_func = NULL;115 }116 117 //获得堆顶部的定时器118 heap_timer* top() const119 {120 if(empty())121 {122 return NULL;123 }124 return array[0];125 }126 127 //删除堆顶部的定时器128 void pop_timer()129 {130 if(empty())131 {132 return;133 }134 if(array[0])135 {136 delete array[0];137 //将原来的堆顶元素替换为堆数组中最后一个元素138 array[0] = array[--cur_size];139 percolate_down(0); //对新的堆顶元素执行下虑操作140 }141 }142 143 //心搏函数,处理一批到期的定时器144 void tick()145 {146 heap_timer* tmp = array[0];147 time_t cur = time(NULL);148 while(!empty())149 {150 if(!tmp)151 {152 break;153 }154 //如果堆顶定时器没有到期,则退出循环155 if(tmp->expire > cur)156 {157 break;158 }159 //否则就执行堆顶定时器中的任务160 if(array[0]->cb_func)161 {162 array[0]->cb_func(array[0]->user_data);163 }164 //将堆顶元素删除,同时生成新的堆顶定时器165 pop_timer();166 tmp = array[0];167 }168 }169 170 //判空函数171 bool empty() const { return cur_size == 0; }172 private:173 //最小堆的下虑操作,它确保数组中以第hole个节点作为根的子树拥有最小堆性质174 void percolate_down(int hole)175 {176 heap_timer* temp = array[hole];177 int child = 0;178 //hole当前节点 hole*2 1左子节点 179 for(;(hole*2 1)<=(cur_size-1);hole=child)180 {181 child = hole*2 1;182 if((child<(cur_size-1))&&(array[child 1]->expire < array[child]->expire))183 {184 child; //找到子节点超时值小的185 }186 if(array[child]->expire < temp->expire)187 {188 array[hole] = array[child];189 }else{190 break;191 }192 }193 array[hole] = temp;194 }195 196 //将堆数组容量扩大1倍197 void resize() throw(std::exception)198 {199 heap_timer** temp = new heap_timer* [2*capacity];200 if(!temp)201 {202 throw std::exception();203 }204 for(int i = 0; i < 2*capacity; i)205 {206 temp[i] = NULL;207 }208 capacity = 2 * capacity;209 for(int i = 0; i < cur_size; i)210 {211 temp[i] = array[i];212 }213 delete [] array;214 array = temp;215 }216 217 heap_timer** array; //堆数组218 int capacity; //堆数组的容量219 int cur_size; //堆数组当前包含元素的个数220 };221 222 #endif
对于时间堆而言,添加一个定时器的时间复杂度是O(lgn),删除一个定时器的时间复杂度是O(1),执行一个定时器的时间复杂度是O(1)。