Forked from
ICS Control System Infrastructure / csentry
299 commits behind the upstream repository.
-
Benjamin Bertrand authored
Use the before_flush hook to trigger the core services update. It's much nicer than having to call a method in several places. The core services update is triggered on creation/modification/deletion of an Interface. In the before_flush we can manipulate the session (add objects) but we shouldn't call commit. The db.session.commit() was removed from the launch_task method.
Benjamin Bertrand authored: Use the before_flush hook to trigger the core services update. It's much nicer than having to call a method in several places. The core services update is triggered on creation/modification/deletion of an Interface. In the before_flush we can manipulate the session (add objects) but we shouldn't call commit. The db.session.commit() was removed from the launch_task method.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
tasks.py 4.97 KiB
# -*- coding: utf-8 -*-
"""
app.tasks
~~~~~~~~~
This module implements tasks to run.
:copyright: (c) 2018 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.
"""
import time
import traceback
import tower_cli
from flask import current_app
from rq import Worker, get_current_job
from .extensions import db
from . import utils, models
class TaskWorker(Worker):
    """
    Modified version of the rq worker which updates
    the task status and end time in the CSEntry database
    """

    def save_exception(self, job, *exc_info):
        """Save the exception to the database

        The exception is only saved if it occurred before the AWX job was triggered.
        If the AWX job failed, we can refer to the logs on AWX.

        :param job: the rq job that failed; its id is used to look up the task
        :param exc_info: exception information, forwarded to
            ``traceback.format_exception``
        """
        task = models.Task.query.get(job.id)
        if task is None:
            # The task lookup can come back empty (update_task_attributes
            # already copes with that). Bail out instead of raising an
            # AttributeError, which would interrupt the failure handling.
            self.log.error(f"Task {job.id} not found! Exception not saved!")
            return
        if task.awx_job_id is None:
            # No AWX job was triggered. An exception occurred before. Save it.
            task.exception = self._get_safe_exception_string(
                traceback.format_exception(*exc_info)
            )
            db.session.commit()

    def update_task_attributes(self, job, attributes):
        """Update the attributes of the task linked to the given job

        :param job: the rq job; its id is used to look up the task
        :param attributes: mapping of attribute name to the value to set
            on the task
        """
        # The task is created after enqueueing the job.
        # If the job is processed very quickly, the task might
        # not be found in the database. We wait up to 3 seconds.
        for _ in range(3):
            task = models.Task.query.get(job.id)
            if task is not None:
                break
            else:
                self.log.warning("task not found...")
                time.sleep(1)
        else:
            # All the attempts failed: give up without touching the database
            self.log.error(f"Task {job.id} not found! Task attribute not updated!")
            return
        for name, value in attributes.items():
            setattr(task, name, value)
        db.session.commit()

    def move_to_failed_queue(self, job, *exc_info):
        """Mark the task as failed and save the exception before
        delegating to the default rq implementation
        """
        # This could be achieved by passing a custom exception handler
        # when initializing the worker. As we already subclass it, it's
        # easier to override the default handler in case of failure
        self.update_task_attributes(
            job, {"ended_at": job.ended_at, "status": models.JobStatus.FAILED}
        )
        self.save_exception(job, *exc_info)
        super().move_to_failed_queue(job, *exc_info)

    def handle_job_success(self, job, queue, started_job_registry):
        """Mark the task as finished before delegating to the default
        rq implementation
        """
        self.update_task_attributes(
            job, {"ended_at": job.ended_at, "status": models.JobStatus.FINISHED}
        )
        super().handle_job_success(job, queue, started_job_registry)

    def prepare_job_execution(self, job):
        """Mark the task as started before delegating to the default
        rq implementation
        """
        self.update_task_attributes(job, {"status": models.JobStatus.STARTED})
        super().prepare_job_execution(job)
def trigger_vm_creation(name, interface, memory, cores):
    """Launch the task that creates a virtual machine or a virtual IOC

    The job template and the task name are selected from ``interface.is_ioc``.

    :param name: name of the machine, passed as ``vmname``
    :param interface: interface giving the network, MAC and IP settings
    :param memory: value passed as ``memory``
    :param cores: value passed as both ``cores`` and ``vcpus``
    :returns: the task returned by the current user's ``launch_task``
    """
    network = interface.network
    # Variables sent whatever the kind of machine
    extra_vars = [
        f"vmname={name}",
        f"memory={memory}",
        f"cores={cores}",
        f"vcpus={cores}",
        f"vlan_name={network.vlan_name}",
        f"vlan_id={network.vlan_id}",
        f"mac={interface.mac.address}",
    ]
    if interface.is_ioc:
        task_name = "trigger_vioc_creation"
        job_template = current_app.config["AWX_CREATE_VIOC"]
    else:
        task_name = "trigger_vm_creation"
        job_template = current_app.config["AWX_CREATE_VM"]
        # NOTE(review): the IP settings are assumed to be added for the
        # VM case only - confirm against the upstream file
        extra_vars += [
            f"ip_address={interface.ip}",
            f"domain={interface.network.domain.name}",
            f'dns={current_app.config["VM_DEFAULT_DNS"]}',
            f"netmask={interface.network.netmask}",
            f"gateway={interface.network.gateway}",
        ]
    current_app.logger.info(
        f"Launch new job to create the {name} VM: {job_template} with {extra_vars}"
    )
    return utils.cse_current_user().launch_task(
        task_name,
        func="launch_job_template",
        job_template=job_template,
        extra_vars=extra_vars,
        timeout=500,
    )
def launch_job_template(job_template, **kwargs):
    """Launch the given AWX job template and monitor the job until done

    Nothing is sent when the matching feature is disabled in the
    application configuration; a short message is returned instead.

    :param job_template: AWX job template to launch
    :param kwargs: extra keyword arguments forwarded to the job ``launch``
    :returns: the result of monitoring the job, or a message saying
        that nothing was triggered
    """
    rq_job = get_current_job()
    config = current_app.config
    vm_templates = (config["AWX_CREATE_VIOC"], config["AWX_CREATE_VM"])
    creates_vm = job_template in vm_templates
    if creates_vm and not config.get("AWX_VM_CREATION_ENABLED", False):
        current_app.logger.info("AWX VM creation is disabled. Not sending any request.")
        return "AWX VM creation not triggered"
    awx_enabled = config.get("AWX_JOB_ENABLED", False)
    if not awx_enabled:
        current_app.logger.info("AWX job is disabled. Not sending any request.")
        return "AWX job not triggered"
    # Launch the AWX job
    awx_jobs = tower_cli.get_resource("job")
    launched = awx_jobs.launch(job_template=job_template, **kwargs)
    # Save the AWX job id in the task
    task = models.Task.query.get(rq_job.id)
    awx_job_id = launched["id"]
    task.awx_job_id = awx_job_id
    db.session.commit()
    # Monitor the job until done
    return awx_jobs.monitor(pk=awx_job_id)