Skip to content
Snippets Groups Projects
  • Pierre-Alain Loizeau's avatar
    [MQ] Add Bmon and Much support to unpack and event building + various fixes and improvements · 9a84d73b
    Pierre-Alain Loizeau authored
    - Add proper Bmon support to unpack + build digi events chain
    - Add proper Much support to unpack + build digi events chain
    - Add user parameter + flags to CbmDeviceUnpack to enable/disable the unpacking of each detector
    - Add proper handling of disabled detectors in MQ transfer ib both CbmDeviceUnpack and CbmDeviceBuildDigiEvents
    - Add parameter to set "MaxNb" type selection in CbmDeviceBuildDigiEvents
    - Synchronize unpackers configuration in CbmDeviceUnpack with the one in the current `macro/run/run_unpack_tsa.C`
    - Use fles::SubsystemIdentifier instead of hard coded values in CbmDeviceUnpack
    [MQ] Add Bmon and Much support to unpack and event building + various fixes and improvements
    Pierre-Alain Loizeau authored
    - Add proper Bmon support to unpack + build digi events chain
    - Add proper Much support to unpack + build digi events chain
    - Add user parameter + flags to CbmDeviceUnpack to enable/disable the unpacking of each detector
    - Add proper handling of disabled detectors in MQ transfer ib both CbmDeviceUnpack and CbmDeviceBuildDigiEvents
    - Add parameter to set "MaxNb" type selection in CbmDeviceBuildDigiEvents
    - Synchronize unpackers configuration in CbmDeviceUnpack with the one in the current `macro/run/run_unpack_tsa.C`
    - Use fles::SubsystemIdentifier instead of hard coded values in CbmDeviceUnpack
ParameterMQServer.cxx 8.01 KiB
 *    Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH    *
 *                                                                              *
 *              This software is distributed under the terms of the             *
 *         GNU Lesser General Public Licence version 3 (LGPL) version 3,        *
 *                  copied verbatim in the file "LICENSE"                       *
 * ParameterMQServer.cxx
 * @since 2015-10-26
 * @author M. Al-Turany, A. Rybalchenko

#include "ParameterMQServer.h"

#include "CbmMQDefs.h"
#include "CbmSetup.h"

#include "FairMQLogger.h"
#include "FairMQProgOptions.h"
#include "FairParAsciiFileIo.h"
#include "FairParGenericSet.h"
#include "FairParRootFileIo.h"
#include "FairRuntimeDb.h"

#include "Rtypes.h"
#include "TGeoManager.h"
#include "TList.h"
#include "TMessage.h"
#include "TObjString.h"
#include "TSystem.h"

using namespace std;

  : fRtdb(FairRuntimeDb::instance())

void ParameterMQServer::InitTask()
  string loadLibs = fConfig->GetValue<string>("libs-to-load");
  if (loadLibs.length() > 0) {
    LOG(info) << "There are libraries to load.";
    if (loadLibs.find(";") != std::string::npos) {
      LOG(info) << "There are several libraries to load";
      istringstream f(loadLibs);
      string s;
      while (getline(f, s, ';')) {
        LOG(info) << "Load library " << s;
    else {
      LOG(info) << "Load library " << loadLibs;
  else {
    LOG(info) << "There are no libraries to load.";

  fFirstInputName  = fConfig->GetValue<string>("first-input-name");
  fFirstInputType  = fConfig->GetValue<string>("first-input-type");
  fSecondInputName = fConfig->GetValue<string>("second-input-name");
  fSecondInputType = fConfig->GetValue<string>("second-input-type");
  fOutputName      = fConfig->GetValue<string>("output-name");
  fOutputType      = fConfig->GetValue<string>("output-type");
  fChannelName     = fConfig->GetValue<string>("channel-name");

  fsSetupName = fConfig->GetValue<std::string>("setup");
  LOG(info) << "Using setup: " << fsSetupName;

  if (fRtdb != 0) {
    // Set first input
    if (fFirstInputType == "ROOT") {
      FairParRootFileIo* par1R = new FairParRootFileIo();
      par1R->open(, "UPDATE");
    else if (fFirstInputType == "ASCII") {
      FairParAsciiFileIo* par1A = new FairParAsciiFileIo();
      if (fFirstInputName.find(";") != std::string::npos) {
        LOG(info) << "File list found!";
        TList* parFileList = new TList();
        TObjString* parFile(NULL);
        istringstream f(fFirstInputName);
        string s;
        while (getline(f, s, ';')) {
          LOG(info) << "File: " << s;
          parFile = new TObjString(s.c_str());
          par1A->open(parFileList, "in");
      else {
        LOG(info) << "Single input file found!";
        par1A->open(, "in");

    // Set second input
    if (fSecondInputName != "") {
      if (fSecondInputType == "ROOT") {
        FairParRootFileIo* par2R = new FairParRootFileIo();
        par2R->open(, "UPDATE");
      else if (fSecondInputType == "ASCII") {
        FairParAsciiFileIo* par2A = new FairParAsciiFileIo();
        if (fSecondInputName.find(";") != std::string::npos) {
          LOG(info) << "File list found!";
          TList* parFileList = new TList();
          TObjString* parFile(NULL);
          istringstream f(fSecondInputName);
          string s;
          while (getline(f, s, ';')) {
            LOG(info) << "File: " << s;
            parFile = new TObjString(s.c_str());
            par2A->open(parFileList, "in");
        else {
          LOG(info) << "Single input file found!";
          par2A->open(, "in");

    // Set output
    if (fOutputName != "") {
      if (fOutputType == "ROOT") {
        FairParRootFileIo* parOut = new FairParRootFileIo(kTRUE);


  // -----   CbmSetup   -----------------------------------------------------
  if ("" != fsSetupName) {
    fSetup = CbmSetup::Instance();
  // ------------------------------------------------------------------------

void ParameterMQServer::Run()
  string parameterName   = "";
  FairParGenericSet* par = nullptr;

  while (cbm::mq::CheckCurrentState(this, cbm::mq::State::Running)) {
    FairMQMessagePtr req(NewMessage());

    if (Receive(req, fChannelName, 0) > 0) {
      string reqStr(static_cast<char*>(req->GetData()), req->GetSize());
      LOG(info) << "Received parameter request from client: \"" << reqStr << "\"";

      if ("setup" == reqStr) {
        // TODO: support for multiple setups on Par Server? with request containing setup name?
        if ("" != fsSetupName && fSetup) {
          /// Prepare serialized versions of the CbmSetup
          CbmSetupStorable exchangableSetup(fSetup);

          TMessage* tmsg = new TMessage(kMESS_OBJECT);

          FairMQMessagePtr rep(NewMessage(
            tmsg->Buffer(), tmsg->BufferSize(),
            [](void* /*data*/, void* object) { delete static_cast<TMessage*>(object); }, tmsg));

          if (Send(rep, fChannelName, 0) < 0) {
            LOG(error) << "failed sending reply to Setup request";
        else {
          LOG(error) << "CbmSetup uninitialized!";
          // Send an empty message back to keep the REQ/REP cycle
          FairMQMessagePtr rep(NewMessage());
          if (Send(rep, fChannelName, 0) < 0) {
            LOG(error) << "failed sending reply to Setup request";
      else {
        size_t pos              = reqStr.rfind(",");
        string newParameterName = reqStr.substr(0, pos);
        int runId               = stoi(reqStr.substr(pos + 1));
        LOG(info) << "Parameter name: " << newParameterName;
        LOG(info) << "Run ID: " << runId;

        LOG(info) << "Retrieving parameter...";
        // Check if the parameter name has changed to avoid getting same container repeatedly
        if (newParameterName != parameterName) {
          parameterName = newParameterName;
          par           = static_cast<FairParGenericSet*>(fRtdb->getContainer(parameterName.c_str()));
        LOG(info) << "Retrieving parameter...Done";
        if (-1 != runId) { fRtdb->initContainers(runId); }

        LOG(info) << "Sending following parameter to the client:";
        if (par) {

          TMessage* tmsg = new TMessage(kMESS_OBJECT);

          FairMQMessagePtr rep(NewMessage(
            tmsg->Buffer(), tmsg->BufferSize(),
            [](void* /*data*/, void* object) { delete static_cast<TMessage*>(object); }, tmsg));

          if (Send(rep, fChannelName, 0) < 0) {
            LOG(error) << "failed sending reply";
        else {
          LOG(error) << "Parameter uninitialized!";
          // Send an empty message back to keep the REQ/REP cycle
          FairMQMessagePtr rep(NewMessage());
          if (Send(rep, fChannelName, 0) < 0) {
            LOG(error) << "failed sending reply";

  if (gGeoManager) {
  delete fRtdb;