This article walks through service registration and data synchronization under the Raft protocol in Nacos CP mode.
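The Raft protocol:
Every node is in one of three states: Follower, Candidate, or Leader.
Leader election:
Each node waits for a randomized election timeout.
The node whose timeout expires first votes for itself,
then sends vote requests to the other nodes.
(If several nodes time out at the same moment and split the vote so that no one reaches a majority, a new round starts after fresh random timeouts.)
All nodes start as Followers. A node becomes a Candidate before it requests votes; if more than half of the nodes grant their vote, the Candidate becomes the Leader.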
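The election rules above can be illustrated with a minimal, self-contained Java sketch. It is not Nacos code: `ElectionSketch`, `ElectionTimer`, `decide` and the timeout parameters are hypothetical. Each node counts down a randomized timeout one tick at a time. The node whose timer expires first becomes a Candidate, and it wins only if its own vote plus the votes granted by the others reach a majority (n / 2 + 1).

```java
import java.util.Random;

// Hypothetical model of Raft leader election; names and timings are illustrative only.
final class ElectionSketch {

    enum State { FOLLOWER, CANDIDATE, LEADER }

    /** Per-node election timer, advanced by a periodic tick. */
    static final class ElectionTimer {
        private final Random random;
        private final long baseMs;
        private final long jitterMs;
        long remainingMs;

        ElectionTimer(Random random, long baseMs, long jitterMs) {
            this.random = random;
            this.baseMs = baseMs;
            this.jitterMs = jitterMs;
            reset();
        }

        /** Randomized timeout in [baseMs, baseMs + jitterMs) makes simultaneous expiry unlikely. */
        void reset() {
            remainingMs = baseMs + (long) (random.nextDouble() * jitterMs);
        }

        /** Advances the timer by one tick; returns true once the timeout has expired. */
        boolean tick(long tickMs) {
            remainingMs -= tickMs;
            return remainingMs <= 0;
        }
    }

    /** Smallest vote count that is more than half of the cluster. */
    static int majority(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    /** Outcome for a candidate: its own vote plus the votes granted by the other nodes. */
    static State decide(int clusterSize, int grantsFromOthers) {
        return 1 + grantsFromOthers >= majority(clusterSize) ? State.LEADER : State.CANDIDATE;
    }
}
```

A Candidate that does not reach a majority waits for its timer to expire again and retries. That retry is the "new round" case described above.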
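Data synchronization:
All writes go through the Leader.
When a write reaches the Leader, the Leader records it locally, initially as uncommitted.
The Leader then replicates the write to the other nodes. Once a majority of nodes have acknowledged it, the Leader marks it committed and tells the Followers to commit it as well.
The Leader sends heartbeats to the Followers periodically. In Nacos's implementation, a Follower that learns from a heartbeat that its data is stale pulls the newer data from the Leader.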
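The "uncommitted until a majority acknowledges" rule can be sketched as a tiny state machine. This is a simplified model with stated assumptions. `ReplicatedEntry` and its methods are hypothetical names. The leader counts its own copy as one acknowledgement, and a repeated ack from the same node is counted only once.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of "uncommitted until a majority acknowledges"; not Nacos code.
final class ReplicatedEntry {

    enum Status { UNCOMMITTED, COMMITTED }

    private final int clusterSize;
    private final Set<String> acks = new HashSet<>();
    private Status status = Status.UNCOMMITTED;

    ReplicatedEntry(int clusterSize, String leaderId) {
        this.clusterSize = clusterSize;
        ack(leaderId); // the leader has already written the entry locally
    }

    /** Records an acknowledgement; the entry commits once acks reach clusterSize / 2 + 1. */
    Status ack(String nodeId) {
        acks.add(nodeId); // a Set ignores duplicate acks from the same node
        if (acks.size() >= clusterSize / 2 + 1) {
            status = Status.COMMITTED;
        }
        return status;
    }

    Status status() {
        return status;
    }
}
```

In a 5-node cluster, the leader plus two followers (3 of 5) is enough to commit. The remaining followers do not need to answer first.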
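Raft vs. ZAB: in ZAB every node can start an election and the votes are then compared, whereas in Raft the node whose randomized timeout expires first starts the election.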
Raft visualization: http://thesecretlivesofdata.com/raft/
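The previous post, "Nacos Source Code Analysis (Registration and Discovery, Cluster Sync, Heartbeat, Comparison with Eureka)", showed that the addInstance method calls consistencyService.put. Which consistencyService implementation is used depends on whether the key contains "ephemeral.". That post covered AP mode; this one walks through service registration in CP mode.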
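The routing rule can be pictured as a tiny dispatcher. This is a hypothetical sketch: `ConsistencyRouter` and `Store` are illustrative names. It assumes, as the text says, that the decision is simply whether the key contains "ephemeral.". The real check lives in `KeyBuilder.matchEphemeralKey` and may differ in detail.

```java
// Hypothetical router mirroring the rule in the text; not the real Nacos KeyBuilder logic.
final class ConsistencyRouter {

    interface Store {
        String name();
    }

    private final Store ephemeral;   // AP path (ephemeral instances)
    private final Store persistent;  // CP path (Raft)

    ConsistencyRouter(Store ephemeral, Store persistent) {
        this.ephemeral = ephemeral;
        this.persistent = persistent;
    }

    /** Assumption: a key is ephemeral if and only if it contains the "ephemeral." segment. */
    Store route(String key) {
        return key.contains("ephemeral.") ? ephemeral : persistent;
    }
}
```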
consistencyService.put(key, instances);
@Override
public void put(String key, Record value) throws NacosException {
mapConsistencyService(key).put(key, value);
}
private ConsistencyService mapConsistencyService(String key) {
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
com.alibaba.nacos.naming.consistency.persistent.raft.RaftConsistencyServiceImpl#put:
raftCore.signalPublish(key, value);
The core of it is the call to signalPublish:
public void signalPublish(String key, Record value) throws Exception {
if (stopWork) {
throw new IllegalStateException("old raft protocol already stop work");
}
// If this node is not the leader
if (!isLeader()) {
ObjectNode params = JacksonUtils.createEmptyJsonNode();
params.put("key", key);
params.replace("value", JacksonUtils.transferToJsonNode(value));
Map<String, String> parameters = new HashMap<>(1);
parameters.put("key", key);
final RaftPeer leader = getLeader();
// Forward this registration to the leader via a POST request
raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
return;
}
OPERATE_LOCK.lock();
try {
final long start = System.currentTimeMillis();
final Datum datum = new Datum();
datum.key = key;
datum.value = value;
if (getDatum(key) == null) {
datum.timestamp.set(1L);
} else {
datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
}
ObjectNode json = JacksonUtils.createEmptyJsonNode();
json.replace("datum", JacksonUtils.transferToJsonNode(datum));
json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
// Apply the registration locally: update the in-memory data and the file on disk
onPublish(datum, peers.local());
final String content = json.toString();
// CountDownLatch initialized to majorityCount(), i.e. (number of cluster nodes) / 2 + 1
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
// Iterate over all nodes, including this one
for (final String server : peers.allServersIncludeMyself()) {
// The leader (this node) counts itself: count down and skip
if (isLeader(server)) {
latch.countDown();
continue;
}
// For every other node, asynchronously call /raft/datum/commit
final String url = buildUrl(server, API_ON_PUB);
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.RAFT
.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
datum.key, server, result.getCode());
return;
}
// Call succeeded: count down the latch
latch.countDown();
}
@Override
public void onError(Throwable throwable) {
Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
}
@Override
public void onCancel() {
}
});
}
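// If a majority has not acknowledged within RAFT_PUBLISH_TIMEOUT (5 s), throw an exception.
// Note: the leader has already applied the data locally, so the exception does not roll it back.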
if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
// only majority servers return success can we consider this update success
Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
}
long end = System.currentTimeMillis();
Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
} finally {
OPERATE_LOCK.unlock();
}
}
The logic that updates the registered instance data (onPublish):
public void onPublish(Datum datum, RaftPeer source) throws Exception {
if (stopWork) {
throw new IllegalStateException("old raft protocol already stop work");
}
RaftPeer local = peers.local();
if (datum.value == null) {
Loggers.RAFT.warn("received empty datum");
throw new IllegalStateException("received empty datum");
}
if (!peers.isLeader(source.ip)) {
Loggers.RAFT
.warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
JacksonUtils.toJson(getLeader()));
throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
}
if (source.term.get() < local.term.get()) {
Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
JacksonUtils.toJson(local));
throw new IllegalStateException(
"out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
}
local.resetLeaderDue();
// if data should be persisted, usually this is true:
if (KeyBuilder.matchPersistentKey(datum.key)) {
// Write the datum to a file on disk under NACOS_HOME/data/naming/data
raftStore.write(datum);
}
datums.put(datum.key, datum);
if (isLeader()) {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
} else {
if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
//set leader term:
getLeader().term.set(source.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
}
}
raftStore.updateTerm(local.term.get());
// Publish a ValueChangeEvent
NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier#onEvent receives the event and calls notify:
@Override
public void onEvent(ValueChangeEvent event) {
notify(event.getKey(), event.getAction(), find.apply(event.getKey()));
}
notify eventually calls listener.onChange(key, value), which refreshes the in-memory registry.
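On the leader side, the registration flow boils down to three steps: apply locally, replicate asynchronously, then wait for a majority with a timeout. The sketch below reproduces that CountDownLatch pattern using only `java.util.concurrent`. `QuorumPublishSketch` and `publish` are hypothetical names. The `BooleanSupplier` followers are an assumption that stands in for the asynchronous HTTP call to /raft/datum/commit.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the quorum wait pattern in signalPublish; not Nacos code.
final class QuorumPublishSketch {

    /**
     * @param followers one BooleanSupplier per follower, returning true if that follower
     *                  accepted the data (a stand-in for the async POST to /raft/datum/commit)
     * @param timeoutMs how long to wait for a majority
     * @return true if a majority of the cluster, leader included, acknowledged in time
     */
    static boolean publish(List<BooleanSupplier> followers, long timeoutMs) {
        int clusterSize = followers.size() + 1;            // followers plus the leader
        CountDownLatch latch = new CountDownLatch(clusterSize / 2 + 1);
        latch.countDown();                                 // the leader has already applied the data locally
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, followers.size()));
        try {
            for (BooleanSupplier follower : followers) {
                pool.execute(() -> {
                    if (follower.getAsBoolean()) {
                        latch.countDown();                 // only successful replies count
                    }
                });
            }
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();            // preserve the interrupt status
            return false;
        } finally {
            pool.shutdownNow();                            // do not keep waiting on slow followers
        }
    }
}
```

As in the original, a `false` result only reports the failure. The leader's local write is not rolled back.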
That completes the CP-mode service registration flow. Next comes leader election.
com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore#init:
@PostConstruct
public void init() throws Exception {
Loggers.RAFT.info("initializing Raft sub-system");
final long start = System.currentTimeMillis();
// Load the persisted data (datums) and the term from disk
raftStore.loadDatums(notifier, datums);
setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
initialized = true;
Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
// Schedule the leader election task on the timer thread pool, running every 500 ms
masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
// Schedule the heartbeat task, also every 500 ms
heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
versionJudgement.registerObserver(isAllNewVersion -> {
stopWork = isAllNewVersion;
if (stopWork) {
try {
shutdown();
raftListener.removeOldRaftMetadata();
} catch (NacosException e) {
throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
}
}
}, 100);
NotifyCenter.registerSubscriber(notifier);
Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}
The leader election task (MasterElection#run):
@Override
public void run() {
try {
if (stopWork) {
return;
}
if (!peers.isReady()) {
return;
}
// Count down the election timeout by one tick; while leaderDueMs is still positive, do nothing
RaftPeer local = peers.local();
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.leaderDueMs > 0) {
return;
}
// Reset both the election timeout and the heartbeat timeout
local.resetLeaderDue();
local.resetHeartbeatDue();
// Start an election by requesting votes
sendVote();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while master election {}", e);
}
}
The vote-sending logic (sendVote):
private void sendVote() {
RaftPeer local = peers.get(NetUtils.localServer());
Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),
local.term);
peers.reset();
local.term.incrementAndGet(); // term + 1
local.voteFor = local.ip; // vote for itself
local.state = RaftPeer.State.CANDIDATE; // become a candidate
Map<String, String> params = new HashMap<>(1);
params.put("vote", JacksonUtils.toJson(local));
// Iterate over all nodes except this one
for (final String server : peers.allServersWithoutMySelf()) {
// Asynchronously call /raft/vote
final String url = buildUrl(server, API_VOTE);
try {
HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);
return;
}
RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);
Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));
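// Parse the peer's reply and let decideLeader tally the votes:
// if this node's own vote plus the granted votes exceed half of the cluster, it becomes the leader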
peers.decideLeader(peer);
}
@Override
public void onError(Throwable throwable) {
Loggers.RAFT.error("error while sending vote to server: {}", server, throwable);
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
Loggers.RAFT.warn("error while sending vote to server: {}", server);
}
}
}
How a node handles an incoming vote request (receivedVote):
public synchronized RaftPeer receivedVote(RaftPeer remote) {
if (stopWork) {
throw new IllegalStateException("old raft protocol already stop work");
}
if (!peers.contains(remote)) {
throw new IllegalStateException("can not find peer: " + remote.ip);
}
// Get the local peer
RaftPeer local = peers.get(NetUtils.localServer());
// If the candidate's term is less than or equal to the local term, reject the vote
if (remote.term.get() <= local.term.get()) {
String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term;
Loggers.RAFT.info(msg);
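// If this node has not voted yet, it votes for itself (it considers itself at least as suitable to lead),
// so the returned peer does not carry the candidate's IP in voteFor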
if (StringUtils.isEmpty(local.voteFor)) {
local.voteFor = local.ip;
}
return local;
}
// Grant the vote: reset the election timeout, become a follower, set voteFor to the candidate's IP and adopt its term
local.resetLeaderDue();
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
local.term.set(remote.term.get());
Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
return local;
}
Next, the heartbeat task (HeartBeat#run):
@Override
public void run() {
try {
if (stopWork) {
return;
}
if (!peers.isReady()) {
return;
}
RaftPeer local = peers.local();
local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.heartbeatDueMs > 0) {
return;
}
local.resetHeartbeatDue();
sendBeat();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
}
}
The beginning mirrors the election task (count down a timeout, then reset it); the interesting part is sendBeat:
private void sendBeat() throws IOException, InterruptedException {
RaftPeer local = peers.local();
// Only the leader sends heartbeats: in standalone mode, or when this node is not the leader, return immediately
if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
return;
}
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
}
local.resetLeaderDue();
// build data
ObjectNode packet = JacksonUtils.createEmptyJsonNode();
packet.replace("peer", JacksonUtils.transferToJsonNode(local));
ArrayNode array = JacksonUtils.createEmptyArrayNode();
if (switchDomain.isSendBeatOnly()) {
Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", switchDomain.isSendBeatOnly());
}
if (!switchDomain.isSendBeatOnly()) {
// For every in-memory datum, add an element holding only its (shortened) key and its timestamp; values are not sent
for (Datum datum : datums.values()) {
ObjectNode element = JacksonUtils.createEmptyJsonNode();
if (KeyBuilder.matchServiceMetaKey(datum.key)) {
element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
} else if (KeyBuilder.matchInstanceListKey(datum.key)) {
element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
}
element.put("timestamp", datum.timestamp.get());
array.add(element);
}
}
// Assemble the request body
packet.replace("datums", array);
// broadcast
Map<String, String> params = new HashMap<String, String>(1);
params.put("beat", JacksonUtils.toJson(packet));
String content = JacksonUtils.toJson(params);
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(out);
gzip.write(content.getBytes(StandardCharsets.UTF_8));
gzip.close();
byte[] compressedBytes = out.toByteArray();
String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(),
compressedContent.length());
}
// Send the heartbeat to every other node by asynchronously calling /raft/beat
for (final String server : peers.allServersWithoutMySelf()) {
try {
final String url = buildUrl(server, API_BEAT);
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("send beat to server " + server);
}
HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", result.getCode(), server);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
return;
}
peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("receive beat response from: {}", url);
}
}
@Override
public void onError(Throwable throwable) {
Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server,
throwable);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
}
}
How a node handles an incoming heartbeat request:
com.alibaba.nacos.naming.controllers.RaftController#beat
@PostMapping("/beat")
public JsonNode beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
if (versionJudgement.allMemberIsNewVersion()) {
throw new IllegalStateException("old raft protocol already stop");
}
String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);
String value = URLDecoder.decode(entity, "UTF-8");
value = URLDecoder.decode(value, "UTF-8");
// Parse the request body
JsonNode json = JacksonUtils.toObj(value);
// Process the heartbeat
RaftPeer peer = raftCore.receivedBeat(JacksonUtils.toObj(json.get("beat").asText()));
return JacksonUtils.transferToJsonNode(peer);
}
The heartbeat processing logic (an excerpt of RaftCore#receivedBeat):
// Reject heartbeats that do not come from a leader
if (remote.state != RaftPeer.State.LEADER) {
Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state,
JacksonUtils.toJson(remote));
throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
}
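// If this node is not a FOLLOWER (e.g. it is in the middle of an election), step down to FOLLOWER
// and follow the sender; there is no point in continuing to campaign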
if (local.state != RaftPeer.State.FOLLOWER) {
Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote));
// mk follower
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
}
// receivedKeysMap: every local key starts at 0, meaning "present locally"
Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());
for (Map.Entry<String, Datum> entry : datums.entrySet()) {
receivedKeysMap.put(entry.getKey(), 0);
}
// Walk through the key/timestamp entries in the heartbeat
for (Object object : beatDatums) {
processedCount = processedCount + 1;
JsonNode entry = (JsonNode) object;
String key = entry.get("key").asText();
final String datumKey;
if (KeyBuilder.matchServiceMetaKey(key)) {
datumKey = KeyBuilder.detailServiceMetaKey(key);
} else if (KeyBuilder.matchInstanceListKey(key)) {
datumKey = KeyBuilder.detailInstanceListkey(key);
} else {
// ignore corrupted key:
continue;
}
long timestamp = entry.get("timestamp").asLong();
// Mark the key with 1 ("also present on the leader"); keys still at 0 afterwards are deleted below
receivedKeysMap.put(datumKey, 1);
try {
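// Skip the key if the local copy is at least as new (local timestamp >= leader's), unless this is the
// last entry, which must fall through so that the pending batch still gets flushed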
if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp
&& processedCount < beatDatums.size()) {
continue;
}
// Add the key to the batch if it is missing locally or the local timestamp is older
if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
batch.add(datumKey);
}
// Keep accumulating until the batch holds 50 keys or this is the last entry (pull in batches)
if (batch.size() < 50 && processedCount < beatDatums.size()) {
continue;
}
String keys = StringUtils.join(batch, ",");
if (batch.size() <= 0) {
continue;
}
// update datum entry
// Pull the batched keys from the leader via /raft/datum
String url = buildUrl(remote.ip, API_GET);
Map<String, String> queryParam = new HashMap<>(1);
queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));
HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
return;
}
List<JsonNode> datumList = JacksonUtils
.toObj(result.getData(), new TypeReference<List<JsonNode>>() {
});
// For each returned datum, write it to the local file with raftStore.write
// and call notifier.notify to update the in-memory data
for (JsonNode datumJson : datumList) {
Datum newDatum = null;
OPERATE_LOCK.lock();
try {
Datum oldDatum = getDatum(datumJson.get("key").asText());
if (oldDatum != null && datumJson.get("timestamp").asLong() <= oldDatum.timestamp
.get()) {
continue;
}
if (KeyBuilder.matchServiceMetaKey(datumJson.get("key").asText())) {
Datum<Service> serviceDatum = new Datum<>();
serviceDatum.key = datumJson.get("key").asText();
serviceDatum.timestamp.set(datumJson.get("timestamp").asLong());
serviceDatum.value = JacksonUtils
.toObj(datumJson.get("value").toString(), Service.class);
newDatum = serviceDatum;
}
if (KeyBuilder.matchInstanceListKey(datumJson.get("key").asText())) {
Datum<Instances> instancesDatum = new Datum<>();
instancesDatum.key = datumJson.get("key").asText();
instancesDatum.timestamp.set(datumJson.get("timestamp").asLong());
instancesDatum.value = JacksonUtils
.toObj(datumJson.get("value").toString(), Instances.class);
newDatum = instancesDatum;
}
if (newDatum == null || newDatum.value == null) {
Loggers.RAFT.error("receive null datum: {}", datumJson);
continue;
}
raftStore.write(newDatum);
datums.put(newDatum.key, newDatum);
notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value);
local.resetLeaderDue();
if (local.term.get() + 100 > remote.term.get()) {
getLeader().term.set(remote.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(100);
}
raftStore.updateTerm(local.term.get());
Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
newDatum.key, newDatum.timestamp, JacksonUtils.toJson(remote), local.term);
} catch (Throwable e) {
Loggers.RAFT
.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum,
e);
} finally {
OPERATE_LOCK.unlock();
}
}
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
Loggers.RAFT.error("[RAFT-BEAT] Interrupted error ", e);
}
return;
}
@Override
public void onError(Throwable throwable) {
Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader", throwable);
}
@Override
public void onCancel() {
}
});
batch.clear();
} catch (Exception e) {
Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
}
}
// Collect keys still marked 0 (present locally but absent from the leader's heartbeat) into deadKeys
List<String> deadKeys = new ArrayList<>();
for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
if (entry.getValue() == 0) {
deadKeys.add(entry.getKey());
}
}
// Remove those keys from memory and from disk
for (String deadKey : deadKeys) {
try {
deleteDatum(deadKey);
} catch (Exception e) {
Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
}
}
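Stripped of HTTP, locking and persistence, the follower side of the heartbeat is a diff between two key-to-timestamp maps. The minimal sketch below rests on simplifying assumptions. It uses plain `Map<String, Long>` instead of `Datum` and skips key shortening. `BeatDiffSketch`, `keysToPull` and `deadKeys` are hypothetical names. Keys to pull are grouped in batches of 50, matching the batch size in the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical simplification of the follower-side diff in receivedBeat; not Nacos code.
final class BeatDiffSketch {

    static final int BATCH_SIZE = 50;

    /** Keys to pull from the leader: missing locally, or older than the leader's copy. Batched. */
    static List<List<String>> keysToPull(Map<String, Long> local, Map<String, Long> leaderDigest) {
        List<List<String>> batches = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Long> e : leaderDigest.entrySet()) {
            Long localTs = local.get(e.getKey());
            if (localTs == null || localTs < e.getValue()) {
                batch.add(e.getKey());
                if (batch.size() == BATCH_SIZE) {   // flush a full batch
                    batches.add(batch);
                    batch = new ArrayList<>();
                }
            }
        }
        if (!batch.isEmpty()) {                      // flush the remainder
            batches.add(batch);
        }
        return batches;
    }

    /** Keys present locally but absent from the leader's heartbeat are treated as deleted. */
    static List<String> deadKeys(Map<String, Long> local, Map<String, Long> leaderDigest) {
        List<String> dead = new ArrayList<>();
        for (String key : local.keySet()) {
            if (!leaderDigest.containsKey(key)) {
                dead.add(key);
            }
        }
        return dead;
    }
}
```

Sending only keys and timestamps keeps each heartbeat small. Values travel only for the keys a follower actually pulls.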