Skip to content
Snippets Groups Projects
CbmMqHistoServer.cxx 20.00 KiB
/* Copyright (C) 2019-2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
   SPDX-License-Identifier: GPL-3.0-only
   Authors: Pierre-Alain Loizeau [committer] */

#include "CbmMqHistoServer.h"

#include "CbmFlesCanvasTools.h"

#include "FairMQProgOptions.h"  // device->fConfig
#include <Logger.h>

#include "TCanvas.h"
#include "TEnv.h"
#include "TFile.h"
#include "TH1.h"
#include "TH2.h"
#include "THttpServer.h"
#include "TMessage.h"
#include "TObjArray.h"
#include "TProfile.h"
#include "TRootSniffer.h"
#include "TSystem.h"

#include "BoostSerializer.h"
#include <boost/serialization/utility.hpp>

#include <mutex>

#include "RootSerializer.h"

std::mutex mtx;
/*
Bool_t bMqHistoServerResetHistos = kFALSE;
Bool_t bMqHistoServerSaveHistos  = kFALSE;
*/
CbmMqHistoServer::CbmMqHistoServer()
  : FairMQDevice()
  , fArrayHisto()
{
}

CbmMqHistoServer::~CbmMqHistoServer() {}

void CbmMqHistoServer::InitTask()
{
  /// Read options from executable
  LOG(info) << "Init options for CbmMqHistoServer.";
  fsChannelNameHistosInput  = fConfig->GetValue<std::string>("ChNameIn");
  fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg");
  fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg");
  fsHistoFileName           = fConfig->GetValue<std::string>("HistoFileName");
  fuHttpServerPort          = fConfig->GetValue<uint32_t>("histport");

  /// Link channels to methods in order to process received messages
  OnData(fsChannelNameHistosInput, &CbmMqHistoServer::ReceiveData);
  OnData(fsChannelNameHistosConfig, &CbmMqHistoServer::ReceiveHistoConfig);
  OnData(fsChannelNameCanvasConfig, &CbmMqHistoServer::ReceiveCanvasConfig);
  /// If multi-parts, go to method processing combined Config+Data
  OnData(fsChannelNameHistosInput, &CbmMqHistoServer::ReceiveConfigAndData);

  fServer = new THttpServer(Form("http:%u", fuHttpServerPort));
  /// To avoid the server sucking all Histos from gROOT when no output file is used
  fServer->GetSniffer()->SetScanGlobalDir(kFALSE);
  const char* jsrootsys = gSystem->Getenv("JSROOTSYS");
  if (!jsrootsys) jsrootsys = gEnv->GetValue("HttpServ.JSRootPath", jsrootsys);

  LOG(info) << "JSROOT location: " << jsrootsys;

  //fServer->RegisterCommand("/Reset_Hist", "bMqHistoServerResetHistos=kTRUE");
  //fServer->RegisterCommand("/Save_Hist", "bMqHistoServerSaveHistos=kTRUE");

  //fServer->Restrict("/Reset_Hist", "allow=admin");
  //fServer->Restrict("/Save_Hist", "allow=admin");
}

bool CbmMqHistoServer::ReceiveData(FairMQMessagePtr& msg, int /*index*/)
{
  LOG(debug) << "CbmMqHistoServer::ReceiveData => Processing histograms update";
  TObject* tempObject = nullptr;

  //  Deserialize<RootSerializer>(*msg, tempObject);
  RootSerializer().Deserialize(*msg, tempObject);

  if (TString(tempObject->ClassName()).EqualTo("TObjArray")) {
    std::lock_guard<std::mutex> lk(mtx);
    TObjArray* arrayHisto = static_cast<TObjArray*>(tempObject);
    for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++) {
      TObject* pObj = arrayHisto->At(i);

      if (nullptr != dynamic_cast<TProfile*>(pObj)) {
        if (!ReadHistogram<TProfile>(dynamic_cast<TProfile*>(pObj))) return false;
      }  // if( nullptr != dynamic_cast< TProfile *>( pObj ) )
      else if (nullptr != dynamic_cast<TH2*>(pObj)) {
        if (!ReadHistogram<TH2>(dynamic_cast<TH2*>(pObj))) return false;
      }  // if( nullptr != dynamic_cast< TH2 *>( pObj ) )
      else if (nullptr != dynamic_cast<TH1*>(pObj)) {
        if (!ReadHistogram<TH1>(dynamic_cast<TH1*>(pObj))) return false;
      }  // if( nullptr != dynamic_cast< TH1 *>( pObj ) )
      else
        LOG(warning) << "Unsupported object type for " << pObj->GetName();
    }  // for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++)

    LOG(debug) << "CbmMqHistoServer::ReceiveData => Deleting array";
    /// Need to use Delete instead of Clear to avoid memory leak!!!
    arrayHisto->Delete();

    /// If new histos received, try to prepare as many canvases as possible
    /// Should be expensive on start and cheap afterward
    if (!fbAllCanvasReady) {
      LOG(debug) << "CbmMqHistoServer::ReceiveData => Checking for canvases updates";
      for (uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
        /// Jump canvases already ready
        if (fvbCanvasReady[uCanv]) continue;

        /// Now come the expensive part as we unpack its config and check each histo
        fvbCanvasReady[uCanv] = PrepareCanvas(uCanv);
      }  // for( uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv )
    }    // if( !fbAllCanvasReady )
  }      // if (TString(tempObject->ClassName()).EqualTo("TObjArray"))
  else
    LOG(fatal) << "CbmMqHistoServer::ReceiveData => Wrong object type at input: " << tempObject->ClassName();

  fNMessages += 1;

  if (nullptr != tempObject) delete tempObject;
  /*
  /// TODO: control flags communication with histo server
  /// Idea: 1 req channel (per device or not mixup?), polling every N TS and/or M s
  if (bMqHistoServerResetHistos) {
    std::lock_guard<std::mutex> lk(mtx);
    //      LOG(info) << "Reset Monitor histos ";
    ResetHistograms();
    bMqHistoServerResetHistos = kFALSE;
  }  // if( bMqHistoServerResetHistos )

  if (bMqHistoServerSaveHistos) {
    std::lock_guard<std::mutex> lk(mtx);
    //      LOG(info) << "Save All histos & canvases";
    SaveHistograms();
    bMqHistoServerSaveHistos = kFALSE;
  }  // if( bMqHistoServerSaveHistos )
*/
  LOG(debug) << "CbmMqHistoServer::ReceiveData => Finished processing histograms update";

  return true;
}

bool CbmMqHistoServer::ReceiveHistoConfig(FairMQMessagePtr& msg, int /*index*/)
{
  std::pair<std::string, std::string> tempObject;

  //  Deserialize<BoostSerializer<std::pair<std::string, std::string>>>(*msg, tempObject);
  BoostSerializer<std::pair<std::string, std::string>>().Deserialize(*msg, tempObject);


  LOG(info) << " Received configuration for histo " << tempObject.first << " : " << tempObject.second;

  /// Check if histo name already received in previous messages
  /// Linear search should be ok as config is shared only at startup
  UInt_t uPrevHist = 0;
  for (uPrevHist = 0; uPrevHist < fvpsHistosFolder.size(); ++uPrevHist) {
    if (fvpsHistosFolder[uPrevHist].first == tempObject.first) break;
  }  // for( UInt_t uPrevHist = 0; uPrevHist < fvpsHistosFolder.size(); ++uPrevHist )

  if (uPrevHist < fvpsHistosFolder.size()) {
    LOG(info) << " Ignored new configuration for histo " << tempObject.first
              << " due to previously received one: " << tempObject.second;
    /// Not sure if we should return false here...
  }  // if( uPrevHist < fvpsHistosFolder.size() )
  else {
    fvpsHistosFolder.push_back(tempObject);
    fvHistos.push_back(std::pair<TNamed*, std::string>(nullptr, ""));
    fvbHistoRegistered.push_back(false);
    fbAllHistosRegistered = false;
  }  // else of if( uPrevHist < fvpsHistosFolder.size() )

  return true;
}

bool CbmMqHistoServer::ReceiveCanvasConfig(FairMQMessagePtr& msg, int /*index*/)
{
  std::pair<std::string, std::string> tempObject;

  //  Deserialize<BoostSerializer<std::pair<std::string, std::string>>>(*msg, tempObject);
  BoostSerializer<std::pair<std::string, std::string>>().Deserialize(*msg, tempObject);

  LOG(info) << " Received configuration for canvas " << tempObject.first << " : " << tempObject.second;

  /// Check if canvas name already received in previous messages
  /// Linear search should be ok as config is shared only at startup
  uint32_t uPrevCanv = 0;
  for (uPrevCanv = 0; uPrevCanv < fvpsCanvasConfig.size(); ++uPrevCanv) {
    if (fvpsCanvasConfig[uPrevCanv].first == tempObject.first) break;
  }  // for( UInt_t uPrevCanv = 0; uPrevCanv < fvpsCanvasConfig.size(); ++uPrevCanv )

  if (uPrevCanv < fvpsCanvasConfig.size()) {
    LOG(warning) << " Ignored new configuration for Canvas " << tempObject.first
                 << " due to previously received one: " << tempObject.second;
    /// Not sure if we should return false here...
  }  // if( uPrevCanv < fvpsCanvasConfig.size() )
  else {
    fvpsCanvasConfig.push_back(tempObject);
    fvbCanvasReady.push_back(false);
    fbAllCanvasReady = false;

    fvCanvas.push_back(std::pair<TCanvas*, std::string>(nullptr, ""));
    fvbCanvasRegistered.push_back(false);
    fbAllCanvasRegistered = false;
  }  // else of if( uPrevCanv < fvpsCanvasConfig.size() )
  return true;
}

bool CbmMqHistoServer::ReceiveConfigAndData(FairMQParts& parts, int /*index*/)
{
  /// Reject anything but a at least Header + Histo Config + Canvas Config + Histo Data
  if (parts.Size() < 4) {
    if (1 == parts.Size()) {
      /// PAL, 09/04/2021, Debug message catching missed method overload/polymorphism:
      /// contrary to my expectation, if 2 method bound to same channel, one with FairMQMessagePtr and one with
      /// FairMQParts, all messages go to multipart version and  FairMQMessagePtr is converted to size 1 FairMQParts
      LOG(debug) << "CbmMqHistoServer::ReceiveConfigAndData => only 1 parts found in input, "
                 << "assuming data only message routed to wrong method!";
      return ReceiveData(parts.At(0), 0);
    }  // if( 1 == parts.Size() )
    LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => Wrong number of parts: " << parts.Size()
               << " instead of at least 4 (Header + Histo Config + Canvas config + Data)!";
  }  // if( parts.Size() < 4 )

  LOG(info) << "CbmMqHistoServer::ReceiveConfigAndData => Received composed message with " << parts.Size() << " parts";

  /// Header contains a pair of
  std::pair<uint32_t, uint32_t> pairHeader;
  //  Deserialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*parts.At(0), pairHeader);
  BoostSerializer<std::pair<uint32_t, uint32_t>>().Deserialize(*parts.At(0), pairHeader);

  LOG(info) << "CbmMqHistoServer::ReceiveConfigAndData => Received configuration for " << pairHeader.first
            << " histos and " << pairHeader.second << " canvases";

  uint32_t uOffsetHistoConfig = pairHeader.first;
  if (0 == pairHeader.first) {
    uOffsetHistoConfig = 1;
    if (0 < (parts.At(uOffsetHistoConfig))->GetSize()) {
      LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => No histo config expected but corresponding message is"
                 << " not empty: " << (parts.At(uOffsetHistoConfig))->GetSize();
    }
  }

  uint32_t uOffsetCanvasConfig = pairHeader.second;
  if (0 == pairHeader.second) {
    uOffsetCanvasConfig = 1;
    if (0 < (parts.At(uOffsetHistoConfig + uOffsetCanvasConfig))->GetSize()) {
      LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => No Canvas config expected but corresponding message is"
                 << " not empty: " << (parts.At(uOffsetHistoConfig + uOffsetCanvasConfig))->GetSize();
    }
  }

  if (static_cast<size_t>(parts.Size()) != 1 + uOffsetHistoConfig + uOffsetCanvasConfig + 1) {
    LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => Number of parts not matching header: " << parts.Size()
               << " instead of " << 1 + uOffsetHistoConfig + uOffsetCanvasConfig + 1;
  }  // if( parts.Size() != 1 + pairHeader.first + pairHeader.second )

  /// Decode parts for histograms configuration
  for (uint32_t uHisto = 0; uHisto < pairHeader.first; ++uHisto) {
    ReceiveHistoConfig(parts.At(1 + uHisto), 0);
  }  // for (UInt_t uHisto = 0; uHisto < pairHeader.first; ++uHisto)

  /// Decode parts for histograms configuration
  for (uint32_t uCanv = 0; uCanv < pairHeader.second; ++uCanv) {
    ReceiveCanvasConfig(parts.At(1 + uOffsetHistoConfig + uCanv), 0);
  }  // for (UInt_t uCanv = 0; uCanv < pairHeader.second; ++uCanv)

  /// Decode the histograms data now that the configuration is loaded
  ReceiveData(parts.At(1 + uOffsetHistoConfig + uOffsetCanvasConfig), 0);

  LOG(info) << "CbmMqHistoServer::ReceiveConfigAndData => Finished processing  composed message with " << parts.Size()
            << " parts";

  return true;
}
void CbmMqHistoServer::PreRun()
{
  fStopThread = false;
  fThread     = std::thread(&CbmMqHistoServer::UpdateHttpServer, this);
}

void CbmMqHistoServer::UpdateHttpServer()
{
  while (!fStopThread) {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    std::lock_guard<std::mutex> lk(mtx);

    fServer->ProcessRequests();
  }
}

void CbmMqHistoServer::PostRun()
{
  SaveHistograms();
  fStopThread = true;
  fThread.join();
  SaveHistograms();
}

template<class HistoT>
bool CbmMqHistoServer::ReadHistogram(HistoT* pHist)
{
  int index1 = FindHistogram(pHist->GetName());
  if (-1 == index1) {
    HistoT* histogram_new = static_cast<HistoT*>(pHist->Clone());
    fArrayHisto.Add(histogram_new);

    LOG(info) << "Received new histo " << pHist->GetName();

    /// If new histo received, try to register it if configuration available
    if (!fbAllHistosRegistered) {
      for (uint32_t uHist = 0; uHist < fvpsHistosFolder.size(); ++uHist) {
        /// Jump histos already ready
        if (fvbHistoRegistered[uHist]) continue;

        /// Check if name matches one in config for others
        if (fvpsHistosFolder[uHist].first == histogram_new->GetName()) {
          fvHistos[uHist] = std::pair<TNamed*, std::string>(histogram_new, fvpsHistosFolder[uHist].second);
          fServer->Register(Form("/%s", fvHistos[uHist].second.data()), fvHistos[uHist].first);
          fvbHistoRegistered[uHist] = true;

          LOG(info) << "registered histo " << fvHistos[uHist].first->GetName() << " in folder "
                    << fvHistos[uHist].second;


          /// Update flag telling whether all known histos are registered
          fbAllHistosRegistered = true;
          for (uint32_t uIdx = 0; uIdx < fvbHistoRegistered.size(); ++uIdx) {
            if (!fvbHistoRegistered[uIdx]) {
              fbAllHistosRegistered = false;
              break;
            }  // if( !fvbHistoRegistered[ uIdx ] )
          }    // for( uint32_t uIdx = 0; uIdx < fvbHistoRegistered.size(); ++uIdx )

          break;
        }  // if( fvpsHistosFolder[ uHist ].first == histogram_new->GetName() )
      }    // for( uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv )
    }      // if( !fbAllCanvasReady )
  }        // if (-1 == index1)
  else {
    HistoT* histogram_existing = dynamic_cast<HistoT*>(fArrayHisto.At(index1));
    if (nullptr == histogram_existing) {
      LOG(error) << "CbmMqHistoServer::ReadHistogram => "
                 << "Incompatible type found during update for histo " << pHist->GetName();
      return false;
    }  // if( nullptr == histogram_existing )

    histogram_existing->Add(pHist);
  }  // else of if (-1 == index1)
  return true;
}

int CbmMqHistoServer::FindHistogram(const std::string& name)
{
  for (int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist) {
    TObject* obj = fArrayHisto.At(iHist);
    if (TString(obj->GetName()).EqualTo(name)) { return iHist; }  // if( TString( obj->GetName() ).EqualTo( name ) )
  }  // for( int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist )
  return -1;
}

bool CbmMqHistoServer::ResetHistograms()
{
  for (int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist) {
    dynamic_cast<TH1*>(fArrayHisto.At(iHist))->Reset();
  }  // for( int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist )
  return true;
}
bool CbmMqHistoServer::PrepareCanvas(uint32_t uCanvIdx)
{
  LOG(debug) << " Extracting configuration for canvas index " << uCanvIdx;
  CanvasConfig conf(ExtractCanvasConfigFromString(fvpsCanvasConfig[uCanvIdx].second));

  /// First check if all objects to be drawn are present
  uint32_t uNbPads(conf.GetNbPads());
  for (uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx) {
    uint32_t uNbObj(conf.GetNbObjsInPad(uPadIdx));
    for (uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx) {
      std::string sName(conf.GetObjName(uPadIdx, uObjIdx));
      /// Check for empty pads!
      if ("nullptr" != sName) {
        if (FindHistogram(sName) < 0) {
          return false;
        }  // if( FindHistogram( conf.GetObjName( uPadIdx, uObjIdx ) ) < 0 )
      }    // if( "nullptr" != sName )
    }      // for( uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx )
  }        // for( uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx )

  LOG(info) << " All histos found for canvas " << conf.GetName().data() << ", now preparing it";

  /// Create new canvas and pads
  TCanvas* pNewCanv = new TCanvas(conf.GetName().data(), conf.GetTitle().data());
  pNewCanv->Divide(conf.GetNbPadsX(), conf.GetNbPadsY());

  /// Loop on pads
  for (uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx) {
    pNewCanv->cd(1 + uPadIdx);

    /// Pad settings
    gPad->SetGrid(conf.GetGridx(uPadIdx), conf.GetGridy(uPadIdx));
    gPad->SetLogx(conf.GetLogx(uPadIdx));
    gPad->SetLogy(conf.GetLogy(uPadIdx));
    gPad->SetLogz(conf.GetLogz(uPadIdx));

    /// Add objects (we know they are there
    uint32_t uNbObj(conf.GetNbObjsInPad(uPadIdx));
    for (uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx) {
      std::string sName(conf.GetObjName(uPadIdx, uObjIdx));
      if ("nullptr" != sName) {
        TObject* pObj = fArrayHisto[FindHistogram(sName)];

        if (nullptr != dynamic_cast<TProfile*>(pObj)) {
          dynamic_cast<TProfile*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data());
        }  // if( nullptr != dynamic_cast< TProfile *>( pObj ) )
        else if (nullptr != dynamic_cast<TH2*>(pObj)) {
          dynamic_cast<TH2*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data());
        }  // if( nullptr != dynamic_cast< TH2 *>( pObj ) )
        else if (nullptr != dynamic_cast<TH1*>(pObj)) {
          dynamic_cast<TH1*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data());
        }  // if( nullptr != dynamic_cast< TH1 *>( pObj ) )
        else
          LOG(warning) << "  Unsupported object type for " << sName << " when preparing canvas " << conf.GetName();

        LOG(info) << "  Configured histo " << sName << " on pad " << (1 + uPadIdx) << " for canvas "
                  << conf.GetName().data();
      }  // if( "nullptr" != sName )
    }    // for( uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx )
  }      // for( uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx )

  fvCanvas[uCanvIdx] = std::pair<TCanvas*, std::string>(pNewCanv, "canvases");
  fServer->Register(Form("/%s", fvCanvas[uCanvIdx].second.data()), fvCanvas[uCanvIdx].first);
  fvbCanvasRegistered[uCanvIdx] = true;

  LOG(info) << " Registered canvas " << fvCanvas[uCanvIdx].first->GetName() << " in folder "
            << fvCanvas[uCanvIdx].second;

  /// Update flag telling whether all known canvases are registered
  fbAllCanvasRegistered = true;
  for (uint32_t uIdx = 0; uIdx < fvbCanvasRegistered.size(); ++uIdx) {
    if (!fvbCanvasRegistered[uIdx]) {
      fbAllCanvasRegistered = false;
      break;
    }  // if( !fvbCanvasRegistered[ uIdx ] )
  }    // for( uint32_t uIdx = 0; uIdx < fvbCanvasRegistered.size(); ++uIdx )

  return true;
}

bool CbmMqHistoServer::SaveHistograms()
{
  /// Save old global file and folder pointer to avoid messing with FairRoot
  TFile* oldFile     = gFile;
  TDirectory* oldDir = gDirectory;

  /// (Re-)Create ROOT file to store the histos
  TFile* histoFile = nullptr;

  // open separate histo file in recreate mode
  histoFile = new TFile(fsHistoFileName.data(), "RECREATE");

  LOG(info) << "Save Histos in file " << fsHistoFileName.data();

  if (nullptr == histoFile) return false;

  /// Register the histos in the HTTP server
  for (UInt_t uHisto = 0; uHisto < fvHistos.size(); ++uHisto) {
    /// Make sure we end up in chosen folder
    TString sFolder = fvHistos[uHisto].second.data();
    if (nullptr == gDirectory->Get(sFolder)) gDirectory->mkdir(sFolder);
    gDirectory->cd(sFolder);

    /// Write plot
    fvHistos[uHisto].first->Write();

    histoFile->cd();
  }  // for( UInt_t uHisto = 0; uHisto < fvHistos.size(); ++uHisto )

  for (UInt_t uCanvas = 0; uCanvas < fvCanvas.size(); ++uCanvas) {
    /// Make sure we end up in chosen folder
    TString sFolder = fvCanvas[uCanvas].second.data();
    if (nullptr == gDirectory->Get(sFolder)) gDirectory->mkdir(sFolder);
    gDirectory->cd(sFolder);

    /// Write plot
    fvCanvas[uCanvas].first->Write();

    histoFile->cd();
  }  // for( UInt_t uHisto = 0; uHisto < fvCanvas.size(); ++uHisto )

  /// Restore old global file and folder pointer to avoid messing with FairRoot
  gFile      = oldFile;
  gDirectory = oldDir;

  histoFile->Close();

  return true;
}