summaryrefslogtreecommitdiff
path: root/tools/proxyclient/hv/trace_aop.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/proxyclient/hv/trace_aop.py')
-rw-r--r--tools/proxyclient/hv/trace_aop.py349
1 files changed, 349 insertions, 0 deletions
diff --git a/tools/proxyclient/hv/trace_aop.py b/tools/proxyclient/hv/trace_aop.py
new file mode 100644
index 0000000..489c9b3
--- /dev/null
+++ b/tools/proxyclient/hv/trace_aop.py
@@ -0,0 +1,349 @@
+# SPDX-License-Identifier: MIT
+
+from m1n1.trace import Tracer
+from m1n1.trace.dart import DARTTracer
+from m1n1.trace.asc import ASCTracer, EP, EPState, msg, msg_log, DIR, EPContainer
+from m1n1.utils import *
+from m1n1.constructutils import *
+from m1n1.fw.afk.rbep import *
+from m1n1.fw.afk.epic import *
+from m1n1.fw.aop import *
+from m1n1.fw.aop.ipc import *
+
+import sys
+
+class AFKRingBufSniffer(AFKRingBuf):
+ def __init__(self, ep, state, base, size):
+ super().__init__(ep, base, size)
+ self.state = state
+ self.rptr = getattr(state, "rptr", 0)
+
+ def update_rptr(self, rptr):
+ self.state.rptr = rptr
+
+ def update_wptr(self):
+ raise NotImplementedError()
+
+ def get_wptr(self):
+ return struct.unpack("<I", self.read_buf(2 * self.BLOCK_SIZE, 4))[0]
+
+ def read_buf(self, off, size):
+ return self.ep.dart.ioread(0, self.base + off, size)
+
+class AFKEp(EP):
+ BASE_MESSAGE = AFKEPMessage
+
+ def __init__(self, tracer, epid):
+ super().__init__(tracer, epid)
+ self.txbuf = None
+ self.rxbuf = None
+ self.state.txbuf = EPState()
+ self.state.rxbuf = EPState()
+ self.state.shmem_iova = None
+ self.state.txbuf_info = None
+ self.state.rxbuf_info = None
+ self.state.verbose = 1
+
+ def start(self):
+ self.create_bufs()
+
+ def create_bufs(self):
+ if not self.state.shmem_iova:
+ return
+ if not self.txbuf and self.state.txbuf_info:
+ off, size = self.state.txbuf_info
+ self.txbuf = AFKRingBufSniffer(self, self.state.txbuf,
+ self.state.shmem_iova + off, size)
+ if not self.rxbuf and self.state.rxbuf_info:
+ off, size = self.state.rxbuf_info
+ self.rxbuf = AFKRingBufSniffer(self, self.state.rxbuf,
+ self.state.shmem_iova + off, size)
+
+ Init = msg_log(0x80, DIR.TX)
+ Init_Ack = msg_log(0xa0, DIR.RX)
+
+ GetBuf = msg_log(0x89, DIR.RX)
+
+ Shutdown = msg_log(0xc0, DIR.TX)
+ Shutdown_Ack = msg_log(0xc1, DIR.RX)
+
+ @msg(0xa1, DIR.TX, AFKEP_GetBuf_Ack)
+ def GetBuf_Ack(self, msg):
+ self.state.shmem_iova = msg.DVA
+ self.txbuf = None
+ self.rxbuf = None
+ self.state.txbuf = EPState()
+ self.state.rxbuf = EPState()
+ self.state.txbuf_info = None
+ self.state.rxbuf_info = None
+
+ @msg(0xa2, DIR.TX, AFKEP_Send)
+ def Send(self, msg):
+ for data in self.txbuf.read():
+ #if self.state.verbose >= 3:
+ if True:
+ self.log(f"===TX DATA=== epid={self.epid} rptr={self.txbuf.state.rptr:#x}")
+ chexdump(data)
+ self.log(f"===END DATA===")
+ self.log("Backtrace on TX data:")
+ self.hv.bt()
+ self.handle_ipc(data, dir=">")
+ return True
+
+ Hello = msg_log(0xa3, DIR.TX)
+
+ @msg(0x85, DIR.RX, AFKEPMessage)
+ def Recv(self, msg):
+ for data in self.rxbuf.read():
+ #if self.state.verbose >= 3:
+ if True:
+ self.log(f"===RX DATA=== epid={self.epid} rptr={self.rxbuf.state.rptr:#x}")
+ chexdump(data)
+ self.log(f"===END DATA===")
+ self.handle_ipc(data, dir="<")
+ return True
+
+ def handle_ipc(self, data, dir=None):
+ pass
+
+ @msg(0x8a, DIR.RX, AFKEP_InitRB)
+ def InitTX(self, msg):
+ off = msg.OFFSET * AFKRingBuf.BLOCK_SIZE
+ size = msg.SIZE * AFKRingBuf.BLOCK_SIZE
+ self.state.txbuf_info = (off, size)
+ self.create_bufs()
+
+ @msg(0x8b, DIR.RX, AFKEP_InitRB)
+ def InitRX(self, msg):
+ off = msg.OFFSET * AFKRingBuf.BLOCK_SIZE
+ size = msg.SIZE * AFKRingBuf.BLOCK_SIZE
+ self.state.rxbuf_info = (off, size)
+ self.create_bufs()
+
+class DummyAFKEp(AFKEp):
+ def handle_ipc(self, data, dir=None):
+ pass
+
+class EPICEp(AFKEp):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.pending_call = None
+ self.pending_cmd = None
+
+ def handle_hello(self, hdr, sub, fd):
+ if sub.type != 0xc0:
+ return False
+
+ payload = fd.read()
+ name = payload.split(b"\0")[0].decode("ascii")
+ self.log(f"Hello! (endpoint {name})")
+ return True
+
+ def handle_notify(self, hdr, sub, fd):
+ for calltype in CALLTYPES:
+ if calltype.matches(hdr, sub):
+ call = calltype.from_stream(fd)
+ self.trace_call_early(call)
+ self.pending_call = call
+ return True
+
+ return False
+
+ def handle_reply(self, hdr, sub, fd):
+ if self.pending_call is None:
+ return False
+
+ call = self.pending_call
+ call.read_resp(fd)
+ self.trace_call(call)
+ self.pending_call = None
+ return True
+
+ def dispatch_ipc(self, dir, hdr, sub, fd):
+ if sub.category == EPICCategory.COMMAND:
+ return self.handle_notify(hdr, sub, fd)
+ if dir == "<" and sub.category == EPICCategory.REPORT:
+ return self.handle_hello(hdr, sub, fd)
+ if dir == ">" and sub.category == EPICCategory.NOTIFY:
+ return self.handle_notify(hdr, sub, fd)
+ if dir == "<" and sub.category == EPICCategory.REPLY:
+ return self.handle_reply(hdr, sub, fd)
+
+ def handle_ipc(self, data, dir=None):
+ fd = BytesIO(data)
+ hdr = EPICHeader.parse_stream(fd)
+ sub = EPICSubHeaderVer2.parse_stream(fd)
+
+ if not getattr(self, 'VERBOSE', False):
+ return
+
+ self.log(f"{dir} 0x{hdr.channel:x} Type {hdr.type} Ver {hdr.version} Tag {hdr.seq}")
+ self.log(f" Len {sub.length} Ver {sub.version} Cat {sub.category} Type {sub.type:#x} Ts {sub.timestamp:#x}")
+ self.log(f" Unk1 {sub.unk1:#x} Unk2 {sub.unk2:#x}")
+
+ if self.dispatch_ipc(dir, hdr, sub, fd):
+ return
+
+ def trace_call_early(self, call):
+ # called at TX time
+ if isinstance(call, IndirectCall):
+ call.read_txbuf(self)
+
+ def trace_call(self, call):
+ if isinstance(call, IndirectCall):
+ call.read_rxbuf(self)
+ call = call.unwrap()
+ call.dump(self.log)
+
+class SPUAppEp(EPICEp):
+ SHORT = "SPUApp"
+
+class AccelEp(EPICEp):
+ SHORT = "accel"
+
+class GyroEp(EPICEp):
+ SHORT = "gyro"
+
+class LASEp(EPICEp):
+ SHORT = "las"
+
+class WakeHintEp(EPICEp):
+ SHORT = "wakehint"
+
+class UNK26Ep(EPICEp):
+ SHORT = "unk26"
+
+class AudioEp(EPICEp):
+ SHORT = "aop-audio"
+ VERBOSE = True
+
+class VoiceTriggerEp(EPICEp):
+ SHORT = "aop-voicetrigger"
+ VERBOSE = True
+
+
+class AOPTracer(ASCTracer, AOPBase):
+ ENDPOINTS = {
+ 0x20: SPUAppEp,
+ 0x21: AccelEp,
+ 0x22: GyroEp,
+ 0x24: LASEp,
+ 0x25: WakeHintEp,
+ 0x26: UNK26Ep,
+ 0x27: AudioEp,
+ 0x28: VoiceTriggerEp,
+ }
+
+ def __init__(self, hv, devpath, verbose=False):
+ self.default_bootargs = None
+ super().__init__(hv, devpath, verbose)
+ self.u = hv.u
+ AOPBase.__init__(self, hv.u, self.dev)
+
+ def start(self, *args):
+ self.default_bootargs = self.read_bootargs()
+ super().start(*args)
+
+ def w_CPU_CONTROL(self, val):
+ if val.RUN:
+ self.bootargs = self.read_bootargs()
+ self.log("Bootargs patched by AP:")
+ self.default_bootargs.dump_diff(self.bootargs, self.log)
+ self.log("(End of list)")
+ super().w_CPU_CONTROL(val)
+
+ @classmethod
+ def replay(cls, f, passthru=False):
+ epmap = dict()
+ epcont = EPContainer()
+
+ class FakeASCTracer:
+ def __init__(self):
+ self.hv = None
+
+ def log(self, str):
+ print(str)
+ asc_tracer = FakeASCTracer()
+
+ for cls in cls.mro():
+ eps = getattr(cls, "ENDPOINTS", None)
+ if eps is None:
+ break
+ for k, v in eps.items():
+ if k in epmap:
+ continue
+ ep = v(asc_tracer, k)
+ epmap[k] = ep
+ if getattr(epcont, ep.name, None):
+ ep.name = f"{ep.name}{k:02x}"
+ setattr(epcont, ep.name, ep)
+ ep.start()
+
+ def readdump(firstline, hdr, f):
+ l = firstline
+ assert hdr in l
+ postscribe = l[l.index(hdr) + len(hdr):]
+ annotation = dict([s.split("=") for s \
+ in postscribe.strip().split(" ")])
+
+ dump = []
+ for l in f:
+ if "===END DATA===" in l:
+ break
+ dump.append(l)
+ return chexundump("".join(dump)), annotation
+
+ def read_txbuf(icall, ep):
+ hdr = "===COMMAND TX DATA==="
+ for l in f:
+ if hdr in l:
+ break
+ data, annot = readdump(l, hdr, f)
+ assert int(annot["addr"], 16) == icall.args.txbuf
+ icall.txbuf = data
+ def read_rxbuf(icall, ep):
+ hdr = "===COMMAND RX DATA==="
+ for l in f:
+ if hdr in l:
+ break
+ data, annot = readdump(l, hdr, f)
+ assert int(annot["addr"], 16) == icall.rets.rxbuf
+ icall.rxbuf = data
+ IndirectCall.read_rxbuf = read_rxbuf
+ IndirectCall.read_txbuf = read_txbuf
+
+ for l in f:
+ if (rxhdr := "===RX DATA===") in l:
+ dir = "<"
+ hdr = rxhdr
+ elif (txhdr := "===TX DATA===") in l:
+ dir = ">"
+ hdr = txhdr
+ else:
+ if passthru:
+ print(l, end="")
+ continue
+ data, annot = readdump(l, hdr, f)
+ epid = int(annot["epid"])
+ epmap[epid].handle_ipc(data, dir)
+
+
+if __name__ == "__main__":
+ # We can replay traces by saving the textual output of live tracing
+ # and then passing it to this script.
+ with open(sys.argv[1]) as f:
+ AOPTracer.replay(f)
+ sys.exit(0)
+
+dart_aop_tracer = DARTTracer(hv, "/arm-io/dart-aop", verbose=4)
+dart_aop_tracer.start()
+
+dart_aop_base = u.adt["/arm-io/dart-aop"].get_reg(0)[0]
+
+#hv.trace_range(irange(*u.adt["/arm-io/dart-aop"].get_reg(1)))
+#hv.trace_range(irange(*u.adt["/arm-io/aop"].get_reg(1)))
+#hv.trace_range(irange(*u.adt["/arm-io/aop"].get_reg(3)))
+#hv.trace_range(irange(*u.adt["/arm-io/admac-aop-audio"].get_reg(0)))
+
+aop_tracer = AOPTracer(hv, "/arm-io/aop", verbose=1)
+aop_tracer.start(dart_aop_tracer.dart)