Skip to content
Snippets Groups Projects
Select Git revision
  • master
  • memgroups
  • kernel-templates
  • v0.6.4
  • v0.6.3
  • v0.6.2
  • v0.6.1
  • v0.6.0
  • v0.5.4
  • v0.5.3
  • v0.5.2
  • v0.5.1
  • v0.5.0
  • v0.4.4
  • v0.4.3
  • v0.4.2
  • v0.4.1
  • v0.4.0
  • v0.3.0
  • v0.2.2
  • v0.2.1
  • v0.2.0
  • v0.1.0
23 results

MergeKernel.cpp

Blame
  • CbmDeviceTriggerHandlerEtof.cxx 5.81 KiB
    /* Copyright (C) 2019 PI-UHd, GSI
       SPDX-License-Identifier: GPL-3.0-only
       Authors: Norbert Herrmann [committer] */
    
    /**
     * CbmDeviceTriggerHandlerEtof.cxx
     *
     * @since 2019-11-15
     * @author N. Herrmann
     */
    
    #include "CbmDeviceTriggerHandlerEtof.h"
    
    #include "CbmMQDefs.h"
    
    #include "FairEventHeader.h"
    #include "FairFileHeader.h"
    #include "FairGeoParSet.h"
    #include "FairMQLogger.h"
    #include "FairMQProgOptions.h"  // device->fConfig
    #include "FairRootFileSink.h"
    #include "FairRootManager.h"
    #include "FairRunOnline.h"
    #include "FairRuntimeDb.h"
    
    #include <thread>  // this_thread::sleep_for
    
    #include <boost/archive/binary_iarchive.hpp>
    #include <boost/archive/binary_oarchive.hpp>
    #include <boost/serialization/vector.hpp>
    
    #include <chrono>
    #include <iomanip>
    #include <stdexcept>
    #include <string>
    /// Exception thrown during InitTask() when the device configuration is unusable
    /// (e.g. a channel name that is not in the list of allowed names).
    struct InitTaskError : std::runtime_error {
      /// Build the error from a std::string message.
      explicit InitTaskError(const std::string& message) : std::runtime_error(message) {}
      /// Build the error from a C-string message.
      explicit InitTaskError(const char* message) : std::runtime_error(message) {}
    };
    
    // Start of the current rate-measurement window; HandleData() resets it each time it prints the rate.
    static auto dctime = std::chrono::steady_clock::now();
    // Payload bytes accumulated since the start of the current rate-measurement window.
    static double dSize = 0.0;

    // File-wide using-directive: code below relies on unqualified standard names (e.g. `vector`).
    using namespace std;
    
    /**
     * Default constructor.
     *
     * Starts with all counters at zero and every mode flag switched off. The sandbox flag is
     * overwritten later from the device configuration in InitWorkspace().
     */
    CbmDeviceTriggerHandlerEtof::CbmDeviceTriggerHandlerEtof()
      : fNumMessages(0)             // number of messages seen by HandleData()
      , fiMsgCnt(0)
      , fbMonitorMode(false)
      , fbDebugMonitorMode(false)
      , fbSandboxMode(false)        // when set, HandleData() does not forward data
      , fbEventDumpEna(false)
      , fdEvent(0.0)                // number of events processed by HandleData()
    {
    }
    
    /// Destructor: nothing to release explicitly in this translation unit.
    CbmDeviceTriggerHandlerEtof::~CbmDeviceTriggerHandlerEtof() = default;
    
    void CbmDeviceTriggerHandlerEtof::InitTask()
    try {
      // Get the information about created channels from the device
      // Check if the defined channels from the topology (by name)
      // are in the list of channels which are possible/allowed
      // for the device
      // The idea is to check at initilization if the devices are
      // properly connected. For the time beeing this is done with a
      // nameing convention. It is not avoided that someone sends other
      // data on this channel.
      int noChannel = fChannels.size();
      LOG(info) << "Number of defined input channels: " << noChannel;
      for (auto const& entry : fChannels) {
        LOG(info) << "Channel name: " << entry.first;
        if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
        if (entry.first != "syscmd") OnData(entry.first, &CbmDeviceTriggerHandlerEtof::HandleData);
        else
          OnData(entry.first, &CbmDeviceTriggerHandlerEtof::HandleMessage);
      }
      InitWorkspace();
    }
    catch (InitTaskError& e) {
      LOG(error) << e.what();
      cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
    }
    
    /**
     * Check a channel name against the list of allowed channel names.
     *
     * A name is accepted as soon as one entry of fAllowedChannels occurs anywhere inside it
     * (substring match, not equality). The first matching entry wins and its position in the
     * list is logged.
     *
     * @param channelName  name of the channel as configured for the device
     * @return true if an allowed name is contained in channelName, false otherwise
     */
    bool CbmDeviceTriggerHandlerEtof::IsChannelNameAllowed(std::string channelName)
    {
      for (std::size_t idx = 0; idx < fAllowedChannels.size(); ++idx) {
        const std::string& allowed = fAllowedChannels[idx];

        // Not contained in the channel name: try the next allowed entry.
        if (channelName.find(allowed) == std::string::npos) { continue; }

        LOG(info) << "Found " << allowed << " in " << channelName;
        LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
        return true;
      }

      LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
      LOG(error) << "Stop device.";
      return false;
    }
    
    /**
     * Read the steering options of the device from its configuration.
     *
     * Currently only the boolean option "SandboxMode" is read; it is stored in fbSandboxMode.
     *
     * @return always kTRUE
     */
    Bool_t CbmDeviceTriggerHandlerEtof::InitWorkspace()
    {
      LOG(info) << "Init work space for CbmDeviceTriggerHandlerEtof.";

      // steering variables
      const bool sandboxRequested = fConfig->GetValue<bool>("SandboxMode");
      fbSandboxMode               = sandboxRequested;

      return kTRUE;
    }
    
    // Handler called whenever a multi-part message arrives on a data channel; it receives a reference to the
    // message parts and a sub-channel index (unused here).
    // Previous single-message signature, kept for reference:
    //bool CbmDeviceTriggerHandlerEtof::HandleData(FairMQMessagePtr& msg, int /*index*/)
    /**
     * Process one multi-part data message.
     *
     * Part 0 is read as a boost binary archive holding the trigger word, part 1 is taken as the
     * raw sub-event buffer. Unless sandbox mode is enabled, the buffer is handed to
     * star_rhicf_write() together with the trigger word. Every 10000 events (including the very
     * first one) the number of processed events and the data rate since the previous report are
     * logged.
     *
     * @param parts  received message parts; parts 0 and 1 are accessed
     * @return always kTRUE
     */
    bool CbmDeviceTriggerHandlerEtof::HandleData(FairMQParts& parts, int /*index*/)
    {
      ++fNumMessages;
      LOG(debug) << "Received message " << fNumMessages << " with " << parts.Size() << " parts, size0: "
                 << parts.At(0)->GetSize();

      // Part 0: trigger word, serialized as a boost binary archive.
      uint triggerWord {0};
      std::string headerBytes(static_cast<char*>(parts.At(0)->GetData()), parts.At(0)->GetSize());
      std::istringstream headerStream(headerBytes);
      boost::archive::binary_iarchive headerArchive(headerStream);
      headerArchive >> triggerWord;

      // Part 1: raw sub-event buffer, forwarded untouched.
      char* payload    = static_cast<char*>(parts.At(1)->GetData());
      int payloadBytes = parts.At(1)->GetSize();

      // Send Subevent to STAR
      LOG(debug) << "Send Data for event " << fdEvent << ", TrigWord " << triggerWord << " with size " << payloadBytes
                 << Form(" at %p ", payload);
      if (!fbSandboxMode) { star_rhicf_write(triggerWord, payload, payloadBytes); }

      // Rate bookkeeping: accumulate bytes and report once per 10000 events.
      dSize += payloadBytes;
      const bool reportNow = (static_cast<int>(fdEvent) % 10000 == 0);
      if (reportNow) {
        const std::chrono::duration<double> elapsed = std::chrono::steady_clock::now() - dctime;
        LOG(info) << "Processed " << fdEvent << " events,  delta-time: " << elapsed.count()
                  << ", rate: " << dSize * 1.E-6 / elapsed.count() << "MB/s";
        // Open a new measurement window.
        dctime = std::chrono::steady_clock::now();
        dSize  = 0.;
      }
      fdEvent++;

      return kTRUE;
    }
    
    /************************************************************************************/
    
    /**
     * Handle a command message received on the "syscmd" channel.
     *
     * The payload is interpreted as a plain text command. Only "STOP" is implemented: when the
     * command starts with "STOP" the device is driven through the Ready, DeviceReady, Idle and End
     * transitions. Any other command is logged and ignored.
     *
     * Fixes relative to the previous implementation:
     *  - the command was compared through a 4-byte array holding only its first character and the
     *    result of strcmp() was used as a truth value (strcmp returns 0 on equality), so every
     *    message, whatever its content, shut the device down;
     *  - the payload was streamed as a C string although a message buffer carries no guarantee of
     *    NUL termination; all reads are now bounded by the message size.
     *
     * @param msg  received message; its data buffer and size are read
     * @return always true
     */
    bool CbmDeviceTriggerHandlerEtof::HandleMessage(FairMQMessagePtr& msg, int /*index*/)
    {
      // Copy the payload into a size-bounded string; guard against an empty message without buffer.
      const char* rawData = static_cast<const char*>(msg->GetData());
      std::string cmd;
      if (rawData != nullptr) { cmd.assign(rawData, msg->GetSize()); }

      // Senders may include a trailing NUL (or padding after it): keep only the text before it.
      const std::string::size_type nulPos = cmd.find('\0');
      if (nulPos != std::string::npos) { cmd.erase(nulPos); }

      // cmd[0] is well defined even for an empty string (it refers to the terminating NUL).
      LOG(info) << "Handle message " << cmd << ", " << cmd[0];

      // only one implemented so far "Stop"
      if (cmd.compare(0, 4, "STOP") == 0) {
        cbm::mq::ChangeState(this, cbm::mq::Transition::Ready);
        cbm::mq::LogState(this);
        cbm::mq::ChangeState(this, cbm::mq::Transition::DeviceReady);
        cbm::mq::LogState(this);
        cbm::mq::ChangeState(this, cbm::mq::Transition::Idle);
        cbm::mq::LogState(this);
        cbm::mq::ChangeState(this, cbm::mq::Transition::End);
        cbm::mq::LogState(this);
        //    ChangeState(fair::mq::Transition(STOP));
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
      }

      return true;
    }