Skip to content

Commit dde391e

Browse files
committed
feat: router jobsdb query batch size should match throttling limit
1 parent cf04743 commit dde391e

20 files changed

+689
-92
lines changed

router/handle.go

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
"github.com/rudderlabs/rudder-server/router/internal/jobiterator"
3030
"github.com/rudderlabs/rudder-server/router/internal/partition"
3131
"github.com/rudderlabs/rudder-server/router/isolation"
32-
rtThrottler "github.com/rudderlabs/rudder-server/router/throttler"
32+
"github.com/rudderlabs/rudder-server/router/throttler"
3333
"github.com/rudderlabs/rudder-server/router/transformer"
3434
"github.com/rudderlabs/rudder-server/router/types"
3535
routerutils "github.com/rudderlabs/rudder-server/router/utils"
@@ -50,7 +50,7 @@ const module = "router"
5050
type Handle struct {
5151
// external dependencies
5252
jobsDB jobsdb.JobsDB
53-
throttlerFactory rtThrottler.Factory
53+
throttlerFactory throttler.Factory
5454
backendConfig backendconfig.BackendConfig
5555
Reporting reporter
5656
transientSources transientsource.Service
@@ -151,7 +151,7 @@ func (rt *Handle) activePartitions(ctx context.Context) []string {
151151

152152
// pickup picks up jobs from the jobsDB for the provided partition and returns the number of jobs picked up and whether the limits were reached or not
153153
// picked up jobs are distributed to the workers
154-
func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worker) (pickupCount int, limitsReached bool) {
154+
func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worker, pickupBatchSizeGauge stats.Gauge) (pickupCount int, limitsReached bool) {
155155
// pickup limiter with dynamic priority
156156
start := time.Now()
157157
var discardedCount int
@@ -181,8 +181,21 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
181181
"Router."+rt.destType+".jobIterator.discardedPercentageTolerance",
182182
"Router.jobIterator.discardedPercentageTolerance")
183183

184+
jobQueryBatchSize := rt.reloadableConfig.jobQueryBatchSize.Load()
185+
if rt.isolationStrategy.SupportsPickupQueryThrottling() {
186+
jobQueryBatchSize = rt.getAdaptedJobQueryBatchSize(
187+
jobQueryBatchSize,
188+
func() []throttler.PickupThrottler {
189+
return rt.throttlerFactory.GetActivePickupThrottlers(partition)
190+
},
191+
rt.reloadableConfig.readSleep.Load(),
192+
rt.reloadableConfig.maxJobQueryBatchSize.Load(),
193+
)
194+
}
195+
pickupBatchSizeGauge.Gauge(jobQueryBatchSize)
196+
184197
iterator := jobiterator.New(
185-
rt.getQueryParams(partition, rt.reloadableConfig.jobQueryBatchSize.Load()),
198+
rt.getQueryParams(partition, jobQueryBatchSize),
186199
rt.getJobsFn(ctx),
187200
jobiterator.WithDiscardedPercentageTolerance(jobIteratorDiscardedPercentageTolerance),
188201
jobiterator.WithMaxQueries(jobIteratorMaxQueries),
@@ -333,6 +346,38 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
333346
return pickupCount, limitsReached
334347
}
335348

349+
// getAdaptedJobQueryBatchSize returns the adapted job query batch size based on the throttling limits
350+
func (*Handle) getAdaptedJobQueryBatchSize(input int, pickupThrottlers func() []throttler.PickupThrottler, readSleep time.Duration, maxLimit int) int {
351+
jobQueryBatchSize := input
352+
// Calculate the total limit of all active throttlers:
353+
//
354+
// - if there is a global throttler, use its limit
355+
// - if there are event type specific throttlers, use the sum of their limits
356+
totalLimit := lo.Reduce(pickupThrottlers(), func(totalLimit int, throttler throttler.PickupThrottler, index int) int {
357+
if index == 0 {
358+
return int(throttler.GetLimitPerSecond())
359+
}
360+
if throttler.GetEventType() == "all" {
361+
// global throttler, total limit is the limit of the first throttler
362+
return totalLimit
363+
}
364+
// throttler per event type, total limit is the sum of all throttler per event type limits
365+
return totalLimit + int(throttler.GetLimitPerSecond())
366+
}, 0)
367+
// If throttling is enabled then we need to adapt job query batch size:
368+
//
369+
// - we assume that we will read for more than readSleep seconds (min 1 second)
370+
// - we will be setting the batch size to be min(totalLimit * readSleepSeconds, maxLimit)
371+
if totalLimit > 0 {
372+
readSleepSeconds := int((readSleep + time.Second - 1) / time.Second) // rounding up to the nearest second
373+
jobQueryBatchSize = totalLimit * readSleepSeconds
374+
if jobQueryBatchSize > maxLimit {
375+
return maxLimit
376+
}
377+
}
378+
return jobQueryBatchSize
379+
}
380+
336381
func (rt *Handle) stopIteration(err error, destinationID string) bool {
337382
// if the context is cancelled, we can stop iteration
338383
if errors.Is(err, types.ErrContextCancelled) {

router/handle_lifecycle.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,7 @@ func (rt *Handle) setupReloadableVars() {
327327
rt.reloadableConfig.skipRtAbortAlertForTransformation = config.GetReloadableBoolVar(false, getRouterConfigKeys("skipRtAbortAlertForTf", rt.destType)...)
328328
rt.reloadableConfig.skipRtAbortAlertForDelivery = config.GetReloadableBoolVar(false, getRouterConfigKeys("skipRtAbortAlertForDelivery", rt.destType)...)
329329
rt.reloadableConfig.jobQueryBatchSize = config.GetReloadableIntVar(10000, 1, getRouterConfigKeys("jobQueryBatchSize", rt.destType)...)
330+
rt.reloadableConfig.maxJobQueryBatchSize = config.GetReloadableIntVar(10000, 1, getRouterConfigKeys("maxJobQueryBatchSize", rt.destType)...)
330331
rt.reloadableConfig.updateStatusBatchSize = config.GetReloadableIntVar(1000, 1, getRouterConfigKeys("updateStatusBatchSize", rt.destType)...)
331332
rt.reloadableConfig.readSleep = config.GetReloadableDurationVar(1000, time.Millisecond, getRouterConfigKeys("readSleep", rt.destType)...)
332333
rt.reloadableConfig.jobsBatchTimeout = config.GetReloadableDurationVar(5, time.Second, getRouterConfigKeys("jobsBatchTimeout", rt.destType)...)

router/handle_test.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package router
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/require"
9+
10+
"github.com/rudderlabs/rudder-server/router/throttler"
11+
)
12+
13+
// MockPickupThrottler implements the PickupThrottler interface for testing
14+
type MockPickupThrottler struct {
15+
limitPerSecond int64
16+
eventType string
17+
}
18+
19+
func (m *MockPickupThrottler) CheckLimitReached(ctx context.Context, cost int64) (bool, error) {
20+
return false, nil
21+
}
22+
23+
func (m *MockPickupThrottler) ResponseCodeReceived(code int) {}
24+
25+
func (m *MockPickupThrottler) Shutdown() {}
26+
27+
func (m *MockPickupThrottler) GetLimitPerSecond() int64 {
28+
return m.limitPerSecond
29+
}
30+
31+
func (m *MockPickupThrottler) GetEventType() string {
32+
return m.eventType
33+
}
34+
35+
func TestHandle_getAdaptedJobQueryBatchSize(t *testing.T) {
36+
h := &Handle{}
37+
38+
t.Run("no throttlers available should return original batch size", func(t *testing.T) {
39+
input := 100
40+
pickupThrottlers := func() []throttler.PickupThrottler {
41+
return []throttler.PickupThrottler{}
42+
}
43+
readSleep := 5 * time.Second
44+
maxLimit := 1000
45+
46+
result := h.getAdaptedJobQueryBatchSize(input, pickupThrottlers, readSleep, maxLimit)
47+
48+
require.Equal(t, input, result)
49+
})
50+
51+
t.Run("multiple throttlers of eventType all should use first throttler limit", func(t *testing.T) {
52+
input := 100
53+
pickupThrottlers := func() []throttler.PickupThrottler {
54+
return []throttler.PickupThrottler{
55+
&MockPickupThrottler{limitPerSecond: 50, eventType: "all"},
56+
&MockPickupThrottler{limitPerSecond: 30, eventType: "all"},
57+
}
58+
}
59+
readSleep := 1 * time.Second
60+
maxLimit := 1000
61+
62+
result := h.getAdaptedJobQueryBatchSize(input, pickupThrottlers, readSleep, maxLimit)
63+
64+
// Should use the first throttler's limit (50) since subsequent global throttlers are ignored
65+
require.Equal(t, 50, result)
66+
})
67+
68+
t.Run("multiple throttlers of eventType all returning zero as limit should return original batch size", func(t *testing.T) {
69+
input := 100
70+
pickupThrottlers := func() []throttler.PickupThrottler {
71+
return []throttler.PickupThrottler{
72+
&MockPickupThrottler{limitPerSecond: 0, eventType: "all"},
73+
&MockPickupThrottler{limitPerSecond: 0, eventType: "all"},
74+
}
75+
}
76+
readSleep := 1 * time.Second
77+
maxLimit := 1000
78+
79+
result := h.getAdaptedJobQueryBatchSize(input, pickupThrottlers, readSleep, maxLimit)
80+
81+
require.Equal(t, input, result)
82+
})
83+
84+
t.Run("multiple throttlers of different eventTypes should return the sum", func(t *testing.T) {
85+
input := 100
86+
pickupThrottlers := func() []throttler.PickupThrottler {
87+
return []throttler.PickupThrottler{
88+
&MockPickupThrottler{limitPerSecond: 20, eventType: "track"},
89+
&MockPickupThrottler{limitPerSecond: 30, eventType: "identify"},
90+
&MockPickupThrottler{limitPerSecond: 25, eventType: "page"},
91+
}
92+
}
93+
readSleep := 1 * time.Second
94+
maxLimit := 1000
95+
96+
result := h.getAdaptedJobQueryBatchSize(input, pickupThrottlers, readSleep, maxLimit)
97+
98+
// Should sum all limits: 20 + 30 + 25 = 75
99+
require.Equal(t, 75, result)
100+
})
101+
102+
t.Run("multiple throttlers of different eventTypes whose sum is greater than max limit should return max limit", func(t *testing.T) {
103+
input := 100
104+
pickupThrottlers := func() []throttler.PickupThrottler {
105+
return []throttler.PickupThrottler{
106+
&MockPickupThrottler{limitPerSecond: 400, eventType: "track"},
107+
&MockPickupThrottler{limitPerSecond: 300, eventType: "identify"},
108+
&MockPickupThrottler{limitPerSecond: 500, eventType: "page"},
109+
}
110+
}
111+
readSleep := 1 * time.Second
112+
maxLimit := 800
113+
114+
result := h.getAdaptedJobQueryBatchSize(input, pickupThrottlers, readSleep, maxLimit)
115+
116+
// Sum would be 1200, but should be capped at maxLimit
117+
require.Equal(t, maxLimit, result)
118+
})
119+
120+
t.Run("small readSleep less than a second should use 1 sec min", func(t *testing.T) {
121+
input := 100
122+
pickupThrottlers := func() []throttler.PickupThrottler {
123+
return []throttler.PickupThrottler{
124+
&MockPickupThrottler{limitPerSecond: 50, eventType: "track"},
125+
}
126+
}
127+
readSleep := 300 * time.Millisecond // Less than 1 second
128+
maxLimit := 1000
129+
130+
result := h.getAdaptedJobQueryBatchSize(input, pickupThrottlers, readSleep, maxLimit)
131+
132+
// Should use 1 second minimum: 50 * 1 = 50
133+
require.Equal(t, 50, result)
134+
})
135+
136+
t.Run("readSleep 2s should adapt batch size to 2 seconds", func(t *testing.T) {
137+
input := 100
138+
pickupThrottlers := func() []throttler.PickupThrottler {
139+
return []throttler.PickupThrottler{
140+
&MockPickupThrottler{limitPerSecond: 30, eventType: "track"},
141+
}
142+
}
143+
readSleep := 2 * time.Second
144+
maxLimit := 1000
145+
146+
result := h.getAdaptedJobQueryBatchSize(input, pickupThrottlers, readSleep, maxLimit)
147+
148+
// Should use 2 seconds: 30 * 2 = 60
149+
require.Equal(t, 60, result)
150+
})
151+
152+
t.Run("readSleep with fractional seconds should round up", func(t *testing.T) {
153+
input := 100
154+
pickupThrottlers := func() []throttler.PickupThrottler {
155+
return []throttler.PickupThrottler{
156+
&MockPickupThrottler{limitPerSecond: 40, eventType: "track"},
157+
}
158+
}
159+
readSleep := 1500 * time.Millisecond // 1.5 seconds
160+
maxLimit := 1000
161+
162+
result := h.getAdaptedJobQueryBatchSize(input, pickupThrottlers, readSleep, maxLimit)
163+
164+
// Should round up to 2 seconds: 40 * 2 = 80
165+
require.Equal(t, 80, result)
166+
})
167+
}

router/isolation/isolation.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ func GetStrategy(mode Mode, destType string, partitionFilter func(destinationID
3030
return workspaceStrategy{customVal: destType}, nil
3131
case ModeDestination:
3232
return &destinationStrategy{
33-
config: c,
34-
destinationFilter: partitionFilter,
35-
destType: destType,
36-
throttlerPerEventTypeConfig: make(map[string]config.ValueLoader[bool]),
33+
config: c,
34+
pickupQueryThrottlingEnabled: c.GetReloadableBoolVar(false, "Router."+destType+".pickupQueryThrottlingEnabled", "Router.pickupQueryThrottlingEnabled"),
35+
destinationFilter: partitionFilter,
36+
destType: destType,
37+
throttlerPerEventTypeConfig: make(map[string]config.ValueLoader[bool]),
3738
}, nil
3839
default:
3940
return noneStrategy{}, errors.New("unsupported isolation mode")
@@ -50,6 +51,8 @@ type Strategy interface {
5051
StopIteration(err error, destinationID string) bool
5152
// StopQueries returns true if the iterator should stop fetching more jobs from jobsDB
5253
StopQueries(err error, destinationID string) bool
54+
// SupportsPickupQueryThrottling returns true if the strategy supports pickup query throttling, i.e., if it can throttle queries to jobsDB based on the throttling limits set at destination level
55+
SupportsPickupQueryThrottling() bool
5356
}
5457

5558
// noneStrategy implements isolation at no level
@@ -71,6 +74,10 @@ func (noneStrategy) StopQueries(_ error, _ string) bool {
7174
return false
7275
}
7376

77+
func (noneStrategy) SupportsPickupQueryThrottling() bool {
78+
return false
79+
}
80+
7481
// workspaceStrategy implements isolation at workspace level
7582
type workspaceStrategy struct {
7683
customVal string
@@ -93,9 +100,14 @@ func (workspaceStrategy) StopQueries(_ error, _ string) bool {
93100
return false
94101
}
95102

103+
func (workspaceStrategy) SupportsPickupQueryThrottling() bool {
104+
return false
105+
}
106+
96107
// destinationStrategy implements isolation at destination level
97108
type destinationStrategy struct {
98109
config *config.Config
110+
pickupQueryThrottlingEnabled config.ValueLoader[bool]
99111
destinationFilter func(destinationID string) bool
100112
destType string
101113
throttlerPerEventTypeConfigMu sync.RWMutex
@@ -128,6 +140,10 @@ func (ds *destinationStrategy) StopQueries(err error, destinationID string) bool
128140
return errors.Is(err, types.ErrDestinationThrottled) && ds.hasDestinationThrottlerPerEventType(destinationID)
129141
}
130142

143+
func (ds *destinationStrategy) SupportsPickupQueryThrottling() bool {
144+
return ds.pickupQueryThrottlingEnabled.Load()
145+
}
146+
131147
func (ds *destinationStrategy) hasDestinationThrottlerPerEventType(destinationID string) bool {
132148
ds.throttlerPerEventTypeConfigMu.RLock()
133149
throttlerPerEventTypeConfig, ok := ds.throttlerPerEventTypeConfig[destinationID]

router/isolation/isolation_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ func TestIsolationStrategy(t *testing.T) {
3838
require.False(t, strategy.StopQueries(types.ErrBarrierExists, destinationID))
3939
require.False(t, strategy.StopQueries(types.ErrDestinationThrottled, destinationID))
4040
})
41+
t.Run("supports pickup query throttling", func(t *testing.T) {
42+
require.False(t, strategy.SupportsPickupQueryThrottling())
43+
})
4144
})
4245
t.Run("workspace", func(r *testing.T) {
4346
strategy, err := isolation.GetStrategy(isolation.ModeWorkspace, "", func(_ string) bool { return true }, c)
@@ -60,6 +63,10 @@ func TestIsolationStrategy(t *testing.T) {
6063
require.False(t, strategy.StopQueries(types.ErrBarrierExists, destinationID))
6164
require.False(t, strategy.StopQueries(types.ErrDestinationThrottled, destinationID))
6265
})
66+
67+
t.Run("supports pickup query throttling", func(t *testing.T) {
68+
require.False(t, strategy.SupportsPickupQueryThrottling())
69+
})
6370
})
6471
t.Run("destination", func(r *testing.T) {
6572
strategy, err := isolation.GetStrategy(isolation.ModeDestination, "WEBHOOK", func(_ string) bool { return true }, c)
@@ -103,5 +110,15 @@ func TestIsolationStrategy(t *testing.T) {
103110
require.True(t, strategy.StopQueries(types.ErrDestinationThrottled, anotherDestID))
104111
})
105112
})
113+
114+
t.Run("supports pickup query throttling when enabled", func(t *testing.T) {
115+
c.Set("Router.WEBHOOK.pickupQueryThrottlingEnabled", true)
116+
require.True(t, strategy.SupportsPickupQueryThrottling())
117+
})
118+
119+
t.Run("doesn't support pickup query throttling when not enabled", func(t *testing.T) {
120+
c.Set("Router.WEBHOOK.pickupQueryThrottlingEnabled", false)
121+
require.False(t, strategy.SupportsPickupQueryThrottling())
122+
})
106123
})
107124
}

router/partition_worker.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ type partitionWorker struct {
7777
// Work picks up jobs for the partitioned worker and returns whether it worked or not
7878
func (pw *partitionWorker) Work() bool {
7979
start := time.Now()
80-
pw.pickupCount, pw.limitsReached = pw.rt.pickup(pw.ctx, pw.partition, pw.workers)
80+
var pickupBatchSizeGauge stats.Gauge = stats.Default.NewTaggedStat("router_pickup_batch_size_gauge", stats.GaugeType, stats.Tags{"destType": pw.rt.destType, "partition": pw.partition})
81+
pw.pickupCount, pw.limitsReached = pw.rt.pickup(pw.ctx, pw.partition, pw.workers, pickupBatchSizeGauge)
8182
// the following stats are used to track the total time taken for the pickup process and the number of jobs picked up
8283
stats.Default.NewTaggedStat("router_generator_loop", stats.TimerType, stats.Tags{"destType": pw.rt.destType}).Since(start)
8384
stats.Default.NewTaggedStat("router_generator_events", stats.CountType, stats.Tags{"destType": pw.rt.destType, "partition": pw.partition}).Count(pw.pickupCount)

router/router_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,10 @@ func (m *mockThrottlerFactory) GetPickupThrottler(destName, destID, eventType st
179179
return throttler.NewNoOpThrottlerFactory().GetPickupThrottler(destName, destID, eventType)
180180
}
181181

182+
func (m *mockThrottlerFactory) GetActivePickupThrottlers(destinationID string) []throttler.PickupThrottler {
183+
return nil
184+
}
185+
182186
func (m *mockThrottlerFactory) GetDeliveryThrottler(destName, destID, endpointPath string) throttler.DeliveryThrottler {
183187
m.count.Add(1)
184188
return throttler.NewNoOpThrottlerFactory().GetDeliveryThrottler(destName, destID, endpointPath)

0 commit comments

Comments
 (0)