Elasticsearch Source Code - Index Persistence
Goals of this series:
- How is an Elasticsearch index sharded?
- How does Elasticsearch index persistence work? **
- How does an Elasticsearch index recover from failures?
- How does Elasticsearch rebalance index shards?
Source code version: 8.13
Index Persistence Mechanism
The core class: InternalEngine
Key member fields
- translog: durably records every write operation so that it can be replayed for recovery after a node crash
- mergeScheduler: manages and schedules Lucene segment merges
- indexWriter: Lucene's IndexWriter, which performs the actual index writes
- externalReaderManager / internalReaderManager: manage external and internal readers so that reads see a consistent view of the index
- flushLock / optimizeLock: locks guarding flush and optimize (force-merge) operations to keep them thread-safe
- versionMap: tracks live per-document version information to keep realtime reads consistent
- lastUnsafeSegmentGenerationForGets / preCommitSegmentGeneration: track segment generations around commits so that realtime gets stay safe
- lastCommittedSegmentInfos: the segment infos recorded by the last commit
- throttle: indexing throttling
- localCheckpointTracker: tracks the local checkpoint, used to guarantee durability and consistency of sequence numbers
- flushListener: listens for flush events
- translogSyncProcessor: handles translog syncing
- inFlightDocCount: the number of documents currently being added to the IndexWriter
- ......
InternalEngine is a core part of Elasticsearch: it implements the low-level indexing and search operations, and its locks, counters, state tracking and policy components keep the data reliable, consistent and efficient.
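To make the division of labour concrete, here is a schematic, non-authoritative sketch of the order in which these collaborators take part in a single write. All types and names below are simplified stand-ins, not the real 8.13 classes; compare with the actual index() method shown next.
// Schematic only: simplified stand-ins for the collaborators listed above, showing
// the order in which they take part in one write. Not the real Elasticsearch classes.
final class EngineWriteSketch {

    interface ReleasableLock extends AutoCloseable { @Override void close(); }

    interface VersionMap { ReleasableLock acquireLock(String docId); void put(String docId, long seqNo, long version); }
    interface LuceneWriter { void addOrUpdate(String docId, String source); }         // stands in for indexWriter
    interface OperationLog { void append(String docId, String source, long seqNo); }  // stands in for translog
    interface CheckpointTracker { void markProcessed(long seqNo); }                   // stands in for localCheckpointTracker

    private final VersionMap versionMap;
    private final LuceneWriter indexWriter;
    private final OperationLog translog;
    private final CheckpointTracker localCheckpointTracker;

    EngineWriteSketch(VersionMap vm, LuceneWriter iw, OperationLog tl, CheckpointTracker cp) {
        this.versionMap = vm;
        this.indexWriter = iw;
        this.translog = tl;
        this.localCheckpointTracker = cp;
    }

    void index(String docId, String source, long seqNo, long version) {
        try (ReleasableLock ignored = versionMap.acquireLock(docId)) { // per-document lock, as in the real engine
            indexWriter.addOrUpdate(docId, source);      // 1. write to Lucene (in-memory segment)
            translog.append(docId, source, seqNo);       // 2. append the operation to the translog
            versionMap.put(docId, seqNo, version);       // 3. publish the new version for realtime gets
            localCheckpointTracker.markProcessed(seqNo); // 4. advance the local checkpoint
        }
    }
}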
The index write method: index
ADD operation
- APPEND ONLY
  - Reduces the latency caused by disk writes
  - Simple to implement: fewer write-conflict retries and less locking
  - autoGeneratedIdTimestamp: the timestamp is assigned when the document is generated and is preserved in the translog, so duplicate documents can be detected on retries
  - When a retry is caused by a network interruption or a similar problem, Elasticsearch uses the timestamp to establish a happens-before relationship: if the new document's timestamp is greater than the currently recorded maximum (maxUnsafeAutoIdTimestamp), it performs an add; otherwise it performs an update to keep the index consistent
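A minimal sketch of this happens-before check, with simplified names; the real decision is made while planning the indexing strategy around maxUnsafeAutoIdTimestamp in InternalEngine and does not have exactly this shape.
// Minimal sketch (simplified names): decide whether an auto-id document can use the
// plain append-only path or must fall back to an update to avoid duplicates.
import java.util.concurrent.atomic.AtomicLong;

final class AppendOnlySketch {
    // highest auto_id_timestamp that might already be present in the index (e.g. from a retried request)
    private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);

    boolean canUsePlainAppend(long autoGeneratedIdTimestamp, boolean isRetry) {
        if (isRetry) {
            // a retried request may already have been indexed: remember its timestamp and take the update path
            maxUnsafeAutoIdTimestamp.accumulateAndGet(autoGeneratedIdTimestamp, Math::max);
            return false;
        }
        // happens-before: only timestamps strictly greater than the recorded maximum are provably new
        return autoGeneratedIdTimestamp > maxUnsafeAutoIdTimestamp.get();
    }
}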
public IndexResult index(Index index) throws IOException {
assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
final boolean doThrottle = index.origin().isRecovery() == false;
try (var ignored1 = acquireEnsureOpenRef()) {
assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
int reservedDocs = 0;
try (
Releasable ignored = versionMap.acquireLock(index.uid().bytes());
Releasable indexThrottle = doThrottle ? throttle.acquireThrottle() : () -> {}
) {
lastWriteNanos = index.startTime();
......
final IndexingStrategy plan = indexingStrategyForOperation(index);
reservedDocs = plan.reservedDocs;
final IndexResult indexResult;
if (plan.earlyResultOnPreFlightError.isPresent()) {
assert index.origin() == Operation.Origin.PRIMARY : index.origin();
indexResult = plan.earlyResultOnPreFlightError.get();
assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
} else {
// generate or register sequence number
if (index.origin() == Operation.Origin.PRIMARY) {
index = new Index(
index.uid(),
index.parsedDoc(),
generateSeqNoForOperationOnPrimary(index),
index.primaryTerm(),
index.version(),
index.versionType(),
index.origin(),
index.startTime(),
index.getAutoGeneratedIdTimestamp(),
index.isRetry(),
index.getIfSeqNo(),
index.getIfPrimaryTerm()
);
final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
if (toAppend == false) {
advanceMaxSeqNoOfUpdatesOnPrimary(index.seqNo());
}
} else {
markSeqNoAsSeen(index.seqNo());
}
assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();
if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
indexResult = indexIntoLucene(index, plan);
} else {
indexResult = new IndexResult(
plan.versionForIndexing,
index.primaryTerm(),
index.seqNo(),
plan.currentNotFoundOrDeleted,
index.id()
);
}
}
if (index.origin().isFromTranslog() == false) {
final Translog.Location location;
if (indexResult.getResultType() == Result.Type.SUCCESS) {
location = translog.add(new Translog.Index(index, indexResult));
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
// if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
final NoOp noOp = new NoOp(
indexResult.getSeqNo(),
index.primaryTerm(),
index.origin(),
index.startTime(),
indexResult.getFailure().toString()
);
location = innerNoOp(noOp).getTranslogLocation();
} else {
location = null;
}
indexResult.setTranslogLocation(location);
}
if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
versionMap.maybePutIndexUnderLock(
index.uid().bytes(),
new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm())
);
}
localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo());
if (indexResult.getTranslogLocation() == null) {
// the op is coming from the translog (and is hence persisted already) or it does not have a sequence number
assert index.origin().isFromTranslog() || indexResult.getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO;
localCheckpointTracker.markSeqNoAsPersisted(indexResult.getSeqNo());
}
indexResult.setTook(relativeTimeInNanosSupplier.getAsLong() - index.startTime());
indexResult.freeze();
return indexResult;
} finally {
releaseInFlightDocs(reservedDocs);
}
} catch (RuntimeException | IOException e) {
try {
if (e instanceof AlreadyClosedException == false && treatDocumentFailureAsTragicError(index)) {
failEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
} else {
maybeFailEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
}
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw e;
}
}
Persistence flow
- Write to the Lucene segment first
- Then write to the translog
Durability
- Lucene segments: asynchronous guarantee, refreshed every 1s by default
- translog: synchronous guarantee, so recovery does not lose acknowledged operations
- Multiple copies (primary and replicas) across the cluster provide data redundancy
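These guarantees are tunable per index. A hedged example, assuming the elasticsearch core jar is on the classpath: it builds the standard index settings that control refresh and translog durability with the Settings builder; the values shown are the documented defaults, not recommendations.
// Illustrative only: standard index settings governing the durability trade-offs above.
import org.elasticsearch.common.settings.Settings;

public class DurabilitySettingsExample {
    public static void main(String[] args) {
        Settings settings = Settings.builder()
            .put("index.refresh_interval", "1s")                  // how often in-memory segments become searchable
            .put("index.translog.durability", "request")          // fsync the translog before acking each request (default)
            .put("index.translog.sync_interval", "5s")            // fsync interval, only used when durability=async
            .put("index.translog.flush_threshold_size", "512mb")  // flush (Lucene commit) once the translog grows past this
            .build();
        System.out.println(settings);
    }
}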
Consistency
- write-ack mechanism
  - When a write completes, Elasticsearch only acknowledges the client after both the segment and the translog writes have succeeded; see the code above 👆
- If the Lucene segment write succeeds but the translog write fails, an exception is raised and the caller retries, so consistency is preserved
- If both the Lucene segment write and the translog write succeed but a power loss/failure wipes the in-memory segment:
  - When the Elasticsearch node starts/restarts, it recovers the known segments from disk using the last commit point and replays every change recorded in the translog after that last commit
  - See the code below 👇
    - RecoveryTranslogOperationsRequest
    - PeerRecoveryTargetService.performTranslogOps
    - RecoveryTarget.indexTranslogOperations
- Lucene's flush is asynchronous and hard to keep in lockstep across multiple nodes, so the consistency guarantee here is weaker: eventual consistency
  - globalcheckpoint, see elasticsearch-replication-model for details
Fault tolerance
- Single shard: the translog is written synchronously, and after a node restart or crash Elasticsearch replays the translog so that every change that had not yet been persisted in a segment is recovered
- Cluster: primary and replica copies hold redundant data, providing a degree of fault tolerance
Traditional RDBMS/NoSQL systems do the opposite: they write the commit log (WAL, write-ahead log) first and use it for crash recovery and consistency guarantees
Why?
Elasticsearch orders the writes this way for performance (a condensed sketch follows this list):
- Writing to the Lucene segment first goes to memory and is fast (segments are refreshed asynchronously)
- Writing to the translog afterwards is a synchronous I/O that persists to disk, so its latency is higher than the previous step. A further benefit: a successful Lucene write means the data is valid, so the subsequent translog write stores no invalid data, and the recovery phase has fewer cases to handle
- Segments are flushed to disk in batches, which reduces I/O
Elasticsearch search is NRT (near real-time) and does not promise realtime visibility, which matches the product's design goals
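A condensed sketch of this ordering and its failure handling, using hypothetical helper interfaces rather than the real engine API: Lucene first, translog second, acknowledge only after both. A translog failure propagates to the caller for retry, and a crash after the acknowledgement is repaired by replaying the translog on restart.
// Condensed sketch (hypothetical helpers, not the real API) of the write-ack ordering.
import java.io.IOException;

final class WriteOrderingSketch {
    interface Lucene { void index(String id, String source) throws IOException; }
    interface OperationLog { void add(String id, String source) throws IOException; }

    private final Lucene lucene;
    private final OperationLog translog;

    WriteOrderingSketch(Lucene lucene, OperationLog translog) {
        this.lucene = lucene;
        this.translog = translog;
    }

    /** Returns only once both the in-memory segment and the durable translog hold the operation. */
    void indexThenAck(String id, String source) throws IOException {
        lucene.index(id, source); // fast: goes to an in-memory segment, made searchable later by refresh
        translog.add(id, source); // slower: synchronous append + fsync; a failure here propagates to the caller
        // the client is acknowledged after this point; a crash now is repaired by replaying the translog on restart
    }
}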
RecoveryTranslogOperationsRequest(StreamInput in) throws IOException {
super(in);
operations = Translog.readOperations(in, "recovery");
totalTranslogOps = in.readVInt();
maxSeenAutoIdTimestampOnPrimary = in.readZLong();
maxSeqNoOfUpdatesOrDeletesOnPrimary = in.readZLong();
retentionLeases = new RetentionLeases(in);
mappingVersionOnPrimary = in.readVLong();
}
//PeerRecoveryTargetService.java
private class TranslogOperationsRequestHandler extends RecoveryRequestHandler<RecoveryTranslogOperationsRequest> {
@Override
protected void handleRequest(
RecoveryTranslogOperationsRequest request,
RecoveryTarget recoveryTarget,
ActionListener<Void> listener
) {
performTranslogOps(request, listener, recoveryTarget);
}
private void performTranslogOps(
final RecoveryTranslogOperationsRequest request,
final ActionListener<Void> listener,
final RecoveryTarget recoveryTarget
) {
......
recoveryTarget.indexTranslogOperations(
request.operations(),
request.totalTranslogOps(),
request.maxSeenAutoIdTimestampOnPrimary(),
request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
request.retentionLeases(),
request.mappingVersionOnPrimary(),
ActionListener.wrap(checkpoint -> listener.onResponse(null), e -> {
// do not retry if the mapping on replica is at least as recent as the mapping
// that the primary used to index the operations in the request.
if (mappingVersionOnTarget < request.mappingVersionOnPrimary() && e instanceof MapperException) {
retryOnMappingException.accept(e);
} else {
listener.onFailure(e);
}
})
);
......
}
}
public void indexTranslogOperations(
final List<Translog.Operation> operations,
final int totalTranslogOps,
final long maxSeenAutoIdTimestampOnPrimary,
final long maxSeqNoOfDeletesOrUpdatesOnPrimary,
final RetentionLeases retentionLeases,
final long mappingVersionOnPrimary,
final ActionListener<Long> listener
) {
ActionListener.completeWith(listener, () -> {
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
if (indexShard().state() != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, indexShard().state());
}
/*
* The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation
* will be replayed. Bootstrapping this timestamp here will disable the optimization for original append-only requests
* (source of these operations) replicated via replication. Without this step, we may have duplicate documents if we
* replay these operations first (without timestamp), then optimize append-only requests (with timestamp).
*/
indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary);
/*
* Bootstrap the max_seq_no_of_updates from the primary to make sure that the max_seq_no_of_updates on this replica when
* replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that op was executed on.
*/
indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary);
/*
* We have to update the retention leases before we start applying translog operations to ensure we are retaining according to
* the policy.
*/
indexShard().updateRetentionLeasesOnReplica(retentionLeases);
for (Translog.Operation operation : operations) {
Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
throw new MapperException("mapping updates are not allowed [" + operation + "]");
}
if (result.getFailure() != null) {
if (Assertions.ENABLED && result.getFailure() instanceof MapperException == false) {
throw new AssertionError("unexpected failure while replicating translog entry", result.getFailure());
}
ExceptionsHelper.reThrowIfNotNull(result.getFailure());
}
}
// update stats only after all operations completed (to ensure that mapping updates don't mess with stats)
translog.incrementRecoveredOperations(operations.size());
indexShard().sync();
// roll over / flush / trim if needed
indexShard().afterWriteOperation();
return indexShard().getLocalCheckpoint();
});
}
Update/Delete Operation
- An update first deletes the old document (a soft delete) and then writes the new version of the doc
indexIntoLucene => updateDocs => IndexWriter.softUpdateDocuments
public long softUpdateDocuments(
Term term, Iterable<? extends Iterable<? extends IndexableField>> docs, Field... softDeletes)
throws IOException {
if (term == null) {
throw new IllegalArgumentException("term must not be null");
}
if (softDeletes == null || softDeletes.length == 0) {
throw new IllegalArgumentException("at least one soft delete must be present");
}
return updateDocuments(
DocumentsWriterDeleteQueue.newNode(buildDocValuesUpdate(term, softDeletes)), docs);
}
- The role of the Version Map
- The version map tracks the version information of every document. When documents are updated or deleted concurrently, the engine checks the version map to make sure each operation is applied on top of the latest version. This prevents "lost writes", where an older version of a document overwrites a newer one (a minimal sketch follows the VersionValue attributes below)
Code inside the index method
if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
versionMap.maybePutIndexUnderLock(
index.uid().bytes(),
new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm())
);
}
Key VersionValue attributes
final long version;//counter
/** the seq number of the operation that last changed the associated uuid */
final long seqNo;
/** the term of the operation that last changed the associated uuid */
final long term;
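Given these three attributes, a minimal sketch (hypothetical names, not the LiveVersionMap API) of how a stale concurrent write can be rejected:
// Minimal sketch (hypothetical names) of optimistic concurrency control against the version map:
// a write carrying if_seq_no / if_primary_term is rejected when the live entry has already moved on.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class VersionConflictSketch {
    record VersionEntry(long version, long seqNo, long term) {}

    private final Map<String, VersionEntry> versionMap = new ConcurrentHashMap<>();

    /** Throws when the caller's expected seqNo/term no longer match the latest entry for this id. */
    void checkThenPut(String id, long ifSeqNo, long ifPrimaryTerm, VersionEntry next) {
        VersionEntry current = versionMap.get(id);
        if (current != null && (current.seqNo() != ifSeqNo || current.term() != ifPrimaryTerm)) {
            throw new IllegalStateException("version conflict: the document was changed by a newer operation");
        }
        versionMap.put(id, next); // publish the new version for subsequent realtime gets
    }
}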
- When serving realtime read requests (for example a realtime GET), the version map lets Elasticsearch find the latest version of a document quickly without reading index files from disk, which speeds up realtime lookups
getVersionFromMap
protected GetResult realtimeGetUnderLock(
Get get,
MappingLookup mappingLookup,
DocumentParser documentParser,
Function<Engine.Searcher, Engine.Searcher> searcherWrapper,
boolean getFromSearcher
) {
assert isDrainedForClose() == false;
assert get.realtime();
final VersionValue versionValue;
try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
// we need to lock here to access the version map to do this truly in RT
versionValue = getVersionFromMap(get.uid().bytes());
}
......
}
- The version map also manages "GC deletes", i.e. deletes that are only dropped after a time window. On the primary, a delete is retained for one GC cycle so that users can still observe it; on replicas, the version map prunes stale deletes based on the local checkpoint
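A hedged sketch of how such tombstones could be pruned, combining the two signals described above (age against the GC window, and the local checkpoint); the names and the exact condition are illustrative, not the LiveVersionMap implementation.
// Illustrative only: drop a delete tombstone once it is older than the GC window
// AND its seq_no is at or below the local checkpoint (i.e. fully processed).
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class TombstonePruneSketch {
    record Tombstone(long seqNo, long deleteTimeMillis) {}

    private final Map<String, Tombstone> tombstones = new ConcurrentHashMap<>();
    private final long gcDeletesMillis = 60_000; // index.gc_deletes defaults to 60s

    void pruneTombstones(long nowMillis, long localCheckpoint) {
        Iterator<Map.Entry<String, Tombstone>> it = tombstones.entrySet().iterator();
        while (it.hasNext()) {
            Tombstone t = it.next().getValue();
            boolean oldEnough = nowMillis - t.deleteTimeMillis() > gcDeletesMillis;
            boolean fullyProcessed = t.seqNo() <= localCheckpoint;
            if (oldEnough && fullyProcessed) {
                it.remove(); // safe to forget this delete
            }
        }
    }
}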
public void writeIndexingBuffer() throws IOException {
// bytes held by the versionMap that a refresh could reclaim
final long reclaimableVersionMapBytes = versionMap.reclaimableRefreshRamBytes();
// bytes currently used by the IndexWriter, minus what is already being flushed
final long indexWriterBytesUsed = indexWriter.ramBytesUsed() - indexWriter.getFlushingBytes();
// if the versionMap can give back at least as much memory as the IndexWriter is using
if (reclaimableVersionMapBytes >= indexWriterBytesUsed) {
// refresh to reclaim the versionMap's memory
reclaimVersionMapMemory();
} else {
// otherwise write the largest pending segment to disk
indexWriter.flushNextBuffer();
}
}
//LiveVersionMap.java
/**
* Returns how much RAM could be reclaimed from the version map.
* <p>
* In stateful, this is the RAM usage of the current version map, and could be reclaimed by refreshing. It doesn't include tombstones
* since they don't get cleared on refresh, nor the old version map that is being reclaimed.
* <p>
* In stateless, this is the RAM usage of current and old version map plus the RAM usage of the parts of the archive that require
* a new unpromotable refresh. To reclaim all three components we need to refresh AND flush.
*/
long reclaimableRefreshRamBytes() {
return archive == LiveVersionMapArchive.NOOP_ARCHIVE
? maps.current.ramBytesUsed.get()
: maps.ramBytesUsed() + archive.getReclaimableRamBytes();
}
Elasticsearch has to manage memory carefully so that high load does not degrade performance or crash the node. By comparing the memory usage of the versionMap with that of the IndexWriter, it decides whether to free memory by refreshing the versionMap or by flushing the IndexWriter's buffer to disk
Merge Operation
Merging is an important index operation: it combines multiple smaller segments into a larger one, which reduces the number of segments and improves query efficiency, and it also purges documents in deleted state, cutting down redundant data in the segment files
- Triggered via the API
InternalEngine.java
public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, final String forceMergeUUID)
throws EngineException, IOException {
if (onlyExpungeDeletes && maxNumSegments >= 0) {
throw new IllegalArgumentException("only_expunge_deletes and max_num_segments are mutually exclusive");
}
/*
* We do NOT acquire the readlock here since we are waiting on the merges to finish
* that's fine since the IW.rollback should stop all the threads and trigger an IOException
* causing us to fail the forceMerge
*/
optimizeLock.lock();
try {
ensureOpen();
store.incRef(); // increment the ref just to ensure nobody closes the store while we optimize
try {
if (onlyExpungeDeletes) {
indexWriter.forceMergeDeletes(true /* blocks and waits for merges*/);
} else if (maxNumSegments <= 0) {
indexWriter.maybeMerge();
} else {
indexWriter.forceMerge(maxNumSegments, true /* blocks and waits for merges*/);
this.forceMergeUUID = forceMergeUUID;
}
if (flush) {
// TODO: Migrate to using async apic
flush(false, true);
// If any merges happened then we need to release the unmerged input segments so they can be deleted. A periodic refresh
// will do this eventually unless the user has disabled refreshes or isn't searching this shard frequently, in which
// case we should do something here to ensure a timely refresh occurs. However there's no real need to defer it nor to
// have any should-we-actually-refresh-here logic: we're already doing an expensive force-merge operation at the user's
// request and therefore don't expect any further writes so we may as well do the final refresh immediately and get it
// out of the way.
refresh("force-merge");
}
} finally {
store.decRef();
}
} catch (AlreadyClosedException ex) {
/* in this case we first check if the engine is still open. If so this exception is just fine
* and expected. We don't hold any locks while we block on forceMerge otherwise it would block
* closing the engine as well. If we are not closed we pass it on to failOnTragicEvent which ensures
* we are handling a tragic even exception here */
ensureOpen(ex);
failOnTragicEvent(ex);
throw ex;
} catch (Exception e) {
try {
maybeFailEngine("force merge", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw e;
} finally {
optimizeLock.unlock();
}
}
- Triggered in the background by the merge scheduler
ElasticsearchConcurrentMergeScheduler, Elasticsearch's extension of Lucene's ConcurrentMergeScheduler
//ElasticsearchConcurrentMergeScheduler.java
protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
int totalNumDocs = merge.totalNumDocs();
long totalSizeInBytes = merge.totalBytesSize();
long timeNS = System.nanoTime(); // capture the start time in nanoseconds
currentMerges.inc();
currentMergesNumDocs.inc(totalNumDocs);
currentMergesSizeInBytes.inc(totalSizeInBytes);
OnGoingMerge onGoingMerge = new OnGoingMerge(merge);
onGoingMerges.add(onGoingMerge);
if (logger.isTraceEnabled()) {
logger.trace(
"merge [{}] starting..., merging [{}] segments, [{}] docs, [{}] size, into [{}] estimated_size",
getSegmentName(merge),
merge.segments.size(),
totalNumDocs,
ByteSizeValue.ofBytes(totalSizeInBytes),
ByteSizeValue.ofBytes(merge.estimatedMergeBytes)
);
}
try {
beforeMerge(onGoingMerge); // hook invoked before the merge starts
super.doMerge(mergeSource, merge); // delegate the actual merge to Lucene
} finally {
......
}
The detailed merge code all lives inside Lucene and is not expanded here
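Since the heavy lifting happens inside Lucene, here is a small standalone Lucene sketch of where merge behaviour is configured (merge policy and merge scheduler); the concrete values are illustrative and are not the Elasticsearch defaults, which are exposed through the index.merge.* settings instead.
// Standalone Lucene sketch: where merge behaviour is configured. Values are illustrative only.
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.store.ByteBuffersDirectory;

public class MergeConfigSketch {
    public static void main(String[] args) throws Exception {
        TieredMergePolicy mergePolicy = new TieredMergePolicy();
        mergePolicy.setSegmentsPerTier(10);          // how many similar-sized segments accumulate before a merge
        mergePolicy.setMaxMergedSegmentMB(5 * 1024); // cap the size of merged segments

        ConcurrentMergeScheduler scheduler = new ConcurrentMergeScheduler();
        scheduler.setMaxMergesAndThreads(6, 2);      // max pending merges / concurrent merge threads

        IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer())
            .setMergePolicy(mergePolicy)
            .setMergeScheduler(scheduler);

        try (IndexWriter writer = new IndexWriter(new ByteBuffersDirectory(), config)) {
            writer.forceMerge(1); // blocks until the index is merged down to a single segment
        }
    }
}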
Others
What happens when the translog grows too large?
- IndexShard.shouldRollTranslogGeneration() checks the index settings to decide whether to roll to a new generation
- Translog.rollGeneration() creates a new translog file; note that a flush also triggers rollGeneration()
- translog.trimUnreferencedReaders() cleans up translog files that are no longer referenced
- The afterWriteOperation method is triggered during the replica-write / recovery / flush phases
//IndexShard.java
/**
* Schedules a flush or translog generation roll if needed but will not schedule more than one concurrently. The operation will be
* executed asynchronously on the flush thread pool.
*/
public void afterWriteOperation() {
....
if (shouldRollTranslogGeneration()) {
logger.debug("submitting async roll translog generation request");
final AbstractRunnable roll = new AbstractRunnable() {
@Override
public void onFailure(final Exception e) {
if (state != IndexShardState.CLOSED) {
logger.warn("failed to roll translog generation", e);
}
}
@Override
protected void doRun() {
rollTranslogGeneration();
}
@Override
public void onAfter() {
flushOrRollRunning.compareAndSet(true, false);
afterWriteOperation();
}
.....
}
//Translog.java
/**
* Tests whether or not the translog generation should be rolled to a new generation. This test
* is based on the size of the current generation compared to the configured generation
* threshold size.
*
* @return {@code true} if the current generation should be rolled to a new generation
*/
public boolean shouldRollGeneration() {
final long threshold = this.indexSettings.getGenerationThresholdSize().getBytes();
try (ReleasableLock ignored = readLock.acquire()) {
return this.current.sizeInBytes() > threshold;
}
}
//InternalEngine.java
public void rollTranslogGeneration() throws EngineException {
try (var ignored = acquireEnsureOpenRef()) {
translog.rollGeneration();
translog.trimUnreferencedReaders();
} catch (AlreadyClosedException e) {
failOnTragicEvent(e);
throw e;
} catch (Exception e) {
try {
failEngine("translog trimming failed", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new EngineException(shardId, "failed to roll translog", e);
}
}
//Translog.java
/**
* Roll the current translog generation into a new generation if it's not empty. This does not commit the translog.
*
* @throws IOException if an I/O exception occurred during any file operations
*/
public void rollGeneration() throws IOException {
syncBeforeRollGeneration();
if (current.totalOperations() == 0 && primaryTermSupplier.getAsLong() == current.getPrimaryTerm()) {
return;
}
try (Releasable ignored = writeLock.acquire()) {
ensureOpen();
try {
final TranslogReader reader = current.closeIntoReader();
readers.add(reader);
assert Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME)).generation == current.getGeneration();
copyCheckpointTo(location.resolve(getCommitCheckpointFileName(current.getGeneration())));
// create a new translog file; this will sync it and update the checkpoint data;
current = createWriter(current.getGeneration() + 1);
logger.trace("current translog set to [{}]", current.getGeneration());
} catch (final Exception e) {
tragedy.setTragicException(e);
closeOnTragicEvent(e);
throw e;
}
}
}
Summary
The above walks through the main logic of the InternalEngine write path
From Elasticsearch we can learn how to wrap a third-party framework such as Lucene, how to add a reliability layer like the translog, and how to manage commit checkpoints
It also shows where the cost of persistence lies, i.e. which I/O operations are the expensive ones
References
Elasticsearch source code 8.13
ChatGPT