Sentinel Mode

Sentinel mode is a special deployment mode. Redis ships a sentinel program that runs as an independent process. It works by sending commands to the monitored Redis servers and waiting for their replies, thereby keeping track of multiple running Redis instances.

A sentinel serves two purposes:

  • By sending commands, it asks each Redis server, master and slave alike, to report its running state.
  • When the sentinels detect that the master is down, they automatically promote a slave to master and then, via the publish/subscribe mechanism, notify the remaining slaves to update their configuration and follow the new master.

Sentinel configuration
First configure the Redis master and slaves by editing redis.conf as follows:

# Allow the Redis server to be reached across the network
bind 0.0.0.0
# Set the password
requirepass "123456"
# Point at the master. Note: the slaveof-related settings go on the slaves only; the master does not need them
slaveof 192.168.11.128 6379
# Password of the master. Note: the slaveof-related settings go on the slaves only; the master does not need them
masterauth 123456

The settings above configure the Redis servers; a slave differs from the master only in the extra slaveof setting and the master password (masterauth).

Configure 3 sentinels, all with identical configuration. The Redis installation directory contains a sentinel.conf file; copy it and edit the copy:

# Disable protected mode
protected-mode no
# Configure the master to watch. Here sentinel monitor means "monitor"; mymaster is a freely chosen name for the
# master; 192.168.11.128 is the master's address and 6379 its port; the trailing 2 means a failover is performed
# only when two or more sentinels consider the master unreachable.
sentinel monitor mymaster 192.168.11.128 6379 2
# sentinel auth-pass sets the password for the service; mymaster is the service name, 123456 the Redis password
# sentinel auth-pass <master-name> <password>
sentinel auth-pass mymaster 123456

Protected mode is disabled above to make testing easier.
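Besides monitor and auth-pass, a few timing-related options are commonly tuned in sentinel.conf; the values below are only illustrative defaults, not requirements:

```
# Port this sentinel listens on (26379 by convention)
port 26379
# How long (in ms) an instance must be unreachable before this sentinel marks it subjectively down
sentinel down-after-milliseconds mymaster 30000
# Upper bound (in ms) on the whole failover procedure
sentinel failover-timeout mymaster 180000
# How many slaves may resynchronize with the new master simultaneously after a failover
sentinel parallel-syncs mymaster 1
```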

With these changes in place, enter the src directory of the Redis installation and start the servers and the sentinels with the following commands:

# Start the Redis server process
./redis-server ../redis.conf
# Start the sentinel process
./redis-sentinel ../sentinel.conf

Mind the startup order: first the Redis server process on the master (192.168.11.128), then the server processes on the slaves, and finally the 3 sentinel processes.
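Once all processes are up, the deployment can be sanity-checked from redis-cli (an illustrative transcript; adjust addresses and passwords to your environment):

```
# On the master: role and number of connected slaves
redis-cli -h 192.168.11.128 -a 123456 info replication
# Against any sentinel (default port 26379): state of the watched master
redis-cli -p 26379 sentinel master mymaster
# Current master address as seen by the sentinel
redis-cli -p 26379 sentinel get-master-addr-by-name mymaster
```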

Source code analysis

On startup, Redis inspects its arguments and, when running in sentinel mode, takes a dedicated path:

  1. initSentinelConfig(); initSentinel(); initialize the sentinel-specific configuration and state;
  2. inside initServer(), aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) registers the timer event for sentinel mode, which periodically runs serverCron() -> sentinelTimer();
  3. sentinelIsRunning(); performs the startup checks for sentinel mode and kicks off monitoring.
int main(int argc, char **argv) {
    // ...
    if (server.sentinel_mode) {
        initSentinelConfig();
        initSentinel();
    }
    // ...
    initServer();
    // ...
    if (!server.sentinel_mode) {
        // ...
    } else {
        sentinelIsRunning();
    }
}

Initialization: initSentinel()

initSentinel() initializes the sentinel state object.

This function is implemented in sentinel.c.

void initSentinel(void) {
    unsigned int j;

    /* Remove usual Redis commands from the command table, then just add
     * the SENTINEL command. */
    dictEmpty(server.commands,NULL);
    for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
        int retval;
        struct redisCommand *cmd = sentinelcmds+j;

        retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
        serverAssert(retval == DICT_OK);
    }

    /* Initialize various data structures. */
    sentinel.current_epoch = 0;
    sentinel.masters = dictCreate(&instancesDictType,NULL);
    sentinel.tilt = 0;
    sentinel.tilt_start_time = 0;
    sentinel.previous_time = mstime();
    sentinel.running_scripts = 0;
    sentinel.scripts_queue = listCreate();
    sentinel.announce_ip = NULL;
    sentinel.announce_port = 0;
    sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
    sentinel.deny_scripts_reconfig = SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG;
    memset(sentinel.myid,0,sizeof(sentinel.myid));
}

Startup checks: sentinelIsRunning()

This function is implemented in sentinel.c.

  1. Validate the configuration file and its permissions;
  2. if this sentinel has no myid set yet, generate a random one;
  3. emit a +monitor event for every master configured in the configuration file.
void sentinelIsRunning(void) {
    int j;

    // Validate the config file and its permissions
    if (server.configfile == NULL) {
        serverLog(LL_WARNING,
            "Sentinel started without a config file. Exiting...");
        exit(1);
    } else if (access(server.configfile,W_OK) == -1) {
        serverLog(LL_WARNING,
            "Sentinel config file %s is not writable: %s. Exiting...",
            server.configfile,strerror(errno));
        exit(1);
    }

    /* If this Sentinel has yet no ID set in the configuration file, we
     * pick a random one and persist the config on disk. From now on this
     * will be this Sentinel ID across restarts. */
    // If myid is not set yet, generate a random one
    for (j = 0; j < CONFIG_RUN_ID_SIZE; j++)
        if (sentinel.myid[j] != 0) break;

    if (j == CONFIG_RUN_ID_SIZE) {
        /* Pick ID and persist the config. */
        getRandomHexChars(sentinel.myid,CONFIG_RUN_ID_SIZE);
        sentinelFlushConfig();
    }

    /* Log its ID to make debugging of issues simpler. */
    serverLog(LL_WARNING,"Sentinel ID is %s", sentinel.myid);

    /* We want to generate a +monitor event for every configured master
     * at startup. */
    // Start monitoring every configured master
    sentinelGenerateInitialMonitorEvents();
}
/* This function is called only at startup and is used to generate a
 * +monitor event for every configured master. The same events are also
 * generated when a master to monitor is added at runtime via the
 * SENTINEL MONITOR command. */
void sentinelGenerateInitialMonitorEvents(void) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetIterator(sentinel.masters);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
    }
    dictReleaseIterator(di);
}

Registering the timer task: initServer()

This function is implemented in server.c.

void initServer(void) {
    // ...
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
}

The timer event runs the serverCron() function:

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ...
    if (server.sentinel_mode) sentinelTimer();
    // ...
}

The core of sentinel mode: sentinelTimer()

This function is implemented in sentinel.c.

void sentinelTimer(void) {
    // Check whether the timer is running on schedule and whether to enter TILT mode
    sentinelCheckTiltCondition();
    // Handle master failures, performing failover when needed
    sentinelHandleDictOfRedisInstances(sentinel.masters);
    // Run scripts queued for execution
    sentinelRunPendingScripts();
    // Clean up finished scripts and retry the ones that failed
    sentinelCollectTerminatedScripts();
    // Kill scripts that have been running for too long
    sentinelKillTimedoutScripts();

    /* We continuously change the frequency of the Redis "timer interrupt"
     * in order to desynchronize every Sentinel from every other.
     * This non-determinism avoids that Sentinels started at the same time
     * exactly continue to stay synchronized asking to be voted at the
     * same time again and again (resulting in nobody likely winning the
     * election because of split brain voting). */
    server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
}

Checking the timer and entering TILT mode: sentinelCheckTiltCondition()

Redis Sentinel relies heavily on the machine's clock: for example, to decide whether an instance is reachable, a sentinel records the last time the instance replied to a PING and compares it with the current time, which tells it how long the instance has gone without any successful communication.

However, if the system clock misbehaves, the machine is very busy, or the process is blocked for some reason, the sentinel's own judgment can no longer be trusted.

TILT mode is a special protective mode that a sentinel enters when it detects that something is off with its timing.

Because the sentinel's timer interrupt fires 10 times per second by default, two consecutive runs are expected to be roughly 100 milliseconds apart. The sentinel records the time of the previous timer run and compares it with the time of the current one:

If the difference between the two runs is negative, or very large (more than 2 seconds, SENTINEL_TILT_TRIGGER), the sentinel enters TILT mode.
If the sentinel is already in TILT mode, its exit from TILT mode is postponed.
While in TILT mode the sentinel keeps monitoring all of its targets, but:

  • it no longer takes any action, such as failovers;
  • when another sentinel sends it the SENTINEL is-master-down-by-addr command, it replies negatively, because its down-state judgments are no longer reliable;
  • if things stay normal for 30 seconds, the sentinel exits TILT mode.
void sentinelCheckTiltCondition(void) {
    mstime_t now = mstime();
    mstime_t delta = now - sentinel.previous_time;

    // SENTINEL_TILT_TRIGGER = 2000: the maximum expected gap between two timer runs
    if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
        sentinel.tilt = 1;
        sentinel.tilt_start_time = mstime();
        sentinelEvent(LL_WARNING,"+tilt",NULL,"#tilt mode entered");
    }
    sentinel.previous_time = mstime();
}
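The trigger rule above is small enough to exercise in isolation. The sketch below is a hypothetical standalone restatement (the constant mirrors the source, but the function itself is not part of Redis):

```c
#include <assert.h>

/* Hypothetical restatement of the TILT trigger: delta_ms is the measured gap
 * between two timer runs in milliseconds. TILT is entered when the clock
 * moved backwards or the gap exceeds SENTINEL_TILT_TRIGGER (2000 ms). */
#define SENTINEL_TILT_TRIGGER 2000

int tilt_triggered(long long delta_ms) {
    return delta_ms < 0 || delta_ms > SENTINEL_TILT_TRIGGER;
}
```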

Master monitoring and failover handling: sentinelHandleDictOfRedisInstances(sentinel.masters)

  1. Iterate over the managed masters:
    1. check each master's liveness and handle failures (master/slave switchover);
    2. check the liveness of the slaves under each master;
    3. check the liveness of the sentinels watching each master;
    4. if a failover has reached the SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state, remember this master as switch_to_promoted;
  2. if switch_to_promoted is not empty, call sentinelFailoverSwitchToPromotedSlave on it.
void sentinelHandleDictOfRedisInstances(dict *instances) {
    dictIterator *di;
    dictEntry *de;
    sentinelRedisInstance *switch_to_promoted = NULL;

    /* There are a number of things we need to perform against every master. */
    // Get an iterator over the managed instances
    di = dictGetIterator(instances);
    while((de = dictNext(di)) != NULL) { // iterate over all instances
        sentinelRedisInstance *ri = dictGetVal(de);
        // Handle this single instance (monitoring and acting)
        sentinelHandleRedisInstance(ri);
        if (ri->flags & SRI_MASTER) {
            sentinelHandleDictOfRedisInstances(ri->slaves);
            sentinelHandleDictOfRedisInstances(ri->sentinels);
            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
                switch_to_promoted = ri;
            }
        }
    }
    if (switch_to_promoted)
        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
    dictReleaseIterator(di);
}

Liveness checks and failure handling: sentinelHandleRedisInstance

  1. Reconnect dropped links;
  2. send the INFO, PING and PUBLISH hello commands;
  3. check whether the instance is alive (subjective judgment);
  4. if the instance is a master:
    1. check whether it is down (objective judgment);
    2. decide whether a failover is needed and, if so, signal the start of a failover for this master:
      1. send the is-master-down-by-addr command to every sentinel watching this master;
    3. run the failover state machine;
    4. handle sentinels whose replies have timed out and clear their leader field.
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    /* ========== MONITORING HALF ============ */
    /* Every kind of instance */
    // Reconnect if the link has dropped
    sentinelReconnectInstance(ri);
    // Send INFO (its reply is handled by sentinelInfoReplyCallback, which matters
    // for failover), send PING, and PUBLISH hello messages to every instance
    sentinelSendPeriodicCommands(ri);

    /* ============== ACTING HALF ============= */
    /* We don't proceed with the acting half if we are in TILT mode.
     * TILT happens when we find something odd with the time, like a
     * sudden change in the clock. */
    if (sentinel.tilt) {
        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
        sentinel.tilt = 0;
        sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
    }

    /* Every kind of instance */
    // Liveness check (subjective judgment)
    sentinelCheckSubjectivelyDown(ri);

    /* Masters and slaves */
    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
        /* Nothing so far. */
    }

    /* Only masters */
    if (ri->flags & SRI_MASTER) {
        // Liveness check (objective judgment)
        sentinelCheckObjectivelyDown(ri);
        // Decide whether this master needs a failover and signal it;
        // skipped if a failover is already in progress
        if (sentinelStartFailoverIfNeeded(ri))
            // Ask every sentinel watching this master via is-master-down-by-addr
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
        // Failover state machine
        sentinelFailoverStateMachine(ri);
        // Handle sentinels whose replies timed out and clear their leader field
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
}

Subjective down judgment: sentinelCheckSubjectivelyDown(ri)

  1. Close the command (ping) link if it looks stale;
  2. close the pub/sub link if it looks stale;
  3. mark the instance SDOWN if it is not replying, or if this sentinel believes it is a master while the instance reports itself as a slave and has failed to complete the role switch within the allowed time;
  4. otherwise clear the SDOWN flag.
/* Is this instance down from our point of view? */
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
    mstime_t elapsed = 0;

    if (ri->link->act_ping_time)
        elapsed = mstime() - ri->link->act_ping_time;
    else if (ri->link->disconnected)
        elapsed = mstime() - ri->link->last_avail_time;

    /* Check if we are in need for a reconnection of one of the
     * links, because we are detecting low activity.
     *
     * 1) Check if the command link seems connected, was connected not less
     *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have a
     *    pending ping for more than half the timeout. */
    if (ri->link->cc &&
        (mstime() - ri->link->cc_conn_time) >
        SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        ri->link->act_ping_time != 0 && /* There is a pending ping... */
        /* The pending ping is delayed, and we did not receive
         * error replies as well. */
        (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
        (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
    {
        instanceLinkCloseConnection(ri->link,ri->link->cc);
    }

    /* 2) Check if the pubsub link seems connected, was connected not less
     *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
     *    activity in the Pub/Sub channel for more than
     *    SENTINEL_PUBLISH_PERIOD * 3.
     */
    if (ri->link->pc &&
        (mstime() - ri->link->pc_conn_time) >
        SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        (mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
    {
        instanceLinkCloseConnection(ri->link,ri->link->pc);
    }

    /* Update the SDOWN flag. We believe the instance is SDOWN if:
     *
     * 1) It is not replying.
     * 2) We believe it is a master, it reports to be a slave for enough time
     *    to meet the down_after_period, plus enough time to get two times
     *    INFO report from the instance. */
    if (elapsed > ri->down_after_period ||
        (ri->flags & SRI_MASTER &&
         ri->role_reported == SRI_SLAVE &&
         mstime() - ri->role_reported_time >
          (ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
    {
        /* Is subjectively down */
        if ((ri->flags & SRI_S_DOWN) == 0) {
            sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
            ri->s_down_since_time = mstime();
            ri->flags |= SRI_S_DOWN;
        }
    } else {
        /* Is subjectively up */
        if (ri->flags & SRI_S_DOWN) {
            sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
            ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
        }
    }
}
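The SDOWN condition at the end of the function can likewise be restated as a pure predicate. Everything below is a hypothetical sketch; the names are illustrative and the INFO period mirrors SENTINEL_INFO_PERIOD:

```c
#include <assert.h>

/* Hypothetical restatement of the SDOWN rule. elapsed_ms: time without a
 * PING reply; role_slave_for_ms: how long a supposed master has been
 * reporting itself as a slave. */
#define INFO_PERIOD 10000 /* mirrors SENTINEL_INFO_PERIOD */

int is_sdown(long long elapsed_ms, int is_master, int reports_slave,
             long long role_slave_for_ms, long long down_after_period) {
    return elapsed_ms > down_after_period ||
           (is_master && reports_slave &&
            role_slave_for_ms > down_after_period + INFO_PERIOD*2);
}
```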

Objective down judgment: sentinelCheckObjectivelyDown(ri)

  1. Iterate over the sentinels watching this subjectively-down master;
    1. for every sentinel that also considers the master down, increment the quorum counter;
  2. if the number of sentinels that consider the master down reaches the quorum configured for this master, set odown = 1;
  3. if odown = 1, set the SRI_O_DOWN flag on the master and record the time;
  4. otherwise clear the SRI_O_DOWN flag on the master.
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    unsigned int quorum = 0, odown = 0;

    // The master is already flagged as subjectively down
    if (master->flags & SRI_S_DOWN) {
        /* Is down for enough sentinels? */
        quorum = 1; /* the current sentinel. */
        /* Count all the other sentinels. */
        di = dictGetIterator(master->sentinels);
        // Iterate over all sentinel instances, counting those that
        // consider this master down
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);

            if (ri->flags & SRI_MASTER_DOWN) quorum++;
        }
        dictReleaseIterator(di);
        // If the number of sentinels that consider the master down (quorum)
        // reaches the configured threshold, mark odown = 1
        if (quorum >= master->quorum) odown = 1;
    }

    /* Set the flag accordingly to the outcome. */
    // Objectively down: set the SRI_O_DOWN flag
    if (odown) {
        if ((master->flags & SRI_O_DOWN) == 0) {
            sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
                quorum, master->quorum);
            master->flags |= SRI_O_DOWN;
            master->o_down_since_time = mstime();
        }
    } else { // Clear SRI_O_DOWN
        if (master->flags & SRI_O_DOWN) {
            sentinelEvent(LL_WARNING,"-odown",master,"%@");
            master->flags &= ~SRI_O_DOWN;
        }
    }
}
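The quorum rule reduces to a couple of comparisons; here is a hypothetical standalone version for illustration:

```c
#include <assert.h>

/* Hypothetical restatement of the ODOWN decision: the local sentinel counts
 * as one vote if it already sees the master as SDOWN, plus one vote per
 * other sentinel reporting SRI_MASTER_DOWN. */
int is_odown(int local_sdown, unsigned int others_down, unsigned int master_quorum) {
    if (!local_sdown) return 0;
    return (1 + others_down) >= master_quorum;
}
```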

Checking whether to start a failover: sentinelStartFailoverIfNeeded(ri)

  1. Check the SRI_O_DOWN flag (objectively down);
  2. check SRI_FAILOVER_IN_PROGRESS (a failover is already running);
  3. if the time since the last failover attempt is less than failover_timeout*2:
    1. if failover_delay_logged differs from failover_start_time:
      1. set failover_delay_logged to failover_start_time and log the next allowed failover time;
    2. return 0;
  4. start the failover with sentinelStartFailover;
  5. return 1.
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
    /* We can't failover if the master is not in O_DOWN state. */
    if (!(master->flags & SRI_O_DOWN)) return 0;

    /* Failover already in progress? */
    if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;

    /* Last failover attempt started too little time ago? */
    if (mstime() - master->failover_start_time <
        master->failover_timeout*2)
    {
        if (master->failover_delay_logged != master->failover_start_time) {
            time_t clock = (master->failover_start_time +
                            master->failover_timeout*2) / 1000;
            char ctimebuf[26];

            ctime_r(&clock,ctimebuf);
            ctimebuf[24] = '\0'; /* Remove newline. */
            master->failover_delay_logged = master->failover_start_time;
            serverLog(LL_WARNING,
                "Next failover delay: I will not start a failover before %s",
                ctimebuf);
        }
        return 0;
    }

    sentinelStartFailover(master);
    return 1;
}

Starting the failover: sentinelStartFailover(master)

  1. Assert that this instance is a master;
  2. set the failover state to SENTINEL_FAILOVER_STATE_WAIT_START;
  3. set the SRI_FAILOVER_IN_PROGRESS flag on the master;
  4. increment the current epoch and store it in failover_epoch;
  5. emit a +new-epoch event for the master;
  6. emit a +try-failover event for the master;
  7. record the start time in failover_start_time;
  8. record the state-change time.
void sentinelStartFailover(sentinelRedisInstance *master) {
    serverAssert(master->flags & SRI_MASTER);

    master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
    master->flags |= SRI_FAILOVER_IN_PROGRESS;
    master->failover_epoch = ++sentinel.current_epoch;
    sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
        (unsigned long long) sentinel.current_epoch);
    sentinelEvent(LL_WARNING,"+try-failover",master,"%@");
    master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    master->failover_state_change_time = mstime();
}

Sending the is-master-down-by-addr command: sentinelAskMasterStateToOtherSentinels()

The command is SENTINEL is-master-down-by-addr ip port current_epoch runid, where ip and port identify the master believed to be down, current_epoch is the sentinel's epoch, and runid is either * (only probe the down state) or this sentinel's run ID (used to elect the leader sentinel).

  1. Iterate over the sentinels watching this master:
    1. if a sentinel's last is-master-down-by-addr reply is older than SENTINEL_ASK_PERIOD*5, clear that sentinel's leader field;
    2. send is-master-down-by-addr to every sentinel watching the master, except in the three cases below; if the master's failover_state is past SENTINEL_FAILOVER_STATE_NONE the request asks for a leader vote, otherwise it only probes the down state:
      1. this sentinel does not consider the master subjectively down (SRI_S_DOWN not set);
      2. the connection to that sentinel is down;
      3. flags does not include SENTINEL_ASK_FORCED and that sentinel's last is-master-down-by-addr reply is no older than SENTINEL_ASK_PERIOD.

The request is sent asynchronously with redisAsyncCommand; the reply is handled by the callback sentinelReceiveIsMasterDownReply.

#define SENTINEL_ASK_FORCED (1<<0)
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
    dictIterator *di;
    dictEntry *de;

    // Get the sentinels watching this master
    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        // Time since this sentinel last replied to SENTINEL is-master-down-by-addr
        // (used for the objective-down judgment); sentinels use this command to
        // ask one another whether a master is down
        mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
        char port[32];
        int retval;

        /* If the master state from other sentinel is too old, we clear it. */
        // Last reply older than SENTINEL_ASK_PERIOD*5: clear this sentinel's leader field
        if (elapsed > SENTINEL_ASK_PERIOD*5) {
            ri->flags &= ~SRI_MASTER_DOWN;
            sdsfree(ri->leader);
            ri->leader = NULL;
        }

        /* Only ask if master is down to other sentinels if:
         *
         * 1) We believe it is down, or there is a failover in progress.
         * 2) Sentinel is connected.
         * 3) We did not receive the info within SENTINEL_ASK_PERIOD ms. */
        if ((master->flags & SRI_S_DOWN) == 0) continue;
        if (ri->link->disconnected) continue;
        // Skip when not forced and the last reply is still fresh (within SENTINEL_ASK_PERIOD)
        if (!(flags & SENTINEL_ASK_FORCED) &&
            mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
            continue;

        /* Ask */
        // Port as a string
        ll2string(port,sizeof(port),master->addr->port);
        // Send the is-master-down-by-addr request;
        // the reply callback is sentinelReceiveIsMasterDownReply
        retval = redisAsyncCommand(ri->link->cc,
                    sentinelReceiveIsMasterDownReply, ri,
                    "%s is-master-down-by-addr %s %s %llu %s",
                    sentinelInstanceMapCommand(ri,"SENTINEL"),
                    master->addr->ip, port,
                    sentinel.current_epoch,
                    (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
                    sentinel.myid : "*");
        // One more pending command
        if (retval == C_OK) ri->link->pending_commands++;
    }
    dictReleaseIterator(di);
}

Reply callback: sentinelReceiveIsMasterDownReply

After deciding that the master is down, a sentinel sends the command to every sentinel watching that master; sentinelReceiveIsMasterDownReply handles each reply.

  1. Ignore errors and replies with an unexpected shape;
  2. record the current time as last_master_down_reply_time;
  3. if the first element of the reply is 1, mark the master as down on that sentinel's behalf (ri->flags |= SRI_MASTER_DOWN); otherwise clear the flag (ri->flags &= ~SRI_MASTER_DOWN);
  4. if the second element differs from *, the reply carries a vote:
    1. free that sentinel's current leader field (ri->leader);
    2. set ri->leader to the runid in the reply (the second element);
    3. set ri->leader_epoch to the epoch in the reply (the third element).
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *r;

    if (!reply || !link) return;
    link->pending_commands--;
    r = reply;

    /* Ignore every error or unexpected reply.
     * Note that if the command returns an error for any reason we'll
     * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
    if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
        r->element[0]->type == REDIS_REPLY_INTEGER &&
        r->element[1]->type == REDIS_REPLY_STRING &&
        r->element[2]->type == REDIS_REPLY_INTEGER)
    {
        ri->last_master_down_reply_time = mstime();
        if (r->element[0]->integer == 1) {
            ri->flags |= SRI_MASTER_DOWN;
        } else {
            ri->flags &= ~SRI_MASTER_DOWN;
        }
        if (strcmp(r->element[1]->str,"*")) {
            /* If the runid in the reply is not "*" the Sentinel actually
             * replied with a vote. */
            sdsfree(ri->leader);
            if ((long long)ri->leader_epoch != r->element[2]->integer)
                serverLog(LL_WARNING,
                    "%s voted for %s %llu", ri->name,
                    r->element[1]->str,
                    (unsigned long long) r->element[2]->integer);
            ri->leader = sdsnew(r->element[1]->str);
            ri->leader_epoch = r->element[2]->integer;
        }
    }
}

Electing the leader sentinel: sentinelGetLeader

  1. Count all sentinels watching this master (including the current one);
  2. iterate over the sentinels watching the master:
    1. if a sentinel's chosen leader is non-empty and belongs to the current epoch, record the vote in a dictionary (key: voted-for sentinel, value: vote count);
  3. scan the dictionary for the node with the most votes (winner) and its vote count;
  4. if there is a winner so far, cast this sentinel's own vote for it; otherwise vote for itself;
  5. add this sentinel's vote to the dictionary and recompute the winner and the maximum vote count;
  6. if the winner's votes fall below half the number of sentinels plus one, or below the configured master->quorum, clear the winner;
  7. return the winner.
char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
    dict *counters;
    dictIterator *di;
    dictEntry *de;
    unsigned int voters = 0, voters_quorum;
    char *myvote;
    char *winner = NULL;
    uint64_t leader_epoch;
    uint64_t max_votes = 0;

    serverAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
    counters = dictCreate(&leaderVotesDictType,NULL);

    // Total number of sentinels watching this master
    voters = dictSize(master->sentinels)+1; /* All the other sentinels and me.*/

    /* Count other sentinels votes */
    // Tally the votes cast in the current epoch
    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch)
            // One more vote for this leader
            sentinelLeaderIncr(counters,ri->leader);
    }
    dictReleaseIterator(di);

    /* Check what's the winner. For the winner to win, it needs two conditions:
     * 1) Absolute majority between voters (50% + 1).
     * 2) And anyway at least master->quorum votes. */
    di = dictGetIterator(counters);
    // Find the maximum vote count
    while((de = dictNext(di)) != NULL) {
        uint64_t votes = dictGetUnsignedIntegerVal(de);

        if (votes > max_votes) {
            max_votes = votes;
            winner = dictGetKey(de);
        }
    }
    dictReleaseIterator(di);

    /* Count this Sentinel vote:
     * if this Sentinel did not voted yet, either vote for the most
     * common voted sentinel, or for itself if no vote exists at all. */
    // Vote for the current front-runner if there is one, otherwise for ourselves
    if (winner)
        myvote = sentinelVoteLeader(master,epoch,winner,&leader_epoch);
    else
        myvote = sentinelVoteLeader(master,epoch,sentinel.myid,&leader_epoch);

    // Recount with our own vote added
    if (myvote && leader_epoch == epoch) {
        uint64_t votes = sentinelLeaderIncr(counters,myvote);

        if (votes > max_votes) {
            max_votes = votes;
            winner = myvote;
        }
    }

    // Absolute majority
    voters_quorum = voters/2+1;
    // If the winner has neither an absolute majority nor master->quorum votes, clear it
    if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
        winner = NULL;

    winner = winner ? sdsnew(winner) : NULL;
    sdsfree(myvote);
    dictRelease(counters);
    return winner;
}
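The final winner check combines an absolute majority with the configured quorum; a hypothetical standalone restatement:

```c
#include <assert.h>

/* Hypothetical restatement of the winner conditions in sentinelGetLeader:
 * the front-runner only becomes leader with an absolute majority
 * (voters/2 + 1) and at least master->quorum votes. */
int leader_elected(unsigned int max_votes, unsigned int voters,
                   unsigned int master_quorum) {
    unsigned int voters_quorum = voters/2 + 1;
    return max_votes >= voters_quorum && max_votes >= master_quorum;
}
```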

Selecting the slave eligible for promotion: sentinelSelectSlave

  1. Compute the maximum tolerated master-link downtime;
  2. iterate over the slaves of the down master and collect into the instance array those that satisfy all of the following:
    1. the slave is not flagged subjectively or objectively down (SRI_S_DOWN|SRI_O_DOWN);
    2. its link is not disconnected;
    3. it was last available no more than SENTINEL_PING_PERIOD*5 ago;
    4. its priority (slave_priority) is not 0;
    5. its INFO data is fresh enough: within SENTINEL_PING_PERIOD*5 if the master is subjectively down, within SENTINEL_INFO_PERIOD*3 otherwise;
    6. the time its link to the master has been down does not exceed max_master_down_time;
  3. sort the instance array with qsort and take the first element as the selected slave.
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
    sentinelRedisInstance **instance =
        zmalloc(sizeof(instance[0])*dictSize(master->slaves));
    sentinelRedisInstance *selected = NULL;
    int instances = 0;
    dictIterator *di;
    dictEntry *de;
    mstime_t max_master_down_time = 0;

    // If the master is subjectively down, account for how long it has been down
    if (master->flags & SRI_S_DOWN)
        max_master_down_time += mstime() - master->s_down_since_time;
    max_master_down_time += master->down_after_period * 10;

    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        mstime_t info_validity_time;

        if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
        if (slave->link->disconnected) continue;
        if (mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5) continue;
        if (slave->slave_priority == 0) continue;

        /* If the master is in SDOWN state we get INFO for slaves every second.
         * Otherwise we get it with the usual period so we need to account for
         * a larger delay. */
        if (master->flags & SRI_S_DOWN)
            info_validity_time = SENTINEL_PING_PERIOD*5;
        else
            info_validity_time = SENTINEL_INFO_PERIOD*3;
        if (mstime() - slave->info_refresh > info_validity_time) continue;
        if (slave->master_link_down_time > max_master_down_time) continue;
        instance[instances++] = slave;
    }
    dictReleaseIterator(di);
    if (instances) {
        qsort(instance,instances,sizeof(sentinelRedisInstance*),
            compareSlavesForPromotion);
        selected = instance[0];
    }
    zfree(instance);
    return selected;
}

compareSlavesForPromotion, the comparison function between two slaves:

  1. Compare priorities first; the lower slave_priority value wins. If they are equal, continue;
  2. compare the replication offsets (slave_repl_offset); the larger offset wins, since that slave has processed more data from the master;
  3. compare runids: two NULL runids compare equal, a NULL runid is considered bigger than any other, and otherwise strcasecmp decides (the lexicographically smaller runid wins).
int compareSlavesForPromotion(const void *a, const void *b) {
    sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
                          **sb = (sentinelRedisInstance **)b;
    char *sa_runid, *sb_runid;

    if ((*sa)->slave_priority != (*sb)->slave_priority)
        return (*sa)->slave_priority - (*sb)->slave_priority;

    /* If priority is the same, select the slave with greater replication
     * offset (processed more data from the master). */
    if ((*sa)->slave_repl_offset > (*sb)->slave_repl_offset) {
        return -1; /* a < b */
    } else if ((*sa)->slave_repl_offset < (*sb)->slave_repl_offset) {
        return 1; /* a > b */
    }

    /* If the replication offset is the same select the slave with that has
     * the lexicographically smaller runid. Note that we try to handle runid
     * == NULL as there are old Redis versions that don't publish runid in
     * INFO. A NULL runid is considered bigger than any other runid. */
    sa_runid = (*sa)->runid;
    sb_runid = (*sb)->runid;
    if (sa_runid == NULL && sb_runid == NULL) return 0;
    else if (sa_runid == NULL) return 1; /* a > b */
    else if (sb_runid == NULL) return -1; /* a < b */
    return strcasecmp(sa_runid, sb_runid);
}
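The comparator's ordering can be demonstrated with qsort on a simplified stand-in struct. Everything below is illustrative; fake_slave only keeps the three fields the comparator reads:

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>

/* Simplified stand-in for sentinelRedisInstance with only the fields the
 * promotion comparator looks at. */
typedef struct {
    int slave_priority;
    long long slave_repl_offset;
    const char *runid;
} fake_slave;

/* Same ordering as compareSlavesForPromotion: lower priority value first,
 * then larger replication offset, then lexicographically smaller runid
 * (NULL runids sort last). */
static int cmp_promotion(const void *a, const void *b) {
    const fake_slave *sa = *(fake_slave * const *)a;
    const fake_slave *sb = *(fake_slave * const *)b;

    if (sa->slave_priority != sb->slave_priority)
        return sa->slave_priority - sb->slave_priority;
    if (sa->slave_repl_offset > sb->slave_repl_offset) return -1;
    if (sa->slave_repl_offset < sb->slave_repl_offset) return 1;
    if (sa->runid == NULL && sb->runid == NULL) return 0;
    if (sa->runid == NULL) return 1;
    if (sb->runid == NULL) return -1;
    return strcasecmp(sa->runid, sb->runid);
}

/* Sorts three example slaves and returns the runid that would be promoted. */
const char *demo_best(void) {
    static fake_slave s1 = {100, 500, "bbb"};  /* default priority, lags behind */
    static fake_slave s2 = {100, 900, "aaa"};  /* default priority, most data  */
    static fake_slave s3 = {1,   100, "ccc"};  /* best (lowest) priority wins  */
    fake_slave *arr[] = {&s1, &s2, &s3};

    qsort(arr, 3, sizeof(arr[0]), cmp_promotion);
    return arr[0]->runid;
}
```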

The failover state machine: sentinelFailoverStateMachine()

Dispatches to the handler matching the current failover state.

void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_MASTER);

    // Nothing to do if no failover is in progress
    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;

    // Dispatch on the failover state
    switch(ri->failover_state) {
        case SENTINEL_FAILOVER_STATE_WAIT_START: // elect the leader sentinel
            sentinelFailoverWaitStart(ri);
            break;
        case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: // pick the slave to become the new master
            sentinelFailoverSelectSlave(ri);
            break;
        case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: // promote the chosen slave to master
            sentinelFailoverSendSlaveOfNoOne(ri);
            break;
        case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: // wait for the promotion (checking for timeouts)
            sentinelFailoverWaitPromotion(ri);
            break;
        case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: // point the remaining slaves at the new master
            sentinelFailoverReconfNextSlave(ri);
            break;
    }
}

The wait-start state: electing the leader sentinel, sentinelFailoverWaitStart()

  1. Run the leader sentinel election;
  2. if the leader is not this node, and this node is not flagged SRI_FORCE_FAILOVER:
    1. compute the maximum election timeout;
    2. if the election has timed out, abort the failover;
    3. return;
  3. emit the +elected-leader event;
  4. if a simulated failure is configured, exit with exit(99); otherwise continue;
  5. change the failover state to SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
  6. record the current time as the state-change time;
  7. emit the +failover-state-select-slave event.
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
    char *leader;
    int isleader;

    /* Check if we are the leader for the failover epoch. */
    // Run the leader sentinel election
    leader = sentinelGetLeader(ri, ri->failover_epoch);
    // Check whether we are the leader
    isleader = leader && strcasecmp(leader,sentinel.myid) == 0;
    sdsfree(leader);

    /* If I'm not the leader, and it is not a forced failover via
     * SENTINEL FAILOVER, then I can't continue with the failover. */
    // This node is not the leader and SRI_FORCE_FAILOVER is not set
    if (!isleader && !(ri->flags & SRI_FORCE_FAILOVER)) {
        int election_timeout = SENTINEL_ELECTION_TIMEOUT;

        /* The election timeout is the MIN between SENTINEL_ELECTION_TIMEOUT
         * and the configured failover timeout. */
        // Upper bound on the election timeout
        if (election_timeout > ri->failover_timeout)
            election_timeout = ri->failover_timeout;
        /* Abort the failover if I'm not the leader after some time. */
        // Abort the failover if the election has timed out
        if (mstime() - ri->failover_start_time > election_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-not-elected",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }

    // Announce the election result
    sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@");
    // Simulated failure: the process exits with exit(99)
    if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION)
        sentinelSimFailureCrash();
    // Move the failover state to SENTINEL_FAILOVER_STATE_SELECT_SLAVE
    ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
    ri->failover_state_change_time = mstime();
    // Announce the slave-selection step
    sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@");
}

Selecting the slave: sentinelFailoverSelectSlave()

  1. Pick the replacement slave;
  2. if none qualifies, abort the failover and stop;
  3. otherwise, emit the +selected-slave event;
  4. set the SRI_PROMOTED flag on the chosen slave;
  5. record the chosen slave in ri->promoted_slave;
  6. change the failover state to SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
  7. record the state-change time;
  8. emit the +failover-state-send-slaveof-noone event.
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
    sentinelRedisInstance *slave = sentinelSelectSlave(ri);

    /* We don't handle the timeout in this state as the function aborts
     * the failover or go forward in the next state. */
    if (slave == NULL) {
        sentinelEvent(LL_WARNING,"-failover-abort-no-good-slave",ri,"%@");
        sentinelAbortFailover(ri);
    } else {
        sentinelEvent(LL_WARNING,"+selected-slave",slave,"%@");
        slave->flags |= SRI_PROMOTED;
        ri->promoted_slave = slave;
        ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
        ri->failover_state_change_time = mstime();
        sentinelEvent(LL_NOTICE,"+failover-state-send-slaveof-noone",
            slave, "%@");
    }
}

Turning the slave into a master: sentinelFailoverSendSlaveOfNoOne()

  1. If the chosen slave is disconnected, check how long the state has been stuck: abort the failover if it exceeds failover_timeout; either way, return;
  2. call sentinelSendSlaveOf(ri->promoted_slave,NULL,0) to turn the slave into a master, rewrite its config, and disconnect its clients;
  3. emit the +failover-state-wait-promotion event;
  4. change the failover state to SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
  5. record the state-change time.
void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
    int retval;

    /* We can't send the command to the promoted slave if it is now
     * disconnected. Retry again and again with this state until the timeout
     * is reached, then abort the failover. */
    if (ri->promoted_slave->link->disconnected) {
        if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }

    /* Send SLAVEOF NO ONE command to turn the slave into a master.
     * We actually register a generic callback for this command as we don't
     * really care about the reply. We check if it worked indirectly observing
     * if INFO returns a different role (master instead of slave). */
    retval = sentinelSendSlaveOf(ri->promoted_slave,NULL,0);
    if (retval != C_OK) return;
    sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
        ri->promoted_slave,"%@");
    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
    ri->failover_state_change_time = mstime();
}

sentinelSendSlaveOf

During failover, the sentinel sends SLAVEOF NO ONE to the selected slave so that it stops replicating and becomes a master.

The SLAVEOF host port command turns the current server into a replica (slave server) of the specified server.
SLAVEOF NO ONE makes a replica stop replication and turn back into a master.

SLAVEOF NO ONE corresponds to the call: sentinelSendSlaveOf(ri->promoted_slave,NULL,0);

The following commands are sent in order; if any of them fails, the function returns retval immediately:

  1. MULTI marks the start of a transaction block;
  2. SLAVEOF NO ONE turns the slave into a master;
  3. CONFIG REWRITE rewrites the redis.conf file the server was started with;
  4. CLIENT KILL TYPE normal, where the type is one of normal, master, slave and pubsub (the master type is available from v3.2); this closes the connections of all clients of the given class. Note that clients blocked in MONITOR are considered to belong to the normal class;
  5. EXEC executes all commands queued in the transaction block.
int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
    char portstr[32];
    int retval;

    ll2string(portstr,sizeof(portstr),port);

    /* If host is NULL we send SLAVEOF NO ONE that will turn the instance
     * into a master. */
    // No master specified: make this node stop replicating and become a master
    if (host == NULL) {
        host = "NO";
        memcpy(portstr,"ONE",4);
    }

    /* In order to send SLAVEOF in a safe way, we send a transaction performing
     * the following tasks:
     * 1) Reconfigure the instance according to the specified host/port params.
     * 2) Rewrite the configuration.
     * 3) Disconnect all clients (but this one sending the command) in order
     *    to trigger the ask-master-on-reconnection protocol for connected
     *    clients.
     *
     * Note that we don't check the replies returned by commands, since we
     * will observe instead the effects in the next INFO output. */
    // MULTI marks the start of a transaction block.
    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s",
        sentinelInstanceMapCommand(ri,"MULTI"));
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;

    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s %s %s",
        sentinelInstanceMapCommand(ri,"SLAVEOF"),
        host, portstr);
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;

    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s REWRITE",
        sentinelInstanceMapCommand(ri,"CONFIG"));
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;

    /* CLIENT KILL TYPE <type> is only supported starting from Redis 2.8.12,
     * however sending it to an instance not understanding this command is not
     * an issue because CLIENT is variadic command, so Redis will not
     * recognized as a syntax error, and the transaction will not fail (but
     * only the unsupported command will fail). */
    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s KILL TYPE normal",
        sentinelInstanceMapCommand(ri,"CLIENT"));
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;

    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s",
        sentinelInstanceMapCommand(ri,"EXEC"));
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;

    return C_OK;
}

Waiting for the promotion: sentinelFailoverWaitPromotion()

Check whether the time since the last state change exceeds failover_timeout; if it does, abort the failover.

/* We actually wait for promotion indirectly checking with INFO when the
 * slave turns into a master. */
void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
    /* Just handle the timeout. Switching to the next state is handled
     * by the function parsing the INFO command of the promoted slave. */
    if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
        sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
        sentinelAbortFailover(ri);
    }
}

Reconfiguring the other slaves to the new master: sentinelFailoverReconfNextSlave()

The state changes to SENTINEL_FAILOVER_STATE_RECONF_SLAVES through the following call chain:

sentinelHandleRedisInstance
-> sentinelSendPeriodicCommands
-> redisAsyncCommand(ri->link->cc, sentinelInfoReplyCallback, ri, "%s", sentinelInstanceMapCommand(ri,"INFO"))
-> sentinelInfoReplyCallback
-> sentinelRefreshInstanceInfo
-> ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;

The sentinel periodically sends INFO to the monitored nodes; since the command is asynchronous, the reply is handled in the callback sentinelInfoReplyCallback, and that is where the failover state transitions happen.

The refresh method the callback ultimately invokes, sentinelRefreshInstanceInfo, works as follows:

  1. Parse the information returned by the Redis node (plain string parsing);
  2. Record the time the INFO data was refreshed;
  3. If the node reports a master/slave role change, record it and emit a role-change event;
  4. If the sentinel is in TILT mode, return;
  5. If a slave has turned into a master:
    1. If it is the promoted slave of a failover in state SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
      1. Update config_epoch and set the state to SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
      2. Emit the +promoted-slave event;
      3. If a crash after promotion is being simulated, call sentinelSimFailureCrash();
      4. Emit the +failover-state-reconf-slaves event;
      5. Call the client reconfiguration script (sentinelCallClientReconfScript) to announce the master address change;
      6. Update last_pub_time to force an immediate "Hello" broadcast (normally sent periodically on the Hello channel);
    2. Otherwise, send it a SLAVEOF command to turn it back into a slave;
  6. If a slave replicates from a master other than the one currently recorded, send SLAVEOF to fix its configuration;
  7. If the node is a slave flagged SRI_RECONF_SENT, change the flag to SRI_RECONF_INPROG;
  8. If the node is a slave flagged SRI_RECONF_INPROG and slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP, change the flag to SRI_RECONF_DONE and emit the +slave-reconf-done event.
/* Process the INFO output from masters. */
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
    sds *lines;
    int numlines, j;
    int role = 0;

    /* cache full INFO output for instance */
    sdsfree(ri->info);
    ri->info = sdsnew(info);

    /* The following fields must be reset to a given value in the case they
     * are not found at all in the INFO output. */
    ri->master_link_down_time = 0;

    /* Process line by line. */
    // Parse the INFO reply
    lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
    for (j = 0; j < numlines; j++) {
        // ... parsing of the INFO fields is omitted here
    }
    // Record when the INFO data was refreshed
    ri->info_refresh = mstime();
    sdsfreesplitres(lines,numlines);

    /* ---------------------------- Acting half -----------------------------
     * Some things will not happen if sentinel.tilt is true, but some will
     * still be processed. */

    /* Remember when the role changed. */
    // Record a master/slave role change
    if (role != ri->role_reported) {
        ri->role_reported_time = mstime();
        ri->role_reported = role;
        if (role == SRI_SLAVE) ri->slave_conf_change_time = mstime();
        /* Log the event with +role-change if the new role is coherent or
         * with -role-change if there is a mismatch with the current config. */
        sentinelEvent(LL_VERBOSE,
            ((ri->flags & (SRI_MASTER|SRI_SLAVE)) == role) ?
            "+role-change" : "-role-change",
            ri, "%@ new reported role is %s",
            role == SRI_MASTER ? "master" : "slave",
            ri->flags & SRI_MASTER ? "master" : "slave");
    }

    /* None of the following conditions are processed when in tilt mode, so
     * return asap. */
    // Return early if the sentinel is in TILT mode
    if (sentinel.tilt) return;

    /* Handle master -> slave role switch. */
    // A master turned into a slave; nothing is done here
    if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
        /* Nothing to do, but masters claiming to be slaves are
         * considered to be unreachable by Sentinel, so eventually
         * a failover will be triggered. */
    }

    /* Handle slave -> master role switch. */
    // A slave turned into a master
    if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
        /* If this is a promoted slave we can change state to the
         * failover state machine. */
        // Is this the promoted slave of an in-progress failover?
        if ((ri->flags & SRI_PROMOTED) &&
            (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
            (ri->master->failover_state ==
                SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
        {
            /* Now that we are sure the slave was reconfigured as a master
             * set the master configuration epoch to the epoch we won the
             * election to perform this failover. This will force the other
             * Sentinels to update their config (assuming there is not
             * a newer one already available). */
            // Update config_epoch and the failover state
            ri->master->config_epoch = ri->master->failover_epoch;
            ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
            ri->master->failover_state_change_time = mstime();
            sentinelFlushConfig();
            sentinelEvent(LL_WARNING,"+promoted-slave",ri,"%@");
            // Simulate a sentinel crash after the promotion, if configured
            if (sentinel.simfailure_flags &
                SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION)
                sentinelSimFailureCrash();
            sentinelEvent(LL_WARNING,"+failover-state-reconf-slaves",
                ri->master,"%@");
            sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
                "start",ri->master->addr,ri->addr);
            // Update last_pub_time to force an immediate "Hello" broadcast
            sentinelForceHelloUpdateForMaster(ri->master);
        } else {
            /* A slave turned into a master. We want to force our view and
             * reconfigure as slave. Wait some time after the change before
             * going forward, to receive new configs if any. */
            mstime_t wait_time = SENTINEL_PUBLISH_PERIOD*4;
            // Send SLAVEOF to turn it back into a slave
            if (!(ri->flags & SRI_PROMOTED) &&
                sentinelMasterLooksSane(ri->master) &&
                sentinelRedisInstanceNoDownFor(ri,wait_time) &&
                mstime() - ri->role_reported_time > wait_time)
            {
                int retval = sentinelSendSlaveOf(ri,
                    ri->master->addr->ip,
                    ri->master->addr->port);
                if (retval == C_OK)
                    sentinelEvent(LL_NOTICE,"+convert-to-slave",ri,"%@");
            }
        }
    }

    /* Handle slaves replicating to a different master address. */
    // The slave replicates from the wrong master; fix its configuration
    if ((ri->flags & SRI_SLAVE) &&
        role == SRI_SLAVE &&
        (ri->slave_master_port != ri->master->addr->port ||
         strcasecmp(ri->slave_master_host,ri->master->addr->ip)))
    {
        mstime_t wait_time = ri->master->failover_timeout;

        /* Make sure the master is sane before reconfiguring this instance
         * into a slave. */
        if (sentinelMasterLooksSane(ri->master) &&
            sentinelRedisInstanceNoDownFor(ri,wait_time) &&
            mstime() - ri->slave_conf_change_time > wait_time)
        {
            int retval = sentinelSendSlaveOf(ri,
                ri->master->addr->ip,
                ri->master->addr->port);
            if (retval == C_OK)
                sentinelEvent(LL_NOTICE,"+fix-slave-config",ri,"%@");
        }
    }

    /* Detect if the slave that is in the process of being reconfigured
     * changed state. */
    // Track the reconfiguration progress of the slaves
    if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
        (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
    {
        /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
        // +slave-reconf-inprog: the slave received "+slave-reconf-sent" and is
        // now running SLAVEOF + SYNC against the new master
        if ((ri->flags & SRI_RECONF_SENT) &&
            ri->slave_master_host &&
            strcmp(ri->slave_master_host,
                ri->master->promoted_slave->addr->ip) == 0 &&
            ri->slave_master_port == ri->master->promoted_slave->addr->port)
        {
            ri->flags &= ~SRI_RECONF_SENT;
            ri->flags |= SRI_RECONF_INPROG;
            sentinelEvent(LL_NOTICE,"+slave-reconf-inprog",ri,"%@");
        }

        /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
        // +slave-reconf-done: the slave finished syncing; the leader can go on
        // to reconfigure the next slave
        if ((ri->flags & SRI_RECONF_INPROG) &&
            ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
        {
            ri->flags &= ~SRI_RECONF_INPROG;
            ri->flags |= SRI_RECONF_DONE;
            sentinelEvent(LL_NOTICE,"+slave-reconf-done",ri,"%@");
        }
    }
}

sentinelFailoverReconfNextSlave: pointing the slaves at the new master

In state SENTINEL_FAILOVER_STATE_RECONF_SLAVES, sentinelFailoverReconfNextSlave() is executed:

  1. Count the slaves flagged SRI_RECONF_SENT|SRI_RECONF_INPROG into in_progress;
  2. Iterate over the slaves while in_progress < parallel_syncs:
    1. Skip slaves flagged SRI_PROMOTED|SRI_RECONF_DONE;
    2. If a slave is flagged SRI_RECONF_SENT and slave_reconf_sent_time has timed out, emit the -slave-reconf-sent-timeout event and change its flag to SRI_RECONF_DONE;
    3. Skip slaves flagged SRI_RECONF_SENT|SRI_RECONF_INPROG;
    4. Skip disconnected slaves;
    5. Send SLAVEOF to make the slave replicate the new master;
    6. If the command was sent successfully, set the SRI_RECONF_SENT flag;
    7. Record slave_reconf_sent_time as the current time;
    8. Emit the +slave-reconf-sent event;
    9. Increment in_progress;
  3. Call sentinelFailoverDetectEnd to check for completion and timeouts.
/* Send SLAVE OF <new master address> to all the remaining slaves that
 * still don't appear to have the configuration updated. */
void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    int in_progress = 0;

    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);

        if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
            in_progress++;
    }
    dictReleaseIterator(di);

    di = dictGetIterator(master->slaves);
    while(in_progress < master->parallel_syncs &&
          (de = dictNext(di)) != NULL)
    {
        sentinelRedisInstance *slave = dictGetVal(de);
        int retval;

        /* Skip the promoted slave, and already configured slaves. */
        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;

        /* If too much time elapsed without the slave moving forward to
         * the next state, consider it reconfigured even if it is not.
         * Sentinels will detect the slave as misconfigured and fix its
         * configuration later. */
        if ((slave->flags & SRI_RECONF_SENT) &&
            (mstime() - slave->slave_reconf_sent_time) >
            SENTINEL_SLAVE_RECONF_TIMEOUT)
        {
            sentinelEvent(LL_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
            slave->flags &= ~SRI_RECONF_SENT;
            slave->flags |= SRI_RECONF_DONE;
        }

        /* Nothing to do for instances that are disconnected or already
         * in RECONF_SENT state. */
        if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) continue;
        if (slave->link->disconnected) continue;

        /* Send SLAVEOF <new master>. */
        retval = sentinelSendSlaveOf(slave,
            master->promoted_slave->addr->ip,
            master->promoted_slave->addr->port);
        if (retval == C_OK) {
            slave->flags |= SRI_RECONF_SENT;
            slave->slave_reconf_sent_time = mstime();
            sentinelEvent(LL_NOTICE,"+slave-reconf-sent",slave,"%@");
            in_progress++;
        }
    }
    dictReleaseIterator(di);

    /* Check if all the slaves are reconfigured and handle timeout. */
    sentinelFailoverDetectEnd(master);
}

sentinelFailoverDetectEnd

  1. If there is no promoted slave yet, or the promoted slave is subjectively down, return;
  2. Count into not_reconfigured the slaves that are not flagged SRI_PROMOTED|SRI_RECONF_DONE and are not subjectively down;
  3. If the failover has run longer than failover_timeout, force not_reconfigured to 0, record the timeout, and emit the +failover-end-for-timeout event;
  4. If not_reconfigured == 0, emit the +failover-end event, change the state to SENTINEL_FAILOVER_STATE_UPDATE_CONFIG, and record the state change time;
  5. On timeout:
    1. Send SLAVEOF to every connected slave not flagged SRI_RECONF_DONE|SRI_RECONF_SENT so it replicates the new master anyway;
    2. If the command succeeds, emit the +slave-reconf-sent-be event and set the SRI_RECONF_SENT flag.
void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
    int not_reconfigured = 0, timeout = 0;
    dictIterator *di;
    dictEntry *de;
    mstime_t elapsed = mstime() - master->failover_state_change_time;

    /* We can't consider failover finished if the promoted slave is
     * not reachable. */
    if (master->promoted_slave == NULL ||
        master->promoted_slave->flags & SRI_S_DOWN) return;

    /* The failover terminates once all the reachable slaves are properly
     * configured. */
    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);

        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
        if (slave->flags & SRI_S_DOWN) continue;
        not_reconfigured++;
    }
    dictReleaseIterator(di);

    /* Force end of failover on timeout. */
    if (elapsed > master->failover_timeout) {
        not_reconfigured = 0;
        timeout = 1;
        sentinelEvent(LL_WARNING,"+failover-end-for-timeout",master,"%@");
    }

    if (not_reconfigured == 0) {
        sentinelEvent(LL_WARNING,"+failover-end",master,"%@");
        master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
        master->failover_state_change_time = mstime();
    }

    /* If I'm the leader it is a good idea to send a best effort SLAVEOF
     * command to all the slaves still not reconfigured to replicate with
     * the new master. */
    if (timeout) {
        dictIterator *di;
        dictEntry *de;

        di = dictGetIterator(master->slaves);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *slave = dictGetVal(de);
            int retval;

            if (slave->flags & (SRI_RECONF_DONE|SRI_RECONF_SENT)) continue;
            if (slave->link->disconnected) continue;

            retval = sentinelSendSlaveOf(slave,
                master->promoted_slave->addr->ip,
                master->promoted_slave->addr->port);
            if (retval == C_OK) {
                sentinelEvent(LL_NOTICE,"+slave-reconf-sent-be",slave,"%@");
                slave->flags |= SRI_RECONF_SENT;
            }
        }
        dictReleaseIterator(di);
    }
}
