mirror of
https://github.com/dekuNukem/USB4VC.git
synced 2025-10-31 11:26:46 -07:00
restored old code
This commit is contained in:
@@ -35,6 +35,8 @@ GPIO.setup(PCARD_BUSY_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
|
||||
pcard_spi = spidev.SpiDev(0, 0)
|
||||
pcard_spi.max_speed_hz = 2000000
|
||||
|
||||
opened_device_dict = {}
|
||||
|
||||
led_device_path = '/sys/class/leds'
|
||||
input_device_path = '/dev/input/by-path/'
|
||||
|
||||
@@ -739,6 +741,138 @@ def multiply_round_up_0(number, multi):
|
||||
new_number = -1
|
||||
return int(new_number)
|
||||
|
||||
USB_GAMEPAD_STICK_AXES_NAMES = ["ABS_X", "ABS_Y", "ABS_RX", "ABS_RY"]
|
||||
|
||||
gamepad_status_dict = {}
|
||||
gamepad_hold_check_interval = 0.02
|
||||
def raw_input_event_worker():
|
||||
last_usb_event = 0
|
||||
mouse_status_dict = {'x': [0, 0], 'y': [0, 0], 'scroll': 0, 'hscroll': 0, BTN_LEFT:0, BTN_RIGHT:0, BTN_MIDDLE:0, BTN_SIDE:0, BTN_EXTRA:0, BTN_FORWARD:0, BTN_BACK:0, BTN_TASK:0}
|
||||
next_gamepad_hold_check = time.time() + gamepad_hold_check_interval
|
||||
last_mouse_msg = []
|
||||
last_gamepad_msg = None
|
||||
in_deadzone_list = []
|
||||
last_mouse_button_msg = None
|
||||
print("raw_input_event_worker started")
|
||||
while 1:
|
||||
now = time.time()
|
||||
if now > next_gamepad_hold_check:
|
||||
joystick_hold_update()
|
||||
next_gamepad_hold_check = now + gamepad_hold_check_interval
|
||||
|
||||
# give other threads some breathing room if
|
||||
# no USB input events
|
||||
if now - last_usb_event > 1:
|
||||
time.sleep(0.005)
|
||||
|
||||
for key in list(opened_device_dict):
|
||||
this_device = opened_device_dict[key]
|
||||
this_id = this_device['id']
|
||||
try:
|
||||
data = this_device['file'].read(16)
|
||||
except OSError:
|
||||
print("Device disappeared:", this_device['name'])
|
||||
this_device['file'].close()
|
||||
del opened_device_dict[key]
|
||||
continue
|
||||
if data is None:
|
||||
continue
|
||||
|
||||
usb4vc_ui.my_oled.kick()
|
||||
last_usb_event = now
|
||||
|
||||
if this_device['is_gp'] and this_id not in gamepad_status_dict:
|
||||
gamepad_status_dict[this_id] = {}
|
||||
|
||||
data = list(data[8:])
|
||||
event_code = data[3] * 256 + data[2]
|
||||
|
||||
# event is a key press
|
||||
if data[0] == EV_KEY:
|
||||
# keyboard keys
|
||||
if 0x1 <= event_code <= 248 and event_code not in gamepad_buttons_as_kb_codes:
|
||||
xfer_when_not_busy(make_keyboard_spi_packet(data, this_id))
|
||||
# Mouse buttons
|
||||
elif 0x110 <= event_code <= 0x117:
|
||||
mouse_status_dict[event_code] = data[4]
|
||||
next_gamepad_hold_check = now + gamepad_hold_check_interval
|
||||
if data[4] != 2:
|
||||
clear_mouse_movement(mouse_status_dict)
|
||||
last_mouse_button_msg = make_mouse_spi_packet(mouse_status_dict, this_id)
|
||||
xfer_when_not_busy(list(last_mouse_button_msg))
|
||||
# Gamepad buttons
|
||||
elif is_gamepad_button(event_code) or event_code in gamepad_buttons_as_kb_codes:
|
||||
this_btn_status = data[4]
|
||||
if this_btn_status != 0:
|
||||
this_btn_status = 1
|
||||
gamepad_status_dict[this_id][event_code] = this_btn_status
|
||||
|
||||
# event is relative axes AKA mouse
|
||||
elif data[0] == EV_REL and event_code == REL_X:
|
||||
rawx = int.from_bytes(data[4:6], byteorder='little', signed=True)
|
||||
rawx = multiply_round_up_0(rawx, usb4vc_ui.get_mouse_sensitivity()) & 0xffff
|
||||
mouse_status_dict["x"] = list(rawx.to_bytes(2, byteorder='little'))
|
||||
elif data[0] == EV_REL and event_code == REL_Y:
|
||||
rawy = int.from_bytes(data[4:6], byteorder='little', signed=True)
|
||||
rawy = multiply_round_up_0(rawy, usb4vc_ui.get_mouse_sensitivity()) & 0xffff
|
||||
mouse_status_dict["y"] = list(rawy.to_bytes(2, byteorder='little'))
|
||||
elif data[0] == EV_REL and event_code == REL_WHEEL:
|
||||
mouse_status_dict['scroll'] = data[4]
|
||||
elif data[0] == EV_REL and event_code == REL_HWHEEL:
|
||||
mouse_status_dict['hscroll'] = data[4]
|
||||
|
||||
# event is absolute axes AKA joystick
|
||||
elif this_device['is_gp'] and data[0] == EV_ABS:
|
||||
abs_value = int.from_bytes(data[4:8], byteorder='little', signed=True)
|
||||
abs_value_midpoint127 = convert_to_8bit_midpoint127(abs_value, this_device['axes_info'], event_code)
|
||||
gamepad_status_dict[this_id][event_code] = abs_value_midpoint127
|
||||
|
||||
# SYNC event
|
||||
elif data[0] == EV_SYN and event_code == SYN_REPORT:
|
||||
if this_device['is_mouse']:
|
||||
this_mouse_msg = make_mouse_spi_packet(mouse_status_dict, this_id)
|
||||
if this_mouse_msg == last_mouse_button_msg:
|
||||
pass
|
||||
# send spi mouse message if there is moment, or the button is not typematic
|
||||
elif (max(this_mouse_msg[13:18]) != 2 or sum(this_mouse_msg[4:10]) != 0) and (this_mouse_msg[4:] != last_mouse_msg[4:] or sum(this_mouse_msg[4:]) != 0):
|
||||
xfer_when_not_busy(list(this_mouse_msg))
|
||||
next_gamepad_hold_check = now + gamepad_hold_check_interval
|
||||
clear_mouse_movement(mouse_status_dict)
|
||||
last_mouse_msg = list(this_mouse_msg)
|
||||
if this_device['is_gp']:
|
||||
for axis_name in USB_GAMEPAD_STICK_AXES_NAMES:
|
||||
axis_code = code_name_to_value_lookup.get(axis_name)[0]
|
||||
this_gp_dict = gamepad_status_dict[this_device['id']]
|
||||
if axis_code in this_gp_dict and 127 - 10 <= this_gp_dict[axis_code] <= 127 + 10:
|
||||
this_gp_dict[axis_code] = 127
|
||||
gamepad_output = make_gamepad_spi_packet(gamepad_status_dict, this_device)
|
||||
if gamepad_output != last_gamepad_msg:
|
||||
# print(gamepad_output)
|
||||
gp_to_transfer, kb_to_transfer, mouse_to_transfer = gamepad_output
|
||||
xfer_when_not_busy(list(gp_to_transfer))
|
||||
if kb_to_transfer is not None:
|
||||
time.sleep(0.001)
|
||||
xfer_when_not_busy(list(kb_to_transfer))
|
||||
if mouse_to_transfer is not None:
|
||||
time.sleep(0.001)
|
||||
xfer_when_not_busy(list(mouse_to_transfer))
|
||||
next_gamepad_hold_check = now + gamepad_hold_check_interval
|
||||
last_gamepad_msg = gamepad_output
|
||||
|
||||
# ----------------- PBOARD INTERRUPT -----------------
|
||||
if GPIO.event_detected(SLAVE_REQ_PIN):
|
||||
# send out ACK to turn off P-Card interrupt
|
||||
slave_result = xfer_when_not_busy(make_spi_msg_ack())
|
||||
time.sleep(0.001)
|
||||
# send another to shift response into RPi
|
||||
slave_result = xfer_when_not_busy(make_spi_msg_ack())
|
||||
print(int(time.time()), slave_result)
|
||||
if slave_result[SPI_BUF_INDEX_MAGIC] == SPI_MISO_MAGIC and slave_result[SPI_BUF_INDEX_MSG_TYPE] == SPI_MISO_MSG_TYPE_KB_LED_REQUEST:
|
||||
try:
|
||||
change_kb_led(slave_result[3], slave_result[4], slave_result[5])
|
||||
change_kb_led(slave_result[3], slave_result[4], slave_result[5])
|
||||
except Exception as e:
|
||||
print('exception change_kb_led:', e)
|
||||
|
||||
def check_is_gamepad(capability_dict):
|
||||
keys_and_buttons = capability_dict.get(('EV_KEY', 1))
|
||||
@@ -755,12 +889,6 @@ def check_is_gamepad(capability_dict):
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_device_count():
|
||||
mouse_count = 69
|
||||
kb_count = 69
|
||||
gp_count = 69
|
||||
return (mouse_count, kb_count, gp_count)
|
||||
|
||||
def get_input_devices():
|
||||
result = []
|
||||
available_devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
|
||||
@@ -784,6 +912,10 @@ def get_input_devices():
|
||||
dev_dict['is_mouse'] = True
|
||||
if 'KEY_ENTER' in cap_str and "KEY_Y" in cap_str:
|
||||
dev_dict['is_kb'] = True
|
||||
# print(this_device.name)
|
||||
# print(this_cap)
|
||||
# print(check_is_gamepad(this_cap))
|
||||
# print("!!!!!!!!!!!!!!!!!!!!!!")
|
||||
if check_is_gamepad(this_cap):
|
||||
dev_dict['is_gp'] = True
|
||||
try:
|
||||
@@ -795,16 +927,49 @@ def get_input_devices():
|
||||
result.append(dev_dict)
|
||||
return result
|
||||
|
||||
def get_device_count():
|
||||
mouse_count = 0
|
||||
kb_count = 0
|
||||
gp_count = 0
|
||||
for key in opened_device_dict:
|
||||
if opened_device_dict[key]['is_mouse']:
|
||||
mouse_count += 1
|
||||
elif opened_device_dict[key]['is_kb']:
|
||||
kb_count += 1
|
||||
elif opened_device_dict[key]['is_gp']:
|
||||
gp_count += 1
|
||||
return (mouse_count, kb_count, gp_count)
|
||||
|
||||
def usb_device_scan_worker():
|
||||
print("usb_device_scan_worker started")
|
||||
while 1:
|
||||
time.sleep(0.75)
|
||||
wwwww = get_input_devices()
|
||||
for item in wwwww:
|
||||
print(item)
|
||||
print("-----------------")
|
||||
try:
|
||||
device_list = get_input_devices()
|
||||
except Exception as e:
|
||||
print('exception get_input_devices:', e)
|
||||
continue
|
||||
if len(device_list) == 0:
|
||||
print('No input devices found')
|
||||
continue
|
||||
|
||||
# raw_input_event_parser_thread = threading.Thread(target=raw_input_event_worker, daemon=True)
|
||||
for item in device_list:
|
||||
if item['path'] in opened_device_dict:
|
||||
continue
|
||||
try:
|
||||
this_file = open(item['path'], "rb")
|
||||
os.set_blocking(this_file.fileno(), False)
|
||||
item['file'] = this_file
|
||||
try:
|
||||
item['id'] = int(item['path'][len(item['path'].rstrip('0123456789')):])
|
||||
except:
|
||||
item['id'] = 255
|
||||
opened_device_dict[item['path']] = item
|
||||
print("opened device:", hex(item['vendor_id']), hex(item['product_id']), item['name'])
|
||||
except Exception as e:
|
||||
print("exception Device open:", e, item)
|
||||
|
||||
raw_input_event_parser_thread = threading.Thread(target=raw_input_event_worker, daemon=True)
|
||||
usb_device_scan_thread = threading.Thread(target=usb_device_scan_worker, daemon=True)
|
||||
|
||||
def get_pboard_info():
|
||||
|
||||
Reference in New Issue
Block a user