Skip to content

Commit caf6e3a

Browse files
authored
Immediately trigger error for non-recoverable error in client creation (oxia-db#295)
A non-recoverable error, such as trying to use a namespace that does not exist, should trigger an immediate error in the client creation, instead of having the client do multiple retries.
1 parent 2f14cc5 commit caf6e3a

File tree

4 files changed

+29
-6
lines changed

4 files changed

+29
-6
lines changed

common/wait_group.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,10 @@ func (g *waitGroup) Wait(ctx context.Context) error {
6262
}
6363

6464
func (g *waitGroup) Done() {
65-
g.responses <- nil
65+
select {
66+
case g.responses <- nil:
67+
default:
68+
}
6669
}
6770

6871
func (g *waitGroup) Fail(err error) {

coordinator/impl/coordinator_e2e_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,10 @@ func TestCoordinator_MultipleNamespaces(t *testing.T) {
293293
assert.NoError(t, err)
294294
defer clientNs1.Close()
295295

296+
clientNs3, err := oxia.NewSyncClient(sa1.Public, oxia.WithNamespace("my-ns-does-not-exist"))
297+
assert.ErrorIs(t, err, common.ErrorNamespaceNotFound)
298+
assert.Nil(t, clientNs3)
299+
296300
ctx := context.Background()
297301

298302
// Write in default ns

oxia/internal/shard_manager.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type ShardManager interface {
3838

3939
type shardManagerImpl struct {
4040
sync.RWMutex
41-
updatedCondition common.ConditionContext
41+
updatedWg common.WaitGroup
4242

4343
shardStrategy ShardStrategy
4444
clientPool common.ClientPool
@@ -63,7 +63,7 @@ func NewShardManager(shardStrategy ShardStrategy, clientPool common.ClientPool,
6363
logger: log.With().Str("component", "shardManager").Logger(),
6464
}
6565

66-
sm.updatedCondition = common.NewConditionContext(sm)
66+
sm.updatedWg = common.NewWaitGroup(1)
6767
sm.ctx, sm.cancel = context.WithCancel(context.Background())
6868

6969
if err := sm.start(); err != nil {
@@ -80,7 +80,6 @@ func (s *shardManagerImpl) Close() error {
8080

8181
func (s *shardManagerImpl) start() error {
8282
s.Lock()
83-
defer s.Unlock()
8483

8584
go common.DoWithLabels(map[string]string{
8685
"oxia": "receive-shard-updates",
@@ -89,7 +88,8 @@ func (s *shardManagerImpl) start() error {
8988
ctx, cancel := context.WithTimeout(s.ctx, s.requestTimeout)
9089
defer cancel()
9190

92-
return s.updatedCondition.Wait(ctx)
91+
s.Unlock()
92+
return s.updatedWg.Wait(ctx)
9393
}
9494

9595
func (s *shardManagerImpl) Get(key string) uint32 {
@@ -140,6 +140,10 @@ func (s *shardManagerImpl) receiveWithRecovery() {
140140
s.logger.Debug().Err(err).Msg("Closed")
141141
return nil
142142
}
143+
144+
if !isErrorRetryable(err) {
145+
return backoff.Permanent(err)
146+
}
143147
return err
144148
},
145149
backOff,
@@ -153,6 +157,7 @@ func (s *shardManagerImpl) receiveWithRecovery() {
153157
)
154158
if err != nil {
155159
s.logger.Error().Err(err).Msg("Failed receiving shard assignments")
160+
s.updatedWg.Fail(err)
156161
}
157162
}
158163

@@ -205,9 +210,19 @@ func (s *shardManagerImpl) update(updates []Shard) {
205210
s.shards[update.Id] = update
206211
}
207212

208-
s.updatedCondition.Signal()
213+
s.updatedWg.Done()
209214
}
210215

211216
func overlap(a HashRange, b HashRange) bool {
212217
return !(a.MinInclusive > b.MaxInclusive || a.MaxInclusive < b.MinInclusive)
213218
}
219+
220+
func isErrorRetryable(err error) bool {
221+
switch status.Code(err) {
222+
case common.CodeNamespaceNotFound:
223+
return false
224+
225+
default:
226+
return true
227+
}
228+
}

server/assignment_dispatcher.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func (s *shardAssignmentDispatcher) RegisterForUpdates(req *proto.ShardAssignmen
7373
}
7474

7575
if _, ok := s.assignments.Namespaces[namespace]; !ok {
76+
s.Unlock()
7677
return common.ErrorNamespaceNotFound
7778
}
7879

0 commit comments

Comments
 (0)