Simple, secure Kafka access
This tutorial will walk you through declaring and applying intents to easily secure access to Kafka running inside a Kubernetes cluster, automating the management of Kafka ACLs and the generation and deployment of certificates for mTLS between Kafka and its clients.
In this tutorial, we will:
- Deploy a Kafka broker, and two clients that call it.
- Declare that one client pod intends to access a topic on Kafka.
- See that an ACL was autogenerated to allow just that, while blocking calls from the other client.
- See that mTLS credentials were autogenerated.
Prerequisites
For this tutorial, we'll configure Otterize to not manage network policies, so we can focus on topic-level Kafka ACL authorization (vs just accessing Kafka at all).
Of course you can also choose to combine them — after all, Kafka is just another service running in the cluster. To do that, reinstall Otterize without the --set intentsOperator.operator.enableNetworkPolicyCreation=false flag.
You can now install (or reinstall) Otterize in your cluster, and optionally connect to Otterize Cloud. Connecting to Cloud lets you:
- See what's happening visually in your browser, through the "access graph";
- Avoid using SPIRE (which can be installed with Otterize) for issuing certificates, as Otterize Cloud provides a certificate service.
So either forgo browser visualization and:
Install Otterize in your cluster, without Otterize Cloud (and no network policy management)
helm repo add otterize https://helm.otterize.com
helm repo update
helm install otterize otterize/otterize-kubernetes -n otterize-system --create-namespace \
--set intentsOperator.operator.enableNetworkPolicyCreation=false
This chart is a bundle of the Otterize intents operator, Otterize credentials operator, Otterize network mapper, and SPIRE.
Initial deployment may take a couple of minutes.
You can add the --wait flag for Helm to wait for deployment to complete and all pods to be Ready, or manually watch for all pods to be Ready using kubectl get pods -n otterize-system -w.
Or choose to include browser visualization and:
Install Otterize in your cluster, with Otterize Cloud (and no network policy management)
Create an Otterize Cloud account
If you don't already have an account, browse to https://app.otterize.com to set one up.
If someone in your team has already created an org in Otterize Cloud, and invited you (using your email address), you may see an invitation to accept.
Otherwise, you'll create a new org, which you can later rename, and invite your teammates to join you there.
Install Otterize OSS, connected to Otterize Cloud
If no Kubernetes clusters are connected to your account, click the "connect your cluster" button to:
- Create a Cloud cluster object, specifying its name and the name of an environment to which all namespaces in that cluster will belong, by default.
- Connect it with your actual Kubernetes cluster, by clicking on the "Connection guide →" link and running the Helm commands shown there.
- Follow the instructions to install Otterize with enforcement on (not in shadow mode) for this tutorial. In other words, omit the following flag in the Helm command:
--set intentsOperator.operator.enableEnforcement=false
- And add the following flag to the Helm command:
--set intentsOperator.operator.enableNetworkPolicyCreation=false
More details, if you're curious
Connecting your cluster simply entails installing Otterize OSS via Helm, using credentials from your account so Otterize OSS can report information needed to visualize the cluster.
The credentials will already be inlined into the Helm command shown in the Cloud UI, so you just need to copy that line and run it from your shell. If you don't give it the Cloud credentials, Otterize OSS will run fully standalone in your cluster — you just won't have the visualization in Otterize Cloud.
The Helm command shown in the Cloud UI also includes flags to turn off enforcement: Otterize OSS will be running in "shadow mode," meaning that it will show you what would happen if it created network policies to restrict pod-to-pod traffic, and created Kafka ACLs to control access to Kafka topics. While that's useful for gradually rolling out IBAC, for this tutorial we go straight to active enforcement.
While we want enforcement turned on, in this tutorial we don't want it for network policies — only for Kafka, so we can focus on Kafka topic-level access. You can configure shadow or active enforcement for network policies and for Kafka ACLs independently of each other.
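As a sketch using only the flags already shown in this tutorial, an install with Kafka ACL enforcement active but network policy creation disabled looks like this (adding the commented-out flag instead would put all enforcement into shadow mode):
helm install otterize otterize/otterize-kubernetes -n otterize-system --create-namespace \
--set intentsOperator.operator.enableNetworkPolicyCreation=false
# To run everything in shadow mode instead, you would add:
# --set intentsOperator.operator.enableEnforcement=false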
Install Kafka
We will deploy a Kafka cluster using Bitnami's Helm chart. In the chart we will configure Kafka to:
- Recognize the Otterize intents operator as a super user so it can configure ACLs;
- Use TLS (Kafka calls it SSL) for its listeners;
- Tell the Otterize credentials operator, via pod annotations, how credentials should be created; and
- Authenticate clients using mTLS credentials provided as a Kubernetes secret.
Expand to see the Helm values.yaml used with the Bitnami chart
# Configure Otterize as a super user to grant it access to configure ACLs
superUsers: "User:CN=kafka.kafka,O=SPIRE,C=US;User:CN=intents-operator-controller-manager.otterize,O=SPIRE,C=US"
# Use TLS for the Kafka listeners (Kafka calls them SSL)
listeners:
- "CLIENT://:9092"
- "INTERNAL://:9093"
advertisedListeners:
- "CLIENT://:9092"
- "INTERNAL://:9093"
listenerSecurityProtocolMap: "INTERNAL:SSL,CLIENT:SSL"
# Annotations for Otterize to generate credentials
podAnnotations:
credentials-operator.otterize.com/cert-type: jks
credentials-operator.otterize.com/tls-secret-name: kafka-tls-secret
credentials-operator.otterize.com/truststore-file-name: kafka.truststore.jks
credentials-operator.otterize.com/keystore-file-name: kafka.keystore.jks
credentials-operator.otterize.com/dns-names: "kafka-0.kafka-headless.kafka.svc.cluster.local,kafka.kafka.svc.cluster.local"
# Authenticate clients using mTLS
auth:
clientProtocol: mtls
interBrokerProtocol: mtls
tls:
type: jks
existingSecrets:
- kafka-tls-secret
password: password
authorizerClassName: kafka.security.authorizer.AclAuthorizer
# Allocate resources
resources:
requests:
cpu: 50m
memory: 256Mi
The following command will deploy a Kafka cluster with this chart:
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install --create-namespace -n kafka \
-f https://docs.otterize.com/code-examples/kafka-mtls/helm/values.yaml kafka bitnami/kafka
You can watch for all pods to be Ready using kubectl get pods -n kafka -w.
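Optionally, you can also confirm that the credentials operator created the JKS credentials requested via the pod annotations in the values.yaml above. This is just a sanity check; the secret name below is the one set in that file:
kubectl get secret kafka-tls-secret -n kafka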
Configure Otterize to manage Kafka access
Let's connect Kafka with Otterize by applying an Otterize KafkaServerConfig:
kubectl apply -f https://docs.otterize.com/code-examples/kafka-mtls/kafkaserverconfig.yaml
Expand to see the KafkaServerConfig
- kafkaserverconfig.yaml
apiVersion: k8s.otterize.com/v1alpha2
kind: KafkaServerConfig
metadata:
name: kafkaserverconfig
namespace: kafka
spec:
service:
name: kafka
addr: kafka.kafka:9092
tls:
certFile: /etc/otterize-spire/cert.pem
keyFile: /etc/otterize-spire/key.pem
rootCAFile: /etc/otterize-spire/ca.pem
topics:
- topic: "*"
pattern: literal
clientIdentityRequired: false
intentsRequired: false
Upon applying the KafkaServerConfig, an ACL is configured on the Kafka broker to allow anonymous access to all topics. This is the base state from which we will gradually roll out secure access to Kafka. You can verify it in the broker logs:
kubectl logs -n kafka statefulset/kafka | grep "Processing Acl change" | grep ANONYMOUS | tail -n 1
You should see the following output:
[2022-09-13 10:58:32,052] INFO Processing Acl change notification for
ResourcePattern(resourceType=TOPIC, name=*, patternType=LITERAL), versionedAcls :
Set(User:ANONYMOUS has ALLOW permission for operations: ALL from hosts: *,
User:* has ALLOW permission for operations: ALL from hosts: *), zkVersion : 0
(kafka.security.authorizer.AclAuthorizer)
Deploy clients
Clients will authenticate to Kafka using mTLS. Otterize makes this easy, requiring just 3 simple changes to the client pod spec:
- Generate credentials: add the credentials-operator.otterize.com/tls-secret-name annotation, which tells Otterize to generate mTLS credentials and store them in a Kubernetes secret whose name is the value of this annotation.
- Expose credentials in a volume: add a volume containing this secret to the pod.
- Mount the volume: mount the volume in every container in the pod.
Expand to see this structure
spec:
template:
metadata:
annotations:
# 1. Generate credentials as a secret called "client-credentials-secret":
credentials-operator.otterize.com/tls-secret-name: client-credentials-secret
...
spec:
volumes:
# 2. Create a volume containing this secret:
- name: otterize-credentials
secret:
secretName: client-credentials-secret
...
containers:
- name: client
...
volumeMounts:
# 3. Mount volume into container
- name: otterize-credentials
mountPath: /var/otterize/credentials
readOnly: true
Our simple example consists of two client pods — "client" and "client-other" — and the Kafka broker.
Expand to see the client specs used in this example
- namespace.yaml
- client-deployment.yaml
- client-configmap.yaml
- client-other-deployment.yaml
- client-other-configmap.yaml
apiVersion: v1
kind: Namespace
metadata:
name: otterize-tutorial-kafka-mtls
apiVersion: apps/v1
kind: Deployment
metadata:
name: client
namespace: otterize-tutorial-kafka-mtls
spec:
selector:
matchLabels:
app: client
template:
metadata:
labels:
app: client
annotations:
credentials-operator.otterize.com/tls-secret-name: client-credentials-secret
spec:
containers:
- name: client
image: golang
command: [ "/bin/sh", "-c", "--" ]
args: [ "while true; do cd /app; cp src/* .; go get main; go run .; sleep infinity; done" ]
volumeMounts:
- name: ephemeral
mountPath: /app
- mountPath: /app/src
name: client-go
- name: otterize-credentials
mountPath: /var/otterize/credentials
readOnly: true
volumes:
- name: client-go
configMap:
name: client-go
- name: otterize-credentials
secret:
secretName: client-credentials-secret
- name: ephemeral
emptyDir: { }
apiVersion: v1
kind: ConfigMap
metadata:
name: client-go
namespace: otterize-tutorial-kafka-mtls
data:
client.go: |
package main
import (
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "github.com/Shopify/sarama"
    "github.com/sirupsen/logrus"
    "io/ioutil"
    "time"
)

const (
    kafkaAddr     = "kafka.kafka:9092"
    testTopicName = "mytopic"
    certFile      = "/var/otterize/credentials/cert.pem"
    keyFile       = "/var/otterize/credentials/key.pem"
    rootCAFile    = "/var/otterize/credentials/ca.pem"
)

// getTLSConfig loads the client certificate, key and root CA distributed by
// Otterize into the mounted volume, and builds a TLS config for mTLS.
func getTLSConfig() (*tls.Config, error) {
    cert, err := tls.LoadX509KeyPair(certFile, keyFile)
    if err != nil {
        return nil, fmt.Errorf("failed loading x509 key pair: %w", err)
    }
    pool := x509.NewCertPool()
    rootCAPEM, err := ioutil.ReadFile(rootCAFile)
    if err != nil {
        return nil, fmt.Errorf("failed loading root CA PEM file: %w", err)
    }
    pool.AppendCertsFromPEM(rootCAPEM)
    return &tls.Config{
        Certificates: []tls.Certificate{cert},
        RootCAs:      pool,
    }, nil
}

// send_messages produces a message to the test topic every 2 seconds.
func send_messages(producer sarama.SyncProducer) {
    i := 1
    for {
        msg := fmt.Sprintf("Message %d [sent by client]", i)
        _, _, err := producer.SendMessage(&sarama.ProducerMessage{
            Topic:     testTopicName,
            Partition: -1,
            Value:     sarama.StringEncoder(msg),
        })
        if err != nil {
            return
        }
        fmt.Printf("Sent message - %s\n", msg)
        time.Sleep(2 * time.Second)
        i++
    }
}

// loop_kafka connects to Kafka over mTLS, then produces and consumes messages.
func loop_kafka() error {
    addrs := []string{kafkaAddr}
    config := sarama.NewConfig()

    fmt.Println("Loading mTLS certificates")
    config.Net.TLS.Enable = true
    tlsConfig, err := getTLSConfig()
    if err != nil {
        return err
    }
    config.Net.TLS.Config = tlsConfig

    fmt.Println("Connecting to Kafka")
    config.Net.DialTimeout = 5 * time.Second
    config.Net.ReadTimeout = 5 * time.Second
    config.Net.WriteTimeout = 5 * time.Second
    client, err := sarama.NewClient(addrs, config)
    if err != nil {
        return err
    }

    fmt.Println("Creating a producer and a consumer for -", testTopicName)
    config.Producer.Return.Successes = true
    config.Producer.Timeout = 5 * time.Second
    config.Consumer.MaxWaitTime = 5 * time.Second
    config.Producer.Return.Errors = true
    config.Consumer.Return.Errors = true
    producer, err := sarama.NewSyncProducerFromClient(client)
    if err != nil {
        return err
    }
    consumer, err := sarama.NewConsumerFromClient(client)
    if err != nil {
        return err
    }

    fmt.Println("Sending messages")
    go send_messages(producer)

    partConsumer, err := consumer.ConsumePartition(testTopicName, 0, 0)
    if err != nil {
        return err
    }
    for msg := range partConsumer.Messages() {
        fmt.Printf("Read message - %s\n", msg.Value)
    }
    return nil
}

func main() {
    for {
        err := loop_kafka()
        logrus.WithError(err).Println()
        fmt.Println("Loop exited")
        time.Sleep(2 * time.Second)
    }
}
apiVersion: apps/v1
kind: Deployment
metadata:
name: client-other
namespace: otterize-tutorial-kafka-mtls
spec:
selector:
matchLabels:
app: client-other
template:
metadata:
labels:
app: client-other
annotations:
credentials-operator.otterize.com/tls-secret-name: client-other-credentials-secret
spec:
containers:
- name: client-other
image: golang
command: [ "/bin/sh", "-c", "--" ]
args: [ "while true; do cd /app; cp src/* .; go get main; go run .; sleep infinity; done" ]
volumeMounts:
- name: ephemeral
mountPath: /app
- mountPath: /app/src
name: client-other-go
- name: otterize-credentials
mountPath: /var/otterize/credentials
readOnly: true
volumes:
- name: client-other-go
configMap:
name: client-other-go
- name: otterize-credentials
secret:
secretName: client-other-credentials-secret
- name: ephemeral
emptyDir: { }
apiVersion: v1
kind: ConfigMap
metadata:
name: client-other-go
namespace: otterize-tutorial-kafka-mtls
data:
client-other.go: |
package main
import (
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "github.com/Shopify/sarama"
    "github.com/sirupsen/logrus"
    "io/ioutil"
    "time"
)

const (
    kafkaAddr     = "kafka.kafka:9092"
    testTopicName = "mytopic"
    certFile      = "/var/otterize/credentials/cert.pem"
    keyFile       = "/var/otterize/credentials/key.pem"
    rootCAFile    = "/var/otterize/credentials/ca.pem"
)

// getTLSConfig loads the client certificate, key and root CA distributed by
// Otterize into the mounted volume, and builds a TLS config for mTLS.
func getTLSConfig() (*tls.Config, error) {
    cert, err := tls.LoadX509KeyPair(certFile, keyFile)
    if err != nil {
        return nil, fmt.Errorf("failed loading x509 key pair: %w", err)
    }
    pool := x509.NewCertPool()
    rootCAPEM, err := ioutil.ReadFile(rootCAFile)
    if err != nil {
        return nil, fmt.Errorf("failed loading root CA PEM file: %w", err)
    }
    pool.AppendCertsFromPEM(rootCAPEM)
    return &tls.Config{
        Certificates: []tls.Certificate{cert},
        RootCAs:      pool,
    }, nil
}

// loop_kafka connects to Kafka over mTLS and produces messages in a loop.
func loop_kafka() error {
    addrs := []string{kafkaAddr}
    config := sarama.NewConfig()

    fmt.Println("Loading mTLS certificates")
    config.Net.TLS.Enable = true
    tlsConfig, err := getTLSConfig()
    if err != nil {
        return err
    }
    config.Net.TLS.Config = tlsConfig

    fmt.Println("Connecting to Kafka")
    config.Net.DialTimeout = 5 * time.Second
    config.Net.ReadTimeout = 5 * time.Second
    config.Net.WriteTimeout = 5 * time.Second
    client, err := sarama.NewClient(addrs, config)
    if err != nil {
        return err
    }

    fmt.Println("Creating a producer for -", testTopicName)
    config.Producer.Return.Successes = true
    config.Producer.Timeout = 5 * time.Second
    config.Producer.Return.Errors = true
    producer, err := sarama.NewSyncProducerFromClient(client)
    if err != nil {
        return err
    }

    fmt.Println("Sending messages")
    i := 1
    for {
        msg := fmt.Sprintf("Message %d [sent by client-other]", i)
        _, _, err = producer.SendMessage(&sarama.ProducerMessage{
            Topic:     testTopicName,
            Partition: -1,
            Value:     sarama.StringEncoder(msg),
        })
        if err != nil {
            return err
        }
        fmt.Printf("Sent message - %s\n", msg)
        time.Sleep(1 * time.Second)
        i++
    }
}

func main() {
    for {
        err := loop_kafka()
        logrus.WithError(err).Println()
        fmt.Println("Loop exited")
        time.Sleep(2 * time.Second)
    }
}
- Deploy the two clients into a namespace called otterize-tutorial-kafka-mtls using kubectl:
kubectl apply -f https://docs.otterize.com/code-examples/kafka-mtls/all.yaml
Optional: check deployment status
Check that the client pods were deployed:
kubectl get pods -n otterize-tutorial-kafka-mtls
You should see:
NAME READY STATUS RESTARTS AGE
client-5d9646fc46-tw5hs 1/1 Running 0 21s
client-other-59647b448c-w4cpq 1/1 Running 0 21s
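You can also check that the mTLS credential secrets requested by the two deployments' annotations were generated (secret names taken from the client specs above):
kubectl get secrets -n otterize-tutorial-kafka-mtls client-credentials-secret client-other-credentials-secret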
Let's monitor, in separate terminal windows, both clients' attempts to call Kafka, so we can see the effects of our changes in real time.
- Open a new terminal window [client] and tail the client log:
kubectl logs -f --tail 1 -n otterize-tutorial-kafka-mtls deploy/client
This client should be able to communicate with the server:
Loading mTLS certificates
Connecting to Kafka
Creating a producer and a consumer for - mytopic
Sending messages
Sent message - Message 1 [sent by client]
Read message - Message 1 [sent by client]
Read message - Message 1 [sent by client-other]
Sent message - Message 2 [sent by client]
Read message - Message 2 [sent by client-other]
Read message - Message 2 [sent by client]
Read message - Message 3 [sent by client-other]
Sent message - Message 3 [sent by client]
Read message - Message 3 [sent by client]
- Open another terminal window [client-other] and tail the client-other log:
kubectl logs -f --tail 1 -n otterize-tutorial-kafka-mtls deploy/client-other
This other client should also be able to communicate with the server:
Loading mTLS certificates
Connecting to Kafka
Creating a producer for - mytopic
Sending messages
Sent message - Message 1 [sent by client-other]
Sent message - Message 2 [sent by client-other]
Sent message - Message 3 [sent by client-other]
If you've attached Otterize OSS to Otterize Cloud, you can now browse to your account at https://app.otterize.com and see the access graph for your cluster:
Why do I see five services?
In addition to the Kafka service and the two clients we deployed, the network mapper also picked up the calls between the intents operator and Kafka, and between Kafka and ZooKeeper, so those discovered intents are reflected in the access graph (in light blue).
The access graph also reflects an intent that has already been applied and was reported by the intents operator: the intent the intents operator created for itself to ensure it has access to Kafka. It is generated automatically when you apply the KafkaServerConfig: at that point the intents operator knows there is a Kafka service, and to make sure it can reach and configure it, it declares its intent to do so. Specifically, if network policies are in active enforcement and supported by your cluster, that intent generates a network policy between the intents operator and the Kafka service, so the intents operator doesn't get blocked itself.
Apply intents
- The client declares its intent to call the server with this intents.yaml file:
apiVersion: k8s.otterize.com/v1alpha2
kind: ClientIntents
metadata:
name: client
namespace: otterize-tutorial-kafka-mtls
spec:
service:
name: client
calls:
- name: kafka.kafka
type: kafka
topics:
- name: mytopic
operations: [ all ]
Client intents are the cornerstone of intent-based access control.
- Keep an eye on the logs being tailed in the [client-other] terminal while you apply this intents.yaml file in your main terminal window using:
kubectl apply -f https://docs.otterize.com/code-examples/kafka-mtls/client-intents.yaml
You should quickly see in the [client-other] that the other client cannot access the topic:
Sent message - Message 12 [sent by client-other] # <- before applying the intents file
Sent message - Message 13 [sent by client-other] # <- before applying the intents file
time="2022-10-06T09:44:53Z" level=info error="kafka server: # <- after applying the intents file
The client is not authorized to access this topic"
Loop exited
Meanwhile, in the [client] terminal you can see that the client can access the topic:
Sent message - Message 24 [sent by client]
Read message - Message 24 [sent by client]
Sent message - Message 25 [sent by client]
Read message - Message 25 [sent by client]
- Verify that an ACL for this client was configured on the Kafka broker:
kubectl logs -n kafka statefulset/kafka | grep "Processing Acl change" | grep mytopic | tail -n 1
You should see:
[2022-09-13 10:44:52,803] INFO Processing Acl change notification for
ResourcePattern(resourceType=TOPIC, name=mytopic, patternType=LITERAL),
versionedAcls : Set(User:ANONYMOUS has DENY permission for operations:
ALL from hosts: *, User:CN=client.otterize-tutorial-kafka-mtls,O=SPIRE,C=US has ALLOW permission
for operations: ALL from hosts: *), zkVersion : 6 (kafka.security.authorizer.AclAuthorizer)
If you've attached Otterize OSS to Otterize Cloud, go back to see the access graph in your browser. Click on the Kafka service, and click at the bottom of it to focus on it and show its details:
We can see what happened:
- Kafka topic-specific intents from [client] are declared (solid black inner line and Kafka icon).
- Calls from [client-other] are not declared (missing "white" inner line).
- Looking at the Kafka service, we can see that [client] has specific access configured (via Kafka ACLs) to perform all operations on the mytopic topic.
Since discovered intents from the network mapper don't specify what specific topics and operations clients are performing (or attempting to perform), the access graph cannot show information on what is being blocked vs allowed (red vs green). That feature is in development.
Also, the access graph shows information about the mTLS certificates (credentials) distributed to the various services, as long as Cloud-managed credentials are being used. Visibility for certificates distributed through an in-cluster SPIRE is in development.
What did we accomplish?
Controlling Kafka access no longer means touching ACLs, issuing, managing, and distributing certs, establishing trust, etc.
As we saw with pod-to-pod access, clients simply declare with their intents files the Kafka access they need, and define a place on their filesystem where they'll get the appropriate credentials (certs).
The next kubectl apply ensures that all the appropriate certs are issued and distributed, and that Kafka ACLs are configured to reflect precisely the intended topic-level access.
Expand to see what happened behind the scenes
One-time setups:
We configured the Helm chart for Kafka to:
- Allow the Otterize intents operator to be a Kafka super user (authenticated with a certificate).
- Use the SSL protocol for the Kafka listeners.
- Let Otterize know it should generate mTLS credentials in the Java Key Store and Java Trust Store formats, and store them as a Kubernetes secret.
- Use mTLS to authenticate clients, using this Kubernetes secret.
We configured Kafka itself to:
- Add the TLS certificates of the Otterize credentials operator.
- Set the default ACL for all topics to allow anonymous access.
Per-client setups:
We configured each of our clients to:
- Let Otterize know it should generate mTLS credentials for that client.
- Mount the Kubernetes secret in a local volume.
This already enables mTLS authentication between both clients and Kafka.
Then we applied intents:
- We only declared that the client pod (not the client-other pod) needed to access the mytopic topic.
This allowed the client pod its access and protected mytopic from any unintended access, such as from client-other.
Try to create an intents file yourself for client-other, and apply it to allow this other client to access the topic.
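As a starting point, such an intents file might look like the following sketch, modeled on the client intents above; applying it should let client-other produce to the topic again (you could also grant narrower operations than all):
apiVersion: k8s.otterize.com/v1alpha2
kind: ClientIntents
metadata:
  name: client-other
  namespace: otterize-tutorial-kafka-mtls
spec:
  service:
    name: client-other
  calls:
    - name: kafka.kafka
      type: kafka
      topics:
        - name: mytopic
          operations: [ all ]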
What's next
- Follow a more visual tutorial for securing Kafka with IBAC in a demo ecommerce application.
- Learn how to easily secure pod-to-pod access with IBAC using Kubernetes network policies, in a hands-on tutorial or a more visual tutorial.
Teardown
Take care to remove the intents before removing the KafkaServerConfig or the Kafka broker: the operator cannot remove the ACLs for those intents once you've made it forget about the Kafka broker, or once it can no longer reach the broker. If it's unable to remove the ACLs for the intents, the operator will prevent the intents from being deleted until it can.
To remove the deployed examples run:
# run this first:
kubectl delete -f https://docs.otterize.com/code-examples/kafka-mtls/client-intents.yaml
# then the rest:
kubectl delete -f https://docs.otterize.com/code-examples/kafka-mtls/all.yaml
kubectl delete -f https://docs.otterize.com/code-examples/kafka-mtls/kafkaserverconfig.yaml
helm uninstall kafka -n kafka