
Event Data Sharding Practice: Implementation Based on Dataway Sink


This document explains how to shard event data (keyevent) by injecting an HTTP header in DataFlux Func and configuring routing rules in Dataway Sinker. With this approach, you can route event data to specified workspaces according to its business attributes and environment characteristics.

Solution Principle

Data Sharding Flow

Core Mechanism Explanation

  1. Identifier injection in DataFlux Func: When event data is reported, Func generates the X-Global-Tags header according to its configuration. The header contains the key-value pairs used for sharding (e.g., env=prod).

  2. Routing in Dataway: Based on the rules defined in sinker.json, Dataway forwards events that carry a given identifier to the corresponding workspace.
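In the examples throughout this document, the header value is a comma-separated list of key=value pairs. The Python sketch below builds and parses such a value to make the format concrete. The helper names format_global_tags and parse_global_tags are hypothetical, the sketch assumes keys and values never contain commas or equals signs (no escaping is handled), and it is not the actual implementation in Func or Dataway.

```python
from typing import Dict


def format_global_tags(tags: Dict[str, str]) -> str:
    """Join key-value pairs into an X-Global-Tags value, e.g. "env=prod,host=web-001"."""
    # Assumption: keys and values contain no ',' or '=' characters, so nothing is escaped
    return ",".join(f"{key}={value}" for key, value in tags.items())


def parse_global_tags(header_value: str) -> Dict[str, str]:
    """Split an X-Global-Tags value back into a dict of key-value pairs."""
    tags: Dict[str, str] = {}
    for pair in header_value.split(","):
        if not pair.strip():
            continue  # tolerate empty segments such as a trailing comma
        key, _, value = pair.partition("=")
        tags[key.strip()] = value.strip()
    return tags


if __name__ == "__main__":
    header = format_global_tags({"env": "prod", "host": "web-001"})
    print(header)                     # env=prod,host=web-001
    print(parse_global_tags(header))  # {'env': 'prod', 'host': 'web-001'}
```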

Dataway Configuration

Before using this feature, ensure that Dataway has been deployed and the Sinker sharding feature is enabled.

For Sinker configuration, refer to the Dataway Sinker Configuration Guide.

Note: In the Deployment Plan, the Dataway used by DataFlux Func is internal-dataway in the utils namespace.

DataFlux Func Configuration

Header Injection X-Global-Tags

Core Parameter Explanation

Parameter Name                          Type          Description
CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS   list/string   Generation rules for the sharding identifier of event data: either a list of rules (see Global Tags Generation Rules) or a string containing a custom function ID

Simple Example

To collect the events of all workspaces in a single "Event Central Management" workspace:

  1. Open the Launcher console;
  2. In the top-right corner, select Modify Application Configuration;
  3. Locate the func2Config configuration item under the func2 namespace;
  4. Add the configuration:

    CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
      - category: keyevent     # Data category
        fields: df_source      # Field used for sharding: the event's fixed identifier field
    
  5. Configure the Dataway Sinker rules: edit the sinker.json configuration file to define the data routing rules:

{
    "strict": true,
    "rules": [
        {
            "rules": [
                "{ df_source = 'monitor' }"
            ],
            "url": "Workspace Data Reporting URL"
        }
    ]
}

Special Field Explanation

Besides the data's own tags and fields, the following additional extraction fields are available:

Field Name Description
DF_WORKSPACE_UUID Workspace ID
DF_WORKSPACE_NAME Workspace Name
DF_MONITOR_CHECKER_ID Monitor ID
DF_MONITOR_CHECKER_NAME Monitor Name

More Advanced Configuration

Configuration Method   Example                               Description
Direct Extraction      - host                                Extract the host field from the tags or fields of the event data
Rename Field           - src: service; dest: business_type   Rename the service field to business_type
Value Mapping          remap: {order: 电商业务}              Map the original value order to 电商业务 (e-commerce business)
Default Value          default: unknown                      Use the default value when the field does not exist
Fixed Value            - dest: env; fixed: prod              Directly inject the fixed value env=prod
Global Tags Generation Rules

Field Name                    Type                          Default       Description
[#].category                  string/[string]               "*"           Category of the data to match
[#].fields                    string/dict/[string]/[dict]   -             Fields to extract (from both Tags and Fields); supports direct extraction and rule-based extraction
[#].fields[#] (string)        string                        -             Name of the field to extract; the additional extraction fields in Special Field Explanation are also supported
[#].fields[#] (dict)          dict                          -             Extraction rule for the field
[#].fields[#].src             string                        -             Name of the field to extract; the additional extraction fields in Special Field Explanation are also supported
[#].fields[#].dest            string                        Same as src   Field name written to the Header after extraction
[#].fields[#].default         string                        -             Value written to the Header when the specified field does not exist
[#].fields[#].fixed           string                        -             Fixed value written to the Header
[#].fields[#].remap           dict                          null          Mapping applied to the extracted field value
[#].fields[#].remap_default   string                        -             Value used when remap has no entry for the extracted value; if not specified, the original value is used; if set to null, the field is ignored
[#].filter                    dict/string                   null          Data matching filter; supports Tag filtering and filterString filtering
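The field rules above can be read as a small extraction procedure. The Python sketch below is one possible interpretation, not Func's actual code: the function names are hypothetical, and it assumes (the table does not say) that a field is looked up in tags, then fields, then the additional extraction fields, that a missing field without default is skipped, and that default and fixed values are not remapped.

```python
from typing import Any, Dict, List, Optional, Tuple, Union

FieldRule = Union[str, Dict[str, Any]]


def lookup(point: Dict[str, Any], extra_fields: Dict[str, str], name: str) -> Optional[str]:
    # Assumed lookup order: tags, then fields, then the additional extraction fields
    for source in (point.get("tags") or {}, point.get("fields") or {}, extra_fields):
        if name in source:
            return str(source[name])
    return None


def apply_rule(rule: FieldRule, point: Dict[str, Any],
               extra_fields: Dict[str, str]) -> Optional[Tuple[str, str]]:
    if isinstance(rule, str):            # direct extraction, e.g. "host"
        rule = {"src": rule}
    dest = rule.get("dest") or rule.get("src")
    if "fixed" in rule:                  # fixed value, written as-is
        return dest, str(rule["fixed"])
    value = lookup(point, extra_fields, rule["src"])
    if value is None:                    # field missing: use default, else skip (assumption)
        return (dest, str(rule["default"])) if "default" in rule else None
    remap = rule.get("remap")
    if remap is not None and value not in remap:
        if "remap_default" not in rule:
            return dest, value           # no remap_default: keep the original value
        if rule["remap_default"] is None:
            return None                  # remap_default: null -> ignore this field
        return dest, str(rule["remap_default"])
    if remap is not None:
        value = str(remap[value])        # value found in remap: use the mapped value
    return dest, value


def extract_global_tags(fields: Union[FieldRule, List[FieldRule]], point: Dict[str, Any],
                        extra_fields: Dict[str, str]) -> Dict[str, str]:
    # A single string or dict is treated like a one-element list
    rules = fields if isinstance(fields, list) else [fields]
    tags: Dict[str, str] = {}
    for rule in rules:
        pair = apply_rule(rule, point, extra_fields)
        if pair is not None:
            tags[pair[0]] = pair[1]
    return tags


if __name__ == "__main__":
    point = {"measurement": "keyevent",
             "tags": {"host": "web-001", "result": "success"},
             "fields": {"name": "Tom"}}
    extra = {"DF_WORKSPACE_UUID": "wksp_xxxxx"}
    print(extract_global_tags(["host", "name", "DF_WORKSPACE_UUID"], point, extra))
    # {'host': 'web-001', 'name': 'Tom', 'DF_WORKSPACE_UUID': 'wksp_xxxxx'}
```

The results line up with the header examples later in this document; only the lookup order and the skip behavior are guesses.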
Custom Global Tags Generation Function ID

The function ID has the format {script set ID}__{script ID}.{function name}.
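As an illustration of the ID format, the Python sketch below splits a function ID into its three parts. FuncRef and parse_func_id are hypothetical names (Func resolves the ID itself), and the sketch assumes that the script set ID contains no double underscore, so the first "__" is taken as the separator.

```python
from typing import NamedTuple


class FuncRef(NamedTuple):
    script_set_id: str
    script_id: str
    func_name: str


def parse_func_id(func_id: str) -> FuncRef:
    """Split "{script set ID}__{script ID}.{function name}" into its three parts."""
    # The function name follows the last '.'
    script_part, dot, func_name = func_id.rpartition(".")
    # Assumption: the script set ID contains no "__", so the first "__" separates the IDs
    script_set_id, sep, script_id = script_part.partition("__")
    if not (dot and sep and script_set_id and script_id and func_name):
        raise ValueError(f"not a valid function ID: {func_id!r}")
    return FuncRef(script_set_id, script_id, func_name)


if __name__ == "__main__":
    print(parse_func_id("my_script_set__my_script.make_global_tags"))
    # FuncRef(script_set_id='my_script_set', script_id='my_script', func_name='make_global_tags')
```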

The function takes the following parameters:

Parameter           Type     Description
category            string   Data category, e.g., "keyevent"
point               dict     A single data point to process
point.measurement   string   Measurement of the data
point.tags          dict     Tags of the data
point.fields        dict     Fields of the data
extra_fields        dict     Additional extraction fields (see Special Field Explanation)

The function returns a dict whose key-value pairs are written to the X-Global-Tags header.
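A minimal custom function with this signature might look like the Python sketch below. It is hypothetical: it tags event data with the workspace ID and, when present, the monitor ID taken from extra_fields, and it returns None for other categories in the same way as the make_global_tags example at the end of this section. The "unknown" fallback is an assumption chosen for illustration.

```python
from typing import Any, Dict, Optional


def make_global_tags(category: str, point: Dict[str, Any],
                     extra_fields: Dict[str, str]) -> Optional[Dict[str, str]]:
    """Hypothetical custom function: tag events with their workspace and monitor IDs."""
    # Only event data is tagged; nothing is returned for other categories
    if category != "keyevent":
        return None
    tags = {"DF_WORKSPACE_UUID": extra_fields.get("DF_WORKSPACE_UUID", "unknown")}
    # Add the monitor ID only when it is present in the additional extraction fields
    checker_id = extra_fields.get("DF_MONITOR_CHECKER_ID")
    if checker_id:
        tags["DF_MONITOR_CHECKER_ID"] = checker_id
    return tags
```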

Example:

  • point parameter value
{
  "measurement": "keyevent",
  "tags": {
    "host": "web-001",
    "ip"  : "1.2.3.4"
  },
  "fields": {
    "name": "Tom"
  }
}
  • extra_fields parameter value
{
  "DF_WORKSPACE_UUID"      : "wksp_xxxxx",
  "DF_MONITOR_CHECKER_ID"  : "rul_xxxxx",
  "DF_MONITOR_CHECKER_NAME": "Monitor XXXXX",
  "DF_WORKSPACE_NAME"      : "Workspace XXXXX"
}

Generated Header Examples

Examples of adding key=value pairs to the Header

Writing Event Data to the Same Workspace

Extracting fields from events

Example Configuration

CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - category: keyevent
    fields:
      - host
      - name
      - DF_WORKSPACE_UUID

Example Data

{
  "measurement": "keyevent",
  "tags": {
    "host": "web-001",
    "ip"  : "1.2.3.4"
  },
  "fields": {
    "name": "Tom"
  }
}

Example Header Written

X-Global-Tags: host=web-001,name=Tom,DF_WORKSPACE_UUID=wksp_xxxxx

Extracting a single field from events

Example Configuration

CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - category: keyevent
    # A single field can be written as a plain string instead of a list
    fields: host

Example Data

{
  "measurement": "keyevent",
  "tags": {
    "host": "web-001"
  },
  "fields": {
    "name": "Tom"
  }
}

Example Header Written

X-Global-Tags: host=web-001

Writing All Data to the Same Workspace

If category is omitted, the rule applies to data of all categories.

Example Configuration

CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - fields: DF_WORKSPACE_UUID

Example Data

{
  "measurement": "keyevent",
  "tags": {
    "host": "web-001"
  },
  "fields": {
    "name": "Tom"
  }
}

Example Header Written

X-Global-Tags: DF_WORKSPACE_UUID=wksp_xxxxx

Other Cases

Changing the field name when extracting fields

Example Configuration

CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - fields:
    - src : host
      dest: HOST

Example Data

{
  "measurement": "keyevent",
  "tags": {
    "host": "web-001"
  },
  "fields": {
    "name": "Tom"
  }
}

Example Header Written

X-Global-Tags: HOST=web-001

Mapping field values when extracting fields

Example Configuration

CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - fields:
    - src : result
      remap:
        OK     : ok
        success: ok
        failed : error
        failure: error
        timeout: error
      remap_default: unknown

Example Data

{
  "measurement": "keyevent",
  "tags": {
    "result": "success"
  },
  "fields": {
    "name": "Tom"
  }
}

Example Header Written

X-Global-Tags: result=ok

Using default values when extracting fields

Example Configuration

CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - fields:
    - src    : result
      default: unknown

Example Data

{
  "measurement": "keyevent",
  "tags": {
    "host": "web-001"
  },
  "fields": {
    "name": "Tom"
  }
}

Example Header Written

X-Global-Tags: result=unknown

Writing fixed values

Example Configuration

CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - fields:
    - dest : app
      fixed: dataPlatform

Example Data

{
  "measurement": "keyevent",
  "tags": {
    "host": "web-001"
  },
  "fields": {
    "name": "Tom"
  }
}

Example Header Written

X-Global-Tags: app=dataPlatform

Using Tag method to match data

Example Configuration

CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - fields: host
    filter:
      service: app-*
  - fields: client_ip
    filter:
      service: web-*

Example Data

{
  "measurement": "keyevent",
  "tags": {
    "host"     : "app-001",
    "client_ip": "1.2.3.4",
    "service"  : "app-user"
  },
  "fields": {
    "name": "Tom"
  }
}

Example Header Written

X-Global-Tags: host=app-001
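The Tag filter above can be approximated locally with shell-style wildcard matching. In the Python sketch below, the function names are hypothetical, "*" is assumed to behave like fnmatch's wildcard, and a filter with several tags is assumed to require all of them to match; Func's real filter semantics may differ.

```python
from fnmatch import fnmatchcase
from typing import Dict, List


def tag_filter_matches(tag_filter: Dict[str, str], tags: Dict[str, str]) -> bool:
    """True when every filtered tag exists and matches its wildcard pattern."""
    return all(
        key in tags and fnmatchcase(str(tags[key]), pattern)
        for key, pattern in tag_filter.items()
    )


def select_fields(rules: List[dict], tags: Dict[str, str]) -> List[str]:
    # Keep the "fields" of every rule whose filter matches the data's tags
    return [rule["fields"] for rule in rules if tag_filter_matches(rule["filter"], tags)]


if __name__ == "__main__":
    rules = [
        {"fields": "host", "filter": {"service": "app-*"}},
        {"fields": "client_ip", "filter": {"service": "web-*"}},
    ]
    tags = {"host": "app-001", "client_ip": "1.2.3.4", "service": "app-user"}
    print(select_fields(rules, tags))  # ['host']
```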

Using filterString method to match data

Example Configuration

CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - fields: host
    filter: 'service:app-*'
  - fields: client_ip
    filter: 'service:web-*'

Example Data

{
  "measurement": "keyevent",
  "tags": {
    "host"     : "app-001",
    "client_ip": "1.2.3.4",
    "service"  : "app-user"
  },
  "fields": {
    "name": "Tom"
  }
}

Example Header Written

X-Global-Tags: host=app-001
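A simplified reading of the filterString form is sketched below in Python. It handles only a single key:pattern condition such as 'service:app-*' and reuses shell-style wildcard matching; the function names are hypothetical, and whatever further syntax the real filterString format supports is not covered.

```python
from fnmatch import fnmatchcase
from typing import Dict, Tuple


def parse_filter_string(filter_string: str) -> Tuple[str, str]:
    """Split a single condition such as 'service:app-*' into (tag key, pattern)."""
    key, sep, pattern = filter_string.partition(":")
    if not sep or not key.strip():
        raise ValueError(f"expected 'key:pattern', got {filter_string!r}")
    return key.strip(), pattern.strip()


def filter_string_matches(filter_string: str, tags: Dict[str, str]) -> bool:
    # Assumption: the pattern uses shell-style wildcards, as in the Tag filter sketch
    key, pattern = parse_filter_string(filter_string)
    return key in tags and fnmatchcase(str(tags[key]), pattern)
```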

Using a custom function to extract prefixes and suffixes of event fields

Example Configuration

CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS: my_script_set__my_script.make_global_tags

Example Function Located in Script Set my_script_set, Script my_script

def make_global_tags(category, point, extra_fields):
    # Only process event data; nothing is returned for other categories
    if category != 'keyevent':
        return None

    global_tags = {}

    # Look up name and region in the data's fields first, then in its tags
    fields = point.get('fields') or {}
    tags   = point.get('tags')   or {}
    name   = fields.get('name')   or tags.get('name')
    region = fields.get('region') or tags.get('region')

    # Name prefix: the part before the first '-' (e.g. 'Tom-Jerry' -> 'Tom')
    if name:
        global_tags['name_prefix'] = str(name).split('-')[0]

    # Region suffix: the part after the last '-' (e.g. 'cn-shanghai' -> 'shanghai')
    if region:
        global_tags['region_suffix'] = str(region).split('-')[-1]

    # The returned key-value pairs are written to X-Global-Tags
    return global_tags

Example Data

{
  "measurement": "keyevent",
  "tags": {
    "region"   : "cn-shanghai",
    "service"  : "app-user"
  },
  "fields": {
    "name": "Tom-Jerry"
  }
}

Example Header Written

X-Global-Tags: name_prefix=Tom,region_suffix=shanghai

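Event reporting example (assuming the rules from More Advanced Configuration: host is extracted, service is renamed to business_type with order mapped to 电商业务, and DF_WORKSPACE_UUID is extracted):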

{
  "measurement": "keyevent",
  "tags": { "host": "web-01", "service": "order" },
  "fields": { "message": "User order placement error" }
}

Generated HTTP Header:

X-Global-Tags: host=web-01,business_type=电商业务,DF_WORKSPACE_UUID=wksp_123

Dataway Sinker Rule Configuration

Rule File Example (sinker.json)

{
  "strict": false,
  "rules": [
    {
      "rules": ["{ business_type = '电商业务' }"],
      "url": "https://kodo.truewatch.com?token=tkn_ecommerce_workspace_token"
    },
    {
      "rules": ["{ DF_WORKSPACE_UUID = 'wksp_123' }"],
      "url": "https://backup.truewatch.com?token=tkn_backup_workspace_token"
    },
    {
      "rules": ["*"],
      "url": "https://default.truewatch.com?token=tkn_default_workspace_token"
    }
  ]
}

The first rule matches e-commerce business events, the second matches events from the specified workspace, and the third ("*") is the default rule, which must exist. JSON does not allow comments, so keep such notes outside sinker.json.
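To reason about which workspace an event would reach, the rule file can be emulated locally. The Python sketch below is a simplified emulation, not Dataway's implementation: it supports only { key = 'value' } conditions, and it assumes that conditions inside one entry are OR-ed, that an event is sent to every matching entry, and that the "*" entry is used only when nothing else matches. Strict mode is not modeled, and the function names are hypothetical.

```python
import re
from typing import Dict, List

# Matches only simple equality conditions such as "{ business_type = '电商业务' }"
EQ_RULE = re.compile(r"^\{\s*(\w+)\s*=\s*'([^']*)'\s*\}$")


def rule_matches(rule: str, tags: Dict[str, str]) -> bool:
    match = EQ_RULE.match(rule.strip())
    if match is None:
        raise ValueError(f"rule not supported by this sketch: {rule!r}")
    key, value = match.groups()
    return tags.get(key) == value


def route(config: dict, tags: Dict[str, str]) -> List[str]:
    """Return the URLs an event would be sent to under this sketch's assumptions."""
    matched: List[str] = []
    default_urls: List[str] = []
    for entry in config["rules"]:
        conditions = entry["rules"]
        if conditions == ["*"]:
            default_urls.append(entry["url"])   # default rule, kept as a fallback
        elif any(rule_matches(c, tags) for c in conditions):
            matched.append(entry["url"])        # assumption: conditions are OR-ed
    return matched or default_urls
```

Feeding it the tags parsed from an X-Global-Tags value shows whether an event falls through to the default rule.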

Rule Syntax Explanation

Operator Example Description
= { env = 'prod' } Exact match
!= { env != 'test' } Not equal
in { region in ['cn-east','cn-north'] } Multi-value match
match { host match 'web-.*' } Regular-expression match
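The semantics of the =, != and in operators can be checked against a tag dict with the Python sketch below. It takes conditions that are already parsed into (key, operator, operand) rather than parsing rule strings, treats a missing tag as "not equal" (an assumption), and leaves out match because its pattern syntax should be confirmed in the Dataway/Datakit filter documentation. The function name is hypothetical.

```python
from typing import Any, Dict, Optional


def evaluate(tags: Dict[str, str], key: str, op: str, operand: Any) -> bool:
    """Evaluate one pre-parsed condition such as ("env", "=", "prod") against a tag dict."""
    value: Optional[str] = tags.get(key)
    if op == "=":
        return value == operand
    if op == "!=":
        # Assumption: a missing tag counts as "not equal"
        return value != operand
    if op == "in":
        # operand is a list of accepted values, e.g. ['cn-east', 'cn-north']
        return value in operand
    raise ValueError(f"operator not covered by this sketch: {op!r}")
```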

Datakit-Side Configuration

Basic Configuration

# /usr/local/datakit/conf.d/datakit.conf
[dataway]
  # Enable Sinker function
  enable_sinker = true
  # Fields used as the sharding basis (up to 3)
  global_customer_keys = ["host", "env"]

Notes

  • Field Type Limitation: Only string fields are supported (all Tag values are strings)
  • Binary Data Support: Binary data such as Session Replay and Profiling can also be sharded
  • Performance Impact: Each additional sharding field increases memory usage by about 5%

Impact of Global Tags

1. Global Tag Example

# datakit.conf
[election.tags]
    cluster = "cluster-A"  # Global election Tag
[global_tags]
    region = "cn-east"     # Global host Tag

2. Sharding Identifier Merge Logic

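Assume that both cluster and region are configured as sharding keys (global_customer_keys), and that the event data contains the following Tags:

{
  "tags": { "cluster": "cluster-B", "app": "payment" }
}

Final Sharding Identifier:

X-Global-Tags: cluster=cluster-B,region=cn-east

The event's own cluster value (cluster-B) overrides the global election Tag (cluster-A); region, which the event does not carry, comes from the global host Tag; app is not a sharding key and is therefore not included.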
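The merge behavior in this example can be reproduced with the Python sketch below. It is not Datakit's code: the function name is hypothetical, and it assumes, as the example implies, that global Tags fill in missing keys, that the data's own Tags win on conflicts, and that only the configured sharding keys (here cluster and region) end up in the header.

```python
from typing import Dict, List


def build_sharding_tags(point_tags: Dict[str, str], global_tags: Dict[str, str],
                        customer_keys: List[str]) -> Dict[str, str]:
    """Merge global Tags with the data's own Tags and keep only the sharding keys."""
    merged = dict(global_tags)   # start from the global (election/host) Tags
    merged.update(point_tags)    # the data's own Tags take precedence on conflicts
    return {key: merged[key] for key in customer_keys if key in merged}


if __name__ == "__main__":
    tags = build_sharding_tags(
        {"cluster": "cluster-B", "app": "payment"},
        {"cluster": "cluster-A", "region": "cn-east"},
        ["cluster", "region"],
    )
    print(",".join(f"{k}={v}" for k, v in tags.items()))  # cluster=cluster-B,region=cn-east
```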

Extended Explanation: Sharding of Other Data Types

1. Custom Sharding Rules

For non-event data (e.g., logging, metric), specify the corresponding category to shard it:

# Func configuration example: Process logging data
CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - category: logging
    fields:
      - src: log_level
        remap:
          error: Critical Error
          warn: General Warning
      - service
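Which rule blocks apply to a given category follows from the category row of the rules table (a string or a list, default "*", omitted meaning all data). The Python sketch below expresses that selection step under those readings; the function names are hypothetical and the field extraction itself is not shown.

```python
from typing import List, Optional, Union

Category = Optional[Union[str, List[str]]]


def category_matches(rule_category: Category, category: str) -> bool:
    """Decide whether a generation rule applies to data of the given category."""
    if rule_category is None:           # category omitted: the rule applies to all data
        return True
    allowed = [rule_category] if isinstance(rule_category, str) else list(rule_category)
    return "*" in allowed or category in allowed


def rules_for(rules: List[dict], category: str) -> List[dict]:
    # Keep the rule blocks, in order, whose category covers this data
    return [rule for rule in rules if category_matches(rule.get("category"), category)]
```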

2. General Principles

  • Isolated Configuration: Use a separate configuration block for each data category (keyevent/logging/metric)
  • Field Concision: Use no more than 3 sharding identifiers per data category
  • Avoid Conflicts: Give the sharding fields of different categories different names

Troubleshooting

Common Issues

Phenomenon               Troubleshooting Steps
Sharding not effective   1. Check the Dataway logs: grep 'sinker reload'
                         2. Use curl -v to verify the Header
                         3. Check the priority of the Sinker rules
Partial data loss        1. Confirm whether strict mode is enabled
                         2. Check whether the default rule exists
Identifier not injected  1. Verify the syntax of the Func configuration
                         2. Check whether the field is of string type

Diagnostic Commands

# View Dataway sharding statistics
curl http://localhost:9528/metrics | grep sinker_requests_total
# Manually test sharding rules
curl -X POST -H "X-Global-Tags: business_type=电商业务" http://dataway/v1/write/keyevent