Skip to content
Snippets Groups Projects
CbmDeviceMcbmEventBuilderWin.cxx 28.63 KiB
/* Copyright (C) 2020 Facility for Antiproton and Ion Research in Europe, Darmstadt
   SPDX-License-Identifier: GPL-3.0-only
   Authors: Pierre-Alain Loizeau [committer] */

/**
 * CbmDeviceMcbmEventBuilderWin.cxx
 *
 * @since 2020-05-24
 * @author P.-A. Loizeau
 */

#include "CbmDeviceMcbmEventBuilderWin.h"


/// CBM headers
#include "CbmEvent.h"
#include "CbmFlesCanvasTools.h"
#include "CbmMQDefs.h"
#include "CbmMatch.h"
#include "CbmMvdDigi.h"

#include "TimesliceMetaData.h"

/// FAIRROOT headers
#include "FairMQLogger.h"
#include "FairMQProgOptions.h"  // device->fConfig
#include "FairParGenericSet.h"
#include "FairRunOnline.h"

#include "BoostSerializer.h"

#include "RootSerializer.h"

/// FAIRSOFT headers (geant, boost, ...)
#include "TCanvas.h"
#include "TFile.h"
#include "TH1.h"
#include "TList.h"
#include "TNamed.h"

#include <boost/archive/binary_iarchive.hpp>
#include <boost/serialization/utility.hpp>

/// C/C++ headers
#include <array>
#include <iomanip>
#include <stdexcept>
#include <string>
struct InitTaskError : std::runtime_error {
  using std::runtime_error::runtime_error;
};

using namespace std;

//Bool_t bMcbm2018MonitorTaskBmonResetHistos = kFALSE;

CbmDeviceMcbmEventBuilderWin::CbmDeviceMcbmEventBuilderWin() { fpAlgo = new CbmMcbm2019TimeWinEventBuilderAlgo(); }

void CbmDeviceMcbmEventBuilderWin::InitTask()
try {
  /// Read options from executable
  LOG(info) << "Init options for CbmDeviceMcbmEventBuilderWin.";
  fbFillHistos      = fConfig->GetValue<bool>("FillHistos");
  fbIgnoreTsOverlap = fConfig->GetValue<bool>("IgnOverMs");

  fsEvtOverMode   = fConfig->GetValue<std::string>("EvtOverMode");
  fsRefDet        = fConfig->GetValue<std::string>("RefDet");
  fvsAddDet       = fConfig->GetValue<std::vector<std::string>>("AddDet");
  fvsDelDet       = fConfig->GetValue<std::vector<std::string>>("DelDet");
  fvsSetTrigWin   = fConfig->GetValue<std::vector<std::string>>("SetTrigWin");
  fvsSetTrigMinNb = fConfig->GetValue<std::vector<std::string>>("SetTrigMinNb");

  fsChannelNameDataInput    = fConfig->GetValue<std::string>("TsNameIn");
  fsChannelNameDataOutput   = fConfig->GetValue<std::string>("EvtNameOut");
  fsChannelNameHistosInput  = fConfig->GetValue<std::string>("ChNameIn");
  fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg");
  fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg");
  fsAllowedChannels[0]      = fsChannelNameDataInput;

  fuPublishFreqTs  = fConfig->GetValue<uint32_t>("PubFreqTs");
  fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
  fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");

  // Get the information about created channels from the device
  // Check if the defined channels from the topology (by name)
  // are in the list of channels which are possible/allowed
  // for the device
  // The idea is to check at initilization if the devices are
  // properly connected. For the time beeing this is done with a
  // nameing convention. It is not avoided that someone sends other
  // data on this channel.
  //logger::SetLogLevel("INFO");
  int noChannel = fChannels.size();
  LOG(info) << "Number of defined channels: " << noChannel;
  for (auto const& entry : fChannels) {
    LOG(info) << "Channel name: " << entry.first;
    if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
      if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
      OnData(entry.first, &CbmDeviceMcbmEventBuilderWin::HandleData);
    }  // if( entry.first.find( "ts" )
  }    // for( auto const &entry : fChannels )

  //   InitContainers();

  /// FIXME: Disable clang formatting for now as it corrupts all alignment
  /* clang-format off */

  /// Initialize the Algorithm parameters
  fpAlgo->SetFillHistos(fbFillHistos);
  fpAlgo->SetIgnoreTsOverlap(fbIgnoreTsOverlap);
  /// Extract Event Overlap Mode
  EOverlapMode mode =
     ("NoOverlap"    == fsEvtOverMode ? EOverlapMode::NoOverlap
   : ("MergeOverlap" == fsEvtOverMode ? EOverlapMode::MergeOverlap
   : ("AllowOverlap" == fsEvtOverMode ? EOverlapMode::AllowOverlap
                                      : EOverlapMode::NoOverlap)));
  fpAlgo->SetEventOverlapMode(mode);
  /// Extract refdet
  EventBuilderDetector refDet = ("kBmon"   == fsRefDet ? kEventBuilderDetBmon
                              : ("kSts"  == fsRefDet ? kEventBuilderDetSts
                              : ("kMuch" == fsRefDet ? kEventBuilderDetMuch
                              : ("kTrd"  == fsRefDet ? kEventBuilderDetTrd
                              : ("kTof"  == fsRefDet ? kEventBuilderDetTof
                              : ("kRich" == fsRefDet ? kEventBuilderDetRich
                              : ("kPsd"  == fsRefDet ? kEventBuilderDetPsd
                                                     : kEventBuilderDetUndef)))))));
  if (kEventBuilderDetUndef != refDet) {
    fpAlgo->SetReferenceDetector(refDet);
  }  // if( kEventBuilderDetUndef != refDet )
  else {
    LOG(info) << "CbmDeviceMcbmEventBuilderWin::InitTask => Trying to change "
                 "reference to unsupported detector, ignored! "
              << fsRefDet;
  }  // else of if( kEventBuilderDetUndef != refDet

  /// Extract detector to add if any
  for (std::vector<std::string>::iterator itStrAdd = fvsAddDet.begin();
       itStrAdd != fvsAddDet.end();
       ++itStrAdd) {
    EventBuilderDetector addDet = ("kBmon"   == *itStrAdd ? kEventBuilderDetBmon
                                : ("kSts"  == *itStrAdd ? kEventBuilderDetSts
                                : ("kMuch" == *itStrAdd ? kEventBuilderDetMuch
                                : ("kTrd"  == *itStrAdd ? kEventBuilderDetTrd
                                : ("kTof"  == *itStrAdd ? kEventBuilderDetTof
                                : ("kRich" == *itStrAdd ? kEventBuilderDetRich
                                : ("kPsd"  == *itStrAdd ? kEventBuilderDetPsd
                                                        : kEventBuilderDetUndef)))))));
    if (kEventBuilderDetUndef != addDet) {
      fpAlgo->AddDetector(addDet);
    }  // if( kEventBuilderDetUndef != addDet )
    else {
      LOG(info) << "CbmDeviceMcbmEventBuilderWin::InitTask => Trying to add "
                   "unsupported detector, ignored! "
                << (*itStrAdd);
      continue;
    }  // else of if( kEventBuilderDetUndef != addDet )
  }  // for( std::vector< std::string >::iterator itStrAdd = fvsAddDet.begin(); itStrAdd != fvsAddDet.end(); ++itStrAdd )

     /// Extract detector to remove if any
  for (std::vector<std::string>::iterator itStrRem = fvsDelDet.begin();
       itStrRem != fvsDelDet.end();
       ++itStrRem) {
    EventBuilderDetector remDet = ("kBmon"   == *itStrRem ? kEventBuilderDetBmon
                                : ("kSts"  == *itStrRem ? kEventBuilderDetSts
                                : ("kMuch" == *itStrRem ? kEventBuilderDetMuch
                                : ("kTrd"  == *itStrRem ? kEventBuilderDetTrd
                                : ("kTof"  == *itStrRem ? kEventBuilderDetTof
                                : ("kRich" == *itStrRem ? kEventBuilderDetRich
                                : ("kPsd"  == *itStrRem ? kEventBuilderDetPsd
                                                        : kEventBuilderDetUndef)))))));
    if (kEventBuilderDetUndef != remDet) {
      fpAlgo->RemoveDetector(remDet);
    }  // if( kEventBuilderDetUndef != remDet )
    else {
      LOG(info) << "CbmDeviceMcbmEventBuilderWin::InitTask => Trying to remove "
                   "unsupported detector, ignored! "
                << (*itStrRem);
      continue;
    }  // else of if( kEventBuilderDetUndef != remDet )
  }  // for( std::vector< std::string >::iterator itStrAdd = fvsAddDet.begin(); itStrAdd != fvsAddDet.end(); ++itStrAdd )
     /// Extract Trigger window to add if any
  for (std::vector<std::string>::iterator itStrTrigWin = fvsSetTrigWin.begin();
       itStrTrigWin != fvsSetTrigWin.end();
       ++itStrTrigWin) {
    size_t charPosDel = (*itStrTrigWin).find(',');
    if (std::string::npos == charPosDel) {
      LOG(info)
        << "CbmDeviceMcbmEventBuilderWin::InitTask => "
        << "Trying to set trigger window with invalid option pattern, ignored! "
        << " (Should be ECbmModuleId,dWinBeg,dWinEnd but instead found "
        << (*itStrTrigWin) << " )";
      continue;
    }  // if( std::string::npos == charPosDel )

    /// Detector Enum Tag
    std::string sSelDet = (*itStrTrigWin).substr(0, charPosDel);
    ECbmModuleId selDet = ("kBmon"   == sSelDet ? ECbmModuleId::kBmon
                        : ("kSts"  == sSelDet ? ECbmModuleId::kSts
                        : ("kMuch" == sSelDet ? ECbmModuleId::kMuch
                        : ("kTrd"  == sSelDet ? ECbmModuleId::kTrd
                        : ("kTof"  == sSelDet ? ECbmModuleId::kTof
                        : ("kRich" == sSelDet ? ECbmModuleId::kRich
                        : ("kPsd"  == sSelDet ? ECbmModuleId::kPsd
                                              : ECbmModuleId::kNotExist)))))));
    if (ECbmModuleId::kNotExist == selDet) {
      LOG(info)
        << "CbmDeviceMcbmEventBuilderWin::InitTask => "
        << "Trying to set trigger window for unsupported detector, ignored! "
        << sSelDet;
      continue;
    }  // if(  ECbmModuleId::kNotExist == selDet )

    /// Window beginning
    charPosDel++;
    std::string sNext = (*itStrTrigWin).substr(charPosDel);
    charPosDel        = sNext.find(',');
    if (std::string::npos == charPosDel) {
      LOG(info)
        << "CbmDeviceMcbmEventBuilderWin::InitTask => "
        << "Trying to set trigger window with invalid option pattern, ignored! "
        << " (Should be ECbmModuleId,dWinBeg,dWinEnd but instead found "
        << (*itStrTrigWin) << " )";
      continue;
    }  // if( std::string::npos == charPosDel )
    Double_t dWinBeg = std::stod(sNext.substr(0, charPosDel));

    /// Window end
    charPosDel++;
    Double_t dWinEnd = std::stod(sNext.substr(charPosDel));

    fpAlgo->SetTriggerWindow(selDet, dWinBeg, dWinEnd);
  }  //       for( std::vector< std::string >::iterator itStrTrigWin = fvsSetTrigWin.begin(); itStrTrigWin != fvsSetTrigWin.end(); ++itStrTrigWin )
     /// Extract MinNb for trigger if any
  for (std::vector<std::string>::iterator itStrMinNb = fvsSetTrigMinNb.begin();
       itStrMinNb != fvsSetTrigMinNb.end();
       ++itStrMinNb) {
    size_t charPosDel = (*itStrMinNb).find(',');
    if (std::string::npos == charPosDel) {
      LOG(info)
        << "CbmDeviceMcbmEventBuilderWin::InitTask => "
        << "Trying to set trigger min Nb with invalid option pattern, ignored! "
        << " (Should be ECbmModuleId,uMinNb but instead found " << (*itStrMinNb)
        << " )";
      continue;
    }  // if( std::string::npos == charPosDel )

    /// Detector Enum Tag
    std::string sSelDet = (*itStrMinNb).substr(0, charPosDel);
    ECbmModuleId selDet = ("kBmon"   == sSelDet ? ECbmModuleId::kBmon
                        : ("kSts"  == sSelDet ? ECbmModuleId::kSts
                        : ("kMuch" == sSelDet ? ECbmModuleId::kMuch
                        : ("kTrd"  == sSelDet ? ECbmModuleId::kTrd
                        : ("kTof"  == sSelDet ? ECbmModuleId::kTof
                        : ("kRich" == sSelDet ? ECbmModuleId::kRich
                        : ("kPsd"  == sSelDet ? ECbmModuleId::kPsd
                                              : ECbmModuleId::kNotExist)))))));
    if (ECbmModuleId::kNotExist == selDet) {
      LOG(info)
        << "CbmDeviceMcbmEventBuilderWin::InitTask => "
        << "Trying to set trigger min Nb for unsupported detector, ignored! "
        << sSelDet;
      continue;
    }  // if(  ECbmModuleId::kNotExist == selDet )

    /// Min number
    charPosDel++;
    UInt_t uMinNb = std::stoul((*itStrMinNb).substr(charPosDel));

    fpAlgo->SetTriggerMinNumber(selDet, uMinNb);
  }  //    for( std::vector< std::string >::iterator itStrMinNb = fvsSetTrigMinNb.begin(); itStrMinNb != fvsSetTrigMinNb.end(); ++itStrMinNb )

  /// FIXME: Re-enable clang formatting after formatted lines
  /* clang-format on */

  /// Create input vectors
  fvDigiBmon = new std::vector<CbmTofDigi>();
  fvDigiSts  = new std::vector<CbmStsDigi>();
  fvDigiMuch = new std::vector<CbmMuchBeamTimeDigi>();
  fvDigiTrd  = new std::vector<CbmTrdDigi>();
  fvDigiTof  = new std::vector<CbmTofDigi>();
  fvDigiRich = new std::vector<CbmRichDigi>();
  fvDigiPsd  = new std::vector<CbmPsdDigi>();

  /// Register all input data members with the FairRoot manager
  fpRun                  = new FairRunOnline(0);
  FairRootManager* ioman = nullptr;
  ioman                  = FairRootManager::Instance();
  if (NULL == ioman) { throw InitTaskError("No FairRootManager instance"); }
  fTimeSliceMetaDataArray = new TClonesArray("TimesliceMetaData", 1);
  if (NULL == fTimeSliceMetaDataArray) { throw InitTaskError("Failed creating the TS meta data TClonesarray "); }
  ioman->Register("TimesliceMetaData", "TS Meta Data", fTimeSliceMetaDataArray, kFALSE);
  /// Digis storage
  ioman->RegisterAny("BmonDigi", fvDigiBmon, kFALSE);
  ioman->RegisterAny("StsDigi", fvDigiSts, kFALSE);
  ioman->RegisterAny("MuchBeamTimeDigi", fvDigiMuch, kFALSE);
  ioman->RegisterAny("TrdDigi", fvDigiTrd, kFALSE);
  ioman->RegisterAny("TofDigi", fvDigiTof, kFALSE);
  ioman->RegisterAny("RichDigi", fvDigiRich, kFALSE);
  ioman->RegisterAny("PsdDigi", fvDigiPsd, kFALSE);
  /// Feint to avoid crash of DigiManager due to missing source pointer
  /// validity check in FairRootManager.h at line 461
  std::vector<CbmMvdDigi>* pMvdDigi = new std::vector<CbmMvdDigi>();
  ioman->RegisterAny("MvdDigi", pMvdDigi, kFALSE);
  std::vector<CbmMatch>* pFakeMatch = new std::vector<CbmMatch>();
  ioman->RegisterAny("MvdDigiMatch", pFakeMatch, kFALSE);
  ioman->RegisterAny("StsDigiMatch", pFakeMatch, kFALSE);
  ioman->RegisterAny("MuchBeamTimeDigiMatch", pFakeMatch, kFALSE);
  ioman->RegisterAny("TrdDigiMatch", pFakeMatch, kFALSE);
  ioman->RegisterAny("TofDigiMatch", pFakeMatch, kFALSE);
  ioman->RegisterAny("RichDigiMatch", pFakeMatch, kFALSE);
  ioman->RegisterAny("PsdDigiMatch", pFakeMatch, kFALSE);

  /// Create output TClonesArray
  /// TODO: remove TObject from CbmEvent and switch to vectors!
  fEvents = new TClonesArray("CbmEvent", 500);

  /// Now that everything is set, initialize the Algorithm
  if (kFALSE == fpAlgo->InitAlgo()) {
    throw InitTaskError("Failed to initilize the algorithm class.");
  }  // if( kFALSE == fpAlgo->InitAlgo() )

  /// Histograms management
  if (kTRUE == fbFillHistos) {
    /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
    std::vector<std::pair<TNamed*, std::string>> vHistos = fpAlgo->GetHistoVector();
    /// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder)
    std::vector<std::pair<TCanvas*, std::string>> vCanvases = fpAlgo->GetCanvasVector();

    /// Add pointers to each histo in the histo array
    /// Create histo config vector
    /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder >
    ///      and send it through a separate channel using the BoostSerializer
    for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
      //         LOG(info) << "Registering  " << vHistos[ uHisto ].first->GetName()
      //                   << " in " << vHistos[ uHisto ].second.data()
      //                   ;
      fArrayHisto.Add(vHistos[uHisto].first);
      std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
      fvpsHistosFolder.push_back(psHistoConfig);

      /// Serialize the vector of histo config into a single MQ message
      FairMQMessagePtr messageHist(NewMessage());
      //      Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, psHistoConfig);
      BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, psHistoConfig);

      /// Send message to the common histogram config messages queue
      if (Send(messageHist, fsChannelNameHistosConfig) < 0) {
        throw InitTaskError("Problem sending histo config");
      }  // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
      LOG(info) << "Config of hist  " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data();
    }  // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )

    /// Create canvas config vector
    /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config >
    ///      and send it through a separate channel using the BoostSerializer
    for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
      //         LOG(info) << "Registering  " << vCanvases[ uCanv ].first->GetName()
      //                   << " in " << vCanvases[ uCanv ].second.data();
      std::string sCanvName = (vCanvases[uCanv].first)->GetName();
      std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);

      std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);

      fvpsCanvasConfig.push_back(psCanvConfig);

      /// Serialize the vector of canvas config into a single MQ message
      FairMQMessagePtr messageCan(NewMessage());
      //      Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, psCanvConfig);
      BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, psCanvConfig);

      /// Send message to the common canvas config messages queue
      if (Send(messageCan, fsChannelNameCanvasConfig) < 0) {
        throw InitTaskError("Problem sending canvas config");
      }  // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )

      LOG(info) << "Config string of Canvas  " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data();
    }  //  for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
  }    // if( kTRUE == fbFillHistos )
}
catch (InitTaskError& e) {
  LOG(error) << e.what();
  // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
  cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
}

bool CbmDeviceMcbmEventBuilderWin::IsChannelNameAllowed(std::string channelName)
{
  for (auto const& entry : fsAllowedChannels) {
    std::size_t pos1 = channelName.find(entry);
    if (pos1 != std::string::npos) {
      const vector<std::string>::const_iterator pos =
        std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry);
      const vector<std::string>::size_type idx = pos - fsAllowedChannels.begin();
      LOG(info) << "Found " << entry << " in " << channelName;
      LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
      return true;
    }  // if (pos1!=std::string::npos)
  }    // for(auto const &entry : fsAllowedChannels)
  LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
  LOG(error) << "Stop device.";
  return false;
}
/*
Bool_t CbmDeviceMcbmEventBuilderWin::InitContainers()
{
   LOG(info) << "Init parameter containers for CbmDeviceMcbmEventBuilderWin.";

   if( kFALSE == InitParameters( fpAlgo ->GetParList() ) )
      return kFALSE;

   /// Need to add accessors for all options
   fpAlgo ->SetIgnoreOverlapMs( fbIgnoreOverlapMs );

   Bool_t initOK = fpAlgo->InitContainers();

//   Bool_t initOK = fMonitorAlgo->ReInitContainers();

  return initOK;
}

Bool_t CbmDeviceMcbmEventBuilderWin::InitParameters( TList* fParCList )
{
   for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
   {
      FairParGenericSet* tempObj = (FairParGenericSet*)( fParCList->At( iparC ) );
      fParCList->Remove( tempObj );
      std::string paramName{ tempObj->GetName() };
      // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
      // Should only be used for small data because of the cost of an additional copy

      // Her must come the proper Runid
      std::string message = paramName + ",111";
      LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;

      FairMQMessagePtr req( NewSimpleMessage(message) );
      FairMQMessagePtr rep( NewMessage() );

      FairParGenericSet* newObj = nullptr;

      if( Send(req, "parameters") > 0 )
      {
         if( Receive( rep, "parameters" ) >= 0)
         {
            if( 0 !=  rep->GetSize() )
            {
               CbmMqTMessage tmsg( rep->GetData(), rep->GetSize() );
               newObj = static_cast< FairParGenericSet* >( tmsg.ReadObject( tmsg.GetClass() ) );
               LOG( info ) << "Received unpack parameter from the server:";
               newObj->print();
            } // if( 0 !=  rep->GetSize() )
               else
               {
                  LOG( error ) << "Received empty reply. Parameter not available";
                  return kFALSE;
               } // else of if( 0 !=  rep->GetSize() )
         } // if( Receive( rep, "parameters" ) >= 0)
      } // if( Send(req, "parameters") > 0 )
      fParCList->AddAt( newObj, iparC );
      delete tempObj;
   } // for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )

   return kTRUE;
}
*/
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool CbmDeviceMcbmEventBuilderWin::HandleData(FairMQParts& parts, int /*index*/)
{
  fulNumMessages++;
  LOG(debug) << "Received message number " << fulNumMessages << " with " << parts.Size() << " parts"
             << ", size0: " << parts.At(0)->GetSize();

  if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";

  /// Extract unpacked data from input message
  uint32_t uPartIdx = 0;
  /// TS metadata
  /// TODO: code order of vectors in the TS MetaData!!
  /*
  std::string msgStrTsMeta( static_cast< char * >( parts.At( uPartIdx )->GetData() ),
                            ( parts.At( uPartIdx ) )->GetSize() );
  std::istringstream issTsMeta(msgStrTsMeta);
  boost::archive::binary_iarchive inputArchiveTsMeta(issTsMeta);
  inputArchiveTsMeta >> (*fTsMetaData);
  ++uPartIdx;
*/
  //  Deserialize<RootSerializer>(*parts.At(uPartIdx), fTsMetaData);
  RootSerializer().Deserialize(*parts.At(uPartIdx), fTsMetaData);
  /// FIXME: Not if this is the proper way to insert the data
  new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()
                                  //                                    ] ) TimesliceMetaData( *fTsMetaData ) ;
  ]) TimesliceMetaData(std::move(*fTsMetaData));
  ++uPartIdx;

  /// BMON
  std::string msgStrBmon(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
  std::istringstream issBmon(msgStrBmon);
  boost::archive::binary_iarchive inputArchiveBmon(issBmon);
  inputArchiveBmon >> *fvDigiBmon;
  ++uPartIdx;

  /// STS
  std::string msgStrSts(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
  std::istringstream issSts(msgStrSts);
  boost::archive::binary_iarchive inputArchiveSts(issSts);
  inputArchiveSts >> *fvDigiSts;
  ++uPartIdx;

  /// MUCH
  std::string msgStrMuch(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
  std::istringstream issMuch(msgStrMuch);
  boost::archive::binary_iarchive inputArchiveMuch(issMuch);
  inputArchiveMuch >> *fvDigiMuch;
  ++uPartIdx;

  /// TRD
  std::string msgStrTrd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
  std::istringstream issTrd(msgStrTrd);
  boost::archive::binary_iarchive inputArchiveTrd(issTrd);
  inputArchiveTrd >> *fvDigiTrd;
  ++uPartIdx;

  /// TOF
  std::string msgStrTof(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
  std::istringstream issTof(msgStrTof);
  boost::archive::binary_iarchive inputArchiveTof(issTof);
  inputArchiveTof >> *fvDigiTof;
  ++uPartIdx;

  /// RICH
  std::string msgStrRich(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
  std::istringstream issRich(msgStrRich);
  boost::archive::binary_iarchive inputArchiveRich(issRich);
  inputArchiveRich >> *fvDigiRich;
  ++uPartIdx;

  /// PSD
  std::string msgStrPsd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
  std::istringstream issPsd(msgStrPsd);
  boost::archive::binary_iarchive inputArchivePsd(issPsd);
  inputArchivePsd >> *fvDigiPsd;
  ++uPartIdx;

  /// Call Algo ProcessTs method
  fpAlgo->ProcessTs();

  /// Send events vector to ouput
  if (!SendEvents(parts)) return false;

  /// Clear metadata
  fTimeSliceMetaDataArray->Clear();
  //   delete fTsMetaData;

  /// Clear vectors
  fvDigiBmon->clear();
  fvDigiSts->clear();
  fvDigiMuch->clear();
  fvDigiTrd->clear();
  fvDigiTof->clear();
  fvDigiRich->clear();
  fvDigiPsd->clear();

  /// Clear event vector after usage
  fpAlgo->ClearEventVector();
  fEvents->Clear("C");
  //   fEvents->Clear();

  /// Histograms management
  if (kTRUE == fbFillHistos) {
    /// Send histograms each 100 time slices. Should be each ~1s
    /// Use also runtime checker to trigger sending after M s if
    /// processing too slow or delay sending if processing too fast
    std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
    std::chrono::duration<double_t> elapsedSeconds    = currentTime - fLastPublishTime;
    if ((fdMaxPublishTime < elapsedSeconds.count())
        || (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
      SendHistograms();
      fLastPublishTime = std::chrono::system_clock::now();
    }  // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
  }    // if( kTRUE == fbFillHistos )

  return true;
}

bool CbmDeviceMcbmEventBuilderWin::SendEvents(FairMQParts& partsIn)
{
  /// Clear events TClonesArray before usage.
  fEvents->Delete();
  //   fEvents->Clear();

  /// Get vector reference from algo
  std::vector<CbmEvent*> vEvents = fpAlgo->GetEventVector();

  /// Move CbmEvent from temporary vector to TClonesArray
  for (CbmEvent* event : vEvents) {
    LOG(debug) << "Vector: " << event->ToString();
    new ((*fEvents)[fEvents->GetEntriesFast()]) CbmEvent(std::move(*event));
    //      new ( (*fEvents)[fEvents->GetEntriesFast()] ) CbmEvent( *event );
    LOG(debug) << "TClonesArray: " << static_cast<CbmEvent*>(fEvents->At(fEvents->GetEntriesFast() - 1))->ToString();
  }  // for( CbmEvent* event: vEvents )

  /// Serialize the array of events into a single MQ message
  FairMQMessagePtr message(NewMessage());
  //  Serialize<RootSerializer>(*message, fEvents);
  RootSerializer().Serialize(*message, fEvents);

  /// Add it at the end of the input composed message
  /// FIXME: use move or fix addition of new part to avoid full message copy
  FairMQParts partsOut(std::move(partsIn));
  partsOut.AddPart(std::move(message));

  //   /// Get vector from algo
  //   fEventVector = fpAlgo->GetEventVector();
  //
  //   /// Prepare serialized versions of the events vector
  //   std::stringstream ossEvents;
  //   boost::archive::binary_oarchive oaEvents(ossEvents);
  //   oaEvents << fpAlgo->GetEventVector();
  //   std::string* strMsgEvents = new std::string(ossEvents.str());
  //
  //   /// Create message
  //   FairMQMessagePtr msg( NewMessage( const_cast< char * >( strMsgEvents->c_str() ), // data
  //                                     strMsgEvents->length(), // size
  //                                     []( void * /*data*/, void* object ){ delete static_cast< std::string * >( object ); },
  //                                     strMsgEvents ) ); // object that manages the data

  /// Send message
  //   if( Send( message, fsChannelNameDataOutput ) < 0 )
  if (Send(partsOut, fsChannelNameDataOutput) < 0) {
    LOG(error) << "Problem sending data to " << fsChannelNameDataOutput;
    return false;
  }

  return true;
}

bool CbmDeviceMcbmEventBuilderWin::SendHistograms()
{
  /// Serialize the array of histos into a single MQ message
  FairMQMessagePtr message(NewMessage());
  //  Serialize<RootSerializer>(*message, &fArrayHisto);
  RootSerializer().Serialize(*message, &fArrayHisto);

  /// Send message to the common histogram messages queue
  if (Send(message, fsChannelNameHistosInput) < 0) {
    LOG(error) << "Problem sending data";
    return false;
  }  // if( Send( message, fsChannelNameHistosInput ) < 0 )

  /// Reset the histograms after sending them (but do not reset the time)
  fpAlgo->ResetHistograms(kFALSE);

  return true;
}

CbmDeviceMcbmEventBuilderWin::~CbmDeviceMcbmEventBuilderWin()
{
  /// Clear metadata
  fTimeSliceMetaDataArray->Clear();
  delete fTsMetaData;

  /// Clear vectors
  fvDigiBmon->clear();
  fvDigiSts->clear();
  fvDigiMuch->clear();
  fvDigiTrd->clear();
  fvDigiTof->clear();
  fvDigiRich->clear();
  fvDigiPsd->clear();

  /// Clear events TClonesArray
  fEvents->Delete();

  delete fpRun;

  delete fTimeSliceMetaDataArray;
  delete fEvents;

  delete fpAlgo;
}

void CbmDeviceMcbmEventBuilderWin::Finish() {}