Files
greaseweazle-firmware/scripts/greaseweazle/usb.py
Keir Fraser 9b4b69da85 usb: Place index timings in-band within the flux read stream.
This removes the low limit on max index timings per read request.
2020-10-21 10:29:51 +01:00

515 lines
16 KiB
Python

# greaseweazle/usb.py
#
# Written & released by Keir Fraser <keir.xen@gmail.com>
#
# This is free and unencumbered software released into the public domain.
# See the file COPYING for more details, or visit <http://unlicense.org>.
import struct
from greaseweazle import version
from greaseweazle import error
from greaseweazle.flux import Flux
## Control-Path command set
class ControlCmd:
ClearComms = 10000
Normal = 9600
## Command set
class Cmd:
GetInfo = 0
Update = 1
Seek = 2
Side = 3
SetParams = 4
GetParams = 5
Motor = 6
ReadFlux = 7
WriteFlux = 8
GetFluxStatus = 9
GetIndexTimes = 10
SwitchFwMode = 11
Select = 12
Deselect = 13
SetBusType = 14
SetPin = 15
Reset = 16
EraseFlux = 17
SourceBytes = 18
SinkBytes = 19
str = {
GetInfo: "GetInfo",
Update: "Update",
Seek: "Seek",
Side: "Side",
SetParams: "SetParams",
GetParams: "GetParams",
Motor: "Motor",
ReadFlux: "ReadFlux",
WriteFlux: "WriteFlux",
GetFluxStatus: "GetFluxStatus",
GetIndexTimes: "GetIndexTimes",
SwitchFwMode: "SwitchFwMode",
Select: "Select",
Deselect: "Deselect",
SetBusType: "SetBusType",
SetPin: "SetPin",
Reset: "Reset",
EraseFlux: "EraseFlux",
SourceBytes: "SourceBytes",
SinkBytes: "SinkBytes"
}
## Command responses/acknowledgements
class Ack:
Okay = 0
BadCommand = 1
NoIndex = 2
NoTrk0 = 3
FluxOverflow = 4
FluxUnderflow = 5
Wrprot = 6
NoUnit = 7
NoBus = 8
BadUnit = 9
BadPin = 10
str = {
Okay: "Okay",
BadCommand: "Bad Command",
NoIndex: "No Index",
NoTrk0: "Track 0 not found",
FluxOverflow: "Flux Overflow",
FluxUnderflow: "Flux Underflow",
Wrprot: "Disk is Write Protected",
NoUnit: "No drive unit selected",
NoBus: "No bus type (eg. Shugart, IBM/PC) specified",
BadUnit: "Bad unit number",
BadPin: "Not a modifiable pin"
}
## Cmd.GetInfo indexes
class GetInfo:
Firmware = 0
BandwidthStats = 1
## Cmd.{Get,Set}Params indexes
class Params:
Delays = 0
## Cmd.SetBusType values
class BusType:
Invalid = 0
IBMPC = 1
Shugart = 2
## Flux read stream opcodes, preceded by 0xFF byte
class FluxOp:
LongFlux = 1
Index = 2
## CmdError: Encapsulates a command acknowledgement.
class CmdError(Exception):
def __init__(self, cmd, code):
self.cmd = cmd
self.code = code
def __str__(self):
return "%s: %s" % (Cmd.str.get(self.cmd, "UnknownCmd"),
Ack.str.get(self.code, "Unknown Error (%u)"
% self.code))
class Unit:
## Unit information, instance variables:
## major, minor: Greaseweazle firmware version number
## max_cmd: Maximum Cmd number accepted by this unit
## sample_freq: Resolution of all time values passed to/from this unit
## update_mode: True iff the Greaseweazle unit is in update mode
## Unit(ser):
## Accepts a Pyserial instance for Greaseweazle communications.
def __init__(self, ser):
self.ser = ser
self.reset()
# Copy firmware info to instance variables (see above for definitions).
self._send_cmd(struct.pack("3B", Cmd.GetInfo, 3, GetInfo.Firmware))
x = struct.unpack("<4BI3B21x", self.ser.read(32))
(self.major, self.minor, is_main_firmware,
self.max_cmd, self.sample_freq, self.hw_model,
self.hw_submodel, self.usb_speed) = x
# Old firmware doesn't report HW type but runs on STM32F1 only.
if self.hw_model == 0:
self.hw_model = 1
# Check whether firmware is in update mode: limited command set if so.
self.update_mode = (is_main_firmware == 0)
if self.update_mode:
self.update_jumpered = (self.sample_freq & 1)
del self.sample_freq
return
# We are running main firmware: Check whether an update is needed.
# We can use only the GetInfo command if the firmware is out of date.
self.update_needed = (version.major != self.major
or version.minor != self.minor)
if self.update_needed:
return
# Initialise the delay properties with current firmware values.
self._send_cmd(struct.pack("4B", Cmd.GetParams, 4, Params.Delays, 10))
(self._select_delay, self._step_delay,
self._seek_settle_delay, self._motor_delay,
self._auto_off_delay) = struct.unpack("<5H", self.ser.read(10))
## reset:
## Resets communications with Greaseweazle.
def reset(self):
self.ser.reset_output_buffer()
self.ser.baudrate = ControlCmd.ClearComms
self.ser.baudrate = ControlCmd.Normal
self.ser.reset_input_buffer()
## _send_cmd:
## Send given command byte sequence to Greaseweazle.
## Raise a CmdError if command fails.
def _send_cmd(self, cmd):
self.ser.write(cmd)
(c,r) = struct.unpack("2B", self.ser.read(2))
error.check(c == cmd[0], "Command returned garbage (%02x != %02x)"
% (c, cmd[0]))
if r != 0:
raise CmdError(c, r)
## seek:
## Seek the selected drive's heads to the specified track (cyl, side).
def seek(self, cyl, side):
self._send_cmd(struct.pack("3B", Cmd.Seek, 3, cyl))
self._send_cmd(struct.pack("3B", Cmd.Side, 3, side))
## set_bus_type:
## Set the floppy bus type.
def set_bus_type(self, type):
self._send_cmd(struct.pack("3B", Cmd.SetBusType, 3, type))
## set_pin:
## Set a pin level.
def set_pin(self, pin, level):
self._send_cmd(struct.pack("4B", Cmd.SetPin, 4, pin, int(level)))
## power_on_reset:
## Re-initialise to power-on defaults.
def power_on_reset(self):
self._send_cmd(struct.pack("2B", Cmd.Reset, 2))
## drive_select:
## Select the specified drive unit.
def drive_select(self, unit):
self._send_cmd(struct.pack("3B", Cmd.Select, 3, unit))
## drive_deselect:
## Deselect currently-selected drive unit (if any).
def drive_deselect(self):
self._send_cmd(struct.pack("2B", Cmd.Deselect, 2))
## drive_motor:
## Turn the specified drive's motor on/off.
def drive_motor(self, unit, state):
self._send_cmd(struct.pack("4B", Cmd.Motor, 4, unit, int(state)))
## switch_fw_mode:
## Switch between update bootloader and main firmware.
def switch_fw_mode(self, mode):
self._send_cmd(struct.pack("3B", Cmd.SwitchFwMode, 3, int(mode)))
## update_firmware:
## Update Greaseweazle to the given new firmware.
def update_firmware(self, dat):
self._send_cmd(struct.pack("<2BI", Cmd.Update, 6, len(dat)))
self.ser.write(dat)
(ack,) = struct.unpack("B", self.ser.read(1))
return ack
## update_bootloader:
## Update Greaseweazle with the given new bootloader.
def update_bootloader(self, dat):
self._send_cmd(struct.pack("<2B2I", Cmd.Update, 10,
len(dat), 0xdeafbee3))
self.ser.write(dat)
(ack,) = struct.unpack("B", self.ser.read(1))
return ack
## _decode_flux:
## Decode the Greaseweazle data stream into a list of flux samples.
def _decode_flux(self, dat):
flux, index = [], []
dat_i = iter(dat)
ticks_since_index = 0
try:
while True:
i = next(dat_i)
if i < 250:
flux.append(i)
ticks_since_index += i
elif i == 255:
opcode = next(dat_i)
if opcode == FluxOp.LongFlux:
val = (next(dat_i) & 254) >> 1
val += (next(dat_i) & 254) << 6
val += (next(dat_i) & 254) << 13
val += (next(dat_i) & 254) << 20
flux.append(val)
ticks_since_index += val
elif opcode == FluxOp.Index:
val = (next(dat_i) & 254) >> 1
val += (next(dat_i) & 254) << 6
val += (next(dat_i) & 254) << 13
val += (next(dat_i) & 254) << 20
index.append(ticks_since_index + val)
ticks_since_index = -val
pass
else:
raise error.Fatal("Bad opcode in flux stream (%d)"
% opcode)
else:
val = (i - 249) * 250
val += next(dat_i) - 1
flux.append(val)
ticks_since_index += val
except StopIteration:
pass
error.check(flux[-1] == 0, "Missing terminator on flux read stream")
return flux[:-1], index
## _encode_flux:
## Convert the given flux timings into an encoded data stream.
def _encode_flux(self, flux):
dat = bytearray()
for val in flux:
if val == 0:
pass
elif val < 250:
dat.append(val)
else:
high = val // 250
if high <= 5:
dat.append(249+high)
dat.append(1 + val%250)
else:
dat.append(255)
dat.append(1 | (val<<1) & 255)
dat.append(1 | (val>>6) & 255)
dat.append(1 | (val>>13) & 255)
dat.append(1 | (val>>20) & 255)
dat.append(0) # End of Stream
return dat
## _read_track:
## Private helper which issues command requests to Greaseweazle.
def _read_track(self, nr_revs):
# Request and read all flux timings for this track.
dat = bytearray()
self._send_cmd(struct.pack("<2BH", Cmd.ReadFlux, 4, nr_revs+1))
while True:
dat += self.ser.read(1)
dat += self.ser.read(self.ser.in_waiting)
if dat[-1] == 0:
break
# Check flux status. An exception is raised if there was an error.
self._send_cmd(struct.pack("2B", Cmd.GetFluxStatus, 2))
return dat
## read_track:
## Read and decode flux and index timings for the current track.
def read_track(self, nr_revs, nr_retries=5):
retry = 0
while True:
try:
dat = self._read_track(nr_revs)
except CmdError as error:
# An error occurred. We may retry on transient overflows.
if error.code == Ack.FluxOverflow and retry < nr_retries:
retry += 1
else:
raise error
else:
# Success!
break
# Decode the flux list and read the index-times list.
flux_list, index_list = self._decode_flux(dat)
# Clip the initial partial revolution.
to_index = index_list[0]
for i in range(len(flux_list)):
to_index -= flux_list[i]
if to_index < 0:
flux_list[i] = -to_index
flux_list = flux_list[i:]
break
if to_index >= 0:
# We ran out of flux.
flux_list = []
index_list = index_list[1:]
# Success: Return the requested full index-to-index revolutions.
return Flux(index_list, flux_list, self.sample_freq)
## write_track:
## Write the given flux stream to the current track via Greaseweazle.
def write_track(self, flux_list, terminate_at_index, nr_retries=5):
# Create encoded data stream.
dat = self._encode_flux(flux_list)
retry = 0
while True:
try:
# Write the flux stream to the track via Greaseweazle.
self._send_cmd(struct.pack("3B", Cmd.WriteFlux, 3,
int(terminate_at_index)))
self.ser.write(dat)
self.ser.read(1) # Sync with Greaseweazle
self._send_cmd(struct.pack("2B", Cmd.GetFluxStatus, 2))
except CmdError as error:
# An error occurred. We may retry on transient underflows.
if error.code == Ack.FluxUnderflow and retry < nr_retries:
retry += 1
else:
raise error
else:
# Success!
break
## erase_track:
## Erase the current track via Greaseweazle.
def erase_track(self, ticks):
self._send_cmd(struct.pack("<2BI", Cmd.EraseFlux, 6, int(ticks)))
self.ser.read(1) # Sync with Greaseweazle
self._send_cmd(struct.pack("2B", Cmd.GetFluxStatus, 2))
## source_bytes:
## Command Greaseweazle to source 'nr' garbage bytes.
def source_bytes(self, nr):
self._send_cmd(struct.pack("<2BI", Cmd.SourceBytes, 6, nr))
while nr > 0:
self.ser.read(1)
waiting = self.ser.in_waiting
self.ser.read(waiting)
nr -= 1 + waiting
## sink_bytes:
## Command Greaseweazle to sink 'nr' garbage bytes.
def sink_bytes(self, nr):
self._send_cmd(struct.pack("<2BI", Cmd.SinkBytes, 6, nr))
dat = bytes(1024*1024)
while nr > len(dat):
self.ser.write(dat)
nr -= len(dat)
self.ser.write(dat[:nr])
self.ser.read(1) # Sync with Greaseweazle
## bw_stats:
## Get min/max bandwidth for previous source/sink command. Mbps (float).
def bw_stats(self):
self._send_cmd(struct.pack("3B", Cmd.GetInfo, 3,
GetInfo.BandwidthStats))
min_bytes, min_usecs, max_bytes, max_usecs = struct.unpack(
"<4I16x", self.ser.read(32))
min_bw = (8 * min_bytes) / min_usecs
max_bw = (8 * max_bytes) / max_usecs
return min_bw, max_bw
##
## Delay-property public getters and setters:
## select_delay: Delay (usec) after asserting drive select
## step_delay: Delay (usec) after issuing a head-step command
## seek_settle_delay: Delay (msec) after completing a head-seek operation
## motor_delay: Delay (msec) after turning on drive spindle motor
## auto_off_delay: Timeout (msec) since last command upon which all
## drives are deselected and spindle motors turned off
##
def _set_delays(self):
self._send_cmd(struct.pack("<3B5H", Cmd.SetParams,
3+5*2, Params.Delays,
self._select_delay, self._step_delay,
self._seek_settle_delay,
self._motor_delay, self._auto_off_delay))
@property
def select_delay(self):
return self._select_delay
@select_delay.setter
def select_delay(self, select_delay):
self._select_delay = select_delay
self._set_delays()
@property
def step_delay(self):
return self._step_delay
@step_delay.setter
def step_delay(self, step_delay):
self._step_delay = step_delay
self._set_delays()
@property
def seek_settle_delay(self):
return self._seek_settle_delay
@seek_settle_delay.setter
def seek_settle_delay(self, seek_settle_delay):
self._seek_settle_delay = seek_settle_delay
self._set_delays()
@property
def motor_delay(self):
return self._motor_delay
@motor_delay.setter
def motor_delay(self, motor_delay):
self._motor_delay = motor_delay
self._set_delays()
@property
def auto_off_delay(self):
return self._auto_off_delay
@auto_off_delay.setter
def auto_off_delay(self, auto_off_delay):
self._auto_off_delay = auto_off_delay
self._set_delays()
# Local variables:
# python-indent: 4
# End: