Skip to content
Snippets Groups Projects
Commit be6a492c authored by Benjamin Bertrand's avatar Benjamin Bertrand
Browse files

Add task table

Table used to save the RQ background jobs

JIRA INFRA-403
parent 0ec1ffb7
No related branches found
No related tags found
No related merge requests found
...@@ -17,6 +17,7 @@ from flask import current_app ...@@ -17,6 +17,7 @@ from flask import current_app
from .extensions import db, ldap_manager from .extensions import db, ldap_manager
from .defaults import defaults from .defaults import defaults
from .models import User from .models import User
from .tasks import TaskWorker
from . import utils, tokens from . import utils, tokens
...@@ -97,5 +98,5 @@ def register_cli(app): ...@@ -97,5 +98,5 @@ def register_cli(app):
redis_url = current_app.config['REDIS_URL'] redis_url = current_app.config['REDIS_URL']
redis_connection = redis.from_url(redis_url) redis_connection = redis.from_url(redis_url)
with rq.Connection(redis_connection): with rq.Connection(redis_connection):
worker = rq.Worker(current_app.config['QUEUES']) worker = TaskWorker(current_app.config['QUEUES'])
worker.work() worker.work()
...@@ -13,6 +13,7 @@ import ipaddress ...@@ -13,6 +13,7 @@ import ipaddress
import string import string
import qrcode import qrcode
import sqlalchemy as sa import sqlalchemy as sa
from enum import Enum
from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.dialects import postgresql from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import validates from sqlalchemy.orm import validates
...@@ -21,6 +22,7 @@ from citext import CIText ...@@ -21,6 +22,7 @@ from citext import CIText
from flask import current_app from flask import current_app
from flask_login import UserMixin from flask_login import UserMixin
from wtforms import ValidationError from wtforms import ValidationError
from rq import Queue
from .extensions import db, login_manager, ldap_manager, cache from .extensions import db, login_manager, ldap_manager, cache
from .plugins import FlaskUserPlugin from .plugins import FlaskUserPlugin
from .validators import (ICS_ID_RE, HOST_NAME_RE, VLAN_NAME_RE, MAC_ADDRESS_RE, from .validators import (ICS_ID_RE, HOST_NAME_RE, VLAN_NAME_RE, MAC_ADDRESS_RE,
...@@ -135,6 +137,7 @@ class User(db.Model, UserMixin): ...@@ -135,6 +137,7 @@ class User(db.Model, UserMixin):
email = db.Column(db.Text) email = db.Column(db.Text)
groups = db.Column(postgresql.ARRAY(db.Text), default=[]) groups = db.Column(postgresql.ARRAY(db.Text), default=[])
tokens = db.relationship("Token", backref="user") tokens = db.relationship("Token", backref="user")
tasks = db.relationship('Task', backref='user')
# The favorites won't be accessed very often so we load them # The favorites won't be accessed very often so we load them
# only when necessary (lazy=True) # only when necessary (lazy=True)
favorite_manufacturers = db.relationship( favorite_manufacturers = db.relationship(
...@@ -200,6 +203,41 @@ class User(db.Model, UserMixin): ...@@ -200,6 +203,41 @@ class User(db.Model, UserMixin):
self.favorite_actions] self.favorite_actions]
return [favorite for favorites in favorites_list for favorite in favorites] return [favorite for favorites in favorites_list for favorite in favorites]
def launch_task(self, name, func, *args, **kwargs):
    """Enqueue a background RQ job and record it as a Task row.

    :param name: logical task name used to group/query tasks
    :param func: name of the function in app.tasks to run
    :returns: the newly created (and committed) Task instance
    """
    job = Queue().enqueue(f'app.tasks.{func}', *args, **kwargs)
    # Right after enqueueing, RQ reports the job as QUEUED or DEFERRED.
    task = Task(
        id=job.id,
        name=name,
        command=job.get_call_string(),
        status=JobStatus(job.status),
        user=self,
    )
    db.session.add(task)
    db.session.commit()
    return task
def get_tasks(self, all=False):
    """Return the tasks created by this user, oldest first.

    An admin user may pass ``all=True`` to retrieve every task,
    regardless of owner.
    """
    query = Task.query
    if not (all and self.is_admin):
        # Non-admins (or all=False) only see their own tasks.
        query = query.filter_by(user=self)
    return query.order_by(Task.created_at).all()
def get_tasks_in_progress(self, name):
    """Return every <name> task that has neither finished nor failed."""
    done_states = [JobStatus.FINISHED, JobStatus.FAILED]
    query = Task.query.filter_by(name=name)
    return query.filter(~Task.status.in_(done_states)).all()
def get_task_started(self, name):
    """Return the <name> task currently running, or None.

    None is returned when no <name> task is in the STARTED state.
    """
    return (Task.query
            .filter_by(name=name, status=JobStatus.STARTED)
            .first())
def is_task_waiting(self, name):
    """Return True if at least one <name> task is queued or deferred."""
    waiting_states = [JobStatus.DEFERRED, JobStatus.QUEUED]
    pending = Task.query.filter_by(name=name).filter(
        Task.status.in_(waiting_states))
    return pending.count() > 0
def __str__(self): def __str__(self):
return self.username return self.username
...@@ -907,6 +945,42 @@ class NetworkScope(CreatedMixin, db.Model): ...@@ -907,6 +945,42 @@ class NetworkScope(CreatedMixin, db.Model):
return d return d
# RQ's own JobStatus (rq/job.py) is a hand-rolled class of string
# constants rather than a real Python Enum, so sqlalchemy's Enum
# column type cannot map it. Mirror the same states here instead.
class JobStatus(Enum):
    """Lifecycle states of an RQ background job."""

    QUEUED = 'queued'
    FINISHED = 'finished'
    FAILED = 'failed'
    STARTED = 'started'
    DEFERRED = 'deferred'
class Task(db.Model):
    """Database record of an RQ background job.

    One row is created per job enqueued via User.launch_task; the
    status column is kept up to date by TaskWorker as the job moves
    through the RQ lifecycle.
    """

    # Use job id generated by RQ
    id = db.Column(postgresql.UUID, primary_key=True)
    created_at = db.Column(db.DateTime, default=utcnow())
    # Logical task name (e.g. 'trigger_vm_creation'), used for lookups
    name = db.Column(db.Text, nullable=False, index=True)
    # Human-readable call string captured from the RQ job
    command = db.Column(db.Text)
    # Backed by the PostgreSQL 'job_status' enum type
    status = db.Column(db.Enum(JobStatus, name='job_status'))
    # Defaults to the user triggering the request
    user_id = db.Column(db.Integer, db.ForeignKey('user_account.id'),
                        nullable=False, default=utils.fetch_current_user_id)

    def __str__(self):
        return str(self.id)

    def to_dict(self):
        """Return a dict representation of the task.

        NOTE(review): 'status' is the JobStatus enum member itself,
        not its string value — confirm callers serialize it properly.
        """
        return {
            'id': self.id,
            'created_at': utils.format_field(self.created_at),
            'name': self.name,
            'command': self.command,
            'status': self.status,
            'user': str(self.user),
        }
# call configure_mappers after defining all the models # call configure_mappers after defining all the models
# required by sqlalchemy_continuum # required by sqlalchemy_continuum
sa.orm.configure_mappers() sa.orm.configure_mappers()
......
...@@ -92,8 +92,8 @@ def view_host(name): ...@@ -92,8 +92,8 @@ def view_host(name):
flash(f'Only admin users are allowed to create a VM!', 'info') flash(f'Only admin users are allowed to create a VM!', 'info')
else: else:
interface = host.interfaces[0] interface = host.interfaces[0]
job = tasks.trigger_vm_creation(name, interface, int(form.memory.data) * 1000, form.cores.data) task = tasks.trigger_vm_creation(name, interface, int(form.memory.data) * 1000, form.cores.data)
current_app.logger.info(f'Creation of {name} requested: job {job.get_id()}') current_app.logger.info(f'Creation of {name} requested: task {task.id}')
flash(f'Creation of {name} requested!', 'success') flash(f'Creation of {name} requested!', 'success')
return redirect(url_for('network.view_host', name=name)) return redirect(url_for('network.view_host', name=name))
return render_template('network/view_host.html', host=host, form=form) return render_template('network/view_host.html', host=host, form=form)
......
...@@ -9,9 +9,48 @@ This module implements tasks to run. ...@@ -9,9 +9,48 @@ This module implements tasks to run.
:license: BSD 2-Clause, see LICENSE for more details. :license: BSD 2-Clause, see LICENSE for more details.
""" """
import time
import tower_cli import tower_cli
from flask import current_app from flask import current_app
from rq import Queue from rq import Worker
from .extensions import db
from . import utils, models
class TaskWorker(Worker):
    """RQ worker that mirrors job state into the CSEntry database.

    Each lifecycle hook records the new status on the matching Task
    row before delegating to the stock rq.Worker behaviour.
    """

    def update_task_status(self, job, status):
        """Set the Task row matching *job* to *status*.

        The Task row is only inserted after the job is enqueued, so a
        very fast job can reach this hook before the row exists.
        Poll up to 3 times (one second apart) before giving up.
        """
        task = None
        for _ in range(3):
            task = models.Task.query.get(job.id)
            if task is not None:
                break
            self.log.warning('task not found...')
            time.sleep(1)
        if task is None:
            self.log.error(f'Task {job.id} not found! Task status not updated!')
            return
        task.status = status
        db.session.commit()

    def move_to_failed_queue(self, job, *exc_info):
        # Record the failure before rq moves the job away.
        self.update_task_status(job, status=models.JobStatus.FAILED)
        super().move_to_failed_queue(job, *exc_info)

    def handle_job_success(self, job, queue, started_job_registry):
        self.update_task_status(job, status=models.JobStatus.FINISHED)
        super().handle_job_success(job, queue, started_job_registry)

    def prepare_job_execution(self, job):
        # Mark the task STARTED just before the job actually runs.
        self.update_task_status(job, status=models.JobStatus.STARTED)
        super().prepare_job_execution(job)
def trigger_vm_creation(name, interface, memory, cores): def trigger_vm_creation(name, interface, memory, cores):
...@@ -26,8 +65,10 @@ def trigger_vm_creation(name, interface, memory, cores): ...@@ -26,8 +65,10 @@ def trigger_vm_creation(name, interface, memory, cores):
f'mac={interface.mac.address}', f'mac={interface.mac.address}',
] ]
if interface.is_ioc: if interface.is_ioc:
task_name = 'trigger_vioc_creation'
job_template = current_app.config['AWX_CREATE_VIOC'] job_template = current_app.config['AWX_CREATE_VIOC']
else: else:
task_name = 'trigger_vm_creation'
job_template = current_app.config['AWX_CREATE_VM'] job_template = current_app.config['AWX_CREATE_VM']
extra_vars.extend([ extra_vars.extend([
f'ip_address={interface.ip}', f'ip_address={interface.ip}',
...@@ -36,15 +77,16 @@ def trigger_vm_creation(name, interface, memory, cores): ...@@ -36,15 +77,16 @@ def trigger_vm_creation(name, interface, memory, cores):
f'netmask={interface.network.netmask}', f'netmask={interface.network.netmask}',
f'gateway={interface.network.gateway}', f'gateway={interface.network.gateway}',
]) ])
q = Queue()
current_app.logger.info(f'Launch new job to create the {name} VM: {job_template} with {extra_vars}') current_app.logger.info(f'Launch new job to create the {name} VM: {job_template} with {extra_vars}')
job = q.enqueue( user = utils.cse_current_user()
launch_job_template, task = user.launch_task(
task_name,
func='launch_job_template',
job_template=job_template, job_template=job_template,
extra_vars=extra_vars, extra_vars=extra_vars,
timeout=500, timeout=500,
) )
return job return task
def trigger_core_services_update(): def trigger_core_services_update():
...@@ -56,19 +98,24 @@ def trigger_core_services_update(): ...@@ -56,19 +98,24 @@ def trigger_core_services_update():
Make sure that we don't have more than one in queue. Make sure that we don't have more than one in queue.
""" """
job_template = current_app.config['AWX_CORE_SERVICES_UPDATE'] job_template = current_app.config['AWX_CORE_SERVICES_UPDATE']
q = Queue() user = utils.cse_current_user()
# Only trigger a new job if none is already waiting in queue if user.is_task_waiting('trigger_core_services_update'):
for job in q.jobs: current_app.logger.info('Already one "trigger_core_services_update" task waiting. No need to trigger a new one.')
if (job.func_name == 'app.tasks.launch_job_template' and return None
job.kwargs.get('job_template', '') == job_template): kwargs = {
current_app.logger.info(f'Already one {job_template} job in queue. No need to trigger a new one.') 'func': 'launch_job_template',
return None 'job_template': job_template
}
started = user.get_task_started('trigger_core_services_update')
if started:
# There is already one running task. Trigger a new one when it's done.
kwargs['depends_on'] = started.id
current_app.logger.info(f'Launch new job to update core services: {job_template}') current_app.logger.info(f'Launch new job to update core services: {job_template}')
job = q.enqueue( task = user.launch_task(
launch_job_template, 'trigger_core_services_update',
job_template=job_template, **kwargs,
) )
return job return task
def launch_job_template(job_template, monitor=True, **kwargs): def launch_job_template(job_template, monitor=True, **kwargs):
......
"""add task table
Revision ID: 7d0d580cdb1a
Revises: f5a605c0c835
Create Date: 2018-06-28 21:01:13.171328
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = '7d0d580cdb1a'
down_revision = 'f5a605c0c835'
branch_labels = None
depends_on = None
def upgrade():
    """Create the task table used to record RQ background jobs."""
    op.create_table(
        'task',
        # Primary key is the UUID that RQ generates for the job.
        sa.Column('id', postgresql.UUID(), nullable=False),
        sa.Column('created_at', sa.DateTime(), nullable=True),
        sa.Column('name', sa.Text(), nullable=False),
        sa.Column('command', sa.Text(), nullable=True),
        # Creates the PostgreSQL 'job_status' enum type as a side effect;
        # values mirror the JobStatus Python enum in app.models.
        sa.Column('status', sa.Enum('QUEUED', 'FINISHED', 'FAILED', 'STARTED', 'DEFERRED', name='job_status'), nullable=True),
        sa.Column('user_id', sa.Integer(), nullable=False),
        sa.ForeignKeyConstraint(['user_id'], ['user_account.id'], name=op.f('fk_task_user_id_user_account')),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_task'))
    )
    # Tasks are looked up by name, so index that column.
    op.create_index(op.f('ix_task_name'), 'task', ['name'], unique=False)
def downgrade():
    """Drop the task table, its index and the job_status enum type."""
    op.drop_index(op.f('ix_task_name'), table_name='task')
    op.drop_table('task')
    # drop_table does not remove the PostgreSQL enum type; drop it explicitly.
    op.execute('DROP TYPE job_status')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment