CbmDeviceEventBuilderEtofStar2019.cxx 19.65 KiB
/* Copyright (C) 2019-2021 PI-UHd, GSI
SPDX-License-Identifier: GPL-3.0-only
Authors: Norbert Herrmann [committer] */
/**
* CbmDeviceEventBuilderEtofStar2019.cxx
*/
#include "CbmDeviceEventBuilderEtofStar2019.h"
#include "CbmMQDefs.h"
#include "CbmStar2019EventBuilderEtofAlgo.h"
#include "CbmStar2019TofPar.h"
#include "StorableTimeslice.hpp"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairParGenericSet.h"
#include "FairRuntimeDb.h"
#include "TFile.h"
#include "TH1.h"
#include "TH2.h"
#include "THttpServer.h"
#include "TROOT.h"
#include "TString.h"
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
// include this header to serialize vectors
#include <boost/serialization/vector.hpp>
#include <array>
#include <iomanip>
#include <stdexcept>
#include <string>
struct InitTaskError : std::runtime_error {
using std::runtime_error::runtime_error;
};
using namespace std;
//static Int_t iMess=0;
const Int_t DetMask = 0x003FFFFF;
static uint fiSelectComponents {0};
CbmDeviceEventBuilderEtofStar2019::CbmDeviceEventBuilderEtofStar2019()
: //CbmDeviceUnpackTofMcbm2018(),
fNumMessages(0)
, fbMonitorMode(kFALSE)
, fbDebugMonitorMode(kFALSE)
, fbSandboxMode(kFALSE)
, fbEventDumpEna(kFALSE)
, fParCList(nullptr)
, fulTsCounter(0)
, fNumEvt(0)
, fEventBuilderAlgo(nullptr)
, fTimer()
, fUnpackPar(nullptr)
, fpBinDumpFile(nullptr)
{
fEventBuilderAlgo = new CbmStar2019EventBuilderEtofAlgo();
}
CbmDeviceEventBuilderEtofStar2019::~CbmDeviceEventBuilderEtofStar2019() { delete fEventBuilderAlgo; }
void CbmDeviceEventBuilderEtofStar2019::InitTask()
try {
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
//logger::SetLogLevel("INFO");
int noChannel = fChannels.size();
LOG(info) << "Number of defined channels: " << noChannel;
for (auto const& entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
if (entry.first == "syscmd") {
OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleMessage);
continue;
}
//if(entry.first != "etofevts") OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleData);
if (entry.first != "etofevts") OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleParts);
else {
fChannelsToSend[0].push_back(entry.first);
LOG(info) << "Init to send data to channel " << fChannelsToSend[0][0];
}
}
InitContainers();
}
catch (InitTaskError& e) {
LOG(error) << e.what();
// Wrapper defined in CbmMQDefs.h to support different FairMQ versions
cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
}
bool CbmDeviceEventBuilderEtofStar2019::IsChannelNameAllowed(std::string channelName)
{
for (auto const& entry : fAllowedChannels) {
LOG(info) << "Inspect " << entry;
std::size_t pos1 = channelName.find(entry);
if (pos1 != std::string::npos) {
const vector<std::string>::const_iterator pos =
std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
const vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
LOG(info) << "Found " << entry << " in " << channelName;
LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
return true;
}
}
LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
LOG(error) << "Stop device.";
return false;
}
Bool_t CbmDeviceEventBuilderEtofStar2019::InitContainers()
{
LOG(info) << "Init parameter containers for CbmDeviceEventBuilderEtofStar2019.";
// FairRuntimeDb* fRtdb = FairRuntimeDb::instance();
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
std::string message {"CbmStar2019TofPar,111"};
LOG(info) << "Requesting parameter container CbmStar2019TofPar, sending message: " << message;
FairMQMessagePtr req(NewSimpleMessage("CbmStar2019TofPar,111"));
FairMQMessagePtr rep(NewMessage());
if (Send(req, "parameters") > 0) {
if (Receive(rep, "parameters") >= 0) {
if (rep->GetSize() != 0) {
CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
fUnpackPar = dynamic_cast<CbmStar2019TofPar*>(tmsg.ReadObject(tmsg.GetClass()));
LOG(info) << "Received unpack parameter from parmq server: " << fUnpackPar;
fUnpackPar->Print();
}
else {
LOG(error) << "Received empty reply. Parameter not available";
}
}
}
SetParContainers();
Bool_t initOK = kTRUE;
initOK &= fEventBuilderAlgo->InitContainers();
initOK &= ReInitContainers(); // needed for TInt parameters
fEventBuilderAlgo->SetAddStatusToEvent(true);
if (kTRUE == fbMonitorMode) { // CreateHistograms();
initOK &= fEventBuilderAlgo->CreateHistograms();
/// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
std::vector<std::pair<TNamed*, std::string>> vHistos = fEventBuilderAlgo->GetHistoVector();
/* FIXME
/// Register the histos in the HTTP server
THttpServer* server = FairRunOnline::Instance()->GetHttpServer();
for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
{
server->Register( Form( "/%s", vHistos[ uHisto ].second.data() ), vHistos[ uHisto ].first );
} // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
server->RegisterCommand("/Reset_EvtBuild_Hist", "bStarEtof2019EventBuilderResetHistos=kTRUE");
server->Restrict("/Reset_EvtBuild_Hist", "allow=admin");
*/
} // if( kTRUE == fbMonitorMode )
return initOK;
}
void CbmDeviceEventBuilderEtofStar2019::SetParContainers()
{
FairRuntimeDb* fRtdb = FairRuntimeDb::instance();
fParCList = fEventBuilderAlgo->GetParList();
LOG(info) << "Setting parameter containers for " << fParCList->GetEntries() << " entries ";
for (Int_t iparC = 0; iparC < fParCList->GetEntries(); ++iparC) {
FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC));
fParCList->Remove(tempObj);
std::string sParamName {tempObj->GetName()};
FairParGenericSet* newObj = dynamic_cast<FairParGenericSet*>(fRtdb->getContainer(sParamName.data()));
LOG(info) << " - Get " << sParamName.data() << " at " << newObj;
if (nullptr == newObj) {
LOG(error) << "Failed to obtain parameter container " << sParamName << ", for parameter index " << iparC;
return;
} // if( nullptr == newObj )
if (iparC == 0) {
newObj = (FairParGenericSet*) fUnpackPar;
LOG(info) << " - Mod " << sParamName.data() << " to " << newObj;
}
fParCList->AddAt(newObj, iparC);
// delete tempObj;
} // for( Int_t iparC = 0; iparC < fParCList->GetEntries(); ++iparC )
}
void CbmDeviceEventBuilderEtofStar2019::AddMsComponentToList(size_t component, UShort_t usDetectorId)
{
fEventBuilderAlgo->AddMsComponentToList(component, usDetectorId);
}
Bool_t CbmDeviceEventBuilderEtofStar2019::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
{
if (0 == fulTsCounter) {
LOG(info) << "FIXME ===> Jumping 1st TS as corrupted with current FW + "
"FLESNET combination";
fulTsCounter++;
return kTRUE;
} // if( 0 == fulTsCounter )
if (kFALSE == fEventBuilderAlgo->ProcessTs(ts)) {
LOG(error) << "Failed processing TS " << ts.index() << " in event builder algorithm class";
return kTRUE;
} // if( kFALSE == fEventBuilderAlgo->ProcessTs( ts ) )
std::vector<CbmTofStarSubevent2019>& eventBuffer = fEventBuilderAlgo->GetEventBuffer();
for (UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent) {
/// Send the sub-event to the STAR systems
Int_t iBuffSzByte = 0;
void* pDataBuff = eventBuffer[uEvent].BuildOutput(iBuffSzByte);
if (NULL != pDataBuff) {
/// Valid output, do stuff with it!
// Bool_t fbSendEventToStar = kFALSE;
if (kFALSE == fbSandboxMode) {
/*
** Function to send sub-event block to the STAR DAQ system
* trg_word received is packed as:
*
* trg_cmd|daq_cmd|tkn_hi|tkn_mid|tkn_lo
*/
/*
star_rhicf_write( eventBuffer[ uEvent ].GetTrigger().GetStarTrigerWord(),
pDataBuff, iBuffSzByte );
*/
SendSubevent(eventBuffer[uEvent].GetTrigger().GetStarTrigerWord(), (char*) pDataBuff, iBuffSzByte, 0);
} // if( kFALSE == fbSandboxMode )
LOG(debug) << "Sent STAR event with size " << iBuffSzByte << " Bytes"
<< " and token " << eventBuffer[uEvent].GetTrigger().GetStarToken();
} // if( NULL != pDataBuff )
else
LOG(error) << "Invalid STAR SubEvent Output, can only happen if trigger "
<< " object was not set => Do Nothing more with it!!! ";
} // for( UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent )
return kTRUE;
}
Bool_t CbmDeviceEventBuilderEtofStar2019::ReInitContainers()
{
LOG(info) << "ReInit parameter containers for CbmDeviceEventBuilderEtofStar2019";
Bool_t initOK = fEventBuilderAlgo->ReInitContainers();
return initOK;
}
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool CbmDeviceEventBuilderEtofStar2019::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
// Don't do anything with the data
// Maybe add an message counter which counts the incomming messages and add
// an output
fNumMessages++;
LOG(debug) << "Received message number " << fNumMessages << " with size " << msg->GetSize();
std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
std::istringstream iss(msgStr);
boost::archive::binary_iarchive inputArchive(iss);
fles::StorableTimeslice component {0};
inputArchive >> component;
CheckTimeslice(component);
DoUnpack(component, 0);
// if(fNumMessages%10000 == 0) LOG(info)<<"Processed "<<fNumMessages<<" time slices";
return true;
}
static Double_t dctime = 0.;
bool CbmDeviceEventBuilderEtofStar2019::HandleParts(FairMQParts& parts, int /*index*/)
{
// Don't do anything with the data
// Maybe add an message counter which counts the incomming messages and add
// an output
fNumMessages++;
LOG(debug) << "Received message number " << fNumMessages << " with " << parts.Size() << " parts";
fles::StorableTimeslice ts {0}; // rename ??? FIXME
switch (fiSelectComponents) {
case 0: {
std::string msgStr(static_cast<char*>(parts.At(0)->GetData()), (parts.At(0))->GetSize());
std::istringstream iss(msgStr);
boost::archive::binary_iarchive inputArchive(iss);
inputArchive >> ts;
CheckTimeslice(ts);
if (1 == fNumMessages) {
LOG(info) << "Initialize TS components list to " << ts.num_components();
for (size_t c {0}; c < ts.num_components(); c++) {
auto systemID = static_cast<int>(ts.descriptor(c, 0).sys_id);
LOG(info) << "Found systemID: " << std::hex << systemID << std::dec;
fEventBuilderAlgo->AddMsComponentToList(c, systemID); // TOF data
}
}
} break;
case 1: {
fles::StorableTimeslice component {0};
uint ncomp = parts.Size();
for (uint i = 0; i < ncomp; i++) {
std::string msgStr(static_cast<char*>(parts.At(i)->GetData()), (parts.At(i))->GetSize());
std::istringstream iss(msgStr);
boost::archive::binary_iarchive inputArchive(iss);
//fles::StorableTimeslice component{i};
inputArchive >> component;
CheckTimeslice(component);
fEventBuilderAlgo->AddMsComponentToList(i, 0x60); // TOF data
LOG(debug) << "HandleParts message " << fNumMessages << " with indx " << component.index();
}
} break;
default:;
}
if (kFALSE == fEventBuilderAlgo->ProcessTs(ts)) {
LOG(error) << "Failed processing TS " << ts.index() << " in event builder algorithm class";
return kTRUE;
} // if( kFALSE == fEventBuilderAlgo->ProcessTs( ts ) )
std::vector<CbmTofStarSubevent2019>& eventBuffer = fEventBuilderAlgo->GetEventBuffer();
LOG(debug) << "Process time slice " << fNumMessages << " with " << eventBuffer.size() << " events";
//if(fNumMessages%10000 == 0) LOG(info)<<"Processed "<<fNumMessages<<" time slices";
for (UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent) {
/// Send the sub-event to the STAR systems
Int_t iBuffSzByte = 0;
void* pDataBuff = eventBuffer[uEvent].BuildOutput(iBuffSzByte);
if (NULL != pDataBuff) {
/// Valid output, do stuff with it!
// Send to Star TriggerHandler, TBD
if (kFALSE == fbSandboxMode) {
/*
** Function to send sub-event block to the STAR DAQ system
* trg_word received is packed as:
*
* trg_cmd|daq_cmd|tkn_hi|tkn_mid|tkn_lo
*/
/*
star_rhicf_write( eventBuffer[ uEvent ].GetTrigger().GetStarTrigerWord(),
pDataBuff, iBuffSzByte );
*/
} // if( kFALSE == fbSandboxMode )
SendSubevent(eventBuffer[uEvent].GetTrigger().GetStarTrigerWord(), (char*) pDataBuff, iBuffSzByte, 0);
LOG(debug) << "Sent STAR event " << uEvent << " with size " << iBuffSzByte << " Bytes"
<< ", token " << eventBuffer[uEvent].GetTrigger().GetStarToken() << ", TrigWord "
<< eventBuffer[uEvent].GetTrigger().GetStarTrigerWord();
}
}
if (0 == fulTsCounter % 10000) {
LOG(info) << "Processed " << fulTsCounter << " TS, CPUtime: " << dctime / 10. << " ms/TS";
dctime = 0.;
}
fulTsCounter++;
return true;
}
bool CbmDeviceEventBuilderEtofStar2019::HandleMessage(FairMQMessagePtr& msg, int /*index*/)
{
const char* cmd = (char*) (msg->GetData());
const char cmda[4] = {*cmd};
LOG(info) << "Handle message " << cmd << ", " << cmd[0];
cbm::mq::LogState(this);
// only one implemented so far "Stop"
if (strcmp(cmda, "STOP")) {
LOG(info) << "STOP";
cbm::mq::ChangeState(this, cbm::mq::Transition::Ready);
cbm::mq::LogState(this);
cbm::mq::ChangeState(this, cbm::mq::Transition::DeviceReady);
cbm::mq::LogState(this);
cbm::mq::ChangeState(this, cbm::mq::Transition::Idle);
cbm::mq::LogState(this);
cbm::mq::ChangeState(this, cbm::mq::Transition::End);
cbm::mq::LogState(this);
}
return true;
}
bool CbmDeviceEventBuilderEtofStar2019::CheckTimeslice(const fles::Timeslice& ts)
{
if (0 == ts.num_components()) {
LOG(error) << "No Component in TS " << ts.index();
return 1;
}
auto tsIndex = ts.index();
LOG(debug) << "Found " << ts.num_components() << " different components in timeslice " << tsIndex;
/*
for (size_t c = 0; c < ts.num_components(); ++c) {
LOG(debug) << "Found " << ts.num_microslices(c)
<< " microslices in component " << c;
LOG(debug) << "Component " << c << " has a size of "
<< ts.size_component(c) << " bytes";
LOG(debug) << "Sys ID: Ox" << std::hex << static_cast<int>(ts.descriptor(0,0).sys_id)
<< std::dec;
for (size_t m = 0; m < ts.num_microslices(c); ++m) {
PrintMicroSliceDescriptor(ts.descriptor(c,m));
}
}
*/
return true;
}
bool CbmDeviceEventBuilderEtofStar2019::SendEvent(std::vector<Int_t> vdigi, int idx)
{
LOG(debug) << "Send Data for event " << fNumEvt << " with size " << vdigi.size() << Form(" at %p ", &vdigi);
// LOG(debug) << "EventHeader: "<< fEventHeader[0] << " " << fEventHeader[1] << " " << fEventHeader[2] << " " << fEventHeader[3];
std::stringstream oss;
boost::archive::binary_oarchive oa(oss);
oa << vdigi;
std::string* strMsg = new std::string(oss.str());
FairMQParts parts;
parts.AddPart(NewMessage(
const_cast<char*>(strMsg->c_str()), // data
strMsg->length(), // size
[](void*, void* object) { delete static_cast<std::string*>(object); },
strMsg)); // object that manages the data
LOG(debug) << "Send data to channel " << idx << " " << fChannelsToSend[idx][0];
// if (Send(msg, fChannelsToSend[idx][0]) < 0) {
if (Send(parts, fChannelsToSend[idx][0]) < 0) {
LOG(error) << "Problem sending data " << fChannelsToSend[idx][0];
return false;
}
fNumEvt++;
//if(fNumEvt==100) FairMQStateMachine::ChangeState(PAUSE); //sleep(10000); // Stop for debugging ...
return true;
}
bool CbmDeviceEventBuilderEtofStar2019::SendSubevent(uint trig, char* pData, int nData, int idx)
{
LOG(debug) << "SendSubevent " << fNumEvt << ", TrigWord " << trig << " with size " << nData << Form(" at %p ", pData);
std::stringstream ossE;
boost::archive::binary_oarchive oaE(ossE);
oaE << trig;
std::string* strMsgE = new std::string(ossE.str());
/*
std::stringstream oss;
boost::archive::binary_oarchive oa(oss);
oa << cData;
std::string* strMsg = new std::string(oss.str());
*/
std::string* strMsg = new std::string(pData, nData);
FairMQParts parts;
parts.AddPart(NewMessage(
const_cast<char*>(strMsgE->c_str()), // data
strMsgE->length(), // size
[](void*, void* object) { delete static_cast<std::string*>(object); },
strMsgE)); // object that manages the data
parts.AddPart(NewMessage(
const_cast<char*>(strMsg->c_str()), // data
strMsg->length(), // size
[](void*, void* object) { delete static_cast<std::string*>(object); },
strMsg)); // object that manages the data
LOG(debug) << "Send data to channel " << idx << " " << fChannelsToSend[idx][0];
// if (Send(msg, fChannelsToSend[idx][0]) < 0) {
if (Send(parts, fChannelsToSend[idx][0]) < 0) {
LOG(error) << "Problem sending data " << fChannelsToSend[idx][0];
return false;
}
fNumEvt++;
//if(fNumEvt==100) FairMQStateMachine::ChangeState(PAUSE); //sleep(10000); // Stop for debugging ...
return true;
}
void CbmDeviceEventBuilderEtofStar2019::Reset() {}
void CbmDeviceEventBuilderEtofStar2019::Finish()
{
if (NULL != fpBinDumpFile) {
LOG(info) << "Closing binary file used for event dump.";
fpBinDumpFile->close();
} // if( NULL != fpBinDumpFile )
/// If monitor mode enabled, trigger histos creation, obtain pointer on them and add them to the HTTP server
if (kTRUE == fbMonitorMode) {
/// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
std::vector<std::pair<TNamed*, std::string>> vHistos = fEventBuilderAlgo->GetHistoVector();
/// Save old global file and folder pointer to avoid messing with FairRoot
TFile* oldFile = gFile;
TDirectory* oldDir = gDirectory;
/// (Re-)Create ROOT file to store the histos
TFile* histoFile = nullptr;
// open separate histo file in recreate mode
histoFile = new TFile("data/eventBuilderMonHist.root", "RECREATE");
histoFile->cd();
/// Register the histos in the HTTP server
for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
/// Make sure we end up in chosen folder
gDirectory->mkdir(vHistos[uHisto].second.data());
gDirectory->cd(vHistos[uHisto].second.data());
/// Write plot
vHistos[uHisto].first->Write();
histoFile->cd();
} // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
/// Restore old global file and folder pointer to avoid messing with FairRoot
gFile = oldFile;
gDirectory = oldDir;
histoFile->Close();
} // if( kTRUE == fbMonitorMode )
}