Rakesh Vidyadharan Help

Application Performance Management

A variety of products, both commercial and opensource are available for Application Performance Management (APM). Most of the available products involve users instrumenting source code of applications to be managed. The instrumentation results in data being published to services provided by the service vendor.

A customer needed a solution which did not involve publishing management data to an external service. They also did not want an expensive commercial solution. After researching available options, I decided to roll out a very simple framework that used existing infrastructure (mainly in terms of database). The basic task of instrumenting source code remains unchanged in scope.

Infrastructure

The solution I developed stored APM data into QuestDB which the customer already used as their Time Series Database (TSDB). The primary mode of bulk data ingestion into QuestDB is via the Influx Line Protocol (ILP).

Implementation

A simple data structure was developed that matches the ILP format. Applications can be instrumented as desired using the simple data structure, and published to QuestDB over ILP.

Note: Most existing APM solutions use or support instrumentation using OpenTelemetry. However, I decided against using it and developed the simple data structure to make the task of ingestion into QuestDB seamless. This does limit options for future migration into another database/storage engine.

APM Record

The following structure is used to capture APM records. A parent APM record is created at the start of a business process (eg. HTTP request processing cycle, an operation that is performed by a daemon process etc.). Additional process records are added to the parent as appropriate to capture the functions that are invoked as part of the process implementation. The full record can then be serialised over ILP to a supported database.

// // Created by Rakesh on 07/03/2025. // #pragma once #include <chrono> #include <format> #include <map> #include <source_location> #include <string> #include <string_view> #include <variant> #include <vector> namespace spt::ilp { struct APMRecord { using DateTime = std::chrono::time_point<std::chrono::high_resolution_clock, std::chrono::nanoseconds>; using Value = std::variant<bool, int64_t, uint64_t, double, std::string>; struct Process { enum class Type : std::uint8_t { Function, Step, Other }; explicit Process( Type type ) : type( type ) {} Process() = default; ~Process() = default; Process( Process&& ) = default; Process& operator=( Process&& ) = default; Process( const Process& ) = delete; Process& operator=( const Process& ) = delete; std::map<std::string, std::string, std::less<>> tags; std::map<std::string, Value, std::less<>> values; DateTime timestamp{ std::chrono::high_resolution_clock::now() }; std::chrono::nanoseconds duration{ 0 }; Type type{ Type::Other }; }; explicit APMRecord( std::string_view id ) : id{ id } {} ~APMRecord() = default; APMRecord( APMRecord&& ) = default; APMRecord& operator=( APMRecord&& ) = default; APMRecord( const APMRecord& ) = delete; APMRecord& operator=( const APMRecord& ) = delete; std::vector<Process> processes; std::map<std::string, std::string, std::less<>> tags; std::map<std::string, Value, std::less<>> values; std::string id; std::string application; DateTime timestamp{ std::chrono::high_resolution_clock::now() }; std::chrono::nanoseconds duration{ 0 }; }; template <typename T> concept Record = requires( T t ) { std::is_same_v<decltype(t.timestamp), std::chrono::time_point<std::chrono::high_resolution_clock, std::chrono::nanoseconds>>; std::is_same_v<decltype(t.duration), std::chrono::nanoseconds>; }; APMRecord createAPMRecord( std::string_view id, std::string_view application, APMRecord::Process::Type type, std::size_t size, std::source_location loc = std::source_location::current() ); APMRecord::Process& addProcess( APMRecord& apm, APMRecord::Process::Type type, std::source_location loc = std::source_location::current() ); APMRecord::Process& addException( APMRecord& apm, const std::exception& ex, std::string_view prefix, std::source_location loc = std::source_location::current() ); template <Record T> void setDuration( T& record ) { record.duration = std::chrono::duration_cast<std::chrono::nanoseconds>( std::chrono::high_resolution_clock::now() - record.timestamp ); } template <Record T> void addCurrentFunction( T& record, const std::source_location loc = std::source_location::current(), std::string_view tagName = "caller" ) { record.values.try_emplace( std::format( "{}_file", tagName ), loc.file_name() ); record.values.try_emplace( std::format( "{}_line", tagName ), static_cast<uint64_t>( loc.line() ) ); if ( auto fn = std::string{ loc.function_name() }; !fn.empty() ) record.values.try_emplace( std::format( "{}_function", tagName ), std::move( fn ) ); } }
// // Created by Rakesh on 17/03/2025. // #include "apmrecord.hpp" using spt::ilp::APMRecord; APMRecord spt::ilp::createAPMRecord( std::string_view id, std::string_view application, APMRecord::Process::Type type, std::size_t size, const std::source_location loc ) { auto apm = APMRecord{ id }; apm.application = application; apm.processes.reserve( size ); apm.processes.emplace_back( type ); { auto& p = apm.processes.back(); p.values.try_emplace( "file", loc.file_name() ); p.values.try_emplace( "line", static_cast<uint64_t>( loc.line() ) ); if ( auto fn = std::string{ loc.function_name() }; !fn.empty() ) p.values.try_emplace( "function", std::move( fn ) ); } return apm; } APMRecord::Process& spt::ilp::addProcess( APMRecord& apm, APMRecord::Process::Type type, const std::source_location loc ) { apm.processes.emplace_back( type ); auto& p = apm.processes.back(); p.values.try_emplace( "file", loc.file_name() ); p.values.try_emplace( "line", static_cast<uint64_t>( loc.line() ) ); if ( auto fn = std::string{ loc.function_name() }; !fn.empty() ) p.values.try_emplace( "function", std::move( fn ) ); return p; } APMRecord::Process& spt::ilp::addException( APMRecord& apm, const std::exception& ex, std::string_view prefix, const std::source_location loc ) { apm.processes.emplace_back( APMRecord::Process::Type::Step ); auto& p = apm.processes.back(); p.values.try_emplace( "file", loc.file_name() ); p.values.try_emplace( "line", static_cast<uint64_t>( loc.line() ) ); if ( auto fn = std::string{ loc.function_name() }; !fn.empty() ) { p.values.try_emplace( "function", fn ); p.tags.try_emplace( "parent", std::move( fn ) ); } p.values.try_emplace( "error", std::format( "{}. {}", prefix, ex.what() ) ); return p; }

Utility functions to (de)serialise the full APM record from/to BSON. Note that we add a _ttl field that is used as a **TTL** index in MongoDB.

// // Created by Rakesh on 6/4/21. // #include <charconv> #include <bsoncxx/builder/stream/document.hpp> #include <http2/framework/common.hpp> #include <ilp/apmrecord.hpp> #include <log/NanoLog.hpp> #include <magic_enum/magic_enum.hpp> namespace spt::ilp::bson { template<class... Ts> struct overload : Ts... { using Ts::operator()...; }; void serialise( const std::map<std::string, spt::ilp::APMRecord::Value, std::less<>>& map, bsoncxx::builder::stream::document& builder, std::string_view property ) { using bsoncxx::builder::stream::document; using bsoncxx::builder::stream::finalize; auto values = document{}; for ( const auto& pair : map ) { std::visit( overload { [&values, &pair]( bool v ){ values << pair.first << v; }, [&values, &pair]( int64_t v ){ values << pair.first << v; }, [&values, &pair]( uint64_t v ){ values << pair.first << static_cast<int64_t>( v ); }, [&values, &pair]( double v ){ values << pair.first << v; }, [&values, &pair]( const std::string& v ){ values << pair.first << v; }, }, pair.second ); } builder << property << (values << finalize); } bsoncxx::document::value serialise( const spt::ilp::APMRecord::Process& process ) { using bsoncxx::builder::stream::document; using bsoncxx::builder::stream::open_document; using bsoncxx::builder::stream::close_document; using bsoncxx::builder::stream::finalize; auto doc = document{}; doc << "type" << magic_enum::enum_name( process.type ) << "timestamp" << bsoncxx::types::b_int64{ std::chrono::duration_cast<std::chrono::nanoseconds>( process.timestamp.time_since_epoch() ).count() } << "duration" << bsoncxx::types::b_int64{ apm.duration.count() } << "_ttl" << bsoncxx::types::b_date{ std::chrono::system_clock::now() }; auto tags = document{}; for ( const auto& [key, value] : process.tags ) tags << key << value; doc << "tags" << (tags << finalize); serialise( process.values, doc, "values" ); return doc << finalize; } bsoncxx::document::value serialise( const spt::ilp::APMRecord& apm ) { using bsoncxx::builder::stream::array; using bsoncxx::builder::stream::document; using bsoncxx::builder::stream::finalize; auto doc = document{}; doc << "_id" << bsoncxx::oid{ apm.id } << "application" << apm.application << "timestamp" << bsoncxx::types::b_int64{ std::chrono::duration_cast<std::chrono::nanoseconds>( apm.timestamp.time_since_epoch() ).count() } << "duration" << bsoncxx::types::b_int64{ apm.duration.count() }; auto tags = document{}; for ( const auto& [key, value] : apm.tags ) tags << key << value; doc << "tags" << (tags << finalize); serialise( apm.values, doc, "values" ); auto arr = array{}; for ( const auto& proc : apm.processes ) arr << serialise( proc ); doc << "processes" << (arr << finalize); return doc << finalize; } template <typename Record> void parse( Record& r, bsoncxx::document::view bson ) { using spt::util::bsonValueIfExists; if ( auto tags = bsonValueIfExists<bsoncxx::document::view>( "tags", bson ); tags ) { for ( const auto& elem : *tags ) { r.tags.try_emplace( std::string{ elem.key() }, std::string{ elem.get_string().value } ); } } if ( auto values = bsonValueIfExists<bsoncxx::document::view>( "values", bson ); values ) { for ( const auto& elem : *values ) { switch ( elem.type() ) { using enum bsoncxx::type; case k_bool: r.values.try_emplace( std::string{ elem.key() }, elem.get_bool().value ); break; case k_int64: r.values.try_emplace( std::string{ elem.key() }, elem.get_int64().value ); break; case k_double: r.values.try_emplace( std::string{ elem.key() }, elem.get_double().value ); break; case k_string: r.values.try_emplace( std::string{ elem.key() }, std::string{ elem.get_string().value } ); break; default: LOG_CRIT << "Invalid type " << bsoncxx::to_string( elem.type() ) << " for key " << elem.key() << " in apm record"; break; } } } } spt::ilp::APMRecord::Process parseProcess( bsoncxx::document::view bson ) { using spt::util::bsonValue; using spt::util::bsonValueIfExists; auto p = spt::ilp::APMRecord::Process(); if ( auto t = magic_enum::enum_cast<spt::ilp::APMRecord::Process::Type>( bsonValue<std::string>( "type", bson ) ); t ) p.type = *t; if ( auto v = bsonValueIfExists<std::chrono::nanoseconds>( "timestamp", bson ); v ) p.timestamp = spt::ilp::APMRecord::DateTime{ *v }; if ( auto v = bsonValueIfExists<std::chrono::nanoseconds>( "duration", bson ); v ) p.duration = *v; parse( p, bson ); return p; } spt::ilp::APMRecord parse( bsoncxx::document::view bson ) { using spt::util::bsonValue; using spt::util::bsonValueIfExists; auto apm = spt::ilp::APMRecord{ bsonValue<bsoncxx::oid>( "_id", bson ).to_string() }; if ( auto str = bsonValueIfExists<std::string>( "application", bson ); str ) apm.application = *str; if ( auto v = bsonValueIfExists<std::chrono::nanoseconds>( "timestamp", bson ); v ) apm.timestamp = spt::ilp::APMRecord::DateTime{ *v }; if ( auto v = bsonValueIfExists<std::chrono::nanoseconds>( "duration", bson ); v ) apm.duration = *v; parse( apm, bson ); if ( auto procs = bsonValueIfExists<bsoncxx::array::view>( "processes", bson ); procs ) { apm.processes.reserve( std::distance( procs->begin(), procs->end() ) ); for ( const auto& item : *procs ) { apm.processes.emplace_back( parseProcess( item.get_document().value ) ); } } return apm; } }

Instrument Code

The toughest part of capturing APM data lies in instrumenting source code to capture APM data at the desired level of verbosity. There is obviously a trade-off in terms of the effort required to instrument source code, as well as the performance degradation involved in capturing APM data. Another big side effect of instrumenting source code is that the code base becomes littered with all the APM instrumentation instructions (macros despite being discouraged help a lot to reduce clutter).

Template functions for HTTP request handling introduced earlier in REST API Framework updated with instrumentation. The additional wrapper lines of code around key instruction and calls to other functions (internal and external) can be noticed.

// // Created by Rakesh on 15/02/2021. // #pragma once #include "common.hpp" #include "handlers.hpp" #include "db/repository.hpp" #include "db/storage.hpp" #include "db/filter/between.hpp" #include "model/entities.hpp" #include "model/entitiesquery.hpp" #include "model/paginate.hpp" #include "util/config.hpp" #include "validate/validate.hpp" #include <sstream> #include <bsoncxx/exception/exception.hpp> #include <log/NanoLog.hpp> #include <http2/framework/compress.hpp> #include <mongo-service/common/util/bson.hpp> #include <mongo-service/common/util/date.hpp> namespace spt::http { using std::operator""s; using std::operator""sv; using model::Model; namespace ptemplate { template <Model M> bool canRead( const M& m, const model::JwtToken& token ) { if ( !hasEntityAccess( m.EntityType(), token ) ) return false; if ( token.user.role == model::Role::superuser ) return true; if ( !m.customer ) return false; return m.customer.code == token.user.customerCode; } } template <Model M, typename AuthFunction> auto create( const spt::http2::framework::Request& req, std::string_view payload, std::span<const std::string> methods, spt::ilp::APMRecord& apm, AuthFunction&& authfn, bool skipVersion = false ) -> Response { LOG_INFO << "Handling POST request for " << req.path << ". APM id: " << apm.id; auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); cp.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, std::format( "Create entity {}", M::EntityType() ) ); DEFER( spt::ilp::setDuration( cp ) ); auto idx = apm.processes.size(); if ( payload.empty() ) { LOG_WARN << "Request to " << req.path << " did not include payload. APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Empty payload" ); return error( 400, "No payload"sv, methods, req.header, apm ); } try { WRAP_CODE_LINE( auto [astatus, jwt] = authorise( req, apm ); ) if ( astatus != 200 || !jwt ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Authorisation failed" ); return error( astatus, methods, req.header, apm ); } if ( !authfn( M::EntityType(), *jwt ) ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "User role not authorised" ); return error( 401, methods, req.header, apm ); } auto& parse = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); DEFER( spt::ilp::setDuration( parse ) ); spt::ilp::addCurrentFunction( parse ); parse.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, "Parse JSON payload" ); auto m = M{ payload }; spt::ilp::setDuration( parse ); if ( m.id != model::DEFAULT_OID ) { LOG_WARN << "Create entity payload included id. " << payload << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Id specified" ); return error( 400, "Cannot specify id"sv, methods, req.header, apm ); } WRAP_CODE_LINE( auto [vstatus, vmesg] = validate::validate( m, *jwt, apm ); ) if ( vstatus != 200 ) { LOG_WARN << "Validation failed. " << vmesg << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, model::ilp::value::APM_VALIDATION_ERROR ); return error( vstatus, vmesg, methods, req.header, apm ); } if ( m.id == model::DEFAULT_OID ) m.id = bsoncxx::oid{}; cp.values.try_emplace( "entity_id", m.id.to_string() ); WRAP_CODE_LINE( auto status = db::create( m, apm, skipVersion ); ) if ( status != 200 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Database error" ); return error( status, "Error creating entity"sv, methods, req.header, apm ); } LOG_INFO << "Writing response for " << req.path << ". APM id: " << apm.id; auto resp = Response{ req.header }; resp.jwt = jwt; WRAP_CODE_LINE( output( req, resp, m, apm ); ) resp.correlationId = correlationId( req ); resp.set( methods, Response::origins() ); resp.entity = M::EntityType(); return resp; } catch ( const simdjson::simdjson_error& e ) { LOG_WARN << "JSON parse error processing " << req.path << ". " << e.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, e, "JSON exception" ); return error( 400, "Error parsing payload"sv, methods, req.header, apm ); } catch ( const bsoncxx::exception& b ) { LOG_WARN << "BSON error processing " << req.path << ". " << b.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, b, "BSON exception" ); return error( 417, "Error creating entity"sv, methods, req.header, apm ); } catch ( const std::exception& ex ) { LOG_WARN << "Error processing request " << req.path << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); } catch ( ... ) { LOG_WARN << "Unexpected error processing request " << req.path << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return error( 500, "Internal server error"sv, methods, req.header, apm ); } template <Model M, typename AuthFunction> auto at( const spt::http2::framework::Request& req, std::string_view timestamp, std::string_view payload, std::span<const std::string> methods, spt::ilp::APMRecord& apm, AuthFunction&& authfn, bool skipVersion = false ) -> Response { LOG_INFO << "Handling POST request for " << req.path << ". APM id: " << apm.id; auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); cp.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, std::format( "Create entity {} at time", M::EntityType() ) ); DEFER( spt::ilp::setDuration( cp ) ); auto idx = apm.processes.size(); if ( payload.empty() ) { LOG_WARN << "Request to " << req.path << " did not include payload. APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Empty payload" ); return error( 400, "No payload"sv, methods, req.header, apm ); } if ( timestamp.empty() ) { LOG_WARN << "Request to " << req.path << " did not include timestamp. APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Timestamp not in payload" ); return error( 400, "No timestamp"sv, methods, req.header, apm ); } try { spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); apm.processes.back().values.try_emplace( model::ilp::name::APM_STEP_PROCESS, "Parse timestamp" ); spt::ilp::addCurrentFunction( apm.processes.back() ); auto ts = spt::util::parseISO8601( timestamp ); spt::ilp::setDuration( apm.processes.back() ); if ( !ts.has_value() ) { LOG_WARN << "Invalid timestamp: " << timestamp << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid timestamp in payload" ); return error( 400, "Invalid timestamp"sv, methods, req.header, apm ); } WRAP_CODE_LINE( auto [astatus, jwt] = authorise( req, apm ); ) if ( astatus != 200 || !jwt ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid bearer token" ); return error( astatus, methods, req.header, apm ); } if ( !authfn( M::EntityType(), *jwt ) ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "User role not authorised" ); return error( 401, methods, req.header, apm ); } auto& parse = addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); spt::ilp::addCurrentFunction( parse ); parse.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, "Parse JSON payload" ); DEFER( spt::ilp::setDuration( parse ) ); auto m = M{ payload }; spt::ilp::setDuration( apm ); if ( m.id != model::DEFAULT_OID ) { LOG_WARN << "Create entity payload included id. " << payload << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Payload included id" ); return error( 400, "Cannot specify id"sv, methods, req.header, apm ); } WRAP_CODE_LINE( auto [vstatus, vmesg] = validate::validate( m, *jwt, apm ); ) if ( vstatus != 200 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Validation failed" ); return error( vstatus, vmesg, methods, req.header, apm ); } { spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); apm.processes.back().values.try_emplace( model::ilp::name::APM_STEP_PROCESS, "Generate id" ); spt::ilp::addCurrentFunction( apm.processes.back() ); m.id = spt::util::generateId( *ts, bsoncxx::oid{} ); auto count = 0; while ( count < 10 ) { if ( const auto [_, opt] = db::retrieve<M>( m.id, ""sv ); opt ) { m.id = spt::util::generateId( *ts, bsoncxx::oid{} ); ++count; } else break; } spt::ilp::setDuration( apm.processes.back() ); } cp.values.try_emplace( "entity_id", m.id.to_string() ); m.metadata.created = *ts; WRAP_CODE_LINE( auto status = db::create( m, apm, skipVersion ); ) if ( status != 200 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Error creating entity" ); return error( status, "Error creating entity"sv, methods, req.header, apm ); } LOG_INFO << "Writing response for " << req.path << ". APM id: " << apm.id; auto resp = Response{ req.header }; resp.jwt = jwt; WRAP_CODE_LINE( output( req, resp, m, apm ); ) resp.correlationId = correlationId( req ); resp.set( methods, Response::origins() ); resp.entity = M::EntityType(); return resp; } catch ( const simdjson::simdjson_error& e ) { LOG_WARN << "JSON parse error processing " << req.path << ". " << e.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, e, "JSON exception" ); return error( 400, "Error parsing payload"sv, methods, req.header, apm ); } catch ( const bsoncxx::exception& b ) { LOG_WARN << "BSON error processing " << req.path << ". " << b.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, b, "BSON exception" ); return error( 417, "Error creating entity"sv, methods, req.header, apm ); } catch ( const std::exception& ex ) { LOG_WARN << "Error processing request " << req.path << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); } catch ( ... ) { LOG_WARN << "Unexpected error processing request " << req.path << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return error( 500, "Internal server error"sv, methods, req.header, apm ); } template <Model M, typename AuthFunction> auto update( const spt::http2::framework::Request& req, std::string_view payload, std::string_view entityId, std::span<const std::string> methods, spt::ilp::APMRecord& apm, AuthFunction&& authfn ) -> Response { LOG_INFO << "Handling PUT request for " << req.path << ". APM id: " << apm.id; auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); cp.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, std::format( "Update entity {}", M::EntityType() ) ); DEFER( spt::ilp::setDuration( cp ) ); cp.values.try_emplace( "entity_id", std::string{ entityId } ); auto idx = apm.processes.size(); if ( payload.empty() ) { LOG_WARN << "Request to " << req.path << " did not include payload. APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Empty payload" ); return error( 400, "No payload"sv, methods, req.header, apm ); } try { WRAP_CODE_LINE( auto [astatus, jwt] = authorise( req, apm ); ) if ( astatus != 200 || !jwt ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid bearer token" ); return error( astatus, methods, req.header, apm ); } if ( !authfn( M::EntityType(), *jwt ) ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "User role not authorised" ); return error( 401, methods, req.header, apm ); } auto& parse = addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); spt::ilp::addCurrentFunction( parse ); parse.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, "Parse JSON payload" ); DEFER( spt::ilp::setDuration( parse ) ); auto m = M{ payload }; spt::ilp::setDuration( parse ); if ( m.id.to_string() != entityId ) { LOG_WARN << "Update entity id " << m.id << " not same as path " << entityId << ". " << spt::util::json::str( m ) << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Mis-matching entity id" ); return error( 400, "Incorrect id"sv, methods, req.header, apm ); } WRAP_CODE_LINE( const auto [vstatus, vmesg] = validate::validate( m, *jwt, apm ); ) if ( vstatus != 200 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Validation failed" ); return error( vstatus, vmesg, methods, req.header, apm ); } std::optional<bsoncxx::oid> restoredFrom = std::nullopt; if ( auto iter = req.header.find( "x-wp-restored-from" ); iter != req.header.end() ) restoredFrom = spt::util::parseId( iter->second.value ); WRAP_CODE_LINE( const auto status = db::update( m, jwt->user.role == model::Role::superuser ? ""sv : jwt->user.customerCode, apm, false, restoredFrom ); ) if ( status != 200 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Database error" ); return error( status, "Error updating entity"sv, methods, req.header, apm ); } if ( const auto p = model::pipeline<M>(); !p.empty() ) { WRAP_CODE_LINE( auto [mstatus, mopt] = db::retrieve<M>( m.id, m.customer.code, apm ); ) if ( mopt ) m = std::move( *mopt ); } LOG_INFO << "Writing response for " << req.path << ". APM id: " << apm.id; auto resp = Response{ req.header }; resp.jwt = jwt; WRAP_CODE_LINE( output( req, resp, m, apm ); ) resp.correlationId = correlationId( req ); resp.set( methods, Response::origins() ); resp.entity = M::EntityType(); return resp; } catch ( const simdjson::simdjson_error& e ) { LOG_WARN << "JSON parse error processing " << req.path << ". " << e.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, e, "JSON exception" ); return error( 400, "Error parsing payload"sv, methods, req.header, apm ); } catch ( const bsoncxx::exception& b ) { LOG_WARN << "BSON error processing " << req.path << ". " << b.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, b, "BSON exception" ); return error( 417, "Error creating entity"sv, methods, req.header, apm ); } catch ( const std::exception& ex ) { LOG_WARN << "Error processing request " << req.path << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); } catch ( ... ) { LOG_WARN << "Unexpected error processing request " << req.path << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return error( 500, "Internal server error"sv, methods, req.header, apm ); } template <Model M, typename AuthFunction> auto get( const spt::http2::framework::Request& req, std::string_view entityId, std::span<const std::string> methods, spt::ilp::APMRecord& apm, AuthFunction&& authfn ) -> Response { LOG_INFO << "Handling GET request for " << req.path << ". APM id: " << apm.id; auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( cp ) ); auto idx = apm.processes.size(); try { WRAP_CODE_LINE( auto [astatus, jwt] = authorise( req, apm ); ) if ( astatus != 200 || !jwt ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid bearer token" ); return error( astatus, methods, req.header, apm ); } if ( !authfn( M::EntityType(), *jwt ) ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "User role not authorised" ); return error( 401, methods, req.header, apm ); } const auto id = spt::util::parseId( entityId ); if ( !id ) { LOG_INFO << "Rejecting request for " << req.path << " with invalid id " << entityId << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid id" ); return error( 400, methods, req.header, apm ); } WRAP_CODE_LINE( auto [mstatus, m] = db::retrieve<M>( *id, jwt->user.role == model::Role::superuser ? ""sv : jwt->user.customerCode, apm ); ) if ( mstatus != 200 && mstatus != 404 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Database error" ); return error( mstatus, "Error retrieving entity"sv, methods, req.header, apm ); } if ( !m ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Entity not found" ); return error( 404, methods, req.header, apm ); } if ( !ptemplate::canRead( *m, *jwt ) ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "User cannot access other customer" ); return error( 403, "User not allowed to view entity belonging to other customer"sv, methods, req.header, apm ); } LOG_INFO << "Writing response for " << req.path << ". APM id: " << apm.id; auto resp = Response{ req.header }; resp.jwt = jwt; WRAP_CODE_LINE( output( req, resp, *m, apm ); ) resp.correlationId = correlationId( req ); resp.set( methods, Response::origins() ); resp.entity = M::EntityType(); return resp; } catch ( const bsoncxx::exception& b ) { LOG_WARN << "BSON error processing " << req.path << ". " << b.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, b, "BSON exception" ); return error( 417, "Error creating entity"sv, methods, req.header, apm ); } catch ( const std::exception& ex ) { LOG_WARN << "Error processing request " << req.path << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); } catch ( ... ) { LOG_WARN << "Unexpected error processing request " << req.path << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return error( 500, "Internal server error"sv, methods, req.header, apm ); } template <Model M, typename ValueType, typename AuthFunction> auto get( const spt::http2::framework::Request& req, std::string_view property, ValueType value, std::span<const std::string> methods, spt::ilp::APMRecord& apm, AuthFunction&& authfn, bool caseInsensitive = false ) -> Response { LOG_INFO << "Handling GET request for " << req.path << ". APM id: " << apm.id; auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); cp.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, std::format( "Retrieve entity {} by property {}", M::EntityType(), property ) ); DEFER( spt::ilp::setDuration( cp ) ); auto idx = apm.processes.size(); try { WRAP_CODE_LINE( auto [astatus, jwt] = authorise( req, apm ); ) if ( astatus != 200 || !jwt ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid bearer token" ); return error( astatus, methods, req.header, apm ); } if ( !authfn( M::EntityType(), *jwt ) ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "User role not authorised" ); return error( 401, methods, req.header, apm ); } WRAP_CODE_LINE( auto [mstatus, m] = db::retrieve<M,ValueType>( property, value, jwt->user.role == model::Role::superuser ? ""sv : jwt->user.customerCode, apm, caseInsensitive ); ) if ( mstatus != 200 && mstatus != 404 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Error retrieving entity" ); return error( mstatus, "Error retrieving entity"sv, methods, req.header, apm ); } if ( !m ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Database error" ); return error( 404, methods, req.header, apm ); } if ( !ptemplate::canRead( *m, *jwt ) ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "User cannot access other customer" ); return error( 403, "User not allowed to view entity belonging to other customer"sv, methods, req.header, apm ); } LOG_INFO << "Writing response for " << req.path << ". APM id: " << apm.id; auto resp = Response{ req.header }; resp.jwt = jwt; WRAP_CODE_LINE( output( req, resp, *m, apm ); ) resp.correlationId = correlationId( req ); resp.set( methods, Response::origins() ); resp.entity = M::EntityType(); return resp; } catch ( const bsoncxx::exception& b ) { LOG_WARN << "BSON error processing " << req.path << ". " << b.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, b, "BSON exception" ); return error( 417, "Error creating entity"sv, methods, req.header, apm ); } catch ( const std::exception& ex ) { LOG_WARN << "Error processing request " << req.path << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); } catch ( ... ) { LOG_WARN << "Unexpected error processing request " << req.path << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return error( 500, "Internal server error"sv, methods, req.header, apm ); } template <Model M, typename ValueType> auto getForCustomer( const spt::http2::framework::Request& req, std::string_view property, ValueType value, std::string_view customerCode, std::span<const std::string>& methods, spt::ilp::APMRecord& apm ) -> Response { LOG_INFO << "Handling GET request for " << req.path << ". APM id: " << apm.id; auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); cp.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, std::format( "Retrieve entity {} by property {}", M::EntityType(), property ) ); DEFER( spt::ilp::setDuration( cp ) ); auto idx = apm.processes.size(); try { WRAP_CODE_LINE( auto [astatus, jwt] = authorise( req, apm ); ) if ( astatus != 200 || !jwt ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid bearer token" ); return error( astatus, methods, req.header, apm ); } if ( !superuserRole( M::EntityType(), *jwt ) ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "User role not authorised" ); return error( 401, methods, req.header, apm ); } WRAP_CODE_LINE( auto [mstatus, m] = db::retrieve<M,ValueType>( property, value, std::string{ customerCode.data(), customerCode.size() }, apm ); ) if ( mstatus != 200 && mstatus != 404 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Error retrieving entity" ); return error( mstatus, "Error retrieving entity"sv, methods, req.header, apm ); } if ( !m ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Database error" ); return error( mstatus, "Error retrieving entity"sv, methods, req.header, apm ); } LOG_INFO << "Writing response for " << req.path << ". APM id: " << apm.id; auto resp = Response{ req.header }; resp.jwt = jwt; WRAP_CODE_LINE( output( req, resp, *m, apm ); ) resp.correlationId = correlationId( req ); resp.set( methods, Response::origins() ); resp.entity = M::EntityType(); return resp; } catch ( const bsoncxx::exception& b ) { LOG_WARN << "BSON error processing " << req.path << ". " << b.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, b, "BSON exception" ); return error( 417, "Error creating entity"sv, methods, req.header, apm ); } catch ( const std::exception& ex ) { LOG_WARN << "Error processing request " << req.path << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); } catch ( ... ) { LOG_WARN << "Unexpected error processing request " << req.path << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return error( 500, "Internal server error"sv, methods, req.header, apm ); } template <Model M, typename AuthFunction> auto retrieveAll( const spt::http2::framework::Request& req, std::span<const std::string> methods, spt::ilp::APMRecord& apm, AuthFunction&& authfn ) -> Response { LOG_INFO << "Handling GET request for " << req.path << ". APM id: " << apm.id; auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); cp.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, std::format( "List entity {}", M::EntityType() ) ); DEFER( spt::ilp::setDuration( cp ) ); auto idx = apm.processes.size(); try { WRAP_CODE_LINE( auto [qstatus, eq] = parseQuery( req, apm ); ) if ( qstatus != 200 || !eq ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid query" ); return error( 400, "Invalid query"sv, methods, req.header, apm ); } WRAP_CODE_LINE( auto [astatus, jwt] = authorise( req, apm ); ) if ( astatus != 200 || !jwt ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid bearer token" ); return error( astatus, methods, req.header, apm ); } if ( !authfn( M::EntityType(), *jwt ) ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "User role not authorised" ); return error( 401, methods, req.header, apm ); } auto query = bsoncxx::builder::stream::document{}; if ( jwt->user.role != model::Role::superuser ) { query << "customer.code" << jwt->user.customerCode; } if ( eq->after ) { auto oid = spt::util::parseId( *eq->after ); if ( !oid ) { LOG_INFO << "Rejecting request for " << req.path << " with invalid after " << *eq->after << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid after parameter" ); return error( 400, methods, req.header, apm ); } query << "_id" << bsoncxx::builder::stream::open_document << ( eq->descending ? "$lt" : "$gt" ) << *oid << bsoncxx::builder::stream::close_document; } WRAP_CODE_LINE( auto [mstatus, m] = db::query<M>( query << bsoncxx::builder::stream::finalize, *eq, apm ); ) if ( mstatus != 200 && mstatus != 404 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Error retrieving entity" ); return error( mstatus, "Error retrieving entity"sv, methods, req.header, apm ); } if ( !m ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Database error" ); return error( 404, methods, req.header, apm ); } LOG_INFO << "Writing response for " << req.path << ". APM id: " << apm.id; auto resp = Response{ req.header }; resp.jwt = jwt; WRAP_CODE_LINE( output( req, resp, *m, apm ); ) resp.correlationId = correlationId( req ); resp.set( methods, Response::origins() ); resp.entity = M::EntityType(); return resp; } catch ( const bsoncxx::exception& b ) { LOG_WARN << "BSON error processing " << req.path << ". " << b.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, b, "BSON exception" ); return error( 417, "Error creating entity"sv, methods, req.header, apm ); } catch ( const std::exception& ex ) { LOG_WARN << "Error processing request " << req.path << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); } catch ( ... ) { LOG_WARN << "Unexpected error processing request " << req.path << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return error( 500, "Internal server error"sv, methods, req.header, apm ); } template <Model M> auto retrieveAll( const spt::http2::framework::Request& req, std::string_view customerCode, std::span<const std::string> methods, spt::ilp::APMRecord& apm ) -> Response { LOG_INFO << "Handling GET request for " << req.path << ". APM id: " << apm.id; auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); cp.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, std::format( "List entity {} for customer {}", M::EntityType(), customerCode ) ); DEFER( spt::ilp::setDuration( cp ) ); auto idx = apm.processes.size(); try { WRAP_CODE_LINE( auto [qstatus, eq] = parseQuery( req, apm ); ) if ( qstatus != 200 || !eq ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid query" ); return error( 400, "Invalid query"sv, methods, req.header, apm ); } WRAP_CODE_LINE( auto [astatus, jwt] = authorise( req, apm ); ) if ( astatus != 200 || !jwt ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid bearer token" ); return error( astatus, methods, req.header, apm ); } if ( !superuserRole( M::EntityType(), *jwt ) ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "User role not authorised" ); return error( 401, methods, req.header, apm ); } auto query = bsoncxx::builder::stream::document{}; query << "customer.code" << customerCode; if ( eq->after ) { auto oid = spt::util::parseId( *eq->after ); if ( !oid ) { LOG_INFO << "Rejecting request for " << req.path << " with invalid after " << *eq->after << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid after parameter" ); return error( 400, methods, req.header, apm ); } query << "_id" << bsoncxx::builder::stream::open_document << ( eq->descending ? "$lt" : "$gt" ) << *oid << bsoncxx::builder::stream::close_document; } WRAP_CODE_LINE( auto [mstatus, m] = db::query<M>( query << bsoncxx::builder::stream::finalize, *eq, apm ); ) if ( mstatus != 200 && mstatus != 404 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Error retrieving entity" ); return error( mstatus, "Error retrieving entity"sv, methods, req.header, apm ); } if ( !m ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Database error" ); return error( 404, methods, req.header, apm ); } LOG_INFO << "Writing response for " << req.path << ". APM id: " << apm.id; auto resp = Response{ req.header }; resp.jwt = jwt; WRAP_CODE_LINE( output( req, resp, *m, apm ); ) resp.correlationId = correlationId( req ); resp.set( methods, Response::origins() ); resp.entity = M::EntityType(); return resp; } catch ( const bsoncxx::exception& b ) { LOG_WARN << "BSON error processing " << req.path << ". " << b.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, b, "BSON exception" ); return error( 417, "Error creating entity"sv, methods, req.header, apm ); } catch ( const std::exception& ex ) { LOG_WARN << "Error processing request " << req.path << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); } catch ( ... ) { LOG_WARN << "Unexpected error processing request " << req.path << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return error( 500, "Internal server error"sv, methods, req.header, apm ); } template <Model M, typename ValueType, typename AuthFunction> auto retrieveAll( const spt::http2::framework::Request& req, std::string_view property, ValueType value, std::span<const std::string> methods, spt::ilp::APMRecord& apm, AuthFunction&& authfn ) -> Response { LOG_INFO << "Handling GET request for " << req.path << ". APM id: " << apm.id; auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); cp.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, std::format( "List entity {} by property {}", M::EntityType(), property ) ); DEFER( spt::ilp::setDuration( cp ) ); auto idx = apm.processes.size(); try { WRAP_CODE_LINE( auto [qstatus, eq] = parseQuery( req, apm ); ) if ( qstatus != 200 || !eq ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid query" ); return error( 400, "Invalid query"sv, methods, req.header, apm ); } WRAP_CODE_LINE( auto [astatus, jwt] = authorise( req, apm ); ) if ( astatus != 200 || !jwt ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid bearer token" ); return error( astatus, methods, req.header, apm ); } if ( !authfn( M::EntityType(), *jwt ) ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "User role not authorised" ); return error( 401, methods, req.header, apm ); } auto query = bsoncxx::builder::stream::document{}; query << property << value; if ( jwt->user.role != model::Role::superuser ) { query << "customer.code" << jwt->user.customerCode; } if ( eq->after ) { auto oid = spt::util::parseId( *eq->after ); if ( !oid ) { LOG_INFO << "Rejecting request for " << req.path << " with invalid after " << *eq->after << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid after parameter" ); return error( 400, methods, req.header, apm ); } query << "_id" << bsoncxx::builder::stream::open_document << ( eq->descending ? "$lt" : "$gt" ) << *oid << bsoncxx::builder::stream::close_document; } WRAP_CODE_LINE( auto [mstatus, m] = db::query<M>( query << bsoncxx::builder::stream::finalize, *eq, apm ); ) if ( mstatus != 200 && mstatus != 404 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Error retrieving entity" ); return error( mstatus, "Error retrieving entity"sv, methods, req.header, apm ); } if ( !m ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Database error" ); return error( 404, methods, req.header, apm ); } LOG_INFO << "Writing response for " << req.path << ". APM id: " << apm.id; auto resp = Response{ req.header }; resp.jwt = jwt; WRAP_CODE_LINE( output( req, resp, *m, apm ); ) resp.correlationId = correlationId( req ); resp.set( methods, Response::origins() ); resp.entity = M::EntityType(); return resp; } catch ( const bsoncxx::exception& b ) { LOG_WARN << "BSON error processing " << req.path << ". " << b.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, b, "BSON exception" ); return error( 417, "Error creating entity"sv, methods, req.header, apm ); } catch ( const std::exception& ex ) { LOG_WARN << "Error processing request " << req.path << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); } catch ( ... ) { LOG_WARN << "Unexpected error processing request " << req.path << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return error( 500, "Internal server error"sv, methods, req.header, apm ); } template <Model M, typename AuthFunction> auto byReference( const spt::http2::framework::Request& req, bsoncxx::oid id, std::string_view type, std::span<const std::string> methods, spt::ilp::APMRecord& apm, AuthFunction&& authfn ) -> Response { LOG_INFO << "Handling GET request for " << req.path << ". APM id: " << apm.id; auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); cp.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, std::format( "List entity {} by reference {}", M::EntityType(), type ) ); DEFER( spt::ilp::setDuration( cp ) ); auto idx = apm.processes.size(); try { WRAP_CODE_LINE( auto [qstatus, eq] = parseQuery( req, apm ); ) if ( qstatus != 200 || !eq ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid query" ); return error( 400, "Invalid query"sv, methods, req.header, apm ); } WRAP_CODE_LINE( auto [astatus, jwt] = authorise( req, apm ); ) if ( astatus != 200 || !jwt ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid bearer token" ); return error( astatus, methods, req.header, apm ); } if ( !authfn( M::EntityType(), *jwt ) ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "User role not authorised" ); return error( 401, methods, req.header, apm ); } if ( auto t = magic_enum::enum_cast<model::EntityType>( type ); !t ) { LOG_WARN << "User " << jwt->user.username << " specified invalid type " << type << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid type" ); return error( 400, "Invalid type"sv, methods, req.header, apm ); } auto query = bsoncxx::builder::stream::document{}; query << "references._id" << id << "references.type" << type; if ( jwt->user.role != model::Role::superuser ) { query << "customer.code" << jwt->user.customerCode; } if ( eq->after ) { auto oid = spt::util::parseId( *eq->after ); if ( !oid ) { LOG_INFO << "Rejecting request for " << req.path << " with invalid after " << *eq->after << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid after parameter" ); return error( 400, methods, req.header, apm ); } query << "_id" << bsoncxx::builder::stream::open_document << ( eq->descending ? "$lt" : "$gt" ) << *oid << bsoncxx::builder::stream::close_document; } WRAP_CODE_LINE( auto [mstatus, m] = db::query<M>( query << bsoncxx::builder::stream::finalize, *eq, apm ); ) if ( mstatus != 200 && mstatus != 404 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Error retrieving entity" ); return error( mstatus, "Error retrieving entity"sv, methods, req.header, apm ); } if ( !m ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Database error" ); return error( 404, methods, req.header, apm ); } LOG_INFO << "Writing response for " << req.path << ". APM id: " << apm.id; auto resp = Response{ req.header }; resp.jwt = jwt; WRAP_CODE_LINE( output( req, resp, *m, apm ); ) resp.correlationId = correlationId( req ); resp.set( methods, Response::origins() ); resp.entity = M::EntityType(); return resp; } catch ( const bsoncxx::exception& b ) { LOG_WARN << "BSON error processing " << req.path << ". " << b.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, b, "BSON exception" ); return error( 417, "Error creating entity"sv, methods, req.header, apm ); } catch ( const std::exception& ex ) { LOG_WARN << "Error processing request " << req.path << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); } catch ( ... ) { LOG_WARN << "Unexpected error processing request " << req.path << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return error( 500, "Internal server error"sv, methods, req.header, apm ); } template <Model M, typename AuthFunction> auto between( const spt::http2::framework::Request& req, std::string_view property, std::chrono::time_point<std::chrono::system_clock> start, std::chrono::time_point<std::chrono::system_clock> end, std::span<const std::string> methods, spt::ilp::APMRecord& apm, AuthFunction&& authfn ) -> Response { LOG_INFO << "Handling GET request for " << req.path << ". APM id: " << apm.id; auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); cp.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, std::format( "List entity {} by date range", M::EntityType() ) ); DEFER( spt::ilp::setDuration( cp ) ); auto idx = apm.processes.size(); try { WRAP_CODE_LINE( auto [qstatus, eq] = parseQuery( req, apm ); ) if ( qstatus != 200 || !eq ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid query" ); return error( 400, "Invalid query"sv, methods, req.header, apm ); } WRAP_CODE_LINE( auto [astatus, jwt] = authorise( req, apm ); ) if ( astatus != 200 || !jwt ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid bearer token" ); return error( astatus, methods, req.header, apm ); } if ( !authfn( M::EntityType(), *jwt ) ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "User role not authorised" ); return error( 401, methods, req.header, apm ); } auto filter = db::filter::Between{}; filter.field = property; filter.from = start; filter.to = end; filter.descending = eq->descending; if ( eq->after ) { const auto a = spt::util::parseISO8601( *eq->after ); if ( !a.has_value() ) { LOG_INFO << "Rejecting request for " << req.path << " with invalid after " << *eq->after << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid after parameter" ); return error( 400, a.error(), methods, req.header, apm ); } filter.after = a.value(); } if ( jwt->user.role != model::Role::superuser ) filter.customer = jwt->user.customerCode; WRAP_CODE_LINE( auto [mstatus, m] = db::between<M>( std::move( filter ), *eq, apm ); ) if ( mstatus != 200 && mstatus != 404 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Error retrieving entity" ); return error( mstatus, "Error retrieving entities"sv, methods, req.header, apm ); } if ( !m ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Database error" ); return error( 404, methods, req.header, apm ); } LOG_INFO << "Writing response for " << req.path << ". APM id: " << apm.id; auto resp = Response{ req.header }; resp.jwt = jwt; WRAP_CODE_LINE( output( req, resp, *m, apm ); ) resp.correlationId = correlationId( req ); resp.set( methods, Response::origins() ); resp.entity = M::EntityType(); return resp; } catch ( const bsoncxx::exception& b ) { LOG_WARN << "BSON error processing " << req.path << ". " << b.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, b, "BSON exception" ); return error( 417, "Error creating entity"sv, methods, req.header, apm ); } catch ( const std::exception& ex ) { LOG_WARN << "Error processing request " << req.path << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); } catch ( ... ) { LOG_WARN << "Unexpected error processing request " << req.path << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return error( 500, "Internal server error"sv, methods, req.header, apm ); } template <Model M, typename AuthFunction> auto between( const spt::http2::framework::RoutingRequest& req, boost::container::flat_map<std::string_view, std::string_view>&& args, std::span<const std::string> methods, spt::ilp::APMRecord& apm, AuthFunction&& authfn ) { auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( cp ) ); auto type = args["type"sv]; if ( type != "created"sv && type != "modified"sv ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid type" ); return error( 404, methods, req.req.header, apm ); } auto prop = std::format( "metadata.{}", type ); const auto svar = spt::util::parseISO8601( args["start"sv] ); if ( !svar.has_value() ) { LOG_WARN << "Invalid start date in path " << req.req.path << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid start date" ); return error( 400, svar.error(), methods, req.req.header, apm ); } const auto evar = spt::util::parseISO8601( args["end"sv] ); if ( !evar.has_value() ) { LOG_WARN << "Invalid end date in path " << req.req.path << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid end date" ); return error( 400, evar.error(), methods, req.req.header, apm ); } auto idx = apm.processes.size(); auto resp = between<M>( req.req, prop, svar.value(), evar.value(), methods, apm, std::forward<AuthFunction>( authfn ) ); spt::ilp::addCurrentFunction( apm.processes[idx] ); return resp; } template <Model M, typename AuthFunction> auto refcounts( const spt::http2::framework::Request& req, std::string_view entityId, std::span<const std::string> methods, spt::ilp::APMRecord& apm, AuthFunction&& authfn ) -> Response { LOG_INFO << "Handling GET request for " << req.path << ". APM id: " << apm.id; auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); cp.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, std::format( "Count references to entity {}", M::EntityType() ) ); cp.values.try_emplace( "entity_id", std::string{ entityId } ); DEFER( spt::ilp::setDuration( cp ) ); auto idx = apm.processes.size(); try { WRAP_CODE_LINE( auto [astatus, jwt] = authorise( req, apm ); ) if ( astatus != 200 || !jwt ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid bearer token" ); return error( astatus, methods, req.header, apm ); } if ( !authfn( M::EntityType(), *jwt ) ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "User role not authorised" ); return error( 401, methods, req.header, apm ); } auto id = spt::util::parseId( entityId ); if ( !id ) { LOG_INFO << "Rejecting request for " << req.path << " with invalid id " << entityId << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid id" ); return error( 400, methods, req.header, apm ); } WRAP_CODE_LINE( auto [mstatus, m] = db::refcounts<M>( *id, apm ); ) if ( mstatus != 200 && mstatus != 404 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Error retrieving entity" ); return error( mstatus, "Error counting references to entity"sv, methods, req.header, apm ); } if ( mstatus == 404 || !m ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Database error" ); return error( 404, methods, req.header, apm ); } LOG_INFO << "Writing response for " << req.path << ". APM id: " << apm.id; auto resp = Response{ req.header }; resp.jwt = jwt; WRAP_CODE_LINE( output( req, resp, *m, apm ); ) resp.correlationId = correlationId( req ); resp.set( methods, Response::origins() ); resp.entity = M::EntityType(); return resp; } catch ( const bsoncxx::exception& b ) { LOG_WARN << "BSON error processing " << req.path << ". " << b.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, b, "BSON exception" ); return error( 417, "Error creating entity"sv, methods, req.header, apm ); } catch ( const std::exception& ex ) { LOG_WARN << "Error processing request " << req.path << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); } catch ( ... ) { LOG_WARN << "Unexpected error processing request " << req.path << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return error( 500, "Internal server error"sv, methods, req.header, apm ); } template <Model M, typename AuthFunction> auto remove( const spt::http2::framework::Request& req, std::string_view entityId, std::span<const std::string> methods, spt::ilp::APMRecord& apm, AuthFunction&& authfn ) -> Response { LOG_INFO << "Handling DELETE request for " << req.path << ". APM id: " << apm.id; auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); cp.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, std::format( "Remove entity {}", M::EntityType() ) ); cp.values.try_emplace( "entity_id", std::string{ entityId } ); DEFER( spt::ilp::setDuration( cp ) ); auto idx = apm.processes.size(); try { WRAP_CODE_LINE( auto [astatus, jwt] = authorise( req, apm ); ) if ( astatus != 200 || !jwt ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid bearer token" ); return error( astatus, methods, req.header, apm ); } if ( !authfn( M::EntityType(), *jwt ) ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "User role not authorised" ); return error( 401, methods, req.header, apm ); } auto id = spt::util::parseId( entityId ); if ( !id ) { LOG_INFO << "Rejecting request for " << req.path << " with invalid id " << entityId << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid id" ); return error( 400, methods, req.header, apm ); } WRAP_CODE_LINE( const auto mstatus = db::remove<M>( *id, jwt->user.role == model::Role::superuser ? ""sv : jwt->user.customerCode, apm ); ) if ( mstatus != 200 && mstatus != 404 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Error deleting entity" ); return error( mstatus, "Error deleting entity"sv, methods, req.header, apm ); } if ( mstatus == 404 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Entity does not exist" ); return error( mstatus, methods, req.header, apm ); } LOG_INFO << "Writing response for " << req.path << ". APM id: " << apm.id; auto resp = Response{ req.header }; resp.jwt = jwt; resp.correlationId = correlationId( req ); resp.set( methods, Response::origins() ); resp.entity = M::EntityType(); return resp; } catch ( const bsoncxx::exception& b ) { LOG_WARN << "BSON error processing " << req.path << ". " << b.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, b, "BSON exception" ); return error( 417, "Error creating entity"sv, methods, req.header, apm ); } catch ( const std::exception& ex ) { LOG_WARN << "Error processing request " << req.path << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); } catch ( ... ) { LOG_WARN << "Unexpected error processing request " << req.path << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return error( 500, "Internal server error"sv, methods, req.header, apm ); } template <Model M, typename AuthFunction> auto versionHistorySummary( const spt::http2::framework::Request& req, std::string_view entityId, std::span<const std::string> methods, spt::ilp::APMRecord& apm, AuthFunction&& authfn ) -> Response { LOG_INFO << "Handling GET request for " << req.path << ". APM id: " << apm.id; auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); cp.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, std::format( "Retrieve version history summary for entity {}", M::EntityType() ) ); cp.values.try_emplace( "entity_id", std::string{ entityId } ); DEFER( spt::ilp::setDuration( cp ) ); auto idx = apm.processes.size(); try { WRAP_CODE_LINE( auto [astatus, jwt] = authorise( req, apm ); ) if ( astatus != 200 || !jwt ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid bearer token" ); return error( astatus, methods, req.header, apm ); } if ( !authfn( M::EntityType(), *jwt ) ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "User role not authorised" ); return error( 401, methods, req.header, apm ); } auto id = spt::util::parseId( entityId ); if ( !id ) { LOG_INFO << "Rejecting request for " << req.path << " with invalid id " << entityId << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid id" ); return error( 400, methods, req.header, apm ); } WRAP_CODE_LINE( auto [mstatus, m] = db::versionHistorySummary<M>( *id, apm ); ) if ( mstatus != 200 && mstatus != 404 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Error retrieving entity" ); return error( mstatus, "Error retrieving entity history summaries"sv, methods, req.header, apm ); } if ( !m ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Database error" ); return error( 404, methods, req.header, apm ); } LOG_INFO << "Writing response for " << req.path << ". APM id: " << apm.id; auto resp = Response{ req.header }; resp.jwt = jwt; WRAP_CODE_LINE( output( req, resp, *m, apm ); ) resp.correlationId = correlationId( req ); resp.set( methods, Response::origins() ); resp.entity = M::EntityType(); return resp; } catch ( const bsoncxx::exception& b ) { LOG_WARN << "BSON error processing " << req.path << ". " << b.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, b, "BSON exception" ); return error( 417, "Error creating entity"sv, methods, req.header, apm ); } catch ( const std::exception& ex ) { LOG_WARN << "Error processing request " << req.path << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); } catch ( ... ) { LOG_WARN << "Unexpected error processing request " << req.path << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return error( 500, "Internal server error"sv, methods, req.header, apm ); } template <Model M, typename AuthFunction> auto versionHistoryDocument( const spt::http2::framework::Request& req, std::string_view historyId, std::span<const std::string> methods, spt::ilp::APMRecord& apm, AuthFunction authfn ) -> Response { LOG_INFO << "Handling GET request for " << req.path << ". APM id: " << apm.id; auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); cp.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, std::format( "Retrieve version history document for entity {}", M::EntityType() ) ); cp.values.try_emplace( "entity_id", std::string{ historyId } ); DEFER( spt::ilp::setDuration( cp ) ); auto idx = apm.processes.size(); try { WRAP_CODE_LINE( auto [astatus, jwt] = authorise( req, apm ); ) if ( astatus != 200 || !jwt ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid bearer token" ); return error( astatus, methods, req.header, apm ); } if ( !authfn( M::EntityType(), *jwt ) ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "User role not authorised" ); return error( 401, methods, req.header, apm ); } auto id = spt::util::parseId( historyId ); if ( !id ) { LOG_INFO << "Rejecting request for " << req.path << " with invalid id " << historyId << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid id" ); return error( 400, methods, req.header, apm ); } WRAP_CODE_LINE( auto [mstatus, m] = db::versionHistoryDocument<M>( *id, apm ); ) if ( mstatus != 200 && mstatus != 404 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Error retrieving entity" ); return error( mstatus, "Error retrieving entity history document"sv, methods, req.header, apm ); } if ( !m ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Database error" ); return error( 404, methods, req.header, apm ); } LOG_INFO << "Writing response for " << req.path << ". APM id: " << apm.id; auto resp = Response{ req.header }; resp.jwt = jwt; WRAP_CODE_LINE( output( req, resp, *m, apm ); ) resp.correlationId = correlationId( req ); resp.set( methods, Response::origins() ); resp.entity = M::EntityType(); return resp; } catch ( const bsoncxx::exception& b ) { LOG_WARN << "BSON error processing " << req.path << ". " << b.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, b, "BSON exception" ); return error( 417, "Error creating entity"sv, methods, req.header, apm ); } catch ( const std::exception& ex ) { LOG_WARN << "Error processing request " << req.path << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); } catch ( ... ) { LOG_WARN << "Unexpected error processing request " << req.path << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return error( 500, "Internal server error"sv, methods, req.header, apm ); } inline std::optional<model::Paginate> paginate( const model::EntitiesQuery& eq, spt::ilp::APMRecord& apm ) { auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); cp.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, "Retrieve paginate" ); DEFER( spt::ilp::setDuration( cp ) ); auto oid = spt::util::parseId( *eq.after ); if ( !oid ) { LOG_INFO << "Invalid after " << *eq.after << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Invalid after parameter" ); return std::nullopt; } auto idx = apm.processes.size(); auto [status, opt] = db::retrieve<model::Paginate>( *oid, ""sv, apm ); spt::ilp::addCurrentFunction( apm.processes[idx] ); if ( !opt ) { LOG_WARN << "No entities retrieved after " << *oid << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Error retrieving entity" ); return std::nullopt; } return std::move( opt ); } }

Template functions for data access introduced earlier in REST API Framework updated with instrumentation.

// // Created by Rakesh on 14/02/2021. // #pragma once #include "filter/between.hpp" #include "filter/id.hpp" #include "filter/property.hpp" #include "metadata.hpp" #include "model/entities.hpp" #include "model/entitiesquery.hpp" #include "model/json.hpp" #include "model/ilp.hpp" #include "model/model.hpp" #include "model/refcount.hpp" #include "model/versionhistory.hpp" #include "util/config.hpp" #include "util/stacktrace.hpp" #include <bsoncxx/builder/stream/document.hpp> #include <bsoncxx/document/view.hpp> #include <bsoncxx/json.hpp> #include <chrono> #include <fmt/format.h> #include <ilp/apmrecord.hpp> #include <log/NanoLog.hpp> #include <mongo-service/api/api.hpp> #include <mongo-service/api/repository/repository.hpp> #include <mongo-service/common/util/bson.hpp> #include <mongo-service/common/util/defer.hpp> #include <range/v3/range/concepts.hpp> #include <range/v3/algorithm/find_if.hpp> namespace spt::db { using std::operator""sv; using std::operator""s; using model::Model; #define WRAP_CODE_LINE(...) \ idx = apm.processes.size(); \ __VA_ARGS__ \ spt::ilp::addCurrentFunction( apm.processes[idx] ); inline std::string cacheKey( std::string_view type, bsoncxx::oid id ) { return fmt::format( "/cache/entity/{}/id/{}", type, id.to_string() ); } template <Model M> std::tuple<int16_t, int32_t> count( bsoncxx::document::view query, spt::ilp::APMRecord& apm ) { auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); auto count = spt::mongoservice::api::model::request::Count{ bsoncxx::document::value{ query } }; count.database = M::Database(); count.collection = M::Collection(); count.options.emplace(); count.options->limit = 10'000; count.options->maxTime = std::chrono::milliseconds{ 500 }; count.correlationId = apm.id; auto result = spt::mongoservice::api::repository::count( count ); if ( !result.has_value() ) { LOG_WARN << "Error counting " << M::Database() << ':' << M::Collection() << ". " << magic_enum::enum_name( result.error().cause ) << ". " << result.error().message << ". APM id: " << apm.id; p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, std::format( "Database error. {}", result.error().message ) ); return { 417, 0 }; } return { 200, result.value().count }; } template <Model M> std::tuple<int16_t, std::optional<bsoncxx::oid>> lastId( bsoncxx::document::view query, bsoncxx::document::value&& projection, bsoncxx::document::value&& sort, spt::ilp::APMRecord& apm ) { using bsoncxx::builder::stream::document; using bsoncxx::builder::stream::open_document; using bsoncxx::builder::stream::close_document; using bsoncxx::builder::stream::finalize; auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); auto retrieve = spt::mongoservice::api::model::request::Retrieve{ bsoncxx::document::value{ query } }; retrieve.database = M::Database(); retrieve.collection = M::Collection(); retrieve.options.emplace(); retrieve.options->projection.emplace( std::move( projection ) ); retrieve.options->limit = 1; retrieve.options->sort.emplace( std::move( sort ) ); retrieve.correlationId = apm.id; const auto result = spt::mongoservice::api::repository::retrieve<spt::mongoservice::api::model::request::IdFilter>( retrieve ); if ( !result.has_value() ) { LOG_WARN << "Error retrieving last document id from " << M::Database() << ':' << M::Collection() << ". " << magic_enum::enum_name( result.error().cause ) << ". " << result.error().message << ". APM id: " << apm.id; p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, std::format( "Database error. {}", result.error().message ) ); return { 417, std::nullopt }; } if ( result.value().results.empty() ) { LOG_INFO << "No last document id from " << M::Database() << ':' << M::Collection() << ". " << bsoncxx::to_json( query ) << ". APM id: " << apm.id; p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "No results returned." ); return { 412, std::nullopt }; } return { 200, result.value().results.front().id }; } template <Model M> uint16_t create( const M& m, spt::ilp::APMRecord& apm, bool skipVersion = false ) { auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); try { auto now = std::chrono::system_clock::now(); auto md = Metadata{}; md.year = std::format( "{:%Y}", now ); md.month = std::format( "{:%m}", now ); auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); spt::ilp::addCurrentFunction( cp ); auto cr = spt::mongoservice::api::model::request::CreateWithReference<M, Metadata>{ m, md }; cr.database = M::Database(); cr.collection = M::Collection(); cr.skipVersion = skipVersion; cr.correlationId = apm.id; const auto res = spt::mongoservice::api::repository::create( cr ); spt::ilp::setDuration( cp ); if ( !res.has_value() ) { LOG_WARN << "Error creating model " << spt::util::json::str( m ) << ". " << magic_enum::enum_name( res.error().cause ) << ". " << res.error().message << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, std::format( "Database error. {}", res.error().message ) ); return 417; } if ( !skipVersion ) { auto& step = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); DEFER( spt::ilp::setDuration( step ) ); step.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, "Cache model" ); auto bson = spt::util::bson( m ); auto bv = bson.view().get_document().value; util::Configuration::instance().set( cacheKey( M::EntityType(), m.id ), std::string{ reinterpret_cast<const char*>( bv.data() ), bv.length() }, 600 ); } return 200; } catch ( const std::exception& ex ) { LOG_WARN << "Error writing model " << ex.what() << ". " << spt::util::json::str( m ) << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); return 422; } catch ( ... ) { LOG_WARN << "Unknown error writing model. " << spt::util::json::str( m ) << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return 500; } template <Model M> uint16_t update( const M& m, std::string_view customer, spt::ilp::APMRecord& apm, bool skipVersion = false, std::optional<bsoncxx::oid> restoredFrom = std::nullopt ) { auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); try { auto filter = filter::Id{}; filter.id = m.id; filter.customer = customer; auto now = std::chrono::system_clock::now(); auto md = Metadata{}; md.year = std::format( "{:%Y}", now ); md.month = std::format( "{:%m}", now ); if ( restoredFrom ) md.restoredFrom = *restoredFrom; auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); spt::ilp::addCurrentFunction( cp ); auto ur = spt::mongoservice::api::model::request::ReplaceWithReference<M, Metadata, filter::Id>{ filter, m, md }; ur.database = M::Database(); ur.collection = M::Collection(); ur.skipVersion = skipVersion; ur.correlationId = apm.id; const auto res = spt::mongoservice::api::repository::update( ur ); spt::ilp::setDuration( cp ); if ( !res.has_value() ) { LOG_WARN << "Error replacing model " << spt::util::json::str( m ) << ". " << magic_enum::enum_name( res.error().cause ) << ". " << res.error().message << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, std::format( "Database error. {}", res.error().message ) ); return 417; } auto& step = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); DEFER( spt::ilp::setDuration( step ) ); step.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, "Cache model" ); auto bson = spt::util::bson( m ); auto bv = bson.view().get_document().value; util::Configuration::instance().set( cacheKey( M::EntityType(), m.id ), std::string{ reinterpret_cast<const char*>( bv.data() ), bv.length() }, 600 ); return 200; } catch ( const std::exception& ex ) { LOG_WARN << "Error replacing model " << ex.what() << ". " << spt::util::json::str( m ) << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); return 422; } catch ( ... ) { LOG_WARN << "Unknown error replacing model. " << spt::util::json::str( m ) << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return 500; } template <Model M> std::tuple<int16_t, std::optional<M>> pipeline( std::vector<spt::mongoservice::api::model::request::Pipeline::Document::Stage> stages, spt::ilp::APMRecord& apm, bool caseInsensitive = false ) { auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); try { auto model = spt::mongoservice::api::model::request::Pipeline{}; model.database = M::Database(); model.collection = M::Collection(); model.options.emplace(); model.options->limit = 1; model.correlationId = apm.id; if ( caseInsensitive ) { model.options->collation.emplace(); model.options->collation->locale = "en"; model.options->collation->strength = 1; } model.document.specification.insert( model.document.specification.end(), std::make_move_iterator( stages.begin() ), std::make_move_iterator( stages.end() ) ); stages.erase( stages.begin(), stages.end() ); auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); spt::ilp::addCurrentFunction( cp ); auto resp = spt::mongoservice::api::repository::pipeline<M>( model ); spt::ilp::setDuration( cp ); if ( !resp.has_value() ) { LOG_WARN << "Error executing query against " << M::Database() << ':' << M::Collection() << ". " << spt::util::json::str( model.document ) << ". " << magic_enum::enum_name( resp.error().cause ) << ". " << resp.error().message << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, std::format( "Database error. {}", resp.error().message ) ); return { 417, std::nullopt }; } if ( resp.value().results.empty() ) { LOG_WARN << "No matching documents for query against " << M::Database() << ':' << M::Collection() << ". " << spt::util::json::str( model.document ) << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "No matching documents" ); return { 404, std::nullopt }; } return { 200, std::move( resp.value().results.front() ) }; } catch ( const std::exception& ex ) { LOG_WARN << "Error executing query against " << M::Database() << ':' << M::Collection() << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); return { 422, std::nullopt }; } catch ( ... ) { LOG_WARN << "Unknown error executing query against " << M::Database() << ':' << M::Collection() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return { 500, std::nullopt }; } template <Model M> std::tuple<int16_t, std::optional<model::Entities<M>>> pipeline( bsoncxx::document::value match, std::vector<spt::mongoservice::api::model::request::Pipeline::Document::Stage> stages, model::EntitiesQuery options, spt::ilp::APMRecord& apm, bool caseInsensitive = false ) { using bsoncxx::builder::stream::document; using bsoncxx::builder::stream::open_document; using bsoncxx::builder::stream::close_document; using bsoncxx::builder::stream::finalize; auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); auto idx = apm.processes.size(); try { auto req = spt::mongoservice::api::model::request::Pipeline{}; req.database = M::Database(); req.collection = M::Collection(); req.correlationId = apm.id; req.document.specification = std::move( stages ); if ( caseInsensitive ) { req.options.emplace(); req.options->collation.emplace(); req.options->collation->locale = "en"; req.options->collation->strength = 1; } auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); spt::ilp::addCurrentFunction( cp ); auto resp = spt::mongoservice::api::repository::pipeline<M>( req ); spt::ilp::setDuration( cp ); if ( !resp.has_value() ) { LOG_WARN << "Error executing query against " << M::Database() << ':' << M::Collection() << ". " << spt::util::json::str( req.document ) << ". " << magic_enum::enum_name( resp.error().cause ) << ". " << resp.error().message << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, std::format( "Database error. {}", resp.error().message ) ); return { 417, std::nullopt }; } model::Entities<M> ms; ms.entities = std::move( resp.value().results ); ms.page = std::size( ms.entities ); if ( !options.after && ms.page > 0 && ms.page < options.limit ) { ms.total = ms.page; } else { WRAP_CODE_LINE( if ( const auto [cstatus, csize] = count<M>( match, apm ); cstatus == 200 ) ms.total = csize; ) } if ( ms.page > 0 && ms.total != ms.page ) { WRAP_CODE_LINE( auto [lstatus, lv] = lastId<M>( match, document{} << "_id" << 1 << finalize, document{} << "_id" << (options.descending ? 1 : -1) << finalize, apm ); ) if ( lstatus != 200 ) return { lstatus, std::nullopt }; auto lid = ms.entities.back().id; if ( lid != *lv ) ms.next = lid.to_string(); } return { 200, std::move( ms ) }; } catch ( const std::exception& ex ) { LOG_WARN << "Error executing query against " << M::Database() << ':' << M::Collection() << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); return { 422, std::nullopt }; } catch ( ... ) { LOG_WARN << "Unknown error executing query against " << M::Database() << ':' << M::Collection() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return { 500, std::nullopt }; } template <Model M> std::tuple<int16_t, std::optional<std::vector<M>>> rawpipeline( std::vector<spt::mongoservice::api::model::request::Pipeline::Document::Stage> stages, spt::ilp::APMRecord& apm ) { auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); try { auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); cp.values.try_emplace( model::ilp::name::APM_STEP_PROCESS, "retrieve data" ); spt::ilp::addCurrentFunction( cp ); auto req = spt::mongoservice::api::model::request::Pipeline{}; req.database = M::Database(); req.collection = M::Collection(); req.correlationId = apm.id; req.document.specification = std::move( stages ); auto res = spt::mongoservice::api::repository::pipeline<M>( req ); if ( !res.has_value() ) { LOG_WARN << "Error executing query against " << M::Database() << ':' << M::Collection() << ". " << spt::util::json::str( req.document ) << ". " << magic_enum::enum_name( res.error().cause ) << ". " << res.error().message << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, std::format( "Database error. {}", res.error().message ) ); return { 417, std::nullopt }; } return { 200, std::move( res.value().results ) }; } catch ( const std::exception& ex ) { LOG_WARN << "Error executing query against " << M::Database() << ':' << M::Collection() << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); return { 422, std::nullopt }; } catch ( ... ) { LOG_WARN << "Unknown error executing query against " << M::Database() << ':' << M::Collection() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return { 500, std::nullopt }; } template <Model M> std::tuple<int16_t, std::optional<M>> retrieve( bsoncxx::oid id, std::string_view customer, spt::ilp::APMRecord& apm ) { auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); auto idx = apm.processes.size(); if ( auto pl = model::pipeline<M>(); !pl.empty() ) { LOG_DEBUG << "Entity " << M::EntityType() << " requires pipeline. APM id: " << apm.id; auto filter = filter::Id{ customer, id }; auto stages = std::vector<spt::mongoservice::api::model::request::Pipeline::Document::Stage>{}; stages.reserve( pl.size() + 1 ); stages.emplace_back( "$match", spt::util::bson( filter ) ); stages.insert( stages.end(), std::make_move_iterator( pl.begin() ), std::make_move_iterator( pl.end() ) ); pl.erase( pl.begin(), pl.end() ); WRAP_CODE_LINE( auto result = pipeline<M>( std::move( stages ), apm ); ) return result; } const auto ckey = cacheKey( M::EntityType(), id ); { auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); cp.values.try_emplace( model::ilp::name::APM_STEP_PROCESS, "retrieve cached value" ); spt::ilp::addCurrentFunction( cp ); if ( auto cached = util::Configuration::instance().get( ckey ); cached ) { auto mv = bsoncxx::document::view( reinterpret_cast<const uint8_t*>( cached->data() ), cached->size() ); auto m = M{ mv }; cp.values.try_emplace( model::ilp::name::APM_NOTE_VALUE, "Return cached value" ); spt::ilp::setDuration( cp ); return { 200, std::move( m ) }; } spt::ilp::setDuration( cp ); } try { auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); cp.values.try_emplace( model::ilp::name::APM_STEP_PROCESS, "retrieve data" ); spt::ilp::addCurrentFunction( cp ); auto r = spt::mongoservice::api::model::request::Retrieve<filter::Id>{}; r.database = M::Database(); r.collection = M::Collection(); r.correlationId = apm.id; r.document.emplace( customer, id ); auto resp = spt::mongoservice::api::repository::retrieve<M>( r ); spt::ilp::setDuration( cp ); if ( !resp.has_value() ) { LOG_WARN << "Error retrieving document " << M::Database() << ':' << M::Collection() << ':' << id.to_string() << ". " << magic_enum::enum_name( resp.error().cause ) << ". " << resp.error().message << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, std::format( "Database error. {}", resp.error().message ) ); return { resp.error().message == "Not found" ? 404 : 417, std::nullopt }; } if ( !resp.value().result ) { LOG_WARN << "No document " << M::Database() << ':' << M::Collection() << ':' << id.to_string() << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Document not found" ); return { 404, std::nullopt }; } { auto& cache = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); DEFER( spt::ilp::setDuration( cache ) ); cache.values.try_emplace( model::ilp::name::APM_STEP_PROCESS, "cache data" ); spt::ilp::addCurrentFunction( cache ); auto bson = spt::util::marshall( *resp.value().result ); auto dv = bson.view(); auto str = std::string{ reinterpret_cast<const char *>( dv.data() ), dv.length() }; util::Configuration::instance().set( ckey, str, 600 ); } return { 200, std::move( resp.value().result ) }; } catch ( const std::exception& ex ) { LOG_WARN << "Error retrieving document " << M::Database() << ':' << M::Collection() << ':' << id.to_string() << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); return { 422, std::nullopt }; } catch ( ... ) { LOG_WARN << "Unknown error retrieving document " << M::Database() << ':' << M::Collection() << ':' << id.to_string() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return { 500, std::nullopt }; } template<Model M, typename ValueType> std::tuple<int16_t, std::optional<M>> retrieve( std::string_view property, ValueType value, std::string_view customer, spt::ilp::APMRecord& apm, bool caseInsensitive = false ) { using bsoncxx::builder::stream::document; using bsoncxx::builder::stream::finalize; auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); auto idx = apm.processes.size(); if ( auto pl = model::pipeline<M>(); !pl.empty() ) { LOG_DEBUG << "Entity " << M::EntityType() << " requires pipeline. APM id: " << apm.id; auto stages = std::vector<spt::mongoservice::api::model::request::Pipeline::Document::Stage>{}; stages.reserve( 3 + pl.size() ); auto match = filter::Property<ValueType>{ property, value, customer }; stages.emplace_back( "$match", spt::util::bson( match ) ); stages.emplace_back( "$sort", document{} << "_id" << -1 << finalize ); stages.emplace_back( "$limit", bsoncxx::types::b_int32{ 1 } ); stages.insert( stages.end(), std::make_move_iterator( pl.begin() ), std::make_move_iterator( pl.end() ) ); pl.erase( pl.begin(), pl.end() ); WRAP_CODE_LINE( auto result = pipeline<M>( std::move( stages ), apm, caseInsensitive ); ) return result; } try { auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); cp.values.try_emplace( model::ilp::name::APM_STEP_PROCESS, "retrieve data" ); spt::ilp::addCurrentFunction( cp ); auto r = spt::mongoservice::api::model::request::Retrieve<filter::Property<ValueType>>{}; r.database = M::Database(); r.collection = M::Collection(); r.correlationId = apm.id; r.document.emplace( property, value, customer ); r.options.emplace(); r.options->limit = 1; r.options->sort = document{} << "_id" << -1 << finalize; if ( caseInsensitive ) { r.options->collation.emplace(); r.options->collation->locale = "en"; r.options->collation->strength = 1; } auto resp = spt::mongoservice::api::repository::retrieve<M>( r ); spt::ilp::setDuration( cp ); if ( !resp.has_value() ) { LOG_WARN << "Unable to retrieve document from " << M::Database() << ':' << M::Collection() << " with property: " << property << ", value: " << value << ". " << magic_enum::enum_name( resp.error().cause ) << ". " << resp.error().message << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, std::format( "Database error. {}", resp.error().message ) ); return { resp.error().message == "Not found" ? 404 : 417, std::nullopt }; } if ( resp.value().results.empty() ) { LOG_WARN << "No matching document " << M::Database() << ':' << M::Collection() << " with property: " << property << ", value: " << value << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Document not found" ); return { 404, std::nullopt }; } return { 200, std::move( resp.value().results.front() ) }; } catch ( const std::exception& ex ) { LOG_WARN << "Error retrieving document " << M::Database() << ':' << M::Collection() << " with property: " << property << ", value: " << value << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); return { 422, std::nullopt }; } catch ( ... ) { LOG_WARN << "Unknown error retrieving document " << M::Database() << ':' << M::Collection() << " with property: " << property << ", value: " << value << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return { 500, std::nullopt }; } template <Model M> std::tuple<int16_t, std::optional<model::Entities<M>>> query( bsoncxx::document::value query, const model::EntitiesQuery& options, spt::ilp::APMRecord& apm ) { using bsoncxx::builder::stream::array; using bsoncxx::builder::stream::document; using bsoncxx::builder::stream::open_document; using bsoncxx::builder::stream::close_document; using bsoncxx::builder::stream::finalize; auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); auto idx = apm.processes.size(); if ( auto pl = model::pipeline<M>(); !pl.empty() ) { LOG_DEBUG << "Entity " << M::EntityType() << " requires pipeline. APM id: " << apm.id; auto stages = std::vector<spt::mongoservice::api::model::request::Pipeline::Document::Stage>{}; stages.reserve( pl.size() + 3 ); stages.emplace_back( "$match", query.view() ); stages.emplace_back( "$sort", document{} << "_id" << (options.descending ? -1 : 1) << finalize ); stages.emplace_back( "$limit", bsoncxx::types::b_int32{ 1 } ); stages.insert( stages.end(), std::make_move_iterator( pl.begin() ), std::make_move_iterator( pl.end() ) ); pl.erase( pl.begin(), pl.end() ); WRAP_CODE_LINE( auto result = pipeline<M>( std::move( query ), std::move( stages ), options, apm ); ) return result; } try { auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); cp.values.try_emplace( model::ilp::name::APM_STEP_PROCESS, "retrieve data" ); spt::ilp::addCurrentFunction( cp ); auto retrieve = spt::mongoservice::api::model::request::Retrieve{ bsoncxx::document::value{ query } }; retrieve.database = M::Database(); retrieve.collection = M::Collection(); retrieve.correlationId = apm.id; retrieve.options.emplace(); retrieve.options->limit = options.limit; retrieve.options->sort = document{} << "_id" << (options.descending ? -1 : 1) << finalize; auto result = spt::mongoservice::api::repository::retrieve<M>( retrieve ); spt::ilp::setDuration( cp ); if ( !result.has_value() ) { LOG_WARN << "Unable to execute query against " << M::Database() << ':' << M::Collection() << ". " << magic_enum::enum_name( result.error().cause ) << ". " << result.error().message << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, std::format( "Database error. {}", result.error().message ) ); return { 417, std::nullopt }; } model::Entities<M> ms; ms.entities = std::move( result.value().results ); ms.page = std::size( ms.entities ); if ( !options.after && ms.page > 0 && ms.page < options.limit ) { ms.total = ms.page; } else { WRAP_CODE_LINE( if ( const auto [cstatus, csize] = count<M>( query, apm ); cstatus == 200 ) ms.total = csize; ) } if ( ms.page > 0 && ms.total != ms.page ) { WRAP_CODE_LINE( auto [lstatus, lv] = lastId<M>( query, document{} << "_id" << 1 << finalize, document{} << "_id" << (options.descending ? 1 : -1) << finalize, apm ); ) if ( lstatus != 200 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Error retrieving last id" ); return { lstatus, std::nullopt }; } auto lid = ms.entities.back().id; if ( lid != *lv ) ms.next = lid.to_string(); } return { 200, std::move( ms ) }; } catch ( const std::exception& ex ) { LOG_WARN << "Error executing query against " << M::Database() << ':' << M::Collection() << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); return { 422, std::nullopt }; } catch ( ... ) { LOG_WARN << "Unknown error executing query against " << M::Database() << ':' << M::Collection() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return { 500, std::nullopt }; } template <Model M> std::tuple<int16_t, std::optional<std::vector<M>>> rawquery( bsoncxx::document::value query, const model::EntitiesQuery& options, spt::ilp::APMRecord& apm, bool caseInsensitive = false ) { using bsoncxx::builder::stream::document; using bsoncxx::builder::stream::open_document; using bsoncxx::builder::stream::close_document; using bsoncxx::builder::stream::finalize; auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); try { auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); cp.values.try_emplace( model::ilp::name::APM_STEP_PROCESS, "retrieve data" ); spt::ilp::addCurrentFunction( cp ); auto retrieve = spt::mongoservice::api::model::request::Retrieve{ bsoncxx::document::value{ query } }; retrieve.database = M::Database(); retrieve.collection = M::Collection(); retrieve.correlationId = apm.id; retrieve.options.emplace(); retrieve.options->limit = options.limit; retrieve.options->sort = document{} << "_id" << (options.descending ? -1 : 1) << finalize; if ( caseInsensitive ) { retrieve.options->collation.emplace(); retrieve.options->collation->locale = "en"; retrieve.options->collation->strength = 1; } auto result = spt::mongoservice::api::repository::retrieve<M>( retrieve ); spt::ilp::setDuration( cp ); if ( !result.has_value() ) { LOG_WARN << "Unable to execute query against " << M::Database() << ':' << M::Collection() << ". " << magic_enum::enum_name( result.error().cause ) << ". " << result.error().message << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, std::format( "Database error. {}", result.error().message ) ); return { 417, std::nullopt }; } return { 200, std::move( result.value().results ) }; } catch ( const std::exception& ex ) { LOG_WARN << "Error executing query against " << M::Database() << ':' << M::Collection() << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); return { 422, std::nullopt }; } catch ( ... ) { LOG_WARN << "Unknown error executing query against " << M::Database() << ':' << M::Collection() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return { 500, std::nullopt }; } template<Model M, typename ValueType> std::tuple<int16_t, std::optional<std::vector<M>>> rawquery( std::string_view property, ValueType value, std::string_view customer, const model::EntitiesQuery& options, spt::ilp::APMRecord& apm, bool caseInsensitive = false ) { using bsoncxx::builder::stream::document; using bsoncxx::builder::stream::finalize; auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); auto idx = apm.processes.size(); auto f = filter::Property<ValueType>{ property, value, customer }; if ( auto pl = model::pipeline<M>(); !pl.empty() ) { LOG_DEBUG << "Entity " << M::EntityType() << " requires pipeline. APM id: " << apm.id; auto stages = std::vector<spt::mongoservice::api::model::request::Pipeline::Document::Stage>{}; stages.reserve( pl.size() + 3 ); stages.emplace_back( "$match", spt::util::marshall( f ) ); stages.emplace_back( "$sort", document{} << "_id" << (options.descending ? -1 : 1) << finalize ); stages.emplace_back( "$limit", bsoncxx::types::b_int32{ options.limit } ); stages.insert( stages.end(), std::make_move_iterator( pl.begin() ), std::make_move_iterator( pl.end() ) ); pl.erase( pl.begin(), pl.end() ); WRAP_CODE_LINE( auto result = rawpipeline<M>( std::move( stages ), apm ); ) return result; } return rawquery<M>( spt::util::marshall( f ), options, apm, caseInsensitive ); } template <Model M> int16_t transaction( bsoncxx::array::view items, spt::ilp::APMRecord& apm ) { using bsoncxx::builder::stream::document; using bsoncxx::builder::stream::open_document; using bsoncxx::builder::stream::close_document; using bsoncxx::builder::stream::finalize; using spt::mongoservice::api::execute; using spt::mongoservice::api::Request; auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); try { auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); cp.values.try_emplace( model::ilp::name::APM_STEP_PROCESS, "retrieve data" ); spt::ilp::addCurrentFunction( cp ); auto req = Request::transaction( std::string{ M::Database() }, std::string{ M::Collection() }, document{} << "items" << items << finalize ); req.correlationId = apm.id; const auto [type, opt] = execute( req ); spt::ilp::setDuration( cp ); if ( !opt ) { LOG_WARN << "Unable to execute transaction against " << M::Database() << ':' << M::Collection() << ". " << req.document << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Database error." ); return 424; } const auto view = opt->view(); if ( const auto err = spt::util::bsonValueIfExists<std::string>( "error", view ); err ) { LOG_WARN << "Error executing query against " << M::Database() << ':' << M::Collection() << ". " << req.document << ". " << view << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, std::format( "Error executing query. {}", *err ) ); return 417; } LOG_INFO << "Transaction results " << view << ". APM id: " << apm.id; return 200; } catch ( const std::exception& ex ) { LOG_WARN << "Error executing query against " << M::Database() << ':' << M::Collection() << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); return 422; } catch ( ... ) { LOG_WARN << "Unknown error executing query against " << M::Database() << ':' << M::Collection() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return 500; } template <Model M> std::tuple<int16_t, std::optional<model::Entities<M>>> between( const filter::Between& filter, model::EntitiesQuery options, spt::ilp::APMRecord& apm ) { using bsoncxx::builder::stream::array; using bsoncxx::builder::stream::document; using bsoncxx::builder::stream::open_document; using bsoncxx::builder::stream::close_document; using bsoncxx::builder::stream::finalize; using spt::mongoservice::api::execute; using spt::mongoservice::api::Request; auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); auto idx = apm.processes.size(); if ( auto pl = model::pipeline<M>(); !pl.empty() ) { LOG_DEBUG << "Entity " << M::EntityType() << " requires pipeline. APM id: " << apm.id; auto stages = std::vector<spt::mongoservice::api::model::request::Pipeline::Document::Stage>{}; stages.reserve( pl.size() + 3 ); stages.emplace_back( "$match", spt::util::bson( filter ) ); stages.emplace_back( "$sort", document{} << filter.field << (options.descending ? -1 : 1) << finalize ); stages.emplace_back( "$limit", bsoncxx::types::b_int32{ options.limit } ); stages.insert( stages.end(), std::make_move_iterator( pl.begin() ), std::make_move_iterator( pl.end() ) ); pl.erase( pl.begin(), pl.end() ); WRAP_CODE_LINE( auto result = pipeline<M>( spt::util::marshall( filter ), std::move( stages ), options, apm ); ) return result; } try { auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); cp.values.try_emplace( model::ilp::name::APM_STEP_PROCESS, "retrieve data" ); spt::ilp::addCurrentFunction( cp ); auto retrieve = spt::mongoservice::api::model::request::Retrieve{ spt::util::marshall( filter ) }; retrieve.database = M::Database(); retrieve.collection = M::Collection(); retrieve.correlationId = apm.id; retrieve.options.emplace(); retrieve.options->limit = options.limit; retrieve.options->sort = document{} << filter.field << (options.descending ? -1 : 1) << finalize; auto res = spt::mongoservice::api::repository::retrieve<M>( retrieve ); spt::ilp::setDuration( cp ); if ( !res.has_value() ) { LOG_WARN << "Error executing query against " << M::Database() << ':' << M::Collection() << ". " << magic_enum::enum_name( res.error().cause ) << ". " << res.error().message << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, std::format( "Database error. {}", res.error().message ) ); return { 417, std::nullopt }; } model::Entities<M> ms; ms.entities = std::move( res.value().results ); ms.page = std::size( ms.entities ); if ( !options.after && ms.page > 0 && ms.page < options.limit ) { ms.total = ms.page; } else { WRAP_CODE_LINE( if ( const auto [cstatus, csize] = count<M>( *retrieve.document, apm ); cstatus == 200 ) ms.total = csize; ) } if ( ms.page > 0 && ms.total != ms.page ) { WRAP_CODE_LINE( auto [lstatus, lv] = lastId<M>( *retrieve.document, document{} << filter.field << 1 << finalize, document{} << filter.field << (options.descending ? 1 : -1) << finalize, apm ); ) if ( lstatus != 200 ) { cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Error retrieving last id" ); return { lstatus, std::nullopt }; } if ( ms.entities.back().id != *lv ) { ms.next = spt::util::isoDateMillis( filter.field == "created"sv ? ms.entities.back().metadata.created : ms.entities.back().metadata.modified ); } } return { 200, std::move( ms ) }; } catch ( const std::exception& ex ) { LOG_WARN << "Error executing query against " << M::Database() << ':' << M::Collection() << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); return { 422, std::nullopt }; } catch ( ... ) { LOG_WARN << "Unknown error executing query against " << M::Database() << ':' << M::Collection() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return { 500, std::nullopt }; } template <Model M> std::tuple<int16_t, std::optional<model::RefCounts>> refcounts( bsoncxx::oid id, spt::ilp::APMRecord& apm ) { using bsoncxx::builder::stream::document; using bsoncxx::builder::stream::finalize; using spt::mongoservice::api::execute; using spt::mongoservice::api::Request; auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); const auto refs = M::References(); if ( refs.empty() ) return { 200, std::nullopt }; try { model::RefCounts rf; rf.references.reserve( refs.size() ); for ( const auto& ref : refs ) { auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); cp.values.try_emplace( model::ilp::name::APM_STEP_PROCESS, "retrieve data" ); spt::ilp::addCurrentFunction( cp ); auto req = spt::mongoservice::api::model::request::Count<bsoncxx::document::value>{ document{} << ref.field << id << finalize }; req.database = model::database( ref.type ); req.collection = model::collection( ref.type ); req.correlationId = apm.id; const auto resp = spt::mongoservice::api::repository::count( req ); spt::ilp::setDuration( cp ); if ( !resp.has_value() ) { LOG_WARN << "Error refcounting document " << M::Database() << ':' << M::Collection() << ':' << id.to_string() << " in collection " << model::collection( ref.type ) << ". " << magic_enum::enum_name( resp.error().cause ) << ". " << resp.error().message << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, std::format( "Database error. {}", resp.error().message ) ); return { 417, std::nullopt }; } rf.references.emplace_back( model::RefCount{ static_cast<int32_t>( resp.value().count ), ref.type } ); } return { 200, std::move( rf ) }; } catch ( const std::exception& ex ) { LOG_WARN << "Error retrieving reference counts for " << M::Database() << ':' << M::Collection() << ':' << id << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); return { 422, std::nullopt }; } catch ( ... ) { LOG_WARN << "Unknown error retrieving document " << M::Database() << ':' << M::Collection() << ':' << id << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return { 500, std::nullopt }; } template <Model M> int16_t remove( bsoncxx::oid id, std::string_view customer, spt::ilp::APMRecord& apm ) { auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); auto idx = apm.processes.size(); try { WRAP_CODE_LINE( auto [rstatus, rc] = refcounts<M>( id, apm ); ) if ( rstatus != 200 ) return rstatus; if ( rc ) { for ( const auto& r : rc->references ) { if ( r.count > 0 ) { LOG_INFO << "Rejecting delete document " << M::Database() << ':' << M::Collection() << ':' << id.to_string() << " as it is being referenced. " << spt::util::json::str( *rc ) << ". APM id: " << apm.id; p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Cannot deleted entity that is being referenced" ); return 412; } } } auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); cp.values.try_emplace( model::ilp::name::APM_STEP_PROCESS, "remove data" ); spt::ilp::addCurrentFunction( cp ); auto now = std::chrono::system_clock::now(); auto req = spt::mongoservice::api::model::request::Delete<filter::Id, Metadata>( filter::Id{} ); req.database = M::Database(); req.collection = M::Collection(); req.correlationId = apm.id; req.document->id = id; if ( !customer.empty() ) req.document->customer = customer; req.metadata.emplace(); req.metadata->year = std::format( "{:%Y}", now ); req.metadata->month = std::format( "{:%m}", now ); auto resp = spt::mongoservice::api::repository::remove( req ); spt::ilp::setDuration( cp ); if ( !resp.has_value() ) { LOG_WARN << "Error deleting document " << M::Database() << ':' << M::Collection() << ':' << id.to_string() << ". " << magic_enum::enum_name( resp.error().cause ) << ". " << resp.error().message << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, std::format( "Database error. {}", resp.error().message ) ); return 417; } auto iter = ranges::find_if( resp.value().success, [&id]( const bsoncxx::oid& oid ) { return oid == id; } ); if ( iter == ranges::end( resp.value().success ) ) { LOG_WARN << "No document deleted for " << M::Database() << ':' << M::Collection() << ':' << id.to_string() << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "No document deleted" ); return 417; } { auto& dc = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); dc.values.try_emplace( model::ilp::name::APM_STEP_PROCESS, "remove cache entry" ); spt::ilp::addCurrentFunction( dc ); util::Configuration::instance().remove( cacheKey( M::EntityType(), id ) ); spt::ilp::setDuration( dc ); } return 200; } catch ( const std::exception& ex ) { LOG_WARN << "Error removing document " << M::Database() << ':' << M::Collection() << ':' << id.to_string() << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); return 422; } catch ( ... ) { LOG_WARN << "Unknown error removing document " << M::Database() << ':' << M::Collection() << ':' << id.to_string() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return 500; } template <Model M> int16_t rawremove( bsoncxx::oid id, std::string_view customer, spt::ilp::APMRecord& apm ) { auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); try { auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); cp.values.try_emplace( model::ilp::name::APM_STEP_PROCESS, "remove data" ); spt::ilp::addCurrentFunction( cp ); auto now = std::chrono::system_clock::now(); auto req = spt::mongoservice::api::model::request::Delete<filter::Id, Metadata>( filter::Id{} ); req.database = M::Database(); req.collection = M::Collection(); req.correlationId = apm.id; req.document->id = id; if ( !customer.empty() ) req.document->customer = customer; req.metadata.emplace(); req.metadata->year = std::format( "{:%Y}", now ); req.metadata->month = std::format( "{:%m}", now ); auto resp = spt::mongoservice::api::repository::remove( req ); if ( !resp.has_value() ) { LOG_WARN << "Error deleting document " << M::Database() << ':' << M::Collection() << ':' << id.to_string() << ". " << magic_enum::enum_name( resp.error().cause ) << ". " << resp.error().message << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, std::format( "Database error. {}", resp.error().message ) ); return 417; } auto iter = ranges::find_if( resp.value().success, [&id]( const bsoncxx::oid& oid ) { return oid == id; } ); if ( iter == ranges::end( resp.value().success ) ) { LOG_WARN << "No document deleted for " << M::Database() << ':' << M::Collection() << ':' << id.to_string() << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "No document deleted" ); return 417; } { auto& dc = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); dc.values.try_emplace( model::ilp::name::APM_STEP_PROCESS, "remove cache entry" ); spt::ilp::addCurrentFunction( dc ); util::Configuration::instance().remove( cacheKey( M::EntityType(), id ) ); spt::ilp::setDuration( dc ); } return 200; } catch ( const std::exception& ex ) { LOG_WARN << "Error removing document " << M::Database() << ':' << M::Collection() << ':' << id.to_string() << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); return 422; } catch ( ... ) { LOG_WARN << "Unknown error removing document " << M::Database() << ':' << M::Collection() << ':' << id.to_string() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return 500; } template <Model M> int16_t remove( bsoncxx::document::value&& query, spt::ilp::APMRecord& apm ) { auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); try { auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); cp.values.try_emplace( model::ilp::name::APM_STEP_PROCESS, "remove data" ); spt::ilp::addCurrentFunction( cp ); const auto now = std::chrono::system_clock::now(); auto req = spt::mongoservice::api::model::request::Delete<bsoncxx::document::value, Metadata>{ std::move( query ) }; req.database = M::Database(); req.collection = M::Collection(); req.correlationId = apm.id; req.metadata.emplace(); req.metadata->year = std::format( "{:%Y}", now ); req.metadata->month = std::format( "{:%m}", now ); auto resp = spt::mongoservice::api::repository::remove( req ); spt::ilp::setDuration( cp ); if ( !resp.has_value() ) { LOG_WARN << "Error deleting document(s) in " << M::Database() << ':' << M::Collection() << ". " << bsoncxx::to_json( req.document->view() ) << ". " << magic_enum::enum_name( resp.error().cause ) << ". " << resp.error().message << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, std::format( "Database error. {}", resp.error().message ) ); return 417; } if ( resp.value().success.empty() ) { LOG_WARN << "No documents deleted for " << M::Database() << ':' << M::Collection() << bsoncxx::to_json( req.document->view() ) << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "No documents deleted" ); return 204; } return 200; } catch ( const std::exception& ex ) { LOG_WARN << "Error removing document(s) in " << M::Database() << ':' << M::Collection() << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); return 422; } catch ( ... ) { LOG_WARN << "Unknown error removing document(s) in " << M::Database() << ':' << M::Collection() << LOG_WARN << util::stacktrace() << ". APM id: " << apm.id; p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return 500; } template <Model M> std::tuple<int16_t, std::optional<model::Entities<model::VersionHistorySummary>>> versionHistorySummary( bsoncxx::oid id, spt::ilp::APMRecord& apm ) { using bsoncxx::builder::stream::document; using bsoncxx::builder::stream::open_document; using bsoncxx::builder::stream::close_document; using bsoncxx::builder::stream::finalize; using spt::mongoservice::api::execute; using spt::mongoservice::api::Request; auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); const auto& dbc = impl::DatabaseConfiguration::instance(); try { auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); cp.values.try_emplace( model::ilp::name::APM_STEP_PROCESS, "remove data" ); spt::ilp::addCurrentFunction( cp ); auto req = Request::retrieve( dbc.historyDb, std::string{ model::VersionHistorySummary::Collection() }, document{} << "database" << M::Database() << "collection" << M::Collection() << "entity._id" << id << finalize ); req.options = document{} << "projection" << open_document << "_id" << 1 << "action" << 1 << "created" << 1 << "metadata" << 1 << close_document << "sort" << open_document << "_id" << 1 << close_document << finalize; req.correlationId = apm.id; const auto [type, opt] = execute( req ); spt::ilp::setDuration( cp ); if ( !opt ) { LOG_WARN << "Unable to retrieve version history summary documents for " << M::Database() << ':' << M::Collection() << ':' << id.to_string() << ". " << req.document << ". " << *req.options << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Database error." ); return { 424, std::nullopt }; } const auto view = opt->view(); if ( const auto err = spt::util::bsonValueIfExists<std::string>( "error", view ); err ) { LOG_WARN << "Error retrieving version history summary documents for " << M::Database() << ':' << M::Collection() << ':' << id << ". " << req.document << ". " << *req.options << ". " << view << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, std::format( "Error returned. {}", *err ) ); return { 417, std::nullopt }; } auto arr = spt::util::bsonValueIfExists<bsoncxx::array::view>( "results", view ); if ( !arr ) { LOG_WARN << "No version history summary documents for " << M::Database() << ':' << M::Collection() << ':' << id.to_string() << ". " << req.document << ". " << *req.options << ". " << view << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "No matches returned." ); return { 404, std::nullopt }; } model::Entities<model::VersionHistorySummary> ms; ms.entities.reserve( std::distance( arr->begin(), arr->end() ) ); std::for_each( std::cbegin( *arr ), std::cend( *arr ), [&ms]( const auto& d ) { ms.entities.emplace_back( d.get_document().value ); } ); ms.page = std::size( ms.entities ); ms.total = ms.page; return { 200, std::move( ms ) }; } catch ( const std::exception& ex ) { LOG_WARN << "Error retrieving version history summary documents for " << M::Database() << ':' << M::Collection() << ':' << id.to_string() << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); return { 422, std::nullopt }; } catch ( ... ) { LOG_WARN << "Unknown error retrieving version history documents for " << M::Database() << ':' << M::Collection() << ':' << id.to_string() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return { 500, std::nullopt }; } template <Model M> std::tuple<int16_t, std::optional<model::VersionHistoryDocument<M>>> versionHistoryDocument( bsoncxx::oid id, spt::ilp::APMRecord& apm ) { using bsoncxx::builder::stream::document; using bsoncxx::builder::stream::open_document; using bsoncxx::builder::stream::close_document; using bsoncxx::builder::stream::finalize; using spt::mongoservice::api::execute; using spt::mongoservice::api::Request; auto& p = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Function ); DEFER( spt::ilp::setDuration( p ) ); const auto& dbc = impl::DatabaseConfiguration::instance(); try { auto& cp = spt::ilp::addProcess( apm, spt::ilp::APMRecord::Process::Type::Step ); cp.values.try_emplace( model::ilp::name::APM_STEP_PROCESS, "remove data" ); spt::ilp::addCurrentFunction( cp ); auto req = Request::retrieve( dbc.historyDb, std::string{ model::VersionHistorySummary::Collection() }, document{} << "_id" << id << "database" << M::Database() << "collection" << M::Collection() << finalize ); req.correlationId = apm.id; const auto [type, opt] = execute( req ); spt::ilp::setDuration( cp ); if ( !opt ) { LOG_WARN << "Unable to retrieve version history document for " << M::Database() << ':' << M::Collection() << " with id: " << id << ". " << req.document << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Database error." ); return { 424, std::nullopt }; } const auto view = opt->view(); if ( const auto err = spt::util::bsonValueIfExists<std::string>( "error", view ); err ) { LOG_WARN << "Error retrieving version history document for " << M::Database() << ':' << M::Collection() << " with id: " << id << ". " << req.document << ". " << view << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, std::format( "Error returned {}.", *err ) ); return { 417, std::nullopt }; } auto doc = spt::util::bsonValueIfExists<bsoncxx::document::view>( "result", view ); if ( !doc ) { LOG_WARN << "No version history document for " << M::Database() << ':' << M::Collection() << " with id: " << id << ". " << req.document << ". " << view << ". APM id: " << apm.id; cp.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "No matches returned." ); return { 404, std::nullopt }; } return { 200, model::VersionHistoryDocument<M>{ *doc } }; } catch ( const std::exception& ex ) { LOG_WARN << "Error retrieving version history document for " << M::Database() << ':' << M::Collection() << " with id: " << id.to_string() << ". " << ex.what() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); spt::ilp::addException( apm, ex, "Exception" ); return { 422, std::nullopt }; } catch ( ... ) { LOG_WARN << "Unknown error retrieving version history document for " << M::Database() << ':' << M::Collection() << " with id: " << id.to_string() << ". APM id: " << apm.id; LOG_WARN << util::stacktrace(); p.values.try_emplace( model::ilp::name::APM_ERROR_VALUE, "Unexpected error" ); } return { 500, std::nullopt }; } }

Sample REST API endpoint handlers that create the APM record which is then passed to the function call chain.

// // Created by Rakesh on 22/08/2024. // #include "http/template.hpp" #include "model/catalog.hpp" using std::operator ""s; using std::operator ""sv; using spt::http2::framework::RoutingRequest; enum class Action : uint_fast8_t { create, retrieve, update, remove }; #define CREATE_APM( action ) \ auto apm = spt::ilp::createAPMRecord( bsoncxx::oid{}.to_string(), "inventory-api", spt::ilp::APMRecord::Process::Type::Function, 48 ); \ apm.tags.try_emplace( "action", std::string{ magic_enum::enum_name( (action) ) } ); \ auto idx = apm.processes.size(); #define FINISH_APM( name ) \ if ( resp.entity.empty() ) resp.entity = (name); \ spt::ilp::addCurrentFunction( apm.processes[1] ); \ spt::ilp::setDuration( apm.processes[0] ); \ resp.apm = std::move( apm ); \ return resp; } #define WRAP_CODE_LINE(...) \ idx = apm.processes.size(); \ __VA_ARGS__ \ spt::ilp::addCurrentFunction( apm.processes[idx] ); void spt::http::addCatalogRoutes( spt::http2::framework::Server<Response>& server ) { static const auto methods = std::array{ "GET"s, "OPTIONS"s, "POST"s, "PUT"s, "DELETE"s }; server.addHandler( "GET"sv, "/catalog/"sv, []( const RoutingRequest& req, auto&& ) { CREATE_APM( Action::retrieve ) WRAP_CODE_LINE( auto resp = retrieveAll<model::Catalog>( req.req, methods, apm, &userRole ); ) FINISH_APM( pcatalog::entity() ) }, "./paths/catalog.yaml#/root"sv ); server.addHandler( "GET"sv, "/catalog/id/{id}"sv, []( const RoutingRequest& req, auto args ) { CREATE_APM( Action::retrieve ) WRAP_CODE_LINE( auto resp = get<model::Catalog>( req.req, args["id"sv], methods, apm, &userRole ); ) FINISH_APM( pcatalog::entity() ) }, "./paths/catalog.yaml#/id"sv ); server.addHandler( "GET"sv, "/catalog/identifier/{identifier}"sv, []( const RoutingRequest& req, auto args ) { CREATE_APM( Action::retrieve ) WRAP_CODE_LINE( auto resp = get<model::Catalog, std::string_view>( req.req, "identifier"sv, args["identifier"sv], methods, apm, &userRole ); ) FINISH_APM( pcatalog::entity() ) }, "./paths/catalog.yaml#/identifier"sv ); server.addHandler( "GET"sv, "/catalog/customer/code/{code}"sv, []( const RoutingRequest& req, auto args ) { CREATE_APM( Action::retrieve ) WRAP_CODE_LINE( auto resp = retrieveAll<model::Catalog>( req.req, args["code"sv], methods, apm ); ) FINISH_APM( pcatalog::entity() ) }, "./paths/catalog.yaml#/customer"sv ); server.addHandler( "GET"sv, "/catalog/count/references/{id}"sv, []( const RoutingRequest& req, auto&& args ) { CREATE_APM( Action::retrieve ) WRAP_CODE_LINE( auto resp = refcounts<model::Catalog>( req.req, args["id"sv], methods, apm, &userRole ); ) FINISH_APM( pcatalog::entity() ) }, "./paths/catalog.yaml#/refcount"sv ); server.addHandler( "GET"sv, "/catalog/history/summary/{id}"sv, []( const RoutingRequest& req, auto&& args ) { CREATE_APM( Action::retrieve ) WRAP_CODE_LINE( auto resp = versionHistorySummary<model::Catalog>( req.req, args["id"sv], methods, apm, &superuserRole ); ) FINISH_APM( pcatalog::entity() ) }, "./paths/catalog.yaml#/historySummary"sv ); server.addHandler( "GET"sv, "/catalog/history/document/{id}"sv, []( const RoutingRequest& req, auto&& args ) { CREATE_APM( Action::retrieve ) WRAP_CODE_LINE( auto resp = versionHistoryDocument<model::Catalog>( req.req, args["id"sv], methods, apm, &superuserRole ); ) FINISH_APM( pcatalog::entity() ) }, "./paths/catalog.yaml#/historyDocument"sv ); server.addHandler( "GET"sv, "/catalog/{type}/between/{start}/{end}"sv, []( const RoutingRequest& req, auto&& args ) { CREATE_APM( Action::retrieve ) WRAP_CODE_LINE( auto resp = between<model::Catalog>( req, std::forward<decltype(args)>( args ), methods, apm, &userRole ); ) FINISH_APM( pcatalog::entity() ) }, "./paths/catalog.yaml#/created"sv ); server.addHandler( "POST"sv, "/catalog/"sv, []( const RoutingRequest& req, auto&& ) { CREATE_APM( Action::create ) WRAP_CODE_LINE( auto resp = create<model::Catalog>( req.req, req.body, methods, apm, &adminRole ); ) FINISH_APM( pcatalog::entity() ) }, "./paths/catalog.yaml#/root"sv ); server.addHandler( "PUT"sv, "/catalog/id/{id}"sv, []( const RoutingRequest& req, auto&& args ) { CREATE_APM( Action::update ) WRAP_CODE_LINE( auto resp = update<model::Catalog>( req.req, req.body, args["id"sv], methods, apm, &adminRole ); ) FINISH_APM( pcatalog::entity() ) }, "./paths/catalog.yaml#/update"sv ); server.addHandler( "DELETE"sv, "/catalog/id/{id}"sv, []( const RoutingRequest& req, auto args ) { CREATE_APM( Action::remove ) WRAP_CODE_LINE( auto resp = remove<model::Catalog>( req.req, args["id"sv], methods, apm, &adminRole ); ) FINISH_APM( pcatalog::entity() ) }, "./paths/catalog.yaml#/id"sv ); }

A custom HTTP response structure that is used by the application. The APM record is saved to MongoDB via the mongo-service proxy.

// // Created by Rakesh on 6/4/21. // #include "db/repository.hpp" #include "http/response.hpp" #include "util/config.hpp" #include <charconv> #include <boost/json/parse.hpp> #include <fmt/ranges.h> #include <http2/framework/common.hpp> #include <log/NanoLog.hpp> #include <mongo-service/api/api.hpp> #include <range/v3/algorithm/find.hpp> using spt::http::Response; using std::operator""s; using std::operator""sv; namespace { namespace presponse { std::vector<std::string> origins() { const auto def = []() { return std::vector{ "http://local.sptci.com:3000"s, "https://sso-dev.sptci.com"s, "https://sso.sptci.com"s, "https://dashboard-dev.sptci.com"s, "https://admin-dev.sptci.com"s, "https://dashboard.sptci.com"s, "https://admin.sptci.com"s, "https://www.sptci.com"s, "https://sptci.com"s }; }; if ( auto cfg = spt::util::Configuration::instance().get( "/service/cors/origins"sv ); cfg ) { auto ec = boost::system::error_code{}; auto p = boost::json::parse( *cfg, ec ); if ( ec ) { LOG_CRIT << "Error parsing configured origins " << *cfg; return def(); } auto resp = std::vector<std::string>{}; resp.reserve( 8 ); for ( const auto& item : p.as_array() ) resp.emplace_back( item.as_string() ); LOG_INFO << "Configured origins " << fmt::format( "{}", resp ); return resp; } LOG_WARN << "Configured origins not found. Using defaults..."; return def(); } template<class... Ts> struct overload : Ts... { using Ts::operator()...; }; void serialise( const std::map<std::string, spt::ilp::APMRecord::Value, std::less<>>& map, bsoncxx::builder::stream::document& builder, std::string_view property ) { using bsoncxx::builder::stream::document; using bsoncxx::builder::stream::finalize; auto values = document{}; for ( const auto& pair : map ) { std::visit( overload { [&values, &pair]( bool v ){ values << pair.first << v; }, [&values, &pair]( int64_t v ){ values << pair.first << v; }, [&values, &pair]( uint64_t v ){ values << pair.first << static_cast<int64_t>( v ); }, [&values, &pair]( double v ){ values << pair.first << v; }, [&values, &pair]( const std::string& v ){ values << pair.first << v; }, }, pair.second ); } builder << property << (values << finalize); } bsoncxx::document::value serialise( const spt::ilp::APMRecord::Process& process ) { using bsoncxx::builder::stream::document; using bsoncxx::builder::stream::open_document; using bsoncxx::builder::stream::close_document; using bsoncxx::builder::stream::finalize; auto doc = document{}; doc << "type" << magic_enum::enum_name( process.type ) << "timestamp" << bsoncxx::types::b_int64{ std::chrono::duration_cast<std::chrono::nanoseconds>( process.timestamp.time_since_epoch() ).count() } << "duration" << bsoncxx::types::b_int64{ process.duration.count() }; auto tags = document{}; for ( const auto& [key, value] : process.tags ) tags << key << value; doc << "tags" << (tags << finalize); serialise( process.values, doc, "values" ); return doc << finalize; } bsoncxx::document::value serialise( const spt::ilp::APMRecord& apm ) { using bsoncxx::builder::stream::array; using bsoncxx::builder::stream::document; using bsoncxx::builder::stream::finalize; auto doc = document{}; doc << "_id" << bsoncxx::oid{ apm.id } << "application" << apm.application << "timestamp" << bsoncxx::types::b_int64{ std::chrono::duration_cast<std::chrono::nanoseconds>( apm.timestamp.time_since_epoch() ).count() } << "duration" << bsoncxx::types::b_int64{ apm.duration.count() }; auto tags = document{}; for ( const auto& [key, value] : apm.tags ) tags << key << value; doc << "tags" << (tags << finalize); serialise( apm.values, doc, "values" ); auto arr = array{}; for ( const auto& proc : apm.processes ) arr << serialise( proc ); doc << "processes" << (arr << finalize); return doc << finalize; } } } const std::vector<std::string>& Response::origins() { static const auto vec = presponse::origins(); return vec; } Response::Response( const nghttp2::asio_http2::header_map& headers ) { auto iter = headers.find( "origin"s ); if ( iter == std::cend( headers ) ) iter = headers.find( "Origin"s ); if ( iter == std::cend( headers ) ) return; origin = iter->second.value; } void Response::set( std::span<const std::string> methods, std::span<const std::string> origins ) { headers = nghttp2::asio_http2::header_map{ { "Access-Control-Allow-Methods", { fmt::format( "{:n:}", methods ), false } }, { "Access-Control-Allow-Headers", { "*, authorization", false } }, { "content-type", { "application/json; charset=utf-8", false } }, { "content-length", { std::to_string( body.size() ), false } } }; if ( compressed ) { headers.emplace( "content-encoding", nghttp2::asio_http2::header_value{ "gzip", false } ); } if ( origin.empty() ) return; if ( const auto iter = ranges::find( origins, origin ); iter != ranges::end( origins ) ) { headers.emplace( "Access-Control-Allow-Origin", nghttp2::asio_http2::header_value{ origin, false } ); headers.emplace( "Vary", nghttp2::asio_http2::header_value{ "Origin", false } ); } else LOG_WARN << "Origin " << origin << " not allowed"; } template <> void spt::http2::framework::extraProcess( const Request& req, spt::http::Response& response, boost::asio::thread_pool& pool ) { if ( !response.apm ) return; int32_t size{ 0 }; if ( response.filePath.empty() ) { size = static_cast<int32_t>( response.body.size() ); } else { if ( const auto iter = response.headers.find( "content-length"s ); iter != std::cend( response.headers ) ) { std::from_chars( iter->second.value.data(), iter->second.value.data() + iter->second.value.size(), size ); } } response.apm->tags.try_emplace( "request_method", req.method ); response.apm->tags.try_emplace( "entity", response.entity ); response.apm->tags.try_emplace( "host", spt::util::hostname() ); response.apm->tags.try_emplace( "response_status", std::to_string( response.status ) ); response.apm->tags.try_emplace( "response_compressed", std::format( "{}", response.compressed ) ); response.apm->values.try_emplace( "request_path", req.path ); response.apm->values.try_emplace( "ipaddress", ipaddress( req ) ); response.apm->values.try_emplace( "response_size", static_cast<int64_t>( size ) ); if ( response.jwt ) { response.apm->values.try_emplace( "username", response.jwt->user.username ); response.apm->tags.try_emplace( "customer", response.jwt->user.customerCode ); response.apm->tags.try_emplace( "role", std::string{ magic_enum::enum_name( response.jwt->user.role ) } ); } spt::ilp::setDuration( *response.apm ); boost::asio::post( pool, [apm=std::move(response.apm.value())] { auto request = mongoservice::api::Request::create( spt::model::database( spt::model::EntityType::SearchDocument ), "webapm", presponse::serialise( apm ) ); request.correlationId = apm.id; request.skipVersion = true; const auto [_, opt] = mongoservice::api::execute( request ); if ( !opt ) { LOG_WARN << "Unable to save apm " << request.database << ':' << request.collection << ':' << apm.id; return; } const auto view = opt->view(); if ( const auto err = util::bsonValueIfExists<std::string>( "error", view ); err ) { LOG_WARN << "Unable to save apm " << request.database << ':' << request.collection << ':' << apm.id << ". " << *err; return; } LOG_DEBUG << "Saved apm " << request.database << ':' << request.collection << ':' << apm.id; } ); }

A sample APM record document generated during an integration test run from my local MongoDB instance.

{ "_id": { "$oid": "67e721c2af679fd3e60d1007" }, "application": "inventory-api", "timestamp": { "$numberLong": "390037115100666" }, "duration": { "$numberLong": "5016042" }, "tags": { "action": "retrieve", "customer": "int-test", "entity": "InventoryItemTransaction", "host": "rakesh-mbp.local", "request_method": "POST", "response_compressed": "true", "response_status": "200", "role": "admin" }, "values": { "ipaddress": "127.0.0.1", "request_path": "/inventory/item/transaction/", "response_size": { "$numberLong": "235" }, "username": "test" }, "processes": [ { "type": "Function", "timestamp": { "$numberLong": "390037115100791" }, "duration": { "$numberLong": "5000917" }, "values": { "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/http/impl/inventoryitemtransaction.cpp", "function": "auto wirepulse::http::addInventoryItemTransactionRoutes(spt::http2::framework::Server<Response> &)::(anonymous class)::operator()(const RoutingRequest &, auto &&) const [auto:1 = boost::container::flat_map<std::string_view, std::string_view>]", "line": { "$numberLong": "85" } } }, { "type": "Function", "timestamp": { "$numberLong": "390037115104416" }, "duration": { "$numberLong": "4992375" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/http/impl/inventoryitemtransaction.cpp", "caller_function": "auto wirepulse::http::addInventoryItemTransactionRoutes(spt::http2::framework::Server<Response> &)::(anonymous class)::operator()(const RoutingRequest &, auto &&) const [auto:1 = boost::container::flat_map<std::string_view, std::string_view>]", "caller_line": { "$numberLong": "87" }, "entity_id": "67e721c2af679fd3e60d1008", "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/http/template.hpp", "function": "Response wirepulse::http::create(const spt::http2::framework::Request &, std::string_view, std::span<const std::string>, spt::ilp::APMRecord &, AuthFunction &&, bool) [M = wirepulse::model::InventoryItemTransaction, AuthFunction = bool (*)(std::string_view, const wirepulse::model::JwtToken &)]", "line": { "$numberLong": "53" }, "note": "Create entity InventoryItemTransaction" } }, { "type": "Function", "timestamp": { "$numberLong": "390037115110041" }, "duration": { "$numberLong": "625750" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/http/template.hpp", "caller_function": "Response wirepulse::http::create(const spt::http2::framework::Request &, std::string_view, std::span<const std::string>, spt::ilp::APMRecord &, AuthFunction &&, bool) [M = wirepulse::model::InventoryItemTransaction, AuthFunction = bool (*)(std::string_view, const wirepulse::model::JwtToken &)]", "caller_line": { "$numberLong": "67" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/http/impl/common.cpp", "function": "AuthResponse wirepulse::http::authorise(const spt::http2::framework::Request &, spt::ilp::APMRecord &)", "line": { "$numberLong": "87" } } }, { "type": "Function", "timestamp": { "$numberLong": "390037115119291" }, "duration": { "$numberLong": "611750" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/http/impl/common.cpp", "caller_function": "AuthResponse wirepulse::http::authorise(const spt::http2::framework::Request &, spt::ilp::APMRecord &)", "caller_line": { "$numberLong": "107" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/storage.cpp", "function": "model::JwtToken::Ptr wirepulse::db::validateToken(std::string_view, spt::ilp::APMRecord &)", "line": { "$numberLong": "65" } } }, { "type": "Function", "timestamp": { "$numberLong": "390037115120833" }, "duration": { "$numberLong": "605833" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/storage.cpp", "caller_function": "model::JwtToken::Ptr wirepulse::db::validateToken(std::string_view, spt::ilp::APMRecord &)", "caller_line": { "$numberLong": "72" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/impl/tokenrepository.cpp", "function": "model::JwtToken::Ptr wirepulse::db::impl::token(std::string_view, spt::ilp::APMRecord &)", "line": { "$numberLong": "159" }, "note": "Retrieve token" } }, { "type": "Function", "timestamp": { "$numberLong": "390037115149166" }, "duration": { "$numberLong": "572167" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/impl/tokenrepository.cpp", "caller_function": "model::JwtToken::Ptr wirepulse::db::impl::token(std::string_view, spt::ilp::APMRecord &)", "caller_line": { "$numberLong": "171" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/impl/tokenrepository.cpp", "function": "std::optional<bsoncxx::oid> (anonymous namespace)::ptoken::jwtToken(const model::JwtToken &, spt::ilp::APMRecord &)", "line": { "$numberLong": "101" }, "note": "Retrieve JWT token" } }, { "type": "Step", "timestamp": { "$numberLong": "390037115152125" }, "duration": { "$numberLong": "567708" }, "tags": { "process": "Retrieve data" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/impl/tokenrepository.cpp", "caller_function": "std::optional<bsoncxx::oid> (anonymous namespace)::ptoken::jwtToken(const model::JwtToken &, spt::ilp::APMRecord &)", "caller_line": { "$numberLong": "114" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/impl/tokenrepository.cpp", "function": "std::optional<bsoncxx::oid> (anonymous namespace)::ptoken::jwtToken(const model::JwtToken &, spt::ilp::APMRecord &)", "line": { "$numberLong": "113" } } }, { "type": "Step", "timestamp": { "$numberLong": "390037115741791" }, "duration": { "$numberLong": "38334" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/http/template.hpp", "caller_function": "Response wirepulse::http::create(const spt::http2::framework::Request &, std::string_view, std::span<const std::string>, spt::ilp::APMRecord &, AuthFunction &&, bool) [M = wirepulse::model::InventoryItemTransaction, AuthFunction = bool (*)(std::string_view, const wirepulse::model::JwtToken &)]", "caller_line": { "$numberLong": "82" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/http/template.hpp", "function": "Response wirepulse::http::create(const spt::http2::framework::Request &, std::string_view, std::span<const std::string>, spt::ilp::APMRecord &, AuthFunction &&, bool) [M = wirepulse::model::InventoryItemTransaction, AuthFunction = bool (*)(std::string_view, const wirepulse::model::JwtToken &)]", "line": { "$numberLong": "80" }, "note": "Parse JSON payload" } }, { "type": "Function", "timestamp": { "$numberLong": "390037115780625" }, "duration": { "$numberLong": "3045958" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/http/template.hpp", "caller_function": "Response wirepulse::http::create(const spt::http2::framework::Request &, std::string_view, std::span<const std::string>, spt::ilp::APMRecord &, AuthFunction &&, bool) [M = wirepulse::model::InventoryItemTransaction, AuthFunction = bool (*)(std::string_view, const wirepulse::model::JwtToken &)]", "caller_line": { "$numberLong": "94" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/validate/inventoryitemtransaction.cpp", "function": "Tuple wirepulse::validate::validate(model::InventoryItemTransaction &, const model::JwtToken &, spt::ilp::APMRecord &)", "line": { "$numberLong": "19" }, "note": "validate inventory item transaction" } }, { "type": "Function", "timestamp": { "$numberLong": "390037115784416" }, "duration": { "$numberLong": "2110917" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/validate/inventoryitemtransaction.cpp", "caller_function": "Tuple wirepulse::validate::validate(model::InventoryItemTransaction &, const model::JwtToken &, spt::ilp::APMRecord &)", "caller_line": { "$numberLong": "45" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/validate/impl/validate.hpp", "function": "std::expected<std::optional<M>, Tuple> wirepulse::validate::impl::validate(M &, const model::JwtToken &, spt::ilp::APMRecord &, bool) [M = wirepulse::model::InventoryItemTransaction]", "line": { "$numberLong": "44" }, "note": "validate entity" } }, { "type": "Function", "timestamp": { "$numberLong": "390037115787916" }, "duration": { "$numberLong": "487250" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/validate/impl/validate.hpp", "caller_function": "std::expected<std::optional<M>, Tuple> wirepulse::validate::impl::validate(M &, const model::JwtToken &, spt::ilp::APMRecord &, bool) [M = wirepulse::model::InventoryItemTransaction]", "caller_line": { "$numberLong": "48" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/validate/impl/user.cpp", "function": "std::expected<wirepulse::model::User, std::tuple<int16_t, std::string>> wirepulse::validate::impl::user(const model::JwtToken &, spt::ilp::APMRecord &)", "line": { "$numberLong": "16" }, "note": "validate user" } }, { "type": "Function", "timestamp": { "$numberLong": "390037115791625" }, "duration": { "$numberLong": "482625" }, "values": { "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "function": "std::tuple<int16_t, std::optional<M>> wirepulse::db::retrieve(std::string_view, ValueType, std::string_view, spt::ilp::APMRecord &, bool) [M = wirepulse::model::User, ValueType = std::string]", "line": { "$numberLong": "544" } } }, { "type": "Step", "timestamp": { "$numberLong": "390037115793541" }, "duration": { "$numberLong": "477375" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "caller_function": "std::tuple<int16_t, std::optional<M>> wirepulse::db::retrieve(std::string_view, ValueType, std::string_view, spt::ilp::APMRecord &, bool) [M = wirepulse::model::User, ValueType = std::string]", "caller_line": { "$numberLong": "570" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "function": "std::tuple<int16_t, std::optional<M>> wirepulse::db::retrieve(std::string_view, ValueType, std::string_view, spt::ilp::APMRecord &, bool) [M = wirepulse::model::User, ValueType = std::string]", "line": { "$numberLong": "568" }, "process": "retrieve data" } }, { "type": "Function", "timestamp": { "$numberLong": "390037116281666" }, "duration": { "$numberLong": "652375" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/validate/impl/validate.hpp", "caller_function": "std::expected<std::optional<M>, Tuple> wirepulse::validate::impl::validate(M &, const model::JwtToken &, spt::ilp::APMRecord &, bool) [M = wirepulse::model::InventoryItemTransaction]", "caller_line": { "$numberLong": "58" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/validate/impl/customer.hpp", "function": "std::expected<model::Customer, std::tuple<int16_t, std::string>> wirepulse::validate::impl::customer(Model &, const model::JwtToken &, spt::ilp::APMRecord &) [Model = wirepulse::model::InventoryItemTransaction]", "line": { "$numberLong": "29" }, "note": "validate customer" } }, { "type": "Function", "timestamp": { "$numberLong": "390037116285166" }, "duration": { "$numberLong": "630334" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/validate/impl/customer.hpp", "caller_function": "std::expected<model::Customer, std::tuple<int16_t, std::string>> wirepulse::validate::impl::customer(Model &, const model::JwtToken &, spt::ilp::APMRecord &) [Model = wirepulse::model::InventoryItemTransaction]", "caller_line": { "$numberLong": "33" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "function": "std::tuple<int16_t, std::optional<M>> wirepulse::db::retrieve(std::string_view, ValueType, std::string_view, spt::ilp::APMRecord &, bool) [M = wirepulse::model::Customer, ValueType = std::string]", "line": { "$numberLong": "544" } } }, { "type": "Step", "timestamp": { "$numberLong": "390037116287500" }, "duration": { "$numberLong": "625416" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "caller_function": "std::tuple<int16_t, std::optional<M>> wirepulse::db::retrieve(std::string_view, ValueType, std::string_view, spt::ilp::APMRecord &, bool) [M = wirepulse::model::Customer, ValueType = std::string]", "caller_line": { "$numberLong": "570" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "function": "std::tuple<int16_t, std::optional<M>> wirepulse::db::retrieve(std::string_view, ValueType, std::string_view, spt::ilp::APMRecord &, bool) [M = wirepulse::model::Customer, ValueType = std::string]", "line": { "$numberLong": "568" }, "process": "retrieve data" } }, { "type": "Function", "timestamp": { "$numberLong": "390037116938833" }, "duration": { "$numberLong": "949875" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/validate/impl/validate.hpp", "caller_function": "std::expected<std::optional<M>, Tuple> wirepulse::validate::impl::validate(M &, const model::JwtToken &, spt::ilp::APMRecord &, bool) [M = wirepulse::model::InventoryItemTransaction]", "caller_line": { "$numberLong": "69" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/validate/impl/facility.hpp", "function": "std::expected<std::optional<model::Facility>, std::tuple<int16_t, std::string>> wirepulse::validate::impl::facility(Model &, const model::JwtToken &, spt::ilp::APMRecord &) [Model = wirepulse::model::InventoryItemTransaction]", "line": { "$numberLong": "29" }, "note": "validate facility" } }, { "type": "Function", "timestamp": { "$numberLong": "390037116941750" }, "duration": { "$numberLong": "632833" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/validate/impl/facility.hpp", "caller_function": "std::expected<std::optional<model::Facility>, std::tuple<int16_t, std::string>> wirepulse::validate::impl::facility(Model &, const model::JwtToken &, spt::ilp::APMRecord &) [Model = wirepulse::model::InventoryItemTransaction]", "caller_line": { "$numberLong": "35" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/validate/impl/customer.hpp", "function": "std::expected<model::Customer, std::tuple<int16_t, std::string>> wirepulse::validate::impl::customer(Model &, const model::JwtToken &, spt::ilp::APMRecord &) [Model = wirepulse::model::InventoryItemTransaction]", "line": { "$numberLong": "29" }, "note": "validate customer" } }, { "type": "Function", "timestamp": { "$numberLong": "390037116943833" }, "duration": { "$numberLong": "624625" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/validate/impl/customer.hpp", "caller_function": "std::expected<model::Customer, std::tuple<int16_t, std::string>> wirepulse::validate::impl::customer(Model &, const model::JwtToken &, spt::ilp::APMRecord &) [Model = wirepulse::model::InventoryItemTransaction]", "caller_line": { "$numberLong": "33" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "function": "std::tuple<int16_t, std::optional<M>> wirepulse::db::retrieve(std::string_view, ValueType, std::string_view, spt::ilp::APMRecord &, bool) [M = wirepulse::model::Customer, ValueType = std::string]", "line": { "$numberLong": "544" } } }, { "type": "Step", "timestamp": { "$numberLong": "390037116945416" }, "duration": { "$numberLong": "621667" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "caller_function": "std::tuple<int16_t, std::optional<M>> wirepulse::db::retrieve(std::string_view, ValueType, std::string_view, spt::ilp::APMRecord &, bool) [M = wirepulse::model::Customer, ValueType = std::string]", "caller_line": { "$numberLong": "570" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "function": "std::tuple<int16_t, std::optional<M>> wirepulse::db::retrieve(std::string_view, ValueType, std::string_view, spt::ilp::APMRecord &, bool) [M = wirepulse::model::Customer, ValueType = std::string]", "line": { "$numberLong": "568" }, "process": "retrieve data" } }, { "type": "Function", "timestamp": { "$numberLong": "390037117579166" }, "duration": { "$numberLong": "301334" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/validate/impl/facility.hpp", "caller_function": "std::expected<std::optional<model::Facility>, std::tuple<int16_t, std::string>> wirepulse::validate::impl::facility(Model &, const model::JwtToken &, spt::ilp::APMRecord &) [Model = wirepulse::model::InventoryItemTransaction]", "caller_line": { "$numberLong": "45" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "function": "std::tuple<int16_t, std::optional<M>> wirepulse::db::retrieve(bsoncxx::oid, std::string_view, spt::ilp::APMRecord &) [M = wirepulse::model::Facility]", "line": { "$numberLong": "442" } } }, { "type": "Step", "timestamp": { "$numberLong": "390037117583333" }, "duration": { "$numberLong": "295250" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "caller_function": "std::tuple<int16_t, std::optional<M>> wirepulse::db::retrieve(bsoncxx::oid, std::string_view, spt::ilp::APMRecord &) [M = wirepulse::model::Facility]", "caller_line": { "$numberLong": "466" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "function": "std::tuple<int16_t, std::optional<M>> wirepulse::db::retrieve(bsoncxx::oid, std::string_view, spt::ilp::APMRecord &) [M = wirepulse::model::Facility]", "line": { "$numberLong": "464" }, "note": "Return cached value", "process": "retrieve cached value" } }, { "type": "Function", "timestamp": { "$numberLong": "390037117900583" }, "duration": { "$numberLong": "917792" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/validate/inventoryitemtransaction.cpp", "caller_function": "Tuple wirepulse::validate::validate(model::InventoryItemTransaction &, const model::JwtToken &, spt::ilp::APMRecord &)", "caller_line": { "$numberLong": "49" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "function": "std::tuple<int16_t, std::optional<M>> wirepulse::db::retrieve(bsoncxx::oid, std::string_view, spt::ilp::APMRecord &) [M = wirepulse::model::InventoryItem]", "line": { "$numberLong": "442" } } }, { "type": "Function", "timestamp": { "$numberLong": "390037117923541" }, "duration": { "$numberLong": "888500" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "caller_function": "std::tuple<int16_t, std::optional<M>> wirepulse::db::retrieve(bsoncxx::oid, std::string_view, spt::ilp::APMRecord &) [M = wirepulse::model::InventoryItem]", "caller_line": { "$numberLong": "458" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "function": "std::tuple<int16_t, std::optional<M>> wirepulse::db::pipeline(std::vector<spt::mongoservice::api::model::request::Pipeline::Document::Stage>, spt::ilp::APMRecord &, bool) [M = wirepulse::model::InventoryItem]", "line": { "$numberLong": "247" } } }, { "type": "Step", "timestamp": { "$numberLong": "390037117928958" }, "duration": { "$numberLong": "880375" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "caller_function": "std::tuple<int16_t, std::optional<M>> wirepulse::db::pipeline(std::vector<spt::mongoservice::api::model::request::Pipeline::Document::Stage>, spt::ilp::APMRecord &, bool) [M = wirepulse::model::InventoryItem]", "caller_line": { "$numberLong": "269" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "function": "std::tuple<int16_t, std::optional<M>> wirepulse::db::pipeline(std::vector<spt::mongoservice::api::model::request::Pipeline::Document::Stage>, spt::ilp::APMRecord &, bool) [M = wirepulse::model::InventoryItem]", "line": { "$numberLong": "268" } } }, { "type": "Function", "timestamp": { "$numberLong": "390037118834833" }, "duration": { "$numberLong": "1195750" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/http/template.hpp", "caller_function": "Response wirepulse::http::create(const spt::http2::framework::Request &, std::string_view, std::span<const std::string>, spt::ilp::APMRecord &, AuthFunction &&, bool) [M = wirepulse::model::InventoryItemTransaction, AuthFunction = bool (*)(std::string_view, const wirepulse::model::JwtToken &)]", "caller_line": { "$numberLong": "105" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "function": "uint16_t wirepulse::db::create(const M &, spt::ilp::APMRecord &, bool) [M = wirepulse::model::InventoryItemTransaction]", "line": { "$numberLong": "122" } } }, { "type": "Step", "timestamp": { "$numberLong": "390037118851000" }, "duration": { "$numberLong": "768875" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "caller_function": "uint16_t wirepulse::db::create(const M &, spt::ilp::APMRecord &, bool) [M = wirepulse::model::InventoryItemTransaction]", "caller_line": { "$numberLong": "133" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "function": "uint16_t wirepulse::db::create(const M &, spt::ilp::APMRecord &, bool) [M = wirepulse::model::InventoryItemTransaction]", "line": { "$numberLong": "132" } } }, { "type": "Step", "timestamp": { "$numberLong": "390037119620416" }, "duration": { "$numberLong": "409250" }, "values": { "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/db/repository.hpp", "function": "uint16_t wirepulse::db::create(const M &, spt::ilp::APMRecord &, bool) [M = wirepulse::model::InventoryItemTransaction]", "line": { "$numberLong": "151" }, "note": "Cache model" } }, { "type": "Function", "timestamp": { "$numberLong": "390037120039750" }, "duration": { "$numberLong": "41458" }, "values": { "caller_file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/http/template.hpp", "caller_function": "Response wirepulse::http::create(const spt::http2::framework::Request &, std::string_view, std::span<const std::string>, spt::ilp::APMRecord &, AuthFunction &&, bool) [M = wirepulse::model::InventoryItemTransaction, AuthFunction = bool (*)(std::string_view, const wirepulse::model::JwtToken &)]", "caller_line": { "$numberLong": "115" }, "file": "/Users/rakesh/projects/customer/wirepulse/inventory-api/src/http/common.hpp", "function": "void wirepulse::http::output(const spt::http2::framework::Request &, Response &, const M &, spt::ilp::APMRecord &) [M = wirepulse::model::InventoryItemTransaction]", "line": { "$numberLong": "52" } } } ] }

General Procedure

We follow the following general procedure for capturing APM data.

  • Create an APM Record at the start of a business process cycle. For our REST API handlers, we start by creating a record at the start of the handler function, and finish it (typically setting the duration) at the end of the function. See lines 31, 38 etc. in the Handler tab above.

  • All our interfaces were modified to accept an additional APMRecord parameter. We then pass the APM record all the way through the function call chain.

  • We wrap each function invocation in a WRAP_CODE_LINE macro (see the Handler tab above). This ensures that each invocation results in the caller function being added to the APMRecord::Process record.

  • At the start of each function, we add a APMRecord::Process to the apm record.

  • We call the setDuration function at the end of each function (via a DEFER macro - see defer.hpp).

  • We add notes, errors etc. as appropriate to the process record to further enhance the data in the APM database.

  • For functions that call out to third-party APIs, we add a process of type Step, and capture basic metrics around that invocation.

  • At the end of the business process cycle, we publish the APM data.

  • See Sample tab above for a sample of the APM data collection in our application during a HTTP request process.

Publish Data

The gathered APM data needs to be published to a supported database for requisite analysis. There are a variety of options available for publishing the data. Some of the options I considered were the following:

  • Publish the APM records directly over ILP. This generally requires maintaining a connection pool of ILP clients. Alternatively, we can write the APM records to a concurrent queue, and then consume and publish from the queue in a dedicated thread. Using a queue incurs the potential for unconsumed records being lost during a service shutdown process.

  • Write the individual APM records as simple text files to a directory, and have another process monitor the directory and publish each batch of records to QuestDB over ILP.

    • A small variation of the above. Instead of writing to a local (generally a mounted) directory, to allow another process running on another node to access the files; we store the files into a cloud storage service such as AWS S3.

  • Collect APM records for set periods of time in a single file (hourly for instance), and have the other process read these files. This is purely to improve write efficiency, as the services do not need to create individual files for each APM set. This of course will need the use of mutexes or similar to ensure thread-safety. This model would not work for storing in a cloud blob store.

    • A variation of the above. Instead of writing to a file, write rows to a SQLite database. SQLite supports multiple processes reading the same database file. There are reports that writing rows to SQLite is faster than writing to a file.

  • Write the APM records to an intermediary database or message queue, from which another process can republish the data over ILP. This suffers from a potential issue in that storing to a local file is more fail-proof than writing to an external network service.

    • For my current project, we decided to write to MongoDB. We already had basic HTTP request metrics being collected and stored in MongoDB, before eventually being stored in QuestDB. We decided to maintain the same strategy, while replacing the metric with the more comprehensive APM data. There are a number of reasons for our choice:

      • QuestDB is not set up in a very robust manner. Just a docker container running on a AWS EC2 host alongside other daemon processes. There is no backup strategy. All the data can be recreated from our Atlas database, which is our database of record.

      • All applications/services have a connection pool to mongo-service. There is no need to maintain yet another connection pool to QuestDB, which would also result in needing to increase the number of client connections it needs to support.

      • Allows us to post-process the information before storing in QuestDB. In particular, we use the mmdb service to look up geographical information of client IP Address for analytics.

      • No need for other infrastructure such as AWS EFS to allow processes running on other nodes to access files written to by the services.

      • This strategy does suffer from the following downsides:

        • The risk of losing some APM records is higher, since the possibility of encountering database errors is slightly higher than the possibility of encountering filesystem errors.

        • Writing to an external database will be slower than writing to a local file.

        • We need another process than monitors the collection to which the APM records are being written to, and then publish those to QuestDB. This is not a major downside for us, since we already have a suite of such monitors running for other business purposes.

View Data

Data stored in QuestDB can be viewed by any tool that supports the PostgreSQL wire protocol. We normally use Grafana for monitoring/analytics, and use the same to display the APM data.

All APM data (data associated with the root APMRecord and its children processes) are stored in the same table in QuestDB. The parent record does not have a type value, and can be used as the discriminator to select or filter out the parent record.

Last modified: 30 April 2025