Sandhill API Reference

This document provides an API reference for Sandhill code not covered by the user documentation. If you plan on developing additional data processors, filters, or the like, it may be useful. Otherwise, you can safely ignore these pages.

Routes

The core routing API should not be needed, even when developing new functionality for your instance. Still, it is provided here for the curious.

sandhill.routes.static

Sandhill overrides/additions to the default Flask /static route.

favicon()

Wrapper that calls handle_static for the ever-popular favicon file.

Returns:

Type Description
file stream

The favicon.ico file stream from inside /static

Raises:

Type Description
HTTPException

On HTTP error

Source code in sandhill/routes/static.py
@app.route('/favicon.ico')
def favicon():
    '''
    Wrapper to calling handle_static for the ever popular favicon file. \n
    Returns:
        (file stream): The favicon.ico file stream from inside `/static` \n
    Raises:
        HTTPException: On HTTP error \n
    '''
    return handle_static('favicon.ico')

handle_static(filename)

Replacement for the default Flask /static path handler. Retrieves the requested static file by first looking for it inside the instance/static/ directory. If the file is not found, this method then looks for it in the core sandhill/static/ directory.

Parameters:

Name Type Description Default
filename str

The requested file path within /static

required

Returns:

Type Description
file stream

File stream of the file object

Raises:

Type Description
HTTPException

On HTTP error

Source code in sandhill/routes/static.py
@app.route('/static/<path:filename>', endpoint='static')
def handle_static(filename):
    '''
    Replacement for the default Flask `/static` path handler. \
    Retrieves the requested static file by first looking for it inside \
    the `instance/static/` directory. If the file is not found, this \
    method will then look for the file in the core `sandhill/static/` \
    directory. \n
    Args:
        filename (str): The requested file path within `/static` \n
    Returns:
        (file stream): File stream of the file object \n
    Raises:
        HTTPException: On HTTP error \n
    '''
    # Return from instance/static/ if available
    static_path = os.path.join(app.instance_path, "static")

    # Fall back to sandhill/static/
    if not os.path.isfile(os.path.join(static_path, filename)):
        static_path = os.path.join(app.root_path, "static")

    cache_timeout = app.get_send_file_max_age(filename)
    return send_from_directory(static_path, filename, max_age=cache_timeout)
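
The lookup order can be tried out without Flask. The following is a minimal sketch, assuming a hypothetical helper named `resolve_static_dir` and temporary directories that stand in for `app.instance_path` and `app.root_path`; it mirrors only the directory choice, not the file serving.

```python
import os
import tempfile


def resolve_static_dir(instance_path: str, root_path: str, filename: str) -> str:
    """Pick the directory a static file would be served from (hypothetical helper)."""
    # Prefer the instance's own static directory
    static_dir = os.path.join(instance_path, "static")
    # Fall back to the core static directory when the instance has no such file
    if not os.path.isfile(os.path.join(static_dir, filename)):
        static_dir = os.path.join(root_path, "static")
    return static_dir


with tempfile.TemporaryDirectory() as tmp:
    instance = os.path.join(tmp, "instance")
    core = os.path.join(tmp, "sandhill")
    os.makedirs(os.path.join(instance, "static"))
    os.makedirs(os.path.join(core, "static"))
    # Only the instance provides site.css
    with open(os.path.join(instance, "static", "site.css"), "w", encoding="utf-8") as fh:
        fh.write("body {}")
    overridden = resolve_static_dir(instance, core, "site.css")
    fallback = resolve_static_dir(instance, core, "favicon.ico")
```

Here `overridden` points at the instance directory and `fallback` at the core directory, because only `site.css` exists in the instance.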

sandhill.routes.error

Sandhill HTTP error handling

handle_http_abort(exc)

Overrides the default Flask template for abort codes.

Parameters:

Name Type Description Default
exc HTTPException

An HTTPException from a 4xx or 5xx HTTP code

required

Returns:

Type Description
Response

The Flask error response

Source code in sandhill/routes/error.py
@app.errorhandler(HTTPException)
def handle_http_abort(exc):
    """
    Overrides the default Flask template for abort codes. \n
    Args:
        exc (werkzeug.exceptions.HTTPException): A HTTPException from a 4xx or 5xx HTTP code \n
    Returns:
        (flask.Response): The Flask error response \n
    """
    # Check if the request accepts json format, if so prefer that for rendering
    request_format = match_request_format(None, ["application/json", "text/html"])
    if request_format == "application/json":
        exc_dict = {"code": exc.code, "name": exc.name, "description": exc.description}
        return jsonify(exc_dict), exc.code

    # Otherwise return html
    return render_template("abort.html.j2", e=exc), exc.code

sandhill.routes.main

The main route provides the entry point for Sandhill, loading and adding routes.

add_routes()

Decorator function that adds all routes to the Flask app based on JSON route configs loaded from instance/configs/routes/.

Source code in sandhill/routes/main.py
def add_routes():
    """
    Decorator function that adds all routes to the Flask app based \
    on JSON route configs loaded from `instance/configs/routes/`. \n
    """
    app.logger.info("Running add_routes")
    def decorator(func, **options):
        all_routes = get_all_routes()
        app.logger.info(f"Loading routes: {', '.join([repr(route) for route in all_routes])}")
        for route in all_routes:
            endpoint = options.pop('endpoint', None)
            options['methods'] = route.methods
            app.logger.info(
                f"Adding URL rule: {route.rule}, {endpoint}, {func} {json.dumps(options)}"
            )
            app.add_url_rule(route.rule, endpoint, func, **options)
        return func
    return decorator
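
The pattern of registering a single view function under many URL rules can be illustrated without Flask. This is a minimal sketch, assuming a plain dictionary named `url_map` as a stand-in for Flask's URL map and a hypothetical `add_rules` decorator factory; it is not the Sandhill implementation.

```python
from typing import Callable, Dict, List, Tuple

# Stand-in for Flask's URL map: rule -> (view function, allowed methods)
url_map: Dict[str, Tuple[Callable, List[str]]] = {}


def add_rules(routes: List[Tuple[str, List[str]]]):
    """Return a decorator that registers one function under every given rule."""
    def decorator(func: Callable) -> Callable:
        for rule, methods in routes:
            url_map[rule] = (func, methods)
        # Return the function unchanged so it stays callable under its own name
        return func
    return decorator


@add_rules([("/", ["GET"]), ("/item/<int:item_id>", ["GET", "POST"])])
def view(**kwargs):
    return "rendered"
```

Every rule ends up pointing at the same function, which is how a single entry point can serve all configured routes.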

main(*args, **kwargs)

Entry point for the whole Sandhill application, handling all routes and determining if a route has output to respond with after all processing is completed.

Based on the route_config that the path matches to, it will load all the required data processors before rendering the result.

Parameters:

Name Type Description Default
*args

Unused

()
**kwargs

Unused

{}

Returns:

Type Description

A valid response for Flask to render out, or raises HTTP 500

Source code in sandhill/routes/main.py
@add_routes()
def main(*args, **kwargs): # pylint: disable=unused-argument
    """
    Entry point for the whole Sandhill application, handling all routes and \
    determining if a route has output to respond with after all processing \
    is completed. \n
    Based on the route_config that the path matches to, it will load all the \
    required data processors before rendering the result. \n
    Args:
        *args: Unused \n
        **kwargs: Unused \n
    Returns:
        A valid response for Flask to render out, or raises HTTP 500 \n
    """
    route_used = request.url_rule.rule
    ## loop over all the configs in the instance dir looking at the "route"
    ## field to determine which configs to use
    route_config = load_route_config(route_used)
    ## process and load data routes
    route_data = []
    data = {}
    ## if 'template' is in the route_config, append the template processor
    ## to handle legacy configs
    if 'template' in route_config:
        if 'data' not in route_config:
            route_config['data'] = []
        route_config['data'].append(
            {
                'processor': 'template.render',
                'file': route_config['template'],
                'name': '_template_render'
            })
    route_rules = tolistfromkeys(route_config, 'route', 'routes')
    if 'data' in route_config:
        for idx, entry in enumerate(route_config['data']):
            if 'name' in entry and 'processor' in entry:
                route_data.append(entry)
            else:
                app.logger.warning(f"Unable to parse route data entry number {idx} " \
                                   f"for: {','.join(route_rules)}")
        data = load_route_data(route_data)
    # check if none of the route processors returned a FlaskResponse
    if not isinstance(data, (FlaskResponse, WerkzeugReponse)):
        app.logger.warning(
            f"None of the 'data' processors in {route_rules}"
            f" returned a Response object"
        )
        if app.debug:
            data = jsonify(data)
        else:
            abort(500)
    return data
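
The handling of legacy `template` keys and incomplete `data` entries can be sketched in isolation. The helper name `collect_route_data` and the sample configs below are assumptions made for illustration; unlike the source above, this sketch does not modify the config it is given.

```python
def collect_route_data(route_config: dict) -> list:
    """Return the usable 'data' entries for a route config (hypothetical helper)."""
    entries = list(route_config.get("data", []))
    # Legacy configs name a template directly; express it as a final processor entry
    if "template" in route_config:
        entries.append({
            "processor": "template.render",
            "file": route_config["template"],
            "name": "_template_render",
        })
    # Keep only entries that carry both required keys
    return [entry for entry in entries if "name" in entry and "processor" in entry]


# Made-up route configs for illustration
legacy = {"route": "/about", "template": "about.html.j2"}
mixed = {
    "route": "/search",
    "data": [
        {"name": "results", "processor": "example.processor"},
        {"processor": "example.unnamed"},  # skipped: no 'name' key
    ],
}

legacy_entries = collect_route_data(legacy)
mixed_entries = collect_route_data(mixed)
```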

Data Processors

Sandhill routes are composed of a list of data processors. These are single actions that Sandhill may take while processing a request. See the data processor documentation for full details.

Utils

Utilities are bits of helper code used elsewhere in Sandhill. Functions or classes in utils/ may be useful when writing instance-specific code for your Sandhill site.

sandhill.utils.api

Functionality to support API calls.

api_get(**kwargs)

Perform an API call using requests.get() and return the response object. This function adds logging surrounding the call.

Parameters:

Name Type Description Default
**kwargs dict

Arguments to requests.get()

{}

Raises:

Type Description
RequestException

If the call cannot return a response.

Source code in sandhill/utils/api.py
def api_get(**kwargs):
    """
    Perform an API call using `requests.get()` and return the response object. This function adds \
    logging surrounding the call. \n
    Args:
        **kwargs (dict): Arguments to [`requests.get()`](#TODO) \n
    Raises:
        requests.RequestException: If the call cannot return a response. \n
    """
    if "timeout" not in kwargs:
        kwargs["timeout"] = 10
    app.logger.debug(f"API GET arguments: {kwargs}")
    response = requests.get(**kwargs)   # pylint: disable=missing-timeout
    app.logger.debug(f"API GET called: {response.url}")
    if not response.ok:
        app.logger.warning(
            f"API GET call returned {response.status_code}: {response.text}"
        )
    return response

api_get_multi(requests_kwargs)

Perform multiple API calls in parallel using futures, returning a generator of responses.

Parameters:

Name Type Description Default
requests_kwargs list of dict

Arguments for each requests.get() call, one dict per call

required

Returns:

Type Description

A generator yielding response objects

Source code in sandhill/utils/api.py
def api_get_multi(requests_kwargs):
    """
    Perform multiple API calls in parallel using futures, returning a generator \
    of responses. \n
    Args:
        requests_kwargs (list of dict): Arguments for each `requests.get()` call, one dict per call \n
    Returns:
        A generator yielding response objects \n
    """
    def request_futures(requests_kwargs):
        futures = []
        with FuturesSession() as session:
            for kwargs in requests_kwargs:
                if "timeout" not in kwargs:
                    kwargs["timeout"] = 10
                futures.append(session.get(**kwargs))
            for future in futures:
                yield future.result()

    return request_futures(requests_kwargs)
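
The submit-everything-then-yield pattern can be sketched with the standard library alone. This example assumes `concurrent.futures.ThreadPoolExecutor` in place of `FuturesSession` and a hypothetical `fake_get` function in place of a real HTTP call, so it runs without network access.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_get(url: str, timeout: int = 10) -> str:
    """Stand-in for an HTTP GET so the sketch needs no network."""
    return f"GET {url} (timeout={timeout})"


def get_multi(requests_kwargs):
    """Submit every call up front, then yield results in submission order."""
    with ThreadPoolExecutor() as pool:
        futures = []
        for kwargs in requests_kwargs:
            kwargs.setdefault("timeout", 10)  # same default as the source above
            futures.append(pool.submit(fake_get, **kwargs))
        for future in futures:
            yield future.result()


responses = list(get_multi([
    {"url": "https://a.example"},
    {"url": "https://b.example", "timeout": 3},
]))
```

Because the function is a generator, nothing is submitted until the caller starts iterating, and results come back in the order the calls were listed rather than the order they finished.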

establish_url(url, fallback)

Falls back to the fallback URL if the provided URL is None or empty; also checks the validity of the resulting URL.

Parameters:

Name Type Description Default
url str

A possible URL.

required
fallback str

A secondary URL to fallback to if url is None.

required

Raises:

Type Description
HTTPException

If the URL to be returned is not a validly formatted URL.

Source code in sandhill/utils/api.py
def establish_url(url, fallback):
    """
    Set URL to fallback if provided URL is none; also checks the URL validity. \n
    Args:
        url (str): A possible URL. \n
        fallback (str): A secondary URL to fallback to if `url` is None. \n
    Raises:
        werkzeug.exceptions.HTTPException: If URL to be returned is not a valid formatted URL. \n
    """
    url = url if url else fallback
    try:
        parsed = urlparse(url)
        if not url or not all([parsed.scheme, parsed.netloc]):
            raise ValueError
    except ValueError:
        app.logger.debug(f"URL provided is not valid: {url}")
        abort(400)
    return url
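
The validity check relies on `urllib.parse.urlparse` reporting both a scheme and a network location. A minimal sketch follows, assuming a hypothetical `pick_url` function that raises `ValueError` where the source above aborts with HTTP 400.

```python
from urllib.parse import urlparse


def pick_url(url, fallback):
    """Return url, or fallback when url is empty; raise ValueError if not absolute."""
    chosen = url if url else fallback
    parsed = urlparse(chosen) if chosen else None
    # A usable URL needs both a scheme (https) and a network location (host)
    if parsed is None or not all([parsed.scheme, parsed.netloc]):
        raise ValueError(f"Not a valid URL: {chosen!r}")
    return chosen


from_fallback = pick_url(None, "https://example.org/api")
from_url = pick_url("http://localhost:8983/solr", "https://example.org/api")

try:
    pick_url("not-a-url", None)
    rejected = False
except ValueError:
    rejected = True
```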

sandhill.utils.config_loader

Utilities for loading config/ files.

get_all_routes(routes_dir='config/routes/')

Finds all routes in JSON files within the given directory and orders them according to the desired load order.

Parameters:

Name Type Description Default
routes_dir str

The directory to look for route configs.

'config/routes/'

Returns:

Type Description
list

All of the route rules found in desired order.

Source code in sandhill/utils/config_loader.py
def get_all_routes(routes_dir="config/routes/"):
    '''
    Finds all routes in JSON files within the given directory and orders them \
    according to the desired load order. \n
    Args:
        routes_dir (str): The directory to look for route configs. \n
    Returns:
        (list): All of the route rules found in desired order. \n
    '''
    routes = load_routes_from_configs(routes_dir)

    # if no routes are found, add a default one for the home page
    if not routes:
        app.logger.warning("No routes loaded; will use welcome home page route.")
        routes.append(Route(rule="/"))

    # prefer most specific path (hardcoded path over variable) in left to right manner
    re_var = re.compile(r'<\w+:\w+>')
    sort_routes = []
    for route in routes:
        sort_routes.append((route, re_var.sub(' ', route.rule)))
    sort_routes = sorted(sort_routes, key=operator.itemgetter(1), reverse=True)

    return [r[0] for r in sort_routes]
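
The effect of the sort key is easiest to see on a small set of rules. The rules below are made up for illustration; the regular expression and the sort call are the same as in the source above.

```python
import operator
import re

# Replace typed URL variables with a space, which sorts before letters and digits
re_var = re.compile(r'<\w+:\w+>')

rules = ["/<string:page>", "/about", "/item/<int:id>", "/item/new"]
keyed = [(rule, re_var.sub(' ', rule)) for rule in rules]
ordered = [rule for rule, _ in sorted(keyed, key=operator.itemgetter(1), reverse=True)]
# ordered == ["/item/new", "/item/<int:id>", "/about", "/<string:page>"]
```

With the reversed sort, a hardcoded segment such as `/item/new` is placed ahead of the variable rule `/item/<int:id>` that would otherwise also match it.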

load_json_config(file_path)

Load a JSON file.

Parameters:

Name Type Description Default
file_path str

The full path to the JSON file to load

required

Returns:

Type Description
dict

The contents of the loaded JSON file, or an empty dictionary upon error loading or parsing the file.

Source code in sandhill/utils/config_loader.py
@catch(OSError, "Unable to read json file at path: {file_path} Error: {exc}",
       return_val=collections.OrderedDict())
@catch(JSONDecodeError, "Malformed json at path: {file_path} Error: {exc}",
       return_val=collections.OrderedDict())
def load_json_config(file_path):
    """
    Load a JSON file. \n
    Args:
        file_path (str): The full path to the JSON file to load \n
    Returns:
        (dict): The contents of the loaded JSON file, or an empty dictionary \
                upon error loading or parsing the file. \n
    """
    app.logger.info(f"Loading json file at {file_path}")
    with open(file_path, encoding='utf-8') as json_config_file:
        return json.load(json_config_file, object_pairs_hook=collections.OrderedDict)
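
The "empty dictionary upon error" behaviour can be sketched with a plain `try`/`except` in place of the `@catch` decorators. The function name `load_json_or_empty` and the sample files are assumptions for illustration, and logging is omitted.

```python
import collections
import json
import os
import tempfile


def load_json_or_empty(file_path: str) -> dict:
    """Load a JSON file, returning an empty OrderedDict if it is unreadable or malformed."""
    try:
        with open(file_path, encoding="utf-8") as fh:
            return json.load(fh, object_pairs_hook=collections.OrderedDict)
    except (OSError, json.JSONDecodeError):
        return collections.OrderedDict()


with tempfile.TemporaryDirectory() as tmp:
    good = os.path.join(tmp, "good.json")
    bad = os.path.join(tmp, "bad.json")
    with open(good, "w", encoding="utf-8") as fh:
        fh.write('{"route": "/", "template": "home.html.j2"}')
    with open(bad, "w", encoding="utf-8") as fh:
        fh.write("{not json")
    loaded = load_json_or_empty(good)
    malformed = load_json_or_empty(bad)
    missing = load_json_or_empty(os.path.join(tmp, "absent.json"))
```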

load_json_configs(path, recurse=False)

Loads all the config files in the provided path.

Parameters:

Name Type Description Default
path string

The directory path from which to load the config files.

required
recurse bool

If set to True, does a recursive walk into the path.

False

Returns:

Type Description
dict

Dictionary with keys of each file path and values of their loaded JSON.

Source code in sandhill/utils/config_loader.py
def load_json_configs(path, recurse=False):
    """
    Loads all the config files in the provided path. \n
    Args:
        path (string): The directory path from which to load the config files. \n
        recurse (bool): If set to True, does a recursive walk into the path. \n
    Returns:
        (dict): Dictionary with keys of each file path and values of their loaded JSON. \n
    """
    config_files = {}
    if not os.path.isdir(path):
        app.logger.warning(f"Failed to load json configs; invalid directory: {path}")
    for root, _, files in os.walk(path):
        for config_file in files:
            if config_file.endswith('.json'):
                config_file_path = os.path.join(root, config_file)
                config_files[config_file_path] = load_json_config(config_file_path)
        if not recurse:
            break

    return config_files
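
The `recurse` flag works because `os.walk` yields the top directory first, so breaking after the first iteration limits the search to that directory. A minimal sketch, assuming a hypothetical `find_json_files` helper that only collects paths instead of loading them:

```python
import os
import tempfile


def find_json_files(path: str, recurse: bool = False) -> list:
    """List .json files in path, descending into subdirectories only if recurse is set."""
    found = []
    for root, _, files in os.walk(path):
        found.extend(os.path.join(root, name) for name in files if name.endswith(".json"))
        if not recurse:
            break  # the top directory is yielded first, so stop after it
    return found


with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "nested"))
    for relative in ("top.json", "notes.txt", os.path.join("nested", "inner.json")):
        with open(os.path.join(tmp, relative), "w", encoding="utf-8") as fh:
            fh.write("{}")
    flat = sorted(os.path.basename(p) for p in find_json_files(tmp))
    deep = sorted(os.path.basename(p) for p in find_json_files(tmp, recurse=True))
```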

load_route_config(route_rule, routes_dir='config/routes/')

Return the JSON data of the route config matching the provided route rule.

Parameters:

Name Type Description Default
route_rule str

the route rule to match to in the json configs (the route key)

required
routes_dir str

the path to look for route configs. Default = config/routes/

'config/routes/'

Returns:

Type Description
OrderedDict

The loaded json of the matched route config, or empty dict if not found

Source code in sandhill/utils/config_loader.py
@catch(FileNotFoundError, "Route dir not found at path {routes_dir} - " \
       "creating welcome home page route. Error: {exc}",
       return_val=collections.OrderedDict({
           "route": ["/"],
           "template": "home.html.j2"
       }))
def load_route_config(route_rule, routes_dir="config/routes/"):
    '''
    Return the JSON data of the route config matching the provided route rule. \n
    Args:
        route_rule (str): the route rule to match to in the json configs (the `route` key) \n
        routes_dir (str): the path to look for route configs. Default = config/routes/ \n
    Returns:
        (OrderedDict): The loaded json of the matched route config, or empty dict if not found \n
    '''
    route_path = os.path.join(app.instance_path, routes_dir)
    data = collections.OrderedDict()
    conf_files = [
        os.path.join(route_path, j) for j in os.listdir(route_path) if j.endswith(".json")
    ]
    for conf_file in conf_files:
        check_data = load_json_config(conf_file)
        if route_rule in tolistfromkeys(check_data, "route", "routes"):
            data = check_data
            break
    return data

load_routes_from_configs(routes_dir='config/routes/')

Given a path relative to the instance/ dir, load all JSON files within and extract the "route" keys.

Parameters:

Name Type Description Default
routes_dir string

The relative path to the JSON files

'config/routes/'

Returns:

Type Description
list

A list of routes from the configs

Source code in sandhill/utils/config_loader.py
@catch(FileNotFoundError, "Route dir not found at path {routes_dir} Error: {exc}", return_val=[])
def load_routes_from_configs(routes_dir="config/routes/"):
    '''
    Given a path relative to the `instance/` dir, load all JSON files within \
    and extract the "route" keys. \n
    Args:
        routes_dir (string): The relative path to the JSON files \n
    Returns:
        (list): A list of routes from the configs \n
    '''
    route_path = os.path.join(app.instance_path, routes_dir)
    routes = []
    conf_files = [
        os.path.join(route_path, j) for j in os.listdir(route_path) if j.endswith(".json")
    ]
    for conf_file in conf_files:
        data = load_json_config(conf_file)
        r_rules = tolist(*[data.get(key) for key in ("route", "routes") if key in data])
        methods = tolist(*[data.get(key) for key in ("method", "methods") if key in data])
        for rule in r_rules:
            routes.append(Route(
                rule=rule,
                methods=methods,
            ))
    return routes

sandhill.utils.context

Context related functionality

app_context()

Create a flask app context if not already present.

# Example use
with context.app_context():
    ...

Returns:

Type Description

A context manager class instance.

Source code in sandhill/utils/context.py
def app_context():
    """
    Create a flask app context if not already present.\n
    ```
    # Example use
    with context.app_context():
        ...
    ``` \n
    Returns:
        A context manager class instance. \n
    """
    class NullContext: # pylint: disable=all
        def __enter__(self): return None
        def __exit__(self, exc_type, exc_value, traceback): return None

    return app.app_context() if not has_app_context() else NullContext()
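
The "only create a context if none exists" idea can be shown without Flask. This sketch assumes a module-level list named `_active` as a stand-in for Flask's context check and uses `contextlib.nullcontext` from the standard library in the role of the `NullContext` class above.

```python
from contextlib import contextmanager, nullcontext

_active = []  # stand-in for "is an app context already present?"


@contextmanager
def push_context():
    """Hypothetical context that records itself while it is open."""
    _active.append("ctx")
    try:
        yield
    finally:
        _active.pop()


def ensure_context():
    """Open a new context only when none is active; otherwise do nothing."""
    return nullcontext() if _active else push_context()


with ensure_context():
    outer_depth = len(_active)      # a context was pushed
    with ensure_context():
        inner_depth = len(_active)  # nothing new was pushed
final_depth = len(_active)
```

Nesting the call is therefore harmless: the inner `with` neither pushes a second context nor closes the outer one on exit.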

context_processors()

The list of Sandhill context processor functions.

Returns:

Type Description
dict

Context processors mapped as: name => function

Source code in sandhill/utils/context.py
@app.context_processor
def context_processors():
    """
    The list of Sandhill context processor functions. \n
    Returns:
        (dict): Context processors mapped as: name => function \n
    """
    # TODO move function definitions to sandhill/context/ and import them for use below

    def strftime(fmt: str = None, day: str = None) -> str:
        """
        Wrapper around datetime.strftime with default yyyy-mm-dd format \n
        args:
            fmt (str): The format for the date to return \n
            day (str): A date in yyyy-mm-dd format to format, or today if not passed \n
        returns:
            (str): The formatted date \n
        """
        fmt = "%Y-%m-%d" if not fmt else fmt
        day = datetime.now() if not day else datetime.strptime(day, "%Y-%m-%d")
        return day.strftime(fmt)

    def context_sandbug(value: Any, comment: str = None):
        """
        Sandbug as a context processor, because we can. Will output the given \
        value into the logs. For debugging. \n
        Args:
            value (Any): The value to debug. \n
            comment (str): Additional comment to add to log output. \n
        Returns:
            (None): Sandbug does not return a value. \n
        """
        sandbug(value, comment) # pylint: disable=undefined-variable

    def urlcomponents():
        """
        Creates a deepcopy of the url components part of the request object. \n
        Returns:
            (dict): The copied data. \n
        """
        try:
            return {
                "path": str(request.path),
                "full_path": str(request.full_path),
                "base_url": str(request.base_url),
                "url": str(request.url),
                "url_root": str(request.url_root),
                "query_args": deepcopy(request.query_args),
                "host": str(request.host)
            }
        except (UnicodeDecodeError, TypeError):
            abort(400)

    def find_mismatches(dict1: dict, dict2: dict) -> dict:
        """
        Return detailed info about how and where the two supplied dicts don't match. \n
        Args:
            dict1 (dict): A dictionary to compare. \n
            dict2 (dict): A dictionary to compare. \n
        Returns:
            (dict) A dictionary highlighting what is different between the two inputs. \n
        """
        return dict(DeepDiff(dict1, dict2, ignore_order=True))

    @pass_context
    def get_var(context, var: str):
        """
        Returns the given variable in the current application context \n
        Args:
            var (str): The name of the variable to get from the context \n
        Returns:
            (any): The value of the provided variable in the current context \n
        """
        ctx = dict(context)
        return ctx[var] if var in ctx else None

    # Mapping of context function names to actual functions
    return {
        'debug': app.debug,
        'strftime': strftime,
        'sandbug': context_sandbug,
        'urlcomponents': urlcomponents,
        'find_mismatches': find_mismatches,
        'get_var': get_var
    }
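
As an illustration of one of these helpers, the `strftime` logic can be exercised on its own. The standalone copy below is a sketch for experimentation outside of templates; the dates are made up.

```python
from datetime import datetime


def strftime(fmt=None, day=None):
    """Format a yyyy-mm-dd date string (or today when omitted) with the given format."""
    fmt = fmt or "%Y-%m-%d"
    day = datetime.strptime(day, "%Y-%m-%d") if day else datetime.now()
    return day.strftime(fmt)


reformatted = strftime("%d/%m/%Y", "2024-03-05")  # "05/03/2024"
unchanged = strftime(day="2024-03-05")            # default format round-trips the input
year_only = strftime("%Y")                        # current year
```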

list_custom_context_processors()

Get the full list of the available custom context processors.

Returns:

Type Description
list

A list of strings of the context processor names.

Source code in sandhill/utils/context.py
def list_custom_context_processors():
    """
    Get the full list of the available custom context processors. \n
    Returns:
        (list): A list of strings of the context processor names. \n
    """
    custom = []
    for entries in app.template_context_processors[None]:
        ctx_procs = entries()
        for key in ctx_procs:
            if key not in ['g', 'request', 'session']:
                custom.append(key)
    return custom

sandhill.utils.error_handling

Methods to help handle errors

catch(exc_class, exc_msg=None, **kwargs)

Decorator to catch general exceptions and handle them in a standardized manner.

Parameters:

Name Type Description Default
exc_class Exception

Type of exception to catch

required
exc_msg str (optional)

Message to log; the parameter {exc} is available in the string template.

Ex: f"Error: {exc}"

None
**kwargs

Optional arguments:

return_val (Any): Value to return after the exception has been handled

return_arg (str): Function kwarg to be returned after the exception has been handled

abort (int): Status code to abort with

{}

Returns:

Type Description
Any

Only if return_val or return_arg is provided in kwargs.

Raises:

Type Description
HTTPException

If abort is provided in kwargs. If none of abort, return_val, or return_arg is provided, the caught exception is re-raised instead.

Examples:

@catch(KeyError, "Some error message", return_val=None)
def myfunc():
    ...

@catch((KeyError, IndexError), "Some error message", return_arg='myval')
def myfunc(myval):
    ...

Source code in sandhill/utils/error_handling.py
def catch(exc_class, exc_msg=None, **kwargs):
    """
    Decorator to catch general exceptions and handle them in a standardized manner. \n
    Args:
        exc_class (Exception): Type of exception to catch \n
        exc_msg (String) (optional): Message to log; the \
            parameter `{exc}` is available in the string template.\n
            Ex: `f"Error: {exc}"` \n
        **kwargs: Optional arguments:\n
            return_val (Any): Value to return after the exception has been handled\n
            return_arg (str): Function kwarg to be returned after the exception has been handled\n
            abort (int): Status code to abort with \n
    Returns:
        (Any): Only if return_val or return_arg is provided in kwargs. \n
    Raises:
        (HTTPException): If no return_val or return_arg is provided in kwargs. \n
    Examples:
    ```python
    @catch(KeyError, "Some error message", return_val=None)
    def myfunc():
        ...

    @catch((KeyError, IndexError), "Some error message", return_arg='myval')
    def myfunc(myval):
        ...
    ``` \n
    """
    def inner(func):
        @wraps(func)
        def wrapper(*args, **func_kwargs):
            try:
                rval = func(*args, **func_kwargs)
            except exc_class as exc:
                def get_func_params():
                    '''Get original function parameters and their current values (including
                    defaults, if not passed)'''
                    func_params = {}
                    sig = inspect.signature(func)
                    for idx, (pname, param) in enumerate(sig.parameters.items()):
                        if pname not in func_params:
                            func_params[pname] = param.default
                        if idx < len(args):
                            func_params[pname] = args[idx]
                    return func_params

                # Re-map the function arguments to their variable name
                # for use in formatted error message string
                args_dict = {**get_func_params(), **func_kwargs}
                args_dict['exc'] = exc

                # Handling of the exception
                if exc_msg:
                    sandhill.app.logger.warning(f"{request.url if request else ''} raised: " + \
                        exc_msg.format(**args_dict))

                # Get the return_arg value if required and present in the function's arguments
                return_arg = None
                if 'return_arg' in kwargs:
                    if kwargs.get('return_arg') and kwargs.get('return_arg') in args_dict:
                        return_arg = args_dict[kwargs.get('return_arg')]

                # Abort if specified
                if 'abort' in kwargs:
                    abort(kwargs.get('abort'))

                # If no return_val specified, we'll re-raise the error
                if 'return_val' not in kwargs and 'return_arg' not in kwargs:
                    raise exc
                rval = return_arg if return_arg else kwargs.get('return_val')
            return rval
        return wrapper
    return inner
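
A reduced version of the decorator helps show the two return modes. This is a simplified sketch under stated assumptions: the name `catch_sketch` is hypothetical, logging and the `abort` option are left out, and `inspect.Signature.bind` is used to map arguments to parameter names instead of the manual loop above.

```python
import functools
import inspect

_MISSING = object()  # sentinel so that return_val=None can be told apart from "not given"


def catch_sketch(exc_class, return_val=_MISSING, return_arg=None):
    def inner(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except exc_class:
                if return_arg is not None:
                    # Map positional and keyword arguments back to parameter names
                    bound = inspect.signature(func).bind(*args, **kwargs)
                    bound.apply_defaults()
                    return bound.arguments[return_arg]
                if return_val is not _MISSING:
                    return return_val
                raise  # nothing to return: let the original exception propagate
        return wrapper
    return inner


@catch_sketch(KeyError, return_val=None)
def lookup(mapping, key):
    return mapping[key]


@catch_sketch((ValueError, TypeError), return_arg="default")
def to_int(value, default=0):
    return int(value)
```

With these definitions, `lookup({}, "a")` returns `None`, while `to_int("x", default=-1)` hands back the caller's own `default` argument.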

dp_abort(http_code)

Data processor abort. Will abort with the given status code if the data processor has an on_fail key set and the value is 0.

If the value is non-0, the 'on_fail' code will override the passed code.

Parameters:

Name Type Description Default
http_code int

A valid HTTP status code

required

Raises:

Type Description
HTTPException

Raised if the data processor's on_fail is defined in the parent context

Source code in sandhill/utils/error_handling.py
def dp_abort(http_code):
    """
    Data processor abort. Will abort with the given status code if \
    the data processor has an `on_fail` key set and the value is `0`. \n
    If the value is non-`0`, the 'on_fail' code will override \
    the passed code. \n
    Args:
        http_code (int): A valid HTTP status code \n
    Raises:
        (HTTPException): Can raises exception if data processor's `on_fail` is \
            defined in parent context \n
    """
    parent_locals = inspect.currentframe().f_back.f_locals
    data = parent_locals['data'] if 'data' in parent_locals else {}
    if 'on_fail' in data:
        abort(http_code if data['on_fail'] == 0 else data['on_fail'])
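
The function reads the caller's local variables through the frame stack, which is what lets a data processor call it without passing `data` along. The sketch below assumes a hypothetical `Abort` exception in place of Flask's `abort()` and made-up status codes.

```python
import inspect


class Abort(Exception):
    """Stand-in for the HTTPException raised by Flask's abort()."""
    def __init__(self, code):
        super().__init__(code)
        self.code = code


def abort_from_caller(http_code):
    # Look one frame up the stack for a local variable named 'data'
    caller_locals = inspect.currentframe().f_back.f_locals
    data = caller_locals.get("data", {})
    if "on_fail" in data:
        # 0 means "use the code suggested by the processor"; anything else overrides it
        raise Abort(http_code if data["on_fail"] == 0 else data["on_fail"])


def processor(data):
    """Hypothetical data processor that hits a failure condition."""
    abort_from_caller(404)
    return "continued"


def run(data):
    """Report the abort code, or None when processing continued."""
    try:
        processor(data)
    except Abort as exc:
        return exc.code
    return None


no_on_fail = run({})               # None: no 'on_fail' key, so no abort
use_given = run({"on_fail": 0})    # 404: the processor's code is used
override = run({"on_fail": 503})   # 503: the configured code wins
```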

sandhill.utils.generic

Generic functions that could be used in most any context.

getconfig(name, default=None)

Get the value of the given config name. It first checks the environment for the variable name, then app.config, and otherwise uses the default parameter.

Parameters:

Name Type Description Default
name str

Name of the config variable to look for

required
default str | None

The default value if not found elsewhere

None

Returns:

Type Description
str

Value of the config variable, default value otherwise

Source code in sandhill/utils/generic.py
def getconfig(name, default=None):
    '''
    Get the value of the given config name. It will first \
    check in the environment for the variable name, otherwise \
    look in the app.config, otherwise use the default param \n
    Args:
        name (str): Name of the config variable to look for \n
        default (str|None): The default value if not found elsewhere \n
    Returns:
        (str): Value of the config variable, default value otherwise \n
    '''
    value = default
    if name in os.environ and os.environ[name]:
        value = os.environ[name]
    elif name in app.config and app.config[name] is not None:
        value = app.config[name]
    return value
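
The precedence of environment, config, and default can be tested with plain dictionaries. This sketch assumes a hypothetical `lookup_config` function that takes the config and environment as arguments instead of reading `app.config` and `os.environ` directly; the variable names are made up.

```python
import os


def lookup_config(name, config, default=None, environ=None):
    """Resolve a setting: non-empty environment value, then config, then default."""
    environ = os.environ if environ is None else environ
    if environ.get(name):             # empty strings in the environment are ignored
        return environ[name]
    if config.get(name) is not None:  # falsy config values such as 0 still count
        return config[name]
    return default


config = {"EXAMPLE_URL": "http://from-config", "EXAMPLE_LIMIT": 0}

from_env = lookup_config("EXAMPLE_URL", config, environ={"EXAMPLE_URL": "http://from-env"})
from_config = lookup_config("EXAMPLE_URL", config, environ={"EXAMPLE_URL": ""})
falsy_config = lookup_config("EXAMPLE_LIMIT", config, default=25, environ={})
from_default = lookup_config("EXAMPLE_MISSING", config, default="fallback", environ={})
```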

getdescendant(obj, list_keys, extract=False, put=None)

Gets key values from the dictionary/list if they exist; will check recursively through the obj.

Parameters:

Name Type Description Default
obj dict | list

A dict/list to check, possibly containing nested dicts/lists.

required
list_keys list | str

List of descendants to follow (or . delimited string)

required
extract bool

If set to true, will remove the last matching value from the obj.

False
put Any

Replace the found value with this new value in the obj, or append it to the list if the final key is "[]"

None

Returns:

Type Description
Any

The last matching value from list_keys, or None if no match

Raises:

Type Description
IndexError

When attempting to put a list index that is invalid.

Examples:

# Get "key1" of mydict, then index 2 of that result, then "key3" of that result
v = getdescendant(mydict, "key1.2.key3")
# Same as above, only also remove the found item from mydict
v = getdescendant(mydict, "key1.2.key3", extract=True)
# Replace value with new value
v = getdescendant(mydict, "key1.2.key3", put="Replacement value!")
# Append to a list
v = getdescendant(mydict, "key1.2.[]", put="Append this value.")

Source code in sandhill/utils/generic.py
@catch(ValueError, "Could not find {list_keys} in: {obj}", return_val=None)
def getdescendant(obj, list_keys, extract=False, put=None):
    '''
    Gets key values from the dictionary/list if they exist; \n
    will check recursively through the `obj`. \n
    Args:
        obj (dict|list): A dict/list to check, possibly containing nested dicts/lists. \n
        list_keys (list|str): List of descendants to follow (or . delimited string) \n
        extract (bool): If set to true, will remove the last matching value from the `obj`. \n
        put (Any): Replace the found value with this new value in the `obj`, \
                   or append if the found value at a list key of `"[]"` \n
    Returns:
        (Any): The last matching value from list_keys, or None if no match \n
    Raises:
        IndexError: When attempting to put a list index that is invalid. \n
    Examples:
    ```python
    # Get "key1" of mydict, then index 2 of that result, then "key3" of that result
    v = getdescendant(mydict, "key1.2.key3")
    # Same as above, only also remove the found item from mydict
    v = getdescendant(mydict, "key1.2.key3", extract=True)
    # Replace value with new value
    v = getdescendant(mydict, "key1.2.key3", put="Replacement value!")
    # Append to a list
    v = getdescendant(mydict, "key1.2.[]", put="Append this value.")
    ``` \n
    '''
    list_keys = list_keys.split('.') if isinstance(list_keys, str) else list_keys
    for idx, key in enumerate(list_keys):
        pobj = obj
        if isinstance(obj, Mapping) and key in obj:
            obj = obj[key]
        elif isinstance(obj, list) and str(key).isdigit() and int(key) < len(obj):
            key = int(key)
            obj = obj[key]
        else:
            obj = None
        #  This is the last key in the loop
        if (idx + 1) == len(list_keys):
            if extract and obj is not None:
                del pobj[key]
            if put is not None and pobj is not None:
                if isinstance(pobj, Mapping):
                    pobj[key] = put
                if isinstance(pobj, list):
                    if key == "[]":
                        pobj.append(put)
                    elif isinstance(key, int) and key < len(pobj):
                        pobj[key] = put
                    else:
                        raise IndexError(f"Index of {key} is invalid for list: {pobj}")
    return obj if list_keys else None
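
To make the return values concrete, here is a standalone sketch that copies the logic listed above (minus the `@catch` decorator) so it runs without Sandhill installed. The sample `mydict` is made up for illustration.

```python
from collections.abc import Mapping

def getdescendant(obj, list_keys, extract=False, put=None):
    """Standalone copy of the logic above, without the @catch decorator."""
    list_keys = list_keys.split('.') if isinstance(list_keys, str) else list_keys
    for idx, key in enumerate(list_keys):
        pobj = obj  # parent of the value about to be looked up
        if isinstance(obj, Mapping) and key in obj:
            obj = obj[key]
        elif isinstance(obj, list) and str(key).isdigit() and int(key) < len(obj):
            key = int(key)
            obj = obj[key]
        else:
            obj = None
        if (idx + 1) == len(list_keys):  # last key: apply extract/put
            if extract and obj is not None:
                del pobj[key]
            if put is not None and pobj is not None:
                if isinstance(pobj, Mapping):
                    pobj[key] = put
                if isinstance(pobj, list):
                    if key == "[]":
                        pobj.append(put)
                    elif isinstance(key, int) and key < len(pobj):
                        pobj[key] = put
                    else:
                        raise IndexError(f"Index of {key} is invalid for list: {pobj}")
    return obj if list_keys else None

# Hypothetical sample data
mydict = {"key1": ["a", "b", {"key3": "found"}]}

found = getdescendant(mydict, "key1.2.key3")                # plain lookup
missing = getdescendant(mydict, "key1.9.key3")              # index out of range
previous = getdescendant(mydict, "key1.2.key3", put="new")  # replaces the value
appended = getdescendant(mydict, "key1.[]", put="c")        # "[]" appends to the list
first = getdescendant(mydict, "key1.0", extract=True)       # removes the item
```

In this sketch a `put` returns the value that was stored before the replacement, and an append through `"[]"` returns None because no existing value is matched.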

getmodulepath(path)

Get the Python module path for a directory or file in Sandhill

Parameters:

Name Type Description Default
path str

A file or dir path in Sandhill

required

Returns:

Type Description
str

module (e.g. 'instance' or 'sandhill.filters.filters')

Source code in sandhill/utils/generic.py
def getmodulepath(path):
    """
    Get the Python module path for a directory or file in Sandhill \n
    Args:
        path (str): A file or dir path in Sandhill \n
    Returns:
        (str): module (e.g. 'instance' or 'sandhill.filters.filters') \n
    """
    install_path = os.path.dirname(app.root_path)
    subpath = re.sub('^' + re.escape(install_path), '', path)
    return re.sub('\\.py$', '', subpath).strip('/').replace('/', '.')
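
The path-to-module transformation can be tried outside a running app with the sketch below. It assumes the install path is passed in explicitly instead of being derived from `app.root_path`; the helper name `module_path` and the `/opt/sandhill` location are hypothetical.

```python
import re

def module_path(path, install_path):
    """Same transformation as above, with the install path passed in explicitly."""
    # Drop the install prefix, then the .py suffix, then turn slashes into dots
    subpath = re.sub('^' + re.escape(install_path), '', path)
    return re.sub(r'\.py$', '', subpath).strip('/').replace('/', '.')

install = "/opt/sandhill"  # hypothetical install location
as_file = module_path("/opt/sandhill/sandhill/filters/filters.py", install)
as_dir = module_path("/opt/sandhill/instance", install)
```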

ifnone(*args)

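With three arguments, returns the dictionary's value for the key, or the default value if the key is not in the dictionary (or the first argument is not a dictionary). With two arguments, returns the variable, or the default value if the variable is None.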

Parameters:

Name Type Description Default
*args Any

With 3 args:

var (dict): The dictionary to check

key (str): The key of the dictionary

default_value (Any): Return val if key is not in var

With 2 args:

var (Any): The variable to check

default_value (Any): Return val if the variable is None
()

Returns:

Type Description

Any

The value itself, or the default_value if the value is None or the key is not in the dict.

Raises:

Type Description
TypeError

If invalid number of arguments passed.

Source code in sandhill/utils/generic.py
def ifnone(*args):
    '''
    Returns the default value if the key is not in the dictionary or if \
    a non-dictionary is provided it will return the default if it is not set. \n
    Args:
        *args (Any):
            With 3 args:\n
                var (dict): The dictionary to check\n
                key (str): The key of the dictionary\n
                default_value (Any): Return val if key is not in var\n
            With 2 args:\n
                var (Any): The variable to check\n
                default_value (Any): Return val if the variable is None\n
    Returns:
        (Any) The default_value if the value is None or the key is not in the dict. \n
    Raises:
        (TypeError): If invalid number of arguments passed. \n
    '''
    var = args[0] if args else None
    if len(args) not in [2, 3]:
        raise TypeError(
            f"ifnone() missing required positional argument (2 or 3) {len(args)} received."
        )
    if len(args) == 3:
        key = args[1]
        default_val = args[2]
        return var[key] if isinstance(var, dict) and key in var else default_val # pylint: disable=unsupported-membership-test,unsubscriptable-object
    # len(args) == 2
    default_value = args[1]
    return var if var is not None else default_value
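
The two calling forms behave slightly differently, which the following standalone sketch tries to show. It copies the logic above (with a shortened error message) and uses a made-up `settings` dict.

```python
def ifnone(*args):
    """Standalone copy of the logic above, with a shortened error message."""
    var = args[0] if args else None
    if len(args) not in [2, 3]:
        raise TypeError(f"ifnone() expects 2 or 3 arguments, {len(args)} received.")
    if len(args) == 3:
        key, default_val = args[1], args[2]
        return var[key] if isinstance(var, dict) and key in var else default_val
    return var if var is not None else args[1]

settings = {"rows": 20, "sort": None}  # hypothetical sample data

present = ifnone(settings, "rows", 10)          # key exists: its value is returned
absent = ifnone(settings, "start", 0)           # key missing: default is returned
none_value = ifnone(settings, "sort", "title")  # key exists with a None value
unset = ifnone(None, "fallback")                # two-argument form with None
falsy = ifnone(0, "fallback")                   # 0 is not None, so it is kept
```

Note that the three-argument form only checks whether the key is present, so a key stored with a None value comes back as None and not as the default.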

overlay_dicts_matching_key(target: list[dict], overlays: list[dict], key: Hashable)

Given the target list, find the dicts that match a dict in the list of overlays and replace them, comparing dicts by their value for the provided key.

For each overlay dict, overlay the values on top of any matching original dict from the target.

If multiple overlays match an original target dict, both overlays will use the original dict as a base for the overlay.

If no matching dict was found in the original target list, the overlay will be appended to the target as is.

Parameters:

Name Type Description Default
target list

The list of dicts to search and update.

required
overlays list

The list of dicts to match and overlay.

required
key Hashable

The key (both dicts) for the comparison value.

required
Source code in sandhill/utils/generic.py
def overlay_dicts_matching_key(target: list[dict], overlays: list[dict], key: Hashable):
    """
    Given the target, find and replace matching dicts, for all matching dicts \
    in the list of overlays, using the value for the provided key to compare them. \n
    For each overlay dict, overlay the values on top of any matching original \
    dict from the target. \n
    If multiple overlays match an original target dict, both overlays will use \
    the original dict as a base for the overlay. \n
    If no matching dict was found in the original target list, the overlay will be \
    appended to the target as is. \n
    Args:
        target (list): The list of dicts to search and update. \n
        overlays (list): The list of dicts to match and overlay. \n
        key (Hashable): The key (both dicts) for the comparison value. \n
    """
    base_ref = {}
    # Extract all base/default dictionaries from target
    for overlay in overlays:
        if (oval := overlay.get(key)):
            if [base_dict := tdict for tdict in target if tdict.get(key) == oval]:
                target.remove(base_dict)
                base_ref[oval] = base_dict

    # For each overlay, copy matching base and update with overlay before appending
    for overlay in overlays:
        if (base_dict := base_ref.get(overlay.get(key))):
            overlay = {**base_dict, **overlay}
        target.append(overlay)
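
A short worked example may help here, since the function modifies `target` in place and returns nothing. The sketch below is a standalone copy of the logic above (type hints omitted, Python 3.8+ for the `:=` operator); the `fields` and `overlays` data are invented for illustration.

```python
def overlay_dicts_matching_key(target, overlays, key):
    """Standalone copy of the logic above (type hints omitted)."""
    base_ref = {}
    # Pull every matching base dict out of target
    for overlay in overlays:
        if (oval := overlay.get(key)):
            if [base_dict := tdict for tdict in target if tdict.get(key) == oval]:
                target.remove(base_dict)
                base_ref[oval] = base_dict
    # Merge each overlay onto its base (if any) and append the result
    for overlay in overlays:
        if (base_dict := base_ref.get(overlay.get(key))):
            overlay = {**base_dict, **overlay}
        target.append(overlay)

# Hypothetical sample data
fields = [
    {"name": "title", "label": "Title", "width": 40},
    {"name": "date", "label": "Date"},
]
overlays = [
    {"name": "title", "label": "Item Title"},  # matches an existing dict
    {"name": "author", "label": "Author"},     # no match: appended as is
]
overlay_dicts_matching_key(fields, overlays, "name")
```

In this run the overlaid `title` entry keeps its `width` from the base dict but moves to the end of `fields`, because matched dicts are removed and re-appended.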

pop_dict_matching_key(haystack: list[dict], match: dict, key: Hashable) -> list[dict]

Search the haystack for all dicts that have the same value as the passed match dict for the given key.

Matched dicts are removed from the haystack and returned as a list.

Parameters:

Name Type Description Default
haystack list

A list of dicts to search through.

required
match dict

The dict to match against.

required
key Hashable

The key (both dicts) for the comparison value.

required

Returns:

Type Description
list

A list of matching dicts removed from the haystack.

Source code in sandhill/utils/generic.py
def pop_dict_matching_key(haystack: list[dict], match: dict, key: Hashable) -> list[dict]:
    """
    Search the haystack for all dicts that have the same \
    value as the passed match dict for the given key. \n
    Matched dicts are removed from the haystack and \
    returned as a list. \n
    Args:
        haystack (list): A list of dicts to search through. \n
        match (dict): The dict to match against. \n
        key (Hashable): The key (both dicts) for the comparison value. \n
    Returns:
        (list): A list of matching dicts removed from the haystack. \n
    """
    matched = []
    if (needle_val := match.get(key)):
        for hay in list(haystack):
            if hay.get(key) == needle_val:
                matched.append(hay)
                haystack.remove(hay)
    return matched
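
The following standalone sketch copies the logic above and runs it on a made-up `facets` list, to show both the returned list and the effect on the haystack.

```python
def pop_dict_matching_key(haystack, match, key):
    """Standalone copy of the logic above (type hints omitted)."""
    matched = []
    if (needle_val := match.get(key)):
        for hay in list(haystack):  # iterate over a copy while removing
            if hay.get(key) == needle_val:
                matched.append(hay)
                haystack.remove(hay)
    return matched

# Hypothetical sample data
facets = [
    {"field": "type", "limit": 5},
    {"field": "year"},
    {"field": "type", "limit": 10},
]
popped = pop_dict_matching_key(facets, {"field": "type"}, "field")
no_key = pop_dict_matching_key(facets, {}, "field")  # match dict lacks the key
```

Because the lookup uses a truthiness check, a match dict that lacks the key (or holds an empty or zero value for it) returns an empty list and leaves the haystack untouched.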

recursive_merge(dict1: dict, dict2: dict, sanity: int = 100) -> dict

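Given 2 dictionaries, merge them together, overriding dict1 values with dict2 values where a key exists in both. A key present in dict1 whose dict2 value is None is removed from the merged result.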

Parameters:

Name Type Description Default
dict1 dict

Base dictionary, keys will be overridden if the keys are in both.

required
dict2 dict

Prioritized dictionary, keys will be kept if the keys are in both.

required
sanity int

The depth to reach before raising an error.

100

Returns:

Name Type Description
dict dict

dict1 and dict2 merged

Raises:

Type Description
RecursionError

If the dictionary depth reaches the sanity limit

TypeError

If dict1 or dict2 is not a dictionary, or sanity is not an integer

Source code in sandhill/utils/generic.py
def recursive_merge(dict1: dict, dict2: dict, sanity: int = 100) -> dict:
    """
    Given 2 dictionaries, merge them together, overriding dict1 \
    values by dict2 if existing in both.\n

    Args:
        dict1 (dict): Base dictionary, keys will be overridden if the keys are in both. \n
        dict2 (dict): Prioritized dictionary, keys will be kept if the keys are in both. \n
        sanity (int): The depth to reach before raising an error. \n

    Returns:
        dict: dict1 and dict2 merged\n

    Raises:
        RecursionError: if dictionary depth reaches sanity\n
        TypeError: dict1 and dict2 needs to be dictionaries, sanity needs to be an integer\n
    """
    if not (isinstance(dict1, dict) and isinstance(dict2, dict) and isinstance(sanity, int)):
        raise TypeError(('The function only accepts dictionaries'
                    ' as dict1 and dict2 and integer as sanity'))
    merged = dict1.copy()
    sanity -= 1
    if sanity < 0:
        raise RecursionError('Reached depth limit of recursion, aborting')
    for key, value in dict2.items():
        if key in merged and isinstance(merged[key], dict) and isinstance(value, dict):
            merged[key] = recursive_merge(merged[key], value, sanity)
        elif value is None and key in merged:
            del merged[key]
        else:
            merged[key] = value
    return merged
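
As an illustration of the merge rules, the sketch below is a standalone copy of the logic above (with a shortened TypeError message) applied to invented `base` and `override` dicts. It covers a nested merge, removal through a None value, and the depth limit.

```python
def recursive_merge(dict1, dict2, sanity=100):
    """Standalone copy of the logic above, with a shortened TypeError message."""
    if not (isinstance(dict1, dict) and isinstance(dict2, dict) and isinstance(sanity, int)):
        raise TypeError("dict1 and dict2 must be dicts and sanity must be an int")
    merged = dict1.copy()
    sanity -= 1
    if sanity < 0:
        raise RecursionError("Reached depth limit of recursion, aborting")
    for key, value in dict2.items():
        if key in merged and isinstance(merged[key], dict) and isinstance(value, dict):
            merged[key] = recursive_merge(merged[key], value, sanity)  # merge nested dicts
        elif value is None and key in merged:
            del merged[key]  # a None in dict2 removes the key
        else:
            merged[key] = value
    return merged

# Hypothetical sample data
base = {"solr": {"rows": 10, "sort": "title"}, "debug": True, "name": "core"}
override = {"solr": {"rows": 50}, "debug": None, "extra": [1]}
result = recursive_merge(base, override)

# Two levels of nested dicts need a sanity value of at least 3
try:
    recursive_merge({"a": {"b": {}}}, {"a": {"b": {"c": 1}}}, sanity=2)
    too_deep = False
except RecursionError:
    too_deep = True
```

The inputs are left unchanged in this run; the function returns a new dict.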

tolist(*args)

Combine arguments, appending them to a list; args may be scalars or lists. If an argument is a list, then the values of that list are appended (not the list itself).

Parameters:

Name Type Description Default
*args Any

Items to combine.

()

Returns:

Type Description
list

The combined list.

Source code in sandhill/utils/generic.py
def tolist(*args):
    """
    Combine arguments, appending them to a list; args may be scalars or lists. If args is \
    a list, then the values of the list are appended (not the list itself). \n
    Args:
        *args (Any): Items to combine. \n
    Returns:
        (list): The combined list. \n
    """
    combined = []
    for i in args:
        if isinstance(i, list):
            combined += i
        else:
            combined.append(i)
    return combined

tolistfromkeys(fromdict, *args)

From a given dict, look up each of the given keys and return the found values in a list.

If a found value is already a list, the return list is extended with its items instead of appending a list within a list.

Parameters:

Name Type Description Default
fromdict dict

Dictionary to search within.

required
*args str | int

Keys to find.

()

Returns:

Type Description
list

The list with found key values.

Source code in sandhill/utils/generic.py
def tolistfromkeys(fromdict, *args):
    """
    From a given dict, find all keys and return them within a list. \n
    If found keys are already a list, then extends the return list with the matched value \
    instead of appending a list within a list. \n
    Args:
        fromdict (dict): Dictionary to search within. \n
        *args (str|int): Keys to find. \n
    Returns:
        (list): The list with found key values. \n
    """
    values = []
    for arg in args:
        if arg in fromdict:
            value = fromdict[arg]
            values.extend(value if isinstance(value, list) else [value])
    return values
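
A small example of the flattening behavior, written as a standalone copy of the logic above with a made-up `record`:

```python
def tolistfromkeys(fromdict, *args):
    """Standalone copy of the logic above."""
    values = []
    for arg in args:
        if arg in fromdict:
            value = fromdict[arg]
            # List values are merged in; anything else is added as one item
            values.extend(value if isinstance(value, list) else [value])
    return values

# Hypothetical sample data
record = {"title": "Dunes", "subjects": ["sand", "wind"], "year": 1999}
found = tolistfromkeys(record, "title", "subjects", "missing")
```

Keys that are not in the dict, such as `"missing"` here, are skipped silently.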

touniquelist(*args)

Combine arguments while excluding duplicate values. Same functionality as tolist() only with duplicate values being removed.

Parameters:

Name Type Description Default
*args Any

Items to combine.

()

Returns:

Type Description
list

The combined list with duplicates removed.

Source code in sandhill/utils/generic.py
def touniquelist(*args):
    """
    Combine arguments while excluding duplicate values. Same functionality as `tolist()` \
    only with duplicate values being removed. \n
    Args:
        *args (Any): Items to combine. \n
    Returns:
        (list): The combined list with duplicates removed. \n
    """
    unique_list = []
    _ = [unique_list.append(i) for i in tolist(*args) if i not in unique_list]
    return unique_list
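
The difference between `tolist()` and `touniquelist()` can be sketched as follows. Both functions are standalone copies of the logic shown in this section (the list comprehension in `touniquelist()` is written as a plain loop), and the arguments are arbitrary sample values.

```python
def tolist(*args):
    """Standalone copy of tolist() from above."""
    combined = []
    for i in args:
        if isinstance(i, list):
            combined += i
        else:
            combined.append(i)
    return combined

def touniquelist(*args):
    """Same result as touniquelist() above, written as a plain loop."""
    unique_list = []
    for i in tolist(*args):
        if i not in unique_list:
            unique_list.append(i)
    return unique_list

combined = tolist("a", ["b", "a"], ("c",), [["d"]])
unique = touniquelist("a", ["b", "a"], "b")
```

Only `list` arguments are flattened, and only one level deep: in this run the tuple and the inner list are kept as single items. Order of first appearance is preserved when duplicates are dropped.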

utils.html.HTMLTagFilter

Class used to filter through HTML and remove all tags except for those set as allowed. Used by the filtertags() template filter.

sandhill.utils.jsonpath

Wrapper functions for JSONPath queries.

find(data, path=None, deepcopy=True)

Get the values for a given JSONPath.

Parameters:

Name Type Description Default
data dict | list

The JSON data

required
path str

The JSONPath to find

None

Returns:

Type Description
list

A list of matches, empty if none found

Source code in sandhill/utils/jsonpath.py
def find(data, path=None, deepcopy=True):
    '''
    Get the values for a given JSONPath. \n
    Args:
        data (dict|list): The JSON data \n
        path (str): The JSONPath to find \n
    Returns:
        (list): A list of matches, empty if none found \n
    '''
    if deepcopy:
        data = copy.deepcopy(data)
    if path is None:
        return data
    pattern = parse(path)
    matches = pattern.find(data)
    return [match.value for match in matches]

put(data, path, value, deepcopy=True)

Set a value at the given JSONPath location.

Parameters:

Name Type Description Default
data dict | list

The JSON data

required
path str

The JSONPath of the location to set

The last element in the path is where the value is set; it must be a specific Field or Index only

required
value any

The value to set

required

Returns:

Type Description
dict | list

The modified JSON data

Source code in sandhill/utils/jsonpath.py
def put(data, path, value, deepcopy=True):
    '''
    Set a value at the given JSONPath location. \n
    Args:
        data (dict|list): The JSON data \n
        path (str): The JSONPath to find \n
            Last element in path will be removed, which must be \
            a specific Field or Index only \n
        value (any): The value to set\n
    Returns:
        (dict|list): The modified JSON data \n
    '''
    if deepcopy:
        data = copy.deepcopy(data)
    pattern = parse(path)
    if not isinstance(pattern.right, Fields) and not isinstance(pattern.right, Index):
        raise ValueError("jsonpath.put can only set specific Fields or Indexes")

    matches = pattern.left.find(data)
    for match in matches:
        new_value = match.value
        idxs = [pattern.right.index] if isinstance(pattern.right, Index) \
            else pattern.right.reified_fields(match.context)
        for idx in idxs:
            new_value[idx] = value
        parse(str(match.full_path)).update(data, new_value)

    return data

append(data, path, value, deepcopy=True)

Append a value to the given JSONPath location. Location must be a list.

Parameters:

Name Type Description Default
data dict | list

The JSON data

required
path str

The JSONPath to a list(s)

required
value any

The value to append

required

Returns:

Type Description
dict | list

The modified JSON data

Source code in sandhill/utils/jsonpath.py
def append(data, path, value, deepcopy=True):
    '''
    Append a value to the given JSONPath location. Location must be a list. \n
    Args:
        data (dict|list): The JSON data \n
        path (str): The JSONPath to a list(s) \n
        value (any): The value to append \n
    Returns:
        (dict|list): The modified JSON data \n
    '''
    if deepcopy:
        data = copy.deepcopy(data)
    pattern = parse(path)
    matches = pattern.find(data)
    for match in matches:
        section = match.value
        if not isinstance(section, list):
            raise ValueError("jsonpath.append can only do so to a list. " \
                             f"Path '{path}' found type {type(section)}")
        section.append(value)
        parse(str(match.full_path)).update(data, section)
    return data

delete(data, path, deepcopy=True)

Delete item(s) from JSON data.

Parameters:

Name Type Description Default
data dict | list

The JSON data

required
path str

The JSONPath to the object(s) to delete

Last element in path will be removed, which must be a specific Field or Index only

required

Returns:

Type Description
dict | list

The modified JSON data

Source code in sandhill/utils/jsonpath.py
def delete(data, path, deepcopy=True):
    '''
    Delete item(s) from JSON data. \n
    Args:
        data (dict|list): The JSON data \n
        path (str): The JSONPath to the object(s) to delete \n
            Last element in path will be removed, which must be \
            a specific Field or Index only \n
    Returns:
        (dict|list): The modified JSON data \n
    '''
    if deepcopy:
        data = copy.deepcopy(data)
    pattern = parse(path)
    if not isinstance(pattern.right, Fields) and not isinstance(pattern.right, Index):
        raise ValueError("jsonpath.delete can only remove specific Fields or Indexes")

    matches = pattern.left.find(data)
    for match in matches:
        new_value = match.value
        idxs = [pattern.right.index] if isinstance(pattern.right, Index) \
            else pattern.right.reified_fields(match.context)
        for idx in idxs:
            del new_value[idx]
        parse(str(match.full_path)).update(data, new_value)

    return data

eval_within(string: str, context: dict)

Given a string containing JSONPath queries, replace the queries with the values they found.

JSONPath queries will query within the context.

Example context

{
    "item": {
        "elem1": { "elem2": "value1" } },
    "parent": {
        "elem3": "value2" }
}

Example query strings

# No given context
"$.elem1.elem2"     # would query the first item in the context dictionary
# Specified context
"$parent.elem3"     # would query the "parent" key in the context dictionary

Parameters:

Name Type Description Default
string str

The string to search within for JSONPath queries

required
context dict

A dictionary of contexts upon which a JSONPath could query.

required
Source code in sandhill/utils/jsonpath.py
def eval_within(string: str, context: dict):
    '''
    Given a string containing JSONPath queries, replace the queries with the values they found.\n
    JSONPath queries will query within the context.\n
    **Example context** \n
    ```json
    {
        "item": {
            "elem1": { "elem2": "value1" } },
        "parent": {
            "elem3": "value2" }
    }
    ``` \n
    **Example query strings** \n
    ```python
    # No given context
    "$.elem1.elem2"     # would query the first item in the context dictionary
    # Specified context
    "$parent.elem3"     # would query the "parent" key in the context dictionary
    ``` \n
    Args:
        string (str): The string to search within for JSONPath queries \n
        context (dict): A dictionary of contexts upon which a JSONPath could query. \n
    '''
    if not isinstance(context, dict) or len(context) == 0:
        app.logger.debug("jsonpath.eval_within given invalid/empty context. Skipping.")
        return string

    space_esc = '&&&SPACE&&&'
    # find matching square brackets and replace spaces with placeholder
    brackets = re.findall(r'(\[.+?\])', string)
    for before, after in zip(brackets, [bkt.replace(' ', space_esc) for bkt in brackets]):
        string = string.replace(before, after, 1)

    parts = re.split(r'\s', string)

    # for vals starting with  $, JSONPath find and replace value with str() of result
    jsonpath_pat = re.compile(r'^\$([a-zA-Z0-9_]+)?\.([^ ]+)$')
    context_keys = list(context.keys())
    for idx, part in enumerate(parts):
        if (match := jsonpath_pat.match(part)):
            ctx_key = match.group(1)
            if not ctx_key:
                ctx_key = context_keys[0]
            path = f"$.{match.group(2)}".replace(space_esc, " ")
            result = find(context[ctx_key], path) if ctx_key in context else []
            # If only one results, then remove it from list
            result = result[0] if len(result) == 1 else result
            result = json.dumps(result) if isinstance(result, str) else result
            parts[idx] = result
        else:
            parts[idx] = part.replace(space_esc, " ")

    return " ".join([str(part) for part in parts])
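
How a token is split into a context key and a JSONPath can be seen by running the regular expression from the source on its own. The sketch below uses only the standard library; `split_query` is a hypothetical helper and does not run the JSONPath query itself.

```python
import re

# Pattern taken from the source above
jsonpath_pat = re.compile(r'^\$([a-zA-Z0-9_]+)?\.([^ ]+)$')

def split_query(token, context_keys):
    """Return (context key, JSONPath) for a token, or None if it is not a query."""
    match = jsonpath_pat.match(token)
    if not match:
        return None
    # With no key after the "$", fall back to the first key in the context
    ctx_key = match.group(1) or context_keys[0]
    return ctx_key, f"$.{match.group(2)}"

keys = ["item", "parent"]  # keys of the example context, in order
default_ctx = split_query("$.elem1.elem2", keys)
named_ctx = split_query("$parent.elem3", keys)
plain_word = split_query("hello", keys)
```

Tokens that do not start with `$` are not treated as queries and pass through the function unchanged.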

sandhill.utils.request

Client request related functions

match_request_format(view_args_key, allowed_formats, default_format='text/html')

Match a request mimetype to the given view_args_key or the allowed mimetypes provided by client.

Parameters:

Name Type Description Default
view_args_key str | None

the key in the url request to check within for matching format.

required
allowed_formats list

list of acceptable mimetypes.

required
default_format str

the mimetype to use by default if view_args_key value is not allowed.

'text/html'

Returns:

Name Type Description
result_format str

the mimetype for the format to return.

Source code in sandhill/utils/request.py
def match_request_format(view_args_key, allowed_formats, default_format='text/html'):
    """
    Match a request mimetype to the given view_args_key or the allowed mimetypes \
    provided by client. \n
    Args:
        view_args_key (str|None): the key in the url request to check within for \
            matching format. \n
        allowed_formats (list): list of acceptable mimetypes. \n
        default_format (str): the mimetype to use by default if view_args_key value is \
            not allowed. \n
    Returns:
        result_format (str): the mimetype for the format to return. \n
    """
    result_format = default_format
    # check for accept header
    for mtype, _ in list(request.accept_mimetypes):
        if mtype in allowed_formats:
            result_format = mtype
            break

    # check for ext; e.g. search.json
    if request.view_args and view_args_key in request.view_args:
        mimetypes.init()
        extension = "." + request.view_args[view_args_key]
        if extension in mimetypes.types_map:
            result_format = mimetypes.types_map[extension]

    if result_format not in allowed_formats:
        abort(501)

    return result_format
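
The order of precedence (Accept header first, then a file extension that overrides it) can be sketched without Flask. In the simplified version below, `resolve_format` is a hypothetical helper: the accepted mimetypes and the extension are passed in as plain arguments, and None is returned where the real function would call `abort(501)`.

```python
import mimetypes

def resolve_format(accepted, extension, allowed_formats, default_format="text/html"):
    """Simplified sketch: arguments replace the Flask request, None replaces abort(501)."""
    result = default_format
    # First allowed mimetype from the Accept header wins
    for mtype in accepted:
        if mtype in allowed_formats:
            result = mtype
            break
    # A known file extension overrides the header
    if extension is not None:
        mimetypes.init()
        ext = "." + extension
        if ext in mimetypes.types_map:
            result = mimetypes.types_map[ext]
    return result if result in allowed_formats else None

allowed = ["text/html", "application/json"]
by_header = resolve_format(["application/json", "text/html"], None, allowed)
by_extension = resolve_format(["text/html"], "json", allowed)
unsupported = resolve_format(["image/png"], None, ["application/json"])
```

In the last call nothing in the header is allowed, so the default `text/html` is used, and since that is not allowed either, the result is rejected.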

overlay_with_query_args(query_config, request_args=None, *, allow_undefined=False)

Given a query config, overlay request.args on the defaults to generate a combined list of query arguments

Parameters:

Name Type Description Default
query_config dict

A dictionary containing rules for query args to parse, each key being a query arg name (e.g. "arg_name").

Format for each key as below:

"arg_name": {
    "base": ["value"]
      -- Optional (str | list): value that cannot be changed by request.args; will
         always be returned
    "default": ["overridable"]
      -- Optional (str | list): value which will be replaced by matching
         request.args, if passed
}

Either "base" or "default" is required. If no value results for an "arg_name", that "arg_name" will be filtered out. If "default" is not set, the request.args value matching "arg_name" will be filtered out. Both "base" and "default" may be set at the same time. In this case, only the "default" value can be overridden by request.args; the "base" will remain unchanged.

required
request_args dict

Optional arguments to use in addition to request.args; values in request.args take precedence

None
allow_undefined bool

If True, fields not defined in the query_config will be permitted

False

Returns:

Type Description
dict

A dict of the combined query arguments

Source code in sandhill/utils/request.py
def overlay_with_query_args(query_config, request_args=None, *, allow_undefined=False):
    """
    Given a query config, overlay request.args on the defaults to generate a combined \
    list of query arguments \n
    Args:
        query_config (dict): A dictionary containing rules for query args to parse, each key \
                             being a query arg name (e.g. "arg_name").\n
            Format for each key as below: \n
            ```
            "arg_name": {
                "base": ["value"]
                  -- Optional (str | list): value that cannot be changed by request.args; will
                     always be returned
                "default": ["overridable"]
                  -- Optional (str | list): value which will be replaced by matching
                     request.args, if passed
            }
            ``` \n
            Either "base" or "default" is required. If no value results for an "arg_name", \
            that "arg_name" will be filtered out. If "default" is not set, the request.args \
            value matching "arg_name" will be filtered out. Both "base" and "default" may be \
            set at the same time. In this case, only the "default" value can be \
            overridden by request.args; the "base" will remain unchanged. \n
        request_args (dict):
        allow_undefined (bool): If True, fields not defined in the query_config will be permitted \n
    Returns:
        (dict): A dict of the combined query arguments \n
    """
    # grab the query string params and convert to a flat dict if request args not passed in
    # i.e. duplicative keys will be converted to a list of strings
    if request_args is None:
        request_args = {}

    # avoid modifying incoming args dict and always allow url override (config to be enforced next)
    request_args = deepcopy(request_args) | request.args.to_dict(flat=False)

    query_params = {}
    for field_name, field_conf in query_config.items():
        query_params[field_name] = []
        # Load base from config
        if 'base' in field_conf:
            query_params[field_name] = field_conf['base']
        # Load from request_args
        if field_name in request_args:
            # Allow override if field defined with a default
            if 'default' in field_conf and field_conf['default'] is not None:
                query_params[field_name] = touniquelist(
                    query_params[field_name],
                    request_args[field_name]
                )
            # Remove field from request_args having already processed it
            del request_args[field_name]
        # Load default from config if solr_param field not defined in request_args
        elif 'default' in field_conf:
            query_params[field_name] = touniquelist(
                query_params[field_name],
                field_conf['default']
            )
        # Remove field from solr query if empty
        if not any(query_params[field_name]):
            del query_params[field_name]

        # restrictions
        #TODO something like: query_params[field_name] = apply_restrictions(
        #    query_params[field_name], field_conf['restrictions'])
        # Anyone who puts a negative number in rows or (any other non integer) will
        # get what they deserve. "-1" for instance will return the max
        # number of search results.
        if 'max' in field_conf:
            query_params[field_name] = [
                val if str(val).isdigit() and int(val) < int(field_conf['max'])
                else str(field_conf['max']) for val in query_params[field_name]
            ]
        if 'min' in field_conf:
            query_params[field_name] = [
                val if str(val).isdigit() and int(val) > int(field_conf['min'])
                else str(field_conf['min']) for val in query_params[field_name]
            ]

    if allow_undefined:
        query_params.update(request_args)

    return query_params
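
The interplay of "base", "default", and `allow_undefined` is easier to follow on a small example. The sketch below is a simplified stand-in, not the function above: `overlay` is a hypothetical name, the request arguments are passed in directly instead of read from Flask's `request`, and the "min"/"max" handling is left out. The config and arguments are invented.

```python
def overlay(query_config, request_args, allow_undefined=False):
    """Simplified sketch: no Flask request, no min/max handling."""
    def unique(*groups):
        # Flatten lists one level and drop duplicates, keeping order
        out = []
        for group in groups:
            for val in (group if isinstance(group, list) else [group]):
                if val not in out:
                    out.append(val)
        return out

    request_args = dict(request_args)  # avoid modifying the caller's dict
    query_params = {}
    for name, conf in query_config.items():
        values = conf.get("base", [])
        if name in request_args:
            # Only fields with a default may be overridden by the request
            if conf.get("default") is not None:
                values = unique(values, request_args[name])
            del request_args[name]
        elif "default" in conf:
            values = unique(values, conf["default"])
        if any(values):
            query_params[name] = values
    if allow_undefined:
        query_params.update(request_args)
    return query_params

# Hypothetical config and request arguments
config = {
    "q": {"default": ["*"]},
    "fq": {"base": ["status:public"]},
    "rows": {"default": ["10"]},
}
args = {"q": ["cats"], "fq": ["status:private"], "sort": ["title"]}

strict = overlay(config, args)
permissive = overlay(config, args, allow_undefined=True)
```

Here `q` is overridden because it has a default, `fq` keeps its base because it has none, `rows` falls back to its default, and `sort` only survives when undefined fields are allowed.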

utils.solr.Solr

Class for handling Solr related logic, such as encoding/decoding.

sandhill.utils.template

Template and Jinja2 utilities

evaluate_conditions(conditions, ctx, match_all=True)

Render each condition's evaluate template using the given context; the result must match a value in the condition's match_when, or none of the values in the condition's match_when_not.

Parameters:

Name Type Description Default
conditions list

List of dicts, each containing the key 'evaluate' and exactly one of 'match_when' or 'match_when_not'

required
ctx dict

Context dictionary for template variables

required
match_all bool

If all conditions need to be matched for it to be considered a match.

Default: True

True

Returns:

Type Description
int

The number of conditions matched. When match_all is True, this count is returned only if all conditions matched; otherwise 0 is returned.

Raises:

Type Description
KeyError

when "match_when"/"match_when_not" or "evaluate" is not in conditions

Source code in sandhill/utils/template.py
def evaluate_conditions(conditions, ctx, match_all=True):
    """
    Render each conditions' `evaluate` using the given context; the result must \
    match a value in the conditions' `match_when` or none of the conditions' `match_when_not`. \n
    Args:
        conditions (list): List of dicts, each containing the key 'evaluate' and exactly \
            one of 'match_when' or 'match_when_not' \n
        ctx (dict): Context dictionary for template variables \n
        match_all (bool): If all conditions need to be matched for it to be considered \
            a match. \n
            Default: True \n
    Returns:
        (int): returns the number of matches matched ONLY if all are matched, else returns 0 \n
    Raises:
        KeyError: when "match_when"/"match_when_not" or "evaluate" is not in conditions \n
    """
    matched = 0
    for match in conditions:
        # Idea: use boosts if the matched value for 2 config files is the same,
        # e.g. matched += boost
        check_value = render_template_string(match['evaluate'], ctx)
        if not any(key in ['match_when', 'match_when_not'] for key in match.keys()) or \
            {'match_when', 'match_when_not'}.issubset(set(match.keys())):
            raise KeyError(
                "One (and only one) of the keys 'match_when' or 'match_when_not' must be present"
            )
        if 'match_when' in match and check_value in match['match_when']:
            matched += 1
        elif 'match_when_not' in match and check_value not in match['match_when_not']:
            matched += 1
    # Only assigned matched value if ALL matches are successful
    return matched if matched == len(conditions) or not match_all else 0
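
The matching rules can be tried without Jinja using the sketch below. It follows the logic above but, as an assumption made only for this example, replaces `render_template_string` with Python's `str.format`, so the `evaluate` values use `{name}` placeholders and not Jinja syntax. The conditions and contexts are invented.

```python
def evaluate_conditions(conditions, ctx, match_all=True):
    """Sketch of the logic above; str.format stands in for the Jinja renderer."""
    matched = 0
    for cond in conditions:
        check_value = cond["evaluate"].format(**ctx)  # stand-in for render_template_string
        has_when = "match_when" in cond
        has_when_not = "match_when_not" in cond
        if has_when == has_when_not:  # neither key or both keys present
            raise KeyError(
                "One (and only one) of the keys 'match_when' or 'match_when_not' must be present"
            )
        if has_when and check_value in cond["match_when"]:
            matched += 1
        elif has_when_not and check_value not in cond["match_when_not"]:
            matched += 1
    # With match_all, a partial match counts as no match
    return matched if matched == len(conditions) or not match_all else 0

# Hypothetical conditions
conditions = [
    {"evaluate": "{role}", "match_when": ["admin", "editor"]},
    {"evaluate": "{lang}", "match_when_not": ["fr"]},
]
all_match = evaluate_conditions(conditions, {"role": "admin", "lang": "en"})
partial_strict = evaluate_conditions(conditions, {"role": "admin", "lang": "fr"})
partial_loose = evaluate_conditions(conditions, {"role": "admin", "lang": "fr"}, match_all=False)
```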

render_template_json(json_obj, ctx)

Serialize a JSON, render it as a template, then convert back to JSON

Parameters:

Name Type Description Default
json_obj dict | list

JSON represented in Python

required
ctx dict

Context for the jinja template

required

Returns:

Type Description
dict | list

The updated JSON structure

Raises:

Type Description
JSONDecodeError

If the resulting template is unable to be parsed as JSON

Source code in sandhill/utils/template.py
def render_template_json(json_obj, ctx):
    """
    Serialize a JSON, render it as a template, then convert back to JSON \n
    Args:
        json_obj (dict|list): JSON represented in Python \n
        ctx (dict): Context for the jinja template \n
    Returns:
        (dict|list): The updated JSON structure \n
    Raises:
        json.JSONDecodeError: If the resulting template is unable to be parsed as JSON \n
    """
    # Disable autoescape for JSON output to avoid HTML entities being injected
    rendered = render_template_string(
        "{% autoescape false -%}" +
        json.dumps(json_obj) +
        "{%- endautoescape %}",
        ctx
    )
    return json.loads(rendered)
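
The serialize, render, parse round trip, and one way it can fail, can be sketched with the standard library alone. As an assumption made only for this example, `string.Template` (with `$name` placeholders) stands in for the Jinja renderer; `render_json_sketch` and the sample data are hypothetical.

```python
import json
from string import Template

def render_json_sketch(json_obj, ctx):
    """Serialize, substitute placeholders, then parse the result back."""
    rendered = Template(json.dumps(json_obj)).substitute(ctx)
    return json.loads(rendered)

# Hypothetical sample data
config = {"title": "Results for $term", "filters": ["$term"]}
ok = render_json_sketch(config, {"term": "dunes"})

# A substituted value containing a double quote breaks the JSON syntax
try:
    render_json_sketch(config, {"term": 'say "hi"'})
    failed = False
except json.JSONDecodeError:
    failed = True
```

Because substitution happens on the serialized text, a value that introduces unescaped JSON syntax makes the final parse fail, which is the kind of situation the JSONDecodeError entry above describes.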

render_template_string(template_str, ctx)

Renders Jinja templates with added Sandhill filters/context processors.

Parameters:

Name Type Description Default
template_str string

Jinja template string to render

required
ctx dict

Context for the jinja template

required

Returns:

Type Description
str

The rendered template as a string.

Raises:

Type Description
TemplateError

On invalid template.

Source code in sandhill/utils/template.py
def render_template_string(template_str, ctx):
    """
    Renders Jinja templates with added Sandhill filters/context processors. \n
    Args:
        template_str (string): jinja template variable \n
        ctx (dict): Context for the jinja template \n
    Returns:
        (str): The rendered template as a string. \n
    Raises:
        jinja2.TemplateError: On invalid template. \n
    """
    with context.app_context():
        return flask.render_template_string(template_str, **ctx)

sandhill.utils.test

Dummy functions for use in unit tests.

sandhill.utils.xml

XML loading and handling functionality.

load(source) -> etree._Element

Load an XML document.

Parameters:

Name Type Description Default
source

XML source. Either path, url, string, or loaded LXML Element

required

Returns:

Type Description
_Element

Loaded XML object tree, or None on invalid source

Source code in sandhill/utils/xml.py
@catch(etree.XMLSyntaxError, "Invalid XML source: {source} Exc: {exc}", return_val=None)
@catch(RequestException, "XML API call failed: {source} Exc: {exc}", return_val=None)
@catch(RequestsConnectionError, "Invalid host in XML call: {source} Exc: {exc}", return_val=None)
def load(source) -> etree._Element: # pylint: disable=protected-access
    '''
    Load an XML document. \n
    Args:
        source: XML source. Either path, url, string, or loaded LXML Element \n
    Returns:
        Loaded XML object tree, or None on invalid source \n
    '''
    if not isinstance(source, (str, bytes)) or len(source) < 1:
        # pylint: disable=protected-access
        return source if isinstance(source, etree._ElementTree) else None

    source = source.strip()
    if source[0] == ord('<'):           # Handle source as bytes
        source = io.BytesIO(source)
    elif source[0] == '<':              # Handle source as string
        source = io.StringIO(source)
    elif checkers.is_file(source):      # Handle source as local file
        pass  # etree.parse handles local file paths natively
    elif checkers.is_url(source):       # Handle source as URL
        response = requests.get(source, timeout=10)
        if not response:
            app.logger.warning(f"Failed to retrieve XML URL (or timed out): {source}")
            return None
        source = io.BytesIO(response.content)
    else:
        app.logger.warning(f"XML source is not valid file, URL, or XML string. {source[:40]}"
                           + (len(source) > 40) * '...')
        return None

    return etree.parse(source)
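
The order in which `load` inspects its input determines how a given source is interpreted. The sketch below mirrors that order using only the standard library; `classify_source` is a hypothetical helper, and `os.path.isfile` and `urllib.parse.urlparse` are assumed stand-ins for the `checkers.is_file` and `checkers.is_url` calls in the listing above, so edge cases may differ from the real function.

```python
import os
from urllib.parse import urlparse


def classify_source(source):
    """Hypothetical helper: name the branch that load() would likely take."""
    if not isinstance(source, (str, bytes)) or len(source) < 1:
        return "not-text"      # load() returns an already parsed tree or None
    source = source.strip()
    if len(source) < 1:
        return "invalid"       # sketch choice: whitespace-only input is invalid
    if isinstance(source, bytes):
        return "xml-bytes" if source[:1] == b"<" else "invalid"
    if source[0] == "<":
        return "xml-string"    # inline XML is checked before paths and URLs
    if os.path.isfile(source):
        return "file"
    if urlparse(source).scheme in ("http", "https"):
        return "url"
    return "invalid"


kinds = [
    classify_source(b"<a/>"),
    classify_source("  <a/>"),
    classify_source("https://example.org/feed.xml"),
    classify_source("   "),
    classify_source(None),
]
```

Checking for a leading `<` first means inline XML is never mistaken for a path or URL, and anything that matches no branch is rejected rather than passed to the parser.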

xpath(source, query) -> list

Retrieve the matching xpath content from an XML source

Parameters:

Name Type Description Default
query str

XPath query to match against

required
source

XML source. Either path, url, or string

required

Returns:

Type Description
list

Matching results from XPath query, or None on failure

Source code in sandhill/utils/xml.py
@catch(etree.XPathEvalError, "Invalid XPath query {query} Exc {exc}", return_val=None)
def xpath(source, query) -> list:
    '''
    Retrieve the matching xpath content from an XML source \n
    Args:
        query (str): XPath query to match against \n
        source: XML source. Either path, url, or string \n
    Returns:
        Matching results from XPath query, or None on failure \n
    '''
    doc = load(source)
    return doc.xpath(query, namespaces=doc.getroot().nsmap) if doc else None
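
Callers need to tell a failed load (`None`) apart from a query that simply matched nothing (an empty list). The sketch below illustrates that distinction with the standard library's `xml.etree.ElementTree` as a stand-in for lxml. `find_texts` is a hypothetical function, ElementTree supports only a subset of XPath, and the namespace map is passed explicitly here instead of being read from the root element's `nsmap`.

```python
import xml.etree.ElementTree as ET

DC = {"dc": "http://purl.org/dc/elements/1.1/"}
XML = (
    '<lib xmlns:dc="http://purl.org/dc/elements/1.1/">'
    "<dc:title>One</dc:title><dc:title>Two</dc:title>"
    "</lib>"
)


def find_texts(xml_string, path, namespaces):
    """Hypothetical stand-in: None on a parse failure, a list otherwise."""
    try:
        root = ET.fromstring(xml_string)
    except ET.ParseError:
        return None  # mirrors xpath() returning None when load() fails
    return [elem.text for elem in root.findall(path, namespaces)]


titles = find_texts(XML, "dc:title", DC)      # two matches
creators = find_texts(XML, "dc:creator", DC)  # valid query, no matches
broken = find_texts("<lib", "dc:title", DC)   # malformed XML
```

Testing the result with `is None` rather than plain truthiness keeps an empty match list from being treated as an error.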

xpath_by_id(source, query) -> dict

For each element matching the XPath query, build a dict keyed by the element's id attribute, with the element's content as the value. Elements without an id attribute are not returned.

Parameters:

Name Type Description Default
query str

XPath query to match against

required
source

XML source. Either path, url, or string

required

Returns:

Type Description
dict | None

Dict mapping with keys of id, and values of content within matching elements, or None on failure

Source code in sandhill/utils/xml.py
def xpath_by_id(source, query) -> dict:
    '''
    For the matching xpath content, organize into dict with key \
    being the id param of the matched tags. Elements without an id attribute \
    will not be returned. \n
    Args:
        query (str): XPath query to match against \n
        source: XML source. Either path, url, or string \n
    Returns:
        (dict|None): Dict mapping with keys of id, and values of content within \
                     matching elements, or None on failure \n
    '''
    matched = xpath(source, query)
    if matched is None: # Explicit check to avoid empty results being matched
        return None

    idmap = {}
    for match in matched:
        if 'id' in match.keys():
            text = match.text if match.text is not None else ''
            for elem in match.iterchildren():
                text += etree.tostring(elem, encoding='unicode')
            text += match.tail if match.tail is not None else ''
            idmap[match.get('id')] = text
    return idmap
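
To make the shape of the returned mapping concrete, the sketch below repeats the same loop against the standard library's `xml.etree.ElementTree` instead of lxml. `by_id` is a hypothetical name and the sample document is invented for illustration; the real function also accepts paths and URLs through `load`.

```python
import xml.etree.ElementTree as ET


def by_id(xml_string, path):
    """Hypothetical stand-in for xpath_by_id using the standard library."""
    root = ET.fromstring(xml_string)
    idmap = {}
    for match in root.findall(path):
        if "id" in match.keys():
            text = match.text or ""
            for child in match:
                # Serialized child markup, including the child's trailing text
                text += ET.tostring(child, encoding="unicode")
            text += match.tail or ""
            idmap[match.get("id")] = text
    return idmap


doc = '<root><p id="a">Hello <b>world</b>!</p><p>skipped</p><p id="b">Bye</p></root>'
result = by_id(doc, ".//p")
# result == {'a': 'Hello <b>world</b>!', 'b': 'Bye'}
```

The value keeps inline markup from child elements as serialized XML, and the second `<p>` is dropped because it has no `id` attribute.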

Bootstrap

This is the code that starts up Sandhill, initializing the application. Have a look at the bootstrap documentation for more details.

bootstrap

The core of the bootstrap module handles, among other things, loading other Python code.

bootstrap.request

Standard changes Sandhill makes to the default Flask request object.

Specifically, it:

  • Adds query_args, a normal Python dictionary with args as keys.

bootstrap.g

Standard changes Sandhill makes to the default Flask g object.

Specifically, it:

  • Makes instance_path available on the g object.

bootstrap.debugtoolbar

Bootstrap hook to add FlaskDebugToolbar to the Flask application when debug mode is enabled.

bootstrap.disable_debug_caching

sandhill.bootstrap.disable_debug_caching.disable_browser_cache(response)

Adds headers to disable browser caching when app is in debug mode.

Source code in sandhill/bootstrap/disable_debug_caching.py
@app.after_request
def disable_browser_cache(response):
    """Adds headers to disable browser caching when app is in debug mode."""
    if app.debug:
        response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
        response.headers["Pragma"] = "no-cache"
        response.headers["Expires"] = 0
    return response