Source code for girder.api.v1.group

#!/usr/bin/env python
# -*- coding: utf-8 -*-

###############################################################################
#  Copyright 2013 Kitware Inc.
#
#  Licensed under the Apache License, Version 2.0 ( the "License" );
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
###############################################################################

from ..describe import Description, autoDescribeRoute
from ..rest import Resource, filtermodel
from girder.models.model_base import AccessException
from girder.constants import AccessType, SettingKey
from girder.utility import mail_utils
from girder.api import access


[docs]class Group(Resource): """API Endpoint for groups.""" def __init__(self): super(Group, self).__init__() self.resourceName = 'group' self.route('DELETE', (':id',), self.deleteGroup) self.route('DELETE', (':id', 'member'), self.removeFromGroup) self.route('DELETE', (':id', 'moderator'), self.demote) self.route('DELETE', (':id', 'admin'), self.demote) self.route('GET', (), self.find) self.route('GET', (':id',), self.getGroup) self.route('GET', (':id', 'access'), self.getGroupAccess) self.route('GET', (':id', 'invitation'), self.getGroupInvitations) self.route('GET', (':id', 'member'), self.listMembers) self.route('POST', (), self.createGroup) self.route('POST', (':id', 'invitation'), self.inviteToGroup) self.route('POST', (':id', 'member'), self.joinGroup) self.route('POST', (':id', 'moderator'), self.promoteToModerator) self.route('POST', (':id', 'admin'), self.promoteToAdmin) self.route('PUT', (':id',), self.updateGroup) @access.public @filtermodel(model='group') @autoDescribeRoute( Description('Search for groups or list all groups.') .param('text', 'Pass this to perform a full-text search for groups.', required=False) .param('exact', 'If true, only return exact name matches. This is ' 'case sensitive.', required=False, dataType='boolean', default=False) .pagingParams(defaultSort='name') .errorResponse() ) def find(self, text, exact, limit, offset, sort): user = self.getCurrentUser() if text is not None: if exact: groupList = self.model('group').find( {'name': text}, offset=offset, limit=limit, sort=sort) else: groupList = self.model('group').textSearch( text, user=user, offset=offset, limit=limit, sort=sort) else: groupList = self.model('group').list( user=user, offset=offset, limit=limit, sort=sort) return list(groupList) @access.user @filtermodel(model='group') @autoDescribeRoute( Description('Create a new group.') .responseClass('Group') .notes('Must be logged in.') .param('name', 'Unique name for the group.', strip=True) .param('description', 'Description of the group.', required=False, default='', strip=True) .param('public', 'Whether the group should be publicly visible.', required=False, dataType='boolean', default=False) .errorResponse() .errorResponse('Write access was denied on the parent', 403) ) def createGroup(self, name, description, public): return self.model('group').createGroup( name=name, creator=self.getCurrentUser(), description=description, public=public) @access.public @filtermodel(model='group') @autoDescribeRoute( Description('Get a group by ID.') .responseClass('Group') .modelParam('id', model='group', level=AccessType.READ) .errorResponse('ID was invalid.') .errorResponse('Read access was denied for the group.', 403) ) def getGroup(self, group): # Add in the current setting for adding to groups group['_addToGroupPolicy'] = self.model('setting').get(SettingKey.ADD_TO_GROUP_POLICY) return group @access.public @filtermodel(model='group', addFields={'access', 'requests'}) @autoDescribeRoute( Description('Get the access control list for a group.') .responseClass('Group') .modelParam('id', model='group', level=AccessType.READ) .errorResponse('ID was invalid.') .errorResponse('Read access was denied for the group.', 403) ) def getGroupAccess(self, group): groupModel = self.model('group') group['access'] = groupModel.getFullAccessList(group) group['requests'] = list(groupModel.getFullRequestList(group)) return group @access.public @filtermodel(model='user') @autoDescribeRoute( Description('Show outstanding invitations for a group.') .responseClass('Group') .modelParam('id', model='group', level=AccessType.READ) .pagingParams(defaultSort='lastName') .errorResponse() .errorResponse('Read access was denied for the group.', 403) ) def getGroupInvitations(self, group, limit, offset, sort): return list(self.model('group').getInvites(group, limit, offset, sort)) @access.user @filtermodel(model='group') @autoDescribeRoute( Description('Update a group by ID.') .modelParam('id', model='group', level=AccessType.WRITE) .param('name', 'The name to set on the group.', required=False, strip=True) .param('description', 'Description for the group.', required=False, strip=True) .param('public', 'Whether the group should be publicly visible', dataType='boolean', required=False) .param('addAllowed', 'Can admins or moderators directly add members ' 'to this group? Only system administrators are allowed to ' 'set this field', required=False, enum=['default', 'no', 'yesmod', 'yesadmin']) .errorResponse() .errorResponse('Write access was denied for the group.', 403) ) def updateGroup(self, group, name, description, public, addAllowed): if public is not None: self.model('group').setPublic(group, public) if name is not None: group['name'] = name if description is not None: group['description'] = description if addAllowed is not None: self.requireAdmin(self.getCurrentUser()) group['addAllowed'] = addAllowed return self.model('group').updateGroup(group) @access.user @filtermodel(model='group', addFields={'access', 'requests'}) @autoDescribeRoute( Description('Request to join a group, or accept an invitation to join.') .responseClass('Group') .modelParam('id', model='group', level=AccessType.READ) .errorResponse('ID was invalid.') .errorResponse('You were not invited to this group, or do not have ' 'read access to it.', 403) ) def joinGroup(self, group): groupModel = self.model('group') group = groupModel.joinGroup(group, self.getCurrentUser()) group['access'] = groupModel.getFullAccessList(group) group['requests'] = list(groupModel.getFullRequestList(group)) return group @access.public @filtermodel(model='user') @autoDescribeRoute( Description('List members of a group.') .modelParam('id', model='group', level=AccessType.READ) .pagingParams(defaultSort='lastName') .errorResponse('ID was invalid.') .errorResponse('Read access was denied for the group.', 403) ) def listMembers(self, group, limit, offset, sort): return list(self.model('group').listMembers(group, offset=offset, limit=limit, sort=sort)) @access.user @filtermodel(model='group', addFields={'access', 'requests'}) @autoDescribeRoute( Description("Invite a user to join a group, or accept a user's request to join.") .responseClass('Group') .notes('The "force" option to this endpoint is only available to ' 'administrators and can be used to bypass the invitation process' ' and instead add the user directly to the group.') .modelParam('id', model='group', level=AccessType.WRITE) .modelParam('userId', 'The ID of the user to invite or accept.', destName='userToInvite', level=AccessType.READ, paramType='form') .param('level', 'The access level the user will be given when they accept the invitation.', required=False, dataType='integer', default=AccessType.READ) .param('quiet', 'If you do not want this action to send an email to ' 'the target user, set this to true.', dataType='boolean', required=False, default=False) .param('force', 'Add user directly rather than sending an invitation ' '(admin-only option).', dataType='boolean', required=False, default=False) .errorResponse() .errorResponse('Write access was denied for the group.', 403) ) def inviteToGroup(self, group, userToInvite, level, quiet, force): groupModel = self.model('group') user = self.getCurrentUser() if force: if not user['admin']: mustBeAdmin = True addPolicy = self.model('setting').get(SettingKey.ADD_TO_GROUP_POLICY) addGroup = group.get('addAllowed', 'default') if addGroup not in ['no', 'yesadmin', 'yesmod']: addGroup = addPolicy if (groupModel.hasAccess( group, user, AccessType.ADMIN) and ('mod' in addPolicy or 'admin' in addPolicy) and addGroup.startswith('yes')): mustBeAdmin = False elif (groupModel.hasAccess( group, user, AccessType.WRITE) and 'mod' in addPolicy and addGroup == 'yesmod'): mustBeAdmin = False if mustBeAdmin: self.requireAdmin(user) groupModel.addUser(group, userToInvite, level=level) else: # Can only invite into access levels that you yourself have groupModel.requireAccess(group, user, level) groupModel.inviteUser(group, userToInvite, level) if not quiet: html = mail_utils.renderTemplate('groupInvite.mako', { 'userToInvite': userToInvite, 'user': user, 'group': group }) mail_utils.sendEmail( to=userToInvite['email'], text=html, subject="Girder: You've been invited to a group") group['access'] = groupModel.getFullAccessList(group) group['requests'] = list(groupModel.getFullRequestList(group)) return group @access.user @filtermodel(model='group', addFields={'access'}) @autoDescribeRoute( Description('Promote a member to be a moderator of the group.') .responseClass('Group') .modelParam('id', model='group', level=AccessType.ADMIN) .modelParam('userId', 'The ID of the user to promote.', level=AccessType.READ, paramType='formData') .errorResponse('ID was invalid.') .errorResponse("You don't have permission to promote users.", 403) ) def promoteToModerator(self, group, user): return self._promote(group, user, AccessType.WRITE) @access.user @filtermodel(model='group', addFields={'access'}) @autoDescribeRoute( Description('Promote a member to be an administrator of the group.') .responseClass('Group') .modelParam('id', model='group', level=AccessType.ADMIN) .modelParam('userId', 'The ID of the user to promote.', level=AccessType.READ, paramType='formData') .errorResponse('ID was invalid.') .errorResponse("You don't have permission to promote users.", 403) ) def promoteToAdmin(self, group, user): return self._promote(group, user, AccessType.ADMIN) def _promote(self, group, user, level): """ Promote a user to moderator or administrator. :param group: The group to promote within. :param user: The user to promote. :param level: Either WRITE or ADMIN, for moderator or administrator. :type level: AccessType :returns: The updated group document. """ if not group['_id'] in user.get('groups', []): raise AccessException('That user is not a group member.') group = self.model('group').setUserAccess(group, user, level=level, save=True) group['access'] = self.model('group').getFullAccessList(group) return group @access.user @filtermodel(model='group', addFields={'access', 'requests'}) @autoDescribeRoute( Description('Demote a user to a normal group member.') .responseClass('Group') .modelParam('id', model='group', level=AccessType.ADMIN) .modelParam('userId', 'The ID of the user to demote.', level=AccessType.READ, paramType='formData') .errorResponse() .errorResponse("You don't have permission to demote users.", 403) ) def demote(self, group, user): groupModel = self.model('group') group = groupModel.setUserAccess(group, user, level=AccessType.READ, save=True) group['access'] = groupModel.getFullAccessList(group) group['requests'] = list(groupModel.getFullRequestList(group)) return group @access.user @filtermodel(model='group', addFields={'access', 'requests'}) @autoDescribeRoute( Description('Remove a user from a group, or uninvite them.') .responseClass('Group') .notes('If the specified user is not yet a member of the group, this ' 'will delete any outstanding invitation or membership request for ' 'the user. Passing no userId parameter will assume that the ' 'current user is removing themself.') .modelParam('id', model='group', level=AccessType.READ) .modelParam('userId', 'The ID of the user to remove. If not passed, will ' 'remove yourself from the group.', required=False, level=AccessType.READ, destName='userToRemove', paramType='formData') .errorResponse() .errorResponse("You don't have permission to remove that user.", 403) ) def removeFromGroup(self, group, userToRemove): user = self.getCurrentUser() groupModel = self.model('group') if userToRemove is None: # Assume user is removing themself from the group userToRemove = user # If removing someone else, you must have at least as high an # access level as they do, and you must have at least write access # to remove any user other than yourself. if user['_id'] != userToRemove['_id']: if groupModel.hasAccess(group, userToRemove, AccessType.ADMIN): groupModel.requireAccess(group, user, AccessType.ADMIN) else: groupModel.requireAccess(group, user, AccessType.WRITE) group = groupModel.removeUser(group, userToRemove) group['access'] = groupModel.getFullAccessList(group) group['requests'] = list(groupModel.getFullRequestList(group)) return group @access.user @autoDescribeRoute( Description('Delete a group by ID.') .modelParam('id', model='group', level=AccessType.ADMIN) .errorResponse('ID was invalid.') .errorResponse('Admin access was denied for the group.', 403) ) def deleteGroup(self, group): self.model('group').remove(group) return {'message': 'Deleted the group %s.' % group['name']}