From f30a711ba05bae06b458423b1e72a88ee10abcc2 Mon Sep 17 00:00:00 2001
From: Benjamin Bertrand <benjamin.bertrand@esss.se>
Date: Fri, 8 Mar 2019 22:49:25 +0100
Subject: [PATCH] Use queues of different priorities

Ensure that the trigger_core_services_update job is always processed
before the other jobs (low queue).

JIRA INFRA-887
---
 app/commands.py | 2 +-
 app/models.py   | 6 +++---
 app/settings.py | 1 -
 app/tasks.py    | 3 +++
 app/utils.py    | 4 +++-
 5 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/app/commands.py b/app/commands.py
index 4ea3b1e..4aa9756 100644
--- a/app/commands.py
+++ b/app/commands.py
@@ -106,7 +106,7 @@ def register_cli(app):
         redis_url = current_app.config["REDIS_URL"]
         redis_connection = redis.from_url(redis_url)
         with rq.Connection(redis_connection):
-            worker = TaskWorker(current_app.config["QUEUES"])
+            worker = TaskWorker(["high", "normal", "low"])
             if current_app.config["SENTRY_DSN"]:
                 client = Client(
                     current_app.config["SENTRY_DSN"],
diff --git a/app/models.py b/app/models.py
index 176ddba..b5df4e8 100644
--- a/app/models.py
+++ b/app/models.py
@@ -309,13 +309,13 @@ class User(db.Model, UserMixin):
         ]
         return [favorite for favorites in favorites_list for favorite in favorites]
 
-    def launch_task(self, name, func, *args, **kwargs):
+    def launch_task(self, name, func, queue_name="normal", **kwargs):
         """Launch a task in the background using RQ
 
         The task is added to the session but not committed.
         """
-        q = Queue()
-        job = q.enqueue(f"app.tasks.{func}", *args, **kwargs)
+        q = Queue(queue_name)
+        job = q.enqueue(f"app.tasks.{func}", **kwargs)
         # The status will be set to QUEUED or DEFERRED
         task = Task(
             id=job.id,
diff --git a/app/settings.py b/app/settings.py
index 63e824f..e3d5916 100644
--- a/app/settings.py
+++ b/app/settings.py
@@ -34,7 +34,6 @@ SESSION_REDIS_URL = "redis://redis:6379/0"
 CACHE_TYPE = "redis"
 CACHE_REDIS_URL = "redis://redis:6379/1"
 REDIS_URL = "redis://redis:6379/2"
-QUEUES = ["default"]
 
 ELASTICSEARCH_URL = "http://elasticsearch:9200"
 ELASTICSEARCH_INDEX_SUFFIX = "-dev"
diff --git a/app/tasks.py b/app/tasks.py
index 96808d9..e4631c5 100644
--- a/app/tasks.py
+++ b/app/tasks.py
@@ -108,6 +108,7 @@ def trigger_vm_creation(
     task = current_user.launch_task(
         task_name,
         resource="job",
+        queue_name="low",
         func="launch_job_template",
         job_template=job_template,
         extra_vars=extra_vars,
@@ -117,6 +118,7 @@ def trigger_vm_creation(
         current_user.launch_task(
             "trigger_post_install_job",
             resource="job",
+            queue_name="low",
             func="launch_job_template",
             job_template=post_job_template,
             limit=f"{host.fqdn}",
@@ -147,6 +149,7 @@ def trigger_ztp_configuration(host):
     task = current_user.launch_task(
         "trigger_ztp_configuration",
         resource="job",
+        queue_name="low",
         func="launch_job_template",
         job_template=job_template,
         extra_vars=extra_vars,
diff --git a/app/utils.py b/app/utils.py
index 17b2d5e..803e69f 100644
--- a/app/utils.py
+++ b/app/utils.py
@@ -256,7 +256,9 @@ def trigger_core_services_update():
         # There is already one running task. Trigger a new one when it's done.
         kwargs["depends_on"] = started.id
     current_app.logger.info(f"Launch new job to update core services: {job_template}")
-    task = current_user.launch_task("trigger_core_services_update", **kwargs)
+    task = current_user.launch_task(
+        "trigger_core_services_update", queue_name="normal", **kwargs
+    )
     return task
 
 
-- 
GitLab