
Pipelines


Pipeline is a lightweight scripting language that runs on DataKit and is used for custom parsing and modification of collected data. By defining parsing rules, you can precisely slice and convert various types of data into structured formats to meet specific data management needs. For example, you can use a Pipeline to extract timestamps, statuses, and other key fields from logs and use them as tags.

DataKit leverages the capabilities of Pipelines to let users write and debug Pipeline scripts directly on the workspace page, achieving more fine-grained structured processing of data. This not only makes data easier to manage, but the rich function library that Pipeline provides also supports standardized operations on common data, such as parsing time strings and enriching IP addresses with geographic information.
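
As a minimal sketch, a script of this kind could look like the following (the log format and field names are assumptions; grok(), default_time(), and geoip() are functions from the Pipeline function library):

# hypothetical log sample: 2023-01-01T12:00:00 INFO 1.2.3.4 request handled
grok(_, "%{TIMESTAMP_ISO8601:time} %{LOGLEVEL:status} %{IP:client_ip} %{GREEDYDATA:msg}")
default_time(time)    # parse the extracted time string and use it as the data timestamp
geoip(client_ip)      # supplement geographic information for the IP address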

The main features of Pipeline include:

  • As a lightweight scripting language, Pipeline provides efficient data processing capabilities;
  • It has a rich function library that supports standardized operations on various common data types;
  • Users can write and debug Pipeline scripts directly on the workspace page, making script creation and batch activation more convenient.

Currently, TrueWatch supports configuring local Pipelines and central Pipelines.

  • Local Pipeline: Runs during data collection, requiring DataKit collector version 1.5.0 or higher;
  • Central Pipeline: Runs after data is uploaded to the console center;

Use Cases

  • Local Pipeline: Processes logs before data forwarding.
  • Central Pipeline:
    1. User access (Session) data, Profiling data, and Synthetic Tests data;
    2. User access data inside traces, such as extracting the session, view, and resource fields from the trace message.

For data types other than those listed above, both local and central Pipelines can be used for processing.

Prerequisites

To ensure normal use of Pipeline, please upgrade DataKit to version 1.5.0 or higher. Versions lower than this may cause some Pipeline functions to fail.

For DataKit versions earlier than 1.5.0:

  • The default Pipeline feature is not supported;

  • Multiple data sources cannot be selected; each Pipeline can select only one source. Therefore, if your version is lower than 1.5.0 and multiple data sources are selected, the Pipeline will not take effect;

  • Pipeline names are fixed and cannot be modified. For example, if the log source is nginx, the Pipeline name is fixed as nginx.p. Therefore, if your version is lower than 1.5.0 and the Pipeline name does not match the data source name, the Pipeline will not take effect.


This feature requires a paid plan.


Create

In the workspace Manage > Pipelines, click Create Pipeline.

Alternatively, you can create one by clicking Pipelines in the menus of Metrics, Logs, RUM, APM, Infrastructure, and Security Check.

Note

After the Pipeline file is created, DataKit needs to be installed for it to take effect. DataKit periodically fetches the configured Pipeline files from the workspace, with a default interval of 1 minute, which can be modified in conf.d/datakit.conf.

[pipeline]
  remote_pull_interval = "1m"

To create a Pipeline:

  1. Select the Pipeline type;
  2. Select the data type and add filter conditions;
  3. Enter the Pipeline name, i.e., the custom Pipeline file name;
  4. Provide a test sample;
  5. Enter the function script and configure the parsing rules;
  6. Save.
Note
  • If the filter object is set to logs, the system automatically filters out testing data; even if this Pipeline is set as the default, it will not be applied to testing data.
  • If the filter object is selected as "Synthetic Tests", the type will be automatically set as "Central Pipeline", and local Pipeline cannot be selected.
  • Pipeline file names must be unique.
  • Each data type supports only one default Pipeline. If a duplicate is created or imported, the system displays a confirmation box asking whether to replace the existing one. Pipelines that have been set as the default display a default identifier after their name.

Test Sample

Based on the selected data type, input the corresponding data and test it against the configured parsing rules.

  1. One-click sample retrieval: Automatically retrieves already collected data, including Message and all fields;
  2. Add: You can add multiple data samples (up to 3).
Note

Pipeline files created in the workspace are saved in the <datakit installation directory>/pipeline_remote directory, where:

  • Files in the first-level directory are log Pipelines by default.
  • Pipeline files of each type are saved in the corresponding second-level directory. For example, the Metrics Pipeline file cpu.p is saved in the path <datakit installation directory>/pipeline_remote/metric/cpu.p.
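
For example, the layout under pipeline_remote could look roughly like this (nginx.p and cpu.p are just the examples mentioned on this page):

<datakit installation directory>/pipeline_remote/
├── nginx.p          # first-level directory: log Pipelines by default
└── metric/
    └── cpu.p        # Metrics Pipeline for the cpu measurement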

For more details, refer to Pipeline Category Data Processing.

One-click Sample Retrieval

When creating or editing a Pipeline, click Sample Parsing Test > One-click Sample Retrieval. You can choose to retrieve data randomly from the specified log source or to specify a particular data source.

The system will automatically select the latest piece of data from the data collected and reported to the workspace based on the filtered data range, and fill it into the test sample box for testing. Each time "One-click Sample Retrieval" is clicked, the system only queries data from the last 6 hours. If no data has been reported in the last 6 hours, the sample cannot be automatically retrieved.

Debugging Example:

The following is a Metrics data sample obtained via one-click retrieval: the Measurement is cpu, the tags are cpu and host, the fields from usage_guest through usage_user are the metric values, and the trailing 1667732804738974000 is the timestamp. The returned result makes the structure of the retrieved sample clear.
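
In line protocol form, such a sample looks roughly like the following sketch (the host name, tag values, and field values below are illustrative, not retrieved data):

cpu,cpu=cpu-total,host=my-host usage_guest=0,usage_idle=95.2,usage_system=2.1,usage_user=2.7 1667732804738974000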

Manual Sample Input

You can also manually input sample data for testing, supporting two format types:

  • For log data, you can directly enter the message content in the sample parsing test;
  • For other data types, the content needs to be converted into line protocol format first and then entered for the sample parsing test.

For more details on log Pipelines, refer to Log Pipeline Manual.

Line Protocol Example
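
A sketch of the kind of sample the notes below describe (tag and field values are placeholders):

cpu,tag1=a,tag2=b f1=1i,f2=1.2,f3="abc" 162072387000000000
redis,tag1=a,tag2=b f1=1i,f2=1.2,f3="abc" 162072387000000000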

  • cpu, redis are Measurements; tag1, tag2 are tag sets; f1, f2, f3 are field sets (where f1=1i represents int, f2=1.2 defaults to float, f3="abc" represents string); 162072387000000000 is the timestamp;
  • Measurements and tag sets are separated by commas; multiple tags are separated by commas;
  • Tag sets and field sets are separated by spaces; multiple fields are separated by commas;
  • Field sets and timestamps are separated by spaces; timestamps are mandatory;
  • For object data, there must be a name tag, otherwise the protocol will report an error; it is best to have a message field, mainly for full-text search.

For more details on line protocol, refer to DataKit API.

You can also obtain line protocol data by setting output_file under [io] in conf.d/datakit.conf and then viewing the line protocol written to that file.

[io]
  output_file = "/path/to/file"

Define Parsing Rules

Write parsing rules manually or generate them with AI for different data sources. Multiple script functions are supported, and you can view their syntax directly in the script function list the system provides on the right, for example add_pattern().
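
For instance, a rule using add_pattern() could look like the following sketch (the sample log format and field names are assumptions; add_pattern(), grok(), and cast() are all in the function list):

# hypothetical sample: 192.168.0.1 GET /index.html 200
add_pattern("http_status", "\\d{3}")    # define a custom pattern
grok(_, "%{IP:client_ip} %{WORD:method} %{NOTSPACE:path} %{http_status:code}")
cast(code, "int")    # convert the extracted status code to an integer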

For how to define parsing rules, refer to Pipeline Manual.

Manual Writing

Write the data parsing rules yourself; you can enable text auto-wrapping or let content overflow.

AI Generation

AI generation uses a model to produce Pipeline parsing rules, aiming to quickly provide a preliminary parsing solution.

Note

Since the rules generated by the model may not cover all complex situations or scenarios, the returned results may not be completely accurate. It is recommended to use them as references and starting points, and further adjust and optimize them based on specific log formats and needs after generation.

Based on the sample input, specify the content to extract and the field names, for example:

-"date_pl":"2024-12-25 07:25:33.525",
-"m_pl":"[INFO][66] route_table.go 237: Queueing a resync of routing table. ipVersion=0x4"

Click to generate Pipeline:
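
The generated script for this sample might look roughly like the sketch below (actual output varies; the grok pattern is an assumption based on the sample log):

grok(_, "%{TIMESTAMP_ISO8601:date_pl} %{GREEDYDATA:m_pl}")
default_time(date_pl)    # use the extracted time string as the data timestamp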

After testing, the returned results are:
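
For the sample above, the extracted fields would be roughly of this shape (the exact presentation in the console may differ):

date_pl: 2024-12-25 07:25:33.525
m_pl: [INFO][66] route_table.go 237: Queueing a resync of routing table. ipVersion=0x4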

For more details, refer to Rule Writing Guide.

Start Testing

On the Pipeline editing page, you can test the parsing rules you have filled in by entering data under Sample Parsing Test. If the parsing rules do not match, an error message is returned. The sample parsing test is optional, and the test data is saved automatically after testing.

Terminal Command Line Debugging

In addition to debugging Pipelines on the console, you can also debug them from the terminal command line.

For more details, refer to How to Write Pipeline Scripts.

Further Reading