/* Copyright (C) 2022 Facility for Antiproton and Ion Research in Europe, Darmstadt
   SPDX-License-Identifier: GPL-3.0-only
   Authors: Dominik Smith [committer], Pierre-Alain Loizeau, Volker Friese */

#include "CbmDevEventSink.h"

// CBM headers
#include "CbmMQDefs.h"

#include "TimesliceMetaData.h"

// FAIRROOT headers
#include "FairMQProgOptions.h"  // device->fConfig
#include "FairRootFileSink.h"
#include "FairRootManager.h"
#include "FairRunOnline.h"

#include "BoostSerializer.h"

#include "RootSerializer.h"

// External packages
#include <boost/archive/binary_iarchive.hpp>
#include <boost/serialization/utility.hpp>

/// C++ headers
#include <thread>  // this_thread::sleep_for

#include <stdexcept>
#include <string>


using std::istringstream;
using std::string;
using std::vector;


struct InitTaskError : std::runtime_error {
  using std::runtime_error::runtime_error;
};


// -----   Destructor   -------------------------------------------------------
CbmDevEventSink::~CbmDevEventSink()
{

  // Close things properly if not already done
  if (!fFinishDone) Finish();

  // Clear and delete members
  if (fTsMetaData) delete fTsMetaData;
  if (fEventVec != nullptr) {
    fEventVec->clear();
    delete fEventVec;
  }
  if (fFairRun) delete fFairRun;
}
// ----------------------------------------------------------------------------


// -----   Initialize   -------------------------------------------------------
void CbmDevEventSink::InitTask()
try {

  // Read options from executable
  LOG(info) << "Init options for CbmDevEventSink";
  string outputFileName       = fConfig->GetValue<std::string>("OutFileName");
  string channelNameDataInput = fConfig->GetValue<std::string>("ChannelNameDataInput");
  string channelNameCommands  = fConfig->GetValue<std::string>("ChannelNameCommands");

  // --- Hook action on input channels
  OnData(channelNameDataInput, &CbmDevEventSink::HandleData);
  OnData(channelNameCommands, &CbmDevEventSink::HandleCommand);

  // --- Prepare ROOT output
  // TODO: WE use FairRunOnline and FairRootManager to manage the output. There might be a more
  // elegant way.
  fTsMetaData = new TimesliceMetaData();
  fEventVec   = new vector<CbmDigiEvent>();
  if ("" != outputFileName) {
    fFairRun     = new FairRunOnline();
    fFairRootMgr = FairRootManager::Instance();
    fFairRootMgr->SetSink(new FairRootFileSink(outputFileName));
    if (nullptr == fFairRootMgr->GetOutFile()) throw InitTaskError("Could not open ROOT file");
  }
  else {
    throw InitTaskError("Empty output filename!");
  }
  fFairRootMgr->InitSink();
  fFairRootMgr->RegisterAny("TimesliceMetaData.", fTsMetaData, kTRUE);
  fFairRootMgr->RegisterAny("DigiEvent", fEventVec, kTRUE);
  fFairRootMgr->WriteFolder();
  LOG(info) << "Init ROOT Output to " << outputFileName;
}
catch (InitTaskError& e) {
  LOG(error) << e.what();
  cbm::mq::ChangeState(this, cbm::mq::Transition::ErrorFound);
}
// ----------------------------------------------------------------------------


// -----   Finish execution   -------------------------------------------------
void CbmDevEventSink::Finish()
{
  fFairRootMgr->Write();
  fFairRootMgr->CloseSink();
  LOG(info) << "File closed after " << fNumMessages << " and saving " << fNumTs << " TS";
  LOG(info) << "Index of last processed timeslice: " << fPrevTsIndex, ChangeState(fair::mq::Transition::Stop);
  std::this_thread::sleep_for(std::chrono::milliseconds(3000));
  ChangeState(fair::mq::Transition::End);
  fFinishDone = true;
}
// ----------------------------------------------------------------------------


// -----   Handle command message   -------------------------------------------
bool CbmDevEventSink::HandleCommand(FairMQMessagePtr& msg, int)
{
  // Deserialize command string
  string command;
  string msgStrCmd(static_cast<char*>(msg->GetData()), msg->GetSize());
  istringstream issCmd(msgStrCmd);
  boost::archive::binary_iarchive inputArchiveCmd(issCmd);
  inputArchiveCmd >> command;

  // Command tag is up to the first blank
  size_t charPosDel = command.find(' ');
  string type       = command.substr(0, charPosDel);

  // EOF command
  if (type == "EOF") {

    // The second substring should be the last timeslice index
    if (charPosDel == string::npos) {
      LOG(error) << "HandleCommand: Incomplete EOF command " << command;
      return false;
    }
    charPosDel++;
    string rest = command.substr(charPosDel);
    charPosDel  = rest.find(' ');
    if (charPosDel == string::npos) {
      LOG(error) << "HandleCommand: Incomplete EOF command " << command;
      return false;
    }
    uint64_t lastTsIndex = std::stoul(rest.substr(0, charPosDel));

    // The third substring should be the timeslice count
    charPosDel++;
    uint64_t numTs = std::stoul(rest.substr(charPosDel));

    // Log
    LOG(info) << "HandleCommand: Received EOF command with final TS index " << lastTsIndex << " and total number of TS "
              << numTs;
    Finish();
  }  //? EOF

  // STOP command
  else if (type == "STOP") {
    LOG(info) << "HandleCommand: Received STOP command";
    Finish();
  }

  // Unknown command
  else {
    LOG(warning) << "HandleCommand: Unknown command " << type << " => will be ignored!";
  }

  return true;
}
// ----------------------------------------------------------------------------


// -----   Handle data in input channel   -------------------------------------
bool CbmDevEventSink::HandleData(FairMQParts& parts, int)
{
  fNumMessages++;
  LOG(debug) << "Received message number " << fNumMessages << " with " << parts.Size() << " parts"
             << ", size0: " << parts.At(0)->GetSize();
  if (0 == fNumMessages % 10000) LOG(info) << "Received " << fNumMessages << " messages";

  // --- Extract TimesliceMetaData (part 0)    TObject* tempObjectPointer = nullptr;
  TObject* tempObjectPointer = nullptr;
  RootSerializer().Deserialize(*parts.At(0), tempObjectPointer);
  if (tempObjectPointer && TString(tempObjectPointer->ClassName()).EqualTo("TimesliceMetaData")) {
    (*fTsMetaData) = *(static_cast<TimesliceMetaData*>(tempObjectPointer));
  }
  else {
    LOG(fatal) << "Failed to deserialize the TS metadata";
  }

  // --- Extract event vector (part 1)
  std::string msgStrEvt(static_cast<char*>(parts.At(1)->GetData()), (parts.At(1))->GetSize());
  std::istringstream issEvt(msgStrEvt);
  boost::archive::binary_iarchive inputArchiveEvt(issEvt);
  inputArchiveEvt >> (*fEventVec);

  // --- Dump tree entry for this timeslice
  fFairRootMgr->StoreWriteoutBufferData(fFairRootMgr->GetEventTime());
  fFairRootMgr->Fill();
  fFairRootMgr->DeleteOldWriteoutBufferData();
  fEventVec->clear();

  // --- Timeslice log
  LOG(info) << "Processed TS " << fTsMetaData->GetIndex() << " with " << fEventVec->size() << " events";

  return true;
}
// ----------------------------------------------------------------------------