This issue was moved to a discussion.

support moto client wrappers #755

dazza-codes opened this issue Feb 13, 2020 · 64 comments

enhancement New feature or request


dazza-codes commented Feb 13, 2020

This bug arises in pytest with moto 1.3.14. Although requirements-dev.txt has a dev version, that fix is for something else, i.e. it is irrelevant here:

# We need:

See also:

Below is an exception detail, when testing the following pytest fixtures:

import aiobotocore
import pytest
from moto import mock_config
from moto import mock_batch

@pytest.fixture
def aws_region():
    return "us-west-2"

@pytest.fixture
def aio_aws_session(event_loop):
    with mock_config():
        aws_session = aiobotocore.get_session(loop=event_loop)
        yield aws_session

@pytest.fixture
async def aio_aws_batch_client(aio_aws_session, aws_region):
    with mock_config():
        with mock_batch():
            async with aio_aws_session.create_client("batch", region_name=aws_region) as client:
                yield client

This raises a simple exception when trying to parse a moto response (below) and the source code for botocore seems to match (there is no AWSResponse.raw_headers attr). Maybe there are API version differences between aiobotocore, botocore and moto (at the time of posting this issue). In the project, the requirements pull in the aiobotocore deps for boto3/botocore and moto is the latest release:

$ python --version
Python 3.6.7
$ cat /etc/lsb-release 

The simple test function is:

async def test_async_aws_batch_client(aio_aws_batch_client):
    assert isinstance(aio_aws_batch_client, BaseClient)
    job_queues = await aio_aws_batch_client.describe_job_queues()
    # AttributeError: 'AWSResponse' object has no attribute 'raw_headers'

The moto job-queues should be an empty list (and it is, see pdb details below).

>       job_queues = await aio_aws_batch_client.describe_job_queues()

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/opt/conda/envs/python-notes/lib/python3.6/site-packages/aiobotocore/ in _make_api_call
    operation_model, request_dict, request_context)
/opt/conda/envs/python-notes/lib/python3.6/site-packages/aiobotocore/ in _make_request
/opt/conda/envs/python-notes/lib/python3.6/site-packages/aiobotocore/ in _send_request
    request, operation_model, context)
/opt/conda/envs/python-notes/lib/python3.6/site-packages/aiobotocore/ in _get_response
    request, operation_model)
/opt/conda/envs/python-notes/lib/python3.6/site-packages/aiobotocore/ in _do_get_response
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

http_response = <botocore.awsrequest.AWSResponse object at 0x7eff6ebdc6d8>, operation_model = OperationModel(name=DescribeJobQueues)

    async def convert_to_response_dict(http_response, operation_model):
        """Convert an HTTP response object to a request dict.
        This converts the requests library's HTTP response object to
        a dictionary.
        :type http_response: botocore.vendored.requests.model.Response
        :param http_response: The HTTP response from an AWS service request.
        :rtype: dict
        :return: A response dictionary which will contain the following keys:
            * headers (dict)
            * status_code (int)
            * body (string or file-like object)
        response_dict = {
            # botocore converts keys to str, so make sure that they are in
            # the expected case. See detailed discussion here:
            # aiohttp's CIMultiDict camel cases the headers :(
            'headers': HTTPHeaderDict(
                {k.decode('utf-8').lower(): v.decode('utf-8')
>                for k, v in http_response.raw_headers}),
            'status_code': http_response.status_code,
            'context': {
E       AttributeError: 'AWSResponse' object has no attribute 'raw_headers'

/opt/conda/envs/python-notes/lib/python3.6/site-packages/aiobotocore/ AttributeError
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> entering PDB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> PDB post_mortem (IO-capturing turned off) >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> /opt/conda/envs/python-notes/lib/python3.6/site-packages/aiobotocore/
-> for k, v in http_response.raw_headers}),

(Pdb) http_response
<botocore.awsrequest.AWSResponse object at 0x7fed5d7c62b0>
(Pdb) dir(http_response)
['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_content', 'content', 'headers', 'raw', 'status_code', 'text', 'url']

(Pdb) http_response.headers
{'server': ''}
(Pdb) http_response.content
b'{"jobQueues": []}'
(Pdb) http_response.status_code
(Pdb) http_response.text
'{"jobQueues": []}'
(Pdb) http_response.url

(Pdb) http_response.raw
<moto.core.models.MockRawResponse object at 0x7eff6ed909e8>
(Pdb) dir(http_response.raw)
['__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__next__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setstate__', '__sizeof__', '__str__', '__subclasshook__', '_checkClosed', '_checkReadable', '_checkSeekable', '_checkWritable', 'close', 'closed', 'detach', 'fileno', 'flush', 'getbuffer', 'getvalue', 'isatty', 'read', 'read1', 'readable', 'readinto', 'readinto1', 'readline', 'readlines', 'seek', 'seekable', 'stream', 'tell', 'truncate', 'writable', 'write', 'writelines']
(Pdb) http_response.raw.readlines()

Note that the moto response is a botocore.awsrequest.AWSResponse, not the type that the docstring declares:

  • :type http_response: botocore.vendored.requests.model.Response
Contributor Author

dazza-codes commented Feb 14, 2020

The job_queues request works OK when the function is modified as follows, but I have no idea whether this change breaks asyncio behavior because it removes a couple of await calls. The patched function below was applied directly to site-packages to see what happens and now the following test passes:

# Use dummy AWS credentials
AWS_REGION = "us-west-2"

def aws_credentials(monkeypatch):
    monkeypatch.setenv("AWS_ACCESS_KEY_ID", AWS_ACCESS_KEY_ID)
    monkeypatch.setenv("AWS_SECURITY_TOKEN", "testing")
    monkeypatch.setenv("AWS_SESSION_TOKEN", "testing")

def aws_region():
    return AWS_REGION

def aio_aws_session(aws_credentials, aws_region, event_loop):
    session = aiobotocore.get_session(loop=event_loop)
    session.user_agent_name = "aiobotocore-pytest"

    assert session.get_default_client_config() is None
    aioconfig = aiobotocore.config.AioConfig(max_pool_connections=1, region_name=aws_region)
    session.set_default_client_config(aioconfig)
    assert session.get_default_client_config() == aioconfig

    # ensure fake credentials
    session.set_credentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

    # Try this debug logger, but it might be overkill

    # # Add custom response parser factory
    # aio_response_parser_factory = AioResponseParserFactory()
    # session.register_component("response_parser_factory", aio_response_parser_factory)

    yield session

async def aio_aws_batch_client(aio_aws_session):
    with mock_batch():
        async with aio_aws_session.create_client("batch") as client:
            yield client

async def test_aio_aws_batch_client(aio_aws_batch_client):
    assert isinstance(aio_aws_batch_client, BaseClient)
    job_queues = await aio_aws_batch_client.describe_job_queues()
    assert job_queues == {
        "ResponseMetadata": {
            "HTTPStatusCode": 200,
            "HTTPHeaders": {"server": ""},
            "RetryAttempts": 0,
        },
        "jobQueues": [],
    }

patched site-package code for aiobotocore.endpoint:

# aiobotocore/

from botocore.utils import lowercase_dict  # this is a new import

async def convert_to_response_dict(http_response, operation_model):
    """Convert an HTTP response object to a request dict.

    This converts the requests library's HTTP response object to
    a dictionary.

    :type http_response: botocore.vendored.requests.model.Response
    :param http_response: The HTTP response from an AWS service request.

    :rtype: dict
    :return: A response dictionary which will contain the following keys:
        * headers (dict)
        * status_code (int)
        * body (string or file-like object)

    """
    response_dict = {
        'headers': HTTPHeaderDict(lowercase_dict(http_response.headers)),
        'status_code': http_response.status_code,
        'context': {
            'operation_name': operation_model.name,
        }
    }
    if response_dict['status_code'] >= 300:
        response_dict['body'] = http_response.content  # modified but removed `await`
    elif operation_model.has_event_stream_output:
        response_dict['body'] = http_response.raw
    elif operation_model.has_streaming_output:
        length = response_dict['headers'].get('content-length')
        response_dict['body'] = StreamingBody(http_response.raw, length)
    else:
        response_dict['body'] = http_response.content  # modified but removed `await`
    return response_dict
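
An alternative to editing site-packages would be to read headers from whichever attribute the response object actually has. The following is only a minimal sketch of that idea, not aiobotocore's implementation: `normalized_headers` and the two stand-in response classes are hypothetical names. It assumes only what the traceback above shows, namely that the aiohttp-backed response exposes `raw_headers` as byte pairs while `AWSResponse` has just a `headers` mapping.

```python
from typing import Dict, Mapping, Sequence, Tuple


class FakeAioResponse:
    """Stand-in for an aiohttp-backed response exposing raw byte header pairs."""

    def __init__(self, raw_headers: Sequence[Tuple[bytes, bytes]]) -> None:
        self.raw_headers = raw_headers


class FakeAWSResponse:
    """Stand-in for botocore's AWSResponse, which only has a `headers` mapping."""

    def __init__(self, headers: Mapping[str, str]) -> None:
        self.headers = headers


def normalized_headers(http_response: object) -> Dict[str, str]:
    """Return headers with lowercased string keys from either response shape."""
    raw = getattr(http_response, "raw_headers", None)
    if raw is not None:
        # aiohttp-style: a sequence of (bytes, bytes) pairs
        return {k.decode("utf-8").lower(): v.decode("utf-8") for k, v in raw}
    # AWSResponse-style (what a before-send stub returns): a str mapping
    return {str(k).lower(): v for k, v in http_response.headers.items()}


aio_like = FakeAioResponse([(b"Content-Length", b"17")])
moto_like = FakeAWSResponse({"Server": ""})
print(normalized_headers(aio_like))
print(normalized_headers(moto_like))
```

This only covers the headers; the body would still need the same kind of branching between an awaited read and the already-available `content`.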

Contributor Author

dazza-codes commented Feb 16, 2020

It's possible that moto registers something with the before-send event hook and the pytest function never hits the actual aiobotocore methods to send an aiohttp request.

Although this documentation is on boto3, the event system is also in botocore:

By editing site-packages as follows and then running pytest --pdb, it drops into the test call stack:

    async def _do_get_response(self, request, operation_model):
        try:
            logger.debug("Sending http request: %s", request)
            history_recorder.record('HTTP_REQUEST', {
                'method': request.method,
                'headers': request.headers,
                'streaming': operation_model.has_streaming_input,
                'url': request.url,
                'body': request.body
            })
            service_id = operation_model.service_model.service_id.hyphenize()
            event_name = 'before-send.%s.%s' % (service_id, operation_model.name)
            responses = self._event_emitter.emit(event_name, request=request)
            http_response = first_non_none_response(responses)

            assert False  # temporary hack: drop into pdb here

            if http_response is None:
                http_response = await self._send(request)
        # ... (exception handling and the rest of the function are unchanged)

(Pdb) http_response = first_non_none_response(responses)
(Pdb) http_response
<botocore.awsrequest.AWSResponse object at 0x7fafc23bcf98>

So this test never hits the await self._send(...) call. It never uses the aio_session of the AioEndpoint and so it does not use the ClientResponseProxy.
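
The short-circuit can be reproduced without any AWS libraries. This is a simplified sketch of the idea only: `TinyEmitter`, `stubbed_before_send`, `real_send` and `do_get_response` are made-up stand-ins, not botocore or moto code, although `first_non_none_response` mirrors the helper name visible in the snippet above.

```python
import asyncio
from typing import Any, Callable, Dict, List, Tuple


class TinyEmitter:
    """Hypothetical stand-in for an event emitter with named hooks."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Callable[..., Any]]] = {}

    def register(self, event_name: str, handler: Callable[..., Any]) -> None:
        self._handlers.setdefault(event_name, []).append(handler)

    def emit(self, event_name: str, **kwargs: Any) -> List[Tuple[Callable[..., Any], Any]]:
        # Call every handler and collect (handler, response) pairs
        return [(h, h(**kwargs)) for h in self._handlers.get(event_name, [])]


def first_non_none_response(responses, default=None):
    for _handler, response in responses:
        if response is not None:
            return response
    return default


sent: List[str] = []  # records requests that reached the "network"


async def real_send(request: str) -> str:
    sent.append(request)
    return "real:" + request


def stubbed_before_send(request: str, **kwargs: Any) -> str:
    # A mock that answers the request itself, so nothing is sent
    return "stub:" + request


async def do_get_response(emitter: TinyEmitter, request: str) -> str:
    responses = emitter.emit("before-send.batch.DescribeJobQueues", request=request)
    http_response = first_non_none_response(responses)
    if http_response is None:
        http_response = await real_send(request)
    return http_response


plain = TinyEmitter()
mocked = TinyEmitter()
mocked.register("before-send.batch.DescribeJobQueues", stubbed_before_send)

print(asyncio.run(do_get_response(plain, "req")))
print(asyncio.run(do_get_response(mocked, "req")))
```

With the stub registered, `real_send` is never awaited, which is the same reason the async response proxy is bypassed in the failing test.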

It's not clear whether the event emitter has any details about the registered callable that returns the http_response.

(Pdb) dir(self._event_emitter)
['__class__', '__copy__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_alias_event_name', '_emitter', '_event_aliases', '_replace_subsection', '_verify_accept_kwargs', '_verify_and_register', '_verify_is_callable', 'emit', 'emit_until_response', 'register', 'register_first', 'register_last', 'unregister']

If the hack to add the assert False is replaced with http_response = None, then it calls aiobotocore code (which starts to call live-aws services, with no proxies defined).

To add a proxy requires an AioSession test fixture, with something like

    session = aiobotocore.get_session(loop=event_loop)
    aioconfig = aiobotocore.config.AioConfig(max_pool_connections=1, region_name=aws_region)
    # TODO: test passing proxies into the aiobotocore.endpoint; the proxy must replace
    #       'https://{service}.{region_name}{url_path}'
    proxies = {
        'http': os.getenv("HTTP_PROXY", ""),
        'https': os.getenv("HTTPS_PROXY", ""),
    }
    aioconfig.proxies = proxies

To get that working requires using moto-server or something. Somehow there must be an easy way to:

  • disable the callback that seems to be registered to before_send
  • enable a moto-server backend as a proxy with a pytest function scope

In the moto git repo:

$ git grep 'before-send'
    * Switch from mocking requests to using before-send for AWS calls
moto/core/"before-send", botocore_stubber))

moto uses:

from botocore.handlers import BUILTIN_HANDLERS

botocore_stubber = BotocoreStubber()
BUILTIN_HANDLERS.append(("before-send", botocore_stubber))

where BotocoreStubber does the work of registering and calling callbacks for moto:

For example, an AWS batch client, with moto mock_batch applied, has the following event callbacks after the client has issued a client.describe_job_queues() method call:

>>> for evt, cb in
...     print(evt, cb)
provide-client-params.batch.DescribeJobQueues deque([])
before-parameter-build.batch.DescribeJobQueues deque([<function generate_idempotent_uuid at 0x7facc3585048>])
before-call.batch.DescribeJobQueues deque([<function inject_api_version_header_if_needed at 0x7facc35869d8>])
request-created.batch.DescribeJobQueues deque([<bound method RequestSigner.handler of <botocore.signers.RequestSigner object at 0x7facb826cda0>>])
choose-signer.batch.DescribeJobQueues deque([<function set_operation_specific_signer at 0x7facc3581ea0>])
before-sign.batch.DescribeJobQueues deque([])
before-send.batch.DescribeJobQueues deque([<moto.core.models.BotocoreStubber object at 0x7facc3267b00>])
response-received.batch.DescribeJobQueues deque([])
needs-retry.batch.DescribeJobQueues deque([<botocore.retryhandler.RetryHandler object at 0x7facba787470>])
after-call.batch.DescribeJobQueues deque([])
getattr.batch.get_credentials deque([])
getattr.batch.credentials deque([])

Note esp. the moto callback in:

before-send.batch.DescribeJobQueues deque([<moto.core.models.BotocoreStubber object at 0x7facc3267b00>])

spulec commented Feb 19, 2020

Note that the moto response is an botocore.awsrequest.AWSResponse and not a

:type http_response: botocore.vendored.requests.model.Response

My understanding is that botocore is using the former (what Moto uses) going forward and deprecating the use of requests.

Contributor Author

dazza-codes commented Feb 19, 2020

It's possible to detect when moto mocks are active, e.g.

def has_moto_mocks(client, event_name):
    # moto registers mock callbacks with the `before-send` event-name, using
    # specific callbacks for the methods that are generated dynamically. By
    # checking that the first callback is a BotocoreStubber, this verifies
    # that moto mocks are intercepting client requests.
    callbacks =[event_name]
    if len(callbacks) > 0:
        stub = callbacks[0]
        assert isinstance(stub, BotocoreStubber)
        return stub.enabled
    return False

I don't know if it's possible to simply disable it with stub.enabled = False. The botocore client does not expose any public API to iterate on the event callbacks, so this ^^ has to resort to sneaking around in the private API. Since that ^^ function treats the data as read-only, it's nearly OK, but if something were to start modifications of the callbacks, that could get very tricky.

When I find some time to craft a full PR on this, there are better ways to work around this using the MotoService in the test suite of aiobotocore. For example, these snippets are a clue to what seems to be working well, thanks to MotoService:

# assumes python >= py3.6 (async generators are OK)

async def aio_aws_s3_server():
    async with MotoService("s3") as svc:
        yield svc.endpoint_url

def aio_aws_session(aws_credentials, aws_region, event_loop):
    # pytest-asyncio provides and manages the `event_loop`
    session = aiobotocore.get_session(loop=event_loop)
    session.user_agent_name = "aiomoto"
    assert session.get_default_client_config() is None
    aioconfig = aiobotocore.config.AioConfig(max_pool_connections=1, region_name=aws_region)
    # forget about adding any proxies for moto.server, that doesn't work
    assert session.get_default_client_config() == aioconfig
    session.set_credentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
    yield session

async def aio_aws_s3_client(aio_aws_session, aio_aws_s3_server):
    # aio_aws_s3_server is just a string URI for the new `moto.server` for this client
    async with aio_aws_session.create_client("s3", endpoint_url=aio_aws_s3_server) as client:
        yield client

async def test_aio_aws_s3_client(aio_aws_s3_client):
    client = aio_aws_s3_client
    assert isinstance(client, AioBaseClient)
    assert client.meta.config.region_name == AWS_REGION
    assert client.meta.region_name == AWS_REGION

    resp = await client.list_buckets()
    assert response_success(resp)
    assert resp.get("Buckets") == []

    # the event-name mocks are dynamically generated after calling the method;
    # for aio-clients, they should be disabled for aiohttp to hit moto.server.
    assert not has_moto_mocks(client, "before-send.s3.ListBuckets")

The only minor drawback is that MotoService is designed to spin-up and tear-down a new moto.server for every test. It wraps it all nicely in a thread with some async entry/exit points. It might be useful to have a session-scope moto.server with options to just reset it for each test (haven't figured out what that looks like).

spulec commented Feb 20, 2020

In case helpful, moto does have a reset API:
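
For the session-scoped server idea, a reset helper could be as small as the sketch below. It assumes the moto server accepts a POST at `/moto-api/reset`; `moto_reset_url` and `reset_moto_server` are hypothetical helper names, and `endpoint_url` would be whatever URL the server fixture yields.

```python
import urllib.request


def moto_reset_url(endpoint_url: str) -> str:
    """Build the reset URL for a moto server reachable at `endpoint_url`."""
    return endpoint_url.rstrip("/") + "/moto-api/reset"


def reset_moto_server(endpoint_url: str, timeout: float = 5.0) -> int:
    """POST to the reset endpoint and return the HTTP status code."""
    request = urllib.request.Request(
        moto_reset_url(endpoint_url), data=b"", method="POST"
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

A function-scoped autouse fixture could call `reset_moto_server(...)` after each test while the server itself stays up for the whole session.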

seems to work: #773

@thehesiod thehesiod changed the title AttributeError: 'AWSResponse' object has no attribute 'raw_headers' support in-proc moto Feb 23, 2020
@thehesiod thehesiod added enhancement New feature or request and removed bug Something isn't working labels Feb 23, 2020
thehesiod commented Feb 23, 2020

given #773 works I see this now as supporting in-proc moto, which we never tackled. This would be a nice thing because it would allow for tests which coordinate between multiple services. This is going to be a big project because moto does not support aiobotocore....maybe not so big though because we don't test that many services :)

@thehesiod thehesiod changed the title support in-proc moto support moto client wrappers Feb 23, 2020
ugh, forgot again this is my in-proc version. So we basically don't support the moto wrapper based tests, but that's not a big deal in my opinion as we expose fixtures for each client (see my PR above). I'm going to close this given I don't see any benefits of supporting the moto wrappers. Feel free to re-open if I'm missing something.

Just hit the same issue. I am replacing something written with botocore and a set of monkeypatches with aiobotocore. For testing my project I thought of using moto in its mocking (default) configuration, but got hit by this.
I am looking into botocore itself and see that its http_session (the default one, at least) wraps the urllib response in an AWSResponse (as noted above).

My thinking is that even if you decide not to support testing with moto, you should accept whatever type stock botocore returns now, in case some client code registers a custom handler that produces a botocore.awsrequest.AWSResponse object. Otherwise aiobotocore is incompatible as a replacement for botocore.

Going to re-open for more investigation

@thehesiod thehesiod reopened this Feb 29, 2020
piyush0 commented Jul 14, 2020

Hi, just wanted to check in: is there any progress on this?

haven't had a chance yet to look into this, been swamped at work

@kkopachev totally agree

Adding my voice here. Working on making s3fs async, and would like to test via moto.

It is most definitely possible, I do it extensively at work. However, it requires that you use the server model instead of the wrapper, which takes a bit more work.

I honestly didn't know about that - works just fine!

This is how I fixed it for moto==4.1.15:

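from contextlib import ExitStack
from unittest.mock import patch

import pytest
from botocore.awsrequest import AWSResponse

class MockedAWSResponse(AWSResponse):
    raw_headers = {}  # type: ignore

    async def read(self):  # type: ignore
        return self.text

# Assumed to run as an autouse pytest fixture so the patches stay active during each test
@pytest.fixture(autouse=True)
def patch_async_botocore_moto():
    with ExitStack() as stack:
        target_botocore = "botocore.awsrequest.AWSResponse"
        stack.enter_context(patch(target_botocore, MockedAWSResponse))
        target_moto = "moto.core.botocore_stubber.AWSResponse"
        stack.enter_context(patch(target_moto, MockedAWSResponse))
        yield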

Apakottur commented Aug 25, 2023

Here's the 2023 version of my patch, passes strict mypy and works with a bunch of functions from S3, SQS and CloudFront.
moto = "4.2.0"
aiobotocore = "2.6.0"
types-aiobotocore = "2.6.0"

from collections.abc import Awaitable, Callable, Iterator
from dataclasses import dataclass
from typing import TypeVar

import aiobotocore
import aiobotocore.endpoint
import botocore

T = TypeVar("T")
R = TypeVar("R")

@dataclass
class _PatchedAWSReponseContent:
    """Patched version of `botocore.awsrequest.AWSResponse.content`"""

    content: bytes | Awaitable[bytes]

    def __await__(self) -> Iterator[bytes]:
        async def _generate_async() -> bytes:
            if isinstance(self.content, Awaitable):
                return await self.content
            else:
                return self.content

        return _generate_async().__await__()

    def decode(self, encoding: str) -> str:
        assert isinstance(self.content, bytes)
        return self.content.decode(encoding)

class PatchedAWSResponse:
    """Patched version of `botocore.awsrequest.AWSResponse`"""

    def __init__(self, response: botocore.awsrequest.AWSResponse) -> None:
        self._response = response
        self.status_code = response.status_code
        self.content = _PatchedAWSReponseContent(response.content)
        self.raw = response.raw
        if not hasattr(self.raw, "raw_headers"):
            self.raw.raw_headers = {}

def _factory(
    original: Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]]
) -> Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]]:
    async def patched_convert_to_response_dict(http_response: botocore.awsrequest.AWSResponse, operation_model: T) -> R:
        return await original(PatchedAWSResponse(http_response), operation_model)  # type: ignore[arg-type]

    return patched_convert_to_response_dict

aiobotocore.endpoint.convert_to_response_dict = _factory(aiobotocore.endpoint.convert_to_response_dict)  # type: ignore[assignment]
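
The core trick in this patch is an object that can be awaited even though it wraps plain bytes. A stripped-down sketch of just that mechanism follows; `MaybeAwaitableBytes` and `fetch_later` are hypothetical names used for illustration, standing in for `_PatchedAWSReponseContent` and a real async read.

```python
import asyncio
from collections.abc import Awaitable
from dataclasses import dataclass
from typing import Generator, Union


@dataclass
class MaybeAwaitableBytes:
    """Hypothetical wrapper: awaitable view over bytes or an awaitable of bytes."""

    content: Union[bytes, Awaitable]

    def __await__(self) -> Generator[None, None, bytes]:
        async def _resolve() -> bytes:
            if isinstance(self.content, Awaitable):
                return await self.content
            return self.content

        # Delegate to the coroutine's own __await__ so `await obj` works
        return _resolve().__await__()


async def fetch_later() -> bytes:
    return b'{"jobQueues": []}'


async def main():
    sync_body = await MaybeAwaitableBytes(b'{"jobQueues": []}')
    async_body = await MaybeAwaitableBytes(fetch_later())
    return sync_body, async_body


print(asyncio.run(main()))
```

Both branches resolve to the same bytes, which is why code written as `await http_response.content` keeps working when the response comes from a synchronous mock.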

takeda commented Aug 30, 2023

Right now the unit tests via Stubber are broken: #939 as well as moto (which was a great surprise when I started work on migrating unit tests from one to the other).

I'm also frustrated that Amazon doesn't provide async support natively. It would be much easier for them to add it to boto3 than for @thehesiod to patch the package up. There's a ticket that was opened for 8 years now: boto/botocore#458. It's not like AWS service is free to use. Stuff like this should be provided by Amazon not volunteers.

@Apakottur I'm glad that there seems to be a workaround for moto at least; so far it looks like it resolved my issue.

lokucrazy commented Oct 22, 2023

Hi, I was just wondering if there is going to be a fix for this. @juftin's fix helped me get past it, but I'm not sure whether this is something that needs to be fixed or just a weird quirk of everything.

This is great @Apakottur, I needed a few slight modifications to work with my tests (mainly had to also patch the response object used in the RetryContext). Sharing below.

This has been tested with calls to DynamoDB and SQS, with the following package versions:
moto == "4.2.5"
aiobotocore == "2.6.0"

from collections.abc import Awaitable, Callable, Iterator
from dataclasses import dataclass
from typing import TypeVar

import aiobotocore
import aiobotocore.endpoint
import botocore
import botocore.retries.standard

T = TypeVar("T")
R = TypeVar("R")

@dataclass
class _PatchedAWSReponseContent:
    """Patched version of `botocore.awsrequest.AWSResponse.content`"""

    content: bytes | Awaitable[bytes]

    def __await__(self) -> Iterator[bytes]:
        async def _generate_async() -> bytes:
            if isinstance(self.content, Awaitable):
                return await self.content
            else:
                return self.content

        return _generate_async().__await__()

    def decode(self, encoding: str) -> str:
        assert isinstance(self.content, bytes)
        return self.content.decode(encoding)

class PatchedAWSResponse:
    """Patched version of `botocore.awsrequest.AWSResponse`"""

    def __init__(self, response: botocore.awsrequest.AWSResponse) -> None:
        self._response = response
        self.status_code = response.status_code
        self.headers = response.headers
        self.url = response.url
        self.content = _PatchedAWSReponseContent(response.content)
        self.raw = response.raw
        if not hasattr(self.raw, "raw_headers"):
            self.raw.raw_headers = {}

class PatchedRetryContext(botocore.retries.standard.RetryContext):
    """Patched version of `botocore.retries.standard.RetryContext`"""

    def __init__(self, *args, **kwargs):
        if kwargs.get("http_response"):
            kwargs["http_response"] = PatchedAWSResponse(kwargs["http_response"])
        super().__init__(*args, **kwargs)

def _factory(
    original: Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]]
) -> Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]]:
    async def patched_convert_to_response_dict(http_response: botocore.awsrequest.AWSResponse, operation_model: T) -> R:
        return await original(PatchedAWSResponse(http_response), operation_model)  # type: ignore[arg-type]

    return patched_convert_to_response_dict

aiobotocore.endpoint.convert_to_response_dict = _factory(aiobotocore.endpoint.convert_to_response_dict)  # type: ignore[assignment]
botocore.retries.standard.RetryContext = PatchedRetryContext

raw_headers isn't technically supported by aws standard, so probably it should be removed from aiobotocore anyway

thehesiod commented Oct 24, 2023

raw headers is necessary as aiohttp munges the response header names which breaks the AWS API calls. I expressed to the aiohttp group they should stop doing this but they decided instead to expose this attribute :(

Thanks for this!

Wrapped up with the mock_aws() context manager for anyone else using @Apakottur, @msinto93 solution with pytest:

# Attempt to import optional dependencies
from collections.abc import Awaitable, Callable, Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Generator, TypeVar

import aiobotocore
import aiobotocore.endpoint
import botocore
import botocore.retries.standard

import pytest
from moto import mock_aws

T = TypeVar("T")
R = TypeVar("R")

@dataclass
class _PatchedAWSReponseContent:
    """Patched version of `botocore.awsrequest.AWSResponse.content`"""

    content: bytes | Awaitable[bytes]

    def __await__(self) -> Iterator[bytes]:
        async def _generate_async() -> bytes:
            if isinstance(self.content, Awaitable):
                return await self.content
            else:
                return self.content

        return _generate_async().__await__()

    def decode(self, encoding: str) -> str:
        assert isinstance(self.content, bytes)
        return self.content.decode(encoding)

class PatchedAWSResponse:
    """Patched version of `botocore.awsrequest.AWSResponse`"""

    def __init__(self, response: botocore.awsrequest.AWSResponse) -> None:
        self._response = response
        self.status_code = response.status_code
        self.headers = response.headers
        self.url = response.url
        self.content = _PatchedAWSReponseContent(response.content)
        self.raw = response.raw
        if not hasattr(self.raw, "raw_headers"):
            self.raw.raw_headers = {}

class PatchedRetryContext(botocore.retries.standard.RetryContext):
    """Patched version of `botocore.retries.standard.RetryContext`"""

    def __init__(self, *args, **kwargs):
        if kwargs.get("http_response"):
            kwargs["http_response"] = PatchedAWSResponse(kwargs["http_response"])
        super().__init__(*args, **kwargs)

def _factory(
    original: Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]],
) -> Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]]:
    async def patched_convert_to_response_dict(http_response: botocore.awsrequest.AWSResponse, operation_model: T) -> R:
        return await original(PatchedAWSResponse(http_response), operation_model)  # type: ignore[arg-type]

    return patched_convert_to_response_dict

@contextmanager
def mock_aio_aws(monkeypatch: pytest.MonkeyPatch) -> Generator[None, None, None]:

    # Patch aiobotocore and botocore
    monkeypatch.setattr(
        aiobotocore.endpoint, "convert_to_response_dict", _factory(aiobotocore.endpoint.convert_to_response_dict)
    )
    monkeypatch.setattr(botocore.retries.standard, "RetryContext", PatchedRetryContext)
    with mock_aws():
        yield

Which can be used like:

import boto3
import pytest
from botocore.exceptions import ClientError

from .test_utils import mock_aio_aws

@pytest.fixture
def mock_aws(monkeypatch):
    with mock_aio_aws(monkeypatch):
        yield
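
As a side note on why the content wrapper exists: aiobotocore awaits the response body, while moto hands back plain bytes. The sketch below shows the same idea in isolation, without botocore. The names `MaybeAwaitable` and `fetch_later` are made up for illustration and do not appear in either library.

```python
import asyncio
from collections.abc import Awaitable
from dataclasses import dataclass
from typing import Union


@dataclass
class MaybeAwaitable:
    """Wraps either plain bytes or an awaitable that resolves to bytes."""

    content: Union[bytes, Awaitable]

    def __await__(self):
        async def _resolve() -> bytes:
            # Await only when the wrapped value is actually awaitable.
            if isinstance(self.content, Awaitable):
                return await self.content
            return self.content

        return _resolve().__await__()


async def fetch_later() -> bytes:
    # Hypothetical stand-in for a real asynchronous body read.
    await asyncio.sleep(0)
    return b"async body"


async def main() -> tuple:
    plain = await MaybeAwaitable(b"sync body")
    deferred = await MaybeAwaitable(fetch_later())
    return plain, deferred


results = asyncio.run(main())
```

Both calls go through the same `await`, so code that expects an awaitable body does not need to know which kind of value it received.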


Just building on what came before.

I needed to remove the self.headers["x-amz-crc32"] = None line, and I rewrote the context manager so that it can be a drop-in replacement: you can now just do from moto_patch import mock_aio_aws as mock_aws without passing monkeypatch around.

# Attempt to import optional dependencies
from collections.abc import Awaitable, Callable, Generator, Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from typing import TypeVar
from unittest.mock import patch

import aiobotocore
import aiobotocore.endpoint
import botocore
import botocore.retries.standard
from moto import mock_aws

T = TypeVar("T")
R = TypeVar("R")

@dataclass
class _PatchedAWSReponseContent:
    """Patched version of `botocore.awsrequest.AWSResponse.content`"""

    content: bytes | Awaitable[bytes]

    def __await__(self) -> Iterator[bytes]:
        async def _generate_async() -> bytes:
            if isinstance(self.content, Awaitable):
                return await self.content
            return self.content

        return _generate_async().__await__()

    def decode(self, encoding: str) -> str:
        assert isinstance(self.content, bytes)
        return self.content.decode(encoding)

class PatchedAWSResponse:
    """Patched version of `botocore.awsrequest.AWSResponse`"""

    def __init__(self, response: botocore.awsrequest.AWSResponse) -> None:
        self._response = response
        self.status_code = response.status_code

        #  '317822581'{'server': '', 'date': 'Thu, 29 Aug 2024 17:10:05 GMT', 'x-amzn-requestid': 'Rlz2JkR24Tzbh5GEFyIBCKempp5HjXw6uh17z5J5EtoGhW4Udr97', 'x-amz-crc32': '317822581'}
        self.headers = response.headers
        self.headers["x-amz-crc32"] = None
        self.url = response.url
        self.content = _PatchedAWSReponseContent(response.content)
        self._content = self.content
        self.raw = response.raw
        self.text = response.text
        if not hasattr(self.raw, "raw_headers"):
            self.raw.raw_headers = {}

class PatchedRetryContext(botocore.retries.standard.RetryContext):
    """Patched version of `botocore.retries.standard.RetryContext`"""

    def __init__(self, *args, **kwargs):
        if kwargs.get("http_response"):
            kwargs["http_response"] = PatchedAWSResponse(kwargs["http_response"])
        super().__init__(*args, **kwargs)

def _factory(
    original: Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]],
) -> Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]]:
    async def patched_convert_to_response_dict(http_response: botocore.awsrequest.AWSResponse, operation_model: T) -> R:
        return await original(PatchedAWSResponse(http_response), operation_model)  # type: ignore[arg-type]

    return patched_convert_to_response_dict

@contextmanager
def mock_aio_aws() -> Generator[None, None, None]:
    with (
        patch(
            "aiobotocore.endpoint.convert_to_response_dict", new=_factory(aiobotocore.endpoint.convert_to_response_dict)
        ),
        patch("botocore.retries.standard.RetryContext", new=PatchedRetryContext),
        mock_aws(),
    ):
        yield
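
To illustrate the drop-in idea separately from moto: a context manager built on `unittest.mock.patch` can stack several patches and undo them all on exit, so no `monkeypatch` fixture has to be passed in. This is only a sketch against the standard library's `json` module; `patched_json`, `_fake_dumps` and `_fake_loads` are hypothetical names chosen for the example.

```python
import json
from contextlib import contextmanager
from typing import Iterator
from unittest.mock import patch


def _fake_dumps(obj, **kwargs) -> str:
    return "patched"


def _fake_loads(text, **kwargs) -> dict:
    return {}


@contextmanager
def patched_json() -> Iterator[None]:
    # Each patch() restores the original attribute when the with block exits.
    with patch("json.dumps", new=_fake_dumps), patch("json.loads", new=_fake_loads):
        yield


with patched_json():
    inside_dumps = json.dumps({"a": 1})
    inside_loads = json.loads('{"a": 1}')

# Outside the block the real functions are back in place.
outside_dumps = json.dumps({"a": 1})
outside_loads = json.loads('{"a": 1}')
```

Because the function takes no arguments, it can be used anywhere a plain `with` block or a fixture body would use `mock_aws()`.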


rdbisme commented Sep 10, 2024

Can we revisit this to make them part of, I don't know, moto? It's a bit disappointing that thousands of people need to backport this to their codebase.

I'm pretty sure this is 100% required when using s3fs.


olly-writes-code commented Sep 13, 2024

I was trying to use @harvey251's approach for fsspec, but I can't get it to play nice :/


from collections.abc import Awaitable, Callable, Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Generator, TypeVar

import aiobotocore
import aiobotocore.endpoint
import botocore
import botocore.retries.standard
from moto import mock_aws

import pytest

T = TypeVar("T")
R = TypeVar("R")

@dataclass
class _PatchedAWSReponseContent:
    """Patched version of `botocore.awsrequest.AWSResponse.content`"""

    content: bytes | Awaitable[bytes]

    def __await__(self) -> Iterator[bytes]:
        async def _generate_async() -> bytes:
            if isinstance(self.content, Awaitable):
                return await self.content
            else:
                return self.content

        return _generate_async().__await__()

    def decode(self, encoding: str) -> str:
        assert isinstance(self.content, bytes)
        return self.content.decode(encoding)

class PatchedAWSResponse:
    """Patched version of `botocore.awsrequest.AWSResponse`"""

    def __init__(self, response: botocore.awsrequest.AWSResponse) -> None:
        self._response = response
        self.status_code = response.status_code
        self.headers = response.headers
        self.headers["x-amz-crc32"] = None
        self.url = response.url
        self.content = _PatchedAWSReponseContent(response.content)
        self.raw = response.raw
        if not hasattr(self.raw, "raw_headers"):
            self.raw.raw_headers = {}

class PatchedRetryContext(botocore.retries.standard.RetryContext):
    """Patched version of `botocore.retries.standard.RetryContext`"""

    def __init__(self, *args, **kwargs):
        if kwargs.get("http_response"):
            kwargs["http_response"] = PatchedAWSResponse(kwargs["http_response"])
        super().__init__(*args, **kwargs)

def _factory(
    original: Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]],
) -> Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]]:
    async def patched_convert_to_response_dict(
        http_response: botocore.awsrequest.AWSResponse, operation_model: T
    ) -> R:
        return await original(PatchedAWSResponse(http_response), operation_model)  # type: ignore[arg-type]

    return patched_convert_to_response_dict

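@contextmanager
def mock_aio_aws(monkeypatch: pytest.MonkeyPatch) -> Generator[None, None, None]:
    # Patch aiobotocore and botocore
    monkeypatch.setattr(
        aiobotocore.endpoint,
        "convert_to_response_dict",
        _factory(aiobotocore.endpoint.convert_to_response_dict),
    )
    monkeypatch.setattr(botocore.retries.standard, "RetryContext", PatchedRetryContext)
    with mock_aws():
        yield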

Test that fails

import boto3
import fsspec

import pytest
from moto_patch import mock_aio_aws

@pytest.fixture
def mock_aws(monkeypatch):
    with mock_aio_aws(monkeypatch):
        yield

def test_fsspec_s3(mock_aws):
    # Create a mock S3 bucket
    conn = boto3.client("s3", region_name="us-east-1")
    conn.create_bucket(Bucket="test-bucket")

    # Write data to a file in the mock S3 bucket using fsspec
    with fsspec.open("s3://test-bucket/test.txt", "w") as f:
        f.write("Hello, mocked S3!")

    # Read data from the mock S3 bucket using fsspec
    with fsspec.open("s3://test-bucket/test.txt", "r") as f:
        content = f.read()

    assert content == "Hello, mocked S3!"


self = <s3fs.core.S3FileSystem object at 0x149f914d0>, path = 'test-bucket/test.txt', bucket = 'test-bucket'
key = 'test.txt', refresh = False, version_id = None

    async def _info(self, path, bucket=None, key=None, refresh=False, version_id=None):
        path = self._strip_protocol(path)
        bucket, key, path_version_id = self.split_path(path)
        fullpath = "/".join((bucket, key))

        if version_id is not None:
            if not self.version_aware:
                raise ValueError(
                    "version_id cannot be specified if the "
                    "filesystem is not version aware"
                )
        if path in ["/", ""]:
            return {"name": path, "size": 0, "type": "directory"}
        version_id = _coalesce_version_id(path_version_id, version_id)
        if not refresh:
            out = self._ls_from_cache(fullpath)
            if out is not None:
                if self.version_aware and version_id is not None:
                    # If cached info does not match requested version_id,
                    # fallback to calling head_object
                    out = [
                        o
                        for o in out
                        if o["name"] == fullpath and version_id == o.get("VersionId")
                    ]
                    if out:
                        return out[0]
                else:
                    out = [o for o in out if o["name"] == fullpath]
                    if out:
                        return out[0]
                    return {"name": path, "size": 0, "type": "directory"}
        if key:
            try:
                out = await self._call_s3(
                    ...
                )
                return {
                    "ETag": out.get("ETag", ""),
                    "LastModified": out.get("LastModified", ""),
>                   "size": out["ContentLength"],
                    "name": "/".join([bucket, key]),
                    "type": "file",
                    "StorageClass": out.get("StorageClass", "STANDARD"),
                    "VersionId": out.get("VersionId"),
                    "ContentType": out.get("ContentType"),
                }
E               KeyError: 'ContentLength'


Love this thread, great example of devs around the world uniting for years to tackle a tricky integration 💪

Here's the 2025 version of my patch, passes strict mypy+ruff and works with a bunch of functions from S3, SQS, SFN, SSM and CloudFront.
moto = "5.0.27"
aiobotocore = "2.18.0"
types-aiobotocore = "2.18.0"

from collections.abc import Awaitable, Callable, Iterator
from dataclasses import dataclass
from typing import Any, TypeVar

import aiobotocore
import aiobotocore.endpoint
import aiobotocore.httpchecksum
import aiobotocore.response
import botocore.awsrequest
import botocore.httpchecksum

T = TypeVar("T")
R = TypeVar("R")

@dataclass
class _PatchedAWSResponseContent:
    """Patched version of `botocore.awsrequest.AWSResponse.content`"""

    content: bytes | Awaitable[bytes]

    def __await__(self) -> Iterator[bytes]:
        async def _generate_async() -> bytes:
            if isinstance(self.content, Awaitable):
                return await self.content
            else:
                return self.content

        return _generate_async().__await__()

    def decode(self, encoding: str) -> str:
        assert isinstance(self.content, bytes)
        return self.content.decode(encoding)

class PatchedAWSResponse:
    """Patched version of `botocore.awsrequest.AWSResponse`"""

    def __init__(self, response: botocore.awsrequest.AWSResponse) -> None:
        self._response = response
        self.status_code = response.status_code
        self.content = _PatchedAWSResponseContent(response.content)
        self.raw = response.raw
        if not hasattr(self.raw, "raw_headers"):
            self.raw.raw_headers = {}

def _factory(
    original: Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]],
) -> Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]]:
    """Factory for patching `aiobotocore.endpoint.convert_to_response_dict`"""

    async def patched_convert_to_response_dict(http_response: botocore.awsrequest.AWSResponse, operation_model: T) -> R:
        return await original(PatchedAWSResponse(http_response), operation_model)  # type: ignore[arg-type]

    return patched_convert_to_response_dict

aiobotocore.endpoint.convert_to_response_dict = _factory(aiobotocore.endpoint.convert_to_response_dict)  # type: ignore[assignment]

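async def _patched_read(self: aiobotocore.response.StreamingBody, _amt: Any = None) -> Any:
    """Patched version of `aiobotocore.response.StreamingBody.read`"""
    return self.__wrapped__.read()

aiobotocore.response.StreamingBody.read = _patched_read  # type: ignore[assignment]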

# Remove the async version of the function, which is not compatible with Moto.
del aiobotocore.httpchecksum.AioAwsChunkedWrapper._make_chunk  # noqa: SLF001
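
The `del` trick may look odd at first. Assuming the async method is an override defined on a subclass of botocore's synchronous wrapper, deleting it from the subclass makes attribute lookup fall back to the parent's synchronous version. A minimal sketch with stand-in classes (`ChunkedWrapper` and `AioChunkedWrapper` are hypothetical, not the real library classes):

```python
import asyncio


class ChunkedWrapper:
    """Stand-in for a synchronous base class."""

    def _make_chunk(self) -> bytes:
        return b"sync chunk"


class AioChunkedWrapper(ChunkedWrapper):
    """Stand-in for an async subclass that overrides the method."""

    async def _make_chunk(self) -> bytes:
        return b"async chunk"


wrapper = AioChunkedWrapper()
before = asyncio.run(wrapper._make_chunk())

# Removing the override from the subclass exposes the inherited sync method,
# even on instances that already exist.
del AioChunkedWrapper._make_chunk
after = wrapper._make_chunk()
```

Note that the deletion is global for the process, so it affects every instance of the subclass, not just the ones created under the mock.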

Thanks for this!

I changed the raw headers part (which you set to an empty dict), since I needed some headers (like Content-Type and ETag) for the S3 responses to work:

if not hasattr(self.raw, "raw_headers"):
    self.raw.raw_headers = []
for header in response.headers:
    self.raw.raw_headers.append((header.encode(), response.headers[header].encode()))
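
The same conversion can be written as a small standalone helper, which makes the expected shape easier to see: a list of (name, value) pairs, both encoded as bytes. This is a sketch only; `to_raw_headers` is a hypothetical name, and skipping `None` values is an assumption added here so that a header set to `None` does not raise on `.encode()`.

```python
def to_raw_headers(headers: dict) -> list:
    """Encode header names and values as (bytes, bytes) pairs."""
    return [
        (name.encode(), value.encode())
        for name, value in headers.items()
        if value is not None  # assumption: drop headers that were blanked out
    ]


example = to_raw_headers(
    {"content-type": "text/plain", "ETag": '"abc123"', "x-amz-crc32": None}
)
```

With this input, `example` holds two pairs, one for `content-type` and one for `ETag`, and the `x-amz-crc32` entry is dropped.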

Newest aiobotocore version breaks again, here's my updated code -

aio-libs locked and limited conversation to collaborators on Feb 24, 2025
jakob-keller converted this issue into discussion #1300 on Feb 24, 2025
