diff --git a/.env.template b/.env.template
index 1daca0a..c392315 100644
--- a/.env.template
+++ b/.env.template
@@ -37,7 +37,8 @@ BLACKHOLE_RADARR_PATH="Movies"
 BLACKHOLE_SONARR_PATH="TV Shows"
 BLACKHOLE_FAIL_IF_NOT_CACHED=true
 BLACKHOLE_RD_MOUNT_REFRESH_SECONDS=200
-BLACKHOLE_WAIT_FOR_TORRENT_TIMEOUT=60
+BLACKHOLE_WAIT_FOR_TORRENT_TIMEOUT=51840
+BLACKHOLE_WAIT_FOR_PROGRESS_CHANGE=720
 BLACKHOLE_HISTORY_PAGE_SIZE=500
 
 DISCORD_ENABLED=false
diff --git a/Dockerfile.blackhole b/Dockerfile.blackhole
index 0be884d..d98cb1f 100644
--- a/Dockerfile.blackhole
+++ b/Dockerfile.blackhole
@@ -1,4 +1,4 @@
-FROM python:3.8-slim
+FROM python:3.11-slim
 
 # Metadata labels
 LABEL org.opencontainers.image.source="https://github.com/westsurname/scripts"
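# --- note: the two settings above are consumed later in this patch as
# blackhole['waitForTorrentTimeout'] and blackhole['waitForProgressChange'].
# A minimal sketch of how shared/shared.py (not shown in this diff) presumably
# loads them; the parsing and defaults here are assumptions:
import os

blackhole = {
    # 51840 iterations of the one-second status poll is roughly 14 hours,
    # versus the old default of 60 (about a minute).
    'waitForTorrentTimeout': int(os.getenv('BLACKHOLE_WAIT_FOR_TORRENT_TIMEOUT', 51840)),
    # 720 one-minute progress checks is roughly 12 hours without progress
    # before blackhole_downloader.py abandons an uncached download.
    'waitForProgressChange': int(os.getenv('BLACKHOLE_WAIT_FOR_PROGRESS_CHANGE', 720)),
}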
diff --git a/blackhole.py b/blackhole.py
index 2c5df71..e648b8f 100644
--- a/blackhole.py
+++ b/blackhole.py
@@ -15,9 +15,12 @@
 from shared.discord import discordError, discordUpdate
 from shared.shared import realdebrid, blackhole, plex, mediaExtensions, checkRequiredEnvs
 from shared.arr import Arr, Radarr, Sonarr
+from blackhole_downloader import downloader
+from RTN import parse
 
 rdHost = realdebrid['host']
 authToken = realdebrid['apiKey']
+shared_dict = {}
 
 _print = print
@@ -80,13 +83,13 @@ def __init__(self, isTorrentOrMagnet, isDotTorrentFile) -> None:
         self.isTorrentOrMagnet = isTorrentOrMagnet
         self.isDotTorrentFile = isDotTorrentFile
 
-    def __init__(self, filename, isRadarr) -> None:
+    def __init__(self, filename, isRadarr, filePath=None) -> None:
         print('filename:', filename)
         baseBath = getPath(isRadarr)
         isDotTorrentFile = filename.casefold().endswith('.torrent')
         isTorrentOrMagnet = isDotTorrentFile or filename.casefold().endswith('.magnet')
         filenameWithoutExt, _ = os.path.splitext(filename)
-        filePath = os.path.join(baseBath, filename)
+        filePath = filePath or os.path.join(baseBath, filename)
         filePathProcessing = os.path.join(baseBath, 'processing', filename)
         folderPathCompleted = os.path.join(baseBath, 'completed', filenameWithoutExt)
         folderPathMountTorrent = os.path.join(blackhole['rdMountTorrentsPath'], filenameWithoutExt)
@@ -96,11 +99,10 @@ def __init__(self, filename, isRadarr) -> None:
 
 class TorrentBase(ABC):
-    def __init__(self, f, file, fail, failIfNotCached, onlyLargestFile) -> None:
+    def __init__(self, f, file, failIfNotCached, onlyLargestFile) -> None:
         super().__init__()
         self.f = f
         self.file = file
-        self.fail = fail
         self.failIfNotCached = failIfNotCached
         self.onlyLargestFile = onlyLargestFile
         self.id = None
@@ -125,11 +127,11 @@ def submitTorrent(self):
             instantAvailability = self.getInstantAvailability()
             self.print('instantAvailability:', not not instantAvailability)
             if not instantAvailability:
-                self.fail(self)
                 return False
 
         availableHost = self.getAvailableHost()
-        self.addTorrent(availableHost)
+        if self.addTorrent(availableHost) is None:
+            return None
         return True
 
     @abstractmethod
@@ -142,20 +144,58 @@ def addTorrent(self, host):
     def getInstantAvailability(self, refresh=False):
         if refresh or not self._instantAvailability:
-            torrentHash = self.getHash()
-            self.print('hash:', torrentHash)
-
-            if len(torrentHash) != 40:
-                self.incompatibleHashSize = True
-                return True
-
-            instantAvailabilityRequest = requests.get(f"{rdHost}torrents/instantAvailability/{torrentHash}?auth_token={authToken}")
-            instantAvailabilities = instantAvailabilityRequest.json()
-            self.print('instantAvailabilities:', instantAvailabilities)
-            instantAvailabilityHosters = next(iter(instantAvailabilities.values()))
-            if not instantAvailabilityHosters: return
-
-            self._instantAvailability = next(iter(instantAvailabilityHosters.values()))
+            while True:
+                activeTorrents = self.getActiveTorrents()
+                if activeTorrents['limit'] - activeTorrents['nb'] > 0:
+                    availableHost = self.getAvailableHost()
+                    if self.addTorrent(availableHost) is None:
+                        return
+                    count = 1
+                    while True:
+                        info = self.getInfo(refresh=True)
+                        if count >= 5:
+                            self.delete()
+                            return
+                        if "status" not in info:
+                            return
+                        status = info['status']
+                        if status == 'waiting_files_selection':
+                            if not self.selectFiles():
+                                self.delete()
+                                return
+                        elif status == 'magnet_conversion' or status == 'queued' or status == 'compressing' or status == 'uploading':
+                            time.sleep(1)
+                        elif status == 'downloading':
+                            time.sleep(5)
+                            info = self.getInfo(refresh=True)
+                        elif status == 'downloaded':
+                            self.delete()
+                            return True
+                        elif status == 'magnet_error' or status == 'error' or status == 'dead' or status == 'virus':
+                            discordError(f"Error: {self.file.fileInfo.filenameWithoutExt}", info)
+                            time.sleep(1)
+                            self.delete()
+                            return
+                        count += 1
+                else:
+                    print("Torrent cannot be added, too many active downloads")
+                    print("activeTorrents:", activeTorrents)
+                    time.sleep(10)
+            # torrentHash = self.getHash()
+            # self.print('hash:', torrentHash)
+
+            # if len(torrentHash) != 40:
+            #     self.incompatibleHashSize = True
+            #     return True
+
+            # instantAvailabilityRequest = requests.get(f"{rdHost}torrents/instantAvailability/{torrentHash}?auth_token={authToken}")
+            # instantAvailabilities = instantAvailabilityRequest.json()
+            # self.print('instantAvailabilities:', instantAvailabilities)
+            # if not instantAvailabilities: return
+            # instantAvailabilityHosters = next(iter(instantAvailabilities.values()))
+            # if not instantAvailabilityHosters: return
+
+            # self._instantAvailability = next(iter(instantAvailabilityHosters.values()))
 
         return self._instantAvailability
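# --- note: the rewritten getInstantAvailability() above now gates submissions
# on Real-Debrid's active-torrent quota instead of cache state. A sketch of
# that slot check, using the same torrents/activeCount endpoint and response
# shape ({'nb': <active>, 'limit': <allowed>}) relied on above:
import requests

def freeSlots(rdHost, authToken):
    # A positive difference means another torrent can be added right now.
    response = requests.get(f"{rdHost}torrents/activeCount?auth_token={authToken}")
    activeTorrents = response.json()
    return activeTorrents['limit'] - activeTorrents['nb']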
@@ -174,15 +214,57 @@ def getInfo(self, refresh=False):
 
         return self._info
 
-    def selectFiles(self):
+    def getActiveTorrents(self):
+        activeCount = requests.get(f"{rdHost}torrents/activeCount?auth_token={authToken}")
+        activeTorrents = activeCount.json()
+
+        return activeTorrents
+
+    def getAllTorrents(self):
+        allTorrents = []
+        page = 1
+        limit = 2500
+
+        while True:
+            response = requests.get(f"{rdHost}torrents?auth_token={authToken}", params={"page": page, "limit": limit})
+            if response.status_code != 200:
+                print(f"Error: {response.status_code} - {response.text}")
+                break
+
+            torrentInfo = response.json()
+            if not torrentInfo:
+                break
+
+            allTorrents.extend(torrentInfo)
+            page += 1
+
+        return allTorrents
+
+    def selectFiles(self, uncached=False):
         self._enforceId()
 
         info = self.getInfo()
         self.print('files:', info['files'])
 
         mediaFiles = [file for file in info['files'] if os.path.splitext(file['path'])[1] in mediaExtensions]
+        if info['files'] and not mediaFiles and not uncached:
+            self.print('no media files found --> pushing to uncached')
+            discordError(f"Error: {self.file.fileInfo.filenameWithoutExt}", "No media files found --> pushing to uncached")
+            return False
+        elif not info['files'] and uncached:
+            total_time = 0
+            while not info['files'] and total_time < 300:
+                info = self.getInfo(refresh=True)
+                mediaFiles = [file for file in info['files'] if os.path.splitext(file['path'])[1] in mediaExtensions]
+                time.sleep(10)
+                total_time += 10
+            if not info['files']:
+                self.print("unable to parse magnet files")
+                discordError(f"Error: {self.file.fileInfo.filenameWithoutExt}", "Unable to parse magnet files")
+                return False
 
         if not mediaFiles:
-            self.print('no media files found')
+            self.print("no media files found in uncached torrent")
+            discordError(f"Error: {self.file.fileInfo.filenameWithoutExt}", "No media files found in uncached torrent")
             return False
 
         mediaFileIds = {str(file['id']) for file in mediaFiles}
@@ -195,7 +277,7 @@ def selectFiles(self):
 
         if self.failIfNotCached and not self.incompatibleHashSize:
             targetFileIds = {largestMediaFileId} if self.onlyLargestFile else mediaFileIds
-            if not any(set(fileGroup.keys()) == targetFileIds for fileGroup in self._instantAvailability):
+            if self._instantAvailability and not any(set(fileGroup.keys()) == targetFileIds for fileGroup in self._instantAvailability):
                 extraFilesGroup = next((fileGroup for fileGroup in self._instantAvailability if largestMediaFileId in fileGroup.keys()), None)
                 if self.onlyLargestFile and extraFilesGroup:
                     self.print('extra files required for cache:', extraFilesGroup)
@@ -232,9 +314,16 @@ def addTorrent(self, host):
         addTorrentRequest = requests.put(f"{rdHost}torrents/addTorrent?host={host}&auth_token={authToken}", data=self.f)
         addTorrentResponse = addTorrentRequest.json()
         self.print('torrent info:', addTorrentResponse)
-
-        self.id = addTorrentResponse['id']
-        return self.id
+        return True
+        # API not working for torrents, so returning True for now
+        # if "id" in addTorrentResponse:
+        #     self.id = addTorrentResponse['id']
+        #     return self.id
+        # elif addTorrentResponse["error"] == "upload_error":
+        #     return 1
+        # else:
+        #     discordError(f"Error: {self.file.fileInfo.filenameWithoutExt}", addTorrentResponse)
+        #     return None
 
 
 class Magnet(TorrentBase):
@@ -251,9 +340,14 @@ def addTorrent(self, host):
         addMagnetResponse = addMagnetRequest.json()
         self.print('magnet info:', addMagnetResponse)
 
-        self.id = addMagnetResponse['id']
-
-        return self.id
+        if "id" in addMagnetResponse:
+            self.id = addMagnetResponse['id']
+            return self.id
+        elif addMagnetResponse.get("error") == "upload_error":
+            return True
+        else:
+            discordError(f"Error: {self.file.fileInfo.filenameWithoutExt}", addMagnetResponse)
+            return None
 
 def getPath(isRadarr, create=False):
     baseWatchPath = blackhole['baseWatchPath']
@@ -261,7 +355,7 @@ def getPath(isRadarr, create=False):
     absoluteBaseWatchPath = os.path.abspath(baseWatchPath)
     finalPath = os.path.join(absoluteBaseWatchPath, blackhole['radarrPath'] if isRadarr else blackhole['sonarrPath'])
 
     if create:
-        for sub_path in ['', 'processing', 'completed']:
+        for sub_path in ['', 'processing', 'completed', 'uncached']:
             path_to_check = os.path.join(finalPath, sub_path)
             if not os.path.exists(path_to_check):
                 os.makedirs(path_to_check)
@@ -319,7 +413,7 @@ def print(*values: object):
 
 import signal
 
-async def processFile(file: TorrentFileInfo, arr: Arr, isRadarr):
+async def processFile(file: TorrentFileInfo, arr: Arr, isRadarr, failIfNotCached=None, lock=None):
     try:
         _print = globals()['print']
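# --- note: getAllTorrents() pages through the account 2500 entries at a time;
# blackhole_downloader.py (below) scans that listing to detect a hash that was
# already submitted. A sketch of that lookup, assuming each entry carries the
# 'id', 'hash', 'filename' and 'status' keys used elsewhere in this patch:
def findByHash(torrent):
    targetHash = torrent.getHash().lower()
    for entry in torrent.getAllTorrents():
        if entry['hash'] == targetHash:
            return entry
    return None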
@@ -344,29 +438,71 @@ async def is_accessible(path, timeout=10):
                 return False
             finally:
                 executor.shutdown(wait=False)
-
+        if not os.path.exists(file.fileInfo.filePathProcessing):
+            return
         with open(file.fileInfo.filePathProcessing, 'rb' if file.torrentInfo.isDotTorrentFile else 'r') as f:
-            def fail(torrent: TorrentBase, arr: Arr=arr):
+            async def fail(torrent: TorrentBase, arr: Arr=arr, uncached=False):
                 print(f"Failing")
 
                 history = arr.getHistory(blackhole['historyPageSize'])['records']
+                while not history:
+                    time.sleep(5)
+                    print("Trying to grab items after 5 seconds")
+                    history = arr.getHistory(blackhole['historyPageSize'])['records']
                 items = (item for item in history if item['data'].get('torrentInfoHash', '').casefold() == torrent.getHash().casefold() or cleanFileName(item['sourceTitle'].casefold()) == torrent.file.fileInfo.filenameWithoutExt.casefold())
-
+                items = list(items)
+                # TODO: Fix the Sonarr calls here (whatever it does when it doesn't download other hits)
+                if not items:
+                    raise Exception("No history items found to cancel")
 
                 for item in items:
-                    # TODO: See if we can fail without blacklisting as cached items constantly changes
                     arr.failHistoryItem(item['id'])
+                    arr.removeFailedItem(item['id'])  # remove from blocklist
+
+                if uncached:
+                    parsedTorrent = parse(torrent.file.fileInfo.filename)
+                    torrentName = parsedTorrent.parsed_title
+                    # itemId = str(first_item.get('seriesId', first_item.get('movieId')))
+                    path = os.path.join(getPath(isRadarr), 'uncached', torrentName, torrent.file.fileInfo.filename)
+                    if not isRadarr:
+                        if len(parsedTorrent.seasons) == 1 and len(parsedTorrent.episodes) == 1:  # and first_item["releaseType"] != "SeasonPack"
+                            episodeNum = str(parsedTorrent.episodes[0])  # fallback? data --> releaseType --> SeasonPack
+                            path = os.path.join(getPath(isRadarr), 'uncached', torrentName, episodeNum, torrent.file.fileInfo.filename)
+                        else:
+                            seasonPack = 'seasonpack'
+                            if not hasattr(parsedTorrent, 'seasons'):
+                                print("Removing because no seasons were found after parsing.")
+                                if os.path.exists(file.fileInfo.filePathProcessing):
+                                    os.remove(file.fileInfo.filePathProcessing)
+                                if os.path.exists(file.fileInfo.filePath):
+                                    os.remove(file.fileInfo.filePath)
+                                # This won't ever happen --> unreachable
+                                return
+                            seasons = [str(pt) for pt in parsedTorrent.seasons]
+                            seasons = "-".join(seasons)
+                            path = os.path.join(getPath(isRadarr), 'uncached', torrentName, seasonPack, seasons, torrent.file.fileInfo.filename)
+
+                    if not os.path.exists(path):
+                        os.renames(torrent.file.fileInfo.filePathProcessing, path)
+                    elif os.path.exists(file.fileInfo.filePathProcessing):
+                        os.remove(file.fileInfo.filePathProcessing)
+                    print("Pushed to downloader")
+                    await downloader(torrent, file, arr, path, shared_dict, lock)
 
                 print(f"Failed")
 
+            failIfNotCached = blackhole['failIfNotCached'] if failIfNotCached is None else failIfNotCached
+
             onlyLargestFile = isRadarr or bool(re.search(r'S[\d]{2}E[\d]{2}', file.fileInfo.filename))
             if file.torrentInfo.isDotTorrentFile:
-                torrent = Torrent(f, file, fail, blackhole['failIfNotCached'], onlyLargestFile)
+                torrent = Torrent(f, file, failIfNotCached, onlyLargestFile)
             else:
-                torrent = Magnet(f, file, fail, blackhole['failIfNotCached'], onlyLargestFile)
+                torrent = Magnet(f, file, failIfNotCached, onlyLargestFile)
 
-            if torrent.submitTorrent():
+            failed = torrent.submitTorrent()
+            if failed is False:
+                await fail(torrent, uncached=True)
+            elif failed is True:
                 count = 0
                 while True:
                     count += 1
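# --- note: the uncached layout built above is driven by rank-torrent-name's
# parse(); roughly (attribute names as used above, example values illustrative):
from RTN import parse

parsed = parse("Show.Name.S02E05.1080p.WEB-DL.mkv")
# parsed.parsed_title -> "Show Name"
# parsed.seasons      -> [2]
# parsed.episodes     -> [5]
# A single season+episode lands under uncached/<title>/<episode>/, anything
# else under uncached/<title>/seasonpack/<seasons>/; movies skip both levels.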
@@ -378,7 +514,7 @@ def fail(torrent: TorrentBase, arr: Arr=arr):
                     if status == 'waiting_files_selection':
                         if not torrent.selectFiles():
                             torrent.delete()
-                            fail(torrent)
+                            await fail(torrent)
                             break
                     elif status == 'magnet_conversion' or status == 'queued' or status == 'downloading' or status == 'compressing' or status == 'uploading':
                         # Send progress to arr
@@ -387,11 +523,11 @@ def fail(torrent: TorrentBase, arr: Arr=arr):
                         if torrent.incompatibleHashSize and torrent.failIfNotCached:
                             print("Non-cached incompatible hash sized torrent")
                             torrent.delete()
-                            fail(torrent)
+                            await fail(torrent, uncached=True)
                             break
                         await asyncio.sleep(1)
                     elif status == 'magnet_error' or status == 'error' or status == 'dead' or status == 'virus':
-                        fail(torrent)
+                        await fail(torrent)
                         break
                     elif status == 'downloaded':
                         existsCount = 0
@@ -404,7 +540,7 @@ def fail(torrent: TorrentBase, arr: Arr=arr):
                         folderPathMountOriginalFilenameTorrent = os.path.join(blackhole['rdMountTorrentsPath'], originalFilename)
                         folderPathMountOriginalFilenameWithoutExtTorrent = os.path.join(blackhole['rdMountTorrentsPath'], os.path.splitext(originalFilename)[0])
 
-                        while existsCount <= blackhole['waitForTorrentTimeout']:
+                        while True:
                             existsCount += 1
 
                             if os.path.exists(folderPathMountFilenameTorrent) and os.listdir(folderPathMountFilenameTorrent):
@@ -427,6 +563,7 @@ def fail(torrent: TorrentBase, arr: Arr=arr):
                                 for root, dirs, files in os.walk(folderPathMountTorrent):
                                     relRoot = os.path.relpath(root, folderPathMountTorrent)
                                     for filename in files:
+                                        source_link = os.path.join(root, filename)
                                         # Check if the file is accessible
                                         # if not await is_accessible(os.path.join(root, filename)):
                                         #     print(f"Timeout reached when accessing file: {filename}")
@@ -446,18 +583,24 @@ def fail(torrent: TorrentBase, arr: Arr=arr):
                                                 seasonFolderPathCompleted = re.sub(multiSeasonRegex2, season, seasonFolderPathCompleted)
 
                                                 os.makedirs(os.path.join(seasonFolderPathCompleted, relRoot), exist_ok=True)
-                                                os.symlink(os.path.join(root, filename), os.path.join(seasonFolderPathCompleted, relRoot, filename))
-                                                print('Season Recursive:', f"{os.path.join(seasonFolderPathCompleted, relRoot, filename)} -> {os.path.join(root, filename)}")
+                                                target_link = os.path.join(seasonFolderPathCompleted, relRoot, filename)
+                                                if os.path.islink(target_link):
+                                                    os.remove(target_link)
+                                                os.symlink(source_link, target_link)
+                                                print('Season Recursive:', f"{target_link} -> {source_link}")
                                                 # refreshEndpoint = f"{plex['serverHost']}/library/sections/{plex['serverMovieLibraryId'] if isRadarr else plex['serverTvShowLibraryId']}/refresh?path={urllib.parse.quote_plus(os.path.join(seasonFolderPathCompleted, relRoot))}&X-Plex-Token={plex['serverApiKey']}"
                                                 # cancelRefreshRequest = requests.delete(refreshEndpoint, headers={'Accept': 'application/json'})
                                                 # refreshRequest = requests.get(refreshEndpoint, headers={'Accept': 'application/json'})
                                                 continue
-
+
+                                        target_link = os.path.join(file.fileInfo.folderPathCompleted, relRoot, filename)
                                         os.makedirs(os.path.join(file.fileInfo.folderPathCompleted, relRoot), exist_ok=True)
-                                        os.symlink(os.path.join(root, filename), os.path.join(file.fileInfo.folderPathCompleted, relRoot, filename))
-                                        print('Recursive:', f"{os.path.join(file.fileInfo.folderPathCompleted, relRoot, filename)} -> {os.path.join(root, filename)}")
+                                        if os.path.islink(target_link):
+                                            os.remove(target_link)
+                                        os.symlink(source_link, target_link)
+                                        print('Recursive:', f"{target_link} -> {source_link}")
                                         # refreshEndpoint = f"{plex['serverHost']}/library/sections/{plex['serverMovieLibraryId'] if isRadarr else plex['serverTvShowLibraryId']}/refresh?path={urllib.parse.quote_plus(os.path.join(file.fileInfo.folderPathCompleted, relRoot))}&X-Plex-Token={plex['serverApiKey']}"
                                         # cancelRefreshRequest = requests.delete(refreshEndpoint, headers={'Accept': 'application/json'})
                                         # refreshRequest = requests.get(refreshEndpoint, headers={'Accept': 'application/json'})
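# --- note: the islink/remove/symlink sequence above makes re-runs idempotent:
# a stale link left by a previous pass is replaced instead of raising
# FileExistsError. As a standalone helper (a sketch, not code from this patch):
import os

def relink(source_link, target_link):
    # os.symlink refuses to overwrite; only links are removed, real files are left alone.
    if os.path.islink(target_link):
        os.remove(target_link)
    os.symlink(source_link, target_link)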
@@ -473,9 +616,10 @@ def fail(torrent: TorrentBase, arr: Arr=arr):
                                 # await asyncio.get_running_loop().run_in_executor(None, copyFiles, file, folderPathMountTorrent, arr)
                                 break
 
-                            if existsCount == blackhole['rdMountRefreshSeconds'] + 1:
+                            if existsCount >= blackhole['rdMountRefreshSeconds'] + 1:
                                 print(f"Torrent folder not found in filesystem: {file.fileInfo.filenameWithoutExt}")
                                 discordError("Torrent folder not found in filesystem", file.fileInfo.filenameWithoutExt)
+                                return False
 
                             await asyncio.sleep(1)
 
                         break
@@ -485,11 +629,13 @@ def fail(torrent: TorrentBase, arr: Arr=arr):
                         print('infoCount > 20')
                         discordError(f"{file.fileInfo.filenameWithoutExt} info attempt count > 20", status)
                     elif count == blackhole['waitForTorrentTimeout']:
-                        print('infoCount == 60 - Failing')
-                        fail(torrent)
+                        print(f"infoCount == {blackhole['waitForTorrentTimeout']} - Failing")
+                        await fail(torrent)
                         break
-
-            os.remove(file.fileInfo.filePathProcessing)
+            if os.path.exists(file.fileInfo.filePathProcessing):
+                os.remove(file.fileInfo.filePathProcessing)
+            if os.path.exists(file.fileInfo.filePath):
+                os.remove(file.fileInfo.filePath)
 
     except:
         e = traceback.format_exc()
@@ -500,10 +646,10 @@ def fail(torrent: TorrentBase, arr: Arr=arr):
 
 def getFiles(isRadarr):
     print('getFiles')
-    files = (TorrentFileInfo(filename, isRadarr) for filename in os.listdir(getPath(isRadarr)) if filename not in ['processing', 'completed'])
+    files = (TorrentFileInfo(filename, isRadarr) for filename in os.listdir(getPath(isRadarr)) if filename not in ['processing', 'completed', 'uncached'])
     return [file for file in files if file.torrentInfo.isTorrentOrMagnet]
 
-async def on_created(isRadarr):
+async def on_created(isRadarr, lock):
     print("Enter 'on_created'")
     try:
         print('radarr/sonarr:', 'radarr' if isRadarr else 'sonarr')
@@ -522,7 +668,7 @@ async def on_created(isRadarr):
             if files:
                 for file in files:
                     os.renames(file.fileInfo.filePath, file.fileInfo.filePathProcessing)
-                futures.append(asyncio.gather(*(processFile(file, arr, isRadarr) for file in files)))
+                futures.append(asyncio.gather(*(processFile(file, arr, isRadarr, lock=lock) for file in files)))
             elif firstGo:
                 print('No torrent files found')
             firstGo = False
@@ -538,8 +684,54 @@ async def on_created(isRadarr):
         discordError(f"Error processing", e)
     print("Exit 'on_created'")
 
-def start(isRadarr):
-    asyncio.run(on_created(isRadarr))
+def start(isRadarr, lock):
+    asyncio.run(on_created(isRadarr, lock))
+
+def removeDir(dirPath):
+    files = os.listdir(dirPath)
+    for file in files:
+        os.remove(os.path.join(dirPath, file))
+    os.rmdir(dirPath)
+
+async def resumeUncached(lock):
+    print('Processing uncached')
+    try:
+        radarr = Radarr()
+        sonarr = Sonarr()
+
+        paths = [(os.path.join(getPath(isRadarr=True), 'uncached'), radarr, True), (os.path.join(getPath(isRadarr=False), 'uncached'), sonarr, False)]
+
+        futures: list[asyncio.Future] = []
+        processed_files = set()
+
+        for path, arr, isRadarr in paths:
+            for root, dirs, _ in os.walk(path):
+                if not dirs and os.path.exists(root):
+                    if not os.listdir(root):
+                        os.removedirs(root)
+                        continue
+                    print(os.listdir(root))
+                    files = (TorrentFileInfo(filename, isRadarr, os.path.join(root, filename)) for filename in os.listdir(root))
+                    files = [file for file in files if file.torrentInfo.isTorrentOrMagnet]
+                    for file in files:
+                        if file.fileInfo.filename not in processed_files:
+                            shutil.copy(file.fileInfo.filePath, file.fileInfo.filePathProcessing)
+                            processed_files.add(file.fileInfo.filename)
+                            futures.append(asyncio.gather(processFile(file, arr, isRadarr, lock=lock)))  # create_task
+                        else:
+                            os.remove(file.fileInfo.filePath)
+
+        await asyncio.gather(*futures)
+    except:
+        e = traceback.format_exc()
+
+        print(f"Error processing uncached")
+        print(e)
+
+        discordError(f"Error processing uncached", e)
+    print("Finished processing uncached")
+
 
 if __name__ == "__main__":
     start(isRadarr=sys.argv[1] == 'radarr')
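# --- note: resumeUncached() walks the tree that fail() builds, so the expected
# on-disk layout looks roughly like this (names illustrative):
#
#   <watch>/Movies/uncached/<Title>/<file>.magnet                        movies
#   <watch>/TV Shows/uncached/<Title>/<episode>/<file>.magnet            single episode
#   <watch>/TV Shows/uncached/<Title>/seasonpack/<seasons>/<file>.magnet
#
# On startup each leaf file is copied back into processing/ and re-fed to
# processFile(), deduplicated by filename via the processed_files set.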
diff --git a/blackhole_downloader.py b/blackhole_downloader.py
new file mode 100644
index 0000000..405e730
--- /dev/null
+++ b/blackhole_downloader.py
@@ -0,0 +1,262 @@
+import asyncio
+import os
+import glob
+from shared.discord import discordError, discordUpdate, discordStatusUpdate
+from shared.shared import blackhole
+import re
+
+webhook = None
+
+async def downloader(torrent, file, arr, torrentFile, shared_dict, lock):
+    from blackhole import refreshArr
+    availableHost = torrent.getAvailableHost()
+    activeTorrents = torrent.getActiveTorrents()
+    global webhook
+
+    while True:
+        if activeTorrents['limit'] - activeTorrents['nb'] - 2 > 0:
+            allTorrents = torrent.getAllTorrents()
+            torrentExists = False
+            for at in allTorrents:
+                if at["hash"] == torrent.getHash().lower():
+                    torrent.id = at["id"]
+                    torrentName = at["filename"]
+                    if at["status"] == "downloaded":
+                        print("File already exists")
+                    elif at["status"] == "downloading":
+                        print("File downloading")
+                    torrentExists = True
+                    lock.acquire()
+                    if torrentName in shared_dict:
+                        lock.release()
+                        remove_file(torrentFile, lock, shared_dict, "", current=True)
+                        return
+                    lock.release()
+                    break
+            if not torrentExists:
+                while True:
+                    folder_path = os.path.dirname(torrentFile)
+                    all_files = glob.glob(os.path.join(folder_path, '*'))
+                    all_files.sort(key=os.path.getctime)
+                    top_4_files = all_files[:4]
+                    if torrentFile in top_4_files:
+                        torrent.getInstantAvailability()
+                        if torrent.addTorrent(availableHost) is None:
+                            remove_file(torrentFile, lock, shared_dict, "", current=True)
+                            return
+                        info = torrent.getInfo(refresh=True)
+                        torrentName = info['filename']
+                        lock.acquire()
+                        try:
+                            if shared_dict:
+                                shared_dict.update({torrentName: "added"})
+                                webhook = discordStatusUpdate(shared_dict, webhook, edit=True)
+                            else:
+                                shared_dict.update({torrentName: "added"})
+                                webhook = discordStatusUpdate(shared_dict, webhook)
+                        finally:
+                            lock.release()
+                        break
+                    if not os.path.exists(torrentFile):
+                        remove_file(torrentFile, lock, shared_dict, "", current=True)
+                        return
+                    await asyncio.sleep(60)
+            break
+        await asyncio.sleep(60)
+
+    count = 0
+    progress = 0
+    waitForProgress = 0
+    while True:
+        count += 1
+        info = torrent.getInfo(refresh=True)
+        if "status" not in info:
+            remove_file(torrentFile, lock, shared_dict, torrentName, current=True)
+            return
+        status = info['status']
+        if torrentName != info['filename']:
+            lock.acquire()
+            try:
+                if torrentName in shared_dict:
+                    del shared_dict[torrentName]
+                if shared_dict and webhook and webhook.id:
+                    webhook = discordStatusUpdate(shared_dict, webhook, edit=True)
+                elif not shared_dict and webhook and webhook.id:
+                    webhook = discordStatusUpdate(shared_dict, webhook, delete=True)
+            finally:
+                lock.release()
+            torrentName = info['filename']
+
+        print('status:', status)
+        if not os.path.exists(torrentFile):
+            torrent.delete()
+            break
+        if status == 'waiting_files_selection':
+            if not torrent.selectFiles(uncached=True):
+                torrent.delete()
+                break
+        elif status == 'magnet_conversion' or status == 'queued' or status == 'compressing' or status == 'uploading':
+            await asyncio.sleep(1)
+        elif status == 'downloading':
+            if progress != info['progress']:
+                progress = info['progress']
+                waitForProgress = 0
+            elif waitForProgress >= blackhole['waitForProgressChange']:
+                torrent.delete()
+                remove_file(torrentFile, lock, shared_dict, torrentName, current=True)
+                return
+            else:
+                waitForProgress += 1
+            print(progress)
+            lock.acquire()
+            try:
+                if shared_dict:
+                    shared_dict.update({torrentName: f"Downloading {progress}%"})
+                    webhook = discordStatusUpdate(shared_dict, webhook, edit=True)
+                else:
+                    shared_dict.update({torrentName: f"Downloading {progress}%"})
+                    webhook = discordStatusUpdate(shared_dict, webhook)
+            finally:
+                lock.release()
+            if torrent.incompatibleHashSize and torrent.failIfNotCached:
+                print("Non-cached incompatible hash sized torrent")
+                torrent.delete()
+                break
+            await asyncio.sleep(60)
{progress}%"}) + webhook = discordStatusUpdate(shared_dict, webhook) + finally: + lock.release() + if torrent.incompatibleHashSize and torrent.failIfNotCached: + print("Non-cached incompatible hash sized torrent") + torrent.delete() + break + await asyncio.sleep(60) + elif status == 'magnet_error' or status == 'error' or status == 'dead' or status == 'virus': + discordError(f"Error: {file.fileInfo.filenameWithoutExt}", info) + torrent.delete() + break + elif status == 'downloaded': + existsCount = 0 + print('Waiting for folders to refresh...') + + filename = info.get('filename') + originalFilename = info.get('original_filename') + + folderPathMountFilenameTorrent = os.path.join(blackhole['rdMountTorrentsPath'], filename) + folderPathMountOriginalFilenameTorrent = os.path.join(blackhole['rdMountTorrentsPath'], originalFilename) + folderPathMountOriginalFilenameWithoutExtTorrent = os.path.join(blackhole['rdMountTorrentsPath'], os.path.splitext(originalFilename)[0]) + + while True: + existsCount += 1 + + if os.path.exists(folderPathMountFilenameTorrent) and os.listdir(folderPathMountFilenameTorrent): + folderPathMountTorrent = folderPathMountFilenameTorrent + elif os.path.exists(folderPathMountOriginalFilenameTorrent) and os.listdir(folderPathMountOriginalFilenameTorrent): + folderPathMountTorrent = folderPathMountOriginalFilenameTorrent + elif (originalFilename.endswith(('.mkv', '.mp4')) and + os.path.exists(folderPathMountOriginalFilenameWithoutExtTorrent) and os.listdir(folderPathMountOriginalFilenameWithoutExtTorrent)): + folderPathMountTorrent = folderPathMountOriginalFilenameWithoutExtTorrent + else: + folderPathMountTorrent = None + + if folderPathMountTorrent: + multiSeasonRegex1 = r'(?<=[\W_][Ss]eason[\W_])[\d][\W_][\d]{1,2}(?=[\W_])' + multiSeasonRegex2 = r'(?<=[\W_][Ss])[\d]{2}[\W_][Ss]?[\d]{2}(?=[\W_])' + multiSeasonRegexCombined = f'{multiSeasonRegex1}|{multiSeasonRegex2}' + + multiSeasonMatch = re.search(multiSeasonRegexCombined, file.fileInfo.filenameWithoutExt) + + for root, dirs, files in os.walk(folderPathMountTorrent): + relRoot = os.path.relpath(root, folderPathMountTorrent) + for filename in files: + # Check if the file is accessible + # if not await is_accessible(os.path.join(root, filename)): + # print(f"Timeout reached when accessing file: {filename}") + # discordError(f"Timeout reached when accessing file", filename) + # Uncomment the following line to fail the entire torrent if the timeout on any of its files are reached + # fail(torrent) + # return + + if multiSeasonMatch: + seasonMatch = re.search(r'S([\d]{2})E[\d]{2}', filename) + + if seasonMatch: + season = seasonMatch.group(1) + seasonShort = season[1:] if season[0] == '0' else season + + seasonFolderPathCompleted = re.sub(multiSeasonRegex1, seasonShort, file.fileInfo.folderPathCompleted) + seasonFolderPathCompleted = re.sub(multiSeasonRegex2, season, seasonFolderPathCompleted) + + os.makedirs(os.path.join(seasonFolderPathCompleted, relRoot), exist_ok=True) + os.symlink(os.path.join(root, filename), os.path.join(seasonFolderPathCompleted, relRoot, filename)) + print('Season Recursive:', f"{os.path.join(seasonFolderPathCompleted, relRoot, filename)} -> {os.path.join(root, filename)}") + continue + + + os.makedirs(os.path.join(file.fileInfo.folderPathCompleted, relRoot), exist_ok=True) + os.symlink(os.path.join(root, filename), os.path.join(file.fileInfo.folderPathCompleted, relRoot, filename)) + print('Recursive:', f"{os.path.join(file.fileInfo.folderPathCompleted, relRoot, filename)} -> {os.path.join(root, 
filename)}") + + print('Refreshed') + discordUpdate(f"Sucessfully processed {file.fileInfo.filenameWithoutExt}", f"Now available for immediate consumption! existsCount: {existsCount}") + + await refreshArr(arr) + break + + if existsCount >= blackhole['rdMountRefreshSeconds'] + 1: + print(f"Torrent folder not found in filesystem: {file.fileInfo.filenameWithoutExt}") + discordError("Torrent folder not found in filesystem", file.fileInfo.filenameWithoutExt) + return False + + await asyncio.sleep(1) + break + + if torrent.failIfNotCached: + if count == 21 and status != "downloading": + print('infoCount > 20') + discordError(f"{file.fileInfo.filenameWithoutExt} info attempt count > 20", status) + elif count == blackhole['waitForTorrentTimeout']: + print(f"infoCount == {blackhole['waitForTorrentTimeout']} - Failing") + torrent.delete() + break + remove_file(torrentFile, lock, shared_dict, torrentName) + +def remove_file(torrentFile, lock, shared_dict, torrentName, current=False): + if os.path.exists(torrentFile): + folder_path = os.path.dirname(torrentFile) + all_files = glob.glob(os.path.join(folder_path, '*')) + all_files.sort(key=os.path.getctime) + try: + file_index = all_files.index(torrentFile) + except ValueError: + print("The file does not exist in the folder.") + file_index = -1 + + if file_index != -1: + if current: + files_to_remove = [all_files[file_index]] + else: + files_to_remove = all_files[file_index:] + for file in files_to_remove: + try: + os.remove(file) + print(f"Removed: {file}") + except Exception as e: + print(f"Error removing {file}: {e}") + + remaining_files = os.listdir(folder_path) + if not remaining_files: + try: + os.rmdir(folder_path) + print(f"Removed folder: {folder_path}") + except Exception as e: + print(f"Error removing folder {folder_path}: {e}") + else: + print(f"Folder is not empty: {folder_path}") + else: + print("The specified file is not in the folder.") + else: + print("The file does not exist.") + + lock.acquire() + try: + global webhook + if torrentName in shared_dict: + del shared_dict[torrentName] + if shared_dict and webhook and webhook.id: + webhook = discordStatusUpdate(shared_dict, webhook, edit=True) + elif not shared_dict and webhook and webhook.id: + webhook = discordStatusUpdate(shared_dict, webhook, delete=True) + finally: + lock.release() \ No newline at end of file diff --git a/blackhole_watcher.py b/blackhole_watcher.py index 0a9ad35..e807e86 100644 --- a/blackhole_watcher.py +++ b/blackhole_watcher.py @@ -1,28 +1,36 @@ +import asyncio from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler -from blackhole import start, getPath +from blackhole import start, resumeUncached, getPath +import threading class BlackholeHandler(FileSystemEventHandler): - def __init__(self, is_radarr): + def __init__(self, is_radarr, lock): super().__init__() self.is_processing = False self.is_radarr = is_radarr self.path_name = getPath(is_radarr, create=True) + self.lock = lock def on_created(self, event): if not self.is_processing and not event.is_directory and event.src_path.lower().endswith((".torrent", ".magnet")): self.is_processing = True try: - start(self.is_radarr) + start(self.is_radarr, self.lock) finally: self.is_processing = False +async def scheduleResumeUncached(lock): + await resumeUncached(lock) + + if __name__ == "__main__": print("Watching blackhole") + lock = threading.Lock() - radarr_handler = BlackholeHandler(is_radarr=True) - sonarr_handler = BlackholeHandler(is_radarr=False) + radarr_handler = 
diff --git a/blackhole_watcher.py b/blackhole_watcher.py
index 0a9ad35..e807e86 100644
--- a/blackhole_watcher.py
+++ b/blackhole_watcher.py
@@ -1,28 +1,36 @@
+import asyncio
 from watchdog.observers import Observer
 from watchdog.events import FileSystemEventHandler
-from blackhole import start, getPath
+from blackhole import start, resumeUncached, getPath
+import threading
 
 class BlackholeHandler(FileSystemEventHandler):
-    def __init__(self, is_radarr):
+    def __init__(self, is_radarr, lock):
         super().__init__()
         self.is_processing = False
         self.is_radarr = is_radarr
         self.path_name = getPath(is_radarr, create=True)
+        self.lock = lock
 
     def on_created(self, event):
         if not self.is_processing and not event.is_directory and event.src_path.lower().endswith((".torrent", ".magnet")):
             self.is_processing = True
             try:
-                start(self.is_radarr)
+                start(self.is_radarr, self.lock)
             finally:
                 self.is_processing = False
 
+async def scheduleResumeUncached(lock):
+    await resumeUncached(lock)
+
+
 if __name__ == "__main__":
     print("Watching blackhole")
+    lock = threading.Lock()
 
-    radarr_handler = BlackholeHandler(is_radarr=True)
-    sonarr_handler = BlackholeHandler(is_radarr=False)
+    radarr_handler = BlackholeHandler(is_radarr=True, lock=lock)
+    sonarr_handler = BlackholeHandler(is_radarr=False, lock=lock)
 
     radarr_observer = Observer()
     radarr_observer.schedule(radarr_handler, radarr_handler.path_name)
@@ -33,6 +41,7 @@ def on_created(self, event):
     try:
         radarr_observer.start()
         sonarr_observer.start()
+        asyncio.run(scheduleResumeUncached(lock))
    except KeyboardInterrupt:
         radarr_observer.stop()
         sonarr_observer.stop()
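# --- note: Observer.start() spawns background watcher threads, so the
# asyncio.run(...) call added above can block the main thread on
# resumeUncached() without stalling new-file handling; both sides share the
# one threading.Lock. Rough timeline:
#
#   main thread                           watcher threads
#   -----------------------------------   ------------------------------------
#   observers started                     on_created -> start(is_radarr, lock)
#   asyncio.run(scheduleResumeUncached)     -> processFile(..., lock=lock)
#     -> processFile(..., lock=lock)
#   join() / KeyboardInterrupt handling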
diff --git a/repair.py b/repair.py
index 9350592..81dfd4c 100644
--- a/repair.py
+++ b/repair.py
@@ -1,120 +1,168 @@
 import os
 import argparse
 import time
+import traceback
+from shared.debrid import validateRealdebridMountTorrentsPath, validateTorboxMountTorrentsPath
 from shared.arr import Sonarr, Radarr
-from shared.discord import discordUpdate
-from shared.shared import repair, intersperse
+from shared.discord import discordUpdate, discordError
+from shared.shared import repair, realdebrid, torbox, intersperse, ensureTuple
+from datetime import datetime
 
-def parse_interval(interval_str):
+def parseInterval(intervalStr):
     """Parse a smart interval string (e.g., '1w2d3h4m5s') into seconds."""
-    if not interval_str:
+    if not intervalStr:
         return 0
-    total_seconds = 0
-    time_dict = {'w': 604800, 'd': 86400, 'h': 3600, 'm': 60, 's': 1}
-    current_number = ''
-    for char in interval_str:
+    totalSeconds = 0
+    timeDict = {'w': 604800, 'd': 86400, 'h': 3600, 'm': 60, 's': 1}
+    currentNumber = ''
+    for char in intervalStr:
         if char.isdigit():
-            current_number += char
-        elif char in time_dict and current_number:
-            total_seconds += int(current_number) * time_dict[char]
-            current_number = ''
-    return total_seconds
-
+            currentNumber += char
+        elif char in timeDict and currentNumber:
+            totalSeconds += int(currentNumber) * timeDict[char]
+            currentNumber = ''
+    return totalSeconds
 
 # Parse arguments for dry run, no confirm options, and optional intervals
-parser = argparse.ArgumentParser(description='Repair broken symlinks and manage media files.')
+parser = argparse.ArgumentParser(description='Repair broken symlinks or missing files.')
 parser.add_argument('--dry-run', action='store_true', help='Perform a dry run without making any changes.')
 parser.add_argument('--no-confirm', action='store_true', help='Execute without confirmation prompts.')
 parser.add_argument('--repair-interval', type=str, default=repair['repairInterval'], help='Optional interval in smart format (e.g. 1h2m3s) to wait between repairing each media file.')
 parser.add_argument('--run-interval', type=str, default=repair['runInterval'], help='Optional interval in smart format (e.g. 1w2d3h4m5s) to run the repair process.')
+parser.add_argument('--mode', type=str, choices=['symlink', 'file'], default='symlink', help='Choose repair mode: `symlink` or `file`. `symlink` to repair broken symlinks and `file` to repair missing files.')
+parser.add_argument('--season-packs', action='store_true', help='Upgrade to season-packs when a non-season-pack is found. Only applicable in symlink mode.')
+parser.add_argument('--include-unmonitored', action='store_true', help='Include unmonitored media in the repair process')
 args = parser.parse_args()
 
+_print = print
+
+def print(*values: object):
+    _print(f"[{datetime.now()}] [{args.mode}]", *values)
+
 if not args.repair_interval and not args.run_interval:
     print("Running repair once")
 else:
     print(f"Running repair{' once every ' + args.run_interval if args.run_interval else ''}{', and waiting ' + args.repair_interval + ' between each repair.' if args.repair_interval else '.'}")
 
 try:
-    repair_interval_seconds = parse_interval(args.repair_interval)
+    repairIntervalSeconds = parseInterval(args.repair_interval)
 except Exception as e:
     print(f"Invalid interval format for repair interval: {args.repair_interval}")
     exit(1)
 
 try:
-    run_interval_seconds = parse_interval(args.run_interval)
+    runIntervalSeconds = parseInterval(args.run_interval)
 except Exception as e:
     print(f"Invalid interval format for run interval: {args.run_interval}")
     exit(1)
Skipping repair.") + return + + getItems = lambda media, childId: arr.getFiles(media=media, childId=childId) if args.mode == 'symlink' else arr.getHistory(media=media, childId=childId, includeGrandchildDetails=True) + childrenIds = media.childrenIds if args.include_unmonitored else media.monitoredChildrenIds + + for childId in childrenIds: + brokenItems = [] + childItems = list(getItems(media=media, childId=childId)) + + for item in childItems: + if args.mode == 'symlink': + fullPath = item.path + if os.path.islink(fullPath): + destinationPath = os.readlink(fullPath) + if ((realdebrid['enabled'] and destinationPath.startswith(realdebrid['mountTorrentsPath']) and not os.path.exists(destinationPath)) or + (torbox['enabled'] and destinationPath.startswith(torbox['mountTorrentsPath']) and not os.path.exists(os.path.realpath(fullPath)))): + brokenItems.append(os.path.realpath(fullPath)) + else: # file mode + if item.reason == 'MissingFromDisk' and item.parentId not in media.fullyAvailableChildrenIds: + brokenItems.append(item.sourceTitle) + + if brokenItems: print("Title:", media.title) print("Movie ID/Season Number:", childId) - print("Inconsistent folders:") - [print(parentFolder) for parentFolder in parentFolders] + print("Broken items:") + [print(item) for item in brokenItems] print() + if args.dry_run or args.no_confirm or input("Do you want to delete and re-grab? (y/n): ").lower() == 'y': + if not args.dry_run: + discordUpdate(f"[{args.mode}] Repairing {media.title}: {childId}") + if args.mode == 'symlink': + print("Deleting files:") + [print(item.path) for item in childItems] + results = arr.deleteFiles(childItems) + print("Re-monitoring") + media = arr.get(media.id) + media.setChildMonitored(childId, False) + arr.put(media) + media.setChildMonitored(childId, True) + arr.put(media) + print("Searching for new files") + results = arr.automaticSearch(media, childId) + print(results) + + if repairIntervalSeconds > 0: + time.sleep(repairIntervalSeconds) + else: + print("Skipping") + print() + elif args.mode == 'symlink': + realPaths = [os.path.realpath(item.path) for item in childItems] + parentFolders = set(os.path.dirname(path) for path in realPaths) + if childId in media.fullyAvailableChildrenIds and len(parentFolders) > 1: + print("Title:", media.title) + print("Movie ID/Season Number:", childId) + print("Non-season-pack folders:") + [print(parentFolder) for parentFolder in parentFolders] + print() + if args.season_packs: + print("Searching for season-pack") + results = arr.automaticSearch(media, childId) + print(results) + + if repairIntervalSeconds > 0: + time.sleep(repairIntervalSeconds) + + except Exception: + e = traceback.format_exc() + + print(f"An error occurred while processing {media.title}: {e}") + discordError(f"[{args.mode}] An error occurred while processing {media.title}", e) -if run_interval_seconds > 0: + print("Repair complete") + discordUpdate(f"[{args.mode}] Repair complete") + +def unsafe(): + return (args.mode == 'symlink' and + ((realdebrid['enabled'] and not ensureTuple(validateRealdebridMountTorrentsPath())[0]) or + (torbox['enabled'] and not ensureTuple(validateTorboxMountTorrentsPath())[0]))) + +if runIntervalSeconds > 0: while True: - main() - time.sleep(run_interval_seconds) + try: + main() + time.sleep(runIntervalSeconds) + except Exception: + e = traceback.format_exc() + + print(f"An error occurred in the main loop: {e}") + discordError(f"[{args.mode}] An error occurred in the main loop", e) + time.sleep(runIntervalSeconds) # Still wait before retrying else: 
diff --git a/requirements.txt b/requirements.txt
index 1b5b86f..b9d4797 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,6 +4,7 @@ requests==2.28.1 #all
 bencode3==0.1.0 #blackhole
 watchdog==4.0.0 #blackhole
+rank-torrent-name #blackhole
 flask==3.0.2 #plex_request
 Flask-Caching==2.1.0 #plex_request
diff --git a/shared/arr.py b/shared/arr.py
index b17b9cf..d72048f 100644
--- a/shared/arr.py
+++ b/shared/arr.py
@@ -40,6 +40,7 @@ def validateRadarrApiKey():
             return False
     return True
 
+
 requiredEnvs = {
     'Sonarr host': (sonarr['host'], validateSonarrHost),
     'Sonarr API key': (sonarr['apiKey'], validateSonarrApiKey, True),
@@ -67,6 +68,10 @@ def id(self):
     def title(self):
         return self.json['title']
 
+    @property
+    def hasFile(self):
+        return self.json.get('hasFile', False)
+
     @property
     def path(self):
         return self.json['path']
@@ -132,6 +137,22 @@ def setChildMonitored(self, childId: int, monitored: bool):
                 season['monitored'] = monitored
                 break
 
+class Episode(Media):
+    @property
+    def size(self):
+        return self.json['sizeOnDisk']
+
+    @property
+    def monitoredChildrenIds(self):
+        return [self.id] if self.json['monitored'] else []
+
+    @property
+    def fullyAvailableChildrenIds(self):
+        return [self.id] if self.json['hasFile'] else []
+
+    def setChildMonitored(self, childId: int, monitored: bool):
+        self.json["monitored"] = monitored
+
 class MediaFile(ABC):
     def __init__(self, json) -> None:
         super().__init__()
@@ -169,19 +190,25 @@ def parentId(self):
         return self.json['movieId']
 
 class Arr(ABC):
-    def __init__(self, host: str, apiKey: str, endpoint: str, fileEndpoint: str, childIdName: str, childName: str, constructor: Type[Media], fileConstructor: Type[MediaFile]) -> None:
+    def __init__(self, host: str, apiKey: str, endpoint: str, fileEndpoint: str, childIdName: str, childName: str, grandchildEndpoint: str, constructor: Type[Media], grandchildConstructor: Type[Media], fileConstructor: Type[MediaFile]) -> None:
         self.host = host
         self.apiKey = apiKey
         self.endpoint = endpoint
         self.fileEndpoint = fileEndpoint
         self.childIdName = childIdName
         self.childName = childName
+        self.grandchildEndpoint = grandchildEndpoint
         self.constructor = constructor
+        self.grandchildConstructor = grandchildConstructor
         self.fileConstructor = fileConstructor
 
     def get(self, id: int):
         get = requests.get(f"{self.host}/api/v3/{self.endpoint}/{id}?apiKey={self.apiKey}")
         return self.constructor(get.json())
+
+    def getGrandchild(self, id: int):
+        get = requests.get(f"{self.host}/api/v3/{self.grandchildEndpoint}/{id}?apiKey={self.apiKey}")
+        return self.grandchildConstructor(get.json())
 
     def getAll(self):
         get = requests.get(f"{self.host}/api/v3/{self.endpoint}?apiKey={self.apiKey}")
@@ -208,6 +235,12 @@ def getHistory(self, pageSize: int):
 
     def failHistoryItem(self, historyId: int):
         failRequest = requests.post(f"{self.host}/api/v3/history/failed/{historyId}?apiKey={self.apiKey}")
+
+    def removeFailedItem(self, itemId: int):
+        removeRequest = requests.delete(f"{self.host}/api/v3/blocklist/{itemId}?apiKey={self.apiKey}")
+
+    def clearBlocklist(self):
+        commandRequest = requests.post(f"{self.host}/api/v3/command?apiKey={self.apiKey}", json={'name': 'ClearBlocklist'}, headers={'Content-Type': 'application/json'})
 
     def refreshMonitoredDownloads(self):
         commandRequest = requests.post(f"{self.host}/api/v3/command?apiKey={self.apiKey}", json={'name': 'RefreshMonitoredDownloads'}, headers={'Content-Type': 'application/json'})
@@ -225,6 +258,7 @@ def automaticSearch(self, media: Media, childId: int):
     def _automaticSearchJson(self, media: Media, childId: int):
         pass
 
+
 class Sonarr(Arr):
     host = sonarr['host']
     apiKey = sonarr['apiKey']
@@ -232,9 +266,10 @@ class Sonarr(Arr):
     fileEndpoint = 'episodefile'
     childIdName = 'seasonNumber'
     childName = 'Season'
+    grandchildEndpoint = 'episode'
 
     def __init__(self) -> None:
-        super().__init__(Sonarr.host, Sonarr.apiKey, Sonarr.endpoint, Sonarr.fileEndpoint, Sonarr.childIdName, Sonarr.childName, Show, EpisodeFile)
+        super().__init__(Sonarr.host, Sonarr.apiKey, Sonarr.endpoint, Sonarr.fileEndpoint, Sonarr.childIdName, Sonarr.childName, Sonarr.grandchildEndpoint, Show, Episode, EpisodeFile)
 
     def _automaticSearchJson(self, media: Media, childId: int):
         return {"name": f"{self.childName}Search", f"{self.endpoint}Id": media.id, self.childIdName: childId}
@@ -246,9 +281,11 @@ class Radarr(Arr):
     fileEndpoint = 'moviefile'
     childIdName = None
     childName = 'Movies'
+    grandchildEndpoint = endpoint
 
     def __init__(self) -> None:
-        super().__init__(Radarr.host, Radarr.apiKey, Radarr.endpoint, Radarr.fileEndpoint, None, Radarr.childName, Movie, MovieFile)
+        super().__init__(Radarr.host, Radarr.apiKey, Radarr.endpoint, Radarr.fileEndpoint, Radarr.childIdName, Radarr.childName, Radarr.grandchildEndpoint, Movie, Movie, MovieFile)
 
     def _automaticSearchJson(self, media: Media, childId: int):
         return {"name": f"{self.childName}Search", f"{self.endpoint}Ids": [media.id]}
+
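# --- note: the grandchild plumbing above gives both Arrs a uniform
# smallest-unit accessor: Sonarr's grandchild is an episode, while Radarr
# reuses its movie endpoint so the grandchild is the movie itself. A usage
# sketch (IDs illustrative):
sonarr = Sonarr()
episode = sonarr.getGrandchild(123)   # GET /api/v3/episode/123 -> Episode
print(episode.hasFile, episode.monitoredChildrenIds)

radarr = Radarr()
movie = radarr.getGrandchild(456)     # GET /api/v3/movie/456 -> Movie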
diff --git a/shared/debrid.py b/shared/debrid.py
new file mode 100644
index 0000000..d095abb
--- /dev/null
+++ b/shared/debrid.py
@@ -0,0 +1,499 @@
+import asyncio
+import os
+import re
+import hashlib
+import requests
+from abc import ABC, abstractmethod
+from urllib.parse import urljoin
+from datetime import datetime
+from shared.discord import discordUpdate
+from shared.requests import retryRequest
+from shared.shared import realdebrid, torbox, mediaExtensions, checkRequiredEnvs
+
+def validateDebridEnabled():
+    if not realdebrid['enabled'] and not torbox['enabled']:
+        return False, "At least one of RealDebrid or Torbox must be enabled."
+    return True
+
+def validateRealdebridHost():
+    url = urljoin(realdebrid['host'], "time")
+    try:
+        response = requests.get(url)
+        return response.status_code == 200
+    except Exception as e:
+        return False
+
+def validateRealdebridApiKey():
+    url = urljoin(realdebrid['host'], "user")
+    headers = {'Authorization': f'Bearer {realdebrid["apiKey"]}'}
+    try:
+        response = requests.get(url, headers=headers)
+
+        if response.status_code == 401:
+            return False, "Invalid or expired API key."
+        elif response.status_code == 403:
+            return False, "Permission denied, account locked."
+    except Exception as e:
+        return False
+
+    return True
+
+def validateRealdebridMountTorrentsPath():
+    path = realdebrid['mountTorrentsPath']
+    if os.path.exists(path) and any(os.path.isdir(os.path.join(path, child)) for child in os.listdir(path)):
+        return True
+    else:
+        return False, "Path does not exist or has no children."
+
+def validateTorboxHost():
+    url = urljoin(torbox['host'], "stats")
+    try:
+        response = requests.get(url)
+        return response.status_code == 200
+    except Exception as e:
+        return False
+
+def validateTorboxApiKey():
+    url = urljoin(torbox['host'], "user/me")
+    headers = {'Authorization': f'Bearer {torbox["apiKey"]}'}
+    try:
+        response = requests.get(url, headers=headers)
+
+        if response.status_code == 401:
+            return False, "Invalid or expired API key."
+        elif response.status_code == 403:
+            return False, "Permission denied, account locked."
+    except Exception as e:
+        return False
+
+    return True
+
+def validateTorboxMountTorrentsPath():
+    path = torbox['mountTorrentsPath']
+    if os.path.exists(path) and any(os.path.isdir(os.path.join(path, child)) for child in os.listdir(path)):
+        return True
+    else:
+        return False, "Path does not exist or has no children."
+
+requiredEnvs = {
+    'RealDebrid/TorBox enabled': (True, validateDebridEnabled),
+}
+
+if realdebrid['enabled']:
+    requiredEnvs.update({
+        'RealDebrid host': (realdebrid['host'], validateRealdebridHost),
+        'RealDebrid API key': (realdebrid['apiKey'], validateRealdebridApiKey, True),
+        'RealDebrid mount torrents path': (realdebrid['mountTorrentsPath'], validateRealdebridMountTorrentsPath)
+    })
+
+if torbox['enabled']:
+    requiredEnvs.update({
+        'Torbox host': (torbox['host'], validateTorboxHost),
+        'Torbox API key': (torbox['apiKey'], validateTorboxApiKey, True),
+        'Torbox mount torrents path': (torbox['mountTorrentsPath'], validateTorboxMountTorrentsPath)
+    })
+
+checkRequiredEnvs(requiredEnvs)
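# --- note: the validators above return either True or a (False, reason) tuple,
# and repair.py reads them through ensureTuple(...)[0]. ensureTuple lives in
# shared/shared.py, which this diff does not show, so this is an assumed shape:
def ensureTuple(result):
    # Wrap a bare bool so callers can always index [0] for the pass/fail flag
    # and, when present, [1] for a human-readable reason.
    return result if isinstance(result, tuple) else (result,)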
+
+class TorrentBase(ABC):
+    STATUS_WAITING_FILES_SELECTION = 'waiting_files_selection'
+    STATUS_DOWNLOADING = 'downloading'
+    STATUS_COMPLETED = 'completed'
+    STATUS_ERROR = 'error'
+
+    def __init__(self, f, fileData, file, failIfNotCached, onlyLargestFile) -> None:
+        super().__init__()
+        self.f = f
+        self.fileData = fileData
+        self.file = file
+        self.failIfNotCached = failIfNotCached
+        self.onlyLargestFile = onlyLargestFile
+        self.skipAvailabilityCheck = False
+        self.id = None
+        self._info = None
+        self._hash = None
+        self._instantAvailability = None
+
+    def print(self, *values: object):
+        print(f"[{datetime.now()}] [{self.__class__.__name__}] [{self.file.fileInfo.filenameWithoutExt}]", *values)
+
+    @abstractmethod
+    def submitTorrent(self):
+        pass
+
+    @abstractmethod
+    def getHash(self):
+        pass
+
+    @abstractmethod
+    def addTorrent(self):
+        pass
+
+    @abstractmethod
+    async def getInfo(self, refresh=False):
+        pass
+
+    @abstractmethod
+    async def selectFiles(self):
+        pass
+
+    @abstractmethod
+    def delete(self):
+        pass
+
+    @abstractmethod
+    async def getTorrentPath(self):
+        pass
+
+    @abstractmethod
+    def _addTorrentFile(self):
+        pass
+
+    @abstractmethod
+    def _addMagnetFile(self):
+        pass
+
+    def _enforceId(self):
+        if not self.id:
+            raise Exception("Id is required. Must be acquired via successfully running submitTorrent() first.")
+
+class RealDebrid(TorrentBase):
+    def __init__(self, f, fileData, file, failIfNotCached, onlyLargestFile) -> None:
+        super().__init__(f, fileData, file, failIfNotCached, onlyLargestFile)
+        self.headers = {'Authorization': f'Bearer {realdebrid["apiKey"]}'}
+        self.mountTorrentsPath = realdebrid["mountTorrentsPath"]
+
+    def submitTorrent(self):
+        if self.failIfNotCached:
+            instantAvailability = self._getInstantAvailability()
+            self.print('instantAvailability:', not not instantAvailability)
+            if not instantAvailability:
+                return False
+
+        return not not self.addTorrent()
+
+    def _getInstantAvailability(self, refresh=False):
+        torrentHash = self.getHash()
+        self.print('hash:', torrentHash)
+        self.skipAvailabilityCheck = True
+
+        return True
+
+    def _getAvailableHost(self):
+        availableHostsRequest = retryRequest(
+            lambda: requests.get(urljoin(realdebrid['host'], "torrents/availableHosts"), headers=self.headers),
+            print=self.print
+        )
+        if availableHostsRequest is None:
+            return None
+
+        availableHosts = availableHostsRequest.json()
+        return availableHosts[0]['host']
+
+    async def getInfo(self, refresh=False):
+        self._enforceId()
+
+        if refresh or not self._info:
+            infoRequest = retryRequest(
+                lambda: requests.get(urljoin(realdebrid['host'], f"torrents/info/{self.id}"), headers=self.headers),
+                print=self.print
+            )
+            if infoRequest is None:
+                self._info = None
+            else:
+                info = infoRequest.json()
+                info['status'] = self._normalize_status(info['status'])
+                self._info = info
+
+        return self._info
+
+    async def selectFiles(self):
+        self._enforceId()
+
+        info = await self.getInfo()
+        if info is None:
+            return False
+
+        self.print('files:', info['files'])
+        mediaFiles = [file for file in info['files'] if os.path.splitext(file['path'])[1].lower() in mediaExtensions]
+
+        if not mediaFiles:
+            self.print('no media files found')
+            return False
+
+        mediaFileIds = {str(file['id']) for file in mediaFiles}
+        self.print('required fileIds:', mediaFileIds)
+
+        largestMediaFile = max(mediaFiles, key=lambda file: file['bytes'])
+        largestMediaFileId = str(largestMediaFile['id'])
+        self.print('only largest file:', self.onlyLargestFile)
+        self.print('largest file:', largestMediaFile)
+
+        if self.onlyLargestFile and len(mediaFiles) > 1:
+            discordUpdate('largest file:', largestMediaFile['path'])
+
+        files = {'files': [largestMediaFileId] if self.onlyLargestFile else ','.join(mediaFileIds)}
+        selectFilesRequest = retryRequest(
+            lambda: requests.post(urljoin(realdebrid['host'], f"torrents/selectFiles/{self.id}"), headers=self.headers, data=files),
+            print=self.print
+        )
+        if selectFilesRequest is None:
+            return False
+
+        return True
+
+    def delete(self):
+        self._enforceId()
+
+        deleteRequest = retryRequest(
+            lambda: requests.delete(urljoin(realdebrid['host'], f"torrents/delete/{self.id}"), headers=self.headers),
+            print=self.print
+        )
+        return not not deleteRequest
+
+    async def getTorrentPath(self):
+        filename = (await self.getInfo())['filename']
+        originalFilename = (await self.getInfo())['original_filename']
+
+        folderPathMountFilenameTorrent = os.path.join(self.mountTorrentsPath, filename)
+        folderPathMountOriginalFilenameTorrent = os.path.join(self.mountTorrentsPath, originalFilename)
+        folderPathMountOriginalFilenameWithoutExtTorrent = os.path.join(self.mountTorrentsPath, os.path.splitext(originalFilename)[0])
+
+        if os.path.exists(folderPathMountFilenameTorrent) and os.listdir(folderPathMountFilenameTorrent):
+            folderPathMountTorrent = folderPathMountFilenameTorrent
+        elif os.path.exists(folderPathMountOriginalFilenameTorrent) and os.listdir(folderPathMountOriginalFilenameTorrent):
+            folderPathMountTorrent = folderPathMountOriginalFilenameTorrent
+        elif (originalFilename.endswith(('.mkv', '.mp4')) and
+              os.path.exists(folderPathMountOriginalFilenameWithoutExtTorrent) and os.listdir(folderPathMountOriginalFilenameWithoutExtTorrent)):
+            folderPathMountTorrent = folderPathMountOriginalFilenameWithoutExtTorrent
+        else:
+            folderPathMountTorrent = None
+
+        return folderPathMountTorrent
+
+    def _addFile(self, request, endpoint, data):
+        host = self._getAvailableHost()
+        if host is None:
+            return None
+
+        request = retryRequest(
+            lambda: request(urljoin(realdebrid['host'], endpoint), params={'host': host}, headers=self.headers, data=data),
+            print=self.print
+        )
+        if request is None:
+            return None
+
+        response = request.json()
+        self.print('response info:', response)
+        self.id = response['id']
+
+        return self.id
+
+    def _addTorrentFile(self):
+        return self._addFile(requests.put, "torrents/addTorrent", self.f)
+
+    def _addMagnetFile(self):
+        return self._addFile(requests.post, "torrents/addMagnet", {'magnet': self.fileData})
+
+    def _normalize_status(self, status):
+        if status in ['waiting_files_selection']:
+            return self.STATUS_WAITING_FILES_SELECTION
+        elif status in ['magnet_conversion', 'queued', 'downloading', 'compressing', 'uploading']:
+            return self.STATUS_DOWNLOADING
+        elif status == 'downloaded':
+            return self.STATUS_COMPLETED
+        elif status in ['magnet_error', 'error', 'dead', 'virus']:
+            return self.STATUS_ERROR
+        return status
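# --- note: _normalize_status() collapses Real-Debrid's many states onto the
# four TorrentBase constants, so callers branch on one vocabulary regardless
# of provider. For example:
#
#   rd._normalize_status('magnet_conversion') == TorrentBase.STATUS_DOWNLOADING
#   rd._normalize_status('downloaded')        == TorrentBase.STATUS_COMPLETED
#   rd._normalize_status('dead')              == TorrentBase.STATUS_ERROR
#
# Unknown strings pass through unchanged, so new upstream states surface as-is.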
None + + currentTime = datetime.now() + if (currentTime - self.submittedTime).total_seconds() < 300: + if not self.lastInactiveCheck or (currentTime - self.lastInactiveCheck).total_seconds() > 5: + inactiveCheckUrl = f"https://relay.torbox.app/v1/inactivecheck/torrent/{self.authId}/{self.id}" + retryRequest( + lambda: requests.get(inactiveCheckUrl), + print=self.print + ) + self.lastInactiveCheck = currentTime + for _ in range(60): + infoRequest = retryRequest( + lambda: requests.get(urljoin(torbox['host'], "torrents/mylist"), headers=self.headers), + print=self.print + ) + if infoRequest is None: + return None + + torrents = infoRequest.json()['data'] + + for torrent in torrents: + if torrent['id'] == self.id: + torrent['status'] = self._normalize_status(torrent['download_state'], torrent['download_finished']) + self._info = torrent + return self._info + + await asyncio.sleep(1) + return self._info + + async def selectFiles(self): + pass + + def delete(self): + self._enforceId() + + deleteRequest = retryRequest( + lambda: requests.delete(urljoin(torbox['host'], "torrents/controltorrent"), headers=self.headers, data={'torrent_id': self.id, 'operation': "Delete"}), + print=self.print + ) + return not not deleteRequest + + async def getTorrentPath(self): + filename = (await self.getInfo())['files'][0]['name'].split("/")[0] + + folderPathMountFilenameTorrent = os.path.join(self.mountTorrentsPath, filename) + + if os.path.exists(folderPathMountFilenameTorrent) and os.listdir(folderPathMountFilenameTorrent): + folderPathMountTorrent = folderPathMountFilenameTorrent + else: + folderPathMountTorrent = None + + return folderPathMountTorrent + + def _addFile(self, data=None, files=None): + request = retryRequest( + lambda: requests.post(urljoin(torbox['host'], "torrents/createtorrent"), headers=self.headers, data=data, files=files), + print=self.print + ) + if request is None: + return None + + response = request.json() + self.print('response info:', response) + + if response.get('detail') == 'queued': + return None + + self.id = response['data']['torrent_id'] + + return self.id + + def _addTorrentFile(self): + nametorrent = self.f.name.split('/')[-1] + files = {'file': (nametorrent, self.f, 'application/x-bittorrent')} + return self._addFile(files=files) + + def _addMagnetFile(self): + return self._addFile(data={'magnet': self.fileData}) + + def _normalize_status(self, status, download_finished): + if download_finished: + return self.STATUS_COMPLETED + elif status in [ + 'completed', 'cached', 'paused', 'downloading', 'uploading', + 'checkingResumeData', 'metaDL', 'pausedUP', 'queuedUP', 'checkingUP', + 'forcedUP', 'allocating', 'downloading', 'metaDL', 'pausedDL', + 'queuedDL', 'checkingDL', 'forcedDL', 'checkingResumeData', 'moving' + ]: + return self.STATUS_DOWNLOADING + elif status in ['error', 'stalledUP', 'stalledDL', 'stalled (no seeds)', 'missingFiles', 'failed']: + return self.STATUS_ERROR + return status + +class Torrent(TorrentBase): + def getHash(self): + + if not self._hash: + import bencode3 + self._hash = hashlib.sha1(bencode3.bencode(bencode3.bdecode(self.fileData)['info'])).hexdigest() + + return self._hash + + def addTorrent(self): + return self._addTorrentFile() + +class Magnet(TorrentBase): + def getHash(self): + + if not self._hash: + # Consider changing when I'm more familiar with hashes + self._hash = re.search('xt=urn:btih:(.+?)(?:&|$)', self.fileData).group(1) + + return self._hash + + def addTorrent(self): + return self._addMagnetFile() + + +class 
RealDebridTorrent(RealDebrid, Torrent): + pass + +class RealDebridMagnet(RealDebrid, Magnet): + pass + +class TorboxTorrent(Torbox, Torrent): + pass + +class TorboxMagnet(Torbox, Magnet): + pass diff --git a/shared/discord.py b/shared/discord.py index 357e997..b27d14a 100644 --- a/shared/discord.py +++ b/shared/discord.py @@ -39,3 +39,44 @@ def discordUpdate(title, message=None): embeds=[embed] ) response = webhook.execute() + +def discordStatusUpdate(torrentDict, webhook=None, edit=False, delete=False): + if discord['updateEnabled']: + if webhook and webhook.id: + webhook.delete() + + webhook = DiscordWebhook( + url=discord['webhookUrl'], + rate_limit_retry=True, + username='Status Bot' + ) + + if delete: + embed = DiscordEmbed("Downloading Status", f"No Active Downloads", color=9807270) + webhook.add_embed(embed) + # response = webhook.edit() + webhook.__dict__["flags"] = 4096 + response = webhook.execute(remove_embeds=True) + return webhook + + if not edit: + embed = DiscordEmbed("Downloading Status", f"Current downloading - {len(torrentDict)}", color=16776960) + for filename,progress in torrentDict.items(): + embed.add_embed_field(name=filename, value=progress, inline=False) + + webhook.add_embed(embed) + # response = webhook.edit() + webhook.__dict__["flags"] = 4096 + response = webhook.execute(remove_embeds=True) + return webhook + else: + # webhook.remove_embeds() # Used for editing + embed = DiscordEmbed("Downloading Status", f"Current downloading - {len(torrentDict)}", color=16776960) + for filename,progress in torrentDict.items(): + embed.add_embed_field(name=filename, value=progress, inline=False) + + webhook.add_embed(embed) + # response = webhook.edit() # used for editing + webhook.__dict__["flags"] = 4096 + response = webhook.execute(remove_embeds=True) + return webhook \ No newline at end of file diff --git a/shared/requests.py b/shared/requests.py new file mode 100644 index 0000000..fe689a8 --- /dev/null +++ b/shared/requests.py @@ -0,0 +1,60 @@ +import time +import requests +from typing import Callable, Optional +from shared.discord import discordError, discordUpdate + + +def retryRequest( + requestFunc: Callable[[], requests.Response], + print: Callable[..., None] = print, + retries: int = 1, + delay: int = 1 +) -> Optional[requests.Response]: + """ + Retry a request if the response status code is not in the 200 range. + + :param requestFunc: A callable that returns an HTTP response. + :param print: Optional print function for logging. + :param retries: The number of times to retry the request after the initial attempt. + :param delay: The delay between retries in seconds. + :return: The response object or None if all attempts fail. 
+ """ + attempts = retries + 1 # Total attempts including the initial one + for attempt in range(attempts): + try: + response = requestFunc() + if 200 <= response.status_code < 300: + return response + else: + message = [ + f"URL: {response.url}", + f"Status code: {response.status_code}", + f"Message: {response.reason}", + f"Response: {response.content}", + f"Attempt {attempt + 1} failed" + ] + for line in message: + print(line) + if attempt == retries: + discordError("Request Failed", "\n".join(message)) + else: + update_message = message + [f"Retrying in {delay} seconds..."] + discordUpdate("Retrying Request", "\n".join(update_message)) + print(f"Retrying in {delay} seconds...") + time.sleep(delay) + except requests.RequestException as e: + message = [ + f"URL: {response.url if 'response' in locals() else 'unknown'}", + f"Attempt {attempt + 1} encountered an error: {e}" + ] + for line in message: + print(line) + if attempt == retries: + discordError("Request Exception", "\n".join(message)) + else: + update_message = message + [f"Retrying in {delay} seconds..."] + discordUpdate("Retrying Request", "\n".join(update_message)) + print(f"Retrying in {delay} seconds...") + time.sleep(delay) + + return None \ No newline at end of file diff --git a/shared/shared.py b/shared/shared.py index c97cdeb..c7b426b 100644 --- a/shared/shared.py +++ b/shared/shared.py @@ -28,6 +28,7 @@ def stringEnvParser(value): 'rdMountRefreshSeconds': env.int('BLACKHOLE_RD_MOUNT_REFRESH_SECONDS', default=None), 'waitForTorrentTimeout': env.int('BLACKHOLE_WAIT_FOR_TORRENT_TIMEOUT', default=None), 'historyPageSize': env.int('BLACKHOLE_HISTORY_PAGE_SIZE', default=None), + 'waitForProgressChange': env.int('BLACKHOLE_WAIT_FOR_PROGRESS_CHANGE', default=None), } server = {
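
Usage note: a minimal sketch of how the new retryRequest helper is meant to be called from the blackhole code. The endpoint, token, and retry values below are illustrative placeholders, not part of this diff:

    import requests
    from shared.requests import retryRequest

    # Hypothetical call: retry a flaky GET up to twice, waiting 5 seconds
    # between attempts; retryRequest returns None if every attempt fails.
    response = retryRequest(
        lambda: requests.get(
            'https://api.real-debrid.com/rest/1.0/user',       # placeholder endpoint
            headers={'Authorization': 'Bearer <api-token>'}    # placeholder token
        ),
        retries=2,
        delay=5
    )

    if response is not None:
        print(response.json())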