Skip to content

Commit 9a19244

Browse files
authored
Merge pull request taosdata#22520 from taosdata/fix/dataSinkIssue
fix: global data sink manager issue
2 parents b5bd8f7 + cb70861 commit 9a19244

File tree

6 files changed

+58
-29
lines changed

6 files changed

+58
-29
lines changed

include/libs/executor/dataSinkMgt.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ typedef struct SDataSinkMgtCfg {
5959
uint32_t maxDataBlockNumPerQuery;
6060
} SDataSinkMgtCfg;
6161

62-
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI);
62+
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI, void** ppSinkManager);
6363

6464
typedef struct SInputData {
6565
const struct SSDataBlock* pData;
@@ -83,7 +83,7 @@ typedef struct SOutputData {
8383
* @param pHandle output
8484
* @return error code
8585
*/
86-
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id);
86+
int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id);
8787

8888
int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat);
8989

source/libs/executor/src/dataDeleter.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
224224
}
225225
taosCloseQueue(pDeleter->pDataBlocks);
226226
taosThreadMutexDestroy(&pDeleter->mutex);
227+
228+
taosMemoryFree(pDeleter->pManager);
227229
return TSDB_CODE_SUCCESS;
228230
}
229231

@@ -279,6 +281,8 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData
279281
if (deleter != NULL) {
280282
destroyDataSinker((SDataSinkHandle*)deleter);
281283
taosMemoryFree(deleter);
284+
} else {
285+
taosMemoryFree(pManager);
282286
}
283287
return code;
284288
}

source/libs/executor/src/dataDispatcher.c

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
226226
}
227227
taosCloseQueue(pDispatcher->pDataBlocks);
228228
taosThreadMutexDestroy(&pDispatcher->mutex);
229+
taosMemoryFree(pDispatcher->pManager);
229230
return TSDB_CODE_SUCCESS;
230231
}
231232

@@ -240,7 +241,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
240241
SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle));
241242
if (NULL == dispatcher) {
242243
terrno = TSDB_CODE_OUT_OF_MEMORY;
243-
return TSDB_CODE_OUT_OF_MEMORY;
244+
goto _return;
244245
}
245246
dispatcher->sink.fPut = putDataBlock;
246247
dispatcher->sink.fEndPut = endPut;
@@ -257,8 +258,13 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
257258
if (NULL == dispatcher->pDataBlocks) {
258259
taosMemoryFree(dispatcher);
259260
terrno = TSDB_CODE_OUT_OF_MEMORY;
260-
return TSDB_CODE_OUT_OF_MEMORY;
261+
goto _return;
261262
}
262263
*pHandle = dispatcher;
263264
return TSDB_CODE_SUCCESS;
265+
266+
_return:
267+
268+
taosMemoryFree(pManager);
269+
return terrno;
264270
}

source/libs/executor/src/dataInserter.c

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
395395
taosMemoryFree(pInserter->pParam);
396396
taosHashCleanup(pInserter->pCols);
397397
taosThreadMutexDestroy(&pInserter->mutex);
398+
399+
taosMemoryFree(pInserter->pManager);
398400
return TSDB_CODE_SUCCESS;
399401
}
400402

@@ -411,7 +413,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
411413
if (NULL == inserter) {
412414
taosMemoryFree(pParam);
413415
terrno = TSDB_CODE_OUT_OF_MEMORY;
414-
return TSDB_CODE_OUT_OF_MEMORY;
416+
goto _return;
415417
}
416418

417419
SQueryInserterNode* pInserterNode = (SQueryInserterNode*)pDataSink;
@@ -431,23 +433,18 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
431433
int64_t suid = 0;
432434
int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
433435
if (code) {
434-
destroyDataSinker((SDataSinkHandle*)inserter);
435-
taosMemoryFree(inserter);
436-
return code;
436+
terrno = code;
437+
goto _return;
437438
}
438439

439440
if (pInserterNode->stableId != suid) {
440-
destroyDataSinker((SDataSinkHandle*)inserter);
441-
taosMemoryFree(inserter);
442441
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
443-
return terrno;
442+
goto _return;
444443
}
445444

446445
inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
447446
taosThreadMutexInit(&inserter->mutex, NULL);
448447
if (NULL == inserter->pDataBlocks) {
449-
destroyDataSinker((SDataSinkHandle*)inserter);
450-
taosMemoryFree(inserter);
451448
terrno = TSDB_CODE_OUT_OF_MEMORY;
452449
return TSDB_CODE_OUT_OF_MEMORY;
453450
}
@@ -471,4 +468,15 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
471468

472469
*pHandle = inserter;
473470
return TSDB_CODE_SUCCESS;
471+
472+
_return:
473+
474+
if (inserter) {
475+
destroyDataSinker((SDataSinkHandle*)inserter);
476+
taosMemoryFree(inserter);
477+
} else {
478+
taosMemoryFree(pManager);
479+
}
480+
481+
return terrno;
474482
}

source/libs/executor/src/dataSinkMgt.c

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,17 @@
1818
#include "planner.h"
1919
#include "tarray.h"
2020

21-
static SDataSinkManager gDataSinkManager = {0};
2221
SDataSinkStat gDataSinkStat = {0};
2322

24-
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI) {
25-
gDataSinkManager.cfg = *cfg;
26-
gDataSinkManager.pAPI = pAPI;
23+
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI, void** ppSinkManager) {
24+
SDataSinkManager* pSinkManager = taosMemoryMalloc(sizeof(SDataSinkManager));
25+
if (NULL == pSinkManager) {
26+
return TSDB_CODE_OUT_OF_MEMORY;
27+
}
28+
pSinkManager->cfg = *cfg;
29+
pSinkManager->pAPI = pAPI;
30+
31+
*ppSinkManager = pSinkManager;
2732
return 0; // to avoid compiler eror
2833
}
2934

@@ -33,18 +38,22 @@ int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat) {
3338
return 0;
3439
}
3540

36-
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) {
41+
int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) {
42+
SDataSinkManager* pManager = pSinkManager;
3743
switch ((int)nodeType(pDataSink)) {
3844
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
39-
return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
45+
return createDataDispatcher(pManager, pDataSink, pHandle);
4046
case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
41-
return createDataDeleter(&gDataSinkManager, pDataSink, pHandle, pParam);
47+
return createDataDeleter(pManager, pDataSink, pHandle, pParam);
4248
}
4349
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
44-
return createDataInserter(&gDataSinkManager, pDataSink, pHandle, pParam);
50+
return createDataInserter(pManager, pDataSink, pHandle, pParam);
4551
}
52+
default:
53+
break;
4654
}
4755

56+
taosMemoryFree(pSinkManager);
4857
qError("invalid input node type:%d, %s", nodeType(pDataSink), id);
4958
return TSDB_CODE_QRY_INVALID_INPUT;
5059
}

source/libs/executor/src/executor.c

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -511,23 +511,25 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
511511
goto _error;
512512
}
513513

514-
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
515-
code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI);
516-
if (code != TSDB_CODE_SUCCESS) {
517-
qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
518-
goto _error;
519-
}
520-
521514
if (handle) {
515+
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
516+
void* pSinkManager = NULL;
517+
code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
518+
if (code != TSDB_CODE_SUCCESS) {
519+
qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
520+
goto _error;
521+
}
522+
522523
void* pSinkParam = NULL;
523524
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, (*pTask), readHandle);
524525
if (code != TSDB_CODE_SUCCESS) {
525526
qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
527+
taosMemoryFree(pSinkManager);
526528
goto _error;
527529
}
528530

529531
// pSinkParam has been freed during create sinker.
530-
code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
532+
code = dsCreateDataSinker(pSinkManager, pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
531533
}
532534

533535
qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);

0 commit comments

Comments
 (0)