added evtest parsing

This commit is contained in:
dekunukem
2021-12-17 18:14:36 +00:00
parent 6e11de3304
commit acf4e89338

View File

@@ -3,6 +3,7 @@ import sys
import time
import spidev
import threading
import subprocess
import RPi.GPIO as GPIO
from usb4vc_oled import oled_display_queue
@@ -78,6 +79,22 @@ SPI_MISO_MSG_TYPE_KB_LED_REQUEST = 129
SPI_MOSI_MAGIC = 0xde
SPI_MISO_MAGIC = 0xcd
GP_BTN_SOUTH = 0x130
GP_BTN_EAST = 0x131
GP_BTN_C = 0x132
GP_BTN_NORTH = 0x133
GP_BTN_WEST = 0x134
GP_BTN_Z = 0x135
GP_BTN_TL = 0x136
GP_BTN_TR = 0x137
GP_BTN_TL2 = 0x138
GP_BTN_TR2 = 0x139
GP_BTN_SELECT = 0x13a
GP_BTN_START = 0x13b
GP_BTN_MODE = 0x13c
GP_BTN_THUMBL = 0x13d
GP_BTN_THUMBR = 0x13e
nop_spi_msg_template = [SPI_MOSI_MAGIC] + [0]*31
info_request_spi_msg_template = [SPI_MOSI_MAGIC, 0, SPI_MOSI_MSG_TYPE_INFO_REQUEST] + [0]*29
set_protocl_spi_msg_template = [SPI_MOSI_MAGIC, 0, SPI_MOSI_MSG_TYPE_SET_PROTOCOL] + [0]*29
@@ -96,6 +113,10 @@ def make_keyboard_spi_packet(input_data, kbd_id):
result[6] = input_data[4]
return result
def make_gamepad_spi_packet(gp_status_dict, gp_id, mode):
result = list(gamepad_event_mapped_spi_msg_template)
def change_kb_led(scrolllock, numlock, capslock):
led_file_list = os.listdir(led_device_path)
capslock_list = [os.path.join(led_device_path, x) for x in led_file_list if 'capslock' in x]
@@ -114,12 +135,14 @@ def change_kb_led(scrolllock, numlock, capslock):
with open(os.path.join(item, 'brightness'), 'w') as led_file:
led_file.write(str(capslock))
def raw_input_event_worker():
mouse_status_dict = {}
mouse_button_state_list = [0] * 5
joypad_status_dict = {}
gamepad_status_dict = {}
print("raw_input_event_worker started")
while 1:
# ----------------- KEYBOARD PARSING -----------------
for key in list(keyboard_opened_device_dict):
try:
data = keyboard_opened_device_dict[key][0].read(16)
@@ -134,6 +157,8 @@ def raw_input_event_worker():
if data[0] == EV_KEY:
pcard_spi.xfer(make_keyboard_spi_packet(data, keyboard_opened_device_dict[key][1]))
# ----------------- MOUSE PARSING -----------------
for key in list(mouse_opened_device_dict):
try:
data = mouse_opened_device_dict[key][0].read(16)
@@ -147,7 +172,7 @@ def raw_input_event_worker():
"""
0 - 1 event_type
2 - 3 key code
4 - 8 key status
4 - 7 key status
"""
data = list(data[8:])
# mouse movement and scrolling
@@ -191,6 +216,7 @@ def raw_input_event_worker():
mouse_status_dict.clear()
pcard_spi.xfer(to_transfer)
# ----------------- GAMEPAD PARSING -----------------
for key in list(gamepad_opened_device_dict):
try:
data = gamepad_opened_device_dict[key][0].read(16)
@@ -202,11 +228,25 @@ def raw_input_event_worker():
if data is None:
continue
data = list(data[8:])
print(data)
# if data[0] == EV_KEY:
# to_transfer = keyboard_event_spi_msg_template
# to_transfer[3] = gamepad_opened_device_dict[key][1]
# pcard_spi.xfer(to_transfer)
gamepad_id = gamepad_opened_device_dict[key][1]
if gamepad_id not in gamepad_status_dict:
gamepad_status_dict[gamepad_id] = {}
# gamepad button presses, send out immediately
if data[0] == EV_KEY:
key_code = data[3] * 256 + data[2]
if not (GP_BTN_SOUTH <= key_code <= GP_BTN_THUMBR):
continue
gamepad_status_dict[gamepad_id][key_code] = data[4]
# joystick / analogue trigger movements
if data[0] == EV_ABS:
abs_axes = data[3] * 256 + data[2]
abs_value = int.from_bytes(data[4:8], byteorder='little', signed=True)
# print(data)
# print(hex(abs_axes), abs_value)
# print()
gamepad_status_dict[gamepad_id][abs_axes] = abs_value
print(gamepad_status_dict)
print(gamepad_opened_device_dict[key][2])
if GPIO.event_detected(SLAVE_REQ_PIN):
slave_result = None
@@ -276,12 +316,29 @@ def usb_device_scan_worker():
device_index = sum([int(x) for x in item.split(':')[1].split('.')][:2])
except:
pass
gamepad_opened_device_dict[item] = (this_file, device_index)
gamepad_info = subprocess.getoutput("timeout 0.5 evtest " + str(item))
gamepad_opened_device_dict[item] = (this_file, device_index, parse_evtest_info(gamepad_info))
print("opened gamepad", gamepad_opened_device_dict[item][1], ':' , item)
except Exception as e:
print("gamepad open exception:", e)
continue
def parse_evtest_info(input_str):
input_str = input_str.replace('\r', '').split('Event type 3 (EV_ABS)\n')[-1].split('Event type')[0]
current_axis_code = None
info_dict = {}
for line in input_str.split('\n'):
line = line.strip().replace('\t', '')
if " (ABS_" in line:
current_axis_code = int(line.split('Event code ')[1].split(' (ABS_')[0])
info_dict[current_axis_code] = {}
continue
if current_axis_code is None or len(line) < 3:
continue
line_split = line.split(' ')
info_dict[current_axis_code][line_split[0].lower()] = int(line_split[-1])
return info_dict
raw_input_event_parser_thread = threading.Thread(target=raw_input_event_worker, daemon=True)
usb_device_scan_thread = threading.Thread(target=usb_device_scan_worker, daemon=True)