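[Tech] Reading the Kubernetes Source Through Pod Creation (5): kubelet
✍️ Author: 茶水间Tech
🏷️ Tags: #cloud-computing #cloud-native #kubernetes #containers
📖 Preface
Kubernetes has many modules, a complex architecture, and an enormous code base, so reading the source is hard going. This series starts from a real-world scenario, the creation of a Pod, and traces the code path inside Kubernetes to give an end-to-end picture of the source implementation. This installment covers the kubelet side of the flow.
This article is based on Client Version: v1.34.3, Server Version: v1.34.2
📌 Overall architecture of Pod creation:
[Figure: overall architecture diagram of Pod creation]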
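💻 Main Text
📑 1. About kubelet
In a Kubernetes cluster, every Node runs a kubelet process. It handles the tasks that the Master (control plane) dispatches to that node and manages Pods and the containers inside them.
The kubelet works in terms of PodSpecs. Each PodSpec is a YAML or JSON object that describes a Pod. The kubelet takes a set of PodSpecs provided through various mechanisms (primarily the apiserver) and ensures that the containers described in those PodSpecs are running and healthy.
The kubelet running on each Node maintains a long-lived connection to the API Server.
- Watch with a filter:
The kubelet only cares about its own Pods. It sends the API Server a request similar to: GET /api/v1/pods?watch=true&fieldSelector=spec.nodeName={My_Node_Name}
  - The fieldSelector here is critical: it ensures that Node-A never receives information about Pods destined for Node-B.
- Event delivery:
When the Scheduler completes the Bind action, the nodeName field of the Pod object in the API Server is updated. The API Server immediately pushes an event over this long-lived connection to the kubelet on that node. The object itself was modified, but because it now matches this watch's field selector for the first time, the watch delivers it as an "ADDED" event.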
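To make the filtered watch concrete, here is a minimal sketch of how such a request path can be assembled with the Go standard library. The helper name buildPodWatchPath is hypothetical and is not part of the kubelet, which issues this request through client-go rather than by building URLs by hand.

```go
package main

import (
	"fmt"
	"net/url"
)

// buildPodWatchPath (hypothetical helper) returns the request path for a
// watch that only delivers Pods bound to the given node.
func buildPodWatchPath(nodeName string) string {
	q := url.Values{}
	q.Set("watch", "true")
	// The field selector restricts the stream to Pods whose spec.nodeName
	// equals this node's name.
	q.Set("fieldSelector", "spec.nodeName="+nodeName)
	// Encode sorts the keys and percent-encodes the "=" inside the selector.
	return "/api/v1/pods?" + q.Encode()
}

func main() {
	fmt.Println(buildPodWatchPath("node-a"))
}
```

Note that the encoded form orders the parameters alphabetically and escapes the "=" inside the selector value, which is equivalent to the human-readable form shown above.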
📑 2. Code Analysis
Entry point: Run (kubelet.go)
Run starts the syncLoop synchronization loop.
Code path: kubernetes/pkg/kubelet/kubelet.go
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// ... (omitted)
kl.syncLoop(ctx, updates, kl)
}
The detailed flow is as follows:
kubelet
2.1 Main loop: syncLoop (kubelet.go)
syncLoop is the heart of the kubelet. It keeps the state of the Pods on the node consistent with the desired state.
Code path: kubernetes/pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
klog.InfoS("Starting kubelet main sync loop")
// The syncTicker wakes up kubelet to checks if there are any pod workers
// that need to be sync'd. A one-second period is sufficient because the
// sync interval is defaulted to 10s.
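// Sync ticker: fires once per second to check for Pods that need syncing
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
// Housekeeping ticker: runs periodic cleanup tasks (housekeepingPeriod is 2 seconds in the upstream source)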
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
plegCh := kl.pleg.Watch()
const (
base = 100 * time.Millisecond // initial delay: 100ms
max = 5 * time.Second // maximum delay: 5s
factor = 2 // exponential factor: doubles each time
)
duration := base
// Responsible for checking limits in resolv.conf
// The limits do not have anything to do with individual pods
// Since this is called in syncLoop, we don't need to call it anywhere else
if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
kl.dnsConfigurer.CheckLimitsForResolvConf(klog.FromContext(ctx))
}
for {
if err := kl.runtimeState.runtimeErrors(); err != nil {
klog.ErrorS(err, "Skipping pod synchronization")
// exponential backoff
time.Sleep(duration)
// Compute the next backoff delay (capped at 5 seconds)
duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
continue
}
// reset backoff if we have a success
duration = base
kl.syncLoopMonitor.Store(kl.clock.Now())
// Run one sync iteration
if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}
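The backoff arithmetic in syncLoop can be isolated into a small sketch. The constants mirror the excerpt above (100ms base, 5s cap, factor 2); the function name nextBackoff and the constant name maxDelay are illustrative choices and do not appear in the kubelet.

```go
package main

import (
	"fmt"
	"time"
)

const (
	base     = 100 * time.Millisecond // initial delay, as in the excerpt
	maxDelay = 5 * time.Second        // cap on the delay, as in the excerpt
	factor   = 2                      // growth factor, as in the excerpt
)

// nextBackoff doubles the delay and clamps it to maxDelay.
func nextBackoff(d time.Duration) time.Duration {
	next := d * factor
	if next > maxDelay {
		return maxDelay
	}
	return next
}

func main() {
	d := base
	// Simulate eight consecutive runtime errors.
	for i := 0; i < 8; i++ {
		fmt.Println(d)
		d = nextBackoff(d)
	}
}
```

Running it prints 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, 5s, 5s: the delay stops growing once it reaches the cap, and in the real loop it is reset to the base as soon as the runtime reports healthy.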
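2.2 Sync iteration: syncLoopIteration (kubelet.go)
syncLoopIteration handles each kind of Pod operation with the matching handler. Here the operation is a Pod creation, so it calls HandlePodAdditions to create the Pod.
Code path: kubernetes/pkg/kubelet/kubelet.go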
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
logger := klog.FromContext(ctx)
select {
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
return false
}
switch u.Op {
case kubetypes.ADD: // a Pod creation event was received
klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
// Main handler for newly added Pods
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodRemoves(u.Pods)
// ... (other operations and the remaining select cases omitted)
}
}
return true
}
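The dispatch pattern above (read one update from a channel, stop when the channel is closed, otherwise route by operation) can be sketched in isolation. The types below are simplified stand-ins for kubetypes.PodUpdate and are assumptions made for illustration; only the handler names are taken from the excerpt.

```go
package main

import "fmt"

// PodOperation and PodUpdate are simplified stand-ins for the kubelet types.
type PodOperation int

const (
	ADD PodOperation = iota
	UPDATE
	REMOVE
)

type PodUpdate struct {
	Op   PodOperation
	Pods []string
}

// dispatch consumes one update and reports which handler would run.
// The boolean is false when the channel is closed, which is the signal
// for the caller to leave the sync loop.
func dispatch(configCh <-chan PodUpdate) (string, bool) {
	u, open := <-configCh
	if !open {
		return "", false
	}
	switch u.Op {
	case ADD:
		return "HandlePodAdditions", true
	case UPDATE:
		return "HandlePodUpdates", true
	case REMOVE:
		return "HandlePodRemoves", true
	default:
		return "unhandled", true
	}
}

func main() {
	ch := make(chan PodUpdate, 1)
	ch <- PodUpdate{Op: ADD, Pods: []string{"default/nginx"}}
	close(ch)

	handler, ok := dispatch(ch)
	fmt.Println(handler, ok) // buffered value is still delivered after close
	handler, ok = dispatch(ch)
	fmt.Println(handler == "", ok) // channel drained and closed
}
```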
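2.3 Main handler: HandlePodAdditions (kubelet.go)
HandlePodAdditions is the kubelet's core function for handling newly added Pods. When the kubelet receives a new Pod from the API server, this function is responsible for:
- Pod registration: adds the new Pod to the pod manager, which is the single source of truth for the desired state
- Admission control: checks whether the node has enough resources to accept the new Pod
- Certificate management: tracks the Pod's certificate information
- Mirror Pod handling: handles the mirror pods of static Pods
- Vertical scaling: supports in-place vertical scaling of Pods (InPlacePodVerticalScaling)
- Work dispatch: hands the Pod to the pod workers for the actual creation and sync
Code path: kubernetes/pkg/kubelet/kubelet.go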
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
start := kl.clock.Now()
// Sort the Pods by creation time (oldest first)
sort.Sort(sliceutils.PodsByCreationTime(pods))
var pendingResizes []types.UID
for _, pod := range pods {
// Always add the pod to the pod manager. Kubelet relies on the pod
// manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in
// the apiserver and no action (other than cleanup) is required.
// Add the Pod to the Pod Manager
// The Pod Manager is the kubelet's source of truth for the desired state
kl.podManager.AddPod(pod)
kl.podCertificateManager.TrackPod(context.TODO(), pod)
// Look up the Pod and its Mirror Pod
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
continue
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodUpdate,
StartTime: start,
})
continue
}
// Only go through the admission process if the pod is not requested
// for termination by another part of the kubelet. If the pod is already
// using resources (previously admitted), the pod worker is going to be
// shutting it down. If the pod hasn't started yet, we know that when
// the pod worker is invoked it will also avoid setting up the pod, so
// we simply avoid doing any work.
// We also do not try to admit the pod that is already in terminated state.
// Check whether the Pod is terminating or already terminated
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) && !podutil.IsPodPhaseTerminal(pod.Status.Phase) {
// Check if we can admit the pod; if not, reject it.
// We failed pods that we rejected, so activePods include all admitted
// pods that are alive.
// Check whether resources are sufficient
// Returns: whether admitted, rejection reason, detailed message
if ok, reason, message := kl.allocationManager.AddPod(kl.GetActivePods(), pod); !ok {
kl.rejectPod(pod, reason, message)
// We avoid recording the metric in canAdmitPod because it's called
// repeatedly during a resize, which would inflate the metric.
// Instead, we record the metric here in HandlePodAdditions for new pods
// and capture resize events separately.
recordAdmissionRejection(reason)
continue
}
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// Backfill the queue of pending resizes, but only after all the pods have
// been added. This ensures that no resizes get resolved until all the
// existing pods are added.
_, updatedFromAllocation := kl.allocationManager.UpdatePodFromAllocation(pod)
if updatedFromAllocation {
pendingResizes = append(pendingResizes, pod.UID)
}
}
}
// Create the Pod asynchronously through podWorkers.UpdatePod
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodCreate, // update type: create
StartTime: start,
})
}
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// Backfill the Pod resize conditions
kl.statusManager.BackfillPodResizeConditions(pods)
// Push the pending resize requests
for _, uid := range pendingResizes {
kl.allocationManager.PushPendingResize(uid)
}
// Retry the pending resizes
if len(pendingResizes) > 0 {
kl.allocationManager.RetryPendingResizes(allocation.TriggerReasonPodsAdded)
}
}
}
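a. Pod Manager (simplified illustration, not the actual struct definition)
type podManager struct {
// Desired state of every Pod
pods map[types.UID]*v1.Pod
// Mirror Pod mapping
mirrorPods map[types.UID]*v1.Pod
// Static Pod mapping
staticPods map[types.UID]*v1.Pod
}
b. Allocation Manager (simplified illustration)
type allocationManager struct {
// Manages node resource allocation,
// including CPU, memory, devices, and so on
}
func (m *allocationManager) AddPod(activePods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
// Checks whether resources are sufficient
// Returns: whether admitted, rejection reason, detailed message
// ... (body omitted)
}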
c. In-place Pod vertical scaling
This is a relatively new feature that allows a Pod's resources to be adjusted without restarting the Pod:
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
kl.statusManager.BackfillPodResizeConditions(pods)
for _, uid := range pendingResizes {
kl.allocationManager.PushPendingResize(uid)
}
if len(pendingResizes) > 0 {
kl.allocationManager.RetryPendingResizes(allocation.TriggerReasonPodsAdded)
}
}
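The sort-then-admit order in HandlePodAdditions matters: older Pods claim resources first, and a Pod that does not fit is rejected without blocking later ones. The sketch below illustrates that idea with a single CPU dimension. The pod struct, the admitInOrder function, and the millicore accounting are all hypothetical simplifications; the real admission path goes through allocationManager.AddPod and a chain of admit handlers.

```go
package main

import (
	"fmt"
	"sort"
)

// pod is a hypothetical, heavily reduced Pod: a name, a creation time in
// Unix seconds, and a CPU request in millicores.
type pod struct {
	Name     string
	Created  int64
	CPUMilli int64
}

// admitInOrder sorts Pods by creation time (oldest first) and admits each
// one only if its CPU request still fits into the remaining capacity.
func admitInOrder(pods []pod, freeMilli int64) (admitted, rejected []string) {
	sort.SliceStable(pods, func(i, j int) bool {
		return pods[i].Created < pods[j].Created
	})
	for _, p := range pods {
		if p.CPUMilli > freeMilli {
			// Rejected Pods do not consume capacity.
			rejected = append(rejected, p.Name)
			continue
		}
		freeMilli -= p.CPUMilli
		admitted = append(admitted, p.Name)
	}
	return admitted, rejected
}

func main() {
	pods := []pod{
		{Name: "b", Created: 200, CPUMilli: 600},
		{Name: "a", Created: 100, CPUMilli: 600},
		{Name: "c", Created: 300, CPUMilli: 300},
	}
	admitted, rejected := admitInOrder(pods, 1000)
	fmt.Println(admitted, rejected)
}
```

With 1000 millicores free, the oldest Pod "a" is admitted, "b" no longer fits and is rejected, and the smaller "c" still fits afterwards.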
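2.4 Updating and creating the Pod: UpdatePod (pod_workers.go)
UpdatePod is the core entry point for Pod updates in the kubelet. It is responsible for:
- Pod state management: maintains the Pod's sync state and tracks its lifecycle (first sync, terminating, terminated)
- Update dispatch: accepts Pod update requests and routes them to the corresponding worker goroutine
- Lifecycle transitions: handles the Pod's state transitions from creation to termination
- Resource management: integrates with the allocation manager to support in-place vertical scaling
- Concurrency control: coordinates multiple update requests for the same Pod to keep its state consistent
Code path: kubernetes/pkg/kubelet/pod_workers.go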
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
// ... (omitted)
// start the pod worker goroutine if it doesn't exist
podUpdates, exists := p.podUpdates[uid]
if !exists {
// buffer the channel to avoid blocking this method
podUpdates = make(chan struct{}, 1)
p.podUpdates[uid] = podUpdates
// ensure that static pods start in the order they are received by UpdatePod
if kubetypes.IsStaticPod(pod) {
p.waitingToStartStaticPodsByFullname[status.fullname] =
append(p.waitingToStartStaticPodsByFullname[status.fullname], uid)
}
// allow testing of delays in the pod update channel
var outCh <-chan struct{}
if p.workerChannelFn != nil {
outCh = p.workerChannelFn(uid, podUpdates)
} else {
outCh = podUpdates
}
// spawn a pod worker
// Start the worker goroutine
go func() {
// TODO: this should be a wait.Until with backoff to handle panics, and
// accept a context for shutdown
defer runtime.HandleCrash()
defer klog.V(3).InfoS("Pod worker has stopped", "podUID", uid)
p.podWorkerLoop(uid, outCh)
}()
}
// ... (omitted)
}
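The excerpt creates one channel per Pod UID with a buffer of size 1 so that UpdatePod never blocks. A minimal sketch of that idea follows: a non-blocking send on a one-slot channel coalesces repeated notifications into a single wake-up. The workerSet type and its notify method are hypothetical names; the worker goroutine itself is left out so the behaviour stays deterministic.

```go
package main

import (
	"fmt"
	"sync"
)

// workerSet (hypothetical) keeps one signal channel per Pod UID.
type workerSet struct {
	mu      sync.Mutex
	updates map[string]chan struct{}
}

func newWorkerSet() *workerSet {
	return &workerSet{updates: make(map[string]chan struct{})}
}

// notify creates the per-Pod channel on first use and then signals it
// without blocking. created reports whether a new worker would be started;
// queued reports whether the signal was stored or coalesced with one that
// is already pending.
func (w *workerSet) notify(uid string) (created, queued bool) {
	w.mu.Lock()
	defer w.mu.Unlock()

	ch, exists := w.updates[uid]
	if !exists {
		// Buffer of one: the sender never blocks, and at most one
		// wake-up is pending at any time.
		ch = make(chan struct{}, 1)
		w.updates[uid] = ch
		created = true
	}
	select {
	case ch <- struct{}{}:
		queued = true
	default:
		// A wake-up is already pending; the worker will pick up the
		// latest state when it runs.
	}
	return created, queued
}

func main() {
	w := newWorkerSet()
	fmt.Println(w.notify("pod-1")) // first call creates the channel and queues a signal
	fmt.Println(w.notify("pod-1")) // second call is coalesced
}
```

The design choice is that the channel carries no payload: it only says "something changed", and the worker reads the most recent pending update from shared state when it wakes up.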
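2.5 Doing the actual work: podWorkerLoop (pod_workers.go)
podWorkerLoop is the dedicated work loop for each Pod in the kubelet and handles every sync operation for that single Pod. Each Pod has its own goroutine running this function. Its main duties are:
- Event-driven processing: receives and handles the Pod's update events through the podUpdates channel
- State synchronization: runs the sync operation that matches the Pod's current work type
- Lifecycle management: handles the Pod's full lifecycle from creation through running to termination
- Error recovery: retries when an operation fails so that the Pod's state eventually converges
- Resource cleanup: cleans up related resources and shuts down the worker when the Pod terminates
It ultimately calls SyncPod to perform the sync.
Code path: kubernetes/pkg/kubelet/pod_workers.go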
func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) {
var lastSyncTime time.Time
for range podUpdates {
// Begin the sync operation
ctx, update, canStart, canEverStart, ok := p.startPodSync(podUID)
// If we had no update waiting, it means someone initialized the channel without filling out pendingUpdate.
// ... (omitted; the switch below runs inside a closure whose returned error is assigned to err)
// Take the appropriate action (illegal phases are prevented by UpdatePod)
switch {
case update.WorkType == TerminatedPod:
err = p.podSyncer.SyncTerminatedPod(ctx, update.Options.Pod, status)
case update.WorkType == TerminatingPod:
var gracePeriod *int64
if opt := update.Options.KillPodOptions; opt != nil {
gracePeriod = opt.PodTerminationGracePeriodSecondsOverride
}
podStatusFn := p.acknowledgeTerminating(podUID)
// if we only have a running pod, terminate it directly
if update.Options.RunningPod != nil {
err = p.podSyncer.SyncTerminatingRuntimePod(ctx, update.Options.RunningPod)
} else {
err = p.podSyncer.SyncTerminatingPod(ctx, update.Options.Pod, status, gracePeriod, podStatusFn)
}
default:
// Regular sync of the Pod's state
isTerminal, err = p.podSyncer.SyncPod(ctx, update.Options.UpdateType, update.Options.Pod, update.Options.MirrorPod, status)
}
lastSyncTime = p.clock.Now()
return err
}()
// ... (omitted)
}
}
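The shape of podWorkerLoop (range over a channel, pick a sync method by work type, exit when the channel closes) can be sketched as follows. The WorkType names follow the excerpt; the workerLoop function and the idea of recording the chosen method as a string are assumptions made so the example can run without a kubelet.

```go
package main

import "fmt"

// WorkType mirrors the three work types seen in the excerpt.
type WorkType int

const (
	SyncPod WorkType = iota
	TerminatingPod
	TerminatedPod
)

// workerLoop drains the update channel and records which sync method a
// real worker would invoke for each update. The loop ends when the channel
// is closed, which is how a worker goroutine shuts down.
func workerLoop(updates <-chan WorkType) []string {
	var actions []string
	for work := range updates {
		switch work {
		case TerminatedPod:
			actions = append(actions, "SyncTerminatedPod")
		case TerminatingPod:
			actions = append(actions, "SyncTerminatingPod")
		default:
			// Any other work type is a regular sync.
			actions = append(actions, "SyncPod")
		}
	}
	return actions
}

func main() {
	updates := make(chan WorkType, 3)
	updates <- SyncPod
	updates <- TerminatingPod
	updates <- TerminatedPod
	close(updates)
	fmt.Println(workerLoop(updates))
}
```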
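2.6 Creation on the kubelet side: SyncPod (kubelet.go)
SyncPod is the kubelet's core function for syncing Pod state. It aligns the Pod's actual state with its desired state. Its main duties are:
- Pod state sync: performs whatever operations are needed to make the current runtime state match the Pod's desired state
- Lifecycle management: handles the Pod's full lifecycle from creation through running to termination
- Resource management: creates and manages the Pod's cgroups to enforce resource isolation and QoS
- Volume management: waits for the volumes the Pod needs to be attached and mounted
- Network configuration: checks the network plugin status and handles host-network Pods
- Observability: records metrics, logs, and traces to support performance monitoring and troubleshooting
Finally, it calls kl.containerRuntime.SyncPod to hand the Pod over to the container runtime manager.
Code path: kubernetes/pkg/kubelet/kubelet.go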
func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
// ... (omitted)
// Make data directories for the pod
// Create the Pod's data directories
if err := kl.makePodDataDirs(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod))
return false, err
}
// Wait for volumes to attach/mount
// Wait for the volumes to be attached and mounted
if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil {
var volumeAttachLimitErr *volumemanager.VolumeAttachLimitExceededError
if errors.As(err, &volumeAttachLimitErr) {
kl.rejectPod(pod, volumemanager.VolumeAttachmentLimitExceededReason, volumeAttachLimitErr.Error())
recordAdmissionRejection(volumemanager.VolumeAttachmentLimitExceededReason)
return true, nil
}
if !wait.Interrupted(err) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
}
return false, err
}
// Fetch the pull secrets for the pod
// Fetch the image pull secrets referenced by the Pod
pullSecrets := kl.getPullSecretsForPod(pod)
// ... (omitted)
// Call the container runtime to create and start the containers
result := kl.containerRuntime.SyncPod(sctx, pod, podStatus, pullSecrets, kl.crashLoopBackOff)
kl.reasonCache.Update(pod.UID, result)
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
for _, r := range result.SyncResults {
if r.Action == kubecontainer.ResizePodInPlace && r.Error != nil {
// If the condition already exists, the observedGeneration does not get updated.
kl.statusManager.SetPodResizeInProgressCondition(pod.UID, v1.PodReasonError, r.Message, pod.Generation)
}
}
}
return false, result.Error()
}
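The kubelet-side SyncPod is essentially an ordered sequence of preparation steps in which the first failure aborts the sync and is retried later. A minimal sketch of that control flow is shown below. The step type, runSteps, and podSteps are hypothetical; the step names only paraphrase the stages visible in the excerpt (data directories, volumes, pull secrets, runtime sync).

```go
package main

import (
	"errors"
	"fmt"
)

// step is a hypothetical named stage of a Pod sync.
type step struct {
	name string
	run  func() error
}

// runSteps executes the stages in order and stops at the first error,
// returning the names of the stages that completed.
func runSteps(steps []step) ([]string, error) {
	var completed []string
	for _, s := range steps {
		if err := s.run(); err != nil {
			return completed, fmt.Errorf("%s: %w", s.name, err)
		}
		completed = append(completed, s.name)
	}
	return completed, nil
}

// podSteps builds an illustrative pipeline; failMount simulates a volume
// that cannot be mounted.
func podSteps(failMount bool) []step {
	ok := func() error { return nil }
	mount := ok
	if failMount {
		mount = func() error { return errors.New("mount failed") }
	}
	return []step{
		{name: "make data dirs", run: ok},
		{name: "wait for volumes", run: mount},
		{name: "get pull secrets", run: ok},
		{name: "runtime sync", run: ok},
	}
}

func main() {
	completed, err := runSteps(podSteps(true))
	fmt.Println(completed, err)
	completed, err = runSteps(podSteps(false))
	fmt.Println(len(completed), err)
}
```

When the volume stage fails, the runtime is never asked to start containers, which matches the early returns in the excerpt.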
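Runtime

2.7 Runtime: SyncPod (kuberuntime_manager.go)
SyncPod is the core sync function of the kubelet's container runtime manager. It aligns the actual state in the container runtime with the Pod's desired state. Its main duties are:
- Pod sync: performs whatever operations are needed to make the runtime state match the Pod spec
- Sandbox management: creates, updates, or deletes the Pod sandbox (the container runtime's isolation environment)
- Container lifecycle management: starts, stops, and restarts containers (init containers, ephemeral containers, and regular containers)
- Resource management: handles container resource allocation and in-place vertical scaling
- Network configuration: manages the Pod's IP addresses and network configuration
- Image management: pulls container images and image volumes
- Error handling: records sync results and errors to support retries
Code path: kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go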
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create ephemeral containers.
// 6. Create init containers.
// 7. Resize running containers (if InPlacePodVerticalScaling==true)
// 8. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
logger := klog.FromContext(ctx)
// ... (omitted)
// Step 4: Create a sandbox for the pod if necessary.
podSandboxID := podContainerChanges.SandboxID
if podContainerChanges.CreateSandbox {
var msg string
var err error
logger.V(4).Info("Creating PodSandbox for pod", "pod", klog.KObj(pod))
// ... (omitted)
// Start creating the sandbox
podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
// ... (omitted)
logger.V(4).Info("Created PodSandbox for pod", "podSandboxID", podSandboxID, "pod", klog.KObj(pod))
// Query the sandbox status
resp, err := m.runtimeService.PodSandboxStatus(ctx, podSandboxID, false)
// ... (omitted)
// If we ever allow updating a pod from non-host-network to
// host-network, we may use a stale IP.
// Determine the Pod's IPs from the sandbox status
if !kubecontainer.IsHostNetworkPod(pod) {
// Overwrite the podIPs passed in the pod status, since we just started the pod sandbox.
podIPs = m.determinePodSandboxIPs(ctx, pod.Namespace, pod.Name, resp.GetStatus())
logger.V(4).Info("Determined the ip for pod after sandbox changed", "IPs", podIPs, "pod", klog.KObj(pod))
}
}
// the start containers routines depend on pod ip(as in primary pod ip)
// instead of trying to figure out if we have 0 < len(podIPs)
// everytime, we short circuit it here
podIP := ""
if len(podIPs) != 0 {
podIP = podIPs[0]
}
// Get podSandboxConfig for containers to start.
configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
result.AddSyncResult(configPodSandboxResult)
podSandboxConfig, err := m.generatePodSandboxConfig(ctx, pod, podContainerChanges.Attempt)
if err != nil {
message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
logger.Error(err, "GeneratePodSandboxConfig for pod failed", "pod", klog.KObj(pod))
configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
return
}
imageVolumePullResults, err := m.getImageVolumes(ctx, pod, podSandboxConfig, pullSecrets)
if err != nil {
logger.Error(err, "Get image volumes for pod failed", "pod", klog.KObj(pod))
configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, err.Error())
return
}
// Helper containing boilerplate common to starting all types of containers.
// typeName is a description used to describe this type of container in log messages,
// currently: "container", "init container" or "ephemeral container"
// metricLabel is the label used to describe this type of container in monitoring metrics.
// currently: "container", "init_container" or "ephemeral_container"
// The sandbox is ready; next, create the containers inside the Pod
start := func(ctx context.Context, typeName, metricLabel string, spec *startSpec) error {
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
result.AddSyncResult(startContainerResult)
isInBackOff, msg, err := m.doBackOff(ctx, pod, spec.container, podStatus, backOff)
if isInBackOff {
startContainerResult.Fail(err, msg)
logger.V(4).Info("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container.Name, "pod", klog.KObj(pod))
return err
}
metrics.StartedContainersTotal.WithLabelValues(metricLabel).Inc()
if sc.HasWindowsHostProcessRequest(pod, spec.container) {
metrics.StartedHostProcessContainersTotal.WithLabelValues(metricLabel).Inc()
}
logger.V(4).Info("Creating container in pod", "containerType", typeName, "container", spec.container.Name, "pod", klog.KObj(pod))
// We fail late here to populate the "ErrImagePull" and "ImagePullBackOff" correctly to the end user.
imageVolumes, err := m.toKubeContainerImageVolumes(ctx, imageVolumePullResults, spec.container, pod, startContainerResult)
if err != nil {
return err
}
// NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
msg, err = m.startContainer(ctx, podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs, imageVolumes)
incrementImageVolumeMetrics(err, msg, spec.container, imageVolumes)
if err != nil {
// startContainer() returns well-defined error codes that have reasonable cardinality for metrics and are
// useful to cluster administrators to distinguish "server errors" from "user errors".
metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
if sc.HasWindowsHostProcessRequest(pod, spec.container) {
metrics.StartedHostProcessContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
}
startContainerResult.Fail(err, msg)
// known errors that are logged in other places are logged at higher levels here to avoid
// repetitive log spam
switch {
case err == images.ErrImagePullBackOff:
logger.V(3).Info("Container start failed in pod", "containerType", typeName, "container", spec.container.Name, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)
default:
utilruntime.HandleError(fmt.Errorf("%v %v start failed in pod %v: %w: %s", typeName, spec.container.Name, format.Pod(pod), err, msg))
}
return err
}
return nil
}
// Step 5: start ephemeral containers
// These are started "prior" to init containers to allow running ephemeral containers even when there
// are errors starting an init container. In practice init containers will start first since ephemeral
// containers cannot be specified on pod creation.
for _, idx := range podContainerChanges.EphemeralContainersToStart {
start(ctx, "ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
}
// Step 6: start init containers.
for _, idx := range podContainerChanges.InitContainersToStart {
container := &pod.Spec.InitContainers[idx]
// Start the next init container.
if err := start(ctx, "init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
if podutil.IsRestartableInitContainer(container) {
logger.V(4).Info("Failed to start the restartable init container for the pod, skipping", "initContainerName", container.Name, "pod", klog.KObj(pod))
continue
}
logger.V(4).Info("Failed to initialize the pod, as the init container failed to start, aborting", "initContainerName", container.Name, "pod", klog.KObj(pod))
return
}
// Successfully started the container; clear the entry in the failure
logger.V(4).Info("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
}
// Step 7: For containers in podContainerChanges.ContainersToUpdate[CPU,Memory] list, invoke UpdateContainerResources
if resizable, _, _ := allocation.IsInPlacePodVerticalScalingAllowed(pod); resizable {
if len(podContainerChanges.ContainersToUpdate) > 0 || podContainerChanges.UpdatePodResources {
result.SyncResults = append(result.SyncResults, m.doPodResizeAction(ctx, pod, podStatus, podContainerChanges))
}
}
// Step 8: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
start(ctx, "container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
}
return
}
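Steps 5, 6, and 8 of the runtime SyncPod follow a fixed order with one asymmetry: a failing ordinary init container aborts the whole sync, while a failing restartable init container is skipped. The sketch below models only that ordering rule. The container struct, its FailStart flag, and startAll are hypothetical, and real init containers are started across successive syncs rather than in a single pass.

```go
package main

import "fmt"

// container is a hypothetical stand-in for a container spec.
type container struct {
	Name        string
	Restartable bool // restartable (sidecar-style) init container
	FailStart   bool // simulate a start failure
}

// startAll applies the ordering from the excerpt: ephemeral containers,
// then init containers, then regular containers. It returns the names that
// started and whether the sync was aborted by an init container failure.
func startAll(ephemeral, initCs, regular []container) (started []string, aborted bool) {
	for _, c := range ephemeral {
		if !c.FailStart {
			started = append(started, c.Name)
		}
	}
	for _, c := range initCs {
		if c.FailStart {
			if c.Restartable {
				continue // skip and keep going
			}
			return started, true // abort: regular containers must not start
		}
		started = append(started, c.Name)
	}
	for _, c := range regular {
		if !c.FailStart {
			started = append(started, c.Name)
		}
	}
	return started, false
}

func main() {
	initCs := []container{
		{Name: "sidecar", Restartable: true, FailStart: true},
		{Name: "setup"},
	}
	fmt.Println(startAll(nil, initCs, []container{{Name: "app"}}))
	fmt.Println(startAll(nil, []container{{Name: "setup", FailStart: true}}, []container{{Name: "app"}}))
}
```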
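2.8 Creating the sandbox: createPodSandbox (kuberuntime_sandbox.go)
createPodSandbox is the core function in the kubelet's container runtime manager for creating the Pod sandbox. The sandbox is the container runtime's isolation environment and provides the Pod with shared namespaces such as network and PID. Its main duties are:
- Generating the sandbox config: builds the sandbox configuration from the Pod spec
- Creating the log directory: creates the directory that stores the Pod's logs
- Looking up the runtime handler: finds the container runtime handler that corresponds to the RuntimeClass
- Creating the sandbox: calls the container runtime interface to create the actual sandbox
- Error handling: checks for errors and logs at every step
Code path: kubernetes/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go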
func (m *kubeGenericRuntimeManager) createPodSandbox(ctx context.Context, pod *v1.Pod, attempt uint32) (string, string, error) {
// ... (omitted)
podSandBoxID, err := m.runtimeService.RunPodSandbox(ctx, podSandboxConfig, runtimeHandler)
if err != nil {
message := fmt.Sprintf("Failed to create sandbox for pod %q: %v", format.Pod(pod), err)
logger.Error(err, "Failed to create sandbox for pod", "pod", klog.KObj(pod))
return "", message, err
}
return podSandBoxID, "", nil
}
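2.9 Calling the CRI: RunPodSandbox (remote_runtime.go)
RunPodSandbox is a core method of the CRI (Container Runtime Interface) client. It creates the Pod sandbox by calling the remote container runtime over gRPC. Its main duties are:
- Timeout management: sets a timeout for the sandbox creation operation (4 minutes by default)
- Remote call: invokes the container runtime's RunPodSandbox interface through the gRPC client
- Response validation: verifies that the returned sandbox ID is valid
- Error handling: logs errors and returns an appropriate error
- Logging: records details of the request and response to aid debugging
Code path: kubernetes/staging/src/k8s.io/cri-client/pkg/remote_runtime.go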
func (r *remoteRuntimeService) RunPodSandbox(ctx context.Context, config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
// ... (omitted)
resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
Config: config,
RuntimeHandler: runtimeHandler,
})
// ... (omitted)
return podSandboxID, nil
}
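The client-side pattern described above (bound the call with a timeout, then reject an empty sandbox ID) can be sketched with the standard context package. Everything below is a stand-in: sandboxClient, fakeClient, runSandbox, and demo are hypothetical and replace the generated gRPC client, which is not reproduced here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sandboxClient is a hypothetical, reduced view of the runtime client.
type sandboxClient interface {
	RunPodSandbox(ctx context.Context, podName string) (string, error)
}

// fakeClient answers after a delay unless the context expires first.
type fakeClient struct {
	id    string
	delay time.Duration
}

func (f fakeClient) RunPodSandbox(ctx context.Context, podName string) (string, error) {
	select {
	case <-time.After(f.delay):
		return f.id, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// runSandbox bounds the remote call with a timeout and validates the ID.
func runSandbox(ctx context.Context, c sandboxClient, podName string, timeout time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	id, err := c.RunPodSandbox(ctx, podName)
	if err != nil {
		return "", err
	}
	if id == "" {
		return "", fmt.Errorf("sandbox ID is not set for pod %q", podName)
	}
	return id, nil
}

// isTimeout reports whether the call failed because the deadline passed.
func isTimeout(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

// demo runs one call against the fake client; durations are in milliseconds.
func demo(id string, delayMs, timeoutMs int) (string, error) {
	c := fakeClient{id: id, delay: time.Duration(delayMs) * time.Millisecond}
	return runSandbox(context.Background(), c, "default/nginx", time.Duration(timeoutMs)*time.Millisecond)
}

func main() {
	fmt.Println(demo("sandbox-123", 0, 1000)) // succeeds
	_, err := demo("sandbox-123", 1000, 10)   // runtime too slow
	fmt.Println(isTimeout(err))
	_, err = demo("", 0, 1000) // runtime returned no ID
	fmt.Println(err)
}
```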
Containerd
From this point on, the flow enters containerd.
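📝 Summary and Outlook
The kubelet's syncLoop periodically re-syncs every Pod. The comment in the source above cites a 10-second sync interval, while the kubelet's syncFrequency setting is documented with a default of 1 minute.
- If a container crashes: the kubelet finds that "the API Server says this Pod should exist, but containerd says the container is not there", and it calls the CRI again to recreate the container, subject to the Pod's restartPolicy and restart backoff.
- If the network goes down: once it recovers, the kubelet catches up on every Pod update it missed during the outage through the watch mechanism's resynchronization.
During Pod creation, the kubelet calls containerd through the CRI to create the pause sandbox first and then creates the application containers inside that sandbox, which is how the whole Pod lifecycle is managed.
This is why Kubernetes is "state-driven" rather than "command-driven": the kubelet constantly compares "the desired state recorded in the API" with "the containers actually present on this machine" and corrects any difference it finds.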
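The state-driven idea in the summary boils down to a set difference between what should run and what does run. The following sketch shows that comparison on container names only. The reconcile function is hypothetical and deliberately ignores everything a real kubelet also compares, such as spec hashes, restart policy, and sandbox state.

```go
package main

import (
	"fmt"
	"sort"
)

// reconcile compares the desired set with the actual set and returns what
// has to be started and what has to be stopped. Results are sorted so the
// output is deterministic.
func reconcile(desired, actual map[string]bool) (toStart, toStop []string) {
	for name := range desired {
		if !actual[name] {
			toStart = append(toStart, name)
		}
	}
	for name := range actual {
		if !desired[name] {
			toStop = append(toStop, name)
		}
	}
	sort.Strings(toStart)
	sort.Strings(toStop)
	return toStart, toStop
}

func main() {
	desired := map[string]bool{"web": true, "db": true}
	actual := map[string]bool{"db": true, "old-job": true}
	toStart, toStop := reconcile(desired, actual)
	fmt.Println(toStart, toStop)
}
```

Running the comparison repeatedly is what makes the system self-healing: once the actual set equals the desired set, both results are empty and nothing is done.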
📚 References
https://kubernetes.io/zh-cn/docs/reference/command-line-tools-reference/kubelet/
https://blog.huweihuang.com/kubernetes-notes/principle/component/kubernetes-core-principle-kubelet/