diff --git a/MQ/mcbm/CbmDeviceBuildDigiEvents.cxx b/MQ/mcbm/CbmDeviceBuildDigiEvents.cxx index bb172d52ce11b37b4b637e70531843e44dd48f2f..772c18451f55202ddc76807da6cd3275f9a80b09 100644 --- a/MQ/mcbm/CbmDeviceBuildDigiEvents.cxx +++ b/MQ/mcbm/CbmDeviceBuildDigiEvents.cxx @@ -12,6 +12,7 @@ #include "CbmDeviceBuildDigiEvents.h" /// CBM headers +#include "CbmDigiEvent.h" #include "CbmEvent.h" #include "CbmFlesCanvasTools.h" #include "CbmMQDefs.h" @@ -72,6 +73,7 @@ try { fvsSetHistMaxDigiNb = fConfig->GetValue<std::vector<std::string>>("SetHistMaxDigiNb"); fbDoNotSend = fConfig->GetValue<bool>("DoNotSend"); + fbDigiEventOutput = fConfig->GetValue<bool>("DigiEventOutput"); fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn"); fsChannelNameDataOutput = fConfig->GetValue<std::string>("EvtNameOut"); fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn"); @@ -577,7 +579,14 @@ bool CbmDeviceBuildDigiEvents::HandleData(FairMQParts& parts, int /*index*/) fpAlgo->ProcessTs(); /// Send events vector to ouput - if (!fbDoNotSend && !SendEvents(parts)) return false; + if (!fbDoNotSend) { + if (fbDigiEventOutput) { + if (!(SendDigiEvents(parts))) return false; + } + else { + if (!(SendEvents(parts))) return false; + } + } /// Clear metadata fTimeSliceMetaDataArray->Clear(); @@ -664,6 +673,126 @@ bool CbmDeviceBuildDigiEvents::SendEvents(FairMQParts& partsIn) return true; } +bool CbmDeviceBuildDigiEvents::SendDigiEvents(FairMQParts& partsIn) +{ + /// Get vector reference from algo + std::vector<CbmEvent*> vEvents = fpAlgo->GetEventVector(); + + /// Move CbmEvent from temporary vector to std::vector of full objects + LOG(debug) << "In Vector size: " << vEvents.size(); + std::vector<CbmDigiEvent> vOutEvents; + vOutEvents.reserve(vEvents.size()); + for (CbmEvent* event : vEvents) { + CbmDigiEvent selEvent; + selEvent.fTime = event->GetStartTime(); + selEvent.fNumber = event->GetNumber(); + + /// FIXME: for pure digi based event, we select "continuous slices of digis" + /// => Copy block of [First Digi index, last digi index] with assign(it_start, it_stop) + /// FIXME: Keep TRD1D + TRD2D support, may lead to holes in the digi sequence! + /// => Would need to keep the loop + + /// Get the proper order for block selection as TRD1D and TRD2D may insert indices in separate loops + /// => Needed to ensure that the start and stop of the block copy do not trigger a vector size exception + event->SortIndices(); + /// for each detector, find the data in the Digi vectors and copy them + /// TODO: Template + loop on list of data types? + /// ==> T0 + uint32_t uNbDigis = (0 < event->GetNofData(ECbmDataType::kT0Digi) ? event->GetNofData(ECbmDataType::kT0Digi) : 0); + if (uNbDigis) { + auto startIt = fvDigiT0->begin() + event->GetIndex(ECbmDataType::kT0Digi, 0); + auto stopIt = fvDigiT0->begin() + event->GetIndex(ECbmDataType::kT0Digi, uNbDigis - 1); + ++stopIt; + selEvent.fData.fT0.fDigis.assign(startIt, stopIt); + } + + /// ==> STS + uNbDigis = (0 < event->GetNofData(ECbmDataType::kStsDigi) ? event->GetNofData(ECbmDataType::kStsDigi) : 0); + if (uNbDigis) { + auto startIt = fvDigiSts->begin() + event->GetIndex(ECbmDataType::kStsDigi, 0); + auto stopIt = fvDigiSts->begin() + event->GetIndex(ECbmDataType::kStsDigi, uNbDigis - 1); + ++stopIt; + selEvent.fData.fSts.fDigis.assign(startIt, stopIt); + } + + /// ==> MUCH + uNbDigis = (0 < event->GetNofData(ECbmDataType::kMuchDigi) ? event->GetNofData(ECbmDataType::kMuchDigi) : 0); + if (uNbDigis) { + auto startIt = fvDigiMuch->begin() + event->GetIndex(ECbmDataType::kMuchDigi, 0); + auto stopIt = fvDigiMuch->begin() + event->GetIndex(ECbmDataType::kMuchDigi, uNbDigis - 1); + ++stopIt; + selEvent.fData.fMuch.fDigis.assign(startIt, stopIt); + } + + /// ==> TRD + TRD2D + uNbDigis = (0 < event->GetNofData(ECbmDataType::kTrdDigi) ? event->GetNofData(ECbmDataType::kTrdDigi) : 0); + if (uNbDigis) { + auto startIt = fvDigiTrd->begin() + event->GetIndex(ECbmDataType::kTrdDigi, 0); + auto stopIt = fvDigiTrd->begin() + event->GetIndex(ECbmDataType::kTrdDigi, uNbDigis - 1); + ++stopIt; + selEvent.fData.fTrd.fDigis.assign(startIt, stopIt); + } + + /// ==> TOF + uNbDigis = (0 < event->GetNofData(ECbmDataType::kTofDigi) ? event->GetNofData(ECbmDataType::kTofDigi) : 0); + if (uNbDigis) { + auto startIt = fvDigiTof->begin() + event->GetIndex(ECbmDataType::kTofDigi, 0); + auto stopIt = fvDigiTof->begin() + event->GetIndex(ECbmDataType::kTofDigi, uNbDigis - 1); + ++stopIt; + selEvent.fData.fTof.fDigis.assign(startIt, stopIt); + } + + /// ==> RICH + uNbDigis = (0 < event->GetNofData(ECbmDataType::kRichDigi) ? event->GetNofData(ECbmDataType::kRichDigi) : 0); + if (uNbDigis) { + auto startIt = fvDigiRich->begin() + event->GetIndex(ECbmDataType::kRichDigi, 0); + auto stopIt = fvDigiRich->begin() + event->GetIndex(ECbmDataType::kRichDigi, uNbDigis - 1); + ++stopIt; + selEvent.fData.fRich.fDigis.assign(startIt, stopIt); + } + + /// ==> PSD + uNbDigis = (0 < event->GetNofData(ECbmDataType::kPsdDigi) ? event->GetNofData(ECbmDataType::kPsdDigi) : 0); + if (uNbDigis) { + auto startIt = fvDigiPsd->begin() + event->GetIndex(ECbmDataType::kPsdDigi, 0); + auto stopIt = fvDigiPsd->begin() + event->GetIndex(ECbmDataType::kPsdDigi, uNbDigis - 1); + ++stopIt; + selEvent.fData.fPsd.fDigis.assign(startIt, stopIt); + } + + vOutEvents.push_back(std::move(selEvent)); + } + + LOG(debug) << "Out Vector size: " << vEvents.size(); + /// Serialize the array of events into a single MQ message + std::stringstream ossEvt; + boost::archive::binary_oarchive oaEvt(ossEvt); + oaEvt << vOutEvents; + std::string* strMsgEvt = new std::string(ossEvt.str()); + FairMQMessagePtr message(NewMessage(const_cast<char*>(strMsgEvt->c_str()), // data + strMsgEvt->length(), // size + [](void*, void* object) { delete static_cast<std::string*>(object); }, + strMsgEvt)); // object that manages the data + LOG(debug) << "Serializing done"; + + /// Make a new composed messaged with TsHeader + vector of Digi Event + TsMetaData + /// FIXME: Find out if possible to use only the boost serializer + FairMQParts partsOut; + partsOut.AddPart(std::move(partsIn.At(0))); // TsHeader + partsOut.AddPart(std::move(partsIn.At(partsIn.Size() - 1))); // TsMetaData + partsOut.AddPart(std::move(message)); // DigiEvent vector + LOG(debug) << "Message preparation done"; + + if (Send(partsOut, fsChannelNameDataOutput) < 0) { + LOG(error) << "Problem sending data to " << fsChannelNameDataOutput; + return false; + } + + vOutEvents.clear(); + + return true; +} + bool CbmDeviceBuildDigiEvents::SendHistoConfAndData() { /// Prepare multiparts message and header diff --git a/MQ/mcbm/CbmDeviceBuildDigiEvents.h b/MQ/mcbm/CbmDeviceBuildDigiEvents.h index 3631c31ea864a11e5e9a7128dc9a7b924ce4ac2e..42af1bcfdcf6cf50836f5046758ecacae09d437b 100644 --- a/MQ/mcbm/CbmDeviceBuildDigiEvents.h +++ b/MQ/mcbm/CbmDeviceBuildDigiEvents.h @@ -67,7 +67,8 @@ private: std::vector<std::string> fvsSetTrigMinLayersNb = {}; std::vector<std::string> fvsSetHistMaxDigiNb = {}; /// I/O control - bool fbDoNotSend = false; + bool fbDoNotSend = false; + bool fbDigiEventOutput = false; /// message queues std::string fsChannelNameDataInput = "unpts_0"; std::string fsChannelNameDataOutput = "events"; @@ -133,6 +134,7 @@ private: bool InitHistograms(); void Finish(); bool SendEvents(FairMQParts& partsIn); + bool SendDigiEvents(FairMQParts& partsIn); bool SendHistoConfAndData(); bool SendHistograms(); }; diff --git a/MQ/mcbm/CbmDeviceDigiEventSink.cxx b/MQ/mcbm/CbmDeviceDigiEventSink.cxx index d8d359cd759bcbbf4d990a147fba3b2247788f92..c184d93abf5185c2a4e52f4553f174b41f9a58c3 100644 --- a/MQ/mcbm/CbmDeviceDigiEventSink.cxx +++ b/MQ/mcbm/CbmDeviceDigiEventSink.cxx @@ -35,6 +35,7 @@ #include "TCanvas.h" #include "TFile.h" #include "TH1.h" +#include "TProfile.h" #include "TList.h" #include "TNamed.h" @@ -73,6 +74,7 @@ try { fbWriteMissingTs = fConfig->GetValue<bool>("WriteMissingTs"); fbDisableCompression = fConfig->GetValue<bool>("DisableCompression"); fiTreeFileMaxSize = fConfig->GetValue<int64_t>("TreeFileMaxSize"); + fbDigiEventInput = fConfig->GetValue<bool>("DigiEventInput"); fbFillHistos = fConfig->GetValue<bool>("FillHistos"); fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs"); @@ -216,9 +218,9 @@ bool CbmDeviceDigiEventSink::InitHistograms() std::vector<std::pair<TNamed*, std::string>> vHistos = {}; fhFullTsBuffSizeEvo = - new TH1I("hFullTsBuffSizeEvo", "Evo. of the full TS buffer size; Time in run [s]; Size []", 720, 0, 7200); + new TProfile("hFullTsBuffSizeEvo", "Evo. of the full TS buffer size; Time in run [s]; Size []", 720, 0, 7200); fhMissTsBuffSizeEvo = - new TH1I("hMissTsBuffSizeEvo", "Evo. of the missed TS buffer size; Time in run [s]; Size []", 720, 0, 7200); + new TProfile("hMissTsBuffSizeEvo", "Evo. of the missed TS buffer size; Time in run [s]; Size []", 720, 0, 7200); fhFullTsProcEvo = new TH1I("hFullTsProcEvo", "Processed full TS; Time in run [s]; # []", 720, 0, 7200); fhMissTsProcEvo = new TH1I("hMissTsProcEvo", "Processed missing TS; Time in run [s]; # []", 720, 0, 7200); fhTotalTsProcEvo = new TH1I("hTotalTsProcEvo", "Total processed TS; Time in run [s]; # []", 720, 0, 7200); @@ -349,7 +351,7 @@ bool CbmDeviceDigiEventSink::HandleData(FairMQParts& parts, int /*index*/) if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages"; /// Unpack the message - CbmEventTimeslice unpTs(parts); + CbmEventTimeslice unpTs(parts, fbDigiEventInput); /// FIXME: Need to check if TS arrived in order (probably not!!!) + buffer!!! LOG(debug) << "Next TS check " << fuPrevTsIndex << " " << fulTsCounter << " " << unpTs.fTsMetaData.GetIndex() @@ -404,19 +406,31 @@ bool CbmDeviceDigiEventSink::HandleData(FairMQParts& parts, int /*index*/) /// Fill histograms every 5 or more seconds /// TODO: make it a parameter std::chrono::duration<double_t> elapsedSecondsFill = currentTime - fLastFillTime; - if (5.0 < elapsedSecondsFill.count()) { + if (1.0 < elapsedSecondsFill.count()) { std::chrono::duration<double_t> secInRun = currentTime - fStartTime; /// Rely on the fact that all histos have same X axis to avoid multiple "current bin" search + /* int32_t iBinIndex = fhFullTsBuffSizeEvo->FindBin(secInRun.count()); fhFullTsBuffSizeEvo->SetBinContent(iBinIndex, fmFullTsStorage.size()); fhMissTsBuffSizeEvo->SetBinContent(iBinIndex, fvulMissedTsIndices.size()); fhFullTsProcEvo->SetBinContent(iBinIndex, fulTsCounter); fhMissTsProcEvo->SetBinContent(iBinIndex, fulMissedTsCounter); fhTotalTsProcEvo->SetBinContent(iBinIndex, (fulTsCounter + fulMissedTsCounter)); - fhTotalEventsEvo->SetBinContent(iBinIndex, fmFullTsStorage.size()); + fhTotalEventsEvo->SetBinContent(iBinIndex, fulProcessedEvents); + */ + fhFullTsBuffSizeEvo->Fill(secInRun.count(), fmFullTsStorage.size()); + fhMissTsBuffSizeEvo->Fill(secInRun.count(), fvulMissedTsIndices.size()); + fhFullTsProcEvo->Fill(secInRun.count(), (fulTsCounter - fulLastFullTsCounter)); + fhMissTsProcEvo->Fill(secInRun.count(), (fulMissedTsCounter - fulLastMissTsCounter)); + fhTotalTsProcEvo->Fill(secInRun.count(), + (fulTsCounter - fulLastFullTsCounter + fulMissedTsCounter - fulLastMissTsCounter)); + fhTotalEventsEvo->Fill(secInRun.count(), fulProcessedEvents -fulLastProcessedEvents); fLastFillTime = currentTime; + fulLastFullTsCounter = fulTsCounter; + fulLastMissTsCounter = fulMissedTsCounter; + fulLastProcessedEvents = fulProcessedEvents; } /// Send histograms each N timeslices. @@ -592,7 +606,7 @@ void CbmDeviceDigiEventSink::PrepareTreeEntry(CbmEventTimeslice unpTs) new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()]) TimesliceMetaData(std::move(unpTs.fTsMetaData)); - /// Extract CbmEvent TClonesArray from input message + /// Extract CbmEvent vector from input message (*fEventsSel) = std::move(unpTs.GetSelectedData()); if (kTRUE == fbFillHistos) { /// Accumulated counts, will show rise + plateau pattern in spill @@ -768,99 +782,150 @@ void CbmDeviceDigiEventSink::Finish() fbFinishDone = kTRUE; } -CbmEventTimeslice::CbmEventTimeslice(FairMQParts& parts) + +CbmEventTimeslice::CbmEventTimeslice(FairMQParts& parts, bool bDigiEvtInput) { - /// Extract unpacked data from input message + fbDigiEvtInput =bDigiEvtInput; + uint32_t uPartIdx = 0; - /// TODO: code order of vectors in the TS header!! + if (fbDigiEvtInput) { + /// Digi events => Extract selected data from input message + if (3 != parts.Size()) { + LOG(error) << "CbmEventTimeslice::CbmEventTimeslice => Wrong number of parts to deserialize DigiEvents: " + << parts.Size() << " VS 3!"; + LOG(fatal) << "Probably the wrong value was used for the option DigiEventInput of the Sink or DigiEventOutput of " + << "the event builder"; + } - /// TS header - TObject* tempObjectPointer = nullptr; - RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectPointer); - if (tempObjectPointer && TString(tempObjectPointer->ClassName()).EqualTo("CbmTsEventHeader")) { - fCbmTsEventHeader = *(static_cast<CbmTsEventHeader*>(tempObjectPointer)); - } - else { - LOG(fatal) << "Failed to deserialize the TS header"; - } - ++uPartIdx; - - /// T0 - std::string msgStrT0(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); - std::istringstream issT0(msgStrT0); - boost::archive::binary_iarchive inputArchiveT0(issT0); - inputArchiveT0 >> fvDigiT0; - ++uPartIdx; - - /// STS - std::string msgStrSts(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); - std::istringstream issSts(msgStrSts); - boost::archive::binary_iarchive inputArchiveSts(issSts); - inputArchiveSts >> fvDigiSts; - ++uPartIdx; - - /// MUCH - std::string msgStrMuch(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); - std::istringstream issMuch(msgStrMuch); - boost::archive::binary_iarchive inputArchiveMuch(issMuch); - inputArchiveMuch >> fvDigiMuch; - ++uPartIdx; - - /// TRD - std::string msgStrTrd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); - std::istringstream issTrd(msgStrTrd); - boost::archive::binary_iarchive inputArchiveTrd(issTrd); - inputArchiveTrd >> fvDigiTrd; - ++uPartIdx; - - /// T0F - std::string msgStrTof(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); - std::istringstream issTof(msgStrTof); - boost::archive::binary_iarchive inputArchiveTof(issTof); - inputArchiveTof >> fvDigiTof; - ++uPartIdx; - - /// RICH - std::string msgStrRich(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); - std::istringstream issRich(msgStrRich); - boost::archive::binary_iarchive inputArchiveRich(issRich); - inputArchiveRich >> fvDigiRich; - ++uPartIdx; - - /// PSD - std::string msgStrPsd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); - std::istringstream issPsd(msgStrPsd); - boost::archive::binary_iarchive inputArchivePsd(issPsd); - inputArchivePsd >> fvDigiPsd; - ++uPartIdx; - - /// TS metadata - tempObjectPointer = nullptr; - RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectPointer); - - if (tempObjectPointer && TString(tempObjectPointer->ClassName()).EqualTo("TimesliceMetaData")) { - fTsMetaData = *(static_cast<TimesliceMetaData*>(tempObjectPointer)); + /// (1) TS header + TObject* tempObjectPointer = nullptr; + RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectPointer); + if (tempObjectPointer && TString(tempObjectPointer->ClassName()).EqualTo("CbmTsEventHeader")) { + fCbmTsEventHeader = *(static_cast<CbmTsEventHeader*>(tempObjectPointer)); + } + else { + LOG(fatal) << "Failed to deserialize the TS header"; + } + ++uPartIdx; + + /// (2) TS metadata + tempObjectPointer = nullptr; + RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectPointer); + + if (tempObjectPointer && TString(tempObjectPointer->ClassName()).EqualTo("TimesliceMetaData")) { + fTsMetaData = *(static_cast<TimesliceMetaData*>(tempObjectPointer)); + } + else { + LOG(fatal) << "Failed to deserialize the TS metadata"; + } + ++uPartIdx; + + /// (3) Events + std::string msgStrEvt(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issEvt(msgStrEvt); + boost::archive::binary_iarchive inputArchiveEvt(issEvt); + inputArchiveEvt >> fvDigiEvents; + ++uPartIdx; + + LOG(debug) << "Input event array " << fvDigiEvents.size(); } else { - LOG(fatal) << "Failed to deserialize the TS metadata"; - } - ++uPartIdx; + /// Raw data + raw events => Extract unpacked data from input message + if (10 != parts.Size()) { + LOG(error) << "CbmEventTimeslice::CbmEventTimeslice => Wrong number of parts to deserialize raw data + events: " + << parts.Size() << " VS 10!"; + LOG(fatal) << "Probably the wrong value was used for the option DigiEventInput of the Sink or DigiEventOutput of " + << "the event builder"; + } - /// Events - /// FIXME: Find out if possible to use only the boost serializer/deserializer - /* - std::string msgStrEvt(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); - std::istringstream issEvt(msgStrEvt); - boost::archive::binary_iarchive inputArchiveEvt(issEvt); - inputArchiveEvt >> fvEvents; - ++uPartIdx; - LOG(info) << "Input event array " << fvEvents.size(); - */ - std::vector<CbmEvent>* pvOutEvents = nullptr; - RootSerializer().Deserialize(*parts.At(uPartIdx), pvOutEvents); - fvEvents = std::move(*pvOutEvents); - LOG(debug) << "Input event array " << fvEvents.size(); + /// (1) TS header + TObject* tempObjectPointer = nullptr; + RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectPointer); + if (tempObjectPointer && TString(tempObjectPointer->ClassName()).EqualTo("CbmTsEventHeader")) { + fCbmTsEventHeader = *(static_cast<CbmTsEventHeader*>(tempObjectPointer)); + } + else { + LOG(fatal) << "Failed to deserialize the TS header"; + } + ++uPartIdx; + + /// (2) T0 + std::string msgStrT0(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issT0(msgStrT0); + boost::archive::binary_iarchive inputArchiveT0(issT0); + inputArchiveT0 >> fvDigiT0; + ++uPartIdx; + + /// (3) STS + std::string msgStrSts(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issSts(msgStrSts); + boost::archive::binary_iarchive inputArchiveSts(issSts); + inputArchiveSts >> fvDigiSts; + ++uPartIdx; + + /// (4) MUCH + std::string msgStrMuch(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issMuch(msgStrMuch); + boost::archive::binary_iarchive inputArchiveMuch(issMuch); + inputArchiveMuch >> fvDigiMuch; + ++uPartIdx; + + /// (5) TRD + std::string msgStrTrd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issTrd(msgStrTrd); + boost::archive::binary_iarchive inputArchiveTrd(issTrd); + inputArchiveTrd >> fvDigiTrd; + ++uPartIdx; + + /// (6) T0F + std::string msgStrTof(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issTof(msgStrTof); + boost::archive::binary_iarchive inputArchiveTof(issTof); + inputArchiveTof >> fvDigiTof; + ++uPartIdx; + + /// (7) RICH + std::string msgStrRich(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issRich(msgStrRich); + boost::archive::binary_iarchive inputArchiveRich(issRich); + inputArchiveRich >> fvDigiRich; + ++uPartIdx; + + /// (8) PSD + std::string msgStrPsd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issPsd(msgStrPsd); + boost::archive::binary_iarchive inputArchivePsd(issPsd); + inputArchivePsd >> fvDigiPsd; + ++uPartIdx; + + /// (9) TS metadata + tempObjectPointer = nullptr; + RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectPointer); + + if (tempObjectPointer && TString(tempObjectPointer->ClassName()).EqualTo("TimesliceMetaData")) { + fTsMetaData = *(static_cast<TimesliceMetaData*>(tempObjectPointer)); + } + else { + LOG(fatal) << "Failed to deserialize the TS metadata"; + } + ++uPartIdx; + + /// (10) Events + /// FIXME: Find out if possible to use only the boost serializer/deserializer + /* + std::string msgStrEvt(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize()); + std::istringstream issEvt(msgStrEvt); + boost::archive::binary_iarchive inputArchiveEvt(issEvt); + inputArchiveEvt >> fvEvents; + ++uPartIdx; + LOG(info) << "Input event array " << fvEvents.size(); + */ + std::vector<CbmEvent>* pvOutEvents = nullptr; + RootSerializer().Deserialize(*parts.At(uPartIdx), pvOutEvents); + fvEvents = std::move(*pvOutEvents); + LOG(debug) << "Input event array " << fvEvents.size(); + } } CbmEventTimeslice::~CbmEventTimeslice() @@ -873,13 +938,12 @@ CbmEventTimeslice::~CbmEventTimeslice() fvDigiRich.clear(); fvDigiPsd.clear(); fvEvents.clear(); + fvDigiEvents.clear(); } - -std::vector<CbmDigiEvent> CbmEventTimeslice::GetSelectedData() +void CbmEventTimeslice::ExtractSelectedData() { - std::vector<CbmDigiEvent> vEventsSel; - vEventsSel.reserve(fvEvents.size()); + fvDigiEvents.reserve(fvEvents.size()); /// Loop on events in input vector for (CbmEvent event : fvEvents) { @@ -1075,8 +1139,6 @@ std::vector<CbmDigiEvent> CbmEventTimeslice::GetSelectedData() */ } - vEventsSel.push_back(selEvent); + fvDigiEvents.push_back(selEvent); } - - return vEventsSel; } diff --git a/MQ/mcbm/CbmDeviceDigiEventSink.h b/MQ/mcbm/CbmDeviceDigiEventSink.h index b3f588b553316a1ce591a5503264688b2fa20506..a91e38280ef5b735375fcf3240387dc65ab285af 100644 --- a/MQ/mcbm/CbmDeviceDigiEventSink.h +++ b/MQ/mcbm/CbmDeviceDigiEventSink.h @@ -42,6 +42,7 @@ class TCanvas; class TFile; class TH1; +class TProfile; class TList; class TClonesArray; //class TimesliceMetaData; @@ -51,13 +52,20 @@ class FairRootManager; class CbmEventTimeslice { /// TODO: rename to CbmTsWithEvents public: - CbmEventTimeslice(FairMQParts& parts); + CbmEventTimeslice(FairMQParts& parts, bool bDigiEvtInput = false); ~CbmEventTimeslice(); - std::vector<CbmDigiEvent> GetSelectedData(); + void ExtractSelectedData(); + std::vector<CbmDigiEvent>& GetSelectedData(){ + if (!fbDigiEvtInput) ExtractSelectedData(); + return fvDigiEvents; + } + /// Input Type + bool fbDigiEvtInput = false; /// TS information in header CbmTsEventHeader fCbmTsEventHeader; + /// Raw data std::vector<CbmTofDigi> fvDigiT0; std::vector<CbmStsDigi> fvDigiSts; std::vector<CbmMuchDigi> fvDigiMuch; @@ -65,8 +73,12 @@ public: std::vector<CbmTofDigi> fvDigiTof; std::vector<CbmRichDigi> fvDigiRich; std::vector<CbmPsdDigi> fvDigiPsd; + /// extra Metadata TimesliceMetaData fTsMetaData; + /// Raw events std::vector<CbmEvent> fvEvents; + /// Digi events + std::vector<CbmDigiEvent> fvDigiEvents; }; class CbmDeviceDigiEventSink : public FairMQDevice { @@ -88,6 +100,7 @@ private: bool fbBypassConsecutiveTs = false; //! Switch ON/OFF the bypass of the consecutive TS buffer before writing to file bool fbWriteMissingTs = false; //! Switch ON/OFF writing of empty TS to file for the missing ones (if no bypass) bool fbDisableCompression = false; //! Switch ON/OFF the ROOT file compression + bool fbDigiEventInput = false; //! Switch ON/OFF the input of CbmDigiEvents instead of raw data + CbmEvents bool fbFillHistos = false; //! Switch ON/OFF filling of histograms bool fbInitDone = false; //! Keep track of whether the Init was already fully completed bool fbFinishDone = false; //! Keep track of whether the Finish was already called @@ -119,8 +132,11 @@ private: uint64_t fulNumMessages = 0; uint64_t fulTsCounter = 0; uint64_t fulMissedTsCounter = 0; - std::chrono::system_clock::time_point fLastPublishTime = std::chrono::system_clock::now(); uint64_t fulProcessedEvents = 0; + uint64_t fulLastFullTsCounter = 0; + uint64_t fulLastMissTsCounter = 0; + uint64_t fulLastProcessedEvents = 0; + std::chrono::system_clock::time_point fLastPublishTime = std::chrono::system_clock::now(); std::chrono::system_clock::time_point fLastFillTime = std::chrono::system_clock::now(); std::chrono::system_clock::time_point fStartTime = std::chrono::system_clock::now(); @@ -167,8 +183,8 @@ private: /// Flag indicating whether the histograms and canvases configurations were already published bool fbConfigSent = false; - TH1* fhFullTsBuffSizeEvo; - TH1* fhMissTsBuffSizeEvo; + TProfile* fhFullTsBuffSizeEvo; + TProfile* fhMissTsBuffSizeEvo; TH1* fhFullTsProcEvo; TH1* fhMissTsProcEvo; TH1* fhTotalTsProcEvo; diff --git a/MQ/mcbm/runBuildDigiEvents.cxx b/MQ/mcbm/runBuildDigiEvents.cxx index 8237c2176260df92e4e0301d92e772fdca3ff58f..894a683054ea08613652e935431c7c646bc9704d 100644 --- a/MQ/mcbm/runBuildDigiEvents.cxx +++ b/MQ/mcbm/runBuildDigiEvents.cxx @@ -42,6 +42,8 @@ void addCustomOptions(bpo::options_description& options) "Set max nb of digi in histograms for selected detector, use string matching " "ECbmModuleId,dMaxDigiNb e.g. kTof,1000"); options.add_options()("DoNotSend", bpo::value<bool>()->default_value(false), "Disable the sending of data if true"); + options.add_options()("DigiEventOutput", bpo::value<bool>()->default_value(false), + "Enable output of CbmDigiEvents instead of raw data + CbmEvents if true"); options.add_options()("TsNameIn", bpo::value<std::string>()->default_value("unpts_0"), "MQ channel name for unpacked TS data"); options.add_options()("EvtNameOut", bpo::value<std::string>()->default_value("events"), diff --git a/MQ/mcbm/runDigiEventSink.cxx b/MQ/mcbm/runDigiEventSink.cxx index 94c750ce5746cd33001a7168240c3888d9563022..c6df0f8088914c815b34d44251cd338ba9e3ce64 100644 --- a/MQ/mcbm/runDigiEventSink.cxx +++ b/MQ/mcbm/runDigiEventSink.cxx @@ -31,6 +31,9 @@ void addCustomOptions(bpo::options_description& options) options.add_options()("TreeFileMaxSize", bpo::value<int64_t>()->default_value(10000000000LL), "Set the maximum output tree size (~file size) in bytes"); + options.add_options()("DigiEventInput", bpo::value<bool>()->default_value(false), + "Enable the input of CbmDigiEvents instead of raw data + CbmEvents if true"); + options.add_options()("FillHistos", bpo::value<bool>()->default_value(false), "Fill histograms and send them to histo server if true");