Skip to content

server: Refactor stream observer #95

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jul 25, 2019
Merged
Prev Previous commit
Next Next commit
use field updater instead of atomicboolean
Signed-off-by: sschepens <[email protected]>
Signed-off-by: Sebastian Schepens <[email protected]>
  • Loading branch information
sschepens committed Jul 12, 2019
commit 2824ec5eea6a8b0d5988f65ccf39b96c051176d6
17 changes: 9 additions & 8 deletions cache/src/main/java/io/envoyproxy/controlplane/cache/Watch.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,28 @@
package io.envoyproxy.controlplane.cache;

import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import java.util.concurrent.atomic.AtomicBoolean;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;

/**
* {@code Watch} is a dedicated stream of configuration resources produced by the configuration cache and consumed by
* the xDS server.
*/
public class Watch {

private static final AtomicIntegerFieldUpdater<Watch> isCancelledUpdater =
AtomicIntegerFieldUpdater.newUpdater(Watch.class, "isCancelled");
private final boolean ads;
private final AtomicBoolean isCancelled = new AtomicBoolean();
private final DiscoveryRequest request;
private final Consumer<Response> responseConsumer;

private volatile int isCancelled = 0;
private Runnable stop;

/**
* Construct a watch.
*
* @param ads is this watch for an ADS request?
* @param request the original request for the watch
* @param ads is this watch for an ADS request?
* @param request the original request for the watch
* @param responseConsumer handler for outgoing response messages
*/
public Watch(boolean ads, DiscoveryRequest request, Consumer<Response> responseConsumer) {
Expand All @@ -42,7 +43,7 @@ public boolean ads() {
* may be called multiple times, with each subsequent call being a no-op.
*/
public void cancel() {
if (isCancelled.compareAndSet(false, true)) {
if (isCancelledUpdater.compareAndSet(this, 0, 1)) {
if (stop != null) {
stop.run();
}
Expand All @@ -53,7 +54,7 @@ public void cancel() {
* Returns boolean indicating whether or not the watch has been cancelled.
*/
public boolean isCancelled() {
return isCancelled.get();
return isCancelledUpdater.get(this) == 1;
}

/**
Expand Down