From 176adbfc8e6fe93270ab921ed0147155b6445fa3 Mon Sep 17 00:00:00 2001 From: westsurname <155189104+westsurname@users.noreply.github.com> Date: Sun, 19 May 2024 04:03:09 -0400 Subject: [PATCH 01/11] Smart caching if no uncached found --- blackhole.py | 95 ++++++++++++++++++++++++++++++++++++++------ blackhole_watcher.py | 10 ++++- shared/arr.py | 37 +++++++++++++++-- 3 files changed, 125 insertions(+), 17 deletions(-) diff --git a/blackhole.py b/blackhole.py index 2c5df71..b53d460 100644 --- a/blackhole.py +++ b/blackhole.py @@ -80,13 +80,13 @@ def __init__(self, isTorrentOrMagnet, isDotTorrentFile) -> None: self.isTorrentOrMagnet = isTorrentOrMagnet self.isDotTorrentFile = isDotTorrentFile - def __init__(self, filename, isRadarr) -> None: + def __init__(self, filename, isRadarr, filePath=None) -> None: print('filename:', filename) baseBath = getPath(isRadarr) isDotTorrentFile = filename.casefold().endswith('.torrent') isTorrentOrMagnet = isDotTorrentFile or filename.casefold().endswith('.magnet') filenameWithoutExt, _ = os.path.splitext(filename) - filePath = os.path.join(baseBath, filename) + filePath = filePath or os.path.join(baseBath, filename) filePathProcessing = os.path.join(baseBath, 'processing', filename) folderPathCompleted = os.path.join(baseBath, 'completed', filenameWithoutExt) folderPathMountTorrent = os.path.join(blackhole['rdMountTorrentsPath'], filenameWithoutExt) @@ -96,11 +96,10 @@ def __init__(self, filename, isRadarr) -> None: class TorrentBase(ABC): - def __init__(self, f, file, fail, failIfNotCached, onlyLargestFile) -> None: + def __init__(self, f, file, failIfNotCached, onlyLargestFile) -> None: super().__init__() self.f = f self.file = file - self.fail = fail self.failIfNotCached = failIfNotCached self.onlyLargestFile = onlyLargestFile self.id = None @@ -125,7 +124,6 @@ def submitTorrent(self): instantAvailability = self.getInstantAvailability() self.print('instantAvailability:', not not instantAvailability) if not instantAvailability: - self.fail(self) return False availableHost = self.getAvailableHost() @@ -261,7 +259,7 @@ def getPath(isRadarr, create=False): finalPath = os.path.join(absoluteBaseWatchPath, blackhole['radarrPath'] if isRadarr else blackhole['sonarrPath']) if create: - for sub_path in ['', 'processing', 'completed']: + for sub_path in ['', 'processing', 'completed', 'uncached']: path_to_check = os.path.join(finalPath, sub_path) if not os.path.exists(path_to_check): os.makedirs(path_to_check) @@ -319,7 +317,7 @@ def print(*values: object): import signal -async def processFile(file: TorrentFileInfo, arr: Arr, isRadarr): +async def processFile(file: TorrentFileInfo, arr: Arr, isRadarr, failIfNotCached=None): try: _print = globals()['print'] @@ -346,7 +344,7 @@ async def is_accessible(path, timeout=10): executor.shutdown(wait=False) with open(file.fileInfo.filePathProcessing, 'rb' if file.torrentInfo.isDotTorrentFile else 'r') as f: - def fail(torrent: TorrentBase, arr: Arr=arr): + def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): print(f"Failing") history = arr.getHistory(blackhole['historyPageSize'])['records'] @@ -358,15 +356,25 @@ def fail(torrent: TorrentBase, arr: Arr=arr): for item in items: # TODO: See if we can fail without blacklisting as cached items constantly changes arr.failHistoryItem(item['id']) + + if uncached and items: + ids = '-'.join(str(item.get('episodeId', item['movieId'])) for item in items) + path = os.path.join(getPath(isRadarr), 'uncached', ids) + os.renames(torrent.file.fileInfo.filePathProcessing, path) + print(f"Failed") + + failIfNotCached = blackhole['failIfNotCached'] if failIfNotCached is None else failIfNotCached; onlyLargestFile = isRadarr or bool(re.search(r'S[\d]{2}E[\d]{2}', file.fileInfo.filename)) if file.torrentInfo.isDotTorrentFile: - torrent = Torrent(f, file, fail, blackhole['failIfNotCached'], onlyLargestFile) + torrent = Torrent(f, file, failIfNotCached, onlyLargestFile) else: - torrent = Magnet(f, file, fail, blackhole['failIfNotCached'], onlyLargestFile) + torrent = Magnet(f, file, failIfNotCached, onlyLargestFile) - if torrent.submitTorrent(): + if not torrent.submitTorrent(): + historyItems = fail(torrent, uncached=True) + else: count = 0 while True: count += 1 @@ -387,7 +395,7 @@ def fail(torrent: TorrentBase, arr: Arr=arr): if torrent.incompatibleHashSize and torrent.failIfNotCached: print("Non-cached incompatible hash sized torrent") torrent.delete() - fail(torrent) + fail(torrent, uncached=True) break await asyncio.sleep(1) elif status == 'magnet_error' or status == 'error' or status == 'dead' or status == 'virus': @@ -500,7 +508,7 @@ def fail(torrent: TorrentBase, arr: Arr=arr): def getFiles(isRadarr): print('getFiles') - files = (TorrentFileInfo(filename, isRadarr) for filename in os.listdir(getPath(isRadarr)) if filename not in ['processing', 'completed']) + files = (TorrentFileInfo(filename, isRadarr) for filename in os.listdir(getPath(isRadarr)) if filename not in ['processing', 'completed', 'uncached']) return [file for file in files if file.torrentInfo.isTorrentOrMagnet] async def on_created(isRadarr): @@ -541,5 +549,66 @@ async def on_created(isRadarr): def start(isRadarr): asyncio.run(on_created(isRadarr)) +def removeDir(dirPath): + files = os.listdir(dirPath) + for file in files: + os.remove(os.path.join(dirPath, file)) + os.rmdir(dirPath) + +async def processUncachedDir(root, dir, arr, isRadarr): + try: + dirPath = os.path.join(root, dir) + files = os.listdir(dirPath) + if not files: + os.rmdir(dirPath) + return + + ids = dir.split('-') + if all(arr.getGrandchild(id).hasFile for id in ids): + removeDir(dirPath) + return + + files = sorted((os.path.join(dirPath, file) for file in files), key=os.path.getctime) + + newestFileTime = os.path.getctime(files[-1]) + if (time.time() - newestFileTime) <= 900: # 15 minutes + return + + oldestFile = files[0] + await processFile(TorrentFileInfo(os.path.basename(oldestFile), isRadarr, oldestFile), arr, isRadarr, failIfNotCached=False) + removeDir(dirPath) + except: + e = traceback.format_exc() + + print(f"Error processing uncached directory: {dirPath}") + print(e) + + discordError(f"Error processing uncached directory: {dirPath}", e) + +async def processUncached(): + print('Processing uncached') + try: + radarr = Radarr() + sonarr = Sonarr() + + paths = [(os.path.join(getPath(isRadarr=True), 'uncached'), radarr, True), (os.path.join(getPath(isRadarr=False), 'uncached'), sonarr, False)] + + futures: list[asyncio.Future] = [] + + for path, arr, isRadarr in paths: + for root, dirs, _ in os.walk(path): + futures.append(asyncio.gather(*(processUncachedDir(root, dir, arr, isRadarr) for dir in dirs))) + + await asyncio.gather(*futures) + except: + e = traceback.format_exc() + + print(f"Error processing uncached") + print(e) + + discordError(f"Error processing uncached", e) + print("Finished processing uncached") + + if __name__ == "__main__": start(isRadarr=sys.argv[1] == 'radarr') diff --git a/blackhole_watcher.py b/blackhole_watcher.py index 0a9ad35..1749d69 100644 --- a/blackhole_watcher.py +++ b/blackhole_watcher.py @@ -1,6 +1,7 @@ +import asyncio from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler -from blackhole import start, getPath +from blackhole import start, processUncached, getPath class BlackholeHandler(FileSystemEventHandler): def __init__(self, is_radarr): @@ -18,6 +19,12 @@ def on_created(self, event): self.is_processing = False +async def scheduleProcessUncached(): + while True: + await asyncio.sleep(600) # 10 minutes + await processUncached() + + if __name__ == "__main__": print("Watching blackhole") @@ -33,6 +40,7 @@ def on_created(self, event): try: radarr_observer.start() sonarr_observer.start() + asyncio.run(scheduleProcessUncached()) except KeyboardInterrupt: radarr_observer.stop() sonarr_observer.stop() diff --git a/shared/arr.py b/shared/arr.py index b17b9cf..f72d60f 100644 --- a/shared/arr.py +++ b/shared/arr.py @@ -40,6 +40,7 @@ def validateRadarrApiKey(): return False return True + requiredEnvs = { 'Sonarr host': (sonarr['host'], validateSonarrHost), 'Sonarr API key': (sonarr['apiKey'], validateSonarrApiKey, True), @@ -67,6 +68,10 @@ def id(self): def title(self): return self.json['title'] + @property + def hasFile(self): + return self.json.get('hasFile', False) + @property def path(self): return self.json['path'] @@ -132,6 +137,22 @@ def setChildMonitored(self, childId: int, monitored: bool): season['monitored'] = monitored break +class Episode(Media): + @property + def size(self): + return self.json['sizeOnDisk'] + + @property + def monitoredChildrenIds(self): + return [self.id] if self.json['monitored'] else [] + + @property + def fullyAvailableChildrenIds(self): + return [self.id] if self.json['hasFile'] else [] + + def setChildMonitored(self, childId: int, monitored: bool): + self.json["monitored"] = monitored + class MediaFile(ABC): def __init__(self, json) -> None: super().__init__() @@ -169,19 +190,25 @@ def parentId(self): return self.json['movieId'] class Arr(ABC): - def __init__(self, host: str, apiKey: str, endpoint: str, fileEndpoint: str, childIdName: str, childName: str, constructor: Type[Media], fileConstructor: Type[MediaFile]) -> None: + def __init__(self, host: str, apiKey: str, endpoint: str, fileEndpoint: str, childIdName: str, childName: str, grandchildEndpoint: str, constructor: Type[Media], grandchildConstructor:Type[Media], fileConstructor: Type[MediaFile]) -> None: self.host = host self.apiKey = apiKey self.endpoint = endpoint self.fileEndpoint = fileEndpoint self.childIdName = childIdName self.childName = childName + self.grandchildEndpoint = grandchildEndpoint self.constructor = constructor + self.grandchildConstructor = grandchildConstructor self.fileConstructor = fileConstructor def get(self, id: int): get = requests.get(f"{self.host}/api/v3/{self.endpoint}/{id}?apiKey={self.apiKey}") return self.constructor(get.json()) + + def getGrandchild(self, id: int): + get = requests.get(f"{self.host}/api/v3/{self.grandchildEndpoint}/{id}?apiKey={self.apiKey}") + return self.grandchildConstructor(get.json()) def getAll(self): get = requests.get(f"{self.host}/api/v3/{self.endpoint}?apiKey={self.apiKey}") @@ -225,6 +252,7 @@ def automaticSearch(self, media: Media, childId: int): def _automaticSearchJson(self, media: Media, childId: int): pass + class Sonarr(Arr): host = sonarr['host'] apiKey = sonarr['apiKey'] @@ -232,9 +260,10 @@ class Sonarr(Arr): fileEndpoint = 'episodefile' childIdName = 'seasonNumber' childName = 'Season' + grandchildEndpoint = 'episode' def __init__(self) -> None: - super().__init__(Sonarr.host, Sonarr.apiKey, Sonarr.endpoint, Sonarr.fileEndpoint, Sonarr.childIdName, Sonarr.childName, Show, EpisodeFile) + super().__init__(Sonarr.host, Sonarr.apiKey, Sonarr.endpoint, Sonarr.fileEndpoint, Sonarr.childIdName, Sonarr.childName, Sonarr.grandchildEndpoint, Show, Episode, EpisodeFile) def _automaticSearchJson(self, media: Media, childId: int): return {"name": f"{self.childName}Search", f"{self.endpoint}Id": media.id, self.childIdName: childId} @@ -246,9 +275,11 @@ class Radarr(Arr): fileEndpoint = 'moviefile' childIdName = None childName = 'Movies' + grandchildEndpoint = endpoint def __init__(self) -> None: - super().__init__(Radarr.host, Radarr.apiKey, Radarr.endpoint, Radarr.fileEndpoint, None, Radarr.childName, Movie, MovieFile) + super().__init__(Radarr.host, Radarr.apiKey, Radarr.endpoint, Radarr.fileEndpoint, Radarr.childIdName, Radarr.childName, Radarr.grandchildEndpoint, Movie, Movie, MovieFile) def _automaticSearchJson(self, media: Media, childId: int): return {"name": f"{self.childName}Search", f"{self.endpoint}Ids": [media.id]} + From 62940265296506c99fb787b45c68704acaf0b09e Mon Sep 17 00:00:00 2001 From: Priky-one Date: Mon, 8 Jul 2024 05:26:15 -0400 Subject: [PATCH 02/11] Added uncached downloader --- .env.template | 2 +- Dockerfile.blackhole | 2 +- blackhole.py | 158 +++++++++++++++++++---------- blackhole_downloader.py | 219 ++++++++++++++++++++++++++++++++++++++++ blackhole_watcher.py | 10 +- requirements.txt | 1 + shared/arr.py | 6 ++ shared/discord.py | 34 +++++++ 8 files changed, 370 insertions(+), 62 deletions(-) create mode 100644 blackhole_downloader.py diff --git a/.env.template b/.env.template index 1daca0a..c1df7e9 100644 --- a/.env.template +++ b/.env.template @@ -37,7 +37,7 @@ BLACKHOLE_RADARR_PATH="Movies" BLACKHOLE_SONARR_PATH="TV Shows" BLACKHOLE_FAIL_IF_NOT_CACHED=true BLACKHOLE_RD_MOUNT_REFRESH_SECONDS=200 -BLACKHOLE_WAIT_FOR_TORRENT_TIMEOUT=60 +BLACKHOLE_WAIT_FOR_TORRENT_TIMEOUT=51840 BLACKHOLE_HISTORY_PAGE_SIZE=500 DISCORD_ENABLED=false diff --git a/Dockerfile.blackhole b/Dockerfile.blackhole index 0be884d..d98cb1f 100644 --- a/Dockerfile.blackhole +++ b/Dockerfile.blackhole @@ -1,4 +1,4 @@ -FROM python:3.8-slim +FROM python:3.11-slim # Metadata labels LABEL org.opencontainers.image.source="https://github.com/westsurname/scripts" diff --git a/blackhole.py b/blackhole.py index b53d460..5f807eb 100644 --- a/blackhole.py +++ b/blackhole.py @@ -12,12 +12,18 @@ # import urllib from werkzeug.utils import cached_property from abc import ABC, abstractmethod -from shared.discord import discordError, discordUpdate +from shared.discord import discordError, discordUpdate, discordStatusUpdate from shared.shared import realdebrid, blackhole, plex, mediaExtensions, checkRequiredEnvs from shared.arr import Arr, Radarr, Sonarr +from blackhole_downloader import downloader +from RTN import parse +import threading rdHost = realdebrid['host'] authToken = realdebrid['apiKey'] +shared_dict = {} +lock = threading.Lock() +webhook = discordStatusUpdate(shared_dict, create=True) _print = print @@ -172,6 +178,32 @@ def getInfo(self, refresh=False): return self._info + def getActiveTorrents(self): + activeCount = requests.get(f"{rdHost}torrents/activeCount?auth_token={authToken}") + activeTorrents = activeCount.json() + + return activeTorrents + + def getAllTorrents(self): + allTorrents = [] + page = 1 + limit = 2500 + + while True: + response = requests.get(f"{rdHost}torrents?auth_token={authToken}", params={"page": page, "limit": limit}) + if response.status_code != 200: + print(f"Error: {response.status_code} - {response.text}") + break + + torrentInfo = response.json() + if not torrentInfo: + break + + allTorrents.extend(torrentInfo) + page += 1 + + return allTorrents + def selectFiles(self): self._enforceId() @@ -193,7 +225,7 @@ def selectFiles(self): if self.failIfNotCached and not self.incompatibleHashSize: targetFileIds = {largestMediaFileId} if self.onlyLargestFile else mediaFileIds - if not any(set(fileGroup.keys()) == targetFileIds for fileGroup in self._instantAvailability): + if self._instantAvailability and not any(set(fileGroup.keys()) == targetFileIds for fileGroup in self._instantAvailability): extraFilesGroup = next((fileGroup for fileGroup in self._instantAvailability if largestMediaFileId in fileGroup.keys()), None) if self.onlyLargestFile and extraFilesGroup: self.print('extra files required for cache:', extraFilesGroup) @@ -344,7 +376,7 @@ async def is_accessible(path, timeout=10): executor.shutdown(wait=False) with open(file.fileInfo.filePathProcessing, 'rb' if file.torrentInfo.isDotTorrentFile else 'r') as f: - def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): + async def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): print(f"Failing") history = arr.getHistory(blackhole['historyPageSize'])['records'] @@ -353,15 +385,41 @@ def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): if not items: raise Exception("No history items found to cancel") + first_item = None + total_items = 0 for item in items: + if first_item is None: + first_item = item # TODO: See if we can fail without blacklisting as cached items constantly changes arr.failHistoryItem(item['id']) - - if uncached and items: - ids = '-'.join(str(item.get('episodeId', item['movieId'])) for item in items) - path = os.path.join(getPath(isRadarr), 'uncached', ids) - os.renames(torrent.file.fileInfo.filePathProcessing, path) - + arr.removeFailedItem(item['id']) ## Removing from blocklist + total_items += 1 + + if uncached and items and first_item: + itemId = str(first_item.get('seriesId', first_item.get('movieId'))) + path = os.path.join(getPath(isRadarr), 'uncached', itemId, torrent.file.fileInfo.filename) + if not isRadarr: + if total_items == 1: # and first_item["releaseType"] != "SeasonPack" + episodeId = str(first_item['episodeId']) ## Fallback? data --> releaseType --> SeasonPack + path = os.path.join(getPath(isRadarr), 'uncached', itemId, episodeId, torrent.file.fileInfo.filename) + else: + seasonPack = 'seasonpack' + parsedTorrent = parse(torrent.file.fileInfo.filename) ## Fallback? episode --> seasonNumber + seasons = [str(pt) for pt in parsedTorrent.season] + seasons = "-".join(seasons) + path = os.path.join(getPath(isRadarr), 'uncached', itemId, seasonPack, seasons, torrent.file.fileInfo.filename) + + if not os.path.exists(path): + os.renames(torrent.file.fileInfo.filePathProcessing, path) + else: + os.remove(file.fileInfo.filePathProcessing) + await downloader(torrent, file, arr, path, shared_dict, lock, webhook) + elif not first_item: + os.remove(file.fileInfo.filePathProcessing) + os.remove(file.fileInfo.filePath) + arr.clearBlocklist() + allItems = arr.getAll() + # TODO: Trigger scan for the deleted torrent which don't exist in history print(f"Failed") @@ -373,7 +431,7 @@ def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): torrent = Magnet(f, file, failIfNotCached, onlyLargestFile) if not torrent.submitTorrent(): - historyItems = fail(torrent, uncached=True) + historyItems = await fail(torrent, uncached=True) else: count = 0 while True: @@ -386,7 +444,7 @@ def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): if status == 'waiting_files_selection': if not torrent.selectFiles(): torrent.delete() - fail(torrent) + await fail(torrent) break elif status == 'magnet_conversion' or status == 'queued' or status == 'downloading' or status == 'compressing' or status == 'uploading': # Send progress to arr @@ -395,11 +453,11 @@ def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): if torrent.incompatibleHashSize and torrent.failIfNotCached: print("Non-cached incompatible hash sized torrent") torrent.delete() - fail(torrent, uncached=True) + await fail(torrent, uncached=True) break await asyncio.sleep(1) elif status == 'magnet_error' or status == 'error' or status == 'dead' or status == 'virus': - fail(torrent) + await fail(torrent) break elif status == 'downloaded': existsCount = 0 @@ -435,6 +493,7 @@ def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): for root, dirs, files in os.walk(folderPathMountTorrent): relRoot = os.path.relpath(root, folderPathMountTorrent) for filename in files: + source_link = os.path.join(root, filename) # Check if the file is accessible # if not await is_accessible(os.path.join(root, filename)): # print(f"Timeout reached when accessing file: {filename}") @@ -454,18 +513,24 @@ def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): seasonFolderPathCompleted = re.sub(multiSeasonRegex2, season, seasonFolderPathCompleted) os.makedirs(os.path.join(seasonFolderPathCompleted, relRoot), exist_ok=True) - os.symlink(os.path.join(root, filename), os.path.join(seasonFolderPathCompleted, relRoot, filename)) - print('Season Recursive:', f"{os.path.join(seasonFolderPathCompleted, relRoot, filename)} -> {os.path.join(root, filename)}") + target_link = os.path.join(seasonFolderPathCompleted, relRoot, filename) + if os.path.islink(target_link): + os.remove(target_link) + os.symlink(source_link, target_link) + print('Season Recursive:', f"{target_link} -> {source_link}") # refreshEndpoint = f"{plex['serverHost']}/library/sections/{plex['serverMovieLibraryId'] if isRadarr else plex['serverTvShowLibraryId']}/refresh?path={urllib.parse.quote_plus(os.path.join(seasonFolderPathCompleted, relRoot))}&X-Plex-Token={plex['serverApiKey']}" # cancelRefreshRequest = requests.delete(refreshEndpoint, headers={'Accept': 'application/json'}) # refreshRequest = requests.get(refreshEndpoint, headers={'Accept': 'application/json'}) continue - + + target_link = os.path.join(file.fileInfo.folderPathCompleted, relRoot, filename) os.makedirs(os.path.join(file.fileInfo.folderPathCompleted, relRoot), exist_ok=True) - os.symlink(os.path.join(root, filename), os.path.join(file.fileInfo.folderPathCompleted, relRoot, filename)) - print('Recursive:', f"{os.path.join(file.fileInfo.folderPathCompleted, relRoot, filename)} -> {os.path.join(root, filename)}") + if os.path.islink(target_link): + os.remove(target_link) + os.symlink(source_link, target_link) + print('Recursive:', f"{target_link} -> {source_link}") # refreshEndpoint = f"{plex['serverHost']}/library/sections/{plex['serverMovieLibraryId'] if isRadarr else plex['serverTvShowLibraryId']}/refresh?path={urllib.parse.quote_plus(os.path.join(file.fileInfo.folderPathCompleted, relRoot))}&X-Plex-Token={plex['serverApiKey']}" # cancelRefreshRequest = requests.delete(refreshEndpoint, headers={'Accept': 'application/json'}) # refreshRequest = requests.get(refreshEndpoint, headers={'Accept': 'application/json'}) @@ -494,10 +559,10 @@ def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): discordError(f"{file.fileInfo.filenameWithoutExt} info attempt count > 20", status) elif count == blackhole['waitForTorrentTimeout']: print('infoCount == 60 - Failing') - fail(torrent) + await fail(torrent) break - os.remove(file.fileInfo.filePathProcessing) + os.remove(file.fileInfo.filePathProcessing) except: e = traceback.format_exc() @@ -555,37 +620,7 @@ def removeDir(dirPath): os.remove(os.path.join(dirPath, file)) os.rmdir(dirPath) -async def processUncachedDir(root, dir, arr, isRadarr): - try: - dirPath = os.path.join(root, dir) - files = os.listdir(dirPath) - if not files: - os.rmdir(dirPath) - return - - ids = dir.split('-') - if all(arr.getGrandchild(id).hasFile for id in ids): - removeDir(dirPath) - return - - files = sorted((os.path.join(dirPath, file) for file in files), key=os.path.getctime) - - newestFileTime = os.path.getctime(files[-1]) - if (time.time() - newestFileTime) <= 900: # 15 minutes - return - - oldestFile = files[0] - await processFile(TorrentFileInfo(os.path.basename(oldestFile), isRadarr, oldestFile), arr, isRadarr, failIfNotCached=False) - removeDir(dirPath) - except: - e = traceback.format_exc() - - print(f"Error processing uncached directory: {dirPath}") - print(e) - - discordError(f"Error processing uncached directory: {dirPath}", e) - -async def processUncached(): +async def resumeUncached(): print('Processing uncached') try: radarr = Radarr() @@ -594,10 +629,25 @@ async def processUncached(): paths = [(os.path.join(getPath(isRadarr=True), 'uncached'), radarr, True), (os.path.join(getPath(isRadarr=False), 'uncached'), sonarr, False)] futures: list[asyncio.Future] = [] - + processed_files = set() + for path, arr, isRadarr in paths: for root, dirs, _ in os.walk(path): - futures.append(asyncio.gather(*(processUncachedDir(root, dir, arr, isRadarr) for dir in dirs))) + if not dirs: + if not os.listdir(root): + os.removedirs(root) + continue + print(os.listdir(root)) + files = (TorrentFileInfo(filename, isRadarr, os.path.join(root, filename)) for filename in os.listdir(root)) + files = [file for file in files if file.torrentInfo.isTorrentOrMagnet] + for file in files: + if file.fileInfo.filename not in processed_files: + shutil.copy(file.fileInfo.filePath, file.fileInfo.filePathProcessing) + processed_files.add(file.fileInfo.filename) + futures.append(asyncio.create_task(processFile(file, arr, isRadarr))) + else: + os.remove(file.fileInfo.filePath) + await asyncio.gather(*futures) except: @@ -611,4 +661,4 @@ async def processUncached(): if __name__ == "__main__": - start(isRadarr=sys.argv[1] == 'radarr') + start(isRadarr=sys.argv[1] == 'radarr') \ No newline at end of file diff --git a/blackhole_downloader.py b/blackhole_downloader.py new file mode 100644 index 0000000..96d5ccf --- /dev/null +++ b/blackhole_downloader.py @@ -0,0 +1,219 @@ +import asyncio +import os +import glob +from blackhole import * + +async def downloader(torrent, file, arr, torrentFile, shared_dict, lock, webhook): + availableHost = torrent.getAvailableHost() + activeTorrents = torrent.getActiveTorrents() + + while True: + if activeTorrents['limit'] - activeTorrents['nb'] > 0: + allTorrents = torrent.getAllTorrents() + torrentExists = False + for at in allTorrents: + if at["hash"] == torrent.getHash().lower(): + torrent.id = at["id"] + torrentName = at["filename"] + if at["status"] == "downloaded": + print("File already exists") + elif at["status"] == "downloading": + print("File downloading") + torrentExists = True + lock.acquire() + if torrentName in shared_dict: + lock.release() + return ## Delete these duplicate torrents from disk + lock.release() + break + if not torrentExists: + while True: + folder_path = os.path.dirname(torrentFile) + all_files = glob.glob(os.path.join(folder_path, '*')) + all_files.sort(key=os.path.getctime) + top_4_files = all_files[:4] + if torrentFile in top_4_files: + torrent.getInstantAvailability() + torrent.addTorrent(availableHost) + info = torrent.getInfo(refresh=True) + torrentName = info['filename'] + lock.acquire() + try: + if shared_dict: + shared_dict.update({torrentName : "added"}) + discordStatusUpdate(shared_dict, webhook, edit=True) + else: + shared_dict.update({torrentName : "added"}) + discordStatusUpdate(shared_dict, webhook) + finally: + lock.release() + break + if not os.path.exists(torrentFile): + break + await asyncio.sleep(60) + break + await asyncio.sleep(60) + + count = 0 + while True: + count += 1 + info = torrent.getInfo(refresh=True) + status = info['status'] + torrentName = info['filename'] + + print('status:', status) + if not os.path.exists(torrentFile): + torrent.delete() + break + if status == 'waiting_files_selection': + if not torrent.selectFiles(): + torrent.delete() + break + elif status == 'magnet_conversion' or status == 'queued' or status == 'downloading' or status == 'compressing' or status == 'uploading': + progress = info['progress'] + print(progress) + lock.acquire() + try: + if shared_dict: + shared_dict.update({torrentName : f"Downloading {progress}%"}) + discordStatusUpdate(shared_dict, webhook, edit=True) + else: + shared_dict.update({torrentName : f"Downloading {progress}%"}) + discordStatusUpdate(shared_dict, webhook) + finally: + lock.release() + if torrent.incompatibleHashSize and torrent.failIfNotCached: + print("Non-cached incompatible hash sized torrent") + torrent.delete() + break + await asyncio.sleep(5) + elif status == 'magnet_error' or status == 'error' or status == 'dead' or status == 'virus': + torrent.delete() + break + elif status == 'downloaded': + existsCount = 0 + print('Waiting for folders to refresh...') + + filename = info.get('filename') + originalFilename = info.get('original_filename') + + folderPathMountFilenameTorrent = os.path.join(blackhole['rdMountTorrentsPath'], filename) + folderPathMountOriginalFilenameTorrent = os.path.join(blackhole['rdMountTorrentsPath'], originalFilename) + folderPathMountOriginalFilenameWithoutExtTorrent = os.path.join(blackhole['rdMountTorrentsPath'], os.path.splitext(originalFilename)[0]) + + while existsCount <= blackhole['waitForTorrentTimeout']: + existsCount += 1 + + if os.path.exists(folderPathMountFilenameTorrent) and os.listdir(folderPathMountFilenameTorrent): + folderPathMountTorrent = folderPathMountFilenameTorrent + elif os.path.exists(folderPathMountOriginalFilenameTorrent) and os.listdir(folderPathMountOriginalFilenameTorrent): + folderPathMountTorrent = folderPathMountOriginalFilenameTorrent + elif (originalFilename.endswith(('.mkv', '.mp4')) and + os.path.exists(folderPathMountOriginalFilenameWithoutExtTorrent) and os.listdir(folderPathMountOriginalFilenameWithoutExtTorrent)): + folderPathMountTorrent = folderPathMountOriginalFilenameWithoutExtTorrent + else: + folderPathMountTorrent = None + + if folderPathMountTorrent: + multiSeasonRegex1 = r'(?<=[\W_][Ss]eason[\W_])[\d][\W_][\d]{1,2}(?=[\W_])' + multiSeasonRegex2 = r'(?<=[\W_][Ss])[\d]{2}[\W_][Ss]?[\d]{2}(?=[\W_])' + multiSeasonRegexCombined = f'{multiSeasonRegex1}|{multiSeasonRegex2}' + + multiSeasonMatch = re.search(multiSeasonRegexCombined, file.fileInfo.filenameWithoutExt) + + for root, dirs, files in os.walk(folderPathMountTorrent): + relRoot = os.path.relpath(root, folderPathMountTorrent) + for filename in files: + # Check if the file is accessible + # if not await is_accessible(os.path.join(root, filename)): + # print(f"Timeout reached when accessing file: {filename}") + # discordError(f"Timeout reached when accessing file", filename) + # Uncomment the following line to fail the entire torrent if the timeout on any of its files are reached + # fail(torrent) + # return + + if multiSeasonMatch: + seasonMatch = re.search(r'S([\d]{2})E[\d]{2}', filename) + + if seasonMatch: + season = seasonMatch.group(1) + seasonShort = season[1:] if season[0] == '0' else season + + seasonFolderPathCompleted = re.sub(multiSeasonRegex1, seasonShort, file.fileInfo.folderPathCompleted) + seasonFolderPathCompleted = re.sub(multiSeasonRegex2, season, seasonFolderPathCompleted) + + os.makedirs(os.path.join(seasonFolderPathCompleted, relRoot), exist_ok=True) + os.symlink(os.path.join(root, filename), os.path.join(seasonFolderPathCompleted, relRoot, filename)) + print('Season Recursive:', f"{os.path.join(seasonFolderPathCompleted, relRoot, filename)} -> {os.path.join(root, filename)}") + continue + + + os.makedirs(os.path.join(file.fileInfo.folderPathCompleted, relRoot), exist_ok=True) + os.symlink(os.path.join(root, filename), os.path.join(file.fileInfo.folderPathCompleted, relRoot, filename)) + print('Recursive:', f"{os.path.join(file.fileInfo.folderPathCompleted, relRoot, filename)} -> {os.path.join(root, filename)}") + + print('Refreshed') + discordUpdate(f"Sucessfully processed {file.fileInfo.filenameWithoutExt}", f"Now available for immediate consumption! existsCount: {existsCount}") + + await refreshArr(arr) + break + + if existsCount == blackhole['rdMountRefreshSeconds'] + 1: + print(f"Torrent folder not found in filesystem: {file.fileInfo.filenameWithoutExt}") + discordError("Torrent folder not found in filesystem", file.fileInfo.filenameWithoutExt) + + await asyncio.sleep(1) + break + + if torrent.failIfNotCached: + if count == 21 and status != "downloading": + print('infoCount > 20') + discordError(f"{file.fileInfo.filenameWithoutExt} info attempt count > 20", status) + elif count == blackhole['waitForTorrentTimeout']: + print(f"infoCount == {blackhole['waitForTorrentTimeout']} - Failing") + torrent.delete() + break + + if os.path.exists(torrentFile): + folder_path = os.path.dirname(torrentFile) + all_files = glob.glob(os.path.join(folder_path, '*')) + all_files.sort(key=os.path.getctime) + try: + file_index = all_files.index(torrentFile) + except ValueError: + print("The file does not exist in the folder.") + file_index = -1 + + if file_index != -1: + files_to_remove = all_files[file_index:] + for file in files_to_remove: + try: + os.remove(file) + print(f"Removed: {file}") + except Exception as e: + print(f"Error removing {file}: {e}") + + remaining_files = os.listdir(folder_path) + if not remaining_files: + try: + os.rmdir(folder_path) + print(f"Removed folder: {folder_path}") + except Exception as e: + print(f"Error removing folder {folder_path}: {e}") + else: + print(f"Folder is not empty: {folder_path}") + else: + print("The specified file is not in the folder.") + else: + print("The file does not exist.") + + lock.acquire() + try: + if torrentName in shared_dict: + del shared_dict[torrentName] + if shared_dict: + discordStatusUpdate(shared_dict, webhook, edit=True) + else: + discordStatusUpdate(shared_dict, webhook, delete=True) + finally: + lock.release() \ No newline at end of file diff --git a/blackhole_watcher.py b/blackhole_watcher.py index 1749d69..347b2f4 100644 --- a/blackhole_watcher.py +++ b/blackhole_watcher.py @@ -1,7 +1,7 @@ import asyncio from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler -from blackhole import start, processUncached, getPath +from blackhole import start, resumeUncached, getPath class BlackholeHandler(FileSystemEventHandler): def __init__(self, is_radarr): @@ -19,10 +19,8 @@ def on_created(self, event): self.is_processing = False -async def scheduleProcessUncached(): - while True: - await asyncio.sleep(600) # 10 minutes - await processUncached() +async def scheduleResumeUncached(): + await resumeUncached() if __name__ == "__main__": @@ -40,7 +38,7 @@ async def scheduleProcessUncached(): try: radarr_observer.start() sonarr_observer.start() - asyncio.run(scheduleProcessUncached()) + asyncio.run(scheduleResumeUncached()) except KeyboardInterrupt: radarr_observer.stop() sonarr_observer.stop() diff --git a/requirements.txt b/requirements.txt index 1b5b86f..b9d4797 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,7 @@ requests==2.28.1 #all bencode3==0.1.0 #blackhole watchdog==4.0.0 #blackhole +rank-torrent-name #blackhole flask==3.0.2 #plex_request Flask-Caching==2.1.0 #plex_request diff --git a/shared/arr.py b/shared/arr.py index f72d60f..d72048f 100644 --- a/shared/arr.py +++ b/shared/arr.py @@ -235,6 +235,12 @@ def getHistory(self, pageSize: int): def failHistoryItem(self, historyId: int): failRequest = requests.post(f"{self.host}/api/v3/history/failed/{historyId}?apiKey={self.apiKey}") + + def removeFailedItem(self, itemId: int): + removeRequest = requests.delete(f"{self.host}/api/v3/blocklist/{itemId}?apiKey={self.apiKey}") + + def clearBlocklist(self): + commandRequest = requests.post(f"{self.host}/api/v3/command?apiKey={self.apiKey}", json={'name': 'ClearBlocklist'}, headers={'Content-Type': 'application/json'}) def refreshMonitoredDownloads(self): commandRequest = requests.post(f"{self.host}/api/v3/command?apiKey={self.apiKey}", json={'name': 'RefreshMonitoredDownloads'}, headers={'Content-Type': 'application/json'}) diff --git a/shared/discord.py b/shared/discord.py index 357e997..ac60331 100644 --- a/shared/discord.py +++ b/shared/discord.py @@ -39,3 +39,37 @@ def discordUpdate(title, message=None): embeds=[embed] ) response = webhook.execute() + +def discordStatusUpdate(torrentDict, webhook=None, create=False, edit=False, delete=False): + if discord['updateEnabled']: + if delete: + webhook.remove_embeds() + embed = DiscordEmbed("Downloading Status", f"No Active Downloads", color=9807270) + webhook.add_embed(embed) + response = webhook.edit() + return response + if create: + webhook = DiscordWebhook( + url=discord['webhookUrl'], + rate_limit_retry=True, + username='Status Bot' + ) + return webhook + + if not edit: + embed = DiscordEmbed("Downloading Status", f"Current downloading - {len(torrentDict)}", color=16776960) + for filename,progress in torrentDict.items(): + embed.add_embed_field(name=filename, value=progress, inline=False) + + webhook.add_embed(embed) + response = webhook.execute(remove_embeds=True) + return webhook + else: + webhook.remove_embeds() + embed = DiscordEmbed("Downloading Status", f"Current downloading - {len(torrentDict)}", color=16776960) + for filename,progress in torrentDict.items(): + embed.add_embed_field(name=filename, value=progress, inline=False) + + webhook.add_embed(embed) + response = webhook.edit() + return webhook \ No newline at end of file From 6964c2aebd04ae10f4f0ef7681bb5dbada593948 Mon Sep 17 00:00:00 2001 From: Priky-one Date: Wed, 10 Jul 2024 09:21:48 -0400 Subject: [PATCH 03/11] Fixed imports & wait time --- blackhole.py | 2 +- blackhole_downloader.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/blackhole.py b/blackhole.py index 5f807eb..e793a86 100644 --- a/blackhole.py +++ b/blackhole.py @@ -558,7 +558,7 @@ async def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): print('infoCount > 20') discordError(f"{file.fileInfo.filenameWithoutExt} info attempt count > 20", status) elif count == blackhole['waitForTorrentTimeout']: - print('infoCount == 60 - Failing') + print(f"infoCount == {blackhole['waitForTorrentTimeout']} - Failing") await fail(torrent) break diff --git a/blackhole_downloader.py b/blackhole_downloader.py index 96d5ccf..1c79732 100644 --- a/blackhole_downloader.py +++ b/blackhole_downloader.py @@ -1,7 +1,10 @@ import asyncio import os import glob -from blackhole import * +from shared.discord import discordError, discordUpdate, discordStatusUpdate +from shared.shared import blackhole +import re +from blackhole import refreshArr async def downloader(torrent, file, arr, torrentFile, shared_dict, lock, webhook): availableHost = torrent.getAvailableHost() From eec5fcafe220af295d9f8f86f81628e1ca8e40d0 Mon Sep 17 00:00:00 2001 From: Priky-one Date: Thu, 11 Jul 2024 08:04:38 -0400 Subject: [PATCH 04/11] Fixed imports --- blackhole.py | 4 +++- blackhole_downloader.py | 9 ++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/blackhole.py b/blackhole.py index e793a86..e85eea6 100644 --- a/blackhole.py +++ b/blackhole.py @@ -416,7 +416,9 @@ async def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): await downloader(torrent, file, arr, path, shared_dict, lock, webhook) elif not first_item: os.remove(file.fileInfo.filePathProcessing) - os.remove(file.fileInfo.filePath) + if os.path.exists(file.fileInfo.filePath): + os.remove(file.fileInfo.filePath) + return arr.clearBlocklist() allItems = arr.getAll() # TODO: Trigger scan for the deleted torrent which don't exist in history diff --git a/blackhole_downloader.py b/blackhole_downloader.py index 1c79732..f7677e3 100644 --- a/blackhole_downloader.py +++ b/blackhole_downloader.py @@ -4,9 +4,9 @@ from shared.discord import discordError, discordUpdate, discordStatusUpdate from shared.shared import blackhole import re -from blackhole import refreshArr async def downloader(torrent, file, arr, torrentFile, shared_dict, lock, webhook): + from blackhole import refreshArr availableHost = torrent.getAvailableHost() activeTorrents = torrent.getActiveTorrents() @@ -26,7 +26,8 @@ async def downloader(torrent, file, arr, torrentFile, shared_dict, lock, webhook lock.acquire() if torrentName in shared_dict: lock.release() - return ## Delete these duplicate torrents from disk + remove_file(torrentFile) + return lock.release() break if not torrentExists: @@ -176,7 +177,9 @@ async def downloader(torrent, file, arr, torrentFile, shared_dict, lock, webhook print(f"infoCount == {blackhole['waitForTorrentTimeout']} - Failing") torrent.delete() break - + remove_file(torrentFile) + +def remove_file(torrentFile) if os.path.exists(torrentFile): folder_path = os.path.dirname(torrentFile) all_files = glob.glob(os.path.join(folder_path, '*')) From 60320d4757a8b4bc2b8b4c2721ef0820b773b383 Mon Sep 17 00:00:00 2001 From: Priky-one Date: Mon, 22 Jul 2024 07:46:29 -0400 Subject: [PATCH 05/11] Fixed major filenotfound errors --- blackhole.py | 29 +++++++++++++++++------------ blackhole_downloader.py | 6 +++--- blackhole_watcher.py | 17 ++++++++++------- 3 files changed, 30 insertions(+), 22 deletions(-) diff --git a/blackhole.py b/blackhole.py index e85eea6..c9a87fa 100644 --- a/blackhole.py +++ b/blackhole.py @@ -22,7 +22,7 @@ rdHost = realdebrid['host'] authToken = realdebrid['apiKey'] shared_dict = {} -lock = threading.Lock() +# lock = threading.Lock() webhook = discordStatusUpdate(shared_dict, create=True) _print = print @@ -156,6 +156,7 @@ def getInstantAvailability(self, refresh=False): instantAvailabilityRequest = requests.get(f"{rdHost}torrents/instantAvailability/{torrentHash}?auth_token={authToken}") instantAvailabilities = instantAvailabilityRequest.json() self.print('instantAvailabilities:', instantAvailabilities) + if not instantAvailabilities: return instantAvailabilityHosters = next(iter(instantAvailabilities.values())) if not instantAvailabilityHosters: return @@ -349,7 +350,7 @@ def print(*values: object): import signal -async def processFile(file: TorrentFileInfo, arr: Arr, isRadarr, failIfNotCached=None): +async def processFile(file: TorrentFileInfo, arr: Arr, isRadarr, failIfNotCached=None, lock=None): try: _print = globals()['print'] @@ -374,7 +375,8 @@ async def is_accessible(path, timeout=10): return False finally: executor.shutdown(wait=False) - + if not os.path.exists(file.fileInfo.filePathProcessing): + return with open(file.fileInfo.filePathProcessing, 'rb' if file.torrentInfo.isDotTorrentFile else 'r') as f: async def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): print(f"Failing") @@ -411,15 +413,15 @@ async def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): if not os.path.exists(path): os.renames(torrent.file.fileInfo.filePathProcessing, path) - else: + elif os.path.exists(file.fileInfo.filePathProcessing): os.remove(file.fileInfo.filePathProcessing) await downloader(torrent, file, arr, path, shared_dict, lock, webhook) elif not first_item: + arr.clearBlocklist() os.remove(file.fileInfo.filePathProcessing) if os.path.exists(file.fileInfo.filePath): os.remove(file.fileInfo.filePath) return - arr.clearBlocklist() allItems = arr.getAll() # TODO: Trigger scan for the deleted torrent which don't exist in history print(f"Failed") @@ -565,6 +567,9 @@ async def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): break os.remove(file.fileInfo.filePathProcessing) + print("FILEPATH REMOVING: ", file.fileInfo.filePath) + if os.path.exists(file.fileInfo.filePath): + os.remove(file.fileInfo.filePath) except: e = traceback.format_exc() @@ -578,7 +583,7 @@ def getFiles(isRadarr): files = (TorrentFileInfo(filename, isRadarr) for filename in os.listdir(getPath(isRadarr)) if filename not in ['processing', 'completed', 'uncached']) return [file for file in files if file.torrentInfo.isTorrentOrMagnet] -async def on_created(isRadarr): +async def on_created(isRadarr, lock): print("Enter 'on_created'") try: print('radarr/sonarr:', 'radarr' if isRadarr else 'sonarr') @@ -597,7 +602,7 @@ async def on_created(isRadarr): if files: for file in files: os.renames(file.fileInfo.filePath, file.fileInfo.filePathProcessing) - futures.append(asyncio.gather(*(processFile(file, arr, isRadarr) for file in files))) + futures.append(asyncio.gather(*(processFile(file, arr, isRadarr, lock=lock) for file in files))) elif firstGo: print('No torrent files found') firstGo = False @@ -613,8 +618,8 @@ async def on_created(isRadarr): discordError(f"Error processing", e) print("Exit 'on_created'") -def start(isRadarr): - asyncio.run(on_created(isRadarr)) +def start(isRadarr, lock): + asyncio.run(on_created(isRadarr, lock)) def removeDir(dirPath): files = os.listdir(dirPath) @@ -622,7 +627,7 @@ def removeDir(dirPath): os.remove(os.path.join(dirPath, file)) os.rmdir(dirPath) -async def resumeUncached(): +async def resumeUncached(lock): print('Processing uncached') try: radarr = Radarr() @@ -636,7 +641,7 @@ async def resumeUncached(): for path, arr, isRadarr in paths: for root, dirs, _ in os.walk(path): if not dirs: - if not os.listdir(root): + if os.path.exists(root) and not os.listdir(root): os.removedirs(root) continue print(os.listdir(root)) @@ -646,7 +651,7 @@ async def resumeUncached(): if file.fileInfo.filename not in processed_files: shutil.copy(file.fileInfo.filePath, file.fileInfo.filePathProcessing) processed_files.add(file.fileInfo.filename) - futures.append(asyncio.create_task(processFile(file, arr, isRadarr))) + futures.append(asyncio.gather(processFile(file, arr, isRadarr, lock=lock))) # create_task else: os.remove(file.fileInfo.filePath) diff --git a/blackhole_downloader.py b/blackhole_downloader.py index f7677e3..009d489 100644 --- a/blackhole_downloader.py +++ b/blackhole_downloader.py @@ -26,7 +26,7 @@ async def downloader(torrent, file, arr, torrentFile, shared_dict, lock, webhook lock.acquire() if torrentName in shared_dict: lock.release() - remove_file(torrentFile) + remove_file(torrentFile, lock, shared_dict, torrentName, webhook) return lock.release() break @@ -177,9 +177,9 @@ async def downloader(torrent, file, arr, torrentFile, shared_dict, lock, webhook print(f"infoCount == {blackhole['waitForTorrentTimeout']} - Failing") torrent.delete() break - remove_file(torrentFile) + remove_file(torrentFile, lock, shared_dict, torrentName, webhook) -def remove_file(torrentFile) +def remove_file(torrentFile, lock, shared_dict, torrentName, webhook): if os.path.exists(torrentFile): folder_path = os.path.dirname(torrentFile) all_files = glob.glob(os.path.join(folder_path, '*')) diff --git a/blackhole_watcher.py b/blackhole_watcher.py index 347b2f4..e807e86 100644 --- a/blackhole_watcher.py +++ b/blackhole_watcher.py @@ -2,32 +2,35 @@ from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler from blackhole import start, resumeUncached, getPath +import threading class BlackholeHandler(FileSystemEventHandler): - def __init__(self, is_radarr): + def __init__(self, is_radarr, lock): super().__init__() self.is_processing = False self.is_radarr = is_radarr self.path_name = getPath(is_radarr, create=True) + self.lock = lock def on_created(self, event): if not self.is_processing and not event.is_directory and event.src_path.lower().endswith((".torrent", ".magnet")): self.is_processing = True try: - start(self.is_radarr) + start(self.is_radarr, self.lock) finally: self.is_processing = False -async def scheduleResumeUncached(): - await resumeUncached() +async def scheduleResumeUncached(lock): + await resumeUncached(lock) if __name__ == "__main__": print("Watching blackhole") + lock = threading.Lock() - radarr_handler = BlackholeHandler(is_radarr=True) - sonarr_handler = BlackholeHandler(is_radarr=False) + radarr_handler = BlackholeHandler(is_radarr=True, lock=lock) + sonarr_handler = BlackholeHandler(is_radarr=False, lock=lock) radarr_observer = Observer() radarr_observer.schedule(radarr_handler, radarr_handler.path_name) @@ -38,7 +41,7 @@ async def scheduleResumeUncached(): try: radarr_observer.start() sonarr_observer.start() - asyncio.run(scheduleResumeUncached()) + asyncio.run(scheduleResumeUncached(lock)) except KeyboardInterrupt: radarr_observer.stop() sonarr_observer.stop() From d11cea3f6b679b698f4adabc6d417f61391650d5 Mon Sep 17 00:00:00 2001 From: Priky-one Date: Thu, 29 Aug 2024 13:33:30 +0530 Subject: [PATCH 06/11] Bug fixes and enabled silent notifications --- .env.template | 1 + blackhole.py | 41 +++++++++++++------------ blackhole_downloader.py | 68 ++++++++++++++++++++++++++++++----------- shared/discord.py | 31 +++++++++++-------- shared/shared.py | 1 + 5 files changed, 93 insertions(+), 49 deletions(-) diff --git a/.env.template b/.env.template index c1df7e9..c392315 100644 --- a/.env.template +++ b/.env.template @@ -38,6 +38,7 @@ BLACKHOLE_SONARR_PATH="TV Shows" BLACKHOLE_FAIL_IF_NOT_CACHED=true BLACKHOLE_RD_MOUNT_REFRESH_SECONDS=200 BLACKHOLE_WAIT_FOR_TORRENT_TIMEOUT=51840 +BLACKHOLE_WAIT_FOR_PROGRESS_CHANGE=720 BLACKHOLE_HISTORY_PAGE_SIZE=500 DISCORD_ENABLED=false diff --git a/blackhole.py b/blackhole.py index c9a87fa..c2f1283 100644 --- a/blackhole.py +++ b/blackhole.py @@ -12,18 +12,15 @@ # import urllib from werkzeug.utils import cached_property from abc import ABC, abstractmethod -from shared.discord import discordError, discordUpdate, discordStatusUpdate +from shared.discord import discordError, discordUpdate from shared.shared import realdebrid, blackhole, plex, mediaExtensions, checkRequiredEnvs from shared.arr import Arr, Radarr, Sonarr from blackhole_downloader import downloader from RTN import parse -import threading rdHost = realdebrid['host'] authToken = realdebrid['apiKey'] shared_dict = {} -# lock = threading.Lock() -webhook = discordStatusUpdate(shared_dict, create=True) _print = print @@ -133,7 +130,8 @@ def submitTorrent(self): return False availableHost = self.getAvailableHost() - self.addTorrent(availableHost) + if self.addTorrent(availableHost) is None: + return None return True @abstractmethod @@ -264,8 +262,11 @@ def addTorrent(self, host): addTorrentResponse = addTorrentRequest.json() self.print('torrent info:', addTorrentResponse) - self.id = addTorrentResponse['id'] - return self.id + if "id" in addTorrentResponse: + self.id = addTorrentResponse['id'] + return self.id + else: + return None class Magnet(TorrentBase): @@ -282,9 +283,11 @@ def addTorrent(self, host): addMagnetResponse = addMagnetRequest.json() self.print('magnet info:', addMagnetResponse) - self.id = addMagnetResponse['id'] - - return self.id + if "id" in addMagnetResponse: + self.id = addMagnetResponse['id'] + return self.id + else: + return None def getPath(isRadarr, create=False): baseWatchPath = blackhole['baseWatchPath'] @@ -415,7 +418,7 @@ async def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): os.renames(torrent.file.fileInfo.filePathProcessing, path) elif os.path.exists(file.fileInfo.filePathProcessing): os.remove(file.fileInfo.filePathProcessing) - await downloader(torrent, file, arr, path, shared_dict, lock, webhook) + await downloader(torrent, file, arr, path, shared_dict, lock) elif not first_item: arr.clearBlocklist() os.remove(file.fileInfo.filePathProcessing) @@ -434,9 +437,10 @@ async def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): else: torrent = Magnet(f, file, failIfNotCached, onlyLargestFile) - if not torrent.submitTorrent(): + failed = torrent.submitTorrent() + if failed is False: historyItems = await fail(torrent, uncached=True) - else: + elif failed is True: count = 0 while True: count += 1 @@ -565,11 +569,10 @@ async def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): print(f"infoCount == {blackhole['waitForTorrentTimeout']} - Failing") await fail(torrent) break - + if os.path.exists(file.fileInfo.filePathProcessing): os.remove(file.fileInfo.filePathProcessing) - print("FILEPATH REMOVING: ", file.fileInfo.filePath) - if os.path.exists(file.fileInfo.filePath): - os.remove(file.fileInfo.filePath) + if os.path.exists(file.fileInfo.filePath): + os.remove(file.fileInfo.filePath) except: e = traceback.format_exc() @@ -640,8 +643,8 @@ async def resumeUncached(lock): for path, arr, isRadarr in paths: for root, dirs, _ in os.walk(path): - if not dirs: - if os.path.exists(root) and not os.listdir(root): + if not dirs and os.path.exists(root): + if not os.listdir(root): os.removedirs(root) continue print(os.listdir(root)) diff --git a/blackhole_downloader.py b/blackhole_downloader.py index 009d489..582c808 100644 --- a/blackhole_downloader.py +++ b/blackhole_downloader.py @@ -4,14 +4,16 @@ from shared.discord import discordError, discordUpdate, discordStatusUpdate from shared.shared import blackhole import re +webhook = None -async def downloader(torrent, file, arr, torrentFile, shared_dict, lock, webhook): +async def downloader(torrent, file, arr, torrentFile, shared_dict, lock): from blackhole import refreshArr availableHost = torrent.getAvailableHost() activeTorrents = torrent.getActiveTorrents() + global webhook while True: - if activeTorrents['limit'] - activeTorrents['nb'] > 0: + if activeTorrents['limit'] - activeTorrents['nb'] -2 > 0: allTorrents = torrent.getAllTorrents() torrentExists = False for at in allTorrents: @@ -26,7 +28,7 @@ async def downloader(torrent, file, arr, torrentFile, shared_dict, lock, webhook lock.acquire() if torrentName in shared_dict: lock.release() - remove_file(torrentFile, lock, shared_dict, torrentName, webhook) + remove_file(torrentFile, lock, shared_dict, "", webhook, current=True) return lock.release() break @@ -38,33 +40,52 @@ async def downloader(torrent, file, arr, torrentFile, shared_dict, lock, webhook top_4_files = all_files[:4] if torrentFile in top_4_files: torrent.getInstantAvailability() - torrent.addTorrent(availableHost) + if torrent.addTorrent(availableHost) is None: + remove_file(torrentFile, lock, shared_dict, "", None, current=True) + return info = torrent.getInfo(refresh=True) torrentName = info['filename'] lock.acquire() try: if shared_dict: shared_dict.update({torrentName : "added"}) - discordStatusUpdate(shared_dict, webhook, edit=True) + webhook = discordStatusUpdate(shared_dict, webhook, edit=True) else: shared_dict.update({torrentName : "added"}) - discordStatusUpdate(shared_dict, webhook) + webhook = discordStatusUpdate(shared_dict, webhook) finally: lock.release() break if not os.path.exists(torrentFile): - break + remove_file(torrentFile, lock, shared_dict, torrentName, webhook, current=True) + return await asyncio.sleep(60) break await asyncio.sleep(60) count = 0 + progress=0 + waitForProgress=0 while True: count += 1 info = torrent.getInfo(refresh=True) + if "status" not in info: + remove_file(torrentFile, lock, shared_dict, torrentName, webhook, current=True) + return status = info['status'] + if torrentName != info['filename']: + lock.acquire() + try: + if torrentName in shared_dict: + del shared_dict[torrentName] + if shared_dict and webhook: + webhook = discordStatusUpdate(shared_dict, webhook, edit=True) + elif webhook and webhook.id: + webhook = discordStatusUpdate(shared_dict, webhook, delete=True) + finally: + lock.release() torrentName = info['filename'] - + print('status:', status) if not os.path.exists(torrentFile): torrent.delete() @@ -74,23 +95,31 @@ async def downloader(torrent, file, arr, torrentFile, shared_dict, lock, webhook torrent.delete() break elif status == 'magnet_conversion' or status == 'queued' or status == 'downloading' or status == 'compressing' or status == 'uploading': - progress = info['progress'] + if progress != info['progress']: + progress = info['progress'] + waitForProgress = 0 + elif waitForProgress >= blackhole['waitForProgressChange']: + torrent.delete() + remove_file(torrentFile, lock, shared_dict, torrentName, webhook, current=True) + return + else: + waitForProgress += 1 print(progress) lock.acquire() try: if shared_dict: shared_dict.update({torrentName : f"Downloading {progress}%"}) - discordStatusUpdate(shared_dict, webhook, edit=True) + webhook = discordStatusUpdate(shared_dict, webhook, edit=True) else: shared_dict.update({torrentName : f"Downloading {progress}%"}) - discordStatusUpdate(shared_dict, webhook) + webhook = discordStatusUpdate(shared_dict, webhook) finally: lock.release() if torrent.incompatibleHashSize and torrent.failIfNotCached: print("Non-cached incompatible hash sized torrent") torrent.delete() break - await asyncio.sleep(5) + await asyncio.sleep(60) elif status == 'magnet_error' or status == 'error' or status == 'dead' or status == 'virus': torrent.delete() break @@ -179,7 +208,7 @@ async def downloader(torrent, file, arr, torrentFile, shared_dict, lock, webhook break remove_file(torrentFile, lock, shared_dict, torrentName, webhook) -def remove_file(torrentFile, lock, shared_dict, torrentName, webhook): +def remove_file(torrentFile, lock, shared_dict, torrentName, webhook, current=False): if os.path.exists(torrentFile): folder_path = os.path.dirname(torrentFile) all_files = glob.glob(os.path.join(folder_path, '*')) @@ -191,7 +220,10 @@ def remove_file(torrentFile, lock, shared_dict, torrentName, webhook): file_index = -1 if file_index != -1: - files_to_remove = all_files[file_index:] + if current: + files_to_remove = [all_files[file_index]] + else: + files_to_remove = all_files[file_index:] for file in files_to_remove: try: os.remove(file) @@ -217,9 +249,9 @@ def remove_file(torrentFile, lock, shared_dict, torrentName, webhook): try: if torrentName in shared_dict: del shared_dict[torrentName] - if shared_dict: - discordStatusUpdate(shared_dict, webhook, edit=True) - else: - discordStatusUpdate(shared_dict, webhook, delete=True) + if shared_dict and webhook and webhook.id: + webhook = discordStatusUpdate(shared_dict, webhook, edit=True) + elif webhook and webhook.id: + webhook = discordStatusUpdate(shared_dict, webhook, delete=True) finally: lock.release() \ No newline at end of file diff --git a/shared/discord.py b/shared/discord.py index ac60331..b27d14a 100644 --- a/shared/discord.py +++ b/shared/discord.py @@ -40,20 +40,23 @@ def discordUpdate(title, message=None): ) response = webhook.execute() -def discordStatusUpdate(torrentDict, webhook=None, create=False, edit=False, delete=False): +def discordStatusUpdate(torrentDict, webhook=None, edit=False, delete=False): if discord['updateEnabled']: + if webhook and webhook.id: + webhook.delete() + + webhook = DiscordWebhook( + url=discord['webhookUrl'], + rate_limit_retry=True, + username='Status Bot' + ) + if delete: - webhook.remove_embeds() embed = DiscordEmbed("Downloading Status", f"No Active Downloads", color=9807270) webhook.add_embed(embed) - response = webhook.edit() - return response - if create: - webhook = DiscordWebhook( - url=discord['webhookUrl'], - rate_limit_retry=True, - username='Status Bot' - ) + # response = webhook.edit() + webhook.__dict__["flags"] = 4096 + response = webhook.execute(remove_embeds=True) return webhook if not edit: @@ -62,14 +65,18 @@ def discordStatusUpdate(torrentDict, webhook=None, create=False, edit=False, del embed.add_embed_field(name=filename, value=progress, inline=False) webhook.add_embed(embed) + # response = webhook.edit() + webhook.__dict__["flags"] = 4096 response = webhook.execute(remove_embeds=True) return webhook else: - webhook.remove_embeds() + # webhook.remove_embeds() # Used for editing embed = DiscordEmbed("Downloading Status", f"Current downloading - {len(torrentDict)}", color=16776960) for filename,progress in torrentDict.items(): embed.add_embed_field(name=filename, value=progress, inline=False) webhook.add_embed(embed) - response = webhook.edit() + # response = webhook.edit() # used for editing + webhook.__dict__["flags"] = 4096 + response = webhook.execute(remove_embeds=True) return webhook \ No newline at end of file diff --git a/shared/shared.py b/shared/shared.py index c97cdeb..c7b426b 100644 --- a/shared/shared.py +++ b/shared/shared.py @@ -28,6 +28,7 @@ def stringEnvParser(value): 'rdMountRefreshSeconds': env.int('BLACKHOLE_RD_MOUNT_REFRESH_SECONDS', default=None), 'waitForTorrentTimeout': env.int('BLACKHOLE_WAIT_FOR_TORRENT_TIMEOUT', default=None), 'historyPageSize': env.int('BLACKHOLE_HISTORY_PAGE_SIZE', default=None), + 'waitForProgressChange': env.int('BLACKHOLE_WAIT_FOR_PROGRESS_CHANGE', default=None), } server = { From 8e4520ef877ffeceebe5a3595a744555a445fa22 Mon Sep 17 00:00:00 2001 From: Priky-one Date: Tue, 24 Dec 2024 10:00:00 -0500 Subject: [PATCH 07/11] Modifications for rd api changes --- blackhole.py | 136 ++++++++++++++++++++++++++-------------- blackhole_downloader.py | 26 ++++---- 2 files changed, 105 insertions(+), 57 deletions(-) diff --git a/blackhole.py b/blackhole.py index c2f1283..52a3511 100644 --- a/blackhole.py +++ b/blackhole.py @@ -144,21 +144,56 @@ def addTorrent(self, host): def getInstantAvailability(self, refresh=False): if refresh or not self._instantAvailability: - torrentHash = self.getHash() - self.print('hash:', torrentHash) - - if len(torrentHash) != 40: - self.incompatibleHashSize = True - return True - - instantAvailabilityRequest = requests.get(f"{rdHost}torrents/instantAvailability/{torrentHash}?auth_token={authToken}") - instantAvailabilities = instantAvailabilityRequest.json() - self.print('instantAvailabilities:', instantAvailabilities) - if not instantAvailabilities: return - instantAvailabilityHosters = next(iter(instantAvailabilities.values())) - if not instantAvailabilityHosters: return - - self._instantAvailability = next(iter(instantAvailabilityHosters.values())) + while True: + activeTorrents = self.getActiveTorrents() + if activeTorrents['limit'] - activeTorrents['nb'] > 0: + availableHost = self.getAvailableHost() + if self.addTorrent(availableHost) is None: + return + count = 1 + while True: + info = self.getInfo(refresh=True) + if count >= 5: + self.delete() + return + if "status" not in info: + return + status = info['status'] + if status == 'waiting_files_selection': + if not self.selectFiles(): + return + elif status == 'magnet_conversion' or status == 'queued' or status == 'compressing' or status == 'uploading': + time.sleep(1) + elif status == 'downloading': + time.sleep(5) + info = self.getInfo(refresh=True) + elif status == 'downloaded': + return True + elif status == 'magnet_error' or status == 'error' or status == 'dead' or status == 'virus': + discordError(f"Error: {self.file.fileInfo.filenameWithoutExt}", info) + time.sleep(1) + self.delete() + return + count += 1 + else: + print("Torrent cannot be added, too many active downloads") + print("activeTorrents:", activeTorrents) + time.sleep(10) + # torrentHash = self.getHash() + # self.print('hash:', torrentHash) + + # if len(torrentHash) != 40: + # self.incompatibleHashSize = True + # return True + + # instantAvailabilityRequest = requests.get(f"{rdHost}torrents/instantAvailability/{torrentHash}?auth_token={authToken}") + # instantAvailabilities = instantAvailabilityRequest.json() + # self.print('instantAvailabilities:', instantAvailabilities) + # if not instantAvailabilities: return + # instantAvailabilityHosters = next(iter(instantAvailabilities.values())) + # if not instantAvailabilityHosters: return + + # self._instantAvailability = next(iter(instantAvailabilityHosters.values())) return self._instantAvailability @@ -261,12 +296,16 @@ def addTorrent(self, host): addTorrentRequest = requests.put(f"{rdHost}torrents/addTorrent?host={host}&auth_token={authToken}", data=self.f) addTorrentResponse = addTorrentRequest.json() self.print('torrent info:', addTorrentResponse) - - if "id" in addTorrentResponse: - self.id = addTorrentResponse['id'] - return self.id - else: - return None + return True + # API Not working for torrent so returning true + # if "id" in addTorrentResponse: + # self.id = addTorrentResponse['id'] + # return self.id + # elif addTorrentResponse["error"] == "upload_error": + # return 1 + # else: + # discordError(f"Error: {self.file.fileInfo.filenameWithoutExt}", addTorrentResponse) + # return None class Magnet(TorrentBase): @@ -286,7 +325,10 @@ def addTorrent(self, host): if "id" in addMagnetResponse: self.id = addMagnetResponse['id'] return self.id + elif addTorrentResponse["error"] == "upload_error": + return True else: + discordError(f"Error: {self.file.fileInfo.filenameWithoutExt}", addMagnetResponse) return None def getPath(isRadarr, create=False): @@ -385,48 +427,50 @@ async def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): print(f"Failing") history = arr.getHistory(blackhole['historyPageSize'])['records'] + while not history: + time.sleep(5) + print("Trying to grab items after 5 seconds") + history = arr.getHistory(blackhole['historyPageSize'])['records'] items = (item for item in history if item['data'].get('torrentInfoHash', '').casefold() == torrent.getHash().casefold() or cleanFileName(item['sourceTitle'].casefold()) == torrent.file.fileInfo.filenameWithoutExt.casefold()) - + #TODO Fix sonarr calls here whatever it does when it doesn't downloads other hits + if not items: raise Exception("No history items found to cancel") - first_item = None - total_items = 0 for item in items: - if first_item is None: - first_item = item - # TODO: See if we can fail without blacklisting as cached items constantly changes arr.failHistoryItem(item['id']) arr.removeFailedItem(item['id']) ## Removing from blocklist - total_items += 1 - if uncached and items and first_item: - itemId = str(first_item.get('seriesId', first_item.get('movieId'))) - path = os.path.join(getPath(isRadarr), 'uncached', itemId, torrent.file.fileInfo.filename) + if uncached: + parsedTorrent = parse(torrent.file.fileInfo.filename) + torrentName = parsedTorrent.parsed_title + # itemId = str(first_item.get('seriesId', first_item.get('movieId'))) + path = os.path.join(getPath(isRadarr), 'uncached', torrentName, torrent.file.fileInfo.filename) if not isRadarr: - if total_items == 1: # and first_item["releaseType"] != "SeasonPack" - episodeId = str(first_item['episodeId']) ## Fallback? data --> releaseType --> SeasonPack - path = os.path.join(getPath(isRadarr), 'uncached', itemId, episodeId, torrent.file.fileInfo.filename) + if len(parsedTorrent.seasons) == 1 and len(parsedTorrent.episodes) == 1: # and first_item["releaseType"] != "SeasonPack" + episodeNum = str(parsedTorrent.episodes[0]) ## Fallback? data --> releaseType --> SeasonPack + path = os.path.join(getPath(isRadarr), 'uncached', torrentName, episodeNum, torrent.file.fileInfo.filename) else: seasonPack = 'seasonpack' - parsedTorrent = parse(torrent.file.fileInfo.filename) ## Fallback? episode --> seasonNumber - seasons = [str(pt) for pt in parsedTorrent.season] + if not hasattr(parsedTorrent, 'seasons'): + print("Removing coz seasons not found after parsing.") + if os.path.exists(file.fileInfo.filePathProcessing): + os.remove(file.fileInfo.filePathProcessing) + if os.path.exists(file.fileInfo.filePath): + os.remove(file.fileInfo.filePath) + ## This won't ever happen --> unreachable + return + #else: + seasons = [str(pt) for pt in parsedTorrent.seasons] seasons = "-".join(seasons) - path = os.path.join(getPath(isRadarr), 'uncached', itemId, seasonPack, seasons, torrent.file.fileInfo.filename) + path = os.path.join(getPath(isRadarr), 'uncached', torrentName, seasonPack, seasons, torrent.file.fileInfo.filename) if not os.path.exists(path): os.renames(torrent.file.fileInfo.filePathProcessing, path) elif os.path.exists(file.fileInfo.filePathProcessing): os.remove(file.fileInfo.filePathProcessing) + print("Pushed to downloader") await downloader(torrent, file, arr, path, shared_dict, lock) - elif not first_item: - arr.clearBlocklist() - os.remove(file.fileInfo.filePathProcessing) - if os.path.exists(file.fileInfo.filePath): - os.remove(file.fileInfo.filePath) - return - allItems = arr.getAll() - # TODO: Trigger scan for the deleted torrent which don't exist in history print(f"Failed") @@ -671,4 +715,4 @@ async def resumeUncached(lock): if __name__ == "__main__": - start(isRadarr=sys.argv[1] == 'radarr') \ No newline at end of file + start(isRadarr=sys.argv[1] == 'radarr') diff --git a/blackhole_downloader.py b/blackhole_downloader.py index 582c808..9f4740b 100644 --- a/blackhole_downloader.py +++ b/blackhole_downloader.py @@ -28,7 +28,7 @@ async def downloader(torrent, file, arr, torrentFile, shared_dict, lock): lock.acquire() if torrentName in shared_dict: lock.release() - remove_file(torrentFile, lock, shared_dict, "", webhook, current=True) + remove_file(torrentFile, lock, shared_dict, "", current=True) return lock.release() break @@ -41,7 +41,7 @@ async def downloader(torrent, file, arr, torrentFile, shared_dict, lock): if torrentFile in top_4_files: torrent.getInstantAvailability() if torrent.addTorrent(availableHost) is None: - remove_file(torrentFile, lock, shared_dict, "", None, current=True) + remove_file(torrentFile, lock, shared_dict, "", current=True) return info = torrent.getInfo(refresh=True) torrentName = info['filename'] @@ -57,7 +57,7 @@ async def downloader(torrent, file, arr, torrentFile, shared_dict, lock): lock.release() break if not os.path.exists(torrentFile): - remove_file(torrentFile, lock, shared_dict, torrentName, webhook, current=True) + remove_file(torrentFile, lock, shared_dict, torrentName, current=True) return await asyncio.sleep(60) break @@ -70,7 +70,7 @@ async def downloader(torrent, file, arr, torrentFile, shared_dict, lock): count += 1 info = torrent.getInfo(refresh=True) if "status" not in info: - remove_file(torrentFile, lock, shared_dict, torrentName, webhook, current=True) + remove_file(torrentFile, lock, shared_dict, torrentName, current=True) return status = info['status'] if torrentName != info['filename']: @@ -78,9 +78,9 @@ async def downloader(torrent, file, arr, torrentFile, shared_dict, lock): try: if torrentName in shared_dict: del shared_dict[torrentName] - if shared_dict and webhook: + if shared_dict and webhook and webhook.id: webhook = discordStatusUpdate(shared_dict, webhook, edit=True) - elif webhook and webhook.id: + elif not shared_dict and webhook and webhook.id: webhook = discordStatusUpdate(shared_dict, webhook, delete=True) finally: lock.release() @@ -94,13 +94,15 @@ async def downloader(torrent, file, arr, torrentFile, shared_dict, lock): if not torrent.selectFiles(): torrent.delete() break - elif status == 'magnet_conversion' or status == 'queued' or status == 'downloading' or status == 'compressing' or status == 'uploading': + elif status == 'magnet_conversion' or status == 'queued' or status == 'compressing' or status == 'uploading': + await asyncio.sleep(1) + elif status == 'downloading': if progress != info['progress']: progress = info['progress'] waitForProgress = 0 elif waitForProgress >= blackhole['waitForProgressChange']: torrent.delete() - remove_file(torrentFile, lock, shared_dict, torrentName, webhook, current=True) + remove_file(torrentFile, lock, shared_dict, torrentName, current=True) return else: waitForProgress += 1 @@ -121,6 +123,7 @@ async def downloader(torrent, file, arr, torrentFile, shared_dict, lock): break await asyncio.sleep(60) elif status == 'magnet_error' or status == 'error' or status == 'dead' or status == 'virus': + discordError(f"Error: {file.fileInfo.filenameWithoutExt}", info) torrent.delete() break elif status == 'downloaded': @@ -206,9 +209,9 @@ async def downloader(torrent, file, arr, torrentFile, shared_dict, lock): print(f"infoCount == {blackhole['waitForTorrentTimeout']} - Failing") torrent.delete() break - remove_file(torrentFile, lock, shared_dict, torrentName, webhook) + remove_file(torrentFile, lock, shared_dict, torrentName) -def remove_file(torrentFile, lock, shared_dict, torrentName, webhook, current=False): +def remove_file(torrentFile, lock, shared_dict, torrentName, current=False): if os.path.exists(torrentFile): folder_path = os.path.dirname(torrentFile) all_files = glob.glob(os.path.join(folder_path, '*')) @@ -247,11 +250,12 @@ def remove_file(torrentFile, lock, shared_dict, torrentName, webhook, current=Fa lock.acquire() try: + global webhook if torrentName in shared_dict: del shared_dict[torrentName] if shared_dict and webhook and webhook.id: webhook = discordStatusUpdate(shared_dict, webhook, edit=True) - elif webhook and webhook.id: + elif not shared_dict and webhook and webhook.id: webhook = discordStatusUpdate(shared_dict, webhook, delete=True) finally: lock.release() \ No newline at end of file From 79a6d1eff2266e14641851367d2ebc089826a077 Mon Sep 17 00:00:00 2001 From: Priky-one Date: Sat, 22 Mar 2025 15:12:03 +0530 Subject: [PATCH 08/11] Fixed torrent wait timeout --- blackhole.py | 29 ++++++++++++++++++++++++----- blackhole_downloader.py | 7 ++++--- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/blackhole.py b/blackhole.py index 52a3511..e648b8f 100644 --- a/blackhole.py +++ b/blackhole.py @@ -161,6 +161,7 @@ def getInstantAvailability(self, refresh=False): status = info['status'] if status == 'waiting_files_selection': if not self.selectFiles(): + self.delete() return elif status == 'magnet_conversion' or status == 'queued' or status == 'compressing' or status == 'uploading': time.sleep(1) @@ -168,6 +169,7 @@ def getInstantAvailability(self, refresh=False): time.sleep(5) info = self.getInfo(refresh=True) elif status == 'downloaded': + self.delete() return True elif status == 'magnet_error' or status == 'error' or status == 'dead' or status == 'virus': discordError(f"Error: {self.file.fileInfo.filenameWithoutExt}", info) @@ -238,15 +240,31 @@ def getAllTorrents(self): return allTorrents - def selectFiles(self): + def selectFiles(self, uncached=False): self._enforceId() info = self.getInfo() self.print('files:', info['files']) mediaFiles = [file for file in info['files'] if os.path.splitext(file['path'])[1] in mediaExtensions] + if info['files'] and not mediaFiles and not uncached: + self.print('no media files found --> pushing to uncached') + discordError(f"Error: {self.file.fileInfo.filenameWithoutExt}", "No media files found --> pushing to uncached") + return False + elif not info['files'] and uncached: + total_time = 0 + while info['files'] or total_time == 300: + info = self.getInfo(refresh=True) + mediaFiles = [file for file in info['files'] if os.path.splitext(file['path'])[1] in mediaExtensions] + time.sleep(10) + total_time += 10 + if not info['files']: + self.print("unable to parse magnet files") + discordError(f"Error: {self.file.fileInfo.filenameWithoutExt}", "Unable to parse magnet files") + return False if not mediaFiles: - self.print('no media files found') + self.print("no media files found --> prolly virus bruh") + discordError(f"Error: {self.file.fileInfo.filenameWithoutExt}", "No media Files Found in uncached") return False mediaFileIds = {str(file['id']) for file in mediaFiles} @@ -325,7 +343,7 @@ def addTorrent(self, host): if "id" in addMagnetResponse: self.id = addMagnetResponse['id'] return self.id - elif addTorrentResponse["error"] == "upload_error": + elif addMagnetResponse["error"] == "upload_error": return True else: discordError(f"Error: {self.file.fileInfo.filenameWithoutExt}", addMagnetResponse) @@ -522,7 +540,7 @@ async def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): folderPathMountOriginalFilenameTorrent = os.path.join(blackhole['rdMountTorrentsPath'], originalFilename) folderPathMountOriginalFilenameWithoutExtTorrent = os.path.join(blackhole['rdMountTorrentsPath'], os.path.splitext(originalFilename)[0]) - while existsCount <= blackhole['waitForTorrentTimeout']: + while True: existsCount += 1 if os.path.exists(folderPathMountFilenameTorrent) and os.listdir(folderPathMountFilenameTorrent): @@ -598,9 +616,10 @@ async def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False): # await asyncio.get_running_loop().run_in_executor(None, copyFiles, file, folderPathMountTorrent, arr) break - if existsCount == blackhole['rdMountRefreshSeconds'] + 1: + if existsCount >= blackhole['rdMountRefreshSeconds'] + 1: print(f"Torrent folder not found in filesystem: {file.fileInfo.filenameWithoutExt}") discordError("Torrent folder not found in filesystem", file.fileInfo.filenameWithoutExt) + return False await asyncio.sleep(1) break diff --git a/blackhole_downloader.py b/blackhole_downloader.py index 9f4740b..405e730 100644 --- a/blackhole_downloader.py +++ b/blackhole_downloader.py @@ -91,7 +91,7 @@ async def downloader(torrent, file, arr, torrentFile, shared_dict, lock): torrent.delete() break if status == 'waiting_files_selection': - if not torrent.selectFiles(): + if not torrent.selectFiles(uncached=True): torrent.delete() break elif status == 'magnet_conversion' or status == 'queued' or status == 'compressing' or status == 'uploading': @@ -137,7 +137,7 @@ async def downloader(torrent, file, arr, torrentFile, shared_dict, lock): folderPathMountOriginalFilenameTorrent = os.path.join(blackhole['rdMountTorrentsPath'], originalFilename) folderPathMountOriginalFilenameWithoutExtTorrent = os.path.join(blackhole['rdMountTorrentsPath'], os.path.splitext(originalFilename)[0]) - while existsCount <= blackhole['waitForTorrentTimeout']: + while True: existsCount += 1 if os.path.exists(folderPathMountFilenameTorrent) and os.listdir(folderPathMountFilenameTorrent): @@ -194,9 +194,10 @@ async def downloader(torrent, file, arr, torrentFile, shared_dict, lock): await refreshArr(arr) break - if existsCount == blackhole['rdMountRefreshSeconds'] + 1: + if existsCount >= blackhole['rdMountRefreshSeconds'] + 1: print(f"Torrent folder not found in filesystem: {file.fileInfo.filenameWithoutExt}") discordError("Torrent folder not found in filesystem", file.fileInfo.filenameWithoutExt) + return False await asyncio.sleep(1) break From 64ef88f8bbb6bfe626f12522390121b24889e3d7 Mon Sep 17 00:00:00 2001 From: Priky-one Date: Sat, 22 Mar 2025 15:25:54 +0530 Subject: [PATCH 09/11] updated repair from main --- repair.py | 200 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 124 insertions(+), 76 deletions(-) diff --git a/repair.py b/repair.py index 9350592..81dfd4c 100644 --- a/repair.py +++ b/repair.py @@ -1,120 +1,168 @@ import os import argparse import time +import traceback +from shared.debrid import validateRealdebridMountTorrentsPath, validateTorboxMountTorrentsPath from shared.arr import Sonarr, Radarr -from shared.discord import discordUpdate -from shared.shared import repair, intersperse +from shared.discord import discordUpdate, discordError +from shared.shared import repair, realdebrid, torbox, intersperse, ensureTuple +from datetime import datetime -def parse_interval(interval_str): +def parseInterval(intervalStr): """Parse a smart interval string (e.g., '1w2d3h4m5s') into seconds.""" - if not interval_str: + if not intervalStr: return 0 - total_seconds = 0 - time_dict = {'w': 604800, 'd': 86400, 'h': 3600, 'm': 60, 's': 1} - current_number = '' - for char in interval_str: + totalSeconds = 0 + timeDict = {'w': 604800, 'd': 86400, 'h': 3600, 'm': 60, 's': 1} + currentNumber = '' + for char in intervalStr: if char.isdigit(): - current_number += char - elif char in time_dict and current_number: - total_seconds += int(current_number) * time_dict[char] - current_number = '' - return total_seconds - + currentNumber += char + elif char in timeDict and currentNumber: + totalSeconds += int(currentNumber) * timeDict[char] + currentNumber = '' + return totalSeconds # Parse arguments for dry run, no confirm options, and optional intervals -parser = argparse.ArgumentParser(description='Repair broken symlinks and manage media files.') +parser = argparse.ArgumentParser(description='Repair broken symlinks or missing files.') parser.add_argument('--dry-run', action='store_true', help='Perform a dry run without making any changes.') parser.add_argument('--no-confirm', action='store_true', help='Execute without confirmation prompts.') parser.add_argument('--repair-interval', type=str, default=repair['repairInterval'], help='Optional interval in smart format (e.g. 1h2m3s) to wait between repairing each media file.') parser.add_argument('--run-interval', type=str, default=repair['runInterval'], help='Optional interval in smart format (e.g. 1w2d3h4m5s) to run the repair process.') +parser.add_argument('--mode', type=str, choices=['symlink', 'file'], default='symlink', help='Choose repair mode: `symlink` or `file`. `symlink` to repair broken symlinks and `file` to repair missing files.') +parser.add_argument('--season-packs', action='store_true', help='Upgrade to season-packs when a non-season-pack is found. Only applicable in symlink mode.') +parser.add_argument('--include-unmonitored', action='store_true', help='Include unmonitored media in the repair process') args = parser.parse_args() +_print = print + +def print(*values: object): + _print(f"[{datetime.now()}] [{args.mode}]", *values) + if not args.repair_interval and not args.run_interval: print("Running repair once") else: print(f"Running repair{' once every ' + args.run_interval if args.run_interval else ''}{', and waiting ' + args.repair_interval + ' between each repair.' if args.repair_interval else '.'}") try: - repair_interval_seconds = parse_interval(args.repair_interval) + repairIntervalSeconds = parseInterval(args.repair_interval) except Exception as e: print(f"Invalid interval format for repair interval: {args.repair_interval}") exit(1) try: - run_interval_seconds = parse_interval(args.run_interval) + runIntervalSeconds = parseInterval(args.run_interval) except Exception as e: print(f"Invalid interval format for run interval: {args.run_interval}") exit(1) def main(): + if unsafe(): + print("One or both debrid services are not working properly. Skipping repair.") + discordError(f"[{args.mode}] One or both debrid services are not working properly. Skipping repair.") + return + print("Collecting media...") sonarr = Sonarr() radarr = Radarr() - sonarrMedia = [(sonarr, media) for media in sonarr.getAll() if media.anyMonitoredChildren] - radarrMedia = [(radarr, media) for media in radarr.getAll() if media.anyMonitoredChildren] + sonarrMedia = [(sonarr, media) for media in sonarr.getAll() if args.include_unmonitored or media.anyMonitoredChildren] + radarrMedia = [(radarr, media) for media in radarr.getAll() if args.include_unmonitored or media.anyMonitoredChildren] print("Finished collecting media.") for arr, media in intersperse(sonarrMedia, radarrMedia): - files = {} - for file in arr.getFiles(media): - if file.parentId in files: - files[file.parentId].append(file) - else: - files[file.parentId] = [file] - for childId in media.monitoredChildrenIds: - realPaths = [] - brokenSymlinks = [] - - childFiles = files.get(childId, []) - for childFile in childFiles: - - fullPath = childFile.path - realPath = os.path.realpath(fullPath) - realPaths.append(realPath) - - - if os.path.islink(fullPath) and not os.path.exists(realPath): - brokenSymlinks.append(realPath) - - # If not full season just repair individual episodes? - if brokenSymlinks: - print("Title:", media.title) - print("Movie ID/Season Number:", childId) - print("Broken symlinks:") - [print(brokenSymlink) for brokenSymlink in brokenSymlinks] - print() - if args.dry_run or args.no_confirm or input("Do you want to delete and re-grab? (y/n): ").lower() == 'y': - discordUpdate(f"Repairing... {media.title} - {childId}") - print("Deleting files:") - [print(childFile.path) for childFile in childFiles] - if not args.dry_run: - results = arr.deleteFiles(childFiles) - print("Remonitoring") - media = arr.get(media.id) - media.setChildMonitored(childId, False) - arr.put(media) - media.setChildMonitored(childId, True) - arr.put(media) - print("Searching for new files") - results = arr.automaticSearch(media, childId) - print(results) - - if repair_interval_seconds > 0: - time.sleep(repair_interval_seconds) - else: - print("Skipping") - print() - else: - parentFolders = set(os.path.dirname(path) for path in realPaths) - if childId in media.fullyAvailableChildrenIds and len(parentFolders) > 1: + try: + if unsafe(): + print("One or both debrid services are not working properly. Skipping repair.") + discordError(f"[{args.mode}] One or both debrid services are not working properly. Skipping repair.") + return + + getItems = lambda media, childId: arr.getFiles(media=media, childId=childId) if args.mode == 'symlink' else arr.getHistory(media=media, childId=childId, includeGrandchildDetails=True) + childrenIds = media.childrenIds if args.include_unmonitored else media.monitoredChildrenIds + + for childId in childrenIds: + brokenItems = [] + childItems = list(getItems(media=media, childId=childId)) + + for item in childItems: + if args.mode == 'symlink': + fullPath = item.path + if os.path.islink(fullPath): + destinationPath = os.readlink(fullPath) + if ((realdebrid['enabled'] and destinationPath.startswith(realdebrid['mountTorrentsPath']) and not os.path.exists(destinationPath)) or + (torbox['enabled'] and destinationPath.startswith(torbox['mountTorrentsPath']) and not os.path.exists(os.path.realpath(fullPath)))): + brokenItems.append(os.path.realpath(fullPath)) + else: # file mode + if item.reason == 'MissingFromDisk' and item.parentId not in media.fullyAvailableChildrenIds: + brokenItems.append(item.sourceTitle) + + if brokenItems: print("Title:", media.title) print("Movie ID/Season Number:", childId) - print("Inconsistent folders:") - [print(parentFolder) for parentFolder in parentFolders] + print("Broken items:") + [print(item) for item in brokenItems] print() + if args.dry_run or args.no_confirm or input("Do you want to delete and re-grab? (y/n): ").lower() == 'y': + if not args.dry_run: + discordUpdate(f"[{args.mode}] Repairing {media.title}: {childId}") + if args.mode == 'symlink': + print("Deleting files:") + [print(item.path) for item in childItems] + results = arr.deleteFiles(childItems) + print("Re-monitoring") + media = arr.get(media.id) + media.setChildMonitored(childId, False) + arr.put(media) + media.setChildMonitored(childId, True) + arr.put(media) + print("Searching for new files") + results = arr.automaticSearch(media, childId) + print(results) + + if repairIntervalSeconds > 0: + time.sleep(repairIntervalSeconds) + else: + print("Skipping") + print() + elif args.mode == 'symlink': + realPaths = [os.path.realpath(item.path) for item in childItems] + parentFolders = set(os.path.dirname(path) for path in realPaths) + if childId in media.fullyAvailableChildrenIds and len(parentFolders) > 1: + print("Title:", media.title) + print("Movie ID/Season Number:", childId) + print("Non-season-pack folders:") + [print(parentFolder) for parentFolder in parentFolders] + print() + if args.season_packs: + print("Searching for season-pack") + results = arr.automaticSearch(media, childId) + print(results) + + if repairIntervalSeconds > 0: + time.sleep(repairIntervalSeconds) + + except Exception: + e = traceback.format_exc() + + print(f"An error occurred while processing {media.title}: {e}") + discordError(f"[{args.mode}] An error occurred while processing {media.title}", e) -if run_interval_seconds > 0: + print("Repair complete") + discordUpdate(f"[{args.mode}] Repair complete") + +def unsafe(): + return (args.mode == 'symlink' and + ((realdebrid['enabled'] and not ensureTuple(validateRealdebridMountTorrentsPath())[0]) or + (torbox['enabled'] and not ensureTuple(validateTorboxMountTorrentsPath())[0]))) + +if runIntervalSeconds > 0: while True: - main() - time.sleep(run_interval_seconds) + try: + main() + time.sleep(runIntervalSeconds) + except Exception: + e = traceback.format_exc() + + print(f"An error occurred in the main loop: {e}") + discordError(f"[{args.mode}] An error occurred in the main loop", e) + time.sleep(runIntervalSeconds) # Still wait before retrying else: main() From 1c051774a01b1172ec9379ab11b9dbe9eec65271 Mon Sep 17 00:00:00 2001 From: Priky-one Date: Sat, 22 Mar 2025 15:31:04 +0530 Subject: [PATCH 10/11] Updated shared/debrid.py from main to dev --- shared/debrid.py | 499 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 499 insertions(+) create mode 100644 shared/debrid.py diff --git a/shared/debrid.py b/shared/debrid.py new file mode 100644 index 0000000..d095abb --- /dev/null +++ b/shared/debrid.py @@ -0,0 +1,499 @@ +import asyncio +import os +import re +import hashlib +import requests +from abc import ABC, abstractmethod +from urllib.parse import urljoin +from datetime import datetime +from shared.discord import discordUpdate +from shared.requests import retryRequest +from shared.shared import realdebrid, torbox, mediaExtensions, checkRequiredEnvs + +def validateDebridEnabled(): + if not realdebrid['enabled'] and not torbox['enabled']: + return False, "At least one of RealDebrid or Torbox must be enabled." + return True + +def validateRealdebridHost(): + url = urljoin(realdebrid['host'], "time") + try: + response = requests.get(url) + return response.status_code == 200 + except Exception as e: + return False + +def validateRealdebridApiKey(): + url = urljoin(realdebrid['host'], "user") + headers = {'Authorization': f'Bearer {realdebrid["apiKey"]}'} + try: + response = requests.get(url, headers=headers) + + if response.status_code == 401: + return False, "Invalid or expired API key." + elif response.status_code == 403: + return False, "Permission denied, account locked." + except Exception as e: + return False + + return True + +def validateRealdebridMountTorrentsPath(): + path = realdebrid['mountTorrentsPath'] + if os.path.exists(path) and any(os.path.isdir(os.path.join(path, child)) for child in os.listdir(path)): + return True + else: + return False, "Path does not exist or has no children." + +def validateTorboxHost(): + url = urljoin(torbox['host'], "stats") + try: + response = requests.get(url) + return response.status_code == 200 + except Exception as e: + return False + +def validateTorboxApiKey(): + url = urljoin(torbox['host'], "user/me") + headers = {'Authorization': f'Bearer {torbox["apiKey"]}'} + try: + response = requests.get(url, headers=headers) + + if response.status_code == 401: + return False, "Invalid or expired API key." + elif response.status_code == 403: + return False, "Permission denied, account locked." + except Exception as e: + return False + + return True + +def validateTorboxMountTorrentsPath(): + path = torbox['mountTorrentsPath'] + if os.path.exists(path) and any(os.path.isdir(os.path.join(path, child)) for child in os.listdir(path)): + return True + else: + return False, "Path does not exist or has no children." + +requiredEnvs = { + 'RealDebrid/TorBox enabled': (True, validateDebridEnabled), +} + +if realdebrid['enabled']: + requiredEnvs.update({ + 'RealDebrid host': (realdebrid['host'], validateRealdebridHost), + 'RealDebrid API key': (realdebrid['apiKey'], validateRealdebridApiKey, True), + 'RealDebrid mount torrents path': (realdebrid['mountTorrentsPath'], validateRealdebridMountTorrentsPath) + }) + +if torbox['enabled']: + requiredEnvs.update({ + 'Torbox host': (torbox['host'], validateTorboxHost), + 'Torbox API key': (torbox['apiKey'], validateTorboxApiKey, True), + 'Torbox mount torrents path': (torbox['mountTorrentsPath'], validateTorboxMountTorrentsPath) + }) + +checkRequiredEnvs(requiredEnvs) + +class TorrentBase(ABC): + STATUS_WAITING_FILES_SELECTION = 'waiting_files_selection' + STATUS_DOWNLOADING = 'downloading' + STATUS_COMPLETED = 'completed' + STATUS_ERROR = 'error' + + def __init__(self, f, fileData, file, failIfNotCached, onlyLargestFile) -> None: + super().__init__() + self.f = f + self.fileData = fileData + self.file = file + self.failIfNotCached = failIfNotCached + self.onlyLargestFile = onlyLargestFile + self.skipAvailabilityCheck = False + self.id = None + self._info = None + self._hash = None + self._instantAvailability = None + + def print(self, *values: object): + print(f"[{datetime.now()}] [{self.__class__.__name__}] [{self.file.fileInfo.filenameWithoutExt}]", *values) + + @abstractmethod + def submitTorrent(self): + pass + + @abstractmethod + def getHash(self): + pass + + @abstractmethod + def addTorrent(self): + pass + + @abstractmethod + async def getInfo(self, refresh=False): + pass + + @abstractmethod + async def selectFiles(self): + pass + + @abstractmethod + def delete(self): + pass + + @abstractmethod + async def getTorrentPath(self): + pass + + @abstractmethod + def _addTorrentFile(self): + pass + + @abstractmethod + def _addMagnetFile(self): + pass + + def _enforceId(self): + if not self.id: + raise Exception("Id is required. Must be acquired via successfully running submitTorrent() first.") + +class RealDebrid(TorrentBase): + def __init__(self, f, fileData, file, failIfNotCached, onlyLargestFile) -> None: + super().__init__(f, fileData, file, failIfNotCached, onlyLargestFile) + self.headers = {'Authorization': f'Bearer {realdebrid["apiKey"]}'} + self.mountTorrentsPath = realdebrid["mountTorrentsPath"] + + def submitTorrent(self): + if self.failIfNotCached: + instantAvailability = self._getInstantAvailability() + self.print('instantAvailability:', not not instantAvailability) + if not instantAvailability: + return False + + return not not self.addTorrent() + + def _getInstantAvailability(self, refresh=False): + torrentHash = self.getHash() + self.print('hash:', torrentHash) + self.skipAvailabilityCheck = True + + return True + + def _getAvailableHost(self): + availableHostsRequest = retryRequest( + lambda: requests.get(urljoin(realdebrid['host'], "torrents/availableHosts"), headers=self.headers), + print=self.print + ) + if availableHostsRequest is None: + return None + + availableHosts = availableHostsRequest.json() + return availableHosts[0]['host'] + + async def getInfo(self, refresh=False): + self._enforceId() + + if refresh or not self._info: + infoRequest = retryRequest( + lambda: requests.get(urljoin(realdebrid['host'], f"torrents/info/{self.id}"), headers=self.headers), + print=self.print + ) + if infoRequest is None: + self._info = None + else: + info = infoRequest.json() + info['status'] = self._normalize_status(info['status']) + self._info = info + + return self._info + + async def selectFiles(self): + self._enforceId() + + info = await self.getInfo() + if info is None: + return False + + self.print('files:', info['files']) + mediaFiles = [file for file in info['files'] if os.path.splitext(file['path'])[1].lower() in mediaExtensions] + + if not mediaFiles: + self.print('no media files found') + return False + + mediaFileIds = {str(file['id']) for file in mediaFiles} + self.print('required fileIds:', mediaFileIds) + + largestMediaFile = max(mediaFiles, key=lambda file: file['bytes']) + largestMediaFileId = str(largestMediaFile['id']) + self.print('only largest file:', self.onlyLargestFile) + self.print('largest file:', largestMediaFile) + + if self.onlyLargestFile and len(mediaFiles) > 1: + discordUpdate('largest file:', largestMediaFile['path']) + + files = {'files': [largestMediaFileId] if self.onlyLargestFile else ','.join(mediaFileIds)} + selectFilesRequest = retryRequest( + lambda: requests.post(urljoin(realdebrid['host'], f"torrents/selectFiles/{self.id}"), headers=self.headers, data=files), + print=self.print + ) + if selectFilesRequest is None: + return False + + return True + + def delete(self): + self._enforceId() + + deleteRequest = retryRequest( + lambda: requests.delete(urljoin(realdebrid['host'], f"torrents/delete/{self.id}"), headers=self.headers), + print=self.print + ) + return not not deleteRequest + + + async def getTorrentPath(self): + filename = (await self.getInfo())['filename'] + originalFilename = (await self.getInfo())['original_filename'] + + folderPathMountFilenameTorrent = os.path.join(self.mountTorrentsPath, filename) + folderPathMountOriginalFilenameTorrent = os.path.join(self.mountTorrentsPath, originalFilename) + folderPathMountOriginalFilenameWithoutExtTorrent = os.path.join(self.mountTorrentsPath, os.path.splitext(originalFilename)[0]) + + if os.path.exists(folderPathMountFilenameTorrent) and os.listdir(folderPathMountFilenameTorrent): + folderPathMountTorrent = folderPathMountFilenameTorrent + elif os.path.exists(folderPathMountOriginalFilenameTorrent) and os.listdir(folderPathMountOriginalFilenameTorrent): + folderPathMountTorrent = folderPathMountOriginalFilenameTorrent + elif (originalFilename.endswith(('.mkv', '.mp4')) and + os.path.exists(folderPathMountOriginalFilenameWithoutExtTorrent) and os.listdir(folderPathMountOriginalFilenameWithoutExtTorrent)): + folderPathMountTorrent = folderPathMountOriginalFilenameWithoutExtTorrent + else: + folderPathMountTorrent = None + + return folderPathMountTorrent + + def _addFile(self, request, endpoint, data): + host = self._getAvailableHost() + if host is None: + return None + + request = retryRequest( + lambda: request(urljoin(realdebrid['host'], endpoint), params={'host': host}, headers=self.headers, data=data), + print=self.print + ) + if request is None: + return None + + response = request.json() + self.print('response info:', response) + self.id = response['id'] + + return self.id + + def _addTorrentFile(self): + return self._addFile(requests.put, "torrents/addTorrent", self.f) + + def _addMagnetFile(self): + return self._addFile(requests.post, "torrents/addMagnet", {'magnet': self.fileData}) + + def _normalize_status(self, status): + if status in ['waiting_files_selection']: + return self.STATUS_WAITING_FILES_SELECTION + elif status in ['magnet_conversion', 'queued', 'downloading', 'compressing', 'uploading']: + return self.STATUS_DOWNLOADING + elif status == 'downloaded': + return self.STATUS_COMPLETED + elif status in ['magnet_error', 'error', 'dead', 'virus']: + return self.STATUS_ERROR + return status + +class Torbox(TorrentBase): + def __init__(self, f, fileData, file, failIfNotCached, onlyLargestFile) -> None: + super().__init__(f, fileData, file, failIfNotCached, onlyLargestFile) + self.headers = {'Authorization': f'Bearer {torbox["apiKey"]}'} + self.mountTorrentsPath = torbox["mountTorrentsPath"] + self.submittedTime = None + self.lastInactiveCheck = None + + userInfoRequest = retryRequest( + lambda: requests.get(urljoin(torbox['host'], "user/me"), headers=self.headers), + print=self.print + ) + if userInfoRequest is not None: + userInfo = userInfoRequest.json() + self.authId = userInfo['data']['auth_id'] + + def submitTorrent(self): + if self.failIfNotCached: + instantAvailability = self._getInstantAvailability() + self.print('instantAvailability:', not not instantAvailability) + if not instantAvailability: + return False + + if self.addTorrent(): + self.submittedTime = datetime.now() + return True + return False + + def _getInstantAvailability(self, refresh=False): + if refresh or not self._instantAvailability: + torrentHash = self.getHash() + self.print('hash:', torrentHash) + + instantAvailabilityRequest = retryRequest( + lambda: requests.get( + urljoin(torbox['host'], "torrents/checkcached"), + headers=self.headers, + params={'hash': torrentHash, 'format': 'object'} + ), + print=self.print + ) + if instantAvailabilityRequest is None: + return None + + instantAvailabilities = instantAvailabilityRequest.json() + self.print('instantAvailabilities:', instantAvailabilities) + + # Check if 'data' exists and is not None or False + if instantAvailabilities and 'data' in instantAvailabilities and instantAvailabilities['data']: + self._instantAvailability = instantAvailabilities['data'] + else: + self._instantAvailability = None + + return self._instantAvailability + + async def getInfo(self, refresh=False): + self._enforceId() + + if refresh or not self._info: + if not self.authId: + return None + + currentTime = datetime.now() + if (currentTime - self.submittedTime).total_seconds() < 300: + if not self.lastInactiveCheck or (currentTime - self.lastInactiveCheck).total_seconds() > 5: + inactiveCheckUrl = f"https://relay.torbox.app/v1/inactivecheck/torrent/{self.authId}/{self.id}" + retryRequest( + lambda: requests.get(inactiveCheckUrl), + print=self.print + ) + self.lastInactiveCheck = currentTime + for _ in range(60): + infoRequest = retryRequest( + lambda: requests.get(urljoin(torbox['host'], "torrents/mylist"), headers=self.headers), + print=self.print + ) + if infoRequest is None: + return None + + torrents = infoRequest.json()['data'] + + for torrent in torrents: + if torrent['id'] == self.id: + torrent['status'] = self._normalize_status(torrent['download_state'], torrent['download_finished']) + self._info = torrent + return self._info + + await asyncio.sleep(1) + return self._info + + async def selectFiles(self): + pass + + def delete(self): + self._enforceId() + + deleteRequest = retryRequest( + lambda: requests.delete(urljoin(torbox['host'], "torrents/controltorrent"), headers=self.headers, data={'torrent_id': self.id, 'operation': "Delete"}), + print=self.print + ) + return not not deleteRequest + + async def getTorrentPath(self): + filename = (await self.getInfo())['files'][0]['name'].split("/")[0] + + folderPathMountFilenameTorrent = os.path.join(self.mountTorrentsPath, filename) + + if os.path.exists(folderPathMountFilenameTorrent) and os.listdir(folderPathMountFilenameTorrent): + folderPathMountTorrent = folderPathMountFilenameTorrent + else: + folderPathMountTorrent = None + + return folderPathMountTorrent + + def _addFile(self, data=None, files=None): + request = retryRequest( + lambda: requests.post(urljoin(torbox['host'], "torrents/createtorrent"), headers=self.headers, data=data, files=files), + print=self.print + ) + if request is None: + return None + + response = request.json() + self.print('response info:', response) + + if response.get('detail') == 'queued': + return None + + self.id = response['data']['torrent_id'] + + return self.id + + def _addTorrentFile(self): + nametorrent = self.f.name.split('/')[-1] + files = {'file': (nametorrent, self.f, 'application/x-bittorrent')} + return self._addFile(files=files) + + def _addMagnetFile(self): + return self._addFile(data={'magnet': self.fileData}) + + def _normalize_status(self, status, download_finished): + if download_finished: + return self.STATUS_COMPLETED + elif status in [ + 'completed', 'cached', 'paused', 'downloading', 'uploading', + 'checkingResumeData', 'metaDL', 'pausedUP', 'queuedUP', 'checkingUP', + 'forcedUP', 'allocating', 'downloading', 'metaDL', 'pausedDL', + 'queuedDL', 'checkingDL', 'forcedDL', 'checkingResumeData', 'moving' + ]: + return self.STATUS_DOWNLOADING + elif status in ['error', 'stalledUP', 'stalledDL', 'stalled (no seeds)', 'missingFiles', 'failed']: + return self.STATUS_ERROR + return status + +class Torrent(TorrentBase): + def getHash(self): + + if not self._hash: + import bencode3 + self._hash = hashlib.sha1(bencode3.bencode(bencode3.bdecode(self.fileData)['info'])).hexdigest() + + return self._hash + + def addTorrent(self): + return self._addTorrentFile() + +class Magnet(TorrentBase): + def getHash(self): + + if not self._hash: + # Consider changing when I'm more familiar with hashes + self._hash = re.search('xt=urn:btih:(.+?)(?:&|$)', self.fileData).group(1) + + return self._hash + + def addTorrent(self): + return self._addMagnetFile() + + +class RealDebridTorrent(RealDebrid, Torrent): + pass + +class RealDebridMagnet(RealDebrid, Magnet): + pass + +class TorboxTorrent(Torbox, Torrent): + pass + +class TorboxMagnet(Torbox, Magnet): + pass From 3f598bc7d796acc82c7e9656dda345e4f9acf2bc Mon Sep 17 00:00:00 2001 From: Priky-one Date: Sat, 22 Mar 2025 15:32:17 +0530 Subject: [PATCH 11/11] Updated shared/requests.py from main to dev --- shared/requests.py | 60 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 shared/requests.py diff --git a/shared/requests.py b/shared/requests.py new file mode 100644 index 0000000..fe689a8 --- /dev/null +++ b/shared/requests.py @@ -0,0 +1,60 @@ +import time +import requests +from typing import Callable, Optional +from shared.discord import discordError, discordUpdate + + +def retryRequest( + requestFunc: Callable[[], requests.Response], + print: Callable[..., None] = print, + retries: int = 1, + delay: int = 1 +) -> Optional[requests.Response]: + """ + Retry a request if the response status code is not in the 200 range. + + :param requestFunc: A callable that returns an HTTP response. + :param print: Optional print function for logging. + :param retries: The number of times to retry the request after the initial attempt. + :param delay: The delay between retries in seconds. + :return: The response object or None if all attempts fail. + """ + attempts = retries + 1 # Total attempts including the initial one + for attempt in range(attempts): + try: + response = requestFunc() + if 200 <= response.status_code < 300: + return response + else: + message = [ + f"URL: {response.url}", + f"Status code: {response.status_code}", + f"Message: {response.reason}", + f"Response: {response.content}", + f"Attempt {attempt + 1} failed" + ] + for line in message: + print(line) + if attempt == retries: + discordError("Request Failed", "\n".join(message)) + else: + update_message = message + [f"Retrying in {delay} seconds..."] + discordUpdate("Retrying Request", "\n".join(update_message)) + print(f"Retrying in {delay} seconds...") + time.sleep(delay) + except requests.RequestException as e: + message = [ + f"URL: {response.url if 'response' in locals() else 'unknown'}", + f"Attempt {attempt + 1} encountered an error: {e}" + ] + for line in message: + print(line) + if attempt == retries: + discordError("Request Exception", "\n".join(message)) + else: + update_message = message + [f"Retrying in {delay} seconds..."] + discordUpdate("Retrying Request", "\n".join(update_message)) + print(f"Retrying in {delay} seconds...") + time.sleep(delay) + + return None \ No newline at end of file