Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stub typing for clients #1075

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Prev Previous commit
Next Next commit
Recommendations from ruff, updates to reflect we no longer support Py…
…thon 3.8.
  • Loading branch information
jackgene committed Nov 16, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 7662b0e30a8341c75074d5b7188b968caef84dd4
2 changes: 1 addition & 1 deletion aiokafka/abc.pyi
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import abc

from aiokafka.structs import TopicPartition

class ConsumerRebalanceListener(abc.ABC):
@@ -134,6 +135,5 @@ class AbstractTokenProvider(abc.ABC):

This feature is only available in Kafka >= 2.1.0.
"""
...

__all__ = ["ConsumerRebalanceListener", "AbstractTokenProvider"]
43 changes: 12 additions & 31 deletions aiokafka/cluster.pyi
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from collections.abc import Sequence
from concurrent.futures import Future
from typing import Any, Callable, Optional, Sequence, Set, TypedDict, Union
from typing import Any, Callable, TypedDict

from aiokafka.client import CoordinationType
from aiokafka.protocol.commit import (
GroupCoordinatorResponse_v0,
@@ -16,23 +18,18 @@ from aiokafka.protocol.metadata import (
from aiokafka.structs import BrokerMetadata, PartitionMetadata, TopicPartition

log = ...
MetadataResponse = Union[
MetadataResponse_v0,
MetadataResponse_v1,
MetadataResponse_v2,
MetadataResponse_v3,
MetadataResponse_v4,
MetadataResponse_v5,
]
GroupCoordinatorResponse = Union[
GroupCoordinatorResponse_v0, GroupCoordinatorResponse_v1
]
MetadataResponse = MetadataResponse_v0 |\
MetadataResponse_v1 |\
MetadataResponse_v2 |\
MetadataResponse_v3 |\
MetadataResponse_v4 |\
MetadataResponse_v5
GroupCoordinatorResponse = GroupCoordinatorResponse_v0 | GroupCoordinatorResponse_v1

class ClusterConfig(TypedDict):
retry_backoff_ms: int
metadata_max_age_ms: int
bootstrap_servers: str | list[str]
...

class ClusterMetadata:
"""
@@ -65,7 +62,6 @@ class ClusterMetadata:
Returns:
set: {BrokerMetadata, ...}
"""
...

def broker_metadata(self, broker_id: str) -> BrokerMetadata | None:
"""Get BrokerMetadata
@@ -76,9 +72,8 @@ class ClusterMetadata:
Returns:
BrokerMetadata or None if not found
"""
...

def partitions_for_topic(self, topic: str) -> Optional[Set[int]]:
def partitions_for_topic(self, topic: str) -> set[int] | None:
"""Return set of all partitions for topic (whether available or not)

Arguments:
@@ -87,9 +82,8 @@ class ClusterMetadata:
Returns:
set: {partition (int), ...}
"""
...

def available_partitions_for_topic(self, topic: str) -> Optional[Set[int]]:
def available_partitions_for_topic(self, topic: str) -> set[int] | None:
"""Return set of partitions with known leaders

Arguments:
@@ -99,11 +93,9 @@ class ClusterMetadata:
set: {partition (int), ...}
None if topic not found.
"""
...

def leader_for_partition(self, partition: PartitionMetadata) -> int | None:
"""Return node_id of leader, -1 unavailable, None if unknown."""
...

def partitions_for_broker(self, broker_id: int | str) -> set[TopicPartition] | None:
"""Return TopicPartitions for which the broker is a leader.
@@ -115,7 +107,6 @@ class ClusterMetadata:
set: {TopicPartition, ...}
None if the broker either has no partitions or does not exist.
"""
...

def coordinator_for_group(self, group: str) -> int | str | None:
"""Return node_id of group coordinator.
@@ -127,7 +118,6 @@ class ClusterMetadata:
int: node_id for group coordinator
None if the group does not exist.
"""
...

def request_update(self) -> Future[ClusterMetadata]:
"""Flags metadata for update, return Future()
@@ -138,7 +128,6 @@ class ClusterMetadata:
Returns:
Future (value will be the cluster object after update)
"""
...

def topics(self, exclude_internal_topics: bool = ...) -> set[str]:
"""Get set of known topics.
@@ -152,11 +141,9 @@ class ClusterMetadata:
Returns:
set: {topic (str), ...}
"""
...

def failed_update(self, exception: BaseException) -> None:
"""Update cluster state given a failed MetadataRequest."""
...

def update_metadata(self, metadata: MetadataResponse) -> None:
"""Update cluster state given a MetadataResponse.
@@ -166,15 +153,12 @@ class ClusterMetadata:

Returns: None
"""
...

def add_listener(self, listener: Callable[[ClusterMetadata], Any]) -> None:
"""Add a callback function to be called on each metadata update"""
...

def remove_listener(self, listener: Callable[[ClusterMetadata], Any]) -> None:
"""Remove a previously added listener callback"""
...

def add_group_coordinator(
self, group: str, response: GroupCoordinatorResponse
@@ -188,13 +172,11 @@ class ClusterMetadata:
Returns:
string: coordinator node_id if metadata is updated, None on error
"""
...

def with_partitions(
self, partitions_to_add: Sequence[PartitionMetadata]
) -> ClusterMetadata:
"""Returns a copy of cluster metadata with partitions added"""
...

def coordinator_metadata(self, node_id: int | str) -> BrokerMetadata | None: ...
def add_coordinator(
@@ -210,6 +192,5 @@ class ClusterMetadata:
a new one was elected for the same purpose (For example group
coordinator for group X).
"""
...

def __str__(self) -> str: ...
Loading
Loading