Skip to content

Commit 6d5978d

Browse files
committed
fix an infinite loop issue
1 parent 04c1dff commit 6d5978d

File tree

3 files changed

+13
-7
lines changed

3 files changed

+13
-7
lines changed

gpdbwriter/src/main/java/cn/hashdata/datax/plugin/writer/gpdbwriter/CopyProcessor.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,12 @@ public Long call() throws Exception {
4545
Thread.currentThread().setName("CopyProcessor");
4646
Record record = null;
4747

48-
while (task.moreRecord() || (record = queueIn.poll(1000L, TimeUnit.MILLISECONDS)) != null) {
49-
if (record == null) {
48+
while (true) {
49+
record = queueIn.poll(1000L, TimeUnit.MILLISECONDS);
50+
51+
if (record == null && false == task.moreRecord()) {
52+
break;
53+
} else if (record == null) {
5054
continue;
5155
}
5256

gpdbwriter/src/main/java/cn/hashdata/datax/plugin/writer/gpdbwriter/CopyWorker.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import org.slf4j.Logger;
2222
import org.slf4j.LoggerFactory;
2323

24-
import com.alibaba.datax.common.element.Column;
25-
import com.alibaba.datax.common.element.Record;
2624
import com.alibaba.datax.common.exception.DataXException;
2725
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
2826
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
@@ -94,8 +92,12 @@ public Long call() throws Exception {
9492

9593
byte[] data = null;
9694
try {
97-
while (task.moreData() || (data = queue.poll(1000L, TimeUnit.MILLISECONDS)) != null) {
98-
if (data == null) {
95+
while (true) {
96+
data = queue.poll(1000L, TimeUnit.MILLISECONDS);
97+
98+
if (data == null && false == task.moreData()) {
99+
break;
100+
} else if (data == null) {
99101
continue;
100102
}
101103

gpdbwriter/src/main/java/cn/hashdata/datax/plugin/writer/gpdbwriter/CopyWriterTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ private void send(Record record, LinkedBlockingQueue<Record> queue)
7676
result.get();
7777
}
7878

79-
Thread.sleep(1000);
79+
Thread.sleep(100);
8080
}
8181
}
8282

0 commit comments

Comments
 (0)