/**
 * CbmDeviceMonitorTof.cxx
 *
 * @since 2020-04-15
 * @author P.-A. Loizeau
 */
#include "CbmDeviceMonitorTof.h"

#include "CbmMQDefs.h"
#include "CbmMcbm2018MonitorAlgoTof.h"
#include "CbmFlesCanvasTools.h"
#include "StorableTimeslice.hpp"

#include "FairMQLogger.h"
#include "FairMQProgOptions.h" // device->fConfig
#include "FairParGenericSet.h"
#include "RootSerializer.h"
#include "BoostSerializer.h"

#include "TNamed.h"
#include "TList.h"
#include "TCanvas.h"
#include "TFile.h"
#include "TH1.h"

#include <boost/serialization/utility.hpp>
#include <boost/archive/binary_iarchive.hpp>

#include <algorithm>
#include <array>
#include <chrono>
#include <iomanip>
#include <sstream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
/// Exception type thrown inside InitTask() when the device configuration is not
/// usable; it is caught by the function-try-block of InitTask(), which then
/// requests the error transition of the device.
struct InitTaskError : std::runtime_error { using std::runtime_error::runtime_error; };
// NOTE(review): file-scope using-directive; code in this file may rely on
// unqualified std names, so check all usages before removing it.
using namespace std;
/// Global "reset histograms" request flag. DoUnpack() checks it for every
/// timeslice, resets the monitor histograms when it is set and clears it again.
/// Nothing in this file sets it to kTRUE - presumably it is raised from outside
/// this translation unit (TODO confirm).
Bool_t bMcbm2018MonitorTaskTofResetHistos = kFALSE;
/// Default constructor: allocates the TOF monitor algorithm object used by all
/// other methods of the device. All remaining members keep the defaults from
/// their declaration.
CbmDeviceMonitorTof::CbmDeviceMonitorTof()
  : fMonitorAlgo( new CbmMcbm2018MonitorAlgoTof() )
{
  // Nothing else to do: the configuration is read later in InitTask()
}
void CbmDeviceMonitorTof::InitTask()
try
{
/// Read options from executable
LOG(info) << "Init options for CbmMqStarHistoServer.";
fbIgnoreOverlapMs = fConfig->GetValue< bool >( "IgnOverMs" );
fbDebugMonitorMode = fConfig->GetValue< bool >( "DebugMoni" );
fbIgnoreCriticalErrors = fConfig->GetValue< bool >( "IgnCritErr" );
fuHistoryHistoSize = fConfig->GetValue< uint32_t >( "HistEvoSz" );
fuMinTotPulser = fConfig->GetValue< uint32_t >( "PulsTotMin" );
fuMaxTotPulser = fConfig->GetValue< uint32_t >( "PulsTotMax" );
fiGdpbIndex = fConfig->GetValue< int32_t >( "GdpbIdx" );
fuPublishFreqTs = fConfig->GetValue< uint32_t >( "PubFreqTs" );
fdMinPublishTime = fConfig->GetValue< double_t >( "PubTimeMin" );
fdMaxPublishTime = fConfig->GetValue< double_t >( "PubTimeMax" );
fsChannelNameDataInput = fConfig->GetValue< std::string >( "TsNameIn" );
fsChannelNameHistosInput = fConfig->GetValue< std::string >( "ChNameIn" );
fsChannelNameHistosConfig = fConfig->GetValue< std::string >( "ChNameHistCfg" );
fsChannelNameCanvasConfig = fConfig->GetValue< std::string >( "ChNameCanvCfg" );
fsAllowedChannels[ 0 ] = fsChannelNameDataInput;
LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;

Pierre-Alain Loizeau
committed
/// Set the Monitor Algo in Absolute time scale
fMonitorAlgo->UseAbsoluteTime();
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
// Get the information about created channels from the device
// Check if the defined channels from the topology (by name)
// are in the list of channels which are possible/allowed
// for the device
// The idea is to check at initilization if the devices are
// properly connected. For the time beeing this is done with a
// nameing convention. It is not avoided that someone sends other
// data on this channel.
//logger::SetLogLevel("INFO");
int noChannel = fChannels.size();
LOG(info) << "Number of defined channels: " << noChannel;
for( auto const &entry : fChannels )
{
LOG(info) << "Channel name: " << entry.first;
if( std::string::npos != entry.first.find( fsChannelNameDataInput ) )
{
if( !IsChannelNameAllowed( entry.first ) )
throw InitTaskError( "Channel name does not match." );
OnData( entry.first, &CbmDeviceMonitorTof::HandleData );
} // if( std::string::npos != entry.first.find( fsChannelNameDataInput ) )
} // for( auto const &entry : fChannels )
InitContainers();
} catch (InitTaskError& e) {
LOG(error) << e.what();
// Wrapper defined in CbmMQDefs.h to support different FairMQ versions
cbm::mq::ChangeState( this, cbm::mq::Transition::ErrorFound );
}
/**
 * Check a channel name against the list of allowed channel names.
 *
 * A channel is accepted as soon as one entry of fsAllowedChannels is contained
 * in its name (substring match, entries tested in list order).
 *
 * @param channelName name of the channel as defined in the device topology
 * @return true if an allowed name is contained in channelName, false otherwise
 *         (in which case an error asking to stop the device is logged)
 */
bool CbmDeviceMonitorTof::IsChannelNameAllowed(std::string channelName)
{
  // Position of the entry being tested; tracked while iterating so that no
  // second search through the list is needed to report it.
  std::size_t idx = 0;
  for( auto const &entry : fsAllowedChannels )
  {
    if( std::string::npos != channelName.find( entry ) )
    {
      LOG(info) << "Found " << entry << " in " << channelName;
      LOG(info) << "Channel name " << channelName
                << " found in list of allowed channel names at position " << idx;
      return true;
    } // if( std::string::npos != channelName.find( entry ) )
    ++idx;
  } // for( auto const &entry : fsAllowedChannels )

  LOG(info) << "Channel name " << channelName
            << " not found in list of allowed channel names.";
  LOG(error) << "Stop device.";
  return false;
}
/**
 * Fetch the parameter containers, configure the monitor algorithm and publish
 * the histogram and canvas configurations.
 *
 * Steps:
 *  1. For every container in the algorithm parameter list, request the filled
 *     version on the "parameters" channel and replace the placeholder by the
 *     received object.
 *  2. Forward the user options to the algorithm, initialise its containers and
 *     create its histograms.
 *  3. Send one < histogram name, folder > pair per histogram on
 *     fsChannelNameHistosConfig and one < canvas name, config string > pair per
 *     canvas on fsChannelNameCanvasConfig.
 *
 * @return kTRUE if all steps succeeded; kFALSE if a parameter container could
 *         not be obtained, if the algorithm initialisation or histogram
 *         creation failed, or if a configuration message could not be sent.
 */
Bool_t CbmDeviceMonitorTof::InitContainers()
{
  LOG(info) << "Init parameter containers for CbmDeviceMonitorTof.";

  fParCList = fMonitorAlgo->GetParList();

  for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
  {
    /// Take the placeholder out of the list: only its name is needed for the request
    FairParGenericSet* tempObj = static_cast< FairParGenericSet* >( fParCList->At( iparC ) );
    fParCList->Remove( tempObj );
    const std::string paramName{ tempObj->GetName() };
    delete tempObj;

    // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
    // Should only be used for small data because of the cost of an additional copy
    // TODO: the proper run ID has to be used here instead of the hard-coded 111
    const std::string message = paramName + ",111";
    LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;

    FairMQMessagePtr req( NewSimpleMessage(message) );
    FairMQMessagePtr rep( NewMessage() );

    /// Without the parameters the monitor cannot be set up: give up on the first
    /// failure instead of inserting a null pointer in the list and carrying on
    if( Send( req, "parameters" ) <= 0 )
    {
      LOG( error ) << "Failed to send the request for parameter container " << paramName;
      return kFALSE;
    } // if( Send( req, "parameters" ) <= 0 )
    if( Receive( rep, "parameters" ) < 0 )
    {
      LOG( error ) << "Failed to receive the reply for parameter container " << paramName;
      return kFALSE;
    } // if( Receive( rep, "parameters" ) < 0 )
    if( 0 == rep->GetSize() )
    {
      LOG( error ) << "Received empty reply. Parameter not available";
      return kFALSE;
    } // if( 0 == rep->GetSize() )

    CbmMQTMessage tmsg( rep->GetData(), rep->GetSize() );
    FairParGenericSet* newObj = static_cast< FairParGenericSet* >( tmsg.ReadObject( tmsg.GetClass() ) );
    if( nullptr == newObj )
    {
      LOG( error ) << "Could not read parameter container " << paramName << " from the reply";
      return kFALSE;
    } // if( nullptr == newObj )

    LOG( info ) << "Received unpack parameter from the server:";
    newObj->print();

    /// Put the filled container at the position of the placeholder
    fParCList->AddAt( newObj, iparC );
  } // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )

  /// Need to add accessors for all options
  fMonitorAlgo->SetIgnoreOverlapMs( fbIgnoreOverlapMs );
  fMonitorAlgo->SetDebugMonitorMode( fbDebugMonitorMode );
  fMonitorAlgo->SetIgnoreCriticalErrors( fbIgnoreCriticalErrors );
  fMonitorAlgo->SetHistoryHistoSize( fuHistoryHistoSize );
  fMonitorAlgo->SetPulserTotLimits( fuMinTotPulser, fuMaxTotPulser );
  fMonitorAlgo->SetGdpbIndex( fiGdpbIndex );

  Bool_t initOK = fMonitorAlgo->InitContainers();
  // Bool_t initOK = fMonitorAlgo->ReInitContainers();

  /// Histos creation and obtain pointer on them
  /// Trigger histo creation on all associated algos
  initOK &= fMonitorAlgo->CreateHistograms();

  /// Obtain vector of pointers on each histo from the algo (+ optionally desired folder)
  std::vector< std::pair< TNamed *, std::string > > vHistos = fMonitorAlgo->GetHistoVector();
  /// Obtain vector of pointers on each canvas from the algo (+ optionally desired folder)
  std::vector< std::pair< TCanvas *, std::string > > vCanvases = fMonitorAlgo->GetCanvasVector();

  /// Add pointers to each histo in the histo array
  /// Create histo config vector
  /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Histo name, Folder >
  ///      and send it through a separate channel using the BoostSerializer
  for( const auto& histoAndFolder : vHistos )
  {
    fArrayHisto.Add( histoAndFolder.first );

    std::pair< std::string, std::string > psHistoConfig( histoAndFolder.first->GetName(),
                                                         histoAndFolder.second );
    fvpsHistosFolder.push_back( psHistoConfig );

    /// Serialize the histo config pair into a single MQ message
    FairMQMessagePtr messageHist( NewMessage() );
    Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageHist, psHistoConfig );

    /// Send message to the common histogram config messages queue
    if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
    {
      LOG(error) << "Problem sending histo config";
      return kFALSE;
    } // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )

    LOG(info) << "Config of hist " << psHistoConfig.first.data()
              << " in folder " << psHistoConfig.second.data() ;
  } // for( const auto& histoAndFolder : vHistos )

  /// Create canvas config vector
  /// ===> Use an std::vector< std::pair< std::string, std::string > > with < Canvas name, config >
  ///      and send it through a separate channel using the BoostSerializer
  for( const auto& canvasAndFolder : vCanvases )
  {
    std::string sCanvName = canvasAndFolder.first->GetName();
    std::string sCanvConf = GenerateCanvasConfigString( canvasAndFolder.first );

    std::pair< std::string, std::string > psCanvConfig( sCanvName, sCanvConf );
    fvpsCanvasConfig.push_back( psCanvConfig );

    /// Serialize the canvas config pair into a single MQ message
    FairMQMessagePtr messageCan( NewMessage() );
    Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageCan, psCanvConfig );

    /// Send message to the common canvas config messages queue
    if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
    {
      LOG(error) << "Problem sending canvas config";
      return kFALSE;
    } // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )

    LOG(info) << "Config string of Canvas " << psCanvConfig.first.data()
              << " is " << psCanvConfig.second.data() ;
  } // for( const auto& canvasAndFolder : vCanvases )

  return initOK;
}
/**
 * Data handler registered with OnData() in InitTask(): called for every message
 * arriving on the data input channel.
 *
 * The payload is read as a boost binary archive holding a
 * fles::StorableTimeslice, which is passed to DoUnpack(). Afterwards the
 * histograms are published if either
 *  - more than fdMaxPublishTime seconds elapsed since the last publication, or
 *  - the message counter is a multiple of fuPublishFreqTs and more than
 *    fdMinPublishTime seconds elapsed since the last publication.
 * A publication frequency of 0 disables the counter based trigger (only the
 * maximal interval is then used) instead of causing a modulo by zero.
 *
 * @param msg received message holding the serialized timeslice
 * @return always true
 */
bool CbmDeviceMonitorTof::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
  fulNumMessages++;
  LOG(debug) << "Received message number "<< fulNumMessages
             << " with size " << msg->GetSize();

  if( 0 == fulNumMessages % 10000 )
    LOG(info) << "Received " << fulNumMessages << " messages";

  /// Wrap the raw payload in a stream so that boost can deserialize it
  std::string msgStr( static_cast<char*>( msg->GetData() ), msg->GetSize() );
  std::istringstream iss( msgStr );
  boost::archive::binary_iarchive inputArchive( iss );

  /// Create an empty TS and fill it with the incoming message
  fles::StorableTimeslice component{ 0 };
  inputArchive >> component;

  /// Process the Timeslice
  DoUnpack(component, 0);

  /// Send histograms every fuPublishFreqTs time slices.
  /// Use also runtime checker to trigger sending after M s if
  /// processing too slow or delay sending if processing too fast
  const std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
  const std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;

  const bool bMaxIntervalExceeded = ( fdMaxPublishTime < elapsedSeconds.count() );
  /// Guard the modulo: the frequency is a user option and may be 0
  const bool bFrequencyReached = ( 0 != fuPublishFreqTs )
                              && ( 0 == fulNumMessages % fuPublishFreqTs )
                              && ( fdMinPublishTime < elapsedSeconds.count() );

  if( bMaxIntervalExceeded || bFrequencyReached )
  {
    SendHistograms();
    fLastPublishTime = std::chrono::system_clock::now();
  } // if( bMaxIntervalExceeded || bFrequencyReached )

  return true;
}
/**
 * Publish the current content of all monitored histograms.
 *
 * The histogram array is serialized with the RootSerializer into one message
 * and sent on fsChannelNameHistosInput. After a successful transfer the
 * histograms of the algorithm are reset with ResetHistograms( kFALSE ).
 *
 * @return true if the message was sent, false otherwise (the histograms are
 *         then left untouched)
 */
bool CbmDeviceMonitorTof::SendHistograms()
{
  /// Pack the full array of histos in one MQ message
  FairMQMessagePtr message( NewMessage() );
  Serialize<RootSerializer>( *message, &fArrayHisto );

  // Debugging aid, kept disabled: check that the deserialization works
  //   TObject* tempObject = nullptr;
  //   Deserialize<RootDeserializer>(*message, tempObject);
  //   if (TString(tempObject->ClassName()).EqualTo("TObjArray")) {
  //     TObjArray* arrayHisto = static_cast<TObjArray*>(tempObject);
  //     LOG(info) << "Array contains " << arrayHisto->GetEntriesFast()
  //               << " entries";
  //     for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++) {
  //       TObject* obj = arrayHisto->At(i);
  //       LOG(info) << obj->GetName();
  //       TH1* histogram = static_cast<TH1*>(obj);
  //       LOG(info) << histogram->GetNbinsX();
  //     }
  //   }

  /// Push the message to the common histogram messages queue
  const bool bSent = ( 0 <= Send( message, fsChannelNameHistosInput ) );
  if( bSent )
  {
    /// Histograms were transferred: reset them (but do not reset the time)
    fMonitorAlgo->ResetHistograms( kFALSE );
    return true;
  } // if( bSent )

  LOG(error) << "Problem sending data";
  return false;
}
/// Destructor: performs no explicit clean-up.
// NOTE(review): fMonitorAlgo is allocated with `new` in the constructor and is
// not released here; if the member is a raw pointer this leaks the algorithm
// object - confirm against the member declaration in the header.
CbmDeviceMonitorTof::~CbmDeviceMonitorTof() = default;
/**
 * Feed one timeslice to the monitor algorithm.
 *
 * On the first call, all components of the timeslice whose first microslice
 * descriptor carries the TOF or the T0 system ID are registered in the
 * algorithm. A pending reset request (bMcbm2018MonitorTaskTofResetHistos) is
 * served before the timeslice is processed.
 *
 * @param ts timeslice to process
 * @return always kTRUE, also when the algorithm fails to process the timeslice
 *         (the failure is only reported in the log)
 */
Bool_t CbmDeviceMonitorTof::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
{
  ++fulTsCounter;

  /// One-time registration of the components to be monitored
  if( !fbComponentsAddedToList )
  {
    const uint32_t uNbComponents = ts.num_components();
    for( uint32_t uCompIdx = 0; uCompIdx < uNbComponents; ++uCompIdx )
    {
      /// The system ID is taken from the first microslice of the component
      const auto sysId = ts.descriptor( uCompIdx, 0 ).sys_id;

      if( kusSysIdTof == sysId )
        fMonitorAlgo->AddMsComponentToList( uCompIdx, kusSysIdTof );
      else if( kusSysIdT0 == sysId )
        fMonitorAlgo->AddMsComponentToList( uCompIdx, kusSysIdT0 );
    } // for( uint32_t uCompIdx = 0; uCompIdx < uNbComponents; ++uCompIdx )

    fbComponentsAddedToList = kTRUE;
  } // if( !fbComponentsAddedToList )

  /// Serve a pending histogram reset request and acknowledge it
  if( /* fbMonitorMode && */ bMcbm2018MonitorTaskTofResetHistos )
  {
    LOG(info) << "Reset TOF Monitor histos ";
    fMonitorAlgo->ResetHistograms();
    bMcbm2018MonitorTaskTofResetHistos = kFALSE;
  } // if( fbMonitorMode && bMcbm2018MonitorTaskTofResetHistos )

  const Bool_t bProcessed = fMonitorAlgo->ProcessTs( ts );
  if( !bProcessed )
  {
    LOG(error) << "Failed processing TS " << ts.index()
               << " in unpacker algorithm class";
    // NOTE(review): kTRUE is returned although the processing failed - confirm this is intended
    return kTRUE;
  } // if( !bProcessed )

  /// Clear the digis vector in case it was filled
  fMonitorAlgo->ClearVector();

  if( 0 == fulTsCounter % 10000 )
    LOG(info) << "Processed " << fulTsCounter << " time slices";

  return kTRUE;
}
/// Finalization hook of the device: currently performs no action.
void CbmDeviceMonitorTof::Finish()
{
}