from botocore.handlers import check_for_200_error as boto_check_for_200_error from botocore.handlers import ( inject_presigned_url_ec2 as boto_inject_presigned_url_ec2, ) from botocore.handlers import ( inject_presigned_url_rds as boto_inject_presigned_url_rds, ) from botocore.handlers import ( parse_get_bucket_location as boto_parse_get_bucket_location, ) from botocore.hooks import HierarchicalEmitter, logger from botocore.signers import ( add_generate_db_auth_token as boto_add_generate_db_auth_token, ) from botocore.signers import ( add_generate_presigned_post as boto_add_generate_presigned_post, ) from botocore.signers import ( add_generate_presigned_url as boto_add_generate_presigned_url, ) from ._helpers import resolve_awaitable from .handlers import ( check_for_200_error, inject_presigned_url_ec2, inject_presigned_url_rds, parse_get_bucket_location, ) from .signers import ( add_generate_db_auth_token, add_generate_presigned_post, add_generate_presigned_url, ) _HANDLER_MAPPING = { boto_inject_presigned_url_ec2: inject_presigned_url_ec2, boto_inject_presigned_url_rds: inject_presigned_url_rds, boto_add_generate_presigned_url: add_generate_presigned_url, boto_add_generate_presigned_post: add_generate_presigned_post, boto_add_generate_db_auth_token: add_generate_db_auth_token, boto_parse_get_bucket_location: parse_get_bucket_location, boto_check_for_200_error: check_for_200_error, } class AioHierarchicalEmitter(HierarchicalEmitter): async def _emit(self, event_name, kwargs, stop_on_response=False): responses = [] # Invoke the event handlers from most specific # to least specific, each time stripping off a dot. handlers_to_call = self._lookup_cache.get(event_name) if handlers_to_call is None: handlers_to_call = self._handlers.prefix_search(event_name) self._lookup_cache[event_name] = handlers_to_call elif not handlers_to_call: # Short circuit and return an empty response is we have # no handlers to call. This is the common case where # for the majority of signals, nothing is listening. return [] kwargs['event_name'] = event_name responses = [] for handler in handlers_to_call: logger.debug('Event %s: calling handler %s', event_name, handler) # Await the handler if its a coroutine. response = await resolve_awaitable(handler(**kwargs)) responses.append((handler, response)) if stop_on_response and response is not None: return responses return responses async def emit_until_response(self, event_name, **kwargs): responses = await self._emit(event_name, kwargs, stop_on_response=True) if responses: return responses[-1] else: return None, None def _verify_and_register( self, event_name, handler, unique_id, register_method, unique_id_uses_count, ): handler = _HANDLER_MAPPING.get(handler, handler) self._verify_is_callable(handler) self._verify_accept_kwargs(handler) register_method(event_name, handler, unique_id, unique_id_uses_count)