"""This module contains utility methods for parsing girder path strings."""
import re
from ..constants import AccessType
from ..exceptions import AccessException, GirderException, ValidationException
from ..exceptions import ResourcePathNotFound
from .model_importer import ModelImporter
from girder.models.collection import Collection
from girder.models.user import User
# Expose the ResourcePathNotFound exception under its original name, so that
# code referring to ``NotFoundException`` from this module keeps working.
NotFoundException = ResourcePathNotFound
def encode(token):
    """Escape special characters in a token for path representation.

    Backslashes are doubled and forward slashes are prefixed with a
    backslash, so that an encoded token never contains an unescaped ``/``
    and can be safely joined with other tokens using ``/`` as a separator.

    :param str token: The token to encode
    :return: The encoded string
    :rtype: str
    """
    # Backslashes must be doubled first; otherwise the backslash introduced
    # by the slash escape would itself be doubled.
    return token.replace('\\', '\\\\').replace('/', '\\/')


def decode(token):
    """Un-escape special characters in a token from a path representation.

    This is the inverse of :func:`encode`: an escaped slash (backslash
    followed by ``/``) becomes ``/`` and a doubled backslash becomes a single
    backslash.

    :param str token: The token to decode
    :return: The decoded string
    :rtype: str
    """
    # The escape sequence handled here has to be backslash + '/', because that
    # is the sequence that path splitting recognises as an escaped separator.
    return token.replace('\\/', '/').replace('\\\\', '\\')
def split(path):
    """Split an encoded path string into decoded tokens.

    The path is first cut at every ``/``. Two neighbouring pieces are then
    glued back together (with the ``/`` restored) whenever the piece on the
    left ends in an odd number of backslashes, because that means the slash
    between them was escaped. Finally every token is passed through
    :func:`decode`.

    :param str path: An encoded path string
    :return: A list of decoded tokens
    :rtype: `list`
    """
    # It would be better to split by the regex `(?<!\\)(?>\\\\)*/`, but
    # atomic grouping is only available in `re` from Python 3.11 onwards.
    # Matches an odd number of backslashes at the end of the string.
    escape = re.compile(r'(?<!\\)(?:\\\\)*\\$')

    chunks = path.split('/')
    # str.split always yields at least one element, so chunks[0] is safe.
    processed = [chunks[0]]

    # Walk the remaining chunks and check whether the forward slash that
    # preceded each of them was escaped.
    for chunk in chunks[1:]:
        if escape.search(processed[-1]):
            # Escaped slash: it belongs to the current token, so re-join.
            processed[-1] = processed[-1] + '/' + chunk
        else:
            # Real separator: start a new token.
            processed.append(chunk)

    # Now decode all of the tokens and return.
    return [decode(token) for token in processed]
def join(tokens):
    """Join a list of tokens into an encoded path string.

    Each token is escaped with :func:`encode` and the results are
    concatenated with ``/`` as the separator.

    :param tokens: A list of tokens
    :return: The encoded path string
    :rtype: str
    """
    return '/'.join(encode(token) for token in tokens)
def lookUpToken(token, parentType, parent):
    """
    Find a particular child resource by name or throw an exception.

    Candidate child models are tried in the order folder, item, file; only
    the models that are valid children of ``parentType`` are queried, and the
    first match wins.

    :param token: the name of the child resource to find
    :param parentType: the type of the parent to search
    :param parent: the parent resource
    :returns: a tuple ``(child, childModel)`` holding the child document and
        the name of the model it was found in
    :raises ResourcePathNotFound: if no folder, item, or file matches
    """
    # (model name, mask, search filter)
    # The mask says whether that model can be a child of ``parentType``.
    searchTable = (
        ('folder', parentType in ('user', 'collection', 'folder'), {
            'name': token,
            'parentId': parent['_id'],
            'parentCollection': parentType
        }),
        ('item', parentType == 'folder', {'name': token, 'folderId': parent['_id']}),
        ('file', parentType == 'item', {'name': token, 'itemId': parent['_id']}),
    )

    for candidateModel, mask, filterObject in searchTable:
        if not mask:
            continue

        candidateChild = ModelImporter.model(candidateModel).findOne(filterObject)
        if candidateChild is not None:
            return candidateChild, candidateModel

    # if no folder, item, or file matches, give up
    raise ResourcePathNotFound('Child resource not found: %s(%s)->%s' % (
        parentType, parent.get('name', parent.get('_id')), token))
def lookUpPath(path, user=None, filter=True, force=False):
    """
    Look up a resource in the data hierarchy by path.

    Leading slashes are stripped and the path is split into tokens. The first
    token must be ``user`` or ``collection``; the second token is the user's
    login or the collection's name. Every further token is resolved as a
    child of the previous document.

    :param path: path of the resource
    :param user: user with correct privileges to access path
    :param filter: Whether the returned model should be filtered.
    :type filter: bool
    :param force: if True, don't validate the access.
    :type force: bool
    :returns: a dict with the keys ``model`` (the model name of the resource
        found) and ``document`` (the resource itself, filtered if requested)
    :raises ValidationException: if the path does not start with ``user`` or
        ``collection``, or has no name after that root token
    :raises ResourcePathNotFound: if the root user or collection does not
        exist, or if resolving the rest of the path fails validation or
        access checks
    """
    path = path.lstrip('/')
    pathArray = split(path)
    model = pathArray[0]

    # A bare root such as "/user" has no name to look up; report it as a
    # malformed path instead of failing with an IndexError below.
    if model in ('user', 'collection') and len(pathArray) < 2:
        raise ValidationException('Invalid path format')

    if model == 'user':
        username = pathArray[1]
        parent = User().findOne({'login': username})

        if parent is None:
            raise ResourcePathNotFound('User not found: %s' % username)

    elif model == 'collection':
        collectionName = pathArray[1]
        parent = Collection().findOne({'name': collectionName})

        if parent is None:
            raise ResourcePathNotFound('Collection not found: %s' % collectionName)

    else:
        raise ValidationException('Invalid path format')

    try:
        document = parent
        if not force:
            ModelImporter.model(model).requireAccess(document, user)
        # Descend one token at a time, checking access at every level.
        for token in pathArray[2:]:
            document, model = lookUpToken(token, model, document)
            if not force:
                ModelImporter.model(model).requireAccess(document, user)
    except (ValidationException, AccessException):
        # We should not distinguish the response between access and validation errors so that
        # adversarial users cannot discover the existence of data they don't have access to by
        # looking up a path.
        raise ResourcePathNotFound('Path not found: %s' % path)

    if filter:
        document = ModelImporter.model(model).filter(document, user)

    return {
        'model': model,
        'document': document
    }
def getResourceName(type, doc):
    """
    Get the name of a resource that can be put in a path.

    Users are identified by their ``login`` field; files, items, folders and
    collections by their ``name`` field.

    :param type: the resource model type.
    :type type: str
    :param doc: the resource document.
    :type doc: dict
    :return: the name of the resource.
    :rtype: str
    :raises GirderException: if ``type`` is not one of the supported
        resource types
    """
    if type == 'user':
        return doc['login']
    if type in ('file', 'item', 'folder', 'collection'):
        return doc['name']
    raise GirderException('Invalid resource type.')
def getResourcePath(type, doc, user=None, force=False):
    """
    Get the path for a resource.

    Starting at ``doc``, the parent chain is followed (file -> item ->
    folder -> folder's parent ...) until a resource type with no parent
    link is reached. The returned path starts with that root type, followed
    by the names of each resource from the root down to ``doc``.

    :param type: the resource model type.
    :type type: str
    :param doc: the resource document.
    :type doc: dict
    :param user: user with correct privileges to access path
    :type user: dict or None
    :param force: if True, don't validate the access.
    :type force: bool
    :return: the path to the resource.
    :rtype: str
    """
    # Tokens are collected leaf-first and reversed once at the end, rather
    # than inserting at the front of the list on every step.
    tokens = []
    while True:
        tokens.append(getResourceName(type, doc))
        if type == 'file':
            parentModel = 'item'
            parentId = doc['itemId']
        elif type == 'item':
            parentModel = 'folder'
            parentId = doc['folderId']
        elif type == 'folder':
            # A folder's parent may be another folder, a user or a collection.
            parentModel = doc['parentCollection']
            parentId = doc['parentId']
        else:
            # Any other type has no parent link to follow: this is the root.
            break
        doc = ModelImporter.model(parentModel).load(
            id=parentId, user=user, level=AccessType.READ, force=force)
        type = parentModel

    # The path is prefixed with the type of the root resource.
    tokens.append(type)
    tokens.reverse()
    return '/' + join(tokens)