Skip to content

Commit af85b92

Browse files
authored
chore(router): ignore throttling costs in pickup throttler (#6316)
# Description Ignoring throttling costs by default. We are planning to remove support for throttling costs in a future release ## Linear Ticket resolves PIPE-2365 ## Security - [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent edb02d3 commit af85b92

9 files changed

+28
-28
lines changed

router/router_throttling_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,8 +238,8 @@ func Test_RouterThrottling(t *testing.T) {
238238
}
239239
}
240240

241-
verifyBucket(webhook1.buckets, noOfEvents, 20, 2)
242-
verifyBucket(webhook2.buckets, noOfEvents, 50, 2)
241+
verifyBucket(webhook1.buckets, noOfEvents, 20, 1)
242+
verifyBucket(webhook2.buckets, noOfEvents, 50, 1)
243243
}
244244

245245
func requireLengthInRange(t *testing.T, x interface{}, min, max int) {

router/throttler/internal/pickup/adaptive/adaptive_all_event_types.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
)
1111

1212
// NewAllEventTypesThrottler constructs a new adaptive throttler for all event types of a destination
13-
func NewAllEventTypesThrottler(destType, destinationID string, algorithm Algorithm, limiter Limiter, config *config.Config, stat stats.Stats, log Logger) *throttler {
13+
func NewAllEventTypesThrottler(destType, destinationID string, algorithm Algorithm, limiter Limiter, c *config.Config, stat stats.Stats, log Logger) *throttler {
1414
return &throttler{
1515
destinationID: destinationID,
1616
eventType: "all",
@@ -20,20 +20,20 @@ func NewAllEventTypesThrottler(destType, destinationID string, algorithm Algorit
2020
algorithm: algorithm,
2121
log: log,
2222

23-
window: GetAllEventsWindowConfig(config, destType, destinationID),
24-
minLimit: config.GetReloadableInt64Var(1, 1,
23+
window: GetAllEventsWindowConfig(c, destType, destinationID),
24+
minLimit: c.GetReloadableInt64Var(1, 1,
2525
fmt.Sprintf(`Router.throttler.adaptive.%s.%s.minLimit`, destType, destinationID),
2626
fmt.Sprintf(`Router.throttler.adaptive.%s.minLimit`, destType),
2727
`Router.throttler.adaptive.minLimit`,
2828
),
29-
maxLimit: maxLimitFunc(config, destType, destinationID,
29+
maxLimit: maxLimitFunc(c, destType, destinationID,
3030
[]string{
3131
fmt.Sprintf(`Router.throttler.adaptive.%s.%s.maxLimit`, destType, destinationID),
3232
fmt.Sprintf(`Router.throttler.adaptive.%s.maxLimit`, destType),
3333
`Router.throttler.adaptive.maxLimit`,
3434
},
3535
),
36-
staticCost: false,
36+
staticCost: c.GetReloadableBoolVar(true, `Router.throttler.adaptive.ignoreThrottlingCosts`, `Router.throttler.ignoreThrottlingCosts`),
3737

3838
everyGauge: kitsync.NewOnceEvery(time.Second),
3939
limitFactorGauge: stat.NewTaggedStat("adaptive_throttler_limit_factor", stats.GaugeType, stats.Tags{

router/throttler/internal/pickup/adaptive/adaptive_all_event_types_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,12 @@ func TestAdaptiveAllEventTypesThrottler(t *testing.T) {
9494

9595
throttler := NewAllEventTypesThrottler(destType, destinationID, mockAlgorithm, mockLimiter, config, statsStore, logger.NOP)
9696

97-
limited, err := throttler.CheckLimitReached(context.Background(), 5)
97+
limited, err := throttler.CheckLimitReached(context.Background(), 1)
9898

9999
require.NoError(t, err)
100100
require.False(t, limited)
101101
require.Len(t, mockLimiter.CallLog, 1)
102-
require.Equal(t, int64(5), mockLimiter.CallLog[0].Cost)
102+
require.Equal(t, int64(1), mockLimiter.CallLog[0].Cost)
103103
require.Equal(t, int64(80), mockLimiter.CallLog[0].Rate) // 100 * 0.8 = 80
104104
require.Equal(t, int64(10), mockLimiter.CallLog[0].Window)
105105
require.Equal(t, destinationID, mockLimiter.CallLog[0].Key)

router/throttler/internal/pickup/adaptive/adaptive_per_event_type.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
// NewPerEventTypeThrottler constructs a new adaptive throttler for a specific event type of a destination
1313
func NewPerEventTypeThrottler(destType, destinationID, eventType string,
1414
algorithm Algorithm,
15-
limiter Limiter, config *config.Config, stat stats.Stats, log Logger,
15+
limiter Limiter, c *config.Config, stat stats.Stats, log Logger,
1616
) *throttler {
1717
return &throttler{
1818
destinationID: destinationID,
@@ -23,15 +23,15 @@ func NewPerEventTypeThrottler(destType, destinationID, eventType string,
2323
algorithm: algorithm,
2424
log: log,
2525

26-
window: GetPerEventWindowConfig(config, destType, destinationID, eventType),
27-
minLimit: config.GetReloadableInt64Var(1, 1,
26+
window: GetPerEventWindowConfig(c, destType, destinationID, eventType),
27+
minLimit: c.GetReloadableInt64Var(1, 1,
2828
fmt.Sprintf(`Router.throttler.adaptive.%s.%s.%s.minLimit`, destType, destinationID, eventType),
2929
fmt.Sprintf(`Router.throttler.adaptive.%s.%s.minLimit`, destType, destinationID),
3030
fmt.Sprintf(`Router.throttler.adaptive.%s.%s.minLimit`, destType, eventType),
3131
fmt.Sprintf(`Router.throttler.adaptive.%s.minLimit`, destType),
3232
`Router.throttler.adaptive.minLimit`,
3333
),
34-
maxLimit: maxLimitFunc(config, destType, destinationID,
34+
maxLimit: maxLimitFunc(c, destType, destinationID,
3535
[]string{
3636
fmt.Sprintf(`Router.throttler.adaptive.%s.%s.%s.maxLimit`, destType, destinationID, eventType),
3737
fmt.Sprintf(`Router.throttler.adaptive.%s.%s.maxLimit`, destType, destinationID),
@@ -41,7 +41,7 @@ func NewPerEventTypeThrottler(destType, destinationID, eventType string,
4141
},
4242
),
4343
// static cost for per-event-type throttler: cost was originally introduced to address rate limit differences between different event types, so not needed when using per-event-type throttler
44-
staticCost: true,
44+
staticCost: config.SingleValueLoader(true),
4545

4646
everyGauge: kitsync.NewOnceEvery(time.Second),
4747
limitFactorGauge: stat.NewTaggedStat("adaptive_throttler_limit_factor", stats.GaugeType, stats.Tags{

router/throttler/internal/pickup/adaptive/throttler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type throttler struct {
2222
window config.ValueLoader[time.Duration]
2323
minLimit config.ValueLoader[int64]
2424
maxLimit func() int64
25-
staticCost bool
25+
staticCost config.ValueLoader[bool]
2626

2727
everyGauge *kitsync.OnceEvery
2828
limitFactorGauge stats.Gauge
@@ -85,7 +85,7 @@ func (t *throttler) updateGauges() {
8585
}
8686

8787
func (t *throttler) costFn(input int64) int64 {
88-
if t.staticCost {
88+
if t.staticCost.Load() {
8989
return 1
9090
}
9191
return input

router/throttler/internal/pickup/static/static_all_event_types.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,23 @@ import (
1010
)
1111

1212
// NewAllEventTypesThrottler constructs a new static throttler for all event types of a destination
13-
func NewAllEventTypesThrottler(destType, destinationID string, limiter Limiter, config *config.Config, stat stats.Stats, log Logger) *throttler {
13+
func NewAllEventTypesThrottler(destType, destinationID string, limiter Limiter, c *config.Config, stat stats.Stats, log Logger) *throttler {
1414
return &throttler{
1515
destinationID: destinationID,
1616
eventType: "all",
1717
key: destinationID, // key is destinationID
1818

1919
limiter: limiter,
2020
log: log,
21-
limit: config.GetReloadableInt64Var(0, 1,
21+
limit: c.GetReloadableInt64Var(0, 1,
2222
fmt.Sprintf(`Router.throttler.%s.%s.limit`, destType, destinationID),
2323
fmt.Sprintf(`Router.throttler.%s.limit`, destType),
2424
),
25-
window: config.GetReloadableDurationVar(0, time.Second,
25+
window: c.GetReloadableDurationVar(0, time.Second,
2626
fmt.Sprintf(`Router.throttler.%s.%s.timeWindow`, destType, destinationID),
2727
fmt.Sprintf(`Router.throttler.%s.timeWindow`, destType),
2828
),
29-
staticCost: false,
29+
staticCost: c.GetReloadableBoolVar(true, `Router.throttler.ignoreThrottlingCosts`),
3030

3131
onceEveryGauge: kitsync.NewOnceEvery(time.Second),
3232
rateLimitGauge: stat.NewTaggedStat("throttling_rate_limit", stats.GaugeType, stats.Tags{

router/throttler/internal/pickup/static/static_all_event_types_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,12 @@ func TestAllEventTypesThrottler(t *testing.T) {
8383

8484
throttler := NewAllEventTypesThrottler(destType, destinationID, mockLimiter, config, statsStore, logger.NOP)
8585

86-
limited, err := throttler.CheckLimitReached(context.Background(), 5)
86+
limited, err := throttler.CheckLimitReached(context.Background(), 1)
8787

8888
require.NoError(t, err)
8989
require.False(t, limited)
9090
require.Len(t, mockLimiter.CallLog, 1)
91-
require.Equal(t, int64(5), mockLimiter.CallLog[0].Cost)
91+
require.Equal(t, int64(1), mockLimiter.CallLog[0].Cost)
9292
require.Equal(t, int64(100), mockLimiter.CallLog[0].Rate)
9393
require.Equal(t, int64(10), mockLimiter.CallLog[0].Window)
9494
require.Equal(t, destinationID, mockLimiter.CallLog[0].Key)

router/throttler/internal/pickup/static/static_per_event_type.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,28 +10,28 @@ import (
1010
)
1111

1212
// NewPerEventTypeThrottler constructs a new static throttler for a specific event type of a destination
13-
func NewPerEventTypeThrottler(destType, destinationID, eventType string, limiter Limiter, config *config.Config, stat stats.Stats, log Logger) *throttler {
13+
func NewPerEventTypeThrottler(destType, destinationID, eventType string, limiter Limiter, c *config.Config, stat stats.Stats, log Logger) *throttler {
1414
return &throttler{
1515
destinationID: destinationID,
1616
eventType: eventType,
1717
key: destinationID + ":" + eventType, // key is destinationID + ":" + eventType
1818

1919
limiter: limiter,
2020
log: log,
21-
limit: config.GetReloadableInt64Var(0, 1,
21+
limit: c.GetReloadableInt64Var(0, 1,
2222
fmt.Sprintf(`Router.throttler.%s.%s.%s.limit`, destType, destinationID, eventType),
2323
fmt.Sprintf(`Router.throttler.%s.%s.limit`, destType, destinationID),
2424
fmt.Sprintf(`Router.throttler.%s.%s.limit`, destType, eventType),
2525
fmt.Sprintf(`Router.throttler.%s.limit`, destType),
2626
),
27-
window: config.GetReloadableDurationVar(0, time.Second,
27+
window: c.GetReloadableDurationVar(0, time.Second,
2828
fmt.Sprintf(`Router.throttler.%s.%s.%s.timeWindow`, destType, destinationID, eventType),
2929
fmt.Sprintf(`Router.throttler.%s.%s.timeWindow`, destType, destinationID),
3030
fmt.Sprintf(`Router.throttler.%s.%s.timeWindow`, destType, eventType),
3131
fmt.Sprintf(`Router.throttler.%s.timeWindow`, destType),
3232
),
3333
// static cost for per-event-type throttler: cost was originally introduced to address rate limit differences between different event types, so not needed when using per-event-type throttler
34-
staticCost: true,
34+
staticCost: config.SingleValueLoader(true),
3535

3636
onceEveryGauge: kitsync.NewOnceEvery(time.Second),
3737
rateLimitGauge: stat.NewTaggedStat("throttling_rate_limit", stats.GaugeType, stats.Tags{

router/throttler/internal/pickup/static/throttler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ type throttler struct {
1919
log Logger
2020
limit config.ValueLoader[int64]
2121
window config.ValueLoader[time.Duration]
22-
staticCost bool
22+
staticCost config.ValueLoader[bool]
2323

2424
onceEveryGauge *kitsync.OnceEvery
2525
rateLimitGauge stats.Gauge
@@ -66,7 +66,7 @@ func (t *throttler) updateGauges() {
6666
}
6767

6868
func (t *throttler) costFn(input int64) int64 {
69-
if t.staticCost {
69+
if t.staticCost.Load() {
7070
return 1
7171
}
7272
return input

0 commit comments

Comments
 (0)