from botocore.retries.standard import ( DEFAULT_MAX_ATTEMPTS, ExponentialBackoff, MaxAttemptsChecker, ModeledRetryableChecker, OrRetryChecker, RetryEventAdapter, RetryHandler, RetryPolicy, RetryQuotaChecker, StandardRetryConditions, ThrottledRetryableChecker, TransientRetryableChecker, logger, quota, special, ) from .._helpers import async_any, resolve_awaitable from .special import AioRetryDDBChecksumError def register_retry_handler(client, max_attempts=DEFAULT_MAX_ATTEMPTS): retry_quota = RetryQuotaChecker(quota.RetryQuota()) service_id = client.meta.service_model.service_id service_event_name = service_id.hyphenize() client.meta.events.register( f'after-call.{service_event_name}', retry_quota.release_retry_quota ) handler = AioRetryHandler( retry_policy=AioRetryPolicy( retry_checker=AioStandardRetryConditions( max_attempts=max_attempts ), retry_backoff=ExponentialBackoff(), ), retry_event_adapter=RetryEventAdapter(), retry_quota=retry_quota, ) unique_id = 'retry-config-%s' % service_event_name client.meta.events.register( 'needs-retry.%s' % service_event_name, handler.needs_retry, unique_id=unique_id, ) return handler class AioRetryHandler(RetryHandler): async def needs_retry(self, **kwargs): """Connect as a handler to the needs-retry event.""" retry_delay = None context = self._retry_event_adapter.create_retry_context(**kwargs) if await self._retry_policy.should_retry(context): # Before we can retry we need to ensure we have sufficient # capacity in our retry quota. if self._retry_quota.acquire_retry_quota(context): retry_delay = self._retry_policy.compute_retry_delay(context) logger.debug( "Retry needed, retrying request after delay of: %s", retry_delay, ) else: logger.debug( "Retry needed but retry quota reached, " "not retrying request." ) else: logger.debug("Not retrying request.") self._retry_event_adapter.adapt_retry_response_from_context(context) return retry_delay class AioRetryPolicy(RetryPolicy): async def should_retry(self, context): return await resolve_awaitable( self._retry_checker.is_retryable(context) ) class AioStandardRetryConditions(StandardRetryConditions): def __init__( self, max_attempts=DEFAULT_MAX_ATTEMPTS ): # noqa: E501, lgtm [py/missing-call-to-init] # Note: This class is for convenience so you can have the # standard retry condition in a single class. self._max_attempts_checker = MaxAttemptsChecker(max_attempts) self._additional_checkers = AioOrRetryChecker( [ TransientRetryableChecker(), ThrottledRetryableChecker(), ModeledRetryableChecker(), AioOrRetryChecker( [ special.RetryIDPCommunicationError(), AioRetryDDBChecksumError(), ] ), ] ) async def is_retryable(self, context): return self._max_attempts_checker.is_retryable( context ) and await resolve_awaitable( self._additional_checkers.is_retryable(context) ) class AioOrRetryChecker(OrRetryChecker): async def is_retryable(self, context): return await async_any( checker.is_retryable(context) for checker in self._checkers )