From e605e8b6c041e57ec00bbf67d02a2bc29330cdd2 Mon Sep 17 00:00:00 2001
From: Benjamin Bertrand <benjamin.bertrand@esss.se>
Date: Tue, 9 Oct 2018 09:32:06 +0200
Subject: [PATCH] Switch to after_commit to update the index

1. If the transaction is rolled back, some objects could already have
been updated in the elasticsearch index but not in the database.
Updating the index only after commit ensures this can't happen.

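2. In after_flush, not all relationships are updated properly, so the
objects are now serialized (obj.to_dict()) in the after_flush_postexec
event instead. This can't be deferred to the after_commit event because
no SQL can be emitted at that point. As session.new / dirty / deleted
are already empty in after_flush_postexec, the objects to index are
recorded in the before_flush event.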

See https://groups.google.com/forum/#!topic/sqlalchemy/GdB5J8h7YEU

JIRA INFRA-595
---
 app/models.py                   | 60 ++++++++++++++++++++++++---------
 app/search.py                   | 15 ++++-----
 tests/functional/conftest.py    | 10 ++++--
 tests/functional/test_search.py | 12 +++----
 4 files changed, 65 insertions(+), 32 deletions(-)

diff --git a/app/models.py b/app/models.py
index 7c03b28..fd4a0dc 100644
--- a/app/models.py
+++ b/app/models.py
@@ -346,25 +346,51 @@ class SearchableMixin(object):
         )
 
     @classmethod
-    def after_flush(cls, session, flush_context):
-        # Trigger the elasticsearch index update
-        # We don't use the after_commit event because calling
-        # model.to_dict() often requires to load some relationships
-        # which is not possible in the session state during after_commit
+    def before_flush(cls, session, flush_context, instances):
+        """Save the new/modified/deleted objects"""
+        # The session.new / dirty / deleted lists are empty in the after_flush_postexec event.
+        # We need to record them here
+        session._changes = {"add_obj": [], "delete": []}
         for obj in itertools.chain(session.new, session.dirty):
             if isinstance(obj, SearchableMixin):
-                search.add_to_index(
-                    obj.__tablename__
-                    + current_app.config["ELASTICSEARCH_INDEX_SUFFIX"],
-                    obj,
+                index = (
+                    obj.__tablename__ + current_app.config["ELASTICSEARCH_INDEX_SUFFIX"]
                 )
+                current_app.logger.debug(
+                    f"object to add/update in the {index} index: {obj}"
+                )
+                session._changes["add_obj"].append((index, obj))
         for obj in session.deleted:
             if isinstance(obj, SearchableMixin):
-                search.remove_from_index(
-                    obj.__tablename__
-                    + current_app.config["ELASTICSEARCH_INDEX_SUFFIX"],
-                    obj,
+                index = (
+                    obj.__tablename__ + current_app.config["ELASTICSEARCH_INDEX_SUFFIX"]
+                )
+                current_app.logger.debug(
+                    f"object to remove from the {index} index: {obj}"
                 )
+                session._changes["delete"].append((index, obj.id))
+
+    @classmethod
+    def after_flush_postexec(cls, session, flush_context):
+        """Retrieve the new and updated objects representation"""
+        # - We can't call obj.to_dict() in the before_flush event because the id
+        #   hasn't been allocated yet (for new objects) and other fields haven't been updated
+        #   (default values like created_at/updated_at and some relationships).
+        # - We can't call obj.to_dict() in the after_commit event because it would raise:
+        #   sqlalchemy.exc.InvalidRequestError:
+        #   This session is in 'committed' state; no further SQL can be emitted within this transaction.
+        session._changes["add"] = [
+            (index, obj.to_dict()) for index, obj in session._changes["add_obj"]
+        ]
+
+    @classmethod
+    def after_commit(cls, session):
+        """Update the elasticsearch index"""
+        for index, body in session._changes["add"]:
+            search.add_to_index(index, body)
+        for index, id in session._changes["delete"]:
+            search.remove_from_index(index, id)
+        session._changes = None
 
     @classmethod
     def reindex(cls):
@@ -372,7 +398,7 @@ class SearchableMixin(object):
         for obj in cls.query:
             search.add_to_index(
                 cls.__tablename__ + current_app.config["ELASTICSEARCH_INDEX_SUFFIX"],
-                obj,
+                obj.to_dict(),
             )
 
 
@@ -1420,4 +1446,8 @@ def before_flush(session, flush_context, instances):
 sa.orm.configure_mappers()
 ItemVersion = version_class(Item)
 # Set SQLAlchemy event listeners
-db.event.listen(db.session, "after_flush", SearchableMixin.after_flush)
+db.event.listen(db.session, "before_flush", SearchableMixin.before_flush)
+db.event.listen(
+    db.session, "after_flush_postexec", SearchableMixin.after_flush_postexec
+)
+db.event.listen(db.session, "after_commit", SearchableMixin.after_commit)
diff --git a/app/search.py b/app/search.py
index 051692d..e7ce5e0 100644
--- a/app/search.py
+++ b/app/search.py
@@ -12,32 +12,31 @@ This module implements the search interface.
 from flask import current_app
 
 
-def add_to_index(index, model):
-    """Add a model instance to an index"""
+def add_to_index(index, body):
+    """Add a document to an index"""
     if not current_app.elasticsearch:
         return
-    body = model.to_dict()
     # We remove the id key because it's used as the id of the document.
     # Using the model id as document id allows to easily retrieve
     # the model linked to a document
-    del body["id"]
+    id = body.pop("id")
     current_app.elasticsearch.index(
         index=index,
         doc_type="_doc",
-        id=model.id,
+        id=id,
         body=body,
         refresh=current_app.config["ELASTICSEARCH_REFRESH"],
     )
 
 
-def remove_from_index(index, model):
-    """Remove a model from the index"""
+def remove_from_index(index, id):
+    """Remove a document from the index"""
     if not current_app.elasticsearch:
         return
     current_app.elasticsearch.delete(
         index=index,
         doc_type="_doc",
-        id=model.id,
+        id=id,
         refresh=current_app.config["ELASTICSEARCH_REFRESH"],
     )
 
diff --git a/tests/functional/conftest.py b/tests/functional/conftest.py
index 1ff32a3..8b39882 100644
--- a/tests/functional/conftest.py
+++ b/tests/functional/conftest.py
@@ -107,9 +107,13 @@ def session(db, request):
             session.expire_all()
             session.begin_nested()
 
-    # We have to register the after_flush event because we use a specific
-    # session to run the tests (not the same used in models.py)
-    db.event.listen(session(), "after_flush", SearchableMixin.after_flush)
+    # We have to register the before_flush/after_flush_postexec/after_commit events
+    # because we use a specific session to run the tests (not the same used in models.py)
+    db.event.listen(session(), "before_flush", SearchableMixin.before_flush)
+    db.event.listen(
+        session(), "after_flush_postexec", SearchableMixin.after_flush_postexec
+    )
+    db.event.listen(session(), "after_commit", SearchableMixin.after_commit)
     db.session = session
 
     yield session
diff --git a/tests/functional/test_search.py b/tests/functional/test_search.py
index feb15a2..62a0676 100644
--- a/tests/functional/test_search.py
+++ b/tests/functional/test_search.py
@@ -26,17 +26,17 @@ class MyModel:
 
 def test_add_to_index(db):
     model1 = MyModel(2, "foo", "This is a test")
-    search.add_to_index("index-test", model1)
+    search.add_to_index("index-test", model1.to_dict())
     res = db.app.elasticsearch.get(index="index-test", doc_type="_doc", id=2)
     assert res["_source"] == {"name": "foo", "description": "This is a test"}
 
 
 def test_remove_from_index(db):
     model1 = MyModel(3, "hello world!")
-    search.add_to_index("index-test", model1)
+    search.add_to_index("index-test", model1.to_dict())
     res = db.app.elasticsearch.search(index="index-test", q="*")
     assert res["hits"]["total"] == 1
-    search.remove_from_index("index-test", model1)
+    search.remove_from_index("index-test", model1.id)
     res = db.app.elasticsearch.search(index="index-test", q="*")
     assert res["hits"]["total"] == 0
 
@@ -44,14 +44,14 @@ def test_remove_from_index(db):
 def test_remove_from_index_non_existing():
     model1 = MyModel(1, "hello world!")
     with pytest.raises(elasticsearch.NotFoundError):
-        search.remove_from_index("index-test", model1)
+        search.remove_from_index("index-test", model1.id)
 
 
 def test_query_index():
     model1 = MyModel(1, "Python", "Python is my favorite language")
-    search.add_to_index("index-test", model1)
+    search.add_to_index("index-test", model1.to_dict())
     model1 = MyModel(2, "Java", "Your should switch to Python!")
-    search.add_to_index("index-test", model1)
+    search.add_to_index("index-test", model1.to_dict())
     # Test query all
     ids, total = search.query_index("index-test", "*")
     assert sorted(ids) == [1, 2]
-- 
GitLab