Skip to content

Commit c6ee53d

Browse files
committed
[SDFAB-1210] WIP
1 parent 83ae44b commit c6ee53d

22 files changed

+1169
-1
lines changed

pom.xml

+13-1
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,18 @@ SPDX-License-Identifier: Apache-2.0
8888
<artifactId>javax.annotation-api</artifactId>
8989
<version>1.3.2</version>
9090
</dependency>
91+
<dependency>
92+
<groupId>io.grpc</groupId>
93+
<artifactId>grpc-testing</artifactId>
94+
<version>${grpc.version}</version>
95+
<scope>test</scope>
96+
</dependency>
97+
<dependency>
98+
<groupId>org.mockito</groupId>
99+
<artifactId>mockito-core</artifactId>
100+
<version>3.4.0</version>
101+
<scope>test</scope>
102+
</dependency>
91103
</dependencies>
92104

93105
<profiles>
@@ -212,7 +224,7 @@ SPDX-License-Identifier: Apache-2.0
212224
<version>${maven.surefire.plugin.version}</version>
213225
<configuration>
214226
<shutdown>kill</shutdown>
215-
<argLine>${argLine} ${argLine.extras} ${argLine.common}</argLine>
227+
<argLine>${argLine.extras} ${argLine.common}</argLine>
216228
</configuration>
217229
</plugin>
218230

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// SPDX-FileCopyrightText: 2022-present Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package io.atomix.client;
6+
7+
import java.time.Duration;
8+
import java.util.concurrent.CompletableFuture;
9+
10+
/**
11+
* Asynchronous primitive.
12+
*/
13+
public interface AsyncPrimitive extends DistributedPrimitive {
14+
15+
/**
16+
* Closes the primitive.
17+
*
18+
* @return a future to be completed once the primitive is closed
19+
*/
20+
CompletableFuture<Void> close();
21+
22+
/**
23+
* Purges state associated with this primitive.
24+
* <p>
25+
* Implementations can override and provide appropriate clean up logic for purging
26+
* any state state associated with the primitive. Whether modifications made within the
27+
* destroy method have local or global visibility is left unspecified.
28+
*
29+
* @return {@code CompletableFuture} that is completed when the operation completes
30+
*/
31+
CompletableFuture<Void> destroy();
32+
33+
/**
34+
* Returns a synchronous wrapper around the asynchronous primitive.
35+
*
36+
* @return the synchronous primitive
37+
*/
38+
SyncPrimitive sync();
39+
40+
/**
41+
* Returns a synchronous wrapper around the asynchronous primitive.
42+
*
43+
* @param operationTimeout the synchronous operation timeout
44+
* @return the synchronous primitive
45+
*/
46+
SyncPrimitive sync(Duration operationTimeout);
47+
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// SPDX-FileCopyrightText: 2022-present Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package io.atomix.client;
6+
7+
8+
import java.util.function.Consumer;
9+
10+
/**
11+
* Interface for all distributed primitives.
12+
*/
13+
public interface DistributedPrimitive {
14+
15+
/**
16+
* Default timeout for primitive operations.
17+
*/
18+
long DEFAULT_OPERATION_TIMEOUT_MILLIS = 5000L;
19+
20+
/**
21+
* Returns the name of this primitive.
22+
*
23+
* @return name
24+
*/
25+
String name();
26+
27+
/**
28+
* Returns the type of primitive.
29+
*
30+
* @return primitive type
31+
*/
32+
PrimitiveType type();
33+
34+
/**
35+
* Registers a listener to be called when the primitive's state changes.
36+
*
37+
* @param listener The listener to be called when the state changes.
38+
*/
39+
default void addStateChangeListener(Consumer<PrimitiveState> listener) {
40+
}
41+
42+
/**
43+
* Unregisters a previously registered listener to be called when the primitive's state changes.
44+
*
45+
* @param listener The listener to unregister
46+
*/
47+
default void removeStateChangeListener(Consumer<PrimitiveState> listener) {
48+
}
49+
50+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
// SPDX-FileCopyrightText: 2022-present Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package io.atomix.client;
6+
7+
8+
import io.atomix.client.utils.ThreadContext;
9+
import io.grpc.Channel;
10+
11+
import java.util.concurrent.CompletableFuture;
12+
import java.util.concurrent.CompletionException;
13+
14+
import static com.google.common.base.Preconditions.checkNotNull;
15+
16+
/**
17+
* Abstract builder for distributed primitives.
18+
*
19+
* @param <B> builder type
20+
* @param <P> primitive type
21+
*/
22+
public abstract class PrimitiveBuilder<B extends PrimitiveBuilder<B, P>, P extends SyncPrimitive> {
23+
protected final String primitiveName;
24+
protected boolean readOnly;
25+
protected final Channel serviceChannel;
26+
protected final ThreadContext threadContext;
27+
28+
protected PrimitiveBuilder(String primitiveName, Channel serviceChannel, ThreadContext threadContext) {
29+
this.primitiveName = checkNotNull(primitiveName,
30+
"primitive name cannot be null");
31+
this.serviceChannel = checkNotNull(serviceChannel,
32+
"primitive channel cannot be null");
33+
this.threadContext = checkNotNull(threadContext,
34+
"thread context cannot be null");
35+
}
36+
37+
/**
38+
* Returns the primitive name.
39+
*
40+
* @return the primitive name
41+
*/
42+
protected String getPrimitiveName() {
43+
return primitiveName;
44+
}
45+
46+
/**
47+
* Returns the service channel.
48+
*
49+
* @return the service channel
50+
*/
51+
protected Channel getServiceChannel() {
52+
return serviceChannel;
53+
}
54+
55+
/**
56+
* Returns the thread context.
57+
*
58+
* @return the thread context
59+
*/
60+
protected ThreadContext getThreadContext() {
61+
return threadContext;
62+
}
63+
64+
/**
65+
* Sets the primitive to read-only.
66+
*
67+
* @return the primitive builder
68+
*/
69+
@SuppressWarnings("unchecked")
70+
public B withReadOnly() {
71+
return withReadOnly(true);
72+
}
73+
74+
/**
75+
* Sets whether the primitive is read-only.
76+
*
77+
* @param readOnly whether the primitive is read-only
78+
* @return the primitive builder
79+
*/
80+
@SuppressWarnings("unchecked")
81+
public B withReadOnly(boolean readOnly) {
82+
this.readOnly = readOnly;
83+
return (B) this;
84+
}
85+
86+
/**
87+
* Builds a new instance of the primitive.
88+
* <p>
89+
* The returned instance will be distinct from all other instances of the same primitive on this node, with a
90+
* distinct session, ordering guarantees, memory, etc.
91+
*
92+
* @return a new instance of the primitive
93+
*/
94+
public P build() {
95+
try {
96+
return buildAsync().join();
97+
} catch (Exception e) {
98+
if (e instanceof CompletionException && e.getCause() instanceof RuntimeException) {
99+
throw (RuntimeException) e.getCause();
100+
} else {
101+
throw e;
102+
}
103+
}
104+
}
105+
106+
/**
107+
* Builds a new instance of the primitive asynchronously.
108+
* <p>
109+
* The returned instance will be distinct from all other instances of the same primitive on this node, with a
110+
* distinct session, ordering guarantees, memory, etc.
111+
*
112+
* @return asynchronous distributed primitive
113+
*/
114+
public abstract CompletableFuture<P> buildAsync();
115+
116+
/**
117+
* Gets or builds a singleton instance of the primitive.
118+
* <p>
119+
* The returned primitive will be shared by all {@code get()} calls for the named primitive. If no instance has yet
120+
* been constructed, the instance will be built from this builder's configuration.
121+
*
122+
* @return a singleton instance of the primitive
123+
*/
124+
public P get() {
125+
try {
126+
return getAsync().join();
127+
} catch (Exception e) {
128+
if (e instanceof CompletionException && e.getCause() instanceof RuntimeException) {
129+
throw (RuntimeException) e.getCause();
130+
} else {
131+
throw e;
132+
}
133+
}
134+
}
135+
136+
/**
137+
* Gets or builds a singleton instance of the primitive asynchronously.
138+
* <p>
139+
* The returned primitive will be shared by all {@code get()} calls for the named primitive. If no instance has yet
140+
* been constructed, the instance will be built from this builder's configuration.
141+
*
142+
* @return a singleton instance of the primitive
143+
*/
144+
public CompletableFuture<P> getAsync() {
145+
return null;
146+
}
147+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// SPDX-FileCopyrightText: 2022-present Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package io.atomix.client;
6+
7+
/**
8+
* State of distributed primitive.
9+
*/
10+
public enum PrimitiveState {
11+
12+
/**
13+
* Signifies a state wherein the primitive is operating correctly and is capable of meeting the advertised
14+
* consistency and reliability guarantees.
15+
*/
16+
CONNECTED,
17+
18+
/**
19+
* Signifies a state wherein the primitive is temporarily incapable of providing the advertised
20+
* consistency properties.
21+
*/
22+
SUSPENDED,
23+
24+
/**
25+
* FIXME might not apply anymore
26+
* Signifies a state wherein the primitive's session has been expired and therefore cannot perform its functions.
27+
*/
28+
EXPIRED,
29+
30+
/**
31+
* Signifies a state wherein the primitive session has been closed and therefore cannot perform its functions.
32+
*/
33+
CLOSED
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// SPDX-FileCopyrightText: 2022-present Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package io.atomix.client;
6+
7+
import io.atomix.client.utils.ThreadContext;
8+
import io.grpc.Channel;
9+
10+
/**
11+
* Primitive type.
12+
*/
13+
public interface PrimitiveType<B extends PrimitiveBuilder, P extends SyncPrimitive> {
14+
15+
/**
16+
* Returns a new primitive builder.
17+
*
18+
* @param primitiveName the primitive name
19+
* @param serviceChannel the channel to be used for the primitive services
20+
* @param threadContext the thread context
21+
* @return a new primitive builder
22+
*/
23+
B newBuilder(String primitiveName, Channel serviceChannel, ThreadContext threadContext);
24+
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// SPDX-FileCopyrightText: 2022-present Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package io.atomix.client;
6+
7+
/**
8+
* Synchronous primitive.
9+
*/
10+
public interface SyncPrimitive extends DistributedPrimitive {
11+
12+
/**
13+
* Purges state associated with this primitive.
14+
* <p>
15+
* Implementations can override and provide appropriate clean up logic for purging
16+
* any state associated with the primitive. Whether modifications made within the
17+
* destroy method have local or global visibility is left unspecified.
18+
*/
19+
default void destroy() {
20+
}
21+
22+
/**
23+
* Closes the primitive.
24+
*/
25+
void close();
26+
27+
/**
28+
* Returns the underlying asynchronous primitive.
29+
*
30+
* @return the underlying asynchronous primitive
31+
*/
32+
AsyncPrimitive async();
33+
34+
}

0 commit comments

Comments
 (0)