diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json
new file mode 100644
index 00000000..8be2bc62
--- /dev/null
+++ b/.vscode/c_cpp_properties.json
@@ -0,0 +1,21 @@
+{
+ "configurations": [
+ {
+ "name": "Linux",
+ "includePath": [
+ "${workspaceFolder}/**",
+ "${workspaceFolder}/AnnService/inc",
+ "${workspaceFolder}/Test/inc",
+ "${workspaceFolder}/ThirdParty/**",
+ "/usr/include/**"
+ ],
+ "defines": [],
+ "compilerPath": "/usr/bin/g++",
+ "cStandard": "c17",
+ "cppStandard": "c++17",
+ "intelliSenseMode": "gcc-x64",
+ "compileCommands": "${workspaceFolder}/build/compile_commands.json"
+ }
+ ],
+ "version": 4
+}
\ No newline at end of file
diff --git a/.vscode/launch.json b/.vscode/launch.json
new file mode 100644
index 00000000..bed3879a
--- /dev/null
+++ b/.vscode/launch.json
@@ -0,0 +1,219 @@
+{
+ "version": "2.0.0",
+ "configurations": [
+ {
+ "name": "(gdb) Launch SPFresh Test",
+ "type": "cppdbg",
+ "request": "launch",
+ "preLaunchTask": "Build Debug",
+ "stopAtEntry": false,
+ "program": "${workspaceFolder}/Debug/SPTAGTest",
+ "args": [ "--run_test=SPFreshTest/CacheTest" ],
+ "cwd": "${workspaceFolder}/Debug",
+ "environment": [
+ {
+ "name": "UDF_RUNTIME_DIR",
+ "value": "${workspaceFolder}"
+ },
+ {
+ "name": "STARROCKS_HOME",
+ "value": "${workspaceFolder}"
+ },
+ {
+ "name": "PYTHON_INSTALL_DIR",
+ "value": "${workspaceFolder}/thirdparty/installed/python"
+ },
+ {
+ "name": "LD_LIBRARY_PATH",
+ "value": "/usr/lib/jvm/java-1.11.0-openjdk-amd64/lib/server:${workspaceFolder}/Debug"
+ },
+ {
+ "name": "LD_PRELOAD",
+ "value": "/usr/lib/gcc/x86_64-linux-gnu/11/libasan.so"
+ },
+ {
+ "name": "PCI_ALLOWED",
+ "value": "1462:00:00.0"
+ },
+ {
+ "name": "SPFRESH_SPDK_USE_SSD_IMPL",
+ "value": "1"
+ },
+ {
+ "name": "SPFRESH_SPDK_CONF",
+ "value": "./bdev.json"
+ },
+ {
+ "name": "SPFRESH_SPDK_BDEV",
+ "value": "Nvme0n1"
+ },
+ {
+ "name": "BENCHMARK_CONFIG",
+ "value": "benchmark.ini"
+ }
+ ],
+ "externalConsole": false,
+ "MIMode": "gdb",
+ "miDebuggerPath": "/usr/bin/gdb",
+ "additionalSOLibSearchPath": "${workspaceFolder}/Debug",
+ "setupCommands": [
+ {
+ "description": "Enable pretty-printing for gdb",
+ "text": "-enable-pretty-printing",
+ "ignoreFailures": true
+ },
+ {
+ "description": "Set Disassembly Flavor to Intel",
+ "text": "-gdb-set disassembly-flavor intel",
+ "ignoreFailures": true
+ },
+ {
+ "description": "Skip standard library files",
+ "text": "-gdb-set skip-solib-deps on",
+ "ignoreFailures": true
+ }
+ ]
+ },
+ {
+ "name": "(gdb) Launch All Tests",
+ "type": "cppdbg",
+ "request": "launch",
+ "stopAtEntry": false,
+ "program": "${workspaceFolder}/Debug/SPTAGTest",
+ "args": [ "--run_test=SPFreshTest" ],
+ "cwd": "${workspaceFolder}/Debug",
+ "environment": [
+ {
+ "name": "UDF_RUNTIME_DIR",
+ "value": "${workspaceFolder}"
+ },
+ {
+ "name": "STARROCKS_HOME",
+ "value": "${workspaceFolder}"
+ },
+ {
+ "name": "PYTHON_INSTALL_DIR",
+ "value": "${workspaceFolder}/thirdparty/installed/python"
+ },
+ {
+ "name": "LD_LIBRARY_PATH",
+ "value": "/usr/lib/jvm/java-1.11.0-openjdk-amd64/lib/server:${workspaceFolder}/Debug"
+ },
+ {
+ "name": "LD_PRELOAD",
+ "value": "/usr/lib/gcc/x86_64-linux-gnu/11/libasan.so"
+ },
+ {
+ "name": "PCI_ALLOWED",
+ "value": "1462:00:00.0"
+ },
+ {
+ "name": "SPFRESH_SPDK_USE_SSD_IMPL",
+ "value": "1"
+ },
+ {
+ "name": "SPFRESH_SPDK_CONF",
+ "value": "./bdev.json"
+ },
+ {
+ "name": "SPFRESH_SPDK_BDEV",
+ "value": "Nvme0n1"
+ }
+ ],
+ "externalConsole": false,
+ "MIMode": "gdb",
+ "miDebuggerPath": "/usr/bin/gdb",
+ "additionalSOLibSearchPath": "${workspaceFolder}/Debug",
+ "setupCommands": [
+ {
+ "description": "Enable pretty-printing for gdb",
+ "text": "-enable-pretty-printing",
+ "ignoreFailures": true
+ },
+ {
+ "description": "Set Disassembly Flavor to Intel",
+ "text": "-gdb-set disassembly-flavor intel",
+ "ignoreFailures": true
+ },
+ {
+ "description": "Skip standard library files",
+ "text": "-gdb-set skip-solib-deps on",
+ "ignoreFailures": true
+ }
+ ]
+ },
+ {
+ "name": "(gdb) Launch SPFresh Benchmark",
+ "type": "cppdbg",
+ "request": "launch",
+ "preLaunchTask": "Build Debug",
+ "stopAtEntry": false,
+ "program": "${workspaceFolder}/Debug/SPTAGTest",
+ "args": [ "--run_test=SPFreshTest/BenchmarkFromConfig" ],
+ "cwd": "${workspaceFolder}/Debug",
+ "environment": [
+ {
+ "name": "UDF_RUNTIME_DIR",
+ "value": "${workspaceFolder}"
+ },
+ {
+ "name": "STARROCKS_HOME",
+ "value": "${workspaceFolder}"
+ },
+ {
+ "name": "PYTHON_INSTALL_DIR",
+ "value": "${workspaceFolder}/thirdparty/installed/python"
+ },
+ {
+ "name": "LD_LIBRARY_PATH",
+ "value": "/usr/lib/jvm/java-1.11.0-openjdk-amd64/lib/server:${workspaceFolder}/Debug"
+ },
+ {
+ "name": "LD_PRELOAD",
+ "value": "/usr/lib/gcc/x86_64-linux-gnu/11/libasan.so"
+ },
+ {
+ "name": "PCI_ALLOWED",
+ "value": "1462:00:00.0"
+ },
+ {
+ "name": "SPFRESH_SPDK_USE_SSD_IMPL",
+ "value": "1"
+ },
+ {
+ "name": "SPFRESH_SPDK_CONF",
+ "value": "./bdev.json"
+ },
+ {
+ "name": "SPFRESH_SPDK_BDEV",
+ "value": "Nvme0n1"
+ },
+ {
+ "name": "BENCHMARK_CONFIG",
+ "value": "${workspaceFolder}/Debug/benchmark.ini"
+ }
+ ],
+ "externalConsole": false,
+ "MIMode": "gdb",
+ "miDebuggerPath": "/usr/bin/gdb",
+ "additionalSOLibSearchPath": "${workspaceFolder}/Debug",
+ "setupCommands": [
+ {
+ "description": "Enable pretty-printing for gdb",
+ "text": "-enable-pretty-printing",
+ "ignoreFailures": true
+ },
+ {
+ "description": "Set Disassembly Flavor to Intel",
+ "text": "-gdb-set disassembly-flavor intel",
+ "ignoreFailures": true
+ },
+ {
+ "description": "Skip standard library files",
+ "text": "-gdb-set skip-solib-deps on",
+ "ignoreFailures": true
+ }
+ ]
+ }
+ ]
+}
\ No newline at end of file
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 00000000..42f8caeb
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,103 @@
+{
+ "debug.allowBreakpointsEverywhere": true,
+ "debug.disassemblyView.showSourceCode": true,
+ "C_Cpp.debugging.engineLogging": false,
+ "C_Cpp.loggingLevel": "Information",
+ "files.associations": {
+ "array": "cpp",
+ "bitset": "cpp",
+ "string_view": "cpp",
+ "initializer_list": "cpp",
+ "ranges": "cpp",
+ "span": "cpp",
+ "regex": "cpp",
+ "utility": "cpp",
+ "valarray": "cpp",
+ "barrier": "cpp",
+ "__hash_table": "cpp",
+ "__split_buffer": "cpp",
+ "__tree": "cpp",
+ "deque": "cpp",
+ "iterator": "cpp",
+ "list": "cpp",
+ "map": "cpp",
+ "queue": "cpp",
+ "random": "cpp",
+ "set": "cpp",
+ "stack": "cpp",
+ "string": "cpp",
+ "unordered_map": "cpp",
+ "unordered_set": "cpp",
+ "vector": "cpp",
+ "atomic": "cpp",
+ "cctype": "cpp",
+ "clocale": "cpp",
+ "cmath": "cpp",
+ "csetjmp": "cpp",
+ "csignal": "cpp",
+ "cstdarg": "cpp",
+ "cstddef": "cpp",
+ "cstdio": "cpp",
+ "cstdlib": "cpp",
+ "cstring": "cpp",
+ "ctime": "cpp",
+ "cwchar": "cpp",
+ "cwctype": "cpp",
+ "any": "cpp",
+ "strstream": "cpp",
+ "bit": "cpp",
+ "cfenv": "cpp",
+ "charconv": "cpp",
+ "chrono": "cpp",
+ "cinttypes": "cpp",
+ "codecvt": "cpp",
+ "compare": "cpp",
+ "complex": "cpp",
+ "concepts": "cpp",
+ "condition_variable": "cpp",
+ "coroutine": "cpp",
+ "cstdint": "cpp",
+ "cuchar": "cpp",
+ "forward_list": "cpp",
+ "exception": "cpp",
+ "algorithm": "cpp",
+ "functional": "cpp",
+ "memory": "cpp",
+ "memory_resource": "cpp",
+ "numeric": "cpp",
+ "optional": "cpp",
+ "ratio": "cpp",
+ "source_location": "cpp",
+ "system_error": "cpp",
+ "tuple": "cpp",
+ "type_traits": "cpp",
+ "fstream": "cpp",
+ "future": "cpp",
+ "iomanip": "cpp",
+ "iosfwd": "cpp",
+ "iostream": "cpp",
+ "istream": "cpp",
+ "latch": "cpp",
+ "limits": "cpp",
+ "mutex": "cpp",
+ "new": "cpp",
+ "numbers": "cpp",
+ "ostream": "cpp",
+ "scoped_allocator": "cpp",
+ "semaphore": "cpp",
+ "shared_mutex": "cpp",
+ "sstream": "cpp",
+ "stdexcept": "cpp",
+ "stop_token": "cpp",
+ "streambuf": "cpp",
+ "syncstream": "cpp",
+ "thread": "cpp",
+ "typeindex": "cpp",
+ "typeinfo": "cpp",
+ "variant": "cpp",
+ "expected": "cpp",
+ "spanstream": "cpp",
+ "stacktrace": "cpp",
+ "__nullptr": "cpp"
+ }
+}
\ No newline at end of file
diff --git a/.vscode/tasks.json b/.vscode/tasks.json
new file mode 100644
index 00000000..cbd8b4c7
--- /dev/null
+++ b/.vscode/tasks.json
@@ -0,0 +1,27 @@
+{
+ "version": "2.0.0",
+ "tasks": [
+ {
+ "label": "Build Debug",
+ "type": "shell",
+ "command": "make",
+ "args": ["-j4"],
+ "group": {
+ "kind": "build",
+ "isDefault": true
+ },
+ "options": {
+ "cwd": "${workspaceFolder}/build"
+ },
+ "problemMatcher": ["$gcc"],
+ "presentation": {
+ "echo": true,
+ "reveal": "always",
+ "focus": false,
+ "panel": "shared",
+ "showReuseMessage": true,
+ "clear": false
+ }
+ }
+ ]
+}
\ No newline at end of file
diff --git a/AnnService/Aggregator.vcxproj b/AnnService/Aggregator.vcxproj
index a969443c..6a01565d 100644
--- a/AnnService/Aggregator.vcxproj
+++ b/AnnService/Aggregator.vcxproj
@@ -29,13 +29,13 @@
Application
true
- v142
+ v143
MultiByte
Application
false
- v142
+ v143
true
MultiByte
diff --git a/AnnService/BalancedDataPartition.vcxproj b/AnnService/BalancedDataPartition.vcxproj
index f4a71265..bcf75997 100644
--- a/AnnService/BalancedDataPartition.vcxproj
+++ b/AnnService/BalancedDataPartition.vcxproj
@@ -29,13 +29,13 @@
Application
true
- v142
+ v143
MultiByte
Application
false
- v142
+ v143
true
MultiByte
diff --git a/AnnService/Client.vcxproj b/AnnService/Client.vcxproj
index fdf22990..0802f05d 100644
--- a/AnnService/Client.vcxproj
+++ b/AnnService/Client.vcxproj
@@ -29,13 +29,13 @@
Application
true
- v142
+ v143
MultiByte
Application
false
- v142
+ v143
true
MultiByte
diff --git a/AnnService/CoreLibrary.vcxproj b/AnnService/CoreLibrary.vcxproj
index 721d74d4..5a66222f 100644
--- a/AnnService/CoreLibrary.vcxproj
+++ b/AnnService/CoreLibrary.vcxproj
@@ -94,7 +94,7 @@
-
+
diff --git a/AnnService/CoreLibrary.vcxproj.filters b/AnnService/CoreLibrary.vcxproj.filters
index 3f7b3fa1..63b5e799 100644
--- a/AnnService/CoreLibrary.vcxproj.filters
+++ b/AnnService/CoreLibrary.vcxproj.filters
@@ -160,7 +160,7 @@
Header Files\Helper
-
+
Header Files\Core\Common
diff --git a/AnnService/IndexBuilder.vcxproj b/AnnService/IndexBuilder.vcxproj
index deb392b0..0648b8e8 100644
--- a/AnnService/IndexBuilder.vcxproj
+++ b/AnnService/IndexBuilder.vcxproj
@@ -29,13 +29,13 @@
Application
true
- v142
+ v143
MultiByte
Application
false
- v142
+ v143
true
MultiByte
diff --git a/AnnService/IndexSearcher.vcxproj b/AnnService/IndexSearcher.vcxproj
index 77b43c57..459e9ebc 100644
--- a/AnnService/IndexSearcher.vcxproj
+++ b/AnnService/IndexSearcher.vcxproj
@@ -30,13 +30,13 @@
Application
true
- v142
+ v143
MultiByte
Application
false
- v142
+ v143
true
MultiByte
diff --git a/AnnService/SSDServing.vcxproj b/AnnService/SSDServing.vcxproj
index da8261b0..d93a42fc 100644
--- a/AnnService/SSDServing.vcxproj
+++ b/AnnService/SSDServing.vcxproj
@@ -40,13 +40,13 @@
Application
true
- v142
+ v143
MultiByte
Application
false
- v142
+ v143
true
MultiByte
diff --git a/AnnService/Server.vcxproj b/AnnService/Server.vcxproj
index fefafe8b..abcaa946 100644
--- a/AnnService/Server.vcxproj
+++ b/AnnService/Server.vcxproj
@@ -29,13 +29,13 @@
Application
true
- v142
+ v143
MultiByte
Application
false
- v142
+ v143
true
MultiByte
diff --git a/AnnService/SocketLib.vcxproj b/AnnService/SocketLib.vcxproj
index 4a28ae73..95b3b44e 100644
--- a/AnnService/SocketLib.vcxproj
+++ b/AnnService/SocketLib.vcxproj
@@ -28,13 +28,13 @@
StaticLibrary
true
- v142
+ v143
MultiByte
StaticLibrary
false
- v142
+ v143
true
MultiByte
diff --git a/AnnService/inc/Core/BKT/Index.h b/AnnService/inc/Core/BKT/Index.h
index dd9734be..8d5c41cc 100644
--- a/AnnService/inc/Core/BKT/Index.h
+++ b/AnnService/inc/Core/BKT/Index.h
@@ -15,7 +15,7 @@
#include "inc/Core/Common/WorkSpacePool.h"
#include "inc/Core/Common/RelativeNeighborhoodGraph.h"
#include "inc/Core/Common/BKTree.h"
-#include "inc/Core/Common/Labelset.h"
+#include "inc/Core/Common/LabelSet.h"
#include "inc/Helper/SimpleIniReader.h"
#include "inc/Helper/StringConvert.h"
#include "inc/Helper/ThreadPool.h"
@@ -94,7 +94,7 @@ namespace SPTAG
float m_fDeletePercentageForRefine;
std::mutex m_dataAddLock; // protect data and graph
std::shared_timed_mutex m_dataDeleteLock;
- COMMON::Labelset m_deletedID;
+ COMMON::LabelSet m_deletedID;
Helper::ThreadPool m_threadPool;
int m_iNumberOfThreads;
@@ -230,12 +230,12 @@ namespace SPTAG
int SearchIndexIterative(COMMON::QueryResultSet& p_query, COMMON::WorkSpace& p_space, bool p_isFirst, int batch, bool p_searchDeleted, bool p_searchDuplicated) const;
- template &, SizeType, float), bool (*checkFilter)(const std::shared_ptr &, SizeType, std::function)>
+ template &, SizeType, float), bool (*checkFilter)(const std::shared_ptr &, SizeType, std::function)>
int SearchIterative(COMMON::QueryResultSet& p_query,
COMMON::WorkSpace& p_space, bool p_isFirst, int batch) const;
void SearchIndex(COMMON::QueryResultSet &p_query, COMMON::WorkSpace &p_space, bool p_searchDeleted, bool p_searchDuplicated, std::function filterFunc = nullptr) const;
- template &, SizeType, float), bool(*checkFilter)(const std::shared_ptr&, SizeType, std::function)>
+ template &, SizeType, float), bool(*checkFilter)(const std::shared_ptr&, SizeType, std::function)>
void Search(COMMON::QueryResultSet& p_query, COMMON::WorkSpace& p_space, std::function filterFunc) const;
};
} // namespace BKT
diff --git a/AnnService/inc/Core/Common/CommonUtils.h b/AnnService/inc/Core/Common/CommonUtils.h
index 905de2cd..697eb521 100644
--- a/AnnService/inc/Core/Common/CommonUtils.h
+++ b/AnnService/inc/Core/Common/CommonUtils.h
@@ -100,6 +100,30 @@ namespace SPTAG
}
}
}
+
+ static void PrintPostingDiff(std::string &p1, std::string &p2, const char *pos)
+ {
+ if (p1.size() != p2.size())
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Error,
+ "Merge %s: p1 and p2 have different sizes: before=%u after=%u\n", pos, p1.size(),
+ p2.size());
+ return;
+ }
+ std::string diff = "";
+ for (size_t i = 0; i < p1.size(); i++)
+ {
+ if (p1[i] != p2[i])
+ {
+ diff += "[" + std::to_string(i) + "]:" + std::to_string(int(p1[i])) + "^" +
+ std::to_string(int(p2[i])) + " ";
+ }
+ }
+ if (diff.size() != 0)
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "%s: %s\n", pos, diff.substr(0, 1000).c_str());
+ }
+ }
};
}
}
diff --git a/AnnService/inc/Core/Common/Dataset.h b/AnnService/inc/Core/Common/Dataset.h
index a5abaf74..dcdebbb0 100644
--- a/AnnService/inc/Core/Common/Dataset.h
+++ b/AnnService/inc/Core/Common/Dataset.h
@@ -260,11 +260,17 @@ namespace SPTAG
{
if (data != nullptr) {
if (ownData) ALIGN_FREE(data);
- for (char* ptr : *incBlocks) ALIGN_FREE(ptr);
- incBlocks->clear();
}
-
rows = rows_;
+
+ if (incBlocks != nullptr)
+ {
+ for (char *ptr : *incBlocks)
+ ALIGN_FREE(ptr);
+ incBlocks->clear();
+ }
+ incRows = 0;
+
if (rowEnd_ >= colStart_) cols = rowEnd_;
else cols = cols_ * sizeof(T);
data = (char*)data_;
@@ -390,11 +396,7 @@ namespace SPTAG
IOBINARY(pInput, ReadBinary, sizeof(SizeType), (char*)&(r));
IOBINARY(pInput, ReadBinary, sizeof(DimensionType), (char*)(&c));
- if (data == nullptr) Initialize(r, c, blockSize, capacity);
- else if (r > rows + incRows) {
- SPTAGLIB_LOG(Helper::LogLevel::LL_Warning, "Read more data (%d, %d) than before (%d, %d)!\n", r, c, rows + incRows, mycols);
- if(AddBatch(r - rows - incRows) != ErrorCode::Success) return ErrorCode::MemoryOverFlow;
- }
+ if (data == nullptr || r != rows + incRows) Initialize(r, c, blockSize, capacity);
for (SizeType i = 0; i < r; i++) {
IOBINARY(pInput, ReadBinary, sizeof(T) * mycols, (char*)At(i));
diff --git a/AnnService/inc/Core/Common/Labelset.h b/AnnService/inc/Core/Common/Labelset.h
index c34f563d..65b5ba7f 100644
--- a/AnnService/inc/Core/Common/Labelset.h
+++ b/AnnService/inc/Core/Common/Labelset.h
@@ -11,7 +11,7 @@ namespace SPTAG
{
namespace COMMON
{
- class Labelset
+ class LabelSet
{
public:
enum class InvalidIDBehavior
@@ -23,10 +23,10 @@ namespace SPTAG
private:
std::atomic m_inserted;
Dataset m_data;
- InvalidIDBehavior m_invalidIDBehaviorSetting;
+ InvalidIDBehavior m_invalidIDBehaviorSetting{};
public:
- Labelset()
+ LabelSet()
{
m_inserted = 0;
m_data.SetName("DeleteID");
diff --git a/AnnService/inc/Core/KDT/Index.h b/AnnService/inc/Core/KDT/Index.h
index 9d11dac4..064a7ac1 100644
--- a/AnnService/inc/Core/KDT/Index.h
+++ b/AnnService/inc/Core/KDT/Index.h
@@ -15,7 +15,7 @@
#include "inc/Core/Common/WorkSpacePool.h"
#include "inc/Core/Common/RelativeNeighborhoodGraph.h"
#include "inc/Core/Common/KDTree.h"
-#include "inc/Core/Common/Labelset.h"
+#include "inc/Core/Common/LabelSet.h"
#include "inc/Helper/SimpleIniReader.h"
#include "inc/Helper/StringConvert.h"
#include "inc/Helper/ThreadPool.h"
@@ -71,7 +71,7 @@ namespace SPTAG
float m_fDeletePercentageForRefine;
std::mutex m_dataAddLock; // protect data and graph
std::shared_timed_mutex m_dataDeleteLock;
- COMMON::Labelset m_deletedID;
+ COMMON::LabelSet m_deletedID;
Helper::ThreadPool m_threadPool;
int m_iNumberOfThreads;
@@ -204,7 +204,7 @@ namespace SPTAG
private:
template
void SearchIndex(COMMON::QueryResultSet &p_query, COMMON::WorkSpace &p_space, bool p_searchDeleted) const;
- template
+ template
void Search(COMMON::QueryResultSet& p_query, COMMON::WorkSpace& p_space) const;
};
} // namespace KDT
diff --git a/AnnService/inc/Core/MetadataSet.h b/AnnService/inc/Core/MetadataSet.h
index d16c037f..0325bda3 100644
--- a/AnnService/inc/Core/MetadataSet.h
+++ b/AnnService/inc/Core/MetadataSet.h
@@ -93,7 +93,7 @@ class MemMetadataSet : public MetadataSet
std::uint64_t p_blockSize, std::uint64_t p_capacity, std::uint64_t p_metaSize);
MemMetadataSet(const std::string& p_metafile, const std::string& p_metaindexfile,
- std::uint64_t p_blockSize, std::uint64_t p_capacity, std::uint64_t p_metaSize);
+ std::uint64_t p_blockSize, std::uint64_t p_capacity, std::uint64_t p_metaSize, int start = 0, int count = -1);
MemMetadataSet(std::shared_ptr p_metain, std::shared_ptr p_metaindexin,
std::uint64_t p_blockSize, std::uint64_t p_capacity, std::uint64_t p_metaSize);
@@ -118,7 +118,7 @@ class MemMetadataSet : public MetadataSet
private:
ErrorCode Init(std::shared_ptr p_metain, std::shared_ptr p_metaindexin,
- std::uint64_t p_blockSize, std::uint64_t p_capacity, std::uint64_t p_metaSize);
+ std::uint64_t p_blockSize, std::uint64_t p_capacity, std::uint64_t p_metaSize, int start = 0, int count = -1);
std::shared_ptr m_lock;
diff --git a/AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h b/AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h
index d5b88a8a..b737bccb 100644
--- a/AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h
+++ b/AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h
@@ -200,6 +200,7 @@ namespace SPTAG::SPANN {
std::unordered_setm_splitList;
Helper::Concurrent::ConcurrentMap m_mergeList;
+ std::shared_timed_mutex m_mergeListLock;
public:
ExtraDynamicSearcher(SPANN::Options& p_opt) {
@@ -298,7 +299,7 @@ namespace SPTAG::SPANN {
{
int assumptionBrokenNum = 0;
int postVectorNum = postingList.size() / m_vectorInfoSize;
- uint8_t* postingP = reinterpret_cast(&postingList.front());
+ uint8_t* postingP = reinterpret_cast(postingList.data());
float minDist;
float maxDist;
float avgDist = 0;
@@ -486,7 +487,7 @@ namespace SPTAG::SPANN {
}
// TODO
- ErrorCode RefineIndex(std::shared_ptr p_index,
+ ErrorCode RefineIndex(std::shared_ptr& p_index,
bool p_prereassign, std::vector *p_headmapping, std::vector *p_mapping) override
{
SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Begin RefineIndex\n");
@@ -501,6 +502,7 @@ namespace SPTAG::SPANN {
p_index->m_iDataBlockSize, p_index->m_iDataCapacity);
}
std::atomic_bool doneReassign = false;
+ Helper::Concurrent::ConcurrentSet mergelist;
while (!doneReassign) {
auto preReassignTimeBegin = std::chrono::high_resolution_clock::now();
std::atomic finalcode = ErrorCode::Success;
@@ -551,11 +553,11 @@ namespace SPTAG::SPANN {
index, (int)(m_postingSizes.GetSize(index) * m_vectorInfoSize),
(int)(postingList.size()), (int)(ret == ErrorCode::Success));
PrintErrorInPosting(postingList, index);
- finalcode = ret;
- return;
+ finalcode = ErrorCode::Fail;
+ //return;
}
SizeType postVectorNum = (SizeType)(postingList.size() / m_vectorInfoSize);
- auto *postingP = reinterpret_cast(&postingList.front());
+ auto *postingP = reinterpret_cast(postingList.data());
uint8_t *vectorId = postingP;
int vectorCount = 0;
for (int j = 0; j < postVectorNum;
@@ -575,10 +577,9 @@ namespace SPTAG::SPANN {
vectorCount++;
}
+ if (vectorCount <= m_mergeThreshold) mergelist.insert(p_headmapping->at(index));
+
postingList.resize(vectorCount * m_vectorInfoSize);
- new_postingSizes.UpdateSize(p_headmapping->at(index), vectorCount);
- *new_checkSums[p_headmapping->at(index)] =
- m_checkSum.CalcChecksum(postingList.c_str(), (int)(postingList.size()));
if ((ret = db->Put(p_headmapping->at(index), postingList, MaxTimeout,
&(workSpace.m_diskRequests))) !=
ErrorCode::Success)
@@ -588,6 +589,9 @@ namespace SPTAG::SPANN {
finalcode = ret;
return;
}
+ new_postingSizes.UpdateSize(p_headmapping->at(index), vectorCount);
+ *new_checkSums[p_headmapping->at(index)] =
+ m_checkSum.CalcChecksum(postingList.c_str(), (int)(postingList.size()));
if (m_opt->m_consistencyCheck && (ret = db->Check(p_headmapping->at(index), new_postingSizes.GetSize(p_headmapping->at(index)) * m_vectorInfoSize, nullptr)) != ErrorCode::Success)
{
SPTAGLIB_LOG(Helper::LogLevel::LL_Error,
@@ -615,8 +619,8 @@ namespace SPTAG::SPANN {
if (p_prereassign)
{
- p_index->SaveIndex(m_opt->m_indexDirectory + FolderSep + m_opt->m_headIndexFolder);
Checkpoint(m_opt->m_indexDirectory);
+ p_index->SaveIndex(m_opt->m_indexDirectory + FolderSep + m_opt->m_headIndexFolder);
CalculatePostingDistribution(p_index.get());
}
else
@@ -627,6 +631,37 @@ namespace SPTAG::SPANN {
std::string p_checksumPath = m_opt->m_indexDirectory + FolderSep + m_opt->m_checksumFile;
new_checkSums.Save(p_checksumPath);
db->Checkpoint(m_opt->m_indexDirectory);
+
+ if ((finalcode = m_postingSizes.Load(p_persistenRecord, p_index->m_iDataBlockSize,
+ p_index->m_iDataCapacity)) != ErrorCode::Success)
+ return finalcode;
+ if ((finalcode = m_checkSums.Load(p_checksumPath, p_index->m_iDataBlockSize,
+ p_index->m_iDataCapacity)) != ErrorCode::Success)
+ return finalcode;
+
+ if ((finalcode = m_versionMap->Load(m_opt->m_indexDirectory + FolderSep + m_opt->m_deleteIDFile,
+ p_index->m_iDataBlockSize, p_index->m_iDataCapacity)) !=
+ ErrorCode::Success)
+ return finalcode;
+ if ((finalcode = m_vectorTranslateMap->Load(
+ m_opt->m_indexDirectory + FolderSep + m_opt->m_headIDFile, p_index->m_iDataBlockSize,
+ p_index->m_iDataCapacity)) != ErrorCode::Success)
+ return finalcode;
+ if ((finalcode =
+ VectorIndex::LoadIndex(m_opt->m_indexDirectory + FolderSep + m_opt->m_headIndexFolder,
+ p_index)) != ErrorCode::Success)
+ return finalcode;
+
+ if (mergelist.size() > 0)
+ {
+ for (SizeType pid : mergelist)
+ {
+ MergeAsync(p_index.get(), pid);
+ }
+ Checkpoint(m_opt->m_indexDirectory);
+ p_index->SaveIndex(m_opt->m_indexDirectory + FolderSep + m_opt->m_headIndexFolder);
+ m_vectorTranslateMap->Save(m_opt->m_indexDirectory + FolderSep + m_opt->m_headIDFile);
+ }
}
SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "SPFresh: ReWriting SSD Info\n");
@@ -646,21 +681,27 @@ namespace SPTAG::SPANN {
std::unique_lock lock(m_rwLocks[headID], std::defer_lock);
if (requirelock) lock.lock();
+ int retry = 0;
+ Retry:
if (!p_index->ContainSample(headID)) return ErrorCode::Success;
std::string postingList;
auto splitGetBegin = std::chrono::high_resolution_clock::now();
if ((ret=db->Get(headID, &postingList, MaxTimeout, &(p_exWorkSpace->m_diskRequests))) !=
- ErrorCode::Success)// || !m_checkSum.ValidateChecksum(postingList.c_str(), (int)(postingList.size()), *m_checkSums[headID]))
+ ErrorCode::Success || !m_checkSum.ValidateChecksum(postingList.c_str(), (int)(postingList.size()), *m_checkSums[headID]))
{
- SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Split fail to get oversized postings: key=%d size=%d\n", headID, m_postingSizes.GetSize(headID));
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Error,
+ "Split fail to get oversized postings: key=%d required size=%d read size=%d checksum "
+ "issue=%d\n",
+ headID, (int)(m_postingSizes.GetSize(headID) * m_vectorInfoSize),
+ (int)(postingList.size()), (int)(ret == ErrorCode::Success));
return ret;
}
auto splitGetEnd = std::chrono::high_resolution_clock::now();
elapsedMSeconds = std::chrono::duration_cast(splitGetEnd - splitGetBegin).count();
m_stat.m_getCost += elapsedMSeconds;
// reinterpret postingList to vectors and IDs
- auto* postingP = reinterpret_cast(&postingList.front());
+ auto* postingP = reinterpret_cast(postingList.data());
SizeType postVectorNum = (SizeType)(postingList.size() / m_vectorInfoSize);
//SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "DEBUG: db get Posting %d successfully with length %d real length:%d vectorNum:%d\n", headID, (int)(postingList.size()), m_postingSizes.GetSize(headID), postVectorNum);
@@ -668,44 +709,58 @@ namespace SPTAG::SPANN {
//COMMON::Dataset smallSample(0, m_opt->m_dim, p_index->m_iDataBlockSize, p_index->m_iDataCapacity); // smallSample[i] -> VID
//std::vector localIndicesInsert(postVectorNum); // smallSample[i] = j <-> localindices[j] = i
//std::vector localIndicesInsertVersion(postVectorNum);
- std::vector localIndices(postVectorNum);
- int index = 0;
+ std::vector localIndices;
+ localIndices.reserve(postVectorNum);
uint8_t* vectorId = postingP;
for (int j = 0; j < postVectorNum; j++, vectorId += m_vectorInfoSize)
{
//LOG(Helper::LogLevel::LL_Info, "vector index/total:id: %d/%d:%d\n", j, m_postingSizes[headID].load(), *(reinterpret_cast(vectorId)));
uint8_t version = *(vectorId + sizeof(int));
int VID = *((int*)(vectorId));
+ if (VID < 0 || VID >= m_versionMap->Count())
+ {
+ if (retry < 3)
+ {
+ retry++;
+ goto Retry;
+ }
+ else
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Error,
+ "Split fail: Get posting %d fail after 3 times retries.\n", headID);
+ return ErrorCode::DiskIOFail;
+ }
+ }
+
//if (VID >= m_versionMap->Count()) SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "DEBUG: vector ID:%d total size:%d\n", VID, m_versionMap->Count());
if (m_versionMap->Deleted(VID) || m_versionMap->GetVersion(VID) != version) continue;
//localIndicesInsert[index] = VID;
//localIndicesInsertVersion[index] = version;
//smallSample.AddBatch(1, (ValueType*)(vectorId + m_metaDataSize));
- localIndices[index] = j;
- index++;
+ localIndices.push_back(j);
}
// double gcEndTime = sw.getElapsedMs();
// m_splitGcCost += gcEndTime;
- if (!preReassign && index < m_postingSizeLimit)
+ if (!preReassign && localIndices.size() < m_postingSizeLimit)
{
//SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "DEBUG: in place or not prereassign & index < m_postingSizeLimit. GC begin...\n");
char* ptr = (char*)(postingList.c_str());
- for (int j = 0; j < index; j++, ptr += m_vectorInfoSize)
+ for (int j = 0; j < localIndices.size(); j++, ptr += m_vectorInfoSize)
{
if (j == localIndices[j]) continue;
memcpy(ptr, postingList.c_str() + localIndices[j] * m_vectorInfoSize, m_vectorInfoSize);
//Serialize(ptr, localIndicesInsert[j], localIndicesInsertVersion[j], smallSample[j]);
}
- postingList.resize(index * m_vectorInfoSize);
- m_postingSizes.UpdateSize(headID, index);
- *m_checkSums[headID] = m_checkSum.CalcChecksum(postingList.c_str(), (int)(postingList.size()));
+ postingList.resize(localIndices.size() * m_vectorInfoSize);
if ((ret=db->Put(headID, postingList, MaxTimeout, &(p_exWorkSpace->m_diskRequests))) != ErrorCode::Success) {
SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Split Fail to write back postings\n");
return ret;
}
+ m_postingSizes.UpdateSize(headID, localIndices.size());
+ *m_checkSums[headID] = m_checkSum.CalcChecksum(postingList.c_str(), (int)(postingList.size()));
if (m_opt->m_consistencyCheck && (ret = db->Check(headID, m_postingSizes.GetSize(headID) * m_vectorInfoSize, nullptr)) != ErrorCode::Success)
{
SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Split: Check failed after Put %d\n", headID);
@@ -723,8 +778,6 @@ namespace SPTAG::SPANN {
//SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "GC triggered: %d, new length: %d\n", headID, index);
return ErrorCode::Success;
}
- //SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Resize\n");
- localIndices.resize(index);
auto clusterBegin = std::chrono::high_resolution_clock::now();
// k = 2, maybe we can change the split number, now it is fixed
@@ -751,13 +804,14 @@ namespace SPTAG::SPANN {
//Serialize(ptr, localIndicesInsert[j], localIndicesInsertVersion[j], smallSample[j]);
}
SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Cluserting Failed (The same vector), Cluster total dist:%f Only Keep %d vectors.\n", totaldist, cut);
-
- m_postingSizes.UpdateSize(headID, cut);
- *m_checkSums[headID] = m_checkSum.CalcChecksum(newpostingList.c_str(), (int)(newpostingList.size()));
+
if ((ret=db->Put(headID, newpostingList, MaxTimeout, &(p_exWorkSpace->m_diskRequests))) != ErrorCode::Success) {
SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Split fail to override postings cut to limit\n");
return ret;
}
+ m_postingSizes.UpdateSize(headID, cut);
+ *m_checkSums[headID] =
+ m_checkSum.CalcChecksum(newpostingList.c_str(), (int)(newpostingList.size()));
if (m_opt->m_consistencyCheck && (ret = db->Check(headID, m_postingSizes.GetSize(headID) * m_vectorInfoSize, nullptr)) != ErrorCode::Success)
{
SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Split: Consolidate Check failed after Put %d\n", headID);
@@ -787,14 +841,14 @@ namespace SPTAG::SPANN {
newHeadsID.push_back(headID);
newHeadVID = headID;
theSameHead = true;
- m_postingSizes.UpdateSize(newHeadVID, args.counts[k]);
- *m_checkSums[newHeadVID] =
- m_checkSum.CalcChecksum(newPostingLists[k].c_str(), (int)(newPostingLists[k].size()));
auto splitPutBegin = std::chrono::high_resolution_clock::now();
if (!preReassign && (ret=db->Put(newHeadVID, newPostingLists[k], MaxTimeout, &(p_exWorkSpace->m_diskRequests))) != ErrorCode::Success) {
SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Fail to override postings\n");
return ret;
}
+ m_postingSizes.UpdateSize(newHeadVID, args.counts[k]);
+ *m_checkSums[newHeadVID] =
+ m_checkSum.CalcChecksum(newPostingLists[k].c_str(), (int)(newPostingLists[k].size()));
if (m_opt->m_consistencyCheck && (ret = db->Check(newHeadVID, m_postingSizes.GetSize(newHeadVID) * m_vectorInfoSize, nullptr)) != ErrorCode::Success)
{
SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Split: Cluster Write Check failed after Put %d\n", newHeadVID);
@@ -926,7 +980,7 @@ namespace SPTAG::SPANN {
return ret;
}
- auto* postingP = reinterpret_cast(¤tPostingList.front());
+ auto* postingP = reinterpret_cast(currentPostingList.data());
size_t postVectorNum = currentPostingList.size() / m_vectorInfoSize;
int currentLength = 0;
uint8_t* vectorId = postingP;
@@ -959,7 +1013,10 @@ namespace SPTAG::SPANN {
m_mergeLock.unlock();
return ret;
}
- m_mergeList.unsafe_erase(headID);
+ {
+ std::unique_lock lock(m_mergeListLock);
+ m_mergeList.unsafe_erase(headID);
+ }
m_mergeLock.unlock();
return ErrorCode::Success;
}
@@ -972,7 +1029,12 @@ namespace SPTAG::SPANN {
{
BasicResult* queryResult = queryResults.GetResult(i);
int nextLength = m_postingSizes.GetSize(queryResult->VID);
- if (currentLength + nextLength < m_postingSizeLimit && m_mergeList.find(queryResult->VID) == m_mergeList.end())
+ bool listContains = false;
+ {
+ std::shared_lock anotherLock(m_mergeListLock);
+ listContains = (m_mergeList.find(queryResult->VID) != m_mergeList.end());
+ }
+ if (currentLength + nextLength < m_postingSizeLimit && !listContains)
{
{
std::unique_lock anotherLock(m_rwLocks[queryResult->VID], std::defer_lock);
@@ -993,11 +1055,6 @@ namespace SPTAG::SPANN {
}
postingP = reinterpret_cast(nextPostingList.data());
postVectorNum = nextPostingList.size() / m_vectorInfoSize;
- if (currentLength + postVectorNum > m_postingSizeLimit)
- {
- continue;
- }
-
nextLength = 0;
vectorId = postingP;
for (int j = 0; j < postVectorNum; j++, vectorId += m_vectorInfoSize)
@@ -1076,7 +1133,7 @@ namespace SPTAG::SPANN {
if (currentLength > nextLength)
{
/* ReAssign queryResult->VID*/
- postingP = reinterpret_cast(&nextPostingList.front());
+ postingP = reinterpret_cast(nextPostingList.data());
for (int j = 0; j < nextLength; j++) {
uint8_t* vectorId = postingP + j * m_vectorInfoSize;
// SizeType vid = *(reinterpret_cast(vectorId));
@@ -1090,7 +1147,7 @@ namespace SPTAG::SPANN {
} else
{
/* ReAssign headID*/
- postingP = reinterpret_cast(&currentPostingList.front());
+ postingP = reinterpret_cast(currentPostingList.data());
for (int j = 0; j < currentLength; j++) {
uint8_t* vectorId = postingP + j * m_vectorInfoSize;
// SizeType vid = *(reinterpret_cast(vectorId));
@@ -1106,18 +1163,21 @@ namespace SPTAG::SPANN {
if (m_opt->m_excludehead)
{
SizeType vid = (SizeType)(*(m_vectorTranslateMap->At(deletedHead)));
- if (!m_versionMap->Deleted(vid))
+ if (vid != MaxSize && !m_versionMap->Deleted(vid))
{
std::shared_ptr vectorinfo =
std::make_shared(m_vectorInfoSize, ' ');
- Serialize(&(vectorinfo->front()), vid, m_versionMap->GetVersion(vid),
+ Serialize(vectorinfo->data(), vid, m_versionMap->GetVersion(vid),
p_index->GetSample(deletedHead));
ReassignAsync(p_index, vectorinfo, -1);
}
}
}
- m_mergeList.unsafe_erase(headID);
+ {
+ std::unique_lock lock(m_mergeListLock);
+ m_mergeList.unsafe_erase(headID);
+ }
m_stat.m_mergeNum++;
return ErrorCode::Success;
@@ -1138,7 +1198,10 @@ namespace SPTAG::SPANN {
SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Merge: Check failed after put original posting %d\n", headID);
return ret;
}
- m_mergeList.unsafe_erase(headID);
+ {
+ std::unique_lock lock(m_mergeListLock);
+ m_mergeList.unsafe_erase(headID);
+ }
m_mergeLock.unlock();
}
return ErrorCode::Success;
@@ -1170,11 +1233,16 @@ namespace SPTAG::SPANN {
inline void MergeAsync(VectorIndex* p_index, SizeType headID, std::function p_callback = nullptr)
{
- if (m_mergeList.find(headID) != m_mergeList.end()) {
- return;
- }
Helper::Concurrent::ConcurrentMap::value_type workPair(headID, headID);
- m_mergeList.insert(workPair);
+ {
+ std::shared_lock lock(m_mergeListLock);
+ auto res = m_mergeList.insert(workPair);
+ if (!res.second)
+ {
+ // Already in queue
+ return;
+ }
+ }
auto* curJob = new MergeAsyncJob(p_index, this, headID, m_opt->m_disableReassign, p_callback);
m_splitThreadPool->add(curJob);
@@ -1194,10 +1262,10 @@ namespace SPTAG::SPANN {
if (m_opt->m_excludehead && !theSameHead)
{
SizeType vid = (SizeType)(*(m_vectorTranslateMap->At(headID)));
- if (!m_versionMap->Deleted(vid))
+ if (vid != MaxSize && !m_versionMap->Deleted(vid))
{
std::shared_ptr vectorinfo = std::make_shared(m_vectorInfoSize, ' ');
- Serialize(&(vectorinfo->front()), vid, m_versionMap->GetVersion(vid), headVector);
+ Serialize(vectorinfo->data(), vid, m_versionMap->GetVersion(vid), headVector);
ReassignAsync(p_index, vectorinfo, -1);
}
}
@@ -1208,7 +1276,7 @@ namespace SPTAG::SPANN {
for (int i = 0; i < postingLists.size(); i++) {
auto& postingList = postingLists[i];
size_t postVectorNum = postingList.size() / m_vectorInfoSize;
- auto* postingP = reinterpret_cast(&postingList.front());
+ auto* postingP = reinterpret_cast(postingList.data());
for (int j = 0; j < postVectorNum; j++) {
uint8_t* vectorId = postingP + j * m_vectorInfoSize;
SizeType vid = *(reinterpret_cast(vectorId));
@@ -1229,8 +1297,6 @@ namespace SPTAG::SPANN {
std::vector HeadPrevTopK;
newHeadsDist.clear();
newHeadsDist.resize(0);
- postingLists.clear();
- postingLists.resize(0);
COMMON::QueryResultSet nearbyHeads(headVector, m_opt->m_reassignK);
p_index->SearchIndex(nearbyHeads);
BasicResult* queryResults = nearbyHeads.GetResults();
@@ -1246,7 +1312,10 @@ namespace SPTAG::SPANN {
}
auto reassignScanIOBegin = std::chrono::high_resolution_clock::now();
ErrorCode ret;
- if ((ret=db->MultiGet(HeadPrevTopK, &postingLists, m_hardLatencyLimit, &(p_exWorkSpace->m_diskRequests))) != ErrorCode::Success || !ValidatePostings(HeadPrevTopK, postingLists)) {
+ if ((ret = db->MultiGet(HeadPrevTopK, p_exWorkSpace->m_pageBuffers, m_hardLatencyLimit,
+ &(p_exWorkSpace->m_diskRequests))) != ErrorCode::Success ||
+ !ValidatePostings(HeadPrevTopK, p_exWorkSpace->m_pageBuffers))
+ {
SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "ReAssign can't get all the near postings\n");
return ret;
}
@@ -1255,15 +1324,16 @@ namespace SPTAG::SPANN {
auto elapsedMSeconds = std::chrono::duration_cast(reassignScanIOEnd - reassignScanIOBegin).count();
m_stat.m_reassignScanIOCost += elapsedMSeconds;
- for (int i = 0; i < postingLists.size(); i++) {
- auto& postingList = postingLists[i];
- size_t postVectorNum = postingList.size() / m_vectorInfoSize;
- auto* postingP = reinterpret_cast(postingList.data());
+ for (int i = 0; i < HeadPrevTopK.size(); i++)
+ {
+ auto &buffer = (p_exWorkSpace->m_pageBuffers[i]);
+ size_t postVectorNum = (int)(buffer.GetAvailableSize() / m_vectorInfoSize);
+ auto *postingP = buffer.GetBuffer();
for (int j = 0; j < postVectorNum; j++) {
uint8_t* vectorId = postingP + j * m_vectorInfoSize;
SizeType vid = *(reinterpret_cast(vectorId));
// SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "%d: VID: %d, Head: %d, size:%d/%d\n", i, vid, HeadPrevTopK[i], postingLists.size(), HeadPrevTopK.size());
- uint8_t version = *(reinterpret_cast(vectorId + sizeof(int)));
+ uint8_t version = *(reinterpret_cast(vectorId + sizeof(SizeType)));
ValueType* vector = reinterpret_cast(vectorId + m_metaDataSize);
if (reAssignVectorsTopK.find(vid) == reAssignVectorsTopK.end() && !m_versionMap->Deleted(vid) && m_versionMap->GetVersion(vid) == version) {
m_stat.m_reAssignScanNum++;
@@ -1321,7 +1391,7 @@ namespace SPTAG::SPANN {
p_exWorkSpace->Clear(m_opt->m_searchInternalResultNum, (max(m_opt->m_postingPageLimit, m_opt->m_searchPostingPageLimit) + m_opt->m_bufferLength) << PageSizeEx, true, m_opt->m_enableDataCompression);
}
else {
- p_exWorkSpace->Initialize(m_opt->m_maxCheck, m_opt->m_hashExp, m_opt->m_searchInternalResultNum, (max(m_opt->m_postingPageLimit, m_opt->m_searchPostingPageLimit) + m_opt->m_bufferLength) << PageSizeEx, true, m_opt->m_enableDataCompression);
+ p_exWorkSpace->Initialize(m_opt->m_maxCheck, m_opt->m_hashExp, max(m_opt->m_searchInternalResultNum, m_opt->m_reassignK), (max(m_opt->m_postingPageLimit, m_opt->m_searchPostingPageLimit) + m_opt->m_bufferLength) << PageSizeEx, true, m_opt->m_enableDataCompression);
int wid = 0;
if (m_freeWorkSpaceIds == nullptr || !m_freeWorkSpaceIds->try_pop(wid))
{
@@ -1391,28 +1461,35 @@ namespace SPTAG::SPANN {
{
//std::shared_lock lock(m_rwLocks[headID]); //ROCKSDB
std::unique_lock lock(m_rwLocks[headID]); //SPDK
+ ErrorCode ret;
if (!p_index->ContainSample(headID)) {
lock.unlock();
goto checkDeleted;
}
if (m_postingSizes.GetSize(headID) + appendNum > (m_postingSizeLimit + m_bufferSizeLimit)) {
- SPTAGLIB_LOG(Helper::LogLevel::LL_Warning, "After appending, the number of vectors exceeds the postingsize + buffersize (%d + %d)! Do split now...\n", m_postingSizeLimit, m_bufferSizeLimit);
- Split(p_exWorkSpace, p_index, headID, !m_opt->m_disableReassign, false, false);
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Warning, "After appending, the number of vectors in %d exceeds the postingsize + buffersize (%d + %d)! Do split now...\n", headID, m_postingSizeLimit, m_bufferSizeLimit);
+ ret = Split(p_exWorkSpace, p_index, headID, !m_opt->m_disableReassign, false, false);
+ if (ret != ErrorCode::Success)
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Split %d failed!\n", headID);
lock.unlock();
goto checkDeleted;
}
- ErrorCode ret;
auto appendIOBegin = std::chrono::high_resolution_clock::now();
- *m_checkSums[headID] =
- m_checkSum.AppendChecksum(*m_checkSums[headID], appendPosting.c_str(), (int)(appendPosting.size()));
- if ((ret=db->Merge(headID, appendPosting, MaxTimeout, &(p_exWorkSpace->m_diskRequests))) != ErrorCode::Success) {
- SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Merge failed! Posting Size:%d, limit: %d\n", m_postingSizes.GetSize(headID), m_postingSizeLimit);
+ if ((ret = db->Merge(
+ headID, appendPosting, MaxTimeout, &(p_exWorkSpace->m_diskRequests),
+ [this, prefixChecksum = *m_checkSums[headID]](const void *val, const int size) -> bool {
+ return this->m_checkSum.ValidateChecksum((const char*)val, size, prefixChecksum);
+ })) != ErrorCode::Success)
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Merge failed for %d! Posting Size:%d, limit: %d\n", headID, m_postingSizes.GetSize(headID), m_postingSizeLimit);
GetDBStats();
return ret;
}
auto appendIOEnd = std::chrono::high_resolution_clock::now();
appendIOSeconds = std::chrono::duration_cast(appendIOEnd - appendIOBegin).count();
+ *m_checkSums[headID] =
+ m_checkSum.AppendChecksum(*m_checkSums[headID], appendPosting.c_str(), (int)(appendPosting.size()));
m_postingSizes.IncSize(headID, appendNum);
if (m_opt->m_consistencyCheck && (ret = db->Check(headID, m_postingSizes.GetSize(headID) * m_vectorInfoSize, nullptr)) != ErrorCode::Success)
{
@@ -1566,7 +1643,7 @@ namespace SPTAG::SPANN {
if (vectorNum > m_postingSizeLimit)
vectorNum = m_postingSizeLimit;
- auto *postingP = reinterpret_cast(&tempPosting.front());
+ auto *postingP = reinterpret_cast(tempPosting.data());
std::string newPosting(m_vectorInfoSize * vectorNum, '\0');
char *ptr = (char *)(newPosting.c_str());
for (int j = 0; j < vectorNum; ++j, ptr += m_vectorInfoSize)
@@ -1680,7 +1757,7 @@ namespace SPTAG::SPANN {
Helper::LogLevel::LL_Error,
"ValidatePostings fail: posting id:%d, required size:%d, buffer size:%d, checksum:%d\n",
pids[i], (int)(m_postingSizes.GetSize(pids[i]) * m_vectorInfoSize), (int)(postings[i].GetAvailableSize()), (int)(*m_checkSums[pids[i]]));
- //return false;
+ return false;
}
}
return true;
@@ -1688,8 +1765,7 @@ namespace SPTAG::SPANN {
bool ValidatePostings(std::vector &pids, std::vector &postings)
{
- if (!m_opt->m_checksumInRead)
- return true;
+ if (!m_opt->m_checksumInRead) return true;
ErrorCode ret;
for (int i = 0; i < pids.size(); i++)
@@ -1702,7 +1778,8 @@ namespace SPTAG::SPANN {
"ValidatePostings fail: posting id:%d, required size:%d, buffer size:%d, checksum:%d\n",
pids[i], (int)(m_postingSizes.GetSize(pids[i]) * m_vectorInfoSize),
(int)(postings[i].size()), (int)(*m_checkSums[pids[i]]));
- // return false;
+ PrintErrorInPosting(postings[i], pids[i]);
+ return false;
}
}
return true;
@@ -1794,7 +1871,7 @@ namespace SPTAG::SPANN {
queryResults.AddPoint(vectorID, distance2leaf);
}
auto compEnd = std::chrono::high_resolution_clock::now();
- if (realNum <= m_mergeThreshold) MergeAsync(p_index.get(), curPostingID); // TODO: Control merge
+ if (m_opt->m_asyncMergeInSearch && realNum <= m_mergeThreshold) MergeAsync(p_index.get(), curPostingID); // TODO: Control merge
compLatency += ((double)std::chrono::duration_cast(compEnd - compStart).count());
@@ -2178,7 +2255,6 @@ namespace SPTAG::SPANN {
std::vector replicaCountDist(m_opt->m_replicaCount + 1, 0);
for (int i = 0; i < replicaCount.size(); ++i)
{
- if (headVectorIDS.count(i) > 0) continue;
++replicaCountDist[replicaCount[i]];
}
@@ -2324,7 +2400,6 @@ namespace SPTAG::SPANN {
Serialize(ptr, fullID, version, p_fullVectors->GetVector(fullID));
ptr += m_vectorInfoSize;
}
- *m_checkSums[index] = m_checkSum.CalcChecksum(postinglist.c_str(), (int)(postinglist.size()));
ErrorCode tmp;
if ((tmp = db->Put(index, postinglist, MaxTimeout, &(workSpace.m_diskRequests))) !=
ErrorCode::Success)
@@ -2333,6 +2408,7 @@ namespace SPTAG::SPANN {
ret = tmp;
return;
}
+ *m_checkSums[index] = m_checkSum.CalcChecksum(postinglist.c_str(), (int)(postinglist.size()));
if (m_opt->m_consistencyCheck && (tmp = db->Check(index, m_postingSizes.GetSize(index) * m_vectorInfoSize, nullptr)) !=
ErrorCode::Success)
{
@@ -2453,7 +2529,7 @@ namespace SPTAG::SPANN {
return (postingID < m_postingSizes.GetPostingNum()) && (m_postingSizes.GetSize(postingID) > 0);
}
- virtual ErrorCode CheckPosting(SizeType postingID, std::vector *visited = nullptr,
+ virtual ErrorCode CheckPosting(SizeType postingID, std::vector *visited = nullptr,
ExtraWorkSpace *p_exWorkSpace = nullptr) override
{
if (postingID < 0 || postingID >= m_postingSizes.GetPostingNum())
@@ -2485,6 +2561,7 @@ namespace SPTAG::SPANN {
!m_checkSum.ValidateChecksum(posting.c_str(), (int)(posting.size()), *m_checkSums[postingID]))
{
SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "[CheckPosting] Get checksum fail %d!\n", postingID);
+ PrintErrorInPosting(posting, postingID);
return ret;
}
}
diff --git a/AnnService/inc/Core/SPANN/ExtraFileController.h b/AnnService/inc/Core/SPANN/ExtraFileController.h
index 70c1824e..f4b8bf24 100644
--- a/AnnService/inc/Core/SPANN/ExtraFileController.h
+++ b/AnnService/inc/Core/SPANN/ExtraFileController.h
@@ -4,7 +4,7 @@
#include "inc/Helper/KeyValueIO.h"
#include "inc/Core/Common/Dataset.h"
-#include "inc/Core/Common/Labelset.h"
+#include "inc/Core/Common/LabelSet.h"
#include "inc/Core/Common/FineGrainedLock.h"
#include "inc/Core/VectorIndex.h"
#include "inc/Helper/ThreadPool.h"
@@ -27,7 +27,7 @@ namespace SPTAG::SPANN {
Helper::Concurrent::ConcurrentQueue m_blockAddresses;
Helper::Concurrent::ConcurrentQueue m_blockAddresses_reserve;
- COMMON::Labelset m_available;
+ COMMON::LabelSet m_available;
std::atomic read_complete_vec = 0;
std::atomic read_submit_vec = 0;
@@ -75,6 +75,8 @@ namespace SPTAG::SPANN {
bool ReadBlocks(AddressType* p_data, std::string* p_value, const std::chrono::microseconds &timeout, std::vector* reqs);
+ bool ReadBlocks(AddressType *p_data, Helper::PageBuffer &p_value, const std::chrono::microseconds &timeout, std::vector *reqs);
+
bool ReadBlocks(const std::vector& p_data, std::vector* p_value, const std::chrono::microseconds &timeout, std::vector* reqs);
bool ReadBlocks(const std::vector& p_data, std::vector>& p_value, const std::chrono::microseconds& timeout, std::vector* reqs);
@@ -103,15 +105,18 @@ namespace SPTAG::SPANN {
SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "FileIO::BlockController::Checkpoint - Reload reserved blocks...\n");
AddressType currBlockAddress = 0;
- while (m_blockAddresses_reserve.try_pop(currBlockAddress))
- {
+ int reloadCount = 0;
+
+ while (m_blockAddresses_reserve.try_pop(currBlockAddress)) {
m_blockAddresses.push(currBlockAddress);
+ ++reloadCount;
}
AddressType blocks = RemainBlocks();
AddressType totalBlocks = m_totalAllocatedBlocks.load();
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "FileIO::BlockController::Checkpoint - Reloaded blocks: %d\n", reloadCount);
SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "FileIO::BlockController::Checkpoint - Total allocated blocks: %llu\n", static_cast(totalBlocks));
- SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "FileIO::BlockController::Checkpoint - Remaining free blocks: %llu\n", blocks);
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "FileIO::BlockController::Checkpoint - Remaining free blocks: %llu\n", static_cast(blocks));
SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "FileIO::BlockController::Checkpoint - Saving to file: %s\n", filename.c_str());
auto ptr = f_createIO();
@@ -124,26 +129,15 @@ namespace SPTAG::SPANN {
for (auto it = m_blockAddresses.unsafe_begin(); it != m_blockAddresses.unsafe_end(); it++) {
IOBINARY(ptr, WriteBinary, sizeof(AddressType), reinterpret_cast(&(*it)));
}
- /*
- int i = 0;
- for (auto it = m_blockAddresses.unsafe_begin(); it != m_blockAddresses.unsafe_end(); it++) {
- std::cout << *it << " ";
- i++;
- if (i == 10) break;
- }
- std::cout << std::endl;
- */
- SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Save Finish!\n");
+
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "FileIO::BlockController::Checkpoint - Save Finish!\n");
return ErrorCode::Success;
}
ErrorCode LoadBlockPool(std::string prefix, AddressType startNumBlocks, bool allowInit, int blockSize, int blockCapacity) {
std::string blockfile = prefix + "_blockpool";
if (allowInit && !fileexists(blockfile.c_str())) {
- SPTAGLIB_LOG(Helper::LogLevel::LL_Info,
- "FileIO::BlockController::LoadBlockPool: initializing fresh pool (no existing file "
- "found: %s)\n",
- blockfile.c_str());
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "FileIO::BlockController::LoadBlockPool: initializing fresh pool (no existing file found: %s)\n", blockfile.c_str());
m_available.Initialize(startNumBlocks, blockSize, blockCapacity);
for(AddressType i = 0; i < startNumBlocks; i++) {
m_blockAddresses.push(i);
@@ -206,34 +200,47 @@ namespace SPTAG::SPANN {
return ErrorCode::Success;
}
};
-
+
+ struct CacheCounters
+ {
+ int64_t query{};
+ int64_t query_hits{};
+ int64_t put{};
+ int64_t evict{};
+ };
+
+ struct CacheEntry
+ {
+ std::string value;
+ std::list::iterator iter;
+ };
+
class LRUCache {
int64_t capacity;
int64_t limit;
int64_t size;
std::list keys; // Page Address
- std::unordered_map::iterator>> cache; // Page Address -> Page Address in Cache
- int64_t queries;
- std::atomic hits;
+ std::unordered_map cache; // Page Address -> Page Address in Cache
FileIO* fileIO;
Helper::RequestQueue processIocp;
std::vector reqs;
- std::vector> pageBuffers;
+ Helper::PageBuffer pageBuffer;
+ std::atomic query_counter{};
+ std::atomic query_hits_counter{};
+ std::atomic put_counter{};
+ std::atomic evict_counter{};
public:
LRUCache(int64_t capacity, int64_t limit, FileIO* fileIO) {
this->capacity = capacity;
this->limit = min(capacity, (limit << PageSizeEx));
this->size = 0;
- this->queries = 0;
- this->hits = 0;
this->fileIO = fileIO;
this->reqs.resize(limit);
- this->pageBuffers.resize(limit);
- for (int i = 0; i < limit; i++) {
- this->pageBuffers[i].ReservePageBuffer(PageSize);
+ this->pageBuffer.ReservePageBuffer(limit << PageSizeEx);
+ for (uint64_t i = 0; i < limit; i++) {
auto& req = this->reqs[i];
- req.m_buffer = (char*)(this->pageBuffers[i].GetBuffer());
+ req.m_buffer = (char *)(this->pageBuffer.GetBuffer() + (i << PageSizeEx));
req.m_extension = &processIocp;
#ifdef _MSC_VER
@@ -249,8 +256,9 @@ namespace SPTAG::SPANN {
~LRUCache() {}
- bool evict(SizeType key, void* value, int vsize, std::unordered_map::iterator>>::iterator& it) {
+ bool evict(SizeType key, void* value, int vsize, std::unordered_map::iterator& it) {
if (value != nullptr) {
+ ++evict_counter;
std::string valstr((char*)value, vsize);
if (fileIO->Put(key, valstr, MaxTimeout, &reqs, false) != ErrorCode::Success) {
SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "LRUCache: evict key:%d value size:%d to file failed\n", key, vsize);
@@ -258,50 +266,64 @@ namespace SPTAG::SPANN {
}
}
- size -= it->second.first.size();
- keys.erase(it->second.second);
+ size -= it->second.value.size();
+ keys.erase(it->second.iter);
cache.erase(it);
return true;
}
- bool get(SizeType key, void* value, int& get_size) {
- queries++;
+ bool get(SizeType key, Helper::PageBuffer& buffer) {
+ ++query_counter;
auto it = cache.find(key);
if (it == cache.end()) {
- return false; // If the key does not exist, return -1
+ return false;
}
- if (get_size > it->second.first.size()) {
- SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Cache get error: key %d required size %d, real size = %d\n", key, get_size, (int)(it->second.first.size()));
- get_size = (int)(it->second.first.size());
+
+ size_t data_size = it->second.value.size();
+ buffer.ReservePageBuffer(data_size);
+ memcpy(buffer.GetBuffer(), it->second.value.data(), data_size);
+ buffer.SetAvailableSize(data_size);
+ ++query_hits_counter;
+ return true;
+ }
+
+ bool get(SizeType key, std::string& value) {
+ ++query_counter;
+ auto it = cache.find(key);
+ if (it == cache.end()) {
+ return false;
}
- // Update access order, move the key to the head of the linked list
- memcpy((char*)value, it->second.first.data(), get_size);
- hits++;
+
+ size_t data_size = it->second.value.size();
+ value.resize(data_size);
+ memcpy(value.data(), it->second.value.data(), data_size);
+ ++query_hits_counter;
return true;
}
bool put(SizeType key, void* value, int put_size) {
+ ++put_counter;
auto it = cache.find(key);
if (it != cache.end()) {
if (put_size > limit) {
- evict(key, it->second.first.data(), it->second.first.size(), it);
+ evict(key, it->second.value.data(), it->second.value.size(), it);
return false;
}
- keys.splice(keys.begin(), keys, it->second.second);
- it->second.second = keys.begin();
+ keys.splice(keys.begin(), keys, it->second.iter);
+ it->second.iter = keys.begin();
- auto delta_size = put_size - (int)(it->second.first.size());
+ int delta_size = put_size - (int)(it->second.value.size());
while ((int)(capacity - size) < delta_size && (keys.size() > 1)) {
auto last = keys.back();
auto lastit = cache.find(last);
- if (!evict(last, lastit->second.first.data(), lastit->second.first.size(), lastit)) {
+ if (!evict(last, lastit->second.value.data(), lastit->second.value.size(), lastit)) {
+ evict(key, it->second.value.data(), it->second.value.size(), it);
return false;
}
}
- it->second.first.resize(put_size);
- memcpy(it->second.first.data(), (char*)value, put_size);
+ it->second.value.resize(put_size);
+ memcpy(it->second.value.data(), (char*)value, put_size);
size += delta_size;
- hits++;
return true;
}
if (put_size > limit) {
@@ -310,12 +332,12 @@ namespace SPTAG::SPANN {
while (put_size > (int)(capacity - size) && (!keys.empty())) {
auto last = keys.back();
auto lastit = cache.find(last);
- if (!evict(last, lastit->second.first.data(), lastit->second.first.size(), lastit)) {
+ if (!evict(last, lastit->second.value.data(), lastit->second.value.size(), lastit)) {
return false;
}
}
auto keys_it = keys.insert(keys.begin(), key);
- cache.insert({key, {std::string((char*)value, put_size), keys_it}});
+ cache.insert({key, {std::string((char *)value, put_size), keys_it}});
size += put_size;
return true;
}
@@ -329,51 +351,66 @@ namespace SPTAG::SPANN {
return true;
}
- bool merge(SizeType key, void* value, int merge_size) {
+ bool merge(SizeType key, void *value, int merge_size, std::function checksum)
+ {
+ ++put_counter;
// SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "LRUCache: merge size: %lld\n", merge_size);
auto it = cache.find(key);
if (it == cache.end()) {
// SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "LRUCache: merge key not found\n");
- std::string valstr;
- if (fileIO->Get(key, &valstr, MaxTimeout, &reqs, false) != ErrorCode::Success) {
- SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "LRUCache: merge key not found in file\n");
+ ErrorCode ret;
+ if ((ret = fileIO->Get(key, pageBuffer, MaxTimeout, &reqs, false)) != ErrorCode::Success ||
+ !checksum(pageBuffer.GetBuffer(), pageBuffer.GetAvailableSize())) {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "LRUCache: merge key %d not found in file or checksum issue = %d\n", key, (int)(ret == ErrorCode::Success));
return false; // If the key does not exist, return false
}
- cache.insert({key, {valstr, keys.insert(keys.begin(), key)}});
+ std::string valstr(pageBuffer.GetAvailableSize() + merge_size, '\0');
+ memcpy(valstr.data(), pageBuffer.GetBuffer(), pageBuffer.GetAvailableSize());
+ memcpy(valstr.data() + pageBuffer.GetAvailableSize(), value, merge_size);
+ while (valstr.size() > (int)(capacity - size) && (!keys.empty()))
+ {
+ auto last = keys.back();
+ auto lastit = cache.find(last);
+ if (!evict(last, lastit->second.value.data(), lastit->second.value.size(), lastit))
+ {
+ return false;
+ }
+ }
+ auto keys_it = keys.insert(keys.begin(), key);
+ cache.insert({key, {valstr, keys_it}});
size += valstr.size();
- it = cache.find(key);
- } else {
- hits++;
+ return true;
}
- if (merge_size + it->second.first.size() > limit) {
- evict(key, it->second.first.data(), it->second.first.size(), it);
+ if (merge_size + it->second.value.size() > limit) {
+ evict(key, it->second.value.data(), it->second.value.size(), it);
// SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "LRUCache: merge size exceeded\n");
return false;
}
- keys.splice(keys.begin(), keys, it->second.second);
- it->second.second = keys.begin();
+ keys.splice(keys.begin(), keys, it->second.iter);
+ it->second.iter = keys.begin();
while((int)(capacity - size) < merge_size && (keys.size() > 1)) {
auto last = keys.back();
auto lastit = cache.find(last);
- if (!evict(last, lastit->second.first.data(), lastit->second.first.size(), lastit)) {
+ if (!evict(last, lastit->second.value.data(), lastit->second.value.size(), lastit)) {
+ evict(key, it->second.value.data(), it->second.value.size(), it);
return false;
}
}
- it->second.first.append((char*)value, merge_size);
+ it->second.value.append((char*)value, merge_size);
size += merge_size;
// SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "LRUCache: merge success\n");
return true;
}
-
- std::pair get_stat() {
- return {queries, hits.load()};
- }
+ CacheCounters get_stat() {
+ return CacheCounters{query_counter.load(), query_hits_counter.load(), put_counter.load(), evict_counter.load()};
+ }
+
bool flush() {
for (auto it = cache.begin(); it != cache.end(); it++) {
- if (fileIO->Put(it->first, it->second.first, MaxTimeout, &reqs, false) != ErrorCode::Success) {
- SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "LRUCache: evict key:%d value size:%d to file failed\n", it->first, (int)(it->second.first.size()));
+ if (fileIO->Put(it->first, it->second.value, MaxTimeout, &reqs, false) != ErrorCode::Success) {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "LRUCache: evict key:%d value size:%d to file failed\n", it->first, (int)(it->second.value.size()));
return false;
}
}
@@ -382,13 +419,18 @@ namespace SPTAG::SPANN {
size = 0;
return true;
}
+
+ int64_t GetApproximateMemoryUsage() const
+ {
+ return static_cast(size);
+ }
};
class ShardedLRUCache {
int shards;
std::vector caches;
std::unique_ptr m_rwMutexs;
-
+
public:
ShardedLRUCache(int shards, int64_t capacity, int64_t limit, FileIO* fileIO) : shards(shards) {
caches.resize(shards);
@@ -407,10 +449,18 @@ namespace SPTAG::SPANN {
}
}
- bool get(SizeType key, void* value, int& get_size) {
+ bool get(SizeType key, Helper::PageBuffer &buffer)
+ {
+ SizeType cid = hash(key);
+ std::shared_lock lock(m_rwMutexs[cid]);
+ return caches[cid]->get(key, buffer);
+ }
+
+ bool get(SizeType key, std::string &value)
+ {
SizeType cid = hash(key);
std::shared_lock lock(m_rwMutexs[cid]);
- return caches[cid]->get(key, value, get_size);
+ return caches[cid]->get(key, value);
}
bool put(SizeType key, void* value, int put_size) {
@@ -421,12 +471,15 @@ namespace SPTAG::SPANN {
return caches[hash(key)]->del(key);
}
- bool merge(SizeType key, void* value, int merge_size) {
- return caches[hash(key)]->merge(key, value, merge_size);
+ bool merge(SizeType key, void *value, int merge_size,
+ std::function checksum)
+ {
+ return caches[hash(key)]->merge(key, value, merge_size, checksum);
}
bool flush() {
for (int i = 0; i < shards; i++) {
+ std::unique_lock lock(m_rwMutexs[i]);
if (!caches[i]->flush()) return false;
}
return true;
@@ -437,19 +490,31 @@ namespace SPTAG::SPANN {
return m_rwMutexs[hash(key)];
}
- SizeType hash(SizeType key) const
- {
+ SizeType hash(SizeType key) const {
return key % shards;
}
-
- std::pair get_stat() {
- int64_t queries = 0, hits = 0;
+
+ CacheCounters get_stat() {
+ CacheCounters result;
for (int i = 0; i < shards; i++) {
auto stat = caches[i]->get_stat();
- queries += stat.first;
- hits += stat.second;
+ result.query += stat.query;
+ result.query_hits += stat.query_hits;
+ result.put += stat.put;
+ result.evict += stat.evict;
}
- return {queries, hits};
+
+ return result;
+ }
+
+ int64_t GetApproximateMemoryUsage() const
+ {
+ int64_t result = 0;
+ for (int i = 0; i < shards; i++)
+ {
+ result += caches[i]->GetApproximateMemoryUsage();
+ }
+ return result;
}
};
@@ -565,9 +630,7 @@ namespace SPTAG::SPANN {
}
ErrorCode Get(const SizeType key, std::string* value, const std::chrono::microseconds &timeout, std::vector* reqs, bool useCache) {
- auto get_begin_time = std::chrono::high_resolution_clock::now();
SizeType r = m_pBlockMapping.R();
-
if (key >= r) {
SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Key OverFlow! Key:%d R:%d\n", key, r);
return ErrorCode::Key_OverFlow;
@@ -582,23 +645,12 @@ namespace SPTAG::SPANN {
if (size < 0) return ErrorCode::Posting_SizeError;
if (useCache && m_pShardedLRUCache) {
- value->resize(size);
- if (m_pShardedLRUCache->get(key, value->data(), size)) {
- value->resize(size);
+ if (m_pShardedLRUCache->get(key, *value)) {
return ErrorCode::Success;
}
}
- // if (m_pBlockController.ReadBlocks((AddressType*)At(key), value)) {
- // return ErrorCode::Success;
- // }
- auto begin_time = std::chrono::high_resolution_clock::now();
auto result = m_pBlockController.ReadBlocks((AddressType*)At(key), value, timeout, reqs);
- auto end_time = std::chrono::high_resolution_clock::now();
- read_time_vec += std::chrono::duration_cast(end_time - begin_time).count();
- get_times_vec++;
- auto get_end_time = std::chrono::high_resolution_clock::now();
- get_time_vec += std::chrono::duration_cast(get_end_time - get_begin_time).count();
return result ? ErrorCode::Success : ErrorCode::Fail;
}
@@ -610,36 +662,58 @@ namespace SPTAG::SPANN {
return Get(std::stoi(key), value, timeout, reqs, true);
}
+ ErrorCode Get(const SizeType key, Helper::PageBuffer &value,
+ const std::chrono::microseconds &timeout, std::vector *reqs, bool useCache = true) override
+ {
+ SizeType r = m_pBlockMapping.R();
+ if (key >= r)
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Key OverFlow! Key:%d R:%d\n", key, r);
+ return ErrorCode::Key_OverFlow;
+ }
+ AddressType *addr = (AddressType *)(At(key));
+ if (((uintptr_t)addr) == 0xffffffffffffffff)
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Key NotFound! Key:%d\n", key);
+ return ErrorCode::Key_NotFound;
+ }
+
+ int size = (int)(addr[0]);
+ if (size < 0) return ErrorCode::Posting_SizeError;
+
+ if (useCache && m_pShardedLRUCache)
+ {
+ if (m_pShardedLRUCache->get(key, value))
+ {
+ return ErrorCode::Success;
+ }
+ }
+
+ auto result = m_pBlockController.ReadBlocks((AddressType *)At(key), value, timeout, reqs);
+ return result ? ErrorCode::Success : ErrorCode::Fail;
+ }
+
ErrorCode MultiGet(const std::vector& keys, std::vector>& values,
const std::chrono::microseconds &timeout, std::vector* reqs) override {
std::vector blocks;
- std::set lock_keys;
SizeType r;
int i = 0;
for (SizeType key : keys) {
- r = m_pBlockMapping.R();
+ r = m_pBlockMapping.R();
if (key < r) {
- AddressType* addr = (AddressType*)(At(key));
- if (m_pShardedLRUCache && ((uintptr_t)addr) != 0xffffffffffffffff && addr[0] >= 0) {
- int size = (int)(addr[0]);
- values[i].ReservePageBuffer(size);
- if (m_pShardedLRUCache->get(key, values[i].GetBuffer(), size)) {
- values[i].SetAvailableSize(size);
- blocks.push_back(nullptr);
- }
- else {
- blocks.push_back(addr);
- }
+ if (m_pShardedLRUCache && m_pShardedLRUCache->get(key, values[i]))
+ {
+ blocks.push_back(nullptr);
} else {
+ AddressType* addr = (AddressType*)(At(key));
blocks.push_back(addr);
- }
+ }
}
else {
SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Fail to read key:%d total key number:%d\n", key, r);
}
i++;
}
- // if (m_pBlockController.ReadBlocks(blocks, values, timeout)) return ErrorCode::Success;
auto result = m_pBlockController.ReadBlocks(blocks, values, timeout, reqs);
return result ? ErrorCode::Success : ErrorCode::Fail;
}
@@ -648,26 +722,17 @@ namespace SPTAG::SPANN {
ErrorCode MultiGet(const std::vector& keys, std::vector* values,
const std::chrono::microseconds& timeout, std::vector* reqs) {
std::vector blocks;
- std::set lock_keys;
SizeType r;
values->resize(keys.size());
int i = 0;
for (SizeType key : keys) {
r = m_pBlockMapping.R();
if (key < r) {
- AddressType* addr = (AddressType*)(At(key));
- if (m_pShardedLRUCache && ((uintptr_t)addr) != 0xffffffffffffffff && addr[0] >= 0) {
- int size = (int)(addr[0]);
- (*values)[i].resize(size);
- if (m_pShardedLRUCache->get(key, (*values)[i].data(), size)) {
- (*values)[i].resize(size);
- blocks.push_back(nullptr);
- }
- else {
- blocks.push_back(addr);
- }
+ if (m_pShardedLRUCache && m_pShardedLRUCache->get(key, (*values)[i])) {
+ blocks.push_back(nullptr);
}
else {
+ AddressType* addr = (AddressType*)(At(key));
blocks.push_back(addr);
}
}
@@ -676,7 +741,6 @@ namespace SPTAG::SPANN {
}
i++;
}
- // if (m_pBlockController.ReadBlocks(blocks, values, timeout)) return ErrorCode::Success;
auto result = m_pBlockController.ReadBlocks(blocks, values, timeout, reqs);
return result ? ErrorCode::Success : ErrorCode::Fail;
}
@@ -732,6 +796,48 @@ namespace SPTAG::SPANN {
{
lock = &(m_pShardedLRUCache->getlock(key));
lock->lock();
+
+ if (m_pShardedLRUCache->put(key, (void *)(value.data()), (SPTAG::SizeType)(value.size())))
+ {
+ if (At(key) == 0xffffffffffffffff)
+ {
+ uintptr_t tmpblocks = 0xffffffffffffffff;
+ if (m_buffer.unsafe_size() > m_bufferLimit)
+ {
+ while (!m_buffer.try_pop(tmpblocks));
+ }
+ else
+ {
+ tmpblocks = (uintptr_t)(new AddressType[m_blockLimit]);
+ }
+ // The 0th element of the block address list represents the data size; set it to -1.
+ memset((AddressType *)tmpblocks, -1, sizeof(AddressType) * m_blockLimit);
+ At(key) = tmpblocks;
+ }
+ int64_t *postingSize = (int64_t *)At(key);
+ int oldblocks = (*postingSize < 0) ? 0 : ((*postingSize + PageSize - 1) >> PageSizeEx);
+ if (blocks - oldblocks > 0)
+ {
+ if (!m_pBlockController.GetBlocks(postingSize + oldblocks + 1, blocks - oldblocks))
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "[Put] Not enough blocks in the pool can be allocated!\n");
+ if (lock) lock->unlock();
+ return ErrorCode::DiskIOFail;
+ }
+ }
+ else if (blocks - oldblocks < 0)
+ {
+ if (!m_pBlockController.ReleaseBlocks(postingSize + blocks + 1, oldblocks - blocks))
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "[Put] Release blocks back to the pool failed!\n");
+ if (lock) lock->unlock();
+ return ErrorCode::DiskIOFail;
+ }
+ }
+ *postingSize = (int64_t)(value.size());
+ lock->unlock();
+ return ErrorCode::Success;
+ }
}
uintptr_t tmpblocks = 0xffffffffffffffff;
// If this key has not been assigned mapping blocks yet, allocate a batch.
@@ -759,16 +865,14 @@ namespace SPTAG::SPANN {
return ErrorCode::DiskIOFail;
}
*postingSize = value.size();
- if (!useCache || m_pShardedLRUCache == nullptr || !m_pShardedLRUCache->put(key, (void*)(value.data()), (SPTAG::SizeType)(value.size()))) {
- if (!m_pBlockController.WriteBlocks(postingSize + 1, blocks, value, timeout, reqs))
- {
- m_pBlockController.ReleaseBlocks(postingSize + 1, blocks);
- memset(postingSize + 1, -1, sizeof(AddressType) * blocks);
- SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "[Put] Write new block failed!\n");
- if (lock) lock->unlock();
- return ErrorCode::DiskIOFail;
- }
- }
+ if (!m_pBlockController.WriteBlocks(postingSize + 1, blocks, value, timeout, reqs))
+ {
+ m_pBlockController.ReleaseBlocks(postingSize + 1, blocks);
+ memset(postingSize + 1, -1, sizeof(AddressType) * blocks);
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "[Put] Write new block failed!\n");
+ if (lock) lock->unlock();
+ return ErrorCode::DiskIOFail;
+ }
At(key) = tmpblocks;
}
else {
@@ -785,15 +889,13 @@ namespace SPTAG::SPANN {
return ErrorCode::DiskIOFail;
}
*((int64_t*)partialtmpblocks) = value.size();
- if (!useCache || m_pShardedLRUCache == nullptr || !m_pShardedLRUCache->put(key, (void*)(value.data()), (SPTAG::SizeType)(value.size()))) {
- if (!m_pBlockController.WriteBlocks((AddressType*)partialtmpblocks + 1, blocks, value, timeout, reqs))
- {
- SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "[Put] Write new block failed!\n");
- m_pBlockController.ReleaseBlocks((AddressType*)partialtmpblocks + 1, blocks);
- m_buffer.push(partialtmpblocks);
- if (lock) lock->unlock();
- return ErrorCode::DiskIOFail;
- }
+ if (!m_pBlockController.WriteBlocks((AddressType*)partialtmpblocks + 1, blocks, value, timeout, reqs))
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "[Put] Write new block failed!\n");
+ m_pBlockController.ReleaseBlocks((AddressType*)partialtmpblocks + 1, blocks);
+ m_buffer.push(partialtmpblocks);
+ if (lock) lock->unlock();
+ return ErrorCode::DiskIOFail;
}
// Release the original blocks
@@ -815,7 +917,7 @@ namespace SPTAG::SPANN {
return Put(std::stoi(key), value, timeout, reqs, true);
}
- ErrorCode Check(const SizeType key, int size, std::vector *visited) override
+ ErrorCode Check(const SizeType key, int size, std::vector *visited) override
{
SizeType r = m_pBlockMapping.R();
@@ -842,40 +944,43 @@ namespace SPTAG::SPANN {
(int)(postingSize[i]), m_pBlockController.TotalBlocks());
return ErrorCode::Block_IDError;
}
-
- if (visited == nullptr)
- continue;
+ if (visited == nullptr) continue;
- if (visited->at(postingSize[i]))
+ if (postingSize[i] >= visited->size())
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "[Check] Key %d failed: BLOCK %lld exceed total block size %zu!\n", key,
+ postingSize[i], visited->size());
+ continue;
+ }
+
+ if (visited->at(postingSize[i]) > 0)
{
- SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "[Check] Block %lld double used!\n", postingSize[i]);
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "[Check] Key %d failed: Block %lld double used!\n", key, postingSize[i]);
return ErrorCode::Block_IDError;
}
else
{
- visited->at(postingSize[i]) = true;
+ InterlockedExchange8((char*)(&(visited->at(postingSize[i]))), 1);
}
}
return ErrorCode::Success;
}
- void PrintPostingDiff(std::string& p1, std::string& p2, const char* pos) {
- if (p1.size() != p2.size()) {
- SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Merge %s: p1 and p2 have different sizes: before=%u after=%u\n", pos, p1.size(), p2.size());
- return;
- }
- std::string diff = "";
- for (size_t i = 0; i < p1.size(); i+=4) {
- if (p1[i] != p2[i]) {
- diff += "[" + std::to_string(i) + "]:" + std::to_string(int(p1[i])) + "^" + std::to_string(int(p2[i])) + " ";
- }
- }
- if (diff.size() != 0) {
- SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Merge %s: %s\n", pos, diff.c_str());
+ int64_t GetApproximateMemoryUsage() const override
+ {
+ int64_t result = m_pBlockMapping.BufferSize();
+ if (m_pShardedLRUCache)
+ {
+ result += m_pShardedLRUCache->GetApproximateMemoryUsage();
}
- }
- ErrorCode Merge(const SizeType key, const std::string& value, const std::chrono::microseconds& timeout, std::vector* reqs) {
+ return result;
+ }
+
+ ErrorCode Merge(const SizeType key, const std::string &value, const std::chrono::microseconds &timeout,
+ std::vector *reqs,
+ std::function checksum)
+ {
SizeType r = m_pBlockMapping.R();
if (key >= r)
{
@@ -912,10 +1017,22 @@ namespace SPTAG::SPANN {
if (lock) lock->unlock();
return ErrorCode::Posting_OverFlow;
}
+ if (m_pShardedLRUCache && m_pShardedLRUCache->merge(key, (void *)(value.data()), value.size(), checksum))
+ {
+ int oldblocks = ((*postingSize + PageSize - 1) >> PageSizeEx);
+ int allocblocks = newblocks - oldblocks;
+ if (allocblocks > 0 && !m_pBlockController.GetBlocks(postingSize + 1 + oldblocks, allocblocks))
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "[Merge] Not enough blocks in the pool can be allocated!\n");
+ if (lock) lock->unlock();
+ return ErrorCode::DiskIOFail;
+ }
+ *postingSize = newSize;
+ if (lock) lock->unlock();
+ return ErrorCode::Success;
+ }
- //std::string before;
- //Get(key, &before, timeout, reqs);
-
+ postingSize = (int64_t *)At(key);
auto sizeInPage = (*postingSize) % PageSize; // Actual size of the last block
int oldblocks = (*postingSize >> PageSizeEx);
int allocblocks = newblocks - oldblocks;
@@ -942,20 +1059,18 @@ namespace SPTAG::SPANN {
m_buffer.push(tmpblocks);
if (lock) lock->unlock();
return ErrorCode::DiskIOFail;
+ }
+ if (!m_pBlockController.WriteBlocks((AddressType *)tmpblocks + 1 + oldblocks, allocblocks, newValue,
+ timeout, reqs))
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Error,
+ "[Merge] Write new block failed!\n");
+ m_pBlockController.ReleaseBlocks((AddressType *)tmpblocks + 1 + oldblocks, allocblocks);
+ m_buffer.push(tmpblocks);
+ if (lock) lock->unlock();
+ return ErrorCode::DiskIOFail;
}
- *((int64_t*)tmpblocks) = newSize;
- if (m_pShardedLRUCache == nullptr || !m_pShardedLRUCache->merge(key, (void *)(value.data()), value.size())) {
- if (!m_pBlockController.WriteBlocks((AddressType *)tmpblocks + 1 + oldblocks, allocblocks, newValue,
- timeout, reqs))
- {
- SPTAGLIB_LOG(Helper::LogLevel::LL_Error,
- "[Merge] Write new block failed!\n");
- m_pBlockController.ReleaseBlocks((AddressType *)tmpblocks + 1 + oldblocks, allocblocks);
- m_buffer.push(tmpblocks);
- if (lock) lock->unlock();
- return ErrorCode::DiskIOFail;
- }
- }
+ *((int64_t *)tmpblocks) = newSize;
// This is also to ensure checkpoint correctness, so we release the partially used block and allocate a new one.
m_pBlockController.ReleaseBlocks(postingSize + 1 + oldblocks, 1);
@@ -972,31 +1087,20 @@ namespace SPTAG::SPANN {
if (lock) lock->unlock();
return ErrorCode::DiskIOFail;
}
-
- if (m_pShardedLRUCache == nullptr || !m_pShardedLRUCache->merge(key, (void *)(value.data()), value.size())) {
- if (!m_pBlockController.WriteBlocks(postingSize + 1 + oldblocks, allocblocks, value, timeout, reqs))
- {
- SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "[Merge] Write new block failed!\n");
- m_pBlockController.ReleaseBlocks(postingSize + 1 + oldblocks, allocblocks);
- if (lock) lock->unlock();
- return ErrorCode::DiskIOFail;
- }
+
+ if (!m_pBlockController.WriteBlocks(postingSize + 1 + oldblocks, allocblocks, value, timeout, reqs))
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "[Merge] Write new block failed!\n");
+ m_pBlockController.ReleaseBlocks(postingSize + 1 + oldblocks, allocblocks);
+ if (lock) lock->unlock();
+ return ErrorCode::DiskIOFail;
}
*postingSize = newSize;
}
- /*
- std::string after;
- Get(key, &after, timeout, reqs);
- before += value;
- PrintPostingDiff(before, after, "1");
- */
if (lock) lock->unlock();
return ErrorCode::Success;
}
- ErrorCode Merge(const std::string &key, const std::string& value, const std::chrono::microseconds& timeout, std::vector* reqs) {
- return Merge(std::stoi(key), value, timeout, reqs);
- }
ErrorCode Delete(SizeType key) override {
SizeType r = m_pBlockMapping.R();
@@ -1051,13 +1155,19 @@ namespace SPTAG::SPANN {
int remainGB = ((long long)(remainBlocks + reserveBlocks) >> (30 - PageSizeEx));
// int remainGB = remainBlocks >> 20 << 2;
SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Total %d blocks, Remain %d blocks, Reserve %d blocks, totally %d GB\n", totalBlocks, remainBlocks, reserveBlocks, remainGB);
- double average_read_time = (double)read_time_vec / get_times_vec;
- double average_get_time = (double)get_time_vec / get_times_vec;
- SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Get times: %llu, get time: %llu us, read time: %llu us\n", get_times_vec.load(), get_time_vec.load(), read_time_vec.load());
- SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Average read time: %lf us, average get time: %lf us\n", average_read_time, average_get_time);
if (m_pShardedLRUCache) {
auto cache_stat = m_pShardedLRUCache->get_stat();
- SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Cache queries: %lld, Cache hits: %lld, Hit rates: %lf\n", cache_stat.first, cache_stat.second, cache_stat.second == 0 ? 0 : (double)cache_stat.second / cache_stat.first);
+ if (cache_stat.query + cache_stat.put)
+ {
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "LRUCache query: %lld/%lld (%f), put: %lld/%lld (%f)\n",
+ cache_stat.query_hits,
+ cache_stat.query,
+ cache_stat.query ? cache_stat.query_hits / (float) cache_stat.query : 0.0,
+ cache_stat.evict,
+ cache_stat.put,
+ cache_stat.put ? cache_stat.evict / (float) cache_stat.put : 0.0
+ );
+ }
}
m_pBlockController.IOStatistics();
}
@@ -1139,10 +1249,6 @@ namespace SPTAG::SPANN {
}
private:
- std::atomic read_time_vec = 0;
- std::atomic get_time_vec = 0;
- std::atomic get_times_vec = 0;
-
std::string m_mappingPath;
SizeType m_blockLimit;
COMMON::Dataset m_pBlockMapping;
@@ -1152,7 +1258,7 @@ namespace SPTAG::SPANN {
std::shared_ptr m_compactionThreadPool;
BlockController m_pBlockController;
- ShardedLRUCache *m_pShardedLRUCache;
+ ShardedLRUCache *m_pShardedLRUCache{nullptr};
bool m_shutdownCalled;
std::shared_mutex m_updateMutex;
diff --git a/AnnService/inc/Core/SPANN/ExtraRocksDBController.h b/AnnService/inc/Core/SPANN/ExtraRocksDBController.h
index 732dda6f..8ecc4ff8 100644
--- a/AnnService/inc/Core/SPANN/ExtraRocksDBController.h
+++ b/AnnService/inc/Core/SPANN/ExtraRocksDBController.h
@@ -279,7 +279,10 @@ namespace SPTAG::SPANN
return Put(k, value, timeout, reqs);
}
- ErrorCode Merge(const SizeType key, const std::string& value, const std::chrono::microseconds& timeout, std::vector* reqs) override {
+ ErrorCode Merge(const SizeType key, const std::string &value, const std::chrono::microseconds &timeout,
+ std::vector *reqs,
+ std::function checksum) override
+ {
if (value.empty()) {
SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Error! empty append posting!\n");
}
diff --git a/AnnService/inc/Core/SPANN/ExtraSPDKController.h b/AnnService/inc/Core/SPANN/ExtraSPDKController.h
index 396fb24b..1954588d 100644
--- a/AnnService/inc/Core/SPANN/ExtraSPDKController.h
+++ b/AnnService/inc/Core/SPANN/ExtraSPDKController.h
@@ -348,7 +348,10 @@ namespace SPTAG::SPANN
return ErrorCode::Success;
}
- ErrorCode Merge(SizeType key, const std::string& value, const std::chrono::microseconds& timeout, std::vector* reqs) override {
+ ErrorCode Merge(SizeType key, const std::string &value, const std::chrono::microseconds &timeout,
+ std::vector *reqs,
+ std::function checksum) override
+ {
if (key >= m_pBlockMapping.R()) {
SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "Key range error: key: %d, mapping size: %d\n", key, m_pBlockMapping.R());
return ErrorCode::Fail;
diff --git a/AnnService/inc/Core/SPANN/ExtraStaticSearcher.h b/AnnService/inc/Core/SPANN/ExtraStaticSearcher.h
index 2149e0ad..2d21f2ac 100644
--- a/AnnService/inc/Core/SPANN/ExtraStaticSearcher.h
+++ b/AnnService/inc/Core/SPANN/ExtraStaticSearcher.h
@@ -1056,7 +1056,7 @@ namespace SPTAG
return m_listInfos[postingID].listEleCount != 0;
}
- virtual ErrorCode CheckPosting(SizeType postingID, std::vector *visited = nullptr,
+ virtual ErrorCode CheckPosting(SizeType postingID, std::vector *visited = nullptr,
ExtraWorkSpace *p_exWorkSpace = nullptr) override
{
if (postingID < 0 || postingID >= m_totalListCount)
diff --git a/AnnService/inc/Core/SPANN/IExtraSearcher.h b/AnnService/inc/Core/SPANN/IExtraSearcher.h
index 95d17185..be59225c 100644
--- a/AnnService/inc/Core/SPANN/IExtraSearcher.h
+++ b/AnnService/inc/Core/SPANN/IExtraSearcher.h
@@ -313,7 +313,7 @@ namespace SPTAG {
virtual ErrorCode GetPostingDebug(ExtraWorkSpace* p_exWorkSpace, std::shared_ptr p_index, SizeType vid, std::vector& VIDs, std::shared_ptr& vecs) = 0;
- virtual ErrorCode RefineIndex(std::shared_ptr p_index, bool p_prereassign = true,
+ virtual ErrorCode RefineIndex(std::shared_ptr& p_index, bool p_prereassign = true,
std::vector *p_headmapping = nullptr,
std::vector *p_mapping = nullptr)
{
@@ -330,7 +330,7 @@ namespace SPTAG {
virtual void ForceCompaction() { return; }
virtual bool CheckValidPosting(SizeType postingID) = 0;
- virtual ErrorCode CheckPosting(SizeType postingiD, std::vector *visited = nullptr,
+ virtual ErrorCode CheckPosting(SizeType postingiD, std::vector *visited = nullptr,
ExtraWorkSpace *p_exWorkSpace = nullptr) = 0;
virtual SizeType SearchVector(ExtraWorkSpace* p_exWorkSpace, std::shared_ptr& p_vectorSet,
std::shared_ptr p_index, int testNum = 64, SizeType VID = -1) { return -1; }
diff --git a/AnnService/inc/Core/SPANN/Index.h b/AnnService/inc/Core/SPANN/Index.h
index 54b33c2b..0479d6c8 100644
--- a/AnnService/inc/Core/SPANN/Index.h
+++ b/AnnService/inc/Core/SPANN/Index.h
@@ -17,7 +17,7 @@
#include "inc/Core/Common/VersionLabel.h"
#include "inc/Core/Common/PostingSizeRecord.h"
-#include "inc/Core/Common/Labelset.h"
+#include "inc/Core/Common/LabelSet.h"
#include "inc/Helper/SimpleIniReader.h"
#include "inc/Helper/StringConvert.h"
#include "inc/Helper/ThreadPool.h"
diff --git a/AnnService/inc/Core/SPANN/Options.h b/AnnService/inc/Core/SPANN/Options.h
index 414111a1..f4962123 100644
--- a/AnnService/inc/Core/SPANN/Options.h
+++ b/AnnService/inc/Core/SPANN/Options.h
@@ -189,7 +189,9 @@ namespace SPTAG {
bool m_checksumInRead;
int m_cacheSize;
int m_cacheShards;
-
+ bool m_asyncMergeInSearch;
+ bool m_centeringToZero;
+
// Iterative
int m_headBatch;
int m_asyncAppendQueueSize;
diff --git a/AnnService/inc/Core/SPANN/ParameterDefinitionList.h b/AnnService/inc/Core/SPANN/ParameterDefinitionList.h
index 61c752e4..0a88e3f1 100644
--- a/AnnService/inc/Core/SPANN/ParameterDefinitionList.h
+++ b/AnnService/inc/Core/SPANN/ParameterDefinitionList.h
@@ -202,6 +202,7 @@ DefineSSDParameter(m_growthFileSize, int, 10, "GrowthFileSizeGB")
DefineSSDParameter(m_growThreshold, float, 0.05, "GrowthThreshold")
DefineSSDParameter(m_fDeletePercentageForRefine, float, 0.4F, "DeletePercentageForRefine") // Mutable
DefineSSDParameter(m_oneClusterCutMax, bool, false, "OneClusterCutMax") // Mutable
+DefineSSDParameter(m_asyncMergeInSearch, bool, true, "AsyncMergeInSearch") // Mutable
DefineSSDParameter(m_consistencyCheck, bool, false, "ConsistencyCheck") // Mutable
DefineSSDParameter(m_checksumCheck, bool, false, "ChecksumCheck") // Mutable
DefineSSDParameter(m_checksumInRead, bool, false, "ChecksumInRead") // Mutable
@@ -209,7 +210,8 @@ DefineSSDParameter(m_cacheSize, int, 0, "CacheSizeGB") // Mutable
DefineSSDParameter(m_cacheShards, int, 1, "CacheShards") // Mutable
DefineSSDParameter(m_asyncAppendQueueSize, int, 0, "AsyncAppendQueueSize") // Mutable
DefineSSDParameter(m_allowZeroReplica, bool, false, "AllowZeroReplica")
-
+DefineSSDParameter(m_centeringToZero, bool, false, "CenteringToZero")
+
// Iterative
DefineSSDParameter(m_headBatch, int, 32, "IterativeSearchHeadBatch") // Mutable
diff --git a/AnnService/inc/Helper/KeyValueIO.h b/AnnService/inc/Helper/KeyValueIO.h
index 4bda94a9..69e029bf 100644
--- a/AnnService/inc/Helper/KeyValueIO.h
+++ b/AnnService/inc/Helper/KeyValueIO.h
@@ -22,7 +22,9 @@ namespace SPTAG
virtual ErrorCode Get(const SizeType key, std::string* value, const std::chrono::microseconds& timeout, std::vector* reqs) = 0;
- virtual ErrorCode MultiGet(const std::vector& keys, std::vector>& values, const std::chrono::microseconds& timeout, std::vector* reqs) = 0;
+ virtual ErrorCode Get(const SizeType key, Helper::PageBuffer &value, const std::chrono::microseconds &timeout, std::vector *reqs, bool useCache = true) { return ErrorCode::Undefined; }
+
+ virtual ErrorCode MultiGet(const std::vector& keys, std::vector>& values, const std::chrono::microseconds& timeout, std::vector* reqs) { return ErrorCode::Undefined; }
virtual ErrorCode MultiGet(const std::vector& keys, std::vector* values, const std::chrono::microseconds& timeout, std::vector* reqs) = 0;
@@ -32,7 +34,10 @@ namespace SPTAG
virtual ErrorCode Put(const SizeType key, const std::string& value, const std::chrono::microseconds& timeout, std::vector* reqs) = 0;
- virtual ErrorCode Merge(const SizeType key, const std::string& value, const std::chrono::microseconds& timeout, std::vector* reqs) = 0;
+ virtual ErrorCode Merge(const SizeType key, const std::string &value,
+ const std::chrono::microseconds &timeout,
+ std::vector *reqs,
+ std::function checksum) = 0;
virtual ErrorCode Delete(SizeType key) = 0;
@@ -46,11 +51,16 @@ namespace SPTAG
virtual bool Available() { return false; }
- virtual ErrorCode Check(const SizeType key, int size, std::vector *visited)
+ virtual ErrorCode Check(const SizeType key, int size, std::vector *visited)
{
return ErrorCode::Undefined;
}
+ // Approximate memory usage of this key-value store; this default reports 0 unless a backend overrides it.
+ virtual int64_t GetApproximateMemoryUsage() const {
+     return 0;
+ }
+
virtual ErrorCode Checkpoint(std::string prefix) {return ErrorCode::Undefined;}
virtual ErrorCode StartToScan(SizeType& key, std::string* value) {return ErrorCode::Undefined;}
diff --git a/AnnService/inc/Helper/ThreadPool.h b/AnnService/inc/Helper/ThreadPool.h
index 96d78183..6aee44b3 100644
--- a/AnnService/inc/Helper/ThreadPool.h
+++ b/AnnService/inc/Helper/ThreadPool.h
@@ -104,7 +104,12 @@ namespace SPTAG
inline uint32_t runningJobs() { return currentJobs; }
- inline bool allClear() { return currentJobs == 0 && jobsize() == 0; }
+ inline bool allClear() {
+     size_t totaljobs = jobsize();
+     // Trace large backlogs only: 0 % 10000 == 0, so the empty queue is excluded or every idle call would log.
+     if (totaljobs != 0 && totaljobs % 10000 == 0) { SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "jobsize: %zu\n", totaljobs); }
+     return currentJobs == 0 && totaljobs == 0;
+ }
protected:
std::atomic_uint32_t currentJobs{ 0 };
diff --git a/AnnService/src/Core/BKT/BKTIndex.cpp b/AnnService/src/Core/BKT/BKTIndex.cpp
index 603272a3..77ecce4f 100644
--- a/AnnService/src/Core/BKT/BKTIndex.cpp
+++ b/AnnService/src/Core/BKT/BKTIndex.cpp
@@ -69,9 +69,9 @@ template ErrorCode Index::LoadIndexDataFromMemory(const std::vec
return ErrorCode::FailedParseValue;
if (p_indexBlobs.size() <= 3)
m_deletedID.Initialize(m_pSamples.R(), m_iDataBlockSize, m_iDataCapacity,
- COMMON::Labelset::InvalidIDBehavior::AlwaysContains);
+ COMMON::LabelSet::InvalidIDBehavior::AlwaysContains);
else if (m_deletedID.Load((char *)p_indexBlobs[3].Data(), m_iDataBlockSize, m_iDataCapacity,
- COMMON::Labelset::InvalidIDBehavior::AlwaysContains) != ErrorCode::Success)
+ COMMON::LabelSet::InvalidIDBehavior::AlwaysContains) != ErrorCode::Success)
return ErrorCode::FailedParseValue;
if (m_pSamples.R() != m_pGraph.R() || m_pSamples.R() != m_deletedID.R())
@@ -104,9 +104,9 @@ ErrorCode Index::LoadIndexData(const std::vector
-template &, SizeType, float),
bool (*checkFilter)(const std::shared_ptr &, SizeType, std::function)>
void Index::Search(COMMON::QueryResultSet &p_query, COMMON::WorkSpace &p_space,
@@ -394,7 +394,7 @@ void Index::Search(COMMON::QueryResultSet &p_query, COMMON::WorkSpace &p_s
}
template
-template &, SizeType, float),
bool (*checkFilter)(const std::shared_ptr &, SizeType, std::function)>
int Index::SearchIterative(COMMON::QueryResultSet &p_query, COMMON::WorkSpace &p_space, bool p_isFirst,
@@ -485,7 +485,7 @@ template bool AlwaysTrue(Args...)
return true;
}
-bool CheckIfNotDeleted(const COMMON::Labelset &deletedIDs, SizeType node)
+bool CheckIfNotDeleted(const COMMON::LabelSet &deletedIDs, SizeType node)
{
return !deletedIDs.Contains(node);
}
@@ -828,7 +828,7 @@ ErrorCode Index::BuildIndex(const void *p_data, SizeType p_vectorNum, Dimensi
m_pSamples.Initialize(p_vectorNum, p_dimension, m_iDataBlockSize, m_iDataCapacity, (T *)p_data, p_shareOwnership);
m_deletedID.Initialize(p_vectorNum, m_iDataBlockSize, m_iDataCapacity,
- COMMON::Labelset::InvalidIDBehavior::AlwaysContains);
+ COMMON::LabelSet::InvalidIDBehavior::AlwaysContains);
if (DistCalcMethod::Cosine == m_iDistCalcMethod && !p_normalized)
{
@@ -906,7 +906,7 @@ template ErrorCode Index::RefineIndex(std::shared_ptrm_deletedID.Initialize(newR, m_iDataBlockSize, m_iDataCapacity,
- COMMON::Labelset::InvalidIDBehavior::AlwaysContains);
+ COMMON::LabelSet::InvalidIDBehavior::AlwaysContains);
COMMON::BKTree *newtree = &(ptr->m_pTrees);
(*newtree).BuildTrees(ptr->m_pSamples, ptr->m_iDistCalcMethod, m_iNumberOfThreads);
m_pGraph.RefineGraph(this, indices, reverseIndices, nullptr, &(ptr->m_pGraph), &(ptr->m_pTrees.GetSampleMap()));
@@ -985,9 +985,9 @@ ErrorCode Index::RefineIndex(const std::vector ErrorCode Index::LoadIndexDataFromMemory(const std::vec
return ErrorCode::FailedParseValue;
if (p_indexBlobs.size() <= 3)
m_deletedID.Initialize(m_pSamples.R(), m_iDataBlockSize, m_iDataCapacity,
- COMMON::Labelset::InvalidIDBehavior::AlwaysContains);
+ COMMON::LabelSet::InvalidIDBehavior::AlwaysContains);
else if (m_deletedID.Load((char *)p_indexBlobs[3].Data(), m_iDataBlockSize, m_iDataCapacity,
- COMMON::Labelset::InvalidIDBehavior::AlwaysContains) != ErrorCode::Success)
+ COMMON::LabelSet::InvalidIDBehavior::AlwaysContains) != ErrorCode::Success)
return ErrorCode::FailedParseValue;
if (m_pSamples.R() != m_pGraph.R() || m_pSamples.R() != m_deletedID.R())
@@ -103,9 +103,9 @@ ErrorCode Index::LoadIndexData(const std::vector
-template
+template
void Index::Search(COMMON::QueryResultSet &p_query, COMMON::WorkSpace &p_space) const
{
std::shared_lock lock(*(m_pTrees.m_lock));
@@ -272,12 +272,12 @@ void Index::Search(COMMON::QueryResultSet &p_query, COMMON::WorkSpace &p_s
namespace StaticDispatch
{
-bool AlwaysTrue(const COMMON::Labelset &deletedIDs, SizeType node)
+bool AlwaysTrue(const COMMON::LabelSet &deletedIDs, SizeType node)
{
return true;
}
-bool CheckIfNotDeleted(const COMMON::Labelset &deletedIDs, SizeType node)
+bool CheckIfNotDeleted(const COMMON::LabelSet &deletedIDs, SizeType node)
{
return !deletedIDs.Contains(node);
}
@@ -490,7 +490,7 @@ ErrorCode Index::BuildIndex(const void *p_data, SizeType p_vectorNum, Dimensi
m_pSamples.Initialize(p_vectorNum, p_dimension, m_iDataBlockSize, m_iDataCapacity, (T *)p_data, p_shareOwnership);
m_deletedID.Initialize(p_vectorNum, m_iDataBlockSize, m_iDataCapacity,
- COMMON::Labelset::InvalidIDBehavior::AlwaysContains);
+ COMMON::LabelSet::InvalidIDBehavior::AlwaysContains);
if (DistCalcMethod::Cosine == m_iDistCalcMethod && !p_normalized)
{
@@ -567,7 +567,7 @@ template ErrorCode Index::RefineIndex(std::shared_ptrm_deletedID.Initialize(newR, m_iDataBlockSize, m_iDataCapacity,
- COMMON::Labelset::InvalidIDBehavior::AlwaysContains);
+ COMMON::LabelSet::InvalidIDBehavior::AlwaysContains);
COMMON::KDTree *newtree = &(ptr->m_pTrees);
(*newtree).BuildTrees(ptr->m_pSamples, m_iNumberOfThreads);
@@ -666,9 +666,9 @@ ErrorCode Index::RefineIndex(const std::vector p_metain, std::shared_ptr p_metaindexin,
- std::uint64_t p_blockSize, std::uint64_t p_capacity, std::uint64_t p_metaSize)
+ std::uint64_t p_blockSize, std::uint64_t p_capacity, std::uint64_t p_metaSize, int start, int count)
{
IOBINARY(p_metaindexin, ReadBinary, sizeof(m_count), (char *)&m_count);
+ if (start > m_count) start = m_count;
+ if (count < 0 || count > m_count - start) count = m_count - start;
+
+ std::uint64_t offset = 0;
m_pOffsets.reset(new MetadataOffsets, std::default_delete());
auto &m_offsets = *static_cast(m_pOffsets.get());
m_offsets.reserve(p_blockSize, p_capacity);
{
std::vector tmp(m_count + 1, 0);
IOBINARY(p_metaindexin, ReadBinary, sizeof(std::uint64_t) * (m_count + 1), (char *)tmp.data());
- m_offsets.assign(tmp.data(), tmp.data() + tmp.size());
+ offset = tmp[start];
+ if (offset > 0)
+ {
+ for (int i = start; i <= start + count; i++)
+ tmp[i] -= offset;
+ }
+ m_offsets.assign(tmp.data() + start, tmp.data() + start + count + 1);
}
- m_metadataHolder = ByteArray::Alloc(m_offsets[m_count]);
- IOBINARY(p_metain, ReadBinary, m_metadataHolder.Length(), (char *)m_metadataHolder.Data());
+ m_metadataHolder = ByteArray::Alloc(m_offsets[count]);
+ IOBINARY(p_metain, ReadBinary, m_metadataHolder.Length(), (char *)m_metadataHolder.Data(), offset);
m_newdata.reserve(p_blockSize * p_metaSize);
m_lock.reset(new std::shared_timed_mutex, std::default_delete());
- SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Load MetaIndex(%d) Meta(%llu)\n", m_count, m_offsets[m_count]);
+ m_count = count;
+ SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Load MetaIndex(%d) Meta(%llu) Offset(%llu)\n", m_count, m_offsets[m_count], offset);
return ErrorCode::Success;
}
@@ -307,7 +318,8 @@ MemMetadataSet::MemMetadataSet(std::shared_ptr p_metain, std::sh
}
MemMetadataSet::MemMetadataSet(const std::string &p_metafile, const std::string &p_metaindexfile,
- std::uint64_t p_blockSize, std::uint64_t p_capacity, std::uint64_t p_metaSize)
+ std::uint64_t p_blockSize, std::uint64_t p_capacity, std::uint64_t p_metaSize,
+ int start, int count)
{
std::shared_ptr ptrMeta = f_createIO(), ptrMetaIndex = f_createIO();
if (ptrMeta == nullptr || ptrMetaIndex == nullptr ||
@@ -318,7 +330,7 @@ MemMetadataSet::MemMetadataSet(const std::string &p_metafile, const std::string
p_metaindexfile.c_str());
throw std::runtime_error("Cannot open MemMetadataSet files");
}
- if (Init(ptrMeta, ptrMetaIndex, p_blockSize, p_capacity, p_metaSize) != ErrorCode::Success)
+ if (Init(ptrMeta, ptrMetaIndex, p_blockSize, p_capacity, p_metaSize, start, count) != ErrorCode::Success)
{
SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "ERROR: Cannot read MemMetadataSet!\n");
throw std::runtime_error("Cannot read MemMetadataSet");
diff --git a/AnnService/src/Core/SPANN/ExtraFileController.cpp b/AnnService/src/Core/SPANN/ExtraFileController.cpp
index ea69c0c3..0c794f3b 100644
--- a/AnnService/src/Core/SPANN/ExtraFileController.cpp
+++ b/AnnService/src/Core/SPANN/ExtraFileController.cpp
@@ -228,6 +228,52 @@ bool FileIO::BlockController::ReadBlocks(AddressType *p_data, std::string *p_val
return true;
}
+// Reads one posting into p_value. p_data is the key's mapping entry: p_data[0] is the posting size in bytes,
+// p_data[1..] the ids of its blocks. Returns false if reqs cannot hold one request per block or a read is short.
+bool FileIO::BlockController::ReadBlocks(AddressType *p_data, Helper::PageBuffer &p_value,
+    const std::chrono::microseconds &timeout, std::vector<Helper::AsyncReadRequest> *reqs)
+{
+    // An all-ones pointer marks a key without a mapping entry; a negative size slot (new entries start out
+    // as -1) marks an entry that holds no data yet. Both are reported as an empty posting, not as an error.
+    const bool unmapped = ((uintptr_t)p_data == 0xffffffffffffffff);
+    const int64_t postingSize = unmapped ? -1 : (int64_t)(p_data[0]);
+    if (postingSize < 0)
+    {
+        p_value.SetAvailableSize(0);
+        return true;
+    }
+    auto blockNum = (postingSize + PageSize - 1) >> PageSizeEx;
+    if (reqs == nullptr || (size_t)blockNum > reqs->size())
+    {
+        SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "FileIO::BlockController::ReadBlocks: %lld > %zu\n",
+                     (long long)blockNum, reqs ? reqs->size() : (size_t)0);
+        p_value.SetAvailableSize(0);
+        return false;
+    }
+    p_value.SetAvailableSize(postingSize);
+    AddressType currOffset = 0;
+    AddressType dataIdx = 1;
+    for (int i = 0; i < blockNum; i++)
+    {
+        Helper::AsyncReadRequest &curr = reqs->at(i);
+        // Every block is read whole except the last one, which only carries the remaining bytes.
+        curr.m_readSize = (postingSize - currOffset) < PageSize ? (postingSize - currOffset) : PageSize;
+        curr.m_offset = p_data[dataIdx] * PageSize;
+        currOffset += PageSize;
+        dataIdx++;
+    }
+    std::uint32_t totalReads = m_fileHandle->BatchReadFile(reqs->data(), blockNum, timeout, m_batchSize);
+    read_submit_vec += blockNum;
+    read_complete_vec += totalReads;
+    if (totalReads < blockNum)
+    {
+        SPTAGLIB_LOG(Helper::LogLevel::LL_Warning, "FileIO::BlockController::ReadBlocks: %u < %lld\n", totalReads, (long long)blockNum);
+        m_batchReadTimeouts++;
+        return false;
+    }
+    return true;
+}
+
bool FileIO::BlockController::ReadBlocks(const std::vector