Skip to content

Commit 1f707aa

Browse files
committed
Bug#32776593 NOT ALL DATA NODE TO DATA NODE CONNECTIONS SEND HEART BEAT MESSAGING [1/3]
Add method to send a signal on all transporters links to a node. Change-Id: I132225deae54c1dba2d0b7a02c17a4d7f6298c00
1 parent 26f52e2 commit 1f707aa

File tree

6 files changed

+294
-28
lines changed

6 files changed

+294
-28
lines changed

storage/ndb/include/transporter/TransporterRegistry.hpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -467,9 +467,13 @@ class TransporterRegistry
467467
Uint8 prio,
468468
const Uint32 *signalData,
469469
NodeId nodeId,
470-
TrpId &trp_id,
470+
Transporter* t,
471471
AnySectionArg section);
472472

473+
Transporter* prepareSend_getTransporter(const SignalHeader *signalHeader,
474+
NodeId nodeId,
475+
TrpId &trp_id,
476+
SendStatus& status);
473477

474478
public:
475479
SendStatus prepareSend(TransporterSendBufferHandle *sendHandle,
@@ -495,7 +499,15 @@ class TransporterRegistry
495499
const Uint32 *signalData,
496500
NodeId nodeId,
497501
const GenericSectionPtr ptr[3]);
498-
502+
503+
SendStatus prepareSendOverAllLinks(
504+
TransporterSendBufferHandle *sendHandle,
505+
const SignalHeader *signalHeader,
506+
Uint8 prio,
507+
const Uint32 *signalData,
508+
NodeId nodeId,
509+
TrpBitmask &trp_mask);
510+
499511
/* Send on a specific transporter */
500512
bool performSend(TrpId id, bool need_wakeup = true);
501513
/* performSendNode is only used from NDB API */

storage/ndb/src/common/transporter/TransporterRegistry.cpp

Lines changed: 151 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1058,31 +1058,11 @@ TransporterRegistry::prepareSendTemplate(
10581058
Uint8 prio,
10591059
const Uint32 * signalData,
10601060
NodeId nodeId,
1061-
TrpId &trp_id,
1061+
Transporter *t,
10621062
AnySectionArg section)
10631063
{
1064-
Transporter *node_trp = theNodeIdTransporters[nodeId];
1065-
if (unlikely(node_trp == NULL))
1066-
{
1067-
DEBUG("Discarding message to unknown node: " << nodeId);
1068-
return SEND_UNKNOWN_NODE;
1069-
}
1070-
assert(!node_trp->isPartOfMultiTransporter());
1071-
Transporter *t;
1072-
t = node_trp->get_send_transporter(signalHeader->theReceiversBlockNumber,
1073-
signalHeader->theSendersBlockRef);
1074-
assert(!t->isMultiTransporter());
1075-
trp_id = t->getTransporterIndex();
1076-
if (unlikely(trp_id == 0))
1077-
{
1078-
/**
1079-
* Can happen in disconnect situations, node is disconnected, so send
1080-
* to it is successful since the node won't be there to receive the
1081-
* message.
1082-
*/
1083-
DEBUG("Discarding message due to trp_id = 0");
1084-
return SEND_OK;
1085-
}
1064+
assert(t != nullptr);
1065+
10861066
if(
10871067
likely((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
10881068
(signalHeader->theReceiversBlockNumber == QMGR) ||
@@ -1093,6 +1073,7 @@ TransporterRegistry::prepareSendTemplate(
10931073
const Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, section.m_ptr);
10941074
if (likely(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE))
10951075
{
1076+
TrpId trp_id = t->getTransporterIndex();
10961077
SendStatus error = SEND_OK;
10971078
Uint32 *insertPtr = getWritePtr(sendHandle,
10981079
t,
@@ -1184,6 +1165,40 @@ TransporterRegistry::prepareSendTemplate(
11841165
}
11851166

11861167

1168+
Transporter*
1169+
TransporterRegistry::prepareSend_getTransporter(
1170+
const SignalHeader *signalHeader,
1171+
NodeId nodeId,
1172+
TrpId &trp_id,
1173+
SendStatus& status)
1174+
{
1175+
Transporter *node_trp = theNodeIdTransporters[nodeId];
1176+
if (unlikely(node_trp == NULL))
1177+
{
1178+
DEBUG("Discarding message to unknown node: " << nodeId);
1179+
status = SEND_UNKNOWN_NODE;
1180+
return nullptr;
1181+
}
1182+
assert(!node_trp->isPartOfMultiTransporter());
1183+
Transporter *t;
1184+
t = node_trp->get_send_transporter(signalHeader->theReceiversBlockNumber,
1185+
signalHeader->theSendersBlockRef);
1186+
assert(!t->isMultiTransporter());
1187+
trp_id = t->getTransporterIndex();
1188+
if (unlikely(trp_id == 0))
1189+
{
1190+
/**
1191+
* Can happen in disconnect situations, node is disconnected, so send
1192+
* to it is successful since the node won't be there to receive the
1193+
* message.
1194+
*/
1195+
DEBUG("Discarding message due to trp_id = 0");
1196+
status = SEND_OK;
1197+
return nullptr;
1198+
}
1199+
return t;
1200+
}
1201+
11871202
SendStatus
11881203
TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
11891204
const SignalHeader *signalHeader,
@@ -1193,13 +1208,19 @@ TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
11931208
TrpId &trp_id,
11941209
const LinearSectionPtr ptr[3])
11951210
{
1211+
SendStatus status;
1212+
Transporter *t = prepareSend_getTransporter(signalHeader, nodeId, trp_id,
1213+
status);
1214+
if (unlikely(t == nullptr))
1215+
return status;
1216+
11961217
const Packer::LinearSectionArg section(ptr);
11971218
return prepareSendTemplate(sendHandle,
11981219
signalHeader,
11991220
prio,
12001221
signalData,
12011222
nodeId,
1202-
trp_id,
1223+
t,
12031224
section);
12041225
}
12051226

@@ -1214,13 +1235,19 @@ TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
12141235
class SectionSegmentPool &thePool,
12151236
const SegmentedSectionPtr ptr[3])
12161237
{
1238+
SendStatus status;
1239+
Transporter *t = prepareSend_getTransporter(signalHeader, nodeId, trp_id,
1240+
status);
1241+
if (unlikely(t == nullptr))
1242+
return status;
1243+
12171244
const Packer::SegmentedSectionArg section(thePool,ptr);
12181245
return prepareSendTemplate(sendHandle,
12191246
signalHeader,
12201247
prio,
12211248
signalData,
12221249
nodeId,
1223-
trp_id,
1250+
t,
12241251
section);
12251252
}
12261253

@@ -1234,16 +1261,114 @@ TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
12341261
const GenericSectionPtr ptr[3])
12351262
{
12361263
TrpId trp_id = 0;
1264+
SendStatus status;
1265+
Transporter *t = prepareSend_getTransporter(signalHeader, nodeId, trp_id,
1266+
status);
1267+
if (unlikely(t == nullptr))
1268+
return status;
1269+
12371270
const Packer::GenericSectionArg section(ptr);
12381271
return prepareSendTemplate(sendHandle,
12391272
signalHeader,
12401273
prio,
12411274
signalData,
12421275
nodeId,
1243-
trp_id,
1276+
t,
12441277
section);
12451278
}
12461279

1280+
SendStatus
1281+
TransporterRegistry::prepareSendOverAllLinks(
1282+
TransporterSendBufferHandle *sendHandle,
1283+
const SignalHeader *signalHeader,
1284+
Uint8 prio,
1285+
const Uint32 *signalData,
1286+
NodeId nodeId,
1287+
TrpBitmask &trp_ids)
1288+
{
1289+
// node_trp handling copied from first part of prepareSend_getTransporter
1290+
Transporter *node_trp = theNodeIdTransporters[nodeId];
1291+
if (unlikely(node_trp == NULL))
1292+
{
1293+
DEBUG("Discarding message to unknown node: " << nodeId);
1294+
return SEND_UNKNOWN_NODE;
1295+
}
1296+
assert(!node_trp->isPartOfMultiTransporter());
1297+
1298+
LinearSectionPtr ptr[3];
1299+
require(signalHeader->m_noOfSections == 0);
1300+
const Packer::LinearSectionArg section(ptr);
1301+
1302+
if (!node_trp->isMultiTransporter())
1303+
{
1304+
Transporter *t = node_trp;
1305+
// t handling copied from second part of prepareSend_getTransporter
1306+
TrpId trp_id = t->getTransporterIndex();
1307+
if (unlikely(trp_id == 0))
1308+
{
1309+
/**
1310+
* Can happen in disconnect situations, node is disconnected, so send
1311+
* to it is successful since the node won't be there to receive the
1312+
* message.
1313+
*/
1314+
DEBUG("Discarding message due to trp_id = 0");
1315+
return SEND_OK;
1316+
}
1317+
1318+
SendStatus status = prepareSendTemplate(sendHandle,
1319+
signalHeader,
1320+
prio,
1321+
signalData,
1322+
nodeId,
1323+
t,
1324+
section);
1325+
1326+
if (likely(status == SEND_OK))
1327+
{
1328+
require(trp_id < MAX_NTRANSPORTERS);
1329+
trp_ids.set(trp_id);
1330+
}
1331+
return status;
1332+
}
1333+
else
1334+
{
1335+
Multi_Transporter *multi_trp = get_node_multi_transporter(nodeId);
1336+
require(multi_trp == node_trp);
1337+
1338+
SendStatus return_status = SEND_OK;
1339+
Uint32 num_trps = multi_trp->get_num_active_transporters();
1340+
for (Uint32 i = 0; i < num_trps; i++)
1341+
{
1342+
Transporter *t = multi_trp->get_active_transporter(i);
1343+
require(t != nullptr);
1344+
const TrpId trp_id = t->getTransporterIndex();
1345+
if (unlikely(trp_id == 0))
1346+
continue;
1347+
SendStatus status = prepareSendTemplate(sendHandle,
1348+
signalHeader,
1349+
prio,
1350+
signalData,
1351+
nodeId,
1352+
t,
1353+
section);
1354+
if (likely(status == SEND_OK))
1355+
{
1356+
require(trp_id < MAX_NTRANSPORTERS);
1357+
trp_ids.set(trp_id);
1358+
}
1359+
else if (status != SEND_BLOCKED && status != SEND_DISCONNECTED)
1360+
{
1361+
/*
1362+
* Treat SEND_BLOCKED and SEND_DISCONNECTED as SEND_OK.
1363+
* Else take the last bad status returned.
1364+
*/
1365+
return_status = status;
1366+
}
1367+
}
1368+
return return_status;
1369+
}
1370+
}
1371+
12471372
bool
12481373
TransporterRegistry::setup_wakeup_socket(TransporterReceiveHandle& recvdata)
12491374
{

storage/ndb/src/kernel/vm/SimulatedBlock.cpp

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1963,6 +1963,95 @@ SimulatedBlock::sendSignalWithDelay(BlockReference ref,
19631963
sections->m_cnt = 0;
19641964
}
19651965

1966+
/*
1967+
* Copy implementation for sendSignalOverAllLinks from sendSignal, excluding part for
1968+
* local send.
1969+
*/
1970+
void
1971+
SimulatedBlock::sendSignalOverAllLinks(BlockReference ref,
1972+
GlobalSignalNumber gsn,
1973+
Signal25* signal,
1974+
Uint32 length,
1975+
JobBufferLevel jobBuffer) const
1976+
{
1977+
1978+
BlockReference sendBRef = reference();
1979+
1980+
Uint32 recBlock = refToBlock(ref);
1981+
Uint32 recNode = refToNode(ref);
1982+
Uint32 ourProcessor = globalData.ownId;
1983+
1984+
ndbrequire(signal->header.m_noOfSections == 0);
1985+
check_sections(signal, signal->header.m_noOfSections, 0);
1986+
1987+
signal->header.theLength = length;
1988+
signal->header.theVerId_signalNumber = gsn;
1989+
signal->header.theReceiversBlockNumber = recBlock;
1990+
signal->header.m_noOfSections = 0;
1991+
1992+
Uint32 tSignalId = signal->header.theSignalId;
1993+
1994+
if (unlikely((length == 0) || length > 25 || (recBlock == 0)))
1995+
{
1996+
signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1997+
return;
1998+
}//if
1999+
#ifdef VM_TRACE
2000+
if (globalData.testOn){
2001+
Uint16 proc =
2002+
(recNode == 0 ? globalData.ownId : recNode);
2003+
signal->header.theSendersBlockRef = sendBRef;
2004+
globalSignalLoggers.sendSignal(signal->header,
2005+
jobBuffer,
2006+
&signal->theData[0],
2007+
proc);
2008+
}
2009+
#endif
2010+
2011+
// Local send part not copied from sendSignal.
2012+
ndbrequire(recNode != ourProcessor);
2013+
ndbrequire(recNode != 0);
2014+
2015+
// send distributed Signal
2016+
SignalHeader sh;
2017+
2018+
Uint32 tTrace = signal->getTrace();
2019+
2020+
sh.theVerId_signalNumber = gsn;
2021+
sh.theReceiversBlockNumber = recBlock;
2022+
sh.theSendersBlockRef = refToBlock(sendBRef);
2023+
sh.theLength = length;
2024+
sh.theTrace = tTrace;
2025+
sh.theSignalId = tSignalId;
2026+
sh.m_noOfSections = 0;
2027+
sh.m_fragmentInfo = 0;
2028+
2029+
#ifdef TRACE_DISTRIBUTED
2030+
g_eventLogger->info("send: %s(%d) to (%s, %d)", getSignalName(gsn), gsn,
2031+
getBlockName(recBlock), recNode);
2032+
#endif
2033+
2034+
SendStatus ss;
2035+
#ifdef NDBD_MULTITHREADED
2036+
ss = mt_send_remote_over_all_links(m_threadId, &sh, jobBuffer,
2037+
&signal->theData[0], recNode);
2038+
#else
2039+
TrpBitmask trp_ids;
2040+
ss = globalTransporterRegistry.
2041+
prepareSendOverAllLinks(getNonMTTransporterSendHandle(),
2042+
&sh, jobBuffer,
2043+
&signal->theData[0], recNode, trp_ids);
2044+
#endif
2045+
2046+
if (unlikely(! (ss == SEND_OK ||
2047+
ss == SEND_BLOCKED ||
2048+
ss == SEND_DISCONNECTED)))
2049+
{
2050+
handle_send_failed(ss, signal, recNode, (LinearSectionPtr*)NULL);
2051+
}
2052+
return;
2053+
}
2054+
19662055
void
19672056
SimulatedBlock::release(SegmentedSectionPtr & ptr)
19682057
{

storage/ndb/src/kernel/vm/SimulatedBlock.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -842,6 +842,12 @@ class alignas(NDB_CL) SimulatedBlock :
842842
Uint32 length,
843843
SectionHandle* sections) const;
844844

845+
void sendSignalOverAllLinks(BlockReference ref,
846+
GlobalSignalNumber gsn,
847+
Signal25* signal,
848+
Uint32 length,
849+
JobBufferLevel jbuf ) const ;
850+
845851
/**
846852
* EXECUTE_DIRECT comes in five variants.
847853
*

0 commit comments

Comments
 (0)