Skip to content
Snippets Groups Projects
CbmMQTsaInfo.cxx 5.77 KiB
/* Copyright (C) 2017-2019 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
   SPDX-License-Identifier: GPL-3.0-only
   Authors: Florian Uhlig [committer] */

/**
 *  CbmMQTsaInfo.cpp
 *
 * @since 2017-11-17
 * @author F. Uhlig
 */


#include "CbmMQTsaInfo.h"

#include "CbmMQDefs.h"

#include "TimesliceInputArchive.hpp"
#include "TimesliceSubscriber.hpp"

#include "FairMQLogger.h"
#include "FairMQProgOptions.h"  // device->fConfig

#include <thread>  // this_thread::sleep_for

#include <boost/archive/binary_oarchive.hpp>

#include <chrono>
#include <ctime>

#include <stdio.h>

using namespace std;

#include <stdexcept>

struct InitTaskError : std::runtime_error {
  using std::runtime_error::runtime_error;
};


CbmMQTsaInfo::CbmMQTsaInfo()
  : FairMQDevice()
  , fMaxTimeslices(0)
  , fFileName("")
  , fInputFileList()
  , fFileCounter(0)
  , fHost("")
  , fPort(0)
  , fTSNumber(0)
  , fTSCounter(0)
  , fMessageCounter(0)
  , fTime()
{
}

void CbmMQTsaInfo::InitTask()
try {
  // Get the values from the command line options (via fConfig)
  fFileName      = fConfig->GetValue<string>("filename");
  fHost          = fConfig->GetValue<string>("flib-host");
  fPort          = fConfig->GetValue<uint64_t>("flib-port");
  fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");


  LOG(info) << "Filename: " << fFileName;
  LOG(info) << "Host: " << fHost;
  LOG(info) << "Port: " << fPort;

  LOG(info) << "MaxTimeslices: " << fMaxTimeslices;
  // Get the information about created channels from the device
  // Check if the defined channels from the topology (by name)
  // are in the list of channels which are possible/allowed
  // for the device
  // The idea is to check at initilization if the devices are
  // properly connected. For the time beeing this is done with a
  // nameing convention. It is not avoided that someone sends other
  // data on this channel.
  int noChannel = fChannels.size();
  LOG(info) << "Number of defined output channels: " << noChannel;
  for (auto const& entry : fChannels) {
    LOG(info) << "Channel name: " << entry.first;
    if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
  }

  if (0 == fFileName.size() && 0 != fHost.size()) {
    std::string connector = "tcp://" + fHost + ":" + std::to_string(fPort);
    LOG(info) << "Open TSPublisher at " << connector;
    fSource = new fles::TimesliceSubscriber(connector, 1);
    if (!fSource) { throw InitTaskError("Could not connect to publisher."); }
  }
  else {
    LOG(info) << "Open the Flib input file " << fFileName;
    // Check if the input file exist
    FILE* inputFile = fopen(fFileName.c_str(), "r");
    if (!inputFile) { throw InitTaskError("Input file doesn't exist."); }
    fclose(inputFile);
    fSource = new fles::TimesliceInputArchive(fFileName);
    if (!fSource) { throw InitTaskError("Could not open input file."); }
  }
  fTime = std::chrono::steady_clock::now();
}
catch (InitTaskError& e) {
  LOG(error) << e.what();
  // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
  cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
}

bool CbmMQTsaInfo::IsChannelNameAllowed(std::string channelName)
{
  if (std::find(fAllowedChannels.begin(), fAllowedChannels.end(), channelName) != fAllowedChannels.end()) {
    LOG(info) << "Channel name " << channelName << " found in list of allowed channel names.";
    return true;
  }
  else {
    LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
    LOG(error) << "Stop device.";
    return false;
  }
}

bool CbmMQTsaInfo::ConditionalRun()
{


  auto timeslice = fSource->get();
  if (timeslice) {
    fTSCounter++;
    if (fTSCounter % 10000 == 0) LOG(info) << "Analyse Event " << fTSCounter;


    const fles::Timeslice& ts = *timeslice;
    //    auto tsIndex = ts.index();


    LOG(info) << "Found " << ts.num_components() << " different components in timeslice";

    CheckTimeslice(ts);

    if (fTSCounter < fMaxTimeslices) { return true; }
    else {
      CalcRuntime();
      return false;
    }
  }
  else {
    CalcRuntime();
    return false;
  }
}


CbmMQTsaInfo::~CbmMQTsaInfo() {}

void CbmMQTsaInfo::CalcRuntime()
{
  std::chrono::duration<double> run_time = std::chrono::steady_clock::now() - fTime;

  LOG(info) << "Runtime: " << run_time.count();
  LOG(info) << "No more input data";
}


void CbmMQTsaInfo::PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc)
{
  LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
  LOG(info) << "Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
  LOG(info) << "Equipement ID: " << mdsc.eq_id;
  LOG(info) << "Flags: " << mdsc.flags;
  LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
  LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
  LOG(info) << "Microslice Idx: " << mdsc.idx;
  LOG(info) << "Checksum: " << mdsc.crc;
  LOG(info) << "Size: " << mdsc.size;
  LOG(info) << "Offset: " << mdsc.offset;
}

bool CbmMQTsaInfo::CheckTimeslice(const fles::Timeslice& ts)
{
  if (0 == ts.num_components()) {
    LOG(error) << "No Component in TS " << ts.index();
    return 1;
  }
  LOG(info) << "Found " << ts.num_components() << " different components in timeslice";

  for (size_t c = 0; c < ts.num_components(); ++c) {
    LOG(info) << "Found " << ts.num_microslices(c) << " microslices in component " << c;
    LOG(info) << "Component " << c << " has a size of " << ts.size_component(c) << " bytes";
    LOG(info) << "Component " << c << " has the system id 0x" << std::hex
              << static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;

    /*
    for (size_t m = 0; m < ts.num_microslices(c); ++m) {
      PrintMicroSliceDescriptor(ts.descriptor(c,m));
    }
*/
  }

  return true;
}