在redis源码从main开始7中看完了一个命令的执行过程,本章我们来看redis将命令执行的结果返回给客户端的过程
// 从main开始之回复消息
/*
* 把redis对象创建回复给客户端
*/
void addReplyBulk(redisClient *c, robj *obj)
{
addReplyBulkLen(c, obj); // 回复前缀既 回复长度\r\n
addReply(c, obj); // 回复的具体内容
addReply(c, shared.crlf); // 增加常见回复\r\n
}
/* 创建一个回复前缀 */
void addReplyBulkLen(redisClient *c, robj *obj)
{
size_t len;
if (sdsEncodedObject(obj))
{
len = sdslen(obj->ptr);
}
else
{
long n = (long)obj->ptr;
/* 回复的长度 */
len = 1;
if (n<0)
{
len++;
n = -n;
}
while ((n = n / 10) != 0)
{
len++;
}
}
if (len'); // 构建字符串
}
// 给客户端发消息
void addReply(redisClient *c, robj *obj)
{
// 给能发送消息的网络客户端增加写消息的处理器
if (prepareClientToWrite(c) != REDIS_OK)
return;
/*
* 一些优化判断类型看要不要拷贝啥的
*/
if (sdsEncodedObject(obj))
{
// 试试看能不能直接复制进c->buf中
if (_addReplyToBuffer(c, obj->ptr, sdslen(obj->ptr)) != REDIS_OK)
// 不能复制 c->buf 中,就加到 c->reply 链表中
_addReplyObjectToList(c, obj);
}
else if (obj->encoding == REDIS_ENCODING_INT)
{
/* 优化 试试看能不能长整数优化成字符串*/
if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32)
{
char buf[32];
int len;
len = ll2string(buf, sizeof(buf), (long)obj->ptr);
if (_addReplyToBuffer(c, buf, len) == REDIS_OK)
return;
}
obj = getDecodedObject(obj);
// 普通的发送redis对象的方式
if (_addReplyToBuffer(c, obj->ptr, sdslen(obj->ptr)) != REDIS_OK)
_addReplyObjectToList(c, obj);
decrRefCount(obj);
}
else
{
redisPanic("Wrong obj->encoding in addReply()");
}
}
/*
创建新的事件循环写的处理方法
*/
int prepareClientToWrite(redisClient *c)
{
// LUA 脚本环境所使用的伪客户端总是可写的
if (c->flags & REDIS_LUA_CLIENT)
return REDIS_OK;
// 客户端是主服务器并且不接受查询,
// 那么它是不可写的,出错
if ((c->flags & REDIS_MASTER) &&
!(c->flags & REDIS_MASTER_FORCE_REPLY))
return REDIS_ERR;
// 无连接的伪客户端总是不可写的
if (c->fd <= 0)
return REDIS_ERR; /* Fake client */
// 一般情况,为客户端套接字安装写处理器到事件循环
if (c->bufpos == 0 && listLength(c->reply) == 0 &&
(c->replstate == REDIS_REPL_NONE ||
c->replstate == REDIS_REPL_ONLINE) &&
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR)
return REDIS_ERR;
return REDIS_OK;
}
/*
* 事件触发 给套接字写内容的方法
*/
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask)
{
redisClient *c = privdata;
int nwritten = 0, totwritten = 0, objlen;
size_t objmem;
robj *o;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
// 有内容就发
while (c->bufpos > 0 || listLength(c->reply))
{
if (c->bufpos > 0)
{
// 写入内容到套接字
nwritten = write(fd, c->buf + c->sentlen, c->bufpos - c->sentlen);
if (nwritten <= 0)
break;
c->sentlen += nwritten;
totwritten += nwritten;
/* 发光了就清空*/
if (c->sentlen == c->bufpos)
{
c->bufpos = 0;
c->sentlen = 0;
}
}
else
{
// 发送链表的等待发送内容
o = listNodeValue(listFirst(c->reply));
objlen = sdslen(o->ptr);
objmem = getStringObjectSdsUsedMemory(o);
// 略过空对象
if (objlen == 0)
{
listDelNode(c->reply, listFirst(c->reply));
c->reply_bytes -= objmem;
continue;
}
// 写入内容到套接字
nwritten = write(fd, ((char *)o->ptr) + c->sentlen, objlen - c->sentlen);
if (nwritten <= 0)
break;
c->sentlen += nwritten;
totwritten += nwritten;
// 删除写完的节点
if (c->sentlen == objlen)
{
listDelNode(c->reply, listFirst(c->reply));
c->sentlen = 0;
c->reply_bytes -= objmem;
}
}
/*
// 达到一定写入长度就中断,等待下次在写
*/
if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
(server.maxmemory == 0 ||
zmalloc_used_memory() < server.maxmemory))
break;
}
// 写入出错检查
if (nwritten == -1)
{
if (errno == EAGAIN)
{
nwritten = 0;
}
else
{
redisLog(REDIS_VERBOSE,
"Error writing to client: %s", strerror(errno));
freeClient(c);
return;
}
}
if (totwritten > 0)
{
/* 检查发送超时的东西 */
if (!(c->flags & REDIS_MASTER))
c->lastinteraction = server.unixtime;
}
if (c->bufpos == 0 && listLength(c->reply) == 0)
{
c->sentlen = 0;
// 删除 写事件
aeDeleteFileEvent(server.el, c->fd, AE_WRITABLE);
// 判断要不要关闭客户端
if (c->flags & REDIS_CLOSE_AFTER_REPLY)
freeClient(c);
}
}
基于版本3.0.0版本,点击下载https://download.redis.io/releases/redis-3.0.0.tar.gz
本文地址,https://www.ccagml.com/?p=395