Skip to content

Commit 04efc4c

Browse files
feat: [TS-6100] Forbid where when using %%trows (#31827)
* feat: [TS-6100] Forbid where when using %%trows * test: update cases * feat: [TS-6100] Fix leaks. --------- Co-authored-by: Simon Guan <[email protected]>
1 parent 49b7610 commit 04efc4c

File tree

8 files changed

+116
-120
lines changed

8 files changed

+116
-120
lines changed

source/libs/parser/src/parTranslater.c

Lines changed: 18 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3053,16 +3053,7 @@ static EDealRes translatePlaceHolderFunc(STranslateContext* pCxt, SNode** pFunc)
30533053
}
30543054
case FUNCTION_TYPE_PLACEHOLDER_TBNAME: {
30553055
BIT_FLAG_SET_MASK(pCxt->placeHolderBitmap, PLACE_HOLDER_PARTITION_TBNAME);
3056-
if (BIT_FLAG_TEST_MASK(pCxt->placeHolderBitmap, PLACE_HOLDER_PARTITION_ROWS) && pCxt->createStreamCalc) {
3057-
SFunctionNode *pTbname = NULL;
3058-
PAR_ERR_JRET(createTbnameFunction(&pTbname));
3059-
tstrncpy(pTbname->node.userAlias, ((SExprNode*)*pFunc)->userAlias, TSDB_COL_NAME_LEN);
3060-
nodesDestroyNode(*pFunc);
3061-
*pFunc = (SNode*)pTbname;
3062-
return translateFunction(pCxt, (SFunctionNode**)pFunc);
3063-
} else {
3064-
PAR_ERR_JRET(nodesMakeValueNodeFromString("", (SValueNode**)&extraValue));
3065-
}
3056+
PAR_ERR_JRET(nodesMakeValueNodeFromString("", (SValueNode**)&extraValue));
30663057
break;
30673058
}
30683059
case FUNCTION_TYPE_PLACEHOLDER_COLUMN: {
@@ -3083,32 +3074,11 @@ static EDealRes translatePlaceHolderFunc(STranslateContext* pCxt, SNode** pFunc)
30833074
PAR_ERR_JRET(generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_STREAM_INVALID_PLACE_HOLDER, "%%n : partition index out of range"));
30843075
}
30853076

3086-
if (BIT_FLAG_TEST_MASK(pCxt->placeHolderBitmap, PLACE_HOLDER_PARTITION_ROWS) && pCxt->createStreamCalc) {
3087-
if (nodeType(pExpr) == QUERY_NODE_FUNCTION) {
3088-
SFunctionNode* pTbname = NULL;
3089-
PAR_ERR_JRET(createTbnameFunction(&pTbname));
3090-
tstrncpy(pTbname->node.userAlias, ((SExprNode*)*pFunc)->userAlias, TSDB_COL_NAME_LEN);
3091-
nodesDestroyNode(*pFunc);
3092-
*pFunc = (SNode*)pTbname;
3093-
return translateFunction(pCxt, (SFunctionNode**)pFunc);
3094-
} else if (nodeType(pExpr) == QUERY_NODE_COLUMN) {
3095-
SColumnNode* pCol = NULL;
3096-
PAR_ERR_JRET(nodesCloneNode((SNode*)pExpr, (SNode**)&pCol));
3097-
tstrncpy(pCol->node.userAlias, ((SExprNode*)*pFunc)->userAlias, TSDB_COL_NAME_LEN);
3098-
nodesDestroyNode(*pFunc);
3099-
*pFunc = (SNode*)pCol;
3100-
return translateColumn(pCxt, (SColumnNode**)pFunc);
3101-
} else {
3102-
PAR_ERR_JRET(generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_STREAM_INVALID_PLACE_HOLDER,
3103-
"%%n : partition index must be a column or tbname function"));
3104-
}
3105-
} else {
3106-
PAR_ERR_JRET(nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&extraValue));
3107-
((SValueNode*)extraValue)->node.resType = pExpr->resType;
3108-
((SValueNode*)extraValue)->isNull = true;
3077+
PAR_ERR_JRET(nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&extraValue));
3078+
((SValueNode*)extraValue)->node.resType = pExpr->resType;
3079+
((SValueNode*)extraValue)->isNull = true;
31093080

3110-
pFuncNode->node.resType = pExpr->resType;
3111-
}
3081+
pFuncNode->node.resType = pExpr->resType;
31123082
break;
31133083
}
31143084
default:
@@ -4746,7 +4716,7 @@ static int32_t setTrowsTableVgroupList(STranslateContext* pCxt, SName* pName, SR
47464716
if (vg == NULL) {
47474717
PAR_ERR_JRET(terrno);
47484718
}
4749-
pRealTable->pVgroupList->vgroups[0] = *vg;
4719+
memcpy(pRealTable->pVgroupList->vgroups, vg, sizeof(SVgroupInfo));
47504720

47514721
_return:
47524722
taosArrayDestroy(vgroupList);
@@ -6040,10 +6010,14 @@ static int32_t translatePlaceHolderTable(STranslateContext* pCxt, SNode** pTable
60406010
newPlaceHolderTable->pMeta->tableType == TSDB_SUPER_TABLE) {
60416011
newPlaceHolderTable->asSingleTable = true;
60426012
}
6013+
if (inJoin) {
6014+
PAR_ERR_JRET(generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
6015+
"%%%%trows should not appear in join condition"));
6016+
}
60436017
break;
60446018
}
60456019
default: {
6046-
PAR_ERR_JRET(generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_TSC_INVALID_OPERATION, "invalid placeholder table type"));
6020+
PAR_ERR_JRET(generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "invalid placeholder table type"));
60476021
break;
60486022
}
60496023
}
@@ -6052,6 +6026,7 @@ static int32_t translatePlaceHolderTable(STranslateContext* pCxt, SNode** pTable
60526026

60536027
return code;
60546028
_return:
6029+
nodesDestroyNode((SNode*)newPlaceHolderTable);
60556030
parserError("translatePlaceHolderTable failed, code:%d, errmsg:%s", code, tstrerror(code));
60566031
return code;
60576032
}
@@ -8395,10 +8370,13 @@ static int32_t setTableVgroupsFromEqualTbnameCond(STranslateContext* pCxt, SSele
83958370

83968371
static int32_t translateWhere(STranslateContext* pCxt, SSelectStmt* pSelect) {
83978372
pCxt->currClause = SQL_CLAUSE_WHERE;
8398-
int32_t code = translateExpr(pCxt, &pSelect->pWhere);
8399-
if (TSDB_CODE_SUCCESS == code) {
8400-
code = getQueryTimeRange(pCxt, &pSelect->pWhere, &pSelect->timeRange, &pSelect->pTimeRange, pSelect->pFromTable);
8373+
int32_t code = TSDB_CODE_SUCCESS;
8374+
if (pSelect->pWhere && BIT_FLAG_TEST_MASK(pCxt->placeHolderBitmap, PLACE_HOLDER_PARTITION_ROWS) && pCxt->createStreamCalc) {
8375+
PAR_ERR_RET(generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
8376+
"%%%%trows can not be used with WHERE clause."));
84018377
}
8378+
PAR_ERR_RET(translateExpr(pCxt, &pSelect->pWhere));
8379+
PAR_ERR_RET(getQueryTimeRange(pCxt, &pSelect->pWhere, &pSelect->timeRange, &pSelect->pTimeRange, pSelect->pFromTable));
84028380
if (pSelect->pWhere != NULL && pCxt->pParseCxt->topicQuery == false) {
84038381
PAR_ERR_RET(setTableVgroupsFromEqualTbnameCond(pCxt, pSelect));
84048382
}

test/cases/13-StreamProcessing/07-SubQuery/test_subquery_count_1.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ def createStreams(self):
206206

207207
stream = StreamItem(
208208
id=6,
209-
stream="create stream rdb.s6 count_window(1, c1) from tdb.triggers partition by tbname into rdb.r6 as select _twstart ts, count(c1), avg(c2) from %%trows where ts >= _twstart and ts <= _twend partition by tbname",
209+
stream="create stream rdb.s6 count_window(1, c1) from tdb.triggers partition by tbname into rdb.r6 as select _twstart ts, count(c1), avg(c2) from %%trows partition by tbname",
210210
res_query="select *, tag_tbname from rdb.r6 where tag_tbname='t1'",
211211
exp_query="select _wstart, count(c1), avg(c2), 't1', 't1' from tdb.t1 where ts >= '2025-01-01 00:00:00' and ts < '2025-01-01 00:35:00' interval(5m);",
212212
)
@@ -222,7 +222,7 @@ def createStreams(self):
222222

223223
stream = StreamItem(
224224
id=8,
225-
stream="create stream rdb.s8 count_window(1, c1) from tdb.triggers partition by tbname into rdb.r8 as select _twstart ts, count(c1), avg(c2) from %%trows where ts >= _twstart and ts <= _twend partition by %%1",
225+
stream="create stream rdb.s8 count_window(1, c1) from tdb.triggers partition by tbname into rdb.r8 as select _twstart ts, count(c1), avg(c2) from %%trows partition by %%1",
226226
res_query="select *, tag_tbname from rdb.r8 where tag_tbname='t1'",
227227
exp_query="select _wstart, count(c1), avg(c2), 't1', 't1' from tdb.t1 where ts >= '2025-01-01 00:00:00' and ts < '2025-01-01 00:35:00' interval(5m);",
228228
)
@@ -256,9 +256,9 @@ def createStreams(self):
256256

257257
stream = StreamItem(
258258
id=12,
259-
stream="create stream rdb.s12 count_window(1, c1) from tdb.triggers partition by tbname into rdb.r12 as select _twstart ts, %%tbname tb, %%1, count(*) v1, avg(c1) v2, first(c1) v3, last(c1) v4 from %%trows where c2 > 0;",
259+
stream="create stream rdb.s12 count_window(1, c1) from tdb.triggers partition by tbname into rdb.r12 as select _twstart ts, %%tbname tb, %%1, count(*) v1, avg(c1) v2, first(c1) v3, last(c1) v4 from %%trows;",
260260
res_query="select ts, tb, `%%1`, v2, v3, v4, tag_tbname from rdb.r12 where tb='t1'",
261-
exp_query="select _wstart, 't1', 't1', avg(c1) v2, first(c1) v3, last(c1) v4, 't1' from tdb.t1 where ts >= '2025-01-01 00:00:00' and ts < '2025-01-01 00:35:00' and c2 > 0 interval(5m);",
261+
exp_query="select _wstart, 't1', 't1', avg(c1) v2, first(c1) v3, last(c1) v4, 't1' from tdb.t1 where ts >= '2025-01-01 00:00:00' and ts < '2025-01-01 00:35:00' interval(5m);",
262262
check_func=self.check12,
263263
)
264264
self.streams.append(stream)
@@ -435,9 +435,9 @@ def createStreams(self):
435435

436436
stream = StreamItem(
437437
id=34,
438-
stream="create stream rdb.s34 count_window(1, c1) from tdb.triggers partition by tbname into rdb.r34 as select _twstart - 5m tc, TIMEDIFF(_twstart - 5m, _twstart + 5m) tx, %%tbname tb, %%1 tg1, sum(c1) c1, avg(c2) c2, first(c1) c3, last(c2) c4 from %%trows where c1 > 0;",
438+
stream="create stream rdb.s34 count_window(1, c1) from tdb.triggers partition by tbname into rdb.r34 as select _twstart - 5m tc, TIMEDIFF(_twstart - 5m, _twstart + 5m) tx, %%tbname tb, %%1 tg1, sum(c1) c1, avg(c2) c2, first(c1) c3, last(c2) c4 from %%trows;",
439439
res_query="select * from rdb.r34 where tag_tbname = 't1';",
440-
exp_query="select _wstart - 5m, TIMEDIFF(_wstart, _wend) * 2, 't1', 't1', sum(c1) c1, avg(c2) c2, first(c1) c3, last(c2) c4 , 't1' from tdb.t1 where ts >= '2025-01-01 00:00:00.000' and ts < '2025-01-01 00:35:00.000' and c1 > 0 interval(5m);",
440+
exp_query="select _wstart - 5m, TIMEDIFF(_wstart, _wend) * 2, 't1', 't1', sum(c1) c1, avg(c2) c2, first(c1) c3, last(c2) c4 , 't1' from tdb.t1 where ts >= '2025-01-01 00:00:00.000' and ts < '2025-01-01 00:35:00.000' interval(5m);",
441441
)
442442
self.streams.append(stream)
443443

@@ -613,9 +613,9 @@ def createStreams(self):
613613

614614
stream = StreamItem(
615615
id=56,
616-
stream="create stream rdb.s56 count_window(1, c1) from tdb.v1 into rdb.r56 as select _wstart ws, _wend we, _twstart tws, _twend + 5m twe, first(c1) cf, last(c1) cl, count(c1) cc from %%trows where ts >= _twstart and ts < _twend + 5m interval(1m) fill(prev)",
616+
stream="create stream rdb.s56 count_window(1, c1) from tdb.v1 into rdb.r56 as select _wstart ws, _wend we, _twstart tws, _twend + 5m twe, first(c1) cf, last(c1) cl, count(c1) cc from %%trows interval(1m)",
617617
res_query="select * from rdb.r56 where ws >= '2025-01-01 00:00:00.000' and we <= '2025-01-01 00:05:00.000' ",
618-
exp_query="select _wstart ws, _wend we, cast('2025-01-01 00:00:00.000' as timestamp) tws, cast('2025-01-01 00:05:00.000' as timestamp) twe, first(c1) cf, last(c1) cl, count(c1) cc from tdb.v1 where ts >= '2025-01-01 00:00:00.000' and ts < '2025-01-01 00:05:00.000' interval(1m) fill(prev);",
618+
exp_query="select _wstart ws, _wend we, cast('2025-01-01 00:00:00.000' as timestamp) tws, cast('2025-01-01 00:05:00.000' as timestamp) twe, first(c1) cf, last(c1) cl, count(c1) cc from tdb.v1 where ts >= '2025-01-01 00:00:00.000' and ts < '2025-01-01 00:05:00.000' interval(1m);",
619619
)
620620
self.streams.append(stream)
621621

@@ -878,7 +878,7 @@ def createStreams(self):
878878

879879
stream = StreamItem(
880880
id=89,
881-
stream="create stream rdb.s89 interval(5m) sliding(5m) from tdb.v1 into rdb.r89 as select _twstart tw, _c0 ta, _rowts tb, c1, c2, rand() c3 from %%trows where _c0 >= _twstart and _c0 < _twend + 5m order by _c0 limit 1",
881+
stream="create stream rdb.s89 interval(5m) sliding(5m) from tdb.v1 into rdb.r89 as select _twstart tw, _c0 ta, _rowts tb, c1, c2, rand() c3 from %%trows order by _c0 limit 1",
882882
res_query="select tw, ta, tb, c1, c2 from rdb.r89 limit 3",
883883
exp_query="select _c0, _c0, _rowts, c1, c2 from tdb.v1 where ts = '2025-01-01 00:00:00.000' or ts = '2025-01-01 00:05:00.000' or ts = '2025-01-01 00:10:00.000' order by _c0 limit 3;",
884884
check_func=self.check89,
@@ -895,7 +895,7 @@ def createStreams(self):
895895

896896
stream = StreamItem(
897897
id=91,
898-
stream="create stream rdb.s91 count_window(1, c1) from tdb.triggers partition by id, name, tbname into rdb.r91 as select _twstart, id cid, name cname, sum(c1) amount from %%trows where ts between _twstart and _twend + 5m and id=%%1 and name=%%1 partition by id, name having sum(c1) <= 5;",
898+
stream="create stream rdb.s91 count_window(1, c1) from tdb.triggers partition by id, name, tbname into rdb.r91 as select _twstart, id cid, name cname, sum(c1) amount from %%trows partition by id, name having sum(c1) <= 5;",
899899
res_query="select * from rdb.r91",
900900
exp_query="select _wstart, id, name, sum(c1), id, name, tbname from tdb.t1 where ts >= '2025-01-01 00:00:00.000' and ts < '2025-01-01 00:15:00.000' partition by tbname interval(5m) having sum(c1) <= 5;",
901901
)
@@ -1159,9 +1159,9 @@ def createStreams(self):
11591159

11601160
stream = StreamItem(
11611161
id=124,
1162-
stream="create stream rdb.s124 count_window(1, c1) from tdb.triggers partition by tbname into rdb.r124 as select _wstart ts, count(c1), sum(c2) from %%trows where ts >= _twstart and ts <= _twend + 5m interval(1m) fill(prev)",
1162+
stream="create stream rdb.s124 count_window(1, c1) from tdb.triggers partition by tbname into rdb.r124 as select _wstart ts, count(c1), sum(c2) from %%trows interval(1m)",
11631163
res_query="select * from rdb.r124 where tag_tbname='t1' limit 15;",
1164-
exp_query="select _wstart, count(c1), sum(c2), 't1' from tdb.t1 where ts >= '2025-01-01 00:00:00.000' and ts < '2025-01-01 00:15:00.000' interval(1m) fill(prev);",
1164+
exp_query="select _wstart, count(c1), sum(c2), 't1' from tdb.t1 where ts >= '2025-01-01 00:00:00.000' and ts < '2025-01-01 00:35:00.000' interval(1m);",
11651165
)
11661166
self.streams.append(stream)
11671167

@@ -1219,7 +1219,7 @@ def createStreams(self):
12191219
res_query="select tats, tbts, tac1, tac2, tbc1, tbc2 from rdb.r131",
12201220
exp_query="select ta.ts tats, tb.cts tbts, ta.c1 tac1, ta.c2 tac2, tb.cint tbc1, tb.cuint tbc2 from tdb.v1 ta right join qdb.t1 tb on ta.ts=tb.cts where ta.ts >= '2025-01-01 00:00:00.000' and ta.ts < '2025-01-01 00:35:00.000';",
12211221
)
1222-
# self.streams.append(stream) TD-36477
1222+
# self.streams.append(stream) forbidden
12231223

12241224
stream = StreamItem(
12251225
id=132,
@@ -1234,7 +1234,7 @@ def createStreams(self):
12341234
stream="create stream rdb.s133 count_window(1) from tdb.v2 partition by id, tbname into rdb.r133 as select ta.ts tats, tb.cts tbts, ta.c1 tac1, ta.c2 tac2, tb.cint tbc1, tb.cuint tbc2, _twstart, _twend from %%trows ta right join qdb.t1 tb on ta.ts=tb.cts where ta.ts >= _twstart and ta.ts < _twend + 5m;",
12351235
check_func=self.check133,
12361236
)
1237-
self.streams.append(stream)
1237+
# self.streams.append(stream) forbidden
12381238

12391239
stream = StreamItem(
12401240
id=134,

0 commit comments

Comments
 (0)