Skip to content
Snippets Groups Projects
CbmDeviceStsLocalReco.cxx 8.74 KiB
Newer Older
/* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
   SPDX-License-Identifier: GPL-3.0-only
   Authors: Florian Uhlig [committer] */
/**
 * CbmDeviceStsLocalReco.cxx
 *
 * @since 2019-03-26
 * @author F. Uhlig
 */

#include "CbmDeviceStsLocalReco.h"

Administrator's avatar
Administrator committed
#include "CbmBsField.h"
#include "CbmFieldConst.h"
#include "CbmFieldMap.h"
Administrator's avatar
Administrator committed
#include "CbmFieldMapDistorted.h"
#include "CbmFieldMapSym1.h"
#include "CbmFieldMapSym2.h"
#include "CbmFieldMapSym3.h"
Administrator's avatar
Administrator committed
#include "CbmFieldPar.h"
#include "CbmMQDefs.h"
#include "CbmStsDigitizeParameters.h"
Administrator's avatar
Administrator committed
#include "FairField.h"
#include "FairGeoParSet.h"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h"  // device->fConfig
#include "FairRunAna.h"

//#include "FairParGenericSet.h"
//#include "RootSerializer.h"

#include "TGeoManager.h"
#include "TSystem.h"

/*
#include "TCanvas.h"
#include "TFile.h"
#include "TH1.h"
Administrator's avatar
Administrator committed
#include "TList.h"
#include "TNamed.h"
*/

#include <boost/archive/binary_iarchive.hpp>

#include <array>
#include <iomanip>
#include <stdexcept>
#include <string>
Administrator's avatar
Administrator committed
struct InitTaskError : std::runtime_error {
  using std::runtime_error::runtime_error;
};

//using namespace std;
using std::string;

CbmDeviceStsLocalReco::CbmDeviceStsLocalReco()
Administrator's avatar
Administrator committed
  : fMaxTimeslices {0}
  , fNumMessages {0}
  , fRunId {"0"}
  , fvmcworkdir {""}
  , fDigiPar {nullptr}
  , fGeoPar {nullptr}
  , fFieldPar {nullptr}  //  , fParCList{nullptr}
Administrator's avatar
Administrator committed

CbmDeviceStsLocalReco::~CbmDeviceStsLocalReco()
{
  if (gGeoManager) {
    gGeoManager->GetListOfVolumes()->Delete();
    gGeoManager->GetListOfShapes()->Delete();
  }
}


void CbmDeviceStsLocalReco::InitTask()
try {
Administrator's avatar
Administrator committed
  fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");
  // Get the information about created channels from the device
  // Check if the defined channels from the topology (by name)
  // are in the list of channels which are possible/allowed
  // for the device
  // The idea is to check at initilization if the devices are
  // properly connected. For the time beeing this is done with a
  // nameing convention. It is not avoided that someone sends other
  // data on this channel.
  //logger::SetLogLevel("INFO");

Administrator's avatar
Administrator committed
  int noChannel = fChannels.size();
  LOG(info) << "Number of defined channels: " << noChannel;
Administrator's avatar
Administrator committed
  for (auto const& entry : fChannels) {
    LOG(info) << "Channel name: " << entry.first;
    if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
Administrator's avatar
Administrator committed
    OnData(entry.first, &CbmDeviceStsLocalReco::HandleData);
  }
  InitContainers();
}
catch (InitTaskError& e) {
  LOG(error) << e.what();
  // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
  cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
}


bool CbmDeviceStsLocalReco::IsChannelNameAllowed(std::string channelName)
{
Administrator's avatar
Administrator committed
  for (auto const& entry : fAllowedChannels) {
    std::size_t pos1 = channelName.find(entry);
Administrator's avatar
Administrator committed
    if (pos1 != std::string::npos) {
      const std::vector<std::string>::const_iterator pos =
Administrator's avatar
Administrator committed
        std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
      const std::vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
      LOG(info) << "Found " << entry << " in " << channelName;
      LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
  LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
  LOG(error) << "Stop device.";
Bool_t CbmDeviceStsLocalReco::InitContainers()
{
Administrator's avatar
Administrator committed
  Bool_t initOK {kTRUE};
Administrator's avatar
Administrator committed
  fRunId         = fConfig->GetValue<string>("run-id");
  fvmcworkdir    = fConfig->GetValue<string>("vmcworkdir");
  fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");

  LOG(info) << "Init parameter containers for CbmDeviceStsLocalReco.";

  // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
  // Should only be used for small data because of the cost of an additional copy
Administrator's avatar
Administrator committed

  std::string message {"CbmStsDigitizeParameters,"};
  message += fRunId;
  LOG(info) << "Requesting parameter container CbmStsDigitizeParameters, "
Administrator's avatar
Administrator committed
               "sending message: "
            << message;

  FairMQMessagePtr req(NewSimpleMessage(message));
  FairMQMessagePtr rep(NewMessage());

  if (Send(req, "parameters") > 0) {
    if (Receive(rep, "parameters") >= 0) {
      if (rep->GetSize() != 0) {
        CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
        fDigiPar = dynamic_cast<CbmStsDigitizeParameters*>(tmsg.ReadObject(tmsg.GetClass()));
        LOG(info) << "Received unpack parameter from parmq server: " << fDigiPar;
        // TODO: check if fDigiPar is properly initialized from the file
        fDigiPar->Print();
        LOG(info) << fDigiPar->ToString();
        throw InitTaskError("Received empty reply. Parameter not available");
      }
    }
  }

Administrator's avatar
Administrator committed
  std::string message1 {"FairGeoParSet,"};
  message1 += fRunId;
  LOG(info) << "Requesting parameter container FairGeoParSet, sending message: " << message1;

  FairMQMessagePtr req1(NewSimpleMessage(message1));
  FairMQMessagePtr rep1(NewMessage());

  if (Send(req1, "parameters") > 0) {
    if (Receive(rep1, "parameters") >= 0) {
      if (rep1->GetSize() != 0) {
        CbmMqTMessage tmsg(rep1->GetData(), rep1->GetSize());
        fGeoPar = static_cast<FairGeoParSet*>(tmsg.ReadObject(tmsg.GetClass()));
        LOG(info) << "Received unpack parameter from parmq server: " << fGeoPar;
        fGeoPar->Print();
        if (!gGeoManager) { throw InitTaskError("No gGeoManager found in FairGeoParSet"); }
        else {
          gGeoManager->Print();
        }
        throw InitTaskError("Received empty reply. Parameter not available");
      }
    }
  }

Administrator's avatar
Administrator committed
  std::string message2 {"CbmFieldPar,"};
  message2 += fRunId;
  LOG(info) << "Requesting parameter container CbmFieldPar, sending message: " << message2;

  FairMQMessagePtr req2(NewSimpleMessage(message2));
  FairMQMessagePtr rep2(NewMessage());

  if (Send(req2, "parameters") > 0) {
    if (Receive(rep2, "parameters") >= 0) {
      if (rep2->GetSize() != 0) {
        CbmMqTMessage tmsg(rep2->GetData(), rep2->GetSize());
        fFieldPar = static_cast<CbmFieldPar*>(tmsg.ReadObject(tmsg.GetClass()));
        LOG(info) << "Received unpack parameter from parmq server: " << fGeoPar;
        if (-1 == fFieldPar->GetType()) { throw InitTaskError("No field parameters available!"); }
        else {
          fFieldPar->Print();
          LOG(info) << "Before creating the field";
Administrator's avatar
Administrator committed
          FairField* field = createField();
          LOG(info) << "After creating the field";
          FairRunAna* run = new FairRunAna();
          run->SetField(field);
        }
        LOG(error) << "Received empty reply. Parameter not available";
FairField* CbmDeviceStsLocalReco::createField()
{
Administrator's avatar
Administrator committed
  FairField* fMagneticField {nullptr};

  // Instantiate correct field type
  Int_t fType = fFieldPar->GetType();
Administrator's avatar
Administrator committed
  gSystem->Setenv("VMCWORKDIR", fvmcworkdir.c_str());
  if (fType == 0) fMagneticField = new CbmFieldConst(fFieldPar);
Administrator's avatar
Administrator committed
  else if (fType == 1)
    fMagneticField = new CbmFieldMap(fFieldPar);
  else if (fType == 2)
    fMagneticField = new CbmFieldMapSym2(fFieldPar);
  else if (fType == 3)
    fMagneticField = new CbmFieldMapSym3(fFieldPar);
  else if (fType == 4)
    fMagneticField = new CbmFieldMapDistorted(fFieldPar);
  else if (fType == 5)
    fMagneticField = new CbmFieldMapSym1(fFieldPar);
  else if (fType == 6)
    fMagneticField = new CbmBsField(fFieldPar);
  else {
    std::stringstream ss;
    ss << "Unknown field type " << fType;
    throw InitTaskError(ss.str());
  }
  LOG(info) << "New field at " << fMagneticField << ", type " << fType;

  // Initialise field
Administrator's avatar
Administrator committed
  if (fMagneticField) {
    fMagneticField->Init();
    fMagneticField->Print("");
  }

  LOG(info) << "Before return";
  return fMagneticField;
}


// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool CbmDeviceStsLocalReco::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
Administrator's avatar
Administrator committed
  // Don't do anything with the data
  // Maybe add an message counter which counts the incomming messages and add
  // an output
  LOG(debug) << "Received message number " << fNumMessages << " with size " << msg->GetSize();

  std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
  std::istringstream iss(msgStr);
  boost::archive::binary_iarchive inputArchive(iss);

  DoWork();

  if (fNumMessages % 10000 == 0) LOG(info) << "Processed " << fNumMessages << " time slices";
Administrator's avatar
Administrator committed
bool CbmDeviceStsLocalReco::SendData() { return true; }
Administrator's avatar
Administrator committed
Bool_t CbmDeviceStsLocalReco::DoWork() { return true; }
Administrator's avatar
Administrator committed
void CbmDeviceStsLocalReco::Finish() {}