-
Administrator authored
This version is an exact copy of the last revision of the trunk branch of the old SVN repository of CbmRoot at https://subversion.gsi.de/cbmsoft/cbmroot/trunk. The old SVN repository will still be available for read access.
Administrator authored: This version is an exact copy of the last revision of the trunk branch of the old SVN repository of CbmRoot at https://subversion.gsi.de/cbmsoft/cbmroot/trunk. The old SVN repository will still be available for read access.
CbmDeviceEventBuilderEtofStar2019.cxx 19.89 KiB
/**
* CbmDeviceEventBuilderEtofStar2019.cxx
*/
#include "CbmDeviceEventBuilderEtofStar2019.h"
#include "CbmMQDefs.h"
#include "CbmStar2019EventBuilderEtofAlgo.h"
#include "CbmStar2019TofPar.h"
#include "StorableTimeslice.hpp"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairRuntimeDb.h"
#include "FairParGenericSet.h"
#include "THttpServer.h"
#include "TROOT.h"
#include "TFile.h"
#include "TString.h"
#include "TH1.h"
#include "TH2.h"
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
// include this header to serialize vectors
#include <boost/serialization/vector.hpp>
#include <string>
#include <iomanip>
#include <array>
#include <stdexcept>
/// Exception raised by InitTask() when a configured channel name is rejected.
/// All std::runtime_error constructors are inherited, so it is created
/// directly from an error message.
struct InitTaskError : public std::runtime_error
{
  using std::runtime_error::runtime_error;
};
using namespace std;
//static Int_t iMess=0;

/// Bit mask with the lowest 22 bits set.
const Int_t DetMask = 0x003FFFFF;

/// Decoding mode evaluated by HandleParts():
///   0 - one serialized timeslice is read from the first message part,
///   1 - one serialized object is read from every message part.
static uint fiSelectComponents = 0;
/// Default constructor.
///
/// Resets all mode flags, counters and pointers and allocates the event
/// builder algorithm, which is released again in the destructor.
CbmDeviceEventBuilderEtofStar2019::CbmDeviceEventBuilderEtofStar2019()
  : //CbmDeviceUnpackTofMcbm2018(),
    fNumMessages(0)
  , fbMonitorMode(kFALSE)
  , fbDebugMonitorMode(kFALSE)
  , fbSandboxMode(kFALSE)
  , fbEventDumpEna(kFALSE)
  , fParCList(nullptr)
  , fulTsCounter(0)
  , fNumEvt(0)
  , fEventBuilderAlgo(new CbmStar2019EventBuilderEtofAlgo())
  , fTimer()
  , fUnpackPar(nullptr)
  , fpBinDumpFile(nullptr)
{
}
/// Destructor: frees the event builder algorithm allocated by the constructor.
CbmDeviceEventBuilderEtofStar2019::~CbmDeviceEventBuilderEtofStar2019()
{
  delete fEventBuilderAlgo;
}
void CbmDeviceEventBuilderEtofStar2019::InitTask()
try
{
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
//logger::SetLogLevel("INFO");
int noChannel = fChannels.size();
LOG(info) << "Number of defined channels: " << noChannel;
for(auto const &entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
if(entry.first == "syscmd") {
OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleMessage);
continue;
}
//if(entry.first != "etofevts") OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleData);
if(entry.first != "etofevts") OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleParts);
else {
fChannelsToSend[0].push_back(entry.first);
LOG(info) << "Init to send data to channel " << fChannelsToSend[0][0];
}
}
InitContainers();
} catch (InitTaskError& e) {
LOG(error) << e.what();
// Wrapper defined in CbmMQDefs.h to support different FairMQ versions
cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
}
/// Checks a channel name against the list of allowed channel names.
///
/// A name is accepted as soon as one entry of fAllowedChannels occurs anywhere
/// inside it (substring match); the entries are inspected in list order.
///
/// @param channelName  name of the channel as configured for the device
/// @return true if an allowed entry is contained in channelName, false otherwise
bool CbmDeviceEventBuilderEtofStar2019::IsChannelNameAllowed(std::string channelName)
{
  for (std::size_t position = 0; position < fAllowedChannels.size(); ++position) {
    const auto& allowedName = fAllowedChannels[position];
    LOG(info) << "Inspect " << allowedName;

    if (std::string::npos == channelName.find(allowedName)) {
      continue;  // this entry is not part of the channel name
    }

    LOG(info) << "Found " << allowedName << " in " << channelName;
    LOG(info) << "Channel name " << channelName
              << " found in list of allowed channel names at position " << position;
    return true;
  }

  LOG(info) << "Channel name " << channelName
            << " not found in list of allowed channel names.";
  LOG(error) << "Stop device.";
  return false;
}
/// Requests the CbmStar2019TofPar container on the "parameters" channel and
/// initialises the event builder algorithm with it.
///
/// Steps:
///   1. send the request "CbmStar2019TofPar,111" and wait for the reply,
///   2. deserialize the reply into fUnpackPar,
///   3. hand the containers to the algorithm via SetParContainers(),
///   4. run the algorithm's InitContainers()/ReInitContainers(),
///   5. in monitor mode, let the algorithm create its histograms.
///
/// Every failure while fetching the parameters is logged. As before, the
/// initialisation of the algorithm is attempted even if no parameter object
/// could be obtained.
///
/// @return kTRUE if all initialisation calls of the algorithm reported success
Bool_t CbmDeviceEventBuilderEtofStar2019::InitContainers()
{
  LOG(info) << "Init parameter containers for CbmDeviceEventBuilderEtofStar2019.";
  // FairRuntimeDb* fRtdb = FairRuntimeDb::instance();

  // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
  // Should only be used for small data because of the cost of an additional copy
  std::string message{"CbmStar2019TofPar,111"};
  LOG(info) << "Requesting parameter container CbmStar2019TofPar, sending message: " << message;

  FairMQMessagePtr req(NewSimpleMessage("CbmStar2019TofPar,111"));
  FairMQMessagePtr rep(NewMessage());

  if (Send(req, "parameters") > 0) {
    if (Receive(rep, "parameters") >= 0) {
      if (rep->GetSize() != 0) {
        CbmMQTMessage tmsg(rep->GetData(), rep->GetSize());
        auto* received = tmsg.ReadObject(tmsg.GetClass());
        fUnpackPar     = dynamic_cast<CbmStar2019TofPar*>(received);
        LOG(info) << "Received unpack parameter from parmq server: " << fUnpackPar;
        if (nullptr != fUnpackPar) {
          fUnpackPar->Print();
        }
        else {
          // The reply did not contain a CbmStar2019TofPar: do not dereference
          // the failed cast and release the object that was read instead.
          LOG(error) << "Reply from parameter server is not a CbmStar2019TofPar object";
          delete received;
        }
      }
      else {
        LOG(error) << "Received empty reply. Parameter not available";
      }
    }
    else {
      LOG(error) << "Failed to receive the reply to the parameter request";
    }
  }
  else {
    LOG(error) << "Failed to send the parameter request";
  }

  SetParContainers();

  Bool_t initOK = kTRUE;
  initOK &= fEventBuilderAlgo->InitContainers();
  initOK &= ReInitContainers();  // needed for TInt parameters

  fEventBuilderAlgo->SetAddStatusToEvent(true);

  if (kTRUE == fbMonitorMode) {
    // CreateHistograms();
    initOK &= fEventBuilderAlgo->CreateHistograms();

    /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
    std::vector<std::pair<TNamed*, std::string>> vHistos = fEventBuilderAlgo->GetHistoVector();

    /* FIXME
    /// Register the histos in the HTTP server
    THttpServer* server = FairRunOnline::Instance()->GetHttpServer();
    for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
    {
    server->Register( Form( "/%s", vHistos[ uHisto ].second.data() ), vHistos[ uHisto ].first );
    } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
    server->RegisterCommand("/Reset_EvtBuild_Hist", "bStarEtof2019EventBuilderResetHistos=kTRUE");
    server->Restrict("/Reset_EvtBuild_Hist", "allow=admin");
    */
  }  // if( kTRUE == fbMonitorMode )

  return initOK;
}
void CbmDeviceEventBuilderEtofStar2019::SetParContainers()
{
FairRuntimeDb* fRtdb = FairRuntimeDb::instance();
fParCList = fEventBuilderAlgo->GetParList();
LOG(info) << "Setting parameter containers for " << fParCList->GetEntries() << " entries ";
for( Int_t iparC = 0; iparC < fParCList->GetEntries(); ++iparC )
{
FairParGenericSet* tempObj = (FairParGenericSet*)(fParCList->At(iparC));
fParCList->Remove(tempObj);
std::string sParamName{ tempObj->GetName() };
FairParGenericSet* newObj = dynamic_cast<FairParGenericSet*>( fRtdb->getContainer( sParamName.data() ) );
LOG(info) << " - Get " << sParamName.data() << " at " << newObj;
if( nullptr == newObj )
{
LOG(error) << "Failed to obtain parameter container " << sParamName
<< ", for parameter index " << iparC;
return;
} // if( nullptr == newObj )
if( iparC == 0 ) {
newObj=(FairParGenericSet *) fUnpackPar;
LOG(info) << " - Mod " << sParamName.data() << " to " << newObj;
}
fParCList->AddAt(newObj, iparC);
// delete tempObj;
} // for( Int_t iparC = 0; iparC < fParCList->GetEntries(); ++iparC )
}
/// Forwards the registration of a microslice component to the event builder
/// algorithm.
///
/// @param component     index of the component
/// @param usDetectorId  detector identifier passed on unchanged
void CbmDeviceEventBuilderEtofStar2019::AddMsComponentToList(size_t component, UShort_t usDetectorId)
{
  fEventBuilderAlgo->AddMsComponentToList(component, usDetectorId);
}
/// Runs the event builder on one timeslice and ships the resulting sub-events.
///
/// The very first call (fulTsCounter == 0) only increments the counter and
/// skips the timeslice. Afterwards the timeslice is processed by the
/// algorithm and every sub-event of its event buffer is serialized with
/// BuildOutput(); valid outputs are passed to SendSubevent() on output slot 0
/// unless sandbox mode is enabled.
///
/// @param ts  timeslice to process
/// @return always kTRUE; a processing failure is only logged
Bool_t CbmDeviceEventBuilderEtofStar2019::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
{
  if (0 == fulTsCounter) {
    LOG(info) << "FIXME ===> Jumping 1st TS as corrupted with current FW + FLESNET combination";
    fulTsCounter++;
    return kTRUE;
  }

  if (kFALSE == fEventBuilderAlgo->ProcessTs(ts)) {
    LOG(error) << "Failed processing TS " << ts.index()
               << " in event builder algorithm class";
    return kTRUE;
  }

  std::vector<CbmTofStarSubevent2019>& subEvents = fEventBuilderAlgo->GetEventBuffer();
  for (auto& subEvent : subEvents) {
    /// Build the output block of the sub-event for the STAR systems
    Int_t outputSizeBytes = 0;
    void* outputBuffer    = subEvent.BuildOutput(outputSizeBytes);

    if (nullptr == outputBuffer) {
      LOG(error) << "Invalid STAR SubEvent Output, can only happen if trigger "
                 << " object was not set => Do Nothing more with it!!! ";
      continue;
    }

    /// Valid output: send it unless running in sandbox mode
    if (kFALSE == fbSandboxMode) {
      /*
       ** Function to send sub-event block to the STAR DAQ system
       *       trg_word received is packed as:
       *
       *       trg_cmd|daq_cmd|tkn_hi|tkn_mid|tkn_lo
       *
       * Direct call formerly used here:
       *   star_rhicf_write( trigger word, outputBuffer, outputSizeBytes );
       */
      SendSubevent(subEvent.GetTrigger().GetStarTrigerWord(), static_cast<char*>(outputBuffer),
                   outputSizeBytes, 0);
    }

    LOG(debug) << "Sent STAR event with size " << outputSizeBytes << " Bytes"
               << " and token " << subEvent.GetTrigger().GetStarToken();
  }

  return kTRUE;
}
/// Re-initialises the parameter containers of the event builder algorithm.
///
/// @return result reported by the algorithm's ReInitContainers()
Bool_t CbmDeviceEventBuilderEtofStar2019::ReInitContainers()
{
  LOG(info) << "ReInit parameter containers for CbmDeviceEventBuilderEtofStar2019";
  return fEventBuilderAlgo->ReInitContainers();
}
/// Handler for a single-part message carrying one serialized timeslice.
///
/// The payload is deserialized with a boost binary archive into a
/// fles::StorableTimeslice, checked with CheckTimeslice() and processed by
/// DoUnpack(). The registration of this handler in InitTask() is currently
/// commented out.
///
/// @param msg  received message; all GetSize() bytes of its payload are read
/// @return always true
bool CbmDeviceEventBuilderEtofStar2019::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
  ++fNumMessages;
  LOG(debug) << "Received message number " << fNumMessages
             << " with size " << msg->GetSize();

  // Copy the payload into a stream the archive can read from
  const std::string payload(static_cast<char*>(msg->GetData()), msg->GetSize());
  std::istringstream payloadStream(payload);
  boost::archive::binary_iarchive archive(payloadStream);

  fles::StorableTimeslice timeslice{0};
  archive >> timeslice;

  CheckTimeslice(timeslice);
  DoUnpack(timeslice, 0);

  // if(fNumMessages%10000 == 0) LOG(info)<<"Processed "<<fNumMessages<<" time slices";
  return true;
}
// Value reported as "CPUtime" in the periodic log of HandleParts().
// NOTE(review): within this file it is only ever set to 0 and never
// accumulated, so the reported time is always 0 - confirm intent.
static Double_t dctime=0.;
// Handler for multi-part messages; InitTask() registers it for every channel
// other than "syscmd" and "etofevts".
// The message is decoded according to fiSelectComponents, processed by the
// event builder algorithm, and every sub-event with a valid output buffer is
// forwarded with SendSubevent() on output slot 0.
// Returns true in all cases, including when ProcessTs() fails.
bool CbmDeviceEventBuilderEtofStar2019::HandleParts(FairMQParts& parts, int /*index*/)
{
// Count the incoming message and report its number of parts
fNumMessages++;
LOG(debug) << "Received message number "<< fNumMessages
<< " with " << parts.Size() << " parts" ;
fles::StorableTimeslice ts{0}; // rename ??? FIXME
switch(fiSelectComponents) {
// Mode 0: the first part holds one complete serialized timeslice
case 0: {
std::string msgStr(static_cast<char*>(parts.At(0)->GetData()),(parts.At(0))->GetSize());
std::istringstream iss(msgStr);
boost::archive::binary_iarchive inputArchive(iss);
inputArchive >> ts;
CheckTimeslice(ts);
// On the first message only: register every component of the timeslice
// with the algorithm, using the sys_id read from ts.descriptor(c, 0)
if( 1 == fNumMessages ) {
LOG(info) << "Initialize TS components list to " << ts.num_components();
for (size_t c {0}; c < ts.num_components(); c++) {
auto systemID = static_cast<int>(ts.descriptor(c, 0).sys_id);
LOG(info) << "Found systemID: " << std::hex << systemID << std::dec;
fEventBuilderAlgo->AddMsComponentToList( c, systemID ); // TOF data
}
}
}
break;
// Mode 1: every part is deserialized separately into `component`.
// NOTE(review): `ts` is never filled in this branch, so ProcessTs() below
// receives the empty timeslice created above, and AddMsComponentToList() is
// called again for every message - this branch looks unfinished, confirm.
case 1: {
fles::StorableTimeslice component{0};
uint ncomp=parts.Size();
for (uint i=0; i<ncomp; i++) {
std::string msgStr(static_cast<char*>(parts.At(i)->GetData()),(parts.At(i))->GetSize());
std::istringstream iss(msgStr);
boost::archive::binary_iarchive inputArchive(iss);
//fles::StorableTimeslice component{i};
inputArchive >> component;
CheckTimeslice(component);
fEventBuilderAlgo->AddMsComponentToList( i, 0x60 ); // TOF data
LOG(debug) << "HandleParts message " << fNumMessages << " with indx " << component.index();
}
}
break;
// Any other mode: nothing is decoded
default:
;
}
// A processing failure is logged; the handler still returns true and
// fulTsCounter is not incremented for this message.
if( kFALSE == fEventBuilderAlgo->ProcessTs( ts ) )
{
LOG(error) << "Failed processing TS " << ts.index()
<< " in event builder algorithm class";
return kTRUE;
} // if( kFALSE == fEventBuilderAlgo->ProcessTs( ts ) )
std::vector< CbmTofStarSubevent2019 > & eventBuffer = fEventBuilderAlgo->GetEventBuffer();
LOG(debug) <<"Process time slice "<<fNumMessages<<" with "<<eventBuffer.size()<<" events";
//if(fNumMessages%10000 == 0) LOG(info)<<"Processed "<<fNumMessages<<" time slices";
for( UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent )
{
/// Send the sub-event to the STAR systems
Int_t iBuffSzByte = 0;
void * pDataBuff = eventBuffer[ uEvent ].BuildOutput( iBuffSzByte );
if( NULL != pDataBuff )
{
/// Valid output, do stuff with it!
// Send to Star TriggerHandler, TBD
// The sandbox check below contains only commented-out code, so the
// sub-event is sent by SendSubevent() regardless of fbSandboxMode
// (DoUnpack() sends only when sandbox mode is off).
if( kFALSE == fbSandboxMode )
{
/*
** Function to send sub-event block to the STAR DAQ system
* trg_word received is packed as:
*
* trg_cmd|daq_cmd|tkn_hi|tkn_mid|tkn_lo
*/
/*
star_rhicf_write( eventBuffer[ uEvent ].GetTrigger().GetStarTrigerWord(),
pDataBuff, iBuffSzByte );
*/
} // if( kFALSE == fbSandboxMode )
SendSubevent(eventBuffer[ uEvent ].GetTrigger().GetStarTrigerWord(),(char *)pDataBuff, iBuffSzByte, 0);
LOG(debug) << "Sent STAR event " << uEvent << " with size " << iBuffSzByte << " Bytes"
<< ", token " << eventBuffer[ uEvent ].GetTrigger().GetStarToken()
<< ", TrigWord " << eventBuffer[ uEvent ].GetTrigger().GetStarTrigerWord();
}
}
// Progress report whenever the counter is a multiple of 10000 (this includes
// the value 0), followed by a reset of the reported time
if( 0 == fulTsCounter % 10000 ) {
LOG(info) << "Processed " << fulTsCounter << " TS, CPUtime: "<< dctime/10. << " ms/TS";
dctime=0.;
}
fulTsCounter++;
return true;
}
/// Handler for messages on the "syscmd" channel.
///
/// The only supported command is "STOP": when the payload starts with these
/// four characters the device is driven through the state transitions
/// Ready -> DeviceReady -> Idle -> End. Any other payload is logged and
/// ignored.
///
/// The payload is read with its explicit size, because it is not guaranteed
/// to be NUL-terminated, and an empty message is tolerated.
///
/// @param msg  received command message
/// @return always true
bool CbmDeviceEventBuilderEtofStar2019::HandleMessage(FairMQMessagePtr& msg, int /*index*/)
{
  const char* rawData = static_cast<const char*>(msg->GetData());

  std::string command;
  if (nullptr != rawData) {
    command.assign(rawData, msg->GetSize());
    // Drop a terminator (and anything after it) that was sent along with the text
    const std::string::size_type terminator = command.find('\0');
    if (std::string::npos != terminator) {
      command.erase(terminator);
    }
  }

  LOG(info) << "Handle message " << command << ", " << (command.empty() ? ' ' : command.front());
  cbm::mq::LogState(this);

  // only one implemented so far "Stop"
  if (0 == command.compare(0, 4, "STOP")) {
    LOG(info) << "STOP";
    cbm::mq::ChangeState(this, cbm::mq::Transition::Ready);
    cbm::mq::LogState(this);
    cbm::mq::ChangeState(this, cbm::mq::Transition::DeviceReady);
    cbm::mq::LogState(this);
    cbm::mq::ChangeState(this, cbm::mq::Transition::Idle);
    cbm::mq::LogState(this);
    cbm::mq::ChangeState(this, cbm::mq::Transition::End);
    cbm::mq::LogState(this);
  }
  else {
    LOG(info) << "Ignoring unsupported command";
  }

  return true;
}
/// Sanity check of a received timeslice.
///
/// @param ts  timeslice to inspect
/// @return false (after logging an error) if the timeslice contains no
///         component, true otherwise
bool CbmDeviceEventBuilderEtofStar2019::CheckTimeslice(const fles::Timeslice& ts)
{
  if (0 == ts.num_components()) {
    LOG(error) << "No Component in TS " << ts.index();
    // Report the failure: the former `return 1` signalled success here
    return false;
  }

  auto tsIndex = ts.index();
  LOG(debug) << "Found " << ts.num_components()
             << " different components in timeslice "
             << tsIndex;

  /*
  for (size_t c = 0; c < ts.num_components(); ++c) {
  LOG(debug) << "Found " << ts.num_microslices(c)
  << " microslices in component " << c;
  LOG(debug) << "Component " << c << " has a size of "
  << ts.size_component(c) << " bytes";
  LOG(debug) << "Sys ID: Ox" << std::hex << static_cast<int>(ts.descriptor(0,0).sys_id)
  << std::dec;
  for (size_t m = 0; m < ts.num_microslices(c); ++m) {
  PrintMicroSliceDescriptor(ts.descriptor(c,m));
  }
  }
  */

  return true;
}
bool CbmDeviceEventBuilderEtofStar2019::SendEvent( std::vector<Int_t> vdigi, int idx )
{
LOG(debug) << "Send Data for event "<<fNumEvt<<" with size "<< vdigi.size()<< Form(" at %p ",&vdigi);
// LOG(debug) << "EventHeader: "<< fEventHeader[0] << " " << fEventHeader[1] << " " << fEventHeader[2] << " " << fEventHeader[3];
std::stringstream oss;
boost::archive::binary_oarchive oa(oss);
oa << vdigi;
std::string* strMsg = new std::string(oss.str());
FairMQParts parts;
parts.AddPart(NewMessage(const_cast<char*>(strMsg->c_str()), // data
strMsg->length(), // size
[](void* , void* object){ delete static_cast<std::string*>(object); },
strMsg)); // object that manages the data
LOG(debug) << "Send data to channel "<< idx << " " << fChannelsToSend[idx][0];
// if (Send(msg, fChannelsToSend[idx][0]) < 0) {
if (Send(parts, fChannelsToSend[idx][0]) < 0) {
LOG(error) << "Problem sending data " << fChannelsToSend[idx][0];
return false;
}
fNumEvt++;
//if(fNumEvt==100) FairMQStateMachine::ChangeState(PAUSE); //sleep(10000); // Stop for debugging ...
return true;
}
bool CbmDeviceEventBuilderEtofStar2019::SendSubevent( uint trig, char *pData, int nData, int idx )
{
LOG(debug) << "SendSubevent "<<fNumEvt<<", TrigWord " << trig << " with size "<< nData << Form(" at %p ", pData);
std::stringstream ossE;
boost::archive::binary_oarchive oaE(ossE);
oaE << trig;
std::string* strMsgE = new std::string(ossE.str());
/*
std::stringstream oss;
boost::archive::binary_oarchive oa(oss);
oa << cData;
std::string* strMsg = new std::string(oss.str());
*/
std::string* strMsg = new std::string(pData, nData);
FairMQParts parts;
parts.AddPart(NewMessage(const_cast<char*>(strMsgE->c_str()), // data
strMsgE->length(), // size
[](void* , void* object){ delete static_cast<std::string*>(object); },
strMsgE)); // object that manages the data
parts.AddPart(NewMessage(const_cast<char*>(strMsg->c_str()), // data
strMsg->length(), // size
[](void* , void* object){ delete static_cast<std::string*>(object); },
strMsg)); // object that manages the data
LOG(debug) << "Send data to channel "<< idx << " " << fChannelsToSend[idx][0];
// if (Send(msg, fChannelsToSend[idx][0]) < 0) {
if (Send(parts, fChannelsToSend[idx][0]) < 0) {
LOG(error) << "Problem sending data " << fChannelsToSend[idx][0];
return false;
}
fNumEvt++;
//if(fNumEvt==100) FairMQStateMachine::ChangeState(PAUSE); //sleep(10000); // Stop for debugging ...
return true;
}
/// Reset hook of the device; currently a no-op.
void CbmDeviceEventBuilderEtofStar2019::Reset()
{
}
void CbmDeviceEventBuilderEtofStar2019::Finish()
{
if( NULL != fpBinDumpFile )
{
LOG(info) << "Closing binary file used for event dump.";
fpBinDumpFile->close();
} // if( NULL != fpBinDumpFile )
/// If monitor mode enabled, trigger histos creation, obtain pointer on them and add them to the HTTP server
if( kTRUE == fbMonitorMode )
{
/// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
std::vector< std::pair< TNamed *, std::string > > vHistos = fEventBuilderAlgo->GetHistoVector();
/// (Re-)Create ROOT file to store the histos
TDirectory * oldDir = NULL;
TFile * histoFile = NULL;
// Store current directory position to allow restore later
oldDir = gDirectory;
// open separate histo file in recreate mode
histoFile = new TFile( "data/eventBuilderMonHist.root" , "RECREATE");
histoFile->cd();
/// Register the histos in the HTTP server
for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
{
/// Make sure we end up in chosen folder
gDirectory->mkdir( vHistos[ uHisto ].second.data() );
gDirectory->cd( vHistos[ uHisto ].second.data() );
/// Write plot
vHistos[ uHisto ].first->Write();
histoFile->cd();
} // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
// Restore original directory position
oldDir->cd();
histoFile->Close();
} // if( kTRUE == fbMonitorMode )
}