From 634f9ae1ead5b4686e0f8fab44bddfc7a03bfc92 Mon Sep 17 00:00:00 2001 From: alja Date: Mon, 24 Mar 2025 20:10:22 -0700 Subject: [PATCH 01/16] Query ETag --- src/XrdPfc/XrdPfc.cc | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index 33ea646ef4f..71398e87f77 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -22,6 +22,8 @@ #include #include "XrdCl/XrdClURL.hh" +#include "XrdCl/XrdClFileSystem.hh" +#include "XrdCl/XrdClFileStateHandler.hh" #include "XrdOuc/XrdOucEnv.hh" #include "XrdOuc/XrdOucUtils.hh" @@ -1093,6 +1095,37 @@ int Cache::Prepare(const char *curl, int oflags, mode_t mode) m_purge_delay_set.insert(f_name); } + // + // Cache control headers + // + + +/* + // stat example + XrdCl::FileSystem fs(url); // AMT .. the url is not passed to the plugin + XrdCl::Buffer queryArgs(100); + queryArgs.FromString(curl); // string value of xrdCl::QueryCode::XAttr + XrdCl::StatInfo *response = 0; + XrdCl::Status st = fs.Stat(url.GetPath(), response); + std::cout << st.GetShellCode() << " resp:" << response << "\n"; +*/ + + XrdCl::FileSystem fs(url); + XrdCl::Buffer queryArgs(500); + queryArgs.FromString(curl); // pass parh throug args + XrdCl::Buffer* response = nullptr; + + XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::Space, queryArgs, response); + + std::cout << st.GetShellCode() << " resp in buffer:" << response << "\n"; + if (st.IsOK()) { + std::string etag = response->ToString(); + std::cout << "XrdCl::FileSystem::Query success: ETag = " << "" << etag << "\n"; + } + else { + std::cout << "XrdCl::FileSystem::Query failed " << st.GetShellCode() << "\n"; + } + struct stat sbuff; if (m_oss->Stat(i_name.c_str(), &sbuff) == XrdOssOK) { From 521bf114bd7c3f7681556ea4b34011557df685cd Mon Sep 17 00:00:00 2001 From: alja Date: Mon, 31 Mar 2025 06:43:14 -0700 Subject: [PATCH 02/16] Add FCntl interface in the IO cache --- src/XrdOuc/XrdOucCache.hh | 15 +++++++++++++++ src/XrdPfc/XrdPfc.cc | 16 ++++++++++++++++ src/XrdPosix/XrdPosixFile.cc | 10 ++++++++++ src/XrdPosix/XrdPosixFile.hh | 2 ++ 4 files changed, 43 insertions(+) diff --git a/src/XrdOuc/XrdOucCache.hh b/src/XrdOuc/XrdOucCache.hh index ad582bcbbaf..87ad34a20fb 100644 --- a/src/XrdOuc/XrdOucCache.hh +++ b/src/XrdOuc/XrdOucCache.hh @@ -36,6 +36,7 @@ #include "XrdOuc/XrdOucCacheStats.hh" #include "XrdOuc/XrdOucIOVec.hh" +#include "XrdCl/XrdClBuffer.hh" struct stat; class XrdOucEnv; @@ -147,6 +148,20 @@ long long FSize() = 0; virtual int Fstat(struct stat &sbuff) {(void)sbuff; return 1;} + +//------------------------------------------------------------------------------ +//! Perform an fcntl() operation (defaults to passthrough). +//! +//! @param AMT, for the moment XrdCl::Buffer to pass query code value and +//! XrdCl::Buffer to pass the string response. The XrdCL::Buffers is +//! interpreted as std::string +//! +//! @return <0 - fstat failed, value is -errno. +//! =0 - fstat succeeded, sbuff holds stat information. +//! >0 - fstat could not be done, forward operation to next level. +//------------------------------------------------------------------------------ +virtual int Fcntl(const XrdCl::Buffer& args, XrdCl::Buffer*& res) { return -1; } + //----------------------------------------------------------------------------- //! Get the file's location (i.e. endpoint hostname and port) //! diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index 71398e87f77..d4e98f82864 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -440,6 +440,22 @@ File* Cache::GetFile(const std::string& path, IO* io, long long off, long long f } else { filesize = st.st_size; } + + XrdCl::QueryCode::Code queryCode = XrdCl::QueryCode::XAttr; // tmp ... should be XrdCl::QueryCode::ETag ???? + XrdCl::Buffer queryArgs(5); + std::string qs = std::to_string(queryCode); + queryArgs.FromString(qs); + + XrdCl::Buffer* responseFctl = nullptr; + int resFctl = io->Base()->Fcntl(queryArgs, responseFctl); + if (resFctl == 0) + { + // seems like success + std::cout << "Cache::GetFile ... IO Query result buffer " << responseFctl->ToString() << "\n"; + } + else { + std::cout << "Cache::GetFile IO query to origin failed \n"; + } } File *file = 0; diff --git a/src/XrdPosix/XrdPosixFile.cc b/src/XrdPosix/XrdPosixFile.cc index 8863475843a..f204acc0179 100644 --- a/src/XrdPosix/XrdPosixFile.cc +++ b/src/XrdPosix/XrdPosixFile.cc @@ -386,6 +386,16 @@ int XrdPosixFile::Fstat(struct stat &buf) buf.st_mode = myMode; return 0; } + +int XrdPosixFile::Fcntl(const XrdCl::Buffer &arg, XrdCl::Buffer *&response) +{ + // AMT, how do I check if it is open ? + XrdCl::XRootDStatus st = clFile.Fcntl(arg, response); + + // AMT simplify for now. The XRootDStatus shell code is 0 or other positive value + // Do we need to check here for socket errors? + return st.GetShellCode(); +} /******************************************************************************/ /* H a n d l e R e s p o n s e */ diff --git a/src/XrdPosix/XrdPosixFile.hh b/src/XrdPosix/XrdPosixFile.hh index 6d99cf46489..5335384f969 100644 --- a/src/XrdPosix/XrdPosixFile.hh +++ b/src/XrdPosix/XrdPosixFile.hh @@ -100,6 +100,8 @@ static void DelayedDestroy(XrdPosixFile *fp); int Fstat(struct stat &buf) override; + int Fcntl(const XrdCl::Buffer& args, XrdCl::Buffer*& res) override; + const char *Location(bool refresh=false) override; void HandleResponse(XrdCl::XRootDStatus *status, From deb43b7edf5b5a676e420aaa1fbc3b026c22fdfd Mon Sep 17 00:00:00 2001 From: alja Date: Mon, 31 Mar 2025 06:50:21 -0700 Subject: [PATCH 03/16] Consistenty use XrdCl::QueryCode::XAttr to retrive ETag --- src/XrdPfc/XrdPfc.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index d4e98f82864..b3dea7c7ff3 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -1131,7 +1131,7 @@ int Cache::Prepare(const char *curl, int oflags, mode_t mode) queryArgs.FromString(curl); // pass parh throug args XrdCl::Buffer* response = nullptr; - XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::Space, queryArgs, response); + XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::XAttr, queryArgs, response); std::cout << st.GetShellCode() << " resp in buffer:" << response << "\n"; if (st.IsOK()) { From f0708e38b09b41f20c0307f8080f0833bb0b746d Mon Sep 17 00:00:00 2001 From: alja Date: Mon, 28 Apr 2025 12:32:18 -0700 Subject: [PATCH 04/16] End debug statement with new line. --- src/XrdCl/XrdClFile.hh | 2 +- src/XrdPfc/XrdPfc.cc | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/XrdCl/XrdClFile.hh b/src/XrdCl/XrdClFile.hh index db9fbc0e4a2..5af4ae1b95c 100644 --- a/src/XrdCl/XrdClFile.hh +++ b/src/XrdCl/XrdClFile.hh @@ -599,7 +599,7 @@ namespace XrdCl Buffer *&response, uint16_t timeout = 0 ) XRD_WARN_UNUSED_RESULT; - +files //------------------------------------------------------------------------ //! Get access token to a file - async //! diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index b3dea7c7ff3..27044fb1096 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -451,10 +451,10 @@ File* Cache::GetFile(const std::string& path, IO* io, long long off, long long f if (resFctl == 0) { // seems like success - std::cout << "Cache::GetFile ... IO Query result buffer " << responseFctl->ToString() << "\n"; + std::cout << "Cache::GetFile ... Fctl result buffer " << responseFctl->ToString() << "\n"; } else { - std::cout << "Cache::GetFile IO query to origin failed \n"; + std::cout << "Cache::GetFile Fctl to origin failed \n"; } } From c32c3a4a8599ff35563d04c3bbe0c170c74e5b50 Mon Sep 17 00:00:00 2001 From: alja Date: Wed, 14 May 2025 16:26:53 -0700 Subject: [PATCH 05/16] Query, store and validate Cache-Control values (ETag, exiration, and must-revalidate) --- src/XrdCl/XrdClFile.hh | 2 +- src/XrdPfc/XrdPfc.cc | 136 ++++++++++++++++++++++----------- src/XrdPfc/XrdPfc.hh | 4 + src/XrdPfc/XrdPfcFile.cc | 37 ++++++++- src/XrdPfc/XrdPfcFile.hh | 10 ++- src/XrdPosix/XrdPosixPrepIO.hh | 3 + 6 files changed, 139 insertions(+), 53 deletions(-) diff --git a/src/XrdCl/XrdClFile.hh b/src/XrdCl/XrdClFile.hh index 5af4ae1b95c..db9fbc0e4a2 100644 --- a/src/XrdCl/XrdClFile.hh +++ b/src/XrdCl/XrdClFile.hh @@ -599,7 +599,7 @@ namespace XrdCl Buffer *&response, uint16_t timeout = 0 ) XRD_WARN_UNUSED_RESULT; -files + //------------------------------------------------------------------------ //! Get access token to a file - async //! diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index 27044fb1096..f0f9a910285 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -28,6 +28,7 @@ #include "XrdOuc/XrdOucEnv.hh" #include "XrdOuc/XrdOucUtils.hh" #include "XrdOuc/XrdOucPrivateUtils.hh" +#include "XrdOuc/XrdOucJson.hh" #include "XrdSys/XrdSysTimer.hh" #include "XrdSys/XrdSysTrace.hh" @@ -427,6 +428,7 @@ File* Cache::GetFile(const std::string& path, IO* io, long long off, long long f } // This is always true, now that IOFileBlock is unsupported. + if (filesize == 0) { struct stat st; @@ -440,29 +442,31 @@ File* Cache::GetFile(const std::string& path, IO* io, long long off, long long f } else { filesize = st.st_size; } + } - XrdCl::QueryCode::Code queryCode = XrdCl::QueryCode::XAttr; // tmp ... should be XrdCl::QueryCode::ETag ???? - XrdCl::Buffer queryArgs(5); - std::string qs = std::to_string(queryCode); - queryArgs.FromString(qs); + std::string ccjson; + XrdCl::QueryCode::Code queryCode = XrdCl::QueryCode::XAttr; // AMT tmp + XrdCl::Buffer queryArgs(5); + std::string qs = std::to_string(queryCode); + queryArgs.FromString(qs); - XrdCl::Buffer* responseFctl = nullptr; - int resFctl = io->Base()->Fcntl(queryArgs, responseFctl); - if (resFctl == 0) - { - // seems like success - std::cout << "Cache::GetFile ... Fctl result buffer " << responseFctl->ToString() << "\n"; - } - else { - std::cout << "Cache::GetFile Fctl to origin failed \n"; - } + XrdCl::Buffer* responseFctl = nullptr; + int resFctl = io->Base()->Fcntl(queryArgs, responseFctl); + if (resFctl == 0) + { + // seems like success + std::cout << "Cache::GetFile ... Fctl result buffer " << responseFctl->ToString() << "\n"; + ccjson = responseFctl->ToString(); + } + else { + std::cout << "Cache::GetFile Fctl to origin failed \n"; } File *file = 0; if (filesize >= 0) { - file = File::FileOpen(path, off, filesize); + file = File::FileOpen(path, off, filesize, ccjson); } { @@ -922,6 +926,19 @@ int Cache::LocalFilePath(const char *curl, char *buff, int blen, return -ENOENT; } +//______________________________________________________________________________ +// If supported, write Cache-Control as xattr to cinfo file. +//------------------------------------------------------------------------------ +void Cache::WriteCacheControlXAttr(int cinfo_fd, const std::string& cc) +{ + if (m_metaXattr) { + int res = XrdSysXAttrActive->Set("pfc.cache-control", cc.c_str(), cc.size(), 0, cinfo_fd, 0); + if (res != 0) { + TRACE(Debug, "WriteFileSizeXAttr error setting xattr " << res); + } + } +} + //______________________________________________________________________________ // If supported, write file_size as xattr to cinfo file. //------------------------------------------------------------------------------ @@ -976,6 +993,48 @@ long long Cache::DetermineFullFileSize(const std::string &cinfo_fname) return ret; } +//______________________________________________________________________________ +// Get cache control attributes from the corresponding cinfo-file name. +// Returns -error on failure. +//------------------------------------------------------------------------------ +int Cache::GetCacheControlXAttr(const std::string &cinfo_fname, std::string& ival) +{ + if (m_metaXattr) { + + char pfn[4096]; + m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096); + + char cc[512]; + int res = XrdSysXAttrActive->Get("pfc.cache-control", &cc, 512, pfn, -1); + if (res > 0) + { + std::string tmp(cc); + ival = tmp; + //ival.assign(tmp); + return res; + } + } + return 0; +} + +//______________________________________________________________________________ +// Get cache control attributes from the corresponding cinfo-file name. +// Returns -error on failure. +//------------------------------------------------------------------------------ +int Cache::GetCacheControlXAttr(int fd, std::string& ival) +{ + if (m_metaXattr) { + char cc[512]; + int res = XrdSysXAttrActive->Get("pfc.cache-control", &cc, 512, nullptr, fd); + if (res > 0) + { + ival = cc; + return res; + } + } + return 0; +} + //______________________________________________________________________________ // Calculate if the file is to be considered cached for the purposes of // only-if-cached and setting of atime of the Stat() calls. @@ -1111,41 +1170,26 @@ int Cache::Prepare(const char *curl, int oflags, mode_t mode) m_purge_delay_set.insert(f_name); } - // - // Cache control headers - // - - -/* - // stat example - XrdCl::FileSystem fs(url); // AMT .. the url is not passed to the plugin - XrdCl::Buffer queryArgs(100); - queryArgs.FromString(curl); // string value of xrdCl::QueryCode::XAttr - XrdCl::StatInfo *response = 0; - XrdCl::Status st = fs.Stat(url.GetPath(), response); - std::cout << st.GetShellCode() << " resp:" << response << "\n"; -*/ - - XrdCl::FileSystem fs(url); - XrdCl::Buffer queryArgs(500); - queryArgs.FromString(curl); // pass parh throug args - XrdCl::Buffer* response = nullptr; - - XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::XAttr, queryArgs, response); - - std::cout << st.GetShellCode() << " resp in buffer:" << response << "\n"; - if (st.IsOK()) { - std::string etag = response->ToString(); - std::cout << "XrdCl::FileSystem::Query success: ETag = " << "" << etag << "\n"; - } - else { - std::cout << "XrdCl::FileSystem::Query failed " << st.GetShellCode() << "\n"; - } - struct stat sbuff; if (m_oss->Stat(i_name.c_str(), &sbuff) == XrdOssOK) { TRACE(Dump, "Prepare defer open " << f_name); + + std::string icc; + if (GetCacheControlXAttr(i_name, icc) > 0) { + using namespace nlohmann; + json j = json::parse(icc); + if (j.contains("ETag") && j.contains("revalidate") && j["revalidate"] == true) { + return 0; + } + if (j.contains("expire")) { + time_t current_time; + current_time = time(NULL); + if (current_time > j["expire"]) { + return 0; + } + } + } return 1; } else diff --git a/src/XrdPfc/XrdPfc.hh b/src/XrdPfc/XrdPfc.hh index 45c9448b61c..4b334ba64f9 100644 --- a/src/XrdPfc/XrdPfc.hh +++ b/src/XrdPfc/XrdPfc.hh @@ -186,8 +186,12 @@ public: virtual int ConsiderCached(const char *url); bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk); + void WriteCacheControlXAttr(int cinfo_fd, const std::string& cc); void WriteFileSizeXAttr(int cinfo_fd, long long file_size); long long DetermineFullFileSize(const std::string &cinfo_fname); + int GetCacheControlXAttr(const std::string &cinfo_fname, std::string& res); + int GetCacheControlXAttr(int fd, std::string& res); + //-------------------------------------------------------------------- //! \brief Makes decision if the original XrdOucCacheIO should be cached. diff --git a/src/XrdPfc/XrdPfcFile.cc b/src/XrdPfc/XrdPfcFile.cc index f55f881aa04..fc867083e48 100644 --- a/src/XrdPfc/XrdPfcFile.cc +++ b/src/XrdPfc/XrdPfcFile.cc @@ -27,6 +27,7 @@ #include "XrdSys/XrdSysTimer.hh" #include "XrdOss/XrdOss.hh" #include "XrdOuc/XrdOucEnv.hh" +#include "XrdOuc/XrdOucJson.hh" #include "XrdSfs/XrdSfsInterface.hh" #include @@ -50,7 +51,7 @@ const char *File::m_traceID = "File"; //------------------------------------------------------------------------------ -File::File(const std::string& path, long long iOffset, long long iFileSize) : +File::File(const std::string& path, long long iOffset, long long iFileSize, const std::string& json) : m_ref_cnt(0), m_data_file(0), m_info_file(0), @@ -58,6 +59,7 @@ File::File(const std::string& path, long long iOffset, long long iFileSize) : m_filename(path), m_offset(iOffset), m_file_size(iFileSize), + m_cache_control(json), m_current_io(m_io_set.end()), m_ios_in_detach(0), m_non_flushed_cnt(0), @@ -135,9 +137,9 @@ void File::Close() //------------------------------------------------------------------------------ -File* File::FileOpen(const std::string &path, long long offset, long long fileSize) +File* File::FileOpen(const std::string &path, long long offset, long long fileSize, const std::string& json) { - File *file = new File(path, offset, fileSize); + File *file = new File(path, offset, fileSize, json); if ( ! file->Open()) { delete file; @@ -496,7 +498,7 @@ bool File::Open() ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() << ")"); // Check if data file exists and is of reasonable size. - if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize()) + if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize() && TestCCXAttr()) { initialize_info_file = false; } else { @@ -531,6 +533,8 @@ bool File::Open() m_cfi.Write(m_info_file, ifn.c_str()); m_info_file->Fsync(); cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size); + cache()->WriteCacheControlXAttr(m_info_file->getFD(), m_cache_control); + // TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks()); } else @@ -1677,6 +1681,31 @@ std::string File::GetRemoteLocations() const return s; } +bool File::TestCCXAttr() +{ + std::string cc; + cache()->GetCacheControlXAttr(m_info_file->getFD(),cc); + + nlohmann::json j1 = nlohmann::json::parse(cc); + nlohmann::json j2 = nlohmann::json::parse(m_cache_control); + if (j2.contains("ETag") ) { + if (j1.contains("ETag")) { + if (j1["ETag"] == j2["ETag"]) { + return true; + } + else { + TRACEF(Info, "XrdPfc::File::TestCCXAttr ETag mismatch"); + return false; + } + } + else { + TRACEF(Info, "XrdPfc::File::TestCCXAttr. ETag entry missing"); + return false; + } + } + return true; +} + //============================================================================== //======================= RESPONSE HANDLERS ============================== //============================================================================== diff --git a/src/XrdPfc/XrdPfcFile.hh b/src/XrdPfc/XrdPfcFile.hh index 0d103ad4cc7..b8160dd453c 100644 --- a/src/XrdPfc/XrdPfcFile.hh +++ b/src/XrdPfc/XrdPfcFile.hh @@ -208,7 +208,7 @@ public: // Constructor, destructor, Open() and Close() are private. //! Static constructor that also does Open. Returns null ptr if Open fails. - static File* FileOpen(const std::string &path, long long offset, long long fileSize); + static File* FileOpen(const std::string &path, long long offset, long long fileSize, const std::string& json); //! Handle removal of a block from Cache's write queue. void BlockRemovedFromWriteQ(Block*); @@ -292,7 +292,7 @@ public: private: //! Constructor. - File(const std::string &path, long long offset, long long fileSize); + File(const std::string &path, long long offset, long long fileSize, const std::string& xattrJson); //! Destructor. ~File(); @@ -315,6 +315,9 @@ private: const long long m_offset; //!< offset of cached file for block-based / hdfs operation const long long m_file_size; //!< size of cached disk file for block-based operation + // HTTP header cache control + const std::string m_cache_control; //!< cache control values retrived with Fsctl from origin's IO + // IO objects attached to this file. typedef std::set IoSet_t; @@ -415,6 +418,9 @@ private: bool select_current_io_or_disable_prefetching(bool skip_current); int offsetIdx(int idx) const; + + // Http cache directive + bool TestCCXAttr(); }; //------------------------------------------------------------------------------ diff --git a/src/XrdPosix/XrdPosixPrepIO.hh b/src/XrdPosix/XrdPosixPrepIO.hh index d7eef1c250c..8516f570363 100644 --- a/src/XrdPosix/XrdPosixPrepIO.hh +++ b/src/XrdPosix/XrdPosixPrepIO.hh @@ -48,6 +48,9 @@ long long FSize() {return (Init() ? fileP->FSize() : openRC);} int Fstat(struct stat &buf) {return (Init() ? fileP->Fstat(buf) : openRC);} + +virtual int Fcntl(const XrdCl::Buffer& args, XrdCl::Buffer*& res) { return (Init() ? fileP->Fcntl(args, res) : openRC); } + int Open() {Init(); return openRC;} const char *Path() {return fileP->Path();} From 61a01417b0fb7130f556b853107fced3a32fc026 Mon Sep 17 00:00:00 2001 From: alja Date: Thu, 22 May 2025 15:45:47 -0700 Subject: [PATCH 06/16] New query XQueryType with num value 9, enum name kXR_Qhead --- src/XProtocol/XProtocol.hh | 1 + src/XrdCl/XrdClFileSystem.hh | 1 + 2 files changed, 2 insertions(+) diff --git a/src/XProtocol/XProtocol.hh b/src/XProtocol/XProtocol.hh index 0bcb8df6399..d7681d30176 100644 --- a/src/XProtocol/XProtocol.hh +++ b/src/XProtocol/XProtocol.hh @@ -620,6 +620,7 @@ enum XQueryType { kXR_Qckscan= 6, kXR_Qconfig= 7, kXR_Qvisa = 8, + kXR_Qhead = 9, kXR_Qopaque=16, kXR_Qopaquf=32, kXR_Qopaqug=64 diff --git a/src/XrdCl/XrdClFileSystem.hh b/src/XrdCl/XrdClFileSystem.hh index 8229a234c10..1792c0e3f59 100644 --- a/src/XrdCl/XrdClFileSystem.hh +++ b/src/XrdCl/XrdClFileSystem.hh @@ -60,6 +60,7 @@ namespace XrdCl Space = kXR_Qspace, //!< Query logical space stats Stats = kXR_QStats, //!< Query server stats Visa = kXR_Qvisa, //!< Query file visa attributes + Head = kXR_Qhead, //!< Query http header response XAttr = kXR_Qxattr //!< Query file extended attributes }; }; From 9e60b517ebcea056e6ce6d9d4c407232efccd46b Mon Sep 17 00:00:00 2001 From: alja Date: Wed, 4 Jun 2025 13:37:55 -0700 Subject: [PATCH 07/16] In Prepare() compare etags if must revalidate is on in the cinfo xattr. --- src/XrdPfc/XrdPfc.cc | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index f0f9a910285..0b550bd7a08 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -1008,7 +1008,7 @@ int Cache::GetCacheControlXAttr(const std::string &cinfo_fname, std::string& iva int res = XrdSysXAttrActive->Get("pfc.cache-control", &cc, 512, pfn, -1); if (res > 0) { - std::string tmp(cc); + std::string tmp(cc, res); ival = tmp; //ival.assign(tmp); return res; @@ -1028,7 +1028,7 @@ int Cache::GetCacheControlXAttr(int fd, std::string& ival) int res = XrdSysXAttrActive->Get("pfc.cache-control", &cc, 512, nullptr, fd); if (res > 0) { - ival = cc; + ival = std::string(cc, res); return res; } } @@ -1180,7 +1180,29 @@ int Cache::Prepare(const char *curl, int oflags, mode_t mode) using namespace nlohmann; json j = json::parse(icc); if (j.contains("ETag") && j.contains("revalidate") && j["revalidate"] == true) { - return 0; + // comapre cinfo xattr etag and the etag from http header response + XrdCl::FileSystem fs(url); + XrdCl::Buffer queryArgs(500); + queryArgs.FromString(curl); // pass parh throug args + XrdCl::Buffer *response = nullptr; + + XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::XAttr, queryArgs, response); + + std::cout << "json " < ToString(); + std::cout << "XrdCl::FileSystem::Query success: ETag = " << "" << etag << "\n"; + if (etag == j["ETag"]) { + return 1; + } + } + else + { + TRACE(Error, "Prepare() XrdCl::FileSystem::Query failed " << f_name.c_str()); + return 0; + } } if (j.contains("expire")) { time_t current_time; From 543b89ba9f91561cbe664206de4572e108eb6356 Mon Sep 17 00:00:00 2001 From: alja Date: Wed, 4 Jun 2025 19:33:23 -0700 Subject: [PATCH 08/16] Improve debug messages in GetFile() --- src/XrdPfc/XrdPfc.cc | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index 0b550bd7a08..09bd8ad24ac 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -444,22 +444,28 @@ File* Cache::GetFile(const std::string& path, IO* io, long long off, long long f } } + // AMT:The File::Fcntl is competing/duplicating with FileSystem::Query + // in the case of defer open matching etags and not exceeding expiration date. + // Is there a way one know here this is the case of defer open? + // + // Also, the io->Baseis of XrdPosixPrepIO type + // this functions requires initializaton in the case of defer open + // The call needs to be optional (set from configuration) + // std::string ccjson; - XrdCl::QueryCode::Code queryCode = XrdCl::QueryCode::XAttr; // AMT tmp + XrdCl::QueryCode::Code queryCode = XrdCl::QueryCode::XAttr; XrdCl::Buffer queryArgs(5); std::string qs = std::to_string(queryCode); queryArgs.FromString(qs); - XrdCl::Buffer* responseFctl = nullptr; int resFctl = io->Base()->Fcntl(queryArgs, responseFctl); if (resFctl == 0) { - // seems like success - std::cout << "Cache::GetFile ... Fctl result buffer " << responseFctl->ToString() << "\n"; + TRACE(Debug, "GetFile() XrdCl::File::Fcntl value " << responseFctl->ToString()); ccjson = responseFctl->ToString(); } else { - std::cout << "Cache::GetFile Fctl to origin failed \n"; + TRACE(Error, "GetFile() XrdCl::File::Fcntl query failed " << io->Base()->Path()); } File *file = 0; @@ -1188,15 +1194,12 @@ int Cache::Prepare(const char *curl, int oflags, mode_t mode) XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::XAttr, queryArgs, response); - std::cout << "json " < ToString(); - std::cout << "XrdCl::FileSystem::Query success: ETag = " << "" << etag << "\n"; - if (etag == j["ETag"]) { - return 1; - } + bool etagValid = (etag == j["ETag"]); + TRACE(Info, "Prepare " << f_name << ", ETag valid res: " << etagValid); + return etagValid; } else { From 79a353d440274173feae263e1ca28f2e36b6fb82 Mon Sep 17 00:00:00 2001 From: alja Date: Thu, 17 Jul 2025 10:45:00 -0700 Subject: [PATCH 09/16] Perform Fcntl in Pfc::File::Open() only to retrive cache-control attributes when file does not exist in cache yet. --- src/XrdPfc/XrdPfc.cc | 76 ++++++++++++++++++++-------------------- src/XrdPfc/XrdPfc.hh | 2 +- src/XrdPfc/XrdPfcFile.cc | 69 +++++++++++++++++++----------------- src/XrdPfc/XrdPfcFile.hh | 12 ++----- 4 files changed, 78 insertions(+), 81 deletions(-) diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index 09bd8ad24ac..75d3f2f5f0c 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -444,35 +444,11 @@ File* Cache::GetFile(const std::string& path, IO* io, long long off, long long f } } - // AMT:The File::Fcntl is competing/duplicating with FileSystem::Query - // in the case of defer open matching etags and not exceeding expiration date. - // Is there a way one know here this is the case of defer open? - // - // Also, the io->Baseis of XrdPosixPrepIO type - // this functions requires initializaton in the case of defer open - // The call needs to be optional (set from configuration) - // - std::string ccjson; - XrdCl::QueryCode::Code queryCode = XrdCl::QueryCode::XAttr; - XrdCl::Buffer queryArgs(5); - std::string qs = std::to_string(queryCode); - queryArgs.FromString(qs); - XrdCl::Buffer* responseFctl = nullptr; - int resFctl = io->Base()->Fcntl(queryArgs, responseFctl); - if (resFctl == 0) - { - TRACE(Debug, "GetFile() XrdCl::File::Fcntl value " << responseFctl->ToString()); - ccjson = responseFctl->ToString(); - } - else { - TRACE(Error, "GetFile() XrdCl::File::Fcntl query failed " << io->Base()->Path()); - } - File *file = 0; if (filesize >= 0) { - file = File::FileOpen(path, off, filesize, ccjson); + file = File::FileOpen(path, off, filesize, io->Base()); } { @@ -934,11 +910,12 @@ int Cache::LocalFilePath(const char *curl, char *buff, int blen, //______________________________________________________________________________ // If supported, write Cache-Control as xattr to cinfo file. +// One can use file descriptor or full path interchangeably //------------------------------------------------------------------------------ -void Cache::WriteCacheControlXAttr(int cinfo_fd, const std::string& cc) +void Cache::WriteCacheControlXAttr(int cinfo_fd, const char* path, const std::string& cc) { if (m_metaXattr) { - int res = XrdSysXAttrActive->Set("pfc.cache-control", cc.c_str(), cc.size(), 0, cinfo_fd, 0); + int res = XrdSysXAttrActive->Set("pfc.cache-control", cc.c_str(), cc.size(), path, cinfo_fd, 0); if (res != 0) { TRACE(Debug, "WriteFileSizeXAttr error setting xattr " << res); } @@ -1185,7 +1162,21 @@ int Cache::Prepare(const char *curl, int oflags, mode_t mode) if (GetCacheControlXAttr(i_name, icc) > 0) { using namespace nlohmann; json j = json::parse(icc); - if (j.contains("ETag") && j.contains("revalidate") && j["revalidate"] == true) { + + bool mustRevalidate = j.contains("revalidate") && (j["revalidate"] == true); + bool hasExpired = false; + if (j.contains("expire")) { + time_t current_time; + current_time = time(NULL); + if (current_time > j["expire"]) { + hasExpired = true; + } + } + + + bool ccIsValid = true; + + if (j.contains("ETag") && (mustRevalidate || hasExpired)) { // comapre cinfo xattr etag and the etag from http header response XrdCl::FileSystem fs(url); XrdCl::Buffer queryArgs(500); @@ -1197,22 +1188,31 @@ int Cache::Prepare(const char *curl, int oflags, mode_t mode) if (st.IsOK()) { std::string etag = response->ToString(); - bool etagValid = (etag == j["ETag"]); - TRACE(Info, "Prepare " << f_name << ", ETag valid res: " << etagValid); - return etagValid; + ccIsValid = (etag == j["ETag"]); + TRACE(Info, "Prepare " << f_name << ", ETag valid res: " << ccIsValid); + + // update expire time when Etag is valid + if (j.contains("max-age")) { + time_t ma = j["max-age"]; + j["expire"] = ma + time(NULL); + + char pfn[4096]; + m_oss->Lfn2Pfn(i_name.c_str(), pfn, 4096); + WriteCacheControlXAttr(-1, pfn, j.dump()); + } } else { TRACE(Error, "Prepare() XrdCl::FileSystem::Query failed " << f_name.c_str()); - return 0; } } - if (j.contains("expire")) { - time_t current_time; - current_time = time(NULL); - if (current_time > j["expire"]) { - return 0; - } + + + if (!ccIsValid) + { + // invalidate cinfo on ETag mismatch + // AMT ...what to do with the second fail_if_open argument ? + UnlinkFile(f_name, false); } } return 1; diff --git a/src/XrdPfc/XrdPfc.hh b/src/XrdPfc/XrdPfc.hh index 4b334ba64f9..b8a5002bf08 100644 --- a/src/XrdPfc/XrdPfc.hh +++ b/src/XrdPfc/XrdPfc.hh @@ -186,7 +186,7 @@ public: virtual int ConsiderCached(const char *url); bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk); - void WriteCacheControlXAttr(int cinfo_fd, const std::string& cc); + void WriteCacheControlXAttr(int cinfo_fd, const char* path, const std::string& cc); void WriteFileSizeXAttr(int cinfo_fd, long long file_size); long long DetermineFullFileSize(const std::string &cinfo_fname); int GetCacheControlXAttr(const std::string &cinfo_fname, std::string& res); diff --git a/src/XrdPfc/XrdPfcFile.cc b/src/XrdPfc/XrdPfcFile.cc index fc867083e48..3373b5871e3 100644 --- a/src/XrdPfc/XrdPfcFile.cc +++ b/src/XrdPfc/XrdPfcFile.cc @@ -30,6 +30,8 @@ #include "XrdOuc/XrdOucJson.hh" #include "XrdSfs/XrdSfsInterface.hh" +#include "XrdCl/XrdClFileStateHandler.hh" + #include #include #include @@ -51,7 +53,7 @@ const char *File::m_traceID = "File"; //------------------------------------------------------------------------------ -File::File(const std::string& path, long long iOffset, long long iFileSize, const std::string& json) : +File::File(const std::string& path, long long iOffset, long long iFileSize) : m_ref_cnt(0), m_data_file(0), m_info_file(0), @@ -59,7 +61,6 @@ File::File(const std::string& path, long long iOffset, long long iFileSize, cons m_filename(path), m_offset(iOffset), m_file_size(iFileSize), - m_cache_control(json), m_current_io(m_io_set.end()), m_ios_in_detach(0), m_non_flushed_cnt(0), @@ -137,10 +138,10 @@ void File::Close() //------------------------------------------------------------------------------ -File* File::FileOpen(const std::string &path, long long offset, long long fileSize, const std::string& json) +File* File::FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO* inputIO) { - File *file = new File(path, offset, fileSize, json); - if ( ! file->Open()) + File *file = new File(path, offset, fileSize); + if ( ! file->Open(inputIO)) { delete file; file = 0; @@ -422,7 +423,7 @@ void File::RemoveIO(IO *io) //------------------------------------------------------------------------------ -bool File::Open() +bool File::Open(XrdOucCacheIO* inputIO) { // Sets errno accordingly. @@ -498,7 +499,7 @@ bool File::Open() ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() << ")"); // Check if data file exists and is of reasonable size. - if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize() && TestCCXAttr()) + if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize()) { initialize_info_file = false; } else { @@ -533,7 +534,34 @@ bool File::Open() m_cfi.Write(m_info_file, ifn.c_str()); m_info_file->Fsync(); cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size); - cache()->WriteCacheControlXAttr(m_info_file->getFD(), m_cache_control); + + // access and write cache-control attributes + std::string ccjson; + XrdCl::QueryCode::Code queryCode = XrdCl::QueryCode::XAttr; + XrdCl::Buffer queryArgs(5); + std::string qs = std::to_string(queryCode); + queryArgs.FromString(qs); + XrdCl::Buffer *responseFctl = nullptr; + int resFctl = inputIO->Fcntl(queryArgs, responseFctl); + if (resFctl == 0) + { + ccjson = responseFctl->ToString(); + nlohmann::json j = nlohmann::json::parse(ccjson); + std::cout << j.dump(2) << " ====\n"; + if (j.contains("max-age")) { + time_t ma = j["max-age"]; + ma += time(NULL); + j["expire"] = ma; + ccjson = j.dump(); + } + TRACE(Debug, "GetFile() XrdCl::File::Fcntl value " << ccjson); + cache()->WriteCacheControlXAttr(m_info_file->getFD(), nullptr, ccjson); + } + else + { + TRACE(Error, "GetFile() XrdCl::File::Fcntl query failed " << inputIO->Path()); + } + // TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks()); } @@ -1681,31 +1709,6 @@ std::string File::GetRemoteLocations() const return s; } -bool File::TestCCXAttr() -{ - std::string cc; - cache()->GetCacheControlXAttr(m_info_file->getFD(),cc); - - nlohmann::json j1 = nlohmann::json::parse(cc); - nlohmann::json j2 = nlohmann::json::parse(m_cache_control); - if (j2.contains("ETag") ) { - if (j1.contains("ETag")) { - if (j1["ETag"] == j2["ETag"]) { - return true; - } - else { - TRACEF(Info, "XrdPfc::File::TestCCXAttr ETag mismatch"); - return false; - } - } - else { - TRACEF(Info, "XrdPfc::File::TestCCXAttr. ETag entry missing"); - return false; - } - } - return true; -} - //============================================================================== //======================= RESPONSE HANDLERS ============================== //============================================================================== diff --git a/src/XrdPfc/XrdPfcFile.hh b/src/XrdPfc/XrdPfcFile.hh index b8160dd453c..314fad2177a 100644 --- a/src/XrdPfc/XrdPfcFile.hh +++ b/src/XrdPfc/XrdPfcFile.hh @@ -208,7 +208,7 @@ public: // Constructor, destructor, Open() and Close() are private. //! Static constructor that also does Open. Returns null ptr if Open fails. - static File* FileOpen(const std::string &path, long long offset, long long fileSize, const std::string& json); + static File* FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO*); //! Handle removal of a block from Cache's write queue. void BlockRemovedFromWriteQ(Block*); @@ -292,7 +292,7 @@ public: private: //! Constructor. - File(const std::string &path, long long offset, long long fileSize, const std::string& xattrJson); + File(const std::string &path, long long offset, long long fileSize); //! Destructor. ~File(); @@ -301,7 +301,7 @@ private: void Close(); //! Open file handle for data file and info file on local disk. - bool Open(); + bool Open(XrdOucCacheIO* inputOrigin); static const char *m_traceID; @@ -315,9 +315,6 @@ private: const long long m_offset; //!< offset of cached file for block-based / hdfs operation const long long m_file_size; //!< size of cached disk file for block-based operation - // HTTP header cache control - const std::string m_cache_control; //!< cache control values retrived with Fsctl from origin's IO - // IO objects attached to this file. typedef std::set IoSet_t; @@ -418,9 +415,6 @@ private: bool select_current_io_or_disable_prefetching(bool skip_current); int offsetIdx(int idx) const; - - // Http cache directive - bool TestCCXAttr(); }; //------------------------------------------------------------------------------ From 54393878d73b11bf329ece0612d750fceac0ad99 Mon Sep 17 00:00:00 2001 From: alja Date: Thu, 17 Jul 2025 11:41:26 -0700 Subject: [PATCH 10/16] Include Matevz's comments --- src/XrdPfc/XrdPfc.cc | 7 +++---- src/XrdPfc/XrdPfcFile.cc | 3 ++- src/XrdPosix/XrdPosixFile.cc | 8 ++------ src/XrdPosix/XrdPosixPrepIO.hh | 3 +-- 4 files changed, 8 insertions(+), 13 deletions(-) diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index 75d3f2f5f0c..0ace3fa48f0 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -448,7 +448,7 @@ File* Cache::GetFile(const std::string& path, IO* io, long long off, long long f if (filesize >= 0) { - file = File::FileOpen(path, off, filesize, io->Base()); + file = File::FileOpen(path, off, filesize, io->GetInput()); } { @@ -917,7 +917,7 @@ void Cache::WriteCacheControlXAttr(int cinfo_fd, const char* path, const std::st if (m_metaXattr) { int res = XrdSysXAttrActive->Set("pfc.cache-control", cc.c_str(), cc.size(), path, cinfo_fd, 0); if (res != 0) { - TRACE(Debug, "WriteFileSizeXAttr error setting xattr " << res); + TRACE(Error, "WritecacheControlXAttr error setting xattr " << res); } } } @@ -993,9 +993,8 @@ int Cache::GetCacheControlXAttr(const std::string &cinfo_fname, std::string& iva { std::string tmp(cc, res); ival = tmp; - //ival.assign(tmp); - return res; } + return res; } return 0; } diff --git a/src/XrdPfc/XrdPfcFile.cc b/src/XrdPfc/XrdPfcFile.cc index 3373b5871e3..38629126384 100644 --- a/src/XrdPfc/XrdPfcFile.cc +++ b/src/XrdPfc/XrdPfcFile.cc @@ -548,7 +548,8 @@ bool File::Open(XrdOucCacheIO* inputIO) ccjson = responseFctl->ToString(); nlohmann::json j = nlohmann::json::parse(ccjson); std::cout << j.dump(2) << " ====\n"; - if (j.contains("max-age")) { + if (j.contains("max-age")) + { time_t ma = j["max-age"]; ma += time(NULL); j["expire"] = ma; diff --git a/src/XrdPosix/XrdPosixFile.cc b/src/XrdPosix/XrdPosixFile.cc index f204acc0179..ca01945d31c 100644 --- a/src/XrdPosix/XrdPosixFile.cc +++ b/src/XrdPosix/XrdPosixFile.cc @@ -389,12 +389,8 @@ int XrdPosixFile::Fstat(struct stat &buf) int XrdPosixFile::Fcntl(const XrdCl::Buffer &arg, XrdCl::Buffer *&response) { - // AMT, how do I check if it is open ? - XrdCl::XRootDStatus st = clFile.Fcntl(arg, response); - - // AMT simplify for now. The XRootDStatus shell code is 0 or other positive value - // Do we need to check here for socket errors? - return st.GetShellCode(); + XrdCl::XRootDStatus status = clFile.Fcntl(arg, response); + return status.IsOK() ? 0 : -1; } /******************************************************************************/ diff --git a/src/XrdPosix/XrdPosixPrepIO.hh b/src/XrdPosix/XrdPosixPrepIO.hh index 8516f570363..2cbada0879a 100644 --- a/src/XrdPosix/XrdPosixPrepIO.hh +++ b/src/XrdPosix/XrdPosixPrepIO.hh @@ -48,8 +48,7 @@ long long FSize() {return (Init() ? fileP->FSize() : openRC);} int Fstat(struct stat &buf) {return (Init() ? fileP->Fstat(buf) : openRC);} - -virtual int Fcntl(const XrdCl::Buffer& args, XrdCl::Buffer*& res) { return (Init() ? fileP->Fcntl(args, res) : openRC); } +int Fcntl(const XrdCl::Buffer& args, XrdCl::Buffer*& res) { return (Init() ? fileP->Fcntl(args, res) : openRC); } int Open() {Init(); return openRC;} From 18d82c9201e3eca1eb2571b8577213576b073028 Mon Sep 17 00:00:00 2001 From: alja Date: Thu, 17 Jul 2025 12:04:49 -0700 Subject: [PATCH 11/16] rename json variable --- src/XrdPfc/XrdPfc.cc | 33 ++++++++++++++++----------------- src/XrdPfc/XrdPfcFile.cc | 19 +++++++++---------- 2 files changed, 25 insertions(+), 27 deletions(-) diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index 0ace3fa48f0..8e3bed9594f 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -1160,44 +1160,43 @@ int Cache::Prepare(const char *curl, int oflags, mode_t mode) std::string icc; if (GetCacheControlXAttr(i_name, icc) > 0) { using namespace nlohmann; - json j = json::parse(icc); + json cc_json = json::parse(icc); - bool mustRevalidate = j.contains("revalidate") && (j["revalidate"] == true); + bool mustRevalidate = cc_json.contains("revalidate") && (cc_json["revalidate"] == true); bool hasExpired = false; - if (j.contains("expire")) { + if (cc_json.contains("expire")) + { time_t current_time; current_time = time(NULL); - if (current_time > j["expire"]) { + if (current_time > cc_json["expire"]) hasExpired = true; - } } - bool ccIsValid = true; - if (j.contains("ETag") && (mustRevalidate || hasExpired)) { - // comapre cinfo xattr etag and the etag from http header response + if (cc_json.contains("ETag") && (mustRevalidate || hasExpired)) { + // Compare cinfo xattr etag and the etag from file system query response + // Note: qeury returns only etag value, not a json string XrdCl::FileSystem fs(url); XrdCl::Buffer queryArgs(500); - queryArgs.FromString(curl); // pass parh throug args + queryArgs.FromString(curl); // pass file path throug args XrdCl::Buffer *response = nullptr; - XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::XAttr, queryArgs, response); if (st.IsOK()) { std::string etag = response->ToString(); - ccIsValid = (etag == j["ETag"]); + ccIsValid = (etag == cc_json["ETag"]); TRACE(Info, "Prepare " << f_name << ", ETag valid res: " << ccIsValid); - // update expire time when Etag is valid - if (j.contains("max-age")) { - time_t ma = j["max-age"]; - j["expire"] = ma + time(NULL); - + // update expiration time if Etag is valid + if (cc_json.contains("max-age")) + { + time_t ma = cc_json["max-age"]; + cc_json["expire"] = ma + time(NULL); char pfn[4096]; m_oss->Lfn2Pfn(i_name.c_str(), pfn, 4096); - WriteCacheControlXAttr(-1, pfn, j.dump()); + WriteCacheControlXAttr(-1, pfn, cc_json.dump()); } } else diff --git a/src/XrdPfc/XrdPfcFile.cc b/src/XrdPfc/XrdPfcFile.cc index 38629126384..a0564ff6679 100644 --- a/src/XrdPfc/XrdPfcFile.cc +++ b/src/XrdPfc/XrdPfcFile.cc @@ -536,7 +536,7 @@ bool File::Open(XrdOucCacheIO* inputIO) cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size); // access and write cache-control attributes - std::string ccjson; + std::string cc_str; XrdCl::QueryCode::Code queryCode = XrdCl::QueryCode::XAttr; XrdCl::Buffer queryArgs(5); std::string qs = std::to_string(queryCode); @@ -545,18 +545,17 @@ bool File::Open(XrdOucCacheIO* inputIO) int resFctl = inputIO->Fcntl(queryArgs, responseFctl); if (resFctl == 0) { - ccjson = responseFctl->ToString(); - nlohmann::json j = nlohmann::json::parse(ccjson); - std::cout << j.dump(2) << " ====\n"; - if (j.contains("max-age")) + cc_str = responseFctl->ToString(); + nlohmann::json cc_json = nlohmann::json::parse(cc_str); + if (cc_json.contains("max-age")) { - time_t ma = j["max-age"]; + time_t ma = cc_json["max-age"]; ma += time(NULL); - j["expire"] = ma; - ccjson = j.dump(); + cc_json["expire"] = ma; + cc_str = cc_json.dump(); } - TRACE(Debug, "GetFile() XrdCl::File::Fcntl value " << ccjson); - cache()->WriteCacheControlXAttr(m_info_file->getFD(), nullptr, ccjson); + TRACE(Debug, "GetFile() XrdCl::File::Fcntl value " << cc_str); + cache()->WriteCacheControlXAttr(m_info_file->getFD(), nullptr, cc_str); } else { From 3d5aca9a16bc8a6c517f9f06306d744589a4e5b4 Mon Sep 17 00:00:00 2001 From: alja Date: Thu, 17 Jul 2025 12:12:53 -0700 Subject: [PATCH 12/16] Set cc validy status in cache control query fails --- src/XrdPfc/XrdPfc.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index 8e3bed9594f..d618e51b814 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -1201,7 +1201,9 @@ int Cache::Prepare(const char *curl, int oflags, mode_t mode) } else { + // Message has a status beacuse we are in the block condition for cache-contol xattr TRACE(Error, "Prepare() XrdCl::FileSystem::Query failed " << f_name.c_str()); + ccIsValid = false; } } From a59ca60def71c71f3a6b597fc01a41cf8278cabc Mon Sep 17 00:00:00 2001 From: alja Date: Thu, 17 Jul 2025 13:53:10 -0700 Subject: [PATCH 13/16] Use XrdCl::QueryCode::Head to query cache control values --- src/XrdPfc/XrdPfc.cc | 2 +- src/XrdPfc/XrdPfcFile.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index d618e51b814..bf0a2bfb696 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -1181,7 +1181,7 @@ int Cache::Prepare(const char *curl, int oflags, mode_t mode) XrdCl::Buffer queryArgs(500); queryArgs.FromString(curl); // pass file path throug args XrdCl::Buffer *response = nullptr; - XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::XAttr, queryArgs, response); + XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::Head, queryArgs, response); if (st.IsOK()) { diff --git a/src/XrdPfc/XrdPfcFile.cc b/src/XrdPfc/XrdPfcFile.cc index a0564ff6679..d66421d81af 100644 --- a/src/XrdPfc/XrdPfcFile.cc +++ b/src/XrdPfc/XrdPfcFile.cc @@ -537,7 +537,7 @@ bool File::Open(XrdOucCacheIO* inputIO) // access and write cache-control attributes std::string cc_str; - XrdCl::QueryCode::Code queryCode = XrdCl::QueryCode::XAttr; + XrdCl::QueryCode::Code queryCode = XrdCl::QueryCode::Head; XrdCl::Buffer queryArgs(5); std::string qs = std::to_string(queryCode); queryArgs.FromString(qs); From 4f22b05d022ac74c67ef5e2eca38d9e887ca1b78 Mon Sep 17 00:00:00 2001 From: alja Date: Fri, 18 Jul 2025 10:57:12 -0700 Subject: [PATCH 14/16] Reserve more memory in XrdCl::Buffer in file system query --- src/XrdPfc/XrdPfc.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index bf0a2bfb696..fcf9a629ddf 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -1178,8 +1178,8 @@ int Cache::Prepare(const char *curl, int oflags, mode_t mode) // Compare cinfo xattr etag and the etag from file system query response // Note: qeury returns only etag value, not a json string XrdCl::FileSystem fs(url); - XrdCl::Buffer queryArgs(500); - queryArgs.FromString(curl); // pass file path throug args + XrdCl::Buffer queryArgs(1024); // pass file path throug args: reserve bytes to store path + queryArgs.FromString(curl); XrdCl::Buffer *response = nullptr; XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::Head, queryArgs, response); @@ -1211,7 +1211,6 @@ int Cache::Prepare(const char *curl, int oflags, mode_t mode) if (!ccIsValid) { // invalidate cinfo on ETag mismatch - // AMT ...what to do with the second fail_if_open argument ? UnlinkFile(f_name, false); } } From 52b7f128dc9e9404a8653070ed63cd95f786afaa Mon Sep 17 00:00:00 2001 From: alja Date: Fri, 18 Jul 2025 15:02:19 -0700 Subject: [PATCH 15/16] Add comment --- src/XrdPfc/XrdPfc.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index fcf9a629ddf..94b328a5214 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -1207,13 +1207,13 @@ int Cache::Prepare(const char *curl, int oflags, mode_t mode) } } - if (!ccIsValid) { // invalidate cinfo on ETag mismatch UnlinkFile(f_name, false); } - } + } // end chekcing cache control xattr in cinfo file + return 1; } else From 10862397458d15e4b71c1ed0c745637317ba8096 Mon Sep 17 00:00:00 2001 From: alja Date: Fri, 18 Jul 2025 15:03:24 -0700 Subject: [PATCH 16/16] Retrun error code in io::Fcntl call --- src/XrdPfc/XrdPfcFile.cc | 11 +++++------ src/XrdPosix/XrdPosixFile.cc | 3 ++- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/XrdPfc/XrdPfcFile.cc b/src/XrdPfc/XrdPfcFile.cc index d66421d81af..61ab1d5528c 100644 --- a/src/XrdPfc/XrdPfcFile.cc +++ b/src/XrdPfc/XrdPfcFile.cc @@ -536,7 +536,6 @@ bool File::Open(XrdOucCacheIO* inputIO) cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size); // access and write cache-control attributes - std::string cc_str; XrdCl::QueryCode::Code queryCode = XrdCl::QueryCode::Head; XrdCl::Buffer queryArgs(5); std::string qs = std::to_string(queryCode); @@ -545,7 +544,7 @@ bool File::Open(XrdOucCacheIO* inputIO) int resFctl = inputIO->Fcntl(queryArgs, responseFctl); if (resFctl == 0) { - cc_str = responseFctl->ToString(); + std::string cc_str = responseFctl->ToString(); nlohmann::json cc_json = nlohmann::json::parse(cc_str); if (cc_json.contains("max-age")) { @@ -554,15 +553,15 @@ bool File::Open(XrdOucCacheIO* inputIO) cc_json["expire"] = ma; cc_str = cc_json.dump(); } - TRACE(Debug, "GetFile() XrdCl::File::Fcntl value " << cc_str); + TRACE(Error, "GetFile() XrdCl::File::Fcntl value " << cc_str); cache()->WriteCacheControlXAttr(m_info_file->getFD(), nullptr, cc_str); } - else + else if (resFctl != kXR_Unsupported) { - TRACE(Error, "GetFile() XrdCl::File::Fcntl query failed " << inputIO->Path()); + // Query XrdCl::QueryCode::Head is optional, print error only if informatin is supported + TRACE(Error, "GetFile() XrdCl::File::Fcntl query XrdCl::QueryCode::Head failed " << inputIO->Path()); } - // TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks()); } else diff --git a/src/XrdPosix/XrdPosixFile.cc b/src/XrdPosix/XrdPosixFile.cc index ca01945d31c..71cb8eff07d 100644 --- a/src/XrdPosix/XrdPosixFile.cc +++ b/src/XrdPosix/XrdPosixFile.cc @@ -389,8 +389,9 @@ int XrdPosixFile::Fstat(struct stat &buf) int XrdPosixFile::Fcntl(const XrdCl::Buffer &arg, XrdCl::Buffer *&response) { + // AMT: temporary solution to handle unsuported operations in XrdPfc::File::Open() XrdCl::XRootDStatus status = clFile.Fcntl(arg, response); - return status.IsOK() ? 0 : -1; + return status.IsOK() ? 0 : status.errNo; } /******************************************************************************/