Commit 141d412c authored by Pierre-Alain Loizeau's avatar Pierre-Alain Loizeau
Browse files

[MQ] In digi evt sink, add an option to bypass the buffer to write TS in order with no holes

parent 88f06195
......@@ -473,6 +473,8 @@ bool CbmMqHistoServer::SaveHistograms()
// open separate histo file in recreate mode
histoFile = new TFile(fsHistoFileName.data(), "RECREATE");
LOG(info) << "Save Histos in file " << fsHistoFileName.data();
if (nullptr == histoFile) return false;
/// Register the histos in the HTTP server
......
......@@ -69,6 +69,8 @@ try {
fsChannelNameDataInput = fConfig->GetValue<std::string>("EvtNameIn");
fsAllowedChannels[0] = fsChannelNameDataInput;
fbBypassConsecutiveTs = fConfig->GetValue<bool>("BypassConsecutiveTs");
fbFillHistos = fConfig->GetValue<bool>("FillHistos");
fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
......@@ -270,9 +272,9 @@ bool CbmDeviceDigiEventSink::HandleData(FairMQParts& parts, int /*index*/)
CbmEventTimeslice unpTs(parts);
/// FIXME: Need to check if TS arrived in order (probably not!!!) + buffer!!!
LOG(info) << "Next TS check " << fuPrevTsIndex << " " << fulTsCounter << " " << unpTs.fTsMetaData.GetIndex()
<< " Sorage size: " << fmFullTsStorage.size();
if (fuPrevTsIndex + 1 == unpTs.fTsMetaData.GetIndex()
LOG(debug) << "Next TS check " << fuPrevTsIndex << " " << fulTsCounter << " " << unpTs.fTsMetaData.GetIndex()
<< " Storage size: " << fmFullTsStorage.size();
if (fbBypassConsecutiveTs || (fuPrevTsIndex + 1 == unpTs.fTsMetaData.GetIndex())
|| (0 == fuPrevTsIndex && 0 == fulTsCounter && 0 == unpTs.fTsMetaData.GetIndex())) {
LOG(debug) << "TS direct to dump";
/// Fill all storage variables registers for data output
......@@ -294,9 +296,26 @@ bool CbmDeviceDigiEventSink::HandleData(FairMQParts& parts, int /*index*/)
/// Clear metadata => crashes, maybe not needed as due to move the pointer is invalidated?
// delete fTsMetaData;
/// Check TS queue and process it if needed (in case it filled a hole!)
CheckTsQueues();
LOG(debug) << "TS queues checked";
if (fbBypassConsecutiveTs) {
/// Skip checking the TS buffer as writing straight to file
/// => Just check if we are done and can close the file or not
if (fbReceivedEof) {
/// In this case we cannot check if the last TS received/processed is the final one due to lack of order
/// => use instead the fact that we received all expected TS
if ((fulTsCounter + fvulMissedTsIndices.size()) == fuTotalTsCount) {
LOG(info) << "CbmDeviceDigiEventSink::HandleData => "
<< "Found all expected TS (" << fulTsCounter << ") and total nb of TS " << fuTotalTsCount
<< " after accounting for the ones reported as missing by the source (" << fvulMissedTsIndices.size()
<< ")";
Finish();
} // if ((fulTsCounter + fvulMissedTsIndices.size()) == fuTotalTsCount)
}
}
else {
/// Check TS queue and process it if needed (in case it filled a hole!)
CheckTsQueues();
LOG(debug) << "TS queues checked";
}
/// Histograms management
if (kTRUE == fbFillHistos) {
......@@ -613,7 +632,7 @@ CbmDeviceDigiEventSink::~CbmDeviceDigiEventSink()
{
/// FIXME: Add pointers check before delete
/// Close things properly if not alredy done
/// Close things properly if not already done
if (!fbFinishDone) Finish();
/// Clear events vector
......
......@@ -82,9 +82,10 @@ private:
/// Constants
/// Control flags
Bool_t fbStoreFullTs = false; //! If true, store digis vectors with full TS in addition to selected events
Bool_t fbFillHistos = false; //! Switch ON/OFF filling of histograms
Bool_t fbFinishDone = false; //! Keep track of whether the Finish was already called
bool fbStoreFullTs = false; //! If true, store digis vectors with full TS in addition to selected events
bool fbBypassConsecutiveTs = false; //! Switch ON/OFF the bypass of the consecutive TS buffer before writing to file
bool fbFillHistos = false; //! Switch ON/OFF filling of histograms
bool fbFinishDone = false; //! Keep track of whether the Finish was already called
/// User settings parameters
/// Algo enum settings
......
......@@ -20,6 +20,8 @@ void addCustomOptions(bpo::options_description& options)
"Name (full or relative path) of the output .root file ");
options.add_options()("EvtNameIn", bpo::value<std::string>()->default_value("events"),
"MQ channel name for built events");
options.add_options()("BypassConsecutiveTs", bpo::value<bool>()->default_value(false),
"Do not wait for having consecutive TS in buffer before writing to file if true");
options.add_options()("FillHistos", bpo::value<bool>()->default_value(false),
"Fill histograms and send them to histo server if true");
......
......@@ -285,6 +285,7 @@ EVTSINK+=" --id evtsink1"
EVTSINK+=" --severity info"
#EVTSINK+=" --severity debug"
#EVTSINK+=" --StoreFullTs 1"
#EVTSINK+=" --BypassConsecutiveTs 1"
EVTSINK+=" --OutFileName mcbm_digis_events.root"
EVTSINK+=" --FillHistos true"
EVTSINK+=" --PubFreqTs $_pubfreqts"
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment