Rakesh Vidyadharan Help

Aggregation Pipeline

Retrieve aggregated information from the database. The document in the payload must include a `specification` array, in which each element describes one stage of the aggregation pipeline (for example `$match`, `$lookup`, `$sort`, ...). The matching documents are returned in a `results` array in the response.

Data model that represents the payload to be submitted to the service for retrieving aggregated information.

// // Created by Rakesh on 19/12/2024. // #pragma once #include "../../options/find.hpp" #include "action.hpp" namespace spt::mongoservice::api::model::request { struct Pipeline { struct Document { struct Stage { Stage( std::string_view command, bsoncxx::types::bson_value::value&& value ) : command{ command }, value{ std::forward<bsoncxx::types::bson_value::value>( value ) } {} Stage( std::string_view command, bsoncxx::document::value&& value ) : command{ command }, value{ std::forward<bsoncxx::document::value>( value ) } {} Stage( std::string_view command, bsoncxx::document::view value ) : command{ command }, value{ value } {} Stage() = default; ~Stage() = default; Stage(Stage&&) = default; Stage& operator=(Stage&&) = default; Stage(const Stage&) = delete; Stage& operator=(const Stage&) = delete; BEGIN_VISITABLES(Stage); std::string command; std::optional<bsoncxx::types::bson_value::value> value; END_VISITABLES; }; Document() = default; ~Document() = default; Document(Document&&) = default; Document& operator=(Document&&) = default; Document(const Document&) = delete; Document& operator=(const Document&) = delete; BEGIN_VISITABLES(Document); VISITABLE(std::vector<Stage>, specification); END_VISITABLES; }; Pipeline() = default; ~Pipeline() = default; Pipeline(Pipeline&&) = default; Pipeline& operator=(Pipeline&&) = default; Pipeline(const Pipeline&) = delete; Pipeline& operator=(const Pipeline&) = delete; void addStage( std::string_view command, bsoncxx::types::bson_value::value&& value ) { document.specification.emplace_back( command, std::forward<bsoncxx::types::bson_value::value>( value ) ); } void addStage( std::string_view command, bsoncxx::document::value&& value ) { document.specification.emplace_back( command, std::forward<bsoncxx::document::value>( value ) ); } template <util::Visitable Model> void addStage( std::string_view command, const Model& model ) { document.specification.emplace_back( command, util::bson( model ) ); } BEGIN_VISITABLES(Pipeline); 
VISITABLE(Document, document); VISITABLE(std::optional<options::Find>, options); VISITABLE(std::string, database); VISITABLE(std::string, collection); VISITABLE(std::string, application); VISITABLE(std::string, correlationId); VISITABLE_DIRECT_INIT(Action, action, {Action::pipeline}); VISITABLE_DIRECT_INIT(bool, skipMetric, {false}); END_VISITABLES; }; }

Data model that represents the payload the service responds with when returning the matching aggregated information.

// // Retrieved by Rakesh on 17/12/2024. // #pragma once #if defined __has_include #if __has_include("../../../common/visit_struct/visit_struct_intrusive.hpp") #include "../../../common/visit_struct/visit_struct_intrusive.hpp" #include "../../../common/util/serialise.hpp" #else #include <mongo-service/common/visit_struct/visit_struct_intrusive.hpp> #include <mongo-service/common/util/serialise.hpp> #endif #endif namespace spt::mongoservice::api::model::response { template <util::Visitable Document> requires std::is_same_v<decltype(Document::id), bsoncxx::oid> struct Retrieve { explicit Retrieve( bsoncxx::document::view document ) { util::unmarshall( *this, document ); } Retrieve() = default; ~Retrieve() = default; Retrieve(Retrieve&&) = default; Retrieve& operator=(Retrieve&&) = default; Retrieve(const Retrieve&) = delete; Retrieve& operator=(const Retrieve&) = delete; BEGIN_VISITABLES(Retrieve); VISITABLE(std::optional<Document>, result); VISITABLE(std::vector<Document>, results); END_VISITABLES; }; }

Sample code illustrating the pipeline action.

#include <mongo-service/api/repository/repository.hpp>
#include <log/NanoLog.hpp>

#include <bsoncxx/builder/stream/document.hpp>
#include <bsoncxx/builder/stream/helpers.hpp>

#include <chrono>
#include <cstdint>
#include <string>

namespace example
{
  /// Sample model used as the document type for the pipeline results. Move-only.
  struct Document
  {
    /// Populate the model from a BSON document via `spt::util::unmarshall`.
    explicit Document( bsoncxx::document::view bson ) { spt::util::unmarshall( *this, bson ); }

    Document() = default;
    ~Document() = default;
    Document(Document&&) = default;
    Document& operator=(Document&&) = default;
    bool operator==(const Document&) const = default;

    Document(const Document&) = delete;
    Document& operator=(const Document&) = delete;

    BEGIN_VISITABLES(Document);
    VISITABLE(bsoncxx::oid, id);
    VISITABLE(std::string, str);
    // Initialised to the current system time, in milliseconds since the epoch.
    VISITABLE_DIRECT_INIT(spt::util::DateTimeMs, created,
        {std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::system_clock::now().time_since_epoch() )});
    VISITABLE_DIRECT_INIT(int64_t, integer, {5});
    VISITABLE_DIRECT_INIT(double, floating, {10.345});
    VISITABLE_DIRECT_INIT(bool, boolean, {true});
    END_VISITABLES;
  };
}

/// Builds a three-stage pipeline against `test.test`, executes it and logs a warning on failure.
int main()
{
  using namespace spt::mongoservice::api;
  // The stream builder names used below when composing the stage documents.
  using bsoncxx::builder::stream::document;
  using bsoncxx::builder::stream::finalize;

  auto pipeline = model::request::Pipeline{};
  pipeline.database = "test";
  pipeline.collection = "test";

  pipeline.addStage( "$match", document{} << "str" << "value modified update" << finalize );
  pipeline.addStage( "$sort", document{} << "_id" << -1 << finalize );
  // A stage whose value is a plain number rather than a document.
  pipeline.addStage( "$limit", 1 );

  pipeline.options = options::Find{};
  pipeline.options->maxTime = std::chrono::milliseconds{ 500 };
  pipeline.options->limit = 10000;

  auto result = repository::pipeline<example::Document>( pipeline );
  if ( !result.has_value() )
  {
    LOG_WARN << "Error retrieving documents using pipeline. " <<
      magic_enum::enum_name( result.error().cause ) << ". " << result.error().message;
  }
}
Last modified: 18 February 2025