Arbiter
Arbiter is the data analysis engine for SIEM (Security Information and Event Management). It can analyze log, trace, and metric data and generate events.
Arbiter processes data and generates events by executing user-written scripts. It provides a set of built-in functions. Those that handle script input and output include the dql function for querying data from TrueWatch, the trigger function for triggering events, and the printf function for writing information to standard output.
Quick Start
First Script
As an example, let's find the IPs that are new today compared with yesterday. Start with the DQL statement R::`resource`:(distinct(`ip`) as ip) [2d:1d], which queries the deduplicated IPs of user visits from one day ago. The script is as follows:
v = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
# Output the result to stdout
printf("%v", v)
The result of executing the script is:
{"series":[[{"columns":{"ip":"120.20.000.79","time":1747041737200},"tags":null},{"columns":{"ip":"120.130.000.85","time":1747031791143},"tags":null},{"columns":{"ip":"153.30.000.2","time":1747030318384},"tags":null}]],"status_code":200}
When the by clause is not used for grouping, the series list in the DQL query result usually has only one element (that is, it contains only one time series).
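To make the shape of this result concrete, the following sketch parses the sample output shown above and inspects its structure. It is written in Python purely for illustration (it is not Arbiter script), and the variable names are assumptions.

```python
import json

# Sample output copied from the script result shown above.
raw = (
    '{"series":[[{"columns":{"ip":"120.20.000.79","time":1747041737200},"tags":null},'
    '{"columns":{"ip":"120.130.000.85","time":1747031791143},"tags":null},'
    '{"columns":{"ip":"153.30.000.2","time":1747030318384},"tags":null}]],'
    '"status_code":200}'
)

result = json.loads(raw)

# Without a `by` clause, `series` holds a single time series.
series_count = len(result["series"])
# Each element of a time series is one data point.
point_count = len(result["series"][0])
# Every data point carries a `columns` map and a `tags` map (null here).
first_point_keys = sorted(result["series"][0][0].keys())

print(series_count, point_count, first_point_keys)
```

The nesting matters for the steps that follow: the outer list is indexed by time series and the inner list by data point.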
We need to process the original results to obtain the IP list, which can be achieved through the following two methods:
- Using the dql_series_get function: this function retrieves the field from all time series and returns a two-dimensional list; if the field does not exist, it uses nil as a placeholder.
  - Script:
    result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
    ips = dql_series_get(result_dql, "ip")
    printf("%v", ips)
  - Standard output:
- Using a for loop to iterate:
  - Script:
    result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
    ips = []
    for series in result_dql["series"] {
        for elem in series {
            # It is known that the `ip` field is in columns, not in tags
            if "columns" in elem && "ip" in elem["columns"] {
                ips = append(ips, elem["columns"]["ip"])
            } else {
                # In the `dql_series_get` function, if there is no such field, `nil` is added as a placeholder
                ips = append(ips, nil)
            }
        }
    }
    printf("%v", ips)
  - Standard output:
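The behaviour described for dql_series_get can be modelled with a short Python sketch. The helper series_get below is a hypothetical stand-in, not the Arbiter implementation: it returns one inner list per time series and puts None where the field is missing. Falling back to tags when the field is not in columns is an assumption, and the second data point in the sample is made up to show the placeholder.

```python
from typing import Any, Dict, List, Optional


def series_get(result: Dict[str, Any], field: str) -> List[List[Optional[Any]]]:
    """Hypothetical model of dql_series_get: one inner list per time series."""
    out: List[List[Optional[Any]]] = []
    for series in result.get("series") or []:
        values: List[Optional[Any]] = []
        for point in series:
            columns = point.get("columns") or {}
            tags = point.get("tags") or {}
            if field in columns:
                values.append(columns[field])
            elif field in tags:
                # Assumption: tags are searched when columns lack the field.
                values.append(tags[field])
            else:
                # Placeholder for a missing field, like nil in Arbiter.
                values.append(None)
        out.append(values)
    return out


sample = {
    "series": [[
        {"columns": {"ip": "120.20.000.79", "time": 1747041737200}, "tags": None},
        # Illustrative point without an `ip` field.
        {"columns": {"time": 1747031791143}, "tags": None},
    ]],
    "status_code": 200,
}

ips = series_get(sample, "ip")
print(ips)
```

Note that the for loop in the second method appends every value to a single flat list, whereas this model keeps one inner list per time series, which is why code that consumes a two-dimensional result indexes the first series with [0].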
Once the IP lists are available, today's IPs can be compared against yesterday's. A reference script is as follows:
# Yesterday's
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
ip_yesterday = dql_series_get(result_dql, "ip")
# Today's
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [1d]")
ip_today = dql_series_get(result_dql, "ip")
# Check that len(ip_today) > 0 (and likewise for ip_yesterday) before indexing; an out-of-bounds index causes a runtime error
new_ips = []
for s in ip_today[0] {
    if s == nil {
        continue
    }
    if !(s in ip_yesterday[0]) {
        new_ips = append(new_ips, s)
    }
}
# Trigger the event for detecting new IPs
trigger(
    result=new_ips,
    status="info",
    dimension_tags={
        "user_cron_job_": "new_ips_check",
        "data_category": "rum"
    },
    related_data={
        "IPs": new_ips
    }
)
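The comparison step, together with the bounds check recommended in the script comment, can be modelled in Python as follows. This is a sketch of the logic only, not Arbiter script: find_new_ips is a hypothetical helper, the inputs are assumed to be two-dimensional lists like those returned by dql_series_get, and the sample IPs are made up.

```python
from typing import List, Optional


def find_new_ips(
    today: List[List[Optional[str]]],
    yesterday: List[List[Optional[str]]],
) -> List[str]:
    """Return IPs from today's first series that are absent from yesterday's."""
    # Guard against an empty result before indexing [0].
    if len(today) == 0:
        return []
    # With no data for yesterday, every IP seen today counts as new.
    seen = set(yesterday[0]) if len(yesterday) > 0 else set()
    new_ips: List[str] = []
    for ip in today[0]:
        if ip is None:
            # Skip placeholders for data points without the field.
            continue
        if ip not in seen:
            new_ips.append(ip)
    return new_ips


ip_yesterday = [["10.0.0.1", "10.0.0.2"]]
ip_today = [["10.0.0.2", None, "10.0.0.3"]]

print(find_new_ips(ip_today, ip_yesterday))
print(find_new_ips([], ip_yesterday))
```

Using a set for yesterday's IPs keeps each membership test cheap when the lists are large; the reference script's in check against the list expresses the same comparison.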
The trigger function can be called multiple times in a script. Since it is executed only once here, the result currently contains only one element. Its result value can be used as a template variable on the web page; for example, {Result} corresponds to the following result: