diff --git a/src/StreamController.cpp b/src/StreamController.cpp index 75ecf4bcebf7e6476ce97f5e718776413684cca4..b1c73583deaa6ba660b74ad2e52768737c4f4f99 100644 --- a/src/StreamController.cpp +++ b/src/StreamController.cpp @@ -25,25 +25,33 @@ StreamController::StreamController( Registrar->getNewRegistrar("stream.writer")), StreamerOptions(Settings), MetaDataTracker(std::move(Tracker)), _metadata_enquirer(std::move(metadata_enquirer)), - _consumer_factory(std::move(consumer_factory)) {} + _consumer_factory(std::move(consumer_factory)) { + Logger::Error("streamcontroller constructed"); +} StreamController::~StreamController() { + Logger::Error("streamcontroller destructor start"); stop(); + Logger::Error("streamcontroller destructor calls mdat"); MdatWriter->write_metadata(*WriterTask); Logger::Info("Stopped StreamController for file with id : {}", StreamController::getJobId()); + Logger::Error("streamcontroller destructor end"); } void StreamController::start() { + Logger::Error("streamcontroller start start"); MdatWriter->set_start_time(StreamerOptions.StartTimestamp); MdatWriter->set_stop_time(StreamerOptions.StopTimestamp); Executor.sendLowPriorityWork([=]() { CurrentMetadataTimeOut = StreamerOptions.BrokerSettings.MinMetadataTimeout; getTopicNames(); }); + Logger::Error("streamcontroller start end"); } void StreamController::setStopTime(time_point const &StopTime) { + Logger::Error("streamcontroller setstoptime start"); StreamerOptions.StopTimestamp = StopTime; MdatWriter->set_stop_time(StopTime); Executor.sendWork([=]() { @@ -51,6 +59,7 @@ void StreamController::setStopTime(time_point const &StopTime) { s->setStopTime(StopTime); } }); + Logger::Error("streamcontroller setstoptime end"); } void StreamController::pauseStreamers() { StreamersPaused.store(true); } @@ -58,14 +67,17 @@ void StreamController::pauseStreamers() { StreamersPaused.store(true); } void StreamController::resumeStreamers() { StreamersPaused.store(false); } void StreamController::stop() { + Logger::Error("streamcontroller stop start"); for (auto &Stream : Streamers) Stream->stop(); WriterThread.stop(); StopNow = true; + Logger::Error("streamcontroller stop end"); } using duration = std::chrono::system_clock::duration; bool StreamController::isDoneWriting() { + Logger::Error("streamcontroller isdonewriting start"); auto Now = std::chrono::system_clock::now(); auto IsDoneWriting = HasError || StopNow or @@ -80,12 +92,14 @@ bool StreamController::isDoneWriting() { FileSizeCalcInterval * int(std::round(TimeDiffPeriods)); } } + Logger::Error("streamcontroller isdonewriting end"); return IsDoneWriting; } std::string StreamController::getJobId() const { return WriterTask->jobID(); } void StreamController::getTopicNames() { + Logger::Error("streamcontroller gettopicnames start"); try { auto TopicNames = _metadata_enquirer->getTopicList( StreamerOptions.BrokerSettings.Address, CurrentMetadataTimeOut, @@ -104,9 +118,11 @@ void StreamController::getTopicNames() { .count()); Executor.sendLowPriorityWork([=]() { getTopicNames(); }); } + Logger::Error("streamcontroller gettopicnames end"); } void StreamController::initStreams(std::set<std::string> known_topic_names) { + Logger::Error("streamcontroller initstreams start"); std::map<std::string, Stream::SrcToDst> topic_src_map; std::string errors_collector; for (auto &src : WriterTask->sources()) { @@ -148,6 +164,7 @@ void StreamController::initStreams(std::set<std::string> known_topic_names) { Streamers.emplace_back(std::move(topic)); } Executor.sendLowPriorityWork([=]() { performPeriodicChecks(); }); + Logger::Error("streamcontroller initstreams end"); } bool StreamController::hasErrorState() const { return HasError; } @@ -158,13 +175,16 @@ std::string StreamController::errorMessage() { } void StreamController::performPeriodicChecks() { + Logger::Error("streamcontroller performperiodicchecks start"); checkIfStreamsAreDone(); throttleIfWriteQueueIsFull(); std::this_thread::sleep_for(PeriodicChecksInterval); Executor.sendLowPriorityWork([=]() { performPeriodicChecks(); }); + Logger::Error("streamcontroller performperiodicchecks end"); } void StreamController::checkIfStreamsAreDone() { + Logger::Error("streamcontroller checkifstreamsaredone start"); try { Streamers.erase( std::remove_if(Streamers.begin(), Streamers.end(), @@ -181,9 +201,11 @@ void StreamController::checkIfStreamsAreDone() { stop(); StreamersRemaining.store(false); } + Logger::Error("streamcontroller checkifstreamsaredone end"); } void StreamController::throttleIfWriteQueueIsFull() { + Logger::Error("streamcontroller throttle start"); auto QueuedWrites = WriterThread.nrOfWritesQueued(); if (QueuedWrites > StreamerOptions.MaxQueuedWrites && !StreamersPaused.load()) { @@ -198,6 +220,7 @@ void StreamController::throttleIfWriteQueueIsFull() { QueuedWrites); resumeStreamers(); } + Logger::Error("streamcontroller throttle end"); } } // namespace FileWriter