2
2
#include " Command.h"
3
3
#include " Response.h"
4
4
5
- #define MAX_BUFFER_SIZE 536870999 // redis response limit
5
+ #define MAX_BUFFER_SIZE 536800 // response part limit
6
6
7
7
RedisConnectionOverSsh::RedisConnectionOverSsh (const RedisConnectionConfig &c)
8
- : RedisConnectionAbstract(c), socket(nullptr ), isHostKeyAlreadyAdded(false ), socketConnected(false )
8
+ : RedisConnectionAbstract(c), socket(nullptr ), sshClient( nullptr ), isHostKeyAlreadyAdded(false ), socketConnected(false )
9
9
{
10
- syncTimer.setSingleShot (true );
11
10
12
- QObject::connect (&syncTimer, SIGNAL (timeout ()), &syncLoop, SLOT (quit ()));
13
- QObject::connect (&sshClient, SIGNAL (connected ()), this , SLOT (OnSshConnected ()));
11
+ }
12
+
13
+ void RedisConnectionOverSsh::init ()
14
+ {
15
+ if (sshClient != nullptr ) {
16
+ return ;
17
+ }
18
+
19
+ RedisConnectionAbstract::init ();
20
+
21
+ sshClient = new QxtSshClient;
22
+ syncLoop = new QEventLoop;
23
+ syncTimer = new QTimer;
24
+
25
+ syncTimer->setSingleShot (true );
26
+
27
+ QObject::connect (syncTimer, SIGNAL (timeout ()), syncLoop, SLOT (quit ()));
28
+ QObject::connect (sshClient, SIGNAL (connected ()), this , SLOT (OnSshConnected ()));
14
29
15
30
QObject::connect (
16
- & sshClient, SIGNAL (error (QxtSshClient::Error)),
31
+ sshClient, SIGNAL (error (QxtSshClient::Error)),
17
32
this , SLOT (OnSshConnectionError (QxtSshClient::Error))
18
33
);
19
34
}
@@ -22,26 +37,31 @@ RedisConnectionOverSsh::~RedisConnectionOverSsh(void)
22
37
{
23
38
if (socket != nullptr ) {
24
39
delete socket;
40
+ delete syncLoop;
41
+ delete syncTimer;
42
+ delete sshClient;
25
43
}
26
44
}
27
45
28
46
bool RedisConnectionOverSsh::connect ()
29
47
{
48
+ init ();
49
+
30
50
// set password
31
- sshClient. setPassphrase (config.sshPassword );
51
+ sshClient-> setPassphrase (config.sshPassword );
32
52
33
53
// connect to ssh server
34
- syncTimer. start (config.connectionTimeout );
35
- sshClient. connectToHost (config.sshUser , config.sshHost , config.sshPort );
36
- syncLoop. exec ();
54
+ syncTimer-> start (config.connectionTimeout );
55
+ sshClient-> connectToHost (config.sshUser , config.sshHost , config.sshPort );
56
+ syncLoop-> exec ();
37
57
38
- if (!connected && !syncTimer. isActive ()) {
58
+ if (!connected && !syncTimer-> isActive ()) {
39
59
connected = false ;
40
60
return connected;
41
61
}
42
62
43
63
// connect to redis
44
- socket = sshClient. openTcpSocket (config.host , config.port );
64
+ socket = sshClient-> openTcpSocket (config.host , config.port );
45
65
46
66
if (socket == NULL ) {
47
67
socketConnected = false ;
@@ -50,10 +70,10 @@ bool RedisConnectionOverSsh::connect()
50
70
51
71
QObject::connect (socket, SIGNAL (readyRead ()), this , SLOT (OnSocketReadyRead ()));
52
72
53
- syncTimer. start (config.connectionTimeout );
54
- syncLoop. exec ();
73
+ syncTimer-> start (config.connectionTimeout );
74
+ syncLoop-> exec ();
55
75
56
- if (!socketConnected && !syncTimer. isActive ()) {
76
+ if (!socketConnected && !syncTimer-> isActive ()) {
57
77
socketConnected = false ;
58
78
return socketConnected;
59
79
}
@@ -68,20 +88,20 @@ bool RedisConnectionOverSsh::connect()
68
88
void RedisConnectionOverSsh::OnSshConnectionError (QxtSshClient::Error error)
69
89
{
70
90
if (QxtSshClient::HostKeyUnknownError == error) {
71
- QxtSshKey hostKey = sshClient. hostKey ();
91
+ QxtSshKey hostKey = sshClient-> hostKey ();
72
92
73
- sshClient. addKnownHost (config.sshHost , hostKey);
93
+ sshClient-> addKnownHost (config.sshHost , hostKey);
74
94
75
- sshClient. resetState ();
95
+ sshClient-> resetState ();
76
96
77
- sshClient. connectToHost (config.sshUser , config.sshHost , config.sshPort );
97
+ sshClient-> connectToHost (config.sshUser , config.sshHost , config.sshPort );
78
98
79
99
isHostKeyAlreadyAdded = true ;
80
100
return ;
81
101
}
82
102
83
- if (syncLoop. isRunning ()) {
84
- syncLoop. exit ();
103
+ if (syncLoop-> isRunning ()) {
104
+ syncLoop-> exit ();
85
105
}
86
106
87
107
}
@@ -90,8 +110,8 @@ void RedisConnectionOverSsh::OnSshConnected()
90
110
{
91
111
connected = true ;
92
112
93
- if (syncLoop. isRunning ()) {
94
- syncLoop. exit ();
113
+ if (syncLoop-> isRunning ()) {
114
+ syncLoop-> exit ();
95
115
}
96
116
}
97
117
@@ -102,10 +122,29 @@ void RedisConnectionOverSsh::OnSocketReadyRead()
102
122
socketConnected = true ;
103
123
}
104
124
105
- if (syncLoop.isRunning ()) {
106
- syncLoop.exit ();
125
+ if (syncLoop->isRunning ()) {
126
+ syncLoop->exit ();
127
+ }
128
+
129
+ // ignore signals if running blocking version
130
+ if (!commandRunning) {
131
+ return ;
132
+ }
133
+
134
+ readingBuffer = socket->read (MAX_BUFFER_SIZE);
135
+
136
+ if (readingBuffer.size () == 0 ) {
137
+ return ;
107
138
}
108
139
140
+ executionTimer->stop ();
141
+ resp.appendToSource (readingBuffer);
142
+
143
+ if (resp.isValid ()) {
144
+ return sendResponse ();
145
+ } else {
146
+ executionTimer->start (config.executeTimeout ); // restart execution timer
147
+ }
109
148
}
110
149
111
150
@@ -136,10 +175,10 @@ QVariant RedisConnectionOverSsh::execute(QString command)
136
175
socket->write (cString, byteArray.size ());
137
176
138
177
// wait for ready read
139
- syncTimer. start (config.executeTimeout );
140
- syncLoop. exec ();
178
+ syncTimer-> start (config.executeTimeout );
179
+ syncLoop-> exec ();
141
180
142
- if (!syncTimer. isActive ()) {
181
+ if (!syncTimer-> isActive ()) {
143
182
return QVariant ();
144
183
}
145
184
@@ -189,9 +228,31 @@ QVariant RedisConnectionOverSsh::execute(QString command)
189
228
return response.getValue ();
190
229
}
191
230
192
- void RedisConnectionOverSsh::runCommand (const Command &cmd )
231
+ void RedisConnectionOverSsh::runCommand (const Command &command )
193
232
{
194
233
// todo: implement this
234
+ if (command.hasDbIndex ()) {
235
+ selectDb (command.getDbIndex ());
236
+ }
237
+
238
+ resp.clear ();
239
+ commandRunning = true ;
240
+ runningCommand = command;
241
+ executionTimer->start (config.executeTimeout );
242
+
243
+ if (command.isEmpty ()) {
244
+ return sendResponse ();
245
+ }
246
+
247
+ QString formattedCommand = command.getFormattedString ();
248
+
249
+ /*
250
+ * Send command
251
+ */
252
+ QByteArray byteArray = formattedCommand.toUtf8 ();
253
+ const char * cString = byteArray.constData ();
254
+
255
+ socket->write (cString, byteArray.size ());
195
256
}
196
257
197
258
bool RedisConnectionOverSsh::waitForData (int ms)
0 commit comments