25
25
import static io .grpc .ConnectivityState .SHUTDOWN ;
26
26
import static io .grpc .ConnectivityState .TRANSIENT_FAILURE ;
27
27
28
+ import com .google .common .annotations .VisibleForTesting ;
29
+ import com .google .common .base .Joiner ;
28
30
import com .google .common .base .MoreObjects ;
29
31
import com .google .common .collect .HashMultiset ;
30
32
import com .google .common .collect .Multiset ;
34
36
import io .grpc .EquivalentAddressGroup ;
35
37
import io .grpc .InternalLogId ;
36
38
import io .grpc .LoadBalancer ;
39
+ import io .grpc .Metadata ;
37
40
import io .grpc .Status ;
38
41
import io .grpc .SynchronizationContext ;
39
42
import io .grpc .util .MultiChildLoadBalancer ;
43
+ import io .grpc .xds .ThreadSafeRandom .ThreadSafeRandomImpl ;
40
44
import io .grpc .xds .client .XdsLogger ;
41
45
import io .grpc .xds .client .XdsLogger .XdsLogLevel ;
42
46
import java .net .SocketAddress ;
@@ -69,13 +73,21 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
69
73
new LazyLoadBalancer .Factory (pickFirstLbProvider );
70
74
private final XdsLogger logger ;
71
75
private final SynchronizationContext syncContext ;
76
+ private final ThreadSafeRandom random ;
72
77
private List <RingEntry > ring ;
78
+ @ Nullable private Metadata .Key <String > requestHashHeaderKey ;
73
79
74
80
RingHashLoadBalancer (Helper helper ) {
81
+ this (helper , ThreadSafeRandomImpl .instance );
82
+ }
83
+
84
+ @ VisibleForTesting
85
+ RingHashLoadBalancer (Helper helper , ThreadSafeRandom random ) {
75
86
super (helper );
76
87
syncContext = checkNotNull (helper .getSynchronizationContext (), "syncContext" );
77
88
logger = XdsLogger .withLogId (InternalLogId .allocate ("ring_hash_lb" , helper .getAuthority ()));
78
89
logger .log (XdsLogLevel .INFO , "Created" );
90
+ this .random = checkNotNull (random , "random" );
79
91
}
80
92
81
93
@ Override
@@ -92,6 +104,10 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
92
104
if (config == null ) {
93
105
throw new IllegalArgumentException ("Missing RingHash configuration" );
94
106
}
107
+ requestHashHeaderKey =
108
+ config .requestHashHeader .isEmpty ()
109
+ ? null
110
+ : Metadata .Key .of (config .requestHashHeader , Metadata .ASCII_STRING_MARSHALLER );
95
111
Map <EquivalentAddressGroup , Long > serverWeights = new HashMap <>();
96
112
long totalWeight = 0L ;
97
113
for (EquivalentAddressGroup eag : addrList ) {
@@ -197,7 +213,8 @@ protected void updateOverallBalancingState() {
197
213
overallState = TRANSIENT_FAILURE ;
198
214
}
199
215
200
- RingHashPicker picker = new RingHashPicker (syncContext , ring , getChildLbStates ());
216
+ RingHashPicker picker =
217
+ new RingHashPicker (syncContext , ring , getChildLbStates (), requestHashHeaderKey , random );
201
218
getHelper ().updateBalancingState (overallState , picker );
202
219
this .currentConnectivityState = overallState ;
203
220
}
@@ -325,21 +342,32 @@ private static final class RingHashPicker extends SubchannelPicker {
325
342
// TODO(chengyuanzhang): can be more performance-friendly with
326
343
// IdentityHashMap<Subchannel, ConnectivityStateInfo> and RingEntry contains Subchannel.
327
344
private final Map <Endpoint , SubchannelView > pickableSubchannels ; // read-only
345
+ @ Nullable private final Metadata .Key <String > requestHashHeaderKey ;
346
+ private final ThreadSafeRandom random ;
347
+ private final boolean hasEndpointInConnectingState ;
328
348
329
349
private RingHashPicker (
330
350
SynchronizationContext syncContext , List <RingEntry > ring ,
331
- Collection <ChildLbState > children ) {
351
+ Collection <ChildLbState > children , Metadata .Key <String > requestHashHeaderKey ,
352
+ ThreadSafeRandom random ) {
332
353
this .syncContext = syncContext ;
333
354
this .ring = ring ;
355
+ this .requestHashHeaderKey = requestHashHeaderKey ;
356
+ this .random = random ;
334
357
pickableSubchannels = new HashMap <>(children .size ());
358
+ boolean hasConnectingState = false ;
335
359
for (ChildLbState childLbState : children ) {
336
360
pickableSubchannels .put ((Endpoint )childLbState .getKey (),
337
361
new SubchannelView (childLbState , childLbState .getCurrentState ()));
362
+ if (childLbState .getCurrentState () == CONNECTING ) {
363
+ hasConnectingState = true ;
364
+ }
338
365
}
366
+ this .hasEndpointInConnectingState = hasConnectingState ;
339
367
}
340
368
341
369
// Find the ring entry with hash next to (clockwise) the RPC's hash (binary search).
342
- private int getTargetIndex (Long requestHash ) {
370
+ private int getTargetIndex (long requestHash ) {
343
371
if (ring .size () <= 1 ) {
344
372
return 0 ;
345
373
}
@@ -365,38 +393,77 @@ private int getTargetIndex(Long requestHash) {
365
393
366
394
@ Override
367
395
public PickResult pickSubchannel (PickSubchannelArgs args ) {
368
- Long requestHash = args .getCallOptions ().getOption (XdsNameResolver .RPC_HASH_KEY );
369
- if (requestHash == null ) {
370
- return PickResult .withError (RPC_HASH_NOT_FOUND );
396
+ // Determine request hash.
397
+ boolean usingRandomHash = false ;
398
+ long requestHash ;
399
+ if (requestHashHeaderKey == null ) {
400
+ // Set by the xDS config selector.
401
+ Long rpcHashFromCallOptions = args .getCallOptions ().getOption (XdsNameResolver .RPC_HASH_KEY );
402
+ if (rpcHashFromCallOptions == null ) {
403
+ return PickResult .withError (RPC_HASH_NOT_FOUND );
404
+ }
405
+ requestHash = rpcHashFromCallOptions ;
406
+ } else {
407
+ Iterable <String > headerValues = args .getHeaders ().getAll (requestHashHeaderKey );
408
+ if (headerValues != null ) {
409
+ requestHash = hashFunc .hashAsciiString (Joiner .on ("," ).join (headerValues ));
410
+ } else {
411
+ requestHash = random .nextLong ();
412
+ usingRandomHash = true ;
413
+ }
371
414
}
372
415
373
416
int targetIndex = getTargetIndex (requestHash );
374
417
375
- // Per gRFC A61, because of sticky-TF with PickFirst's auto reconnect on TF, we ignore
376
- // all TF subchannels and find the first ring entry in READY, CONNECTING or IDLE. If
377
- // CONNECTING or IDLE we return a pick with no results. Additionally, if that entry is in
378
- // IDLE, we initiate a connection.
379
- for (int i = 0 ; i < ring .size (); i ++) {
380
- int index = (targetIndex + i ) % ring .size ();
381
- SubchannelView subchannelView = pickableSubchannels .get (ring .get (index ).addrKey );
382
- ChildLbState childLbState = subchannelView .childLbState ;
383
-
384
- if (subchannelView .connectivityState == READY ) {
385
- return childLbState .getCurrentPicker ().pickSubchannel (args );
418
+ if (!usingRandomHash ) {
419
+ // Per gRFC A61, because of sticky-TF with PickFirst's auto reconnect on TF, we ignore
420
+ // all TF subchannels and find the first ring entry in READY, CONNECTING or IDLE. If
421
+ // CONNECTING or IDLE we return a pick with no results. Additionally, if that entry is in
422
+ // IDLE, we initiate a connection.
423
+ for (int i = 0 ; i < ring .size (); i ++) {
424
+ int index = (targetIndex + i ) % ring .size ();
425
+ SubchannelView subchannelView = pickableSubchannels .get (ring .get (index ).addrKey );
426
+ ChildLbState childLbState = subchannelView .childLbState ;
427
+
428
+ if (subchannelView .connectivityState == READY ) {
429
+ return childLbState .getCurrentPicker ().pickSubchannel (args );
430
+ }
431
+
432
+ // RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs
433
+ // are failed unless there is a READY connection.
434
+ if (subchannelView .connectivityState == CONNECTING ) {
435
+ return PickResult .withNoResult ();
436
+ }
437
+
438
+ if (subchannelView .connectivityState == IDLE ) {
439
+ syncContext .execute (() -> {
440
+ childLbState .getLb ().requestConnection ();
441
+ });
442
+
443
+ return PickResult .withNoResult (); // Indicates that this should be retried after backoff
444
+ }
386
445
}
387
-
388
- // RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs
389
- // are failed unless there is a READY connection.
390
- if (subchannelView .connectivityState == CONNECTING ) {
391
- return PickResult .withNoResult ();
446
+ } else {
447
+ // Using a random hash. Find and use the first READY ring entry, triggering at most one
448
+ // entry to attempt connection.
449
+ boolean requestedConnection = hasEndpointInConnectingState ;
450
+ for (int i = 0 ; i < ring .size (); i ++) {
451
+ int index = (targetIndex + i ) % ring .size ();
452
+ SubchannelView subchannelView = pickableSubchannels .get (ring .get (index ).addrKey );
453
+ ChildLbState childLbState = subchannelView .childLbState ;
454
+ if (subchannelView .connectivityState == READY ) {
455
+ return childLbState .getCurrentPicker ().pickSubchannel (args );
456
+ }
457
+ if (!requestedConnection && subchannelView .connectivityState == IDLE ) {
458
+ syncContext .execute (
459
+ () -> {
460
+ childLbState .getLb ().requestConnection ();
461
+ });
462
+ requestedConnection = true ;
463
+ }
392
464
}
393
-
394
- if (subchannelView .connectivityState == IDLE ) {
395
- syncContext .execute (() -> {
396
- childLbState .getLb ().requestConnection ();
397
- });
398
-
399
- return PickResult .withNoResult (); // Indicates that this should be retried after backoff
465
+ if (requestedConnection ) {
466
+ return PickResult .withNoResult ();
400
467
}
401
468
}
402
469
@@ -444,20 +511,24 @@ public int compareTo(RingEntry entry) {
444
511
static final class RingHashConfig {
445
512
final long minRingSize ;
446
513
final long maxRingSize ;
514
+ final String requestHashHeader ;
447
515
448
- RingHashConfig (long minRingSize , long maxRingSize ) {
516
+ RingHashConfig (long minRingSize , long maxRingSize , String requestHashHeader ) {
449
517
checkArgument (minRingSize > 0 , "minRingSize <= 0" );
450
518
checkArgument (maxRingSize > 0 , "maxRingSize <= 0" );
451
519
checkArgument (minRingSize <= maxRingSize , "minRingSize > maxRingSize" );
520
+ checkNotNull (requestHashHeader );
452
521
this .minRingSize = minRingSize ;
453
522
this .maxRingSize = maxRingSize ;
523
+ this .requestHashHeader = requestHashHeader ;
454
524
}
455
525
456
526
@ Override
457
527
public String toString () {
458
528
return MoreObjects .toStringHelper (this )
459
529
.add ("minRingSize" , minRingSize )
460
530
.add ("maxRingSize" , maxRingSize )
531
+ .add ("requestHashHeader" , requestHashHeader )
461
532
.toString ();
462
533
}
463
534
}
0 commit comments