From 8908e200fb5de0a70eb211d41e080379f73cd363 Mon Sep 17 00:00:00 2001
From: Matt Clarke <matt.clarke@ess.eu>
Date: Mon, 2 Dec 2024 14:16:51 +0100
Subject: [PATCH 01/11] added boolean for writing buffered or not

---
 .../messages_before_and_after_data.json       | 22 ++++++++++++-----
 .../test_messages_before_and_after.py         |  4 ++--
 src/Stream/MessageWriter.cpp                  | 10 ++++----
 src/Stream/MessageWriter.h                    |  5 ++--
 src/Stream/SourceFilter.cpp                   |  8 +++----
 src/Stream/SourceFilter.h                     |  3 ++-
 src/WriterModule/ad00/ad00_Writer.cpp         |  3 ++-
 src/WriterModule/ad00/ad00_Writer.h           |  3 ++-
 src/WriterModule/al00/al00_Writer.cpp         |  3 ++-
 src/WriterModule/al00/al00_Writer.h           |  3 ++-
 src/WriterModule/da00/da00_Writer.cpp         |  3 ++-
 src/WriterModule/da00/da00_Writer.h           |  3 ++-
 src/WriterModule/ep01/ep01_Writer.cpp         |  3 ++-
 src/WriterModule/ep01/ep01_Writer.h           |  3 ++-
 src/WriterModule/ev44/ev44_Writer.cpp         |  3 ++-
 src/WriterModule/ev44/ev44_Writer.h           |  3 ++-
 src/WriterModule/f144/f144_Writer.cpp         |  3 ++-
 src/WriterModule/f144/f144_Writer.h           |  3 ++-
 src/WriterModule/se00/se00_Writer.cpp         |  3 ++-
 src/WriterModule/se00/se00_Writer.h           |  3 ++-
 src/WriterModule/tdct/tdct_Writer.cpp         |  3 ++-
 src/WriterModule/tdct/tdct_Writer.h           |  3 ++-
 src/WriterModule/template/TemplateWriter.h    |  2 +-
 src/WriterModuleBase.h                        |  8 ++++---
 tests/SourceTests.cpp                         |  2 +-
 tests/Stream/MessageWriterTests.cpp           | 10 ++++----
 tests/Stream/SourceFilterTest.cpp             |  4 ++--
 tests/WriterModule/TemplateWriterTests.cpp    |  2 +-
 tests/WriterModule/al00_WriterTests.cpp       |  2 +-
 tests/WriterModule/da00_WriterTests.cpp       |  6 ++---
 tests/WriterModule/ep01_WriterTests.cpp       |  2 +-
 tests/WriterModule/ev44_WriterTests.cpp       | 24 +++++++++----------
 tests/WriterModule/f144_WriterTests.cpp       |  7 +++---
 tests/WriterModule/se00_WriterTests.cpp       | 10 ++++----
 tests/WriterModule/tdct_WriterTests.cpp       |  6 ++---
 tests/helpers/StubWriterModule.h              |  2 +-
 36 files changed, 110 insertions(+), 77 deletions(-)

diff --git a/domain-tests/data_files/messages_before_and_after_data.json b/domain-tests/data_files/messages_before_and_after_data.json
index da4e87128..91f5f8e50 100644
--- a/domain-tests/data_files/messages_before_and_after_data.json
+++ b/domain-tests/data_files/messages_before_and_after_data.json
@@ -102,31 +102,41 @@
   {
     "schema": "ev44",
     "topic": "local_detector",
-    "kafka_timestamp": 11000,
+    "kafka_timestamp": 10000,
     "source_name": "detector_events",
     "message_id": 667,
-    "reference_time": 11000,
+    "reference_time": 10000,
     "time_of_flight": [50, 60, 70, 80],
     "pixel_ids": [1, 2, 3, 4]
   },
   {
     "schema": "ev44",
     "topic": "local_detector",
-    "kafka_timestamp": 16000,
+    "kafka_timestamp": 12000,
     "source_name": "detector_events",
     "message_id": 668,
-    "reference_time": 16000,
+    "reference_time": 12000,
     "time_of_flight": [90, 100, 110, 120],
     "pixel_ids": [1, 2, 3, 4]
   },
   {
     "schema": "ev44",
     "topic": "local_detector",
-    "kafka_timestamp": 17000,
+    "kafka_timestamp": 16000,
     "source_name": "detector_events",
     "message_id": 669,
-    "reference_time": 17000,
+    "reference_time": 16000,
     "time_of_flight": [130, 140, 150, 160],
     "pixel_ids": [1, 2, 3, 4]
+  },
+  {
+    "schema": "ev44",
+    "topic": "local_detector",
+    "kafka_timestamp": 17000,
+    "source_name": "detector_events",
+    "message_id": 670,
+    "reference_time": 17000,
+    "time_of_flight": [170, 180, 190, 200],
+    "pixel_ids": [1, 2, 3, 4]
   }
 ]
diff --git a/domain-tests/test_messages_before_and_after.py b/domain-tests/test_messages_before_and_after.py
index 4a858cb66..ec474814b 100644
--- a/domain-tests/test_messages_before_and_after.py
+++ b/domain-tests/test_messages_before_and_after.py
@@ -27,11 +27,11 @@ def test_first_f144_data_after_stop_written_but_later_values_ignored(local_file)
         assert f["/entry/instrument/chopper/delay/time"][~0] == 16_000_000_000
 
 
-def test_last_ev44_data_before_start_is_written_but_earlier_values_ignored(local_file):
+def test_ev44_data_before_start_is_not_written(local_file):
     with h5py.File(local_file, "r") as f:
         assert (
             f["/entry/instrument/event_detector/events/event_time_zero"][0]
-            == 9_000_000_000
+            == 10_000_000_000
         )
 
 
diff --git a/src/Stream/MessageWriter.cpp b/src/Stream/MessageWriter.cpp
index 2625bf879..f02e36a43 100644
--- a/src/Stream/MessageWriter.cpp
+++ b/src/Stream/MessageWriter.cpp
@@ -51,16 +51,18 @@ MessageWriter::~MessageWriter() {
   }
 }
 
-void MessageWriter::addMessage(Message const &Msg) {
-  WriteJobs.enqueue([=]() { writeMsgImpl(Msg.DestPtr, Msg.FbMsg); });
+void MessageWriter::addMessage(Message const &Msg, bool is_buffered_message) {
+  WriteJobs.enqueue(
+      [=]() { writeMsgImpl(Msg.DestPtr, Msg.FbMsg, is_buffered_message); });
 }
 
 void MessageWriter::stop() { RunThread.store(false); }
 
 void MessageWriter::writeMsgImpl(WriterModule::Base *ModulePtr,
-                                 FileWriter::FlatbufferMessage const &Msg) {
+                                 FileWriter::FlatbufferMessage const &Msg,
+                                 bool is_buffered_message) {
   try {
-    ModulePtr->write(Msg);
+    ModulePtr->write(Msg, is_buffered_message);
     WritesDone++;
   } catch (WriterModule::WriterException &E) {
     WriteErrors++;
diff --git a/src/Stream/MessageWriter.h b/src/Stream/MessageWriter.h
index 9ef768597..e7e75687c 100644
--- a/src/Stream/MessageWriter.h
+++ b/src/Stream/MessageWriter.h
@@ -38,7 +38,7 @@ public:
 
   virtual ~MessageWriter();
 
-  virtual void addMessage(Message const &Msg);
+  virtual void addMessage(Message const &Msg, bool is_buffered_message);
 
   /// \brief Tell the writer thread to stop.
   ///
@@ -60,7 +60,8 @@ public:
 
 protected:
   virtual void writeMsgImpl(WriterModule::Base *ModulePtr,
-                            FileWriter::FlatbufferMessage const &Msg);
+                            FileWriter::FlatbufferMessage const &Msg,
+                            bool is_buffered_message);
   virtual void threadFunction();
 
   virtual void flushData() { FlushDataFunction(); };
diff --git a/src/Stream/SourceFilter.cpp b/src/Stream/SourceFilter.cpp
index 77a5cd282..5defc44cf 100644
--- a/src/Stream/SourceFilter.cpp
+++ b/src/Stream/SourceFilter.cpp
@@ -36,7 +36,7 @@ bool SourceFilter::has_finished() const { return _is_finished; }
 
 void SourceFilter::forward_buffered_message() {
   if (_buffered_message.isValid()) {
-    forward_message(_buffered_message);
+    forward_message(_buffered_message, true);
     _buffered_message = FileWriter::FlatbufferMessage();
   }
 }
@@ -92,11 +92,11 @@ bool SourceFilter::filter_message(
   return true;
 }
 
-void SourceFilter::forward_message(
-    FileWriter::FlatbufferMessage const &message) {
+void SourceFilter::forward_message(FileWriter::FlatbufferMessage const &message,
+                                   bool is_buffered_message) {
   ++MessagesTransmitted;
   for (auto const &writer_module : _destination_writer_modules) {
-    _writer->addMessage({writer_module, message});
+    _writer->addMessage({writer_module, message}, is_buffered_message);
   }
 }
 
diff --git a/src/Stream/SourceFilter.h b/src/Stream/SourceFilter.h
index 60b63f61a..443ce20da 100644
--- a/src/Stream/SourceFilter.h
+++ b/src/Stream/SourceFilter.h
@@ -58,7 +58,8 @@ public:
   }
 
 private:
-  void forward_message(FileWriter::FlatbufferMessage const &message);
+  void forward_message(FileWriter::FlatbufferMessage const &message,
+                       bool is_buffered_message = false);
   void forward_buffered_message();
   time_point _start_time;
   time_point _stop_time;
diff --git a/src/WriterModule/ad00/ad00_Writer.cpp b/src/WriterModule/ad00/ad00_Writer.cpp
index 10524503b..39ab476d4 100644
--- a/src/WriterModule/ad00/ad00_Writer.cpp
+++ b/src/WriterModule/ad00/ad00_Writer.cpp
@@ -139,7 +139,8 @@ void msgTypeIsConfigType(ad00_Writer::Type ConfigType, DType MsgType) {
   }
 }
 
-void ad00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message) {
+void ad00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message,
+                            [[maybe_unused]] bool is_buffered_message) {
   auto ad00 = Getad00_ADArray(Message.data());
   auto DataShape =
       hdf5::Dimensions(ad00->dimensions()->begin(), ad00->dimensions()->end());
diff --git a/src/WriterModule/ad00/ad00_Writer.h b/src/WriterModule/ad00/ad00_Writer.h
index 1975a35b1..4d2ed1c9c 100644
--- a/src/WriterModule/ad00/ad00_Writer.h
+++ b/src/WriterModule/ad00/ad00_Writer.h
@@ -33,7 +33,8 @@ public:
 
   InitResult reopen(hdf5::node::Group &HDFGroup) override;
 
-  void writeImpl(FileWriter::FlatbufferMessage const &Message) override;
+  void writeImpl(FileWriter::FlatbufferMessage const &Message,
+                 bool is_buffered_message) override;
 
   enum class Type {
     int8,
diff --git a/src/WriterModule/al00/al00_Writer.cpp b/src/WriterModule/al00/al00_Writer.cpp
index ac56b12ac..17e98caff 100644
--- a/src/WriterModule/al00/al00_Writer.cpp
+++ b/src/WriterModule/al00/al00_Writer.cpp
@@ -54,7 +54,8 @@ InitResult al00_Writer::reopen(hdf5::node::Group &HDFGroup) {
   return InitResult::OK;
 }
 
-void al00_Writer::writeImpl(FlatbufferMessage const &Message) {
+void al00_Writer::writeImpl(FlatbufferMessage const &Message,
+                            [[maybe_unused]] bool is_buffered_message) {
   auto AlarmMessage = GetAlarm(Message.data());
 
   AlarmTime.appendElement(AlarmMessage->timestamp());
diff --git a/src/WriterModule/al00/al00_Writer.h b/src/WriterModule/al00/al00_Writer.h
index 87ce75656..52bb2c21e 100644
--- a/src/WriterModule/al00/al00_Writer.h
+++ b/src/WriterModule/al00/al00_Writer.h
@@ -33,7 +33,8 @@ public:
   WriterModule::InitResult reopen(hdf5::node::Group &HDFGroup) override;
 
   /// Write an incoming message which should contain a flatbuffer.
-  void writeImpl(FlatbufferMessage const &Message) override;
+  void writeImpl(FlatbufferMessage const &Message,
+                 bool is_buffered_message) override;
 
   al00_Writer() : WriterModule::Base("al00", false, "NXlog") {}
   ~al00_Writer() override = default;
diff --git a/src/WriterModule/da00/da00_Writer.cpp b/src/WriterModule/da00/da00_Writer.cpp
index eb94cb8a5..3c15c29ac 100644
--- a/src/WriterModule/da00/da00_Writer.cpp
+++ b/src/WriterModule/da00/da00_Writer.cpp
@@ -330,7 +330,8 @@ InitResult da00_Writer::reopen(hdf5::node::Group &HDFGroup) {
   return InitResult::OK;
 }
 
-void da00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message) {
+void da00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message,
+                            [[maybe_unused]] bool is_buffered_message) {
   const auto da00_obj = Getda00_DataArray(Message.data());
   if (isFirstMessage) {
     handle_first_message(da00_obj);
diff --git a/src/WriterModule/da00/da00_Writer.h b/src/WriterModule/da00/da00_Writer.h
index c07d987bb..1f0367840 100644
--- a/src/WriterModule/da00/da00_Writer.h
+++ b/src/WriterModule/da00/da00_Writer.h
@@ -36,7 +36,8 @@ public:
 
   InitResult reopen(hdf5::node::Group &HDFGroup) override;
 
-  void writeImpl(FileWriter::FlatbufferMessage const &Message) override;
+  void writeImpl(FileWriter::FlatbufferMessage const &Message,
+                 bool is_buffered_message) override;
 
   NeXusDataset::Time Timestamp;
   NeXusDataset::CueIndex CueIndex;
diff --git a/src/WriterModule/ep01/ep01_Writer.cpp b/src/WriterModule/ep01/ep01_Writer.cpp
index 6b13d5786..b0cf09150 100644
--- a/src/WriterModule/ep01/ep01_Writer.cpp
+++ b/src/WriterModule/ep01/ep01_Writer.cpp
@@ -35,7 +35,8 @@ InitResult ep01_Writer::init_hdf(hdf5::node::Group &HDFGroup) {
   return InitResult::OK;
 }
 
-void ep01_Writer::writeImpl(FileWriter::FlatbufferMessage const &Message) {
+void ep01_Writer::writeImpl(FileWriter::FlatbufferMessage const &Message,
+                            [[maybe_unused]] bool is_buffered_message) {
   auto FlatBuffer = GetEpicsPVConnectionInfo(Message.data());
   std::int16_t Status = static_cast<std::int16_t>(FlatBuffer->status());
   StatusDataset.appendElement(Status);
diff --git a/src/WriterModule/ep01/ep01_Writer.h b/src/WriterModule/ep01/ep01_Writer.h
index de0537c92..2013876ce 100644
--- a/src/WriterModule/ep01/ep01_Writer.h
+++ b/src/WriterModule/ep01/ep01_Writer.h
@@ -9,7 +9,8 @@ class ep01_Writer final : public WriterModule::Base {
 public:
   InitResult init_hdf(hdf5::node::Group &HDFGroup) override;
   InitResult reopen(hdf5::node::Group &HDFGroup) override;
-  void writeImpl(FileWriter::FlatbufferMessage const &Message) override;
+  void writeImpl(FileWriter::FlatbufferMessage const &Message,
+                 bool is_buffered_message) override;
 
   ep01_Writer() : WriterModule::Base("ep01", false, "NXlog") {}
   ~ep01_Writer() override = default;
diff --git a/src/WriterModule/ev44/ev44_Writer.cpp b/src/WriterModule/ev44/ev44_Writer.cpp
index b7a308846..4458a17fe 100644
--- a/src/WriterModule/ev44/ev44_Writer.cpp
+++ b/src/WriterModule/ev44/ev44_Writer.cpp
@@ -89,7 +89,8 @@ WriterModule::InitResult ev44_Writer::reopen(hdf5::node::Group &HDFGroup) {
   return WriterModule::InitResult::OK;
 }
 
-void ev44_Writer::writeImpl(FlatbufferMessage const &Message) {
+void ev44_Writer::writeImpl(FlatbufferMessage const &Message,
+                            [[maybe_unused]] bool is_buffered_message) {
   auto EventMsgFlatbuffer = GetEvent44Message(Message.data());
   auto CurrentNumberOfEvents = EventMsgFlatbuffer->time_of_flight()->size();
   if (EventMsgFlatbuffer->pixel_id()->size() > 0 &&
diff --git a/src/WriterModule/ev44/ev44_Writer.h b/src/WriterModule/ev44/ev44_Writer.h
index 0850818dc..850378082 100644
--- a/src/WriterModule/ev44/ev44_Writer.h
+++ b/src/WriterModule/ev44/ev44_Writer.h
@@ -27,7 +27,8 @@ public:
   /// \brief Write flatbuffer message.
   ///
   /// \param FlatBufferMessage
-  void writeImpl(FlatbufferMessage const &Message) override;
+  void writeImpl(FlatbufferMessage const &Message,
+                 bool is_buffered_message) override;
 
   NeXusDataset::EventTimeOffset EventTimeOffset;
   NeXusDataset::EventId EventId;
diff --git a/src/WriterModule/f144/f144_Writer.cpp b/src/WriterModule/f144/f144_Writer.cpp
index 7c21d83f0..ae7e15ab6 100644
--- a/src/WriterModule/f144/f144_Writer.cpp
+++ b/src/WriterModule/f144/f144_Writer.cpp
@@ -194,7 +194,8 @@ void msgTypeIsConfigType(f144_Writer::Type ConfigType, Value MsgType) {
   }
 }
 
-void f144_Writer::writeImpl(FlatbufferMessage const &Message) {
+void f144_Writer::writeImpl(FlatbufferMessage const &Message,
+                            [[maybe_unused]] bool is_buffered_message) {
   auto LogDataMessage = Getf144_LogData(Message.data());
   Timestamp.appendElement(LogDataMessage->timestamp());
   auto Type = LogDataMessage->value_type();
diff --git a/src/WriterModule/f144/f144_Writer.h b/src/WriterModule/f144/f144_Writer.h
index 1841d7941..f31dba956 100644
--- a/src/WriterModule/f144/f144_Writer.h
+++ b/src/WriterModule/f144/f144_Writer.h
@@ -39,7 +39,8 @@ public:
   WriterModule::InitResult reopen(hdf5::node::Group &HDFGroup) override;
 
   /// Write an incoming message which should contain a flatbuffer.
-  void writeImpl(FlatbufferMessage const &Message) override;
+  void writeImpl(FlatbufferMessage const &Message,
+                 bool is_buffered_message) override;
 
   f144_Writer()
       : WriterModule::Base("f144", false, "NXlog",
diff --git a/src/WriterModule/se00/se00_Writer.cpp b/src/WriterModule/se00/se00_Writer.cpp
index 0880a4892..1514eeec0 100644
--- a/src/WriterModule/se00/se00_Writer.cpp
+++ b/src/WriterModule/se00/se00_Writer.cpp
@@ -132,7 +132,8 @@ void msgTypeIsConfigType(se00_Writer::Type ConfigType, ValueUnion MsgType) {
   }
 }
 
-void se00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message) {
+void se00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message,
+                            [[maybe_unused]] bool is_buffered_message) {
   auto FbPointer = Getse00_SampleEnvironmentData(Message.data());
   auto CueIndexValue = Value->current_size();
   auto ValuesType = FbPointer->values_type();
diff --git a/src/WriterModule/se00/se00_Writer.h b/src/WriterModule/se00/se00_Writer.h
index 43a770d13..175db231a 100644
--- a/src/WriterModule/se00/se00_Writer.h
+++ b/src/WriterModule/se00/se00_Writer.h
@@ -44,7 +44,8 @@ public:
 
   InitResult reopen(hdf5::node::Group &HDFGroup) override;
 
-  void writeImpl(FlatbufferMessage const &Message) override;
+  void writeImpl(FlatbufferMessage const &Message,
+                 bool is_buffered_message) override;
 
   enum class Type {
     int8,
diff --git a/src/WriterModule/tdct/tdct_Writer.cpp b/src/WriterModule/tdct/tdct_Writer.cpp
index fa0551a64..e7218b18c 100644
--- a/src/WriterModule/tdct/tdct_Writer.cpp
+++ b/src/WriterModule/tdct/tdct_Writer.cpp
@@ -64,7 +64,8 @@ WriterModule::InitResult tdct_Writer::reopen(hdf5::node::Group &HDFGroup) {
   return WriterModule::InitResult::OK;
 }
 
-void tdct_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message) {
+void tdct_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message,
+                            [[maybe_unused]] bool is_buffered_message) {
   auto FbPointer = Gettimestamp(Message.data());
   auto TempTimePtr = FbPointer->timestamps()->data();
   auto TempTimeSize = FbPointer->timestamps()->size();
diff --git a/src/WriterModule/tdct/tdct_Writer.h b/src/WriterModule/tdct/tdct_Writer.h
index d82d770b3..f82f7c350 100644
--- a/src/WriterModule/tdct/tdct_Writer.h
+++ b/src/WriterModule/tdct/tdct_Writer.h
@@ -34,7 +34,8 @@ public:
 
   InitResult reopen(hdf5::node::Group &HDFGroup) override;
 
-  void writeImpl(FlatbufferMessage const &Message) override;
+  void writeImpl(FlatbufferMessage const &Message,
+                 bool is_buffered_message) override;
 
 protected:
   NeXusDataset::Time Timestamp;
diff --git a/src/WriterModule/template/TemplateWriter.h b/src/WriterModule/template/TemplateWriter.h
index 867e15f3d..c0192f316 100644
--- a/src/WriterModule/template/TemplateWriter.h
+++ b/src/WriterModule/template/TemplateWriter.h
@@ -186,7 +186,7 @@ public:
   /// \param Message The structure containing a pointer to a buffer
   /// containing data received from the Kafka broker and the size of the buffer.
   // cppcheck-suppress functionStatic
-  void writeImpl(FileWriter::FlatbufferMessage const &/*Message*/) override {
+  void writeImpl([[maybe_unused]] FileWriter::FlatbufferMessage const &message, [[maybe_unused]]bool is_buffered_message) override {
     std::cout << "WriterClass::writeImpl()\n";
   }
 };
diff --git a/src/WriterModuleBase.h b/src/WriterModuleBase.h
index b0f98ddc3..fd0ed03a3 100644
--- a/src/WriterModuleBase.h
+++ b/src/WriterModuleBase.h
@@ -111,15 +111,17 @@ public:
   /// \brief Increment write counter and call the subclass-specific write logic.
   ///
   /// \param msg The message to process
-  void write(FileWriter::FlatbufferMessage const &Message) {
-    writeImpl(Message);
+  void write(FileWriter::FlatbufferMessage const &Message,
+             bool is_buffered_message) {
+    writeImpl(Message, is_buffered_message);
     WriteCount++;
   }
 
   /// \brief Process the message in some way, for example write to the HDF file.
   ///
   /// \param msg The message to process
-  virtual void writeImpl(FileWriter::FlatbufferMessage const &Message) = 0;
+  virtual void writeImpl(FileWriter::FlatbufferMessage const &Message,
+                         bool is_buffered_message) = 0;
 
   void registerField(JsonConfig::FieldBase *Ptr) {
     ConfigHandler.registerField(Ptr);
diff --git a/tests/SourceTests.cpp b/tests/SourceTests.cpp
index f2d8c5d30..efc2a24c4 100644
--- a/tests/SourceTests.cpp
+++ b/tests/SourceTests.cpp
@@ -34,7 +34,7 @@ public:
 
 class WriterModuleMock : public StubWriterModule {
 public:
-  MAKE_MOCK1(writeImpl, void(FlatbufferMessage const &), override);
+  MAKE_MOCK2(writeImpl, void(FlatbufferMessage const &, bool), override);
 };
 
 TEST_F(SourceTests, ConstructorSetsMembers) {
diff --git a/tests/Stream/MessageWriterTests.cpp b/tests/Stream/MessageWriterTests.cpp
index bff7af812..d4155a233 100644
--- a/tests/Stream/MessageWriterTests.cpp
+++ b/tests/Stream/MessageWriterTests.cpp
@@ -25,7 +25,7 @@ public:
   ~WriterModuleStandIn() = default;
   MAKE_MOCK1(init_hdf, WriterModule::InitResult(hdf5::node::Group &), override);
   MAKE_MOCK1(reopen, WriterModule::InitResult(hdf5::node::Group &), override);
-  MAKE_MOCK1(writeImpl, void(FileWriter::FlatbufferMessage const &), override);
+  MAKE_MOCK2(writeImpl, void(FileWriter::FlatbufferMessage const &, bool), override);
 };
 
 class DataMessageWriterTest : public ::testing::Test {
@@ -79,7 +79,7 @@ TEST_F(DataMessageWriterTest, EnableExtraModule) {
 }
 
 TEST_F(DataMessageWriterTest, WriteMessageSuccess) {
-  REQUIRE_CALL(WriterModule, writeImpl(_)).TIMES(1);
+  REQUIRE_CALL(WriterModule, writeImpl(_, _)).TIMES(1);
   auto InitialWriteCount = WriterModule.getWriteCount();
   FileWriter::FlatbufferMessage Msg;
   Stream::Message SomeMessage(
@@ -87,7 +87,7 @@ TEST_F(DataMessageWriterTest, WriteMessageSuccess) {
   {
     Stream::MessageWriter Writer{
         []() {}, 1s, std::make_unique<Metrics::Registrar>("some_prefix")};
-    Writer.addMessage(SomeMessage);
+    Writer.addMessage(SomeMessage, false);
     Writer.runJob([&Writer]() {
       EXPECT_TRUE(Writer.nrOfWritesDone() == 1);
       EXPECT_TRUE(Writer.nrOfWriteErrors() == 0);
@@ -97,7 +97,7 @@ TEST_F(DataMessageWriterTest, WriteMessageSuccess) {
 }
 
 TEST_F(DataMessageWriterTest, WriteMessageExceptionUnknownFb) {
-  REQUIRE_CALL(WriterModule, writeImpl(_))
+  REQUIRE_CALL(WriterModule, writeImpl(_, _))
       .TIMES(1)
       .THROW(WriterModule::WriterException("Some error."));
   auto InitialWriteCount = WriterModule.getWriteCount();
@@ -110,7 +110,7 @@ TEST_F(DataMessageWriterTest, WriteMessageExceptionUnknownFb) {
     Writer.runJob([&Writer]() {
       EXPECT_TRUE(Writer.nrOfWriterModulesWithErrors() == 1);
     });
-    Writer.addMessage(SomeMessage);
+    Writer.addMessage(SomeMessage, false);
     Writer.runJob([&Writer]() {
       EXPECT_TRUE(Writer.nrOfWritesDone() == 0);
       EXPECT_TRUE(Writer.nrOfWriteErrors() == 1);
diff --git a/tests/Stream/SourceFilterTest.cpp b/tests/Stream/SourceFilterTest.cpp
index 1f4d0c562..25d358bd7 100644
--- a/tests/Stream/SourceFilterTest.cpp
+++ b/tests/Stream/SourceFilterTest.cpp
@@ -17,8 +17,8 @@ class StubMessageWriter : public Stream::MessageWriter {
 public:
   StubMessageWriter()
       : MessageWriter([]() {}, 1s, std::make_unique<Metrics::Registrar>("")) {}
-  void addMessage(Stream::Message const &Msg) override {
-    messages_received.emplace_back(Msg);
+  void addMessage(Stream::Message const &message, [[maybe_unused]] bool is_buffered_message) override {
+    messages_received.emplace_back(message);
   }
   std::vector<Stream::Message> messages_received;
 };
diff --git a/tests/WriterModule/TemplateWriterTests.cpp b/tests/WriterModule/TemplateWriterTests.cpp
index 09f9e2037..f23a9e7b6 100644
--- a/tests/WriterModule/TemplateWriterTests.cpp
+++ b/tests/WriterModule/TemplateWriterTests.cpp
@@ -15,5 +15,5 @@ TEST(TemplateTests, WriterReturnValues) {
   hdf5::node::Group SomeGroup;
   EXPECT_TRUE(SomeWriter.init_hdf(SomeGroup) == WriterModule::InitResult::OK);
   EXPECT_TRUE(SomeWriter.reopen(SomeGroup) == WriterModule::InitResult::OK);
-  EXPECT_NO_THROW(SomeWriter.write(FileWriter::FlatbufferMessage()));
+  EXPECT_NO_THROW(SomeWriter.write(FileWriter::FlatbufferMessage(), false));
 }
diff --git a/tests/WriterModule/al00_WriterTests.cpp b/tests/WriterModule/al00_WriterTests.cpp
index d0c103b99..5e9f6d9c8 100644
--- a/tests/WriterModule/al00_WriterTests.cpp
+++ b/tests/WriterModule/al00_WriterTests.cpp
@@ -89,7 +89,7 @@ TEST_F(EPICS_AlarmWriter, WriteDataOnce) {
   EXPECT_TRUE(Writer.init_hdf(UsedGroup) == InitResult::OK);
   EXPECT_TRUE(Writer.reopen(UsedGroup) == InitResult::OK);
   FileWriter::FlatbufferMessage TestMsg(Buffer.get(), BufferSize);
-  EXPECT_NO_THROW(Writer.write(TestMsg));
+  EXPECT_NO_THROW(Writer.write(TestMsg, false));
   auto AlarmMsgDataset = UsedGroup.get_dataset("alarm_message");
   auto AlarmSeverityDataset = UsedGroup.get_dataset("alarm_severity");
   auto AlarmTimeDataset = UsedGroup.get_dataset("alarm_time");
diff --git a/tests/WriterModule/da00_WriterTests.cpp b/tests/WriterModule/da00_WriterTests.cpp
index c37d0bc7c..3d46bfe67 100644
--- a/tests/WriterModule/da00_WriterTests.cpp
+++ b/tests/WriterModule/da00_WriterTests.cpp
@@ -747,7 +747,7 @@ TEST_F(da00_WriterTestFixture, da00_WriterWriteExtendsShape) {
     writer.init_hdf(_group);
     writer.reopen(_group);
     for (int i = 0; i < write_count; ++i)
-      EXPECT_NO_THROW(writer.write(message));
+      EXPECT_NO_THROW(writer.write(message, false));
     EXPECT_EQ(write_count, writer.Timestamp.current_size());
   }
   for (const auto &name :
@@ -774,7 +774,7 @@ TEST_F(da00_WriterTestFixture, da00_WriterWriteCopiesData) {
     writer.reopen(_group);
     constexpr int write_count{10};
     for (int i = 0; i < write_count; ++i)
-      EXPECT_NO_THROW(writer.write(message));
+      EXPECT_NO_THROW(writer.write(message, false));
     EXPECT_EQ(write_count, writer.Timestamp.current_size());
   }
   const auto shape = Simple(_group.get_dataset("signal").dataspace());
@@ -797,7 +797,7 @@ TEST_F(da00_WriterTestFixture, da00_WriterFillsInAttributes) {
     writer.reopen(_group);
     constexpr int write_count{10};
     for (int i = 0; i < write_count; ++i)
-      EXPECT_NO_THROW(writer.write(message));
+      EXPECT_NO_THROW(writer.write(message, false));
     EXPECT_EQ(write_count, writer.Timestamp.current_size());
   }
   for (const auto &name : {"signal", "signal_error", "gain", "x", "y", "time",
diff --git a/tests/WriterModule/ep01_WriterTests.cpp b/tests/WriterModule/ep01_WriterTests.cpp
index 088bf7c2c..40664fc91 100644
--- a/tests/WriterModule/ep01_WriterTests.cpp
+++ b/tests/WriterModule/ep01_WriterTests.cpp
@@ -87,7 +87,7 @@ TEST_F(EPICS_ConStatusWriter, WriteDataOnce) {
   EXPECT_TRUE(Writer.init_hdf(UsedGroup) == InitResult::OK);
   EXPECT_TRUE(Writer.reopen(UsedGroup) == InitResult::OK);
   FileWriter::FlatbufferMessage TestMsg(Buffer.get(), BufferSize);
-  EXPECT_NO_THROW(Writer.write(TestMsg));
+  EXPECT_NO_THROW(Writer.write(TestMsg, false));
   auto ConStatusTimeDataset = UsedGroup.get_dataset("connection_status_time");
   auto ConStatusDataset = UsedGroup.get_dataset("connection_status");
   auto FbPointer = GetEpicsPVConnectionInfo(TestMsg.data());
diff --git a/tests/WriterModule/ev44_WriterTests.cpp b/tests/WriterModule/ev44_WriterTests.cpp
index 748f7de19..f2e7b1f86 100644
--- a/tests/WriterModule/ev44_WriterTests.cpp
+++ b/tests/WriterModule/ev44_WriterTests.cpp
@@ -163,7 +163,7 @@ TEST_F(Event44WriterTests,
     WriterModule::ev44::ev44_Writer Writer;
     EXPECT_TRUE(Writer.init_hdf(TestGroup) == InitResult::OK);
     EXPECT_TRUE(Writer.reopen(TestGroup) == InitResult::OK);
-    EXPECT_NO_THROW(Writer.write(TestMessage));
+    EXPECT_NO_THROW(Writer.write(TestMessage, false));
   } // These braces are required due to "h5.cpp"
 
   // Read data from the file
@@ -217,7 +217,7 @@ TEST_F(Event44WriterTests, WriterSuccessfullyRecordsEventDataWithoutPixelIds) {
     WriterModule::ev44::ev44_Writer Writer;
     EXPECT_TRUE(Writer.init_hdf(TestGroup) == InitResult::OK);
     EXPECT_TRUE(Writer.reopen(TestGroup) == InitResult::OK);
-    EXPECT_NO_THROW(Writer.write(TestMessage));
+    EXPECT_NO_THROW(Writer.write(TestMessage, false));
   } // These braces are required due to "h5.cpp"
 
   // Read data from the file
@@ -280,8 +280,8 @@ TEST_F(Event44WriterTests, WriterSuccessfullyRecordsEventDataFromTwoMessages) {
     WriterModule::ev44::ev44_Writer Writer;
     EXPECT_TRUE(Writer.init_hdf(TestGroup) == InitResult::OK);
     EXPECT_TRUE(Writer.reopen(TestGroup) == InitResult::OK);
-    EXPECT_NO_THROW(Writer.write(TestMessage1));
-    EXPECT_NO_THROW(Writer.write(TestMessage2));
+    EXPECT_NO_THROW(Writer.write(TestMessage1, false));
+    EXPECT_NO_THROW(Writer.write(TestMessage2, false));
   } // These braces are required due to "h5.cpp"
 
   // Read data from the file
@@ -369,9 +369,9 @@ TEST_F(Event44WriterTests, WriterSuccessfullyHandlesMessageWithNoEvents) {
     WriterModule::ev44::ev44_Writer Writer;
     EXPECT_TRUE(Writer.init_hdf(TestGroup) == InitResult::OK);
     EXPECT_TRUE(Writer.reopen(TestGroup) == InitResult::OK);
-    EXPECT_NO_THROW(Writer.write(TestMessage1));
-    EXPECT_NO_THROW(Writer.write(TestMessage2));
-    EXPECT_NO_THROW(Writer.write(TestMessage3));
+    EXPECT_NO_THROW(Writer.write(TestMessage1, false));
+    EXPECT_NO_THROW(Writer.write(TestMessage2, false));
+    EXPECT_NO_THROW(Writer.write(TestMessage3, false));
   } // These braces are required due to "h5.cpp"
 
   // Read data from the file
@@ -445,7 +445,7 @@ TEST_F(Event44WriterTests, PulseTimeIsRepeatedWithinSameMessage) {
     WriterModule::ev44::ev44_Writer Writer;
     EXPECT_TRUE(Writer.init_hdf(TestGroup) == InitResult::OK);
     EXPECT_TRUE(Writer.reopen(TestGroup) == InitResult::OK);
-    EXPECT_NO_THROW(Writer.write(TestMessage));
+    EXPECT_NO_THROW(Writer.write(TestMessage, false));
   } // These braces are required due to "h5.cpp"
 
   // Read data from the file
@@ -510,8 +510,8 @@ TEST_F(Event44WriterTests, LastPulseTimeIsRepeatedInSubsequentMessage) {
     WriterModule::ev44::ev44_Writer Writer;
     EXPECT_TRUE(Writer.init_hdf(TestGroup) == InitResult::OK);
     EXPECT_TRUE(Writer.reopen(TestGroup) == InitResult::OK);
-    EXPECT_NO_THROW(Writer.write(TestMessage1));
-    EXPECT_NO_THROW(Writer.write(TestMessage2));
+    EXPECT_NO_THROW(Writer.write(TestMessage1, false));
+    EXPECT_NO_THROW(Writer.write(TestMessage2, false));
   } // These braces are required due to "h5.cpp"
 
   // Read data from the file
@@ -581,8 +581,8 @@ TEST_F(Event44WriterTests, CuesFromTwoMessagesAreRecorded) {
     Writer.setCueInterval(1);
     EXPECT_TRUE(Writer.init_hdf(TestGroup) == InitResult::OK);
     EXPECT_TRUE(Writer.reopen(TestGroup) == InitResult::OK);
-    EXPECT_NO_THROW(Writer.write(TestMessage1));
-    EXPECT_NO_THROW(Writer.write(TestMessage2));
+    EXPECT_NO_THROW(Writer.write(TestMessage1, false));
+    EXPECT_NO_THROW(Writer.write(TestMessage2, false));
   } // These braces are required due to "h5.cpp"
 
   // Read data from the file
diff --git a/tests/WriterModule/f144_WriterTests.cpp b/tests/WriterModule/f144_WriterTests.cpp
index 05b73f298..0948b4c32 100644
--- a/tests/WriterModule/f144_WriterTests.cpp
+++ b/tests/WriterModule/f144_WriterTests.cpp
@@ -313,7 +313,7 @@ TEST_F(f144Init, write_one_element) {
   EXPECT_EQ(FlatbufferMsg.getFlatbufferID(), "f144");
   EXPECT_EQ(TestWriter.Values.size(), 0u);
   EXPECT_EQ(TestWriter.Timestamp.current_size(), 0);
-  TestWriter.write(FlatbufferMsg);
+  TestWriter.write(FlatbufferMsg, false);
   EXPECT_EQ(TestWriter.Values.size(), 1u);
   ASSERT_EQ(TestWriter.Timestamp.current_size(), 1);
   std::vector<double> WrittenValues(1);
@@ -338,7 +338,8 @@ TEST_F(f144Init, write_one_default_value_element) {
   EXPECT_EQ(TestWriter.Values.size(), 0u);
   EXPECT_EQ(TestWriter.Timestamp.current_size(), 0);
   TestWriter.write(FileWriter::FlatbufferMessage(FlatbufferData.first.get(),
-                                                 FlatbufferData.second));
+                                                 FlatbufferData.second),
+                   false);
   EXPECT_EQ(TestWriter.Values.size(), 1u);
   ASSERT_EQ(TestWriter.Timestamp.current_size(), 1);
   std::vector<double> WrittenValues(1);
@@ -360,7 +361,7 @@ TEST_F(f144Init, write_multiple_elements) {
     auto [buffer, size] =
         f144_schema::generateFlatbufferMessage(values[i], timestamps[i]);
     FileWriter::FlatbufferMessage FlatbufferMsg(buffer.get(), size);
-    TestWriter.write(FlatbufferMsg);
+    TestWriter.write(FlatbufferMsg, false);
   }
 
   std::vector<double> WrittenValues(values.size());
diff --git a/tests/WriterModule/se00_WriterTests.cpp b/tests/WriterModule/se00_WriterTests.cpp
index b7a663454..74bb73a92 100644
--- a/tests/WriterModule/se00_WriterTests.cpp
+++ b/tests/WriterModule/se00_WriterTests.cpp
@@ -109,7 +109,7 @@ TEST_F(se00Writer, WriteDataOnce) {
   EXPECT_TRUE(Writer.init_hdf(UsedGroup) == InitResult::OK);
   EXPECT_TRUE(Writer.reopen(UsedGroup) == InitResult::OK);
   FileWriter::FlatbufferMessage TestMsg(Buffer.get(), BufferSize);
-  EXPECT_NO_THROW(Writer.write(TestMsg));
+  EXPECT_NO_THROW(Writer.write(TestMsg, false));
   auto RawValuesDataset = UsedGroup.get_dataset("value");
   auto TimestampDataset = UsedGroup.get_dataset("time");
   auto CueIndexDataset = UsedGroup.get_dataset("cue_index");
@@ -151,8 +151,8 @@ TEST_F(se00Writer, WriteDataTwice) {
   EXPECT_TRUE(Writer.init_hdf(UsedGroup) == InitResult::OK);
   EXPECT_TRUE(Writer.reopen(UsedGroup) == InitResult::OK);
   FileWriter::FlatbufferMessage TestMsg(Buffer.get(), BufferSize);
-  EXPECT_NO_THROW(Writer.write(TestMsg));
-  EXPECT_NO_THROW(Writer.write(TestMsg));
+  EXPECT_NO_THROW(Writer.write(TestMsg, false));
+  EXPECT_NO_THROW(Writer.write(TestMsg, false));
   auto RawValuesDataset = UsedGroup.get_dataset("value");
   auto TimestampDataset = UsedGroup.get_dataset("time");
   auto CueIndexDataset = UsedGroup.get_dataset("cue_index");
@@ -190,7 +190,7 @@ TEST_F(se00Writer, WriteNoElements) {
   EXPECT_TRUE(Writer.init_hdf(UsedGroup) == InitResult::OK);
   EXPECT_TRUE(Writer.reopen(UsedGroup) == InitResult::OK);
   FileWriter::FlatbufferMessage TestMsg(Buffer.get(), BufferSize);
-  EXPECT_NO_THROW(Writer.write(TestMsg));
+  EXPECT_NO_THROW(Writer.write(TestMsg, false));
   auto RawValuesDataset = UsedGroup.get_dataset("value");
   auto TimestampDataset = UsedGroup.get_dataset("time");
   auto CueIndexDataset = UsedGroup.get_dataset("cue_index");
@@ -209,7 +209,7 @@ TEST_F(se00Writer, WriteDataWithNoTimestampsInFB) {
   EXPECT_TRUE(Writer.init_hdf(UsedGroup) == InitResult::OK);
   EXPECT_TRUE(Writer.reopen(UsedGroup) == InitResult::OK);
   FileWriter::FlatbufferMessage TestMsg(Buffer.get(), BufferSize);
-  EXPECT_NO_THROW(Writer.write(TestMsg));
+  EXPECT_NO_THROW(Writer.write(TestMsg, false));
   auto RawValuesDataset = UsedGroup.get_dataset("value");
   auto TimestampDataset = UsedGroup.get_dataset("time");
   auto CueIndexDataset = UsedGroup.get_dataset("cue_index");
diff --git a/tests/WriterModule/tdct_WriterTests.cpp b/tests/WriterModule/tdct_WriterTests.cpp
index a80559b55..e8d0e8d41 100644
--- a/tests/WriterModule/tdct_WriterTests.cpp
+++ b/tests/WriterModule/tdct_WriterTests.cpp
@@ -89,7 +89,7 @@ TEST_F(ChopperTimeStampWriter, WriteDataOnce) {
   EXPECT_TRUE(Writer.init_hdf(UsedGroup) == InitResult::OK);
   EXPECT_TRUE(Writer.reopen(UsedGroup) == InitResult::OK);
   FileWriter::FlatbufferMessage TestMsg(Buffer.get(), BufferSize);
-  EXPECT_NO_THROW(Writer.write(TestMsg));
+  EXPECT_NO_THROW(Writer.write(TestMsg, false));
   auto TimestampDataset = UsedGroup.get_dataset("time");
   auto CueIndexDataset = UsedGroup.get_dataset("cue_index");
   auto CueTimestampZeroDataset = UsedGroup.get_dataset("cue_timestamp_zero");
@@ -119,8 +119,8 @@ TEST_F(ChopperTimeStampWriter, WriteDataTwice) {
   EXPECT_TRUE(Writer.init_hdf(UsedGroup) == InitResult::OK);
   EXPECT_TRUE(Writer.reopen(UsedGroup) == InitResult::OK);
   FileWriter::FlatbufferMessage TestMsg(Buffer.get(), BufferSize);
-  EXPECT_NO_THROW(Writer.write(TestMsg));
-  EXPECT_NO_THROW(Writer.write(TestMsg));
+  EXPECT_NO_THROW(Writer.write(TestMsg, false));
+  EXPECT_NO_THROW(Writer.write(TestMsg, false));
   auto TimestampDataset = UsedGroup.get_dataset("time");
   auto CueIndexDataset = UsedGroup.get_dataset("cue_index");
   auto CueTimestampZeroDataset = UsedGroup.get_dataset("cue_timestamp_zero");
diff --git a/tests/helpers/StubWriterModule.h b/tests/helpers/StubWriterModule.h
index 0fc753363..9e6e3f090 100644
--- a/tests/helpers/StubWriterModule.h
+++ b/tests/helpers/StubWriterModule.h
@@ -22,5 +22,5 @@ public:
   InitResult reopen(hdf5::node::Group & /*HDFGroup*/) override {
     return InitResult::OK;
   }
-  void writeImpl(FileWriter::FlatbufferMessage const & /*Message*/) override {}
+  void writeImpl([[maybe_unused]] FileWriter::FlatbufferMessage const & message, [[maybe_unused]] bool is_buffered_message) override {}
 };
-- 
GitLab


From 76e916a2283bdaebb8932529653cd613d6b29ac8 Mon Sep 17 00:00:00 2001
From: Matt Clarke <matt.clarke@ess.eu>
Date: Mon, 2 Dec 2024 14:19:38 +0100
Subject: [PATCH 02/11] ev44 now ignores buffered messages

---
 src/WriterModule/ev44/ev44_Writer.cpp | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/WriterModule/ev44/ev44_Writer.cpp b/src/WriterModule/ev44/ev44_Writer.cpp
index 4458a17fe..9b4365539 100644
--- a/src/WriterModule/ev44/ev44_Writer.cpp
+++ b/src/WriterModule/ev44/ev44_Writer.cpp
@@ -90,7 +90,11 @@ WriterModule::InitResult ev44_Writer::reopen(hdf5::node::Group &HDFGroup) {
 }
 
 void ev44_Writer::writeImpl(FlatbufferMessage const &Message,
-                            [[maybe_unused]] bool is_buffered_message) {
+                            bool is_buffered_message) {
+  if (is_buffered_message) {
+    // Ignore buffered data for event data
+    return;
+  }
   auto EventMsgFlatbuffer = GetEvent44Message(Message.data());
   auto CurrentNumberOfEvents = EventMsgFlatbuffer->time_of_flight()->size();
   if (EventMsgFlatbuffer->pixel_id()->size() > 0 &&
-- 
GitLab


From 8d0aaae3f73844eaf23388a01a29e00bfa4dda0a Mon Sep 17 00:00:00 2001
From: Matt Clarke <matt.clarke@ess.eu>
Date: Mon, 2 Dec 2024 14:53:55 +0100
Subject: [PATCH 03/11] clang formatting

---
 .../data_files/messages_before_and_after_data.json        | 8 ++++----
 domain-tests/test_messages_before_and_after.py            | 4 ++--
 tests/ProducerTests.cpp                                   | 4 +---
 tests/Stream/MessageWriterTests.cpp                       | 3 ++-
 tests/Stream/SourceFilterTest.cpp                         | 3 ++-
 tests/helpers/StubWriterModule.h                          | 3 ++-
 6 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/domain-tests/data_files/messages_before_and_after_data.json b/domain-tests/data_files/messages_before_and_after_data.json
index 91f5f8e50..7140c5078 100644
--- a/domain-tests/data_files/messages_before_and_after_data.json
+++ b/domain-tests/data_files/messages_before_and_after_data.json
@@ -122,20 +122,20 @@
   {
     "schema": "ev44",
     "topic": "local_detector",
-    "kafka_timestamp": 16000,
+    "kafka_timestamp": 15000,
     "source_name": "detector_events",
     "message_id": 669,
-    "reference_time": 16000,
+    "reference_time": 15000,
     "time_of_flight": [130, 140, 150, 160],
     "pixel_ids": [1, 2, 3, 4]
   },
   {
     "schema": "ev44",
     "topic": "local_detector",
-    "kafka_timestamp": 17000,
+    "kafka_timestamp": 16000,
     "source_name": "detector_events",
     "message_id": 670,
-    "reference_time": 17000,
+    "reference_time": 16000,
     "time_of_flight": [170, 180, 190, 200],
     "pixel_ids": [1, 2, 3, 4]
   }
diff --git a/domain-tests/test_messages_before_and_after.py b/domain-tests/test_messages_before_and_after.py
index ec474814b..8ce8eb3e2 100644
--- a/domain-tests/test_messages_before_and_after.py
+++ b/domain-tests/test_messages_before_and_after.py
@@ -35,9 +35,9 @@ def test_ev44_data_before_start_is_not_written(local_file):
         )
 
 
-def test_first_ev44_data_after_stop_written_but_later_values_ignored(local_file):
+def test_ev44_data_after_stop_not_written(local_file):
     with h5py.File(local_file, "r") as f:
         assert (
             f["/entry/instrument/event_detector/events/event_time_zero"][~0]
-            == 16_000_000_000
+            == 15_000_000_000
         )
diff --git a/tests/ProducerTests.cpp b/tests/ProducerTests.cpp
index 9962af48a..259100307 100644
--- a/tests/ProducerTests.cpp
+++ b/tests/ProducerTests.cpp
@@ -43,9 +43,7 @@ public:
                                   int64_t /*offset*/) override {
     return RdKafka::ERR_NO_ERROR;
   };
-  struct rd_kafka_topic_s *c_ptr() override {
-    return {};
-  };
+  struct rd_kafka_topic_s *c_ptr() override { return {}; };
 };
 
 TEST_F(ProducerTests, creatingForwarderIncrementsForwarderCounter) {
diff --git a/tests/Stream/MessageWriterTests.cpp b/tests/Stream/MessageWriterTests.cpp
index d4155a233..8db83f246 100644
--- a/tests/Stream/MessageWriterTests.cpp
+++ b/tests/Stream/MessageWriterTests.cpp
@@ -25,7 +25,8 @@ public:
   ~WriterModuleStandIn() = default;
   MAKE_MOCK1(init_hdf, WriterModule::InitResult(hdf5::node::Group &), override);
   MAKE_MOCK1(reopen, WriterModule::InitResult(hdf5::node::Group &), override);
-  MAKE_MOCK2(writeImpl, void(FileWriter::FlatbufferMessage const &, bool), override);
+  MAKE_MOCK2(writeImpl, void(FileWriter::FlatbufferMessage const &, bool),
+             override);
 };
 
 class DataMessageWriterTest : public ::testing::Test {
diff --git a/tests/Stream/SourceFilterTest.cpp b/tests/Stream/SourceFilterTest.cpp
index 25d358bd7..6806f40ba 100644
--- a/tests/Stream/SourceFilterTest.cpp
+++ b/tests/Stream/SourceFilterTest.cpp
@@ -17,7 +17,8 @@ class StubMessageWriter : public Stream::MessageWriter {
 public:
   StubMessageWriter()
       : MessageWriter([]() {}, 1s, std::make_unique<Metrics::Registrar>("")) {}
-  void addMessage(Stream::Message const &message, [[maybe_unused]] bool is_buffered_message) override {
+  void addMessage(Stream::Message const &message,
+                  [[maybe_unused]] bool is_buffered_message) override {
     messages_received.emplace_back(message);
   }
   std::vector<Stream::Message> messages_received;
diff --git a/tests/helpers/StubWriterModule.h b/tests/helpers/StubWriterModule.h
index 9e6e3f090..3b05f4720 100644
--- a/tests/helpers/StubWriterModule.h
+++ b/tests/helpers/StubWriterModule.h
@@ -22,5 +22,6 @@ public:
   InitResult reopen(hdf5::node::Group & /*HDFGroup*/) override {
     return InitResult::OK;
   }
-  void writeImpl([[maybe_unused]] FileWriter::FlatbufferMessage const & message, [[maybe_unused]] bool is_buffered_message) override {}
+  void writeImpl([[maybe_unused]] FileWriter::FlatbufferMessage const &message,
+                 [[maybe_unused]] bool is_buffered_message) override {}
 };
-- 
GitLab


From 6864cb60cd53ade993fba9a88a42c37c218df03d Mon Sep 17 00:00:00 2001
From: Matt Clarke <matt.clarke@ess.eu>
Date: Tue, 3 Dec 2024 15:11:12 +0100
Subject: [PATCH 04/11] adjusted test

---
 integration-tests/test_filewriter.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/integration-tests/test_filewriter.py b/integration-tests/test_filewriter.py
index 5633fc25a..c40e5cd3b 100644
--- a/integration-tests/test_filewriter.py
+++ b/integration-tests/test_filewriter.py
@@ -379,7 +379,7 @@ class TestFileWriter:
 
         with h5py.File(os.path.join(OUTPUT_DIR, file_name), "r") as f:
             # Check data is as expected
-            assert len(f["/entry/detector/event_time_zero"]) == 8
+            assert len(f["/entry/detector/event_time_zero"]) == 7
             assert f["/entry/detector/event_id"][0] == 15
             assert f["/entry/detector/event_id"][~0] == 54
             assert len(f["/entry/motion/time"]) == 8
-- 
GitLab


From afa37d73c52b0ea93d64410daf54edf0b4783a3f Mon Sep 17 00:00:00 2001
From: Matt Clarke <matt.clarke@ess.eu>
Date: Wed, 4 Dec 2024 14:19:08 +0100
Subject: [PATCH 05/11] don't write data after stop time

---
 integration-tests/test_filewriter.py | 17 +++++++++--------
 src/Stream/SourceFilter.cpp          |  2 ++
 2 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/integration-tests/test_filewriter.py b/integration-tests/test_filewriter.py
index c40e5cd3b..211d041d6 100644
--- a/integration-tests/test_filewriter.py
+++ b/integration-tests/test_filewriter.py
@@ -341,9 +341,10 @@ class TestFileWriter:
     def test_data_before_and_after(self, file_writer):
         """
         Test that when writing in the past:
-         - the last value before the start time is written
-         - the first value after the stop time is written
-         - all values inbetween are written
+         - the last value before the start time is written for f144
+         - the first value after the stop time is written for f144
+         - all f144 values inbetween are written
+         - ev44 only writes the values inbetween the start and stop times
         """
         producer = Producer({"bootstrap.servers": ",".join(get_brokers())})
 
@@ -379,12 +380,12 @@ class TestFileWriter:
 
         with h5py.File(os.path.join(OUTPUT_DIR, file_name), "r") as f:
             # Check data is as expected
-            assert len(f["/entry/detector/event_time_zero"]) == 7
-            assert f["/entry/detector/event_id"][0] == 15
-            assert f["/entry/detector/event_id"][~0] == 54
-            assert len(f["/entry/motion/time"]) == 8
+            assert len(f["/entry/detector/event_time_zero"]) == 6
+            assert f["/entry/detector/event_id"][0] == 20
+            assert f["/entry/detector/event_id"][~0] == 49
+            assert len(f["/entry/motion/time"]) == 7
             assert f["/entry/motion/value"][0] == 3
-            assert f["/entry/motion/value"][~0] == 10
+            assert f["/entry/motion/value"][~0] == 9
 
     def test_start_and_stop_in_the_future(self, file_writer):
         """
diff --git a/src/Stream/SourceFilter.cpp b/src/Stream/SourceFilter.cpp
index 5defc44cf..586e54426 100644
--- a/src/Stream/SourceFilter.cpp
+++ b/src/Stream/SourceFilter.cpp
@@ -86,6 +86,8 @@ bool SourceFilter::filter_message(
   }
   if (message_time > _stop_time) {
     _is_finished = true;
+    forward_buffered_message();
+    return false;
   }
   forward_buffered_message();
   forward_message(message);
-- 
GitLab


From b13cf7d8abe1eea3d8d38cfc1cf555e4d0abd2ba Mon Sep 17 00:00:00 2001
From: Matt Clarke <matt.clarke@ess.eu>
Date: Wed, 4 Dec 2024 15:38:53 +0100
Subject: [PATCH 06/11] clang formatting

---
 tests/ProducerTests.cpp | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/tests/ProducerTests.cpp b/tests/ProducerTests.cpp
index 259100307..9962af48a 100644
--- a/tests/ProducerTests.cpp
+++ b/tests/ProducerTests.cpp
@@ -43,7 +43,9 @@ public:
                                   int64_t /*offset*/) override {
     return RdKafka::ERR_NO_ERROR;
   };
-  struct rd_kafka_topic_s *c_ptr() override { return {}; };
+  struct rd_kafka_topic_s *c_ptr() override {
+    return {};
+  };
 };
 
 TEST_F(ProducerTests, creatingForwarderIncrementsForwarderCounter) {
-- 
GitLab


From badbb407c19131767797cd38383a1dc8baab2e76 Mon Sep 17 00:00:00 2001
From: Matt Clarke <matt.clarke@ess.eu>
Date: Thu, 5 Dec 2024 08:34:16 +0100
Subject: [PATCH 07/11] update tests to match changes

---
 tests/Stream/SourceFilterTest.cpp | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/tests/Stream/SourceFilterTest.cpp b/tests/Stream/SourceFilterTest.cpp
index 6806f40ba..72b3f0a5c 100644
--- a/tests/Stream/SourceFilterTest.cpp
+++ b/tests/Stream/SourceFilterTest.cpp
@@ -116,15 +116,13 @@ TEST(SourceFilter, message_on_start_is_allowed_through) {
   EXPECT_EQ(1u, harness.writer->messages_received.size());
 }
 
-TEST(SourceFilter, first_message_after_stop_is_allowed_through) {
+TEST(SourceFilter, messages_after_stop_are_not_allowed_through) {
   auto harness = create_filter_for_tests(time_point{0ms}, time_point{1000ms});
 
   harness.filter->filter_message(create_f144_message("::source::", 1, 1001));
   harness.filter->filter_message(create_f144_message("::source::", 2, 1002));
 
-  EXPECT_EQ(1u, harness.writer->messages_received.size());
-  auto first = harness.writer->messages_received.at(0).FbMsg;
-  EXPECT_EQ(1001000000, first.getTimestamp()); // timestamp is in ns
+  EXPECT_EQ(0u, harness.writer->messages_received.size());
 }
 
 TEST(SourceFilter, message_after_stop_sets_filter_to_finished) {
@@ -164,11 +162,13 @@ TEST(SourceFilter, can_change_stop_time_after_construction) {
 
   harness.filter->set_stop_time(time_point{1000ms});
 
-  // First message after stop is allowed
+  // Messages after stop are not allowed
+  harness.filter->filter_message(create_f144_message("::source::", 1, 999));
+  harness.filter->filter_message(create_f144_message("::source::", 1, 1000));
   harness.filter->filter_message(create_f144_message("::source::", 1, 1001));
   harness.filter->filter_message(create_f144_message("::source::", 2, 1002));
 
-  EXPECT_EQ(1u, harness.writer->messages_received.size());
+  EXPECT_EQ(2u, harness.writer->messages_received.size());
 }
 
 TEST(SourceFilter,
-- 
GitLab


From 499ac6090c18d1c6e929428de332f5e9dab7b69f Mon Sep 17 00:00:00 2001
From: Matt Clarke <matt.clarke@ess.eu>
Date: Thu, 5 Dec 2024 13:17:46 +0100
Subject: [PATCH 08/11] missed domain test

---
 domain-tests/test_messages_before_and_after.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/domain-tests/test_messages_before_and_after.py b/domain-tests/test_messages_before_and_after.py
index 8ce8eb3e2..eff2c7126 100644
--- a/domain-tests/test_messages_before_and_after.py
+++ b/domain-tests/test_messages_before_and_after.py
@@ -21,10 +21,10 @@ def test_last_f144_data_before_start_is_written_but_earlier_values_ignored(local
         assert f["/entry/instrument/chopper/delay/time"][0] == 9_999_000_000
 
 
-def test_first_f144_data_after_stop_written_but_later_values_ignored(local_file):
+def test_f144_data_after_stop_not_written(local_file):
     with h5py.File(local_file, "r") as f:
-        assert f["/entry/instrument/chopper/delay/value"][~0] == 17
-        assert f["/entry/instrument/chopper/delay/time"][~0] == 16_000_000_000
+        assert f["/entry/instrument/chopper/delay/value"][~0] == 16
+        assert f["/entry/instrument/chopper/delay/time"][~0] == 15_000_000_000
 
 
 def test_ev44_data_before_start_is_not_written(local_file):
-- 
GitLab


From 82f60167683027f171f023bdb9c134cac46345c8 Mon Sep 17 00:00:00 2001
From: Matt Clarke <matt.clarke@ess.eu>
Date: Thu, 5 Dec 2024 13:49:10 +0100
Subject: [PATCH 09/11] added domain test to check f144 buffered value is
 written

---
 .gitignore                                    |  3 +-
 domain-tests/data_files/buffered_data.json    | 18 ++++
 .../nexus_templates/buffered_template.json    | 89 +++++++++++++++++++
 domain-tests/test_buffered.py                 | 20 +++++
 .../test_messages_before_and_after.py         |  1 -
 5 files changed, 129 insertions(+), 2 deletions(-)
 create mode 100644 domain-tests/data_files/buffered_data.json
 create mode 100644 domain-tests/nexus_templates/buffered_template.json
 create mode 100644 domain-tests/test_buffered.py

diff --git a/.gitignore b/.gitignore
index 098f81eba..9a7e609c2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,7 +10,8 @@ pipeline.gdsl
 *.j2~
 *.h~
 *.cxx~
-/integration-tests/output-files/*.nxs
+/integration-tests/output-files/*.*
+/domain-tests/output_files/*.*
 venv
 *.log
 *.DS_Store
diff --git a/domain-tests/data_files/buffered_data.json b/domain-tests/data_files/buffered_data.json
new file mode 100644
index 000000000..919dbb570
--- /dev/null
+++ b/domain-tests/data_files/buffered_data.json
@@ -0,0 +1,18 @@
+[
+  {
+    "schema": "f144",
+    "topic": "local_choppers",
+    "kafka_timestamp": 5000,
+    "source_name": "local:choppers:delay",
+    "timestamp": 5000,
+    "value": 2
+  },
+  {
+    "schema": "f144",
+    "topic": "local_choppers",
+    "kafka_timestamp": 9000,
+    "source_name": "local:choppers:delay",
+    "timestamp": 9000,
+    "value": 3
+  }
+]
diff --git a/domain-tests/nexus_templates/buffered_template.json b/domain-tests/nexus_templates/buffered_template.json
new file mode 100644
index 000000000..b6ec0427f
--- /dev/null
+++ b/domain-tests/nexus_templates/buffered_template.json
@@ -0,0 +1,89 @@
+{
+  "children": [
+    {
+      "name": "entry",
+      "type": "group",
+      "attributes": [
+        {
+          "name": "NX_class",
+          "dtype": "string",
+          "values": "NXentry"
+        }
+      ],
+      "children": [
+        {
+          "module": "dataset",
+          "config": {
+            "name": "title",
+            "values": "This is a title",
+            "dtype": "string"
+          }
+        },
+        {
+          "module": "mdat",
+          "config": {
+            "items": [
+              "start_time",
+              "end_time"
+            ]
+          }
+        },
+        {
+          "name": "instrument",
+          "type": "group",
+          "attributes": [
+            {
+              "name": "NX_class",
+              "dtype": "string",
+              "values": "NXinstrument"
+            }
+          ],
+          "children": [
+            {
+              "name": "chopper",
+              "type": "group",
+              "attributes": [
+                {
+                  "name": "NX_class",
+                  "dtype": "string",
+                  "values": "NXdisk_chopper"
+                }
+              ],
+              "children": [
+                {
+                  "name": "delay",
+                  "type": "group",
+                  "attributes": [
+                    {
+                      "name": "NX_class",
+                      "dtype": "string",
+                      "values": "NXlog"
+                    }
+                  ],
+                  "children": [
+                    {
+                      "module": "f144",
+                      "config": {
+                        "source": "local:choppers:delay",
+                        "topic": "local_choppers",
+                        "dtype": "double"
+                      }
+                    }
+                  ]
+                },
+                {
+                  "module": "dataset",
+                  "config": {
+                    "name": "depends_on",
+                    "values": ".",
+                    "dtype": "string"
+                  }
+                }
+              ]
+            }
+          ]
+        }
+      ]
+    }
+  ]
+}
diff --git a/domain-tests/test_buffered.py b/domain-tests/test_buffered.py
new file mode 100644
index 000000000..b7acaf87e
--- /dev/null
+++ b/domain-tests/test_buffered.py
@@ -0,0 +1,20 @@
+import h5py
+import pytest
+from conftest import write_file
+
+
+# This fixture is used to create the file
+@pytest.fixture(scope="module")
+def local_file(request):
+    return write_file(
+        request,
+        "output_files/buffered.hdf",
+        "nexus_templates/buffered_template.json",
+        "data_files/buffered_data.json",
+    )
+
+
+def test_f144_data_before_start_is_written_when_no_data_between_start_and_stop(local_file):
+    with h5py.File(local_file, "r") as f:
+        assert f["/entry/instrument/chopper/delay/value"][0] == 3
+        assert f["/entry/instrument/chopper/delay/time"][0] == 9_000_000_000
\ No newline at end of file
diff --git a/domain-tests/test_messages_before_and_after.py b/domain-tests/test_messages_before_and_after.py
index eff2c7126..cf3c0dd02 100644
--- a/domain-tests/test_messages_before_and_after.py
+++ b/domain-tests/test_messages_before_and_after.py
@@ -1,5 +1,4 @@
 import h5py
-import numpy as np
 import pytest
 from conftest import write_file
 
-- 
GitLab


From c599eed98540360a9760aa50a57ce32d35d5f893 Mon Sep 17 00:00:00 2001
From: Matt Clarke <matt.clarke@ess.eu>
Date: Thu, 5 Dec 2024 13:55:27 +0100
Subject: [PATCH 10/11] formatting

---
 domain-tests/test_buffered.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/domain-tests/test_buffered.py b/domain-tests/test_buffered.py
index b7acaf87e..1c5301893 100644
--- a/domain-tests/test_buffered.py
+++ b/domain-tests/test_buffered.py
@@ -14,7 +14,9 @@ def local_file(request):
     )
 
 
-def test_f144_data_before_start_is_written_when_no_data_between_start_and_stop(local_file):
+def test_f144_data_before_start_is_written_when_no_data_between_start_and_stop(
+    local_file,
+):
     with h5py.File(local_file, "r") as f:
         assert f["/entry/instrument/chopper/delay/value"][0] == 3
-        assert f["/entry/instrument/chopper/delay/time"][0] == 9_000_000_000
\ No newline at end of file
+        assert f["/entry/instrument/chopper/delay/time"][0] == 9_000_000_000
-- 
GitLab


From 2054a75e20ab5e48340c8b90586c33886cc92ce6 Mon Sep 17 00:00:00 2001
From: Matt Clarke <matt.clarke@ess.eu>
Date: Thu, 5 Dec 2024 14:49:34 +0100
Subject: [PATCH 11/11] unit test to confirm that ev44 buffered data is not
 written

---
 src/WriterModule/ad00/ad00_Writer.cpp      |  3 ++-
 src/WriterModule/ad00/ad00_Writer.h        |  2 +-
 src/WriterModule/al00/al00_Writer.cpp      |  3 ++-
 src/WriterModule/al00/al00_Writer.h        |  2 +-
 src/WriterModule/da00/da00_Writer.cpp      |  3 ++-
 src/WriterModule/da00/da00_Writer.h        |  2 +-
 src/WriterModule/ep01/ep01_Writer.cpp      |  3 ++-
 src/WriterModule/ep01/ep01_Writer.h        |  2 +-
 src/WriterModule/ev44/ev44_Writer.cpp      |  5 ++--
 src/WriterModule/ev44/ev44_Writer.h        |  2 +-
 src/WriterModule/f144/f144_Writer.cpp      |  3 ++-
 src/WriterModule/f144/f144_Writer.h        |  2 +-
 src/WriterModule/se00/se00_Writer.cpp      |  5 ++--
 src/WriterModule/se00/se00_Writer.h        |  2 +-
 src/WriterModule/tdct/tdct_Writer.cpp      |  5 ++--
 src/WriterModule/tdct/tdct_Writer.h        |  2 +-
 src/WriterModule/template/TemplateWriter.h |  3 ++-
 src/WriterModuleBase.h                     |  7 +++---
 tests/SourceTests.cpp                      |  2 +-
 tests/Stream/MessageWriterTests.cpp        |  4 ++--
 tests/WriterModule/ev44_WriterTests.cpp    | 27 ++++++++++++++++++++++
 tests/helpers/StubWriterModule.h           |  6 +++--
 22 files changed, 67 insertions(+), 28 deletions(-)

diff --git a/src/WriterModule/ad00/ad00_Writer.cpp b/src/WriterModule/ad00/ad00_Writer.cpp
index 39ab476d4..b50951a64 100644
--- a/src/WriterModule/ad00/ad00_Writer.cpp
+++ b/src/WriterModule/ad00/ad00_Writer.cpp
@@ -139,7 +139,7 @@ void msgTypeIsConfigType(ad00_Writer::Type ConfigType, DType MsgType) {
   }
 }
 
-void ad00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message,
+bool ad00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message,
                             [[maybe_unused]] bool is_buffered_message) {
   auto ad00 = Getad00_ADArray(Message.data());
   auto DataShape =
@@ -200,6 +200,7 @@ void ad00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message,
     CueTimestamp.appendElement(CurrentTimestamp);
     CueCounter = 0;
   }
+  return true;
 }
 
 template <typename Type>
diff --git a/src/WriterModule/ad00/ad00_Writer.h b/src/WriterModule/ad00/ad00_Writer.h
index 4d2ed1c9c..5922457d3 100644
--- a/src/WriterModule/ad00/ad00_Writer.h
+++ b/src/WriterModule/ad00/ad00_Writer.h
@@ -33,7 +33,7 @@ public:
 
   InitResult reopen(hdf5::node::Group &HDFGroup) override;
 
-  void writeImpl(FileWriter::FlatbufferMessage const &Message,
+  bool writeImpl(FileWriter::FlatbufferMessage const &Message,
                  bool is_buffered_message) override;
 
   enum class Type {
diff --git a/src/WriterModule/al00/al00_Writer.cpp b/src/WriterModule/al00/al00_Writer.cpp
index 17e98caff..9beeff2ba 100644
--- a/src/WriterModule/al00/al00_Writer.cpp
+++ b/src/WriterModule/al00/al00_Writer.cpp
@@ -54,7 +54,7 @@ InitResult al00_Writer::reopen(hdf5::node::Group &HDFGroup) {
   return InitResult::OK;
 }
 
-void al00_Writer::writeImpl(FlatbufferMessage const &Message,
+bool al00_Writer::writeImpl(FlatbufferMessage const &Message,
                             [[maybe_unused]] bool is_buffered_message) {
   auto AlarmMessage = GetAlarm(Message.data());
 
@@ -66,6 +66,7 @@ void al00_Writer::writeImpl(FlatbufferMessage const &Message,
     AlarmMessageString = "NO ALARM MESSAGE";
   }
   AlarmMsg.appendStringElement(AlarmMessageString);
+  return true;
 }
 
 /// Register the writer module.
diff --git a/src/WriterModule/al00/al00_Writer.h b/src/WriterModule/al00/al00_Writer.h
index 52bb2c21e..fa68a2b36 100644
--- a/src/WriterModule/al00/al00_Writer.h
+++ b/src/WriterModule/al00/al00_Writer.h
@@ -33,7 +33,7 @@ public:
   WriterModule::InitResult reopen(hdf5::node::Group &HDFGroup) override;
 
   /// Write an incoming message which should contain a flatbuffer.
-  void writeImpl(FlatbufferMessage const &Message,
+  bool writeImpl(FlatbufferMessage const &Message,
                  bool is_buffered_message) override;
 
   al00_Writer() : WriterModule::Base("al00", false, "NXlog") {}
diff --git a/src/WriterModule/da00/da00_Writer.cpp b/src/WriterModule/da00/da00_Writer.cpp
index 3c15c29ac..c36674bfe 100644
--- a/src/WriterModule/da00/da00_Writer.cpp
+++ b/src/WriterModule/da00/da00_Writer.cpp
@@ -330,7 +330,7 @@ InitResult da00_Writer::reopen(hdf5::node::Group &HDFGroup) {
   return InitResult::OK;
 }
 
-void da00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message,
+bool da00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message,
                             [[maybe_unused]] bool is_buffered_message) {
   const auto da00_obj = Getda00_DataArray(Message.data());
   if (isFirstMessage) {
@@ -383,6 +383,7 @@ void da00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message,
     CueTimestampZero.appendElement(da00_obj->timestamp());
     CueCounter = 0;
   }
+  return true;
 }
 
 } // namespace WriterModule::da00
diff --git a/src/WriterModule/da00/da00_Writer.h b/src/WriterModule/da00/da00_Writer.h
index 1f0367840..eb5301e55 100644
--- a/src/WriterModule/da00/da00_Writer.h
+++ b/src/WriterModule/da00/da00_Writer.h
@@ -36,7 +36,7 @@ public:
 
   InitResult reopen(hdf5::node::Group &HDFGroup) override;
 
-  void writeImpl(FileWriter::FlatbufferMessage const &Message,
+  bool writeImpl(FileWriter::FlatbufferMessage const &Message,
                  bool is_buffered_message) override;
 
   NeXusDataset::Time Timestamp;
diff --git a/src/WriterModule/ep01/ep01_Writer.cpp b/src/WriterModule/ep01/ep01_Writer.cpp
index b0cf09150..3e6dc1f31 100644
--- a/src/WriterModule/ep01/ep01_Writer.cpp
+++ b/src/WriterModule/ep01/ep01_Writer.cpp
@@ -35,13 +35,14 @@ InitResult ep01_Writer::init_hdf(hdf5::node::Group &HDFGroup) {
   return InitResult::OK;
 }
 
-void ep01_Writer::writeImpl(FileWriter::FlatbufferMessage const &Message,
+bool ep01_Writer::writeImpl(FileWriter::FlatbufferMessage const &Message,
                             [[maybe_unused]] bool is_buffered_message) {
   auto FlatBuffer = GetEpicsPVConnectionInfo(Message.data());
   std::int16_t Status = static_cast<std::int16_t>(FlatBuffer->status());
   StatusDataset.appendElement(Status);
   auto FBTimestamp = FlatBuffer->timestamp();
   TimestampDataset.appendElement(FBTimestamp);
+  return true;
 }
 
 static WriterModule::Registry::Registrar<ep01_Writer>
diff --git a/src/WriterModule/ep01/ep01_Writer.h b/src/WriterModule/ep01/ep01_Writer.h
index 2013876ce..b13be6d9d 100644
--- a/src/WriterModule/ep01/ep01_Writer.h
+++ b/src/WriterModule/ep01/ep01_Writer.h
@@ -9,7 +9,7 @@ class ep01_Writer final : public WriterModule::Base {
 public:
   InitResult init_hdf(hdf5::node::Group &HDFGroup) override;
   InitResult reopen(hdf5::node::Group &HDFGroup) override;
-  void writeImpl(FileWriter::FlatbufferMessage const &Message,
+  bool writeImpl(FileWriter::FlatbufferMessage const &Message,
                  bool is_buffered_message) override;
 
   ep01_Writer() : WriterModule::Base("ep01", false, "NXlog") {}
diff --git a/src/WriterModule/ev44/ev44_Writer.cpp b/src/WriterModule/ev44/ev44_Writer.cpp
index 9b4365539..b47611f2e 100644
--- a/src/WriterModule/ev44/ev44_Writer.cpp
+++ b/src/WriterModule/ev44/ev44_Writer.cpp
@@ -89,11 +89,11 @@ WriterModule::InitResult ev44_Writer::reopen(hdf5::node::Group &HDFGroup) {
   return WriterModule::InitResult::OK;
 }
 
-void ev44_Writer::writeImpl(FlatbufferMessage const &Message,
+bool ev44_Writer::writeImpl(FlatbufferMessage const &Message,
                             bool is_buffered_message) {
   if (is_buffered_message) {
     // Ignore buffered data for event data
-    return;
+    return false;
   }
   auto EventMsgFlatbuffer = GetEvent44Message(Message.data());
   auto CurrentNumberOfEvents = EventMsgFlatbuffer->time_of_flight()->size();
@@ -134,6 +134,7 @@ void ev44_Writer::writeImpl(FlatbufferMessage const &Message,
     }
     EventsWrittenMetadataField.setValue(EventsWritten);
   }
+  return true;
 }
 
 void ev44_Writer::register_meta_data(const hdf5::node::Group &HDFGroup,
diff --git a/src/WriterModule/ev44/ev44_Writer.h b/src/WriterModule/ev44/ev44_Writer.h
index 850378082..7e3d46493 100644
--- a/src/WriterModule/ev44/ev44_Writer.h
+++ b/src/WriterModule/ev44/ev44_Writer.h
@@ -27,7 +27,7 @@ public:
   /// \brief Write flatbuffer message.
   ///
   /// \param FlatBufferMessage
-  void writeImpl(FlatbufferMessage const &Message,
+  bool writeImpl(FlatbufferMessage const &Message,
                  bool is_buffered_message) override;
 
   NeXusDataset::EventTimeOffset EventTimeOffset;
diff --git a/src/WriterModule/f144/f144_Writer.cpp b/src/WriterModule/f144/f144_Writer.cpp
index ae7e15ab6..8707ac4d7 100644
--- a/src/WriterModule/f144/f144_Writer.cpp
+++ b/src/WriterModule/f144/f144_Writer.cpp
@@ -194,7 +194,7 @@ void msgTypeIsConfigType(f144_Writer::Type ConfigType, Value MsgType) {
   }
 }
 
-void f144_Writer::writeImpl(FlatbufferMessage const &Message,
+bool f144_Writer::writeImpl(FlatbufferMessage const &Message,
                             [[maybe_unused]] bool is_buffered_message) {
   auto LogDataMessage = Getf144_LogData(Message.data());
   Timestamp.appendElement(LogDataMessage->timestamp());
@@ -274,6 +274,7 @@ void f144_Writer::writeImpl(FlatbufferMessage const &Message,
     MetaDataMax.setValue(Max);
     MetaDataMean.setValue(Sum / TotalNrOfElementsWritten);
   }
+  return true;
 }
 
 void f144_Writer::register_meta_data(hdf5::node::Group const &HDFGroup,
diff --git a/src/WriterModule/f144/f144_Writer.h b/src/WriterModule/f144/f144_Writer.h
index f31dba956..b051a739a 100644
--- a/src/WriterModule/f144/f144_Writer.h
+++ b/src/WriterModule/f144/f144_Writer.h
@@ -39,7 +39,7 @@ public:
   WriterModule::InitResult reopen(hdf5::node::Group &HDFGroup) override;
 
   /// Write an incoming message which should contain a flatbuffer.
-  void writeImpl(FlatbufferMessage const &Message,
+  bool writeImpl(FlatbufferMessage const &Message,
                  bool is_buffered_message) override;
 
   f144_Writer()
diff --git a/src/WriterModule/se00/se00_Writer.cpp b/src/WriterModule/se00/se00_Writer.cpp
index 1514eeec0..0b0bd75d0 100644
--- a/src/WriterModule/se00/se00_Writer.cpp
+++ b/src/WriterModule/se00/se00_Writer.cpp
@@ -132,7 +132,7 @@ void msgTypeIsConfigType(se00_Writer::Type ConfigType, ValueUnion MsgType) {
   }
 }
 
-void se00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message,
+bool se00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message,
                             [[maybe_unused]] bool is_buffered_message) {
   auto FbPointer = Getse00_SampleEnvironmentData(Message.data());
   auto CueIndexValue = Value->current_size();
@@ -200,7 +200,7 @@ void se00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message,
     Logger::Info("Unknown data type in flatbuffer.");
   }
   if (NrOfElements == 0) {
-    return;
+    return false;
   }
   CueTimestampIndex.appendElement(static_cast<std::uint32_t>(CueIndexValue));
   CueTimestamp.appendElement(FbPointer->packet_timestamp());
@@ -217,6 +217,7 @@ void se00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message,
         FbPointer->packet_timestamp(), FbPointer->time_delta(), NrOfElements));
     Timestamp.appendArray(TempTimeStamps);
   }
+  return true;
 }
 
 template <typename Type>
diff --git a/src/WriterModule/se00/se00_Writer.h b/src/WriterModule/se00/se00_Writer.h
index 175db231a..290c17719 100644
--- a/src/WriterModule/se00/se00_Writer.h
+++ b/src/WriterModule/se00/se00_Writer.h
@@ -44,7 +44,7 @@ public:
 
   InitResult reopen(hdf5::node::Group &HDFGroup) override;
 
-  void writeImpl(FlatbufferMessage const &Message,
+  bool writeImpl(FlatbufferMessage const &Message,
                  bool is_buffered_message) override;
 
   enum class Type {
diff --git a/src/WriterModule/tdct/tdct_Writer.cpp b/src/WriterModule/tdct/tdct_Writer.cpp
index e7218b18c..2f69c1c7a 100644
--- a/src/WriterModule/tdct/tdct_Writer.cpp
+++ b/src/WriterModule/tdct/tdct_Writer.cpp
@@ -64,7 +64,7 @@ WriterModule::InitResult tdct_Writer::reopen(hdf5::node::Group &HDFGroup) {
   return WriterModule::InitResult::OK;
 }
 
-void tdct_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message,
+bool tdct_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message,
                             [[maybe_unused]] bool is_buffered_message) {
   auto FbPointer = Gettimestamp(Message.data());
   auto TempTimePtr = FbPointer->timestamps()->data();
@@ -72,13 +72,14 @@ void tdct_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message,
   if (TempTimeSize == 0) {
     Logger::Info(
         "Received a flatbuffer with zero (0) timestamps elements in it.");
-    return;
+    return false;
   }
   hdf5::ArrayAdapter<const std::uint64_t> CArray(TempTimePtr, TempTimeSize);
   auto CueIndexValue = Timestamp.current_size();
   CueTimestampIndex.appendElement(static_cast<std::uint32_t>(CueIndexValue));
   CueTimestamp.appendElement(FbPointer->timestamps()->operator[](0));
   Timestamp.appendArray(CArray);
+  return true;
 }
 
 } // namespace WriterModule::tdct
diff --git a/src/WriterModule/tdct/tdct_Writer.h b/src/WriterModule/tdct/tdct_Writer.h
index f82f7c350..ee6c2c87d 100644
--- a/src/WriterModule/tdct/tdct_Writer.h
+++ b/src/WriterModule/tdct/tdct_Writer.h
@@ -34,7 +34,7 @@ public:
 
   InitResult reopen(hdf5::node::Group &HDFGroup) override;
 
-  void writeImpl(FlatbufferMessage const &Message,
+  bool writeImpl(FlatbufferMessage const &Message,
                  bool is_buffered_message) override;
 
 protected:
diff --git a/src/WriterModule/template/TemplateWriter.h b/src/WriterModule/template/TemplateWriter.h
index c0192f316..ea5cae5b3 100644
--- a/src/WriterModule/template/TemplateWriter.h
+++ b/src/WriterModule/template/TemplateWriter.h
@@ -186,8 +186,9 @@ public:
   /// \param Message The structure containing a pointer to a buffer
   /// containing data received from the Kafka broker and the size of the buffer.
   // cppcheck-suppress functionStatic
-  void writeImpl([[maybe_unused]] FileWriter::FlatbufferMessage const &message, [[maybe_unused]]bool is_buffered_message) override {
+  bool writeImpl([[maybe_unused]] FileWriter::FlatbufferMessage const &message, [[maybe_unused]]bool is_buffered_message) override {
     std::cout << "WriterClass::writeImpl()\n";
+    return true;
   }
 };
 } // namespace TemplateWriter
diff --git a/src/WriterModuleBase.h b/src/WriterModuleBase.h
index fd0ed03a3..65d8ac703 100644
--- a/src/WriterModuleBase.h
+++ b/src/WriterModuleBase.h
@@ -113,14 +113,15 @@ public:
   /// \param msg The message to process
   void write(FileWriter::FlatbufferMessage const &Message,
              bool is_buffered_message) {
-    writeImpl(Message, is_buffered_message);
-    WriteCount++;
+    if (writeImpl(Message, is_buffered_message)) {
+      WriteCount++;
+    }
   }
 
   /// \brief Process the message in some way, for example write to the HDF file.
   ///
   /// \param msg The message to process
-  virtual void writeImpl(FileWriter::FlatbufferMessage const &Message,
+  virtual bool writeImpl(FileWriter::FlatbufferMessage const &Message,
                          bool is_buffered_message) = 0;
 
   void registerField(JsonConfig::FieldBase *Ptr) {
diff --git a/tests/SourceTests.cpp b/tests/SourceTests.cpp
index efc2a24c4..49c3a8168 100644
--- a/tests/SourceTests.cpp
+++ b/tests/SourceTests.cpp
@@ -34,7 +34,7 @@ public:
 
 class WriterModuleMock : public StubWriterModule {
 public:
-  MAKE_MOCK2(writeImpl, void(FlatbufferMessage const &, bool), override);
+  MAKE_MOCK2(writeImpl, bool(FlatbufferMessage const &, bool), override);
 };
 
 TEST_F(SourceTests, ConstructorSetsMembers) {
diff --git a/tests/Stream/MessageWriterTests.cpp b/tests/Stream/MessageWriterTests.cpp
index 8db83f246..0cec2c971 100644
--- a/tests/Stream/MessageWriterTests.cpp
+++ b/tests/Stream/MessageWriterTests.cpp
@@ -25,7 +25,7 @@ public:
   ~WriterModuleStandIn() = default;
   MAKE_MOCK1(init_hdf, WriterModule::InitResult(hdf5::node::Group &), override);
   MAKE_MOCK1(reopen, WriterModule::InitResult(hdf5::node::Group &), override);
-  MAKE_MOCK2(writeImpl, void(FileWriter::FlatbufferMessage const &, bool),
+  MAKE_MOCK2(writeImpl, bool(FileWriter::FlatbufferMessage const &, bool),
              override);
 };
 
@@ -80,7 +80,7 @@ TEST_F(DataMessageWriterTest, EnableExtraModule) {
 }
 
 TEST_F(DataMessageWriterTest, WriteMessageSuccess) {
-  REQUIRE_CALL(WriterModule, writeImpl(_, _)).TIMES(1);
+  REQUIRE_CALL(WriterModule, writeImpl(_, _)).TIMES(1).RETURN(true);
   auto InitialWriteCount = WriterModule.getWriteCount();
   FileWriter::FlatbufferMessage Msg;
   Stream::Message SomeMessage(
diff --git a/tests/WriterModule/ev44_WriterTests.cpp b/tests/WriterModule/ev44_WriterTests.cpp
index f2e7b1f86..a3b904fc3 100644
--- a/tests/WriterModule/ev44_WriterTests.cpp
+++ b/tests/WriterModule/ev44_WriterTests.cpp
@@ -603,3 +603,30 @@ TEST_F(Event44WriterTests, CuesFromTwoMessagesAreRecorded) {
       << "Expected cue_index dataset to contain the indices of the last event "
          "of every message";
 }
+
+TEST_F(Event44WriterTests, buffered_data_not_written) {
+  std::vector<int32_t> TimeOfFlight1 = {101, 102, 201};
+  std::vector<int32_t> DetectorID1 = {101, 102, 201};
+  std::vector<int64_t> ReferenceTime1 = {1000, 2000};
+  std::vector<int32_t> ReferenceTimeIndex1 = {0, 2};
+  auto MessageBuffer1 =
+      generateFlatbufferData("TestSource", 1, TimeOfFlight1, DetectorID1,
+                             ReferenceTime1, ReferenceTimeIndex1);
+  FileWriter::FlatbufferMessage TestMessage1(MessageBuffer1.data(),
+                                             MessageBuffer1.size());
+
+  // Create writer and give it the message to write
+  {
+    WriterModule::ev44::ev44_Writer Writer;
+    EXPECT_TRUE(Writer.init_hdf(TestGroup) == InitResult::OK);
+    EXPECT_TRUE(Writer.reopen(TestGroup) == InitResult::OK);
+    EXPECT_NO_THROW(Writer.write(TestMessage1, true));
+  } // These braces are required due to "h5.cpp"
+
+  // Read data from the file
+  auto EventTimeOffsetDataset = TestGroup.get_dataset("event_time_offset");
+  auto EventTimeZeroDataset = TestGroup.get_dataset("event_time_zero");
+
+  EXPECT_EQ(0, EventTimeOffsetDataset.dataspace().size());
+  EXPECT_EQ(0, EventTimeZeroDataset.dataspace().size());
+}
diff --git a/tests/helpers/StubWriterModule.h b/tests/helpers/StubWriterModule.h
index 3b05f4720..d75dd844d 100644
--- a/tests/helpers/StubWriterModule.h
+++ b/tests/helpers/StubWriterModule.h
@@ -22,6 +22,8 @@ public:
   InitResult reopen(hdf5::node::Group & /*HDFGroup*/) override {
     return InitResult::OK;
   }
-  void writeImpl([[maybe_unused]] FileWriter::FlatbufferMessage const &message,
-                 [[maybe_unused]] bool is_buffered_message) override {}
+  bool writeImpl([[maybe_unused]] FileWriter::FlatbufferMessage const &message,
+                 [[maybe_unused]] bool is_buffered_message) override {
+    return true;
+  }
 };
-- 
GitLab