Source code for crossbar.bridge.rest.callee
#####################################################################################
#
# Copyright (c) typedef int GmbH
# SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################
from urllib.parse import urljoin
from autobahn.twisted.wamp import ApplicationSession
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.web.http_headers import Headers
[docs]
class RESTCallee(ApplicationSession):
def __init__(self, *args, **kwargs):
[docs]
self._webtransport = kwargs.pop("webTransport", None)
if not self._webtransport:
import treq
self._webtransport = treq
super(RESTCallee, self).__init__(*args, **kwargs)
@inlineCallbacks
[docs]
def onJoin(self, details):
assert "baseurl" in self.config.extra
assert "procedure" in self.config.extra
baseURL = self.config.extra["baseurl"]
procedure = self.config.extra["procedure"]
@inlineCallbacks
def on_call(method=None, url=None, body="", headers={}, params={}):
newURL = urljoin(baseURL, url)
params = {x.encode("utf8"): y.encode("utf8") for x, y in params.items()}
res = yield self._webtransport.request(
method, newURL, data=body.encode("utf8"), headers=Headers(headers), params=params
)
content = yield self._webtransport.text_content(res)
headers = {
x.decode("utf8"): [z.decode("utf8") for z in y]
for x, y in dict(res.headers.getAllRawHeaders()).items()
}
resp = {"code": res.code, "content": content, "headers": headers}
returnValue(resp)
yield self.register(on_call, procedure)