diff --git a/app/commands.py b/app/commands.py index 164cbb0d5eb1339b2eed32d19d8226d9274a9a6a..4a80ad0af5cd131e18747de5984a87c1e3caa71b 100644 --- a/app/commands.py +++ b/app/commands.py @@ -87,6 +87,21 @@ def sync_users(): db.session.commit() +def clean_deferred_tasks(): + """Fail deferred tasks whose dependency is missing or has failed""" + for task in ( + models.Task.query.filter_by(status=models.JobStatus.DEFERRED) + .order_by(models.Task.created_at) + .all() + ): + if task.depends_on is None or task.depends_on.status == models.JobStatus.FAILED: + current_app.logger.info( + f"Set deferred task {task.id} ({task.name}) to failed" + ) + task.status = models.JobStatus.FAILED + db.session.commit() + + def register_cli(app): @app.cli.command() def create_defaults(): @@ -99,6 +114,11 @@ def register_cli(app): db.session.rollback() app.logger.debug(f"{instance} already exists") + @app.cli.command() + def clean_deferred(): + """Set deferred tasks to failed if their dependency is missing or failed""" + clean_deferred_tasks() + @app.cli.command() def syncusers(): """Synchronize all users from the database with information the LDAP server""" diff --git a/app/models.py b/app/models.py index 8ce4e5d7aae736a456bbabc0652da5ad25e1def6..1d23a0258841a9ebf58144fd98e942061ab499c7 100644 --- a/app/models.py +++ b/app/models.py @@ -38,10 +38,6 @@ from .validators import ( from . import utils, search -# Number of minutes to wait before to consider a deferrred job lost -WAITING_DELAY = 30 - - make_versioned(plugins=[FlaskUserPlugin()]) @@ -392,19 +388,13 @@ class User(db.Model, UserMixin): Waiting means: - queued - - deferred if not older than WAITING_DELAY minutes + - deferred - A deferred task can stay deferred forever if the task it depends on fails. + A deferred task will be set to failed if the task it depends on fails. 
""" count = ( Task.query.filter_by(name=name) - .filter( - (Task.status == JobStatus.QUEUED) - | ( - (Task.status == JobStatus.DEFERRED) - & (Task.created_at > utils.minutes_ago(WAITING_DELAY)) - ) - ) + .filter(Task.status.in_([JobStatus.DEFERRED, JobStatus.QUEUED])) .count() ) return count > 0 @@ -414,19 +404,13 @@ class User(db.Model, UserMixin): Waiting means: - queued - - deferred if not older than WAITING_DELAY minutes + - deferred - A deferred task can stay deferred forever if the task it depends on fails. + A deferred task will be set to failed if the task it depends on fails. """ return ( Task.query.filter_by(name=name) - .filter( - (Task.status == JobStatus.QUEUED) - | ( - (Task.status == JobStatus.DEFERRED) - & (Task.created_at > utils.minutes_ago(WAITING_DELAY)) - ) - ) + .filter(Task.status.in_([JobStatus.DEFERRED, JobStatus.QUEUED])) .order_by(Task.created_at.desc()) .first() ) @@ -497,7 +481,7 @@ class SearchableMixin(object): @classmethod def after_flush_postexec(cls, session, flush_context): """Retrieve the new and updated objects representation""" - if session._changes is None: + if not hasattr(session, "_changes") or session._changes is None: return # - We can't call obj.to_dict() in the before_flush event because the id # hasn't been allocated yet (for new objects) and other fields haven't been updated @@ -513,7 +497,7 @@ class SearchableMixin(object): @classmethod def after_commit(cls, session): """Update the elasticsearch index""" - if session._changes is None: + if not hasattr(session, "_changes") or session._changes is None: return for index, body in session._changes["add"]: search.add_to_index(index, body) @@ -1703,6 +1687,25 @@ class Task(db.Model): current_app.config["AWX_URL"], f"/#/{route}/{self.awx_job_id}" ) + def update_reverse_dependencies(self): + """Recursively set all reverse dependencies to FAILED + + When an RQ job is set to FAILED, its reverse dependencies stay DEFERRED. 
+ This method makes it easy to update the status of the corresponding tasks. + + The tasks are modified but the session is not committed. + """ + + def set_reverse_dependencies_to_failed(task): + for dependency in task.reverse_dependencies: + current_app.logger.info( + f"Setting {dependency.id} ({dependency.name}) to FAILED due to failed dependency" + ) + dependency.status = JobStatus.FAILED + set_reverse_dependencies_to_failed(dependency) + + set_reverse_dependencies_to_failed(self) + def __str__(self): return str(self.id) diff --git a/app/tasks.py b/app/tasks.py index a0b00d7ee3f9dce7257688d709fe63d2a203eb78..c771426496acbd77bd2d869919a3566da1630b5e 100644 --- a/app/tasks.py +++ b/app/tasks.py @@ -64,6 +64,13 @@ class TaskWorker(Worker): setattr(task, name, value) db.session.commit() + def update_reverse_dependencies(self, job): + task = models.Task.query.get(job.id) + if task is None: + return + task.update_reverse_dependencies() + db.session.commit() + def move_to_failed_queue(self, job, *exc_info): # This could be achieved by passing a custom exception handler # when initializing the worker. 
As we already subclass it, it's @@ -71,6 +78,7 @@ class TaskWorker(Worker): self.update_task_attributes( job, {"ended_at": job.ended_at, "status": models.JobStatus.FAILED} ) + self.update_reverse_dependencies(job) self.save_exception(job, *exc_info) super().move_to_failed_queue(job, *exc_info) diff --git a/tests/functional/test_models.py b/tests/functional/test_models.py index d3448e0ae6660bb0d5b33237588a3cf4d5bccb11..f8eaaf92884e397769af8f04d6299aeacf3bfbda 100644 --- a/tests/functional/test_models.py +++ b/tests/functional/test_models.py @@ -500,8 +500,8 @@ def test_task_waiting(status, user, task_factory): assert user.is_task_waiting("my-task") -@pytest.mark.parametrize("minutes", [5, 10, 29]) -def test_task_waiting_with_recent_deferred(minutes, user, task_factory): +@pytest.mark.parametrize("minutes", [5, 10, 29, 31, 60, 7200]) +def test_task_waiting_with_recent_or_old_deferred(minutes, user, task_factory): minutes_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=minutes) task_factory( created_at=minutes_ago, name="my-task", status=models.JobStatus.DEFERRED @@ -509,15 +509,6 @@ def test_task_waiting_with_recent_deferred(minutes, user, task_factory): assert user.is_task_waiting("my-task") -@pytest.mark.parametrize("minutes", [31, 60, 7200]) -def test_no_task_waiting_with_old_deferred(minutes, user, task_factory): - minutes_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=minutes) - task_factory( - created_at=minutes_ago, name="my-task", status=models.JobStatus.DEFERRED - ) - assert not user.is_task_waiting("my-task") - - @pytest.mark.parametrize("minutes", [5, 30, 7200]) def test_task_waiting_with_old_queued(minutes, user, task_factory): minutes_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=minutes) @@ -549,6 +540,22 @@ def test_get_tasks_in_progress(user, task_factory): assert user.get_tasks_in_progress("my-task") == [task1, task2, task3] +def test_update_task_reverse_dependencies(user, task_factory): + task1 = 
task_factory(name="my-task", status=models.JobStatus.STARTED) + task2 = task_factory( + name="my-task", status=models.JobStatus.DEFERRED, depends_on=task1 + ) + task3 = task_factory( + name="my-task", status=models.JobStatus.DEFERRED, depends_on=task1 + ) + task4 = task_factory( + name="my-task", status=models.JobStatus.DEFERRED, depends_on=task3 + ) + task1.update_reverse_dependencies() + for task in (task2, task3, task4): + assert task.status == models.JobStatus.FAILED + + def test_host_indexed(db, host_factory): host1 = host_factory(name="myhost") res = db.app.elasticsearch.search(index="host-test", q="*")