Newer
Older
/* Copyright (C) 2020 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */

Pierre-Alain Loizeau
committed
/**
* CbmDeviceMcbmUnpack.cxx
*
* @since 2020-05-04
* @author P.-A. Loizeau
*/
#include "CbmDeviceMcbmUnpack.h"

Pierre-Alain Loizeau
committed
#include "CbmMcbm2018UnpackerAlgoMuch.h"
#include "CbmMcbm2018UnpackerAlgoPsd.h"
#include "CbmMcbm2018UnpackerAlgoRich.h"
#include "CbmMcbm2018UnpackerAlgoSts.h"
#include "CbmMcbm2018UnpackerAlgoTof.h"
#include "CbmMcbm2018UnpackerAlgoTrdR.h"

Pierre-Alain Loizeau
committed
#include "StorableTimeslice.hpp"

Pierre-Alain Loizeau
committed
#include "FairMQLogger.h"

Pierre-Alain Loizeau
committed
#include "FairParGenericSet.h"
#include "TCanvas.h"
#include "TFile.h"
#include "TH1.h"

Pierre-Alain Loizeau
committed

Pierre-Alain Loizeau
committed
#include <boost/archive/binary_iarchive.hpp>

Pierre-Alain Loizeau
committed
#include <array>
#include <iomanip>

Pierre-Alain Loizeau
committed
#include <stdexcept>
#include <string>
#include "RootSerializer.h"
/// Exception type thrown when the device initialisation fails.
/// It adds nothing to std::runtime_error beyond a distinct type to catch.
struct InitTaskError : public std::runtime_error {
  /// Build the exception from a std::string message.
  explicit InitTaskError(const std::string& what_arg) : std::runtime_error(what_arg) {}
  /// Build the exception from a C-string message.
  explicit InitTaskError(const char* what_arg) : std::runtime_error(what_arg) {}
};

Pierre-Alain Loizeau
committed
// NOTE(review): file-scope using-directive; unqualified names such as `vector` further down rely on it.
using namespace std;
// File-scope flag initialised to kFALSE; nothing in the visible part of this file reads or writes it.
Bool_t bMcbm2018MonitorTaskBmonResetHistos = kFALSE;

Pierre-Alain Loizeau
committed
/// Default constructor: allocates one unpacker algorithm object per detector
/// subsystem (STS, MUCH, TRD, TOF, RICH, PSD). The device owns these objects.
CbmDeviceMcbmUnpack::CbmDeviceMcbmUnpack()
{
  fUnpAlgoSts  = new CbmMcbm2018UnpackerAlgoSts();
  fUnpAlgoMuch = new CbmMcbm2018UnpackerAlgoMuch();
  fUnpAlgoTrd  = new CbmMcbm2018UnpackerAlgoTrdR();
  fUnpAlgoTof  = new CbmMcbm2018UnpackerAlgoTof();
  fUnpAlgoRich = new CbmMcbm2018UnpackerAlgoRich();
  fUnpAlgoPsd  = new CbmMcbm2018UnpackerAlgoPsd();
}
void CbmDeviceMcbmUnpack::InitTask()
try {
/// Read options from executable
LOG(info) << "Init options for CbmDeviceMcbmUnpack.";
fbIgnoreOverlapMs = fConfig->GetValue<bool>("IgnOverMs");
fvsSetTimeOffs = fConfig->GetValue<std::vector<std::string>>("SetTrigWin");
fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn");
fsChannelNameDataOutput = fConfig->GetValue<std::string>("TsNameOut");
fsAllowedChannels[0] = fsChannelNameDataInput;
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
//logger::SetLogLevel("INFO");
int noChannel = fChannels.size();
LOG(info) << "Number of defined channels: " << noChannel;
for (auto const& entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
OnData(entry.first, &CbmDeviceMcbmUnpack::HandleData);
} // if( entry.first.find( "ts" )
} // for( auto const &entry : fChannels )
InitContainers();
LOG(error) << e.what();
// Wrapper defined in CbmMQDefs.h to support different FairMQ versions
cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);

Pierre-Alain Loizeau
committed
}
/// Check whether a channel name contains one of the allowed channel names.
///
/// @param channelName name of the channel as defined in the topology
/// @return true as soon as one entry of fsAllowedChannels is found as a
///         substring of channelName, false if none matches
bool CbmDeviceMcbmUnpack::IsChannelNameAllowed(std::string channelName)
{
  std::size_t idx = 0;  // position of the current entry, only used for the log message
  for (auto const& entry : fsAllowedChannels) {
    if (channelName.find(entry) != std::string::npos) {
      LOG(info) << "Found " << entry << " in " << channelName;
      LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
      return true;
    }  // if( channelName.find( entry ) != std::string::npos )
    ++idx;
  }  // for( auto const &entry : fsAllowedChannels )

  LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
  return false;
}
/// Fetch the parameter containers of all unpacker algorithms, forward the
/// device options to them and initialise them.
///
/// Each entry of fvsSetTimeOffs is expected as "<detector>,<offset>", with the
/// detector being one of kSTS, kMUCH, kTRD, kTOF, kRICH, kPSD. Entries that
/// cannot be parsed are reported and skipped.
///
/// @return kFALSE if a parameter container could not be obtained or if any
///         algorithm failed its own initialisation, kTRUE otherwise
Bool_t CbmDeviceMcbmUnpack::InitContainers()
{
  LOG(info) << "Init parameter containers for CbmDeviceMcbmUnpack.";

  if (kFALSE == InitParameters(fUnpAlgoSts->GetParList())) return kFALSE;
  if (kFALSE == InitParameters(fUnpAlgoMuch->GetParList())) return kFALSE;
  if (kFALSE == InitParameters(fUnpAlgoTrd->GetParList())) return kFALSE;
  if (kFALSE == InitParameters(fUnpAlgoTof->GetParList())) return kFALSE;
  if (kFALSE == InitParameters(fUnpAlgoRich->GetParList())) return kFALSE;
  if (kFALSE == InitParameters(fUnpAlgoPsd->GetParList())) return kFALSE;

  /// Need to add accessors for all options
  fUnpAlgoSts->SetIgnoreOverlapMs(fbIgnoreOverlapMs);
  fUnpAlgoMuch->SetIgnoreOverlapMs(fbIgnoreOverlapMs);
  fUnpAlgoTrd->SetIgnoreOverlapMs(fbIgnoreOverlapMs);
  fUnpAlgoTof->SetIgnoreOverlapMs(fbIgnoreOverlapMs);
  fUnpAlgoRich->SetIgnoreOverlapMs(fbIgnoreOverlapMs);
  fUnpAlgoPsd->SetIgnoreOverlapMs(fbIgnoreOverlapMs);

  /// Load time offsets
  for (const std::string& sOption : fvsSetTimeOffs) {
    const size_t charPosDel = sOption.find(',');
    if (std::string::npos == charPosDel) {
      LOG(info) << "CbmDeviceMcbmUnpack::InitContainers => "
                << "Trying to set trigger window with invalid option pattern, ignored! "
                << " (Should be ECbmModuleId,dWinBeg,dWinEnd but instead found " << sOption << " )";
      // Without a delimiter there is no value to parse: skip the entry
      continue;
    }  // if( std::string::npos == charPosDel )

    /// Detector Enum Tag
    const std::string sSelDet = sOption.substr(0, charPosDel);

    /// Offset value: std::stod throws on text which is not a number or out of range
    Double_t dOffset = 0.;
    try {
      dOffset = std::stod(sOption.substr(charPosDel + 1));
    }
    catch (const std::exception&) {
      LOG(info) << "CbmDeviceMcbmUnpack::InitContainers => Trying to set time "
                   "offset with invalid value, ignored! "
                << sOption;
      continue;
    }

    if ("kSTS" == sSelDet) { fUnpAlgoSts->SetTimeOffsetNs(dOffset); }  // if( "kSTS" == sSelDet )
    else if ("kMUCH" == sSelDet) {
      fUnpAlgoMuch->SetTimeOffsetNs(dOffset);
    }  // else if( "kMUCH" == sSelDet )
    else if ("kTRD" == sSelDet) {
      fUnpAlgoTrd->SetTimeOffsetNs(dOffset);
    }  // else if( "kTRD" == sSelDet )
    else if ("kTOF" == sSelDet) {
      fUnpAlgoTof->SetTimeOffsetNs(dOffset);
    }  // else if( "kTOF" == sSelDet )
    else if ("kRICH" == sSelDet) {
      fUnpAlgoRich->SetTimeOffsetNs(dOffset);
    }  // else if( "kRICH" == sSelDet )
    else if ("kPSD" == sSelDet) {
      fUnpAlgoPsd->SetTimeOffsetNs(dOffset);
    }  // else if( "kPSD" == sSelDet )
    else {
      LOG(info) << "CbmDeviceMcbmUnpack::InitContainers => Trying to set time "
                   "offset for unsupported detector, ignored! "
                << (sSelDet);
      continue;
    }  // else of detector enum detection
  }    // for( const std::string& sOption : fvsSetTimeOffs )

  /// Starting from first run on Tuesday 28/04/2020, STS uses bin sorter FW
  fUnpAlgoSts->SetBinningFwFlag(kTRUE);
  /// Starting from first run on Monday 04/05/2020, MUCH uses bin sorter FW
  fUnpAlgoMuch->SetBinningFwFlag(kTRUE);

  /// Every algorithm is initialised even if a previous one failed
  Bool_t initOK = fUnpAlgoSts->InitContainers();
  initOK &= fUnpAlgoMuch->InitContainers();
  initOK &= fUnpAlgoTrd->InitContainers();
  initOK &= fUnpAlgoTof->InitContainers();
  initOK &= fUnpAlgoRich->InitContainers();
  initOK &= fUnpAlgoPsd->InitContainers();

  /// Special case for TRD vector initialization
  /// Get address of vector from algo in a kind of loopback ^^'
  initOK &= fUnpAlgoTrd->SetDigiOutputPointer(&(fUnpAlgoTrd->GetVector()));

  return initOK;
}
/// Replace every parameter container of the list by the version obtained from
/// the "parameters" channel.
///
/// For each entry a request "<container name>,111" is sent and the reply is
/// deserialised into a new FairParGenericSet which takes the place of the old
/// object in the list. The old object is removed and deleted only once its
/// replacement is available, so the list is left untouched for the entry on
/// which a failure happens.
///
/// @param fParCList list of parameter containers to update in place
/// @return kFALSE if a request could not be sent, no reply was received, the
///         reply was empty or could not be deserialised; kTRUE otherwise
Bool_t CbmDeviceMcbmUnpack::InitParameters(TList* fParCList)
{
  for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
    FairParGenericSet* tempObj = static_cast<FairParGenericSet*>(fParCList->At(iparC));
    std::string paramName {tempObj->GetName()};

    // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
    // Should only be used for small data because of the cost of an additional copy
    // Here must come the proper Runid
    std::string message = paramName + ",111";
    LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;

    FairMQMessagePtr req(NewSimpleMessage(message));
    FairMQMessagePtr rep(NewMessage());

    if (Send(req, "parameters") <= 0) {
      LOG(error) << "Failed to send the request for parameter container " << paramName;
      return kFALSE;
    }  // if( Send(req, "parameters") <= 0 )
    if (Receive(rep, "parameters") < 0) {
      LOG(error) << "Failed to receive the reply for parameter container " << paramName;
      return kFALSE;
    }  // if( Receive( rep, "parameters" ) < 0 )
    if (0 == rep->GetSize()) {
      LOG(error) << "Received empty reply. Parameter not available";
      return kFALSE;
    }  // if( 0 == rep->GetSize() )

    CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
    FairParGenericSet* newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass()));
    if (nullptr == newObj) {
      LOG(error) << "Could not read parameter container " << paramName << " from the reply";
      return kFALSE;
    }  // if( nullptr == newObj )
    LOG(info) << "Received unpack parameter from the server:";
    newObj->print();

    /// Swap the old container for the new one at the same position
    fParCList->Remove(tempObj);
    fParCList->AddAt(newObj, iparC);
    delete tempObj;
  }  // for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )

  return kTRUE;
}
/// Handler called whenever a message arrives on the input data channel, with a
/// reference to the message and a sub-channel index (unused).
///
/// The message is deserialised into a timeslice, the timeslice is unpacked by
/// all algorithms, the resulting digi vectors are sent to the output channel
/// and the algorithm buffers are cleared for the next timeslice.
///
/// @param msg message holding a boost binary archive of a fles::StorableTimeslice
/// @return false if the unpacked data could not be sent, true otherwise
bool CbmDeviceMcbmUnpack::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
  fulNumMessages++;
  LOG(debug) << "Received message number " << fulNumMessages << " with size " << msg->GetSize();

  if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";

  std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
  std::istringstream iss(msgStr);
  boost::archive::binary_iarchive inputArchive(iss);

  /// Create an empty TS and fill it with the incoming message
  fles::StorableTimeslice ts {0};
  inputArchive >> ts;

  /// On first TS, extract the TS parameters from header (by definition stable over time)
  if (-1.0 == fdTsCoreSizeInNs) {
    fuNbCoreMsPerTs  = ts.num_core_microslices();
    fuNbOverMsPerTs  = ts.num_microslices(0) - ts.num_core_microslices();
    fdTsCoreSizeInNs = fdMsSizeInNs * (fuNbCoreMsPerTs);
    fdTsOverSizeInNs = fdMsSizeInNs * (fuNbOverMsPerTs);
    fdTsFullSizeInNs = fdTsCoreSizeInNs + fdTsOverSizeInNs;
    LOG(info) << "Timeslice parameters: each TS has " << fuNbCoreMsPerTs << " Core MS and " << fuNbOverMsPerTs
              << " Overlap MS, for a core duration of " << fdTsCoreSizeInNs << " ns and a full duration of "
              << fdTsFullSizeInNs << " ns";
  }  // if( -1.0 == fdTsCoreSizeInNs )

  /// The metadata object is created for each TS as it is deleted once the TS is sent
  fTsMetaData = new TimesliceMetaData(ts.descriptor(0, 0).idx, fdTsCoreSizeInNs, fdTsOverSizeInNs, ts.index());

  /// Process the Timeslice
  DoUnpack(ts, 0);

  /// Send digi vectors to output
  const bool bSendOk = SendUnpData();

  /// Release the metadata whatever the outcome of the sending to avoid leaking it
  delete fTsMetaData;
  fTsMetaData = nullptr;
  if (!bSendOk) return false;

  /// Clear the digis vectors in case it was filled
  fUnpAlgoSts->ClearVector();
  fUnpAlgoMuch->ClearVector();
  fUnpAlgoTrd->ClearVector();
  fUnpAlgoTof->ClearVector();
  fUnpAlgoRich->ClearVector();
  fUnpAlgoPsd->ClearVector();

  /// Clear the error vectors in case it was filled
  fUnpAlgoSts->ClearErrorVector();
  fUnpAlgoMuch->ClearErrorVector();
  fUnpAlgoTrd->ClearErrorVector();
  fUnpAlgoTof->ClearErrorVector();
  fUnpAlgoRich->ClearErrorVector();
  fUnpAlgoPsd->ClearErrorVector();

  return true;
}
bool CbmDeviceMcbmUnpack::SendUnpData()
{

Pierre-Alain Loizeau
committed

Pierre-Alain Loizeau
committed
std::stringstream ossTsMeta;
boost::archive::binary_oarchive oaTsMeta(ossTsMeta);
oaTsMeta << *(fTsMetaData);
std::string* strMsgTsMetaE = new std::string(ossTsMeta.str());
*/
// Serialize<RootSerializer>(*messTsMeta, fTsMetaData);
RootSerializer().Serialize(*messTsMeta, fTsMetaData);
std::stringstream ossSts;
boost::archive::binary_oarchive oaSts(ossSts);
oaSts << fUnpAlgoSts->GetVector();
std::string* strMsgSts = new std::string(ossSts.str());
std::stringstream ossMuch;
boost::archive::binary_oarchive oaMuch(ossMuch);
oaMuch << fUnpAlgoMuch->GetVector();
std::string* strMsgMuch = new std::string(ossMuch.str());
std::stringstream ossTrd;
boost::archive::binary_oarchive oaTrd(ossTrd);
oaTrd << fUnpAlgoTrd->GetVector();
std::string* strMsgTrd = new std::string(ossTrd.str());
/// Split TOF vector in TOF and Bmon
std::vector<CbmTofDigi>& vDigiTofBmon = fUnpAlgoTof->GetVector();
for (auto digi : vDigiTofBmon) {
if (fuDigiMaskedIdBmon == (digi.GetAddress() & fuDigiMaskId)) {
/// Insert data in Bmon output container
vDigiBmon.emplace_back(digi);
} // if( fuDigiMaskedIdBmon == ( digi.GetAddress() & fuDigiMaskId ) )
else {
/// Insert data in TOF output container
vDigiTof.emplace_back(digi);
} // else of if( fuDigiMaskedIdBmon == ( digi.GetAddress() & fuDigiMaskId ) )
} // for( auto digi: vDigi )
std::stringstream ossTof;
boost::archive::binary_oarchive oaTof(ossTof);
oaTof << vDigiTof;
std::string* strMsgTof = new std::string(ossTof.str());
std::stringstream ossBmon;
boost::archive::binary_oarchive oaBmon(ossBmon);
oaBmon << vDigiBmon;
std::string* strMsgBmon = new std::string(ossBmon.str());
std::stringstream ossRich;
boost::archive::binary_oarchive oaRich(ossRich);
oaRich << fUnpAlgoRich->GetVector();
std::string* strMsgRich = new std::string(ossRich.str());
std::stringstream ossPsd;
boost::archive::binary_oarchive oaPsd(ossPsd);
oaPsd << fUnpAlgoPsd->GetVector();
std::string* strMsgPsd = new std::string(ossPsd.str());
FairMQParts parts;
parts.AddPart(std::move(messTsMeta));
/*

Pierre-Alain Loizeau
committed
parts.AddPart( NewMessage( const_cast< char * >( strMsgTsMetaE->c_str() ), // data
strMsgTsMetaE->length(), // size
[]( void* , void* object ){ delete static_cast< std::string * >( object ); },
strMsgTsMetaE
)
); // object that manages the data
*/
const_cast<char*>(strMsgBmon->c_str()), // data
strMsgBmon->length(), // size
[](void*, void* object) { delete static_cast<std::string*>(object); },
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
parts.AddPart(NewMessage(
const_cast<char*>(strMsgSts->c_str()), // data
strMsgSts->length(), // size
[](void*, void* object) { delete static_cast<std::string*>(object); },
strMsgSts)); // object that manages the data
parts.AddPart(NewMessage(
const_cast<char*>(strMsgMuch->c_str()), // data
strMsgMuch->length(), // size
[](void*, void* object) { delete static_cast<std::string*>(object); },
strMsgMuch)); // object that manages the data
parts.AddPart(NewMessage(
const_cast<char*>(strMsgTrd->c_str()), // data
strMsgTrd->length(), // size
[](void*, void* object) { delete static_cast<std::string*>(object); },
strMsgTrd)); // object that manages the data
parts.AddPart(NewMessage(
const_cast<char*>(strMsgTof->c_str()), // data
strMsgTof->length(), // size
[](void*, void* object) { delete static_cast<std::string*>(object); },
strMsgTof)); // object that manages the data
parts.AddPart(NewMessage(
const_cast<char*>(strMsgRich->c_str()), // data
strMsgRich->length(), // size
[](void*, void* object) { delete static_cast<std::string*>(object); },
strMsgRich)); // object that manages the data
parts.AddPart(NewMessage(
const_cast<char*>(strMsgPsd->c_str()), // data
strMsgPsd->length(), // size
[](void*, void* object) { delete static_cast<std::string*>(object); },
strMsgPsd)); // object that manages the data
if (Send(parts, fsChannelNameDataOutput) < 0) {

Pierre-Alain Loizeau
committed
LOG(error) << "Problem sending data to " << fsChannelNameDataOutput;
return false;
}

Pierre-Alain Loizeau
committed
}
/// Destructor: releases the unpacker algorithm objects allocated in the constructor.
CbmDeviceMcbmUnpack::~CbmDeviceMcbmUnpack()
{
  // Deleting a null pointer is a no-op, so no check is needed before each delete
  delete fUnpAlgoSts;
  delete fUnpAlgoMuch;
  delete fUnpAlgoTrd;
  delete fUnpAlgoTof;
  delete fUnpAlgoRich;
  delete fUnpAlgoPsd;
}
/// Unpack one timeslice with all detector algorithms.
///
/// On the first call each component of the timeslice is assigned to the
/// algorithm matching the system ID of its first microslice descriptor (Bmon
/// components are handled by the TOF algorithm). The timeslice is then
/// processed by the algorithms in the order STS, MUCH, TRD, TOF, RICH, PSD,
/// stopping at the first failure.
///
/// @param ts timeslice to unpack
/// @return kFALSE as soon as one algorithm fails to process the timeslice,
///         kTRUE otherwise
Bool_t CbmDeviceMcbmUnpack::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
{
  fulTsCounter++;

  if (kFALSE == fbComponentsAddedToList) {
    for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
      switch (ts.descriptor(uCompIdx, 0).sys_id) {
        case kusSysIdSts: {
          fUnpAlgoSts->AddMsComponentToList(uCompIdx, kusSysIdSts);
          break;
        }  // case kusSysIdSts
        case kusSysIdMuch: {
          fUnpAlgoMuch->AddMsComponentToList(uCompIdx, kusSysIdMuch);
          break;
        }  // case kusSysIdMuch
        case kusSysIdTrd: {
          fUnpAlgoTrd->AddMsComponentToList(uCompIdx, kusSysIdTrd);
          break;
        }  // case kusSysIdTrd
        case kusSysIdTof: {
          fUnpAlgoTof->AddMsComponentToList(uCompIdx, kusSysIdTof);
          break;
        }  // case kusSysIdTof
        case kusSysIdBmon: {
          fUnpAlgoTof->AddMsComponentToList(uCompIdx, kusSysIdBmon);
          break;  // must not fall through: a Bmon component is not a RICH one
        }  // case kusSysIdBmon
        case kusSysIdRich: {
          fUnpAlgoRich->AddMsComponentToList(uCompIdx, kusSysIdRich);
          break;
        }  // case kusSysIdRich
        case kusSysIdPsd: {
          fUnpAlgoPsd->AddMsComponentToList(uCompIdx, kusSysIdPsd);
          break;
        }  // case kusSysIdPsd
        default: break;
      }  // switch( ts.descriptor( uCompIdx, 0 ).sys_id )
    }    // for( uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx )
    fbComponentsAddedToList = kTRUE;
  }  // if( kFALSE == fbComponentsAddedToList )

  if (kFALSE == fUnpAlgoSts->ProcessTs(ts)) {
    LOG(error) << "Failed processing TS " << ts.index() << " in STS unpacker algorithm class";
    return kFALSE;
  }  // if( kFALSE == fUnpAlgoSts->ProcessTs( ts ) )

  if (kFALSE == fUnpAlgoMuch->ProcessTs(ts)) {
    LOG(error) << "Failed processing TS " << ts.index() << " in MUCH unpacker algorithm class";
    return kFALSE;
  }  // if( kFALSE == fUnpAlgoMuch->ProcessTs( ts ) )

  if (kFALSE == fUnpAlgoTrd->ProcessTs(ts)) {
    LOG(error) << "Failed processing TS " << ts.index() << " in TRD unpacker algorithm class";
    return kFALSE;
  }  // if( kFALSE == fUnpAlgoTrd->ProcessTs( ts ) )

  if (kFALSE == fUnpAlgoTof->ProcessTs(ts)) {
    LOG(error) << "Failed processing TS " << ts.index() << " in TOF unpacker algorithm class";
    return kFALSE;
  }  // if( kFALSE == fUnpAlgoTof->ProcessTs( ts ) )

  if (kFALSE == fUnpAlgoRich->ProcessTs(ts)) {
    LOG(error) << "Failed processing TS " << ts.index() << " in RICH unpacker algorithm class";
    return kFALSE;
  }  // if( kFALSE == fUnpAlgoRich->ProcessTs( ts ) )

  if (kFALSE == fUnpAlgoPsd->ProcessTs(ts)) {
    LOG(error) << "Failed processing TS " << ts.index() << " in PSD unpacker algorithm class";
    return kFALSE;
  }  // if( kFALSE == fUnpAlgoPsd->ProcessTs( ts ) )

  if (0 == fulTsCounter % 10000) LOG(info) << "Processed " << fulTsCounter << " time slices";

  return kTRUE;
}