diff --git a/src/CommandSystem/Handler.cpp b/src/CommandSystem/Handler.cpp index 2571111c1b2deb1d91408d3739ea1171340ae0d0..3603fce6587a273a069828ac3c5387b96f6c8c32 100644 --- a/src/CommandSystem/Handler.cpp +++ b/src/CommandSystem/Handler.cpp @@ -47,6 +47,7 @@ void Handler::loopFunction() { auto [poll_status, message] = JobPool->pollForJob(); if (poll_status == Kafka::PollStatus::Message && Parser::isStartCommand(message)) { + JobPool->commit_pool_offset(); handleStartCommand(std::move(message)); JobPool->disconnectFromPool(); } diff --git a/src/CommandSystem/JobListener.h b/src/CommandSystem/JobListener.h index 9555cb4769bea29a4880cd02892662cae6a1615d..5c0d24c8e3169f14c28cf6aab22327b7e65e5f8a 100644 --- a/src/CommandSystem/JobListener.h +++ b/src/CommandSystem/JobListener.h @@ -65,6 +65,14 @@ public: /// pollForJob(). virtual void disconnectFromPool(); + /// Commit the offset so that if the filewriter crashes it does not + /// re-consume the message. + void commit_pool_offset() { + if (!Consumer->commit_offset()) { + Logger::Error("Could not commit offset for CommandListener"); + } + } + private: // Do not change the ConsumerGroupId variable, it is vital to the workings of // the worker pool diff --git a/src/Kafka/Consumer.h b/src/Kafka/Consumer.h index 0ba2a994dcb6e1b6b3a8a5e33867607b8d074920..ebff0e6cbddd8e8fcf4bafee136f15165d9aaa99 100644 --- a/src/Kafka/Consumer.h +++ b/src/Kafka/Consumer.h @@ -34,6 +34,7 @@ public: virtual void addTopic(std::string const &Topic) = 0; virtual void assignAllPartitions(std::string const &Topic, time_point const &StartTimestamp) = 0; + virtual bool commit_offset() = 0; virtual const RdKafka::TopicMetadata * getTopicMetadata(const std::string &Topic, RdKafka::Metadata *MetadataPtr) = 0; @@ -80,6 +81,11 @@ public: getTopicMetadata(const std::string &Topic, RdKafka::Metadata *MetadataPtr) override; + bool commit_offset() override { + auto error = KafkaConsumer->commitSync(); + return error == RdKafka::ERR_NO_ERROR; + } + private: std::unique_ptr<RdKafka::Conf> Conf; BrokerSettings const ConsumerBrokerSettings; @@ -144,6 +150,8 @@ public: _topic = Topic; } + bool commit_offset() override { return true; } + const RdKafka::TopicMetadata * getTopicMetadata([[maybe_unused]] const std::string &Topic, [[maybe_unused]] RdKafka::Metadata *MetadataPtr) override {