Source code for girder.utility.path

"""This module contains utility methods for parsing girder path strings."""

import re
from ..constants import AccessType
from ..exceptions import AccessException, GirderException, ValidationException
from ..exceptions import ResourcePathNotFound
from .model_importer import ModelImporter
from girder.models.collection import Collection
from girder.models.user import User


# Expose the ResourcePathNotFound exception as its original name
NotFoundException = ResourcePathNotFound


[docs]def encode(token): """Escape special characters in a token for path representation. :param str token: The token to encode :return: The encoded string :rtype: str """ return token.replace('\\', '\\\\').replace('/', '\\-')
[docs]def decode(token): """Un-escape special characters in a token from a path representation. :param str token: The token to decode :return: The decoded string :rtype: str """ return token.replace('\\-', '/').replace('\\\\', '\\')
[docs]def split(path): """Split an encoded path string into decoded tokens. :param str path: An encoded path string :return: A list of decoded tokens :rtype: `list` """ # It would be better to split by the regex `(?<!\\)(?>\\\\)*/`, # but python doesn't support atomic grouping. :( chunks = path.split('/') processed = [chunks[0]] # matches an odd number of backslashes at the end of the string escape = re.compile(r'(?<!\\)(?:\\\\)*\\$') # Loop through the chunks and check if any of the forward slashes was # escaped. for chunk in chunks[1:]: if escape.search(processed[-1]): # join the chunks processed[-1] = processed[-1] + '/' + chunk else: # append a new token processed.append(chunk) # now decode all of the tokens and return return [decode(token) for token in processed]
[docs]def join(tokens): """Join a list of tokens into an encoded path string. :param tokens: A list of tokens :return: The encoded path string :rtype: str """ return '/'.join([encode(token) for token in tokens])
[docs]def lookUpToken(token, parentType, parent): """ Find a particular child resource by name or throw an exception. :param token: the name of the child resource to find :param parentType: the type of the parent to search :param parent: the parent resource :returns: the child resource """ # (model name, mask, search filter) searchTable = ( ('folder', parentType in ('user', 'collection', 'folder'), { 'name': token, 'parentId': parent['_id'], 'parentCollection': parentType }), ('item', parentType == 'folder', {'name': token, 'folderId': parent['_id']}), ('file', parentType == 'item', {'name': token, 'itemId': parent['_id']}), ) for candidateModel, mask, filterObject in searchTable: if not mask: continue candidateChild = ModelImporter.model(candidateModel).findOne(filterObject) if candidateChild is not None: return candidateChild, candidateModel # if no folder, item, or file matches, give up raise ResourcePathNotFound('Child resource not found: %s(%s)->%s' % ( parentType, parent.get('name', parent.get('_id')), token))
[docs]def lookUpPath(path, user=None, filter=True, force=False): """ Look up a resource in the data hierarchy by path. :param path: path of the resource :param user: user with correct privileges to access path :param filter: Whether the returned model should be filtered. :type filter: bool :param force: if True, don't validate the access. :type force: bool """ path = path.lstrip('/') pathArray = split(path) model = pathArray[0] if model == 'user': username = pathArray[1] parent = User().findOne({'login': username}) if parent is None: raise ResourcePathNotFound('User not found: %s' % username) elif model == 'collection': collectionName = pathArray[1] parent = Collection().findOne({'name': collectionName}) if parent is None: raise ResourcePathNotFound('Collection not found: %s' % collectionName) else: raise ValidationException('Invalid path format') try: document = parent if not force: ModelImporter.model(model).requireAccess(document, user) for token in pathArray[2:]: document, model = lookUpToken(token, model, document) if not force: ModelImporter.model(model).requireAccess(document, user) except (ValidationException, AccessException): # We should not distinguish the response between access and validation errors so that # adversarial users cannot discover the existence of data they don't have access to by # looking up a path. raise ResourcePathNotFound('Path not found: %s' % path) if filter: document = ModelImporter.model(model).filter(document, user) return { 'model': model, 'document': document }
[docs]def getResourceName(type, doc): """ Get the name of a resource that can be put in a path, :param type: the resource model type. :type type: str :param doc: the resource document. :type doc: dict :return: the name of the resource. :rtype: str """ if type == 'user': return doc['login'] elif type in ('file', 'item', 'folder', 'user', 'collection'): return doc['name'] else: raise GirderException('Invalid resource type.')
[docs]def getResourcePath(type, doc, user=None, force=False): """ Get the path for a resource. :param type: the resource model type. :type type: str :param doc: the resource document. :type doc: dict :param user: user with correct privileges to access path :type user: dict or None :param force: if True, don't validate the access. :type force: bool :return: the path to the resource. :rtype: str """ path = [] while True: path.insert(0, getResourceName(type, doc)) if type == 'file': parentModel = 'item' parentId = doc['itemId'] elif type == 'item': parentModel = 'folder' parentId = doc['folderId'] elif type == 'folder': parentModel = doc['parentCollection'] parentId = doc['parentId'] else: break doc = ModelImporter.model(parentModel).load( id=parentId, user=user, level=AccessType.READ, force=force) type = parentModel path.insert(0, type) return '/' + join(path)