Description
Description
Hello!
I recently started refactoring the anyrun-task connector.
I would like to use our company's SDK to simplify the further development process
The SDK uses aiohttp library to send the synchronous and asynchronous HTTP requests.
The aiohttp library uses concurrent.futures.ThreadPoolExecutor object to send the requests.
So, I encountered a problem:
ThreadPoolExecutor can't register a new thread and execute a new request in it.
Could u explain if it is possible to use custom threads in the current architecture of the pycti package?
I would be glad to get some advice if I am doing something wrong.
Reproducible Steps
This is a code example that reproduces the problem:
import os
import yaml
from pycti import OpenCTIConnectorHelper
import concurrent.futures.thread
import concurrent.futures.process
from concurrent.futures import ThreadPoolExecutor
config_file_path: str = os.path.dirname(os.path.abspath(__file__)) + "/config.yml"
config: dict = (
yaml.load(open(config_file_path), Loader=yaml.FullLoader)
if os.path.isfile(config_file_path)
else {}
)
opencti_helper = OpenCTIConnectorHelper(config)
def hello_world():
opencti_helper.log_info('Hello world!')
def sample(data):
opencti_helper.log_info('Initialize the threads pool')
with ThreadPoolExecutor() as executor:
opencti_helper.log_info('Execute a function in a new thread')
executor.submit(hello_world)
# Start the main loop
def mainloop():
opencti_helper.listen(sample)
if __name__ == '__main__':
mainloop()
Related logs:
{"timestamp": "2025-04-18T14:22:22.442747Z", "level": "INFO", "name": "api", "message": "Health check (platform version)..."}
{"timestamp": "2025-04-18T14:22:22.472819Z", "level": "INFO", "name": "api", "message": "Health check (platform version)..."}
{"timestamp": "2025-04-18T14:22:22.588374Z", "level": "INFO", "name": "ANY.RUN task", "message": "Connector registered with ID", "attributes": {"id": "a1d643fe-ea85-4e1a-b456-e1fa6ad4ef76"}}
{"timestamp": "2025-04-18T14:22:22.588814Z", "level": "INFO", "name": "ANY.RUN task", "message": "Starting PingAlive thread"}
{"timestamp": "2025-04-18T14:22:22.639166Z", "level": "INFO", "name": "api", "message": "Health check (platform version)..."}
{"timestamp": "2025-04-18T14:22:22.670693Z", "level": "INFO", "name": "api", "message": "Health check (platform version)..."}
{"timestamp": "2025-04-18T14:22:22.806325Z", "level": "INFO", "name": "ANY.RUN task", "message": "Connector registered with ID", "attributes": {"id": "a1d643fe-ea85-4e1a-b456-e1fa6ad4ef76"}}
{"timestamp": "2025-04-18T14:22:22.806957Z", "level": "INFO", "name": "ANY.RUN task", "message": "Starting PingAlive thread"}
{"timestamp": "2025-04-18T14:22:22.809703Z", "level": "INFO", "name": "ANY.RUN task", "message": "Starting ListenQueue thread"}
{"timestamp": "2025-04-18T14:22:22.810072Z", "level": "INFO", "name": "ANY.RUN task", "message": "ListenQueue connecting to rabbitMq."}
{"timestamp": "2025-04-18T14:24:49.456417Z", "level": "INFO", "name": "ANY.RUN task", "message": "Message ack", "attributes": {"tag": 1}}
{"timestamp": "2025-04-18T14:24:49.457753Z", "level": "INFO", "name": "api", "message": "Reading Stix-Core-Object", "attributes": {"id": "url--dd7a39ef-f8f4-58f3-b3e2-418acbe4f659"}}
{"timestamp": "2025-04-18T14:24:49.498927Z", "level": "INFO", "name": "api", "message": "Listing stix_nested_ref_relationships", "attributes": {"relationship_type": null, "from_id": "6d0cc4fc-673a-4ff8-9e4d-7405c99876aa", "to_id": null}}
{"timestamp": "2025-04-18T14:24:49.516998Z", "level": "INFO", "name": "api", "message": "Reporting work update_received", "attributes": {"work_id": "work_a1d643fe-ea85-4e1a-b456-e1fa6ad4ef76_2025-04-18T14:22:35.339Z"}}
{"timestamp": "2025-04-18T14:24:49.546916Z", "level": "INFO", "name": "ANY.RUN task", "message": "Initialize the threads pool"}
{"timestamp": "2025-04-18T14:24:49.547117Z", "level": "INFO", "name": "ANY.RUN task", "message": "Execute a function in a new thread"}
{"timestamp": "2025-04-18T14:24:49.549266Z", "level": "ERROR", "name": "ANY.RUN task", "message": "Error in message processing, reporting error to API", "exc_info": "Traceback (most recent call last):\n File \"/usr/local/lib/python3.11/site-packages/pycti/connector/opencti_connector_helper.py\", line 352, in _data_handler\n message = self.callback(event_data)\n ^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/opt/connector-anyrun-task/opencti.py\", line 32, in sample\n executor.submit(hello_world)\n File \"/usr/local/lib/python3.11/concurrent/futures/thread.py\", line 169, in submit\n raise RuntimeError('cannot schedule new futures after '\nRuntimeError: cannot schedule new futures after interpreter shutdown"}
{"timestamp": "2025-04-18T14:24:49.549447Z", "level": "INFO", "name": "api", "message": "Reporting work update_processed", "attributes": {"work_id": "work_a1d643fe-ea85-4e1a-b456-e1fa6ad4ef76_2025-04-18T14:22:35.339Z"}}
{"timestamp": "2025-04-18T14:24:50.512980Z", "level": "INFO", "name": "ANY.RUN task", "message": "Message processed, thread terminated", "attributes": {"tag": 1}}
Environment
- OS (where OpenCTI server runs): Debian GNU/Linux 12 (bookworm)
- OpenCTI version: 6.5.8
- Other environment details: Python 3.11
Additional information
Here is the full traceback of the production code I am trying to implement
Traceback (most recent call last):\n File "/usr/local/lib/python3.11/site-packages/pycti/connector/opencti_connector_helper.py", line 352, in _data_handler\n message = self.callback(event_data)\n ^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/opt/connector-anyrun-task/opencti.py", line 41, in _process_message\n analysis_summary = self._anyrun.process_analysis(analysis_type)['data']\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/opt/connector-anyrun-task/anyrun_sb.py", line 28, in process_analysis\n return self._process_windows_analysis(analysis_type)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/opt/connector-anyrun-task/anyrun_sb.py", line 49, in _process_windows_analysis\n summary = self._get_submission(analysis_type, connector)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/opt/connector-anyrun-task/anyrun_sb.py", line 70, in _get_submission\n task_uuid = connector.run_url_analysis(**self._config.to_dict)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/usr/local/lib/python3.11/site-packages/anyrun/connectors/sandbox/operation_systems/windows.py", line 268, in run_url_analysis\n return execute_synchronously(\n ^^^^^^^^^^^^^^^^^^^^^^\n File "/usr/local/lib/python3.11/site-packages/anyrun/utils/utility_functions.py", line 19, in execute_synchronously\n return event_loop.run_until_complete(function(*args, **kwargs))\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/usr/local/lib/python3.11/asyncio/base_events.py", line 654, in run_until_complete\n return future.result()\n ^^^^^^^^^^^^^^^\n File "/usr/local/lib/python3.11/site-packages/anyrun/connectors/sandbox/operation_systems/windows.py", line 365, in run_url_analysis_async\n response_data = await self._make_request_async('POST', url, json=body)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/usr/local/lib/python3.11/site-packages/anyrun/connectors/base_connector.py", line 83, in _make_request_async\n response: aiohttp.ClientResponse = await self._session.request(\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/usr/local/lib/python3.11/site-packages/aiohttp/client.py", line 703, in _request\n conn = await self._connector.connect(\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/usr/local/lib/python3.11/site-packages/aiohttp/connector.py", line 548, in connect\n proto = await self._create_connection(req, traces, timeout)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/usr/local/lib/python3.11/site-packages/aiohttp/connector.py", line 1056, in _create_connection\n _, proto = await self._create_direct_connection(req, traces, timeout)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/usr/local/lib/python3.11/site-packages/aiohttp/connector.py", line 1351, in _create_direct_connection\n hosts = await self._resolve_host(host, port, traces=traces)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/usr/local/lib/python3.11/site-packages/aiohttp/connector.py", line 995, in _resolve_host\n return await asyncio.shield(resolved_host_task)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/usr/local/lib/python3.11/site-packages/aiohttp/connector.py", line 1026, in _resolve_host_with_throttle\n addrs = await self._resolver.resolve(host, port, family=self._family)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/usr/local/lib/python3.11/site-packages/aiohttp/resolver.py", line 36, in resolve\n infos = await self._loop.getaddrinfo(\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/usr/local/lib/python3.11/asyncio/base_events.py", line 868, in getaddrinfo\n return await self.run_in_executor(\n ^^^^^^^^^^^^^^^^^^^^^\n File "/usr/local/lib/python3.11/asyncio/base_events.py", line 830, in run_in_executor\n executor.submit(func, *args), loop=self)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 169, in submit\n raise RuntimeError('cannot schedule new futures after '\nRuntimeError: cannot schedule new futures after interpreter shutdown