✍️ Author: 茶水间Tech

🏷️ Tags: #CloudComputing #CloudNative #kubernetes #Containers

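📖 Preface

Kubernetes has many modules, a complex architecture and a very large codebase, so reading its source is hard going. This series starts from a concrete scenario, creating a Pod, and follows the resulting code path inside Kubernetes to walk through the source implementation end to end. This installment covers the kubelet side of the flow.

This article is based on Client Version: v1.34.3, Server Version: v1.34.2.

📌 Overall architecture of Pod creation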


💻 Main text

📑 1. About kubelet

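In a Kubernetes cluster, every node runs a kubelet process. The kubelet handles the work that the control plane assigns to that node and manages the node's Pods and their containers.

The kubelet works in terms of PodSpecs. A PodSpec is a YAML or JSON object that describes a Pod. The kubelet takes a set of PodSpecs provided through various mechanisms (primarily the API server) and ensures that the containers described in those PodSpecs are running and healthy.

The kubelet on each node keeps a long-lived watch connection open to the API server: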

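  1. Watch with a filter:
    The kubelet only cares about Pods assigned to its own node, so it sends the API server a request along these lines:
    GET /api/v1/pods?watch=true&fieldSelector=spec.nodeName={My_Node_Name}
    • The fieldSelector is essential. It guarantees that Node-A never receives Pods meant for Node-B.
  2. Event delivery:
    When the Scheduler completes the Bind step, the API server sets the Pod's nodeName field. The Pod now matches the kubelet's field selector, so the API server immediately pushes it over that long-lived connection to the kubelet on that node. From the filtered watch's point of view the Pod has just appeared, so it arrives as an “ADDED” event.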
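Here is a minimal Go sketch of the two points above, using only the standard library. `watchURL` and `eventTypeFor` are hypothetical helpers written for illustration. The real kubelet uses client-go's reflector rather than hand-built URLs. `eventTypeFor` models how a selector-filtered watch classifies a change, based on whether the old and new versions of the object match the selector.

```go
package main

import (
	"fmt"
	"net/url"
)

// watchURL (hypothetical helper) builds a watch request for the Pods bound
// to one node: the core/v1 pods collection filtered by spec.nodeName.
func watchURL(apiServer, nodeName string) string {
	q := url.Values{}
	q.Set("watch", "true")
	q.Set("fieldSelector", "spec.nodeName="+nodeName)
	// Encode sorts the keys and escapes "=" inside the value as %3D.
	return apiServer + "/api/v1/pods?" + q.Encode()
}

// eventTypeFor (hypothetical helper) classifies a change seen through a
// selector-filtered watch from whether the old and new versions match.
func eventTypeFor(oldMatches, newMatches bool) string {
	switch {
	case !oldMatches && newMatches:
		return "ADDED" // e.g. binding sets spec.nodeName to this node
	case oldMatches && newMatches:
		return "MODIFIED"
	case oldMatches && !newMatches:
		return "DELETED"
	default:
		return "" // never matched: this watcher receives nothing
	}
}

func main() {
	fmt.Println(watchURL("https://apiserver:6443", "node-a"))
	// Before binding spec.nodeName is empty; after binding it is "node-a".
	fmt.Println(eventTypeFor(false, true))
}
```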
📑 2. Code analysis
Entry point: Run (kubelet.go)

Run starts syncLoop, the main sync loop.

Code path: kubernetes/pkg/kubelet/kubelet.go

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	// ...(omitted)

	kl.syncLoop(ctx, updates, kl)
}
The detailed flow is as follows:

kubelet

2.1 Main loop: syncLoop (kubelet.go)

syncLoop is the heart of the kubelet. It keeps the state of the Pods on the node consistent with their desired state.

Code path: kubernetes/pkg/kubelet/kubelet.go

func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	klog.InfoS("Starting kubelet main sync loop")
	// The syncTicker wakes up kubelet to check if there are any pod workers
	// that need to be sync'd. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
	// Sync ticker: fires every second to check for Pods that are due for a sync
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	// Housekeeping ticker: drives periodic cleanup tasks (housekeepingPeriod, 2 seconds)
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	const (
		base   = 100 * time.Millisecond // initial delay: 100ms
		max    = 5 * time.Second        // maximum delay: 5s
		factor = 2                      // growth factor: doubles each time
	)
	duration := base
	// Responsible for checking limits in resolv.conf
	// The limits do not have anything to do with individual pods
	// Since this is called in syncLoop, we don't need to call it anywhere else
	if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
		kl.dnsConfigurer.CheckLimitsForResolvConf(klog.FromContext(ctx))
	}

	for {
		if err := kl.runtimeState.runtimeErrors(); err != nil {
			klog.ErrorS(err, "Skipping pod synchronization")
			// exponential backoff
			time.Sleep(duration)
			// compute the next backoff delay (capped at 5s)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base

		kl.syncLoopMonitor.Store(kl.clock.Now())
		// run one sync iteration
		if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}
2.2 Sync iteration: syncLoopIteration (kubelet.go)

syncLoopIteration dispatches each kind of Pod operation to its handler. For a Pod creation, it calls HandlePodAdditions.

Code path: kubernetes/pkg/kubelet/kubelet.go

func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	logger := klog.FromContext(ctx)
	select {
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
			return false
		}

		switch u.Op {
		case kubetypes.ADD: // Pod creation event received
			klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			// main Pod-addition handler
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		// ...(omitted)
		}
	// ...(omitted: the other select cases)
	}
	return true
}

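2.3 Main handler: HandlePodAdditions (kubelet.go)

HandlePodAdditions is the kubelet's core function for handling newly added Pods. When the kubelet receives new Pods from the API server, this function is responsible for:

  • Pod registration: adding the new Pod to the pod manager, the single source of truth for desired state
  • Admission control: checking whether the node has enough resources to accept the new Pod
  • Certificate management: tracking the Pod's certificate information
  • Mirror pod handling: handling the mirror pods of static Pods
  • Vertical scaling: supporting in-place vertical scaling of Pods (InPlacePodVerticalScaling)
  • Work dispatch: handing the Pod to the pod workers, which do the actual creation and sync

Code path: kubernetes/pkg/kubelet/kubelet.go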

func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
	// Sort the Pods by creation time, oldest first
	sort.Sort(sliceutils.PodsByCreationTime(pods))
	var pendingResizes []types.UID
	for _, pod := range pods {
		// Always add the pod to the pod manager. Kubelet relies on the pod
		// manager as the source of truth for the desired state. If a pod does
		// not exist in the pod manager, it means that it has been deleted in
		// the apiserver and no action (other than cleanup) is required.
		// Add the Pod to the Pod Manager, which the kubelet
		// treats as its source of truth for desired state
		kl.podManager.AddPod(pod)

		kl.podCertificateManager.TrackPod(context.TODO(), pod)
		// Get the Pod and its mirror Pod
		pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
		if wasMirror {
			if pod == nil {
				klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
				continue
			}
			kl.podWorkers.UpdatePod(UpdatePodOptions{
				Pod:        pod,
				MirrorPod:  mirrorPod,
				UpdateType: kubetypes.SyncPodUpdate,
				StartTime:  start,
			})
			continue
		}

		// Only go through the admission process if the pod is not requested
		// for termination by another part of the kubelet. If the pod is already
		// using resources (previously admitted), the pod worker is going to be
		// shutting it down. If the pod hasn't started yet, we know that when
		// the pod worker is invoked it will also avoid setting up the pod, so
		// we simply avoid doing any work.
		// We also do not try to admit the pod that is already in terminated state.
		// Admit only Pods that are neither terminating nor already terminal
		if !kl.podWorkers.IsPodTerminationRequested(pod.UID) && !podutil.IsPodPhaseTerminal(pod.Status.Phase) {
			// Check if we can admit the pod; if not, reject it.
			// We failed pods that we rejected, so activePods include all admitted
			// pods that are alive.
			// Check whether the node has enough resources for the Pod
			// Returns: admitted or not, rejection reason, detail message
			if ok, reason, message := kl.allocationManager.AddPod(kl.GetActivePods(), pod); !ok {
				kl.rejectPod(pod, reason, message)
				// We avoid recording the metric in canAdmitPod because it's called
				// repeatedly during a resize, which would inflate the metric.
				// Instead, we record the metric here in HandlePodAdditions for new pods
				// and capture resize events separately.
				recordAdmissionRejection(reason)
				continue
			}

			if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
				// Backfill the queue of pending resizes, but only after all the pods have
				// been added. This ensures that no resizes get resolved until all the
				// existing pods are added.
				_, updatedFromAllocation := kl.allocationManager.UpdatePodFromAllocation(pod)
				if updatedFromAllocation {
					pendingResizes = append(pendingResizes, pod.UID)
				}
			}
		}
		// Create the Pod asynchronously via podWorkers.UpdatePod
		kl.podWorkers.UpdatePod(UpdatePodOptions{
			Pod:        pod,
			MirrorPod:  mirrorPod,
			UpdateType: kubetypes.SyncPodCreate, // creation
			StartTime:  start,
		})
	}
	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
		// Backfill the Pod resize conditions
		kl.statusManager.BackfillPodResizeConditions(pods)
		// Push the pending resize requests
		for _, uid := range pendingResizes {
			kl.allocationManager.PushPendingResize(uid)
		}
		// Retry the pending resizes
		if len(pendingResizes) > 0 {
			kl.allocationManager.RetryPendingResizes(allocation.TriggerReasonPodsAdded)
		}
	}
}
a. Pod Manager (simplified illustration, not the upstream type definition)
type podManager struct {
	// Desired state of all Pods
	pods map[types.UID]*v1.Pod

	// Mirror Pod mapping
	mirrorPods map[types.UID]*v1.Pod

	// Static Pod mapping
	staticPods map[types.UID]*v1.Pod
}
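`GetPodAndMirrorPod` can pair a static Pod with its mirror Pod because both share the same full name (`name_namespace`), even though their UIDs differ. The sketch below assumes that pairing rule and indexes Pods by UID and by full name. `pod`, `podManager` and its fields are hypothetical simplifications, not the upstream definitions.

```go
package main

import (
	"fmt"
	"sync"
)

// pod is a hypothetical, trimmed-down stand-in for *v1.Pod.
type pod struct {
	UID, Name, Namespace string
	Mirror               bool // true for a mirror pod created by the kubelet
}

// fullName follows the kubelet's "name_namespace" convention.
func fullName(p *pod) string { return p.Name + "_" + p.Namespace }

// podManager (simplified) indexes pods by UID and by full name, and keeps
// mirror pods in a separate map keyed by full name.
type podManager struct {
	lock                sync.RWMutex
	podByUID            map[string]*pod
	podByFullName       map[string]*pod
	mirrorPodByFullName map[string]*pod
}

func newPodManager() *podManager {
	return &podManager{
		podByUID:            map[string]*pod{},
		podByFullName:       map[string]*pod{},
		mirrorPodByFullName: map[string]*pod{},
	}
}

func (m *podManager) AddPod(p *pod) {
	m.lock.Lock()
	defer m.lock.Unlock()
	if p.Mirror {
		m.mirrorPodByFullName[fullName(p)] = p
		return
	}
	m.podByUID[p.UID] = p
	m.podByFullName[fullName(p)] = p
}

// GetPodAndMirrorPod returns the static/regular pod, its mirror pod (if any),
// and whether the argument itself was a mirror pod.
func (m *podManager) GetPodAndMirrorPod(p *pod) (*pod, *pod, bool) {
	m.lock.RLock()
	defer m.lock.RUnlock()
	name := fullName(p)
	if p.Mirror {
		return m.podByFullName[name], p, true
	}
	return p, m.mirrorPodByFullName[name], false
}

func main() {
	pm := newPodManager()
	static := &pod{UID: "s1", Name: "etcd-node1", Namespace: "kube-system"}
	mirror := &pod{UID: "m1", Name: "etcd-node1", Namespace: "kube-system", Mirror: true}
	pm.AddPod(static)
	pm.AddPod(mirror)

	p, mp, wasMirror := pm.GetPodAndMirrorPod(mirror)
	fmt.Println(p.UID, mp.UID, wasMirror) // s1 m1 true
}
```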
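b. Allocation Manager (simplified illustration)
type allocationManager struct {
	// Manages resource allocation on the node,
	// including CPU, memory, devices, etc.
}

func (m *allocationManager) AddPod(activePods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
	// Checks whether the node has enough resources for the Pod
	// Returns: admitted or not, rejection reason, detail message
	// ...(omitted)
}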
c. In-place Pod vertical scaling

This is a newer feature that lets a Pod's resources be adjusted without recreating the Pod. Whether a container is restarted during a resize depends on its resizePolicy:

if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
	kl.statusManager.BackfillPodResizeConditions(pods)
	for _, uid := range pendingResizes {
		kl.allocationManager.PushPendingResize(uid)
	}
	if len(pendingResizes) > 0 {
		kl.allocationManager.RetryPendingResizes(allocation.TriggerReasonPodsAdded)
	}
}
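2.4 Submitting the Pod for creation: UpdatePod (pod_workers.go)

UpdatePod is the kubelet's core entry point for handling Pod updates. It is responsible for:

  • Pod state management: maintaining each Pod's sync state and tracking its lifecycle (first sync, terminating, terminated)
  • Update dispatch: accepting Pod update requests and routing them to the matching worker goroutine
  • Lifecycle transitions: handling a Pod's transitions from creation to termination
  • Resource management: integrating with the allocation manager to support in-place vertical scaling
  • Concurrency control: coordinating multiple update requests for a Pod to keep its state consistent

Code path: kubernetes/pkg/kubelet/pod_workers.go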

func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
	// ...(omitted)

	// start the pod worker goroutine if it doesn't exist
	podUpdates, exists := p.podUpdates[uid]
	if !exists {
		// buffer the channel to avoid blocking this method
		podUpdates = make(chan struct{}, 1)
		p.podUpdates[uid] = podUpdates

		// ensure that static pods start in the order they are received by UpdatePod
		if kubetypes.IsStaticPod(pod) {
			p.waitingToStartStaticPodsByFullname[status.fullname] =
				append(p.waitingToStartStaticPodsByFullname[status.fullname], uid)
		}

		// allow testing of delays in the pod update channel
		var outCh <-chan struct{}
		if p.workerChannelFn != nil {
			outCh = p.workerChannelFn(uid, podUpdates)
		} else {
			outCh = podUpdates
		}

		// spawn a pod worker
		// start the worker goroutine
		go func() {
			// TODO: this should be a wait.Until with backoff to handle panics, and
			// accept a context for shutdown
			defer runtime.HandleCrash()
			defer klog.V(3).InfoS("Pod worker has stopped", "podUID", uid)
			p.podWorkerLoop(uid, outCh)
		}()
	}

	// ...(omitted)
}
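2.5 Doing the actual work: podWorkerLoop (pod_workers.go)

podWorkerLoop is the kubelet's per-Pod work loop. It handles every sync operation for a single Pod, and each Pod gets its own goroutine running this function. Its main responsibilities are:

  • Event-driven processing: receiving wake-ups on the podUpdates channel and processing the Pod's pending update
  • State sync: running the sync operation that matches the Pod's current work type
  • Lifecycle management: handling the Pod's full lifecycle, from creation through running to termination
  • Error recovery: retrying failed operations so that the Pod's state eventually converges
  • Resource cleanup: cleaning up resources when the Pod terminates and shutting the worker down

For regular (non-terminating) work, it ultimately calls SyncPod to perform the sync.

Code path: kubernetes/pkg/kubelet/pod_workers.go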

func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) {
	var lastSyncTime time.Time
	for range podUpdates {
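		// start the sync operation
		ctx, update, canStart, canEverStart, ok := p.startPodSync(podUID)
		// If we had no update waiting, it means someone initialized the channel without filling out pendingUpdate.
		// ...(omitted)

		var isTerminal bool
		err := func() error {
			var status *kubecontainer.PodStatus
			var err error
			// ...(omitted)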

			// Take the appropriate action (illegal phases are prevented by UpdatePod)
			switch {
			case update.WorkType == TerminatedPod:
				err = p.podSyncer.SyncTerminatedPod(ctx, update.Options.Pod, status)

			case update.WorkType == TerminatingPod:
				var gracePeriod *int64
				if opt := update.Options.KillPodOptions; opt != nil {
					gracePeriod = opt.PodTerminationGracePeriodSecondsOverride
				}
				podStatusFn := p.acknowledgeTerminating(podUID)

				// if we only have a running pod, terminate it directly
				if update.Options.RunningPod != nil {
					err = p.podSyncer.SyncTerminatingRuntimePod(ctx, update.Options.RunningPod)
				} else {
					err = p.podSyncer.SyncTerminatingPod(ctx, update.Options.Pod, status, gracePeriod, podStatusFn)
				}

			default:
				//正常同步 Pod 状态
				isTerminal, err = p.podSyncer.SyncPod(ctx, update.Options.UpdateType, update.Options.Pod, update.Options.MirrorPod, status)
			}

			lastSyncTime = p.clock.Now()
			return err
		}()

		// ...(omitted)
	}
}
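The `switch` above picks a `podSyncer` method based on the update's work type. The following small model captures that choice, along with the one-way progression of a Pod worker: sync, then terminating, then terminated. `WorkType`, `nextWork` and `syncFuncFor` are hypothetical names. The rule that a worker never goes back to syncing once termination has started simplifies what `UpdatePod` enforces.

```go
package main

import "fmt"

// WorkType (simplified) orders the phases a pod worker moves through.
type WorkType int

const (
	SyncPod WorkType = iota
	TerminatingPod
	TerminatedPod
)

func (w WorkType) String() string {
	return [...]string{"SyncPod", "TerminatingPod", "TerminatedPod"}[w]
}

// nextWork is hypothetical: a request can move a worker forward, never back.
func nextWork(current, requested WorkType) WorkType {
	if requested < current {
		return current
	}
	return requested
}

// syncFuncFor names the podSyncer method the loop's switch would call.
func syncFuncFor(w WorkType, onlyRunningPod bool) string {
	switch {
	case w == TerminatedPod:
		return "SyncTerminatedPod"
	case w == TerminatingPod && onlyRunningPod:
		return "SyncTerminatingRuntimePod"
	case w == TerminatingPod:
		return "SyncTerminatingPod"
	default:
		return "SyncPod"
	}
}

func main() {
	w := SyncPod
	for _, req := range []WorkType{SyncPod, TerminatingPod, SyncPod, TerminatedPod} {
		w = nextWork(w, req)
		fmt.Printf("requested %v -> working on %v via %s\n", req, w, syncFuncFor(w, false))
	}
}
```

A late `SyncPod` request that arrives after termination has begun leaves the worker in `TerminatingPod`.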
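2.6 kubelet-side sync: SyncPod (kubelet.go)

SyncPod is the kubelet's core function for syncing a Pod. It brings the Pod's actual state in line with its desired state. Its main responsibilities are:

  • Pod state sync: performing whatever operations are needed to reconcile the Pod's desired state with its current runtime state
  • Lifecycle management: handling the Pod's full lifecycle, from creation through running to termination
  • Resource management: creating and managing the Pod's cgroups to enforce resource isolation and QoS
  • Volume management: waiting for the Pod's volumes to be attached and mounted
  • Network configuration: checking the network plugin's status and handling host-network Pods
  • Observability: recording metrics, logs and traces for performance monitoring and troubleshooting

Finally, it calls kl.containerRuntime.SyncPod so that the container runtime syncs the Pod.

Code path: kubernetes/pkg/kubelet/kubelet.go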

func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
	// ...(omitted)

	// Make data directories for the pod
	// create the Pod's data directories
	if err := kl.makePodDataDirs(pod); err != nil {
		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
		klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod))
		return false, err
	}

	// Wait for volumes to attach/mount
	// wait for the volumes to be attached and mounted
	if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil {
		var volumeAttachLimitErr *volumemanager.VolumeAttachLimitExceededError
		if errors.As(err, &volumeAttachLimitErr) {
			kl.rejectPod(pod, volumemanager.VolumeAttachmentLimitExceededReason, volumeAttachLimitErr.Error())
			recordAdmissionRejection(volumemanager.VolumeAttachmentLimitExceededReason)
			return true, nil
		}
		if !wait.Interrupted(err) {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
			klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
		}
		return false, err
	}

	// Fetch the pull secrets for the pod
	// fetch the image pull secrets for the Pod
	pullSecrets := kl.getPullSecretsForPod(pod)

	// ...(omitted)
	// call the container runtime to create and start the containers
	result := kl.containerRuntime.SyncPod(sctx, pod, podStatus, pullSecrets, kl.crashLoopBackOff)
	kl.reasonCache.Update(pod.UID, result)

	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
		for _, r := range result.SyncResults {
			if r.Action == kubecontainer.ResizePodInPlace && r.Error != nil {
				// If the condition already exists, the observedGeneration does not get updated.
				kl.statusManager.SetPodResizeInProgressCondition(pod.UID, v1.PodReasonError, r.Message, pod.Generation)
			}
		}
	}

	return false, result.Error()
}

Runtime


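2.7 Runtime: SyncPod (kuberuntime_manager.go)

SyncPod is the core sync function of the kubelet's container runtime manager. It brings the actual state in the container runtime in line with the Pod's desired state. Its main responsibilities are:

  • Pod sync: performing whatever operations are needed, based on the Pod spec and the current runtime state, to make the two match
  • Sandbox management: creating, updating or removing the Pod sandbox (the runtime's isolation environment)
  • Container lifecycle management: starting, stopping and restarting containers (init, ephemeral and regular containers)
  • Resource management: handling container resource allocation and in-place vertical scaling
  • Network configuration: managing the Pod's IP addresses and network settings
  • Image management: pulling container images and image volumes
  • Error handling: recording sync results and errors to support retries

Code path: kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go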

// SyncPod syncs the running pod into the desired pod by executing following steps:
//
//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create ephemeral containers.
//  6. Create init containers.
//  7. Resize running containers (if InPlacePodVerticalScaling==true)
//  8. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	logger := klog.FromContext(ctx)
	// ...(omitted)

	// Step 4: Create a sandbox for the pod if necessary.
	podSandboxID := podContainerChanges.SandboxID
	if podContainerChanges.CreateSandbox {
		var msg string
		var err error

		logger.V(4).Info("Creating PodSandbox for pod", "pod", klog.KObj(pod))
		// ...(omitted)
		// create the sandbox
		podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
		// ...(omitted)
		logger.V(4).Info("Created PodSandbox for pod", "podSandboxID", podSandboxID, "pod", klog.KObj(pod))
		// get the sandbox status
		resp, err := m.runtimeService.PodSandboxStatus(ctx, podSandboxID, false)
		// ...(omitted)

		// If we ever allow updating a pod from non-host-network to
		// host-network, we may use a stale IP.
		// read the Pod IPs (assigned by the network plugin during sandbox creation) from the sandbox status
		if !kubecontainer.IsHostNetworkPod(pod) {
			// Overwrite the podIPs passed in the pod status, since we just started the pod sandbox.
			podIPs = m.determinePodSandboxIPs(ctx, pod.Namespace, pod.Name, resp.GetStatus())
			logger.V(4).Info("Determined the ip for pod after sandbox changed", "IPs", podIPs, "pod", klog.KObj(pod))
		}
	}

	// the start containers routines depend on pod ip(as in primary pod ip)
	// instead of trying to figure out if we have 0 < len(podIPs)
	// everytime, we short circuit it here
	podIP := ""
	if len(podIPs) != 0 {
		podIP = podIPs[0]
	}

	// Get podSandboxConfig for containers to start.
	configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
	result.AddSyncResult(configPodSandboxResult)
	podSandboxConfig, err := m.generatePodSandboxConfig(ctx, pod, podContainerChanges.Attempt)
	if err != nil {
		message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
		logger.Error(err, "GeneratePodSandboxConfig for pod failed", "pod", klog.KObj(pod))
		configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
		return
	}

	imageVolumePullResults, err := m.getImageVolumes(ctx, pod, podSandboxConfig, pullSecrets)
	if err != nil {
		logger.Error(err, "Get image volumes for pod failed", "pod", klog.KObj(pod))
		configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, err.Error())
		return
	}

	// Helper containing boilerplate common to starting all types of containers.
	// typeName is a description used to describe this type of container in log messages,
	// currently: "container", "init container" or "ephemeral container"
	// metricLabel is the label used to describe this type of container in monitoring metrics.
	// currently: "container", "init_container" or "ephemeral_container"
	// the sandbox is ready; next, create the containers inside the Pod
	start := func(ctx context.Context, typeName, metricLabel string, spec *startSpec) error {
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
		result.AddSyncResult(startContainerResult)

		isInBackOff, msg, err := m.doBackOff(ctx, pod, spec.container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			logger.V(4).Info("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container.Name, "pod", klog.KObj(pod))
			return err
		}

		metrics.StartedContainersTotal.WithLabelValues(metricLabel).Inc()
		if sc.HasWindowsHostProcessRequest(pod, spec.container) {
			metrics.StartedHostProcessContainersTotal.WithLabelValues(metricLabel).Inc()
		}
		logger.V(4).Info("Creating container in pod", "containerType", typeName, "container", spec.container.Name, "pod", klog.KObj(pod))

		// We fail late here to populate the "ErrImagePull" and "ImagePullBackOff" correctly to the end user.
		imageVolumes, err := m.toKubeContainerImageVolumes(ctx, imageVolumePullResults, spec.container, pod, startContainerResult)
		if err != nil {
			return err
		}

		// NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
		msg, err = m.startContainer(ctx, podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs, imageVolumes)
		incrementImageVolumeMetrics(err, msg, spec.container, imageVolumes)
		if err != nil {
			// startContainer() returns well-defined error codes that have reasonable cardinality for metrics and are
			// useful to cluster administrators to distinguish "server errors" from "user errors".
			metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
			if sc.HasWindowsHostProcessRequest(pod, spec.container) {
				metrics.StartedHostProcessContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
			}
			startContainerResult.Fail(err, msg)
			// known errors that are logged in other places are logged at higher levels here to avoid
			// repetitive log spam
			switch {
			case err == images.ErrImagePullBackOff:
				logger.V(3).Info("Container start failed in pod", "containerType", typeName, "container", spec.container.Name, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)
			default:
				utilruntime.HandleError(fmt.Errorf("%v %v start failed in pod %v: %w: %s", typeName, spec.container.Name, format.Pod(pod), err, msg))
			}
			return err
		}

		return nil
	}

	// Step 5: start ephemeral containers
	// These are started "prior" to init containers to allow running ephemeral containers even when there
	// are errors starting an init container. In practice init containers will start first since ephemeral
	// containers cannot be specified on pod creation.
	for _, idx := range podContainerChanges.EphemeralContainersToStart {
		start(ctx, "ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
	}

	// Step 6: start init containers.
	for _, idx := range podContainerChanges.InitContainersToStart {
		container := &pod.Spec.InitContainers[idx]
		// Start the next init container.
		if err := start(ctx, "init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
			if podutil.IsRestartableInitContainer(container) {
				logger.V(4).Info("Failed to start the restartable init container for the pod, skipping", "initContainerName", container.Name, "pod", klog.KObj(pod))
				continue
			}
			logger.V(4).Info("Failed to initialize the pod, as the init container failed to start, aborting", "initContainerName", container.Name, "pod", klog.KObj(pod))
			return
		}

		// Successfully started the container; clear the entry in the failure
		logger.V(4).Info("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
	}

	// Step 7: For containers in podContainerChanges.ContainersToUpdate[CPU,Memory] list, invoke UpdateContainerResources
	if resizable, _, _ := allocation.IsInPlacePodVerticalScalingAllowed(pod); resizable {
		if len(podContainerChanges.ContainersToUpdate) > 0 || podContainerChanges.UpdatePodResources {
			result.SyncResults = append(result.SyncResults, m.doPodResizeAction(ctx, pod, podStatus, podContainerChanges))
		}
	}

	// Step 8: start containers in podContainerChanges.ContainersToStart.
	for _, idx := range podContainerChanges.ContainersToStart {
		start(ctx, "container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
	}

	return
}
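2.8 Creating the sandbox: createPodSandbox (kuberuntime_sandbox.go)

createPodSandbox is the function in the kubelet's container runtime manager that creates the Pod sandbox. The sandbox is the runtime's isolation environment. It provides the namespaces shared by the Pod's containers, such as the network and IPC namespaces, plus the PID namespace when shareProcessNamespace is enabled. Its main steps are:

  • Generate the sandbox config: build the sandbox configuration from the Pod spec
  • Create the log directory: create the directory where the Pod's logs are stored
  • Look up the runtime handler: find the container runtime handler that matches the Pod's RuntimeClass
  • Create the sandbox: call the container runtime interface to create the actual sandbox
  • Error handling: check for errors and log them at every step

Code path: kubernetes/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go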

func (m *kubeGenericRuntimeManager) createPodSandbox(ctx context.Context, pod *v1.Pod, attempt uint32) (string, string, error) {
	// ...(omitted)

	podSandBoxID, err := m.runtimeService.RunPodSandbox(ctx, podSandboxConfig, runtimeHandler)
	if err != nil {
		message := fmt.Sprintf("Failed to create sandbox for pod %q: %v", format.Pod(pod), err)
		logger.Error(err, "Failed to create sandbox for pod", "pod", klog.KObj(pod))
		return "", message, err
	}

	return podSandBoxID, "", nil
}
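2.9 Calling the CRI: RunPodSandbox (remote_runtime.go)

RunPodSandbox is the core method of the CRI (Container Runtime Interface) client. It creates the Pod sandbox by calling the remote container runtime over gRPC. Its main responsibilities are:

  • Timeout management: giving sandbox creation a timeout of twice the runtime request timeout (4 minutes by default)
  • Remote call: invoking the runtime's RunPodSandbox endpoint through the gRPC client
  • Response validation: checking that the returned sandbox ID is valid
  • Error handling: logging errors and returning an appropriate error
  • Logging: recording request and response details to aid debugging

Code path: kubernetes/staging/src/k8s.io/cri-client/pkg/remote_runtime.go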

func (r *remoteRuntimeService) RunPodSandbox(ctx context.Context, config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
	// ...(omitted)
	resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
		Config:         config,
		RuntimeHandler: runtimeHandler,
	})

	// ...(omitted)

	return podSandboxID, nil
}

Containerd

From here, the flow continues into containerd.

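📝 Summary and outlook

The kubelet's syncLoop wakes up every second and re-syncs any Pod whose resync interval has elapsed (--sync-frequency, 1 minute by default). PLEG events also trigger a sync as soon as a container's state changes. This continuous reconciliation is what handles failures:

  • If a container crashes: PLEG detects the exit. On the next sync, the kubelet sees that “the API server says this Pod should be running, but containerd says the container is gone”, so it calls the CRI to create the container again (subject to the Pod's restartPolicy and crash-loop backoff).
  • If the network drops: once connectivity returns, the kubelet's watch resumes from the last resourceVersion it saw (re-listing if that version has expired). This lets it catch up on every Pod update it missed while disconnected.

During Pod creation, the kubelet calls containerd through the CRI. It first creates the pause sandbox and then creates the application containers inside it, and it manages the Pod's entire lifecycle the same way.

This is why Kubernetes is “state-driven” rather than “command-driven”. The kubelet keeps comparing the desired state recorded in the API with the containers actually running on its machine, and it corrects any mismatch as soon as it finds one.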

📚 References

https://kubernetes.io/zh-cn/docs/reference/command-line-tools-reference/kubelet/

https://blog.huweihuang.com/kubernetes-notes/principle/component/kubernetes-core-principle-kubelet/
