Skip to content

v2.0.0 with Async Architecture#649

Open
jasonacox wants to merge 41 commits into
masterfrom
v2-async
Open

v2.0.0 with Async Architecture#649
jasonacox wants to merge 41 commits into
masterfrom
v2-async

Conversation

@jasonacox

@jasonacox jasonacox commented Sep 14, 2025

Copy link
Copy Markdown
Owner

v2.0.0 - Async Architecture Introduction (MAJOR REVISION)

This major release introduces the foundation for native asyncio-based device communication while fully preserving the existing synchronous API for backward compatibility.

Highlights:

  • Version bump to 2.x to signal new async subsystem (legacy sync classes unchanged).
  • Planning document ASYNC.md added (vision, goals, milestones for XenonDeviceAsync & related classes).
  • No behavioral changes to existing synchronous code paths in this initial 2.0.0 tag.
  • Future minor releases (2.1.x+) will add new async classes and examples without removing sync support.

Compatibility:

  • Existing imports and synchronous usage continue to work (API surface of 1.x retained).
  • New async classes will live alongside current modules (no name collisions) and require explicit opt‑in.
  • Officially removed Python 2.7 support.

Migration Guidance:

  • You can adopt async incrementally—no action required if you stay with sync API.
  • When async classes land, prefer await device.status_async() patterns in event loops for concurrency gains.

See ASYNC.md for roadmap details.

Re: #645 #646

Copilot AI review requested due to automatic review settings September 14, 2025 00:25

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces v2.0.0 with async architecture foundations while removing Python 2.7 support. It establishes the groundwork for future async capabilities without changing existing synchronous API behavior.

  • Removes all Python 2.7 compatibility code across the codebase
  • Updates version to 2.0.0 to signal breaking change (Python 2 removal)
  • Adds comprehensive async roadmap documentation in ASYNC.md

Reviewed Changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated no comments.

Show a summary per file
File Description
tools/ttcorefunc.py Removes Python 2 compatibility imports and helper code
tinytuya/wizard.py Removes Python 2 imports and raw_input compatibility
tinytuya/scanner.py Removes Python 2 imports, raw_input compatibility, and unicode handling
tinytuya/core/crypto_helper.py Removes Python 2 print_function import
tinytuya/core/core.py Updates version to 2.0.0 and removes Python 2 compatibility
tinytuya/core/XenonDevice.py Removes Python 2 compatibility for nonce XOR operation
server/server.py Removes Python 2 print_function import
RELEASE.md Adds v2.0.0 release notes
ASYNC.md Adds comprehensive async roadmap and planning document

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

This commit introduces a async base class, XenonDeviceAsync, which mirrors the functionality of the existing XenonDevice class.

The primary goal is to establish a robust foundation for future asynchronous changes by implementing minimal socket communication patches.

While the revised socket code has been tested, it may still contain undiscovered bugs that will be addressed in subsequent commits.
@3735943886

3735943886 commented Sep 14, 2025

Copy link
Copy Markdown
Collaborator

That looks like a very long way to go. 😓

The #645 v.2.0.0-async branch works well for me with my small patches from #646. I've been running it in my real environment for over 8000 minutes with around 100 devices connected. There have been no issues so far with resource leaks, deadlocks, or delays—just an instant response without threading, and so on.
So rather than discarding the entire branch, I think it would be better if we could reuse its logic where possible.

@jasonacox

Copy link
Copy Markdown
Owner Author

Yes, but I think it can be fast. And yes, @3735943886 - absolutely! Your code would be a good place to start.

In the #646 branch have you been using the direct Async classes or the wrappers?

Would you be willing to move those *Async classes over to this branch?

@3735943886

Copy link
Copy Markdown
Collaborator

As a first step, I've committed XenonDeviceAsync as a foundational class for upcoming v2-async support, with minimal socket patches. It seems quite stable so far, likely because the original XenonDevice class was so robust. My only concern is whether my revised socket communication code works without issue.

In the #646 branch have you been using the direct Async classes or the wrappers?

Would you be willing to move those *Async classes over to this branch?

I'm currently running a DeviceAsync-based approach for 24/7 asynchronous multi-device monitoring. You can view the code I'm using at this link: https://github.com/3735943886/tuya2mqtt/blob/async-beta/core/tuya2mqtt.py

I've also tested some synchronous v1 examples and shared my feedback in the comments on PR #646.

@3735943886

3735943886 commented Sep 14, 2025

Copy link
Copy Markdown
Collaborator

I believe the major issue in PR #645 was that XenonDeviceAsync robustness was compromised when an AI refactored it. This caused a cascade of errors that transmitted to Device then BulbDevice(and other device specific classes). After replacing XenonDeviceAsync in PR #646, the approach using AsyncWrapper seemed quite promising. (At least for me, there were no issues.) Also, as you can see from the last comment on PR #645, callback supports for sync API were somewhat feasible.
However, it's hard to say whether the AsyncWrapper is the ultimate endpoint. Then again, simply copying and pasting the Device class and just adding async/await, as was done initially with the DeviceAsync class, isn't fancy. So, what should we do with the DeviceAsync class for the next step? It seems we need a consensus here.

@jasonacox

Copy link
Copy Markdown
Owner Author

As a first step, I've committed XenonDeviceAsync as a foundational class for upcoming v2-async support, with minimal socket patches

Perfect! Are you able to push to v2-async?

I may have misunderstood the discussion in #645 but I came away thinking (and agreeing) that we want to keep the existing sync code in tact. The attempt now will be to create an extension to the library that is async. It looks like you closed and deleted #646 - but likely we could start with some of your fixes there. My proposal is to start with copying in:

Classes:

  • XenonDeviceAsync
  • DeviceAsync

But I'm also wondering, do we need both? That inheritance paradigm came from pytuya and I often debated on why and where it made sense to put new functions.

Perhaps we just use XenonDeviceAsync and rename it as DeviceAsync and fold in the "Device" functions into that same class. Then we derive Outlet, Bulb, Cover, etc., from there.

+------------------------------+            +---------------------------+
|  DeviceAsync (base)          |            | MessageHelper (shared)    |
|  - state machine             |<--calls--> |  pack/unpack (sync funcs) |
|  - connection supervisor     |            |  crypto helpers           |
|  - protocol v3.1..v3.5       |            +---------------------------+
|  - send queue (asyncio.Queue)|
|  - recv task (reader loop)   |            +---------------------------+
|  - handshake coroutine       |<--uses---->| Crypto (AESCipher)        |
+--------------+---------------+            +---------------------------+
               | derives
    +----------+-----------+
    | Async Device Mixins  |
    | (Outlet/Bulb/etc.)   |
    +----------------------+

No sacred cows here. Love it, hate it? Cast your ideas out. 😁

@3735943886

Copy link
Copy Markdown
Collaborator

My stance was to keep XenonDeviceAsync as close as possible to the XenonDevice, which means preserving the core class's legacy code intact(not the whole sync code). I was not opposed to the AsyncWrapper and of course, will defer to the opinions of the long-time contributors. However, I do think the AsyncWrapper may have the advantage of providing non-blocking status callback for those who prefer a sync style without running event loop.

Classes:

  • XenonDeviceAsync
  • DeviceAsync

But I'm also wondering, do we need both? That inheritance paradigm came from pytuya and I often debated on why and where it made sense to put new functions.

Got it. I'll create the DeviceAsync class based on XenonDeviceAsync and integrate the methods from the existing Device class. It'll be done shortly.

DeviceAsync class, adapted from the v1 XenonDevice and Device class with async-compatible methods. Serves as a foundational base for v2 async features.
@uzlonewolf

Copy link
Copy Markdown
Collaborator

I may have misunderstood the discussion in #645 but I came away thinking (and agreeing) that we want to keep the existing sync code in tact.

I wanted to temporarily keep it untouched for the first couple of releases until the async version has proven itself in the real-world, and then replace it with a wrapper later.

Perhaps we just use XenonDeviceAsync and rename it as DeviceAsync and fold in the "Device" functions into that same class. Then we derive Outlet, Bulb, Cover, etc., from there.

Sounds good to me. I think everything already derives from Device anyway, I don't remember anything using XenonDevice directly.

Success on getting true async classes and asyncio functions in place would mean that scanner.py can be rewritten to use that class to accomplish the same performance. It has always bugged me that scanner.py is essentially a completely separate algorithm from the library and how most people would use the library.

Yes, I planned on rewriting the scanner to use the new API once said API has stabilized. This is what, the 4th async PR? I didn't want to rewrite it while things were still changing so much. That said, the force-scan requires brute forcing both the device version and the local key, so it is always going to be a separate algorithm compared to code which already knows both.

@uzlonewolf

Copy link
Copy Markdown
Collaborator

Ok, I think I have my callback idea mostly fleshed out. Please note that this does not prevent you from sending/receiving as you've always done, this callback method is an option in addition to that.

import asyncio
import tunytuya

async def data_handler( device, data, tuya_message ):
    # do something with the received data here
    print( 'in data handler', device.id, data )
    print( 'device list', devices )

async def cmd_handler( device, tuya_message ):
    # received a payload-less TuyaMessage, you can check the retcode here if you want
    pass

async def connected( device, error ):
    # connection to device established, or failed
    if not error:
        await device.status()

devices = {}

async def main():
    async with asyncio.TaskGroup() as tg:
        # start up a background scan job in case we have multiple devices using auto-ip
        scanner = tinytuya.scanner.background_scan()
        scanner_task = tg.create_task( scanner.run_task() )

        for devdata in device_list:
            d = tinytuya.DeviceAsync( devdata )

            d.register_data_handler( data_handler )
            d.register_command_handler( cmd_handler )
            d.register_connect_handler( connected )
            d.register_scanner( scanner )

            task = tg.create_task( d.run_task() )

            devices[d.id] = (d, task)

    # The await is implicit when the context manager exits.

asyncio.run( main() )

Of note is the new scanner interface. The problem with the current scanner is it does not handle multiple devices with auto-ip well. You cannot simply fire up multiple instances (i.e. 1 for each device) because only 1 UDP receiver can receive broadcast packets at a time. Kicking off a single thread and then having each device register the ID of interest fixes that.

@uzlonewolf

Copy link
Copy Markdown
Collaborator

Can someone tell me where that @classmethod create(...) is supposed to be called? I have yet to see it actually get called from anywhere.

@uzlonewolf

Copy link
Copy Markdown
Collaborator

Also, I still plan on deleting all references to nowait and changing all functions to either send, or receive, but never both. The _send_receive() function is a hot mess and doing this will allow it to be cleanly broken up into 2 functions. Hopefully this will also help remind people that device communication is asynchronous and there are gotchas (i.e. calling data = d.status(nowait=False) may not actually return the status if the device slips an asynchronous DP update in before the status() result).

@3735943886

3735943886 commented Sep 14, 2025

Copy link
Copy Markdown
Collaborator

Can someone tell me where that @classmethod create(...) is supposed to be called? I have yet to see it actually get called from anywhere.

Since the __init__ method is synchronous, it can't handle I/O operations find_device and device_info that the original XenonDevice class relied on. While using an async with block is the preferred way to handle initialization for asynchronous classes, I believe providing a separate @classmethod create(...) is a reasonable design choice for users who want to initialize the device without using the async with syntax.

@uzlonewolf

Copy link
Copy Markdown
Collaborator

I believe providing a separate @classmethod create(...) is a reasonable design choice for users who want to initialize the device without using the async with syntax.

But where/who is supposed to call it? I am not using async with in the callback example I posted above and ended up needing to call initialize() myself since create() was never called.

@3735943886

Copy link
Copy Markdown
Collaborator

The @classmethod create(...) method is intended to be called by end users as an alternative to the async with context manager for initialization.

The previous README in PR #645 documented it, but it's not available in the current branch. We'd eventually need restoration the detailed documentation.

# Recommended: Using async context manager
async with tinytuya.DeviceAsync(...) as device:
    # Use the device inside this block
    # Resources are automatically cleaned up when the block is exited.

# Alternative: Using the factory method
device = await tinytuya.DeviceAsync.create(...)
try:
    # Use the device
finally:
    await device.close()  # You must manually call close() to release resources.

@uzlonewolf

Copy link
Copy Markdown
Collaborator

The previous README in PR #645 documented it

Ah, I never noticed that.

Anyway, I hated it, so I rewrote things to eliminate it.

@3735943886

Copy link
Copy Markdown
Collaborator

The previous README in PR #645 documented it

Ah, I never noticed that.

Anyway, I hated it, so I rewrote things to eliminate it.

Great, much better. The create/initialize methods were based on my uncertainty about moving I/O operations out of the __init__ method. I tried to set those operations immediately after initialization whenever possible.

@uzlonewolf

Copy link
Copy Markdown
Collaborator

In this particular case I think the old behavior was actually a bug - why should a device being offline throw an exception when the program starts, but dropping offline later simply causes it to re-scan?

@jasonacox

jasonacox commented Sep 15, 2025

Copy link
Copy Markdown
Owner Author

This is what, the 4th async PR?

Ha! Touche! I really liked the idea of preserving the sync classes and reverting them in the other PRs would have been messy. I also like the idea of cleaning up the inheritence to something that makes more sense.

The previous README in PR #645 documented it, but it's not available in the current branch

Easy to restore and I'm happy to help with the docs when we get to that. Also, I really want to get good test coverage with regression. I pushed one out there (test-devices.py) that uses DeviceAsync. There is a bug in DeviceAsync.py for find_device() right now for Auto-IP discovery, but didn't want to step on any changes you may be doing on that already.

Love what I see so far. :)

PS - Code Coverage Report: https://app.codecov.io/gh/jasonacox/tinytuya/tree/v2-async/tinytuya

@uzlonewolf

uzlonewolf commented Sep 23, 2025

Copy link
Copy Markdown
Collaborator

That is correct. You couldn't use it without then calling the private _send_receive() anyway. Edit: looks like you could use it with .send().

Why do you need to use it instead of simply calling set_value()/set_multiple_values()?

@3735943886

Copy link
Copy Markdown
Collaborator
# The IR Commands JSON has the following format:
command = {
    "control": "send_ir",
    "head": "",
    "key1": "[[TO_BE_REPLACED]]",
    "type": 0,
    "delay": 300,
}
# Sending the IR command:
payload = d.generate_payload(tinytuya.CONTROL, {"201": json.dumps(command)})
d.send(payload)

Can I use set_value or set_multiple_values instead of this code?

@uzlonewolf

Copy link
Copy Markdown
Collaborator

Yes, calling set_value() calls generate_payload with tinytuya.CONTROL. That code is equivalent to:

# The IR Commands JSON has the following format:
command = {
    "control": "send_ir",
    "head": "",
    "key1": "[[TO_BE_REPLACED]]",
    "type": 0,
    "delay": 300,
}
# Sending the IR command:
d.set_value(201, json.dumps(command))

@uzlonewolf

Copy link
Copy Markdown
Collaborator

I've been using this script to test callbacks and multi-device scanning, and it seems to work pretty good. The only problem is asyncio.TaskGroup() is only available on python 3.11+.

import asyncio
import tinytuya

tinytuya.set_debug()

async def data_handler( device, data, tuya_message ):
    # do something with the received data here
    print( 'in data handler', device.id, data, tuya_message )
    #print(devices)

async def connected( device, error ):
    print( 'connected?', device.id, error )
    # connection to device established, or failed
    if not error:
        print( 'req status', device.id )
        await device.status()
    else:
        print( 'connect error', device.id, error )

async def disconnected( device, error ):
    # device disconnected
    print( 'device disconnected', device.id, error )

devices = {}
device_list = (
    { 'id': 'eb...ut', 'key': "", 'name': 'Feit non-CCT', 'ver': 3.5, 'addr': None },
    { 'id': 'eb..ds', 'key': '', 'name': 'Geeni BW229 Smart Filament Bulb', 'ver': 3.4, 'addr': None },
)

async def main():
    async with asyncio.TaskGroup() as tg:
        for devdata in device_list:
            d = tinytuya.DeviceAsync( devdata['id'], address=devdata['addr'], local_key=devdata.get('key'), version=devdata['ver'] )

            print(d, d.version)

            d.register_response_handler( data_handler )
            d.register_connect_handler( connected )
            d.register_disconnect_handler( disconnected )

            task = tg.create_task( d.start_receiving(True) )
            hb = tg.create_task( d.start_heartbeats(False) )

            devices[d.id] = (d, task, hb)

    # The await is implicit when the context manager exits.

asyncio.run( main() )

@3735943886

3735943886 commented Sep 23, 2025

Copy link
Copy Markdown
Collaborator

Latest DeviceAsync has been experiencing delays with methods like set_value. The same code with same devices was instant response at the initial v2-async commit.
The code below simply turns a light switch on and off three times, using both the latest v2-async branch and the initial v2-async commit for comparison.

import asyncio
import tinytuya
import time

d = None
async def device_routine(id, ip, key, ver):
  global d
  async with tinytuya.DeviceAsync(id, ip, key, version = ver, persist = True) as d:
    while(True):
      data = await d.receive()
      print('Received Payload: %r' % data)
      await d.heartbeat()

async def main():
  global d
  task = asyncio.create_task(device_routine('eb7ba8427911a8ccbda92w', '', '', 3.4))
  await asyncio.sleep(1)
  try:
    await d.status(nowait=True)
    nowait = True
  except:
    nowait = False
  if nowait:
    start_time = time.perf_counter()
    await d.turn_off(1, nowait=True)
    await d.turn_on(1, nowait=True)
    await d.turn_off(1, nowait=True)
    await d.turn_on(1, nowait=True)
    await d.turn_off(1, nowait=True)
    await d.turn_on(1, nowait=True)
    end_time = time.perf_counter()
  else:
    start_time = time.perf_counter()
    await d.turn_off(1)
    await d.turn_on(1)
    await d.turn_off(1)
    await d.turn_on(1)
    await d.turn_off(1)
    await d.turn_on(1)
    end_time = time.perf_counter()

  await asyncio.sleep(1)
  elapsed_time = end_time - start_time
  print(f"elapsed time: {elapsed_time:.4f} secs")

if __name__ == "__main__":
  asyncio.run(main())

result with latest v2-async branch

(tuya) root@nmwork:/home/nmwork/script/tuya# python3 monitor_async.py
Received Payload: {}
Received Payload: {'protocol': 4, 't': 1758600039, 'data': {'dps': {'1': False}}, 'dps': {'1': False}}
Received Payload: {'protocol': 4, 't': 1758600039, 'data': {'dps': {'1': True}}, 'dps': {'1': True}}
Received Payload: {'protocol': 4, 't': 1758600039, 'data': {'dps': {'1': False}}, 'dps': {'1': False}}
Received Payload: {'protocol': 4, 't': 1758600040, 'data': {'dps': {'1': True}}, 'dps': {'1': True}}
Received Payload: {'protocol': 4, 't': 1758600040, 'data': {'dps': {'1': False}}, 'dps': {'1': False}}
Received Payload: {'protocol': 4, 't': 1758600040, 'data': {'dps': {'1': True}}, 'dps': {'1': True}}
elapsed time: 4.4025 secs
(tuya) root@nmwork:/home/nmwork/script/tuya# python3 monitor_async.py
Received Payload: {}
Received Payload: {'protocol': 4, 't': 1758600048, 'data': {'dps': {'1': False}}, 'dps': {'1': False}}
Received Payload: {'protocol': 4, 't': 1758600049, 'data': {'dps': {'1': True}}, 'dps': {'1': True}}
Received Payload: {'protocol': 4, 't': 1758600049, 'data': {'dps': {'1': False}}, 'dps': {'1': False}}
Received Payload: {'protocol': 4, 't': 1758600049, 'data': {'dps': {'1': True}}, 'dps': {'1': True}}
Received Payload: {'protocol': 4, 't': 1758600049, 'data': {'dps': {'1': False}}, 'dps': {'1': False}}
Received Payload: {'protocol': 4, 't': 1758600049, 'data': {'dps': {'1': True}}, 'dps': {'1': True}}
elapsed time: 5.0027 secs
(tuya) root@nmwork:/home/nmwork/script/tuya#

result with initial v2-async commit

(tuya) root@nmwork:/home/nmwork/script/tuya# python3 monitor_async.py
Received Payload: {'dps': {'1': True, '2': True, '7': 0, '8': 0, '14': 'off', '17': '', '18': '', '19': ''}}
Received Payload: {'protocol': 4, 't': 1758599959, 'data': {'dps': {'1': False}}, 'dps': {'1': False}}
Received Payload: {'protocol': 4, 't': 1758599959, 'data': {'dps': {'1': True}}, 'dps': {'1': True}}
Received Payload: {'protocol': 4, 't': 1758599959, 'data': {'dps': {'1': False}}, 'dps': {'1': False}}
Received Payload: {'protocol': 4, 't': 1758599959, 'data': {'dps': {'1': True}}, 'dps': {'1': True}}
Received Payload: {'protocol': 4, 't': 1758599959, 'data': {'dps': {'1': False}}, 'dps': {'1': False}}
Received Payload: {'protocol': 4, 't': 1758599959, 'data': {'dps': {'1': True}}, 'dps': {'1': True}}
Received Payload: None
elapsed time: 0.0687 secs
(tuya) root@nmwork:/home/nmwork/script/tuya# python3 monitor_async.py
Received Payload: {'dps': {'1': True, '2': True, '7': 0, '8': 0, '14': 'off', '17': '', '18': '', '19': ''}}
Received Payload: {'protocol': 4, 't': 1758599964, 'data': {'dps': {'1': False}}, 'dps': {'1': False}}
Received Payload: {'protocol': 4, 't': 1758599964, 'data': {'dps': {'1': True}}, 'dps': {'1': True}}
Received Payload: {'protocol': 4, 't': 1758599964, 'data': {'dps': {'1': False}}, 'dps': {'1': False}}
Received Payload: {'protocol': 4, 't': 1758599964, 'data': {'dps': {'1': True}}, 'dps': {'1': True}}
Received Payload: {'protocol': 4, 't': 1758599964, 'data': {'dps': {'1': False}}, 'dps': {'1': False}}
Received Payload: {'protocol': 4, 't': 1758599964, 'data': {'dps': {'1': True}}, 'dps': {'1': True}}
Received Payload: None
elapsed time: 0.0679 secs
(tuya) root@nmwork:/home/nmwork/script/tuya#

@uzlonewolf

uzlonewolf commented Sep 23, 2025

Copy link
Copy Markdown
Collaborator

I originally had it locking for both send and receive as if you tried to do both using different tasks (such as in your example) while the device was not connected then the 2 different _ensure_connection() calls would step on each other. I just pushed a tweak so it skips the device locking if you're only sending or only receiving and the device connection is already open. If a connection is already established then simultaneous sends should be fine, and there is already a separate read lock to ensure simultaneous reads cannot happen. I may back it off a bit more so only receives and connection attempts are locked, I'll need to think about this a bit.

@3735943886

Copy link
Copy Markdown
Collaborator

Awesome! It works perfectly now.

(tuya) root@nmwork:/home/nmwork/script/tuya# python3 ./monitor_async.py
Received Payload: {'protocol': 4, 't': 1758606818, 'data': {'dps': {'1': False}}, 'dps': {'1': False}}
Received Payload: {'protocol': 4, 't': 1758606818, 'data': {'dps': {'1': True}}, 'dps': {'1': True}}
Received Payload: {'protocol': 4, 't': 1758606818, 'data': {'dps': {'1': False}}, 'dps': {'1': False}}
Received Payload: {'protocol': 4, 't': 1758606818, 'data': {'dps': {'1': True}}, 'dps': {'1': True}}
Received Payload: {'protocol': 4, 't': 1758606818, 'data': {'dps': {'1': False}}, 'dps': {'1': False}}
Received Payload: {'protocol': 4, 't': 1758606818, 'data': {'dps': {'1': True}}, 'dps': {'1': True}}
Received Payload: {}
elapsed time: 0.0054 secs
(tuya) root@nmwork:/home/nmwork/script/tuya#

@uzlonewolf

Copy link
Copy Markdown
Collaborator

To expound on my dislike for generate_payload(), the problem is the returned payload can change based upon the device version/type and yet there is nothing stopping the user from saving the result and using it later. The following code will fail:

d = Device('..', address='Auto')
payload = d.generate_payload(...)
d.status() # address=Auto causes a detect, and the device version changes to, say, v3.5
d.send(payload) # will fail because v3.5 requires some payload tweaks

Device22 and some gateway/Zigbee changes can also change the returned payload. Changing _generate_payload() to be private and making everyone use set_value()/set_multiple_values() instead side-steps these payload issues.

@3735943886

3735943886 commented Sep 24, 2025

Copy link
Copy Markdown
Collaborator

It seems that send cannot be used independently (or at least not easily?) without generate_payload. Another approach might be to make the current send method private (e.g., _send) and expose a new public send method with the same arguments as _generate_payload. This way, send would handle both payload generation and transmission in a single, atomic operation, reducing potential issues. What do you think?

@uzlonewolf

Copy link
Copy Markdown
Collaborator

I agree with making _send() private, but do we really need to expose raw payload generation? Where would that be useful?

@3735943886

Copy link
Copy Markdown
Collaborator

I agree with making _send() private, but do we really need to expose raw payload generation? Where would that be useful?

I don’t think it’s useful at the moment, but I’m just wondering if it’s being used somewhere since it was provided. If you think it’s unnecessary, I’m fine with your decision.

@3735943886

3735943886 commented Jun 4, 2026

Copy link
Copy Markdown
Collaborator

@jasonacox @uzlonewolf

Since the async PR hasn't seen much recent activity, I’ve been wondering if there might be an alternative path to non-blocking I/O that doesn't require a full transition to asyncio.

One concern is that adopting asyncio throughout the codebase could introduce major API and architectural changes, plus async/await propagation. Given TinyTuya's lightweight nature and existing design philosophy, I’m not entirely sure a full asyncio architecture is the best fit.

Do you think a select/selectors-based approach could be worth exploring? It seems like it might support non-blocking socket operations while preserving much of the current synchronous API and backward compatibility.

I think there’s some progress on the async side already, but it’s not like it’s a huge sunk cost — so I’m not suggesting this lightly, but I’m genuinely curious whether a select/selectors + callback-driven design might ultimately be simpler and more maintainable for this project.

You have much more experience here than I do, so I’d really value your thoughts on whether such an approach would be viable or beneficial.

@3735943886

3735943886 commented Jun 5, 2026

Copy link
Copy Markdown
Collaborator

Monitor (TBD) — Concept for Single-Thread, Multi-Device Status Monitoring

Design / concept note for tinytuya.Monitor.

1. Motivation

TinyTuya is intentionally small and, in steady state, very robust: a device is
a thin object around one TCP socket, and the protocol codec
(pack_message/unpack_message/parse_header, AESCipher) is already cleanly
separated from I/O. I propose adding asynchronous, multi-device monitoring without
disturbing that core.

The problem only appears when watching many devices at once. Today there are
two patterns:

  • examples/async_send_receive.py — a single device, one loop:
    receive() / heartbeat(nowait=True) / occasional status().
  • examples/threading.py — many devices, one OS thread per device, each
    blocked in device.receive().

Monitor generalizes the first to N devices and replaces the second: it watches
any number of devices on a single thread using selectors
(select/poll/epoll), and delivers updates through callbacks. No asyncio,
no per-device threads, no new dependencies.

2. Key observation: only receiving needs to be non-blocking

Almost every command already has a non-blocking form. set_value,
set_status, set_multiple_values, turn_on/off, set_colour, heartbeat,
… all accept nowait=True, which only writes to the socket and returns
immediately (the reply, if any, comes back later). Sending is therefore already
"async enough".

The one genuinely blocking operation on the send side is connecting
(TCP connect, plus the v3.4/3.5 session-key handshake). That is handled once,
up front (see §7), and is the only blocking step that remains.

So the part that actually has to become non-blocking is receive() — and
especially so when many devices must be watched concurrently. That single
insight is what Monitor is built around.

3. What "non-blocking receive" actually requires

receive() reads from a TCP stream, so a "socket is readable" event from
selectors does not mean a whole message is available — it may be a partial
frame, or several frames at once. Monitor therefore keeps a per-device
receive buffer
and reassembles frames:

  1. selectors reports a device socket readable.
  2. Monitor does one non-blocking recv() and appends the bytes to that
    device's buffer.
  3. It then extracts complete frames from the buffer (find the prefix, read
    the length, wait until the whole frame has arrived), decodes each one, and
    fires the callback. Leftover bytes stay buffered for the next event.

This buffer-and-reassemble step is the essence of asynchronous receive; a frame
split across two recv() calls, leading garbage, or back-to-back frames are all
handled naturally.

4. No duplicated protocol logic

Monitor does not re-implement framing or decryption. The framing logic
(prefix search + length accumulation) lives in one place,
message_helper.find_frame(), and is shared by both the blocking read path
(XenonDevice._receive()) and Monitor. Decoding and dispatch reuse the
device's own XenonDevice._process_message(). This keeps the crypto/parsing in
a single source of truth — important for staying tiny and robust.

5. One thread, many devices, callbacks

A single Monitor thread runs the reactor loop:

select() ─▶ for each readable socket: recv + reassemble + decode ─▶ callback

Callbacks (on_status, on_connect, on_disconnect) can be set globally or
per device. Gateways are supported transparently (see §8).

6. Thread-safety model: the Monitor owns all socket I/O

The danger with sockets is concurrent access — two threads touching the same
socket, or the shared seqno/cipher state, at the same time. A single thread
doing recv → process → send → recv … sequentially is the standard, race-free
event-loop pattern (the same model asyncio, Node.js and nginx use); "one
component does both directions" is not itself dangerous.

Monitor makes that guarantee concrete: the reactor thread is the only thread
that ever touches a monitored device's socket.
Other threads never call the
device directly. Instead they hand work to the reactor:

mon.send(device, 'set_value', 1, True)   # thread-safe; runs on the reactor thread

send() enqueues the command and wakes the reactor (via an internal self-pipe);
the reactor performs the actual nowait write. This eliminates the
seqno/cipher/interleaved-sendall races by construction — no locks needed.

Rule: while a device is registered with a Monitor, its blocking command
methods must not be called directly from another thread. Use mon.send(...).
When the reactor is instead driven from a caller's own loop (mon.poll()),
everything is already single-threaded and device.method(..., nowait=True)
may be called directly.

7. Heartbeats and the connection lifecycle

A device's lifecycle has two distinct phases:

  • Connect (blocking, once): mon.add(device) opens the persistent
    connection and, for v3.4/3.5, performs the session-key handshake. This is the
    only blocking step, done up front.
  • Steady-state (fully non-blocking): the reactor only does select(),
    recv(), and nowait writes — it never blocks.

Heartbeats are sent by the Monitor. Tuya devices close the connection after
~30 s of silence, so the reactor sends heartbeat(nowait=True) per device on a
timer (heartbeat_interval, default 12 s). This is consistent with §6 — a
heartbeat is just another nowait send, executed on the reactor's single
thread, so it never races and never blocks the receive loop. The user does not
have to manage heartbeats manually (unlike examples/threading.py).

Reconnect is the one open item. When recv() returns end-of-stream the
reactor detects the drop and reports it via on_disconnect. Re-establishing the
connection is blocking (connect + handshake), so doing it inline would briefly
stall reception for the other devices. How to handle reconnect (a dedicated
worker, or delegating to the application) is the only remaining design decision;
steady-state operation is already completely non-blocking.

8. Gateways and sub-devices (cid routing)

A Zigbee gateway multiplexes several logical sub-devices over one socket,
each identified by a cid. Monitor routes each decoded frame to the matching
child device and fires that child's callback. Register the gateway /
top-level device
; its children are delivered automatically.

9. Side effect: the status cache stays warm

Because decoding reuses _process_message(), every update also refreshes the
device's normal cache. device.cached_status() / _last_status therefore stay
current, so the latest known state can be read on demand in addition to
being received via callbacks.

10. Scope and non-goals

  • Monitor is about receiving from many devices on one thread. Sending is
    already non-blocking via nowait=True; Monitor only marshals those sends to
    keep them thread-safe.
  • It requires a persistent connection (persist=True): the whole model
    assumes a kept-open socket. Non-persistent devices close the socket after each
    command and are not monitoring targets.
  • It is not asyncio and adds no dependenciesselectors is in the
    standard library, and the existing blocking API is untouched.

11. At a glance

import tinytuya

def on_status(device, result):
    print(device.id, result.get('dps'))

mon = tinytuya.Monitor(on_status=on_status)

for cfg in my_devices:
    d = tinytuya.OutletDevice(cfg.id, cfg.ip, cfg.key, version=3.3, persist=True)
    mon.add(d)                 # blocking connect happens here, once

mon.start()                    # reactor runs on one daemon thread
...
mon.send(d, 'set_value', 1, True)   # thread-safe command
...
mon.stop()

Or drive it from a caller's own loop instead of start():

mon = tinytuya.Monitor(on_status=on_status)
mon.add(d)
while True:
    mon.poll(timeout=1.0)      # one reactor iteration; direct nowait sends OK here

@jasonacox

Copy link
Copy Markdown
Owner Author

Thanks @3735943886 - this is a pragmatic, low-risk proposal that solves the multi-device monitoring problem without breaking anything. I want to play with it a bit but it could be a good candidate for a near-term release alongside the sync API.

However, I still believe that the harder asyncio work is more long-term relevant (e.g. asyncio native apps like HA). I'm going to confess that it gives me heartburn. I like the simplicity of tinytuya w/o asyncio, but I know it comes with practical limitation when you want to use it for scale and with other popular platforms and frameworks.

This PR has been on pause for too long, and I will also confess that I was waiting for either a python asyncio guru to chime in or a Mythos++ level AI to make it trivial and just do all the gross work for us. 🤣 But all to sya, I wanted to get back to this. My new role at works is keeping me super busy and making it hard to focus on this. And Sam (and his LLM) isn't quite ready for this type of task just yet (and he may disagree :). But I do hope to get back to this soon.

@jasonacox-sam can you create a branch and PR that implements the above Monitor (TBD) proposal by @3735943886 ? I want to use that for exploration/testing.

@jasonacox-sam

Copy link
Copy Markdown
Collaborator

Created PR #712Add Monitor — single-thread multi-device status monitoring

Implements the Monitor (TBD) proposal by @3735943886 verbatim:

  • tinytuya/core/Monitor.py — selector-based reactor with per-device receive buffers, frame reassembly, automatic heartbeats, callback dispatch (on_status/on_connect/on_disconnect), gateway/cid routing, and thread-safe command queue
  • Two operating modes: daemon thread (start()/stop()) or manual poll() loop
  • No new dependenciesselectors is stdlib, protocol logic reuses existing parse_header()/unpack_message()/_decode_payload()
  • Examples included for both modes

The one open item from the proposal is reconnect strategy (§7) — current implementation fires on_disconnect but does not auto-reconnect, since reconnect is blocking and would stall the reactor.

Ready for real-device testing. 🧪

— Sam ⚙️

@uzlonewolf

Copy link
Copy Markdown
Collaborator

So, I've actually been thinking about this quite a lot over the last few months, usually followed by "man I really wish I had time to implement this" 😂😭 Although I still hate how Python implemented asyncio I'm actually pretty excited about getting TinyTuya moved over to it.

There are a number of things I would like to see before "going live" with it though. Nearly all of these are not actually related to ayncio but are more of a "this is a major update to TinyTuya so I would like to see these included in that release" type of thing.

  1. Better documentation. I got a good start with Documentation #685 but never had time to finish it.

  2. Better tests. I hate the current test suite and with how it's currently implemented I'm not even sure what exactly it is testing. Instead, I would like to expand my "fake v3.5 device" to have it mimic various devices/protocol versions and act just like a real device we can throw commands at to check responses. This way we can test the entire chain from broadcast discovery to device control.

  3. Overhaul single-shot command receiving. A number of devices return the status as a sequence of separate packets, and all devices could send an unrelated async update or have an old packet queued up in the receive buffer, so I would like an explicit (non-async) .receive() call to wait for, say, 100ms and return a merged dict containing every DP received during that time. As it is now, things like BulbDevice detection will fail if an unrelated/old update is returned before the .status() result.

  4. Rework logging. Enabling debug output quickly becomes unmanageable when you have dozens/hundreds of devices, and trying to find the one device you're trying to troubleshoot in that mess is next to impossible. As such I would like to resurrect Rework logging #300. Back then I ran into issues where certain calls 5+ functions deep couldn't easily be converted and so it got moved to my "worry about this later" pile. Since this async rewrite is basically starting from scratch it should be a lot easier to implement if we do it now.

  5. Dogfood it. My own stuff is still using a like 2 year old version of TinyTuya because it's been "good enough" for what I've been doing 😂. My multi-server has major issues though and is a hot mess and I really want to get it moved over to asyncio.

@3735943886

3735943886 commented Jun 11, 2026

Copy link
Copy Markdown
Collaborator
  1. Better tests. I hate the current test suite and with how it's currently implemented I'm not even sure what exactly it is testing. Instead, I would like to expand my "fake v3.5 device" to have it mimic various devices/protocol versions and act just like a real device we can throw commands at to check responses. This way we can test the entire chain from broadcast discovery to device control.

I actually already have a working solution called tuyamock built on tinytuya for testing clients without hardware.
Admittedly, using a tinytuya-based mock creates a bit of a "chicken and egg" situation. However, I believe it still provides great value as a regression test suite to ensure future updates to tinytuya don't introduce breaking changes for clients.
Would it be okay if I open a PR to integrate this into the repository for our tests? @jasonacox @uzlonewolf

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants