Arbiter
Arbiter is the data analysis engine of SIEM (Security Information and Event Management).
When you create or edit SIEM detection rules, Arbiter executes the scripts you write to process data and generate events. Arbiter provides a set of built-in functions, including script input and output functions such as the dql function for querying data from TrueWatch, the trigger function for triggering events, and the printf function for writing information to standard output.
Quick Start
The following example counts the IPs that accessed the service today but did not appear yesterday.
First, use the DQL statement R::`resource`:(distinct(`ip`) as ip) [2d:1d] to query yesterday's deduplicated user access IPs (the window from two days ago to one day ago):
v = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
# Output the result to stdout
printf("%v", v)
The result of executing the script is:
{"series":[[{"columns":{"ip":"120.20.000.79","time":1747041737200},"tags":null},{"columns":{"ip":"120.130.000.85","time":1747031791143},"tags":null},{"columns":{"ip":"153.30.000.2","time":1747030318384},"tags":null}]],"status_code":200}
When the by statement is not used for grouping, the series list of the DQL query result usually contains only one element (i.e., only one time series).
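To make the shape of this result concrete, here is a small sketch in Python (not Arbiter script) that parses the sample output shown above with the standard json module. It only illustrates the nesting: series is a list of time series, and each time series is a list of rows that carry their fields under columns and tags.

```python
import json

# Sample result copied from the script output shown above.
raw = (
    '{"series":[[{"columns":{"ip":"120.20.000.79","time":1747041737200},"tags":null},'
    '{"columns":{"ip":"120.130.000.85","time":1747031791143},"tags":null},'
    '{"columns":{"ip":"153.30.000.2","time":1747030318384},"tags":null}]],'
    '"status_code":200}'
)

result = json.loads(raw)

# Without a "by" clause there is a single time series.
series_count = len(result["series"])

# Each row of that series carries its fields under "columns".
first_series = result["series"][0]
ips = [row["columns"]["ip"] for row in first_series]

print(series_count)
print(ips)
```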
The raw result must be processed to obtain the IP list, which can be done in either of the following two ways.
The dql_series_get function retrieves the given field from all time series and returns a two-dimensional list; if the field does not exist in a row, it uses nil as a placeholder:
- Script:
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
ips = dql_series_get(result_dql, "ip")
printf("%v", ips)
- Standard output:
Alternatively, traverse the result manually with for loops:
- Script:
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
ips = []
for series in result_dql["series"] {
    for elem in series {
        # It is known that the ip field is in columns, not in tags
        if "columns" in elem && "ip" in elem["columns"] {
            ips = append(ips, elem["columns"]["ip"])
        } else {
            # Like dql_series_get, add nil as a placeholder when the field does not exist
            ips = append(ips, nil)
        }
    }
}
printf("%v", ips)
- Standard output:
Once the IP lists are available, compare today's list with yesterday's. A reference script follows:
# Yesterday's
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
ip_yesterday = dql_series_get(result_dql, "ip")
# Today's
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [1d]")
ip_today = dql_series_get(result_dql, "ip")
# Checking len(ip_today) > 0 first is recommended: indexing an empty list causes an out-of-bounds runtime error
new_ips = []
for s in ip_today[0] {
    if s == nil {
        continue
    }
    if !(s in ip_yesterday[0]) {
        new_ips = append(new_ips, s)
    }
}
# Trigger the new IP detection event
trigger(
    result=new_ips,
    status="info",
    dimension_tags={
        "user_cron_job_": "new_ips_check",
        "data_category": "rum"
    },
    related_data={
        "IPs": new_ips
    }
)
The trigger function can be called multiple times in one script. Because it is called only once here, the result list contains a single element. Its fields can be used as template variables on the web page; for example, {Result} corresponds to the result field below:
[
    {
        "result": [
            "116.60.000.211",
            "14.150.000.99",
            "58.240.000.245"
        ],
        "status": "info",
        "dimension_tags": {
            "data_category": "rum",
            "user_cron_job_": "new_ips_check"
        },
        "related_data": {
            "IPs": [
                "116.60.000.211",
                "14.150.000.99",
                "58.240.000.245"
            ]
        }
    }
]
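The comparison step of the script above, together with the recommended emptiness check, can be sketched in Python as follows. This is a model of the logic rather than Arbiter code: the function name find_new_ips and the sample addresses are hypothetical, and the two arguments stand in for the two-dimensional lists that dql_series_get returns.

```python
def find_new_ips(ip_today, ip_yesterday):
    """Return IPs present today but absent yesterday.

    Both arguments model dql_series_get results: one inner list per
    time series, with None where the field was missing.
    """
    # Guard against empty results instead of indexing [0] blindly.
    if not ip_today:
        return []
    seen_yesterday = set(ip_yesterday[0]) if ip_yesterday else set()

    new_ips = []
    for ip in ip_today[0]:
        if ip is None:  # skip placeholders for missing fields
            continue
        if ip not in seen_yesterday:
            new_ips.append(ip)
    return new_ips


# Hypothetical sample data for illustration only.
today = [["10.0.0.1", None, "10.0.0.2", "10.0.0.3"]]
yesterday = [["10.0.0.2"]]
print(find_new_ips(today, yesterday))  # ['10.0.0.1', '10.0.0.3']
```

Converting yesterday's list to a set keeps each membership test constant-time, which matters when the lists are large.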
Use Cases
Abnormal Log Detection of Geographic Location Change
# The query time range is set automatically when the rule is triggered periodically
user_id_data = dql("L::access_log:(distinct(user_id) as user_id)")
user_ids = dql_series_get(user_id_data, "user_id")
# user_ids looks like [["user_id_1", "user_id_2", "user_id_3"]]
user_ids = user_ids[0]
for uid in user_ids {
    # Query the deduplicated countries this user accessed from
    country_data = dql(strfmt(
        "L::access_log:(distinct(country) as country) {user_id = \"%s\"}", uid))
    country_list = dql_series_get(country_data, "country")
    # More than one country
    if len(country_list) == 1 && len(country_list[0]) > 1 {
        trigger( # Trigger an event
            result=uid,
            related_data = country_list[0]
        )
    }
}
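The detection idea above (flag a user whose accesses come from more than one country) can also be expressed as a single pass over already-fetched records. The Python sketch below assumes a hypothetical list of records with user_id and country keys; it does not call DQL and is only meant to illustrate the grouping logic.

```python
from collections import defaultdict


def users_with_multiple_countries(records):
    """Map each user_id seen from more than one country to its sorted countries."""
    countries_by_user = defaultdict(set)
    for rec in records:
        countries_by_user[rec["user_id"]].add(rec["country"])
    return {
        uid: sorted(countries)
        for uid, countries in countries_by_user.items()
        if len(countries) > 1
    }


# Hypothetical access-log records for illustration only.
records = [
    {"user_id": "user_id_1", "country": "CN"},
    {"user_id": "user_id_1", "country": "US"},
    {"user_id": "user_id_2", "country": "CN"},
    {"user_id": "user_id_2", "country": "CN"},
]
flagged = users_with_multiple_countries(records)
print(flagged)  # {'user_id_1': ['CN', 'US']}
```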
Detect URL Access Sequence
# Get all session ids within the detection period
session_id_data = dql("R::view:(distinct(session_id) as session_id)")
session_id_list = dql_series_get(session_id_data, "session_id")
session_id_list = session_id_list[0]
for session_id in session_id_list {
    # Get all view paths for the specified session_id
    view_data = dql(strfmt("R::view:(view_path) {session_id = \"%s\"} order by time asc", session_id))
    view_path = dql_series_get(view_data, "view_path")
    view_path = view_path[0]
    pos_login_api = -1
    pos_send_msg_api = -1
    # The rows are in ascending time order, so the first match is the earliest access
    for i = 0; i < len(view_path); i++ {
        if pos_login_api < 0 && view_path[i] == "/login" {
            pos_login_api = i
        }
        if pos_send_msg_api < 0 && view_path[i] == "/send_message" {
            pos_send_msg_api = i
        }
    }
    # Evaluate once per session, after the scan, so each event fires at most once
    if pos_login_api == -1 && pos_send_msg_api >= 0 {
        trigger(
            result = strfmt("In this session(%s), send_msg was accessed without login", session_id)
        )
    }
    if pos_send_msg_api >= 0 && pos_login_api > pos_send_msg_api {
        # Trigger an event
        trigger(
            result = strfmt("In this session(%s), login was accessed after send_msg", session_id)
        )
    }
}
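The ordering check for a single session can be sketched in Python as a pure function, which makes the individual cases easy to test. The function name and the returned labels are hypothetical; the input models the time-ordered view_path list of one session, and the verdict is computed once, after the whole list has been scanned.

```python
def classify_session(view_paths):
    """Classify one session's view paths (ascending time order).

    Returns "ok", "send_without_login", or "login_after_send".
    """
    pos_login = -1
    pos_send = -1
    for i, path in enumerate(view_paths):
        # Record only the first (earliest) occurrence of each path.
        if pos_login < 0 and path == "/login":
            pos_login = i
        if pos_send < 0 and path == "/send_message":
            pos_send = i

    if pos_send < 0:
        return "ok"  # nothing to check: no message was sent
    if pos_login < 0:
        return "send_without_login"
    if pos_login > pos_send:
        return "login_after_send"
    return "ok"


print(classify_session(["/login", "/home", "/send_message"]))  # ok
print(classify_session(["/send_message", "/login"]))           # login_after_send
```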
A Token Accessed by Multiple IPs
token_data = dql("L::access_log:(distinct(token) as token)")
token_list = dql_series_get(token_data, "token")
token_list = token_list[0]
for token in token_list {
    token_ip_data = dql(strfmt(
        "L::access_log:(distinct(ip) as ip) {token = \"%s\"}", token))
    ip_list = dql_series_get(token_ip_data, "ip")
    # Exactly one series that contains more than one IP
    if len(ip_list) == 1 && len(ip_list[0]) > 1 {
        # Trigger an event
        trigger(
            result = strfmt("token %s accessed by multiple IPs, IPs: %s", token, str_join(ip_list[0], ", "))
        )
    }
}
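The intended condition here is "exactly one series, containing more than one IP". A minimal Python sketch of that check and of the message formatting follows; the function name and sample values are hypothetical, and ip_list models the two-dimensional list returned by dql_series_get. Because the length test comes first, an empty result never reaches the [0] index.

```python
def multi_ip_message(token, ip_list):
    """Return the alert text if the token was used from several IPs, else None."""
    if len(ip_list) == 1 and len(ip_list[0]) > 1:
        # Drop None placeholders before joining.
        ips = [ip for ip in ip_list[0] if ip is not None]
        return "token %s accessed by multiple IPs, IPs: %s" % (token, ", ".join(ips))
    return None


# Hypothetical values for illustration only.
print(multi_ip_message("abc", [["10.0.0.1", "10.0.0.2"]]))
print(multi_ip_message("abc", []))
```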