Redis源码 - 持久化处理
开门见山,本文想重点聊聊以下内容:
- 持久化的时机,同步还是异步?
- 异步持久化遇到server退出如何处理
- 持久化文件大了怎么保证性能?
- Redis 有哪些机制保障数据的可靠性存储?
- RDB/AOF 各有哪些优缺点?
- AOF重写时,新数据如何处理?
出发点是,假设你需要实现一个带有存储的cache服务,你会如何做?
持久化的时机
redis 有两种持久化方式,分别为:aof 和 rdb,默认开启 rdb
如果需要开启 AOF:
- redis.conf : appendonly yes
先聊聊RDB
Redis RDB机制,先将内存快照保存至 temp-<pid>.rdb 临时文件(pid 为执行保存的进程号),fflush + fsync 成功后,再通过 rename 原子地替换为 dump.rdb
RDB(point-in-time snapshot)无法保证数据不丢失,仅对可靠性提供弱支持,这是在性能要求下做的牺牲
优点:
- 实现简单,无update操作
- 可以用于独立分析,比如大key
- 提供故障恢复能力
缺点:
- 无法提供完善的可靠性保证
- 不支持检索功能
- RDB 需要经常调用 fork() 函数,以便使用子进程将数据持久化到磁盘上。如果数据集很大,fork() 可能会非常耗时,并且可能导致 Redis 停止服务客户端几毫秒甚至在数据集非常大且 CPU 性能不佳的情况下长达一秒钟
RDB 异步持久化
- serverCron 定时触发
- bgsaveCommand 命令触发
rdbSaveBackground,主进程通过 fork 子进程异步进行 childpid = fork()
/* Start a background RDB save of the dataset into 'filename' (excerpt).
 *
 * Returns C_ERR without forking when an AOF-rewrite child or an RDB child is
 * already running. Otherwise it forks: the child writes the snapshot with
 * rdbSave(), logs and reports the memory it dirtied through copy-on-write,
 * and exits with status 0 on success or 1 on failure. The parent-side
 * handling is elided in this excerpt. */
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
long long start;
/* Only one persistence child (RDB or AOF rewrite) may run at a time. */
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
/* Remember the dirty counter at fork time and when this BGSAVE was attempted. */
server.dirty_before_bgsave = server.dirty;
server.lastbgsave_try = time(NULL);
openChildInfoPipe();
/* NOTE(review): presumably a microsecond timestamp used by the elided
 * parent branch (e.g. to measure fork latency) -- confirm. */
start = ustime();
if ((childpid = fork()) == 0) {
int retval;
/* Child */
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename,rsi);
if (retval == C_OK) {
/* Memory privately dirtied by the child since fork(), i.e. the
 * copy-on-write cost of this save; logged in MB and handed to
 * sendChildInfo() (presumably via the pipe opened above). */
size_t private_dirty = zmalloc_get_private_dirty(-1);
if (private_dirty) {
serverLog(LL_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
server.child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB);
}
/* Exit status tells the parent whether the save succeeded. */
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
......
return C_OK;
}
return C_OK; /* unreached */
}
异步触发的频率(redis.conf 中的 save <秒数> <修改次数> 配置,例如 save 900 1 表示 900 秒内至少发生 1 次修改就触发一次后台保存):
- save 900 1
- save 300 10
- save 60 10000
// RDB periodic-save parameters: one entry per "save <seconds> <changes>"
// line in redis.conf (e.g. "save 900 1").
struct saveparam {
time_t seconds; // time interval, in seconds
int changes; // number of modifications
};
可见,异步持久化,无法保证不丢失数据
RDB 同步持久化
rdbSave
命令触发
- saveCommand
- flushallCommand
- shutdownCommand
- kill signal 进程退出触发prepareForShutdown->rdbSave
/* Save the DB on disk. Return C_ERR on error, C_OK on success.
 *
 * The snapshot is first written to "temp-<pid>.rdb". It is flushed from the
 * stdio buffer to the kernel (fflush) and from the kernel to the disk
 * (fsync), and only then renamed over 'filename'. This means an existing RDB
 * file is never replaced by a partially written one.
 *
 * On any failure the temp file is removed and C_ERR is returned. On success
 * server.dirty is reset and server.lastsave / server.lastbgsave_status are
 * updated. */
int rdbSave(char *filename, rdbSaveInfo *rsi) {
    char tmpfile[256];
    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
    FILE *fp;
    rio rdb;
    int error = 0;

    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        /* Save errno first: getcwd() may overwrite it even when it succeeds. */
        int saved_errno = errno;
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        /* Report the file we actually tried to open (the temp file). */
        serverLog(LL_WARNING,
            "Failed opening the temp RDB file %s (in server root dir %s) "
            "for saving: %s",
            tmpfile,
            cwdp ? cwdp : "unknown",
            strerror(saved_errno));
        return C_ERR;
    }

    rioInitWithFile(&rdb,fp);
    if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
        errno = error;
        goto werr;
    }

    /* fflush() is provided by libc. It hands the stdio buffer to the kernel
     * via write(2), so the data only reaches the kernel buffer at this point.
     * fsync() is the system call that pushes the kernel buffer to disk. */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    /* fclose() releases the stream even when it fails, so the stream must
     * not be closed a second time on the error path. */
    if (fclose(fp) == EOF) {
        fp = NULL;
        goto werr;
    }
    fp = NULL;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generated DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        int saved_errno = errno;
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(LL_WARNING,
            "Error moving temp DB file %s on the final "
            "destination %s (in server root dir %s): %s",
            tmpfile,
            filename,
            cwdp ? cwdp : "unknown",
            strerror(saved_errno));
        unlink(tmpfile);
        return C_ERR;
    }

    serverLog(LL_NOTICE,"DB saved on disk");
    server.dirty = 0;
    server.lastsave = time(NULL);
    server.lastbgsave_status = C_OK;
    return C_OK;

werr:
    serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
    if (fp) fclose(fp);
    unlink(tmpfile);
    return C_ERR;
}
同步持久化会保证数据不丢吗?
- 一样不会。因为同步持久化是由命令触发的,如果在两次保存之间发生故障,这段时间内的数据仍会丢失
大量数据持久化时,如何减少对系统的影响?
- 配置 rdb-save-incremental-fsync yes,增量地将数据刷新到磁盘(Redis 5.0 开始提供)
- 每累计写入 REDIS_AUTOSYNC_BYTES 字节,就执行一次 fflush + fsync,把缓存数据刷到磁盘,而不是在最后一次性刷盘
/* Turn on incremental syncing for a stdio-file-backed rio.
 *
 * After this call, rioFileWrite() will fflush + fsync every time 'bytes'
 * bytes have been buffered since the last sync. Passing 0 disables it.
 * For any rio whose write callback is not the file one, the call does
 * nothing. */
void rioSetAutoSync(rio *r, off_t bytes) {
    int is_file_rio = (r->write == rioFileIO.write);

    if (is_file_rio)
        r->io.file.autosync = bytes;
}
/* stdio-backed write callback for rio. Returns 1 or 0 for success/failure.
 *
 * When autosync is enabled (r->io.file.autosync != 0, see rioSetAutoSync),
 * the stdio buffer is flushed and the file is fsync'ed every time at least
 * 'autosync' bytes have been written since the last sync. Dirty data is
 * therefore pushed to disk incrementally instead of all at once at the end.
 *
 * A failed fwrite, fflush or fsync is reported as failure (0), so callers
 * never believe data is durable when it is not. */
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
    size_t retval;

    retval = fwrite(buf,len,1,r->io.file.fp);
    if (retval == 0) return 0; /* Short/failed write: report it right away. */
    r->io.file.buffered += len;

    if (r->io.file.autosync &&
        r->io.file.buffered >= r->io.file.autosync)
    {
        /* Previously these results were ignored, which could hide I/O
         * errors during incremental fsync. */
        if (fflush(r->io.file.fp) == EOF) return 0;
        if (redis_fsync(fileno(r->io.file.fp)) == -1) return 0;
        r->io.file.buffered = 0;
    }
    return retval;
}
数据读取
redis 程序启动,从磁盘 rdb 文件加载数据到内存
/* Excerpt of main(): at startup, unless the process runs in Sentinel mode,
 * the dataset is loaded from disk by loadDataFromDisk() (AOF or RDB). */
int main(int argc, char **argv) {
...
if (!server.sentinel_mode) {
loadDataFromDisk();
}
}
/* Flags describing why an RDB is being saved or loaded. Each non-zero flag
 * occupies its own bit, so flags can be combined with '|'. */
#define RDBFLAGS_NONE 0x0          /* No special purpose. */
#define RDBFLAGS_AOF_PREAMBLE 0x1  /* RDB payload used as the AOF preamble. */
#define RDBFLAGS_REPLICATION 0x2   /* RDB used for replication. */
/* Function called at startup to load RDB or AOF file in memory.
 *
 * Only one source is used: when AOF is enabled (aof_state == AOF_ON) the
 * AOF file is loaded, otherwise the RDB file is loaded with RDBFLAGS_NONE.
 * Success/failure handling and the use of 'start' are elided in this
 * excerpt. */
void loadDataFromDisk(void) {
long long start = ustime();
if (server.aof_state == AOF_ON) {
if (loadAppendOnlyFile(server.aof_filename) == C_OK)
......
} else {
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) {
......
}
}
......
}
AOF持久化
针对rdb非完整可靠性支持,AOF提供了更高的可靠性保证
优点:
- AOF的持久化方式采用append方式,同样不会修改已有数据,只会追加写入,避免了随机写 I/O,无需 seek
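- 使用 AOF 使得 Redis 的持久性更好:可以选择不同的 fsync 策略:appendfsync no(从不主动 fsync,由操作系统决定何时刷盘)、appendfsync everysec(每秒 fsync 一次)、appendfsync always(每个写命令都 fsync 一次)。采用默认策略,即每秒 fsync 一次,写入性能仍然很好。fsync 通过后台线程执行,而主线程会尽量在没有 fsync 进行时执行写入操作,因此通常最多只会丢失一秒钟的写入数据(结合下文 flushAppendOnlyFile 中最多推迟 2 秒写入的逻辑,极端情况下可能接近 2 秒)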
- Redis 能够在 AOF 文件变得过大时自动在后台重写它。重写过程是完全安全的,因为在 Redis 继续向旧文件追加数据的同时,会生成一个全新的文件,其中包含创建当前数据集所需的最小操作集合,一旦这个新文件准备就绪,Redis 就会切换这两个文件,并开始向新文件追加数据
- AOF 以易于理解和解析的格式记录了所有操作日志,可以轻松导出 AOF 文件。例如,即使不小心使用 FLUSHALL 命令清空了所有数据,只要在此期间没有发生日志重写,仍然可以通过停止服务器、删除文件末尾的 FLUSHALL 命令,再重新启动 Redis 来恢复数据
缺点:
- AOF 通常比RDB文件大很多
- 由于AOF更频繁的fsync(系统调用),RDB(异步)提供更好的性能保证
持久化时机
-
写命令触发
-
先写入到全局的aof buf
-
将buf写入磁盘时机
- serverCron 定时将buf写入文件
- beforeSleep 每次eventloop
- prepareForShutdown/stopAppendOnly
-
fsync策略
- 如果appendfsync everysec 将异步执行,添加任务进入job,bio线程异步处理
- 如果appendfsync always 同步写入fsync,非常影响性能,但安全
- 如果appendfsync no 不主动调用fsync,由操作系统决定何时刷盘,性能最好,但最不安全
void call(client *c, int flags)
/* Propagate a command to its destinations:
 *  - to the AOF (feedAppendOnlyFile), when AOF is not off and PROPAGATE_AOF
 *    is set in 'flags' -- this is how write commands reach the AOF buffer;
 *  - to the replicas (replicationFeedSlaves), when PROPAGATE_REPL is set. */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    int to_aof = (server.aof_state != AOF_OFF) && (flags & PROPAGATE_AOF);
    int to_replicas = (flags & PROPAGATE_REPL) != 0;

    if (to_aof)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (to_replicas)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
/* Global server state (excerpt: only the AOF buffer field is shown). */
struct redisServer {
......
/* Commands appended by feedAppendOnlyFile() accumulate here and are
 * written to the AOF file by flushAppendOnlyFile(). */
sds aof_buf; /* AOF buffer, written before entering the event loop */
......
}
/* Feed a write command to the AOF machinery (excerpt).
 *
 * The command is serialized into 'buf' by catAppendOnlyGenericCommand().
 * It is then appended to server.aof_buf when AOF is on, and also to the
 * AOF rewrite buffer when a rewrite child is running. */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
......
// Format the command in Redis command format and append it to buf.
buf = catAppendOnlyGenericCommand(buf,argc,argv);
......
/* Append to the AOF buffer. This will be flushed on disk just before
* re-entering the event loop, so before the client will get a
* positive reply about the operation performed. */
if (server.aof_state == AOF_ON)
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
// If a child process is rewriting the AOF, also record the command in the
// rewrite buffer; the parent forwards it to the child so the rewritten
// file does not miss writes that arrive during the rewrite.
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
......
}
/* Periodic server timer callback (excerpt: only the AOF-related retries are
 * shown). A postponed AOF flush is retried on every run. After an AOF write
 * error, the flush is retried at most once per second. */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
......
/* AOF postponed flush: Try at every cron cycle if the slow fsync
* completed. */
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
/* AOF write errors: in this case we have a buffer to flush as well and
* clear the AOF error in case of success to make the DB writable again.
* However, trying once per second is enough even when 'hz' is set to
* a higher frequency. */
run_with_period(1000) {
if (server.aof_last_write_status == C_ERR)
flushAppendOnlyFile(0);
}
......
}
/* Write the append only file buffer on disk.
*
* Since we are required to write the AOF before replying to the client,
* and the only way the client socket can get a write is when entering
* the event loop, we accumulate all the AOF writes in a memory
* buffer and write it on disk using this function just before entering
* the event loop again.
*
* About the 'force' argument:
*
* When the fsync policy is set to 'everysec' we may delay the flush if there
* is still an fsync() going on in the background thread, since for instance
* on Linux write(2) will be blocked by the background fsync anyway.
* When this happens we remember that there is some aof buffer to be
* flushed ASAP, and will try to do that in the serverCron() function.
*
* However if force is set to 1 we'll write regardless of the background
* fsync.
*
* This excerpt shows two phases: write server.aof_buf to the file, then
* fsync it -- synchronously for 'always', via a background job for
* 'everysec'. Parts of the function are elided. */
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
void flushAppendOnlyFile(int force){
......
if (sdslen(server.aof_buf) == 0) return;
// Check whether a background fsync job is still pending (everysec only).
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
/* With this append fsync policy we do background fsyncing.
* If the fsync is still in progress we can try to delay
* the write for a couple of seconds. */
// An fsync is already in progress: postpone the write (for up to ~2s).
if (sync_in_progress) {
if (server.aof_flush_postponed_start == 0) {
/* No previous write postponing, remember that we are
* postponing the flush and return. */
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
/* We were already waiting for fsync to finish, but for less
* than two seconds this is still ok. Postpone again. */
return;
}
}
......
latencyStartMonitor(latency);
// Write the buffer into the kernel buffer (write(2) system call).
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
latencyEndMonitor(latency);
......
/* Perform the fsync if needed. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* redis_fsync is defined as fdatasync() for Linux in order to avoid
* flushing metadata. */
/* NOTE(review): the return value of redis_fsync() is not checked here. */
latencyStartMonitor(latency);
redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-fsync-always",latency);
server.aof_last_fsync = server.unixtime;
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
/* At most once per second: queue a background fsync unless one is
 * already pending. The timestamp is updated in either case. */
if (!sync_in_progress) aof_background_fsync(server.aof_fd);
server.aof_last_fsync = server.unixtime;
}
}
flushAppendOnlyFile 核心逻辑两部分:
-
调用write将server.aof_buf写入到文件
-
根据条件选择是异步调用或者同步调用fsync保证数据刷到磁盘上
- 根据配置,如果配置了AOF_FSYNC_ALWAYS, 就同步的调用redis_fsync宏来完成刷盘, 具体的工作是redis_fsync完成的
- 如果配置了AOF_FSYNC_EVERYSEC,且没有正在运行的异步刷盘任务,就会调用aof_background_fsync来创建一个异步bio任务 (bioCreateBackgroundJob) 来完成刷盘
- 如果配置了AOF_FSYNC_EVERYSEC 且已有fsync job在进行,delay处理
AOF重写
AOF重写的目的是缩小文件的大小,防止无限增长
实现 AOF 重写的函数是 rewriteAppendOnlyFileBackground
触发时机:
-
bgrewriteaofCommand ,需要send bgrewriteaof命令给server,属于手动触发
-
前置条件:
- 没有AOF重写的子进程正在执行
- 没有创建 RDB 的子进程正在执行
-
-
configSetCommand ,手动触发,config set appendonly yes
- 一旦 AOF 功能启用后,configSetCommand 函数就会调用 startAppendOnly 函数,执行一次 AOF 重写
-
restartAOFAfterSYNC 它会在主从节点的复制过程中被调用 留到复制文章里谈论
-
老朋友 serverCron,serverCron 函数是被周期性执行的。然后它在执行的过程中,会做判断来决定是否执行 AOF 重写
-
判断条件
-
AOF功能启用
-
没有RDB子进程和AOF重写子进程在执行
-
AOF 文件大小相对基础大小的增长比例超过了设定阈值,且 AOF 文件大小的绝对值超过了最小阈值
- auto-aof-rewrite-percentage AOF 文件大小超出基础大小的比例,默认值为 100%,即超出 1 倍大小
- auto-aof-rewrite-min-size AOF 文件大小绝对值的最小值,默认为 64MB
-
-
运行机制: 异步
重写过程:
-
fork子进程,child调用rewriteAppendOnlyFile,主线程仍然可以处理客户端请求,没有阻塞
-
创建 aof 临时文件
-
子进程利用 copy-on-write(写时复制)机制,基于 fork 时刻的内存快照将数据写入 aof 临时文件:rioInitWithFile(&aof,fp)
-
逐步将文件缓存刷新到磁盘
- 根据aof_rewrite_incremental_fsync/REDIS_AUTOSYNC_BYTES
-
根据配置(aof-use-rdb-preamble)决定重写内容的格式:RDB 格式或 AOF 命令格式。开启后,AOF 文件开头是 RDB 格式的数据,之后再追加 AOF 格式的命令,两种内容共存于同一个 AOF 文件中
-
fflush
-
fsync
/* ----------------------------------------------------------------------------
* AOF background rewrite
* ------------------------------------------------------------------------- */
/* This is how rewriting of the append only file in background works:
*
* 1) The user calls BGREWRITEAOF
* 2) Redis calls this function, that forks():
* 2a) the child rewrites the append only file in a temp file.
* 2b) the parent accumulates differences in server.aof_rewrite_buf_blocks.
* 3) When the child finished '2a' it exits.
* 4) The parent will trap the exit code, if it's OK, will append the
* data accumulated into the rewrite buffer into the temp file, and
* finally will rename(2) the temp file in the actual file name.
* Then the new file is reopened as the new append only file. Profit!
*
* Excerpt: only the child branch is shown. The child rewrites into
* "temp-rewriteaof-bg-<pid>.aof", reports its copy-on-write memory usage
* on success, and exits with status 0 (success) or 1 (failure).
*/
int rewriteAppendOnlyFileBackground(void) {
......
if ((childpid = fork()) == 0) {
char tmpfile[256];
/* Child */
closeListeningSockets(0);
redisSetProcTitle("redis-aof-rewrite");
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
/* Memory privately dirtied since fork(), i.e. the copy-on-write cost. */
size_t private_dirty = zmalloc_get_private_dirty(-1);
if (private_dirty) {
serverLog(LL_NOTICE,
"AOF rewrite: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
server.child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_AOF);
exitFromChild(0);
} else {
exitFromChild(1);
}
}
......
}
/* Write a sequence of commands able to fully rebuild the dataset into
* "filename". Used both by REWRITEAOF and BGREWRITEAOF.
*
* In order to minimize the number of commands needed in the rewritten
* log Redis uses variadic commands when possible, such as RPUSH, SADD
* and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
* are inserted using a single command.
*
* Excerpt: data is first written to "temp-rewriteaof-<pid>.aof". How that
* file finally becomes 'filename' is in the elided part -- NOTE(review):
* presumably a rename, confirm against the full source. */
int rewriteAppendOnlyFile(char *filename){
......
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
return C_ERR;
}
/* Start with an empty buffer for the diff received from the parent. */
server.aof_child_diff = sdsempty();
rioInitWithFile(&aof,fp);
/* Optionally fflush+fsync every REDIS_AUTOSYNC_BYTES while writing. */
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
/* Either dump the dataset in RDB format as an AOF preamble, or emit it
 * as plain commands. */
if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
} else {
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
}
/* Do an initial slow fsync here while the parent is still sending
* data, in order to make the next final fsync faster. */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
......
}
这里面有一个问题,既然是异步child进程完成,那rewrite期间,有新的命令进来如何处理?
Redis 利用 aof_rewrite_buf_blocks 列表处理这期间的新的写命令, item是aofrwblock
list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite: a list of aofrwblock buffers filled by aofRewriteBufferAppend(). */
/* ----------------------------------------------------------------------------
 * AOF rewrite buffer implementation.
 *
 * While a child process rewrites the AOF, the parent keeps accumulating the
 * write commands it receives in this buffer.
 *
 * The buffer is append-only. It is not one large allocation grown with
 * realloc(): the OS does not always satisfy 'huge' reallocs by remapping
 * pages and may end up copying the data. Instead, it is kept as a list of
 * fixed-size blocks of AOF_RW_BUF_BLOCK_SIZE bytes each.
 * ------------------------------------------------------------------------- */

#define AOF_RW_BUF_BLOCK_SIZE (10*1024*1024) /* 10 MB per block */

typedef struct aofrwblock {
    unsigned long used; /* Bytes of buf[] currently holding data. */
    unsigned long free; /* Bytes of buf[] still available. */
    char buf[AOF_RW_BUF_BLOCK_SIZE];
} aofrwblock;
/* Append data to the AOF rewrite buffer, allocating new blocks if needed.
 *
 * Excerpt: the code that copies 's' into the last block is elided. Shown
 * here: allocating a fresh empty block when 'len' bytes are still left to
 * store, periodic size logging, and installing the writable-event handler
 * that forwards the buffer to the rewrite child through the pipe. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
......
if (len) { /* First block to allocate, or need another block. */
int numblocks;
block = zmalloc(sizeof(*block));
block->free = AOF_RW_BUF_BLOCK_SIZE;
block->used = 0;
listAddNodeTail(server.aof_rewrite_buf_blocks,block);
/* Log when the block count crosses a multiple of 10 (as a notice) or of
* 100 (as a warning). NOTE(review): the test uses numblocks+1, so it
* fires when the list holds 9, 19, 29, ... blocks. */
numblocks = listLength(server.aof_rewrite_buf_blocks);
if (((numblocks+1) % 10) == 0) {
int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
LL_NOTICE;
serverLog(level,"Background AOF buffer size: %lu MB",
aofRewriteBufferSize()/(1024*1024));
}
}
/* Install a file event to send data to the rewrite child if there is
* not one already. */
if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
AE_WRITABLE, aofChildWriteDiffData, NULL);
}
}
/* Writable-event handler on the parent->child pipe, used while an AOF
 * rewrite child is running.
 *
 * It drains server.aof_rewrite_buf_blocks front to back, writing as much of
 * each block as the pipe accepts. Sending the diff incrementally keeps the
 * final write small when the child finishes the rewrite.
 *
 * The handler removes itself once there is nothing left to send, or once
 * server.aof_stop_sending_diff is set. If write() accepts nothing (returns
 * 0 or -1), it returns and waits for the next writable event. */
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(fd);
    UNUSED(privdata);
    UNUSED(mask);

    for (;;) {
        /* Always work on the oldest pending block. */
        listNode *head = listFirst(server.aof_rewrite_buf_blocks);
        aofrwblock *blk = (head != NULL) ? head->value : NULL;

        if (server.aof_stop_sending_diff || blk == NULL) {
            aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
                              AE_WRITABLE);
            return;
        }

        if (blk->used > 0) {
            ssize_t sent = write(server.aof_pipe_write_data_to_child,
                                 blk->buf,blk->used);
            if (sent <= 0) return; /* Nothing accepted: retry on next event. */

            /* Shift the unsent tail to the front of the block. */
            memmove(blk->buf,blk->buf+sent,blk->used-sent);
            blk->used -= sent;
            blk->free += sent;
        }

        /* A fully drained block is released; move on to the next one. */
        if (blk->used == 0)
            listDelNode(server.aof_rewrite_buf_blocks,head);
    }
}
aofrwblock 结构体就相当于是一个 10MB 的数据块,记录了 AOF 重写期间主进程收到的命令
aof_rewrite_buf_blocks 列表负责将这些数据块连接起来
父进程写入过程:
- 当 feedAppendOnlyFile 执行append操作时,且server.aof_child_pid != -1 (说明有子进程在重写)
- aofRewriteBufferAppend 函数被调用,它会将命令data append到 aof_rewrite_buf_blocks 列表
- 使用aeCreateFileEvent注册写事件,回调函数aofChildWriteDiffData完成
- 当fd(server.aof_pipe_write_data_to_child)可写时 调用aofChildWriteDiffData取出数据,write to child pipe
子进程读取过程:
-
rewriteAppendOnlyFile进行重写时会调用aofReadDiffFromParent函数
- rewriteAppendOnlyFileRio也会调用aofReadDiffFromParent
- rdbSaveRio 这个函数是创建 RDB 文件的函数。当我们使用 AOF 和 RDB 混合持久化机制时,这个函数也会调用 aofReadDiffFromParent 函数
-
aofReadDiffFromParent 使用一个 64KB 大小的缓冲区,循环调用 read 函数,从 fd(server.aof_pipe_read_data_from_parent)读取上述父进程写入管道的数据
-
append 到全局变量 server.aof_child_diff 字符串中
-
rewriteAppendOnlyFile 在执行中将aof_child_diff中累积的操作写入AOF重写日志文件
子进程何时读取父进程发来的数据?是否注册了读事件?
- 子进程并没有向事件循环注册读事件,而是在 rewriteAppendOnlyFile 执行过程中主动调用 aeWait 等待管道可读,然后调用 aofReadDiffFromParent 读取,见下面的代码
- aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1)
/* Called by the child that is rewriting the AOF.
 *
 * It reads whatever the parent has written to the diff pipe so far and
 * appends it to server.aof_child_diff, which is concatenated at the end of
 * the rewrite. Reading stops at the first read() that returns 0 or -1.
 * Returns the number of bytes appended by this call. */
ssize_t aofReadDiffFromParent(void) {
    char chunk[65536]; /* Default pipe buffer size on most Linux systems. */
    ssize_t appended = 0;

    for (;;) {
        ssize_t got = read(server.aof_pipe_read_data_from_parent,
                           chunk,sizeof(chunk));
        if (got <= 0)
            break;
        server.aof_child_diff = sdscatlen(server.aof_child_diff,chunk,got);
        appended += got;
    }
    return appended;
}
/* Excerpt of rewriteAppendOnlyFile(): after the dataset has been written,
 * the child keeps pulling the parent's diff from the pipe for a bounded
 * time. It does this by polling with aeWait() rather than through an event
 * loop. */
int rewriteAppendOnlyFile(char *filename) {
......
/* Stop after 1000 ms in total, or after 20 consecutive aeWait() calls that
 * report no readable data. NOTE(review): the last aeWait() argument is
 * presumably a timeout in milliseconds -- confirm. */
int nodata = 0;
mstime_t start = mstime();
while(mstime()-start < 1000 && nodata < 20) {
if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0) {
nodata++;
continue;
}
nodata = 0; /* Start counting from zero, we stop on N *contiguous*
timeouts. */
aofReadDiffFromParent();
}
}
至此完成父子进程间的数据同步
AOF数据读取
类似RDB文件读取,这里略
总结
开头的几个问题,相信都可以在上面的文字中找到答案
参考
Redis文档more
Redis source code 5.0.1