flink 版本: 1.13.0
  k8s 版本: v1.16.2
  本次试用都是在default的namespace下操作的, 如果有需要可以 通过 -Dkubernetes.namespace=default来指定namespace,  并在在kubectl操作时, -n来指定
方案一: 基于flink session来实现
     先创建一个flink session. 此时由于没有任务的提交, 是没有taskManager的.
/data/flink-1.13.0/bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=stream-wordcount-application-cluster \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dkubernetes.container.image=flink \
  -Dkubernetes.service.exposed.type=NodePort
    提交flink任务到flink session中
/data/flink-1.13.0/bin/flink run -d -e kubernetes-session \
 -Dkubernetes.cluster-id=stream-wordcount-application-cluster \
 /data/flink-1.13.0/examples/streaming/WindowJoin.jar

# 如果任务提交失败, 那么修改NodePort
kubectl edit service stream-wordcount-application-cluster 

    查看k8s, 是可以发现任务正常运行了
kubectl get pods 
kubectl get deployment
kubectl get svc 
# 用此方法删除任务, 会出现configmap的残留
kubectl delete deployment stream-wordcount-application-cluster  
方案二: 非session方式的flink native k8s
# 因为试用时, 使用了hdfs作为ha的存储和checkpoint的存储, 
# 所以需要将基础的包flink-shaded-hadoop-2-uber-2.7.5-10.0.jar准备
mkdir -p /data/test
cd /data/test
# 将flink-shaded-hadoop-2-uber-2.7.5-10.0.jar包放入到该目录下
# 将需要运行的自己的包放到这个目录下, 我的试验的包是 flink-stream-k8s-1.0.jar

touch Dockerfile
vi Dockerfile
# 添加内容如下:
FROM flink
RUN mkdir -p $FLINK_HOME/usrlib
COPY ./flink-stream-k8s-1.0.jar $FLINK_HOME/usrlib/flink-stream-k8s-1.0.jar
COPY ./flink-shaded-hadoop-2-uber-2.7.5-10.0.jar $FLINK_HOME/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

# 构建image
docker build -t flink-stream-k8s:1.5 .

# 启动命令如下:
/data/flink-1.13.0/bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=flink-stream-application-cluster-08 \
    -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
	-Dhigh-availability.storageDir=hdfs://ip:9000/flink/recovery \
    -Dkubernetes.container.image=flink-stream-k8s:1.5 \
	-Dstate.checkpoints.dir=hdfs://ip:9000/flink/checkpoints/flink-stream-application-cluster-08 \
    -Dstate.savepoints.dir=hdfs://ip:9000/flink/savepoints/flink-stream-application-cluster-08 \
	-Dexecution.checkpointing.interval=2s \
	-Dexecution.checkpointing.mode=EXACTLY_ONCE \
	-Dstate.backend=filesystem \
	-Dkubernetes.rest-service.exposed.type=NodePort  \
	-Drestart-strategy=failure-rate  \
	-Drestart-strategy.failure-rate.delay=1s  \
	-Drestart-strategy.failure-rate.failure-rate-interval=5s \
	-Drestart-strategy.failure-rate.max-failures-per-interval=1  \
    local:///opt/flink/usrlib/flink-stream-k8s-1.0.jar \
	-clz cn.clife.k8s.flink.StreamRuntimeHaBody -jobName stream

# 停止命令
/data/flink-1.13.0/bin/flink stop -t kubernetes-application \
   -Dkubernetes.cluster-id=flink-stream-application-cluster-08 \
    00000000000000000000000000000000
 注意事项:
  1,  提交时, 会自动加载flink的配置文件, flink-conf.yaml到configmap中, 但是如果需要手动修改配置文件, 那么使用 -Dkey=value
  2,  local:///  这个值镜像中的, 运行jar包的目录, 不是本服务器的文件地址
  3,  运行main args参数依然放在服务后面提交
  4,  flink运行过程中, 出现任务自动重启, 是以checkpoint作为启动点重启的
    查看configmap
 kubectl get configmap |grep flink-stream-application-cluster-08
 # 其中的flink-config取自于 flink_home/config/目录下的 logback-console.xml flink-conf.yaml log4j-console.properties 文件
 # 其中hadoop-config取自于 hadoop_home/etc/hadoop/目录下的 core-site.xml hdfs-site.xml文件
kubectl delete configmap xxxx 
kubectl describe configmap xxxx 

在这里插入图片描述

总结

flink on native k8s 是在启动时, 通过k8s 的apiServer提交任务. 非session的方式提交, 启动时间较长, session提交任务, 启动时间较短,

问题
修改k8s的端口范围:
异常: provided port is not in the valid range. The range of valid ports is 30000-32767
操作:
vim /etc/kubernetes/manifests/kube-apiserver.yaml
在--service-cluster-ip-range 这一行下面添加:   - --service-node-port-range=1-65535
保存后执行操作
systemctl daemon-reload
systemctl restart kubelet
 system.out.println() 不输出的问题, 可以通过
 kubectl logs {podName} -f 来查看
账号问题:
kubectl create serviceaccount flink
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink

在提交命令中, 可以指定提交用户
-Dkubernetes.jobmanager.service-account=flink 
Logo

开源、云原生的融合云平台

更多推荐