
Commit fde65a6

IGNITE-13193 Added fallback to full rebalance if historical one has failed.
1 parent: 236f30b

File tree

12 files changed: +1082 −288 lines changed

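In short, the change lets rebalancing degrade gracefully: if a supplier cannot serve a historical (WAL-based) rebalance, the affected partitions are retried with a full rebalance instead of stopping the node through the failure handler. The sketch below is not Ignite code; it only illustrates the try-historical-then-fall-back pattern, and every name in it (HistoricalIteratorException, rebalanceHistorical, rebalanceFull) is hypothetical.

import java.util.Set;

/** Illustrative sketch of the fallback pattern; names are hypothetical, not Ignite APIs. */
public class RebalanceFallbackSketch {
    /** Thrown when a WAL-based (historical) iterator cannot supply the requested range. */
    static class HistoricalIteratorException extends RuntimeException {
        HistoricalIteratorException(String msg) {
            super(msg);
        }
    }

    /** Tries historical rebalance first and falls back to full rebalance if it fails. */
    static void rebalance(Set<Integer> parts) {
        try {
            rebalanceHistorical(parts);
        }
        catch (HistoricalIteratorException e) {
            // Historical rebalance is not possible (e.g. the required WAL history is gone):
            // retry the same partitions with a full rebalance instead of failing the node.
            rebalanceFull(parts);
        }
    }

    static void rebalanceHistorical(Set<Integer> parts) {
        throw new HistoricalIteratorException("WAL history is not available for " + parts);
    }

    static void rebalanceFull(Set<Integer> parts) {
        System.out.println("Fully rebalancing partitions " + parts);
    }

    public static void main(String[] args) {
        rebalance(Set.of(1, 2, 3));
    }
}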

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java

Lines changed: 162 additions & 148 deletions
Large diffs are not rendered by default.

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java

Lines changed: 3 additions & 2 deletions
@@ -76,8 +76,9 @@ public interface GridCachePreloader {
      * @param exchFut Completed exchange future. Can be {@code null} if forced or reassigned generation occurs.
      * @return Partition assignments which will be requested from supplier nodes.
      */
-    @Nullable public GridDhtPreloaderAssignments generateAssignments(GridDhtPartitionExchangeId exchId,
-        @Nullable GridDhtPartitionsExchangeFuture exchFut);
+    @Nullable public GridDhtPreloaderAssignments generateAssignments(
+        GridDhtPartitionExchangeId exchId,
+        @Nullable GridDhtPartitionsExchangeFuture exchFut);
 
     /**
      * Adds assignments to preloader.

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java

Lines changed: 3 additions & 2 deletions
@@ -144,8 +144,9 @@ public GridCachePreloaderAdapter(CacheGroupContext grp) {
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtPreloaderAssignments generateAssignments(GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionsExchangeFuture exchFut) {
+    @Override public GridDhtPreloaderAssignments generateAssignments(
+        GridDhtPartitionExchangeId exchId,
+        GridDhtPartitionsExchangeFuture exchFut) {
         return null;
     }
 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java

Lines changed: 40 additions & 15 deletions
@@ -327,7 +327,7 @@ Collection<UUID> remainingNodes() {
             return null;
         }
 
-        final RebalanceFuture fut = new RebalanceFuture(grp, assignments, log, rebalanceId, next, lastCancelledTime);
+        final RebalanceFuture fut = new RebalanceFuture(grp, lastExchangeFut, assignments, log, rebalanceId, next, lastCancelledTime);
 
         if (!grp.localWalEnabled()) {
             fut.listen(new IgniteInClosureX<IgniteInternalFuture<Boolean>>() {
@@ -482,21 +482,21 @@ public void handleSupplyMessage(
 
         if (node == null) {
             if (log.isDebugEnabled())
-                log.debug("Supply message ignored (supplier has left cluster) [" + demandRoutineInfo(nodeId, supplyMsg) + "]");
+                log.debug("Supply message ignored (supplier has left cluster) [" + demandRoutineInfo(nodeId, supplyMsg) + ']');
 
             return;
         }
 
         // Topology already changed (for the future that supply message based on).
         if (!fut.isActual(supplyMsg.rebalanceId())) {
             if (log.isDebugEnabled())
-                log.debug("Supply message ignored (topology changed) [" + demandRoutineInfo(nodeId, supplyMsg) + "]");
+                log.debug("Supply message ignored (topology changed) [" + demandRoutineInfo(nodeId, supplyMsg) + ']');
 
             return;
         }
 
         if (log.isDebugEnabled())
-            log.debug("Received supply message [" + demandRoutineInfo(nodeId, supplyMsg) + "]");
+            log.debug("Received supply message [" + demandRoutineInfo(nodeId, supplyMsg) + ']');
 
         // Check whether there were error during supply message unmarshalling process.
         if (supplyMsg.classError() != null) {
@@ -616,15 +616,15 @@ public void handleSupplyMessage(
 
                         if (log.isDebugEnabled())
                             log.debug("Skipping rebalancing partition (state is not MOVING): " +
-                                "[" + demandRoutineInfo(nodeId, supplyMsg) + ", p=" + p + "]");
+                                '[' + demandRoutineInfo(nodeId, supplyMsg) + ", p=" + p + ']');
                     }
                 }
                 else {
                     fut.partitionDone(nodeId, p, false);
 
                     if (log.isDebugEnabled())
                         log.debug("Skipping rebalancing partition (affinity changed): " +
-                            "[" + demandRoutineInfo(nodeId, supplyMsg) + ", p=" + p + "]");
+                            '[' + demandRoutineInfo(nodeId, supplyMsg) + ", p=" + p + ']');
                 }
             }
 
@@ -662,7 +662,7 @@ public void handleSupplyMessage(
             else {
                 if (log.isDebugEnabled())
                     log.debug("Will not request next demand message [" + demandRoutineInfo(nodeId, supplyMsg) +
-                        ", rebalanceFuture=" + fut + "]");
+                        ", rebalanceFuture=" + fut + ']');
             }
         }
         catch (IgniteSpiException | IgniteCheckedException e) {
@@ -985,14 +985,14 @@ private void updateGroupMetrics() {
      * Internal states of rebalance future.
      */
     private enum RebalanceFutureState {
-        /** Init. */
+        /** Initial state. */
         INIT,
 
-        /** Started. */
+        /** Rebalance future started and requested required partitions. */
         STARTED,
 
-        /** Marked as cancelled. */
-        MARK_CANCELLED,
+        /** Marked as cancelled. This means partitions will not be requested. */
+        MARK_CANCELLED
     }
 
     /**
@@ -1018,13 +1018,17 @@ public static class RebalanceFuture extends GridFutureAdapter<Boolean> {
         /** Remaining. */
         private final Map<UUID, IgniteDhtDemandedPartitionsMap> remaining = new HashMap<>();
 
-        /** Missed. */
+        /** Collection of missed partitions and partitions that could not be rebalanced from a supplier. */
        private final Map<UUID, Collection<Integer>> missed = new HashMap<>();
 
         /** Exchange ID. */
         @GridToStringExclude
         private final GridDhtPartitionExchangeId exchId;
 
+        /** Coresponding exchange future. */
+        @GridToStringExclude
+        private final GridDhtPartitionsExchangeFuture exchFut;
+
         /** Topology version. */
         private final AffinityTopologyVersion topVer;
 
@@ -1076,7 +1080,10 @@ public static class RebalanceFuture extends GridFutureAdapter<Boolean> {
         private final Map<ClusterNode, Set<Integer>> rebalancingParts;
 
         /**
-         * @param grp Cache group.
+         * Creates a new rebalance future.
+         *
+         * @param grp Cache group context.
+         * @param exchFut Exchange future.
          * @param assignments Assignments.
          * @param log Logger.
         * @param rebalanceId Rebalance id.
@@ -1085,17 +1092,21 @@ public static class RebalanceFuture extends GridFutureAdapter<Boolean> {
          */
         RebalanceFuture(
             CacheGroupContext grp,
+            GridDhtPartitionsExchangeFuture exchFut,
             GridDhtPreloaderAssignments assignments,
             IgniteLogger log,
             long rebalanceId,
             RebalanceFuture next,
-            AtomicLong lastCancelledTime) {
+            AtomicLong lastCancelledTime
+        ) {
             assert assignments != null;
+            assert assignments != null : "Asiignments must not be null.";
 
             this.rebalancingParts = U.newHashMap(assignments.size());
             this.assignments = assignments;
             exchId = assignments.exchangeId();
             topVer = assignments.topologyVersion();
+            this.exchFut = exchFut;
             this.next = next;
 
             this.lastCancelledTime = lastCancelledTime;
@@ -1142,6 +1153,7 @@ public static class RebalanceFuture extends GridFutureAdapter<Boolean> {
             this.assignments = null;
             this.exchId = null;
             this.topVer = null;
+            this.exchFut = null;
             this.ctx = null;
             this.grp = null;
             this.log = null;
@@ -1476,6 +1488,19 @@ private synchronized void partitionMissed(UUID nodeId, int p) {
             if (isDone())
                 return;
 
+            IgniteDhtDemandedPartitionsMap parts = remaining.get(nodeId);
+
+            assert parts != null : "Remaining not found [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId +
+                ", part=" + p + "]";
+
+            if (parts.historicalMap().contains(p)) {
+                // The partition p cannot be wal rebalanced,
+                // let's exclude the given nodeId and give a try to full rebalance.
+                exchFut.markNodeAsInapplicableForHistoricalRebalance(nodeId);
+            }
+            else
+                exchFut.markNodeAsInapplicableForFullRebalance(nodeId, grp.groupId(), p);
+
             missed.computeIfAbsent(nodeId, k -> new HashSet<>());
 
             missed.get(nodeId).add(p);
@@ -1611,7 +1636,7 @@ private void checkIsDone(boolean cancelled) {
 
                 onDone(false); // Finished but has missed partitions, will force dummy exchange
 
-                ctx.exchange().forceReassign(exchId);
+                ctx.exchange().forceReassign(exchId, exchFut);
 
                 return;
             }
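Restating the demander-side change in GridDhtPartitionDemander in isolation: when a partition is reported as missed, the supplier is excluded either from historical rebalance entirely (if the partition was demanded from WAL history) or from full rebalance of that particular partition. The sketch below mirrors that branching with simplified, hypothetical types (DemandedPartitions, onPartitionMissed); it is not the Ignite implementation, which records the exclusions on the exchange future.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/** Simplified model of the demander-side decision; types and names are hypothetical. */
public class PartitionMissedSketch {
    /** Partitions demanded from a supplier, split into historical (WAL) and full sets. */
    static class DemandedPartitions {
        final Set<Integer> historical = new HashSet<>();
        final Set<Integer> full = new HashSet<>();
    }

    /** What was demanded from each supplier. */
    final Map<UUID, DemandedPartitions> remaining = new HashMap<>();

    /** Suppliers that must not be used for historical rebalance on the next attempt. */
    final Set<UUID> inapplicableForHistorical = new HashSet<>();

    /** Per-supplier partitions that must not be requested via full rebalance again. */
    final Map<UUID, Set<Integer>> inapplicableForFull = new HashMap<>();

    /** Excludes the supplier from the appropriate rebalance mode for a missed partition. */
    void onPartitionMissed(UUID nodeId, int p) {
        DemandedPartitions parts = remaining.get(nodeId);

        if (parts == null)
            return;

        if (parts.historical.contains(p)) {
            // The partition cannot be rebalanced from WAL history on this supplier;
            // on reassignment it will be requested as a full rebalance instead.
            inapplicableForHistorical.add(nodeId);
        }
        else
            inapplicableForFull.computeIfAbsent(nodeId, k -> new HashSet<>()).add(p);
    }

    public static void main(String[] args) {
        PartitionMissedSketch sketch = new PartitionMissedSketch();

        UUID supplier = UUID.randomUUID();

        DemandedPartitions parts = new DemandedPartitions();
        parts.historical.add(7);
        parts.full.add(8);

        sketch.remaining.put(supplier, parts);

        sketch.onPartitionMissed(supplier, 7); // Supplier excluded from historical rebalance.
        sketch.onPartitionMissed(supplier, 8); // Partition 8 excluded from full rebalance on this supplier.

        System.out.println(sketch.inapplicableForHistorical + " " + sketch.inapplicableForFull);
    }
}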

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java

Lines changed: 59 additions & 32 deletions
@@ -18,10 +18,12 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
@@ -47,7 +49,9 @@
 import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -214,6 +218,15 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand
 
         SupplyContext sctx = null;
 
+        Set<Integer> remainingParts = null;
+
+        GridDhtPartitionSupplyMessage supplyMsg = new GridDhtPartitionSupplyMessage(
+            demandMsg.rebalanceId(),
+            grp.groupId(),
+            demandMsg.topologyVersion(),
+            grp.deploymentEnabled()
+        );
+
         try {
             synchronized (scMap) {
                 sctx = scMap.remove(contextId);
@@ -257,15 +270,6 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand
             else
                 maxBatchesCnt = 1;
 
-            GridDhtPartitionSupplyMessage supplyMsg = new GridDhtPartitionSupplyMessage(
-                demandMsg.rebalanceId(),
-                grp.groupId(),
-                demandMsg.topologyVersion(),
-                grp.deploymentEnabled()
-            );
-
-            Set<Integer> remainingParts;
-
             if (sctx == null || sctx.iterator == null) {
                 iter = grp.offheap().rebalanceIterator(demandMsg.partitions(), demandMsg.topologyVersion());
 
@@ -455,42 +459,65 @@ else if (iter.isPartitionMissing(p)) {
             }
             else
                 U.error(log, "Failed to continue supplying ["
-                    + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t);
+                    + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t);
 
             try {
                 if (sctx != null)
                     clearContext(sctx, log);
-                else if (iter != null)
-                    iter.close();
             }
             catch (Throwable t1) {
                 U.error(log, "Failed to cleanup supplying context ["
-                    + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t1);
+                    + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t1);
             }
 
             if (!sendErrMsg)
                 return;
 
+            boolean fallbackToFullRebalance = X.hasCause(t, IgniteHistoricalIteratorException.class);
+
             try {
-                GridDhtPartitionSupplyMessageV2 errMsg = new GridDhtPartitionSupplyMessageV2(
-                    demandMsg.rebalanceId(),
-                    grp.groupId(),
-                    demandMsg.topologyVersion(),
-                    grp.deploymentEnabled(),
-                    t
-                );
+                GridDhtPartitionSupplyMessage errMsg;
+
+                if (fallbackToFullRebalance) {
+                    // Mark the last checkpoint as not applicable for WAL rebalance.
+                    grp.shared().database().lastCheckpointInapplicableForWalRebalance(grp.groupId());
+
+                    // Mark all remaining partitions as missed to trigger full rebalance.
+                    if (iter == null && F.isEmpty(remainingParts)) {
+                        remainingParts = new HashSet<>(demandMsg.partitions().fullSet());
+                        remainingParts.addAll(demandMsg.partitions().historicalSet());
+                    }
+
+                    for (int p : Optional.ofNullable(remainingParts).orElseGet(Collections::emptySet))
+                        supplyMsg.missed(p);
+
+                    errMsg = supplyMsg;
+                }
+                else {
+                    errMsg = new GridDhtPartitionSupplyMessageV2(
+                        demandMsg.rebalanceId(),
+                        grp.groupId(),
+                        demandMsg.topologyVersion(),
+                        grp.deploymentEnabled(),
+                        t
+                    );
+                }
 
                 reply(topicId, demanderNode, demandMsg, errMsg, contextId);
             }
             catch (Throwable t1) {
                 U.error(log, "Failed to send supply error message ["
-                    + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t1);
+                    + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t1);
             }
 
-            grp.shared().kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR,
-                new IgniteCheckedException("Failed to continue supplying ["
-                    + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t)
-            ));
+            // If fallback to full rebalance is possible then let's try to switch to it
+            // instead of triggering failure handler.
+            if (!fallbackToFullRebalance) {
+                grp.shared().kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR,
+                    new IgniteCheckedException("Failed to continue supplying ["
+                        + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t)
+                ));
+            }
         }
     }
 
@@ -537,7 +564,7 @@ private GridCacheEntryInfo extractEntryInfo(CacheDataRow row) {
      * @param demander Recipient of supply message.
      * @param demandMsg Demand message.
      * @param supplyMsg Supply message.
-     * @param contextId Supply context id.
+     * @param ctxId Supply context id.
      * @return {@code True} if message was sent, {@code false} if recipient left grid.
      * @throws IgniteCheckedException If failed.
      */
@@ -546,7 +573,7 @@ private boolean reply(
         ClusterNode demander,
         GridDhtPartitionDemandMessage demandMsg,
         GridDhtPartitionSupplyMessage supplyMsg,
-        T3<UUID, Integer, AffinityTopologyVersion> contextId
+        T3<UUID, Integer, AffinityTopologyVersion> ctxId
     ) throws IgniteCheckedException {
         try {
             if (log.isDebugEnabled())
@@ -567,7 +594,7 @@ else if (grp.preloader().throttle() > 0)
                 log.debug("Failed to send supply message (demander left): [" + supplyRoutineInfo(topicId, demander.id(), demandMsg) + "]");
 
             synchronized (scMap) {
-                clearContext(scMap.remove(contextId), log);
+                clearContext(scMap.remove(ctxId), log);
             }
 
             return false;
@@ -588,21 +615,21 @@ private String supplyRoutineInfo(int topicId, UUID demander, GridDhtPartitionDem
     /**
      * Saves supply context with given parameters to {@code scMap}.
      *
-     * @param contextId Supply context id.
+     * @param ctxId Supply context id.
      * @param entryIt Entries rebalance iterator.
      * @param remainingParts Set of partitions that weren't sent yet.
      * @param rebalanceId Rebalance id.
      */
     private void saveSupplyContext(
-        T3<UUID, Integer, AffinityTopologyVersion> contextId,
+        T3<UUID, Integer, AffinityTopologyVersion> ctxId,
         IgniteRebalanceIterator entryIt,
         Set<Integer> remainingParts,
         long rebalanceId
     ) {
         synchronized (scMap) {
-            assert scMap.get(contextId) == null;
+            assert scMap.get(ctxId) == null;
 
-            scMap.put(contextId, new SupplyContext(entryIt, remainingParts, rebalanceId));
+            scMap.put(ctxId, new SupplyContext(entryIt, remainingParts, rebalanceId));
         }
     }
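On the supplier side (GridDhtPartitionSupplier) the key decision is whether the failure was caused by the historical (WAL) iterator. The sketch below restates that control flow with hypothetical helpers (hasHistoricalCause, onSupplyFailure); the real code above additionally marks the last checkpoint as inapplicable for WAL rebalance and answers the demander with a supply message that lists the remaining partitions as missed.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Simplified supplier-side error handling; names are hypothetical, not Ignite APIs. */
public class SupplierErrorHandlingSketch {
    /** Marker for failures raised by the historical (WAL) rebalance iterator. */
    static class HistoricalIteratorException extends RuntimeException {}

    /** Returns true if {@code t} or any of its causes is a historical-iterator failure. */
    static boolean hasHistoricalCause(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof HistoricalIteratorException)
                return true;
        }

        return false;
    }

    /**
     * Decides how to answer the demander after a supply failure: report the remaining
     * partitions as missed (the demander retries them with full rebalance) or escalate.
     */
    static void onSupplyFailure(Throwable t, Set<Integer> remainingParts) {
        boolean fallbackToFullRebalance = hasHistoricalCause(t);

        if (fallbackToFullRebalance) {
            Set<Integer> missed = remainingParts == null
                ? Collections.<Integer>emptySet()
                : new HashSet<>(remainingParts);

            System.out.println("Reporting partitions as missed: " + missed);
        }
        else {
            // Not a historical-iterator problem: this is unexpected, escalate it.
            throw new IllegalStateException("Failed to continue supplying", t);
        }
    }

    public static void main(String[] args) {
        onSupplyFailure(new RuntimeException(new HistoricalIteratorException()), Set.of(0, 5, 12));
    }
}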