summaryrefslogtreecommitdiff
path: root/tools/proxyclient/m1n1/proxy.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/proxyclient/m1n1/proxy.py')
-rw-r--r--tools/proxyclient/m1n1/proxy.py1104
1 files changed, 1104 insertions, 0 deletions
diff --git a/tools/proxyclient/m1n1/proxy.py b/tools/proxyclient/m1n1/proxy.py
new file mode 100644
index 0000000..1869f3e
--- /dev/null
+++ b/tools/proxyclient/m1n1/proxy.py
@@ -0,0 +1,1104 @@
+# SPDX-License-Identifier: MIT
+import os, sys, struct, serial, time
+from construct import *
+from enum import IntEnum, IntFlag
+from serial.tools.miniterm import Miniterm
+
+from .utils import *
+from .sysreg import *
+
+__all__ = ["REGION_RWX_EL0", "REGION_RW_EL0", "REGION_RX_EL1"]
+
+# Hack to disable input buffer flushing
+class Serial(serial.Serial):
+ def _reset_input_buffer(self):
+ return
+
+ def reset_input_buffer(self):
+ return
+
+class UartError(RuntimeError):
+ pass
+
+class UartTimeout(UartError):
+ pass
+
+class UartCMDError(UartError):
+ pass
+
+class UartChecksumError(UartError):
+ pass
+
+class UartRemoteError(UartError):
+ pass
+
+class Feature(IntFlag):
+ DISABLE_DATA_CSUMS = 0x01 # Data transfers don't use checksums
+
+ @classmethod
+ def get_all(cls):
+ return cls.DISABLE_DATA_CSUMS
+
+ def __str__(self):
+ return ", ".join(feature.name for feature in self.__class__
+ if feature & self) or "<none>"
+
+
+class START(IntEnum):
+ BOOT = 0
+ EXCEPTION = 1
+ EXCEPTION_LOWER = 2
+ HV = 3
+
+class EXC(IntEnum):
+ SYNC = 0
+ IRQ = 1
+ FIQ = 2
+ SERROR = 3
+
+class EVENT(IntEnum):
+ MMIOTRACE = 1
+ IRQTRACE = 2
+
+class EXC_RET(IntEnum):
+ UNHANDLED = 1
+ HANDLED = 2
+ EXIT_GUEST = 3
+ STEP = 4
+
+class DCP_SHUTDOWN_MODE(IntEnum):
+ QUIESCED = 0
+ SLEEP_IF_EXTERNAL = 1
+ SLEEP = 2
+
+class PIX_FMT(IntEnum):
+ XRGB = 0
+ XBGR = 1
+
+class DART(IntEnum):
+ T8020 = 0
+ T8110 = 1
+ T6000 = 2
+
+ExcInfo = Struct(
+ "regs" / Array(32, Int64ul),
+ "spsr" / RegAdapter(SPSR),
+ "elr" / Int64ul,
+ "esr" / RegAdapter(ESR),
+ "far" / Int64ul,
+ "afsr1" / Int64ul,
+ "sp" / Array(3, Int64ul),
+ "cpu_id" / Int64ul,
+ "mpidr" / Int64ul,
+ "elr_phys" / Int64ul,
+ "far_phys" / Int64ul,
+ "sp_phys" / Int64ul,
+ "data" / Int64ul,
+)
+# Sends 56+ byte Commands and Expects 36 Byte Responses
+# Commands are format <I48sI
+# 4 byte command, 48 byte null padded data + 4 byte checksum
+# Responses are of the format: struct format <Ii24sI
+# 4byte Response , 4 byte status, 24 byte string, 4 byte Checksum
+# Response must start 0xff55aaXX where XX distiguishes between them
+# In little endian mode these numbers as listed as REQ_* constants
+# defined under UartInterface
+#
+# Event Response REQ_EVENT passed to registered Event Handler
+# Boot Response REQ_BOOT passed to handle_boot() which may
+# pass to a matching registered handler based on reason, code values
+# If the status is ST_OK returns the data field to caller
+# Otherwise reports a remote Error
+
+class UartInterface(Reloadable):
+ REQ_NOP = 0x00AA55FF
+ REQ_PROXY = 0x01AA55FF
+ REQ_MEMREAD = 0x02AA55FF
+ REQ_MEMWRITE = 0x03AA55FF
+ REQ_BOOT = 0x04AA55FF
+ REQ_EVENT = 0x05AA55FF
+
+ CHECKSUM_SENTINEL = 0xD0DECADE
+ DATA_END_SENTINEL = 0xB0CACC10
+
+ ST_OK = 0
+ ST_BADCMD = -1
+ ST_INVAL = -2
+ ST_XFERERR = -3
+ ST_CSUMERR = -4
+
+ CMD_LEN = 56
+ REPLY_LEN = 36
+ EVENT_HDR_LEN = 8
+
+ def __init__(self, device=None, debug=False):
+ self.debug = debug
+ self.devpath = None
+ if device is None:
+ device = os.environ.get("M1N1DEVICE", "/dev/m1n1:115200")
+ if isinstance(device, str):
+ baud = 115200
+ if ":" in device:
+ device, baud = device.rsplit(":", 1)
+ baud = int(baud)
+ self.devpath = device
+ self.baudrate = baud
+
+ device = Serial(self.devpath, baud)
+
+ self.dev = device
+ self.dev.timeout = 0
+ self.dev.flushOutput()
+ self.dev.flushInput()
+ self.pted = False
+ #d = self.dev.read(1)
+ #while d != "":
+ #d = self.dev.read(1)
+ self.dev.timeout = int(os.environ.get("M1N1TIMEOUT", "3"))
+ self.tty_enable = True
+ self.handlers = {}
+ self.evt_handlers = {}
+ self.enabled_features = Feature(0)
+
+ def checksum(self, data):
+ sum = 0xDEADBEEF;
+ for c in data:
+ sum *= 31337
+ sum += c ^ 0x5a
+ sum &= 0xFFFFFFFF
+
+ return (sum ^ 0xADDEDBAD) & 0xFFFFFFFF
+
+ def data_checksum(self, data):
+ if self.enabled_features & Feature.DISABLE_DATA_CSUMS:
+ return self.CHECKSUM_SENTINEL
+
+ return self.checksum(data)
+
+ def readfull(self, size):
+ d = b''
+ while len(d) < size:
+ block = self.dev.read(size - len(d))
+ if not block:
+ raise UartTimeout("Expected %d bytes, got %d bytes"%(size,len(d)))
+ d += block
+ return d
+
+ def cmd(self, cmd, payload=b""):
+ if len(payload) > self.CMD_LEN:
+ raise ValueError("Incorrect payload size %d"%len(payload))
+
+ payload = payload.ljust(self.CMD_LEN, b"\x00")
+ command = struct.pack("<I", cmd) + payload
+ command += struct.pack("<I", self.checksum(command))
+ if self.debug:
+ print("<<", hexdump(command))
+ self.dev.write(command)
+
+ def unkhandler(self, s):
+ if not self.tty_enable:
+ return
+ for c in s:
+ if not self.pted:
+ sys.stdout.write("TTY> ")
+ self.pted = True
+ if c == 10:
+ self.pted = False
+ sys.stdout.write(chr(c))
+ sys.stdout.flush()
+
+ def ttymode(self, dev=None):
+ if dev is None:
+ dev = self.dev
+
+ tout = dev.timeout
+ self.tty_enable = True
+ dev.timeout = None
+
+ term = Miniterm(dev, eol='cr')
+ term.exit_character = chr(0x1d) # GS/CTRL+]
+ term.menu_character = chr(0x14) # Menu: CTRL+T
+ term.raw = True
+ term.set_rx_encoding('UTF-8')
+ term.set_tx_encoding('UTF-8')
+
+ print('--- TTY mode | Quit: CTRL+] | Menu: CTRL+T ---')
+ term.start()
+ try:
+ term.join(True)
+ except KeyboardInterrupt:
+ pass
+
+ print('--- Exit TTY mode ---')
+ term.join()
+ term.close()
+
+ dev.timeout = tout
+ self.tty_enable = False
+
+ def reply(self, cmd):
+ reply = b''
+ while True:
+ if not reply or reply[-1] != 255:
+ reply = b''
+ reply += self.readfull(1)
+ if reply != b"\xff":
+ self.unkhandler(reply)
+ continue
+ else:
+ reply = b'\xff'
+ reply += self.readfull(1)
+ if reply != b"\xff\x55":
+ self.unkhandler(reply)
+ continue
+ reply += self.readfull(1)
+ if reply != b"\xff\x55\xaa":
+ self.unkhandler(reply)
+ continue
+ reply += self.readfull(1)
+ cmdin = struct.unpack("<I", reply)[0]
+ if cmdin == self.REQ_EVENT:
+ reply += self.readfull(self.EVENT_HDR_LEN - 4)
+ data_len, event_type = struct.unpack("<HH", reply[4:])
+ reply += self.readfull(data_len + 4)
+ if self.debug:
+ print(">>", hexdump(reply))
+ checksum = struct.unpack("<I", reply[-4:])[0]
+ ccsum = self.data_checksum(reply[:-4])
+ if checksum != ccsum:
+ print("Event checksum error: Expected 0x%08x, got 0x%08x"%(checksum, ccsum))
+ raise UartChecksumError()
+ self.handle_event(EVENT(event_type), reply[self.EVENT_HDR_LEN:-4])
+ reply = b''
+ continue
+
+ reply += self.readfull(self.REPLY_LEN - 4)
+ if self.debug:
+ print(">>", hexdump(reply))
+ status, data, checksum = struct.unpack("<i24sI", reply[4:])
+ ccsum = self.checksum(reply[:-4])
+ if checksum != ccsum:
+ print("Reply checksum error: Expected 0x%08x, got 0x%08x"%(checksum, ccsum))
+ raise UartChecksumError()
+
+ if cmdin != cmd:
+ if cmdin == self.REQ_BOOT and status == self.ST_OK:
+ self.handle_boot(data)
+ reply = b''
+ continue
+ raise UartCMDError("Reply command mismatch: Expected 0x%08x, got 0x%08x"%(cmd, cmdin))
+ if status != self.ST_OK:
+ if status == self.ST_BADCMD:
+ raise UartRemoteError("Reply error: Bad Command")
+ elif status == self.ST_INVAL:
+ raise UartRemoteError("Reply error: Invalid argument")
+ elif status == self.ST_XFERERR:
+ raise UartRemoteError("Reply error: Data transfer failed")
+ elif status == self.ST_CSUMERR:
+ raise UartRemoteError("Reply error: Data checksum failed")
+ else:
+ raise UartRemoteError("Reply error: Unknown error (%d)"%status)
+ return data
+
+ def handle_boot(self, data):
+ reason, code, info = struct.unpack("<IIQ", data[:16])
+ reason = START(reason)
+ if reason in (START.EXCEPTION, START.EXCEPTION_LOWER):
+ code = EXC(code)
+ if (reason, code) in self.handlers:
+ self.handlers[(reason, code)](reason, code, info)
+ elif reason != START.BOOT:
+ print(f"Proxy callback without handler: {reason}, {code}")
+
+ def set_handler(self, reason, code, handler):
+ self.handlers[(reason, code)] = handler
+
+ def handle_event(self, event_id, data):
+ if event_id in self.evt_handlers:
+ self.evt_handlers[event_id](data)
+
+ def set_event_handler(self, event_id, handler):
+ self.evt_handlers[event_id] = handler
+
+ def wait_boot(self):
+ try:
+ return self.reply(self.REQ_BOOT)
+ except:
+ # Over USB, reboots cause a reconnect
+ self.dev.close()
+ print("Waiting for reconnection... ", end="")
+ sys.stdout.flush()
+ for i in range(100):
+ print(".", end="")
+ sys.stdout.flush()
+ try:
+ self.dev.open()
+ except serial.serialutil.SerialException:
+ time.sleep(0.1)
+ else:
+ break
+ else:
+ raise UartTimeout("Reconnection timed out")
+ print(" Connected")
+
+ def wait_and_handle_boot(self):
+ self.handle_boot(self.wait_boot())
+
+ def nop(self):
+ features = Feature.get_all()
+
+ # Send the supported feature flags in the NOP message (has no effect
+ # if the target does not support it)
+ self.cmd(self.REQ_NOP, struct.pack("<Q", features.value))
+ result = self.reply(self.REQ_NOP)
+
+ # Get the enabled feature flags from the message response (returns
+ # 0 if the target does not support it)
+ features = Feature(struct.unpack("<QQQ", result)[0])
+
+ if self.debug:
+ print(f"Enabled features: {features}")
+
+ self.enabled_features = features
+
+ def proxyreq(self, req, reboot=False, no_reply=False, pre_reply=None):
+ self.cmd(self.REQ_PROXY, req)
+ if pre_reply:
+ pre_reply()
+ if no_reply:
+ return
+ elif reboot:
+ return self.wait_boot()
+ else:
+ return self.reply(self.REQ_PROXY)
+
+ def writemem(self, addr, data, progress=False):
+ checksum = self.data_checksum(data)
+ size = len(data)
+ req = struct.pack("<QQI", addr, size, checksum)
+ self.cmd(self.REQ_MEMWRITE, req)
+ if self.debug:
+ print("<< DATA:")
+ chexdump(data)
+ for i in range(0, len(data), 8192):
+ self.dev.write(data[i:i + 8192])
+ if progress:
+ sys.stdout.write(".")
+ sys.stdout.flush()
+ if progress:
+ print()
+ if self.enabled_features & Feature.DISABLE_DATA_CSUMS:
+ # Extra sentinel after the data to make sure no data is lost
+ self.dev.write(struct.pack("<I", self.DATA_END_SENTINEL))
+
+ # should automatically report a CRC failure
+ self.reply(self.REQ_MEMWRITE)
+
+ def readmem(self, addr, size):
+ if size == 0:
+ return b""
+
+ req = struct.pack("<QQ", addr, size)
+ self.cmd(self.REQ_MEMREAD, req)
+ reply = self.reply(self.REQ_MEMREAD)
+ checksum = struct.unpack("<I",reply[:4])[0]
+ data = self.readfull(size)
+ if self.debug:
+ print(">> DATA:")
+ chexdump(data)
+ ccsum = self.data_checksum(data)
+ if checksum != ccsum:
+ raise UartChecksumError("Reply data checksum error: Expected 0x%08x, got 0x%08x"%(checksum, ccsum))
+
+ if self.enabled_features & Feature.DISABLE_DATA_CSUMS:
+ # Extra sentinel after the data to make sure no data was lost
+ sentinel = struct.unpack("<I", self.readfull(4))[0]
+ if sentinel != self.DATA_END_SENTINEL:
+ raise UartChecksumError(f"Reply data sentinel error: Expected "
+ f"{self.DATA_END_SENTINEL:#x}, got {sentinel:#x}")
+
+ return data
+
+ def readstruct(self, addr, stype):
+ return stype.parse(self.readmem(addr, stype.sizeof()))
+
+class ProxyError(RuntimeError):
+ pass
+
+class ProxyReplyError(ProxyError):
+ pass
+
+class ProxyRemoteError(ProxyError):
+ pass
+
+class ProxyCommandError(ProxyRemoteError):
+ pass
+
+class AlignmentError(Exception):
+ pass
+
+class IODEV(IntEnum):
+ UART = 0
+ FB = 1
+ USB_VUART = 2
+ USB0 = 3
+ USB1 = 4
+ USB2 = 5
+ USB3 = 6
+ USB4 = 7
+ USB5 = 8
+ USB6 = 9
+ USB7 = 10
+
+class USAGE(IntFlag):
+ CONSOLE = (1 << 0)
+ UARTPROXY = (1 << 1)
+
+class GUARD(IntFlag):
+ OFF = 0
+ SKIP = 1
+ MARK = 2
+ RETURN = 3
+ SILENT = 0x100
+
+REGION_RWX_EL0 = 0x80000000000
+REGION_RW_EL0 = 0xa0000000000
+REGION_RX_EL1 = 0xc0000000000
+
+# Uses UartInterface.proxyreq() to send requests to M1N1 and process
+# reponses sent back.
+class M1N1Proxy(Reloadable):
+ S_OK = 0
+ S_BADCMD = -1
+
+ P_NOP = 0x000
+ P_EXIT = 0x001
+ P_CALL = 0x002
+ P_GET_BOOTARGS = 0x003
+ P_GET_BASE = 0x004
+ P_SET_BAUD = 0x005
+ P_UDELAY = 0x006
+ P_SET_EXC_GUARD = 0x007
+ P_GET_EXC_COUNT = 0x008
+ P_EL0_CALL = 0x009
+ P_EL1_CALL = 0x00a
+ P_VECTOR = 0x00b
+ P_GL1_CALL = 0x00c
+ P_GL2_CALL = 0x00d
+ P_GET_SIMD_STATE = 0x00e
+ P_PUT_SIMD_STATE = 0x00f
+ P_REBOOT = 0x010
+
+ P_WRITE64 = 0x100
+ P_WRITE32 = 0x101
+ P_WRITE16 = 0x102
+ P_WRITE8 = 0x103
+ P_READ64 = 0x104
+ P_READ32 = 0x105
+ P_READ16 = 0x106
+ P_READ8 = 0x107
+ P_SET64 = 0x108
+ P_SET32 = 0x109
+ P_SET16 = 0x10a
+ P_SET8 = 0x10b
+ P_CLEAR64 = 0x10c
+ P_CLEAR32 = 0x10d
+ P_CLEAR16 = 0x10e
+ P_CLEAR8 = 0x10f
+ P_MASK64 = 0x110
+ P_MASK32 = 0x111
+ P_MASK16 = 0x112
+ P_MASK8 = 0x113
+ P_WRITEREAD64 = 0x114
+ P_WRITEREAD32 = 0x115
+ P_WRITEREAD16 = 0x116
+ P_WRITEREAD8 = 0x117
+
+ P_MEMCPY64 = 0x200
+ P_MEMCPY32 = 0x201
+ P_MEMCPY16 = 0x202
+ P_MEMCPY8 = 0x203
+ P_MEMSET64 = 0x204
+ P_MEMSET32 = 0x205
+ P_MEMSET16 = 0x206
+ P_MEMSET8 = 0x207
+
+ P_IC_IALLUIS = 0x300
+ P_IC_IALLU = 0x301
+ P_IC_IVAU = 0x302
+ P_DC_IVAC = 0x303
+ P_DC_ISW = 0x304
+ P_DC_CSW = 0x305
+ P_DC_CISW = 0x306
+ P_DC_ZVA = 0x307
+ P_DC_CVAC = 0x308
+ P_DC_CVAU = 0x309
+ P_DC_CIVAC = 0x30a
+ P_MMU_SHUTDOWN = 0x30b
+ P_MMU_INIT = 0x30c
+ P_MMU_DISABLE = 0x30d
+ P_MMU_RESTORE = 0x30e
+ P_MMU_INIT_SECONDARY = 0x30f
+
+ P_XZDEC = 0x400
+ P_GZDEC = 0x401
+
+ P_SMP_START_SECONDARIES = 0x500
+ P_SMP_CALL = 0x501
+ P_SMP_CALL_SYNC = 0x502
+ P_SMP_WAIT = 0x503
+ P_SMP_SET_WFE_MODE = 0x504
+
+ P_HEAPBLOCK_ALLOC = 0x600
+ P_MALLOC = 0x601
+ P_MEMALIGN = 0x602
+ P_FREE = 0x602
+
+ P_KBOOT_BOOT = 0x700
+ P_KBOOT_SET_CHOSEN = 0x701
+ P_KBOOT_SET_INITRD = 0x702
+ P_KBOOT_PREPARE_DT = 0x703
+
+ P_PMGR_CLOCK_ENABLE = 0x800
+ P_PMGR_CLOCK_DISABLE = 0x801
+ P_PMGR_ADT_CLOCKS_ENABLE = 0x802
+ P_PMGR_ADT_CLOCKS_DISABLE = 0x803
+ P_PMGR_RESET = 0x804
+
+ P_IODEV_SET_USAGE = 0x900
+ P_IODEV_CAN_READ = 0x901
+ P_IODEV_CAN_WRITE = 0x902
+ P_IODEV_READ = 0x903
+ P_IODEV_WRITE = 0x904
+ P_IODEV_WHOAMI = 0x905
+ P_USB_IODEV_VUART_SETUP = 0x906
+
+ P_TUNABLES_APPLY_GLOBAL = 0xa00
+ P_TUNABLES_APPLY_LOCAL = 0xa01
+
+ P_DART_INIT = 0xb00
+ P_DART_SHUTDOWN = 0xb01
+ P_DART_MAP = 0xb02
+ P_DART_UNMAP = 0xb03
+
+ P_HV_INIT = 0xc00
+ P_HV_MAP = 0xc01
+ P_HV_START = 0xc02
+ P_HV_TRANSLATE = 0xc03
+ P_HV_PT_WALK = 0xc04
+ P_HV_MAP_VUART = 0xc05
+ P_HV_TRACE_IRQ = 0xc06
+ P_HV_WDT_START = 0xc07
+ P_HV_START_SECONDARY = 0xc08
+ P_HV_SWITCH_CPU = 0xc09
+ P_HV_SET_TIME_STEALING = 0xc0a
+ P_HV_PIN_CPU = 0xc0b
+ P_HV_WRITE_HCR = 0xc0c
+ P_HV_MAP_VIRTIO = 0xc0d
+ P_VIRTIO_PUT_BUFFER = 0xc0e
+
+ P_FB_INIT = 0xd00
+ P_FB_SHUTDOWN = 0xd01
+ P_FB_BLIT = 0xd02
+ P_FB_UNBLIT = 0xd03
+ P_FB_FILL = 0xd04
+ P_FB_CLEAR = 0xd05
+ P_FB_DISPLAY_LOGO = 0xd06
+ P_FB_RESTORE_LOGO = 0xd07
+ P_FB_IMPROVE_LOGO = 0xd08
+
+ P_PCIE_INIT = 0xe00
+ P_PCIE_SHUTDOWN = 0xe01
+
+ P_NVME_INIT = 0xf00
+ P_NVME_SHUTDOWN = 0xf01
+ P_NVME_READ = 0xf02
+ P_NVME_FLUSH = 0xf03
+
+ P_MCC_GET_CARVEOUTS = 0x1000
+
+ P_DISPLAY_INIT = 0x1100
+ P_DISPLAY_CONFIGURE = 0x1101
+ P_DISPLAY_SHUTDOWN = 0x1102
+ P_DISPLAY_START_DCP = 0x1103
+ P_DISPLAY_IS_EXTERNAL = 0x1104
+
+ P_DAPF_INIT_ALL = 0x1200
+ P_DAPF_INIT = 0x1201
+
+ def __init__(self, iface, debug=False):
+ self.debug = debug
+ self.iface = iface
+ self.heap = None
+
+ def _request(self, opcode, *args, reboot=False, signed=False, no_reply=False, pre_reply=None):
+ if len(args) > 6:
+ raise ValueError("Too many arguments")
+ args = list(args) + [0] * (6 - len(args))
+ req = struct.pack("<7Q", opcode, *args)
+ if self.debug:
+ print("<<<< %08x: %08x %08x %08x %08x %08x %08x"%tuple([opcode] + args))
+ reply = self.iface.proxyreq(req, reboot=reboot, no_reply=no_reply, pre_reply=None)
+ if no_reply or reboot and reply is None:
+ return
+ ret_fmt = "q" if signed else "Q"
+ rop, status, retval = struct.unpack("<Qq" + ret_fmt, reply)
+ if self.debug:
+ print(">>>> %08x: %d %08x"%(rop, status, retval))
+ if reboot:
+ return
+ if rop != opcode:
+ raise ProxyReplyError("Reply opcode mismatch: Expected 0x%08x, got 0x%08x"%(opcode,rop))
+ if status != self.S_OK:
+ if status == self.S_BADCMD:
+ raise ProxyCommandError("Reply error: Bad Command")
+ else:
+ raise ProxyRemoteError("Reply error: Unknown error (%d)"%status)
+ return retval
+
+ def request(self, opcode, *args, **kwargs):
+ free = []
+ args = list(args)
+ args2 = []
+ for i, arg in enumerate(args):
+ if isinstance(arg, str):
+ arg = arg.encode("utf-8") + b"\0"
+ if isinstance(arg, bytes) and self.heap:
+ p = self.heap.malloc(len(arg))
+ free.append(p)
+ self.iface.writemem(p, arg)
+ if (i < (len(args) - 1)) and args[i + 1] is None:
+ args[i + 1] = len(arg)
+ arg = p
+ args2.append(arg)
+ try:
+ return self._request(opcode, *args2, **kwargs)
+ finally:
+ for i in free:
+ self.heap.free(i)
+
+ def nop(self):
+ self.request(self.P_NOP)
+ def exit(self, retval=0):
+ self.request(self.P_EXIT, retval)
+ def call(self, addr, *args, reboot=False):
+ if len(args) > 5:
+ raise ValueError("Too many arguments")
+ return self.request(self.P_CALL, addr, *args, reboot=reboot)
+ def reload(self, addr, *args, el1=False):
+ if len(args) > 4:
+ raise ValueError("Too many arguments")
+ if el1:
+ self.request(self.P_EL1_CALL, addr, *args, no_reply=True)
+ else:
+ try:
+ self.request(self.P_VECTOR, addr, *args)
+ self.iface.wait_boot()
+ except ProxyCommandError: # old m1n1 does not support P_VECTOR
+ try:
+ self.mmu_shutdown()
+ except ProxyCommandError: # older m1n1 does not support MMU
+ pass
+ self.request(self.P_CALL, addr, *args, reboot=True)
+ def get_bootargs(self):
+ return self.request(self.P_GET_BOOTARGS)
+ def get_base(self):
+ return self.request(self.P_GET_BASE)
+ def set_baud(self, baudrate):
+ self.iface.tty_enable = False
+ def change():
+ self.iface.dev.baudrate = baudrate
+ try:
+ self.request(self.P_SET_BAUD, baudrate, 16, 0x005aa5f0, pre_reply=change)
+ finally:
+ self.iface.tty_enable = True
+ def udelay(self, usec):
+ self.request(self.P_UDELAY, usec)
+ def set_exc_guard(self, mode):
+ self.request(self.P_SET_EXC_GUARD, mode)
+ def get_exc_count(self):
+ return self.request(self.P_GET_EXC_COUNT)
+ def el0_call(self, addr, *args):
+ if len(args) > 4:
+ raise ValueError("Too many arguments")
+ return self.request(self.P_EL0_CALL, addr, *args)
+ def el1_call(self, addr, *args):
+ if len(args) > 4:
+ raise ValueError("Too many arguments")
+ return self.request(self.P_EL1_CALL, addr, *args)
+ def gl1_call(self, addr, *args):
+ if len(args) > 4:
+ raise ValueError("Too many arguments")
+ return self.request(self.P_GL1_CALL, addr, *args)
+ def gl2_call(self, addr, *args):
+ if len(args) > 4:
+ raise ValueError("Too many arguments")
+ return self.request(self.P_GL2_CALL, addr, *args)
+ def get_simd_state(self, buf):
+ self.request(self.P_GET_SIMD_STATE, buf)
+ def put_simd_state(self, buf):
+ self.request(self.P_PUT_SIMD_STATE, buf)
+ def reboot(self):
+ self.request(self.P_REBOOT, no_reply=True)
+
+ def write64(self, addr, data):
+ '''write 8 byte value to given address'''
+ if addr & 7:
+ raise AlignmentError()
+ self.request(self.P_WRITE64, addr, data)
+ def write32(self, addr, data):
+ '''write 4 byte value to given address'''
+ if addr & 3:
+ raise AlignmentError()
+ self.request(self.P_WRITE32, addr, data)
+ def write16(self, addr, data):
+ '''write 2 byte value to given address'''
+ if addr & 1:
+ raise AlignmentError()
+ self.request(self.P_WRITE16, addr, data)
+ def write8(self, addr, data):
+ '''write 1 byte value to given address'''
+ self.request(self.P_WRITE8, addr, data)
+
+ def read64(self, addr):
+ '''return 8 byte value from given address'''
+ if addr & 7:
+ raise AlignmentError()
+ return self.request(self.P_READ64, addr)
+ def read32(self, addr):
+ '''return 4 byte value given address'''
+ if addr & 3:
+ raise AlignmentError()
+ return self.request(self.P_READ32, addr)
+ def read16(self, addr):
+ '''return 2 byte value from given address'''
+ if addr & 1:
+ raise AlignmentError()
+ return self.request(self.P_READ16, addr)
+ def read8(self, addr):
+ '''return 1 byte value from given address'''
+ return self.request(self.P_READ8, addr)
+
+ def set64(self, addr, data):
+ '''Or 64 bit value of data into memory at addr and return result'''
+ if addr & 7:
+ raise AlignmentError()
+ return self.request(self.P_SET64, addr, data)
+ def set32(self, addr, data):
+ '''Or 32 bit value of data into memory at addr and return result'''
+ if addr & 3:
+ raise AlignmentError()
+ return self.request(self.P_SET32, addr, data)
+ def set16(self, addr, data):
+ '''Or 16 bit value of data into memory at addr and return result'''
+ if addr & 1:
+ raise AlignmentError()
+ return self.request(self.P_SET16, addr, data)
+ def set8(self, addr, data):
+ '''Or byte value of data into memory at addr and return result'''
+ return self.request(self.P_SET8, addr, data)
+
+ def clear64(self, addr, data):
+ '''Clear bits in 64 bit memory at address addr that are set
+ in parameter data and return result'''
+ if addr & 7:
+ raise AlignmentError()
+ return self.request(self.P_CLEAR64, addr, data)
+ def clear32(self, addr, data):
+ '''Clear bits in 32 bit memory at address addr that are set
+ in parameter data and return result'''
+ if addr & 3:
+ raise AlignmentError()
+ return self.request(self.P_CLEAR32, addr, data)
+ def clear16(self, addr, data):
+ '''Clear bits in 16 bit memory at address addr that are set
+ in parameter data and return result'''
+ if addr & 1:
+ raise AlignmentError()
+ return self.request(self.P_CLEAR16, addr, data)
+ def clear8(self, addr, data):
+ '''Clear bits in 8 bit memory at addr that are set in data
+ and return result'''
+ return self.request(self.P_CLEAR8, addr, data)
+
+ def mask64(self, addr, clear, set):
+ '''Clear bits in 64 bit memory at address addr that are
+ set in clear, then set the bits in set and return result'''
+ if addr & 7:
+ raise AlignmentError()
+ return self.request(self.P_MASK64, addr, clear, set)
+ def mask32(self, addr, clear, set):
+ '''Clear bits in 32 bit memory at address addr that are
+ set in clear, then set the bits in set and return result'''
+ if addr & 3:
+ raise AlignmentError()
+ return self.request(self.P_MASK32, addr, clear, set)
+ def mask16(self, addr, clear, set):
+ '''Clear select bits in 16 bit memory addr that are set
+ in clear parameter, then set the bits in set parameter and return result'''
+ if addr & 1:
+ raise AlignmentError()
+ return self.request(self.P_MASK16, addr, clear, set)
+ def mask8(self, addr, clear, set):
+ '''Clear bits in 1 byte memory at addr that are set
+ in clear parameter, then set the bits in set parameter
+ and return the result'''
+ return self.request(self.P_MASK8, addr, clear, set)
+
+ def writeread64(self, addr, data):
+ return self.request(self.P_WRITEREAD64, addr, data)
+ def writeread32(self, addr, data):
+ return self.request(self.P_WRITEREAD32, addr, data)
+ def writeread16(self, addr, data):
+ return self.request(self.P_WRITEREAD16, addr, data)
+ def writeread8(self, addr, data):
+ return self.request(self.P_WRITEREAD8, addr, data)
+
+ def memcpy64(self, dst, src, size):
+ if src & 7 or dst & 7:
+ raise AlignmentError()
+ self.request(self.P_MEMCPY64, dst, src, size)
+ def memcpy32(self, dst, src, size):
+ if src & 3 or dst & 3:
+ raise AlignmentError()
+ self.request(self.P_MEMCPY32, dst, src, size)
+ def memcpy16(self, dst, src, size):
+ if src & 1 or dst & 1:
+ raise AlignmentError()
+ self.request(self.P_MEMCPY16, dst, src, size)
+ def memcpy8(self, dst, src, size):
+ self.request(self.P_MEMCPY8, dst, src, size)
+
+ def memset64(self, dst, src, size):
+ if dst & 7:
+ raise AlignmentError()
+ self.request(self.P_MEMSET64, dst, src, size)
+ def memset32(self, dst, src, size):
+ if dst & 3:
+ raise AlignmentError()
+ self.request(self.P_MEMSET32, dst, src, size)
+ def memset16(self, dst, src, size):
+ if dst & 1:
+ raise AlignmentError()
+ self.request(self.P_MEMSET16, dst, src, size)
+ def memset8(self, dst, src, size):
+ self.request(self.P_MEMSET8, dst, src, size)
+
+ def ic_ialluis(self):
+ self.request(self.P_IC_IALLUIS)
+ def ic_iallu(self):
+ self.request(self.P_IC_IALLU)
+ def ic_ivau(self, addr, size):
+ self.request(self.P_IC_IVAU, addr, size)
+ def dc_ivac(self, addr, size):
+ self.request(self.P_DC_IVAC, addr, size)
+ def dc_isw(self, sw):
+ self.request(self.P_DC_ISW, sw)
+ def dc_csw(self, sw):
+ self.request(self.P_DC_CSW, sw)
+ def dc_cisw(self, sw):
+ self.request(self.P_DC_CISW, sw)
+ def dc_zva(self, addr, size):
+ self.request(self.P_DC_ZVA, addr, size)
+ def dc_cvac(self, addr, size):
+ self.request(self.P_DC_CVAC, addr, size)
+ def dc_cvau(self, addr, size):
+ self.request(self.P_DC_CVAU, addr, size)
+ def dc_civac(self, addr, size):
+ self.request(self.P_DC_CIVAC, addr, size)
+ def mmu_shutdown(self):
+ self.request(self.P_MMU_SHUTDOWN)
+ def mmu_init(self):
+ self.request(self.P_MMU_INIT)
+ def mmu_disable(self):
+ return self.request(self.P_MMU_DISABLE)
+ def mmu_restore(self, flags):
+ self.request(self.P_MMU_RESTORE, flags)
+ def mmu_init_secondary(self, cpu):
+ self.request(self.P_MMU_INIT_SECONDARY, cpu)
+
+
+ def xzdec(self, inbuf, insize, outbuf=0, outsize=0):
+ return self.request(self.P_XZDEC, inbuf, insize, outbuf,
+ outsize, signed=True)
+
+ def gzdec(self, inbuf, insize, outbuf, outsize):
+ return self.request(self.P_GZDEC, inbuf, insize, outbuf,
+ outsize, signed=True)
+
+ def smp_start_secondaries(self):
+ self.request(self.P_SMP_START_SECONDARIES)
+ def smp_call(self, cpu, addr, *args):
+ if len(args) > 4:
+ raise ValueError("Too many arguments")
+ self.request(self.P_SMP_CALL, cpu, addr, *args)
+ def smp_call_sync(self, cpu, addr, *args):
+ if len(args) > 4:
+ raise ValueError("Too many arguments")
+ return self.request(self.P_SMP_CALL_SYNC, cpu, addr, *args)
+ def smp_wait(self, cpu):
+ return self.request(self.P_SMP_WAIT, cpu)
+ def smp_set_wfe_mode(self, mode):
+ return self.request(self.P_SMP_SET_WFE_MODE, mode)
+
+ def heapblock_alloc(self, size):
+ return self.request(self.P_HEAPBLOCK_ALLOC, size)
+ def malloc(self, size):
+ return self.request(self.P_MALLOC, size)
+ def memalign(self, align, size):
+ return self.request(self.P_MEMALIGN, align, size)
+ def free(self, ptr):
+ self.request(self.P_FREE, ptr)
+
+ def kboot_boot(self, kernel):
+ self.request(self.P_KBOOT_BOOT, kernel)
+ def kboot_set_chosen(self, name, value):
+ self.request(self.P_KBOOT_SET_CHOSEN, name, value)
+ def kboot_set_initrd(self, base, size):
+ self.request(self.P_KBOOT_SET_INITRD, base, size)
+ def kboot_prepare_dt(self, dt_addr):
+ return self.request(self.P_KBOOT_PREPARE_DT, dt_addr)
+
+ def pmgr_clock_enable(self, clkid):
+ return self.request(self.P_PMGR_CLOCK_ENABLE, clkid)
+ def pmgr_clock_disable(self, clkid):
+ return self.request(self.P_PMGR_CLOCK_DISABLE, clkid)
+ def pmgr_adt_clocks_enable(self, path):
+ return self.request(self.P_PMGR_ADT_CLOCKS_ENABLE, path)
+ def pmgr_adt_clocks_disable(self, path):
+ return self.request(self.P_PMGR_ADT_CLOCKS_DISABLE, path)
+ def pmgr_reset(self, die, name):
+ return self.request(self.P_PMGR_RESET, die, name)
+
+ def iodev_set_usage(self, iodev, usage):
+ return self.request(self.P_IODEV_SET_USAGE, iodev, usage)
+ def iodev_can_read(self, iodev):
+ return self.request(self.P_IODEV_CAN_READ, iodev)
+ def iodev_can_write(self, iodev):
+ return self.request(self.P_IODEV_CAN_WRITE, iodev)
+ def iodev_read(self, iodev, buf, size=None):
+ return self.request(self.P_IODEV_READ, iodev, buf, size)
+ def iodev_write(self, iodev, buf, size=None):
+ return self.request(self.P_IODEV_WRITE, iodev, buf, size)
+ def iodev_whoami(self):
+ return IODEV(self.request(self.P_IODEV_WHOAMI))
+ def usb_iodev_vuart_setup(self, iodev):
+ return self.request(self.P_USB_IODEV_VUART_SETUP, iodev)
+
+ def tunables_apply_global(self, path, prop):
+ return self.request(self.P_TUNABLES_APPLY_GLOBAL, path, prop)
+ def tunables_apply_local(self, path, prop, reg_offset):
+ return self.request(self.P_TUNABLES_APPLY_LOCAL, path, prop, reg_offset)
+ def tunables_apply_local_addr(self, path, prop, base):
+ return self.request(self.P_TUNABLES_APPLY_LOCAL, path, prop, base)
+
+ def dart_init(self, base, sid, dart_type=DART.T8020):
+ return self.request(self.P_DART_INIT, base, sid, dart_type)
+ def dart_shutdown(self, dart):
+ return self.request(self.P_DART_SHUTDOWN, dart)
+ def dart_map(self, dart, iova, bfr, len):
+ return self.request(self.P_DART_MAP, dart, iova, bfr, len)
+ def dart_unmap(self, dart, iova, len):
+ return self.request(self.P_DART_UNMAP, dart, iova, len)
+
+ def hv_init(self):
+ return self.request(self.P_HV_INIT)
+ def hv_map(self, from_, to, size, incr):
+ return self.request(self.P_HV_MAP, from_, to, size, incr)
+ def hv_start(self, entry, *args):
+ return self.request(self.P_HV_START, entry, *args)
+ def hv_translate(self, addr, s1=False, w=False):
+ '''Translate virtual address
+ stage 1 only if s1, for write if w'''
+ return self.request(self.P_HV_TRANSLATE, addr, s1, w)
+ def hv_pt_walk(self, addr):
+ return self.request(self.P_HV_PT_WALK, addr)
+ def hv_map_vuart(self, base, irq, iodev):
+ return self.request(self.P_HV_MAP_VUART, base, irq, iodev)
+ def hv_trace_irq(self, evt_type, num, count, flags):
+ return self.request(self.P_HV_TRACE_IRQ, evt_type, num, count, flags)
+ def hv_wdt_start(self, cpu):
+ return self.request(self.P_HV_WDT_START, cpu)
+ def hv_start_secondary(self, cpu, entry, *args):
+ return self.request(self.P_HV_START_SECONDARY, cpu, entry, *args)
+ def hv_switch_cpu(self, cpu):
+ return self.request(self.P_HV_SWITCH_CPU, cpu)
+ def hv_set_time_stealing(self, enabled, reset):
+ return self.request(self.P_HV_SET_TIME_STEALING, int(bool(enabled)), int(bool(reset)))
+ def hv_pin_cpu(self, cpu):
+ return self.request(self.P_HV_PIN_CPU, cpu)
+ def hv_write_hcr(self, hcr):
+ return self.request(self.P_HV_WRITE_HCR, hcr)
+ def hv_map_virtio(self, base, config):
+ return self.request(self.P_HV_MAP_VIRTIO, base, config)
+ def virtio_put_buffer(self, base, qu, idx, length):
+ return self.request(self.P_VIRTIO_PUT_BUFFER, base, qu, idx, length)
+
+ def fb_init(self):
+ return self.request(self.P_FB_INIT)
+ def fb_shutdown(self, restore_logo=True):
+ return self.request(self.P_FB_SHUTDOWN, restore_logo)
+ def fb_blit(self, x, y, w, h, ptr, stride, pix_fmt=PIX_FMT.XRGB):
+ return self.request(self.P_FB_BLIT, x, y, w, h, ptr, stride | pix_fmt << 32)
+ def fb_unblit(self, x, y, w, h, ptr, stride):
+ return self.request(self.P_FB_UNBLIT, x, y, w, h, ptr, stride)
+ def fb_fill(self, x, y, w, h, color):
+ return self.request(self.P_FB_FILL, x, y, w, h, color)
+ def fb_clear(self, color):
+ return self.request(self.P_FB_CLEAR, color)
+ def fb_display_logo(self):
+ return self.request(self.P_FB_DISPLAY_LOGO)
+ def fb_restore_logo(self):
+ return self.request(self.P_FB_RESTORE_LOGO)
+ def fb_improve_logo(self):
+ return self.request(self.P_FB_IMPROVE_LOGO)
+
+ def pcie_init(self):
+ return self.request(self.P_PCIE_INIT)
+ def pcie_shutdown(self):
+ return self.request(self.P_PCIE_SHUTDOWN)
+
+ def nvme_init(self):
+ return self.request(self.P_NVME_INIT)
+ def nvme_shutdown(self):
+ return self.request(self.P_NVME_SHUTDOWN)
+ def nvme_read(self, nsid, lba, bfr):
+ return self.request(self.P_NVME_READ, nsid, lba, bfr)
+ def nvme_flush(self, nsid):
+ return self.request(self.P_NVME_FLUSH, nsid)
+
+ def mcc_get_carveouts(self):
+ return self.request(self.P_MCC_GET_CARVEOUTS)
+
+ def display_init(self):
+ return self.request(self.P_DISPLAY_INIT)
+ def display_configure(self, cfg):
+ return self.request(self.P_DISPLAY_CONFIGURE, cfg)
+ def display_shutdown(self, mode):
+ return self.request(self.P_DISPLAY_SHUTDOWN, mode)
+ def display_start_dcp(self):
+ return self.request(self.P_DISPLAY_START_DCP)
+ def display_is_external(self):
+ return self.request(self.P_DISPLAY_IS_EXTERNAL)
+
+ def dapf_init_all(self):
+ return self.request(self.P_DAPF_INIT_ALL)
+ def dapf_init(self, path):
+ return self.request(self.P_DAPF_INIT, path)
+
+__all__.extend(k for k, v in globals().items()
+ if (callable(v) or isinstance(v, type)) and v.__module__ == __name__)
+
+if __name__ == "__main__":
+ import serial
+ uartdev = os.environ.get("M1N1DEVICE", "/dev/m1n1")
+ usbuart = serial.Serial(uartdev, 115200)
+ uartif = UartInterface(usbuart, debug=True)
+ print("Sending NOP...", end=' ')
+ uartif.nop()
+ print("OK")
+ proxy = M1N1Proxy(uartif, debug=True)
+ print("Sending Proxy NOP...", end=' ')
+ proxy.nop()
+ print("OK")
+ print("Boot args: 0x%x" % proxy.get_bootargs())