-
Eoin Clerkin authored
Decision to not use doxygen for licence headers. Removes doxygen formatting and file tag.
Eoin Clerkin authoredDecision to not use doxygen for licence headers. Removes doxygen formatting and file tag.
CbmDeviceTriggerHandlerEtof.cxx 5.81 KiB
/* Copyright (C) 2019 PI-UHd, GSI
SPDX-License-Identifier: GPL-3.0-only
Authors: Norbert Herrmann [committer] */
/**
* CbmDeviceTriggerHandlerEtof.cxx
*
* @since 2019-11-15
* @author N. Herrmann
*/
#include "CbmDeviceTriggerHandlerEtof.h"
#include "CbmMQDefs.h"
#include "FairEventHeader.h"
#include "FairFileHeader.h"
#include "FairGeoParSet.h"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairRootFileSink.h"
#include "FairRootManager.h"
#include "FairRunOnline.h"
#include "FairRuntimeDb.h"
#include <thread> // this_thread::sleep_for
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/serialization/vector.hpp>
#include <chrono>
#include <iomanip>
#include <stdexcept>
#include <string>
struct InitTaskError : std::runtime_error {
using std::runtime_error::runtime_error;
};
static std::chrono::steady_clock::time_point dctime = std::chrono::steady_clock::now();
static double dSize = 0.;
using namespace std;
CbmDeviceTriggerHandlerEtof::CbmDeviceTriggerHandlerEtof()
: fNumMessages(0)
, fiMsgCnt(0)
, fbMonitorMode(kFALSE)
, fbDebugMonitorMode(kFALSE)
, fbSandboxMode(kFALSE)
, fbEventDumpEna(kFALSE)
, fdEvent(0.)
{
}
CbmDeviceTriggerHandlerEtof::~CbmDeviceTriggerHandlerEtof() {}
void CbmDeviceTriggerHandlerEtof::InitTask()
try {
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
int noChannel = fChannels.size();
LOG(info) << "Number of defined input channels: " << noChannel;
for (auto const& entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
if (entry.first != "syscmd") OnData(entry.first, &CbmDeviceTriggerHandlerEtof::HandleData);
else
OnData(entry.first, &CbmDeviceTriggerHandlerEtof::HandleMessage);
}
InitWorkspace();
}
catch (InitTaskError& e) {
LOG(error) << e.what();
cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
}
bool CbmDeviceTriggerHandlerEtof::IsChannelNameAllowed(std::string channelName)
{
for (auto const& entry : fAllowedChannels) {
std::size_t pos1 = channelName.find(entry);
if (pos1 != std::string::npos) {
const vector<std::string>::const_iterator pos =
std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
const vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
LOG(info) << "Found " << entry << " in " << channelName;
LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
return true;
}
}
LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
LOG(error) << "Stop device.";
return false;
}
Bool_t CbmDeviceTriggerHandlerEtof::InitWorkspace()
{
LOG(info) << "Init work space for CbmDeviceTriggerHandlerEtof.";
// steering variables
fbSandboxMode = fConfig->GetValue<bool>("SandboxMode");
return kTRUE;
}
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
//bool CbmDeviceTriggerHandlerEtof::HandleData(FairMQMessagePtr& msg, int /*index*/)
bool CbmDeviceTriggerHandlerEtof::HandleData(FairMQParts& parts, int /*index*/)
{
// Don't do anything with the data
// Maybe add an message counter which counts the incomming messages and add
// an output
fNumMessages++;
LOG(debug) << "Received message " << fNumMessages << " with " << parts.Size() << " parts"
<< ", size0: " << parts.At(0)->GetSize();
uint TrigWord {0};
std::string msgStrE(static_cast<char*>(parts.At(0)->GetData()), (parts.At(0))->GetSize());
std::istringstream issE(msgStrE);
boost::archive::binary_iarchive inputArchiveE(issE);
inputArchiveE >> TrigWord;
char* pDataBuff = static_cast<char*>(parts.At(1)->GetData());
int iBuffSzByte = parts.At(1)->GetSize();
// Send Subevent to STAR
LOG(debug) << "Send Data for event " << fdEvent << ", TrigWord " << TrigWord << " with size " << iBuffSzByte
<< Form(" at %p ", pDataBuff);
if (kFALSE == fbSandboxMode) { star_rhicf_write(TrigWord, pDataBuff, iBuffSzByte); }
dSize += iBuffSzByte;
if (0 == (int) fdEvent % 10000) {
std::chrono::duration<double> deltatime = std::chrono::steady_clock::now() - dctime;
LOG(info) << "Processed " << fdEvent << " events, delta-time: " << deltatime.count()
<< ", rate: " << dSize * 1.E-6 / deltatime.count() << "MB/s";
dctime = std::chrono::steady_clock::now();
dSize = 0.;
}
fdEvent++;
return kTRUE;
}
/************************************************************************************/
bool CbmDeviceTriggerHandlerEtof::HandleMessage(FairMQMessagePtr& msg, int /*index*/)
{
const char* cmd = (char*) (msg->GetData());
const char cmda[4] = {*cmd};
LOG(info) << "Handle message " << cmd << ", " << cmd[0];
// only one implemented so far "Stop"
if (strcmp(cmda, "STOP")) {
cbm::mq::ChangeState(this, cbm::mq::Transition::Ready);
cbm::mq::LogState(this);
cbm::mq::ChangeState(this, cbm::mq::Transition::DeviceReady);
cbm::mq::LogState(this);
cbm::mq::ChangeState(this, cbm::mq::Transition::Idle);
cbm::mq::LogState(this);
cbm::mq::ChangeState(this, cbm::mq::Transition::End);
cbm::mq::LogState(this);
// ChangeState(fair::mq::Transition(STOP));
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
return true;
}