在redis源码从main开始7中看完了一个命令的执行过程,本章我们来看redis将命令执行的结果返回给客户端的过程


// 从main开始之回复消息

/*
 * 把redis对象创建回复给客户端
 */
void addReplyBulk(redisClient *c, robj *obj)
{
    addReplyBulkLen(c, obj);  // 回复前缀既 回复长度\r\n
    addReply(c, obj);         // 回复的具体内容
    addReply(c, shared.crlf); // 增加常见回复\r\n
}

/* 创建一个回复前缀 */
void addReplyBulkLen(redisClient *c, robj *obj)
{
    size_t len;

    if (sdsEncodedObject(obj))
    {
        len = sdslen(obj->ptr);
    }
    else
    {
        long n = (long)obj->ptr;

        /* 回复的长度 */
        len = 1;
        if (n<0)
        {
            len++;
            n = -n;
        }
        while ((n = n / 10) != 0)
        {
            len++;
        }
    }

    if (len'); // 构建字符串
}

// 给客户端发消息
void addReply(redisClient *c, robj *obj)
{

    // 给能发送消息的网络客户端增加写消息的处理器
    if (prepareClientToWrite(c) != REDIS_OK)
        return;

    /* 
     * 一些优化判断类型看要不要拷贝啥的
     */
    if (sdsEncodedObject(obj))
    {
        // 试试看能不能直接复制进c->buf中
        if (_addReplyToBuffer(c, obj->ptr, sdslen(obj->ptr)) != REDIS_OK)
            // 不能复制 c->buf 中,就加到 c->reply 链表中
            _addReplyObjectToList(c, obj);
    }
    else if (obj->encoding == REDIS_ENCODING_INT)
    {
        /* 优化 试试看能不能长整数优化成字符串*/
        if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32)
        {
            char buf[32];
            int len;

            len = ll2string(buf, sizeof(buf), (long)obj->ptr);
            if (_addReplyToBuffer(c, buf, len) == REDIS_OK)
                return;
        }
        obj = getDecodedObject(obj);
        // 普通的发送redis对象的方式
        if (_addReplyToBuffer(c, obj->ptr, sdslen(obj->ptr)) != REDIS_OK)
            _addReplyObjectToList(c, obj);
        decrRefCount(obj);
    }
    else
    {
        redisPanic("Wrong obj->encoding in addReply()");
    }
}

/* 
创建新的事件循环写的处理方法
 */
int prepareClientToWrite(redisClient *c)
{

    // LUA 脚本环境所使用的伪客户端总是可写的
    if (c->flags & REDIS_LUA_CLIENT)
        return REDIS_OK;

    // 客户端是主服务器并且不接受查询,
    // 那么它是不可写的,出错
    if ((c->flags & REDIS_MASTER) &&
        !(c->flags & REDIS_MASTER_FORCE_REPLY))
        return REDIS_ERR;

    // 无连接的伪客户端总是不可写的
    if (c->fd <= 0)
        return REDIS_ERR; /* Fake client */

    // 一般情况,为客户端套接字安装写处理器到事件循环
    if (c->bufpos == 0 && listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
                          sendReplyToClient, c) == AE_ERR)
        return REDIS_ERR;

    return REDIS_OK;
}

/*
 * 事件触发 给套接字写内容的方法
 */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask)
{
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    size_t objmem;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    // 有内容就发
    while (c->bufpos > 0 || listLength(c->reply))
    {

        if (c->bufpos > 0)
        {
            // 写入内容到套接字
            nwritten = write(fd, c->buf + c->sentlen, c->bufpos - c->sentlen);
            if (nwritten <= 0)
                break;
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* 发光了就清空*/
            if (c->sentlen == c->bufpos)
            {
                c->bufpos = 0;
                c->sentlen = 0;
            }
        }
        else
        {

            // 发送链表的等待发送内容
            o = listNodeValue(listFirst(c->reply));
            objlen = sdslen(o->ptr);
            objmem = getStringObjectSdsUsedMemory(o);

            // 略过空对象
            if (objlen == 0)
            {
                listDelNode(c->reply, listFirst(c->reply));
                c->reply_bytes -= objmem;
                continue;
            }

            // 写入内容到套接字
            nwritten = write(fd, ((char *)o->ptr) + c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0)
                break;
            c->sentlen += nwritten;
            totwritten += nwritten;

            // 删除写完的节点
            if (c->sentlen == objlen)
            {
                listDelNode(c->reply, listFirst(c->reply));
                c->sentlen = 0;
                c->reply_bytes -= objmem;
            }
        }
        /* 
        // 达到一定写入长度就中断,等待下次在写
         */
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory))
            break;
    }

    // 写入出错检查
    if (nwritten == -1)
    {
        if (errno == EAGAIN)
        {
            nwritten = 0;
        }
        else
        {
            redisLog(REDIS_VERBOSE,
                     "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }

    if (totwritten > 0)
    {
        /* 检查发送超时的东西 */
        if (!(c->flags & REDIS_MASTER))
            c->lastinteraction = server.unixtime;
    }
    if (c->bufpos == 0 && listLength(c->reply) == 0)
    {
        c->sentlen = 0;

        // 删除 写事件
        aeDeleteFileEvent(server.el, c->fd, AE_WRITABLE);

        // 判断要不要关闭客户端
        if (c->flags & REDIS_CLOSE_AFTER_REPLY)
            freeClient(c);
    }
}

基于版本3.0.0版本,点击下载https://download.redis.io/releases/redis-3.0.0.tar.gz

本文地址,https://www.ccagml.com/?p=395

发表评论