Skip to content

Commit 4689d4a

Browse files
committed
TEST retry, context and interceptors
1 parent c6ee53d commit 4689d4a

18 files changed

+679
-64
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// SPDX-FileCopyrightText: 2022-present Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package io.atomix.client;
6+
7+
import io.grpc.Context;
8+
import io.grpc.Metadata;
9+
10+
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
11+
12+
public class Constants {
13+
/**
14+
* Context key carrying over the application id.
15+
*/
16+
public static final Context.Key<String> APPLICATION_ID_CTX = Context.key("Application-ID");
17+
/**
18+
* Metadata key carrying over the application id.
19+
*/
20+
public static final Metadata.Key<String> APPLICATION_ID_META = Metadata.Key.of("Application-ID", ASCII_STRING_MARSHALLER);
21+
/**
22+
* Context key carrying over the primitive id.
23+
*/
24+
public static final Context.Key<String> PRIMITIVE_ID_CTX = Context.key("Primitive-ID");
25+
/**
26+
* Metadata key carrying over the primitive id.
27+
*/
28+
public static final Metadata.Key<String> PRIMITIVE_ID_META = Metadata.Key.of("Primitive-ID", ASCII_STRING_MARSHALLER);
29+
/**
30+
* Context key carrying over the session id.
31+
*/
32+
public static final Context.Key<String> SESSION_ID_CTX = Context.key("Session-ID");
33+
/**
34+
* Metadata key carrying over the session id.
35+
*/
36+
public static final Metadata.Key<String> SESSION_ID_META = Metadata.Key.of("Session-ID", ASCII_STRING_MARSHALLER);
37+
}

src/main/java/io/atomix/client/PrimitiveBuilder.java

+10-12
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@
55
package io.atomix.client;
66

77

8-
import io.atomix.client.utils.ThreadContext;
98
import io.grpc.Channel;
9+
import io.grpc.Context;
1010

1111
import java.util.concurrent.CompletableFuture;
1212
import java.util.concurrent.CompletionException;
1313

1414
import static com.google.common.base.Preconditions.checkNotNull;
15+
import static com.google.common.base.Preconditions.checkState;
1516

1617
/**
1718
* Abstract builder for distributed primitives.
@@ -23,15 +24,12 @@ public abstract class PrimitiveBuilder<B extends PrimitiveBuilder<B, P>, P exten
2324
protected final String primitiveName;
2425
protected boolean readOnly;
2526
protected final Channel serviceChannel;
26-
protected final ThreadContext threadContext;
27-
28-
protected PrimitiveBuilder(String primitiveName, Channel serviceChannel, ThreadContext threadContext) {
29-
this.primitiveName = checkNotNull(primitiveName,
30-
"primitive name cannot be null");
31-
this.serviceChannel = checkNotNull(serviceChannel,
32-
"primitive channel cannot be null");
33-
this.threadContext = checkNotNull(threadContext,
34-
"thread context cannot be null");
27+
protected final Context context;
28+
29+
protected PrimitiveBuilder(String primitiveName, Channel serviceChannel, Context context) {
30+
this.primitiveName = checkNotNull(primitiveName, "primitive name cannot be null");
31+
this.serviceChannel = checkNotNull(serviceChannel, "primitive channel cannot be null");
32+
this.context = checkNotNull(context, "context cannot be null");
3533
}
3634

3735
/**
@@ -57,8 +55,8 @@ protected Channel getServiceChannel() {
5755
*
5856
* @return the thread context
5957
*/
60-
protected ThreadContext getThreadContext() {
61-
return threadContext;
58+
protected Context getContext() {
59+
return context;
6260
}
6361

6462
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// SPDX-FileCopyrightText: 2022-present Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package io.atomix.client;
6+
7+
import io.grpc.CallOptions;
8+
import io.grpc.Channel;
9+
import io.grpc.ClientCall;
10+
import io.grpc.ClientInterceptor;
11+
import io.grpc.ForwardingClientCall;
12+
import io.grpc.Metadata;
13+
import io.grpc.MethodDescriptor;
14+
15+
public class PrimitiveClientInterceptor implements ClientInterceptor {
16+
@Override
17+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor,
18+
CallOptions callOptions,
19+
Channel channel) {
20+
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(methodDescriptor, callOptions)) {
21+
@Override
22+
public void start(Listener<RespT> responseListener, Metadata headers) {
23+
String appId = Constants.APPLICATION_ID_CTX.get();
24+
String primitiveId = Constants.PRIMITIVE_ID_CTX.get();
25+
String sessionId = Constants.SESSION_ID_CTX.get();
26+
if (appId != null && primitiveId != null && sessionId != null) {
27+
headers.put(Constants.APPLICATION_ID_META, appId);
28+
headers.put(Constants.PRIMITIVE_ID_META, primitiveId);
29+
headers.put(Constants.SESSION_ID_META, sessionId);
30+
}
31+
super.start(responseListener, headers);
32+
}
33+
};
34+
}
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// SPDX-FileCopyrightText: 2022-present Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package io.atomix.client;
6+
7+
import atomix.counter.v1.CounterGrpc;
8+
import atomix.election.v1.LeaderElectionGrpc;
9+
10+
/**
11+
* Enum for the primitive services to be used in the service config builder.
12+
*/
13+
public enum PrimitiveService {
14+
/**
15+
* Cunter gRPC service.
16+
*/
17+
COUNTER(CounterGrpc.SERVICE_NAME),
18+
ELECTION(LeaderElectionGrpc.SERVICE_NAME);
19+
20+
private final String serviceName;
21+
22+
PrimitiveService(String value) {
23+
serviceName = value;
24+
}
25+
26+
public String getServiceName() {
27+
return serviceName;
28+
}
29+
}

src/main/java/io/atomix/client/PrimitiveType.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
package io.atomix.client;
66

7-
import io.atomix.client.utils.ThreadContext;
87
import io.grpc.Channel;
8+
import io.grpc.Context;
99

1010
/**
1111
* Primitive type.
@@ -17,9 +17,9 @@ public interface PrimitiveType<B extends PrimitiveBuilder, P extends SyncPrimiti
1717
*
1818
* @param primitiveName the primitive name
1919
* @param serviceChannel the channel to be used for the primitive services
20-
* @param threadContext the thread context
20+
* @param context the context
2121
* @return a new primitive builder
2222
*/
23-
B newBuilder(String primitiveName, Channel serviceChannel, ThreadContext threadContext);
23+
B newBuilder(String primitiveName, Channel serviceChannel, Context context);
2424

2525
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
// SPDX-FileCopyrightText: 2022-present Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package io.atomix.client;
6+
7+
import com.google.common.collect.Lists;
8+
import com.google.common.collect.Maps;
9+
10+
import java.util.List;
11+
import java.util.Map;
12+
13+
14+
/**
15+
* Service config builder used to configure the internal retry
16+
* mechanism of the gRPC services.
17+
*/
18+
public class ServiceConfigBuilder {
19+
public static final String MAX_ATTEMPTS = "maxAttempts";
20+
public static final String INITIAL_BACKOFF = "initialBackoff";
21+
public static final String MAX_BACKOFF = "maxBackoff";
22+
public static final String BACKOFF_MULTIPLIER = "backoffMultiplier";
23+
public static final String RETRYABLE_STATUS_CODES = "retryableStatusCodes";
24+
public static final String SERVICE = "service";
25+
public static final String NAME = "name";
26+
public static final String RETRY_POLICY = "retryPolicy";
27+
public static final String METHOD_CONFIG = "methodConfig";
28+
29+
private Double maxAttempts = 5.0;
30+
private String initialBackoff = "0.5s";
31+
private String maxBackoff = "30s";
32+
private Double backoffMultiplier = 2.0;
33+
private List<String> retryableStatusCodes = List.of("UNAVAILABLE");
34+
35+
/**
36+
* Set the maximum number of attempts for retries
37+
*
38+
* @param maxAttempts max attempts to be performed
39+
* @return this
40+
*/
41+
ServiceConfigBuilder withMaxAttempts(double maxAttempts) {
42+
this.maxAttempts = maxAttempts;
43+
return this;
44+
}
45+
46+
/**
47+
* Set the initial backoff for retries
48+
*
49+
* @param initialBackoff the initial backoff
50+
* @return this
51+
*/
52+
ServiceConfigBuilder withInitialBackoff(String initialBackoff) {
53+
this.initialBackoff = initialBackoff;
54+
return this;
55+
}
56+
57+
/**
58+
* Set the maximum backoff for retries
59+
*
60+
* @param maxBackoff the maximum backoff
61+
* @return this
62+
*/
63+
ServiceConfigBuilder withMaxBackoff(String maxBackoff) {
64+
this.maxBackoff = maxBackoff;
65+
return this;
66+
}
67+
68+
/**
69+
* Set the backoff multiplier for retries
70+
*
71+
* @param backoffMultiplier the backoff multiplier
72+
* @return this
73+
*/
74+
ServiceConfigBuilder withBackoffMultiplier(double backoffMultiplier) {
75+
this.backoffMultiplier = backoffMultiplier;
76+
return this;
77+
}
78+
79+
/**
80+
* Define the retryable status codes.
81+
*
82+
* @param retryableStatusCodes the retryable status codes
83+
* @return this
84+
*/
85+
ServiceConfigBuilder withRetryableStatusCodes(List<String> retryableStatusCodes) {
86+
this.retryableStatusCodes = retryableStatusCodes;
87+
return this;
88+
}
89+
90+
/**
91+
* Builds the band.
92+
*
93+
* @return a band
94+
*/
95+
Map<String, ?> build() {
96+
// For further infos look at service_config.proto in the grpc repo
97+
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#detailed-design
98+
Map<String, Object> methodConfig = Maps.newHashMap();
99+
// method config is the overarching map
100+
List<Map<String, Object>> methodConfigs = Lists.newArrayList();
101+
for (PrimitiveService primitiveService : PrimitiveService.values()) {
102+
// retryPolicy opaque map
103+
Map<String, Object> retryPolicy = Maps.newHashMap();
104+
retryPolicy.put(MAX_ATTEMPTS, maxAttempts);
105+
retryPolicy.put(INITIAL_BACKOFF, initialBackoff);
106+
retryPolicy.put(MAX_BACKOFF, maxBackoff);
107+
retryPolicy.put(BACKOFF_MULTIPLIER, backoffMultiplier);
108+
retryPolicy.put(RETRYABLE_STATUS_CODES, retryableStatusCodes);
109+
110+
// name defines the grpc services affected by the retry policy
111+
// we do that for all the primitives we want to support
112+
List<Map<String, Object>> name = Lists.newArrayList();
113+
114+
Map<String, Object> serviceName = Maps.newHashMap();
115+
serviceName.put(SERVICE, primitiveService.getServiceName());
116+
name.add(serviceName);
117+
118+
Map<String, Object> serviceConfig = Maps.newHashMap();
119+
serviceConfig.put(NAME, name);
120+
serviceConfig.put(RETRY_POLICY, retryPolicy);
121+
methodConfigs.add(serviceConfig);
122+
}
123+
methodConfig.put(METHOD_CONFIG, methodConfigs);
124+
return methodConfig;
125+
}
126+
127+
}

src/main/java/io/atomix/client/counter/AtomicCounterBuilder.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,16 @@
55
package io.atomix.client.counter;
66

77
import io.atomix.client.PrimitiveBuilder;
8-
import io.atomix.client.utils.ThreadContext;
98
import io.grpc.Channel;
9+
import io.grpc.Context;
1010

1111
/**
1212
* Builder for AtomicCounter.
1313
*/
1414
public abstract class AtomicCounterBuilder
1515
extends PrimitiveBuilder<AtomicCounterBuilder, AtomicCounter> {
1616

17-
protected AtomicCounterBuilder(String primitiveName, Channel serviceChannel, ThreadContext threadContext) {
18-
super(primitiveName, serviceChannel, threadContext);
17+
protected AtomicCounterBuilder(String primitiveName, Channel serviceChannel, Context context) {
18+
super(primitiveName, serviceChannel, context);
1919
}
2020
}

src/main/java/io/atomix/client/counter/AtomicCounterType.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66

77
import io.atomix.client.PrimitiveType;
88
import io.atomix.client.counter.impl.DefaultAtomicCounterBuilder;
9-
import io.atomix.client.utils.ThreadContext;
109
import io.grpc.Channel;
10+
import io.grpc.Context;
1111

1212
import static com.google.common.base.MoreObjects.toStringHelper;
1313

@@ -28,8 +28,8 @@ public static AtomicCounterType instance() {
2828

2929
@Override
3030
public AtomicCounterBuilder newBuilder(String primitiveName, Channel serviceChannel,
31-
ThreadContext threadContext) {
32-
return new DefaultAtomicCounterBuilder(primitiveName, serviceChannel, threadContext);
31+
Context context) {
32+
return new DefaultAtomicCounterBuilder(primitiveName, serviceChannel, context);
3333
}
3434

3535
@Override

src/main/java/io/atomix/client/counter/impl/DefaultAsyncAtomicCounter.java

+4-8
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,14 @@
66

77
import atomix.counter.v1.CounterGrpc;
88
import atomix.counter.v1.CounterOuterClass.GetRequest;
9-
import io.atomix.client.PrimitiveState;
10-
import io.atomix.client.PrimitiveType;
119
import io.atomix.client.counter.AsyncAtomicCounter;
1210
import io.atomix.client.counter.AtomicCounter;
1311
import io.atomix.client.impl.AbstractAsyncPrimitive;
14-
import io.atomix.client.utils.ThreadContext;
1512
import io.grpc.Channel;
13+
import io.grpc.Context;
1614

1715
import java.time.Duration;
1816
import java.util.concurrent.CompletableFuture;
19-
import java.util.function.Consumer;
2017

2118
import static atomix.counter.v1.CounterOuterClass.*;
2219

@@ -27,7 +24,7 @@ public class DefaultAsyncAtomicCounter
2724
extends AbstractAsyncPrimitive<CounterGrpc.CounterStub, AsyncAtomicCounter>
2825
implements AsyncAtomicCounter {
2926

30-
public DefaultAsyncAtomicCounter(String primitiveName, Channel serviceChannel, ThreadContext context) {
27+
public DefaultAsyncAtomicCounter(String primitiveName, Channel serviceChannel, Context context) {
3128
super(primitiveName, CounterGrpc.newStub(serviceChannel), context);
3229
}
3330

@@ -49,9 +46,8 @@ public CompletableFuture<Void> destroy() {
4946
@Override
5047
public CompletableFuture<Long> get() {
5148
return execute((header, observer) -> service().get(GetRequest.newBuilder()
52-
.setHeaders(header)
53-
.build(), observer), GetResponse::getHeaders)
54-
.thenApply(response -> response.getOutput().getValue());
49+
.setHeaders(header).build(), observer), GetResponse::getHeaders)
50+
.thenApply(response -> response.getOutput().getValue());
5551
}
5652

5753
@Override

0 commit comments

Comments
 (0)