Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

amqp over websocket support #197

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
11 changes: 4 additions & 7 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,6 @@ jobs:
tests:
runs-on: ubuntu-latest

services:
rabbitmq:
image: docker://mosquito/aiormq-rabbitmq
ports:
- 5672:5672
- 5671:5671

strategy:
fail-fast: false

Expand All @@ -107,6 +100,8 @@ jobs:
key: venv-${{ runner.os }}-${{ github.job }}-${{ github.ref }}-${{ matrix.python }}
- run: python -m pip install poetry
- run: poetry install --with=uvloop
- name:
run: docker compose up -d && sleep 10
- name: pytest
run: >-
poetry run pytest \
Expand All @@ -123,6 +118,8 @@ jobs:
COVERALLS_PARALLEL: 'true'
COVERALLS_SERVICE_NAME: github
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Cleanup
run: docker-compose down && docker-compose rm --force

finish:
needs:
Expand Down
10 changes: 2 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,8 @@ RABBITMQ_CONTAINER_NAME:=aiormq_rabbitmq
RABBITMQ_IMAGE:=mosquito/aiormq-rabbitmq

rabbitmq:
docker kill $(RABBITMQ_CONTAINER_NAME) || true
docker run --pull=always --rm -d \
--name $(RABBITMQ_CONTAINER_NAME) \
-p 5671:5671 \
-p 5672:5672 \
-p 15671:15671 \
-p 15672:15672 \
$(RABBITMQ_IMAGE)
docker compose down
docker compose up -d

upload:
poetry publish --build --skip-existing
Expand Down
13 changes: 9 additions & 4 deletions aiormq/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
)
from .tools import Countdown, censor_url

from .websocket_transport import open_websocket_connection

# noinspection PyUnresolvedReferences
try:
Expand Down Expand Up @@ -452,10 +453,14 @@ async def connect(

log.debug("Connecting to: %s", self)
try:
reader, writer = await asyncio.open_connection(
self.url.host, self.url.port, ssl=ssl_context,
**self.__create_connection_kwargs,
)

if self.url.scheme == "ws" or self.url.scheme == "wss":
reader, writer = await open_websocket_connection(self.url)
else:
reader, writer = await asyncio.open_connection(
self.url.host, self.url.port, ssl=ssl_context,
**self.__create_connection_kwargs,
)

frame_receiver = FrameReceiver(reader)
except OSError as e:
Expand Down
136 changes: 136 additions & 0 deletions aiormq/websocket_transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
from asyncio import (streams, transports, get_event_loop, events,
AbstractEventLoop, StreamReaderProtocol, BaseProtocol)
import asyncio
from typing import Dict, Optional, Any, Tuple, Callable

import aiohttp
from yarl import URL

import logging

from aiormq.abc import URLorStr
from aiohttp.client_ws import ClientWebSocketResponse

logging.basicConfig(level=logging.DEBUG)

# Create a custom logger
logger = logging.getLogger(__name__)

# Create handlers
c_handler = logging.StreamHandler()
c_handler.setLevel(logging.WARNING)

# Create formatters and add it to handlers
c_format = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
c_handler.setFormatter(c_format)

# Add handlers to the logger
logger.addHandler(c_handler)


class WebsocketTransport(transports.Transport):

def __init__(self,
loop: AbstractEventLoop,
protocol: StreamReaderProtocol,
url: URLorStr,
extra: Optional[Dict] = None) -> None:

super().__init__(extra)
self.url = url

self._loop = loop
self._protocol = protocol
self._closing = False # Set when close() or write_eof() called.
self._paused = False

self.task = self._loop.create_task(self.main_loop())

self.write_queue: asyncio.Queue = asyncio.Queue()
self.read_queue: asyncio.Queue = asyncio.Queue()

async def sender(self, ws: ClientWebSocketResponse) -> None:
while True:
data = await self.write_queue.get()
await ws.send_bytes(data)

async def receiver(self, ws: ClientWebSocketResponse) -> None:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.BINARY:
self._protocol.data_received(msg.data)
elif msg.type == aiohttp.WSMsgType.ERROR:
break

self._protocol.eof_received()

async def main_loop(self) -> None:
async with aiohttp.ClientSession() as session:
async with session.ws_connect(self.url) as ws:
await asyncio.gather(
self.sender(ws),
self.receiver(ws)
)

def get_protocol(self) -> BaseProtocol:
return self._protocol

def set_protocol(self, # type: ignore[override]
protocol: StreamReaderProtocol) -> None:
self._protocol = protocol

def is_closing(self) -> bool:
return self._closing

def write(self, data: Any) -> None:
self.write_queue.put_nowait(data)

def is_reading(self) -> bool:
return not self._paused and not self._closing

def resume_reading(self) -> None:
if self._closing or not self._paused:
return
self._paused = False
if self._loop.get_debug():
logger.debug("%r resumes reading", self)

def close(self) -> None:
self._closing = True
self.task.cancel()
self._protocol.connection_lost(None)
# self._protocol._closed.set_result(None)


async def create_websocket_connection(protocol_factory: Callable,
url: URLorStr,
**kwargs: Dict
) -> Tuple[WebsocketTransport,
StreamReaderProtocol]:
loop = get_event_loop()
protocol = protocol_factory()
transport = WebsocketTransport(loop, protocol, url, **kwargs)
return transport, protocol


_DEFAULT_LIMIT = 2 ** 16 # 64 KiB


async def open_websocket_connection(url: URL,
limit: int = _DEFAULT_LIMIT,
**kwargs: Dict
) -> Tuple[streams.StreamReader,
streams.StreamWriter]:
loop = events.get_running_loop()
reader = streams.StreamReader(limit=limit, loop=loop)
protocol = streams.StreamReaderProtocol(reader, loop=loop)

def factory() -> StreamReaderProtocol:
return protocol

transport, _ = await create_websocket_connection(
factory,
url,
**kwargs
)
writer = streams.StreamWriter(transport, protocol, reader, loop)
return reader, writer
48 changes: 48 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Docker Compose description of the combined application.
#
# 'docker-compose up' will run this.

# This section describes the various containers (services).
services:

rabbitmq:
# There is a prebuilt RabbitMQ image; see
# https://hub.docker.com/_/rabbitmq/ for details.
# This variant is built on Alpine Linux (it's smaller) and includes
# the management UI.
image: 'mosquito/aiormq-rabbitmq'

# These ports are exposed on the host; 'hostport:containerport'.
# You could connect to this server from outside with the *host's*
# DNS name or IP address and port 5672 (the left-hand side of the
# colon).
ports:
# The standard AMQP protocol port
- '5671:5671'
- '5672:5672'
# HTTP management UI
- '15671:15671'
- '15672:15672'


# Run this container on a private network for this application.
# This is necessary for magic Docker DNS to work: other containers
# also running on this network will see a host name "rabbitmq"
# (the name of this section) and the internal port 5672, even though
# that's not explicitly published above.
networks:
- network

websocket-tcp-relay:
image: 'cloudamqp/websocket-tcp-relay:latest'
ports:
# ws socket port
- '15670:15670'
command: ["/usr/bin/websocket-tcp-relay", "--bind=0.0.0.0","--upstream=tcp://rabbitmq:5672"]
networks:
- network

networks:
# Declare our private network. We must declare one for the magic
# Docker DNS to work, but otherwise its default settings are fine.
network: {}
Loading