# SPDX-License-Identifier: MIT
from ...utils import *
# System endpoints
def msg_handler(message, regtype=None):
    """Build a decorator that marks a callable as a message handler.

    The returned decorator sets three attributes on the callable it is
    applied to and then returns that same callable unchanged:

    * ``is_message`` -- always ``True``
    * ``message``    -- the ``message`` value given here
    * ``regtype``    -- the ``regtype`` value given here (``None`` by default)

    ASCBaseEndpoint.__init__ looks for these attributes when it builds its
    dispatch tables.
    """
    tags = {"is_message": True, "message": message, "regtype": regtype}

    def decorate(func):
        # Attach the tags in a fixed order; the callable itself is not wrapped.
        for attr, value in tags.items():
            setattr(func, attr, value)
        return func

    return decorate
# Register64 subclass that ASCBaseEndpoint.send() builds with EP=<endpoint
# number> and passes as the second argument to asc.send().
# Register64 comes from the project's utils module (star import above).
class ASCMessage1(Register64):
    # Field declared as the pair (7, 0) -- presumably a bit range (bits 7..0);
    # confirm against Register64's field conventions.
    EP = 7, 0
class ASCTimeout(Exception):
    """Exception type for ASC timeouts.

    Adds nothing to ``Exception``; it is not raised anywhere in this section,
    so presumably callers elsewhere raise it when an ASC operation times out.
    """
class ASCBaseEndpoint:
    """Base class for one endpoint (``epnum``) attached to an ASC object.

    On construction, every callable attribute carrying a truthy
    ``is_message`` attribute (as set by ``msg_handler``) is collected into two
    tables keyed by its ``message`` value:

    * ``msghandler`` -- message type -> handler callable
    * ``msgtypes``   -- message type -> register class used to decode the
      message before it is handed to the handler
    """

    # Register class used to decode msg0 in handle_msg(), and the fallback
    # decode class for handlers that did not specify a regtype.
    BASE_MESSAGE = Register64
    # Optional short name; used as self.name when no explicit name is given.
    SHORT = None

    def __init__(self, asc, epnum, name=None):
        """Store the owning ``asc`` and endpoint number, then index handlers.

        ``name`` falls back to ``SHORT``, then to ``"<ClassName>@<epnum hex>"``.
        """
        self.asc = asc
        self.epnum = epnum

        label = name or self.SHORT
        if not label:
            label = f"{type(self).__name__}@{epnum:#x}"
        self.name = label

        self.msghandler = {}
        self.msgtypes = {}
        for attr in dir(self):
            member = getattr(self, attr)
            # Only callables tagged as message handlers are registered.
            if not (callable(member) and getattr(member, "is_message", False)):
                continue
            self.msghandler[member.message] = member
            self.msgtypes[member.message] = member.regtype or self.BASE_MESSAGE

    def handle_msg(self, msg0, msg1):
        """Dispatch an incoming message to the handler registered for its type.

        ``msg0`` is decoded with ``BASE_MESSAGE`` to read its ``TYPE`` field.
        Returns ``False`` when no handler is registered for that type;
        otherwise returns whatever the handler returns. The handler receives
        ``msg0`` re-decoded with the register class registered for that type.
        ``msg1`` is accepted but not used here.
        """
        decoded = self.BASE_MESSAGE(msg0)
        msgtype = decoded.TYPE

        handler = self.msghandler.get(msgtype)
        if handler is None:
            return False

        regcls = self.msgtypes.get(msgtype, self.BASE_MESSAGE)
        return handler(regcls(decoded.value))

    def send(self, msg):
        """Send ``msg`` through the owning ASC, tagged with this endpoint number."""
        routing = ASCMessage1(EP=self.epnum)
        self.asc.send(msg, routing)

    def start(self):
        """Do nothing; subclasses may override."""

    def stop(self):
        """Do nothing; subclasses may override."""

    def log(self, msg):
        """Print ``msg`` prefixed with this endpoint's name in brackets."""
        print(f"[{self.name}] {msg}")
|