flink1.13 on native k8s
flink 版本: 1.13.0k8s 版本: v1.16.2本次试用都是在default的namespace下操作的, 如果有需要可以 通过 -Dkubernetes.namespace=default来指定namespace,并在在kubectl操作时, -n来指定方案一: 基于flink session来实现先创建一个flink session. 此时由于没有任务的提交, 是没有taskMa
·
flink 版本: 1.13.0
k8s 版本: v1.16.2
本次试用都是在default的namespace下操作的, 如果有需要可以 通过 -Dkubernetes.namespace=default来指定namespace, 并在在kubectl操作时, -n来指定
方案一: 基于flink session来实现
先创建一个flink session. 此时由于没有任务的提交, 是没有taskManager的.
/data/flink-1.13.0/bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=stream-wordcount-application-cluster \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.container.image=flink \
-Dkubernetes.service.exposed.type=NodePort
提交flink任务到flink session中
/data/flink-1.13.0/bin/flink run -d -e kubernetes-session \
-Dkubernetes.cluster-id=stream-wordcount-application-cluster \
/data/flink-1.13.0/examples/streaming/WindowJoin.jar
# 如果任务提交失败, 那么修改NodePort
kubectl edit service stream-wordcount-application-cluster
查看k8s, 是可以发现任务正常运行了
kubectl get pods
kubectl get deployment
kubectl get svc
# 用此方法删除任务, 会出现configmap的残留
kubectl delete deployment stream-wordcount-application-cluster
方案二: 非session方式的flink native k8s
# 因为试用时, 使用了hdfs作为ha的存储和checkpoint的存储,
# 所以需要将基础的包flink-shaded-hadoop-2-uber-2.7.5-10.0.jar准备
mkdir -p /data/test
cd /data/test
# 将flink-shaded-hadoop-2-uber-2.7.5-10.0.jar包放入到该目录下
# 将需要运行的自己的包放到这个目录下, 我的试验的包是 flink-stream-k8s-1.0.jar
touch Dockerfile
vi Dockerfile
# 添加内容如下:
FROM flink
RUN mkdir -p $FLINK_HOME/usrlib
COPY ./flink-stream-k8s-1.0.jar $FLINK_HOME/usrlib/flink-stream-k8s-1.0.jar
COPY ./flink-shaded-hadoop-2-uber-2.7.5-10.0.jar $FLINK_HOME/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
# 构建image
docker build -t flink-stream-k8s:1.5 .
# 启动命令如下:
/data/flink-1.13.0/bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=flink-stream-application-cluster-08 \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=hdfs://ip:9000/flink/recovery \
-Dkubernetes.container.image=flink-stream-k8s:1.5 \
-Dstate.checkpoints.dir=hdfs://ip:9000/flink/checkpoints/flink-stream-application-cluster-08 \
-Dstate.savepoints.dir=hdfs://ip:9000/flink/savepoints/flink-stream-application-cluster-08 \
-Dexecution.checkpointing.interval=2s \
-Dexecution.checkpointing.mode=EXACTLY_ONCE \
-Dstate.backend=filesystem \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Drestart-strategy=failure-rate \
-Drestart-strategy.failure-rate.delay=1s \
-Drestart-strategy.failure-rate.failure-rate-interval=5s \
-Drestart-strategy.failure-rate.max-failures-per-interval=1 \
local:///opt/flink/usrlib/flink-stream-k8s-1.0.jar \
-clz cn.clife.k8s.flink.StreamRuntimeHaBody -jobName stream
# 停止命令
/data/flink-1.13.0/bin/flink stop -t kubernetes-application \
-Dkubernetes.cluster-id=flink-stream-application-cluster-08 \
00000000000000000000000000000000
注意事项:
1, 提交时, 会自动加载flink的配置文件, flink-conf.yaml到configmap中, 但是如果需要手动修改配置文件, 那么使用 -Dkey=value
2, local:/// 这个值镜像中的, 运行jar包的目录, 不是本服务器的文件地址
3, 运行main args参数依然放在服务后面提交
4, flink运行过程中, 出现任务自动重启, 是以checkpoint作为启动点重启的
查看configmap
kubectl get configmap |grep flink-stream-application-cluster-08
# 其中的flink-config取自于 flink_home/config/目录下的 logback-console.xml flink-conf.yaml log4j-console.properties 文件
# 其中hadoop-config取自于 hadoop_home/etc/hadoop/目录下的 core-site.xml hdfs-site.xml文件
kubectl delete configmap xxxx
kubectl describe configmap xxxx
总结
flink on native k8s 是在启动时, 通过k8s 的apiServer提交任务. 非session的方式提交, 启动时间较长, session提交任务, 启动时间较短,
问题
修改k8s的端口范围:
异常: provided port is not in the valid range. The range of valid ports is 30000-32767
操作:
vim /etc/kubernetes/manifests/kube-apiserver.yaml
在--service-cluster-ip-range 这一行下面添加: - --service-node-port-range=1-65535
保存后执行操作
systemctl daemon-reload
systemctl restart kubelet
system.out.println() 不输出的问题, 可以通过
kubectl logs {podName} -f 来查看
账号问题:
kubectl create serviceaccount flink
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink
在提交命令中, 可以指定提交用户
-Dkubernetes.jobmanager.service-account=flink
更多推荐
已为社区贡献1条内容
所有评论(0)