Skip to content
This repository has been archived by the owner on Jul 4, 2024. It is now read-only.

Commit

Permalink
Moved some unnecessary services to private project
Browse files Browse the repository at this point in the history
  • Loading branch information
Joscha Götzer committed Aug 25, 2020
1 parent f575921 commit 55b9780
Show file tree
Hide file tree
Showing 14 changed files with 152 additions and 96 deletions.
2 changes: 1 addition & 1 deletion botkit/botkit_modules/module_manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async def enable_module(self, module_info: ModuleInfosCollectionModel):
module_to_enable = module_info.page_items[0]
module_name = module_to_enable.name
module = self.module_loader.get_module_by_name(module_name)
await self.module_loader.register_module(module)
await self.module_loader.try_register_module(module)
module_to_enable.is_enabled = True
return module_info

Expand Down
28 changes: 22 additions & 6 deletions botkit/botkit_modules/system/status_pings.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ class Ping(BaseModel):

class StatusPings:
def __init__(
self, client: Client, log_chat: Union[int, str], environment: str, environment_priority: List[str],
self,
client: Client,
log_chat: Union[int, str],
environment: str,
environment_priority: List[str],
):
self.log_chat = log_chat
self.environment = environment
Expand Down Expand Up @@ -99,7 +103,9 @@ async def update_status(self, queried_ping: Optional[Ping], other_detected: bool
return

# Waiting for all other instances to be offline for some time
if self.timestamp_older_than(queried_ping.ping_time, seconds=self.reactivate_after_seconds):
if self.timestamp_older_than(
queried_ping.ping_time, seconds=self.reactivate_after_seconds
):
command = ToggleSystemStateCommand(
new_state="unpause",
triggered_by=self.__class__.__name__,
Expand All @@ -117,13 +123,17 @@ async def update_status(self, queried_ping: Optional[Ping], other_detected: bool
f"{self.environment} ready to take over."
)
else:
self.log.debug(f"Paused since another instance with higher priority ({queried_ping.env}) is running.")
self.log.debug(
f"Paused since another instance with higher priority ({queried_ping.env}) is running."
)

def has_higher_priority(self, env: str, compare_to: str) -> Optional[bool]:
try:
return self.priority.index(env) < self.priority.index(compare_to)
except ValueError:
self.log.exception(f"Environment priority map does not contain '{env}' and '{compare_to}'.")
self.log.exception(
f"Environment priority map does not contain '{env}' and '{compare_to}'."
)
return None

async def query_most_recent_ping(self) -> Optional[Ping]:
Expand Down Expand Up @@ -164,7 +174,11 @@ def timestamp_older_than(dt: datetime, seconds: int) -> bool:

@staticmethod
def timestamp_between(dt: datetime, min_seconds: int, max_seconds: int):
return dt + timedelta(max_seconds) > datetime.now(tz=pytz.UTC) > dt + timedelta(seconds=min_seconds)
return (
dt + timedelta(max_seconds)
> datetime.now(tz=pytz.UTC)
> dt + timedelta(seconds=min_seconds)
)

async def _send_ping(self, force_resend: bool = False):
self.last_sent_ping = Ping(env=self.environment, ping_time=datetime.now(tz=pytz.UTC))
Expand All @@ -177,4 +191,6 @@ async def _send_ping(self, force_resend: bool = False):
return
except:
pass
self.last_ping_msg = await self.client.send_message(self.log_chat, self.last_sent_ping.json())
self.last_ping_msg = await self.client.send_message(
self.log_chat, self.last_sent_ping.json()
)
70 changes: 70 additions & 0 deletions botkit/botkit_modules/system/system_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from typing import Any, List, Optional
from unittest.mock import Mock

from haps import Inject
from pyrogram import Chat, Message, User

from botkit.core.moduleloader import ModuleLoader
from botkit.core.modules import Module, module
from botkit.routing.pipelines.callbacks import HandlerSignature
from botkit.routing.route import RouteDefinition, RouteHandler
from botkit.routing.route_builder.builder import RouteBuilder
from botkit.routing.route_builder.route_collection import RouteCollection
from botkit.routing.update_types.updatetype import UpdateType
from botkit.types.client import IClient


def notests(func):
func.notests = True
return func


class SystemTestsModule(Module):
loader: ModuleLoader = Inject()

def register(self, routes: RouteBuilder):
pass

async def load(self) -> None:
for m in self.loader.modules:

if not m.route_collection:
continue

for client, routes in m.route_collection.routes_by_client.items():
await self.test_module_routes(routes)

async def unload(self) -> None:
return await super().unload()

async def test_module_routes(self, routes: List[RouteDefinition]):
for route in routes:
for update_type, route_wrapper in route.handler_by_update_type.items():
await self.fire_request(update_type, route_wrapper)

async def fire_request(self, update_type: UpdateType, route: RouteHandler):
try:
should_not_test = route.callback.notests
return
except AttributeError:
pass

client = Mock(IClient)
if update_type == UpdateType.message:
message = Mock(Message)
(user := Mock(User)).configure_mock()
(chat := Mock(Chat)).configure_mock(id=12345)
message.configure_mock(
message_id=12345,
command="test",
from_user=user,
chat=chat,
text="test",
forward_from=None,
reply_to_message=None,
)
try:
res = await route.callback(client, message)
print(res)
except Exception as ex:
self.log.exception(ex)
24 changes: 18 additions & 6 deletions botkit/botkit_modules/system/sytem_management_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pyrogram import Filters, Message, User, Client
from typing import Optional, List, Any, Literal

from botkit.botkit_modules.system.system_tests import notests
from botkit.persistence.callback_manager import (
RedisCallbackManager,
ICallbackManager,
Expand Down Expand Up @@ -49,7 +50,9 @@ def register(self, routes: RouteBuilder):
routes.on(restart_command & only_owner).call(self.restart_system)

routes.on(Filters.command(["off", "pause"]) & only_owner).call(self.handle_pause_command)
routes.on(Filters.command(["on", "unpause"]) & only_owner).call(self.handle_unpause_command)
routes.on(Filters.command(["on", "unpause"]) & only_owner).call(
self.handle_unpause_command
)

command_bus.register(_ToggleSystemStateCommandHandler(self))

Expand All @@ -59,14 +62,19 @@ async def restart_system(_, message: Message):
await message.delete()
command_bus.execute(
ToggleSystemStateCommand(
new_state="pause", triggered_by="user", reason_phrase="User requested restart of system.",
new_state="pause",
triggered_by="user",
reason_phrase="User requested restart of system.",
)
)
await asyncio.sleep(2)
command_bus.execute(
ToggleSystemStateCommand(new_state="unpause", triggered_by="user", reason_phrase="Starting back up.",)
ToggleSystemStateCommand(
new_state="unpause", triggered_by="user", reason_phrase="Starting back up.",
)
)

@notests
async def handle_pause_command(self, _client, message: Message):
await message.delete()
if self.system_paused:
Expand All @@ -83,14 +91,18 @@ async def pause_system(self):
and not self.module_loader.is_disabled(x)
and not isinstance(x, type(self))
]
self.log.info(f"Pausing modules:\n" + "\n".join([m.get_name() for m in loaded_modules]) + "\n...")
self.log.info(
f"Pausing modules:\n" + "\n".join([m.get_name() for m in loaded_modules]) + "\n..."
)
tasks = [self.module_loader.unregister_module(m) for m in loaded_modules]
await asyncio.gather(*tasks, return_exceptions=True)
self.system_paused = True
self.paused_modules = loaded_modules

try:
callback_manager: RedisCallbackManager = Container().get_object(ICallbackManager, "redis")
callback_manager: RedisCallbackManager = Container().get_object(
ICallbackManager, "redis"
)
callback_manager.callbacks.sync()
self.log.info("Callbacks synced.")
except:
Expand All @@ -110,7 +122,7 @@ async def unpause_system(self):
if self.paused_modules:
self.log.info(f"Unpausing {len(self.paused_modules)} modules...")
for m in self.paused_modules:
await self.module_loader.register_module(m)
await self.module_loader.try_register_module(m)
else:
self.log.error(
f"For some reason there were no paused modules: {self.paused_modules}. "
Expand Down
6 changes: 3 additions & 3 deletions botkit/core/moduleloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ async def register_enabled_modules(self) -> None:
tasks: List[Coroutine] = []
for n, module in enumerate(self.modules):
module.group_index = n
tasks.append(self.register_module(module))
tasks.append(self.try_register_module(module))

results = await asyncio.gather(*tasks)
await asyncio.gather(*tasks)

async def register_module(self, module: Module) -> None:
async def try_register_module(self, module: Module) -> None:
try:
if self.is_disabled(module):
log.debug(f"{module.get_name()} is disabled.")
Expand Down
29 changes: 19 additions & 10 deletions botkit/dispatching/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from botkit.core.modules import Module
from botkit.routing.route import RouteHandler
from botkit.routing.update_types.updatetype import UpdateType
from botkit.types.client import IClient

"""
Indicates where the evaluation of individual updates takes place
Expand All @@ -24,7 +25,7 @@ class BotkitDispatcher:
def __init__(self):
self.callback_action_dispatchers: Dict[Client, CallbackActionDispatcher] = dict()
self._inline_query_factory: Any = None
self._module_handlers: Dict[int, Dict[Client, List[Handler]]] = dict()
self.module_handlers: Dict[int, Dict[Client, List[Handler]]] = dict()

self.log = logzero.setup_logger(BotkitDispatcher.__name__)

Expand All @@ -34,7 +35,9 @@ async def add_module_routes(self, module: Module):
for client, routes in module.route_collection.routes_by_client.items():
for route in routes:
for update_type, route_wrapper in route.handler_by_update_type.items():
await self.add_route_for_update_type(module, client, update_type, route_wrapper)
await self.add_route_for_update_type(
module, client, update_type, route_wrapper
)

"""
TODO: split this up into:
Expand All @@ -52,7 +55,8 @@ async def add_module_routes(self, module: Module):
"""

self.log.info(
f"({module.group_index}) {module.get_name()} loaded" + (" with: " + ", ".join(log_msg) if log_msg else "")
f"({module.group_index}) {module.get_name()} loaded"
+ (" with: " + ", ".join(log_msg) if log_msg else "")
)

async def add_route_for_update_type(
Expand Down Expand Up @@ -89,14 +93,14 @@ async def add_route_for_update_type(
async def remove_module_routes(self, module: Module):
group = module.group_index

for client, h in self._module_handlers[group].items():
for client, h in self.module_handlers[group].items():
for handler in h:
try:
client.remove_handler(handler, group)
except Exception:
self.log.exception(f"Could not remove handler {handler} from group {group}.")

del self._module_handlers[group]
del self.module_handlers[group]

async def add_handler(self, group: int, client: Client, handler: Handler):
assert group is not None
Expand All @@ -106,16 +110,21 @@ async def add_handler(self, group: int, client: Client, handler: Handler):
async with client.dispatcher.locks_list[-1]:
client.add_handler(handler, group)

self._module_handlers.setdefault(group, {})
self._module_handlers[group].setdefault(client, [])
self._module_handlers[group][client].append(handler)
self.module_handlers.setdefault(group, {})
self.module_handlers[group].setdefault(client, [])
self.module_handlers[group][client].append(handler)

def is_registered(self, module: Module) -> bool:
return module.group_index in self._module_handlers
return module.group_index in self.module_handlers

async def _get_or_create_action_dispatcher(self, client) -> CallbackActionDispatcher:

if not (action_dispatcher := self.callback_action_dispatchers.get(client)):
self.callback_action_dispatchers[client] = (action_dispatcher := CallbackActionDispatcher())
self.callback_action_dispatchers[client] = (
action_dispatcher := CallbackActionDispatcher()
)

# All callback queries use the same group (only one of them applies for a given update)
await self.add_handler(0, client, action_dispatcher.pyrogram_handler)

return action_dispatcher
1 change: 1 addition & 0 deletions botkit/routing/pipelines/execution_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@


class SendTo(Enum):
self = auto()
same_chat = auto()
same_chat_quote = auto()
same_chat_quote_replied_to = auto()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async def gather_initial_state_async(context: BotkitContext):

return result
except Exception as e:
raise GatherStepError(gatherer) from e
raise GatherStepError(e)

return gather_initial_state_async, is_coroutine

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ def evaluate_send_target(send_target: SendTarget, context: BotkitContext) -> _Ev
else:
static_send_target = send_target

if static_send_target == SendTo.self or static_send_target == SendTo.self.name:
return _EvaluatedSendTarget("me", None)
if static_send_target == SendTo.same_chat or static_send_target == SendTo.same_chat.name:
return _EvaluatedSendTarget(context.chat_id, None)
if (
Expand Down
20 changes: 14 additions & 6 deletions botkit/routing/route_builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
from pyrogram.client.filters.filters import create
from pyrogram.client.handlers.handler import Handler

from botkit.routing.pipelines.execution_plan import ExecutionPlan, SendTarget, SendTo, SendTo
from botkit.routing.pipelines.callbacks import HandlerSignature
from botkit.routing.pipelines.execution_plan import ExecutionPlan, SendTarget, SendTo
from botkit.routing.pipelines.gatherer import GathererSignature
from botkit.routing.pipelines.reducer import ReducerSignature
from botkit.routing.route import RouteDefinition
Expand All @@ -31,7 +32,6 @@
from botkit.routing.route_builder.webhook_action_expression import WebhookActionExpressionMixin
from botkit.routing.triggers import RouteTriggers
from botkit.routing.types import TState
from botkit.routing.pipelines.callbacks import HandlerSignature
from botkit.routing.update_types.updatetype import UpdateType
from botkit.types.client import IClient
from botkit.views.base import InlineResultViewBase
Expand Down Expand Up @@ -83,15 +83,15 @@ def then_invoke(self, component: "Component") -> RouteExpression:
self._route_collection.add_for_current_client(route)
return RouteExpression(self._route_collection, route)

def then_update(self, view_type): # TODO: update with functional views
def then_update(self, view_type) -> RouteExpression:
self._plan.set_view(view_type, "update")
route = RouteDefinition(triggers=self._triggers, plan=self._plan)
self._route_collection.add_for_current_client(route)
return RouteExpression(self._route_collection, route)

def then_send(
self, view_or_view_type, to: SendTarget = SendTo.same_chat, via: IClient = None,
):
) -> RouteExpression:
if via and not self._route_collection.current_client.is_user:
raise ValueError(
"Can only send a view `via` another bot when the client that this route belongs to is a "
Expand Down Expand Up @@ -126,8 +126,16 @@ def call(self, handler: HandlerSignature) -> RouteExpression:
self._route_collection.add_for_current_client(route)
return RouteExpression(self._route_collection, route)

def send_view(self, view: TView) -> RouteExpression:
self._plan.set_view(view, "send")
def send_view(
self, view_or_view_type, to: SendTarget = SendTo.same_chat, via: IClient = None,
):
if via and not self._route_collection.current_client.is_user:
raise ValueError(
"Can only send a view `via` another bot when the client that this route belongs to is a "
"userbot. A userbot and a regular bot together form a 'companion bot' relationship.",
self._route_collection.current_client,
)
self._plan.set_view(view_or_view_type, "send").set_send_via(via).set_send_target(to)
route = RouteDefinition(triggers=self._triggers, plan=self._plan)
self._route_collection.add_for_current_client(route)
return RouteExpression(self._route_collection, route)
Expand Down
Empty file.
Loading

0 comments on commit 55b9780

Please sign in to comment.