|
21 | 21 | import net.openhft.chronicle.tools.ChronicleTools; |
22 | 22 | import org.junit.Test; |
23 | 23 |
|
| 24 | +import static org.junit.Assert.*; |
| 25 | + |
| 26 | + |
24 | 27 | import java.io.IOException; |
25 | 28 |
|
26 | 29 | /** |
@@ -64,4 +67,78 @@ public void testChained() throws IOException { |
64 | 67 | sink2.close(); |
65 | 68 | source1.close(); |
66 | 69 | } |
| 70 | + |
| 71 | + @Test |
| 72 | + public void testChainedChronicleReconnection() throws IOException, InterruptedException { |
| 73 | + |
| 74 | + //create the 'source' chronicle |
| 75 | + ChronicleTools.deleteOnExit(TMP + "/chronicle1"); |
| 76 | + Chronicle chronicle = new IndexedChronicle(TMP + "/chronicle1"); |
| 77 | + InProcessChronicleSource chronicleSource = new InProcessChronicleSource(chronicle, 61111); |
| 78 | + |
| 79 | + //write some data into the 'source' chronicle |
| 80 | + ExcerptAppender sourceAppender = chronicleSource.createAppender(); |
| 81 | + long NUM_INITIAL_MESSAGES = 20; |
| 82 | + for (long i = 0; i < NUM_INITIAL_MESSAGES; i++) { |
| 83 | + sourceAppender.startExcerpt(); |
| 84 | + sourceAppender.writeLong(i); |
| 85 | + sourceAppender.flush(); |
| 86 | + sourceAppender.finish(); |
| 87 | + } |
| 88 | + |
| 89 | + // Starting first slave instance |
| 90 | + // create the 'slave' chronicle |
| 91 | + |
| 92 | + ChronicleTools.deleteOnExit(TMP + "/chronicle2"); |
| 93 | + Chronicle chronicle1 = new IndexedChronicle(TMP + "/chronicle2"); |
| 94 | + InProcessChronicleSource chronicleSource1 = new InProcessChronicleSource(chronicle1, 62222); |
| 95 | + InProcessChronicleSink chronicleSink1 = new InProcessChronicleSink(chronicleSource1, "localhost", 61111); |
| 96 | + |
| 97 | + //try to read current data from the 'slave' chronicle |
| 98 | + |
| 99 | + ExcerptTailer tailer1 = chronicleSink1.createTailer(); |
| 100 | + long nextIndex1 = 0; |
| 101 | + while (tailer1.nextIndex()) { |
| 102 | + assertEquals("Unexpected index in stream", tailer1.readLong(), nextIndex1++); |
| 103 | + } |
| 104 | + assertEquals("Unexpected number of messages in stream", NUM_INITIAL_MESSAGES, nextIndex1); |
| 105 | + |
| 106 | + // Close first 'slave' chronicle |
| 107 | + |
| 108 | + chronicleSink1.close(); |
| 109 | + chronicleSource1.close(); |
| 110 | + chronicle1.close(); |
| 111 | + |
| 112 | + // Write some more data |
| 113 | + |
| 114 | + for (long i = NUM_INITIAL_MESSAGES; i < NUM_INITIAL_MESSAGES * 2; i++) { |
| 115 | + sourceAppender.startExcerpt(); |
| 116 | + sourceAppender.writeLong(i); |
| 117 | + sourceAppender.flush(); |
| 118 | + sourceAppender.finish(); |
| 119 | + } |
| 120 | + |
| 121 | + // Starting second slave instance |
| 122 | + // Observe that we don't call ChronicleTools.deleteOnExit(file) - |
| 123 | + // the new instance will re-open the existing chronicle file |
| 124 | + Chronicle chronicle2 = new IndexedChronicle(TMP + "/chronicle2"); |
| 125 | + InProcessChronicleSource chronicleSource2 = new InProcessChronicleSource(chronicle2, 63333); |
| 126 | + InProcessChronicleSink chronicleSink2 = new InProcessChronicleSink(chronicleSource2, "localhost", 61111); |
| 127 | + |
| 128 | + ExcerptTailer tailer2 = chronicleSink2.createTailer(); |
| 129 | + long nextIndex2 = 0; |
| 130 | + while (tailer2.nextIndex()) { |
| 131 | + assertEquals("Unexpected message index in stream", tailer2.readLong(), nextIndex2++); |
| 132 | + } |
| 133 | + |
| 134 | + assertEquals("Didn't read all messages", NUM_INITIAL_MESSAGES * 2, nextIndex2); |
| 135 | + |
| 136 | + // Cleaning up |
| 137 | + chronicleSink2.close(); |
| 138 | + chronicleSource2.close(); |
| 139 | + chronicle2.close(); |
| 140 | + |
| 141 | + chronicleSource.close(); |
| 142 | + chronicle.close(); |
| 143 | + } |
67 | 144 | } |
0 commit comments