Redis Data Persistence: How AOF Works, Part 1
2020-11-09 07:35:49


int loadAppendOnlyFile(char *filename) {
    struct redisClient *fakeClient;
    FILE *fp = fopen(filename,"r");
    struct redis_stat sb;
    int old_aof_state = server.aof_state;
    long loops = 0;

    // redis_fstat is just fstat: fileno(fp) yields the file descriptor and the file's
    // status is stored in sb (see stat(2)); st_size is the file size in bytes
    if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
        server.aof_current_size = 0;
        fclose(fp);
        return REDIS_ERR;
    }

    if (fp == NULL) { // failed to open the file
        redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
        exit(1);
    }

    /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
     * to the same file we're about to read. */
    server.aof_state = REDIS_AOF_OFF;

    fakeClient = createFakeClient(); // create a fake client
    startLoading(fp); // defined in rdb.c, updates the server's loading state

    while(1) {
        int argc, j;
        unsigned long len;
        robj **argv;
        char buf[128];
        sds argsds;
        struct redisCommand *cmd;

        /* Serve the clients from time to time */
        // handle external requests at intervals; ftello() returns the current file offset as a long
        if (!(loops++ % 1000)) {
            loadingProgress(ftello(fp)); // record how far into the AOF file we have read
            aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); // process pending events
        }
        // read the AOF data line by line
        if (fgets(buf,sizeof(buf),fp) == NULL) {
            if (feof(fp)) // reached end of file (EOF)
                break;
            else
                goto readerr;
        }
        // parse the commands in the AOF file according to the Redis protocol
        if (buf[0] != '*') goto fmterr;
        argc = atoi(buf+1); // number of arguments
        if (argc < 1) goto fmterr;

        argv = zmalloc(sizeof(robj*)*argc); // argument values
        for (j = 0; j < argc; j++) {
            if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
            if (buf[0] != '$') goto fmterr;
            len = strtol(buf+1,NULL,10); // length of each bulk string
            argsds = sdsnewlen(NULL,len); // allocate an empty sds of that length
            // read exactly len bytes of the bulk string
            if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
            argv[j] = createObject(REDIS_STRING,argsds);
            if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF (skip the trailing \r\n) */
        }

        /* Command lookup */
        cmd = lookupCommand(argv[0]->ptr);
        if (!cmd) {
            redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", (char*)argv[0]->ptr);
            exit(1);
        }
        /* Run the command in the context of a fake client */
        fakeClient->argc = argc;
        fakeClient->argv = argv;
        cmd->proc(fakeClient); // execute the command

        /* The fake client should not have a reply */
        redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
        /* The fake client should never get blocked */
        redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);

        /* Clean up. Command code may have changed argv/argc so we use the
         * argv/argc of the client instead of the local variables. */
        for (j = 0; j < fakeClient->argc; j++)
            decrRefCount(fakeClient->argv[j]);
        zfree(fakeClient->argv);
    }

    /* This point can only be reached when EOF is reached without errors.
     * If the client is in the middle of a MULTI/EXEC, log error and quit. */
    if (fakeClient->flags & REDIS_MULTI) goto readerr;

    fclose(fp);
    freeFakeClient(fakeClient);
    server.aof_state = old_aof_state;
    stopLoading();
    aofUpdateCurrentSize(); // update server.aof_current_size, the AOF file size
    server.aof_rewrite_base_size = server.aof_current_size;
    return REDIS_OK;

    /* ...... (readerr / fmterr error-handling labels omitted) ...... */
}
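To make the parsing loop above concrete, here is a hypothetical fragment of an AOF file. Commands are stored in the same RESP format a client would send over the wire, so a SELECT 0 followed by SET foo bar looks roughly like this (every line is terminated by \r\n):

*2
$6
SELECT
$1
0
*3
$3
SET
$3
foo
$3
bar

The "*" line carries the argument count and each "$" line carries the byte length of the bulk string that follows, which is exactly what the checks on buf[0] in the loop are looking for.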
The earlier post on AOF configuration parameters left one question open: how is server.aof_current_size initialized? Let's resolve that now.
void aofUpdateCurrentSize(void) {
    struct redis_stat sb;

    if (redis_fstat(server.aof_fd,&sb) == -1) {
        redisLog(REDIS_WARNING,"Unable to obtain the AOF file length. stat: %s",
            strerror(errno));
    } else {
        server.aof_current_size = sb.st_size;
    }
}
redis_fstat is the author's alias for the Linux fstat function, which retrieves a file's metadata (see the fstat man page for details); sb.st_size is the current size of the AOF file. This requires server.aof_fd, the AOF file descriptor, which is initialized in initServer():
/* Open the AOF file if needed. */
    if (server.aof_state == REDIS_AOF_ON) {
        server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
        if (server.aof_fd == -1) {
            redisLog(REDIS_WARNING, "Can't open the append-only file: %s",strerror(errno));
            exit(1);
        }
    }
With that, the work Redis Server does at startup to load the AOF data from disk is complete.

How new data in the server's database is persisted to disk

When a client executes SET or another command that modifies the database, the change must be appended to the AOF file in real time, and also flushed to disk according to the configured fsync policy, so that no data is lost.

The previous post introduced the three fsync policies: appendfsync always, appendfsync everysec, and appendfsync no. The chosen policy is stored in the server.aof_fsync field.
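For reference, the policy is selected in redis.conf; a minimal sketch of the relevant directives might look like this (the values are only an example):

appendonly yes                    # enable AOF persistence
appendfilename "appendonly.aof"   # name of the AOF file
appendfsync everysec              # always | everysec | no
no-appendfsync-on-rewrite no      # skip fsync while a rewrite/RDB child is running?

appendfsync maps to the server.aof_fsync field discussed here, and no-appendfsync-on-rewrite maps to server.aof_no_fsync_on_rewrite, which appears later in flushAppendOnlyFile().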

First, let's look at how Redis appends the data-modifying command to server.aof_buf when a client request changes the database.

The call chain is call() -> propagate() -> feedAppendOnlyFile(); call() determines whether executing the command actually modified any data.

feedAppendOnlyFile() first checks whether AOF is enabled on the server. If it is, the command that modified the data is re-encoded as a request string according to the Redis protocol (note the special handling of commands that set expirations), and the string is then appended to server.aof_buf. Pay attention to the last few lines of the function, because they are the key point: if server.aof_child_pid != -1, the server is currently rewriting the AOF file, and the modified data must also be appended to the server.aof_rewrite_buf_blocks list so that it can be appended to the new AOF file once the rewrite finishes. See the comments in the code below.

/* Propagate the specified command (in the context of the specified database id)
 * to AOF and Slaves.
 *
 * flags are an xor between:
 * + REDIS_PROPAGATE_NONE (no propagation of command at all)
 * + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
 * + REDIS_PROPAGATE_REPL (propagate into the replication link)
 */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    // append the data modified by this command to the AOF
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
// the command modified data: first write the update into server.aof_buf
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appendend. To issue a SELECT command is needed. */
    // the current db is not the selected AOF db: emit a SELECT command to switch databases
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    // translate EXPIRE / PEXPIRE / EXPIREAT into PEXPIREAT
    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } // translate SETEX / PSETEX into a SET plus a PEXPIREAT
    else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else { // all other commands are appended as-is
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    // append buf to the server's aof_buf; it is written to the AOF file in beforeSleep()
    // and fsynced to disk according to the configured policy
    if (server.aof_state == REDIS_AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    // if server.aof_child_pid is not -1, a child process is writing a snapshot to a temporary
    // file (a rewrite is in progress), so the updates produced by commands received during this
    // period must be buffered and appended to the AOF file once the child finishes, to make sure
    // no data is lost.
    // This is why aof_rewrite_buf_blocks is needed: while the server is rewriting, i.e. dumping
    // all the data in the databases, some keys may already have been written to the new AOF file
    // and then be modified again by a client command, which creates a difference to replay.
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
    /* A note on the difference between server.aof_buf and server.aof_rewrite_buf_blocks:
       aof_buf is the normal path: while the AOF file is open, its contents are continuously
       written to the AOF file.
       aof_rewrite_buf_blocks is used when an AOF rewrite is triggered, for example by
       "config set appendonly yes": Redis forks a background process that takes a snapshot of
       the data at that moment and writes it to a temporary file. Commands received in the
       meantime must be recorded; when the background process finishes writing the temporary
       AOF file, the serverCron timer notices the child has exited and calls
       backgroundRewriteDoneHandler, which in turn calls aofRewriteBufferWrite to append the
       data in aof_rewrite_buf_blocks (the diff) to the temporary AOF file, which then replaces
       the normal AOF file.
       So aof_buf is usually smaller than aof_rewrite_buf_blocks, although at the beginning
       aof_buf may contain some earlier data that the latter does not. */

    sdsfree(buf);
}
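To make the expire translation concrete: suppose a client issues SETEX foo 10 bar (a hypothetical key and value) when the current Unix time in milliseconds is T. What gets appended to the AOF is not the SETEX itself but roughly the equivalent of:

SET foo bar
PEXPIREAT foo T+10000

both encoded in the RESP format shown earlier. Recording an absolute expiry timestamp means the key still expires at the intended moment even if the AOF is replayed long after it was written.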
The server calls beforeSleep() once before each iteration of the event loop. Let's see what this function does.
/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    REDIS_NOTUSED(eventLoop);
    listNode *ln;
    redisClient *c;

    /* Run a fast expire cycle (the called function will return
     * ASAP if a fast cycle is not needed). */
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

    /* Try to process pending commands for clients that were just unblocked. */
    while (listLength(server.unblocked_clients)) {
        ln = listFirst(server.unblocked_clients);
        redisAssert(ln != NULL);
        c = ln->value;
        listDelNode(server.unblocked_clients,ln);
        c->flags &= ~REDIS_UNBLOCKED;

        /* Process remaining data in the input buffer. */
        // handle the requests this client sent while it was blocked
        if (c->querybuf && sdslen(c->querybuf) > 0) {
            server.current_client = c;
            processInputBuffer(c);
            server.current_client = NULL;
        }
    }

    /* Write the AOF buffer on disk */
    // append the contents of server.aof_buf to the AOF file and fsync to disk as needed
    flushAppendOnlyFile(0);
}
As the code and comments above show, beforeSleep() does three things: 1) runs a fast expire cycle; 2) processes requests that clients sent while they were blocked; 3) appends the contents of server.aof_buf to the AOF file and fsyncs it to disk. flushAppendOnlyFile() takes a force argument that indicates whether the write must happen immediately: 0 allows the write to be postponed, 1 forces it.
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;

    if (sdslen(server.aof_buf) == 0) return;

    // check whether there are fsync jobs still waiting in the background queue
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

    // with the everysec policy and force not set, postpone the flush if possible
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        // there is already an fsync job waiting in the queue
        if (sync_in_progress) {
            // no flush postponed yet: record when we started postponing, then return
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponinig, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                // postponing the flush for less than two seconds is still acceptable
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall trough, and go write since we can't wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    /* If you are following this code path, then we are going to write so
     * set reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */
    // write the AOF buffer to the file; with luck the write completes atomically
    nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    if (nwritten != (signed)sdslen(server.aof_buf)) { // the write failed or was short
        /* Ooops, we are in troubles. The best thing to do for now is
         * aborting instead of giving the illusion that everything is
         * working as expected. */
        if (nwritten == -1) {
            redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
        } else {
            redisLog(REDIS_WARNING,"Exiting on short write while writing to "
                                   "the append-only file: %s (nwritten=%ld, "
                                   "expected=%ld)",
                                   strerror(errno),
                                   (long)nwritten,
                                   (long)sdslen(server.aof_buf));

            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                redisLog(REDIS_WARNING, "Could not remove short write "
                            "from the append-only file. Redis may refuse "
                            "to load the AOF the next time it starts. "
                            "ftruncate: %s", strerror(errno));
            }
        }
        exit(1);
    }
    server.aof_current_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    // if the aof buffer is small enough, reuse it; otherwise free it and allocate a new one
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    // if no-appendfsync-on-rewrite is enabled and an AOF rewrite or RDB save child is
    // running, return here: the data has been written to the AOF file but not yet
    // flushed to disk
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;

    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) { // policy is always: fsync right here
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) aof_background_fsync(server.aof_fd); // fsync in a background thread
        server.aof_last_fsync = server.unixtime;
    }
}
Note the server.aof_fsync field in the code above, which selects the policy for fsyncing the AOF file to disk: with AOF_FSYNC_ALWAYS the fsync happens directly in the main process, while with AOF_FSYNC_EVERYSEC it is handed off to a background thread; the background-thread code lives in bio.c.
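For reference, aof_background_fsync() itself is tiny; in the Redis 2.8 sources (aof.c) it essentially just queues a job for the bio worker thread, roughly like this:

// queue an fsync of the given fd on the background I/O thread (see bio.c);
// the worker thread picks up the job and performs the fsync on the descriptor
void aof_background_fsync(int fd) {
    bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}

This is what keeps the everysec policy from blocking the main event loop on slow disks.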

Summary

At this point we have covered how Redis Server loads the AOF file at startup, how new data produced by client requests is appended to the AOF file, and how that newly written data is flushed to disk according to the configured fsync policy, either directly in the main process or in a background thread.

What remains of AOF persistence is the rewrite: how the AOF file is rewritten and how the server handles a client's BGREWRITEAOF request. That will be covered in the next post.

Many thanks to this post, which helped me enormously in understanding Redis AOF persistence: http://chenzhenianqing.cn/articles/786.html

My annotated Redis 2.8.2 source code is on GitHub for anyone who wants it, and I will keep updating it: https://github.com/xkeyideal/annotated-redis-2.8.2

I am not very good with Git yet; I would appreciate it if someone could show me the ropes.
