Scale Kafka consumer với Keda
Ghi chú nhanh về cách sử dụng Keda trong việc scale Kafka consumer
Bài hướng dẫn dưới đây yêu cầu bạn đã cài được Keda trên một cụm K8S Cluster.
Đầu tiên, ta cần tạo một kafka topic mới với partitions = số kafka broker * hệ số tự chọn. Nếu số kafka broker là 3, và bạn chọn hệ số là 4, thì sẽ tạo topic với partitions là 12. Lưu ý con số 12 cũng là số lượng maxReplica mà Keda sẽ scale lên trong quá trình autoscale.
./kafka-topics.sh --create --topic kafka-test-topic --partitions 12 --replication-factor 1 --bootstrap-server kafka.kafka:9092
Sau khi tạo được topic, việc cần làm là produce message vào trong topic đó. Ta có thể sử dụng thư viện opensource kafkacat để phục vụ mục đích trên. Bên dưới là định nghĩa của một Deployment sử dụng kafkacat.
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafkacat-producer
namespace: keda
spec:
replicas: 1
selector:
matchLabels:
app: kafkacat-producer
template:
metadata:
labels:
app: kafkacat-producer
spec:
containers:
- name: kafkacat-producer
image: confluentinc/cp-kafkacat
command:
- "sh"
- "-c"
- |
for i in $(seq 1 1000); do
echo "Message $i" | kafkacat -b kafka.kafka:9092 -t kafka-test-topic -P
sleep 0.1
done
resources:
limits:
cpu: 100m
memory: 50Mi
Tiếp theo ta cần tạo consumer.
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafkacat-consumer
namespace: keda
spec:
replicas: 1
selector:
matchLabels:
app: kafkacat-consumer
template:
metadata:
labels:
app: kafkacat-consumer
spec:
containers:
- name: kafkacat-consumer
image: confluentinc/cp-kafkacat
command:
- "sh"
- "-c"
- "kafkacat -b kafka.kafka:9092 -G kafka-test-consumer-group kafka-test-topic"
tty: true
Bước cuối cùng là tạo ScaledObject để Keda biết rằng cần phải scale deployment nào (scaleTargetRef.name), và scale dựa vào input nào (metadata.lagThreshold).
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: kafkacat-consumer-scaledobject
namespace: keda
spec:
scaleTargetRef:
name: kafkacat-consumer
pollingInterval: 30
triggers:
- type: kafka
metadata:
bootstrapServers: kafka.kafka:9092
consumerGroup: kafka-test-consumer-group
topic: kafka-test-topic
lagThreshold: "10"
offsetResetPolicy: latest
Nếu như các bước setup không có vấn đề, kafka consumer giờ đây đã autoscale dựa trên số lag threshold!
Hãy tăng giảm số lượng replicas của producers để theo dõi sự tăng giảm tương ứng của consumers.