上一篇对RDB的源码分析是比较多的,但是AOF持久化执行进行了一些理论上的分析和概念的说明。本来想自己偷一些懒,将上篇文章中最后所给链接的AOF实现代码随便过一过算了,后来也就是在过的过程中发现自己这也看不懂那也看不懂才知道AOF的重要性和难度。

后来又花了不少时间查阅资料、结合源代码分析,对AOF的大概执行过程有了更深一些的了解,现在就将自己的理解和大家进行分享。其中肯定有理解不正确的地方,还望大神们能给予指正。

AOF相关配置项

首先我们看一下redis.conf里的关于AOF的配置选项:
Appendonly(yes,no)——是否开启AOF持久化
Appendfilename(log/appendonly.aof)——AOF日志文件
Appendfsync(always,everysec,no)——AOF日志文件同步的频率,always代表每次写都进行fsync,everysec每秒钟一次,no不主动fsync,由OS自己来完成。
no-appendfsync-on-rewrite(yes,no)——进行rewrite时,是否需要fsync
auto-aof-rewrite-percentage(100)——当AOF文件增长了这个比例(这里是增加了一倍),则后台rewrite自动运行
auto-aof-rewrite-min-size(64mb)——进行后面rewrite要求的最小AOF文件大小。这两个选项共同决定了后面rewrite进程是否到达运行的时机

通过上面的选项我们可以知道redis有三个AOF处理流程:

  • 每次更新操作进行的AOF写操作(涉及同步频率);
  • Rewrite,当满足auto-aof-rewrite-percentage,auto-aof-rewrite-min-size时后面自动运行rewrite操作;
  • Rewrite,当收到bgrewriteaof客户端命令时,马上运行后面rewrite操作。

注:当某个key过期的时候也会写AOF,其实它跟第一种很类似,也就是DEL操作。

在redis的较新版本中(不知道从哪个版本开始)增加了两个新的子进程:

  • REDIS_BIO_CLOSE_FILE,负责所有的close file操作
  • REDIS_BIO_AOF_FSYNC,负责fsync操作

因为这两个操作都可能会引起阻塞,如果在主线程中完成的话,会影响系统对事件的响应,所以这里统一由相应的子线程来完成,每个子线程都有一个自己的bio_jobs list,用来保存需要的处理的job任务。其相应的代码在bio.c(线程处理函数为bioProcessBackgroundJobs)里,这两个线程在initServer时创建bioInit()。

void initServer() {
//...
// 初始化 BIO 系统
    bioInit();
}

 

AOF的处理流程

  1.每次更新操作进行的AOF写操作(涉及同步频率)

主要涉及的配置是:Appendfsync(AOF日志文件同步的频率),no-appendfsync-on-rewrite(进行rewrite时,是否需要fsync),该操作的入口在redis.c。

void call(redisClient *c, int flags) {
... 
// 保留旧 dirty 计数器值
    dirty = server.dirty;
    // 计算命令开始执行的时间
    start = ustime();
    // 执行实现函数
    c->cmd->proc(c);
    // 计算命令执行耗费的时间
    duration = ustime()-start;
    // 计算命令执行之后的 dirty 值
    dirty = server.dirty-dirty;
    ....
    /* Propagate the command into the AOF and replication link */
    // 将命令复制到 AOF 和 slave 节点
    if (flags & REDIS_CALL_PROPAGATE) {
        int flags = REDIS_PROPAGATE_NONE;
        // 强制 REPL 传播
        if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;

        // 强制 AOF 传播
        if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;

        // 如果数据库有被修改,那么启用 REPL 和 AOF 传播
        if (dirty)
            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);

        if (flags != REDIS_PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
    }
    ...
}

 

我们再来看一下propagate的实现:

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    // 传播到 AOF
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);

    // 传播到 slave
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

 

我们再来看一下feedAppendOnlyFile的实现:

void feedAppendOnlyFile(struct redisCommand…{
if (dictid != server.aof_selected_db) {//当前操作的db与上一次不一样,所以要重新写一个新的select db命令,当rewrite的时候也会把appendseldb置为-1
        char seldb[64];
        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
 }
…
buf = catAppendOnlyGenericCommand(buf,argc,argv); //转换为标准命令格式
server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf)); //将命令写到aofbuf,这个buf会在serverCron当Appendfsync到满足时fsync到文件
if (server.bgrewritechildpid != -1) //如果有bgrewrite子进程的话,则也必须把该命令保存到bgrewritebuf,以便在子进程结束时,把新的变更追加到rewrite后的文件
    server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
…
}

 

 

可以看到到上面AOF操作也只是写到buf中,并没有将其写到文件中,下面我们将查看写到文件中的过程。通过查看代码我们可以知道flushAppendOnlyFile()函数是进行真正的写入文件操作。另外我们可以知道该函数会在beforeSleepserverCron中调用。其中beforeSleep是aeMain循环,每次进行事件处理前必须调用一次:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}
/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors. */
// 每次处理事件之前执行
void beforeSleep(struct aeEventLoop *eventLoop) {
    ...
    /* Write the AOF buffer on disk */
    // 将 AOF 缓冲区的内容写入到 AOF 文件
    flushAppendOnlyFile(0);
    ...
}

 

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
     // 根据 AOF 政策,
    // 考虑是否需要将 AOF 缓冲区中的内容写入到 AOF 文件中
    /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
    ...
}

下面我们来看一下该函数flushAppendOnlyFile的实现

/* Write the append only file buffer on disk.
 *
 * 将 AOF 缓存写入到文件中。
 *
 * Since we are required to write the AOF before replying to the client,
 * and the only way the client socket can get a write is entering when the
 * the event loop, we accumulate all the AOF writes in a memory
 * buffer and write it on disk using this function just before entering
 * the event loop again.
 *
 * 因为程序需要在回复客户端之前对 AOF 执行写操作。
 * 而客户端能执行写操作的唯一机会就是在事件 loop 中,
 * 因此,程序将所有 AOF 写累积到缓存中,
 * 并在重新进入事件 loop 之前,将缓存写入到文件中。
 *
 * About the 'force' argument:
 *
 * 关于 force 参数:
 *
 * When the fsync policy is set to 'everysec' we may delay the flush if there
 * is still an fsync() going on in the background thread, since for instance
 * on Linux write(2) will be blocked by the background fsync anyway.
 *
 * 当 fsync 策略为每秒钟保存一次时,如果后台线程仍然有 fsync 在执行,
 * 那么我们可能会延迟执行冲洗(flush)操作,
 * 因为 Linux 上的 write(2) 会被后台的 fsync 阻塞。
 *
 * When this happens we remember that there is some aof buffer to be
 * flushed ASAP, and will try to do that in the serverCron() function.
 *
 * 当这种情况发生时,说明需要尽快冲洗 aof 缓存,
 * 程序会尝试在 serverCron() 函数中对缓存进行冲洗。
 *
 * However if force is set to 1 we'll write regardless of the background
 * fsync. 
 *
 * 不过,如果 force 为 1 的话,那么不管后台是否正在 fsync ,
 * 程序都直接进行写入。
 */
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;

    // 缓冲区中没有任何内容,直接返回
    if (sdslen(server.aof_buf) == 0) return;

    // 策略为每秒 FSYNC 
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        // 是否有 SYNC 正在后台进行?
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

    // 每秒 fsync ,并且强制写入为假
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {

        /* With this append fsync policy we do background fsyncing.
         *
         * 当 fsync 策略为每秒钟一次时, fsync 在后台执行。
         *
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. 
         *
         * 如果后台仍在执行 FSYNC ,那么我们可以延迟写操作一两秒
         * (如果强制执行 write 的话,服务器主线程将阻塞在 write 上面)
         */
        if (sync_in_progress) {

            // 有 fsync 正在后台进行 。。。

            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponinig, remember that we are
                 * postponing the flush and return. 
                 *
                 * 前面没有推迟过 write 操作,这里将推迟写操作的起始时间记录下来
                 * 然后就返回,不执行 write 或者 fsync
                 */
                server.aof_flush_postponed_start = server.unixtime;
                return;

            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. 
                 *
                 * 如果之前已经因为 fsync 而推迟了 write 操作
                 * 但是推迟的时间不超过 2 秒,那么直接返回
                 * 不执行 write 或者 fsync
                 */
                return;

            }

            /* Otherwise fall trough, and go write since we can't wait
             * over two seconds. 
             *
             * 如果后台还有 fsync 在执行,并且 write 已经推迟 >= 2 秒
             * 那么执行写操作(write 将被阻塞)
             */
            server.aof_delayed_fsync++;
            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }

    /* If you are following this code path, then we are going to write so
     * set reset the postponed flush sentinel to zero. 
     *
     * 执行到这里,程序会对 AOF 文件进行写入。
     *
     * 清零延迟 write 的时间记录
     */
    server.aof_flush_postponed_start = 0;

    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     *
     * 执行单个 write 操作,如果写入设备是物理的话,那么这个操作应该是原子的
     *
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike 
     *
     * 当然,如果出现像电源中断这样的不可抗现象,那么 AOF 文件也是可能会出现问题的
     * 这时就要用 redis-check-aof 程序来进行修复。
     */
    nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    if (nwritten != (signed)sdslen(server.aof_buf)) {//写入文件有错

        static time_t last_write_error_log = 0;
        int can_log = 0;

        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
        // 将日志的记录频率限制在每行 AOF_WRITE_LOG_ERROR_RATE 秒
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

        /* Lof the AOF write error and record the error code. */
        // 如果写入出错,那么尝试将该情况写入到日志里面
        if (nwritten == -1) {
            if (can_log) {
                redisLog(REDIS_WARNING,"Error writing to the AOF file: %s",
                    strerror(errno));
                server.aof_last_write_errno = errno;
            }
        } else {
            if (can_log) {
                redisLog(REDIS_WARNING,"Short write while writing to "
                                       "the AOF file: (nwritten=%lld, "
                                       "expected=%lld)",
                                       (long long)nwritten,
                                       (long long)sdslen(server.aof_buf));
            }

            // 尝试移除新追加的不完整内容
            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                if (can_log) {
                    redisLog(REDIS_WARNING, "Could not remove short write "
                             "from the append-only file.  Redis may refuse "
                             "to load the AOF the next time it starts.  "
                             "ftruncate: %s", strerror(errno));
                }
            } else {
                /* If the ftrunacate() succeeded we can set nwritten to
                 * -1 since there is no longer partial(部分的,局部的) data into the AOF. */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

        /* Handle the AOF write error. */
        // 处理写入 AOF 文件时出现的错误
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* We can't recover when the fsync policy is ALWAYS since the
             * reply for the client is already in the output buffers, and we
             * have the contract with the user that on acknowledged write data
             * is synched on disk. */
            //当fsync是ALWAYS时,那么如果出错我们是不可能进行恢复的,因为尽管出错,我们对用户的回复已经
            //到达了输出缓冲区,并且我们还向用户说明(set sadd等操作的)写数据已经写到了磁盘
            redisLog(REDIS_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        } else {
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
            server.aof_last_write_status = REDIS_ERR;

            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            //如果这是局部写的话(我靠,我也翻译不好),那就缩减sds buffer(aof_buffer)的大小
            if (nwritten > 0) {
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return; /* We'll try again on the next call... */
        }
    } else {//写入文件没错
        /* Successful write(2). If AOF was in error state, restore the
         * OK state and log the event. */
        // 写入成功,更新最后写入状态
        if (server.aof_last_write_status == REDIS_ERR) {
            redisLog(REDIS_WARNING,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = REDIS_OK;
        }
    }

    // 更新写入后的 AOF 文件大小
    server.aof_current_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). 
     *
     * 如果 AOF 缓存的大小足够小的话,那么重用这个缓存,
     * 否则的话,释放 AOF 缓存。
     * sdsavail(server.aof_buf)返回 aof_buf 可用空间的长度
     * sdslen(server.aof_buf)返回 aof_buf 实际保存的字符串的长度
     */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        // 清空缓存中的内容,等待重用
        sdsclear(server.aof_buf);
    } else {
        // 释放缓存
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. 
     *
     * 如果 no-appendfsync-on-rewrite 选项为开启状态,
     * 并且有 BGSAVE 或者 BGREWRITEAOF 正在进行的话,
     * 那么不执行 fsync 
     */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;

    /* Perform the fsync if needed. */

    // 总是执行 fsnyc
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        // 更新最后一次执行 fsnyc 的时间
        server.aof_last_fsync = server.unixtime;
    // 策略为每秒 fsnyc ,并且距离上次 fsync 已经超过 1 秒
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        // 放到后台执行
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        // 更新最后一次执行 fsync 的时间
        server.aof_last_fsync = server.unixtime;
    }

    // 其实上面无论执行 if 部分还是 else 部分都要更新 fsync 的时间
    // 可以将代码挪到下面来
    // server.aof_last_fsync = server.unixtime;
}
View Code

相关文章:

  • 2021-12-26
  • 2021-12-06
  • 2022-01-01
  • 2021-12-01
  • 2021-08-02
  • 2021-05-17
  • 2021-12-17
猜你喜欢
  • 2021-07-09
  • 2021-10-22
  • 2022-01-04
  • 2021-11-20
  • 2022-01-10
  • 2022-02-07
相关资源
相似解决方案