Event Data Sharding Practice: Implementation Based on Dataway Sink¶
This document explains how to shard event data (keyevent) by combining DataFlux Func HTTP Header injection with Dataway Sinker rule configuration. With this solution, you can route event data with different business attributes and environment characteristics to specified workspaces.
Solution Principle¶
Data Sharding Flow¶
Core Mechanism Explanation¶
- DataFlux Func Side Injection Identification: When event data is reported, Func dynamically generates the X-Global-Tags Header from its configuration. The Header contains the key-value pairs required for sharding (e.g., env=prod).
- Dataway Routing Matching: Dataway forwards events carrying specific identifiers to the corresponding workspace based on the rules defined in sinker.json.
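As a rough illustration of the injection step, the sketch below serializes a set of sharding tags into one Header value. The helper name `build_global_tags_header` is hypothetical, and the comma-separated `key=value` layout is an assumption made for this sketch rather than something this page specifies.

```python
def build_global_tags_header(tags):
    """Serialize sharding tags into a single header value.

    Assumption: the value is a comma-separated list of key=value pairs,
    for example "env=prod,host=web-001".
    """
    parts = []
    for key, value in tags.items():
        # Skip empty values so the header never carries a dangling "key=".
        if value is None or value == "":
            continue
        parts.append(f"{key}={value}")
    return ",".join(parts)


headers = {
    "X-Global-Tags": build_global_tags_header({"env": "prod", "host": "web-001"})
}
print(headers)
```

Dataway would then compare these key-value pairs against the rules in sinker.json.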
Dataway Configuration¶
Before using this feature, ensure that Dataway has been deployed and the Sinker sharding feature is enabled.
For Sinker configuration, refer to: Dataway Sinker Configuration Guide.
Note: The Dataway used by DataFlux Func in the Deployment Plan is located in the utils namespace under internal-dataway.
DataFlux Func Configuration¶
Header Injection X-Global-Tags¶
Core Parameter Explanation¶
Parameter Name | Type | Description |
---|---|---|
CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS | list/string | Defines the sharding identifier generation rules for event data |
Simple Example¶
Unify all workspace events into the "Event Central Management" workspace:
- Access the Launcher console;
- Go to the top right corner > Modify Application Configuration;
- Locate the func2Config configuration item under the func2 namespace;
- Add the configuration:
- Configure Dataway Sinker rules: Modify the sinker.json configuration file to set data routing rules:
{
  "strict": true,
  "rules": [
    {
      "rules": [
        "{ df_source = 'monitor' }"
      ],
      "url": "Workspace Data Reporting URL"
    }
  ]
}
Special Field Explanation¶
Field Name | Description |
---|---|
DF_WORKSPACE_UUID | Workspace ID |
DF_WORKSPACE_NAME | Workspace Name |
DF_MONITOR_CHECKER_ID | Monitor ID |
DF_MONITOR_CHECKER_NAME | Monitor Name |
More Advanced Configuration¶
Configuration Method | Example | Description |
---|---|---|
Direct Extraction | - host | Extract the host field from the tags or fields of the event data |
Rename Field | - src:service; dest:business_type | Rename the service field to business_type |
Value Mapping | remap:{order:电商业务} | Map the original value order to 电商业务 ("e-commerce business") |
Default Value | default:unknown | Use the default value when the field does not exist |
Fixed Value | - dest:env; fixed:prod | Directly inject the fixed value env=prod |
Global Tags Generation Rules¶
Field Name | Type | Default Value | Description |
---|---|---|---|
[#].category | string/[string] | "*" | Match the data's Category |
[#].fields | string/dict/[string]/[dict] | - | Extract data fields (including Tags and Fields); supports direct extraction and rule extraction |
[#].fields[#] | string | - | Name of the field to extract; the additional extraction fields are also supported (see Special Field Explanation) |
[#].fields[#] | dict | - | Field extraction rule |
[#].fields[#].src | string | - | Name of the field to extract; the additional extraction fields are also supported (see Special Field Explanation) |
[#].fields[#].dest | string | Same as src | Field name written to the Header after extraction |
[#].fields[#].default | string | - | Default value written to the Header when the specified field does not exist |
[#].fields[#].fixed | string | - | Fixed value written to the Header |
[#].fields[#].remap | dict | null | Mapping applied to the extracted field value |
[#].fields[#].remap_default | string | - | Default value used when the extracted value has no entry in the mapping. If not specified, the original value is used. If specified as null, the field is ignored |
[#].filter | dict/string | null | Data matching filter. Supports Tag filtering and filterString filtering |
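The src, dest, default, and fixed options in the table can be read as a small per-field procedure. The following is a minimal sketch of that reading, not the DataFlux Func implementation: the function names `lookup` and `apply_field_rules` are hypothetical, and the lookup order (tags, then fields, then the special fields) is an assumption. remap and filter are left out to keep it short.

```python
def lookup(point, extra_fields, name):
    """Find a value by name in tags, then fields, then the special fields.

    The lookup order is an assumption made for this sketch.
    """
    for source in (point.get("tags", {}), point.get("fields", {}), extra_fields):
        if name in source:
            return source[name]
    return None


def apply_field_rules(rules, point, extra_fields=None):
    """Turn a list of field rules (strings or dicts) into a dict of tags."""
    extra_fields = extra_fields or {}
    result = {}
    for rule in rules:
        # A bare string is shorthand for {"src": <name>}.
        if isinstance(rule, str):
            rule = {"src": rule}
        # dest falls back to src, as in the "Same as src" default above.
        dest = rule.get("dest") or rule.get("src")
        if dest is None:
            continue
        if "fixed" in rule:
            value = rule["fixed"]
        else:
            value = lookup(point, extra_fields, rule["src"]) if "src" in rule else None
            if value is None:
                value = rule.get("default")
        # Fields with no value and no default are simply not written.
        if value is not None:
            result[dest] = str(value)
    return result


point = {
    "measurement": "keyevent",
    "tags": {"host": "web-001", "service": "order"},
    "fields": {"name": "Tom"},
}
rules = [
    "host",
    {"src": "service", "dest": "business_type"},
    {"src": "zone", "default": "unknown"},
    {"dest": "env", "fixed": "prod"},
]
print(apply_field_rules(rules, point))
```

With this sample point, the four rules yield host, business_type, zone (from the default), and env (from the fixed value).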
Custom Global Tags Generation Function ID¶
Function ID format is {script set ID}__{script ID}.{function name}
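To make the ID format concrete, here is a small sketch that composes and splits such an ID. Both helper names are hypothetical, and the split assumes that the script set ID itself does not contain a double underscore.

```python
def build_func_id(script_set_id, script_id, func_name):
    """Compose "{script set ID}__{script ID}.{function name}"."""
    return f"{script_set_id}__{script_id}.{func_name}"


def split_func_id(func_id):
    """Split a function ID back into its three parts.

    Assumption: the script set ID contains no "__" of its own.
    """
    module, _, func_name = func_id.rpartition(".")
    script_set_id, separator, script_id = module.partition("__")
    if not (separator and script_set_id and script_id and func_name):
        raise ValueError(f"not a valid function ID: {func_id!r}")
    return script_set_id, script_id, func_name


func_id = build_func_id("my_script_set", "my_script", "make_global_tags")
print(func_id)
print(split_func_id(func_id))
```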
Function definition is as follows:
Parameter | Type | Description |
---|---|---|
category | string | Category, e.g., "keyevent" |
point | dict | Single piece of data to be processed |
point.measurement | string | Data measurement |
point.tags | dict | Data tags content |
point.fields | dict | Data fields content |
extra_fields | dict | Additional extraction fields (see Special Field Explanation) |
Example:
- point parameter value
{
  "measurement": "keyevent",
  "tags": {
    "host": "web-001",
    "ip"  : "1.2.3.4"
  },
  "fields": {
    "name": "Tom"
  }
}
- extra_fields parameter value
{
  "DF_WORKSPACE_UUID"      : "wksp_xxxxx",
  "DF_MONITOR_CHECKER_ID"  : "rul_xxxxx",
  "DF_MONITOR_CHECKER_NAME": "Monitor XXXXX",
  "DF_WORKSPACE_NAME"      : "Workspace XXXXX"
}
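Given these two parameter values, a custom function could look like the sketch below. The function name `route_by_workspace` and the choice of returned keys are illustrative assumptions; returning a dict of key-value pairs is assumed here as the way to supply the tags.

```python
def route_by_workspace(category, point, extra_fields):
    """Return sharding tags for one event, or None for other categories."""
    if category != "keyevent":
        return None

    tags = {}
    # Take the host from the event's own tags when present.
    host = point.get("tags", {}).get("host")
    if host:
        tags["host"] = host
    # Take the workspace ID from the additional extraction fields.
    workspace_uuid = extra_fields.get("DF_WORKSPACE_UUID")
    if workspace_uuid:
        tags["DF_WORKSPACE_UUID"] = workspace_uuid
    return tags


point = {
    "measurement": "keyevent",
    "tags": {"host": "web-001", "ip": "1.2.3.4"},
    "fields": {"name": "Tom"},
}
extra_fields = {
    "DF_WORKSPACE_UUID": "wksp_xxxxx",
    "DF_MONITOR_CHECKER_ID": "rul_xxxxx",
    "DF_MONITOR_CHECKER_NAME": "Monitor XXXXX",
    "DF_WORKSPACE_NAME": "Workspace XXXXX",
}
print(route_by_workspace("keyevent", point, extra_fields))
# {'host': 'web-001', 'DF_WORKSPACE_UUID': 'wksp_xxxxx'}
```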
Generation Effect Verification¶
Example of adding key:value in the Header
Writing Event Data to the Same Workspace¶
Extracting fields from events
Example Configuration
CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - category: keyevent
    fields:
      - host
      - name
      - DF_WORKSPACE_UUID
Example Data
{
  "measurement": "keyevent",
  "tags": {
    "host": "web-001",
    "ip"  : "1.2.3.4"
  },
  "fields": {
    "name": "Tom"
  }
}
Example Header Written
Extracting a single field from events
Example Configuration
CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - category: keyevent
    # A single field can be abbreviated to a plain string
    fields: host
Example Data
Example Header Written
Writing All Data to the Same Workspace¶
Omitting category means that all data is processed.
Example Configuration
Example Data
Example Header Written
Other Cases¶
Changing the field name when extracting fields
Example Configuration
Example Data
Example Header Written
Mapping field values when extracting fields
Example Configuration
CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - fields:
      - src: result
        remap:
          OK     : ok
          success: ok
          failed : error
          failure: error
          timeout: error
        remap_default: unknown
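The interplay of remap and remap_default in this configuration can be sketched as follows. This is an illustrative reading of the rule table, not the actual implementation: `remap_value` is a hypothetical helper, and a sentinel object distinguishes "remap_default not specified" from "remap_default set to null".

```python
_UNSET = object()  # marks "remap_default was not specified"


def remap_value(value, remap, remap_default=_UNSET):
    """Map an extracted value, following the remap_default rules.

    Returns None when the field should be ignored.
    """
    if value in remap:
        return remap[value]
    if remap_default is _UNSET:
        # Not specified: the original value is kept.
        return value
    # Specified: use it; None (null) tells the caller to drop the field.
    return remap_default


result_map = {
    "OK": "ok",
    "success": "ok",
    "failed": "error",
    "failure": "error",
    "timeout": "error",
}
print(remap_value("success", result_map, "unknown"))  # ok
print(remap_value("pending", result_map, "unknown"))  # unknown
print(remap_value("pending", result_map))             # pending
print(remap_value("pending", result_map, None))       # None
```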
Example Data
Example Header Written
Using default values when extracting fields
Example Configuration
Example Data
Example Header Written
Writing fixed values
Example Configuration
Example Data
Example Header Written
Using Tag method to match data
Example Configuration
CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - fields: host
    filter:
      service: app-*
  - fields: client_ip
    filter:
      service: web-*
Example Data
{
  "measurement": "keyevent",
  "tags": {
    "host"     : "app-001",
    "client_ip": "1.2.3.4",
    "service"  : "app-user"
  },
  "fields": {
    "name": "Tom"
  }
}
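One way to picture how the Tag filter selects a rule for this data is the sketch below. It assumes shell-style wildcard semantics for patterns such as app-* and that every filter entry must match; both are assumptions, and the helper names are hypothetical.

```python
from fnmatch import fnmatchcase


def matches_tag_filter(point, tag_filter):
    """Return True when every filter entry matches the point's tags."""
    tags = point.get("tags", {})
    for key, pattern in tag_filter.items():
        value = tags.get(key)
        if value is None or not fnmatchcase(str(value), pattern):
            return False
    return True


def select_fields(config, point):
    """Collect the field names of all rule blocks whose filter matches."""
    selected = []
    for block in config:
        if matches_tag_filter(point, block.get("filter") or {}):
            fields = block["fields"]
            selected.extend([fields] if isinstance(fields, str) else fields)
    return selected


config = [
    {"fields": "host", "filter": {"service": "app-*"}},
    {"fields": "client_ip", "filter": {"service": "web-*"}},
]
point = {
    "measurement": "keyevent",
    "tags": {"host": "app-001", "client_ip": "1.2.3.4", "service": "app-user"},
    "fields": {"name": "Tom"},
}
print(select_fields(config, point))  # ['host']
```

Under these assumptions only the first block applies, because service is app-user, so host is the only field extracted.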
Example Header Written
Using filterString method to match data
Example Configuration
CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - fields: host
    filter: 'service:app-*'
  - fields: client_ip
    filter: 'service:web-*'
Example Data
{
  "measurement": "keyevent",
  "tags": {
    "host"     : "app-001",
    "client_ip": "1.2.3.4",
    "service"  : "app-user"
  },
  "fields": {
    "name": "Tom"
  }
}
Example Header Written
Custom function method to extract event field prefixes and suffixes
Example Configuration
Example Function Located in Script Set my_script_set, Script my_script
def make_global_tags(category, point, extra_fields):
    # Only process event type data
    if category != 'keyevent':
        return

    global_tags_list = {}

    # Get name, region fields from data's fields or tags
    name = point['fields'].get('name') or point['tags'].get('name')
    region = point['fields'].get('region') or point['tags'].get('region')

    # Get name prefix
    if name:
        prefix = str(name).split('-')[0]
        global_tags_list['name_prefix'] = prefix

    # Get region suffix
    if region:
        suffix = str(region).split('-').pop()
        global_tags_list['region_suffix'] = suffix

    # Return
    return global_tags_list
Example Data
{
  "measurement": "keyevent",
  "tags": {
    "region" : "cn-shanghai",
    "service": "app-user"
  },
  "fields": {
    "name": "Tom-Jerry"
  }
}
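The example function can be checked locally against this data before it is deployed. The snippet repeats a condensed copy of the function so that it runs on its own; the empty extra_fields dict is a simplification for the check.

```python
def make_global_tags(category, point, extra_fields):
    # Condensed copy of the example function, same behavior.
    if category != "keyevent":
        return None
    global_tags = {}
    name = point["fields"].get("name") or point["tags"].get("name")
    region = point["fields"].get("region") or point["tags"].get("region")
    if name:
        global_tags["name_prefix"] = str(name).split("-")[0]
    if region:
        global_tags["region_suffix"] = str(region).split("-").pop()
    return global_tags


point = {
    "measurement": "keyevent",
    "tags": {"region": "cn-shanghai", "service": "app-user"},
    "fields": {"name": "Tom-Jerry"},
}
result = make_global_tags("keyevent", point, {})
print(result)  # {'name_prefix': 'Tom', 'region_suffix': 'shanghai'}
```

The name comes from fields and the region from tags, so the returned pairs are name_prefix=Tom and region_suffix=shanghai.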
Example Header Written
Event Reporting Example:
{
  "measurement": "keyevent",
  "tags": { "host": "web-01", "service": "order" },
  "fields": { "message": "User order placement exception" }
}
Generated HTTP Header:
Dataway Sinker Rule Configuration¶
Rule File Example (sinker.json)¶
{
  "strict": false,
  "rules": [
    {
      "rules": ["{ business_type = '电商业务' }"],
      "url": "https://kodo.truewatch.com?token=tkn_电商空间令牌"
    },
    {
      "rules": ["{ DF_WORKSPACE_UUID = 'wksp_123' }"],
      "url": "https://backup.truewatch.com?token=tkn_备份空间令牌"
    },
    {
      "rules": ["*"],
      "url": "https://default.truewatch.com?token=tkn_默认空间令牌"
    }
  ]
}
The first rule matches e-commerce business events (business_type is 电商业务, "e-commerce business"), the second matches the specified workspace, and the last one ("*") is the default rule, which must exist. The token placeholders stand for the e-commerce, backup, and default workspace tokens respectively.
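Before reloading Dataway, a rule file of this shape can be sanity-checked with a few lines of Python. This is a minimal sketch: `validate_sinker_config` is a hypothetical helper, it only checks the keys that appear in the example, and the default-rule check follows the note that a default rule must exist. Standard JSON has no comment syntax, so a file that still carries `//` annotations is reported as invalid.

```python
import json


def validate_sinker_config(text, require_default=True):
    """Return a list of human-readable problems found in sinker.json text."""
    try:
        config = json.loads(text)
    except json.JSONDecodeError as exc:
        # "//" annotations also end up here, since JSON has no comments.
        return [f"not valid JSON: {exc.msg} (line {exc.lineno})"]
    if not isinstance(config, dict):
        return ["top level should be an object"]

    problems = []
    if not isinstance(config.get("strict"), bool):
        problems.append("'strict' should be true or false")
    entries = config.get("rules")
    if not isinstance(entries, list) or not entries:
        return problems + ["'rules' should be a non-empty list"]

    has_default = False
    for index, entry in enumerate(entries):
        if not isinstance(entry, dict):
            problems.append(f"rule {index}: should be an object")
            continue
        conditions = entry.get("rules")
        if not isinstance(conditions, list) or not conditions:
            problems.append(f"rule {index}: 'rules' should be a non-empty list")
        elif "*" in conditions:
            has_default = True
        if not entry.get("url"):
            problems.append(f"rule {index}: missing 'url'")
    if require_default and not has_default:
        problems.append("no default rule ('*') found")
    return problems


sample = json.dumps({
    "strict": False,
    "rules": [
        {"rules": ["{ env = 'prod' }"], "url": "https://example.invalid?token=tkn_a"},
        {"rules": ["*"], "url": "https://example.invalid?token=tkn_b"},
    ],
})
print(validate_sinker_config(sample))  # []
```

The example.invalid URLs and tkn_a / tkn_b tokens are placeholders, not real endpoints.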
Rule Syntax Explanation¶
Operator | Example | Description |
---|---|---|
= | { env = 'prod' } | Exact match |
!= | { env != 'test' } | Not equal |
in | { region in ['cn-east','cn-north'] } | Multi-value match |
match | { host match 'web-*' } | Wildcard match |
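To experiment with these operators offline, a toy evaluator such as the one below can help. It is not Dataway's parser: `rule_matches` is a hypothetical helper that handles only single conditions of the forms shown in the table, treats `match` as a shell-style wildcard as the table describes, and treats a missing key as "not equal". Dataway's actual matching may differ.

```python
import ast
import re
from fnmatch import fnmatchcase

# One condition of the form "{ key operator literal }".
_CONDITION = re.compile(r"^\{\s*(\w+)\s*(!=|=|in|match)\s*(.+?)\s*\}$")


def rule_matches(rule, tags):
    """Evaluate one rule string against a dict of tag values."""
    rule = rule.strip()
    if rule == "*":
        return True  # default rule matches everything
    parsed = _CONDITION.match(rule)
    if parsed is None:
        raise ValueError(f"unsupported rule: {rule!r}")
    key, operator, literal = parsed.groups()
    # The quoted strings and lists in the table are valid Python literals.
    expected = ast.literal_eval(literal)
    actual = tags.get(key)
    if operator == "=":
        return actual == expected
    if operator == "!=":
        return actual != expected
    if operator == "in":
        return actual in expected
    # operator == "match": wildcard comparison, assumed glob-style.
    return actual is not None and fnmatchcase(actual, expected)


tags = {"env": "prod", "region": "cn-east", "host": "web-01"}
for rule in (
    "{ env = 'prod' }",
    "{ env != 'test' }",
    "{ region in ['cn-east','cn-north'] }",
    "{ host match 'web-*' }",
):
    print(rule, "->", rule_matches(rule, tags))
```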
Datakit-Side Configuration Explanation¶
Basic Configuration¶
# /usr/local/datakit/conf.d/datakit.conf
[dataway]
# Enable Sinker function
enable_sinker = true
# Define sharding basis fields (up to 3)
global_customer_keys = ["host", "env"]
Notes¶
- Field Type Limitation: Only supports string type fields (all Tag values are strings)
- Binary Data Support: Supports sharding of binary data such as Session Replay, Profiling
- Performance Impact: Each additional sharding field increases memory usage by about 5%
Impact of Global Tags¶
1. Global Tag Example¶
# datakit.conf
[election.tags]
cluster = "cluster-A" # Global election Tag
[global_tags]
region = "cn-east" # Global host Tag
2. Sharding Identifier Merge Logic¶
Assume the event data contains the following Tag:
Final Sharding Identifier:
Extended Explanation: Sharding of Other Data Types¶
1. Custom Sharding Rules¶
For non-event data (e.g., logging, metric), achieve sharding by specifying category:
# Func configuration example: Process logging data
CUSTOM_INTERNAL_DATAWAY_X_GLOBAL_TAGS:
  - category: logging
    fields:
      - src: log_level
        remap:
          error: Critical Error
          warn: General Warning
      - service
2. General Principles¶
- Isolated Configuration: Use independent configuration blocks for different data categories (keyevent/logging/metric)
- Field Concision: The sharding identifiers for a single data category should not exceed 3
- Avoid Conflicts: Different categories' sharding fields should use different names
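These principles lend themselves to a simple check over the parsed configuration. The sketch below is one hypothetical way to do it: `lint_sharding_config` is not part of DataFlux Func, and it takes the configuration as an already-parsed list of rule blocks.

```python
def lint_sharding_config(config, max_fields=3):
    """Check a list of rule blocks against the principles above."""
    warnings = []
    seen = {}  # written field name -> category that first used it
    for block in config:
        category = block.get("category", "*")
        if isinstance(category, list):
            category = ",".join(category)
        fields = block.get("fields", [])
        if isinstance(fields, (str, dict)):
            fields = [fields]  # single-field shorthand

        # The name that ends up in the Header is dest, falling back to src.
        names = []
        for field in fields:
            if isinstance(field, str):
                names.append(field)
            else:
                names.append(field.get("dest") or field.get("src"))

        if len(names) > max_fields:
            warnings.append(
                f"{category}: {len(names)} sharding fields (limit {max_fields})"
            )
        for name in names:
            owner = seen.setdefault(name, category)
            if owner != category:
                warnings.append(f"{name!r} is used by both {owner} and {category}")
    return warnings


config = [
    {"category": "keyevent", "fields": ["host", "service"]},
    {"category": "logging", "fields": [{"src": "log_level"}, "service"]},
]
print(lint_sharding_config(config))
# ["'service' is used by both keyevent and logging"]
```

Here the shared service field triggers the name-conflict warning, while both blocks stay within the three-field limit.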
Troubleshooting¶
Common Issues¶
Symptom | Troubleshooting Steps |
---|---|
Sharding Not Effective | 1. Check Dataway logs: grep 'sinker reload' 2. Use curl -v to verify the Header 3. Check Sinker rule priority |
Partial Data Loss | 1. Confirm the strict mode status 2. Check if the default rule exists |
Identifier Not Injected | 1. Verify Func configuration syntax 2. Check if the field is of string type |