diff --git a/app/commands.py b/app/commands.py index 4ea3b1e7528e9cb885a47e679b4afd8ac5b4e4f4..4aa9756e01b2a280ec506d9126e2c1997828a65b 100644 --- a/app/commands.py +++ b/app/commands.py @@ -106,7 +106,7 @@ def register_cli(app): redis_url = current_app.config["REDIS_URL"] redis_connection = redis.from_url(redis_url) with rq.Connection(redis_connection): - worker = TaskWorker(current_app.config["QUEUES"]) + worker = TaskWorker(["high", "normal", "low"]) if current_app.config["SENTRY_DSN"]: client = Client( current_app.config["SENTRY_DSN"], diff --git a/app/models.py b/app/models.py index 176ddbad1fa2cc676f89567df8d420cf47f28936..b5df4e86418dcf646aa158b2d8de32fabd0fb276 100644 --- a/app/models.py +++ b/app/models.py @@ -309,13 +309,13 @@ class User(db.Model, UserMixin): ] return [favorite for favorites in favorites_list for favorite in favorites] - def launch_task(self, name, func, *args, **kwargs): + def launch_task(self, name, func, queue_name="normal", **kwargs): """Launch a task in the background using RQ The task is added to the session but not committed. """ - q = Queue() - job = q.enqueue(f"app.tasks.{func}", *args, **kwargs) + q = Queue(queue_name) + job = q.enqueue(f"app.tasks.{func}", **kwargs) # The status will be set to QUEUED or DEFERRED task = Task( id=job.id, diff --git a/app/settings.py b/app/settings.py index 63e824f494039c828771db4531415adb94c9dae1..e3d59165507f3063ec8676f2bad5d1025db0fa0d 100644 --- a/app/settings.py +++ b/app/settings.py @@ -34,7 +34,6 @@ SESSION_REDIS_URL = "redis://redis:6379/0" CACHE_TYPE = "redis" CACHE_REDIS_URL = "redis://redis:6379/1" REDIS_URL = "redis://redis:6379/2" -QUEUES = ["default"] ELASTICSEARCH_URL = "http://elasticsearch:9200" ELASTICSEARCH_INDEX_SUFFIX = "-dev" diff --git a/app/tasks.py b/app/tasks.py index 96808d9d227169718c4ee6bd0f5668ecacab26fc..e4631c5251ee3a55f00efca512abbe98cc9fd206 100644 --- a/app/tasks.py +++ b/app/tasks.py @@ -108,6 +108,7 @@ def trigger_vm_creation( task = current_user.launch_task( task_name, resource="job", + queue_name="low", func="launch_job_template", job_template=job_template, extra_vars=extra_vars, @@ -117,6 +118,7 @@ def trigger_vm_creation( current_user.launch_task( "trigger_post_install_job", resource="job", + queue_name="low", func="launch_job_template", job_template=post_job_template, limit=f"{host.fqdn}", @@ -147,6 +149,7 @@ def trigger_ztp_configuration(host): task = current_user.launch_task( "trigger_ztp_configuration", resource="job", + queue_name="low", func="launch_job_template", job_template=job_template, extra_vars=extra_vars, diff --git a/app/utils.py b/app/utils.py index 17b2d5e34e6be4f8c8b413d0aab8c1301e1c2492..803e69fcedc1d359b556de322d41802a64224c2c 100644 --- a/app/utils.py +++ b/app/utils.py @@ -256,7 +256,9 @@ def trigger_core_services_update(): # There is already one running task. Trigger a new one when it's done. kwargs["depends_on"] = started.id current_app.logger.info(f"Launch new job to update core services: {job_template}") - task = current_user.launch_task("trigger_core_services_update", **kwargs) + task = current_user.launch_task( + "trigger_core_services_update", queue_name="normal", **kwargs + ) return task