aof保存
aof是redis的一种持久化方式,可以根据配置自动触发保存,也可以手动执行保存
//在配置中
appendonly yes //打开aof保存
appendfilename "aof111.aof" // 设置aof保存的文件名
appendfsync always // 每次操作都aof保存
appendfsync everysec // 每秒aof保存
appendfsync no // 不主动执行fsync,由操作系统决定何时把数据刷到磁盘
1.手动保存,当客户端执行bgrewriteaof命令,redis服务端会执行bgrewriteaofCommand方法开始相关的保存逻辑
2.自动保存,当客户端传来命令后,redis在call方法执行c->cmd->proc命令后,会对命令进行记录,将命令存放到server.aof_buf中;之后在每次事件循环进入等待之前(beforeSleep),或者在服务器的周期性任务(serverCron,按hz配置周期执行,默认约每100毫秒一次)中,会根据配置决定是否触发写文件
//1.手动保存(bgrewriteaof命令)
/*
 * BGREWRITEAOF command handler.
 *
 * - If server.aof_child_pid is set, an AOF rewrite child already exists and
 *   the client gets an error reply.
 * - If server.rdb_child_pid is set, the rewrite is only flagged through
 *   server.aof_rewrite_scheduled and the client is told it was scheduled.
 * - Otherwise the rewrite is started immediately through
 *   rewriteAppendOnlyFileBackground(); a failure there yields shared.err.
 */
void bgrewriteaofCommand(redisClient *c)
{
    /* A rewrite child is already running: refuse. */
    if (server.aof_child_pid != -1) {
        addReplyError(c, "Background append only file rewriting already in progress");
        return;
    }

    /* An RDB child is running: remember the request instead of forking now. */
    if (server.rdb_child_pid != -1) {
        server.aof_rewrite_scheduled = 1;
        addReplyStatus(c, "Background append only file rewriting scheduled");
        return;
    }

    /* No child at all: try to start the rewrite right away. */
    if (rewriteAppendOnlyFileBackground() != REDIS_OK) {
        addReply(c, shared.err);
        return;
    }

    addReplyStatus(c, "Background append only file rewriting started");
}
/*
 * Start an AOF rewrite in a forked child process.
 *
 * Returns REDIS_ERR when a rewrite child already exists
 * (server.aof_child_pid != -1) or when fork() fails; returns REDIS_OK in the
 * parent once the child has been started. The child never returns normally:
 * it leaves through exitFromChild() with status 0 on success and 1 on failure.
 */
int rewriteAppendOnlyFileBackground(void)
{
    pid_t pid;
    long long fork_started;

    /* Only one rewrite child at a time. */
    if (server.aof_child_pid != -1)
        return REDIS_ERR;

    /* Timestamp taken before fork() so its cost can be measured. */
    fork_started = ustime();
    pid = fork();

    if (pid == 0) {
        /* ---- Child ---- */
        char tmpfile[256];
        int exit_status = 1; /* assume failure until the rewrite succeeds */

        closeListeningSockets(0);
        redisSetProcTitle("redis-aof-rewrite");

        /* The temp file name embeds the child's pid. */
        snprintf(tmpfile, 256, "temp-rewriteaof-bg-%d.aof", (int)getpid());
        if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
            size_t private_dirty = zmalloc_get_private_dirty();

            if (private_dirty) {
                redisLog(REDIS_NOTICE,
                         "AOF rewrite: %zu MB of memory used by copy-on-write",
                         private_dirty / (1024 * 1024));
            }
            exit_status = 0;
        }
        exitFromChild(exit_status);
        return REDIS_OK; /* unreached */
    }

    /* ---- Parent ---- */
    server.stat_fork_time = ustime() - fork_started;

    if (pid == -1) {
        redisLog(REDIS_WARNING,
                 "Can't rewrite append only file in background: fork: %s",
                 strerror(errno));
        return REDIS_ERR;
    }

    redisLog(REDIS_NOTICE,
             "Background append only file rewriting started by pid %d", pid);

    /* Record the state of the rewrite that is now in progress. */
    server.aof_rewrite_scheduled = 0;
    server.aof_rewrite_time_start = time(NULL);
    server.aof_child_pid = pid;

    /* Re-evaluate the dict resize policy now that a child exists
     * (helper defined elsewhere). */
    updateDictResizePolicy();

    /* Setting aof_selected_db to -1 forces the next feedAppendOnlyFile() call
     * to emit a SELECT first, so the differences the parent accumulates in
     * server.aof_rewrite_buf start with a SELECT statement and can be merged
     * safely. */
    server.aof_selected_db = -1;
    replicationScriptCacheFlush();
    return REDIS_OK;
}
//具体重写逻辑
int rewriteAppendOnlyFile(char *filename)
{
dictIterator *di = NULL;
dictEntry *de;
rio aof;
FILE *fp;
char tmpfile[256];
int j;
long long now = mstime();
/* Note that we have to use a different temp name here compared to the
* one used by rewriteAppendOnlyFileBackground() function.
*
* 创建临时文件
*
* 注意这里创建的文件名和 rewriteAppendOnlyFileBackground() 创建的文件名稍有不同
*/
snprintf(tmpfile, 256, "temp-rewriteaof-%d.aof", (int)getpid());
fp = fopen(tmpfile, "w");
if (!fp)
{
redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
return REDIS_ERR;
}
// 初始化文件 io
rioInitWithFile(&aof, fp);
// 设置每写入 REDIS_AOF_AUTOSYNC_BYTES 字节
// 就执行一次 FSYNC
// 防止缓存中积累太多命令内容,造成 I/O 阻塞时间过长
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof, REDIS_AOF_AUTOSYNC_BYTES);
// 遍历所有数据库
for (j = 0; j < server.dbnum; j++)
{
char selectcmd[] = "*2\r\n6\r\nSELECT\r\n";
redisDb *db = server.db + j;
// 指向键空间
dict *d = db->dict;
if (dictSize(d) == 0)
continue;
// 创建键空间迭代器
di = dictGetSafeIterator(d);
if (!di)
{
fclose(fp);
return REDIS_ERR;
}
/* SELECT the new DB *
* 首先写入 SELECT 命令,确保之后的数据会被插入到正确的数据库上
*/
if (rioWrite(&aof, selectcmd, sizeof(selectcmd) - 1) == 0)
goto werr;
if (rioWriteBulkLongLong(&aof, j) == 0)
goto werr;
/* Iterate this DB writing every entry *
* 遍历数据库所有键,并通过命令将它们的当前状态(值)记录到新 AOF 文件中
*/
while ((de = dictNext(di)) != NULL)
{
sds keystr;
robj key, *o;
long long expiretime;
// 取出键
keystr = dictGetKey(de);
// 取出值
o = dictGetVal(de);
initStaticStringObject(key, keystr);
// 取出过期时间
expiretime = getExpire(db, &key);
/* If this key is already expired skip it *
* 如果键已经过期,那么跳过它,不保存
*/
if (expiretime != -1 && expiretime3\r\nSET\r\n";
if (rioWrite(&aof, cmd, sizeof(cmd) - 1) == 0)
goto werr;
/* Key and value */
if (rioWriteBulkObject(&aof, &key) == 0)
goto werr;
if (rioWriteBulkObject(&aof, o) == 0)
goto werr;
}
else if (o->type == REDIS_LIST)
{
if (rewriteListObject(&aof, &key, o) == 0)
goto werr;
}
else if (o->type == REDIS_SET)
{
if (rewriteSetObject(&aof, &key, o) == 0)
goto werr;
}
else if (o->type == REDIS_ZSET)
{
if (rewriteSortedSetObject(&aof, &key, o) == 0)
goto werr;
}
else if (o->type == REDIS_HASH)
{
if (rewriteHashObject(&aof, &key, o) == 0)
goto werr;
}
else
{
redisPanic("Unknown object type");
}
/* Save the expire time
*
* 保存键的过期时间
*/
if (expiretime != -1)
{
char cmd[] = "*3\r\n9\r\nPEXPIREAT\r\n";
// 写入 PEXPIREAT expiretime 命令
if (rioWrite(&aof, cmd, sizeof(cmd) - 1) == 0)
goto werr;
if (rioWriteBulkObject(&aof, &key) == 0)
goto werr;
if (rioWriteBulkLongLong(&aof, expiretime) == 0)
goto werr;
}
}
// 释放迭代器
dictReleaseIterator(di);
}
/* Make sure data will not remain on the OS's output buffers */
// 冲洗并关闭新 AOF 文件
if (fflush(fp) == EOF)
goto werr;
if (aof_fsync(fileno(fp)) == -1)
goto werr;
if (fclose(fp) == EOF)
goto werr;
/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok. *
* 原子地改名,用重写后的新 AOF 文件覆盖旧 AOF 文件
*/
if (rename(tmpfile, filename) == -1)
{
redisLog(REDIS_WARNING, "Error moving temp append only file on the final destination: %s", strerror(errno));
unlink(tmpfile);
return REDIS_ERR;
}
redisLog(REDIS_NOTICE, "SYNC append only file rewrite performed");
return REDIS_OK;
werr:
fclose(fp);
unlink(tmpfile);
redisLog(REDIS_WARNING, "Write error writing append only file on disk: %s", strerror(errno));
if (di)
dictReleaseIterator(di);
return REDIS_ERR;
}
// 2.自动开启的aof
//redis.c
/*
 * Execute the command attached to client `c` and, when REDIS_CALL_PROPAGATE
 * is set in `flags`, hand it to propagate() for the AOF and/or replication.
 *
 * Propagation targets are chosen as follows:
 *   - REDIS_FORCE_REPL on the client forces replication propagation;
 *   - REDIS_FORCE_AOF on the client forces AOF propagation;
 *   - a command that changed the dataset is propagated to both.
 *
 * NOTE(review): this is an abridged excerpt. The `dirty` delta is computed
 * from server.dirty as in upstream Redis 3.0 -- confirm against the full file.
 */
void call(redisClient *c, int flags)
{
    long long dirty;

    /* Snapshot the change counter, run the command, then take the delta. */
    dirty = server.dirty;
    c->cmd->proc(c);
    dirty = server.dirty - dirty;
    /* Guard against a negative delta in case the command lowered the counter. */
    if (dirty < 0)
        dirty = 0;

    /* Propagate the command into the AOF and replication link. */
    if (flags & REDIS_CALL_PROPAGATE)
    {
        /* Separate name so the `flags` parameter is not shadowed. */
        int propagate_flags = REDIS_PROPAGATE_NONE;

        if (c->flags & REDIS_FORCE_REPL)
            propagate_flags |= REDIS_PROPAGATE_REPL;
        if (c->flags & REDIS_FORCE_AOF)
            propagate_flags |= REDIS_PROPAGATE_AOF;
        /* The dataset was modified: propagate to both targets. */
        if (dirty)
            propagate_flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);

        if (propagate_flags != REDIS_PROPAGATE_NONE)
            propagate(c->cmd, c->db->id, c->argv, c->argc, propagate_flags);
    }
}
/*
 * Forward a command to the targets selected in `flags`.
 *
 * REDIS_PROPAGATE_AOF feeds the command to feedAppendOnlyFile(), but only
 * while server.aof_state is not REDIS_AOF_OFF. REDIS_PROPAGATE_REPL feeds it
 * to replicationFeedSlaves() for server.slaves.
 */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    int aof_active = (server.aof_state != REDIS_AOF_OFF);
    int to_aof = (flags & REDIS_PROPAGATE_AOF) != 0;
    int to_slaves = (flags & REDIS_PROPAGATE_REPL) != 0;

    /* AOF first ... */
    if (aof_active && to_aof) {
        feedAppendOnlyFile(cmd, dbid, argv, argc);
    }

    /* ... then the slaves. */
    if (to_slaves) {
        replicationFeedSlaves(server.slaves, dbid, argv, argc);
    }
}
/*
 * Serialize a command in AOF format and append it to server.aof_buf (when the
 * AOF is on) and to the rewrite buffer (when a rewrite child exists).
 *
 * cmd    - command table entry, used to detect commands needing translation
 * dictid - id of the database the command targets
 * argv   - command argument vector
 * argc   - number of entries in argv
 *
 * A SELECT is emitted first whenever `dictid` differs from
 * server.aof_selected_db. EXPIRE/PEXPIRE/EXPIREAT are translated into
 * PEXPIREAT, and SETEX/PSETEX into SET + PEXPIREAT.
 */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc)
{
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appended. To issue a SELECT command is needed. */
    if (dictid != server.aof_selected_db)
    {
        char seldb[64];

        snprintf(seldb, sizeof(seldb), "%d", dictid);
        /* RESP: 2-element array, "$6" announces the 6-byte bulk string
         * "SELECT", followed by the db number as a bulk string. */
        buf = sdscatprintf(buf, "*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
                           (unsigned long)strlen(seldb), seldb);
        server.aof_selected_db = dictid;
    }

    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand)
    {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT. */
        buf = catAppendOnlyExpireAtCommand(buf, cmd, argv[1], argv[2]);
    }
    else if (cmd->proc == setexCommand || cmd->proc == psetexCommand)
    {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT. */

        /* SET key value: argv[1] is the key, argv[3] the value. */
        tmpargv[0] = createStringObject("SET", 3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf, 3, tmpargv);
        /* Only the temporary "SET" object is owned here. */
        decrRefCount(tmpargv[0]);

        /* PEXPIREAT key <time derived from argv[2]>. */
        buf = catAppendOnlyExpireAtCommand(buf, cmd, argv[1], argv[2]);
    }
    else
    {
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf, argc, argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before
     * re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    if (server.aof_state == REDIS_AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf));

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char *)buf, sdslen(buf));

    sdsfree(buf);
}
/*
 * Write the pending content of server.aof_buf to the AOF file descriptor and
 * fsync it according to server.aof_fsync.
 *
 * force - when non-zero, the write is performed even if the policy is
 *         "everysec" and a background fsync is still in progress.
 *
 * With the "everysec" policy and force == 0, the write is postponed (for up
 * to two seconds) while a background fsync is pending. On a write error the
 * process exits when the policy is "always"; otherwise the unwritten data is
 * kept in the buffer, server.aof_last_write_status is set to REDIS_ERR and
 * the function returns so that a later call can retry.
 */
void flushAppendOnlyFile(int force)
{
    ssize_t nwritten;
    int sync_in_progress = 0;
    int write_errno;

    /* Nothing buffered: nothing to do. */
    if (sdslen(server.aof_buf) == 0)
        return;

    /* With "everysec", check whether a background fsync job is pending. */
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force)
    {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress)
        {
            if (server.aof_flush_postponed_start == 0)
            {
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            }
            else if (server.unixtime - server.aof_flush_postponed_start < 2)
            {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall through, and go write since we can't wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            redisLog(REDIS_NOTICE, "Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }

    /* If you are following this code path, then we are going to write so
     * reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike. */
    nwritten = write(server.aof_fd, server.aof_buf, sdslen(server.aof_buf));
    /* Capture errno immediately: later library calls may overwrite it. It is
     * only consulted on the nwritten == -1 path. */
    write_errno = errno;

    if (nwritten != (signed)sdslen(server.aof_buf))
    {
        static time_t last_write_error_log = 0;
        int can_log = 0;

        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE)
        {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

        /* Log the AOF write error and record the error code. */
        if (nwritten == -1)
        {
            if (can_log)
            {
                redisLog(REDIS_WARNING, "Error writing to the AOF file: %s",
                         strerror(write_errno));
            }
            /* Record the error code even when logging is rate limited. */
            server.aof_last_write_errno = write_errno;
        }
        else
        {
            if (can_log)
            {
                redisLog(REDIS_WARNING, "Short write while writing to "
                                        "the AOF file: (nwritten=%lld, "
                                        "expected=%lld)",
                         (long long)nwritten,
                         (long long)sdslen(server.aof_buf));
            }

            /* Try to remove the partial data that was just appended. */
            if (ftruncate(server.aof_fd, server.aof_current_size) == -1)
            {
                if (can_log)
                {
                    redisLog(REDIS_WARNING, "Could not remove short write "
                                            "from the append-only file. Redis may refuse "
                                            "to load the AOF the next time it starts. "
                                            "ftruncate: %s",
                             strerror(errno));
                }
            }
            else
            {
                /* If the ftruncate() succeeded we can set nwritten to
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

        /* Handle the AOF write error. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS)
        {
            /* We can't recover when the fsync policy is ALWAYS since the
             * reply for the client is already in the output buffers, and we
             * have the contract with the user that on acknowledged write data
             * is synched on disk. */
            redisLog(REDIS_WARNING, "Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        }
        else
        {
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
            server.aof_last_write_status = REDIS_ERR;

            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0)
            {
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf, nwritten, -1);
            }
            return; /* We'll try again on the next call... */
        }
    }
    else
    {
        /* Successful write(2). If AOF was in error state, restore the
         * OK state and log the event. */
        if (server.aof_last_write_status == REDIS_ERR)
        {
            redisLog(REDIS_WARNING,
                     "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = REDIS_OK;
        }
    }

    /* Account for the bytes just written. */
    server.aof_current_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    if ((sdslen(server.aof_buf) + sdsavail(server.aof_buf)) < 4000)
    {
        sdsclear(server.aof_buf);
    }
    else
    {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
        return;

    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS)
    {
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        server.aof_last_fsync = server.unixtime;
    }
    else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
              server.unixtime > server.aof_last_fsync))
    {
        /* "everysec" policy and the clock moved past the last fsync second:
         * queue a background fsync unless one is already pending. */
        if (!sync_in_progress)
            aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }
}
/*
 * Event-loop hook: flushes the AOF buffer to the AOF file without forcing
 * the write (force == 0). `eventLoop` is not used in this excerpt.
 */
void beforeSleep(struct aeEventLoop *eventLoop)
{
    /* Write the AOF buffer on disk; 0 means "do not force". */
    const int force = 0;

    flushAppendOnlyFile(force);
}
/*
 * Periodic server task (AOF-related excerpt).
 *
 * Retries a flush that was previously postponed, and once per second retries
 * a flush that failed with a write error.
 *
 * NOTE(review): the returned value mirrors upstream Redis 3.0, where the
 * timer event's return value is the number of milliseconds until the next
 * invocation -- confirm against the full file.
 */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)
{
    /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    if (server.aof_flush_postponed_start)
        flushAppendOnlyFile(0);

    /* AOF write errors: in this case we have a buffer to flush as well and
     * clear the AOF error in case of success to make the DB writable again,
     * however to try every second is enough in case of 'hz' is set to
     * an higher frequency. */
    run_with_period(1000)
    {
        if (server.aof_last_write_status == REDIS_ERR)
            flushAppendOnlyFile(0);
    }

    /* A non-void function must return a value; the original excerpt fell off
     * the end. */
    return 1000 / server.hz;
}
基于3.0.0版本,点击下载 https://download.redis.io/releases/redis-3.0.0.tar.gz
本文地址,https://www.ccagml.com/?p=437