Kafka 4

Collect Kafka 4 Metrics

Configuration

Prerequisites

Download JMX Exporter

Download URL: https://github.com/prometheus/jmx_exporter/releases/tag/1.3.0
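The startup parameters later in this guide reference the agent jar at /opt/jmx_exporter/jmx_prometheus_javaagent.jar. A minimal download sketch, assuming the 1.3.0 release asset follows the project's usual jmx_prometheus_javaagent-<version>.jar naming (verify the exact asset name on the release page):

# Download the JMX Exporter java agent and place it where the startup scripts expect it
mkdir -p /opt/jmx_exporter
curl -L -o /opt/jmx_exporter/jmx_prometheus_javaagent.jar \
  https://github.com/prometheus/jmx_exporter/releases/download/1.3.0/jmx_prometheus_javaagent-1.3.0.jar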

Configure JMX Script and Startup Parameters

Note: To collect Producer, Consumer, Streams, and Connect metrics, each component must run as a separate process with its own JMX Exporter agent. When starting each process, use the corresponding yaml file and startup script, as shown below.

KRaft Metrics
  • Create KRaft Metrics configuration file kafka.yml
# ------------------------------------------------------------
# Kafka 4 Prometheus JMX Exporter Configuration
# ------------------------------------------------------------
lowercaseOutputName: false
lowercaseOutputLabelNames: true
cacheRules: true
rules:

# 1. Broker / Topic / Partition Metrics
  - pattern: kafka.server<type=BrokerTopicMetrics, name=(BytesInPerSec|BytesOutPerSec|MessagesInPerSec|TotalFetchRequestsPerSec|ProduceRequestsPerSec|FailedProduceRequestsPerSec|TotalProduceRequestsPerSec|ReassignmentBytesInPerSec|ReassignmentBytesOutPerSec|ProduceMessageConversionsPerSec|FetchMessageConversionsPerSec)(?:, topic=([-\.\w]*))?><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
    name: kafka_server_broker_topic_metrics_$1
    type: GAUGE
    labels:
      topic: "$2"

# 2. Request / Network Metrics
  - pattern: kafka.network<type=RequestMetrics, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
    name: kafka_network_request_metrics_$1
    type: GAUGE

# 3. Socket Server Metrics
  - pattern: kafka.network<type=SocketServer, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate|Value)
    name: kafka_network_socket_server_metrics_$1
    type: GAUGE

# 4. Log / Segment / Cleaner Metrics
  - pattern: kafka.log<type=LogFlushStats, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
    name: kafka_log_$1_$2
    type: GAUGE

# 5. Controller (KRaft) Metrics
  - pattern: kafka.controller<type=KafkaController, name=(.+)><>(Count|Value)
    name: kafka_controller_$1
    type: GAUGE

# 6. Group / Coordinator Metrics
  - pattern: kafka.coordinator.group<type=GroupMetadataManager, name=(.+)><>(Count|Value)
    name: kafka_coordinator_group_metadata_manager_$1
    type: GAUGE

# 7. KRaft Specific Metrics
  - pattern: kafka.controller<type=KafkaController, name=(LeaderElectionSuccessRate|LeaderElectionLatencyMs)><>(Count|Value)
    name: kafka_controller_$1
    type: GAUGE

# 8. New Generation Consumer Rebalance Protocol Metrics
  - pattern: kafka.coordinator.group<type=GroupMetadataManager, name=(RebalanceTimeMs|RebalanceFrequency)><>(Count|Value)
    name: kafka_coordinator_group_metadata_manager_$1
    type: GAUGE

# 9. Queue Metrics
  - pattern: kafka.server<type=Queue, name=(QueueSize|QueueConsumerRate)><>(Count|Value)
    name: kafka_server_queue_$1
    type: GAUGE

# 10. Client Metrics
  - pattern: kafka.network<type=RequestMetrics, name=(ClientConnections|ClientRequestRate|ClientResponseTime)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
    name: kafka_network_request_metrics_$1
    type: GAUGE

# 11. Log Flush Rate and Time
  - pattern: kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
    name: kafka_log_log_flush_rate_and_time_ms
    type: GAUGE
  • Startup Parameters
export KAFKA_HEAP_OPTS="-Xms1g -Xmx1g"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=9999 \
  -Dcom.sun.management.jmxremote.rmi.port=9999 \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Djava.rmi.server.hostname=127.0.0.1"
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7071:/opt/jmx_exporter/kafka.yml"

/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties
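Before checking the exporter endpoint, you can confirm the broker is up and the KRaft quorum is healthy with the metadata quorum tool that ships with Kafka; a sketch assuming the broker listens on localhost:9092:

# A healthy cluster reports a LeaderId and the current voter set
/opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status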
Producer Metrics
  • Create Producer Metrics configuration file producer.yml
---
lowercaseOutputName: true
rules:
  # New: producer-node-metrics
  - pattern: kafka\.producer<type=producer-node-metrics, client-id=([^,]+), node-id=([^>]+)><>([^:]+)
    name: kafka_producer_node_$3
    labels:
      client_id: "$1"
      node_id: "$2"
    type: GAUGE

  - pattern: 'kafka\.producer<type=producer-metrics, client-id=([^>]+)><>([^:,\s]+).*'
    name: 'kafka_producer_metrics_$2'
    labels:
      client_id: "$1"
    type: GAUGE

  # Collect all Selector metrics (New in Kafka 4.0)
  - pattern: 'kafka\.(?:(producer|consumer|connect))<type=(producer|consumer|connect)-metrics, client-id=([^>]+)><>(connection-.+|io-.+|network-.+|select-.+|send-.+|receive-.+|reauthentication-.+)'
    name: 'kafka_${1}_${4}'
    labels:
      client_id: '$3'
    type: GAUGE
  • Startup Parameters
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7072:/opt/jmx_exporter/producer.yml"

/opt/kafka/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic xxxx
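The console producer reads records from stdin. To generate some traffic non-interactively so the producer metrics on port 7072 have data, a sketch along these lines can be used (the topic name xxxx is the same placeholder as above):

# Create the test topic if needed, then pipe 100 records into it
/opt/kafka/kafka/bin/kafka-topics.sh --create --if-not-exists \
  --bootstrap-server localhost:9092 --topic xxxx --partitions 3 --replication-factor 1
seq 1 100 | /opt/kafka/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 --topic xxxx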
Consumer Metrics
  • Create Consumer Metrics configuration file consumer.yml
lowercaseOutputName: true
rules:
  # consumer-coordinator-metrics
  - pattern: 'kafka\.consumer<type=consumer-coordinator-metrics, client-id=([^>]+)><>([^:,\s]+).*'
    name: 'kafka_consumer_coordinator_metrics_$2'
    labels:
      client_id: "$1"
    type: GAUGE

  - pattern: 'kafka\.consumer<type=consumer-metrics, client-id=([^>]+)><>([^:,\s]+).*'
    name: 'kafka_consumer_metrics_$2'
    labels:
      client_id: "$1"
    type: GAUGE
  • Startup Parameters
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7073:/opt/jmx_exporter/consumer.yml"

/opt/kafka/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic xxxx
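For a bounded, non-interactive check that the consumer metrics on port 7073 are being emitted, the console consumer can read a fixed number of records and exit:

# Consume 100 records from the beginning, then exit
/opt/kafka/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic xxxx \
  --from-beginning \
  --max-messages 100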
Streams Metrics
  • Create Streams Metrics configuration file stream.yml
lowercaseOutputName: true
lowercaseOutputLabelNames: true

rules:
  # Kafka Streams application metrics - Remove special characters
  - pattern: 'kafka.streams<type=stream-metrics, client-id=(.+)><>([a-zA-Z0-9\-]+)$'
    name: kafka_streams_$2
    labels:
      client_id: "$1"

  # Handle attribute names with special characters
  - pattern: 'kafka.streams<type=stream-metrics, client-id=(.+)><>([a-zA-Z0-9\-]+):(.+)$'
    name: kafka_streams_$2_$3
    labels:
      client_id: "$1"

  # Processor Node metrics
  - pattern: 'kafka.streams<type=stream-processor-node-metrics, client-id=(.+), task-id=(.+), processor-node-id=(.+)><>(.+)'
    name: kafka_streams_processor_$4
    labels:
      client_id: "$1"
      task_id: "$2"
      processor_node_id: "$3"

  # Task metrics
  - pattern: 'kafka.streams<type=stream-task-metrics, client-id=(.+), task-id=(.+)><>(.+)'
    name: kafka_streams_task_$3
    labels:
      client_id: "$1"
      task_id: "$2"

  # Thread metrics
  - pattern: 'kafka.streams<type=stream-thread-metrics, client-id=(.+), thread-id=(.+)><>(.+)'
    name: kafka_streams_thread_$3
    labels:
      client_id: "$1"
      thread_id: "$2"

  # JVM metrics
  - pattern: 'java.lang<type=Memory><>(.+)'
    name: jvm_memory_$1

  - pattern: 'java.lang<type=GarbageCollector, name=(.+)><>(\w+)'
    name: jvm_gc_$2
    labels:
      gc: "$1"

  # Thread pool metrics
  - pattern: 'java.lang<type=Threading><>(.+)'
    name: jvm_threads_$1

  # Default rule
  - pattern: '(.*)'
  • Startup Parameters
export KAFKA_HEAP_OPTS="-Xms512m -Xmx512m"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=9996 \
  -Dcom.sun.management.jmxremote.rmi.port=9996 \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Djava.rmi.server.hostname=127.0.0.1"
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7075:/opt/jmx_exporter/stream.yml"

java $KAFKA_HEAP_OPTS $KAFKA_JMX_OPTS $KAFKA_OPTS $EXTRA_ARGS -cp "libs/*:my-streams.jar" WordCountDemo
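my-streams.jar and WordCountDemo stand in for your own Streams application. If you only want to validate the exporter wiring, the WordCountDemo example bundled with the Kafka distribution can be launched instead; a sketch assuming Kafka is installed under /opt/kafka and the kafka-streams-examples jar is present under libs/ (kafka-run-class.sh picks up KAFKA_HEAP_OPTS, KAFKA_JMX_OPTS, and KAFKA_OPTS):

# Reads from the streams-plaintext-input topic and writes word counts to streams-wordcount-output
/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo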
Connect Metrics
  • Create Connect Metrics configuration file connect.yml
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
  # 1) connect-worker-metrics (Global)
  - pattern: 'kafka\.connect<type=connect-worker-metrics><>([^:]+)'
    name: 'kafka_connect_worker_$1'
    type: GAUGE

  # 2) connect-worker-metrics,connector=xxx
  - pattern: 'kafka\.connect<type=connect-worker-metrics, connector=([^>]+)><>([^:]+)'
    name: 'kafka_connect_worker_$2'
    labels:
      connector: "$1"
    type: GAUGE

  # 3) connect-worker-rebalance-metrics
  - pattern: 'kafka\.connect<type=connect-worker-rebalance-metrics><>([^:]+)'
    name: 'kafka_connect_worker_rebalance_$1'
    type: GAUGE

  # 4) connector-task-metrics
  - pattern: 'kafka\.connect<type=connector-task-metrics, connector=([^>]+), task=([^>]+)><>([^:]+)'
    name: 'kafka_connect_task_$3'
    labels:
      connector: "$1"
      task_id: "$2"
    type: GAUGE

  # 5) sink-task-metrics
  - pattern: 'kafka\.connect<type=sink-task-metrics, connector=([^>]+), task=([^>]+)><>([^:]+)'
    name: 'kafka_connect_sink_task_$3'
    labels:
      connector: "$1"
      task_id: "$2"
  • Startup Parameters
export KAFKA_HEAP_OPTS="-Xms512m -Xmx512m"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Dcom.sun.management.jmxremote.port=9995 \
  -Dcom.sun.management.jmxremote.rmi.port=9995 \
  -Djava.rmi.server.hostname=127.0.0.1"
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7074:/opt/jmx_exporter/connect.yml"

# Start Kafka Connect
/opt/kafka/kafka/bin/connect-distributed.sh /opt/kafka/kafka/config/connect-distributed.properties
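Connector- and task-level metrics only appear once at least one connector is running. As a sketch, a FileStream source connector can be registered through the Connect REST API; the connector name, file path, and topic below are placeholders, the REST listener is assumed to be on the default port 8083, and the FileStream connector plugin must be available on the worker's plugin.path:

# Register a simple source connector so connector/task metrics are emitted
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
        "name": "file-source-demo",
        "config": {
          "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
          "file": "/tmp/connect-test.txt",
          "topic": "xxxx",
          "tasks.max": "1"
        }
      }'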

After successful startup, you can view the collected metrics via curl http://IP:Port/metrics.
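For example, a quick spot check of the five agent ports configured above from the local host:

# 7071 broker (KRaft), 7072 producer, 7073 consumer, 7074 connect, 7075 streams
for port in 7071 7072 7073 7074 7075; do
  echo "== port ${port} =="
  curl -s "http://127.0.0.1:${port}/metrics" | grep -c '^kafka_'
done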

Configure DataKit

  • Navigate to the conf.d/prom directory under the DataKit installation directory, copy prom.conf.sample and rename it to kafka.conf

cp prom.conf.sample kafka.conf

  • Adjust kafka.conf
[[inputs.prom]]
  ## Exporter URLs.
  urls = ["http://127.0.0.1:7071/metrics","http://127.0.0.1:7072/metrics","http://127.0.0.1:7073/metrics","http://127.0.0.1:7074/metrics","http://127.0.0.1:7075/metrics"]

  ## Collector alias.
  source = "kafka"

  ## Takes priority over the 'measurement_name' configuration.
  [[inputs.prom.measurements]]
    prefix = "kafka_controller_"
    name = "kafka_controller"

  [[inputs.prom.measurements]]
    prefix = "kafka_network_"
    name = "kafka_network"

  [[inputs.prom.measurements]]
    prefix = "kafka_log_"
    name = "kafka_log"

  [[inputs.prom.measurements]]
    prefix = "kafka_server_"
    name = "kafka_server"

  [[inputs.prom.measurements]]
    prefix = "kafka_connect_"
    name = "kafka_connect"

  [[inputs.prom.measurements]]
    prefix = "kafka_streams_"
    name = "kafka_stream"
  • Restart DataKit

Execute the following command

datakit service -R

Metrics

Below are some Kafka 4 metrics descriptions. For more metrics, refer to Kafka Metrics Details

kafka_server Measurement

Metric Name Description Unit
Fetch_queue_size Fetch queue size count
Produce_queue_size Producer queue size count
Request_queue_size Request queue size count
broker_topic_metrics_BytesInPerSec Client byte input rate bytes/s
broker_topic_metrics_BytesOutPerSec Client byte output rate bytes/s
broker_topic_metrics_FailedProduceRequestsPerSec Produce request failure rate count/s
broker_topic_metrics_FetchMessageConversionsPerSec Fetch message conversion rate count/s
broker_topic_metrics_MessagesInPerSec Incoming message rate count/s
broker_topic_metrics_ProduceMessageConversionsPerSec Producer message conversion rate count/s
broker_topic_metrics_TotalFetchRequestsPerSec Fetch request (from client or follower) rate count/s
broker_topic_metrics_TotalProduceRequestsPerSec Producer request rate count/s
socket_server_metrics_connection_count SocketServer connection count count
socket_server_metrics_connection_close_total SocketServer closed connection count count
socket_server_metrics_incoming_byte_rate SocketServer input byte rate bytes/s

kafka_network Measurement

Metric Name Description Unit
request_metrics_RequestBytes_request_AddOffsetsToTxn AddOffsetsToTxn request size bytes
request_metrics_RequestBytes_request_Fetch Fetch request size bytes
request_metrics_RequestBytes_request_FetchConsumer FetchConsumer request size bytes
request_metrics_RequestBytes_request_FetchFollower FetchFollower request size bytes
request_metrics_TotalTimeMs_request_CreateTopics CreateTopics request total time ms
request_metrics_TotalTimeMs_request_CreatePartitions CreatePartitions request total time ms
request_metrics_RequestQueueTimeMs_request_CreateTopics CreateTopics request queue wait time ms
request_metrics_RequestQueueTimeMs_request_CreatePartitions CreatePartitions request queue wait time ms
request_metrics_RequestQueueTimeMs_request_Produce Produce request queue wait time ms
request_metrics_ResponseSendTimeMs_request_CreateTopics CreateTopics response send time ms
request_metrics_ResponseSendTimeMs_request_CreatePartitions CreatePartitions response send time ms

kafka_controller Measurement

Metric Name Description Unit
ActiveBrokerCount Active Broker count count
ActiveControllerCount Active controller count count
GlobalPartitionCount Partition count count
GlobalTopicCount Topic count count
OfflinePartitionsCount Offline partition count count
PreferredReplicaImbalanceCount Count of partitions whose leader is not the preferred replica count
TimedOutBrokerHeartbeatCount Broker heartbeat timeout count count
LastAppliedRecordLagMs Last applied record lag time ms
LastAppliedRecordOffset Last applied record offset -
MetadataErrorCount Metadata error count count
NewActiveControllersCount New controller election count count

kafka_producer Measurement

Metric Name Description Unit
producer_metrics_batch_split_rate Batch split rate count/s
producer_metrics_buffer_available_bytes Unused buffer memory total bytes
producer_metrics_buffer_exhausted_rate Average number of record sends dropped per second due to buffer exhaustion count/s
producer_metrics_buffer_total_bytes Buffer total byte size bytes
producer_metrics_bufferpool_wait_ratio Buffer pool wait ratio %
producer_metrics_bufferpool_wait_time_ns_total Buffer pool wait time ns
producer_metrics_connection_close_rate Connection close rate count/s
producer_metrics_connection_count Connection count count
producer_metrics_flush_time_ns_total Flush total time ns
producer_metrics_incoming_byte_rate Input byte rate bytes/s
producer_metrics_outgoing_byte_rate Output byte rate bytes/s
producer_metrics_request_rate Request rate count/s
producer_metrics_request_size_avg Request size bytes

kafka_consumer Measurement

Metric Name Description Unit
consumer_coordinator_metrics_failed_rebalance_total Rebalance failure count count
consumer_coordinator_metrics_heartbeat_rate Average heartbeat count per second count/s
consumer_coordinator_metrics_heartbeat_response_time_max Maximum heartbeat response time ms
consumer_coordinator_metrics_join_rate Group join rate per second count/s
consumer_coordinator_metrics_join_total Group join total count
consumer_coordinator_metrics_last_rebalance_seconds_ago Seconds since last rebalance event s
consumer_coordinator_metrics_rebalance_latency_total Rebalance latency total ms
consumer_fetch_manager_metrics_bytes_consumed_rate Bytes consumed per second bytes/s
consumer_fetch_manager_metrics_fetch_latency_avg Fetch request latency ms
consumer_metrics_connection_count Connection count count
consumer_metrics_connection_close_rate Connection close rate count/s
consumer_metrics_incoming_byte_rate Input byte rate bytes/s
consumer_metrics_outgoing_byte_rate Output byte rate bytes/s
consumer_metrics_select_rate Select rate count/s
consumer_metrics_last_poll_seconds_ago Seconds since the last poll invocation s

kafka_connect Measurement

Metric Name Description Unit
worker_connector_count Connector count count
worker_task_startup_attempts_total Task startup attempt count count
worker_connector_startup_attempts_total Connector startup attempt count count
worker_task_startup_failure_total Task startup failure count count
worker_connector_startup_failure_percentage Connector startup failure percentage %
worker_rebalance_completed_rebalances_total Rebalance completion total count
worker_task_startup_failure_percentage Task startup failure percentage %
worker_rebalance_time_since_last_rebalance_ms Time since last rebalance ms

kafka_stream Measurement

Metric Name Description Unit
stream_thread_metrics_thread_start_time Thread start time (Unix timestamp) ms
stream_thread_metrics_task_created_total Task creation total count
stream_state_metrics_block_cache_capacity Block cache size bytes
stream_state_metrics_all_rate All operation rate count/s
stream_state_metrics_block_cache_usage Block cache usage %
stream_state_metrics_bytes_read_compaction_rate Compaction bytes read rate bytes/s
stream_state_metrics_bytes_written_compaction_rate Compaction bytes written rate bytes/s
stream_state_metrics_block_cache_index_hit_ratio Block cache index hit ratio %
stream_state_metrics_block_cache_data_hit_ratio Block cache data hit ratio %
stream_state_metrics_block_cache_filter_hit_ratio Block cache filter hit ratio %
stream_state_metrics_bytes_written_rate Byte write rate bytes/s
stream_state_metrics_bytes_read_rate Byte read rate bytes/s
stream_task_metrics_process_rate Records processed per second count/s
stream_task_metrics_enforced_processing_rate Enforced processing operations per second count/s
stream_task_metrics_active_process_ratio Active process ratio %
stream_thread_metrics_commit_rate Commit rate count/s
stream_thread_metrics_poll_latency_avg Poll latency time ms
stream_thread_metrics_poll_rate Poll rate count/s
stream_thread_metrics_blocked_time_ns_total Blocked time ns
stream_topic_metrics_bytes_consumed_total Consumed bytes bytes
stream_topic_metrics_bytes_produced_total Produced bytes bytes
stream_topic_metrics_records_consumed_total Total records consumed by source processor nodes count
stream_topic_metrics_records_produced_total Total records produced by sink processor nodes count