Files
challenge/challenge.py
2023-04-19 01:10:19 +02:00

70 lines
2.1 KiB
Python

""" Mini-Challenge"""
from os.path import join
from xml.etree import ElementTree as ET
from twisted.internet import defer
class Adapter:
    """Read a token from an XML file and return it as a JSON-like string.

    Task: read XML data asynchronously, convert it and return it.

    The adapter is bound to a directory; every lookup resolves
    ``<name>.xml`` inside that directory.
    """

    def __init__(self, path):
        """Remember *path*, the directory that holds the XML files."""
        self.path = path

    def _read(self, name):
        """Return the full text of ``<name>.xml`` from the adapter directory."""
        with open(join(self.path, name + ".xml")) as f:
            return f.read()

    def _parse(self, xml_data):
        """Parse *xml_data* and return the text of the root's ``Token`` child.

        The ``{*}`` prefix makes the lookup ignore the element's namespace.
        A document without a ``Token`` child makes ``find`` return ``None``,
        so the attribute access raises ``AttributeError``.
        """
        root = ET.fromstring(xml_data)
        token = root.find("{*}Token").text
        return token

    def _convert(self, token):
        """Wrap *token* in the stringified object ``{"token": <token>}``.

        NOTE(review): the token is interpolated verbatim (no quoting or
        escaping), so the result is only valid JSON when the token text is
        itself a JSON literal such as a number.
        """
        return '{"token": %s}' % token

    def retrieve_token(self, name):
        """Synchronously read ``<name>.xml`` and return its token as a string.

        The file is read from the adapter's directory, the
        DataReceived/Token value is extracted and returned as a stringified
        JSON object ``'{"token": value}'``.

        Raises:
            ValueError: if reading, parsing or converting fails. The
                original exception is attached as ``__cause__``.
        """
        try:
            xml_data = self._read(name)
            token = self._parse(xml_data)
            return self._convert(token)
        except Exception as e:
            # Chain explicitly so the root cause stays visible in tracebacks.
            raise ValueError("Could not retrieve token: %s" % e) from e

    def retrieve_token_async(self, name):
        """Return a Deferred that fires with the token of ``<name>.xml``.

        The work is expressed as a callback chain read -> parse -> convert.
        On success the Deferred fires with the same string that
        ``retrieve_token`` returns; if any step raises, the errback chain
        receives a Failure wrapping that exception (it is not converted to
        ``ValueError`` here).

        see: https://twistedmatrix.com/documents/8.0.0/api/twisted.internet.defer.Deferred.html
        """
        # maybeDeferred turns both the return value and any exception of
        # _read into a Deferred, so no manual relay Deferred is needed.
        d = defer.maybeDeferred(self._read, name)
        d.addCallback(self._parse)
        d.addCallback(self._convert)
        return d