// CbmMqHistoServer.cxx (20.00 KiB), authored by Pierre-Alain Loizeau
/* Copyright (C) 2019-2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
#include "CbmMqHistoServer.h"
#include "CbmFlesCanvasTools.h"
#include "FairMQProgOptions.h" // device->fConfig
#include <Logger.h>
#include "TCanvas.h"
#include "TEnv.h"
#include "TFile.h"
#include "TH1.h"
#include "TH2.h"
#include "THttpServer.h"
#include "TMessage.h"
#include "TObjArray.h"
#include "TProfile.h"
#include "TRootSniffer.h"
#include "TSystem.h"
#include "BoostSerializer.h"
#include <boost/serialization/utility.hpp>
#include <mutex>
#include "RootSerializer.h"
/// File-scope mutex: locked by CbmMqHistoServer::ReceiveData while it processes a received
/// histogram array, and by CbmMqHistoServer::UpdateHttpServer around each call to
/// THttpServer::ProcessRequests.
std::mutex mtx;
/*
Bool_t bMqHistoServerResetHistos = kFALSE;
Bool_t bMqHistoServerSaveHistos = kFALSE;
*/
CbmMqHistoServer::CbmMqHistoServer()
: FairMQDevice()
, fArrayHisto()
{
}
/// Destructor: performs no explicit clean-up in this file.
CbmMqHistoServer::~CbmMqHistoServer() = default;
/// Device initialisation.
///
/// Reads the channel names, the output file name and the HTTP port from the device
/// configuration, binds the message handlers to their channels and starts the
/// THttpServer on the configured port.
void CbmMqHistoServer::InitTask()
{
  /// Read options from executable
  LOG(info) << "Init options for CbmMqHistoServer.";
  fsChannelNameHistosInput  = fConfig->GetValue<std::string>("ChNameIn");
  fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg");
  fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg");
  fsHistoFileName           = fConfig->GetValue<std::string>("HistoFileName");
  fuHttpServerPort          = fConfig->GetValue<uint32_t>("histport");

  /// Link channels to methods in order to process received messages
  OnData(fsChannelNameHistosInput, &CbmMqHistoServer::ReceiveData);
  OnData(fsChannelNameHistosConfig, &CbmMqHistoServer::ReceiveHistoConfig);
  OnData(fsChannelNameCanvasConfig, &CbmMqHistoServer::ReceiveCanvasConfig);

  /// If multi-parts, go to method processing combined Config+Data
  /// NOTE(review): the histogram input channel is bound twice (single-part handler above,
  /// multi-part handler here); ReceiveConfigAndData forwards 1-part messages to ReceiveData.
  OnData(fsChannelNameHistosInput, &CbmMqHistoServer::ReceiveConfigAndData);

  fServer = new THttpServer(Form("http:%u", fuHttpServerPort));
  /// To avoid the server sucking all Histos from gROOT when no output file is used
  fServer->GetSniffer()->SetScanGlobalDir(kFALSE);

  /// Look for the JSROOT location: environment variable first, then ROOT resource file.
  /// Both lookups may yield a null pointer, which must never be streamed into the logger.
  const char* jsrootsys = gSystem->Getenv("JSROOTSYS");
  if (!jsrootsys) jsrootsys = gEnv->GetValue("HttpServ.JSRootPath", jsrootsys);
  LOG(info) << "JSROOT location: " << (nullptr != jsrootsys ? jsrootsys : "<not set>");

  //fServer->RegisterCommand("/Reset_Hist", "bMqHistoServerResetHistos=kTRUE");
  //fServer->RegisterCommand("/Save_Hist", "bMqHistoServerSaveHistos=kTRUE");
  //fServer->Restrict("/Reset_Hist", "allow=admin");
  //fServer->Restrict("/Save_Hist", "allow=admin");
}
/// Handler for single-part messages on the histogram input channel.
///
/// Deserializes the message into a ROOT object, expects it to be a TObjArray and hands every
/// TProfile/TH2/TH1 entry to ReadHistogram(). Once the array is consumed, tries to prepare
/// every configured canvas which is not ready yet.
///
/// @param msg  message carrying the ROOT-serialized TObjArray
/// @return false if no object could be deserialized or if ReadHistogram() rejected an entry,
///         true otherwise
bool CbmMqHistoServer::ReceiveData(FairMQMessagePtr& msg, int /*index*/)
{
  LOG(debug) << "CbmMqHistoServer::ReceiveData => Processing histograms update";

  TObject* tempObject = nullptr;
  // Deserialize<RootSerializer>(*msg, tempObject);
  RootSerializer().Deserialize(*msg, tempObject);

  /// Nothing decoded: stop here instead of dereferencing a null pointer below
  if (nullptr == tempObject) {
    LOG(error) << "CbmMqHistoServer::ReceiveData => No object could be deserialized from the input message";
    return false;
  }  // if( nullptr == tempObject )

  /// Stays true as long as every entry was accepted; a failure stops the loop but still
  /// goes through the clean-up below so that the received objects are not leaked
  bool bAllMerged = true;

  if (TString(tempObject->ClassName()).EqualTo("TObjArray")) {
    std::lock_guard<std::mutex> lk(mtx);
    TObjArray* arrayHisto = static_cast<TObjArray*>(tempObject);
    for (Int_t i = 0; bAllMerged && i < arrayHisto->GetEntriesFast(); i++) {
      TObject* pObj = arrayHisto->At(i);
      if (nullptr == pObj) {
        /// TObjArray may contain empty slots, nothing to read there
        LOG(warning) << "CbmMqHistoServer::ReceiveData => Empty slot " << i << " in received array, skipping it";
        continue;
      }  // if( nullptr == pObj )

      /// Order matters: TProfile and TH2 both derive from TH1
      if (TProfile* pProfile = dynamic_cast<TProfile*>(pObj)) {
        bAllMerged = ReadHistogram<TProfile>(pProfile);
      }  // if( TProfile )
      else if (TH2* pHisto2D = dynamic_cast<TH2*>(pObj)) {
        bAllMerged = ReadHistogram<TH2>(pHisto2D);
      }  // else if( TH2 )
      else if (TH1* pHisto1D = dynamic_cast<TH1*>(pObj)) {
        bAllMerged = ReadHistogram<TH1>(pHisto1D);
      }  // else if( TH1 )
      else {
        LOG(warning) << "Unsupported object type for " << pObj->GetName();
      }  // else of type checks
    }    // for (Int_t i = 0; bAllMerged && i < arrayHisto->GetEntriesFast(); i++)

    LOG(debug) << "CbmMqHistoServer::ReceiveData => Deleting array";
    /// Need to use Delete instead of Clear to avoid memory leak!!!
    arrayHisto->Delete();

    /// If new histos received, try to prepare as many canvases as possible
    /// Should be expensive on start and cheap afterward
    if (bAllMerged && !fbAllCanvasReady) {
      LOG(debug) << "CbmMqHistoServer::ReceiveData => Checking for canvases updates";
      bool bAllReady = true;
      for (uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
        /// Only the canvases not yet ready need the expensive part, where the config is
        /// unpacked and each histo is checked
        if (!fvbCanvasReady[uCanv]) fvbCanvasReady[uCanv] = PrepareCanvas(uCanv);
        if (!fvbCanvasReady[uCanv]) bAllReady = false;
      }  // for( uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv )

      /// Remember when nothing is left to prepare so that next updates skip this scan;
      /// ReceiveCanvasConfig clears the flag again when a new canvas config arrives
      fbAllCanvasReady = bAllReady;
    }  // if( bAllMerged && !fbAllCanvasReady )
  }    // if (TString(tempObject->ClassName()).EqualTo("TObjArray"))
  else {
    LOG(fatal) << "CbmMqHistoServer::ReceiveData => Wrong object type at input: " << tempObject->ClassName();
  }  // else of if (TString(tempObject->ClassName()).EqualTo("TObjArray"))

  /// Release the deserialized container on every path, success or failure
  delete tempObject;
  tempObject = nullptr;

  if (!bAllMerged) return false;

  fNMessages += 1;

  /*
  /// TODO: control flags communication with histo server
  /// Idea: 1 req channel (per device or not mixup?), polling every N TS and/or M s
  if (bMqHistoServerResetHistos) {
    std::lock_guard<std::mutex> lk(mtx);
    // LOG(info) << "Reset Monitor histos ";
    ResetHistograms();
    bMqHistoServerResetHistos = kFALSE;
  } // if( bMqHistoServerResetHistos )
  if (bMqHistoServerSaveHistos) {
    std::lock_guard<std::mutex> lk(mtx);
    // LOG(info) << "Save All histos & canvases";
    SaveHistograms();
    bMqHistoServerSaveHistos = kFALSE;
  } // if( bMqHistoServerSaveHistos )
  */

  LOG(debug) << "CbmMqHistoServer::ReceiveData => Finished processing histograms update";
  return true;
}
/// Handler for histogram configuration messages.
///
/// The message holds a Boost-serialized pair of strings; the first one is used as histogram
/// name and the second one as folder when the histogram gets registered. A configuration for
/// an already known name is ignored.
///
/// @param msg  message carrying the serialized pair
/// @return always true
bool CbmMqHistoServer::ReceiveHistoConfig(FairMQMessagePtr& msg, int /*index*/)
{
  std::pair<std::string, std::string> histoCfg;
  BoostSerializer<std::pair<std::string, std::string>>().Deserialize(*msg, histoCfg);

  LOG(info) << " Received configuration for histo " << histoCfg.first << " : " << histoCfg.second;

  /// Look for a previous configuration with the same histo name
  /// Linear search should be ok as config is shared only at startup
  bool bAlreadyKnown = false;
  for (const auto& knownCfg : fvpsHistosFolder) {
    if (knownCfg.first == histoCfg.first) {
      bAlreadyKnown = true;
      break;
    }  // if( knownCfg.first == histoCfg.first )
  }    // for( const auto& knownCfg : fvpsHistosFolder )

  if (bAlreadyKnown) {
    LOG(info) << " Ignored new configuration for histo " << histoCfg.first
              << " due to previously received one: " << histoCfg.second;
    /// Not sure if we should return false here...
    return true;
  }  // if( bAlreadyKnown )

  /// New name: book one slot in each of the parallel vectors and flag the registration as incomplete
  fvpsHistosFolder.push_back(histoCfg);
  fvHistos.push_back(std::pair<TNamed*, std::string>(nullptr, ""));
  fvbHistoRegistered.push_back(false);
  fbAllHistosRegistered = false;

  return true;
}
/// Handler for canvas configuration messages.
///
/// The message holds a Boost-serialized pair of strings; the first one is used as canvas name
/// and the second one is the configuration string later unpacked by PrepareCanvas(). A
/// configuration for an already known name is ignored.
///
/// @param msg  message carrying the serialized pair
/// @return always true
bool CbmMqHistoServer::ReceiveCanvasConfig(FairMQMessagePtr& msg, int /*index*/)
{
  std::pair<std::string, std::string> canvasCfg;
  BoostSerializer<std::pair<std::string, std::string>>().Deserialize(*msg, canvasCfg);

  LOG(info) << " Received configuration for canvas " << canvasCfg.first << " : " << canvasCfg.second;

  /// Look for a previous configuration with the same canvas name
  /// Linear search should be ok as config is shared only at startup
  bool bAlreadyKnown = false;
  for (const auto& knownCfg : fvpsCanvasConfig) {
    if (knownCfg.first == canvasCfg.first) {
      bAlreadyKnown = true;
      break;
    }  // if( knownCfg.first == canvasCfg.first )
  }    // for( const auto& knownCfg : fvpsCanvasConfig )

  if (bAlreadyKnown) {
    LOG(warning) << " Ignored new configuration for Canvas " << canvasCfg.first
                 << " due to previously received one: " << canvasCfg.second;
    /// Not sure if we should return false here...
    return true;
  }  // if( bAlreadyKnown )

  /// New name: book one slot in each of the parallel vectors and clear the "all done" flags
  fvpsCanvasConfig.push_back(canvasCfg);
  fvbCanvasReady.push_back(false);
  fbAllCanvasReady = false;

  fvCanvas.push_back(std::pair<TCanvas*, std::string>(nullptr, ""));
  fvbCanvasRegistered.push_back(false);
  fbAllCanvasRegistered = false;

  return true;
}
/// Handler for multi-part messages combining configuration and data.
///
/// Expected layout of the parts:
///  - part 0: header, a Boost-serialized pair (number of histo configs, number of canvas configs)
///  - then the histo configs, or a single empty part if the header announces none
///  - then the canvas configs, or a single empty part if the header announces none
///  - last part: the histograms data, processed by ReceiveData()
///
/// A message with a single part is forwarded to ReceiveData().
///
/// @param parts  the received multi-part message
/// @return result of ReceiveData() on the data part, false if the layout is not the expected one
bool CbmMqHistoServer::ReceiveConfigAndData(FairMQParts& parts, int /*index*/)
{
  /// Reject anything but at least Header + Histo Config + Canvas Config + Histo Data
  if (parts.Size() < 4) {
    if (1 == parts.Size()) {
      /// PAL, 09/04/2021, Debug message catching missed method overload/polymorphism:
      /// contrary to my expectation, if 2 method bound to same channel, one with FairMQMessagePtr and one with
      /// FairMQParts, all messages go to multipart version and FairMQMessagePtr is converted to size 1 FairMQParts
      LOG(debug) << "CbmMqHistoServer::ReceiveConfigAndData => only 1 parts found in input, "
                 << "assuming data only message routed to wrong method!";
      return ReceiveData(parts.At(0), 0);
    }  // if( 1 == parts.Size() )
    LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => Wrong number of parts: " << parts.Size()
               << " instead of at least 4 (Header + Histo Config + Canvas config + Data)!";
    /// Only reached if the fatal log does not terminate: never decode a truncated message
    return false;
  }  // if( parts.Size() < 4 )

  LOG(info) << "CbmMqHistoServer::ReceiveConfigAndData => Received composed message with " << parts.Size() << " parts";

  /// Header contains a pair of counters: (nb of histo configs, nb of canvas configs)
  std::pair<uint32_t, uint32_t> pairHeader;
  // Deserialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*parts.At(0), pairHeader);
  BoostSerializer<std::pair<uint32_t, uint32_t>>().Deserialize(*parts.At(0), pairHeader);

  LOG(info) << "CbmMqHistoServer::ReceiveConfigAndData => Received configuration for " << pairHeader.first
            << " histos and " << pairHeader.second << " canvases";

  /// A block without config still occupies one (empty) part
  const uint32_t uOffsetHistoConfig  = (0 == pairHeader.first) ? 1 : pairHeader.first;
  const uint32_t uOffsetCanvasConfig = (0 == pairHeader.second) ? 1 : pairHeader.second;

  /// Validate the total number of parts BEFORE accessing any part by computed index.
  /// The sum is done in size_t so that large header values cannot wrap around.
  const size_t uNbPartsExpected =
    static_cast<size_t>(1) + static_cast<size_t>(uOffsetHistoConfig) + static_cast<size_t>(uOffsetCanvasConfig) + 1;
  if (static_cast<size_t>(parts.Size()) != uNbPartsExpected) {
    LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => Number of parts not matching header: " << parts.Size()
               << " instead of " << uNbPartsExpected;
    /// Only reached if the fatal log does not terminate
    return false;
  }  // if( parts.Size() != uNbPartsExpected )

  /// The placeholder parts have to be empty
  if (0 == pairHeader.first && 0 < (parts.At(1))->GetSize()) {
    LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => No histo config expected but corresponding message is"
               << " not empty: " << (parts.At(1))->GetSize();
  }  // if( 0 == pairHeader.first && 0 < (parts.At(1))->GetSize() )
  if (0 == pairHeader.second && 0 < (parts.At(1 + uOffsetHistoConfig))->GetSize()) {
    LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => No Canvas config expected but corresponding message is"
               << " not empty: " << (parts.At(1 + uOffsetHistoConfig))->GetSize();
  }  // if( 0 == pairHeader.second && 0 < (parts.At(1 + uOffsetHistoConfig))->GetSize() )

  /// Decode parts for histograms configuration
  for (uint32_t uHisto = 0; uHisto < pairHeader.first; ++uHisto) {
    ReceiveHistoConfig(parts.At(1 + uHisto), 0);
  }  // for (uint32_t uHisto = 0; uHisto < pairHeader.first; ++uHisto)

  /// Decode parts for canvases configuration
  for (uint32_t uCanv = 0; uCanv < pairHeader.second; ++uCanv) {
    ReceiveCanvasConfig(parts.At(1 + uOffsetHistoConfig + uCanv), 0);
  }  // for (uint32_t uCanv = 0; uCanv < pairHeader.second; ++uCanv)

  /// Decode the histograms data now that the configuration is loaded and report a failure
  /// the same way as the data-only path above does
  if (!ReceiveData(parts.At(1 + uOffsetHistoConfig + uOffsetCanvasConfig), 0)) {
    LOG(error) << "CbmMqHistoServer::ReceiveConfigAndData => Failed to process the data part of composed message";
    return false;
  }  // if( !ReceiveData(...) )

  LOG(info) << "CbmMqHistoServer::ReceiveConfigAndData => Finished processing composed message with " << parts.Size()
            << " parts";

  return true;
}
void CbmMqHistoServer::PreRun()
{
fStopThread = false;
fThread = std::thread(&CbmMqHistoServer::UpdateHttpServer, this);
}
/// Body of the thread started in PreRun(): until the stop flag is raised, sleeps 10 ms and
/// then lets the HTTP server process its pending requests while holding the file-scope mutex.
void CbmMqHistoServer::UpdateHttpServer()
{
  const std::chrono::milliseconds pollPeriod(10);

  for (;;) {
    if (fStopThread) return;

    std::this_thread::sleep_for(pollPeriod);

    /// Lock released at the end of each iteration, before the next check of the stop flag
    std::lock_guard<std::mutex> serverGuard(mtx);
    fServer->ProcessRequests();
  }  // for (;;)
}
/// Stops the HTTP server thread, then writes histograms and canvases to the output file.
///
/// The file is written only once the thread has been joined: SaveHistograms() does not take
/// the mutex and changes gFile/gDirectory, so it must not run while the HTTP thread is still
/// processing requests. A save performed before the join would in any case be overwritten,
/// as the file is opened in RECREATE mode.
void CbmMqHistoServer::PostRun()
{
  fStopThread = true;
  /// join() on a thread which was never started (or already joined) would throw
  if (fThread.joinable()) fThread.join();

  SaveHistograms();
}
/// Processes one received histogram.
///
/// If a histogram with the same name is already stored, the received one is added to it.
/// Otherwise a clone is stored and, if a configuration with a matching name is available,
/// the clone is registered in the HTTP server under the configured folder.
///
/// @param pHist  received histogram, not modified and not taken over
/// @return false if the stored object with the same name cannot be cast to HistoT, true otherwise
template<class HistoT>
bool CbmMqHistoServer::ReadHistogram(HistoT* pHist)
{
  const int iStoredIdx = FindHistogram(pHist->GetName());

  /// Known histo: accumulate into the stored copy
  if (-1 != iStoredIdx) {
    HistoT* pStored = dynamic_cast<HistoT*>(fArrayHisto.At(iStoredIdx));
    if (nullptr == pStored) {
      LOG(error) << "CbmMqHistoServer::ReadHistogram => "
                 << "Incompatible type found during update for histo " << pHist->GetName();
      return false;
    }  // if( nullptr == pStored )
    pStored->Add(pHist);
    return true;
  }  // if (-1 != iStoredIdx)

  /// New histo: keep our own copy
  HistoT* pCopy = static_cast<HistoT*>(pHist->Clone());
  fArrayHisto.Add(pCopy);
  LOG(info) << "Received new histo " << pHist->GetName();

  /// Nothing left to register
  if (fbAllHistosRegistered) return true;

  /// Try to register the new histo if a configuration is available for it
  for (uint32_t uCfg = 0; uCfg < fvpsHistosFolder.size(); ++uCfg) {
    /// Jump histos already registered and configs for other names
    if (fvbHistoRegistered[uCfg]) continue;
    if (!(fvpsHistosFolder[uCfg].first == pCopy->GetName())) continue;

    fvHistos[uCfg] = std::pair<TNamed*, std::string>(pCopy, fvpsHistosFolder[uCfg].second);
    fServer->Register(Form("/%s", fvHistos[uCfg].second.data()), fvHistos[uCfg].first);
    fvbHistoRegistered[uCfg] = true;

    LOG(info) << "registered histo " << fvHistos[uCfg].first->GetName() << " in folder " << fvHistos[uCfg].second;

    /// Update flag telling whether all known histos are registered
    bool bNoneMissing = true;
    for (uint32_t uIdx = 0; uIdx < fvbHistoRegistered.size(); ++uIdx) {
      if (!fvbHistoRegistered[uIdx]) {
        bNoneMissing = false;
        break;
      }  // if( !fvbHistoRegistered[ uIdx ] )
    }    // for( uint32_t uIdx = 0; uIdx < fvbHistoRegistered.size(); ++uIdx )
    fbAllHistosRegistered = bNoneMissing;

    /// Only the first matching config is used
    break;
  }  // for( uint32_t uCfg = 0; uCfg < fvpsHistosFolder.size(); ++uCfg )

  return true;
}
int CbmMqHistoServer::FindHistogram(const std::string& name)
{
for (int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist) {
TObject* obj = fArrayHisto.At(iHist);
if (TString(obj->GetName()).EqualTo(name)) { return iHist; } // if( TString( obj->GetName() ).EqualTo( name ) )
} // for( int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist )
return -1;
}
bool CbmMqHistoServer::ResetHistograms()
{
for (int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist) {
dynamic_cast<TH1*>(fArrayHisto.At(iHist))->Reset();
} // for( int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist )
return true;
}
bool CbmMqHistoServer::PrepareCanvas(uint32_t uCanvIdx)
{
LOG(debug) << " Extracting configuration for canvas index " << uCanvIdx;
CanvasConfig conf(ExtractCanvasConfigFromString(fvpsCanvasConfig[uCanvIdx].second));
/// First check if all objects to be drawn are present
uint32_t uNbPads(conf.GetNbPads());
for (uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx) {
uint32_t uNbObj(conf.GetNbObjsInPad(uPadIdx));
for (uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx) {
std::string sName(conf.GetObjName(uPadIdx, uObjIdx));
/// Check for empty pads!
if ("nullptr" != sName) {
if (FindHistogram(sName) < 0) {
return false;
} // if( FindHistogram( conf.GetObjName( uPadIdx, uObjIdx ) ) < 0 )
} // if( "nullptr" != sName )
} // for( uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx )
} // for( uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx )
LOG(info) << " All histos found for canvas " << conf.GetName().data() << ", now preparing it";
/// Create new canvas and pads
TCanvas* pNewCanv = new TCanvas(conf.GetName().data(), conf.GetTitle().data());
pNewCanv->Divide(conf.GetNbPadsX(), conf.GetNbPadsY());
/// Loop on pads
for (uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx) {
pNewCanv->cd(1 + uPadIdx);
/// Pad settings
gPad->SetGrid(conf.GetGridx(uPadIdx), conf.GetGridy(uPadIdx));
gPad->SetLogx(conf.GetLogx(uPadIdx));
gPad->SetLogy(conf.GetLogy(uPadIdx));
gPad->SetLogz(conf.GetLogz(uPadIdx));
/// Add objects (we know they are there
uint32_t uNbObj(conf.GetNbObjsInPad(uPadIdx));
for (uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx) {
std::string sName(conf.GetObjName(uPadIdx, uObjIdx));
if ("nullptr" != sName) {
TObject* pObj = fArrayHisto[FindHistogram(sName)];
if (nullptr != dynamic_cast<TProfile*>(pObj)) {
dynamic_cast<TProfile*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data());
} // if( nullptr != dynamic_cast< TProfile *>( pObj ) )
else if (nullptr != dynamic_cast<TH2*>(pObj)) {
dynamic_cast<TH2*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data());
} // if( nullptr != dynamic_cast< TH2 *>( pObj ) )
else if (nullptr != dynamic_cast<TH1*>(pObj)) {
dynamic_cast<TH1*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data());
} // if( nullptr != dynamic_cast< TH1 *>( pObj ) )
else
LOG(warning) << " Unsupported object type for " << sName << " when preparing canvas " << conf.GetName();
LOG(info) << " Configured histo " << sName << " on pad " << (1 + uPadIdx) << " for canvas "
<< conf.GetName().data();
} // if( "nullptr" != sName )
} // for( uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx )
} // for( uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx )
fvCanvas[uCanvIdx] = std::pair<TCanvas*, std::string>(pNewCanv, "canvases");
fServer->Register(Form("/%s", fvCanvas[uCanvIdx].second.data()), fvCanvas[uCanvIdx].first);
fvbCanvasRegistered[uCanvIdx] = true;
LOG(info) << " Registered canvas " << fvCanvas[uCanvIdx].first->GetName() << " in folder "
<< fvCanvas[uCanvIdx].second;
/// Update flag telling whether all known canvases are registered
fbAllCanvasRegistered = true;
for (uint32_t uIdx = 0; uIdx < fvbCanvasRegistered.size(); ++uIdx) {
if (!fvbCanvasRegistered[uIdx]) {
fbAllCanvasRegistered = false;
break;
} // if( !fvbCanvasRegistered[ uIdx ] )
} // for( uint32_t uIdx = 0; uIdx < fvbCanvasRegistered.size(); ++uIdx )
return true;
}
bool CbmMqHistoServer::SaveHistograms()
{
/// Save old global file and folder pointer to avoid messing with FairRoot
TFile* oldFile = gFile;
TDirectory* oldDir = gDirectory;
/// (Re-)Create ROOT file to store the histos
TFile* histoFile = nullptr;
// open separate histo file in recreate mode
histoFile = new TFile(fsHistoFileName.data(), "RECREATE");
LOG(info) << "Save Histos in file " << fsHistoFileName.data();
if (nullptr == histoFile) return false;
/// Register the histos in the HTTP server
for (UInt_t uHisto = 0; uHisto < fvHistos.size(); ++uHisto) {
/// Make sure we end up in chosen folder
TString sFolder = fvHistos[uHisto].second.data();
if (nullptr == gDirectory->Get(sFolder)) gDirectory->mkdir(sFolder);
gDirectory->cd(sFolder);
/// Write plot
fvHistos[uHisto].first->Write();
histoFile->cd();
} // for( UInt_t uHisto = 0; uHisto < fvHistos.size(); ++uHisto )
for (UInt_t uCanvas = 0; uCanvas < fvCanvas.size(); ++uCanvas) {
/// Make sure we end up in chosen folder
TString sFolder = fvCanvas[uCanvas].second.data();
if (nullptr == gDirectory->Get(sFolder)) gDirectory->mkdir(sFolder);
gDirectory->cd(sFolder);
/// Write plot
fvCanvas[uCanvas].first->Write();
histoFile->cd();
} // for( UInt_t uHisto = 0; uHisto < fvCanvas.size(); ++uHisto )
/// Restore old global file and folder pointer to avoid messing with FairRoot
gFile = oldFile;
gDirectory = oldDir;
histoFile->Close();
return true;
}