- 在《redis源码从main开始1》中,initServerConfig 方法完成了配置的初始化;在这之后,initServer 方法根据这些配置创建 redis 运行所需的各种数据结构,并完成创建监听端口等相关操作。
/*
 * initServer - build the runtime state of the server from the values already
 * stored in the global `server` structure.
 *
 * In order, the function: sets up signal handling, optionally opens syslog,
 * creates the client/slave/monitor bookkeeping lists, creates the event loop
 * and the database array, opens the TCP and Unix listening sockets,
 * initializes every database and the Pub/Sub structures, resets the
 * persistence and statistics fields, registers the serverCron() timer and the
 * accept handlers, opens the AOF file when AOF is on, applies the default
 * memory limit for 32 bit builds, and finally initializes the cluster (when
 * enabled), the replication script cache, scripting, the slow log and BIO.
 *
 * Takes no arguments and returns nothing. Failures that cannot be recovered
 * from are handled by calling exit(1) or redisPanic().
 */
void initServer()
{
int j;
/* Ignore SIGHUP and SIGPIPE, then install the remaining signal handlers. */
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
setupSignalHandlers();
/* Set up syslog when it is enabled in the configuration. */
if (server.syslog_enabled)
{
openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
server.syslog_facility);
}
/* Initialize and create the core data structures. */
server.current_client = NULL;
server.clients = listCreate(); /* List holding all client state structures. */
server.clients_to_close = listCreate(); /* List of clients waiting to be closed. */
server.slaves = listCreate(); /* List of all slave servers. */
server.monitors = listCreate(); /* List of all monitors. */
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate(); /* List of clients to be unblocked (NOTE(review): consumer not visible here). */
/* List of ready keys; presumably filled when a key that has clients blocked
 * waiting for list pushes receives data -- TODO confirm against the writer. */
server.ready_keys = listCreate();
server.clients_waiting_acks = listCreate(); /* Clients waiting for acknowledgements (presumably from slaves -- confirm). */
server.get_ack_from_slaves = 0; /* If true we send REPLCONF GETACK. */
server.clients_paused = 0; /* True if clients are currently paused */
/* Create the shared objects, which are reused to reduce memory fragmentation
 * and the cost of common operations. */
createSharedObjects();
adjustOpenFilesLimit(); /* Check that the open-files limit is sufficient. */
server.el = aeCreateEventLoop(server.maxclients + REDIS_EVENTLOOP_FDSET_INCR); /* Create the event loop. */
/* One redisDb slot per configured database; the slots are filled in by the
 * loop further below. */
server.db = zmalloc(sizeof(redisDb) * server.dbnum);
/* Open the TCP listening socket for the user commands. */
/* A port of 0 skips TCP listening entirely. */
if (server.port != 0 &&
listenToPort(server.port, server.ipfd, &server.ipfd_count) == REDIS_ERR)
exit(1);
/* Open the listening Unix domain socket. */
if (server.unixsocket != NULL)
{
unlink(server.unixsocket); /* don't care if this fails */
server.sofd = anetUnixServer(server.neterr, server.unixsocket,
server.unixsocketperm, server.tcp_backlog);
if (server.sofd == ANET_ERR)
{
redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
exit(1);
}
anetNonBlock(NULL, server.sofd);
}
/* Abort if there are no listening sockets at all. */
/* NOTE(review): assumes server.sofd was set to a negative value before this
 * function runs when no Unix socket is configured -- confirm. */
if (server.ipfd_count == 0 && server.sofd < 0)
{
redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting.");
exit(1);
}
/* Create the Redis databases, and initialize other internal state. */
for (j = 0; j < server.dbnum; j++)
{
server.db[j].dict = dictCreate(&dbDictType, NULL);
server.db[j].expires = dictCreate(&keyptrDictType, NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType, NULL);
server.db[j].ready_keys = dictCreate(&setDictType, NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType, NULL);
server.db[j].eviction_pool = evictionPoolAlloc();
server.db[j].id = j;
server.db[j].avg_ttl = 0;
}
/* Create the Pub/Sub related structures. */
server.pubsub_channels = dictCreate(&keylistDictType, NULL);
server.pubsub_patterns = listCreate();
listSetFreeMethod(server.pubsub_patterns, freePubsubPattern);
listSetMatchMethod(server.pubsub_patterns, listMatchPubsubPattern);
server.cronloops = 0; /* Number of times the cron function has run. */
server.rdb_child_pid = -1; /* PID of RDB saving child */
server.aof_child_pid = -1; /* PID of the rewriting process */
aofRewriteBufferReset();
server.aof_buf = sdsempty();
server.lastsave = time(NULL); /* At startup we consider the DB saved. */
server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */
server.rdb_save_time_last = -1;
server.rdb_save_time_start = -1;
server.dirty = 0;
resetServerStats();
/* A few stats we don't want to reset: server startup time, and peak mem. */
server.stat_starttime = time(NULL); /* Server start time */
server.stat_peak_memory = 0; /* Max used memory record */
server.resident_set_size = 0; /* RSS sampled in serverCron(). */
server.lastbgsave_status = REDIS_OK;
server.aof_last_write_status = REDIS_OK;
server.aof_last_write_errno = 0;
server.repl_good_slaves_count = 0;
updateCachedTime();
/* Create the serverCron() time event, that's our main way to process
 * background operations. */
/* NOTE(review): the exit(1) below is reached only if redisPanic() returns. */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR)
{
redisPanic("Can't create the serverCron time event.");
exit(1);
}
/* Create an event handler for accepting new connections in TCP and Unix
 * domain sockets. */
/* Every TCP listening descriptor gets acceptTcpHandler as its readable-event
 * callback, which accepts and answers client connect() calls. */
for (j = 0; j < server.ipfd_count; j++)
{
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler, NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
/* Register the accept handler for the Unix domain socket. */
/* NOTE(review): the test is "> 0" while the earlier "no listening sockets"
 * check uses "< 0", so a descriptor equal to 0 would pass that check yet get
 * no accept handler here. */
if (server.sofd > 0 && aeCreateFileEvent(server.el, server.sofd, AE_READABLE,
acceptUnixHandler, NULL) == AE_ERR)
redisPanic("Unrecoverable error creating server.sofd file event.");
/* Open the AOF file if needed. */
/* When AOF persistence is on, open the AOF file for appending, creating it
 * with mode 0644 if it does not exist. */
if (server.aof_state == REDIS_AOF_ON)
{
server.aof_fd = open(server.aof_filename,
O_WRONLY | O_APPEND | O_CREAT, 0644);
if (server.aof_fd == -1)
{
redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
strerror(errno));
exit(1);
}
}
/* 32 bit instances are limited to 4GB of address space, so if there is
 * no explicit limit in the user provided configuration we set a limit
 * at 3 GB using maxmemory with 'noeviction' policy. This avoids
 * useless crashes of the Redis instance for out of memory. */
if (server.arch_bits == 32 && server.maxmemory == 0)
{
redisLog(REDIS_WARNING, "Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
server.maxmemory = 3072LL * (1024 * 1024); /* 3 GB */
server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
}
/* Initialize the cluster when the server runs in cluster mode. */
if (server.cluster_enabled)
clusterInit();
/* Initialize the script cache used by replication. */
replicationScriptCacheInit();
/* Initialize the scripting system. */
scriptingInit();
/* Initialize the slow log. */
slowlogInit();
/* Initialize the BIO system. */
bioInit();
}
/*
 * listenToPort - open the TCP listening sockets for `port`.
 *
 * port   TCP port handed to the anet server helpers.
 * fds    output array; every attempt stores its result at fds[*count].
 * count  in/out number of descriptors held in `fds`; incremented once for
 *        each socket opened here.
 *
 * When no bind address is configured (server.bindaddr_count == 0) the
 * wildcard address is tried first on IPv6 and then on IPv4; the loop stops
 * as soon as *count is non-zero after both attempts. When addresses are
 * configured, each of them must bind: the IPv6 helper is used if the address
 * text contains ':', the IPv4 helper otherwise.
 *
 * Every opened descriptor is passed to anetNonBlock().
 *
 * Returns REDIS_OK on success. On failure a warning containing server.neterr
 * is logged and REDIS_ERR is returned; descriptors opened earlier during the
 * same call stay in `fds` and are not closed here.
 */
int listenToPort(int port, int *fds, int *count)
{
    int idx;

    /* A NULL first entry makes the loop below run exactly once in wildcard
     * mode when the configuration lists no bind address. */
    if (server.bindaddr_count == 0)
    {
        server.bindaddr[0] = NULL;
    }

    for (idx = 0; idx == 0 || idx < server.bindaddr_count; idx++)
    {
        char *addr = server.bindaddr[idx];

        if (addr == NULL)
        {
            int fd;

            /* Wildcard bind, IPv6 first. A failure here is tolerated. */
            fd = anetTcp6Server(server.neterr, port, NULL, server.tcp_backlog);
            fds[*count] = fd;
            if (fd != ANET_ERR)
            {
                anetNonBlock(NULL, fd);
                (*count)++;
            }

            /* Wildcard bind, IPv4. */
            fd = anetTcpServer(server.neterr, port, NULL, server.tcp_backlog);
            fds[*count] = fd;
            if (fd != ANET_ERR)
            {
                anetNonBlock(NULL, fd);
                (*count)++;
            }

            /* At least one descriptor is available: done. Otherwise
             * fds[*count] still holds ANET_ERR from the IPv4 attempt and the
             * error path below reports it. */
            if (*count != 0)
            {
                break;
            }
        }
        else
        {
            /* Explicit address: a ':' in the text selects the IPv6 helper. */
            fds[*count] = (strchr(addr, ':') != NULL)
                              ? anetTcp6Server(server.neterr, port, addr,
                                               server.tcp_backlog)
                              : anetTcpServer(server.neterr, port, addr,
                                              server.tcp_backlog);
        }

        if (fds[*count] == ANET_ERR)
        {
            redisLog(REDIS_WARNING,
                     "Creating Server TCP listening socket %s:%d: %s",
                     (addr != NULL) ? addr : "*",
                     port, server.neterr);
            return REDIS_ERR;
        }

        anetNonBlock(NULL, fds[*count]);
        (*count)++;
    }

    return REDIS_OK;
}
/*
 * _anetTcpServer - create a listening TCP socket for the given address
 * family and return its descriptor.
 *
 * err       buffer handed to anetSetError() and the anet helpers for the
 *           error text.
 * port      TCP port, formatted in decimal as the getaddrinfo() service.
 * bindaddr  address to bind, or NULL (AI_PASSIVE is set, which only has an
 *           effect when bindaddr is NULL).
 * af        address family placed in the lookup hints; AF_INET6 additionally
 *           triggers anetV6Only() on the socket.
 * backlog   forwarded to anetListen().
 *
 * Only the first lookup result for which socket() succeeds is used: if one of
 * the setup steps fails on it, the function fails without trying the
 * remaining results.
 *
 * Returns the listening descriptor, or ANET_ERR on failure.
 *
 * NOTE(review): this function does not close the descriptor itself when a
 * setup step fails; whether the helpers do so is not visible here -- confirm
 * before relying on it.
 */
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
    char service[6]; /* strlen("65535") plus the terminating NUL */
    struct addrinfo criteria;
    struct addrinfo *results;
    struct addrinfo *cand;
    int listening = ANET_ERR;
    int gai_rc;

    snprintf(service, 6, "%d", port);

    memset(&criteria, 0, sizeof(criteria));
    criteria.ai_family = af;
    criteria.ai_socktype = SOCK_STREAM;
    criteria.ai_flags = AI_PASSIVE; /* No effect if bindaddr != NULL */

    gai_rc = getaddrinfo(bindaddr, service, &criteria, &results);
    if (gai_rc != 0)
    {
        anetSetError(err, "%s", gai_strerror(gai_rc));
        return ANET_ERR;
    }

    for (cand = results; cand != NULL; cand = cand->ai_next)
    {
        int fd = socket(cand->ai_family, cand->ai_socktype, cand->ai_protocol);

        if (fd == -1)
        {
            continue; /* try the next lookup result */
        }

        /* From here on the outcome is decided by this candidate alone: every
         * path leaves the loop, and only full success records `fd`. */
        if (af == AF_INET6 && anetV6Only(err, fd) == ANET_ERR)
        {
            break;
        }
        if (anetSetReuseAddr(err, fd) == ANET_ERR)
        {
            break;
        }
        if (anetListen(err, fd, cand->ai_addr, cand->ai_addrlen, backlog) == ANET_ERR)
        {
            break;
        }
        listening = fd;
        break;
    }

    /* cand is NULL only when the list was exhausted without socket() ever
     * succeeding. */
    if (cand == NULL)
    {
        anetSetError(err, "unable to bind socket");
    }

    freeaddrinfo(results);
    return listening;
}
/*
 * anetTcpServer - IPv4 entry point for creating a listening TCP socket.
 *
 * Forwards err, port, bindaddr and backlog unchanged to _anetTcpServer() with
 * the address family fixed to AF_INET, and returns that function's result.
 */
int anetTcpServer(char *err, int port, char *bindaddr, int backlog)
{
    const int family = AF_INET;

    return _anetTcpServer(err, port, bindaddr, family, backlog);
}
基于 redis 3.0.0 版本,下载地址:https://download.redis.io/releases/redis-3.0.0.tar.gz
本文地址:https://www.ccagml.com/?p=363