1.这几天研究skynet中的 lua-netpack.c 中的解析数据包过程。于是把lua部分去掉,修改了一些接口,留下解包相关的代码。再结合云风写的网络代码的例子,
写了一个最简单形式的客户端封包,服务器解包的代码,作为学习笔记的同时也希望能够帮助一些像我一样的新手学习理解封包,解包的概念。
ps:修改的代码实现了,当收到一个整包时,打印整包内容的功能,但是并没有从完整的包队列中pop完整包的接口,可自行加上。
2.服务器和客户端程序简介:
server:server端实现的很简单,基于云风给出的skynet网络层代码的例子。在收到客户端发来数据以后,加上了解包过程。
1):生成。将代码放到一个文件夹下以后,直接运行make便可得到 socket-server, 执行 ./socket-server 运行程序。
2):可自行在解包处 filter_data_里加上log来具体分析解包过程,ps:我就是这样搞的。
3):理解流程以后,可自行实现修改 message_resolve.c和test.c来实现更详细的测试
client: 该客户端比较简单,只是简单的建立连接,然后输入发送消息的条数(n),然后会自动随机生成n条长度为 1到2^16次方的随机消息(因为skynet的消息长度字段是两个字节)
ps:之前是手动输入消息,但是消息太短会导致服务器解包过程中的一些条件覆盖不到,于是只需输入消息条数,让程序自动生成随机长度消息(实际上只是字符'a'的序列,可以自行修改
生成消息的函数 gen_msg 来生成其它内容的消息)
1):client只有一个文件socket-cient.c 直接通过命令行 gcc -o socket-client socket-client.c 来获得客户端的可执行程序 socket-client,
运行./socket-client 来执行客户端程序。
2):修改 #define MSG_LEN 来更改随机消息长度范围,可设置小一些。
3.解包过程浅析:
1客户端消息格式: 2字节消息长度(大端字节序)
服务器解包过程中的几种情况:
1)首次收到消息包的时候,可能之后到一个字节,因为每个包开头两字节组成包长字段,所以这种情况要将其放入为接受完队列。待下次从该fd中读取剩余消息;
2)当收到的是整个包的时候,直接获得完整的消息;
3)当收到的是多余一个包时,将获得的完整包放入完整的消息包队列,然后将剩余的消息放入未完成消息队列中,下次接收到消息时,找到该fd的未完成消息队列,
继续解析。
4.代码:结构如下
Makefile:
socket-server : socket_server.c message_resolve.c test.c gcc -g -Wall -o $@ $^ -lpthread -I/message_resolve.h clean: rm -f socket-server
message_resolve.c:
#include "skynet_malloc.h" #include "message_resolve.h" #include <assert.h> #include <stdint.h> #include <stdlib.h> #include <string.h> #include <stdio.h> #define QUEUESIZE 1024 #define HASHSIZE 4096 #define SMALLSTRING 2048 #define TYPE_DATA 1 #define TYPE_MORE 2 #define TYPE_ERROR 3 #define TYPE_OPEN 4 #define TYPE_CLOSE 5 #define TYPE_WARNING 6 /* Each package is uint16 + data , uint16 (serialized in big-endian) is the number of bytes comprising the data . */ struct netpack { int id; int size; void * buffer; }; struct uncomplete { struct netpack pack; struct uncomplete * next; int read; int header; }; struct queue { int cap; int head; int tail; struct uncomplete * hash[HASHSIZE]; struct netpack queue[QUEUESIZE]; }; static struct queue *Q; void init_queue() { struct queue *q = (struct queue *)skynet_malloc(sizeof(*q)); q->head = 0; q->tail = 0; q->cap = HASHSIZE; int i; for (i = 0; i < HASHSIZE; i++) { q->hash[i] = NULL; } Q = q; } /*static void clear_list(struct uncomplete * uc) { while (uc) { skynet_free(uc->pack.buffer); void * tmp = uc; uc = uc->next; skynet_free(tmp); } }*/ /*static int clear(struct queue *q) { if (q == NULL) { return 0; } int i; for (i = 0; i < HASHSIZE; i++) { clear_list(q->hash[i]); q->hash[i] = NULL; } if(q->head > q->tail) { q->tail += q->cap; } for (i = q->head; i < q->tail; i++) { struct netpack *np = &q->queue[i % q->cap]; skynet_free(np->buffer); np->buffer = NULL; } q->head = q->tail = 0; return 0; }*/ static inline int hash_fd(int fd) { int a = fd >> 24; int b = fd >> 12; int c = fd; return (int)(((uint32_t)(a+b+c))) % HASHSIZE; } static inline int read_size(uint8_t *buffer) { int r = (int)buffer[0] << 8 | (int)buffer[1]; return r; } static struct uncomplete * save_uncomplete(struct queue *q, int fd) { int h = hash_fd(fd); struct uncomplete *uc = (struct uncomplete *)skynet_malloc(sizeof(struct uncomplete)); memset(uc, 0, sizeof(*uc)); uc->next = q->hash[h]; uc->pack.id = fd; q->hash[h] = uc; return uc; } static struct uncomplete * find_uncomplete(struct queue *q, int fd) { if (q == NULL) { return NULL; } int h = hash_fd(fd); struct uncomplete *uc = q->hash[h]; if (uc == NULL) { return NULL; } if (uc->pack.id == fd) { q->hash[h] = uc->next; return uc; } //hash冲突,可能不同fd对应同一个slot,根据id == fd 区分 struct uncomplete *last = uc; while(last->next) { uc = last->next; if(uc->pack.id == fd) { last->next = uc->next; return uc; } last = uc; } return NULL; } static inline void expand_queue(struct queue *q) { struct queue *nq = (struct queue *)skynet_malloc(sizeof(struct queue) + sizeof(struct netpack) * q->cap); nq->cap = QUEUESIZE + q->cap; nq->head = 0; nq->tail = q->cap; memcpy(nq->hash, q->hash, sizeof(nq->hash)); memset(q->hash, 0, sizeof(q->hash)); //之前有疑惑,为什么queue 不能用memcpy直接拷贝,原因是,要保证新队列的值是从旧队列的head开始的,但是旧队列的head并不一定是下标==0得到完整的一个包,并push到队列 int i; for (i = 0; i < q->cap; i++) { int idx = (q->head + i) % q->cap; nq->queue[i] = q->queue[idx]; } q->head = q->tail = 0; Q = nq; } //得到完整的一个包,并push到队列 static inline void push_data(struct queue *q, int fd, uint8_t *buffer, int size, int clone) { if(clone) { void *tmp = (void *)skynet_malloc(size); memcpy(tmp, buffer, size); buffer = tmp; } struct netpack *np = &q->queue[q->tail]; if(++q->tail >= q->cap) { q->tail -= q->cap; } np->id = fd; np->size = size; np->buffer = buffer; if(q->head == q->tail) { expand_queue(q); } } static inline void push_more(struct queue *q, uint8_t *buffer, int size, int fd) { if (size == 1) { struct uncomplete *uc = save_uncomplete(q, fd); uc->header = *buffer; uc->read = -1; return; } int pack_size = read_size(buffer); buffer += 2; size -= 2; if (size < pack_size) { struct uncomplete *uc = save_uncomplete(q, fd); uc->pack.size = pack_size; uc->read = size; uc->pack.buffer = (void *)skynet_malloc(pack_size); memcpy(uc->pack.buffer, buffer, size); return; } push_data(q, fd, buffer, pack_size, 1); buffer += pack_size; size -= pack_size; if (size > 0) { push_more(q, buffer, size, fd); } } /*static void close_uncomplete(struct queue *q, int fd) { struct uncomplete *uc = find_uncomplete(q, fd); if (uc) { skynet_free(uc->pack.buffer); skynet_free(uc); } }*/ int filter_data_(uint8_t *buffer, int size, int fd, struct my_data *md) { printf("size[%d], fd[%d]\n", size, fd); struct queue *q = Q; struct uncomplete *uc = find_uncomplete(q, fd); if (uc) { if (uc->read < 0) { assert(uc->read == -1); int pack_size = *buffer; pack_size |= uc->header << 8; ++buffer; --size; uc->pack.size = pack_size; uc->pack.buffer = (void *)skynet_malloc(pack_size); uc->read = 0; } int need = uc->pack.size - uc->read; printf("need[%d];pack.size[%d];uc->read[%d]\n", need , uc->pack.size ,uc->read); if (size < need) { memcpy(uc->pack.buffer + uc->read, buffer, size); uc->read += size; int h = hash_fd(fd); uc->next = q->hash[h]; q->hash[h] = uc; return 1; } memcpy(uc->pack.buffer + uc->read, buffer, need); buffer += need; size -= need; if (size == 0) { //TODO md->size = uc->pack.size; md->fd = fd; md->data = (void *)skynet_malloc(md->size); memcpy(md->data, uc->pack.buffer, uc->pack.size); skynet_free(uc); return 0; } push_data(q, fd, uc->pack.buffer, uc->pack.size, 0); skynet_free(uc); push_more(q, buffer, size, fd); return 2; } else { if(size == 1) { struct uncomplete *uc = save_uncomplete(q, fd); uc->read = -1; uc->header = *buffer; return 1; } int pack_size = read_size(buffer); buffer += 2; size -= 2; //printf("pack_size[%d], size[%d], buffer[%s]\n", pack_size, size, buffer); if (size < pack_size) { printf("size < pack_size\n"); struct uncomplete *uc = save_uncomplete(q, fd); uc->read = size; uc->pack.size = pack_size; uc->pack.buffer = (void *)skynet_malloc(pack_size); memcpy(uc->pack.buffer, buffer, size); return 1; } if(size == pack_size) { printf("size == packsize\n"); //TODO md->size = size; md->fd = fd; md->data = (void *)skynet_malloc(size); memcpy(md->data, buffer, size); return 0; } push_data(q, fd, buffer, pack_size, 1); buffer += pack_size; size -= pack_size; push_more(q, buffer, size, fd); return 2; } }