Source code for spacetrack.aio

import asyncio
import codecs
import ssl
import time
from collections.abc import Mapping

import aiohttp
import aiohttp.web_exceptions
import requests.certs
from aiohttp.helpers import parse_mimetype

from .base import RATELIMIT_KEY, AuthenticationError, SpaceTrackClient, logger
from .operators import _stringify_predicate_value


class AsyncSpaceTrackClient(SpaceTrackClient):
    """Asynchronous SpaceTrack client class.

    This class should be considered experimental.

    It must be closed by calling
    :meth:`~spacetrack.aio.AsyncSpaceTrackClient.close`. Alternatively,
    instances of this class can be used as an asynchronous context manager
    (``async with``); a plain ``with`` raises :class:`TypeError`.

    Parameters:
        identity: Space-Track username.
        password: Space-Track password.
        base_url: May be overridden to use e.g.
            https://testing.space-track.org/

    For more information, refer to the `Space-Track documentation`_.

    .. _`Space-Track documentation`: https://www.space-track.org/documentation
        #api-requestClasses

    .. attribute:: session

        :class:`aiohttp.ClientSession` instance.
    """

    @staticmethod
    def _create_session():
        """Create the :class:`aiohttp.ClientSession` used for requests."""
        # Use requests/certifi CA file
        ctx = ssl.create_default_context(cafile=requests.certs.where())
        connector = aiohttp.TCPConnector(ssl=ctx)
        return aiohttp.ClientSession(connector=connector)

    async def _ratelimit_callback(self, until):
        """Log the upcoming rate limit sleep and await the user callback.

        Parameters:
            until: :func:`time.monotonic` timestamp at which the sleep ends,
                as computed by :meth:`_ratelimit_wait`.
        """
        # ``until`` is produced from time.monotonic(), so the remaining
        # duration has to be measured against the same clock.
        duration = int(round(until - time.monotonic()))
        logger.info('Rate limit reached. Sleeping for {:d} seconds.', duration)

        if self.callback is not None:
            await self.callback(until)

    async def authenticate(self):
        """Authenticate with Space-Track.

        Does nothing if this client is already marked as authenticated.

        Raises:
            spacetrack.base.AuthenticationError: The login response was a
                JSON object containing ``{'Login': 'Failed'}``.
            aiohttp.ClientResponseError: The login request returned an HTTP
                status of 400 or above.
        """
        if self._authenticated:
            return

        login_url = self.base_url + 'ajaxauth/login'
        data = {'identity': self.identity, 'password': self.password}
        resp = await self.session.post(login_url, data=data)

        await _raise_for_status(resp)

        # If login failed, we get a JSON response with {'Login': 'Failed'}
        resp_data = await resp.json()
        if isinstance(resp_data, Mapping):
            if resp_data.get('Login', None) == 'Failed':
                raise AuthenticationError()

        self._authenticated = True

    async def generic_request(self, class_, iter_lines=False,
                              iter_content=False, controller=None,
                              parse_types=False, **kwargs):
        """Generic Space-Track query coroutine.

        The request class methods use this method internally; the public
        API is as follows:

        .. code-block:: python

            st.tle_publish(*args, **kwargs)
            st.basicspacedata.tle_publish(*args, **kwargs)
            st.file(*args, **kwargs)
            st.fileshare.file(*args, **kwargs)
            st.spephemeris.file(*args, **kwargs)

        They resolve to the following calls respectively:

        .. code-block:: python

            st.generic_request('tle_publish', *args, **kwargs)
            st.generic_request('tle_publish', *args,
                               controller='basicspacedata', **kwargs)
            st.generic_request('file', *args, **kwargs)
            st.generic_request('file', *args, controller='fileshare',
                               **kwargs)
            st.generic_request('file', *args, controller='spephemeris',
                               **kwargs)

        Parameters:
            class_: Space-Track request class name
            iter_lines: Yield result line by line
            iter_content: Yield result in 100 KiB chunks.
            controller: Optionally specify request controller to use.
            parse_types: Parse string values in response according to type
                given in predicate information, e.g. ``'2017-01-01'`` ->
                ``datetime.date(2017, 1, 1)``.
            **kwargs: These keywords must match the predicate fields on
                Space-Track. You may check valid keywords with the following
                snippet:

                .. code-block:: python

                    spacetrack = AsyncSpaceTrackClient(...)
                    await spacetrack.tle.get_predicates()
                    # or
                    await spacetrack.get_predicates('tle')

                See :func:`~spacetrack.operators._stringify_predicate_value`
                for which Python objects are converted appropriately.

        Yields:
            Lines—stripped of newline characters—if ``iter_lines=True``

        Yields:
            100 KiB chunks if ``iter_content=True``

        Returns:
            Parsed JSON object, unless ``format`` keyword argument is passed.

            .. warning::

                Passing ``format='json'`` will return the JSON **unparsed**.
                Do not set ``format`` if you want the parsed JSON object
                returned!

        Raises:
            ValueError: Conflicting options were given, or the controller or
                request class is unknown.
            TypeError: A keyword argument is not a valid predicate for the
                request class.
            aiohttp.ClientResponseError: Space-Track answered with an HTTP
                status of 400 or above.
        """
        if iter_lines and iter_content:
            raise ValueError('iter_lines and iter_content cannot both be True')

        if 'format' in kwargs and parse_types:
            raise ValueError('parse_types can only be used if format is unset.')

        if controller is None:
            controller = self._find_controller(class_)
        else:
            classes = self.request_controllers.get(controller, None)
            if classes is None:
                raise ValueError(
                    f'Unknown request controller {controller!r}')
            if class_ not in classes:
                raise ValueError(
                    f'Unknown request class {class_!r} '
                    f'for controller {controller!r}')

        # Decode unicode unless class == download, including conversion of
        # CRLF newlines to LF.
        decode = (class_ != 'download')
        if not decode and iter_lines:
            error = (
                'iter_lines disabled for binary data, since CRLF newlines '
                'split over chunk boundaries would yield extra blank lines. '
                'Use iter_content=True instead.')
            raise ValueError(error)

        await self.authenticate()

        url = f'{self.base_url}{controller}/query/class/{class_}'

        # REST predicates are always accepted; the class-specific ones come
        # either from the offline table or from Space-Track's modeldef.
        valid_fields = {p.name for p in self.rest_predicates}
        predicates = None

        if (class_, controller) in self.offline_predicates:
            valid_fields |= self.offline_predicates[(class_, controller)]
        else:
            # Pass the resolved controller so that a class available under
            # several controllers is looked up against the requested one.
            predicates = await self.get_predicates(class_, controller)
            valid_fields |= {p.name for p in predicates}

        for key, value in kwargs.items():
            if key not in valid_fields:
                raise TypeError(
                    f"'{class_}' got an unexpected argument '{key}'")

            value = _stringify_predicate_value(value)

            url += f'/{key}/{value}'

        logger.debug(url)

        resp = await self._ratelimited_get(url)

        await _raise_for_status(resp)

        if iter_lines:
            return _iter_lines_generator(resp, decode_unicode=decode)

        if iter_content:
            return _iter_content_generator(resp, decode_unicode=decode)

        # If format is specified, return that format unparsed. Otherwise,
        # parse the default JSON response.
        if 'format' in kwargs:
            if not decode:
                return await resp.read()

            # Replace CRLF newlines with LF, Python will handle platform
            # specific newlines if written to file. Only CRLF pairs are
            # rewritten so that a lone CR in the payload is not dropped.
            data = await resp.text()
            return data.replace('\r\n', '\n')

        data = await resp.json()

        if predicates is None or not parse_types:
            return data

        return self._parse_types(data, predicates)

    async def _ratelimited_get(self, *args, **kwargs):
        """Perform a rate limited GET request on :attr:`session`.

        Sleeps first if the per-minute or per-hour throttle reports that the
        limit is reached. All arguments are forwarded to ``session.get``.

        Returns:
            The response of the (possibly retried) GET request.
        """
        minute_limit = self._per_minute_throttle.check(RATELIMIT_KEY, 1)
        hour_limit = self._per_hour_throttle.check(RATELIMIT_KEY, 1)

        sleep_time = 0

        if minute_limit.limited:
            sleep_time = minute_limit.retry_after.total_seconds()

        if hour_limit.limited:
            sleep_time = max(sleep_time, hour_limit.retry_after.total_seconds())

        if sleep_time > 0:
            await self._ratelimit_wait(sleep_time)

        resp = await self.session.get(*args, **kwargs)

        # It's possible that Space-Track will return HTTP status 500 with a
        # query rate limit violation. This can happen if a script is cancelled
        # before it has finished sleeping to satisfy the rate limit and it is
        # started again.
        #
        # Let's catch this specific instance and retry once if it happens.
        if resp.status == 500:
            text = await resp.text()

            # Let's only retry if the error page tells us it's a rate limit
            # violation.
            if 'violated your query rate limit' in text:
                # It seems that only the per-minute rate limit causes an HTTP
                # 500 error. Breaking the per-hour limit seems to result in an
                # email from Space-Track instead.
                await self._ratelimit_wait(60)
                resp = await self.session.get(*args, **kwargs)

        return resp

    async def _ratelimit_wait(self, duration):
        """Sleep for ``duration`` seconds, notifying the rate limit callback.

        Parameters:
            duration: Number of seconds to sleep.
        """
        until = time.monotonic() + duration
        # Scheduled as its own task so the callback runs alongside the sleep
        # instead of being awaited before it.
        asyncio.ensure_future(self._ratelimit_callback(until))
        await asyncio.sleep(duration)

    async def _download_predicate_data(self, class_, controller):
        """Download raw predicate information for the given request class.

        Parameters:
            class_: Space-Track request class name.
            controller: Request controller the class belongs to.

        Returns:
            The ``'data'`` member of the JSON ``modeldef`` response.
        """
        await self.authenticate()

        url = f'{self.base_url}{controller}/modeldef/class/{class_}'

        resp = await self._ratelimited_get(url)

        await _raise_for_status(resp)

        resp_json = await resp.json()
        return resp_json['data']

    async def get_predicates(self, class_, controller=None):
        """Get full predicate information for given request class, and cache
        for subsequent calls.

        Parameters:
            class_: Space-Track request class name.
            controller: Optionally specify request controller to use. It is
                only consulted when ``class_`` is not cached yet.

        Raises:
            ValueError: The controller is unknown, or ``class_`` is not one
                of its request classes.
        """
        # NOTE(review): the cache is keyed on ``class_`` alone, so a class
        # offered by several controllers shares a single entry.
        if class_ not in self._predicates:
            if controller is None:
                controller = self._find_controller(class_)
            else:
                classes = self.request_controllers.get(controller, None)
                if classes is None:
                    raise ValueError(
                        f'Unknown request controller {controller!r}')
                if class_ not in classes:
                    raise ValueError(
                        f'Unknown request class {class_!r}')

            predicates_data = await self._download_predicate_data(
                class_, controller)
            predicate_objects = self._parse_predicates_data(predicates_data)
            self._predicates[class_] = predicate_objects

        return self._predicates[class_]

    def __enter__(self):
        """Reject synchronous ``with``; this client needs ``async with``."""
        raise TypeError("Use async with instead")

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Do nothing; :meth:`__enter__` always raises."""
        pass

    async def __aenter__(self):
        """Return the client itself for use in an ``async with`` block."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the client when the ``async with`` block exits."""
        await self.close()

    async def close(self):
        """Close aiohttp session."""
        await self.session.close()
def get_encoding(response):
    """Return the charset named in the response's Content-Type header.

    Parameters:
        response: Object with a ``headers`` mapping, e.g.
            :class:`aiohttp.ClientResponse`.

    Returns:
        The ``charset`` parameter of the header, or ``'UTF-8'`` if the header
        has none.
    """
    ctype = response.headers.get('content-type', '').lower()
    mimetype = parse_mimetype(ctype)

    # Fallback to UTF-8
    return mimetype.parameters.get('charset', 'UTF-8')


async def _iter_content_generator(response, decode_unicode, *, encoding=None):
    """Iterate over the response body in chunks of up to 100 KiB.

    Parameters:
        response: Response whose ``content.iter_chunked`` is consumed.
        decode_unicode: If true, yield text with CRLF newlines replaced by
            LF; otherwise yield the raw bytes untouched.
        encoding: Optional charset overriding the one from
            :func:`get_encoding`. Ignored unless ``decode_unicode`` is true.

    Yields:
        ``bytes`` chunks, or non-empty ``str`` chunks if ``decode_unicode``
        is true.
    """
    chunks = response.content.iter_chunked(100 * 1024)

    if not decode_unicode:
        async for chunk in chunks:
            yield chunk
        return

    if encoding is None:
        encoding = get_encoding(response)

    # Decode incrementally: a multi-byte character may be split over two
    # chunks, which would make a per-chunk ``bytes.decode`` fail.
    decoder = codecs.getincrementaldecoder(encoding)()

    # A chunk could end in the '\r' of a '\r\n' pair whose '\n' only arrives
    # with the next chunk, so a trailing '\r' is held back until then.
    held_cr = ''

    async for chunk in chunks:
        text = held_cr + decoder.decode(chunk)
        held_cr = ''

        if text.endswith('\r'):
            text, held_cr = text[:-1], '\r'

        # Replace CRLF newlines with LF, Python will handle
        # platform specific newlines if written to file.
        text = text.replace('\r\n', '\n')

        if text:
            yield text

    # Flush the decoder (raises on a truncated character) and emit a final
    # '\r' that was never followed by '\n'.
    tail = held_cr + decoder.decode(b'', final=True)
    if tail:
        yield tail


async def _iter_lines_generator(response, decode_unicode, *, encoding=None):
    """Iterate over the response body line by line.

    Parameters:
        response: Response passed on to :func:`_iter_content_generator`.
        decode_unicode: Passed on to :func:`_iter_content_generator`.
        encoding: Passed on to :func:`_iter_content_generator`.

    Yields:
        Lines without their line terminators.
    """
    pending = None

    chunks = _iter_content_generator(
        response, decode_unicode, encoding=encoding)

    async for chunk in chunks:
        if pending is not None:
            chunk = pending + chunk

        lines = chunk.splitlines()

        # If the chunk does not end in a line terminator, its last line may
        # continue in the next chunk, so keep it back.
        if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
            pending = lines.pop()
        else:
            pending = None

        for line in lines:
            yield line

    if pending is not None:
        yield pending


async def _raise_for_status(response):
    """Raise an appropriate error for a given response.

    Arguments:
      response (:py:class:`aiohttp.ClientResponse`): The API response.

    Raises:
      :py:class:`aiohttp.ClientResponseError`: The appropriate error for
        the response's status.
    """
    if response.status < 400:
        return

    # ``reason`` may be unset; fall back to an empty string so the
    # Space-Track message can still be appended.
    reason = response.reason or ''

    spacetrack_error_msg = None

    try:
        payload = await response.json()
        if isinstance(payload, Mapping):
            spacetrack_error_msg = payload['error']
    except (ValueError, KeyError, aiohttp.ClientResponseError):
        # Best effort only: the body is not JSON or has no 'error' member.
        pass

    if not spacetrack_error_msg:
        spacetrack_error_msg = await response.text()

    if spacetrack_error_msg:
        # The JSON 'error' member is not guaranteed to be a string.
        reason += '\nSpace-Track response:\n' + str(spacetrack_error_msg)

    raise aiohttp.ClientResponseError(
        response.request_info,
        response.history,
        status=response.status,
        message=reason,
        headers=response.headers)