|
| 1 | +# -*- coding: utf-8 -*- |
1 | 2 | import threading
|
2 | 3 | import traceback
|
3 |
| -from typing import Callable, Generator, Optional |
4 |
| - |
5 |
| -from CHRLINE.services.thrift.ttypes import OpType |
6 | 4 |
|
7 | 5 |
|
8 | 6 | class Poll(object):
|
9 |
| - op_interrupt: dict[int, Callable[[dict, object], None]] |
10 | 7 |
|
11 |
| - def __init__(self) -> None: |
| 8 | + def __init__(self): |
12 | 9 | self.subscriptionId = 0
|
13 | 10 | self.eventSyncToken = None
|
14 |
| - self.op_interrupt = {} |
15 | 11 |
|
16 |
| - def __fetchOps(self, count: int = 100) -> Generator[dict, None, None]: |
| 12 | + def __fetchOps(self, count=100): |
17 | 13 | fetchOps = self.fetchOps
|
18 | 14 | if self.DEVICE_TYPE in self.SYNC_SUPPORT:
|
19 | 15 | fetchOps = self.sync
|
20 | 16 | ops = fetchOps(self.revision, count)
|
21 |
| - if "error" in ops: |
22 |
| - raise Exception(ops["error"]) |
| 17 | + if 'error' in ops: |
| 18 | + raise Exception(ops['error']) |
23 | 19 | for op in ops:
|
24 |
| - op_type = self.checkAndGetValue(op, "type", 3) |
25 |
| - if op_type != -1: |
26 |
| - self.setRevision(self.checkAndGetValue(op, "revision", 1)) |
| 20 | + opType = self.checkAndGetValue(op, 'type', 3) |
| 21 | + if opType != -1: |
| 22 | + self.setRevision(self.checkAndGetValue(op, 'revision', 1)) |
27 | 23 | yield op
|
28 | 24 |
|
29 |
| - def __fetchMyEvents( |
30 |
| - self, count: int = 100, initLastSyncToken: bool = False |
31 |
| - ) -> Generator[dict, None, None]: |
32 |
| - resp = self.fetchMyEvents( |
33 |
| - self.subscriptionId, self.eventSyncToken, count |
34 |
| - ) |
35 |
| - events = self.checkAndGetValue(resp, "events", 2) |
| 25 | + def __fetchMyEvents(self, count: int = 100, initLastSyncToken: bool = False): |
| 26 | + resp = self.fetchMyEvents(self.subscriptionId, self.eventSyncToken, count) |
| 27 | + events = self.checkAndGetValue(resp, 'events', 2) |
36 | 28 | for event in events:
|
37 |
| - syncToken = self.checkAndGetValue(event, "syncToken", 5) |
| 29 | + syncToken = self.checkAndGetValue(event, 'syncToken', 5) |
38 | 30 | self.setEventSyncToken(syncToken)
|
39 | 31 | yield event
|
40 | 32 | if initLastSyncToken:
|
41 |
| - syncToken = self.checkAndGetValue(resp, "syncToken", 3) |
| 33 | + syncToken = self.checkAndGetValue(resp, 'syncToken', 3) |
42 | 34 | self.setEventSyncToken(syncToken)
|
43 | 35 |
|
44 |
| - def __execute( |
45 |
| - self, op: dict, func: Callable[[dict, object], None] |
46 |
| - ) -> None: |
| 36 | + def __execute(self, op, func): |
47 | 37 | try:
|
48 | 38 | func(op, self)
|
49 |
| - except Exception: |
| 39 | + except Exception as e: |
50 | 40 | self.log(traceback.format_exc())
|
51 | 41 |
|
52 |
| - def setRevision(self, revision: Optional[int]) -> None: |
| 42 | + def setRevision(self, revision): |
53 | 43 | if revision is None:
|
54 |
| - self.log("revision is None!!") |
| 44 | + self.log(f'revision is None!!') |
55 | 45 | revision = 0
|
56 | 46 | self.revision = max(revision, self.revision)
|
57 | 47 |
|
58 |
| - def setEventSyncToken(self, syncToken: Optional[int]) -> None: |
| 48 | + def setEventSyncToken(self, syncToken): |
59 | 49 | if syncToken is None:
|
60 |
| - self.log("syncToken is None!!") |
| 50 | + self.log(f'syncToken is None!!') |
61 | 51 | syncToken = 0
|
62 | 52 | if self.eventSyncToken is None:
|
63 | 53 | self.eventSyncToken = syncToken
|
64 | 54 | else:
|
65 | 55 | self.eventSyncToken = max(int(syncToken), int(self.eventSyncToken))
|
66 | 56 |
|
67 |
| - def add_op_interrupt( |
68 |
| - self, op_type: int, func: Callable[[dict, object], None] |
69 |
| - ) -> None: |
70 |
| - self.op_interrupt[op_type] = func |
71 |
| - |
72 |
| - def add_op_interrupt_with_dict( |
73 |
| - self, d: dict[int, Callable[[dict, object], None]] |
74 |
| - ) -> None: |
75 |
| - self.op_interrupt.update(d) |
76 |
| - |
77 |
| - def trace(self, isThreading: bool = True) -> None: |
| 57 | + def trace(self, func, isThreading=True): |
78 | 58 | while self.is_login:
|
79 | 59 | for op in self.__fetchOps():
|
80 |
| - op_type: int = self.checkAndGetValue(op, "type", "val_3", 3) |
81 |
| - if op_type in [-1, OpType.END_OF_OPERATION]: |
82 |
| - continue |
83 |
| - |
84 |
| - func = self.op_interrupt.get(op_type) |
85 |
| - if func is None: |
86 |
| - continue |
87 |
| - |
88 |
| - if isThreading: |
89 |
| - _td = threading.Thread( |
90 |
| - target=self.__execute, args=(op, func) |
91 |
| - ) |
92 |
| - _td.daemon = True |
93 |
| - _td.start() |
94 |
| - else: |
95 |
| - self.__execute(op, func) |
| 60 | + opType = self.checkAndGetValue(op, 'type', 'val_3', 3) |
| 61 | + if opType != 0 and opType != -1: |
| 62 | + if isThreading: |
| 63 | + _td = threading.Thread(target=self.__execute, args=(op, func)) |
| 64 | + _td.daemon = True |
| 65 | + _td.start() |
| 66 | + else: |
| 67 | + self.__execute(op, func) |
0 commit comments