import inspect

from botocore.discovery import (
    EndpointDiscoveryHandler,
    EndpointDiscoveryManager,
    EndpointDiscoveryRefreshFailed,
    HTTPClientError,
    logger,
)


class AioEndpointDiscoveryManager(EndpointDiscoveryManager):
    """Coroutine-based variant of ``EndpointDiscoveryManager``.

    Overrides the refresh and lookup entry points as ``async`` methods so
    that the describe-endpoints call may be awaited. Cache bookkeeping
    (``_cache``, ``_failed_attempts``) and the helpers used below
    (``_create_cache_key``, ``_parse_endpoints``, ``_select_endpoint``, ...)
    are inherited from the base class.
    """

    async def _refresh_current_endpoints(self, **kwargs):
        """Fetch endpoints for ``kwargs`` and store them in the cache.

        :param kwargs: Forwarded unchanged to ``_create_cache_key`` and
            ``_describe_endpoints``.
        :return: The parsed endpoints on success, or ``None`` when the
            request fails with ``ConnectionError`` or ``HTTPClientError``.
        """
        cache_key = self._create_cache_key(**kwargs)
        try:
            response = self._describe_endpoints(**kwargs)
            # The underlying call may or may not be a coroutine; only await
            # when it actually returned an awaitable.
            if inspect.isawaitable(response):
                response = await response
            endpoints = self._parse_endpoints(response)
            self._cache[cache_key] = endpoints
            # A successful refresh clears any recorded failure for this key.
            self._failed_attempts.pop(cache_key, None)
            return endpoints
        except (ConnectionError, HTTPClientError):
            # Record a back-off deadline for this key.
            # NOTE(review): 60 is presumably seconds -- depends on the unit
            # returned by self._time(); confirm against the base class.
            self._failed_attempts[cache_key] = self._time() + 60
            return None

    async def describe_endpoint(self, **kwargs):
        """Return an endpoint for the given operation, refreshing if needed.

        :param kwargs: Must contain ``'Operation'``; the full mapping is also
            used to build the cache key and is forwarded to the refresh call.
        :return: The endpoint chosen by ``_select_endpoint``, or ``None``
            when discovery is skipped or is optional and nothing is
            available.
        :raises EndpointDiscoveryRefreshFailed: When discovery is required
            for the operation and no current, refreshed or stale endpoint
            could be obtained.
        """
        operation = kwargs['Operation']
        discovery_required = self._model.discovery_required_for(operation)

        if not self._always_discover and not discovery_required:
            # Discovery set to only run on required operations
            logger.debug(
                'Optional discovery disabled. Skipping discovery for Operation: %s',
                operation,
            )
            return None

        # Get the endpoint for the provided operation and identifiers
        cache_key = self._create_cache_key(**kwargs)
        endpoints = self._get_current_endpoints(cache_key)
        if endpoints:
            return self._select_endpoint(endpoints)

        # All known endpoints are stale
        recently_failed = self._recently_failed(cache_key)
        if not recently_failed:
            # We haven't failed to discover recently, go ahead and refresh
            endpoints = await self._refresh_current_endpoints(**kwargs)
            if endpoints:
                return self._select_endpoint(endpoints)

        # Discovery has failed recently, do our best to get an endpoint
        logger.debug('Endpoint Discovery has failed for: %s', kwargs)
        stale_entries = self._cache.get(cache_key, None)
        if stale_entries:
            # We have stale entries, use those while discovery is failing
            return self._select_endpoint(stale_entries)

        if discovery_required:
            # It looks strange to be checking recently_failed again, but it
            # tells us whether or not we already tried to refresh above.
            if recently_failed:
                # The refresh above was skipped because of a recent failure;
                # discovery is required, so attempt it now regardless.
                endpoints = await self._refresh_current_endpoints(**kwargs)
                if endpoints:
                    return self._select_endpoint(endpoints)
            # No endpoints even after a refresh attempt, raise a hard error
            raise EndpointDiscoveryRefreshFailed()

        # Discovery is optional, just use the default endpoint for now
        return None


class AioEndpointDiscoveryHandler(EndpointDiscoveryHandler):
    """Coroutine-based variant of ``EndpointDiscoveryHandler``.

    Awaits the manager's ``describe_endpoint`` and writes the result onto
    the outgoing request.
    """

    async def discover_endpoint(self, request, operation_name, **kwargs):
        """Rewrite ``request.url`` with a discovered endpoint, if any.

        :param request: Object exposing a ``context`` mapping and a writable
            ``url`` attribute.
        :param operation_name: Passed to the manager as ``Operation``.
        :param kwargs: Accepted but not used by this method.
        :return: ``None``. The request is left untouched when no discovery
            identifiers are present in ``request.context`` or when the
            manager returns no endpoint.
        """
        ids = request.context.get('discovery', {}).get('identifiers')
        if ids is None:
            return
        endpoint = await self._manager.describe_endpoint(
            Operation=operation_name, Identifiers=ids
        )
        if endpoint is None:
            logger.debug('Failed to discover and inject endpoint')
            return
        # Endpoints without a scheme are assumed to be HTTPS.
        if not endpoint.startswith('http'):
            endpoint = 'https://' + endpoint
        logger.debug('Injecting discovered endpoint: %s', endpoint)
        request.url = endpoint