Skip to content

Commit 1b4624f

Browse files
authored
[#4835] Fixed the fileUpload stream not closed problem (#4837)
1 parent 6a9ff25 commit 1b4624f

File tree

3 files changed

+295
-1
lines changed

3 files changed

+295
-1
lines changed

foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadPart.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ public FileUploadPart(FileUpload fileUpload) {
3636

3737
@Override
3838
public InputStream getInputStream() throws IOException {
39-
return Files.newInputStream(new File(fileUpload.uploadedFileName()).toPath());
39+
final InputStream inputStream = Files.newInputStream(new File(fileUpload.uploadedFileName()).toPath());
40+
return new InputStreamWrapper(inputStream);
4041
}
4142

4243
@Override
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.servicecomb.foundation.vertx.http;
19+
20+
import java.io.IOException;
21+
import java.util.ArrayList;
22+
import java.util.Collection;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.ScheduledExecutorService;
28+
import java.util.concurrent.TimeUnit;
29+
30+
import org.apache.servicecomb.foundation.common.event.EventManager;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
import com.google.common.eventbus.EventBus;
35+
import com.netflix.config.DynamicPropertyFactory;
36+
37+
public class FileUploadStreamRecorder {
38+
private static final Logger LOGGER = LoggerFactory.getLogger(FileUploadStreamRecorder.class);
39+
40+
private static final FileUploadStreamRecorder RECORDER = new FileUploadStreamRecorder();
41+
42+
public static final String STREAM_RECORDER_MAX_SIZE = "servicecomb.uploads.file.streamRecorder.maxSize";
43+
44+
public static final String STREAM_STACKTRACE_ENABLED
45+
= "servicecomb.uploads.file.streamRecorder.stackTraceEnabled";
46+
47+
public static final String STREAM_CHECK_INTERVAL = "servicecomb.uploads.file.streamRecorder.checkInterval";
48+
49+
public static final String STREAM_MAX_OPEN_TIME = "servicecomb.uploads.file.streamRecorder.streamMaxOpenTime";
50+
51+
public static final int DEFAULT_STREAM_RECORDER_MAX_SIZE = 5000;
52+
53+
public static final long DEFAULT_STREAM_CHECK_INTERVAL = 30000L;
54+
55+
public static final long DEFAULT_STREAM_MAX_OPEN_TIME = 90000L;
56+
57+
private final Map<InputStreamWrapper, StreamOperateEvent> streamWrapperRecorder = new ConcurrentHashMap<>();
58+
59+
private final EventBus eventBus;
60+
61+
private final ScheduledExecutorService streamCheckExecutor;
62+
63+
private final Object lock = new Object();
64+
65+
private FileUploadStreamRecorder() {
66+
eventBus = EventManager.getEventBus();
67+
streamCheckExecutor = Executors.newScheduledThreadPool(1,
68+
(t) -> new Thread(t, "upload-file-stream-check"));
69+
startCheckRecordFileStream();
70+
}
71+
72+
private void startCheckRecordFileStream() {
73+
streamCheckExecutor.scheduleWithFixedDelay(this::checkRecordFileStream, DEFAULT_STREAM_CHECK_INTERVAL,
74+
getStreamCheckInterval(), TimeUnit.MILLISECONDS);
75+
}
76+
77+
public static FileUploadStreamRecorder getInstance() {
78+
return RECORDER;
79+
}
80+
81+
public void recordOpenStream(final InputStreamWrapper wrapper) {
82+
checkAndRemoveOldestStream();
83+
streamWrapperRecorder.put(wrapper, new StreamOperateEvent(wrapper));
84+
}
85+
86+
private void checkAndRemoveOldestStream() {
87+
int maxSize = getStreamRecorderMaxSize();
88+
if (streamWrapperRecorder.size() < maxSize) {
89+
return;
90+
}
91+
synchronized (lock) {
92+
StreamOperateEvent oldestEvent = getOldestOperateEvent(streamWrapperRecorder.values());
93+
LOGGER.warn("reached record maxSize [{}] of file stream, delete oldest stream, operate time [{}], stackTrace: ",
94+
maxSize, oldestEvent.getOpenStreamTimestamp(), oldestEvent.getInvokeStackTrace());
95+
oldestEvent.setEventType(EventType.OVER_SIZE);
96+
eventBus.post(oldestEvent);
97+
closeStreamWrapper(oldestEvent.getInputStreamWrapper());
98+
}
99+
}
100+
101+
private StreamOperateEvent getOldestOperateEvent(Collection<StreamOperateEvent> values) {
102+
StreamOperateEvent oldestEvent = null;
103+
for (StreamOperateEvent event : values) {
104+
if (oldestEvent == null) {
105+
oldestEvent = event;
106+
continue;
107+
}
108+
if (oldestEvent.getOpenStreamTimestamp() > event.getOpenStreamTimestamp()) {
109+
oldestEvent = event;
110+
}
111+
}
112+
return oldestEvent;
113+
}
114+
115+
public void clearRecorder(InputStreamWrapper inputStreamWrapper) {
116+
streamWrapperRecorder.remove(inputStreamWrapper);
117+
}
118+
119+
private void checkRecordFileStream() {
120+
try {
121+
if (streamWrapperRecorder.isEmpty()) {
122+
return;
123+
}
124+
List<StreamOperateEvent> overdueStreamEvents = new ArrayList<>();
125+
long currentMillis = System.currentTimeMillis();
126+
for (StreamOperateEvent event : streamWrapperRecorder.values()) {
127+
long streamOperateTime = event.getOpenStreamTimestamp();
128+
if (currentMillis - streamOperateTime >= getStreamMaxOpenTime()) {
129+
overdueStreamEvents.add(event);
130+
}
131+
}
132+
for (StreamOperateEvent overdueEvent : overdueStreamEvents) {
133+
overdueEvent.setEventType(EventType.TIMEOUT);
134+
eventBus.post(overdueEvent);
135+
closeStreamWrapper(overdueEvent.getInputStreamWrapper());
136+
LOGGER.warn("closed timeout stream, operate time [{}], operate stackTrace: ",
137+
overdueEvent.getOpenStreamTimestamp(), overdueEvent.getInvokeStackTrace());
138+
}
139+
} catch (Exception e) {
140+
LOGGER.error("checkRecordFileStream failed, next interval will try again.", e);
141+
}
142+
}
143+
144+
private void closeStreamWrapper(InputStreamWrapper wrapper) {
145+
try {
146+
wrapper.close();
147+
} catch (IOException e) {
148+
LOGGER.error("closed input stream failed!", e);
149+
}
150+
}
151+
152+
private int getStreamRecorderMaxSize() {
153+
return DynamicPropertyFactory.getInstance()
154+
.getIntProperty(STREAM_RECORDER_MAX_SIZE, DEFAULT_STREAM_RECORDER_MAX_SIZE).get();
155+
}
156+
157+
private static boolean getStreamStackTraceEnabled() {
158+
return DynamicPropertyFactory.getInstance().getBooleanProperty(STREAM_STACKTRACE_ENABLED, false).get();
159+
}
160+
161+
private long getStreamCheckInterval() {
162+
return DynamicPropertyFactory.getInstance()
163+
.getLongProperty(STREAM_CHECK_INTERVAL, DEFAULT_STREAM_CHECK_INTERVAL).get();
164+
}
165+
166+
private long getStreamMaxOpenTime() {
167+
return DynamicPropertyFactory.getInstance()
168+
.getLongProperty(STREAM_MAX_OPEN_TIME, DEFAULT_STREAM_MAX_OPEN_TIME).get();
169+
}
170+
171+
public static class StreamOperateEvent {
172+
private final InputStreamWrapper inputStreamWrapper;
173+
174+
private final long openStreamTimestamp;
175+
176+
private Exception invokeStackTrace;
177+
178+
private EventType eventType;
179+
180+
public StreamOperateEvent(InputStreamWrapper inputStreamWrapper) {
181+
this.inputStreamWrapper = inputStreamWrapper;
182+
if (getStreamStackTraceEnabled()) {
183+
this.invokeStackTrace = new Exception();
184+
}
185+
this.openStreamTimestamp = System.currentTimeMillis();
186+
}
187+
188+
public InputStreamWrapper getInputStreamWrapper() {
189+
return inputStreamWrapper;
190+
}
191+
192+
public Exception getInvokeStackTrace() {
193+
return invokeStackTrace;
194+
}
195+
196+
public long getOpenStreamTimestamp() {
197+
return openStreamTimestamp;
198+
}
199+
200+
public EventType getEventType() {
201+
return eventType;
202+
}
203+
204+
public void setEventType(EventType eventType) {
205+
this.eventType = eventType;
206+
}
207+
}
208+
209+
public enum EventType {
210+
OVER_SIZE,
211+
TIMEOUT
212+
}
213+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.servicecomb.foundation.vertx.http;
19+
20+
import java.io.IOException;
21+
import java.io.InputStream;
22+
23+
public class InputStreamWrapper extends InputStream {
24+
private final InputStream inputStream;
25+
26+
public InputStreamWrapper(InputStream inputStream) {
27+
this.inputStream = inputStream;
28+
FileUploadStreamRecorder.getInstance().recordOpenStream(this);
29+
}
30+
31+
public InputStream getInputStream() {
32+
return inputStream;
33+
}
34+
35+
@Override
36+
public int read() throws IOException {
37+
return inputStream.read();
38+
}
39+
40+
@Override
41+
public int read(byte[] b) throws IOException {
42+
return inputStream.read(b);
43+
}
44+
45+
@Override
46+
public int read(byte[] b, int off, int len) throws IOException {
47+
return inputStream.read(b, off, len);
48+
}
49+
50+
@Override
51+
public long skip(long n) throws IOException {
52+
return inputStream.skip(n);
53+
}
54+
55+
@Override
56+
public int available() throws IOException {
57+
return inputStream.available();
58+
}
59+
60+
@Override
61+
public boolean markSupported() {
62+
return inputStream.markSupported();
63+
}
64+
65+
@Override
66+
public void mark(int readlimit) {
67+
inputStream.mark(readlimit);
68+
}
69+
70+
@Override
71+
public void close() throws IOException {
72+
FileUploadStreamRecorder.getInstance().clearRecorder(this);
73+
inputStream.close();
74+
}
75+
76+
@Override
77+
public void reset() throws IOException {
78+
inputStream.reset();
79+
}
80+
}

0 commit comments

Comments
 (0)