Newer
Older
/* Copyright (C) 2019 PI-UHd, GSI
SPDX-License-Identifier: GPL-3.0-only
Authors: Norbert Herrmann [committer] */
/**
* CbmDeviceTriggerHandlerEtof.cxx
*
* @since 2019-11-15
* @author N. Herrmann
*/
#include "CbmDeviceTriggerHandlerEtof.h"
#include "CbmMQDefs.h"
#include "FairEventHeader.h"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairRootFileSink.h"
#include <thread> // this_thread::sleep_for
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/serialization/vector.hpp>
#include <chrono>
/// Exception type used to signal that device initialisation failed.
/// It adds nothing to std::runtime_error except a distinct type to catch,
/// and inherits all of the base class constructors.
class InitTaskError : public std::runtime_error {
public:
  using std::runtime_error::runtime_error;
};
// Time stamp of the last throughput report. HandleData measures the elapsed
// time against it and then resets it to "now".
static std::chrono::steady_clock::time_point dctime = std::chrono::steady_clock::now();
// Accumulator behind the "MB/s" figure of the throughput report; it is zeroed
// after each report.
// NOTE(review): presumably the number of bytes forwarded since the last
// report - the place where it is incremented is not visible here, confirm.
static double dSize = 0.;
using namespace std;
/**
 * Default constructor.
 *
 * Zeroes the message counters and switches every mode flag off. The sandbox
 * flag is overwritten later from the device configuration in InitWorkspace().
 */
CbmDeviceTriggerHandlerEtof::CbmDeviceTriggerHandlerEtof()
  : fNumMessages(0)
  , fiMsgCnt(0)
  , fbMonitorMode(kFALSE)
  , fbDebugMonitorMode(kFALSE)
  , fbSandboxMode(kFALSE)
  , fbEventDumpEna(kFALSE)
{
  // Nothing to do: all set-up happens in the initializer list and in InitTask().
}
/// Destructor; performs no explicit clean-up.
CbmDeviceTriggerHandlerEtof::~CbmDeviceTriggerHandlerEtof() = default;
void CbmDeviceTriggerHandlerEtof::InitTask()
try {
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
int noChannel = fChannels.size();
LOG(info) << "Number of defined input channels: " << noChannel;
for (auto const& entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
if (entry.first != "syscmd") OnData(entry.first, &CbmDeviceTriggerHandlerEtof::HandleData);
else
OnData(entry.first, &CbmDeviceTriggerHandlerEtof::HandleMessage);
}
InitWorkspace();
LOG(error) << e.what();
cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
bool CbmDeviceTriggerHandlerEtof::IsChannelNameAllowed(std::string channelName)
{
std::size_t pos1 = channelName.find(entry);
const vector<std::string>::const_iterator pos =
std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
const vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
LOG(info) << "Found " << entry << " in " << channelName;
LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
LOG(error) << "Stop device.";
return false;
}
/**
 * Read the steering options of the device from its configuration.
 *
 * Only the "SandboxMode" option is read; it is stored in fbSandboxMode.
 *
 * @return always kTRUE
 */
Bool_t CbmDeviceTriggerHandlerEtof::InitWorkspace()
{
  LOG(info) << "Init work space for CbmDeviceTriggerHandlerEtof.";

  // Steering variables taken from the program options.
  const bool sandboxRequested = fConfig->GetValue<bool>("SandboxMode");
  fbSandboxMode               = sandboxRequested;

  return kTRUE;
}
// Data callback: InitTask registers it through OnData for every channel whose
// name is not "syscmd". It receives the multipart message and a sub-channel
// index, which is unused.
//
// Visible behaviour: part 0 is wrapped in a boost binary input archive, part 1
// is taken as a raw byte buffer and, unless sandbox mode is enabled, handed to
// star_rhicf_write() together with TrigWord. A throughput line is logged and
// the rate bookkeeping (dctime, dSize) is reset.
//
// NOTE(review): several pieces are not visible in this view - the statement
// terminator of the first LOG line, the declaration of TrigWord (presumably
// read from inputArchiveE - confirm), the updates of fNumMessages, fdEvent and
// dSize, any condition guarding the throughput report, the return statement
// and the closing brace.
//bool CbmDeviceTriggerHandlerEtof::HandleData(FairMQMessagePtr& msg, int /*index*/)
bool CbmDeviceTriggerHandlerEtof::HandleData(FairMQParts& parts, int /*index*/)
{
// Log the running message counter and the number of parts received.
LOG(debug) << "Received message " << fNumMessages << " with " << parts.Size() << " parts"
// Copy the bytes of part 0 into a string and open a binary archive on it.
std::string msgStrE(static_cast<char*>(parts.At(0)->GetData()), (parts.At(0))->GetSize());
std::istringstream issE(msgStrE);
boost::archive::binary_iarchive inputArchiveE(issE);
// Part 1 is used in place: pointer to its payload and its size in bytes.
char* pDataBuff = static_cast<char*>(parts.At(1)->GetData());
int iBuffSzByte = parts.At(1)->GetSize();
// Send the sub-event to STAR (skipped when sandbox mode is enabled).
LOG(debug) << "Send Data for event " << fdEvent << ", TrigWord " << TrigWord << " with size " << iBuffSzByte
<< Form(" at %p ", pDataBuff);
if (kFALSE == fbSandboxMode) { star_rhicf_write(TrigWord, pDataBuff, iBuffSzByte); }
// Throughput report: time elapsed since the last report and dSize scaled by
// 1e-6 per second, labelled MB/s.
std::chrono::duration<double> deltatime = std::chrono::steady_clock::now() - dctime;
LOG(info) << "Processed " << fdEvent << " events, delta-time: " << deltatime.count()
<< ", rate: " << dSize * 1.E-6 / deltatime.count() << "MB/s";
// Start a new measurement interval.
dctime = std::chrono::steady_clock::now();
dSize = 0.;
/************************************************************************************/
bool CbmDeviceTriggerHandlerEtof::HandleMessage(FairMQMessagePtr& msg, int /*index*/)
{
const char* cmd = (char*) (msg->GetData());
const char cmda[4] = {*cmd};
LOG(info) << "Handle message " << cmd << ", " << cmd[0];
// only one implemented so far "Stop"
cbm::mq::ChangeState(this, cbm::mq::Transition::Ready);
cbm::mq::LogState(this);
cbm::mq::ChangeState(this, cbm::mq::Transition::DeviceReady);
cbm::mq::LogState(this);
cbm::mq::ChangeState(this, cbm::mq::Transition::Idle);
cbm::mq::LogState(this);
cbm::mq::ChangeState(this, cbm::mq::Transition::End);
cbm::mq::LogState(this);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));