
Data Processing for Various Categories in Pipeline

Experimental


Since DataKit 1.4.0, the built-in Pipeline feature can be used to manipulate data collected by DataKit directly, and all currently supported data categories can be processed.

Attention
  • The Pipeline applies to all data categories and is currently experimental; there is no guarantee that its mechanism or behavior will not undergo incompatible changes later.
  • Even data reported through the DataKit API supports Pipeline processing.
  • Applying the Pipeline to data that is already being collected (especially non-logging data) is very likely to break the existing data structure and cause the data to display abnormally in TrueWatch.
  • Before applying the Pipeline, please confirm that the data processing meets expectations using the Pipeline Debugging Tool.

The Pipeline can perform the following operations on data collected by DataKit (see the sketch after this list):

  • Add, delete, modify values or data types of fields and tags
  • Convert fields into tags
  • Modify the name of a measurement set
  • Mark the current data for discard (drop())
  • Terminate the execution of the Pipeline script (exit())
  • ...
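
For example, here is a minimal sketch combining several of the operations above. The key names (level, service) and the new measurement name are illustrative assumptions; set_measurement(), drop(), and exit() are the functions named above, while add_key() and set_tag() are built-in functions that also appear in the examples later on this page:

# Hypothetical sketch; key names and values are illustrative only.
add_key(level, "info")          # add (or overwrite) a field named level
set_tag(service, "web")         # add a tag; this is also how an existing field is converted into a tag
set_measurement("my_source")    # rename the measurement set / source

if level == "debug" {
    drop()                      # mark the point for discard
    exit()                      # stop executing the rest of the script
}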

Input Data Structure

All categories of data are encapsulated into a Point structure before being processed by the Pipeline script. Its structure can be considered as:

struct Point {
   Name:      str          # Equivalent to the name of the Measurement Set for Metrics (time series) data, the source for Logging (log) data,
                              # the source for Network data, the class for Object/CustomObject (object) data ...
   Tags:      map[str]str  # Stores all tags of the data; for non-time-series category data, the boundary between tags and fields is relatively blurred.
   Fields:    map[str]any  # Stores all fields of the data (called metrics for time-series category data)
   Time:      int64        # The timestamp of the data, usually interpreted as the time when the data was generated, in nanoseconds.
   DropFlag:  bool         # Marks whether this data should be discarded.
}

For example, an nginx log data point, after being collected by the log collector, would look roughly like this as input to the Pipeline script:

Point {
    Name: "nginx"
    Tags: map[str]str {
        "host": "your_hostname"
    },
    Fields: map[str]any {
        "message": "127.0.0.1 - - [12/Jan/2023:11:51:38 +0800] \"GET / HTTP/1.1\" 200 612 \"-\" \"curl/7.81.0\""
    },
    Time: 1673495498000123456,
    DropFlag: false,
}

Note:

  • The name can be modified using the function set_measurement().

  • In the point's tags/fields maps, a key never appears in both tags and fields at the same time;

  • Within the Pipeline you can read the value of a key in the point's tags/fields using a custom identifier or the get_key() function, but modifying the value of a key in Tags or Fields requires other built-in functions, such as add_key(); _ can be regarded as an alias for the key message (see the sketch after this list).

  • After the script runs, if a key named time exists in the point's tags/fields, it is deleted; if its value is of type int64, that value is assigned to the point's time before the key is removed. If time is a string, you can try the default_time() function to convert it to int64.

  • You can use the drop() function to mark the input Point to be discarded; after the script finishes, this data will not be uploaded.
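
As a minimal sketch of the notes above, take the nginx log Point shown earlier as input (the time value written below is an illustrative string, not the actual nginx timestamp format, and whether it can be parsed depends on default_time()):

# Hypothetical sketch; only functions mentioned in this section are used.
add_key(msg_copy, message)              # read the value of the key message (alias: _) and write it to a new field via add_key()

add_key(time, "2023-01-12 11:51:38")    # a string field named time is left after the script runs ...
default_time(time)                      # ... so default_time() can try to convert it into an int64 nanosecond timestamp

if msg_copy == "" {
    drop()                              # mark the Point for discard; it will not be uploaded
    exit()
}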

Storage, Indexing, and Matching of Pipeline Scripts

Script Storage and Indexing

Currently, Pipeline scripts are divided into four namespaces based on their origin, with decreasing indexing priority as shown in the table below:

Namespace | Directory | Supported Data Categories | Description
remote | [DataKit Installation Directory]/pipeline_remote | CO, E, L, M, N, O, P, R, S, T | Scripts managed via the TrueWatch console
confd | [DataKit Installation Directory]/pipeline_confd | CO, E, L, M, N, O, P, R, S, T | Scripts managed via Confd
gitrepo | [DataKit Installation Directory]/pipeline_gitrepos/[repo-name] | CO, E, L, M, N, O, P, R, S, T | Scripts managed via Git
default | [DataKit Installation Directory]/pipeline | CO, E, L, M, N, O, P, R, S, T | Scripts generated by DataKit or written by users

Note:

  • Do not modify the automatically generated default scripts under the pipeline directory; any modifications will be overwritten upon DataKit startup;
  • It is recommended to add local scripts corresponding to each data category under the pipeline/[category]/ directory;
  • Except for the pipeline directory, do not make any modifications to other script directories (remote, confd, gitrepo).

When DataKit selects the Pipeline to apply, the index priority of these four namespaces decreases in the order listed above. For example, when the cpu measurement set needs metric/cpu.p, DataKit searches in the following order:

  1. pipeline_remote/metric/cpu.p
  2. pipeline_confd/metric/cpu.p
  3. pipeline_gitrepos/<repo-name>/metric/cpu.p
  4. pipeline/metric/cpu.p

Note: Here <repo-name> depends on the name of your git repository.

Indexes are created separately for the scripts of each data category, and the use() function cannot be used to reference scripts across namespaces. The implementation of Pipeline script storage and indexing is shown in the figure below; during index creation, scripts from higher-priority namespaces override those from lower-priority ones:

(Figure: Pipeline script storage and index construction)

Pipeline directories from all four sources store their scripts in the following layout:

├── pattern   <-- A dedicated directory for storing custom patterns
├── apache.p
├── consul.p
├── sqlserver.p        <--- Pipelines in the top-level directory default to the logging category, for compatibility with historical settings
├── tomcat.p
├── other.p
├── custom_object      <--- Dedicated directory for storing pipelines for custom objects
│   └── some-object.p
├── keyevent           <--- Dedicated directory for storing event-specific pipelines
│   └── some-event.p
├── logging            <--- Dedicated directory for storing log-specific pipelines
│   └── nginx.p
├── metric             <--- Dedicated directory for storing time-series metric-specific pipelines
│   └── cpu.p
├── network            <--- Dedicated directory for storing network metric-specific pipelines
│   └── ebpf.p
├── object             <--- Dedicated directory for storing object-specific pipelines
│   └── HOST.p
├── rum                <--- Dedicated directory for storing RUM-specific pipelines
│   └── error.p
├── security           <--- Dedicated directory for storing scheck-specific pipelines
│   └── scheck.p
└── tracing            <--- Dedicated directory for storing APM-specific pipelines
    └── service_a.p

Data and Script Matching Strategy

There are four strategies for matching data with script names; they are evaluated from the 4th (highest priority) down to the 1st, and once a higher-priority strategy matches, the lower-priority ones are not evaluated:

  1. Generate the data characteristic string from the input data, append the Pipeline script file extension .p, and look up the script under the corresponding category index.
  2. Default scripts set for all data of a given category in the TrueWatch console.
  3. Mapping relationships between data and scripts defined in the TrueWatch console.
  4. Scripts specified in the collector configuration file.

All of these matching strategies depend on the data characteristic string, which is generated differently for different data categories:

  1. Generated from specific tags/fields of the point:
    • Tracing and Profiling data in APM: the data characteristic string is the value of service in tags/fields. For example, if DataKit collects a piece of data whose service value is service-a, the characteristic string is service-a, corresponding to the script name service-a.p, which is then looked up under the Tracing/Profiling script index.
    • Security data from Scheck: the data characteristic string is the value of category in tags/fields. For example, if DataKit receives a Security data point whose category value is system, the characteristic string is system, corresponding to the script name system.p.
  2. Generated from specific tags/fields of the point plus the point name:
    • RUM data: the data characteristic string is the value of app_id in tags/fields combined with the point name. For example, if the point name is action, the characteristic string is <app_id>_action, corresponding to the script name <app_id>_action.p.
  3. Generated from the point name:
    • Logging/Metric/Network/Object/... and all other categories: the data characteristic string is the point name. For example, the time-series measurement set cpu generates cpu, corresponding to the script cpu.p; a host object whose class is HOST generates HOST, corresponding to the script HOST.p.
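
Putting these rules together, and assuming a hypothetical RUM application with app_id appid_1234, the scripts looked up in the default namespace would include, for example:

pipeline/tracing/service-a.p        <--- APM data with service = service-a
pipeline/security/system.p          <--- Scheck data with category = system
pipeline/rum/appid_1234_action.p    <--- RUM data with app_id = appid_1234 and point name action
pipeline/metric/cpu.p               <--- the time-series measurement set cpu
pipeline/object/HOST.p              <--- a host object with class = HOST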

Pipeline Processing Examples

Example scripts are for reference only; actual usage should be tailored to specific needs.

Processing Time Series Data

The following example demonstrates how to modify tags and fields using the Pipeline. Through DQL, we know that the fields of a CPU measurement set are as follows:

dql > M::cpu{host='u'} LIMIT 1
-----------------[ r1.cpu.s1 ]-----------------
core_temperature 76
             cpu 'cpu-total'
            host 'u'
            time 2022-04-25 12:32:55 +0800 CST
     usage_guest 0
usage_guest_nice 0
      usage_idle 81.399796
    usage_iowait 0.624681
       usage_irq 0
      usage_nice 1.695563
   usage_softirq 0.191229
     usage_steal 0
    usage_system 5.239674
     usage_total 18.600204
      usage_user 10.849057
---------

Write the following Pipeline script:

# file pipeline/metric/cpu.p

set_tag(script, "metric::cpu.p")    # add a new tag named script
set_tag(host2, host)                # add a new tag host2 whose value is copied from the existing host tag
usage_guest = 100.1                 # overwrite the value of the usage_guest field

After restarting DataKit, new data is collected, and through DQL, we get the following modified CPU measurement set:

dql > M::cpu{host='u'}[20s] LIMIT 1
-----------------[ r1.cpu.s1 ]-----------------
core_temperature 54.250000
             cpu 'cpu-total'
            host 'u'
           host2 'u'                        <--- Newly added tag
          script 'metric::cpu.p'            <--- Newly added tag
            time 2022-05-31 12:49:15 +0800 CST
     usage_guest 100.100000                 <--- Rewritten field value
usage_guest_nice 0
      usage_idle 94.251269
    usage_iowait 0.012690
       usage_irq 0
      usage_nice 0
   usage_softirq 0.012690
     usage_steal 0
    usage_system 2.106599
     usage_total 5.748731
      usage_user 3.616751
---------

Processing Object Data

The following Pipeline example demonstrates how to discard (filter) data. Taking the Nginx process as an example, the current list of Nginx processes on the host is as follows:

$ ps axuwf | grep  nginx
root        1278  0.0  0.0  55288  1496 ?        Ss   10:10   0:00 nginx: master process /usr/sbin/nginx -g daemon on; master_process on;
www-data    1279  0.0  0.0  55856  5212 ?        S    10:10   0:00  \_ nginx: worker process
www-data    1280  0.0  0.0  55856  5212 ?        S    10:10   0:00  \_ nginx: worker process
www-data    1281  0.0  0.0  55856  5212 ?        S    10:10   0:00  \_ nginx: worker process
www-data    1282  0.0  0.0  55856  5212 ?        S    10:10   0:00  \_ nginx: worker process
www-data    1283  0.0  0.0  55856  5212 ?        S    10:10   0:00  \_ nginx: worker process
www-data    1284  0.0  0.0  55856  5212 ?        S    10:10   0:00  \_ nginx: worker process
www-data    1286  0.0  0.0  55856  5212 ?        S    10:10   0:00  \_ nginx: worker process
www-data    1287  0.0  0.0  55856  5212 ?        S    10:10   0:00  \_ nginx: worker process

Through DQL, we know that the fields of a specific process measurement set are as follows:

dql > O::host_processes:(host, class, process_name, cmdline, pid) {host='u', pid=1278}
-----------------[ r1.host_processes.s1 ]-----------------
       class 'host_processes'
     cmdline 'nginx: master process /usr/sbin/nginx -g daemon on; master_process on;'
        host 'u'
         pid 1278
process_name 'nginx'
        time 2022-05-31 14:19:15 +0800 CST
---------

Write the following Pipeline script:

if process_name == "nginx" {
    drop()  # drop() marks this data for discard; the script continues to run
    exit()  # exit() terminates the Pipeline script immediately
}

After restarting DataKit, the corresponding Nginx process objects will no longer be reported (object data on the center has an expiration policy, so wait 5~10 minutes for the original Nginx objects to expire automatically).