前面我们看完事件循环的主流程,这里我们开始看第一个命令set的在redis中的执行流程
// 一个set命令的执行流程
//src/networking.c
//在上文创建操作redis的客户端时使用readQueryFromClient方法接收处理连接新到达的命令
/*
* 处理连接请求内容
*/
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask)
{
processInputBuffer(c);
}
// 在readQueryFromClient通过该方法处理请求内容
void processInputBuffer(redisClient *c)
{
processMultibulkBuffer(c)
processCommand(c)
}
int processMultibulkBuffer(redisClient *c)
{
char *newline = NULL;
int pos = 0, ok;
long long ll;
// 读入命令的参数个数
// 比如执行命令是set aa 23333, 则这边的c->querybuf显示为
// *3\r\n3\r\nset\r\n2\r\naa\r\n5\r\n23333
// 客户端发过来的命令以 * 开头
fprintf(stderr, "set aa 23333 c->querybuf %s \n", c->querybuf);
if (c->multibulklen == 0)
{
/* Multi bulk length cannot be read without a \r\n */
newline = strchr(c->querybuf, '\r');
/* We know for sure there is a whole line since newline != NULL,
* so go ahead and find out the multi bulk length. */
redisAssertWithInfo(c, NULL, c->querybuf[0] == '*');
// 获得参数的个数,从*之后到第一个\r\n之前,存到ll中
ok = string2ll(c->querybuf + 1, newline - (c->querybuf + 1), &ll);
pos = (newline - c->querybuf) + 2;
// 设置参数数量
c->multibulklen = ll;
/* Setup argv array on client structure */
if (c->argv)
zfree(c->argv);
c->argv = zmalloc(sizeof(robj *) * c->multibulklen);
}
// 从 c->querybuf 中读入参数,并创建各个参数对象到 c->argv
while (c->multibulklen)
{
/* Read bulk length if unknown */
if (c->bulklen == -1)
{
// 确保 "\r\n" 存在
newline = strchr(c->querybuf + pos, '\r');
/* Buffer should also contain \n */
if (newline - (c->querybuf) > ((signed)sdslen(c->querybuf) - 2))
break;
// 检查刚好是
// *3\r\n3\r\nset\r\n2\r\naa\r\n5\r\n23333
if (c->querybuf[pos] != '')
{
addReplyErrorFormat(c,
"Protocol error: expected '', got '%c'",
c->querybuf[pos]);
setProtocolError(c, pos);
return REDIS_ERR;
}
// 读取长度
// 比如 *3\r\n3\r\nset\r\n2\r\naa\r\n5\r\n23333
ok = string2ll(c->querybuf + pos + 1, newline - (c->querybuf + pos + 1), &ll);
if (!ok || ll < 0 || ll > 512 * 1024 * 1024)
{
addReplyError(c, "Protocol error: invalid bulk length");
setProtocolError(c, pos);
return REDIS_ERR;
}
// 定位到参数的开头
// 比如
// *3\r\n3\r\nset\r\n2\r\naa\r\n5\r\n23333
// pos = 4 值为
// 增加后 pos = 8 值为s
// pos = 13 值为// 增加后 pos = 17 值为a
// pos = 22 值为
// 增加后 pos = 26 值为2
pos += newline - (c->querybuf + pos) + 2;
// 参数的长度
c->bulklen = ll;
}
/* Read bulk argument */
// 读入参数
if (sdslen(c->querybuf) - pos < (unsigned)(c->bulklen + 2))
/* Not enough data (+2 == trailing \r\n) */
break;
}
else
{
c->argv[c->argc++] =
createStringObject(c->querybuf + pos, c->bulklen);
pos += c->bulklen + 2;
// 还需要读取的参数个数
c->multibulklen--;
}
}
/* Trim to pos */
if (pos)
sdsrange(c->querybuf, pos, -1);
/* We're done when c->multibulk == 0 */
if (c->multibulklen == 0)
return REDIS_OK;
/* Still not read to process the command */
// 如果还有参数未读取完,那么就协议内容有错
return REDIS_ERR;
}
/* If this function gets called we already read a whole
* command, arguments are in the client argv/argc fields.
* processCommand() execute the command or prepare the
* server for a bulk read from the client.
*
* If 1 is returned the client is still alive and valid and
* other operations can be performed by the caller. Otherwise
* if 0 is returned the client was destroyed (i.e. after QUIT).
*
*/
int processCommand(redisClient *c)
{
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
// 根据字符串查找命令, 检查参数个数
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd)
{
// 没有命令
}
else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity))
{
//参数错误
}
// 执行命令
call(c, REDIS_CALL_FULL);
}
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags)
{
// 执行命令的函数.在前面的lookupCommand会给cmd设置值
// 在例子set aa 23333 中相当于执行 setCommand方法
c->cmd->proc(c);
}
/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
void setCommand(redisClient *c)
{
int j;
robj *expire = NULL;
int unit = UNIT_SECONDS;
int flags = REDIS_SET_NO_FLAGS;
// 设置选项参数
for (j = 3; j < c->argc; j++)
{
char *a = c->argv[j]->ptr;
robj *next = (j == c->argc - 1) ? NULL : c->argv[j + 1];
if ((a[0] == 'n' || a[0] == 'N') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0')
{
flags |= REDIS_SET_NX;
}
else if ((a[0] == 'x' || a[0] == 'X') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0')
{
flags |= REDIS_SET_XX;
}
else if ((a[0] == 'e' || a[0] == 'E') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && next)
{
unit = UNIT_SECONDS;
expire = next;
j++;
}
else if ((a[0] == 'p' || a[0] == 'P') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && next)
{
unit = UNIT_MILLISECONDS;
expire = next;
j++;
}
else
{
addReply(c, shared.syntaxerr);
return;
}
}
// 对参数编码
c->argv[2] = tryObjectEncoding(c->argv[2]);
setGenericCommand(c, flags, c->argv[1], c->argv[2], expire, unit, NULL, NULL);
}
void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply)
{
long long milliseconds = 0; /* initialized to avoid any harmness warning */
// 取出过期时间
if (expire)
{
// 取出 expire 参数的值
// T = O(N)
if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != REDIS_OK)
return;
// expire 参数的值不正确时报错
if (milliseconds <= 0)
{
addReplyError(c, "invalid expire time in SETEX");
return;
}
// 将秒转成毫秒
if (unit == UNIT_SECONDS)
milliseconds *= 1000;
}
// 如果设置了 NX 或者 XX 参数,那么检查条件是否不符合这两个设置
// 在条件不符合时报错,报错的内容由 abort_reply 参数决定
if ((flags & REDIS_SET_NX && lookupKeyWrite(c->db, key) != NULL) ||
(flags & REDIS_SET_XX && lookupKeyWrite(c->db, key) == NULL))
{
addReply(c, abort_reply ? abort_reply : shared.nullbulk);
return;
}
// 将键值关联到数据库
setKey(c->db, key, val);
server.dirty++;
// 为键设置过期时间
if (expire)
setExpire(c->db, key, mstime() + milliseconds);
// 发送通知
notifyKeyspaceEvent(REDIS_NOTIFY_STRING, "set", key, c->db->id);
// 发送通知
if (expire)
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,
"expire", key, c->db->id);
// 设置成功回复客户端
addReply(c, ok_reply ? ok_reply : shared.ok);
}
/* High level Set operation. This function can be used in order to set
* a key, whatever it was existing or not, to a new object.
*
* 1) The ref count of the value object is incremented.
*
* 2) clients WATCHing for the destination key notified.
*
* 3) The expire time of the key is reset (the key is made persistent).
*/
void setKey(redisDb *db, robj *key, robj *val)
{
// 添加或覆写数据库中的键值对
if (lookupKeyWrite(db, key) == NULL)
{
// 没有值就添加新值
dbAdd(db, key, val);
}
else
{
// 有值就覆盖
dbOverwrite(db, key, val);
}
incrRefCount(val);
// 移除键的过期时间
removeExpire(db, key);
// 发送键修改通知
signalModifiedKey(db, key);
}
/* Add the key to the DB. It's up to the caller to increment the reference
* counter of the value if needed.
* The program is aborted if the key already exists.
*
*/
void dbAdd(redisDb *db, robj *key, robj *val)
{
// 复制操作的键
sds copy = sdsdup(key->ptr);
// 尝试添加键值对
int retval = dictAdd(db->dict, copy, val);
// 如果键已经存在,那么停止
redisAssertWithInfo(NULL, key, retval == REDIS_OK);
// 如果开启了集群模式,那么将键保存到槽里面
if (server.cluster_enabled)
slotToKeyAdd(key);
}
/* Overwrite an existing key with a new value. Incrementing the reference
* count of the new value is up to the caller.
*
* This function does not modify the expire time of the existing key.
*
* The program is aborted if the key was not already present.
*/
void dbOverwrite(redisDb *db, robj *key, robj *val)
{
dictEntry *de = dictFind(db->dict, key->ptr);
// 节点必须存在,否则中止
redisAssertWithInfo(NULL, key, de != NULL);
// 覆写旧值
dictReplace(db->dict, key->ptr, val);
}
基于版本3.0.0版本,点击下载https://download.redis.io/releases/redis-3.0.0.tar.gz
本文地址,https://www.ccagml.com/?p=387