-
The templated functions Deserialize and Serialize of the FairMQ Device are marked deprecated. These templates actually only forward the call to the BoostSerializer and RootSerializer classes, which are part of FairRoot. Call these classes directly to remove the deprecation warnings.
The templated functions Deserialize and Serialize of the FairMQ Device are marked deprecated. These templates actually only forward the call to the BoostSerializer and RootSerializer classes, which are part of FairRoot. Call these classes directly to remove the deprecation warnings.
CbmMqHistoServer.cxx 18.67 KiB
/* Copyright (C) 2019-2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
#include "CbmMqHistoServer.h"
#include "CbmFlesCanvasTools.h"
#include "FairMQProgOptions.h" // device->fConfig
#include <Logger.h>
#include "TCanvas.h"
#include "TEnv.h"
#include "TFile.h"
#include "TH1.h"
#include "TH2.h"
#include "THttpServer.h"
#include "TMessage.h"
#include "TObjArray.h"
#include "TProfile.h"
#include "TRootSniffer.h"
#include "TSystem.h"
#include "BoostSerializer.h"
#include <boost/serialization/utility.hpp>
#include <mutex>
#include "RootSerializer.h"
/// File-scope mutex: locked in ReceiveData while the received histograms are merged and
/// the canvases prepared, and in UpdateHttpServer around each THttpServer::ProcessRequests
/// call, so that these two code paths never run at the same time.
std::mutex mtx;
// Disabled global control flags; their handling in ReceiveData is commented out as well.
/*
Bool_t bMqHistoServerResetHistos = kFALSE;
Bool_t bMqHistoServerSaveHistos = kFALSE;
*/
CbmMqHistoServer::CbmMqHistoServer()
: FairMQDevice()
, fsChannelNameHistosInput("histogram-in")
, fsChannelNameHistosConfig("histo-conf")
, fsChannelNameCanvasConfig("canvas-conf")
, fsHistoFileName("HistosMonitorPulser.root")
, fuHttpServerPort(8098)
, fArrayHisto()
, fvpsHistosFolder()
, fvpsCanvasConfig()
, fvHistos()
, fvCanvas()
, fNMessages(0)
, fServer(nullptr)
, fStopThread(false)
{
}
/// Destructor: performs no explicit cleanup.
/// NOTE(review): the THttpServer allocated in InitTask is not deleted anywhere in this
/// file - confirm whether this is intentional.
CbmMqHistoServer::~CbmMqHistoServer() = default;
/// Device initialisation hook.
/// - Reads the channel names, the histogram file name and the HTTP port from the
///   device configuration (keys "ChNameIn", "ChNameHistCfg", "ChNameCanvCfg",
///   "HistoFileName" and "histport").
/// - Binds the message handlers to the three input channels.
/// - Creates the THttpServer on the configured port.
void CbmMqHistoServer::InitTask()
{
  /// Read options from executable
  LOG(info) << "Init options for CbmMqHistoServer.";
  fsChannelNameHistosInput  = fConfig->GetValue<std::string>("ChNameIn");
  fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg");
  fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg");
  fsHistoFileName           = fConfig->GetValue<std::string>("HistoFileName");
  fuHttpServerPort          = fConfig->GetValue<uint32_t>("histport");

  /// Link channels to methods in order to process received messages
  OnData(fsChannelNameHistosInput, &CbmMqHistoServer::ReceiveData);
  OnData(fsChannelNameHistosConfig, &CbmMqHistoServer::ReceiveHistoConfig);
  OnData(fsChannelNameCanvasConfig, &CbmMqHistoServer::ReceiveCanvasConfig);

  /// If multi-parts, go to method processing combined Config+Data
  /// NOTE: this binds a second handler to the histogram input channel. According to the
  /// debug note in ReceiveConfigAndData, single-part messages on this channel are then
  /// also delivered to the multi-part handler (as a FairMQParts of size 1).
  OnData(fsChannelNameHistosInput, &CbmMqHistoServer::ReceiveConfigAndData);

  fServer = new THttpServer(Form("http:%u", fuHttpServerPort));
  /// To avoid the server sucking all Histos from gROOT when no output file is used
  fServer->GetSniffer()->SetScanGlobalDir(kFALSE);

  /// Look up the JSROOT location: environment variable first, then the ROOT resource file
  const char* jsrootsys = gSystem->Getenv("JSROOTSYS");
  if (!jsrootsys) jsrootsys = gEnv->GetValue("HttpServ.JSRootPath", jsrootsys);
  /// Both lookups may fail and leave a null pointer: never stream a null C-string
  LOG(info) << "JSROOT location: " << (jsrootsys ? jsrootsys : "<not set>");

  //fServer->RegisterCommand("/Reset_Hist", "bMqHistoServerResetHistos=kTRUE");
  //fServer->RegisterCommand("/Save_Hist", "bMqHistoServerSaveHistos=kTRUE");
  //fServer->Restrict("/Reset_Hist", "allow=admin");
  //fServer->Restrict("/Save_Hist", "allow=admin");
}
/// Handler for histogram data messages.
/// Deserializes a ROOT object from the message; if it is a TObjArray, each contained
/// TProfile/TH2/TH1 is handed to ReadHistogram (new histograms are cloned, known ones
/// are accumulated), then the canvases that are not yet ready are prepared.
///
/// @param msg  message holding the ROOT-serialized object
/// @return false if the message could not be deserialized or if ReadHistogram rejected
///         one of the histograms, true otherwise
bool CbmMqHistoServer::ReceiveData(FairMQMessagePtr& msg, int /*index*/)
{
  TObject* tempObject = nullptr;

  // Deserialize<RootSerializer>(*msg, tempObject);
  RootSerializer().Deserialize(*msg, tempObject);

  /// Check the pointer before the first dereference
  if (nullptr == tempObject) {
    LOG(error) << "CbmMqHistoServer::ReceiveData => Deserialization of the input message gave no object";
    return false;
  }  // if( nullptr == tempObject )

  /// Set to false as soon as one histogram is rejected; cleanup below is done in any case
  bool bAllHistosOk = true;

  if (TString(tempObject->ClassName()).EqualTo("TObjArray")) {
    /// Protect histograms and canvases against concurrent access by the HTTP thread
    std::lock_guard<std::mutex> lk(mtx);
    TObjArray* arrayHisto = static_cast<TObjArray*>(tempObject);
    for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++) {
      TObject* pObj = arrayHisto->At(i);
      /// Skip empty slots of the array
      if (nullptr == pObj) continue;

      /// Order matters: most derived classes first as all of them are also TH1
      bool bHistoOk = true;
      if (TProfile* pProfile = dynamic_cast<TProfile*>(pObj)) {
        bHistoOk = ReadHistogram<TProfile>(pProfile);
      }  // if( nullptr != dynamic_cast< TProfile *>( pObj ) )
      else if (TH2* pHist2D = dynamic_cast<TH2*>(pObj)) {
        bHistoOk = ReadHistogram<TH2>(pHist2D);
      }  // else if( nullptr != dynamic_cast< TH2 *>( pObj ) )
      else if (TH1* pHist1D = dynamic_cast<TH1*>(pObj)) {
        bHistoOk = ReadHistogram<TH1>(pHist1D);
      }  // else if( nullptr != dynamic_cast< TH1 *>( pObj ) )
      else {
        LOG(warning) << "Unsupported object type for " << pObj->GetName();
      }

      if (!bHistoOk) {
        bAllHistosOk = false;
        break;
      }  // if( !bHistoOk )
    }    // for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++)

    /// Need to use Delete instead of Clear to avoid memory leak!!!
    /// Done also when a histogram was rejected, otherwise the array content is leaked
    arrayHisto->Delete();

    /// If new histos received, try to prepare as many canvases as possible
    /// Should be expensive on start and cheap afterward
    if (bAllHistosOk && !fbAllCanvasReady) {
      bool bAllReady = true;
      for (uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
        /// Only canvases not yet ready need the expensive part, where we unpack the config
        /// and check each histo
        if (!fvbCanvasReady[uCanv]) fvbCanvasReady[uCanv] = PrepareCanvas(uCanv);

        if (!fvbCanvasReady[uCanv]) bAllReady = false;
      }  // for( uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv )

      /// Remember when nothing is left to do; ReceiveCanvasConfig clears the flag again
      /// when a new canvas configuration is added
      fbAllCanvasReady = bAllReady;
    }  // if( bAllHistosOk && !fbAllCanvasReady )
  }    // if (TString(tempObject->ClassName()).EqualTo("TObjArray"))
  else
    LOG(fatal) << "CbmMqHistoServer::ReceiveData => Wrong object type at input: " << tempObject->ClassName();

  /// Release the deserialized object on every path
  delete tempObject;

  if (!bAllHistosOk) return false;

  fNMessages += 1;

  /*
  /// TODO: control flags communication with histo server
  /// Idea: 1 req channel (per device or not mixup?), polling every N TS and/or M s
  if (bMqHistoServerResetHistos) {
    std::lock_guard<std::mutex> lk(mtx);
    // LOG(info) << "Reset Monitor histos ";
    ResetHistograms();
    bMqHistoServerResetHistos = kFALSE;
  } // if( bMqHistoServerResetHistos )
  if (bMqHistoServerSaveHistos) {
    std::lock_guard<std::mutex> lk(mtx);
    // LOG(info) << "Save All histos & canvases";
    SaveHistograms();
    bMqHistoServerSaveHistos = kFALSE;
  } // if( bMqHistoServerSaveHistos )
  */

  return true;
}
/// Handler for histogram configuration messages.
/// The message carries a boost-serialized pair of strings: its first element is compared
/// to histogram names in ReadHistogram, its second is used there as HTTP server folder.
/// A configuration whose name is already known is ignored; a new one gets an empty slot
/// in fvHistos and is flagged as not yet registered.
///
/// @param msg  message holding the serialized pair
/// @return always true
bool CbmMqHistoServer::ReceiveHistoConfig(FairMQMessagePtr& msg, int /*index*/)
{
  std::pair<std::string, std::string> histoConf;
  /// Direct serializer call instead of the deprecated device-level Deserialize wrapper
  BoostSerializer<std::pair<std::string, std::string>>().Deserialize(*msg, histoConf);

  LOG(info) << " Received configuration for histo " << histoConf.first << " : " << histoConf.second;

  /// Look for the same name in the previously received configurations
  /// Linear search should be ok as config is shared only at startup
  bool bAlreadyKnown = false;
  for (const auto& knownConf : fvpsHistosFolder) {
    if (knownConf.first == histoConf.first) {
      bAlreadyKnown = true;
      break;
    }  // if( knownConf.first == histoConf.first )
  }    // for( const auto& knownConf : fvpsHistosFolder )

  if (bAlreadyKnown) {
    LOG(info) << " Ignored new configuration for histo " << histoConf.first
              << " due to previously received one: " << histoConf.second;
    /// Not sure if we should return false here...
    return true;
  }  // if( bAlreadyKnown )

  /// New histogram: store its config and reserve the matching slots
  fvpsHistosFolder.push_back(histoConf);
  fvHistos.push_back(std::pair<TNamed*, std::string>(nullptr, ""));
  fvbHistoRegistered.push_back(false);
  fbAllHistosRegistered = false;

  return true;
}
/// Handler for canvas configuration messages.
/// The message carries a boost-serialized pair of strings: the first is the canvas name,
/// the second the configuration string later decoded in PrepareCanvas.
/// A configuration whose name is already known is ignored; a new one gets an empty slot
/// in fvCanvas and is flagged as neither ready nor registered.
///
/// @param msg  message holding the serialized pair
/// @return always true
bool CbmMqHistoServer::ReceiveCanvasConfig(FairMQMessagePtr& msg, int /*index*/)
{
  std::pair<std::string, std::string> tempObject;

  // Deserialize<BoostSerializer<std::pair<std::string, std::string>>>(*msg, tempObject);
  BoostSerializer<std::pair<std::string, std::string>>().Deserialize(*msg, tempObject);

  LOG(info) << " Received configuration for canvas " << tempObject.first << " : " << tempObject.second;

  /// Check if canvas name already received in previous messages
  /// Linear search should be ok as config is shared only at startup
  uint32_t uPrevCanv = 0;
  for (uPrevCanv = 0; uPrevCanv < fvpsCanvasConfig.size(); ++uPrevCanv) {
    if (fvpsCanvasConfig[uPrevCanv].first == tempObject.first) break;
  }  // for( uPrevCanv = 0; uPrevCanv < fvpsCanvasConfig.size(); ++uPrevCanv )

  if (uPrevCanv < fvpsCanvasConfig.size()) {
    /// Message fixed: this is about a canvas, not a histo
    LOG(warning) << " Ignored new configuration for canvas " << tempObject.first
                 << " due to previously received one: " << tempObject.second;
    /// Not sure if we should return false here...
  }  // if( uPrevCanv < fvpsCanvasConfig.size() )
  else {
    fvpsCanvasConfig.push_back(tempObject);
    fvbCanvasReady.push_back(false);
    fbAllCanvasReady = false;

    fvCanvas.push_back(std::pair<TCanvas*, std::string>(nullptr, ""));
    fvbCanvasRegistered.push_back(false);
    fbAllCanvasRegistered = false;
  }  // else of if( uPrevCanv < fvpsCanvasConfig.size() )

  return true;
}
/// Handler for multi-part messages combining configuration and data.
/// Expected layout: Header + N histogram configs + M canvas configs + histogram data,
/// where the header is a boost-serialized pair (N, M).
/// A message with a single part is treated as a data-only message and forwarded to
/// ReceiveData.
///
/// @param parts  the received message parts
/// @return the result of the sub-handlers: false if the layout is invalid or if one of
///         the decoding steps failed, true otherwise
bool CbmMqHistoServer::ReceiveConfigAndData(FairMQParts& parts, int /*index*/)
{
  /// Reject anything but at least Header + Histo Config + Canvas Config + Histo Data
  if (parts.Size() < 4) {
    if (1 == parts.Size()) {
      /// PAL, 09/04/2021, Debug message catching missed method overload/polymorphism:
      /// contrary to my expectation, if 2 methods are bound to the same channel, one with FairMQMessagePtr and one
      /// with FairMQParts, all messages go to multipart version and FairMQMessagePtr is converted to size 1 FairMQParts
      LOG(debug) << "CbmMqHistoServer::ReceiveConfigAndData => only 1 parts found in input, "
                 << "assuming data only message routed to wrong method!";
      return ReceiveData(parts.At(0), 0);
    }  // if( 1 == parts.Size() )
    LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => Wrong number of parts: " << parts.Size()
               << " instead of at least 4 (Header + Histo Config + Canvas config + Data)!";
    /// Never go on with an invalid layout, the parts accessed below may not exist
    return false;
  }  // if( parts.Size() < 4 )

  LOG(info) << "CbmMqHistoServer::ReceiveConfigAndData => Received composed message with " << parts.Size() << " parts";

  /// Header contains a pair of counters: (number of histo configs, number of canvas configs)
  std::pair<uint32_t, uint32_t> pairHeader;
  // Deserialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*parts.At(0), pairHeader);
  BoostSerializer<std::pair<uint32_t, uint32_t>>().Deserialize(*parts.At(0), pairHeader);

  LOG(info) << "CbmMqHistoServer::ReceiveConfigAndData => Received configuration for " << pairHeader.first
            << " histos and " << pairHeader.second << " canvases";

  /// Header + histo configs + canvas configs + data, computed in size_t so that large
  /// header values cannot wrap around in 32 bits arithmetic
  const size_t uNbPartsExpected = static_cast<size_t>(pairHeader.first) + static_cast<size_t>(pairHeader.second) + 2;
  if (static_cast<size_t>(parts.Size()) != uNbPartsExpected) {
    LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => Number of parts not matching header: " << parts.Size()
               << " instead of " << uNbPartsExpected;
    /// The indices computed from the header below would not match the existing parts
    return false;
  }  // if( parts.Size() != uNbPartsExpected )

  /// Decode parts for histograms configuration
  for (uint32_t uHisto = 0; uHisto < pairHeader.first; ++uHisto) {
    if (!ReceiveHistoConfig(parts.At(1 + uHisto), 0)) return false;
  }  // for (uint32_t uHisto = 0; uHisto < pairHeader.first; ++uHisto)

  /// Decode parts for canvases configuration
  for (uint32_t uCanv = 0; uCanv < pairHeader.second; ++uCanv) {
    if (!ReceiveCanvasConfig(parts.At(1 + pairHeader.first + uCanv), 0)) return false;
  }  // for (uint32_t uCanv = 0; uCanv < pairHeader.second; ++uCanv)

  /// Decode the histograms data now that the configuration is loaded
  /// Result is propagated as done for the single part case above
  return ReceiveData(parts.At(1 + pairHeader.first + pairHeader.second), 0);
}
void CbmMqHistoServer::PreRun()
{
fStopThread = false;
fThread = std::thread(&CbmMqHistoServer::UpdateHttpServer, this);
}
/// Body of the thread started in PreRun.
/// Until the stop flag is raised: wait 10 ms, then process the pending HTTP requests
/// while holding the file-scope mutex (also taken by ReceiveData).
void CbmMqHistoServer::UpdateHttpServer()
{
  const std::chrono::milliseconds pollPeriod(10);

  for (;;) {
    if (fStopThread) break;

    std::this_thread::sleep_for(pollPeriod);

    /// Lock is released at the end of each iteration, so not held while sleeping
    std::lock_guard<std::mutex> lk(mtx);
    fServer->ProcessRequests();
  }  // for (;;)
}
/// Hook called after the run loop: raises the stop flag and waits for the thread started
/// in PreRun to finish.
void CbmMqHistoServer::PostRun()
{
  fStopThread = true;
  /// join() on a thread which was never started (or already joined) throws
  /// std::system_error, so only join when there is something to wait for
  if (fThread.joinable()) fThread.join();
}
/// Merge one received histogram into the local collection.
/// - Name already known: the received content is added to the stored histogram.
/// - Name unknown: the histogram is cloned and stored; if a not yet used configuration
///   with the same name exists, the clone is also registered in the HTTP server under
///   the configured folder.
///
/// @tparam HistoT  histogram class used for the clone and for the type check
/// @param  pHist   received histogram (dereferenced without check, must not be null)
/// @return false only if the stored histogram of the same name is not a HistoT
template<class HistoT>
bool CbmMqHistoServer::ReadHistogram(HistoT* pHist)
{
  const int iKnownIdx = FindHistogram(pHist->GetName());

  /// Known histogram: accumulate and leave
  if (-1 != iKnownIdx) {
    HistoT* pStored = dynamic_cast<HistoT*>(fArrayHisto.At(iKnownIdx));
    if (nullptr == pStored) {
      LOG(error) << "CbmMqHistoServer::ReadHistogram => "
                 << "Incompatible type found during update for histo " << pHist->GetName();
      return false;
    }  // if( nullptr == pStored )

    pStored->Add(pHist);
    return true;
  }  // if( -1 != iKnownIdx )

  /// New histogram: keep our own copy as the received one is deleted by the caller
  HistoT* pCopy = static_cast<HistoT*>(pHist->Clone());
  fArrayHisto.Add(pCopy);

  LOG(info) << "Received new histo " << pHist->GetName();

  /// Nothing more to do if all configured histos are already registered
  if (fbAllHistosRegistered) return true;

  /// Otherwise try to register the new histo if a configuration is available
  for (uint32_t uHist = 0; uHist < fvpsHistosFolder.size(); ++uHist) {
    /// Jump configs already used and configs for other names
    if (fvbHistoRegistered[uHist]) continue;
    if (fvpsHistosFolder[uHist].first != pCopy->GetName()) continue;

    fvHistos[uHist] = std::pair<TNamed*, std::string>(pCopy, fvpsHistosFolder[uHist].second);
    fServer->Register(Form("/%s", fvHistos[uHist].second.data()), fvHistos[uHist].first);
    fvbHistoRegistered[uHist] = true;

    LOG(info) << "registered histo " << fvHistos[uHist].first->GetName() << " in folder "
              << fvHistos[uHist].second;

    /// Update flag telling whether all known histos are registered
    bool bNoneMissing = true;
    for (const bool bRegistered : fvbHistoRegistered) {
      if (!bRegistered) {
        bNoneMissing = false;
        break;
      }  // if( !bRegistered )
    }    // for( const bool bRegistered : fvbHistoRegistered )
    fbAllHistosRegistered = bNoneMissing;

    /// At most one configuration is used per histogram
    break;
  }  // for( uint32_t uHist = 0; uHist < fvpsHistosFolder.size(); ++uHist )

  return true;
}
int CbmMqHistoServer::FindHistogram(const std::string& name)
{
for (int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist) {
TObject* obj = fArrayHisto.At(iHist);
if (TString(obj->GetName()).EqualTo(name)) { return iHist; } // if( TString( obj->GetName() ).EqualTo( name ) )
} // for( int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist )
return -1;
}
bool CbmMqHistoServer::ResetHistograms()
{
for (int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist) {
dynamic_cast<TH1*>(fArrayHisto.At(iHist))->Reset();
} // for( int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist )
return true;
}
bool CbmMqHistoServer::PrepareCanvas(uint32_t uCanvIdx)
{
CanvasConfig conf(ExtractCanvasConfigFromString(fvpsCanvasConfig[uCanvIdx].second));
/// First check if all objects to be drawn are present
uint32_t uNbPads(conf.GetNbPads());
for (uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx) {
uint32_t uNbObj(conf.GetNbObjsInPad(uPadIdx));
for (uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx) {
std::string sName(conf.GetObjName(uPadIdx, uObjIdx));
/// Check for empty pads!
if ("nullptr" != sName) {
if (FindHistogram(sName) < 0) {
return false;
} // if( FindHistogram( conf.GetObjName( uPadIdx, uObjIdx ) ) < 0 )
} // if( "nullptr" != sName )
} // for( uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx )
} // for( uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx )
/// Create new canvas and pads
TCanvas* pNewCanv = new TCanvas(conf.GetName().data(), conf.GetTitle().data());
pNewCanv->Divide(conf.GetNbPadsX(), conf.GetNbPadsY());
/// Loop on pads
for (uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx) {
pNewCanv->cd(1 + uPadIdx);
/// Pad settings
gPad->SetGrid(conf.GetGridx(uPadIdx), conf.GetGridy(uPadIdx));
gPad->SetLogx(conf.GetLogx(uPadIdx));
gPad->SetLogy(conf.GetLogy(uPadIdx));
gPad->SetLogz(conf.GetLogz(uPadIdx));
/// Add objects (we know they are there
uint32_t uNbObj(conf.GetNbObjsInPad(uPadIdx));
for (uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx) {
std::string sName(conf.GetObjName(uPadIdx, uObjIdx));
if ("nullptr" != sName) {
TObject* pObj = fArrayHisto[FindHistogram(sName)];
if (nullptr != dynamic_cast<TProfile*>(pObj)) {
dynamic_cast<TProfile*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data());
} // if( nullptr != dynamic_cast< TProfile *>( pObj ) )
else if (nullptr != dynamic_cast<TH2*>(pObj)) {
dynamic_cast<TH2*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data());
} // if( nullptr != dynamic_cast< TH2 *>( pObj ) )
else if (nullptr != dynamic_cast<TH1*>(pObj)) {
dynamic_cast<TH1*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data());
} // if( nullptr != dynamic_cast< TH1 *>( pObj ) )
else
LOG(warning) << "Unsupported object type for " << sName << " when preparing canvas " << conf.GetName();
} // if( "nullptr" != sName )
} // for( uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx )
} // for( uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx )
fvCanvas[uCanvIdx] = std::pair<TCanvas*, std::string>(pNewCanv, "canvases");
fServer->Register(Form("/%s", fvCanvas[uCanvIdx].second.data()), fvCanvas[uCanvIdx].first);
fvbCanvasRegistered[uCanvIdx] = true;
LOG(info) << "registered canvas " << fvCanvas[uCanvIdx].first->GetName() << " in folder "
<< fvCanvas[uCanvIdx].second;
/// Update flag telling whether all known canvases are registered
fbAllCanvasRegistered = true;
for (uint32_t uIdx = 0; uIdx < fvbCanvasRegistered.size(); ++uIdx) {
if (!fvbCanvasRegistered[uIdx]) {
fbAllCanvasRegistered = false;
break;
} // if( !fvbCanvasRegistered[ uIdx ] )
} // for( uint32_t uIdx = 0; uIdx < fvbCanvasRegistered.size(); ++uIdx )
return true;
}
bool CbmMqHistoServer::SaveHistograms()
{
/// Save old global file and folder pointer to avoid messing with FairRoot
TFile* oldFile = gFile;
TDirectory* oldDir = gDirectory;
/// (Re-)Create ROOT file to store the histos
TFile* histoFile = nullptr;
// open separate histo file in recreate mode
histoFile = new TFile(fsHistoFileName.data(), "RECREATE");
if (nullptr == histoFile) return false;
/// Register the histos in the HTTP server
for (UInt_t uHisto = 0; uHisto < fvHistos.size(); ++uHisto) {
/// Make sure we end up in chosen folder
TString sFolder = fvHistos[uHisto].second.data();
if (nullptr == gDirectory->Get(sFolder)) gDirectory->mkdir(sFolder);
gDirectory->cd(sFolder);
/// Write plot
fvHistos[uHisto].first->Write();
histoFile->cd();
} // for( UInt_t uHisto = 0; uHisto < fvHistos.size(); ++uHisto )
for (UInt_t uCanvas = 0; uCanvas < fvCanvas.size(); ++uCanvas) {
/// Make sure we end up in chosen folder
TString sFolder = fvCanvas[uCanvas].second.data();
if (nullptr == gDirectory->Get(sFolder)) gDirectory->mkdir(sFolder);
gDirectory->cd(sFolder);
/// Write plot
fvCanvas[uCanvas].first->Write();
histoFile->cd();
} // for( UInt_t uHisto = 0; uHisto < fvCanvas.size(); ++uHisto )
/// Restore old global file and folder pointer to avoid messing with FairRoot
gFile = oldFile;
gDirectory = oldDir;
histoFile->Close();
return true;
}