Data Processing for Various Categories in Pipeline¶
Since DataKit 1.4.0, the built-in Pipeline feature can be used to directly manipulate data collected by DataKit, supporting all data types currently available.
Attention
- The Pipeline applies to all data and is currently in the experimental stage; there is no guarantee that the mechanism or behavior will not undergo incompatible adjustments later.
- Even data reported through the DataKit API supports Pipeline processing.
- Using the Pipeline to process existing collected data (especially non-log data) is very likely to disrupt the existing data structure and cause anomalies in TrueWatch.
- Before applying the Pipeline, please confirm that the data processing meets expectations using the Pipeline Debugging Tool.
The Pipeline can perform the following operations on data collected by DataKit:
- Add, delete, modify values or data types of fields and tags
- Convert fields into tags
- Modify the name of a measurement set
- Mark the current data for discard (drop())
- Terminate the execution of the Pipeline script (exit())
- ...
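For example, a minimal sketch combining several of these operations (the field names `status` and `app` and all values here are hypothetical; the functions are the built-ins referenced later in this document):

add_key(processed_by, "datakit-pipeline")  # add a new field
set_tag(app, "web")                        # add a tag (or convert an existing field into a tag)
set_measurement("nginx_access")            # modify the measurement set name
if status == "debug" {
    drop()                                 # mark the current data for discard
    exit()                                 # stop executing the rest of the script
}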
Input Data Structure¶
All categories of data are encapsulated into a Point structure before being processed by the Pipeline script. Its structure can be considered as:
struct Point {
    Name:     str          # Equivalent to the measurement set name for Metrics (time-series) data, the source for Logging (log) data,
                           # the source for Network data, the class for Object/CustomObject (object) data ...
    Tags:     map[str]str  # Stores all tags of the data; for non-time-series categories, the boundary between tags and fields is relatively blurred
    Fields:   map[str]any  # Stores all fields of the data (called metrics for the time-series category)
    Time:     int64        # The timestamp of the data, usually interpreted as the time the data was generated, in nanoseconds
    DropFlag: bool         # Marks whether this data should be discarded
}
For example, an nginx log data point, after being collected by the log collector, would look roughly like this as input to the Pipeline script:
Point {
    Name: "nginx"
    Tags: map[str]str {
        "host": "your_hostname"
    },
    Fields: map[str]any {
        "message": "127.0.0.1 - - [12/Jan/2023:11:51:38 +0800] \"GET / HTTP/1.1\" 200 612 \"-\" \"curl/7.81.0\""
    },
    Time: 1673495498000123456,
    DropFlag: false,
}
Note:
- The name can be modified with the function `set_measurement()`.
- In the point's tags/fields maps, no key can or will appear in both tags and fields at the same time.
- In the Pipeline you can read the value of a key in the point's tags/fields maps through custom identifiers or the function `get_key()`, but modifying the value of a key in Tags or Fields requires other built-in functions, such as `add_key()`; `_` can be regarded as an alias for the key `message`.
- After the script runs, if the point's tags/fields maps contain a key named `time`, it is deleted; if its value is of type int64, that value is assigned to the point's time after deletion. If `time` is a string, you can try the function `default_time()` to convert it to int64.
- You can use the `drop()` function to mark the input Point as pending discard; after the script executes, that data will not be uploaded.
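For instance, a minimal sketch for the nginx `message` shown earlier, illustrating the `_` alias and the `time` handling described above (the grok pattern and the extracted field names are illustrative; DataKit's stock nginx script is more complete):

# parse the access-log line stored in `_` (alias of `message`)
grok(_, "%{NOTSPACE:client_ip} %{NOTSPACE:http_ident} %{NOTSPACE:http_auth} \\[%{HTTPDATE:time}\\] \"%{DATA:http_method} %{GREEDYDATA:http_url} HTTP/%{NUMBER:http_version}\" %{INT:status_code} %{INT:bytes}")
cast(status_code, "int")  # convert the extracted status code string to an integer
default_time(time)        # parse the string `time` field; its value becomes the point's timestamp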
Storage, Indexing, and Matching of Pipeline Scripts¶
Script Storage and Indexing¶
Currently, Pipeline scripts are divided into four namespaces based on their origin, with decreasing indexing priority as shown in the table below:
| Namespace | Directory | Supported Data Categories | Description |
| --- | --- | --- | --- |
| remote | [DataKit Installation Directory]/pipeline_remote | CO, E, L, M, N, O, P, R, S, T | Scripts managed via the TrueWatch console |
| confd | [DataKit Installation Directory]/pipeline_confd | CO, E, L, M, N, O, P, R, S, T | Scripts managed via Confd |
| gitrepo | [DataKit Installation Directory]/pipeline_gitrepos/[repo-name] | CO, E, L, M, N, O, P, R, S, T | Scripts managed via Git |
| default | [DataKit Installation Directory]/pipeline | CO, E, L, M, N, O, P, R, S, T | Scripts generated by DataKit or written by users |
Note:
- Do not modify the automatically generated default scripts under the pipeline directory; any modifications will be overwritten upon DataKit startup;
- It is recommended to add local scripts corresponding to each data category under the pipeline/[category]/ directory;
- Except for the pipeline directory, do not make any modifications to other script directories (remote, confd, gitrepo).
When DataKit selects the corresponding Pipeline, the indexing priority of scripts in these four namespaces decreases in turn. For example, when the `cpu` measurement set needs `metric/cpu.p`, DataKit searches in the following order:
1. `pipeline_remote/metric/cpu.p`
2. `pipeline_confd/metric/cpu.p`
3. `pipeline_gitrepos/<repo-name>/metric/cpu.p`
4. `pipeline/metric/cpu.p`
Note: here `<repo-name>` depends on the name of your Git repository.
We create indexes separately for the scripts under each data category; this feature does not allow cross-namespace references with the `use()` function. The implementation of Pipeline script storage and script indexing is shown in the figure below. During index creation, scripts from higher-priority namespaces override those from lower-priority ones:
All four sources of Pipeline directories store Pipeline scripts in the following manner:
├── pattern <-- A dedicated directory for storing custom patterns
├── apache.p
├── consul.p
├── sqlserver.p <--- Pipelines in the top-level directory default to the logging category, for compatibility with historical settings
├── tomcat.p
├── other.p
├── custom_object <--- Dedicated directory for storing pipelines for custom objects
│ └── some-object.p
├── keyevent <--- Dedicated directory for storing event-specific pipelines
│ └── some-event.p
├── logging <--- Dedicated directory for storing log-specific pipelines
│ └── nginx.p
├── metric <--- Dedicated directory for storing time-series metric-specific pipelines
│ └── cpu.p
├── network <--- Dedicated directory for storing network metric-specific pipelines
│ └── ebpf.p
├── object <--- Dedicated directory for storing object-specific pipelines
│ └── HOST.p
├── rum <--- Dedicated directory for storing RUM-specific pipelines
│ └── error.p
├── security <--- Dedicated directory for storing scheck-specific pipelines
│ └── scheck.p
└── tracing <--- Dedicated directory for storing APM-specific pipelines
└── service_a.p
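A script can also reuse another script from the same index via the `use()` function mentioned above; a minimal sketch (the referenced file name common.p is hypothetical):

use("common.p")  # run the referenced script from the same index on the current point, then continue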
Data and Script Matching Strategy¶
There are four strategies for matching data with script names, which will be evaluated from the 4th (highest priority) down to the 1st. If a higher-priority strategy is met, lower-priority strategies will not be executed:
1. Based on the data characteristic string generated from the input data, append the Pipeline script file extension `.p` and search for a script of the corresponding category.
2. Default scripts set in the TrueWatch console for all data of a given category.
3. Data-to-script mapping relationships defined in the TrueWatch console.
4. Scripts specified in the collector configuration file (see the sketch after this list).
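For the last strategy, a collector configuration file can name the script to use directly. A minimal sketch, assuming the logging collector's `pipeline` option (paths and values are illustrative):

[[inputs.logging]]
  logfiles = ["/var/log/nginx/access.log"]  # illustrative log file path
  source = "nginx"                          # used as the point name of the collected logs
  pipeline = "nginx.p"                      # script looked up in the logging script index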
All data and script matching strategies depend on the data characteristic string of the data. The generation strategy for the data characteristic string differs for different data categories:
- Generate the data characteristic string from specific point tags/fields:
    - Tracing and Profiling data in APM: generate the data characteristic string from the value of `service` in tags/fields. For example, if DataKit collects a piece of data whose `service` value is `service-a`, the string `service-a` is generated, corresponding to the script name `service-a.p`, which is then looked up in the Tracing/Profiling script index.
    - Security data from Scheck: generate the data characteristic string from the value of `category` in tags/fields. For example, if DataKit receives a Security data point whose `category` value is `system`, the string `system` is generated, corresponding to the script name `system.p`.
- Generate the data characteristic string from specific point tags/fields together with the point name:
    - RUM data: generate the data characteristic string from the value of `app_id` in tags/fields and the point name. For example, if the point name is `action`, `<app_id>_action` is generated, corresponding to the script name `<app_id>_action.p`.
- Generate the data characteristic string from the point name:
    - Logging/Metric/Network/Object/... and all other categories: generate the data characteristic string from the point name. For example, for the time-series measurement set `cpu`, `cpu` is generated, corresponding to the script `cpu.p`; for host objects in Object data, whose class is `HOST`, `HOST` is generated, corresponding to the script `HOST.p`.
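Putting these examples together, hypothetical local scripts matching them could be laid out under the pipeline directory like this (file names follow the examples above and are illustrative):

pipeline/
├── tracing/
│   └── service-a.p        <--- matches APM data whose service is service-a
├── security/
│   └── system.p           <--- matches Scheck data whose category is system
├── rum/
│   └── <app_id>_action.p  <--- matches RUM action data for a given app_id
└── metric/
    └── cpu.p              <--- matches the cpu measurement set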
Pipeline Processing Examples¶
Example scripts are for reference only; actual usage should be tailored to specific needs.
Processing Time Series Data¶
The following example demonstrates how to modify tags and fields using the Pipeline. Through DQL, we know that the fields of a CPU measurement set are as follows:
dql > M::cpu{host='u'} LIMIT 1
-----------------[ r1.cpu.s1 ]-----------------
core_temperature 76
cpu 'cpu-total'
host 'u'
time 2022-04-25 12:32:55 +0800 CST
usage_guest 0
usage_guest_nice 0
usage_idle 81.399796
usage_iowait 0.624681
usage_irq 0
usage_nice 1.695563
usage_softirq 0.191229
usage_steal 0
usage_system 5.239674
usage_total 18.600204
usage_user 10.849057
---------
Write the following Pipeline script:
# file pipeline/metric/cpu.p
set_tag(script, "metric::cpu.p")  # add a new tag `script` with a fixed value
set_tag(host2, host)              # add a new tag `host2`, copying the value of the `host` tag
usage_guest = 100.1               # overwrite the value of the `usage_guest` field
After restarting DataKit, new data is collected, and through DQL, we get the following modified CPU measurement set:
dql > M::cpu{host='u'}[20s] LIMIT 1
-----------------[ r1.cpu.s1 ]-----------------
core_temperature 54.250000
cpu 'cpu-total'
host 'u'
host2 'u' <--- Newly added tag
script 'metric::cpu.p' <--- Newly added tag
time 2022-05-31 12:49:15 +0800 CST
usage_guest 100.100000 <--- Rewritten field value
usage_guest_nice 0
usage_idle 94.251269
usage_iowait 0.012690
usage_irq 0
usage_nice 0
usage_softirq 0.012690
usage_steal 0
usage_system 2.106599
usage_total 5.748731
usage_user 3.616751
---------
Processing Object Data¶
The following Pipeline example demonstrates how to discard (filter) data. Taking the Nginx process as an example, the current list of Nginx processes on the host is as follows:
$ ps axuwf | grep nginx
root 1278 0.0 0.0 55288 1496 ? Ss 10:10 0:00 nginx: master process /usr/sbin/nginx -g daemon on; master_process on;
www-data 1279 0.0 0.0 55856 5212 ? S 10:10 0:00 \_ nginx: worker process
www-data 1280 0.0 0.0 55856 5212 ? S 10:10 0:00 \_ nginx: worker process
www-data 1281 0.0 0.0 55856 5212 ? S 10:10 0:00 \_ nginx: worker process
www-data 1282 0.0 0.0 55856 5212 ? S 10:10 0:00 \_ nginx: worker process
www-data 1283 0.0 0.0 55856 5212 ? S 10:10 0:00 \_ nginx: worker process
www-data 1284 0.0 0.0 55856 5212 ? S 10:10 0:00 \_ nginx: worker process
www-data 1286 0.0 0.0 55856 5212 ? S 10:10 0:00 \_ nginx: worker process
www-data 1287 0.0 0.0 55856 5212 ? S 10:10 0:00 \_ nginx: worker process
Through DQL, we know that the fields of a specific process measurement set are as follows:
dql > O::host_processes:(host, class, process_name, cmdline, pid) {host='u', pid=1278}
-----------------[ r1.host_processes.s1 ]-----------------
class 'host_processes'
cmdline 'nginx: master process /usr/sbin/nginx -g daemon on; master_process on;'
host 'u'
pid 1278
process_name 'nginx'
time 2022-05-31 14:19:15 +0800 CST
---------
Write the following Pipeline script:
if process_name == "nginx" {
    drop() # drop() marks this data as pending discard; the Pipeline keeps running afterwards
    exit() # exit() can be used to terminate the Pipeline execution immediately
}
After restarting DataKit, the corresponding NGINX process object will no longer be collected (object data stored in the center has an expiration policy, so wait 5~10 minutes for the original NGINX object to expire automatically).