This tutorial shows how to deploy a Kafkorama cluster with Kafka support, in conjunction with Amazon Managed Streaming for Apache Kafka (MSK), using Amazon Elastic Kubernetes Service (EKS).
Before deploying Kafkorama on EKS, ensure that you have an AWS account and have installed the tools used throughout this tutorial: the AWS CLI (aws), eksctl, kubectl, and helm.
To avoid hardcoding the names of the EKS cluster, the Kafka topic, and the other resources, let's define environment variables as follows:
export EKS_CLUSTER=eks-kafkorama
export EKS_SERVICE_ACCOUNT=msk-service-account
export EKS_NAMESPACE=kafkorama
export EKS_ROLE_NAME=msk-kafkorama-role
export EKS_SECURITY_GROUP=eks-kafkorama-sg
export KAFKA_TOPIC=server
export IAM_POLICY_NAME=msk-cluster-access-kafkorama
export MSK_CLUSTER_NAME=msk-kafkorama
export MSK_CONFIGURATION_NAME=MSK-Kafkorama-Auto-Create-Topics-Enabled
Log in to AWS with the following command and follow the on-screen instructions to configure your AWS credentials:
aws configure
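Optionally, verify that your credentials are configured correctly; the following command should print your AWS account and caller identity:
aws sts get-caller-identity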
Create an EKS cluster with at least three and at most five nodes.
First, generate the cluster configuration file. For the NLB load balancer to work, the parameter awsLoadBalancerController must be changed from false to true. Also, to be able to configure the service account used for Kafka client authentication from EKS to the MSK cluster, OIDC must be enabled by changing the parameter withOIDC from false to true. The following command generates the configuration file with both changes applied:
eksctl create cluster --name=$EKS_CLUSTER \
--version=1.32 \
--nodes-min=3 \
--nodes-max=5 \
--region=us-east-1 \
--zones=us-east-1a,us-east-1b \
--ssh-access=true \
--node-ami-family=AmazonLinux2023 \
--dry-run | sed 's/awsLoadBalancerController: false/awsLoadBalancerController: true/g' | sed 's/withOIDC: false/withOIDC: true/g' > cluster-config.yaml
For the EKS cluster to be able to communicate with the AWS network load balancer, replace the following lines in the generated cluster-config.yaml file:
iam:
  vpcResourceControllerPolicy: true
  withOIDC: true
with the following lines:
iam:
  withOIDC: true
  serviceAccounts:
  - metadata:
      name: aws-load-balancer-controller
      namespace: kube-system
    attachPolicy:
      Version: "2012-10-17"
      Statement:
      - Effect: Allow
        Action:
        - elasticloadbalancing:DescribeListenerAttributes
        - elasticloadbalancing:ModifyListenerAttributes
        Resource: "*"
    wellKnownPolicies:
      awsLoadBalancerController: true
Then create the cluster from the modified configuration file:
eksctl create cluster -f cluster-config.yaml
This can take 20 to 30 minutes to complete.
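If kubectl does not already point to the new cluster (eksctl normally updates your kubeconfig automatically), you can update it manually, for example:
aws eks update-kubeconfig --name $EKS_CLUSTER --region us-east-1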
Check if the EKS nodes are ready with the following command:
kubectl get nodes
Install the AWS load balancer controller as follows to be able to use a Network Load Balancer (NLB) to expose the Kafkorama Gateways to the internet:
helm install aws-load-balancer-controller aws-load-balancer-controller \
-n kube-system \
--repo=https://aws.github.io/eks-charts \
--set clusterName=$EKS_CLUSTER \
--set serviceAccount.create=false \
--set serviceAccount.name=aws-load-balancer-controller
Check if the load balancer controller is running with the following command:
kubectl get all --selector "app.kubernetes.io/name=aws-load-balancer-controller" \
--namespace "kube-system"
To connect the Kafkorama Gateways running in EKS pods to the MSK cluster, we need to create an IAM policy and attach it to an EKS service account. The IAM policy gives the service account the necessary permissions to interact with MSK. The service account is referenced in the Kafkorama Kubernetes deployment manifest below via serviceAccountName: $EKS_SERVICE_ACCOUNT.
Create the file msk-policy.json with the following content:
cat > msk-policy.json <<EOL
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "kafka-cluster:*",
      "Resource": "*"
    }
  ]
}
EOL
Then create the IAM policy:
aws iam create-policy --policy-name $IAM_POLICY_NAME --policy-document file://msk-policy.json
Attach the policy to the service account:
POLICY_ARN=$(aws iam list-policies --query "Policies[?PolicyName=='$IAM_POLICY_NAME'].{ARN:Arn}" --output text)
eksctl create iamserviceaccount --name $EKS_SERVICE_ACCOUNT --namespace $EKS_NAMESPACE --cluster $EKS_CLUSTER --role-name $EKS_ROLE_NAME \
--attach-policy-arn $POLICY_ARN --approve
Verify that the role was attached to the service account with the following commands:
aws iam get-role --role-name $EKS_ROLE_NAME --query Role.AssumeRolePolicyDocument
aws iam list-attached-role-policies --role-name $EKS_ROLE_NAME --query 'AttachedPolicies[].PolicyArn' --output text
kubectl describe serviceaccount $EKS_SERVICE_ACCOUNT -n $EKS_NAMESPACE
Get the VPC and subnet IDs of the EKS cluster:
VPC_ID=`aws ec2 describe-vpcs --filters Name=tag:Name,Values="*$EKS_CLUSTER*" --query "Vpcs[].VpcId" --output text`
echo $VPC_ID
SUBNET_ID_1=$(aws ec2 describe-subnets --filter Name=vpc-id,Values=$VPC_ID --query 'Subnets[?AvailabilityZone==`us-east-1a`&&MapPublicIpOnLaunch==`true`].SubnetId' --output text)
echo $SUBNET_ID_1
SUBNET_ID_2=$(aws ec2 describe-subnets --filter Name=vpc-id,Values=$VPC_ID --query 'Subnets[?AvailabilityZone==`us-east-1b`&&MapPublicIpOnLaunch==`true`].SubnetId' --output text)
echo $SUBNET_ID_2
Create a security group for communication between the EKS and MSK clusters:
aws ec2 create-security-group --group-name $EKS_SECURITY_GROUP --description "Kafkorama msk security group" --vpc-id $VPC_ID
Add an inbound rule permitting traffic on port 9098:
SECURITY_GROUP_ID=$(aws ec2 describe-security-groups --filter Name=vpc-id,Values=$VPC_ID Name=group-name,Values=$EKS_SECURITY_GROUP --query 'SecurityGroups[*].[GroupId]' --output text)
echo $SECURITY_GROUP_ID
aws ec2 authorize-security-group-ingress --group-id $SECURITY_GROUP_ID --protocol tcp --port 9098 --cidr 0.0.0.0/0
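The rule above opens port 9098 to any source (0.0.0.0/0). As an optional hardening step, not required by the rest of this tutorial, you could instead restrict the rule to the CIDR block of the EKS VPC, for example:
VPC_CIDR=$(aws ec2 describe-vpcs --vpc-ids $VPC_ID --query 'Vpcs[0].CidrBlock' --output text)
aws ec2 authorize-security-group-ingress --group-id $SECURITY_GROUP_ID --protocol tcp --port 9098 --cidr $VPC_CIDR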
Create a configuration file for Kafka that enables automatic topic creation:
cat > configuration.txt <<EOL
auto.create.topics.enable=true
EOL
MSK_CONFIGURATION_ARN=$(aws kafka create-configuration --name "${MSK_CONFIGURATION_NAME}" --description "Auto create topics enabled" --kafka-versions "3.5.1" --server-properties fileb://configuration.txt --query "Arn" --output text)
echo $MSK_CONFIGURATION_ARN
Create the following configuration files:
- brokernodegroupinfo.json to define the broker node group
- client-authentication.json to enable IAM client authentication
- configuration.json to define the custom Kafka configuration
cat > brokernodegroupinfo.json <<EOL
{
  "InstanceType": "kafka.t3.small",
  "ClientSubnets": [
    "${SUBNET_ID_1}",
    "${SUBNET_ID_2}"
  ],
  "SecurityGroups": [
    "${SECURITY_GROUP_ID}"
  ]
}
EOL
cat > client-authentication.json <<EOL
{
  "Sasl": {
    "Iam": {
      "Enabled": true
    }
  }
}
EOL
cat > configuration.json <<EOL
{
  "Revision": 1,
  "Arn": "${MSK_CONFIGURATION_ARN}"
}
EOL
Run the following command to create the MSK cluster:
aws kafka create-cluster --cluster-name $MSK_CLUSTER_NAME \
--broker-node-group-info file://brokernodegroupinfo.json \
--kafka-version "3.5.1" \
--client-authentication file://client-authentication.json \
--configuration-info file://configuration.json \
--number-of-broker-nodes 2
This may take 15 to 30 minutes to complete.
Save the value of the ClusterArn key, as you need it to perform other actions on your cluster. If you forget the cluster ARN, you can list all Kafka clusters with the following command:
MSK_ARN=$(aws kafka list-clusters --query 'ClusterInfoList[*].ClusterArn' --output text)
echo $MSK_ARN
See info about the cluster with the following command:
aws kafka describe-cluster --cluster-arn $MSK_ARN
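Since cluster creation takes a while, you can poll the cluster state until it becomes ACTIVE, for example:
aws kafka describe-cluster --cluster-arn $MSK_ARN --query 'ClusterInfo.State' --output text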
Retrieve the Kafka bootstrap servers into the variable KAFKA_BOOTSTRAP_SERVER as follows:
KAFKA_BOOTSTRAP_SERVER=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_ARN --output text)
echo $KAFKA_BOOTSTRAP_SERVER
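The command above prints all bootstrap broker strings returned by MSK. If you prefer to keep only the IAM (SASL) brokers on port 9098, which are the ones used for authentication in this tutorial, you can query that field explicitly, for example:
KAFKA_BOOTSTRAP_SERVER=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_ARN --query 'BootstrapBrokerStringSaslIam' --output text)
echo $KAFKA_BOOTSTRAP_SERVER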
Create the Kubernetes namespace used by the Kafkorama deployment:
kubectl create namespace $EKS_NAMESPACE
We will use the following Kubernetes manifest to build a cluster of three Kafkorama Gateways:
cat > kafkorama-cluster.yaml <<EOL
apiVersion: v1
kind: ConfigMap
metadata:
  name: client-properties
  labels:
    name: client-properties
  namespace: kafkorama
data:
  client.properties: |-
    security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
---
apiVersion: v1
kind: Service
metadata:
  namespace: kafkorama
  name: kafkorama-cs
  labels:
    app: kafkorama
  annotations:
    service.beta.kubernetes.io/aws-load-balancer-type: external
    service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
    service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
spec:
  type: LoadBalancer
  ports:
    - name: client-port
      port: 80
      protocol: TCP
      targetPort: 8800
  selector:
    app: kafkorama
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafkorama
  namespace: kafkorama
  labels:
    app: kafkorama
spec:
  selector:
    matchLabels:
      app: kafkorama
  replicas: 3
  template:
    metadata:
      labels:
        app: kafkorama
    spec:
      serviceAccountName: $EKS_SERVICE_ACCOUNT
      containers:
        - name: kafkorama
          imagePullPolicy: Always
          image: kafkorama/kafkorama-gateway:latest
          volumeMounts:
            - name: client-properties
              mountPath: "/kafkorama/addons/kafka/consumer.properties"
              subPath: client.properties
              readOnly: true
            - name: client-properties
              mountPath: "/kafkorama/addons/kafka/producer.properties"
              subPath: client.properties
              readOnly: true
          env:
            - name: MIGRATORYDATA_EXTRA_OPTS
              value: "-DMemory=512MB -DX.ConnectionOffload=true -DClusterEngine=kafka"
            - name: MIGRATORYDATA_KAFKA_EXTRA_OPTS
              value: "-Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVER -Dtopics=$KAFKA_TOPIC"
            - name: MIGRATORYDATA_JAVA_GC_LOG_OPTS
              value: "-XX:+PrintCommandLineFlags -XX:+PrintGC -XX:+PrintGCDetails -XX:+DisableExplicitGC -Dsun.rmi.dgc.client.gcInterval=0x7ffffffffffffff0 -Dsun.rmi.dgc.server.gcInterval=0x7ffffffffffffff0 -verbose:gc"
          resources:
            requests:
              memory: "512Mi"
          ports:
            - name: client-port
              containerPort: 8800
          readinessProbe:
            tcpSocket:
              port: 8800
            initialDelaySeconds: 20
            failureThreshold: 5
            periodSeconds: 5
          livenessProbe:
            tcpSocket:
              port: 8800
            initialDelaySeconds: 10
            failureThreshold: 5
            periodSeconds: 5
      volumes:
        - name: client-properties
          configMap:
            name: client-properties
EOL
This manifest contains a Service and a Deployment. The Service handles the clients of the Kafkorama cluster over port 8800. Furthermore, the NLB service created above maps this port to port 80. Consequently, clients will connect to the Kafkorama cluster on port 80.
In this manifest, we've used the MIGRATORYDATA_EXTRA_OPTS environment variable, which can define specific parameters or adjust the default value of any parameter listed in the Configuration Guide. Here, we've used it to modify the default values of parameters such as Memory, and to change the default value of the ClusterEngine parameter in order to enable the Kafka native add-on.
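For example, once the deployment below is applied, if you later want to allocate more memory to each Kafkorama Gateway without editing the manifest, you could update this environment variable in place and let Kubernetes roll out the change (a sketch; adjust the values to your needs):
kubectl set env deployment/kafkorama -n kafkorama \
    MIGRATORYDATA_EXTRA_OPTS="-DMemory=1024MB -DX.ConnectionOffload=true -DClusterEngine=kafka"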
To customize Kafkorama's native add-on for Kafka, the environment variable MIGRATORYDATA_KAFKA_EXTRA_OPTS offers the flexibility to define specific parameters or adjust the default value of any parameter of the Kafka native add-on. In the manifest above, we've used this environment variable to modify the default values of the parameters bootstrap.servers and topics in order to connect to MSK.
To deploy the Kafkorama cluster defined in the file kafkorama-cluster.yaml created above, run:
kubectl apply -f kafkorama-cluster.yaml
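You can wait for the rollout to complete with, for example:
kubectl rollout status deployment/kafkorama -n kafkorama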
Because the deployment concerns the namespace kafkorama
, switch to this namespace as follows:
kubectl config set-context --current --namespace=kafkorama
To return to the default namespace, run:
kubectl config set-context --current --namespace=default
Check the running pods to ensure the kafkorama
pods are running:
kubectl get pods
The output of this command should include something similar to the following:
NAME READY STATUS RESTARTS AGE
kafkorama-57848575bd-4tnbz 1/1 Running 0 4m32s
kafkorama-57848575bd-gjmld 1/1 Running 0 4m32s
kafkorama-57848575bd-tcbtf 1/1 Running 0 4m32s
You can check the logs of each cluster member by running a command as follows:
kubectl logs kafkorama-57848575bd-4tnbz
Now, you can check that the Service of the Kubernetes manifest above is up and running:
kubectl get svc
You should see an output similar to the following:
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kafkorama-cs LoadBalancer 10.0.90.187 NLB-DNS 80:30546/TCP 5m27s
You should now be able to connect to http://NLB-DNS (where NLB-DNS is the external address assigned by the NLB to your client service) and run the demo app provided with each Kafkorama Gateway of the cluster.
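If you need to look up the NLB-DNS value from the command line, you can read it from the Service, for example:
NLB_DNS=$(kubectl get svc kafkorama-cs -n kafkorama -o jsonpath='{.status.loadBalancer.ingress[0].hostname}')
echo $NLB_DNS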
The stateless nature of the Kafkorama cluster when deployed in conjunction with MSK, where each cluster member is independent from the others, greatly simplifies horizontal scaling on EKS.
For example, if the load of your system increases substantially, and supposing your nodes have enough resources
available, you can add two new members to the cluster by modifying the replicas
field as follows:
kubectl scale deployment kafkorama --replicas=5
If the load of your system decreases significantly, then you might remove three members from the cluster by modifying
the replicas
field as follows:
kubectl scale deployment kafkorama --replicas=2
Manual scaling is practical if the load of your system changes gradually. Otherwise, you can use the autoscaling feature of Kubernetes.
Kubernetes can monitor the load of your system, typically expressed in CPU usage, and scale your Kafkorama cluster
up and down by automatically modifying the replicas
field.
In the example above, to add one or more new members up to a maximum of 5
cluster members if the CPU usage of the
existing members becomes higher than 50%, or remove one or more of the existing members if the CPU usage of the existing
members becomes lower than 50%, use the following command:
kubectl autoscale deployment kafkorama \
--cpu-percent=50 --min=3 --max=5
Now, you can display information about the autoscaler object above using the following command:
kubectl get hpa
and display CPU usage of cluster members with:
kubectl top pods
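Note that kubectl top and the Kubernetes autoscaler rely on the Metrics Server. If it is not already installed in your cluster, you can deploy it with, for example:
kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/latest/download/components.yaml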
While testing cluster autoscaling, it is important to understand that the Kubernetes autoscaler periodically retrieves CPU usage information from the cluster members. As a result, the autoscaling process may not appear instantaneous, but this delay aligns with the normal behavior of Kubernetes.
Get node group name with the following command:
eksctl get nodegroup --cluster=$EKS_CLUSTER --region=us-east-1
You should see an output similar to the following:
CLUSTER NODEGROUP STATUS CREATED MIN SIZE MAX SIZE DESIRED CAPACITY INSTANCE TYPE IMAGE ID
eks-kafkorama ng-78d1f82e ACTIVE 2024-01-25T07:55:46Z 3 5 5 m5.large AL2_x86_64
To scale the number of nodes in the EKS cluster, use the following command, replacing <NODE_GROUP> with the node group name obtained above:
eksctl scale nodegroup --cluster=$EKS_CLUSTER --nodes=5 --name=<NODE_GROUP> --region=us-east-1
See the number of nodes increased with the following command:
kubectl get nodes
You should see an output similar to the following:
NAME STATUS ROLES AGE VERSION
ip-192-168-0-196.ec2.internal Ready <none> 2m26s v1.27.9-eks-5e0fdde
ip-192-168-20-197.ec2.internal Ready <none> 2m26s v1.27.9-eks-5e0fdde
ip-192-168-46-194.ec2.internal Ready <none> 54m v1.27.9-eks-5e0fdde
ip-192-168-49-230.ec2.internal Ready <none> 54m v1.27.9-eks-5e0fdde
ip-192-168-8-103.ec2.internal Ready <none> 54m v1.27.9-eks-5e0fdde
Kafkorama clustering tolerates a number of cluster members being down or failing, as detailed in the Clustering section.
To simulate an EKS node failure, drain a node as follows:
kubectl drain <node-name> --force --delete-emptydir-data --ignore-daemonsets
Then, to make the node schedulable again, use:
kubectl uncordon <node-name>
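You can then verify on which nodes the Kafkorama pods are running, and that they were rescheduled during the drain, for example:
kubectl get pods -n kafkorama -o wide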
To uninstall, or if something went wrong with the commands above, you can remove the allocated resources (and try again) as detailed below.
Delete the Kafkorama namespace:
kubectl delete namespace $EKS_NAMESPACE
Delete the MSK cluster:
aws kafka delete-cluster --cluster-arn $MSK_ARN
Delete the MSK configuration. Wait for the MSK cluster to be deleted before running the following command:
aws kafka delete-configuration --arn $MSK_CONFIGURATION_ARN
Delete the security group. As above, wait for the MSK cluster to be deleted before running the following command:
SECURITY_GROUP_ID=$(aws ec2 describe-security-groups --filter Name=vpc-id,Values=$VPC_ID Name=group-name,Values=$EKS_SECURITY_GROUP --query 'SecurityGroups[*].[GroupId]' --output text)
aws ec2 delete-security-group --group-id $SECURITY_GROUP_ID
Delete the EKS cluster:
eksctl delete cluster --name=$EKS_CLUSTER --region=us-east-1
Finally, delete the IAM policy:
aws iam delete-policy --policy-arn $POLICY_ARN
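Optionally, remove the local files generated while following this tutorial:
rm -f cluster-config.yaml msk-policy.json configuration.txt brokernodegroupinfo.json client-authentication.json configuration.json kafkorama-cluster.yaml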
First, please read the documentation of the Kafka native add-on to understand the automatic mapping between Kafkorama subjects and Kafka topics.
Utilize Kafkorama's client APIs to create real-time applications that communicate with your Kafkorama cluster via MSK.
Also, employ the APIs or tools of MSK to generate real-time messages, which are subsequently delivered to Kafkorama's clients. Similarly, consume real-time messages from MSK that originate from Kafkorama's clients.