Skip to content

Commit 7d64ddf

Browse files
get_processing task
1 parent cee0231 commit 7d64ddf

2 files changed

Lines changed: 50 additions & 2 deletions

File tree

modelq/app/base.py

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -814,4 +814,52 @@ def remove_task_from_queue(self, task_id: str) -> bool:
814814
except Exception as e:
815815
logger.error(f"Failed to process task while trying to remove: {e}")
816816
return removed
817-
817+
818+
def get_processing_tasks(self) -> list[Dict[str, Any]]:
819+
"""
820+
Returns a list of task dicts that are currently in 'processing' state.
821+
It cross-checks the 'processing_tasks' Redis set with the per-task keys
822+
(task:{task_id}) and cleans up stale/mismatched entries.
823+
"""
824+
results: list[Dict[str, Any]] = []
825+
826+
# 1) Fetch all task IDs the workers marked as processing
827+
raw_ids = self.redis_client.smembers("processing_tasks")
828+
task_ids = [
829+
tid.decode("utf-8") if isinstance(tid, (bytes, bytearray)) else tid
830+
for tid in raw_ids
831+
]
832+
if not task_ids:
833+
return results
834+
835+
# 2) Batch fetch their task records with a pipeline to minimize RTT
836+
keys = [f"task:{tid}" for tid in task_ids]
837+
with self.redis_client.pipeline() as pipe:
838+
for k in keys:
839+
pipe.get(k)
840+
task_jsons = pipe.execute()
841+
842+
# 3) Validate & filter to status == 'processing'
843+
for tid, tjson in zip(task_ids, task_jsons):
844+
if not tjson:
845+
# No task record -> remove stale entry from processing set
846+
self.redis_client.srem("processing_tasks", tid)
847+
logger.warning(f"Stale processing entry removed (no record): {tid}")
848+
continue
849+
850+
try:
851+
task_dict = json.loads(tjson)
852+
except Exception as e:
853+
logger.error(f"Failed to parse task record for {tid}: {e}")
854+
# Conservatively remove from processing set if corrupted
855+
self.redis_client.srem("processing_tasks", tid)
856+
continue
857+
858+
status = task_dict.get("status")
859+
if status == "processing":
860+
results.append(task_dict)
861+
else:
862+
# If status drifted away from 'processing', tidy up the set
863+
self.redis_client.srem("processing_tasks", tid)
864+
865+
return results

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "modelq"
3-
version = "1.0.3"
3+
version = "1.0.4"
44
description = "Celery-like task queue for ML inference."
55
authors = ["Tanmaypatil123 <tanmay@modelslab.com>"]
66
readme = "README.md"

0 commit comments

Comments
 (0)