Skip to content
Snippets Groups Projects
CbmDeviceEventBuilderEtofStar2019.cxx 19.65 KiB
/* Copyright (C) 2019-2021 PI-UHd, GSI
   SPDX-License-Identifier: GPL-3.0-only
   Authors: Norbert Herrmann [committer] */

/**
 * CbmDeviceEventBuilderEtofStar2019.cxx
 */

#include "CbmDeviceEventBuilderEtofStar2019.h"

#include "CbmMQDefs.h"
#include "CbmStar2019EventBuilderEtofAlgo.h"
#include "CbmStar2019TofPar.h"

#include "StorableTimeslice.hpp"

#include "FairMQLogger.h"
#include "FairMQProgOptions.h"  // device->fConfig
#include "FairParGenericSet.h"
#include "FairRuntimeDb.h"

#include "TFile.h"
#include "TH1.h"
#include "TH2.h"
#include "THttpServer.h"
#include "TROOT.h"
#include "TString.h"

#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>

// include this header to serialize vectors
#include <boost/serialization/vector.hpp>

#include <array>
#include <iomanip>
#include <stdexcept>
#include <string>
struct InitTaskError : std::runtime_error {
  using std::runtime_error::runtime_error;
};

using namespace std;

//static Int_t iMess=0;
const Int_t DetMask = 0x003FFFFF;
static uint fiSelectComponents {0};

CbmDeviceEventBuilderEtofStar2019::CbmDeviceEventBuilderEtofStar2019()
  :  //CbmDeviceUnpackTofMcbm2018(),
  fNumMessages(0)
  , fbMonitorMode(kFALSE)
  , fbDebugMonitorMode(kFALSE)
  , fbSandboxMode(kFALSE)
  , fbEventDumpEna(kFALSE)
  , fParCList(nullptr)
  , fulTsCounter(0)
  , fNumEvt(0)
  , fEventBuilderAlgo(nullptr)
  , fTimer()
  , fUnpackPar(nullptr)
  , fpBinDumpFile(nullptr)
{
  fEventBuilderAlgo = new CbmStar2019EventBuilderEtofAlgo();
}

CbmDeviceEventBuilderEtofStar2019::~CbmDeviceEventBuilderEtofStar2019() { delete fEventBuilderAlgo; }

void CbmDeviceEventBuilderEtofStar2019::InitTask()
try {
  // Get the information about created channels from the device
  // Check if the defined channels from the topology (by name)
  // are in the list of channels which are possible/allowed
  // for the device
  // The idea is to check at initilization if the devices are
  // properly connected. For the time beeing this is done with a
  // nameing convention. It is not avoided that someone sends other
  // data on this channel.
  //logger::SetLogLevel("INFO");

  int noChannel = fChannels.size();
  LOG(info) << "Number of defined channels: " << noChannel;
  for (auto const& entry : fChannels) {
    LOG(info) << "Channel name: " << entry.first;
    if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
    if (entry.first == "syscmd") {
      OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleMessage);
      continue;
    }
    //if(entry.first != "etofevts") OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleData);
    if (entry.first != "etofevts") OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleParts);
    else {
      fChannelsToSend[0].push_back(entry.first);
      LOG(info) << "Init to send data to channel " << fChannelsToSend[0][0];
    }
  }
  InitContainers();
}
catch (InitTaskError& e) {
  LOG(error) << e.what();
  // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
  cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
}

bool CbmDeviceEventBuilderEtofStar2019::IsChannelNameAllowed(std::string channelName)
{
  for (auto const& entry : fAllowedChannels) {
    LOG(info) << "Inspect " << entry;
    std::size_t pos1 = channelName.find(entry);
    if (pos1 != std::string::npos) {
      const vector<std::string>::const_iterator pos =
        std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
      const vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
      LOG(info) << "Found " << entry << " in " << channelName;
      LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
      return true;
    }
  }
  LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
  LOG(error) << "Stop device.";
  return false;
}

Bool_t CbmDeviceEventBuilderEtofStar2019::InitContainers()
{
  LOG(info) << "Init parameter containers for CbmDeviceEventBuilderEtofStar2019.";
  //  FairRuntimeDb* fRtdb = FairRuntimeDb::instance();

  // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
  // Should only be used for small data because of the cost of an additional copy
  std::string message {"CbmStar2019TofPar,111"};
  LOG(info) << "Requesting parameter container CbmStar2019TofPar, sending message: " << message;

  FairMQMessagePtr req(NewSimpleMessage("CbmStar2019TofPar,111"));
  FairMQMessagePtr rep(NewMessage());

  if (Send(req, "parameters") > 0) {
    if (Receive(rep, "parameters") >= 0) {
      if (rep->GetSize() != 0) {
        CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
        fUnpackPar = dynamic_cast<CbmStar2019TofPar*>(tmsg.ReadObject(tmsg.GetClass()));
        LOG(info) << "Received unpack parameter from parmq server: " << fUnpackPar;
        fUnpackPar->Print();
      }
      else {
        LOG(error) << "Received empty reply. Parameter not available";
      }
    }
  }


  SetParContainers();

  Bool_t initOK = kTRUE;
  initOK &= fEventBuilderAlgo->InitContainers();
  initOK &= ReInitContainers();  // needed for TInt parameters
  fEventBuilderAlgo->SetAddStatusToEvent(true);

  if (kTRUE == fbMonitorMode) {  //  CreateHistograms();
    initOK &= fEventBuilderAlgo->CreateHistograms();

    /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
    std::vector<std::pair<TNamed*, std::string>> vHistos = fEventBuilderAlgo->GetHistoVector();
    /* FIXME
      /// Register the histos in the HTTP server
      THttpServer* server = FairRunOnline::Instance()->GetHttpServer();
      for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
      {
         server->Register( Form( "/%s", vHistos[ uHisto ].second.data() ), vHistos[ uHisto ].first );
      } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )

      server->RegisterCommand("/Reset_EvtBuild_Hist", "bStarEtof2019EventBuilderResetHistos=kTRUE");
      server->Restrict("/Reset_EvtBuild_Hist", "allow=admin");
     */
  }  // if( kTRUE == fbMonitorMode )

  return initOK;
}

void CbmDeviceEventBuilderEtofStar2019::SetParContainers()
{
  FairRuntimeDb* fRtdb = FairRuntimeDb::instance();

  fParCList = fEventBuilderAlgo->GetParList();

  LOG(info) << "Setting parameter containers for " << fParCList->GetEntries() << " entries ";

  for (Int_t iparC = 0; iparC < fParCList->GetEntries(); ++iparC) {
    FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC));
    fParCList->Remove(tempObj);

    std::string sParamName {tempObj->GetName()};

    FairParGenericSet* newObj = dynamic_cast<FairParGenericSet*>(fRtdb->getContainer(sParamName.data()));
    LOG(info) << " - Get " << sParamName.data() << " at " << newObj;
    if (nullptr == newObj) {

      LOG(error) << "Failed to obtain parameter container " << sParamName << ", for parameter index " << iparC;
      return;
    }  // if( nullptr == newObj )
    if (iparC == 0) {
      newObj = (FairParGenericSet*) fUnpackPar;
      LOG(info) << " - Mod " << sParamName.data() << " to " << newObj;
    }
    fParCList->AddAt(newObj, iparC);
    //      delete tempObj;
  }  // for( Int_t iparC = 0; iparC < fParCList->GetEntries(); ++iparC )
}

void CbmDeviceEventBuilderEtofStar2019::AddMsComponentToList(size_t component, UShort_t usDetectorId)
{
  fEventBuilderAlgo->AddMsComponentToList(component, usDetectorId);
}

Bool_t CbmDeviceEventBuilderEtofStar2019::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
{
  if (0 == fulTsCounter) {
    LOG(info) << "FIXME ===> Jumping 1st TS as corrupted with current FW + "
                 "FLESNET combination";
    fulTsCounter++;
    return kTRUE;
  }  // if( 0 == fulTsCounter )
  if (kFALSE == fEventBuilderAlgo->ProcessTs(ts)) {
    LOG(error) << "Failed processing TS " << ts.index() << " in event builder algorithm class";
    return kTRUE;
  }  // if( kFALSE == fEventBuilderAlgo->ProcessTs( ts ) )

  std::vector<CbmTofStarSubevent2019>& eventBuffer = fEventBuilderAlgo->GetEventBuffer();

  for (UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent) {
    /// Send the sub-event to the STAR systems
    Int_t iBuffSzByte = 0;
    void* pDataBuff   = eventBuffer[uEvent].BuildOutput(iBuffSzByte);
    if (NULL != pDataBuff) {
      /// Valid output, do stuff with it!
      //         Bool_t fbSendEventToStar = kFALSE;
      if (kFALSE == fbSandboxMode) {
        /*
             ** Function to send sub-event block to the STAR DAQ system
             *       trg_word received is packed as:
             *
             *       trg_cmd|daq_cmd|tkn_hi|tkn_mid|tkn_lo
             */
        /*
            star_rhicf_write( eventBuffer[ uEvent ].GetTrigger().GetStarTrigerWord(),
                              pDataBuff, iBuffSzByte );
	   */

        SendSubevent(eventBuffer[uEvent].GetTrigger().GetStarTrigerWord(), (char*) pDataBuff, iBuffSzByte, 0);

      }  // if( kFALSE == fbSandboxMode )

      LOG(debug) << "Sent STAR event with size " << iBuffSzByte << " Bytes"
                 << " and token " << eventBuffer[uEvent].GetTrigger().GetStarToken();
    }  // if( NULL != pDataBuff )
    else
      LOG(error) << "Invalid STAR SubEvent Output, can only happen if trigger "
                 << " object was not set => Do Nothing more with it!!! ";
  }  // for( UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent )

  return kTRUE;
}


Bool_t CbmDeviceEventBuilderEtofStar2019::ReInitContainers()
{
  LOG(info) << "ReInit parameter containers for CbmDeviceEventBuilderEtofStar2019";
  Bool_t initOK = fEventBuilderAlgo->ReInitContainers();
  return initOK;
}

// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool CbmDeviceEventBuilderEtofStar2019::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
  // Don't do anything with the data
  // Maybe add an message counter which counts the incomming messages and add
  // an output
  fNumMessages++;
  LOG(debug) << "Received message number " << fNumMessages << " with size " << msg->GetSize();
  std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
  std::istringstream iss(msgStr);
  boost::archive::binary_iarchive inputArchive(iss);

  fles::StorableTimeslice component {0};
  inputArchive >> component;

  CheckTimeslice(component);

  DoUnpack(component, 0);

  //  if(fNumMessages%10000 == 0) LOG(info)<<"Processed "<<fNumMessages<<" time slices";

  return true;
}

static Double_t dctime = 0.;

bool CbmDeviceEventBuilderEtofStar2019::HandleParts(FairMQParts& parts, int /*index*/)
{
  // Don't do anything with the data
  // Maybe add an message counter which counts the incomming messages and add
  // an output
  fNumMessages++;
  LOG(debug) << "Received message number " << fNumMessages << " with " << parts.Size() << " parts";

  fles::StorableTimeslice ts {0};  // rename ??? FIXME

  switch (fiSelectComponents) {
    case 0: {
      std::string msgStr(static_cast<char*>(parts.At(0)->GetData()), (parts.At(0))->GetSize());
      std::istringstream iss(msgStr);
      boost::archive::binary_iarchive inputArchive(iss);
      inputArchive >> ts;
      CheckTimeslice(ts);
      if (1 == fNumMessages) {
        LOG(info) << "Initialize TS components list to " << ts.num_components();
        for (size_t c {0}; c < ts.num_components(); c++) {
          auto systemID = static_cast<int>(ts.descriptor(c, 0).sys_id);
          LOG(info) << "Found systemID: " << std::hex << systemID << std::dec;
          fEventBuilderAlgo->AddMsComponentToList(c, systemID);  // TOF data
        }
      }
    } break;
    case 1: {
      fles::StorableTimeslice component {0};

      uint ncomp = parts.Size();
      for (uint i = 0; i < ncomp; i++) {
        std::string msgStr(static_cast<char*>(parts.At(i)->GetData()), (parts.At(i))->GetSize());
        std::istringstream iss(msgStr);
        boost::archive::binary_iarchive inputArchive(iss);
        //fles::StorableTimeslice component{i};
        inputArchive >> component;

        CheckTimeslice(component);
        fEventBuilderAlgo->AddMsComponentToList(i, 0x60);  // TOF data
        LOG(debug) << "HandleParts message " << fNumMessages << " with indx " << component.index();
      }
    } break;
    default:;
  }

  if (kFALSE == fEventBuilderAlgo->ProcessTs(ts)) {
    LOG(error) << "Failed processing TS " << ts.index() << " in event builder algorithm class";
    return kTRUE;
  }  // if( kFALSE == fEventBuilderAlgo->ProcessTs( ts ) )

  std::vector<CbmTofStarSubevent2019>& eventBuffer = fEventBuilderAlgo->GetEventBuffer();
  LOG(debug) << "Process time slice " << fNumMessages << " with " << eventBuffer.size() << " events";

  //if(fNumMessages%10000 == 0) LOG(info)<<"Processed "<<fNumMessages<<" time slices";

  for (UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent) {
    /// Send the sub-event to the STAR systems
    Int_t iBuffSzByte = 0;
    void* pDataBuff   = eventBuffer[uEvent].BuildOutput(iBuffSzByte);
    if (NULL != pDataBuff) {
      /// Valid output, do stuff with it!
      // Send to Star TriggerHandler, TBD
      if (kFALSE == fbSandboxMode) {
        /*
             ** Function to send sub-event block to the STAR DAQ system
             *       trg_word received is packed as:
             *
             *       trg_cmd|daq_cmd|tkn_hi|tkn_mid|tkn_lo
             */
        /*
            star_rhicf_write( eventBuffer[ uEvent ].GetTrigger().GetStarTrigerWord(),
                              pDataBuff, iBuffSzByte );
	   */
      }  // if( kFALSE == fbSandboxMode )
      SendSubevent(eventBuffer[uEvent].GetTrigger().GetStarTrigerWord(), (char*) pDataBuff, iBuffSzByte, 0);

      LOG(debug) << "Sent STAR event " << uEvent << " with size " << iBuffSzByte << " Bytes"
                 << ", token " << eventBuffer[uEvent].GetTrigger().GetStarToken() << ", TrigWord "
                 << eventBuffer[uEvent].GetTrigger().GetStarTrigerWord();
    }
  }

  if (0 == fulTsCounter % 10000) {
    LOG(info) << "Processed " << fulTsCounter << " TS,  CPUtime: " << dctime / 10. << " ms/TS";
    dctime = 0.;
  }
  fulTsCounter++;
  return true;
}

bool CbmDeviceEventBuilderEtofStar2019::HandleMessage(FairMQMessagePtr& msg, int /*index*/)
{
  const char* cmd    = (char*) (msg->GetData());
  const char cmda[4] = {*cmd};
  LOG(info) << "Handle message " << cmd << ", " << cmd[0];
  cbm::mq::LogState(this);

  // only one implemented so far "Stop"

  if (strcmp(cmda, "STOP")) {
    LOG(info) << "STOP";
    cbm::mq::ChangeState(this, cbm::mq::Transition::Ready);
    cbm::mq::LogState(this);
    cbm::mq::ChangeState(this, cbm::mq::Transition::DeviceReady);
    cbm::mq::LogState(this);
    cbm::mq::ChangeState(this, cbm::mq::Transition::Idle);
    cbm::mq::LogState(this);
    cbm::mq::ChangeState(this, cbm::mq::Transition::End);
    cbm::mq::LogState(this);
  }
  return true;
}


bool CbmDeviceEventBuilderEtofStar2019::CheckTimeslice(const fles::Timeslice& ts)
{
  if (0 == ts.num_components()) {
    LOG(error) << "No Component in TS " << ts.index();
    return 1;
  }
  auto tsIndex = ts.index();
  LOG(debug) << "Found " << ts.num_components() << " different components in timeslice " << tsIndex;

  /*
  for (size_t c = 0; c < ts.num_components(); ++c) {
    LOG(debug) << "Found " << ts.num_microslices(c)
              << " microslices in component " << c;
    LOG(debug) << "Component " << c << " has a size of "
              << ts.size_component(c) << " bytes";
    LOG(debug) << "Sys ID: Ox" << std::hex << static_cast<int>(ts.descriptor(0,0).sys_id)
            << std::dec;

    for (size_t m = 0; m < ts.num_microslices(c); ++m) {
      PrintMicroSliceDescriptor(ts.descriptor(c,m));
    }
  }
*/
  return true;
}

bool CbmDeviceEventBuilderEtofStar2019::SendEvent(std::vector<Int_t> vdigi, int idx)
{
  LOG(debug) << "Send Data for event " << fNumEvt << " with size " << vdigi.size() << Form(" at %p ", &vdigi);
  //  LOG(debug) << "EventHeader: "<< fEventHeader[0] << " " << fEventHeader[1] << " " << fEventHeader[2] << " " << fEventHeader[3];

  std::stringstream oss;
  boost::archive::binary_oarchive oa(oss);
  oa << vdigi;
  std::string* strMsg = new std::string(oss.str());

  FairMQParts parts;
  parts.AddPart(NewMessage(
    const_cast<char*>(strMsg->c_str()),  // data
    strMsg->length(),                    // size
    [](void*, void* object) { delete static_cast<std::string*>(object); },
    strMsg));  // object that manages the data

  LOG(debug) << "Send data to channel " << idx << " " << fChannelsToSend[idx][0];


  //  if (Send(msg, fChannelsToSend[idx][0]) < 0) {
  if (Send(parts, fChannelsToSend[idx][0]) < 0) {
    LOG(error) << "Problem sending data " << fChannelsToSend[idx][0];
    return false;
  }
  fNumEvt++;
  //if(fNumEvt==100) FairMQStateMachine::ChangeState(PAUSE); //sleep(10000); // Stop for debugging ...
  return true;
}

bool CbmDeviceEventBuilderEtofStar2019::SendSubevent(uint trig, char* pData, int nData, int idx)
{

  LOG(debug) << "SendSubevent " << fNumEvt << ", TrigWord " << trig << " with size " << nData << Form(" at %p ", pData);

  std::stringstream ossE;
  boost::archive::binary_oarchive oaE(ossE);
  oaE << trig;
  std::string* strMsgE = new std::string(ossE.str());

  /*
  std::stringstream oss;
  boost::archive::binary_oarchive oa(oss);
  oa << cData;
  std::string* strMsg = new std::string(oss.str());
  */

  std::string* strMsg = new std::string(pData, nData);

  FairMQParts parts;
  parts.AddPart(NewMessage(
    const_cast<char*>(strMsgE->c_str()),  // data
    strMsgE->length(),                    // size
    [](void*, void* object) { delete static_cast<std::string*>(object); },
    strMsgE));  // object that manages the data

  parts.AddPart(NewMessage(
    const_cast<char*>(strMsg->c_str()),  // data
    strMsg->length(),                    // size
    [](void*, void* object) { delete static_cast<std::string*>(object); },
    strMsg));  // object that manages the data

  LOG(debug) << "Send data to channel " << idx << " " << fChannelsToSend[idx][0];


  //  if (Send(msg, fChannelsToSend[idx][0]) < 0) {
  if (Send(parts, fChannelsToSend[idx][0]) < 0) {
    LOG(error) << "Problem sending data " << fChannelsToSend[idx][0];
    return false;
  }
  fNumEvt++;
  //if(fNumEvt==100) FairMQStateMachine::ChangeState(PAUSE); //sleep(10000); // Stop for debugging ...
  return true;
}

void CbmDeviceEventBuilderEtofStar2019::Reset() {}

void CbmDeviceEventBuilderEtofStar2019::Finish()
{
  if (NULL != fpBinDumpFile) {
    LOG(info) << "Closing binary file used for event dump.";
    fpBinDumpFile->close();
  }  // if( NULL != fpBinDumpFile )

  /// If monitor mode enabled, trigger histos creation, obtain pointer on them and add them to the HTTP server
  if (kTRUE == fbMonitorMode) {
    /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
    std::vector<std::pair<TNamed*, std::string>> vHistos = fEventBuilderAlgo->GetHistoVector();

    /// Save old global file and folder pointer to avoid messing with FairRoot
    TFile* oldFile     = gFile;
    TDirectory* oldDir = gDirectory;

    /// (Re-)Create ROOT file to store the histos
    TFile* histoFile = nullptr;

    // open separate histo file in recreate mode
    histoFile = new TFile("data/eventBuilderMonHist.root", "RECREATE");
    histoFile->cd();

    /// Register the histos in the HTTP server
    for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
      /// Make sure we end up in chosen folder
      gDirectory->mkdir(vHistos[uHisto].second.data());
      gDirectory->cd(vHistos[uHisto].second.data());

      /// Write plot
      vHistos[uHisto].first->Write();

      histoFile->cd();
    }  // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )

    /// Restore old global file and folder pointer to avoid messing with FairRoot
    gFile      = oldFile;
    gDirectory = oldDir;

    histoFile->Close();
  }  // if( kTRUE == fbMonitorMode )
}