From 5cd02cf026cad26c48828715236c2cc6e78cf21d Mon Sep 17 00:00:00 2001 From: siddh34 Date: Sun, 9 Nov 2025 13:03:01 +0530 Subject: [PATCH 1/2] CortexFlow[#143]: agent api --- core/api/protos/agent.proto | 98 ++++ core/api/src/agent.rs | 406 +++++++++++++++++ core/api/src/api.rs | 428 +++++++++++++----- core/api/src/constants.rs | 2 + core/api/src/helpers.rs | 8 + core/api/src/lib.rs | 5 +- core/api/src/main.rs | 3 + core/api/src/mod.rs | 3 - core/api/src/requests.rs | 57 +++ core/api/src/structs.rs | 48 ++ core/common/src/constants.rs | 2 + core/common/src/formatters.rs | 15 + core/common/src/lib.rs | 1 + core/src/components/metrics/src/helpers.rs | 7 +- core/src/components/metrics/src/structs.rs | 4 + .../metrics_tracer/src/data_structures.rs | 3 + .../src/components/metrics_tracer/src/main.rs | 6 + 17 files changed, 983 insertions(+), 113 deletions(-) create mode 100644 core/api/src/constants.rs create mode 100644 core/api/src/helpers.rs delete mode 100644 core/api/src/mod.rs create mode 100644 core/api/src/structs.rs create mode 100644 core/common/src/formatters.rs diff --git a/core/api/protos/agent.proto b/core/api/protos/agent.proto index 9a0ad4e..69e3176 100644 --- a/core/api/protos/agent.proto +++ b/core/api/protos/agent.proto @@ -3,6 +3,8 @@ package agent; import "google/protobuf/empty.proto"; +// Active connections + message RequestActiveConnections{ optional string pod_ip = 2 ; } @@ -18,6 +20,96 @@ message ActiveConnectionResponse{ repeated ConnectionEvent events = 2; // List of connection events } +// Network metrics + + + +// Latency metrics request and response messages + +message LatencyMetricsRequest { + optional uint32 tgid = 1; // Filter by thread group ID + optional string process_name = 2; // Filter by process name + optional uint64 start_time = 3; // Start timestamp (microseconds) + optional uint64 end_time = 4; // End timestamp (microseconds) +} + +message LatencyMetric { + uint64 delta_us = 1; // Latency in microseconds + uint64 timestamp_us = 2; // Event timestamp + uint32 tgid = 3; // Thread group ID + string process_name = 4; // Process name (comm) + uint32 local_port = 5; // Local port + uint32 remote_port = 6; // Remote port (big-endian) + uint32 address_family = 7; // "IPv4" or "IPv6" + string src_address_v4 = 8; // Source IP address + string dst_address_v4 = 9; // Destination IP address + string src_address_v6 = 10; // Source IPv6 address + string dst_address_v6 = 11; // Destination IPv6 address +} + +message LatencyMetricsResponse { + string status = 1; + repeated LatencyMetric metrics = 2; + uint32 total_count = 3; + double average_latency_us = 4; // Average latency + double min_latency_us = 5; // Minimum latency + double max_latency_us = 6; // Maximum latency +} + +// Packet Loss Metrics + +message PacketLossMetricsRequest { + optional uint32 tgid = 1; // Filter by thread group ID + optional uint64 start_time = 2; // Start timestamp + optional uint64 end_time = 3; // End timestamp +} + +message PacketLossMetric { + uint32 tgid = 1; // Thread group ID + string process_name = 2; // Process name + uint64 timestamp_us = 3; // Event timestamp + uint32 total_packets_lost = 4; // Total packets lost + uint32 total_packets_transmitted = 5; // Total packets transmitted + double packet_loss_percentage = 6; // % of total packet loss + uint64 total_data_loss_bytes = 7; // Total size of data loss + uint64 total_data_transmitted_bytes = 8; // Total transmitted data + double data_loss_ratio = 9; // Ratio between loss and transmitted +} + +message PacketLossMetricsResponse { 
+ string status = 1; + repeated PacketLossMetric metrics = 2; + uint32 total_connections = 3; +} + +// Dropped TCP Packets + +message DroppedPacketsRequest { + optional uint32 tgid = 1; // Filter by thread group ID + optional uint64 start_time = 2; // Start timestamp + optional uint64 end_time = 3; // End timestamp +} + +message DroppedPacketMetric { + uint32 tgid = 1; // Thread group ID + string process_name = 2; // Process name + int32 sk_drops = 3; // Socket drops (from sk_drops field) + int32 sk_err = 4; // Socket errors + int32 sk_err_soft = 5; // Soft errors + uint32 sk_backlog_len = 6; // Backlog length (congestion indicator) + int32 sk_wmem_queued = 7; // Write memory queued + int32 sk_rcvbuf = 8; // Receive buffer size + uint32 sk_ack_backlog = 9; // ACK backlog + uint64 timestamp_us = 10; // Event timestamp +} + +message DroppedPacketsResponse { + string status = 1; + repeated DroppedPacketMetric metrics = 2; + uint32 total_drops = 3; // Total drops across all connections +} + + //declare agent api service Agent{ @@ -31,6 +123,12 @@ service Agent{ // remove ip from blocklist endpoint rpc RmIpFromBlocklist(RmIpFromBlocklistRequest) returns (RmIpFromBlocklistResponse); + // metrics data + rpc GetLatencyMetrics(LatencyMetricsRequest) returns (LatencyMetricsResponse); + + rpc GetPacketLossMetrics(PacketLossMetricsRequest) returns (PacketLossMetricsResponse); + + rpc GetDroppedPacketsMetrics(DroppedPacketsRequest) returns (DroppedPacketsResponse); } message AddIpToBlocklistRequest{ diff --git a/core/api/src/agent.rs b/core/api/src/agent.rs index c7045f9..933d066 100644 --- a/core/api/src/agent.rs +++ b/core/api/src/agent.rs @@ -24,6 +24,181 @@ pub struct ActiveConnectionResponse { pub events: ::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct LatencyMetricsRequest { + /// Filter by thread group ID + #[prost(uint32, optional, tag = "1")] + pub tgid: ::core::option::Option, + /// Filter by process name + #[prost(string, optional, tag = "2")] + pub process_name: ::core::option::Option<::prost::alloc::string::String>, + /// Start timestamp (microseconds) + #[prost(uint64, optional, tag = "3")] + pub start_time: ::core::option::Option, + /// End timestamp (microseconds) + #[prost(uint64, optional, tag = "4")] + pub end_time: ::core::option::Option, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct LatencyMetric { + /// Latency in microseconds + #[prost(uint64, tag = "1")] + pub delta_us: u64, + /// Event timestamp + #[prost(uint64, tag = "2")] + pub timestamp_us: u64, + /// Thread group ID + #[prost(uint32, tag = "3")] + pub tgid: u32, + /// Process name (comm) + #[prost(string, tag = "4")] + pub process_name: ::prost::alloc::string::String, + /// Local port + #[prost(uint32, tag = "5")] + pub local_port: u32, + /// Remote port (big-endian) + #[prost(uint32, tag = "6")] + pub remote_port: u32, + /// "IPv4" or "IPv6" + #[prost(uint32, tag = "7")] + pub address_family: u32, + /// Source IP address + #[prost(string, tag = "8")] + pub src_address_v4: ::prost::alloc::string::String, + /// Destination IP address + #[prost(string, tag = "9")] + pub dst_address_v4: ::prost::alloc::string::String, + /// Source IPv6 address + #[prost(string, tag = "10")] + pub src_address_v6: ::prost::alloc::string::String, + /// Destination IPv6 address + #[prost(string, tag = "11")] + pub dst_address_v6: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct LatencyMetricsResponse { + #[prost(string, tag = 
"1")] + pub status: ::prost::alloc::string::String, + #[prost(message, repeated, tag = "2")] + pub metrics: ::prost::alloc::vec::Vec, + #[prost(uint32, tag = "3")] + pub total_count: u32, + /// Average latency + #[prost(double, tag = "4")] + pub average_latency_us: f64, + /// Minimum latency + #[prost(double, tag = "5")] + pub min_latency_us: f64, + /// Maximum latency + #[prost(double, tag = "6")] + pub max_latency_us: f64, +} +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct PacketLossMetricsRequest { + /// Filter by thread group ID + #[prost(uint32, optional, tag = "1")] + pub tgid: ::core::option::Option, + /// Start timestamp + #[prost(uint64, optional, tag = "2")] + pub start_time: ::core::option::Option, + /// End timestamp + #[prost(uint64, optional, tag = "3")] + pub end_time: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PacketLossMetric { + /// Thread group ID + #[prost(uint32, tag = "1")] + pub tgid: u32, + /// Process name + #[prost(string, tag = "2")] + pub process_name: ::prost::alloc::string::String, + /// Event timestamp + #[prost(uint64, tag = "3")] + pub timestamp_us: u64, + /// Total packets lost + #[prost(uint32, tag = "4")] + pub total_packets_lost: u32, + /// Total packets transmitted + #[prost(uint32, tag = "5")] + pub total_packets_transmitted: u32, + /// % of total packet loss + #[prost(double, tag = "6")] + pub packet_loss_percentage: f64, + /// Total size of data loss + #[prost(uint64, tag = "7")] + pub total_data_loss_bytes: u64, + /// Total transmitted data + #[prost(uint64, tag = "8")] + pub total_data_transmitted_bytes: u64, + /// Ratio between loss and transmitted + #[prost(double, tag = "9")] + pub data_loss_ratio: f64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PacketLossMetricsResponse { + #[prost(string, tag = "1")] + pub status: ::prost::alloc::string::String, + #[prost(message, repeated, tag = "2")] + pub metrics: ::prost::alloc::vec::Vec, + #[prost(uint32, tag = "3")] + pub total_connections: u32, +} +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct DroppedPacketsRequest { + /// Filter by thread group ID + #[prost(uint32, optional, tag = "1")] + pub tgid: ::core::option::Option, + /// Start timestamp + #[prost(uint64, optional, tag = "2")] + pub start_time: ::core::option::Option, + /// End timestamp + #[prost(uint64, optional, tag = "3")] + pub end_time: ::core::option::Option, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct DroppedPacketMetric { + /// Thread group ID + #[prost(uint32, tag = "1")] + pub tgid: u32, + /// Process name + #[prost(string, tag = "2")] + pub process_name: ::prost::alloc::string::String, + /// Socket drops (from sk_drops field) + #[prost(int32, tag = "3")] + pub sk_drops: i32, + /// Socket errors + #[prost(int32, tag = "4")] + pub sk_err: i32, + /// Soft errors + #[prost(int32, tag = "5")] + pub sk_err_soft: i32, + /// Backlog length (congestion indicator) + #[prost(uint32, tag = "6")] + pub sk_backlog_len: u32, + /// Write memory queued + #[prost(int32, tag = "7")] + pub sk_wmem_queued: i32, + /// Receive buffer size + #[prost(int32, tag = "8")] + pub sk_rcvbuf: i32, + /// ACK backlog + #[prost(uint32, tag = "9")] + pub sk_ack_backlog: u32, + /// Event timestamp + #[prost(uint64, tag = "10")] + pub timestamp_us: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DroppedPacketsResponse { + #[prost(string, tag = "1")] + pub status: ::prost::alloc::string::String, + 
#[prost(message, repeated, tag = "2")] + pub metrics: ::prost::alloc::vec::Vec, + /// Total drops across all connections + #[prost(uint32, tag = "3")] + pub total_drops: u32, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct AddIpToBlocklistRequest { #[prost(string, optional, tag = "1")] pub ip: ::core::option::Option<::prost::alloc::string::String>, @@ -244,6 +419,79 @@ pub mod agent_client { .insert(GrpcMethod::new("agent.Agent", "RmIpFromBlocklist")); self.inner.unary(req, path, codec).await } + /// metrics data + pub async fn get_latency_metrics( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/agent.Agent/GetLatencyMetrics", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("agent.Agent", "GetLatencyMetrics")); + self.inner.unary(req, path, codec).await + } + pub async fn get_packet_loss_metrics( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/agent.Agent/GetPacketLossMetrics", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("agent.Agent", "GetPacketLossMetrics")); + self.inner.unary(req, path, codec).await + } + pub async fn get_dropped_packets_metrics( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/agent.Agent/GetDroppedPacketsMetrics", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("agent.Agent", "GetDroppedPacketsMetrics")); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. 
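For reference, the generated client methods above can be exercised roughly as follows. This is a minimal sketch and is not part of the patch: the crate/module path for the generated `agent` code, the endpoint address, and the availability of `AgentClient::connect` (which requires tonic's transport feature) are all assumptions.

    use tonic::Request;
    // Assumes the tonic/prost-generated `agent` module (from agent.proto) is in scope.
    use agent::agent_client::AgentClient;
    use agent::LatencyMetricsRequest;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Placeholder endpoint; the agent's real listen address may differ.
        let mut client = AgentClient::connect("http://127.0.0.1:50051").await?;

        // Every request field is optional, so an empty filter asks for all metrics.
        let request = Request::new(LatencyMetricsRequest {
            tgid: None,
            process_name: None,
            start_time: None,
            end_time: None,
        });

        let response = client.get_latency_metrics(request).await?.into_inner();
        println!(
            "status={} count={} avg={:.2}us min={:.2}us max={:.2}us",
            response.status,
            response.total_count,
            response.average_latency_us,
            response.min_latency_us,
            response.max_latency_us
        );
        Ok(())
    }

The same calling pattern would apply to GetPacketLossMetrics and GetDroppedPacketsMetrics with their respective request messages.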
@@ -290,6 +538,28 @@ pub mod agent_server { tonic::Response, tonic::Status, >; + /// metrics data + async fn get_latency_metrics( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_packet_loss_metrics( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_dropped_packets_metrics( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } /// declare agent api #[derive(Debug)] @@ -543,6 +813,142 @@ pub mod agent_server { }; Box::pin(fut) } + "/agent.Agent/GetLatencyMetrics" => { + #[allow(non_camel_case_types)] + struct GetLatencyMetricsSvc(pub Arc); + impl< + T: Agent, + > tonic::server::UnaryService + for GetLatencyMetricsSvc { + type Response = super::LatencyMetricsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_latency_metrics(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetLatencyMetricsSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/agent.Agent/GetPacketLossMetrics" => { + #[allow(non_camel_case_types)] + struct GetPacketLossMetricsSvc(pub Arc); + impl< + T: Agent, + > tonic::server::UnaryService + for GetPacketLossMetricsSvc { + type Response = super::PacketLossMetricsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_packet_loss_metrics(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetPacketLossMetricsSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/agent.Agent/GetDroppedPacketsMetrics" => { + #[allow(non_camel_case_types)] + struct GetDroppedPacketsMetricsSvc(pub Arc); + impl< + T: Agent, + > tonic::server::UnaryService + for GetDroppedPacketsMetricsSvc { + type Response = super::DroppedPacketsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: 
tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_dropped_packets_metrics(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetDroppedPacketsMetricsSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { let mut response = http::Response::new( diff --git a/core/api/src/api.rs b/core/api/src/api.rs index 51ff64d..9aa7db8 100644 --- a/core/api/src/api.rs +++ b/core/api/src/api.rs @@ -1,66 +1,63 @@ #![allow(warnings)] use anyhow::Context; use chrono::Local; +use cortexbrain_common::formatters::{format_ipv4, format_ipv6}; use prost::bytes::BytesMut; use std::str::FromStr; -use std::{ sync::Mutex }; -use tonic::{ Request, Response, Status }; +use std::sync::Mutex; +use tonic::{Request, Response, Status}; use tracing::info; -use aya::{ maps::{ MapData, PerfEventArray }, util::online_cpus }; +use aya::{ + maps::{MapData, PerfEventArray}, + util::online_cpus, +}; use std::result::Result::Ok; use tonic::async_trait; -use std::collections::HashMap; use aya::maps::HashMap as ayaHashMap; +use std::collections::HashMap; use tokio::sync::mpsc; use tokio::task; -use crate::agent::ConnectionEvent; +use crate::agent::{ConnectionEvent, DroppedPacketMetric, DroppedPacketsRequest, DroppedPacketsResponse, LatencyMetric, LatencyMetricsRequest, LatencyMetricsResponse, PacketLossMetricsRequest, PacketLossMetricsResponse}; + +use crate::structs::{NetworkMetrics, PacketLog, TimeStampMetrics}; + // * contains agent api configuration use crate::agent::{ - agent_server::Agent, - ActiveConnectionResponse, - RequestActiveConnections, - AddIpToBlocklistRequest, - BlocklistResponse, - RmIpFromBlocklistRequest, - RmIpFromBlocklistResponse, + agent_server::Agent, ActiveConnectionResponse, AddIpToBlocklistRequest, BlocklistResponse, + RequestActiveConnections, RmIpFromBlocklistRequest, RmIpFromBlocklistResponse, }; +use crate::constants::PIN_BLOCKLIST_MAP_PATH; + +use crate::helpers::comm_to_string; use aya::maps::Map; -use bytemuck_derive::Zeroable; +use cortexbrain_common::constants::BPF_PATH; use cortexflow_identity::enums::IpProtocols; use std::net::Ipv4Addr; use tracing::warn; -#[repr(C)] -#[derive(Clone, Copy, Zeroable)] -pub struct PacketLog { - pub proto: u8, - pub src_ip: u32, - pub src_port: u16, - pub dst_ip: u32, - pub dst_port: u16, - pub pid: u32, -} -unsafe impl aya::Pod for PacketLog {} - pub struct AgentApi { //* event_rx is an istance of a mpsc receiver. //* is used to receive the data from the transmitter (tx) - event_rx: Mutex, Status>>>, - event_tx: mpsc::Sender, Status>>, + active_connection_event_rx: Mutex, Status>>>, + active_connection_event_tx: mpsc::Sender, Status>>, + latency_metrics_rx: Mutex>, + latency_metrics_tx: mpsc::Sender, + dropped_packet_metrics_rx: Mutex>, + dropped_packet_metrics_tx: mpsc::Sender, } //* Event sender trait. 
Takes an event from a map and send that to the mpsc channel //* using the send_map function #[async_trait] pub trait EventSender: Send + Sync + 'static { - async fn send_event(&self, event: Vec); - async fn send_map( + async fn send_active_connection_event(&self, event: Vec); + async fn send_active_connection_event_map( &self, map: Vec, - tx: mpsc::Sender, Status>> + tx: mpsc::Sender, Status>>, ) { let status = Status::new(tonic::Code::Ok, "success"); let event = Ok(map); @@ -72,45 +69,64 @@ pub trait EventSender: Send + Sync + 'static { // send event function. takes an HashMap and send that using mpsc event_tx #[async_trait] impl EventSender for AgentApi { - async fn send_event(&self, event: Vec) { - self.send_map(event, self.event_tx.clone()).await; + async fn send_active_connection_event(&self, event: Vec) { + self.send_active_connection_event_map(event, self.active_connection_event_tx.clone()) + .await; } } -const BPF_PATH: &str = "BPF_PATH"; -const PIN_BLOCKLIST_MAP_PATH: &str = "PIN_BLOCKLIST_MAP_PATH"; - //initialize a default trait for AgentApi. Loads a name and a bpf istance. //this trait is essential for init the Agent. impl Default for AgentApi { //TODO:this part needs a better error handling fn default() -> Self { - // load maps mapdata - let mapdata = MapData::from_pin("/sys/fs/bpf/maps/events_map").expect( - "cannot open events_map Mapdata" - ); - let map = Map::PerfEventArray(mapdata); //creates a PerfEventArray from the mapdata + // load connections maps mapdata + let active_connection_mapdata = MapData::from_pin("/sys/fs/bpf/maps/events_map") + .expect("cannot open events_map Mapdata"); + let active_connection_map = Map::PerfEventArray(active_connection_mapdata); //creates a PerfEventArray from the mapdata + + let mut active_connection_events_array = PerfEventArray::try_from(active_connection_map) + .expect("Error while initializing events array"); + + // load network metrics maps mapdata + let network_metrics_mapdata = MapData::from_pin("/sys/fs/bpf/maps/net_metrics") + .expect("cannot open net_metrics Mapdata"); + let network_metrics_map = Map::PerfEventArray(network_metrics_mapdata); //creates a PerfEventArray from the mapdata + let mut network_metrics_events_array = PerfEventArray::try_from(network_metrics_map) + .expect("Error while initializing network metrics array"); + + // load time stamp events maps mapdata + let time_stamp_events_mapdata = MapData::from_pin("/sys/fs/bpf/maps/time_stamp_events") + .expect("cannot open time_stamp_events Mapdata"); + let time_stamp_events_map = Map::PerfEventArray(time_stamp_events_mapdata); // + let mut time_stamp_events_array = PerfEventArray::try_from(time_stamp_events_map) + .expect("Error while initializing time stamp events array"); //init a mpsc channel - let (tx, rx) = mpsc::channel(1024); + let (conn_tx, conn_rx) = mpsc::channel(1024); + let (lat_tx, lat_rx) = mpsc::channel(2048); + let (drop_tx, drop_rx) = mpsc::channel(2048); let api = AgentApi { - event_rx: rx.into(), - event_tx: tx.clone(), + active_connection_event_rx: conn_rx.into(), + active_connection_event_tx: conn_tx.clone(), + latency_metrics_rx: Mutex::new(lat_rx), + latency_metrics_tx: lat_tx.clone(), + dropped_packet_metrics_rx: Mutex::new(drop_rx), + dropped_packet_metrics_tx: drop_tx.clone(), }; - let mut events_array = PerfEventArray::try_from(map).expect( - "Error while initializing events array" - ); + // For network metrics - //spawn an event reader + //spawn an event readers task::spawn(async move { let mut net_events_buffer = Vec::new(); //scan the 
cpus to read the data for cpu_id in online_cpus() .map_err(|e| anyhow::anyhow!("Error {:?}", e)) - .unwrap() { - let buf = events_array + .unwrap() + { + let buf = active_connection_events_array .open(cpu_id, None) .expect("Error during the creation of net_events_buf structure"); @@ -128,10 +144,11 @@ impl Default for AgentApi { if events.read > 0 { for i in 0..events.read { let data = &buffers[i]; - if data.len() >= std::mem::size_of::() { - let pl: PacketLog = unsafe { - std::ptr::read(data.as_ptr() as *const _) - }; + if data.len() + >= std::mem::size_of::() + { + let pl: PacketLog = + unsafe { std::ptr::read(data.as_ptr() as *const _) }; let src = Ipv4Addr::from(u32::from_be(pl.src_ip)); let dst = Ipv4Addr::from(u32::from_be(pl.dst_ip)); let src_port = u16::from_be(pl.src_port as u16); @@ -162,13 +179,12 @@ impl Default for AgentApi { dst_ip_port: format!("{}:{}", dst, dst_port), }); info!("sending events to the MPSC channel"); - let _ = tx.send(Ok(evt)).await; + let _ = conn_tx.send(Ok(evt)).await; } Err(_) => { info!( "Event Id: {} Protocol: Unknown ({})", - event_id, - pl.proto + event_id, pl.proto ); } }; @@ -194,6 +210,156 @@ impl Default for AgentApi { } }); + task::spawn(async move { + let mut net_metrics_buffer = Vec::new(); + //scan the cpus to read the data + for cpu_id in online_cpus() + .map_err(|e| anyhow::anyhow!("Error {:?}", e)) + .unwrap() + { + let buf = network_metrics_events_array + .open(cpu_id, None) + .expect("Error during the creation of net_metrics_buf structure"); + + let buffers = vec![BytesMut::with_capacity(4096); 8]; + net_metrics_buffer.push((buf, buffers)); + } + + info!("Starting network metrics listener"); + + //send the data through a mpsc channel + loop { + for (buf, buffers) in net_metrics_buffer.iter_mut() { + match buf.read_events(buffers) { + Ok(events) => { + //read the events, this function is similar to the one used in identity/helpers.rs/display_events + if events.read > 0 { + for i in 0..events.read { + let data = &buffers[i]; + if data.len() + >= std::mem::size_of::() + { + let nm: NetworkMetrics = + unsafe { std::ptr::read(data.as_ptr() as *const _) }; + + let dropped_packet_metrics = DroppedPacketMetric { + tgid: nm.tgid, + process_name: comm_to_string(&nm.comm), + sk_drops: nm.sk_drops, + sk_err: nm.sk_err, + sk_err_soft: nm.sk_err_soft, + sk_backlog_len: nm.sk_backlog_len, + sk_wmem_queued: nm.sk_write_memory_queued, + sk_rcvbuf: nm.sk_receive_buffer_size, + sk_ack_backlog: nm.sk_ack_backlog, + timestamp_us: nm.ts_us, + }; + + if dropped_packet_metrics.sk_drops > 0 { + info!( + "Dropped Packet Metric - tgid: {}, process_name: {}, sk_drops: {}, sk_err: {}, sk_err_soft: {}, sk_backlog_len: {}, sk_wmem_queued: {}, sk_rcvbuf: {}, sk_ack_backlog: {}, timestamp_us: {}", + dropped_packet_metrics.tgid, + dropped_packet_metrics.process_name, + dropped_packet_metrics.sk_drops, + dropped_packet_metrics.sk_err, + dropped_packet_metrics.sk_err_soft, + dropped_packet_metrics.sk_backlog_len, + dropped_packet_metrics.sk_wmem_queued, + dropped_packet_metrics.sk_rcvbuf, + dropped_packet_metrics.sk_ack_backlog, + dropped_packet_metrics.timestamp_us + ); + let _ = drop_tx.send(dropped_packet_metrics).await; + } else { + info!( + "No dropped packets for tgid: {}, process_name: {}", + dropped_packet_metrics.tgid, + dropped_packet_metrics.process_name + ); + } + } else { + warn!( + "Received network metrics data too small: {} bytes", + data.len() + ); + } + } + } + } + Err(e) => { + eprintln!("Errore nella lettura network metrics eventi: {}", e); + 
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + } + } + // small delay to avoid cpu congestion + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + }); + + task::spawn(async move { + let mut ts_events_buffer = Vec::new(); + //scan the cpus to read the data + for cpu_id in online_cpus() + .map_err(|e| anyhow::anyhow!("Error {:?}", e)) + .unwrap() + { + let buf = time_stamp_events_array + .open(cpu_id, None) + .expect("Error during the creation of time stamp events buf structure"); + + let buffers = vec![BytesMut::with_capacity(4096); 8]; + ts_events_buffer.push((buf, buffers)); + } + + info!("Starting time stamp events listener"); + + //send the data through a mpsc channel + loop { + for (buf, buffers) in ts_events_buffer.iter_mut() { + match buf.read_events(buffers) { + Ok(events) => { + //read the events, this function is similar to the one used in identity/helpers.rs/display_events + if events.read > 0 { + for i in 0..events.read { + let data = &buffers[i]; + if data.len() + >= std::mem::size_of::() + { + let tsm: TimeStampMetrics = + unsafe { std::ptr::read(data.as_ptr() as *const _) }; + let latency_metric = LatencyMetric { + delta_us: tsm.delta_us, + timestamp_us: tsm.ts_us, + tgid: tsm.tgid, + process_name: comm_to_string(&tsm.comm), + local_port: tsm.lport as u32, + remote_port: tsm.dport_be as u32, + address_family: tsm.af as u32, + src_address_v4: format_ipv4(tsm.saddr_v4), + dst_address_v4: format_ipv4(tsm.daddr_v4), + src_address_v6: format_ipv6(&tsm.saddr_v6), + dst_address_v6: format_ipv6(&tsm.daddr_v6), + }; + let _ = lat_tx.send(latency_metric).await; + } else { + warn!( + "Received time stamp metrics data too small: {} bytes", + data.len() + ); + } + } + } + } + Err(e) => { + eprintln!("Errore nella lettura time stamp eventi: {}", e); + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + } + } + } + }); + api } } @@ -206,7 +372,7 @@ impl Agent for AgentApi { // * active connections. 
The data are transformed and sent to the api with a mpsc channel async fn active_connections( &self, - request: Request + request: Request, ) -> Result, Status> { //read request let req = request.into_inner(); @@ -215,40 +381,39 @@ impl Agent for AgentApi { let mut aggregated_events: Vec = Vec::new(); //aggregate events - while let Ok(evt) = self.event_rx.lock().unwrap().try_recv() { + while let Ok(evt) = self.active_connection_event_rx.lock().unwrap().try_recv() { if let Ok(vec) = evt { aggregated_events.extend(vec); } } //log response for debugging - info!("DEBUGGING RESPONSE FROM ACTIVE CONNECTION REQUEST: {:?}", aggregated_events); + info!( + "DEBUGGING RESPONSE FROM ACTIVE CONNECTION REQUEST: {:?}", + aggregated_events + ); //return response - Ok( - Response::new(ActiveConnectionResponse { - status: "success".to_string(), - events: aggregated_events, - }) - ) + Ok(Response::new(ActiveConnectionResponse { + status: "success".to_string(), + events: aggregated_events, + })) } // * creates and add ip to the blocklist async fn add_ip_to_blocklist( &self, - request: Request + request: Request, ) -> Result, Status> { //read request let req = request.into_inner(); //open blocklist map - let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map").expect( - "cannot open blocklist_map Mapdata" - ); + let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map") + .expect("cannot open blocklist_map Mapdata"); let blocklist_mapdata = Map::HashMap(mapdata); //load mapdata - let mut blocklist_map: ayaHashMap = ayaHashMap - ::try_from(blocklist_mapdata) - .unwrap(); + let mut blocklist_map: ayaHashMap = + ayaHashMap::try_from(blocklist_mapdata).unwrap(); if req.ip.is_none() { // log blocklist event @@ -265,8 +430,7 @@ impl Agent for AgentApi { blocklist_map.insert(u8_4_ip, u8_4_ip, 0); info!("CURRENT BLOCKLIST: {:?}", blocklist_map); } - let path = std::env - ::var(PIN_BLOCKLIST_MAP_PATH) + let path = std::env::var(PIN_BLOCKLIST_MAP_PATH) .context("Blocklist map path not found!") .unwrap(); @@ -281,28 +445,24 @@ impl Agent for AgentApi { } //save ip into the blocklist - Ok( - Response::new(BlocklistResponse { - status: "success".to_string(), - events: converted_blocklist_map, - }) - ) + Ok(Response::new(BlocklistResponse { + status: "success".to_string(), + events: converted_blocklist_map, + })) } async fn check_blocklist( &self, - request: Request<()> + request: Request<()>, ) -> Result, Status> { info!("Returning blocklist hashmap"); //open blocklist map - let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map").expect( - "cannot open blocklist_map Mapdata" - ); + let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map") + .expect("cannot open blocklist_map Mapdata"); let blocklist_mapdata = Map::HashMap(mapdata); //load mapdata - let blocklist_map: ayaHashMap = ayaHashMap - ::try_from(blocklist_mapdata) - .unwrap(); + let blocklist_map: ayaHashMap = + ayaHashMap::try_from(blocklist_mapdata).unwrap(); //convert the maps with a buffer to match the protobuffer types @@ -314,28 +474,24 @@ impl Agent for AgentApi { let value = Ipv4Addr::from(k).to_string(); converted_blocklist_map.insert(key, value); } - Ok( - Response::new(BlocklistResponse { - status: "success".to_string(), - events: converted_blocklist_map, - }) - ) + Ok(Response::new(BlocklistResponse { + status: "success".to_string(), + events: converted_blocklist_map, + })) } async fn rm_ip_from_blocklist( &self, - request: Request + request: Request, ) -> Result, Status> { //read request let req = request.into_inner(); 
info!("Removing ip from blocklist map"); //open blocklist map - let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map").expect( - "cannot open blocklist_map Mapdata" - ); + let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map") + .expect("cannot open blocklist_map Mapdata"); let blocklist_mapdata = Map::HashMap(mapdata); //load mapdata - let mut blocklist_map: ayaHashMap = ayaHashMap - ::try_from(blocklist_mapdata) - .unwrap(); + let mut blocklist_map: ayaHashMap = + ayaHashMap::try_from(blocklist_mapdata).unwrap(); //remove the address let ip_to_remove = req.ip; let u8_4_ip_to_remove = Ipv4Addr::from_str(&ip_to_remove).unwrap().octets(); @@ -351,11 +507,69 @@ impl Agent for AgentApi { converted_blocklist_map.insert(key, value); } - Ok( - Response::new(RmIpFromBlocklistResponse { - status: "Ip removed from blocklist".to_string(), - events: converted_blocklist_map, - }) - ) + Ok(Response::new(RmIpFromBlocklistResponse { + status: "Ip removed from blocklist".to_string(), + events: converted_blocklist_map, + })) } -} + + async fn get_latency_metrics( + &self, + request: Request, + ) -> Result, Status> { + // Extract the request parameters + let req = request.into_inner(); + info!("Getting latency metrics"); + + // Here you would typically query your data source for the latency metrics + // For demonstration purposes, we'll return a dummy response + let response = LatencyMetricsResponse { + status: "success".to_string(), + metrics: vec![], + total_count: 0, + average_latency_us: 0.0, + min_latency_us: 0.0, + max_latency_us: 0.0, + }; + + Ok(Response::new(response)) + } + + async fn get_packet_loss_metrics( + &self, + request: Request, + ) -> Result, Status> { + // Extract the request parameters + let req = request.into_inner(); + info!("Getting packet loss metrics"); + + // Here you would typically query your data source for the packet loss metrics + // For demonstration purposes, we'll return a dummy response + let response = PacketLossMetricsResponse { + status: "success".to_string(), + metrics: vec![], + total_connections: 0, + }; + + Ok(Response::new(response)) + } + + async fn get_dropped_packets_metrics( + &self, + request: Request, + ) -> Result, Status> { + // Extract the request parameters + let req = request.into_inner(); + info!("Getting dropped packets metrics"); + + // Here you would typically query your data source for the dropped packets metrics + // For demonstration purposes, we'll return a dummy response + let response = DroppedPacketsResponse { + status: "success".to_string(), + metrics: vec![], + total_drops: 0, + }; + + Ok(Response::new(response)) + } +} \ No newline at end of file diff --git a/core/api/src/constants.rs b/core/api/src/constants.rs new file mode 100644 index 0000000..b23bb8d --- /dev/null +++ b/core/api/src/constants.rs @@ -0,0 +1,2 @@ +pub const PIN_BLOCKLIST_MAP_PATH: &str = "PIN_BLOCKLIST_MAP_PATH"; +pub const TASK_COMM_LEN: usize = 16; diff --git a/core/api/src/helpers.rs b/core/api/src/helpers.rs new file mode 100644 index 0000000..ea2fc3b --- /dev/null +++ b/core/api/src/helpers.rs @@ -0,0 +1,8 @@ + + +use crate::constants::TASK_COMM_LEN; + +pub fn comm_to_string(comm: &[u8; TASK_COMM_LEN]) -> String { + let end = comm.iter().position(|&c| c == 0).unwrap_or(comm.len()); + String::from_utf8_lossy(&comm[..end]).to_string() +} diff --git a/core/api/src/lib.rs b/core/api/src/lib.rs index 0b13fb6..03ecd68 100644 --- a/core/api/src/lib.rs +++ b/core/api/src/lib.rs @@ -1,4 +1,7 @@ pub mod api; pub mod agent; pub mod client; -pub mod 
requests; \ No newline at end of file +pub mod requests; +pub mod structs; +pub mod constants; +pub mod helpers; diff --git a/core/api/src/main.rs b/core/api/src/main.rs index 4410458..0843684 100644 --- a/core/api/src/main.rs +++ b/core/api/src/main.rs @@ -4,6 +4,9 @@ use cortexbrain_common::logger; mod agent; mod api; +mod structs; +mod constants; +mod helpers; mod agent_proto { use tonic::include_file_descriptor_set; diff --git a/core/api/src/mod.rs b/core/api/src/mod.rs deleted file mode 100644 index e5b9c8d..0000000 --- a/core/api/src/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod api; -pub mod agent; -pub mod requests; \ No newline at end of file diff --git a/core/api/src/requests.rs b/core/api/src/requests.rs index e9fb2b1..05ffdba 100644 --- a/core/api/src/requests.rs +++ b/core/api/src/requests.rs @@ -15,6 +15,12 @@ use crate::agent::BlocklistResponse; use crate::agent::AddIpToBlocklistRequest; use crate::agent::RmIpFromBlocklistRequest; use crate::agent::RmIpFromBlocklistResponse; +use crate::agent::DroppedPacketsRequest; +use crate::agent::DroppedPacketsResponse; +use crate::agent::LatencyMetricsRequest; +use crate::agent::LatencyMetricsResponse; +use crate::agent::PacketLossMetricsRequest; +use crate::agent::PacketLossMetricsResponse; #[cfg(feature = "client")] pub async fn send_active_connection_request( @@ -68,3 +74,54 @@ pub async fn remove_ip_from_blocklist_request( let response = client.rm_ip_from_blocklist(request).await?; Ok(response) } + +#[cfg(feature = "client")] +pub async fn get_dropped_packets( + mut client: AgentClient, + tgid: Option, + start_time: Option, + end_time: Option, +) -> Result, Error> { + let request = Request::new(DroppedPacketsRequest { + tgid, + start_time, + end_time, + }); + let response = client.get_dropped_packets_metrics(request).await?; + Ok(response) +} + +#[cfg(feature = "client")] +pub async fn get_latency_metrics( + mut client: AgentClient, + tgid: Option, + start_time: Option, + end_time: Option, + process_name: Option, +) -> Result, Error> { + + let request = Request::new(LatencyMetricsRequest { + tgid, + start_time, + end_time, + process_name + }); + let response = client.get_latency_metrics(request).await?; + Ok(response) +} + +#[cfg(feature = "client")] +pub async fn get_packet_loss_metrics( + mut client: AgentClient, + tgid: Option, + start_time: Option, + end_time: Option, +) -> Result, Error> { + let request = Request::new(PacketLossMetricsRequest { + tgid, + start_time, + end_time, + }); + let response = client.get_packet_loss_metrics(request).await?; + Ok(response) +} \ No newline at end of file diff --git a/core/api/src/structs.rs b/core/api/src/structs.rs new file mode 100644 index 0000000..1dc4ec8 --- /dev/null +++ b/core/api/src/structs.rs @@ -0,0 +1,48 @@ +use bytemuck::Zeroable; +use crate::constants::TASK_COMM_LEN; + + +#[repr(C)] +#[derive(Clone, Copy, Zeroable)] +pub struct PacketLog { + pub proto: u8, + pub src_ip: u32, + pub src_port: u16, + pub dst_ip: u32, + pub dst_port: u16, + pub pid: u32, +} +unsafe impl aya::Pod for PacketLog {} + +#[repr(C)] +#[derive(Clone, Copy, Zeroable)] +pub struct NetworkMetrics { + pub tgid: u32, + pub comm: [u8; TASK_COMM_LEN], + pub ts_us: u64, + pub sk_err: i32, + pub sk_err_soft: i32, + pub sk_backlog_len: u32, + pub sk_write_memory_queued: i32, + pub sk_receive_buffer_size: i32, + pub sk_ack_backlog: u32, + pub sk_drops: i32, +} +unsafe impl aya::Pod for NetworkMetrics {} + +#[repr(C)] +#[derive(Clone, Copy, Zeroable)] +pub struct TimeStampMetrics { + pub delta_us: u64, + pub ts_us: 
u64, + pub tgid: u32, + pub comm: [u8; TASK_COMM_LEN], + pub lport: u16, + pub dport_be: u16, + pub af: u16, + pub saddr_v4: u32, + pub daddr_v4: u32, + pub saddr_v6: [u32; 4], + pub daddr_v6: [u32; 4], +} +unsafe impl aya::Pod for TimeStampMetrics {} \ No newline at end of file diff --git a/core/common/src/constants.rs b/core/common/src/constants.rs index 4cc3835..2776484 100644 --- a/core/common/src/constants.rs +++ b/core/common/src/constants.rs @@ -5,3 +5,5 @@ pub const BPF_PATH: &str = "BPF_PATH"; /// Environment variable name for the BPF map pinning path. /// Used for sharing maps between eBPF programs. pub const PIN_MAP_PATH: &str = "PIN_MAP_PATH"; + + diff --git a/core/common/src/formatters.rs b/core/common/src/formatters.rs new file mode 100644 index 0000000..ebf4306 --- /dev/null +++ b/core/common/src/formatters.rs @@ -0,0 +1,15 @@ +use std::net::Ipv4Addr; + +pub fn format_ipv4(ip: u32) -> String { + Ipv4Addr::from(u32::from_be(ip)).to_string() +} + +pub fn format_ipv6(ip: &[u32; 4]) -> String { + format!( + "{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}", + (ip[0] >> 16) & 0xFFFF, ip[0] & 0xFFFF, + (ip[1] >> 16) & 0xFFFF, ip[1] & 0xFFFF, + (ip[2] >> 16) & 0xFFFF, ip[2] & 0xFFFF, + (ip[3] >> 16) & 0xFFFF, ip[3] & 0xFFFF + ) +} diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index c5d4373..f8fadc6 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -1,2 +1,3 @@ pub mod constants; pub mod logger; +pub mod formatters; \ No newline at end of file diff --git a/core/src/components/metrics/src/helpers.rs b/core/src/components/metrics/src/helpers.rs index 8fa552d..1b4628e 100644 --- a/core/src/components/metrics/src/helpers.rs +++ b/core/src/components/metrics/src/helpers.rs @@ -34,6 +34,9 @@ pub async fn display_metrics_map( if data.len() >= std::mem::size_of::() { let net_metrics: NetworkMetrics = unsafe { std::ptr::read_unaligned(data.as_ptr() as *const _) }; + let tgid = net_metrics.tgid; + let comm = String::from_utf8_lossy(&net_metrics.comm); + let ts_us = net_metrics.ts_us; let sk_drop_count = net_metrics.sk_drops; let sk_err = net_metrics.sk_err; let sk_err_soft = net_metrics.sk_err_soft; @@ -42,8 +45,8 @@ pub async fn display_metrics_map( let sk_ack_backlog = net_metrics.sk_ack_backlog; let sk_receive_buffer_size = net_metrics.sk_receive_buffer_size; info!( - "sk_drops: {}, sk_err: {}, sk_err_soft: {}, sk_backlog_len: {}, sk_write_memory_queued: {}, sk_ack_backlog: {}, sk_receive_buffer_size: {}", - sk_drop_count, sk_err, sk_err_soft, sk_backlog_len, sk_write_memory_queued, sk_ack_backlog, sk_receive_buffer_size + "tgid: {}, comm: {}, ts_us: {}, sk_drops: {}, sk_err: {}, sk_err_soft: {}, sk_backlog_len: {}, sk_write_memory_queued: {}, sk_ack_backlog: {}, sk_receive_buffer_size: {}", + tgid, comm, ts_us, sk_drop_count, sk_err, sk_err_soft, sk_backlog_len, sk_write_memory_queued, sk_ack_backlog, sk_receive_buffer_size ); } else { info!("Received data too small: {} bytes, expected: {}", data.len(), std::mem::size_of::()); diff --git a/core/src/components/metrics/src/structs.rs b/core/src/components/metrics/src/structs.rs index 75b3d9c..7fd53c7 100644 --- a/core/src/components/metrics/src/structs.rs +++ b/core/src/components/metrics/src/structs.rs @@ -1,8 +1,12 @@ + pub const TASK_COMM_LEN: usize = 16; // linux/sched.h #[repr(C)] #[derive(Clone, Copy)] pub struct NetworkMetrics { + pub tgid: u32, + pub comm: [u8; TASK_COMM_LEN], + pub ts_us: u64, pub sk_err: i32, // Offset 284 pub sk_err_soft: i32, // Offset 600 pub sk_backlog_len: i32, // Offset 196 diff 
--git a/core/src/components/metrics_tracer/src/data_structures.rs b/core/src/components/metrics_tracer/src/data_structures.rs index 9e89bbd..f6d7afe 100644 --- a/core/src/components/metrics_tracer/src/data_structures.rs +++ b/core/src/components/metrics_tracer/src/data_structures.rs @@ -4,6 +4,9 @@ pub const TASK_COMM_LEN: usize = 16; pub struct NetworkMetrics { + pub tgid: u32, + pub comm: [u8; TASK_COMM_LEN], + pub ts_us: u64, pub sk_err: i32, // Offset 284 pub sk_err_soft: i32, // Offset 600 pub sk_backlog_len: i32, // Offset 196 diff --git a/core/src/components/metrics_tracer/src/main.rs b/core/src/components/metrics_tracer/src/main.rs index a1964f9..ad0d809 100644 --- a/core/src/components/metrics_tracer/src/main.rs +++ b/core/src/components/metrics_tracer/src/main.rs @@ -37,6 +37,9 @@ fn try_metrics_tracer(ctx: ProbeContext) -> Result { return Err(1); } + let tgid = (unsafe { bpf_get_current_pid_tgid() } >> 32) as u32; + let comm = unsafe { bpf_get_current_comm() }.map_err(|_| 1i64)?; + let ts_us: u64 = unsafe { bpf_ktime_get_ns() } / 1_000; let sk_err_offset = 284; let sk_err_soft_offset = 600; let sk_backlog_len_offset = 196; @@ -54,6 +57,9 @@ fn try_metrics_tracer(ctx: ProbeContext) -> Result { let sk_drops = unsafe { bpf_probe_read_kernel::(sk_pointer.add(sk_drops_offset) as *const i32).map_err(|_| 1)? }; let net_metrics = NetworkMetrics { + tgid, + comm, + ts_us, sk_err, sk_err_soft, sk_backlog_len, From 9ff87d8023e7274d724fc249dfd3c6781e49f5d1 Mon Sep 17 00:00:00 2001 From: siddh34 Date: Thu, 13 Nov 2025 10:22:33 +0530 Subject: [PATCH 2/2] CortexFlow[#143]: CLI changes & packet loss metrics --- cli/src/main.rs | 8 +- cli/src/monitoring.rs | 125 +++++++++++ core/api/protos/agent.proto | 49 +---- core/api/src/agent.rs | 182 +--------------- core/api/src/api.rs | 199 +++++++++++++----- core/api/src/helpers.rs | 2 - core/api/src/requests.rs | 53 +---- core/api/src/structs.rs | 6 +- core/common/src/constants.rs | 2 - core/src/components/identity/src/main.rs | 6 +- .../components/identity/src/map_handlers.rs | 4 +- core/src/components/metrics/src/main.rs | 4 +- core/src/components/metrics/src/structs.rs | 2 +- .../src/components/metrics_tracer/src/main.rs | 20 +- 14 files changed, 320 insertions(+), 342 deletions(-) diff --git a/cli/src/main.rs b/cli/src/main.rs index fb49d09..272123f 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -16,7 +16,7 @@ use tracing::debug; use crate::essential::{ CliError, info, update_cli }; use crate::install::{ InstallArgs, InstallCommands, install_cortexflow, install_simple_example }; use crate::logs::{ LogsArgs, logs_command }; -use crate::monitoring::{ MonitorArgs, MonitorCommands, list_features, monitor_identity_events }; +use crate::monitoring::{ MonitorArgs, MonitorCommands, list_features, monitor_dropped_packets, monitor_identity_events, monitor_latency_metrics }; use crate::policies::{ PoliciesArgs, PoliciesCommands, @@ -109,6 +109,12 @@ async fn args_parser() -> Result<(), CliError> { MonitorCommands::Connections => { let _ = monitor_identity_events().await.map_err(|e| eprintln!("{}",e) )?; } + MonitorCommands::Latencymetrics => { + let _ = monitor_latency_metrics().await.map_err(|e| eprintln!("{}",e) )?; + } + MonitorCommands::Droppedpackets => { + let _ = monitor_dropped_packets().await.map_err(|e| eprintln!("{}",e) )?; + } } Some(Commands::Policies(policies_args)) => { match policies_args.policy_cmd { diff --git a/cli/src/monitoring.rs b/cli/src/monitoring.rs index 75941ac..03480e0 100644 --- a/cli/src/monitoring.rs +++ 
b/cli/src/monitoring.rs @@ -23,6 +23,14 @@ pub enum MonitorCommands { name = "connections", about = "Monitor the recent connections detected by the identity service" )] Connections, + #[command( + name = "latencymetrics", + about = "Monitor the latency metrics detected by the metrics service" + )] Latencymetrics, + #[command( + name = "droppedpackets", + about = "Monitor the dropped packets metrics detected by the metrics service" + )] Droppedpackets, } // cfcli monitor @@ -140,3 +148,120 @@ pub async fn monitor_identity_events() -> Result<(), Error> { Ok(()) } + +pub async fn monitor_latency_metrics() -> Result<(), Error> { + //function to monitor latency metrics + println!("{} {}", "=====>".blue().bold(), "Connecting to cortexflow Client".white()); + + match connect_to_client().await { + Ok(client) => { + println!("{} {}", "=====>".blue().bold(), "Connected to CortexFlow Client".green()); + //send request to get latency metrics + match agent_api::requests::send_latency_metrics_request(client).await { + Ok(response) => { + let resp = response.into_inner(); + if resp.metrics.is_empty() { + println!("{} No latency metrics found", "=====>".blue().bold()); + } else { + println!("{} Found {} latency metrics", "=====>".blue().bold(), resp.metrics.len()); + for (i, metric) in resp.metrics.iter().enumerate() { + println!( + "index {} Latency[{}], tgid {} process_name {} address_family {} delta_us {} src_address_v4 {} dst_address_v4 {} src_address_v6 {} dst_address_v6 {} local_port {} remote_port {} timestamp_us {}", + "=====>".blue().bold(), + i, + metric.tgid, + metric.process_name, + metric.address_family, + metric.delta_us, + metric.src_address_v4, + metric.dst_address_v4, + format!("{:?}", metric.src_address_v6), + format!("{:?}", metric.dst_address_v6), + metric.local_port, + metric.remote_port, + metric.timestamp_us + ); + } + } + } + Err(e) => { + println!( + "{} {} {} {}", + "=====>".blue().bold(), + "An error occured".red(), + "Error:", + e + ); + return Err(e); + } + } + } + Err(e) =>{ + println!( + "{} {}", + "=====>".blue().bold(), + "Failed to connect to CortexFlow Client".red() + ); + return Err(e); + } + } + Ok(()) +} + +pub async fn monitor_dropped_packets() -> Result<(), Error> { + //function to monitor dropped packets metrics + println!("{} {}", "=====>".blue().bold(), "Connecting to cortexflow Client".white()); + + match connect_to_client().await { + Ok(client) => { + println!("{} {}", "=====>".blue().bold(), "Connected to CortexFlow Client".green()); + //send request to get dropped packets metrics + match agent_api::requests::send_dropped_packets_request(client).await { + Ok(response) => { + let resp = response.into_inner(); + if resp.metrics.is_empty() { + println!("{} No dropped packets metrics found", "=====>".blue().bold()); + } else { + println!("{} Found {} dropped packets metrics", "=====>".blue().bold(), resp.metrics.len()); + for (i, metric) in resp.metrics.iter().enumerate() { + println!( + "{} DroppedPackets[{}]\n TGID: {}\n Process: {}\n SK Drops: {}\n Socket Errors: {}\n Soft Errors: {}\n Backlog Length: {}\n Write Memory Queued: {}\n Receive Buffer Size: {}\n ACK Backlog: {}\n Timestamp: {} µs", + "=====>".blue().bold(), + i, + metric.tgid, + metric.process_name, + metric.sk_drops, + metric.sk_err, + metric.sk_err_soft, + metric.sk_backlog_len, + metric.sk_wmem_queued, + metric.sk_rcvbuf, + metric.sk_ack_backlog, + metric.timestamp_us + ); + } + } + } + Err(e) => { + println!( + "{} {} {} {}", + "=====>".blue().bold(), + "An error occured".red(), + "Error:", + e 
+ ); + return Err(e); + } + } + } + Err(e) =>{ + println!( + "{} {}", + "=====>".blue().bold(), + "Failed to connect to CortexFlow Client".red() + ); + return Err(e); + } + } + Ok(()) +} \ No newline at end of file diff --git a/core/api/protos/agent.proto b/core/api/protos/agent.proto index 69e3176..3cd236b 100644 --- a/core/api/protos/agent.proto +++ b/core/api/protos/agent.proto @@ -22,17 +22,8 @@ message ActiveConnectionResponse{ // Network metrics - - // Latency metrics request and response messages -message LatencyMetricsRequest { - optional uint32 tgid = 1; // Filter by thread group ID - optional string process_name = 2; // Filter by process name - optional uint64 start_time = 3; // Start timestamp (microseconds) - optional uint64 end_time = 4; // End timestamp (microseconds) -} - message LatencyMetric { uint64 delta_us = 1; // Latency in microseconds uint64 timestamp_us = 2; // Event timestamp @@ -56,40 +47,8 @@ message LatencyMetricsResponse { double max_latency_us = 6; // Maximum latency } -// Packet Loss Metrics - -message PacketLossMetricsRequest { - optional uint32 tgid = 1; // Filter by thread group ID - optional uint64 start_time = 2; // Start timestamp - optional uint64 end_time = 3; // End timestamp -} - -message PacketLossMetric { - uint32 tgid = 1; // Thread group ID - string process_name = 2; // Process name - uint64 timestamp_us = 3; // Event timestamp - uint32 total_packets_lost = 4; // Total packets lost - uint32 total_packets_transmitted = 5; // Total packets transmitted - double packet_loss_percentage = 6; // % of total packet loss - uint64 total_data_loss_bytes = 7; // Total size of data loss - uint64 total_data_transmitted_bytes = 8; // Total transmitted data - double data_loss_ratio = 9; // Ratio between loss and transmitted -} - -message PacketLossMetricsResponse { - string status = 1; - repeated PacketLossMetric metrics = 2; - uint32 total_connections = 3; -} - // Dropped TCP Packets -message DroppedPacketsRequest { - optional uint32 tgid = 1; // Filter by thread group ID - optional uint64 start_time = 2; // Start timestamp - optional uint64 end_time = 3; // End timestamp -} - message DroppedPacketMetric { uint32 tgid = 1; // Thread group ID string process_name = 2; // Process name @@ -112,7 +71,6 @@ message DroppedPacketsResponse { //declare agent api service Agent{ - // active connections endpoint rpc ActiveConnections(RequestActiveConnections) returns (ActiveConnectionResponse); @@ -124,11 +82,10 @@ service Agent{ rpc RmIpFromBlocklist(RmIpFromBlocklistRequest) returns (RmIpFromBlocklistResponse); // metrics data - rpc GetLatencyMetrics(LatencyMetricsRequest) returns (LatencyMetricsResponse); - - rpc GetPacketLossMetrics(PacketLossMetricsRequest) returns (PacketLossMetricsResponse); + rpc GetLatencyMetrics(google.protobuf.Empty) returns (LatencyMetricsResponse); - rpc GetDroppedPacketsMetrics(DroppedPacketsRequest) returns (DroppedPacketsResponse); + // dropped packets + rpc GetDroppedPacketsMetrics(google.protobuf.Empty) returns (DroppedPacketsResponse); } message AddIpToBlocklistRequest{ diff --git a/core/api/src/agent.rs b/core/api/src/agent.rs index 933d066..c6f5126 100644 --- a/core/api/src/agent.rs +++ b/core/api/src/agent.rs @@ -24,21 +24,6 @@ pub struct ActiveConnectionResponse { pub events: ::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] -pub struct LatencyMetricsRequest { - /// Filter by thread group ID - #[prost(uint32, optional, tag = "1")] - pub tgid: ::core::option::Option, - /// Filter by process name - 
#[prost(string, optional, tag = "2")] - pub process_name: ::core::option::Option<::prost::alloc::string::String>, - /// Start timestamp (microseconds) - #[prost(uint64, optional, tag = "3")] - pub start_time: ::core::option::Option, - /// End timestamp (microseconds) - #[prost(uint64, optional, tag = "4")] - pub end_time: ::core::option::Option, -} -#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct LatencyMetric { /// Latency in microseconds #[prost(uint64, tag = "1")] @@ -92,69 +77,6 @@ pub struct LatencyMetricsResponse { #[prost(double, tag = "6")] pub max_latency_us: f64, } -#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] -pub struct PacketLossMetricsRequest { - /// Filter by thread group ID - #[prost(uint32, optional, tag = "1")] - pub tgid: ::core::option::Option, - /// Start timestamp - #[prost(uint64, optional, tag = "2")] - pub start_time: ::core::option::Option, - /// End timestamp - #[prost(uint64, optional, tag = "3")] - pub end_time: ::core::option::Option, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct PacketLossMetric { - /// Thread group ID - #[prost(uint32, tag = "1")] - pub tgid: u32, - /// Process name - #[prost(string, tag = "2")] - pub process_name: ::prost::alloc::string::String, - /// Event timestamp - #[prost(uint64, tag = "3")] - pub timestamp_us: u64, - /// Total packets lost - #[prost(uint32, tag = "4")] - pub total_packets_lost: u32, - /// Total packets transmitted - #[prost(uint32, tag = "5")] - pub total_packets_transmitted: u32, - /// % of total packet loss - #[prost(double, tag = "6")] - pub packet_loss_percentage: f64, - /// Total size of data loss - #[prost(uint64, tag = "7")] - pub total_data_loss_bytes: u64, - /// Total transmitted data - #[prost(uint64, tag = "8")] - pub total_data_transmitted_bytes: u64, - /// Ratio between loss and transmitted - #[prost(double, tag = "9")] - pub data_loss_ratio: f64, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct PacketLossMetricsResponse { - #[prost(string, tag = "1")] - pub status: ::prost::alloc::string::String, - #[prost(message, repeated, tag = "2")] - pub metrics: ::prost::alloc::vec::Vec, - #[prost(uint32, tag = "3")] - pub total_connections: u32, -} -#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] -pub struct DroppedPacketsRequest { - /// Filter by thread group ID - #[prost(uint32, optional, tag = "1")] - pub tgid: ::core::option::Option, - /// Start timestamp - #[prost(uint64, optional, tag = "2")] - pub start_time: ::core::option::Option, - /// End timestamp - #[prost(uint64, optional, tag = "3")] - pub end_time: ::core::option::Option, -} #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct DroppedPacketMetric { /// Thread group ID @@ -422,7 +344,7 @@ pub mod agent_client { /// metrics data pub async fn get_latency_metrics( &mut self, - request: impl tonic::IntoRequest, + request: impl tonic::IntoRequest<()>, ) -> std::result::Result< tonic::Response, tonic::Status, @@ -444,33 +366,10 @@ pub mod agent_client { .insert(GrpcMethod::new("agent.Agent", "GetLatencyMetrics")); self.inner.unary(req, path, codec).await } - pub async fn get_packet_loss_metrics( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic_prost::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - 
"/agent.Agent/GetPacketLossMetrics", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert(GrpcMethod::new("agent.Agent", "GetPacketLossMetrics")); - self.inner.unary(req, path, codec).await - } + /// dropped packets pub async fn get_dropped_packets_metrics( &mut self, - request: impl tonic::IntoRequest, + request: impl tonic::IntoRequest<()>, ) -> std::result::Result< tonic::Response, tonic::Status, @@ -541,21 +440,15 @@ pub mod agent_server { /// metrics data async fn get_latency_metrics( &self, - request: tonic::Request, + request: tonic::Request<()>, ) -> std::result::Result< tonic::Response, tonic::Status, >; - async fn get_packet_loss_metrics( - &self, - request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + /// dropped packets async fn get_dropped_packets_metrics( &self, - request: tonic::Request, + request: tonic::Request<()>, ) -> std::result::Result< tonic::Response, tonic::Status, @@ -816,19 +709,14 @@ pub mod agent_server { "/agent.Agent/GetLatencyMetrics" => { #[allow(non_camel_case_types)] struct GetLatencyMetricsSvc(pub Arc); - impl< - T: Agent, - > tonic::server::UnaryService + impl tonic::server::UnaryService<()> for GetLatencyMetricsSvc { type Response = super::LatencyMetricsResponse; type Future = BoxFuture< tonic::Response, tonic::Status, >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { + fn call(&mut self, request: tonic::Request<()>) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { ::get_latency_metrics(&inner, request).await @@ -858,67 +746,17 @@ pub mod agent_server { }; Box::pin(fut) } - "/agent.Agent/GetPacketLossMetrics" => { - #[allow(non_camel_case_types)] - struct GetPacketLossMetricsSvc(pub Arc); - impl< - T: Agent, - > tonic::server::UnaryService - for GetPacketLossMetricsSvc { - type Response = super::PacketLossMetricsResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::get_packet_loss_metrics(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let method = GetPacketLossMetricsSvc(inner); - let codec = tonic_prost::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } "/agent.Agent/GetDroppedPacketsMetrics" => { #[allow(non_camel_case_types)] struct GetDroppedPacketsMetricsSvc(pub Arc); - impl< - T: Agent, - > tonic::server::UnaryService + impl tonic::server::UnaryService<()> for GetDroppedPacketsMetricsSvc { type Response = super::DroppedPacketsResponse; type Future = BoxFuture< tonic::Response, tonic::Status, >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { + fn call(&mut self, request: tonic::Request<()>) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { ::get_dropped_packets_metrics(&inner, request) diff --git 
a/core/api/src/api.rs b/core/api/src/api.rs index 9aa7db8..27641b4 100644 --- a/core/api/src/api.rs +++ b/core/api/src/api.rs @@ -1,9 +1,11 @@ #![allow(warnings)] use anyhow::Context; use chrono::Local; -use cortexbrain_common::formatters::{format_ipv4, format_ipv6}; +use cortexbrain_common::{ + formatters::{format_ipv4, format_ipv6}, +}; use prost::bytes::BytesMut; -use std::str::FromStr; +use std::{str::FromStr, sync::Arc}; use std::sync::Mutex; use tonic::{Request, Response, Status}; use tracing::info; @@ -20,7 +22,12 @@ use std::collections::HashMap; use tokio::sync::mpsc; use tokio::task; -use crate::agent::{ConnectionEvent, DroppedPacketMetric, DroppedPacketsRequest, DroppedPacketsResponse, LatencyMetric, LatencyMetricsRequest, LatencyMetricsResponse, PacketLossMetricsRequest, PacketLossMetricsResponse}; +use crate::{ + agent::{ + ConnectionEvent, DroppedPacketMetric, DroppedPacketsResponse, + LatencyMetric, LatencyMetricsResponse, + }, +}; use crate::structs::{NetworkMetrics, PacketLog, TimeStampMetrics}; @@ -43,10 +50,10 @@ pub struct AgentApi { //* is used to receive the data from the transmitter (tx) active_connection_event_rx: Mutex, Status>>>, active_connection_event_tx: mpsc::Sender, Status>>, - latency_metrics_rx: Mutex>, - latency_metrics_tx: mpsc::Sender, - dropped_packet_metrics_rx: Mutex>, - dropped_packet_metrics_tx: mpsc::Sender, + latency_metrics_rx: Mutex, Status>>>, + latency_metrics_tx: mpsc::Sender, Status>>, + dropped_packet_metrics_rx: Mutex, Status>>>, + dropped_packet_metrics_tx: mpsc::Sender, Status>>, } //* Event sender trait. Takes an event from a map and send that to the mpsc channel @@ -64,6 +71,29 @@ pub trait EventSender: Send + Sync + 'static { let _ = tx.send(event).await; } + + async fn send_latency_metrics_event(&self, event: Vec); + async fn send_latency_metrics_event_map( + &self, + map: Vec, + tx: mpsc::Sender, Status>>, + ) { + let status = Status::new(tonic::Code::Ok, "success"); + let event = Ok(map); + let _ = tx.send(event).await; + } + + async fn send_dropped_packet_metrics_event(&self, event: Vec); + async fn send_dropped_packet_metrics_event_map( + &self, + map: Vec, + tx: mpsc::Sender, Status>>, + ) { + let status = Status::new(tonic::Code::Ok, "success"); + let event = Ok(map); + let _ = tx.send(event).await; + } + } // send event function. takes an HashMap and send that using mpsc event_tx @@ -73,6 +103,16 @@ impl EventSender for AgentApi { self.send_active_connection_event_map(event, self.active_connection_event_tx.clone()) .await; } + + async fn send_latency_metrics_event(&self, event: Vec) { + self.send_latency_metrics_event_map(event, self.latency_metrics_tx.clone()) + .await; + } + + async fn send_dropped_packet_metrics_event(&self, event: Vec) { + self.send_dropped_packet_metrics_event_map(event, self.dropped_packet_metrics_tx.clone()) + .await; + } } //initialize a default trait for AgentApi. Loads a name and a bpf istance. 
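Editor's note — a minimal, self-contained sketch of the channel pattern this hunk introduces: producers wrap a batch of metrics in Ok(...) and push it through a bounded tokio mpsc sender, and the RPC handler later drains the Mutex-guarded receiver with try_recv() and reduces the batches into count/avg/min/max. The names (MetricsHub, publish, drain_summary, Batch) and the plain u64 payload standing in for the generated LatencyMetric / Status types are illustrative assumptions so the example runs standalone; they are not part of the patch.

use std::sync::Mutex;
use tokio::sync::mpsc;

// Stand-in for Result<Vec<LatencyMetric>, Status> used by the real channels.
type Batch = Result<Vec<u64>, String>;

struct MetricsHub {
    tx: mpsc::Sender<Batch>,
    rx: Mutex<mpsc::Receiver<Batch>>,
}

impl MetricsHub {
    fn new(capacity: usize) -> Self {
        let (tx, rx) = mpsc::channel(capacity);
        Self { tx, rx: Mutex::new(rx) }
    }

    // Producer side: mirrors send_latency_metrics_event_map, which wraps the batch
    // in Ok and forwards it, ignoring the error if the receiver has gone away.
    async fn publish(&self, batch: Vec<u64>) {
        let _ = self.tx.send(Ok(batch)).await;
    }

    // Consumer side: mirrors get_latency_metrics, which drains whatever is buffered
    // with try_recv() and aggregates the samples into summary statistics.
    fn drain_summary(&self) -> (u32, f64, f64, f64) {
        let mut samples: Vec<u64> = Vec::new();
        while let Ok(batch) = self.rx.lock().unwrap().try_recv() {
            if let Ok(values) = batch {
                samples.extend(values);
            }
        }
        let count = samples.len() as u32;
        if samples.is_empty() {
            return (0, 0.0, 0.0, 0.0);
        }
        let sum: u64 = samples.iter().sum();
        let avg = sum as f64 / samples.len() as f64;
        let min = *samples.iter().min().unwrap() as f64;
        let max = *samples.iter().max().unwrap() as f64;
        (count, avg, min, max)
    }
}

#[tokio::main]
async fn main() {
    let hub = MetricsHub::new(2048);
    hub.publish(vec![120]).await;
    hub.publish(vec![85, 430]).await;
    let (count, avg, min, max) = hub.drain_summary();
    println!("count={count} avg={avg:.2}us min={min}us max={max}us");
}

The bounded capacity (1024/2048 in the patch) keeps slow consumers from growing memory without bound; the trade-off is that publish() applies backpressure once the buffer is full, which is why producers discard the send result rather than blocking the perf-event readers on an error path.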
@@ -89,14 +129,14 @@ impl Default for AgentApi { .expect("Error while initializing events array"); // load network metrics maps mapdata - let network_metrics_mapdata = MapData::from_pin("/sys/fs/bpf/maps/net_metrics") + let network_metrics_mapdata = MapData::from_pin("/sys/fs/bpf/trace_maps/net_metrics") .expect("cannot open net_metrics Mapdata"); let network_metrics_map = Map::PerfEventArray(network_metrics_mapdata); //creates a PerfEventArray from the mapdata let mut network_metrics_events_array = PerfEventArray::try_from(network_metrics_map) .expect("Error while initializing network metrics array"); // load time stamp events maps mapdata - let time_stamp_events_mapdata = MapData::from_pin("/sys/fs/bpf/maps/time_stamp_events") + let time_stamp_events_mapdata = MapData::from_pin("/sys/fs/bpf/trace_maps/time_stamp_events") .expect("cannot open time_stamp_events Mapdata"); let time_stamp_events_map = Map::PerfEventArray(time_stamp_events_mapdata); // let mut time_stamp_events_array = PerfEventArray::try_from(time_stamp_events_map) @@ -106,6 +146,7 @@ impl Default for AgentApi { let (conn_tx, conn_rx) = mpsc::channel(1024); let (lat_tx, lat_rx) = mpsc::channel(2048); let (drop_tx, drop_rx) = mpsc::channel(2048); + let api = AgentApi { active_connection_event_rx: conn_rx.into(), active_connection_event_tx: conn_tx.clone(), @@ -144,9 +185,7 @@ impl Default for AgentApi { if events.read > 0 { for i in 0..events.read { let data = &buffers[i]; - if data.len() - >= std::mem::size_of::() - { + if data.len() >= std::mem::size_of::() { let pl: PacketLog = unsafe { std::ptr::read(data.as_ptr() as *const _) }; let src = Ipv4Addr::from(u32::from_be(pl.src_ip)); @@ -212,6 +251,7 @@ impl Default for AgentApi { task::spawn(async move { let mut net_metrics_buffer = Vec::new(); + //scan the cpus to read the data for cpu_id in online_cpus() .map_err(|e| anyhow::anyhow!("Error {:?}", e)) @@ -236,9 +276,7 @@ impl Default for AgentApi { if events.read > 0 { for i in 0..events.read { let data = &buffers[i]; - if data.len() - >= std::mem::size_of::() - { + if data.len() >= std::mem::size_of::() { let nm: NetworkMetrics = unsafe { std::ptr::read(data.as_ptr() as *const _) }; @@ -248,7 +286,7 @@ impl Default for AgentApi { sk_drops: nm.sk_drops, sk_err: nm.sk_err, sk_err_soft: nm.sk_err_soft, - sk_backlog_len: nm.sk_backlog_len, + sk_backlog_len: nm.sk_backlog_len as u32, sk_wmem_queued: nm.sk_write_memory_queued, sk_rcvbuf: nm.sk_receive_buffer_size, sk_ack_backlog: nm.sk_ack_backlog, @@ -256,6 +294,7 @@ impl Default for AgentApi { }; if dropped_packet_metrics.sk_drops > 0 { + let mut evt = Vec::new(); info!( "Dropped Packet Metric - tgid: {}, process_name: {}, sk_drops: {}, sk_err: {}, sk_err_soft: {}, sk_backlog_len: {}, sk_wmem_queued: {}, sk_rcvbuf: {}, sk_ack_backlog: {}, timestamp_us: {}", dropped_packet_metrics.tgid, @@ -269,13 +308,8 @@ impl Default for AgentApi { dropped_packet_metrics.sk_ack_backlog, dropped_packet_metrics.timestamp_us ); - let _ = drop_tx.send(dropped_packet_metrics).await; - } else { - info!( - "No dropped packets for tgid: {}, process_name: {}", - dropped_packet_metrics.tgid, - dropped_packet_metrics.process_name - ); + evt.push(dropped_packet_metrics.clone()); + let _ = drop_tx.send(Ok(evt)).await; } } else { warn!( @@ -323,9 +357,7 @@ impl Default for AgentApi { if events.read > 0 { for i in 0..events.read { let data = &buffers[i]; - if data.len() - >= std::mem::size_of::() - { + if data.len() >= std::mem::size_of::() { let tsm: TimeStampMetrics = unsafe { 
std::ptr::read(data.as_ptr() as *const _) }; let latency_metric = LatencyMetric { @@ -341,7 +373,23 @@ impl Default for AgentApi { src_address_v6: format_ipv6(&tsm.saddr_v6), dst_address_v6: format_ipv6(&tsm.daddr_v6), }; - let _ = lat_tx.send(latency_metric).await; + info!( + "Latency Metric - tgid: {}, process_name: {}, delta_us: {}, timestamp_us: {}, local_port: {}, remote_port: {}, address_family: {}, src_address_v4: {}, dst_address_v4: {}, src_address_v6: {}, dst_address_v6: {}", + latency_metric.tgid, + latency_metric.process_name, + latency_metric.delta_us, + latency_metric.timestamp_us, + latency_metric.local_port, + latency_metric.remote_port, + latency_metric.address_family, + latency_metric.src_address_v4, + latency_metric.dst_address_v4, + latency_metric.src_address_v6, + latency_metric.dst_address_v6 + ); + let mut evt = Vec::new(); + evt.push(latency_metric.clone()); + let _ = lat_tx.send(Ok(evt)).await; } else { warn!( "Received time stamp metrics data too small: {} bytes", @@ -515,7 +563,7 @@ impl Agent for AgentApi { async fn get_latency_metrics( &self, - request: Request, + request: Request<()>, ) -> Result, Status> { // Extract the request parameters let req = request.into_inner(); @@ -523,32 +571,54 @@ impl Agent for AgentApi { // Here you would typically query your data source for the latency metrics // For demonstration purposes, we'll return a dummy response - let response = LatencyMetricsResponse { - status: "success".to_string(), - metrics: vec![], - total_count: 0, - average_latency_us: 0.0, - min_latency_us: 0.0, - max_latency_us: 0.0, - }; - Ok(Response::new(response)) - } + let mut aggregated_latency_metrics_events: Vec = Vec::new(); - async fn get_packet_loss_metrics( - &self, - request: Request, - ) -> Result, Status> { - // Extract the request parameters - let req = request.into_inner(); - info!("Getting packet loss metrics"); + while let Ok(evt) = self.latency_metrics_rx.lock().unwrap().try_recv() { + if let Ok(vec) = evt { + aggregated_latency_metrics_events.extend(vec); + } + } - // Here you would typically query your data source for the packet loss metrics - // For demonstration purposes, we'll return a dummy response - let response = PacketLossMetricsResponse { + let total_count = aggregated_latency_metrics_events.len() as u32; + + let (average_latency_us, min_latency_us, max_latency_us) = + if !aggregated_latency_metrics_events.is_empty() { + let sum: u64 = aggregated_latency_metrics_events + .iter() + .map(|m| m.delta_us) + .sum(); + let avg = sum as f64 / aggregated_latency_metrics_events.len() as f64; + + let min = aggregated_latency_metrics_events + .iter() + .map(|m| m.delta_us) + .min() + .unwrap_or(0) as f64; + + let max = aggregated_latency_metrics_events + .iter() + .map(|m| m.delta_us) + .max() + .unwrap_or(0) as f64; + + (avg, min, max) + } else { + (0.0, 0.0, 0.0) + }; + + info!( + "Latency metrics - total_count: {}, average: {:.2}us, min: {:.2}us, max: {:.2}us", + total_count, average_latency_us, min_latency_us, max_latency_us + ); + + let response = LatencyMetricsResponse { status: "success".to_string(), - metrics: vec![], - total_connections: 0, + metrics: aggregated_latency_metrics_events, + total_count, + average_latency_us, + max_latency_us, + min_latency_us, }; Ok(Response::new(response)) @@ -556,20 +626,37 @@ impl Agent for AgentApi { async fn get_dropped_packets_metrics( &self, - request: Request, + request: Request<()>, ) -> Result, Status> { // Extract the request parameters let req = request.into_inner(); info!("Getting dropped 
packets metrics"); - // Here you would typically query your data source for the dropped packets metrics - // For demonstration purposes, we'll return a dummy response + let mut aggregated_dropped_packet_metrics: Vec = Vec::new(); + let mut total_drops = 0u32; + + // Collect all metrics from channel + while let Ok(evt) = self.dropped_packet_metrics_rx.lock().unwrap().try_recv() { + if let Ok(vec) = evt { + for metric in vec { + total_drops += metric.sk_drops as u32; + aggregated_dropped_packet_metrics.push(metric); + } + } + } + + info!( + "Dropped packets metrics - total_metrics: {}, total_drops: {}", + aggregated_dropped_packet_metrics.len(), + total_drops + ); + let response = DroppedPacketsResponse { status: "success".to_string(), - metrics: vec![], - total_drops: 0, + metrics: aggregated_dropped_packet_metrics, + total_drops, }; Ok(Response::new(response)) } -} \ No newline at end of file +} diff --git a/core/api/src/helpers.rs b/core/api/src/helpers.rs index ea2fc3b..c060949 100644 --- a/core/api/src/helpers.rs +++ b/core/api/src/helpers.rs @@ -1,5 +1,3 @@ - - use crate::constants::TASK_COMM_LEN; pub fn comm_to_string(comm: &[u8; TASK_COMM_LEN]) -> String { diff --git a/core/api/src/requests.rs b/core/api/src/requests.rs index 05ffdba..a518f4a 100644 --- a/core/api/src/requests.rs +++ b/core/api/src/requests.rs @@ -15,12 +15,8 @@ use crate::agent::BlocklistResponse; use crate::agent::AddIpToBlocklistRequest; use crate::agent::RmIpFromBlocklistRequest; use crate::agent::RmIpFromBlocklistResponse; -use crate::agent::DroppedPacketsRequest; use crate::agent::DroppedPacketsResponse; -use crate::agent::LatencyMetricsRequest; use crate::agent::LatencyMetricsResponse; -use crate::agent::PacketLossMetricsRequest; -use crate::agent::PacketLossMetricsResponse; #[cfg(feature = "client")] pub async fn send_active_connection_request( @@ -76,52 +72,23 @@ pub async fn remove_ip_from_blocklist_request( } #[cfg(feature = "client")] -pub async fn get_dropped_packets( +pub async fn send_dropped_packets_request( mut client: AgentClient, - tgid: Option, - start_time: Option, - end_time: Option, ) -> Result, Error> { - let request = Request::new(DroppedPacketsRequest { - tgid, - start_time, - end_time, - }); - let response = client.get_dropped_packets_metrics(request).await?; + let request = Request::new(()); + let response = client.get_dropped_packets_metrics( + request + ).await?; Ok(response) } #[cfg(feature = "client")] -pub async fn get_latency_metrics( +pub async fn send_latency_metrics_request( mut client: AgentClient, - tgid: Option, - start_time: Option, - end_time: Option, - process_name: Option, ) -> Result, Error> { - - let request = Request::new(LatencyMetricsRequest { - tgid, - start_time, - end_time, - process_name - }); - let response = client.get_latency_metrics(request).await?; + let request = Request::new(()); + let response = client.get_latency_metrics( + request + ).await?; Ok(response) } - -#[cfg(feature = "client")] -pub async fn get_packet_loss_metrics( - mut client: AgentClient, - tgid: Option, - start_time: Option, - end_time: Option, -) -> Result, Error> { - let request = Request::new(PacketLossMetricsRequest { - tgid, - start_time, - end_time, - }); - let response = client.get_packet_loss_metrics(request).await?; - Ok(response) -} \ No newline at end of file diff --git a/core/api/src/structs.rs b/core/api/src/structs.rs index 1dc4ec8..b15fa22 100644 --- a/core/api/src/structs.rs +++ b/core/api/src/structs.rs @@ -14,7 +14,7 @@ pub struct PacketLog { } unsafe impl aya::Pod for 
PacketLog {} -#[repr(C)] +#[repr(C, packed)] #[derive(Clone, Copy, Zeroable)] pub struct NetworkMetrics { pub tgid: u32, @@ -22,7 +22,7 @@ pub struct NetworkMetrics { pub ts_us: u64, pub sk_err: i32, pub sk_err_soft: i32, - pub sk_backlog_len: u32, + pub sk_backlog_len: i32, pub sk_write_memory_queued: i32, pub sk_receive_buffer_size: i32, pub sk_ack_backlog: u32, @@ -45,4 +45,4 @@ pub struct TimeStampMetrics { pub saddr_v6: [u32; 4], pub daddr_v6: [u32; 4], } -unsafe impl aya::Pod for TimeStampMetrics {} \ No newline at end of file +unsafe impl aya::Pod for TimeStampMetrics {} diff --git a/core/common/src/constants.rs b/core/common/src/constants.rs index 2776484..4cc3835 100644 --- a/core/common/src/constants.rs +++ b/core/common/src/constants.rs @@ -5,5 +5,3 @@ pub const BPF_PATH: &str = "BPF_PATH"; /// Environment variable name for the BPF map pinning path. /// Used for sharing maps between eBPF programs. pub const PIN_MAP_PATH: &str = "PIN_MAP_PATH"; - - diff --git a/core/src/components/identity/src/main.rs b/core/src/components/identity/src/main.rs index 9804256..9ee00e9 100644 --- a/core/src/components/identity/src/main.rs +++ b/core/src/components/identity/src/main.rs @@ -59,9 +59,9 @@ async fn main() -> Result<(), anyhow::Error> { match init_bpf_maps(bpf.clone()) { std::result::Result::Ok(mut bpf_maps) => { info!("Successfully loaded bpf maps"); - - //TODO: save the bpf maps in a Vec instead of using a tuple - match map_pinner(&bpf_maps, &bpf_map_save_path.into()).await { + let pin_path = std::path::PathBuf::from(&bpf_map_save_path); + info!("About to call map_pinner with path: {:?}", pin_path); + match map_pinner(&bpf_maps, &pin_path).await { std::result::Result::Ok(_) => { info!("maps pinned successfully"); //load veth_trace program ref veth_trace.rs diff --git a/core/src/components/identity/src/map_handlers.rs b/core/src/components/identity/src/map_handlers.rs index 49697b7..6f2e818 100644 --- a/core/src/components/identity/src/map_handlers.rs +++ b/core/src/components/identity/src/map_handlers.rs @@ -1,6 +1,6 @@ use anyhow::Error; use anyhow::Ok; -use aya::Bpf; +use aya::Ebpf; use aya::maps::HashMap; use aya::maps::Map; use k8s_openapi::api::core::v1::ConfigMap; @@ -13,7 +13,7 @@ use std::sync::Mutex; use tokio::fs; use tracing::{error, info}; -pub fn init_bpf_maps(bpf: Arc>) -> Result<(Map, Map, Map), anyhow::Error> { +pub fn init_bpf_maps(bpf: Arc>) -> Result<(Map, Map, Map), anyhow::Error> { // this function init the bpfs maps used in the main program /* index 0: events_map diff --git a/core/src/components/metrics/src/main.rs b/core/src/components/metrics/src/main.rs index 2b0f2e9..6b22a86 100644 --- a/core/src/components/metrics/src/main.rs +++ b/core/src/components/metrics/src/main.rs @@ -47,7 +47,9 @@ async fn main() -> Result<(), anyhow::Error> { match init_ebpf_maps(bpf.clone()) { std::result::Result::Ok(maps) => { info!("BPF maps loaded successfully"); - match map_pinner(&maps, &bpf_map_save_path.clone().into()).await { + let pin_path = std::path::PathBuf::from(&bpf_map_save_path); + info!("About to call map_pinner with path: {:?}", pin_path); + match map_pinner(&maps, &pin_path).await { std::result::Result::Ok(_) => { info!("BPF maps pinned successfully to {}", bpf_map_save_path); diff --git a/core/src/components/metrics/src/structs.rs b/core/src/components/metrics/src/structs.rs index 7fd53c7..dc63ace 100644 --- a/core/src/components/metrics/src/structs.rs +++ b/core/src/components/metrics/src/structs.rs @@ -1,7 +1,7 @@ pub const TASK_COMM_LEN: usize = 16; // 
linux/sched.h -#[repr(C)] +#[repr(C, packed)] #[derive(Clone, Copy)] pub struct NetworkMetrics { pub tgid: u32, diff --git a/core/src/components/metrics_tracer/src/main.rs b/core/src/components/metrics_tracer/src/main.rs index ad0d809..2f5e5a1 100644 --- a/core/src/components/metrics_tracer/src/main.rs +++ b/core/src/components/metrics_tracer/src/main.rs @@ -57,16 +57,16 @@ fn try_metrics_tracer(ctx: ProbeContext) -> Result { let sk_drops = unsafe { bpf_probe_read_kernel::(sk_pointer.add(sk_drops_offset) as *const i32).map_err(|_| 1)? }; let net_metrics = NetworkMetrics { - tgid, - comm, - ts_us, - sk_err, - sk_err_soft, - sk_backlog_len, - sk_write_memory_queued, - sk_receive_buffer_size, - sk_ack_backlog, - sk_drops, + tgid: tgid, + comm: comm, + ts_us: ts_us, + sk_err: sk_err, + sk_err_soft: sk_err_soft, + sk_backlog_len: sk_backlog_len, + sk_write_memory_queued: sk_write_memory_queued, + sk_receive_buffer_size: sk_receive_buffer_size, + sk_ack_backlog: sk_ack_backlog, + sk_drops: sk_drops, }; unsafe {