Elasticsearch Source Code: Index Shard Allocation
Goals of this series:
- How does Elasticsearch shard an index?
- How does Elasticsearch persist index data?
- How does an Elasticsearch index recover from failures?
- How does Elasticsearch rebalance index shards?
Source version: 8.13
Index Shard Allocation
Trigger scenarios
- Index creation and deletion
- Node join / node leave
- Executing the reroute command (see the example after this list)
- Shard failure
- Cluster start (which itself triggers a reroute)
- Updating the replica count
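For instance, a manual reroute (one of the triggers above) can be requested through the cluster reroute API; with an empty body it simply re-runs the allocator:
curl -X POST "localhost:9200/_cluster/reroute?pretty"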
This post walks through index creation as the example.
Index-level shard allocation
Example:
curl -X PUT "localhost:9200/my-index-000001?pretty"
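Equivalently, the same request through the official elasticsearch-java client (a fragment assuming an already-constructed ElasticsearchClient named client; client setup and IOException handling are omitted):
// Create the index via the Java API client.
CreateIndexResponse resp = client.indices().create(c -> c.index("my-index-000001"));
System.out.println(resp.acknowledged()); // true once the master has applied the change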
Entry point:
TransportAction.proceed =>
TransportMasterNodeAction.doExecute =>
TransportMasterNodeAction$AsyncSingleAction.doStart =>
TransportMasterNodeAction.executeMasterOperation =>
TransportCreateIndexAction.masterOperation =>
MetadataCreateIndexService.createIndex =>
MetadataCreateIndexService.applyCreateIndexRequest =>
The core logic is as follows:
- getIndexNumberOfRoutingShards: reads the routing-shards setting that controls how the new index is sharded
- Build the RoutingTable (see the navigation sketch after this list)
  - Index-level routing: each index has its own IndexRoutingTable containing the routing entries of all its shards
  - Shard-level routing: each shard copy has a ShardRouting object recording the state (initializing, recovering, started, ...) and location of the primary and its replicas
  - The RoutingTable determines where each index's primary and replica shards are placed
  - When a search or indexing request arrives, the RoutingTable locates the relevant shards so the request is routed to the right nodes
  - When nodes fail or join, the RoutingTable is used to reassign shards, preserving availability and load balance
- allocationService.reroute: allocates the index's shards; this is the core piece and involves a lot of code
  - Compute the allocation: based on the current cluster state and the allocation policies, reroute computes a new shard allocation plan, considering:
    - current shard states (assigned, unassigned, initializing, ...)
    - node availability and health
    - allocation rules and constraints (for example, a primary and its replica must not share a node)
    - node resource usage (disk, CPU, memory, ...)
  - Execute the allocation: apply the computed plan
    - assign unassigned shards to nodes
    - move shards off failed nodes onto healthy ones
    - relocate shards to improve the overall balance
  - Update the cluster state: after the reallocation, publish the updated cluster state so every node sees the latest shard assignments
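As a quick aside, here is a minimal sketch of how the resulting routing information can be read back from a ClusterState (a fragment assuming a populated ClusterState variable named state and that the index exists; the accessors are the standard org.elasticsearch.cluster.routing ones):
// Walk the RoutingTable: index -> shard id -> ShardRouting entries.
IndexRoutingTable indexRouting = state.routingTable().index("my-index-000001");
for (int shardId = 0; shardId < indexRouting.size(); shardId++) {
    IndexShardRoutingTable shardTable = indexRouting.shard(shardId);
    ShardRouting primary = shardTable.primaryShard();
    System.out.printf("shard %d: primary on node %s, state %s%n",
        shardId, primary.currentNodeId(), primary.state());
}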
// applyCreateIndexRequest (fragment)
......
int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null);
IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(aggregatedIndexSettings, request, routingNumShards);
return applyCreateIndexWithTemporaryService(
currentState,
request,
silent,
null,
tmpImd,
mappings,
indexService -> resolveAndValidateAliases(
request.index(),
// data stream aliases are created separately in MetadataCreateDataStreamService::createDataStream
isDataStream ? Set.of() : request.aliases(),
isDataStream ? List.of() : MetadataIndexTemplateService.resolveAliases(currentState.metadata(), templateName),
currentState.metadata(),
xContentRegistry,
// the context is used only for validation so it's fine to pass fake values for the shard id and the current timestamp
indexService.newSearchExecutionContext(0, 0, null, () -> 0L, null, emptyMap()),
IndexService.dateMathExpressionResolverAt(request.getNameResolvedAt()),
systemIndices::isSystemName
),
Collections.singletonList(templateName),
metadataTransformer,
rerouteListener
);
/**
* Given the state and a request as well as the metadata necessary to build a new index,
* validate the configuration with an actual index service and return a new cluster state with
* the index added (and rerouted)
* @param currentState the current state to base the new state off of
* @param request the create index request
* @param silent a boolean for whether logging should be at a lower or higher level
* @param sourceMetadata when recovering from an existing index, metadata that should be copied to the new index
* @param temporaryIndexMeta metadata for the new index built from templates, source metadata, and request settings
* @param mappings a list of all mapping definitions to apply, in order
* @param aliasSupplier a function that takes the real {@link IndexService} and returns a list of {@link AliasMetadata} aliases
* @param templatesApplied a list of the names of the templates applied, for logging
* @param metadataTransformer if provided, a function that may alter cluster metadata in the same cluster state update that
* creates the index
* @return a new cluster state with the index added
*/
private ClusterState applyCreateIndexWithTemporaryService(
final ClusterState currentState,
final CreateIndexClusterStateUpdateRequest request,
final boolean silent,
final IndexMetadata sourceMetadata,
final IndexMetadata temporaryIndexMeta,
final List<CompressedXContent> mappings,
final Function<IndexService, List<AliasMetadata>> aliasSupplier,
final List<String> templatesApplied,
final BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer,
final ActionListener<Void> rerouteListener
) throws Exception {
// create the index here (on the master) to validate it can be created, as well as adding the mapping
return indicesService.<ClusterState, Exception>withTempIndexService(temporaryIndexMeta, indexService -> {
try {
updateIndexMappingsAndBuildSortOrder(indexService, request, mappings, sourceMetadata);
} catch (Exception e) {
logger.log(silent ? Level.DEBUG : Level.INFO, "failed on parsing mappings on index creation [{}]", request.index(), e);
throw e;
}
final List<AliasMetadata> aliases = aliasSupplier.apply(indexService);
final IndexMetadata indexMetadata;
try {
indexMetadata = buildIndexMetadata(
request.index(),
aliases,
indexService.mapperService()::documentMapper,
temporaryIndexMeta.getSettings(),
temporaryIndexMeta.getRoutingNumShards(),
sourceMetadata,
temporaryIndexMeta.isSystem()
);
} catch (Exception e) {
......
}
......
indexService.getIndexEventListener().beforeIndexAddedToCluster(indexMetadata.getIndex(), indexMetadata.getSettings());
ClusterState updated = clusterStateCreateIndex(
currentState,
request.blocks(),
indexMetadata,
metadataTransformer,
allocationService.getShardRoutingRoleStrategy()
);
if (request.performReroute()) {
updated = allocationService.reroute(updated, "index [" + indexMetadata.getIndex().getName() + "] created", rerouteListener);
}
return updated;
});
}
public ClusterState reroute(ClusterState clusterState, String reason, ActionListener<Void> listener) {
return executeWithRoutingAllocation(
clusterState,
reason,
routingAllocation -> shardsAllocator.allocate(routingAllocation, listener)
);
}
//AllocationService.java
public ClusterState executeWithRoutingAllocation(ClusterState clusterState, String reason, RerouteStrategy rerouteStrategy) {
ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);
RoutingAllocation allocation = createRoutingAllocation(fixedClusterState, currentNanoTime());
reroute(allocation, rerouteStrategy);
if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
return clusterState;
}
return buildResultAndLogHealthChange(clusterState, allocation, reason);
}
private void reroute(RoutingAllocation allocation, RerouteStrategy rerouteStrategy) {
assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metadata(), () -> allocation).isEmpty()
: "auto-expand replicas out of sync with number of nodes in the cluster";
assert assertInitialized();
rerouteStrategy.removeDelayMarkers(allocation);
allocateExistingUnassignedShards(allocation); // try to allocate existing shard copies first
rerouteStrategy.execute(allocation);
assert RoutingNodes.assertShardStats(allocation.routingNodes());
}
How is the target node for a shard decided? AllocationDeciders.
AllocationDeciders is a collection of decision modules used during shard allocation and reallocation to evaluate whether a given shard may be placed on a given node. Each AllocationDecider encapsulates one specific policy, and the combined results of all deciders determine the final placement.
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
return withDeciders(
allocation,
decider -> decider.canAllocate(shardRouting, allocation),
(decider, decision) -> Strings.format("Can not allocate [%s] on any node. [%s]: %s", shardRouting, decider, decision)
);
}
- EnableAllocationDecider: based on the ALLOCATION_ENABLE_SETTING setting, decides which kinds of shards a node may receive: ALL / PRIMARIES / NEW_PRIMARIES / NONE
- DiskThresholdDecider: disk-usage-based decisions; ensures a node has enough disk space for new shards and prevents problems caused by over-full disks
- SameShardAllocationDecider: ensures a primary and its replicas are never placed on the same node, so a single node failure cannot lose the data
- ThrottlingAllocationDecider: limits how many shards may be allocated or recovered at the same time, to avoid overloading the cluster
- NodeVersionAllocationDecider: version-based decisions; prevents shards from being allocated to nodes running an older version, avoiding compatibility issues
- AwarenessAllocationDecider: places shards according to node attributes such as rack or zone
- ReplicaAfterPrimaryActiveAllocationDecider: a replica may only be allocated after its primary is active, preserving data consistency
- ...
- A node is chosen only if it passes every decider above 👆
- The set of deciders is driven by configuration
- FilterChain pattern, easy to extend (a sketch of a custom decider follows this list)
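To illustrate the contract, here is a hedged sketch of a hypothetical decider (EvenShardCountDecider and its limit are invented for illustration; extending AllocationDecider and calling RoutingAllocation.decision are the real extension points):
// Hypothetical decider, purely illustrative -- not one of the built-ins.
public class EvenShardCountDecider extends AllocationDecider {
    private static final int MAX_SHARDS_PER_NODE = 100; // made-up limit

    @Override
    public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        if (node.numberOfOwningShards() >= MAX_SHARDS_PER_NODE) {
            return allocation.decision(Decision.NO, "even_shard_count",
                "node already holds [%d] shards", node.numberOfOwningShards());
        }
        return allocation.decision(Decision.YES, "even_shard_count", "node has spare capacity");
    }
}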
The evaluation flow:
- Evaluate each node: for every shard awaiting allocation, iterate over every node in the cluster
- Invoke each AllocationDecider: run all deciders against each node to obtain a decision
- Combine the results: if any single decider returns NO, the node is ruled out; a shard may only go to a node for which all deciders allow the allocation
- Pick the best node: among all the allowed nodes, choose the most suitable one
  - decideAllocateUnassigned finds a suitable node for an unassigned shard and returns the decision together with the target node
  - it first calls allocation.deciders().canAllocate(shard, allocation) for the shard-level decision; if this returns NO and no explanation is requested (explain == false), it returns AllocateUnassignedDecision.no right away, meaning the shard cannot be allocated
  - it then iterates over all nodes, computing the weight of the shard's index on each node:
float weight(Balancer balancer, ModelNode node, String index) {
    final float weightShard = node.numShards() - balancer.avgShardsPerNode();
    final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
    final float ingestLoad = (float) (node.writeLoad() - balancer.avgWriteLoadPerNode());
    final float diskUsage = (float) (node.diskUsageInBytes() - balancer.avgDiskUsageInBytesPerNode());
    return theta0 * weightShard + theta1 * weightIndex + theta2 * ingestLoad + theta3 * diskUsage;
}
  - for each candidate node it calls allocation.deciders().canAllocate(shard, node.getRoutingNode(), allocation) for the node-level decision
  - if the node's decision allows the allocation (YES or THROTTLE) and its weight is below the current minimum, the best node and minimum weight are updated (a worked example of the weight formula follows)
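To make the weight formula concrete, here is a small self-contained toy computation. The theta values mirror the documented defaults of cluster.routing.allocation.balance.shard (0.45) and cluster.routing.allocation.balance.index (0.55); the write-load and disk terms are assumed to be zero here. Lower weight means a more attractive node:
// Toy computation of the balancer weight with assumed inputs (not ES code).
public class WeightExample {
    public static void main(String[] args) {
        float theta0 = 0.45f; // cluster.routing.allocation.balance.shard default
        float theta1 = 0.55f; // cluster.routing.allocation.balance.index default
        // Node holds 12 shards vs. a cluster average of 10, and 3 shards of
        // this index vs. an average of 2: both terms are positive, so the
        // node is over-loaded and the weight comes out high (unattractive).
        float weightShard = 12 - 10;
        float weightIndex = 3 - 2;
        System.out.println(theta0 * weightShard + theta1 * weightIndex); // 1.45
    }
}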
/**
* Make a decision for allocating an unassigned shard. This method returns two values in a tuple: the
* first value is the {@link Decision} taken to allocate the unassigned shard, the second value is the
* {@link ModelNode} representing the node that the shard should be assigned to. If the decision returned
* is of type {@link Type#NO}, then the assigned node will be null.
*/
private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) {
if (shard.assignedToNode()) {
// we only make decisions for unassigned shards here
return AllocateUnassignedDecision.NOT_TAKEN;
}
final boolean explain = allocation.debugDecision();
Decision shardLevelDecision = allocation.deciders().canAllocate(shard, allocation);
if (shardLevelDecision.type() == Type.NO && explain == false) {
// NO decision for allocating the shard, irrespective of any particular node, so exit early
return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, null);
}
/* find a node with minimal weight we can allocate on */
float minWeight = Float.POSITIVE_INFINITY;
ModelNode minNode = null;
Decision decision = null;
/* Don't iterate over an identity hashset here; the
 * iteration order is different for each run and makes testing hard */
Map<String, NodeAllocationResult> nodeExplanationMap = explain ? new HashMap<>() : null;
List<Tuple<String, Float>> nodeWeights = explain ? new ArrayList<>() : null;
for (ModelNode node : nodes.values()) {
if (node.containsShard(shard) && explain == false) {
// decision is NO without needing to check anything further, so short circuit
continue;
}
// weight of this index currently on the node
float currentWeight = weight.weight(this, node, shard.getIndexName());
// moving the shard would not improve the balance, and we are not in explain mode, so short circuit
if (currentWeight > minWeight && explain == false) {
continue;
}
Decision currentDecision = allocation.deciders().canAllocate(shard, node.getRoutingNode(), allocation);
if (explain) {
nodeExplanationMap.put(node.getNodeId(), new NodeAllocationResult(node.getRoutingNode().node(), currentDecision, 0));
nodeWeights.add(Tuple.tuple(node.getNodeId(), currentWeight));
}
if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) {
final boolean updateMinNode;
if (currentWeight == minWeight) {
/* we have an equal weight tie breaking:
* 1. if one decision is YES prefer it
* 2. prefer the node that holds the primary for this index with the next id in the ring ie.
* for the 3 shards 2 replica case we try to build up:
* 1 2 0
* 2 0 1
* 0 1 2
* such that if we need to tie-break we try to prefer the node holding a shard with the minimal id greater
* than the id of the shard we need to assign. This works fine when new indices are created since
* primaries are added first and we only add one shard set at a time in this algorithm.
*/
if (currentDecision.type() == decision.type()) {
final int repId = shard.id();
final int nodeHigh = node.highestPrimary(shard.index().getName());
final int minNodeHigh = minNode.highestPrimary(shard.getIndexName());
updateMinNode = ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId))
&& (nodeHigh < minNodeHigh)) || (nodeHigh > repId && minNodeHigh < repId));
} else {
updateMinNode = currentDecision.type() == Type.YES;
}
} else {
updateMinNode = currentWeight < minWeight;
}
if (updateMinNode) {
minNode = node;
minWeight = currentWeight;
decision = currentDecision;
}
}
}
if (decision == null) {
// decision was not set and a node was not assigned, so treat it as a NO decision
decision = Decision.NO;
}
List<NodeAllocationResult> nodeDecisions = null;
if (explain) {
nodeDecisions = new ArrayList<>();
// fill in the correct weight ranking, once we've been through all nodes
nodeWeights.sort((nodeWeight1, nodeWeight2) -> Float.compare(nodeWeight1.v2(), nodeWeight2.v2()));
int weightRanking = 0;
for (Tuple<String, Float> nodeWeight : nodeWeights) {
NodeAllocationResult current = nodeExplanationMap.get(nodeWeight.v1());
nodeDecisions.add(new NodeAllocationResult(current.getNode(), current.getCanAllocateDecision(), ++weightRanking));
}
}
return AllocateUnassignedDecision.fromDecision(decision, minNode != null ? minNode.routingNode.node() : null, nodeDecisions);
}
Once the target node has been decided, RoutingNodes.initializeShard is called to initialize the shard on that node, moving it from UNASSIGNED to INITIALIZING (recovery later brings it to STARTED):
public ShardRouting initializeShard(
ShardRouting unassignedShard,
String nodeId,
@Nullable String existingAllocationId,
long expectedSize,
RoutingChangesObserver routingChangesObserver
) {
ensureMutable();
assert unassignedShard.unassigned() : "expected an unassigned shard " + unassignedShard;
ShardRouting initializedShard = unassignedShard.initialize(nodeId, existingAllocationId, expectedSize);
node(nodeId).add(initializedShard);
inactiveShardCount++;
if (initializedShard.primary()) {
inactivePrimaryCount++;
}
addRecovery(initializedShard);
assignedShardsAdd(initializedShard);
routingChangesObserver.shardInitialized(unassignedShard, initializedShard);
return initializedShard;
}
Who drives the decisions above?
DesiredBalanceShardsAllocator
The ShardsAllocator is created when the cluster starts.
- When the index is created, reroute installs the RerouteStrategy lambda: routingAllocation -> shardsAllocator.allocate(routingAllocation, listener)
- DesiredBalanceShardsAllocator.processInput handles the input submitted above 👆
- It then runs DesiredBalanceComputer.compute
  - which computes the placement decisions, allocating primaries and replicas
  - and is also responsible for relocating shards (relocateShard)
- Finally DesiredBalanceShardsAllocator.allocate is executed
The method that ends up running 👇
//DesiredBalanceShardsAllocator.java
public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
assert MasterService.assertMasterUpdateOrTestThread() : Thread.currentThread().getName();
assert allocation.ignoreDisable() == false;
computationsSubmitted.inc();
var index = indexGenerator.incrementAndGet();
logger.debug("Executing allocate for [{}]", index);
queue.add(index, listener);
desiredBalanceComputation.onNewInput(DesiredBalanceInput.create(index, allocation));
// Starts reconciliation towards desired balance that might have not been updated with a recent calculation yet.
// This is fine as balance should have incremental rather than radical changes.
// This should speed up achieving the desired balance in cases current state is still different from it (due to THROTTLING).
reconcile(currentDesiredBalance, allocation);
}
// handles the newInput submitted above and runs desiredBalanceComputer.compute
protected void processInput(DesiredBalanceInput desiredBalanceInput) {
processNodeShutdowns(desiredBalanceInput.routingAllocation().getClusterState());
long index = desiredBalanceInput.index();
logger.debug("Starting desired balance computation for [{}]", index);
recordTime(
cumulativeComputationTime,
() -> setCurrentDesiredBalance(
desiredBalanceComputer.compute(
getInitialDesiredBalance(),
desiredBalanceInput,
pendingDesiredBalanceMoves,
this::isFresh
)
)
);
computationsExecuted.inc();
if (isFresh(desiredBalanceInput)) {
logger.debug("Desired balance computation for [{}] is completed, scheduling reconciliation", index);
computationsConverged.inc();
submitReconcileTask(currentDesiredBalance);
} else {
logger.debug("Desired balance computation for [{}] is discarded as newer one is submitted", index);
}
}
private void recordTime(CounterMetric metric, Runnable action) {
final long started = threadPool.relativeTimeInMillis();
try {
action.run(); // where the allocation strategy actually runs: desiredBalanceComputer.compute
} finally {
final long finished = threadPool.relativeTimeInMillis();
metric.inc(finished - started);
}
}
desiredBalanceComputer.compute itself is quite long, so it is not listed here; a toy sketch of its core idea follows.
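At its heart, the computation simulates allocation rounds, greedily placing shards on the lowest-weight node until the plan stops changing. The following is a heavily simplified, self-contained toy (all names are made up; it only illustrates the greedy "pick the lowest-weight node" idea, not the real algorithm):
import java.util.*;

// Toy stand-in for the desired-balance computation: place each shard on the
// node with the lowest current load, mimicking the weight function above.
public class GreedyPlacementSketch {
    public static void main(String[] args) {
        List<String> shards = List.of("s0", "s1", "s2", "s3", "s4", "s5");
        Map<String, Integer> nodeLoad = new LinkedHashMap<>();
        nodeLoad.put("nodeA", 0);
        nodeLoad.put("nodeB", 0);
        nodeLoad.put("nodeC", 0);
        Map<String, String> desired = new LinkedHashMap<>();
        for (String shard : shards) {
            String best = Collections.min(nodeLoad.entrySet(), Map.Entry.comparingByValue()).getKey();
            desired.put(shard, best);              // record the desired assignment
            nodeLoad.merge(best, 1, Integer::sum); // account for the new load
        }
        System.out.println(desired); // {s0=nodeA, s1=nodeB, s2=nodeC, s3=nodeA, s4=nodeB, s5=nodeC}
    }
}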
Summary
The most important goal of the allocation strategy is to reach a balanced state:
- Node load: keep nodes evenly loaded and avoid overloading individual nodes
- Resource usage: respect node limits such as disk space and CPU
- Data distribution: keep a shard's copies on different nodes for high availability
- Allocation policy: follow the cluster's configured allocation rules and constraints
- Node health: avoid allocating shards to unhealthy or overloaded nodes
ES completes its allocation decisions through node liveness checks combined with the AllocationDeciders.
The FilterChain and async patterns seen here are also worth borrowing in similar business systems to simplify the logic; a generic sketch of the decider chain follows.
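A generic, library-free sketch of that chain-of-deciders pattern (all types here are made up; compare it with AllocationDeciders.canAllocate above):
import java.util.List;
import java.util.function.Function;

// Generic chain: any NO vetoes immediately; THROTTLE wins over YES otherwise.
public class DeciderChain<T> {
    public enum Decision { YES, THROTTLE, NO }

    private final List<Function<T, Decision>> deciders;

    public DeciderChain(List<Function<T, Decision>> deciders) {
        this.deciders = deciders;
    }

    public Decision decide(T input) {
        Decision result = Decision.YES;
        for (Function<T, Decision> decider : deciders) {
            Decision d = decider.apply(input);
            if (d == Decision.NO) {
                return Decision.NO; // short-circuit, like a rejected allocation
            }
            if (d == Decision.THROTTLE) {
                result = Decision.THROTTLE;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        DeciderChain<Integer> chain = new DeciderChain<>(List.of(
            shards -> shards < 100 ? Decision.YES : Decision.NO,      // capacity check
            shards -> shards < 50 ? Decision.YES : Decision.THROTTLE  // recovery throttle
        ));
        System.out.println(chain.decide(10));  // YES
        System.out.println(chain.decide(70));  // THROTTLE
        System.out.println(chain.decide(120)); // NO
    }
}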
References
Elasticsearch source code 8.13
ChatGPT