Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • le.koch/cbmroot
  • patrick.pfistner_AT_kit.edu/cbmroot
  • lena.rossel_AT_stud.uni-frankfurt.de/cbmroot
  • i.deppner/cbmroot
  • fweig/cbmroot
  • karpushkin_AT_inr.ru/cbmroot
  • v.akishina/cbmroot
  • rishat.sultanov_AT_cern.ch/cbmroot
  • l_fabe01_AT_uni-muenster.de/cbmroot
  • pwg-c2f/cbmroot
  • j.decuveland/cbmroot
  • a.toia/cbmroot
  • i.vassiliev/cbmroot
  • n.herrmann/cbmroot
  • o.lubynets/cbmroot
  • se.gorbunov/cbmroot
  • cornelius.riesen_AT_physik.uni-giessen.de/cbmroot
  • zhangqn17_AT_mails.tsinghua.edu.cn/cbmroot
  • bartosz.sobol/cbmroot
  • ajit.kumar/cbmroot
  • computing/cbmroot
  • a.agarwal_AT_vecc.gov.in/cbmroot
  • osingh/cbmroot
  • wielanek_AT_if.pw.edu.pl/cbmroot
  • malgorzata.karabowicz.stud_AT_pw.edu.pl/cbmroot
  • m.shiroya/cbmroot
  • s.roy/cbmroot
  • p.-a.loizeau/cbmroot
  • a.weber/cbmroot
  • ma.beyer/cbmroot
  • d.klein/cbmroot
  • d.smith/cbmroot
  • mvdsoft/cbmroot
  • d.spicker/cbmroot
  • y.h.leung/cbmroot
  • m.deveaux/cbmroot
  • mkunold/cbmroot
  • h.darwish/cbmroot
  • f_fido01_AT_uni-muenster.de/cbmroot
  • g.kozlov/cbmroot
  • d.emschermann/cbmroot
  • evgeny.lavrik/cbmroot
  • v.friese/cbmroot
  • f.uhlig/cbmroot
  • ebechtel_AT_ikf.uni-frankfurt.de/cbmroot
  • a.senger/cbmroot
  • praisig/cbmroot
  • s.lebedev/cbmroot
  • redelbach_AT_compeng.uni-frankfurt.de/cbmroot
  • p.subramani/cbmroot
  • a_meye37_AT_uni-muenster.de/cbmroot
  • om/cbmroot
  • o.golosov/cbmroot
  • l.chlad/cbmroot
  • a.bercuci/cbmroot
  • d.ramirez/cbmroot
  • v.singhal/cbmroot
  • h.schiller/cbmroot
  • apuntke/cbmroot
  • f.zorn/cbmroot
  • rubio_AT_physi.uni-heidelberg.de/cbmroot
  • p.chudoba/cbmroot
  • apuntke/mcbmroot
  • r.karabowicz/cbmroot
64 results
Show changes
Showing
with 1275 additions and 1371 deletions
/* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Florian Uhlig [committer] */
#include "CbmMQChannels.h"
#include "FairMQDevice.h"
CbmMQChannels::CbmMQChannels(std::vector<std::string> allowedChannels)
: fAllowedChannels{allowedChannels}
{
fChannelsToSend.resize(fAllowedChannels.size());
for(auto &entry : fChannelsToSend) {
entry.push_back("");
}
fComponentsToSend.resize(fAllowedChannels.size());
}
CbmMQChannels::CbmMQChannels(std::vector<std::string> allowedChannels) : fAllowedChannels {allowedChannels}
{
fChannelsToSend.resize(fAllowedChannels.size());
for (auto& entry : fChannelsToSend) {
entry.push_back("");
}
fComponentsToSend.resize(fAllowedChannels.size());
}
bool CbmMQChannels::IsChannelNameAllowed(std::string channelName)
{
for(auto const &entry : fAllowedChannels) {
for (auto const& entry : fAllowedChannels) {
std::size_t pos1 = channelName.find(entry);
if (pos1!=std::string::npos) {
if (pos1 != std::string::npos) {
const std::vector<std::string>::const_iterator pos =
std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
const std::vector<std::string>::size_type idx = pos-fAllowedChannels.begin();
std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
const std::vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
LOG(info) << "Found " << entry << " in " << channelName;
LOG(info) << "Channel name " << channelName
<< " found in list of allowed channel names at position " << idx;
LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
fComponentsToSend[idx]++;
// The array is initialized with one empty string. If the string has still teh value from initialization
// exchnge the value by the new channel name. In any other case add one more entry to the vector
if (fChannelsToSend[idx].size() == 1 && fChannelsToSend[idx].at(0).empty()) {
fChannelsToSend[idx].at(0) = channelName;
} else {
fChannelsToSend[idx].push_back(channelName);
fChannelsToSend[idx].at(0) = channelName;
}
else {
fChannelsToSend[idx].push_back(channelName);
}
return true;
}
}
LOG(info) << "Channel name " << channelName
<< " not found in list of allowed channel names.";
LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
LOG(info) << "The allowed channels are: ";
for(auto const &entry : fAllowedChannels) {
LOG(info) << entry;
for (auto const& entry : fAllowedChannels) {
LOG(info) << entry;
}
LOG(error) << "Stop device.";
return false;
......@@ -55,7 +58,7 @@ bool CbmMQChannels::CheckChannels(FairMQDevice* device)
// data on this channel.
int noChannel = device->fChannels.size();
LOG(info) << "Number of defined output channels: " << noChannel;
for(auto const &entry : device->fChannels) {
for (auto const& entry : device->fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (!IsChannelNameAllowed(entry.first)) return false;
}
......
/* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Florian Uhlig [committer] */
#ifndef CBMMQCHANNELS_H_
#define CBMMQCHANNELS_H_
#include <vector>
#include "FairMQDevice.h"
#include <string>
#include <vector>
class FairMQDevice;
class CbmMQChannels
{
public:
class CbmMQChannels {
public:
CbmMQChannels(std::vector<std::string>);
bool IsChannelNameAllowed(std::string channelName);
bool CheckChannels(FairMQDevice* device);
std::vector<int> GetComponentsToSend() { return fComponentsToSend; }
std::vector<std::vector<std::string>> GetChannelsToSend() { return fChannelsToSend; }
private:
private:
std::vector<std::string> fAllowedChannels {};
std::vector<int> fComponentsToSend {};
std::vector<std::vector<std::string>> fChannelsToSend { {} };
std::vector<std::vector<std::string>> fChannelsToSend {{}};
};
#endif
//#ifdef HAVE_FAIRMQSTATEMACHINE
//#include "FairMQStateMachine.h"
//#endif
/* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Florian Uhlig [committer] */
#include "FairMQDevice.h"
namespace cbm {
namespace mq {
enum class Transition : int {
namespace cbm
{
namespace mq
{
enum class Transition : int
{
Idle,
DeviceReady,
Ready,
......@@ -14,72 +18,45 @@ namespace cbm {
ErrorFound
};
enum class State : int {
enum class State : int
{
Running
};
void ChangeState(FairMQDevice* device, cbm::mq::Transition transition) {
#ifdef HAVE_FAIRMQSTATEMACHINE
if(transition == cbm::mq::Transition::ErrorFound) {
device->ChangeState(FairMQStateMachine::Event::ERROR_FOUND);
} else if (transition == cbm::mq::Transition::End) {
device->ChangeState(FairMQStateMachine::Event::END);
} else if (transition == cbm::mq::Transition::Ready) {
device->ChangeState(FairMQStateMachine::Event::internal_READY);
} else if (transition == cbm::mq::Transition::DeviceReady) {
device->ChangeState(FairMQStateMachine::Event::internal_DEVICE_READY);
} else if (transition == cbm::mq::Transition::Idle) {
device->ChangeState(FairMQStateMachine::Event::internal_IDLE);
} else {
LOG(fatal) << "State Change not yet implemented";
device->ChangeState(FairMQStateMachine::Event::ERROR_FOUND);
}
#else
if(transition == cbm::mq::Transition::ErrorFound) {
device->ChangeState(fair::mq::Transition::ErrorFound);
} else if (transition == cbm::mq::Transition::End) {
void ChangeState(FairMQDevice* device, cbm::mq::Transition transition)
{
if (transition == cbm::mq::Transition::ErrorFound) { device->ChangeState(fair::mq::Transition::ErrorFound); }
else if (transition == cbm::mq::Transition::End) {
device->ChangeState(fair::mq::Transition::End);
} else if (transition == cbm::mq::Transition::Ready) {
}
else if (transition == cbm::mq::Transition::Ready) {
device->ChangeState(fair::mq::Transition::ResetTask);
} else if (transition == cbm::mq::Transition::DeviceReady) {
}
else if (transition == cbm::mq::Transition::DeviceReady) {
device->ChangeState(fair::mq::Transition::ResetDevice);
} else if (transition == cbm::mq::Transition::Idle) {
}
else if (transition == cbm::mq::Transition::Idle) {
device->ChangeState(fair::mq::Transition::Stop);
} else {
LOG(fatal) << "State Change not yet implemented";
device->ChangeState(fair::mq::Transition::ErrorFound);
}
#endif
else {
LOG(fatal) << "State Change not yet implemented";
device->ChangeState(fair::mq::Transition::ErrorFound);
}
}
void LogState(FairMQDevice* device) {
#ifdef HAVE_FAIRMQSTATEMACHINE
// LOG(info) << "Current State: " << FairMQStateMachine::GetCurrentStateName();
LOG(info) << "Current State: " << device->GetCurrentStateName();
#else
LOG(info) << "Current State: " << device->GetCurrentStateName();
#endif
}
bool CheckCurrentState(FairMQDevice* device, cbm::mq::State state) {
#ifdef HAVE_FAIRMQSTATEMACHINE
if(state == cbm::mq::State::Running) {
return device->CheckCurrentState(FairMQStateMachine::State::RUNNING);
} else {
LOG(fatal) << "State not yet implemented";
device->ChangeState(FairMQStateMachine::Event::ERROR_FOUND);
return 0;
}
#else
if(state == cbm::mq::State::Running) {
return !(device->NewStatePending());
} else {
LOG(fatal) << "State not yet implemented";
device->ChangeState(fair::mq::Transition::ErrorFound);
return 0;
}
#endif
void LogState(FairMQDevice* device)
{
LOG(info) << "Current State: " << device->GetCurrentStateName();
}
}
}
bool CheckCurrentState(FairMQDevice* device, cbm::mq::State state)
{
if (state == cbm::mq::State::Running) { return !(device->NewStatePending()); }
else {
LOG(fatal) << "State not yet implemented";
device->ChangeState(fair::mq::Transition::ErrorFound);
return 0;
}
}
} // namespace mq
} // namespace cbm
/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
#ifndef CBMMQTMESSAGE_H_
#define CBMMQTMESSAGE_H_
#include "TMessage.h"
// special class to expose protected TMessage constructor
class CbmMqTMessage : public TMessage {
public:
CbmMqTMessage(void* buf, Int_t len) : TMessage(buf, len) { ResetBit(kIsOwner); }
};
#endif /* CBMMQTMESSAGE_H_ */
......@@ -26,7 +26,7 @@ Set(SYSTEM_INCLUDE_DIRECTORIES
${FAIRMQ_INCLUDE_DIR}
${FAIRMQ_INCLUDE_DIR}/options
${IPC_INCLUDE_DIRECTORY}
${FLES_IPC_INCLUDE_DIRECTORY}
${CBMROOT_SOURCE_DIR}/external/cppzmq
)
......@@ -66,13 +66,18 @@ If(FAIRLOGGER_FOUND)
)
EndIf()
set(EXE_NAME EventBuilderEtofStar2019)
set(SRCS CbmDeviceEventBuilderEtofStar2019.cxx runEventBuilderEtofStar2019.cxx)
set(DEPENDENCIES
# Dependencies common to all executables
set(DEPENDENCIES_ALL
${DEPENDENCIES}
${FAIR_LIBS}
${BOOST_LIBS}
fles_ipc
)
set(EXE_NAME EventBuilderEtofStar2019)
set(SRCS CbmDeviceEventBuilderEtofStar2019.cxx runEventBuilderEtofStar2019.cxx)
set(DEPENDENCIES
${DEPENDENCIES_ALL}
external::fles_ipc
CbmFlibStar2019
CbmFlibMcbm2018
CbmBase
......@@ -94,10 +99,8 @@ Set(NO_DICT_SRCS
# Mask warning from file provided by STAR
SET_SOURCE_FILES_PROPERTIES(${CBMROOT_SOURCE_DIR}/fles/star2017/unpacker/star_rhicf.c PROPERTIES COMPILE_FLAGS -Wno-pointer-sign)
set(DEPENDENCIES
${DEPENDENCIES}
${FAIR_LIBS}
${BOOST_LIBS}
fles_ipc
${DEPENDENCIES_ALL}
external::fles_ipc
CbmFlibStar2019
CbmFlibMcbm2018
CbmBase
......
/* Copyright (C) 2019-2021 PI-UHd, GSI
SPDX-License-Identifier: GPL-3.0-only
Authors: Norbert Herrmann [committer] */
/**
* CbmDeviceEventBuilderEtofStar2019.cxx
*/
#include "CbmDeviceEventBuilderEtofStar2019.h"
#include "CbmMQDefs.h"
#include "CbmMQDefs.h"
#include "CbmStar2019EventBuilderEtofAlgo.h"
#include "CbmStar2019TofPar.h"
#include "StorableTimeslice.hpp"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairRuntimeDb.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairParGenericSet.h"
#include "FairRuntimeDb.h"
#include "THttpServer.h"
#include "TROOT.h"
#include "TFile.h"
#include "TString.h"
#include "TH1.h"
#include "TH2.h"
#include "THttpServer.h"
#include "TROOT.h"
#include "TString.h"
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
......@@ -28,73 +32,71 @@
// include this header to serialize vectors
#include <boost/serialization/vector.hpp>
#include <string>
#include <iomanip>
#include <array>
#include <iomanip>
#include <stdexcept>
struct InitTaskError : std::runtime_error { using std::runtime_error::runtime_error; };
#include <string>
struct InitTaskError : std::runtime_error {
using std::runtime_error::runtime_error;
};
using namespace std;
//static Int_t iMess=0;
const Int_t DetMask = 0x003FFFFF;
static uint fiSelectComponents{0};
const Int_t DetMask = 0x003FFFFF;
static uint fiSelectComponents {0};
CbmDeviceEventBuilderEtofStar2019::CbmDeviceEventBuilderEtofStar2019()
: //CbmDeviceUnpackTofMcbm2018(),
fNumMessages(0),
fbMonitorMode( kFALSE ),
fbDebugMonitorMode( kFALSE ),
fbSandboxMode( kFALSE ),
fbEventDumpEna( kFALSE ),
fParCList( nullptr ),
fulTsCounter( 0 ),
fNumEvt(0),
fEventBuilderAlgo( nullptr ),
fTimer(),
fUnpackPar( nullptr ),
fpBinDumpFile( nullptr )
: //CbmDeviceUnpackTofMcbm2018(),
fNumMessages(0)
, fbMonitorMode(kFALSE)
, fbDebugMonitorMode(kFALSE)
, fbSandboxMode(kFALSE)
, fbEventDumpEna(kFALSE)
, fParCList(nullptr)
, fulTsCounter(0)
, fNumEvt(0)
, fEventBuilderAlgo(nullptr)
, fTimer()
, fUnpackPar(nullptr)
, fpBinDumpFile(nullptr)
{
fEventBuilderAlgo = new CbmStar2019EventBuilderEtofAlgo();
fEventBuilderAlgo = new CbmStar2019EventBuilderEtofAlgo();
}
CbmDeviceEventBuilderEtofStar2019::~CbmDeviceEventBuilderEtofStar2019()
{
delete fEventBuilderAlgo;
}
CbmDeviceEventBuilderEtofStar2019::~CbmDeviceEventBuilderEtofStar2019() { delete fEventBuilderAlgo; }
void CbmDeviceEventBuilderEtofStar2019::InitTask()
try
{
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
//logger::SetLogLevel("INFO");
int noChannel = fChannels.size();
LOG(info) << "Number of defined channels: " << noChannel;
for(auto const &entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
if(entry.first == "syscmd") {
OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleMessage);
continue;
}
//if(entry.first != "etofevts") OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleData);
if(entry.first != "etofevts") OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleParts);
else {
fChannelsToSend[0].push_back(entry.first);
LOG(info) << "Init to send data to channel " << fChannelsToSend[0][0];
}
try {
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
//logger::SetLogLevel("INFO");
int noChannel = fChannels.size();
LOG(info) << "Number of defined channels: " << noChannel;
for (auto const& entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
if (entry.first == "syscmd") {
OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleMessage);
continue;
}
//if(entry.first != "etofevts") OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleData);
if (entry.first != "etofevts") OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleParts);
else {
fChannelsToSend[0].push_back(entry.first);
LOG(info) << "Init to send data to channel " << fChannelsToSend[0][0];
}
InitContainers();
} catch (InitTaskError& e) {
}
InitContainers();
}
catch (InitTaskError& e) {
LOG(error) << e.what();
// Wrapper defined in CbmMQDefs.h to support different FairMQ versions
cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
......@@ -102,21 +104,19 @@ try
bool CbmDeviceEventBuilderEtofStar2019::IsChannelNameAllowed(std::string channelName)
{
for(auto const &entry : fAllowedChannels) {
for (auto const& entry : fAllowedChannels) {
LOG(info) << "Inspect " << entry;
std::size_t pos1 = channelName.find(entry);
if (pos1!=std::string::npos) {
if (pos1 != std::string::npos) {
const vector<std::string>::const_iterator pos =
std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
const vector<std::string>::size_type idx = pos-fAllowedChannels.begin();
std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
const vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
LOG(info) << "Found " << entry << " in " << channelName;
LOG(info) << "Channel name " << channelName
<< " found in list of allowed channel names at position " << idx;
LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
return true;
}
}
LOG(info) << "Channel name " << channelName
<< " not found in list of allowed channel names.";
LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
LOG(error) << "Stop device.";
return false;
}
......@@ -124,49 +124,44 @@ bool CbmDeviceEventBuilderEtofStar2019::IsChannelNameAllowed(std::string channel
Bool_t CbmDeviceEventBuilderEtofStar2019::InitContainers()
{
LOG(info) << "Init parameter containers for CbmDeviceEventBuilderEtofStar2019.";
// FairRuntimeDb* fRtdb = FairRuntimeDb::instance();
// FairRuntimeDb* fRtdb = FairRuntimeDb::instance();
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
std::string message{"CbmStar2019TofPar,111"};
std::string message {"CbmStar2019TofPar,111"};
LOG(info) << "Requesting parameter container CbmStar2019TofPar, sending message: " << message;
FairMQMessagePtr req(NewSimpleMessage("CbmStar2019TofPar,111"));
FairMQMessagePtr rep(NewMessage());
if (Send(req, "parameters") > 0)
{
if (Receive(rep, "parameters") >= 0)
{
if (rep->GetSize() != 0)
{
CbmMQTMessage tmsg(rep->GetData(), rep->GetSize());
fUnpackPar = dynamic_cast<CbmStar2019TofPar*>(tmsg.ReadObject(tmsg.GetClass()));
LOG(info) << "Received unpack parameter from parmq server: " << fUnpackPar;
fUnpackPar->Print();
}
else
{
LOG(error) << "Received empty reply. Parameter not available";
}
}
if (Send(req, "parameters") > 0) {
if (Receive(rep, "parameters") >= 0) {
if (rep->GetSize() != 0) {
CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
fUnpackPar = dynamic_cast<CbmStar2019TofPar*>(tmsg.ReadObject(tmsg.GetClass()));
LOG(info) << "Received unpack parameter from parmq server: " << fUnpackPar;
fUnpackPar->Print();
}
else {
LOG(error) << "Received empty reply. Parameter not available";
}
}
}
SetParContainers();
SetParContainers();
Bool_t initOK = kTRUE;
initOK &= fEventBuilderAlgo->InitContainers();
initOK &= ReInitContainers(); // needed for TInt parameters
fEventBuilderAlgo->SetAddStatusToEvent( true );
if( kTRUE == fbMonitorMode )
{ // CreateHistograms();
initOK &= fEventBuilderAlgo->CreateHistograms();
Bool_t initOK = kTRUE;
initOK &= fEventBuilderAlgo->InitContainers();
initOK &= ReInitContainers(); // needed for TInt parameters
fEventBuilderAlgo->SetAddStatusToEvent(true);
/// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
std::vector< std::pair< TNamed *, std::string > > vHistos = fEventBuilderAlgo->GetHistoVector();
/* FIXME
if (kTRUE == fbMonitorMode) { // CreateHistograms();
initOK &= fEventBuilderAlgo->CreateHistograms();
/// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
std::vector<std::pair<TNamed*, std::string>> vHistos = fEventBuilderAlgo->GetHistoVector();
/* FIXME
/// Register the histos in the HTTP server
THttpServer* server = FairRunOnline::Instance()->GetHttpServer();
for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
......@@ -177,126 +172,117 @@ Bool_t CbmDeviceEventBuilderEtofStar2019::InitContainers()
server->RegisterCommand("/Reset_EvtBuild_Hist", "bStarEtof2019EventBuilderResetHistos=kTRUE");
server->Restrict("/Reset_EvtBuild_Hist", "allow=admin");
*/
} // if( kTRUE == fbMonitorMode )
} // if( kTRUE == fbMonitorMode )
return initOK;
}
void CbmDeviceEventBuilderEtofStar2019::SetParContainers()
{
FairRuntimeDb* fRtdb = FairRuntimeDb::instance();
fParCList = fEventBuilderAlgo->GetParList();
LOG(info) << "Setting parameter containers for " << fParCList->GetEntries() << " entries ";
for( Int_t iparC = 0; iparC < fParCList->GetEntries(); ++iparC )
{
FairParGenericSet* tempObj = (FairParGenericSet*)(fParCList->At(iparC));
fParCList->Remove(tempObj);
std::string sParamName{ tempObj->GetName() };
FairParGenericSet* newObj = dynamic_cast<FairParGenericSet*>( fRtdb->getContainer( sParamName.data() ) );
LOG(info) << " - Get " << sParamName.data() << " at " << newObj;
if( nullptr == newObj )
{
FairRuntimeDb* fRtdb = FairRuntimeDb::instance();
LOG(error) << "Failed to obtain parameter container " << sParamName
<< ", for parameter index " << iparC;
return;
} // if( nullptr == newObj )
if( iparC == 0 ) {
newObj=(FairParGenericSet *) fUnpackPar;
LOG(info) << " - Mod " << sParamName.data() << " to " << newObj;
}
fParCList->AddAt(newObj, iparC);
// delete tempObj;
} // for( Int_t iparC = 0; iparC < fParCList->GetEntries(); ++iparC )
fParCList = fEventBuilderAlgo->GetParList();
LOG(info) << "Setting parameter containers for " << fParCList->GetEntries() << " entries ";
for (Int_t iparC = 0; iparC < fParCList->GetEntries(); ++iparC) {
FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC));
fParCList->Remove(tempObj);
std::string sParamName {tempObj->GetName()};
FairParGenericSet* newObj = dynamic_cast<FairParGenericSet*>(fRtdb->getContainer(sParamName.data()));
LOG(info) << " - Get " << sParamName.data() << " at " << newObj;
if (nullptr == newObj) {
LOG(error) << "Failed to obtain parameter container " << sParamName << ", for parameter index " << iparC;
return;
} // if( nullptr == newObj )
if (iparC == 0) {
newObj = (FairParGenericSet*) fUnpackPar;
LOG(info) << " - Mod " << sParamName.data() << " to " << newObj;
}
fParCList->AddAt(newObj, iparC);
// delete tempObj;
} // for( Int_t iparC = 0; iparC < fParCList->GetEntries(); ++iparC )
}
void CbmDeviceEventBuilderEtofStar2019::AddMsComponentToList( size_t component, UShort_t usDetectorId )
void CbmDeviceEventBuilderEtofStar2019::AddMsComponentToList(size_t component, UShort_t usDetectorId)
{
fEventBuilderAlgo->AddMsComponentToList( component, usDetectorId );
fEventBuilderAlgo->AddMsComponentToList(component, usDetectorId);
}
Bool_t CbmDeviceEventBuilderEtofStar2019::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
{
if( 0 == fulTsCounter )
{
LOG(info) << "FIXME ===> Jumping 1st TS as corrupted with current FW + FLESNET combination";
fulTsCounter++;
return kTRUE;
} // if( 0 == fulTsCounter )
if( kFALSE == fEventBuilderAlgo->ProcessTs( ts ) )
{
LOG(error) << "Failed processing TS " << ts.index()
<< " in event builder algorithm class";
return kTRUE;
} // if( kFALSE == fEventBuilderAlgo->ProcessTs( ts ) )
std::vector< CbmTofStarSubevent2019 > & eventBuffer = fEventBuilderAlgo->GetEventBuffer();
for( UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent )
{
/// Send the sub-event to the STAR systems
Int_t iBuffSzByte = 0;
void * pDataBuff = eventBuffer[ uEvent ].BuildOutput( iBuffSzByte );
if( NULL != pDataBuff )
{
/// Valid output, do stuff with it!
// Bool_t fbSendEventToStar = kFALSE;
if( kFALSE == fbSandboxMode )
{
/*
if (0 == fulTsCounter) {
LOG(info) << "FIXME ===> Jumping 1st TS as corrupted with current FW + "
"FLESNET combination";
fulTsCounter++;
return kTRUE;
} // if( 0 == fulTsCounter )
if (kFALSE == fEventBuilderAlgo->ProcessTs(ts)) {
LOG(error) << "Failed processing TS " << ts.index() << " in event builder algorithm class";
return kTRUE;
} // if( kFALSE == fEventBuilderAlgo->ProcessTs( ts ) )
std::vector<CbmTofStarSubevent2019>& eventBuffer = fEventBuilderAlgo->GetEventBuffer();
for (UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent) {
/// Send the sub-event to the STAR systems
Int_t iBuffSzByte = 0;
void* pDataBuff = eventBuffer[uEvent].BuildOutput(iBuffSzByte);
if (NULL != pDataBuff) {
/// Valid output, do stuff with it!
// Bool_t fbSendEventToStar = kFALSE;
if (kFALSE == fbSandboxMode) {
/*
** Function to send sub-event block to the STAR DAQ system
* trg_word received is packed as:
*
* trg_cmd|daq_cmd|tkn_hi|tkn_mid|tkn_lo
*/
/*
/*
star_rhicf_write( eventBuffer[ uEvent ].GetTrigger().GetStarTrigerWord(),
pDataBuff, iBuffSzByte );
*/
SendSubevent(eventBuffer[ uEvent ].GetTrigger().GetStarTrigerWord(),(char *)pDataBuff, iBuffSzByte, 0);
SendSubevent(eventBuffer[uEvent].GetTrigger().GetStarTrigerWord(), (char*) pDataBuff, iBuffSzByte, 0);
} // if( kFALSE == fbSandboxMode )
} // if( kFALSE == fbSandboxMode )
LOG(debug) << "Sent STAR event with size " << iBuffSzByte << " Bytes"
<< " and token " << eventBuffer[ uEvent ].GetTrigger().GetStarToken();
} // if( NULL != pDataBuff )
else LOG(error) << "Invalid STAR SubEvent Output, can only happen if trigger "
<< " object was not set => Do Nothing more with it!!! ";
} // for( UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent )
LOG(debug) << "Sent STAR event with size " << iBuffSzByte << " Bytes"
<< " and token " << eventBuffer[uEvent].GetTrigger().GetStarToken();
} // if( NULL != pDataBuff )
else
LOG(error) << "Invalid STAR SubEvent Output, can only happen if trigger "
<< " object was not set => Do Nothing more with it!!! ";
} // for( UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent )
return kTRUE;
return kTRUE;
}
Bool_t CbmDeviceEventBuilderEtofStar2019::ReInitContainers()
{
LOG(info) << "ReInit parameter containers for CbmDeviceEventBuilderEtofStar2019";
Bool_t initOK = fEventBuilderAlgo->ReInitContainers();
Bool_t initOK = fEventBuilderAlgo->ReInitContainers();
return initOK;
}
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool CbmDeviceEventBuilderEtofStar2019::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
// Don't do anything with the data
// Maybe add an message counter which counts the incomming messages and add
// an output
// Don't do anything with the data
// Maybe add an message counter which counts the incomming messages and add
// an output
fNumMessages++;
LOG(debug) << "Received message number "<< fNumMessages
<< " with size " << msg->GetSize();
LOG(debug) << "Received message number " << fNumMessages << " with size " << msg->GetSize();
std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
std::istringstream iss(msgStr);
boost::archive::binary_iarchive inputArchive(iss);
fles::StorableTimeslice component{0};
fles::StorableTimeslice component {0};
inputArchive >> component;
CheckTimeslice(component);
......@@ -308,117 +294,108 @@ bool CbmDeviceEventBuilderEtofStar2019::HandleData(FairMQMessagePtr& msg, int /*
return true;
}
static Double_t dctime=0.;
static Double_t dctime = 0.;
bool CbmDeviceEventBuilderEtofStar2019::HandleParts(FairMQParts& parts, int /*index*/)
{
// Don't do anything with the data
// Maybe add an message counter which counts the incomming messages and add
// an output
// Don't do anything with the data
// Maybe add an message counter which counts the incomming messages and add
// an output
fNumMessages++;
LOG(debug) << "Received message number "<< fNumMessages
<< " with " << parts.Size() << " parts" ;
fles::StorableTimeslice ts{0}; // rename ??? FIXME
switch(fiSelectComponents) {
case 0: {
std::string msgStr(static_cast<char*>(parts.At(0)->GetData()),(parts.At(0))->GetSize());
std::istringstream iss(msgStr);
boost::archive::binary_iarchive inputArchive(iss);
inputArchive >> ts;
CheckTimeslice(ts);
if( 1 == fNumMessages ) {
LOG(info) << "Initialize TS components list to " << ts.num_components();
for (size_t c {0}; c < ts.num_components(); c++) {
auto systemID = static_cast<int>(ts.descriptor(c, 0).sys_id);
LOG(info) << "Found systemID: " << std::hex << systemID << std::dec;
fEventBuilderAlgo->AddMsComponentToList( c, systemID ); // TOF data
}
}
}
break;
case 1: {
fles::StorableTimeslice component{0};
LOG(debug) << "Received message number " << fNumMessages << " with " << parts.Size() << " parts";
fles::StorableTimeslice ts {0}; // rename ??? FIXME
uint ncomp=parts.Size();
for (uint i=0; i<ncomp; i++) {
std::string msgStr(static_cast<char*>(parts.At(i)->GetData()),(parts.At(i))->GetSize());
switch (fiSelectComponents) {
case 0: {
std::string msgStr(static_cast<char*>(parts.At(0)->GetData()), (parts.At(0))->GetSize());
std::istringstream iss(msgStr);
boost::archive::binary_iarchive inputArchive(iss);
//fles::StorableTimeslice component{i};
inputArchive >> component;
CheckTimeslice(component);
fEventBuilderAlgo->AddMsComponentToList( i, 0x60 ); // TOF data
LOG(debug) << "HandleParts message " << fNumMessages << " with indx " << component.index();
}
}
break;
default:
;
inputArchive >> ts;
CheckTimeslice(ts);
if (1 == fNumMessages) {
LOG(info) << "Initialize TS components list to " << ts.num_components();
for (size_t c {0}; c < ts.num_components(); c++) {
auto systemID = static_cast<int>(ts.descriptor(c, 0).sys_id);
LOG(info) << "Found systemID: " << std::hex << systemID << std::dec;
fEventBuilderAlgo->AddMsComponentToList(c, systemID); // TOF data
}
}
} break;
case 1: {
fles::StorableTimeslice component {0};
uint ncomp = parts.Size();
for (uint i = 0; i < ncomp; i++) {
std::string msgStr(static_cast<char*>(parts.At(i)->GetData()), (parts.At(i))->GetSize());
std::istringstream iss(msgStr);
boost::archive::binary_iarchive inputArchive(iss);
//fles::StorableTimeslice component{i};
inputArchive >> component;
CheckTimeslice(component);
fEventBuilderAlgo->AddMsComponentToList(i, 0x60); // TOF data
LOG(debug) << "HandleParts message " << fNumMessages << " with indx " << component.index();
}
} break;
default:;
}
if( kFALSE == fEventBuilderAlgo->ProcessTs( ts ) )
{
LOG(error) << "Failed processing TS " << ts.index()
<< " in event builder algorithm class";
if (kFALSE == fEventBuilderAlgo->ProcessTs(ts)) {
LOG(error) << "Failed processing TS " << ts.index() << " in event builder algorithm class";
return kTRUE;
} // if( kFALSE == fEventBuilderAlgo->ProcessTs( ts ) )
} // if( kFALSE == fEventBuilderAlgo->ProcessTs( ts ) )
std::vector< CbmTofStarSubevent2019 > & eventBuffer = fEventBuilderAlgo->GetEventBuffer();
LOG(debug) <<"Process time slice "<<fNumMessages<<" with "<<eventBuffer.size()<<" events";
std::vector<CbmTofStarSubevent2019>& eventBuffer = fEventBuilderAlgo->GetEventBuffer();
LOG(debug) << "Process time slice " << fNumMessages << " with " << eventBuffer.size() << " events";
//if(fNumMessages%10000 == 0) LOG(info)<<"Processed "<<fNumMessages<<" time slices";
for( UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent )
{
/// Send the sub-event to the STAR systems
Int_t iBuffSzByte = 0;
void * pDataBuff = eventBuffer[ uEvent ].BuildOutput( iBuffSzByte );
if( NULL != pDataBuff )
{
/// Valid output, do stuff with it!
// Send to Star TriggerHandler, TBD
if( kFALSE == fbSandboxMode )
{
/*
for (UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent) {
/// Send the sub-event to the STAR systems
Int_t iBuffSzByte = 0;
void* pDataBuff = eventBuffer[uEvent].BuildOutput(iBuffSzByte);
if (NULL != pDataBuff) {
/// Valid output, do stuff with it!
// Send to Star TriggerHandler, TBD
if (kFALSE == fbSandboxMode) {
/*
** Function to send sub-event block to the STAR DAQ system
* trg_word received is packed as:
*
* trg_cmd|daq_cmd|tkn_hi|tkn_mid|tkn_lo
*/
/*
/*
star_rhicf_write( eventBuffer[ uEvent ].GetTrigger().GetStarTrigerWord(),
pDataBuff, iBuffSzByte );
*/
} // if( kFALSE == fbSandboxMode )
SendSubevent(eventBuffer[ uEvent ].GetTrigger().GetStarTrigerWord(),(char *)pDataBuff, iBuffSzByte, 0);
} // if( kFALSE == fbSandboxMode )
SendSubevent(eventBuffer[uEvent].GetTrigger().GetStarTrigerWord(), (char*) pDataBuff, iBuffSzByte, 0);
LOG(debug) << "Sent STAR event " << uEvent << " with size " << iBuffSzByte << " Bytes"
<< ", token " << eventBuffer[ uEvent ].GetTrigger().GetStarToken()
<< ", TrigWord " << eventBuffer[ uEvent ].GetTrigger().GetStarTrigerWord();
}
}
LOG(debug) << "Sent STAR event " << uEvent << " with size " << iBuffSzByte << " Bytes"
<< ", token " << eventBuffer[uEvent].GetTrigger().GetStarToken() << ", TrigWord "
<< eventBuffer[uEvent].GetTrigger().GetStarTrigerWord();
}
}
if( 0 == fulTsCounter % 10000 ) {
LOG(info) << "Processed " << fulTsCounter << " TS, CPUtime: "<< dctime/10. << " ms/TS";
dctime=0.;
}
fulTsCounter++;
if (0 == fulTsCounter % 10000) {
LOG(info) << "Processed " << fulTsCounter << " TS, CPUtime: " << dctime / 10. << " ms/TS";
dctime = 0.;
}
fulTsCounter++;
return true;
}
bool CbmDeviceEventBuilderEtofStar2019::HandleMessage(FairMQMessagePtr& msg, int /*index*/)
{
const char *cmd = (char *)(msg->GetData());
const char cmda[4]={*cmd};
LOG(info) << "Handle message " << cmd <<", " << cmd[0];
const char* cmd = (char*) (msg->GetData());
const char cmda[4] = {*cmd};
LOG(info) << "Handle message " << cmd << ", " << cmd[0];
cbm::mq::LogState(this);
// only one implemented so far "Stop"
if( strcmp(cmda,"STOP") ) {
if (strcmp(cmda, "STOP")) {
LOG(info) << "STOP";
cbm::mq::ChangeState(this, cbm::mq::Transition::Ready);
cbm::mq::LogState(this);
......@@ -435,17 +412,15 @@ bool CbmDeviceEventBuilderEtofStar2019::HandleMessage(FairMQMessagePtr& msg, int
bool CbmDeviceEventBuilderEtofStar2019::CheckTimeslice(const fles::Timeslice& ts)
{
if ( 0 == ts.num_components() ) {
if (0 == ts.num_components()) {
LOG(error) << "No Component in TS " << ts.index();
return 1;
}
auto tsIndex = ts.index();
LOG(debug) << "Found " << ts.num_components()
<< " different components in timeslice "
<< tsIndex;
LOG(debug) << "Found " << ts.num_components() << " different components in timeslice " << tsIndex;
/*
/*
for (size_t c = 0; c < ts.num_components(); ++c) {
LOG(debug) << "Found " << ts.num_microslices(c)
<< " microslices in component " << c;
......@@ -462,9 +437,9 @@ bool CbmDeviceEventBuilderEtofStar2019::CheckTimeslice(const fles::Timeslice& ts
return true;
}
bool CbmDeviceEventBuilderEtofStar2019::SendEvent( std::vector<Int_t> vdigi, int idx )
bool CbmDeviceEventBuilderEtofStar2019::SendEvent(std::vector<Int_t> vdigi, int idx)
{
LOG(debug) << "Send Data for event "<<fNumEvt<<" with size "<< vdigi.size()<< Form(" at %p ",&vdigi);
LOG(debug) << "Send Data for event " << fNumEvt << " with size " << vdigi.size() << Form(" at %p ", &vdigi);
// LOG(debug) << "EventHeader: "<< fEventHeader[0] << " " << fEventHeader[1] << " " << fEventHeader[2] << " " << fEventHeader[3];
std::stringstream oss;
......@@ -473,12 +448,13 @@ bool CbmDeviceEventBuilderEtofStar2019::SendEvent( std::vector<Int_t> vdigi, int
std::string* strMsg = new std::string(oss.str());
FairMQParts parts;
parts.AddPart(NewMessage(const_cast<char*>(strMsg->c_str()), // data
strMsg->length(), // size
[](void* , void* object){ delete static_cast<std::string*>(object); },
strMsg)); // object that manages the data
parts.AddPart(NewMessage(
const_cast<char*>(strMsg->c_str()), // data
strMsg->length(), // size
[](void*, void* object) { delete static_cast<std::string*>(object); },
strMsg)); // object that manages the data
LOG(debug) << "Send data to channel "<< idx << " " << fChannelsToSend[idx][0];
LOG(debug) << "Send data to channel " << idx << " " << fChannelsToSend[idx][0];
// if (Send(msg, fChannelsToSend[idx][0]) < 0) {
......@@ -491,10 +467,10 @@ bool CbmDeviceEventBuilderEtofStar2019::SendEvent( std::vector<Int_t> vdigi, int
return true;
}
bool CbmDeviceEventBuilderEtofStar2019::SendSubevent( uint trig, char *pData, int nData, int idx )
bool CbmDeviceEventBuilderEtofStar2019::SendSubevent(uint trig, char* pData, int nData, int idx)
{
LOG(debug) << "SendSubevent "<<fNumEvt<<", TrigWord " << trig << " with size "<< nData << Form(" at %p ", pData);
LOG(debug) << "SendSubevent " << fNumEvt << ", TrigWord " << trig << " with size " << nData << Form(" at %p ", pData);
std::stringstream ossE;
boost::archive::binary_oarchive oaE(ossE);
......@@ -511,17 +487,19 @@ bool CbmDeviceEventBuilderEtofStar2019::SendSubevent( uint trig, char *pData, i
std::string* strMsg = new std::string(pData, nData);
FairMQParts parts;
parts.AddPart(NewMessage(const_cast<char*>(strMsgE->c_str()), // data
strMsgE->length(), // size
[](void* , void* object){ delete static_cast<std::string*>(object); },
strMsgE)); // object that manages the data
parts.AddPart(NewMessage(
const_cast<char*>(strMsgE->c_str()), // data
strMsgE->length(), // size
[](void*, void* object) { delete static_cast<std::string*>(object); },
strMsgE)); // object that manages the data
parts.AddPart(NewMessage(const_cast<char*>(strMsg->c_str()), // data
strMsg->length(), // size
[](void* , void* object){ delete static_cast<std::string*>(object); },
strMsg)); // object that manages the data
parts.AddPart(NewMessage(
const_cast<char*>(strMsg->c_str()), // data
strMsg->length(), // size
[](void*, void* object) { delete static_cast<std::string*>(object); },
strMsg)); // object that manages the data
LOG(debug) << "Send data to channel "<< idx << " " << fChannelsToSend[idx][0];
LOG(debug) << "Send data to channel " << idx << " " << fChannelsToSend[idx][0];
// if (Send(msg, fChannelsToSend[idx][0]) < 0) {
......@@ -534,48 +512,47 @@ bool CbmDeviceEventBuilderEtofStar2019::SendSubevent( uint trig, char *pData, i
return true;
}
void CbmDeviceEventBuilderEtofStar2019::Reset()
{
}
void CbmDeviceEventBuilderEtofStar2019::Reset() {}
void CbmDeviceEventBuilderEtofStar2019::Finish()
{
if( NULL != fpBinDumpFile )
{
LOG(info) << "Closing binary file used for event dump.";
fpBinDumpFile->close();
} // if( NULL != fpBinDumpFile )
/// If monitor mode enabled, trigger histos creation, obtain pointer on them and add them to the HTTP server
if( kTRUE == fbMonitorMode )
{
/// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
std::vector< std::pair< TNamed *, std::string > > vHistos = fEventBuilderAlgo->GetHistoVector();
/// (Re-)Create ROOT file to store the histos
TDirectory * oldDir = NULL;
TFile * histoFile = NULL;
// Store current directory position to allow restore later
oldDir = gDirectory;
// open separate histo file in recreate mode
histoFile = new TFile( "data/eventBuilderMonHist.root" , "RECREATE");
histoFile->cd();
if (NULL != fpBinDumpFile) {
LOG(info) << "Closing binary file used for event dump.";
fpBinDumpFile->close();
} // if( NULL != fpBinDumpFile )
/// Register the histos in the HTTP server
for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
{
/// Make sure we end up in chosen folder
gDirectory->mkdir( vHistos[ uHisto ].second.data() );
gDirectory->cd( vHistos[ uHisto ].second.data() );
/// If monitor mode enabled, trigger histos creation, obtain pointer on them and add them to the HTTP server
if (kTRUE == fbMonitorMode) {
/// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
std::vector<std::pair<TNamed*, std::string>> vHistos = fEventBuilderAlgo->GetHistoVector();
/// Write plot
vHistos[ uHisto ].first->Write();
/// Save old global file and folder pointer to avoid messing with FairRoot
TFile* oldFile = gFile;
TDirectory* oldDir = gDirectory;
histoFile->cd();
} // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
/// (Re-)Create ROOT file to store the histos
TFile* histoFile = nullptr;
// open separate histo file in recreate mode
histoFile = new TFile("data/eventBuilderMonHist.root", "RECREATE");
histoFile->cd();
/// Register the histos in the HTTP server
for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
/// Make sure we end up in chosen folder
gDirectory->mkdir(vHistos[uHisto].second.data());
gDirectory->cd(vHistos[uHisto].second.data());
/// Write plot
vHistos[uHisto].first->Write();
histoFile->cd();
} // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
/// Restore old global file and folder pointer to avoid messing with FairRoot
gFile = oldFile;
gDirectory = oldDir;
// Restore original directory position
oldDir->cd();
histoFile->Close();
} // if( kTRUE == fbMonitorMode )
histoFile->Close();
} // if( kTRUE == fbMonitorMode )
}
/* Copyright (C) 2019 PI-UHd, GSI
SPDX-License-Identifier: GPL-3.0-only
Authors: Norbert Herrmann [committer] */
/**
* CbmDeviceEventBuilderEtofStar2019.h
*
......@@ -6,93 +10,82 @@
#ifndef CBMDEVICEEVENTBUILDERETOFSTAR2019_H_
#define CBMDEVICEEVENTBUILDERETOFSTAR2019_H_
#include "FairMQDevice.h"
#include "CbmMqTMessage.h"
#include "Timeslice.hpp"
#include "TMessage.h"
#include "FairMQDevice.h"
#include "TMessage.h"
#include "TStopwatch.h"
class CbmStar2019EventBuilderEtofAlgo;
class CbmStar2019TofPar;
class CbmDeviceEventBuilderEtofStar2019: public FairMQDevice
{
public:
CbmDeviceEventBuilderEtofStar2019();
virtual ~CbmDeviceEventBuilderEtofStar2019();
class CbmDeviceEventBuilderEtofStar2019 : public FairMQDevice {
public:
CbmDeviceEventBuilderEtofStar2019();
virtual ~CbmDeviceEventBuilderEtofStar2019();
virtual Bool_t DoUnpack(const fles::Timeslice& ts, size_t component);
virtual void Reset();
virtual void Finish();
virtual Bool_t DoUnpack(const fles::Timeslice& ts, size_t component);
virtual void Reset();
virtual void Finish();
void SetParContainers();
void SetParContainers();
Bool_t InitContainers();
Bool_t InitContainers();
Bool_t ReInitContainers();
Bool_t ReInitContainers();
void SetSandboxMode( Bool_t bSandboxMode = kTRUE ){ fbSandboxMode = bSandboxMode; }
void SetEventDumpEnable( Bool_t bDumpEna = kTRUE );
void SetSandboxMode(Bool_t bSandboxMode = kTRUE) { fbSandboxMode = bSandboxMode; }
void SetEventDumpEnable(Bool_t bDumpEna = kTRUE);
/// Temp until we change from CbmMcbmUnpack to something else
void AddMsComponentToList( size_t component, UShort_t usDetectorId );
void SetNbMsInTs( size_t /*uCoreMsNb*/, size_t /*uOverlapMsNb*/ ){};
/// Temp until we change from CbmMcbmUnpack to something else
void AddMsComponentToList(size_t component, UShort_t usDetectorId);
void SetNbMsInTs(size_t /*uCoreMsNb*/, size_t /*uOverlapMsNb*/) {};
CbmDeviceEventBuilderEtofStar2019(const CbmDeviceEventBuilderEtofStar2019&) = delete;
CbmDeviceEventBuilderEtofStar2019 operator=(const CbmDeviceEventBuilderEtofStar2019&) = delete;
CbmDeviceEventBuilderEtofStar2019(const CbmDeviceEventBuilderEtofStar2019&) = delete;
CbmDeviceEventBuilderEtofStar2019 operator=(const CbmDeviceEventBuilderEtofStar2019&) = delete;
protected:
virtual void InitTask();
bool HandleData(FairMQMessagePtr&, int);
bool HandleParts(FairMQParts&, int);
bool HandleMessage(FairMQMessagePtr&, int);
virtual bool SendEvent(std::vector<Int_t>, int);
virtual bool SendSubevent(uint,char*,int, int);
protected:
virtual void InitTask();
bool HandleData(FairMQMessagePtr&, int);
bool HandleParts(FairMQParts&, int);
bool HandleMessage(FairMQMessagePtr&, int);
virtual bool SendEvent(std::vector<Int_t>, int);
virtual bool SendSubevent(uint, char*, int, int);
private:
uint64_t fNumMessages;
/// Control flags
Bool_t fbMonitorMode; //! Switch ON the filling of a minimal set of histograms
Bool_t fbDebugMonitorMode; //! Switch ON the filling of a additional set of histograms
Bool_t fbSandboxMode; //! Switch OFF the emission of data toward the STAR DAQ
Bool_t fbEventDumpEna; //! Switch ON the dumping of the events to a binary file
private:
uint64_t fNumMessages;
/// Control flags
Bool_t fbMonitorMode; //! Switch ON the filling of a minimal set of histograms
Bool_t fbDebugMonitorMode; //! Switch ON the filling of a additional set of histograms
Bool_t fbSandboxMode; //! Switch OFF the emission of data toward the STAR DAQ
Bool_t fbEventDumpEna; //! Switch ON the dumping of the events to a binary file
/// Parameters management
TList* fParCList;
/// Parameters management
TList* fParCList;
/// Statistics & first TS rejection
uint64_t fulTsCounter;
uint64_t fNumEvt;
/// Statistics & first TS rejection
uint64_t fulTsCounter;
uint64_t fNumEvt;
bool CheckTimeslice(const fles::Timeslice& ts);
bool IsChannelNameAllowed(std::string channelName);
bool CheckTimeslice(const fles::Timeslice& ts);
bool IsChannelNameAllowed(std::string channelName);
std::vector<std::string> fAllowedChannels
= {"tofcomponent","parameters","etofevts","syscmd"};
std::vector<std::vector<std::string>> fChannelsToSend = { {},{},{} };
std::vector<std::string> fAllowedChannels = {"tofcomponent", "parameters", "etofevts", "syscmd"};
std::vector<std::vector<std::string>> fChannelsToSend = {{}, {}, {}};
/// Processing algo
CbmStar2019EventBuilderEtofAlgo * fEventBuilderAlgo;
TStopwatch fTimer;
/// Processing algo
CbmStar2019EventBuilderEtofAlgo* fEventBuilderAlgo;
TStopwatch fTimer;
CbmStar2019TofPar* fUnpackPar; //!
/// Event dump to binary file
std::fstream * fpBinDumpFile;
const UInt_t kuBinDumpBegWord = 0xFEEDBEAF;
const UInt_t kuBinDumpEndWord = 0xFAEBDEEF;
};
CbmStar2019TofPar* fUnpackPar; //!
// special class to expose protected TMessage constructor
class CbmMQTMessage : public TMessage
{
public:
CbmMQTMessage(void* buf, Int_t len)
: TMessage(buf, len)
{
ResetBit(kIsOwner);
}
/// Event dump to binary file
std::fstream* fpBinDumpFile;
const UInt_t kuBinDumpBegWord = 0xFEEDBEAF;
const UInt_t kuBinDumpEndWord = 0xFAEBDEEF;
};
#endif /* CBMDEVICEEVENTBUILDERETOFSTAR2019_H_ */
/* Copyright (C) 2019 PI-UHd, GSI
SPDX-License-Identifier: GPL-3.0-only
Authors: Norbert Herrmann [committer] */
/**
* CbmDeviceTriggerHandlerEtof.cxx
*
......@@ -6,92 +10,91 @@
*/
#include "CbmDeviceTriggerHandlerEtof.h"
#include "CbmMQDefs.h"
#include "FairMQLogger.h"
#include "FairEventHeader.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairRuntimeDb.h"
#include "FairFileHeader.h"
#include "FairGeoParSet.h"
#include "FairRootManager.h"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairRootFileSink.h"
#include "FairRootManager.h"
#include "FairRunOnline.h"
#include "FairFileHeader.h"
#include "FairRuntimeDb.h"
#include <thread> // this_thread::sleep_for
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/serialization/vector.hpp>
#include <string>
#include <iomanip>
#include <thread> // this_thread::sleep_for
#include <chrono>
#include <iomanip>
#include <stdexcept>
struct InitTaskError : std::runtime_error { using std::runtime_error::runtime_error; };
#include <string>
struct InitTaskError : std::runtime_error {
using std::runtime_error::runtime_error;
};
static std::chrono::steady_clock::time_point dctime=std::chrono::steady_clock::now();
static double dSize=0.;
static std::chrono::steady_clock::time_point dctime = std::chrono::steady_clock::now();
static double dSize = 0.;
using namespace std;
CbmDeviceTriggerHandlerEtof::CbmDeviceTriggerHandlerEtof()
: fNumMessages(0)
, fiMsgCnt(0)
, fbMonitorMode( kFALSE )
, fbDebugMonitorMode( kFALSE )
, fbSandboxMode( kFALSE )
, fbEventDumpEna( kFALSE )
, fdEvent(0.)
, fbMonitorMode(kFALSE)
, fbDebugMonitorMode(kFALSE)
, fbSandboxMode(kFALSE)
, fbEventDumpEna(kFALSE)
, fdEvent(0.)
{
}
CbmDeviceTriggerHandlerEtof::~CbmDeviceTriggerHandlerEtof()
{
}
CbmDeviceTriggerHandlerEtof::~CbmDeviceTriggerHandlerEtof() {}
void CbmDeviceTriggerHandlerEtof::InitTask()
try
{
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
int noChannel = fChannels.size();
LOG(info) << "Number of defined input channels: " << noChannel;
for(auto const &entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
if(entry.first!="syscmd") OnData(entry.first, &CbmDeviceTriggerHandlerEtof::HandleData);
else OnData(entry.first, &CbmDeviceTriggerHandlerEtof::HandleMessage);
}
InitWorkspace();
} catch (InitTaskError& e) {
LOG(error) << e.what();
cbm::mq::ChangeState(this,cbm::mq::Transition::ErrorFound);
try {
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
int noChannel = fChannels.size();
LOG(info) << "Number of defined input channels: " << noChannel;
for (auto const& entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
if (entry.first != "syscmd") OnData(entry.first, &CbmDeviceTriggerHandlerEtof::HandleData);
else
OnData(entry.first, &CbmDeviceTriggerHandlerEtof::HandleMessage);
}
InitWorkspace();
}
catch (InitTaskError& e) {
LOG(error) << e.what();
cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
}
bool CbmDeviceTriggerHandlerEtof::IsChannelNameAllowed(std::string channelName)
{
for(auto const &entry : fAllowedChannels) {
for (auto const& entry : fAllowedChannels) {
std::size_t pos1 = channelName.find(entry);
if (pos1!=std::string::npos) {
if (pos1 != std::string::npos) {
const vector<std::string>::const_iterator pos =
std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
const vector<std::string>::size_type idx = pos-fAllowedChannels.begin();
std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
const vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
LOG(info) << "Found " << entry << " in " << channelName;
LOG(info) << "Channel name " << channelName
<< " found in list of allowed channel names at position " << idx;
LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
return true;
}
}
LOG(info) << "Channel name " << channelName
<< " not found in list of allowed channel names.";
LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
LOG(error) << "Stop device.";
return false;
}
......@@ -110,52 +113,49 @@ Bool_t CbmDeviceTriggerHandlerEtof::InitWorkspace()
//bool CbmDeviceTriggerHandlerEtof::HandleData(FairMQMessagePtr& msg, int /*index*/)
bool CbmDeviceTriggerHandlerEtof::HandleData(FairMQParts& parts, int /*index*/)
{
// Don't do anything with the data
// Maybe add an message counter which counts the incomming messages and add
// an output
// Don't do anything with the data
// Maybe add an message counter which counts the incomming messages and add
// an output
fNumMessages++;
LOG(debug) << "Received message "<< fNumMessages << " with "
<< parts.Size() << " parts" << ", size0: " << parts.At(0)->GetSize();
LOG(debug) << "Received message " << fNumMessages << " with " << parts.Size() << " parts"
<< ", size0: " << parts.At(0)->GetSize();
uint TrigWord{0};
uint TrigWord {0};
std::string msgStrE(static_cast<char*>(parts.At(0)->GetData()), (parts.At(0))->GetSize());
std::istringstream issE(msgStrE);
boost::archive::binary_iarchive inputArchiveE(issE);
inputArchiveE>>TrigWord;
inputArchiveE >> TrigWord;
char* pDataBuff = static_cast < char* >(parts.At(1)->GetData());
int iBuffSzByte = parts.At(1)->GetSize();
char* pDataBuff = static_cast<char*>(parts.At(1)->GetData());
int iBuffSzByte = parts.At(1)->GetSize();
// Send Subevent to STAR
LOG(debug) << "Send Data for event "<<fdEvent<<", TrigWord " << TrigWord << " with size "<< iBuffSzByte << Form(" at %p ", pDataBuff);
if( kFALSE == fbSandboxMode )
{
star_rhicf_write( TrigWord, pDataBuff, iBuffSzByte );
}
LOG(debug) << "Send Data for event " << fdEvent << ", TrigWord " << TrigWord << " with size " << iBuffSzByte
<< Form(" at %p ", pDataBuff);
if (kFALSE == fbSandboxMode) { star_rhicf_write(TrigWord, pDataBuff, iBuffSzByte); }
dSize += iBuffSzByte;
if( 0 == (int)fdEvent % 10000 ) {
std::chrono::duration<double> deltatime =
std::chrono::steady_clock::now() - dctime;
LOG(info) << "Processed " << fdEvent << " events, delta-time: "<< deltatime.count()
<< ", rate: " << dSize*1.E-6/deltatime.count() << "MB/s";
dctime=std::chrono::steady_clock::now();
dSize=0.;
if (0 == (int) fdEvent % 10000) {
std::chrono::duration<double> deltatime = std::chrono::steady_clock::now() - dctime;
LOG(info) << "Processed " << fdEvent << " events, delta-time: " << deltatime.count()
<< ", rate: " << dSize * 1.E-6 / deltatime.count() << "MB/s";
dctime = std::chrono::steady_clock::now();
dSize = 0.;
}
fdEvent++;
return kTRUE;
}
/************************************************************************************/
bool CbmDeviceTriggerHandlerEtof::HandleMessage(FairMQMessagePtr& msg, int /*index*/)
{
const char *cmd = (char *)(msg->GetData());
const char cmda[4]={*cmd};
LOG(info) << "Handle message " << cmd <<", " << cmd[0];
const char* cmd = (char*) (msg->GetData());
const char cmda[4] = {*cmd};
LOG(info) << "Handle message " << cmd << ", " << cmd[0];
// only one implemented so far "Stop"
if( strcmp(cmda,"STOP") ) {
if (strcmp(cmda, "STOP")) {
cbm::mq::ChangeState(this, cbm::mq::Transition::Ready);
cbm::mq::LogState(this);
cbm::mq::ChangeState(this, cbm::mq::Transition::DeviceReady);
......@@ -164,11 +164,9 @@ bool CbmDeviceTriggerHandlerEtof::HandleMessage(FairMQMessagePtr& msg, int /*ind
cbm::mq::LogState(this);
cbm::mq::ChangeState(this, cbm::mq::Transition::End);
cbm::mq::LogState(this);
// ChangeState(fair::mq::Transition(STOP));
// ChangeState(fair::mq::Transition(STOP));
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
return true;
}
/* Copyright (C) 2019 PI-UHd, GSI
SPDX-License-Identifier: GPL-3.0-only
Authors: Norbert Herrmann [committer] */
/**
* CbmDeviceTriggerHandlerStar2019.h
*
......@@ -8,84 +12,69 @@
#ifndef CBMDEVICETRIGGERHANDLERETOF_H_
#define CBMDEVICETRIGGERHANDLERETOF_H_
#include "FairMQDevice.h"
#include "CbmMqTMessage.h"
#include "CbmTofStarData2019.h"
#include "Timeslice.hpp"
#include "MicrosliceDescriptor.hpp"
#include "Timeslice.hpp"
#include "CbmTofStarData2019.h"
#include "FairMQDevice.h"
#include "TMessage.h"
#include "Rtypes.h"
#include <vector>
#include <map>
#include <vector>
class CbmMQTMessage;
// Relevant TOF classes
// Relevant TOF classes
extern "C" int star_rhicf_write(unsigned int trg_word, void *dta, int bytes);
extern "C" int star_rhicf_write(unsigned int trg_word, void* dta, int bytes);
// ROOT Classes and includes
class TString;
// C++ Classes and includes
#include <vector>
#include <map>
#include <list>
#include <map>
#include <vector>
class CbmDeviceTriggerHandlerEtof: public FairMQDevice
{
public:
CbmDeviceTriggerHandlerEtof();
virtual ~CbmDeviceTriggerHandlerEtof();
protected:
virtual void InitTask();
bool HandleData(FairMQParts&, int);
bool HandleMessage(FairMQMessagePtr&, int);
private:
class CbmDeviceTriggerHandlerEtof : public FairMQDevice {
public:
CbmDeviceTriggerHandlerEtof();
virtual ~CbmDeviceTriggerHandlerEtof();
// Variables used for histo filling
protected:
virtual void InitTask();
bool HandleData(FairMQParts&, int);
bool HandleMessage(FairMQMessagePtr&, int);
Bool_t IsChannelNameAllowed(std::string channelName);
private:
// Variables used for histo filling
Bool_t InitWorkspace();
Bool_t InitContainers();
Bool_t IsChannelNameAllowed(std::string channelName);
Bool_t ReInitContainers();
Bool_t InitWorkspace();
Bool_t InitContainers();
uint64_t fNumMessages;
std::vector<std::string> fAllowedChannels = {"tofcomponent","parameters","etofevts","tofhits","syscmd"};
Bool_t ReInitContainers();
// Input variables
uint64_t fNumMessages;
std::vector<std::string> fAllowedChannels = {"tofcomponent", "parameters", "etofevts", "tofhits", "syscmd"};
// Output variables
// Input variables
// Constants or setting parameters
Int_t fiMsgCnt;
/// Control flags
Bool_t fbMonitorMode; //! Switch ON the filling of a minimal set of histograms
Bool_t fbDebugMonitorMode; //! Switch ON the filling of a additional set of histograms
Bool_t fbSandboxMode; //! Switch OFF the emission of data toward the STAR DAQ
Bool_t fbEventDumpEna; //! Switch ON the dumping of the events to a binary file
Double_t fdEvent;
// Output variables
// histograms
// Constants or setting parameters
Int_t fiMsgCnt;
/// Control flags
Bool_t fbMonitorMode; //! Switch ON the filling of a minimal set of histograms
Bool_t fbDebugMonitorMode; //! Switch ON the filling of a additional set of histograms
Bool_t fbSandboxMode; //! Switch OFF the emission of data toward the STAR DAQ
Bool_t fbEventDumpEna; //! Switch ON the dumping of the events to a binary file
};
Double_t fdEvent;
// special class to expose protected TMessage constructor
class CbmMQTMessage : public TMessage
{
public:
CbmMQTMessage(void* buf, Int_t len)
: TMessage(buf, len)
{
ResetBit(kIsOwner);
}
// histograms
};
#endif /* CBMDEVICETRIGGERHANDLERETOF_H_ */
#include "runFairMQDevice.h"
/* Copyright (C) 2019 PI-UHd, GSI
SPDX-License-Identifier: GPL-3.0-only
Authors: Norbert Herrmann [committer] */
#include "CbmDeviceEventBuilderEtofStar2019.h"
#include <string>
#include <iomanip>
#include <string>
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
using namespace std;
void addCustomOptions(bpo::options_description& options)
{
;
}
void addCustomOptions(bpo::options_description& options) { ; }
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new CbmDeviceEventBuilderEtofStar2019();
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDeviceEventBuilderEtofStar2019(); }
#include "runFairMQDevice.h"
/* Copyright (C) 2019 PI-UHd, GSI
SPDX-License-Identifier: GPL-3.0-only
Authors: Norbert Herrmann [committer] */
#include "CbmDeviceTriggerHandlerEtof.h"
#include <string>
#include <iomanip>
#include <string>
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
using namespace std;
void addCustomOptions(bpo::options_description& options)
{
options.add_options() ("SandboxMode", bpo::value<bool>()->default_value(1),"Test mode switch");
options.add_options()("SandboxMode", bpo::value<bool>()->default_value(1), "Test mode switch");
;
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new CbmDeviceTriggerHandlerEtof();
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDeviceTriggerHandlerEtof(); }
#!/bin/bash
# Copyright (C) 2019 PI-UHd,GSI
# SPDX-License-Identifier: GPL-3.0-only
# First commited by Norbert Herrmann
# script to write cosmic data to file
$FAIRROOTPATH/bin/shmmonitor --cleanup
......
......@@ -6,88 +6,49 @@
# copied verbatim in the file "LICENSE" #
################################################################################
Set(INCLUDE_DIRECTORIES
set(INCLUDE_DIRECTORIES
${CMAKE_CURRENT_SOURCE_DIR}
${CBMROOT_SOURCE_DIR}/fles/flestools
)
Set(SYSTEM_INCLUDE_DIRECTORIES
${SYSTEM_INCLUDE_DIRECTORIES}
${Boost_INCLUDE_DIR}
${FAIRROOT_INCLUDE_DIR}
${FAIRMQ_INCLUDE_DIR}
${FAIRMQ_INCLUDE_DIR}/options
)
Include_Directories(${INCLUDE_DIRECTORIES})
Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})
Set(LINK_DIRECTORIES
${Boost_LIBRARY_DIRS}
${FAIRROOT_LIBRARY_DIR}
${ROOT_LIBRARY_DIR}
)
Link_Directories(${LINK_DIRECTORIES})
set(FAIR_LIBS
FairMQ
)
If(FAIRLOGGER_FOUND)
set(FAIR_LIBS
${FAIR_LIBS}
FairLogger
)
EndIf()
Set(BOOST_LIBS
${Boost_SYSTEM_LIBRARY}
${Boost_PROGRAM_OPTIONS_LIBRARY}
)
If(UNIX AND NOT APPLE)
List(APPEND BOOST_LIBS pthread)
EndIf()
If(FairRoot_VERSION VERSION_LESS 18.2.0)
Add_Definitions(-DHAVE_RootDeserializer)
EndIf()
# Set the install path within the build directory
set(EXECUTABLE_OUTPUT_PATH "${EXECUTABLE_OUTPUT_PATH}/MQ/histogramServer")
# Set the install path within the installation directory
set(BIN_DESTINATION bin/MQ/histogramServer)
set(DEPENDENCIES_ALL
${DEPENDENCIES}
${FAIR_LIBS}
${BOOST_LIBS}
)
set(PUBLIC_DEPS
ROOT::Core
ROOT::RHTTP
)
set(PRIVATE_DEPS
CbmFlibFlesTools
FairRoot::Base
FairMQ::FairMQ
ROOT::Gpad
ROOT::Hist
ROOT::Net
ROOT::RIO
)
set(EXE_NAME HistoServer)
set(SRCS CbmHistoServer.cxx runCbmHistoServer.cxx)
set(DEPENDENCIES
${DEPENDENCIES_ALL}
${FAIR_LIBS}
fles_ipc
Core
RIO
Net
Hist
RHTTP
)
GENERATE_EXECUTABLE()
set(EXECUTABLE_OUTPUT_PATH "${EXECUTABLE_OUTPUT_PATH}/MQ/histogramServer")
set(PUBLIC_DEPENDENCIES ${PUBLIC_DEPS})
set(PRIVATE_DEPENDENCIES ${PRIVATE_DEPS})
set(INTERFACE_DEPENDENCIES ${INTERFACE_DEPS})
generate_cbm_executable()
set(EXE_NAME MqHistoServer)
set(SRCS CbmMqHistoServer.cxx runCbmMqHistoServer.cxx)
set(DEPENDENCIES
${DEPENDENCIES_ALL}
${FAIR_LIBS}
CbmFlibFlesTools
Core
RIO
Net
Hist
Gpad
RHTTP
)
GENERATE_EXECUTABLE()
set(PUBLIC_DEPENDENCIES ${PUBLIC_DEPS})
set(PRIVATE_DEPENDENCIES ${PRIVATE_DEPS})
set(INTERFACE_DEPENDENCIES ${INTERFACE_DEPS})
generate_cbm_executable()
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <mutex>
/* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Florian Uhlig [committer] */
#include "CbmHistoServer.h"
#include <mutex>
//#include "CbmHistoCanvasDrawer.h"
#include "FairLogger.h"
#include "RootSerializer.h"
#include <Logger.h>
#include "TObjArray.h"
#include "TH1.h"
#include "TMessage.h"
#include "THttpServer.h"
#include "TMessage.h"
#include "TObjArray.h"
#include "RootSerializer.h"
std::mutex mtx;
CbmHistoServer::CbmHistoServer()
: FairMQDevice()
, fInputChannelName("histogram-in")
, fArrayHisto()
, fNMessages(0)
, fServer("http:8088")
// , fCanvasDrawer(nullptr)
, fStopThread(false)
: FairMQDevice()
, fInputChannelName("histogram-in")
, fArrayHisto()
, fNMessages(0)
, fServer("http:8088")
// , fCanvasDrawer(nullptr)
, fStopThread(false)
{
}
......@@ -34,9 +32,9 @@ CbmHistoServer::~CbmHistoServer() {}
void CbmHistoServer::InitTask()
{
OnData(fInputChannelName, &CbmHistoServer::ReceiveData);
OnData(fInputChannelName, &CbmHistoServer::ReceiveData);
/*
/*
if (fCanvasDrawer)
{
fCanvasDrawer->CreateCanvases(fServer);
......@@ -46,86 +44,73 @@ void CbmHistoServer::InitTask()
bool CbmHistoServer::ReceiveData(FairMQMessagePtr& msg, int /*index*/)
{
TObject* tempObject = nullptr;
#ifdef HAVE_RootDeserializer
Deserialize<RootDeserializer>(*msg, tempObject);
#else
Deserialize<RootSerializer>(*msg, tempObject);
#endif
if (TString(tempObject->ClassName()).EqualTo("TObjArray"))
{
std::lock_guard<std::mutex> lk(mtx);
TObjArray* arrayHisto = static_cast<TObjArray*>(tempObject);
TH1* histogram_new;
TH1* histogram_existing;
for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++)
{
TObject* obj = arrayHisto->At(i);
TH1* histogram = static_cast<TH1*>(obj);
int index1 = FindHistogram(histogram->GetName());
if (-1 == index1)
{
histogram_new = static_cast<TH1*>(histogram->Clone());
fArrayHisto.Add(histogram_new);
fServer.Register("Histograms", histogram_new);
}
else
{
histogram_existing = static_cast<TH1*>(fArrayHisto.At(index1));
histogram_existing->Add(histogram);
}
}
arrayHisto->Clear();
TObject* tempObject = nullptr;
RootSerializer().Deserialize(*msg, tempObject);
if (TString(tempObject->ClassName()).EqualTo("TObjArray")) {
std::lock_guard<std::mutex> lk(mtx);
TObjArray* arrayHisto = static_cast<TObjArray*>(tempObject);
TH1* histogram_new;
TH1* histogram_existing;
for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++) {
TObject* obj = arrayHisto->At(i);
TH1* histogram = static_cast<TH1*>(obj);
int index1 = FindHistogram(histogram->GetName());
if (-1 == index1) {
histogram_new = static_cast<TH1*>(histogram->Clone());
fArrayHisto.Add(histogram_new);
fServer.Register("Histograms", histogram_new);
}
else {
histogram_existing = static_cast<TH1*>(fArrayHisto.At(index1));
histogram_existing->Add(histogram);
}
}
fNMessages += 1;
arrayHisto->Clear();
}
delete tempObject;
fNMessages += 1;
return true;
delete tempObject;
return true;
}
void CbmHistoServer::PreRun()
{
fStopThread = false;
fThread = std::thread(&CbmHistoServer::UpdateHttpServer, this);
fStopThread = false;
fThread = std::thread(&CbmHistoServer::UpdateHttpServer, this);
}
void CbmHistoServer::UpdateHttpServer()
{
while (!fStopThread)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::lock_guard<std::mutex> lk(mtx);
while (!fStopThread) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::lock_guard<std::mutex> lk(mtx);
/*
/*
if (fCanvasDrawer)
{
fCanvasDrawer->DrawHistograms(fArrayHisto);
}
*/
fServer.ProcessRequests();
}
fServer.ProcessRequests();
}
}
void CbmHistoServer::PostRun()
{
fStopThread = true;
fThread.join();
fStopThread = true;
fThread.join();
}
int CbmHistoServer::FindHistogram(const std::string& name)
{
for (int i = 0; i < fArrayHisto.GetEntriesFast(); i++)
{
TObject* obj = fArrayHisto.At(i);
if (TString(obj->GetName()).EqualTo(name))
{
return i;
}
}
return -1;
for (int i = 0; i < fArrayHisto.GetEntriesFast(); i++) {
TObject* obj = fArrayHisto.At(i);
if (TString(obj->GetName()).EqualTo(name)) { return i; }
}
return -1;
}
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Florian Uhlig [committer] */
#ifndef FAIRMQEXHISTOSERVER
#define FAIRMQEXHISTOSERVER
......@@ -12,55 +9,53 @@
#include "THttpServer.h"
#include "TObjArray.h"
#include <thread>
#include <string>
#include <memory>
#include <string>
//class FairMQExHistoCanvasDrawer;
class CbmHistoServer : public FairMQDevice
{
public:
CbmHistoServer();
class CbmHistoServer : public FairMQDevice {
public:
CbmHistoServer();
virtual ~CbmHistoServer();
virtual ~CbmHistoServer();
void UpdateHttpServer();
void UpdateHttpServer();
/*
/*
void SetCanvasDrawer(std::unique_ptr<FairMQExHistoCanvasDrawer> canvasDrawer)
{
fCanvasDrawer = std::move(canvasDrawer);
}
*/
protected:
virtual void InitTask();
bool ReceiveData(FairMQMessagePtr& msg, int index);
protected:
virtual void InitTask();
virtual void PreRun();
bool ReceiveData(FairMQMessagePtr& msg, int index);
virtual void PostRun();
virtual void PreRun();
private:
std::string fInputChannelName;
virtual void PostRun();
TObjArray fArrayHisto;
private:
std::string fInputChannelName;
int fNMessages;
TObjArray fArrayHisto;
THttpServer fServer;
int fNMessages;
// std::unique_ptr<FairMQExHistoCanvasDrawer> fCanvasDrawer;
THttpServer fServer;
std::thread fThread;
bool fStopThread;
// std::unique_ptr<FairMQExHistoCanvasDrawer> fCanvasDrawer;
int FindHistogram(const std::string& name);
std::thread fThread;
bool fStopThread;
int FindHistogram(const std::string& name);
};
#endif
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <mutex>
/* Copyright (C) 2019-2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
#include "CbmMqHistoServer.h"
#include "CbmFlesCanvasTools.h"
#include "FairLogger.h"
#include "RootSerializer.h"
#include "BoostSerializer.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairMQProgOptions.h" // device->fConfig
#include <Logger.h>
#include "TObjArray.h"
#include "TCanvas.h"
#include "TH2.h"
#include "TProfile.h"
#include "TEnv.h"
#include "TFile.h"
#include "TH1.h"
#include "TMessage.h"
#include "TH2.h"
#include "THttpServer.h"
#include "TFile.h"
#include "TMessage.h"
#include "TObjArray.h"
#include "TProfile.h"
#include "TRootSniffer.h"
#include "TSystem.h"
#include "BoostSerializer.h"
#include <boost/serialization/utility.hpp>
std::mutex mtx;
#include <mutex>
Bool_t bMqHistoServerResetHistos = kFALSE;
Bool_t bMqHistoServerSaveHistos = kFALSE;
#include "RootSerializer.h"
std::mutex mtx;
/*
Bool_t bMqHistoServerResetHistos = kFALSE;
Bool_t bMqHistoServerSaveHistos = kFALSE;
*/
CbmMqHistoServer::CbmMqHistoServer()
: FairMQDevice()
, fsChannelNameHistosInput("histogram-in")
, fsChannelNameHistosConfig("histo-conf")
, fsChannelNameCanvasConfig("canvas-conf")
, fsHistoFileName( "HistosMonitorPulser.root" )
, fuHttpServerPort( 8098 )
, fArrayHisto()
, fvpsHistosFolder()
, fvpsCanvasConfig()
, fvHistos()
, fvCanvas()
, fNMessages(0)
, fServer( nullptr )
, fStopThread(false)
: FairMQDevice()
, fArrayHisto()
{
}
......@@ -55,418 +43,460 @@ CbmMqHistoServer::~CbmMqHistoServer() {}
void CbmMqHistoServer::InitTask()
{
/// Read options from executable
LOG(info) << "Init options for CbmMqHistoServer.";
fsChannelNameHistosInput = fConfig->GetValue< std::string >( "ChNameIn" );
fsChannelNameHistosConfig = fConfig->GetValue< std::string >( "ChNameHistCfg" );
fsChannelNameCanvasConfig = fConfig->GetValue< std::string >( "ChNameCanvCfg" );
fsHistoFileName = fConfig->GetValue< std::string >( "HistoFileName" );
fuHttpServerPort = fConfig->GetValue< uint32_t >( "histport" );
/// Link channels to methods in order to process received messages
OnData(fsChannelNameHistosInput, &CbmMqHistoServer::ReceiveData);
OnData(fsChannelNameHistosConfig, &CbmMqHistoServer::ReceiveHistoConfig);
OnData(fsChannelNameCanvasConfig, &CbmMqHistoServer::ReceiveCanvasConfig);
fServer = new THttpServer( Form( "http:%u", fuHttpServerPort ) );
/// To avoid the server sucking all Histos from gROOT when no output file is used
fServer->GetSniffer()->SetScanGlobalDir(kFALSE);
fServer->RegisterCommand("/Reset_Hist", "bMqHistoServerResetHistos=kTRUE");
fServer->RegisterCommand("/Save_Hist", "bMqHistoServerSaveHistos=kTRUE");
fServer->Restrict("/Reset_Moni_Hist", "allow=admin");
fServer->Restrict("/Save_Pulser_Hist", "allow=admin");
/// Read options from executable
LOG(info) << "Init options for CbmMqHistoServer.";
fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg");
fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg");
fsHistoFileName = fConfig->GetValue<std::string>("HistoFileName");
fuHttpServerPort = fConfig->GetValue<uint32_t>("histport");
/// Link channels to methods in order to process received messages
OnData(fsChannelNameHistosInput, &CbmMqHistoServer::ReceiveData);
OnData(fsChannelNameHistosConfig, &CbmMqHistoServer::ReceiveHistoConfig);
OnData(fsChannelNameCanvasConfig, &CbmMqHistoServer::ReceiveCanvasConfig);
/// If multi-parts, go to method processing combined Config+Data
OnData(fsChannelNameHistosInput, &CbmMqHistoServer::ReceiveConfigAndData);
fServer = new THttpServer(Form("http:%u", fuHttpServerPort));
/// To avoid the server sucking all Histos from gROOT when no output file is used
fServer->GetSniffer()->SetScanGlobalDir(kFALSE);
const char* jsrootsys = gSystem->Getenv("JSROOTSYS");
if (!jsrootsys) jsrootsys = gEnv->GetValue("HttpServ.JSRootPath", jsrootsys);
LOG(info) << "JSROOT location: " << jsrootsys;
//fServer->RegisterCommand("/Reset_Hist", "bMqHistoServerResetHistos=kTRUE");
//fServer->RegisterCommand("/Save_Hist", "bMqHistoServerSaveHistos=kTRUE");
//fServer->Restrict("/Reset_Hist", "allow=admin");
//fServer->Restrict("/Save_Hist", "allow=admin");
}
bool CbmMqHistoServer::ReceiveData(FairMQMessagePtr& msg, int /*index*/)
{
TObject* tempObject = nullptr;
Deserialize<RootSerializer>(*msg, tempObject);
if (TString(tempObject->ClassName()).EqualTo("TObjArray"))
{
std::lock_guard<std::mutex> lk(mtx);
TObjArray* arrayHisto = static_cast< TObjArray * >( tempObject );
for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++)
{
TObject* pObj = arrayHisto->At(i);
if( nullptr != dynamic_cast< TProfile *>( pObj ) )
{
if( !ReadHistogram< TProfile >( dynamic_cast< TProfile *>( pObj ) ) )
return false;
} // if( nullptr != dynamic_cast< TProfile *>( pObj ) )
else if( nullptr != dynamic_cast< TH2 *>( pObj ) )
{
if( !ReadHistogram< TH2 >( dynamic_cast< TH2 *>( pObj ) ) )
return false;
} // if( nullptr != dynamic_cast< TH2 *>( pObj ) )
else if( nullptr != dynamic_cast< TH1 *>( pObj ) )
{
if( !ReadHistogram< TH1 >( dynamic_cast< TH1 *>( pObj ) ) )
return false;
} // if( nullptr != dynamic_cast< TH1 *>( pObj ) )
else LOG( warning ) << "Unsupported object type for " << pObj->GetName();
} // for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++)
/// Need to use Delete instead of Clear to avoid memory leak!!!
arrayHisto->Delete();
/// If new histos received, try to prepare as many canvases as possible
/// Should be expensive on start and cheap afterward
if( !fbAllCanvasReady )
{
for( uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv )
{
/// Jump canvases already ready
if( fvbCanvasReady[ uCanv ] )
continue;
/// Now come the expensive part as we unpack its config and check each histo
fvbCanvasReady[ uCanv ] = PrepareCanvas( uCanv );
} // for( uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv )
} // if( !fbAllCanvasReady )
} // if (TString(tempObject->ClassName()).EqualTo("TObjArray"))
else LOG(fatal) << "CbmMqHistoServer::ReceiveData => Wrong object type at input: "
<< tempObject->ClassName();
fNMessages += 1;
if( nullptr != tempObject )
delete tempObject;
/// TODO: control flags communication with histo server
/// Idea: 1 req channel (per device or not mixup?), polling every N TS and/or M s
if( bMqHistoServerResetHistos )
{
std::lock_guard<std::mutex> lk(mtx);
// LOG(info) << "Reset Monitor histos ";
ResetHistograms();
bMqHistoServerResetHistos = kFALSE;
} // if( bMqHistoServerResetHistos )
if( bMqHistoServerSaveHistos )
{
std::lock_guard<std::mutex> lk(mtx);
// LOG(info) << "Save All histos & canvases";
SaveHistograms();
bMqHistoServerSaveHistos = kFALSE;
} // if( bMqHistoServerSaveHistos )
return true;
LOG(debug) << "CbmMqHistoServer::ReceiveData => Processing histograms update";
TObject* tempObject = nullptr;
// Deserialize<RootSerializer>(*msg, tempObject);
RootSerializer().Deserialize(*msg, tempObject);
if (TString(tempObject->ClassName()).EqualTo("TObjArray")) {
std::lock_guard<std::mutex> lk(mtx);
TObjArray* arrayHisto = static_cast<TObjArray*>(tempObject);
for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++) {
TObject* pObj = arrayHisto->At(i);
if (nullptr != dynamic_cast<TProfile*>(pObj)) {
if (!ReadHistogram<TProfile>(dynamic_cast<TProfile*>(pObj))) return false;
} // if( nullptr != dynamic_cast< TProfile *>( pObj ) )
else if (nullptr != dynamic_cast<TH2*>(pObj)) {
if (!ReadHistogram<TH2>(dynamic_cast<TH2*>(pObj))) return false;
} // if( nullptr != dynamic_cast< TH2 *>( pObj ) )
else if (nullptr != dynamic_cast<TH1*>(pObj)) {
if (!ReadHistogram<TH1>(dynamic_cast<TH1*>(pObj))) return false;
} // if( nullptr != dynamic_cast< TH1 *>( pObj ) )
else
LOG(warning) << "Unsupported object type for " << pObj->GetName();
} // for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++)
LOG(debug) << "CbmMqHistoServer::ReceiveData => Deleting array";
/// Need to use Delete instead of Clear to avoid memory leak!!!
arrayHisto->Delete();
/// If new histos received, try to prepare as many canvases as possible
/// Should be expensive on start and cheap afterward
if (!fbAllCanvasReady) {
LOG(debug) << "CbmMqHistoServer::ReceiveData => Checking for canvases updates";
for (uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
/// Jump canvases already ready
if (fvbCanvasReady[uCanv]) continue;
/// Now come the expensive part as we unpack its config and check each histo
fvbCanvasReady[uCanv] = PrepareCanvas(uCanv);
} // for( uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv )
} // if( !fbAllCanvasReady )
} // if (TString(tempObject->ClassName()).EqualTo("TObjArray"))
else
LOG(fatal) << "CbmMqHistoServer::ReceiveData => Wrong object type at input: " << tempObject->ClassName();
fNMessages += 1;
if (nullptr != tempObject) delete tempObject;
/*
/// TODO: control flags communication with histo server
/// Idea: 1 req channel (per device or not mixup?), polling every N TS and/or M s
if (bMqHistoServerResetHistos) {
std::lock_guard<std::mutex> lk(mtx);
// LOG(info) << "Reset Monitor histos ";
ResetHistograms();
bMqHistoServerResetHistos = kFALSE;
} // if( bMqHistoServerResetHistos )
if (bMqHistoServerSaveHistos) {
std::lock_guard<std::mutex> lk(mtx);
// LOG(info) << "Save All histos & canvases";
SaveHistograms();
bMqHistoServerSaveHistos = kFALSE;
} // if( bMqHistoServerSaveHistos )
*/
LOG(debug) << "CbmMqHistoServer::ReceiveData => Finished processing histograms update";
return true;
}
bool CbmMqHistoServer::ReceiveHistoConfig(FairMQMessagePtr& msg, int /*index*/)
{
std::pair< std::string, std::string > tempObject;
Deserialize< BoostSerializer< std::pair< std::string, std::string > > >(*msg, tempObject);
LOG( info ) << " Received configuration for histo " << tempObject.first
<< " : " << tempObject.second;
/// Check if histo name already received in previous messages
/// Linear search should be ok as config is shared only at startup
UInt_t uPrevHist = 0;
for( uPrevHist = 0; uPrevHist < fvpsHistosFolder.size(); ++uPrevHist )
{
if( fvpsHistosFolder[ uPrevHist ].first == tempObject.first )
break;
} // for( UInt_t uPrevHist = 0; uPrevHist < fvpsHistosFolder.size(); ++uPrevHist )
if( uPrevHist < fvpsHistosFolder.size() )
{
LOG( info ) << " Ignored new configuration for histo " << tempObject.first
<< " due to previously received one: " << tempObject.second;
/// Not sure if we should return false here...
} // if( uPrevHist < fvpsHistosFolder.size() )
else
{
fvpsHistosFolder.push_back( tempObject );
fvHistos.push_back( std::pair< TNamed *, std::string >( nullptr, "" ) );
fvbHistoRegistered.push_back( false );
fbAllHistosRegistered = false;
} // else of if( uPrevHist < fvpsHistosFolder.size() )
return true;
std::pair<std::string, std::string> tempObject;
// Deserialize<BoostSerializer<std::pair<std::string, std::string>>>(*msg, tempObject);
BoostSerializer<std::pair<std::string, std::string>>().Deserialize(*msg, tempObject);
LOG(info) << " Received configuration for histo " << tempObject.first << " : " << tempObject.second;
/// Check if histo name already received in previous messages
/// Linear search should be ok as config is shared only at startup
UInt_t uPrevHist = 0;
for (uPrevHist = 0; uPrevHist < fvpsHistosFolder.size(); ++uPrevHist) {
if (fvpsHistosFolder[uPrevHist].first == tempObject.first) break;
} // for( UInt_t uPrevHist = 0; uPrevHist < fvpsHistosFolder.size(); ++uPrevHist )
if (uPrevHist < fvpsHistosFolder.size()) {
LOG(info) << " Ignored new configuration for histo " << tempObject.first
<< " due to previously received one: " << tempObject.second;
/// Not sure if we should return false here...
} // if( uPrevHist < fvpsHistosFolder.size() )
else {
fvpsHistosFolder.push_back(tempObject);
fvHistos.push_back(std::pair<TNamed*, std::string>(nullptr, ""));
fvbHistoRegistered.push_back(false);
fbAllHistosRegistered = false;
} // else of if( uPrevHist < fvpsHistosFolder.size() )
return true;
}
bool CbmMqHistoServer::ReceiveCanvasConfig(FairMQMessagePtr& msg, int /*index*/)
{
std::pair< std::string, std::string > tempObject;
Deserialize< BoostSerializer< std::pair< std::string, std::string > > >(*msg, tempObject);
LOG( info ) << " Received configuration for canvas " << tempObject.first
<< " : " << tempObject.second;
/// Check if canvas name already received in previous messages
/// Linear search should be ok as config is shared only at startup
uint32_t uPrevCanv = 0;
for( uPrevCanv = 0; uPrevCanv < fvpsCanvasConfig.size(); ++uPrevCanv )
{
if( fvpsCanvasConfig[ uPrevCanv ].first == tempObject.first )
break;
} // for( UInt_t uPrevCanv = 0; uPrevCanv < fvpsCanvasConfig.size(); ++uPrevCanv )
if( uPrevCanv < fvpsCanvasConfig.size() )
{
LOG( warning ) << " Ignored new configuration for histo " << tempObject.first
<< " due to previously received one: " << tempObject.second;
/// Not sure if we should return false here...
} // if( uPrevCanv < fvpsCanvasConfig.size() )
else
{
fvpsCanvasConfig.push_back( tempObject );
fvbCanvasReady.push_back( false );
fbAllCanvasReady = false;
fvCanvas.push_back( std::pair< TCanvas *, std::string >( nullptr, "" ) );
fvbCanvasRegistered.push_back( false );
fbAllCanvasRegistered = false;
} // else of if( uPrevCanv < fvpsCanvasConfig.size() )
return true;
std::pair<std::string, std::string> tempObject;
// Deserialize<BoostSerializer<std::pair<std::string, std::string>>>(*msg, tempObject);
BoostSerializer<std::pair<std::string, std::string>>().Deserialize(*msg, tempObject);
LOG(info) << " Received configuration for canvas " << tempObject.first << " : " << tempObject.second;
/// Check if canvas name already received in previous messages
/// Linear search should be ok as config is shared only at startup
uint32_t uPrevCanv = 0;
for (uPrevCanv = 0; uPrevCanv < fvpsCanvasConfig.size(); ++uPrevCanv) {
if (fvpsCanvasConfig[uPrevCanv].first == tempObject.first) break;
} // for( UInt_t uPrevCanv = 0; uPrevCanv < fvpsCanvasConfig.size(); ++uPrevCanv )
if (uPrevCanv < fvpsCanvasConfig.size()) {
LOG(warning) << " Ignored new configuration for Canvas " << tempObject.first
<< " due to previously received one: " << tempObject.second;
/// Not sure if we should return false here...
} // if( uPrevCanv < fvpsCanvasConfig.size() )
else {
fvpsCanvasConfig.push_back(tempObject);
fvbCanvasReady.push_back(false);
fbAllCanvasReady = false;
fvCanvas.push_back(std::pair<TCanvas*, std::string>(nullptr, ""));
fvbCanvasRegistered.push_back(false);
fbAllCanvasRegistered = false;
} // else of if( uPrevCanv < fvpsCanvasConfig.size() )
return true;
}
bool CbmMqHistoServer::ReceiveConfigAndData(FairMQParts& parts, int /*index*/)
{
/// Reject anything but a at least Header + Histo Config + Canvas Config + Histo Data
if (parts.Size() < 4) {
if (1 == parts.Size()) {
/// PAL, 09/04/2021, Debug message catching missed method overload/polymorphism:
/// contrary to my expectation, if 2 method bound to same channel, one with FairMQMessagePtr and one with
/// FairMQParts, all messages go to multipart version and FairMQMessagePtr is converted to size 1 FairMQParts
LOG(debug) << "CbmMqHistoServer::ReceiveConfigAndData => only 1 parts found in input, "
<< "assuming data only message routed to wrong method!";
return ReceiveData(parts.At(0), 0);
} // if( 1 == parts.Size() )
LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => Wrong number of parts: " << parts.Size()
<< " instead of at least 4 (Header + Histo Config + Canvas config + Data)!";
} // if( parts.Size() < 4 )
LOG(info) << "CbmMqHistoServer::ReceiveConfigAndData => Received composed message with " << parts.Size() << " parts";
/// Header contains a pair of
std::pair<uint32_t, uint32_t> pairHeader;
// Deserialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*parts.At(0), pairHeader);
BoostSerializer<std::pair<uint32_t, uint32_t>>().Deserialize(*parts.At(0), pairHeader);
LOG(info) << "CbmMqHistoServer::ReceiveConfigAndData => Received configuration for " << pairHeader.first
<< " histos and " << pairHeader.second << " canvases";
uint32_t uOffsetHistoConfig = pairHeader.first;
if (0 == pairHeader.first) {
uOffsetHistoConfig = 1;
if (0 < (parts.At(uOffsetHistoConfig))->GetSize()) {
LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => No histo config expected but corresponding message is"
<< " not empty: " << (parts.At(uOffsetHistoConfig))->GetSize();
}
}
uint32_t uOffsetCanvasConfig = pairHeader.second;
if (0 == pairHeader.second) {
uOffsetCanvasConfig = 1;
if (0 < (parts.At(uOffsetHistoConfig + uOffsetCanvasConfig))->GetSize()) {
LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => No Canvas config expected but corresponding message is"
<< " not empty: " << (parts.At(uOffsetHistoConfig + uOffsetCanvasConfig))->GetSize();
}
}
if (static_cast<size_t>(parts.Size()) != 1 + uOffsetHistoConfig + uOffsetCanvasConfig + 1) {
LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => Number of parts not matching header: " << parts.Size()
<< " instead of " << 1 + uOffsetHistoConfig + uOffsetCanvasConfig + 1;
} // if( parts.Size() != 1 + pairHeader.first + pairHeader.second )
/// Decode parts for histograms configuration
for (uint32_t uHisto = 0; uHisto < pairHeader.first; ++uHisto) {
ReceiveHistoConfig(parts.At(1 + uHisto), 0);
} // for (UInt_t uHisto = 0; uHisto < pairHeader.first; ++uHisto)
/// Decode parts for histograms configuration
for (uint32_t uCanv = 0; uCanv < pairHeader.second; ++uCanv) {
ReceiveCanvasConfig(parts.At(1 + uOffsetHistoConfig + uCanv), 0);
} // for (UInt_t uCanv = 0; uCanv < pairHeader.second; ++uCanv)
/// Decode the histograms data now that the configuration is loaded
ReceiveData(parts.At(1 + uOffsetHistoConfig + uOffsetCanvasConfig), 0);
LOG(info) << "CbmMqHistoServer::ReceiveConfigAndData => Finished processing composed message with " << parts.Size()
<< " parts";
return true;
}
void CbmMqHistoServer::PreRun()
{
fStopThread = false;
fThread = std::thread(&CbmMqHistoServer::UpdateHttpServer, this);
fStopThread = false;
fThread = std::thread(&CbmMqHistoServer::UpdateHttpServer, this);
}
void CbmMqHistoServer::UpdateHttpServer()
{
while (!fStopThread)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::lock_guard<std::mutex> lk(mtx);
while (!fStopThread) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::lock_guard<std::mutex> lk(mtx);
fServer->ProcessRequests();
}
fServer->ProcessRequests();
}
}
void CbmMqHistoServer::PostRun()
{
fStopThread = true;
fThread.join();
SaveHistograms();
fStopThread = true;
fThread.join();
SaveHistograms();
}
template< class HistoT > bool CbmMqHistoServer::ReadHistogram( HistoT * pHist )
template<class HistoT>
bool CbmMqHistoServer::ReadHistogram(HistoT* pHist)
{
int index1 = FindHistogram( pHist->GetName() );
if (-1 == index1)
{
HistoT * histogram_new = static_cast< HistoT * >( pHist->Clone() );
fArrayHisto.Add( histogram_new );
LOG(info) << "Received new histo " << pHist->GetName();
/// If new histo received, try to register it if configuration available
if( !fbAllHistosRegistered )
{
for( uint32_t uHist = 0; uHist < fvpsHistosFolder.size(); ++uHist )
{
/// Jump histos already ready
if( fvbHistoRegistered[ uHist ] )
continue;
/// Check if name matches one in config for others
if( fvpsHistosFolder[ uHist ].first == histogram_new->GetName() )
{
fvHistos[ uHist ] = std::pair< TNamed *, std::string >( histogram_new, fvpsHistosFolder[ uHist ].second );
fServer->Register( Form( "/%s", fvHistos[ uHist ].second.data() ), fvHistos[ uHist ].first );
fvbHistoRegistered[ uHist ] = true;
LOG(info) << "registered histo " << fvHistos[ uHist ].first->GetName()
<< " in folder " << fvHistos[ uHist ].second;
/// Update flag telling whether all known histos are registered
fbAllHistosRegistered = true;
for( uint32_t uIdx = 0; uIdx < fvbHistoRegistered.size(); ++uIdx )
{
if( !fvbHistoRegistered[ uIdx ] )
{
fbAllHistosRegistered = false;
break;
} // if( !fvbHistoRegistered[ uIdx ] )
} // for( uint32_t uIdx = 0; uIdx < fvbHistoRegistered.size(); ++uIdx )
break;
} // if( fvpsHistosFolder[ uHist ].first == histogram_new->GetName() )
} // for( uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv )
} // if( !fbAllCanvasReady )
} // if (-1 == index1)
else
{
HistoT * histogram_existing = dynamic_cast< HistoT *>( fArrayHisto.At( index1 ) );
if( nullptr == histogram_existing )
{
LOG( error ) << "CbmMqHistoServer::ReadHistogram => "
<< "Incompatible type found during update for histo "
<< pHist->GetName();
return false;
} // if( nullptr == histogram_existing )
histogram_existing->Add( pHist );
} // else of if (-1 == index1)
return true;
int index1 = FindHistogram(pHist->GetName());
if (-1 == index1) {
HistoT* histogram_new = static_cast<HistoT*>(pHist->Clone());
fArrayHisto.Add(histogram_new);
LOG(info) << "Received new histo " << pHist->GetName();
/// If new histo received, try to register it if configuration available
if (!fbAllHistosRegistered) {
for (uint32_t uHist = 0; uHist < fvpsHistosFolder.size(); ++uHist) {
/// Jump histos already ready
if (fvbHistoRegistered[uHist]) continue;
/// Check if name matches one in config for others
if (fvpsHistosFolder[uHist].first == histogram_new->GetName()) {
fvHistos[uHist] = std::pair<TNamed*, std::string>(histogram_new, fvpsHistosFolder[uHist].second);
fServer->Register(Form("/%s", fvHistos[uHist].second.data()), fvHistos[uHist].first);
fvbHistoRegistered[uHist] = true;
LOG(info) << "registered histo " << fvHistos[uHist].first->GetName() << " in folder "
<< fvHistos[uHist].second;
/// Update flag telling whether all known histos are registered
fbAllHistosRegistered = true;
for (uint32_t uIdx = 0; uIdx < fvbHistoRegistered.size(); ++uIdx) {
if (!fvbHistoRegistered[uIdx]) {
fbAllHistosRegistered = false;
break;
} // if( !fvbHistoRegistered[ uIdx ] )
} // for( uint32_t uIdx = 0; uIdx < fvbHistoRegistered.size(); ++uIdx )
break;
} // if( fvpsHistosFolder[ uHist ].first == histogram_new->GetName() )
} // for( uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv )
} // if( !fbAllCanvasReady )
} // if (-1 == index1)
else {
HistoT* histogram_existing = dynamic_cast<HistoT*>(fArrayHisto.At(index1));
if (nullptr == histogram_existing) {
LOG(error) << "CbmMqHistoServer::ReadHistogram => "
<< "Incompatible type found during update for histo " << pHist->GetName();
return false;
} // if( nullptr == histogram_existing )
histogram_existing->Add(pHist);
} // else of if (-1 == index1)
return true;
}
int CbmMqHistoServer::FindHistogram( const std::string& name )
int CbmMqHistoServer::FindHistogram(const std::string& name)
{
for( int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist )
{
TObject* obj = fArrayHisto.At( iHist );
if( TString( obj->GetName() ).EqualTo( name ) )
{
return iHist;
} // if( TString( obj->GetName() ).EqualTo( name ) )
} // for( int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist )
return -1;
for (int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist) {
TObject* obj = fArrayHisto.At(iHist);
if (TString(obj->GetName()).EqualTo(name)) { return iHist; } // if( TString( obj->GetName() ).EqualTo( name ) )
} // for( int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist )
return -1;
}
bool CbmMqHistoServer::ResetHistograms()
{
for( int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist )
{
dynamic_cast< TH1 * >(fArrayHisto.At( iHist ))->Reset();
} // for( int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist )
return true;
for (int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist) {
dynamic_cast<TH1*>(fArrayHisto.At(iHist))->Reset();
} // for( int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist )
return true;
}
bool CbmMqHistoServer::PrepareCanvas( uint32_t uCanvIdx )
bool CbmMqHistoServer::PrepareCanvas(uint32_t uCanvIdx)
{
CanvasConfig conf( ExtractCanvasConfigFromString( fvpsCanvasConfig[ uCanvIdx ].second ) );
/// First check if all objects to be drawn are present
uint32_t uNbPads( conf.GetNbPads() );
for( uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx )
{
uint32_t uNbObj( conf.GetNbObjsInPad( uPadIdx ) );
for( uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx )
{
std::string sName( conf.GetObjName( uPadIdx, uObjIdx ) );
/// Check for empty pads!
if( "nullptr" != sName )
{
if( FindHistogram( sName ) < 0 )
{
return false;
} // if( FindHistogram( conf.GetObjName( uPadIdx, uObjIdx ) ) < 0 )
} // if( "nullptr" != sName )
} // for( uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx )
} // for( uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx )
/// Create new canvas and pads
TCanvas * pNewCanv = new TCanvas( conf.GetName().data(), conf.GetTitle().data() );
pNewCanv->Divide( conf.GetNbPadsX(), conf.GetNbPadsY() );
/// Loop on pads
for( uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx )
{
pNewCanv->cd( 1 + uPadIdx );
/// Pad settings
gPad->SetGrid( conf.GetGridx( uPadIdx ), conf.GetGridy( uPadIdx ) );
gPad->SetLogx( conf.GetLogx( uPadIdx ) );
gPad->SetLogy( conf.GetLogy( uPadIdx ) );
gPad->SetLogz( conf.GetLogz( uPadIdx ) );
/// Add objects (we know they are there
uint32_t uNbObj( conf.GetNbObjsInPad( uPadIdx ) );
for( uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx )
{
std::string sName( conf.GetObjName( uPadIdx, uObjIdx ) );
if( "nullptr" != sName )
{
TObject * pObj = fArrayHisto[ FindHistogram( sName ) ];
if( nullptr != dynamic_cast< TProfile *>( pObj ) )
{
dynamic_cast< TProfile *>( pObj )->Draw( conf.GetOption( uPadIdx, uObjIdx ).data() );
} // if( nullptr != dynamic_cast< TProfile *>( pObj ) )
else if( nullptr != dynamic_cast< TH2 *>( pObj ) )
{
dynamic_cast< TH2 *>( pObj )->Draw( conf.GetOption( uPadIdx, uObjIdx ).data() );
} // if( nullptr != dynamic_cast< TH2 *>( pObj ) )
else if( nullptr != dynamic_cast< TH1 *>( pObj ) )
{
dynamic_cast< TH1 *>( pObj )->Draw( conf.GetOption( uPadIdx, uObjIdx ).data() );
} // if( nullptr != dynamic_cast< TH1 *>( pObj ) )
else LOG( warning ) << "Unsupported object type for " << sName
<< " when preparing canvas " << conf.GetName();
} // if( "nullptr" != sName )
} // for( uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx )
} // for( uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx )
fvCanvas[ uCanvIdx ] = std::pair< TCanvas *, std::string >( pNewCanv, "canvases" );
fServer->Register( Form( "/%s", fvCanvas[ uCanvIdx ].second.data() ), fvCanvas[ uCanvIdx ].first );
fvbCanvasRegistered[ uCanvIdx ] = true;
LOG(info) << "registered canvas " << fvCanvas[ uCanvIdx ].first->GetName()
<< " in folder " << fvCanvas[ uCanvIdx ].second;
/// Update flag telling whether all known canvases are registered
fbAllCanvasRegistered = true;
for( uint32_t uIdx = 0; uIdx < fvbCanvasRegistered.size(); ++uIdx )
{
if( !fvbCanvasRegistered[ uIdx ] )
{
fbAllCanvasRegistered = false;
break;
} // if( !fvbCanvasRegistered[ uIdx ] )
} // for( uint32_t uIdx = 0; uIdx < fvbCanvasRegistered.size(); ++uIdx )
return true;
LOG(debug) << " Extracting configuration for canvas index " << uCanvIdx;
CanvasConfig conf(ExtractCanvasConfigFromString(fvpsCanvasConfig[uCanvIdx].second));
/// First check if all objects to be drawn are present
uint32_t uNbPads(conf.GetNbPads());
for (uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx) {
uint32_t uNbObj(conf.GetNbObjsInPad(uPadIdx));
for (uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx) {
std::string sName(conf.GetObjName(uPadIdx, uObjIdx));
/// Check for empty pads!
if ("nullptr" != sName) {
if (FindHistogram(sName) < 0) {
return false;
} // if( FindHistogram( conf.GetObjName( uPadIdx, uObjIdx ) ) < 0 )
} // if( "nullptr" != sName )
} // for( uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx )
} // for( uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx )
LOG(info) << " All histos found for canvas " << conf.GetName().data() << ", now preparing it";
/// Create new canvas and pads
TCanvas* pNewCanv = new TCanvas(conf.GetName().data(), conf.GetTitle().data());
pNewCanv->Divide(conf.GetNbPadsX(), conf.GetNbPadsY());
/// Loop on pads
for (uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx) {
pNewCanv->cd(1 + uPadIdx);
/// Pad settings
gPad->SetGrid(conf.GetGridx(uPadIdx), conf.GetGridy(uPadIdx));
gPad->SetLogx(conf.GetLogx(uPadIdx));
gPad->SetLogy(conf.GetLogy(uPadIdx));
gPad->SetLogz(conf.GetLogz(uPadIdx));
/// Add objects (we know they are there
uint32_t uNbObj(conf.GetNbObjsInPad(uPadIdx));
for (uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx) {
std::string sName(conf.GetObjName(uPadIdx, uObjIdx));
if ("nullptr" != sName) {
TObject* pObj = fArrayHisto[FindHistogram(sName)];
if (nullptr != dynamic_cast<TProfile*>(pObj)) {
dynamic_cast<TProfile*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data());
} // if( nullptr != dynamic_cast< TProfile *>( pObj ) )
else if (nullptr != dynamic_cast<TH2*>(pObj)) {
dynamic_cast<TH2*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data());
} // if( nullptr != dynamic_cast< TH2 *>( pObj ) )
else if (nullptr != dynamic_cast<TH1*>(pObj)) {
dynamic_cast<TH1*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data());
} // if( nullptr != dynamic_cast< TH1 *>( pObj ) )
else
LOG(warning) << " Unsupported object type for " << sName << " when preparing canvas " << conf.GetName();
LOG(info) << " Configured histo " << sName << " on pad " << (1 + uPadIdx) << " for canvas "
<< conf.GetName().data();
} // if( "nullptr" != sName )
} // for( uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx )
} // for( uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx )
fvCanvas[uCanvIdx] = std::pair<TCanvas*, std::string>(pNewCanv, "canvases");
fServer->Register(Form("/%s", fvCanvas[uCanvIdx].second.data()), fvCanvas[uCanvIdx].first);
fvbCanvasRegistered[uCanvIdx] = true;
LOG(info) << " Registered canvas " << fvCanvas[uCanvIdx].first->GetName() << " in folder "
<< fvCanvas[uCanvIdx].second;
/// Update flag telling whether all known canvases are registered
fbAllCanvasRegistered = true;
for (uint32_t uIdx = 0; uIdx < fvbCanvasRegistered.size(); ++uIdx) {
if (!fvbCanvasRegistered[uIdx]) {
fbAllCanvasRegistered = false;
break;
} // if( !fvbCanvasRegistered[ uIdx ] )
} // for( uint32_t uIdx = 0; uIdx < fvbCanvasRegistered.size(); ++uIdx )
return true;
}
bool CbmMqHistoServer::SaveHistograms()
{
/// (Re-)Create ROOT file to store the histos
TDirectory * oldDir = NULL;
TFile * histoFile = NULL;
// Store current directory position to allow restore later
oldDir = gDirectory;
// open separate histo file in recreate mode
histoFile = new TFile( fsHistoFileName.data(), "RECREATE");
if( nullptr == histoFile )
return false;
/// Save old global file and folder pointer to avoid messing with FairRoot
TFile* oldFile = gFile;
TDirectory* oldDir = gDirectory;
/// (Re-)Create ROOT file to store the histos
TFile* histoFile = nullptr;
// open separate histo file in recreate mode
histoFile = new TFile(fsHistoFileName.data(), "RECREATE");
LOG(info) << "Save Histos in file " << fsHistoFileName.data();
if (nullptr == histoFile) return false;
/// Register the histos in the HTTP server
for (UInt_t uHisto = 0; uHisto < fvHistos.size(); ++uHisto) {
/// Make sure we end up in chosen folder
TString sFolder = fvHistos[uHisto].second.data();
if (nullptr == gDirectory->Get(sFolder)) gDirectory->mkdir(sFolder);
gDirectory->cd(sFolder);
/// Register the histos in the HTTP server
for( UInt_t uHisto = 0; uHisto < fvHistos.size(); ++uHisto )
{
/// Make sure we end up in chosen folder
TString sFolder = fvHistos[ uHisto ].second.data();
if( nullptr == gDirectory->Get( sFolder ) )
gDirectory->mkdir( sFolder );
gDirectory->cd( sFolder );
/// Write plot
fvHistos[uHisto].first->Write();
/// Write plot
fvHistos[ uHisto ].first->Write();
histoFile->cd();
} // for( UInt_t uHisto = 0; uHisto < fvHistos.size(); ++uHisto )
histoFile->cd();
} // for( UInt_t uHisto = 0; uHisto < fvHistos.size(); ++uHisto )
for (UInt_t uCanvas = 0; uCanvas < fvCanvas.size(); ++uCanvas) {
/// Make sure we end up in chosen folder
TString sFolder = fvCanvas[uCanvas].second.data();
if (nullptr == gDirectory->Get(sFolder)) gDirectory->mkdir(sFolder);
gDirectory->cd(sFolder);
for( UInt_t uCanvas = 0; uCanvas < fvCanvas.size(); ++uCanvas )
{
/// Make sure we end up in chosen folder
TString sFolder = fvCanvas[ uCanvas ].second.data();
if( nullptr == gDirectory->Get( sFolder ) )
gDirectory->mkdir( sFolder );
gDirectory->cd( sFolder );
/// Write plot
fvCanvas[uCanvas].first->Write();
/// Write plot
fvCanvas[ uCanvas ].first->Write();
histoFile->cd();
} // for( UInt_t uHisto = 0; uHisto < fvCanvas.size(); ++uHisto )
histoFile->cd();
} // for( UInt_t uHisto = 0; uHisto < fvCanvas.size(); ++uHisto )
/// Restore old global file and folder pointer to avoid messing with FairRoot
gFile = oldFile;
gDirectory = oldDir;
// Restore original directory position
oldDir->cd();
histoFile->Close();
histoFile->Close();
return true;
return true;
}
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/* Copyright (C) 2019-2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
#ifndef CBMMQHISTOSERVER_H
#define CBMMQHISTOSERVER_H
......@@ -12,77 +9,78 @@
#include "THttpServer.h"
#include "TObjArray.h"
#include <thread>
#include <string>
#include <memory>
#include <string>
class TNamed;
class TCanvas;
class CbmMqHistoServer : public FairMQDevice
{
public:
CbmMqHistoServer();
class CbmMqHistoServer : public FairMQDevice {
public:
CbmMqHistoServer();
virtual ~CbmMqHistoServer();
virtual ~CbmMqHistoServer();
void UpdateHttpServer();
void UpdateHttpServer();
protected:
virtual void InitTask();
protected:
virtual void InitTask();
bool ReceiveData(FairMQMessagePtr& msg, int index);
bool ReceiveData(FairMQMessagePtr& msg, int index);
bool ReceiveHistoConfig(FairMQMessagePtr& msg, int index);
bool ReceiveHistoConfig(FairMQMessagePtr& msg, int index);
bool ReceiveCanvasConfig(FairMQMessagePtr& msg, int index);
bool ReceiveCanvasConfig(FairMQMessagePtr& msg, int index);
virtual void PreRun();
bool ReceiveConfigAndData(FairMQParts& msg, int index);
virtual void PostRun();
virtual void PreRun();
private:
/// Parameters
std::string fsChannelNameHistosInput;
std::string fsChannelNameHistosConfig;
std::string fsChannelNameCanvasConfig;
std::string fsHistoFileName;
uint32_t fuHttpServerPort;
virtual void PostRun();
/// Array of histograms with unique names
TObjArray fArrayHisto;
/// Vector of string with ( HistoName, FolderPath ) to send to the histogram server
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder;
/// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server
/// Format of Can config is "Name;Title;NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)"
/// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)"
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig;
std::vector< bool > fvbCanvasReady;
bool fbAllCanvasReady;
private:
/// Parameters
std::string fsChannelNameHistosInput = "histogram-in";
std::string fsChannelNameHistosConfig = "histo-conf";
std::string fsChannelNameCanvasConfig = "canvas-conf";
std::string fsHistoFileName = "MqHistos.root";
uint32_t fuHttpServerPort = 8098;
std::vector< std::pair< TNamed *, std::string > > fvHistos; //! Vector of Histos pointers and folder path
std::vector< bool > fvbHistoRegistered;
bool fbAllHistosRegistered;
std::vector< std::pair< TCanvas *, std::string > > fvCanvas; //! Vector of Canvas pointers and folder path
std::vector< bool > fvbCanvasRegistered;
bool fbAllCanvasRegistered;
/// Array of histograms with unique names
TObjArray fArrayHisto;
/// Vector of string with ( HistoName, FolderPath ) to send to the histogram server
std::vector<std::pair<std::string, std::string>> fvpsHistosFolder = {};
/// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server
/// Format of Can config is "Name;Title;NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)"
/// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)"
std::vector<std::pair<std::string, std::string>> fvpsCanvasConfig = {};
std::vector<bool> fvbCanvasReady = {};
bool fbAllCanvasReady = false;
/// Internal status
int fNMessages;
std::vector<std::pair<TNamed*, std::string>> fvHistos = {}; //! Vector of Histos pointers and folder path
std::vector<bool> fvbHistoRegistered = {};
bool fbAllHistosRegistered = false;
std::vector<std::pair<TCanvas*, std::string>> fvCanvas = {}; //! Vector of Canvas pointers and folder path
std::vector<bool> fvbCanvasRegistered = {};
bool fbAllCanvasRegistered = false;
THttpServer * fServer;
/// Internal status
int fNMessages = 0;
std::thread fThread;
bool fStopThread;
THttpServer* fServer = nullptr;
template <class HistoT > bool ReadHistogram( HistoT * pHist );
int FindHistogram(const std::string& name);
bool PrepareCanvas( uint32_t uCanvIdx );
std::thread fThread;
bool fStopThread = false;
bool ResetHistograms();
bool SaveHistograms();
template<class HistoT>
bool ReadHistogram(HistoT* pHist);
int FindHistogram(const std::string& name);
bool PrepareCanvas(uint32_t uCanvIdx);
bool ResetHistograms();
bool SaveHistograms();
};
#endif // CBMMQHISTOSERVER_H
#endif // CBMMQHISTOSERVER_H
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Florian Uhlig [committer] */
#include "CbmHistoServer.h"
#include <memory>
#include "runFairMQDevice.h"
#include "CbmHistoServer.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& /*options*/)
{
}
void addCustomOptions(bpo::options_description& /*options*/) {}
//std::unique_ptr<FairMQExHistoCanvasDrawer> getCanvasDrawer();
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
CbmHistoServer* histoServer = new CbmHistoServer();
CbmHistoServer* histoServer = new CbmHistoServer();
// histoServer->SetCanvasDrawer(getCanvasDrawer());
// histoServer->SetCanvasDrawer(getCanvasDrawer());
return histoServer;
return histoServer;
}
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
#include "CbmMqHistoServer.h"
#include <memory>
#include "runFairMQDevice.h"
#include "CbmMqHistoServer.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options() ( "ChNameIn", bpo::value< std::string >()->default_value( "histogram-in" ),
"MQ channel name for histos");
options.add_options() ( "ChNameHistCfg", bpo::value< std::string >()->default_value( "histo-conf" ),
"MQ channel name for histos config");
options.add_options() ( "ChNameCanvCfg", bpo::value< std::string >()->default_value( "canvas-conf" ),
"MQ channel name for canvases config");
options.add_options() ( "HistoFileName", bpo::value< std::string >()->default_value( "HistosMonitorPulser.root" ),
".root File name for histo saving");
options.add_options() ( "histport", bpo::value< uint32_t >()->default_value( 8080 ),
"port for histos http server");
options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"),
"MQ channel name for histos");
options.add_options()("ChNameHistCfg", bpo::value<std::string>()->default_value("histo-conf"),
"MQ channel name for histos config");
options.add_options()("ChNameCanvCfg", bpo::value<std::string>()->default_value("canvas-conf"),
"MQ channel name for canvases config");
options.add_options()("HistoFileName", bpo::value<std::string>()->default_value("HistosMonitorPulser.root"),
".root File name for histo saving");
options.add_options()("histport", bpo::value<uint32_t>()->default_value(8080), "port for histos http server");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
CbmMqHistoServer* histoServer = new CbmMqHistoServer();
CbmMqHistoServer* histoServer = new CbmMqHistoServer();
return histoServer;
return histoServer;
}
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startMQSamplerUnpackerParserverHitBuilder.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startMQSamplerUnpackerParserverHitBuilder.sh)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/startMQ_Mcbm.sh.in ${CMAKE_BINARY_DIR}/bin/MQ/topologies/startMQ_Mcbm.sh)
set(INCLUDE_DIRECTORIES
${CMAKE_CURRENT_SOURCE_DIR}
${CBMROOT_SOURCE_DIR}/fles/cern2016/param
${CBMROOT_SOURCE_DIR}/fles/cern2016/unpacker
${CBMROOT_SOURCE_DIR}/beamtime/base
${CBMDATA_DIR}
${CBMDATA_DIR}/tof
${CBMBASE_DIR}
${CBMROOT_SOURCE_DIR}/tof/TofParam
${CBMROOT_SOURCE_DIR}/tof/TofTools
)
Set(SYSTEM_INCLUDE_DIRECTORIES
${SYSTEM_INCLUDE_DIRECTORIES}
${ZeroMQ_INCLUDE_DIR}
${Boost_INCLUDE_DIR}
${FAIRROOT_INCLUDE_DIR}
${FAIRMQ_INCLUDE_DIR}
${FAIRMQ_INCLUDE_DIR}/options
${IPC_INCLUDE_DIRECTORY}
${CBMROOT_SOURCE_DIR}/external/cppzmq
${CBMROOT_SOURCE_DIR}/tof/TofReco
)
include_directories(${INCLUDE_DIRECTORIES})
include_directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})
set(LINK_DIRECTORIES
${ROOT_LIBRARY_DIR}
${FAIRROOT_LIBRARY_DIR}
${Boost_LIBRARY_DIRS}
)
link_directories(${LINK_DIRECTORIES})
)
# Set the install path within the build directory
set(EXECUTABLE_OUTPUT_PATH "${EXECUTABLE_OUTPUT_PATH}/MQ/hitbuilder")
# Set the install path within the installation directory
set(BIN_DESTINATION bin/MQ/hitbuilder)
Set(BOOST_LIBS
${Boost_SYSTEM_LIBRARY}
${Boost_SERIALIZATION_LIBRARY}
${Boost_PROGRAM_OPTIONS_LIBRARY}
${Boost_LOG_LIBRARY}
)
If(UNIX AND NOT APPLE)
List(APPEND BOOST_LIBS pthread)
EndIf()
set(FAIR_LIBS
Base
ParBase
FairMQ
)
If(FAIRLOGGER_FOUND)
set(FAIR_LIBS
${FAIR_LIBS}
FairLogger
)
EndIf()
set(EXE_NAME HitBuilderTof)
set(SRCS CbmDeviceHitBuilderTof.cxx runHitBuilderTof.cxx)
set(DEPENDENCIES
${DEPENDENCIES}
# FairMQ
${FAIR_LIBS}
${BOOST_LIBS}
# fles_ipc
CbmFlibCern2016
CbmBase
set(PUBLIC_DEPENDENCIES
CbmData
CbmTof
Geom
Core
MathCore
Tree
Physics
RIO
Net
Hist
)
GENERATE_EXECUTABLE()
CbmTofBase
CbmMQBase
FairRoot::Base
ROOT::Geom
)
set(PRIVATE_DEPENDENCIES
CbmBase
CbmTofReco
FairRoot::ParBase
FairRoot::Online
ROOT::Core
ROOT::Graf
ROOT::Hist
ROOT::MathCore
ROOT::Minuit
ROOT::Physics
)
set(INTERFACE_DEPENDENCIES
FairMQ::FairMQ
external::fles_ipc
ROOT::Net
ROOT::RIO
ROOT::Tree
)
generate_cbm_executable()
# Set the correct variables for the installation
set(VMCWORKDIR ${CMAKE_INSTALL_PREFIX}/share/cbmroot)
set(MY_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR})
set(CMAKE_CURRENT_SOURCE_DIR ${VMCWORKDIR}/input)
set(TMPDIR "${CMAKE_BINARY_DIR}")
set(CMAKE_BINARY_DIR ${CMAKE_INSTALL_PREFIX})
# Configure file for installation directory
configure_file(${MY_SOURCE_DIR}/startMQSamplerUnpackerParserverHitBuilder.sh.in ${TMPDIR}/bin/MQ/topologies/install/startMQSamplerUnpackerParserverHitBuilder.sh)
configure_file(${MY_SOURCE_DIR}/startMQ_Mcbm.sh.in ${TMPDIR}/bin/MQ/topologies/install/startMQ_Mcbm.sh)
install(PROGRAMS ${TMPDIR}/bin/MQ/topologies/install/startMQSamplerUnpackerParserverHitBuilder.sh
${TMPDIR}/bin/MQ/topologies/install/startMQ_Mcbm.sh
DESTINATION ${CMAKE_INSTALL_PREFIX}/bin/MQ/topologies
)