trying out asyncio

This commit is contained in:
dekuNukem
2025-08-24 00:10:07 +01:00
parent 53c90396bb
commit aca32d2fda
2 changed files with 21 additions and 19 deletions

View File

@@ -1,14 +1,14 @@
# scp ./* pi@192.168.1.98:~/usb4vc/rpi_app
# scp ./* pi@192.168.1.74:~/usb4vc/rpi_app
# sh sync.sh; ssh -t pi@192.168.1.62 "pkill python3;cd ~/usb4vc/rpi_app;python3 usb4vc_main.py"
# sh sync.sh; ssh -t pi@192.168.1.74 "pkill python3;cd ~/usb4vc/rpi_app;python3 usb4vc_main.py"
scp ./* pi@192.168.1.98:~/usb4vc/rpi_app
# ssh -t pi@192.168.1.98 "pkill python3;cd ~/usb4vc/rpi_app;python3 usb4vc_main.py"
# ssh -t pi@192.168.1.98 "pkill python3;cd ~/usb4vc/rpi_app;python3 usb4vc_check_update.py"
scp ./* pi@192.168.1.74:~/usb4vc/rpi_app
# ssh -t pi@192.168.1.74 "pkill python3;cd ~/usb4vc/rpi_app;python3 usb4vc_main.py"
# ssh -t pi@192.168.1.74 "pkill python3;cd ~/usb4vc/rpi_app;python3 usb4vc_check_update.py"
# ssh -t pi@192.168.1.98 "pkill python3;cd ~/usb4vc/rpi_app;python3 firmware_flasher.py /home/pi/usb4vc/firmware/PBFW_IBMPC_PBID1_V0_1_5.hex /home/pi/usb4vc/firmware/ibmpc_test.hex"
# ssh -t pi@192.168.1.98 "pkill python3;cd ~/usb4vc/rpi_app;python3 bb_tester.py"
# ssh -t pi@192.168.1.74 "pkill python3;cd ~/usb4vc/rpi_app;python3 firmware_flasher.py /home/pi/usb4vc/firmware/PBFW_IBMPC_PBID1_V0_1_5.hex /home/pi/usb4vc/firmware/ibmpc_test.hex"
# ssh -t pi@192.168.1.74 "pkill python3;cd ~/usb4vc/rpi_app;python3 bb_tester.py"
# pi@169.254.245.21 bb_tester
# ssh pi@192.168.1.74
ssh -t pi@192.168.1.98 "pkill python3;cd ~/usb4vc/rpi_app;python3 usb4vc_evdev_test.py"
ssh -t pi@192.168.1.74 "pkill python3;cd ~/usb4vc/rpi_app;python3 usb4vc_evdev_test.py"

View File

@@ -13,7 +13,7 @@ import asyncio
hid_device_info_dict = {}
POLL_INTERVAL_SEC = 1.0 # how often we rescan for devices
POLL_INTERVAL_SEC = 1 # how often we rescan for devices
def hid_info_dict_add(dev_path):
if dev_path in hid_device_info_dict:
@@ -30,34 +30,37 @@ def hid_info_dict_add(dev_path):
'is_mouse':False,
'is_gp':False,
}
this_cap = this_device.capabilities(verbose=True)
cap_str = str(this_cap)
if 'BTN_LEFT' in cap_str and "EV_REL" in cap_str:
info_dict['is_mouse'] = True
if 'KEY_ENTER' in cap_str and "KEY_Y" in cap_str:
info_dict['is_kb'] = True
print(this_cap)
hid_device_info_dict[dev_path] = info_dict
def hid_info_dict_remove(dev_path):
hid_device_info_dict.pop(dev_path, None)
async def read_device_events(path: str):
"""
Open a single device and print its events until it goes away
(or this task is cancelled).
"""
dev = None
try:
dev = evdev.InputDevice(path)
# Announce the device
print(f"[attach] {path} name='{dev.name}'")
hid_info_dict_add(path)
# Async loop over input events
async for event in dev.async_read_loop():
# Skip non-value events if you want; here we show everything
print(f"!!!!!!!!!!!!!!! {path}: {evdev.categorize(event)}")
pcard_spi.xfer([0,0,0,0,0,0,0,0])
except (FileNotFoundError, OSError) as e:
# Common when the device is unplugged (e.g. [Errno 19] No such device)
print(f"[disconnect] {path}: {e}")
hid_info_dict_remove(path)
except asyncio.CancelledError:
# We were asked to stop (e.g. device disappeared or program exiting)
raise
finally:
if dev is not None:
@@ -87,7 +90,6 @@ async def watch_all_devices():
try:
current_paths = set([x for x in evdev.list_devices() if is_ignored_device(x) is False])
except Exception as e:
# Very rare, but if listing fails, keep previous state and retry
print(f"[warn] list_devices failed: {e}")
current_paths = known_paths