summaryrefslogtreecommitdiff
path: root/tools/proxyclient/m1n1/fw/dcp/manager.py
blob: 7977c3a3b924a0fe4a97b8233c3f63376c481ab5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
# SPDX-License-Identifier: MIT
import pprint
import struct, functools, time
from dataclasses import dataclass
from enum import IntEnum

from construct.lib import hexundump

from ..asc.base import *
from ...utils import *

from . import ipc
from .dcpep import CallContext

## DCP API manager

class DCPBaseManager:
    """Route DCP RPC traffic between the endpoint and Python methods.

    Incoming callbacks are dispatched by tag to a same-named method on the
    (sub)class via :meth:`handle_cb`.  Outgoing calls are synthesised on
    demand by :meth:`__getattr__`: accessing ``mgr.some_method`` for a name
    present in ``ipc.ALL_METHODS`` yields a callable that performs the RPC
    through ``dcpep.ch_cmd.call``.
    """

    def __init__(self, dcpep):
        """Attach to ``dcpep`` and index every method in ``ipc.ALL_METHODS``.

        Registers this object as ``dcpep.mgr``.
        """
        self.dcpep = dcpep
        self.dcp = dcpep.asc
        dcpep.mgr = self

        # method name -> (tag, method) and tag -> method
        self.name_map = {}
        self.tag_map = {}

        # Nesting depth of callbacks currently being serviced; non-zero
        # makes outgoing calls use CallContext.CB instead of CMD.
        self.in_callback = 0

        for tag, (_cls, method) in ipc.ALL_METHODS.items():
            self.name_map[method.name] = tag, method
            self.tag_map[tag] = method

    def handle_cb(self, state):
        """Dispatch one incoming callback and return its result.

        ``state.tag`` selects the method descriptor and ``state.in_data`` is
        handed to ``method.callback`` together with the bound Python handler.

        Raises:
            Exception: if the tag is unknown or no handler named after the
                method exists on this object.  Exceptions raised by the
                handler itself are logged and re-raised unchanged.
        """
        method = self.tag_map.get(state.tag, None)
        if method is None:
            raise Exception(f"Unknown callback {state.tag}")

        func = getattr(self, method.name, None)

        if func is None:
            raise Exception(f"Unimplemented callback {method!s} [{state.tag}]")

        self.in_callback += 1
        try:
            return method.callback(func, state.in_data)
        except Exception:
            print(f"Exception in callback {method.name}")
            raise
        finally:
            # Always unwind the nesting counter; otherwise a failed callback
            # would leave every later call stuck in the CB context.
            self.in_callback -= 1

    def __getattr__(self, attr):
        """Build a callable that issues the RPC named ``attr``.

        Only invoked when normal attribute lookup fails.  Names that are not
        in ``name_map``, or whose tag starts with ``"D"``, raise
        ``AttributeError``.
        """
        # Read name_map straight from __dict__: going through normal
        # attribute access would re-enter __getattr__ and recurse forever
        # on an instance whose __init__ has not run (e.g. copy/unpickle).
        name_map = self.__dict__.get("name_map")
        if name_map is None:
            raise AttributeError(f"Unknown method {attr}")

        tag, method = name_map.get(attr, (None, None))
        if method is None or tag.startswith("D"):
            raise AttributeError(f"Unknown method {attr}")

        out_len = method.out_struct.sizeof()
        # Calls made while servicing a callback go out in the CB context.
        ctx = CallContext.CB if self.in_callback else CallContext.CMD
        rpc = functools.partial(self.dcpep.ch_cmd.call, ctx, tag, out_len=out_len)
        return functools.partial(method.call, rpc)

class DCPManager(DCPBaseManager):
    """DCP manager providing Python handlers for firmware callbacks.

    Methods here are found by name from ``DCPBaseManager.handle_cb``.  Names
    used on ``self`` that are not defined in this class (for example the
    calls in :meth:`start_hardware_boot`) fall through to
    ``DCPBaseManager.__getattr__`` and are issued as RPCs.
    """

    def __init__(self, dcpep, compatible='t8103'):
        """Create the manager.

        Args:
            dcpep: endpoint object, passed to ``DCPBaseManager.__init__``.
            compatible: SoC identifier; selects the register set returned
                by :meth:`rt_bandwidth_setup_ap` (``'t8103'`` or ``'t600x'``).
        """
        super().__init__(dcpep)

        # Property stores filled by the various set/publish callbacks.
        self.iomfb_prop = {}
        self.dcpav_prop = {}
        self.service_prop = {}
        self.pr_prop = {}

        # Number of swap-complete callbacks seen, and the last swap id.
        self.swaps = 0
        self.frame = 0

        # Last buffer id handed out, and id -> (paddr, dva, dvasize).
        self.mapid = 0
        self.bufs = {}

        self.compatible = compatible

    ## IOMobileFramebufferAP methods

    def find_swap_function_gated(self):
        """No-op callback."""
        pass

    def create_provider_service(self):
        """Report success.

        The original source defined this identically a second time under the
        UnifiedPipeline2 section; the redundant copy has been dropped.
        """
        return True

    def create_product_service(self):
        """Report success."""
        return True

    def create_PMU_service(self):
        """Report success."""
        return True

    def create_iomfb_service(self):
        """Report success."""
        return True

    def create_backlight_service(self):
        """Report that no backlight service is created."""
        return False

    def setProperty(self, key, value):
        """Record ``value`` under ``key`` in ``iomfb_prop``, log it, return True."""
        self.iomfb_prop[key] = value
        print(f"setProperty({key} = {value!r})")
        return True

    # All typed variants share the same handler.
    setProperty_dict = setProperty_int = setProperty_bool = setProperty_str = setProperty

    def swap_complete_ap_gated(self, swap_id, unkBool, swap_data, swap_info, unkUint):
        """Log a completed swap, dump its payloads and update the counters.

        ``swap_data`` may be ``None``, in which case it is not dumped.
        """
        swap_data_ptr = "NULL" if swap_data is None else "..."
        print(f"swap_complete_ap_gated({swap_id}, {unkBool}, {swap_data_ptr}, ..., {unkUint})")
        if swap_data is not None:
            chexdump(swap_data)
        chexdump(swap_info)
        self.swaps += 1
        self.frame = swap_id

    def swap_complete_intent_gated(self, swap_id, unkB, unkInt, width, height):
        """Log a completed swap and update the counters."""
        print(f"swap_complete_intent_gated({swap_id}, {unkB}, {unkInt}, {width}, {height})")
        self.swaps += 1
        self.frame = swap_id

    def enable_backlight_message_ap_gated(self, unkB):
        """Log the call; nothing else is done."""
        print(f"enable_backlight_message_ap_gated({unkB})")

    def SetDigitalOutMode(self, color_id, timing_id):
        """Wrapper for ``set_digital_out_mode`` that prints the selected modes.

        Looks up the entries whose ``'ID'`` matches in
        ``dcpav_prop['ColorElements']`` and ``dcpav_prop['TimingElements']``,
        pretty-prints them, then calls ``set_digital_out_mode``.

        Raises:
            KeyError: if either element list has not been received yet.
            IndexError: if no entry carries the requested ID.
        """
        color_mode = [x for x in self.dcpav_prop['ColorElements'] if x['ID'] == color_id][0]
        timing_mode = [x for x in self.dcpav_prop['TimingElements'] if x['ID'] == timing_id][0]
        pprint.pprint(color_mode)
        pprint.pprint(timing_mode)
        self.set_digital_out_mode(color_id, timing_id)

    ## UPPipeAP_H13P methods

    def did_boot_signal(self):
        """Report success."""
        return True

    def did_power_on_signal(self):
        """Report success."""
        return True

    def will_power_off_signal(self):
        """No-op callback."""
        return

    def rt_bandwidth_setup_ap(self, config):
        """Fill ``config.val`` with the register set for ``self.compatible``.

        Raises:
            ValueError: if ``self.compatible`` is not a known SoC identifier.
        """
        print("rt_bandwidth_setup_ap(...)")
        if self.compatible == 't8103':
            config.val = {
                "reg1": 0x23b738014, # reg[5] in disp0/dispext0, plus 0x14 - part of pmgr
                "reg2": 0x23bc3c000, # reg[6] in disp0/dispext0 - part of pmp/pmgr
                "bit": 2,
            }
        elif self.compatible == 't600x':
            config.val = {
                "reg1": 0x28e3d0000 + 0x988, # reg[4] in disp0/dispext0, plus 0x988
                "reg2": 0x0,
                "bit": 0,
            }
        else:
            raise ValueError(self.compatible)

    ## UnifiedPipeline2 methods

    def match_pmu_service(self):
        """No-op callback."""
        pass

    def set_number_property(self, key, value):
        """No-op callback; the value is discarded."""
        pass

    def is_dark_boot(self):
        """Always report False."""
        return False

    def read_edt_data(self, key, count, value):
        """Always report False; ``value`` is left untouched."""
        return False

    def UNK_get_some_field(self):
        """Always return 0."""
        return 0

    def start_hardware_boot(self):
        """Issue the boot-time call sequence and report success.

        None of the calls below are defined in this class, so they resolve
        through ``DCPBaseManager.__getattr__``.
        """
        self.set_create_DFB()
        self.do_create_default_frame_buffer()
        self.setup_video_limits()
        self.flush_supportsPower(True)
        self.late_init_signal()
        self.setDisplayRefreshProperties()
        return True

    def setDCPAVPropStart(self, length):
        """Begin receiving a chunked DCPAV property of ``length`` bytes."""
        print(f"setDCPAVPropStart({length:#x})")
        self.dcpav_prop_len = length - 1 # off by one?
        self.dcpav_prop_off = 0
        self.dcpav_prop_data = []
        return True

    def setDCPAVPropChunk(self, data, offset, length):
        """Append one chunk; ``offset`` must equal the bytes received so far."""
        print(f"setDCPAVPropChunk(..., {offset:#x}, {length:#x})")
        assert offset == self.dcpav_prop_off
        self.dcpav_prop_data.append(data)
        self.dcpav_prop_off += len(data)
        return True

    def setDCPAVPropEnd(self, key):
        """Join the received chunks, parse them and store under ``key``.

        The joined blob must match the length recorded by
        :meth:`setDCPAVPropStart`.  The transfer state is reset to ``None``.
        """
        print(f"setDCPAVPropEnd({key!r})")
        blob = b"".join(self.dcpav_prop_data)
        assert self.dcpav_prop_len == len(blob)
        self.dcpav_prop[key] = ipc.OSSerialize().parse(blob)
        self.dcpav_prop_data = self.dcpav_prop_len = self.dcpav_prop_off = None
        #pprint.pprint(self.dcpav_prop[key])
        return True

    def set_boolean_property(self, key, value):
        """Log the call; the value is not stored."""
        print(f"set {key!r} = {value}")

    def removeProperty(self, key):
        """Log the call; nothing is removed from the local stores."""
        print(f"removeProperty({key!r})")

    def powerstate_notify(self, unk1, unk2):
        """Log the call."""
        print(f"powerstate_notify({unk1}, {unk2})")

    def create_default_fb_surface(self, width, height):
        """Log the requested size and report success."""
        print(f"create_default_fb_surface({width}x{height})")
        return True

    def powerUpDART(self, unk):
        """Log the call and return 0."""
        print(f"powerUpDART({unk})")
        return 0

    def hotPlug_notify_gated(self, unk):
        """Log the call."""
        print(f"hotPlug_notify_gated({unk})")

    def is_waking_from_hibernate(self):
        """Always report False."""
        return False

    ## UPPipe2 methods

    def match_pmu_service_2(self):
        """Report success."""
        return True

    def match_backlight_service(self):
        """Report success."""
        return True

    def get_calendar_time_ms(self):
        """Return the current wall-clock time in whole milliseconds."""
        return time.time_ns() // 1_000_000

    def map_buf(self, buf, vaddr, dva, unk):
        """Map a previously allocated buffer through ``dcp.disp_dart``.

        ``buf`` must be an id recorded in ``self.bufs`` (a ``KeyError`` is
        raised otherwise).  Writes 0 to ``vaddr.val`` and the new mapping
        address to ``dva.val``; returns 0.
        """
        print(f"map buf {buf}, {unk}")
        # The DVA stored at allocation time is not needed for this mapping.
        paddr, _alloc_dva, dvasize = self.bufs[buf]
        vaddr.val = 0
        dva.val = self.dcp.disp_dart.iomap(4, paddr, dvasize)
        print(f"mapped to dva {dva}")
        return 0

    def update_backlight_factor_prop(self, unk):
        """Log the call.

        The original source also carried an earlier no-op definition of this
        method that was shadowed by this one; it has been removed.
        """
        print(f"update_backlight_factor_prop {unk}")

    ## ServiceRelay methods

    def sr_setProperty(self, obj, key, value):
        """Record ``value`` under ``service_prop[obj][key]``, log it, return True."""
        self.service_prop.setdefault(obj, {})[key] = value
        print(f"sr_setProperty({obj}/{key} = {value!r})")
        return True

    # All typed variants share the same handler.
    sr_setProperty_dict = sr_setProperty_int = sr_setProperty_bool = sr_setProperty_str = sr_setProperty

    def sr_getClockFrequency(self, obj, arg):
        """Log the call and return a fixed frequency value."""
        print(f"sr_getClockFrequency({obj}, {arg})")
        return 533333328

    def sr_get_uint_prop(self, obj, key, value):
        """Write 0 to ``value.val`` and report False."""
        value.val = 0
        return False

    def sr_set_uint_prop(self, obj, key, value):
        """Log the call; the value is not stored."""
        print(f"sr_set_uint_prop({obj}, {key} = {value})")

    def set_fx_prop(self, obj, key, value):
        """Log the call; the value is not stored."""
        print(f"set_fx_prop({obj}, {key} = {value})")

    def sr_mapDeviceMemoryWithIndex(self, obj, index, flags, addr, length):
        """Return the ``index``-th reg entry of ``/arm-io/disp0`` from the ADT.

        Only ``obj == "PROV"`` is supported.  The result is written to
        ``addr.val`` and ``length.val``; returns 0.
        """
        assert obj == "PROV"
        addr.val, length.val = self.dcp.u.adt["/arm-io/disp0"].get_reg(index)
        print(f"sr_mapDeviceMemoryWithIndex({obj}, {index}, {flags}, {addr.val:#x}, {length.val:#x})")
        return 0

    ## PropRelay methods

    def pr_publish(self, prop_id, value):
        """Record ``value`` under ``prop_id`` in ``pr_prop`` and log it."""
        self.pr_prop[prop_id] = value
        print(f"pr_publish({prop_id}, {value!r})")

    ## MemDescRelay methods:

    def allocate_buffer(self, unk0, size, unk1, paddr, dva, dvasize):
        """Allocate and map a buffer, returning its new id.

        Fills ``paddr.val``, ``dva.val`` and ``dvasize.val`` (``size``
        rounded up to 4096) and remembers the triple in ``self.bufs`` so
        :meth:`map_buf` can find it later.
        """
        print(f"allocate_buffer({unk0}, {size}, {unk1})")

        dvasize.val = align_up(size, 4096)
        paddr.val = self.dcp.u.memalign(0x4000, size)
        dva.val = self.dcp.dart.iomap(0, paddr.val, size)

        self.mapid += 1
        print(f"Allocating {self.mapid} as {hex(paddr.val)} / {hex(dva.val)}")

        self.bufs[self.mapid] = (paddr.val, dva.val, dvasize.val)

        return self.mapid

    def map_physical(self, paddr, size, flags, dva, dvasize):
        """Map an existing physical range and return a new id.

        Fills ``dva.val`` and ``dvasize.val`` (``size`` rounded up to 4096).
        Unlike :meth:`allocate_buffer`, the mapping is not recorded in
        ``self.bufs``.
        """
        dvasize.val = align_up(size, 4096)
        dva.val = self.dcp.dart.iomap(0, paddr, size)
        print(f"map_physical({paddr:#x}, {size:#x}, {flags}, {dva.val:#x}, {dvasize.val:#x})")

        self.mapid += 1
        return self.mapid