Skip to content

Commit 474479c

Browse files
dpavlovagoncharuk
authored andcommitted
IGNITE-6539 WAL parser fails if empty log files exist in directory - Fixes apache#2794.
Signed-off-by: Alexey Goncharuk <[email protected]>
1 parent 3b1cad2 commit 474479c

File tree

2 files changed

+104
-9
lines changed

2 files changed

+104
-9
lines changed

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,10 @@
4545
import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
4646
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
4747
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
48-
import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException;
4948
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
5049
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
5150
import org.apache.ignite.internal.util.typedef.F;
51+
import org.apache.ignite.internal.util.typedef.internal.U;
5252
import org.jetbrains.annotations.NotNull;
5353
import org.jetbrains.annotations.Nullable;
5454

@@ -185,20 +185,19 @@ private void init(
185185
* Header record and its position is checked. WAL position is used to determine real index.
186186
* File index from file name is ignored.
187187
*
188-
* @param allFiles files to scan
189-
* @return list of file descriptors with checked header records, file index is set
190-
* @throws IgniteCheckedException if IO error occurs
188+
* @param allFiles files to scan.
189+
* @return list of file descriptors with checked header records, having correct file index is set
191190
*/
192191
private List<FileWriteAheadLogManager.FileDescriptor> scanIndexesFromFileHeaders(
193-
@Nullable final File[] allFiles) throws IgniteCheckedException {
192+
@Nullable final File[] allFiles) {
194193
if (allFiles == null || allFiles.length == 0)
195194
return Collections.emptyList();
196195

197196
final List<FileWriteAheadLogManager.FileDescriptor> resultingDescs = new ArrayList<>();
198197

199198
for (File file : allFiles) {
200199
if (file.length() < HEADER_RECORD_SIZE)
201-
continue;
200+
continue; //filter out this segment as it is too short
202201

203202
FileWALPointer ptr;
204203

@@ -211,17 +210,24 @@ private List<FileWriteAheadLogManager.FileDescriptor> scanIndexesFromFileHeaders
211210
// Header record must be agnostic to the serializer version.
212211
final int type = in.readUnsignedByte();
213212

214-
if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE)
215-
throw new SegmentEofException("Reached logical end of the segment", null);
213+
if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE) {
214+
if (log.isInfoEnabled())
215+
log.info("Reached logical end of the segment for file " + file);
216+
217+
continue; //filter out this segment
218+
}
216219
ptr = RecordV1Serializer.readPosition(in);
217220
}
218221
catch (IOException e) {
219-
throw new IgniteCheckedException("Failed to scan index from file [" + file + "]", e);
222+
U.warn(log, "Failed to scan index from file [" + file + "]. Skipping this file during iteration", e);
223+
224+
continue; //filter out this segment
220225
}
221226

222227
resultingDescs.add(new FileWriteAheadLogManager.FileDescriptor(file, ptr.index()));
223228
}
224229
Collections.sort(resultingDescs);
230+
225231
return resultingDescs;
226232
}
227233

modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import org.apache.ignite.transactions.Transaction;
7676
import org.jetbrains.annotations.NotNull;
7777
import org.jetbrains.annotations.Nullable;
78+
import org.junit.Assert;
7879

7980
import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
8081
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
@@ -339,6 +340,22 @@ private void putDummyRecords(Ignite ignite, int recordsToWrite) {
339340
cache0.put(i, new IndexedObject(i));
340341
}
341342

343+
/**
344+
* Puts provided number of records to fill WAL
345+
*
346+
* @param ignite ignite instance
347+
* @param recordsToWrite count
348+
*/
349+
private void putAllDummyRecords(Ignite ignite, int recordsToWrite) {
350+
IgniteCache<Object, Object> cache0 = ignite.cache(CACHE_NAME);
351+
352+
Map<Object, Object> values = new HashMap<>();
353+
354+
for (int i = 0; i < recordsToWrite; i++)
355+
values.put(i, new IndexedObject(i));
356+
357+
cache0.putAll(values);
358+
}
342359
/**
343360
* Puts provided number of records to fill WAL under transactions
344361
*
@@ -714,6 +731,78 @@ else if (val instanceof BinaryObject) {
714731

715732
}
716733

734+
/**
735+
* Tests archive completed event is fired
736+
*
737+
* @throws Exception if failed
738+
*/
739+
public void testFillWalForExactSegmentsCount() throws Exception {
740+
customWalMode = WALMode.DEFAULT;
741+
742+
final CountDownLatch reqSegments = new CountDownLatch(15);
743+
final Ignite ignite = startGrid("node0");
744+
745+
ignite.active(true);
746+
747+
final IgniteEvents evts = ignite.events();
748+
749+
if (!evts.isEnabled(EVT_WAL_SEGMENT_ARCHIVED))
750+
assertTrue("nothing to test", false);
751+
752+
evts.localListen(new IgnitePredicate<Event>() {
753+
@Override public boolean apply(Event e) {
754+
WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e;
755+
long idx = archComplEvt.getAbsWalSegmentIdx();
756+
log.info("Finished archive for segment [" + idx + ", " +
757+
archComplEvt.getArchiveFile() + "]: [" + e + "]");
758+
759+
reqSegments.countDown();
760+
return true;
761+
}
762+
}, EVT_WAL_SEGMENT_ARCHIVED);
763+
764+
765+
int totalEntries = 0;
766+
while (reqSegments.getCount() > 0) {
767+
final int write = 500;
768+
putAllDummyRecords(ignite, write);
769+
totalEntries += write;
770+
Assert.assertTrue("Too much entries generated, but segments was not become available",
771+
totalEntries < 10000);
772+
}
773+
final String subfolderName = genDbSubfolderName(ignite, 0);
774+
775+
stopGrid("node0");
776+
777+
final String workDir = U.defaultWorkDirectory();
778+
final IgniteWalIteratorFactory factory = createWalIteratorFactory(subfolderName, workDir);
779+
780+
scanIterateAndCount(factory, workDir, subfolderName, totalEntries, 0, null, null);
781+
}
782+
783+
/**
784+
* Tests reading of empty WAL from non filled cluster
785+
*
786+
* @throws Exception if failed.
787+
*/
788+
public void testReadEmptyWal() throws Exception {
789+
customWalMode = WALMode.DEFAULT;
790+
791+
final Ignite ignite = startGrid("node0");
792+
793+
ignite.active(true);
794+
ignite.active(false);
795+
796+
final String subfolderName = genDbSubfolderName(ignite, 0);
797+
798+
stopGrid("node0");
799+
800+
final String workDir = U.defaultWorkDirectory();
801+
final IgniteWalIteratorFactory factory = createWalIteratorFactory(subfolderName, workDir);
802+
803+
scanIterateAndCount(factory, workDir, subfolderName, 0, 0, null, null);
804+
}
805+
717806
/**
718807
* Creates and fills cache with data.
719808
*

0 commit comments

Comments
 (0)