4
4
import io .vertx .core .Future ;
5
5
import io .vertx .core .Vertx ;
6
6
import io .vertx .core .file .AsyncFile ;
7
+ import io .vertx .core .file .CopyOptions ;
7
8
import io .vertx .core .file .FileProps ;
8
9
import io .vertx .core .file .FileSystem ;
9
10
import io .vertx .core .file .FileSystemException ;
10
11
import io .vertx .core .file .OpenOptions ;
11
12
import io .vertx .core .http .HttpClient ;
12
- import io .vertx .core .http .HttpClientRequest ;
13
13
import io .vertx .core .http .HttpClientResponse ;
14
+ import io .vertx .core .http .HttpHeaders ;
14
15
import io .vertx .core .logging .Logger ;
15
16
import io .vertx .core .logging .LoggerFactory ;
16
17
import io .vertx .core .streams .Pump ;
18
+ import org .apache .commons .lang3 .StringUtils ;
17
19
import org .prebid .server .exception .PreBidException ;
18
20
import org .prebid .server .util .HttpUtil ;
19
21
@@ -32,38 +34,45 @@ public class RemoteFileSyncer {
32
34
33
35
private final String downloadUrl ; // url to resource to be downloaded
34
36
private final String saveFilePath ; // full path on file system where downloaded file located
37
+ private final String tmpFilePath ; // full path on file system where tmp file located
35
38
private final int retryCount ; // how many times try to download
36
39
private final long retryInterval ; // how long to wait between failed retries
37
40
private final long timeout ;
41
+ private final long updatePeriod ;
38
42
private final HttpClient httpClient ;
39
43
private final Vertx vertx ;
40
44
private final FileSystem fileSystem ;
41
45
42
- private RemoteFileSyncer (String downloadUrl , String saveFilePath , int retryCount ,
43
- long retryInterval , long timeout , HttpClient httpClient , Vertx vertx ,
46
+ private RemoteFileSyncer (String downloadUrl , String saveFilePath , String tmpFilePath , int retryCount ,
47
+ long retryInterval , long timeout , long updatePeriod , HttpClient httpClient , Vertx vertx ,
44
48
FileSystem fileSystem ) {
45
49
this .downloadUrl = downloadUrl ;
46
50
this .saveFilePath = saveFilePath ;
51
+ this .tmpFilePath = tmpFilePath ;
47
52
this .retryCount = retryCount ;
48
53
this .retryInterval = retryInterval ;
49
54
this .timeout = timeout ;
55
+ this .updatePeriod = updatePeriod ;
50
56
this .httpClient = httpClient ;
51
57
this .vertx = vertx ;
52
58
this .fileSystem = fileSystem ;
53
59
}
54
60
55
- public static RemoteFileSyncer create (String downloadUrl , String saveFilePath , int retryCount , long retryInterval ,
56
- long timeout , HttpClient httpClient , Vertx vertx ) {
61
+ public static RemoteFileSyncer create (String downloadUrl , String saveFilePath , String tmpFilePath , int retryCount ,
62
+ long retryInterval , long timeout , long updatePeriod , HttpClient httpClient ,
63
+ Vertx vertx , FileSystem fileSystem ) {
57
64
HttpUtil .validateUrl (downloadUrl );
58
65
Objects .requireNonNull (saveFilePath );
66
+ Objects .requireNonNull (tmpFilePath );
59
67
Objects .requireNonNull (vertx );
60
68
Objects .requireNonNull (httpClient );
61
- final FileSystem fileSystem = vertx . fileSystem ( );
69
+ Objects . requireNonNull ( fileSystem );
62
70
63
71
createAndCheckWritePermissionsFor (fileSystem , saveFilePath );
72
+ createAndCheckWritePermissionsFor (fileSystem , tmpFilePath );
64
73
65
- return new RemoteFileSyncer (downloadUrl , saveFilePath , retryCount , retryInterval , timeout , httpClient , vertx ,
66
- fileSystem );
74
+ return new RemoteFileSyncer (downloadUrl , saveFilePath , tmpFilePath , retryCount , retryInterval , timeout ,
75
+ updatePeriod , httpClient , vertx , fileSystem );
67
76
}
68
77
69
78
/**
@@ -90,36 +99,68 @@ public void syncForFilepath(RemoteFileProcessor remoteFileProcessor) {
90
99
downloadIfNotExist (remoteFileProcessor ).setHandler (syncResult -> handleSync (remoteFileProcessor , syncResult ));
91
100
}
92
101
93
- private Future <Void > downloadIfNotExist (RemoteFileProcessor fileProcessor ) {
94
- final Future <Void > future = Future .future ();
95
- fileSystem .exists (saveFilePath , existResult -> handleFileExisting (future , existResult , fileProcessor ));
102
+ private Future <Boolean > downloadIfNotExist (RemoteFileProcessor fileProcessor ) {
103
+ final Future <Boolean > future = Future .future ();
104
+ checkFileExist (saveFilePath ).setHandler (existResult ->
105
+ handleFileExistingWithSync (existResult , fileProcessor , future ));
96
106
return future ;
97
107
}
98
108
99
- private void handleFileExisting (Future <Void > future , AsyncResult <Boolean > existResult ,
100
- RemoteFileProcessor fileProcessor ) {
109
+ private Future <Boolean > checkFileExist (String filePath ) {
110
+ final Future <Boolean > result = Future .future ();
111
+ fileSystem .exists (filePath , async -> {
112
+ if (async .succeeded ()) {
113
+ result .complete (async .result ());
114
+ } else {
115
+ result .fail (String .format ("Cant check if file exists %s" , filePath ));
116
+ }
117
+ });
118
+ return result ;
119
+ }
120
+
121
+ private void handleFileExistingWithSync (AsyncResult <Boolean > existResult , RemoteFileProcessor fileProcessor ,
122
+ Future <Boolean > future ) {
101
123
if (existResult .succeeded ()) {
102
124
if (existResult .result ()) {
103
125
fileProcessor .setDataPath (saveFilePath )
104
126
.setHandler (serviceRespond -> handleServiceRespond (serviceRespond , future ));
105
127
} else {
106
- tryDownload (future );
128
+ syncRemoteFiles (). setHandler (future );
107
129
}
108
130
} else {
109
131
future .fail (existResult .cause ());
110
132
}
111
133
}
112
134
113
- private void handleServiceRespond (AsyncResult <?> processResult , Future <Void > future ) {
135
+ private void handleServiceRespond (AsyncResult <?> processResult , Future <Boolean > future ) {
114
136
if (processResult .failed ()) {
115
137
final Throwable cause = processResult .cause ();
116
- cleanUp ().setHandler (removalResult -> handleCorruptedFileRemoval (removalResult , future , cause ));
138
+ cleanUp (saveFilePath ).setHandler (removalResult -> handleCorruptedFileRemoval (removalResult , future , cause ));
117
139
} else {
140
+ future .complete (false );
118
141
logger .info ("Existing file {0} was successfully reused for service" , saveFilePath );
119
142
}
120
143
}
121
144
122
- private void handleCorruptedFileRemoval (AsyncResult <Void > removalResult , Future <Void > future ,
145
+ private Future <Void > cleanUp (String filePath ) {
146
+ final Future <Void > future = Future .future ();
147
+ checkFileExist (filePath ).setHandler (existResult -> handleFileExistsWithDelete (filePath , existResult , future ));
148
+ return future ;
149
+ }
150
+
151
+ private void handleFileExistsWithDelete (String filePath , AsyncResult <Boolean > existResult , Future <Void > future ) {
152
+ if (existResult .succeeded ()) {
153
+ if (existResult .result ()) {
154
+ fileSystem .delete (filePath , future );
155
+ } else {
156
+ future .complete ();
157
+ }
158
+ } else {
159
+ future .fail (new PreBidException (String .format ("Cant check if file exists %s" , filePath )));
160
+ }
161
+ }
162
+
163
+ private void handleCorruptedFileRemoval (AsyncResult <Void > removalResult , Future <Boolean > future ,
123
164
Throwable serviceCause ) {
124
165
if (removalResult .failed ()) {
125
166
final Throwable cause = removalResult .cause ();
@@ -130,29 +171,42 @@ private void handleCorruptedFileRemoval(AsyncResult<Void> removalResult, Future<
130
171
logger .info ("Existing file {0} cant be processed by service, try to download after removal" ,
131
172
serviceCause , saveFilePath );
132
173
133
- tryDownload (future );
174
+ syncRemoteFiles (). setHandler (future );
134
175
}
135
176
}
136
177
137
- private void tryDownload (Future <Void > future ) {
138
- download ().setHandler (downloadResult -> handleDownload (future , downloadResult ));
178
+ private Future <Boolean > syncRemoteFiles () {
179
+ return tryDownload ()
180
+ .compose (downloadResult -> swapFiles ())
181
+ .map (true );
182
+ }
183
+
184
+ private Future <Void > tryDownload () {
185
+ final Future <Void > result = Future .future ();
186
+ cleanUp (tmpFilePath ).setHandler (event -> handleTmpDelete (event , result ));
187
+ return result ;
188
+ }
189
+
190
+ private void handleTmpDelete (AsyncResult <Void > tmpDeleteResult , Future <Void > result ) {
191
+ if (tmpDeleteResult .failed ()) {
192
+ result .fail (tmpDeleteResult .cause ());
193
+ } else {
194
+ download ().setHandler (downloadResult -> handleDownload (downloadResult , result ));
195
+ }
139
196
}
140
197
141
198
private Future <Void > download () {
142
199
final Future <Void > future = Future .future ();
143
200
final OpenOptions openOptions = new OpenOptions ().setCreateNew (true );
144
- fileSystem .open (saveFilePath , openOptions , openResult -> handleFileOpenWithDownload (future , openResult ));
201
+ fileSystem .open (tmpFilePath , openOptions , openResult -> handleFileOpenWithDownload (openResult , future ));
145
202
return future ;
146
203
}
147
204
148
- private void handleFileOpenWithDownload (Future < Void > future , AsyncResult < AsyncFile > openResult ) {
205
+ private void handleFileOpenWithDownload (AsyncResult < AsyncFile > openResult , Future < Void > future ) {
149
206
if (openResult .succeeded ()) {
150
207
final AsyncFile asyncFile = openResult .result ();
151
208
try {
152
- // .getNow is not working
153
- final HttpClientRequest httpClientRequest = httpClient
154
- .getAbs (downloadUrl , response -> pumpFileFromRequest (response , asyncFile , future ));
155
- httpClientRequest .end ();
209
+ httpClient .getAbs (downloadUrl , response -> pumpFileFromRequest (response , asyncFile , future )).end ();
156
210
} catch (Exception e ) {
157
211
future .fail (e );
158
212
}
@@ -162,35 +216,35 @@ private void handleFileOpenWithDownload(Future<Void> future, AsyncResult<AsyncFi
162
216
}
163
217
164
218
private void pumpFileFromRequest (HttpClientResponse httpClientResponse , AsyncFile asyncFile , Future <Void > future ) {
165
- logger .info ("Trying to download from {0}" , downloadUrl );
219
+ logger .info ("Trying to download file from {0}" , downloadUrl );
166
220
httpClientResponse .pause ();
167
221
final Pump pump = Pump .pump (httpClientResponse , asyncFile );
168
222
pump .start ();
169
223
httpClientResponse .resume ();
170
224
171
- final long idTimer = setTimeoutTimer (asyncFile , future , pump );
225
+ final long idTimer = setTimeoutTimer (asyncFile , pump , future );
172
226
173
- httpClientResponse .endHandler (responseEndResult -> handleResponseEnd (asyncFile , future , idTimer ));
227
+ httpClientResponse .endHandler (responseEndResult -> handleResponseEnd (asyncFile , idTimer , future ));
174
228
}
175
229
176
- private long setTimeoutTimer (AsyncFile asyncFile , Future <Void > future , Pump pump ) {
177
- return vertx .setTimer (timeout , timerId -> handleTimeout (asyncFile , future , pump ));
230
+ private long setTimeoutTimer (AsyncFile asyncFile , Pump pump , Future <Void > future ) {
231
+ return vertx .setTimer (timeout , timerId -> handleTimeout (asyncFile , pump , future ));
178
232
}
179
233
180
- private void handleTimeout (AsyncFile asyncFile , Future <Void > future , Pump pump ) {
234
+ private void handleTimeout (AsyncFile asyncFile , Pump pump , Future <Void > future ) {
181
235
pump .stop ();
182
236
asyncFile .close ();
183
237
if (!future .isComplete ()) {
184
238
future .fail (new TimeoutException ("Timeout on download" ));
185
239
}
186
240
}
187
241
188
- private void handleResponseEnd (AsyncFile asyncFile , Future <Void > future , long idTimer ) {
242
+ private void handleResponseEnd (AsyncFile asyncFile , long idTimer , Future <Void > future ) {
189
243
vertx .cancelTimer (idTimer );
190
244
asyncFile .flush ().close (future );
191
245
}
192
246
193
- private void handleDownload (Future <Void > future , AsyncResult <Void > downloadResult ) {
247
+ private void handleDownload (AsyncResult <Void > downloadResult , Future <Void > future ) {
194
248
if (downloadResult .failed ()) {
195
249
retryDownload (future , retryInterval , retryCount );
196
250
} else {
@@ -206,45 +260,47 @@ private void retryDownload(Future<Void> receivedFuture, long retryInterval, long
206
260
private void handleRetry (Future <Void > receivedFuture , long retryInterval , long retryCount ) {
207
261
if (retryCount > 0 ) {
208
262
final long next = retryCount - 1 ;
209
- cleanUp ().compose (aVoid -> download ())
210
- .setHandler (retryResult -> handleRetryResult (receivedFuture , retryInterval , next , retryResult ));
263
+ cleanUp (tmpFilePath ).compose (ignore -> download ())
264
+ .setHandler (retryResult -> handleRetryResult (retryInterval , next , retryResult , receivedFuture ));
211
265
} else {
212
- cleanUp ().setHandler (aVoid -> receivedFuture .fail (new PreBidException ("File sync failed after retries" )));
266
+ cleanUp (tmpFilePath ).setHandler (ignore -> receivedFuture .fail (new PreBidException (
267
+ String .format ("File sync failed after %s retries" , this .retryCount - retryCount ))));
213
268
}
214
269
}
215
270
216
- private Future <Void > cleanUp () {
217
- final Future <Void > future = Future .future ();
218
- fileSystem .exists (saveFilePath , existResult -> handleFileExistsWithDelete (future , existResult ));
219
- return future ;
220
- }
221
-
222
- private void handleFileExistsWithDelete (Future <Void > future , AsyncResult <Boolean > existResult ) {
223
- if (existResult .succeeded ()) {
224
- if (existResult .result ()) {
225
- fileSystem .delete (saveFilePath , future );
226
- } else {
227
- future .complete ();
228
- }
229
- } else {
230
- future .fail (new PreBidException (String .format ("Cant check if file exists %s" , saveFilePath )));
231
- }
232
- }
233
-
234
- private void handleRetryResult (Future <Void > future , long retryInterval , long next , AsyncResult <Void > retryResult ) {
271
+ private void handleRetryResult (long retryInterval , long next , AsyncResult <Void > retryResult , Future <Void > future ) {
235
272
if (retryResult .succeeded ()) {
236
273
future .complete ();
237
274
} else {
238
275
retryDownload (future , retryInterval , next );
239
276
}
240
277
}
241
278
242
- private void handleSync (RemoteFileProcessor remoteFileProcessor , AsyncResult <Void > syncResult ) {
279
+ private Future <Void > swapFiles () {
280
+ final Future <Void > result = Future .future ();
281
+ logger .info ("Sync {0} to {1}" , tmpFilePath , saveFilePath );
282
+
283
+ final CopyOptions copyOptions = new CopyOptions ().setReplaceExisting (true );
284
+ fileSystem .move (tmpFilePath , saveFilePath , copyOptions , result );
285
+ return result ;
286
+ }
287
+
288
+ private void handleSync (RemoteFileProcessor remoteFileProcessor , AsyncResult <Boolean > syncResult ) {
243
289
if (syncResult .succeeded ()) {
244
- remoteFileProcessor .setDataPath (saveFilePath )
245
- .setHandler (this ::logFileProcessStatus );
290
+ if (syncResult .result ()) {
291
+ logger .info ("Sync service for {0}" , saveFilePath );
292
+ remoteFileProcessor .setDataPath (saveFilePath )
293
+ .setHandler (this ::logFileProcessStatus );
294
+ } else {
295
+ logger .info ("Sync is not required for {0}" , saveFilePath );
296
+ }
246
297
} else {
247
- logger .error ("Cant download file from {0}" , syncResult .cause (), downloadUrl );
298
+ logger .error ("Cant sync file from {0}" , syncResult .cause (), downloadUrl );
299
+ }
300
+
301
+ // setup new update regardless of the result
302
+ if (updatePeriod > 0 ) {
303
+ vertx .setTimer (updatePeriod , idUpdateNew -> configureAutoUpdates (remoteFileProcessor ));
248
304
}
249
305
}
250
306
@@ -255,5 +311,45 @@ private void logFileProcessStatus(AsyncResult<?> serviceRespond) {
255
311
logger .error ("Service cant process file {0} and still unavailable." , saveFilePath );
256
312
}
257
313
}
314
+
315
+ private void configureAutoUpdates (RemoteFileProcessor remoteFileProcessor ) {
316
+ logger .info ("Check for updated for {0}" , saveFilePath );
317
+ tryUpdate ().setHandler (asyncUpdate -> {
318
+ if (asyncUpdate .failed ()) {
319
+ logger .warn ("File {0} update failed" , asyncUpdate .cause (), saveFilePath );
320
+ }
321
+ handleSync (remoteFileProcessor , asyncUpdate );
322
+ });
323
+ }
324
+
325
+ private Future <Boolean > tryUpdate () {
326
+ return checkFileExist (saveFilePath )
327
+ .compose (fileExists -> fileExists ? isNeedToUpdate () : Future .succeededFuture (true ))
328
+ .compose (needUpdate -> needUpdate ? syncRemoteFiles () : Future .succeededFuture (false ));
329
+ }
330
+
331
+ private Future <Boolean > isNeedToUpdate () {
332
+ final Future <Boolean > isNeedToUpdate = Future .future ();
333
+ httpClient .headAbs (downloadUrl , response -> checkNewVersion (response , isNeedToUpdate ))
334
+ .exceptionHandler (isNeedToUpdate ::fail )
335
+ .end ();
336
+ return isNeedToUpdate ;
337
+ }
338
+
339
+ private void checkNewVersion (HttpClientResponse response , Future <Boolean > isNeedToUpdate ) {
340
+ final String contentLengthParameter = response .getHeader (HttpHeaders .CONTENT_LENGTH );
341
+ if (StringUtils .isNumeric (contentLengthParameter ) && !contentLengthParameter .equals ("0" )) {
342
+ final long contentLength = Long .parseLong (contentLengthParameter );
343
+ fileSystem .props (saveFilePath , filePropsResult -> {
344
+ if (filePropsResult .succeeded ()) {
345
+ isNeedToUpdate .complete (filePropsResult .result ().size () != contentLength );
346
+ } else {
347
+ isNeedToUpdate .fail (filePropsResult .cause ());
348
+ }
349
+ });
350
+ } else {
351
+ isNeedToUpdate .fail (String .format ("ContentLength is invalid: %s" , contentLengthParameter ));
352
+ }
353
+ }
258
354
}
259
355
0 commit comments