Prerequisites

  • A working Kubernetes cluster
  • A storage service. This example uses NFS (see "Setting up an NFS storage service"); the exported directory is /provisioner/srv

Creating the StorageClass

Because the Kafka and ZooKeeper clusters use PVs for storage, a StorageClass is used here to provision them dynamically.

  1. Create the ServiceAccount and related RBAC objects
    Edit nfs-rbac.yaml:
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nfs-provisioner
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: nfs-provisioner-runner
rules:
  - apiGroups: [""]
    resources: ["persistentvolumes"]
    verbs: ["get", "list", "watch", "create", "delete"]
  - apiGroups: [""]
    resources: ["persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "update"]
  - apiGroups: ["storage.k8s.io"]
    resources: ["storageclasses"]
    verbs: ["get", "list", "watch"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["create", "update", "patch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: run-nfs-provisioner
subjects:
  - kind: ServiceAccount
    name: nfs-provisioner
    namespace: default
roleRef:
  kind: ClusterRole
  name: nfs-provisioner-runner
  apiGroup: rbac.authorization.k8s.io
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: leader-locking-nfs-provisioner
rules:
  - apiGroups: [""]
    resources: ["endpoints"]
    verbs: ["get", "list", "watch", "create", "update", "patch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: leader-locking-nfs-provisioner
subjects:
  - kind: ServiceAccount
    name: nfs-provisioner
    # replace with namespace where provisioner is deployed
    namespace: default
roleRef:
  kind: Role
  name: leader-locking-nfs-provisioner
  apiGroup: rbac.authorization.k8s.io

kubectl apply -f nfs-rbac.yaml
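To confirm the RBAC objects were created, a quick sanity check (the names match the manifest above):
kubectl get serviceaccount nfs-provisioner
kubectl get clusterrole nfs-provisioner-runner
kubectl get clusterrolebinding run-nfs-provisioner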
  2. Create the StorageClass
    Edit nfs-sc.yaml:
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: nfs-storageclass
provisioner: example.com/nfs
kubectl apply -f nfs-sc.yaml
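You can verify the class was registered (the PROVISIONER column should show example.com/nfs):
kubectl get storageclass nfs-storageclass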
  3. Create the provisioner used by the StorageClass
    Edit nfs-client-provisioner.yaml:
kind: Deployment
apiVersion: apps/v1
metadata:
  name: nfs-provisioner
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: nfs-provisioner
  template:
    metadata:
      labels:
        app: nfs-provisioner
    spec:
      serviceAccountName: nfs-provisioner
      containers:
        - name: nfs-provisioner
          image: registry.cn-hangzhou.aliyuncs.com/open-ali/nfs-client-provisioner
          volumeMounts:
            - name: nfs-client-root
              mountPath: /persistentvolumes
          env:
            - name: PROVISIONER_NAME
              value: example.com/nfs
            - name: NFS_SERVER
              value: 127.0.0.1
            - name: NFS_PATH
              value: /provisioner/srv
      volumes:
        - name: nfs-client-root
          nfs:
            server: 127.0.0.1
            path: /provisioner/srv
  • "PROVISIONER_NAME" is the name of the provisioner; it must match the provisioner field of the StorageClass;
  • "NFS_SERVER" and the server field under volumes are the address of the NFS server (replace 127.0.0.1 with your NFS server's address);
  • "NFS_PATH" is the directory exported by the NFS server;
  • Do not change the /persistentvolumes mount path.
kubectl apply -f nfs-client-provisioner.yaml
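If everything is wired up correctly, the provisioner pod should reach Running; a quick check using the label from the Deployment above:
kubectl get pods -l app=nfs-provisioner
kubectl logs -l app=nfs-provisioner --tail=20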
  4. Set this StorageClass as the default
kubectl patch storageclass nfs-storageclass -p '{"metadata": {"annotations": {"storageclass.kubernetes.io/is-default-class": "true"}}}'
kubectl get sc
NAME                         PROVISIONER       AGE
nfs-storageclass (default)   example.com/nfs   1d
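To confirm dynamic provisioning works end to end, you can create a small test PVC and check that it becomes Bound (a sketch; the claim name and size are arbitrary):
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: test-claim
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: nfs-storageclass
  resources:
    requests:
      storage: 1Mi
EOF
kubectl get pvc test-claim
kubectl delete pvc test-claim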

Creating the kafka-operator resources

  1. Create the kafka namespace
kubectl create namespace kafka
  2. Create the operator
    The YAML below contains the ClusterRoles, ClusterRoleBindings and CRDs.
curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.12.2/strimzi-cluster-operator-0.12.2.yaml \
  | sed 's/namespace: .*/namespace: kafka/' \
  | kubectl -n kafka apply -f -
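To verify that the operator's CRDs were installed and its Deployment started, a quick sanity check:
kubectl get crd | grep strimzi.io
kubectl get deployment strimzi-cluster-operator -n kafka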
  3. Create the Kafka cluster
kubectl apply -f https://raw.githubusercontent.com/strimzi/strimzi-kafka-operator/0.12.2/examples/kafka/kafka-persistent-single.yaml -n kafka
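If you prefer not to poll by hand, you can wait for the broker pod to become Ready (a sketch; the pod name follows the my-cluster name used in the example manifest):
kubectl -n kafka wait pod/my-cluster-kafka-0 --for=condition=Ready --timeout=300s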
kubectl get po -n kafka
NAME                                          READY     STATUS    RESTARTS   AGE
my-cluster-entity-operator-7cdd54b88c-v8qtz   3/3       Running   0          1m
my-cluster-kafka-0                            2/2       Running   1          19m
my-cluster-zookeeper-0                        2/2       Running   0          19m
strimzi-cluster-operator-6574c5754f-trh4v     1/1       Running   0          20m

Deployment complete

Testing

Once the cluster is running normally, create a Kafka message producer:

kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.12.2-kafka-2.2.1 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic my-topic

Send a few messages in the producer.

Then create a consumer:

kubectl -n kafka run kafka-consumer -ti --image=strimzi/kafka:0.12.2-kafka-2.2.1 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning
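If the consumer prints the messages typed into the producer, the cluster is working. You can also list topics to confirm my-topic was created (a sketch using the same Strimzi image):

kubectl -n kafka run kafka-topics -ti --image=strimzi/kafka:0.12.2-kafka-2.2.1 --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --list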

Note: https://github.com/banzaicloud/kafka-operator is another operator; that installation method is recommended.

Installing kafka-manager

This component is used to manage Kafka clusters.
See https://github.com/yahoo/kafka-manager/ for reference.
A containerized installation is used here:

  1. Build the kafka-manager image from a Dockerfile
    Based on https://github.com/hleb-albau/kafka-manager-docker
    Dockerfile:
FROM openjdk:8-jdk AS build

ENV KAFKA_MANAGER_SOURCE=2.0.0.2
ENV KAFKA_MANAGER_VERSION=2.0.0.2
ENV KAFKA_MANAGER_SRC_DIR=/kafka-manager-source
ENV KAFKA_MANAGER_DIST_FILE=$KAFKA_MANAGER_SRC_DIR/target/universal/kafka-manager-$KAFKA_MANAGER_VERSION.zip

RUN echo "Building Kafka Manager" \
    && wget "https://github.com/yahoo/kafka-manager/archive/${KAFKA_MANAGER_SOURCE}.tar.gz" -O kafka-manager-sources.tar.gz \
    && mkdir $KAFKA_MANAGER_SRC_DIR \
    && tar -xzf kafka-manager-sources.tar.gz -C $KAFKA_MANAGER_SRC_DIR --strip-components=1 \
    && cd $KAFKA_MANAGER_SRC_DIR \
    && echo 'scalacOptions ++= Seq("-Xmax-classfile-name", "200")' >> build.sbt

ADD robust_build.sh /
RUN chmod +x robust_build.sh && /robust_build.sh

### STAGE 2: Package ###
FROM openjdk:8-jre-alpine
LABEL maintainer="Hleb Albau <hleb.albau@gmail.com>"

RUN apk update && apk add bash
COPY --from=build /kafka-manager-bin /kafka-manager

VOLUME /kafka-manager/conf
ENTRYPOINT ["/kafka-manager/bin/kafka-manager", "-Dpidfile.path=/dev/null", "-Dapplication.home=/kafka-manager"]

Edit robust_build.sh:

#!/bin/bash
# Retry the sbt build: sbt can fail transiently, so loop until the
# dist zip exists, giving up after 5 attempts.
BUILD_COUNTER=1
until (ls "$KAFKA_MANAGER_DIST_FILE" && exit 0); do

    echo "Build count $BUILD_COUNTER"

    (cd "$KAFKA_MANAGER_SRC_DIR" \
    && ./sbt clean dist \
    && unzip -d ./builded ./target/universal/kafka-manager-${KAFKA_MANAGER_VERSION}.zip \
    && mv -T ./builded/kafka-manager-${KAFKA_MANAGER_VERSION} /kafka-manager-bin ; exit 0)

    BUILD_COUNTER=$((BUILD_COUNTER + 1))

    if [ $BUILD_COUNTER -gt 5 ]; then
        break
    fi
done

# Fail the Docker build step if the dist file still does not exist.
ls "$KAFKA_MANAGER_DIST_FILE"

Build the image:

docker build -t kafka-manager:2.0.0.2 .

Run it:

docker run -d \
     -p 9000:9000  \
     -e ZK_HOSTS="localhost:2181" \
     kafka-manager:2.0.0.2 \
     -Dpidfile.path=/dev/null

Note: ZK_HOSTS should be the host address of your ZooKeeper service.

The following variables can be added to enable authentication for the web UI:

KAFKA_MANAGER_AUTH_ENABLED: "true"
KAFKA_MANAGER_USERNAME: username
KAFKA_MANAGER_PASSWORD: password
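
For example, a run command with authentication enabled might look like this (a sketch; replace the ZooKeeper address and credentials with your own):

docker run -d \
     -p 9000:9000 \
     -e ZK_HOSTS="zk-host:2181" \
     -e KAFKA_MANAGER_AUTH_ENABLED="true" \
     -e KAFKA_MANAGER_USERNAME="admin" \
     -e KAFKA_MANAGER_PASSWORD="change-me" \
     kafka-manager:2.0.0.2 \
     -Dpidfile.path=/dev/null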

Access the web UI at hostIP:9000.
Alternatively, kafka-manager can be installed as a Helm chart: http://mirror.azure.cn/kubernetes/charts/kafka-manager-2.1.5.tgz

helm repo add stable http://mirror.azure.cn/kubernetes/charts

helm search kafka-manager
NAME                    	CHART VERSION	APP VERSION	DESCRIPTION                      
stable/kafka-manager    	2.1.5        	1.3.3.22   	A tool for managing Apache Kafka.

# Download the chart package so its parameters can be modified
helm fetch stable/kafka-manager

# Extract it
tar zxvf kafka-manager-2.1.5.tgz
cd kafka-manager
# Modify the parameters in values.yaml to match your environment, including "serviceAccount", "zkHosts", "tuning", "basicAuth", etc.

helm install .
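
Instead of editing values.yaml, the same parameters can be overridden on the command line (a sketch using Helm 2 syntax; the zkHosts value is a placeholder for your ZooKeeper address):

helm install . \
  --name kafka-manager \
  --namespace kafka \
  --set zkHosts="zk-host:2181"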

You can also refer to my other post, "Deploying Helm on Kubernetes and setting up a local repo": put the modified and repackaged chart into your custom local chart repository directory, then install it:

helm search kafka-manager
NAME                    	CHART VERSION	APP VERSION	DESCRIPTION                      
local-repo/kafka-manager	2.1.5        	1.3.3.22   	A tool for managing Apache Kafka.
stable/kafka-manager    	2.1.5        	1.3.3.22   	A tool for managing Apache Kafka.

# Install
helm install local-repo/kafka-manager