Fix stop time functionality and allow writing of historic data

Created by: matthew-d-jones

Issue

Closes DM-1616 and DM-1148

Description of work

Before these changes, the file writer stopped consuming data as soon as the wallclock time passed the requested stop time. This made it impossible to write old data, for example from an experiment run that had already finished. It also meant that if the file writer did not keep up with the data stream, data from the end of the run was lost. We observed both of these problems at V20.

The file writer now does the following:

  • Until system time >= stop time + max_producer_lag, continue to consume data but discard any messages timestamped later than the stop time.
  • When system time >= stop time + max_producer_lag, get the stop offsets using the offsetsForTimes method.
  • Continue to consume data until current offset >= stop offset, still discarding any messages timestamped later than the stop time.
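
The three steps above can be sketched roughly as follows. This is a minimal illustration of the decision logic only, not the code in this merge request: `Message`, `StopCondition` and all of its member names are hypothetical, times are assumed to be milliseconds, and the Kafka lookup itself is left out (the caller is assumed to pass in whatever offset offsetsForTimes returned).

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical message type: only the fields the stop logic needs.
struct Message {
  std::int64_t TimestampMs;
  std::int64_t Offset;
};

// Hypothetical helper holding the stop state for one partition.
class StopCondition {
public:
  StopCondition(std::int64_t StopTimeMs, std::int64_t MaxProducerLagMs)
      : StopTimeMs(StopTimeMs), MaxProducerLagMs(MaxProducerLagMs) {
    assert(MaxProducerLagMs >= 0);
  }

  // Step 2: the stop offset should be looked up once the system time has
  // reached stop time + max_producer_lag, and only if it is not yet known.
  bool lookupDue(std::int64_t NowMs) const {
    return !StopOffset.has_value() && NowMs >= StopTimeMs + MaxProducerLagMs;
  }

  // Record the offset obtained from the lookup (assumed to be done elsewhere).
  void setStopOffset(std::int64_t Offset) { StopOffset = Offset; }

  // Steps 1 and 3: messages timestamped later than the stop time are discarded.
  bool shouldWrite(const Message &Msg) const {
    return Msg.TimestampMs <= StopTimeMs;
  }

  // Step 3: consuming ends once the current offset reaches the stop offset.
  // Before the lookup has happened this is always false.
  bool finished(std::int64_t CurrentOffset) const {
    return StopOffset.has_value() && CurrentOffset >= *StopOffset;
  }

private:
  std::int64_t StopTimeMs;
  std::int64_t MaxProducerLagMs;
  std::optional<std::int64_t> StopOffset;
};
```

Because none of these checks depends on the wallclock time except `lookupDue`, the same logic applies when both the start and stop time are in the past: the lookup is due immediately and consumption continues until the stop offset is reached.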

The new test_filewriter_can_write_data_when_start_and_stop_time_are_in_the_past system test demonstrates the new functionality. The functionality is also covered by unit tests in StreamerTest.cpp. There are several edge cases to handle: there may be no data in the topic, the stop offset may already have been reached by the time it is looked up, or there may be data in the topic but none between the start and stop offsets. These are covered by unit tests.

During testing at V20 I discovered that I needed to improve the file writer's response to poor network conditions. To achieve this, metadata lookups are now retried, and I also updated librdkafka to 1.1.0, which is much more robust.
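
As an illustration of the retry idea, a generic helper might look like the sketch below. It is an assumption about the general shape only: `retryWithDelay`, its parameters and the fixed delay between attempts are hypothetical, and the actual retry code in this merge request may differ. The lookup is modelled as any callable returning a `std::optional`, which is empty on failure.

```cpp
#include <cassert>
#include <chrono>
#include <optional>
#include <thread>

// Hypothetical helper: call Attempt up to MaxAttempts times, sleeping for
// Delay between failed attempts. Attempt must return a std::optional, which
// is empty when the lookup failed. Returns the first non-empty result, or an
// empty optional if every attempt failed.
template <typename Callable>
auto retryWithDelay(Callable &&Attempt, int MaxAttempts,
                    std::chrono::milliseconds Delay) -> decltype(Attempt()) {
  assert(MaxAttempts > 0);
  for (int Try = 0; Try < MaxAttempts; ++Try) {
    if (auto Result = Attempt()) {
      return Result;
    }
    // Do not sleep after the final failed attempt.
    if (Try + 1 < MaxAttempts) {
      std::this_thread::sleep_for(Delay);
    }
  }
  return std::nullopt;
}
```

A caller would wrap the metadata lookup in a lambda and treat an empty result as a persistent failure, for example by logging it and trying again later.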

Nominate for Group Code Review

  • Nominate for code review
