Kafka 4
Collect Kafka 4 Metrics
Configuration¶
Prerequisites¶
Download JMX Exporter¶
Download URL: https://github.com/prometheus/jmx_exporter/releases/tag/1.3.0
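The download step can be scripted. The following is a minimal sketch, not an official installer: it assumes the release asset is named jmx_prometheus_javaagent-<version>.jar and that the agent should be stored under the version-less file name used by the startup parameters later in this guide. The function names are hypothetical.

```shell
# Sketch: fetch the JMX Exporter Java agent.
# Assumption: the release asset is named jmx_prometheus_javaagent-<version>.jar.

JMX_EXPORTER_VERSION="1.3.0"
JMX_EXPORTER_DIR="/opt/jmx_exporter"   # directory referenced by the startup parameters in this guide

# Print the assumed download URL for the version given as $1.
jmx_agent_url() {
  printf 'https://github.com/prometheus/jmx_exporter/releases/download/%s/jmx_prometheus_javaagent-%s.jar\n' \
    "$1" "$1"
}

# Download the agent and save it as jmx_prometheus_javaagent.jar.
download_jmx_agent() {
  mkdir -p "$JMX_EXPORTER_DIR"
  curl -fL -o "$JMX_EXPORTER_DIR/jmx_prometheus_javaagent.jar" \
    "$(jmx_agent_url "$JMX_EXPORTER_VERSION")"
}
```

Calling download_jmx_agent performs the download. If the asset name differs for your release, adjust jmx_agent_url accordingly.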
Configure JMX Script and Startup Parameters¶
Note: Producer, Consumer, Streams, and Connect metrics are each collected from a separate process. When starting each process, use its own YAML configuration file and startup script, as shown below.
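Because every process needs its own exporter port and YAML file, it can help to build the agent option from those two values. This is a sketch under assumptions: the helper name jmx_agent_opts is hypothetical, and it relies on Kafka's launcher scripts reading the KAFKA_OPTS environment variable.

```shell
# Sketch: build the -javaagent option for one process.
JMX_AGENT_JAR="/opt/jmx_exporter/jmx_prometheus_javaagent.jar"

# Usage: jmx_agent_opts <exporter-port> <config-file>
jmx_agent_opts() {
  printf '%s%s=%s:%s\n' '-javaagent:' "$JMX_AGENT_JAR" "$1" "$2"
}

# Example: options for the console producer (port 7072, producer.yml).
KAFKA_OPTS="$(jmx_agent_opts 7072 /opt/jmx_exporter/producer.yml)"
export KAFKA_OPTS
```

Each process must use a distinct port. This guide uses 7071 for the broker, 7072 for the producer, 7073 for the consumer, 7074 for Connect, and 7075 for Streams.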
KRaft Metrics¶
- Create KRaft Metrics configuration file
kafka.yml
# ------------------------------------------------------------
# Kafka 4 Prometheus JMX Exporter Configuration
# ------------------------------------------------------------
lowercaseOutputName: false
lowercaseOutputLabelNames: true
cacheRules: true
rules:
  # 1. Broker / Topic / Partition Metrics
  - pattern: kafka.server<type=BrokerTopicMetrics, name=(BytesInPerSec|BytesOutPerSec|MessagesInPerSec|TotalFetchRequestsPerSec|ProduceRequestsPerSec|FailedProduceRequestsPerSec|TotalProduceRequestsPerSec|ReassignmentBytesInPerSec|ReassignmentBytesOutPerSec|ProduceMessageConversionsPerSec|FetchMessageConversionsPerSec)(?:, topic=([-\.\w]*))?><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
    name: kafka_server_broker_topic_metrics_$1
    type: GAUGE
    labels:
      topic: "$2"
  # 2. Request / Network Metrics
  - pattern: kafka.network<type=RequestMetrics, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
    name: kafka_network_request_metrics_$1
    type: GAUGE
  # 3. Socket Server Metrics
  - pattern: kafka.network<type=SocketServer, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate|Value)
    name: kafka_network_socket_server_metrics_$1
    type: GAUGE
  # 4. Log / Segment / Cleaner Metrics
  - pattern: kafka.log<type=LogFlushStats, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
    name: kafka_log_$1_$2
    type: GAUGE
  # 5. Controller (KRaft) Metrics
  - pattern: kafka.controller<type=KafkaController, name=(.+)><>(Count|Value)
    name: kafka_controller_$1
    type: GAUGE
  # 6. Group / Coordinator Metrics
  - pattern: kafka.coordinator.group<type=GroupMetadataManager, name=(.+)><>(Count|Value)
    name: kafka_coordinator_group_metadata_manager_$1
    type: GAUGE
  # 7. KRaft Specific Metrics
  - pattern: kafka.controller<type=KafkaController, name=(LeaderElectionSuccessRate|LeaderElectionLatencyMs)><>(Count|Value)
    name: kafka_controller_$1
    type: GAUGE
  # 8. New Generation Consumer Rebalance Protocol Metrics
  - pattern: kafka.coordinator.group<type=GroupMetadataManager, name=(RebalanceTimeMs|RebalanceFrequency)><>(Count|Value)
    name: kafka_coordinator_group_metadata_manager_$1
    type: GAUGE
  # 9. Queue Metrics
  - pattern: kafka.server<type=Queue, name=(QueueSize|QueueConsumerRate)><>(Count|Value)
    name: kafka_server_queue_$1
    type: GAUGE
  # 10. Client Metrics
  - pattern: kafka.network<type=RequestMetrics, name=(ClientConnections|ClientRequestRate|ClientResponseTime)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
    name: kafka_network_request_metrics_$1
    type: GAUGE
  # 11. Log Flush Rate and Time
  - pattern: kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
    name: kafka_log_log_flush_rate_and_time_ms
    type: GAUGE
- Startup Parameters
export KAFKA_HEAP_OPTS="-Xms1g -Xmx1g"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=9999 \
-Dcom.sun.management.jmxremote.rmi.port=9999 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=127.0.0.1"
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7071:/opt/jmx_exporter/kafka.yml"
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties
Producer Metrics¶
- Create Producer Metrics configuration file
producer.yml
---
lowercaseOutputName: true
rules:
  # New: producer-node-metrics
  - pattern: kafka\.producer<type=producer-node-metrics, client-id=([^,]+), node-id=([^>]+)><>([^:]+)
    name: kafka_producer_node_$3
    labels:
      client_id: "$1"
      node_id: "$2"
    type: GAUGE
  - pattern: 'kafka\.producer<type=producer-metrics, client-id=([^>]+)><>([^:,\s]+).*'
    name: 'kafka_producer_metrics_$2'
    labels:
      client_id: "$1"
    type: GAUGE
  # Collect all Selector metrics (New in Kafka 4.0)
  - pattern: 'kafka\.(?:(producer|consumer|connect))<type=(producer|consumer|connect)-metrics, client-id=([^>]+)><>(connection-.+|io-.+|network-.+|select-.+|send-.+|receive-.+|reauthentication-.+)'
    name: 'kafka_${1}_${4}'
    labels:
      client_id: '$3'
    type: GAUGE
- Startup Parameters
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7072:/opt/jmx_exporter/producer.yml"
/opt/kafka/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic xxxx \
--producer-property bootstrap.servers=localhost:9092
Consumer Metrics¶
- Create Consumer Metrics configuration file
consumer.yml
lowercaseOutputName: true
rules:
  # consumer-coordinator-metrics
  - pattern: 'kafka\.consumer<type=consumer-coordinator-metrics, client-id=([^>]+)><>([^:,\s]+).*'
    name: 'kafka_consumer_coordinator_metrics_$2'
    labels:
      client_id: "$1"
    type: GAUGE
  - pattern: 'kafka\.consumer<type=consumer-metrics, client-id=([^>]+)><>([^:,\s]+).*'
    name: 'kafka_consumer_metrics_$2'
    labels:
      client_id: "$1"
- Startup Parameters
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7073:/opt/jmx_exporter/consumer.yml"
/opt/kafka/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic xxxx
Streams Metrics¶
- Create Streams Metrics configuration file
stream.yml
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
  # Kafka Streams application metrics - Remove special characters
  - pattern: 'kafka.streams<type=stream-metrics, client-id=(.+)><>([a-zA-Z0-9\-]+)$'
    name: kafka_streams_$2
    labels:
      client_id: "$1"
  # Handle attribute names with special characters
  - pattern: 'kafka.streams<type=stream-metrics, client-id=(.+)><>([a-zA-Z0-9\-]+):(.+)$'
    name: kafka_streams_$2_$3
    labels:
      client_id: "$1"
  # Processor Node metrics
  - pattern: 'kafka.streams<type=stream-processor-node-metrics, client-id=(.+), task-id=(.+), processor-node-id=(.+)><>(.+)'
    name: kafka_streams_processor_$4
    labels:
      client_id: "$1"
      task_id: "$2"
      processor_node_id: "$3"
  # Task metrics
  - pattern: 'kafka.streams<type=stream-task-metrics, client-id=(.+), task-id=(.+)><>(.+)'
    name: kafka_streams_task_$3
    labels:
      client_id: "$1"
      task_id: "$2"
  # Thread metrics
  - pattern: 'kafka.streams<type=stream-thread-metrics, client-id=(.+), thread-id=(.+)><>(.+)'
    name: kafka_streams_thread_$3
    labels:
      client_id: "$1"
      thread_id: "$2"
  # JVM metrics
  - pattern: 'java.lang<type=Memory><>(.+)'
    name: jvm_memory_$1
  - pattern: 'java.lang<type=GarbageCollector, name=(.+)><>(\w+)'
    name: jvm_gc_$2
    labels:
      gc: "$1"
  # Thread pool metrics
  - pattern: 'java.lang<type=Threading><>(.+)'
    name: jvm_threads_$1
  # Default rule
  - pattern: '(.*)'
- Startup Parameters
export KAFKA_HEAP_OPTS="-Xms512m -Xmx512m"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=9996 \
-Dcom.sun.management.jmxremote.rmi.port=9996 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=127.0.0.1"
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7075:/opt/jmx_exporter/stream.yml"
# Pass KAFKA_OPTS explicitly: a plain java command does not read it from the environment.
java $KAFKA_HEAP_OPTS $KAFKA_JMX_OPTS $KAFKA_OPTS $EXTRA_ARGS -cp "libs/*:my-streams.jar" WordCountDemo
Connect Metrics¶
- Create Connect Metrics configuration file
connect.yml
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
  # 1) connect-worker-metrics (Global)
  - pattern: 'kafka\.connect<type=connect-worker-metrics><>([^:]+)'
    name: 'kafka_connect_worker_$1'
    type: GAUGE
  # 2) connect-worker-metrics,connector=xxx
  - pattern: 'kafka\.connect<type=connect-worker-metrics, connector=([^>]+)><>([^:]+)'
    name: 'kafka_connect_worker_$2'
    labels:
      connector: "$1"
    type: GAUGE
  # 3) connect-worker-rebalance-metrics
  - pattern: 'kafka\.connect<type=connect-worker-rebalance-metrics><>([^:]+)'
    name: 'kafka_connect_worker_rebalance_$1'
    type: GAUGE
  # 4) connector-task-metrics
  - pattern: 'kafka\.connect<type=connector-task-metrics, connector=([^>]+), task=([^>]+)><>([^:]+)'
    name: 'kafka_connect_task_$3'
    labels:
      connector: "$1"
      task_id: "$2"
    type: GAUGE
  # 5) sink-task-metrics
  - pattern: 'kafka\.connect<type=sink-task-metrics, connector=([^>]+), task=([^>]+)><>([^:]+)'
    name: 'kafka_connect_sink_task_$3'
    labels:
      connector: "$1"
      task_id: "$2"
- Startup Parameters
export KAFKA_HEAP_OPTS="-Xms512m -Xmx512m"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Dcom.sun.management.jmxremote.port=9995 \
-Dcom.sun.management.jmxremote.rmi.port=9995 \
-Djava.rmi.server.hostname=127.0.0.1"
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7074:/opt/jmx_exporter/connect.yml"
# Start Kafka Connect
/opt/kafka/kafka/bin/connect-distributed.sh /opt/kafka/kafka/config/connect-distributed.properties
After successful startup, you can view the collected metrics via curl http://IP:Port/metrics.
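To check all exporters in one pass, the curl call can be wrapped in a small loop. This is a minimal sketch that assumes the exporters listen on 127.0.0.1 with the ports used in this guide (7071 to 7075); the function names are hypothetical.

```shell
# Sketch: count exported samples per exporter port.

# Count sample lines (non-empty lines that are not # HELP / # TYPE comments) read from stdin.
count_samples() {
  grep -v '^#' | grep -c . || true
}

# Usage: check_exporter <host> <port>
# Prints the number of samples, or "unreachable" if the endpoint cannot be fetched.
check_exporter() {
  body="$(curl -fsS "http://$1:$2/metrics")" || { echo "unreachable"; return 1; }
  printf '%s\n' "$body" | count_samples
}

# Check every exporter port configured in this guide.
check_all_exporters() {
  for port in 7071 7072 7073 7074 7075; do
    printf '%s: ' "$port"
    check_exporter 127.0.0.1 "$port"
  done
}
```

Running check_all_exporters prints one line per port. A count of 0 usually means the agent is up but no rule matched any MBean.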
Configure DataKit¶
- Navigate to the conf.d/prom directory under the DataKit installation directory, copy prom.conf.sample, and rename it to kafka.conf
cp prom.conf.sample kafka.conf
- Adjust kafka.conf
[[inputs.prom]]
  ## Exporter URLs.
  urls = ["http://127.0.0.1:7071/metrics","http://127.0.0.1:7072/metrics","http://127.0.0.1:7073/metrics","http://127.0.0.1:7074/metrics","http://127.0.0.1:7075/metrics"]
  ## Collector alias.
  source = "kafka"
  ## Takes priority over the 'measurement_name' configuration.
  [[inputs.prom.measurements]]
    prefix = "kafka_controller_"
    name = "kafka_controller"
  [[inputs.prom.measurements]]
    prefix = "kafka_network_"
    name = "kafka_network"
  [[inputs.prom.measurements]]
    prefix = "kafka_log_"
    name = "kafka_log"
  [[inputs.prom.measurements]]
    prefix = "kafka_server_"
    name = "kafka_server"
  [[inputs.prom.measurements]]
    prefix = "kafka_connect_"
    name = "kafka_connect"
  [[inputs.prom.measurements]]
    prefix = "kafka_stream_"
    name = "kafka_stream"
- Restart DataKit
Execute the following command
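A minimal sketch of the restart step, assuming a Linux host where either the datakit command-line tool is on the PATH or DataKit is registered as a systemd service named datakit. The wrapper function restart_datakit is hypothetical.

```shell
# Sketch: restart DataKit so that it loads kafka.conf.
restart_datakit() {
  if command -v datakit >/dev/null 2>&1; then
    # Restart through the DataKit command-line tool.
    datakit service -R
  else
    # Fallback (assumption): DataKit runs as a systemd unit named "datakit".
    systemctl restart datakit
  fi
}
```

Call restart_datakit after saving kafka.conf. Both commands normally require root privileges.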
Metrics¶
The tables below describe a subset of the Kafka 4 metrics. For more metrics, refer to Kafka Metrics Details.
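The measurement and metric names in the tables follow from the prefix entries in kafka.conf. The sketch below mimics that split under an assumption: that DataKit assigns a metric to the measurement whose prefix matches and uses the remainder of the name as the field. The function split_metric is hypothetical and only illustrates the string handling.

```shell
# Sketch: split an exporter metric name into "<measurement> <field>"
# using the prefixes configured in kafka.conf.
split_metric() {
  for prefix in kafka_controller_ kafka_network_ kafka_log_ kafka_server_ kafka_connect_ kafka_stream_; do
    case "$1" in
      "$prefix"*)
        # Measurement is the prefix without its trailing underscore;
        # the field is whatever follows the prefix.
        printf '%s %s\n' "${prefix%_}" "${1#"$prefix"}"
        return 0
        ;;
    esac
  done
  # No configured prefix matches this metric name.
  return 1
}
```

For example, split_metric kafka_controller_ActiveBrokerCount prints kafka_controller ActiveBrokerCount. Names that begin with kafka_streams_, as produced by the named rules in stream.yml, do not start with the configured kafka_stream_ prefix, so it is worth confirming which measurement those metrics land in.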
kafka_server Measurement¶
Metric Name | Description | Unit |
---|---|---|
Fetch_queue_size | Fetch queue size | count |
Produce_queue_size | Producer queue size | count |
Request_queue_size | Request queue size | count |
broker_topic_metrics_BytesInPerSec | Client byte input rate | bytes/s |
broker_topic_metrics_BytesOutPerSec | Client byte output rate | bytes/s |
broker_topic_metrics_FailedProduceRequestsPerSec | Produce request failure rate | count/s |
broker_topic_metrics_FetchMessageConversionsPerSec | Fetch message conversion rate | count/s |
broker_topic_metrics_MessagesInPerSec | Incoming message rate | count/s |
broker_topic_metrics_ProduceMessageConversionsPerSec | Producer message conversion rate | count/s |
broker_topic_metrics_TotalFetchRequestsPerSec | Fetch request (from client or follower) rate | count/s |
broker_topic_metrics_TotalProduceRequestsPerSec | Producer request rate | count/s |
socket_server_metrics_connection_count | SocketServer connection count | count |
socket_server_metrics_connection_close_total | SocketServer closed connection count | count |
socket_server_metrics_incoming_byte_rate | SocketServer input byte rate | bytes/s |
kafka_network Measurement¶
Metric Name | Description | Unit |
---|---|---|
request_metrics_RequestBytes_request_AddOffsetsToTxn | AddOffsetsToTxn request size | bytes |
request_metrics_RequestBytes_request_Fetch | Fetch request size | bytes |
request_metrics_RequestBytes_request_FetchConsumer | FetchConsumer request size | bytes |
request_metrics_RequestBytes_request_FetchFollower | FetchFollower request size | bytes |
request_metrics_TotalTimeMs_request_CreateTopics | CreateTopics request total time | ms |
request_metrics_TotalTimeMs_request_CreatePartitions | CreatePartitions request total time | ms |
request_metrics_RequestQueueTimeMs_request_CreateTopics | CreateTopics request queue wait time | ms |
request_metrics_RequestQueueTimeMs_request_CreatePartitions | CreatePartitions request queue wait time | ms |
request_metrics_RequestQueueTimeMs_request_Produce | Produce request queue wait time | ms |
request_metrics_ResponseSendTimeMs_request_CreateTopics | CreateTopics response send time | ms |
request_metrics_ResponseSendTimeMs_request_CreatePartitions | CreatePartitions response send time | ms |
kafka_controller Measurement¶
Metric Name | Description | Unit |
---|---|---|
ActiveBrokerCount | Active broker count | count |
ActiveControllerCount | Active controller count | count |
GlobalPartitionCount | Partition count | count |
GlobalTopicCount | Topic count | count |
OfflinePartitionsCount | Offline partition count | count |
PreferredReplicaImbalanceCount | Count of partitions whose leader is not the preferred replica | count |
TimedOutBrokerHeartbeatCount | Broker heartbeat timeout count | count |
LastAppliedRecordLagMs | Last applied record lag time | ms |
LastAppliedRecordOffset | Last applied record offset | - |
MetadataErrorCount | Metadata error count | count |
NewActiveControllersCount | New controller election count | count |
kafka_producer Measurement¶
Metric Name | Description | Unit |
---|---|---|
producer_metrics_batch_split_rate | Batch split rate | count/s |
producer_metrics_buffer_available_bytes | Total unused buffer memory | bytes |
producer_metrics_buffer_exhausted_rate | Average number of record sends dropped per second due to buffer exhaustion | count/s |
producer_metrics_buffer_total_bytes | Total buffer size | bytes |
producer_metrics_bufferpool_wait_ratio | Buffer pool wait ratio | % |
producer_metrics_bufferpool_wait_time_ns_total | Total buffer pool wait time | ns |
producer_metrics_connection_close_rate | Connection close rate | count/s |
producer_metrics_connection_count | Connection count | count |
producer_metrics_flush_time_ns_total | Flush total time | ns |
producer_metrics_incoming_byte_rate | Input byte rate | bytes/s |
producer_metrics_outgoing_byte_rate | Output byte rate | bytes/s |
producer_metrics_request_rate | Request rate | count/s |
producer_metrics_request_size_avg | Average request size | bytes |
kafka_consumer Measurement¶
Metric Name | Description | Unit |
---|---|---|
consumer_coordinator_metrics_failed_rebalance_total | Rebalance failure count | count |
consumer_coordinator_metrics_heartbeat_rate | Average heartbeat count per second | count/s |
consumer_coordinator_metrics_heartbeat_response_time_max | Heartbeat response max time | ms |
consumer_coordinator_metrics_join_rate | Group join rate per second | count/s |
consumer_coordinator_metrics_join_total | Group join total | count |
consumer_coordinator_metrics_last_rebalance_seconds_ago | Seconds since last rebalance event | s |
consumer_coordinator_metrics_rebalance_latency_total | Rebalance latency total | ms |
consumer_fetch_manager_metrics_bytes_consumed_rate | Bytes consumed per second | bytes/s |
consumer_fetch_manager_metrics_fetch_latency_avg | Average fetch request latency | ms |
consumer_metrics_connection_count | Connection count | count |
consumer_metrics_connection_close_rate | Connection close rate | count/s |
consumer_metrics_incoming_byte_rate | Input byte rate | bytes/s |
consumer_metrics_outgoing_byte_rate | Output byte rate | bytes/s |
consumer_metrics_select_rate | Select rate | count/s |
consumer_metrics_last_poll_seconds_ago | Seconds since the last poll() call | s |
kafka_connect Measurement¶
Metric Name | Description | Unit |
---|---|---|
worker_connector_count | Connector count | count |
worker_task_startup_attempts_total | Task startup attempt count | count |
worker_connector_startup_attempts_total | Connector startup attempt count | count |
worker_task_startup_failure_total | Task startup failure count | count |
worker_connector_startup_failure_percentage | Connector startup failure percentage | % |
worker_rebalance_completed_rebalances_total | Rebalance completion total | count |
worker_task_startup_failure_percentage | Task startup failure percentage | % |
worker_rebalance_time_since_last_rebalance_ms | Time since last rebalance | ms |
kafka_stream Measurement¶
Metric Name | Description | Unit |
---|---|---|
stream_thread_metrics_thread_start_time | Thread start time | Timestamp ms |
stream_thread_metrics_task_created_total | Task creation total | count |
stream_state_metrics_block_cache_capacity | Block cache size | bytes |
stream_state_metrics_all_rate | All operation rate | count/s |
stream_state_metrics_block_cache_usage | Block cache usage | bytes |
stream_state_metrics_bytes_read_compaction_rate | Bytes read during compaction per second | bytes/s |
stream_state_metrics_bytes_written_compaction_rate | Bytes written during compaction per second | bytes/s |
stream_state_metrics_block_cache_index_hit_ratio | Block cache index hit ratio | % |
stream_state_metrics_block_cache_data_hit_ratio | Block cache data hit ratio | % |
stream_state_metrics_block_cache_filter_hit_ratio | Block cache filter hit ratio | % |
stream_state_metrics_bytes_written_rate | Byte write rate | bytes/s |
stream_state_metrics_bytes_read_rate | Byte read rate | bytes/s |
stream_state_metrics_block_cache_filter_hit_ratio | Cache size bytes | bytes |
stream_task_metrics_process_rate | Records processed per second | count/s |
stream_task_metrics_enforced_processing_rate | Enforced processing count per second | count/s |
stream_task_metrics_active_process_ratio | Active process ratio | % |
stream_thread_metrics_commit_rate | Commit rate | count/s |
stream_thread_metrics_poll_latency_avg | Average poll latency | ms |
stream_thread_metrics_poll_rate | Poll rate | count/s |
stream_thread_metrics_blocked_time_ns_total | Blocked time | ns |
stream_topic_metrics_bytes_consumed_total | Consumed bytes | bytes |
stream_topic_metrics_bytes_produced_total | Produced bytes | bytes |
stream_topic_metrics_records_consumed_total | Total records consumed by source processor nodes | count |
stream_topic_metrics_records_produced_total | Total records produced by sink processor nodes | count |