Arbiter


Arbiter is the data analysis engine of SIEM.

When you create or edit a SIEM detection rule, Arbiter executes the rule's script to process data and generate events. Arbiter provides a set of built-in functions, including functions for script input and output: the dql function queries data from TrueWatch, the trigger function triggers events, the printf function writes information to standard output, and more.

Quick Start

The following example counts the IPs that are new today compared with yesterday.

First, use the DQL statement R::`resource`:(distinct(`ip`) as ip) [2d:1d] to query the deduplicated IPs of user access for yesterday (the time range [2d:1d] runs from two days ago to one day ago).

v = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
# Output the result to stdout
printf("%v", v)

The execution result of the script is:

{"series":[[{"columns":{"ip":"120.20.000.79","time":1747041737200},"tags":null},{"columns":{"ip":"120.130.000.85","time":1747031791143},"tags":null},{"columns":{"ip":"153.30.000.2","time":1747030318384},"tags":null}]],"status_code":200}

When the by statement is not used for grouping, the series list of the DQL query result usually contains only one element (i.e., only one time series).

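We need to process the raw result to obtain the IP list. This can be done in two ways: with the built-in dql_series_get function, or by traversing the result manually.

The dql_series_get function retrieves the given field from all time series and returns a two-dimensional list (one inner list per time series); if the field does not exist in an element, nil is used as a placeholder: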

  • Script:
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
ips = dql_series_get(result_dql, "ip")

printf("%v", ips)
  • Standard output:
[["120.20.000.79","120.130.000.85","153.30.000.2"]]

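Alternatively, traverse the result manually. This version collects the values of all time series into a single flat list:

  • Script: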
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
ips = []
for series in result_dql["series"] {
    for elem in series {
        # It is known that the ip field is in columns, not in tags
        if "columns" in elem && "ip" in elem["columns"] {
            ips = append(ips, elem["columns"]["ip"])
        } else {
            # In the function dql_series_get, for fields that do not exist, nil is added as a placeholder
            ips = append(ips, nil)
        }
    }
}
printf("%v", ips)
  • Standard output:
["120.20.000.79","120.130.000.85","153.30.000.2"]
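To make the shape of the return value concrete, the following is a minimal Python model of the behavior described above: one inner list per time series, with a placeholder where the field is missing. It is a sketch for reasoning about the data offline, not Arbiter's implementation. The name series_get is hypothetical, Python's None stands in for nil, and looking in tags after columns is an assumption.

```python
# Python model of the documented behavior; not Arbiter's implementation.
def series_get(result, field):
    """Return one inner list per series; None stands in for Arbiter's nil."""
    out = []
    for series in result.get("series") or []:
        values = []
        for elem in series:
            columns = elem.get("columns") or {}
            tags = elem.get("tags") or {}  # "tags" may be null in the result
            if field in columns:
                values.append(columns[field])
            elif field in tags:  # assumption: tags are checked after columns
                values.append(tags[field])
            else:
                values.append(None)  # placeholder for a missing field
        out.append(values)
    return out


sample = {
    "series": [[
        {"columns": {"ip": "120.20.000.79", "time": 1747041737200}, "tags": None},
        {"columns": {"time": 1747031791143}, "tags": None},  # no ip field
    ]],
    "status_code": 200,
}
print(series_get(sample, "ip"))  # [['120.20.000.79', None]]
```

Because the outer list has one entry per time series, a query without by normally yields a single inner list, which is why the scripts on this page index the result with [0].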

Once the IP lists are available, compare today's IPs against yesterday's to find the new ones. The script is as follows:

# Yesterday's
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")

ip_yesterday = dql_series_get(result_dql, "ip")

# Today's
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [1d]")
ip_today = dql_series_get(result_dql, "ip")


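# Check the list lengths before indexing [0]; indexing an empty list causes an out-of-bounds error at runtime
known_ips = []
if len(ip_yesterday) > 0 {
    known_ips = ip_yesterday[0]
}

new_ips = []
if len(ip_today) > 0 {
    for s in ip_today[0] {
        # Skip nil placeholders for missing fields
        if s == nil {
            continue
        }
        if !(s in known_ips) {
            new_ips = append(new_ips, s)
        }
    }
}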

# Trigger a new IP detection event
trigger(
    result=new_ips,
    status="info",
    dimension_tags={
        "user_cron_job_": "new_ips_check",
        "data_category": "rum"
    },
    related_data={
        "IPs": new_ips
    }
)

The trigger function can be called multiple times in one script, and each call adds one element to the result. Because it is called only once here, the result contains a single element. Its fields can be used as template variables on the web page; for example, {Result} corresponds to the result field below:

[
  {
    "result": [
        "116.60.000.211",
        "14.150.000.99",
        "58.240.000.245"
    ],
    "status": "info",
    "dimension_tags": {
      "data_category": "rum",
      "user_cron_job_": "new_ips_check"
    },
    "related_data": {
      "IPs": [
        "116.60.000.211",
        "14.150.000.99",
        "58.240.000.245"
      ]
    }
  }
]
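The comparison step of this example can also be checked offline. The sketch below models it in Python under stated assumptions: the inputs have the two-dimensional shape returned by dql_series_get, None stands in for nil, and the function name new_items is hypothetical. It uses a set for the lookup, which is a design choice of this sketch rather than something Arbiter is known to do.

```python
def new_items(today, yesterday):
    """Values in today's first series that are absent from yesterday's first series."""
    if not today:  # empty result: nothing to report
        return []
    known = set(yesterday[0]) if yesterday else set()
    fresh = []
    for value in today[0]:
        if value is None:  # skip placeholders for missing fields
            continue
        if value not in known:
            fresh.append(value)
    return fresh


print(new_items([["a", "b", None, "c"]], [["b"]]))  # ['a', 'c']
```

An empty result for yesterday is treated as "no known IPs", so every IP seen today is reported as new.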

Use Cases

Abnormal Log Identification for Geographic Location Changes

# The query time range will be automatically set when triggered periodically
user_id_data = dql("L::access_log:(distinct(user_id) as user_id)")

user_ids = dql_series_get(user_id_data, "user_id")
# user_ids value like [["user_id_1", "user_id_2", "user_id_3"]]

user_ids = user_ids[0]

for uid in user_ids {
    # Query user access country information, deduplicated
    country_data = dql(strfmt(
        "L::access_log:(distinct(country) as country) {user_id = \"%s\"}", uid))

    country_list = dql_series_get(country_data, "country")

    # More than one country
    if len(country_list) == 1 && len(country_list[0]) > 1 {
        trigger( # Trigger an event
            result=uid,
            related_data = country_list[0]
        )
    }
}
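The rule above boils down to "a user is suspicious when more than one distinct country is associated with them". The following Python sketch models that grouping over a list of (user_id, country) pairs in a single pass. The function name and the input shape are hypothetical and chosen only for illustration; this is not how Arbiter or DQL evaluates the rule.

```python
from collections import defaultdict


def users_with_multiple_countries(rows):
    """Map each user seen in more than one country to the sorted list of countries."""
    countries = defaultdict(set)
    for user_id, country in rows:
        if user_id is None or country is None:  # ignore incomplete records
            continue
        countries[user_id].add(country)
    return {u: sorted(c) for u, c in countries.items() if len(c) > 1}


rows = [("u1", "CN"), ("u1", "US"), ("u2", "CN"), ("u1", "CN")]
print(users_with_multiple_countries(rows))  # {'u1': ['CN', 'US']}
```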

Detecting URL Access Order

# Get all session ids within the detection period
session_id_data = dql("R::view:(distinct(session_id) as session_id)")

session_id_list = dql_series_get(session_id_data, "session_id")
session_id_list = session_id_list[0]

for session_id in session_id_list {
    # Get all view paths for the specified session_id 
    view_data = dql(strfmt("R::view:(view_path) {session_id = \"%s\"} order by time asc", session_id))

    view_path = dql_series_get(view_data, "view_path")
    view_path = view_path[0]

    pos_login_api = -1
    pos_send_msg_api = -1

    # Record the first position of each path.
    # Since the data is in ascending time order, a smaller index means an earlier access.
    for i = 0; i < len(view_path); i++ {
        if pos_login_api < 0 && view_path[i] == "/login" {
            pos_login_api = i
        }
        if pos_send_msg_api < 0 && view_path[i] == "/send_message" {
            pos_send_msg_api = i
        }
    }

    # Check once after the scan, so that each session triggers at most one event
    if pos_send_msg_api >= 0 && pos_login_api == -1 {
        trigger(
            result= strfmt("In this session(%s), send_msg was accessed without login", session_id)
        )
    }

    if pos_send_msg_api >= 0 && pos_login_api > pos_send_msg_api {
        # Trigger an event
        trigger(
            result= strfmt("In this session(%s), login was accessed after send_msg", session_id)
        )
    }
}
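The ordering rule itself (a session should reach /send_message only after /login) can be expressed as a small pure function, which makes the edge cases easy to test. The Python sketch below is an illustration only: the function name check_order and the returned labels are hypothetical, and the input is assumed to be the session's view paths in ascending time order.

```python
def check_order(paths, first="/login", then="/send_message"):
    """Classify one session's time-ordered paths against the login-before-send rule."""
    pos_first = paths.index(first) if first in paths else -1
    pos_then = paths.index(then) if then in paths else -1
    if pos_then < 0:
        return "ok"  # the guarded path was never accessed
    if pos_first < 0:
        return "missing_login"  # guarded path accessed, login never seen
    return "ok" if pos_first < pos_then else "login_after_send"


print(check_order(["/home", "/login", "/send_message"]))  # ok
print(check_order(["/send_message", "/login"]))  # login_after_send
```

Only the first occurrence of each path is compared, which matches the first-position bookkeeping in the script.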

A Token Accessed by Multiple IPs

token_data = dql("L::access_log:(distinct(token) as token)")

token_list = dql_series_get(token_data, "token")
token_list = token_list[0]

for token in token_list {
    # Query the deduplicated IPs that used this token
    token_ip_data = dql(strfmt(
        "L::access_log:(distinct(ip) as ip) {token = \"%s\"}", token))
    ip_list = dql_series_get(token_ip_data, "ip")

    # More than one IP
    if len(ip_list) == 1 && len(ip_list[0]) > 1  {
        # Trigger an event
        trigger(
            result = strfmt("token %s was accessed by multiple ips, IPs: %s", token, str_join(ip_list[0], ", "))
        )
    }
}