From e35033756b5866be659475931814e4df2f591cd9 Mon Sep 17 00:00:00 2001 From: Gagan Yarramsetty Date: Tue, 16 Jun 2026 09:53:59 +0530 Subject: [PATCH] fix(rbac): resolve RBAC deadlocks via sequential locking Eliminated nested `RwLock` acquisitions by introducing `AuthSnapShot` for read paths and sequential lock-dropping for user mutations. Server remains deadlock-free under `quest-parallel` concurrent loads. --- src/rbac/map.rs | 285 +++++++++++++++++++++++------------------------- src/rbac/mod.rs | 156 +++++++++++++++++--------- 2 files changed, 241 insertions(+), 200 deletions(-) diff --git a/src/rbac/map.rs b/src/rbac/map.rs index 652e78ed4..84520e1d9 100644 --- a/src/rbac/map.rs +++ b/src/rbac/map.rs @@ -193,6 +193,104 @@ pub enum SessionKey { pub type UserSessionMap = HashMap)>>; +pub struct AuthSnapShot { + pub username: String, + pub tenant_id: String, + pub permissions: Vec, +} +// Unlike `check_auth`, this evaluates a pre-cloned snapshot instead of directly querying the SESSIONS map. +// It runs without holding the SESSIONS read lock during the heavy permission matching; the matching itself still takes its own short-lived reads of `users()`, `read_user_groups()` and `roles()` via `aggregate_group_permissions`. +// This breaks the lock-order inversion deadlock by ensuring cross-map reads happen outside the SESSIONS lock scope. 
+pub fn check_auth_snapshot( + snapshot: AuthSnapShot, + required_action: Action, + context_resource: Option<&str>, + context_user: Option<&str>, +) -> Response { + let AuthSnapShot { + username, + tenant_id, + permissions, + } = snapshot; + + let mut perms: HashSet = HashSet::from_iter(permissions); + perms.extend(aggregate_group_permissions(&username, &tenant_id)); + + if perms.iter().any(|user_perm| match *user_perm { + Permission::Unit(action) => { + action == required_action || action == Action::All || action == Action::SuperAdmin + } + Permission::Resource(action, ref resource_type) => { + if let Some(resource_type) = resource_type.as_ref() { + match resource_type { + ParseableResourceType::Stream(resource_id) + | ParseableResourceType::Llm(resource_id) => { + let ok_resource = if let Some(context_resource_id) = context_resource { + let is_internal = PARSEABLE + .get_stream(context_resource_id, &Some(tenant_id.to_owned())) + .is_ok_and(|stream| { + stream + .get_stream_type() + .eq(&crate::storage::StreamType::Internal) + }); + resource_id == context_resource_id || resource_id == "*" || is_internal + } else { + true + }; + (action == required_action + || action == Action::All + || action == Action::SuperAdmin) + && ok_resource + } + ParseableResourceType::All => { + action == required_action + || action == Action::All + || action == Action::SuperAdmin + } + } + } else if resource_type.is_none() + && matches!( + action, + Action::Ingest + | Action::Query + | Action::ListStream + | Action::GetSchema + | Action::GetStats + | Action::GetRetention + | Action::PutRetention + | Action::GetLLM + | Action::QueryLLM + | Action::ListLLM + ) + { + let ok_resource = if let Some(context_resource_id) = context_resource { + let is_internal = PARSEABLE + .get_stream(context_resource_id, &Some(tenant_id.to_owned())) + .is_ok_and(|stream| { + stream + .get_stream_type() + .eq(&crate::storage::StreamType::Internal) + }); + !is_internal + } else { + true + }; + action == 
required_action && ok_resource + } else { + false + } + } + Permission::SelfUser if required_action == Action::GetUserRoles => { + context_user.map(|x| x == username).unwrap_or_default() + } + _ => false, + }) { + Response::Authorized + } else { + Response::UnAuthorized + } +} + #[derive(Debug, Default)] pub struct Sessions { // map session key to user, tenant, and their permission @@ -218,6 +316,16 @@ impl Sessions { self.user_sessions.remove(tenant_id); } + pub fn auth_snapshot(&self, key: &SessionKey) -> Option { + self.active_sessions + .get(key) + .map(|(username, tenant_id, perms)| AuthSnapShot { + username: username.clone(), + tenant_id: tenant_id.clone(), + permissions: perms.clone(), + }) + } + // only checks if the session is expired or not pub fn is_session_expired(&self, key: &SessionKey) -> bool { // fetch userid from session key @@ -328,120 +436,6 @@ impl Sessions { self.active_sessions.get(key).map(|(_, _, perms)| perms) } - // returns None if user is not in the map - // Otherwise returns Some(Response) where response is authorized/unauthorized - pub fn check_auth( - &self, - key: &SessionKey, - required_action: Action, - context_resource: Option<&str>, - context_user: Option<&str>, - ) -> Option { - self.active_sessions - .get(key) - .map(|(username, tenant_id, perms)| { - let mut perms: HashSet = HashSet::from_iter(perms.clone()); - perms.extend(aggregate_group_permissions(username, tenant_id)); - - if perms.iter().any(|user_perm| { - match *user_perm { - // if any action is ALL then we we authorize - Permission::Unit(action) => { - action == required_action - || action == Action::All - || action == Action::SuperAdmin - } - Permission::Resource(action, ref resource_type) => { - if let Some(resource_type) = resource_type.as_ref() { - // default flow for all actions other than global-ingestion (ingestion action without any dataset restriction) - match resource_type { - ParseableResourceType::Stream(resource_id) - | 
ParseableResourceType::Llm(resource_id) => { - let ok_resource = - if let Some(context_resource_id) = context_resource { - let is_internal = PARSEABLE - .get_stream( - context_resource_id, - &Some(tenant_id.to_owned()), - ) - .is_ok_and(|stream| { - stream.get_stream_type().eq( - &crate::storage::StreamType::Internal, - ) - }); - resource_id == context_resource_id - || resource_id == "*" - || is_internal - } else { - // if no resource to match then resource check is not needed - // WHEN IS THIS VALID?? - true - }; - (action == required_action - || action == Action::All - || action == Action::SuperAdmin) - && ok_resource - } - ParseableResourceType::All => { - action == required_action - || action == Action::All - || action == Action::SuperAdmin - } - } - } else if resource_type.is_none() - && matches!( - action, - Action::Ingest - | Action::Query - | Action::ListStream - | Action::GetSchema - | Action::GetStats - | Action::GetRetention - | Action::PutRetention - | Action::GetLLM - | Action::QueryLLM - | Action::ListLLM - ) - { - // flow for global-ingestion / global-query / global-reader / global-writer - let ok_resource = - if let Some(context_resource_id) = context_resource { - let is_internal = PARSEABLE - .get_stream( - context_resource_id, - &Some(tenant_id.to_owned()), - ) - .is_ok_and(|stream| { - stream - .get_stream_type() - .eq(&crate::storage::StreamType::Internal) - }); - !is_internal - } else { - // if no resource to match then resource check is not needed - // WHEN IS THIS VALID?? 
- true - }; - action == required_action && ok_resource - } else { - // the default flow (some resource_type and an action) was covered in the first if - // if the resource type is also None and action is not ingest then return with false - false - } - } - Permission::SelfUser if required_action == Action::GetUserRoles => { - context_user.map(|x| x == username).unwrap_or_default() - } - _ => false, - } - }) { - Response::Authorized - } else { - Response::UnAuthorized - } - }) - } - pub fn get_user_and_tenant_id(&self, key: &SessionKey) -> Option<(String, String)> { self.active_sessions .get(key) @@ -493,49 +487,42 @@ impl From> for Users { fn aggregate_group_permissions(username: &str, tenant_id: &String) -> HashSet { let mut group_perms = HashSet::new(); - - let user = if let Some(tenant_users) = users().get(tenant_id) - && let Some(user) = tenant_users.get(username) - && !user.protected - { - user.to_owned() - } else { - return group_perms; + let user_groups: HashSet = { + if let Some(tenant_users) = users().get(tenant_id) + && let Some(user) = tenant_users.get(username) + && !user.protected + { + user.user_groups.clone() + } else { + return group_perms; + } }; - // let Some(user) = users().get(username).cloned() else { - // return group_perms; - // }; - if user.user_groups.is_empty() { - return group_perms; - } + for group_name in &user_groups { + let group_roles: HashSet = { + if let Some(groups) = read_user_groups().get(tenant_id) + && let Some(group) = groups.get(group_name) + { + group.roles.clone() + } else { + continue; + } + }; - for group_name in &user.user_groups { - if let Some(groups) = read_user_groups().get(tenant_id) - && let Some(group) = groups.get(group_name) - { - for role_name in group.roles.iter() { - let roles = if let Some(tenant_roles) = roles().get(tenant_id) - && let Some(role) = tenant_roles.get(role_name) + for role_name in group_roles { + let role_privileges = { + if let Some(tenant_roles) = roles().get(tenant_id) + && let Some(role) = 
tenant_roles.get(&role_name) { - role.clone() + role.privileges().to_vec() } else { continue; - }; - // let Some(privileges) = roles().get(role_name).cloned() else { - // continue; - // }; - - for role in roles.privileges() { - group_perms.extend(RoleBuilder::from(role).build()); } + }; + for privs in role_privileges { + group_perms.extend(RoleBuilder::from(&privs).build()); } - } else { - continue; - }; - // let Some(group) = read_user_groups().get(group_name).cloned() else { - // continue; - // }; + } } group_perms diff --git a/src/rbac/mod.rs b/src/rbac/mod.rs index 1d952bbab..787d5755c 100644 --- a/src/rbac/mod.rs +++ b/src/rbac/mod.rs @@ -59,9 +59,14 @@ pub struct Users; impl Users { pub fn put_user(&self, user: User) { - let tenant_id = user.tenant.as_deref().unwrap_or(DEFAULT_TENANT); - mut_sessions().remove_user(user.userid(), tenant_id); - mut_users().insert(user); + let userid = user.userid().to_owned(); // 1. Clone userid BEFORE moving user + let tenant_id = user.tenant.clone().unwrap_or(DEFAULT_TENANT.to_owned()); + // Lock USERS, insert, and DROP IT + { + let mut users = mut_users(); + users.insert(user); + } + mut_sessions().remove_user(&userid, &tenant_id); } pub fn get_user_groups(&self, userid: &str, tenant_id: &Option) -> HashSet { @@ -136,7 +141,10 @@ impl Users { pub fn delete_user(&mut self, userid: &str, tenant_id: &Option) { let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - self.remove_user(userid, tenant_id); + // Lock USERS, remove, and DROP IT + { + self.remove_user(userid, tenant_id); + } mut_sessions().remove_user(userid, tenant_id); } @@ -149,38 +157,59 @@ impl Users { // caller ensures that this operation is valid for the user pub fn change_password_hash(&self, userid: &str, hash: &String, tenant_id: &Option) { let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - if let Some(users) = mut_users().get_mut(tenant_id) - && let Some(User { - ty: UserType::Native(user), - .. 
- }) = users.get_mut(userid) - { - user.password_hash.clone_from(hash); - mut_sessions().remove_user(userid, tenant_id); + let password_updated = { + if let Some(users) = mut_users().get_mut(tenant_id) + && let Some(User { + ty: UserType::Native(user), + .. + }) = users.get_mut(userid) + { + user.password_hash.clone_from(hash); + true + } else { + false + } }; + if password_updated { + mut_sessions().remove_user(userid, tenant_id); + } } pub fn add_roles(&self, userid: &str, roles: HashSet, tenant_id: &Option) { let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - if let Some(users) = mut_users().get_mut(tenant_id) - && let Some(user) = users.get_mut(userid) - { - user.roles.extend(roles); - let new_perms = roles_to_permission(user.roles(), tenant_id); - mut_sessions().refresh_user_permissions(userid, tenant_id, &new_perms); + let new_roles = { + if let Some(users) = mut_users().get_mut(tenant_id) + && let Some(user) = users.get_mut(userid) + { + user.roles.extend(roles); + Some(user.roles()) + } else { + None + } }; + if let Some(new_roles) = new_roles { + let new_perms = roles_to_permission(new_roles, tenant_id); + mut_sessions().refresh_user_permissions(userid, tenant_id, &new_perms); + } } pub fn remove_roles(&self, userid: &str, roles: HashSet, tenant_id: &Option) { let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - if let Some(users) = mut_users().get_mut(tenant_id) - && let Some(user) = users.get_mut(userid) - { - let diff = HashSet::from_iter(user.roles.difference(&roles).cloned()); - user.roles = diff; - let new_perms = roles_to_permission(user.roles(), tenant_id); - mut_sessions().refresh_user_permissions(userid, tenant_id, &new_perms); + let new_roles = { + if let Some(users) = mut_users().get_mut(tenant_id) + && let Some(user) = users.get_mut(userid) + { + let diff = HashSet::from_iter(user.roles.difference(&roles).cloned()); + user.roles = diff; + Some(user.roles()) + } else { + None + } }; + if let Some(new_roles) = 
new_roles { + let new_perms = roles_to_permission(new_roles, tenant_id); + mut_sessions().refresh_user_permissions(userid, tenant_id, &new_perms); + } } pub fn contains(&self, userid: &str, tenant_id: &Option) -> bool { @@ -200,18 +229,27 @@ impl Users { let user_groups = self.get_user_groups(&userid, &Some(tenant_id.clone())); for group in user_groups { - if let Some(groups) = read_user_groups().get(&tenant_id) - && let Some(group) = groups.get(&group) - { - let group_roles = &group.roles; - for role in group_roles { - if let Some(roles) = roles().get(&tenant_id) - && let Some(privilege_list) = roles.get(role) + let group_roles: HashSet = { + if let Some(groups) = read_user_groups().get(&tenant_id) + && let Some(group) = groups.get(&group) + { + group.roles.clone() + } else { + continue; + } + }; + for role_name in group_roles { + let role_privileges: Vec<_> = { + if let Some(tenant_roles) = roles().get(&tenant_id) + && let Some(role) = tenant_roles.get(&role_name) { - for privelege in privilege_list.privileges() { - permissions.extend(RoleBuilder::from(privelege).build()); - } + role.privileges().to_vec() + } else { + continue; } + }; + for privs in role_privileges { + permissions.extend(RoleBuilder::from(&privs).build()); } } } @@ -230,11 +268,13 @@ impl Users { let tenant_id = &user.tenant; let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + let perms = roles_to_permission(user.roles(), tenant); + mut_sessions().track_new( user.userid().to_owned(), session, Utc::now() + expires_in, - roles_to_permission(user.roles(), tenant), + perms, tenant_id, ); } @@ -246,10 +286,16 @@ impl Users { context_stream: Option<&str>, context_user: Option<&str>, ) -> Response { - // try fetch from auth map for faster auth flow - if let Some(res) = sessions().check_auth(&key, action, context_stream, context_user) { - return res; + // Get snap shot and drop session read lock + let snapshot = { + let sess = sessions(); + sess.auth_snapshot(&key) + }; + // Evaluate 
permissions without holding the SESSIONS read lock + if let Some(snapshot) = snapshot { + return map::check_auth_snapshot(snapshot, action, context_stream, context_user); } + // attempt reloading permissions into new session for basic auth user // id user will be reloaded only through login endpoint let SessionKey::BasicAuth { username, password } = &key else { @@ -273,16 +319,25 @@ impl Users { // if user exists and password matches // add this user to auth map if basic_user.verify_password(password) { - let mut sessions = mut_sessions(); - sessions.track_new( - username.clone(), - key.clone(), - DateTime::::MAX_UTC, - roles_to_permission(user.roles(), tenant), - &user.tenant, - ); - return sessions - .check_auth(&key, action, context_stream, context_user) + let perms = roles_to_permission(user.roles(), tenant); + { + let mut sessions = mut_sessions(); + sessions.track_new( + username.clone(), + key.clone(), + DateTime::::MAX_UTC, + perms, + &user.tenant, + ); + } + let snapshot = { + let sess = sessions(); + sess.auth_snapshot(&key) + }; + return snapshot + .map(|snap| { + map::check_auth_snapshot(snap, action, context_stream, context_user) + }) .expect("entry for this key just added"); } } @@ -378,8 +433,7 @@ pub struct UsersPrism { pub fn roles_to_permission(roles: Vec, tenant_id: &str) -> Vec { let mut perms = HashSet::new(); for role in &roles { - let role_map = &map::roles(); - if let Some(roles) = role_map.get(tenant_id) + if let Some(roles) = map::roles().get(tenant_id) && let Some(privilege_list) = roles.get(role) { for privs in privilege_list.privileges() {