# SPDX-License-Identifier: MIT import os, sys, struct, serial, time from construct import * from enum import IntEnum, IntFlag from serial.tools.miniterm import Miniterm from .utils import * from .sysreg import * __all__ = ["REGION_RWX_EL0", "REGION_RW_EL0", "REGION_RX_EL1"] # Hack to disable input buffer flushing class Serial(serial.Serial): def _reset_input_buffer(self): return def reset_input_buffer(self): return class UartError(RuntimeError): pass class UartTimeout(UartError): pass class UartCMDError(UartError): pass class UartChecksumError(UartError): pass class UartRemoteError(UartError): pass class Feature(IntFlag): DISABLE_DATA_CSUMS = 0x01 # Data transfers don't use checksums @classmethod def get_all(cls): return cls.DISABLE_DATA_CSUMS def __str__(self): return ", ".join(feature.name for feature in self.__class__ if feature & self) or "" class START(IntEnum): BOOT = 0 EXCEPTION = 1 EXCEPTION_LOWER = 2 HV = 3 class EXC(IntEnum): SYNC = 0 IRQ = 1 FIQ = 2 SERROR = 3 class EVENT(IntEnum): MMIOTRACE = 1 IRQTRACE = 2 class EXC_RET(IntEnum): UNHANDLED = 1 HANDLED = 2 EXIT_GUEST = 3 STEP = 4 class DCP_SHUTDOWN_MODE(IntEnum): QUIESCED = 0 SLEEP_IF_EXTERNAL = 1 SLEEP = 2 class PIX_FMT(IntEnum): XRGB = 0 XBGR = 1 class DART(IntEnum): T8020 = 0 T8110 = 1 T6000 = 2 ExcInfo = Struct( "regs" / Array(32, Int64ul), "spsr" / RegAdapter(SPSR), "elr" / Int64ul, "esr" / RegAdapter(ESR), "far" / Int64ul, "afsr1" / Int64ul, "sp" / Array(3, Int64ul), "cpu_id" / Int64ul, "mpidr" / Int64ul, "elr_phys" / Int64ul, "far_phys" / Int64ul, "sp_phys" / Int64ul, "data" / Int64ul, ) # Sends 56+ byte Commands and Expects 36 Byte Responses # Commands are format self.CMD_LEN: raise ValueError("Incorrect payload size %d"%len(payload)) payload = payload.ljust(self.CMD_LEN, b"\x00") command = struct.pack(" ") self.pted = True if c == 10: self.pted = False sys.stdout.write(chr(c)) sys.stdout.flush() def ttymode(self, dev=None): if dev is None: dev = self.dev tout = dev.timeout self.tty_enable = True dev.timeout = None term = Miniterm(dev, eol='cr') term.exit_character = chr(0x1d) # GS/CTRL+] term.menu_character = chr(0x14) # Menu: CTRL+T term.raw = True term.set_rx_encoding('UTF-8') term.set_tx_encoding('UTF-8') print('--- TTY mode | Quit: CTRL+] | Menu: CTRL+T ---') term.start() try: term.join(True) except KeyboardInterrupt: pass print('--- Exit TTY mode ---') term.join() term.close() dev.timeout = tout self.tty_enable = False def reply(self, cmd): reply = b'' while True: if not reply or reply[-1] != 255: reply = b'' reply += self.readfull(1) if reply != b"\xff": self.unkhandler(reply) continue else: reply = b'\xff' reply += self.readfull(1) if reply != b"\xff\x55": self.unkhandler(reply) continue reply += self.readfull(1) if reply != b"\xff\x55\xaa": self.unkhandler(reply) continue reply += self.readfull(1) cmdin = struct.unpack(">", hexdump(reply)) checksum = struct.unpack(">", hexdump(reply)) status, data, checksum = struct.unpack("> DATA:") chexdump(data) ccsum = self.data_checksum(data) if checksum != ccsum: raise UartChecksumError("Reply data checksum error: Expected 0x%08x, got 0x%08x"%(checksum, ccsum)) if self.enabled_features & Feature.DISABLE_DATA_CSUMS: # Extra sentinel after the data to make sure no data was lost sentinel = struct.unpack(" 6: raise ValueError("Too many arguments") args = list(args) + [0] * (6 - len(args)) req = struct.pack("<7Q", opcode, *args) if self.debug: print("<<<< %08x: %08x %08x %08x %08x %08x %08x"%tuple([opcode] + args)) reply = self.iface.proxyreq(req, reboot=reboot, no_reply=no_reply, pre_reply=None) if no_reply or reboot and reply is None: return ret_fmt = "q" if signed else "Q" rop, status, retval = struct.unpack(">>> %08x: %d %08x"%(rop, status, retval)) if reboot: return if rop != opcode: raise ProxyReplyError("Reply opcode mismatch: Expected 0x%08x, got 0x%08x"%(opcode,rop)) if status != self.S_OK: if status == self.S_BADCMD: raise ProxyCommandError("Reply error: Bad Command") else: raise ProxyRemoteError("Reply error: Unknown error (%d)"%status) return retval def request(self, opcode, *args, **kwargs): free = [] args = list(args) args2 = [] for i, arg in enumerate(args): if isinstance(arg, str): arg = arg.encode("utf-8") + b"\0" if isinstance(arg, bytes) and self.heap: p = self.heap.malloc(len(arg)) free.append(p) self.iface.writemem(p, arg) if (i < (len(args) - 1)) and args[i + 1] is None: args[i + 1] = len(arg) arg = p args2.append(arg) try: return self._request(opcode, *args2, **kwargs) finally: for i in free: self.heap.free(i) def nop(self): self.request(self.P_NOP) def exit(self, retval=0): self.request(self.P_EXIT, retval) def call(self, addr, *args, reboot=False): if len(args) > 5: raise ValueError("Too many arguments") return self.request(self.P_CALL, addr, *args, reboot=reboot) def reload(self, addr, *args, el1=False): if len(args) > 4: raise ValueError("Too many arguments") if el1: self.request(self.P_EL1_CALL, addr, *args, no_reply=True) else: try: self.request(self.P_VECTOR, addr, *args) self.iface.wait_boot() except ProxyCommandError: # old m1n1 does not support P_VECTOR try: self.mmu_shutdown() except ProxyCommandError: # older m1n1 does not support MMU pass self.request(self.P_CALL, addr, *args, reboot=True) def get_bootargs(self): return self.request(self.P_GET_BOOTARGS) def get_base(self): return self.request(self.P_GET_BASE) def set_baud(self, baudrate): self.iface.tty_enable = False def change(): self.iface.dev.baudrate = baudrate try: self.request(self.P_SET_BAUD, baudrate, 16, 0x005aa5f0, pre_reply=change) finally: self.iface.tty_enable = True def udelay(self, usec): self.request(self.P_UDELAY, usec) def set_exc_guard(self, mode): self.request(self.P_SET_EXC_GUARD, mode) def get_exc_count(self): return self.request(self.P_GET_EXC_COUNT) def el0_call(self, addr, *args): if len(args) > 4: raise ValueError("Too many arguments") return self.request(self.P_EL0_CALL, addr, *args) def el1_call(self, addr, *args): if len(args) > 4: raise ValueError("Too many arguments") return self.request(self.P_EL1_CALL, addr, *args) def gl1_call(self, addr, *args): if len(args) > 4: raise ValueError("Too many arguments") return self.request(self.P_GL1_CALL, addr, *args) def gl2_call(self, addr, *args): if len(args) > 4: raise ValueError("Too many arguments") return self.request(self.P_GL2_CALL, addr, *args) def get_simd_state(self, buf): self.request(self.P_GET_SIMD_STATE, buf) def put_simd_state(self, buf): self.request(self.P_PUT_SIMD_STATE, buf) def reboot(self): self.request(self.P_REBOOT, no_reply=True) def write64(self, addr, data): '''write 8 byte value to given address''' if addr & 7: raise AlignmentError() self.request(self.P_WRITE64, addr, data) def write32(self, addr, data): '''write 4 byte value to given address''' if addr & 3: raise AlignmentError() self.request(self.P_WRITE32, addr, data) def write16(self, addr, data): '''write 2 byte value to given address''' if addr & 1: raise AlignmentError() self.request(self.P_WRITE16, addr, data) def write8(self, addr, data): '''write 1 byte value to given address''' self.request(self.P_WRITE8, addr, data) def read64(self, addr): '''return 8 byte value from given address''' if addr & 7: raise AlignmentError() return self.request(self.P_READ64, addr) def read32(self, addr): '''return 4 byte value given address''' if addr & 3: raise AlignmentError() return self.request(self.P_READ32, addr) def read16(self, addr): '''return 2 byte value from given address''' if addr & 1: raise AlignmentError() return self.request(self.P_READ16, addr) def read8(self, addr): '''return 1 byte value from given address''' return self.request(self.P_READ8, addr) def set64(self, addr, data): '''Or 64 bit value of data into memory at addr and return result''' if addr & 7: raise AlignmentError() return self.request(self.P_SET64, addr, data) def set32(self, addr, data): '''Or 32 bit value of data into memory at addr and return result''' if addr & 3: raise AlignmentError() return self.request(self.P_SET32, addr, data) def set16(self, addr, data): '''Or 16 bit value of data into memory at addr and return result''' if addr & 1: raise AlignmentError() return self.request(self.P_SET16, addr, data) def set8(self, addr, data): '''Or byte value of data into memory at addr and return result''' return self.request(self.P_SET8, addr, data) def clear64(self, addr, data): '''Clear bits in 64 bit memory at address addr that are set in parameter data and return result''' if addr & 7: raise AlignmentError() return self.request(self.P_CLEAR64, addr, data) def clear32(self, addr, data): '''Clear bits in 32 bit memory at address addr that are set in parameter data and return result''' if addr & 3: raise AlignmentError() return self.request(self.P_CLEAR32, addr, data) def clear16(self, addr, data): '''Clear bits in 16 bit memory at address addr that are set in parameter data and return result''' if addr & 1: raise AlignmentError() return self.request(self.P_CLEAR16, addr, data) def clear8(self, addr, data): '''Clear bits in 8 bit memory at addr that are set in data and return result''' return self.request(self.P_CLEAR8, addr, data) def mask64(self, addr, clear, set): '''Clear bits in 64 bit memory at address addr that are set in clear, then set the bits in set and return result''' if addr & 7: raise AlignmentError() return self.request(self.P_MASK64, addr, clear, set) def mask32(self, addr, clear, set): '''Clear bits in 32 bit memory at address addr that are set in clear, then set the bits in set and return result''' if addr & 3: raise AlignmentError() return self.request(self.P_MASK32, addr, clear, set) def mask16(self, addr, clear, set): '''Clear select bits in 16 bit memory addr that are set in clear parameter, then set the bits in set parameter and return result''' if addr & 1: raise AlignmentError() return self.request(self.P_MASK16, addr, clear, set) def mask8(self, addr, clear, set): '''Clear bits in 1 byte memory at addr that are set in clear parameter, then set the bits in set parameter and return the result''' return self.request(self.P_MASK8, addr, clear, set) def writeread64(self, addr, data): return self.request(self.P_WRITEREAD64, addr, data) def writeread32(self, addr, data): return self.request(self.P_WRITEREAD32, addr, data) def writeread16(self, addr, data): return self.request(self.P_WRITEREAD16, addr, data) def writeread8(self, addr, data): return self.request(self.P_WRITEREAD8, addr, data) def memcpy64(self, dst, src, size): if src & 7 or dst & 7: raise AlignmentError() self.request(self.P_MEMCPY64, dst, src, size) def memcpy32(self, dst, src, size): if src & 3 or dst & 3: raise AlignmentError() self.request(self.P_MEMCPY32, dst, src, size) def memcpy16(self, dst, src, size): if src & 1 or dst & 1: raise AlignmentError() self.request(self.P_MEMCPY16, dst, src, size) def memcpy8(self, dst, src, size): self.request(self.P_MEMCPY8, dst, src, size) def memset64(self, dst, src, size): if dst & 7: raise AlignmentError() self.request(self.P_MEMSET64, dst, src, size) def memset32(self, dst, src, size): if dst & 3: raise AlignmentError() self.request(self.P_MEMSET32, dst, src, size) def memset16(self, dst, src, size): if dst & 1: raise AlignmentError() self.request(self.P_MEMSET16, dst, src, size) def memset8(self, dst, src, size): self.request(self.P_MEMSET8, dst, src, size) def ic_ialluis(self): self.request(self.P_IC_IALLUIS) def ic_iallu(self): self.request(self.P_IC_IALLU) def ic_ivau(self, addr, size): self.request(self.P_IC_IVAU, addr, size) def dc_ivac(self, addr, size): self.request(self.P_DC_IVAC, addr, size) def dc_isw(self, sw): self.request(self.P_DC_ISW, sw) def dc_csw(self, sw): self.request(self.P_DC_CSW, sw) def dc_cisw(self, sw): self.request(self.P_DC_CISW, sw) def dc_zva(self, addr, size): self.request(self.P_DC_ZVA, addr, size) def dc_cvac(self, addr, size): self.request(self.P_DC_CVAC, addr, size) def dc_cvau(self, addr, size): self.request(self.P_DC_CVAU, addr, size) def dc_civac(self, addr, size): self.request(self.P_DC_CIVAC, addr, size) def mmu_shutdown(self): self.request(self.P_MMU_SHUTDOWN) def mmu_init(self): self.request(self.P_MMU_INIT) def mmu_disable(self): return self.request(self.P_MMU_DISABLE) def mmu_restore(self, flags): self.request(self.P_MMU_RESTORE, flags) def mmu_init_secondary(self, cpu): self.request(self.P_MMU_INIT_SECONDARY, cpu) def xzdec(self, inbuf, insize, outbuf=0, outsize=0): return self.request(self.P_XZDEC, inbuf, insize, outbuf, outsize, signed=True) def gzdec(self, inbuf, insize, outbuf, outsize): return self.request(self.P_GZDEC, inbuf, insize, outbuf, outsize, signed=True) def smp_start_secondaries(self): self.request(self.P_SMP_START_SECONDARIES) def smp_call(self, cpu, addr, *args): if len(args) > 4: raise ValueError("Too many arguments") self.request(self.P_SMP_CALL, cpu, addr, *args) def smp_call_sync(self, cpu, addr, *args): if len(args) > 4: raise ValueError("Too many arguments") return self.request(self.P_SMP_CALL_SYNC, cpu, addr, *args) def smp_wait(self, cpu): return self.request(self.P_SMP_WAIT, cpu) def smp_set_wfe_mode(self, mode): return self.request(self.P_SMP_SET_WFE_MODE, mode) def heapblock_alloc(self, size): return self.request(self.P_HEAPBLOCK_ALLOC, size) def malloc(self, size): return self.request(self.P_MALLOC, size) def memalign(self, align, size): return self.request(self.P_MEMALIGN, align, size) def free(self, ptr): self.request(self.P_FREE, ptr) def kboot_boot(self, kernel): self.request(self.P_KBOOT_BOOT, kernel) def kboot_set_chosen(self, name, value): self.request(self.P_KBOOT_SET_CHOSEN, name, value) def kboot_set_initrd(self, base, size): self.request(self.P_KBOOT_SET_INITRD, base, size) def kboot_prepare_dt(self, dt_addr): return self.request(self.P_KBOOT_PREPARE_DT, dt_addr) def pmgr_clock_enable(self, clkid): return self.request(self.P_PMGR_CLOCK_ENABLE, clkid) def pmgr_clock_disable(self, clkid): return self.request(self.P_PMGR_CLOCK_DISABLE, clkid) def pmgr_adt_clocks_enable(self, path): return self.request(self.P_PMGR_ADT_CLOCKS_ENABLE, path) def pmgr_adt_clocks_disable(self, path): return self.request(self.P_PMGR_ADT_CLOCKS_DISABLE, path) def pmgr_reset(self, die, name): return self.request(self.P_PMGR_RESET, die, name) def iodev_set_usage(self, iodev, usage): return self.request(self.P_IODEV_SET_USAGE, iodev, usage) def iodev_can_read(self, iodev): return self.request(self.P_IODEV_CAN_READ, iodev) def iodev_can_write(self, iodev): return self.request(self.P_IODEV_CAN_WRITE, iodev) def iodev_read(self, iodev, buf, size=None): return self.request(self.P_IODEV_READ, iodev, buf, size) def iodev_write(self, iodev, buf, size=None): return self.request(self.P_IODEV_WRITE, iodev, buf, size) def iodev_whoami(self): return IODEV(self.request(self.P_IODEV_WHOAMI)) def usb_iodev_vuart_setup(self, iodev): return self.request(self.P_USB_IODEV_VUART_SETUP, iodev) def tunables_apply_global(self, path, prop): return self.request(self.P_TUNABLES_APPLY_GLOBAL, path, prop) def tunables_apply_local(self, path, prop, reg_offset): return self.request(self.P_TUNABLES_APPLY_LOCAL, path, prop, reg_offset) def tunables_apply_local_addr(self, path, prop, base): return self.request(self.P_TUNABLES_APPLY_LOCAL, path, prop, base) def dart_init(self, base, sid, dart_type=DART.T8020): return self.request(self.P_DART_INIT, base, sid, dart_type) def dart_shutdown(self, dart): return self.request(self.P_DART_SHUTDOWN, dart) def dart_map(self, dart, iova, bfr, len): return self.request(self.P_DART_MAP, dart, iova, bfr, len) def dart_unmap(self, dart, iova, len): return self.request(self.P_DART_UNMAP, dart, iova, len) def hv_init(self): return self.request(self.P_HV_INIT) def hv_map(self, from_, to, size, incr): return self.request(self.P_HV_MAP, from_, to, size, incr) def hv_start(self, entry, *args): return self.request(self.P_HV_START, entry, *args) def hv_translate(self, addr, s1=False, w=False): '''Translate virtual address stage 1 only if s1, for write if w''' return self.request(self.P_HV_TRANSLATE, addr, s1, w) def hv_pt_walk(self, addr): return self.request(self.P_HV_PT_WALK, addr) def hv_map_vuart(self, base, irq, iodev): return self.request(self.P_HV_MAP_VUART, base, irq, iodev) def hv_trace_irq(self, evt_type, num, count, flags): return self.request(self.P_HV_TRACE_IRQ, evt_type, num, count, flags) def hv_wdt_start(self, cpu): return self.request(self.P_HV_WDT_START, cpu) def hv_start_secondary(self, cpu, entry, *args): return self.request(self.P_HV_START_SECONDARY, cpu, entry, *args) def hv_switch_cpu(self, cpu): return self.request(self.P_HV_SWITCH_CPU, cpu) def hv_set_time_stealing(self, enabled, reset): return self.request(self.P_HV_SET_TIME_STEALING, int(bool(enabled)), int(bool(reset))) def hv_pin_cpu(self, cpu): return self.request(self.P_HV_PIN_CPU, cpu) def hv_write_hcr(self, hcr): return self.request(self.P_HV_WRITE_HCR, hcr) def hv_map_virtio(self, base, config): return self.request(self.P_HV_MAP_VIRTIO, base, config) def virtio_put_buffer(self, base, qu, idx, length): return self.request(self.P_VIRTIO_PUT_BUFFER, base, qu, idx, length) def fb_init(self): return self.request(self.P_FB_INIT) def fb_shutdown(self, restore_logo=True): return self.request(self.P_FB_SHUTDOWN, restore_logo) def fb_blit(self, x, y, w, h, ptr, stride, pix_fmt=PIX_FMT.XRGB): return self.request(self.P_FB_BLIT, x, y, w, h, ptr, stride | pix_fmt << 32) def fb_unblit(self, x, y, w, h, ptr, stride): return self.request(self.P_FB_UNBLIT, x, y, w, h, ptr, stride) def fb_fill(self, x, y, w, h, color): return self.request(self.P_FB_FILL, x, y, w, h, color) def fb_clear(self, color): return self.request(self.P_FB_CLEAR, color) def fb_display_logo(self): return self.request(self.P_FB_DISPLAY_LOGO) def fb_restore_logo(self): return self.request(self.P_FB_RESTORE_LOGO) def fb_improve_logo(self): return self.request(self.P_FB_IMPROVE_LOGO) def pcie_init(self): return self.request(self.P_PCIE_INIT) def pcie_shutdown(self): return self.request(self.P_PCIE_SHUTDOWN) def nvme_init(self): return self.request(self.P_NVME_INIT) def nvme_shutdown(self): return self.request(self.P_NVME_SHUTDOWN) def nvme_read(self, nsid, lba, bfr): return self.request(self.P_NVME_READ, nsid, lba, bfr) def nvme_flush(self, nsid): return self.request(self.P_NVME_FLUSH, nsid) def mcc_get_carveouts(self): return self.request(self.P_MCC_GET_CARVEOUTS) def display_init(self): return self.request(self.P_DISPLAY_INIT) def display_configure(self, cfg): return self.request(self.P_DISPLAY_CONFIGURE, cfg) def display_shutdown(self, mode): return self.request(self.P_DISPLAY_SHUTDOWN, mode) def display_start_dcp(self): return self.request(self.P_DISPLAY_START_DCP) def display_is_external(self): return self.request(self.P_DISPLAY_IS_EXTERNAL) def dapf_init_all(self): return self.request(self.P_DAPF_INIT_ALL) def dapf_init(self, path): return self.request(self.P_DAPF_INIT, path) __all__.extend(k for k, v in globals().items() if (callable(v) or isinstance(v, type)) and v.__module__ == __name__) if __name__ == "__main__": import serial uartdev = os.environ.get("M1N1DEVICE", "/dev/m1n1") usbuart = serial.Serial(uartdev, 115200) uartif = UartInterface(usbuart, debug=True) print("Sending NOP...", end=' ') uartif.nop() print("OK") proxy = M1N1Proxy(uartif, debug=True) print("Sending Proxy NOP...", end=' ') proxy.nop() print("OK") print("Boot args: 0x%x" % proxy.get_bootargs())