diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 296c0ad63cdc..6cb155a0ccfa 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -34,6 +34,8 @@ extern "C" { #define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSD INFO ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) #define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSD DEBUG ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) #define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSD TRACE ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) + +#define tsdbInfoL(...) { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLongString("TSD INFO ", DEBUG_INFO, tsLogEmbedded ? 255 : tsdbDebugFlag, __VA_ARGS__); }} // clang-format on typedef struct TSDBROW TSDBROW; @@ -727,6 +729,31 @@ typedef struct STsdbRepOpts { int32_t tSerializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo); int32_t tDeserializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo); +int32_t tMissingFileListDataLenCalc(int32_t fileCount); +int32_t tDeserializeMissingFileList(void *buf, int32_t bufLen, void **ppFiles, int32_t *pFileCount, SHashObj **ppHash, + SHashObj **ppSttHash, int32_t vgId); +int32_t tsdbExtractMissingFids(STsdb *pTsdb, SHashObj *missingFileHash, int32_t **ppFids, int32_t *pFidCount); +int32_t tsdbDetermineFidSyncMode(STsdb *pTsdb, const void *files, int32_t fileCount, SHashObj **ppFidModeHash); + +#define TSDB_SNAP_SYNC_FILE_LEVEL 0 +#define TSDB_SNAP_SYNC_FSET_LEVEL 1 + +// snap file 5-tuple key for version-range matching: (fid, ftype, level, minVer, maxVer) +#define TSDB_SNAP_FILE_KEY_LEN (sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int64_t) + sizeof(int64_t)) +static inline void tsdbSnapFileKeyMake(int32_t fid, int32_t ftype, int32_t level, int64_t minVer, int64_t maxVer, + char key[TSDB_SNAP_FILE_KEY_LEN]) { + int32_t offset = 0; + memcpy(key + offset, &fid, sizeof(fid)); + offset += sizeof(fid); + memcpy(key + offset, &ftype, sizeof(ftype)); + offset += sizeof(ftype); + memcpy(key + offset, &level, sizeof(level)); + offset += sizeof(level); + memcpy(key + offset, &minVer, sizeof(minVer)); + offset += sizeof(minVer); + memcpy(key + offset, &maxVer, sizeof(maxVer)); +} + // snap read struct STsdbReadSnap { SMemTable *pMem; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 457c15bd7e87..76958dc6f0aa 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -333,7 +333,7 @@ int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback); // STsdbSnapReader ======================================== int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges, - STsdbSnapReader** ppReader); + const int32_t* missingFids, int32_t missingFidCount, STsdbSnapReader** ppReader); void tsdbSnapReaderClose(STsdbSnapReader** ppReader); int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData); // STsdbSnapWriter ======================================== @@ -342,7 +342,9 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr); int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter, bool rollback); int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback); // STsdbSnapRAWReader ======================================== -int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** ppReader); +int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, void* pRanges, SHashObj* missingFileHash, + SHashObj* fidModeHash, SHashObj* missingSttHash, const int32_t* missingFids, + int32_t missingFidCount, STsdbSnapRAWReader** ppReader); void tsdbSnapRAWReaderClose(STsdbSnapRAWReader** ppReader); int32_t tsdbSnapRAWRead(STsdbSnapRAWReader* pReader, uint8_t** ppData); // STsdbSnapRAWWriter ======================================== @@ -585,6 +587,7 @@ enum { SNAP_DATA_STREAM_STATE_BACKEND = 12, SNAP_DATA_TQ_CHECKINFO = 13, SNAP_DATA_RAW = 14, + SNAP_DATA_MISSING_FIDS = 16, }; struct SSnapDataHdr { diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c index 5836b69b5fd6..e21816453548 100644 --- a/source/dnode/vnode/src/sma/smaSnapshot.c +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -48,7 +48,7 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapRead for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pSma->pRSmaTsdb[i]) { code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, (i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2), NULL, - &pReader->pDataReader[i]); + NULL, 0, &pReader->pDataReader[i]); TAOS_CHECK_GOTO(code, &lino, _exit); } } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index a9dee797bd95..35726be9ab2c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -205,6 +205,12 @@ static int32_t load_fs(STsdb *pTsdb, const char *fname, TFileSetArray *arr) { code = load_json(fname, &json); TSDB_CHECK_CODE(code, lino, _exit); + char *jsonStr = cJSON_Print(json); + if (jsonStr != NULL) { + tsdbInfoL("vgId:%d, load_fs file:%s content:%s", TD_VID(pTsdb->pVnode), fname, jsonStr); + taosMemoryFree(jsonStr); + } + // parse json const cJSON *item1; @@ -1086,6 +1092,52 @@ static SHashObj *tsdbFSetRangeArrayToHash(TFileSetRangeArray *pRanges) { return pHash; } +int32_t tsdbFSCreateRefSnapshotWithRanges(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr) { + int32_t code = 0; + STFileSet *fset, *fset1; + SHashObj *pHash = NULL; + + fsetArr[0] = taosMemoryCalloc(1, sizeof(*fsetArr[0])); + if (fsetArr[0] == NULL) return terrno; + + pHash = tsdbFSetRangeArrayToHash(pRanges); + if (pHash == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _out; + } + + (void)taosThreadMutexLock(&fs->tsdb->mutex); + TARRAY2_FOREACH(fs->fSetArr, fset) { + int32_t fid = fset->fid; + if (taosHashGet(pHash, &fid, sizeof(fid)) == NULL) { + tsdbDebug("skip fid:%d, not in ranges", fid); + continue; + } + + code = tsdbTFileSetInitRef(fs->tsdb, fset, &fset1); + if (code) break; + + code = TARRAY2_APPEND(fsetArr[0], fset1); + if (code) { + tsdbTFileSetClear(&fset1); + break; + } + } + (void)taosThreadMutexUnlock(&fs->tsdb->mutex); + + if (code) { + TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear); + taosMemoryFree(fsetArr[0]); + fsetArr[0] = NULL; + } + +_out: + if (pHash) { + taosHashCleanup(pHash); + } + return code; +} + int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr, TFileOpArray *fopArr) { int32_t code = 0; @@ -1139,7 +1191,21 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRa void tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr) { tsdbFSDestroyCopySnapshot(fsetArr); } +static bool tsdbFSFidInMissingSet(int32_t fid, const int32_t *missingFids, int32_t missingFidCount) { + int32_t lo = 0, hi = missingFidCount - 1; + while (lo <= hi) { + int32_t mid = lo + (hi - lo) / 2; + if (missingFids[mid] == fid) return true; + if (missingFids[mid] < fid) + lo = mid + 1; + else + hi = mid - 1; + } + return false; +} + int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges, + const int32_t *missingFids, int32_t missingFidCount, TFileSetRangeArray **fsrArr) { int32_t code = 0; STFileSet *fset; @@ -1166,6 +1232,12 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev int64_t sver1 = sver; int64_t ever1 = ever; + // skip fids not in missing-fid filter + if (missingFids != NULL && !tsdbFSFidInMissingSet(fset->fid, missingFids, missingFidCount)) { + tsdbDebug("vgId:%d, skip fid:%d not in missing-fid set", TD_VID(fs->tsdb->pVnode), fset->fid); + continue; + } + if (pHash) { int32_t fid = fset->fid; STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid)); @@ -1180,7 +1252,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev continue; } - tsdbDebug("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1); + tsdbInfo("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1); code = tsdbTFileSetRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1); if (code) break; diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.h b/source/dnode/vnode/src/tsdb/tsdbFS2.h index 9694edcdd918..cb46a76baa95 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.h @@ -45,13 +45,14 @@ int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr); void tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr); int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr); int32_t tsdbFSCreateRefSnapshotWithoutLock(STFileSystem *fs, TFileSetArray **fsetArr); +int32_t tsdbFSCreateRefSnapshotWithRanges(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr); void tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr); int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pExclude, TFileSetArray **fsetArr, TFileOpArray *fopArr); void tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr); int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges, - TFileSetRangeArray **fsrArr); + const int32_t *missingFids, int32_t missingFidCount, TFileSetRangeArray **fsrArr); void tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr); // txn int64_t tsdbFSAllocEid(STFileSystem *fs); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapInfo.c b/source/dnode/vnode/src/tsdb/tsdbSnapInfo.c index 53bace994156..dc495de330fb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapInfo.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapInfo.c @@ -16,7 +16,19 @@ #include "tsdb.h" #include "tsdbFS2.h" -#define TSDB_SNAP_MSG_VER 1 +#define TSDB_SNAP_MSG_VER 2 + +// file info for snapshot sync: (fid, ftype, level, minVer, maxVer, cid, size, isMissing) +typedef struct { + int32_t fid; + int32_t ftype; // tsdb_ftype_t + int32_t level; // STT level (0/1/2), farr files use 0 + int64_t minVer; // min version in file + int64_t maxVer; // max version in file + int64_t cid; // commit id + int64_t size; // file size + int8_t isMissing; // 1=missing, 0=present +} STsdbSnapFileInfo; // fset partition static int32_t tsdbFSetPartCmprFn(STsdbFSetPartition* x, STsdbFSetPartition* y) { @@ -529,6 +541,509 @@ int32_t tDeserializeTsdbRepOpts(void* buf, int32_t bufLen, STsdbRepOpts* pOpts) return code; } +int32_t tMissingFileListDataLenCalc(int32_t fileCount) { + int32_t hdrLen = sizeof(int32_t); + int32_t datLen = 0; + + int8_t msgVer = 0; + hdrLen += sizeof(msgVer); + datLen += hdrLen; + datLen += sizeof(int32_t); // fileCount + // fid + ftype + level + minVer + maxVer + cid + size + isMissing = 4+4+4+8+8+8+8+1 = 45 bytes per record + datLen += fileCount * (sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int64_t) + sizeof(int64_t) + + sizeof(int64_t) + sizeof(int64_t) + sizeof(int8_t)); + return datLen; +} + +int32_t tSerializeMissingFileList(void* buf, int32_t bufLen, const STsdbSnapFileInfo* files, int32_t fileCount) { + int32_t code = 0; + SEncoder encoder = {0}; + int8_t msgVer = TSDB_SNAP_MSG_VER; + + tEncoderInit(&encoder, buf, bufLen); + + if ((code = tStartEncode(&encoder))) goto _err; + if ((code = tEncodeI8(&encoder, msgVer))) goto _err; + if ((code = tEncodeI32(&encoder, fileCount))) goto _err; + for (int32_t i = 0; i < fileCount; ++i) { + if ((code = tEncodeI32(&encoder, files[i].fid))) goto _err; + if ((code = tEncodeI32(&encoder, files[i].ftype))) goto _err; + if ((code = tEncodeI32(&encoder, files[i].level))) goto _err; + if ((code = tEncodeI64(&encoder, files[i].minVer))) goto _err; + if ((code = tEncodeI64(&encoder, files[i].maxVer))) goto _err; + if ((code = tEncodeI64(&encoder, files[i].cid))) goto _err; + if ((code = tEncodeI64(&encoder, files[i].size))) goto _err; + if ((code = tEncodeI8(&encoder, files[i].isMissing))) goto _err; + } + + tEndEncode(&encoder); + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; + +_err: + tEncoderClear(&encoder); + return code; +} + +int32_t tDeserializeMissingFileList(void* buf, int32_t bufLen, void** ppFiles, int32_t* pFileCount, SHashObj** ppHash, + SHashObj** ppSttHash, int32_t vgId) { + int32_t code = 0; + SDecoder decoder = {0}; + int8_t msgVer = 0; + int32_t fileCount = 0; + SHashObj* pHash = NULL; + SHashObj* pSttHash = NULL; + STsdbSnapFileInfo* files = NULL; + + tDecoderInit(&decoder, buf, bufLen); + + if ((code = tStartDecode(&decoder))) goto _err; + if ((code = tDecodeI8(&decoder, &msgVer))) goto _err; + if (msgVer != TSDB_SNAP_MSG_VER) { + code = TSDB_CODE_INVALID_MSG; + goto _err; + } + if ((code = tDecodeI32(&decoder, &fileCount))) goto _err; + if (fileCount < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _err; + } + if (fileCount > 0) { + pHash = taosHashInit(fileCount * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (pHash == NULL) { + code = terrno; + goto _err; + } + pSttHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (pSttHash == NULL) { + code = terrno; + goto _err; + } + files = taosMemoryMalloc(fileCount * sizeof(STsdbSnapFileInfo)); + if (files == NULL) { + code = terrno; + goto _err; + } + for (int32_t i = 0; i < fileCount; ++i) { + if ((code = tDecodeI32(&decoder, &files[i].fid))) goto _err; + if ((code = tDecodeI32(&decoder, &files[i].ftype))) goto _err; + if ((code = tDecodeI32(&decoder, &files[i].level))) goto _err; + if ((code = tDecodeI64(&decoder, &files[i].minVer))) goto _err; + if ((code = tDecodeI64(&decoder, &files[i].maxVer))) goto _err; + if ((code = tDecodeI64(&decoder, &files[i].cid))) goto _err; + if ((code = tDecodeI64(&decoder, &files[i].size))) goto _err; + if ((code = tDecodeI8(&decoder, &files[i].isMissing))) goto _err; + + tsdbInfo("vgId:%d, FileInfo fid:%d ftype:%d level:%d minVer:%" PRId64 " maxVer:%" PRId64 " cid:%" PRId64 + " size:%" PRId64 " isMissing:%d", + vgId, files[i].fid, files[i].ftype, files[i].level, files[i].minVer, files[i].maxVer, files[i].cid, + files[i].size, files[i].isMissing); + + if (files[i].isMissing) { + char dummy = 0; + char key[TSDB_SNAP_FILE_KEY_LEN]; + tsdbSnapFileKeyMake(files[i].fid, files[i].ftype, files[i].level, files[i].minVer, files[i].maxVer, key); + + if (taosHashPut(pHash, key, sizeof(key), &dummy, sizeof(dummy)) != 0) { + code = terrno; + goto _err; + } + // for STT files, also put into missingSttHash keyed by 5-tuple (fid, ftype, level, minVer, maxVer) + if (files[i].ftype == TSDB_FTYPE_STT) { + if (taosHashPut(pSttHash, key, sizeof(key), &dummy, sizeof(dummy)) != 0) { + code = terrno; + goto _err; + } + } + } + } + } + + tEndDecode(&decoder); + tDecoderClear(&decoder); + + *ppHash = pHash; + *ppSttHash = pSttHash; + *ppFiles = (void*)files; + *pFileCount = fileCount; + return 0; + +_err: + if (pHash) taosHashCleanup(pHash); + if (pSttHash) taosHashCleanup(pSttHash); + taosMemoryFree(files); + tDecoderClear(&decoder); + return code; +} + +int32_t tsdbExtractMissingFids(STsdb* pTsdb, SHashObj* missingFileHash, int32_t** ppFids, int32_t* pFidCount) { + int32_t code = 0; + int32_t fidCap = 0; + int32_t fidCount = 0; + int32_t* fids = NULL; + + // extract unique fids from hash keys (key = 5-tuple binary: fid, ftype, level, minVer, maxVer) + void* pIter = NULL; + while ((pIter = taosHashIterate(missingFileHash, pIter)) != NULL) { + size_t keyLen = 0; + char* pKey = taosHashGetKey(pIter, &keyLen); + int32_t fid; + memcpy(&fid, pKey, sizeof(fid)); + + // check if fid already exists + bool exists = false; + for (int32_t i = 0; i < fidCount; ++i) { + if (fids[i] == fid) { + exists = true; + break; + } + } + if (exists) continue; + + if (fidCount >= fidCap) { + int32_t newCap = fidCap == 0 ? 16 : fidCap * 2; + int32_t* tmp = taosMemoryRealloc(fids, newCap * sizeof(int32_t)); + if (tmp == NULL) { + code = terrno; + taosHashCancelIterate(missingFileHash, pIter); + taosMemoryFree(fids); + return code; + } + fids = tmp; + fidCap = newCap; + } + fids[fidCount++] = fid; + } + + // sort fids for binary search + if (fidCount > 1) { + for (int32_t i = 0; i < fidCount - 1; ++i) { + for (int32_t j = i + 1; j < fidCount; ++j) { + if (fids[i] > fids[j]) { + int32_t tmp = fids[i]; + fids[i] = fids[j]; + fids[j] = tmp; + } + } + } + } + + *ppFids = fids; + *pFidCount = fidCount; + return 0; +} + +int32_t tsdbDetermineFidSyncMode(STsdb* pTsdb, const void* pFileArr, int32_t fileCount, SHashObj** ppFidModeHash) { + int32_t code = 0; + SHashObj* pFidModeHash = NULL; + SHashObj* pLeaderKeyHash = NULL; + SHashObj* pFollowerKeyHash = NULL; + SHashObj* pFollowerFidSet = NULL; + const STsdbSnapFileInfo* files = (const STsdbSnapFileInfo*)pFileArr; + + if (fileCount <= 0 || files == NULL) { + *ppFidModeHash = NULL; + return 0; + } + + pFidModeHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (pFidModeHash == NULL) { + return terrno; + } + + // build leader file key hash: key=(fid,ftype,level,minVer,maxVer) -> (cid, size) + pLeaderKeyHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (pLeaderKeyHash == NULL) { + code = terrno; + goto _out; + } + + typedef struct { + int64_t cid; + int64_t size; + } SLeaderFileVal; + + (void)taosThreadMutexLock(&pTsdb->mutex); + + // populate leader key hash + { + STFileSet* fset; + TARRAY2_FOREACH(pTsdb->pFS->fSetArr, fset) { + for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) { + if (fset->farr[ftype] != NULL) { + char key[TSDB_SNAP_FILE_KEY_LEN]; + tsdbSnapFileKeyMake(fset->fid, ftype, 0, fset->farr[ftype]->f->minVer, fset->farr[ftype]->f->maxVer, key); + SLeaderFileVal val = {.cid = fset->farr[ftype]->f->cid, .size = fset->farr[ftype]->f->size}; + tsdbInfo("vgId:%d, leader FileInfo fid:%d ftype:%d level:%d minVer:%" PRId64 " maxVer:%" PRId64 + " cid:%" PRId64 " size:%" PRId64, + TD_VID(pTsdb->pVnode), fset->fid, ftype, 0, fset->farr[ftype]->f->minVer, + fset->farr[ftype]->f->maxVer, fset->farr[ftype]->f->cid, fset->farr[ftype]->f->size); + if (taosHashPut(pLeaderKeyHash, key, TSDB_SNAP_FILE_KEY_LEN, &val, sizeof(val)) != 0) { + code = terrno; + goto _unlock; + } + } + } + SSttLvl* lvl; + TARRAY2_FOREACH(fset->lvlArr, lvl) { + STFileObj* fobj; + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + char key[TSDB_SNAP_FILE_KEY_LEN]; + tsdbSnapFileKeyMake(fset->fid, TSDB_FTYPE_STT, lvl->level, fobj->f->minVer, fobj->f->maxVer, key); + SLeaderFileVal val = {.cid = fobj->f->cid, .size = fobj->f->size}; + tsdbInfo("vgId:%d, leader FileInfo fid:%d ftype:%d level:%d minVer:%" PRId64 " maxVer:%" PRId64 + " cid:%" PRId64 " size:%" PRId64, + TD_VID(pTsdb->pVnode), fset->fid, TSDB_FTYPE_STT, lvl->level, fobj->f->minVer, fobj->f->maxVer, + fobj->f->cid, fobj->f->size); + if (taosHashPut(pLeaderKeyHash, key, TSDB_SNAP_FILE_KEY_LEN, &val, sizeof(val)) != 0) { + code = terrno; + goto _unlock; + } + } + } + } + } + + // first pass: check each follower present file against leader + for (int32_t i = 0; i < fileCount; ++i) { + if (files[i].isMissing) continue; + + int32_t fid = files[i].fid; + + // skip if already FSET_LEVEL + uint8_t* pExistMode = taosHashGet(pFidModeHash, &fid, sizeof(fid)); + if (pExistMode != NULL && *pExistMode == TSDB_SNAP_SYNC_FSET_LEVEL) { + continue; + } + + char key[TSDB_SNAP_FILE_KEY_LEN]; + tsdbSnapFileKeyMake(fid, files[i].ftype, files[i].level, files[i].minVer, files[i].maxVer, key); + + SLeaderFileVal* pLeaderVal = taosHashGet(pLeaderKeyHash, key, TSDB_SNAP_FILE_KEY_LEN); + + uint8_t mode = TSDB_SNAP_SYNC_FILE_LEVEL; + if (pLeaderVal == NULL) { + tsdbInfo("vgId:%d, snap leader no match key fid:%d ftype:%d level:%d minVer:%" PRId64 " maxVer:%" PRId64, + TD_VID(pTsdb->pVnode), fid, files[i].ftype, files[i].level, files[i].minVer, files[i].maxVer); + mode = TSDB_SNAP_SYNC_FSET_LEVEL; + } else if (pLeaderVal->size != files[i].size) { + tsdbInfo("vgId:%d, snap size mismatch fid:%d ftype:%d level:%d leader-size:%" PRId64 " follower-size:%" PRId64, + TD_VID(pTsdb->pVnode), fid, files[i].ftype, files[i].level, pLeaderVal->size, files[i].size); + mode = TSDB_SNAP_SYNC_FSET_LEVEL; + } else if (llabs(pLeaderVal->cid - files[i].cid) > 10) { + tsdbInfo("vgId:%d, snap cid diff>10 fid:%d ftype:%d level:%d leader-cid:%" PRId64 " follower-cid:%" PRId64, + TD_VID(pTsdb->pVnode), fid, files[i].ftype, files[i].level, pLeaderVal->cid, files[i].cid); + mode = TSDB_SNAP_SYNC_FSET_LEVEL; + } + + if (taosHashPut(pFidModeHash, &fid, sizeof(fid), &mode, sizeof(mode)) != 0) { + code = terrno; + goto _unlock; + } + } + + // second pass: check leader files not known by follower (within reported fids) + pFollowerKeyHash = + taosHashInit(fileCount * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (pFollowerKeyHash == NULL) { + code = terrno; + goto _unlock; + } + + for (int32_t i = 0; i < fileCount; ++i) { + char key[TSDB_SNAP_FILE_KEY_LEN]; + tsdbSnapFileKeyMake(files[i].fid, files[i].ftype, files[i].level, files[i].minVer, files[i].maxVer, key); + char dummy = 0; + if (taosHashPut(pFollowerKeyHash, key, TSDB_SNAP_FILE_KEY_LEN, &dummy, sizeof(dummy)) != 0) { + code = terrno; + goto _unlock; + } + } + + // collect fids that follower reported + pFollowerFidSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (pFollowerFidSet == NULL) { + code = terrno; + goto _unlock; + } + for (int32_t i = 0; i < fileCount; ++i) { + char dummy = 0; + (void)taosHashPut(pFollowerFidSet, &files[i].fid, sizeof(files[i].fid), &dummy, sizeof(dummy)); + } + + { + STFileSet* fset; + TARRAY2_FOREACH(pTsdb->pFS->fSetArr, fset) { + int32_t fid = fset->fid; + + // skip fids not reported by follower + if (taosHashGet(pFollowerFidSet, &fid, sizeof(fid)) == NULL) continue; + + // skip if already FSET_LEVEL + uint8_t* pExistMode = taosHashGet(pFidModeHash, &fid, sizeof(fid)); + if (pExistMode != NULL && *pExistMode == TSDB_SNAP_SYNC_FSET_LEVEL) continue; + + for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) { + if (fset->farr[ftype] != NULL) { + char key[TSDB_SNAP_FILE_KEY_LEN]; + tsdbSnapFileKeyMake(fid, ftype, 0, fset->farr[ftype]->f->minVer, fset->farr[ftype]->f->maxVer, key); + if (taosHashGet(pFollowerKeyHash, key, TSDB_SNAP_FILE_KEY_LEN) == NULL) { + tsdbInfo("vgId:%d, snap follower missing leader file fid:%d ftype:%d minVer:%" PRId64 " maxVer:%" PRId64, + TD_VID(pTsdb->pVnode), fid, ftype, fset->farr[ftype]->f->minVer, fset->farr[ftype]->f->maxVer); + uint8_t mode = TSDB_SNAP_SYNC_FSET_LEVEL; + if (taosHashPut(pFidModeHash, &fid, sizeof(fid), &mode, sizeof(mode)) != 0) { + code = terrno; + goto _unlock; + } + goto _next_fset; + } + } + } + + SSttLvl* lvl; + TARRAY2_FOREACH(fset->lvlArr, lvl) { + STFileObj* fobj; + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + char key[TSDB_SNAP_FILE_KEY_LEN]; + tsdbSnapFileKeyMake(fid, TSDB_FTYPE_STT, lvl->level, fobj->f->minVer, fobj->f->maxVer, key); + if (taosHashGet(pFollowerKeyHash, key, TSDB_SNAP_FILE_KEY_LEN) == NULL) { + tsdbInfo("vgId:%d, snap follower missing leader stt fid:%d level:%d minVer:%" PRId64 " maxVer:%" PRId64, + TD_VID(pTsdb->pVnode), fid, lvl->level, fobj->f->minVer, fobj->f->maxVer); + uint8_t mode = TSDB_SNAP_SYNC_FSET_LEVEL; + if (taosHashPut(pFidModeHash, &fid, sizeof(fid), &mode, sizeof(mode)) != 0) { + code = terrno; + goto _unlock; + } + goto _next_fset; + } + } + } + _next_fset:; + } + } + +_unlock: + (void)taosThreadMutexUnlock(&pTsdb->mutex); + +_out: + if (pFollowerKeyHash) taosHashCleanup(pFollowerKeyHash); + if (pFollowerFidSet) taosHashCleanup(pFollowerFidSet); + if (pLeaderKeyHash) taosHashCleanup(pLeaderKeyHash); + + if (code != 0) { + taosHashCleanup(pFidModeHash); + *ppFidModeHash = NULL; + } else { + tsdbInfo("vgId:%d, Fid mode count %d", TD_VID(pTsdb->pVnode), taosHashGetSize(pFidModeHash)); + *ppFidModeHash = pFidModeHash; + } + return code; +} + +static int32_t tsdbCollectAllFileInfo(SVnode* pVnode, STsdbSnapFileInfo** ppFiles, int32_t* pFileCount) { + int32_t code = 0; + STsdbSnapFileInfo* files = NULL; + int32_t fileCount = 0; + int32_t fileCap = 0; + STsdb* pTsdb = pVnode->pTsdb; + + *ppFiles = NULL; + *pFileCount = 0; + + (void)taosThreadMutexLock(&pTsdb->mutex); + + STFileSet* fset; + TARRAY2_FOREACH(pTsdb->pFS->fSetArr, fset) { + // collect farr entries (HEAD, DATA, SMA, TOMB) + for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) { + if (fset->farr[ftype] != NULL) { + if (fileCount >= fileCap) { + int32_t newCap = fileCap == 0 ? 16 : fileCap * 2; + STsdbSnapFileInfo* tmp = taosMemoryRealloc(files, newCap * sizeof(STsdbSnapFileInfo)); + if (tmp == NULL) { + code = terrno; + goto _unlock; + } + files = tmp; + fileCap = newCap; + } + files[fileCount].fid = fset->fid; + files[fileCount].ftype = ftype; + files[fileCount].level = 0; + files[fileCount].minVer = fset->farr[ftype]->f->minVer; + files[fileCount].maxVer = fset->farr[ftype]->f->maxVer; + files[fileCount].cid = fset->farr[ftype]->f->cid; + files[fileCount].size = fset->farr[ftype]->f->size; + files[fileCount].isMissing = !taosCheckExistFile(fset->farr[ftype]->fname) ? 1 : 0; + tsdbInfo("vgId:%d, collect all file info, fid:%d ftype:%d level:%d minVer:%" PRId64 " maxVer:%" PRId64 + " cid:%" PRId64 " size:%" PRId64 " isMissing:%d", + TD_VID(pVnode), files[fileCount].fid, files[fileCount].ftype, files[fileCount].level, + files[fileCount].minVer, files[fileCount].maxVer, files[fileCount].cid, files[fileCount].size, + files[fileCount].isMissing); + fileCount++; + } + } + + // collect STT files in lvlArr + SSttLvl* lvl; + TARRAY2_FOREACH(fset->lvlArr, lvl) { + STFileObj* fobj; + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + if (fileCount >= fileCap) { + int32_t newCap = fileCap == 0 ? 16 : fileCap * 2; + STsdbSnapFileInfo* tmp = taosMemoryRealloc(files, newCap * sizeof(STsdbSnapFileInfo)); + if (tmp == NULL) { + code = terrno; + goto _unlock; + } + files = tmp; + fileCap = newCap; + } + files[fileCount].fid = fset->fid; + files[fileCount].ftype = TSDB_FTYPE_STT; + files[fileCount].level = lvl->level; + files[fileCount].minVer = fobj->f->minVer; + files[fileCount].maxVer = fobj->f->maxVer; + files[fileCount].cid = fobj->f->cid; + files[fileCount].size = fobj->f->size; + files[fileCount].isMissing = !taosCheckExistFile(fobj->fname) ? 1 : 0; + tsdbInfo("vgId:%d, collect all file info, fid:%d ftype:%d level:%d minVer:%" PRId64 " maxVer:%" PRId64 + " cid:%" PRId64 " size:%" PRId64 " isMissing:%d", + TD_VID(pVnode), files[fileCount].fid, files[fileCount].ftype, files[fileCount].level, + files[fileCount].minVer, files[fileCount].maxVer, files[fileCount].cid, files[fileCount].size, + files[fileCount].isMissing); + fileCount++; + } + } + } + +_unlock: + (void)taosThreadMutexUnlock(&pTsdb->mutex); + + if (code != 0) { + taosMemoryFree(files); + return code; + } + + *ppFiles = files; + *pFileCount = fileCount; + return 0; +} + +static int32_t tsdbMissingFilesEstSize(int32_t fileCount) { + return sizeof(SSyncTLV) + tMissingFileListDataLenCalc(fileCount); +} + +static int32_t tsdbMissingFilesSerialize(const STsdbSnapFileInfo* files, int32_t fileCount, void* buf, int32_t bufLen) { + SSyncTLV* pSubHead = buf; + int32_t tlen = tSerializeMissingFileList(pSubHead->val, bufLen - sizeof(*pSubHead), files, fileCount); + if (tlen < 0) return tlen; + pSubHead->typ = SNAP_DATA_MISSING_FIDS; + pSubHead->len = tlen; + return sizeof(*pSubHead) + tlen; +} + static int32_t tsdbRepOptsEstSize(STsdbRepOpts* pOpts) { int32_t dataLen = 0; dataLen += sizeof(SSyncTLV); @@ -578,9 +1093,8 @@ static int32_t tsdbSnapPrepDealWithSnapInfo(SVnode* pVnode, SSnapshot* pSnap, ST } } break; default: - code = TSDB_CODE_INVALID_MSG; - tsdbError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), pField->typ); - return code; + tsdbWarn("vgId:%d, unknown subfield type in snap info, skipping. typ:%d", TD_VID(pVnode), pField->typ); + break; } } @@ -598,7 +1112,9 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) { } // deal with snap info for reply - STsdbRepOpts opts = {.format = TSDB_SNAP_REP_FMT_RAW}; + STsdbRepOpts opts = {.format = TSDB_SNAP_REP_FMT_RAW}; + STsdbSnapFileInfo* missingFiles = NULL; + int32_t missingFileCount = 0; if (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY) { STsdbRepOpts leaderOpts = {0}; if ((code = tsdbSnapPrepDealWithSnapInfo(pVnode, pSnap, &leaderOpts)) < 0) { @@ -606,6 +1122,15 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) { goto _out; } opts.format = TMIN(opts.format, leaderOpts.format); + + int32_t detectCode = tsdbCollectAllFileInfo(pVnode, &missingFiles, &missingFileCount); + if (detectCode != 0) { + tsdbWarn("vgId:%d, failed to collect file info since %s, continuing without", TD_VID(pVnode), + tstrerror(detectCode)); + missingFileCount = 0; + } else if (missingFileCount > 0) { + tsdbInfo("vgId:%d, collected %d file info entries for snapshot", TD_VID(pVnode), missingFileCount); + } } // info data realloc @@ -613,6 +1138,9 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) { int32_t bufLen = headLen; bufLen += tsdbPartitionInfoEstSize(pInfo); bufLen += tsdbRepOptsEstSize(&opts); + if (missingFileCount > 0) { + bufLen += tsdbMissingFilesEstSize(missingFileCount); + } if ((code = syncSnapInfoDataRealloc(pSnap, bufLen)) != 0) { tsdbError("vgId:%d, failed to realloc memory for data of snap info. bytes:%d", TD_VID(pVnode), bufLen); goto _out; @@ -637,6 +1165,15 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) { } offset += tlen; + if (missingFileCount > 0) { + if ((tlen = tsdbMissingFilesSerialize(missingFiles, missingFileCount, buf + offset, bufLen - offset)) < 0) { + code = tlen; + tsdbError("vgId:%d, failed to serialize missing files since %s", TD_VID(pVnode), terrstr()); + goto _out; + } + offset += tlen; + } + // set header of info data SSyncTLV* pHead = pSnap->data; pHead->typ = pSnap->type; @@ -646,6 +1183,7 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) { pHead->len); _out: + taosMemoryFree(missingFiles); tsdbPartitionInfoClear(pInfo); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index b356beee453b..654203e1294e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -52,6 +52,10 @@ struct STsdbSnapReader { TTsdbIterArray tombIterArr[1]; SIterMerger* tombIterMerger; + // missing fid filter (NULL = send all, non-NULL = only send listed fids) + int32_t* missingFids; + int32_t missingFidCount; + // data SBlockData blockData[1]; STombBlock tombBlock[1]; @@ -198,12 +202,35 @@ static void tsdbSnapReadFileSetCloseIter(STsdbSnapReader* reader) { return; } +static bool tsdbSnapFidInMissingSet(int32_t fid, const int32_t* missingFids, int32_t missingFidCount) { + int32_t lo = 0, hi = missingFidCount - 1; + while (lo <= hi) { + int32_t mid = lo + (hi - lo) / 2; + if (missingFids[mid] == fid) return true; + if (missingFids[mid] < fid) + lo = mid + 1; + else + hi = mid - 1; + } + return false; +} + static int32_t tsdbSnapReadRangeBegin(STsdbSnapReader* reader) { int32_t code = 0; int32_t lino = 0; - if (reader->ctx->fsrArrIdx < TARRAY2_SIZE(reader->fsrArr)) { + while (reader->ctx->fsrArrIdx < TARRAY2_SIZE(reader->fsrArr)) { reader->ctx->fsr = TARRAY2_GET(reader->fsrArr, reader->ctx->fsrArrIdx++); + + // skip fids not in missing-fid filter + if (reader->missingFids != NULL && + !tsdbSnapFidInMissingSet(reader->ctx->fsr->fset->fid, reader->missingFids, reader->missingFidCount)) { + tsdbDebug("vgId:%d, snap reader skip fid:%d not in missing-fid set", TD_VID(reader->tsdb->pVnode), + reader->ctx->fsr->fset->fid); + reader->ctx->fsr = NULL; + continue; + } + reader->ctx->isDataDone = false; reader->ctx->isTombDone = false; @@ -212,6 +239,8 @@ static int32_t tsdbSnapReadRangeBegin(STsdbSnapReader* reader) { code = tsdbSnapReadFileSetOpenIter(reader); TSDB_CHECK_CODE(code, lino, _exit); + + return code; } _exit: @@ -441,7 +470,7 @@ static int32_t tsdbSnapReadTombData(STsdbSnapReader* reader, uint8_t** data) { } int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges, - STsdbSnapReader** reader) { + const int32_t* missingFids, int32_t missingFidCount, STsdbSnapReader** reader) { int32_t code = 0; int32_t lino = 0; @@ -453,7 +482,20 @@ int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, reader[0]->ever = ever; reader[0]->type = type; - code = tsdbFSCreateRefRangedSnapshot(tsdb->pFS, sver, ever, (TFileSetRangeArray*)pRanges, &reader[0]->fsrArr); + // copy missing fid filter + if (missingFids != NULL && missingFidCount > 0) { + reader[0]->missingFids = taosMemoryMalloc(missingFidCount * sizeof(int32_t)); + if (reader[0]->missingFids == NULL) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + memcpy(reader[0]->missingFids, missingFids, missingFidCount * sizeof(int32_t)); + reader[0]->missingFidCount = missingFidCount; + tsdbInfo("vgId:%d, snap reader opened with %d missing-fid filter", TD_VID(tsdb->pVnode), missingFidCount); + } + + code = tsdbFSCreateRefRangedSnapshot(tsdb->pFS, sver, ever, (TFileSetRangeArray*)pRanges, reader[0]->missingFids, + reader[0]->missingFidCount, &reader[0]->fsrArr); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -461,6 +503,7 @@ int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, tsdbError("vgId:%d %s failed at %s:%d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code), sver, ever, type); tsdbTFileSetRangeArrayDestroy(&reader[0]->fsrArr); + taosMemoryFree(reader[0]->missingFids); taosMemoryFree(reader[0]); reader[0] = NULL; } else { @@ -496,6 +539,7 @@ void tsdbSnapReaderClose(STsdbSnapReader** reader) { tBufferDestroy(reader[0]->buffers + i); } + taosMemoryFree(reader[0]->missingFids); taosMemoryFree(reader[0]); reader[0] = NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c b/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c index 254a9c5a2e1a..09e4f48486ca 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c @@ -45,9 +45,31 @@ typedef struct STsdbSnapRAWReader { // iter SDataFileRAWReaderIter dataIter[1]; + + // missing file filter + SHashObj* missingFileHash; // key=(fid,ftype,level,minVer,maxVer) — per-file filtering (not owned, do not free) + SHashObj* fidModeHash; // key=fid, val=uint8_t mode (not owned, do not free) + SHashObj* missingSttHash; // key=(fid,cid) — per-STT filtering (not owned, do not free) + int32_t* missingFids; // FID set for FID-level pre-filtering (owned, copy) + int32_t missingFidCount; } STsdbSnapRAWReader; -int32_t tsdbSnapRAWReaderOpen(STsdb* tsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** reader) { +static bool tsdbFidInMissingSet(int32_t fid, const int32_t* missingFids, int32_t missingFidCount) { + int32_t lo = 0, hi = missingFidCount - 1; + while (lo <= hi) { + int32_t mid = lo + (hi - lo) / 2; + if (missingFids[mid] == fid) return true; + if (missingFids[mid] < fid) + lo = mid + 1; + else + hi = mid - 1; + } + return false; +} + +int32_t tsdbSnapRAWReaderOpen(STsdb* tsdb, int64_t ever, int8_t type, void* pRanges, SHashObj* missingFileHash, + SHashObj* fidModeHash, SHashObj* missingSttHash, const int32_t* missingFids, + int32_t missingFidCount, STsdbSnapRAWReader** reader) { int32_t code = 0; int32_t lino = 0; @@ -58,19 +80,42 @@ int32_t tsdbSnapRAWReaderOpen(STsdb* tsdb, int64_t ever, int8_t type, STsdbSnapR reader[0]->ever = ever; reader[0]->type = type; - code = tsdbFSCreateRefSnapshot(tsdb->pFS, &reader[0]->fsetArr); + // set missing file filter (hash is borrowed, not owned) + reader[0]->missingFileHash = missingFileHash; + reader[0]->fidModeHash = fidModeHash; + reader[0]->missingSttHash = missingSttHash; + + // copy missing fid filter + if (missingFids != NULL && missingFidCount > 0) { + reader[0]->missingFids = taosMemoryMalloc(missingFidCount * sizeof(int32_t)); + if (reader[0]->missingFids == NULL) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + memcpy(reader[0]->missingFids, missingFids, missingFidCount * sizeof(int32_t)); + reader[0]->missingFidCount = missingFidCount; + tsdbInfo("vgId:%d, RAW reader opened with %d missing-fid filter", TD_VID(tsdb->pVnode), missingFidCount); + } + + TFileSetRangeArray* pTypedRanges = (TFileSetRangeArray*)pRanges; + if (pTypedRanges != NULL && TARRAY2_SIZE(pTypedRanges) > 0) { + code = tsdbFSCreateRefSnapshotWithRanges(tsdb->pFS, pTypedRanges, &reader[0]->fsetArr); + } else { + code = tsdbFSCreateRefSnapshot(tsdb->pFS, &reader[0]->fsetArr); + } TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s, sver:0, ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), __func__, - lino, tstrerror(code), ever, type); + tsdbError("vgId:%d %s failed at line %d since %s, ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), __func__, lino, + tstrerror(code), ever, type); tsdbFSDestroyRefSnapshot(&reader[0]->fsetArr); + taosMemoryFree(reader[0]->missingFids); taosMemoryFree(reader[0]); reader[0] = NULL; } else { - tsdbInfo("vgId:%d, tsdb snapshot raw reader opened. sver:0, ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), ever, - type); + tsdbInfo("vgId:%d, tsdb snapshot raw reader opened. ever:%" PRId64 " type:%d ranged:%d", TD_VID(tsdb->pVnode), ever, + type, (pTypedRanges != NULL && TARRAY2_SIZE(pTypedRanges) > 0)); } return code; } @@ -85,6 +130,8 @@ void tsdbSnapRAWReaderClose(STsdbSnapRAWReader** reader) { TARRAY2_DESTROY(reader[0]->dataReaderArr, tsdbDataFileRAWReaderClose); tsdbFSDestroyRefSnapshot(&reader[0]->fsetArr); + // missingFileHash is borrowed, not freed here + taosMemoryFree(reader[0]->missingFids); taosMemoryFree(reader[0]); reader[0] = NULL; return; @@ -94,12 +141,36 @@ static int32_t tsdbSnapRAWReadFileSetOpenReader(STsdbSnapRAWReader* reader) { int32_t code = 0; int32_t lino = 0; + // determine sync mode for this fid + int32_t curFid = reader->ctx->fset->fid; + bool fsetLevel = false; + if (reader->fidModeHash != NULL) { + uint8_t* pMode = taosHashGet(reader->fidModeHash, &curFid, sizeof(curFid)); + if (pMode != NULL && *pMode == TSDB_SNAP_SYNC_FSET_LEVEL) { + fsetLevel = true; + tsdbInfo("vgId:%d, RAW fid:%d using FSET_LEVEL sync (send all files)", TD_VID(reader->tsdb->pVnode), curFid); + } else { + tsdbInfo("vgId:%d, RAW fid:%d using FILE_LEVEL sync (send only missing files)", TD_VID(reader->tsdb->pVnode), + curFid); + } + } + // data for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) { if (reader->ctx->fset->farr[ftype] == NULL) { continue; } STFileObj* fobj = reader->ctx->fset->farr[ftype]; + // per-file filter: skip files not in missing set (only when FILE_LEVEL mode) + if (!fsetLevel && reader->missingFileHash != NULL) { + char key[TSDB_SNAP_FILE_KEY_LEN]; + tsdbSnapFileKeyMake(reader->ctx->fset->fid, ftype, 0, fobj->f->minVer, fobj->f->maxVer, key); + if (taosHashGet(reader->missingFileHash, key, sizeof(key)) == NULL) { + tsdbDebug("vgId:%d, RAW skip file fid:%d ftype:%d not in missing set", TD_VID(reader->tsdb->pVnode), + reader->ctx->fset->fid, ftype); + continue; + } + } SDataFileRAWReader* dataReader; SDataFileRAWReaderConfig config = { .tsdb = reader->tsdb, @@ -110,6 +181,8 @@ static int32_t tsdbSnapRAWReadFileSetOpenReader(STsdbSnapRAWReader* reader) { TSDB_CHECK_CODE(code, lino, _exit); code = TARRAY2_APPEND(reader->dataReaderArr, dataReader); + tsdbInfo("vgId:%d, RAW include file non-stt fid:%d ftype:%d in missing set", TD_VID(reader->tsdb->pVnode), + reader->ctx->fset->fid, ftype); TSDB_CHECK_CODE(code, lino, _exit); } @@ -118,6 +191,17 @@ static int32_t tsdbSnapRAWReadFileSetOpenReader(STsdbSnapRAWReader* reader) { TARRAY2_FOREACH(reader->ctx->fset->lvlArr, lvl) { STFileObj* fobj; TARRAY2_FOREACH(lvl->fobjArr, fobj) { + // per-file filter: skip stt files not in missing set (only when FILE_LEVEL mode) + if (!fsetLevel && reader->missingSttHash != NULL) { + char sttKey[TSDB_SNAP_FILE_KEY_LEN]; + tsdbSnapFileKeyMake(reader->ctx->fset->fid, TSDB_FTYPE_STT, lvl->level, fobj->f->minVer, fobj->f->maxVer, + sttKey); + if (taosHashGet(reader->missingSttHash, sttKey, sizeof(sttKey)) == NULL) { + tsdbDebug("vgId:%d, RAW skip stt file fid:%d cid:%" PRId64 " not in missing set", + TD_VID(reader->tsdb->pVnode), reader->ctx->fset->fid, fobj->f->cid); + continue; + } + } SDataFileRAWReader* dataReader; SDataFileRAWReaderConfig config = { .tsdb = reader->tsdb, @@ -128,6 +212,8 @@ static int32_t tsdbSnapRAWReadFileSetOpenReader(STsdbSnapRAWReader* reader) { TSDB_CHECK_CODE(code, lino, _exit); code = TARRAY2_APPEND(reader->dataReaderArr, dataReader); + tsdbInfo("vgId:%d, RAW include file stt fid:%d in missing set", TD_VID(reader->tsdb->pVnode), + reader->ctx->fset->fid); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -233,8 +319,19 @@ static int32_t tsdbSnapRAWReadBegin(STsdbSnapRAWReader* reader) { int32_t code = 0; int32_t lino = 0; - if (reader->ctx->fsetArrIdx < TARRAY2_SIZE(reader->fsetArr)) { + while (reader->ctx->fsetArrIdx < TARRAY2_SIZE(reader->fsetArr)) { reader->ctx->fset = TARRAY2_GET(reader->fsetArr, reader->ctx->fsetArrIdx++); + + // skip fids not in missing-fid filter + if (reader->missingFids != NULL && + !tsdbFidInMissingSet(reader->ctx->fset->fid, reader->missingFids, reader->missingFidCount)) { + tsdbDebug("vgId:%d, skip fid:%d not in missing-fid set", TD_VID(reader->tsdb->pVnode), reader->ctx->fset->fid); + reader->ctx->fset = NULL; + continue; + } + tsdbInfo("vgId:%d, RAW include fset fid:%d in missing-fid set", TD_VID(reader->tsdb->pVnode), + reader->ctx->fset->fid); + reader->ctx->isDataDone = false; code = tsdbSnapRAWReadFileSetOpenReader(reader); @@ -242,6 +339,8 @@ static int32_t tsdbSnapRAWReadBegin(STsdbSnapRAWReader* reader) { code = tsdbSnapRAWReadFileSetOpenIter(reader); TSDB_CHECK_CODE(code, lino, _exit); + + return code; } _exit: diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index feec2404b9c6..0d7646a1d380 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -53,6 +53,12 @@ struct SVSnapReader { // tsdb raw int8_t tsdbRAWDone; STsdbSnapRAWReader *pTsdbRAWReader; + // missing file filter + SHashObj *missingFileHash; // key=(fid,ftype,level,minVer,maxVer), val=dummy — for RAW mode per-file filtering + SHashObj *fidModeHash; // key=fid, val=uint8_t mode (FILE_LEVEL or FSET_LEVEL) + SHashObj *missingSttHash; // key=cid, val=dummy — for RAW mode per-STT filtering + int32_t *missingFids; // FID set extracted from file names — for Normal mode FID filtering + int32_t missingFidCount; // tq int8_t tqHandleDone; @@ -89,6 +95,11 @@ static TFileSetRangeArray **vnodeSnapReaderGetTsdbRanges(SVSnapReader *pReader, } } +static int32_t vnodeExtractMissingFids(SVSnapReader *pReader) { + return tsdbExtractMissingFids(pReader->pVnode->pTsdb, pReader->missingFileHash, &pReader->missingFids, + &pReader->missingFidCount); +} + static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotParam *pParam) { int32_t code = 0; SVnode *pVnode = pReader->pVnode; @@ -135,16 +146,48 @@ static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotP goto _out; } } break; + case SNAP_DATA_MISSING_FIDS: { + void *missingFiles = NULL; + int32_t missingFileCount = 0; + code = tDeserializeMissingFileList(buf, bufLen, &missingFiles, &missingFileCount, &pReader->missingFileHash, + &pReader->missingSttHash, TD_VID(pVnode)); + if (code) { + vError("vgId:%d, failed to deserialize missing file list since %s", TD_VID(pVnode), tstrerror(code)); + goto _out; + } + vInfo("vgId:%d, received %d file infos from follower, missing file:%d, missing stt file:%d", TD_VID(pVnode), + missingFileCount, (pReader->missingFileHash ? (int32_t)taosHashGetSize(pReader->missingFileHash) : 0), + (pReader->missingSttHash ? (int32_t)taosHashGetSize(pReader->missingSttHash) : 0)); + // determine sync mode per fid + if (missingFiles && missingFileCount > 0) { + code = tsdbDetermineFidSyncMode(pVnode->pTsdb, missingFiles, missingFileCount, &pReader->fidModeHash); + if (code) { + taosMemoryFree(missingFiles); + vError("vgId:%d, failed to determine fid sync mode since %s", TD_VID(pVnode), tstrerror(code)); + goto _out; + } + } + taosMemoryFree(missingFiles); + } break; default: - vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ); - code = TSDB_CODE_INVALID_DATA_FMT; - goto _out; + vWarn("vgId:%d, unknown subfield type in snap info, skipping. typ:%d", TD_VID(pVnode), subField->typ); + break; + } + } + + // extract FID set from missing file hash for normal mode filtering + if (pReader->missingFileHash && taosHashGetSize(pReader->missingFileHash) > 0) { + code = vnodeExtractMissingFids(pReader); + if (code) { + vError("vgId:%d, failed to extract missing fids from file hash since %s", TD_VID(pVnode), tstrerror(code)); + goto _out; } } // toggle snap replication mode - vInfo("vgId:%d, vnode snap reader supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format); - if (pReader->sver == 0 && tsdbOpts.format == TSDB_SNAP_REP_FMT_RAW) { + vInfo("vgId:%d, vnode snap reader supported tsdb rep of format:%d, sver:%" PRId64, TD_VID(pVnode), tsdbOpts.format, + pReader->sver); + if (tsdbOpts.format == TSDB_SNAP_REP_FMT_RAW) { pReader->tsdbDone = true; } else { pReader->tsdbRAWDone = true; @@ -179,7 +222,9 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader // open tsdb snapshot raw reader if (!pReader->tsdbRAWDone) { - code = tsdbSnapRAWReaderOpen(pVnode->pTsdb, ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader); + code = tsdbSnapRAWReaderOpen(pVnode->pTsdb, ever, SNAP_DATA_RAW, pReader->pRanges, pReader->missingFileHash, + pReader->fidModeHash, pReader->missingSttHash, pReader->missingFids, + pReader->missingFidCount, &pReader->pTsdbRAWReader); if (code) goto _exit; } @@ -247,6 +292,16 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) { tqSnapReaderClose(&pReader->pTqCheckInfoReader); } #endif + if (pReader->missingFileHash) { + taosHashCleanup(pReader->missingFileHash); + } + if (pReader->fidModeHash) { + taosHashCleanup(pReader->fidModeHash); + } + if (pReader->missingSttHash) { + taosHashCleanup(pReader->missingSttHash); + } + taosMemoryFree(pReader->missingFids); taosMemoryFree(pReader); } @@ -332,7 +387,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) // open if not if (pReader->pTsdbReader == NULL) { code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, pReader->pRanges, - &pReader->pTsdbReader); + pReader->missingFids, pReader->missingFidCount, &pReader->pTsdbReader); TSDB_CHECK_CODE(code, lino, _exit); } @@ -349,7 +404,9 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) if (!pReader->tsdbRAWDone) { // open if not if (pReader->pTsdbRAWReader == NULL) { - code = tsdbSnapRAWReaderOpen(pReader->pVnode->pTsdb, pReader->ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader); + code = tsdbSnapRAWReaderOpen(pReader->pVnode->pTsdb, pReader->ever, SNAP_DATA_RAW, pReader->pRanges, + pReader->missingFileHash, pReader->fidModeHash, pReader->missingSttHash, + pReader->missingFids, pReader->missingFidCount, &pReader->pTsdbRAWReader); TSDB_CHECK_CODE(code, lino, _exit); } @@ -578,10 +635,12 @@ static int32_t vnodeSnapWriterDealWithSnapInfo(SVSnapWriter *pWriter, SSnapshotP code = tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts); TSDB_CHECK_CODE(code, lino, _exit); } break; + case SNAP_DATA_MISSING_FIDS: { + vInfo("vgId:%d, snap writer received missing fids subfield, skipping (handled by reader)", TD_VID(pVnode)); + } break; default: - vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ); - TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit); - goto _exit; + vWarn("vgId:%d, unknown subfield type in snap info, skipping. typ:%d", TD_VID(pVnode), subField->typ); + break; } }