From a18a88cad607e35d7ae5091b7a390a9fe75f5c75 Mon Sep 17 00:00:00 2001 From: Zhixiao Bao <62235797+xiao-77@users.noreply.github.com> Date: Thu, 5 Feb 2026 17:23:36 +0800 Subject: [PATCH 01/12] fix: set max value of supportVnodes from 4096 to 1024. (#34497) --- docs/en/14-reference/01-components/01-taosd.md | 2 +- docs/zh/14-reference/01-components/01-taosd.md | 2 +- source/common/src/tglobal.c | 2 +- tests/system-test/0-others/show.py | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/en/14-reference/01-components/01-taosd.md b/docs/en/14-reference/01-components/01-taosd.md index d4ef0b2fc230..306d5a8013b9 100644 --- a/docs/en/14-reference/01-components/01-taosd.md +++ b/docs/en/14-reference/01-components/01-taosd.md @@ -188,7 +188,7 @@ The effective value of charset is UTF-8. | Parameter Name | Supported Version | Dynamic Modification | Description | | -------------------------- | ----------------- | ---------------------------------- | ------------------------------------------------------------ | -| supportVnodes | | Supported, effective immediately | Maximum number of vnodes supported by a dnode, range 0-4096, default value is twice the number of CPU cores + 5 | +| supportVnodes | | Supported, effective immediately | Maximum number of vnodes supported by a dnode, range 0-1024, default value is twice the number of CPU cores + 5 | | numOfCommitThreads | | Supported, effective after restart | Maximum number of commit threads, range 1-1024, default value 4 | | numOfCompactThreads | | Supported, effective after restart | Maximum number of commit threads, range 1-16, default value 2 | | numOfMnodeReadThreads | | Supported, effective after restart | Number of Read threads for mnode, range 0-1024, default value is one quarter of the CPU cores (not exceeding 4) | diff --git a/docs/zh/14-reference/01-components/01-taosd.md b/docs/zh/14-reference/01-components/01-taosd.md index 2803e0abc4d6..14651bd7a094 100644 --- a/docs/zh/14-reference/01-components/01-taosd.md +++ b/docs/zh/14-reference/01-components/01-taosd.md @@ -747,7 +747,7 @@ charset 的有效值是 UTF-8。 - 类型:整数 - 默认值:CPU 核数的 2 倍 + 5 - 最小值:0 -- 最大值:4096 +- 最大值:1024 - 动态修改:仅在企业版支持通过 SQL 修改,立即生效。 - 支持版本:从 v3.1.0.0 版本开始引入 diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 61f45f987e82..fb1dffb2530e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -928,7 +928,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { // clang-format off TAOS_CHECK_RETURN(cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_SERVER, CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, CFG_SCOPE_SERVER, CFG_DYN_NONE, CFG_CATEGORY_LOCAL)); - TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER, CFG_CATEGORY_LOCAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER, CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddString(pCfg, "encryptAlgorithm", tsEncryptAlgorithm, CFG_SCOPE_SERVER, CFG_DYN_NONE, CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddString(pCfg, "encryptScope", tsEncryptScope, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL)); diff --git a/tests/system-test/0-others/show.py b/tests/system-test/0-others/show.py index e6fa7bf16b41..96b1ffd51fcb 100644 --- a/tests/system-test/0-others/show.py +++ b/tests/system-test/0-others/show.py @@ -273,8 +273,8 @@ def test_show_variables(self): tdSql.error(f'ALTER LOCAL "{var}" "{error_val}"') var = 'supportVnodes' - expect_val = 1240 ## 1.211111 * 1024 - sql = f'ALTER DNODE 1 "{var}" "1.211111k"' + expect_val = 1024 + sql = f'ALTER DNODE 1 "{var}" "1024"' tdSql.execute(sql, queryTimes=1) val = int(self.get_variable(var, False)) if val != expect_val: From 6de275043378713a13d628697510aa54f50eb19a Mon Sep 17 00:00:00 2001 From: Daniel Clow <106956386+danielclow@users.noreply.github.com> Date: Thu, 5 Feb 2026 18:01:54 +0800 Subject: [PATCH 02/12] docs: add database compact parameter descriptions (#34515) --- .../14-reference/03-taos-sql/02-database.md | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/docs/en/14-reference/03-taos-sql/02-database.md b/docs/en/14-reference/03-taos-sql/02-database.md index 1faa887fb1d6..718e0a07bc1c 100644 --- a/docs/en/14-reference/03-taos-sql/02-database.md +++ b/docs/en/14-reference/03-taos-sql/02-database.md @@ -36,6 +36,9 @@ database_option: { | WAL_FSYNC_PERIOD value | WAL_RETENTION_PERIOD value | WAL_RETENTION_SIZE value + | COMPACT_INTERVAL value + | COMPACT_TIME_RANGE value + | COMPACT_TIME_OFFSET value } ``` @@ -80,6 +83,28 @@ database_option: { - WAL_RETENTION_PERIOD: For data subscription consumption, the maximum duration strategy for additional retention of WAL log files. WAL log cleaning is not affected by the consumption status of subscription clients. In seconds. Default is 3600, meaning WAL retains the most recent 3600 seconds of data, please modify this parameter to an appropriate value according to the needs of data subscription. - WAL_RETENTION_SIZE: For data subscription consumption, the maximum cumulative size strategy for additional retention of WAL log files. In KB. Default is 0, meaning there is no upper limit on cumulative size. +:::note + +The following parameters are available in TDengine Enterprise only. + +::: + +- **COMPACT_INTERVAL:** Interval at which to trigger automatic database compaction. The default value is 0, which disables automatic database compaction. To enable automatic database compaction, specify a value between 10m and `KEEP2`. The time unit of the value can be minutes (m), hours (h), or days (d), and the default unit is days. + + - Note that time slices start from 1970-01-01T00:00:00Z. + + - Automatic database compaction is not triggered when an existing compaction task is already running on the database. + +- **COMPACT_TIME_RANGE:** Time range for automatic compact tasks. The default value is `0, 0`, which indicates the range from `-KEEP2` to `-DURATION`. You can specify a custom time range starting at or after `-KEEP2` and ending at or before `-DURATION`. The time unit of the values in this range can be minutes (m), hours (h), or days (d), and the default unit is days. + + For example, `-300, -200` would compact data between 300 and 200 days in the past each time automatic compaction is triggered. If the duration parameter of the database is the default 10 days, `-300, -5` would return an error because the second value (5 days in the past) is more recent than the value of `-DURATION` (10 days in the past). + + Note that these values are negative numbers, indicating that the time range to be compacted is in the past. + +- **COMPACT_TIME_OFFSET:** Time offset relative to local time at which to trigger automatic database compaction. The default value is 0. You can enter an offset between 0 and 23 to trigger compaction after the specified number of hours. + + For example, if `COMPACT_INTERVAL` is `1d` and `COMPACT_TIME_OFFSET` is `0`, automatic compact is triggered at 00:00 every day. If `COMPACT_TIME_OFFSET` is `2`, automatic compact is triggered at 02:00 every day. + ### Database Creation Example ```sql From f5701c21a458dcb23f91ad967e574ab12ae89bc1 Mon Sep 17 00:00:00 2001 From: haoranchen Date: Fri, 6 Feb 2026 15:20:52 +0800 Subject: [PATCH 03/12] fix: ensure OOMScoreAdjust is set for taosd service (#34526) --- packaging/cfg/taosd.service | 1 + 1 file changed, 1 insertion(+) diff --git a/packaging/cfg/taosd.service b/packaging/cfg/taosd.service index e10282aecfe0..387cce0c21cb 100644 --- a/packaging/cfg/taosd.service +++ b/packaging/cfg/taosd.service @@ -17,6 +17,7 @@ Restart=always StartLimitBurst=3 StartLimitInterval=900s EnvironmentFile=-/etc/default/taosd +OOMScoreAdjust=-100 [Install] WantedBy=multi-user.target From 620fe662283e9634965d89fd2e0071da5e90f948 Mon Sep 17 00:00:00 2001 From: Mario Peng <48949600+Pengrongkun@users.noreply.github.com> Date: Tue, 10 Feb 2026 09:08:12 +0800 Subject: [PATCH 04/12] docs add maxSQLLength version in 3.3.8 (#34543) --- docs/en/14-reference/01-components/02-taosc.md | 2 +- docs/zh/14-reference/01-components/02-taosc.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/14-reference/01-components/02-taosc.md b/docs/en/14-reference/01-components/02-taosc.md index 5dd491025422..4ab4e883ae98 100644 --- a/docs/en/14-reference/01-components/02-taosc.md +++ b/docs/en/14-reference/01-components/02-taosc.md @@ -60,7 +60,7 @@ The following configuration parameters only take effect for Native connections. | smlTsDefaultName | |Supported, effective immediately | Configuration for setting the time column name in schemaless auto table creation, default value "_ts" | | smlDot2Underline | |Supported, effective immediately | Converts dots in supertable names to underscores in schemaless | | maxInsertBatchRows | |Supported, effective immediately | Internal parameter, maximum number of rows per batch insert | -| maxSQLLength | v3.3.6.34 |Supported, effective immediately | Maximum length of a single SQL statement; default value: 1,048,576; minimum value: 1,048,576; maximum value: 67,108,864 | +| maxSQLLength |v3.3.6.34, v3.3.8.8|Supported, effective immediately | Maximum length of a single SQL statement; default value: 1,048,576; minimum value: 1,048,576; maximum value: 67,108,864 | ### Region Related diff --git a/docs/zh/14-reference/01-components/02-taosc.md b/docs/zh/14-reference/01-components/02-taosc.md index b0db2d9bb97d..48170b94a55a 100755 --- a/docs/zh/14-reference/01-components/02-taosc.md +++ b/docs/zh/14-reference/01-components/02-taosc.md @@ -339,7 +339,7 @@ taosc 和 taosd 存在许多同名参数,虽然名称相同但作用范围可 - 最小值:1048576 - 最大值:67108864 - 动态修改:支持通过 SQL 修改,立即生效 -- 支持版本:从 v3.3.6.34 版本开始引入 +- 支持版本:从 v3.3.6.34、v3.3.8.8 版本开始引入 ### 区域相关 From 0fb8d991aa62889490d7a024c0a55028ac36e6b9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 11 Feb 2026 10:55:10 +0800 Subject: [PATCH 05/12] doc: fix SQL example formatting in stream documentation (#34552) --- docs/zh/14-reference/03-taos-sql/14-stream.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/zh/14-reference/03-taos-sql/14-stream.md b/docs/zh/14-reference/03-taos-sql/14-stream.md index b7c5426b0de3..458ab0659bbd 100644 --- a/docs/zh/14-reference/03-taos-sql/14-stream.md +++ b/docs/zh/14-reference/03-taos-sql/14-stream.md @@ -355,10 +355,11 @@ notification_options: { ```sql CREATE STREAM avg_current_stream FILL_HISTORY 1 + INTO dst_table AS SELECT _wstart, _wend, AVG(current) FROM meters INTERVAL (1m) NOTIFY ('ws://localhost:8080/notify', 'wss://192.168.1.1:8080/notify?key=foo') - ON ('WINDOW_OPEN', 'WINDOW_CLOSE'); + ON ('WINDOW_OPEN', 'WINDOW_CLOSE') NOTIFY_HISTORY 0 ON_FAILURE PAUSE; ``` From f319c61b926de69334aca26304b5e9bc69cf2a67 Mon Sep 17 00:00:00 2001 From: Simon Guan Date: Thu, 12 Feb 2026 23:12:46 +0800 Subject: [PATCH 06/12] fix: ci errors --- test/cases/23-ShowCommands/test_show_basic.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/cases/23-ShowCommands/test_show_basic.py b/test/cases/23-ShowCommands/test_show_basic.py index 2b46f8e5843f..756e9ef3ed66 100644 --- a/test/cases/23-ShowCommands/test_show_basic.py +++ b/test/cases/23-ShowCommands/test_show_basic.py @@ -455,8 +455,8 @@ def show_variables(self): tdSql.error(f'ALTER LOCAL "{var}" "{error_val}"') var = 'supportVnodes' - expect_val = 1240 ## 1.211111 * 1024 - sql = f'ALTER DNODE 1 "{var}" "1.211111k"' + expect_val = 1024 + sql = f'ALTER DNODE 1 "{var}" "1024"' tdSql.execute(sql, queryTimes=1) val = int(self.get_variable(var, False, 1)) if val != expect_val: From 61679df302fc2930b381fec58d9978e54f485a50 Mon Sep 17 00:00:00 2001 From: WANG MINGMING Date: Wed, 25 Feb 2026 09:56:05 +0800 Subject: [PATCH 07/12] opti(tmq): do filter error for virtual child table & return error code in tmq poll (#34519) --- source/client/src/clientTmq.c | 2 +- source/libs/executor/src/executil.c | 2 +- .../04-Query/test_tag_tbname_in.py | 35 ++++++++++++++++++- 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 88b2c72f0948..bb0ec78ae73d 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2470,7 +2470,7 @@ static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){ } static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ - int32_t code = 0; + int32_t code = pRspWrapper->code; SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp; tqErrorC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId, diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 8fe866657212..d91c82febdb1 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1150,7 +1150,7 @@ static int32_t getTableListInInOperator(void* pVnode, SArray* pExistedUidList, S ETableType tbType = TSDB_TABLE_MAX; if (pStoreAPI->metaFn.getTableTypeSuidByName(pVnode, name, &tbType, &csuid) == 0 && - tbType == TSDB_CHILD_TABLE && + (tbType == TSDB_CHILD_TABLE || tbType == TSDB_VIRTUAL_CHILD_TABLE) && csuid == suid) { if (NULL == uHash || taosHashGet(uHash, &uid, sizeof(uid)) != NULL) { STUidTagInfo s = {.uid = uid, .name = name, .pTagVal = NULL}; diff --git a/test/cases/04-SuperTables/04-Query/test_tag_tbname_in.py b/test/cases/04-SuperTables/04-Query/test_tag_tbname_in.py index c9b27b26f5c2..6e5784474d18 100644 --- a/test/cases/04-SuperTables/04-Query/test_tag_tbname_in.py +++ b/test/cases/04-SuperTables/04-Query/test_tag_tbname_in.py @@ -1,3 +1,4 @@ +import time from new_test_framework.utils import tdLog, tdSql, sc, clusterComCheck @@ -30,8 +31,9 @@ def test_tag_tbname_in(self): tdLog.info(f"======================== dnode1 start") tdLog.info(f"======== step1") - tdSql.prepare("db1", drop=True) + time.sleep(3) + tdSql.execute(f"create database db1 vgroups 1") tdSql.execute(f"use db1;") tdSql.execute(f"create stable st1 (ts timestamp, f1 int) tags(tg1 int);") tdSql.execute(f"create table tb1 using st1 tags(1);") @@ -52,6 +54,13 @@ def test_tag_tbname_in(self): tdSql.execute(f"insert into tb7 values ('2022-07-10 16:31:07', 7);") tdSql.execute(f"insert into tb8 values ('2022-07-10 16:31:08', 8);") + tdSql.execute(f"create stable vst1 (ts timestamp, f1 int) tags(tg1 int, tg2 binary(20)) VIRTUAL 1;") + tdSql.execute(f"create vtable vtb1(f1 from tb1.f1) using vst1 tags(1, 'tag1');") + tdSql.execute(f"create vtable vtb2(f1 from tb2.f1) using vst1 tags(2, 'tag2');") + + tdSql.query(f"select * from vst1 where tbname in ('vtb1');") + tdSql.checkRows(1) + tdSql.query(f"select * from tb1 where tbname in ('tb1');") tdSql.checkRows(1) @@ -85,6 +94,30 @@ def test_tag_tbname_in(self): tdSql.checkData(0, 1, 1) + tdSql.query(f"select * from st1 where tbname in ('tb1') or f1 > 4;") + tdSql.checkRows(5) + + tdSql.query(f"select * from st1 where tbname in ('tb1') or f1 > 4 or tg1 > 2;") + tdSql.checkRows(7) + + tdSql.query(f"select * from st1 where tbname in ('tb1') and tg1 > 2;") + tdSql.checkRows(0) + + tdSql.query(f"select * from st1 where tbname in ('tb8') and tg1 > 2;") + tdSql.checkRows(1) + + tdSql.query(f"select * from st1 where tbname in ('tb1') or tg1 > 7;") + tdSql.checkRows(2) + + tdSql.query(f"select * from st1 where tbname in ('tb99','tb1');") + tdSql.checkRows(1) + + tdSql.query(f"select * from st1 where tbname in ('tb99','tb100');") + tdSql.checkRows(0) + + tdSql.query(f"select * from st1 where tbname in ('tb99','tb1','tb2');") + tdSql.checkRows(2) + tdSql.query(f"select * from st1 where tbname in ('tb1','tb1');") tdSql.checkRows(1) From 7748cdebd69d458359a69dd1eb2fe2a4dd5fb6c6 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 26 Feb 2026 16:12:32 +0800 Subject: [PATCH 08/12] fix(vtable): support vtable in metaQuery --- source/dnode/vnode/src/meta/metaQuery.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 248227b69c38..20191482ca7a 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -204,7 +204,7 @@ int metaGetTableTypeSuidByName(void *pVnode, char *tbName, ETableType *tbType, u code = metaGetTableEntryByName(&mr, tbName); if (code == 0) *tbType = mr.me.type; - if (TSDB_CHILD_TABLE == mr.me.type) { + if (TSDB_CHILD_TABLE == mr.me.type || TSDB_VIRTUAL_CHILD_TABLE == mr.me.type) { *suid = mr.me.ctbEntry.suid; } else if (TSDB_SUPER_TABLE == mr.me.type) { *suid = mr.me.uid; @@ -232,9 +232,9 @@ int metaGetTableTtlByUid(void *meta, uint64_t uid, int64_t *ttlDays) { if (code < 0) { goto _exit; } - if (mr.me.type == TSDB_CHILD_TABLE) { + if (mr.me.type == TSDB_CHILD_TABLE || mr.me.type == TSDB_VIRTUAL_CHILD_TABLE) { *ttlDays = mr.me.ctbEntry.ttlDays; - } else if (mr.me.type == TSDB_NORMAL_TABLE) { + } else if (mr.me.type == TSDB_NORMAL_TABLE || mr.me.type == TSDB_VIRTUAL_NORMAL_TABLE) { *ttlDays = mr.me.ntbEntry.ttlDays; } else { goto _exit; @@ -432,7 +432,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int tDecoderClear(&dc); goto _exit; } - } else if (me.type == TSDB_CHILD_TABLE) { + } else if (me.type == TSDB_CHILD_TABLE || me.type == TSDB_VIRTUAL_CHILD_TABLE) { uid = me.ctbEntry.suid; tDecoderClear(&dc); goto _query; @@ -510,9 +510,9 @@ int64_t metaGetTableCreateTime(SMeta *pMeta, tb_uid_t uid, int lock) { tDecoderClear(&dc); goto _exit; } - if (me.type == TSDB_CHILD_TABLE) { + if (me.type == TSDB_CHILD_TABLE || me.type == TSDB_VIRTUAL_CHILD_TABLE) { createTime = me.ctbEntry.btime; - } else if (me.type == TSDB_NORMAL_TABLE) { + } else if (me.type == TSDB_NORMAL_TABLE || me.type == TSDB_VIRTUAL_NORMAL_TABLE) { createTime = me.ntbEntry.btime; } tDecoderClear(&dc); From 010d474ff291b9d50e55daf8a13457be9d3f23fb Mon Sep 17 00:00:00 2001 From: Jinqing Kuang Date: Thu, 26 Feb 2026 16:52:48 +0800 Subject: [PATCH 09/12] Merge pull request #34590 from taosdata/fix/ci-cases-338 fix(stream): fix ci case issues --- include/common/streamMsg.h | 12 +- source/common/src/msg/streamJson.c | 14 ++ source/common/src/msg/streamMsg.c | 14 ++ source/dnode/mnode/impl/src/mndStreamMgmt.c | 7 +- source/dnode/mnode/impl/src/mndStreamUtil.c | 6 +- source/dnode/vnode/src/vnd/vnodeStream.c | 24 ++- .../libs/new-stream/inc/streamTriggerMerger.h | 29 +-- .../libs/new-stream/inc/streamTriggerTask.h | 4 + .../libs/new-stream/src/streamTriggerMerger.c | 185 +++++++----------- .../libs/new-stream/src/streamTriggerTask.c | 122 ++++++++---- source/libs/parser/src/parTranslater.c | 19 ++ source/libs/planner/src/planOptimizer.c | 6 +- .../02-Stream/test_stream_check_name.py | 29 +-- .../07-SubQuery/test_subquery_count_1.py | 14 +- .../07-SubQuery/test_subquery_count_2.py | 14 +- .../07-SubQuery/test_subquery_event.py | 14 +- .../07-SubQuery/test_subquery_session.py | 14 +- .../07-SubQuery/test_subquery_sliding.py | 14 +- .../07-SubQuery/test_subquery_state.py | 14 +- .../test_oldcase_checkpoint_info.py | 35 ++-- test/ci/cases.task | 2 +- 21 files changed, 306 insertions(+), 286 deletions(-) diff --git a/include/common/streamMsg.h b/include/common/streamMsg.h index c3a50229d1f9..e1a88b7faaca 100644 --- a/include/common/streamMsg.h +++ b/include/common/streamMsg.h @@ -293,8 +293,10 @@ typedef struct { int64_t flags; int64_t tsmaId; int64_t placeHolderBitmap; - int16_t calcTsSlotId; // only used when using %%trows - int16_t triTsSlotId; // only used when using %%trows + int16_t calcTsSlotId; // only used when using %%trows + int16_t triTsSlotId; + int16_t calcPkSlotId; // only used when using %%trows + int16_t triPkSlotId; // only for (virtual) child table and normal table int32_t triggerTblVgId; @@ -587,7 +589,9 @@ typedef struct { int64_t eventTypes; int64_t placeHolderBitmap; int16_t calcTsSlotId; // only used when using %%trows - int16_t triTsSlotId; // only used when using %%trows + int16_t triTsSlotId; + int16_t calcPkSlotId; // only used when using %%trows + int16_t triPkSlotId; void* triggerPrevFilter; void* triggerScanPlan; // only used for virtual tables void* calcCacheScanPlan; // only used for virtual tables @@ -859,7 +863,7 @@ typedef struct SSTriggerWalNewRsp { void* deleteBlock; void* dropBlock; int64_t ver; - int64_t verTime; + int64_t verTime; // us int32_t totalRows; bool isCalc; } SSTriggerWalNewRsp; diff --git a/source/common/src/msg/streamJson.c b/source/common/src/msg/streamJson.c index 10396af65afd..4098964002cc 100644 --- a/source/common/src/msg/streamJson.c +++ b/source/common/src/msg/streamJson.c @@ -439,6 +439,8 @@ static const char* jkCreateStreamReqTsmaId = "tsmaId"; static const char* jkCreateStreamReqPlaceHolderBitmap = "placeHolderBitmap"; static const char* jkCreateStreamReqCalcTsSlotId = "calcTsSlotId"; static const char* jkCreateStreamReqTriTsSlotId = "triTsSlotId"; +static const char* jkCreateStreamReqCalcPkSlotId = "calcPkSlotId"; +static const char* jkCreateStreamReqTriPkSlotId = "triPkSlotId"; static const char* jkCreateStreamReqTriggerTblVgId = "triggerTblVgId"; static const char* jkCreateStreamReqOutTblVgId = "outTblVgId"; @@ -632,6 +634,10 @@ static int32_t scmCreateStreamReqToJsonImpl(const void* pObj, void* pJson) { pJson, jkCreateStreamReqCalcTsSlotId, pReq->calcTsSlotId)); TAOS_CHECK_RETURN(tjsonAddIntegerToObject( pJson, jkCreateStreamReqTriTsSlotId, pReq->triTsSlotId)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject( + pJson, jkCreateStreamReqCalcPkSlotId, pReq->calcPkSlotId)); + TAOS_CHECK_RETURN(tjsonAddIntegerToObject( + pJson, jkCreateStreamReqTriPkSlotId, pReq->triPkSlotId)); TAOS_CHECK_RETURN(tjsonAddIntegerToObject( pJson, jkCreateStreamReqTriggerTblVgId, pReq->triggerTblVgId)); @@ -724,6 +730,10 @@ int32_t scmCreateStreamReqToJson( int32_t jsonToSCMCreateStreamReq(const void* pJson, void* pObj) { SCMCreateStreamReq* pReq = (SCMCreateStreamReq*)pObj; + pReq->calcTsSlotId = -1; + pReq->triTsSlotId = -1; + pReq->calcPkSlotId = -1; + pReq->triPkSlotId = -1; TAOS_CHECK_RETURN(tjsonDupStringValue( pJson, jkCreateStreamReqName, (char**)&pReq->name)); TAOS_CHECK_RETURN(tjsonGetBigIntValue( @@ -867,6 +877,10 @@ int32_t jsonToSCMCreateStreamReq(const void* pJson, void* pObj) { pJson, jkCreateStreamReqCalcTsSlotId, &pReq->calcTsSlotId)); TAOS_CHECK_RETURN(tjsonGetSmallIntValue( pJson, jkCreateStreamReqTriTsSlotId, &pReq->triTsSlotId)); + TAOS_CHECK_RETURN(tjsonGetSmallIntValue( + pJson, jkCreateStreamReqCalcPkSlotId, &pReq->calcPkSlotId)); + TAOS_CHECK_RETURN(tjsonGetSmallIntValue( + pJson, jkCreateStreamReqTriPkSlotId, &pReq->triPkSlotId)); TAOS_CHECK_RETURN(tjsonGetIntValue( pJson, jkCreateStreamReqTriggerTblVgId, &pReq->triggerTblVgId)); diff --git a/source/common/src/msg/streamMsg.c b/source/common/src/msg/streamMsg.c index 9d3ea2cf1304..a50c6a7476b2 100644 --- a/source/common/src/msg/streamMsg.c +++ b/source/common/src/msg/streamMsg.c @@ -1076,6 +1076,8 @@ int32_t tEncodeSStreamTriggerDeployMsg(SEncoder* pEncoder, const SStreamTriggerD TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->placeHolderBitmap)); TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->calcTsSlotId)); TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->triTsSlotId)); + TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->calcPkSlotId)); + TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->triPkSlotId)); int32_t triggerPrevFilterLen = (pMsg->triggerPrevFilter == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerPrevFilter) + 1); TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerPrevFilter, triggerPrevFilterLen)); int32_t triggerScanPlanLen = (pMsg->triggerScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerScanPlan) + 1); @@ -1641,6 +1643,8 @@ int32_t tDecodeSStreamTriggerDeployMsg(SDecoder* pDecoder, SStreamTriggerDeployM TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->placeHolderBitmap)); TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->calcTsSlotId)); TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->triTsSlotId)); + TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->calcPkSlotId)); + TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->triPkSlotId)); TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerPrevFilter, NULL)); TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL)); TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL)); @@ -2393,6 +2397,8 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS int32_t tDeserializeSCMCreateStreamReqImplOld(SDecoder *pDecoder, SCMCreateStreamReq *pReq, int32_t leftBytes) { int32_t code = 0; int32_t lino; + pReq->calcPkSlotId = -1; + pReq->triPkSlotId = -1; TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); @@ -2934,6 +2940,14 @@ int32_t tCloneStreamCreateDeployPointers(SCMCreateStreamReq *pSrc, SCMCreateStre } pDst->trigger.event.trueForDuration = pSrc->trigger.event.trueForDuration; break; + case WINDOW_TYPE_COUNT: + pDst->trigger.count.countVal = pSrc->trigger.count.countVal; + pDst->trigger.count.sliding = pSrc->trigger.count.sliding; + if (pSrc->trigger.count.condCols) { + pDst->trigger.count.condCols = COPY_STR(pSrc->trigger.count.condCols); + TSDB_CHECK_NULL(pDst->trigger.count.condCols, code, lino, _exit, terrno); + } + break; default: pDst->trigger = pSrc->trigger; break; diff --git a/source/dnode/mnode/impl/src/mndStreamMgmt.c b/source/dnode/mnode/impl/src/mndStreamMgmt.c index d99b4e988d0f..4f334c20bce4 100755 --- a/source/dnode/mnode/impl/src/mndStreamMgmt.c +++ b/source/dnode/mnode/impl/src/mndStreamMgmt.c @@ -849,7 +849,7 @@ int32_t msmBuildTriggerDeployInfo(SMnode* pMnode, SStmStatus* pInfo, SStmTaskDep pMsg->triggerHasPF = pStream->pCreate->triggerHasPF; pMsg->isTriggerTblStb = (pStream->pCreate->triggerTblType == TSDB_SUPER_TABLE); pMsg->precision = pStream->pCreate->triggerPrec; - pMsg->partitionCols = pStream->pCreate->partitionCols; + pMsg->partitionCols = pInfo->pCreate->partitionCols; pMsg->pNotifyAddrUrls = pInfo->pCreate->pNotifyAddrUrls; pMsg->notifyEventTypes = pStream->pCreate->notifyEventTypes; @@ -866,6 +866,8 @@ int32_t msmBuildTriggerDeployInfo(SMnode* pMnode, SStmStatus* pInfo, SStmTaskDep pMsg->placeHolderBitmap = pStream->pCreate->placeHolderBitmap; pMsg->calcTsSlotId = pStream->pCreate->calcTsSlotId; pMsg->triTsSlotId = pStream->pCreate->triTsSlotId; + pMsg->calcPkSlotId = pStream->pCreate->calcPkSlotId; + pMsg->triPkSlotId = pStream->pCreate->triPkSlotId; pMsg->triggerPrevFilter = pInfo->pCreate->triggerPrevFilter; if (STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags)) { pMsg->triggerScanPlan = pInfo->pCreate->triggerScanPlan; @@ -5321,6 +5323,3 @@ int32_t msmInitRuntimeInfo(SMnode *pMnode) { return code; } - - - diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 85ee46c6fc83..821b71466394 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -556,14 +556,14 @@ void mstLogSStreamObj(char* tips, SStreamObj* p) { "calcNotifyOnly:%d lowLatencyCalc:%d igNoDataTrigger:%d notifyUrlNum:%d notifyEventTypes:%d addOptions:%d notifyHistory:%d " "outColsNum:%d outTagsNum:%d maxDelay:%" PRId64 " fillHistoryStartTs:%" PRId64 " watermark:%" PRId64 " expiredTime:%" PRId64 " " "triggerTblType:%d triggerTblUid:%" PRIx64 " triggerTblSuid:%" PRIx64 " vtableCalc:%d outTblType:%d outStbExists:%d outStbUid:%" PRIu64 " outStbSversion:%d " - "eventTypes:0x%" PRIx64 " flags:0x%" PRIx64 " tsmaId:0x%" PRIx64 " placeHolderBitmap:0x%" PRIx64 " calcTsSlotId:%d triTsSlotId:%d " + "eventTypes:0x%" PRIx64 " flags:0x%" PRIx64 " tsmaId:0x%" PRIx64 " placeHolderBitmap:0x%" PRIx64 " calcTsSlotId:%d triTsSlotId:%d calcPkSlotId:%d triPkSlotId:%d " "triggerTblVgId:%d outTblVgId:%d calcScanPlanNum:%d forceOutCols:%d", q->name, q->sql, q->streamDB, q->triggerDB, q->outDB, calcDBNum, q->triggerTblName, q->outTblName, q->igExists, q->triggerType, q->igDisorder, q->deleteReCalc, q->deleteOutTbl, q->fillHistory, q->fillHistoryFirst, q->calcNotifyOnly, q->lowLatencyCalc, q->igNoDataTrigger, notifyUrlNum, q->notifyEventTypes, q->addOptions, q->notifyHistory, outColNum, outTagNum, q->maxDelay, q->fillHistoryStartTime, q->watermark, q->expiredTime, q->triggerTblType, q->triggerTblUid, q->triggerTblSuid, q->vtableCalc, q->outTblType, q->outStbExists, q->outStbUid, q->outStbSversion, - q->eventTypes, q->flags, q->tsmaId, q->placeHolderBitmap, q->calcTsSlotId, q->triTsSlotId, + q->eventTypes, q->flags, q->tsmaId, q->placeHolderBitmap, q->calcTsSlotId, q->triTsSlotId, q->calcPkSlotId, q->triPkSlotId, q->triggerTblVgId, q->outTblVgId, calcScanNum, forceOutColNum); switch (q->triggerType) { @@ -1370,5 +1370,3 @@ int32_t mstGetScanUidFromPlan(int64_t streamId, void* scanPlan, int64_t* uid) { return code; } - - diff --git a/source/dnode/vnode/src/vnd/vnodeStream.c b/source/dnode/vnode/src/vnd/vnodeStream.c index 5d51b13f7c10..430ef7367ae6 100644 --- a/source/dnode/vnode/src/vnd/vnodeStream.c +++ b/source/dnode/vnode/src/vnd/vnodeStream.c @@ -924,6 +924,9 @@ static int32_t processWalVerMetaNew(SVnode* pVnode, SSTriggerWalNewRsp* rsp, SSt if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){ if (rsp->ver < walGetFirstVer(pWalReader->pWal)) { rsp->ver = walGetFirstVer(pWalReader->pWal); + rsp->verTime = 0; + } else { + rsp->verTime = taosGetTimestampUs(); } ST_TASK_DLOG("vgId:%d %s scan wal end:%s", TD_VID(pVnode), __func__, tstrerror(code)); code = TSDB_CODE_SUCCESS; @@ -934,7 +937,8 @@ static int32_t processWalVerMetaNew(SVnode* pVnode, SSTriggerWalNewRsp* rsp, SSt STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, STREAM_RETURN_ROWS_NUM)); while (1) { code = walNextValidMsg(pWalReader, true); - if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){\ + if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){ + rsp->verTime = taosGetTimestampUs(); ST_TASK_DLOG("vgId:%d %s scan wal end:%s", TD_VID(pVnode), __func__, tstrerror(code)); code = TSDB_CODE_SUCCESS; goto end; @@ -1752,6 +1756,9 @@ static int32_t prepareIndexMetaData(SWalReader* pWalReader, SStreamTriggerReader if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){ if (resultRsp->ver < walGetFirstVer(pWalReader->pWal)) { resultRsp->ver = walGetFirstVer(pWalReader->pWal); + resultRsp->verTime = 0; + } else { + resultRsp->verTime = taosGetTimestampUs(); } ST_TASK_DLOG("%s scan wal end:%s", __func__, tstrerror(code)); code = TSDB_CODE_SUCCESS; @@ -1762,6 +1769,7 @@ static int32_t prepareIndexMetaData(SWalReader* pWalReader, SStreamTriggerReader while (1) { code = walNextValidMsg(pWalReader, true); if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){ + resultRsp->verTime = taosGetTimestampUs(); ST_TASK_DLOG("%s scan wal end:%s", __func__, tstrerror(code)); code = TSDB_CODE_SUCCESS; goto end; @@ -3020,9 +3028,10 @@ static int32_t vnodeProcessStreamWalMetaNewReq(SVnode* pVnode, SRpcMsg* pMsg, SS end: if (resultRsp.totalRows == 0) { code = TSDB_CODE_STREAM_NO_DATA; - buf = rpcMallocCont(sizeof(int64_t)); - *(int64_t *)buf = resultRsp.ver; - size = sizeof(int64_t); + size = sizeof(int64_t) * 2; + buf = rpcMallocCont(size); + *(int64_t*)buf = resultRsp.ver; + *(((int64_t*)buf) + 1) = resultRsp.verTime; } SRpcMsg rsp = { .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code}; @@ -3071,10 +3080,11 @@ static int32_t vnodeProcessStreamWalMetaDataNewReq(SVnode* pVnode, SRpcMsg* pMsg end: if (resultRsp.totalRows == 0) { - buf = rpcMallocCont(sizeof(int64_t)); - *(int64_t *)buf = resultRsp.ver; - size = sizeof(int64_t); code = TSDB_CODE_STREAM_NO_DATA; + size = sizeof(int64_t) * 2; + buf = rpcMallocCont(size); + *(int64_t*)buf = resultRsp.ver; + *(((int64_t*)buf) + 1) = resultRsp.verTime; } SRpcMsg rsp = { .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code}; diff --git a/source/libs/new-stream/inc/streamTriggerMerger.h b/source/libs/new-stream/inc/streamTriggerMerger.h index b0cdab272cfb..f44dee42e6a4 100644 --- a/source/libs/new-stream/inc/streamTriggerMerger.h +++ b/source/libs/new-stream/inc/streamTriggerMerger.h @@ -85,7 +85,6 @@ typedef enum ETriggerTimestampSorterMask { TRIGGER_TS_SORTER_MASK_META_DATA_SET = BIT_FLAG_MASK(1), TRIGGER_TS_SORTER_MASK_DATA_MERGER_BUILD = BIT_FLAG_MASK(2), TRIGGER_TS_SORTER_MASK_SESS_WIN_BUILD = BIT_FLAG_MASK(3), - TRIGGER_TS_SORTER_MASK_NO_META_DATA = BIT_FLAG_MASK(4), } ETriggerTimestampSorterMask; typedef struct SSTriggerTimestampSorter { @@ -95,6 +94,7 @@ typedef struct SSTriggerTimestampSorter { STimeWindow readRange; int64_t tbUid; int32_t tsSlotId; + int32_t pkSlotId; SArray *pMetaNodeBuf; SArray *pMetaLists; @@ -136,10 +136,11 @@ void stTimestampSorterReset(SSTriggerTimestampSorter *pSorter); * @param pRange The time range for sorting, containing start and end timestamps * @param tbUid The UID of the table to be sorted * @param tsSlotId The index of the timestamp column in the data block, starting from 0 + * @param pkSlotId The index of the primary key column in the data block, starting from 0 * @return int32_t Status code indicating success or error */ int32_t stTimestampSorterSetSortInfo(SSTriggerTimestampSorter *pSorter, STimeWindow *pRange, int64_t tbUid, - int32_t tsSlotId); + int32_t tsSlotId, int32_t pkSlotId); /** * @brief Sets metadata for data blocks in the timestamp sorter. @@ -150,14 +151,6 @@ int32_t stTimestampSorterSetSortInfo(SSTriggerTimestampSorter *pSorter, STimeWin */ int32_t stTimestampSorterSetMetaDatas(SSTriggerTimestampSorter *pSorter, SSTriggerTableMeta *pTableMeta); -/** - * @brief Sets the timestamp sorter without metadata; it would only be used to return bound data blocks. - * - * @param pSorter The SSTriggerTimestampSorter instance responsible for merging - * @return int32_t Status code indicating success or error - */ -int32_t stTimestampSorterSetEmptyMetaDatas(SSTriggerTimestampSorter *pSorter); - /** * @brief Get next data block from the sorter. * @@ -303,14 +296,6 @@ int32_t stVtableMergerSetPseudoCols(SSTriggerVtableMerger *pMerger, SSDataBlock */ int32_t stVtableMergerSetMetaDatas(SSTriggerVtableMerger *pMerger, SSHashObj *pOrigTableMetas); -/** - * @brief Sets the underlying timestamp sorters without metadata. - * - * @param pMerger The SSTriggerVtableMerger instance responsible for merging - * @return int32_t Status code indicating success or error - */ -int32_t stVtableMergerSetEmptyMetaDatas(SSTriggerVtableMerger *pMerger); - /** * @brief Gets next data block from the vtable merger. * @@ -356,6 +341,7 @@ typedef struct SSTriggerNewTimestampSorter { bool inUse; SSDataBlock *pDataBlock; int32_t tsSlotId; + int32_t pkSlotId; SArray *pSliceBuf; // SArray SArray *pSliceLists; // SArray @@ -393,13 +379,15 @@ void stNewTimestampSorterReset(SSTriggerNewTimestampSorter *pSorter); * @param pSorter The SSTriggerNewTimestampSorter instance to set the data * @param tbUid The UID of the table to be sorted * @param tsSlotId The index of the timestamp column in the data block, starting from 0 + * @param pkSlotId The index of the primary key column in the data block, starting from 0 * @param pReadRange The time range for sorting, containing start and end timestamps * @param pMetas The metadatas of current table * @param pSlice The data slice containing the data block and the range of rows to be considered * @return int32_t Status code indicating success or error */ int32_t stNewTimestampSorterSetData(SSTriggerNewTimestampSorter *pSorter, int64_t tbUid, int32_t tsSlotId, - STimeWindow *pReadRange, SObjList *pMetas, SSTriggerDataSlice *pSlice); + int32_t pkSlotId, STimeWindow *pReadRange, SObjList *pMetas, + SSTriggerDataSlice *pSlice); /** * @brief Get next data block from the sorter. @@ -464,6 +452,7 @@ void stNewVtableMergerReset(SSTriggerNewVtableMerger *pMerger); * @param pMerger The SSTriggerNewVtableMerger instance responsible for merging * @param vtbUid The UID of the virtual table * @param tsSlotId The index of the timestamp column in the data block, starting from 0 + * @param pkSlotId The index of the primary key column in the data block, starting from 0 * @param pReadRange The time range for sorting, containing start and end timestamps * @param pTableUids List of original table UIDs involved in the merge * @param pTableColRefs Array of table column references, each containing original and virtual table column mappings @@ -471,7 +460,7 @@ void stNewVtableMergerReset(SSTriggerNewVtableMerger *pMerger); * @param pSlices The hash map from original table uid to its data slice * @return int32_t */ -int32_t stNewVtableMergerSetData(SSTriggerNewVtableMerger *pMerger, int64_t vtbUid, int32_t tsSlotId, +int32_t stNewVtableMergerSetData(SSTriggerNewVtableMerger *pMerger, int64_t vtbUid, int32_t tsSlotId, int32_t pkSlotId, STimeWindow *pReadRange, SObjList *pTableUids, SArray *pTableColRefs, SSHashObj *pMetas, SSHashObj *pSlices); diff --git a/source/libs/new-stream/inc/streamTriggerTask.h b/source/libs/new-stream/inc/streamTriggerTask.h index c49be871bdb2..9f773a1ff4fb 100644 --- a/source/libs/new-stream/inc/streamTriggerTask.h +++ b/source/libs/new-stream/inc/streamTriggerTask.h @@ -361,6 +361,8 @@ typedef struct SStreamTriggerTask { }; int32_t trigTsIndex; int32_t calcTsIndex; + int32_t trigPkIndex; + int32_t calcPkIndex; int64_t maxDelayNs; int64_t fillHistoryStartTime; int64_t watermark; @@ -381,6 +383,8 @@ typedef struct SStreamTriggerTask { // trigger options: old version, to be removed int32_t histTrigTsIndex; int32_t histCalcTsIndex; + int32_t histTrigPkIndex; + int32_t histCalcPkIndex; int64_t histStateSlotId; SNode *histTriggerFilter; SNode *histStateExpr; diff --git a/source/libs/new-stream/src/streamTriggerMerger.c b/source/libs/new-stream/src/streamTriggerMerger.c index 44dc2866e5d9..26b313341a5f 100644 --- a/source/libs/new-stream/src/streamTriggerMerger.c +++ b/source/libs/new-stream/src/streamTriggerMerger.c @@ -19,6 +19,38 @@ #include "tcompare.h" #include "tdatablock.h" +static FORCE_INLINE int32_t stCompareRowsInBlocks(const SSDataBlock *pDataBlock, int32_t tsSlotId, int32_t pkSlotId, + int32_t leftIdx, int32_t rightIdx) { + if (pDataBlock == NULL || tsSlotId < 0 || leftIdx < 0 || rightIdx < 0) { + return 0; + } + + SColumnInfoData *pTsCol = TARRAY_GET_ELEM(pDataBlock->pDataBlock, tsSlotId); + int64_t *pTsData = (int64_t *)pTsCol->pData; + SColumnInfoData *pPkCol = NULL; + + if (pTsData[leftIdx] < pTsData[rightIdx]) { + return -1; + } else if (pTsData[leftIdx] > pTsData[rightIdx]) { + return 1; + } else if (pkSlotId >= 0 && pkSlotId != tsSlotId) { + pPkCol = TARRAY_GET_ELEM(pDataBlock->pDataBlock, pkSlotId); + bool leftNull = colDataIsNull_s(pPkCol, leftIdx); + bool rightNull = colDataIsNull_s(pPkCol, rightIdx); + if (leftNull || rightNull) { + if (leftNull == rightNull) { + return 0; + } + return leftNull ? -1 : 1; + } + + __compar_fn_t cmp = getKeyComparFunc(pPkCol->info.type, TSDB_ORDER_ASC); + return cmp(colDataGetData(pPkCol, leftIdx), colDataGetData(pPkCol, rightIdx)); + } + + return 0; +} + typedef struct SSTriggerMetaDataNode { SSTriggerMetaData *pMeta; TD_DLIST_NODE(SSTriggerMetaDataNode); @@ -144,7 +176,7 @@ void stTimestampSorterReset(SSTriggerTimestampSorter *pSorter) { } int32_t stTimestampSorterSetSortInfo(SSTriggerTimestampSorter *pSorter, STimeWindow *pRange, int64_t tbUid, - int32_t tsSlotId) { + int32_t tsSlotId, int32_t pkSlotId) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamTriggerTask *pTask = pSorter->pTask; @@ -155,6 +187,7 @@ int32_t stTimestampSorterSetSortInfo(SSTriggerTimestampSorter *pSorter, STimeWin pSorter->readRange = *pRange; pSorter->tbUid = tbUid; pSorter->tsSlotId = tsSlotId; + pSorter->pkSlotId = pkSlotId; BIT_FLAG_SET_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_SORT_INFO_SET); @@ -220,32 +253,6 @@ int32_t stTimestampSorterSetMetaDatas(SSTriggerTimestampSorter *pSorter, SSTrigg return code; } -int32_t stTimestampSorterSetEmptyMetaDatas(SSTriggerTimestampSorter *pSorter) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; - SStreamTriggerTask *pTask = pSorter->pTask; - SArray *pMetaNodeBuf = pSorter->pMetaNodeBuf; - SArray *pMetaLists = pSorter->pMetaLists; - - QUERY_CHECK_CONDITION(pSorter->flags == TRIGGER_TS_SORTER_MASK_SORT_INFO_SET, code, lino, _end, - TSDB_CODE_INVALID_PARA); - QUERY_CHECK_CONDITION(pMetaNodeBuf != NULL && TARRAY_SIZE(pMetaNodeBuf) == 0, code, lino, _end, - TSDB_CODE_INVALID_PARA); - QUERY_CHECK_CONDITION(pMetaLists != NULL && TARRAY_SIZE(pMetaLists) == 0, code, lino, _end, TSDB_CODE_INVALID_PARA); - - SSTriggerMetaDataList *pList = taosArrayReserve(pMetaLists, 1); - QUERY_CHECK_NULL(pList, code, lino, _end, terrno); - *pList = (SSTriggerMetaDataList){.nextTs = INT64_MIN}; - - BIT_FLAG_SET_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_NO_META_DATA); - -_end: - if (TSDB_CODE_SUCCESS != code) { - ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code)); - } - return code; -} - static FORCE_INLINE void stTimestampSorterMetaListMoveForward(SSTriggerMetaDataList *pList) { SSTriggerMetaDataNode *pHead = TD_DLIST_HEAD(pList); if (pHead != NULL) { @@ -387,34 +394,6 @@ int32_t stTimestampSorterNextDataBlock(SSTriggerTimestampSorter *pSorter, SSData *pStartIdx = 0; *pEndIdx = 0; - if (BIT_FLAG_TEST_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_NO_META_DATA)) { - pList = TARRAY_DATA(pSorter->pMetaLists); - if (pList->pDataBlock != NULL && pList->startIdx < pList->endIdx) { - int32_t nrows = blockDataGetNumOfRows(pList->pDataBlock); - SColumnInfoData *pTsCol = taosArrayGet(pList->pDataBlock->pDataBlock, pSorter->tsSlotId); - QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno); - int64_t *pTsData = (int64_t *)pTsCol->pData; - pList->startIdx = pList->endIdx; - if (pList->startIdx < nrows) { - pList->nextTs = pTsData[pList->startIdx]; - } else { - pList->nextTs = pTsData[nrows - 1] + 1; - blockDataDestroy(pList->pDataBlock); - pList->startIdx = pList->endIdx = 0; - } - } - if (pList->pDataBlock != NULL) { - int32_t nrows = blockDataGetNumOfRows(pList->pDataBlock); - pList->endIdx = nrows; - *ppDataBlock = pList->pDataBlock; - *pStartIdx = pList->startIdx; - *pEndIdx = pList->endIdx; - } else { - // need to fetch new data block - } - goto _end; - } - if (!BIT_FLAG_TEST_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_DATA_MERGER_BUILD)) { code = stTimestampSorterBuildDataMerger(pSorter); QUERY_CHECK_CODE(code, lino, _end); @@ -745,11 +724,6 @@ int32_t stTimestampSorterGetMetaToFetch(SSTriggerTimestampSorter *pSorter, SSTri *ppMeta = NULL; - if (BIT_FLAG_TEST_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_NO_META_DATA) || - IS_TRIGGER_TIMESTAMP_SORTER_EMPTY(pSorter)) { - goto _end; - } - QUERY_CHECK_CONDITION(BIT_FLAG_TEST_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_DATA_MERGER_BUILD), code, lino, _end, TSDB_CODE_INVALID_PARA); SSTriggerMetaDataList *pList = TARRAY_GET_ELEM(pSorter->pMetaLists, tMergeTreeGetChosenIndex(pSorter->pDataMerger)); @@ -770,19 +744,6 @@ int32_t stTimestampSorterBindDataBlock(SSTriggerTimestampSorter *pSorter, SSData SStreamTriggerTask *pTask = pSorter->pTask; SSTriggerMetaDataList *pList = NULL; - if (BIT_FLAG_TEST_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_NO_META_DATA)) { - SColumnInfoData *pTsCol = taosArrayGet((*ppDataBlock)->pDataBlock, pSorter->tsSlotId); - QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno); - int64_t *pTsData = (int64_t *)pTsCol->pData; - - pList = TARRAY_DATA(pSorter->pMetaLists); - QUERY_CHECK_CONDITION(pList->pDataBlock == NULL, code, lino, _end, TSDB_CODE_INVALID_PARA); - pList->pDataBlock = *ppDataBlock; - pList->startIdx = pList->endIdx = 0; - pList->nextTs = pTsData[0]; - goto _end; - } - QUERY_CHECK_CONDITION(!IS_TRIGGER_TIMESTAMP_SORTER_EMPTY(pSorter), code, lino, _end, TSDB_CODE_INVALID_PARA); QUERY_CHECK_CONDITION(BIT_FLAG_TEST_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_DATA_MERGER_BUILD), code, lino, _end, TSDB_CODE_INVALID_PARA); @@ -993,7 +954,7 @@ int32_t stVtableMergerSetMergeInfo(SSTriggerVtableMerger *pMerger, STimeWindow * code = stTimestampSorterInit(pReader, pTask); QUERY_CHECK_CODE(code, lino, _end); } - code = stTimestampSorterSetSortInfo(pReader, pRange, pReaderInfo->pColRef->otbUid, 0); + code = stTimestampSorterSetSortInfo(pReader, pRange, pReaderInfo->pColRef->otbUid, 0, -1); QUERY_CHECK_CODE(code, lino, _end); } @@ -1069,31 +1030,6 @@ int32_t stVtableMergerSetMetaDatas(SSTriggerVtableMerger *pMerger, SSHashObj *pO return code; } -int32_t stVtableMergerSetEmptyMetaDatas(SSTriggerVtableMerger *pMerger) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; - SStreamTriggerTask *pTask = pMerger->pTask; - SArray *pReaders = pMerger->pReaders; - - QUERY_CHECK_CONDITION(pMerger->flags == TRIGGER_VTABLE_MERGER_MASK_MERGE_INFO_SET, code, lino, _end, - TSDB_CODE_INVALID_PARA); - QUERY_CHECK_CONDITION(TARRAY_SIZE(pReaders) >= TARRAY_SIZE(pMerger->pReaderInfos), code, lino, _end, - TSDB_CODE_INVALID_PARA); - - int32_t nReaders = TARRAY_SIZE(pMerger->pReaderInfos); - for (int32_t i = 0; i < nReaders; i++) { - SSTriggerTimestampSorter *pReader = *(SSTriggerTimestampSorter **)TARRAY_GET_ELEM(pReaders, i); - code = stTimestampSorterSetEmptyMetaDatas(pReader); - QUERY_CHECK_CODE(code, lino, _end); - } - -_end: - if (TSDB_CODE_SUCCESS != code) { - ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code)); - } - return code; -} - #define stVtableMerger_NUM_OF_ROWS_PER_BLOCK 4096 static int32_t stVtableMergerCopyDataBlock(SSTriggerVtableMerger *pMerger, SVtableMergerReaderInfo *pReaderInfo, @@ -1378,19 +1314,24 @@ static int32_t stNewTimestampSorterSliceListCompare(const void *pLeft, const voi SNewTimestampSorterSliceList *pLeftList = TARRAY_GET_ELEM(pSorter->pSliceLists, left); SNewTimestampSorterSliceList *pRightList = TARRAY_GET_ELEM(pSorter->pSliceLists, right); - SColumnInfoData *pTsCol = TARRAY_GET_ELEM(pSorter->pDataBlock->pDataBlock, pSorter->tsSlotId); - int64_t *pTsData = (int64_t *)pTsCol->pData; - int32_t leftIdx = (TD_DLIST_HEAD(pLeftList) != NULL) ? TD_DLIST_HEAD(pLeftList)->startIdx : -1; - int64_t leftTs = (leftIdx >= 0) ? pTsData[leftIdx] : INT64_MAX; - int32_t rightIdx = (TD_DLIST_HEAD(pRightList) != NULL) ? TD_DLIST_HEAD(pRightList)->startIdx : -1; - int64_t rightTs = (rightIdx >= 0) ? pTsData[rightIdx] : INT64_MAX; + int32_t leftIdx = (TD_DLIST_HEAD(pLeftList) != NULL) ? TD_DLIST_HEAD(pLeftList)->startIdx : -1; + int32_t rightIdx = (TD_DLIST_HEAD(pRightList) != NULL) ? TD_DLIST_HEAD(pRightList)->startIdx : -1; - // compare by start timestamp first, then by start index - if (leftTs < rightTs) { - return -1; - } else if (leftTs > rightTs) { + if (leftIdx < 0 && rightIdx >= 0) { return 1; - } else if (leftIdx < rightIdx) { + } else if (leftIdx >= 0 && rightIdx < 0) { + return -1; + } else if (leftIdx >= 0 && rightIdx >= 0) { + // compare by row key: timestamp first, then primary keys. + int32_t ret = stCompareRowsInBlocks(pSorter->pDataBlock, pSorter->tsSlotId, pSorter->pkSlotId, leftIdx, rightIdx); + if (ret < 0) { + return -1; + } else if (ret > 0) { + return 1; + } + } + // keep newer row first when row key is identical + if (leftIdx < rightIdx) { return 1; } else if (leftIdx > rightIdx) { return -1; @@ -1465,7 +1406,8 @@ void stNewTimestampSorterReset(SSTriggerNewTimestampSorter *pSorter) { } int32_t stNewTimestampSorterSetData(SSTriggerNewTimestampSorter *pSorter, int64_t tbUid, int32_t tsSlotId, - STimeWindow *pReadRange, SObjList *pMetas, SSTriggerDataSlice *pSlice) { + int32_t pkSlotId, STimeWindow *pReadRange, SObjList *pMetas, + SSTriggerDataSlice *pSlice) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamTriggerTask *pTask = pSorter->pTask; @@ -1476,6 +1418,7 @@ int32_t stNewTimestampSorterSetData(SSTriggerNewTimestampSorter *pSorter, int64_ pSorter->inUse = true; pSorter->pDataBlock = pDataBlock; pSorter->tsSlotId = tsSlotId; + pSorter->pkSlotId = pkSlotId; pDataBlock->info.id.uid = tbUid; // collect all data slices; data in each slice is in ascending order @@ -1635,7 +1578,8 @@ int32_t stNewTimestampSorterNextDataBlock(SSTriggerNewTimestampSorter *pSorter, if (i == idx || pTempSlice == NULL) { continue; } - if (pTsData[pTempSlice->startIdx] == startTs) { + if (stCompareRowsInBlocks(pSorter->pDataBlock, pSorter->tsSlotId, pSorter->pkSlotId, pSlice->startIdx, + pTempSlice->startIdx) == 0) { // skip the current row ST_TASK_DLOG("Slice List %d: pop [%d, %d)", i, pTempSlice->startIdx, pTempSlice->startIdx + 1); needRebuild = true; @@ -1648,11 +1592,19 @@ int32_t stNewTimestampSorterNextDataBlock(SSTriggerNewTimestampSorter *pSorter, } } } + endTs = TMIN(endTs, pTsData[pTempSlice->startIdx]); } - QUERY_CHECK_CONDITION(endTs > startTs, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + QUERY_CHECK_CONDITION(endTs >= startTs, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); - if (pTsData[pSlice->endIdx - 1] < endTs) { + if (startTs == endTs) { + *pEndIdx = pSlice->startIdx + 1; + pSlice->startIdx = *pEndIdx; + if (pSlice->startIdx == pSlice->endIdx) { + TD_DLIST_POP(pList, pSlice); + pSlice = TD_DLIST_HEAD(pList); + } + } else if (pTsData[pSlice->endIdx - 1] < endTs) { *pEndIdx = pSlice->endIdx; TD_DLIST_POP(pList, pSlice); pSlice = TD_DLIST_HEAD(pList); @@ -1839,7 +1791,7 @@ void stNewVtableMergerReset(SSTriggerNewVtableMerger *pMerger) { } } -int32_t stNewVtableMergerSetData(SSTriggerNewVtableMerger *pMerger, int64_t vtbUid, int32_t tsSlotId, +int32_t stNewVtableMergerSetData(SSTriggerNewVtableMerger *pMerger, int64_t vtbUid, int32_t tsSlotId, int32_t pkSlotId, STimeWindow *pReadRange, SObjList *pTableUids, SArray *pTableColRefs, SSHashObj *pMetas, SSHashObj *pSlices) { int32_t code = TSDB_CODE_SUCCESS; @@ -1889,7 +1841,8 @@ int32_t stNewVtableMergerSetData(SSTriggerNewVtableMerger *pMerger, int64_t vtbU } SObjList *pMeta = tSimpleHashGet(pMetas, &pColRef->otbVgId, sizeof(int32_t)); QUERY_CHECK_NULL(pMeta, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); - code = stNewTimestampSorterSetData(pReaderInfo->pReader, pColRef->otbUid, tsSlotId, pReadRange, pMeta, pSlice); + code = stNewTimestampSorterSetData(pReaderInfo->pReader, pColRef->otbUid, tsSlotId, pkSlotId, pReadRange, pMeta, + pSlice); QUERY_CHECK_CODE(code, lino, _end); pReaderInfo->pColRef = pColRef; code = stNewTimestampSorterNextDataBlock(pReaderInfo->pReader, NULL, &pReaderInfo->startIdx, &pReaderInfo->endIdx); diff --git a/source/libs/new-stream/src/streamTriggerTask.c b/source/libs/new-stream/src/streamTriggerTask.c index 512084860d9a..0deb1f0101a1 100644 --- a/source/libs/new-stream/src/streamTriggerTask.c +++ b/source/libs/new-stream/src/streamTriggerTask.c @@ -1326,6 +1326,14 @@ int32_t stTriggerTaskAddRecalcRequest(SStreamTriggerTask *pTask, SSTriggerRealti } if (pCalcRange->ekey != INT64_MAX) { STimeWindow win = stTriggerTaskGetTimeWindow(pTask, pCalcRange->ekey); + while (pTask->interval.interval > pTask->interval.sliding) { + STimeWindow nextWin = win; + stTriggerTaskNextTimeWindow(pTask, &nextWin); + if (nextWin.skey > pCalcRange->ekey) { + break; + } + win = nextWin; + } pReq->scanRange.ekey = TMIN(pReq->scanRange.ekey, win.ekey); } } else if (!isUserRecalc && pTask->fillHistoryStartTime > 0 && pCalcRange->skey != INT64_MIN) { @@ -1336,6 +1344,9 @@ int32_t stTriggerTaskAddRecalcRequest(SStreamTriggerTask *pTask, SSTriggerRealti } if ((pReq->scanRange.skey > pReq->scanRange.ekey) || (pReq->calcRange.skey > pReq->calcRange.ekey)) { + if (pReq->isHistory) { + atomic_store_8(&pTask->historyFinished, 1); + } goto _end; } @@ -1920,6 +1931,8 @@ static int32_t stTriggerTaskParseVirtScan(SStreamTriggerTask *pTask, void *trigg pTask->histTrigTsIndex = 0; pTask->histCalcTsIndex = 0; + pTask->histTrigPkIndex = -1; + pTask->histCalcPkIndex = -1; if (pTask->triggerType == STREAM_TRIGGER_STATE) { if (pTask->histStateSlotId != -1) { @@ -2182,6 +2195,8 @@ int32_t stTriggerTaskDeploy(SStreamTriggerTask *pTask, SStreamTriggerDeployMsg * pTask->trigTsIndex = pMsg->triTsSlotId; pTask->calcTsIndex = pMsg->calcTsSlotId; + pTask->trigPkIndex = pMsg->triPkSlotId; + pTask->calcPkIndex = pMsg->calcPkSlotId; pTask->maxDelayNs = pMsg->maxDelay * NANOSECOND_PER_MSEC; pTask->fillHistoryStartTime = pMsg->fillHistoryStartTime; pTask->watermark = pMsg->watermark; @@ -2233,6 +2248,8 @@ int32_t stTriggerTaskDeploy(SStreamTriggerTask *pTask, SStreamTriggerDeployMsg * pTask->histTrigTsIndex = pTask->trigTsIndex; } pTask->histCalcTsIndex = pTask->calcTsIndex; + pTask->histTrigPkIndex = pTask->trigPkIndex; + pTask->histCalcPkIndex = pTask->calcPkIndex; if (pTask->triggerFilter != NULL) { code = nodesCloneNode(pTask->triggerFilter, &pTask->histTriggerFilter); QUERY_CHECK_CODE(code, lino, _end); @@ -2870,8 +2887,8 @@ int32_t stTriggerTaskGetStatus(SStreamTask *pTask, SSTriggerRuntimeStatus *pStat int32_t stTriggerTaskGetDelay(SStreamTask *pStreamTask, int64_t *pDelay, bool *pFillHisFinished) { SStreamTriggerTask *pTask = (SStreamTriggerTask *)pStreamTask; - int64_t now = taosGetTimestampNs(); - *pDelay = now - atomic_load_64(&pTask->latestVersionTime); + int64_t now = taosGetTimestampUs(); + *pDelay = (now - atomic_load_64(&pTask->latestVersionTime)) / (NANOSECOND_PER_MSEC / NANOSECOND_PER_USEC); *pFillHisFinished = atomic_load_8(&pTask->historyFinished); return TSDB_CODE_SUCCESS; } @@ -5075,11 +5092,14 @@ static int32_t stRealtimeContextProcPullRsp(SSTriggerRealtimeContext *pContext, QUERY_CHECK_NULL(pProgress, code, lino, _end, TSDB_CODE_INVALID_PARA); if (pRsp->code == TSDB_CODE_STREAM_NO_DATA) { - QUERY_CHECK_CONDITION(pRsp->contLen == sizeof(int64_t), code, lino, _end, TSDB_CODE_INVALID_PARA); + QUERY_CHECK_CONDITION(pRsp->contLen >= sizeof(int64_t), code, lino, _end, TSDB_CODE_INVALID_PARA); blockDataEmpty(pContext->pMetaBlock); blockDataEmpty(pContext->pDeleteBlock); blockDataEmpty(pContext->pDropBlock); pContext->pMetaBlock->info.version = *(int64_t *)pRsp->pCont; + if (pRsp->contLen > sizeof(int64_t)) { + pProgress->verTime = *(((int64_t *)pRsp->pCont) + 1); + } } else { QUERY_CHECK_CONDITION(pRsp->contLen > 0, code, lino, _end, TSDB_CODE_INVALID_PARA); SSTriggerWalNewRsp rsp = {.metaBlock = pContext->pMetaBlock, @@ -5225,12 +5245,15 @@ static int32_t stRealtimeContextProcPullRsp(SSTriggerRealtimeContext *pContext, bool firstDataBlock = (blockDataGetNumOfCols(pProgress->pTrigBlock) == 0); if (pRsp->code == TSDB_CODE_STREAM_NO_DATA) { - QUERY_CHECK_CONDITION(pRsp->contLen == sizeof(int64_t), code, lino, _end, TSDB_CODE_INVALID_PARA); + QUERY_CHECK_CONDITION(pRsp->contLen >= sizeof(int64_t), code, lino, _end, TSDB_CODE_INVALID_PARA); if (pContext->walMode == STRIGGER_WAL_META_WITH_DATA) { blockDataEmpty(pContext->pMetaBlock); blockDataEmpty(pContext->pDeleteBlock); blockDataEmpty(pContext->pDropBlock); pContext->pMetaBlock->info.version = *(int64_t *)pRsp->pCont; + if (pRsp->contLen > sizeof(int64_t)) { + pProgress->verTime = *(((int64_t *)pRsp->pCont) + 1); + } } taosArrayClear(pContext->pTempSlices); } else { @@ -8058,7 +8081,8 @@ static int32_t stRealtimeGroupNextDataBlock(SSTriggerRealtimeGroup *pGroup, SSDa SObjList *pMetas = tSimpleHashGet(pGroup->pWalMetas, &vgId, sizeof(int32_t)); QUERY_CHECK_NULL(pMetas, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); STimeWindow range = {.skey = pGroup->oldThreshold + 1, .ekey = pGroup->newThreshold}; - code = stNewTimestampSorterSetData(pContext->pSorter, tbUid, pTask->trigTsIndex, &range, pMetas, pSlice); + code = stNewTimestampSorterSetData(pContext->pSorter, tbUid, pTask->trigTsIndex, pTask->trigPkIndex, &range, + pMetas, pSlice); QUERY_CHECK_CODE(code, lino, _end); } code = stNewTimestampSorterNextDataBlock(pContext->pSorter, ppDataBlock, pStartIdx, pEndIdx); @@ -8081,8 +8105,8 @@ static int32_t stRealtimeGroupNextDataBlock(SSTriggerRealtimeGroup *pGroup, SSDa break; } STimeWindow range = {.skey = pGroup->oldThreshold + 1, .ekey = pGroup->newThreshold}; - code = stNewVtableMergerSetData(pContext->pMerger, vtbUid, pTask->trigTsIndex, &range, &pGroup->tableUids, - pInfo->pTrigColRefs, pGroup->pWalMetas, pContext->pSlices); + code = stNewVtableMergerSetData(pContext->pMerger, vtbUid, pTask->trigTsIndex, pTask->trigPkIndex, &range, + &pGroup->tableUids, pInfo->pTrigColRefs, pGroup->pWalMetas, pContext->pSlices); QUERY_CHECK_CODE(code, lino, _end); } code = stNewVtableMergerNextDataBlock(pContext->pMerger, ppDataBlock, pStartIdx, pEndIdx); @@ -8116,7 +8140,8 @@ static int32_t stRealtimeGroupNextDataBlock(SSTriggerRealtimeGroup *pGroup, SSDa if (TARRAY_DATA(pContext->pCalcReq->params) != pContext->pCurParam) { range.skey = TMAX(range.skey, (pContext->pCurParam - 1)->wend + 1); } - code = stNewTimestampSorterSetData(pContext->pCalcSorter, tbUid, pTask->calcTsIndex, &range, pMetas, pSlice); + code = stNewTimestampSorterSetData(pContext->pCalcSorter, tbUid, pTask->calcTsIndex, pTask->calcPkIndex, &range, + pMetas, pSlice); QUERY_CHECK_CODE(code, lino, _end); } code = stNewTimestampSorterNextDataBlock(pContext->pCalcSorter, ppDataBlock, pStartIdx, pEndIdx); @@ -8143,7 +8168,7 @@ static int32_t stRealtimeGroupNextDataBlock(SSTriggerRealtimeGroup *pGroup, SSDa if (TARRAY_DATA(pContext->pCalcReq->params) != pContext->pCurParam) { range.skey = TMAX(range.skey, (pContext->pCurParam - 1)->wend + 1); } - code = stNewVtableMergerSetData(pContext->pCalcMerger, vtbUid, pTask->calcTsIndex, &range, + code = stNewVtableMergerSetData(pContext->pCalcMerger, vtbUid, pTask->calcTsIndex, pTask->calcPkIndex, &range, &pContext->pCalcTableUids, pInfo->pCalcColRefs, pGroup->pWalMetas, pContext->pSlices); QUERY_CHECK_CODE(code, lino, _end); @@ -9131,9 +9156,13 @@ static int32_t stRealtimeGroupGenCalcParams(SSTriggerRealtimeGroup *pGroup, int3 nextExecTime = TMIN(nextExecTime, t); } } - if (initPendingSize == 0 && (pGroup->pPendingCalcParams.neles > 0 || pGroup->pPendingParWinCalcParams.neles > 0)) { - int64_t t = pTask->lowLatencyCalc ? now : (now + tsStreamBatchRequestWaitMs * NANOSECOND_PER_MSEC); - nextExecTime = TMIN(nextExecTime, t); + if (pGroup->pPendingCalcParams.neles > 0 || pGroup->pPendingParWinCalcParams.neles > 0) { + if (initPendingSize == 0) { + int64_t t = pTask->lowLatencyCalc ? now : (now + tsStreamBatchRequestWaitMs * NANOSECOND_PER_MSEC); + nextExecTime = TMIN(nextExecTime, t); + } else { + nextExecTime = TMIN(nextExecTime, pGroup->nextExecTime); + } } if (nextExecTime != INT64_MAX && nextExecTime > pGroup->nextExecTime) { @@ -9625,38 +9654,58 @@ static int32_t stHistoryGroupAddCalcParam(SSTriggerHistoryGroup *pGroup, SSTrigg int32_t lino = 0; SSTriggerHistoryContext *pContext = pGroup->pContext; SStreamTriggerTask *pTask = pContext->pTask; - bool initSize = pGroup->pPendingCalcParams.neles + pGroup->pPendingParWinCalcParams.neles; + int32_t initSize = pGroup->pPendingCalcParams.neles + pGroup->pPendingParWinCalcParams.neles; + bool overlap = (pParam->wstart <= pContext->calcRange.ekey) && (pParam->wend >= pContext->calcRange.skey); + bool hasBefore = (pParam->wstart < pContext->calcRange.skey); + bool hasAfter = (pParam->wend > pContext->calcRange.ekey); - if (pParam->wstart > pGroup->finishTs) { - tDestroySSTriggerCalcParam(pParam); - goto _end; - } - if (!isParent) { - if (pParam->wstart < pContext->calcRange.skey) { - // skip param before the calc range - taosObjListClearEx(&pGroup->pPendingParWinCalcParams, tDestroySSTriggerCalcParam); - taosObjListClearEx(&pGroup->pPendingCalcParams, tDestroySSTriggerCalcParam); + switch (pTask->triggerType) { + case STREAM_TRIGGER_SLIDING: + case STREAM_TRIGGER_COUNT: { + if (!overlap) { + goto _end; + } + break; } - code = taosObjListAppend(&pGroup->pPendingCalcParams, pParam); - if (code != TSDB_CODE_SUCCESS) { - tDestroySSTriggerCalcParam(pParam); - goto _end; + case STREAM_TRIGGER_EVENT: { + if (!overlap && !(hasAfter && pParam->wstart <= pGroup->finishTs)) { + goto _end; + } + break; } - if (pParam->wend > pContext->calcRange.ekey) { - pGroup->finishTs = pParam->wstart; + case STREAM_TRIGGER_SESSION: + case STREAM_TRIGGER_STATE: { + if (!overlap && !hasBefore && !(hasAfter && pParam->wstart <= pGroup->finishTs)) { + goto _end; + } + if (hasBefore) { + taosObjListClearEx(&pGroup->pPendingCalcParams, tDestroySSTriggerCalcParam); + } + break; } - } else { - code = taosObjListAppend(&pGroup->pPendingParWinCalcParams, pParam); - if (code != TSDB_CODE_SUCCESS) { - tDestroySSTriggerCalcParam(pParam); - goto _end; + default: { + ST_TASK_ELOG("invalid stream trigger type %d at %s:%d", pTask->triggerType, __func__, __LINE__); + code = TSDB_CODE_INVALID_PARA; + QUERY_CHECK_CODE(code, lino, _end); } } + + if (hasAfter) { + pGroup->finishTs = pParam->wstart; + } + SObjList *pParamList = isParent ? &pGroup->pPendingParWinCalcParams : &pGroup->pPendingCalcParams; + code = taosObjListAppend(pParamList, pParam); + QUERY_CHECK_CODE(code, lino, _end); + pParam = NULL; + if (initSize == 0) { heapInsert(pContext->pMaxDelayHeap, &pGroup->heapNode); } _end: + if (pParam != NULL) { + tDestroySSTriggerCalcParam(pParam); + } if (code != TSDB_CODE_SUCCESS) { ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } @@ -10184,7 +10233,8 @@ static int32_t stHistoryGroupGetDataBlock(SSTriggerHistoryGroup *pGroup, bool sa QUERY_CHECK_CODE(code, lino, _end); } code = stTimestampSorterSetSortInfo(pContext->pSorter, &range, pContext->pCurTableMeta->tbUid, - isCalcData ? pTask->histCalcTsIndex : pTask->histTrigTsIndex); + isCalcData ? pTask->histCalcTsIndex : pTask->histTrigTsIndex, + isCalcData ? pTask->histCalcPkIndex : pTask->histTrigPkIndex); QUERY_CHECK_CODE(code, lino, _end); code = stTimestampSorterSetMetaDatas(pContext->pSorter, pContext->pCurTableMeta); QUERY_CHECK_CODE(code, lino, _end); @@ -10416,7 +10466,7 @@ static int32_t stHistoryGroupDoSessionCheck(SSTriggerHistoryGroup *pGroup) { QUERY_CHECK_CODE(code, lino, _end); STimeWindow range = pContext->stepRange; code = stTimestampSorterSetSortInfo(pContext->pSorter, &range, pContext->pCurTableMeta->tbUid, - pTask->histTrigTsIndex); + pTask->histTrigTsIndex, pTask->histTrigPkIndex); QUERY_CHECK_CODE(code, lino, _end); code = stTimestampSorterSetMetaDatas(pContext->pSorter, pContext->pCurTableMeta); QUERY_CHECK_CODE(code, lino, _end); @@ -10515,7 +10565,7 @@ static int32_t stHistoryGroupDoCountCheck(SSTriggerHistoryGroup *pGroup) { } STimeWindow range = pContext->stepRange; code = stTimestampSorterSetSortInfo(pContext->pSorter, &range, pContext->pCurTableMeta->tbUid, - pTask->histTrigTsIndex); + pTask->histTrigTsIndex, pTask->histTrigPkIndex); QUERY_CHECK_CODE(code, lino, _end); code = stTimestampSorterSetMetaDatas(pContext->pSorter, pContext->pCurTableMeta); QUERY_CHECK_CODE(code, lino, _end); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 18bf9496aa3d..8c73b83fac15 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -14522,6 +14522,20 @@ static void findTsSlotId(SScanPhysiNode* pScanNode, int16_t* pTsSlotId) { } } +static void findPkSlotId(SScanPhysiNode* pScanNode, int16_t* pPkSlotId) { + SNode* pNode = NULL; + FOREACH(pNode, pScanNode->pScanCols) { + STargetNode* pTarget = (STargetNode*)pNode; + if (nodeType(pTarget->pExpr) == QUERY_NODE_COLUMN) { + if (((SColumnNode*)pTarget->pExpr)->isPk) { + *pPkSlotId = pTarget->slotId; + break; + } + } + } +} + +// build trigger query plan in create stream request static int32_t createStreamReqBuildTriggerBuildPlan(STranslateContext* pCxt, SSelectStmt* pTriggerSelect, SCMCreateStreamReq* pReq, SHashObj **pTriggerSlotHash, SNode* pTriggerWindow, SNodeList* pTriggerPartition) { @@ -14564,6 +14578,7 @@ static int32_t createStreamReqBuildTriggerBuildPlan(STranslateContext* pCxt, SSe } findTsSlotId(pScanNode, &pReq->triTsSlotId); + findPkSlotId(pScanNode, &pReq->triPkSlotId); if (pReq->triTsSlotId == -1) { PAR_ERR_JRET(generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_STREAM_INVALID_QUERY, "Can not find timestamp primary key in trigger query scan")); @@ -15395,6 +15410,7 @@ static int32_t createStreamReqBuildCalcPlan(STranslateContext* pCxt, SQueryPlan* if (pReq->calcTsSlotId == -1) { PAR_ERR_JRET(generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_STREAM_INVALID_QUERY, "Can not find timestamp primary key in trigger query scan")); } + findPkSlotId((SScanPhysiNode*)pScanSubPlan->pNode, &pReq->calcPkSlotId); } SStreamCalcScan pNewScan = {0}; @@ -15536,6 +15552,9 @@ static int32_t createStreamReqBuildDefaultReq(STranslateContext* pCxt, SCreateSt pReq->flags = CREATE_STREAM_FLAG_NONE; pReq->placeHolderBitmap = PLACE_HOLDER_NONE; pReq->triTsSlotId = -1; + pReq->calcTsSlotId = -1; + pReq->triPkSlotId = -1; + pReq->calcPkSlotId = -1; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 99e2b02e7cdb..cc2e04a7c4a3 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -7680,8 +7680,7 @@ static int32_t tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* // check for head windows if (pScanRange->skey != TSKEY_MIN) { startOfSkeyFirstWin = taosTimeTruncate(pScanRange->skey, pInterval); - endOfSkeyFirstWin = - taosTimeAdd(startOfSkeyFirstWin, pInterval->interval, pInterval->intervalUnit, pTsmaOptCtx->precision, tz); + endOfSkeyFirstWin = taosTimeGetIntervalEnd(startOfSkeyFirstWin, pInterval) + 1; isSkeyAlignedWithTsma = taosTimeTruncate(pScanRange->skey, &tsmaInterval) == pScanRange->skey; } else { endOfSkeyFirstWin = TSKEY_MIN; @@ -7690,8 +7689,7 @@ static int32_t tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* // check for tail windows if (pScanRange->ekey != TSKEY_MAX) { startOfEkeyFirstWin = taosTimeTruncate(pScanRange->ekey, pInterval); - endOfEkeyFirstWin = - taosTimeAdd(startOfEkeyFirstWin, pInterval->interval, pInterval->intervalUnit, pTsmaOptCtx->precision, tz); + endOfEkeyFirstWin = taosTimeGetIntervalEnd(startOfEkeyFirstWin, pInterval) + 1; isEkeyAlignedWithTsma = taosTimeTruncate(pScanRange->ekey + 1, &tsmaInterval) == (pScanRange->ekey + 1); if (startOfEkeyFirstWin > startOfSkeyFirstWin) { needTailWindow = true; diff --git a/test/cases/18-StreamProcessing/02-Stream/test_stream_check_name.py b/test/cases/18-StreamProcessing/02-Stream/test_stream_check_name.py index 21716f2c0014..48644e83ffa3 100644 --- a/test/cases/18-StreamProcessing/02-Stream/test_stream_check_name.py +++ b/test/cases/18-StreamProcessing/02-Stream/test_stream_check_name.py @@ -260,29 +260,6 @@ def killOneDnode2(self): # tdDnodes[numOfDnodes].starttaosd() def checkStreamRunning(self): - tdLog.info(f"check stream running status:") - - timeout = 60 - start_time = time.time() - - while True: - if time.time() - start_time > timeout: - tdLog.error("Timeout waiting for all streams to be running.") - tdLog.error(f"Final stream running status: {streamRunning}") - raise TimeoutError( - f"Stream status did not reach 'Running' within {timeout}s timeout." - ) - - tdSql.query( - f"select status from information_schema.ins_streams order by stream_name;" - ) - streamRunning = tdSql.getColData(0) - - if all(status == "Running" for status in streamRunning): - tdLog.info("All Stream running!") - tdLog.info(f"stream running status: {streamRunning}") - return - else: - tdLog.info("Stream not running! Wait stream running ...") - tdLog.info(f"stream running status: {streamRunning}") - time.sleep(1) + print("wait stream ready ...") + tdStream.checkStreamStatus() + tdLog.info(f"check stream status successfully.") diff --git a/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_count_1.py b/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_count_1.py index 440be4d05f8e..2fe6ef0549bd 100644 --- a/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_count_1.py +++ b/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_count_1.py @@ -1031,12 +1031,11 @@ def createStreams(self): stream = StreamItem( id=108, - stream="create stream rdb.s108 count_window(1, c1) from tdb.t1 into rdb.r108 as select _twstart, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= _twstart and cts < _twstart + 5m partition by tbname slimit 1 soffset 2", + stream="create stream rdb.s108 count_window(1, c1) from tdb.t1 into rdb.r108 as select _twstart, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= _twstart and cts < _twstart + 5m partition by tbname order by tbname limit 1 offset 2", res_query="select * from rdb.r108 limit 1", - exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= '2025-01-01 00:00:00.000' and cts < '2025-01-01 00:05:00.000' partition by tbname slimit 1 soffset 2", + exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= '2025-01-01 00:00:00.000' and cts < '2025-01-01 00:05:00.000' partition by tbname order by tbname limit 1 offset 2", ) - # random expect results, skip this test - # self.streams.append(stream) + self.streams.append(stream) stream = StreamItem( id=109, @@ -1120,12 +1119,11 @@ def createStreams(self): stream = StreamItem( id=119, - stream="create stream rdb.s119 count_window(1, c1) from tdb.vtriggers partition by id, tbname into rdb.r119 as select _twstart ts, count(tac1), sum(tbcint) from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where tb.tint=1 and ta.ts >= _twstart and ta.ts < _twend + 5m and cos(tb.cint) >= 0 and cos(ta.c1) > 0);", + stream="create stream rdb.s119 count_window(1, c1) from tdb.vtriggers partition by id, tbname into rdb.r119 as select _twstart ts, count(tac1), sum(tbcint) from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where ta.ts >= _twstart and ta.ts < _twend + 5m and cos(ta.c1) > 0);", res_query="select * from rdb.r119 where id =2 limit 1", - exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(tac1), sum(tbcint), 2, 'v2' from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where tb.tint=1 and ta.ts >= '2025-01-01 00:00:00.000' and ta.ts < '2025-01-01 00:05:00.000' and cos(tb.cint) >= 0 and cos(ta.c1) > 0);", + exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(tac1), sum(tbcint), 2, 'v2' from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where ta.ts >= '2025-01-01 00:00:00.000' and ta.ts < '2025-01-01 00:05:00.000' and cos(ta.c1) > 0);", ) - # random expect results, skip this test - # self.streams.append(stream) + self.streams.append(stream) stream = StreamItem( id=120, diff --git a/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_count_2.py b/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_count_2.py index 78f589bf69a4..6dd8cc64d217 100644 --- a/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_count_2.py +++ b/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_count_2.py @@ -1033,12 +1033,11 @@ def createStreams(self): stream = StreamItem( id=108, - stream="create stream rdb.s108 count_window(2, c1) from tdb.t1 into rdb.r108 as select _twstart, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= _twstart and cts < _twstart + 5m partition by tbname slimit 1 soffset 2", + stream="create stream rdb.s108 count_window(2, c1) from tdb.t1 into rdb.r108 as select _twstart, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= _twstart and cts < _twstart + 5m partition by tbname order by tbname limit 1 offset 2", res_query="select * from rdb.r108 limit 1", - exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= '2025-01-01 00:00:00.000' and cts < '2025-01-01 00:05:00.000' partition by tbname slimit 1 soffset 2", + exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= '2025-01-01 00:00:00.000' and cts < '2025-01-01 00:05:00.000' partition by tbname order by tbname limit 1 offset 2", ) - # random expect results, skip this test - # self.streams.append(stream) + self.streams.append(stream) stream = StreamItem( id=109, @@ -1122,12 +1121,11 @@ def createStreams(self): stream = StreamItem( id=119, - stream="create stream rdb.s119 count_window(2, c1) from tdb.vtriggers partition by id, tbname into rdb.r119 as select _twstart ts, count(tac1), sum(tbcint) from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where tb.tint=1 and ta.ts >= _twstart and ta.ts < _twend + 4m and cos(tb.cint) >= 0 and cos(ta.c1) > 0);", + stream="create stream rdb.s119 count_window(2, c1) from tdb.vtriggers partition by id, tbname into rdb.r119 as select _twstart ts, count(tac1), sum(tbcint) from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where ta.ts >= _twstart and ta.ts < _twend + 4m and cos(ta.c1) > 0);", res_query="select * from rdb.r119 where id =2 limit 1", - exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(tac1), sum(tbcint), 2, 'v2' from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where tb.tint=1 and ta.ts >= '2025-01-01 00:00:00.000' and ta.ts < '2025-01-01 00:05:00.000' and cos(tb.cint) >= 0 and cos(ta.c1) > 0);", + exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(tac1), sum(tbcint), 2, 'v2' from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where ta.ts >= '2025-01-01 00:00:00.000' and ta.ts < '2025-01-01 00:05:00.000' and cos(ta.c1) > 0);", ) - # random expect results, skip this test - # self.streams.append(stream) + self.streams.append(stream) stream = StreamItem( id=120, diff --git a/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_event.py b/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_event.py index 2bcc33c31acd..094df0b7153b 100644 --- a/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_event.py +++ b/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_event.py @@ -1033,12 +1033,11 @@ def createStreams(self): stream = StreamItem( id=108, - stream="create stream rdb.s108 event_window(start with c2=0 end with c2=10) from tdb.t1 into rdb.r108 as select _twstart, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= _twstart and cts < _twstart + 5m partition by tbname slimit 1 soffset 2", + stream="create stream rdb.s108 event_window(start with c2=0 end with c2=10) from tdb.t1 into rdb.r108 as select _twstart, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= _twstart and cts < _twstart + 5m partition by tbname order by tbname limit 1 offset 2", res_query="select * from rdb.r108 limit 1", - exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= '2025-01-01 00:00:00.000' and cts < '2025-01-01 00:05:00.000' partition by tbname slimit 1 soffset 2", + exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= '2025-01-01 00:00:00.000' and cts < '2025-01-01 00:05:00.000' partition by tbname order by tbname limit 1 offset 2", ) - # random expect results, skip this test - # self.streams.append(stream) + self.streams.append(stream) stream = StreamItem( id=109, @@ -1122,12 +1121,11 @@ def createStreams(self): stream = StreamItem( id=119, - stream="create stream rdb.s119 event_window(start with c2=0 end with c2=10) from tdb.vtriggers partition by id, tbname into rdb.r119 as select _twstart ts, count(tac1), sum(tbcint) from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where tb.tint=1 and ta.ts >= _twstart and ta.ts < _twend + 4m and cos(tb.cint) >= 0 and cos(ta.c1) > 0);", + stream="create stream rdb.s119 event_window(start with c2=0 end with c2=10) from tdb.vtriggers partition by id, tbname into rdb.r119 as select _twstart ts, count(tac1), sum(tbcint) from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where ta.ts >= _twstart and ta.ts < _twend + 4m and cos(ta.c1) > 0);", res_query="select * from rdb.r119 where id =2 limit 1", - exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(tac1), sum(tbcint), 2, 'v2' from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where tb.tint=1 and ta.ts >= '2025-01-01 00:00:00.000' and ta.ts < '2025-01-01 00:05:00.000' and cos(tb.cint) >= 0 and cos(ta.c1) > 0);", + exp_query="select cast('2025-01-01 00:15:00.000' as timestamp) ts, count(tac1), sum(tbcint), 2, 'v2' from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where ta.ts >= '2025-01-01 00:15:00.000' and ta.ts < '2025-01-01 00:20:00.000' and cos(ta.c1) > 0);", ) - # random expect results, skip this test - # self.streams.append(stream) + self.streams.append(stream) stream = StreamItem( id=120, diff --git a/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_session.py b/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_session.py index fc55dcb170be..e3ac9f35062a 100644 --- a/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_session.py +++ b/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_session.py @@ -1038,12 +1038,11 @@ def createStreams(self): stream = StreamItem( id=108, - stream="create stream rdb.s108 session(ts, 2m) from tdb.t1 into rdb.r108 as select _twstart, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= _twstart and cts < _twstart + 5m partition by tbname slimit 1 soffset 2", + stream="create stream rdb.s108 session(ts, 2m) from tdb.t1 into rdb.r108 as select _twstart, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= _twstart and cts < _twstart + 5m partition by tbname order by tbname limit 1 offset 2", res_query="select * from rdb.r108 limit 1", - exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= '2025-01-01 00:00:00.000' and cts < '2025-01-01 00:05:00.000' partition by tbname slimit 1 soffset 2", + exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= '2025-01-01 00:00:00.000' and cts < '2025-01-01 00:05:00.000' partition by tbname order by tbname limit 1 offset 2", ) - # random expect results, skip this test - # self.streams.append(stream) + self.streams.append(stream) stream = StreamItem( id=109, @@ -1127,12 +1126,11 @@ def createStreams(self): stream = StreamItem( id=119, - stream="create stream rdb.s119 session(ts, 2m) from tdb.vtriggers partition by id, tbname into rdb.r119 as select _twstart ts, count(tac1), sum(tbcint) from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where tb.tint=1 and ta.ts >= _twstart and ta.ts < _twend + 4m and cos(tb.cint) >= 0 and cos(ta.c1) > 0);", + stream="create stream rdb.s119 session(ts, 2m) from tdb.vtriggers partition by id, tbname into rdb.r119 as select _twstart ts, count(tac1), sum(tbcint) from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where ta.ts >= _twstart and ta.ts < _twend + 4m and cos(ta.c1) > 0);", res_query="select * from rdb.r119 where id =2 limit 1", - exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(tac1), sum(tbcint), 2, 'v2' from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where tb.tint=1 and ta.ts >= '2025-01-01 00:00:00.000' and ta.ts < '2025-01-01 00:05:00.000' and cos(tb.cint) >= 0 and cos(ta.c1) > 0);", + exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(tac1), sum(tbcint), 2, 'v2' from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where ta.ts >= '2025-01-01 00:00:00.000' and ta.ts < '2025-01-01 00:05:00.000' and cos(ta.c1) > 0);", ) - # random expect results, skip this test - # self.streams.append(stream) + self.streams.append(stream) stream = StreamItem( id=120, diff --git a/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_sliding.py b/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_sliding.py index 9b4b66a08f76..40752c34d97b 100644 --- a/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_sliding.py +++ b/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_sliding.py @@ -1027,12 +1027,11 @@ def createStreams(self): stream = StreamItem( id=108, - stream="create stream rdb.s108 interval(5m) sliding(5m) from tdb.t1 into rdb.r108 as select _twstart, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= _twstart and cts < _twend partition by tbname slimit 1 soffset 2", + stream="create stream rdb.s108 interval(5m) sliding(5m) from tdb.t1 into rdb.r108 as select _twstart, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= _twstart and cts < _twend partition by tbname order by tbname limit 1 offset 2", res_query="select * from rdb.r108 limit 1", - exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= '2025-01-01 00:00:00.000' and cts < '2025-01-01 00:05:00.000' partition by tbname slimit 1 soffset 2", + exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= '2025-01-01 00:00:00.000' and cts < '2025-01-01 00:05:00.000' partition by tbname order by tbname limit 1 offset 2", ) - # random expect results, skip this test - # self.streams.append(stream) + self.streams.append(stream) stream = StreamItem( id=109, @@ -1116,12 +1115,11 @@ def createStreams(self): stream = StreamItem( id=119, - stream="create stream rdb.s119 interval(5m) sliding(5m) from tdb.vtriggers partition by id into rdb.r119 as select _twstart ts, count(tac1), sum(tbcint) from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where tb.tint=1 and ta.ts >= _twstart and ta.ts < _twend and cos(tb.cint) >= 0 and cos(ta.c1) > 0);", + stream="create stream rdb.s119 interval(5m) sliding(5m) from tdb.vtriggers partition by id into rdb.r119 as select _twstart ts, count(tac1), sum(tbcint) from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where ta.ts >= _twstart and ta.ts < _twend and cos(ta.c1) > 0);", res_query="select * from rdb.r119 limit 1", - exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(tac1), sum(tbcint), 1 from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where tb.tint=1 and ta.ts >= '2025-01-01 00:00:00.000' and ta.ts < '2025-01-01 00:05:00.000' and cos(tb.cint) >= 0 and cos(ta.c1) > 0);", + exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(tac1), sum(tbcint), 1 from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where ta.ts >= '2025-01-01 00:00:00.000' and ta.ts < '2025-01-01 00:05:00.000' and cos(ta.c1) > 0);", ) - # random expect results, skip this test - # self.streams.append(stream) + self.streams.append(stream) stream = StreamItem( id=120, diff --git a/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_state.py b/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_state.py index 365430121423..acba1355ed19 100644 --- a/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_state.py +++ b/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_state.py @@ -1033,12 +1033,11 @@ def createStreams(self): stream = StreamItem( id=108, - stream="create stream rdb.s108 state_window(c1) from tdb.t1 into rdb.r108 as select _twstart, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= _twstart and cts < _twstart + 5m partition by tbname slimit 1 soffset 2", + stream="create stream rdb.s108 state_window(c1) from tdb.t1 into rdb.r108 as select _twstart, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= _twstart and cts < _twstart + 5m partition by tbname order by tbname limit 1 offset 2", res_query="select * from rdb.r108 limit 1", - exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= '2025-01-01 00:00:00.000' and cts < '2025-01-01 00:05:00.000' partition by tbname slimit 1 soffset 2", + exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(cts), first(cuint), last(cuint), sum(cuint), tbname from qdb.meters where tbname in ('t1', 't2', 't3', 't4') and cts >= '2025-01-01 00:00:00.000' and cts < '2025-01-01 00:05:00.000' partition by tbname order by tbname limit 1 offset 2", ) - # random expect results, skip this test - # self.streams.append(stream) + self.streams.append(stream) stream = StreamItem( id=109, @@ -1122,12 +1121,11 @@ def createStreams(self): stream = StreamItem( id=119, - stream="create stream rdb.s119 state_window(c1) from tdb.vtriggers partition by id, tbname into rdb.r119 as select _twstart ts, count(tac1), sum(tbcint) from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where tb.tint=1 and ta.ts >= _twstart and ta.ts < _twend + 4m and cos(tb.cint) >= 0 and cos(ta.c1) > 0);", + stream="create stream rdb.s119 state_window(c1) from tdb.vtriggers partition by id, tbname into rdb.r119 as select _twstart ts, count(tac1), sum(tbcint) from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where ta.ts >= _twstart and ta.ts < _twend + 4m and cos(ta.c1) > 0);", res_query="select * from rdb.r119 where id =2 limit 1", - exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(tac1), sum(tbcint), 2, 'v2' from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where tb.tint=1 and ta.ts >= '2025-01-01 00:00:00.000' and ta.ts < '2025-01-01 00:05:00.000' and cos(tb.cint) >= 0 and cos(ta.c1) > 0);", + exp_query="select cast('2025-01-01 00:00:00.000' as timestamp) ts, count(tac1), sum(tbcint), 2, 'v2' from (select ta.ts tats, tb.cts tbts, ta.c1 tac1, tb.cint tbcint from qdb.meters tb right asof join tdb.t1 ta on ta.ts < tb.cts jlimit 10 where ta.ts >= '2025-01-01 00:00:00.000' and ta.ts < '2025-01-01 00:05:00.000' and cos(ta.c1) > 0);", ) - # random expect results, skip this test - # self.streams.append(stream) + self.streams.append(stream) stream = StreamItem( id=120, diff --git a/test/cases/18-StreamProcessing/30-OldPyCases/test_oldcase_checkpoint_info.py b/test/cases/18-StreamProcessing/30-OldPyCases/test_oldcase_checkpoint_info.py index fe5b005367d4..a7e4faa13ba3 100644 --- a/test/cases/18-StreamProcessing/30-OldPyCases/test_oldcase_checkpoint_info.py +++ b/test/cases/18-StreamProcessing/30-OldPyCases/test_oldcase_checkpoint_info.py @@ -319,32 +319,35 @@ def checkCheckpointFile(self): def StopStream(self): tdLog.info(f"stop stream:") tdSql.query(f"show {self.dbname}.streams;") - streamname = tdSql.getColData(0) - for i in streamname: - tdLog.info(f"stop stream {i}") - tdSql.execute(f"stop stream {i}") + streamName = tdSql.getColData(0) + for name in streamName: + tdLog.info(f"stop stream {name}") + tdSql.execute(f"stop stream {name}") + time.sleep(5) tdSql.query(f"show {self.dbname}.streams;") - stateStream = tdSql.getData(0,1) - for i in stateStream: - if stateStream != 'Stopped' : - raise Exception(f"Stop stream error") + streamName = tdSql.getColData(0) + streamStatus = tdSql.getColData(1) + for name, status in zip(streamName,streamStatus): + if status != 'Stopped': + raise Exception(f"Stop stream {name} error, status {status}") tdLog.info(f"stop all stream success") def StartStream(self): tdLog.info(f"start stream:") tdSql.query(f"show {self.dbname}.streams;") - streamname = tdSql.getColData(0) - for i in streamname: - tdLog.info(f"start stream {i}") - tdSql.execute(f"start stream {i}") + streamName = tdSql.getColData(0) + for name in streamName: + tdLog.info(f"start stream {name}") + tdSql.execute(f"start stream {name}") self.checkStreamRunning() tdSql.query(f"show {self.dbname}.streams;") - stateStream = tdSql.getData(0,1) - for i in stateStream: - if stateStream != 'Running' : - raise Exception(f"Start stream error") + streamName = tdSql.getColData(0) + streamStatus = tdSql.getColData(1) + for name, status in zip(streamName,streamStatus): + if status != 'Running': + raise Exception(f"Start stream {name} error, status {status}") tdLog.info(f"start all stream success") diff --git a/test/ci/cases.task b/test/ci/cases.task index 34a12f7a1e00..36e5125fbd35 100644 --- a/test/ci/cases.task +++ b/test/ci/cases.task @@ -771,7 +771,7 @@ # 19-TSMAs -90,,y,.,./ci/pytest.sh pytest cases/19-TSMAs/test_tsma.py +,,y,.,./ci/pytest.sh pytest cases/19-TSMAs/test_tsma.py ,,y,.,./ci/pytest.sh pytest cases/19-TSMAs/test_sma_basic.py -N 3 ,,y,.,./ci/pytest.sh pytest cases/19-TSMAs/test_sma_bugs.py ,,y,.,./ci/pytest.sh pytest cases/19-TSMAs/test_time_range_wise.py -N 3 From bf09785ac0b1ffc4edc1d1d38b6f9d937ff13a4c Mon Sep 17 00:00:00 2001 From: WANG MINGMING Date: Thu, 26 Feb 2026 16:58:10 +0800 Subject: [PATCH 10/12] fix(stream): add total rows if there is meta data (#34578) --- source/dnode/vnode/src/vnd/vnodeStream.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/vnode/src/vnd/vnodeStream.c b/source/dnode/vnode/src/vnd/vnodeStream.c index 51c29c632b09..ff757fa6e969 100644 --- a/source/dnode/vnode/src/vnd/vnodeStream.c +++ b/source/dnode/vnode/src/vnd/vnodeStream.c @@ -1741,6 +1741,7 @@ static int32_t scanSubmitData(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamR WalMetaResult* pMeta = (WalMetaResult*)px; STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->metaBlock, pMeta->id, pMeta->skey, pMeta->ekey, ver)); ((SSDataBlock*)rsp->metaBlock)->info.rows++; + rsp->totalRows++; ST_TASK_DLOG("%s process meta data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64 ", ver:%"PRId64, __func__, pMeta->skey, pMeta->ekey, pMeta->id, ver); px = tSimpleHashIterate(gidHash, px, &iter); From 7db2533a9c049d78f3b03f765acfec74795bb778 Mon Sep 17 00:00:00 2001 From: guozhenwei <2227465945@qq.com> Date: Fri, 27 Feb 2026 10:22:11 +0800 Subject: [PATCH 11/12] enh: change the default log retention limit to 3 (#34596) close: https://project.feishu.cn/taosdata_td/sub_task1/detail/6835717767 --- docs/en/14-reference/01-components/06-taoskeeper.md | 8 ++++---- docs/zh/14-reference/01-components/06-taoskeeper.md | 8 ++++---- tools/keeper/config/taoskeeper.toml | 4 ++-- tools/keeper/config/taoskeeper_enterprise.toml | 4 ++-- tools/keeper/infrastructure/config/log.go | 13 +++++++++---- 5 files changed, 21 insertions(+), 16 deletions(-) diff --git a/docs/en/14-reference/01-components/06-taoskeeper.md b/docs/en/14-reference/01-components/06-taoskeeper.md index 5ec0424c2d49..1c040dcb5d40 100644 --- a/docs/en/14-reference/01-components/06-taoskeeper.md +++ b/docs/en/14-reference/01-components/06-taoskeeper.md @@ -37,11 +37,11 @@ Usage of taoskeeper: -H, --host string http host. Env "TAOS_KEEPER_HOST" --instanceId int instance ID. Env "TAOS_KEEPER_INSTANCE_ID" (default 64) --log.compress whether to compress old log. Env "TAOS_KEEPER_LOG_COMPRESS" - --log.keepDays uint log retention days, must be a positive integer. Env "TAOS_KEEPER_LOG_KEEP_DAYS" (default 30) + --log.keepDays uint log retention days, must be a positive integer. Env "TAOS_KEEPER_LOG_KEEP_DAYS" (default 3) --log.level string log level (trace debug info warning error). Env "TAOS_KEEPER_LOG_LEVEL" (default "info") --log.path string log path. Env "TAOS_KEEPER_LOG_PATH" (default "/var/log/taos") --log.reservedDiskSize string reserved disk size for log dir (KB MB GB), must be a positive integer. Env "TAOS_KEEPER_LOG_RESERVED_DISK_SIZE" (default "1GB") - --log.rotationCount uint log rotation count. Env "TAOS_KEEPER_LOG_ROTATION_COUNT" (default 5) + --log.rotationCount uint log rotation count. Env "TAOS_KEEPER_LOG_ROTATION_COUNT" (default 3) --log.rotationSize string log rotation size(KB MB GB), must be a positive integer. Env "TAOS_KEEPER_LOG_ROTATION_SIZE" (default "1GB") --log.rotationTime duration deprecated: log rotation time always 24 hours. Env "TAOS_KEEPER_LOG_ROTATION_TIME" (default 24h0m0s) --logLevel string log level (trace debug info warning error). Env "TAOS_KEEPER_LOG_LEVEL" (default "info") @@ -119,9 +119,9 @@ incgroup = false # path = "/var/log/taos" level = "info" # Number of log file rotations before deletion. -rotationCount = 30 +rotationCount = 3 # The number of days to retain log files. -keepDays = 30 +keepDays = 3 # The maximum size of a log file before rotation. rotationSize = "1GB" # If set to true, log files will be compressed. diff --git a/docs/zh/14-reference/01-components/06-taoskeeper.md b/docs/zh/14-reference/01-components/06-taoskeeper.md index 2d1f9e1d0e9f..c9447a6f1aad 100644 --- a/docs/zh/14-reference/01-components/06-taoskeeper.md +++ b/docs/zh/14-reference/01-components/06-taoskeeper.md @@ -37,11 +37,11 @@ Usage of taoskeeper: -H, --host string http host. Env "TAOS_KEEPER_HOST" --instanceId int instance ID. Env "TAOS_KEEPER_INSTANCE_ID" (default 64) --log.compress whether to compress old log. Env "TAOS_KEEPER_LOG_COMPRESS" - --log.keepDays uint log retention days, must be a positive integer. Env "TAOS_KEEPER_LOG_KEEP_DAYS" (default 30) + --log.keepDays uint log retention days, must be a positive integer. Env "TAOS_KEEPER_LOG_KEEP_DAYS" (default 3) --log.level string log level (trace debug info warning error). Env "TAOS_KEEPER_LOG_LEVEL" (default "info") --log.path string log path. Env "TAOS_KEEPER_LOG_PATH" (default "/var/log/taos") --log.reservedDiskSize string reserved disk size for log dir (KB MB GB), must be a positive integer. Env "TAOS_KEEPER_LOG_RESERVED_DISK_SIZE" (default "1GB") - --log.rotationCount uint log rotation count. Env "TAOS_KEEPER_LOG_ROTATION_COUNT" (default 5) + --log.rotationCount uint log rotation count. Env "TAOS_KEEPER_LOG_ROTATION_COUNT" (default 3) --log.rotationSize string log rotation size(KB MB GB), must be a positive integer. Env "TAOS_KEEPER_LOG_ROTATION_SIZE" (default "1GB") --log.rotationTime duration deprecated: log rotation time always 24 hours. Env "TAOS_KEEPER_LOG_ROTATION_TIME" (default 24h0m0s) --logLevel string log level (trace debug info warning error). Env "TAOS_KEEPER_LOG_LEVEL" (default "info") @@ -119,9 +119,9 @@ incgroup = false # path = "/var/log/taos" level = "info" # Number of log file rotations before deletion. -rotationCount = 30 +rotationCount = 3 # The number of days to retain log files. -keepDays = 30 +keepDays = 3 # The maximum size of a log file before rotation. rotationSize = "1GB" # If set to true, log files will be compressed. diff --git a/tools/keeper/config/taoskeeper.toml b/tools/keeper/config/taoskeeper.toml index 77945e2fab64..1dfc9c9bb66d 100644 --- a/tools/keeper/config/taoskeeper.toml +++ b/tools/keeper/config/taoskeeper.toml @@ -45,9 +45,9 @@ incgroup = false # path = "/var/log/taos" level = "info" # Number of log file rotations before deletion. -rotationCount = 30 +rotationCount = 3 # The number of days to retain log files. -keepDays = 30 +keepDays = 3 # The maximum size of a log file before rotation. rotationSize = "1GB" # If set to true, log files will be compressed. diff --git a/tools/keeper/config/taoskeeper_enterprise.toml b/tools/keeper/config/taoskeeper_enterprise.toml index c31977b77ac7..4aa25722022c 100644 --- a/tools/keeper/config/taoskeeper_enterprise.toml +++ b/tools/keeper/config/taoskeeper_enterprise.toml @@ -57,9 +57,9 @@ cachemodel = "both" # path = "/var/log/taos" level = "info" # Number of log file rotations before deletion. -rotationCount = 30 +rotationCount = 3 # The number of days to retain log files. -keepDays = 30 +keepDays = 3 # The maximum size of a log file before rotation. rotationSize = "1GB" # If set to true, log files will be compressed. diff --git a/tools/keeper/infrastructure/config/log.go b/tools/keeper/infrastructure/config/log.go index 557d2ed8541c..8a4506611373 100644 --- a/tools/keeper/infrastructure/config/log.go +++ b/tools/keeper/infrastructure/config/log.go @@ -47,13 +47,18 @@ func initLog() { _ = viper.BindEnv("log.level", "TAOS_KEEPER_LOG_LEVEL") pflag.String("log.level", "info", `log level (trace debug info warning error). Env "TAOS_KEEPER_LOG_LEVEL"`) - viper.SetDefault("log.rotationCount", 5) + const ( + defaultLogRotationCount = 3 + defaultLogKeepDays = 3 + ) + + viper.SetDefault("log.rotationCount", defaultLogRotationCount) _ = viper.BindEnv("log.rotationCount", "TAOS_KEEPER_LOG_ROTATION_COUNT") - pflag.Uint("log.rotationCount", 5, `log rotation count. Env "TAOS_KEEPER_LOG_ROTATION_COUNT"`) + pflag.Uint("log.rotationCount", defaultLogRotationCount, `log rotation count. Env "TAOS_KEEPER_LOG_ROTATION_COUNT"`) - viper.SetDefault("log.keepDays", 30) + viper.SetDefault("log.keepDays", defaultLogKeepDays) _ = viper.BindEnv("log.keepDays", "TAOS_KEEPER_LOG_KEEP_DAYS") - pflag.Uint("log.keepDays", 30, `log retention days, must be a positive integer. Env "TAOS_KEEPER_LOG_KEEP_DAYS"`) + pflag.Uint("log.keepDays", defaultLogKeepDays, `log retention days, must be a positive integer. Env "TAOS_KEEPER_LOG_KEEP_DAYS"`) viper.SetDefault("log.rotationTime", time.Hour*24) _ = viper.BindEnv("log.rotationTime", "TAOS_KEEPER_LOG_ROTATION_TIME") From 1ae650d5bbe072cc7180de5d9497d4c55e919cca Mon Sep 17 00:00:00 2001 From: guichuan zhang <460873250@qq.com> Date: Fri, 27 Feb 2026 12:20:43 +0800 Subject: [PATCH 12/12] fix(xnode): support displaying dynamic error message returned by xnoded (#34601) Closes [6833801941](https://project.feishu.cn/taosdata_td/defect/detail/6833801941) --- source/client/src/clientMsgHandler.c | 33 +++++++++++++++ source/dnode/mnode/impl/src/mndXnode.c | 58 +++++++++++++++++++------- test/cases/42-Xnode/test_xnode.py | 3 ++ 3 files changed, 80 insertions(+), 14 deletions(-) diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index f7246452c7ba..59039e1c3627 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -1358,6 +1358,37 @@ int32_t processCreateTotpSecretRsp(void* param, SDataBuf* pMsg, int32_t code) { return code; } +int32_t processCreateXnodeTaskRsp(void* param, SDataBuf* pMsg, int32_t code) { + SRequestObj* pRequest = param; + if (code != TSDB_CODE_SUCCESS) { + setErrno(pRequest, code); + if (code == TSDB_CODE_MND_XNODE_HTTP_CODE_ERROR) { + if (pMsg->pData != NULL && pMsg->len > 0) { + if (pMsg->len <= pRequest->msgBufLen) { + tstrncpy(pRequest->msgBuf, (char*)pMsg->pData, pRequest->msgBufLen); + } else { + taosMemoryFreeClear(pRequest->msgBuf); + pRequest->msgBuf = pMsg->pData; + pMsg->pData = NULL; + pRequest->msgBufLen = pMsg->len; + } + } + } + } + + if (pMsg->pData) { + taosMemoryFree(pMsg->pData); + } + taosMemoryFree(pMsg->pEpSet); + + if (pRequest->body.queryFp != NULL) { + pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code); + } else if (tsem_post(&pRequest->body.rspSem) != 0) { + tscError("failed to post semaphore"); + } + return code; +} + __async_send_cb_fn_t getMsgRspHandle(int32_t msgType) { switch (msgType) { @@ -1385,6 +1416,8 @@ __async_send_cb_fn_t getMsgRspHandle(int32_t msgType) { return processCreateTokenRsp; case TDMT_MND_CREATE_TOTP_SECRET: return processCreateTotpSecretRsp; + case TDMT_MND_CREATE_XNODE_TASK: + return processCreateXnodeTaskRsp; default: return genericRspCallback; diff --git a/source/dnode/mnode/impl/src/mndXnode.c b/source/dnode/mnode/impl/src/mndXnode.c index bb67e4524745..241c965afcd8 100644 --- a/source/dnode/mnode/impl/src/mndXnode.c +++ b/source/dnode/mnode/impl/src/mndXnode.c @@ -1620,6 +1620,25 @@ static int32_t mndValidateCreateXnodeTaskReq(SRpcMsg *pReq, SMCreateXnodeTaskReq code = terrno; goto _OVER; } + SJson *errorJson = tjsonGetObjectItem(pJson, "__inner_error"); + if (errorJson != NULL) { + code = TSDB_CODE_MND_XNODE_HTTP_CODE_ERROR; + char* pValueString = ((cJSON*)errorJson)->valuestring; + if (NULL == pValueString) { + mError("should not failed to get __inner_error message, task name:%s", pCreateReq->name.ptr); + goto _OVER; + } + //handle response + int32_t contLen = strlen(pValueString) + strlen(tstrerror(code)) + 32; + void *pRsp = rpcMallocCont(contLen); + if (pRsp == NULL) { + TAOS_CHECK_GOTO(terrno, NULL, _OVER); + } + pReq->info.rspLen = contLen; + pReq->info.rsp = pRsp; + snprintf(pReq->info.rsp, contLen, "%s, since: %s", tstrerror(code), pValueString); + goto _OVER; + } // todo: only4test // (void)mndSendReqRetJson(xnodeUrl, HTTP_TYPE_POST, 60000, pContStr, strlen(pContStr)); @@ -1658,7 +1677,7 @@ static int32_t mndHandleCreateXnodeTaskResult(int32_t createCode) { static int32_t mndProcessCreateXnodeTaskReq(SRpcMsg *pReq) { mDebug("xnode create task request received, contLen:%d\n", pReq->contLen); SMnode *pMnode = pReq->info.node; - int32_t code = -1; + int32_t code = 0; SMCreateXnodeTaskReq createReq = {0}; // Step 1: Validate permissions @@ -3909,7 +3928,7 @@ static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_POST, 1), &lino, _OVER); TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, bufLen), &lino, _OVER); TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf), &lino, _OVER); - TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L), &lino, _OVER); + TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_VERBOSE, 0L), &lino, _OVER); TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L), &lino, _OVER); mDebug("xnode curl post request will sent, url:%s len:%d content:%s", url, bufLen, buf); @@ -4001,7 +4020,7 @@ SJson *mndSendReqRetJson(const char *url, EHttpType type, int64_t timeout, const if (!taosCheckExistFile(socketPath)) { uError("xnode failed to send request, socket path:%s not exist", socketPath); terrno = TSDB_CODE_MND_XNODE_URL_CANT_ACCESS; - goto _OVER; + goto _EXIT; } if (type == HTTP_TYPE_GET) { if ((terrno = taosCurlGetRequest(url, &curlRsp, timeout, socketPath)) != 0) { @@ -4018,21 +4037,32 @@ SJson *mndSendReqRetJson(const char *url, EHttpType type, int64_t timeout, const } else { uError("xnode invalid http type:%d", type); terrno = TSDB_CODE_MND_XNODE_INVALID_MSG; - goto _OVER; + goto _EXIT; } - if (curlRsp.data == NULL || curlRsp.dataLen == 0) { +_OVER: + if (terrno == TSDB_CODE_SUCCESS) { + if (curlRsp.data == NULL || curlRsp.dataLen == 0) { + pJson = tjsonCreateObject(); + goto _EXIT; + } + pJson = tjsonParse(curlRsp.data); + if (pJson == NULL) { + terrno = TSDB_CODE_INVALID_JSON_FORMAT; + goto _EXIT; + } + } else if (terrno == TSDB_CODE_MND_XNODE_HTTP_CODE_ERROR) { pJson = tjsonCreateObject(); - goto _OVER; - } - - pJson = tjsonParse(curlRsp.data); - if (pJson == NULL) { - terrno = TSDB_CODE_INVALID_JSON_FORMAT; - goto _OVER; + char *buf = taosMemCalloc(1, curlRsp.dataLen + 1); + (void)memcpy(buf, curlRsp.data, curlRsp.dataLen); + if (tjsonAddStringToObject(pJson, "__inner_error", buf) != TSDB_CODE_SUCCESS) { + taosMemoryFreeClear(buf); + goto _EXIT; + } + taosMemoryFreeClear(buf); } -_OVER: +_EXIT: if (curlRsp.data != NULL) taosMemoryFreeClear(curlRsp.data); if (terrno != TSDB_CODE_SUCCESS) { mError("xnode failed to send request, url: %s, since:%s", url, tstrerror(terrno)); @@ -5125,4 +5155,4 @@ void mndRestartXnoded(SMnode *pMnode) { mndReleaseXnodeUserPass(pMnode, pObj); mInfo("mndxnode xnoded restarted"); return; -} \ No newline at end of file +} diff --git a/test/cases/42-Xnode/test_xnode.py b/test/cases/42-Xnode/test_xnode.py index 75ca2ae195c1..acb3cec8b1ed 100644 --- a/test/cases/42-Xnode/test_xnode.py +++ b/test/cases/42-Xnode/test_xnode.py @@ -328,6 +328,9 @@ def test_task_lifecycle(self): self.no_syntax_fail_execute(f"DROP DATABASE zgc_{rid}") self.no_syntax_fail_execute(f"DROP DATABASE test_{rid}") + long_col = "a"*1024 + self.must_fail_execute(f"CREATE XNODE TASK 't_{rid}' FROM 'taos://root:taosdata@localhost:6030/test_{rid}?test={long_col}' TO 'taos://root:taosdata@localhost:6030/zgc_{rid}' WITH STATUS 'created' VIA 1 labels 'labels';") + def test_agent_lifecycle(self): """test no syntax fail query