From a1b6e83c79fdf33855cbeaa7c4bc4edab5b1002b Mon Sep 17 00:00:00 2001 From: Matt Clarke <matt.clarke@ess.eu> Date: Wed, 22 Jan 2025 13:19:10 +0000 Subject: [PATCH] added additional logging for streamcontroller --- src/StreamController.cpp | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/src/StreamController.cpp b/src/StreamController.cpp index 75ecf4bce..b1c73583d 100644 --- a/src/StreamController.cpp +++ b/src/StreamController.cpp @@ -25,25 +25,33 @@ StreamController::StreamController( Registrar->getNewRegistrar("stream.writer")), StreamerOptions(Settings), MetaDataTracker(std::move(Tracker)), _metadata_enquirer(std::move(metadata_enquirer)), - _consumer_factory(std::move(consumer_factory)) {} + _consumer_factory(std::move(consumer_factory)) { + Logger::Error("streamcontroller constructed"); +} StreamController::~StreamController() { + Logger::Error("streamcontroller destructor start"); stop(); + Logger::Error("streamcontroller destructor calls mdat"); MdatWriter->write_metadata(*WriterTask); Logger::Info("Stopped StreamController for file with id : {}", StreamController::getJobId()); + Logger::Error("streamcontroller destructor end"); } void StreamController::start() { + Logger::Error("streamcontroller start start"); MdatWriter->set_start_time(StreamerOptions.StartTimestamp); MdatWriter->set_stop_time(StreamerOptions.StopTimestamp); Executor.sendLowPriorityWork([=]() { CurrentMetadataTimeOut = StreamerOptions.BrokerSettings.MinMetadataTimeout; getTopicNames(); }); + Logger::Error("streamcontroller start end"); } void StreamController::setStopTime(time_point const &StopTime) { + Logger::Error("streamcontroller setstoptime start"); StreamerOptions.StopTimestamp = StopTime; MdatWriter->set_stop_time(StopTime); Executor.sendWork([=]() { @@ -51,6 +59,7 @@ void StreamController::setStopTime(time_point const &StopTime) { s->setStopTime(StopTime); } }); + Logger::Error("streamcontroller setstoptime end"); } void StreamController::pauseStreamers() { StreamersPaused.store(true); } @@ -58,14 +67,17 @@ void StreamController::pauseStreamers() { StreamersPaused.store(true); } void StreamController::resumeStreamers() { StreamersPaused.store(false); } void StreamController::stop() { + Logger::Error("streamcontroller stop start"); for (auto &Stream : Streamers) Stream->stop(); WriterThread.stop(); StopNow = true; + Logger::Error("streamcontroller stop end"); } using duration = std::chrono::system_clock::duration; bool StreamController::isDoneWriting() { + Logger::Error("streamcontroller isdonewriting start"); auto Now = std::chrono::system_clock::now(); auto IsDoneWriting = HasError || StopNow or @@ -80,12 +92,14 @@ bool StreamController::isDoneWriting() { FileSizeCalcInterval * int(std::round(TimeDiffPeriods)); } } + Logger::Error("streamcontroller isdonewriting end"); return IsDoneWriting; } std::string StreamController::getJobId() const { return WriterTask->jobID(); } void StreamController::getTopicNames() { + Logger::Error("streamcontroller gettopicnames start"); try { auto TopicNames = _metadata_enquirer->getTopicList( StreamerOptions.BrokerSettings.Address, CurrentMetadataTimeOut, @@ -104,9 +118,11 @@ void StreamController::getTopicNames() { .count()); Executor.sendLowPriorityWork([=]() { getTopicNames(); }); } + Logger::Error("streamcontroller gettopicnames end"); } void StreamController::initStreams(std::set<std::string> known_topic_names) { + Logger::Error("streamcontroller initstreams start"); std::map<std::string, Stream::SrcToDst> topic_src_map; std::string errors_collector; for (auto &src : WriterTask->sources()) { @@ -148,6 +164,7 @@ void StreamController::initStreams(std::set<std::string> known_topic_names) { Streamers.emplace_back(std::move(topic)); } Executor.sendLowPriorityWork([=]() { performPeriodicChecks(); }); + Logger::Error("streamcontroller initstreams end"); } bool StreamController::hasErrorState() const { return HasError; } @@ -158,13 +175,16 @@ std::string StreamController::errorMessage() { } void StreamController::performPeriodicChecks() { + Logger::Error("streamcontroller performperiodicchecks start"); checkIfStreamsAreDone(); throttleIfWriteQueueIsFull(); std::this_thread::sleep_for(PeriodicChecksInterval); Executor.sendLowPriorityWork([=]() { performPeriodicChecks(); }); + Logger::Error("streamcontroller performperiodicchecks end"); } void StreamController::checkIfStreamsAreDone() { + Logger::Error("streamcontroller checkifstreamsaredone start"); try { Streamers.erase( std::remove_if(Streamers.begin(), Streamers.end(), @@ -181,9 +201,11 @@ void StreamController::checkIfStreamsAreDone() { stop(); StreamersRemaining.store(false); } + Logger::Error("streamcontroller checkifstreamsaredone end"); } void StreamController::throttleIfWriteQueueIsFull() { + Logger::Error("streamcontroller throttle start"); auto QueuedWrites = WriterThread.nrOfWritesQueued(); if (QueuedWrites > StreamerOptions.MaxQueuedWrites && !StreamersPaused.load()) { @@ -198,6 +220,7 @@ void StreamController::throttleIfWriteQueueIsFull() { QueuedWrites); resumeStreamers(); } + Logger::Error("streamcontroller throttle end"); } } // namespace FileWriter -- GitLab