
Timer implementation in brpc

Published: 2019-09-17 08:52:25

This post looks at how brpc implements timers under multithreading, focusing on the design ideas, and along the way surveys the implementations in a few other open-source projects. The emphasis is on the overall architecture, not the timer data structure in isolation.

A head-to-head performance benchmark would be hard to make meaningful, but we can compare complexities. The various implementations, such as the min-heap, the timing wheel, the Linux kernel's approach, and fd-based timers, each have their own trade-offs, so the choice should follow your workload. This post analyzes open-source implementations; I am not familiar enough with the fd-based approach, so it is skipped for now.

Data structures for managing many timer nodes include the min-heap, the timing wheel, the Linux kernel's implementation, and brpc's scheme of multiple buckets each holding a linked list. Timer tasks are usually driven by a single dedicated timer thread. Tasks that fire repeatedly at some interval, for a given number of times, can be handled with a repeat-count (loop) field, as sketched below.
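For instance, the repeat behavior can live in a loop counter on the task node. A minimal sketch of that idea (RepeatingTask, on_fire and the schedule_after hook are hypothetical names, not taken from any implementation discussed below):

#include <cstdint>

struct RepeatingTask {
    int64_t interval_ms;  // fire every interval_ms
    int     loop;         // remaining runs; -1 means repeat forever
    void  (*fn)(void*);
    void*   arg;
};

// Hypothetical "re-arm this task after delay_ms" hook provided by the timer.
void schedule_after(int64_t delay_ms, RepeatingTask* t);

// Called when the task fires: run it, then re-arm unless the counter is spent.
void on_fire(RepeatingTask* t) {
    t->fn(t->arg);
    if (t->loop > 0) --t->loop;
    if (t->loop != 0) schedule_after(t->interval_ms, t);
}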

Min-heap

For a min-heap, building the heap is O(n); popping the expired root and inserting a new timer node each re-heapify in O(log n); deleting an arbitrary timer node also adjusts the heap structure, again O(log n). The gap between the root's deadline and the current time is passed to epoll_wait as the timeout, so the call returns either on timeout or when events arrive. redis does something similar, while phxrpc simply forces a 4 ms wait; that costs a little accuracy, but the design is subtly effective. Here is the coroutine-plus-timer part of phxrpc:

std::vector<TimerObj> timer_heap_;  // the timer heap

bool UThreadEpollScheduler::Run() {
    // more code...
    int next_timeout = timer_.GetNextTimeout();

    for (; (run_forever_) || (!runtime_.IsAllDone());) {
        int nfds = epoll_wait(epoll_fd_, events, max_task_, 4);  // forced 4 ms wait
        // more code...
        DealwithTimeout(next_timeout);  // handle expired tasks
    }
    // more code
    return true;
}

void UThreadEpollScheduler::DealwithTimeout(int &next_timeout) {
    while (true) {
        next_timeout = timer_.GetNextTimeout();
        if (0 != next_timeout) {
            break;
        }

        UThreadSocket_t * socket = timer_.PopTimeout();
        socket->waited_events = UThreadEpollREvent_Timeout;
        runtime_.Resume(socket->uthread_id);
    }
}

// yield the coroutine
void UThreadWait(UThreadSocket_t &socket, const int timeout_ms) {
    socket.uthread_id = socket.scheduler->GetCurrUThread();
    socket.scheduler->AddTimer(&socket, timeout_ms);  // add the timer
    socket.scheduler->YieldTask();                    // switch out
    socket.scheduler->RemoveTimer(socket.timer_id);   // remove the timer once resumed
}

Each thread owns its own timer manager, so there is no contention between threads, and each thread hosts a number of coroutines.
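To make the heap mechanics concrete, here is a minimal sketch of the min-heap scheme on top of std::priority_queue (not phxrpc's code; TimerNode, next_timeout_ms and run_expired are names made up for illustration):

#include <cstdint>
#include <queue>
#include <vector>

struct TimerNode {
    int64_t deadline_ms;   // absolute expiry time
    void (*fn)(void*);     // callback to run on expiry
    void* arg;
};

struct Later {
    bool operator()(const TimerNode& a, const TimerNode& b) const {
        return a.deadline_ms > b.deadline_ms;   // smallest deadline at the top
    }
};

std::priority_queue<TimerNode, std::vector<TimerNode>, Later> heap;

// Timeout to hand to epoll_wait: distance from now to the root's deadline.
int next_timeout_ms(int64_t now_ms) {
    if (heap.empty()) return -1;   // nothing pending: block indefinitely
    int64_t diff = heap.top().deadline_ms - now_ms;
    return diff > 0 ? static_cast<int>(diff) : 0;
}

// After epoll_wait returns, pop and run everything that has expired.
void run_expired(int64_t now_ms) {
    while (!heap.empty() && heap.top().deadline_ms <= now_ms) {
        TimerNode t = heap.top();   // O(1) peek, O(log n) pop
        heap.pop();
        t.fn(t.arg);
    }
}

The returned timeout is what the event loop hands to epoll_wait, so the loop sleeps exactly until the nearest deadline unless I/O arrives first.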

timewheel

A timing wheel hashes timers into slots. With, say, a 100-slot wheel, you take the current tick count plus the timeout expressed in ticks, take it modulo 100 (the slot count), and hang the timer in the resulting slot. If each tick is 200 ms, then on every tick the wheel processes the current slot, checking each node for expiry. Nodes in the same slot all have tick counts with the same remainder: they may or may not have expired yet, whereas nodes with a different remainder definitely cannot expire on the current tick. Slots are usually implemented as linked lists, so their lengths can be uneven, but unlinking an expired node is O(1). The wheel wakes up once per tick to scan the current slot; the finer the tick resolution, the more wakeups.

An open-source project I read earlier was implemented this way, and a previous anti-DDoS firewall project of mine used the same design: multiple threads, each with its own timing wheel holding tens of millions of session records waiting to expire. I will not analyze a concrete open-source implementation here.

For a similar implementation, see the TimingWheel[时间轮]介绍 article.
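As a minimal illustration of the single-level wheel described above (the 100-slot / 200 ms figures follow the example; TimerEntry, add_timer and on_tick are illustrative names, not from any project mentioned here):

#include <array>
#include <cstdint>
#include <list>

constexpr int kSlots  = 100;  // number of slots in the wheel
constexpr int kTickMs = 200;  // one tick = 200 ms

struct TimerEntry {
    uint64_t expire_tick;  // absolute tick at which the timer fires
    void (*fn)(void*);
    void* arg;
};

std::array<std::list<TimerEntry>, kSlots> wheel;
uint64_t current_tick = 0;

// Hang the timer in slot (current_tick + timeout_ticks) % kSlots.
void add_timer(uint64_t timeout_ms, void (*fn)(void*), void* arg) {
    uint64_t ticks = (timeout_ms + kTickMs - 1) / kTickMs;
    if (ticks == 0) ticks = 1;  // never schedule into the slot being scanned
    uint64_t expire = current_tick + ticks;
    wheel[expire % kSlots].push_back({expire, fn, arg});
}

// Called once per tick: only entries whose expire_tick has been reached
// have really expired; the rest stay for another revolution of the wheel.
void on_tick() {
    ++current_tick;
    auto& slot = wheel[current_tick % kSlots];
    for (auto it = slot.begin(); it != slot.end(); ) {
        if (it->expire_tick <= current_tick) {
            it->fn(it->arg);
            it = slot.erase(it);   // O(1) removal from the list
        } else {
            ++it;
        }
    }
}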

Timers in the Linux kernel

I have not read the Linux kernel's timer code itself; what I read instead is skynet's, which follows the same hierarchical-wheel idea (for details see 浅析skynet底层框架中篇). The structures look like this:

struct timer {
    struct link_list near[TIME_NEAR];   // nodes about to fire
    struct link_list t[4][TIME_LEVEL];  // levels ordered by how far away they are
    struct spinlock lock;
    uint32_t time;                      // accumulated count of 10 ms ticks
    uint32_t starttime;
    uint64_t current;
    uint64_t current_point;
};

static void
wakeup(struct monitor *m, int busy) {
    if (m->sleep >= m->count - busy) {
        // signal sleep worker, "spurious wakeup" is harmless
        pthread_cond_signal(&m->cond);
    }
}

static void *
thread_socket(void *p) {
    struct monitor * m = p;
    skynet_initthread(THREAD_SOCKET);
    for (;;) {
        int r = skynet_socket_poll();
        // more code ...
        wakeup(m,0);  // a message arrived: wake one worker thread
    }
    return NULL;
}

static void *
thread_timer(void *p) {
    struct monitor * m = p;
    skynet_initthread(THREAD_TIMER);
    for (;;) {
        skynet_updatetime();
        CHECK_ABORT
        wakeup(m,m->count-1);  // wake one worker thread
        usleep(2500);          // forced 2.5 ms sleep
        // more code ...
    }
    // more code ...
    return NULL;
}

static void *
thread_worker(void *p) {
    // more code ...
    while (!m->quit) {
        q = skynet_context_message_dispatch(sm, q, weight);
        if (q == NULL) {
            if (pthread_mutex_lock(&m->mutex) == 0) {
                ++ m->sleep;
                // "spurious wakeup" is harmless,
                // because skynet_context_message_dispatch() can be call at any time.
                if (!m->quit)
                    pthread_cond_wait(&m->cond, &m->mutex);  // no messages: block
                -- m->sleep;
                if (pthread_mutex_unlock(&m->mutex)) {
                    exit(1);
                }
            }
        }
    }
    return NULL;
}

static void
timer_add(struct timer *T,void *arg,size_t sz,int time) {
    struct timer_node *node = (struct timer_node *)skynet_malloc(sizeof(*node)+sz);
    memcpy(node+1,arg,sz);

    SPIN_LOCK(T);

        node->expire=time+T->time;
        add_node(T,node);

    SPIN_UNLOCK(T);
}

As shown above, multiple threads contend for a single global spin lock, then add the timer task into the timer structure with its expiry time set.
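The interesting part is how a node is assigned to a layer. Below is a simplified, self-contained rendering of skynet's dispatch logic (the constants follow skynet's TIME_NEAR = 256 and TIME_LEVEL = 64, but the list handling is trimmed down, so treat this as a sketch rather than the verbatim source):

#include <cstdint>

constexpr uint32_t TIME_NEAR_SHIFT  = 8;
constexpr uint32_t TIME_NEAR        = 1 << TIME_NEAR_SHIFT;   // 256 near slots
constexpr uint32_t TIME_NEAR_MASK   = TIME_NEAR - 1;
constexpr uint32_t TIME_LEVEL_SHIFT = 6;
constexpr uint32_t TIME_LEVEL       = 1 << TIME_LEVEL_SHIFT;  // 64 slots per level
constexpr uint32_t TIME_LEVEL_MASK  = TIME_LEVEL - 1;

struct TimerNode { uint32_t expire; TimerNode* next; };
struct LinkList  { TimerNode* head = nullptr; TimerNode* tail = nullptr; };

// Append a node to a slot's list (skynet uses a dummy head; simplified here).
static void link_node(LinkList* list, TimerNode* node) {
    node->next = nullptr;
    if (list->tail) list->tail->next = node;
    else            list->head = node;
    list->tail = node;
}

struct Timer {
    LinkList near_[TIME_NEAR];   // fires within the next 256 ticks
    LinkList t[4][TIME_LEVEL];   // farther timers, one level per "digit"
    uint32_t time = 0;           // current tick count
};

// If the expiry shares all bits above the near window with the current
// time, the node goes into near_[]; otherwise it goes into the lowest
// level whose time window still covers the gap to the deadline.
static void add_node(Timer* T, TimerNode* node) {
    uint32_t time = node->expire;
    uint32_t current = T->time;
    if ((time | TIME_NEAR_MASK) == (current | TIME_NEAR_MASK)) {
        link_node(&T->near_[time & TIME_NEAR_MASK], node);
    } else {
        uint32_t mask = TIME_NEAR << TIME_LEVEL_SHIFT;
        int i = 0;
        for (; i < 3; ++i) {
            if ((time | (mask - 1)) == (current | (mask - 1))) break;
            mask <<= TIME_LEVEL_SHIFT;
        }
        link_node(&T->t[i][(time >> (TIME_NEAR_SHIFT + i * TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK],
                  node);
    }
}

As the clock advances past each window boundary, nodes cascade from higher levels back down toward near_[], which is what keeps per-tick work small.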

The brpc timer implementation

As for brpc's timer + futex implementation, the reference material on GitHub did not make it particularly clear to me what this usage buys or what motivated the design, and asking gejun in the group chat seemingly got no reply, haha. So let's just dig into the implementation.

brpc implements the futex functionality itself; exactly which scenarios this targets and what the benefit is still needs more thought:

class SimuFutex {
public:
    SimuFutex() : counts(0)
                , ref(0) {
        pthread_mutex_init(&lock, NULL);
        pthread_cond_init(&cond, NULL);
    }
    ~SimuFutex() {
        pthread_mutex_destroy(&lock);
        pthread_cond_destroy(&cond);
    }

public:
    pthread_mutex_t lock;
    pthread_cond_t cond;
    int32_t counts;  // number of threads waiting
    int32_t ref;     // reference count
};

static pthread_mutex_t s_futex_map_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_once_t init_futex_map_once = PTHREAD_ONCE_INIT;
static std::unordered_map<void*, SimuFutex>* s_futex_map = NULL;

int futex_wait_private(void* addr1, int expected, const timespec* timeout) { // wait
    if (pthread_once(&init_futex_map_once, InitFutexMap) != 0) {
        exit(1);
    }
    std::unique_lock<pthread_mutex_t> mu(s_futex_map_mutex);
    SimuFutex& simu_futex = (*s_futex_map)[addr1];
    ++simu_futex.ref;
    mu.unlock();

    int rc = 0;
    {
        std::unique_lock<pthread_mutex_t> mu1(simu_futex.lock);
        if (static_cast<butil::atomic<int>*>(addr1)->load() == expected) {
            ++simu_futex.counts;
            if (timeout) {
                timespec timeout_abs = butil::timespec_from_now(*timeout);
                if ((rc = pthread_cond_timedwait(&simu_futex.cond, &simu_futex.lock, &timeout_abs)) != 0) {
                    errno = rc;
                    rc = -1;
                }
            } else {
                if ((rc = pthread_cond_wait(&simu_futex.cond, &simu_futex.lock)) != 0) {
                    errno = rc;
                    rc = -1;
                }
            }
            --simu_futex.counts;
        } else {
            errno = EAGAIN;
            rc = -1;
        }
    }

    std::unique_lock<pthread_mutex_t> mu1(s_futex_map_mutex);
    if (--simu_futex.ref == 0) {
        s_futex_map->erase(addr1);
    }
    mu1.unlock();
    return rc;
}

int futex_wake_private(void* addr1, int nwake) { // wake
    if (pthread_once(&init_futex_map_once, InitFutexMap) != 0) {
        exit(1);
    }
    std::unique_lock<pthread_mutex_t> mu(s_futex_map_mutex);
    auto it = s_futex_map->find(addr1);
    if (it == s_futex_map->end()) {
        mu.unlock();
        return 0;
    }
    SimuFutex& simu_futex = it->second;
    ++simu_futex.ref;
    mu.unlock();

    int nwakedup = 0;
    int rc = 0;
    {
        std::unique_lock<pthread_mutex_t> mu1(simu_futex.lock);
        nwake = (nwake < simu_futex.counts)? nwake: simu_futex.counts;
        for (int i = 0; i < nwake; ++i) {
            if ((rc = pthread_cond_signal(&simu_futex.cond)) != 0) {
                errno = rc;
                break;
            } else {
                ++nwakedup;
            }
        }
    }

    std::unique_lock<pthread_mutex_t> mu2(s_futex_map_mutex);
    if (--simu_futex.ref == 0) {
        s_futex_map->erase(addr1);
    }
    mu2.unlock();
    return nwakedup;
}

To quote from the linked reference: "For reference, on my 4.0 SELinux test server with support for syscall auditing enabled, the minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is 2.7 usec, and the average is more like 10 usec. That can be a big drag on RockDB's single-writer design." That said, a mutex implementation behaves much the same: it first tries to take the lock and returns immediately on success, only falling back to waiting otherwise, as in the structure below:

struct mutex {
    // Reference counter. 1: the lock is available.
    // <= 0: the lock is held and callers must wait.
    atomic_t count;
    // Spinlock guaranteeing safe access to the wait queue on multi-CPU systems.
    spinlock_t wait_lock;
    // Wait queue: if the lock is held, tasks hang here waiting to be scheduled.
    struct list_head wait_list;
};

See: linux 2.6 互斥锁的实现-源码分析
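To make that fast path concrete, here is a rough two-state sketch of a futex-style lock built on the wait/wake pair above (my own toy version, not brpc's or glibc's; real implementations such as glibc's lowlevellock add a third "held with waiters" state so an uncontended unlock can skip the wake syscall):

#include <atomic>
#include <ctime>

// Declarations matching the simulated futex above.
int futex_wait_private(void* addr1, int expected, const timespec* timeout);
int futex_wake_private(void* addr1, int nwake);

// Toy futex-style lock word: 0 = free, 1 = held.
std::atomic<int> lock_word{0};

void lock() {
    int expected = 0;
    // Fast path: an uncontended acquire is a single CAS, no kernel entry.
    while (!lock_word.compare_exchange_weak(expected, 1)) {
        // Slow path: sleep while the word still reads 1. EAGAIN means the
        // holder released it between our CAS and the wait, so just retry.
        futex_wait_private(&lock_word, /*expected=*/1, /*timeout=*/nullptr);
        expected = 0;
    }
}

void unlock() {
    lock_word.store(0);
    futex_wake_private(&lock_word, 1);  // wake at most one waiter
}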

brpc's timer is likewise serviced by a single thread, but each timer is hashed into one of several buckets, which lowers lock contention; the same sharding idea appears in leveldb's cache. Each bucket has its own lock. The actual Task and Bucket definitions follow the sketch below.
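The sharding trick itself fits in a few lines (a hypothetical sketch mirroring the idea, not brpc's code; kNumBuckets and bucket_for are my names):

#include <cstdint>
#include <mutex>

constexpr size_t kNumBuckets = 13;   // illustrative shard count

struct Bucket {
    std::mutex mutex;   // per-bucket lock: contention drops to ~1/kNumBuckets
    // ... per-bucket task list ...
};

Bucket buckets[kNumBuckets];

// Pick a bucket by hashing the caller's thread id, so the same thread keeps
// hitting the same bucket (better cache locality, as brpc's comment notes).
Bucket& bucket_for(uint64_t thread_id) {
    return buckets[thread_id % kNumBuckets];
}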

struct BAIDU_CACHELINE_ALIGNMENT TimerThread::Task {
    Task* next;                 // For linking tasks in a Bucket.
    int64_t run_time;           // run the task at this realtime
    void (*fn)(void*);          // the fn(arg) to run
    void* arg;
    // Current TaskId, checked against version in TimerThread::run to test
    // if this task is unscheduled.
    TaskId task_id;
    // initial_version:     not run yet
    // initial_version + 1: running
    // initial_version + 2: removed (also the version of next Task reused
    //                      this struct)
    butil::atomic<uint32_t> version;

    Task() : version(2/*skip 0*/) {}

    // Run this task and delete this struct.
    // Returns true if fn(arg) did run.
    bool run_and_delete();

    // Delete this struct if this task was unscheduled.
    // Returns true on deletion.
    bool try_delete();
};

// Timer tasks are sharded into different Buckets to reduce contentions.
class BAIDU_CACHELINE_ALIGNMENT TimerThread::Bucket {
public:
    Bucket()
        : _nearest_run_time(std::numeric_limits<int64_t>::max())
        , _task_head(NULL) {
    }

    ~Bucket() {}

    struct ScheduleResult {
        TimerThread::TaskId task_id;
        bool earlier;
    };

    // Schedule a task into this bucket.
    // Returns the TaskId and if it has the nearest run time.
    ScheduleResult schedule(void (*fn)(void*), void* arg,
                            const timespec& abstime);

    // Pull all scheduled tasks.
    // This function is called in timer thread.
    Task* consume_tasks();

private:
    internal::FastPthreadMutex _mutex;
    int64_t _nearest_run_time;
    Task* _task_head;
};

TimerThread::Bucket::ScheduleResult
TimerThread::Bucket::schedule(void (*fn)(void*), void* arg,
                              const timespec& abstime) {
    butil::ResourceId<Task> slot_id;
    Task* task = butil::get_resource<Task>(&slot_id);
    if (task == NULL) {
        ScheduleResult result = { INVALID_TASK_ID, false };
        return result;
    }
    task->next = NULL;
    task->fn = fn;
    task->arg = arg;
    task->run_time = butil::timespec_to_microseconds(abstime);
    uint32_t version = task->version.load(butil::memory_order_relaxed);
    if (version == 0) {  // skip 0.
        task->version.fetch_add(2, butil::memory_order_relaxed);
        version = 2;
    }
    const TaskId id = make_task_id(slot_id, version);
    task->task_id = id;
    bool earlier = false;
    {
        BAIDU_SCOPED_LOCK(_mutex);
        task->next = _task_head;
        _task_head = task;
        if (task->run_time < _nearest_run_time) {
            _nearest_run_time = task->run_time;
            earlier = true;
        }
    }
    ScheduleResult result = { id, earlier };
    return result;
}

Above, once a task has been placed into a bucket, its run time is compared with that bucket's own _nearest_run_time. If it is earlier, earlier is set to true; the caller then compares it with the TimerThread-wide _nearest_run_time and may wake the sleeping timer thread:

214 TimerThread::TaskId TimerThread::schedule(
215     void (*fn)(void*), void* arg, const timespec& abstime) {
216     if (_stop.load(butil::memory_order_relaxed) || !_started) {
217         // Not add tasks when TimerThread is about to stop.
218         return INVALID_TASK_ID;
219     }
220     // Hashing by pthread id is better for cache locality.
221     const Bucket::ScheduleResult result =
222         _buckets[butil::fmix64(pthread_numeric_id()) % _options.num_buckets]
223         .schedule(fn, arg, abstime);  // hash into one of the buckets
224     if (result.earlier) {  // an earlier timer arrived
225         bool earlier = false;
226         const int64_t run_time = butil::timespec_to_microseconds(abstime);
227         {
228             BAIDU_SCOPED_LOCK(_mutex);
229             if (run_time < _nearest_run_time) {  // compare with the global nearest time
230                 _nearest_run_time = run_time;
231                 ++_nsignals;
232                 earlier = true;  // the timer thread needs waking
233             }
234         }
235         if (earlier) {
236             futex_wake_private(&_nsignals, 1);
237         }
238     }
239     return result.task_id;
240 }

If multiple worker threads call timerThread->schedule(...) to add timer tasks, and each task is earlier than the _nearest_run_time of the bucket it hashed to, they still contend on the global _mutex at line 228.

The main logic of the timer thread is as follows:

void TimerThread::run() {
    // more code ...
    // min heap of tasks (ordered by run_time)
    std::vector<Task*> tasks;
    tasks.reserve(4096);
    // more code ...
    while (!_stop.load(butil::memory_order_relaxed)) {
        // more code ...
        {
            BAIDU_SCOPED_LOCK(_mutex);
            // Up to this statement, _nearest_run_time held the earliest
            // deadline among all scheduled tasks.
            _nearest_run_time = std::numeric_limits<int64_t>::max();
        }

        // Sweep _buckets, collecting every task that has not been unscheduled.
        for (size_t i = 0; i < _options.num_buckets; ++i) {
            Bucket& bucket = _buckets[i];
            for (Task* p = bucket.consume_tasks(); p != NULL;
                 p = p->next, ++nscheduled) {
                if (!p->try_delete()) { // remove the task if it's unscheduled
                    tasks.push_back(p);
                    std::push_heap(tasks.begin(), tasks.end(), task_greater);
                }
            }
        } // the collected tasks are kept in a min-heap

        bool pull_again = false;
        while (!tasks.empty()) {
            Task* task1 = tasks[0];  // the about-to-run task
            if (task1->try_delete()) { // already unscheduled
                std::pop_heap(tasks.begin(), tasks.end(), task_greater);
                tasks.pop_back();
                continue;
            }
            if (butil::gettimeofday_us() < task1->run_time) {  // not ready yet.
                break;
            }
            // more code ...
            {
                BAIDU_SCOPED_LOCK(_mutex);
                if (task1->run_time > _nearest_run_time) {
                    // A task earlier than task1 arrived while we were running
                    // tasks: we need to sweep the buckets again.
                    pull_again = true;
                    break;
                }
            }
            std::pop_heap(tasks.begin(), tasks.end(), task_greater);
            tasks.pop_back();
            if (task1->run_and_delete()) { // run the timer task
                // more code ...
            }
        }
        if (pull_again) {
            continue;
        }

        // Work out how long to sleep.
        int64_t next_run_time = std::numeric_limits<int64_t>::max();
        if (tasks.empty()) {
            next_run_time = std::numeric_limits<int64_t>::max();
        } else {
            next_run_time = tasks[0]->run_time;
        }
        int expected_nsignals = 0;
        {
            BAIDU_SCOPED_LOCK(_mutex);
            if (next_run_time > _nearest_run_time) {
                // The global nearest time changed: a task is earlier than what
                // we would wait for. We need to check the buckets again.
                continue;
            } else {
                _nearest_run_time = next_run_time;
                expected_nsignals = _nsignals;
            }
        }
        timespec* ptimeout = NULL;
        timespec next_timeout = { 0, 0 };
        const int64_t now = butil::gettimeofday_us();
        if (next_run_time != std::numeric_limits<int64_t>::max()) {
            next_timeout = butil::microseconds_to_timespec(next_run_time - now);
            ptimeout = &next_timeout;
        }
        // Sleep until the next deadline, or until an earlier task wakes us.
        futex_wait_private(&_nsignals, expected_nsignals, ptimeout);
    }
}

While run() executes, other threads may modify some of these shared variables, in which case the loop must start over to avoid drift, such as failing to wake on time. When _nsignals no longer equals expected_nsignals, futex_wait_private does not actually call pthread_cond_timedwait; and _nsignals only changes when a newly scheduled task is earlier than everything currently known, which is exactly when the timer thread must be woken.
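A minimal model of that handshake (the names are mine, and in brpc _nsignals is actually guarded by _mutex rather than being a bare atomic): the timer thread snapshots the counter while deciding how long to sleep, and a scheduler bumps the counter before waking, so a signal that lands between the snapshot and the wait makes the wait return immediately instead of being lost:

#include <atomic>
#include <ctime>

// Declarations matching the simulated futex above.
int futex_wait_private(void* addr1, int expected, const timespec* timeout);
int futex_wake_private(void* addr1, int nwake);

std::atomic<int> nsignals{0};

// Timer thread: sleep until the next deadline. If a scheduler bumped the
// counter after the snapshot, the wait returns at once with EAGAIN rather
// than sleeping, so the earlier task is noticed immediately.
void timer_thread_sleep(int expected_nsignals, const timespec* next_deadline) {
    futex_wait_private(&nsignals, expected_nsignals, next_deadline);
}

// Scheduling thread: after lowering the global _nearest_run_time, bump the
// counter and wake the (possibly sleeping) timer thread.
void signal_earlier_task() {
    nsignals.fetch_add(1);
    futex_wake_private(&nsignals, 1);
}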

Some of the comments and the rationale are best read in the source itself; here I have only pasted the key code along with some notes of my own. I still need to think more carefully about the implementation above.

References:
timer_keeping.md
rocksdb源码分析 写优化之JoinBatchGroup
Linux Futex浅析
多线程编程的时候,使用无锁结构会不会比有锁结构更加快?
