summaryrefslogtreecommitdiff
path: root/tools/proxyclient/m1n1/fw/dcp
diff options
context:
space:
mode:
Diffstat (limited to 'tools/proxyclient/m1n1/fw/dcp')
-rw-r--r--tools/proxyclient/m1n1/fw/dcp/__init__.py2
-rw-r--r--tools/proxyclient/m1n1/fw/dcp/client.py14
-rw-r--r--tools/proxyclient/m1n1/fw/dcp/dcpav.py110
-rw-r--r--tools/proxyclient/m1n1/fw/dcp/dcpep.py163
-rw-r--r--tools/proxyclient/m1n1/fw/dcp/iboot.py215
-rw-r--r--tools/proxyclient/m1n1/fw/dcp/ipc.py789
-rw-r--r--tools/proxyclient/m1n1/fw/dcp/manager.py319
-rw-r--r--tools/proxyclient/m1n1/fw/dcp/parse_log.py43
8 files changed, 1655 insertions, 0 deletions
diff --git a/tools/proxyclient/m1n1/fw/dcp/__init__.py b/tools/proxyclient/m1n1/fw/dcp/__init__.py
new file mode 100644
index 0000000..e77f6cf
--- /dev/null
+++ b/tools/proxyclient/m1n1/fw/dcp/__init__.py
@@ -0,0 +1,2 @@
+# SPDX-License-Identifier: MIT
+
diff --git a/tools/proxyclient/m1n1/fw/dcp/client.py b/tools/proxyclient/m1n1/fw/dcp/client.py
new file mode 100644
index 0000000..941ab20
--- /dev/null
+++ b/tools/proxyclient/m1n1/fw/dcp/client.py
@@ -0,0 +1,14 @@
+# SPDX-License-Identifier: MIT
+from ...utils import *
+
+from ..asc import StandardASC
+from .dcpep import DCPEndpoint
+
+class DCPClient(StandardASC):
+ ENDPOINTS = {
+ 0x37: DCPEndpoint,
+ }
+
+ def __init__(self, u, asc_base, dart=None, disp_dart=None):
+ super().__init__(u, asc_base, dart)
+ self.disp_dart = disp_dart
diff --git a/tools/proxyclient/m1n1/fw/dcp/dcpav.py b/tools/proxyclient/m1n1/fw/dcp/dcpav.py
new file mode 100644
index 0000000..d063c6c
--- /dev/null
+++ b/tools/proxyclient/m1n1/fw/dcp/dcpav.py
@@ -0,0 +1,110 @@
+# SPDX-License-Identifier: MIT
+from construct import *
+
+from ...utils import *
+from ..asc import StandardASC
+from ..afk.epic import *
+
+class DCPAVControllerService(EPICStandardService):
+ NAME = "dcpav-controller-epic"
+ SHORT = "dcpav"
+
+ def setPower(self, power):
+ self.call(8, 0x8, struct.pack("<16xI12x", power))
+
+ def getPower(self, power):
+ return struct.unpack("<16xI12x", self.call(8, 0x9, bytes(32)))
+
+ def wakeDisplay(self):
+ self.call(8, 0xa, bytes(16))
+
+ def sleepDisplay(self):
+ self.call(8, 0xb, bytes(16))
+
+ def forceHotPlugDetect(self):
+ self.call(8, 0xc, bytes(16))
+
+ def setVirtualDeviceMode(self, mode):
+ self.call(8, 0xd, struct.pack("<16xI12x", mode))
+
+class DCPDPControllerService(EPICStandardService):
+ NAME = "dcpdp-controller-epic"
+ SHORT = "dcpdp"
+
+class DCPDPTXEndpoint(EPICEndpoint):
+ SHORT = "dptx"
+
+ SERVICES = [
+ DCPAVControllerService,
+ DCPDPControllerService,
+ ]
+
+ATC0 = 0
+ATC1 = 1
+ATC2 = 2
+ATC3 = 3
+LPDPTX = 4
+DPTX = 5
+
+DPPHY = 0
+DPIN0 = 1
+DPIN1 = 2
+
+class DCPDPTXRemotePortService(EPICStandardService):
+ NAME = "dcpdptx-port-epic"
+ SHORT = "port"
+
+ def displayRequest(self):
+ self.call(8, 8, bytes(16))
+
+ def displayRelease(self):
+ self.call(8, 9, bytes(16))
+
+ def connectTo(self, connected, unit, port, unk=0):
+ target = 0
+ if connected:
+ target |= (1 << 8)
+ target |= unit
+ target |= port << 4
+ self.call(8, 13, struct.pack("<16xII8x", unk, target))
+
+class DCPDPTXPortEndpoint(EPICEndpoint):
+ SHORT = "dpport"
+
+ SERVICES = [
+ DCPDPTXRemotePortService,
+ DCPDPControllerService,
+ ]
+
+class DCPDPDevice(EPICStandardService):
+ NAME = "dcpav-device-epic"
+ SHORT = "dpdev"
+
+class DCPAVDeviceEndpoint(EPICEndpoint):
+ SHORT = "avdev"
+
+ SERVICES = [
+ DCPDPDevice,
+ ]
+
+class DCPDPService(EPICStandardService):
+ NAME = "dcpav-service-epic"
+ SHORT = "dpserv"
+
+class DCPAVServiceEndpoint(EPICEndpoint):
+ SHORT = "avserv"
+
+ SERVICES = [
+ DCPDPService,
+ ]
+
+class DCPAVSimpleVideoInterface(EPICStandardService):
+ NAME = "dcpav-video-interface-epic"
+ SHORT = "video"
+
+class DCPAVVideoEndpoint(EPICEndpoint):
+ SHORT = "avserv"
+
+ SERVICES = [
+ DCPAVSimpleVideoInterface,
+ ]
diff --git a/tools/proxyclient/m1n1/fw/dcp/dcpep.py b/tools/proxyclient/m1n1/fw/dcp/dcpep.py
new file mode 100644
index 0000000..6e4f250
--- /dev/null
+++ b/tools/proxyclient/m1n1/fw/dcp/dcpep.py
@@ -0,0 +1,163 @@
+# SPDX-License-Identifier: MIT
+import struct
+from dataclasses import dataclass
+from enum import IntEnum
+
+from ..asc.base import *
+from ...utils import *
+
+## DCP main endpoint
+
+class DCPMessage(Register64):
+ TYPE = 3, 0
+
+class DCPEp_SetShmem(DCPMessage):
+ DVA = 63, 16
+ FLAG = 7, 4
+ TYPE = 3, 0, Constant(0)
+
+class DCPEp_InitComplete(DCPMessage):
+ TYPE = 3, 0, Constant(1)
+
+class CallContext(IntEnum):
+ CB = 0
+ CMD = 2
+ ASYNC = 3
+ OOBCB = 4
+ OOBCMD = 6
+
+class DCPEp_Msg(DCPMessage):
+ LEN = 63, 32
+ OFF = 31, 16
+ CTX = 11, 8, CallContext
+ ACK = 6
+ TYPE = 3, 0, Constant(2)
+
+@dataclass
+class DCPCallState:
+ tag: str
+ off: int
+ in_len: int
+ in_data: bytes
+ out_addr: int
+ out_len: int
+ complete: bool = False
+
+class DCPCallChannel(Reloadable):
+ def __init__(self, dcpep, name, buf, bufsize):
+ self.dcp = dcpep
+ self.name = name
+ self.buf = buf
+ self.bufsize = bufsize
+ self.off = 0
+ self.pending = []
+
+ def ack(self):
+ if not self.pending:
+ raise Exception("ACK with no calls pending")
+
+ self.pending[-1].complete = True
+
+ def call(self, ctx, tag, inbuf, out_len):
+ in_len = len(inbuf)
+ data = tag.encode("ascii")[::-1] + struct.pack("<II", in_len, out_len) + inbuf
+ data_size = len(data) + out_len
+ assert (self.off + data_size) <= self.bufsize
+
+ self.dcp.asc.iface.writemem(self.dcp.shmem + self.buf + self.off, data)
+
+ state = DCPCallState(off=self.off, tag=tag, in_len=in_len, in_data=data, out_len=out_len,
+ out_addr=self.buf + self.off + 12 + in_len)
+
+ self.off += align_up(data_size, 0x40)
+ self.pending.append(state)
+
+ print(f"len={data_size:#x} {in_len}")
+ self.dcp.send(DCPEp_Msg(LEN=data_size, OFF=state.off, CTX=ctx, ACK=0))
+
+ while not state.complete:
+ self.dcp.asc.work()
+
+ print(f"off={state.out_addr:#x} len={out_len}")
+ out_data = self.dcp.asc.iface.readmem(self.dcp.shmem + state.out_addr, out_len)
+
+ assert self.pending.pop() is state
+ self.off = state.off
+
+ return out_data
+
+class DCPCallbackChannel(Reloadable):
+ def __init__(self, dcpep, name, buf, bufsize):
+ self.dcp = dcpep
+ self.name = name
+ self.buf = buf
+ self.bufsize = bufsize
+ self.pending = []
+
+ def cb(self, msg):
+ data = self.dcp.asc.iface.readmem(self.dcp.shmem + self.buf + msg.OFF, msg.LEN)
+ tag = data[:4][::-1].decode("ascii")
+ in_len, out_len = struct.unpack("<II", data[4:12])
+ in_data = data[12:12 + in_len]
+
+ state = DCPCallState(off=msg.OFF, tag=tag, in_len=in_len, out_len=out_len,
+ in_data=in_data, out_addr=self.buf + msg.OFF + 12 + in_len)
+
+ self.pending.append(state)
+
+ out_data = self.dcp.mgr.handle_cb(state)
+ self.dcp.asc.iface.writemem(self.dcp.shmem + state.out_addr, out_data)
+ self.dcp.send(DCPEp_Msg(CTX=msg.CTX, ACK=1))
+
+ assert self.pending.pop() is state
+
+
+class DCPEndpoint(ASCBaseEndpoint):
+ BASE_MESSAGE = DCPMessage
+ SHORT = "dcpep"
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.shmem = self.shmem_dva = None
+ self.init_complete = False
+ self.mgr = None
+
+ self.ch_cb = DCPCallbackChannel(self, "CB", 0x60000, 0x8000)
+ self.ch_cmd = DCPCallChannel(self, "CMD", 0, 0x8000)
+ self.ch_async = DCPCallbackChannel(self, "ASYNC", 0x40000, 0x20000)
+ self.ch_oobcb = DCPCallbackChannel(self, "OOBCB", 0x68000, 0x8000)
+ self.ch_oobcmd = DCPCallChannel(self, "OOBCMD", 0x8000, 0x8000)
+
+ @msg_handler(2, DCPEp_Msg)
+ def Rx(self, msg):
+ if msg.ACK:
+ if msg.CTX in (CallContext.CMD, CallContext.CB):
+ self.ch_cmd.ack()
+ elif msg.CTX in (CallContext.OOBCMD, CallContext.OOBCB):
+ self.ch_oobcmd.ack()
+ else:
+ raise Exception(f"Unknown RX ack channel {msg.CTX}")
+ else:
+ if msg.CTX == CallContext.CB:
+ self.ch_cb.cb(msg)
+ elif msg.CTX == CallContext.OOBCMD:
+ self.ch_oobcb.cb(msg)
+ elif msg.CTX == CallContext.ASYNC:
+ self.ch_async.cb(msg)
+ else:
+ raise Exception(f"Unknown RX callback channel {msg.CTX}")
+ return True
+
+ @msg_handler(1, DCPEp_InitComplete)
+ def InitComplete(self, msg):
+ self.log("init complete")
+ self.init_complete = True
+ return True
+
+ def initialize(self):
+ self.shmem, self.shmem_dva = self.asc.ioalloc(0x100000)
+ self.asc.p.memset32(self.shmem, 0, 0x100000)
+ self.send(DCPEp_SetShmem(DVA=self.shmem_dva))
+ while not self.init_complete:
+ self.asc.work()
+
diff --git a/tools/proxyclient/m1n1/fw/dcp/iboot.py b/tools/proxyclient/m1n1/fw/dcp/iboot.py
new file mode 100644
index 0000000..8124ca9
--- /dev/null
+++ b/tools/proxyclient/m1n1/fw/dcp/iboot.py
@@ -0,0 +1,215 @@
+# SPDX-License-Identifier: MIT
+from construct import *
+
+from ...utils import *
+from ..asc import StandardASC
+from ..afk.epic import *
+from .dcpav import *
+
+EOTF = "EOTF" / Enum(Int32ul,
+ GAMMA_SDR = 1,
+ GAMMA_HDR = 2,
+)
+
+Encoding = "Encoding" / Enum(Int32ul,
+ RGB = 1,
+ YCBCR_444 = 3,
+ YCBCR_422 = 4,
+ YCBCR_420 = 5,
+)
+
+Colorimetry = "Colorimetry" / Enum(Int32ul,
+ BT601_709 = 1,
+ BT2020 = 2,
+ DCIP3 = 3,
+)
+
+Colorspace = "Colorspace" / Enum(Int32ul,
+ SRGB = 1,
+ Native = 2,
+ BT2020 = 3,
+)
+
+SurfaceFormat = "SurfaceFormat" / Enum(Int32ul,
+ BGRA = 1,
+ BGRA2 = 2,
+ RGBA = 3,
+ w18p = 4,
+ BGRA3 = 5,
+ _444v = 6,
+ _422v = 7,
+ _420v = 8,
+ w30r = 9,
+ w40a = 10,
+)
+
+Transform = "Transform" / Enum(Int8ul,
+ NONE = 0,
+ XFLIP = 1,
+ YFLIP = 2,
+ ROT_90 = 3,
+ ROT_180 = 4,
+ ROT_270 = 5,
+)
+
+AddrFormat = "AddrFormat" / Enum(Int32ul,
+ PLANAR = 1,
+ TILED = 2,
+ AGX = 3
+)
+
+TimingMode = Struct(
+ "valid" / Bool(Int32ul),
+ "width" / Int32ul,
+ "height" / Int32ul,
+ "fps_frac" / Int16ul,
+ "fps_int" / Int16ul,
+ Padding(8),
+)
+
+TimingModeList = Struct(
+ "count" / Int32ul,
+ "list" / GreedyRange(TimingMode),
+)
+
+ColorMode = Struct(
+ "valid" / Bool(Int32ul),
+ "colorimetry" / Colorimetry,
+ "eotf" / EOTF,
+ "encoding" / Encoding,
+ "bpp" / Int32ul,
+ "unk" / Int32ul,
+)
+
+ColorModeList = Struct(
+ "count" / Int32ul,
+ "list" / GreedyRange(ColorMode),
+)
+
+SwapInfo = Struct(
+ "unk1" / Int32ul,
+ "unk2" / Int32ul,
+ "unk3" / Int32ul,
+ "swap_id" / Int32ul,
+ "unk5" / Int32ul,
+)
+
+IBootPlaneInfo = Struct(
+ "unk1" / Default(Int32ul, 0),
+ "addr" / Default(Int64ul, 0),
+ "tile_size" / Default(Int32ul, 0),
+ "stride" / Default(Int32ul, 0),
+ "unk5" / Default(Int32ul, 0),
+ "unk6" / Default(Int32ul, 0),
+ "unk7" / Default(Int32ul, 0),
+ "unk8" / Default(Int32ul, 0),
+ "addr_format" / Default(AddrFormat, 0),
+ "unk9" / Default(Int32ul, 0),
+)
+
+IBootLayerInfo = Struct(
+ "planes" / Array(3, IBootPlaneInfo),
+ "unk" / Default(Int32ul, 0),
+ "plane_cnt" / Int32ul,
+ "width" / Int32ul,
+ "height" / Int32ul,
+ "surface_fmt" / SurfaceFormat,
+ "colorspace" / Colorspace,
+ "eotf" / EOTF,
+ "transform" / Transform,
+ Padding(3)
+)
+
+SwapSetLayer = Struct(
+ "unk" / Default(Int32ul, 0),
+ "layer_id" / Int32ul,
+ "layer_info" / IBootLayerInfo,
+ "src_w" / Int32ul,
+ "src_h" / Int32ul,
+ "src_x" / Int32ul,
+ "src_y" / Int32ul,
+ "dst_w" / Int32ul,
+ "dst_h" / Int32ul,
+ "dst_x" / Int32ul,
+ "dst_y" / Int32ul,
+ "unk2" / Default(Int32ul, 0),
+)
+
+class DCPIBootService(EPICService):
+ NAME = "disp0-service"
+ SHORT = "disp0"
+
+ def send_cmd(self, op, data=b'', replen=None):
+ msg = struct.pack("<IIII", op, 16 + len(data), 0, 0) + data
+ if replen is not None:
+ replen += 8
+ resp = super().send_cmd(0xc0, msg, replen)
+ if not resp:
+ return
+ rcmd, rlen = struct.unpack("<II", resp[:8])
+ return resp[8:rlen]
+
+ def setPower(self, power):
+ self.send_cmd(2, b"\x01" if power else b"\x00")
+
+ def getModeCount(self):
+ buf = self.send_cmd(3, b"", 12)
+ hpd, timing_cnt, color_cnt = struct.unpack("<B3xII", buf)
+ return bool(hpd), timing_cnt, color_cnt
+
+ def getTimingModes(self):
+ return TimingModeList.parse(self.send_cmd(4, replen=4096)).list
+
+ def getColorModes(self):
+ return ColorModeList.parse(self.send_cmd(5, replen=4096)).list
+
+ def setMode(self, timing_mode, color_mode):
+ data = TimingMode.build(timing_mode) + ColorMode.build(color_mode)
+ self.send_cmd(6, data)
+
+ def swapBegin(self):
+ return SwapInfo.parse(self.send_cmd(15, replen=128))
+
+ def swapSetLayer(self, layer_id, info, src_rect, dst_rect):
+ data = Container()
+ data.layer_id = layer_id
+ data.layer_info = info
+ data.src_w, data.src_h, data.src_x, data.src_y = src_rect
+ data.dst_w, data.dst_h, data.dst_x, data.dst_y = dst_rect
+ return self.send_cmd(16, SwapSetLayer.build(data), replen=128)
+
+ def swapSetTimestamp(self):
+ pass
+ # 17
+
+ def swapEnd(self):
+ return self.send_cmd(18, b"\x00" * 12, replen=128)
+
+ #def swapWait(self, swap_id):
+ #buf = struct.pack("<IIII", 1, swap_id, 0, swap_id)
+ #return self.send_cmd(19, buf, replen=128)
+
+class DCPIBootEndpoint(EPICEndpoint):
+ SHORT = "iboot"
+
+ SERVICES = [
+ DCPIBootService,
+ ]
+
+
+class DCPIBootClient(StandardASC):
+ DVA_OFFSET = 0xf00000000
+
+ ENDPOINTS = {
+ 0x20: AFKSystemEndpoint,
+ 0x23: DCPIBootEndpoint,
+ 0x24: DCPDPTXEndpoint,
+ 0x2a: DCPDPTXPortEndpoint,
+ 0x27: DCPAVDeviceEndpoint,
+ 0x28: DCPAVServiceEndpoint,
+ 0x29: DCPAVVideoEndpoint,
+ }
+
+ def __init__(self, u, asc_base, dart=None, disp_dart=None):
+ super().__init__(u, asc_base, dart)
+ self.disp_dart = disp_dart
diff --git a/tools/proxyclient/m1n1/fw/dcp/ipc.py b/tools/proxyclient/m1n1/fw/dcp/ipc.py
new file mode 100644
index 0000000..c961a8c
--- /dev/null
+++ b/tools/proxyclient/m1n1/fw/dcp/ipc.py
@@ -0,0 +1,789 @@
+# SPDX-License-Identifier: MIT
+
+from dataclasses import dataclass
+import pprint
+from enum import IntEnum
+
+from ..common import *
+from m1n1.utils import *
+from construct import *
+
+@dataclass
+class ByRef:
+ val: object
+
+class Pointer(Subconstruct):
+ pass
+
+class InPtr(Pointer):
+ pass
+
+class OutPtr(Pointer):
+ pass
+
+class InOutPtr(Pointer):
+ pass
+
+class InOut(Subconstruct):
+ pass
+
+Ptr = InOutPtr
+
+class NULL:
+ def __str__(self):
+ return "NULL"
+ def __repr__(self):
+ return "NULL"
+NULL = NULL()
+
+class Method:
+ def __init__(self, rtype, name, *args, **kwargs):
+ self.rtype = rtype
+ self.name = name
+
+ if args and kwargs:
+ raise Exception("Cannot specify args and kwargs")
+ elif args:
+ args = [(f"arg{i}", arg) for i, arg in enumerate(args)]
+ self.as_kwargs = False
+ elif kwargs:
+ args = list(kwargs.items())
+ self.as_kwargs = True
+ else:
+ args = []
+
+ self.args = args
+
+ in_size = 0
+ out_size = 0
+ self.in_fields = []
+ self.out_fields = []
+
+ if rtype is not None:
+ args.append(("ret", rtype))
+
+ self.dir = []
+ self.nullable = []
+ self.array_of_p = []
+
+ for i, (name, field) in enumerate(self.args):
+ align = 1
+
+ pfield = field
+ dir = "in"
+
+ if name == "ret":
+ dir = "out"
+
+ while isinstance(pfield, Subconstruct):
+ if isinstance(pfield, InPtr):
+ dir = "in"
+ elif isinstance(pfield, OutPtr):
+ dir = "out"
+ elif isinstance(pfield, (InOut, InOutPtr)):
+ dir = "inout"
+ pfield = pfield.subcon
+ if isinstance(pfield, FormatField):
+ align = min(4, pfield.length)
+
+ if dir in ("in", "inout"):
+ #if in_size % align:
+ #self.in_fields.append(Padding(align - (in_size % align)))
+ #in_size += align - (in_size % align)
+
+ self.in_fields.append(name / field)
+ in_size += field.sizeof()
+
+ if dir in ("out", "inout"):
+ #if out_size % align:
+ #self.out_fields.append(Padding(align - (out_size % align)))
+ #out_size += align - (out_size % align)
+
+ self.out_fields.append(name / field)
+ out_size += field.sizeof()
+
+ self.dir.append(dir)
+
+ for i, (name, field) in enumerate(self.args):
+ array_size = None
+ array_of_p = False
+ nullable = False
+ pfield = field
+
+ while isinstance(pfield, Subconstruct):
+ if isinstance(pfield, Array) and array_size is None:
+ array_size = pfield.count
+ if isinstance(pfield, Pointer):
+ nullable = True
+ array_of_p = array_size is not None
+ pfield = pfield.subcon
+
+ if nullable:
+ if array_of_p:
+ self.in_fields.append((name + "_null") / bool_[array_size])
+ in_size += array_size
+ else:
+ self.in_fields.append((name + "_null") / bool_)
+ in_size += 1
+
+ self.nullable.append(nullable)
+ self.array_of_p.append(array_of_p)
+
+ if in_size % 4:
+ self.in_fields.append(Padding(4 - (in_size % 4)))
+ if out_size % 4:
+ self.out_fields.append(Padding(4 - (out_size % 4)))
+
+ self.in_struct = Struct(*self.in_fields)
+ self.out_struct = Struct(*self.out_fields)
+
+ def get_field_val(self, i, in_vals, out_vals=None, nullobj=None):
+ name, field = self.args[i]
+
+ nullable = self.nullable[i]
+ array_of_p = self.array_of_p[i]
+
+ val = None
+
+ if out_vals:
+ val = out_vals.get(name, val)
+ if val is None and in_vals:
+ val = in_vals.get(name, val)
+
+ if nullable and val is not None:
+ null = in_vals.get(name + "_null", None)
+ if null is None:
+ return None
+ if not array_of_p:
+ val = nullobj if null else val
+ else:
+ val2 = [nullobj if n else val for val, n in zip(val, null)]
+ if isinstance(val, ListContainer):
+ val2 = ListContainer(val2)
+ val = val2
+
+ return val
+
+ def fmt_args(self, in_vals, out_vals=None):
+ s = []
+
+ for i, (name, field) in enumerate(self.args):
+ if name == "ret":
+ continue
+
+ dir = self.dir[i]
+ nullable = self.nullable[i]
+
+ val = self.get_field_val(i, in_vals, out_vals, nullobj=NULL)
+
+ if val is not None:
+ if self.is_long(val):
+ s.append(f"{name}=...")
+ elif isinstance(val, ListContainer):
+ s.append(f"{name}={list(val)!r}")
+ else:
+ s.append(f"{name}={val!r}")
+ elif dir == "out":
+ s.append(f"{name}=<out>")
+ else:
+ s.append(f"{name}=?")
+
+ return ", ".join(s)
+
+ def print_long_args(self, indent, in_vals, out_vals=None):
+ for i, (name, field) in enumerate(self.args):
+ if name == "ret":
+ continue
+
+ val = self.get_field_val(i, in_vals, out_vals, nullobj=NULL)
+
+ if name in in_vals and out_vals is not None and name not in out_vals:
+ continue
+
+ if self.is_long(val):
+ hdr = f"{indent} {name} = "
+ if isinstance(val, (ListContainer, Container)):
+ print(hdr + str(val).replace("\n", "\n" + indent))
+ elif isinstance(val, bytes):
+ print(hdr + f"({len(val):#x} bytes)")
+ chexdump(val, indent=indent + " ")
+ else:
+ dindent = " " * len(hdr)
+ if isinstance(val, dict) and "_io" in val:
+ del val["_io"]
+ print(hdr + pprint.pformat(val, sort_dicts=False).replace("\n", "\n" + dindent))
+
+ def is_long(self, arg):
+ if isinstance(arg, (list, bytes)):
+ return len(arg) > 4 or any(self.is_long(i) for i in arg)
+
+ return isinstance(arg, (dict, list, bytes))
+
+ def parse_input(self, data):
+ vals = self.in_struct.parse(data)
+
+ return Container({ k: v() if callable(v) else v for k,v in vals.items() })
+
+ def parse_output(self, data, in_vals):
+ context = dict(in_vals)
+
+ if "data" in context:
+ del context["data"]
+
+ vals = self.out_struct.parse(data, **context)
+
+ return Container({ k: v() if callable(v) else v for k,v in vals.items() })
+
+ def __str__(self):
+ if self.rtype is None:
+ rtype = "void"
+ else:
+ rtype = str(self.rtype)
+
+ args = []
+ for name, field in self.args:
+ if name == "ret":
+ continue
+ args.append(f"{field} {name}")
+
+ return f"{rtype} {self.name}({', '.join(args)})"
+
+ def callback(self, func, in_data):
+ in_vals = self.parse_input(in_data)
+
+ args = []
+ kwargs = {}
+
+ out_vals = {}
+
+ for i, (name, field) in enumerate(self.args):
+ if name == "ret":
+ continue
+
+ dir = self.dir[i]
+
+ val = self.get_field_val(i, in_vals, out_vals, nullobj=NULL)
+ is_null = val is NULL
+ if is_null:
+ val = None
+
+ if dir == "inout":
+ if val is not None and not isinstance(val, list):
+ val = ByRef(val)
+ out_vals[name] = val
+ elif dir == "out" and not is_null:
+ val = ByRef(None)
+ out_vals[name] = val
+
+ if self.as_kwargs:
+ kwargs[name] = val
+ else:
+ args.append(val)
+
+ retval = func(*args, **kwargs)
+
+ if self.rtype is None:
+ assert retval is None
+ else:
+ assert retval is not None
+ out_vals["ret"] = retval
+
+ out_vals = {k: v.val if isinstance(v, ByRef) else v for k, v in out_vals.items()}
+
+ context = dict(in_vals)
+
+ if "obj" in context:
+ del context["obj"]
+
+ out_data = self.out_struct.build(out_vals, **context)
+ return out_data
+
+
+ def call(self, call, *args, **kwargs):
+ if args and kwargs:
+ raise Exception("Cannot use both args and kwargs")
+
+ if args:
+ for arg, (name, field) in zip(args, self.args):
+ kwargs[name] = arg
+
+ in_vals = {}
+ out_refs = {}
+
+ for i, (name, field) in enumerate(self.args):
+ if name == "ret":
+ continue
+
+ val = kwargs[name]
+ dir = self.dir[i]
+ nullable = self.nullable[i]
+ array_of_p = self.array_of_p[i]
+
+ if nullable:
+ if not array_of_p:
+ in_vals[name + "_null"] = val is None
+ else:
+ defaults = field.parse(b"\x00" * field.sizeof())
+ in_vals[name + "_null"] = [i is None for i in val]
+ val = [v if v is not None else defaults[i] for i, v in enumerate(val)]
+ else:
+ assert val is not None
+
+ if val is None:
+ continue
+
+ if dir == "out":
+ assert isinstance(val, ByRef)
+ out_refs[name] = val
+ elif dir == "inout":
+ if isinstance(val, ByRef):
+ in_vals[name] = val.val
+ out_refs[name] = val
+ elif val is not None:
+ in_vals[name] = val
+ elif val is not None:
+ in_vals[name] = val
+
+ in_data = self.in_struct.build(in_vals)
+ print(f"{self.name}({self.fmt_args(in_vals)})")
+
+ out_data = call(in_data)
+ out_vals = self.parse_output(out_data, in_vals)
+
+ for k, v in out_refs.items():
+ v.val = out_vals[k]
+
+ if self.rtype is not None:
+ return out_vals["ret"]
+
+def dump_fields(fields):
+ off = 0
+ for f in fields:
+ sizeof = f.sizeof()
+ print(f"{off:#x}: {f} ({sizeof:#x})")
+ off += sizeof
+
+class Call(Method):
+ pass
+
+class Callback(Method):
+ pass
+
+int8_t = Int8sl
+uint8_t = Int8ul
+int16_t = Int16sl
+uint16_t = Int16ul
+int32_t = Int32sl
+uint32_t = Int32ul
+int64_t = Int64sl
+uint64_t = Int64ul
+
+uint = uint32_t
+int_ = int32_t
+ulong = uint64_t
+long_ = int64_t
+
+void = None
+
+class IPCObject:
+ @classmethod
+ def methods(cls):
+ ret = {}
+ for c in cls.mro():
+ ret.update({k: (cls, v) for k, v in cls.__dict__.items() if isinstance(v, Method)})
+
+ return ret
+
+rt_bw_config_t = Struct(
+ "unk1" / UnkBytes(8),
+ "reg1" / Int64ul,
+ "reg2" / Int64ul,
+ "unk2" / UnkBytes(4),
+ "bit" / Int32ul,
+ "padding" / UnkBytes(0x1c),
+)
+
+IOUserClient = Struct(
+ "addr" / Hex(Int64ul),
+ "unk" / Int32ul,
+ "flag1" / Int8ul,
+ "flag2" / Int8ul,
+ Padding(2)
+)
+
+IOMobileFramebufferUserClient = IOUserClient
+
+IOMFBStatus = Int32ul
+IOMFBParameterName = Int32ul
+
+BufferDescriptor = uint64_t
+
+SwapCompleteData = Bytes(0x12)
+SwapInfoBlob = Bytes(0x6c4)
+
+SWAP_SURFACES = 4
+
+Rect = NamedTuple("rect", "x y w h", Int32ul[4])
+
+IOMFBSwapRec = Struct(
+ "ts1" / Default(Int64ul, 0),
+ "ts2" / Default(Int64ul, 0),
+ "unk_10" / Default(Int64ul, 0),
+ "unk_18" / Default(Int64ul, 0),
+ "ts64_unk" / Default(Int64ul, 0),
+ "unk_28" / Default(Int64ul, 0),
+ "ts3" / Default(Int64ul, 0),
+ "unk_38" / Default(Int64ul, 0),
+ "flags1" / Hex(Int64ul),
+ "flags2" / Hex(Int64ul),
+ "swap_id" / Int32ul,
+ "surf_ids" / Int32ul[SWAP_SURFACES],
+ "src_rect" / Rect[SWAP_SURFACES],
+ "surf_flags" / Int32ul[SWAP_SURFACES],
+ "surf_unk" / Int32ul[SWAP_SURFACES],
+ "dst_rect" / Rect[SWAP_SURFACES],
+ "swap_enabled" / Hex(Int32ul),
+ "swap_completed" / Hex(Int32ul),
+ "unk_10c" / Hex(Default(Int32ul, 0)),
+ "unk_110" / UnkBytes(0x1b8),
+ "unk_2c8" / Hex(Default(Int32ul, 0)),
+ "unk_2cc" / UnkBytes(0x14),
+ "unk_2e0" / Hex(Default(Int32ul, 0)),
+ "unk_2e2" / UnkBytes(0x2),
+ "bl_unk" / Hex(Int64ul), # seen: 0x0, 0x1, 0x101, 0x1_0000, 0x101_010101
+ "bl_val" / Hex(Int32ul), # range 0x10000000 - approximately 0x7fe07fc0 for 4 - 510 nits
+ "bl_power" / Hex(Int8ul), # constant 0x40, 0x00: backlight off
+ "unk_2f3" / UnkBytes(0x2d),
+)
+
+assert IOMFBSwapRec.sizeof() == 0x320
+
+MAX_PLANES = 3
+
+ComponentTypes = Struct(
+ "count" / Int8ul,
+ "types" / SizedArray(7, "count", Int8ul),
+)
+
+#ComponentTypes = Bytes(8)
+
+PlaneInfo = Struct(
+ "width" / Int32ul,
+ "height" / Int32ul,
+ "base" / Hex(Int32ul),
+ "offset" / Hex(Int32ul),
+ "stride" / Hex(Int32ul),
+ "size" / Hex(Int32ul),
+ "tile_size" / Int16ul,
+ "tile_w" / Int8ul,
+ "tile_h" / Int8ul,
+ "unk1" / UnkBytes(0xd),
+ "unk2" / Hex(Int8ul),
+ "unk3" / UnkBytes(0x26),
+)
+
+assert PlaneInfo.sizeof() == 0x50
+
+IOSurface = Struct(
+ "is_tiled" / bool_,
+ "unk_1" / bool_,
+ "unk_2" / bool_,
+ "plane_cnt" / Int32ul,
+ "plane_cnt2" / Int32ul,
+ "format" / FourCC,
+ "unk_f" / Default(Hex(Int32ul), 0),
+ "xfer_func" / Int8ul,
+ "colorspace" / Int8ul,
+ "stride" / Int32ul,
+ "pix_size" / Int16ul,
+ "pel_w" / Int8ul,
+ "pel_h" / Int8ul,
+ "offset" / Default(Hex(Int32ul), 0),
+ "width" / Int32ul,
+ "height" / Int32ul,
+ "buf_size" / Hex(Int32ul),
+ "unk_2d" / Default(Int32ul, 0),
+ "unk_31" / Default(Int32ul, 0),
+ "surface_id" / Int32ul,
+ "comp_types" / Default(SizedArray(MAX_PLANES, "plane_cnt", ComponentTypes), []),
+ "has_comp" / Bool(Int64ul),
+ "planes" / Default(SizedArray(MAX_PLANES, "plane_cnt", PlaneInfo), []),
+ "has_planes" / Bool(Int64ul),
+ "compression_info" / Default(SizedArray(MAX_PLANES, "plane_cnt", UnkBytes(0x34)), []),
+ "has_compr_info" / Bool(Int64ul),
+ "unk_1f5" / Int32ul,
+ "unk_1f9" / Int32ul,
+ "padding" / UnkBytes(7),
+)
+
+assert IOSurface.sizeof() == 0x204
+
+IOMFBColorFixedMatrix = Array(5, Array(3, ulong))
+
+class PropID(IntEnum):
+ BrightnessCorrection = 14
+
+class UPPipeAP_H13P(IPCObject):
+ A000 = Call(bool_, "late_init_signal")
+ A029 = Call(void, "setup_video_limits")
+ A034 = Call(void, "update_notify_clients_dcp", Array(14, uint))
+ A035 = Call(bool_, "is_hilo")
+ A036 = Call(bool_, "apt_supported")
+ A037 = Call(uint, "get_dfb_info", InOutPtr(uint), InOutPtr(Array(4, ulong)), InOutPtr(uint))
+ A038 = Call(uint, "get_dfb_compression_info", InOutPtr(uint))
+
+ D000 = Callback(bool_, "did_boot_signal")
+ D001 = Callback(bool_, "did_power_on_signal")
+ D002 = Callback(void, "will_power_off_signal")
+ D003 = Callback(void, "rt_bandwidth_setup_ap", config=OutPtr(rt_bw_config_t))
+
+IdleCachingState = uint32_t
+
+class UnifiedPipeline2(IPCObject):
+ A352 = Call(bool_, "applyProperty", uint, uint)
+ A353 = Call(uint, "get_system_type")
+ A357 = Call(void, "set_create_DFB")
+ A358 = Call(IOMFBStatus, "vi_set_temperature_hint")
+
+ D100 = Callback(void, "match_pmu_service")
+ D101 = Callback(uint32_t, "UNK_get_some_field")
+ D102 = Callback(void, "set_number_property", key=string(0x40), value=uint)
+ D103 = Callback(void, "set_boolean_property", key=string(0x40), value=bool_)
+ D106 = Callback(void, "removeProperty", key=string(0x40))
+ D107 = Callback(bool_, "create_provider_service")
+ D108 = Callback(bool_, "create_product_service")
+ D109 = Callback(bool_, "create_PMU_service")
+ D110 = Callback(bool_, "create_iomfb_service")
+ D111 = Callback(bool_, "create_backlight_service")
+ D112 = Callback(void, "set_idle_caching_state_ap", IdleCachingState, uint)
+ D116 = Callback(bool_, "start_hardware_boot")
+ D117 = Callback(bool_, "is_dark_boot")
+ D118 = Callback(bool_, "is_waking_from_hibernate")
+ D120 = Callback(bool_, "read_edt_data", key=string(0x40), count=uint, value=InOut(Lazy(SizedArray(8, "count", uint32_t))))
+
+ D122 = Callback(bool_, "setDCPAVPropStart", length=uint)
+ D123 = Callback(bool_, "setDCPAVPropChunk", data=HexDump(SizedBytes(0x1000, "length")), offset=uint, length=uint)
+ D124 = Callback(bool_, "setDCPAVPropEnd", key=string(0x40))
+
+class UPPipe2(IPCObject):
+ A102 = Call(uint64_t, "test_control", cmd=uint64_t, arg=uint)
+ A103 = Call(void, "get_config_frame_size", width=InOutPtr(uint), height=InOutPtr(uint))
+ A104 = Call(void, "set_config_frame_size", width=uint, height=uint)
+ A105 = Call(void, "program_config_frame_size")
+ A130 = Call(bool_, "init_ca_pmu")
+ A131 = Call(bool_, "pmu_service_matched")
+ A132 = Call(bool_, "backlight_service_matched")
+
+ D201 = Callback(uint32_t, "map_buf", buf=InPtr(BufferDescriptor), vaddr=OutPtr(ulong), dva=OutPtr(ulong), unk=bool_)
+ D202 = Callback(void, "unmap_buf", buf=InPtr(BufferDescriptor), unk1=uint, unk2=ulong, unkB=uint)
+
+ D206 = Callback(bool_, "match_pmu_service_2")
+ D207 = Callback(bool_, "match_backlight_service")
+ D208 = Callback(uint64_t, "get_calendar_time_ms")
+ D211 = Callback(void, "update_backlight_factor_prop", int_)
+
+class PropRelay(IPCObject):
+ D300 = Callback(void, "pr_publish", prop_id=uint32_t, value=int_)
+
+class IOMobileFramebufferAP(IPCObject):
+ A401 = Call(uint32_t, "start_signal")
+
+ A407 = Call(uint32_t, "swap_start", swap_id=InOutPtr(uint), client=InOutPtr(IOUserClient))
+ A408 = Call(uint32_t, "swap_submit_dcp",
+ swap_rec=InPtr(IOMFBSwapRec),
+ surfaces=Array(4, InPtr(IOSurface)),
+ surfAddr=Array(4, Hex(ulong)),
+ unkBool=bool_,
+ unkFloat=Float64l,
+ unkInt=uint,
+ unkOutBool=OutPtr(bool_))
+
+ A410 = Call(uint32_t, "set_display_device", uint)
+ A411 = Call(bool_, "is_main_display")
+ A438 = Call(uint32_t, "swap_set_color_matrix", matrix=InOutPtr(IOMFBColorFixedMatrix), func=uint32_t, unk=uint)
+#"A438": "IOMobileFramebufferAP::swap_set_color_matrix(IOMFBColorFixedMatrix*, IOMFBColorMatrixFunction, unsigned int)",
+
+ A412 = Call(uint32_t, "set_digital_out_mode", uint, uint)
+ A413 = Call(uint32_t, "get_digital_out_state", InOutPtr(uint))
+ A414 = Call(uint32_t, "get_display_area", InOutPtr(ulong))
+ A419 = Call(uint32_t, "get_gamma_table", InOutPtr(Bytes(0xc0c)))
+ A422 = Call(uint32_t, "set_matrix", uint, InPtr(Array(3, Array(3, ulong))))
+ A423 = Call(uint32_t, "set_contrast", InOutPtr(Float32l))
+ A426 = Call(uint32_t, "get_color_remap_mode", InOutPtr(uint32_t))
+ A427 = Call(uint32_t, "setBrightnessCorrection", uint)
+
+ A435 = Call(uint32_t, "set_block_dcp", arg1=uint64_t, arg2=uint, arg3=uint, arg4=Array(8, ulong), arg5=uint, data=SizedBytes(0x1000, "length"), length=ulong)
+ A439 = Call(uint32_t, "set_parameter_dcp", param=IOMFBParameterName, value=Lazy(SizedArray(4, "count", ulong)), count=uint)
+
+ A440 = Call(uint, "display_width")
+ A441 = Call(uint, "display_height")
+ A442 = Call(void, "get_display_size", OutPtr(uint), OutPtr(uint))
+ A443 = Call(int_, "do_create_default_frame_buffer")
+ A444 = Call(void, "printRegs")
+ A447 = Call(int_, "enable_disable_video_power_savings", uint)
+ A454 = Call(void, "first_client_open")
+ A455 = Call(void, "last_client_close_dcp", OutPtr(uint))
+ A456 = Call(bool_, "writeDebugInfo", ulong)
+ A457 = Call(void, "flush_debug_flags", uint)
+ A458 = Call(bool_, "io_fence_notify", uint, uint, ulong, IOMFBStatus)
+ A460 = Call(bool_, "setDisplayRefreshProperties")
+ A463 = Call(void, "flush_supportsPower", bool_)
+ A464 = Call(uint, "abort_swaps_dcp", InOutPtr(IOMobileFramebufferUserClient))
+
+ A467 = Call(uint, "update_dfb", surf=InPtr(IOSurface))
+ A468 = Call(uint32_t, "setPowerState", ulong, bool_, OutPtr(uint))
+ A469 = Call(bool_, "isKeepOnScreen")
+
+ D552 = Callback(bool_, "setProperty_dict", key=string(0x40), value=InPtr(Padded(0x1000, OSDictionary())))
+ D561 = Callback(bool_, "setProperty_dict", key=string(0x40), value=InPtr(Padded(0x1000, OSDictionary())))
+ D563 = Callback(bool_, "setProperty_int", key=string(0x40), value=InPtr(uint64_t))
+ D565 = Callback(bool_, "setProperty_bool", key=string(0x40), value=InPtr(Bool(uint32_t)))
+ D567 = Callback(bool_, "setProperty_str", key=string(0x40), value=string(0x40))
+
+ D574 = Callback(IOMFBStatus, "powerUpDART", bool_)
+
+ D575 = Callback(bool_, "get_dot_pitch", OutPtr(uint))
+ D576 = Callback(void, "hotPlug_notify_gated", ulong)
+ D577 = Callback(void, "powerstate_notify", bool_, bool_)
+ D578 = Callback(bool_, "idle_fence_create", IdleCachingState)
+ D579 = Callback(void, "idle_fence_complete")
+
+ D581 = Callback(void, "swap_complete_head_of_line", uint, bool_, uint, bool_)
+ D582 = Callback(bool_, "create_default_fb_surface", uint, uint)
+ D583 = Callback(bool_, "serializeDebugInfoCb", ulong, InPtr(uint64_t), uint)
+ D584 = Callback(void, "clear_default_surface")
+
+ D588 = Callback(void, "resize_default_fb_surface_gated")
+ D589 = Callback(void, "swap_complete_ap_gated", swap_id=uint, unkBool=bool_, swap_data=InPtr(SwapCompleteData), swap_info=SwapInfoBlob, unkUint=uint)
+
+ D591 = Callback(void, "swap_complete_intent_gated", swap_id=uint, unkB=bool_, unkInt=uint32_t, width=uint, height=uint)
+ D593 = Callback(void, "enable_backlight_message_ap_gated", bool_)
+ D594 = Callback(void, "setSystemConsoleMode", bool_)
+
+ D596 = Callback(bool_, "isDFBAllocated")
+ D597 = Callback(bool_, "preserveContents")
+ D598 = Callback(void, "find_swap_function_gated")
+
+class ServiceRelay(IPCObject):
+ D400 = Callback(void, "get_property", obj=FourCC, key=string(0x40), value=OutPtr(Bytes(0x200)), lenght=InOutPtr(uint))
+ D401 = Callback(bool_, "sr_get_uint_prop", obj=FourCC, key=string(0x40), value=InOutPtr(ulong))
+ D404 = Callback(void, "sr_set_uint_prop", obj=FourCC, key=string(0x40), value=uint)
+ D406 = Callback(void, "set_fx_prop", obj=FourCC, key=string(0x40), value=uint)
+ D408 = Callback(uint64_t, "sr_getClockFrequency", obj=FourCC, arg=uint)
+ D411 = Callback(IOMFBStatus, "sr_mapDeviceMemoryWithIndex", obj=FourCC, index=uint, flags=uint, addr=OutPtr(ulong), length=OutPtr(ulong))
+ D413 = Callback(bool_, "sr_setProperty_dict", obj=FourCC, key=string(0x40), value=InPtr(Padded(0x1000, OSDictionary())))
+ D414 = Callback(bool_, "sr_setProperty_int", obj=FourCC, key=string(0x40), value=InPtr(uint64_t))
+ D415 = Callback(bool_, "sr_setProperty_bool", obj=FourCC, key=string(0x40), value=InPtr(Bool(uint32_t)))
+
+mem_desc_id = uint
+
+class MemDescRelay(IPCObject):
+ D451 = Callback(mem_desc_id, "allocate_buffer", uint, ulong, uint, OutPtr(ulong), OutPtr(ulong), OutPtr(ulong))
+ D452 = Callback(mem_desc_id, "map_physical", paddr=ulong, size=ulong, flags=uint, dva=OutPtr(ulong), dvasize=OutPtr(ulong))
+ D453 = Callback(mem_desc_id, "withAddressRange", ulong, ulong, uint, uint64_t, OutPtr(uint), OutPtr(ulong))
+ D454 = Callback(IOMFBStatus, "prepare", uint, uint)
+ D455 = Callback(IOMFBStatus, "complete", uint, uint)
+ D456 = Callback(bool_, "release_descriptor", uint)
+
+ALL_CLASSES = [
+ UPPipeAP_H13P,
+ UnifiedPipeline2,
+ IOMobileFramebufferAP,
+ ServiceRelay,
+ PropRelay,
+ UPPipe2,
+ MemDescRelay,
+]
+
+ALL_METHODS = {}
+
+for cls in ALL_CLASSES:
+ ALL_METHODS.update(cls.methods())
+
+SHORT_CHANNELS = {
+ "CB": "d",
+ "CMD": "C",
+ "ASYNC": "a",
+ "OOBCMD": "O",
+ "OOBCB": "o",
+}
+
+RDIR = { ">": "<", "<": ">" }
+
+class Call:
+ def __init__(self, dir, chan, off, msg, in_size, out_size, in_data=b''):
+ self.dir = dir
+ self.chan = chan
+ self.msg = msg
+ self.off = off
+ self.in_size = in_size
+ self.out_size = out_size
+ self.in_data = in_data
+ self.out_data = None
+ self.complete = False
+ self.ret = None
+
+ def ack(self, out_data):
+ self.out_data = out_data
+ self.complete = True
+
+ def print_req(self, indent=""):
+ log = f"{indent}{self.dir}{SHORT_CHANNELS[self.chan]}[{self.off:#x}] {self.msg} "
+
+ cls, method = ALL_METHODS.get(self.msg, (None, None))
+ if cls is None:
+ print(log + f"{self.in_size:#x}/{self.out_size:#x}")
+ return
+
+ log += f"{cls.__name__}::{method.name}("
+ in_size = method.in_struct.sizeof()
+
+ if in_size != len(self.in_data):
+ print(f"{log} !! Expected {in_size:#x} bytes, got {len(self.in_data):#x} bytes (in)")
+ dump_fields(method.in_fields)
+ chexdump(self.in_data)
+ self.in_vals = {}
+ return
+
+ self.in_vals = method.parse_input(self.in_data)
+
+ log += f"{method.fmt_args(self.in_vals)})"
+
+ print(log)
+
+ method.print_long_args(indent, self.in_vals)
+ #if method.in_fields:
+ #print(self.in_vals)
+
+ def print_reply(self, indent=""):
+ assert self.complete
+ log = f"{indent}{RDIR[self.dir]}{SHORT_CHANNELS[self.chan]}[{self.off:#x}] {self.msg} "
+
+ cls, method = ALL_METHODS.get(self.msg, (None, None))
+ if cls is None:
+ print(log + f"{self.in_size:#x}/{self.out_size:#x}")
+ return
+
+ log += f"{cls.__name__}::{method.name}("
+ out_size = method.out_struct.sizeof()
+
+ if out_size != len(self.out_data):
+ print(f"{log} !! Expected {out_size:#x} bytes, got {len(self.out_data):#x} bytes (out)")
+ dump_fields(method.out_fields)
+ chexdump(self.out_data)
+ return
+
+ self.out_vals = method.parse_output(self.out_data, self.in_vals)
+
+ log += f"{method.fmt_args(self.in_vals, self.out_vals)})"
+
+ if "ret" in self.out_vals:
+ self.ret = self.out_vals.ret
+ del self.out_vals["ret"]
+ log += f" = {self.ret!r}"
+
+ print(log)
+
+ method.print_long_args(indent, self.in_vals, self.out_vals)
+ #if len(method.out_fields) - (self.ret is not None):
+ #print(self.out_vals)
diff --git a/tools/proxyclient/m1n1/fw/dcp/manager.py b/tools/proxyclient/m1n1/fw/dcp/manager.py
new file mode 100644
index 0000000..7977c3a
--- /dev/null
+++ b/tools/proxyclient/m1n1/fw/dcp/manager.py
@@ -0,0 +1,319 @@
+# SPDX-License-Identifier: MIT
+import pprint
+import struct, functools, time
+from dataclasses import dataclass
+from enum import IntEnum
+
+from construct.lib import hexundump
+
+from ..asc.base import *
+from ...utils import *
+
+from . import ipc
+from .dcpep import CallContext
+
+## DCP API manager
+
+class DCPBaseManager:
+ def __init__(self, dcpep):
+ self.dcpep = dcpep
+ self.dcp = dcpep.asc
+ dcpep.mgr = self
+
+ self.name_map = {}
+ self.tag_map = {}
+
+ self.in_callback = 0
+
+ for k, (cls, v) in ipc.ALL_METHODS.items():
+ self.name_map[v.name] = k, v
+ self.tag_map[k] = v
+
+ def handle_cb(self, state):
+ method = self.tag_map.get(state.tag, None)
+ if method is None:
+ raise Exception(f"Unknown callback {state.tag}")
+
+ func = getattr(self, method.name, None)
+
+ if func is None:
+ raise Exception(f"Unimplemented callback {method!s} [{state.tag}]")
+
+ self.in_callback += 1
+ try:
+ retval = method.callback(func, state.in_data)
+ except Exception as e:
+ print(f"Exception in callback {method.name}")
+ raise
+ self.in_callback -= 1
+ return retval
+
+ def __getattr__(self, attr):
+ tag, method = self.name_map.get(attr, (None, None))
+ if method is None or tag.startswith("D"):
+ raise AttributeError(f"Unknown method {attr}")
+
+ out_len = method.out_struct.sizeof()
+ if self.in_callback:
+ ctx = CallContext.CB
+ else:
+ ctx = CallContext.CMD
+ rpc = functools.partial(self.dcpep.ch_cmd.call, ctx, tag, out_len=out_len)
+ return functools.partial(method.call, rpc)
+
+class DCPManager(DCPBaseManager):
+ def __init__(self, dcpep, compatible='t8103'):
+ super().__init__(dcpep)
+
+ self.iomfb_prop = {}
+ self.dcpav_prop = {}
+ self.service_prop = {}
+ self.pr_prop = {}
+
+ self.swaps = 0
+ self.frame = 0
+
+ self.mapid = 0
+ self.bufs = {}
+
+ self.compatible = compatible
+
+ ## IOMobileFramebufferAP methods
+
+ def find_swap_function_gated(self):
+ pass
+
+ def create_provider_service(self):
+ return True
+
+ def create_product_service(self):
+ return True
+
+ def create_PMU_service(self):
+ return True
+
+ def create_iomfb_service(self):
+ return True
+
+ def create_backlight_service(self):
+ return False
+
+ def setProperty(self, key, value):
+ self.iomfb_prop[key] = value
+ print(f"setProperty({key} = {value!r})")
+ return True
+
+ setProperty_dict = setProperty_int = setProperty_bool = setProperty_str = setProperty
+
+ def swap_complete_ap_gated(self, swap_id, unkBool, swap_data, swap_info, unkUint):
+ swap_data_ptr = "NULL" if swap_data is None else "..."
+ print(f"swap_complete_ap_gated({swap_id}, {unkBool}, {swap_data_ptr}, ..., {unkUint}")
+ if swap_data is not None:
+ chexdump(swap_data)
+ chexdump(swap_info)
+ self.swaps += 1
+ self.frame = swap_id
+
+ def swap_complete_intent_gated(self, swap_id, unkB, unkInt, width, height):
+ print(f"swap_complete_intent_gated({swap_id}, {unkB}, {unkInt}, {width}, {height}")
+ self.swaps += 1
+ self.frame = swap_id
+
+ def enable_backlight_message_ap_gated(self, unkB):
+ print(f"enable_backlight_message_ap_gated({unkB})")
+
+ # wrapper for set_digital_out_mode to print information on the setted modes
+ def SetDigitalOutMode(self, color_id, timing_id):
+ color_mode = [x for x in self.dcpav_prop['ColorElements'] if x['ID'] == color_id][0]
+ timing_mode = [x for x in self.dcpav_prop['TimingElements'] if x['ID'] == timing_id][0]
+ pprint.pprint(color_mode)
+ pprint.pprint(timing_mode)
+ self.set_digital_out_mode(color_id, timing_id)
+
+ ## UPPipeAP_H13P methods
+
+ def did_boot_signal(self):
+ return True
+
+ def did_power_on_signal(self):
+ return True
+
+ def will_power_off_signal(self):
+ return
+
+ def rt_bandwidth_setup_ap(self, config):
+ print("rt_bandwidth_setup_ap(...)")
+ if self.compatible == 't8103':
+ config.val = {
+ "reg1": 0x23b738014, # reg[5] in disp0/dispext0, plus 0x14 - part of pmgr
+ "reg2": 0x23bc3c000, # reg[6] in disp0/dispext0 - part of pmp/pmgr
+ "bit": 2,
+ }
+ elif self.compatible == 't600x':
+ config.val = {
+ "reg1": 0x28e3d0000 + 0x988, # reg[4] in disp0/dispext0, plus 0x988
+ "reg2": 0x0,
+ "bit": 0,
+ }
+ else:
+ raise ValueError(self.compatible)
+
+ ## UnifiedPipeline2 methods
+
+ def match_pmu_service(self):
+ pass
+
+ def set_number_property(self, key, value):
+ pass
+
+ def create_provider_service(self):
+ return True
+
+ def is_dark_boot(self):
+ return False
+
+ def read_edt_data(self, key, count, value):
+ return False
+
+ def UNK_get_some_field(self):
+ return 0
+
+ def start_hardware_boot(self):
+ self.set_create_DFB()
+ self.do_create_default_frame_buffer()
+ self.setup_video_limits()
+ self.flush_supportsPower(True)
+ self.late_init_signal()
+ self.setDisplayRefreshProperties()
+ return True
+
+ def setDCPAVPropStart(self, length):
+ print(f"setDCPAVPropStart({length:#x})")
+ self.dcpav_prop_len = length - 1 # off by one?
+ self.dcpav_prop_off = 0
+ self.dcpav_prop_data = []
+ return True
+
+ def setDCPAVPropChunk(self, data, offset, length):
+ print(f"setDCPAVPropChunk(..., {offset:#x}, {length:#x})")
+ assert offset == self.dcpav_prop_off
+ self.dcpav_prop_data.append(data)
+ self.dcpav_prop_off += len(data)
+ return True
+
+ def setDCPAVPropEnd(self, key):
+ print(f"setDCPAVPropEnd({key!r})")
+ blob = b"".join(self.dcpav_prop_data)
+ assert self.dcpav_prop_len == len(blob)
+ self.dcpav_prop[key] = ipc.OSSerialize().parse(blob)
+ self.dcpav_prop_data = self.dcpav_prop_len = self.dcpav_prop_off = None
+ #pprint.pprint(self.dcpav_prop[key])
+ return True
+
+ def set_boolean_property(self, key, value):
+ print(f"set {key!r} = {value}")
+
+ def removeProperty(self, key):
+ print(f"removeProperty({key!r})")
+
+ def powerstate_notify(self, unk1, unk2):
+ print(f"powerstate_notify({unk1}, {unk2})")
+
+ def create_default_fb_surface(self, width, height):
+ print(f"create_default_fb_surface({width}x{height})")
+ return True
+
+ def powerUpDART(self, unk):
+ print(f"powerUpDART({unk})")
+ return 0
+
+ def hotPlug_notify_gated(self, unk):
+ print(f"hotPlug_notify_gated({unk})")
+
+ def is_waking_from_hibernate(self):
+ return False
+
+ ## UPPipe2 methods
+
+ def match_pmu_service_2(self):
+ return True
+
+ def match_backlight_service(self):
+ return True
+
+ def get_calendar_time_ms(self):
+ return time.time_ns() // 1000_000
+
+ def update_backlight_factor_prop(self, value):
+ pass
+
+ def map_buf(self, buf, vaddr, dva, unk):
+ print(f"map buf {buf}, {unk}")
+ paddr, dcpdva, dvasize = self.bufs[buf]
+ vaddr.val = 0
+ dva.val = self.dcp.disp_dart.iomap(4, paddr, dvasize)
+ print(f"mapped to dva {dva}")
+ return 0
+
+ def update_backlight_factor_prop(self, unk):
+ print(f"update_backlight_factor_prop {unk}")
+
+ ## ServiceRelay methods
+
+ def sr_setProperty(self, obj, key, value):
+ self.service_prop.setdefault(obj, {})[key] = value
+ print(f"sr_setProperty({obj}/{key} = {value!r})")
+ return True
+
+ def sr_getClockFrequency(self, obj, arg):
+ print(f"sr_getClockFrequency({obj}, {arg})")
+ return 533333328
+
+ sr_setProperty_dict = sr_setProperty_int = sr_setProperty_bool = sr_setProperty_str = sr_setProperty
+
+ def sr_get_uint_prop(self, obj, key, value):
+ value.val = 0
+ return False
+
+ def sr_set_uint_prop(self, obj, key, value):
+ print(f"sr_set_uint_prop({obj}, {key} = {value})")
+
+ def set_fx_prop(self, obj, key, value):
+ print(f"set_fx_prop({obj}, {key} = {value})")
+
+ def sr_mapDeviceMemoryWithIndex(self, obj, index, flags, addr, length):
+ assert obj == "PROV"
+ addr.val, length.val = self.dcp.u.adt["/arm-io/disp0"].get_reg(index)
+ print(f"sr_mapDeviceMemoryWithIndex({obj}, {index}, {flags}, {addr.val:#x}, {length.val:#x})")
+ return 0
+
+ ## PropRelay methods
+
+ def pr_publish(self, prop_id, value):
+ self.pr_prop[prop_id] = value
+ print(f"pr_publish({prop_id}, {value!r})")
+
+ ## MemDescRelay methods:
+
+ def allocate_buffer(self, unk0, size, unk1, paddr, dva, dvasize):
+ print(f"allocate_buffer({unk0}, {size}, {unk1})")
+
+ dvasize.val = align_up(size, 4096)
+ paddr.val = self.dcp.u.memalign(0x4000, size)
+ dva.val = self.dcp.dart.iomap(0, paddr.val, size)
+
+ self.mapid += 1
+ print(f"Allocating {self.mapid} as {hex(paddr.val)} / {hex(dva.val)}")
+
+ self.bufs[self.mapid] = (paddr.val, dva.val, dvasize.val)
+
+ return self.mapid
+
+ def map_physical(self, paddr, size, flags, dva, dvasize):
+ dvasize.val = align_up(size, 4096)
+ dva.val = self.dcp.dart.iomap(0, paddr, size)
+ print(f"map_physical({paddr:#x}, {size:#x}, {flags}, {dva.val:#x}, {dvasize.val:#x})")
+
+ self.mapid += 1
+ return self.mapid
+
diff --git a/tools/proxyclient/m1n1/fw/dcp/parse_log.py b/tools/proxyclient/m1n1/fw/dcp/parse_log.py
new file mode 100644
index 0000000..e57f637
--- /dev/null
+++ b/tools/proxyclient/m1n1/fw/dcp/parse_log.py
@@ -0,0 +1,43 @@
+# SPDX-License-Identifier: MIT
+
+from m1n1.utils import *
+from m1n1.fw.dcp.ipc import *
+
+def parse_log(fd):
+ op_stack = {}
+ for line in fd:
+ optype, args = line.split(" ", 1)
+ if optype == "CALL":
+ d, msg, chan, off, msg, in_size, out_size, in_data = args.split(" ")
+ op = Call(d, chan, int(off, 0), msg, int(in_size, 0), int(out_size, 0),
+ bytes.fromhex(in_data))
+ op_stack.setdefault(chan, []).append(op)
+ elif optype == "ACK":
+ d, msg, chan, off, out_data = args.split(" ")
+ op = op_stack[chan].pop()
+ assert int(off, 0) == op.off
+ op.ack(bytes.fromhex(out_data))
+ else:
+ raise Exception(f"Unknown log cmd {optype}")
+
+ yield op
+
+def dump_log(fd):
+ nesting = {
+ "": 0,
+ "OOB": 0,
+ }
+ for op in parse_log(fd):
+ ctx = ""
+ if "OOB" in op.chan:
+ ctx = "[OOB] -----------> "
+ if not op.complete:
+ op.print_req(indent=ctx + " " * nesting.setdefault(ctx, 0))
+ nesting[ctx] += 1
+ else:
+ nesting[ctx] -= 1
+ op.print_reply(indent=ctx + " " * nesting.setdefault(ctx, 0))
+
+if __name__ == "__main__":
+ import sys
+ dump_log(open(sys.argv[1]))