Newer
Older
/* Copyright (C) 2018-2019 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Florian Uhlig [committer] */
/**
* CbmTsaComponentSink.cxx
*
* @since 2018-04-24
* @author F. Uhlig
*/
#include "CbmTsaComponentSink.h"
#include "CbmMQDefs.h"
#include "StorableTimeslice.hpp"
#include "FairMQLogger.h"
#include <boost/archive/binary_iarchive.hpp>
#include <stdexcept>
struct InitTaskError : std::runtime_error {
using std::runtime_error::runtime_error;
};
CbmTsaComponentSink::CbmTsaComponentSink() : fNumMessages(0) {}
void CbmTsaComponentSink::InitTask()
try {
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
int noChannel = fChannels.size();
LOG(info) << "Number of defined input channels: " << noChannel;
for (auto const& entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
OnData(entry.first, &CbmTsaComponentSink::HandleData);
}
LOG(error) << e.what();
// Wrapper defined in CbmMQDefs.h to support different FairMQ versions
cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
}
bool CbmTsaComponentSink::IsChannelNameAllowed(std::string channelName)
{
std::size_t pos1 = channelName.find(entry);
const vector<std::string>::const_iterator pos =
std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
const vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
LOG(info) << "Found " << entry << " in " << channelName;
LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
LOG(error) << "Stop device.";
return false;
}
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool CbmTsaComponentSink::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
// Don't do anything with the data
// Maybe add an message counter which counts the incomming messages and add
// an output
LOG(info) << "Received message number " << fNumMessages << " with size " << msg->GetSize();
std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
std::istringstream iss(msgStr);
boost::archive::binary_iarchive inputArchive(iss);
inputArchive >> component;
CheckTimeslice(component);
return true;
}
void CbmTsaComponentSink::PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc)
{
LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
LOG(info) << "Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
LOG(info) << "Equipement ID: " << mdsc.eq_id;
LOG(info) << "Flags: " << mdsc.flags;
LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
LOG(info) << "Microslice Idx: " << mdsc.idx;
LOG(info) << "Checksum: " << mdsc.crc;
LOG(info) << "Size: " << mdsc.size;
LOG(info) << "Offset: " << mdsc.offset;
}
bool CbmTsaComponentSink::CheckTimeslice(const fles::Timeslice& ts)
{
LOG(error) << "No Component in TS " << ts.index();
return 1;
}
LOG(info) << "Found " << ts.num_components() << " different components in timeslice";
for (size_t c = 0; c < ts.num_components(); ++c) {
LOG(info) << "Found " << ts.num_microslices(c) << " microslices in component " << c;
LOG(info) << "Component " << c << " has a size of " << ts.size_component(c) << " bytes";
LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(ts.descriptor(0, 0).sys_id) << std::dec;
for (size_t m = 0; m < ts.num_microslices(c); ++m) {
PrintMicroSliceDescriptor(ts.descriptor(c,m));
}
*/
}
return true;
}