Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions source/dnode/vnode/src/inc/tsdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ extern "C" {
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSD INFO ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSD DEBUG ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSD TRACE ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)

#define tsdbInfoL(...) { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLongString("TSD INFO ", DEBUG_INFO, tsLogEmbedded ? 255 : tsdbDebugFlag, __VA_ARGS__); }}
// clang-format on

typedef struct TSDBROW TSDBROW;
Expand Down Expand Up @@ -727,6 +729,31 @@ typedef struct STsdbRepOpts {
int32_t tSerializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo);
int32_t tDeserializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo);

int32_t tMissingFileListDataLenCalc(int32_t fileCount);
int32_t tDeserializeMissingFileList(void *buf, int32_t bufLen, void **ppFiles, int32_t *pFileCount, SHashObj **ppHash,
SHashObj **ppSttHash, int32_t vgId);
int32_t tsdbExtractMissingFids(STsdb *pTsdb, SHashObj *missingFileHash, int32_t **ppFids, int32_t *pFidCount);
int32_t tsdbDetermineFidSyncMode(STsdb *pTsdb, const void *files, int32_t fileCount, SHashObj **ppFidModeHash);

#define TSDB_SNAP_SYNC_FILE_LEVEL 0
#define TSDB_SNAP_SYNC_FSET_LEVEL 1

// snap file 5-tuple key for version-range matching: (fid, ftype, level, minVer, maxVer)
#define TSDB_SNAP_FILE_KEY_LEN (sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int64_t) + sizeof(int64_t))
static inline void tsdbSnapFileKeyMake(int32_t fid, int32_t ftype, int32_t level, int64_t minVer, int64_t maxVer,
char key[TSDB_SNAP_FILE_KEY_LEN]) {
int32_t offset = 0;
memcpy(key + offset, &fid, sizeof(fid));
offset += sizeof(fid);
memcpy(key + offset, &ftype, sizeof(ftype));
offset += sizeof(ftype);
memcpy(key + offset, &level, sizeof(level));
offset += sizeof(level);
memcpy(key + offset, &minVer, sizeof(minVer));
offset += sizeof(minVer);
memcpy(key + offset, &maxVer, sizeof(maxVer));
}

// snap read
struct STsdbReadSnap {
SMemTable *pMem;
Expand Down
7 changes: 5 additions & 2 deletions source/dnode/vnode/src/inc/vnodeInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback);
// STsdbSnapReader ========================================
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges,
STsdbSnapReader** ppReader);
const int32_t* missingFids, int32_t missingFidCount, STsdbSnapReader** ppReader);
void tsdbSnapReaderClose(STsdbSnapReader** ppReader);
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
// STsdbSnapWriter ========================================
Expand All @@ -342,7 +342,9 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr);
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter, bool rollback);
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
// STsdbSnapRAWReader ========================================
int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** ppReader);
int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, void* pRanges, SHashObj* missingFileHash,
SHashObj* fidModeHash, SHashObj* missingSttHash, const int32_t* missingFids,
int32_t missingFidCount, STsdbSnapRAWReader** ppReader);
void tsdbSnapRAWReaderClose(STsdbSnapRAWReader** ppReader);
int32_t tsdbSnapRAWRead(STsdbSnapRAWReader* pReader, uint8_t** ppData);
// STsdbSnapRAWWriter ========================================
Expand Down Expand Up @@ -585,6 +587,7 @@ enum {
SNAP_DATA_STREAM_STATE_BACKEND = 12,
SNAP_DATA_TQ_CHECKINFO = 13,
SNAP_DATA_RAW = 14,
SNAP_DATA_MISSING_FIDS = 16,
};

struct SSnapDataHdr {
Expand Down
2 changes: 1 addition & 1 deletion source/dnode/vnode/src/sma/smaSnapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapRead
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pSma->pRSmaTsdb[i]) {
code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, (i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2), NULL,
&pReader->pDataReader[i]);
NULL, 0, &pReader->pDataReader[i]);
TAOS_CHECK_GOTO(code, &lino, _exit);
}
}
Expand Down
74 changes: 73 additions & 1 deletion source/dnode/vnode/src/tsdb/tsdbFS2.c
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@ static int32_t load_fs(STsdb *pTsdb, const char *fname, TFileSetArray *arr) {
code = load_json(fname, &json);
TSDB_CHECK_CODE(code, lino, _exit);

char *jsonStr = cJSON_Print(json);
if (jsonStr != NULL) {
tsdbInfoL("vgId:%d, load_fs file:%s content:%s", TD_VID(pTsdb->pVnode), fname, jsonStr);
taosMemoryFree(jsonStr);
}

// parse json
const cJSON *item1;

Expand Down Expand Up @@ -1086,6 +1092,52 @@ static SHashObj *tsdbFSetRangeArrayToHash(TFileSetRangeArray *pRanges) {
return pHash;
}

int32_t tsdbFSCreateRefSnapshotWithRanges(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr) {
int32_t code = 0;
STFileSet *fset, *fset1;
SHashObj *pHash = NULL;

fsetArr[0] = taosMemoryCalloc(1, sizeof(*fsetArr[0]));
if (fsetArr[0] == NULL) return terrno;

pHash = tsdbFSetRangeArrayToHash(pRanges);
if (pHash == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _out;
}

(void)taosThreadMutexLock(&fs->tsdb->mutex);
TARRAY2_FOREACH(fs->fSetArr, fset) {
int32_t fid = fset->fid;
if (taosHashGet(pHash, &fid, sizeof(fid)) == NULL) {
tsdbDebug("skip fid:%d, not in ranges", fid);
continue;
}

code = tsdbTFileSetInitRef(fs->tsdb, fset, &fset1);
if (code) break;

code = TARRAY2_APPEND(fsetArr[0], fset1);
if (code) {
tsdbTFileSetClear(&fset1);
break;
}
}
(void)taosThreadMutexUnlock(&fs->tsdb->mutex);

if (code) {
TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
taosMemoryFree(fsetArr[0]);
fsetArr[0] = NULL;
}
Comment on lines +1117 to +1132

_out:
if (pHash) {
taosHashCleanup(pHash);
}
return code;
}

int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr,
TFileOpArray *fopArr) {
int32_t code = 0;
Expand Down Expand Up @@ -1139,7 +1191,21 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRa

void tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr) { tsdbFSDestroyCopySnapshot(fsetArr); }

static bool tsdbFSFidInMissingSet(int32_t fid, const int32_t *missingFids, int32_t missingFidCount) {
int32_t lo = 0, hi = missingFidCount - 1;
while (lo <= hi) {
int32_t mid = lo + (hi - lo) / 2;
if (missingFids[mid] == fid) return true;
if (missingFids[mid] < fid)
lo = mid + 1;
else
hi = mid - 1;
}
return false;
}

int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges,
const int32_t *missingFids, int32_t missingFidCount,
TFileSetRangeArray **fsrArr) {
int32_t code = 0;
STFileSet *fset;
Expand All @@ -1166,6 +1232,12 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
int64_t sver1 = sver;
int64_t ever1 = ever;

// skip fids not in missing-fid filter
if (missingFids != NULL && !tsdbFSFidInMissingSet(fset->fid, missingFids, missingFidCount)) {
tsdbDebug("vgId:%d, skip fid:%d not in missing-fid set", TD_VID(fs->tsdb->pVnode), fset->fid);
continue;
}

if (pHash) {
int32_t fid = fset->fid;
STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid));
Expand All @@ -1180,7 +1252,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
continue;
}

tsdbDebug("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);
tsdbInfo("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);

code = tsdbTFileSetRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1);
if (code) break;
Expand Down
3 changes: 2 additions & 1 deletion source/dnode/vnode/src/tsdb/tsdbFS2.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr);
void tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr);
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr);
int32_t tsdbFSCreateRefSnapshotWithoutLock(STFileSystem *fs, TFileSetArray **fsetArr);
int32_t tsdbFSCreateRefSnapshotWithRanges(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr);
void tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr);

int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pExclude, TFileSetArray **fsetArr,
TFileOpArray *fopArr);
void tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr);
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges,
TFileSetRangeArray **fsrArr);
const int32_t *missingFids, int32_t missingFidCount, TFileSetRangeArray **fsrArr);
void tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr);
// txn
int64_t tsdbFSAllocEid(STFileSystem *fs);
Expand Down
Loading
Loading