Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
285 changes: 136 additions & 149 deletions src/rbac/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,104 @@ pub enum SessionKey {

pub type UserSessionMap = HashMap<String, Vec<(SessionKey, DateTime<Utc>)>>;

pub struct AuthSnapShot {
pub username: String,
pub tenant_id: String,
pub permissions: Vec<Permission>,
}
// Unlike `check_auth`, this evaluates a pre-cloned snapshot instead of directly querying the SESSIONS map.
// It executes completely lock-free, avoiding holding the SESSIONS read lock during the heavy permission matching.
// This breaks the lock-order inversion deadlock by ensuring cross-map reads happen outside the SESSIONS lock scope.
pub fn check_auth_snapshot(
snapshot: AuthSnapShot,
required_action: Action,
context_resource: Option<&str>,
context_user: Option<&str>,
) -> Response {
let AuthSnapShot {
username,
tenant_id,
permissions,
} = snapshot;

let mut perms: HashSet<Permission> = HashSet::from_iter(permissions);
perms.extend(aggregate_group_permissions(&username, &tenant_id));

if perms.iter().any(|user_perm| match *user_perm {
Permission::Unit(action) => {
action == required_action || action == Action::All || action == Action::SuperAdmin
}
Permission::Resource(action, ref resource_type) => {
if let Some(resource_type) = resource_type.as_ref() {
match resource_type {
ParseableResourceType::Stream(resource_id)
| ParseableResourceType::Llm(resource_id) => {
let ok_resource = if let Some(context_resource_id) = context_resource {
let is_internal = PARSEABLE
.get_stream(context_resource_id, &Some(tenant_id.to_owned()))
.is_ok_and(|stream| {
stream
.get_stream_type()
.eq(&crate::storage::StreamType::Internal)
});
resource_id == context_resource_id || resource_id == "*" || is_internal
} else {
true
};
(action == required_action
|| action == Action::All
|| action == Action::SuperAdmin)
&& ok_resource
}
ParseableResourceType::All => {
action == required_action
|| action == Action::All
|| action == Action::SuperAdmin
}
}
} else if resource_type.is_none()
&& matches!(
action,
Action::Ingest
| Action::Query
| Action::ListStream
| Action::GetSchema
| Action::GetStats
| Action::GetRetention
| Action::PutRetention
| Action::GetLLM
| Action::QueryLLM
| Action::ListLLM
)
{
let ok_resource = if let Some(context_resource_id) = context_resource {
let is_internal = PARSEABLE
.get_stream(context_resource_id, &Some(tenant_id.to_owned()))
.is_ok_and(|stream| {
stream
.get_stream_type()
.eq(&crate::storage::StreamType::Internal)
});
!is_internal
} else {
true
};
action == required_action && ok_resource
} else {
false
}
}
Permission::SelfUser if required_action == Action::GetUserRoles => {
context_user.map(|x| x == username).unwrap_or_default()
}
_ => false,
}) {
Response::Authorized
} else {
Response::UnAuthorized
}
}

#[derive(Debug, Default)]
pub struct Sessions {
// map session key to user, tenant, and their permission
Expand All @@ -218,6 +316,16 @@ impl Sessions {
self.user_sessions.remove(tenant_id);
}

pub fn auth_snapshot(&self, key: &SessionKey) -> Option<AuthSnapShot> {
self.active_sessions
.get(key)
.map(|(username, tenant_id, perms)| AuthSnapShot {
username: username.clone(),
tenant_id: tenant_id.clone(),
permissions: perms.clone(),
})
}

// only checks if the session is expired or not
pub fn is_session_expired(&self, key: &SessionKey) -> bool {
// fetch userid from session key
Expand Down Expand Up @@ -328,120 +436,6 @@ impl Sessions {
self.active_sessions.get(key).map(|(_, _, perms)| perms)
}

// returns None if user is not in the map
// Otherwise returns Some(Response) where response is authorized/unauthorized
pub fn check_auth(
&self,
key: &SessionKey,
required_action: Action,
context_resource: Option<&str>,
context_user: Option<&str>,
) -> Option<Response> {
self.active_sessions
.get(key)
.map(|(username, tenant_id, perms)| {
let mut perms: HashSet<Permission> = HashSet::from_iter(perms.clone());
perms.extend(aggregate_group_permissions(username, tenant_id));

if perms.iter().any(|user_perm| {
match *user_perm {
// if any action is ALL then we we authorize
Permission::Unit(action) => {
action == required_action
|| action == Action::All
|| action == Action::SuperAdmin
}
Permission::Resource(action, ref resource_type) => {
if let Some(resource_type) = resource_type.as_ref() {
// default flow for all actions other than global-ingestion (ingestion action without any dataset restriction)
match resource_type {
ParseableResourceType::Stream(resource_id)
| ParseableResourceType::Llm(resource_id) => {
let ok_resource =
if let Some(context_resource_id) = context_resource {
let is_internal = PARSEABLE
.get_stream(
context_resource_id,
&Some(tenant_id.to_owned()),
)
.is_ok_and(|stream| {
stream.get_stream_type().eq(
&crate::storage::StreamType::Internal,
)
});
resource_id == context_resource_id
|| resource_id == "*"
|| is_internal
} else {
// if no resource to match then resource check is not needed
// WHEN IS THIS VALID??
true
};
(action == required_action
|| action == Action::All
|| action == Action::SuperAdmin)
&& ok_resource
}
ParseableResourceType::All => {
action == required_action
|| action == Action::All
|| action == Action::SuperAdmin
}
}
} else if resource_type.is_none()
&& matches!(
action,
Action::Ingest
| Action::Query
| Action::ListStream
| Action::GetSchema
| Action::GetStats
| Action::GetRetention
| Action::PutRetention
| Action::GetLLM
| Action::QueryLLM
| Action::ListLLM
)
{
// flow for global-ingestion / global-query / global-reader / global-writer
let ok_resource =
if let Some(context_resource_id) = context_resource {
let is_internal = PARSEABLE
.get_stream(
context_resource_id,
&Some(tenant_id.to_owned()),
)
.is_ok_and(|stream| {
stream
.get_stream_type()
.eq(&crate::storage::StreamType::Internal)
});
!is_internal
} else {
// if no resource to match then resource check is not needed
// WHEN IS THIS VALID??
true
};
action == required_action && ok_resource
} else {
// the default flow (some resource_type and an action) was covered in the first if
// if the resource type is also None and action is not ingest then return with false
false
}
}
Permission::SelfUser if required_action == Action::GetUserRoles => {
context_user.map(|x| x == username).unwrap_or_default()
}
_ => false,
}
}) {
Response::Authorized
} else {
Response::UnAuthorized
}
})
}

pub fn get_user_and_tenant_id(&self, key: &SessionKey) -> Option<(String, String)> {
self.active_sessions
.get(key)
Expand Down Expand Up @@ -493,49 +487,42 @@ impl From<Vec<User>> for Users {

fn aggregate_group_permissions(username: &str, tenant_id: &String) -> HashSet<Permission> {
let mut group_perms = HashSet::new();

let user = if let Some(tenant_users) = users().get(tenant_id)
&& let Some(user) = tenant_users.get(username)
&& !user.protected
{
user.to_owned()
} else {
return group_perms;
let user_groups: HashSet<String> = {
if let Some(tenant_users) = users().get(tenant_id)
&& let Some(user) = tenant_users.get(username)
&& !user.protected
{
user.user_groups.clone()
} else {
return group_perms;
}
};
// let Some(user) = users().get(username).cloned() else {
// return group_perms;
// };

if user.user_groups.is_empty() {
return group_perms;
}
for group_name in &user_groups {
let group_roles: HashSet<String> = {
if let Some(groups) = read_user_groups().get(tenant_id)
&& let Some(group) = groups.get(group_name)
{
group.roles.clone()
} else {
continue;
}
};

for group_name in &user.user_groups {
if let Some(groups) = read_user_groups().get(tenant_id)
&& let Some(group) = groups.get(group_name)
{
for role_name in group.roles.iter() {
let roles = if let Some(tenant_roles) = roles().get(tenant_id)
&& let Some(role) = tenant_roles.get(role_name)
for role_name in group_roles {
let role_privileges = {
if let Some(tenant_roles) = roles().get(tenant_id)
&& let Some(role) = tenant_roles.get(&role_name)
{
role.clone()
role.privileges().to_vec()
} else {
continue;
};
// let Some(privileges) = roles().get(role_name).cloned() else {
// continue;
// };

for role in roles.privileges() {
group_perms.extend(RoleBuilder::from(role).build());
}
};
for privs in role_privileges {
group_perms.extend(RoleBuilder::from(&privs).build());
}
} else {
continue;
};
// let Some(group) = read_user_groups().get(group_name).cloned() else {
// continue;
// };
}
}

group_perms
Expand Down
Loading
Loading