Skip to content

Commit a2e753d

Browse files
authored
chore: clean up legacy MTU logic (#6314)
# Description Starting this month, we are using the new MTU logic. This PR cleans up the legacy MTU logic. ## Linear Ticket Resolves [OBS-883](https://linear.app/rudderstack/issue/OBS-883/clean-up-old-mtu-logic) ## Security - [ ] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent af85b92 commit a2e753d

File tree

2 files changed

+12
-519
lines changed

2 files changed

+12
-519
lines changed

enterprise/trackedusers/users_reporter.go

Lines changed: 5 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,11 @@ type UsersReporter interface {
5858
}
5959

6060
type UniqueUsersReporter struct {
61-
log logger.Logger
62-
hllSettings *hll.Settings
63-
instanceID string
64-
now func() time.Time
65-
stats stats.Stats
66-
enabledV2Metrics config.ValueLoader[bool]
67-
shadowV2Metrics config.ValueLoader[bool]
61+
log logger.Logger
62+
hllSettings *hll.Settings
63+
instanceID string
64+
now func() time.Time
65+
stats stats.Stats
6866
}
6967

7068
func NewUniqueUsersReporter(log logger.Logger, conf *config.Config, stats stats.Stats) (*UniqueUsersReporter, error) {
@@ -81,8 +79,6 @@ func NewUniqueUsersReporter(log logger.Logger, conf *config.Config, stats stats.
8179
now: func() time.Time {
8280
return timeutil.Now()
8381
},
84-
enabledV2Metrics: config.GetReloadableBoolVar(false, "TrackedUsers.v2Metrics.enabled"),
85-
shadowV2Metrics: config.GetReloadableBoolVar(true, "TrackedUsers.v2Metrics.shadow"),
8682
}, nil
8783
}
8884

@@ -110,25 +106,6 @@ func (u *UniqueUsersReporter) MigrateDatabase(dbConn string, conf *config.Config
110106
}
111107

112108
func (u *UniqueUsersReporter) GenerateReportsFromJobs(jobs []*jobsdb.JobT, sourceIDtoFilter map[string]bool) []*UsersReport {
113-
// force new metrics from 1st September 2025 onwards
114-
v2MetricsActivationDate := time.Date(2025, 9, 1, 0, 0, 0, 0, time.UTC)
115-
if !u.now().UTC().Before(v2MetricsActivationDate) {
116-
return u.generateReportsFromJobs(jobs, sourceIDtoFilter, false)
117-
}
118-
119-
if !u.enabledV2Metrics.Load() {
120-
return u.generateReportsFromJobsLegacy(jobs, sourceIDtoFilter)
121-
}
122-
123-
shadowEnabled := u.shadowV2Metrics.Load()
124-
if shadowEnabled {
125-
return append(u.generateReportsFromJobsLegacy(jobs, sourceIDtoFilter), u.generateReportsFromJobs(jobs, sourceIDtoFilter, shadowEnabled)...)
126-
}
127-
128-
return u.generateReportsFromJobs(jobs, sourceIDtoFilter, shadowEnabled)
129-
}
130-
131-
func (u *UniqueUsersReporter) generateReportsFromJobs(jobs []*jobsdb.JobT, sourceIDtoFilter map[string]bool, shadowEnabled bool) []*UsersReport {
132109
if len(jobs) == 0 {
133110
return nil
134111
}
@@ -201,94 +178,6 @@ func (u *UniqueUsersReporter) generateReportsFromJobs(jobs []*jobsdb.JobT, sourc
201178
return nil
202179
}
203180

204-
reports := make([]*UsersReport, 0)
205-
for workspaceID, sourceUserMp := range workspaceSourceUserIdTypeMap {
206-
finalWorkspaceID := getNewID(workspaceID, shadowEnabled)
207-
reports = append(reports, lo.MapToSlice(sourceUserMp, func(sourceID string, userIdTypeMap map[string]*hll.Hll) *UsersReport {
208-
finalSourceID := getNewID(sourceID, shadowEnabled)
209-
return &UsersReport{
210-
WorkspaceID: finalWorkspaceID,
211-
SourceID: finalSourceID,
212-
UserIDHll: userIdTypeMap[idTypeUserID],
213-
AnonymousIDHll: userIdTypeMap[idTypeAnonymousID],
214-
IdentifiedAnonymousIDHll: userIdTypeMap[idTypeIdentifiedAnonymousID],
215-
}
216-
})...)
217-
}
218-
return reports
219-
}
220-
221-
func (u *UniqueUsersReporter) generateReportsFromJobsLegacy(jobs []*jobsdb.JobT, sourceIDtoFilter map[string]bool) []*UsersReport {
222-
if len(jobs) == 0 {
223-
return nil
224-
}
225-
workspaceSourceUserIdTypeMap := make(map[string]map[string]map[string]*hll.Hll)
226-
for _, job := range jobs {
227-
if job.WorkspaceId == "" {
228-
u.log.Warnn("workspace_id not found in job", logger.NewIntField("jobId", job.JobID))
229-
continue
230-
}
231-
232-
sourceID := gjson.GetBytes(job.Parameters, "source_id").String()
233-
if sourceID == "" {
234-
u.log.Warnn("source_id not found in job parameters", obskit.WorkspaceID(job.WorkspaceId),
235-
logger.NewIntField("jobId", job.JobID))
236-
continue
237-
}
238-
239-
if sourceIDtoFilter != nil && sourceIDtoFilter[sourceID] {
240-
u.log.Debugn("source to filter", obskit.SourceID(sourceID))
241-
continue
242-
}
243-
userID := gjson.GetBytes(job.EventPayload, "batch.0.userId").String()
244-
anonymousID := gjson.GetBytes(job.EventPayload, "batch.0.anonymousId").String()
245-
eventType := gjson.GetBytes(job.EventPayload, "batch.0.type").String()
246-
if userID == "" && anonymousID == "" {
247-
u.log.Warnn("both userID and anonymousID not found in job event payload", obskit.WorkspaceID(job.WorkspaceId),
248-
logger.NewIntField("jobId", job.JobID))
249-
continue
250-
}
251-
252-
if workspaceSourceUserIdTypeMap[job.WorkspaceId] == nil {
253-
workspaceSourceUserIdTypeMap[job.WorkspaceId] = make(map[string]map[string]*hll.Hll)
254-
}
255-
256-
if userID != "" {
257-
workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID] = u.recordIdentifier(workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID], userID, idTypeUserID)
258-
}
259-
260-
if anonymousID != "" && userID != anonymousID {
261-
workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID] = u.recordIdentifier(workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID], anonymousID, idTypeUserID)
262-
}
263-
264-
if userID != "" && anonymousID != "" && userID != anonymousID {
265-
combinedUserIDAnonymousID := combineUserIDAnonymousID(userID, anonymousID)
266-
workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID] = u.recordIdentifier(workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID], combinedUserIDAnonymousID, idTypeIdentifiedAnonymousID)
267-
}
268-
269-
// for alias event we will be adding previousId to identifiedAnonymousID hll,
270-
// so for calculating unique users we do not double count the user
271-
// e.g. we receive events
272-
// {type:track, anonymousID: anon1}
273-
// {type:track, userID: user1}
274-
// {type:track, userID: user2}
275-
// {type:identify, userID: user1, anonymousID: anon1}
276-
// {type:alias, previousId: user2, userID: user1}
277-
// userHLL: {user1, user2}, anonHLL: {anon1}, identifiedAnonHLL: {user1-anon1, user2}
278-
// cardinality: len(userHLL)+len(anonHLL)-len(identifiedAnonHLL): 2+1-2 = 1
279-
if eventType == eventTypeAlias {
280-
previousID := gjson.GetBytes(job.EventPayload, "batch.0.previousId").String()
281-
if previousID != "" && previousID != userID && previousID != anonymousID {
282-
workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID] = u.recordIdentifier(workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID], previousID, idTypeIdentifiedAnonymousID)
283-
}
284-
}
285-
}
286-
287-
if len(workspaceSourceUserIdTypeMap) == 0 {
288-
u.log.Warnn("no data to collect", obskit.WorkspaceID(jobs[0].WorkspaceId))
289-
return nil
290-
}
291-
292181
reports := make([]*UsersReport, 0)
293182
for workspaceID, sourceUserMp := range workspaceSourceUserIdTypeMap {
294183
reports = append(reports, lo.MapToSlice(sourceUserMp, func(sourceID string, userIdTypeMap map[string]*hll.Hll) *UsersReport {
@@ -375,10 +264,6 @@ func (u *UniqueUsersReporter) hllToString(hllStruct *hll.Hll) (string, error) {
375264
return hex.EncodeToString(hllStruct.ToBytes()), nil
376265
}
377266

378-
func combineUserIDAnonymousID(userID, anonymousID string) string {
379-
return userID + ":" + anonymousID
380-
}
381-
382267
func (u *UniqueUsersReporter) recordIdentifier(idTypeHllMap map[string]*hll.Hll, identifier, identifierType string) map[string]*hll.Hll {
383268
if idTypeHllMap == nil {
384269
idTypeHllMap = make(map[string]*hll.Hll)
@@ -417,10 +302,3 @@ func (u *UniqueUsersReporter) recordHllSizeStats(report *UsersReport) {
417302
}).Observe(float64(len(report.IdentifiedAnonymousIDHll.ToBytes())))
418303
}
419304
}
420-
421-
func getNewID(id string, shadowEnabled bool) string {
422-
if shadowEnabled {
423-
return id + "-shadow"
424-
}
425-
return id
426-
}

0 commit comments

Comments
 (0)