本文将对Kubernetes中controller-manager的源码进行分析。分析的时间为2017年08月19日。采用主干的代码,commitId为2ab7ad14b4fad378a4a69a64c587497d77e60f44

Replicaset Controller 初始化过程

Replicaset Controller 的创建其实可以理解为从controllers["replicaset"] = startReplicaSetController开始的。

startReplicaSetController函数中,首先判断replicasets资源是否可用。然后创建并初始化一个ReplicaSetController对象。最后在新的协程中启动ReplicaSetController对象的Run函数,进入循环处理流程。

// startReplicaSetController launches the ReplicaSet controller.
//
// It returns (false, nil) without starting anything when the
// extensions/v1beta1 "replicasets" resource is not marked available in
// ctx.AvailableResources. Otherwise it builds the controller and runs it in a
// new goroutine with ctx.Options.ConcurrentRSSyncs workers, returning
// (true, nil).
func startReplicaSetController(ctx ControllerContext) (bool, error) {
    replicaSetsResource := schema.GroupVersionResource{
        Group:    "extensions",
        Version:  "v1beta1",
        Resource: "replicasets",
    }
    if available := ctx.AvailableResources[replicaSetsResource]; !available {
        return false, nil
    }

    // The controller is constructed synchronously; only Run executes in the
    // new goroutine.
    rsController := replicaset.NewReplicaSetController(
        ctx.InformerFactory.Extensions().V1beta1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
        replicaset.BurstReplicas,
    )
    workerCount := int(ctx.Options.ConcurrentRSSyncs)
    go rsController.Run(workerCount, ctx.Stop)

    return true, nil
}

Replicaset Controller 创建对象

创建Replicaset Controller 对象的过程,在NewReplicaSetController中完成。具体的步骤包括以下几个:(1)创建eventBroadcaster并设置对应的属性;(2)创建ReplicaSetController对象;(3)设置rsInformer和podInformer对应的事件回调函数;(4)设置rsc.syncHandler为rsc.syncReplicaSet函数。

// NewReplicaSetController builds a ReplicaSetController wired to the given
// ReplicaSet and Pod informers.
//
// Steps performed, in order:
//   1. register rate-limiter metrics for the client (only when both the
//      client and its rate limiter are non-nil);
//   2. create an event broadcaster that logs through glog and records to an
//      event sink built from the client;
//   3. construct the controller (pod control, burst limit, expectations and
//      a named rate-limited work queue);
//   4. register informer event handlers and store listers / HasSynced funcs;
//   5. point syncHandler at rsc.syncReplicaSet.
//
// NOTE(review): kubeClient is nil-checked only for the metrics registration;
// the event sink below uses it unconditionally — confirm callers never pass
// a nil client.
func NewReplicaSetController(rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
    if kubeClient != nil {
        if limiter := kubeClient.Core().RESTClient().GetRateLimiter(); limiter != nil {
            metrics.RegisterMetricAndTrackRateLimiterUsage("replicaset_controller", limiter)
        }
    }

    // Event broadcaster: log every event and forward it to the event sink.
    broadcaster := record.NewBroadcaster()
    broadcaster.StartLogging(glog.Infof)
    eventSink := &v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")}
    broadcaster.StartRecordingToSink(eventSink)

    // The controller object itself.
    recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"})
    rsc := &ReplicaSetController{
        kubeClient:    kubeClient,
        podControl:    controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},
        burstReplicas: burstReplicas,
        expectations:  controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
        queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicaset"),
    }

    // ReplicaSet informer callbacks.
    replicaSetHandlers := cache.ResourceEventHandlerFuncs{
        AddFunc:    rsc.enqueueReplicaSet,
        UpdateFunc: rsc.updateRS,
        // This will enter the sync loop and no-op, because the replica set has been deleted from the store.
        // Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
        // way of achieving this is by performing a `stop` operation on the replica set.
        DeleteFunc: rsc.enqueueReplicaSet,
    }
    rsInformer.Informer().AddEventHandler(replicaSetHandlers)
    rsc.rsLister = rsInformer.Lister()
    rsc.rsListerSynced = rsInformer.Informer().HasSynced

    // Pod informer callbacks.
    podHandlers := cache.ResourceEventHandlerFuncs{
        AddFunc: rsc.addPod,
        // This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
        // overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
        // local storage, so it should be ok.
        UpdateFunc: rsc.updatePod,
        DeleteFunc: rsc.deletePod,
    }
    podInformer.Informer().AddEventHandler(podHandlers)
    rsc.podLister = podInformer.Lister()
    rsc.podListerSynced = podInformer.Informer().HasSynced

    // Function the workers call for every key taken off the queue.
    rsc.syncHandler = rsc.syncReplicaSet

    return rsc
}

Replicaset Controller 中Run函数的处理

// Run begins watching and syncing.
//
// It waits for the pod and ReplicaSet informer caches via
// controller.WaitForCacheSync and returns early when that call reports false.
// Otherwise it starts `workers` goroutines, each running rsc.worker through
// wait.Until with a one-second period, and then blocks on stopCh. The work
// queue is shut down when Run returns.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer rsc.queue.ShutDown()

    glog.Infof("Starting replica set controller")
    defer glog.Infof("Shutting down replica set Controller")

    // Do not start processing until the informer caches are ready.
    cachesSynced := controller.WaitForCacheSync("replica set", stopCh, rsc.podListerSynced, rsc.rsListerSynced)
    if !cachesSynced {
        return
    }

    // Launch the worker goroutines.
    for remaining := workers; remaining > 0; remaining-- {
        go wait.Until(rsc.worker, time.Second, stopCh)
    }

    <-stopCh
}

在worker协程中,循环调用processNextWorkItem,转到对应的处理函数。

// worker is the body of a single worker goroutine: it keeps calling
// processNextWorkItem, which dequeues an item, processes it and marks it done,
// until that call returns false.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rsc *ReplicaSetController) worker() {
    for {
        if !rsc.processNextWorkItem() {
            return
        }
    }
}

// processNextWorkItem takes one key from the work queue and hands it to
// rsc.syncHandler.
//
// It returns false only when queue.Get reports that the queue is shutting
// down. In every other case it returns true: on success the key is passed to
// queue.Forget; on failure the error is reported through
// utilruntime.HandleError and the key is re-added with AddRateLimited. The key
// is always marked Done on the way out.
func (rsc *ReplicaSetController) processNextWorkItem() bool {
    item, shuttingDown := rsc.queue.Get()
    if shuttingDown {
        return false
    }
    defer rsc.queue.Done(item)

    if syncErr := rsc.syncHandler(item.(string)); syncErr != nil {
        utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", item, syncErr))
        rsc.queue.AddRateLimited(item)
        return true
    }

    rsc.queue.Forget(item)
    return true
}

在处理函数中,实际的处理逻辑为:从queue中取出相应的key,然后调用syncHandler设置的函数进行处理。

Replicaset Controller 中syncHandler函数的处理

在 Replicaset Controller中syncHandler被设置为rsc.syncReplicaSet。rsc.syncReplicaSet中的处理为:

// syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key.
//
// Steps: resolve the key to a ReplicaSet from the lister, list and filter the
// pods in its namespace, claim the pods that belong to it, adjust the pod
// count through manageReplicas (when allowed), then compute and write back the
// ReplicaSet status. The error from manageReplicas, if any, is returned last
// so that the status update still happens.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
    startTime := time.Now()
    defer func() {
        glog.V(4).Infof("Finished syncing replica set %q (%v)", key, time.Now().Sub(startTime))
    }()

    // Split the queue key into namespace and name.
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }

    // Look up the ReplicaSet through the lister. If it is gone, drop its
    // expectations and finish without error.
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    if errors.IsNotFound(err) {
        glog.V(4).Infof("ReplicaSet has been deleted %v", key)
        rsc.expectations.DeleteExpectations(key)
        return nil
    }
    if err != nil {
        return err
    }

    // Check whether the expectations recorded for this key are satisfied; the
    // result gates the manageReplicas call further down.
    rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)

    // Convert the ReplicaSet's label selector into a selector object. A
    // conversion failure is reported via HandleError and nil is returned.
    selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Error converting pod selector to selector: %v", err))
        return nil
    }

    // List every pod in the ReplicaSet's namespace:
    // list all pods to include the pods that don't match the rs`s selector
    // anymore but has the stale controller ref.
    // TODO: Do the List and Filter in a single pass, or use an index.
    allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    if err != nil {
        return err
    }

    // Ignore inactive pods.
    // Only pods accepted by controller.IsPodActive are kept (the original
    // annotation says this drops pods in phase v1.PodSucceeded / v1.PodFailed
    // — confirm against IsPodActive).
    var filteredPods []*v1.Pod
    for _, pod := range allPods {
        if controller.IsPodActive(pod) {
            filteredPods = append(filteredPods, pod)
        }
    }

    // canAdoptFunc fetches the ReplicaSet directly through kubeClient instead
    // of the lister and fails if its UID differs from the one we hold; the
    // fetched object is handed to controller.RecheckDeletionTimestamp.
    // If any adoptions are attempted, we should first recheck for deletion with
    // an uncached quorum read sometime after listing Pods (see #42639).
    canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
        fresh, err := rsc.kubeClient.ExtensionsV1beta1().ReplicaSets(rs.Namespace).Get(rs.Name, metav1.GetOptions{})
        if err != nil {
            return nil, err
        }
        if fresh.UID != rs.UID {
            return nil, fmt.Errorf("original ReplicaSet %v/%v is gone: got uid %v, wanted %v", rs.Namespace, rs.Name, fresh.UID, rs.UID)
        }
        return fresh, nil
    })

    // Build the PodControllerRefManager that performs the pod claiming below.
    cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, controllerKind, canAdoptFunc)
    // NOTE: filteredPods are pointing to objects from cache - if you need to
    // modify them, you need to copy it first.
    // ClaimPods narrows filteredPods down to the pods that belong to this
    // ReplicaSet, including orphans it adopts. This is the step that decides
    // pod ownership.
    filteredPods, err = cm.ClaimPods(filteredPods)
    if err != nil {
        return err
    }

    // Core step: reconcile the number of pods with the ReplicaSet spec. Skipped
    // when expectations are not yet satisfied or the ReplicaSet is being deleted.
    var manageReplicasErr error
    if rsNeedsSync && rs.DeletionTimestamp == nil {
        manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
    }

    // Continue with a deep copy of the ReplicaSet (presumably so the object
    // held by the lister cache is not mutated by the status update — confirm).
    copy, err := scheme.Scheme.DeepCopy(rs)
    if err != nil {
        return err
    }
    rs = copy.(*extensions.ReplicaSet)

    // Compute the status to report.
    newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

    // Write the new status back.
    // NOTE(review): the original annotation states the update is retried once
    // on failure; that is not visible here — confirm in updateReplicaSetStatus.
    // Always updates status as pods come up or die.
    updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.Extensions().ReplicaSets(rs.Namespace), rs, newStatus)
    if err != nil {
        // Multiple things could lead to this update failing. Requeuing the replica set ensures
        // Returning an error causes a requeue without forcing a hotloop
        return err
    }

    // When MinReadySeconds is set, every replica is ready but not all are
    // available yet, and manageReplicas did not fail, enqueue the ReplicaSet
    // again after MinReadySeconds.
    // (Per the original annotation the delayed add is implemented by AddAfter in
    // staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go — confirm
    // that enqueueReplicaSetAfter uses it.)
    // Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
    if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
        updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
        updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
        rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
    }

    // Done: surface the manageReplicas error, which is nil when that step
    // succeeded or was skipped.
    return manageReplicasErr
}

上面的过程是RS的处理过程,其中主要涉及到Pod的选择和RS对Pod的管理。接下来我们将分别对Pod的选择与Pod的管理进行分析。

Replicaset Controller 中ClaimPods的处理

在Replicaset Controller中,获取到namespace下所有的Pod后,会进入PodControllerRefManager对象的ClaimPods函数,判断哪些Pod是属于自己的。

ClaimPods函数的具体内容如下:

// ClaimPods runs ClaimObject over every pod in pods and returns the ones that
// end up claimed.
//
// A pod counts as matching when m.Selector matches its labels and every
// optional filter returns true. An error for one pod does not stop the loop:
// errors are collected and returned together via utilerrors.NewAggregate,
// alongside whatever pods were claimed.
func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {
    // matches reports whether a pod satisfies the selector and all filters.
    // Check selector first so filters only run on potentially matching Pods.
    matches := func(obj metav1.Object) bool {
        p := obj.(*v1.Pod)
        if !m.Selector.Matches(labels.Set(p.Labels)) {
            return false
        }
        for i := range filters {
            if !filters[i](p) {
                return false
            }
        }
        return true
    }

    // Thin adapters from metav1.Object to the pod-typed adopt/release calls.
    adoptPod := func(obj metav1.Object) error { return m.AdoptPod(obj.(*v1.Pod)) }
    releasePod := func(obj metav1.Object) error { return m.ReleasePod(obj.(*v1.Pod)) }

    var (
        claimed  []*v1.Pod
        failures []error
    )
    // Decide pod by pod whether it belongs to the controller.
    for _, pod := range pods {
        owned, err := m.ClaimObject(pod, matches, adoptPod, releasePod)
        switch {
        case err != nil:
            failures = append(failures, err)
        case owned:
            claimed = append(claimed, pod)
        }
    }
    return claimed, utilerrors.NewAggregate(failures)
}

ClaimObject中的处理:

// ClaimObject decides whether obj should be counted as owned by m.Controller,
// adopting or releasing it through the supplied callbacks when needed.
//
// Outcomes:
//   - obj has a controller reference whose UID differs from the controller's:
//     not claimed.
//   - obj is controlled by m.Controller and match(obj) is true: claimed, with
//     no further action.
//   - obj is controlled by m.Controller but no longer matches: release is
//     attempted unless the controller has a deletion timestamp; not claimed.
//   - obj has no controller reference (orphan): adopt is attempted only when
//     the controller has no deletion timestamp, match(obj) is true and obj has
//     no deletion timestamp; claimed only if adopt succeeds.
//
// NotFound errors from adopt/release are ignored; any other error is returned
// so the caller can requeue and try again.
func (m *BaseControllerRefManager) ClaimObject(obj metav1.Object, match func(metav1.Object) bool, adopt, release func(metav1.Object) error) (bool, error) {
    ownerRef := metav1.GetControllerOf(obj)

    // It's an orphan: nobody controls obj yet.
    if ownerRef == nil {
        controllerDeleting := m.Controller.GetDeletionTimestamp() != nil
        if controllerDeleting || !match(obj) {
            // Ignore if we're being deleted or selector doesn't match.
            return false, nil
        }
        if obj.GetDeletionTimestamp() != nil {
            // Ignore if the object is being deleted
            return false, nil
        }
        // Selector matches. Try to adopt.
        switch err := adopt(obj); {
        case err == nil:
            // Successfully adopted.
            return true, nil
        case errors.IsNotFound(err):
            // If the pod no longer exists, ignore the error.
            return false, nil
        default:
            // Either someone else claimed it first, or there was a transient error.
            // The controller should requeue and try again if it's still orphaned.
            return false, err
        }
    }

    // Ownership is decided by UID: a different UID means another controller.
    if ownerRef.UID != m.Controller.GetUID() {
        // Owned by someone else. Ignore.
        return false, nil
    }

    if match(obj) {
        // We already own it and the selector matches.
        // Return true (successfully claimed) before checking deletion timestamp.
        // We're still allowed to claim things we already own while being deleted
        // because doing so requires taking no actions.
        return true, nil
    }

    // Owned by us but selector doesn't match.
    // Try to release, unless we're being deleted.
    if m.Controller.GetDeletionTimestamp() != nil {
        return false, nil
    }
    // A NotFound error means the pod no longer exists and is ignored. Any other
    // error means someone else released it or there was a transient failure;
    // the controller should requeue and try again if it's still stale.
    if err := release(obj); err != nil && !errors.IsNotFound(err) {
        return false, err
    }
    // Released (or already gone).
    return false, nil
}

Replicaset Controller 中manageReplicas函数的处理

在Replicaset Controller 中,核心的步骤是通过RS调节Pod的数量,这个在manageReplicas函数中完成。manageReplicas函数的处理代码:

// manageReplicas compares len(filteredPods) with *rs.Spec.Replicas and creates
// or deletes pods to close the gap, handling at most rsc.burstReplicas pods
// per call. Creations and deletions each run in their own goroutine, and the
// function waits for all of them before returning.
//
// It returns the first error reported by any of those goroutines, or nil. A
// failure to compute the ReplicaSet key is reported via HandleError and also
// results in a nil return.
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *extensions.ReplicaSet) error {
    // Difference between the pods we have and the replicas requested:
    // negative means too few pods, positive means too many.
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    rsKey, err := controller.KeyFunc(rs)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err))
        return nil
    }
    var errCh chan error

    // Too few pods: create the missing ones.
    if diff < 0 {
        diff *= -1
        // errCh is sized before diff is capped, so its buffer may be larger
        // than the number of goroutines started below.
        errCh = make(chan error, diff)

        // Throttle: create at most rsc.burstReplicas pods in one pass (the
        // original annotation cites 500 — confirm against replicaset.BurstReplicas).
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        // TODO: Track UIDs of creates just like deletes. The problem currently
        // is we'd need to wait on the result of a create to record the pod's
        // UID, which would require locking *across* the create, which will turn
        // into a performance bottleneck. We should generate a UID for the pod
        // beforehand and store it via ExpectCreations.
        // Record the number of creations we expect to observe for this key.
        rsc.expectations.ExpectCreations(rsKey, diff)
        var wg sync.WaitGroup
        wg.Add(diff)
        glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
        // Each pod is created in its own goroutine; wg.Wait below blocks until
        // all of them have finished.
        for i := 0; i < diff; i++ {
            go func() {
                defer wg.Done()
                var err error
                boolPtr := func(b bool) *bool { return &b }
                controllerRef := &metav1.OwnerReference{
                    APIVersion:         controllerKind.GroupVersion().String(),
                    Kind:               controllerKind.Kind,
                    Name:               rs.Name,
                    UID:                rs.UID,
                    BlockOwnerDeletion: boolPtr(true),
                    Controller:         boolPtr(true),
                }

                // Create the pod from the ReplicaSet's template with the owner
                // reference pointing back at the ReplicaSet.
                err = rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
                if err != nil && errors.IsTimeout(err) {
                    // Pod is created but its initialization has timed out.
                    // If the initialization is successful eventually, the
                    // controller will observe the creation via the informer.
                    // If the initialization fails, or if the pod keeps
                    // uninitialized for a long time, the informer will not
                    // receive any update, and the controller will create a new
                    // pod when the expectation expires.
                    return
                }
                if err != nil {
                    // Decrement the expected number of creates because the informer won't observe this pod
                    glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name)
                    rsc.expectations.CreationObserved(rsKey)
                    errCh <- err
                }
            }()
        }
        wg.Wait()
    } else if diff > 0 {
        // Too many pods: cap the number deleted in one pass as well.
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        errCh = make(chan error, diff)
        glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
        // No need to sort pods if we are about to delete all of them

        // Otherwise sort by the ordering controller.ActivePods defines; the
        // pods at the front of the slice are the ones deleted.
        if *(rs.Spec.Replicas) != 0 {
            // Sort the pods in the order such that not-ready < ready, unscheduled
            // < scheduled, and pending < running. This ensures that we delete pods
            // in the earlier stages whenever possible.
            sort.Sort(controller.ActivePods(filteredPods))
        }
        // Snapshot the UIDs (ns/name) of the pods we're expecting to see
        // deleted, so we know to record their expectations exactly once either
        // when we see it as an update of the deletion timestamp, or as a delete.
        // Note that if the labels on a pod/rs change in a way that the pod gets
        // orphaned, the rs will only wake up after the expectations have
        // expired even if other pods are deleted.
        deletedPodKeys := []string{}
        for i := 0; i < diff; i++ {
            deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
        }
        rsc.expectations.ExpectDeletions(rsKey, deletedPodKeys)
        var wg sync.WaitGroup
        wg.Add(diff)
        for i := 0; i < diff; i++ {
            go func(ix int) {
                defer wg.Done()
                // Deletions run concurrently, one goroutine per pod.
                if err := rsc.podControl.DeletePod(rs.Namespace, filteredPods[ix].Name, rs); err != nil {
                    // Decrement the expected number of deletes because the informer won't observe this deletion
                    podKey := controller.PodKey(filteredPods[ix])
                    glog.V(2).Infof("Failed to delete %v, decrementing expectations for controller %q/%q", podKey, rs.Namespace, rs.Name)
                    rsc.expectations.DeletionObserved(rsKey, podKey)
                    errCh <- err
                }
            }(i)
        }
        wg.Wait()
    }

    // Non-blocking read of errCh: all goroutines have already finished, so
    // this just picks up the first reported error, if any. When diff was 0,
    // errCh is nil and the default case is taken.
    select {
    case err := <-errCh:
        // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
        if err != nil {
            return err
        }
    default:
    }
    return nil
}

总结

Replicaset Controller的工作过程包括下面几个主要的步骤:
(1) 在kube-controller-manager框架下,通过startReplicaSetController函数创建对象并启动
(2) 在创建ReplicaSetController对象过程中,主要涉及的操作包括:创建rs监听器、pod监听器,设置syncHandler对象。(在rs监听器中,delete操作也会往队列中写入对象,然后在循环中统一返回nil,完成delete处理)
(3) 在Run函数中WaitForCacheSync等待监听器初始化完成,然后启动worker协程
(4) 在worker协程中,依次从队列中取出rs对象的key,进行处理。处理成功则从队列中移除(Forget)。处理失败则AddRateLimited。
(5) 在syncReplicaSet函数中进行真正的处理。处理过程中:
1、获取RS对象
2、获取Namespace下所有的Pod,并对Pod进行过滤
3、通过RS控制Pod的副本数

Logo

开源、云原生的融合云平台

更多推荐