Source code for yarqueue.watch.common

import sys

from redis import Redis

from ..base_queue import BaseJoinableQueue
from ..queue import JoinableQueue, Queue

DEFAULT_HOST = "localhost"
DEFAULT_PORT = 8080

DEFAULT_RHOST = "localhost"
DEFAULT_DB = 0
DEFAULT_RPORT = 6379
DEFAULT_INTERVAL = 1

DEFAULT_REDIS = Redis(DEFAULT_RHOST, DEFAULT_RPORT, db=DEFAULT_DB)


[docs]class QueueWatcher: def __init__(self, name, redis: Redis = DEFAULT_REDIS): self.name = name if redis.exists(name + "__counter"): self._queue = JoinableQueue(name=name, redis=redis) else: self._queue = Queue(name=name, redis=redis) def __len__(self): return sum(self.queued_inprogress())
[docs] def queued_inprogress(self): if isinstance(self._queue, BaseJoinableQueue): in_prog = self._queue.n_in_progress() else: in_prog = 0 return len(self._queue), in_prog
[docs]def signal_handler(sig, frame): print("Detected interrupt, exiting", file=sys.stderr) sys.exit(0)