Skip to content

Commit e59006e

Browse files
committed
Revert "Atomic processing of create/remove of keepalive thread"
This reverts commit 4680cb8 (caused shyiko#321).
1 parent d466f72 commit e59006e

File tree

1 file changed

+42
-58
lines changed

1 file changed

+42
-58
lines changed

src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

Lines changed: 42 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,6 @@ public X509Certificate[] getAcceptedIssuers() {
164164
private volatile ExecutorService keepAliveThreadExecutor;
165165

166166
private final Lock connectLock = new ReentrantLock();
167-
private final Lock keepAliveThreadExecutorLock = new ReentrantLock();
168167

169168
/**
170169
* Alias for BinaryLogClient("localhost", 3306, <no schema> = null, username, password).
@@ -771,51 +770,46 @@ public Thread newThread(Runnable runnable) {
771770
return newNamedThread(runnable, "blc-keepalive-" + hostname + ":" + port);
772771
}
773772
});
774-
try {
775-
keepAliveThreadExecutorLock.lock();
776-
threadExecutor.submit(new Runnable() {
777-
@Override
778-
public void run() {
779-
while (!threadExecutor.isShutdown()) {
773+
threadExecutor.submit(new Runnable() {
774+
@Override
775+
public void run() {
776+
while (!threadExecutor.isShutdown()) {
777+
try {
778+
Thread.sleep(keepAliveInterval);
779+
} catch (InterruptedException e) {
780+
// expected in case of disconnect
781+
}
782+
if (threadExecutor.isShutdown()) {
783+
return;
784+
}
785+
boolean connectionLost = false;
786+
if (heartbeatInterval > 0) {
787+
connectionLost = System.currentTimeMillis() - eventLastSeen > keepAliveInterval;
788+
} else {
780789
try {
781-
Thread.sleep(keepAliveInterval);
782-
} catch (InterruptedException e) {
783-
// expected in case of disconnect
790+
channel.write(new PingCommand());
791+
} catch (IOException e) {
792+
connectionLost = true;
784793
}
785-
if (threadExecutor.isShutdown()) {
786-
return;
787-
}
788-
boolean connectionLost = false;
789-
if (heartbeatInterval > 0) {
790-
connectionLost = System.currentTimeMillis() - eventLastSeen > keepAliveInterval;
791-
} else {
792-
try {
793-
channel.write(new PingCommand());
794-
} catch (IOException e) {
795-
connectionLost = true;
796-
}
794+
}
795+
if (connectionLost) {
796+
if (logger.isLoggable(Level.INFO)) {
797+
logger.info("Trying to restore lost connection to " + hostname + ":" + port);
797798
}
798-
if (connectionLost) {
799-
if (logger.isLoggable(Level.INFO)) {
800-
logger.info("Trying to restore lost connection to " + hostname + ":" + port);
801-
}
802-
try {
803-
terminateConnect();
804-
connect(connectTimeout);
805-
} catch (Exception ce) {
806-
if (logger.isLoggable(Level.WARNING)) {
807-
logger.warning("Failed to restore connection to " + hostname + ":" + port +
808-
". Next attempt in " + keepAliveInterval + "ms");
809-
}
799+
try {
800+
terminateConnect();
801+
connect(connectTimeout);
802+
} catch (Exception ce) {
803+
if (logger.isLoggable(Level.WARNING)) {
804+
logger.warning("Failed to restore connection to " + hostname + ":" + port +
805+
". Next attempt in " + keepAliveInterval + "ms");
810806
}
811807
}
812808
}
813809
}
814-
});
815-
keepAliveThreadExecutor = threadExecutor;
816-
} finally {
817-
keepAliveThreadExecutorLock.unlock();
818-
}
810+
}
811+
});
812+
keepAliveThreadExecutor = threadExecutor;
819813
}
820814

821815
private Thread newNamedThread(Runnable runnable, String threadName) {
@@ -825,12 +819,7 @@ private Thread newNamedThread(Runnable runnable, String threadName) {
825819
}
826820

827821
boolean isKeepAliveThreadRunning() {
828-
try {
829-
keepAliveThreadExecutorLock.lock();
830-
return keepAliveThreadExecutor != null && !keepAliveThreadExecutor.isShutdown();
831-
} finally {
832-
keepAliveThreadExecutorLock.unlock();
833-
}
822+
return keepAliveThreadExecutor != null && !keepAliveThreadExecutor.isShutdown();
834823
}
835824

836825
/**
@@ -1180,19 +1169,14 @@ public void disconnect() throws IOException {
11801169
}
11811170

11821171
private void terminateKeepAliveThread() {
1183-
try {
1184-
keepAliveThreadExecutorLock.lock();
1185-
ExecutorService keepAliveThreadExecutor = this.keepAliveThreadExecutor;
1186-
if (keepAliveThreadExecutor == null) {
1187-
return;
1188-
}
1189-
keepAliveThreadExecutor.shutdownNow();
1190-
while (!awaitTerminationInterruptibly(keepAliveThreadExecutor,
1191-
Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
1192-
// ignore
1193-
}
1194-
} finally {
1195-
keepAliveThreadExecutorLock.unlock();
1172+
ExecutorService keepAliveThreadExecutor = this.keepAliveThreadExecutor;
1173+
if (keepAliveThreadExecutor == null) {
1174+
return;
1175+
}
1176+
keepAliveThreadExecutor.shutdownNow();
1177+
while (!awaitTerminationInterruptibly(keepAliveThreadExecutor,
1178+
Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
1179+
// ignore
11961180
}
11971181
}
11981182

0 commit comments

Comments
 (0)