Source code for yarqueue.watch.http

import json
from itertools import zip_longest
from pathlib import Path
from typing import Optional, Dict

from redis import Redis

from .. import __version__

try:
    import click
except ImportError:
    raise ImportError(
        "click not importable; HTTP watcher not available. pip install click"
    )

try:
    import flask
    from flask import Flask
    from werkzeug.serving import run_simple
except ImportError:
    raise ImportError(
        "flask not importable; HTTP watcher not available. pip install flask"
    )

from .common import QueueWatcher, DEFAULT_INTERVAL


# Absolute path of the "templates" directory that sits next to this module;
# main() hands it to Flask as the application's template folder.
TEMPLATE_DIR = Path(__file__).resolve().parent / "templates"


class JsonQueueWatcher:
    """Wrap a queue watcher and report its progress as a plain dict.

    :param watcher: object providing ``queued_inprogress()`` and ``len()``.
    :param total: expected number of items; when falsy (``None`` or ``0``)
        ``len(watcher)`` at construction time is used instead.
    """

    def __init__(self, watcher: QueueWatcher, total=None):
        self.watcher = watcher
        # Only consult the watcher's length when no usable total was supplied.
        self.total = total if total else len(self.watcher)

    def status(self):
        """Return the current counts, ready for JSON serialisation.

        The result always has ``"queued"`` and ``"inProgress"`` keys;
        ``"total"`` is included only when the stored total is truthy.
        """
        queued, in_progress = self.watcher.queued_inprogress()
        report = {"queued": queued, "inProgress": in_progress}
        if not self.total:
            return report
        report["total"] = self.total
        return report
class MultiJson:
    """Hold one JsonQueueWatcher per named queue and report them together.

    :param redis: client object passed through to each ``QueueWatcher``.
    :param names_totals: mapping of queue name to its expected total
        (or ``None``).
    """

    def __init__(self, redis, names_totals: Dict[str, Optional[int]]):
        built = []
        for queue_name, expected in names_totals.items():
            built.append(JsonQueueWatcher(QueueWatcher(queue_name, redis), expected))
        self.json_watcher = built

    def __iter__(self):
        """Iterate over the wrapped JsonQueueWatcher objects in insertion order."""
        for json_watcher in self.json_watcher:
            yield json_watcher

    def status(self, names=None):
        """Return a dict of queue name -> status dict.

        :param names: optional iterable of queue names to restrict the result
            to; names without a watcher map to ``None``. When falsy, every
            watched queue is reported.
        """
        everything = {}
        for json_watcher in self:
            everything[json_watcher.watcher.name] = json_watcher.status()

        if not names:
            return everything

        selected = {}
        for wanted in names:
            selected[wanted] = everything.get(wanted)
        return selected
def main(host, port, redis, names_totals):
    """Serve queue progress over HTTP until the server is stopped.

    Two routes are registered on a Flask app named ``"yarqserve"``:

    * ``/`` renders ``index.html`` with the polling interval (milliseconds),
      the queue names and the current statuses as a JSON string.
    * ``/json`` returns the current statuses as a JSON response.

    :param host: hostname passed to ``run_simple``.
    :param port: port passed to ``run_simple``.
    :param redis: client object passed to ``MultiJson``.
    :param names_totals: mapping of queue name to expected total (or ``None``).
    """
    multi = MultiJson(redis, names_totals)
    server = Flask("yarqserve", template_folder=TEMPLATE_DIR)

    @server.route("/")
    def page():
        current = watchers_status()
        return flask.render_template(
            "index.html",
            interval=DEFAULT_INTERVAL * 1000,
            names=list(current),
            statusStr=json.dumps(current),
        )

    @server.route("/json")
    def data():
        current = watchers_status()
        return flask.jsonify(current)

    def watchers_status():
        # Single place where both routes obtain a fresh snapshot.
        return multi.status()

    run_simple(host, port, server)
@click.command(help="Watch the progress of a number of redis-backed queues, over HTTP.")
@click.version_option(version=__version__)
@click.help_option()
@click.option(
    "--name",
    "-n",
    multiple=True,
    help="Name of redis lists to watch (accepts multiple)",
)
@click.option(
    "--total",
    "-t",
    multiple=True,
    type=int,
    help="Total items added to the queue (accepts multiple, same order as --name)",
)
# @click.option(
#     "--interval", "-i", default=1, type=float, help="Polling interval (seconds)",
#     show_default=True,
# )
@click.option(
    "--host",
    default="localhost",
    help="Hostname at which to run server",
    show_default=True,
)
@click.option(
    "--port",
    default=8080,
    type=int,
    help="Port on which to run server",
    show_default=True,
)
@click.option(
    "--rhost",
    default="localhost",
    help="Hostname for the Redis instance",
    show_default=True,
)
@click.option(
    "--rport",
    default=6379,
    type=int,
    help="Port for the Redis instance",
    show_default=True,
)
@click.option(
    "--db", default=0, type=int, help="DB ID for the Redis instance", show_default=True
)
# The password is a string: declaring it as int made click reject any
# non-numeric password. It has no default, so there is nothing to show.
@click.option("--password", type=str, help="Password for the Redis instance")
def yarqserve(name, total, host, port, rhost, rport, db, password):
    """Command-line entry point: connect to Redis and start the HTTP watcher.

    :param name: tuple of queue names (``--name``, repeatable).
    :param total: tuple of expected totals (``--total``, repeatable), matched
        to ``name`` by position; names without a total get ``None``.
    :param host: hostname for the HTTP server.
    :param port: port for the HTTP server.
    :param rhost: Redis hostname.
    :param rport: Redis port.
    :param db: Redis database ID.
    :param password: Redis password, or ``None``.
    :raises click.UsageError: if more totals than names are supplied.
    """
    # zip_longest pads the shorter side with None. Padding totals is intended
    # (a queue without a known total), but padding names would create a queue
    # called None, so reject that case up front.
    if len(total) > len(name):
        raise click.UsageError("More --total values than --name values were given")

    redis = Redis(host=rhost, port=rport, db=db, password=password)
    names_totals = dict(zip_longest(name, total))
    main(host, port, redis, names_totals)


if __name__ == "__main__":
    yarqserve()