diff --git a/.gitignore b/.gitignore index 2fbaea687bb2..30f4adeff466 100644 --- a/.gitignore +++ b/.gitignore @@ -61,3 +61,6 @@ test/allure-results/ .idea/ cmake-build-debug/ cmake-build-release/ + +# Local Claude AI tool settings +.claude/ diff --git a/docs/en/08-operation/04-maintenance.md b/docs/en/08-operation/04-maintenance.md index 2e42b577d0ed..0d1415374ebc 100644 --- a/docs/en/08-operation/04-maintenance.md +++ b/docs/en/08-operation/04-maintenance.md @@ -75,6 +75,22 @@ restore qnode on dnode ; # Restore qnode on dnode - This feature is based on the recovery of existing replication capabilities, not disaster recovery or backup recovery. Therefore, for the mnode and vnode to be recovered, the prerequisite for using this command is that the other two replicas of the mnode or vnode can still function normally. - This command cannot repair individual files in the data directory that are damaged or lost. For example, if individual files or data in an mnode or vnode are damaged, it is not possible to recover a specific file or block of data individually. In this case, you can choose to completely clear the data of that mnode/vnode and then perform recovery. +### Monitoring Snapshot Send Progress + +After running `restore dnode`, TDengine synchronizes data to the target node via snapshot replication. Use the following system tables to monitor progress in real time: + +```sql +-- Vnode-level: overall progress for each vnode currently sending a snapshot +SELECT * FROM information_schema.ins_snap_send_vnodes; + +-- Fileset-level: per-time-partition file transfer details for a given vgroup +SELECT * FROM information_schema.ins_snap_send_filesets +WHERE vgroup_id = <vgroup_id> +ORDER BY fid; +``` + +For column definitions of both tables, see [Metadata](../../tdengine-reference/sql-manual/metadata). 
+ ## Splitting Virtual Groups When a vgroup is overloaded with CPU or Disk resource usage due to too many subtables, after adding a dnode, you can split the vgroup into two virtual groups using the `split vgroup` command. After the split, the newly created two vgroups will undertake the read and write services originally provided by one vgroup. This command was first released in version 3.0.6.0, and it is recommended to use the latest version whenever possible. diff --git a/docs/en/14-reference/03-taos-sql/22-meta.md b/docs/en/14-reference/03-taos-sql/22-meta.md index 1cf145676032..294388f61d70 100644 --- a/docs/en/14-reference/03-taos-sql/22-meta.md +++ b/docs/en/14-reference/03-taos-sql/22-meta.md @@ -359,6 +359,37 @@ Provides information about file sets. | 7 | last_compact | TIMESTAMP | Time of the last compaction | | 8 | should_compact | bool | Whether the file set should be compacted | +## INS_SNAP_SEND_VNODES + +Provides overall progress information for vnodes currently undergoing snapshot transfer. A row appears when a vgroup's leader is transferring a snapshot to a follower, and disappears automatically when the transfer completes. + +| # | **Column Name** | **Data Type** | **Description** | +| --- | :------------------: | ------------- | -------------------------------------------------- | +| 1 | vgroup_id | INT | vgroup ID of the vnode | +| 2 | dnode_id | INT | dnode ID of the leader | +| 3 | total_file_sets | INT | total number of filesets to transfer | +| 4 | finished_file_sets | INT | number of filesets fully transferred | +| 5 | start_time | TIMESTAMP | time when the snapshot reader was opened | +| 6 | elapsed | VARCHAR(32) | elapsed duration, format `H:MM:SS` | + +## INS_SNAP_SEND_FILESETS + +Provides file-level transfer progress for each fileset (time partition) in the active snapshot send. Rows disappear automatically when the snapshot transfer for the owning vnode completes. 
+ +| # | **Column Name** | **Data Type** | **Description** | +| --- | :-------------------: | ------------- | ---------------------------------------------------------------------------------------------------------------------------------- | +| 1 | vgroup_id | INT | vgroup ID of the vnode | +| 2 | fid | INT | fileset ID (time-partition ID) | +| 3 | file_count | INT | total number of physical files in this fileset (HEAD/DATA/SMA/STT etc.) | +| 4 | finished_file_count | INT | number of physical files fully transferred | +| 5 | total_size | BIGINT | sum of all physical file sizes in this fileset, in bytes | +| 6 | read_size | BIGINT | bytes read and sent so far (monotonically increasing; same unit as total_size in RAW mode; re-compressed bytes in ROW mode, use as trend indicator only) | +| 7 | start_time | TIMESTAMP | time when transfer of this fileset began | +| 8 | elapsed | VARCHAR(32) | elapsed time for this fileset, format `H:MM:SS` | +| 9 | start_index | BIGINT | start version of this fileset (sver) | +| 10 | end_index | BIGINT | end version of this fileset (ever) | +| 11 | transfer_type | VARCHAR(4) | transfer mode: `raw` (full RAW transfer) or `row` (incremental ROW transfer) | + ## INS_VNODES Provides information about vnodes in the system. Users with SYSINFO property set to 0 cannot view this table. 
diff --git a/docs/zh/08-operation/04-maintenance.md b/docs/zh/08-operation/04-maintenance.md index 0a15dc106d40..f3c634794973 100644 --- a/docs/zh/08-operation/04-maintenance.md +++ b/docs/zh/08-operation/04-maintenance.md @@ -77,6 +77,22 @@ restore qnode on dnode ;# 恢复dnode上的qnode - 该功能是基于已有的复制功能的恢复,不是灾难恢复或者备份恢复,所以对于要恢复的 mnode 和 vnode 来说,使用该命令的前提是还存在该 mnode 或 vnode 的其它两个副本仍然能够正常工作。 - 该命令不能修复数据目录中的个别文件的损坏或者丢失。例如,如果某个 mnode 或者 vnode 中的个别文件或数据损坏,无法单独恢复损坏的某个文件或者某块数据。此时,可以选择将该 mnode/vnode 的数据全部清空再进行恢复。 +### 监控 Snapshot 发送进度 + +执行 `restore dnode` 后,TDengine 会通过 snapshot 复制将数据从其他副本同步到目标节点。可以通过以下系统表实时查看进度: + +```sql +-- vnode 级:查看每个正在传输 snapshot 的 vnode 的整体进度 +SELECT * FROM information_schema.ins_snap_send_vnodes; + +-- fileset 级:查看指定 vgroup 中每个时间分片的文件传输详情 +SELECT * FROM information_schema.ins_snap_send_filesets +WHERE vgroup_id = <vgroup_id> +ORDER BY fid; +``` + +两张表的列定义参见[元数据](../../reference/taos-sql/meta)。 + ## 分裂虚拟组 当一个 vgroup 因为子表数过多而导致 CPU 或 Disk 资源使用量负载过高时,增加 dnode 节点后,可通过 `split vgroup` 命令把该 vgroup 分裂为两个虚拟组。分裂完成后,新产生的两个 vgroup 承担原来由一个 vgroup 提供的读写服务。该命令在 3.0.6.0 版本第一次发布,建议尽可能使用最新版本。 diff --git a/docs/zh/14-reference/03-taos-sql/22-meta.md b/docs/zh/14-reference/03-taos-sql/22-meta.md index ebd50edc576c..cdc64c261694 100644 --- a/docs/zh/14-reference/03-taos-sql/22-meta.md +++ b/docs/zh/14-reference/03-taos-sql/22-meta.md @@ -360,6 +360,37 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | 7 | last_compact | TIMESTAMP | 最后一次压缩的时间 | | 8 | should_compact | bool | 是否需要压缩,true:需要,false:不需要 | +## INS_SNAP_SEND_VNODES + +提供当前正在进行 snapshot 发送的 vnode 的整体进度信息。当某 vgroup 的 leader 正在向 follower 传输 snapshot 时,对应行出现;传输完成后自动消失。 + +| # | **列名** | **数据类型** | **说明** | +| --- | :-----------------: | ------------- | --------------------------------------------------------------------- | +| 1 | vgroup_id | INT | vnode 所属 vgroup ID | +| 2 | dnode_id | INT | leader 所在 dnode ID | +| 3 | total_file_sets | INT | 本次发送需传输的 fileset 总数 | +| 4 | finished_file_sets | INT | 已完整传输完成的 fileset 数量 | +| 5 | 
start_time | TIMESTAMP | snapshot reader 打开时间 | +| 6 | elapsed | VARCHAR(32) | 持续时长,格式 `H:MM:SS` | + +## INS_SNAP_SEND_FILESETS + +提供当前活跃 snapshot 发送中每个 fileset(时间分片)的文件级传输进度。随所属 vnode 的 snapshot 发送完成后自动消失。 + +| # | **列名** | **数据类型** | **说明** | +| --- | :-------------------: | ------------- | ------------------------------------------------------------------------------------------------------------------ | +| 1 | vgroup_id | INT | vnode 所属 vgroup ID | +| 2 | fid | INT | fileset ID(时间分片 ID) | +| 3 | file_count | INT | 该 fileset 包含的物理文件总数(HEAD/DATA/SMA/STT 等累计) | +| 4 | finished_file_count | INT | 已完整传输的物理文件数 | +| 5 | total_size | BIGINT | 该 fileset 所有物理文件大小之和(bytes) | +| 6 | read_size | BIGINT | 已读取并发送的字节数(单调递增;RAW 模式与 total_size 同单位,ROW 模式为重压缩后字节,仅供趋势参考) | +| 7 | start_time | TIMESTAMP | 开始传输该 fileset 的时间 | +| 8 | elapsed | VARCHAR(32) | 该 fileset 已耗时,格式 `H:MM:SS` | +| 9 | start_index | BIGINT | 该 fileset 的起始 version(sver) | +| 10 | end_index | BIGINT | 该 fileset 的结束 version(ever) | +| 11 | transfer_type | VARCHAR(4) | 传输方式:`raw`(RAW 全量传输)或 `row`(ROW 增量传输) | + ## INS_VNODES 提供系统中 vnode 的相关信息。属性为 0 的用户不能查看此表。 diff --git a/include/common/systable.h b/include/common/systable.h index fe867c9ad01b..fd82258771cc 100644 --- a/include/common/systable.h +++ b/include/common/systable.h @@ -64,6 +64,8 @@ extern "C" { #define TSDB_INS_DISK_USAGE "ins_disk_usage" #define TSDB_INS_TABLE_FILESETS "ins_filesets" #define TSDB_INS_TABLE_TRANSACTION_DETAILS "ins_transaction_details" +#define TSDB_INS_TABLE_SNAP_SEND_VNODES "ins_snap_send_vnodes" +#define TSDB_INS_TABLE_SNAP_SEND_FILESETS "ins_snap_send_filesets" #define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" #define TSDB_PERFS_TABLE_SMAS "perf_smas" diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 828485782c82..95c6d9397bc7 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -297,6 +297,7 @@ extern bool tsWalDeleteOnCorruption; extern bool tsDiskIDCheckEnabled; extern int32_t 
tsTransPullupInterval; extern int32_t tsCompactPullupInterval; +extern int32_t tsSnapSendPullupInterval; extern int32_t tsMqRebalanceInterval; extern int32_t tsStreamCheckpointInterval; extern int32_t tsThresholdItemsInWriteQueue; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5da52b4f6881..512d73cf5291 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -164,6 +164,8 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_USAGE, TSDB_MGMT_TABLE_FILESETS, TSDB_MGMT_TABLE_TRANSACTION_DETAIL, + TSDB_MGMT_TABLE_SNAP_SEND_VNODES, + TSDB_MGMT_TABLE_SNAP_SEND_FILESETS, TSDB_MGMT_TABLE_MAX, } EShowType; @@ -2030,6 +2032,7 @@ typedef struct { int64_t syncCommitIndex; int64_t bufferSegmentUsed; int64_t bufferSegmentSize; + int8_t snapshotSending; // 1 if this vnode (as leader) is actively sending a snapshot } SVnodeLoad; typedef struct { @@ -2326,6 +2329,39 @@ int32_t tSerializeSDnodeQueryCompactProgressRsp(void *buf, int32_t bufLen, SDnod int32_t tDeserializeSDnodeQueryCompactProgressRsp(void *buf, int32_t bufLen, SDnodeQueryCompactProgressRsp *pRsp); void tFreeSDnodeQueryCompactProgressRsp(SDnodeQueryCompactProgressRsp *pRsp); +// Snap send progress query (mnode → dnode, dnode → mnode RSP) +typedef struct { + int32_t fid; + int32_t fileCount; + int32_t finishedFileCount; + int64_t totalSize; + int64_t readSize; + int64_t startTime; // ms timestamp + int64_t sver; + int64_t ever; + int8_t transferType; // SNAP_DATA_TSDB(2) or SNAP_DATA_RAW(14) +} SSnapSendFileSetInfo; + +typedef struct { + int32_t vgId; + int32_t dnodeId; + int32_t totalFileSets; + int32_t finishedFileSets; + int64_t startTime; // ms timestamp of reader open + int32_t fileSetCount; // length of pFileSetInfos + SSnapSendFileSetInfo *pFileSetInfos; +} SSnapSendVnodeInfo; + +typedef struct { + int32_t dnodeId; + int32_t numOfVnodes; + SSnapSendVnodeInfo *pVnodeInfos; // array of numOfVnodes elements +} SDnodeQuerySnapSendProgressRsp; + +int32_t 
tSerializeSDnodeQuerySnapSendProgressRsp(void *buf, int32_t bufLen, SDnodeQuerySnapSendProgressRsp *pRsp); +int32_t tDeserializeSDnodeQuerySnapSendProgressRsp(void *buf, int32_t bufLen, SDnodeQuerySnapSendProgressRsp *pRsp); +void tFreeSDnodeQuerySnapSendProgressRsp(SDnodeQuerySnapSendProgressRsp *pRsp); + typedef struct { int32_t vgId; int32_t dnodeId; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index edf8d513b66a..ef4a31796286 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -138,6 +138,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_SET_VGROUP_KEEP_VERSION, "set-vgroup-keep-version", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_WAL, "trim-db-wal", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_QUERY_COMPACT_PROGRESS, "dnode-query-compact-progress", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_DND_QUERY_SNAP_SEND_PROGRESS, "dnode-query-snap-send-progress", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_DND_MSG) TD_NEW_MSG_SEG(TDMT_MND_MSG) // 1<<8 diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index 2182f6d7fed7..3024e44998f8 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -1606,6 +1606,12 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->clusterCfg.statusIntervalMs)); + // Encode snapshotSending per vnode (appended for backward compat) + for (int32_t i = 0; i < vlen; ++i) { + SVnodeLoad *pload = taosArrayGet(pReq->pVloads, i); + TAOS_CHECK_EXIT(tEncodeI8(&encoder, pload->snapshotSending)); + } + tEndEncode(&encoder); _exit: @@ -1760,6 +1766,15 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (!tDecodeIsEnd(&decoder)) { TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->clusterCfg.statusIntervalMs)); } + + // Decode snapshotSending per vnode (backward compat guard) + if (!tDecodeIsEnd(&decoder)) { + for (int32_t i = 0; i < vlen; ++i) { + SVnodeLoad *pLoad = taosArrayGet(pReq->pVloads, i); + 
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pLoad->snapshotSending)); + } + } + tEndDecode(&decoder); _exit: @@ -8206,6 +8221,113 @@ void tFreeSDnodeQueryCompactProgressRsp(SDnodeQueryCompactProgressRsp *pRsp) { taosMemoryFreeClear(pRsp->vnodeProgress); } +// Snap send progress serialization +int32_t tSerializeSDnodeQuerySnapSendProgressRsp(void *buf, int32_t bufLen, SDnodeQuerySnapSendProgressRsp *pRsp) { + SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; + tEncoderInit(&encoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->dnodeId)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->numOfVnodes)); + for (int32_t i = 0; i < pRsp->numOfVnodes; i++) { + SSnapSendVnodeInfo *pInfo = &pRsp->pVnodeInfos[i]; + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pInfo->vgId)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pInfo->dnodeId)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pInfo->totalFileSets)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pInfo->finishedFileSets)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pInfo->startTime)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pInfo->fileSetCount)); + for (int32_t j = 0; j < pInfo->fileSetCount; j++) { + SSnapSendFileSetInfo *pFs = &pInfo->pFileSetInfos[j]; + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pFs->fid)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pFs->fileCount)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pFs->finishedFileCount)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pFs->totalSize)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pFs->readSize)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pFs->startTime)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pFs->sver)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pFs->ever)); + TAOS_CHECK_EXIT(tEncodeI8(&encoder, pFs->transferType)); + } + } + tEndEncode(&encoder); + +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSDnodeQuerySnapSendProgressRsp(void *buf, int32_t bufLen, 
SDnodeQuerySnapSendProgressRsp *pRsp) { + SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pRsp->dnodeId)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pRsp->numOfVnodes)); + + if (pRsp->numOfVnodes > 0) { + pRsp->pVnodeInfos = (SSnapSendVnodeInfo *)taosMemoryCalloc(pRsp->numOfVnodes, sizeof(SSnapSendVnodeInfo)); + if (pRsp->pVnodeInfos == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } + + for (int32_t i = 0; i < pRsp->numOfVnodes; i++) { + SSnapSendVnodeInfo *pInfo = &pRsp->pVnodeInfos[i]; + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pInfo->vgId)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pInfo->dnodeId)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pInfo->totalFileSets)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pInfo->finishedFileSets)); + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pInfo->startTime)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pInfo->fileSetCount)); + if (pInfo->fileSetCount > 0) { + pInfo->pFileSetInfos = (SSnapSendFileSetInfo *)taosMemoryCalloc(pInfo->fileSetCount, sizeof(SSnapSendFileSetInfo)); + if (pInfo->pFileSetInfos == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } + for (int32_t j = 0; j < pInfo->fileSetCount; j++) { + SSnapSendFileSetInfo *pFs = &pInfo->pFileSetInfos[j]; + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pFs->fid)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pFs->fileCount)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pFs->finishedFileCount)); + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pFs->totalSize)); + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pFs->readSize)); + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pFs->startTime)); + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pFs->sver)); + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pFs->ever)); + TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pFs->transferType)); + } + } + tEndDecode(&decoder); + +_exit: + tDecoderClear(&decoder); + return code; +} + +void 
tFreeSDnodeQuerySnapSendProgressRsp(SDnodeQuerySnapSendProgressRsp *pRsp) { + if (pRsp == NULL) return; + if (pRsp->pVnodeInfos != NULL) { + for (int32_t i = 0; i < pRsp->numOfVnodes; i++) { + taosMemoryFreeClear(pRsp->pVnodeInfos[i].pFileSetInfos); + } + taosMemoryFreeClear(pRsp->pVnodeInfos); + } +} + int32_t tSerializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq) { SEncoder encoder = {0}; int32_t code = 0; diff --git a/source/common/src/systable.c b/source/common/src/systable.c index fc07a8768e8f..7f41ac3d3e57 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -435,6 +435,29 @@ static const SSysDbTableSchema userTransactionDetailSchema[] = { {.name = "detail", .bytes = TSDB_TRANS_DETAIL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; +static const SSysDbTableSchema snapSendVnodesSchema[] = { + {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "dnode_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "total_file_sets", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "finished_file_sets", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, + {.name = "elapsed", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, +}; + +static const SSysDbTableSchema snapSendFilesetsSchema[] = { + {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "fid", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "file_count", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "finished_file_count", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "total_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "read_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = 
false}, + {.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, + {.name = "elapsed", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "start_index", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "end_index", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "transfer_type", .bytes = 4 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, +}; + static const SSysDbTableSchema anodesSchema[] = { {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "url", .bytes = TSDB_ANALYTIC_ANODE_URL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, @@ -555,6 +578,8 @@ static const SSysTableMeta infosMeta[] = { {TSDB_INS_DISK_USAGE, diskUsageSchema, tListLen(diskUsageSchema), false}, {TSDB_INS_TABLE_FILESETS, filesetsFullSchema, tListLen(filesetsFullSchema), false}, {TSDB_INS_TABLE_TRANSACTION_DETAILS, userTransactionDetailSchema, tListLen(userTransactionDetailSchema), false}, + {TSDB_INS_TABLE_SNAP_SEND_VNODES, snapSendVnodesSchema, tListLen(snapSendVnodesSchema), false}, + {TSDB_INS_TABLE_SNAP_SEND_FILESETS, snapSendFilesetsSchema, tListLen(snapSendFilesetsSchema), false}, }; static const SSysDbTableSchema connectionsSchema[] = { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 8f9a625479a0..b797e305c517 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -369,6 +369,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch bool tsDiskIDCheckEnabled = false; int32_t tsTransPullupInterval = 2; int32_t tsCompactPullupInterval = 10; +int32_t tsSnapSendPullupInterval = 10; int32_t tsMqRebalanceInterval = 2; int32_t tsStreamCheckpointInterval = 300; float tsSinkDataRate = 2.0; @@ -1033,6 +1034,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddBool(pCfg, "diskIDCheckEnabled", 
tsDiskIDCheckEnabled, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "compactPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "snapSendPullupInterval", tsSnapSendPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); @@ -1912,6 +1914,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "compactPullupInterval"); tsCompactPullupInterval = pItem->i32; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "snapSendPullupInterval"); + tsSnapSendPullupInterval = pItem->i32; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "mqRebalanceInterval"); tsMqRebalanceInterval = pItem->i32; @@ -2923,6 +2928,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"tmqRowSize", &tmqRowSize}, {"transPullupInterval", &tsTransPullupInterval}, {"compactPullupInterval", &tsCompactPullupInterval}, + {"snapSendPullupInterval", &tsSnapSendPullupInterval}, {"trimVDbIntervalSec", &tsTrimVDbIntervalSec}, {"ttlBatchDropNum", &tsTtlBatchDropNum}, {"ttlFlushThreshold", &tsTtlFlushThreshold}, diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index b8bae9077add..73fac81dd8d2 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -238,6 +238,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_CLUSTER, mmPutMsgToWriteQueue, 0) == 
NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_QUERY_COMPACT_PROGRESS_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_QUERY_SNAP_SEND_PROGRESS_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ENCRYPT_KEY, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 5910b60379af..8017ae5f1cfa 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -22,6 +22,8 @@ extern taos_counter_t *tsInsertCounter; // Implemented in enterprise/src/plugins/vnode/src/vnodeCompact.c extern int32_t vnodeGetCompactProgress(SVnode *pVnode, int32_t compactId, SQueryCompactProgressRsp *pRsp); #endif +// Implemented in community/source/dnode/vnode/src/vnd/vnodeSnapshot.c +extern int32_t vnodeGetSnapSendProgress(SVnode *pVnode, int32_t dnodeId, SSnapSendVnodeInfo *pInfo); void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) { pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); @@ -1011,6 +1013,105 @@ int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return terrno; } +// Handles TDMT_DND_QUERY_SNAP_SEND_PROGRESS: collects the progress reported by +// vnodeGetSnapSendProgress() for every usable vnode of this dnode and serializes it +// into pMsg->info.rsp / rspLen. Returns 0 on success or a TSDB_CODE_* error code. +int32_t vmProcessDnodeQuerySnapSendProgressReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { + int32_t code = 0; + void *pRsp = NULL; + SVnodeObj **ppVnodes = NULL; + int32_t numOfVnodes = 0; + SArray *pInfoArray = taosArrayInit(16, sizeof(SSnapSendVnodeInfo)); + + if (pInfoArray == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes); + if (code != 0) { + dError("dnode:%d, failed to get vnode list for snap-send-progress, code:%s", + pMgmt->pData->dnodeId, 
tstrerror(code)); + goto _exit; + } + + for (int32_t i = 0; i < numOfVnodes; i++) { + SVnodeObj *pVnode = ppVnodes[i]; + if (pVnode == NULL || pVnode->failed || pVnode->pImpl == NULL) { + if (pVnode) vmReleaseVnode(pMgmt, pVnode); + continue; + } + + SSnapSendVnodeInfo info = {0}; + if (vnodeGetSnapSendProgress(pVnode->pImpl, pMgmt->pData->dnodeId, &info) == 0) { + if (taosArrayPush(pInfoArray, &info) == NULL) { + taosMemoryFree(info.pFileSetInfos); + dError("dnode:%d, vgId:%d, failed to push snap-send-progress info (OOM)", + pMgmt->pData->dnodeId, pVnode->vgId); + vmReleaseVnode(pMgmt, pVnode); + // Also release the vnodes not visited yet; _exit only frees the pointer array. + for (int32_t j = i + 1; j < numOfVnodes; j++) { + if (ppVnodes[j] != NULL) vmReleaseVnode(pMgmt, ppVnodes[j]); + } + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } + + vmReleaseVnode(pMgmt, pVnode); + } + taosMemoryFree(ppVnodes); + ppVnodes = NULL; + + // Build response. NOTE: rsp.pVnodeInfos aliases pInfoArray's internal buffer — + // it must NOT be freed via tFreeSDnodeQuerySnapSendProgressRsp; cleanup is in _exit. + SDnodeQuerySnapSendProgressRsp rsp = {0}; + rsp.dnodeId = pMgmt->pData->dnodeId; + rsp.numOfVnodes = (int32_t)taosArrayGetSize(pInfoArray); + rsp.pVnodeInfos = (rsp.numOfVnodes > 0) + ? (SSnapSendVnodeInfo *)taosArrayGet(pInfoArray, 0) + : NULL; + + dInfo("dnode:%d, send snap-send-progress rsp, numOfVnodes:%d", + rsp.dnodeId, rsp.numOfVnodes); + + int32_t rspLen = tSerializeSDnodeQuerySnapSendProgressRsp(NULL, 0, &rsp); + if (rspLen < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + pRsp = rpcMallocCont(rspLen); + if (pRsp == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + if (tSerializeSDnodeQuerySnapSendProgressRsp(pRsp, rspLen, &rsp) < 0) { + code = TSDB_CODE_INVALID_MSG; + rpcFreeCont(pRsp); + pRsp = NULL; + goto _exit; + } + + pMsg->info.rsp = pRsp; + pMsg->info.rspLen = rspLen; + +_exit: + // pInfoArray owns both the SSnapSendVnodeInfo structs and the pFileSetInfos blocks. 
+ // Free pFileSetInfos manually; do NOT call tFreeSDnodeQuerySnapSendProgressRsp + // (it would free the alias rsp.pVnodeInfos which is pInfoArray's internal buffer). + if (pInfoArray != NULL) { + for (int32_t k = 0; k < (int32_t)taosArrayGetSize(pInfoArray); k++) { + SSnapSendVnodeInfo *p = taosArrayGet(pInfoArray, k); + taosMemoryFree(p->pFileSetInfos); + } + taosArrayDestroy(pInfoArray); + } + taosMemoryFree(ppVnodes); + return code; +} + int32_t vmProcessDnodeQueryCompactProgressReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t code = 0; SDnodeQueryCompactProgressReq req = {0}; @@ -1227,6 +1328,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_ARB_CHECK_SYNC, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_ASSIGNED_LEADER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_QUERY_COMPACT_PROGRESS, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_QUERY_SNAP_SEND_PROGRESS, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index d402f66261ed..5c5f3c588edd 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -18,6 +18,7 @@ #include "vnodeInt.h" extern int32_t vmProcessDnodeQueryCompactProgressReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +extern int32_t vmProcessDnodeQuerySnapSendProgressReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) { if (pMsg->info.handle == NULL) return; @@ -92,6 +93,9 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { case TDMT_DND_QUERY_COMPACT_PROGRESS: code = vmProcessDnodeQueryCompactProgressReq(pMgmt, pMsg); break; + case TDMT_DND_QUERY_SNAP_SEND_PROGRESS: + code = vmProcessDnodeQuerySnapSendProgressReq(pMgmt, pMsg); + break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; dGError("msg:%p, not 
processed in vnode-mgmt queue", pMsg); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 38c84e7845ff..50fa5318ba98 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -520,6 +520,7 @@ typedef struct { int64_t compStorage; int64_t pointsWritten; int8_t compact; + int8_t snapRestoring; // 1=snapshot send in progress (leader side), 0=idle; ephemeral int8_t isTsma; int8_t replica; SVnodeGid vnodeGid[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA]; diff --git a/source/dnode/mnode/impl/inc/mndSnapSend.h b/source/dnode/mnode/impl/inc/mndSnapSend.h new file mode 100644 index 000000000000..9588ecaad6b7 --- /dev/null +++ b/source/dnode/mnode/impl/inc/mndSnapSend.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ + +#ifndef _TD_MND_SNAP_SEND_H_ +#define _TD_MND_SNAP_SEND_H_ + +#include "mndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t mndInitSnapSend(SMnode *pMnode); +void mndCleanupSnapSend(SMnode *pMnode); + +/* Called by mndDoTimerPullupTask every tsSnapSendPullupInterval seconds */ +void mndSnapSendPullup(SMnode *pMnode); + +#ifdef __cplusplus +} +#endif + +#endif /* _TD_MND_SNAP_SEND_H_ */ diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 8bdbfe8feb54..797c7505a2a4 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -934,6 +934,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { pVgroup->totalStorage = pVload->totalStorage; pVgroup->compStorage = pVload->compStorage; pVgroup->pointsWritten = pVload->pointsWritten; + // Track snapshot send state (ephemeral, in-memory only — cleared on decode) + pVgroup->snapRestoring = pVload->snapshotSending; } bool stateChanged = false; for (int32_t vg = 0; vg < pVgroup->replica; ++vg) { diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 8a8442947e5f..32f29e1f79e1 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -20,6 +20,7 @@ #include "mndCluster.h" #include "mndCompact.h" #include "mndCompactDetail.h" +#include "mndSnapSend.h" #include "mndConfig.h" #include "mndConsumer.h" #include "mndDb.h" @@ -430,6 +431,11 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) { if (sec % tsCompactPullupInterval == 0) { mndPullupCompacts(pMnode); } + + if (sec % tsSnapSendPullupInterval == 0) { + mndSnapSendPullup(pMnode); + } + #ifdef USE_TOPIC if (sec % tsMqRebalanceInterval == 0) { mndCalMqRebalance(pMnode); @@ -729,6 +735,7 @@ static int32_t mndInitSteps(SMnode *pMnode) { TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-view", mndInitView, mndCleanupView)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-compact", mndInitCompact, 
mndCleanupCompact)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-compact-detail", mndInitCompactDetail, mndCleanupCompactDetail)); + TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-snap-send", mndInitSnapSend, mndCleanupSnapSend)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-sdb", mndOpenSdb, NULL)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile)); TAOS_CHECK_RETURN(mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow)); diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index f19eabd885c5..366c02af04b8 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -132,6 +132,10 @@ static int32_t convertToRetrieveType(char *name, int32_t len) { type = TSDB_MGMT_TABLE_COMPACT; } else if (strncasecmp(name, TSDB_INS_TABLE_COMPACT_DETAILS, len) == 0) { type = TSDB_MGMT_TABLE_COMPACT_DETAIL; + } else if (strncasecmp(name, TSDB_INS_TABLE_SNAP_SEND_VNODES, len) == 0) { + type = TSDB_MGMT_TABLE_SNAP_SEND_VNODES; + } else if (strncasecmp(name, TSDB_INS_TABLE_SNAP_SEND_FILESETS, len) == 0) { + type = TSDB_MGMT_TABLE_SNAP_SEND_FILESETS; } else if (strncasecmp(name, TSDB_INS_TABLE_TRANSACTION_DETAILS, len) == 0) { type = TSDB_MGMT_TABLE_TRANSACTION_DETAIL; } else if (strncasecmp(name, TSDB_INS_TABLE_GRANTS_FULL, len) == 0) { diff --git a/source/dnode/mnode/impl/src/mndSnapSend.c b/source/dnode/mnode/impl/src/mndSnapSend.c new file mode 100644 index 000000000000..da2dde4d9e2e --- /dev/null +++ b/source/dnode/mnode/impl/src/mndSnapSend.c @@ -0,0 +1,462 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +/* + * mndSnapSend.c — mnode side of the snapshot-send progress system tables: + * ins_snap_send_vnodes (one row per vnode currently sending a snapshot) + * ins_snap_send_filesets (one row per fileset of an active snapshot send) + * + * Design overview: + * 1. mndSnapSendPullup() is called every tsSnapSendPullupInterval seconds from + * mndDoTimerPullupTask(). It scans all SVgObj in SDB, finds those whose + * snapRestoring==1 (set by the dnode heartbeat), and sends + * TDMT_DND_QUERY_SNAP_SEND_PROGRESS to the leader dnode. + * 2. mndProcessDnodeSnapSendProgressRsp() receives the response and updates + * the in-memory hash gSnapSendMgmt.pHash (key=vgId, value=SSnapSendVnodeInfo). + * 3. mndRetrieveSnapSendVnodes / mndRetrieveSnapSendFilesets iterate the hash + * and fill the result block for SQL queries. + * 4. When snapRestoring turns 0, the entry is removed from the hash on the + * next pullup cycle.
+ */ + +#include "mndDnode.h" +#include "mndShow.h" +#include "mndSnapSend.h" +#include "mndVgroup.h" +#include "systable.h" +#include "tmisce.h" +#include "tmsgcb.h" + +/* transferType constant — matches SNAP_DATA_RAW in vnodeInt.h */ +#define SNAP_TRANSFER_TYPE_RAW 14 + +/* ==================================================================== + * Module-level state (singleton; pHash is guarded by gSnapSendMgmt.mutex) + * ==================================================================== */ + +typedef struct { + SHashObj *pHash; /* key: int32_t vgId → value: SSnapSendVnodeInfo (deep copy) */ + TdThreadMutex mutex; +} SSnapSendMgmt; + +static SSnapSendMgmt gSnapSendMgmt = {0}; + +/* ==================================================================== + * Helper: format elapsed time since startTimeMs as "H:MM:SS" (hours not zero-padded) + * ==================================================================== */ +static void snapSendFmtElapsed(int64_t startTimeMs, char *buf, int32_t bufLen) { + if (startTimeMs <= 0) { + tsnprintf(buf, bufLen, "0:00:00"); + return; + } + int64_t elapsedSec = (taosGetTimestampMs() - startTimeMs) / 1000; + if (elapsedSec < 0) elapsedSec = 0; + int64_t h = elapsedSec / 3600; + int64_t m = (elapsedSec % 3600) / 60; + int64_t s = elapsedSec % 60; + tsnprintf(buf, bufLen, "%" PRId64 ":%02" PRId64 ":%02" PRId64, h, m, s); +} + +/* ==================================================================== + * Helper: free the pFileSetInfos array owned by a deep-copied SSnapSendVnodeInfo + * ==================================================================== */ +static void snapSendFreeVnodeInfo(SSnapSendVnodeInfo *pInfo) { + if (pInfo) { + taosMemoryFree(pInfo->pFileSetInfos); + pInfo->pFileSetInfos = NULL; + } +} + +/* ==================================================================== + * RSP handler: TDMT_DND_QUERY_SNAP_SEND_PROGRESS_RSP + * ==================================================================== */ +static int32_t mndProcessDnodeSnapSendProgressRsp(SRpcMsg *pReq) { + int32_t code = 0; + SDnodeQuerySnapSendProgressRsp rsp 
= {0}; + + if (pReq->code != 0) { + mDebug("snap-send-progress rsp from dnode with error: %s", tstrerror(pReq->code)); + TAOS_RETURN(0); // non-fatal: ignore, will retry next cycle + } + + code = tDeserializeSDnodeQuerySnapSendProgressRsp(pReq->pCont, pReq->contLen, &rsp); + if (code != 0) { + mError("failed to deserialize snap-send-progress rsp, code:%s", tstrerror(code)); + TAOS_RETURN(code); + } + + (void)taosThreadMutexLock(&gSnapSendMgmt.mutex); + + for (int32_t i = 0; i < rsp.numOfVnodes; i++) { + SSnapSendVnodeInfo *pSrc = &rsp.pVnodeInfos[i]; + + // Deep-copy pFileSetInfos + SSnapSendFileSetInfo *pFsCopy = NULL; + if (pSrc->fileSetCount > 0 && pSrc->pFileSetInfos != NULL) { + pFsCopy = taosMemoryMalloc(pSrc->fileSetCount * sizeof(SSnapSendFileSetInfo)); + if (pFsCopy == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + memcpy(pFsCopy, pSrc->pFileSetInfos, pSrc->fileSetCount * sizeof(SSnapSendFileSetInfo)); + } + + SSnapSendVnodeInfo copy = *pSrc; + copy.pFileSetInfos = pFsCopy; + + // Save old pFileSetInfos pointer so we can free it AFTER a successful put. + // Do NOT call snapSendFreeVnodeInfo before taosHashPut: if the put fails, + // it would leave an entry with fileSetCount>0 but pFileSetInfos=NULL in the + // hash, causing a NULL-dereference in mndRetrieveSnapSendFilesets. + SSnapSendVnodeInfo *pOld = taosHashGet(gSnapSendMgmt.pHash, &pSrc->vgId, sizeof(int32_t)); + SSnapSendFileSetInfo *pOldFs = (pOld != NULL) ? 
pOld->pFileSetInfos : NULL; + + // Upsert — only free old memory after a successful put + if (taosHashPut(gSnapSendMgmt.pHash, &pSrc->vgId, sizeof(int32_t), &copy, sizeof(copy)) != 0) { + taosMemoryFree(pFsCopy); + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + taosMemoryFree(pOldFs); + } + + (void)taosThreadMutexUnlock(&gSnapSendMgmt.mutex); + + tFreeSDnodeQuerySnapSendProgressRsp(&rsp); + TAOS_RETURN(code); +} + +/* ==================================================================== + * Pullup: scan SDB for active snapshot senders, query each leader dnode + * ==================================================================== */ +void mndSnapSendPullup(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + + mDebug("snap-send-progress pullup started"); + + // Build a set of vgIds that are still snapRestoring + SHashObj *pActiveVgIds = + taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (pActiveVgIds == NULL) return; + + // Track which dnodes need a query (key=dnodeId of leader) + // We send one request per leader-dnode that has >=1 snapshotSending vnode. + SHashObj *pDnodesToQuery = + taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (pDnodesToQuery == NULL) { + taosHashCleanup(pActiveVgIds); + return; + } + + while (1) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + if (pVgroup->snapRestoring) { + mDebug("snap-send-progress pullup: found snapRestoring vgroup vgId:%d", pVgroup->vgId); + + if (taosHashPut(pActiveVgIds, &pVgroup->vgId, sizeof(int32_t), &pVgroup->vgId, sizeof(int32_t)) != 0) { + // Must skip: without this entry in pActiveVgIds the cleanup pass would + // incorrectly treat the vgroup as stale and evict its progress cache entry. 
+ mError("snap-send-progress pullup: failed to track active vgId:%d, skipping", pVgroup->vgId); + sdbRelease(pSdb, pVgroup); + continue; + } + + // Find the current leader dnode for this vgroup + for (int8_t r = 0; r < pVgroup->replica; r++) { + if (pVgroup->vnodeGid[r].syncState == TAOS_SYNC_STATE_LEADER) { + int32_t leaderId = pVgroup->vnodeGid[r].dnodeId; + if (taosHashPut(pDnodesToQuery, &leaderId, sizeof(int32_t), &leaderId, sizeof(int32_t)) != 0) { + mError("snap-send-progress pullup: failed to track dnode:%d", leaderId); + } + break; + } + } + } + + sdbRelease(pSdb, pVgroup); + } + + // Remove stale hash entries (vgIds that are no longer snapRestoring). + // Collect keys first — modifying the hash during taosHashIterate corrupts the walk. + SArray *pStaleVgIds = taosArrayInit(8, sizeof(int32_t)); + + (void)taosThreadMutexLock(&gSnapSendMgmt.mutex); + + if (pStaleVgIds != NULL) { + void *pHashIter = taosHashIterate(gSnapSendMgmt.pHash, NULL); + while (pHashIter != NULL) { + SSnapSendVnodeInfo *pInfo = (SSnapSendVnodeInfo *)pHashIter; + if (taosHashGet(pActiveVgIds, &pInfo->vgId, sizeof(int32_t)) == NULL) { + if (taosArrayPush(pStaleVgIds, &pInfo->vgId) == NULL) { + mError("snap-send-progress pullup: failed to collect stale vgId:%d", pInfo->vgId); + } + } + pHashIter = taosHashIterate(gSnapSendMgmt.pHash, pHashIter); + } + + for (int32_t k = 0; k < (int32_t)taosArrayGetSize(pStaleVgIds); k++) { + int32_t *pVgId = taosArrayGet(pStaleVgIds, k); + SSnapSendVnodeInfo *pOld = taosHashGet(gSnapSendMgmt.pHash, pVgId, sizeof(int32_t)); + if (pOld != NULL) snapSendFreeVnodeInfo(pOld); + taosHashRemove(gSnapSendMgmt.pHash, pVgId, sizeof(int32_t)); + } + taosArrayDestroy(pStaleVgIds); + } + + (void)taosThreadMutexUnlock(&gSnapSendMgmt.mutex); + + taosHashCleanup(pActiveVgIds); + + // Send TDMT_DND_QUERY_SNAP_SEND_PROGRESS to each leader dnode + void *pDnodeIter = taosHashIterate(pDnodesToQuery, NULL); + while (pDnodeIter != NULL) { + int32_t dnodeId = *(int32_t 
*)pDnodeIter; + SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId); + if (pDnode == NULL) { + pDnodeIter = taosHashIterate(pDnodesToQuery, pDnodeIter); + continue; + } + + // Empty request body (no fields needed — dnode returns all vnode stats) + int32_t contLen = sizeof(SMsgHead); + SMsgHead *pHead = rpcMallocCont(contLen); + if (pHead != NULL) { + pHead->contLen = htonl(contLen); + pHead->vgId = htonl(0); + + SEpSet epSet = mndGetDnodeEpset(pDnode); + SRpcMsg rpcMsg = { + .msgType = TDMT_DND_QUERY_SNAP_SEND_PROGRESS, + .pCont = pHead, + .contLen = contLen, + }; + + mDebug("snap-send-progress: send progress query to dnode:%d", dnodeId); + if (tmsgSendReq(&epSet, &rpcMsg) < 0) { + mError("snap-send-progress: failed to send to dnode:%d", dnodeId); + } + } + + mndReleaseDnode(pMnode, pDnode); + pDnodeIter = taosHashIterate(pDnodesToQuery, pDnodeIter); + } + + taosHashCleanup(pDnodesToQuery); +} + +/* ==================================================================== + * Retrieve: ins_snap_send_vnodes + * ==================================================================== */ +int32_t mndRetrieveSnapSendVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { + int32_t numOfRows = 0; + int32_t code = 0; + + (void)taosThreadMutexLock(&gSnapSendMgmt.mutex); + + void *pIter = taosHashIterate(gSnapSendMgmt.pHash, NULL); + while (pIter != NULL && numOfRows < rows) { + SSnapSendVnodeInfo *pInfo = (SSnapSendVnodeInfo *)pIter; + SColumnInfoData *pColInfo; + int32_t cols = 0; + char elapsedBuf[32]; + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (colDataSetVal(pColInfo, numOfRows, (const char *)&pInfo->vgId, false) != 0) { + code = TSDB_CODE_OUT_OF_MEMORY; break; + } + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (colDataSetVal(pColInfo, numOfRows, (const char *)&pInfo->dnodeId, false) != 0) { + code = TSDB_CODE_OUT_OF_MEMORY; break; + } + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (colDataSetVal(pColInfo, 
numOfRows, (const char *)&pInfo->totalFileSets, false) != 0) { + code = TSDB_CODE_OUT_OF_MEMORY; break; + } + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (colDataSetVal(pColInfo, numOfRows, (const char *)&pInfo->finishedFileSets, false) != 0) { + code = TSDB_CODE_OUT_OF_MEMORY; break; + } + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (colDataSetVal(pColInfo, numOfRows, (const char *)&pInfo->startTime, false) != 0) { + code = TSDB_CODE_OUT_OF_MEMORY; break; + } + + // elapsed (VARCHAR "HH:MM:SS") + snapSendFmtElapsed(pInfo->startTime, elapsedBuf, sizeof(elapsedBuf)); + char varElapsed[32 + VARSTR_HEADER_SIZE]; + STR_TO_VARSTR(varElapsed, elapsedBuf); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (colDataSetVal(pColInfo, numOfRows, varElapsed, false) != 0) { + code = TSDB_CODE_OUT_OF_MEMORY; break; + } + + numOfRows++; + pIter = taosHashIterate(gSnapSendMgmt.pHash, pIter); + } + + (void)taosThreadMutexUnlock(&gSnapSendMgmt.mutex); + + pShow->numOfRows += numOfRows; + if (code != 0) mError("snap-send-progress: retrieve vnodes failed, code:%s", tstrerror(code)); + return numOfRows; +} + +/* ==================================================================== + * Retrieve: ins_snap_send_filesets + * ==================================================================== */ +int32_t mndRetrieveSnapSendFilesets(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { + int32_t numOfRows = 0; + int32_t code = 0; + + (void)taosThreadMutexLock(&gSnapSendMgmt.mutex); + + void *pIter = taosHashIterate(gSnapSendMgmt.pHash, NULL); + while (pIter != NULL && numOfRows < rows) { + SSnapSendVnodeInfo *pInfo = (SSnapSendVnodeInfo *)pIter; + + for (int32_t fi = 0; fi < pInfo->fileSetCount && numOfRows < rows; fi++) { + SSnapSendFileSetInfo *pFs = &pInfo->pFileSetInfos[fi]; + SColumnInfoData *pColInfo; + int32_t cols = 0; + char elapsedBuf[32]; + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (colDataSetVal(pColInfo, 
numOfRows, (const char *)&pInfo->vgId, false) != 0) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _done; + } + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (colDataSetVal(pColInfo, numOfRows, (const char *)&pFs->fid, false) != 0) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _done; + } + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (colDataSetVal(pColInfo, numOfRows, (const char *)&pFs->fileCount, false) != 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _done; + } + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (colDataSetVal(pColInfo, numOfRows, (const char *)&pFs->finishedFileCount, false) != 0) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _done; + } + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (colDataSetVal(pColInfo, numOfRows, (const char *)&pFs->totalSize, false) != 0) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _done; + } + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (colDataSetVal(pColInfo, numOfRows, (const char *)&pFs->readSize, false) != 0) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _done; + } + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (colDataSetVal(pColInfo, numOfRows, (const char *)&pFs->startTime, false) != 0) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _done; + } + + // elapsed + snapSendFmtElapsed(pFs->startTime, elapsedBuf, sizeof(elapsedBuf)); + char varElapsed[32 + VARSTR_HEADER_SIZE]; + STR_TO_VARSTR(varElapsed, elapsedBuf); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (colDataSetVal(pColInfo, numOfRows, varElapsed, false) != 0) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _done; + } + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (colDataSetVal(pColInfo, numOfRows, (const char *)&pFs->sver, false) != 0) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _done; + } + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (colDataSetVal(pColInfo, numOfRows, (const char *)&pFs->ever, false) != 0) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _done; + } + + // 
transfer_type: "raw" or "row" + const char *typeStr = (pFs->transferType == SNAP_TRANSFER_TYPE_RAW) ? "raw" : "row"; + char varType[8 + VARSTR_HEADER_SIZE]; + STR_TO_VARSTR(varType, typeStr); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (colDataSetVal(pColInfo, numOfRows, varType, false) != 0) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _done; + } + + numOfRows++; + } + + pIter = taosHashIterate(gSnapSendMgmt.pHash, pIter); + } + +_done: + (void)taosThreadMutexUnlock(&gSnapSendMgmt.mutex); + pShow->numOfRows += numOfRows; + if (code != 0) mError("snap-send-progress: retrieve filesets failed, code:%s", tstrerror(code)); + return numOfRows; +} + +/* ==================================================================== + * Init / Cleanup + * ==================================================================== */ +int32_t mndInitSnapSend(SMnode *pMnode) { + gSnapSendMgmt.pHash = taosHashInit( + 64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (gSnapSendMgmt.pHash == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + if (taosThreadMutexInit(&gSnapSendMgmt.mutex, NULL) != 0) { + taosHashCleanup(gSnapSendMgmt.pHash); + gSnapSendMgmt.pHash = NULL; + return TSDB_CODE_FAILED; + } + + mndSetMsgHandle(pMnode, TDMT_DND_QUERY_SNAP_SEND_PROGRESS_RSP, + mndProcessDnodeSnapSendProgressRsp); + + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SNAP_SEND_VNODES, + mndRetrieveSnapSendVnodes); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SNAP_SEND_FILESETS, + mndRetrieveSnapSendFilesets); + + mDebug("mnd snap-send progress module initialized"); + return 0; +} + +void mndCleanupSnapSend(SMnode *pMnode) { + if (gSnapSendMgmt.pHash == NULL) return; + + (void)taosThreadMutexLock(&gSnapSendMgmt.mutex); + void *pIter = taosHashIterate(gSnapSendMgmt.pHash, NULL); + while (pIter != NULL) { + SSnapSendVnodeInfo *pInfo = (SSnapSendVnodeInfo *)pIter; + snapSendFreeVnodeInfo(pInfo); + pIter = taosHashIterate(gSnapSendMgmt.pHash, pIter); + } + 
taosHashCleanup(gSnapSendMgmt.pHash); + gSnapSendMgmt.pHash = NULL; + (void)taosThreadMutexUnlock(&gSnapSendMgmt.mutex); + (void)taosThreadMutexDestroy(&gSnapSendMgmt.mutex); + + mDebug("mnd snap-send progress module cleaned up"); +} diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 1a6aaaa5f15d..70b558547b82 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -29,9 +29,10 @@ #include "mndVgroup.h" #include "tmisce.h" -#define VGROUP_VER_COMPAT_MOUNT_KEEP_VER 2 -#define VGROUP_VER_NUMBER VGROUP_VER_COMPAT_MOUNT_KEEP_VER -#define VGROUP_RESERVE_SIZE 60 +#define VGROUP_VER_COMPAT_MOUNT_KEEP_VER 2 +#define VGROUP_VER_COMPAT_SNAP_RESTORING 3 +#define VGROUP_VER_NUMBER VGROUP_VER_COMPAT_SNAP_RESTORING +#define VGROUP_RESERVE_SIZE 60 // since 3.3.6.32/3.3.8.6 mountId + keepVersion + keepVersionTime + VGROUP_RESERVE_SIZE = 8 + 8 + 64 = 80 #define DLEN_AFTER_SYNC_CONF_CHANGE_VER 80 @@ -121,6 +122,7 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) { SDB_SET_INT32(pRaw, dataPos, pVgroup->mountVgId, _OVER) SDB_SET_INT64(pRaw, dataPos, pVgroup->keepVersion, _OVER) SDB_SET_INT64(pRaw, dataPos, pVgroup->keepVersionTime, _OVER) + SDB_SET_INT8(pRaw, dataPos, pVgroup->snapRestoring, _OVER) SDB_SET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER) @@ -190,6 +192,11 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { if (dataPos + sizeof(int64_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) { SDB_GET_INT64(pRaw, dataPos, &pVgroup->keepVersionTime, _OVER) } + if (dataPos + sizeof(int8_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) { + SDB_GET_INT8(pRaw, dataPos, &pVgroup->snapRestoring, _OVER) + // Always clear on decode: ephemeral state, must not survive mnode restart + pVgroup->snapRestoring = 0; + } if (dataPos + VGROUP_RESERVE_SIZE <= pRaw->dataLen) { SDB_GET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER) } @@ -288,6 +295,7 @@ static int32_t 
mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) { pNew->compStorage = pOld->compStorage; pNew->pointsWritten = pOld->pointsWritten; pNew->compact = pOld->compact; + pNew->snapRestoring = pOld->snapRestoring; // ephemeral: preserve in-memory value across SDB update memcpy(pOld->vnodeGid, pNew->vnodeGid, (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) * sizeof(SVnodeGid)); pOld->syncConfChangeVer = pNew->syncConfChangeVer; tstrncpy(pOld->dbName, pNew->dbName, TSDB_DB_FNAME_LEN); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 296c0ad63cdc..cb4c108d4c60 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -360,6 +360,26 @@ typedef struct { typedef struct SCompMonitor SCompMonitor; +// Snapshot send progress tracking structs (internal tsdb layer) +typedef struct { + int32_t fid; + int32_t fileCount; + int32_t finishedFileCount; + int64_t totalSize; + int64_t readSize; + int64_t startTime; // ms timestamp, set at RangeBegin/RAWReadBegin + int64_t sver; + int64_t ever; + int8_t transferType; // SNAP_DATA_TSDB(2) or SNAP_DATA_RAW(14) +} SSnapSendFileSetStat; + +typedef struct { + int32_t totalFileSets; + int32_t finishedFileSets; + int64_t startTime; // ms timestamp, set at ReaderOpen + SSnapSendFileSetStat *pFileSetStats; // array[totalFileSets], allocated once at open +} SSnapSendVnodeStat; + struct STsdb { char *path; SVnode *pVnode; @@ -380,6 +400,8 @@ struct STsdb { struct STFileSystem *pFS; // new SRocksCache rCache; SCompMonitor *pCompMonitor; + SSnapSendVnodeStat *pSnapStat; // NULL when no active snapshot send + TdThreadRwlock snapStatLock; // protects pSnapStat (readers: mnode query; writer: snapshot reader) struct { SVHashTable *ht; SArray *arr; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 457c15bd7e87..a08e34a35396 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -345,6 
+345,8 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback); int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** ppReader); void tsdbSnapRAWReaderClose(STsdbSnapRAWReader** ppReader); int32_t tsdbSnapRAWRead(STsdbSnapRAWReader* pReader, uint8_t** ppData); +// Snapshot send progress +int32_t vnodeGetSnapSendProgress(SVnode *pVnode, int32_t dnodeId, SSnapSendVnodeInfo *pInfo); // STsdbSnapRAWWriter ======================================== int32_t tsdbSnapRAWWriterOpen(STsdb* pTsdb, int64_t ever, STsdbSnapRAWWriter** ppWriter); int32_t tsdbSnapRAWWrite(STsdbSnapRAWWriter* pWriter, SSnapDataHdr* pHdr); diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index 23113621abfd..126bbecd70eb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -58,6 +58,7 @@ int32_t tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg * // taosRealPath(pTsdb->path, NULL, slen); pTsdb->pVnode = pVnode; (void)taosThreadMutexInit(&pTsdb->mutex, NULL); + (void)taosThreadRwlockInit(&pTsdb->snapStatLock, NULL); if (!pKeepCfg) { tsdbSetKeepCfg(pTsdb, &pVnode->config.tsdbCfg); } else { @@ -93,6 +94,7 @@ int32_t tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg * tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pVnode), __func__, __FILE__, lino, tstrerror(code)); tsdbCloseFS(&pTsdb->pFS); (void)taosThreadMutexDestroy(&pTsdb->mutex); + (void)taosThreadRwlockDestroy(&pTsdb->snapStatLock); taosMemoryFree(pTsdb); } else { tsdbDebug("vgId:%d, tsdb is opened at %s, days:%d, keep:%d,%d,%d, keepTimeoffset:%d", TD_VID(pVnode), pTsdb->path, @@ -119,7 +121,15 @@ void tsdbClose(STsdb **pTsdb) { #ifdef TD_ENTERPRISE tsdbCloseCompMonitor(*pTsdb); #endif + (void)taosThreadRwlockWrlock(&(*pTsdb)->snapStatLock); + if ((*pTsdb)->pSnapStat != NULL) { + taosMemoryFree((*pTsdb)->pSnapStat->pFileSetStats); + 
taosMemoryFree((*pTsdb)->pSnapStat); + (*pTsdb)->pSnapStat = NULL; + } + (void)taosThreadRwlockUnlock(&(*pTsdb)->snapStatLock); (void)taosThreadMutexDestroy(&(*pTsdb)->mutex); + (void)taosThreadRwlockDestroy(&(*pTsdb)->snapStatLock); taosMemoryFreeClear(*pTsdb); } return; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index b356beee453b..869c27b0882e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -207,6 +207,17 @@ static int32_t tsdbSnapReadRangeBegin(STsdbSnapReader* reader) { reader->ctx->isDataDone = false; reader->ctx->isTombDone = false; + // record fileset start time + STsdb* tsdb = reader->tsdb; + (void)taosThreadRwlockWrlock(&tsdb->snapStatLock); + if (tsdb->pSnapStat != NULL) { + int32_t idx = reader->ctx->fsrArrIdx - 1; + if (idx >= 0 && idx < tsdb->pSnapStat->totalFileSets) { + tsdb->pSnapStat->pFileSetStats[idx].startTime = taosGetTimestampMs(); + } + } + (void)taosThreadRwlockUnlock(&tsdb->snapStatLock); + code = tsdbSnapReadFileSetOpenReader(reader); TSDB_CHECK_CODE(code, lino, _exit); @@ -225,6 +236,20 @@ static int32_t tsdbSnapReadRangeEnd(STsdbSnapReader* reader) { tsdbSnapReadFileSetCloseIter(reader); tsdbSnapReadFileSetCloseReader(reader); reader->ctx->fsr = NULL; + + // mark fileset finished + STsdb* tsdb = reader->tsdb; + (void)taosThreadRwlockWrlock(&tsdb->snapStatLock); + if (tsdb->pSnapStat != NULL) { + int32_t idx = reader->ctx->fsrArrIdx - 1; + tsdb->pSnapStat->finishedFileSets++; + if (idx >= 0 && idx < tsdb->pSnapStat->totalFileSets) { + SSnapSendFileSetStat* pFs = &tsdb->pSnapStat->pFileSetStats[idx]; + pFs->finishedFileCount = pFs->fileCount; + } + } + (void)taosThreadRwlockUnlock(&tsdb->snapStatLock); + return 0; } @@ -456,6 +481,53 @@ int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, code = tsdbFSCreateRefRangedSnapshot(tsdb->pFS, sver, ever, (TFileSetRangeArray*)pRanges, 
&reader[0]->fsrArr); TSDB_CHECK_CODE(code, lino, _exit); + // build snap send progress stat + { + int32_t n = (int32_t)TARRAY2_SIZE(reader[0]->fsrArr); + SSnapSendVnodeStat* pStat = (SSnapSendVnodeStat*)taosMemoryCalloc(1, sizeof(*pStat)); + if (pStat == NULL) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + pStat->totalFileSets = n; + pStat->startTime = taosGetTimestampMs(); + if (n > 0) { + pStat->pFileSetStats = (SSnapSendFileSetStat*)taosMemoryCalloc(n, sizeof(SSnapSendFileSetStat)); + if (pStat->pFileSetStats == NULL) { + taosMemoryFree(pStat); + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + for (int32_t i = 0; i < n; i++) { + STFileSetRange* fsr = TARRAY2_GET(reader[0]->fsrArr, i); + SSnapSendFileSetStat* s = &pStat->pFileSetStats[i]; + s->fid = fsr->fid; + s->sver = fsr->sver; + s->ever = fsr->ever; + s->transferType = type; + // sum up data file sizes + for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) { + if (fsr->fset->farr[ftype] != NULL) { + s->totalSize += fsr->fset->farr[ftype]->f[0].size; + s->fileCount++; + } + } + // sum up stt file sizes + SSttLvl* lvl; + TARRAY2_FOREACH(fsr->fset->lvlArr, lvl) { + STFileObj* fobj; + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + s->totalSize += fobj->f[0].size; + s->fileCount++; + } + } + } + } + (void)taosThreadRwlockWrlock(&tsdb->snapStatLock); + tsdb->pSnapStat = pStat; + (void)taosThreadRwlockUnlock(&tsdb->snapStatLock); + } + _exit: if (code) { tsdbError("vgId:%d %s failed at %s:%d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), @@ -499,6 +571,15 @@ void tsdbSnapReaderClose(STsdbSnapReader** reader) { taosMemoryFree(reader[0]); reader[0] = NULL; + // clear progress stat + (void)taosThreadRwlockWrlock(&tsdb->snapStatLock); + if (tsdb->pSnapStat != NULL) { + taosMemoryFree(tsdb->pSnapStat->pFileSetStats); + taosMemoryFree(tsdb->pSnapStat); + tsdb->pSnapStat = NULL; + } + (void)taosThreadRwlockUnlock(&tsdb->snapStatLock); + return; } @@ -546,6 +627,18 @@ 
int32_t tsdbSnapRead(STsdbSnapReader* reader, uint8_t** data) { if (code) { TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); } else { + // accumulate sent size when data was returned + if (data[0] != NULL) { + STsdb* tsdb = reader->tsdb; + (void)taosThreadRwlockWrlock(&tsdb->snapStatLock); + if (tsdb->pSnapStat != NULL) { + int32_t idx = reader->ctx->fsrArrIdx - 1; + if (idx >= 0 && idx < tsdb->pSnapStat->totalFileSets) { + tsdb->pSnapStat->pFileSetStats[idx].readSize += ((SSnapDataHdr*)data[0])->size; + } + } + (void)taosThreadRwlockUnlock(&tsdb->snapStatLock); + } tsdbDebug("vgId:%d %s done", TD_VID(reader->tsdb->pVnode), __func__); } return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c b/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c index 254a9c5a2e1a..bf26b903d223 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c @@ -61,6 +61,51 @@ int32_t tsdbSnapRAWReaderOpen(STsdb* tsdb, int64_t ever, int8_t type, STsdbSnapR code = tsdbFSCreateRefSnapshot(tsdb->pFS, &reader[0]->fsetArr); TSDB_CHECK_CODE(code, lino, _exit); + // build snap send progress stat + { + int32_t n = (int32_t)TARRAY2_SIZE(reader[0]->fsetArr); + SSnapSendVnodeStat* pStat = (SSnapSendVnodeStat*)taosMemoryCalloc(1, sizeof(*pStat)); + if (pStat == NULL) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + pStat->totalFileSets = n; + pStat->startTime = taosGetTimestampMs(); + if (n > 0) { + pStat->pFileSetStats = (SSnapSendFileSetStat*)taosMemoryCalloc(n, sizeof(SSnapSendFileSetStat)); + if (pStat->pFileSetStats == NULL) { + taosMemoryFree(pStat); + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + for (int32_t i = 0; i < n; i++) { + STFileSet* fset = TARRAY2_GET(reader[0]->fsetArr, i); + SSnapSendFileSetStat* s = &pStat->pFileSetStats[i]; + s->fid = fset->fid; + s->sver = 0; + s->ever = ever; + s->transferType = type; + for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) { + if 
(fset->farr[ftype] != NULL) { + s->totalSize += fset->farr[ftype]->f[0].size; + s->fileCount++; + } + } + SSttLvl* lvl; + TARRAY2_FOREACH(fset->lvlArr, lvl) { + STFileObj* fobj; + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + s->totalSize += fobj->f[0].size; + s->fileCount++; + } + } + } + } + (void)taosThreadRwlockWrlock(&tsdb->snapStatLock); + tsdb->pSnapStat = pStat; + (void)taosThreadRwlockUnlock(&tsdb->snapStatLock); + } + _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s, sver:0, ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), __func__, @@ -87,6 +132,15 @@ void tsdbSnapRAWReaderClose(STsdbSnapRAWReader** reader) { tsdbFSDestroyRefSnapshot(&reader[0]->fsetArr); taosMemoryFree(reader[0]); reader[0] = NULL; + + // clear progress stat + (void)taosThreadRwlockWrlock(&tsdb->snapStatLock); + if (tsdb->pSnapStat != NULL) { + taosMemoryFree(tsdb->pSnapStat->pFileSetStats); + taosMemoryFree(tsdb->pSnapStat); + tsdb->pSnapStat = NULL; + } + (void)taosThreadRwlockUnlock(&tsdb->snapStatLock); return; } @@ -237,6 +291,17 @@ static int32_t tsdbSnapRAWReadBegin(STsdbSnapRAWReader* reader) { reader->ctx->fset = TARRAY2_GET(reader->fsetArr, reader->ctx->fsetArrIdx++); reader->ctx->isDataDone = false; + // record fileset start time + STsdb* tsdb = reader->tsdb; + (void)taosThreadRwlockWrlock(&tsdb->snapStatLock); + if (tsdb->pSnapStat != NULL) { + int32_t idx = reader->ctx->fsetArrIdx - 1; + if (idx >= 0 && idx < tsdb->pSnapStat->totalFileSets) { + tsdb->pSnapStat->pFileSetStats[idx].startTime = taosGetTimestampMs(); + } + } + (void)taosThreadRwlockUnlock(&tsdb->snapStatLock); + code = tsdbSnapRAWReadFileSetOpenReader(reader); TSDB_CHECK_CODE(code, lino, _exit); @@ -255,6 +320,20 @@ static int32_t tsdbSnapRAWReadEnd(STsdbSnapRAWReader* reader) { tsdbSnapRAWReadFileSetCloseIter(reader); tsdbSnapRAWReadFileSetCloseReader(reader); reader->ctx->fset = NULL; + + // mark fileset finished + STsdb* tsdb = reader->tsdb; + (void)taosThreadRwlockWrlock(&tsdb->snapStatLock); + 
if (tsdb->pSnapStat != NULL) { + int32_t idx = reader->ctx->fsetArrIdx - 1; + tsdb->pSnapStat->finishedFileSets++; + if (idx >= 0 && idx < tsdb->pSnapStat->totalFileSets) { + SSnapSendFileSetStat* pFs = &tsdb->pSnapStat->pFileSetStats[idx]; + pFs->finishedFileCount = pFs->fileCount; + } + } + (void)taosThreadRwlockUnlock(&tsdb->snapStatLock); + return 0; } @@ -292,6 +371,19 @@ int32_t tsdbSnapRAWRead(STsdbSnapRAWReader* reader, uint8_t** data) { if (code) { TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); } else { + // accumulate sent size when data was returned + if (data[0] != NULL) { + STsdb* tsdb = reader->tsdb; + (void)taosThreadRwlockWrlock(&tsdb->snapStatLock); + if (tsdb->pSnapStat != NULL) { + int32_t idx = reader->ctx->fsetArrIdx - 1; + if (idx >= 0 && idx < tsdb->pSnapStat->totalFileSets) { + tsdb->pSnapStat->pFileSetStats[idx].readSize += ((SSnapDataHdr*)data[0])->size; + tsdb->pSnapStat->pFileSetStats[idx].finishedFileCount = reader->dataIter->idx; + } + } + (void)taosThreadRwlockUnlock(&tsdb->snapStatLock); + } tsdbDebug("vgId:%d %s done", TD_VID(reader->tsdb->pVnode), __func__); } return code; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 6f68b6c5a5f2..5f43d01fd0d6 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -981,6 +981,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->numOfBatchInsertReqs = atomic_load_64(&pVnode->statis.nBatchInsert); pLoad->numOfBatchInsertSuccessReqs = atomic_load_64(&pVnode->statis.nBatchInsertSuccess); vnodeGetBufferInfo(pVnode, &pLoad->bufferSegmentUsed, &pLoad->bufferSegmentSize); + pLoad->snapshotSending = syncSnapshotSending(pVnode->sync) ? 
1 : 0; return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index feec2404b9c6..ede3e63e21e5 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -940,3 +940,61 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { } return code; } + +/* ====================================================================== + * vnodeGetSnapSendProgress — copy the live snapshot-send progress from + * the tsdb layer into an SSnapSendVnodeInfo for the mnode progress query. + * + * Returns: + * 0 — progress data available, *pInfo populated (caller + * must taosMemoryFree(pInfo->pFileSetInfos)) + * TSDB_CODE_NOT_FOUND — no active snapshot send on this vnode + * TSDB_CODE_OUT_OF_MEMORY — failed to allocate pFileSetInfos copy + * ====================================================================== */ +int32_t vnodeGetSnapSendProgress(SVnode *pVnode, int32_t dnodeId, SSnapSendVnodeInfo *pInfo) { + STsdb *pTsdb = pVnode->pTsdb; + if (pTsdb == NULL) return TSDB_CODE_NOT_FOUND; + + (void)taosThreadRwlockRdlock(&pTsdb->snapStatLock); + + if (pTsdb->pSnapStat == NULL) { + (void)taosThreadRwlockUnlock(&pTsdb->snapStatLock); + return TSDB_CODE_NOT_FOUND; + } + + SSnapSendVnodeStat *pStat = pTsdb->pSnapStat; + + pInfo->vgId = TD_VID(pVnode); + pInfo->dnodeId = dnodeId; + pInfo->totalFileSets = pStat->totalFileSets; + pInfo->finishedFileSets = pStat->finishedFileSets; + pInfo->startTime = pStat->startTime; + pInfo->fileSetCount = pStat->totalFileSets; + pInfo->pFileSetInfos = NULL; + + int32_t code = 0; + if (pStat->totalFileSets > 0 && pStat->pFileSetStats != NULL) { + pInfo->pFileSetInfos = + (SSnapSendFileSetInfo *)taosMemoryMalloc(pStat->totalFileSets * sizeof(SSnapSendFileSetInfo)); + if (pInfo->pFileSetInfos == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + } else { + for (int32_t i = 0; i < pStat->totalFileSets; i++) { + SSnapSendFileSetStat *pSrc = 
&pStat->pFileSetStats[i]; + SSnapSendFileSetInfo *pDest = &pInfo->pFileSetInfos[i]; + pDest->fid = pSrc->fid; + pDest->fileCount = pSrc->fileCount; + pDest->finishedFileCount = pSrc->finishedFileCount; + pDest->totalSize = pSrc->totalSize; + pDest->readSize = pSrc->readSize; + pDest->startTime = pSrc->startTime; + pDest->sver = pSrc->sver; + pDest->ever = pSrc->ever; + pDest->transferType = pSrc->transferType; + } + } + } + + (void)taosThreadRwlockUnlock(&pTsdb->snapStatLock); + return code; +} diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 0a111fd7aff2..baf248168d34 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -585,29 +585,27 @@ bool syncIsReadyForRead(int64_t rid) { return ready; } -#ifdef BUILD_NO_CALL -bool syncSnapshotSending(int64_t rid) { +bool syncSnapshotRecving(int64_t rid) { SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return false; } - bool b = syncNodeSnapshotSending(pSyncNode); + bool b = syncNodeSnapshotRecving(pSyncNode); syncNodeRelease(pSyncNode); return b; } -bool syncSnapshotRecving(int64_t rid) { +bool syncSnapshotSending(int64_t rid) { SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return false; } - bool b = syncNodeSnapshotRecving(pSyncNode); + bool b = syncNodeSnapshotSending(pSyncNode); syncNodeRelease(pSyncNode); return b; } -#endif int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) { if (pSyncNode->peersNum == 0) { @@ -3620,7 +3618,7 @@ bool syncNodeSnapshotSending(SSyncNode* pSyncNode) { if (pSyncNode == NULL) return false; bool b = false; for (int32_t i = 0; i < pSyncNode->totalReplicaNum; ++i) { - if (pSyncNode->senders[i] != NULL && pSyncNode->senders[i]->start) { + if (pSyncNode->senders[i] != NULL && snapshotSenderIsStart(pSyncNode->senders[i])) { b = true; break; } diff --git a/test/cases/20-Metadata/test_meta_sysdb.py b/test/cases/20-Metadata/test_meta_sysdb.py index 2392a05af6e5..3771d05055fc 100644 --- 
a/test/cases/20-Metadata/test_meta_sysdb.py +++ b/test/cases/20-Metadata/test_meta_sysdb.py @@ -73,7 +73,7 @@ def check_ins_tables_count(self): ## 1.1 check count result tdSql.query("select count(*) cnt from information_schema.ins_tables", show=1) tdSql.checkRows(1) - tdSql.checkData(0, 0, 43) # 42 sys tables + 1 user table + tdSql.checkData(0, 0, 45) # 44 sys tables + 1 user table ## 2. check plan with group by ### 2.1 check plan with group by db_name @@ -87,7 +87,7 @@ def check_ins_tables_count(self): tdSql.query("select db_name, count(*) cnt from information_schema.ins_tables \ group by db_name order by cnt desc", show=1) tdSql.checkRows(4) - tdSql.checkData(0, 1, 37) # 37 tables in information_schema + tdSql.checkData(0, 1, 39) # 39 tables in information_schema tdSql.checkData(1, 1, 5) # 5 tables in sys tdSql.checkData(2, 1, 1) # 1 table in test_meta_sysdb tdSql.checkData(3, 1, 0) # 0 table in empty_db_for_count_test @@ -104,7 +104,7 @@ def check_ins_tables_count(self): group by stable_name order by cnt desc", show=1) tdSql.checkRows(3) tdSql.checkData(0, 0, None) - tdSql.checkData(0, 1, 42) # 42 normal tables in system databases + tdSql.checkData(0, 1, 44) # 44 normal tables in system databases tdSql.checkData(1, 0, "stb") tdSql.checkData(1, 1, 1) # 1 table in test_meta_sysdb.stb tdSql.checkData(2, 0, "stb_empty") @@ -123,7 +123,7 @@ def check_ins_tables_count(self): tdSql.query("select count(*) cnt from information_schema.ins_tables \ where db_name='information_schema'", show=1) tdSql.checkRows(1) - tdSql.checkData(0, 0, 37) # 37 tables in information_schema + tdSql.checkData(0, 0, 39) # 39 tables in information_schema ### 3.3 check plan with where stable_name tdSql.query("explain select count(*) cnt from information_schema.ins_tables \ diff --git a/test/cases/20-Metadata/test_table_count.py b/test/cases/20-Metadata/test_table_count.py index da3e171d9c7f..9636f32c9458 100644 --- a/test/cases/20-Metadata/test_table_count.py +++ 
b/test/cases/20-Metadata/test_table_count.py @@ -87,7 +87,7 @@ def test_table_count(self): ) tdSql.checkRows(3) - tdSql.checkData(0, 1, 43) + tdSql.checkData(0, 1, 45) tdSql.checkData(1, 1, 10) @@ -102,7 +102,7 @@ def test_table_count(self): tdSql.checkData(1, 1, 5) - tdSql.checkData(2, 1, 37) + tdSql.checkData(2, 1, 39) tdSql.checkData(3, 1, 5) @@ -121,7 +121,7 @@ def test_table_count(self): tdSql.checkData(4, 2, 3) - tdSql.checkData(5, 2, 37) + tdSql.checkData(5, 2, 39) tdSql.checkData(6, 2, 5) diff --git a/test/cases/23-Users/test_user_privilege_table.py b/test/cases/23-Users/test_user_privilege_table.py index 388f2ff486ad..a68f9953b1f5 100644 --- a/test/cases/23-Users/test_user_privilege_table.py +++ b/test/cases/23-Users/test_user_privilege_table.py @@ -8,7 +8,7 @@ def setup_class(cls): tdLog.debug(f"start to execute {__file__}") - def check_show_tables(self, dbName = 'test', stbNum = 0, tbNum = 0, sysTbNum = 42, otherDbTbNum = 0): + def check_show_tables(self, dbName = 'test', stbNum = 0, tbNum = 0, sysTbNum = 44, otherDbTbNum = 0): tdSql.query(f"show {dbName}.stables;") tdSql.checkRows(stbNum) tdSql.query(f"select * from information_schema.ins_stables where db_name='{dbName}';") @@ -58,7 +58,7 @@ def test_user_privilege_table(self): tdSql.execute(f"create table st2s2 using st2 tags(2, 'shanghai');") tdSql.execute(f"insert into st2s1 values(now, 1) st2s2 values(now, 2);") tdSql.execute(f"create user wxy pass 'taosdata';") - self.check_show_tables("test", 2, 4, 42) + self.check_show_tables("test", 2, 4, 44) tdLog.info( f"=============== case 1: database unauthorized and table unauthorized" @@ -90,7 +90,7 @@ def test_user_privilege_table(self): tdSql.execute(f"reset query cache;") tdSql.query(f"select * from test.st1;") tdSql.checkRows(2) - self.check_show_tables("test", 1, 2, 42) + self.check_show_tables("test", 1, 2, 44) tdSql.error( f"insert into test.st1s1 values(now, 10) test.st1s2 values(now, 20);" @@ -116,7 +116,7 @@ def test_user_privilege_table(self): 
tdSql.execute(f"reset query cache;") tdSql.query(f"select * from test.st1;") tdSql.checkRows(1) - self.check_show_tables("test", 1, 2, 42) + self.check_show_tables("test", 1, 2, 44) tdSql.error(f"insert into test.st1s1 values(now, 10);") tdSql.error(f"insert into test.st1s2 values(now, 20);") @@ -140,7 +140,7 @@ def test_user_privilege_table(self): tdSql.execute(f"reset query cache;") tdSql.error(f"select tbname, * from test.st1;") - self.check_show_tables("test", 1, 2, 42) + self.check_show_tables("test", 1, 2, 44) tdSql.execute(f"insert into test.st1s1 values(now, 10);") tdSql.execute(f"insert into test.st1s2 values(now, 20);") tdSql.error(f"select * from test.st2;") @@ -163,7 +163,7 @@ def test_user_privilege_table(self): tdSql.execute(f"reset query cache;") tdSql.error(f"select tbname, * from test.st1;") - self.check_show_tables("test", 1, 2, 42) + self.check_show_tables("test", 1, 2, 44) tdSql.execute(f"insert into test.st1s1 values(now, 10);") tdSql.execute( f"insert into test.st1s3 using test.st1 tags(1, 'dachang') values(now, 100);" @@ -193,7 +193,7 @@ def test_user_privilege_table(self): tdSql.execute(f"reset query cache;") tdSql.query(f"select * from test.st1;") tdSql.checkRows(6) - self.check_show_tables("test", 2, 5, 42) + self.check_show_tables("test", 2, 5, 44) tdSql.error( f"insert into test.st1s1 values(now, 10) test.st1s2 values(now, 20);" @@ -218,7 +218,7 @@ def test_user_privilege_table(self): tdSql.execute(f"reset query cache;") tdSql.query(f"select * from test.st1;") tdSql.checkRows(6) - self.check_show_tables("test", 2, 5, 42) + self.check_show_tables("test", 2, 5, 44) tdSql.error( f"insert into test.st1s1 values(now, 10) test.st1s2 values(now, 20);" @@ -244,7 +244,7 @@ def test_user_privilege_table(self): tdSql.execute(f"reset query cache;") tdSql.query(f"select * from test.st1;") tdSql.checkRows(4) - self.check_show_tables("test", 2, 5, 42) + self.check_show_tables("test", 2, 5, 44) tdSql.error( f"insert into test.st1s1 values(now, 10) 
test.st1s2 values(now, 20);" @@ -270,7 +270,7 @@ def test_user_privilege_table(self): tdSql.execute(f"reset query cache;") tdSql.query(f"select * from test.st1;") tdSql.checkRows(6) - self.check_show_tables("test", 2, 5, 42) + self.check_show_tables("test", 2, 5, 44) tdSql.execute( f"insert into test.st1s1 values(now, 10) test.st1s2 values(now, 20);" @@ -296,7 +296,7 @@ def test_user_privilege_table(self): tdSql.execute(f"reset query cache;") tdSql.query(f"select * from test.st1;") tdSql.checkRows(8) - self.check_show_tables("test", 2, 5, 42) + self.check_show_tables("test", 2, 5, 44) tdSql.execute(f"insert into test.st1s1 values(now, 10);") tdSql.error(f"insert into test.st1s2 values(now, 20);") @@ -321,7 +321,7 @@ def test_user_privilege_table(self): tdSql.execute(f"reset query cache;") tdSql.error(f"select * from test.st1;") - self.check_show_tables("test", 2, 5, 42) + self.check_show_tables("test", 2, 5, 44) tdSql.execute( f"insert into test.st1s1 values(now, 10) test.st1s2 values(now, 20);" ) @@ -343,7 +343,7 @@ def test_user_privilege_table(self): tdSql.execute(f"reset query cache;") tdSql.query(f"select * from test.st1;") tdSql.checkRows(11) - self.check_show_tables("test", 2, 5, 42) + self.check_show_tables("test", 2, 5, 44) tdSql.execute( f"insert into test.st1s1 values(now, 10) test.st1s2 values(now, 20);" @@ -367,7 +367,7 @@ def test_user_privilege_table(self): tdSql.execute(f"reset query cache;") tdSql.query(f"select * from test.st1;") tdSql.checkRows(8) - self.check_show_tables("test", 2, 5, 42) + self.check_show_tables("test", 2, 5, 44) tdSql.execute( f"insert into test.st1s1 values(now, 10) test.st1s2 values(now, 20);" @@ -390,7 +390,7 @@ def test_user_privilege_table(self): tdSql.execute(f"reset query cache;") tdSql.error(f"select * from test.st1;") - self.check_show_tables("test", 2, 5, 42) + self.check_show_tables("test", 2, 5, 44) tdSql.execute( f"insert into test.st1s1 values(now, 10) test.st1s2 values(now, 20);" ) @@ -412,7 +412,7 @@ def 
test_user_privilege_table(self): tdSql.execute(f"reset query cache;") tdSql.error(f"select * from test.st1;") - self.check_show_tables("test", 2, 5, 42) + self.check_show_tables("test", 2, 5, 44) tdSql.execute(f"insert into test.st1s1 values(now, 10);") tdSql.error(f"insert into test.st1s2 values(now, 20);") tdSql.error(f"select * from test.st2;") @@ -438,8 +438,8 @@ def test_user_privilege_table(self): tdSql.connect("wxy") tdSql.execute(f"reset query cache;") - self.check_show_tables("test", 2, 5, 42, 100) - self.check_show_tables("test2", 1, 100, 42, 5) + self.check_show_tables("test", 2, 5, 44, 100) + self.check_show_tables("test2", 1, 100, 44, 5) tdSql.connect("root") tdSql.execute(f"grant write on test2.st1 to wxy;") @@ -447,23 +447,23 @@ def test_user_privilege_table(self): tdSql.execute(f"grant read on test2.st3 with id = 1 to wxy;") tdSql.connect("wxy") tdSql.execute(f"reset query cache;") - self.check_show_tables("test", 2, 5, 42, 400) - self.check_show_tables("test2", 4, 400, 42, 5) + self.check_show_tables("test", 2, 5, 44, 400) + self.check_show_tables("test2", 4, 400, 44, 5) tdSql.connect("root") tdSql.execute(f"grant read on test2.ntb0 to wxy;") tdSql.execute(f"grant read on test2.ntb9 to wxy;") tdSql.connect("wxy") tdSql.execute(f"reset query cache;") - self.check_show_tables("test", 2, 5, 42, 402) - self.check_show_tables("test2", 4, 402, 42, 5) + self.check_show_tables("test", 2, 5, 44, 402) + self.check_show_tables("test2", 4, 402, 44, 5) tdSql.connect("root") tdSql.execute(f"grant read on test2.* to wxy;") tdSql.connect("wxy") tdSql.execute(f"reset query cache;") - self.check_show_tables("test", 2, 5, 42, 1010) - self.check_show_tables("test2", 10, 1010, 42, 5) + self.check_show_tables("test", 2, 5, 44, 1010) + self.check_show_tables("test2", 10, 1010, 44, 5) diff --git a/test/env/ci_default.yaml b/test/env/ci_default.yaml index 97f636c8d030..fbcdec5d93a5 100644 --- a/test/env/ci_default.yaml +++ b/test/env/ci_default.yaml @@ -11,18 +11,17 @@ 
settings: cDebugFlag: 135 charset: UTF-8 dDebugFlag: 131 - dataDir: - - /Users/simondominic/dev/TDinternal/sim/dnode1/data + dataDir: /root/workspace/TDinternal/sim/dnode1/data enableQueryHb: 1 jniDebugFlag: 131 locale: en_US.UTF-8 - logDir: /Users/simondominic/dev/TDinternal/sim/dnode1/log + logDir: /root/workspace/TDinternal/sim/dnode1/log mDebugFlag: 135 maxShellConns: 30000 monitor: 0 mqttPort: 6083 numOfLogLines: 100000000 - qDebugFlag: 135 + qDebugFlag: 131 rpcDebugFlag: 135 sDebugFlag: 131 smaDebugFlag: 135 @@ -33,11 +32,11 @@ settings: tmrDebugFlag: 131 tqDebugFlag: 135 uDebugFlag: 131 - vDebugFlag: 135 + vDebugFlag: 131 wDebugFlag: 131 - config_dir: /Users/simondominic/dev/TDinternal/sim/dnode1/cfg + config_dir: /root/workspace/TDinternal/sim/dnode1/cfg endpoint: localhost:6030 mqttPort: 6083 - system: darwin - taosdPath: /Users/simondominic/dev/TDinternal/debug/build/bin/taosd + system: linux + taosdPath: /root/workspace/TDinternal/debug/build/bin/taosd version: 2.4.0.0 diff --git a/tests/develop-test/2-query/table_count_scan.py b/tests/develop-test/2-query/table_count_scan.py index b5c6140481e5..bc5265912908 100644 --- a/tests/develop-test/2-query/table_count_scan.py +++ b/tests/develop-test/2-query/table_count_scan.py @@ -68,7 +68,7 @@ def run(self): for i in range(0, 3): db_name = tdSql.getData(i, 1) if db_name == 'information_schema': - tdSql.checkData(i, 0, 37) + tdSql.checkData(i, 0, 39) tdSql.checkData(i, 2, None) elif db_name == 'performance_schema': tdSql.checkData(i, 0, 5) @@ -81,7 +81,7 @@ def run(self): tdSql.query('select count(1) v,db_name, stable_name from information_schema.ins_tables group by db_name, stable_name order by v desc;') tdSql.checkRows(3) - tdSql.checkData(0, 0, 37) + tdSql.checkData(0, 0, 39) tdSql.checkData(0, 1, 'information_schema') tdSql.checkData(0, 2, None) tdSql.checkData(1, 0, 5) @@ -97,7 +97,7 @@ def run(self): tdSql.checkData(1, 1, 'performance_schema') tdSql.checkData(0, 0, 3) tdSql.checkData(0, 1, 'tbl_count') - 
tdSql.checkData(2, 0, 37) + tdSql.checkData(2, 0, 39) tdSql.checkData(2, 1, 'information_schema') tdSql.query("select count(*) from information_schema.ins_tables where db_name='tbl_count'") @@ -110,7 +110,7 @@ def run(self): tdSql.query('select count(*) from information_schema.ins_tables') tdSql.checkRows(1) - tdSql.checkData(0, 0, 45) + tdSql.checkData(0, 0, 47) tdSql.execute('create table stba (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(10), c9 nchar(10), c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned) TAGS(t1 int, t2 binary(10), t3 double);') @@ -193,7 +193,7 @@ def run(self): tdSql.checkData(2, 0, 5) tdSql.checkData(2, 1, 'performance_schema') tdSql.checkData(2, 2, None) - tdSql.checkData(3, 0, 37) + tdSql.checkData(3, 0, 39) tdSql.checkData(3, 1, 'information_schema') tdSql.checkData(3, 2, None) @@ -208,7 +208,7 @@ def run(self): tdSql.checkData(2, 0, 5) tdSql.checkData(2, 1, 'performance_schema') tdSql.checkData(2, 2, None) - tdSql.checkData(3, 0, 37) + tdSql.checkData(3, 0, 39) tdSql.checkData(3, 1, 'information_schema') tdSql.checkData(3, 2, None) @@ -219,7 +219,7 @@ def run(self): tdSql.checkData(0, 1, 'tbl_count') tdSql.checkData(1, 0, 5) tdSql.checkData(1, 1, 'performance_schema') - tdSql.checkData(2, 0, 37) + tdSql.checkData(2, 0, 39) tdSql.checkData(2, 1, 'information_schema') tdSql.query("select count(*) from information_schema.ins_tables where db_name='tbl_count'") @@ -232,7 +232,7 @@ def run(self): tdSql.query('select count(*) from information_schema.ins_tables') tdSql.checkRows(1) - tdSql.checkData(0, 0, 46) + tdSql.checkData(0, 0, 48) tdSql.execute('drop database tbl_count') diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index 764030af7907..afc01e41bd07 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -61,7 +61,7 @@ def 
init(self, conn, logSql, replicaVar=1): self.ins_list = ['ins_dnodes','ins_mnodes','ins_qnodes','ins_snodes','ins_cluster','ins_databases','ins_functions',\ 'ins_indexes','ins_stables','ins_tables','ins_tags','ins_columns','ins_users','ins_grants','ins_vgroups','ins_configs','ins_dnode_variables',\ 'ins_topics','ins_subscriptions','ins_streams','ins_stream_tasks','ins_vnodes','ins_user_privileges','ins_views', - 'ins_compacts', 'ins_compact_details', 'ins_grants_full','ins_grants_logs', 'ins_machines', 'ins_arbgroups', 'ins_tsmas', "ins_encryptions", "ins_anodes", "ins_anodes_full", "ins_disk_usagea", "ins_filesets", "ins_transaction_details"] + 'ins_compacts', 'ins_compact_details', 'ins_grants_full','ins_grants_logs', 'ins_machines', 'ins_arbgroups', 'ins_tsmas', "ins_encryptions", "ins_anodes", "ins_anodes_full", "ins_disk_usagea", "ins_filesets", "ins_transaction_details","ins_snap_send_vnodes", "ins_snap_send_filesets"] self.perf_list = ['perf_connections','perf_queries','perf_consumers','perf_trans','perf_apps'] def insert_data(self,column_dict,tbname,row_num): insert_sql = self.setsql.set_insertsql(column_dict,tbname,self.binary_str,self.nchar_str) @@ -221,7 +221,7 @@ def ins_columns_check(self): tdSql.checkEqual(20470,len(tdSql.queryResult)) tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") - tdSql.checkRows(333) + tdSql.checkRows(350) tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.checkRows(65) @@ -394,6 +394,25 @@ def test_query_ins_tags(self): tdSql.query(sql) tdSql.checkRows(2) + def ins_snap_send_check(self): + # ins_snap_send_vnodes: table must exist and have the expected 6 columns. + # No active snapshot transfer in a single-node test, so 0 rows is correct. 
+ tdSql.query('select * from information_schema.ins_snap_send_vnodes') + tdSql.checkRows(0) + tdSql.query('describe information_schema.ins_snap_send_vnodes') + col_names = [row[0] for row in tdSql.queryResult] + for col in ['vgroup_id', 'dnode_id', 'total_file_sets', 'finished_file_sets', 'start_time', 'elapsed']: + tdSql.checkEqual(True, col in col_names) + + # ins_snap_send_filesets: table must exist and have the expected 11 columns. + tdSql.query('select * from information_schema.ins_snap_send_filesets') + tdSql.checkRows(0) + tdSql.query('describe information_schema.ins_snap_send_filesets') + col_names = [row[0] for row in tdSql.queryResult] + for col in ['vgroup_id', 'fid', 'file_count', 'finished_file_count', + 'total_size', 'read_size', 'start_time', 'elapsed', + 'start_index', 'end_index', 'transfer_type']: + tdSql.checkEqual(True, col in col_names) def run(self): self.prepare_data() @@ -406,6 +425,7 @@ def run(self): self.ins_grants_check() self.ins_encryptions_check() self.test_query_ins_tags() + self.ins_snap_send_check() def stop(self): diff --git a/tests/system-test/2-query/union.py b/tests/system-test/2-query/union.py index 1abe2b1a650c..2717475f9ee5 100644 --- a/tests/system-test/2-query/union.py +++ b/tests/system-test/2-query/union.py @@ -441,7 +441,7 @@ def test_TD_33137(self): tdSql.checkRows(2) sql = "select db_name `TABLE_CAT`, '' `TABLE_SCHEM`, stable_name `TABLE_NAME`, 'TABLE' `TABLE_TYPE`, table_comment `REMARKS` from information_schema.ins_stables union all select db_name `TABLE_CAT`, '' `TABLE_SCHEM`, table_name `TABLE_NAME`, case when `type`='SYSTEM_TABLE' then 'TABLE' when `type`='NORMAL_TABLE' then 'TABLE' when `type`='CHILD_TABLE' then 'TABLE' else 'UNKNOWN' end `TABLE_TYPE`, table_comment `REMARKS` from information_schema.ins_tables union all select db_name `TABLE_CAT`, '' `TABLE_SCHEM`, view_name `TABLE_NAME`, 'VIEW' `TABLE_TYPE`, NULL `REMARKS` from information_schema.ins_views" tdSql.query(sql, queryTimes=1) - tdSql.checkRows(50) + 
tdSql.checkRows(52) sql = "select null union select null" tdSql.query(sql, queryTimes=1)