Skip to content
Snippets Groups Projects
Commit f30a711b authored by Benjamin Bertrand's avatar Benjamin Bertrand
Browse files

Use queues of different priorities

Ensure that the trigger_core_services_update job is always processed
before the other jobs (low queue).

JIRA INFRA-887
parent 8f209774
No related branches found
No related tags found
No related merge requests found
......@@ -106,7 +106,7 @@ def register_cli(app):
redis_url = current_app.config["REDIS_URL"]
redis_connection = redis.from_url(redis_url)
with rq.Connection(redis_connection):
worker = TaskWorker(current_app.config["QUEUES"])
worker = TaskWorker(["high", "normal", "low"])
if current_app.config["SENTRY_DSN"]:
client = Client(
current_app.config["SENTRY_DSN"],
......
......@@ -309,13 +309,13 @@ class User(db.Model, UserMixin):
]
return [favorite for favorites in favorites_list for favorite in favorites]
def launch_task(self, name, func, *args, **kwargs):
def launch_task(self, name, func, queue_name="normal", **kwargs):
"""Launch a task in the background using RQ
The task is added to the session but not committed.
"""
q = Queue()
job = q.enqueue(f"app.tasks.{func}", *args, **kwargs)
q = Queue(queue_name)
job = q.enqueue(f"app.tasks.{func}", **kwargs)
# The status will be set to QUEUED or DEFERRED
task = Task(
id=job.id,
......
......@@ -34,7 +34,6 @@ SESSION_REDIS_URL = "redis://redis:6379/0"
CACHE_TYPE = "redis"
CACHE_REDIS_URL = "redis://redis:6379/1"
REDIS_URL = "redis://redis:6379/2"
QUEUES = ["default"]
ELASTICSEARCH_URL = "http://elasticsearch:9200"
ELASTICSEARCH_INDEX_SUFFIX = "-dev"
......
......@@ -108,6 +108,7 @@ def trigger_vm_creation(
task = current_user.launch_task(
task_name,
resource="job",
queue_name="low",
func="launch_job_template",
job_template=job_template,
extra_vars=extra_vars,
......@@ -117,6 +118,7 @@ def trigger_vm_creation(
current_user.launch_task(
"trigger_post_install_job",
resource="job",
queue_name="low",
func="launch_job_template",
job_template=post_job_template,
limit=f"{host.fqdn}",
......@@ -147,6 +149,7 @@ def trigger_ztp_configuration(host):
task = current_user.launch_task(
"trigger_ztp_configuration",
resource="job",
queue_name="low",
func="launch_job_template",
job_template=job_template,
extra_vars=extra_vars,
......
......@@ -256,7 +256,9 @@ def trigger_core_services_update():
# There is already one running task. Trigger a new one when it's done.
kwargs["depends_on"] = started.id
current_app.logger.info(f"Launch new job to update core services: {job_template}")
task = current_user.launch_task("trigger_core_services_update", **kwargs)
task = current_user.launch_task(
"trigger_core_services_update", queue_name="normal", **kwargs
)
return task
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment