From 8908e200fb5de0a70eb211d41e080379f73cd363 Mon Sep 17 00:00:00 2001 From: Matt Clarke <matt.clarke@ess.eu> Date: Mon, 2 Dec 2024 14:16:51 +0100 Subject: [PATCH 01/11] added boolean for writing buffered or not --- .../messages_before_and_after_data.json | 22 ++++++++++++----- .../test_messages_before_and_after.py | 4 ++-- src/Stream/MessageWriter.cpp | 10 ++++---- src/Stream/MessageWriter.h | 5 ++-- src/Stream/SourceFilter.cpp | 8 +++---- src/Stream/SourceFilter.h | 3 ++- src/WriterModule/ad00/ad00_Writer.cpp | 3 ++- src/WriterModule/ad00/ad00_Writer.h | 3 ++- src/WriterModule/al00/al00_Writer.cpp | 3 ++- src/WriterModule/al00/al00_Writer.h | 3 ++- src/WriterModule/da00/da00_Writer.cpp | 3 ++- src/WriterModule/da00/da00_Writer.h | 3 ++- src/WriterModule/ep01/ep01_Writer.cpp | 3 ++- src/WriterModule/ep01/ep01_Writer.h | 3 ++- src/WriterModule/ev44/ev44_Writer.cpp | 3 ++- src/WriterModule/ev44/ev44_Writer.h | 3 ++- src/WriterModule/f144/f144_Writer.cpp | 3 ++- src/WriterModule/f144/f144_Writer.h | 3 ++- src/WriterModule/se00/se00_Writer.cpp | 3 ++- src/WriterModule/se00/se00_Writer.h | 3 ++- src/WriterModule/tdct/tdct_Writer.cpp | 3 ++- src/WriterModule/tdct/tdct_Writer.h | 3 ++- src/WriterModule/template/TemplateWriter.h | 2 +- src/WriterModuleBase.h | 8 ++++--- tests/SourceTests.cpp | 2 +- tests/Stream/MessageWriterTests.cpp | 10 ++++---- tests/Stream/SourceFilterTest.cpp | 4 ++-- tests/WriterModule/TemplateWriterTests.cpp | 2 +- tests/WriterModule/al00_WriterTests.cpp | 2 +- tests/WriterModule/da00_WriterTests.cpp | 6 ++--- tests/WriterModule/ep01_WriterTests.cpp | 2 +- tests/WriterModule/ev44_WriterTests.cpp | 24 +++++++++---------- tests/WriterModule/f144_WriterTests.cpp | 7 +++--- tests/WriterModule/se00_WriterTests.cpp | 10 ++++---- tests/WriterModule/tdct_WriterTests.cpp | 6 ++--- tests/helpers/StubWriterModule.h | 2 +- 36 files changed, 110 insertions(+), 77 deletions(-) diff --git a/domain-tests/data_files/messages_before_and_after_data.json b/domain-tests/data_files/messages_before_and_after_data.json index da4e87128..91f5f8e50 100644 --- a/domain-tests/data_files/messages_before_and_after_data.json +++ b/domain-tests/data_files/messages_before_and_after_data.json @@ -102,31 +102,41 @@ { "schema": "ev44", "topic": "local_detector", - "kafka_timestamp": 11000, + "kafka_timestamp": 10000, "source_name": "detector_events", "message_id": 667, - "reference_time": 11000, + "reference_time": 10000, "time_of_flight": [50, 60, 70, 80], "pixel_ids": [1, 2, 3, 4] }, { "schema": "ev44", "topic": "local_detector", - "kafka_timestamp": 16000, + "kafka_timestamp": 12000, "source_name": "detector_events", "message_id": 668, - "reference_time": 16000, + "reference_time": 12000, "time_of_flight": [90, 100, 110, 120], "pixel_ids": [1, 2, 3, 4] }, { "schema": "ev44", "topic": "local_detector", - "kafka_timestamp": 17000, + "kafka_timestamp": 16000, "source_name": "detector_events", "message_id": 669, - "reference_time": 17000, + "reference_time": 16000, "time_of_flight": [130, 140, 150, 160], "pixel_ids": [1, 2, 3, 4] + }, + { + "schema": "ev44", + "topic": "local_detector", + "kafka_timestamp": 17000, + "source_name": "detector_events", + "message_id": 670, + "reference_time": 17000, + "time_of_flight": [170, 180, 190, 200], + "pixel_ids": [1, 2, 3, 4] } ] diff --git a/domain-tests/test_messages_before_and_after.py b/domain-tests/test_messages_before_and_after.py index 4a858cb66..ec474814b 100644 --- a/domain-tests/test_messages_before_and_after.py +++ b/domain-tests/test_messages_before_and_after.py @@ -27,11 +27,11 @@ def test_first_f144_data_after_stop_written_but_later_values_ignored(local_file) assert f["/entry/instrument/chopper/delay/time"][~0] == 16_000_000_000 -def test_last_ev44_data_before_start_is_written_but_earlier_values_ignored(local_file): +def test_ev44_data_before_start_is_not_written(local_file): with h5py.File(local_file, "r") as f: assert ( f["/entry/instrument/event_detector/events/event_time_zero"][0] - == 9_000_000_000 + == 10_000_000_000 ) diff --git a/src/Stream/MessageWriter.cpp b/src/Stream/MessageWriter.cpp index 2625bf879..f02e36a43 100644 --- a/src/Stream/MessageWriter.cpp +++ b/src/Stream/MessageWriter.cpp @@ -51,16 +51,18 @@ MessageWriter::~MessageWriter() { } } -void MessageWriter::addMessage(Message const &Msg) { - WriteJobs.enqueue([=]() { writeMsgImpl(Msg.DestPtr, Msg.FbMsg); }); +void MessageWriter::addMessage(Message const &Msg, bool is_buffered_message) { + WriteJobs.enqueue( + [=]() { writeMsgImpl(Msg.DestPtr, Msg.FbMsg, is_buffered_message); }); } void MessageWriter::stop() { RunThread.store(false); } void MessageWriter::writeMsgImpl(WriterModule::Base *ModulePtr, - FileWriter::FlatbufferMessage const &Msg) { + FileWriter::FlatbufferMessage const &Msg, + bool is_buffered_message) { try { - ModulePtr->write(Msg); + ModulePtr->write(Msg, is_buffered_message); WritesDone++; } catch (WriterModule::WriterException &E) { WriteErrors++; diff --git a/src/Stream/MessageWriter.h b/src/Stream/MessageWriter.h index 9ef768597..e7e75687c 100644 --- a/src/Stream/MessageWriter.h +++ b/src/Stream/MessageWriter.h @@ -38,7 +38,7 @@ public: virtual ~MessageWriter(); - virtual void addMessage(Message const &Msg); + virtual void addMessage(Message const &Msg, bool is_buffered_message); /// \brief Tell the writer thread to stop. /// @@ -60,7 +60,8 @@ public: protected: virtual void writeMsgImpl(WriterModule::Base *ModulePtr, - FileWriter::FlatbufferMessage const &Msg); + FileWriter::FlatbufferMessage const &Msg, + bool is_buffered_message); virtual void threadFunction(); virtual void flushData() { FlushDataFunction(); }; diff --git a/src/Stream/SourceFilter.cpp b/src/Stream/SourceFilter.cpp index 77a5cd282..5defc44cf 100644 --- a/src/Stream/SourceFilter.cpp +++ b/src/Stream/SourceFilter.cpp @@ -36,7 +36,7 @@ bool SourceFilter::has_finished() const { return _is_finished; } void SourceFilter::forward_buffered_message() { if (_buffered_message.isValid()) { - forward_message(_buffered_message); + forward_message(_buffered_message, true); _buffered_message = FileWriter::FlatbufferMessage(); } } @@ -92,11 +92,11 @@ bool SourceFilter::filter_message( return true; } -void SourceFilter::forward_message( - FileWriter::FlatbufferMessage const &message) { +void SourceFilter::forward_message(FileWriter::FlatbufferMessage const &message, + bool is_buffered_message) { ++MessagesTransmitted; for (auto const &writer_module : _destination_writer_modules) { - _writer->addMessage({writer_module, message}); + _writer->addMessage({writer_module, message}, is_buffered_message); } } diff --git a/src/Stream/SourceFilter.h b/src/Stream/SourceFilter.h index 60b63f61a..443ce20da 100644 --- a/src/Stream/SourceFilter.h +++ b/src/Stream/SourceFilter.h @@ -58,7 +58,8 @@ public: } private: - void forward_message(FileWriter::FlatbufferMessage const &message); + void forward_message(FileWriter::FlatbufferMessage const &message, + bool is_buffered_message = false); void forward_buffered_message(); time_point _start_time; time_point _stop_time; diff --git a/src/WriterModule/ad00/ad00_Writer.cpp b/src/WriterModule/ad00/ad00_Writer.cpp index 10524503b..39ab476d4 100644 --- a/src/WriterModule/ad00/ad00_Writer.cpp +++ b/src/WriterModule/ad00/ad00_Writer.cpp @@ -139,7 +139,8 @@ void msgTypeIsConfigType(ad00_Writer::Type ConfigType, DType MsgType) { } } -void ad00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message) { +void ad00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message, + [[maybe_unused]] bool is_buffered_message) { auto ad00 = Getad00_ADArray(Message.data()); auto DataShape = hdf5::Dimensions(ad00->dimensions()->begin(), ad00->dimensions()->end()); diff --git a/src/WriterModule/ad00/ad00_Writer.h b/src/WriterModule/ad00/ad00_Writer.h index 1975a35b1..4d2ed1c9c 100644 --- a/src/WriterModule/ad00/ad00_Writer.h +++ b/src/WriterModule/ad00/ad00_Writer.h @@ -33,7 +33,8 @@ public: InitResult reopen(hdf5::node::Group &HDFGroup) override; - void writeImpl(FileWriter::FlatbufferMessage const &Message) override; + void writeImpl(FileWriter::FlatbufferMessage const &Message, + bool is_buffered_message) override; enum class Type { int8, diff --git a/src/WriterModule/al00/al00_Writer.cpp b/src/WriterModule/al00/al00_Writer.cpp index ac56b12ac..17e98caff 100644 --- a/src/WriterModule/al00/al00_Writer.cpp +++ b/src/WriterModule/al00/al00_Writer.cpp @@ -54,7 +54,8 @@ InitResult al00_Writer::reopen(hdf5::node::Group &HDFGroup) { return InitResult::OK; } -void al00_Writer::writeImpl(FlatbufferMessage const &Message) { +void al00_Writer::writeImpl(FlatbufferMessage const &Message, + [[maybe_unused]] bool is_buffered_message) { auto AlarmMessage = GetAlarm(Message.data()); AlarmTime.appendElement(AlarmMessage->timestamp()); diff --git a/src/WriterModule/al00/al00_Writer.h b/src/WriterModule/al00/al00_Writer.h index 87ce75656..52bb2c21e 100644 --- a/src/WriterModule/al00/al00_Writer.h +++ b/src/WriterModule/al00/al00_Writer.h @@ -33,7 +33,8 @@ public: WriterModule::InitResult reopen(hdf5::node::Group &HDFGroup) override; /// Write an incoming message which should contain a flatbuffer. - void writeImpl(FlatbufferMessage const &Message) override; + void writeImpl(FlatbufferMessage const &Message, + bool is_buffered_message) override; al00_Writer() : WriterModule::Base("al00", false, "NXlog") {} ~al00_Writer() override = default; diff --git a/src/WriterModule/da00/da00_Writer.cpp b/src/WriterModule/da00/da00_Writer.cpp index eb94cb8a5..3c15c29ac 100644 --- a/src/WriterModule/da00/da00_Writer.cpp +++ b/src/WriterModule/da00/da00_Writer.cpp @@ -330,7 +330,8 @@ InitResult da00_Writer::reopen(hdf5::node::Group &HDFGroup) { return InitResult::OK; } -void da00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message) { +void da00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message, + [[maybe_unused]] bool is_buffered_message) { const auto da00_obj = Getda00_DataArray(Message.data()); if (isFirstMessage) { handle_first_message(da00_obj); diff --git a/src/WriterModule/da00/da00_Writer.h b/src/WriterModule/da00/da00_Writer.h index c07d987bb..1f0367840 100644 --- a/src/WriterModule/da00/da00_Writer.h +++ b/src/WriterModule/da00/da00_Writer.h @@ -36,7 +36,8 @@ public: InitResult reopen(hdf5::node::Group &HDFGroup) override; - void writeImpl(FileWriter::FlatbufferMessage const &Message) override; + void writeImpl(FileWriter::FlatbufferMessage const &Message, + bool is_buffered_message) override; NeXusDataset::Time Timestamp; NeXusDataset::CueIndex CueIndex; diff --git a/src/WriterModule/ep01/ep01_Writer.cpp b/src/WriterModule/ep01/ep01_Writer.cpp index 6b13d5786..b0cf09150 100644 --- a/src/WriterModule/ep01/ep01_Writer.cpp +++ b/src/WriterModule/ep01/ep01_Writer.cpp @@ -35,7 +35,8 @@ InitResult ep01_Writer::init_hdf(hdf5::node::Group &HDFGroup) { return InitResult::OK; } -void ep01_Writer::writeImpl(FileWriter::FlatbufferMessage const &Message) { +void ep01_Writer::writeImpl(FileWriter::FlatbufferMessage const &Message, + [[maybe_unused]] bool is_buffered_message) { auto FlatBuffer = GetEpicsPVConnectionInfo(Message.data()); std::int16_t Status = static_cast<std::int16_t>(FlatBuffer->status()); StatusDataset.appendElement(Status); diff --git a/src/WriterModule/ep01/ep01_Writer.h b/src/WriterModule/ep01/ep01_Writer.h index de0537c92..2013876ce 100644 --- a/src/WriterModule/ep01/ep01_Writer.h +++ b/src/WriterModule/ep01/ep01_Writer.h @@ -9,7 +9,8 @@ class ep01_Writer final : public WriterModule::Base { public: InitResult init_hdf(hdf5::node::Group &HDFGroup) override; InitResult reopen(hdf5::node::Group &HDFGroup) override; - void writeImpl(FileWriter::FlatbufferMessage const &Message) override; + void writeImpl(FileWriter::FlatbufferMessage const &Message, + bool is_buffered_message) override; ep01_Writer() : WriterModule::Base("ep01", false, "NXlog") {} ~ep01_Writer() override = default; diff --git a/src/WriterModule/ev44/ev44_Writer.cpp b/src/WriterModule/ev44/ev44_Writer.cpp index b7a308846..4458a17fe 100644 --- a/src/WriterModule/ev44/ev44_Writer.cpp +++ b/src/WriterModule/ev44/ev44_Writer.cpp @@ -89,7 +89,8 @@ WriterModule::InitResult ev44_Writer::reopen(hdf5::node::Group &HDFGroup) { return WriterModule::InitResult::OK; } -void ev44_Writer::writeImpl(FlatbufferMessage const &Message) { +void ev44_Writer::writeImpl(FlatbufferMessage const &Message, + [[maybe_unused]] bool is_buffered_message) { auto EventMsgFlatbuffer = GetEvent44Message(Message.data()); auto CurrentNumberOfEvents = EventMsgFlatbuffer->time_of_flight()->size(); if (EventMsgFlatbuffer->pixel_id()->size() > 0 && diff --git a/src/WriterModule/ev44/ev44_Writer.h b/src/WriterModule/ev44/ev44_Writer.h index 0850818dc..850378082 100644 --- a/src/WriterModule/ev44/ev44_Writer.h +++ b/src/WriterModule/ev44/ev44_Writer.h @@ -27,7 +27,8 @@ public: /// \brief Write flatbuffer message. /// /// \param FlatBufferMessage - void writeImpl(FlatbufferMessage const &Message) override; + void writeImpl(FlatbufferMessage const &Message, + bool is_buffered_message) override; NeXusDataset::EventTimeOffset EventTimeOffset; NeXusDataset::EventId EventId; diff --git a/src/WriterModule/f144/f144_Writer.cpp b/src/WriterModule/f144/f144_Writer.cpp index 7c21d83f0..ae7e15ab6 100644 --- a/src/WriterModule/f144/f144_Writer.cpp +++ b/src/WriterModule/f144/f144_Writer.cpp @@ -194,7 +194,8 @@ void msgTypeIsConfigType(f144_Writer::Type ConfigType, Value MsgType) { } } -void f144_Writer::writeImpl(FlatbufferMessage const &Message) { +void f144_Writer::writeImpl(FlatbufferMessage const &Message, + [[maybe_unused]] bool is_buffered_message) { auto LogDataMessage = Getf144_LogData(Message.data()); Timestamp.appendElement(LogDataMessage->timestamp()); auto Type = LogDataMessage->value_type(); diff --git a/src/WriterModule/f144/f144_Writer.h b/src/WriterModule/f144/f144_Writer.h index 1841d7941..f31dba956 100644 --- a/src/WriterModule/f144/f144_Writer.h +++ b/src/WriterModule/f144/f144_Writer.h @@ -39,7 +39,8 @@ public: WriterModule::InitResult reopen(hdf5::node::Group &HDFGroup) override; /// Write an incoming message which should contain a flatbuffer. - void writeImpl(FlatbufferMessage const &Message) override; + void writeImpl(FlatbufferMessage const &Message, + bool is_buffered_message) override; f144_Writer() : WriterModule::Base("f144", false, "NXlog", diff --git a/src/WriterModule/se00/se00_Writer.cpp b/src/WriterModule/se00/se00_Writer.cpp index 0880a4892..1514eeec0 100644 --- a/src/WriterModule/se00/se00_Writer.cpp +++ b/src/WriterModule/se00/se00_Writer.cpp @@ -132,7 +132,8 @@ void msgTypeIsConfigType(se00_Writer::Type ConfigType, ValueUnion MsgType) { } } -void se00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message) { +void se00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message, + [[maybe_unused]] bool is_buffered_message) { auto FbPointer = Getse00_SampleEnvironmentData(Message.data()); auto CueIndexValue = Value->current_size(); auto ValuesType = FbPointer->values_type(); diff --git a/src/WriterModule/se00/se00_Writer.h b/src/WriterModule/se00/se00_Writer.h index 43a770d13..175db231a 100644 --- a/src/WriterModule/se00/se00_Writer.h +++ b/src/WriterModule/se00/se00_Writer.h @@ -44,7 +44,8 @@ public: InitResult reopen(hdf5::node::Group &HDFGroup) override; - void writeImpl(FlatbufferMessage const &Message) override; + void writeImpl(FlatbufferMessage const &Message, + bool is_buffered_message) override; enum class Type { int8, diff --git a/src/WriterModule/tdct/tdct_Writer.cpp b/src/WriterModule/tdct/tdct_Writer.cpp index fa0551a64..e7218b18c 100644 --- a/src/WriterModule/tdct/tdct_Writer.cpp +++ b/src/WriterModule/tdct/tdct_Writer.cpp @@ -64,7 +64,8 @@ WriterModule::InitResult tdct_Writer::reopen(hdf5::node::Group &HDFGroup) { return WriterModule::InitResult::OK; } -void tdct_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message) { +void tdct_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message, + [[maybe_unused]] bool is_buffered_message) { auto FbPointer = Gettimestamp(Message.data()); auto TempTimePtr = FbPointer->timestamps()->data(); auto TempTimeSize = FbPointer->timestamps()->size(); diff --git a/src/WriterModule/tdct/tdct_Writer.h b/src/WriterModule/tdct/tdct_Writer.h index d82d770b3..f82f7c350 100644 --- a/src/WriterModule/tdct/tdct_Writer.h +++ b/src/WriterModule/tdct/tdct_Writer.h @@ -34,7 +34,8 @@ public: InitResult reopen(hdf5::node::Group &HDFGroup) override; - void writeImpl(FlatbufferMessage const &Message) override; + void writeImpl(FlatbufferMessage const &Message, + bool is_buffered_message) override; protected: NeXusDataset::Time Timestamp; diff --git a/src/WriterModule/template/TemplateWriter.h b/src/WriterModule/template/TemplateWriter.h index 867e15f3d..c0192f316 100644 --- a/src/WriterModule/template/TemplateWriter.h +++ b/src/WriterModule/template/TemplateWriter.h @@ -186,7 +186,7 @@ public: /// \param Message The structure containing a pointer to a buffer /// containing data received from the Kafka broker and the size of the buffer. // cppcheck-suppress functionStatic - void writeImpl(FileWriter::FlatbufferMessage const &/*Message*/) override { + void writeImpl([[maybe_unused]] FileWriter::FlatbufferMessage const &message, [[maybe_unused]]bool is_buffered_message) override { std::cout << "WriterClass::writeImpl()\n"; } }; diff --git a/src/WriterModuleBase.h b/src/WriterModuleBase.h index b0f98ddc3..fd0ed03a3 100644 --- a/src/WriterModuleBase.h +++ b/src/WriterModuleBase.h @@ -111,15 +111,17 @@ public: /// \brief Increment write counter and call the subclass-specific write logic. /// /// \param msg The message to process - void write(FileWriter::FlatbufferMessage const &Message) { - writeImpl(Message); + void write(FileWriter::FlatbufferMessage const &Message, + bool is_buffered_message) { + writeImpl(Message, is_buffered_message); WriteCount++; } /// \brief Process the message in some way, for example write to the HDF file. /// /// \param msg The message to process - virtual void writeImpl(FileWriter::FlatbufferMessage const &Message) = 0; + virtual void writeImpl(FileWriter::FlatbufferMessage const &Message, + bool is_buffered_message) = 0; void registerField(JsonConfig::FieldBase *Ptr) { ConfigHandler.registerField(Ptr); diff --git a/tests/SourceTests.cpp b/tests/SourceTests.cpp index f2d8c5d30..efc2a24c4 100644 --- a/tests/SourceTests.cpp +++ b/tests/SourceTests.cpp @@ -34,7 +34,7 @@ public: class WriterModuleMock : public StubWriterModule { public: - MAKE_MOCK1(writeImpl, void(FlatbufferMessage const &), override); + MAKE_MOCK2(writeImpl, void(FlatbufferMessage const &, bool), override); }; TEST_F(SourceTests, ConstructorSetsMembers) { diff --git a/tests/Stream/MessageWriterTests.cpp b/tests/Stream/MessageWriterTests.cpp index bff7af812..d4155a233 100644 --- a/tests/Stream/MessageWriterTests.cpp +++ b/tests/Stream/MessageWriterTests.cpp @@ -25,7 +25,7 @@ public: ~WriterModuleStandIn() = default; MAKE_MOCK1(init_hdf, WriterModule::InitResult(hdf5::node::Group &), override); MAKE_MOCK1(reopen, WriterModule::InitResult(hdf5::node::Group &), override); - MAKE_MOCK1(writeImpl, void(FileWriter::FlatbufferMessage const &), override); + MAKE_MOCK2(writeImpl, void(FileWriter::FlatbufferMessage const &, bool), override); }; class DataMessageWriterTest : public ::testing::Test { @@ -79,7 +79,7 @@ TEST_F(DataMessageWriterTest, EnableExtraModule) { } TEST_F(DataMessageWriterTest, WriteMessageSuccess) { - REQUIRE_CALL(WriterModule, writeImpl(_)).TIMES(1); + REQUIRE_CALL(WriterModule, writeImpl(_, _)).TIMES(1); auto InitialWriteCount = WriterModule.getWriteCount(); FileWriter::FlatbufferMessage Msg; Stream::Message SomeMessage( @@ -87,7 +87,7 @@ TEST_F(DataMessageWriterTest, WriteMessageSuccess) { { Stream::MessageWriter Writer{ []() {}, 1s, std::make_unique<Metrics::Registrar>("some_prefix")}; - Writer.addMessage(SomeMessage); + Writer.addMessage(SomeMessage, false); Writer.runJob([&Writer]() { EXPECT_TRUE(Writer.nrOfWritesDone() == 1); EXPECT_TRUE(Writer.nrOfWriteErrors() == 0); @@ -97,7 +97,7 @@ TEST_F(DataMessageWriterTest, WriteMessageSuccess) { } TEST_F(DataMessageWriterTest, WriteMessageExceptionUnknownFb) { - REQUIRE_CALL(WriterModule, writeImpl(_)) + REQUIRE_CALL(WriterModule, writeImpl(_, _)) .TIMES(1) .THROW(WriterModule::WriterException("Some error.")); auto InitialWriteCount = WriterModule.getWriteCount(); @@ -110,7 +110,7 @@ TEST_F(DataMessageWriterTest, WriteMessageExceptionUnknownFb) { Writer.runJob([&Writer]() { EXPECT_TRUE(Writer.nrOfWriterModulesWithErrors() == 1); }); - Writer.addMessage(SomeMessage); + Writer.addMessage(SomeMessage, false); Writer.runJob([&Writer]() { EXPECT_TRUE(Writer.nrOfWritesDone() == 0); EXPECT_TRUE(Writer.nrOfWriteErrors() == 1); diff --git a/tests/Stream/SourceFilterTest.cpp b/tests/Stream/SourceFilterTest.cpp index 1f4d0c562..25d358bd7 100644 --- a/tests/Stream/SourceFilterTest.cpp +++ b/tests/Stream/SourceFilterTest.cpp @@ -17,8 +17,8 @@ class StubMessageWriter : public Stream::MessageWriter { public: StubMessageWriter() : MessageWriter([]() {}, 1s, std::make_unique<Metrics::Registrar>("")) {} - void addMessage(Stream::Message const &Msg) override { - messages_received.emplace_back(Msg); + void addMessage(Stream::Message const &message, [[maybe_unused]] bool is_buffered_message) override { + messages_received.emplace_back(message); } std::vector<Stream::Message> messages_received; }; diff --git a/tests/WriterModule/TemplateWriterTests.cpp b/tests/WriterModule/TemplateWriterTests.cpp index 09f9e2037..f23a9e7b6 100644 --- a/tests/WriterModule/TemplateWriterTests.cpp +++ b/tests/WriterModule/TemplateWriterTests.cpp @@ -15,5 +15,5 @@ TEST(TemplateTests, WriterReturnValues) { hdf5::node::Group SomeGroup; EXPECT_TRUE(SomeWriter.init_hdf(SomeGroup) == WriterModule::InitResult::OK); EXPECT_TRUE(SomeWriter.reopen(SomeGroup) == WriterModule::InitResult::OK); - EXPECT_NO_THROW(SomeWriter.write(FileWriter::FlatbufferMessage())); + EXPECT_NO_THROW(SomeWriter.write(FileWriter::FlatbufferMessage(), false)); } diff --git a/tests/WriterModule/al00_WriterTests.cpp b/tests/WriterModule/al00_WriterTests.cpp index d0c103b99..5e9f6d9c8 100644 --- a/tests/WriterModule/al00_WriterTests.cpp +++ b/tests/WriterModule/al00_WriterTests.cpp @@ -89,7 +89,7 @@ TEST_F(EPICS_AlarmWriter, WriteDataOnce) { EXPECT_TRUE(Writer.init_hdf(UsedGroup) == InitResult::OK); EXPECT_TRUE(Writer.reopen(UsedGroup) == InitResult::OK); FileWriter::FlatbufferMessage TestMsg(Buffer.get(), BufferSize); - EXPECT_NO_THROW(Writer.write(TestMsg)); + EXPECT_NO_THROW(Writer.write(TestMsg, false)); auto AlarmMsgDataset = UsedGroup.get_dataset("alarm_message"); auto AlarmSeverityDataset = UsedGroup.get_dataset("alarm_severity"); auto AlarmTimeDataset = UsedGroup.get_dataset("alarm_time"); diff --git a/tests/WriterModule/da00_WriterTests.cpp b/tests/WriterModule/da00_WriterTests.cpp index c37d0bc7c..3d46bfe67 100644 --- a/tests/WriterModule/da00_WriterTests.cpp +++ b/tests/WriterModule/da00_WriterTests.cpp @@ -747,7 +747,7 @@ TEST_F(da00_WriterTestFixture, da00_WriterWriteExtendsShape) { writer.init_hdf(_group); writer.reopen(_group); for (int i = 0; i < write_count; ++i) - EXPECT_NO_THROW(writer.write(message)); + EXPECT_NO_THROW(writer.write(message, false)); EXPECT_EQ(write_count, writer.Timestamp.current_size()); } for (const auto &name : @@ -774,7 +774,7 @@ TEST_F(da00_WriterTestFixture, da00_WriterWriteCopiesData) { writer.reopen(_group); constexpr int write_count{10}; for (int i = 0; i < write_count; ++i) - EXPECT_NO_THROW(writer.write(message)); + EXPECT_NO_THROW(writer.write(message, false)); EXPECT_EQ(write_count, writer.Timestamp.current_size()); } const auto shape = Simple(_group.get_dataset("signal").dataspace()); @@ -797,7 +797,7 @@ TEST_F(da00_WriterTestFixture, da00_WriterFillsInAttributes) { writer.reopen(_group); constexpr int write_count{10}; for (int i = 0; i < write_count; ++i) - EXPECT_NO_THROW(writer.write(message)); + EXPECT_NO_THROW(writer.write(message, false)); EXPECT_EQ(write_count, writer.Timestamp.current_size()); } for (const auto &name : {"signal", "signal_error", "gain", "x", "y", "time", diff --git a/tests/WriterModule/ep01_WriterTests.cpp b/tests/WriterModule/ep01_WriterTests.cpp index 088bf7c2c..40664fc91 100644 --- a/tests/WriterModule/ep01_WriterTests.cpp +++ b/tests/WriterModule/ep01_WriterTests.cpp @@ -87,7 +87,7 @@ TEST_F(EPICS_ConStatusWriter, WriteDataOnce) { EXPECT_TRUE(Writer.init_hdf(UsedGroup) == InitResult::OK); EXPECT_TRUE(Writer.reopen(UsedGroup) == InitResult::OK); FileWriter::FlatbufferMessage TestMsg(Buffer.get(), BufferSize); - EXPECT_NO_THROW(Writer.write(TestMsg)); + EXPECT_NO_THROW(Writer.write(TestMsg, false)); auto ConStatusTimeDataset = UsedGroup.get_dataset("connection_status_time"); auto ConStatusDataset = UsedGroup.get_dataset("connection_status"); auto FbPointer = GetEpicsPVConnectionInfo(TestMsg.data()); diff --git a/tests/WriterModule/ev44_WriterTests.cpp b/tests/WriterModule/ev44_WriterTests.cpp index 748f7de19..f2e7b1f86 100644 --- a/tests/WriterModule/ev44_WriterTests.cpp +++ b/tests/WriterModule/ev44_WriterTests.cpp @@ -163,7 +163,7 @@ TEST_F(Event44WriterTests, WriterModule::ev44::ev44_Writer Writer; EXPECT_TRUE(Writer.init_hdf(TestGroup) == InitResult::OK); EXPECT_TRUE(Writer.reopen(TestGroup) == InitResult::OK); - EXPECT_NO_THROW(Writer.write(TestMessage)); + EXPECT_NO_THROW(Writer.write(TestMessage, false)); } // These braces are required due to "h5.cpp" // Read data from the file @@ -217,7 +217,7 @@ TEST_F(Event44WriterTests, WriterSuccessfullyRecordsEventDataWithoutPixelIds) { WriterModule::ev44::ev44_Writer Writer; EXPECT_TRUE(Writer.init_hdf(TestGroup) == InitResult::OK); EXPECT_TRUE(Writer.reopen(TestGroup) == InitResult::OK); - EXPECT_NO_THROW(Writer.write(TestMessage)); + EXPECT_NO_THROW(Writer.write(TestMessage, false)); } // These braces are required due to "h5.cpp" // Read data from the file @@ -280,8 +280,8 @@ TEST_F(Event44WriterTests, WriterSuccessfullyRecordsEventDataFromTwoMessages) { WriterModule::ev44::ev44_Writer Writer; EXPECT_TRUE(Writer.init_hdf(TestGroup) == InitResult::OK); EXPECT_TRUE(Writer.reopen(TestGroup) == InitResult::OK); - EXPECT_NO_THROW(Writer.write(TestMessage1)); - EXPECT_NO_THROW(Writer.write(TestMessage2)); + EXPECT_NO_THROW(Writer.write(TestMessage1, false)); + EXPECT_NO_THROW(Writer.write(TestMessage2, false)); } // These braces are required due to "h5.cpp" // Read data from the file @@ -369,9 +369,9 @@ TEST_F(Event44WriterTests, WriterSuccessfullyHandlesMessageWithNoEvents) { WriterModule::ev44::ev44_Writer Writer; EXPECT_TRUE(Writer.init_hdf(TestGroup) == InitResult::OK); EXPECT_TRUE(Writer.reopen(TestGroup) == InitResult::OK); - EXPECT_NO_THROW(Writer.write(TestMessage1)); - EXPECT_NO_THROW(Writer.write(TestMessage2)); - EXPECT_NO_THROW(Writer.write(TestMessage3)); + EXPECT_NO_THROW(Writer.write(TestMessage1, false)); + EXPECT_NO_THROW(Writer.write(TestMessage2, false)); + EXPECT_NO_THROW(Writer.write(TestMessage3, false)); } // These braces are required due to "h5.cpp" // Read data from the file @@ -445,7 +445,7 @@ TEST_F(Event44WriterTests, PulseTimeIsRepeatedWithinSameMessage) { WriterModule::ev44::ev44_Writer Writer; EXPECT_TRUE(Writer.init_hdf(TestGroup) == InitResult::OK); EXPECT_TRUE(Writer.reopen(TestGroup) == InitResult::OK); - EXPECT_NO_THROW(Writer.write(TestMessage)); + EXPECT_NO_THROW(Writer.write(TestMessage, false)); } // These braces are required due to "h5.cpp" // Read data from the file @@ -510,8 +510,8 @@ TEST_F(Event44WriterTests, LastPulseTimeIsRepeatedInSubsequentMessage) { WriterModule::ev44::ev44_Writer Writer; EXPECT_TRUE(Writer.init_hdf(TestGroup) == InitResult::OK); EXPECT_TRUE(Writer.reopen(TestGroup) == InitResult::OK); - EXPECT_NO_THROW(Writer.write(TestMessage1)); - EXPECT_NO_THROW(Writer.write(TestMessage2)); + EXPECT_NO_THROW(Writer.write(TestMessage1, false)); + EXPECT_NO_THROW(Writer.write(TestMessage2, false)); } // These braces are required due to "h5.cpp" // Read data from the file @@ -581,8 +581,8 @@ TEST_F(Event44WriterTests, CuesFromTwoMessagesAreRecorded) { Writer.setCueInterval(1); EXPECT_TRUE(Writer.init_hdf(TestGroup) == InitResult::OK); EXPECT_TRUE(Writer.reopen(TestGroup) == InitResult::OK); - EXPECT_NO_THROW(Writer.write(TestMessage1)); - EXPECT_NO_THROW(Writer.write(TestMessage2)); + EXPECT_NO_THROW(Writer.write(TestMessage1, false)); + EXPECT_NO_THROW(Writer.write(TestMessage2, false)); } // These braces are required due to "h5.cpp" // Read data from the file diff --git a/tests/WriterModule/f144_WriterTests.cpp b/tests/WriterModule/f144_WriterTests.cpp index 05b73f298..0948b4c32 100644 --- a/tests/WriterModule/f144_WriterTests.cpp +++ b/tests/WriterModule/f144_WriterTests.cpp @@ -313,7 +313,7 @@ TEST_F(f144Init, write_one_element) { EXPECT_EQ(FlatbufferMsg.getFlatbufferID(), "f144"); EXPECT_EQ(TestWriter.Values.size(), 0u); EXPECT_EQ(TestWriter.Timestamp.current_size(), 0); - TestWriter.write(FlatbufferMsg); + TestWriter.write(FlatbufferMsg, false); EXPECT_EQ(TestWriter.Values.size(), 1u); ASSERT_EQ(TestWriter.Timestamp.current_size(), 1); std::vector<double> WrittenValues(1); @@ -338,7 +338,8 @@ TEST_F(f144Init, write_one_default_value_element) { EXPECT_EQ(TestWriter.Values.size(), 0u); EXPECT_EQ(TestWriter.Timestamp.current_size(), 0); TestWriter.write(FileWriter::FlatbufferMessage(FlatbufferData.first.get(), - FlatbufferData.second)); + FlatbufferData.second), + false); EXPECT_EQ(TestWriter.Values.size(), 1u); ASSERT_EQ(TestWriter.Timestamp.current_size(), 1); std::vector<double> WrittenValues(1); @@ -360,7 +361,7 @@ TEST_F(f144Init, write_multiple_elements) { auto [buffer, size] = f144_schema::generateFlatbufferMessage(values[i], timestamps[i]); FileWriter::FlatbufferMessage FlatbufferMsg(buffer.get(), size); - TestWriter.write(FlatbufferMsg); + TestWriter.write(FlatbufferMsg, false); } std::vector<double> WrittenValues(values.size()); diff --git a/tests/WriterModule/se00_WriterTests.cpp b/tests/WriterModule/se00_WriterTests.cpp index b7a663454..74bb73a92 100644 --- a/tests/WriterModule/se00_WriterTests.cpp +++ b/tests/WriterModule/se00_WriterTests.cpp @@ -109,7 +109,7 @@ TEST_F(se00Writer, WriteDataOnce) { EXPECT_TRUE(Writer.init_hdf(UsedGroup) == InitResult::OK); EXPECT_TRUE(Writer.reopen(UsedGroup) == InitResult::OK); FileWriter::FlatbufferMessage TestMsg(Buffer.get(), BufferSize); - EXPECT_NO_THROW(Writer.write(TestMsg)); + EXPECT_NO_THROW(Writer.write(TestMsg, false)); auto RawValuesDataset = UsedGroup.get_dataset("value"); auto TimestampDataset = UsedGroup.get_dataset("time"); auto CueIndexDataset = UsedGroup.get_dataset("cue_index"); @@ -151,8 +151,8 @@ TEST_F(se00Writer, WriteDataTwice) { EXPECT_TRUE(Writer.init_hdf(UsedGroup) == InitResult::OK); EXPECT_TRUE(Writer.reopen(UsedGroup) == InitResult::OK); FileWriter::FlatbufferMessage TestMsg(Buffer.get(), BufferSize); - EXPECT_NO_THROW(Writer.write(TestMsg)); - EXPECT_NO_THROW(Writer.write(TestMsg)); + EXPECT_NO_THROW(Writer.write(TestMsg, false)); + EXPECT_NO_THROW(Writer.write(TestMsg, false)); auto RawValuesDataset = UsedGroup.get_dataset("value"); auto TimestampDataset = UsedGroup.get_dataset("time"); auto CueIndexDataset = UsedGroup.get_dataset("cue_index"); @@ -190,7 +190,7 @@ TEST_F(se00Writer, WriteNoElements) { EXPECT_TRUE(Writer.init_hdf(UsedGroup) == InitResult::OK); EXPECT_TRUE(Writer.reopen(UsedGroup) == InitResult::OK); FileWriter::FlatbufferMessage TestMsg(Buffer.get(), BufferSize); - EXPECT_NO_THROW(Writer.write(TestMsg)); + EXPECT_NO_THROW(Writer.write(TestMsg, false)); auto RawValuesDataset = UsedGroup.get_dataset("value"); auto TimestampDataset = UsedGroup.get_dataset("time"); auto CueIndexDataset = UsedGroup.get_dataset("cue_index"); @@ -209,7 +209,7 @@ TEST_F(se00Writer, WriteDataWithNoTimestampsInFB) { EXPECT_TRUE(Writer.init_hdf(UsedGroup) == InitResult::OK); EXPECT_TRUE(Writer.reopen(UsedGroup) == InitResult::OK); FileWriter::FlatbufferMessage TestMsg(Buffer.get(), BufferSize); - EXPECT_NO_THROW(Writer.write(TestMsg)); + EXPECT_NO_THROW(Writer.write(TestMsg, false)); auto RawValuesDataset = UsedGroup.get_dataset("value"); auto TimestampDataset = UsedGroup.get_dataset("time"); auto CueIndexDataset = UsedGroup.get_dataset("cue_index"); diff --git a/tests/WriterModule/tdct_WriterTests.cpp b/tests/WriterModule/tdct_WriterTests.cpp index a80559b55..e8d0e8d41 100644 --- a/tests/WriterModule/tdct_WriterTests.cpp +++ b/tests/WriterModule/tdct_WriterTests.cpp @@ -89,7 +89,7 @@ TEST_F(ChopperTimeStampWriter, WriteDataOnce) { EXPECT_TRUE(Writer.init_hdf(UsedGroup) == InitResult::OK); EXPECT_TRUE(Writer.reopen(UsedGroup) == InitResult::OK); FileWriter::FlatbufferMessage TestMsg(Buffer.get(), BufferSize); - EXPECT_NO_THROW(Writer.write(TestMsg)); + EXPECT_NO_THROW(Writer.write(TestMsg, false)); auto TimestampDataset = UsedGroup.get_dataset("time"); auto CueIndexDataset = UsedGroup.get_dataset("cue_index"); auto CueTimestampZeroDataset = UsedGroup.get_dataset("cue_timestamp_zero"); @@ -119,8 +119,8 @@ TEST_F(ChopperTimeStampWriter, WriteDataTwice) { EXPECT_TRUE(Writer.init_hdf(UsedGroup) == InitResult::OK); EXPECT_TRUE(Writer.reopen(UsedGroup) == InitResult::OK); FileWriter::FlatbufferMessage TestMsg(Buffer.get(), BufferSize); - EXPECT_NO_THROW(Writer.write(TestMsg)); - EXPECT_NO_THROW(Writer.write(TestMsg)); + EXPECT_NO_THROW(Writer.write(TestMsg, false)); + EXPECT_NO_THROW(Writer.write(TestMsg, false)); auto TimestampDataset = UsedGroup.get_dataset("time"); auto CueIndexDataset = UsedGroup.get_dataset("cue_index"); auto CueTimestampZeroDataset = UsedGroup.get_dataset("cue_timestamp_zero"); diff --git a/tests/helpers/StubWriterModule.h b/tests/helpers/StubWriterModule.h index 0fc753363..9e6e3f090 100644 --- a/tests/helpers/StubWriterModule.h +++ b/tests/helpers/StubWriterModule.h @@ -22,5 +22,5 @@ public: InitResult reopen(hdf5::node::Group & /*HDFGroup*/) override { return InitResult::OK; } - void writeImpl(FileWriter::FlatbufferMessage const & /*Message*/) override {} + void writeImpl([[maybe_unused]] FileWriter::FlatbufferMessage const & message, [[maybe_unused]] bool is_buffered_message) override {} }; -- GitLab From 76e916a2283bdaebb8932529653cd613d6b29ac8 Mon Sep 17 00:00:00 2001 From: Matt Clarke <matt.clarke@ess.eu> Date: Mon, 2 Dec 2024 14:19:38 +0100 Subject: [PATCH 02/11] ev44 now ignores buffered messages --- src/WriterModule/ev44/ev44_Writer.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/WriterModule/ev44/ev44_Writer.cpp b/src/WriterModule/ev44/ev44_Writer.cpp index 4458a17fe..9b4365539 100644 --- a/src/WriterModule/ev44/ev44_Writer.cpp +++ b/src/WriterModule/ev44/ev44_Writer.cpp @@ -90,7 +90,11 @@ WriterModule::InitResult ev44_Writer::reopen(hdf5::node::Group &HDFGroup) { } void ev44_Writer::writeImpl(FlatbufferMessage const &Message, - [[maybe_unused]] bool is_buffered_message) { + bool is_buffered_message) { + if (is_buffered_message) { + // Ignore buffered data for event data + return; + } auto EventMsgFlatbuffer = GetEvent44Message(Message.data()); auto CurrentNumberOfEvents = EventMsgFlatbuffer->time_of_flight()->size(); if (EventMsgFlatbuffer->pixel_id()->size() > 0 && -- GitLab From 8d0aaae3f73844eaf23388a01a29e00bfa4dda0a Mon Sep 17 00:00:00 2001 From: Matt Clarke <matt.clarke@ess.eu> Date: Mon, 2 Dec 2024 14:53:55 +0100 Subject: [PATCH 03/11] clang formatting --- .../data_files/messages_before_and_after_data.json | 8 ++++---- domain-tests/test_messages_before_and_after.py | 4 ++-- tests/ProducerTests.cpp | 4 +--- tests/Stream/MessageWriterTests.cpp | 3 ++- tests/Stream/SourceFilterTest.cpp | 3 ++- tests/helpers/StubWriterModule.h | 3 ++- 6 files changed, 13 insertions(+), 12 deletions(-) diff --git a/domain-tests/data_files/messages_before_and_after_data.json b/domain-tests/data_files/messages_before_and_after_data.json index 91f5f8e50..7140c5078 100644 --- a/domain-tests/data_files/messages_before_and_after_data.json +++ b/domain-tests/data_files/messages_before_and_after_data.json @@ -122,20 +122,20 @@ { "schema": "ev44", "topic": "local_detector", - "kafka_timestamp": 16000, + "kafka_timestamp": 15000, "source_name": "detector_events", "message_id": 669, - "reference_time": 16000, + "reference_time": 15000, "time_of_flight": [130, 140, 150, 160], "pixel_ids": [1, 2, 3, 4] }, { "schema": "ev44", "topic": "local_detector", - "kafka_timestamp": 17000, + "kafka_timestamp": 16000, "source_name": "detector_events", "message_id": 670, - "reference_time": 17000, + "reference_time": 16000, "time_of_flight": [170, 180, 190, 200], "pixel_ids": [1, 2, 3, 4] } diff --git a/domain-tests/test_messages_before_and_after.py b/domain-tests/test_messages_before_and_after.py index ec474814b..8ce8eb3e2 100644 --- a/domain-tests/test_messages_before_and_after.py +++ b/domain-tests/test_messages_before_and_after.py @@ -35,9 +35,9 @@ def test_ev44_data_before_start_is_not_written(local_file): ) -def test_first_ev44_data_after_stop_written_but_later_values_ignored(local_file): +def test_ev44_data_after_stop_not_written(local_file): with h5py.File(local_file, "r") as f: assert ( f["/entry/instrument/event_detector/events/event_time_zero"][~0] - == 16_000_000_000 + == 15_000_000_000 ) diff --git a/tests/ProducerTests.cpp b/tests/ProducerTests.cpp index 9962af48a..259100307 100644 --- a/tests/ProducerTests.cpp +++ b/tests/ProducerTests.cpp @@ -43,9 +43,7 @@ public: int64_t /*offset*/) override { return RdKafka::ERR_NO_ERROR; }; - struct rd_kafka_topic_s *c_ptr() override { - return {}; - }; + struct rd_kafka_topic_s *c_ptr() override { return {}; }; }; TEST_F(ProducerTests, creatingForwarderIncrementsForwarderCounter) { diff --git a/tests/Stream/MessageWriterTests.cpp b/tests/Stream/MessageWriterTests.cpp index d4155a233..8db83f246 100644 --- a/tests/Stream/MessageWriterTests.cpp +++ b/tests/Stream/MessageWriterTests.cpp @@ -25,7 +25,8 @@ public: ~WriterModuleStandIn() = default; MAKE_MOCK1(init_hdf, WriterModule::InitResult(hdf5::node::Group &), override); MAKE_MOCK1(reopen, WriterModule::InitResult(hdf5::node::Group &), override); - MAKE_MOCK2(writeImpl, void(FileWriter::FlatbufferMessage const &, bool), override); + MAKE_MOCK2(writeImpl, void(FileWriter::FlatbufferMessage const &, bool), + override); }; class DataMessageWriterTest : public ::testing::Test { diff --git a/tests/Stream/SourceFilterTest.cpp b/tests/Stream/SourceFilterTest.cpp index 25d358bd7..6806f40ba 100644 --- a/tests/Stream/SourceFilterTest.cpp +++ b/tests/Stream/SourceFilterTest.cpp @@ -17,7 +17,8 @@ class StubMessageWriter : public Stream::MessageWriter { public: StubMessageWriter() : MessageWriter([]() {}, 1s, std::make_unique<Metrics::Registrar>("")) {} - void addMessage(Stream::Message const &message, [[maybe_unused]] bool is_buffered_message) override { + void addMessage(Stream::Message const &message, + [[maybe_unused]] bool is_buffered_message) override { messages_received.emplace_back(message); } std::vector<Stream::Message> messages_received; diff --git a/tests/helpers/StubWriterModule.h b/tests/helpers/StubWriterModule.h index 9e6e3f090..3b05f4720 100644 --- a/tests/helpers/StubWriterModule.h +++ b/tests/helpers/StubWriterModule.h @@ -22,5 +22,6 @@ public: InitResult reopen(hdf5::node::Group & /*HDFGroup*/) override { return InitResult::OK; } - void writeImpl([[maybe_unused]] FileWriter::FlatbufferMessage const & message, [[maybe_unused]] bool is_buffered_message) override {} + void writeImpl([[maybe_unused]] FileWriter::FlatbufferMessage const &message, + [[maybe_unused]] bool is_buffered_message) override {} }; -- GitLab From 6864cb60cd53ade993fba9a88a42c37c218df03d Mon Sep 17 00:00:00 2001 From: Matt Clarke <matt.clarke@ess.eu> Date: Tue, 3 Dec 2024 15:11:12 +0100 Subject: [PATCH 04/11] adjusted test --- integration-tests/test_filewriter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/test_filewriter.py b/integration-tests/test_filewriter.py index 5633fc25a..c40e5cd3b 100644 --- a/integration-tests/test_filewriter.py +++ b/integration-tests/test_filewriter.py @@ -379,7 +379,7 @@ class TestFileWriter: with h5py.File(os.path.join(OUTPUT_DIR, file_name), "r") as f: # Check data is as expected - assert len(f["/entry/detector/event_time_zero"]) == 8 + assert len(f["/entry/detector/event_time_zero"]) == 7 assert f["/entry/detector/event_id"][0] == 15 assert f["/entry/detector/event_id"][~0] == 54 assert len(f["/entry/motion/time"]) == 8 -- GitLab From afa37d73c52b0ea93d64410daf54edf0b4783a3f Mon Sep 17 00:00:00 2001 From: Matt Clarke <matt.clarke@ess.eu> Date: Wed, 4 Dec 2024 14:19:08 +0100 Subject: [PATCH 05/11] don't write data after stop time --- integration-tests/test_filewriter.py | 17 +++++++++-------- src/Stream/SourceFilter.cpp | 2 ++ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/integration-tests/test_filewriter.py b/integration-tests/test_filewriter.py index c40e5cd3b..211d041d6 100644 --- a/integration-tests/test_filewriter.py +++ b/integration-tests/test_filewriter.py @@ -341,9 +341,10 @@ class TestFileWriter: def test_data_before_and_after(self, file_writer): """ Test that when writing in the past: - - the last value before the start time is written - - the first value after the stop time is written - - all values inbetween are written + - the last value before the start time is written for f144 + - the first value after the stop time is written for f144 + - all f144 values inbetween are written + - ev44 only writes the values inbetween the start and stop times """ producer = Producer({"bootstrap.servers": ",".join(get_brokers())}) @@ -379,12 +380,12 @@ class TestFileWriter: with h5py.File(os.path.join(OUTPUT_DIR, file_name), "r") as f: # Check data is as expected - assert len(f["/entry/detector/event_time_zero"]) == 7 - assert f["/entry/detector/event_id"][0] == 15 - assert f["/entry/detector/event_id"][~0] == 54 - assert len(f["/entry/motion/time"]) == 8 + assert len(f["/entry/detector/event_time_zero"]) == 6 + assert f["/entry/detector/event_id"][0] == 20 + assert f["/entry/detector/event_id"][~0] == 49 + assert len(f["/entry/motion/time"]) == 7 assert f["/entry/motion/value"][0] == 3 - assert f["/entry/motion/value"][~0] == 10 + assert f["/entry/motion/value"][~0] == 9 def test_start_and_stop_in_the_future(self, file_writer): """ diff --git a/src/Stream/SourceFilter.cpp b/src/Stream/SourceFilter.cpp index 5defc44cf..586e54426 100644 --- a/src/Stream/SourceFilter.cpp +++ b/src/Stream/SourceFilter.cpp @@ -86,6 +86,8 @@ bool SourceFilter::filter_message( } if (message_time > _stop_time) { _is_finished = true; + forward_buffered_message(); + return false; } forward_buffered_message(); forward_message(message); -- GitLab From b13cf7d8abe1eea3d8d38cfc1cf555e4d0abd2ba Mon Sep 17 00:00:00 2001 From: Matt Clarke <matt.clarke@ess.eu> Date: Wed, 4 Dec 2024 15:38:53 +0100 Subject: [PATCH 06/11] clang formatting --- tests/ProducerTests.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/ProducerTests.cpp b/tests/ProducerTests.cpp index 259100307..9962af48a 100644 --- a/tests/ProducerTests.cpp +++ b/tests/ProducerTests.cpp @@ -43,7 +43,9 @@ public: int64_t /*offset*/) override { return RdKafka::ERR_NO_ERROR; }; - struct rd_kafka_topic_s *c_ptr() override { return {}; }; + struct rd_kafka_topic_s *c_ptr() override { + return {}; + }; }; TEST_F(ProducerTests, creatingForwarderIncrementsForwarderCounter) { -- GitLab From badbb407c19131767797cd38383a1dc8baab2e76 Mon Sep 17 00:00:00 2001 From: Matt Clarke <matt.clarke@ess.eu> Date: Thu, 5 Dec 2024 08:34:16 +0100 Subject: [PATCH 07/11] update tests to match changes --- tests/Stream/SourceFilterTest.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/Stream/SourceFilterTest.cpp b/tests/Stream/SourceFilterTest.cpp index 6806f40ba..72b3f0a5c 100644 --- a/tests/Stream/SourceFilterTest.cpp +++ b/tests/Stream/SourceFilterTest.cpp @@ -116,15 +116,13 @@ TEST(SourceFilter, message_on_start_is_allowed_through) { EXPECT_EQ(1u, harness.writer->messages_received.size()); } -TEST(SourceFilter, first_message_after_stop_is_allowed_through) { +TEST(SourceFilter, messages_after_stop_are_not_allowed_through) { auto harness = create_filter_for_tests(time_point{0ms}, time_point{1000ms}); harness.filter->filter_message(create_f144_message("::source::", 1, 1001)); harness.filter->filter_message(create_f144_message("::source::", 2, 1002)); - EXPECT_EQ(1u, harness.writer->messages_received.size()); - auto first = harness.writer->messages_received.at(0).FbMsg; - EXPECT_EQ(1001000000, first.getTimestamp()); // timestamp is in ns + EXPECT_EQ(0u, harness.writer->messages_received.size()); } TEST(SourceFilter, message_after_stop_sets_filter_to_finished) { @@ -164,11 +162,13 @@ TEST(SourceFilter, can_change_stop_time_after_construction) { harness.filter->set_stop_time(time_point{1000ms}); - // First message after stop is allowed + // Messages after stop are not allowed + harness.filter->filter_message(create_f144_message("::source::", 1, 999)); + harness.filter->filter_message(create_f144_message("::source::", 1, 1000)); harness.filter->filter_message(create_f144_message("::source::", 1, 1001)); harness.filter->filter_message(create_f144_message("::source::", 2, 1002)); - EXPECT_EQ(1u, harness.writer->messages_received.size()); + EXPECT_EQ(2u, harness.writer->messages_received.size()); } TEST(SourceFilter, -- GitLab From 499ac6090c18d1c6e929428de332f5e9dab7b69f Mon Sep 17 00:00:00 2001 From: Matt Clarke <matt.clarke@ess.eu> Date: Thu, 5 Dec 2024 13:17:46 +0100 Subject: [PATCH 08/11] missed domain test --- domain-tests/test_messages_before_and_after.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/domain-tests/test_messages_before_and_after.py b/domain-tests/test_messages_before_and_after.py index 8ce8eb3e2..eff2c7126 100644 --- a/domain-tests/test_messages_before_and_after.py +++ b/domain-tests/test_messages_before_and_after.py @@ -21,10 +21,10 @@ def test_last_f144_data_before_start_is_written_but_earlier_values_ignored(local assert f["/entry/instrument/chopper/delay/time"][0] == 9_999_000_000 -def test_first_f144_data_after_stop_written_but_later_values_ignored(local_file): +def test_f144_data_after_stop_not_written(local_file): with h5py.File(local_file, "r") as f: - assert f["/entry/instrument/chopper/delay/value"][~0] == 17 - assert f["/entry/instrument/chopper/delay/time"][~0] == 16_000_000_000 + assert f["/entry/instrument/chopper/delay/value"][~0] == 16 + assert f["/entry/instrument/chopper/delay/time"][~0] == 15_000_000_000 def test_ev44_data_before_start_is_not_written(local_file): -- GitLab From 82f60167683027f171f023bdb9c134cac46345c8 Mon Sep 17 00:00:00 2001 From: Matt Clarke <matt.clarke@ess.eu> Date: Thu, 5 Dec 2024 13:49:10 +0100 Subject: [PATCH 09/11] added domain test to check f144 buffered value is written --- .gitignore | 3 +- domain-tests/data_files/buffered_data.json | 18 ++++ .../nexus_templates/buffered_template.json | 89 +++++++++++++++++++ domain-tests/test_buffered.py | 20 +++++ .../test_messages_before_and_after.py | 1 - 5 files changed, 129 insertions(+), 2 deletions(-) create mode 100644 domain-tests/data_files/buffered_data.json create mode 100644 domain-tests/nexus_templates/buffered_template.json create mode 100644 domain-tests/test_buffered.py diff --git a/.gitignore b/.gitignore index 098f81eba..9a7e609c2 100644 --- a/.gitignore +++ b/.gitignore @@ -10,7 +10,8 @@ pipeline.gdsl *.j2~ *.h~ *.cxx~ -/integration-tests/output-files/*.nxs +/integration-tests/output-files/*.* +/domain-tests/output_files/*.* venv *.log *.DS_Store diff --git a/domain-tests/data_files/buffered_data.json b/domain-tests/data_files/buffered_data.json new file mode 100644 index 000000000..919dbb570 --- /dev/null +++ b/domain-tests/data_files/buffered_data.json @@ -0,0 +1,18 @@ +[ + { + "schema": "f144", + "topic": "local_choppers", + "kafka_timestamp": 5000, + "source_name": "local:choppers:delay", + "timestamp": 5000, + "value": 2 + }, + { + "schema": "f144", + "topic": "local_choppers", + "kafka_timestamp": 9000, + "source_name": "local:choppers:delay", + "timestamp": 9000, + "value": 3 + } +] diff --git a/domain-tests/nexus_templates/buffered_template.json b/domain-tests/nexus_templates/buffered_template.json new file mode 100644 index 000000000..b6ec0427f --- /dev/null +++ b/domain-tests/nexus_templates/buffered_template.json @@ -0,0 +1,89 @@ +{ + "children": [ + { + "name": "entry", + "type": "group", + "attributes": [ + { + "name": "NX_class", + "dtype": "string", + "values": "NXentry" + } + ], + "children": [ + { + "module": "dataset", + "config": { + "name": "title", + "values": "This is a title", + "dtype": "string" + } + }, + { + "module": "mdat", + "config": { + "items": [ + "start_time", + "end_time" + ] + } + }, + { + "name": "instrument", + "type": "group", + "attributes": [ + { + "name": "NX_class", + "dtype": "string", + "values": "NXinstrument" + } + ], + "children": [ + { + "name": "chopper", + "type": "group", + "attributes": [ + { + "name": "NX_class", + "dtype": "string", + "values": "NXdisk_chopper" + } + ], + "children": [ + { + "name": "delay", + "type": "group", + "attributes": [ + { + "name": "NX_class", + "dtype": "string", + "values": "NXlog" + } + ], + "children": [ + { + "module": "f144", + "config": { + "source": "local:choppers:delay", + "topic": "local_choppers", + "dtype": "double" + } + } + ] + }, + { + "module": "dataset", + "config": { + "name": "depends_on", + "values": ".", + "dtype": "string" + } + } + ] + } + ] + } + ] + } + ] +} diff --git a/domain-tests/test_buffered.py b/domain-tests/test_buffered.py new file mode 100644 index 000000000..b7acaf87e --- /dev/null +++ b/domain-tests/test_buffered.py @@ -0,0 +1,20 @@ +import h5py +import pytest +from conftest import write_file + + +# This fixture is used to create the file +@pytest.fixture(scope="module") +def local_file(request): + return write_file( + request, + "output_files/buffered.hdf", + "nexus_templates/buffered_template.json", + "data_files/buffered_data.json", + ) + + +def test_f144_data_before_start_is_written_when_no_data_between_start_and_stop(local_file): + with h5py.File(local_file, "r") as f: + assert f["/entry/instrument/chopper/delay/value"][0] == 3 + assert f["/entry/instrument/chopper/delay/time"][0] == 9_000_000_000 \ No newline at end of file diff --git a/domain-tests/test_messages_before_and_after.py b/domain-tests/test_messages_before_and_after.py index eff2c7126..cf3c0dd02 100644 --- a/domain-tests/test_messages_before_and_after.py +++ b/domain-tests/test_messages_before_and_after.py @@ -1,5 +1,4 @@ import h5py -import numpy as np import pytest from conftest import write_file -- GitLab From c599eed98540360a9760aa50a57ce32d35d5f893 Mon Sep 17 00:00:00 2001 From: Matt Clarke <matt.clarke@ess.eu> Date: Thu, 5 Dec 2024 13:55:27 +0100 Subject: [PATCH 10/11] formatting --- domain-tests/test_buffered.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/domain-tests/test_buffered.py b/domain-tests/test_buffered.py index b7acaf87e..1c5301893 100644 --- a/domain-tests/test_buffered.py +++ b/domain-tests/test_buffered.py @@ -14,7 +14,9 @@ def local_file(request): ) -def test_f144_data_before_start_is_written_when_no_data_between_start_and_stop(local_file): +def test_f144_data_before_start_is_written_when_no_data_between_start_and_stop( + local_file, +): with h5py.File(local_file, "r") as f: assert f["/entry/instrument/chopper/delay/value"][0] == 3 - assert f["/entry/instrument/chopper/delay/time"][0] == 9_000_000_000 \ No newline at end of file + assert f["/entry/instrument/chopper/delay/time"][0] == 9_000_000_000 -- GitLab From 2054a75e20ab5e48340c8b90586c33886cc92ce6 Mon Sep 17 00:00:00 2001 From: Matt Clarke <matt.clarke@ess.eu> Date: Thu, 5 Dec 2024 14:49:34 +0100 Subject: [PATCH 11/11] unit test to confirm that ev44 buffered data is not written --- src/WriterModule/ad00/ad00_Writer.cpp | 3 ++- src/WriterModule/ad00/ad00_Writer.h | 2 +- src/WriterModule/al00/al00_Writer.cpp | 3 ++- src/WriterModule/al00/al00_Writer.h | 2 +- src/WriterModule/da00/da00_Writer.cpp | 3 ++- src/WriterModule/da00/da00_Writer.h | 2 +- src/WriterModule/ep01/ep01_Writer.cpp | 3 ++- src/WriterModule/ep01/ep01_Writer.h | 2 +- src/WriterModule/ev44/ev44_Writer.cpp | 5 ++-- src/WriterModule/ev44/ev44_Writer.h | 2 +- src/WriterModule/f144/f144_Writer.cpp | 3 ++- src/WriterModule/f144/f144_Writer.h | 2 +- src/WriterModule/se00/se00_Writer.cpp | 5 ++-- src/WriterModule/se00/se00_Writer.h | 2 +- src/WriterModule/tdct/tdct_Writer.cpp | 5 ++-- src/WriterModule/tdct/tdct_Writer.h | 2 +- src/WriterModule/template/TemplateWriter.h | 3 ++- src/WriterModuleBase.h | 7 +++--- tests/SourceTests.cpp | 2 +- tests/Stream/MessageWriterTests.cpp | 4 ++-- tests/WriterModule/ev44_WriterTests.cpp | 27 ++++++++++++++++++++++ tests/helpers/StubWriterModule.h | 6 +++-- 22 files changed, 67 insertions(+), 28 deletions(-) diff --git a/src/WriterModule/ad00/ad00_Writer.cpp b/src/WriterModule/ad00/ad00_Writer.cpp index 39ab476d4..b50951a64 100644 --- a/src/WriterModule/ad00/ad00_Writer.cpp +++ b/src/WriterModule/ad00/ad00_Writer.cpp @@ -139,7 +139,7 @@ void msgTypeIsConfigType(ad00_Writer::Type ConfigType, DType MsgType) { } } -void ad00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message, +bool ad00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message, [[maybe_unused]] bool is_buffered_message) { auto ad00 = Getad00_ADArray(Message.data()); auto DataShape = @@ -200,6 +200,7 @@ void ad00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message, CueTimestamp.appendElement(CurrentTimestamp); CueCounter = 0; } + return true; } template <typename Type> diff --git a/src/WriterModule/ad00/ad00_Writer.h b/src/WriterModule/ad00/ad00_Writer.h index 4d2ed1c9c..5922457d3 100644 --- a/src/WriterModule/ad00/ad00_Writer.h +++ b/src/WriterModule/ad00/ad00_Writer.h @@ -33,7 +33,7 @@ public: InitResult reopen(hdf5::node::Group &HDFGroup) override; - void writeImpl(FileWriter::FlatbufferMessage const &Message, + bool writeImpl(FileWriter::FlatbufferMessage const &Message, bool is_buffered_message) override; enum class Type { diff --git a/src/WriterModule/al00/al00_Writer.cpp b/src/WriterModule/al00/al00_Writer.cpp index 17e98caff..9beeff2ba 100644 --- a/src/WriterModule/al00/al00_Writer.cpp +++ b/src/WriterModule/al00/al00_Writer.cpp @@ -54,7 +54,7 @@ InitResult al00_Writer::reopen(hdf5::node::Group &HDFGroup) { return InitResult::OK; } -void al00_Writer::writeImpl(FlatbufferMessage const &Message, +bool al00_Writer::writeImpl(FlatbufferMessage const &Message, [[maybe_unused]] bool is_buffered_message) { auto AlarmMessage = GetAlarm(Message.data()); @@ -66,6 +66,7 @@ void al00_Writer::writeImpl(FlatbufferMessage const &Message, AlarmMessageString = "NO ALARM MESSAGE"; } AlarmMsg.appendStringElement(AlarmMessageString); + return true; } /// Register the writer module. diff --git a/src/WriterModule/al00/al00_Writer.h b/src/WriterModule/al00/al00_Writer.h index 52bb2c21e..fa68a2b36 100644 --- a/src/WriterModule/al00/al00_Writer.h +++ b/src/WriterModule/al00/al00_Writer.h @@ -33,7 +33,7 @@ public: WriterModule::InitResult reopen(hdf5::node::Group &HDFGroup) override; /// Write an incoming message which should contain a flatbuffer. - void writeImpl(FlatbufferMessage const &Message, + bool writeImpl(FlatbufferMessage const &Message, bool is_buffered_message) override; al00_Writer() : WriterModule::Base("al00", false, "NXlog") {} diff --git a/src/WriterModule/da00/da00_Writer.cpp b/src/WriterModule/da00/da00_Writer.cpp index 3c15c29ac..c36674bfe 100644 --- a/src/WriterModule/da00/da00_Writer.cpp +++ b/src/WriterModule/da00/da00_Writer.cpp @@ -330,7 +330,7 @@ InitResult da00_Writer::reopen(hdf5::node::Group &HDFGroup) { return InitResult::OK; } -void da00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message, +bool da00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message, [[maybe_unused]] bool is_buffered_message) { const auto da00_obj = Getda00_DataArray(Message.data()); if (isFirstMessage) { @@ -383,6 +383,7 @@ void da00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message, CueTimestampZero.appendElement(da00_obj->timestamp()); CueCounter = 0; } + return true; } } // namespace WriterModule::da00 diff --git a/src/WriterModule/da00/da00_Writer.h b/src/WriterModule/da00/da00_Writer.h index 1f0367840..eb5301e55 100644 --- a/src/WriterModule/da00/da00_Writer.h +++ b/src/WriterModule/da00/da00_Writer.h @@ -36,7 +36,7 @@ public: InitResult reopen(hdf5::node::Group &HDFGroup) override; - void writeImpl(FileWriter::FlatbufferMessage const &Message, + bool writeImpl(FileWriter::FlatbufferMessage const &Message, bool is_buffered_message) override; NeXusDataset::Time Timestamp; diff --git a/src/WriterModule/ep01/ep01_Writer.cpp b/src/WriterModule/ep01/ep01_Writer.cpp index b0cf09150..3e6dc1f31 100644 --- a/src/WriterModule/ep01/ep01_Writer.cpp +++ b/src/WriterModule/ep01/ep01_Writer.cpp @@ -35,13 +35,14 @@ InitResult ep01_Writer::init_hdf(hdf5::node::Group &HDFGroup) { return InitResult::OK; } -void ep01_Writer::writeImpl(FileWriter::FlatbufferMessage const &Message, +bool ep01_Writer::writeImpl(FileWriter::FlatbufferMessage const &Message, [[maybe_unused]] bool is_buffered_message) { auto FlatBuffer = GetEpicsPVConnectionInfo(Message.data()); std::int16_t Status = static_cast<std::int16_t>(FlatBuffer->status()); StatusDataset.appendElement(Status); auto FBTimestamp = FlatBuffer->timestamp(); TimestampDataset.appendElement(FBTimestamp); + return true; } static WriterModule::Registry::Registrar<ep01_Writer> diff --git a/src/WriterModule/ep01/ep01_Writer.h b/src/WriterModule/ep01/ep01_Writer.h index 2013876ce..b13be6d9d 100644 --- a/src/WriterModule/ep01/ep01_Writer.h +++ b/src/WriterModule/ep01/ep01_Writer.h @@ -9,7 +9,7 @@ class ep01_Writer final : public WriterModule::Base { public: InitResult init_hdf(hdf5::node::Group &HDFGroup) override; InitResult reopen(hdf5::node::Group &HDFGroup) override; - void writeImpl(FileWriter::FlatbufferMessage const &Message, + bool writeImpl(FileWriter::FlatbufferMessage const &Message, bool is_buffered_message) override; ep01_Writer() : WriterModule::Base("ep01", false, "NXlog") {} diff --git a/src/WriterModule/ev44/ev44_Writer.cpp b/src/WriterModule/ev44/ev44_Writer.cpp index 9b4365539..b47611f2e 100644 --- a/src/WriterModule/ev44/ev44_Writer.cpp +++ b/src/WriterModule/ev44/ev44_Writer.cpp @@ -89,11 +89,11 @@ WriterModule::InitResult ev44_Writer::reopen(hdf5::node::Group &HDFGroup) { return WriterModule::InitResult::OK; } -void ev44_Writer::writeImpl(FlatbufferMessage const &Message, +bool ev44_Writer::writeImpl(FlatbufferMessage const &Message, bool is_buffered_message) { if (is_buffered_message) { // Ignore buffered data for event data - return; + return false; } auto EventMsgFlatbuffer = GetEvent44Message(Message.data()); auto CurrentNumberOfEvents = EventMsgFlatbuffer->time_of_flight()->size(); @@ -134,6 +134,7 @@ void ev44_Writer::writeImpl(FlatbufferMessage const &Message, } EventsWrittenMetadataField.setValue(EventsWritten); } + return true; } void ev44_Writer::register_meta_data(const hdf5::node::Group &HDFGroup, diff --git a/src/WriterModule/ev44/ev44_Writer.h b/src/WriterModule/ev44/ev44_Writer.h index 850378082..7e3d46493 100644 --- a/src/WriterModule/ev44/ev44_Writer.h +++ b/src/WriterModule/ev44/ev44_Writer.h @@ -27,7 +27,7 @@ public: /// \brief Write flatbuffer message. /// /// \param FlatBufferMessage - void writeImpl(FlatbufferMessage const &Message, + bool writeImpl(FlatbufferMessage const &Message, bool is_buffered_message) override; NeXusDataset::EventTimeOffset EventTimeOffset; diff --git a/src/WriterModule/f144/f144_Writer.cpp b/src/WriterModule/f144/f144_Writer.cpp index ae7e15ab6..8707ac4d7 100644 --- a/src/WriterModule/f144/f144_Writer.cpp +++ b/src/WriterModule/f144/f144_Writer.cpp @@ -194,7 +194,7 @@ void msgTypeIsConfigType(f144_Writer::Type ConfigType, Value MsgType) { } } -void f144_Writer::writeImpl(FlatbufferMessage const &Message, +bool f144_Writer::writeImpl(FlatbufferMessage const &Message, [[maybe_unused]] bool is_buffered_message) { auto LogDataMessage = Getf144_LogData(Message.data()); Timestamp.appendElement(LogDataMessage->timestamp()); @@ -274,6 +274,7 @@ void f144_Writer::writeImpl(FlatbufferMessage const &Message, MetaDataMax.setValue(Max); MetaDataMean.setValue(Sum / TotalNrOfElementsWritten); } + return true; } void f144_Writer::register_meta_data(hdf5::node::Group const &HDFGroup, diff --git a/src/WriterModule/f144/f144_Writer.h b/src/WriterModule/f144/f144_Writer.h index f31dba956..b051a739a 100644 --- a/src/WriterModule/f144/f144_Writer.h +++ b/src/WriterModule/f144/f144_Writer.h @@ -39,7 +39,7 @@ public: WriterModule::InitResult reopen(hdf5::node::Group &HDFGroup) override; /// Write an incoming message which should contain a flatbuffer. - void writeImpl(FlatbufferMessage const &Message, + bool writeImpl(FlatbufferMessage const &Message, bool is_buffered_message) override; f144_Writer() diff --git a/src/WriterModule/se00/se00_Writer.cpp b/src/WriterModule/se00/se00_Writer.cpp index 1514eeec0..0b0bd75d0 100644 --- a/src/WriterModule/se00/se00_Writer.cpp +++ b/src/WriterModule/se00/se00_Writer.cpp @@ -132,7 +132,7 @@ void msgTypeIsConfigType(se00_Writer::Type ConfigType, ValueUnion MsgType) { } } -void se00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message, +bool se00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message, [[maybe_unused]] bool is_buffered_message) { auto FbPointer = Getse00_SampleEnvironmentData(Message.data()); auto CueIndexValue = Value->current_size(); @@ -200,7 +200,7 @@ void se00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message, Logger::Info("Unknown data type in flatbuffer."); } if (NrOfElements == 0) { - return; + return false; } CueTimestampIndex.appendElement(static_cast<std::uint32_t>(CueIndexValue)); CueTimestamp.appendElement(FbPointer->packet_timestamp()); @@ -217,6 +217,7 @@ void se00_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message, FbPointer->packet_timestamp(), FbPointer->time_delta(), NrOfElements)); Timestamp.appendArray(TempTimeStamps); } + return true; } template <typename Type> diff --git a/src/WriterModule/se00/se00_Writer.h b/src/WriterModule/se00/se00_Writer.h index 175db231a..290c17719 100644 --- a/src/WriterModule/se00/se00_Writer.h +++ b/src/WriterModule/se00/se00_Writer.h @@ -44,7 +44,7 @@ public: InitResult reopen(hdf5::node::Group &HDFGroup) override; - void writeImpl(FlatbufferMessage const &Message, + bool writeImpl(FlatbufferMessage const &Message, bool is_buffered_message) override; enum class Type { diff --git a/src/WriterModule/tdct/tdct_Writer.cpp b/src/WriterModule/tdct/tdct_Writer.cpp index e7218b18c..2f69c1c7a 100644 --- a/src/WriterModule/tdct/tdct_Writer.cpp +++ b/src/WriterModule/tdct/tdct_Writer.cpp @@ -64,7 +64,7 @@ WriterModule::InitResult tdct_Writer::reopen(hdf5::node::Group &HDFGroup) { return WriterModule::InitResult::OK; } -void tdct_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message, +bool tdct_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message, [[maybe_unused]] bool is_buffered_message) { auto FbPointer = Gettimestamp(Message.data()); auto TempTimePtr = FbPointer->timestamps()->data(); @@ -72,13 +72,14 @@ void tdct_Writer::writeImpl(const FileWriter::FlatbufferMessage &Message, if (TempTimeSize == 0) { Logger::Info( "Received a flatbuffer with zero (0) timestamps elements in it."); - return; + return false; } hdf5::ArrayAdapter<const std::uint64_t> CArray(TempTimePtr, TempTimeSize); auto CueIndexValue = Timestamp.current_size(); CueTimestampIndex.appendElement(static_cast<std::uint32_t>(CueIndexValue)); CueTimestamp.appendElement(FbPointer->timestamps()->operator[](0)); Timestamp.appendArray(CArray); + return true; } } // namespace WriterModule::tdct diff --git a/src/WriterModule/tdct/tdct_Writer.h b/src/WriterModule/tdct/tdct_Writer.h index f82f7c350..ee6c2c87d 100644 --- a/src/WriterModule/tdct/tdct_Writer.h +++ b/src/WriterModule/tdct/tdct_Writer.h @@ -34,7 +34,7 @@ public: InitResult reopen(hdf5::node::Group &HDFGroup) override; - void writeImpl(FlatbufferMessage const &Message, + bool writeImpl(FlatbufferMessage const &Message, bool is_buffered_message) override; protected: diff --git a/src/WriterModule/template/TemplateWriter.h b/src/WriterModule/template/TemplateWriter.h index c0192f316..ea5cae5b3 100644 --- a/src/WriterModule/template/TemplateWriter.h +++ b/src/WriterModule/template/TemplateWriter.h @@ -186,8 +186,9 @@ public: /// \param Message The structure containing a pointer to a buffer /// containing data received from the Kafka broker and the size of the buffer. // cppcheck-suppress functionStatic - void writeImpl([[maybe_unused]] FileWriter::FlatbufferMessage const &message, [[maybe_unused]]bool is_buffered_message) override { + bool writeImpl([[maybe_unused]] FileWriter::FlatbufferMessage const &message, [[maybe_unused]]bool is_buffered_message) override { std::cout << "WriterClass::writeImpl()\n"; + return true; } }; } // namespace TemplateWriter diff --git a/src/WriterModuleBase.h b/src/WriterModuleBase.h index fd0ed03a3..65d8ac703 100644 --- a/src/WriterModuleBase.h +++ b/src/WriterModuleBase.h @@ -113,14 +113,15 @@ public: /// \param msg The message to process void write(FileWriter::FlatbufferMessage const &Message, bool is_buffered_message) { - writeImpl(Message, is_buffered_message); - WriteCount++; + if (writeImpl(Message, is_buffered_message)) { + WriteCount++; + } } /// \brief Process the message in some way, for example write to the HDF file. /// /// \param msg The message to process - virtual void writeImpl(FileWriter::FlatbufferMessage const &Message, + virtual bool writeImpl(FileWriter::FlatbufferMessage const &Message, bool is_buffered_message) = 0; void registerField(JsonConfig::FieldBase *Ptr) { diff --git a/tests/SourceTests.cpp b/tests/SourceTests.cpp index efc2a24c4..49c3a8168 100644 --- a/tests/SourceTests.cpp +++ b/tests/SourceTests.cpp @@ -34,7 +34,7 @@ public: class WriterModuleMock : public StubWriterModule { public: - MAKE_MOCK2(writeImpl, void(FlatbufferMessage const &, bool), override); + MAKE_MOCK2(writeImpl, bool(FlatbufferMessage const &, bool), override); }; TEST_F(SourceTests, ConstructorSetsMembers) { diff --git a/tests/Stream/MessageWriterTests.cpp b/tests/Stream/MessageWriterTests.cpp index 8db83f246..0cec2c971 100644 --- a/tests/Stream/MessageWriterTests.cpp +++ b/tests/Stream/MessageWriterTests.cpp @@ -25,7 +25,7 @@ public: ~WriterModuleStandIn() = default; MAKE_MOCK1(init_hdf, WriterModule::InitResult(hdf5::node::Group &), override); MAKE_MOCK1(reopen, WriterModule::InitResult(hdf5::node::Group &), override); - MAKE_MOCK2(writeImpl, void(FileWriter::FlatbufferMessage const &, bool), + MAKE_MOCK2(writeImpl, bool(FileWriter::FlatbufferMessage const &, bool), override); }; @@ -80,7 +80,7 @@ TEST_F(DataMessageWriterTest, EnableExtraModule) { } TEST_F(DataMessageWriterTest, WriteMessageSuccess) { - REQUIRE_CALL(WriterModule, writeImpl(_, _)).TIMES(1); + REQUIRE_CALL(WriterModule, writeImpl(_, _)).TIMES(1).RETURN(true); auto InitialWriteCount = WriterModule.getWriteCount(); FileWriter::FlatbufferMessage Msg; Stream::Message SomeMessage( diff --git a/tests/WriterModule/ev44_WriterTests.cpp b/tests/WriterModule/ev44_WriterTests.cpp index f2e7b1f86..a3b904fc3 100644 --- a/tests/WriterModule/ev44_WriterTests.cpp +++ b/tests/WriterModule/ev44_WriterTests.cpp @@ -603,3 +603,30 @@ TEST_F(Event44WriterTests, CuesFromTwoMessagesAreRecorded) { << "Expected cue_index dataset to contain the indices of the last event " "of every message"; } + +TEST_F(Event44WriterTests, buffered_data_not_written) { + std::vector<int32_t> TimeOfFlight1 = {101, 102, 201}; + std::vector<int32_t> DetectorID1 = {101, 102, 201}; + std::vector<int64_t> ReferenceTime1 = {1000, 2000}; + std::vector<int32_t> ReferenceTimeIndex1 = {0, 2}; + auto MessageBuffer1 = + generateFlatbufferData("TestSource", 1, TimeOfFlight1, DetectorID1, + ReferenceTime1, ReferenceTimeIndex1); + FileWriter::FlatbufferMessage TestMessage1(MessageBuffer1.data(), + MessageBuffer1.size()); + + // Create writer and give it the message to write + { + WriterModule::ev44::ev44_Writer Writer; + EXPECT_TRUE(Writer.init_hdf(TestGroup) == InitResult::OK); + EXPECT_TRUE(Writer.reopen(TestGroup) == InitResult::OK); + EXPECT_NO_THROW(Writer.write(TestMessage1, true)); + } // These braces are required due to "h5.cpp" + + // Read data from the file + auto EventTimeOffsetDataset = TestGroup.get_dataset("event_time_offset"); + auto EventTimeZeroDataset = TestGroup.get_dataset("event_time_zero"); + + EXPECT_EQ(0, EventTimeOffsetDataset.dataspace().size()); + EXPECT_EQ(0, EventTimeZeroDataset.dataspace().size()); +} diff --git a/tests/helpers/StubWriterModule.h b/tests/helpers/StubWriterModule.h index 3b05f4720..d75dd844d 100644 --- a/tests/helpers/StubWriterModule.h +++ b/tests/helpers/StubWriterModule.h @@ -22,6 +22,8 @@ public: InitResult reopen(hdf5::node::Group & /*HDFGroup*/) override { return InitResult::OK; } - void writeImpl([[maybe_unused]] FileWriter::FlatbufferMessage const &message, - [[maybe_unused]] bool is_buffered_message) override {} + bool writeImpl([[maybe_unused]] FileWriter::FlatbufferMessage const &message, + [[maybe_unused]] bool is_buffered_message) override { + return true; + } }; -- GitLab