# -*- coding: utf-8 -*- """ app.tasks ~~~~~~~~~ This module implements tasks to run. :copyright: (c) 2018 European Spallation Source ERIC :license: BSD 2-Clause, see LICENSE for more details. """ import time import traceback import tower_cli from flask import current_app from rq import Worker, get_current_job from .extensions import db from . import utils, models class TaskWorker(Worker): """ Modified version of the rq worker which updates the task status and end time in the CSEntry database """ def save_exception(self, job, *exc_info): """Save the exception to the database The exception is only saved if it occured before the AWX job was triggered. If the AWX job failed, we can refer to the logs on AWX. """ task = models.Task.query.get(job.id) if task.awx_job_id is None: # No AWX job was triggered. An exception occured before. Save it. task.exception = self._get_safe_exception_string( traceback.format_exception(*exc_info) ) db.session.commit() def update_task_attributes(self, job, attributes): """Update the attributes of the task linked to the given job""" # The task is created after enqueueing the job. # If the job is processed very quickly, the task might # not be found in the database. We wait up to 3 seconds. for _ in range(3): task = models.Task.query.get(job.id) if task is not None: break else: self.log.warning("task not found...") time.sleep(1) else: self.log.error(f"Task {job.id} not found! Task attribute not updated!") return for name, value in attributes.items(): setattr(task, name, value) db.session.commit() def move_to_failed_queue(self, job, *exc_info): # This could be achieved by passing a custom exception handler # when initializing the worker. As we already subclass it, it's # easier to override the default handler in case of failure self.update_task_attributes( job, {"ended_at": job.ended_at, "status": models.JobStatus.FAILED} ) self.save_exception(job, *exc_info) super().move_to_failed_queue(job, *exc_info) def handle_job_success(self, job, queue, started_job_registry): self.update_task_attributes( job, {"ended_at": job.ended_at, "status": models.JobStatus.FINISHED} ) super().handle_job_success(job, queue, started_job_registry) def prepare_job_execution(self, job): self.update_task_attributes(job, {"status": models.JobStatus.STARTED}) super().prepare_job_execution(job) def trigger_vm_creation(name, interface, memory, cores): """Trigger a job to create a virtual machine or virtual IOC""" extra_vars = [ f"vmname={name}", f"memory={memory}", f"cores={cores}", f"vcpus={cores}", f"vlan_name={interface.network.vlan_name}", f"vlan_id={interface.network.vlan_id}", f"mac={interface.mac.address}", ] if interface.is_ioc: task_name = "trigger_vioc_creation" job_template = current_app.config["AWX_CREATE_VIOC"] else: task_name = "trigger_vm_creation" job_template = current_app.config["AWX_CREATE_VM"] extra_vars.extend( [ f"ip_address={interface.ip}", f"domain={interface.network.domain.name}", f'dns={current_app.config["VM_DEFAULT_DNS"]}', f"netmask={interface.network.netmask}", f"gateway={interface.network.gateway}", ] ) current_app.logger.info( f"Launch new job to create the {name} VM: {job_template} with {extra_vars}" ) user = utils.cse_current_user() task = user.launch_task( task_name, func="launch_job_template", job_template=job_template, extra_vars=extra_vars, timeout=500, ) return task def launch_job_template(job_template, **kwargs): rq_job = get_current_job() if job_template in ( current_app.config["AWX_CREATE_VIOC"], current_app.config["AWX_CREATE_VM"], ) and not current_app.config.get("AWX_VM_CREATION_ENABLED", False): current_app.logger.info("AWX VM creation is disabled. Not sending any request.") return "AWX VM creation not triggered" if not current_app.config.get("AWX_JOB_ENABLED", False): current_app.logger.info("AWX job is disabled. Not sending any request.") return "AWX job not triggered" # Launch the AWX job resource = tower_cli.get_resource("job") result = resource.launch(job_template=job_template, **kwargs) # Save the AWX job id in the task task = models.Task.query.get(rq_job.id) task.awx_job_id = result["id"] db.session.commit() # Monitor the job until done result = resource.monitor(pk=result["id"]) return result