implementing revamped device scan and data read

This commit is contained in:
dekunukem
2021-12-28 11:54:21 +00:00
parent 3bec922baa
commit bf5195ff8c
4 changed files with 303 additions and 209 deletions

View File

@@ -2,26 +2,35 @@ import time
import evdev
# sh sync.sh; ssh -t pi@192.168.1.56 "pkill python3;cd ~/usb4vc;python3 evdev_test.py"
# sh sync.sh; ssh -t pi@192.168.1.56 "pkill python3;cd ~/usb4vc;python3 usb4vc_main.py"
def get_input_devices():
result = []
available_devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
for this_device in available_devices:
dev_dict = {
'path':this_device.path,
'name':this_device.name,
'axes_info':{},
'is_kb':False,
'is_mouse':False,
'is_gp':False,
}
cap_str = str(this_device.capabilities(verbose=True))
if 'BTN_LEFT' in cap_str and "EV_REL" in cap_str:
dev_dict['is_mouse'] = True
if 'KEY_ENTER' in cap_str and "KEY_Y" in cap_str:
dev_dict['is_kb'] = True
if 'EV_ABS' in cap_str and "BTN_SOUTH" in cap_str:
dev_dict['is_gp'] = True
try:
for item in this_device.capabilities()[EV_ABS]:
dev_dict['axes_info'][item[0]] = item[1]._asdict()
except Exception as e:
print('get_input_devices exception:', e)
result.append(dev_dict)
return result
start = time.time_ns()
keyboard_devices = []
mouse_devices = []
gamepad_devices = []
available_devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
for this_device in available_devices:
print(this_device.path, this_device.name)
this_capability = str(this_device.capabilities(verbose=True))
if 'BTN_LEFT' in this_capability and "EV_REL" in this_capability:
print('this is a mouse')
mouse_devices.append(this_device)
elif 'KEY_ENTER' in this_capability and "KEY_Y" in this_capability:
print('this is a keyboard')
keyboard_devices.append(this_device)
elif 'EV_ABS' in this_capability and "BTN_SOUTH" in this_capability:
print('this is a gamepad')
gamepad_devices.append(this_device)
print("-----")
print(time.time_ns() - start)
get_input_devices()
print((time.time_ns() - start) / 1000000)

View File

@@ -1,3 +1,215 @@
def raw_input_event_worker():
mouse_status_dict = {}
gamepad_status_dict = {}
print("raw_input_event_worker started")
while 1:
time.sleep(1)
# ----------------- KEYBOARD PARSING -----------------
for key in list(keyboard_opened_device_dict):
try:
data = keyboard_opened_device_dict[key][0].read(16)
except OSError:
keyboard_opened_device_dict[key][0].close()
del keyboard_opened_device_dict[key]
print("keyboard disappeared:", key)
continue
if data is None:
continue
data = list(data[8:])
if data[0] == EV_KEY:
pcard_spi.xfer(make_keyboard_spi_packet(data, keyboard_opened_device_dict[key][1]))
# ----------------- MOUSE PARSING -----------------
for key in list(mouse_opened_device_dict):
try:
data = mouse_opened_device_dict[key][0].read(16)
except OSError:
mouse_opened_device_dict[key][0].close()
del mouse_opened_device_dict[key]
print("mouse disappeared:", key)
continue
if data is None:
continue
"""
0 - 1 event_type
2 - 3 key code
4 - 7 key status
"""
data = list(data[8:])
# mouse movement and scrolling
# buffer those values until a SYNC event
if data[0] == EV_REL:
if data[2] == REL_X:
rawx = int.from_bytes(data[4:6], byteorder='little', signed=True)
rawx = int(rawx * usb4vc_ui.get_mouse_sensitivity()) & 0xffff
mouse_status_dict["x"] = list(rawx.to_bytes(2, byteorder='little'))
if data[2] == REL_Y:
rawy = int.from_bytes(data[4:6], byteorder='little', signed=True)
rawy = int(rawy * usb4vc_ui.get_mouse_sensitivity()) & 0xffff
mouse_status_dict["y"] = list(rawy.to_bytes(2, byteorder='little'))
if data[2] == REL_WHEEL:
mouse_status_dict["scroll"] = data[4:6]
# mouse button pressed, send it out immediately
if data[0] == EV_KEY:
key_code = data[3] * 256 + data[2]
if 0x110 <= key_code <= 0x117:
mouse_status_dict[key_code] = data[4]
mouse_status_dict['x'] = [0, 0]
mouse_status_dict['y'] = [0, 0]
mouse_status_dict['scroll'] = [0, 0]
to_transfer = make_mouse_spi_packet(mouse_status_dict, mouse_opened_device_dict[key][1])
pcard_spi.xfer(to_transfer)
"""
Logitech unifying receiver identifies itself as a mouse,
so need to handle keyboard presses here too for compatibility
"""
if 0x1 <= key_code <= 127:
pcard_spi.xfer(make_keyboard_spi_packet(data, mouse_opened_device_dict[key][1]))
# SYNC event happened, send out an update
if data[0] == EV_SYN and data[2] == SYN_REPORT and len(mouse_status_dict) > 0:
to_transfer = make_mouse_spi_packet(mouse_status_dict, mouse_opened_device_dict[key][1])
pcard_spi.xfer(to_transfer)
mouse_status_dict['x'] = [0, 0]
mouse_status_dict['y'] = [0, 0]
mouse_status_dict['scroll'] = [0, 0]
# ----------------- GAMEPAD PARSING -----------------
for key in list(gamepad_opened_device_dict):
try:
data = gamepad_opened_device_dict[key][0].read(16)
except OSError:
gamepad_opened_device_dict[key][0].close()
del gamepad_opened_device_dict[key]
print("gamepad disappeared:", key)
continue
if data is None:
continue
data = list(data[8:])
gamepad_id = gamepad_opened_device_dict[key][1]
if gamepad_id not in gamepad_status_dict:
gamepad_status_dict[gamepad_id] = {}
# gamepad button presses, send out immediately
if data[0] == EV_KEY:
key_code = data[3] * 256 + data[2]
if not (GP_BTN_SOUTH <= key_code <= GP_BTN_THUMBR):
continue
gamepad_status_dict[gamepad_id][key_code] = data[4]
# joystick / analogue trigger movements, cache until next SYNC event
if data[0] == EV_ABS:
abs_axes = data[3] * 256 + data[2]
abs_value = int.from_bytes(data[4:8], byteorder='little', signed=True)
gamepad_status_dict[gamepad_id][abs_axes] = abs_value
# SYNC report, update now
if data[0] == EV_SYN and data[2] == SYN_REPORT:
gp_to_transfer, kb_to_transfer, mouse_to_transfer = make_gamepad_spi_packet(gamepad_status_dict, gamepad_id, gamepad_opened_device_dict[key][2])
pcard_spi.xfer(gp_to_transfer)
if kb_to_transfer is not None:
time.sleep(0.001)
pcard_spi.xfer(kb_to_transfer)
if mouse_to_transfer is not None:
time.sleep(0.001)
pcard_spi.xfer(mouse_to_transfer)
# ----------------- PBOARD INTERRUPT -----------------
if GPIO.event_detected(SLAVE_REQ_PIN):
slave_result = None
for x in range(2):
slave_result = pcard_spi.xfer(make_spi_msg_ack())
print(int(time.time()), slave_result)
if slave_result[SPI_BUF_INDEX_MAGIC] == SPI_MISO_MAGIC and slave_result[SPI_BUF_INDEX_MSG_TYPE] == SPI_MISO_MSG_TYPE_KB_LED_REQUEST:
try:
change_kb_led(slave_result[3], slave_result[4], slave_result[5])
change_kb_led(slave_result[3], slave_result[4], slave_result[5])
except Exception as e:
print('change_kb_led exception:', e)
def usb_device_scan_worker():
print("usb_device_scan_worker started")
while 1:
time.sleep(0.75)
try:
device_file_list = os.listdir(input_device_path)
except FileNotFoundError:
print("No input devices found")
continue
except Exception as e:
print('list input device exception:', e)
continue
mouse_list = [os.path.join(input_device_path, x) for x in device_file_list if 'event-mouse' in x]
keyboard_list = [os.path.join(input_device_path, x) for x in device_file_list if 'event-kbd' in x]
gamepad_list = [os.path.join(input_device_path, x) for x in device_file_list if 'event-joystick' in x]
for item in keyboard_list:
if item not in keyboard_opened_device_dict:
try:
this_file = open(item, "rb")
os.set_blocking(this_file.fileno(), False)
device_index = 0
try:
device_index = sum([int(x) for x in item.split(':')[1].split('.')][:2])
except:
pass
keyboard_opened_device_dict[item] = (this_file, device_index)
print("opened keyboard", keyboard_opened_device_dict[item][1], ':' , item)
except Exception as e:
print("keyboard open exception:", e)
continue
for item in mouse_list:
if item not in mouse_opened_device_dict:
try:
this_file = open(item, "rb")
os.set_blocking(this_file.fileno(), False)
device_index = 0
try:
device_index = sum([int(x) for x in item.split(':')[1].split('.')][:2])
except:
pass
mouse_opened_device_dict[item] = (this_file, device_index)
print("opened mouse", mouse_opened_device_dict[item][1], ':' , item)
except Exception as e:
print("mouse open exception:", e)
continue
for item in gamepad_list:
if item not in gamepad_opened_device_dict:
try:
this_file = open(item, "rb")
os.set_blocking(this_file.fileno(), False)
device_index = 0
try:
device_index = sum([int(x) for x in item.split(':')[1].split('.')][:2])
except:
pass
gamepad_info = subprocess.getoutput("timeout 0.5 evtest " + str(item))
gamepad_opened_device_dict[item] = (this_file, device_index, parse_evtest_info(gamepad_info))
print("opened gamepad", gamepad_opened_device_dict[item][1], ':' , item)
except Exception as e:
print("gamepad open exception:", e)
continue
def parse_evtest_info(input_str):
input_str = input_str.replace('\r', '').split('Event type 3 (EV_ABS)\n')[-1].split('Event type')[0]
current_axis_code = None
info_dict = {}
for line in input_str.split('\n'):
line = line.strip().replace('\t', '')
if " (ABS_" in line:
current_axis_code = int(line.split('Event code ')[1].split(' (ABS_')[0])
info_dict[current_axis_code] = {}
continue
if current_axis_code is None or len(line) < 3:
continue
line_split = line.split(' ')
info_dict[current_axis_code][line_split[0].lower()] = int(line_split[-1])
return info_dict
# print(time.time(), '-----------')
print(gp_spi_msg)

View File

@@ -25,11 +25,11 @@ GPIO.setup(PBOARD_RESET_PIN, GPIO.IN)
GPIO.setup(PBOARD_DFU_PIN, GPIO.OUT)
GPIO.output(PBOARD_DFU_PIN, GPIO.LOW)
# reset_pboard()
reset_pboard()
usb4vc_ui.ui_init()
# usb4vc_ui.ui_init()
usb4vc_ui.ui_worker.start()
# usb4vc_ui.ui_worker.start()
usb4vc_usb_scan.usb_device_scan_thread.start()
usb4vc_usb_scan.raw_input_event_parser_thread.start()

View File

@@ -2,8 +2,8 @@ import os
import sys
import time
import spidev
import evdev
import threading
import subprocess
import RPi.GPIO as GPIO
import usb4vc_ui
import usb4vc_shared
@@ -29,9 +29,7 @@ GPIO.add_event_detect(SLAVE_REQ_PIN, GPIO.RISING)
pcard_spi = spidev.SpiDev(0, 0)
pcard_spi.max_speed_hz = 2000000
keyboard_opened_device_dict = {}
mouse_opened_device_dict = {}
gamepad_opened_device_dict = {}
ooopened_device_dict = {}
led_device_path = '/sys/class/leds'
input_device_path = '/dev/input/by-path/'
@@ -378,209 +376,84 @@ def raw_input_event_worker():
gamepad_status_dict = {}
print("raw_input_event_worker started")
while 1:
# ----------------- KEYBOARD PARSING -----------------
for key in list(keyboard_opened_device_dict):
for key in list(ooopened_device_dict):
try:
data = keyboard_opened_device_dict[key][0].read(16)
data = ooopened_device_dict[key]['file'].read(16)
except OSError:
keyboard_opened_device_dict[key][0].close()
del keyboard_opened_device_dict[key]
print("keyboard disappeared:", key)
print("Device disappeared:", ooopened_device_dict[key]['name'])
ooopened_device_dict[key]['file'].close()
del ooopened_device_dict[key]
continue
if data is None:
continue
data = list(data[8:])
if data[0] == EV_KEY:
pcard_spi.xfer(make_keyboard_spi_packet(data, keyboard_opened_device_dict[key][1]))
# ----------------- MOUSE PARSING -----------------
for key in list(mouse_opened_device_dict):
try:
data = mouse_opened_device_dict[key][0].read(16)
except OSError:
mouse_opened_device_dict[key][0].close()
del mouse_opened_device_dict[key]
print("mouse disappeared:", key)
continue
if data is None:
continue
"""
0 - 1 event_type
2 - 3 key code
4 - 7 key status
"""
data = list(data[8:])
# mouse movement and scrolling
# buffer those values until a SYNC event
if data[0] == EV_REL:
if data[2] == REL_X:
rawx = int.from_bytes(data[4:6], byteorder='little', signed=True)
rawx = int(rawx * usb4vc_ui.get_mouse_sensitivity()) & 0xffff
mouse_status_dict["x"] = list(rawx.to_bytes(2, byteorder='little'))
if data[2] == REL_Y:
rawy = int.from_bytes(data[4:6], byteorder='little', signed=True)
rawy = int(rawy * usb4vc_ui.get_mouse_sensitivity()) & 0xffff
mouse_status_dict["y"] = list(rawy.to_bytes(2, byteorder='little'))
if data[2] == REL_WHEEL:
mouse_status_dict["scroll"] = data[4:6]
# mouse button pressed, send it out immediately
print(data)
# event is a key press
if data[0] == EV_KEY:
key_code = data[3] * 256 + data[2]
if 0x110 <= key_code <= 0x117:
print('EV_KEY:', key_code)
# keyboard keys
if 0x1 <= key_code <= 248:
pcard_spi.xfer(make_keyboard_spi_packet(data, ooopened_device_dict[key]['id']))
# Mouse buttons
elif 0x110 <= key_code <= 0x117:
mouse_status_dict[key_code] = data[4]
mouse_status_dict['x'] = [0, 0]
mouse_status_dict['y'] = [0, 0]
mouse_status_dict['scroll'] = [0, 0]
to_transfer = make_mouse_spi_packet(mouse_status_dict, mouse_opened_device_dict[key][1])
pcard_spi.xfer(to_transfer)
"""
Logitech unifying receiver identifies itself as a mouse,
so need to handle keyboard presses here too for compatibility
"""
if 0x1 <= key_code <= 127:
pcard_spi.xfer(make_keyboard_spi_packet(data, mouse_opened_device_dict[key][1]))
pcard_spi.xfer(make_mouse_spi_packet(mouse_status_dict, ooopened_device_dict[key]['id']))
# SYNC event happened, send out an update
if data[0] == EV_SYN and data[2] == SYN_REPORT and len(mouse_status_dict) > 0:
to_transfer = make_mouse_spi_packet(mouse_status_dict, mouse_opened_device_dict[key][1])
pcard_spi.xfer(to_transfer)
mouse_status_dict['x'] = [0, 0]
mouse_status_dict['y'] = [0, 0]
mouse_status_dict['scroll'] = [0, 0]
# ----------------- GAMEPAD PARSING -----------------
for key in list(gamepad_opened_device_dict):
def get_input_devices():
result = []
available_devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
for this_device in available_devices:
dev_dict = {
'path':this_device.path,
'name':this_device.name,
'axes_info':{},
'is_kb':False,
'is_mouse':False,
'is_gp':False,
}
cap_str = str(this_device.capabilities(verbose=True))
if 'BTN_LEFT' in cap_str and "EV_REL" in cap_str:
dev_dict['is_mouse'] = True
if 'KEY_ENTER' in cap_str and "KEY_Y" in cap_str:
dev_dict['is_kb'] = True
if 'EV_ABS' in cap_str and "BTN_SOUTH" in cap_str:
dev_dict['is_gp'] = True
try:
data = gamepad_opened_device_dict[key][0].read(16)
except OSError:
gamepad_opened_device_dict[key][0].close()
del gamepad_opened_device_dict[key]
print("gamepad disappeared:", key)
continue
if data is None:
continue
data = list(data[8:])
gamepad_id = gamepad_opened_device_dict[key][1]
if gamepad_id not in gamepad_status_dict:
gamepad_status_dict[gamepad_id] = {}
# gamepad button presses, send out immediately
if data[0] == EV_KEY:
key_code = data[3] * 256 + data[2]
if not (GP_BTN_SOUTH <= key_code <= GP_BTN_THUMBR):
continue
gamepad_status_dict[gamepad_id][key_code] = data[4]
# joystick / analogue trigger movements, cache until next SYNC event
if data[0] == EV_ABS:
abs_axes = data[3] * 256 + data[2]
abs_value = int.from_bytes(data[4:8], byteorder='little', signed=True)
gamepad_status_dict[gamepad_id][abs_axes] = abs_value
# SYNC report, update now
if data[0] == EV_SYN and data[2] == SYN_REPORT:
gp_to_transfer, kb_to_transfer, mouse_to_transfer = make_gamepad_spi_packet(gamepad_status_dict, gamepad_id, gamepad_opened_device_dict[key][2])
pcard_spi.xfer(gp_to_transfer)
if kb_to_transfer is not None:
time.sleep(0.001)
pcard_spi.xfer(kb_to_transfer)
if mouse_to_transfer is not None:
time.sleep(0.001)
pcard_spi.xfer(mouse_to_transfer)
# ----------------- PBOARD INTERRUPT -----------------
if GPIO.event_detected(SLAVE_REQ_PIN):
slave_result = None
for x in range(2):
slave_result = pcard_spi.xfer(make_spi_msg_ack())
print(int(time.time()), slave_result)
if slave_result[SPI_BUF_INDEX_MAGIC] == SPI_MISO_MAGIC and slave_result[SPI_BUF_INDEX_MSG_TYPE] == SPI_MISO_MSG_TYPE_KB_LED_REQUEST:
try:
change_kb_led(slave_result[3], slave_result[4], slave_result[5])
change_kb_led(slave_result[3], slave_result[4], slave_result[5])
except Exception as e:
print('change_kb_led exception:', e)
for item in this_device.capabilities()[EV_ABS]:
dev_dict['axes_info'][item[0]] = item[1]._asdict()
except Exception as e:
print('get_input_devices exception:', e)
result.append(dev_dict)
return result
def usb_device_scan_worker():
print("usb_device_scan_worker started")
while 1:
time.sleep(0.75)
try:
device_file_list = os.listdir(input_device_path)
except FileNotFoundError:
print("No input devices found")
continue
except Exception as e:
print('list input device exception:', e)
device_list = get_input_devices()
if len(device_list) == 0:
print('No input devices found')
continue
mouse_list = [os.path.join(input_device_path, x) for x in device_file_list if 'event-mouse' in x]
keyboard_list = [os.path.join(input_device_path, x) for x in device_file_list if 'event-kbd' in x]
gamepad_list = [os.path.join(input_device_path, x) for x in device_file_list if 'event-joystick' in x]
for item in keyboard_list:
if item not in keyboard_opened_device_dict:
for item in device_list:
if item['path'] in ooopened_device_dict:
continue
try:
this_file = open(item['path'], "rb")
os.set_blocking(this_file.fileno(), False)
item['file'] = this_file
try:
this_file = open(item, "rb")
os.set_blocking(this_file.fileno(), False)
device_index = 0
try:
device_index = sum([int(x) for x in item.split(':')[1].split('.')][:2])
except:
pass
keyboard_opened_device_dict[item] = (this_file, device_index)
print("opened keyboard", keyboard_opened_device_dict[item][1], ':' , item)
except Exception as e:
print("keyboard open exception:", e)
continue
for item in mouse_list:
if item not in mouse_opened_device_dict:
try:
this_file = open(item, "rb")
os.set_blocking(this_file.fileno(), False)
device_index = 0
try:
device_index = sum([int(x) for x in item.split(':')[1].split('.')][:2])
except:
pass
mouse_opened_device_dict[item] = (this_file, device_index)
print("opened mouse", mouse_opened_device_dict[item][1], ':' , item)
except Exception as e:
print("mouse open exception:", e)
continue
for item in gamepad_list:
if item not in gamepad_opened_device_dict:
try:
this_file = open(item, "rb")
os.set_blocking(this_file.fileno(), False)
device_index = 0
try:
device_index = sum([int(x) for x in item.split(':')[1].split('.')][:2])
except:
pass
gamepad_info = subprocess.getoutput("timeout 0.5 evtest " + str(item))
gamepad_opened_device_dict[item] = (this_file, device_index, parse_evtest_info(gamepad_info))
print("opened gamepad", gamepad_opened_device_dict[item][1], ':' , item)
except Exception as e:
print("gamepad open exception:", e)
continue
def parse_evtest_info(input_str):
input_str = input_str.replace('\r', '').split('Event type 3 (EV_ABS)\n')[-1].split('Event type')[0]
current_axis_code = None
info_dict = {}
for line in input_str.split('\n'):
line = line.strip().replace('\t', '')
if " (ABS_" in line:
current_axis_code = int(line.split('Event code ')[1].split(' (ABS_')[0])
info_dict[current_axis_code] = {}
continue
if current_axis_code is None or len(line) < 3:
continue
line_split = line.split(' ')
info_dict[current_axis_code][line_split[0].lower()] = int(line_split[-1])
return info_dict
item['id'] = int(item['path'][len(item['path'].rstrip('0123456789')):])
except:
item['id'] = 255
ooopened_device_dict[item['path']] = item
print("opened device:", item['name'])
except Exception as e:
print("Device open exception:", e, item)
raw_input_event_parser_thread = threading.Thread(target=raw_input_event_worker, daemon=True)
usb_device_scan_thread = threading.Thread(target=usb_device_scan_worker, daemon=True)