2. Kubernetes API 聚合开发

自定义资源实际上是为了扩展 kubernetes 的 API,向 kubenetes API 中增加新类型,可以使用以下三种方式:

  • 修改 kubenetes 的源码,显然难度比较高,也不太合适
  • 创建自定义 API server 并聚合到 API 中
  • 创建自定义资源(CRD)

2.1 CRD存在的问题

  1. 只支持 etcd
  2. 只支持JSON,不支持 protobuf (一种高性能的序列化语言)
  3. 只支持2种子资源接口 ( /status 和 /scale)
  4. 不支持优雅删除
  5. 显著增加 api server 负担
  6. 只支持 CRUD 原语
  7. 不支持跨 API groups 共享存储

2.2 自定义API server相比CRD的优势

  1. 底层存储无关(像metrics server 存在内存里面)
  2. 支持 protobuf
  3. 支持任意自定义子资源
  4. 可以实现优雅删除
  5. 支持复杂验证
  6. 支持自定义语义

2.3 使用自定义API server前先考虑以下几点

  1. 你的 API 是否属于 声明式的
  2. 是否想使用 kubectl 命令来管理
  3. 是否要作为 kubenretes 中的对象类型来管理,同时显示在 kubernetes dashboard 上
  4. 是否可以遵守 kubernetes 的 API 规则限制,例如 URL 和 API group、namespace 限制
  5. 是否可以接受该 API 只能作用于集群或者 namespace 范围
  6. 想要复用 kubernetes API 的公共功能,比如 CRUD、watch、内置的认证和授权等

2.4 Kubernetes APIServer 和 自定义APIServer的关系

2.4.1 Kubernetes 的 Aggregated API是什么?

1.1 概述

Aggregated(聚合的)API server 是为了将原来的 API server 这个巨石(monolithic)应用给拆分开,为了方便用户开发自己的 API server 集成进来,而不用直接修改 Kubernetes 官方仓库的代码,这样一来也能将 API server 解耦,方便用户使用实验特性,简而言之,它是允许k8s的开发人员编写一个自己的服务,可以把这个服务注册到k8s的api里面,这样,就像k8s自己的api一样,自定义的服务只要运行在k8s集群里面,k8s 的Aggregate通过service名称就可以转发到我们自定义的service里面去了。这些 API server 可以跟 kube-apiserver 无缝衔接,使用 kubectl 也可以管理它们。

1.7+ 版本及以后,聚合层apiserver和 kube-apiserver 一起运行。在扩展资源被注册前,聚合层不执行任何操,要注册其 API,用户必需添加一个 APIService 对象,该对象需在 Kubernetes API 中声明 URL 路径,聚合层将发送到该 API 路径(e.g. /apis/myextension.mycompany.io/v1/…)的所有对象代理到注册的 APIService。

通常,通过在集群中的一个 Pod 中运行一个 extension-apiserver 来实现 APIService。如果已添加的资源需要主动管理,这个 extension-apiserver 通常需要和一个或多个controller配对。

1.2 设计理念
  • api的扩展性:这样k8s的开发人员就可以编写自己的API服务器来公开他们想要的API。集群管理员应该能够使用这些服务,而不需要对核心库存储库进行任何更改。
  • 丰富了APIs:核心kubernetes团队阻止了很多新的API提案。通过允许开发人员将他们的API作为单独的服务器公开,并使集群管理员能够在不对核心库存储库进行任何更改的情况下使用它们,这样就无须社区繁杂的审查了。
  • 开发分阶段实验性API的地方:新的API可以在单独的聚集服务器中开发,当它稳定之后,那么把它们封装起来安装到其他集群就很容易了。
  • 确保新API遵循kubernetes约定:如果没有这里提出的机制,社区成员可能会被迫推出自己的东西,这可能会或可能不遵循kubernetes约定。
1.3 Kubernetes Aggregated原理解析

我们说的自定义API其实就是和Metrics Server的实现方式一样,都是通过注册API的形式来完成和Kubernetes的集成的,也就是在API Server增加原本没有的API。不过添加API还可以通过CRD的方式完成,不过我们这里直说聚合方式。看下图:

img

Kube-Aggregator类似于一个七层负载均衡,将来自用户的请求拦截转发给其他服务器,并且负责整个 APIServer 的 Discovery 功能。

通过APIServices对象关联到某个Service来进行请求的转发,其关联的Service类型进一步决定了请求转发形式。Aggregator包括一个GenericAPIServer和维护自身状态的Controller。其中 GenericAPIServer主要处理apiregistration.k8s.io组下的APIService资源请求。

主要controller包括:

  • apiserviceRegistrationController:负责APIServices中资源的注册与删除;
  • availableConditionController:维护APIServices的可用状态,包括其引用Service是否可用等;
  • autoRegistrationController:用于保持API中存在的一组特定的APIServices;
  • crdRegistrationController:负责将CRD GroupVersions自动注册到APIServices中;
  • openAPIAggregationController:将APIServices资源的变化同步至提供的OpenAPI文档;

假设有两个路由分别访问API,实际上我们访问API的时候访问的是一个aggregator的代理层,下面橙色的都是可用的服务后端。我们访问上图中的2个URL其实是被代理到不同的后端,在这个机制下你可以添加更多的后端,比如举例说说Custome-metrics-apiserver绿色线条的路径。

注册自定义API文件

该文件的主要作用就是向Api server注册一个api,此API名称是关联到一个service名称上。

apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
  name: v1beta1.custom.metrics.k8s.io
  labels:
    api: custom-metrics-apiserver
    apiserver: "true"
spec:
  version: v1beta1 #API版本
  group: custom.metrics.k8s.io #API所属的组
  groupPriorityMinimum: 2000
  service:
    name: custom-metrics-apiserver #自定义API所关联的service名称,当访问这个自定义API后转发到哪个service处理,就根据这个service名称选择
    namespace: default
  versionPriority: 10
  caBundle: "LS0tLS1CRUdJTiBDRVJUSUZJQ0"

上面定义了资源类型为APIService,service名称为custom-metrics-apiserver,空间为default的一个资源聚合接口。

下面带大家从源代码的角度来看,代码路径:
staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go ,和k8s其它controller一样,watch变化分发到add、update和delete方法

	apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.addAPIService,
		UpdateFunc: c.updateAPIService,
		DeleteFunc: c.deleteAPIService,
	})

主要监听两种资源apiService和service,路径:

staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go

func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
	// if the proxyHandler already exists, it needs to be updated. The aggregation bits do not
	// since they are wired against listers because they require multiple resources to respond
	if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists {
		proxyHandler.updateAPIService(apiService)
		if s.openAPIAggregationController != nil {
			s.openAPIAggregationController.UpdateAPIService(proxyHandler, apiService)
		}
		return nil
	}

	proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
	// v1. is a special case for the legacy API.  It proxies to a wider set of endpoints.
	if apiService.Name == legacyAPIServiceName {
		proxyPath = "/api"
	}

	// register the proxy handler
	proxyHandler := &proxyHandler{
		localDelegate:              s.delegateHandler,
		proxyCurrentCertKeyContent: s.proxyCurrentCertKeyContent,
		proxyTransport:             s.proxyTransport,
		serviceResolver:            s.serviceResolver,
		egressSelector:             s.egressSelector,
	}
	proxyHandler.updateAPIService(apiService)
	if s.openAPIAggregationController != nil {
		s.openAPIAggregationController.AddAPIService(proxyHandler, apiService)
	}
	s.proxyHandlers[apiService.Name] = proxyHandler
	s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
	s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)

	// if we're dealing with the legacy group, we're done here
	if apiService.Name == legacyAPIServiceName {
		return nil
	}

	// if we've already registered the path with the handler, we don't want to do it again.
	if s.handledGroups.Has(apiService.Spec.Group) {
		return nil
	}

	// it's time to register the group aggregation endpoint
	groupPath := "/apis/" + apiService.Spec.Group
	groupDiscoveryHandler := &apiGroupHandler{
		codecs:    aggregatorscheme.Codecs,
		groupName: apiService.Spec.Group,
		lister:    s.lister,
		delegate:  s.delegateHandler,
	}
	// aggregation is protected
	s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(groupPath, groupDiscoveryHandler)
	s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle(groupPath+"/", groupDiscoveryHandler)
	s.handledGroups.Insert(apiService.Spec.Group)
	return nil
}

结合上面的源码:

proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version

例子就是/apis/custom-metrics.k8s.io/v1beta1,而处理方法请求的handle就是

	// register the proxy handler
	proxyHandler := &proxyHandler{
		localDelegate:              s.delegateHandler,
		proxyCurrentCertKeyContent: s.proxyCurrentCertKeyContent,
		proxyTransport:             s.proxyTransport,
		serviceResolver:            s.serviceResolver,
		egressSelector:             s.egressSelector,
	}
	proxyHandler.updateAPIService(apiService)

updateAPIService就是更新这个proxy的后端service,路径:

staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go

func (r *proxyHandler) updateAPIService(apiService *apiregistrationv1api.APIService) {
	if apiService.Spec.Service == nil {
		r.handlingInfo.Store(proxyHandlingInfo{local: true})
		return
	}

	proxyClientCert, proxyClientKey := r.proxyCurrentCertKeyContent()

	newInfo := proxyHandlingInfo{
		name: apiService.Name,
		restConfig: &restclient.Config{
			TLSClientConfig: restclient.TLSClientConfig{
				Insecure:   apiService.Spec.InsecureSkipTLSVerify,
				ServerName: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc",
				CertData:   proxyClientCert,
				KeyData:    proxyClientKey,
				CAData:     apiService.Spec.CABundle,
			},
		},
		serviceName:      apiService.Spec.Service.Name,
		serviceNamespace: apiService.Spec.Service.Namespace,
		servicePort:      *apiService.Spec.Service.Port,
		serviceAvailable: apiregistrationv1apihelper.IsAPIServiceConditionTrue(apiService, apiregistrationv1api.Available),
	}
	if r.egressSelector != nil {
		networkContext := egressselector.Cluster.AsNetworkContext()
		var egressDialer utilnet.DialFunc
		egressDialer, err := r.egressSelector.Lookup(networkContext)
		if err != nil {
			klog.Warning(err.Error())
		} else {
			newInfo.restConfig.Dial = egressDialer
		}
	} else if r.proxyTransport != nil && r.proxyTransport.DialContext != nil {
		newInfo.restConfig.Dial = r.proxyTransport.DialContext
	}
	newInfo.proxyRoundTripper, newInfo.transportBuildingError = restclient.TransportFor(newInfo.restConfig)
	if newInfo.transportBuildingError != nil {
		klog.Warning(newInfo.transportBuildingError.Error())
	}
	r.handlingInfo.Store(newInfo)
}

上述源码中restConfig就是调用service的客户端参数,其中

ServerName: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc"

指的就是具体的service。

2.4.2 认证流程

工作方式

与自定义资源定义(CRD)不同,除标准的 Kubernetes kube-apiserver 外,Aggregation API 还涉及另一个服务器:Extension apiserver。Kubernetes kube-apiserver 将需要与自定义的 Extension apiserver 通信,并且自定义的 Extension apiserver 也需要与 Kubernetes kube-apiserver 通信。为了确保此通信的安全,Kubernetes kube-apiserver 使用 x509 证书向 Extension apiserver 认证。具体流程如下:
在这里插入图片描述

  1. Kubernetes kube-apiserver:对发出请求的用户身份认证,并对请求的 API 路径执行鉴权
  2. Kubernetes kube-apiserver (aggregator):将请求转发到 Extension apiserver (aggregated apiserver)
  3. Extension apiserver:认证来自 Kubernetes kube-apiserver 的请求
  4. Extension apiserver:对来自原始用户的请求鉴权
  5. Extension apiserver:执行
Kubernetes kube-apiserver 认证和授权

假设我们已经在 Kubernetes kube-apiserver 注册了 Extension apiserver。

当用户请求访问 path ,Kubernetes kube-apiserver 使用它的标准认证和授权配置来对用户认证,以及对特定 path 的鉴权,到目前为止,所有内容都是标准的 Kubernetes API 请求,认证与鉴权,接下来 Kubernetes kube-apiserver 现在准备将请求发送到 Extension apiserver。

Kubernetes kube-apiserver认证时,认证接受会将认证信息删除,处理逻辑如下:

  • 1.通过context获取user信息
  • 2.构造请求,删除requestheader信息,通过user重新填充
  • 3.通过proxyRoundTripper转发请求

代码路径:

staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go

func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	value := r.handlingInfo.Load()
	if value == nil {
		r.localDelegate.ServeHTTP(w, req)
		return
	}
	handlingInfo := value.(proxyHandlingInfo)
	if handlingInfo.local {
		if r.localDelegate == nil {
			http.Error(w, "", http.StatusNotFound)
			return
		}
		r.localDelegate.ServeHTTP(w, req)
		return
	}

	if !handlingInfo.serviceAvailable {
		proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
		return
	}

	if handlingInfo.transportBuildingError != nil {
		proxyError(w, req, handlingInfo.transportBuildingError.Error(), http.StatusInternalServerError)
		return
	}
	// 通过context获取user
	user, ok := genericapirequest.UserFrom(req.Context())
	if !ok {
		proxyError(w, req, "missing user", http.StatusInternalServerError)
		return
	}

	// write a new location based on the existing request pointed at the target service
    // 构造请求url,通过apiservice配置的service/namespace随机得到某个endpoint后端
	location := &url.URL{}
	location.Scheme = "https"
	rloc, err := r.serviceResolver.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName, handlingInfo.servicePort)
	if err != nil {
		klog.Errorf("error resolving %s/%s: %v", handlingInfo.serviceNamespace, handlingInfo.serviceName, err)
		proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
		return
	}
	location.Host = rloc.Host
	location.Path = req.URL.Path
	location.RawQuery = req.URL.Query().Encode()

	newReq, cancelFn := newRequestForProxy(location, req)
	defer cancelFn()

	if handlingInfo.proxyRoundTripper == nil {
		proxyError(w, req, "", http.StatusNotFound)
		return
	}

	// we need to wrap the roundtripper in another roundtripper which will apply the front proxy headers
	proxyRoundTripper, upgrade, err := maybeWrapForConnectionUpgrades(handlingInfo.restConfig, handlingInfo.proxyRoundTripper, req)
	if err != nil {
		proxyError(w, req, err.Error(), http.StatusInternalServerError)
		return
	}
    // 包裹请求信息,将user信息放到header中
	proxyRoundTripper = transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), proxyRoundTripper)

	// if we are upgrading, then the upgrade path tries to use this request with the TLS config we provide, but it does
	// NOT use the roundtripper.  Its a direct call that bypasses the round tripper.  This means that we have to
	// attach the "correct" user headers to the request ahead of time.  After the initial upgrade, we'll be back
	// at the roundtripper flow, so we only have to muck with this request, but we do have to do it.
	if upgrade {
		transport.SetAuthProxyHeaders(newReq, user.GetName(), user.GetGroups(), user.GetExtra())
	}
	// 调用后端
	handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w})
	handler.ServeHTTP(w, newReq)
}

根据扩展apiserver找到后端时通过service获取对应endpoint列表,随机选择某个endpoint、实现如下,源码路径:

staging/src/k8s.io/apiserver/pkg/util/proxy/proxy.go

func ResolveEndpoint(services listersv1.ServiceLister, endpoints listersv1.EndpointsLister, namespace, id string, port int32) (*url.URL, error) {
	svc, err := services.Services(namespace).Get(id)
	if err != nil {
		return nil, err
	}

	svcPort, err := findServicePort(svc, port)
	if err != nil {
		return nil, err
	}

	switch {
	case svc.Spec.Type == v1.ServiceTypeClusterIP, svc.Spec.Type == v1.ServiceTypeLoadBalancer, svc.Spec.Type == v1.ServiceTypeNodePort:
		// these are fine
	default:
		return nil, fmt.Errorf("unsupported service type %q", svc.Spec.Type)
	}

	eps, err := endpoints.Endpoints(namespace).Get(svc.Name)
	if err != nil {
		return nil, err
	}
	if len(eps.Subsets) == 0 {
		return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svc.Name))
	}

	// Pick a random Subset to start searching from.
	ssSeed := rand.Intn(len(eps.Subsets))

	// Find a Subset that has the port.
	for ssi := 0; ssi < len(eps.Subsets); ssi++ {
		ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)]
		if len(ss.Addresses) == 0 {
			continue
		}
		for i := range ss.Ports {
			if ss.Ports[i].Name == svcPort.Name {
				// Pick a random address.
				ip := ss.Addresses[rand.Intn(len(ss.Addresses))].IP
				port := int(ss.Ports[i].Port)
				return &url.URL{
					Scheme: "https",
					Host:   net.JoinHostPort(ip, strconv.Itoa(port)),
				}, nil
			}
		}
	}
	return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id))
}

ProxyRoundTripper创建路径:

staging/src/k8s.io/client-go/transport/round_trippers.go

func NewAuthProxyRoundTripper(username string, groups []string, extra map[string][]string, rt http.RoundTripper) http.RoundTripper {
	return &authProxyRoundTripper{
		username: username,
		groups:   groups,
		extra:    extra,
		rt:       rt,
	}
}

func (rt *authProxyRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	req = utilnet.CloneRequest(req)
    // 设置user信息
	SetAuthProxyHeaders(req, rt.username, rt.groups, rt.extra)

	return rt.rt.RoundTrip(req)
}

// SetAuthProxyHeaders stomps the auth proxy header fields.  It mutates its argument.
func SetAuthProxyHeaders(req *http.Request, username string, groups []string, extra map[string][]string) {
    // 清除原始url的requestheader信息
	req.Header.Del("X-Remote-User")
	req.Header.Del("X-Remote-Group")
	for key := range req.Header {
		if strings.HasPrefix(strings.ToLower(key), strings.ToLower("X-Remote-Extra-")) {
			req.Header.Del(key)
		}
	}
	// 通过user重新填充信息
	req.Header.Set("X-Remote-User", username)
	for _, group := range groups {
		req.Header.Add("X-Remote-Group", group)
	}
	for key, values := range extra {
		for _, value := range values {
			req.Header.Add("X-Remote-Extra-"+headerKeyEscape(key), value)
		}
	}
}
Kubernetes kube-apiserver 代理请求

Kubernetes kube-apiserver 现在将请求发送或代理到注册以处理该请求的 Extension apiserver。为此,它需要了解几件事:

  1. Kubernetes kube-apiserver 应该如何向 Extension apiserver 认证,以通知 Extension apiserver 通过网络发出的请求来自有效的 Kubernetes kube-apiserver?
  2. Kubernetes kube-apiserver 应该如何通知 Extension apiserver 原始请求已通过认证的用户名和组?

简而言之,就是 Kubernetes kube-apiserver 已经认证和鉴权用户的请求,怎么将这些信息传递给 Extension apiserver,为提供这两条信息,我们必须使用若干启动参数来配置 Kubernetes apiserver。

Kubernetes kube-apiserver 客户端认证

Kubernetes kube-apiserver 通过 TLS 连接到 Extension apiserver,并使用客户端证书认证,这里 Kubernetes kube-apiserver (aggregator or proxy) 是 Extension apiserver 的客户端。必须在启动时使用提供的参数向 Kubernetes kube-apiserver 提供以下内容:

  • 通过 --proxy-client-key-file 指定签名私钥文件
  • 通过 --proxy-client-cert-file 指定验签证书文件
  • 通过 --requestheader-client-ca-file 签署客户端证书文件的 CA 证书
  • 通过 --requestheader-allowed-names 在签署的客户证书中有效的公用名(CN)

Kubernetes kube-apiserver 将使用由 –proxy-client-*-file 指示的文件来通过 Extension apiserver 验证。为了使合规的 Extension apiserver 能够将该请求视为有效,必须满足以下条件:

  1. 连接必须使用由 CA 签署的客户端证书,该证书的证书位于 --requestheader-client-ca-file 中。
  2. 连接必须使用客户端证书,该客户端证书的 CN 是 --requestheader-allowed-names 中列出的证书之一。 **注意:**您可以将此选项设置为空白,即为--requestheader-allowed-names=""。这将向扩展 apiserver 指示任何 CN 是可接受的。

使用这些选项启动时,Kubernetes kube-apiserver 将:

  1. 使用它们来通过 Extension apiserver 的认证。
  2. 在名为 kube-system 命名空间中创建一个 configmap extension-apiserver-authentication ,它将在其中放置 CA 证书和允许的 CN。反过来,Extension apiserver 可以检索这些内容以验证请求。
保存原始请求用户名和组信息

当 Kubernetes kube-apiserver 将请求代理到 Extension apiserver 时,它将向 Extension apiserver 通知原始请求已成功通过其验证的用户名和组。它在其代理请求的 http 标头中提供这些。您必须将要使用的标头名称告知 Kubernetes kube-apiserver。

  • 通过--requestheader-username-headers 标明用来保存用户名的头部
  • 通过--requestheader-group-headers 标明用来保存 group 的头部
  • 通过--requestheader-extra-headers-prefix 标明用来保存拓展信息前缀的头部

这些标头名称也放置在extension-apiserver-authentication 的 configmap 中,因此 Extension apiserver 可以检索和使用它们。

Extension apiserver 认证

Extension apiserver 在收到来自 Kubernetes kube-apiserver 的代理请求后,必须验证该请求确实来自有效的身份验证代理,该认证代理由 Kubernetes kube-apiserver 履行。Extension apiserver 通过以下方式对其认证:

  1. 如上所述,从kube-system中的 configmap 中检索以下内容:
  • 客户端 CA 证书 --requestheader-client-ca-file
  • 允许名称(CN)列表 --requestheader-allowed-names
  • 用户名,组和其他信息的头部。
  1. 使用以下证书检查 TLS 连接是否已通过认证:
  • 由其证书与检索到的 CA 证书匹配的 CA 签名。
  • 在允许的 CN 列表中有一个 CN,除非列表为空,在这种情况下允许所有 CN。
  • 从适当的头部中提取用户名和组。

如果以上均通过,则该请求是来自合法认证代理(在本例中为 Kubernetes kube-apiserver)的有效代理请求。

为了具有检索 configmap 的权限,Extension apiserver 需要适当的角色。在 kube-system 名字空间中有一个默认角色extension-apiserver-authentication-reader 可用于设置。

Extension apiserver 执行

如果SubjectAccessReview通过,则扩展 apiserver 执行请求。

2.4.3 部署过程

安装cfssl
wget https://pkg.cfssl.org/R1.2/cfssl_linux-amd64 -O /usr/local/bin/cfssl
wget https://pkg.cfssl.org/R1.2/cfssljson_linux-amd64 -O /usr/local/bin/cfssljson
wget https://pkg.cfssl.org/R1.2/cfssl-certinfo_linux-amd64 -O /usr/local/bin/cfssl-certinfo
cd /usr/local/bin/
chmod +x cfssl cfssljson cfssl-certinfo
创建CA
CA 配置文件
$ cat > aggregator-ca-config.json <<EOF
{
  "signing": {
    "default": {
      "expiry": "87600h"
    },
    "profiles": {
      "aggregator": {
        "usages": [
            "signing",
            "key encipherment",
            "server auth",
            "client auth"
        ],
        "expiry": "87600h"
      }
    }
  }
}
EOF
  • profiles : 可以定义多个 profiles,分别指定不同的过期时间、使用场景等参数;后续在签名证书时使用某个 profile。
  • signing :表示该证书可用于签名其它证书;生成的 aggregator-ca.pem 证书中 CA=TRUE
  • server auth :表示 Client 可以用该 CA 对 Server 提供的证书进行验证。
  • client auth :表示 Server 可以用该 CA 对 Client 提供的证书进行验证。
创建CA证书签名请求
$ cat > aggregator-ca-csr.json <<EOF
{
  "CN": "aggregator",
  "key": {
    "algo": "rsa",
    "size": 2048
  },
  "names": [
    {
      "C": "CN",
      "ST": "Shanghai",
      "L": "Shanghai",
      "O": "k8s",
      "OU": "wzlinux"
    }
  ],
    "ca": {
       "expiry": "87600h"
    }
}
  • “CN”Common Name,kube-apiserver 从证书中提取该字段作为请求的用户名 (User Name);浏览器使用该字段验证网站是否合法。
  • “O”Organization,kube-apiserver 从证书中提取该字段作为请求用户所属的组 (Group);
生成CA证书和私钥
cfssl gencert -initca aggregator-ca-csr.json | cfssljson -bare aggregator-ca
创建Kubernetes证书
创建aggregator证书签名请求
$ cat > aggregator-csr.json <<EOF
{
    "CN": "aggregator",
    "hosts": [
      "127.0.0.1",
      "172.18.0.101",
      "172.18.0.102",
      "172.18.0.103",
      "10.96.0.1",
      "kubernetes",
      "kubernetes.default",
      "kubernetes.default.svc",
      "kubernetes.default.svc.cluster",
      "kubernetes.default.svc.cluster.local"
    ],
    "key": {
        "algo": "rsa",
        "size": 2048
    },
    "names": [
        {
            "C": "CN",
            "ST": "Shanghai",
            "L": "Shanghai",
            "O": "k8s",
            "OU": "wzlinux"
        }
    ]
}

如果 hosts 字段不为空则需要指定授权使用该证书的 IP 或域名列表,由于该证书后续kubernetes master 集群使用,所以上面指定kubernetes master 集群的主机 IP 和 kubernetes 服务的服务 IP(一般是 kube-apiserver 指定的 service-cluster-ip-range 网段的第一个 IP,如 10.96.0.1)。

生成aggreagtor证书和私钥
cfssl gencert -ca=aggregator-ca.pem -ca-key=aggregator-ca-key.pem -config=aggregator-ca-config.json -profile=aggregator aggregator-csr.json | cfssljson -bare aggregator
分发证书

将生成的证书和秘钥文件(后缀名为.pem)拷贝到 Master 节点的 /etc/kubernetes/pki 目录下备用。

开启聚合层API

kube-apiserver 增加以下启动配置:

--requestheader-client-ca-file=/etc/kubernetes/pki/aggregator-ca.pem
--requestheader-allowed-names=aggregator
--requestheader-extra-headers-prefix=X-Remote-Extra-
--requestheader-group-headers=X-Remote-Group
--requestheader-username-headers=X-Remote-User
--proxy-client-cert-file=/etc/kubernetes/pki/aggregator.pem
--proxy-client-key-file=/etc/kubernetes/pki/aggregator-key.pem

前面创建的证书的 CN 字段的值必须和参数 –requestheader-allowed-names 指定的值 aggregator 相同

重启 kube-apiserver:

$ systemctl daemon-reload
$ systemctl restart kube-apiserver

如果 kube-proxy 没有在 Master 上面运行,kube-proxy 还需要添加配置:

--enable-aggregator-routing=true

2.5 实现自定义API(聚合)服务

API聚合这个方式实现相对复杂一点,但灵活度很高,基本业务上的大部分需求都可以满足。

2.5.1 工具介绍

虽然官方给了一个sample-apiserver,我们可以照着实现自己的Aggregated APIServer。但完全手工编写还是太费劲了,这里使用官方推荐的工具apiserver-builder帮助快速创建项目骨架。

apiserver-builder构建AA方案的API接口服务的原理还是比较清晰的,总之就是kubernetes里最常见的控制器模式,这里就不具体介绍了,官方文档既有文字又有图片讲得还是挺细致的,强烈推荐大家多看看,学习一下。

apiserver-builder这个工具与kubebuilder和operator-sdk非常相似,他们都依赖一个底层库controller-gen,apiserver-builder该工具生成的工程与kubebuilder生成的工程也非常相似,其中一个不同的地方就是kubebuilder因为不需要自定义apiserver,因此,apiserver-builder生成的工程会有一个自定义apiserver,控制器部分的逻辑两者都一样,都是通过调协方法实现自定义资源状态的维护。

2.5.2 API聚合服务开发

用例源码下载
git clone https://github.com/kubernetes/sample-apiserver.git
源码编译
cd sample-apiserver
编译二进制文件:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o artifacts/simple-image/kube-sample-apiserver
编译docker镜像:
docker build -t kube-sample-apiserver:latest ./artifacts/simple-image
集群中部署

踩坑:
artifacts/example/deployment.yaml文件中需要修改镜像及版本,要与集群中对应;
artifacts/example下所有文件中涉及的命名空间统一修改为default,auth-reader.yaml中metadata下的命名空间为kube-system保持不变。

kubectl apply -f artifacts/example
访问测试
查看sercet
kubectl get sa apiserver -o json
查看sercet里面的tocken,并将该tocken拷贝到test.log的文件中后续访问是作为参数传入
kubectl describe secret apiserver
查看角色
kubectl get clusterrole
查看角色绑定
kubectl get clusterrolebinding
REST接口访问
curl  -k  -H  "Authorization: Bearer $(cat test.log)" https://10.131.180.168:6443/apis/wardle.example.com/v1alpha1/flunders

2.5.3 apiserver-builder实现API聚合服务开发

工具使用当前最新版本V2.0beta

初始化项目

apiserver-boot init repo --domain example.com

创建一个非命名空间范围的api-resource

apiserver-boot create group version resource --group demo --version v1beta1 --non-namespaced=true --kind Foo

创建Foo这个api-resource的子资源

apiserver-boot create subresource --subresource bar --group demo --version v1beta1 --kind Foo

生成上述创建的api-resource类型的相关代码,包括deepcopy接口实现代码、versioned/unversioned类型转换代码、api-resource类型注册代码、api-resource类型的Controller代码、api-resource类型的AdmissionController代码

解决工程依赖问题并生成脚手架代码

go mod init
go mod vendor
make generate

可以直接在本地将etcd, apiserver, controller运行起来

apiserver-boot run local

上述这样操作之后,就可以访问我们的APIServer了,如下面的命令:

curl -k https://127.0.0.1:9443/apis/demo.example.com/v1beta1/foos

当然也可以新建一个yaml文件,然后用kubectl命令直接对api-resource进行操作:
创建Foo资源的yaml

echo 'apiVersion: demo.example.com/v1beta1
kind: Foo
metadata:
  name: foo-example
  namespace: test
spec: {}' > sample/foo.yaml

如果在apiserver的main方法里补上一些代码,以开启swagger-ui,还能更方便地看到这些API接口:

func main() {
	version := "v0"
	server.StartApiServer("/registry/example.com", apis.GetAllApiBuilders(), openapi.GetOpenAPIDefinitions, "Api", version, func(apiServerConfig *apiserver.Config) error {
		...
		apiServerConfig.RecommendedConfig.EnableSwaggerUI = true
		apiServerConfig.RecommendedConfig.SwaggerConfig = genericapiserver.DefaultSwaggerConfig()
		return nil
	})
}

然后浏览器访问https://127.0.0.1:9443/swagger-ui/就可以在swagger的Web页面上看到创建出来的所有API接口

定制API接口服务

像上面这样创建的API接口,接口是都有了,但接口没有啥意义,一般要根据实际情况定义api-resource的spec、status等结构体。

type Foo struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   FooSpec   `json:"spec,omitempty"`
	Status FooStatus `json:"status,omitempty"`
}

// FooSpec defines the desired state of Foo
type FooSpec struct {
}

// FooStatus defines the observed state of Foo
type FooStatus struct {
}
定制Controller

默认生成的api-resource的Reconcile逻辑如下:

// Reconcile reads that state of the cluster for a Foo object and makes changes based on the state read
// and what is in the Foo.Spec
// TODO(user): Modify this Reconcile function to implement your Controller logic.  The scaffolding writes
// a Deployment as an example
// +kubebuilder:rbac:groups=demo.jeremyxu2010.me,resources=foos,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=demo.jeremyxu2010.me,resources=foos/status,verbs=get;update;patch
func (r *ReconcileFoo) Reconcile(request reconcile.Request) (reconcile.Result, error) {
	// Fetch the Foo instance
	instance := &demov1beta1.Foo{}
	err := r.Get(context.TODO(), request.NamespacedName, instance)
	if err != nil {
		if errors.IsNotFound(err) {
			// Object not found, return.  Created objects are automatically garbage collected.
			// For additional cleanup logic use finalizers.
			return reconcile.Result{}, nil
		}
		// Error reading the object - requeue the request.
		return reconcile.Result{}, err
	}

	return reconcile.Result{}, nil
}

可以参考:operator-sdk-samples

打包部署

程序写好后,通过以下命令即可生成容器镜像及kubernetes的部署manifest文件:

生成二进制文件

apiserver-boot build executables

生成容器镜像

apiserver-boot build container --image demo/foo-apiserver:latest

生成kubernetes的部署manifest文件,可直接在kubernetes里apply即完成部署

apiserver-boot build config --name foo-apiserver --namespace default --image demo/foo-apiserver:latest

观察生成的kubernetes部署manifest文件config/apiserver.yaml,可以发现最终会创建一个Deployment,一个Service和一个APIService类型的kubernetes资源,同时APIService的caBundle及apiserver的TLS证书也配置妥当了。

Logo

开源、云原生的融合云平台

更多推荐