Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • le.koch/cbmroot
  • patrick.pfistner_AT_kit.edu/cbmroot
  • lena.rossel_AT_stud.uni-frankfurt.de/cbmroot
  • i.deppner/cbmroot
  • fweig/cbmroot
  • karpushkin_AT_inr.ru/cbmroot
  • v.akishina/cbmroot
  • rishat.sultanov_AT_cern.ch/cbmroot
  • l_fabe01_AT_uni-muenster.de/cbmroot
  • pwg-c2f/cbmroot
  • j.decuveland/cbmroot
  • a.toia/cbmroot
  • i.vassiliev/cbmroot
  • n.herrmann/cbmroot
  • o.lubynets/cbmroot
  • se.gorbunov/cbmroot
  • cornelius.riesen_AT_physik.uni-giessen.de/cbmroot
  • zhangqn17_AT_mails.tsinghua.edu.cn/cbmroot
  • bartosz.sobol/cbmroot
  • ajit.kumar/cbmroot
  • computing/cbmroot
  • a.agarwal_AT_vecc.gov.in/cbmroot
  • osingh/cbmroot
  • wielanek_AT_if.pw.edu.pl/cbmroot
  • malgorzata.karabowicz.stud_AT_pw.edu.pl/cbmroot
  • m.shiroya/cbmroot
  • s.roy/cbmroot
  • p.-a.loizeau/cbmroot
  • a.weber/cbmroot
  • ma.beyer/cbmroot
  • d.klein/cbmroot
  • d.smith/cbmroot
  • mvdsoft/cbmroot
  • d.spicker/cbmroot
  • y.h.leung/cbmroot
  • m.deveaux/cbmroot
  • mkunold/cbmroot
  • h.darwish/cbmroot
  • f_fido01_AT_uni-muenster.de/cbmroot
  • g.kozlov/cbmroot
  • d.emschermann/cbmroot
  • evgeny.lavrik/cbmroot
  • v.friese/cbmroot
  • f.uhlig/cbmroot
  • ebechtel_AT_ikf.uni-frankfurt.de/cbmroot
  • a.senger/cbmroot
  • praisig/cbmroot
  • s.lebedev/cbmroot
  • redelbach_AT_compeng.uni-frankfurt.de/cbmroot
  • p.subramani/cbmroot
  • a_meye37_AT_uni-muenster.de/cbmroot
  • om/cbmroot
  • o.golosov/cbmroot
  • l.chlad/cbmroot
  • a.bercuci/cbmroot
  • d.ramirez/cbmroot
  • v.singhal/cbmroot
  • h.schiller/cbmroot
  • apuntke/cbmroot
  • f.zorn/cbmroot
  • rubio_AT_physi.uni-heidelberg.de/cbmroot
  • p.chudoba/cbmroot
  • apuntke/mcbmroot
  • r.karabowicz/cbmroot
64 results
Show changes
Showing
with 1393 additions and 1345 deletions
/**
* CbmDeviceMonitorT0.cxx
*
* @since 2019-03-26
* @author F. Uhlig
*/
#include "CbmDeviceMonitorT0.h"
#include "CbmMQDefs.h"
#include "CbmMcbm2018MonitorAlgoT0.h"
#include "CbmFlesCanvasTools.h"
#include "StorableTimeslice.hpp"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairParGenericSet.h"
#include "RootSerializer.h"
#include "BoostSerializer.h"
#include "TNamed.h"
#include "TList.h"
#include "TCanvas.h"
#include "TFile.h"
#include "TH1.h"
#include <string>
#include <iomanip>
#include <array>
#include <boost/serialization/utility.hpp>
#include <boost/archive/binary_iarchive.hpp>
#include <stdexcept>
struct InitTaskError : std::runtime_error { using std::runtime_error::runtime_error; };
using namespace std;
Bool_t bMcbm2018MonitorTaskT0ResetHistos = kFALSE;
CbmDeviceMonitorT0::CbmDeviceMonitorT0()
: fbIgnoreOverlapMs{ false }
, fsChannelNameDataInput{ "t0component" }
, fsChannelNameHistosInput{ "histogram-in" }
, fsChannelNameHistosConfig{ "histo-conf" }
, fsChannelNameCanvasConfig{ "canvas-conf" }
, fuHistoryHistoSize{ 3600 }
, fuMinTotPulser{ 185 }
, fuMaxTotPulser{ 195 }
, fuOffSpillCountLimit{ 1000 }
, fuPublishFreqTs{ 100 }
, fdMinPublishTime{ 0.5 }
, fdMaxPublishTime{ 5.0 }
, fsAllowedChannels{ fsChannelNameDataInput }
, fParCList{ nullptr }
, fulNumMessages{ 0 }
, fulTsCounter{ 0 }
, fLastPublishTime{ std::chrono::system_clock::now() }
, fMonitorAlgo{ new CbmMcbm2018MonitorAlgoT0() }
, fArrayHisto{ }
, fvpsHistosFolder{ }
, fvpsCanvasConfig{ }
{
}
void CbmDeviceMonitorT0::InitTask()
try
{
/// Read options from executable
LOG(info) << "Init options for CbmMqStarHistoServer.";
fbIgnoreOverlapMs = fConfig->GetValue< bool >( "IgnOverMs" );
fuHistoryHistoSize = fConfig->GetValue< uint32_t >( "HistEvoSz" );
fuMinTotPulser = fConfig->GetValue< uint32_t >( "PulsTotMin" );
fuMaxTotPulser = fConfig->GetValue< uint32_t >( "PulsTotMax" );
fuOffSpillCountLimit = fConfig->GetValue< uint32_t >( "SpillThr" );
fuPublishFreqTs = fConfig->GetValue< uint32_t >( "PubFreqTs" );
fdMinPublishTime = fConfig->GetValue< double_t >( "PubTimeMin" );
fdMaxPublishTime = fConfig->GetValue< double_t >( "PubTimeMax" );
fsChannelNameDataInput = fConfig->GetValue< std::string >( "TsNameIn" );
fsChannelNameHistosInput = fConfig->GetValue< std::string >( "ChNameIn" );
fsChannelNameHistosConfig = fConfig->GetValue< std::string >( "ChNameHistCfg" );
fsChannelNameCanvasConfig = fConfig->GetValue< std::string >( "ChNameCanvCfg" );
fsAllowedChannels[ 0 ] = fsChannelNameDataInput;
LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
//logger::SetLogLevel("INFO");
int noChannel = fChannels.size();
LOG(info) << "Number of defined channels: " << noChannel;
for( auto const &entry : fChannels )
{
LOG(info) << "Channel name: " << entry.first;
if( std::string::npos != entry.first.find( fsChannelNameDataInput ) )
{
if( !IsChannelNameAllowed( entry.first ) )
throw InitTaskError( "Channel name does not match." );
OnData( entry.first, &CbmDeviceMonitorT0::HandleData );
} // if( entry.first.find( "ts" )
} // for( auto const &entry : fChannels )
InitContainers();
} catch (InitTaskError& e) {
LOG(error) << e.what();
// Wrapper defined in CbmMQDefs.h to support different FairMQ versions
cbm::mq::ChangeState( this, cbm::mq::Transition::ErrorFound );
}
bool CbmDeviceMonitorT0::IsChannelNameAllowed(std::string channelName)
{
for( auto const &entry : fsAllowedChannels ) {
std::size_t pos1 = channelName.find(entry);
if( pos1 != std::string::npos ) {
const vector< std::string >::const_iterator pos =
std::find( fsAllowedChannels.begin(), fsAllowedChannels.end(), entry );
const vector< std::string >::size_type idx = pos - fsAllowedChannels.begin();
LOG(info) << "Found " << entry << " in " << channelName;
LOG(info) << "Channel name " << channelName
<< " found in list of allowed channel names at position " << idx;
return true;
} // if (pos1!=std::string::npos)
} // for(auto const &entry : fsAllowedChannels)
LOG(info) << "Channel name " << channelName
<< " not found in list of allowed channel names.";
LOG(error) << "Stop device.";
return false;
}
Bool_t CbmDeviceMonitorT0::InitContainers()
{
LOG(info) << "Init parameter containers for CbmDeviceMonitorT0.";
fParCList = fMonitorAlgo->GetParList();
for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ ) {
FairParGenericSet* tempObj = (FairParGenericSet*)( fParCList->At( iparC ) );
fParCList->Remove( tempObj );
std::string paramName{ tempObj->GetName() };
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
// Her must come the proper Runid
std::string message = paramName + ",111";
LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
FairMQMessagePtr req( NewSimpleMessage(message) );
FairMQMessagePtr rep( NewMessage() );
FairParGenericSet* newObj = nullptr;
if ( Send(req, "parameters") > 0 ) {
if ( Receive( rep, "parameters" ) >= 0) {
if ( rep->GetSize() != 0 ) {
CbmMQTMessage tmsg( rep->GetData(), rep->GetSize() );
newObj = static_cast< FairParGenericSet* >( tmsg.ReadObject( tmsg.GetClass() ) );
LOG( info ) << "Received unpack parameter from the server:";
newObj->print();
} else {
LOG( error ) << "Received empty reply. Parameter not available";
} // if (rep->GetSize() != 0)
} // if (Receive(rep, "parameters") >= 0)
} // if (Send(req, "parameters") > 0)
fParCList->AddAt( newObj, iparC );
delete tempObj;
} // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
/// Need to add accessors for all options
fMonitorAlgo->SetIgnoreOverlapMs( fbIgnoreOverlapMs );
fMonitorAlgo->SetMonitorMode( kTRUE );
fMonitorAlgo->SetHistoryHistoSize( fuHistoryHistoSize );
fMonitorAlgo->SetPulserTotLimits( fuMinTotPulser, fuMaxTotPulser );
fMonitorAlgo->SetSpillThreshold( fuOffSpillCountLimit );
// fMonitorAlgo->AddMsComponentToList(0, 0x90);
Bool_t initOK = fMonitorAlgo->InitContainers();
// Bool_t initOK = fMonitorAlgo->ReInitContainers();
/// Histos creation and obtain pointer on them
/// Trigger histo creation on all associated algos
initOK &= fMonitorAlgo->CreateHistograms();
/// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
std::vector< std::pair< TNamed *, std::string > > vHistos = fMonitorAlgo->GetHistoVector();
/// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder)
std::vector< std::pair< TCanvas *, std::string > > vCanvases = fMonitorAlgo->GetCanvasVector();
/// Add pointers to each histo in the histo array
/// Create histo config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder >
/// and send it through a separate channel using the BoostSerializer
for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
{
// LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
// << " in " << vHistos[ uHisto ].second.data()
// ;
fArrayHisto.Add( vHistos[ uHisto ].first );
std::pair< std::string, std::string > psHistoConfig( vHistos[ uHisto ].first->GetName(),
vHistos[ uHisto ].second );
fvpsHistosFolder.push_back( psHistoConfig );
/// Serialize the vector of histo config into a single MQ message
FairMQMessagePtr messageHist( NewMessage() );
Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageHist, psHistoConfig );
/// Send message to the common histogram config messages queue
if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
{
LOG(error) << "Problem sending histo config";
return false;
} // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
LOG(info) << "Config of hist " << psHistoConfig.first.data()
<< " in folder " << psHistoConfig.second.data() ;
} // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
/// Create canvas config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config >
/// and send it through a separate channel using the BoostSerializer
for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
{
// LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
// << " in " << vCanvases[ uCanv ].second.data();
std::string sCanvName = (vCanvases[ uCanv ].first)->GetName();
std::string sCanvConf = GenerateCanvasConfigString( vCanvases[ uCanv ].first );
std::pair< std::string, std::string > psCanvConfig( sCanvName, sCanvConf );
fvpsCanvasConfig.push_back( psCanvConfig );
/// Serialize the vector of canvas config into a single MQ message
FairMQMessagePtr messageCan( NewMessage() );
Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageCan, psCanvConfig );
/// Send message to the common canvas config messages queue
if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
{
LOG(error) << "Problem sending canvas config";
return false;
} // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
LOG(info) << "Config string of Canvas " << psCanvConfig.first.data()
<< " is " << psCanvConfig.second.data() ;
} // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
return initOK;
}
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool CbmDeviceMonitorT0::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
fulNumMessages++;
LOG(debug) << "Received message number "<< fulNumMessages
<< " with size " << msg->GetSize();
if( 0 == fulNumMessages % 10000 )
LOG(info) << "Received " << fulNumMessages << " messages";
std::string msgStr( static_cast<char*>( msg->GetData() ), msg->GetSize() );
std::istringstream iss( msgStr );
boost::archive::binary_iarchive inputArchive( iss );
/// Create an empty TS and fill it with the incoming message
fles::StorableTimeslice component{ 0 };
inputArchive >> component;
/// Process the Timeslice
DoUnpack(component, 0);
/// Send histograms each 100 time slices. Should be each ~1s
/// Use also runtime checker to trigger sending after M s if
/// processing too slow or delay sending if processing too fast
std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
if( ( fdMaxPublishTime < elapsedSeconds.count() ) ||
( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
{
SendHistograms();
fLastPublishTime = std::chrono::system_clock::now();
} // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
return true;
}
bool CbmDeviceMonitorT0::SendHistograms()
{
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr message( NewMessage() );
Serialize<RootSerializer>( *message, &fArrayHisto );
// test code to check if deserialization works
/*
TObject* tempObject = nullptr;
Deserialize<RootDeserializer>(*message, tempObject);
if (TString(tempObject->ClassName()).EqualTo("TObjArray")) {
TObjArray* arrayHisto = static_cast<TObjArray*>(tempObject);
LOG(info) << "Array contains " << arrayHisto->GetEntriesFast()
<< " entries";
for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++) {
TObject* obj = arrayHisto->At(i);
LOG(info) << obj->GetName();
TH1* histogram = static_cast<TH1*>(obj);
LOG(info) << histogram->GetNbinsX();
}
}
*/
/// Send message to the common histogram messages queue
if( Send( message, fsChannelNameHistosInput ) < 0 )
{
LOG(error) << "Problem sending data";
return false;
} // if( Send( message, fsChannelNameHistosInput ) < 0 )
/// Reset the histograms after sending them (but do not reset the time)
fMonitorAlgo->ResetHistograms( kFALSE );
return true;
}
CbmDeviceMonitorT0::~CbmDeviceMonitorT0()
{
}
Bool_t CbmDeviceMonitorT0::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
{
fulTsCounter++;
if( kFALSE == fbComponentsAddedToList )
{
for( uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx )
{
if( kusSysId == ts.descriptor( uCompIdx, 0 ).sys_id )
{
fMonitorAlgo->AddMsComponentToList( uCompIdx, kusSysId );
} // if( kusSysId == ts.descriptor( uCompIdx, 0 ).sys_id )
} // for( uint32_t uComp = 0; uComp < ts.num_components(); ++uComp )
fbComponentsAddedToList = kTRUE;
} // if( kFALSE == fbComponentsAddedToList )
if( /* fbMonitorMode && */ bMcbm2018MonitorTaskT0ResetHistos )
{
LOG(info) << "Reset T0 Monitor histos ";
fMonitorAlgo->ResetHistograms();
bMcbm2018MonitorTaskT0ResetHistos = kFALSE;
} // if( fbMonitorMode && bMcbm2018MonitorTaskT0ResetHistos )
if( kFALSE == fMonitorAlgo->ProcessTs( ts ) )
{
LOG(error) << "Failed processing TS " << ts.index()
<< " in unpacker algorithm class";
return kTRUE;
} // if( kFALSE == fMonitorAlgo->ProcessTs( ts ) )
/// Clear the digis vector in case it was filled
fMonitorAlgo->ClearVector();
if( 0 == fulTsCounter % 10000 )
LOG(info) << "Processed " << fulTsCounter << " time slices";
return kTRUE;
}
void CbmDeviceMonitorT0::Finish()
{
}
/**
* CbmDeviceMonitorT0.h
*
* @since 2019-03-26
* @author F. Uhlig
*/
#ifndef CBMDEVICEMONITORT0_H_
#define CBMDEVICEMONITORT0_H_
#include "FairMQDevice.h"
#include "Timeslice.hpp"
#include "TMessage.h"
#include "Rtypes.h"
#include "TObjArray.h"
#include <vector>
#include <map>
#include <chrono>
class TList;
class CbmMcbm2018MonitorAlgoT0;
class CbmDeviceMonitorT0: public FairMQDevice
{
public:
CbmDeviceMonitorT0();
virtual ~CbmDeviceMonitorT0();
protected:
virtual void InitTask();
bool HandleData(FairMQMessagePtr&, int);
private:
/// Constants
static const uint16_t kusSysId = 0x90;
/// Control flags
Bool_t fbIgnoreOverlapMs; //! Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice
Bool_t fbComponentsAddedToList = kFALSE;
/// User settings parameters
std::string fsChannelNameDataInput;
std::string fsChannelNameHistosInput;
std::string fsChannelNameHistosConfig;
std::string fsChannelNameCanvasConfig;
uint32_t fuHistoryHistoSize;
uint32_t fuMinTotPulser;
uint32_t fuMaxTotPulser;
uint32_t fuOffSpillCountLimit;
uint32_t fuPublishFreqTs;
double_t fdMinPublishTime;
double_t fdMaxPublishTime;
/// List of MQ channels names
std::vector< std::string > fsAllowedChannels;
/// Parameters management
TList* fParCList;
/// Statistics & first TS rejection
uint64_t fulNumMessages;
uint64_t fulTsCounter;
std::chrono::system_clock::time_point fLastPublishTime;
/// Processing algo
CbmMcbm2018MonitorAlgoT0 * fMonitorAlgo;
/// Array of histograms to send to the histogram server
TObjArray fArrayHisto;
/// Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder;
/// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server
/// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)"
/// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)"
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig;
bool IsChannelNameAllowed(std::string channelName);
Bool_t InitContainers();
Bool_t DoUnpack(const fles::Timeslice& ts, size_t component);
void Finish();
bool SendHistograms();
};
// special class to expose protected TMessage constructor
class CbmMQTMessage : public TMessage
{
public:
CbmMQTMessage(void* buf, Int_t len)
: TMessage(buf, len)
{
ResetBit(kIsOwner);
}
};
#endif /* CBMDEVICEMONITORT0_H_ */
/* Copyright (C) 2020-2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
/**
* CbmDeviceMonitorTof.cxx
*
......@@ -6,236 +10,205 @@
*/
#include "CbmDeviceMonitorTof.h"
#include "CbmMQDefs.h"
#include "CbmMcbm2018MonitorAlgoTof.h"
#include "CbmFlesCanvasTools.h"
#include "CbmMQDefs.h"
#include "CbmMcbm2018MonitorAlgoTof.h"
#include "StorableTimeslice.hpp"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairMQProgOptions.h" // device->fConfig
#include "FairParGenericSet.h"
#include "RootSerializer.h"
#include "BoostSerializer.h"
#include "TNamed.h"
#include "TList.h"
#include "TCanvas.h"
#include "TFile.h"
#include "TH1.h"
#include "TList.h"
#include "TNamed.h"
#include <string>
#include <iomanip>
#include <array>
#include <boost/serialization/utility.hpp>
#include "BoostSerializer.h"
#include <boost/archive/binary_iarchive.hpp>
#include <boost/serialization/utility.hpp>
#include <array>
#include <iomanip>
#include <stdexcept>
struct InitTaskError : std::runtime_error { using std::runtime_error::runtime_error; };
#include <string>
using namespace std;
#include "RootSerializer.h"
struct InitTaskError : std::runtime_error {
using std::runtime_error::runtime_error;
};
Bool_t bMcbm2018MonitorTaskTofResetHistos = kFALSE;
using namespace std;
CbmDeviceMonitorTof::CbmDeviceMonitorTof()
: fMonitorAlgo{ new CbmMcbm2018MonitorAlgoTof() }
{
}
CbmDeviceMonitorTof::CbmDeviceMonitorTof() : fMonitorAlgo {new CbmMcbm2018MonitorAlgoTof()} {}
void CbmDeviceMonitorTof::InitTask()
try
{
/// Read options from executable
LOG(info) << "Init options for CbmMqStarHistoServer.";
fbIgnoreOverlapMs = fConfig->GetValue< bool >( "IgnOverMs" );
fbDebugMonitorMode = fConfig->GetValue< bool >( "DebugMoni" );
fbIgnoreCriticalErrors = fConfig->GetValue< bool >( "IgnCritErr" );
fuHistoryHistoSize = fConfig->GetValue< uint32_t >( "HistEvoSz" );
fuMinTotPulser = fConfig->GetValue< uint32_t >( "PulsTotMin" );
fuMaxTotPulser = fConfig->GetValue< uint32_t >( "PulsTotMax" );
fiGdpbIndex = fConfig->GetValue< int32_t >( "GdpbIdx" );
fuPublishFreqTs = fConfig->GetValue< uint32_t >( "PubFreqTs" );
fdMinPublishTime = fConfig->GetValue< double_t >( "PubTimeMin" );
fdMaxPublishTime = fConfig->GetValue< double_t >( "PubTimeMax" );
fsChannelNameDataInput = fConfig->GetValue< std::string >( "TsNameIn" );
fsChannelNameHistosInput = fConfig->GetValue< std::string >( "ChNameIn" );
fsChannelNameHistosConfig = fConfig->GetValue< std::string >( "ChNameHistCfg" );
fsChannelNameCanvasConfig = fConfig->GetValue< std::string >( "ChNameCanvCfg" );
fsAllowedChannels[ 0 ] = fsChannelNameDataInput;
LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
//logger::SetLogLevel("INFO");
int noChannel = fChannels.size();
LOG(info) << "Number of defined channels: " << noChannel;
for( auto const &entry : fChannels )
{
LOG(info) << "Channel name: " << entry.first;
if( std::string::npos != entry.first.find( fsChannelNameDataInput ) )
{
if( !IsChannelNameAllowed( entry.first ) )
throw InitTaskError( "Channel name does not match." );
OnData( entry.first, &CbmDeviceMonitorTof::HandleData );
} // if( std::string::npos != entry.first.find( fsChannelNameDataInput ) )
} // for( auto const &entry : fChannels )
InitContainers();
} catch (InitTaskError& e) {
LOG(error) << e.what();
// Wrapper defined in CbmMQDefs.h to support different FairMQ versions
cbm::mq::ChangeState( this, cbm::mq::Transition::ErrorFound );
try {
/// Read options from executable
LOG(info) << "Init options for CbmMqStarHistoServer.";
fbIgnoreOverlapMs = fConfig->GetValue<bool>("IgnOverMs");
fbDebugMonitorMode = fConfig->GetValue<bool>("DebugMoni");
fbIgnoreCriticalErrors = fConfig->GetValue<bool>("IgnCritErr");
fuHistoryHistoSize = fConfig->GetValue<uint32_t>("HistEvoSz");
fuMinTotPulser = fConfig->GetValue<uint32_t>("PulsTotMin");
fuMaxTotPulser = fConfig->GetValue<uint32_t>("PulsTotMax");
fiGdpbIndex = fConfig->GetValue<int32_t>("GdpbIdx");
fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn");
fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
fsAllowedChannels[0] = fsChannelNameDataInput;
LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
/// Set the Monitor Algo in Absolute time scale
fMonitorAlgo->UseAbsoluteTime();
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
//logger::SetLogLevel("INFO");
int noChannel = fChannels.size();
LOG(info) << "Number of defined channels: " << noChannel;
for (auto const& entry : fChannels) {
LOG(info) << "Channel name: " << entry.first;
if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
OnData(entry.first, &CbmDeviceMonitorTof::HandleData);
} // if( std::string::npos != entry.first.find( fsChannelNameDataInput ) )
} // for( auto const &entry : fChannels )
}
catch (InitTaskError& e) {
LOG(error) << e.what();
// Wrapper defined in CbmMQDefs.h to support different FairMQ versions
cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
}
bool CbmDeviceMonitorTof::IsChannelNameAllowed(std::string channelName)
{
for( auto const &entry : fsAllowedChannels ) {
std::size_t pos1 = channelName.find(entry);
if( pos1 != std::string::npos ) {
const vector< std::string >::const_iterator pos =
std::find( fsAllowedChannels.begin(), fsAllowedChannels.end(), entry );
const vector< std::string >::size_type idx = pos - fsAllowedChannels.begin();
LOG(info) << "Found " << entry << " in " << channelName;
LOG(info) << "Channel name " << channelName
<< " found in list of allowed channel names at position " << idx;
return true;
} // if (pos1!=std::string::npos)
} // for(auto const &entry : fsAllowedChannels)
LOG(info) << "Channel name " << channelName
<< " not found in list of allowed channel names.";
LOG(error) << "Stop device.";
return false;
for (auto const& entry : fsAllowedChannels) {
std::size_t pos1 = channelName.find(entry);
if (pos1 != std::string::npos) {
const vector<std::string>::const_iterator pos =
std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry);
const vector<std::string>::size_type idx = pos - fsAllowedChannels.begin();
LOG(info) << "Found " << entry << " in " << channelName;
LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
return true;
} // if (pos1!=std::string::npos)
} // for(auto const &entry : fsAllowedChannels)
LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
LOG(error) << "Stop device.";
return false;
}
Bool_t CbmDeviceMonitorTof::InitContainers()
{
LOG(info) << "Init parameter containers for CbmDeviceMonitorTof.";
fParCList = fMonitorAlgo->GetParList();
for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ ) {
FairParGenericSet* tempObj = (FairParGenericSet*)( fParCList->At( iparC ) );
fParCList->Remove( tempObj );
std::string paramName{ tempObj->GetName() };
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
// Her must come the proper Runid
std::string message = paramName + ",111";
LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
FairMQMessagePtr req( NewSimpleMessage(message) );
FairMQMessagePtr rep( NewMessage() );
FairParGenericSet* newObj = nullptr;
if ( Send(req, "parameters") > 0 ) {
if ( Receive( rep, "parameters" ) >= 0) {
if ( rep->GetSize() != 0 ) {
CbmMQTMessage tmsg( rep->GetData(), rep->GetSize() );
newObj = static_cast< FairParGenericSet* >( tmsg.ReadObject( tmsg.GetClass() ) );
LOG( info ) << "Received unpack parameter from the server:";
newObj->print();
} else {
LOG( error ) << "Received empty reply. Parameter not available";
} // if (rep->GetSize() != 0)
} // if (Receive(rep, "parameters") >= 0)
} // if (Send(req, "parameters") > 0)
fParCList->AddAt( newObj, iparC );
delete tempObj;
} // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
/// Need to add accessors for all options
fMonitorAlgo->SetIgnoreOverlapMs( fbIgnoreOverlapMs );
fMonitorAlgo->SetDebugMonitorMode( fbDebugMonitorMode );
fMonitorAlgo->SetIgnoreCriticalErrors( fbIgnoreCriticalErrors );
fMonitorAlgo->SetHistoryHistoSize( fuHistoryHistoSize );
fMonitorAlgo->SetPulserTotLimits( fuMinTotPulser, fuMaxTotPulser );
fMonitorAlgo->SetGdpbIndex( fiGdpbIndex );
Bool_t initOK = fMonitorAlgo->InitContainers();
// Bool_t initOK = fMonitorAlgo->ReInitContainers();
/// Histos creation and obtain pointer on them
/// Trigger histo creation on all associated algos
initOK &= fMonitorAlgo->CreateHistograms();
/// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
std::vector< std::pair< TNamed *, std::string > > vHistos = fMonitorAlgo->GetHistoVector();
/// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder)
std::vector< std::pair< TCanvas *, std::string > > vCanvases = fMonitorAlgo->GetCanvasVector();
/// Add pointers to each histo in the histo array
/// Create histo config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder >
/// and send it through a separate channel using the BoostSerializer
for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
{
// LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
// << " in " << vHistos[ uHisto ].second.data()
// ;
fArrayHisto.Add( vHistos[ uHisto ].first );
std::pair< std::string, std::string > psHistoConfig( vHistos[ uHisto ].first->GetName(),
vHistos[ uHisto ].second );
fvpsHistosFolder.push_back( psHistoConfig );
/// Serialize the vector of histo config into a single MQ message
FairMQMessagePtr messageHist( NewMessage() );
Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageHist, psHistoConfig );
/// Send message to the common histogram config messages queue
if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
{
LOG(error) << "Problem sending histo config";
return false;
} // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
LOG(info) << "Config of hist " << psHistoConfig.first.data()
<< " in folder " << psHistoConfig.second.data() ;
} // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
/// Create canvas config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config >
/// and send it through a separate channel using the BoostSerializer
for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
{
// LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
// << " in " << vCanvases[ uCanv ].second.data();
std::string sCanvName = (vCanvases[ uCanv ].first)->GetName();
std::string sCanvConf = GenerateCanvasConfigString( vCanvases[ uCanv ].first );
std::pair< std::string, std::string > psCanvConfig( sCanvName, sCanvConf );
fvpsCanvasConfig.push_back( psCanvConfig );
/// Serialize the vector of canvas config into a single MQ message
FairMQMessagePtr messageCan( NewMessage() );
Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageCan, psCanvConfig );
/// Send message to the common canvas config messages queue
if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
{
LOG(error) << "Problem sending canvas config";
return false;
} // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
LOG(info) << "Config string of Canvas " << psCanvConfig.first.data()
<< " is " << psCanvConfig.second.data() ;
} // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
LOG(info) << "Init parameter containers for CbmDeviceMonitorTof.";
fParCList = fMonitorAlgo->GetParList();
for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC));
fParCList->Remove(tempObj);
std::string paramName {tempObj->GetName()};
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
// Her must come the proper Runid
std::string message = paramName + ",111";
LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
FairMQMessagePtr req(NewSimpleMessage(message));
FairMQMessagePtr rep(NewMessage());
FairParGenericSet* newObj = nullptr;
if (Send(req, "parameters") > 0) {
if (Receive(rep, "parameters") >= 0) {
if (rep->GetSize() != 0) {
CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass()));
LOG(info) << "Received unpack parameter from the server:";
newObj->print();
}
else {
LOG(error) << "Received empty reply. Parameter not available";
} // if (rep->GetSize() != 0)
} // if (Receive(rep, "parameters") >= 0)
} // if (Send(req, "parameters") > 0)
fParCList->AddAt(newObj, iparC);
delete tempObj;
} // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
/// Need to add accessors for all options
fMonitorAlgo->SetIgnoreOverlapMs(fbIgnoreOverlapMs);
fMonitorAlgo->SetDebugMonitorMode(fbDebugMonitorMode);
fMonitorAlgo->SetIgnoreCriticalErrors(fbIgnoreCriticalErrors);
fMonitorAlgo->SetHistoryHistoSize(fuHistoryHistoSize);
fMonitorAlgo->SetPulserTotLimits(fuMinTotPulser, fuMaxTotPulser);
fMonitorAlgo->SetGdpbIndex(fiGdpbIndex);
Bool_t initOK = fMonitorAlgo->InitContainers();
return initOK;
}
bool CbmDeviceMonitorTof::InitHistograms()
{
/// Histos creation and obtain pointer on them
/// Trigger histo creation on all associated algos
bool initOK = fMonitorAlgo->CreateHistograms();
/// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector();
/// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder)
std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector();
/// Add pointers to each histo in the histo array
/// Create histo config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder >
/// and send it through a separate channel using the BoostSerializer
for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
// LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
// << " in " << vHistos[ uHisto ].second.data()
// ;
fArrayHisto.Add(vHistos[uHisto].first);
std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
fvpsHistosFolder.push_back(psHistoConfig);
LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data();
} // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
/// Create canvas config vector
/// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config >
/// and send it through a separate channel using the BoostSerializer
for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
// LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
// << " in " << vCanvases[ uCanv ].second.data();
std::string sCanvName = (vCanvases[uCanv].first)->GetName();
std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);
std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
fvpsCanvasConfig.push_back(psCanvConfig);
LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data();
} // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
return initOK;
}
......@@ -244,126 +217,152 @@ Bool_t CbmDeviceMonitorTof::InitContainers()
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool CbmDeviceMonitorTof::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
fulNumMessages++;
LOG(debug) << "Received message number "<< fulNumMessages
<< " with size " << msg->GetSize();
if( 0 == fulNumMessages % 10000 )
LOG(info) << "Received " << fulNumMessages << " messages";
std::string msgStr( static_cast<char*>( msg->GetData() ), msg->GetSize() );
std::istringstream iss( msgStr );
boost::archive::binary_iarchive inputArchive( iss );
/// Create an empty TS and fill it with the incoming message
fles::StorableTimeslice component{ 0 };
inputArchive >> component;
/// Process the Timeslice
DoUnpack(component, 0);
/// Send histograms each 100 time slices. Should be each ~1s
/// Use also runtime checker to trigger sending after M s if
/// processing too slow or delay sending if processing too fast
std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
if( ( fdMaxPublishTime < elapsedSeconds.count() ) ||
( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
{
if (0 == fulNumMessages) {
try {
InitContainers();
}
catch (InitTaskError& e) {
LOG(error) << e.what();
ChangeState(fair::mq::Transition::ErrorFound);
}
} // if( 0 == fulNumMessages)
if (0 == fulNumMessages) InitHistograms();
fulNumMessages++;
LOG(debug) << "Received message number " << fulNumMessages << " with size " << msg->GetSize();
if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";
std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
std::istringstream iss(msgStr);
boost::archive::binary_iarchive inputArchive(iss);
/// Create an empty TS and fill it with the incoming message
fles::StorableTimeslice component {0};
inputArchive >> component;
/// Process the Timeslice
DoUnpack(component, 0);
/// Send histograms each 100 time slices. Should be each ~1s
/// Use also runtime checker to trigger sending after M s if
/// processing too slow or delay sending if processing too fast
std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
if ((fdMaxPublishTime < elapsedSeconds.count())
|| (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
if (!fbConfigSent) {
// Send the configuration only once per run!
fbConfigSent = SendHistoConfAndData();
} // if( !fbConfigSent )
else
SendHistograms();
fLastPublishTime = std::chrono::system_clock::now();
} // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
return true;
fLastPublishTime = std::chrono::system_clock::now();
} // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
return true;
}
bool CbmDeviceMonitorTof::SendHistoConfAndData()
{
/// Prepare multiparts message and header
std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size());
FairMQMessagePtr messageHeader(NewMessage());
// Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader);
BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
FairMQParts partsOut;
partsOut.AddPart(std::move(messageHeader));
for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) {
/// Serialize the vector of histo config into a single MQ message
FairMQMessagePtr messageHist(NewMessage());
// Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]);
BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, fvpsHistosFolder[uHisto]);
partsOut.AddPart(std::move(messageHist));
} // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto)
for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
/// Serialize the vector of canvas config into a single MQ message
FairMQMessagePtr messageCan(NewMessage());
// Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]);
BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, fvpsCanvasConfig[uCanv]);
partsOut.AddPart(std::move(messageCan));
} // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv)
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr msgHistos(NewMessage());
// Serialize<RootSerializer>(*msgHistos, &fArrayHisto);
RootSerializer().Serialize(*msgHistos, &fArrayHisto);
partsOut.AddPart(std::move(msgHistos));
/// Send the multi-parts message to the common histogram messages queue
if (Send(partsOut, fsChannelNameHistosInput) < 0) {
LOG(error) << "CbmDeviceMonitorTof::SendHistoConfAndData => Problem sending data";
return false;
} // if( Send( partsOut, fsChannelNameHistosInput ) < 0 )
/// Reset the histograms after sending them (but do not reset the time)
fMonitorAlgo->ResetHistograms(kFALSE);
return true;
}
bool CbmDeviceMonitorTof::SendHistograms()
{
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr message( NewMessage() );
Serialize<RootSerializer>( *message, &fArrayHisto );
// test code to check if deserialization works
/*
TObject* tempObject = nullptr;
Deserialize<RootDeserializer>(*message, tempObject);
if (TString(tempObject->ClassName()).EqualTo("TObjArray")) {
TObjArray* arrayHisto = static_cast<TObjArray*>(tempObject);
LOG(info) << "Array contains " << arrayHisto->GetEntriesFast()
<< " entries";
for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++) {
TObject* obj = arrayHisto->At(i);
LOG(info) << obj->GetName();
TH1* histogram = static_cast<TH1*>(obj);
LOG(info) << histogram->GetNbinsX();
}
}
*/
/// Serialize the array of histos into a single MQ message
FairMQMessagePtr message(NewMessage());
// Serialize<RootSerializer>(*message, &fArrayHisto);
RootSerializer().Serialize(*message, &fArrayHisto);
/// Send message to the common histogram messages queue
if( Send( message, fsChannelNameHistosInput ) < 0 )
{
LOG(error) << "Problem sending data";
return false;
} // if( Send( message, fsChannelNameHistosInput ) < 0 )
/// Send message to the common histogram messages queue
if (Send(message, fsChannelNameHistosInput) < 0) {
LOG(error) << "Problem sending data";
return false;
} // if( Send( message, fsChannelNameHistosInput ) < 0 )
/// Reset the histograms after sending them (but do not reset the time)
fMonitorAlgo->ResetHistograms( kFALSE );
/// Reset the histograms after sending them (but do not reset the time)
fMonitorAlgo->ResetHistograms(kFALSE);
return true;
return true;
}
CbmDeviceMonitorTof::~CbmDeviceMonitorTof()
{
}
CbmDeviceMonitorTof::~CbmDeviceMonitorTof() {}
Bool_t CbmDeviceMonitorTof::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
{
fulTsCounter++;
if( kFALSE == fbComponentsAddedToList )
{
for( uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx )
{
if( kusSysIdTof == ts.descriptor( uCompIdx, 0 ).sys_id )
{
fMonitorAlgo->AddMsComponentToList( uCompIdx, kusSysIdTof );
} // if( kusSysIdTof == ts.descriptor( uCompIdx, 0 ).sys_id )
else if( kusSysIdT0 == ts.descriptor( uCompIdx, 0 ).sys_id )
{
fMonitorAlgo->AddMsComponentToList( uCompIdx, kusSysIdT0 );
} // if( kusSysIdT0 == ts.descriptor( uCompIdx, 0 ).sys_id )
} // for( uint32_t uComp = 0; uComp < ts.num_components(); ++uComp )
fbComponentsAddedToList = kTRUE;
} // if( kFALSE == fbComponentsAddedToList )
if( /* fbMonitorMode && */ bMcbm2018MonitorTaskTofResetHistos )
{
LOG(info) << "Reset TOF Monitor histos ";
fMonitorAlgo->ResetHistograms();
bMcbm2018MonitorTaskTofResetHistos = kFALSE;
} // if( fbMonitorMode && bMcbm2018MonitorTaskTofResetHistos )
if( kFALSE == fMonitorAlgo->ProcessTs( ts ) )
{
LOG(error) << "Failed processing TS " << ts.index()
<< " in unpacker algorithm class";
return kTRUE;
} // if( kFALSE == fMonitorAlgo->ProcessTs( ts ) )
/// Clear the digis vector in case it was filled
fMonitorAlgo->ClearVector();
if( 0 == fulTsCounter % 10000 )
LOG(info) << "Processed " << fulTsCounter << " time slices";
return kTRUE;
fulTsCounter++;
if (kFALSE == fbComponentsAddedToList) {
for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
if (kusSysIdTof == ts.descriptor(uCompIdx, 0).sys_id) {
fMonitorAlgo->AddMsComponentToList(uCompIdx, kusSysIdTof);
} // if( kusSysIdTof == ts.descriptor( uCompIdx, 0 ).sys_id )
else if (kusSysIdBmon == ts.descriptor(uCompIdx, 0).sys_id) {
fMonitorAlgo->AddMsComponentToList(uCompIdx, kusSysIdBmon);
} // if( kusSysIdBmon == ts.descriptor( uCompIdx, 0 ).sys_id )
} // for( uint32_t uComp = 0; uComp < ts.num_components(); ++uComp )
fbComponentsAddedToList = kTRUE;
} // if( kFALSE == fbComponentsAddedToList )
if (kFALSE == fMonitorAlgo->ProcessTs(ts)) {
LOG(error) << "Failed processing TS " << ts.index() << " in unpacker algorithm class";
return kTRUE;
} // if( kFALSE == fMonitorAlgo->ProcessTs( ts ) )
/// Clear the digis vector in case it was filled
fMonitorAlgo->ClearVector();
if (0 == fulTsCounter % 10000) LOG(info) << "Processed " << fulTsCounter << " time slices";
return kTRUE;
}
void CbmDeviceMonitorTof::Finish()
{
}
void CbmDeviceMonitorTof::Finish() {}
/* Copyright (C) 2020-2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
/**
* CbmDeviceMonitorTof.h
*
......@@ -8,95 +12,85 @@
#ifndef CBMDEVICEMONITORTOF_H_
#define CBMDEVICEMONITORTOF_H_
#include "FairMQDevice.h"
#include "CbmMqTMessage.h"
#include "Timeslice.hpp"
#include "TMessage.h"
#include "FairMQDevice.h"
#include "Rtypes.h"
#include "TObjArray.h"
#include <vector>
#include <map>
#include <chrono>
#include <map>
#include <vector>
class TList;
class CbmMcbm2018MonitorAlgoTof;
class CbmDeviceMonitorTof: public FairMQDevice
{
public:
CbmDeviceMonitorTof();
virtual ~CbmDeviceMonitorTof();
protected:
virtual void InitTask();
bool HandleData(FairMQMessagePtr&, int);
private:
/// Constants
static const uint16_t kusSysIdTof = 0x60;
static const uint16_t kusSysIdT0 = 0x90;
/// Control flags
Bool_t fbIgnoreOverlapMs = kFALSE; //! Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice
Bool_t fbDebugMonitorMode = kFALSE; //! Switch ON the filling of a additional set of histograms
Bool_t fbIgnoreCriticalErrors = kTRUE; //! If ON not printout at all for critical errors
Bool_t fbComponentsAddedToList = kFALSE;
/// User settings parameters
std::string fsChannelNameDataInput = "tofcomponent";
std::string fsChannelNameHistosInput = "histogram-in";
std::string fsChannelNameHistosConfig = "histo-conf";
std::string fsChannelNameCanvasConfig = "canvas-conf";
uint32_t fuHistoryHistoSize = 3600;
uint32_t fuMinTotPulser = 185;
uint32_t fuMaxTotPulser = 195;
int32_t fiGdpbIndex = -1;
uint32_t fuPublishFreqTs = 100;
double_t fdMinPublishTime = 0.5;
double_t fdMaxPublishTime = 5.0;
/// List of MQ channels names
std::vector< std::string > fsAllowedChannels = { fsChannelNameDataInput };
/// Parameters management
TList* fParCList = nullptr;
/// Statistics & first TS rejection
uint64_t fulNumMessages = 0;
uint64_t fulTsCounter = 0;
std::chrono::system_clock::time_point fLastPublishTime = std::chrono::system_clock::now();
/// Processing algo
CbmMcbm2018MonitorAlgoTof * fMonitorAlgo;
/// Array of histograms to send to the histogram server
TObjArray fArrayHisto = {};
/// Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder = {};
/// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server
/// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)"
/// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)"
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig = {};
bool IsChannelNameAllowed(std::string channelName);
Bool_t InitContainers();
Bool_t DoUnpack(const fles::Timeslice& ts, size_t component);
void Finish();
bool SendHistograms();
class CbmDeviceMonitorTof : public FairMQDevice {
public:
CbmDeviceMonitorTof();
virtual ~CbmDeviceMonitorTof();
protected:
virtual void InitTask();
bool HandleData(FairMQMessagePtr&, int);
private:
/// Constants
static const uint16_t kusSysIdTof = 0x60;
static const uint16_t kusSysIdBmon = 0x90;
/// Control flags
Bool_t fbIgnoreOverlapMs = kFALSE; //! Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice
Bool_t fbDebugMonitorMode = kFALSE; //! Switch ON the filling of a additional set of histograms
Bool_t fbIgnoreCriticalErrors = kTRUE; //! If ON not printout at all for critical errors
Bool_t fbComponentsAddedToList = kFALSE;
/// User settings parameters
std::string fsChannelNameDataInput = "tofcomponent";
std::string fsChannelNameHistosInput = "histogram-in";
uint32_t fuHistoryHistoSize = 3600;
uint32_t fuMinTotPulser = 185;
uint32_t fuMaxTotPulser = 195;
int32_t fiGdpbIndex = -1;
uint32_t fuPublishFreqTs = 100;
double_t fdMinPublishTime = 0.5;
double_t fdMaxPublishTime = 5.0;
/// List of MQ channels names
std::vector<std::string> fsAllowedChannels = {fsChannelNameDataInput};
/// Parameters management
TList* fParCList = nullptr;
/// Statistics & first TS rejection
uint64_t fulNumMessages = 0;
uint64_t fulTsCounter = 0;
std::chrono::system_clock::time_point fLastPublishTime = std::chrono::system_clock::now();
/// Processing algo
CbmMcbm2018MonitorAlgoTof* fMonitorAlgo;
/// Array of histograms to send to the histogram server
TObjArray fArrayHisto = {};
/// Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server
std::vector<std::pair<std::string, std::string>> fvpsHistosFolder = {};
/// Vector of string pairs with ( CanvasName, CanvasConfig ) to send to the histogram server
/// Format of Can config is "NbPadX(U);NbPadY(U);ConfigPad1(s);....;ConfigPadXY(s)"
/// Format of Pad config is "GrixX(b),GridY(b),LogX(b),LogY(b),LogZ(b),HistoName(s),DrawOptions(s)"
std::vector<std::pair<std::string, std::string>> fvpsCanvasConfig = {};
/// Flag indicating whether the histograms and canvases configurations were already published
bool fbConfigSent = false;
bool IsChannelNameAllowed(std::string channelName);
bool InitContainers();
bool InitHistograms();
Bool_t DoUnpack(const fles::Timeslice& ts, size_t component);
void Finish();
bool SendHistoConfAndData();
bool SendHistograms();
};
// special class to expose protected TMessage constructor
class CbmMQTMessage : public TMessage
{
public:
CbmMQTMessage(void* buf, Int_t len)
: TMessage(buf, len)
{
ResetBit(kIsOwner);
}
};
#endif /* CBMDEVICEMONITORTOF_H_ */
/* Copyright (C) 2019-2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau, Florian Uhlig [committer] */
#include "CbmDeviceMonitorBmon.h"
#include <iomanip>
#include <string>
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
using namespace std;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()("IgnOverMs", bpo::value<bool>()->default_value(true), "Ignore overlap MS if true");
options.add_options()("HistEvoSz", bpo::value<uint32_t>()->default_value(1800),
"Size of evolution histos in seconds");
options.add_options()("PulsTotMin", bpo::value<uint32_t>()->default_value(185), "Minimal TOT for pulser cut");
options.add_options()("PulsTotMax", bpo::value<uint32_t>()->default_value(195), "Maximal TOT for pulser cut");
options.add_options()("SpillThr", bpo::value<uint32_t>()->default_value(25), "Hits Nb Thr for spill detection");
options.add_options()("SpillThrNonPuls", bpo::value<uint32_t>()->default_value(10),
"Non pulser Hits Nb Thr for spill detection");
options.add_options()("SpillCheckInt", bpo::value<double>()->default_value(0.128),
"Interval in seconds between count checks for spill detection");
options.add_options()("ChanMap", bpo::value<std::string>()->default_value("0,1,2,3,4,5,6,7"),
"Set Bmon channel map e.g. 0,1,2,3,4,5,6,7");
options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(100), "Histo publishing frequency in TS");
options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0),
"Minimal time between two publishing");
options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0),
"Maximal time between two publishing");
options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("t0component"),
"MQ channel name for TS data");
options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"),
"MQ channel name for histos");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDeviceMonitorBmon(); }
/* Copyright (C) 2021 Institute for Nuclear Research, Moscow
SPDX-License-Identifier: GPL-3.0-only
Authors: Nikolay Karpushkin [committer] */
#include "CbmDeviceMonitorPsd.h"
#include <iomanip>
#include <string>
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
using namespace std;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()("IgnOverMs", bpo::value<bool>()->default_value(true), "Ignore overlap MS if true");
options.add_options()("MonitorMode", bpo::value<bool>()->default_value(true), "Monitor mode ON/OFF");
options.add_options()("MonitorChanMode", bpo::value<bool>()->default_value(false), "Monitor channelwise mode ON/OFF");
options.add_options()("MonitorWfmMode", bpo::value<bool>()->default_value(false), "Monitor waveform mode ON/OFF");
options.add_options()("MonitorFitMode", bpo::value<bool>()->default_value(false), "Monitor fit waveform mode ON/OFF");
options.add_options()("HistEvoSz", bpo::value<uint32_t>()->default_value(1800),
"Size of evolution histos in seconds");
options.add_options()("HistChrgArgs", bpo::value<vector<int>>()->multitoken(), "Charge histos arguments");
options.add_options()("HistAmplArgs", bpo::value<vector<int>>()->multitoken(), "Ampl histos arguments");
options.add_options()("HistZlArgs", bpo::value<vector<int>>()->multitoken(), "ZL histos arguments");
options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(100), "Histo publishing frequency in TS");
options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0),
"Minimal time between two publishing");
options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0),
"Maximal time between two publishing");
options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("psdcomponent"),
"MQ channel name for TS data");
options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"),
"MQ channel name for histos");
options.add_options()("ChNameHistCfg", bpo::value<std::string>()->default_value("histo-conf"),
"MQ channel name for histos config");
options.add_options()("ChNameCanvCfg", bpo::value<std::string>()->default_value("canvas-conf"),
"MQ channel name for canvases config");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDeviceMonitorPsd(); }
/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
#include "CbmDeviceMonitorReqBmon.h"
#include <iomanip>
#include <string>
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
using namespace std;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()("IgnOverMs", bpo::value<bool>()->default_value(true), "Ignore overlap MS if true");
options.add_options()("HistEvoSz", bpo::value<uint32_t>()->default_value(1800),
"Size of evolution histos in seconds");
options.add_options()("PulsTotMin", bpo::value<uint32_t>()->default_value(185), "Minimal TOT for pulser cut");
options.add_options()("PulsTotMax", bpo::value<uint32_t>()->default_value(195), "Maximal TOT for pulser cut");
options.add_options()("SpillThr", bpo::value<uint32_t>()->default_value(25), "Hits Nb Thr for spill detection");
options.add_options()("SpillThrNonPuls", bpo::value<uint32_t>()->default_value(10),
"Non pulser Hits Nb Thr for spill detection");
options.add_options()("SpillCheckInt", bpo::value<double>()->default_value(0.128),
"Interval in seconds between count checks for spill detection");
options.add_options()("ChanMap", bpo::value<std::string>()->default_value("0,1,2,3,4,5,6,7"),
"Set Bmon channel map e.g. 0,1,2,3,4,5,6,7");
options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(100), "Histo publishing frequency in TS");
options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0),
"Minimal time between two publishing");
options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0),
"Maximal time between two publishing");
options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("ts-request"),
"MQ channel name for TS data");
options.add_options()("TsBlockName", bpo::value<std::string>()->default_value(""),
"Block name for requesting TS data, Bmon SysId request if empty");
options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"),
"MQ channel name for histos");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDeviceMonitorReqBmon(); }
/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
#include "CbmDeviceMonitorReqTof.h"
#include <iomanip>
#include <string>
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
using namespace std;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()("IgnOverMs", bpo::value<bool>()->default_value(true), "Ignore overlap MS if true");
options.add_options()("DebugMoni", bpo::value<bool>()->default_value(false), "Debug Monitor Mode");
options.add_options()("IgnCritErr", bpo::value<bool>()->default_value(true), "Ignore Critical Errors");
options.add_options()("HistEvoSz", bpo::value<uint32_t>()->default_value(1800),
"Size of evolution histos in seconds");
options.add_options()("PulsTotMin", bpo::value<uint32_t>()->default_value(185), "Minimal TOT for pulser cut");
options.add_options()("PulsTotMax", bpo::value<uint32_t>()->default_value(195), "Maximal TOT for pulser cut");
options.add_options()("GdpbIdx", bpo::value<int32_t>()->default_value(-1),
"Single gDPB selection by index, -1 (default) to disable");
options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(100), "Histo publishing frequency in TS");
options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0),
"Minimal time between two publishing");
options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0),
"Maximal time between two publishing");
options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("ts-request"),
"MQ channel name for TS data");
options.add_options()("TsBlockName", bpo::value<std::string>()->default_value(""),
"Block name for requesting TS data, TOF SysId request if empty");
options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"),
"MQ channel name for histos");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDeviceMonitorReqTof(); }
#include "runFairMQDevice.h"
#include "CbmDeviceMonitorT0.h"
#include <string>
#include <iomanip>
namespace bpo = boost::program_options;
using namespace std;
void addCustomOptions(bpo::options_description& options)
{
options.add_options() ("IgnOverMs", bpo::value< bool >()->default_value( true ),
"Ignore overlap MS if true");
options.add_options() ("HistEvoSz", bpo::value< uint32_t >()->default_value( 1800 ),
"Size of evolution histos in seconds");
options.add_options() ("PulsTotMin", bpo::value< uint32_t >()->default_value( 185 ),
"Minimal TOT for pulser cut");
options.add_options() ("PulsTotMax", bpo::value< uint32_t >()->default_value( 195 ),
"Maximal TOT for pulser cut");
options.add_options() ("SpillThr", bpo::value< uint32_t >()->default_value( 1000 ),
"Hits Nb Thr for spill detection");
options.add_options() ("PubFreqTs", bpo::value< uint32_t >()->default_value( 100 ),
"Histo publishing frequency in TS");
options.add_options() ("PubTimeMin", bpo::value< double_t >()->default_value( 1.0 ),
"Minimal time between two publishing");
options.add_options() ("PubTimeMax", bpo::value< double_t >()->default_value( 10.0 ),
"Maximal time between two publishing");
options.add_options() ( "TsNameIn", bpo::value< std::string >()->default_value( "t0component" ),
"MQ channel name for TS data");
options.add_options() ( "ChNameIn", bpo::value< std::string >()->default_value( "histogram-in" ),
"MQ channel name for histos");
options.add_options() ( "ChNameHistCfg", bpo::value< std::string >()->default_value( "histo-conf" ),
"MQ channel name for histos config");
options.add_options() ( "ChNameCanvCfg", bpo::value< std::string >()->default_value( "canvas-conf" ),
"MQ channel name for canvases config");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new CbmDeviceMonitorT0();
}
#include "runFairMQDevice.h"
/* Copyright (C) 2020 Facility for Antiproton and Ion Research in Europe, Darmstadt
SPDX-License-Identifier: GPL-3.0-only
Authors: Pierre-Alain Loizeau [committer] */
#include "CbmDeviceMonitorTof.h"
#include <string>
#include <iomanip>
#include <string>
#include "runFairMQDevice.h"
namespace bpo = boost::program_options;
using namespace std;
void addCustomOptions(bpo::options_description& options)
{
options.add_options() ("IgnOverMs", bpo::value< bool >()->default_value( true ),
"Ignore overlap MS if true");
options.add_options() ("DebugMoni", bpo::value< bool >()->default_value( false ),
"Debug Monitor Mode");
options.add_options() ("IgnCritErr", bpo::value< bool >()->default_value( true ),
"Ignore Critical Errors");
options.add_options() ("HistEvoSz", bpo::value< uint32_t >()->default_value( 1800 ),
"Size of evolution histos in seconds");
options.add_options() ("PulsTotMin", bpo::value< uint32_t >()->default_value( 185 ),
"Minimal TOT for pulser cut");
options.add_options() ("PulsTotMax", bpo::value< uint32_t >()->default_value( 195 ),
"Maximal TOT for pulser cut");
options.add_options() ("GdpbIdx", bpo::value< int32_t >()->default_value( -1 ),
"Single gDPB selection by index, -1 (default) to disable");
options.add_options() ("PubFreqTs", bpo::value< uint32_t >()->default_value( 100 ),
"Histo publishing frequency in TS");
options.add_options() ("PubTimeMin", bpo::value< double_t >()->default_value( 1.0 ),
"Minimal time between two publishing");
options.add_options() ("PubTimeMax", bpo::value< double_t >()->default_value( 10.0 ),
"Maximal time between two publishing");
options.add_options() ( "TsNameIn", bpo::value< std::string >()->default_value( "tofcomponent" ),
"MQ channel name for TS data");
options.add_options() ( "ChNameIn", bpo::value< std::string >()->default_value( "histogram-in" ),
"MQ channel name for histos");
options.add_options() ( "ChNameHistCfg", bpo::value< std::string >()->default_value( "histo-conf" ),
"MQ channel name for histos config");
options.add_options() ( "ChNameCanvCfg", bpo::value< std::string >()->default_value( "canvas-conf" ),
"MQ channel name for canvases config");
options.add_options()("IgnOverMs", bpo::value<bool>()->default_value(true), "Ignore overlap MS if true");
options.add_options()("DebugMoni", bpo::value<bool>()->default_value(false), "Debug Monitor Mode");
options.add_options()("IgnCritErr", bpo::value<bool>()->default_value(true), "Ignore Critical Errors");
options.add_options()("HistEvoSz", bpo::value<uint32_t>()->default_value(1800),
"Size of evolution histos in seconds");
options.add_options()("PulsTotMin", bpo::value<uint32_t>()->default_value(185), "Minimal TOT for pulser cut");
options.add_options()("PulsTotMax", bpo::value<uint32_t>()->default_value(195), "Maximal TOT for pulser cut");
options.add_options()("GdpbIdx", bpo::value<int32_t>()->default_value(-1),
"Single gDPB selection by index, -1 (default) to disable");
options.add_options()("PubFreqTs", bpo::value<uint32_t>()->default_value(100), "Histo publishing frequency in TS");
options.add_options()("PubTimeMin", bpo::value<double_t>()->default_value(1.0),
"Minimal time between two publishing");
options.add_options()("PubTimeMax", bpo::value<double_t>()->default_value(10.0),
"Maximal time between two publishing");
options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("tofcomponent"),
"MQ channel name for TS data");
options.add_options()("ChNameIn", bpo::value<std::string>()->default_value("histogram-in"),
"MQ channel name for histos");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new CbmDeviceMonitorTof();
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new CbmDeviceMonitorTof(); }
#!/bin/bash
if [ -e @SIMPATH@/bin/fairmq-shmmonitor ]; then
@SIMPATH@/bin/fairmq-shmmonitor --cleanup
fi
if [ $# -ge 1 ]; then
_nbmoni=$1
((_pubfreqts = $_nbmoni*100 ))
_pubminsec=1.0
_pubmaxsec=10.0
if [ $# -ge 4 ]; then
_filename=""
_dirname=""
_hostname=$4
if [ $# -ge 5 ]; then
_pubfreqts=$5
if [ $# -ge 6 ]; then
_pubminsec=$6
if [ $# -ge 7 ]; then
_pubmaxsec=$7
fi
fi
fi
elif [ $# -ge 2 ]; then
_filename=$2
_hostname=""
if [ $# -eq 3 ]; then
_dirname=$3
else
_dirname=""
fi
else
echo 'Starting connection to local stream'
echo ' for other usages, please supply at least a filename.'
echo 'Possible usages are:'
echo 'startMQSamplerT0Monitor2020.sh'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> <full filename pattern list>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> <filename pattern> <folder_path>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>'
_filename=""
_dirname=""
_hostname="localhost"
fi
else
echo 'Starting connection to local stream with 1 monitor process'
echo ' for other usages, please supply at least a filename.'
echo 'Possible usages are:'
echo 'startMQSamplerT0Monitor2020.sh'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> <full filename pattern list>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> <filename pattern> <folder_path>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>'
_filename=""
_dirname=""
_hostname="localhost"
_nbmoni=1
_pubfreqts=100
_pubminsec=1.0
_pubmaxsec=10.0
fi
_paramfile=@VMCWORKDIR@/macro/beamtime/mcbm2020/mT0Par.par
SAMPLER="MultiTsaSampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --max-timeslices 0"
SAMPLER+=" --severity info"
#SAMPLER+=" --flib-port 10"
if [ "$_hostname" != "" ]; then
SAMPLER+=" --flib-host $_hostname"
elif [ "$_filename" != "" ]; then
SAMPLER+=" --filename $_filename"
if [ "$_dirname" != "" ]; then
SAMPLER+=" --dirname $_dirname"
fi
fi
SAMPLER+=" --high-water-mark 1000"
SAMPLER+=" --send-ts-per-sysid 1"
SAMPLER+=" --channel-config name=t0component,type=push,method=bind,address=tcp://127.0.0.1:11555"
#SAMPLER+=" --transport shmem"
SAMPLER+=" --transport zeromq"
#SAMPLER+=" --transport nanomsg"
xterm -l -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/source/$SAMPLER &
_iMoni=0
while (( _iMoni < _nbmoni )); do
(( _yOffset=100*_iMoni ))
(( _iMoni += 1 ))
MONITOR="T0MonitorMcbm2018"
MONITOR+=" --id mon$_iMoni"
MONITOR+=" --severity info"
MONITOR+=" --PubFreqTs $_pubfreqts"
MONITOR+=" --PubTimeMin $_pubminsec"
MONITOR+=" --PubTimeMax $_pubmaxsec"
MONITOR+=" --channel-config name=t0component,type=pull,method=connect,address=tcp://127.0.0.1:11555"
#MONITOR+=" --transport shmem"
MONITOR+=" --transport zeromq"
#MONITOR+=" --transport nanomsg"
MONITOR+=" --channel-config name=parameters,type=req,method=connect,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0"
MONITOR+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11666"
MONITOR+=" --channel-config name=histo-conf,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11667"
MONITOR+=" --channel-config name=canvas-conf,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11668"
xterm -l -geometry 80x23+500+$_yOffset -hold -e @CMAKE_BINARY_DIR@/bin/MQ/monitor/$MONITOR &
done
PARAMETERSERVER="parmq-server"
PARAMETERSERVER+=" --id parmq-server"
PARAMETERSERVER+=" --severity info"
PARAMETERSERVER+=" --channel-name parameters"
PARAMETERSERVER+=" --channel-config name=parameters,type=rep,method=bind,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0"
PARAMETERSERVER+=" --first-input-name $_paramfile"
PARAMETERSERVER+=" --first-input-type ASCII"
PARAMETERSERVER+=" --libs-to-load=libCbmFlibMcbm2018" # doesn't work due to runtime problem
xterm -geometry 80x23+1000+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/parmq/$PARAMETERSERVER &
HISTSERVER="MqHistoServer"
HISTSERVER+=" --id server1"
HISTSERVER+=" --channel-config name=histogram-in,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11666"
HISTSERVER+=" --channel-config name=histo-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11667"
HISTSERVER+=" --channel-config name=canvas-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11668"
xterm -geometry 80x23+1500+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/histogramServer/$HISTSERVER &
#!/bin/bash
if [ -e @SIMPATH@/bin/fairmq-shmmonitor ]; then
@SIMPATH@/bin/fairmq-shmmonitor --cleanup
fi
if [ $# -ge 1 ]; then
_nbmoni=$1
((_pubfreqts = $_nbmoni*100 ))
_pubminsec=1.0
_pubmaxsec=10.0
if [ $# -ge 4 ]; then
_filename=""
_dirname=""
_hostname=$4
if [ $# -ge 5 ]; then
_pubfreqts=$5
if [ $# -ge 6 ]; then
_pubminsec=$6
if [ $# -ge 7 ]; then
_pubmaxsec=$7
fi
fi
fi
elif [ $# -ge 2 ]; then
_filename=$2
_hostname=""
if [ $# -eq 3 ]; then
_dirname=$3
else
_dirname=""
fi
else
echo 'Starting connection to local stream'
echo ' for other usages, please supply at least a filename.'
echo 'Possible usages are:'
echo 'startMQSamplerT0Monitor2020.sh'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> <full filename pattern list>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> <filename pattern> <folder_path>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>'
_filename=""
_dirname=""
_hostname="localhost"
fi
else
echo 'Starting connection to local stream with 1 monitor process'
echo ' for other usages, please supply at least a filename.'
echo 'Possible usages are:'
echo 'startMQSamplerT0Monitor2020.sh'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> <full filename pattern list>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> <filename pattern> <folder_path>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>'
_filename=""
_dirname=""
_hostname="localhost"
_nbmoni=1
_pubfreqts=100
_pubminsec=1.0
_pubmaxsec=10.0
fi
_paramfile=@VMCWORKDIR@/macro/beamtime/mcbm2021/mT0Par.par
LOGFILETAG=`hostname`
LOGFILETAG+="_"
LOGFILETAG+=`date +%Y_%m_%d_%H_%M_%S`
LOGFILETAG+=".log"
SAMPLER="MultiTsaSampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --max-timeslices 0"
SAMPLER+=" --severity info"
#SAMPLER+=" --flib-port 10"
if [ "$_hostname" != "" ]; then
SAMPLER+=" --flib-host $_hostname"
elif [ "$_filename" != "" ]; then
SAMPLER+=" --filename $_filename"
if [ "$_dirname" != "" ]; then
SAMPLER+=" --dirname $_dirname"
fi
fi
SAMPLER+=" --high-water-mark 1000"
SAMPLER+=" --send-ts-per-sysid 1"
SAMPLER+=" --channel-config name=t0component,type=push,method=bind,address=tcp://127.0.0.1:11555"
#SAMPLER+=" --transport shmem"
SAMPLER+=" --transport zeromq"
#SAMPLER+=" --transport nanomsg"
# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX
# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
SAMPLER_LOG="sampler1_$LOGFILETAG"
xterm -l -lf $SAMPLER_LOG -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/source/$SAMPLER &
_iMoni=0
while (( _iMoni < _nbmoni )); do
(( _yOffset=100*_iMoni ))
(( _iMoni += 1 ))
MONITOR="T0MonitorMcbm2018"
MONITOR+=" --id mon$_iMoni"
MONITOR+=" --severity info"
MONITOR+=" --PubFreqTs $_pubfreqts"
MONITOR+=" --PubTimeMin $_pubminsec"
MONITOR+=" --PubTimeMax $_pubmaxsec"
MONITOR+=" --PulsTotMin 185"
MONITOR+=" --PulsTotMax 190"
MONITOR+=" --SpillThr 200"
MONITOR+=" --ChanMap 4,5,6,7,0,1,2,3"
MONITOR+=" --channel-config name=t0component,type=pull,method=connect,address=tcp://127.0.0.1:11555"
#MONITOR+=" --transport shmem"
MONITOR+=" --transport zeromq"
#MONITOR+=" --transport nanomsg"
MONITOR+=" --channel-config name=parameters,type=req,method=connect,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0"
MONITOR+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11666"
MONITOR+=" --channel-config name=histo-conf,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11667"
MONITOR+=" --channel-config name=canvas-conf,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11668"
# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX
# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
MONITOR_LOG="monit0_$_iMoni"
MONITOR_LOG+="_$LOGFILETAG"
xterm -l -lf $MONITOR_LOG -geometry 80x23+500+$_yOffset -hold -e @CMAKE_BINARY_DIR@/bin/MQ/monitor/$MONITOR &
done
PARAMETERSERVER="parmq-server"
PARAMETERSERVER+=" --id parmq-server"
PARAMETERSERVER+=" --severity info"
PARAMETERSERVER+=" --channel-name parameters"
PARAMETERSERVER+=" --channel-config name=parameters,type=rep,method=bind,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0"
PARAMETERSERVER+=" --first-input-name $_paramfile"
PARAMETERSERVER+=" --first-input-type ASCII"
PARAMETERSERVER+=" --libs-to-load=libCbmFlibMcbm2018" # doesn't work due to runtime problem
# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX
# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
PARAMSRV_LOG="parmq_$LOGFILETAG"
xterm -l -lf $PARAMSRV_LOG -geometry 80x23+1000+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/parmq/$PARAMETERSERVER &
HISTSERVER="MqHistoServer"
HISTSERVER+=" --id server1"
HISTSERVER+=" --severity info"
HISTSERVER+=" --histport 8082"
HISTSERVER+=" --channel-config name=histogram-in,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11666"
HISTSERVER+=" --channel-config name=histo-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11667"
HISTSERVER+=" --channel-config name=canvas-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11668"
# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX
# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
HISTSRV_LOG="server1_$LOGFILETAG"
xterm -l -lf $HISTSRV_LOG -geometry 80x23+1500+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/histogramServer/$HISTSERVER &
#!/bin/bash
$SIMPATH/bin/fairmq-shmmonitor --cleanup
if [ -e @SIMPATH@/bin/fairmq-shmmonitor ]; then
@SIMPATH@/bin/fairmq-shmmonitor --cleanup
fi
if [ $# -ge 1 ]; then
_nbmoni=$1
......@@ -44,7 +47,7 @@ else
_nbmoni=1
fi
_paramfile=/scratch/cbmroot_macro/macro/beamtime/mcbm2019/mT0Par.par
_paramfile=@VMCWORKDIR@/macro/beamtime/mcbm2019/mT0Par.par
SAMPLER="MultiTsaSampler"
SAMPLER+=" --id sampler1"
......
#!/bin/bash
if [ -e @SIMPATH@/bin/fairmq-shmmonitor ]; then
@SIMPATH@/bin/fairmq-shmmonitor --cleanup
fi
if [ $# -ge 1 ]; then
_nbmoni=$1
if [ $# -ge 4 ]; then
_filename=""
_dirname=""
_hostname=$4
elif [ $# -ge 2 ]; then
_filename=$2
_hostname=""
if [ $# -eq 3 ]; then
_dirname=$3
else
_dirname=""
fi
else
echo 'Starting connection to local stream'
echo ' for other usages, please supply at least a filename.'
echo 'Possible usages are:'
echo 'startMQSamplerPsdMonitor2021.sh'
echo 'startMQSamplerPsdMonitor2021.sh <Nb Monitor processes>'
echo 'startMQSamplerPsdMonitor2021.sh <Nb Monitor processes> <full filename pattern list>'
echo 'startMQSamplerPsdMonitor2021.sh <Nb Monitor processes> <filename pattern> <folder_path>'
echo 'startMQSamplerPsdMonitor2021.sh <Nb Monitor processes> "" "" <hostname(s) list>'
_filename=""
_dirname=""
_hostname="localhost"
fi
else
echo 'Starting connection to local stream with 1 monitor process'
echo ' for other usages, please supply at least a filename.'
echo 'Possible usages are:'
echo 'startMQSamplerPsdMonitor2021.sh'
echo 'startMQSamplerPsdMonitor2021.sh <Nb Monitor processes>'
echo 'startMQSamplerPsdMonitor2021.sh <Nb Monitor processes> <full filename pattern list>'
echo 'startMQSamplerPsdMonitor2021.sh <Nb Monitor processes> <filename pattern> <folder_path>'
echo 'startMQSamplerPsdMonitor2021.sh <Nb Monitor processes> "" "" <hostname(s) list>'
_filename=""
_dirname=""
_hostname="localhost"
_nbmoni=1
fi
_paramfile=@VMCWORKDIR@/macro/beamtime/mcbm2021/mPsdPar.par
SAMPLER="MultiTsaSampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --max-timeslices 0"
SAMPLER+=" --severity info"
#SAMPLER+=" --flib-port 10"
if [ "$_hostname" != "" ]; then
SAMPLER+=" --flib-host $_hostname"
elif [ "$_filename" != "" ]; then
SAMPLER+=" --filename $_filename"
if [ "$_dirname" != "" ]; then
SAMPLER+=" --dirname $_dirname"
fi
fi
SAMPLER+=" --high-water-mark 1000"
SAMPLER+=" --send-ts-per-sysid 1"
SAMPLER+=" --channel-config name=psdcomponent,type=push,method=bind,address=tcp://127.0.0.1:11555"
SAMPLER+=" --transport shmem"
#SAMPLER+=" --transport zeromq"
#SAMPLER+=" --transport nanomsg"
xterm -l -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/source/$SAMPLER &
_iMoni=0
((_oubfreqts = $_nbmoni*100 ))
while (( _iMoni < _nbmoni )); do
(( _yOffset=100*_iMoni ))
(( _iMoni += 1 ))
MONITOR="PsdMonitorMcbm2018"
MONITOR+=" --id mon$_iMoni"
MONITOR+=" --severity info"
MONITOR+=" --HistEvoSz 600"
MONITOR+=" --HistChrgArgs 500 0 5000"
MONITOR+=" --HistAmplArgs 100 0 500"
MONITOR+=" --HistZlArgs 100 0 5000"
MONITOR+=" --PubFreqTs $_oubfreqts"
MONITOR+=" --channel-config name=psdcomponent,type=pull,method=connect,address=tcp://127.0.0.1:11555"
MONITOR+=" --transport shmem"
#MONITOR+=" --transport zeromq"
#MONITOR+=" --transport nanomsg"
MONITOR+=" --channel-config name=parameters,type=req,method=connect,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0"
MONITOR+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11666"
MONITOR+=" --channel-config name=histo-conf,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11667"
MONITOR+=" --channel-config name=canvas-conf,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11668"
xterm -l -geometry 80x23+500+$_yOffset -hold -e @CMAKE_BINARY_DIR@/bin/MQ/monitor/$MONITOR &
done
PARAMETERSERVER="parmq-server"
PARAMETERSERVER+=" --id parmq-server"
PARAMETERSERVER+=" --severity info"
PARAMETERSERVER+=" --channel-name parameters"
PARAMETERSERVER+=" --channel-config name=parameters,type=rep,method=bind,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0"
PARAMETERSERVER+=" --first-input-name $_paramfile"
PARAMETERSERVER+=" --first-input-type ASCII"
PARAMETERSERVER+=" --libs-to-load=libCbmFlibMcbm2018" # doesn't work due to runtime problem
xterm -geometry 80x23+1000+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/parmq/$PARAMETERSERVER &
HISTSERVER="MqHistoServer"
HISTSERVER+=" --id server1"
HISTSERVER+=" --channel-config name=histogram-in,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11666"
HISTSERVER+=" --channel-config name=histo-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11667"
HISTSERVER+=" --channel-config name=canvas-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11668"
xterm -geometry 800x230+1500+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/histogramServer/$HISTSERVER &
#!/bin/bash
$SIMPATH/bin/fairmq-shmmonitor --cleanup
if [ $# -ge 1 ]; then
_nbmoni=$1
((_pubfreqts = $_nbmoni*100 ))
_pubminsec=1.0
_pubmaxsec=10.0
if [ $# -ge 4 ]; then
_filename=""
_dirname=""
_hostname=$4
if [ $# -ge 5 ]; then
_pubfreqts=$5
if [ $# -ge 6 ]; then
_pubminsec=$6
if [ $# -ge 7 ]; then
_pubmaxsec=$7
fi
fi
fi
elif [ $# -ge 2 ]; then
_filename=$2
_hostname=""
if [ $# -eq 3 ]; then
_dirname=$3
else
_dirname=""
fi
else
echo 'Starting connection to local stream'
echo ' for other usages, please supply at least a filename.'
echo 'Possible usages are:'
echo 'startMQSamplerT0Monitor2020.sh'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> <full filename pattern list>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> <filename pattern> <folder_path>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>'
_filename=""
_dirname=""
_hostname="localhost"
fi
else
echo 'Starting connection to local stream with 1 monitor process'
echo ' for other usages, please supply at least a filename.'
echo 'Possible usages are:'
echo 'startMQSamplerT0Monitor2020.sh'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> <full filename pattern list>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> <filename pattern> <folder_path>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>'
echo 'startMQSamplerT0Monitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>'
_filename=""
_dirname=""
_hostname="localhost"
_nbmoni=1
_pubfreqts=100
_pubminsec=1.0
_pubmaxsec=10.0
fi
_paramfile=/scratch/cbmroot_macro/macro/beamtime/mcbm2020/mT0Par.par
SAMPLER="MultiTsaSampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --max-timeslices 0"
SAMPLER+=" --severity info"
#SAMPLER+=" --flib-port 10"
if [ "$_hostname" != "" ]; then
SAMPLER+=" --flib-host $_hostname"
elif [ "$_filename" != "" ]; then
SAMPLER+=" --filename $_filename"
if [ "$_dirname" != "" ]; then
SAMPLER+=" --dirname $_dirname"
fi
fi
SAMPLER+=" --high-water-mark 1000"
SAMPLER+=" --send-ts-per-sysid 1"
SAMPLER+=" --channel-config name=t0component,type=push,method=bind,address=tcp://127.0.0.1:11555"
#SAMPLER+=" --transport shmem"
SAMPLER+=" --transport zeromq"
#SAMPLER+=" --transport nanomsg"
xterm -l -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/source/$SAMPLER &
_iMoni=0
while (( _iMoni < _nbmoni )); do
(( _yOffset=100*_iMoni ))
(( _iMoni += 1 ))
MONITOR="T0MonitorMcbm2018"
MONITOR+=" --id mon$_iMoni"
MONITOR+=" --severity info"
MONITOR+=" --PubFreqTs $_pubfreqts"
MONITOR+=" --PubTimeMin $_pubminsec"
MONITOR+=" --PubTimeMax $_pubmaxsec"
MONITOR+=" --channel-config name=t0component,type=pull,method=connect,address=tcp://127.0.0.1:11555"
#MONITOR+=" --transport shmem"
MONITOR+=" --transport zeromq"
#MONITOR+=" --transport nanomsg"
MONITOR+=" --channel-config name=parameters,type=req,method=connect,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0"
MONITOR+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11666"
MONITOR+=" --channel-config name=histo-conf,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11667"
MONITOR+=" --channel-config name=canvas-conf,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11668"
xterm -l -geometry 80x23+500+$_yOffset -hold -e @CMAKE_BINARY_DIR@/bin/MQ/monitor/$MONITOR &
done
PARAMETERSERVER="parmq-server"
PARAMETERSERVER+=" --id parmq-server"
PARAMETERSERVER+=" --severity info"
PARAMETERSERVER+=" --channel-name parameters"
PARAMETERSERVER+=" --channel-config name=parameters,type=rep,method=bind,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0"
PARAMETERSERVER+=" --first-input-name $_paramfile"
PARAMETERSERVER+=" --first-input-type ASCII"
PARAMETERSERVER+=" --libs-to-load=libCbmFlibMcbm2018" # doesn't work due to runtime problem
xterm -geometry 80x23+1000+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/parmq/$PARAMETERSERVER &
HISTSERVER="MqHistoServer"
HISTSERVER+=" --id server1"
HISTSERVER+=" --channel-config name=histogram-in,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11666"
HISTSERVER+=" --channel-config name=histo-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11667"
HISTSERVER+=" --channel-config name=canvas-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11668"
xterm -geometry 80x23+1500+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/histogramServer/$HISTSERVER &
#!/bin/bash
$SIMPATH/bin/fairmq-shmmonitor --cleanup
if [ -e @SIMPATH@/bin/fairmq-shmmonitor ]; then
@SIMPATH@/bin/fairmq-shmmonitor --cleanup
fi
if [ $# -ge 1 ]; then
_nbmoni=$1
......@@ -68,7 +71,7 @@ else
_pubmaxsec=10.0
fi
_paramfile=/scratch/cbmroot_macro/macro/beamtime/mcbm2020/mTofPar.par
_paramfile=@VMCWORKDIR@/macro/beamtime/mcbm2020/mTofPar.par
SAMPLER="MultiTsaSampler"
SAMPLER+=" --id sampler1"
......
#!/bin/bash
if [ -e @SIMPATH@/bin/fairmq-shmmonitor ]; then
@SIMPATH@/bin/fairmq-shmmonitor --cleanup
fi
if [ $# -ge 1 ]; then
_nbmoni=$1
((_pubfreqts = $_nbmoni*100 ))
_pubminsec=1.0
_pubmaxsec=10.0
if [ $# -ge 4 ]; then
_filename=""
_dirname=""
_hostname=$4
if [ $# -ge 5 ]; then
_pubfreqts=$5
if [ $# -ge 6 ]; then
_pubminsec=$6
if [ $# -ge 7 ]; then
_pubmaxsec=$7
fi
fi
fi
elif [ $# -ge 2 ]; then
_filename=$2
_hostname=""
if [ $# -eq 3 ]; then
_dirname=$3
else
_dirname=""
fi
else
echo 'Starting connection to local stream'
echo ' for other usages, please supply at least a filename.'
echo 'Possible usages are:'
echo 'startMQSamplerTofMonitor2020.sh'
echo 'startMQSamplerTofMonitor2020.sh <Nb Monitor processes>'
echo 'startMQSamplerTofMonitor2020.sh <Nb Monitor processes> <full filename pattern list>'
echo 'startMQSamplerTofMonitor2020.sh <Nb Monitor processes> <filename pattern> <folder_path>'
echo 'startMQSamplerTofMonitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list>'
echo 'startMQSamplerTofMonitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS>'
echo 'startMQSamplerTofMonitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>'
echo 'startMQSamplerTofMonitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>'
_filename=""
_dirname=""
_hostname="localhost"
fi
else
echo 'Starting connection to local stream with 1 monitor process'
echo ' for other usages, please supply at least a filename.'
echo 'Possible usages are:'
echo 'startMQSamplerTofMonitor2020.sh'
echo 'startMQSamplerTofMonitor2020.sh <Nb Monitor processes>'
echo 'startMQSamplerTofMonitor2020.sh <Nb Monitor processes> <full filename pattern list>'
echo 'startMQSamplerTofMonitor2020.sh <Nb Monitor processes> <filename pattern> <folder_path>'
echo 'startMQSamplerTofMonitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list>'
echo 'startMQSamplerTofMonitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS>'
echo 'startMQSamplerTofMonitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s>'
echo 'startMQSamplerTofMonitor2020.sh <Nb Monitor processes> "" "" <hostname(s) list> <Hist publish freq. in TS> <Min Hist pub. in s> <Max Hist pub. in s>'
_filename=""
_dirname=""
_hostname="localhost"
_nbmoni=1
_pubfreqts=100
_pubminsec=1.0
_pubmaxsec=10.0
fi
_paramfile=@VMCWORKDIR@/macro/beamtime/mcbm2021/mTofPar.par
LOGFILETAG=`hostname`
LOGFILETAG+="_"
LOGFILETAG+=`date +%Y_%m_%d_%H_%M_%S`
LOGFILETAG+=".log"
SAMPLER="MultiTsaSampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --max-timeslices 0"
SAMPLER+=" --severity info"
#SAMPLER+=" --flib-port 10"
if [ "$_hostname" != "" ]; then
SAMPLER+=" --flib-host $_hostname"
elif [ "$_filename" != "" ]; then
SAMPLER+=" --filename $_filename"
if [ "$_dirname" != "" ]; then
SAMPLER+=" --dirname $_dirname"
fi
fi
SAMPLER+=" --high-water-mark 1000"
#SAMPLER+=" --no-split-ts 1"
SAMPLER+=" --send-ts-per-channel 1"
SAMPLER+=" --sysid-chan 0x60:tofcomponent"
SAMPLER+=" --sysid-chan 0x90:tofcomponent"
SAMPLER+=" --channel-config name=tofcomponent,type=push,method=bind,address=tcp://127.0.0.1:11555"
#SAMPLER+=" --transport shmem"
SAMPLER+=" --transport zeromq"
#SAMPLER+=" --transport nanomsg"
# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX
# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
SAMPLER_LOG="sampler1_$LOGFILETAG"
xterm -l -lf $SAMPLER_LOG -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/source/$SAMPLER &
_iMoni=0
((_oubfreqts = $_nbmoni*100 ))
while (( _iMoni < _nbmoni )); do
(( _yOffset=100*_iMoni ))
(( _iMoni += 1 ))
MONITOR="TofMonitorMcbm2018"
MONITOR+=" --id mon$_iMoni"
MONITOR+=" --severity info"
# MONITOR+=" --DebugMoni 1"
MONITOR+=" --PubFreqTs $_pubfreqts"
MONITOR+=" --PubTimeMin $_pubminsec"
MONITOR+=" --PubTimeMax $_pubmaxsec"
MONITOR+=" --channel-config name=tofcomponent,type=pull,method=connect,address=tcp://127.0.0.1:11555"
#MONITOR+=" --transport shmem"
MONITOR+=" --transport zeromq"
#MONITOR+=" --transport nanomsg"
MONITOR+=" --channel-config name=parameters,type=req,method=connect,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0"
MONITOR+=" --channel-config name=histogram-in,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11666"
MONITOR+=" --channel-config name=histo-conf,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11667"
MONITOR+=" --channel-config name=canvas-conf,type=pub,method=connect,transport=zeromq,address=tcp://127.0.0.1:11668"
# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX
# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
MONITOR_LOG="monit0_$_iMoni"
MONITOR_LOG+="_$LOGFILETAG"
xterm -l -lf $MONITOR_LOG -geometry 80x23+500+$_yOffset -hold -e @CMAKE_BINARY_DIR@/bin/MQ/monitor/$MONITOR &
done
PARAMETERSERVER="parmq-server"
PARAMETERSERVER+=" --id parmq-server"
PARAMETERSERVER+=" --severity info"
PARAMETERSERVER+=" --channel-name parameters"
PARAMETERSERVER+=" --channel-config name=parameters,type=rep,method=bind,transport=zeromq,address=tcp://127.0.0.1:11005,rateLogging=0"
PARAMETERSERVER+=" --first-input-name $_paramfile"
PARAMETERSERVER+=" --first-input-type ASCII"
PARAMETERSERVER+=" --libs-to-load=libCbmFlibMcbm2018" # doesn't work due to runtime problem
# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX
# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
PARAMSRV_LOG="parmq_$LOGFILETAG"
xterm -l -lf $PARAMSRV_LOG -geometry 80x23+1000+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/parmq/$PARAMETERSERVER &
HISTSERVER="MqHistoServer"
HISTSERVER+=" --id server1"
HISTSERVER+=" --severity info"
HISTSERVER+=" --histport 8081"
HISTSERVER+=" --channel-config name=histogram-in,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11666"
HISTSERVER+=" --channel-config name=histo-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11667"
HISTSERVER+=" --channel-config name=canvas-conf,type=sub,method=bind,transport=zeromq,address=tcp://127.0.0.1:11668"
# Replaces log filename Xterm.log.hostname.yyyy.mm.dd.hh.mm.ss.XXXXXX
# with ProcessName_hostname_yyyy_mm_dd_hh_mm_ss.log
HISTSRV_LOG="server1_$LOGFILETAG"
xterm -l -lf $HISTSRV_LOG -geometry 80x23+1500+0 -hold -e @CMAKE_BINARY_DIR@/bin/MQ/histogramServer/$HISTSERVER &
Set(INCLUDE_DIRECTORIES
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_SOURCE_DIR}/MQ/base
)
Set(SYSTEM_INCLUDE_DIRECTORIES
${SYSTEM_INCLUDE_DIRECTORIES}
${FAIRROOT_INCLUDE_DIR}
${FAIRMQ_INCLUDE_DIR}
${FAIRMQ_INCLUDE_DIR}/options
${Boost_INCLUDE_DIR}
${ZeroMQ_INCLUDE_DIR}
)
Include_Directories(${INCLUDE_DIRECTORIES})
Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})
Set(LINK_DIRECTORIES
${Boost_LIBRARY_DIRS}
${ROOT_LIBRARY_DIR}
${FAIRROOT_LIBRARY_DIR}
)
Link_Directories(${LINK_DIRECTORIES})
set(INCLUDE_DIRECTORIES
${CMAKE_CURRENT_SOURCE_DIR}
)
# Set the install path within the build directory
set(EXECUTABLE_OUTPUT_PATH "${EXECUTABLE_OUTPUT_PATH}/MQ/parmq")
# Set the install path within the installation directory
set(BIN_DESTINATION bin/MQ/parmq)
Set(BOOST_LIBS
${Boost_SYSTEM_LIBRARY}
${Boost_SERIALIZATION_LIBRARY}
${Boost_PROGRAM_OPTIONS_LIBRARY}
${Boost_LOG_LIBRARY}
RIO
)
If(UNIX AND NOT APPLE)
List(APPEND BOOST_LIBS pthread)
EndIf()
set(FAIR_LIBS
Base
ParBase
FairMQ
Gen
)
If(FAIRLOGGER_FOUND)
set(FAIR_LIBS
${FAIR_LIBS}
FairLogger
)
EndIf()
Set(EXE_NAME parmq-server)
Set(SRCS
ParameterMQServer.cxx
runParameterMQServer.cxx
set(EXE_NAME parmq-server)
set(SRCS ParameterMQServer.cxx runParameterMQServer.cxx
# CbmMQTestContFact.cxx
)
Set(DEPENDENCIES
${DEPENDENCIES}
${FAIR_LIBS}
${BOOST_LIBS}
Core
Net
Geom
CbmField
CbmTofBase
CbmStsBase
CbmStsSim
CbmSimBase
CbmFlibMcbm2018
)
GENERATE_EXECUTABLE()
)
set(PRIVATE_DEPENDENCIES
CbmMQBase
CbmSimSteer
FairRoot::ParBase
ROOT::Core
ROOT::Geom
ROOT::Net
)
set(INTERFACE_DEPENDENCIES
FairMQ::FairMQ
)
generate_cbm_executable()
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
......@@ -12,212 +12,232 @@
* @author M. Al-Turany, A. Rybalchenko
*/
#include "CbmMQDefs.h"
#include "TMessage.h"
#include "Rtypes.h"
#include "ParameterMQServer.h"
#include "FairRuntimeDb.h"
#include "FairParAsciiFileIo.h"
#include "FairParRootFileIo.h"
#include "FairParGenericSet.h"
#include "CbmMQDefs.h"
#include "CbmSetup.h"
#include "ParameterMQServer.h"
#include "FairMQLogger.h"
#include "FairMQProgOptions.h"
#include "FairParAsciiFileIo.h"
#include "FairParGenericSet.h"
#include "FairParRootFileIo.h"
#include "FairRuntimeDb.h"
#include "Rtypes.h"
#include "TGeoManager.h"
#include "TList.h"
#include "TMessage.h"
#include "TObjString.h"
#include "TSystem.h"
#include "TGeoManager.h"
using namespace std;
ParameterMQServer::ParameterMQServer() :
fRtdb(FairRuntimeDb::instance()),
fFirstInputName("first_input.root"),
fFirstInputType("ROOT"),
fSecondInputName(""),
fSecondInputType("ROOT"),
fOutputName(""),
fOutputType("ROOT"),
fChannelName("data")
ParameterMQServer::ParameterMQServer()
: fRtdb(FairRuntimeDb::instance())
{
}
void ParameterMQServer::InitTask()
{
string loadLibs = fConfig->GetValue<string>("libs-to-load");
if ( loadLibs.length() > 0 ) {
if (loadLibs.length() > 0) {
LOG(info) << "There are libraries to load.";
if (loadLibs.find(";") != std::string::npos) {
LOG(info) << "There are several libraries to load";
istringstream f(loadLibs);
string s;
while (getline(f, s, ';')) {
LOG(info)<< "Load library " << s;
gSystem->Load(s.c_str());
LOG(info) << "Load library " << s;
gSystem->Load(s.c_str());
}
} else {
LOG(info)<< "Load library " << loadLibs;
}
else {
LOG(info) << "Load library " << loadLibs;
gSystem->Load(loadLibs.c_str());
}
} else {
}
else {
LOG(info) << "There are no libraries to load.";
}
fFirstInputName = fConfig->GetValue<string>("first-input-name");
fFirstInputType = fConfig->GetValue<string>("first-input-type");
fSecondInputName = fConfig->GetValue<string>("second-input-name");
fSecondInputType = fConfig->GetValue<string>("second-input-type");
fOutputName = fConfig->GetValue<string>("output-name");
fOutputType = fConfig->GetValue<string>("output-type");
fChannelName = fConfig->GetValue<string>("channel-name");
if (fRtdb != 0)
{
// Set first input
if (fFirstInputType == "ROOT")
{
FairParRootFileIo* par1R = new FairParRootFileIo();
par1R->open(fFirstInputName.data(), "UPDATE");
fRtdb->setFirstInput(par1R);
}
else if (fFirstInputType == "ASCII")
{
FairParAsciiFileIo* par1A = new FairParAsciiFileIo();
if (fFirstInputName.find(";") != std::string::npos) {
LOG(info) << "File list found!";
TList *parFileList = new TList();
TObjString* parFile(NULL);
istringstream f(fFirstInputName);
string s;
while (getline(f, s, ';')) {
LOG(info)<< "File: " << s;
parFile = new TObjString(s.c_str());
parFileList->Add(parFile);
par1A->open(parFileList, "in");
}
} else {
LOG(info) << "Single input file found!";
par1A->open(fFirstInputName.data(), "in");
}
fRtdb->setFirstInput(par1A);
}
// Set second input
if (fSecondInputName != "")
{
if (fSecondInputType == "ROOT")
{
FairParRootFileIo* par2R = new FairParRootFileIo();
par2R->open(fSecondInputName.data(), "UPDATE");
fRtdb->setSecondInput(par2R);
}
else if (fSecondInputType == "ASCII")
{
FairParAsciiFileIo* par2A = new FairParAsciiFileIo();
if (fSecondInputName.find(";") != std::string::npos) {
LOG(info) << "File list found!";
TList *parFileList = new TList();
TObjString* parFile(NULL);
istringstream f(fSecondInputName);
string s;
while (getline(f, s, ';')) {
LOG(info)<< "File: " << s;
parFile = new TObjString(s.c_str());
parFileList->Add(parFile);
par2A->open(parFileList, "in");
}
} else {
LOG(info) << "Single input file found!";
par2A->open(fFirstInputName.data(), "in");
}
fRtdb->setSecondInput(par2A);
}
fFirstInputName = fConfig->GetValue<string>("first-input-name");
fFirstInputType = fConfig->GetValue<string>("first-input-type");
fSecondInputName = fConfig->GetValue<string>("second-input-name");
fSecondInputType = fConfig->GetValue<string>("second-input-type");
fOutputName = fConfig->GetValue<string>("output-name");
fOutputType = fConfig->GetValue<string>("output-type");
fChannelName = fConfig->GetValue<string>("channel-name");
fsSetupName = fConfig->GetValue<std::string>("setup");
LOG(info) << "Using setup: " << fsSetupName;
if (fRtdb != 0) {
// Set first input
if (fFirstInputType == "ROOT") {
FairParRootFileIo* par1R = new FairParRootFileIo();
par1R->open(fFirstInputName.data(), "UPDATE");
fRtdb->setFirstInput(par1R);
}
else if (fFirstInputType == "ASCII") {
FairParAsciiFileIo* par1A = new FairParAsciiFileIo();
if (fFirstInputName.find(";") != std::string::npos) {
LOG(info) << "File list found!";
TList* parFileList = new TList();
TObjString* parFile(NULL);
istringstream f(fFirstInputName);
string s;
while (getline(f, s, ';')) {
LOG(info) << "File: " << s;
parFile = new TObjString(s.c_str());
parFileList->Add(parFile);
par1A->open(parFileList, "in");
}
}
else {
LOG(info) << "Single input file found!";
par1A->open(fFirstInputName.data(), "in");
}
fRtdb->setFirstInput(par1A);
}
// Set output
if (fOutputName != "")
{
if (fOutputType == "ROOT")
{
FairParRootFileIo* parOut = new FairParRootFileIo(kTRUE);
parOut->open(fOutputName.data());
fRtdb->setOutput(parOut);
}
fRtdb->saveOutput();
// Set second input
if (fSecondInputName != "") {
if (fSecondInputType == "ROOT") {
FairParRootFileIo* par2R = new FairParRootFileIo();
par2R->open(fSecondInputName.data(), "UPDATE");
fRtdb->setSecondInput(par2R);
}
else if (fSecondInputType == "ASCII") {
FairParAsciiFileIo* par2A = new FairParAsciiFileIo();
if (fSecondInputName.find(";") != std::string::npos) {
LOG(info) << "File list found!";
TList* parFileList = new TList();
TObjString* parFile(NULL);
istringstream f(fSecondInputName);
string s;
while (getline(f, s, ';')) {
LOG(info) << "File: " << s;
parFile = new TObjString(s.c_str());
parFileList->Add(parFile);
par2A->open(parFileList, "in");
}
}
else {
LOG(info) << "Single input file found!";
par2A->open(fFirstInputName.data(), "in");
}
fRtdb->setSecondInput(par2A);
}
}
// Set output
if (fOutputName != "") {
if (fOutputType == "ROOT") {
FairParRootFileIo* parOut = new FairParRootFileIo(kTRUE);
parOut->open(fOutputName.data());
fRtdb->setOutput(parOut);
}
fRtdb->saveOutput();
}
fRtdb->print();
}
fRtdb->print();
// ----- CbmSetup -----------------------------------------------------
if ("" != fsSetupName) {
fSetup = CbmSetup::Instance();
fSetup->LoadSetup(fsSetupName.data());
}
// ------------------------------------------------------------------------
}
void ParameterMQServer::Run()
{
string parameterName = "";
FairParGenericSet* par = nullptr;
while (cbm::mq::CheckCurrentState(this, cbm::mq::State::Running))
{
FairMQMessagePtr req(NewMessage());
if (Receive(req, fChannelName, 0) > 0)
{
string reqStr(static_cast<char*>(req->GetData()), req->GetSize());
LOG(info) << "Received parameter request from client: \"" << reqStr << "\"";
size_t pos = reqStr.rfind(",");
string newParameterName = reqStr.substr(0, pos);
int runId = stoi(reqStr.substr(pos + 1));
LOG(info) << "Parameter name: " << newParameterName;
LOG(info) << "Run ID: " << runId;
LOG(info) << "Retrieving parameter...";
// Check if the parameter name has changed to avoid getting same container repeatedly
if (newParameterName != parameterName)
{
parameterName = newParameterName;
par = static_cast<FairParGenericSet*>(fRtdb->getContainer(parameterName.c_str()));
}
LOG(info) << "Retrieving parameter...Done";
if (-1 != runId) {
fRtdb->initContainers(runId);
}
LOG(info) << "Sending following parameter to the client:";
if (par)
{
par->print();
TMessage* tmsg = new TMessage(kMESS_OBJECT);
tmsg->WriteObject(par);
FairMQMessagePtr rep(NewMessage(tmsg->Buffer(),
tmsg->BufferSize(),
[](void* /*data*/, void* object){ delete static_cast<TMessage*>(object); },
tmsg));
if (Send(rep, fChannelName, 0) < 0)
{
LOG(error) << "failed sending reply";
break;
}
}
else
{
LOG(error) << "Parameter uninitialized!";
// Send an empty message back to keep the REQ/REP cycle
FairMQMessagePtr rep(NewMessage());
if (Send(rep, fChannelName, 0) < 0)
{
LOG(error) << "failed sending reply";
break;
}
}
string parameterName = "";
FairParGenericSet* par = nullptr;
while (cbm::mq::CheckCurrentState(this, cbm::mq::State::Running)) {
FairMQMessagePtr req(NewMessage());
if (Receive(req, fChannelName, 0) > 0) {
string reqStr(static_cast<char*>(req->GetData()), req->GetSize());
LOG(info) << "Received parameter request from client: \"" << reqStr << "\"";
if ("setup" == reqStr) {
// TODO: support for multiple setups on Par Server? with request containing setup name?
if ("" != fsSetupName && fSetup) {
/// Prepare serialized versions of the CbmSetup
CbmSetupStorable exchangableSetup(fSetup);
TMessage* tmsg = new TMessage(kMESS_OBJECT);
tmsg->WriteObject(&exchangableSetup);
FairMQMessagePtr rep(NewMessage(
tmsg->Buffer(), tmsg->BufferSize(),
[](void* /*data*/, void* object) { delete static_cast<TMessage*>(object); }, tmsg));
if (Send(rep, fChannelName, 0) < 0) {
LOG(error) << "failed sending reply to Setup request";
break;
}
}
else {
LOG(error) << "CbmSetup uninitialized!";
// Send an empty message back to keep the REQ/REP cycle
FairMQMessagePtr rep(NewMessage());
if (Send(rep, fChannelName, 0) < 0) {
LOG(error) << "failed sending reply to Setup request";
break;
}
}
}
else {
size_t pos = reqStr.rfind(",");
string newParameterName = reqStr.substr(0, pos);
int runId = stoi(reqStr.substr(pos + 1));
LOG(info) << "Parameter name: " << newParameterName;
LOG(info) << "Run ID: " << runId;
LOG(info) << "Retrieving parameter...";
// Check if the parameter name has changed to avoid getting same container repeatedly
if (newParameterName != parameterName) {
parameterName = newParameterName;
par = static_cast<FairParGenericSet*>(fRtdb->getContainer(parameterName.c_str()));
}
LOG(info) << "Retrieving parameter...Done";
if (-1 != runId) { fRtdb->initContainers(runId); }
LOG(info) << "Sending following parameter to the client:";
if (par) {
par->print();
TMessage* tmsg = new TMessage(kMESS_OBJECT);
tmsg->WriteObject(par);
FairMQMessagePtr rep(NewMessage(
tmsg->Buffer(), tmsg->BufferSize(),
[](void* /*data*/, void* object) { delete static_cast<TMessage*>(object); }, tmsg));
if (Send(rep, fChannelName, 0) < 0) {
LOG(error) << "failed sending reply";
break;
}
}
else {
LOG(error) << "Parameter uninitialized!";
// Send an empty message back to keep the REQ/REP cycle
FairMQMessagePtr rep(NewMessage());
if (Send(rep, fChannelName, 0) < 0) {
LOG(error) << "failed sending reply";
break;
}
}
}
}
}
}
ParameterMQServer::~ParameterMQServer()
......
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
......@@ -15,52 +15,55 @@
#ifndef PARAMETERMQSERVER_H_
#define PARAMETERMQSERVER_H_
#include <string>
#include "FairMQDevice.h"
#include <string>
class FairRuntimeDb;
class CbmSetup;
class ParameterMQServer : public FairMQDevice {
public:
ParameterMQServer();
class ParameterMQServer : public FairMQDevice
{
public:
ParameterMQServer();
ParameterMQServer(const ParameterMQServer&) = delete;
ParameterMQServer operator=(const ParameterMQServer&) = delete;
ParameterMQServer(const ParameterMQServer&) = delete;
ParameterMQServer operator=(const ParameterMQServer&) = delete;
virtual ~ParameterMQServer();
virtual ~ParameterMQServer();
virtual void Run();
virtual void InitTask();
virtual void Run();
virtual void InitTask();
void SetFirstInputName(const std::string& firstInputName) { fFirstInputName = firstInputName; }
std::string GetFirstInputName() { return fFirstInputName; }
void SetFirstInputType(const std::string& firstInputType) { fFirstInputType = firstInputType; }
std::string GetFirstInputType() { return fFirstInputType; }
void SetSecondInputName(const std::string& secondInputName) { fSecondInputName = secondInputName; }
std::string GetSecondInputName() { return fSecondInputName; }
void SetSecondInputType(const std::string& secondInputType) { fSecondInputType = secondInputType; }
std::string GetSecondInputType() { return fSecondInputType; }
void SetOutputName(const std::string& outputName) { fOutputName = outputName; }
std::string GetOutputName() { return fOutputName; }
void SetOutputType(const std::string& outputType) { fOutputType = outputType; }
std::string GetOutputType() { return fOutputType; }
void SetFirstInputName(const std::string& firstInputName) { fFirstInputName = firstInputName; }
std::string GetFirstInputName() { return fFirstInputName; }
void SetFirstInputType(const std::string& firstInputType) { fFirstInputType = firstInputType; }
std::string GetFirstInputType() { return fFirstInputType; }
void SetSecondInputName(const std::string& secondInputName) { fSecondInputName = secondInputName; }
std::string GetSecondInputName() { return fSecondInputName; }
void SetSecondInputType(const std::string& secondInputType) { fSecondInputType = secondInputType; }
std::string GetSecondInputType() { return fSecondInputType; }
void SetOutputName(const std::string& outputName) { fOutputName = outputName; }
std::string GetOutputName() { return fOutputName; }
void SetOutputType(const std::string& outputType) { fOutputType = outputType; }
std::string GetOutputType() { return fOutputType; }
void SetChannelName(const std::string& channelName) { fChannelName = channelName; }
std::string GetChannelName() { return fChannelName; }
void SetChannelName(const std::string& channelName) { fChannelName = channelName; }
std::string GetChannelName() { return fChannelName; }
private:
FairRuntimeDb* fRtdb = nullptr;
CbmSetup* fSetup = nullptr;
private:
FairRuntimeDb* fRtdb;
std::string fFirstInputName = "first_input.root";
std::string fFirstInputType = "ROOT";
std::string fSecondInputName = "";
std::string fSecondInputType = "ROOT";
std::string fOutputName = "";
std::string fOutputType = "ROOT";
std::string fFirstInputName;
std::string fFirstInputType;
std::string fSecondInputName;
std::string fSecondInputType;
std::string fOutputName;
std::string fOutputType;
std::string fChannelName = "data";
std::string fChannelName;
std::string fsSetupName = "";
};
#endif /* PARAMETERMQSERVER_H_ */