-
Pierre-Alain Loizeau authored
- Add proper Bmon support to unpack + build digi events chain - Add proper Much support to unpack + build digi events chain - Add user parameter + flags to CbmDeviceUnpack to enable/disable the unpacking of each detector - Add proper handling of disabled detectors in MQ transfer in both CbmDeviceUnpack and CbmDeviceBuildDigiEvents - Add parameter to set "MaxNb" type selection in CbmDeviceBuildDigiEvents - Synchronize unpackers configuration in CbmDeviceUnpack with the one in the current `macro/run/run_unpack_tsa.C` - Use fles::SubsystemIdentifier instead of hard-coded values in CbmDeviceUnpack
Pierre-Alain Loizeau authored: - Add proper Bmon support to unpack + build digi events chain - Add proper Much support to unpack + build digi events chain - Add user parameter + flags to CbmDeviceUnpack to enable/disable the unpacking of each detector - Add proper handling of disabled detectors in MQ transfer in both CbmDeviceUnpack and CbmDeviceBuildDigiEvents - Add parameter to set "MaxNb" type selection in CbmDeviceBuildDigiEvents - Synchronize unpackers configuration in CbmDeviceUnpack with the one in the current `macro/run/run_unpack_tsa.C` - Use fles::SubsystemIdentifier instead of hard-coded values in CbmDeviceUnpack
ParameterMQServer.cxx 8.01 KiB
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* ParameterMQServer.cxx
*
* @since 2015-10-26
* @author M. Al-Turany, A. Rybalchenko
*/
#include "ParameterMQServer.h"
#include "CbmMQDefs.h"
#include "CbmSetup.h"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h"
#include "FairParAsciiFileIo.h"
#include "FairParGenericSet.h"
#include "FairParRootFileIo.h"
#include "FairRuntimeDb.h"
#include "Rtypes.h"
#include "TGeoManager.h"
#include "TList.h"
#include "TMessage.h"
#include "TObjString.h"
#include "TSystem.h"

#include <sstream>
#include <stdexcept>
#include <string>
using namespace std;
ParameterMQServer::ParameterMQServer()
: fRtdb(FairRuntimeDb::instance())
{
}
void ParameterMQServer::InitTask()
{
string loadLibs = fConfig->GetValue<string>("libs-to-load");
if (loadLibs.length() > 0) {
LOG(info) << "There are libraries to load.";
if (loadLibs.find(";") != std::string::npos) {
LOG(info) << "There are several libraries to load";
istringstream f(loadLibs);
string s;
while (getline(f, s, ';')) {
LOG(info) << "Load library " << s;
gSystem->Load(s.c_str());
}
}
else {
LOG(info) << "Load library " << loadLibs;
gSystem->Load(loadLibs.c_str());
}
}
else {
LOG(info) << "There are no libraries to load.";
}
fFirstInputName = fConfig->GetValue<string>("first-input-name");
fFirstInputType = fConfig->GetValue<string>("first-input-type");
fSecondInputName = fConfig->GetValue<string>("second-input-name");
fSecondInputType = fConfig->GetValue<string>("second-input-type");
fOutputName = fConfig->GetValue<string>("output-name");
fOutputType = fConfig->GetValue<string>("output-type");
fChannelName = fConfig->GetValue<string>("channel-name");
fsSetupName = fConfig->GetValue<std::string>("setup");
LOG(info) << "Using setup: " << fsSetupName;
if (fRtdb != 0) {
// Set first input
if (fFirstInputType == "ROOT") {
FairParRootFileIo* par1R = new FairParRootFileIo();
par1R->open(fFirstInputName.data(), "UPDATE");
fRtdb->setFirstInput(par1R);
}
else if (fFirstInputType == "ASCII") {
FairParAsciiFileIo* par1A = new FairParAsciiFileIo();
if (fFirstInputName.find(";") != std::string::npos) {
LOG(info) << "File list found!";
TList* parFileList = new TList();
TObjString* parFile(NULL);
istringstream f(fFirstInputName);
string s;
while (getline(f, s, ';')) {
LOG(info) << "File: " << s;
parFile = new TObjString(s.c_str());
parFileList->Add(parFile);
par1A->open(parFileList, "in");
}
}
else {
LOG(info) << "Single input file found!";
par1A->open(fFirstInputName.data(), "in");
}
fRtdb->setFirstInput(par1A);
}
// Set second input
if (fSecondInputName != "") {
if (fSecondInputType == "ROOT") {
FairParRootFileIo* par2R = new FairParRootFileIo();
par2R->open(fSecondInputName.data(), "UPDATE");
fRtdb->setSecondInput(par2R);
}
else if (fSecondInputType == "ASCII") {
FairParAsciiFileIo* par2A = new FairParAsciiFileIo();
if (fSecondInputName.find(";") != std::string::npos) {
LOG(info) << "File list found!";
TList* parFileList = new TList();
TObjString* parFile(NULL);
istringstream f(fSecondInputName);
string s;
while (getline(f, s, ';')) {
LOG(info) << "File: " << s;
parFile = new TObjString(s.c_str());
parFileList->Add(parFile);
par2A->open(parFileList, "in");
}
}
else {
LOG(info) << "Single input file found!";
par2A->open(fFirstInputName.data(), "in");
}
fRtdb->setSecondInput(par2A);
}
}
// Set output
if (fOutputName != "") {
if (fOutputType == "ROOT") {
FairParRootFileIo* parOut = new FairParRootFileIo(kTRUE);
parOut->open(fOutputName.data());
fRtdb->setOutput(parOut);
}
fRtdb->saveOutput();
}
}
fRtdb->print();
// ----- CbmSetup -----------------------------------------------------
if ("" != fsSetupName) {
fSetup = CbmSetup::Instance();
fSetup->LoadSetup(fsSetupName.data());
}
// ------------------------------------------------------------------------
}
void ParameterMQServer::Run()
{
string parameterName = "";
FairParGenericSet* par = nullptr;
while (cbm::mq::CheckCurrentState(this, cbm::mq::State::Running)) {
FairMQMessagePtr req(NewMessage());
if (Receive(req, fChannelName, 0) > 0) {
string reqStr(static_cast<char*>(req->GetData()), req->GetSize());
LOG(info) << "Received parameter request from client: \"" << reqStr << "\"";
if ("setup" == reqStr) {
// TODO: support for multiple setups on Par Server? with request containing setup name?
if ("" != fsSetupName && fSetup) {
/// Prepare serialized versions of the CbmSetup
CbmSetupStorable exchangableSetup(fSetup);
TMessage* tmsg = new TMessage(kMESS_OBJECT);
tmsg->WriteObject(&exchangableSetup);
FairMQMessagePtr rep(NewMessage(
tmsg->Buffer(), tmsg->BufferSize(),
[](void* /*data*/, void* object) { delete static_cast<TMessage*>(object); }, tmsg));
if (Send(rep, fChannelName, 0) < 0) {
LOG(error) << "failed sending reply to Setup request";
break;
}
}
else {
LOG(error) << "CbmSetup uninitialized!";
// Send an empty message back to keep the REQ/REP cycle
FairMQMessagePtr rep(NewMessage());
if (Send(rep, fChannelName, 0) < 0) {
LOG(error) << "failed sending reply to Setup request";
break;
}
}
}
else {
size_t pos = reqStr.rfind(",");
string newParameterName = reqStr.substr(0, pos);
int runId = stoi(reqStr.substr(pos + 1));
LOG(info) << "Parameter name: " << newParameterName;
LOG(info) << "Run ID: " << runId;
LOG(info) << "Retrieving parameter...";
// Check if the parameter name has changed to avoid getting same container repeatedly
if (newParameterName != parameterName) {
parameterName = newParameterName;
par = static_cast<FairParGenericSet*>(fRtdb->getContainer(parameterName.c_str()));
}
LOG(info) << "Retrieving parameter...Done";
if (-1 != runId) { fRtdb->initContainers(runId); }
LOG(info) << "Sending following parameter to the client:";
if (par) {
par->print();
TMessage* tmsg = new TMessage(kMESS_OBJECT);
tmsg->WriteObject(par);
FairMQMessagePtr rep(NewMessage(
tmsg->Buffer(), tmsg->BufferSize(),
[](void* /*data*/, void* object) { delete static_cast<TMessage*>(object); }, tmsg));
if (Send(rep, fChannelName, 0) < 0) {
LOG(error) << "failed sending reply";
break;
}
}
else {
LOG(error) << "Parameter uninitialized!";
// Send an empty message back to keep the REQ/REP cycle
FairMQMessagePtr rep(NewMessage());
if (Send(rep, fChannelName, 0) < 0) {
LOG(error) << "failed sending reply";
break;
}
}
}
}
}
}
/**
 * Destructor.
 *
 * If a global geometry manager exists, calls Delete() on its lists of volumes and shapes;
 * afterwards deletes the runtime database pointed to by fRtdb.
 */
ParameterMQServer::~ParameterMQServer()
{
  TGeoManager* const geoManager = gGeoManager;
  if (geoManager != nullptr) {
    geoManager->GetListOfVolumes()->Delete();
    geoManager->GetListOfShapes()->Delete();
  }

  delete fRtdb;
}