Skip to content

Commit acfc240

Browse files
committed
adding support for filter by tag in SPM
Signed-off-by: Harshil Gupta <[email protected]>
1 parent dc21563 commit acfc240

File tree

7 files changed

+10424
-12
lines changed

7 files changed

+10424
-12
lines changed

cmd/query/app/query_parser.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,16 @@ func (p *queryParser) parseMetricsQueryParams(r *http.Request) (bqp metricstore.
275275
if err != nil {
276276
return bqp, err
277277
}
278+
279+
// Parse tags if present
280+
tags, err := p.parseTags(r.Form[tagParam], r.Form[tagsParam])
281+
if err != nil {
282+
return bqp, err
283+
}
284+
if len(tags) > 0 {
285+
bqp.Tags = tags
286+
}
287+
278288
bqp.EndTime = &endTs
279289
bqp.Lookback = &lookback
280290
bqp.Step = &step

cmd/query/app/query_parser_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,67 @@ func TestParseRepeatedSpanKinds(t *testing.T) {
304304
}, mqp.SpanKinds)
305305
}
306306

307+
// TestParseMetricsQueryWithTags is a table-driven test checking that
// parseMetricsQueryParams fills in the Tags field of the returned parameters
// from the `tag` (repeatable "key:value") and `tags` (JSON object) URL query
// parameters, and that it returns an error for the malformed inputs below.
func TestParseMetricsQueryWithTags(t *testing.T) {
308+
tests := []struct {
309+
name string
310+
// urlStr is the request URL, including the query string under test.
urlStr string
311+
// wantTags is the expected value of Tags; only checked when wantErr is false.
wantTags map[string]string
312+
// wantErr marks cases where parsing is expected to fail.
wantErr bool
313+
}{
314+
{
315+
name: "with simple tag",
316+
urlStr: "x?service=foo&tag=key:value",
317+
wantTags: map[string]string{"key": "value"},
318+
},
319+
{
320+
// Repeating the `tag` parameter is expected to accumulate entries.
name: "with multiple tags",
321+
urlStr: "x?service=foo&tag=key1:value1&tag=key2:value2",
322+
wantTags: map[string]string{"key1": "value1", "key2": "value2"},
323+
},
324+
{
325+
name: "with JSON tags",
326+
urlStr: `x?service=foo&tags={"jsonKey":"jsonValue"}`,
327+
wantTags: map[string]string{"jsonKey": "jsonValue"},
328+
},
329+
{
330+
// Both parameter forms in one request are expected to be merged into one map.
name: "with both simple and JSON tags",
331+
urlStr: `x?service=foo&tag=key:value&tags={"jsonKey":"jsonValue"}`,
332+
wantTags: map[string]string{"key": "value", "jsonKey": "jsonValue"},
333+
},
334+
{
335+
// The value has no ':' separator — presumably why it is rejected; confirm against parseTags.
name: "with malformed tag",
336+
urlStr: "x?service=foo&tag=malformed",
337+
wantErr: true,
338+
},
339+
{
340+
// The JSON value is a number rather than a string; an error is expected.
name: "with malformed JSON tags",
341+
urlStr: `x?service=foo&tags={"jsonKey":1234}`,
342+
wantErr: true,
343+
},
344+
}
345+
346+
for _, tt := range tests {
347+
t.Run(tt.name, func(t *testing.T) {
348+
request, err := http.NewRequest(http.MethodGet, tt.urlStr, http.NoBody)
349+
require.NoError(t, err)
350+
351+
// Only timeNow is set on the parser; no other fields are configured for this test.
parser := &queryParser{
352+
timeNow: time.Now,
353+
}
354+
355+
mqp, err := parser.parseMetricsQueryParams(request)
356+
357+
// For error cases only the presence of an error is asserted, not its message.
if tt.wantErr {
358+
assert.Error(t, err)
359+
return
360+
}
361+
362+
require.NoError(t, err)
363+
assert.Equal(t, tt.wantTags, mqp.Tags)
364+
})
365+
}
366+
}
367+
307368
func TestParameterErrors(t *testing.T) {
308369
ts := initializeTestServer(t)
309370

internal/storage/metricstore/elasticsearch/query_builder.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,15 @@ func (q *QueryBuilder) BuildBoolQuery(params metricstore.BaseQueryParameters, ti
6666
spanKindQuery := elastic.NewTermsQuery("tag."+spanKindField, buildInterfaceSlice(normalizeSpanKinds(params.SpanKinds))...)
6767
boolQuery.Filter(spanKindQuery)
6868

69+
// Add tag filters if present
70+
if len(params.Tags) > 0 {
71+
for k, v := range params.Tags {
72+
tagKey := strings.ReplaceAll(k, ".", q.cfg.Tags.DotReplacement)
73+
tagQuery := elastic.NewTermQuery("tag."+tagKey, v)
74+
boolQuery.Filter(tagQuery)
75+
}
76+
}
77+
6978
// Add additional terms queries if provided
7079
for _, termQuery := range termsQueries {
7180
boolQuery.Filter(termQuery)

internal/storage/metricstore/elasticsearch/query_builder_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,55 @@ func TestBuildBoolQuery(t *testing.T) {
6666
require.Len(t, filterClause, 3) // services, span kinds, time range
6767
}
6868

69+
// TestBuildBoolQueryWithTags checks that BuildBoolQuery, given parameters with
// two entries in Tags, produces a bool query with five filter clauses and that
// two of them are term queries on the fields "tag.environment" and "tag.region".
func TestBuildBoolQueryWithTags(t *testing.T) {
70+
qb := setupTestQB()
71+
params := metricstore.BaseQueryParameters{
72+
ServiceNames: []string{"service1"},
73+
SpanKinds: []string{"server"},
74+
Tags: map[string]string{
75+
"environment": "prod",
76+
"region": "us-east-1",
77+
},
78+
}
79+
80+
boolQuery := qb.BuildBoolQuery(params, commonTimeRange)
81+
require.NotNil(t, boolQuery)
82+
83+
// Serialize the query so its structure can be inspected as nested maps.
src, err := boolQuery.Source()
84+
require.NoError(t, err)
85+
86+
// Unchecked type assertions: an unexpected shape panics and fails the test.
queryMap := src.(map[string]any)
87+
boolClause := queryMap["bool"].(map[string]any)
88+
filterClause := boolClause["filter"].([]any)
89+
90+
require.Len(t, filterClause, 5) // services, span kinds, 2 tags, time range
91+
92+
// Validate the presence of tag filters
93+
tagFiltersFound := 0
94+
for _, filter := range filterClause {
95+
filterMap, ok := filter.(map[string]any)
96+
if !ok {
97+
continue
98+
}
99+
100+
// Only "term" clauses are inspected; any other clause type is ignored here.
if termQuery, ok := filterMap["term"]; ok {
101+
termMap, ok := termQuery.(map[string]any)
102+
if !ok {
103+
continue
104+
}
105+
106+
// NOTE(review): only the field names are checked, not the values ("prod", "us-east-1").
if _, ok := termMap["tag.environment"]; ok {
107+
tagFiltersFound++
108+
}
109+
if _, ok := termMap["tag.region"]; ok {
110+
tagFiltersFound++
111+
}
112+
}
113+
}
114+
115+
require.Equal(t, 2, tagFiltersFound, "Expected to find 2 tag filters")
116+
}
117+
69118
func TestBuildLatenciesAggregation(t *testing.T) {
70119
qb := setupTestQB()
71120
step := time.Minute

internal/storage/metricstore/prometheus/metricstore/reader.go

Lines changed: 64 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ type (
5151
spanKindFilter string
5252
serviceFilter string
5353
rate string
54+
tagFilters []string
5455
}
5556

5657
metricsQueryParams struct {
@@ -134,13 +135,23 @@ func (m MetricsReader) GetLatencies(ctx context.Context, requestParams *metricst
134135
metricName: "service_latencies",
135136
metricDesc: fmt.Sprintf("%.2fth quantile latency, grouped by service", requestParams.Quantile),
136137
buildPromQuery: func(p promQueryParams) string {
138+
// Build filter string including service_name, span_kind, and tags
139+
filters := []string{fmt.Sprintf(`service_name =~ %q`, p.serviceFilter)}
140+
141+
if p.spanKindFilter != "" {
142+
filters = append(filters, p.spanKindFilter)
143+
}
144+
145+
// Add tag filters
146+
filters = append(filters, p.tagFilters...)
147+
148+
filterStr := strings.Join(filters, ", ")
149+
137150
return fmt.Sprintf(
138-
// Note: p.spanKindFilter can be ""; trailing commas are okay within a timeseries selection.
139-
`histogram_quantile(%.2f, sum(rate(%s_bucket{service_name =~ %q, %s}[%s])) by (%s))`,
151+
`histogram_quantile(%.2f, sum(rate(%s_bucket{%s}[%s])) by (%s))`,
140152
requestParams.Quantile,
141153
m.latencyMetricName,
142-
p.serviceFilter,
143-
p.spanKindFilter,
154+
filterStr,
144155
p.rate,
145156
p.groupBy,
146157
)
@@ -177,12 +188,22 @@ func (m MetricsReader) GetCallRates(ctx context.Context, requestParams *metricst
177188
metricName: "service_call_rate",
178189
metricDesc: "calls/sec, grouped by service",
179190
buildPromQuery: func(p promQueryParams) string {
191+
// Build filter string including service_name, span_kind, and tags
192+
filters := []string{fmt.Sprintf(`service_name =~ %q`, p.serviceFilter)}
193+
194+
if p.spanKindFilter != "" {
195+
filters = append(filters, p.spanKindFilter)
196+
}
197+
198+
// Add tag filters
199+
filters = append(filters, p.tagFilters...)
200+
201+
filterStr := strings.Join(filters, ", ")
202+
180203
return fmt.Sprintf(
181-
// Note: p.spanKindFilter can be ""; trailing commas are okay within a timeseries selection.
182-
`sum(rate(%s{service_name =~ %q, %s}[%s])) by (%s)`,
204+
`sum(rate(%s{%s}[%s])) by (%s)`,
183205
m.callsMetricName,
184-
p.serviceFilter,
185-
p.spanKindFilter,
206+
filterStr,
186207
p.rate,
187208
p.groupBy,
188209
)
@@ -211,11 +232,30 @@ func (m MetricsReader) GetErrorRates(ctx context.Context, requestParams *metrics
211232
metricName: "service_error_rate",
212233
metricDesc: "error rate, computed as a fraction of errors/sec over calls/sec, grouped by service",
213234
buildPromQuery: func(p promQueryParams) string {
235+
// Build base filters for all queries (service_name)
236+
baseFilters := []string{fmt.Sprintf(`service_name =~ %q`, p.serviceFilter)}
237+
238+
// Add the status_code filter only to the error-rate numerator; it must come right after service_name to match test expectations
239+
errorFilters := append([]string{}, baseFilters...)
240+
errorFilters = append(errorFilters, `status_code = "STATUS_CODE_ERROR"`)
241+
242+
// Add span_kind filter
243+
if p.spanKindFilter != "" {
244+
baseFilters = append(baseFilters, p.spanKindFilter)
245+
errorFilters = append(errorFilters, p.spanKindFilter)
246+
}
247+
248+
// Add tag filters
249+
baseFilters = append(baseFilters, p.tagFilters...)
250+
errorFilters = append(errorFilters, p.tagFilters...)
251+
252+
errorFilterStr := strings.Join(errorFilters, ", ")
253+
baseFilterStr := strings.Join(baseFilters, ", ")
254+
214255
return fmt.Sprintf(
215-
// Note: p.spanKindFilter can be ""; trailing commas are okay within a timeseries selection.
216-
`sum(rate(%s{service_name =~ %q, status_code = "STATUS_CODE_ERROR", %s}[%s])) by (%s) / sum(rate(%s{service_name =~ %q, %s}[%s])) by (%s)`,
217-
m.callsMetricName, p.serviceFilter, p.spanKindFilter, p.rate, p.groupBy,
218-
m.callsMetricName, p.serviceFilter, p.spanKindFilter, p.rate, p.groupBy,
256+
`sum(rate(%s{%s}[%s])) by (%s) / sum(rate(%s{%s}[%s])) by (%s)`,
257+
m.callsMetricName, errorFilterStr, p.rate, p.groupBy,
258+
m.callsMetricName, baseFilterStr, p.rate, p.groupBy,
219259
)
220260
},
221261
}
@@ -308,11 +348,23 @@ func (m MetricsReader) buildPromQuery(metricsParams metricsQueryParams) string {
308348
if len(metricsParams.SpanKinds) > 0 {
309349
spanKindFilter = fmt.Sprintf(`span_kind =~ %q`, strings.Join(metricsParams.SpanKinds, "|"))
310350
}
351+
352+
// Build tag filters
353+
var tagFilters []string
354+
if len(metricsParams.Tags) > 0 {
355+
for k, v := range metricsParams.Tags {
356+
// Replace dots in key names with underscores for Prometheus compatibility
357+
escapedKey := strings.ReplaceAll(k, ".", "_")
358+
tagFilters = append(tagFilters, fmt.Sprintf(`%s=%q`, escapedKey, v))
359+
}
360+
}
361+
311362
promParams := promQueryParams{
312363
serviceFilter: strings.Join(metricsParams.ServiceNames, "|"),
313364
spanKindFilter: spanKindFilter,
314365
rate: promqlDurationString(metricsParams.RatePer),
315366
groupBy: strings.Join(groupBy, ","),
367+
tagFilters: tagFilters,
316368
}
317369
return metricsParams.buildPromQuery(promParams)
318370
}

internal/storage/v1/api/metricstore/interface.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ type BaseQueryParameters struct {
4545
RatePer *time.Duration
4646
// SpanKinds is the list of span kinds to include (logical OR) in the resulting metrics aggregation.
4747
SpanKinds []string
48+
// Tags is a map of tag keys and values to filter the metrics by.
49+
Tags map[string]string
4850
}
4951

5052
// LatenciesQueryParameters contains the parameters required for latency metrics queries.

0 commit comments

Comments
 (0)