getVideo/webview/wsgi.py


Home Back

"""
The bundled WSGI apps.

* Routing: Uses URL prefixes to route to applications
* StaticFiles: Serves a directory
* StaticResources: Serves the resources from a python package
"""

import email.utils  # For datetime formatting
import functools
from http import HTTPStatus
import logging
import mimetypes
import os
import posixpath
import traceback
import wsgiref.simple_server
import wsgiref.util

try:
    # Python 3.7+
    import importlib.resources as importlib_resources
except ImportError as e :
    # Python 3.6
    import importlib_resources

from .util import abspath


__all__ = ('StaticFiles', 'StaticResources', 'Routing')

logger = logging.getLogger(__name__)
CHUNK_SIZE = 4 * 1024  # 4k


# Follow Django in treating URLs as UTF-8 encoded (which requires undoing the
# implicit ISO-8859-1 decoding applied in Python 3). Strictly speaking, URLs
# should only be ASCII anyway, but UTF-8 can be found in the wild.
def decode_path_info(path_info):
    return path_info.encode("iso-8859-1", "replace").decode("utf-8", "replace")


def send_simple_text(environ, start_response, status, body):
    """
    Send a simple message as plain text
    """
    if isinstance(status, int):
        status = "{} {}".format(int(status), status.phrase)

    if isinstance(body, str):
        body = body.encode('utf-8')

    response_headers = [
        ('Content-Type', 'text/plain'),
        ('Content-Length', str(len(body)))
    ]

    start_response(status, response_headers)
    return [body]


def do_403(environ, start_response):
    """
    Generic app to produce a 403
    """
    urlpath = environ['SCRIPT_NAME'] + environ['PATH_INFO']

    return send_simple_text(
        environ, start_response, HTTPStatus.FORBIDDEN, "Path {} is not allowed.".format(urlpath),
    )


def do_404(environ, start_response):
    """
    Generic app to produce a 404
    """
    urlpath = environ['SCRIPT_NAME'] + environ['PATH_INFO']

    return send_simple_text(
        environ, start_response, HTTPStatus.NOT_FOUND, "Path {} was not found".format(urlpath),
    )


def do_405(environ, start_response):
    """
    Generic app to produce a 405
    """
    urlpath = environ['SCRIPT_NAME'] + environ['PATH_INFO']

    return send_simple_text(
        environ, start_response, HTTPStatus.METHOD_NOT_ALLOWED,
        "Method {} is not allowed on {}".format(
            environ['REQUEST_METHOD'], urlpath,
        ),
    )


def do_options(environ, start_response):
    """
    Generic app to produce a response to OPTIONS
    """
    start_response("204 No Content", [
        ('Allow', 'OPTIONS, GET, HEAD'),
    ])
    return []


def wsgi_catch_errors(func):
    @functools.wraps(func)
    def handler(*p):
        try:
            return func(*p)
        except BaseException:
            start_response = p[-1]
            start_response("500 Server Error", [
                ('Content-Type', 'text/plain'),
            ])
            return [traceback.format_exc().encode('utf-8')]

    return handler


class Routing(dict):
    """
    Implements a basic URL routing system.

    Path prefixes are compared to the request path. The longest prefix wins.

    Example:
        Routing({
            '/': app,
            '/static': Static('mystatic'),
        })
    """

    def no_route_found(self, environ, start_response):
        """
        Handle if there was no matching route
        """
        return do_404(environ, start_response)

    @wsgi_catch_errors
    def __call__(self, environ, start_response):
        # SCRIPT_NAME + PATH_INFO = full url
        urlpath = environ['SCRIPT_NAME'] + environ['PATH_INFO']
        if not urlpath:
            urlpath = '/'

        potentials = [
            prefix
            for prefix in self.keys()
            if posixpath.commonpath([prefix, urlpath]) == prefix
        ]
        try:
            match = max(potentials, key=len)
        except ValueError:
            # max() got an empty list, aka no matches found
            return self.no_route_found(environ, start_response)

        logger.debug("For %r found %r routes, selected %r", urlpath, potentials, match)

        app = self[match]
        environ['SCRIPT_NAME'] = urlpath[:len(match)]
        environ['PATH_INFO'] = urlpath[len(match):]

        return app(environ, start_response)


class StaticContentsApp:
    """
    Base class for static serving implementatins
    """
    max_age = 60  # 1min, takes the edge off any frequent responses while staying fresh

    def method_not_allowed(self, environ, start_response):
        """
        Handle if we got something besides GET or HEAD
        """
        return do_405(environ, start_response)

    def file_not_found(self, environ, start_response):
        """
        Handle if the file cannot be found
        """
        return do_404(environ, start_response)

    def is_a_directory(self, environ, start_response):
        """
        Handle if we were given a directory
        """
        return do_404(environ, start_response)

    def no_permissions(self, environ, start_response):
        """
        Handle if we can't open the file
        """
        return do_403(environ, start_response)

    def open(path):
        """
        Return a file-like object in 'rb' mode.

        The path given is normalized.

        Add a .name attribute to the file if applicable

        Raise a FileNotFoundError, IsADirectoryError, or a PermissionError in
        case of error.
        """
        raise NotImplementedError

    @wsgi_catch_errors
    def __call__(self, environ, start_response):
        if environ['REQUEST_METHOD'] == 'OPTIONS':
            return do_options(environ, start_response)
        elif environ['REQUEST_METHOD'] not in ('GET', 'HEAD'):
            return self.method_not_allowed(environ, start_response)

        path = posixpath.normpath(environ['PATH_INFO'] or '/')
        path_options = [path]

        if path.endswith('/'):
            path_options.append(path[:-1])

        path_options.append(posixpath.join(path, 'index.html'))

        responder = None
        for option in path_options:
            try:
                file = self.open(option)
            except FileNotFoundError:
                logger.debug("file not found: %s", option)
                if responder is None:
                    responder = self.file_not_found
            except (IsADirectoryError, OSError): # OSError on Windows
                logger.debug("is a directory: %s", option)
                if responder is None:
                    responder = self.is_a_directory
            except PermissionError:
                logger.debug("permission error: %s", option)
                if responder is None:
                    responder = self.no_permissions
            except NotADirectoryError:
                logger.debug("not a directory: %s", option)
                # This can happen if we get a file with a trailing slash
                # This should only happen with the first option, and should be
                # covered by the next option
                pass
            else:
                break
        else:
            assert responder
            return responder(environ, start_response)

        if hasattr(file, 'name'):
            filename = file.name
        else:
            filename = path

        mime, _ = mimetypes.guess_type(filename, strict=False)

        # NOTE: We're not doing cache control checking, because we don't
        # consistently have stat() available.

        # TODO: Type negotiation

        if 'HTTP_RANGE' in environ:
            return self._serve_partial_file(environ, start_response, file, filename, mime)
        else:
            return self._serve_whole_file(environ, start_response, file, filename, mime)

    def _default_headers(self, mime, file):
        rv = wsgiref.headers.Headers([
            ('Content-Type', mime or 'application/octect-stream'),
            ('Accept-Ranges', 'bytes'),
            ('Cache-Control', 'max-age={}'.format(self.max_age))
        ])

        if hasattr(file, 'fileno'):
            try:
                stat = os.fstat(file.fileno())
            except OSError:
                pass
            else:
                rv['Content-Length'] = str(stat.st_size)
                # rv['Last-Modified'] = email.utils.formatdate(stat.st_mtime, usegmt=True)

        return rv

    def _serve_whole_file(self, environ, start_response, file, filename, mime):
        response_headers = self._default_headers(mime, file)

        start_response('200 OK', response_headers._headers)

        if environ['REQUEST_METHOD'] == 'HEAD':
            file.close()
            return []
        else:
            wrapper = environ.get('wsgi.file_wrapper', wsgiref.util.FileWrapper)
            return wrapper(file, CHUNK_SIZE)

    def _parse_range(self, header, length):
        logger.debug("Got range header %r (length=%s)", header, length)
        unit, _, ranges = header.partition('=')
        if unit != 'bytes':
            raise ValueError("Range not satisfiable: {}".format(header))

        ranges = [bit.strip().split('-') for bit in ranges.split(',')]
        start, end = ranges[0]
        start = int(start) if start else 0
        end = int(end) if end else None

        if length is not None:
            if end is None:
                end = length - 1
        return start, end

    def _compose_content_range(self, start, end, total):
        rv = 'bytes '
        if start is not None:
            rv += str(start)
        rv += '-'
        if end is not None:
            rv += str(end)
        rv += '/'
        if total is not None:
            rv += str(total)
        else:
            rv += '*'
        return rv

    def _serve_partial_file(self, environ, start_response, file, filename, mime):
        response_headers = self._default_headers(mime, file)
        length = response_headers['Content-Length']
        if length:
            length = int(length)
        else:
            length = None
        start, end = self._parse_range(environ['HTTP_RANGE'], length)

        if length is not None:
            # Check ranges
            maxindex = length - 1
            if start > maxindex or end > maxindex:
                start_response('416 Range Not Satisfiable', [
                    ('Content-Range', 'bytes */{}'.format(length))
                ])
                return []

        assert start <= end
        assert length is None or end < length

        logger.debug("Serving %s (%s to %s of %s)", filename, start, end, length)

        response_headers['Content-Range'] = self._compose_content_range(start, end, length)
        if end is None:
            amount = None
            del response_headers['Content-Length']
        else:
            amount = end - start + 1
            response_headers['Content-Length'] = str(amount)

        start_response('206 Partial Content', response_headers._headers)

        if environ['REQUEST_METHOD'] == 'HEAD':
            file.close()
            return []
        else:
            return self._partial_file_wrapper(file, start, amount)

    def _partial_file_wrapper(self, file, skip, amount):
        served = 0

        if skip:
            file.seek(skip)

        while (amount is None) or (served <= amount):
            data = file.read(min(CHUNK_SIZE, amount - served))
            if not data:
                break
            served += len(data)
            yield data

        logging.debug("Served %s of %s", served, amount)


class StaticFiles(StaticContentsApp):
    """
    Serves static files from a directory on the file system.
    """
    def __init__(self, root):
        self.root = abspath(root)

    def open(self, file):
        if file:
            path = os.path.join(self.root, file.lstrip('/'))
        else:
            path = self.root
        logger.debug('Resolved %s to %s' % (file, path))
        return open(path, 'rb')


class StaticResources(StaticContentsApp):
    """
    Serves static files from resources in python packages
    """
    def __init__(self, root):
        self.root = root

    def open(self, file):
        slashed, basename = posixpath.split(file)
        slashed = slashed.rstrip('/')
        if slashed:
            packagename = "{}.{}".format(self.root, slashed.replace('/', '.'))
        else:
            packagename = self.root
        try:
            return importlib_resources.open_binary(packagename, basename)
        except ModuleNotFoundError:
            raise FileNotFoundError

Powered by Code, a simple repository browser by Fabio Di Matteo