Skip to content

[EH/SB/EG] next* #40483

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eng/pylintrc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[MASTER]
py-version=3.9
py-version=3.8
ignore-patterns=test_*,conftest,setup
reports=no

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,15 @@ def _get_authentication_policy(credential, bearer_token_policy=BearerTokenCreden
)


def _is_cloud_event(event):
# type: (Any) -> bool
def _is_cloud_event(event: Any) -> bool:
required = ("id", "source", "specversion", "type")
try:
return all((_ in event for _ in required)) and event["specversion"] == "1.0"
except TypeError:
return False


def _is_eventgrid_event_format(event):
# type: (Any) -> bool
def _is_eventgrid_event_format(event: Any) -> bool:
required = ("subject", "eventType", "data", "dataVersion", "id", "eventTime")
try:
return all((prop in event for prop in required))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ class CloudEventDistributedTracingPolicy(SansIOHTTPPolicy):

_CONTENT_TYPE = "application/cloudevents-batch+json; charset=utf-8"

def on_request(self, request):
# type: (PipelineRequest) -> None
def on_request(self, request: "PipelineRequest") -> None:
try:
traceparent = request.http_request.headers["traceparent"]
tracestate = request.http_request.headers["tracestate"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,10 @@ def __init__(
self._api_version = api_version if api_version is not None else DEFAULT_API_VERSION

@staticmethod
def _policies(credential, **kwargs):
# type: (Union[AzureKeyCredential, AzureSasCredential, TokenCredential], Any) -> List[Any]
def _policies(
credential: Union["AzureKeyCredential", "AzureSasCredential", "TokenCredential"],
**kwargs: Any
) -> List[Any]:
auth_policy = _get_authentication_policy(credential)
sdk_moniker = "eventgrid/{}".format(VERSION)
policies = [
Expand Down Expand Up @@ -234,16 +236,13 @@ def send(self, events: SendType, *, channel_name: Optional[str] = None, **kwargs
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)

def close(self):
# type: () -> None
"""Close the :class:`~azure.eventgrid.EventGridPublisherClient` session."""
return self._client.close()
def close(self) -> None:
"""Closes the EventGridPublisherClient session."""
self._client.close()

def __enter__(self):
# type: () -> EventGridPublisherClient
def __enter__(self) -> "EventGridPublisherClient":
self._client.__enter__()
return self

def __exit__(self, *args):
# type: (*Any) -> None
def __exit__(self, *args) -> None:
self._client.__exit__(*args)
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ class EventGridSasCredentialPolicy(SansIOHTTPPolicy):
:raises: ValueError or TypeError
"""

def __init__(self, credential, name, **kwargs): # pylint: disable=unused-argument
# type: (AzureSasCredential, str, Any) -> None
def __init__(self, credential: "AzureSasCredential", name: str, **kwargs: Any) -> None: # pylint: disable=unused-argument
super(EventGridSasCredentialPolicy, self).__init__()
self._credential = credential
if not name:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ def _deserialize(cls, data, exist_discriminators):
discriminator_value = data.find(xml_name).text # pyright: ignore
else:
discriminator_value = data.get(discriminator._rest_name)
mapped_cls = cls.__mapping__.get(discriminator_value, cls) # pyright: ignore
mapped_cls = cls.__mapping__.get(discriminator_value, cls) # pyright: ignore # pylint: disable=no-member
return mapped_cls._deserialize(data, exist_discriminators)

def as_dict(self, *, exclude_readonly: bool = False) -> typing.Dict[str, typing.Any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# license information.
# --------------------------------------------------------------------------
# TODO: Check types of kwargs (issue exists for this)
import asyncio
import asyncio # pylint:disable=do-not-import-asyncio
import logging
import time
import queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from urllib.parse import urlparse
import socket
from ssl import SSLError
import asyncio
import asyncio # pylint:disable=do-not-import-asyncio
from typing import Any, Dict, List, Tuple, Optional, NamedTuple, Type, Union, cast

from ._transport_async import AsyncTransport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import uuid
import logging
import time
import asyncio
import asyncio # pylint:disable=do-not-import-asyncio

from .._encode import encode_payload
from ._link_async import Link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import uuid
import logging
import time
import asyncio
import asyncio # pylint:disable=do-not-import-asyncio
from typing import Optional, Union

from ..constants import ConnectionState, SessionState, SessionTransferState, Role
Expand Down Expand Up @@ -105,8 +105,7 @@ async def _on_connection_state_change(self):
if self.state not in [SessionState.DISCARDING, SessionState.UNMAPPED]:
await self._set_state(SessionState.DISCARDING)

def _get_next_output_handle(self):
# type: () -> int
def _get_next_output_handle(self) -> int:
"""Get the next available outgoing handle number within the max handle limit.

:raises ValueError: If maximum handle has been reached.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
# THE POSSIBILITY OF SUCH DAMAGE.
# -------------------------------------------------------------------------

import asyncio
import asyncio # pylint:disable=do-not-import-asyncio
import errno
import socket
import ssl
Expand Down
12 changes: 4 additions & 8 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ def from_incoming_frame(cls, connection, channel):
new_session = cls(connection, channel)
return new_session

def _set_state(self, new_state):
# type: (SessionState) -> None
def _set_state(self, new_state: SessionState) -> None:
"""Update the session state.
:param ~pyamqp.constants.SessionState new_state: The new state to set.
"""
Expand All @@ -104,8 +103,7 @@ def _on_connection_state_change(self):
if self.state not in [SessionState.DISCARDING, SessionState.UNMAPPED]:
self._set_state(SessionState.DISCARDING)

def _get_next_output_handle(self):
# type: () -> int
def _get_next_output_handle(self) -> int:
"""Get the next available outgoing handle number within the max handle limit.

:raises ValueError: If maximum handle has been reached.
Expand Down Expand Up @@ -374,8 +372,7 @@ def _incoming_detach(self, frame):
)
)

def _wait_for_response(self, wait, end_state):
# type: (Union[bool, float], SessionState) -> None
def _wait_for_response(self, wait: Union[bool, float], end_state: SessionState) -> None:
if wait is True:
self._connection.listen(wait=False)
while self.state != end_state:
Expand All @@ -398,8 +395,7 @@ def begin(self, wait=False):
elif not self.allow_pipelined_open:
raise ValueError("Connection has been configured to not allow piplined-open. Please set 'wait' parameter.")

def end(self, error=None, wait=False):
# type: (Optional[AMQPError], bool) -> None
def end(self, error: Optional[AMQPError] = None, wait: bool = False) -> None:
try:
if self.state not in [SessionState.UNMAPPED, SessionState.DISCARDING]:
self._outgoing_end(error=error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import asyncio
import asyncio # pylint:disable=do-not-import-asyncio
import sys
from asyncio import Semaphore


def get_dict_with_loop_if_needed(loop):
if sys.version_info >= (3, 10):
Expand All @@ -17,7 +15,7 @@ def get_dict_with_loop_if_needed(loop):
return {}


async def semaphore_acquire_with_timeout(semaphore: Semaphore, timeout=None):
async def semaphore_acquire_with_timeout(semaphore: asyncio.Semaphore, timeout=None):
try:
return await asyncio.wait_for(semaphore.acquire(), timeout=timeout)
except asyncio.TimeoutError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import asyncio # pylint:disable=do-not-import-asyncio
import logging
import queue
import time
from asyncio import Lock
from typing import Optional, Callable, Awaitable, TYPE_CHECKING

from .._producer_async import EventHubProducer
Expand Down Expand Up @@ -39,7 +38,7 @@ def __init__(
self._max_buffer_len = max_buffer_length
self._cur_buffered_len = 0
self._producer: EventHubProducer = producer
self._lock = Lock()
self._lock = asyncio.Lock()
self._max_wait_time = max_wait_time
self._on_success = self.failsafe_callback(on_success)
self._on_error = self.failsafe_callback(on_error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import asyncio # pylint:disable=do-not-import-asyncio
import logging
from typing import Dict, List, Callable, Optional, Awaitable, TYPE_CHECKING
from asyncio import Lock

from ._partition_resolver_async import PartitionResolver
from ...aio._producer_async import EventHubProducer
Expand Down Expand Up @@ -37,7 +36,7 @@ def __init__(
):
self._buffered_producers: Dict[str, BufferedProducer] = {}
self._partition_ids: List[str] = partitions
self._lock = Lock()
self._lock = asyncio.Lock()
self._on_success = on_success
self._on_error = on_error
self._create_producer = create_producer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
jenkins-hash lookup3 algorithm implementation
"""

from asyncio import Lock
from asyncio import Lock # pylint:disable=do-not-import-asyncio
from ..._buffered_producer._partition_resolver import (
generate_hash_code,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from __future__ import annotations

import logging
import asyncio
import asyncio # pylint: disable=do-not-import-asyncio
import time
import functools
from typing import TYPE_CHECKING, Any, Dict, List, Callable, Optional, Union, cast
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from __future__ import annotations
from typing import TYPE_CHECKING, Any, Dict, Optional, Union
from asyncio import Lock
from asyncio import Lock # pylint:disable=do-not-import-asyncio

from .._connection_manager import _ConnectionMode
from .._constants import TransportType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# --------------------------------------------------------------------------------------------
from __future__ import annotations
import uuid
import asyncio
import asyncio # pylint: disable=do-not-import-asyncio
import logging
from collections import deque
from typing import TYPE_CHECKING, Callable, Awaitable, Dict, Optional, Union, List, Any, Deque
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# --------------------------------------------------------------------------------------------

from __future__ import annotations
import asyncio
import asyncio # pylint:disable=do-not-import-asyncio
import logging
import datetime
import warnings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
cast,
)
import uuid
import asyncio
import asyncio # pylint:disable=do-not-import-asyncio
import logging
import time
from functools import partial
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# -----------------------------------------------------------------------------------

import asyncio
import asyncio # pylint:disable=do-not-import-asyncio


def get_running_loop() -> asyncio.AbstractEventLoop:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# --------------------------------------------------------------------------------------------
from __future__ import annotations
import uuid
import asyncio
import asyncio # pylint: disable=do-not-import-asyncio
import logging
from typing import Iterable, Union, Optional, Any, AnyStr, List, TYPE_CHECKING, cast

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import asyncio
import asyncio # pylint:disable=do-not-import-asyncio
import logging
import time
import warnings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# --------------------------------------------------------------------------------------------

from __future__ import annotations
import asyncio
import asyncio # pylint:disable=do-not-import-asyncio
import time
import logging
from typing import Any, Callable, Dict, List, Optional, Union, cast, TYPE_CHECKING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# --------------------------------------------------------------------------------------------

from __future__ import annotations
import asyncio
import asyncio # pylint:disable=do-not-import-asyncio
import time
import logging
from typing import Callable, Dict, Union, cast, TYPE_CHECKING, List, Optional, Any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# license information.
# --------------------------------------------------------------------------
# TODO: Check types of kwargs (issue exists for this)
import asyncio
import asyncio # pylint:disable=do-not-import-asyncio
import logging
import time
import queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from urllib.parse import urlparse
import socket
from ssl import SSLError
import asyncio
import asyncio # pylint:disable=do-not-import-asyncio
from typing import Any, Dict, List, Tuple, Optional, NamedTuple, Type, Union, cast

from ._transport_async import AsyncTransport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import uuid
import logging
import time
import asyncio
import asyncio # pylint:disable=do-not-import-asyncio

from .._encode import encode_payload
from ._link_async import Link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import uuid
import logging
import time
import asyncio
import asyncio # pylint:disable=do-not-import-asyncio
from typing import Optional, Union

from ..constants import ConnectionState, SessionState, SessionTransferState, Role
Expand Down Expand Up @@ -105,8 +105,7 @@ async def _on_connection_state_change(self):
if self.state not in [SessionState.DISCARDING, SessionState.UNMAPPED]:
await self._set_state(SessionState.DISCARDING)

def _get_next_output_handle(self):
# type: () -> int
def _get_next_output_handle(self) -> int:
"""Get the next available outgoing handle number within the max handle limit.

:raises ValueError: If maximum handle has been reached.
Expand Down
Loading