Skip to content

APM Trace 查询

0x01 关键信息

a. 适用场景

APM 链路追踪数据查询、Trace 详情获取、ES 查询调试。

0x02 代码片段

a. 查询 Trace 详情

资源:apm.resources.QueryTraceDetailResource

json
{
  "bk_biz_id": 2,
  "app_name": "bkop",
  "trace_id": "2927858ceff8ed8375e9723891a3eba7",
  "displays": [],
  "query_trace_relation_app": false
}

b. 使用 QueryProxy 查询

python
from apm.core.handlers.query.proxy import QueryProxy

# Build a query proxy; the positional arguments follow the (bk_biz_id, app_name)
# order used by the other QueryProxy call in this document.
proxy = QueryProxy(2, "sandtrpc")
# Time range: 1721665179, 1721751579
# trace_id: dc65d13f38263e1b3e31404810296b78

c. ES DSL 查询示例

python
import time
import datetime
from functools import wraps
from metadata import models
from apm.core.handlers.query.proxy import QueryProxy

def timer(func):
    """Decorate ``func`` so that every call is timed and reported.

    The wrapper prints the elapsed time (rounded to 3 decimals) and, instead
    of the raw result, returns a dict of the form
    ``{"ret": <return value of func>, "cost": <elapsed seconds as float>}``.
    Exceptions raised by ``func`` propagate unchanged (nothing is printed).
    """
    @wraps(func)
    def inner(*args, **kwargs):
        # perf_counter() is monotonic, so the measured cost cannot go negative
        # or jump when the system wall clock is adjusted during the call.
        start: float = time.perf_counter()
        ret = func(*args, **kwargs)
        cost = time.perf_counter() - start
        print(f"[timer] func -> {func.__name__}, cost -> {round(cost, 3)}")
        return {"ret": ret, "cost": cost}
    return inner

@timer
def execute_dsl(_client, _index, _dsl):
    """Run the search body ``_dsl`` against ``_index`` through ``_client``.

    The call is wrapped by ``@timer``, so callers receive a dict holding the
    search response under ``"ret"`` and the elapsed seconds under ``"cost"``.
    """
    search_kwargs = {"index": _index, "doc_type": "_doc", "body": _dsl}
    return _client.search(**search_kwargs)


# Query window: the hour ending now, first computed in whole seconds.
end_time = int(datetime.datetime.now().timestamp())
window_seconds = int(datetime.timedelta(hours=1).total_seconds())
begin_time = end_time - window_seconds
# Scale both bounds by 1e6 before using them in the range filter
# (presumably the indexed `end_time` field is in microseconds - confirm
# against the index mapping).
begin_time *= 1000000
end_time *= 1000000

# Filter clauses: time window, status code and service name.
filters = [
    {"range": {"end_time": {"gt": begin_time, "lte": end_time}}},
    {"terms": {"status.code": ["2"]}},
    {"terms": {"resource.service.name": ["example.greeter"]}},
]

# Only these fields are returned for each hit.
source_fields = [
    "resource.service.name",
    "span_name",
    "trace_id",
    "events.attributes.exception.type",
    "events.name",
    "time",
]

dsl = {
    "query": {"bool": {"filter": filters}},
    "size": 10000,
    "_source": source_fields,
}

bk_biz_id = 60
app_name = "trpc-galileo-sdk-access-demo"
proxy = QueryProxy(bk_biz_id, app_name)

# Look up the ES storage record for the application's trace table, then run
# the DSL against the matching index pattern.
table_id = f"{bk_biz_id}_bkapm.trace_{app_name}"
index_pattern = f"v2_{bk_biz_id}_bkapm_trace_{app_name}_*_*"
storage = models.ESStorage.objects.get(table_id=table_id)
result = execute_dsl(storage.es_client, index_pattern, dsl)

# Inspect the index mapping:
# storage.es_client.indices.get_mapping(index_pattern)

d. Trace 详情处理

python
from core.drf_resource import Resource, api
from apm_web.handlers.trace_handler.base import TraceHandler
from apm_web.trace.diagram.service_topo import trace_data_to_service_topo
from apm_web.trace.diagram.topo import trace_data_to_topo_data

# Example request parameters for fetching and post-processing one trace.
validated_request_data = {
    "bk_biz_id": 101067,
    "app_name": "stress",
    "trace_id": "00f37ae98580b22ed5582a86b8c271c6",
    "displays": [],
    "query_trace_relation_app": False,
    # Placeholder value (marked "fixme" in the literal) - replace before use.
    "bk_username": "fixme:英文名",
}

# Fetch the raw trace detail; only the query-related keys are forwarded
# (bk_username is not sent).
data = api.apm_api.query_trace_detail({
    "bk_biz_id": validated_request_data["bk_biz_id"],
    "app_name": validated_request_data["app_name"],
    "trace_id": validated_request_data["trace_id"],
    "displays": validated_request_data["displays"],
    "query_trace_relation_app": validated_request_data["query_trace_relation_app"],
})

# "enabled_time_alignment" is not set in validated_request_data above, so
# .get() passes None for that argument.
handled_data = TraceHandler.handle_trace(
    validated_request_data["app_name"],
    data["trace_data"],
    validated_request_data["trace_id"],
    data["relation_mapping"],
    validated_request_data.get("displays"),
    validated_request_data.get("enabled_time_alignment"),
)

# Attach the span-level topology (relations and nodes) to the handled result.
topo_data = trace_data_to_topo_data(handled_data["original_data"])
handled_data["topo_relation"] = topo_data["relations"]
handled_data["topo_nodes"] = topo_data["nodes"]

# Merge the service-level topology into the handled result.
service_topo_data = trace_data_to_service_topo(handled_data["original_data"])
handled_data.update(service_topo_data)

# dict.update(None) raises TypeError, so fall back to an empty dict when the
# response has no "options" entry (or it is None).
handled_data.update(data.get("options") or {})