Skip to content
Snippets Groups Projects
Commit f73626c5 authored by Felix Weiglhofer's avatar Felix Weiglhofer
Browse files

online: Monitor idle time and time spend writing output.

parent e3a7301b
No related branches found
No related tags found
1 merge request!1811online: Monitor idle time and time spend writing output.
Pipeline #29329 passed
...@@ -144,6 +144,7 @@ set(SRCS ...@@ -144,6 +144,7 @@ set(SRCS
detectors/rich/Unpack.cxx detectors/rich/Unpack.cxx
detectors/rich/UnpackMS.cxx detectors/rich/UnpackMS.cxx
global/ParFiles.cxx global/ParFiles.cxx
global/StorableRecoResults.cxx
global/Reco.cxx global/Reco.cxx
global/RecoResultsInputArchive.cxx global/RecoResultsInputArchive.cxx
global/RecoResultsOutputArchive.cxx global/RecoResultsOutputArchive.cxx
......
...@@ -142,6 +142,11 @@ namespace cbm::algo ...@@ -142,6 +142,11 @@ namespace cbm::algo
*/ */
size_t NElements() const { return fData.size(); } size_t NElements() const { return fData.size(); }
/**
* @brief Return total size in bytes of the underlying data.
*/
size_t SizeBytes() const { return fData.size() * sizeof(T); }
/** /**
* @brief Get the underlying data. * @brief Get the underlying data.
*/ */
......
...@@ -573,3 +573,18 @@ void Reco::QueueProcessingMetrics(const ProcessingMonitor& mon) ...@@ -573,3 +573,18 @@ void Reco::QueueProcessingMetrics(const ProcessingMonitor& mon)
GetMonitor().QueueMetric("cbmreco", {{"hostname", fles::system::current_hostname()}, {"child", Opts().ChildId()}}, GetMonitor().QueueMetric("cbmreco", {{"hostname", fles::system::current_hostname()}, {"child", Opts().ChildId()}},
std::move(fields)); std::move(fields));
} }
void Reco::QueueProcessingExtraMetrics(const ProcessingExtraMonitor& mon)
{
if (!HasMonitor()) {
return;
}
MetricFieldSet fields = {{"processingTimeIdle", FilterNan(mon.timeIdle)},
{"processingTimeWriteArchive", mon.timeWriteArchive.wall()},
{"processingThroughputWriteArchive", FilterNan(mon.timeWriteArchive.throughput())},
{"processingBytesWritten", FilterNan(mon.bytesWritten)}};
GetMonitor().QueueMetric("cbmreco", {{"hostname", fles::system::current_hostname()}, {"child", Opts().ChildId()}},
std::move(fields));
}
...@@ -97,6 +97,16 @@ namespace cbm::algo ...@@ -97,6 +97,16 @@ namespace cbm::algo
std::optional<i64> tsDelta; //< id difference between current and previous timeslice std::optional<i64> tsDelta; //< id difference between current and previous timeslice
}; };
/**
* @brief Monitor for additional processing steps
* @note Used in the main function, this should be eventually merged with ProcessingMonitor and we have a single class that handles the full processing loop
*/
struct ProcessingExtraMonitor {
xpu::timings timeWriteArchive; //< time spent writing archive
size_t bytesWritten; //< bytes written to archive (estimated)
double timeIdle = 0.; //< time spent idle (waiting for next timeslice) [ms]
};
class Reco : SubChain { class Reco : SubChain {
public: public:
Reco(); Reco();
...@@ -112,6 +122,8 @@ namespace cbm::algo ...@@ -112,6 +122,8 @@ namespace cbm::algo
void Finalize(); void Finalize();
void PrintTimings(xpu::timings&); void PrintTimings(xpu::timings&);
void QueueProcessingExtraMetrics(const ProcessingExtraMonitor&);
private: private:
bool fInitialized = false; bool fInitialized = false;
ChainContext fContext; ChainContext fContext;
......
/* Copyright (C) 2024 FIAS Frankfurt Institute for Advanced Studies, Frankfurt / Main
SPDX-License-Identifier: GPL-3.0-only
Authors: Felix Weiglhofer [committer], P.-A. Loizeau */
#include "StorableRecoResults.h"
using namespace cbm::algo;
size_t StorableRecoResults::SizeBytes() const
{
size_t size = 0;
size += fBmonDigis.size() * sizeof(CbmBmonDigi);
size += fStsDigis.size() * sizeof(CbmStsDigi);
size += fMuchDigis.size() * sizeof(CbmMuchDigi);
size += fTrd2dDigis.size() * sizeof(CbmTrdDigi);
size += fTrdDigis.size() * sizeof(CbmTrdDigi);
size += fTofDigis.size() * sizeof(CbmTofDigi);
size += fRichDigis.size() * sizeof(CbmRichDigi);
for (const auto& ev : fDigiEvents) {
size += ev.fData.SizeBytes();
}
size += fStsClusters.SizeBytes();
size += fStsHits.SizeBytes();
size += fTofHits.SizeBytes();
size += fTrdHits.SizeBytes();
size += fTracks.size() * sizeof(ca::Track);
// Exclude TrackHitIndexContainers for now to avoid looping over all tracks
// Better way to do this: Just query from boost the size of the written archive.
// Requires changes in flesnet to the archive classes for this
return size;
}
...@@ -44,6 +44,11 @@ namespace cbm::algo ...@@ -44,6 +44,11 @@ namespace cbm::algo
*/ */
uint64_t TsStartTime() const { return fTsStartTime; } uint64_t TsStartTime() const { return fTsStartTime; }
/**
* @brief Total size in bytes
*/
size_t SizeBytes() const;
std::vector<CbmBmonDigi>& BmonDigis() { return fBmonDigis; } std::vector<CbmBmonDigi>& BmonDigis() { return fBmonDigis; }
const std::vector<CbmBmonDigi>& BmonDigis() const { return fBmonDigis; } const std::vector<CbmBmonDigi>& BmonDigis() const { return fBmonDigis; }
......
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
**/ **/
class CbmDigiData { class CbmDigiData {
public: public:
CbmBmonDigiData fBmon; ///< Beam monitor data CbmBmonDigiData fBmon; ///< Beam monitor data
CbmStsDigiData fSts; ///< STS data CbmStsDigiData fSts; ///< STS data
CbmMuchDigiData fMuch; ///< MUCH data CbmMuchDigiData fMuch; ///< MUCH data
...@@ -94,6 +94,22 @@ public: ...@@ -94,6 +94,22 @@ public:
default: return 0; break; default: return 0; break;
} }
} }
/** @brief Return total size in bytes */
size_t SizeBytes() const
{
size_t size = 0;
size += fBmon.Size() * sizeof(CbmBmonDigi);
size += fSts.Size() * sizeof(CbmStsDigi);
size += fMuch.Size() * sizeof(CbmMuchDigi);
size += fTrd.Size() * sizeof(CbmTrdDigi);
size += fTrd2d.Size() * sizeof(CbmTrdDigi);
size += fTof.Size() * sizeof(CbmTofDigi);
size += fPsd.Size() * sizeof(CbmPsdDigi);
size += fFsd.Size() * sizeof(CbmFsdDigi);
size += fRich.Size() * sizeof(CbmRichDigi);
return size;
}
}; };
BOOST_CLASS_VERSION(CbmDigiData, 5) BOOST_CLASS_VERSION(CbmDigiData, 5)
......
...@@ -172,6 +172,8 @@ int main(int argc, char** argv) ...@@ -172,6 +172,8 @@ int main(int argc, char** argv)
fles::TimesliceAutoSource source(opts.InputLocator()); fles::TimesliceAutoSource source(opts.InputLocator());
ProcessingExtraMonitor extraMonitor;
std::optional<RecoResultsOutputArchive> archive; std::optional<RecoResultsOutputArchive> archive;
if (!opts.OutputFile().empty()) { if (!opts.OutputFile().empty()) {
L_(info) << "Writing results to file: " << opts.OutputFile(); L_(info) << "Writing results to file: " << opts.OutputFile();
...@@ -186,16 +188,26 @@ int main(int argc, char** argv) ...@@ -186,16 +188,26 @@ int main(int argc, char** argv)
int num_ts = opts.NumTimeslices(); int num_ts = opts.NumTimeslices();
if (num_ts > 0) num_ts += opts.SkipTimeslices(); if (num_ts > 0) num_ts += opts.SkipTimeslices();
L_(debug) << "Starting to fetch timeslices from source..."; L_(debug) << "Starting to fetch timeslices from source...";
auto startFetchTS = std::chrono::high_resolution_clock::now();
while (auto ts = source.get()) { while (auto ts = source.get()) {
if (tsIdx < opts.SkipTimeslices()) { if (tsIdx < opts.SkipTimeslices()) {
tsIdx++; tsIdx++;
continue; continue;
} }
auto endFetchTS = std::chrono::high_resolution_clock::now();
auto durationFetchTS = endFetchTS - startFetchTS;
extraMonitor.timeIdle +=
std::chrono::duration_cast<std::chrono::duration<double, std::milli>>(durationFetchTS).count();
try { try {
RecoResults result = reco.Run(*ts); RecoResults result = reco.Run(*ts);
if (archive) { if (archive) {
xpu::scoped_timer t_{"Write Archive", &extraMonitor.timeWriteArchive};
auto storable = makeStorableRecoResults(*ts, result); auto storable = makeStorableRecoResults(*ts, result);
extraMonitor.bytesWritten = storable->SizeBytes();
xpu::t_add_bytes(extraMonitor.bytesWritten);
archive->put(storable); archive->put(storable);
} }
} }
...@@ -203,6 +215,7 @@ int main(int argc, char** argv) ...@@ -203,6 +215,7 @@ int main(int argc, char** argv)
// TODO: Add flag if we want to abort on exception or continue with next timeslice // TODO: Add flag if we want to abort on exception or continue with next timeslice
L_(error) << "Caught ProcessingError while processing timeslice " << tsIdx << ": " << e.what(); L_(error) << "Caught ProcessingError while processing timeslice " << tsIdx << ": " << e.what();
} }
reco.QueueProcessingExtraMetrics(extraMonitor);
// Release memory after each timeslice and log memory usage // Release memory after each timeslice and log memory usage
// This is useful to detect memory leaks as the memory usage should be constant between timeslices // This is useful to detect memory leaks as the memory usage should be constant between timeslices
...@@ -212,6 +225,8 @@ int main(int argc, char** argv) ...@@ -212,6 +225,8 @@ int main(int argc, char** argv)
tsIdx++; tsIdx++;
if (num_ts > 0 && tsIdx >= num_ts) break; if (num_ts > 0 && tsIdx >= num_ts) break;
startFetchTS = std::chrono::high_resolution_clock::now();
} }
if (archive) archive->end_stream(); if (archive) archive->end_stream();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment