Elasticsearch Source Code - Index Shard Allocation

Goals of this series:

  • How are Elasticsearch indices sharded?
  • How are Elasticsearch indices persisted?
  • How do Elasticsearch indices recover from failures?
  • How does Elasticsearch shard rebalancing work?

Source code version: 8.13

Index Shard Allocation

Triggering scenarios

  • Index creation and deletion

  • NodeJoin/NodeLeft

  • Executing a reroute command

  • Shard failure

  • Cluster startup

    • reroute
  • Updating the number of replicas

This article uses index creation as the example.

Index-level shard allocation

Example:

curl -X PUT "localhost:9200/my-index-000001?pretty"

Entry point:

TransportAction.proceed =>

TransportMasterNodeAction.doExecute =>

TransportMasterNodeAction$AsyncSingleAction.doStart =>

TransportMasterNodeAction.executeMasterOperation =>

TransportCreateIndexAction.masterOperation =>

MetadataCreateIndexService.createIndex =>

MetadataCreateIndexService.applyCreateIndexRequest =>

The core logic is as follows:

  • getIndexNumberOfRoutingShards determines the number of routing shards for the index (index.number_of_routing_shards), which drives document routing and future shard splitting

  • Build the RoutingTable

    • Index-level routing information: each index has its own routing table containing the routing information for all of its shards
    • Shard-level routing information: each shard has a ShardRouting entry recording the state (initializing, recovering, started, etc.) and location of the primary and its replicas
    • The RoutingTable determines where each index's primary and replica shards are placed
    • When a search or indexing request arrives, the RoutingTable is used to locate the relevant shards and route the request to the appropriate nodes (see the routing sketch after this list)
    • When a node fails or a new node joins, the RoutingTable is used to reassign shards, preserving availability and load balance
  • allocationService.reroute allocates the index's shards; there is a lot of code here, and it is the core functionality

    • Compute the allocation: reroute computes a new shard allocation plan from the current cluster state and the allocation policies. Factors considered include:

      • Current shard states (assigned, unassigned, initializing, etc.)
      • Node availability and health
      • Allocation policies and constraints (e.g. a shard and its replica must not end up on the same node)
      • Node resource usage (CPU, memory, etc.)
    • Execute the allocation: apply the computed plan by actually (re)assigning shards

      • Assign unassigned shards to nodes
      • Move shards off failed nodes onto healthy ones
      • Relocate shards to achieve a better balance
    • Update the cluster state: after the reallocation, publish the updated cluster state so that every node sees the latest shard assignments
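
To make the "route a request to a shard" point concrete, here is a minimal, self-contained sketch of the documented routing formula (routing_factor = num_routing_shards / num_primary_shards; shard_num = (hash(_routing) % num_routing_shards) / routing_factor). It is an illustration only: Elasticsearch hashes the _routing value (the _id by default) with Murmur3, while the stand-in below uses String.hashCode, so the resulting numbers differ from a real cluster.

//Simplified model of document-to-shard routing (illustration only, not the actual ES classes)
public class RoutingSketch {

    static int shardNum(String routing, int numPrimaryShards, int numRoutingShards) {
        int routingFactor = numRoutingShards / numPrimaryShards;
        int hash = Math.floorMod(routing.hashCode(), numRoutingShards); // ES uses Murmur3 here
        return hash / routingFactor;
    }

    public static void main(String[] args) {
        // e.g. an index with 3 primaries; number_of_routing_shards typically resolves
        // to 768 (3 * 2^8) for 3 primary shards, as computed by getIndexNumberOfRoutingShards
        System.out.println(shardNum("doc-1", 3, 768));
        System.out.println(shardNum("doc-2", 3, 768));
    }
}

The applyCreateIndexRequest fragment that drives the steps above is shown next: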

//applyCreateIndexRequest snippet
        ......
        int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null);
        IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(aggregatedIndexSettings, request, routingNumShards);

        return applyCreateIndexWithTemporaryService(
            currentState,
            request,
            silent,
            null,
            tmpImd,
            mappings,
            indexService -> resolveAndValidateAliases(
                request.index(),
                // data stream aliases are created separately in MetadataCreateDataStreamService::createDataStream
                isDataStream ? Set.of() : request.aliases(),
                isDataStream ? List.of() : MetadataIndexTemplateService.resolveAliases(currentState.metadata(), templateName),
                currentState.metadata(),
                xContentRegistry,
                // the context is used ony for validation so it's fine to pass fake values for the shard id and the current timestamp
                indexService.newSearchExecutionContext(0, 0, null, () -> 0L, null, emptyMap()),
                IndexService.dateMathExpressionResolverAt(request.getNameResolvedAt()),
                systemIndices::isSystemName
            ),
            Collections.singletonList(templateName),
            metadataTransformer,
            rerouteListener
        );
/**
     * Given the state and a request as well as the metadata necessary to build a new index,
     * validate the configuration with an actual index service as return a new cluster state with
     * the index added (and rerouted)
     * @param currentState the current state to base the new state off of
     * @param request the create index request
     * @param silent a boolean for whether logging should be at a lower or higher level
     * @param sourceMetadata when recovering from an existing index, metadata that should be copied to the new index
     * @param temporaryIndexMeta metadata for the new index built from templates, source metadata, and request settings
     * @param mappings a list of all mapping definitions to apply, in order
     * @param aliasSupplier a function that takes the real {@link IndexService} and returns a list of {@link AliasMetadata} aliases
     * @param templatesApplied a list of the names of the templates applied, for logging
     * @param metadataTransformer if provided, a function that may alter cluster metadata in the same cluster state update that
     *                            creates the index
     * @return a new cluster state with the index added
     */
    private ClusterState applyCreateIndexWithTemporaryService(
        final ClusterState currentState,
        final CreateIndexClusterStateUpdateRequest request,
        final boolean silent,
        final IndexMetadata sourceMetadata,
        final IndexMetadata temporaryIndexMeta,
        final List<CompressedXContent> mappings,
        final Function<IndexService, List<AliasMetadata>> aliasSupplier,
        final List<String> templatesApplied,
        final BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer,
        final ActionListener<Void> rerouteListener
    ) throws Exception {
        // create the index here (on the master) to validate it can be created, as well as adding the mapping
        return indicesService.<ClusterState, Exception>withTempIndexService(temporaryIndexMeta, indexService -> {
            try {
                updateIndexMappingsAndBuildSortOrder(indexService, request, mappings, sourceMetadata);
            } catch (Exception e) {
                logger.log(silent ? Level.DEBUG : Level.INFO, "failed on parsing mappings on index creation [{}]", request.index(), e);
                throw e;
            }

            final List<AliasMetadata> aliases = aliasSupplier.apply(indexService);

            final IndexMetadata indexMetadata;
            try {
                indexMetadata = buildIndexMetadata(
                    request.index(),
                    aliases,
                    indexService.mapperService()::documentMapper,
                    temporaryIndexMeta.getSettings(),
                    temporaryIndexMeta.getRoutingNumShards(),
                    sourceMetadata,
                    temporaryIndexMeta.isSystem()
                );
            } catch (Exception e) {
               ......
            }

           ......
            indexService.getIndexEventListener().beforeIndexAddedToCluster(indexMetadata.getIndex(), indexMetadata.getSettings());

            ClusterState updated = clusterStateCreateIndex(
                currentState,
                request.blocks(),
                indexMetadata,
                metadataTransformer,
                allocationService.getShardRoutingRoleStrategy()
            );
            if (request.performReroute()) {
                updated = allocationService.reroute(updated, "index [" + indexMetadata.getIndex().getName() + "] created", rerouteListener);
            }
            return updated;
        });
    }
public ClusterState reroute(ClusterState clusterState, String reason, ActionListener<Void> listener) {
        return executeWithRoutingAllocation(
            clusterState,
            reason,
            routingAllocation -> shardsAllocator.allocate(routingAllocation, listener)
        );
    }
//AllocationService.java

public ClusterState executeWithRoutingAllocation(ClusterState clusterState, String reason, RerouteStrategy rerouteStrategy) {
        ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);
        RoutingAllocation allocation = createRoutingAllocation(fixedClusterState, currentNanoTime());
        reroute(allocation, rerouteStrategy);
        if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
            return clusterState;
        }
        return buildResultAndLogHealthChange(clusterState, allocation, reason);
    }

private void reroute(RoutingAllocation allocation, RerouteStrategy rerouteStrategy) {
        assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
        assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metadata(), () -> allocation).isEmpty()
            : "auto-expand replicas out of sync with number of nodes in the cluster";
        assert assertInitialized();
        rerouteStrategy.removeDelayMarkers(allocation);
        allocateExistingUnassignedShards(allocation); // try to allocate existing shard copies first
        rerouteStrategy.execute(allocation);
        assert RoutingNodes.assertShardStats(allocation.routingNodes());
    }
  • How is it decided which node a shard lands on? AllocationDeciders

    AllocationDeciders is a collection of decision modules that, during shard allocation and rebalancing, evaluate whether a given shard may be placed on a given node. Each AllocationDecider encapsulates one specific rule or policy, and the combined decisions determine the final placement.

public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
        return withDeciders(
            allocation,
            decider -> decider.canAllocate(shardRouting, allocation),
            (decider, decision) -> Strings.format("Can not allocate [%s] on any node. [%s]: %s", shardRouting, decider, decision)
        );
    }
  • EnableAllocationDecider: based on the ALLOCATION_ENABLE_SETTING value (ALL / PRIMARIES / NEW_PRIMARIES / NONE), decides which kinds of shards may be allocated at all
  • DiskThresholdDecider: makes decisions based on disk usage, ensuring a node has enough disk space for a new shard and preventing problems caused by overly full disks
  • SameShardAllocationDecider: ensures that copies of the same shard (primary and replicas) are not allocated to the same node, so a single node failure cannot lose the data
  • ThrottlingAllocationDecider: limits how many shards can be allocated or recovered at the same time, to avoid overloading the cluster
  • NodeVersionAllocationDecider: decides based on node versions, ensuring shards are not allocated to nodes running an older version, avoiding compatibility issues
  • AwarenessAllocationDecider: controls where shards go based on node attributes such as rack or zone awareness
  • ReplicaAfterPrimaryActiveAllocationDecider: ensures replicas are only allocated after the primary has become active, preserving data consistency
  • ................
  • A node is only eligible if it passes all of the deciders above
  • Behavior is configurable via cluster settings
  • Filter-chain style design, extensible with custom deciders (see the sketch below)
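
As a rough illustration of that extension point (a sketch only, assuming the 8.x AllocationDecider API; the node attribute box_type and the class name are made up for this example), a custom decider simply extends AllocationDecider and votes per shard/node pair. Custom deciders can be contributed from a plugin via ClusterPlugin#createAllocationDeciders.

//Hypothetical custom decider - a sketch, not part of Elasticsearch
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;

public class HotNodeOnlyAllocationDecider extends AllocationDecider {

    @Override
    public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        // only allow allocation onto nodes tagged with the (made-up) node attribute box_type=hot
        String boxType = node.node().getAttributes().get("box_type");
        return "hot".equals(boxType) ? Decision.YES : Decision.NO;
    }
}

AllocationDeciders then combines the individual votes: a single NO vetoes the node, a THROTTLE defers the allocation, and only an all-YES node is fully eligible, which is exactly the filter-chain behavior described above.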

The evaluation flow is as follows:

  • Evaluate every node: for each shard awaiting allocation, every node in the cluster is evaluated in turn

  • Invoke every AllocationDecider: for each node, all AllocationDeciders are consulted to produce a decision

  • Combine the results: if any AllocationDecider rejects the allocation, the node is not a valid target; the shard is only placed on a node that every decider allows

  • Pick the best node: among all nodes that allow the allocation, the most suitable one is chosen

    • The decideAllocateUnassigned method finds a suitable node for an unassigned shard and returns the allocation decision together with the target node
    • It first calls allocation.deciders().canAllocate(shard, allocation) for a shard-level decision; if the result is NO and no explanation is required (explain == false), it returns AllocateUnassignedDecision.no immediately, meaning the shard cannot be allocated
    • It then iterates over all nodes, computing each node's weight for the shard's index (a small worked example follows this list)
    float weight(Balancer balancer, ModelNode node, String index) {
              final float weightShard = node.numShards() - balancer.avgShardsPerNode();
              final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
              final float ingestLoad = (float) (node.writeLoad() - balancer.avgWriteLoadPerNode());
              final float diskUsage = (float) (node.diskUsageInBytes() - balancer.avgDiskUsageInBytesPerNode());
              return theta0 * weightShard + theta1 * weightIndex + theta2 * ingestLoad + theta3 * diskUsage;
          }
    
    • For each node it calls allocation.deciders().canAllocate(shard, node.getRoutingNode(), allocation) to get a node-specific decision
    • If the node's decision allows allocation (YES or THROTTLE) and its weight is lower than the current minimum, the best node and the minimum weight are updated
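
Before walking through decideAllocateUnassigned itself, here is a tiny, self-contained worked example of the weight formula with made-up numbers (the two factors below are illustrative placeholders for the thetas, which are really derived from the cluster.routing.allocation.balance.* settings; write load and disk usage are ignored):

//Worked example of the weight comparison with made-up inputs - not ES code
public class WeightExample {
    public static void main(String[] args) {
        float theta0 = 0.45f; // contribution of total-shard-count imbalance (illustrative)
        float theta1 = 0.55f; // contribution of per-index shard-count imbalance (illustrative)

        float avgShardsPerNode = 8.0f;      // cluster-wide average shards per node
        float avgIndexShardsPerNode = 1.0f; // average shards of this index per node

        // node A holds 10 shards in total, 2 of them from this index
        float weightA = theta0 * (10 - avgShardsPerNode) + theta1 * (2 - avgIndexShardsPerNode);
        // node B holds 6 shards in total, none from this index
        float weightB = theta0 * (6 - avgShardsPerNode) + theta1 * (0 - avgIndexShardsPerNode);

        // prints weight(A)=1.45 weight(B)=-1.45 - the lower weight wins, so node B is preferred
        System.out.printf("weight(A)=%.2f weight(B)=%.2f%n", weightA, weightB);
    }
}
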
/**
         * Make a decision for allocating an unassigned shard. This method returns a two values in a tuple: the
         * first value is the {@link Decision} taken to allocate the unassigned shard, the second value is the
         * {@link ModelNode} representing the node that the shard should be assigned to. If the decision returned
         * is of type {@link Type#NO}, then the assigned node will be null.
         */
        private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) {
            if (shard.assignedToNode()) {
                // we only make decisions for unassigned shards here
                return AllocateUnassignedDecision.NOT_TAKEN;
            }

            final boolean explain = allocation.debugDecision();
            Decision shardLevelDecision = allocation.deciders().canAllocate(shard, allocation);
            if (shardLevelDecision.type() == Type.NO && explain == false) {
                // NO decision for allocating the shard, irrespective of any particular node, so exit early
                return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, null);
            }

            /* find an node with minimal weight we can allocate on*/
            float minWeight = Float.POSITIVE_INFINITY;
            ModelNode minNode = null;
            Decision decision = null;
            /* Don't iterate over an identity hashset here the
             * iteration order is different for each run and makes testing hard */
            Map<String, NodeAllocationResult> nodeExplanationMap = explain ? new HashMap<>() : null;
            List<Tuple<String, Float>> nodeWeights = explain ? new ArrayList<>() : null;
            for (ModelNode node : nodes.values()) {
                if (node.containsShard(shard) && explain == false) {
                    // decision is NO without needing to check anything further, so short circuit
                    continue;
                }

                // weight of this index currently on the node
                float currentWeight = weight.weight(this, node, shard.getIndexName());
                // moving the shard would not improve the balance, and we are not in explain mode, so short circuit
                if (currentWeight > minWeight && explain == false) {
                    continue;
                }

                Decision currentDecision = allocation.deciders().canAllocate(shard, node.getRoutingNode(), allocation);
                if (explain) {
                    nodeExplanationMap.put(node.getNodeId(), new NodeAllocationResult(node.getRoutingNode().node(), currentDecision, 0));
                    nodeWeights.add(Tuple.tuple(node.getNodeId(), currentWeight));
                }
                if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) {
                    final boolean updateMinNode;
                    if (currentWeight == minWeight) {
                        /*  we have an equal weight tie breaking:
                         *  1. if one decision is YES prefer it
                         *  2. prefer the node that holds the primary for this index with the next id in the ring ie.
                         *  for the 3 shards 2 replica case we try to build up:
                         *    1 2 0
                         *    2 0 1
                         *    0 1 2
                         *  such that if we need to tie-break we try to prefer the node holding a shard with the minimal id greater
                         *  than the id of the shard we need to assign. This works find when new indices are created since
                         *  primaries are added first and we only add one shard set a time in this algorithm.
                         */
                        if (currentDecision.type() == decision.type()) {
                            final int repId = shard.id();
                            final int nodeHigh = node.highestPrimary(shard.index().getName());
                            final int minNodeHigh = minNode.highestPrimary(shard.getIndexName());
                            updateMinNode = ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId))
                                && (nodeHigh < minNodeHigh)) || (nodeHigh > repId && minNodeHigh < repId));
                        } else {
                            updateMinNode = currentDecision.type() == Type.YES;
                        }
                    } else {
                        updateMinNode = currentWeight < minWeight;
                    }
                    if (updateMinNode) {
                        minNode = node;
                        minWeight = currentWeight;
                        decision = currentDecision;
                    }
                }
            }
            if (decision == null) {
                // decision was not set and a node was not assigned, so treat it as a NO decision
                decision = Decision.NO;
            }
            List<NodeAllocationResult> nodeDecisions = null;
            if (explain) {
                nodeDecisions = new ArrayList<>();
                // fill in the correct weight ranking, once we've been through all nodes
                nodeWeights.sort((nodeWeight1, nodeWeight2) -> Float.compare(nodeWeight1.v2(), nodeWeight2.v2()));
                int weightRanking = 0;
                for (Tuple<String, Float> nodeWeight : nodeWeights) {
                    NodeAllocationResult current = nodeExplanationMap.get(nodeWeight.v1());
                    nodeDecisions.add(new NodeAllocationResult(current.getNode(), current.getCanAllocateDecision(), ++weightRanking));
                }
            }
            return AllocateUnassignedDecision.fromDecision(decision, minNode != null ? minNode.routingNode.node() : null, nodeDecisions);
        }

Once the target node has been decided, RoutingNodes.initializeShard is finally called to initialize the shard:

public ShardRouting initializeShard(
        ShardRouting unassignedShard,
        String nodeId,
        @Nullable String existingAllocationId,
        long expectedSize,
        RoutingChangesObserver routingChangesObserver
    ) {
        ensureMutable();
        assert unassignedShard.unassigned() : "expected an unassigned shard " + unassignedShard;
        ShardRouting initializedShard = unassignedShard.initialize(nodeId, existingAllocationId, expectedSize);
        node(nodeId).add(initializedShard);
        inactiveShardCount++;
        if (initializedShard.primary()) {
            inactivePrimaryCount++;
        }
        addRecovery(initializedShard);
        assignedShardsAdd(initializedShard);
        routingChangesObserver.shardInitialized(unassignedShard, initializedShard);
        return initializedShard;
    }

Who invokes the decision logic above?

DesiredBalanceShardsAllocator

The ShardsAllocator is created when the cluster (node) starts up.

  • When the index is created, reroute sets up the RerouteStrategy lambda: routingAllocation -> shardsAllocator.allocate(routingAllocation, listener)

  • DesiredBalanceShardsAllocator.processInput handles the request submitted above

  • It then runs DesiredBalanceComputer.compute

    • Computes the shard placement decisions, allocating primaries and replicas
    • It is also responsible for shard relocation (relocateShard)
  • Then DesiredBalanceShardsAllocator.allocate is executed

The methods ultimately executed are shown below:

//DesiredBalanceShardsAllocator.java
public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
        assert MasterService.assertMasterUpdateOrTestThread() : Thread.currentThread().getName();
        assert allocation.ignoreDisable() == false;

        computationsSubmitted.inc();

        var index = indexGenerator.incrementAndGet();
        logger.debug("Executing allocate for [{}]", index);
        queue.add(index, listener);
        desiredBalanceComputation.onNewInput(DesiredBalanceInput.create(index, allocation));

        // Starts reconciliation towards desired balance that might have not been updated with a recent calculation yet.
        // This is fine as balance should have incremental rather than radical changes.
        // This should speed up achieving the desired balance in cases current state is still different from it (due to THROTTLING).
        reconcile(currentDesiredBalance, allocation);
    }
//handles the newInput request and runs desiredBalanceComputer.compute
 protected void processInput(DesiredBalanceInput desiredBalanceInput) {
                processNodeShutdowns(desiredBalanceInput.routingAllocation().getClusterState());

                long index = desiredBalanceInput.index();
                logger.debug("Starting desired balance computation for [{}]", index);

                recordTime(
                    cumulativeComputationTime,
                    () -> setCurrentDesiredBalance(
                        desiredBalanceComputer.compute(
                            getInitialDesiredBalance(),
                            desiredBalanceInput,
                            pendingDesiredBalanceMoves,
                            this::isFresh
                        )
                    )
                );
                computationsExecuted.inc();
                if (isFresh(desiredBalanceInput)) {
                    logger.debug("Desired balance computation for [{}] is completed, scheduling reconciliation", index);
                    computationsConverged.inc();
                    submitReconcileTask(currentDesiredBalance);
                } else {
                    logger.debug("Desired balance computation for [{}] is discarded as newer one is submitted", index);
                }
            }
private void recordTime(CounterMetric metric, Runnable action) {
        final long started = threadPool.relativeTimeInMillis();
        try {
            action.run();//this is where the shard placement strategy runs: desiredBalanceComputer.compute
        } finally {
            final long finished = threadPool.relativeTimeInMillis();
            metric.inc(finished - started);
        }
    }

desiredBalanceComputer.compute is fairly large, so its code is not listed here.
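
Conceptually (a hedged reading, not an exact description of the implementation), compute repeatedly runs the balancer on a simulated copy of the routing state, treating initializing shards as if their recoveries had already completed, until the simulation reaches a fixed point or the input becomes stale; that fixed point is the desired balance, which reconciliation then moves the real cluster towards. The toy loop below captures only that "iterate until a fixed point" idea, with made-up node names and plain shard counts, nothing more:

//Toy model of "compute a desired balance by iterating until convergence" - not ES code
import java.util.HashMap;
import java.util.Map;

public class DesiredBalanceToy {
    public static void main(String[] args) {
        // shard counts per node before balancing (made-up input)
        Map<String, Integer> shardsPerNode = new HashMap<>(Map.of("node-1", 7, "node-2", 1, "node-3", 1));

        // keep moving one shard from the fullest node to the emptiest node
        // until no move improves the balance; the fixed point is the "desired balance"
        while (true) {
            String max = shardsPerNode.entrySet().stream().max(Map.Entry.comparingByValue()).get().getKey();
            String min = shardsPerNode.entrySet().stream().min(Map.Entry.comparingByValue()).get().getKey();
            if (shardsPerNode.get(max) - shardsPerNode.get(min) <= 1) {
                break; // converged
            }
            shardsPerNode.merge(max, -1, Integer::sum);
            shardsPerNode.merge(min, 1, Integer::sum);
        }
        System.out.println("desired balance: " + shardsPerNode); // e.g. {node-1=3, node-2=3, node-3=3}
    }
}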

Summary

The most important goal of the allocation strategy is to reach a balanced state:

  • Node load: keep load balanced across nodes and avoid overloading any single node
  • Resource usage: take node resource limits such as disk space and CPU usage into account
  • Data distribution: make sure a shard and its replicas live on different nodes, for high availability
  • Allocation policies: follow the allocation policies and constraints configured for the cluster
  • Node health: avoid allocating shards to unhealthy or overloaded nodes

ES makes allocation decisions through node liveness checks together with the AllocationDeciders.

The filter-chain and async design patterns used here are also worth adopting in similar business systems to simplify code.

References

Source code, version 8.13

ChatGPT