Aggregation Pipeline
Retrieve aggregated information from the database. The document
in the payload must include a specification
array of documents which correspond to the match
, lookup
... specifications for the aggregation pipeline operation (stage). The matching documents will be returned in a results
array in the response.
Data models that represents the payload to be submitted to the service for retrieving aggregated information.
//
// Created by Rakesh on 19/12/2024.
//
#pragma once
#include "../../options/find.hpp"
#include "action.hpp"
namespace spt::mongoservice::api::model::request
{
struct Pipeline
{
struct Document
{
struct Stage
{
Stage( std::string_view command, bsoncxx::types::bson_value::value&& value ) :
command{ command }, value{ std::forward<bsoncxx::types::bson_value::value>( value ) } {}
Stage( std::string_view command, bsoncxx::document::value&& value ) :
command{ command }, value{ std::forward<bsoncxx::document::value>( value ) } {}
Stage( std::string_view command, bsoncxx::document::view value ) :
command{ command }, value{ value } {}
Stage() = default;
~Stage() = default;
Stage(Stage&&) = default;
Stage& operator=(Stage&&) = default;
Stage(const Stage&) = delete;
Stage& operator=(const Stage&) = delete;
BEGIN_VISITABLES(Stage);
std::string command;
std::optional<bsoncxx::types::bson_value::value> value;
END_VISITABLES;
};
Document() = default;
~Document() = default;
Document(Document&&) = default;
Document& operator=(Document&&) = default;
Document(const Document&) = delete;
Document& operator=(const Document&) = delete;
BEGIN_VISITABLES(Document);
VISITABLE(std::vector<Stage>, specification);
END_VISITABLES;
};
Pipeline() = default;
~Pipeline() = default;
Pipeline(Pipeline&&) = default;
Pipeline& operator=(Pipeline&&) = default;
Pipeline(const Pipeline&) = delete;
Pipeline& operator=(const Pipeline&) = delete;
void addStage( std::string_view command, bsoncxx::types::bson_value::value&& value )
{
document.specification.emplace_back( command, std::forward<bsoncxx::types::bson_value::value>( value ) );
}
void addStage( std::string_view command, bsoncxx::document::value&& value )
{
document.specification.emplace_back( command, std::forward<bsoncxx::document::value>( value ) );
}
template <util::Visitable Model>
void addStage( std::string_view command, const Model& model )
{
document.specification.emplace_back( command, util::bson( model ) );
}
BEGIN_VISITABLES(Pipeline);
VISITABLE(Document, document);
VISITABLE(std::optional<options::Find>, options);
VISITABLE(std::string, database);
VISITABLE(std::string, collection);
VISITABLE(std::string, application);
VISITABLE(std::string, correlationId);
VISITABLE_DIRECT_INIT(Action, action, {Action::pipeline});
VISITABLE_DIRECT_INIT(bool, skipMetric, {false});
END_VISITABLES;
};
}
Data models that represents the payloads the service responds with when returning matching aggregated information.
//
// Retrieved by Rakesh on 17/12/2024.
//
#pragma once
#if defined __has_include
#if __has_include("../../../common/visit_struct/visit_struct_intrusive.hpp")
#include "../../../common/visit_struct/visit_struct_intrusive.hpp"
#include "../../../common/util/serialise.hpp"
#else
#include <mongo-service/common/visit_struct/visit_struct_intrusive.hpp>
#include <mongo-service/common/util/serialise.hpp>
#endif
#endif
namespace spt::mongoservice::api::model::response
{
template <util::Visitable Document>
requires std::is_same_v<decltype(Document::id), bsoncxx::oid>
struct Retrieve
{
explicit Retrieve( bsoncxx::document::view document ) { util::unmarshall( *this, document ); }
Retrieve() = default;
~Retrieve() = default;
Retrieve(Retrieve&&) = default;
Retrieve& operator=(Retrieve&&) = default;
Retrieve(const Retrieve&) = delete;
Retrieve& operator=(const Retrieve&) = delete;
BEGIN_VISITABLES(Retrieve);
VISITABLE(std::optional<Document>, result);
VISITABLE(std::vector<Document>, results);
END_VISITABLES;
};
}
Sample code illustrating the pipeline action.
#include <mongo-service/api/repository/repository.hpp>
#include <log/NanoLog.hpp>
namespace example
{
struct Document
{
explicit Document( bsoncxx::document::view bson ) { spt::util::unmarshall( *this, bson ); }
Document() = default;
~Document() = default;
Document(Document&&) = default;
Document& operator=(Document&&) = default;
bool operator==(const Document&) const = default;
Document(const Document&) = delete;
Document& operator=(const Document&) = delete;
BEGIN_VISITABLES(Document);
VISITABLE(bsoncxx::oid, id);
VISITABLE(std::string, str);
VISITABLE_DIRECT_INIT(spt::util::DateTimeMs, created, {std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::system_clock::now().time_since_epoch() )});
VISITABLE_DIRECT_INIT(int64_t, integer, {5});
VISITABLE_DIRECT_INIT(double, floating, {10.345});
VISITABLE_DIRECT_INIT(bool, boolean, {true});
END_VISITABLES;
};
}
int main()
{
using namespace spt::mongoservice::api;
using bsoncxx::builder::stream::array;
using bsoncxx::builder::stream::open_array;
using bsoncxx::builder::stream::close_array;
using bsoncxx::builder::stream::open_document;
using bsoncxx::builder::stream::close_document;
auto pipeline = model::request::Pipeline{};
pipeline.database = "test";
pipeline.collection = "test";
pipeline.addStage( "$match", document{} << "str" << "value modified update" << finalize );
pipeline.addStage( "$sort", document{} << "_id" << -1 << finalize );
pipeline.addStage( "$limit", 1 );
pipeline.options = options::Find{};
pipeline.options->maxTime = std::chrono::milliseconds{ 500 };
pipeline.options->limit = 10000;
auto result = repository::pipeline<example::Document>( pipeline );
if ( !result.has_value() )
{
LOG_WARN << "Error retrieving documents using pipeline. " << magic_enum::enum_name( result.error().cause ) << ". " << result.error().message;
}
}
Last modified: 18 February 2025