From 8d80bb1b5b86591d56453f80cd8ad5016f07cb60 Mon Sep 17 00:00:00 2001
From: "P.-A. Loizeau" <p.-a.loizeau@gsi.de>
Date: Wed, 6 Apr 2022 18:47:56 +0200
Subject: [PATCH] [MQ] In digi evt sink, add an option to bypass the buffer to
 write TS in order with no holes

---
 MQ/histoServer/CbmMqHistoServer.cxx   |  2 ++
 MQ/mcbm/CbmDeviceDigiEventSink.cxx    | 33 +++++++++++++++++++++------
 MQ/mcbm/CbmDeviceDigiEventSink.h      |  7 +++---
 MQ/mcbm/runDigiEventSink.cxx          |  2 ++
 MQ/mcbm/startBuildRawEvents2022.sh.in |  1 +
 5 files changed, 35 insertions(+), 10 deletions(-)

diff --git a/MQ/histoServer/CbmMqHistoServer.cxx b/MQ/histoServer/CbmMqHistoServer.cxx
index 9b25d4cbff..ce1a896110 100644
--- a/MQ/histoServer/CbmMqHistoServer.cxx
+++ b/MQ/histoServer/CbmMqHistoServer.cxx
@@ -473,6 +473,8 @@ bool CbmMqHistoServer::SaveHistograms()
   // open separate histo file in recreate mode
   histoFile = new TFile(fsHistoFileName.data(), "RECREATE");
 
+  LOG(info) << "Save Histos in file " << fsHistoFileName.data();
+
   if (nullptr == histoFile) return false;
 
   /// Register the histos in the HTTP server
diff --git a/MQ/mcbm/CbmDeviceDigiEventSink.cxx b/MQ/mcbm/CbmDeviceDigiEventSink.cxx
index 239d6daa69..9dcc0b4582 100644
--- a/MQ/mcbm/CbmDeviceDigiEventSink.cxx
+++ b/MQ/mcbm/CbmDeviceDigiEventSink.cxx
@@ -69,6 +69,8 @@ try {
   fsChannelNameDataInput = fConfig->GetValue<std::string>("EvtNameIn");
   fsAllowedChannels[0]   = fsChannelNameDataInput;
 
+  fbBypassConsecutiveTs = fConfig->GetValue<bool>("BypassConsecutiveTs");
+
   fbFillHistos             = fConfig->GetValue<bool>("FillHistos");
   fuPublishFreqTs          = fConfig->GetValue<uint32_t>("PubFreqTs");
   fdMinPublishTime         = fConfig->GetValue<double_t>("PubTimeMin");
@@ -270,9 +272,9 @@ bool CbmDeviceDigiEventSink::HandleData(FairMQParts& parts, int /*index*/)
   CbmEventTimeslice unpTs(parts);
 
   /// FIXME: Need to check if TS arrived in order (probably not!!!) + buffer!!!
-  LOG(info) << "Next TS check " << fuPrevTsIndex << " " << fulTsCounter << " " << unpTs.fTsMetaData.GetIndex()
-            << " Sorage size: " << fmFullTsStorage.size();
-  if (fuPrevTsIndex + 1 == unpTs.fTsMetaData.GetIndex()
+  LOG(debug) << "Next TS check " << fuPrevTsIndex << " " << fulTsCounter << " " << unpTs.fTsMetaData.GetIndex()
+             << " Storage size: " << fmFullTsStorage.size();
+  if (fbBypassConsecutiveTs || (fuPrevTsIndex + 1 == unpTs.fTsMetaData.GetIndex())
       || (0 == fuPrevTsIndex && 0 == fulTsCounter && 0 == unpTs.fTsMetaData.GetIndex())) {
     LOG(debug) << "TS direct to dump";
     /// Fill all storage variables registers for data output
@@ -294,9 +296,26 @@ bool CbmDeviceDigiEventSink::HandleData(FairMQParts& parts, int /*index*/)
   /// Clear metadata => crashes, maybe not needed as due to move the pointer is invalidated?
   //   delete fTsMetaData;
 
-  /// Check TS queue and process it if needed (in case it filled a hole!)
-  CheckTsQueues();
-  LOG(debug) << "TS queues checked";
+  if (fbBypassConsecutiveTs) {
+    /// Skip checking the TS buffer as writing straight to file
+    /// => Just check if we are done and can close the file or not
+    if (fbReceivedEof) {
+      /// In this case we cannot check if the last TS received/processed is the final one due to lack of order
+      /// => use instead the fact that we received all expected TS
+      if ((fulTsCounter + fvulMissedTsIndices.size()) == fuTotalTsCount) {
+        LOG(info) << "CbmDeviceDigiEventSink::HandleData => "
+                  << "Found all expected TS (" << fulTsCounter << ") and total nb of TS " << fuTotalTsCount
+                  << " after accounting for the ones reported as missing by the source (" << fvulMissedTsIndices.size()
+                  << ")";
+        Finish();
+      }  // if ((fulTsCounter + fvulMissedTsIndices.size()) == fuTotalTsCount)
+    }
+  }
+  else {
+    /// Check TS queue and process it if needed (in case it filled a hole!)
+    CheckTsQueues();
+    LOG(debug) << "TS queues checked";
+  }
 
   /// Histograms management
   if (kTRUE == fbFillHistos) {
@@ -613,7 +632,7 @@ CbmDeviceDigiEventSink::~CbmDeviceDigiEventSink()
 {
   /// FIXME: Add pointers check before delete
 
-  /// Close things properly if not alredy done
+  /// Close things properly if not already done
   if (!fbFinishDone) Finish();
 
   /// Clear events vector
diff --git a/MQ/mcbm/CbmDeviceDigiEventSink.h b/MQ/mcbm/CbmDeviceDigiEventSink.h
index 138e3d0c18..da45d40b7c 100644
--- a/MQ/mcbm/CbmDeviceDigiEventSink.h
+++ b/MQ/mcbm/CbmDeviceDigiEventSink.h
@@ -82,9 +82,10 @@ private:
   /// Constants
 
   /// Control flags
-  Bool_t fbStoreFullTs = false;  //! If true, store digis vectors with full TS in addition to selected events
-  Bool_t fbFillHistos  = false;  //! Switch ON/OFF filling of histograms
-  Bool_t fbFinishDone  = false;  //! Keep track of whether the Finish was already called
+  bool fbStoreFullTs         = false;  //! If true, store digis vectors with full TS in addition to selected events
+  bool fbBypassConsecutiveTs = false;  //! Switch ON/OFF the bypass of the consecutive TS buffer before writing to file
+  bool fbFillHistos          = false;  //! Switch ON/OFF filling of histograms
+  bool fbFinishDone          = false;  //! Keep track of whether the Finish was already called
 
   /// User settings parameters
   /// Algo enum settings
diff --git a/MQ/mcbm/runDigiEventSink.cxx b/MQ/mcbm/runDigiEventSink.cxx
index 06e6a97e4f..7f875f027f 100644
--- a/MQ/mcbm/runDigiEventSink.cxx
+++ b/MQ/mcbm/runDigiEventSink.cxx
@@ -20,6 +20,8 @@ void addCustomOptions(bpo::options_description& options)
                         "Name (full or relative path) of the output .root file ");
   options.add_options()("EvtNameIn", bpo::value<std::string>()->default_value("events"),
                         "MQ channel name for built events");
+  options.add_options()("BypassConsecutiveTs", bpo::value<bool>()->default_value(false),
+                        "Do not wait for having consecutive TS in buffer before writing to file if true");
   options.add_options()("FillHistos", bpo::value<bool>()->default_value(false),
                         "Fill histograms and send them to histo server if true");
 
diff --git a/MQ/mcbm/startBuildRawEvents2022.sh.in b/MQ/mcbm/startBuildRawEvents2022.sh.in
index 33542299a4..d508da90f6 100755
--- a/MQ/mcbm/startBuildRawEvents2022.sh.in
+++ b/MQ/mcbm/startBuildRawEvents2022.sh.in
@@ -285,6 +285,7 @@ EVTSINK+=" --id evtsink1"
 EVTSINK+=" --severity info"
 #EVTSINK+=" --severity debug"
 #EVTSINK+=" --StoreFullTs 1"
+#EVTSINK+=" --BypassConsecutiveTs 1"
 EVTSINK+=" --OutFileName mcbm_digis_events.root"
 EVTSINK+=" --FillHistos true"
 EVTSINK+=" --PubFreqTs $_pubfreqts"
-- 
GitLab