一、整体流程概览
从GitHub下载源码后,代理的源码在src中,同时还用到了lib库中的一些函数。对项目的工作流程有个大概理解是分析mosquitto的访问控制权限的基础,网络上已有很多中文博客在介绍,如逍遥子,尽管比较老,但是主要结构体的意义没有变;首先对结构体的含义有所理解对后面进一步看源码是非常有帮助的,如struct mosquitto代表了一个客户端,mosquitto_db代表代理内的一个仓库来存储各种东西。
因为是C语言编写,首先寻找main函数,服务器从/src/mosquitto.c中的main函数开始启动。注意,看的时候会发现有很多宏定义(如WIN32),我们选择自己熟悉的一个平台开始看就好,把其他的折叠掉以免产生混乱。main函数进行了订阅树初始化和加载安全配置文件后,便进入mosquitto_main_loop主循环;该函数首先开始用epoll机制来监听socket读,之后便进入了真正的核心主循环while(run){},这里也才是服务器运行真正逻辑开始的地方。
从上至下流程概括如下:
- 检查并释放闲置的代表客户端结构体context;
- 然后通过哈希表的形式遍历客户端(即context),发送客户端context队列里的数据包,并且把超时的断掉。
- 通过epoll_wait循环处理socket事件,net__socket_accept里接受客户端的连接同时创建了该客户端的context;loop_handle_reads_writes根据读写事件发送或接收数据包。
- packet__read会读取所有数据包内容,然后开始调用handle__packet开始根据数据帧的协议类型分开处理,特别注意一下服务器使用的是src文件夹下的read_handle.c里的函数,区别于客户端使用的lib,有时候自动跳转会坑。根据handle__packet函数里的switch case,就可以方便的进行更详细的跟进。
while(run){//进入主死循环 context__free_disused(db); #ifdef WITH_SYS_TREE if(db->config->sys_interval > 0){ sys_tree__update(db, db->config->sys_interval, start_time); } #endif #ifndef WITH_EPOLL memset(pollfds, -1, sizeof(struct pollfd)*pollfd_max); pollfd_index = 0; for(i=0; i<listensock_count; i++){ pollfds[pollfd_index].fd = listensock[i]; pollfds[pollfd_index].events = POLLIN; pollfds[pollfd_index].revents = 0; pollfd_index++; } #endif now_time = time(NULL); time_count = 0; HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){//遍历哈希表 if(time_count > 0){ time_count--; }else{ time_count = 1000; now = mosquitto_time(); } context->pollfd_index = -1; if(context->sock != INVALID_SOCKET){ #ifdef WITH_BRIDGE if(context->bridge){ mosquitto__check_keepalive(db, context); if(context->bridge->round_robin == false && context->bridge->cur_address != 0 && context->bridge->primary_retry && now > context->bridge->primary_retry){ if(context->bridge->primary_retry_sock == INVALID_SOCKET){ rc = net__try_connect(context, context->bridge->addresses[0].address, context->bridge->addresses[0].port, &context->bridge->primary_retry_sock, NULL, false); if(rc == 0){ COMPAT_CLOSE(context->bridge->primary_retry_sock); context->bridge->primary_retry_sock = INVALID_SOCKET; context->bridge->primary_retry = 0; net__socket_close(db, context); context->bridge->cur_address = 0; } }else{ len = sizeof(int); if(!getsockopt(context->bridge->primary_retry_sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){ if(err == 0){ COMPAT_CLOSE(context->bridge->primary_retry_sock); context->bridge->primary_retry_sock = INVALID_SOCKET; context->bridge->primary_retry = 0; net__socket_close(db, context); context->bridge->cur_address = context->bridge->address_count-1; }else{ COMPAT_CLOSE(context->bridge->primary_retry_sock); context->bridge->primary_retry_sock = INVALID_SOCKET; context->bridge->primary_retry = now+5; } }else{ COMPAT_CLOSE(context->bridge->primary_retry_sock); context->bridge->primary_retry_sock = INVALID_SOCKET; context->bridge->primary_retry = now+5; } } } } #endif /* Local bridges never time out in this fashion. */ if(!(context->keepalive) || context->bridge || now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){ //判断当客户端在线时,给客户端发送inflight的数据包 if(db__message_write(db, context) == MOSQ_ERR_SUCCESS){ #ifdef WITH_EPOLL if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){ if(!(context->events & EPOLLOUT)) { ev.data.fd = context->sock; ev.events = EPOLLIN | EPOLLOUT; if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLOUT: %s", strerror(errno)); } } context->events = EPOLLIN | EPOLLOUT; } context->ws_want_write = false; } else{ if(context->events & EPOLLOUT) { ev.data.fd = context->sock; ev.events = EPOLLIN; if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLIN: %s", strerror(errno)); } } context->events = EPOLLIN; } } #else pollfds[pollfd_index].fd = context->sock; pollfds[pollfd_index].events = POLLIN; pollfds[pollfd_index].revents = 0; if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){ pollfds[pollfd_index].events |= POLLOUT; context->ws_want_write = false; } context->pollfd_index = pollfd_index; pollfd_index++; #endif }else{ do_disconnect(db, context); } }else{//客户端超时 if(db->config->connection_messages == true){ if(context->id){ id = context->id; }else{ id = "<unknown>"; } log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s has exceeded timeout, disconnecting.", id); } /* Client has exceeded keepalive*1.5 */ do_disconnect(db, context); } } } #ifdef WITH_BRIDGE time_count = 0; for(i=0; i<db->bridge_count; i++){ if(!db->bridges[i]) continue; context = db->bridges[i]; if(context->sock == INVALID_SOCKET){ if(time_count > 0){ time_count--; }else{ time_count = 1000; now = mosquitto_time(); } /* Want to try to restart the bridge connection */ if(!context->bridge->restart_t){ context->bridge->restart_t = now+context->bridge->restart_timeout; context->bridge->cur_address++; if(context->bridge->cur_address == context->bridge->address_count){ context->bridge->cur_address = 0; } }else{ if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect) || (context->bridge->start_type == bst_automatic && now > context->bridge->restart_t)){ #if defined(__GLIBC__) && defined(WITH_ADNS) if(context->adns){ /* Connection attempted, waiting on DNS lookup */ rc = gai_error(context->adns); if(rc == EAI_INPROGRESS){ /* Just keep on waiting */ }else if(rc == 0){ rc = bridge__connect_step2(db, context); if(rc == MOSQ_ERR_SUCCESS){ #ifdef WITH_EPOLL ev.data.fd = context->sock; ev.events = EPOLLIN; if(context->current_out_packet){ ev.events |= EPOLLOUT; } if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno)); } }else{ context->events = ev.events; } #else pollfds[pollfd_index].fd = context->sock; pollfds[pollfd_index].events = POLLIN; pollfds[pollfd_index].revents = 0; if(context->current_out_packet){ pollfds[pollfd_index].events |= POLLOUT; } context->pollfd_index = pollfd_index; pollfd_index++; #endif }else if(rc == MOSQ_ERR_CONN_PENDING){ context->bridge->restart_t = 0; }else{ context->bridge->cur_address++; if(context->bridge->cur_address == context->bridge->address_count){ context->bridge->cur_address = 0; } context->bridge->restart_t = 0; } }else{ /* Need to retry */ if(context->adns->ar_result){ freeaddrinfo(context->adns->ar_result); } mosquitto__free(context->adns); context->adns = NULL; context->bridge->restart_t = 0; } }else{ rc = bridge__connect_step1(db, context); if(rc){ context->bridge->cur_address++; if(context->bridge->cur_address == context->bridge->address_count){ context->bridge->cur_address = 0; } }else{ /* Short wait for ADNS lookup */ context->bridge->restart_t = 1; } } #else { rc = bridge__connect(db, context); context->bridge->restart_t = 0; if(rc == MOSQ_ERR_SUCCESS){ if(context->bridge->round_robin == false && context->bridge->cur_address != 0){ context->bridge->primary_retry = now + 5; } #ifdef WITH_EPOLL ev.data.fd = context->sock; ev.events = EPOLLIN; if(context->current_out_packet){ ev.events |= EPOLLOUT; } if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno)); } }else{ context->events = ev.events; } #else pollfds[pollfd_index].fd = context->sock; pollfds[pollfd_index].events = POLLIN; pollfds[pollfd_index].revents = 0; if(context->current_out_packet){ pollfds[pollfd_index].events |= POLLOUT; } context->pollfd_index = pollfd_index; pollfd_index++; #endif }else{ context->bridge->cur_address++; if(context->bridge->cur_address == context->bridge->address_count){ context->bridge->cur_address = 0; } } } #endif } } } } #endif now_time = time(NULL); if(db->config->persistent_client_expiration > 0 && now_time > expiration_check_time){ HASH_ITER(hh_id, db->contexts_by_id, context, ctxt_tmp){ if(context->sock == INVALID_SOCKET && context->clean_session == 0){ /* This is a persistent client, check to see if the * last time it connected was longer than * persistent_client_expiration seconds ago. If so, * expire it and clean up. */ if(now_time > context->disconnect_t+db->config->persistent_client_expiration){ if(context->id){ id = context->id; }else{ id = "<unknown>"; } log__printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", id); G_CLIENTS_EXPIRED_INC(); context->clean_session = true; context->state = mosq_cs_expiring; do_disconnect(db, context); } } } expiration_check_time = time(NULL) + 3600; } #ifndef WIN32 sigprocmask(SIG_SETMASK, &sigblock, &origsig); #ifdef WITH_EPOLL //监听socket事件 fdcount = epoll_wait(db->epollfd, events, MAX_EVENTS, 100); #else fdcount = poll(pollfds, pollfd_index, 100); #endif sigprocmask(SIG_SETMASK, &origsig, NULL); #else fdcount = WSAPoll(pollfds, pollfd_index, 100); #endif #ifdef WITH_EPOLL switch(fdcount){ case -1: if(errno != EINTR){ log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll waiting: %s.", strerror(errno)); } break; case 0: break; default: //循环处理socket事件 for(i=0; i<fdcount; i++){ for(j=0; j<listensock_count; j++){ if (events[i].data.fd == listensock[j]) { if (events[i].events & (EPOLLIN | EPOLLPRI)){ //接受客户端的连接,net__socket_accept里同时创建了该客户端的context while((ev.data.fd = net__socket_accept(db, listensock[j])) != -1){ ev.events = EPOLLIN; if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, ev.data.fd, &ev) == -1) { log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: %s", strerror(errno)); } context = NULL; HASH_FIND(hh_sock, db->contexts_by_sock, &(ev.data.fd), sizeof(mosq_sock_t), context); if(!context) { log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: no context"); } context->events = EPOLLIN; } } break; } } if (j == listensock_count) { loop_handle_reads_writes(db, events[i].data.fd, events[i].events); } } } #else if(fdcount == -1){ log__printf(NULL, MOSQ_LOG_ERR, "Error in poll: %s.", strerror(errno)); }else{ loop_handle_reads_writes(db, pollfds); for(i=0; i<listensock_count; i++){ if(pollfds[i].revents & (POLLIN | POLLPRI)){ while(net__socket_accept(db, listensock[i]) != -1){ } } } } #endif #ifdef WITH_PERSISTENCE if(db->config->persistence && db->config->autosave_interval){ if(db->config->autosave_on_changes){ if(db->persistence_changes >= db->config->autosave_interval){ persist__backup(db, false); db->persistence_changes = 0; } }else{ if(last_backup + db->config->autosave_interval < mosquitto_time()){ persist__backup(db, false); last_backup = mosquitto_time(); } } } #endif #ifdef WITH_PERSISTENCE if(flag_db_backup){ persist__backup(db, false); flag_db_backup = false; } #endif if(flag_reload){ log__printf(NULL, MOSQ_LOG_INFO, "Reloading config."); config__read(db, db->config, true); mosquitto_security_cleanup(db, true); mosquitto_security_init(db, true); mosquitto_security_apply(db); log__close(db->config); log__init(db->config); flag_reload = false; } if(flag_tree_print){ sub__tree_print(db->subs, 0); flag_tree_print = false; } #ifdef WITH_WEBSOCKETS for(i=0; i<db->config->listener_count; i++){ /* Extremely hacky, should be using the lws provided external poll * interface, but their interface has changed recently and ours * will soon, so for now websockets clients are second class * citizens. */ if(db->config->listeners[i].ws_context){ libwebsocket_service(db->config->listeners[i].ws_context, 0); } } if(db->config->have_websockets_listener){ temp__expire_websockets_clients(db); } #endif }//end while(run)