Arbiter


Arbiter is the data analysis engine of SIEM (Security Information and Event Management).

When creating and editing SIEM detection rules, Arbiter executes the written script to process data and generate events. It provides a set of built-in functions, including script input/output functions such as dql for querying data from TrueWatch, trigger for triggering events, and printf for writing to standard output.
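
A minimal sketch of how these built-ins fit together (the query string, status value, and result text are illustrative only):

v = dql("R::`resource`:(distinct(`ip`) as ip) [1d]")  # query data from TrueWatch
printf("%v", v)                                       # write the raw result to stdout
trigger(result="example", status="info")              # trigger an event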

Quick Start

As an example, let's count today's new access IPs compared with yesterday's. The script is built up as follows:

Use the DQL statement R::`resource`:(distinct(`ip`) as ip) [2d:1d] to query the deduplicated access IPs in the window from two days ago to one day ago, i.e., yesterday's data:

v = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
# Output the result to stdout
printf("%v", v)

The result of executing the script is:

{"series":[[{"columns":{"ip":"120.20.000.79","time":1747041737200},"tags":null},{"columns":{"ip":"120.130.000.85","time":1747031791143},"tags":null},{"columns":{"ip":"153.30.000.2","time":1747030318384},"tags":null}]],"status_code":200}

When the by statement is not used for grouping, the series list of the DQL query result usually contains only one element (i.e., only one time series).

We need to process the raw result to obtain the IP list, which can be done in either of the following two ways.

Method 1: use the dql_series_get function. It retrieves all time series and returns a two-dimensional list; fields that do not exist are filled in with nil as a placeholder:

  • Script:
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
ips = dql_series_get(result_dql, "ip")

printf("%v", ips)
  • Standard output:
["120.20.000.79","120.130.000.85","153.30.000.2"]

Method 2: iterate over the query result manually:

  • Script:
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")
ips = []
for series in result_dql["series"] {
    for elem in series {
        # It is known that the ip field is in columns, not in tags
        if "columns" in elem && "ip" in elem["columns"] {
            ips = append(ips, elem["columns"]["ip"])
        } else {
            # In the dql_series_get function, nil is added as a placeholder for fields that do not exist
            ips = append(ips, nil)
        }
    }
}
printf("%v", ips)
  • Standard output:
["120.250.000.179","120.130.000.185","153.30.000.42"]

After obtaining the IP lists, you can compare today's IPs against yesterday's. A reference script follows:

# Yesterday's
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [2d:1d]")

ip_yesterday = dql_series_get(result_dql, "ip")

# Today's
result_dql = dql("R::`resource`:(distinct(`ip`) as ip) [1d]")
ip_today = dql_series_get(result_dql, "ip")


# Check len(ip_today) and len(ip_yesterday) before indexing; an out-of-bounds index causes a runtime error
new_ips = []
if len(ip_today) > 0 && len(ip_yesterday) > 0 {
    for s in ip_today[0] {
        if s == nil {
            continue
        }
        if !(s in ip_yesterday[0]) {
            new_ips = append(new_ips, s)
        }
    }
}

# Trigger the new IP detection event
trigger(
    result=new_ips,
    status="info",
    dimension_tags={
        "user_cron_job_": "new_ips_check",
        "data_category": "rum"
    },
    related_data={
        "IPs": new_ips
    }
)

The trigger function can be called multiple times in a single script; each call appends one element to the event result. Since it is called only once here, the result contains a single element, whose fields can be used as template variables on the web page, for example {Result} corresponds to the result field below:

[
  {
    "result": [
        "116.60.000.211",
        "14.150.000.99",
        "58.240.000.245"
    ],
    "status": "info",
    "dimension_tags": {
      "data_category": "rum",
      "user_cron_job_": "new_ips_check"
    },
    "related_data": {
      "IPs": [
        "116.60.000.211",
        "14.150.000.99",
        "58.240.000.245"
      ]
    }
  }
]
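
For example, a script that calls trigger twice (a minimal sketch; the result strings and tag values are illustrative) would produce a result list with two elements, one per call:

trigger(result="first check", status="info", dimension_tags={"check_name": "a"})
trigger(result="second check", status="info", dimension_tags={"check_name": "b"})
# The event result list now contains two elements, one per trigger call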

Use Cases

Detect Abnormal Geographic Location Changes from Logs

# The query time range is automatically set when triggered periodically
user_id_data = dql("L::access_log:(distinct(user_id) as user_id)")

user_ids = dql_series_get(user_id_data, "user_id")
# user_ids value like [["user_id_1", "user_id_2", "user_id_3"]]

user_ids = user_ids[0]

for uid in user_ids {
    # Query user access country information, deduplicated
    country_data = dql(strfmt(
        "L::access_log:(distinct(country) as country) {user_id = \"%s\"}", uid))

    country_list = dql_series_get(country_data, "country")

    # More than one country
    if len(country_list) == 1 && len(country_list[0]) > 1 {
        trigger( # Trigger an event
            result=uid,
            related_data = country_list[0]
        )
    }
}

Detect URL Access Sequence

# Get all session ids within the detection period
session_id_data = dql("R::view:(distinct(session_id) as session_id)")

session_id_list = dql_series_get(session_id_data, "session_id")
session_id_list = session_id_list[0]

for session_id in session_id_list {
    # Get all view paths for the specified session_id 
    view_data = dql(strfmt("R::view:(view_path) {session_id = \"%s\"} order by time asc", session_id))

    view_path = dql_series_get(view_data, "view_path")
    view_path = view_path[0]

    pos_login_api = -1
    pos_send_msg_api = -1

    # Record the index of the first occurrence of each path;
    # the query is ordered by time asc, so a smaller index means an earlier access
    for i = 0; i < len(view_path); i++ {
        if pos_login_api < 0 && view_path[i] == "/login" {
            pos_login_api = i
        }
        if pos_send_msg_api < 0 && view_path[i] == "/send_message" {
            pos_send_msg_api = i
        }
    }

    # /send_message was accessed but /login never was in this session
    if pos_login_api == -1 && pos_send_msg_api >= 0 {
        trigger(
            result = strfmt("In this session(%s), send_message was accessed without login", session_id)
        )
    }

    # Both paths were accessed and /login came after /send_message
    if pos_send_msg_api >= 0 && pos_login_api > pos_send_msg_api {
        # Trigger an event
        trigger(
            result = strfmt("In this session(%s), login was accessed after send_message", session_id)
        )
    }
}

A Token Accessed by Multiple IPs

token_data = dql("L::access_log:(distinct(token) as token)")

token_list = dql_series_get(token_data, "token")
token_list = token_list[0]

for token in token_list {
    token_ip_data = dql(strfmt(
        "L::access_log:(distinct(ip) as ip) {token = \"%s\"}", token))
    ip_list = dql_series_get(token_ip_data, "ip")

    if len(ip_list) == 1 && len(ip_list[0]) > 1 {
        # Trigger an event
        trigger(
            result = strfmt("token %s accessed by multiple IPs, IPs: %s", token, str_join(ip_list[0], ", "))
        )
    }
}