Skip to content

Commit f3d1541

Browse files
committed
* Added reconnection for mqtt client on lost connection and some
another error cases * Changed odd logic of generation clientRef for Android * Added recover subscriptions to topic after reconnect
1 parent 4f9476b commit f3d1541

File tree

5 files changed

+155
-76
lines changed

5 files changed

+155
-76
lines changed

android/android.iml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@
9393
<excludeFolder url="file://$MODULE_DIR$/build/intermediates/exploded-aar/com.facebook.fresco/fresco/0.8.1/jars" />
9494
<excludeFolder url="file://$MODULE_DIR$/build/intermediates/exploded-aar/com.facebook.fresco/imagepipeline-okhttp/0.8.1/jars" />
9595
<excludeFolder url="file://$MODULE_DIR$/build/intermediates/exploded-aar/com.facebook.fresco/imagepipeline/0.8.1/jars" />
96-
<excludeFolder url="file://$MODULE_DIR$/build/intermediates/exploded-aar/com.facebook.react/react-native/0.19.1/jars" />
96+
<excludeFolder url="file://$MODULE_DIR$/build/intermediates/exploded-aar/com.facebook.react/react-native/0.20.1/jars" />
9797
<excludeFolder url="file://$MODULE_DIR$/build/intermediates/exploded-aar/org.webkit/android-jsc/r174650/jars" />
9898
<excludeFolder url="file://$MODULE_DIR$/build/intermediates/incremental" />
9999
<excludeFolder url="file://$MODULE_DIR$/build/intermediates/lint" />
@@ -111,7 +111,6 @@
111111
<orderEntry type="library" exported="" name="okio-1.6.0" level="project" />
112112
<orderEntry type="library" exported="" name="stetho-okhttp-1.2.0" level="project" />
113113
<orderEntry type="library" exported="" name="okhttp-2.5.0" level="project" />
114-
<orderEntry type="library" exported="" name="react-native-0.19.1" level="project" />
115114
<orderEntry type="library" exported="" name="jsr305-3.0.0" level="project" />
116115
<orderEntry type="library" exported="" name="stetho-1.2.0" level="project" />
117116
<orderEntry type="library" exported="" name="jackson-core-2.2.3" level="project" />
@@ -127,6 +126,7 @@
127126
<orderEntry type="library" exported="" name="bolts-android-1.1.4" level="project" />
128127
<orderEntry type="library" exported="" name="support-v4-23.0.1" level="project" />
129128
<orderEntry type="library" exported="" name="drawee-0.8.1" level="project" />
129+
<orderEntry type="library" exported="" name="react-native-0.20.1" level="project" />
130130
<orderEntry type="library" exported="" name="appcompat-v7-23.0.1" level="project" />
131131
<orderEntry type="library" exported="" name="support-annotations-23.0.1" level="project" />
132132
</component>

android/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ repositories {
3333
}
3434

3535
dependencies {
36-
compile 'com.facebook.react:react-native:0.19.+'
36+
compile 'com.facebook.react:react-native:+'
3737
compile('org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2') {
3838
exclude module: 'support-v4'
3939
}

android/src/main/java/com/tuanpm/RCTMqtt/RCTMqtt.java

Lines changed: 111 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import com.facebook.react.bridge.ReadableMap;
1010
import com.facebook.react.bridge.WritableMap;
1111
import com.facebook.react.bridge.WritableNativeMap;
12-
import com.facebook.react.modules.core.DeviceEventManagerModule;
12+
import com.facebook.react.modules.core.RCTNativeAppEventEmitter;
1313

1414
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
1515
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
@@ -25,6 +25,14 @@
2525
import java.security.SecureRandom;
2626
import java.security.cert.CertificateException;
2727
import java.security.cert.X509Certificate;
28+
import java.util.HashMap;
29+
import java.util.Iterator;
30+
import java.util.Map;
31+
import java.util.Timer;
32+
import java.util.TimerTask;
33+
import java.util.UUID;
34+
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.atomic.AtomicBoolean;
2836

2937
import javax.annotation.Nullable;
3038
import javax.net.ssl.SSLContext;
@@ -35,20 +43,20 @@ public class RCTMqtt
3543
implements MqttCallback
3644
{
3745
private static final String TAG = "RCTMqttModule";
38-
private final ReactApplicationContext _reactContext;
46+
private final ReactApplicationContext reactContext;
3947
private final WritableMap defaultOptions;
40-
private final int clientRef;
41-
MqttAsyncClient client;
42-
MemoryPersistence memPer;
43-
MqttConnectOptions mqttoptions;
48+
private final String clientRef;
49+
private MqttAsyncClient client;
50+
private MemoryPersistence memPer;
51+
private MqttConnectOptions mqttOptions;
52+
private Map<String, Integer> topics = new HashMap<>();
4453

45-
46-
public RCTMqtt(final int ref,
54+
public RCTMqtt(@NonNull final String ref,
4755
final ReactApplicationContext reactContext,
4856
final ReadableMap options)
4957
{
5058
clientRef = ref;
51-
_reactContext = reactContext;
59+
this.reactContext = reactContext;
5260
defaultOptions = new WritableNativeMap();
5361
defaultOptions.putString("host", "localhost");
5462
defaultOptions.putInt("port", 1883);
@@ -63,7 +71,6 @@ public RCTMqtt(final int ref,
6371
defaultOptions.putString("pass", "");
6472
defaultOptions.putBoolean("will", false);
6573
defaultOptions.putInt("protocolLevel", 4);
66-
defaultOptions.putBoolean("will", false);
6774
defaultOptions.putString("willMsg", "");
6875
defaultOptions.putString("willtopic", "");
6976
defaultOptions.putInt("willQos", 0);
@@ -153,14 +160,14 @@ private void createClient(@NonNull final ReadableMap params)
153160
// Set this wrapper as the callback handler
154161

155162

156-
mqttoptions = new MqttConnectOptions();
163+
mqttOptions = new MqttConnectOptions();
157164

158165
if (options.getInt("protocolLevel") == 3)
159166
{
160-
mqttoptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);
167+
mqttOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);
161168
}
162169

163-
mqttoptions.setKeepAliveInterval(options.getInt("keepalive"));
170+
mqttOptions.setKeepAliveInterval(options.getInt("keepalive"));
164171

165172
StringBuilder uri = new StringBuilder("tcp://");
166173
if (options.getBoolean("tls"))
@@ -197,7 +204,7 @@ public X509Certificate[] getAcceptedIssuers()
197204
}
198205
}}, new SecureRandom());
199206

200-
mqttoptions.setSocketFactory(sslContext.getSocketFactory());
207+
mqttOptions.setSocketFactory(sslContext.getSocketFactory());
201208
}
202209
catch (Exception e)
203210
{
@@ -213,11 +220,11 @@ public X509Certificate[] getAcceptedIssuers()
213220
String pass = options.getString("pass");
214221
if (user.length() > 0)
215222
{
216-
mqttoptions.setUserName(user);
223+
mqttOptions.setUserName(user);
217224
}
218225
if (pass.length() > 0)
219226
{
220-
mqttoptions.setPassword(pass.toCharArray());
227+
mqttOptions.setPassword(pass.toCharArray());
221228
}
222229
}
223230

@@ -244,37 +251,44 @@ public void setCallback()
244251
}
245252

246253

247-
private void sendEvent(final ReactContext reactContext,
248-
final String eventName,
249-
@Nullable WritableMap params)
250-
{
251-
params.putInt("clientRef", this.clientRef);
252-
reactContext.getJSModule(DeviceEventManagerModule.RCTDeviceEventEmitter.class).emit(eventName, params);
253-
}
254-
255254
public void connect()
256255
{
257256
try
258257
{
258+
WritableMap params = Arguments.createMap();
259+
params.putString("event", "connecting");
260+
params.putString("message", "try to connect");
261+
sendEvent(reactContext, "mqtt_events", params);
262+
259263
// Connect using a non-blocking connect
260-
client.connect(mqttoptions, _reactContext, new IMqttActionListener()
264+
client.connect(mqttOptions, reactContext, new IMqttActionListener()
261265
{
262266
public void onSuccess(IMqttToken asyncActionToken)
263267
{
264268
WritableMap params = Arguments.createMap();
265269
params.putString("event", "connect");
266270
params.putString("message", "connected");
267-
sendEvent(_reactContext, "mqtt_events", params);
271+
sendEvent(reactContext, "mqtt_events", params);
268272
log("Connected");
273+
274+
Iterator<String> iterator = topics.keySet().iterator();
275+
while(iterator.hasNext())
276+
{
277+
final String topic = iterator.next();
278+
subscribe(topic, topics.get(topic));
279+
}
269280
}
270281

271282
public void onFailure(IMqttToken asyncActionToken,
272283
Throwable exception)
273284
{
274285
WritableMap params = Arguments.createMap();
275286
params.putString("event", "error");
276-
params.putString("message", "connection failure");
277-
sendEvent(_reactContext, "mqtt_events", params);
287+
final String errorDescription = new StringBuilder("connection failure ")
288+
.append(exception).toString();
289+
params.putString("message", errorDescription);
290+
sendEvent(reactContext, "mqtt_events", params);
291+
reconnectIfNeeded(exception);
278292
}
279293
});
280294
}
@@ -283,7 +297,7 @@ public void onFailure(IMqttToken asyncActionToken,
283297
WritableMap params = Arguments.createMap();
284298
params.putString("event", "error");
285299
params.putString("message", "Can't create connection");
286-
sendEvent(_reactContext, "mqtt_events", params);
300+
sendEvent(reactContext, "mqtt_events", params);
287301
}
288302
}
289303

@@ -297,7 +311,7 @@ public void onSuccess(IMqttToken asyncActionToken)
297311
WritableMap params = Arguments.createMap();
298312
params.putString("event", "closed");
299313
params.putString("message", "Disconnect");
300-
sendEvent(_reactContext, "mqtt_events", params);
314+
sendEvent(reactContext, "mqtt_events", params);
301315
}
302316

303317
public void onFailure(IMqttToken asyncActionToken,
@@ -309,7 +323,7 @@ public void onFailure(IMqttToken asyncActionToken,
309323

310324
try
311325
{
312-
client.disconnect(_reactContext, discListener);
326+
client.disconnect(reactContext, discListener);
313327
}
314328
catch (MqttException e)
315329
{
@@ -322,6 +336,7 @@ public void subscribe(@NonNull final String topic,
322336
{
323337
try
324338
{
339+
topics.put(topic, qos);
325340
IMqttToken subToken = client.subscribe(topic, qos);
326341
subToken.setActionCallback(new IMqttActionListener()
327342
{
@@ -353,6 +368,9 @@ public void unsubscribe(@NonNull final String topic)
353368
{
354369
try
355370
{
371+
if (topics.containsKey(topic)) {
372+
topics.remove(topic);
373+
}
356374
client.unsubscribe(topic).setActionCallback(new IMqttActionListener()
357375
{
358376
@Override
@@ -415,9 +433,12 @@ public void connectionLost(Throwable cause)
415433
// logic at this point. This sample simply exits.
416434
log(new StringBuilder("Connection to lost! ").append(cause).toString());
417435
WritableMap params = Arguments.createMap();
418-
params.putString("event", "closed");
419-
params.putString("message", "Connection to lost!");
420-
sendEvent(_reactContext, "mqtt_events", params);
436+
params.putString("event", "error");
437+
final String errorDescription = new StringBuilder("Connection to lost! ")
438+
.append(cause).toString();
439+
params.putString("message", errorDescription);
440+
sendEvent(reactContext, "mqtt_events", params);
441+
reconnectIfNeeded(cause);
421442
}
422443

423444
/**
@@ -463,15 +484,69 @@ public void messageArrived(@NonNull final String topic,
463484
WritableMap params = Arguments.createMap();
464485
params.putString("event", "message");
465486
params.putMap("message", data);
466-
sendEvent(_reactContext, "mqtt_events", params);
487+
sendEvent(reactContext, "mqtt_events", params);
488+
}
489+
490+
private void sendEvent(final ReactContext reactContext,
491+
final String eventName,
492+
@Nullable WritableMap params)
493+
{
494+
params.putString("clientRef", this.clientRef);
495+
reactContext.getJSModule(RCTNativeAppEventEmitter.class).emit(eventName, params);
496+
}
497+
498+
private boolean needToReconnect(@NonNull final MqttException exception)
499+
{
500+
int reasonCode = exception.getReasonCode();
501+
return reasonCode == MqttException.REASON_CODE_SERVER_CONNECT_ERROR ||
502+
reasonCode == MqttException.REASON_CODE_CLIENT_EXCEPTION ||
503+
reasonCode == MqttException.REASON_CODE_CONNECTION_LOST;
504+
}
505+
506+
private void reconnectIfNeeded(@NonNull final Throwable cause)
507+
{
508+
if (!(cause instanceof MqttException))
509+
{
510+
final String notMqttExceptionError = new StringBuilder("Not MqttException ")
511+
.append(cause).toString();
512+
log(notMqttExceptionError);
513+
return;
514+
}
515+
516+
final MqttException mqttError = (MqttException) cause;
517+
518+
if (!needToReconnect(mqttError))
519+
{
520+
final String noNeedToReconnect = new StringBuilder("No need to reconnect ")
521+
.append(mqttError.getReasonCode()).toString();
522+
log(noNeedToReconnect);
523+
return;
524+
}
525+
526+
final String timerName = new StringBuilder("reconnect-")
527+
.append(UUID.randomUUID().toString())
528+
.toString();
529+
530+
final Timer timer = new Timer(timerName);
531+
timer.schedule(new TimerTask()
532+
{
533+
@Override
534+
public void run()
535+
{
536+
log("[ MQTT ] reconnect");
537+
connect();
538+
timer.cancel();
539+
timer.purge();
540+
}
541+
}, TimeUnit.SECONDS.toMillis(5));
467542
}
468543

469544
/**
470545
* Utility method to handle logging. If 'quietMode' is set, this method does nothing
471546
*
472547
* @param message the message to log
473548
*/
474-
void log(@NonNull final String message)
549+
private void log(@NonNull final String message)
475550
{
476551
if (!BuildConfig.DEBUG)
477552
{

0 commit comments

Comments
 (0)