-
Notifications
You must be signed in to change notification settings - Fork 29
/
03_flasher.py
executable file
·154 lines (116 loc) · 4.77 KB
/
03_flasher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/env python3
import time
import tqdm
import sys
import struct
from argparse import ArgumentParser
from panda import Panda # type: ignore
from tp20 import TP20Transport
from kwp2000 import ACCESS_TYPE, ROUTINE_CONTROL_TYPE, KWP2000Client, SESSION_TYPE, ECU_IDENTIFICATION_TYPE
CHUNK_SIZE = 240
def compute_key(seed):
key = seed
for _ in range(3):
tmp = (key ^ 0x3F_1735) & 0xFFFF_FFFF
key = (tmp + 0xA3FF_7890) & 0xFFFF_FFFF
if key < 0xA3FF_7890:
key = (key >> 1) | (tmp << 0x1F)
key = key & 0xFFFF_FFFF
return key
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("--bus", default=0, type=int, help="CAN bus number to use")
parser.add_argument("--input", required=True, help="input to flash")
parser.add_argument("--start-address", default=0x5E000, type=int, help="start address")
parser.add_argument("--end-address", default=0x5EFFF, type=int, help="end address (inclusive)")
args = parser.parse_args()
with open(args.input, "rb") as input_fw:
input_fw_s = input_fw.read()
assert args.start_address < args.end_address
assert args.end_address < len(input_fw_s)
assert input_fw_s[-4:] != b"Ende", "Firmware is not patched"
print("\n[READY TO FLASH]")
print("WARNING! USE AT YOUR OWN RISK! THIS COULD BREAK YOUR ECU AND REQUIRE REPLACEMENT!")
print("before proceeding:")
print("* put vehicle in park, and accessory mode (your engine should not be running)")
print("* ensure battery is fully charged. A full flash can take up to 15 minutes")
resp = input("continue [y/n]")
if resp.lower() != "y":
sys.exit(1)
p = Panda()
p.can_clear(0xFFFF)
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
print("Connecting...")
tp20 = TP20Transport(p, 0x9, bus=args.bus)
kwp_client = KWP2000Client(tp20)
print("\nEntering programming mode")
kwp_client.diagnostic_session_control(SESSION_TYPE.PROGRAMMING)
print("Done. Waiting to reconnect...")
for i in range(10):
time.sleep(1)
print(f"\nReconnecting... {i}")
p.can_clear(0xFFFF)
try:
tp20 = TP20Transport(p, 0x9, bus=args.bus)
break
except Exception as e:
print(e)
kwp_client = KWP2000Client(tp20)
print("\nReading ecu identification & flash status")
ident = kwp_client.read_ecu_identifcation(ECU_IDENTIFICATION_TYPE.ECU_IDENT)
print("ECU identification", ident)
status = kwp_client.read_ecu_identifcation(ECU_IDENTIFICATION_TYPE.STATUS_FLASH)
print("Flash status", status)
print("\nRequest seed")
seed = kwp_client.security_access(ACCESS_TYPE.PROGRAMMING_REQUEST_SEED)
print(f"seed: {seed.hex()}")
seed_int = struct.unpack(">I", seed)[0]
key_int = compute_key(seed_int)
key = struct.pack(">I", key_int)
print(f"key: {key.hex()}")
print("\n Send key")
kwp_client.security_access(ACCESS_TYPE.PROGRAMMING_SEND_KEY, key)
print("\nRequest download")
size = args.end_address - args.start_address + 1
chunk_size = kwp_client.request_download(args.start_address, size)
print(f"Chunk size: {chunk_size}")
assert chunk_size >= CHUNK_SIZE, "Chosen chunk size too large"
print("\nErase flash")
f_routine = kwp_client.erase_flash(args.start_address, args.end_address)
print("F_routine", f_routine)
print("Done. Waiting to reconnect...")
for i in range(10):
time.sleep(1)
print(f"\nReconnecting... {i}")
p.can_clear(0xFFFF)
try:
tp20 = TP20Transport(p, 0x9, bus=args.bus)
break
except Exception as e:
print(e)
kwp_client = KWP2000Client(tp20)
print("\nRequest erase results")
result = kwp_client.request_routine_results_by_local_identifier(ROUTINE_CONTROL_TYPE.ERASE_FLASH)
assert result == b"\x00", "Erase failed"
print("\nTransfer data")
to_flash = input_fw_s[args.start_address : args.end_address + 1]
checksum = sum(to_flash) & 0xFFFF
progress = tqdm.tqdm(total=len(to_flash))
while to_flash:
chunk = to_flash[:CHUNK_SIZE]
kwp_client.transfer_data(chunk)
# Keep channel alive
tp20.can_send(b"\xa3")
tp20.can_recv()
to_flash = to_flash[CHUNK_SIZE:]
progress.update(CHUNK_SIZE)
print("\nRequest transfer exit")
kwp_client.request_transfer_exit()
print("\nStart checksum check")
kwp_client.calculate_flash_checksum(args.start_address, args.end_address, checksum)
print("\nRequest checksum results")
result = kwp_client.request_routine_results_by_local_identifier(ROUTINE_CONTROL_TYPE.CALCULATE_FLASH_CHECKSUM)
assert result == b"\x00", "Checksum check failed"
print("\nStop communication")
kwp_client.stop_communication()
print("\nDone!")