Kafka Watcher

To deploy the network mapper with the Kafka watcher component, do the following:

helm repo add otterize https://helm.otterize.com
helm repo update
helm install network-mapper otterize/network-mapper -n otterize-system --create-namespace --set kafkawatcher.enable=true

Make sure to also pass --set kafkawatcher.kafkaServers={...} with the list of Kafka servers whose logs the Kafka watcher should watch. Specify each server in the list as pod.namespace.
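As a minimal sketch of how the server list could be passed, the snippet below builds the value for the kafkawatcher.kafkaServers parameter and prints the resulting install command for review instead of running it. The helper name build_kafka_servers_flag and the server names kafka-0.kafka and kafka-1.kafka are hypothetical; substitute the Kafka servers from your own cluster.

```shell
#!/bin/sh
# Build the Helm --set value for kafkawatcher.kafkaServers from a list of
# servers passed as arguments (hypothetical helper, not part of the chart).
build_kafka_servers_flag() (
  IFS=,   # "$*" joins the arguments with the first character of IFS
  printf 'kafkawatcher.kafkaServers={%s}\n' "$*"
)

# Hypothetical server names; replace with the Kafka servers in your cluster.
servers_flag="$(build_kafka_servers_flag kafka-0.kafka kafka-1.kafka)"

# Print the full command for review instead of running it.
printf '%s\n' "helm install network-mapper otterize/network-mapper -n otterize-system --create-namespace --set kafkawatcher.enable=true --set '${servers_flag}'"
```

The value is single-quoted in the printed command because shells such as bash would otherwise brace-expand {a,b} into separate words before Helm sees it.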

Kafka watcher parameters

| Key | Description | Default |
| --- | --- | --- |
| kafkawatcher.enable | Enable Kafka watcher deployment (beta). | false |
| kafkawatcher.image.repository | Kafka watcher image repository. | otterize |
| kafkawatcher.image.image | Kafka watcher image. | network-mapper-kafka-watcher |
| kafkawatcher.image.tag | Kafka watcher image tag. | latest |
| kafkawatcher.pullPolicy | Kafka watcher pull policy. | (none) |
| kafkawatcher.pullSecrets | Kafka watcher pull secrets. | (none) |
| kafkawatcher.resources | Resources override. | (none) |
| kafkawatcher.kafkaServers | Kafka servers to watch, specified as pod.namespace items. | (none) |

Enabling debug logs in Kafka servers

The Kafka watcher periodically examines the logs of the Kafka servers listed in its configuration, parses them, and deduces topic-level access to Kafka from pods in the Kubernetes cluster. For the Kafka watcher to detect topic-level access correctly, each Kafka server's ACL authorizer logger must log at debug level and to stdout.

Install Kafka via Helm with debug logs preconfigured

For the Bitnami Kafka Helm chart used in other Kafka tutorials, we can add the following configuration to the chart's values.yaml to start Kafka with its ACL authorizer logging to stdout at debug level:

Kafka debug logs values.yaml
log4j: |
  # Licensed to the Apache Software Foundation (ASF) under one or more
  # contributor license agreements. See the NOTICE file distributed with
  # this work for additional information regarding copyright ownership.
  # The ASF licenses this file to You under the Apache License, Version 2.0
  # (the "License"); you may not use this file except in compliance with
  # the License. You may obtain a copy of the License at
  #
  # http://www.apache.org/licenses/LICENSE-2.0
  #
  # Unless required by applicable law or agreed to in writing, software
  # distributed under the License is distributed on an "AS IS" BASIS,
  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  # See the License for the specific language governing permissions and
  # limitations under the License.

  # Unspecified loggers and loggers with additivity=true output to server.log and stdout
  # Note that INFO only applies to unspecified loggers, the log level of the child logger is used otherwise
  log4j.rootLogger=INFO, stdout, kafkaAppender

  log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

  log4j.appender.kafkaAppender=org.apache.log4j.ConsoleAppender
  log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
  log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

  log4j.appender.stateChangeAppender=org.apache.log4j.ConsoleAppender
  log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
  log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

  log4j.appender.requestAppender=org.apache.log4j.ConsoleAppender
  log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
  log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

  log4j.appender.cleanerAppender=org.apache.log4j.ConsoleAppender
  log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
  log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

  log4j.appender.controllerAppender=org.apache.log4j.ConsoleAppender
  log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
  log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

  log4j.appender.authorizerAppender=org.apache.log4j.ConsoleAppender
  log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
  log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n


  # Change the line below to adjust ZK client logging
  log4j.logger.org.apache.zookeeper=INFO

  # Change the two lines below to adjust the general broker logging level (output to server.log and stdout)
  log4j.logger.kafka=INFO, stdout
  log4j.logger.org.apache.kafka=INFO

  # Change to DEBUG or TRACE to enable request logging
  log4j.logger.kafka.request.logger=WARN, requestAppender
  log4j.additivity.kafka.request.logger=false

  # Uncomment the lines below and change log4j.logger.kafka.network.RequestChannel$ to TRACE for additional output
  # related to the handling of requests
  #log4j.logger.kafka.network.Processor=TRACE, requestAppender
  #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
  #log4j.additivity.kafka.server.KafkaApis=false
  log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
  log4j.additivity.kafka.network.RequestChannel$=false

  # Change the line below to adjust KRaft mode controller logging
  log4j.logger.org.apache.kafka.controller=INFO, controllerAppender
  log4j.additivity.org.apache.kafka.controller=false

  # Change the line below to adjust ZK mode controller logging
  log4j.logger.kafka.controller=TRACE, controllerAppender
  log4j.additivity.kafka.controller=false

  log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
  log4j.additivity.kafka.log.LogCleaner=false

  log4j.logger.state.change.logger=INFO, stateChangeAppender
  log4j.additivity.state.change.logger=false

  # Access denials are logged at INFO level, change to DEBUG to also log allowed accesses
  log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender
  log4j.additivity.kafka.authorizer.logger=false

Notice the log4j.logger.kafka.authorizer.logger=DEBUG line that sets the ACL authorizer logger to debug level.
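Before installing the chart, it can help to confirm that the values file really sets the authorizer logger to DEBUG. The following is a minimal sketch of such a check; the helper name authorizer_debug_enabled and the VALUES_FILE variable are hypothetical, and the file is assumed to be named values.yaml in the current directory unless VALUES_FILE says otherwise.

```shell
#!/bin/sh
# Succeed only if the given file contains an uncommented line that sets
# log4j.logger.kafka.authorizer.logger to DEBUG (hypothetical helper).
authorizer_debug_enabled() {
  grep -Eq '^[[:space:]]*log4j\.logger\.kafka\.authorizer\.logger=DEBUG([,[:space:]]|$)' "$1"
}

# Assumed location of the chart values file; override with VALUES_FILE.
values_file="${VALUES_FILE:-values.yaml}"

if [ -f "$values_file" ] && authorizer_debug_enabled "$values_file"; then
  echo "authorizer logger is set to DEBUG in $values_file"
else
  echo "authorizer logger is not set to DEBUG in $values_file (or the file is missing)"
fi
```

The pattern ignores commented-out lines, so a line starting with # does not count as enabling debug logging.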

Configure an already running Kafka server

Alternatively, we can configure an already-running Kafka server by setting its ACL authorizer logger level to debug. The Kafka server must log to stdout so that the Kafka watcher can examine its logs.

First, deploy an interactive Kafka client:

kubectl apply -f https://docs.otterize.com/code-examples/ibac-for-kafka/client-deployment-no-creds.yaml

Open a shell in the interactive Kafka client pod (replace the pod name with the name from your cluster):

kubectl exec -it -n ibac-for-kafka interactive-869fc7b89b-rgmfm -- /bin/bash

Once connected, use the interactive shell to configure the ACL authorizer's logging level:

$ cd /opt/bitnami/kafka/
# query existing logging settings. Replace "kafka.kafka:9092" with the relevant service name, namespace and port.
$ bin/kafka-configs.sh --bootstrap-server kafka.kafka:9092 --describe --all --entity-type broker-loggers --entity-name 0 | grep authorizer
kafka.security.authorizer.AclAuthorizer=INFO sensitive=false synonyms={}
kafka.authorizer.logger=INFO sensitive=false synonyms={}
# enable authorizer debug logging
$ bin/kafka-configs.sh --bootstrap-server kafka.kafka:9092 --alter --add-config "kafka.authorizer.logger=DEBUG" --entity-type broker-loggers --entity-name 0
Completed updating config for broker-logger 0.
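The command above targets a single broker through --entity-name 0. For a cluster with several brokers, the same change would presumably need to be repeated per broker ID. The sketch below only prints the commands for review rather than running them; the helper name authorizer_debug_commands and the broker IDs 0, 1 and 2 are hypothetical, so adjust them and the bootstrap address to match your cluster.

```shell
#!/bin/sh
# Print one kafka-configs.sh command per broker ID, each setting the
# authorizer logger to DEBUG (hypothetical helper; nothing is executed).
authorizer_debug_commands() {
  bootstrap="$1"
  shift
  for broker_id in "$@"; do
    printf '%s\n' "bin/kafka-configs.sh --bootstrap-server ${bootstrap} --alter --add-config \"kafka.authorizer.logger=DEBUG\" --entity-type broker-loggers --entity-name ${broker_id}"
  done
}

# Hypothetical broker IDs; replace with the IDs of your brokers.
authorizer_debug_commands kafka.kafka:9092 0 1 2
```

Each printed line can be pasted into the interactive client shell from the Kafka installation directory.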

Check your Kafka server logs. You should now see records from the ACL authorizer indicating allowed and denied access (assuming you have clients producing to or consuming from topics):

[2023-03-22 16:06:22,746] DEBUG operation = READ on resource = ResourcePattern(resourceType=TOPIC, name=mytopic, patternType=LITERAL) from host = 10.244.0.12 is ALLOW based on acl = User:* has ALLOW permission for operations: ALL from hosts: * (kafka.authorizer.logger)
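To illustrate what can be read out of such a record, here is a minimal sketch that extracts the operation, topic name, client host and decision from a line in the format shown above. It is only an illustration and not the Kafka watcher's actual parsing logic; the helper name parse_authorizer_line is hypothetical, and the pattern assumes every record follows the layout of the sample line.

```shell
#!/bin/sh
# Extract "operation topic host decision" from authorizer log lines on stdin
# (hypothetical helper; assumes the record layout of the sample line).
parse_authorizer_line() {
  sed -nE 's/.*operation = ([A-Z_]+) on resource = ResourcePattern\(resourceType=TOPIC, name=([^,]+), patternType=[A-Z]+\) from host = ([^ ]+) is ([A-Z]+) based on.*/\1 \2 \3 \4/p'
}

sample='[2023-03-22 16:06:22,746] DEBUG operation = READ on resource = ResourcePattern(resourceType=TOPIC, name=mytopic, patternType=LITERAL) from host = 10.244.0.12 is ALLOW based on acl = User:* has ALLOW permission for operations: ALL from hosts: * (kafka.authorizer.logger)'

printf '%s\n' "$sample" | parse_authorizer_line
# prints: READ mytopic 10.244.0.12 ALLOW
```

Lines that do not match the pattern are silently skipped, so the function can be fed a whole log stream.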