Skip to content
Snippets Groups Projects
Commit e78213cf authored by Felix Weiglhofer's avatar Felix Weiglhofer
Browse files

algo: Rework storage classes.

* Use `fles::InputArchive` / `fles::OutputArchive` to read / write archives
* Reuse `fles::ArchiveDescriptor` as archive header
* Remove `--split-output-per-ts` flag
* Update `RecoResultsDescriptor` to include timeslice index and start time
* Add `--dump-archive` flag to quickly inspect written archives for basic debugging
* Remove ability to add wildcards in the output filename, e.g. for the hostname (this should be handled by the Slurm start script)
parent c25124a0
No related branches found
No related tags found
1 merge request: !1344 algo: Rework storage classes.
Pipeline #24524 passed
...@@ -44,7 +44,6 @@ set(SRCS ...@@ -44,7 +44,6 @@ set(SRCS
detectors/trd2d/Unpack.cxx detectors/trd2d/Unpack.cxx
detectors/rich/ReadoutConfig.cxx detectors/rich/ReadoutConfig.cxx
detectors/rich/Unpack.cxx detectors/rich/Unpack.cxx
global/Archive.cxx
global/Reco.cxx global/Reco.cxx
qa/DigiEventQa.cxx qa/DigiEventQa.cxx
qa/Histo1D.cxx qa/Histo1D.cxx
...@@ -144,6 +143,9 @@ install( ...@@ -144,6 +143,9 @@ install(
base/RecoParams.h base/RecoParams.h
base/SubChain.h base/SubChain.h
global/Reco.h global/Reco.h
global/RecoResultsInputArchive.h
global/RecoResultsOutputArchive.h
global/StorableRecoResults.h
DESTINATION DESTINATION
include/ include/
) )
...@@ -63,8 +63,6 @@ Options::Options(int argc, char** argv) ...@@ -63,8 +63,6 @@ Options::Options(int argc, char** argv)
generic.add_options() generic.add_options()
("output,o", po::value(&fOutputFile)->default_value("")->value_name("<file>"), ("output,o", po::value(&fOutputFile)->default_value("")->value_name("<file>"),
"write results to file") "write results to file")
("split-output-per-ts,S", po::bool_switch(&fSplitOutputPerTS)->default_value(false),
"Write results to file per timeslice (resulting files are named <file>_<tsnr>). Requires -o.")
("device,d", po::value(&fDevice)->default_value("cpu")->value_name("<device>"), ("device,d", po::value(&fDevice)->default_value("cpu")->value_name("<device>"),
"select device (cpu, cuda0, cuda1, hip0, ...)") "select device (cpu, cuda0, cuda1, hip0, ...)")
("log-level,l", po::value(&fLogLevel)->default_value(info)->value_name("<level>"), ("log-level,l", po::value(&fLogLevel)->default_value(info)->value_name("<level>"),
...@@ -88,6 +86,8 @@ Options::Options(int argc, char** argv) ...@@ -88,6 +86,8 @@ Options::Options(int argc, char** argv)
"Set number of OpenMP threads (-1 = use OMP_NUM_THREADS environment variable)") "Set number of OpenMP threads (-1 = use OMP_NUM_THREADS environment variable)")
("times,t", po::bool_switch(&fCollectKernelTimes)->default_value(false), ("times,t", po::bool_switch(&fCollectKernelTimes)->default_value(false),
"print kernel times") "print kernel times")
("dump-archive", po::bool_switch(&fDumpArchive)->default_value(false),
"Dump archive content to stdout and exit. Provide archive with '-i'. (This is a hack to quick check archive content until we have proper tooling.)")
("help,h", ("help,h",
"produce help message") "produce help message")
; ;
......
...@@ -4,9 +4,6 @@ ...@@ -4,9 +4,6 @@
#ifndef CBM_ALGO_BASE_OPTIONS_H #ifndef CBM_ALGO_BASE_OPTIONS_H
#define CBM_ALGO_BASE_OPTIONS_H #define CBM_ALGO_BASE_OPTIONS_H
#include <boost/serialization/access.hpp>
#include <boost/serialization/version.hpp>
#include <set> #include <set>
#include <string> #include <string>
#include <vector> #include <vector>
...@@ -27,7 +24,6 @@ namespace cbm::algo ...@@ -27,7 +24,6 @@ namespace cbm::algo
fs::path ParamsDir() const { return fParamsDir; } fs::path ParamsDir() const { return fParamsDir; }
const std::string& InputLocator() const { return fInputLocator; } const std::string& InputLocator() const { return fInputLocator; }
fs::path OutputFile() const { return fOutputFile; } fs::path OutputFile() const { return fOutputFile; }
bool SplitOutputPerTS() const { return fSplitOutputPerTS; }
severity_level LogLevel() const { return fLogLevel; } severity_level LogLevel() const { return fLogLevel; }
fs::path LogFile() const { return fLogFile; } fs::path LogFile() const { return fLogFile; }
const std::string& Device() const { return fDevice; } const std::string& Device() const { return fDevice; }
...@@ -43,6 +39,7 @@ namespace cbm::algo ...@@ -43,6 +39,7 @@ namespace cbm::algo
return fNumOMPThreads > 0 ? std::make_optional(fNumOMPThreads) : std::nullopt; return fNumOMPThreads > 0 ? std::make_optional(fNumOMPThreads) : std::nullopt;
} }
const std::string& ChildId() const { return fChildId; } const std::string& ChildId() const { return fChildId; }
bool DumpArchive() const { return fDumpArchive; }
const std::vector<Step>& Steps() const { return fRecoSteps; } const std::vector<Step>& Steps() const { return fRecoSteps; }
bool HasStep(Step step) const { return std::find(fRecoSteps.begin(), fRecoSteps.end(), step) != fRecoSteps.end(); } bool HasStep(Step step) const { return std::find(fRecoSteps.begin(), fRecoSteps.end(), step) != fRecoSteps.end(); }
...@@ -63,11 +60,11 @@ namespace cbm::algo ...@@ -63,11 +60,11 @@ namespace cbm::algo
std::string fParamsDir; // TODO: can we make this a std::path? std::string fParamsDir; // TODO: can we make this a std::path?
std::string fInputLocator; std::string fInputLocator;
std::string fOutputFile; std::string fOutputFile;
bool fSplitOutputPerTS = false;
severity_level fLogLevel; severity_level fLogLevel;
std::string fLogFile; std::string fLogFile;
std::string fDevice; std::string fDevice;
std::string fMonitorUri; std::string fMonitorUri;
bool fDumpArchive = false;
bool fCollectKernelTimes = false; bool fCollectKernelTimes = false;
int fNumTimeslices = -1; int fNumTimeslices = -1;
int fSkipTimeslices = 0; int fSkipTimeslices = 0;
...@@ -76,28 +73,6 @@ namespace cbm::algo ...@@ -76,28 +73,6 @@ namespace cbm::algo
std::vector<RecoData> fOutputTypes; std::vector<RecoData> fOutputTypes;
std::vector<fles::Subsystem> fDetectors; std::vector<fles::Subsystem> fDetectors;
std::string fChildId = "00"; std::string fChildId = "00";
private: // serialization
friend class boost::serialization::access;
/**
 * @brief boost::serialization hook: streams the option members through @p ar.
 *
 * The members are passed to the archive one by one in the fixed order listed below.
 * The version argument is ignored.
 */
template<class Ar>
void serialize(Ar& ar, const unsigned int /*version*/)
{
  // Input / output locations
  ar & fParamsDir;
  ar & fInputLocator;
  ar & fOutputFile;
  ar & fSplitOutputPerTS;

  // Logging, device and monitoring
  ar & fLogLevel;
  ar & fLogFile;
  ar & fDevice;
  ar & fMonitorUri;
  ar & fCollectKernelTimes;

  // Timeslice selection
  ar & fNumTimeslices;
  ar & fSkipTimeslices;

  // Reconstruction configuration
  ar & fRecoSteps;
  ar & fOutputTypes;
  ar & fDetectors;
}
}; };
} // namespace cbm::algo } // namespace cbm::algo
......
/* Copyright (C) 2023 FIAS Frankfurt Institute for Advanced Studies, Frankfurt / Main
SPDX-License-Identifier: GPL-3.0-only
Authors: Felix Weiglhofer [committer] */
#include "Archive.h"
#include "CbmDigiEvent.h"
#include <boost/algorithm/string.hpp>
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/serialization/vector.hpp>
#include <fstream>
#include <iomanip>
#include <sstream>
#include "log.hpp"
using namespace cbm::algo;
namespace ba = boost::archive;
/**
 * @brief Read an Archive object from a file.
 *
 * Opens @p file in binary mode and deserializes its content into this object
 * through a boost binary input archive.
 *
 * @param file Path of the archive file to read.
 */
Archive::Archive(fs::path file)
{
  L_(info) << "Reading archive from " << file;

  std::ifstream input(file.string(), std::ios::binary);
  ba::binary_iarchive reader(input);
  reader >> *this;
}
/**
 * @brief Write this Archive object to a file.
 *
 * The final file name is derived from @p file and @p tsId by makeFileName().
 * The object is then serialized into that file through a boost binary output archive.
 *
 * @param file Requested output path.
 * @param tsId Optional timeslice id that is worked into the file name.
 */
void Archive::Store(fs::path file, std::optional<size_t> tsId) const
{
  const fs::path target = makeFileName(file, tsId);
  L_(info) << "Writing archive to " << target;

  std::ofstream output(target.string(), std::ios::binary);
  ba::binary_oarchive writer(output);
  writer << *this;
}
/**
 * @brief Build the output file name, optionally inserting a timeslice id.
 *
 * Without a timeslice id, @p file is returned unchanged. Otherwise every "%n" in the
 * name is replaced by the id, zero-padded to a width of four characters. If the name
 * has no "%n" wildcard, a warning is logged and "_%n" is appended before substitution.
 *
 * The wildcard handling follows fles::OutputArchiveSequence.
 *
 * @param file Requested output path, may contain the "%n" wildcard.
 * @param tsId Optional timeslice id.
 * @return Path with the wildcard substituted (or @p file itself if @p tsId is empty).
 */
fs::path Archive::makeFileName(fs::path file, std::optional<size_t> tsId) const
{
  if (!tsId.has_value()) { return file; }

  const bool hasWildcard = file.string().find("%n") != std::string::npos;
  if (!hasWildcard) {
    L_(warning)
      << "Archive file name does not contain %n wildcard (timeslice number). Appending id to file name instead.";
    file += "_%n";
  }

  // Zero-padded, four characters wide (e.g. 7 -> "0007")
  std::ostringstream paddedId;
  paddedId << std::setfill('0') << std::setw(4) << tsId.value();

  return boost::replace_all_copy(file.string(), "%n", paddedId.str());
}
/* Copyright (C) 2023 FIAS Frankfurt Institute for Advanced Studies, Frankfurt / Main
SPDX-License-Identifier: GPL-3.0-only
Authors: Felix Weiglhofer [committer] */
#ifndef CBM_ALGO_GLOBAL_ARCHIVE_H
#define CBM_ALGO_GLOBAL_ARCHIVE_H
#include "CbmDigiEvent.h"
#include "MicrosliceDescriptor.hpp"
#include <boost/serialization/access.hpp>
#include <boost/serialization/version.hpp>
#include <optional>
#include <vector>
#include "ArchiveDescriptor.h"
#include "compat/Filesystem.h"
namespace cbm::algo
{
/**
 * @brief Container for reconstruction results that can be written to / read from a file.
 *
 * Holds an ArchiveDescriptor together with a list of CbmDigiEvent objects and is
 * (de)serialized via boost::serialization.
 */
class Archive {

public:
  /**
   * @brief Read Archive object from file.
   * @param file Path of the file to read.
   */
  Archive(fs::path file);

  /**
   * @brief Construct empty archive.
   * @param descriptor Descriptor stored alongside the (initially empty) results.
   */
  Archive(ArchiveDescriptor descriptor) : fDescriptor(descriptor) {}

  ~Archive() = default;

  /**
   * @brief Store Archive object to file.
   * @param file Output path.
   * @param tsId Optional timeslice id, forwarded to the file-name construction.
   */
  void Store(fs::path file, std::optional<size_t> tsId = std::nullopt) const;

  /** @brief Descriptor of this archive. */
  const ArchiveDescriptor& Descriptor() const { return fDescriptor; }

  /** @brief Mutable access to the stored events. */
  std::vector<CbmDigiEvent>& TimesliceResults() { return fTimesliceResults; }

  /** @brief Read-only access to the stored events. */
  const std::vector<CbmDigiEvent>& TimesliceResults() const { return fTimesliceResults; }

private:
  ArchiveDescriptor fDescriptor;

  // TODO: Need a better storage class here, that can also store Hits, ...
  // Will be changed with the rework of storage
  std::vector<CbmDigiEvent> fTimesliceResults;

  friend class boost::serialization::access;

  /** @brief boost::serialization hook: descriptor first, then the events. */
  template<class Ar>
  void serialize(Ar& ar, const unsigned int /*version*/)
  {
    ar & fDescriptor;
    ar & fTimesliceResults;
  }

  /** @brief Derive the final output file name from @p file and the optional @p tsId. */
  fs::path makeFileName(fs::path file, std::optional<size_t> tsId) const;
};
} // namespace cbm::algo
BOOST_CLASS_VERSION(cbm::algo::Archive, 1)
#endif
/* Copyright (C) 2023 FIAS Frankfurt Institute for Advanced Studies, Frankfurt / Main
SPDX-License-Identifier: GPL-3.0-only
Authors: Felix Weiglhofer [committer] */
#ifndef CBM_ALGO_GLOBAL_ARCHIVE_DESCRIPTOR_H
#define CBM_ALGO_GLOBAL_ARCHIVE_DESCRIPTOR_H
#include "System.hpp"
#include <boost/serialization/access.hpp>
#include <boost/serialization/version.hpp>
#include <chrono>
#include <string>
namespace cbm::algo
{
/**
 * @brief Header information of an archive: creation time, host name and user name.
 */
class ArchiveDescriptor {

public:
  /**
   * @brief Create a descriptor for "now".
   *
   * The creation time is taken from std::chrono::system_clock; host and user name are
   * queried via fles::system::current_hostname() / fles::system::current_username().
   */
  ArchiveDescriptor()
    : fTimeCreated(std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()))
    , fHostname(fles::system::current_hostname())
    , fUsername(fles::system::current_username())
  {
  }

  /** @brief Time at which the descriptor was created. */
  std::time_t TimeCreated() const { return fTimeCreated; }

  /** @brief Host name recorded at creation. */
  const std::string& Hostname() const { return fHostname; }

  /** @brief User name recorded at creation. */
  const std::string& Username() const { return fUsername; }

private:
  // members
  std::time_t fTimeCreated = std::time_t();
  std::string fHostname;
  std::string fUsername;

  // serialization
  friend class boost::serialization::access;

  /** @brief boost::serialization hook: creation time, host name, user name (in that order). */
  template<class Ar>
  void serialize(Ar& ar, const unsigned int /*version*/)
  {
    ar & fTimeCreated;
    ar & fHostname;
    ar & fUsername;
  }
};
} // namespace cbm::algo
#endif // CBM_ALGO_GLOBAL_ARCHIVE_DESCRIPTOR_H
/* Copyright (C) 2023 FIAS Frankfurt Institute for Advanced Studies, Frankfurt / Main
SPDX-License-Identifier: GPL-3.0-only
Authors: Felix Weiglhofer [committer] */
#ifndef CBM_ALGO_GLOBAL_RECORESULTSINPUTARCHIVE_H
#define CBM_ALGO_GLOBAL_RECORESULTSINPUTARCHIVE_H
#include <InputArchive.hpp>
#include "StorableRecoResults.h"
namespace cbm::algo
{
/**
 * @brief Input archive type for StorableRecoResults objects.
 *
 * Instantiation of fles::InputArchive with StorableRecoResults for both type
 * parameters and fles::ArchiveType::RecoResultsArchive as the archive type.
 */
using RecoResultsInputArchive =
fles::InputArchive<StorableRecoResults, StorableRecoResults, fles::ArchiveType::RecoResultsArchive>;
} // namespace cbm::algo
#endif // CBM_ALGO_GLOBAL_RECORESULTSINPUTARCHIVE_H
/* Copyright (C) 2023 FIAS Frankfurt Institute for Advanced Studies, Frankfurt / Main
SPDX-License-Identifier: GPL-3.0-only
Authors: Felix Weiglhofer [committer] */
#ifndef CBM_ALGO_GLOBAL_RECO_RESULTS_OUTPUT_ARCHIVE_H
#define CBM_ALGO_GLOBAL_RECO_RESULTS_OUTPUT_ARCHIVE_H
#include <OutputArchive.hpp>
#include "StorableRecoResults.h"
namespace cbm::algo
{
/**
 * @brief Output archive type for StorableRecoResults objects.
 *
 * Instantiation of fles::OutputArchive with StorableRecoResults for both type
 * parameters and fles::ArchiveType::RecoResultsArchive as the archive type.
 */
using RecoResultsOutputArchive =
fles::OutputArchive<StorableRecoResults, StorableRecoResults, fles::ArchiveType::RecoResultsArchive>;
} // namespace cbm::algo
#endif // CBM_ALGO_GLOBAL_RECO_RESULTS_OUTPUT_ARCHIVE_H
/* Copyright (C) 2023 FIAS Frankfurt Institute for Advanced Studies, Frankfurt / Main
SPDX-License-Identifier: GPL-3.0-only
Authors: Felix Weiglhofer [committer] */
#ifndef CBM_ALGO_GLOBAL_STORABLE_RECO_RESULTS_H
#define CBM_ALGO_GLOBAL_STORABLE_RECO_RESULTS_H
#include "CbmDigiEvent.h"
#include <boost/serialization/access.hpp>
#include <boost/serialization/vector.hpp>
#include <cstdint>
namespace cbm::algo
{
/**
 * @brief Serializable reconstruction results of a single timeslice.
 *
 * Stores the timeslice index, the timeslice start time and the digi events found in
 * that timeslice. The class is (de)serialized via boost::serialization.
 */
class StorableRecoResults {

public:
  /**
   * @brief Default constructor (required by boost::serialization)
   *
   * Index and start time are left at UINT64_MAX, the event list is empty.
   */
  StorableRecoResults() = default;

  /**
   * @brief Construct results for a given timeslice with an empty event list.
   * @param tsIndex     Index of the timeslice during the run
   * @param tsStartTime Start time of the timeslice
   */
  StorableRecoResults(uint64_t tsIndex, uint64_t tsStartTime) : fTsIndex(tsIndex), fTsStartTime(tsStartTime) {}

  /**
   * @brief Index of the timeslice during the run
   */
  uint64_t TsIndex() const { return fTsIndex; }

  /**
   * @brief Start time of the timeslice
   */
  uint64_t TsStartTime() const { return fTsStartTime; }

  /**
   * @brief Mutable access to the stored digi events
   */
  std::vector<CbmDigiEvent>& DigiEvents() { return fDigiEvents; }

  /**
   * @brief Read-only access to the stored digi events
   */
  const std::vector<CbmDigiEvent>& DigiEvents() const { return fDigiEvents; }

private:
  uint64_t fTsIndex     = UINT64_MAX;  // UINT64_MAX until set via the two-argument constructor or deserialization
  uint64_t fTsStartTime = UINT64_MAX;  // UINT64_MAX until set via the two-argument constructor or deserialization
  std::vector<CbmDigiEvent> fDigiEvents;

  friend class boost::serialization::access;

  /**
   * @brief boost::serialization hook: index, start time, then the events.
   *
   * The version argument is intentionally unnamed: it is not used, and leaving it
   * named triggers unused-parameter warnings in every instantiation.
   */
  template<class Ar>
  void serialize(Ar& ar, const unsigned int /*version*/)
  {
    ar & fTsIndex;
    ar & fTsStartTime;
    ar & fDigiEvents;
  }
};
} // namespace cbm::algo
#endif // CBM_ALGO_GLOBAL_STORABLE_RECO_RESULTS_H
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
# The included libraries provide the interface to the experiment data in timeslices # The included libraries provide the interface to the experiment data in timeslices
# both online and in timeslice archive (.tsa) files. # both online and in timeslice archive (.tsa) files.
set(FLESNET_VERSION 8a8b7cefd37b18cfd15a3c0c812ff652217e994e) # 2023-07-06 set(FLESNET_VERSION 0529b038b2c2c8d9b82580e0d080a6abc2ef199f) # 2023-07-18
set(FLESNET_SRC_URL "https://github.com/cbm-fles/flesnet") set(FLESNET_SRC_URL "https://github.com/cbm-fles/flesnet")
......
...@@ -10,14 +10,63 @@ ...@@ -10,14 +10,63 @@
#include <xpu/host.h> #include <xpu/host.h>
#include "Archive.h"
#include "BuildInfo.h" #include "BuildInfo.h"
#include "Options.h" #include "Options.h"
#include "Reco.h" #include "Reco.h"
#include "RecoResultsInputArchive.h"
#include "RecoResultsOutputArchive.h"
#include "compat/OpenMP.h" #include "compat/OpenMP.h"
using namespace cbm::algo; using namespace cbm::algo;
/**
 * @brief Convert the reconstruction results of one timeslice into their storable form.
 *
 * @param ts      Timeslice the results belong to; provides index and start time.
 * @param results Reconstruction output; each entry of results.events is converted
 *                with ToStorable().
 * @return Newly allocated StorableRecoResults holding the converted events.
 */
std::shared_ptr<StorableRecoResults> makeStorableRecoResults(const fles::Timeslice& ts, const RecoResults& results)
{
  auto storable = std::make_shared<StorableRecoResults>(ts.index(), ts.start_time());

  auto& digiEvents = storable->DigiEvents();
  digiEvents.reserve(results.events.size());
  for (const auto& event : results.events) {
    digiEvents.emplace_back(event.ToStorable());
  }

  return storable;
}
/**
 * @brief Print the content of a reco results archive to the log, if requested.
 *
 * Does nothing unless opts.DumpArchive() is set. Otherwise the archive given by
 * opts.InputLocator() is opened, its descriptor is logged, and for every stored
 * timeslice a summary line plus the first few events are logged.
 *
 * @param opts Program options.
 * @return true if the archive was dumped, false if dumping was not requested.
 */
bool dumpArchive(const Options& opts)
{
  if (!opts.DumpArchive()) return false;

  // Limit the number of events per timeslice to dump to avoid spamming the log
  constexpr size_t DumpEventsPerTS = 10;

  L_(info) << "Dumping archive: " << opts.InputLocator();

  RecoResultsInputArchive archive(opts.InputLocator());

  auto desc = archive.descriptor();
  L_(info) << "Archive descriptor: ";
  L_(info) << " - time_created: " << desc.time_created();
  L_(info) << " - hostname: " << desc.hostname();
  L_(info) << " - username: " << desc.username();

  auto recoResults = archive.get();
  while (!archive.eos()) {
    // A null result while the stream is not at its end means the read failed
    if (recoResults == nullptr) {
      L_(error) << "Failed to read RecoResults from archive";
      break;
    }

    const auto& events   = recoResults->DigiEvents();
    const size_t nEvents = events.size();
    L_(info) << "TS " << recoResults->TsIndex() << " start: " << recoResults->TsStartTime() << " events: " << nEvents;

    const size_t nToDump = std::min(DumpEventsPerTS, nEvents);
    for (size_t i = 0; i < nToDump; i++) {
      const auto& digiEvent = events.at(i);
      L_(info) << " - Event " << i << " number: " << digiEvent.fNumber << "; time: " << digiEvent.fTime
               << "; nStsDigis: " << digiEvent.fData.Size(ECbmModuleId::kSts);
    }

    // Indicate that further events were omitted
    if (nEvents > DumpEventsPerTS) L_(info) << "...";

    recoResults = archive.get();
  }

  return true;
}
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
Options opts {argc, argv}; Options opts {argc, argv};
...@@ -52,11 +101,19 @@ int main(int argc, char** argv) ...@@ -52,11 +101,19 @@ int main(int argc, char** argv)
} }
L_(info) << ss.str(); L_(info) << ss.str();
if (dumpArchive(opts)) return 0;
Reco reco; Reco reco;
reco.Init(opts); reco.Init(opts);
Archive archive(ArchiveDescriptor {}); // TODO: use opts.Detector() once detector flag is merged
fles::TimesliceAutoSource source {opts.InputLocator()}; fles::TimesliceAutoSource source {opts.InputLocator()};
std::optional<RecoResultsOutputArchive> archive;
if (!opts.OutputFile().empty()) {
L_(info) << "Writing results to file: " << opts.OutputFile();
archive.emplace(opts.OutputFile().string());
}
int tsIdx = 0; int tsIdx = 0;
int num_ts = opts.NumTimeslices(); int num_ts = opts.NumTimeslices();
if (num_ts > 0) { num_ts += opts.SkipTimeslices(); } if (num_ts > 0) { num_ts += opts.SkipTimeslices(); }
...@@ -67,24 +124,17 @@ int main(int argc, char** argv) ...@@ -67,24 +124,17 @@ int main(int argc, char** argv)
} }
auto result = reco.Run(*ts); auto result = reco.Run(*ts);
if (opts.SplitOutputPerTS() && !opts.OutputFile().empty()) { if (archive) {
Archive tsResults(ArchiveDescriptor {}); // TODO: use opts.Detector() once detector flag is merged auto storable = makeStorableRecoResults(*ts, result);
for (auto& event : result.events) { archive->put(storable);
tsResults.TimesliceResults().emplace_back(event.ToStorable());
}
tsResults.Store(opts.OutputFile(), ts->index());
}
else {
for (auto& event : result.events) {
archive.TimesliceResults().emplace_back(event.ToStorable());
}
} }
tsIdx++; tsIdx++;
if (num_ts > 0 && tsIdx >= num_ts) { break; } if (num_ts > 0 && tsIdx >= num_ts) { break; }
} }
if (!opts.SplitOutputPerTS() && !opts.OutputFile().empty()) archive.Store(opts.OutputFile()); if (archive) archive->end_stream();
reco.Finalize(); reco.Finalize();
return 0; return 0;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment