Skip to content
Snippets Groups Projects
Select Git revision
  • f9e0943e376629bbafd30654960ba472592f25f3
  • master default protected
  • nightly_master
  • online_mvd_readconf_cleanup protected
  • online_much_readconf_cleanup protected
  • jul25_patches
  • cleanup_rich_v25a
  • jul24_patches
  • nov23_patches
  • DC_2404
  • nighly_master
  • DC_Jan24
  • DC_Nov23
  • DC_Oct23
  • feb23_patches
  • L1Algo-dev9
  • dec21_patches protected
  • apr21_patches protected
  • dev_2025_46
  • dev_2025_45
  • dev_2025_44
  • dev_2025_43
  • dev_2025_42
  • dev_2025_41
  • dev_2025_40
  • dev_2025_39
  • dev_2025_38
  • dev_2025_37
  • dev_2025_36
  • dev_2025_35
  • dev_2025_34
  • dev_2025_33
  • dev_2025_32
  • dev_2025_31
  • dev_2025_30
  • RC_jul25
  • dev_2025_29
  • dev_2025_28
38 results

CbmDeviceMonitorBmon.cxx

Blame
  • Eoin Clerkin's avatar
    Eoin Clerkin authored and Pierre-Alain Loizeau committed
    Updates T0 digi classes with Bmon exquivalents. Standardises
    the use of Bmon across code base and, in-use and legacy macros.
    4f519993
    History
    CbmDeviceMonitorBmon.cxx 16.09 KiB
    /* Copyright (C) 2019-2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
       SPDX-License-Identifier: GPL-3.0-only
       Authors: Pierre-Alain Loizeau, Florian Uhlig [committer] */
    
    /**
     * CbmDeviceMonitorBmon.cxx
     *
     * @since 2019-03-26
     * @author F. Uhlig
     */
    
    #include "CbmDeviceMonitorBmon.h"
    
    #include "CbmFlesCanvasTools.h"
    #include "CbmMcbm2018MonitorAlgoBmon.h"
    
    #include "StorableTimeslice.hpp"
    
    #include "FairMQLogger.h"
    #include "FairMQProgOptions.h"  // device->fConfig
    #include "FairParGenericSet.h"
    
    #include "TCanvas.h"
    #include "TFile.h"
    #include "TH1.h"
    #include "TList.h"
    #include "TNamed.h"
    
    #include "BoostSerializer.h"
    #include <boost/archive/binary_iarchive.hpp>
    #include <boost/serialization/utility.hpp>
    
    #include <array>
    #include <iomanip>
    #include <stdexcept>
    #include <string>
    
    #include "RootSerializer.h"
    struct InitTaskError : std::runtime_error {
      using std::runtime_error::runtime_error;
    };
    
    using namespace std;
    
    
    CbmDeviceMonitorBmon::CbmDeviceMonitorBmon()
      : fbIgnoreOverlapMs {false}
      , fsChannelNameDataInput {"t0component"}
      , fsChannelNameHistosInput {"histogram-in"}
      , fuHistoryHistoSize {3600}
      , fuMinTotPulser {185}
      , fuMaxTotPulser {195}
      , fuOffSpillCountLimit {25}
      , fuOffSpillCountLimitNonPulser {10}
      , fdSpillCheckInterval {0.0128}
      , fvuChanMap {0, 1, 2, 3, 4, 5, 6, 7}
      , fuPublishFreqTs {100}
      , fdMinPublishTime {0.5}
      , fdMaxPublishTime {5.0}
      , fsAllowedChannels {fsChannelNameDataInput}
      , fParCList {nullptr}
      , fulNumMessages {0}
      , fulTsCounter {0}
      , fLastPublishTime {std::chrono::system_clock::now()}
      , fMonitorAlgo {new CbmMcbm2018MonitorAlgoBmon()}
      , fArrayHisto {}
      , fvpsHistosFolder {}
      , fvpsCanvasConfig {}
    {
    }
    
    void CbmDeviceMonitorBmon::InitTask()
    try {
      /// Read options from executable
      LOG(info) << "Init options for CbmMqStarHistoServer.";
      fbIgnoreOverlapMs             = fConfig->GetValue<bool>("IgnOverMs");
      fuHistoryHistoSize            = fConfig->GetValue<uint32_t>("HistEvoSz");
      fuMinTotPulser                = fConfig->GetValue<uint32_t>("PulsTotMin");
      fuMaxTotPulser                = fConfig->GetValue<uint32_t>("PulsTotMax");
      fuOffSpillCountLimit          = fConfig->GetValue<uint32_t>("SpillThr");
      fuOffSpillCountLimitNonPulser = fConfig->GetValue<uint32_t>("SpillThrNonPuls");
      fdSpillCheckInterval          = fConfig->GetValue<double>("SpillCheckInt");
      std::string sChanMap          = fConfig->GetValue<std::string>("ChanMap");
      fuPublishFreqTs               = fConfig->GetValue<uint32_t>("PubFreqTs");
      fdMinPublishTime              = fConfig->GetValue<double_t>("PubTimeMin");
      fdMaxPublishTime              = fConfig->GetValue<double_t>("PubTimeMax");
      fsChannelNameDataInput        = fConfig->GetValue<std::string>("TsNameIn");
      fsChannelNameHistosInput      = fConfig->GetValue<std::string>("ChNameIn");
      fsAllowedChannels[0]          = fsChannelNameDataInput;
    
      UInt_t uChanIdx   = 0;
      size_t charPosDel = sChanMap.find(',');
      while (uChanIdx < fvuChanMap.size() && std::string::npos != charPosDel) {
        LOG(info) << sChanMap.substr(0, charPosDel);
        fvuChanMap[uChanIdx] = std::stoul(sChanMap.substr(0, charPosDel));
        LOG(info) << fvuChanMap[uChanIdx];
        sChanMap = sChanMap.substr(charPosDel + 1);
        LOG(info) << sChanMap;
        uChanIdx++;
        charPosDel = sChanMap.find(',');
      }  // while( uChanIdx < fvuChanMap.size() && std::string::npos != charPosDel )
      if (uChanIdx < fvuChanMap.size()) {
        LOG(info) << sChanMap;
        fvuChanMap[uChanIdx] = std::stoul(sChanMap);
        LOG(info) << fvuChanMap[uChanIdx];
      }  // if( uChanIdx < fvuChanMap.size() )
    
      LOG(info) << "Histograms publication frequency in TS:    " << fuPublishFreqTs;
      LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
      LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
    
      // Get the information about created channels from the device
      // Check if the defined channels from the topology (by name)
      // are in the list of channels which are possible/allowed
      // for the device
      // The idea is to check at initilization if the devices are
      // properly connected. For the time beeing this is done with a
      // nameing convention. It is not avoided that someone sends other
      // data on this channel.
      //logger::SetLogLevel("INFO");
    
      int noChannel = fChannels.size();
      LOG(info) << "Number of defined channels: " << noChannel;
      for (auto const& entry : fChannels) {
        LOG(info) << "Channel name: " << entry.first;
        if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
          if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
          OnData(entry.first, &CbmDeviceMonitorBmon::HandleData);
        }  // if( entry.first.find( "ts" )
      }    // for( auto const &entry : fChannels )
    }
    catch (InitTaskError& e) {
      LOG(error) << e.what();
      ChangeState(fair::mq::Transition::ErrorFound);
    }
    
    bool CbmDeviceMonitorBmon::IsChannelNameAllowed(std::string channelName)
    {
      for (auto const& entry : fsAllowedChannels) {
        std::size_t pos1 = channelName.find(entry);
        if (pos1 != std::string::npos) {
          const vector<std::string>::const_iterator pos =
            std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry);
          const vector<std::string>::size_type idx = pos - fsAllowedChannels.begin();
          LOG(info) << "Found " << entry << " in " << channelName;
          LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
          return true;
        }  // if (pos1!=std::string::npos)
      }    // for(auto const &entry : fsAllowedChannels)
      LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
      LOG(error) << "Stop device.";
      return false;
    }
    
    bool CbmDeviceMonitorBmon::InitContainers()
    {
      LOG(info) << "Init parameter containers for CbmDeviceMonitorBmon.";
    
      fParCList = fMonitorAlgo->GetParList();
    
      for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
        FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC));
        fParCList->Remove(tempObj);
        std::string paramName {tempObj->GetName()};
        // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
        // Should only be used for small data because of the cost of an additional copy
    
        // Her must come the proper Runid
        std::string message = paramName + ",111";
        LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
    
        FairMQMessagePtr req(NewSimpleMessage(message));
        FairMQMessagePtr rep(NewMessage());
    
        FairParGenericSet* newObj = nullptr;
    
        if (Send(req, "parameters") > 0) {
          if (Receive(rep, "parameters") >= 0) {
            if (rep->GetSize() != 0) {
              CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
              newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass()));
              LOG(info) << "Received unpack parameter from the server:";
              newObj->print();
            }
            else {
              LOG(error) << "Received empty reply. Parameter not available";
            }  // if (rep->GetSize() != 0)
          }    // if (Receive(rep, "parameters") >= 0)
        }      // if (Send(req, "parameters") > 0)
        fParCList->AddAt(newObj, iparC);
        delete tempObj;
      }  // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
    
      /// Need to add accessors for all options
      fMonitorAlgo->SetIgnoreOverlapMs(fbIgnoreOverlapMs);
      fMonitorAlgo->SetMonitorMode(kTRUE);
      fMonitorAlgo->SetHistoryHistoSize(fuHistoryHistoSize);
      fMonitorAlgo->SetPulserTotLimits(fuMinTotPulser, fuMaxTotPulser);
      fMonitorAlgo->SetSpillThreshold(fuOffSpillCountLimit);
      fMonitorAlgo->SetSpillThresholdNonPulser(fuOffSpillCountLimitNonPulser);
      fMonitorAlgo->SetSpillCheckInterval(fdSpillCheckInterval);
      fMonitorAlgo->SetChannelMap(fvuChanMap[0], fvuChanMap[1], fvuChanMap[2], fvuChanMap[3], fvuChanMap[4], fvuChanMap[5],
                                  fvuChanMap[6], fvuChanMap[7]);
    
      //   fMonitorAlgo->AddMsComponentToList(0, 0x90);
    
      Bool_t initOK = fMonitorAlgo->InitContainers();
    
      return initOK;
    }
    
    bool CbmDeviceMonitorBmon::InitHistograms()
    {
      /// Histos creation and obtain pointer on them
      /// Trigger histo creation on all associated algos
      bool initOK = fMonitorAlgo->CreateHistograms();
    
      /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
      std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector();
      /// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder)
      std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector();
    
      /// Add pointers to each histo in the histo array
      /// Create histo config vector
      /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder >
      ///      and send it through a separate channel using the BoostSerializer
      for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
        //         LOG(info) << "Registering  " << vHistos[ uHisto ].first->GetName()
        //                   << " in " << vHistos[ uHisto ].second.data()
        //                   ;
        fArrayHisto.Add(vHistos[uHisto].first);
        std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
        fvpsHistosFolder.push_back(psHistoConfig);
    
        LOG(info) << "Config of hist  " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data();
      }  // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
    
      /// Create canvas config vector
      /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config >
      ///      and send it through a separate channel using the BoostSerializer
      for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
        //         LOG(info) << "Registering  " << vCanvases[ uCanv ].first->GetName()
        //                   << " in " << vCanvases[ uCanv ].second.data();
        std::string sCanvName = (vCanvases[uCanv].first)->GetName();
        std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);
    
        std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
    
        fvpsCanvasConfig.push_back(psCanvConfig);
    
        LOG(info) << "Config string of Canvas  " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data();
      }  //  for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
    
      return initOK;
    }
    
    
    // handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
    bool CbmDeviceMonitorBmon::HandleData(FairMQMessagePtr& msg, int /*index*/)
    {
    
      if (0 == fulNumMessages) {
        try {
          InitContainers();
        }
        catch (InitTaskError& e) {
          LOG(error) << e.what();
          ChangeState(fair::mq::Transition::ErrorFound);
        }
      }  // if( 0 == fulNumMessages)
    
      if (0 == fulNumMessages) InitHistograms();
    
      fulNumMessages++;
      LOG(debug) << "Received message number " << fulNumMessages << " with size " << msg->GetSize();
    
      if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";
    
      std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
      std::istringstream iss(msgStr);
      boost::archive::binary_iarchive inputArchive(iss);
    
      /// Create an empty TS and fill it with the incoming message
      fles::StorableTimeslice component {0};
      inputArchive >> component;
    
      /// Process the Timeslice
      DoUnpack(component, 0);
    
      /// Send histograms each 100 time slices. Should be each ~1s
      /// Use also runtime checker to trigger sending after M s if
      /// processing too slow or delay sending if processing too fast
      std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
      std::chrono::duration<double_t> elapsedSeconds    = currentTime - fLastPublishTime;
      if ((fdMaxPublishTime < elapsedSeconds.count())
          || (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
        if (!fbConfigSent) {
          // Send the configuration only once per run!
          fbConfigSent = SendHistoConfAndData();
        }  // if( !fbConfigSent )
        else
          SendHistograms();
    
        fLastPublishTime = std::chrono::system_clock::now();
      }  // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
    
      return true;
    }
    
    bool CbmDeviceMonitorBmon::SendHistoConfAndData()
    {
      /// Prepare multiparts message and header
      std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size());
      FairMQMessagePtr messageHeader(NewMessage());
      //  Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader);
      BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
    
      FairMQParts partsOut;
      partsOut.AddPart(std::move(messageHeader));
    
      for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) {
        /// Serialize the vector of histo config into a single MQ message
        FairMQMessagePtr messageHist(NewMessage());
        //    Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]);
        BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, fvpsHistosFolder[uHisto]);
    
        partsOut.AddPart(std::move(messageHist));
      }  // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto)
    
      for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
        /// Serialize the vector of canvas config into a single MQ message
        FairMQMessagePtr messageCan(NewMessage());
        //    Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]);
        BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, fvpsCanvasConfig[uCanv]);
    
        partsOut.AddPart(std::move(messageCan));
      }  // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv)
    
      /// Serialize the array of histos into a single MQ message
      FairMQMessagePtr msgHistos(NewMessage());
      //  Serialize<RootSerializer>(*msgHistos, &fArrayHisto);
      RootSerializer().Serialize(*msgHistos, &fArrayHisto);
    
      partsOut.AddPart(std::move(msgHistos));
    
      /// Send the multi-parts message to the common histogram messages queue
      if (Send(partsOut, fsChannelNameHistosInput) < 0) {
        LOG(error) << "CbmDeviceMonitorBmon::SendHistoConfAndData => Problem sending data";
        return false;
      }  // if( Send( partsOut, fsChannelNameHistosInput ) < 0 )
    
      /// Reset the histograms after sending them (but do not reset the time)
      fMonitorAlgo->ResetHistograms(kFALSE);
    
      return true;
    }
    
    bool CbmDeviceMonitorBmon::SendHistograms()
    {
      /// Serialize the array of histos into a single MQ message
      FairMQMessagePtr message(NewMessage());
      //  Serialize<RootSerializer>(*message, &fArrayHisto);
      RootSerializer().Serialize(*message, &fArrayHisto);
    
      /// Send message to the common histogram messages queue
      if (Send(message, fsChannelNameHistosInput) < 0) {
        LOG(error) << "Problem sending data";
        return false;
      }  // if( Send( message, fsChannelNameHistosInput ) < 0 )
    
      /// Reset the histograms after sending them (but do not reset the time)
      fMonitorAlgo->ResetHistograms(kFALSE);
    
      return true;
    }
    
    
    CbmDeviceMonitorBmon::~CbmDeviceMonitorBmon() {}
    
    
    Bool_t CbmDeviceMonitorBmon::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
    {
      fulTsCounter++;
    
      if (kFALSE == fbComponentsAddedToList) {
        for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
          if (kusSysId == ts.descriptor(uCompIdx, 0).sys_id) {
            fMonitorAlgo->AddMsComponentToList(uCompIdx, kusSysId);
          }  // if( kusSysId == ts.descriptor( uCompIdx, 0 ).sys_id )
        }    // for( uint32_t uComp = 0; uComp < ts.num_components(); ++uComp )
        fbComponentsAddedToList = kTRUE;
      }  // if( kFALSE == fbComponentsAddedToList )
    
      if (kFALSE == fMonitorAlgo->ProcessTs(ts)) {
        LOG(error) << "Failed processing TS " << ts.index() << " in unpacker algorithm class";
        return kTRUE;
      }  // if( kFALSE == fMonitorAlgo->ProcessTs( ts ) )
    
      /// Clear the digis vector in case it was filled
      fMonitorAlgo->ClearVector();
    
      if (0 == fulTsCounter % 10000) LOG(info) << "Processed " << fulTsCounter << " time slices";
    
      return kTRUE;
    }
    
    void CbmDeviceMonitorBmon::Finish() {}