Skip to content

Commit b2e5c8d

Browse files
authored
fix: add outgoing metrics to proxy flow (#6355)
# Description This PR adds outgoing metrics to the proxy flow in the router worker to ensure consistent metrics collection for both direct and proxy transformer requests. ## Changes - Added transformer outgoing request metrics for proxy flow using `w.recordTransformerOutgoingRequestMetrics()` - Refactored `recordTransformerOutgoingRequestMetrics` to accept status code directly instead of full response object - Moved timing measurement to be consistent between proxy and direct flows ## Security - [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent ffba3b8 commit b2e5c8d

File tree

2 files changed

+290
-224
lines changed

2 files changed

+290
-224
lines changed

router/worker.go

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -503,10 +503,14 @@ func (w *worker) process(destinationJobs []types.DestinationJobT) {
503503
if err != nil && waitCtx.Err() == nil {
504504
w.logger.Errorn("delivery throttler wait error", obskit.Error(err))
505505
}
506+
rdlTime := time.Now()
506507
if transformerProxy {
507508
attemptedRequests++
508509
attemptedJobs += len(destinationJob.JobMetadataArray)
509510
resp := w.proxyRequest(ctx, destinationJob, val)
511+
// Record the new transformer_outgoing_request metrics
512+
w.recordTransformerOutgoingRequestMetrics(val, destinationJob, resp.ProxyRequestStatusCode, time.Since(rdlTime))
513+
510514
for k, v := range resp.DontBatchDirectives {
511515
dontBatchDirectives[k] = v
512516
}
@@ -524,15 +528,14 @@ func (w *worker) process(destinationJobs []types.DestinationJobT) {
524528
}
525529
} else {
526530
sendCtx, cancel := context.WithTimeout(ctx, w.rt.netClientTimeout)
527-
rdlTime := time.Now()
528531
attemptedRequests++
529532
attemptedJobs += len(destinationJob.JobMetadataArray)
530533
resp := w.rt.netHandle.SendPost(sendCtx, val)
531534
cancel()
532535
respStatusCode, respBodyTemp, respContentType = resp.StatusCode, string(resp.ResponseBody), resp.ResponseContentType
533536

534537
// Record the new transformer_outgoing_request metrics
535-
w.recordTransformerOutgoingRequestMetrics(val, destinationJob, resp, time.Since(rdlTime))
538+
w.recordTransformerOutgoingRequestMetrics(val, destinationJob, respStatusCode, time.Since(rdlTime))
536539

537540
w.routerDeliveryLatencyStat.SendTiming(time.Since(rdlTime))
538541

@@ -1213,7 +1216,7 @@ func (w *worker) countTransformedJobStatuses(transformType string, transformedJo
12131216
func (w *worker) recordTransformerOutgoingRequestMetrics(
12141217
postParams integrations.PostParametersT,
12151218
destinationJob types.DestinationJobT,
1216-
resp *routerutils.SendPostResponse,
1219+
respStatus int,
12171220
duration time.Duration,
12181221
) {
12191222
// Only emit metrics if EndpointPath is present
@@ -1222,13 +1225,14 @@ func (w *worker) recordTransformerOutgoingRequestMetrics(
12221225
}
12231226

12241227
labels := deliveryMetricLabels{
1225-
DestType: w.rt.destType,
1226-
EndpointPath: postParams.EndpointPath,
1227-
StatusCode: strconv.Itoa(resp.StatusCode),
1228-
RequestMethod: postParams.RequestMethod,
1229-
Module: "router",
1230-
WorkspaceID: destinationJob.Destination.WorkspaceID,
1231-
DestinationID: destinationJob.Destination.ID,
1228+
DestType: w.rt.destType,
1229+
TransformerProxy: w.rt.reloadableConfig.transformerProxy.Load(),
1230+
EndpointPath: postParams.EndpointPath,
1231+
StatusCode: respStatus,
1232+
RequestMethod: postParams.RequestMethod,
1233+
Module: "router",
1234+
WorkspaceID: destinationJob.Destination.WorkspaceID,
1235+
DestinationID: destinationJob.Destination.ID,
12321236
}
12331237

12341238
// Get or create cached stats objects using StatsCache
@@ -1242,24 +1246,26 @@ func (w *worker) recordTransformerOutgoingRequestMetrics(
12421246

12431247
// deliveryMetricLabels represents a unique key for caching stats based on labels
12441248
type deliveryMetricLabels struct {
1245-
DestType string
1246-
EndpointPath string
1247-
StatusCode string
1248-
RequestMethod string
1249-
Module string
1250-
WorkspaceID string
1251-
DestinationID string
1249+
DestType string
1250+
TransformerProxy bool
1251+
EndpointPath string
1252+
StatusCode int
1253+
RequestMethod string
1254+
Module string
1255+
WorkspaceID string
1256+
DestinationID string
12521257
}
12531258

12541259
// ToStatTags converts deliveryMetricLabels to stats.Tags for StatsCacheKey interface
12551260
func (l deliveryMetricLabels) ToStatTags() stats.Tags {
12561261
return stats.Tags{
1257-
"destType": l.DestType,
1258-
"endpointPath": l.EndpointPath,
1259-
"statusCode": l.StatusCode,
1260-
"requestMethod": l.RequestMethod,
1261-
"module": l.Module,
1262-
"workspaceId": l.WorkspaceID,
1263-
"destinationId": l.DestinationID,
1262+
"destType": l.DestType,
1263+
"endpointPath": l.EndpointPath,
1264+
"transformerProxy": strconv.FormatBool(l.TransformerProxy),
1265+
"statusCode": strconv.Itoa(l.StatusCode),
1266+
"requestMethod": l.RequestMethod,
1267+
"module": l.Module,
1268+
"workspaceId": l.WorkspaceID,
1269+
"destinationId": l.DestinationID,
12641270
}
12651271
}

0 commit comments

Comments
 (0)