Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 11 additions & 94 deletions enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
package reporting

import (
"bytes"
"context"
"database/sql"
"errors"
"fmt"
"io"
"net/http"
"sort"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/lib/pq"
"github.com/samber/lo"
"go.uber.org/atomic"
Expand All @@ -25,12 +22,10 @@ import (
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/collectors"

"github.com/rudderlabs/rudder-go-kit/jsonrs"
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
"github.com/rudderlabs/rudder-server/enterprise/reporting/client"
"github.com/rudderlabs/rudder-server/enterprise/reporting/event_sampler"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/utils/httputil"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
)
Expand Down Expand Up @@ -87,27 +82,22 @@ func NewErrorReportingStats(statsInstance stats.Stats) *ErrorReportingStats {
}

type ErrorDetailReporter struct {
ctx context.Context
cancel context.CancelFunc
g *errgroup.Group
configSubscriber *configSubscriber
reportingServiceURL string
syncersMu sync.RWMutex
syncers map[string]*types.SyncSource
log logger.Logger
namespace string
ctx context.Context
cancel context.CancelFunc
g *errgroup.Group
configSubscriber *configSubscriber
syncersMu sync.RWMutex
syncers map[string]*types.SyncSource
log logger.Logger
namespace string

instanceID string
region string
sleepInterval config.ValueLoader[time.Duration]
mainLoopSleepInterval config.ValueLoader[time.Duration]
maxConcurrentRequests config.ValueLoader[int]
maxOpenConnections int
vacuumFull config.ValueLoader[bool]

// DEPRECATED: Remove this after migration to commonClient, use edr.commonClient.Send instead.
httpClient *http.Client

errorDetailExtractor *ExtractorHandle
errorNormalizer ErrorNormalizer

Expand All @@ -117,7 +107,6 @@ type ErrorDetailReporter struct {
// Tagged stats (created dynamically with tags)
minReportedAtQueryTime stats.Measurement
errorDetailReportsQueryTime stats.Measurement
edReportingRequestLatency stats.Measurement
eventSamplingEnabled config.ValueLoader[bool]
eventSamplingDuration config.ValueLoader[time.Duration]
eventSampler event_sampler.EventSampler
Expand All @@ -126,8 +115,7 @@ type ErrorDetailReporter struct {
stats stats.Stats
config *config.Config

useCommonClient config.ValueLoader[bool]
commonClient *client.Client
commonClient *client.Client
}

func NewErrorDetailReporter(
Expand All @@ -136,13 +124,6 @@ func NewErrorDetailReporter(
statsInstance stats.Stats,
conf *config.Config,
) *ErrorDetailReporter {
// DEPRECATED: Remove this after migration to commonClient, use edr.commonClient.Send instead.
tr := &http.Transport{}
netClient := &http.Client{Transport: tr, Timeout: conf.GetDuration("HttpClient.reporting.timeout", 60, time.Second)}
reportingServiceURL := conf.GetString("REPORTING_URL", "https://reporting.dev.rudderlabs.com")
reportingServiceURL = strings.TrimSuffix(reportingServiceURL, "/")
useCommonClient := conf.GetReloadableBoolVar(false, "Reporting.useCommonClient")

mainLoopSleepInterval := conf.GetReloadableDurationVar(5, time.Second, "Reporting.mainLoopSleepInterval")
sleepInterval := conf.GetReloadableDurationVar(30, time.Second, "Reporting.sleepInterval")
maxConcurrentRequests := conf.GetReloadableIntVar(32, 1, "Reporting.maxConcurrentRequests")
Expand Down Expand Up @@ -174,13 +155,11 @@ func NewErrorDetailReporter(
ctx: ctx,
cancel: cancel,
g: g,
reportingServiceURL: reportingServiceURL,
log: log,
sleepInterval: sleepInterval,
mainLoopSleepInterval: mainLoopSleepInterval,
maxConcurrentRequests: maxConcurrentRequests,
vacuumFull: conf.GetReloadableBoolVar(true, "Reporting.errorReporting.vacuumFull", "Reporting.vacuumFull"),
httpClient: netClient,

eventSamplingEnabled: eventSamplingEnabled,
eventSamplingDuration: eventSamplingDuration,
Expand All @@ -192,7 +171,6 @@ func NewErrorDetailReporter(
// Initialize stats manager
statsManager: statsManager,
instanceID: conf.GetString("INSTANCE_ID", "1"),
region: conf.GetString("region", ""),

configSubscriber: configSubscriber,
syncers: make(map[string]*types.SyncSource),
Expand All @@ -202,8 +180,7 @@ func NewErrorDetailReporter(
stats: statsInstance,
config: conf,

useCommonClient: useCommonClient,
commonClient: client.New(client.RouteRecordErrors, conf, log, statsInstance),
commonClient: client.New(client.RouteRecordErrors, conf, log, statsInstance),
}
}

Expand Down Expand Up @@ -467,7 +444,6 @@ func (edr *ErrorDetailReporter) mainLoop(ctx context.Context, c types.SyncerConf

edr.minReportedAtQueryTime = edr.stats.NewTaggedStat("error_detail_reports_min_reported_at_query_time", stats.TimerType, tags)
edr.errorDetailReportsQueryTime = edr.stats.NewTaggedStat("error_detail_reports_query_time", stats.TimerType, tags)
edr.edReportingRequestLatency = edr.stats.NewTaggedStat("error_detail_reporting_request_latency", stats.TimerType, tags)

var lastReportedAtTime atomic.Time
lastReportedAtTime.Store(time.Now())
Expand Down Expand Up @@ -558,7 +534,7 @@ func (edr *ErrorDetailReporter) mainLoop(ctx context.Context, c types.SyncerConf
break
}
errGroup.Go(func() error {
err := edr.sendMetric(errCtx, c.Label, metricToSend)
err := edr.commonClient.Send(errCtx, metricToSend)
if err != nil {
edr.log.Errorn("Error while sending to Reporting service", obskit.Error(err))
}
Expand Down Expand Up @@ -839,65 +815,6 @@ func (edr *ErrorDetailReporter) aggregate(reports []*types.EDReportsDB) []*types
return edrortingMetrics
}

// DEPRECATED: Remove this after migration to commonClient, use edr.commonClient.Send instead.
func (edr *ErrorDetailReporter) sendMetric(ctx context.Context, _ string, metric *types.EDMetric) error {
if edr.useCommonClient.Load() {
return edr.commonClient.Send(ctx, metric)
}

payload, err := jsonrs.Marshal(metric)
if err != nil {
return fmt.Errorf("marshal failure: %w", err)
}
operation := func() error {
uri := fmt.Sprintf("%s/recordErrors", edr.reportingServiceURL)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri, bytes.NewBuffer(payload))
if err != nil {
return err
}
if edr.region != "" {
q := req.URL.Query()
q.Add("region", edr.region)
req.URL.RawQuery = q.Encode()
}
req.Header.Set("Content-Type", "application/json; charset=utf-8")
httpRequestStart := time.Now()
resp, err := edr.httpClient.Do(req)
if err != nil {
edr.log.Errorn("Sending request failed", obskit.Error(err))
return err
}

edr.edReportingRequestLatency.Since(httpRequestStart)
edr.statsManager.HttpRequest.Increment()

defer func() { httputil.CloseResponse(resp) }()
respBody, err := io.ReadAll(resp.Body)
edr.log.Debugn("[ErrorDetailReporting]Response from ReportingAPI", logger.NewStringField("response", string(respBody)))
if err != nil {
edr.log.Errorn("Reading response failed", obskit.Error(err))
return err
}

if !isMetricPosted(resp.StatusCode) {
err = fmt.Errorf(`received response: statusCode: %d error: %v`, resp.StatusCode, string(respBody))
edr.log.Errorn("received response",
logger.NewIntField("statusCode", int64(resp.StatusCode)),
obskit.Error(errors.New(string(respBody))))
}
return err
}

b := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
err = backoff.RetryNotify(operation, b, func(err error, t time.Duration) {
edr.log.Errorn("[ Error Detail Reporting ]: Error reporting to service", obskit.Error(err))
})
if err != nil {
edr.log.Errorn("[ Error Detail Reporting ]: Error making request to reporting service", obskit.Error(err))
}
return err
}

func (edr *ErrorDetailReporter) Stop() {
edr.cancel()
_ = edr.g.Wait()
Expand Down
13 changes: 1 addition & 12 deletions enterprise/reporting/flusher/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"database/sql"
"errors"
"fmt"
"net/url"
"path"
"slices"

"github.com/rudderlabs/rudder-go-kit/config"
Expand Down Expand Up @@ -47,17 +45,8 @@ func CreateRunner(ctx context.Context, table string, log logger.Logger, stats st

commonClient := client.New(client.RouteTrackedUsers, conf, log, stats)

// DEPRECATED: Remove this after migration to commonClient.
reportingBaseURL := config.GetString("REPORTING_URL", "https://reporting.rudderstack.com/")
parsedURL, err := url.Parse(reportingBaseURL)
if err != nil {
return nil, fmt.Errorf("error parsing reporting url %w", err)
}
parsedURL.Path = path.Join(parsedURL.Path, "trackedUser")
reportingURL := parsedURL.String()

a := aggregator.NewTrackedUsersInAppAggregator(db, stats, conf, module)
f, err := NewFlusher(db, log, stats, conf, table, reportingURL, commonClient, a, module)
f, err := NewFlusher(db, log, stats, conf, table, commonClient, a, module)
if err != nil {
return nil, fmt.Errorf("error creating flusher %w", err)
}
Expand Down
Loading
Loading