diff --git a/cli/Cargo.lock b/cli/Cargo.lock
index 6ef922e..b773e7b 100644
--- a/cli/Cargo.lock
+++ b/cli/Cargo.lock
@@ -171,30 +171,6 @@ dependencies = [
  "object",
  "once_cell",
  "thiserror 1.0.69",
- "tokio",
-]
-
-[[package]]
-name = "aya-log"
-version = "0.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b600d806c1d07d3b81ab5f4a2a95fd80f479a0d3f1d68f29064d660865f85f02"
-dependencies = [
- "aya",
- "aya-log-common",
- "bytes",
- "log",
- "thiserror 1.0.69",
- "tokio",
-]
-
-[[package]]
-name = "aya-log-common"
-version = "0.1.15"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "befef9fe882e63164a2ba0161874e954648a72b0e1c4b361f532d590638c4eec"
-dependencies = [
- "num_enum",
 ]
 
 [[package]]
@@ -438,7 +414,6 @@ version = "0.1.1-beta.1"
 dependencies = [
  "anyhow",
  "aya",
- "aya-log",
  "bytemuck",
  "bytemuck_derive",
  "bytes",
@@ -1138,27 +1113,6 @@ dependencies = [
  "autocfg",
 ]
 
-[[package]]
-name = "num_enum"
-version = "0.7.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a973b4e44ce6cad84ce69d797acf9a044532e4184c4f267913d1b546a0727b7a"
-dependencies = [
- "num_enum_derive",
- "rustversion",
-]
-
-[[package]]
-name = "num_enum_derive"
-version = "0.7.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "77e878c846a8abae00dd069496dbe8751b16ac1c3d6bd2a7283a938e8228f90d"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn",
-]
-
 [[package]]
 name = "object"
 version = "0.36.7"
diff --git a/core/src/components/conntracker/src/data_structures.rs b/core/src/components/conntracker/src/data_structures.rs
index d8b4ec1..35861a8 100644
--- a/core/src/components/conntracker/src/data_structures.rs
+++ b/core/src/components/conntracker/src/data_structures.rs
@@ -61,6 +61,21 @@ pub struct VethLog {
 }
 
+// Per-packet TCP event record emitted to userspace; the field layout must
+// stay in sync with the #[repr(C)] twin in identity/src/structs.rs.
+#[repr(C)]
+#[derive(Clone, Copy, Debug)]
+pub struct TcpPacketRegistry {
+    pub proto: u8,
+    pub src_ip: u32,
+    pub dst_ip: u32,
+    pub src_port: u16,
+    pub dst_port: u16,
+    pub pid: u32,
+    pub command: [u8; 16],
+    pub cgroup_id: u64,
+}
+
 // docs:
 //
 // BPF maps used in the conntracker programs
@@ -90,4 +105,7 @@ pub static mut VETH_EVENTS: PerfEventArray<VethLog> = PerfEventArray::new(0);
 
 #[map(name = "Blocklist")]
 pub static mut BLOCKLIST: HashMap<[u8;4], [u8;4]> = HashMap::<[u8;4], [u8;4]>::with_max_entries(1024, 0);
-//here i need to pass an address like this: [135,171,168,192]
\ No newline at end of file
+// Blocklist entries are IPv4 octets in reverse order, e.g. 192.168.171.135 is passed as [135,171,168,192]
+
+#[map(name = "TcpPacketRegistry", pinning = "by_name")]
+pub static mut PACKET_REGISTRY: PerfEventArray<TcpPacketRegistry> = PerfEventArray::new(0);
\ No newline at end of file
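Since `TcpPacketRegistry` crosses the kernel/user boundary as raw bytes and is declared twice (here and again in `identity/src/structs.rs`), the two `#[repr(C)]` twins must agree byte for byte. A compile-time guard next to either declaration is a cheap way to catch drift — a minimal sketch, assuming the usual C layout on a 64-bit target (3 padding bytes after `proto`, 4 before `cgroup_id`, 48 bytes total):

```rust
// Minimal sketch: fail the build if the event layout drifts from the
// expected 48 bytes (x86_64 C layout assumed; recompute if fields change).
const _: () = assert!(core::mem::size_of::<TcpPacketRegistry>() == 48);
```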
diff --git a/core/src/components/conntracker/src/offsets.rs b/core/src/components/conntracker/src/offsets.rs
index 3b3101c..1ced2af 100644
--- a/core/src/components/conntracker/src/offsets.rs
+++ b/core/src/components/conntracker/src/offsets.rs
@@ -60,9 +60,12 @@ impl OFFSETS {
     pub const DST_PORT_OFFSET_FROM_IP_HEADER: usize = 2; // destination port offset
 
     // TOTAL BYTES SUM
-    pub const ETH_STACK_BYTES: usize = OFFSETS::SRC_MAC + OFFSETS::DST_MAC + OFFSETS::ETHERTYPE_BYTES; // ethernet protocol total stacked bytes
+    pub const ETH_STACK_BYTES: usize =
+        OFFSETS::SRC_MAC + OFFSETS::DST_MAC + OFFSETS::ETHERTYPE_BYTES; // ethernet protocol total stacked bytes
     pub const DST_T0TAL_BYTES_OFFSET: usize = OFFSETS::ETH_STACK_BYTES + OFFSETS::DST_BYTE_OFFSET; // destination total bytes offset
     pub const SRC_T0TAL_BYTES_OFFSET: usize =
         OFFSETS::ETH_STACK_BYTES + OFFSETS::SRC_BYTE_OFFSET; // source total bytes offset
     pub const PROTOCOL_T0TAL_BYTES_OFFSET: usize = OFFSETS::ETH_STACK_BYTES + OFFSETS::IPV4_PROTOCOL_OFFSET; // total bytes offset
+
+    pub const SKB_DATA_POINTER: usize = 208; // byte offset of the sk_buff data pointer field (kernel-layout dependent)
 }
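`SKB_DATA_POINTER = 208` pins the probe to one kernel build: `struct sk_buff` shifts between versions and configs. Since the crate already ships generated bindings (`crate::bindings::sk_buff`), the offset could instead be derived at compile time — a sketch assuming `data` is an ordinary field in the generated binding and Rust 1.77+ for `offset_of!`:

```rust
// Sketch: derive the offset from the generated sk_buff binding instead of
// hardcoding 208 (assumes `data` is a plain field in bindings.rs).
use crate::bindings::sk_buff;

pub const SKB_DATA_POINTER: usize = core::mem::offset_of!(sk_buff, data);
```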
diff --git a/core/src/components/conntracker/src/tcp_analyzer.rs b/core/src/components/conntracker/src/tcp_analyzer.rs
index 1b416d2..9fe5d29 100644
--- a/core/src/components/conntracker/src/tcp_analyzer.rs
+++ b/core/src/components/conntracker/src/tcp_analyzer.rs
@@ -1,6 +1,83 @@
-
 use aya_ebpf::programs::ProbeContext;
+use aya_ebpf::helpers::{
+    bpf_get_current_comm,
+    bpf_get_current_pid_tgid,
+    bpf_get_current_cgroup_id,
+};
+
+use crate::bindings::{ sk_buff };
+use crate::offsets::OFFSETS;
+use crate::data_structures::{ PACKET_REGISTRY, TcpPacketRegistry };
+use crate::veth_tracer::{ read_linux_inner_struct, read_linux_inner_value };
+
+// docs:
+//
+// try_tcp_analyzer parses the IPv4 and TCP headers of the sk_buff handed to
+// tcp_v4_rcv and emits a TcpPacketRegistry event enriched with the current
+// process context (pid, comm, cgroup id).
+//
+// how skb works: http://oldvger.kernel.org/~davem/skb_data.html
+//
+// ref: https://elixir.bootlin.com/linux/v6.17.7/source/net/ipv4/tcp_ipv4.c#L2195
+//
+
+// in tcp_v4_rcv, skb->data points at the IPv4 header
 pub fn try_tcp_analyzer(ctx: ProbeContext) -> Result<u32, i64> {
-    todo!()
+    let sk_buff_pointer: *const sk_buff = ctx.arg(0).ok_or(1i64)?;
+    // first check: verify that the pointer is not null
+    if sk_buff_pointer.is_null() {
+        return Err(1);
+    }
+
+    let skb_data_pointer = read_linux_inner_struct::<u8>(
+        sk_buff_pointer as *const u8,
+        OFFSETS::SKB_DATA_POINTER
+    )?;
+    let first_ipv4_byte = read_linux_inner_value::<u8>(skb_data_pointer as *const u8, 0)?;
+    let ihl = (first_ipv4_byte & 0x0f) as usize; // 0x0F = 00001111: bitwise AND extracts the low 4 bits (the IHL field)
+    let ip_header_len = ihl * 4; // the IHL counts 32-bit words, so the header length in bytes is ihl * 4
+
+    let proto = read_linux_inner_value::<u8>(
+        skb_data_pointer,
+        OFFSETS::IPV4_PROTOCOL_OFFSET
+    )?;
+
+    if proto != 6 {
+        // not TCP (IPPROTO_TCP == 6)
+        return Ok(0);
+    } else {
+        // get the source/destination IPs, the ports and the process context
+        let src_ip = read_linux_inner_value::<u32>(skb_data_pointer, OFFSETS::SRC_BYTE_OFFSET)?;
+        let dst_ip = read_linux_inner_value::<u32>(skb_data_pointer, OFFSETS::DST_BYTE_OFFSET)?;
+        let src_port = u16::from_be(
+            read_linux_inner_value(
+                skb_data_pointer,
+                ip_header_len + OFFSETS::SRC_PORT_OFFSET_FROM_IP_HEADER
+            )?
+        );
+        let dst_port = u16::from_be(
+            read_linux_inner_value(
+                skb_data_pointer,
+                ip_header_len + OFFSETS::DST_PORT_OFFSET_FROM_IP_HEADER
+            )?
+        );
+
+        let command = bpf_get_current_comm()?;
+        let pid = (bpf_get_current_pid_tgid() >> 32) as u32;
+        let cgroup_id = unsafe { bpf_get_current_cgroup_id() };
+
+        let log = TcpPacketRegistry {
+            proto,
+            src_ip,
+            dst_ip,
+            src_port,
+            dst_port,
+            pid,
+            command,
+            cgroup_id,
+        };
+        unsafe {
+            PACKET_REGISTRY.output(&ctx, &log, 0);
+        }
+    }
+
+    Ok(0)
 }
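The IHL arithmetic in `try_tcp_analyzer` is easy to sanity-check offline; for the common case of an IPv4 header without options the version/IHL byte is 0x45:

```rust
// Worked example: version/IHL byte 0x45 = IPv4 (high nibble) with an IHL of
// five 32-bit words, i.e. a 20-byte header, so the TCP ports start at byte
// 20 of skb->data.
fn main() {
    let first_ipv4_byte: u8 = 0x45;
    let ihl = (first_ipv4_byte & 0x0f) as usize;
    assert_eq!(ihl * 4, 20);
}
```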
diff --git a/core/src/components/conntracker/src/veth_tracer.rs b/core/src/components/conntracker/src/veth_tracer.rs
index f932c7d..e2f07e7 100644
--- a/core/src/components/conntracker/src/veth_tracer.rs
+++ b/core/src/components/conntracker/src/veth_tracer.rs
@@ -77,7 +77,7 @@ pub fn try_veth_tracer(ctx: ProbeContext, mode: u8) -> Result<u32, i64> {
 //
 // Returns a Result type with a const pointer to an inner field or an error code as i64
 
-fn read_linux_inner_struct<T>(ptr: *const u8, offset: usize) -> Result<*const T, i64> {
+pub fn read_linux_inner_struct<T>(ptr: *const u8, offset: usize) -> Result<*const T, i64> {
     if ptr.is_null() {
         return Err(1);
     } else {
@@ -105,7 +105,7 @@ fn read_linux_inner_struct<T>(ptr: *const u8, offset: usize) -> Result<*const T
 //
 // Returns a Result type with the value or an error code as i64
 
-fn read_linux_inner_value<T>(ptr: *const u8, offset: usize) -> Result<T, i64> {
+pub fn read_linux_inner_value<T>(ptr: *const u8, offset: usize) -> Result<T, i64> {
     if ptr.is_null() {
         return Err(1);
     }
diff --git a/core/src/components/identity/Cargo.toml b/core/src/components/identity/Cargo.toml
index 125d2ec..6d9703e 100644
--- a/core/src/components/identity/Cargo.toml
+++ b/core/src/components/identity/Cargo.toml
@@ -18,7 +18,6 @@ enums = []
 
 [dependencies]
 aya = "0.13.1"
-aya-log = "0.2.1"
 bytes = "1.4"
 tokio = { version = "1.48.0", features = ["rt","rt-multi-thread","fs","signal","fs","time","macros"] }
 anyhow = "1.0"
diff --git a/core/src/components/identity/src/helpers.rs b/core/src/components/identity/src/helpers.rs
index 3a0be5b..5e236e3 100644
--- a/core/src/components/identity/src/helpers.rs
+++ b/core/src/components/identity/src/helpers.rs
@@ -1,6 +1,7 @@
 #![allow(warnings)]
 use crate::enums::IpProtocols;
-use crate::structs::{PacketLog, VethLog};
+use crate::structs::{PacketLog, TcpPacketRegistry, VethLog};
+use anyhow::Error;
 use aya::programs::tc::SchedClassifierLinkId;
 use aya::{
     Bpf,
@@ -8,10 +9,18 @@ use aya::{
     programs::{SchedClassifier, TcAttachType},
 };
 use bytes::BytesMut;
+use cortexbrain_common::constants;
+use k8s_openapi::api::core::v1::Pod;
+use kube::api::ObjectList;
+use kube::{Api, Client};
 use nix::net::if_::if_nameindex;
 use std::collections::HashMap;
+use std::fs;
+use std::hash::Hash;
+use std::path::PathBuf;
 use std::result::Result::Ok;
 use std::sync::Mutex;
+use std::time::Duration;
 use std::{
     borrow::BorrowMut,
     net::Ipv4Addr,
@@ -20,8 +29,9 @@ use std::{
         atomic::{AtomicBool, Ordering},
     },
 };
-use tracing::{error, info, warn};
-use cortexbrain_common::constants;
+use tokio::time;
+use tracing::{debug, error, info, warn};
+use tracing_subscriber::fmt::format;
 
 /*
 * TryFrom Trait implementation for IpProtocols enum
@@ -70,7 +80,10 @@ pub async fn display_events<T: BorrowMut<MapData>>(
                     );
                 }
                 Err(_) => {
-                    info!("Event Id: {} Protocol: Unknown ({})", event_id, pl.proto)
+                    info!(
+                        "Event Id: {} Protocol: Unknown ({})",
+                        event_id, pl.proto
+                    );
                 }
             };
         } else {
@@ -89,7 +102,7 @@
 
 pub fn reverse_be_addr(addr: u32) -> Ipv4Addr {
     let mut octects = addr.to_be_bytes();
-    let [a,b,c,d] = [octects[3], octects[2], octects[1], octects[0]];
+    let [a, b, c, d] = [octects[3], octects[2], octects[1], octects[0]];
     let reversed_ip = Ipv4Addr::new(a, b, c, d);
     reversed_ip
 }
@@ -148,7 +161,7 @@ pub async fn display_veth_events<T: BorrowMut<MapData>>(
         .await
     {
         std::result::Result::Ok(_) => {
-            info!("Attach/Detach veth function attached correctly")
+            info!("Attach/Detach veth function attached correctly");
         }
         Err(e) => error!(
             "Error attaching Attach/Detach function. Error : {}",
@@ -234,7 +247,7 @@ async fn attach_detach_veth(
             }
         }
         2 => {
-            // INFO: Detaching occurs automatically when veth is deleted by kernel itsel
+            // INFO: Detaching occurs automatically when veth is deleted by kernel itself
             let mut link_ids = link_ids.lock().unwrap();
             match link_ids.remove(iface) {
                 Some(_) => {
@@ -252,3 +265,313 @@
         }
     }
     Ok(())
 }
+
+// CHECK THIS DIR: /sys/fs/cgroup/kubelet.slice/kubelet-kubepods.slice/kubelet-kubepods-besteffort.slice
+/* helper function to display events from the TcpPacketRegistry structure */
+pub async fn display_tcp_registry_events<T: BorrowMut<MapData>>(
+    mut perf_buffers: Vec<PerfEventArrayBuffer<T>>,
+    running: Arc<AtomicBool>,
+    mut buffers: Vec<BytesMut>,
+) {
+    while running.load(Ordering::SeqCst) {
+        for buf in perf_buffers.iter_mut() {
+            match buf.read_events(&mut buffers) {
+                std::result::Result::Ok(events) => {
+                    for i in 0..events.read {
+                        let data = &buffers[i];
+                        if data.len() >= std::mem::size_of::<TcpPacketRegistry>() {
+                            let tcp_pl: TcpPacketRegistry =
+                                unsafe { std::ptr::read(data.as_ptr() as *const _) };
+                            let src = reverse_be_addr(tcp_pl.src_ip);
+                            let dst = reverse_be_addr(tcp_pl.dst_ip);
+                            // the probe already converted the ports to host order,
+                            // so no second byte swap is needed here
+                            let src_port = tcp_pl.src_port;
+                            let dst_port = tcp_pl.dst_port;
+                            let event_id = tcp_pl.pid;
+                            let command = tcp_pl.command.to_vec();
+                            let end = command
+                                .iter()
+                                .position(|&x| x == 0)
+                                .unwrap_or(command.len());
+                            let command_str = String::from_utf8_lossy(&command[..end]).to_string();
+                            let cgroup_id = tcp_pl.cgroup_id;
+
+                            // construct the parent path
+                            //let proc_path = PathBuf::from("/proc")
+                            //    .join(event_id.to_string())
+                            //    .join("cgroup");
+
+                            //let proc_content = fs::read_to_string(&proc_path);
+                            //match proc_content {
+                            //    Ok(proc_content) => {
+                            match IpProtocols::try_from(tcp_pl.proto) {
+                                std::result::Result::Ok(proto) => {
+                                    info!(
+                                        "Event Id: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{} Command: {} Cgroup_id: {}",
+                                        event_id,
+                                        proto,
+                                        src,
+                                        src_port,
+                                        dst,
+                                        dst_port,
+                                        command_str,
+                                        cgroup_id //proc_content
+                                    );
+                                }
+                                Err(_) => {
+                                    info!(
+                                        "Event Id: {} Protocol: Unknown ({})",
+                                        event_id, tcp_pl.proto
+                                    );
+                                }
+                            };
+                            //}
+                            //Err(e) =>
+                            //    eprintln!(
+                            //        "An error occurred while accessing the content from the {:?} path: {}",
+                            //        &proc_path,
+                            //        e
+                            //    ),
+                        } else {
+                            warn!("Received packet data too small: {} bytes", data.len());
+                        }
+                    }
+                }
+                Err(e) => {
+                    error!("Error reading events: {:?}", e);
+                }
+            }
+        }
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+    }
+}
+
+pub async fn scan_cgroup_paths(path: String) -> Result<Vec<String>, Error> {
+    let mut cgroup_paths: Vec<String> = Vec::new();
+    let default_path = "/sys/fs/cgroup/kubepods.slice".to_string();
+
+    let target_path = if fs::metadata(&path).is_err() {
+        error!("Path {} is not accessible. Using default path: {}", &path, &default_path);
+        default_path
+    } else {
+        path
+    };
+    let entries = match fs::read_dir(&target_path) {
+        Ok(entries) => entries,
+        Err(e) => {
+            error!("Error reading cgroup directory {:?}: {}", &target_path, e);
+            return Ok(cgroup_paths);
+        }
+    };
+    for entry in entries {
+        if let Ok(entry) = entry {
+            let path = entry.path();
+            if path.is_dir() {
+                if let Some(path_str) = path.to_str() {
+                    cgroup_paths.push(path_str.to_string());
+                }
+            }
+        }
+    }
+
+    Ok(cgroup_paths)
+}
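The events carry `bpf_get_current_cgroup_id()` while `scan_cgroup_paths` yields directory paths; on cgroup v2 the id is the cgroup directory's inode number, so the two sides can be joined. A minimal sketch of that bridge (helper name hypothetical, not part of this patch):

```rust
use std::os::unix::fs::MetadataExt;

// On cgroup v2, bpf_get_current_cgroup_id() returns the cgroup's kernfs
// inode number, which matches st_ino of the directory under /sys/fs/cgroup,
// so a scanned path can be tied back to a TcpPacketRegistry event.
fn cgroup_id_matches(path: &str, cgroup_id: u64) -> bool {
    std::fs::metadata(path)
        .map(|meta| meta.ino() == cgroup_id)
        .unwrap_or(false)
}
```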
+
+pub async fn scan_cgroup_cronjob(time_delta: u64) -> Result<(), Error> {
+    let interval = std::time::Duration::from_secs(time_delta);
+    loop {
+        let scanned_paths = scan_cgroup_paths("/sys/fs/cgroup/kubelet.slice".to_string())
+            .await
+            .expect("An error occurred during the cgroup scan");
+        // --> this should return:
+        //     /sys/fs/cgroup/kubelet.slice/kubelet-kubepods.slice
+        //     /sys/fs/cgroup/kubelet.slice/kubelet.service
+        let mut scanned_subpaths = Vec::<String>::new();
+        for path in scanned_paths {
+            //info!("Scanned cgroup path: {}", path);
+            // scan the subgroups
+            let subpaths = scan_cgroup_paths(path.to_string()).await;
+            match subpaths {
+                Ok(paths) => {
+                    for subpath in paths {
+                        scanned_subpaths.push(subpath);
+                    }
+                    // ---> this should return the cgroup files and also:
+                    //      kubelet-kubepods-burstable.slice
+                    //      kubelet-kubepods-besteffort.slice
+
+                    // these directories need to be scanned again to get further information about the pods,
+                    // for example:
+                    //      kubelet-kubepods-besteffort-pod088f8704_24f0_4636_a8e2_13f75646f370.slice
+                    // where pod088f8704_24f0_4636_a8e2_13f75646f370 carries the pod UID
+                }
+                Err(e) => {
+                    error!("An error occurred during the cgroup subpath scan: {}", e);
+                    continue;
+                }
+            }
+        }
+
+        let mut scanned_subpaths_v2 = Vec::<String>::new();
+        // second cgroup scan level to get the pod UIDs
+        for scanned_subpath in &scanned_subpaths {
+            let subpaths_v2 = scan_cgroup_paths(scanned_subpath.to_string()).await;
+            match subpaths_v2 {
+                Ok(paths) => {
+                    for sub2 in paths {
+                        scanned_subpaths_v2.push(sub2);
+                        // this now contains entries like:
+                        //      kubelet-kubepods-besteffort-pod088f8704_24f0_4636_a8e2_13f75646f370.slice
+                    }
+                }
+                Err(e) => {
+                    error!("An error occurred during the cgroup subpath v2 scan: {}", e);
+                    continue;
+                }
+            }
+        }
+
+        // read the subpaths
+        let mut uids = Vec::<String>::new();
+        for subpath in scanned_subpaths_v2 {
+            let uid = extract_pod_uid(subpath.clone())
+                .expect("An error occurred during the extraction of pod UIDs");
+            debug!("Debugging extracted UID: {:?}", &uid);
+            uids.push(uid);
+        }
+        // get pod information from the UIDs and store it in a HashMap for O(1) access
+        let service_map = get_pod_info().await?;
+
+        for uid in uids {
+            if let Some(name) = service_map.get(&uid) {
+                info!("UID (from cgroup scan): {} name (from K8s): {}", &uid, name);
+            }
+        }
+
+        info!(
+            "Cronjob completed a cgroup scan cycle. Next scan will be in {} seconds",
+            time_delta
+        );
+        time::sleep(interval).await;
+    }
+}
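The three nested scan levels in `scan_cgroup_cronjob` mirror the systemd slice hierarchy one directory at a time. The same result can come from a single recursive walk, which also survives hierarchies that are one level deeper or shallower — a std-only sketch (function name hypothetical):

```rust
// Sketch of an equivalent single pass: walk the hierarchy once instead of
// three nested scan_cgroup_paths levels, keeping every "*-pod<UID>.slice"
// directory regardless of nesting depth.
fn collect_pod_slices(root: &std::path::Path, out: &mut Vec<String>) {
    if let Ok(entries) = std::fs::read_dir(root) {
        for entry in entries.flatten() {
            let path = entry.path();
            if path.is_dir() {
                if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
                    if name.contains("-pod") && name.ends_with(".slice") {
                        out.push(path.display().to_string());
                    }
                }
                collect_pod_slices(&path, out);
            }
        }
    }
}
```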
+
+fn extract_pod_uid(cgroup_path: String) -> Result<String, Error> {
+    // example of cgroup path:
+    // /sys/fs/cgroup/kubelet.slice/kubelet-kubepods.slice/kubelet-kubepods-besteffort.slice/kubelet-kubepods-besteffort-pod93580201_87d5_44e6_9779_f6153ca17637.slice
+    // or
+    // /sys/fs/cgroup/kubelet.slice/kubelet-kubepods.slice/kubelet-kubepods-burstable.slice/kubelet-kubepods-burstable-poddd3a1c6b_af40_41b1_8e1c_9e31fe8d96cb.slice
+
+    // split the path by "/"
+    let splits: Vec<&str> = cgroup_path.split("/").collect();
+    debug!("Debugging splits: {:?}", &splits);
+
+    let index = extract_target_from_splits(splits.clone())?;
+
+    let pod_split = splits[index]
+        .trim_start_matches("kubelet-kubepods-besteffort-")
+        .trim_start_matches("kubelet-kubepods-burstable-")
+        .trim_start_matches("kubepods-besteffort-")
+        .trim_start_matches("kubepods-burstable-");
+
+    let uid_ = pod_split
+        .trim_start_matches("pod")
+        .trim_end_matches(".slice"); // returns the uid with underscores (e.g. dd3a1c6b_af40_41b1_8e1c_9e31fe8d96cb)
+
+    let uid = uid_.replace("_", "-");
+    Ok(uid)
+}
+
+fn extract_target_from_splits(splits: Vec<&str>) -> Result<usize, Error> {
+    for (index, split) in splits.iter().enumerate() {
+        // find the path segment that contains "-pod"
+        if split.contains("-pod") {
+            debug!("Target index: {}", index);
+            return Ok(index);
+        }
+    }
+    Err(Error::msg("'-pod' not found in any path segment"))
+}
+
+/* unfortunately you cannot query pods by UID directly through ListParams */
+async fn query_all_pods() -> Result<ObjectList<Pod>, Error> {
+    let client = Client::try_default()
+        .await
+        .expect("Cannot connect to the Kubernetes client");
+    let pods: Api<Pod> = Api::all(client);
+    let lp = kube::api::ListParams::default(); // default list params
+    let pod_list = pods
+        .list(&lp)
+        .await
+        .expect("An error occurred during the pod list extraction");
+
+    Ok(pod_list)
+}
+
+// build a fast UID -> name cache so events can be resolved in O(1)
+async fn get_pod_info() -> Result<HashMap<String, String>, Error> {
+    let all_pods = query_all_pods().await?;
+
+    let mut service_map = HashMap::<String, String>::new();
+
+    for pod in all_pods {
+        if let (Some(name), Some(uid)) = (pod.metadata.name, pod.metadata.uid) {
+            service_map.insert(uid, name);
+        }
+    }
+
+    Ok(service_map)
+}
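`query_all_pods` lists every pod because the Kubernetes API cannot filter on UID: `metadata.uid` is not a supported field selector for pods, so a narrowed query is rejected server-side. A sketch of the call that does not work (UID illustrative), which is why `get_pod_info` builds the UID index client-side instead:

```rust
use kube::api::ListParams;

// Rejected by the API server: "metadata.uid" is not a supported field
// selector for pods (only fields like metadata.name and metadata.namespace
// are), hence the list-everything-and-index-by-UID approach above.
fn rejected_uid_query() -> ListParams {
    ListParams::default().fields("metadata.uid=231bd2d7-0f09-4781-a4e1-e4ea026342dd")
}
```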
"/sys/fs/cgroup/kubelet.slice/kubelet-kubepods.slice/kubelet-kubepods-besteffort.slice/kubelet-kubepods-besteffort-pod231bd2d7_0f09_4781_a4e1_e4ea026342dd.slice".to_string()]; + + let mut index_vec = Vec::::new(); + for cgroup_path in cgroup_paths { + let mut splits: Vec<&str> = cgroup_path.split("/").collect(); + + let target_index = extract_target_from_splits(splits).unwrap(); + index_vec.push(target_index); + } + let index_check = vec![6, 7]; + assert_eq!(index_vec, index_check); + } +} diff --git a/core/src/components/identity/src/main.rs b/core/src/components/identity/src/main.rs index 9ee00e9..1e7ca94 100644 --- a/core/src/components/identity/src/main.rs +++ b/core/src/components/identity/src/main.rs @@ -12,26 +12,39 @@ mod enums; mod helpers; -mod structs; mod map_handlers; +mod structs; use aya::{ - Bpf, - maps::{ Map, MapData, perf::{ PerfEventArray, PerfEventArrayBuffer } }, - programs::{ KProbe, SchedClassifier, TcAttachType, tc::SchedClassifierLinkId }, + Ebpf, + maps::{ + Map, MapData, + perf::{PerfEventArray, PerfEventArrayBuffer}, + }, + programs::{KProbe, SchedClassifier, TcAttachType, tc::SchedClassifierLinkId}, util::online_cpus, }; -use crate::helpers::{ display_events, display_veth_events, get_veth_channels }; -use crate::map_handlers::{ init_bpf_maps, map_pinner,populate_blocklist }; +use crate::helpers::{ + display_events, display_tcp_registry_events, display_veth_events, get_veth_channels, + scan_cgroup_cronjob, +}; +use crate::map_handlers::{init_bpf_maps, map_pinner, populate_blocklist}; use bytes::BytesMut; -use std::{ convert::TryInto, path::Path, sync::{ Arc, Mutex, atomic::{ AtomicBool, Ordering } } }; +use std::{ + convert::TryInto, + path::Path, + sync::{ + Arc, Mutex, + atomic::{AtomicBool, Ordering}, + }, +}; -use anyhow::{ Context, Ok }; -use tokio::{ fs, signal }; -use tracing::{ error, info }; +use anyhow::{Context, Ok}; use cortexbrain_common::{constants, logger}; +use tokio::{fs, signal}; +use tracing::{error, info}; use std::collections::HashMap; @@ -47,13 +60,15 @@ async fn main() -> Result<(), anyhow::Error> { let link_ids = Arc::new(Mutex::new(HashMap::::new())); //init conntracker data path - let bpf_path = std::env::var(constants::BPF_PATH).context("BPF_PATH environment variable required")?; - let data = fs::read(Path::new(&bpf_path)).await.context("failed to load file from path")?; + let bpf_path = + std::env::var(constants::BPF_PATH).context("BPF_PATH environment variable required")?; + let data = fs::read(Path::new(&bpf_path)) + .await + .context("failed to load file from path")?; //init bpf data - let bpf = Arc::new(Mutex::new(Bpf::load(&data)?)); - let bpf_map_save_path = std::env - ::var(constants::PIN_MAP_PATH) + let bpf = Arc::new(Mutex::new(Ebpf::load(&data)?)); + let bpf_map_save_path = std::env::var(constants::PIN_MAP_PATH) .context("PIN_MAP_PATH environment variable required")?; match init_bpf_maps(bpf.clone()) { @@ -61,7 +76,7 @@ async fn main() -> Result<(), anyhow::Error> { info!("Successfully loaded bpf maps"); let pin_path = std::path::PathBuf::from(&bpf_map_save_path); info!("About to call map_pinner with path: {:?}", pin_path); - match map_pinner(&bpf_maps, &pin_path).await { + match map_pinner(&bpf_maps, &pin_path) { std::result::Result::Ok(_) => { info!("maps pinned successfully"); //load veth_trace program ref veth_trace.rs @@ -76,22 +91,24 @@ async fn main() -> Result<(), anyhow::Error> { { populate_blocklist(&mut bpf_maps.2).await; } - + { - init_tc_classifier(bpf.clone(), interfaces, link_ids.clone()) - .await - .context( 
- "An error occured during the execution of attach_bpf_program function", - )?; + init_tc_classifier(bpf.clone(), interfaces, link_ids.clone()).await.context( + "An error occured during the execution of attach_bpf_program function" + )?; + } + { + init_tcp_registry(bpf.clone()).await.context( + "An error occured during the execution of init_tcp_registry function", + )?; } - event_listener(bpf_maps, link_ids.clone(), bpf.clone()).await.context( - "Error initializing event_listener" - )?; + event_listener(bpf_maps, link_ids.clone(), bpf.clone()) + .await + .context("Error initializing event_listener")?; } Err(e) => { error!("Error while pinning bpf_maps: {}", e); - signal::ctrl_c(); } } } @@ -106,9 +123,9 @@ async fn main() -> Result<(), anyhow::Error> { //attach the tc classifier program to a vector of interfaces async fn init_tc_classifier( - bpf: Arc>, + bpf: Arc>, ifaces: Vec, - link_ids: Arc>> + link_ids: Arc>>, ) -> Result<(), anyhow::Error> { //this funtion initialize the tc classifier program info!("Loading programs"); @@ -121,23 +138,31 @@ async fn init_tc_classifier( .try_into() .context("Failed to init SchedClassifier program")?; - program.load().context("Failed to load identity_classifier program")?; + program + .load() + .context("Failed to load identity_classifier program")?; for interface in ifaces { match program.attach(&interface, TcAttachType::Ingress) { std::result::Result::Ok(link_id) => { - info!("Program 'identity_classifier' attached to interface {}", interface); + info!( + "Program 'identity_classifier' attached to interface {}", + interface + ); let mut map = link_ids.lock().unwrap(); map.insert(interface.clone(), link_id); } - Err(e) => error!("Error attaching program to interface {}: {:?}", interface, e), + Err(e) => error!( + "Error attaching program to interface {}: {:?}", + interface, e + ), } } Ok(()) } -async fn init_veth_tracer(bpf: Arc>) -> Result<(), anyhow::Error> { +async fn init_veth_tracer(bpf: Arc>) -> Result<(), anyhow::Error> { //this functions init the veth_tracer used to make the InterfacesRegistry let mut bpf_new = bpf.lock().unwrap(); @@ -159,7 +184,9 @@ async fn init_veth_tracer(bpf: Arc>) -> Result<(), anyhow::Error> { .program_mut("veth_deletion_trace") .ok_or_else(|| anyhow::anyhow!("program 'veth_deletion_trace' not found"))? .try_into()?; - veth_deletion_tracer.load().context("Failed to load deletetion_tracer program")?; + veth_deletion_tracer + .load() + .context("Failed to load deletetion_tracer program")?; match veth_deletion_tracer.attach("unregister_netdevice_queue", 0) { std::result::Result::Ok(_) => info!("veth_deletion_trace program attached successfully"), @@ -169,10 +196,48 @@ async fn init_veth_tracer(bpf: Arc>) -> Result<(), anyhow::Error> { Ok(()) } +async fn init_tcp_registry(bpf: Arc>) -> Result<(), anyhow::Error> { + let mut bpf_new = bpf.lock().unwrap(); + + // init tcp registry + let tcp_analyzer: &mut KProbe = bpf_new + .program_mut("tcp_message_tracer") + .ok_or_else(|| anyhow::anyhow!("program 'tcp_message_tracer' not found"))? + .try_into()?; + + tcp_analyzer + .load() + .context("Failed to load tcp_message_tracer")?; + + info!("initializing tcp tracing functions"); + + match tcp_analyzer.attach("tcp_v4_rcv", 0) { + std::result::Result::Ok(_) => { + info!("tcp_message_tracer attached successfully to the tcp_v4_rcv function ") + } + Err(e) => error!( + "Error attaching tcp_message_tracer to the tcp_v4_rcv function. 
Error: {:?}", + e + ), + } + + match tcp_analyzer.attach("tcp_v4_connect", 0) { + std::result::Result::Ok(_) => { + info!("tcp_message_tracer attached successfully to the tcp_v4_connect function ") + } + Err(e) => error!( + "Error attaching tcp_message_tracer to the tcp_v4_connect function. Error: {:?}", + e + ), + } + + Ok(()) +} + async fn event_listener( - bpf_maps: (Map, Map, Map), + bpf_maps: (Map, Map, Map, Map), link_ids: Arc>>, - bpf: Arc> + bpf: Arc>, ) -> Result<(), anyhow::Error> { // this function init the event listener. Listens for veth events (creation/deletion) and network events (pod to pod communications) /* Doc: @@ -194,11 +259,14 @@ async fn event_listener( // init PerfEventArrays let mut perf_veth_array: PerfEventArray = PerfEventArray::try_from(bpf_maps.1)?; let mut perf_net_events_array: PerfEventArray = PerfEventArray::try_from(bpf_maps.0)?; - /* let mut connections_perf_array = PerCpuHashMap::<&mut MapData,u8,ConnArray>::try_from(connections_map_raw)?; //change with lru hash map*/ - //init PerfEventArrays buffers + let mut tcp_registry_array: PerfEventArray = PerfEventArray::try_from(bpf_maps.3)?; + + // init PerfEventArrays buffers let mut perf_veth_buffer: Vec> = Vec::new(); let mut perf_net_events_buffer: Vec> = Vec::new(); - /* let mut connections_perf_buffers = Vec::new(); */ + let mut tcp_registry_buffer: Vec> = Vec::new(); + + // fill the input buffers for cpu_id in online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))? { let veth_buf: PerfEventArrayBuffer = perf_veth_array.open(cpu_id, None)?; @@ -208,34 +276,62 @@ async fn event_listener( let events_buf: PerfEventArrayBuffer = perf_net_events_array.open(cpu_id, None)?; perf_net_events_buffer.push(events_buf); } + for cpu_id in online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))? { + let tcp_registry_buf: PerfEventArrayBuffer = + tcp_registry_array.open(cpu_id, None)?; + tcp_registry_buffer.push(tcp_registry_buf); + } + info!("Listening for events..."); + // init runnings let veth_running = Arc::new(AtomicBool::new(true)); let net_events_running = Arc::new(AtomicBool::new(true)); + let tcp_registry_running = Arc::new(AtomicBool::new(true)); + // init output buffers let mut veth_buffers = vec![BytesMut::with_capacity(1024); 10]; let mut events_buffers = vec![BytesMut::with_capacity(1024); online_cpus().iter().len()]; - // let mut connections_buffers = vec![BytesMut::with_capacity(1024); 10]; + let mut tcp_buffers = vec![BytesMut::with_capacity(1024); online_cpus().iter().len()]; + // init running signals let veth_running_signal = veth_running.clone(); let net_events_running_signal = net_events_running.clone(); + let tcp_registry_running_signal = tcp_registry_running.clone(); + let veth_link_ids = link_ids.clone(); - //display_events(perf_buffers, running, buffers).await; let veth_events_displayer = tokio::spawn(async move { display_veth_events( bpf.clone(), perf_veth_buffer, veth_running, veth_buffers, - veth_link_ids - ).await; + veth_link_ids, + ) + .await; }); + + // IDEA: Maybe we don't need to display all this events let net_events_displayer = tokio::spawn(async move { display_events(perf_net_events_buffer, net_events_running, events_buffers).await; }); + let tcp_registry_events_displayer: tokio::task::JoinHandle<()> = tokio::spawn(async move { + display_tcp_registry_events(tcp_registry_buffer, tcp_registry_running, tcp_buffers).await; + }); + + let scan_cgroup_cronjob = tokio::spawn(async move { + let _ = scan_cgroup_cronjob(180).await; + }); + tokio::select! 
diff --git a/core/src/components/identity/src/map_handlers.rs b/core/src/components/identity/src/map_handlers.rs
index 6f2e818..a225a47 100644
--- a/core/src/components/identity/src/map_handlers.rs
+++ b/core/src/components/identity/src/map_handlers.rs
@@ -10,10 +10,10 @@ use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::sync::Mutex;
-use tokio::fs;
+use tracing::warn;
 use tracing::{error, info};
 
-pub fn init_bpf_maps(bpf: Arc<Mutex<Bpf>>) -> Result<(Map, Map, Map), anyhow::Error> {
+pub fn init_bpf_maps(bpf: Arc<Mutex<Ebpf>>) -> Result<(Map, Map, Map, Map), anyhow::Error> {
     // this function init the bpfs maps used in the main program
     /*
        index 0: events_map
@@ -34,35 +34,45 @@ pub fn init_bpf_maps(bpf: Arc<Mutex<Ebpf>>) -> Result<(Map, Map, Map, Map), anyhow::Error> {
         .take_map("Blocklist")
         .ok_or_else(|| anyhow::anyhow!("Blocklist map not found"))?;
 
-    //
+    let tcp_registry_map = bpf_new
+        .take_map("TcpPacketRegistry")
+        .ok_or_else(|| anyhow::anyhow!("TcpPacketRegistry map not found"))?;
 
-    Ok((events_map, veth_map, blocklist_map))
+    Ok((events_map, veth_map, blocklist_map, tcp_registry_map))
 }
 
 //TODO: save bpf maps path in the cli metadata
 //takes an array of bpf maps and pin them to persiste session data
 //TODO: change maps type with a Vec instead of (Map,Map). This method is only for fast development and it's not optimized
 //TODO: add bpf mounts during cli installation
-pub async fn map_pinner(maps: &(Map, Map, Map), path: &PathBuf) -> Result<(), Error> {
-    // check if the map exists
+pub fn map_pinner(maps: &(Map, Map, Map, Map), path: &PathBuf) -> Result<(), Error> {
     if !path.exists() {
         info!("Pin path {:?} does not exist. Creating it...", path);
-        fs::create_dir_all(&path).await?;
+        std::fs::create_dir_all(&path)?;
 
         #[cfg(unix)]
         {
             use std::os::unix::fs::PermissionsExt;
-            fs::set_permissions(&path, std::fs::Permissions::from_mode(0o755)).await?;
+            std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o755))?;
         }
     }
 
-    let map1_path = path.join("events_map");
-    let map2_path = path.join("veth_map");
-    let map3_path = path.join("blocklist_map");
+    let configs = [
+        (&maps.0, "events_map"),
+        (&maps.1, "veth_map"),
+        (&maps.2, "blocklist_map"),
+        (&maps.3, "tcp_packet_registry"),
+    ];
 
-    // maps pinning
-    maps.0.pin(&map1_path)?;
-    maps.1.pin(&map2_path)?;
-    maps.2.pin(&map3_path)?;
+    for (map, name) in configs {
+        let map_path = path.join(name);
+        if map_path.exists() {
+            warn!("Pin path {} already exists", name);
+            warn!("Removing the stale pin {}", name);
+            let _ = std::fs::remove_file(&map_path);
+        }
+        info!("Trying to pin map {} at path: {:?}", name, &map_path);
+        map.pin(&map_path)?;
+    }
 
     Ok(())
 }
@@ -71,8 +81,7 @@ pub async fn populate_blocklist(map: &mut Map) -> Result<(), Error> {
     let namespace = "cortexflow";
     let configmap = "cortexbrain-client-config";
 
-    let mut blocklist_map =HashMap::<_, [u8; 4],[u8;4]>::try_from(map)?;
-
+    let mut blocklist_map = HashMap::<_, [u8; 4], [u8; 4]>::try_from(map)?;
     let api: Api<ConfigMap> = Api::namespaced(client, namespace);
 
     match api.get(configmap).await {
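Pinning with `map_pinner` is what lets a restarted daemon (or a second process) reach the same maps without reloading the BPF object. The consumer side would look roughly like this, assuming aya 0.13's `MapData::from_pin` and the pin directory used above (path illustrative):

```rust
use aya::maps::{Map, MapData, perf::PerfEventArray};

// Sketch: reopen the pinned TcpPacketRegistry map from another process. The
// path is illustrative and must match PIN_MAP_PATH + "tcp_packet_registry".
fn reopen_tcp_registry() -> Result<PerfEventArray<MapData>, anyhow::Error> {
    let data = MapData::from_pin("/sys/fs/bpf/tcp_packet_registry")?;
    let array = PerfEventArray::try_from(Map::PerfEventArray(data))?;
    Ok(array)
}
```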
Creating it...", path); - fs::create_dir_all(&path).await?; + std::fs::create_dir_all(&path)?; #[cfg(unix)] { use std::os::unix::fs::PermissionsExt; - fs::set_permissions(&path, std::fs::Permissions::from_mode(0o755)).await?; + std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o755))?; } } - let map1_path = path.join("events_map"); - let map2_path = path.join("veth_map"); - let map3_path = path.join("blocklist_map"); + let configs = [ + (&maps.0, "events_map"), + (&maps.1, "veth_map"), + (&maps.2, "blocklist_map"), + (&maps.3, "tcp_packet_registry"), + ]; - // maps pinning - maps.0.pin(&map1_path)?; - maps.1.pin(&map2_path)?; - maps.2.pin(&map3_path)?; + for (name, paths) in configs { + let map_path = path.join(paths); + if map_path.exists() { + warn!("Path {} already exists", paths); + warn!("Removing path {}", paths); + let _ = std::fs::remove_file(&map_path); + } + info!("Trying to pin map {:?} in map path: {:?}", name, &map_path); + name.pin(&map_path)?; + } Ok(()) } @@ -71,8 +81,7 @@ pub async fn populate_blocklist(map: &mut Map) -> Result<(), Error> { let namespace = "cortexflow"; let configmap = "cortexbrain-client-config"; - let mut blocklist_map =HashMap::<_, [u8; 4],[u8;4]>::try_from(map)?; - + let mut blocklist_map = HashMap::<_, [u8; 4], [u8; 4]>::try_from(map)?; let api: Api = Api::namespaced(client, namespace); match api.get(configmap).await { diff --git a/core/src/components/identity/src/structs.rs b/core/src/components/identity/src/structs.rs index ae9c6db..d8cff93 100644 --- a/core/src/components/identity/src/structs.rs +++ b/core/src/components/identity/src/structs.rs @@ -41,3 +41,16 @@ pub struct VethLog { pub netns: u32, pub pid: u32, } + +#[repr(C)] +#[derive(Clone, Copy)] +pub struct TcpPacketRegistry{ + pub proto: u8, + pub src_ip: u32, + pub dst_ip: u32, + pub src_port: u16, + pub dst_port: u16, + pub pid: u32, + pub command: [u8;16], + pub cgroup_id: u64, +} \ No newline at end of file diff --git a/core/src/testing/identity.yaml b/core/src/testing/identity.yaml index 968894b..675f60d 100644 --- a/core/src/testing/identity.yaml +++ b/core/src/testing/identity.yaml @@ -34,12 +34,12 @@ spec: mountPath: /sys/fs/bpf mountPropagation: Bidirectional readOnly: false - - name: proc - mountPath: /host/proc - readOnly: false - name: kernel-dev mountPath: /lib/modules readOnly: false + - name: cgroup + mountPath: /sys/fs/cgroup + readOnly: true securityContext: runAsUser: 0 privileged: true @@ -53,7 +53,7 @@ spec: - SYS_PTRACE containers: - name: identity - image: lorenzotettamanti/cortexflow-identity:0.1.1-beta.2 + image: lorenzotettamanti/cortexflow-identity:0.1.1-cgroup_scannerv_exp command: ["/bin/bash", "-c"] args: - | @@ -82,12 +82,12 @@ spec: mountPath: /sys/fs/bpf mountPropagation: Bidirectional readOnly: false - - name: proc - mountPath: /host/proc - readOnly: false - name: kernel-dev mountPath: /lib/modules readOnly: false + - name: cgroup + mountPath: /sys/fs/cgroup + readOnly: true securityContext: privileged: true allowPrivilegeEscalation: true @@ -106,12 +106,12 @@ spec: mountPath: /sys/fs/bpf mountPropagation: Bidirectional readOnly: false - - name: proc - mountPath: /host/proc - readOnly: false - name: kernel-dev mountPath: /lib/modules readOnly: false + - name: cgroup + mountPath: /sys/fs/cgroup + readOnly: true resources: limits: cpu: "1" @@ -134,11 +134,11 @@ spec: hostPath: path: /sys/fs/bpf type: Directory - - name: proc - hostPath: - path: /proc - type: Directory - name: kernel-dev hostPath: path: /lib/modules type: Directory + - name: 
diff --git a/core/src/testing/rolebinding.yaml b/core/src/testing/rolebinding.yaml
index 2c70dfb..b4dbf7b 100644
--- a/core/src/testing/rolebinding.yaml
+++ b/core/src/testing/rolebinding.yaml
@@ -21,3 +21,25 @@ roleRef:
   kind: Role
   name: coredns-configmap-access
   apiGroup: rbac.authorization.k8s.io
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: cf-api-pods-access
+rules:
+- apiGroups: [ "" ]
+  resources: [ "pods" ]
+  verbs: [ "get", "list", "watch" ]
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: cf-api-pods-access-binding
+subjects:
+- kind: ServiceAccount
+  name: default
+  namespace: cortexflow
+roleRef:
+  kind: ClusterRole
+  name: cf-api-pods-access
+  apiGroup: rbac.authorization.k8s.io