Skip to content

Commit d56f18c

Browse files
author
Tim Fox
committed
Refactoring around binders, logs, etc
1 parent 94b76b1 commit d56f18c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+1467
-1091
lines changed

src/example/java/com/tesco/mewbase/example/mewblet/ShoppingBasketExample.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
package com.tesco.mewbase.example.mewblet;
22

33
import com.tesco.mewbase.bson.BsonObject;
4-
import com.tesco.mewbase.bson.BsonPath;
54
import com.tesco.mewbase.client.Client;
65
import com.tesco.mewbase.client.ClientOptions;
7-
import com.tesco.mewbase.server.MewAdmin;
8-
import com.tesco.mewbase.server.Mewblet;
96
import com.tesco.mewbase.server.Server;
107
import com.tesco.mewbase.server.ServerOptions;
118

src/main/java/com/tesco/mewbase/client/Client.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.tesco.mewbase.client;
22

3+
import com.tesco.mewbase.bson.BsonArray;
34
import com.tesco.mewbase.bson.BsonObject;
45
import com.tesco.mewbase.client.spi.ClientFactory;
56
import com.tesco.mewbase.common.SubDescriptor;
@@ -28,15 +29,23 @@ static Client newClient(Vertx vertx, ClientOptions options) {
2829
int ERR_AUTHENTICATION_FAILED = 1;
2930
int ERR_NOT_AUTHORISED = 2;
3031
int ERR_NO_SUCH_CHANNEL = 3;
31-
int ERR_FAILED_TO_PERSIST = 4;
32+
int ERR_NO_SUCH_BINDER = 4;
33+
34+
int ERR_SERVER_ERROR = 100;
35+
3236

3337
ClientFactory factory = ServiceHelper.loadFactory(ClientFactory.class);
3438

39+
// Query operations
40+
3541
CompletableFuture<BsonObject> findByID(String binderName, String id);
3642

43+
// TODO use Reactive streams for this instead
3744
void findMatching(String binderName, BsonObject matcher,
3845
Consumer<QueryResult> resultHandler, Consumer<Throwable> exceptionHandler);
3946

47+
// Pub/sub operations
48+
4049
CompletableFuture<Subscription> subscribe(SubDescriptor subDescriptor, Consumer<ClientDelivery> handler);
4150

4251
Producer createProducer(String channel);
@@ -45,6 +54,16 @@ void findMatching(String binderName, BsonObject matcher,
4554

4655
CompletableFuture<Void> publish(String channel, BsonObject event, Function<BsonObject, String> partitionFunc);
4756

57+
// Admin operations
58+
59+
CompletableFuture<BsonArray> listBinders();
60+
61+
CompletableFuture<Boolean> createBinder(String binderName);
62+
63+
CompletableFuture<BsonArray> listChannels();
64+
65+
CompletableFuture<Boolean> createChannel(String binderName);
66+
4867
CompletableFuture<Void> close();
4968

5069
}

src/main/java/com/tesco/mewbase/client/impl/ClientFrameHandler.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,24 @@ default void handleQuery(BsonObject frame) {
6262
default void handleQueryAck(BsonObject frame) {
6363
throw new UnsupportedOperationException();
6464
}
65+
66+
@Override
67+
default void handleListBinders(BsonObject frame) {
68+
throw new UnsupportedOperationException();
69+
}
70+
71+
@Override
72+
default void handleCreateBinder(BsonObject frame) {
73+
throw new UnsupportedOperationException();
74+
}
75+
76+
@Override
77+
default void handleListChannels(BsonObject frame) {
78+
throw new UnsupportedOperationException();
79+
}
80+
81+
@Override
82+
default void handleCreateChannel(BsonObject frame) {
83+
throw new UnsupportedOperationException();
84+
}
6585
}

src/main/java/com/tesco/mewbase/client/impl/ClientImpl.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.tesco.mewbase.client.impl;
22

3+
import com.tesco.mewbase.bson.BsonArray;
34
import com.tesco.mewbase.bson.BsonObject;
45
import com.tesco.mewbase.client.*;
56
import com.tesco.mewbase.common.SubDescriptor;
@@ -145,6 +146,74 @@ public void findMatching(String binderName, BsonObject matcher, Consumer<QueryRe
145146
writeQuery(frame, resultHandler, cf);
146147
}
147148

149+
// Admin operations
150+
151+
@Override
152+
public CompletableFuture<BsonArray> listBinders() {
153+
CompletableFuture<BsonArray> cf = new CompletableFuture<>();
154+
BsonObject frame = new BsonObject();
155+
write(cf, Protocol.LIST_BINDERS_FRAME, frame, resp -> {
156+
boolean ok = resp.getBoolean(Protocol.RESPONSE_OK);
157+
if (ok) {
158+
BsonArray binders = resp.getBsonArray(Protocol.LISTBINDERS_BINDERS);
159+
cf.complete(binders);
160+
} else {
161+
cf.completeExceptionally(responseToException(resp));
162+
}
163+
});
164+
return cf;
165+
}
166+
167+
@Override
168+
public CompletableFuture<Boolean> createBinder(String binderName) {
169+
CompletableFuture<Boolean> cf = new CompletableFuture<>();
170+
BsonObject frame = new BsonObject();
171+
frame.put(Protocol.CREATEBINDER_NAME, binderName);
172+
write(cf, Protocol.CREATE_BINDER_FRAME, frame, resp -> {
173+
boolean ok = resp.getBoolean(Protocol.RESPONSE_OK);
174+
boolean exists = resp.getBoolean(Protocol.CREATEBINDER_RESPONSE_EXISTS);
175+
if (ok) {
176+
cf.complete(!exists);
177+
} else {
178+
cf.completeExceptionally(responseToException(resp));
179+
}
180+
});
181+
return cf;
182+
}
183+
184+
@Override
185+
public CompletableFuture<BsonArray> listChannels() {
186+
CompletableFuture<BsonArray> cf = new CompletableFuture<>();
187+
BsonObject frame = new BsonObject();
188+
write(cf, Protocol.LIST_CHANNELS_FRAME, frame, resp -> {
189+
boolean ok = resp.getBoolean(Protocol.RESPONSE_OK);
190+
if (ok) {
191+
BsonArray channels = resp.getBsonArray(Protocol.LISTCHANNELS_CHANNELS);
192+
cf.complete(channels);
193+
} else {
194+
cf.completeExceptionally(responseToException(resp));
195+
}
196+
});
197+
return cf;
198+
}
199+
200+
@Override
201+
public CompletableFuture<Boolean> createChannel(String channelName) {
202+
CompletableFuture<Boolean> cf = new CompletableFuture<>();
203+
BsonObject frame = new BsonObject();
204+
frame.put(Protocol.CREATECHANNEL_NAME, channelName);
205+
write(cf, Protocol.CREATE_CHANNEL_FRAME, frame, resp -> {
206+
boolean ok = resp.getBoolean(Protocol.RESPONSE_OK);
207+
boolean exists = resp.getBoolean(Protocol.CREATECHANNEL_RESPONSE_EXISTS);
208+
if (ok) {
209+
cf.complete(!exists);
210+
} else {
211+
cf.completeExceptionally(responseToException(resp));
212+
}
213+
});
214+
return cf;
215+
}
216+
148217
@Override
149218
public CompletableFuture<Void> close() {
150219
netClient.close();
@@ -166,6 +235,8 @@ public void handleQueryResult(int size, BsonObject resp) {
166235
if (qrh == null) {
167236
throw new IllegalStateException("Can't find query result handler");
168237
}
238+
boolean ok = resp.getBoolean(Protocol.QUERYRESULT_OK);
239+
169240
boolean last = resp.getBoolean(Protocol.QUERYRESULT_LAST);
170241
QueryResult qr = new QueryResultImpl(resp.getBsonObject(Protocol.QUERYRESULT_RESULT), size, last, rQueryID);
171242
try {

src/main/java/com/tesco/mewbase/common/FrameHandler.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,15 @@ public interface FrameHandler {
3838
void handleQueryAck(BsonObject frame);
3939

4040
void handlePing(BsonObject frame);
41+
42+
43+
void handleListBinders(BsonObject frame);
44+
45+
void handleCreateBinder(BsonObject frame);
46+
47+
void handleListChannels(BsonObject frame);
48+
49+
void handleCreateChannel(BsonObject frame);
50+
51+
4152
}

src/main/java/com/tesco/mewbase/doc/DocManager.java

Lines changed: 0 additions & 60 deletions
This file was deleted.
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package com.tesco.mewbase.doc.impl.lmdb;
2+
3+
import com.tesco.mewbase.bson.BsonObject;
4+
import com.tesco.mewbase.server.Binder;
5+
import com.tesco.mewbase.server.DocReadStream;
6+
import com.tesco.mewbase.util.AsyncResCF;
7+
import io.vertx.core.buffer.Buffer;
8+
import org.fusesource.lmdbjni.Database;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import java.nio.charset.StandardCharsets;
13+
import java.util.concurrent.CompletableFuture;
14+
import java.util.function.Function;
15+
16+
/**
17+
* Created by tim on 29/12/16.
18+
*/
19+
public class LmdbBinder implements Binder {
20+
21+
private final static Logger logger = LoggerFactory.getLogger(LmdbBinder.class);
22+
23+
private final LmdbBinderFactory binderFactory;
24+
private final String name;
25+
private Database db;
26+
private AsyncResCF<Void> startRes;
27+
28+
public LmdbBinder(LmdbBinderFactory binderFactory, String name) {
29+
this.binderFactory = binderFactory;
30+
this.name = name;
31+
}
32+
33+
@Override
34+
public DocReadStream getMatching(Function<BsonObject, Boolean> matcher) {
35+
return new LmdbReadStream(binderFactory, db, matcher);
36+
}
37+
38+
@Override
39+
public CompletableFuture<BsonObject> get(String id) {
40+
logger.trace("Callling get on binder: " + this);
41+
AsyncResCF<BsonObject> res = new AsyncResCF<>();
42+
binderFactory.getExec().executeBlocking(fut -> {
43+
byte[] key = getKey(id);
44+
byte[] val = db.get(key);
45+
if (val != null) {
46+
BsonObject obj = new BsonObject(Buffer.buffer(val));
47+
fut.complete(obj);
48+
} else {
49+
fut.complete(null);
50+
}
51+
}, res);
52+
return res;
53+
}
54+
55+
@Override
56+
public CompletableFuture<Void> put(String id, BsonObject doc) {
57+
AsyncResCF<Void> res = new AsyncResCF<>();
58+
binderFactory.getExec().executeBlocking(fut -> {
59+
byte[] key = getKey(id);
60+
byte[] val = doc.encode().getBytes();
61+
db.put(key, val);
62+
fut.complete(null);
63+
}, res);
64+
return res;
65+
}
66+
67+
@Override
68+
public CompletableFuture<Boolean> delete(String id) {
69+
AsyncResCF<Boolean> res = new AsyncResCF<>();
70+
binderFactory.getExec().executeBlocking(fut -> {
71+
byte[] key = getKey(id);
72+
boolean deleted = db.delete(key);
73+
fut.complete(deleted);
74+
}, res);
75+
return res;
76+
}
77+
78+
@Override
79+
public CompletableFuture<Void> close() {
80+
AsyncResCF<Void> res = new AsyncResCF<>();
81+
binderFactory.getExec().executeBlocking(fut -> {
82+
db.close();
83+
binderFactory.getEnv().sync(true);
84+
fut.complete(null);
85+
}, res);
86+
return res;
87+
}
88+
89+
@Override
90+
public synchronized CompletableFuture<Void> start() {
91+
// Deals with race where start is called before previous start is complete
92+
// TODO test this!
93+
if (startRes == null) {
94+
startRes = new AsyncResCF<>();
95+
binderFactory.getExec().executeBlocking(fut -> {
96+
logger.trace("Opening lmdb database " + name);
97+
db = binderFactory.getEnv().openDatabase(name);
98+
logger.trace("Opened lmdb database " + name);
99+
fut.complete(null);
100+
}, startRes);
101+
}
102+
return startRes;
103+
}
104+
105+
@Override
106+
public String getName() {
107+
return name;
108+
}
109+
110+
private byte[] getKey(String id) {
111+
// TODO probably a better way to do this
112+
return id.getBytes(StandardCharsets.UTF_8);
113+
}
114+
115+
}

0 commit comments

Comments
 (0)