Learn how to deploy Kafkorama Gateway on Amazon Elastic Kubernetes Service (EKS) with support for Amazon Managed Streaming for Apache Kafka (MSK), including IAM, networking, and topic configuration.
This tutorial walks you through deploying a Kafkorama Gateway cluster on Amazon EKS with Kafka support provided by Amazon MSK (Managed Streaming for Apache Kafka).
Before deploying Kafkorama Gateway on EKS, make sure you have an AWS account with permissions to create EKS and MSK resources, as well as the AWS CLI, eksctl, kubectl, and Helm installed on your machine.
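You can quickly check that these command-line tools are available before starting (the exact versions are not critical):
# Verify the CLIs used throughout this tutorial are installed
aws --version
eksctl version
kubectl version --client
helm version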
The following environment variables will be used throughout the deployment steps to create consistent and reusable AWS and Kubernetes resources:
# EKS configuration
export EKS_CLUSTER=eks-kafkorama
export EKS_NAMESPACE=kafkorama
export EKS_SERVICE_ACCOUNT=msk-service-account
export EKS_ROLE_NAME=msk-kafkorama-role
export EKS_SECURITY_GROUP=eks-kafkorama-sg
# MSK configuration
export MSK_CLUSTER_NAME=msk-kafkorama
export MSK_CONFIGURATION_NAME=MSK-Kafkorama-Auto-Create-Topics-Enabled
export IAM_POLICY_NAME=msk-cluster-access-kafkorama
# Kafka topic
export KAFKA_TOPIC=vehicles
Log in to AWS with the following command and follow the on-screen instructions to configure your AWS credentials:
aws configure
Create an EKS cluster with at least three and at most five nodes:
Create the cluster configuration file. For the NLB load balancer to work, we need to change the parameter awsLoadBalancerController from false to true. To configure a service account used for Kafka client authentication from EKS to the MSK cluster, we also need to enable OIDC by changing the parameter withOIDC from false to true:
eksctl create cluster --name=$EKS_CLUSTER \
--version=1.32 \
--nodes-min=3 \
--nodes-max=5 \
--region=us-east-1 \
--zones=us-east-1a,us-east-1b \
--ssh-access=true \
--node-ami-family=AmazonLinux2023 \
--dry-run | sed 's/awsLoadBalancerController: false/awsLoadBalancerController: true/g' | sed 's/withOIDC: false/withOIDC: true/g' > cluster-config.yaml
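Optionally, confirm that both substitutions were applied before editing the file further:
# Both parameters should now be reported as true
grep -E 'awsLoadBalancerController|withOIDC' cluster-config.yaml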
For the EKS cluster to be able to communicate with the AWS Network Load Balancer, replace the following lines in the generated cluster-config.yaml file:
iam:
  vpcResourceControllerPolicy: true
  withOIDC: true
with the following lines:
iam:
  withOIDC: true
  serviceAccounts:
    - metadata:
        name: aws-load-balancer-controller
        namespace: kube-system
      attachPolicy:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - elasticloadbalancing:DescribeListenerAttributes
              - elasticloadbalancing:ModifyListenerAttributes
            Resource: "*"
      wellKnownPolicies:
        awsLoadBalancerController: true
eksctl create cluster -f cluster-config.yaml
This can take between 20 and 30 minutes to complete.
Check if the EKS nodes are ready with the following command:
kubectl get nodes
Install the AWS load balancer controller as follows to be able to use a Network Load Balancer (NLB) to expose the Kafkorama Gateways to the internet:
helm install aws-load-balancer-controller aws-load-balancer-controller \
-n kube-system \
--repo=https://aws.github.io/eks-charts \
--set clusterName=$EKS_CLUSTER \
--set serviceAccount.create=false \
--set serviceAccount.name=aws-load-balancer-controller
Check if the load balancer controller is running with the following command:
kubectl get all --selector "app.kubernetes.io/name=aws-load-balancer-controller" \
--namespace "kube-system"
To connect the Kafkorama Gateways running in EKS pods to the MSK cluster, we need to create an IAM policy and attach it to an EKS service account. The IAM policy gives the service account the necessary permissions to interact with MSK. The service account is referenced in the Kafkorama Gateway Kubernetes deployment below through the setting serviceAccountName: $EKS_SERVICE_ACCOUNT.
Create the file msk-policy.json and use it to create the IAM policy:
cat > msk-policy.json <<EOL
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "kafka-cluster:*",
            "Resource": "*"
        }
    ]
}
EOL
aws iam create-policy --policy-name $IAM_POLICY_NAME --policy-document file://msk-policy.json
Attach the policy to the service account:
POLICY_ARN=$(aws iam list-policies --query "Policies[?PolicyName=='$IAM_POLICY_NAME'].{ARN:Arn}" --output text)
eksctl create iamserviceaccount --name $EKS_SERVICE_ACCOUNT --namespace $EKS_NAMESPACE --cluster $EKS_CLUSTER --role-name $EKS_ROLE_NAME \
--attach-policy-arn $POLICY_ARN --approve
Check that the role was attached to the service account with the following commands:
aws iam get-role --role-name $EKS_ROLE_NAME --query Role.AssumeRolePolicyDocument
aws iam list-attached-role-policies --role-name $EKS_ROLE_NAME --query 'AttachedPolicies[].PolicyArn' --output text
kubectl describe serviceaccount $EKS_SERVICE_ACCOUNT -n $EKS_NAMESPACE
Get the VPC and subnet IDs of the EKS cluster:
VPC_ID=$(aws ec2 describe-vpcs --filters Name=tag:Name,Values="*$EKS_CLUSTER*" --query "Vpcs[].VpcId" --output text)
echo $VPC_ID
SUBNET_ID_1=$(aws ec2 describe-subnets --filter Name=vpc-id,Values=$VPC_ID --query 'Subnets[?AvailabilityZone==`us-east-1a`&&MapPublicIpOnLaunch==`true`].SubnetId' --output text)
echo $SUBNET_ID_1
SUBNET_ID_2=$(aws ec2 describe-subnets --filter Name=vpc-id,Values=$VPC_ID --query 'Subnets[?AvailabilityZone==`us-east-1b`&&MapPublicIpOnLaunch==`true`].SubnetId' --output text)
echo $SUBNET_ID_2
Create a security group used for communication between EKS and MSK:
aws ec2 create-security-group --group-name $EKS_SECURITY_GROUP --description "Kafkorama msk security group" --vpc-id $VPC_ID
Add an inbound rule permitting traffic on port 9098:
SECURITY_GROUP_ID=$(aws ec2 describe-security-groups --filter Name=vpc-id,Values=$VPC_ID Name=group-name,Values=$EKS_SECURITY_GROUP --query 'SecurityGroups[*].[GroupId]' --output text)
echo $SECURITY_GROUP_ID
aws ec2 authorize-security-group-ingress --group-id $SECURITY_GROUP_ID --protocol tcp --port 9098 --cidr 0.0.0.0/0
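Port 9098 is the port MSK brokers use for SASL/IAM client connections from within AWS. You can optionally verify that the rule was added:
aws ec2 describe-security-groups --group-ids $SECURITY_GROUP_ID \
    --query 'SecurityGroups[0].IpPermissions' --output table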
Create a configuration file for Kafka that allows automatic topic creation:
cat > configuration.txt <<EOL
auto.create.topics.enable=true
EOL
MSK_CONFIGURATION_ARN=$(aws kafka create-configuration --name "${MSK_CONFIGURATION_NAME}" --description "Auto create topics enabled" --kafka-versions "3.5.1" --server-properties fileb://configuration.txt --query "Arn" --output text)
echo $MSK_CONFIGURATION_ARN
Create the following configuration files:
- brokernodegroupinfo.json to define the broker node group
- client-authentication.json to enable IAM client authentication
- configuration.json to define the custom Kafka configuration
cat > brokernodegroupinfo.json <<EOL
{
    "InstanceType": "kafka.t3.small",
    "ClientSubnets": [
        "${SUBNET_ID_1}",
        "${SUBNET_ID_2}"
    ],
    "SecurityGroups": [
        "${SECURITY_GROUP_ID}"
    ]
}
EOL
cat > client-authentication.json <<EOL
{
    "Sasl": {
        "Iam": {
            "Enabled": true
        }
    }
}
EOL
cat > configuration.json <<EOL
{
    "Revision": 1,
    "Arn": "${MSK_CONFIGURATION_ARN}"
}
EOL
Run the following command to create the MSK cluster:
aws kafka create-cluster --cluster-name $MSK_CLUSTER_NAME \
--broker-node-group-info file://brokernodegroupinfo.json \
--kafka-version "3.5.1" \
--client-authentication file://client-authentication.json \
--configuration-info file://configuration.json \
--number-of-broker-nodes 2
This may take 15 to 30 minutes to complete.
Save the value of the ClusterArn key, as you will need it to perform other actions on your cluster. If you forget the cluster ARN, you can list all Kafka clusters with the following command:
MSK_ARN=$(aws kafka list-clusters --query 'ClusterInfoList[*].ClusterArn' --output text)
echo $MSK_ARN
See info about the cluster with the following command:
aws kafka describe-cluster --cluster-arn $MSK_ARN
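The cluster must reach the ACTIVE state before its bootstrap brokers can be retrieved. To poll just the state field (an optional shortcut):
aws kafka describe-cluster --cluster-arn $MSK_ARN \
    --query 'ClusterInfo.State' --output text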
Retrieve the Kafka bootstrap servers into the variable KAFKA_BOOTSTRAP_SERVER as follows:
KAFKA_BOOTSTRAP_SERVER=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_ARN --output text)
echo $KAFKA_BOOTSTRAP_SERVER
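If the previous command returns more than one broker string, you can select the SASL/IAM endpoint explicitly by querying the BootstrapBrokerStringSaslIam field (an alternative form; with IAM as the only authentication method enabled, both commands should yield the same value):
KAFKA_BOOTSTRAP_SERVER=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_ARN \
    --query 'BootstrapBrokerStringSaslIam' --output text)
echo $KAFKA_BOOTSTRAP_SERVER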
kubectl create namespace $EKS_NAMESPACE
We will use the following Kubernetes manifest to build a cluster of three Kafkorama Gateways:
cat > kafkorama-cluster.yaml <<EOL
apiVersion: v1
kind: ConfigMap
metadata:
  name: client-properties
  labels:
    name: client-properties
  namespace: kafkorama
data:
  client.properties: |-
    security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
---
apiVersion: v1
kind: Service
metadata:
  namespace: kafkorama
  name: kafkorama-cs
  labels:
    app: kafkorama
  annotations:
    service.beta.kubernetes.io/aws-load-balancer-type: external
    service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
    service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
spec:
  type: LoadBalancer
  ports:
    - name: client-port
      port: 80
      protocol: TCP
      targetPort: 8800
  selector:
    app: kafkorama
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafkorama
  namespace: kafkorama
  labels:
    app: kafkorama
spec:
  selector:
    matchLabels:
      app: kafkorama
  replicas: 3
  template:
    metadata:
      labels:
        app: kafkorama
    spec:
      serviceAccountName: $EKS_SERVICE_ACCOUNT
      containers:
        - name: kafkorama
          imagePullPolicy: Always
          image: kafkorama/kafkorama-gateway:6.0.23
          volumeMounts:
            - name: client-properties
              mountPath: "/kafkorama-gateway/addons/kafka/consumer.properties"
              subPath: client.properties
              readOnly: true
            - name: client-properties
              mountPath: "/kafkorama-gateway/addons/kafka/producer.properties"
              subPath: client.properties
              readOnly: true
          env:
            - name: KAFKORAMA_GATEWAY_EXTRA_OPTS
              value: "-DMemory=512MB \
                -DX.ConnectionOffload=true \
                -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVER \
                -Dtopics=$KAFKA_TOPIC"
            - name: KAFKORAMA_GATEWAY_JAVA_GC_LOG_OPTS
              value: "-XX:+PrintCommandLineFlags -XX:+PrintGC -XX:+PrintGCDetails -XX:+DisableExplicitGC -Dsun.rmi.dgc.client.gcInterval=0x7ffffffffffffff0 -Dsun.rmi.dgc.server.gcInterval=0x7ffffffffffffff0 -verbose:gc"
          resources:
            requests:
              memory: "512Mi"
          ports:
            - name: client-port
              containerPort: 8800
          readinessProbe:
            tcpSocket:
              port: 8800
            initialDelaySeconds: 20
            failureThreshold: 5
            periodSeconds: 5
          livenessProbe:
            tcpSocket:
              port: 8800
            initialDelaySeconds: 10
            failureThreshold: 5
            periodSeconds: 5
      volumes:
        - name: client-properties
          configMap:
            name: client-properties
EOL
This manifest includes:
- a ConfigMap holding the Kafka client properties used to authenticate to MSK with IAM (SASL_SSL with AWS_MSK_IAM)
- a LoadBalancer Service that exposes the gateways to the internet through an NLB, mapping port 80 to the gateway port 8800
- a Deployment that runs three Kafkorama Gateway replicas under the MSK-enabled service account
We configure Kafkorama Gateway using the KAFKORAMA_GATEWAY_EXTRA_OPTS environment variable, which allows you to override default parameters from the Configuration Guide. In this example, we:
- set the Memory parameter to 512MB
- enable X.ConnectionOffload
- point bootstrap.servers to the MSK bootstrap brokers
- set topics to the vehicles topic used throughout this tutorial
To deploy the cluster, apply the manifest:
kubectl apply -f kafkorama-cluster.yaml
Since the deployment uses the kafkorama
namespace, switch to it with the following command:
kubectl config set-context --current --namespace=kafkorama
To switch back to the default namespace, run:
kubectl config set-context --current --namespace=default
Check that the kafkorama pods are running:
kubectl get pods
The output should look similar to the following:
NAME READY STATUS RESTARTS AGE
kafkorama-57848575bd-4tnbz 1/1 Running 0 4m32s
kafkorama-57848575bd-gjmld 1/1 Running 0 4m32s
kafkorama-57848575bd-tcbtf 1/1 Running 0 4m32s
To view the logs of the Kafkorama pod, run:
kubectl logs kafkorama-57848575bd-4tnbz
Then, check if the service is up and accessible:
kubectl get svc
You should see an output similar to the following:
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kafkorama-cs LoadBalancer 10.0.90.187 NLB-DNS 80:30546/TCP 5m27s
Locate the EXTERNAL-IP and PORT values for the kafkorama-cs service. In this example, the service is available at http://NLB-DNS.
Open this URL in your browser. You should see a welcome page that includes a demo application under the Debug Console menu, which allows you to publish and consume real-time messages via the Kafkorama cluster.
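If you prefer to check from the command line first, a plain HTTP request against the load balancer is enough to confirm the gateways are reachable (replace NLB-DNS with the DNS name from the output above; the command should print an HTTP status code):
curl -s -o /dev/null -w "%{http_code}\n" http://NLB-DNS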
The stateless nature of the Kafkorama cluster when deployed in conjunction with MSK, where each cluster member is independent of the others, greatly simplifies horizontal scaling on EKS.
For example, if the load of your system increases substantially, and supposing your nodes have enough resources
available, you can add two new members to the cluster by modifying the replicas
field as follows:
kubectl scale deployment kafkorama --replicas=5
If the load of your system decreases significantly, then you might remove three members from the cluster by modifying
the replicas
field as follows:
kubectl scale deployment kafkorama --replicas=2
Manual scaling is practical if the load of your system changes gradually. Otherwise, you can use the autoscaling feature of Kubernetes.
Kubernetes can monitor the load of your system, typically expressed in CPU usage, and scale your Kafkorama cluster
up and down by automatically modifying the replicas
field.
In the example above, to scale the cluster up to a maximum of 5 members when the CPU usage of the existing members rises above 50%, and back down to a minimum of 3 members when it drops below 50%, use the following command:
kubectl autoscale deployment kafkorama \
--cpu-percent=50 --min=3 --max=5
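If you prefer to keep the autoscaling configuration in a manifest, the same autoscaler can be expressed declaratively with the standard autoscaling/v2 API. The sketch below is equivalent to the command above (the file name kafkorama-hpa.yaml is just an example):
cat > kafkorama-hpa.yaml <<EOL
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: kafkorama
  namespace: kafkorama
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: kafkorama
  minReplicas: 3
  maxReplicas: 5
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 50
EOL
kubectl apply -f kafkorama-hpa.yaml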
Now, you can display information about the autoscaler object above using the following command:
kubectl get hpa
and display CPU usage of cluster members with:
kubectl top pods
While testing cluster autoscaling, it is important to understand that the Kubernetes autoscaler periodically retrieves CPU usage information from the cluster members. As a result, the autoscaling process may not appear instantaneous, but this delay aligns with the normal behavior of Kubernetes.
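To exercise the autoscaler during testing, one option (a rough sketch borrowed from the standard Kubernetes HPA walkthrough, not part of the original setup) is to generate continuous HTTP requests against the kafkorama-cs service from a temporary pod:
# Run a throwaway pod that keeps requesting the gateway service to drive up CPU usage
kubectl run -i --tty load-generator --rm --image=busybox:1.36 --restart=Never -- \
    /bin/sh -c "while true; do wget -q -O- http://kafkorama-cs.kafkorama.svc.cluster.local > /dev/null; done"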
Get the node group name with the following command:
eksctl get nodegroup --cluster=$EKS_CLUSTER --region=us-east-1
You should see an output similar to the following:
CLUSTER NODEGROUP STATUS CREATED MIN SIZE MAX SIZE DESIRED CAPACITY INSTANCE TYPE IMAGE ID
eks-kafkorama ng-78d1f82e ACTIVE 2024-01-25T07:55:46Z 3 5 5 m5.large AL2_x86_64
To scale the number of nodes in the EKS cluster, use the following command, replacing <NODE_GROUP> with the value from above:
eksctl scale nodegroup --cluster=$EKS_CLUSTER --nodes=5 --name=<NODE_GROUP> --region=us-east-1
Verify that the number of nodes has increased with the following command:
kubectl get nodes
You should see an output similar to the following:
NAME STATUS ROLES AGE VERSION
ip-192-168-0-196.ec2.internal Ready <none> 2m26s v1.27.9-eks-5e0fdde
ip-192-168-20-197.ec2.internal Ready <none> 2m26s v1.27.9-eks-5e0fdde
ip-192-168-46-194.ec2.internal Ready <none> 54m v1.27.9-eks-5e0fdde
ip-192-168-49-230.ec2.internal Ready <none> 54m v1.27.9-eks-5e0fdde
ip-192-168-8-103.ec2.internal Ready <none> 54m v1.27.9-eks-5e0fdde
Kafkorama Gateway clustering tolerates a number of cluster members being down or failing, as detailed in the Clustering section.
In order to test an EKS node failure, use:
kubectl drain <node-name> --force --delete-emptydir-data --ignore-daemonsets
Then, to make the node schedulable again, use:
kubectl uncordon <node-name>
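While the node is drained, you can watch the Kafkorama pods being rescheduled onto the remaining nodes (an optional check):
kubectl get pods -o wide --watch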
To uninstall, or if something went wrong with the commands above, you can remove the allocated resources (and try again) as detailed below.
kubectl delete namespace $EKS_NAMESPACE
aws kafka delete-cluster --cluster-arn $MSK_ARN
Delete the MSK configuration. Wait for the MSK cluster to be deleted before running the following command:
aws kafka delete-configuration --arn $MSK_CONFIGURATION_ARN
Delete the security group. As above, wait for the MSK cluster to be deleted before running the following commands:
SECURITY_GROUP_ID=$(aws ec2 describe-security-groups --filter Name=vpc-id,Values=$VPC_ID Name=group-name,Values=$EKS_SECURITY_GROUP --query 'SecurityGroups[*].[GroupId]' --output text)
aws ec2 delete-security-group --group-id $SECURITY_GROUP_ID
eksctl delete cluster --name=$EKS_CLUSTER --region=us-east-1
aws iam delete-policy --policy-arn $POLICY_ARN
Next, choose the appropriate SDK for your programming language or platform to build real-time apps that communicate with your Kafkorama Gateway cluster.
You can also use MSK's APIs or tools to publish real-time messages to MSK — these messages will be delivered to connected Kafkorama clients. Similarly, you can consume messages from MSK that originate from clients connected to Kafkorama.
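For example, a quick way to publish a test message to the vehicles topic is the Kafka console producer, run from a machine that can reach the MSK brokers (such as an EC2 instance in the same VPC). This is a sketch that assumes the Kafka CLI tools are installed, the aws-msk-iam-auth library is on the client classpath, and a local client.properties file contains the same IAM settings as the ConfigMap above:
# Hypothetical test: publish one message to the vehicles topic over SASL/IAM
echo '{"id":"vehicle-1","speed":42}' | bin/kafka-console-producer.sh \
    --bootstrap-server $KAFKA_BOOTSTRAP_SERVER \
    --topic $KAFKA_TOPIC \
    --producer.config client.properties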
Finally, to manage Kafka clusters, entitlement, and streaming APIs through a web interface, you can deploy Kafkorama Portal. It provides centralized control over your real-time infrastructure, simplifying operations, access control, and more.