import copy import logging from botocore.exceptions import EndpointProviderError from botocore.regions import EndpointRulesetResolver LOG = logging.getLogger(__name__) class AioEndpointRulesetResolver(EndpointRulesetResolver): async def construct_endpoint( self, operation_model, call_args, request_context, ): """Invokes the provider with params defined in the service's ruleset""" if call_args is None: call_args = {} if request_context is None: request_context = {} provider_params = await self._get_provider_params( operation_model, call_args, request_context ) LOG.debug( 'Calling endpoint provider with parameters: %s' % provider_params ) try: provider_result = self._provider.resolve_endpoint( **provider_params ) except EndpointProviderError as ex: botocore_exception = self.ruleset_error_to_botocore_exception( ex, provider_params ) if botocore_exception is None: raise else: raise botocore_exception from ex LOG.debug('Endpoint provider result: %s' % provider_result.url) # The endpoint provider does not support non-secure transport. if not self._use_ssl and provider_result.url.startswith('https://'): provider_result = provider_result._replace( url=f'http://{provider_result.url[8:]}' ) # Multi-valued headers are not supported in botocore. Replace the list # of values returned for each header with just its first entry, # dropping any additionally entries. provider_result = provider_result._replace( headers={ key: val[0] for key, val in provider_result.headers.items() } ) return provider_result async def _get_provider_params( self, operation_model, call_args, request_context ): """Resolve a value for each parameter defined in the service's ruleset The resolution order for parameter values is: 1. Operation-specific static context values from the service definition 2. Operation-specific dynamic context values from API parameters 3. Client-specific context parameters 4. Built-in values such as region, FIPS usage, ... """ provider_params = {} # Builtin values can be customized for each operation by hooks # subscribing to the ``before-endpoint-resolution.*`` event. customized_builtins = await self._get_customized_builtins( operation_model, call_args, request_context ) for param_name, param_def in self._param_definitions.items(): param_val = self._resolve_param_from_context( param_name=param_name, operation_model=operation_model, call_args=call_args, ) if param_val is None and param_def.builtin is not None: param_val = self._resolve_param_as_builtin( builtin_name=param_def.builtin, builtins=customized_builtins, ) if param_val is not None: provider_params[param_name] = param_val return provider_params async def _get_customized_builtins( self, operation_model, call_args, request_context ): service_id = self._service_model.service_id.hyphenize() customized_builtins = copy.copy(self._builtins) # Handlers are expected to modify the builtins dict in place. await self._event_emitter.emit( 'before-endpoint-resolution.%s' % service_id, builtins=customized_builtins, model=operation_model, params=call_args, context=request_context, ) return customized_builtins