77import logging
88import os .path
99from collections .abc import Callable
10- from typing import Any
10+ from typing import Any , TypeVar
1111
1212import zeep .helpers
1313from zeep .cache import SqliteCache
@@ -164,6 +164,27 @@ def _load_document() -> DocumentWithDeferredLoad:
164164 return document
165165
166166
167+ _T = TypeVar ("_T" )
168+
169+
170+ def handle_snapshot_errors (func : Callable [..., _T ]) -> Callable [..., _T ]:
171+ """Decorator to handle snapshot URI errors."""
172+
173+ async def wrapper (self , uri : str , * args : Any , ** kwargs : Any ) -> _T :
174+ try :
175+ return await func (self , uri , * args , ** kwargs )
176+ except TimeoutError as error :
177+ raise ONVIFTimeoutError (
178+ f"Timed out fetching { obscure_user_pass_url (uri )} : { error } "
179+ ) from error
180+ except aiohttp .ClientError as error :
181+ raise ONVIFError (
182+ f"Error fetching { obscure_user_pass_url (uri )} : { error } "
183+ ) from error
184+
185+ return wrapper
186+
187+
167188class ZeepAsyncClient (BaseZeepAsyncClient ):
168189 """Overwrite create_service method to be async."""
169190
@@ -601,7 +622,7 @@ async def get_snapshot(
601622 middlewares = (DigestAuthMiddleware (self .user , self .passwd ),)
602623
603624 response = await self ._try_snapshot_uri (uri , auth = auth , middlewares = middlewares )
604- content = await response . read ( )
625+ content = await self . _try_read_snapshot_content ( uri , response )
605626
606627 # If the request fails with a 401, strip user/pass from URL and retry
607628 if (
@@ -612,7 +633,7 @@ async def get_snapshot(
612633 response = await self ._try_snapshot_uri (
613634 stripped_uri , auth = auth , middlewares = middlewares
614635 )
615- content = await response . read ( )
636+ content = await self . _try_read_snapshot_content ( uri , response )
616637
617638 if response .status == 401 :
618639 raise ONVIFAuthError (f"Failed to authenticate to { uri } " )
@@ -622,24 +643,23 @@ async def get_snapshot(
622643
623644 return None
624645
646+ @handle_snapshot_errors
647+ async def _try_read_snapshot_content (
648+ self ,
649+ uri : str ,
650+ response : aiohttp .ClientResponse ,
651+ ) -> bytes :
652+ """Try to read the snapshot URI."""
653+ return await response .read ()
654+
655+ @handle_snapshot_errors
625656 async def _try_snapshot_uri (
626657 self ,
627658 uri : str ,
628659 auth : BasicAuth | None = None ,
629660 middlewares : tuple [DigestAuthMiddleware , ...] | None = None ,
630661 ) -> aiohttp .ClientResponse :
631- try :
632- return await self ._snapshot_client .get (
633- uri , auth = auth , middlewares = middlewares
634- )
635- except TimeoutError as error :
636- raise ONVIFTimeoutError (
637- f"Timed out fetching { obscure_user_pass_url (uri )} : { error } "
638- ) from error
639- except aiohttp .ClientError as error :
640- raise ONVIFError (
641- f"Error fetching { obscure_user_pass_url (uri )} : { error } "
642- ) from error
662+ return await self ._snapshot_client .get (uri , auth = auth , middlewares = middlewares )
643663
644664 def get_definition (
645665 self , name : str , port_type : str | None = None
0 commit comments