Skip to content

Commit

Permalink
fix!: realtime v2 (#178)
Browse files Browse the repository at this point in the history
* add push

* add tests

* fix: realtime listeners

* send broadcast using _push method

* change channels storage

* use _on method

* wip

* fix broadcast callback trigger

* use kargs

* fix postgres_changes and add send buffer

* wip presence

* add push buffer

* fix subscribe callback

* fix presence and add tests

* run tests on ci

* fix action name

* store supabase env vars

* fix issue with format

* skip coverage

* use str enum instead of StrEnum

* fix issues with older python versions

* fix unsubscribe and rejoin logic

* fix format

* use *args

* fire and forget unsubscribe call

* add method for removing channel and update README

* update README

* add RealtimeChannelOptions type

* remove FIXME

* make some types and attributes private

* move push to separate file

* add tests for postgres changes

* remove need of *args from callbacks

* fix rls policy

* use async events and semaphores when testing

* fix migration

* cancel tasks at test end

* wip

* reset local db instance onCI

* wait until supabase is ready:

* increase timeout on tests

* rename Socket to RealtimeClient

* run tests twice

* fix format

* Generate sync bindings

* poetry lock

* format

* add filter to postgres_changes

* send filter only when non-None

* add example which uses filter

* update sync bindings

* code review fixes

* ci

* poetry lock
  • Loading branch information
grdsdev authored Aug 16, 2024
1 parent 01be00d commit 981a5d0
Show file tree
Hide file tree
Showing 40 changed files with 4,429 additions and 1,032 deletions.
29 changes: 22 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
python-version: [3.9, "3.10", "3.11"]
python-version: ["3.9", "3.10", "3.11", "3.12"]
runs-on: ${{ matrix.os }}
steps:
- name: Clone Repository
Expand All @@ -27,13 +27,28 @@ jobs:
- name: Set up Poetry
uses: abatilo/[email protected]
with:
poetry-version: 1.3.2
poetry-version: latest

# - name: Run Tests
# env:
# SUPABASE_ID: ${{ secrets.SUPABASE_ID }}
# API_KEY: ${{ secrets.API_KEY }}
# run: make run_tests
- name: Install Supabase CLI
uses: supabase/setup-cli@v1
with:
version: latest

- name: Setup Local Supabase
run: |
supabase start --workdir tests
supabase db reset --workdir tests
supabase status --workdir tests -o env > tests/.env \
--override-name auth.anon_key=SUPABASE_ANON_KEY \
--override-name api.url=SUPABASE_URL
- name: Wait for Supabase to be ready
run: |
echo "Waiting for 5 seconds to ensure Supabase is fully initialized..."
sleep 5
- name: Run Tests
run: make run_tests || make run_tests

# - name: Upload Coverage
# uses: codecov/codecov-action@v1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/conventional-commits-lint.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,4 @@ if (failed) {
process.exit(1);
}

process.exit(0);
process.exit(0);
2 changes: 1 addition & 1 deletion .github/workflows/conventional-commits.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,4 @@ jobs:
node .github/workflows/conventional-commits-lint.js push <<EOF
${{ toJSON(github.event) }}
EOF
EOF
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ tests_pre_commit:
run_tests: tests

tests_only:
poetry run pytest --cov=./ --cov-report=xml --cov-report=html -vv
poetry run pytest -vv

build_sync:
poetry run unasync realtime tests
210 changes: 170 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,73 +1,203 @@
# realtime-py
Python Client Library to interface with the Phoenix Realtime Server
<br />
<p align="center">
<a href="https://supabase.io">
<picture>
<source media="(prefers-color-scheme: dark)" srcset="https://raw.githubusercontent.com/supabase/supabase/master/packages/common/assets/images/supabase-logo-wordmark--dark.svg">
<source media="(prefers-color-scheme: light)" srcset="https://raw.githubusercontent.com/supabase/supabase/master/packages/common/assets/images/supabase-logo-wordmark--light.svg">
<img alt="Supabase Logo" width="300" src="https://raw.githubusercontent.com/supabase/supabase/master/packages/common/assets/images/logo-preview.jpg">
</picture>
</a>

## Requirements
**Python 3 higher**
<h1 align="center">Supabase Realtime Client</h1>

<h3 align="center">Send ephemeral messages with <b>Broadcast</b>, track and synchronize state with <b>Presence</b>, and listen to database changes with <b>Postgres Change Data Capture (CDC)</b>.</h3>

<p align="center">
<a href="https://supabase.com/docs/guides/realtime">Guides</a>
·
<a href="https://supabase.com/docs/reference/python">Reference Docs</a>
·
<a href="https://multiplayer.dev">Multiplayer Demo</a>
</p>
</p>

# Overview

This client enables you to use the following Supabase Realtime's features:

- **Broadcast**: send ephemeral messages from client to clients with minimal latency. Use cases include sharing cursor positions between users.
- **Presence**: track and synchronize shared state across clients with the help of CRDTs. Use cases include tracking which users are currently viewing a specific webpage.
- **Postgres Change Data Capture (CDC)**: listen for changes in your PostgreSQL database and send them to clients.

# Usage

## Installing the Package

## Installation
```bash
pip3 install realtime==1.0.2
pip3 install realtime==2.0.0
```

## Installation from source
```bash
pip3 install -r requirements.txt
python3 usage.py
## Creating a Channel

```python
import asyncio
from typing import Optional
from realtime.client import RealtimeClient
from realtime.channel import RealtimeSubscribeStates

client = RealtimeClient(REALTIME_URL, API_KEY)
channel = client.channel('test-channel')

def _on_subscribe(status: RealtimeSubscribeStates, err: Optional[Exception]):
if status == RealtimeSubscribeStates.SUBSCRIBED:
print('Connected!')
elif status == RealtimeSubscribeStates.CHANNEL_ERROR:
print(f'There was an error subscribing to channel: {err.message}')
elif status == RealtimeSubscribeStates.TIMED_OUT:
print('Realtime server did not respond in time.')
elif status == RealtimeSubscribeStates.CLOSED:
print('Realtime channel was unexpectedly closed.')

await channel.subscribe(_on_subscribe)

# Listen for all incoming events, often the last thing you want to do.
await client.listen()
```

## Quick Start
### Notes:

- `REALTIME_URL` is `ws://localhost:4000/socket` when developing locally and `wss://<project_ref>.supabase.co/realtime/v1` when connecting to your Supabase project.
- `API_KEY` is a JWT whose claims must contain `exp` and `role` (existing database role).
- Channel name can be any `string`.

## Broadcast

Your client can send and receive messages based on the `event`.

```python
from realtime.connection import Socket
# Setup...

channel = client.channel(
"broadcast-test", {"config": {"broadcast": {"ack": False, "self": False}}}
)

await channel.on_broadcast("some-event", lambda payload: print(payload)).subscribe()
await channel.send_broadcast("some-event", {"hello": "world"})
```

### Notes:

def callback1(payload):
print("Callback 1: ", payload)
- Setting `ack` to `true` means that the `channel.send` promise will resolve once server replies with acknowledgement that it received the broadcast message request.
- Setting `self` to `true` means that the client will receive the broadcast message it sent out.
- Setting `private` to `true` means that the client will use RLS to determine if the user can connect or not to a given channel.

def callback2(payload):
print("Callback 2: ", payload)
## Presence

if __name__ == "__main__":
URL = "ws://localhost:4000/socket/websocket"
s = Socket(URL)
s.connect()
Your client can track and sync state that's stored in the channel.

```python
# Setup...

channel = client.channel(
"presence-test",
{
"config": {
"presence": {
"key": ""
}
}
}
)

channel.on_presence_sync(lambda: print("Online users: ", channel.presence_state()))
channel.on_presence_join(lambda new_presences: print("New users have joined: ", new_presences))
channel.on_presence_leave(lambda left_presences: print("Users have left: ", left_presences))

await channel.track({ 'user_id': 1 })
```

channel_1 = s.set_channel("realtime:public:todos")
channel_1.join().on("UPDATE", callback1)
## Postgres CDC

channel_2 = s.set_channel("realtime:public:users")
channel_2.join().on("*", callback2)
Receive database changes on the client.

s.listen()
```python
# Setup...

channel = client.channel("db-changes")

channel.on_postgres_changes(
"*",
schema="public",
callback=lambda payload: print("All changes in public schema: ", payload),
)

channel.on_postgres_changes(
"INSERT",
schema="public",
table="messages",
callback=lambda payload: print("All inserts in messages table: ", payload),
)

channel.on_postgres_changes(
"UPDATE",
schema="public",
table="users",
filter="username=eq.Realtime",
callback=lambda payload: print(
"All updates on users table when username is Realtime: ", payload
),
)

channel.subscribe(
lambda status, err: status == RealtimeSubscribeStates.SUBSCRIBED
and print("Ready to receive database changes!")
)
```

## Get All Channels

You can see all the channels that your client has instantiated.

## Sample usage with Supabase
```python
# Setup...

Here's how you could connect to your realtime endpoint using Supabase endpoint. Correct as of 5th June 2021. Please replace `SUPABASE_ID` and `API_KEY` with your own `SUPABASE_ID` and `API_KEY`. The variables shown below are fake and they will not work if you try to run the snippet.
client.get_channels()
```

## Cleanup

It is highly recommended that you clean up your channels after you're done with them.

- Remove a single channel

```python
from realtime.connection import Socket
# Setup...

SUPABASE_ID = "dlzlllxhaakqdmaapvji"
API_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoiYW5vbiIsImlhdCI6MT"
channel = client.channel('some-channel-to-remove')

channel.subscribe()

def callback1(payload):
print("Callback 1: ", payload)
await client.remove_channel(channel)
```

- Remove all channels

```python
# Setup...

if __name__ == "__main__":
URL = f"wss://{SUPABASE_ID}.supabase.co/realtime/v1/websocket?apikey={API_KEY}&vsn=1.0.0"
s = Socket(URL)
s.connect()
channel1 = client.channel('a-channel-to-remove')
channel2 = client.channel('another-channel-to-remove')

channel_1 = s.set_channel("realtime:*")
channel_1.join().on("UPDATE", callback1)
s.listen()
await channel1.subscribe()
await channel2.subscribe()

await client.remove_all_channels()
```

Then, go to the Supabase interface and toggle a row in a table. You should see a corresponding payload show up in your console/terminal.
## Credits

This repo draws heavily from [phoenix-js](https://github.com/phoenixframework/phoenix/tree/master/assets/js/phoenix).

## License

MIT.
Loading

0 comments on commit 981a5d0

Please sign in to comment.