From 0b178923bc60bbbe9b2c6e4e4c916cb9af532f08 Mon Sep 17 00:00:00 2001
From: Matt Clarke <matt.clarke@ess.eu>
Date: Tue, 21 Jan 2025 14:46:34 +0000
Subject: [PATCH] Commit pool offset when starting job

---
 src/CommandSystem/Handler.cpp   | 1 +
 src/CommandSystem/JobListener.h | 8 ++++++++
 src/Kafka/Consumer.h            | 8 ++++++++
 3 files changed, 17 insertions(+)

diff --git a/src/CommandSystem/Handler.cpp b/src/CommandSystem/Handler.cpp
index 2571111c1..3603fce65 100644
--- a/src/CommandSystem/Handler.cpp
+++ b/src/CommandSystem/Handler.cpp
@@ -47,6 +47,7 @@ void Handler::loopFunction() {
     auto [poll_status, message] = JobPool->pollForJob();
     if (poll_status == Kafka::PollStatus::Message &&
         Parser::isStartCommand(message)) {
+      JobPool->commit_pool_offset();
       handleStartCommand(std::move(message));
       JobPool->disconnectFromPool();
     }
diff --git a/src/CommandSystem/JobListener.h b/src/CommandSystem/JobListener.h
index 9555cb476..5c0d24c8e 100644
--- a/src/CommandSystem/JobListener.h
+++ b/src/CommandSystem/JobListener.h
@@ -65,6 +65,14 @@ public:
   /// pollForJob().
   virtual void disconnectFromPool();
 
+  /// Commit the offset so that if the filewriter crashes it does not
+  /// re-consume the message.
+  void commit_pool_offset() {
+    if (!Consumer->commit_offset()) {
+      Logger::Error("Could not commit offset for CommandListener");
+    }
+  }
+
 private:
   // Do not change the ConsumerGroupId variable, it is vital to the workings of
   // the worker pool
diff --git a/src/Kafka/Consumer.h b/src/Kafka/Consumer.h
index 0ba2a994d..ebff0e6cb 100644
--- a/src/Kafka/Consumer.h
+++ b/src/Kafka/Consumer.h
@@ -34,6 +34,7 @@ public:
   virtual void addTopic(std::string const &Topic) = 0;
   virtual void assignAllPartitions(std::string const &Topic,
                                    time_point const &StartTimestamp) = 0;
+  virtual bool commit_offset() = 0;
   virtual const RdKafka::TopicMetadata *
   getTopicMetadata(const std::string &Topic,
                    RdKafka::Metadata *MetadataPtr) = 0;
@@ -80,6 +81,11 @@ public:
   getTopicMetadata(const std::string &Topic,
                    RdKafka::Metadata *MetadataPtr) override;
 
+  bool commit_offset() override {
+    auto error = KafkaConsumer->commitSync();
+    return error == RdKafka::ERR_NO_ERROR;
+  }
+
 private:
   std::unique_ptr<RdKafka::Conf> Conf;
   BrokerSettings const ConsumerBrokerSettings;
@@ -144,6 +150,8 @@ public:
     _topic = Topic;
   }
 
+  bool commit_offset() override { return true; }
+
   const RdKafka::TopicMetadata *
   getTopicMetadata([[maybe_unused]] const std::string &Topic,
                    [[maybe_unused]] RdKafka::Metadata *MetadataPtr) override {
-- 
GitLab