/******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** * ParameterMQServer.cxx * * @since 2015-10-26 * @author M. Al-Turany, A. Rybalchenko */ #include "ParameterMQServer.h" #include "CbmMQDefs.h" #include "CbmSetup.h" #include "FairMQLogger.h" #include "FairMQProgOptions.h" #include "FairParAsciiFileIo.h" #include "FairParGenericSet.h" #include "FairParRootFileIo.h" #include "FairRuntimeDb.h" #include "Rtypes.h" #include "TGeoManager.h" #include "TList.h" #include "TMessage.h" #include "TObjString.h" #include "TSystem.h" using namespace std; ParameterMQServer::ParameterMQServer() : fRtdb(FairRuntimeDb::instance()) { } void ParameterMQServer::InitTask() { string loadLibs = fConfig->GetValue<string>("libs-to-load"); if (loadLibs.length() > 0) { LOG(info) << "There are libraries to load."; if (loadLibs.find(";") != std::string::npos) { LOG(info) << "There are several libraries to load"; istringstream f(loadLibs); string s; while (getline(f, s, ';')) { LOG(info) << "Load library " << s; gSystem->Load(s.c_str()); } } else { LOG(info) << "Load library " << loadLibs; gSystem->Load(loadLibs.c_str()); } } else { LOG(info) << "There are no libraries to load."; } fFirstInputName = fConfig->GetValue<string>("first-input-name"); fFirstInputType = fConfig->GetValue<string>("first-input-type"); fSecondInputName = fConfig->GetValue<string>("second-input-name"); fSecondInputType = fConfig->GetValue<string>("second-input-type"); fOutputName = fConfig->GetValue<string>("output-name"); fOutputType = fConfig->GetValue<string>("output-type"); fChannelName = fConfig->GetValue<string>("channel-name"); fsSetupName = fConfig->GetValue<std::string>("setup"); LOG(info) << "Using setup: " << fsSetupName; if (fRtdb != 0) { // Set first input if (fFirstInputType == "ROOT") { FairParRootFileIo* par1R = new FairParRootFileIo(); par1R->open(fFirstInputName.data(), "UPDATE"); fRtdb->setFirstInput(par1R); } else if (fFirstInputType == "ASCII") { FairParAsciiFileIo* par1A = new FairParAsciiFileIo(); if (fFirstInputName.find(";") != std::string::npos) { LOG(info) << "File list found!"; TList* parFileList = new TList(); TObjString* parFile(NULL); istringstream f(fFirstInputName); string s; while (getline(f, s, ';')) { LOG(info) << "File: " << s; parFile = new TObjString(s.c_str()); parFileList->Add(parFile); par1A->open(parFileList, "in"); } } else { LOG(info) << "Single input file found!"; par1A->open(fFirstInputName.data(), "in"); } fRtdb->setFirstInput(par1A); } // Set second input if (fSecondInputName != "") { if (fSecondInputType == "ROOT") { FairParRootFileIo* par2R = new FairParRootFileIo(); par2R->open(fSecondInputName.data(), "UPDATE"); fRtdb->setSecondInput(par2R); } else if (fSecondInputType == "ASCII") { FairParAsciiFileIo* par2A = new FairParAsciiFileIo(); if (fSecondInputName.find(";") != std::string::npos) { LOG(info) << "File list found!"; TList* parFileList = new TList(); TObjString* parFile(NULL); istringstream f(fSecondInputName); string s; while (getline(f, s, ';')) { LOG(info) << "File: " << s; parFile = new TObjString(s.c_str()); parFileList->Add(parFile); par2A->open(parFileList, "in"); } } else { LOG(info) << "Single input file found!"; par2A->open(fFirstInputName.data(), "in"); } fRtdb->setSecondInput(par2A); } } // Set output if (fOutputName != "") { if (fOutputType == "ROOT") { FairParRootFileIo* parOut = new FairParRootFileIo(kTRUE); parOut->open(fOutputName.data()); fRtdb->setOutput(parOut); } fRtdb->saveOutput(); } } fRtdb->print(); // ----- CbmSetup ----------------------------------------------------- if ("" != fsSetupName) { fSetup = CbmSetup::Instance(); fSetup->LoadSetup(fsSetupName.data()); } // ------------------------------------------------------------------------ } void ParameterMQServer::Run() { string parameterName = ""; FairParGenericSet* par = nullptr; while (cbm::mq::CheckCurrentState(this, cbm::mq::State::Running)) { FairMQMessagePtr req(NewMessage()); if (Receive(req, fChannelName, 0) > 0) { string reqStr(static_cast<char*>(req->GetData()), req->GetSize()); LOG(info) << "Received parameter request from client: \"" << reqStr << "\""; if ("setup" == reqStr) { // TODO: support for multiple setups on Par Server? with request containing setup name? if ("" != fsSetupName && fSetup) { /// Prepare serialized versions of the CbmSetup CbmSetupStorable exchangableSetup(fSetup); TMessage* tmsg = new TMessage(kMESS_OBJECT); tmsg->WriteObject(&exchangableSetup); FairMQMessagePtr rep(NewMessage( tmsg->Buffer(), tmsg->BufferSize(), [](void* /*data*/, void* object) { delete static_cast<TMessage*>(object); }, tmsg)); if (Send(rep, fChannelName, 0) < 0) { LOG(error) << "failed sending reply to Setup request"; break; } } else { LOG(error) << "CbmSetup uninitialized!"; // Send an empty message back to keep the REQ/REP cycle FairMQMessagePtr rep(NewMessage()); if (Send(rep, fChannelName, 0) < 0) { LOG(error) << "failed sending reply to Setup request"; break; } } } else { size_t pos = reqStr.rfind(","); string newParameterName = reqStr.substr(0, pos); int runId = stoi(reqStr.substr(pos + 1)); LOG(info) << "Parameter name: " << newParameterName; LOG(info) << "Run ID: " << runId; LOG(info) << "Retrieving parameter..."; // Check if the parameter name has changed to avoid getting same container repeatedly if (newParameterName != parameterName) { parameterName = newParameterName; par = static_cast<FairParGenericSet*>(fRtdb->getContainer(parameterName.c_str())); } LOG(info) << "Retrieving parameter...Done"; if (-1 != runId) { fRtdb->initContainers(runId); } LOG(info) << "Sending following parameter to the client:"; if (par) { par->print(); TMessage* tmsg = new TMessage(kMESS_OBJECT); tmsg->WriteObject(par); FairMQMessagePtr rep(NewMessage( tmsg->Buffer(), tmsg->BufferSize(), [](void* /*data*/, void* object) { delete static_cast<TMessage*>(object); }, tmsg)); if (Send(rep, fChannelName, 0) < 0) { LOG(error) << "failed sending reply"; break; } } else { LOG(error) << "Parameter uninitialized!"; // Send an empty message back to keep the REQ/REP cycle FairMQMessagePtr rep(NewMessage()); if (Send(rep, fChannelName, 0) < 0) { LOG(error) << "failed sending reply"; break; } } } } } } ParameterMQServer::~ParameterMQServer() { if (gGeoManager) { gGeoManager->GetListOfVolumes()->Delete(); gGeoManager->GetListOfShapes()->Delete(); } delete fRtdb; }