Develop Custom Collectors with Python
PythonD lets DataKit run user-defined Python collection scripts periodically.
Configuration
Navigate to the conf.d/pythond directory under the DataKit installation directory, copy pythond.conf.sample and rename it to pythond.conf. An example is as follows:
```toml
[[inputs.pythond]]
  # Python input name
  name = 'some-python-inputs'  # required

  # System environments to run Python
  #envs = ['LD_LIBRARY_PATH=/path/to/lib:$LD_LIBRARY_PATH',]

  # Python path (an absolute Python path is recommended)
  cmd = "python3"  # required. python3 is recommended.

  # Python scripts relative path
  dirs = []
```
Python Environment
This feature is currently in alpha and supports Python 3 only. Tested versions:
- 3.10.1
The following dependencies need to be installed:
- requests
Install it with pip, for example `python3 -m pip install requests`.
This requires pip. If pip is not available, it can be bootstrapped with `python3 -m ensurepip --upgrade` (see the pip installation documentation).
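PythonD starts scripts with the interpreter set in `cmd`, so it is worth confirming that this same interpreter can import `requests`. A minimal check using only the standard library (the helper name `has_module` is hypothetical):

```python
import importlib.util


def has_module(name: str) -> bool:
    """Return True if `name` can be imported by the running interpreter."""
    return importlib.util.find_spec(name) is not None


if __name__ == "__main__":
    # Run this with the same interpreter configured in `cmd` (for example, python3).
    for dep in ["requests"]:
        status = "ok" if has_module(dep) else "MISSING"
        print(f"{dep}: {status}")
```

Running it as `python3 check_deps.py` (any filename works) prints one status line per dependency.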
Writing User-Defined Scripts
Create a directory named after the "Python package name" under the datakit/python.d directory, then create Python scripts (*.py) within this directory.
For example, if the package name is Demo, the script goes in datakit/python.d/Demo/, for instance datakit/python.d/Demo/demo.py. Here demo.py is the Python script; its filename can be anything.
The collector class in the script must inherit from the `DataKitFramework` class and override its `run` method.
`DataKitFramework` is defined in `datakit_framework.py`, located at `datakit/python.d/core/datakit_framework.py`.
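Before restarting DataKit, a quick way to check the layout is to list the `.py` files sitting directly in the package directory. The sketch below is a hypothetical helper that only inspects the filesystem; it does not reproduce how DataKit itself discovers scripts.

```python
import tempfile
from pathlib import Path


def list_package_scripts(python_d: Path, package: str) -> list[str]:
    """Return the names of *.py files directly inside python_d/package (not recursive)."""
    package_dir = python_d / package
    if not package_dir.is_dir():
        raise FileNotFoundError(f"package directory not found: {package_dir}")
    return sorted(p.name for p in package_dir.glob("*.py") if p.is_file())


if __name__ == "__main__":
    # Demonstrate on a throwaway layout; point python_d at your real python.d instead.
    with tempfile.TemporaryDirectory() as tmp:
        demo_dir = Path(tmp) / "Demo"
        demo_dir.mkdir()
        (demo_dir / "demo.py").write_text("# collector script\n")
        print(list_package_scripts(Path(tmp), "Demo"))
```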
Python Script Source Code Example
```python
# encoding: utf-8
from datakit_framework import DataKitFramework


class Demo(DataKitFramework):
    name = 'Demo'
    interval = 10  # trigger interval, in seconds

    # If DataKit listens on 127.0.0.1:9529 (the default), you don't need this;
    # leave it commented out.
    # def __init__(self, **kwargs):
    #     super().__init__(ip='127.0.0.1', port=9529)

    # General report example.
    def run(self):
        print("Demo")
        data = [
            {
                "measurement": "abc",
                "tags": {
                    "t1": "b",
                    "t2": "d"
                },
                "fields": {
                    "f1": 123,
                    "f2": 3.4,
                    "f3": "strval"
                },
                # "time": 1624550216  # you don't need this
            },
            {
                "measurement": "def",
                "tags": {
                    "t1": "b",
                    "t2": "d"
                },
                "fields": {
                    "f1": 123,
                    "f2": 3.4,
                    "f3": "strval"
                },
                # "time": 1624550216  # you don't need this
            }
        ]
        in_data = {
            'M': data,  # 'M' for metrics, 'L' for logging, 'R' for RUM, 'O' for object, 'CO' for custom object, 'E' for event.
            'input': "datakitpy"
        }
        return self.report(in_data)  # you must call self.report here

    # # KeyEvent report example.
    # def run(self):
    #     print("Demo")
    #     tags = {"tag1": "val1", "tag2": "val2"}
    #     date_range = 10
    #     status = 'info'
    #     event_id = 'event_id'
    #     title = 'title'
    #     message = 'message'
    #     kwargs = {"custom_key1": "custom_value1", "custom_key2": "custom_value2", "custom_key3": "custom_value3"}
    #     # Keep only one of the following three returns.
    #     # Feed df_source=user event.
    #     user_id = "user_id"
    #     return self.feed_user_event(
    #         user_id,
    #         tags, date_range, status, event_id, title, message, **kwargs
    #     )
    #     # Feed df_source=monitor event.
    #     dimension_tags = '{"host":"web01"}'  # dimension_tags must be a JSON-formatted string.
    #     return self.feed_monitor_event(
    #         dimension_tags,
    #         tags, date_range, status, event_id, title, message, **kwargs
    #     )
    #     # Feed df_source=system event.
    #     return self.feed_system_event(
    #         tags, date_range, status, event_id, title, message, **kwargs
    #     )

    # # Metrics, logging, and object example.
    # def run(self):
    #     print("Demo")
    #     measurement = "mydata"
    #     tags = {"tag1": "val1", "tag2": "val2"}
    #     fields = {"custom_field1": "val1", "custom_field2": 1000}
    #     kwargs = {"custom_key1": "custom_value1", "custom_key2": "custom_value2", "custom_key3": "custom_value3"}
    #     # Keep only one of the following three returns.
    #     # Feed metrics example.
    #     return self.feed_metric(
    #         measurement=measurement,
    #         tags=tags,
    #         fields=fields,
    #         **kwargs
    #     )
    #     # Feed logging example.
    #     message = "This is the message for testing"
    #     return self.feed_logging(
    #         source=measurement,
    #         tags=tags,
    #         message=message,
    #         **kwargs
    #     )
    #     # Feed object example.
    #     name = "name"
    #     return self.feed_object(
    #         cls=measurement,
    #         name=name,
    #         tags=tags,
    #         fields=fields,
    #         **kwargs
    #     )
```
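The `report()` call in the general example takes a dict keyed by a data-category code plus an `'input'` name. The hypothetical helper below builds that dict and rejects unknown codes early. The category codes come from the comment in the example; the helper itself and its per-point checks (a `measurement` name and at least one field) are assumptions, not part of `datakit_framework`.

```python
# Category codes listed in the example's comment:
# M = metrics, L = logging, R = RUM, O = object, CO = custom object, E = event.
CATEGORIES = {"M", "L", "R", "O", "CO", "E"}


def build_report_payload(category, points, input_name="datakitpy"):
    """Build the dict passed to self.report(); hypothetical helper, not part of the SDK."""
    if category not in CATEGORIES:
        raise ValueError(f"unknown category {category!r}; expected one of {sorted(CATEGORIES)}")
    for i, point in enumerate(points):
        # Assumed rules: every point names a measurement and carries at least one field.
        if "measurement" not in point:
            raise ValueError(f"point {i} has no 'measurement'")
        if not point.get("fields"):
            raise ValueError(f"point {i} has no 'fields'")
    return {category: list(points), "input": input_name}
```

Inside `run()`, the last line of the general example could then read `return self.report(build_report_payload("M", data))`.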
Python SDK API definition (for more details, see datakit_framework.py):
- Reporting metrics data: `feed_metric(self, input=None, measurement=None, tags=None, fields=None, time=None, **kwargs)`
- Reporting logging data: `feed_logging(self, input=None, source=None, tags=None, message=None, time=None, **kwargs)`
- Reporting object data: `feed_object(self, input=None, cls=None, name=None, tags=None, fields=None, time=None, **kwargs)` (`cls` stands for `class`; since `class` is a Python keyword, it is abbreviated as `cls`)
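Because these methods take keyword arguments, a collector's `run()` can be exercised without a running DataKit by swapping in a stand-in base class. The sketch below is a hypothetical test double that only mirrors the three signatures listed above and records each call; it sends nothing and is not part of `datakit_framework`.

```python
class RecordingFramework:
    """Hypothetical stand-in for DataKitFramework that records feed_* calls."""

    def __init__(self):
        self.calls = []

    def feed_metric(self, input=None, measurement=None, tags=None, fields=None, time=None, **kwargs):
        self.calls.append(("metric", measurement, tags, fields, kwargs))

    def feed_logging(self, input=None, source=None, tags=None, message=None, time=None, **kwargs):
        self.calls.append(("logging", source, tags, message, kwargs))

    def feed_object(self, input=None, cls=None, name=None, tags=None, fields=None, time=None, **kwargs):
        self.calls.append(("object", cls, name, tags, fields, kwargs))


class MyCollector(RecordingFramework):
    # In a real script this class would inherit from DataKitFramework instead.
    def run(self):
        self.feed_metric(measurement="mydata", tags={"tag1": "val1"}, fields={"custom_field1": 1000})
        self.feed_object(cls="mydata", name="name", tags={"tag1": "val1"}, fields={"f": 1})


collector = MyCollector()
collector.run()
print(collector.calls[0][0])  # metric
```

Passing `cls=` by keyword, as above, is the only way to name that argument explicitly, because `class=` would be a syntax error.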
Reporting Pythond Events
You can use the following three built-in functions to report events:
- Reporting events where `df_source = user`: `feed_user_event(self, df_user_id=None, tags=None, df_date_range=10, df_status=None, df_event_id=None, df_title=None, df_message=None, **kwargs)`
- Reporting events where `df_source = monitor`: `feed_monitor_event(self, df_dimension_tags=None, tags=None, df_date_range=10, df_status=None, df_event_id=None, df_title=None, df_message=None, **kwargs)`
- Reporting events where `df_source = system`: `feed_system_event(self, tags=None, df_date_range=10, df_status=None, df_event_id=None, df_title=None, df_message=None, **kwargs)`
Common event field descriptions:
| Field Name | Type | Required | Description |
|---|---|---|---|
| df_date_range | Integer | Yes | Time range, in seconds |
| df_source | String | Yes | Data source. Possible values: system, monitor, user |
| df_status | Enum | Yes | Status. Possible values: ok, info, warning, error, critical, nodata |
| df_event_id | String | Yes | Event ID |
| df_title | String | Yes | Title |
| df_message | String | No | Detailed description |
| {other fields} | kwargs, e.g., k1=5, k2=6 | No | Other additional fields |
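The constraints in this table can be checked before calling any `feed_*_event` method. A minimal sketch follows; the helper name is hypothetical, and it checks only what the table states: an allowed `df_status`, an integer `df_date_range`, and non-empty required strings.

```python
ALLOWED_STATUS = {"ok", "info", "warning", "error", "critical", "nodata"}


def check_event_fields(df_date_range, df_status, df_event_id, df_title, df_message=None):
    """Return a list of problems with the common event fields; an empty list means OK."""
    problems = []
    # bool is a subclass of int in Python, so exclude it explicitly.
    if not isinstance(df_date_range, int) or isinstance(df_date_range, bool):
        problems.append("df_date_range must be an integer number of seconds")
    if df_status not in ALLOWED_STATUS:
        problems.append(f"df_status must be one of {sorted(ALLOWED_STATUS)}")
    for field_name, value in (("df_event_id", df_event_id), ("df_title", df_title)):
        if not isinstance(value, str) or not value:
            problems.append(f"{field_name} is required and must be a non-empty string")
    if df_message is not None and not isinstance(df_message, str):
        problems.append("df_message, if given, must be a string")
    return problems
```

In `run()`, a collector could call it first and raise `ValueError("; ".join(problems))` when the list is non-empty.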
- When `df_source = monitor`, the event was generated by TrueWatch monitoring features and carries the following additional field:
| Additional Field Name | Type | Required | Description |
|---|---|---|---|
| df_dimension_tags | String(JSON format) | Yes | Monitoring dimension tags, e.g., {"host":"web01"} |
- When `df_source = user`, the event was created directly by a user and carries the following additional field:
| Additional Field Name | Type | Required | Description |
|---|---|---|---|
| df_user_id | String | Yes | User ID |
- When `df_source = system`, the event was generated by the system and has no additional fields.
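Since `df_dimension_tags` must be a JSON string rather than a dict, building it with `json.dumps` is less error-prone than writing the string by hand. A minimal sketch (the helper name is hypothetical); compact separators produce the same form as the `{"host":"web01"}` example in the table:

```python
import json


def dimension_tags(**tags: str) -> str:
    """Serialize monitoring dimension tags to the JSON string expected by df_dimension_tags."""
    # sort_keys keeps the output stable; separators drop json.dumps' default spaces.
    return json.dumps(tags, sort_keys=True, separators=(",", ":"))


print(dimension_tags(host="web01"))  # {"host":"web01"}
```

The result can be passed directly, e.g. `self.feed_monitor_event(df_dimension_tags=dimension_tags(host="web01"), ...)`.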
Example usage:
```python
# encoding: utf-8
from datakit_framework import DataKitFramework


class Demo(DataKitFramework):
    name = 'Demo'
    interval = 10  # trigger interval, in seconds

    # If DataKit listens on 127.0.0.1:9529 (the default), you don't need this;
    # leave it commented out.
    # def __init__(self, **kwargs):
    #     super().__init__(ip='127.0.0.1', port=9529)

    # KeyEvent report example.
    def run(self):
        print("Demo")
        tags = {"tag1": "val1", "tag2": "val2"}
        date_range = 10
        status = 'info'
        event_id = 'event_id'
        title = 'title'
        message = 'message'
        kwargs = {"custom_key1": "custom_value1", "custom_key2": "custom_value2", "custom_key3": "custom_value3"}

        # run() returns at the first feed_*_event call, so keep exactly one
        # of the three calls below and leave the other two commented out.

        # Feed df_source=user event.
        user_id = "user_id"
        return self.feed_user_event(
            df_user_id=user_id,
            tags=tags, df_date_range=date_range, df_status=status, df_event_id=event_id, df_title=title, df_message=message, **kwargs
        )

        # # Feed df_source=monitor event.
        # dimension_tags = '{"host":"web01"}'  # dimension_tags must be a JSON-formatted string.
        # return self.feed_monitor_event(
        #     df_dimension_tags=dimension_tags,
        #     tags=tags, df_date_range=date_range, df_status=status, df_event_id=event_id, df_title=title, df_message=message, **kwargs
        # )

        # # Feed df_source=system event.
        # return self.feed_system_event(
        #     tags=tags, df_date_range=date_range, df_status=status, df_event_id=event_id, df_title=title, df_message=message, **kwargs
        # )
```
Git Support
Scripts can also be loaded from a Git repo. Once Git repo support is enabled, the paths set in `dirs` in the conf are relative to gitrepos. For instance, with the layout below, `dirs` should be set to `["mytest"]`:
```
├── datakit
└── gitrepos
    └── myconf
        ├── conf.d
        │   └── pythond.conf
        └── python.d
            └── mytest
                └── mytest.py
```
Complete Example
Step 1: Write a class that inherits from DataKitFramework:
```python
from datakit_framework import DataKitFramework


class MyTest(DataKitFramework):
    name = 'MyTest'
    interval = 10  # trigger interval, in seconds

    # If DataKit listens on 127.0.0.1:9529 (the default), you don't need this;
    # leave it commented out.
    # def __init__(self, **kwargs):
    #     super().__init__(ip='127.0.0.1', port=9529)

    def run(self):
        print("MyTest")
        data = [
            {
                "measurement": "abc",
                "tags": {
                    "t1": "b",
                    "t2": "d"
                },
                "fields": {
                    "f1": 123,
                    "f2": 3.4,
                    "f3": "strval"
                },
                # "time": 1624550216  # you don't need this
            },
            {
                "measurement": "def",
                "tags": {
                    "t1": "b",
                    "t2": "d"
                },
                "fields": {
                    "f1": 123,
                    "f2": 3.4,
                    "f3": "strval"
                },
                # "time": 1624550216  # you don't need this
            }
        ]
        in_data = {
            'M': data,
            'input': "datakitpy"
        }
        return self.report(in_data)  # you must call self.report here
```
Step 2: Git repo support is not used in this example. Save the script as test.py in the mytest folder under python.d, i.e. datakit/python.d/mytest/test.py.
Step 3: Configure pythond.conf:
```toml
[[inputs.pythond]]
  # Python collector name
  name = 'some-python-inputs'  # required

  # Environment variables needed to run the Python collector
  #envs = ['LD_LIBRARY_PATH=/path/to/lib:$LD_LIBRARY_PATH',]

  # Path to the executable for the Python collector (absolute path is recommended)
  cmd = "python3"  # required. python3 is recommended.

  # Relative path to user scripts (enter the folder name; all modules and .py files in the immediate subdirectory will be applied)
  dirs = ["mytest"]
```
Step 4: Restart DataKit:
FAQ
How to Troubleshoot Errors
If the results do not meet expectations, check the following log files:
- `~/_datakit_pythond_cli.log`
- `_datakit_pythond_framework_[pythond name]_.log`
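When these logs grow large, the most recent lines are usually enough. A minimal standard-library sketch (helper names hypothetical): it reads the CLI log at the home-directory path listed above, and `tail()` accepts any other path, such as the framework log, as an argument.

```python
from collections import deque
from pathlib import Path


def last_lines(lines, n=20):
    """Return the last n items of an iterable of lines, keeping only n in memory."""
    return [line.rstrip("\n") for line in deque(lines, maxlen=n)]


def tail(path, n=20):
    """Return the last n lines of a text file."""
    with open(path, encoding="utf-8", errors="replace") as f:
        return last_lines(f, n)


if __name__ == "__main__":
    cli_log = Path.home() / "_datakit_pythond_cli.log"
    if cli_log.exists():
        for line in tail(cli_log, 50):
            print(line)
    else:
        print(f"{cli_log} not found")
```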