Add Additional Tags to Cloud Resource Data

1. Background

Collectors generally extract only a few broadly useful attributes as tags after fetching resources from the cloud provider, which may not be enough for some users. This article describes how to add extra tags to the data after it is collected and before it is reported.

2. Solution

There is no need to modify the official collector: the collector accepts an after_collect parameter, to which you can assign a function that post-processes the collected data, for example by adding extra tags.

def handler(point):
    point['tags']['origin'] = 'shanghai'
    return point

@DFF.API('xxx Collection', timeout=3600, fixed_crontab='* * * * *')
def run():
    Runner(main.DataCollector(account, collector_configs, after_collect=handler), debug=True).run()

The example above omits unrelated configuration and focuses on the handler function. The function takes a single parameter, point, which is the data the collector is about to report. For its structure, see the "Data Reporting Format" section of the relevant collector's documentation. A point always contains three fields: measurement, tags, and fields (see the line protocol documentation for details). The field of interest here is point.tags: insert the key-value pairs you want to add into it. In the example, a pair with key origin and value shanghai is added to point.tags.
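
To make the shape of point concrete, here is a minimal sketch that runs outside the platform. The measurement name, tag values, and field values are made up for illustration; a real point carries whatever the specific collector reports.

```python
# Hypothetical sample point; real values depend on the collector in use.
sample_point = {
    'measurement': 'aws_ec2',
    'tags': {'name': 'i-0123456789abcdef0', 'RegionId': 'cn-northwest-1'},
    'fields': {'InstanceType': 't3.micro'},
}

def handler(point):
    # Add one extra tag and hand the point back to the collector.
    point['tags']['origin'] = 'shanghai'
    return point

result = handler(sample_point)
print(result['tags'])
```

Only point['tags'] changes; measurement and fields are passed through untouched.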

3. Case Study

Goal: copy the EC2 tags configured in the AWS console into the tags of the EC2 object data reported by the collector.

Scenario 1: Directly extract the Tags field from point.fields and supplement it to point.tags.

account = {
    'ak_id'     :  DFF.ENV('aws_develop_test')['ak_id'],
    'ak_secret' :  DFF.ENV('aws_develop_test')['ak_secret'],
}

collector_configs = {
    'regions': ['cn-northwest-1']
}

from integration_core__runner import Runner
import integration_aws_ec2__main as main
from integration_core__utils import json_loads

def add_tags(point):
    # If the cloud resource's Tags are present in point.fields, use them directly
    raw_tags = point['fields'].get('Tags')
    if not raw_tags:
        return point

    cloud_tags = json_loads(raw_tags)
    if not cloud_tags:
        return point

    for t in cloud_tags:
        t_key = t['Key']
        t_v = t['Value']

        # Existing tags should not be replaced (keys are compared case-insensitively)
        protected_tags = [k.lower() for k in point['tags'].keys()]
        if t_key.lower() in protected_tags:
            continue

        # Be cautious with tags that start and end with double underscores; this demo simply skips them
        if t_key.startswith('__') and t_key.endswith('__'):
            continue

        point['tags'][t_key] = t_v

    return point

@DFF.API('AWS-EC2 Collection', timeout=3600, fixed_crontab='*/15 * * * *')
def run():
    Runner(main.DataCollector(account, collector_configs, after_collect=add_tags)).run()

Scenario 2: Not every collector includes the Tags field in point.fields (support is being added gradually). When it is missing, the tags have to be fetched from the cloud provider's API (or possibly from the customer's own API):

account = {
    'ak_id'     :  DFF.ENV('aws_develop_test')['ak_id'],
    'ak_secret' :  DFF.ENV('aws_develop_test')['ak_secret'],
}

# collector configuration
collector_configs = {
    'regions': ['cn-northwest-1']
}

from integration_core__runner import Runner
import integration_aws_ec2__main as main
from integration_core__client import AWS

def add_tags(point):
    # If the Tags of the cloud resource do not exist in point.fields, you can call the cloud API to get them
    client = AWS(**account)
    region_id = point['tags']['RegionId']
    instance_id = point['tags']['InstanceId']
    biz_params = {
        'Filters': [
            {
                'Name': 'resource-id',
                'Values': [
                    instance_id,
                ]
            }
        ]
    }
    api_res = client.do_api(action='describe_tags', product='ec2', region_id=region_id, **biz_params)
    if not api_res:
        return point

    cloud_tags = api_res.get('Tags')
    if not cloud_tags:
        return point

    for t in cloud_tags:
        t_key = t['Key']
        t_v = t['Value']

        # Existing tags should not be replaced (keys are compared case-insensitively)
        protected_tags = [k.lower() for k in point['tags'].keys()]
        if t_key.lower() in protected_tags:
            continue

        # Be cautious with tags that start and end with double underscores; this demo simply skips them
        if t_key.startswith('__') and t_key.endswith('__'):
            continue

        point['tags'][t_key] = t_v

    return point

@DFF.API('AWS-EC2 Collection', timeout=3600, fixed_crontab='*/15 * * * *')
def run():
    Runner(main.DataCollector(account, collector_configs, after_collect=add_tags)).run()
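
Both scenarios end with the same merge loop, so it can be factored into one helper and tested without any cloud access. The sketch below is an illustration, not part of the collector: merge_cloud_tags is a hypothetical name, and the sample point and tag list are made up. It assumes cloud tags arrive as a list of {'Key': ..., 'Value': ...} dictionaries, as in the two scenarios.

```python
def merge_cloud_tags(point, cloud_tags):
    """Copy cloud tags into point['tags'] without overwriting existing keys."""
    # Lowercased keys already present; extended as tags are added so that two
    # cloud tags differing only in case cannot both be inserted.
    protected = {k.lower() for k in point['tags']}
    for t in cloud_tags or []:
        key, value = t.get('Key'), t.get('Value')
        if not key or key.lower() in protected:
            continue
        # Skip keys wrapped in double underscores, as in the demo above.
        if key.startswith('__') and key.endswith('__'):
            continue
        point['tags'][key] = value
        protected.add(key.lower())
    return point

point = {'measurement': 'aws_ec2', 'tags': {'name': 'i-abc'}, 'fields': {}}
cloud_tags = [
    {'Key': 'Name', 'Value': 'web-01'},      # skipped: collides with 'name'
    {'Key': '__internal__', 'Value': 'x'},   # skipped: double-underscore key
    {'Key': 'team', 'Value': 'infra'},       # added
]
merge_cloud_tags(point, cloud_tags)
print(point['tags'])  # {'name': 'i-abc', 'team': 'infra'}
```

With such a helper, each add_tags function only has to obtain cloud_tags (from point.fields or from the API) and then return merge_cloud_tags(point, cloud_tags).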

4. Key Considerations

  1. Cloud product collectors automatically propagate the tags of custom objects to the tags of the associated metrics. If you enable both a custom object collector and a cloud monitoring collector, you therefore only need to add tags in the object collector.
  2. When adding tags to the data reported by a collector, take care not to overwrite existing fields, such as the name field of custom objects. Follow the case study: if the original tags already contain the same key, skip it to avoid unexpected results.
  3. The function assigned to after_collect receives exactly one parameter, point. After processing point, it must return one or more points. If it returns nothing or raises an error, the original data is reported unchanged. If an after_collect function is defined but appears to have no effect, check for this first.
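
Because a failing after_collect function falls back silently to the original data, it can help to surface the problem yourself while debugging. The decorator below is a hypothetical helper, not something the collector provides. It prints the traceback and re-raises, and it warns when the function returns nothing, so the fallback behavior described above is left unchanged.

```python
import functools
import traceback

def report_errors(func):
    """Hypothetical debugging helper: print any exception, then re-raise it."""
    @functools.wraps(func)
    def wrapper(point):
        try:
            result = func(point)
        except Exception:
            traceback.print_exc()
            raise
        if result is None:
            # A missing `return point` is the most common cause of "no effect".
            print('%s returned None; did you forget `return point`?' % func.__name__)
        return result
    return wrapper

@report_errors
def add_tags(point):
    point['tags']['origin'] = 'shanghai'
    return point

checked = add_tags({'measurement': 'demo', 'tags': {}, 'fields': {}})
```

The decorated add_tags can be passed as after_collect=add_tags exactly as in the examples above. Where the printed output ends up depends on how the script is run.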