Elasticsearch Source Code - Index Recovery
Goals of this series:
- How is an Elasticsearch index split into shards?
- How does Elasticsearch persist index data?
- How does an Elasticsearch index recover from failures?
- How does Elasticsearch rebalance index shards?
Source version: 8.13
Index Recovery
When recovery is triggered
- Recreates a shard lost during node failure
- Relocates a shard to another node due to a cluster rebalance or changes to the shard allocation settings
- Manually triggered via an API
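As a side note, while a recovery is running its progress can be observed through the recovery APIs. A minimal sketch using the low-level Java REST client (host, port, and parameters here are assumptions, adjust to your cluster):

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class RecoveryMonitor {
    public static void main(String[] args) throws Exception {
        // Assumes a node reachable on localhost:9200.
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // _cat/recovery reports each shard recovery with its stage, source/target node and progress.
            Request request = new Request("GET", "/_cat/recovery");
            request.addParameter("v", "true");
            request.addParameter("active_only", "true"); // only recoveries that are still running
            Response response = client.performRequest(request);
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
    }
}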
Local recovery
IndexShard.recoverLocallyUpToGlobalCheckpoint => doLocalRecovery
- Find the safe commit point based on the globalCheckpoint (a simplified model of the selection follows the excerpt below)
- safeCommit = store.findSafeIndexCommit(globalCheckpoint) compares each commit's maxSeqNo against the global checkpoint
public Optional<SequenceNumbers.CommitInfo> findSafeIndexCommit(long globalCheckpoint) throws IOException {
    final List<IndexCommit> commits = DirectoryReader.listCommits(directory);
    assert commits.isEmpty() == false : "no commit found";
    final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commits, globalCheckpoint);
    final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommit.getUserData().entrySet());
    // all operations of the safe commit must be at most the global checkpoint.
    if (commitInfo.maxSeqNo <= globalCheckpoint) {
        return Optional.of(commitInfo);
    } else {
        return Optional.empty();
    }
}
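To make the selection concrete, here is a small self-contained model (toy types, not the Elasticsearch/Lucene classes) of the idea behind CombinedDeletionPolicy.findSafeCommitPoint plus the maxSeqNo check above: among all commit points, the newest one whose maxSeqNo does not exceed the global checkpoint is safe to recover from.

import java.util.List;
import java.util.Optional;

// Simplified stand-in for a Lucene commit point; only the field we need here.
record CommitPoint(String name, long maxSeqNo) {}

class SafeCommitModel {
    // The newest commit whose operations are all covered by the global checkpoint is safe to recover from.
    static Optional<CommitPoint> findSafeCommit(List<CommitPoint> commits, long globalCheckpoint) {
        CommitPoint safe = null;
        for (CommitPoint commit : commits) {        // commits are assumed oldest-to-newest
            if (commit.maxSeqNo() <= globalCheckpoint) {
                safe = commit;                      // keep the latest qualifying commit
            }
        }
        return Optional.ofNullable(safe);
    }

    public static void main(String[] args) {
        List<CommitPoint> commits = List.of(
            new CommitPoint("commit-1", 90),
            new CommitPoint("commit-2", 120),
            new CommitPoint("commit-3", 250)
        );
        // With a global checkpoint of 130, commit-2 is the newest commit fully covered by it.
        System.out.println(findSafeCommit(commits, 130)); // Optional[CommitPoint[name=commit-2, maxSeqNo=120]]
    }
}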
- Set currentEngineReference: start a temporary engine so the local translog can be recovered up to the given checkpoint
synchronized (engineMutex) {
    assert currentEngineReference.get() == null : "engine is running";
    verifyNotClosed();
    // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
    final Engine newEngine = createEngine(config);
    onNewEngine(newEngine);
    currentEngineReference.set(newEngine);
    // We set active because we are now writing operations to the engine; this way,
    // we can flush if we go idle after some time and become inactive.
    active.set(true);
}
- Replay the operations in the translog with runTranslogRecovery and update the recovery state (a minimal replay-loop model appears after the excerpts below)
final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
    recoveryState.getTranslog().totalLocal(snapshot.totalOperations());
    final int recoveredOps = runTranslogRecovery(
        engine,
        snapshot,
        Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
        recoveryState.getTranslog()::incrementRecoveredOperations
    );
    recoveryState.getTranslog().totalLocal(recoveredOps); // adjust the total local to reflect the actual count
    return recoveredOps;
};
innerOpenEngineAndTranslog(() -> globalCheckpoint);
getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint, recoveryCompleteListener.map(v -> {
    logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
    return v;
}));
- InternalEngine.recoverFromTranslogInternal reads a translog snapshot and runs the translogRecoveryRunner above to replay it
private void recoverFromTranslogInternal(
    TranslogRecoveryRunner translogRecoveryRunner,
    long recoverUpToSeqNo,
    ActionListener<Void> listener
) {
    ActionListener.run(listener, l -> {
        final int opsRecovered;
        final long localCheckpoint = getProcessedLocalCheckpoint();
        if (localCheckpoint < recoverUpToSeqNo) {
            try (Translog.Snapshot snapshot = newTranslogSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) {
                opsRecovered = translogRecoveryRunner.run(this, snapshot);
            } catch (Exception e) {
                throw new EngineException(shardId, "failed to recover from translog", e);
            }
        } else {
            opsRecovered = 0;
        }
        // flush if we recovered something or if we have references to older translogs
        // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
        assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
        pendingTranslogRecovery.set(false); // we are good - now we can commit
        logger.trace(
            () -> format(
                "flushing post recovery from translog: ops recovered [%s], current translog generation [%s]",
                opsRecovered,
                translog.currentFileGeneration()
            )
        );
        // flush might do something async and complete the listener on a different thread, from which we must fork back to a generic
        // thread to continue with recovery, but if it doesn't do anything async then there's no need to fork, hence why we use a
        // SubscribableListener here
        final var flushListener = new SubscribableListener<FlushResult>();
        flush(false, true, flushListener);
        flushListener.addListener(l.delegateFailureAndWrap((ll, r) -> {
            translog.trimUnreferencedReaders();
            ll.onResponse(null);
        }), engineConfig.getThreadPool().generic(), null);
    });
}
- A translog snapshot (translog.snapshot) is used so the original translog files are not affected
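The replay referenced above is conceptually a loop over the snapshot. A minimal self-contained model (toy types, not the real Engine/Translog API) of replaying only the operations above the processed local checkpoint and up to the global checkpoint:

import java.util.List;

// Simplified stand-in: each translog operation carries a sequence number and a payload.
record TranslogOp(long seqNo, String payload) {}

class TranslogReplayModel {
    // Replays every operation in (localCheckpoint, globalCheckpoint]; anything outside the range is skipped,
    // mirroring how local recovery only replays ops the safe commit does not already contain.
    static int replay(List<TranslogOp> snapshot, long localCheckpoint, long globalCheckpoint) {
        int recovered = 0;
        for (TranslogOp op : snapshot) {
            if (op.seqNo() <= localCheckpoint || op.seqNo() > globalCheckpoint) {
                continue; // already in the safe commit, or beyond what is known to be durable cluster-wide
            }
            apply(op);    // in Elasticsearch this corresponds to applying the op with origin LOCAL_TRANSLOG_RECOVERY
            recovered++;
        }
        return recovered;
    }

    static void apply(TranslogOp op) {
        System.out.println("replaying seqNo=" + op.seqNo() + " -> " + op.payload());
    }

    public static void main(String[] args) {
        List<TranslogOp> snapshot = List.of(
            new TranslogOp(5, "index doc-a"),
            new TranslogOp(6, "delete doc-b"),
            new TranslogOp(7, "index doc-c")
        );
        // Safe commit covers up to seqNo 5, global checkpoint is 6: only seqNo 6 is replayed locally.
        System.out.println("recovered " + replay(snapshot, 5, 6) + " op(s)");
    }
}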
Local translog recovery only fixes problems local to a node; it cannot fully resolve cluster-level failures.
A replica shard, for example, still needs peer recovery, covered below.
Peer recovery
Peer recovery runs automatically when Elasticsearch:
- Recreates a shard lost during a node failure
- Relocates a shard to another node due to a cluster rebalance or changes to the shard allocation settings
IndexShard.startRecovery drives the recovery of a specific shard and dispatches to the appropriate procedure based on the shard's recovery source type.
public void startRecovery(
RecoveryState recoveryState,
PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener,
RepositoriesService repositoriesService,
BiConsumer<MappingMetadata, ActionListener<Void>> mappingUpdateConsumer,
IndicesService indicesService,
long clusterStateVersion
) {
// TODO: Create a proper object to encapsulate the recovery context
// all of the current methods here follow a pattern of:
// resolve context which isn't really dependent on the local shards and then async
// call some external method with this pointer.
// with a proper recovery context object we can simply change this to:
// startRecovery(RecoveryState recoveryState, ShardRecoverySource source ) {
// markAsRecovery("from " + source.getShortDescription(), recoveryState);
// threadPool.generic().execute() {
// onFailure () { listener.failure() };
// doRun() {
// if (source.recover(this)) {
// recoveryListener.onRecoveryDone(recoveryState);
// }
// }
// }}
// }
assert recoveryState.getRecoverySource().equals(shardRouting.recoverySource());
switch (recoveryState.getRecoverySource().getType()) {
case EMPTY_STORE, EXISTING_STORE -> executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
case PEER -> {
try {
markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), clusterStateVersion, recoveryListener);
} catch (Exception e) {
failShard("corrupted preexisting index", e);
recoveryListener.onRecoveryFailure(new RecoveryFailedException(recoveryState, null, e), true);
}
}
case SNAPSHOT -> {
final String repo = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot().getRepository();
executeRecovery(
"from snapshot",
recoveryState,
recoveryListener,
l -> restoreFromRepository(repositoriesService.repository(repo), l)
);
}
case LOCAL_SHARDS -> {
final IndexMetadata indexMetadata = indexSettings().getIndexMetadata();
final Index resizeSourceIndex = indexMetadata.getResizeSourceIndex();
final List<IndexShard> startedShards = new ArrayList<>();
final IndexService sourceIndexService = indicesService.indexService(resizeSourceIndex);
final Set<ShardId> requiredShards;
final int numShards;
if (sourceIndexService != null) {
requiredShards = IndexMetadata.selectRecoverFromShards(
shardId().id(),
sourceIndexService.getMetadata(),
indexMetadata.getNumberOfShards()
);
for (IndexShard shard : sourceIndexService) {
if (shard.state() == IndexShardState.STARTED && requiredShards.contains(shard.shardId())) {
startedShards.add(shard);
}
}
numShards = requiredShards.size();
} else {
numShards = -1;
requiredShards = Collections.emptySet();
}
if (numShards == startedShards.size()) {
assert requiredShards.isEmpty() == false;
executeRecovery(
"from local shards",
recoveryState,
recoveryListener,
l -> recoverFromLocalShards(
mappingUpdateConsumer,
startedShards.stream().filter((s) -> requiredShards.contains(s.shardId())).toList(),
l
)
);
} else {
final RuntimeException e;
if (numShards == -1) {
e = new IndexNotFoundException(resizeSourceIndex);
} else {
e = new IllegalStateException(
"not all required shards of index "
+ resizeSourceIndex
+ " are started yet, expected "
+ numShards
+ " found "
+ startedShards.size()
+ " can't recover shard "
+ shardId()
);
}
throw e;
}
}
default -> throw new IllegalArgumentException("Unknown recovery source " + recoveryState.getRecoverySource());
}
}
- EMPTY_STORE, EXISTING_STORE: skipped here
- PEER: the replica requests recovery from the primary
- SNAPSHOT: restore from a snapshot
- LOCAL_SHARDS: recover from shards of the resize source index located on the same node
The whole flow is driven by a state machine:
/**
* Marks the shard as recovering based on a recovery state, fails with exception is recovering is not allowed to be set.
*/
public IndexShardState markAsRecovering(String reason, RecoveryState recoveryState) throws IndexShardStartedException,
IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException {
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new IndexShardClosedException(shardId);
}
if (state == IndexShardState.STARTED) {
throw new IndexShardStartedException(shardId);
}
if (state == IndexShardState.RECOVERING) {
throw new IndexShardRecoveringException(shardId);
}
if (state == IndexShardState.POST_RECOVERY) {
throw new IndexShardRecoveringException(shardId);
}
this.recoveryState = recoveryState;
return changeState(IndexShardState.RECOVERING, reason);
}
}
This article focuses on the PEER type.
The startRecovery flow:
- tryAcquireSnapshotDownloadPermits: concurrency control for snapshot file downloads
- onGoingRecoveries.startRecovery creates a new RecoveryTarget
public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler {
    private final Logger logger;
    private static final AtomicLong idGenerator = new AtomicLong();
    private static final String RECOVERY_PREFIX = "recovery.";
    private final ShardId shardId;                              // ID of the shard being recovered
    private final long recoveryId;                              // unique identifier of this recovery, produced by idGenerator
    private final IndexShard indexShard;                        // the target shard instance
    private final DiscoveryNode sourceNode;                     // node the data comes from (the node holding the primary shard)
    private final long clusterStateVersion;                     // cluster state version, used to keep the recovery consistent
    private final SnapshotFilesProvider snapshotFilesProvider;  // provider of snapshot files
    private volatile MultiFileWriter multiFileWriter;           // writes the multiple files copied from the primary into the target shard
    private final RecoveryRequestTracker requestTracker = new RecoveryRequestTracker(); // tracks the state of each request issued during recovery
    private final Store store;                                  // the shard's data store
    private final PeerRecoveryTargetService.RecoveryListener listener; // together with the request tracker above, observes state changes of the recovery
    private final AtomicBoolean finished = new AtomicBoolean();
    private final CancellableThreads cancellableThreads;

    // last time this status was accessed
    private volatile long lastAccessTime = System.nanoTime();
    private final AtomicInteger recoveryMonitorBlocks = new AtomicInteger();

    @Nullable // if we're not downloading files from snapshots in this recovery or we're retrying
    private volatile Releasable snapshotFileDownloadsPermit;    // permit controlling snapshot file downloads

    // latch that can be used to blockingly wait for RecoveryTarget to be closed
    private final CountDownLatch closedLatch = new CountDownLatch(1);
    ......
}
- threadPool.generic().execute(new RecoveryRunner(recoveryId)) executes the recovery
//PeerRecoveryTargetService.java
public void startRecovery(
final IndexShard indexShard,
final DiscoveryNode sourceNode,
final long clusterStateVersion,
final RecoveryListener listener
) {
final Releasable snapshotFileDownloadsPermit = tryAcquireSnapshotDownloadPermits();
// create a new recovery status, and process...
final long recoveryId = onGoingRecoveries.startRecovery(
indexShard,
sourceNode,
clusterStateVersion,
snapshotFilesProvider,
listener,
recoverySettings.activityTimeout(),
snapshotFileDownloadsPermit
);
// we fork off quickly here and go async but this is called from the cluster state applier thread too and that can cause
// assertions to trip if we executed it on the same thread hence we fork off to the generic threadpool.
threadPool.generic().execute(new RecoveryRunner(recoveryId));
}
The recovery is divided into stages:
- indexShard::preRecovery: the preparation stage (loadSnapshot)
private static void ensureSnapshotIsLoaded(IndexShard indexShard) {
    final var store = indexShard.store();
    final SearchableSnapshotDirectory directory = unwrapDirectory(store.directory());
    assert directory != null;
    final ListenableFuture<Void> preWarmListener = new ListenableFuture<>();
    final boolean success = directory.loadSnapshot(indexShard.recoveryState(), store::isClosing, preWarmListener);
    final ShardRouting shardRouting = indexShard.routingEntry();
    if (success && shardRouting.isRelocationTarget()) {
        final Runnable preWarmCondition = indexShard.addCleanFilesDependency();
        preWarmListener.addListener(ActionListener.wrap(v -> preWarmCondition.run(), e -> {
            logger.warn(
                () -> format(
                    "pre-warm operation failed for [%s] while it was the target of primary relocation [%s]",
                    shardRouting.shardId(),
                    shardRouting
                ),
                e
            );
            preWarmCondition.run();
        }));
    }
    assert directory.listAll().length > 0 : "expecting directory listing to be non-empty";
    assert success || indexShard.routingEntry().recoverySource().getType() == RecoverySource.Type.PEER
        : "loading snapshot must not be called twice unless we are retrying a peer recovery";
}
- prepareForIndexRecovery: final preparation for the recovery, setting the recovery stage
- recoverFromStore / recoverLocallyUpToGlobalCheckpoint (shards that cannot be promoted to primary go through openEngineAndSkipTranslogRecovery instead, not expanded here); recoverLocallyUpToGlobalCheckpoint follows the same logic as the local recovery above, and the source side of the transfer is expanded in detail below
- markRecoveryAsDone: cleans up the recoveryId
- send Response: returns the recovery result
private void doRecovery(final long recoveryId, final StartRecoveryRequest preExistingRequest) {
final RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId);
if (recoveryRef == null) {
logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);
return;
}
final RecoveryTarget recoveryTarget = recoveryRef.target();
assert recoveryTarget.sourceNode() != null : "cannot do a recovery without a source node";
final RecoveryState recoveryState = recoveryTarget.state();
final RecoveryState.Timer timer = recoveryState.getTimer();
final IndexShard indexShard = recoveryTarget.indexShard();
final Releasable onCompletion = Releasables.wrap(recoveryTarget.disableRecoveryMonitor(), recoveryRef);
// async version of the catch/finally structure we need, but this does nothing with successes so needs further modification below
final var cleanupOnly = ActionListener.notifyOnce(ActionListener.runBefore(ActionListener.noop().delegateResponse((l, e) -> {
// this will be logged as warning later on...
logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
onGoingRecoveries.failRecovery(
recoveryId,
new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e),
true
);
}), onCompletion::close));
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
assert preExistingRequest == null;
assert indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot() == false;
ActionListener.run(cleanupOnly.map(v -> {
logger.trace("{} preparing unpromotable shard for recovery", recoveryTarget.shardId());
indexShard.prepareForIndexRecovery();
// Skip unnecessary intermediate stages
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
indexShard.openEngineAndSkipTranslogRecovery();
recoveryState.getIndex().setFileDetailsComplete();
recoveryState.setStage(RecoveryState.Stage.FINALIZE);
onGoingRecoveries.markRecoveryAsDone(recoveryId);
return null;
}), indexShard::preRecovery);
return;
}
if (indexShard.routingEntry().isSearchable() == false && recoveryState.getPrimary()) {
assert preExistingRequest == null;
assert indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot() == false;
try (onCompletion) {
client.execute(
StatelessPrimaryRelocationAction.TYPE,
new StatelessPrimaryRelocationAction.Request(
recoveryId,
indexShard.shardId(),
transportService.getLocalNode(),
indexShard.routingEntry().allocationId().getId(),
recoveryTarget.clusterStateVersion()
),
new ActionListener<>() {
@Override
public void onResponse(ActionResponse.Empty ignored) {
onGoingRecoveries.markRecoveryAsDone(recoveryId);
}
@Override
public void onFailure(Exception e) {
final var cause = ExceptionsHelper.unwrapCause(e);
final var sendShardFailure =
// these indicate the source shard has already failed, which will independently notify the master and fail
// the target shard
false == (cause instanceof ShardNotFoundException
|| cause instanceof IndexNotFoundException
|| cause instanceof AlreadyClosedException);
// TODO retries? See RecoveryResponseHandler#handleException
onGoingRecoveries.failRecovery(
recoveryId,
new RecoveryFailedException(recoveryState, null, e),
sendShardFailure
);
}
}
);
return;
}
}
record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, String actionName, TransportRequest requestToSend) {}
final ActionListener<StartRecoveryRequestToSend> toSendListener = cleanupOnly.map(r -> {
logger.trace(
"{} [{}]: recovery from {}",
r.startRecoveryRequest().shardId(),
r.actionName(),
r.startRecoveryRequest().sourceNode()
);
transportService.sendRequest(
r.startRecoveryRequest().sourceNode(),
r.actionName(),
r.requestToSend(),
new RecoveryResponseHandler(r.startRecoveryRequest(), timer)
);
return null;
});
if (preExistingRequest == null) {
SubscribableListener
// run pre-recovery activities
.newForked(indexShard::preRecovery)
// recover the shard as far as possible based on data held locally
.<Long>andThen((l, v) -> {
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
indexShard.prepareForIndexRecovery();
if (indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot()) {
// for searchable snapshots, peer recovery is treated similarly to recovery from snapshot
indexShard.getIndexEventListener().afterFilesRestoredFromRepository(indexShard);
final Store store = indexShard.store();
store.incRef();
try {
StoreRecovery.bootstrap(indexShard, store);
} finally {
store.decRef();
}
}
indexShard.recoverLocallyUpToGlobalCheckpoint(ActionListener.assertOnce(l));
})
// now construct the start-recovery request
.andThenApply(startingSeqNo -> {
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
: "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
final var startRequest = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);
return new StartRecoveryRequestToSend(startRequest, PeerRecoverySourceService.Actions.START_RECOVERY, startRequest);
})
// finally send the start-recovery request
.addListener(toSendListener);
} else {
toSendListener.onResponse(
new StartRecoveryRequestToSend(
preExistingRequest,
PeerRecoverySourceService.Actions.REESTABLISH_RECOVERY,
new ReestablishRecoveryRequest(recoveryId, preExistingRequest.shardId(), preExistingRequest.targetAllocationId())
)
);
}
}
Where does the recovery data come from? RecoverySourceHandler.recoverToTarget.
If you had to implement this yourself, how would you do it? (A minimal sketch of this naive flow follows the list.)
- Flush the index & translog to disk
- Make a local copy - a snapshot - an expensive operation
- Send the snapshot to the remote node according to the offset in the request - also expensive, so the number of bytes sent should be kept small
- Append updates: handle the increments that arrived after the snapshot
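A minimal self-contained sketch of that naive flow (toy types; this is the mental model, not the Elasticsearch implementation):

import java.util.List;

// Toy two-phase recovery model: ship a point-in-time snapshot of the files, then replay
// the operations that arrived after the snapshot was taken.
class NaiveRecoveryModel {
    record FileChunk(String name, int sizeInBytes) {}
    record Op(long seqNo, String payload) {}

    static void recover(List<FileChunk> snapshotFiles, List<Op> opsSinceSnapshot, long targetStartingSeqNo) {
        // Phase 1: ship the file snapshot. This is the expensive part, so a real
        // implementation tries hard to skip files the target already has.
        for (FileChunk file : snapshotFiles) {
            System.out.println("sending file " + file.name() + " (" + file.sizeInBytes() + " bytes)");
        }
        // Phase 2: replay the operations written after the snapshot, starting from what the target still misses.
        for (Op op : opsSinceSnapshot) {
            if (op.seqNo() >= targetStartingSeqNo) {
                System.out.println("replaying op seqNo=" + op.seqNo() + " -> " + op.payload());
            }
        }
    }

    public static void main(String[] args) {
        recover(
            List.of(new FileChunk("_0.cfs", 1024), new FileChunk("segments_2", 128)),
            List.of(new Op(11, "index doc-x"), new Op(12, "delete doc-y")),
            11
        );
    }
}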
Now let's see how Elasticsearch actually does it.
- Determine whether a seq-no-based recovery is possible: the source must still retain complete operation history from request.startingSeqNo onwards
  - The local checkpoint here is the checkpoint the primary has committed & persisted (the safe commit's local checkpoint)
  - In that case the requester's local offset (request.startingSeqNo) is above the primary's local checkpoint of the safe commit
final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
&& isTargetSameHistory()
&& shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo())
&& ((retentionLease == null && shard.useRetentionLeasesInPeerRecovery() == false)
|| (retentionLease != null && retentionLease.retainingSequenceNumber() <= request.startingSeqNo()));
// InternalEngine.java
public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
return getMinRetainedSeqNo() <= startingSeqNo;
}
/*
* This policy retains operations for two purposes: peer-recovery and querying changes history.
* - Peer-recovery is driven by the local checkpoint of the safe commit. In peer-recovery, the primary transfers a safe commit,
* then sends operations after the local checkpoint of that commit. This requires keeping all ops after
* localCheckpointOfSafeCommit.
* - Changes APIs are driven by a combination of the global checkpoint, retention operations, and retention leases. Here we
* prefer using the global checkpoint instead of the maximum sequence number because only operations up to the global
* checkpoint are exposed in the changes APIs.
*/
public final long getMinRetainedSeqNo() {
return softDeletesPolicy.getMinRetainedSeqNo();
}
- If it is a seq-no-based recovery, phase1 can be skipped; otherwise take a snapshot & run phase1 (a toy model of this decision follows the snapshot excerpt below)
- Take the snapshot (not covered in detail): a snapshot of the index's local safe commit
final Engine.IndexCommitRef safeCommitRef;
try {
    safeCommitRef = acquireSafeCommit(shard);
    resources.add(safeCommitRef);
} catch (final Exception e) {
    throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
}

/**
 * Snapshots the most recent safe index commit from the currently running engine.
 * All index files referenced by this index commit won't be freed until the commit/snapshot is closed.
 */
public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException {
    final IndexShardState state = this.state; // one time volatile read
    // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
    if (state == IndexShardState.STARTED || state == IndexShardState.CLOSED) {
        return getEngine().acquireSafeIndexCommit();
    } else {
        throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
    }
}
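As referenced above, the choice between the two paths comes down to a simple predicate. A toy model of the decision (simplified: retention leases and the history-UUID details are ignored; -2 mirrors SequenceNumbers.UNASSIGNED_SEQ_NO):

// Toy decision model for RecoverySourceHandler.recoverToTarget: if the source still retains every
// operation from the target's startingSeqNo onwards, only operations need to be sent (phase2);
// otherwise segment files must be copied first (phase1) from a safe-commit snapshot.
class RecoveryPlanModel {
    static String choosePath(long requestStartingSeqNo, long minRetainedSeqNoOnSource, boolean sameHistoryUuid) {
        boolean seqNoBased = requestStartingSeqNo != -2 /* UNASSIGNED_SEQ_NO */
            && sameHistoryUuid
            && minRetainedSeqNoOnSource <= requestStartingSeqNo;
        return seqNoBased
            ? "ops-only recovery: replay operations from seqNo " + requestStartingSeqNo
            : "file-based recovery: snapshot safe commit, run phase1, then replay operations";
    }

    public static void main(String[] args) {
        System.out.println(choosePath(1200, 1000, true)); // ops-only recovery
        System.out.println(choosePath(-2, 1000, true));   // file-based recovery (no usable local progress on target)
    }
}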
- Phase1: sync the index files
  - Estimate the operations: estimateNumOps = estimateNumberOfHistoryOperations(startingSeqNo), counting all operations from startingSeqNo onwards
  - If the two shards (source/target) have the same syncId, no files need to be sent
  - Otherwise send the missing content:
    - compute the diff between the two shards' files and build a ShardRecoveryPlan (a simplified model follows the phase1 excerpt below)
    - send the files according to the ShardRecoveryPlan
/**
* Perform phase1 of the recovery operations. Once this {@link IndexCommit}
* snapshot has been performed no commit operations (files being fsync'd)
* are effectively allowed on this index until all recovery phases are done
* <p>
* Phase1 examines the segment files on the target node and copies over the
* segments that are missing. Only segments that have the same size and
* checksum can be reused
*/
void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
}
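The file comparison behind the ShardRecoveryPlan mentioned above can be modeled simply: files are matched by name, length, and checksum, and only files that are missing or different on the target are shipped. A self-contained toy model (loosely mirroring the idea of the store's recovery diff, not the real classes):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Toy model of the segment-file diff used to build a recovery plan: a file can be reused on the
// target only if its name, length, and checksum all match the source's copy.
class FileDiffModel {
    record FileMeta(String name, long length, String checksum) {}
    record Plan(List<FileMeta> filesToSend, List<FileMeta> filesToReuse) {}

    static Plan plan(List<FileMeta> sourceFiles, List<FileMeta> targetFiles) {
        Map<String, FileMeta> onTarget = targetFiles.stream().collect(Collectors.toMap(FileMeta::name, f -> f));
        List<FileMeta> send = new ArrayList<>();
        List<FileMeta> reuse = new ArrayList<>();
        for (FileMeta source : sourceFiles) {
            FileMeta target = onTarget.get(source.name());
            if (target != null && target.length() == source.length() && target.checksum().equals(source.checksum())) {
                reuse.add(source);   // identical file already present on the target
            } else {
                send.add(source);    // missing or different: must be copied over
            }
        }
        return new Plan(send, reuse);
    }

    public static void main(String[] args) {
        List<FileMeta> source = List.of(new FileMeta("_0.cfs", 100, "aa"), new FileMeta("_1.cfs", 50, "bb"));
        List<FileMeta> target = List.of(new FileMeta("_0.cfs", 100, "aa"), new FileMeta("_1.cfs", 50, "zz"));
        Plan p = plan(source, target);
        System.out.println("send=" + p.filesToSend() + " reuse=" + p.filesToReuse());
    }
}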
- prepareTargetForTranslog
  - Get the engine responsible for the recovery ready on the target node
  - Create the phase2 translog snapshot based on startingSeqNo
    - LuceneChangesSnapshot creates a new "translog" snapshot backed by Lucene
void prepareTargetForTranslog(int totalTranslogOps, ActionListener<TimeValue> listener) {
    StopWatch stopWatch = new StopWatch().start();
    final ActionListener<Void> wrappedListener = ActionListener.wrap(nullVal -> {
        stopWatch.stop();
        final TimeValue tookTime = stopWatch.totalTime();
        logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime);
        listener.onResponse(tookTime);
    }, e -> listener.onFailure(new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e)));
    // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
    // garbage collection (not the JVM's GC!) of tombstone deletes.
    logger.trace("recovery [phase1]: prepare remote engine for translog");
    cancellableThreads.checkForCancel();
    recoveryTarget.prepareForTranslogOperations(totalTranslogOps, wrappedListener);
}

// create the phase2 snapshot
final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot(
    "peer-recovery",
    startingSeqNo,
    Long.MAX_VALUE,
    false,
    false,
    true
);
- Phase2
  - Uses a snapshot of the current translog (a point-in-time view of the translog) and sends each translog operation to the target node so it can be replayed into the new shard (a simplified batching model follows the excerpt below)
final OperationBatchSender sender = new OperationBatchSender(
    startingSeqNo,
    endingSeqNo,
    snapshot,
    maxSeenAutoIdTimestamp,
    maxSeqNoOfUpdatesOrDeletes,
    retentionLeases,
    mappingVersion,
    sendListener
);
sendListener.addListener(listener.delegateFailureAndWrap((delegate, ignored) -> {
    final long skippedOps = sender.skippedOps.get();
    final int totalSentOps = sender.sentOps.get();
    final long targetLocalCheckpoint = sender.targetLocalCheckpoint.get();
    assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps + totalSentOps
        : String.format(
            Locale.ROOT,
            "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
            snapshot.totalOperations(),
            snapshot.skippedOperations(),
            skippedOps,
            totalSentOps
        );
    stopWatch.stop();
    final TimeValue tookTime = stopWatch.totalTime();
    logger.trace("recovery [phase2]: took [{}]", tookTime);
    delegate.onResponse(new SendSnapshotResult(targetLocalCheckpoint, totalSentOps, tookTime));
}));
sender.start();
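Conceptually the sender walks the snapshot, skips operations outside [startingSeqNo, endingSeqNo], and ships the rest in size-bounded batches, which is what the skipped/sent accounting above asserts over. A simplified self-contained model (toy types, not the real sender):

import java.util.ArrayList;
import java.util.List;

// Toy model of phase2 operation shipping: filter the snapshot to the requested seqNo range and
// send it in bounded batches, counting skipped vs. sent ops.
class OperationBatchModel {
    record Op(long seqNo, int sizeInBytes) {}

    static void send(List<Op> snapshot, long startingSeqNo, long endingSeqNo, int maxBatchBytes) {
        List<Op> batch = new ArrayList<>();
        int batchBytes = 0, sent = 0, skipped = 0;
        for (Op op : snapshot) {
            if (op.seqNo() < startingSeqNo || op.seqNo() > endingSeqNo) {
                skipped++;                   // outside the range the target needs
                continue;
            }
            if (batchBytes + op.sizeInBytes() > maxBatchBytes && batch.isEmpty() == false) {
                ship(batch);                 // flush the current batch before it grows too large
                batch = new ArrayList<>();
                batchBytes = 0;
            }
            batch.add(op);
            batchBytes += op.sizeInBytes();
            sent++;
        }
        if (batch.isEmpty() == false) {
            ship(batch);
        }
        System.out.println("sent=" + sent + " skipped=" + skipped);
    }

    static void ship(List<Op> batch) {
        System.out.println("shipping batch of " + batch.size() + " op(s)");
    }

    public static void main(String[] args) {
        List<Op> snapshot = List.of(new Op(3, 400), new Op(5, 400), new Op(6, 400), new Op(7, 400));
        send(snapshot, 5, Long.MAX_VALUE, 800); // op with seqNo 3 is skipped; the rest go out in two batches
    }
}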
- finalizeRecovery: the target node trims all operations with seqNo >= startingSeqNo, since all of those operations were re-sent in phase 2
We have not yet seen how appended updates (writes arriving during the recovery) are handled - how does Elasticsearch deal with them?
- In phase1, an IndexCommit snapshot is taken; until all recovery phases are done, no commit operations (files being fsync'd) are effectively allowed on this index, so commits are held back during this period
- In phase2, the target shard can already process new write requests concurrently
This raises another question: the recovered data is stale, so how is inconsistency avoided when those older operations are replayed?
A version is stored in the Lucene index.
New vs. old is decided by comparing versions, which resolves the problem above.
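A minimal model of that arbitration: each document remembers the sequence number/version of the operation that last touched it, and a replayed operation is applied only if it is newer (toy types; this loosely mirrors how the engine resolves replayed vs. live operations, not the real Engine API):

import java.util.HashMap;
import java.util.Map;

// Toy model of stale-operation resolution during replay: an op only wins if its seqNo is higher
// than the seqNo already recorded for that document, so replaying old data cannot overwrite newer writes.
class ReplayConflictModel {
    record Op(String docId, long seqNo, String value) {}

    private final Map<String, Op> docs = new HashMap<>();

    void apply(Op op) {
        Op current = docs.get(op.docId());
        if (current == null || op.seqNo() > current.seqNo()) {
            docs.put(op.docId(), op); // newer operation: apply it
        } else {
            System.out.println("dropping stale op seqNo=" + op.seqNo() + " for " + op.docId());
        }
    }

    public static void main(String[] args) {
        ReplayConflictModel shard = new ReplayConflictModel();
        shard.apply(new Op("doc-1", 10, "live write during recovery"));
        shard.apply(new Op("doc-1", 7, "older value replayed from the recovery snapshot")); // dropped
        System.out.println(shard.docs);
    }
}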
References
Source code 8.13
ChatGPT