Source code for crossbar.webservice.longpoll

#####################################################################################
#
#  Copyright (c) typedef int GmbH
#  SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################

import os

from txaio import make_logger

import crossbar
from crossbar.router import longpoll
from crossbar.webservice.base import RouterWebService


[docs] class WampLongPollResourceSession(longpoll.WampLongPollResourceSession): """ """ def __init__(self, parent, transport_details): longpoll.WampLongPollResourceSession.__init__(self, parent, transport_details)
[docs] self._transport_info = { "type": "longpoll", "protocol": transport_details["protocol"], "peer": transport_details["peer"], "http_headers_received": transport_details["http_headers_received"], "http_headers_sent": transport_details["http_headers_sent"], }
[docs] self._cbtid = None
[docs] class WampLongPollResource(longpoll.WampLongPollResource): """ """
[docs] protocol = WampLongPollResourceSession
[docs] log = make_logger()
[docs] def getNotice(self, peer, redirectUrl=None, redirectAfter=0): try: page = self._templates.get_template("cb_lp_notice.html") content = page.render( redirectUrl=redirectUrl, redirectAfter=redirectAfter, cbVersion=crossbar.__version__, peer=peer, workerPid=os.getpid(), ) content = content.encode("utf8") return content except Exception: self.log.failure("Error rendering LongPoll notice page template: {log_failure.value}")
[docs] class RouterWebServiceLongPoll(RouterWebService): """ HTTP-Long-Polling based WAMP transport wrapped as a Web service. """ @staticmethod
[docs] def create(transport, path, config): personality = transport.worker.personality personality.WEB_SERVICE_CHECKERS["longpoll"](personality, config) options = config.get("options", {}) resource = WampLongPollResource( transport._worker._router_session_factory, timeout=options.get("request_timeout", 10), killAfter=options.get("session_timeout", 30), queueLimitBytes=options.get("queue_limit_bytes", 128 * 1024), queueLimitMessages=options.get("queue_limit_messages", 100), debug_transport_id=options.get("debug_transport_id", None), ) resource._templates = transport.templates return RouterWebServiceLongPoll(transport, path, config, resource)