# Aggregator
Aggregator is a data processing component within DataKit's internal IO path, not a standalone input collector. Its main functions include:
- Filtering data points according to `aggr.toml` configuration rules and forwarding them to the Dataway aggregation interface
- Filtering tracing/logging/rum data according to `tail-sampling.toml` configuration rules and forwarding them to the Dataway tail sampling interface
DataKit is only responsible for configuration loading, data point filtering, packaging, and sending. Actual aggregation calculations and tail sampling decisions are performed by Dataway.
## Workflow
Aggregator loads configuration upon startup and automatically reloads it every minute.
The data processing flow is as follows:
- Execute aggregation rules (`PickMetric`), package matched data points, and send them to the `/v1/aggregate` interface
- Execute tail sampling rules by data type (`PickTrace`/`PickLogging`/`PickRUM`) and send matched data to the `/v1/tail_sampling` interface
Supported features and processing behavior for different data types:
| Data Type | Aggregation Rules | Tail Sampling Rules | Original Data Point Processing |
|---|---|---|---|
| `tracing` | Supported | Supported | Taken over by Aggregator after matching tail sampling rules; no longer goes through the normal write process |
| `logging` | Supported | Supported | Data points not matching tail sampling rules continue through the normal write process |
| `rum` | Supported | Supported | Data points not matching tail sampling rules continue through the normal write process |
| `metric` and other types | Supported | Not supported | Original data points continue through the normal write process |
## Main Configuration
Main configuration fields for Aggregator:
- `endpoints`: List of downstream Dataway addresses; must include the `token` query parameter
- `max_raw_body_size`: Maximum size of a single data packet before compression (in bytes)
- `use_local_config`: Whether to enable local configuration file mode
- `local_config_dir`: Path to the local configuration file directory
- `local_metric_config_file`: Aggregation rule configuration filename
- `local_tail_sampling_config_file`: Tail sampling rule configuration filename
Notes:
- `[aggregator]` is the only active top-level configuration entry for this feature
- When `endpoints` is configured, DataKit builds requests against `/v1/aggregate`, `/v1/tail_sampling`, and `/v1/tail_sampling_config` on those endpoints
- When `endpoints` is empty, DataKit reuses the downstream addresses already initialized by `[dataway]`
Configuration example:
```toml
[aggregator]
endpoints = [
  "https://openway.example.com?token=<YOUR_WORKSPACE_TOKEN>",
]
max_raw_body_size = 1048576
use_local_config = true
local_config_dir = "/usr/local/datakit/conf.d/aggr"
local_metric_config_file = "aggr.toml"
local_tail_sampling_config_file = "tail-sampling.toml"
```
Default local configuration file paths:
- `/usr/local/datakit/conf.d/aggr/aggr.toml`
- `/usr/local/datakit/conf.d/aggr/tail-sampling.toml`
When `use_local_config = false`, configuration is dynamically pulled from Dataway.
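A minimal sketch of remote mode, combining the notes above (empty `endpoints` falls back to the addresses already initialized by `[dataway]`):

```toml
[aggregator]
# Empty: reuse the downstream addresses already initialized by [dataway]
endpoints = []
# Pull aggregation and tail sampling rules from Dataway instead of local files
use_local_config = false
```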
## Aggregation Configuration

### Top-level Fields
- `default_window`: Default aggregation time window
- `action`: Default processing action, usually kept empty (options: `passthrough`, `drop`)
- `delete_rules_point`: Whether to delete original data points that match rules, usually kept as `false`
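A minimal sketch of these fields at the top of `aggr.toml`, before any rule entries (the `1m` window is an illustrative value, not a documented default):

```toml
default_window = "1m"      # illustrative aggregation window
action = ""                # usually kept empty; options: "passthrough", "drop"
delete_rules_point = false # usually kept as false
```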
### Rule Structure
```toml
[[aggregate_rules]]
name = "otel_jvm_class_loaded_sum"
group_by = ["service_name"]

[aggregate_rules.select]
category = "metric"
measurements = ["otel_service"]
metric_name = ["jvm.classes.loaded"]
condition = ""

[aggregate_rules.algorithms."jvm_class_loaded_sum"]
method = "sum"
source_field = "jvm.classes.loaded"

[aggregate_rules.algorithms."jvm_class_loaded_sum".add_tags]
metric = "jvm_class_loaded_sum"
```
Field descriptions:
- `group_by`: Dimensions for data aggregation grouping
- `select.category`: Data filtering category (supports `metric`, `tracing`, `logging`, `rum`, etc.)
- `select.measurements`: Measurement whitelist
- `select.metric_name`: Metric field whitelist
- `select.condition`: Data filtering condition (when empty, filters only by measurement/field)
- `algorithms.*.method`: Aggregation algorithm type (string format)
- `algorithms.*.source_field`: Source field name for algorithm processing; must exactly match the actual field name in data points
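To show these fields working together, here is a hedged sketch of an `avg` rule grouped by two dimensions (the metric and tag names are illustrative, not taken from a real dataset):

```toml
[[aggregate_rules]]
name = "jvm_memory_used_avg"
# Aggregate separately per service and per host
group_by = ["service_name", "host"]

[aggregate_rules.select]
category = "metric"
measurements = ["otel_service"]
metric_name = ["jvm.memory.used"]
condition = ""   # empty: filter only by measurement/field

[aggregate_rules.algorithms."jvm_memory_used_avg"]
method = "avg"                    # string format; see the list below
source_field = "jvm.memory.used"  # must match the actual field name in data points
```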
### Available `method` Values
Currently supported aggregation algorithms:
- `sum`: Summation
- `avg`: Average
- `count`: Count
- `min`: Minimum value
- `max`: Maximum value
- `histogram`: Histogram
- `stdev`: Standard deviation
- `quantiles`: Quantile aggregation
- `count_distinct`: Distinct count
- `first`: First value
- `last`: Last value
Notes:
- The `expo_histogram` algorithm is currently not supported
- The `method` parameter must use string format
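A minimal fragment illustrating both notes (the algorithm and field names are illustrative):

```toml
[aggregate_rules.algorithms."example_sum"]
method = "sum"               # correct: string format
source_field = "some.field"  # illustrative field name

# Not accepted in the current version:
# method = "expo_histogram"  # algorithm not yet supported
```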
### Common Rule Examples

#### 1. JVM Metric Aggregation
```toml
[[aggregate_rules]]
name = "otel_jvm_threads_live_sum"
group_by = ["service_name"]

[aggregate_rules.select]
category = "metric"
measurements = ["otel_service"]
metric_name = ["jvm.threads.live"]
condition = ""

[aggregate_rules.algorithms."jvm_threads_live_sum"]
method = "sum"
source_field = "jvm.threads.live"
```
#### 2. Counting Root Spans in Tracing Data
```toml
[[aggregate_rules]]
name = "trace_root_span_count"
group_by = ["service", "resource"]

[aggregate_rules.select]
category = "tracing"
measurements = ["opentelemetry"]
metric_name = ["span_id"]
condition = '{ parent_id = "0" }'

[aggregate_rules.algorithms."root_span.count"]
method = "count"
source_field = "span_id"
```
## Tail Sampling Config

### Top-level Structure
```toml
version = 2

[trace]
data_ttl = "1m"
group_key = "trace_id"

[logging]
data_ttl = "1m"

[rum]
data_ttl = "1m"
```
Configuration notes:
- `version`: Configuration version number, sent to Dataway along with data packets
- `trace.group_key`: Trace data grouping key; currently only supports `trace_id`
- `logging`/`rum`: Configure dimension-based group sampling through `group_dimensions`
### Trace Rules
For trace rules, the configuration key is `sampling_pipeline` (note: not `pipelines`):
```toml
[trace]
data_ttl = "1m"
group_key = "trace_id"

[[trace.sampling_pipeline]]
name = "keep-resource"
type = "condition"
condition = '{ resource IN ["/resource"] }'
action = "keep"

[[trace.sampling_pipeline]]
name = "drop-304"
type = "condition"
condition = '{ http_status_code = "304" }'
action = "drop"
```
Rules are evaluated in configuration order, so rule ordering is critical. The recommended priority is:
- Put explicit `keep` rules first
- Put explicit `drop` rules next
- Use probabilistic sampling only as the final fallback
Why this order is recommended:
- `keep` rules should protect high-value data first, such as error traces, slow traces, or critical business traffic
- `drop` rules should come after `keep`, so they only remove data that is already known to be safe to discard
- Probabilistic sampling is best used as the last fallback to retain a percentage of the remaining traffic
Do not place a catch-all `drop` rule or a broad probabilistic rule too early; otherwise, later and more specific `keep` rules may never take effect.
Recommended example:
```toml
[trace]
data_ttl = "1m"
group_key = "trace_id"

[[trace.sampling_pipeline]]
name = "keep-error-trace"
type = "condition"
condition = '{ status = "error" }'
action = "keep"

[[trace.sampling_pipeline]]
name = "keep-slow-trace"
type = "condition"
condition = '{ duration > 1000000000 }'
action = "keep"

[[trace.sampling_pipeline]]
name = "drop-health-check"
type = "condition"
condition = '{ resource IN ["/health", "/ready"] }'
action = "drop"

[[trace.sampling_pipeline]]
name = "sample-rest"
type = "probabilistic"
condition = '{ 1 = 1 }'
rate = 0.1
```
### Logging Rules
```toml
[logging]
data_ttl = "1m"

[[logging.group_dimensions]]
group_key = "trace_id"

[[logging.group_dimensions.pipelines]]
name = "sample-by-trace-id"
type = "probabilistic"
condition = '{ 1 = 1 }'
rate = 0.1
```
Processing behavior:
- Log data is first grouped by `group_key`
- Logs matching group sampling rules are sent to the tail sampling interface
- Log data missing `group_key` is passed through and does not participate in group sampling
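The entries under `group_dimensions.pipelines` use the same rule fields described in Rule Field Descriptions below. As a hedged sketch (assuming `condition` rules behave inside logging pipelines as they do for traces; the `status` field value is illustrative), a `keep` rule can precede the probabilistic fallback:

```toml
[[logging.group_dimensions]]
group_key = "trace_id"

# Illustrative: always keep error logs before sampling the rest
[[logging.group_dimensions.pipelines]]
name = "keep-error-logs"
type = "condition"
condition = '{ status = "error" }'
action = "keep"

[[logging.group_dimensions.pipelines]]
name = "sample-rest"
type = "probabilistic"
condition = '{ 1 = 1 }'
rate = 0.1
```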
### RUM Rules
```toml
[rum]
data_ttl = "1m"

[[rum.group_dimensions]]
group_key = "session_id"

[[rum.group_dimensions.pipelines]]
name = "sample-by-session"
type = "probabilistic"
condition = '{ 1 = 1 }'
rate = 0.2
```
### Rule Field Descriptions

- `type = "condition"`: Executes the corresponding action when the condition matches; `action` only supports `keep` (retain) or `drop` (discard)
- `type = "probabilistic"`: Performs deterministic probability sampling according to the `rate` parameter (range: `0 ~ 1`)
- `condition`: Data filtering condition; an empty value matches all data
- `hash_keys`: Reserved field; no configuration needed in the current version
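Since an empty `condition` matches all data, the probabilistic fallback from the trace example above can also be written as follows (a hedged, equivalent sketch):

```toml
[[trace.sampling_pipeline]]
name = "sample-rest"
type = "probabilistic"
condition = ""   # empty condition: matches all data
rate = 0.1
```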
Current version limitations:
- The `derived_metrics` feature is not yet enabled; configuring this field will cause errors
## Sending and Performance Behavior
Aggregator performs packet splitting and concurrency optimization before data transmission:
- Automatically splits aggregation batches and tail sampling data packets according to the `max_raw_body_size` configuration
- Uses an asynchronous concurrent sending mechanism with at most `8` worker threads
- Uses Protobuf format for data transmission (`Content-Type: application/x-protobuf`)
`max_raw_body_size` configuration priority:

1. `aggregator.max_raw_body_size` (highest priority)
2. `dataway.max_raw_body_size`
3. DataKit default value
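A sketch of how the override works when both sections set a value (the sizes are illustrative):

```toml
[dataway]
max_raw_body_size = 10485760  # 10 MiB; used only if [aggregator] does not set its own

[aggregator]
max_raw_body_size = 1048576   # 1 MiB; takes precedence for Aggregator traffic
```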
When tail sampling data transmission returns status code `412`, DataKit automatically resends the tail sampling configuration to the `/v1/tail_sampling_config` interface.
## Runtime Metrics

Aggregator exposes the following Prometheus metrics (full prefix: `datakit_io_`):
| Metric Name | Type | Labels | Description |
|---|---|---|---|
| `aggr_send_success_total` | Counter | `type`, `category` | Count of successful sends from Aggregator |
| `aggr_send_failed_total` | Counter | `type`, `category`, `reason` | Count of failed sends from Aggregator |
| `aggr_send_points_total` | Counter | `type`, `category` | Number of points successfully sent by Aggregator |
| `aggr_lost_points_total` | Counter | `type`, `category`, `reason` | Number of points lost due to send failures |
| `aggr_send_latency_seconds` | Summary | `type`, `category` | Send latency in seconds |
Label notes:
- `type`: currently includes `metric`, `tail_sampling`, `config`
- `category`: currently mainly `unknown` on send paths (`config` for config push)
- `reason`: currently includes `marshal`, `transport`, `network`, `server`, `other`
Troubleshooting hints:
- If `aggr_send_failed_total` increases, check its `reason` label first to tell whether the failure lies in encoding, the network, the downstream status code, or transport settings
- `aggr_lost_points_total > 0` indicates data loss at the send stage
- Use `aggr_send_points_total` together with `aggr_send_latency_seconds` to evaluate throughput and latency anomalies
## Common Pitfalls

- `aggr.toml` and `tail-sampling.toml` are two independent configuration files and cannot be mixed
- Trace rule configuration must use the `[[trace.sampling_pipeline]]` syntax
- The `method` parameter must use string format, such as `"sum"`, `"count"`, `"max"`, etc.
- `source_field` must exactly match the actual field name in data points
- `trace.group_key` currently only supports the `trace_id` value
- When using group sampling for `logging`/`rum`, data points missing `group_key` are passed through
## Troubleshooting Suggestions
If configuration does not work as expected, troubleshoot in the following order:
- Confirm the correct configuration source is enabled (local file or remote pull)
- Check that the local configuration file path is correct and the TOML syntax is valid
- Verify that `category`, `measurements`, `metric_name`, and `condition` can match actual data points
- Confirm that `source_field` matches the real field name in data points
- Check that the tail sampling `group_key` exists in data points
- Check relevant information in DataKit logs, such as `loaded ... config`, `split ...`, `send ...`, and HTTP status codes