diff --git a/src/modules/CMakeLists.txt b/src/modules/CMakeLists.txt index be57c5b8dc2bbf881ae9a9955657b0bb7f4dab85..4c6a300fac9d95ee1c13088932572c2f7b441553 100644 --- a/src/modules/CMakeLists.txt +++ b/src/modules/CMakeLists.txt @@ -5,6 +5,7 @@ include_directories(.) +add_subdirectory(generators) add_subdirectory(gdgem) add_subdirectory(multiblade) add_subdirectory(multigrid) diff --git a/src/modules/generators/CMakeLists.txt b/src/modules/generators/CMakeLists.txt index 97fffe1321ab000f3fed6a9528d034c040bc36b7..1ca4cfc6a87ac9f9c0efcd97c87d92e8cd4ecfe5 100644 --- a/src/modules/generators/CMakeLists.txt +++ b/src/modules/generators/CMakeLists.txt @@ -13,3 +13,8 @@ set(udpgen_hits_INC ) create_executable(udpgen_hits) target_compile_definitions(udpgen_hits PUBLIC GENERATOR_UDP_HITS) + + +# Stream event pixels from hdf5 files +set(hdf5events_SRC hdf5events.cpp) +create_executable(hdf5events) diff --git a/src/modules/generators/hdf5events.cpp b/src/modules/generators/hdf5events.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7273fa37c1ed20a2dc79eeb4311c19eab4e2c463 --- /dev/null +++ b/src/modules/generators/hdf5events.cpp @@ -0,0 +1,56 @@ +// Copyright (C) 2021 European Spallation Source, see LICENSE file +//===----------------------------------------------------------------------===// +/// +/// \file +/// +/// \brief A streamer of efu events (only pixel ids) from hdf5 files +/// +//===----------------------------------------------------------------------===// + +#include <CLI/CLI.hpp> +#include <cinttypes> +#include <common/EV42Serializer.h> +#include <common/Producer.h> +#include <h5cpp/hdf5.hpp> +#include <unistd.h> + +struct { + std::string FileName; + std::string KafkaBroker{"172.30.242.20:9092"}; + std::string KafkaTopic{"FREIA_detector"}; + int KafkaBufferSize {124000}; /// entries ~ 1MB +} Config; + +CLI::App app{"Read event_id from hdf5 files and send to Kafka"}; + +int main(int argc, char *argv[]) { + app.add_option("-f, --file", Config.FileName, "FileWriter HDF5"); + app.add_option("-b, --broker", Config.KafkaBroker, "Kafka broker"); + app.add_option("-t, --topic", Config.KafkaTopic, "Kafka topic"); + CLI11_PARSE(app, argc, argv); + + Producer eventprod(Config.KafkaBroker, Config.KafkaTopic); + auto Produce = [&eventprod](auto DataBuffer, auto Timestamp) { + eventprod.produce(DataBuffer, Timestamp); + }; + + EV42Serializer flatbuffer(Config.KafkaBufferSize, "multiblade", Produce); + + uint64_t efu_time = 1000000000LU * (uint64_t)time(NULL); // ns since 1970 + flatbuffer.pulseTime(efu_time); + + auto HDF5File = hdf5::file::open(Config.FileName); + auto RootGroup = HDF5File.root(); + auto Dataset = RootGroup.get_dataset("/experiment/data/event_id"); + hdf5::dataspace::Simple Dataspace(Dataset.dataspace()); + std::vector<uint32_t> AllElements(Dataspace.size()); + Dataset.read(AllElements); + + for (uint32_t Value : AllElements) { + flatbuffer.addEvent(0, Value); + } + + flatbuffer.produce(); + sleep(1); + return 0; +}