|
1 | 1 | from __future__ import annotations
|
2 | 2 |
|
| 3 | +import collections |
3 | 4 | import hashlib
|
4 | 5 | import json
|
5 | 6 | import logging
|
6 | 7 | import pathlib
|
| 8 | +import subprocess |
7 | 9 |
|
8 | 10 | import click
|
| 11 | +import zigpy.types as t |
9 | 12 | from zigpy.ota.image import ElementTagId, HueSBLOTAImage, parse_ota_image
|
10 | 13 | from zigpy.ota.validators import validate_ota_image
|
| 14 | +from zigpy.types.named import _hex_string_to_bytes |
| 15 | +from zigpy.util import convert_install_code as zigpy_convert_install_code |
11 | 16 |
|
12 | 17 | from zigpy_cli.cli import cli
|
| 18 | +from zigpy_cli.common import HEX_OR_DEC_INT |
13 | 19 |
|
14 | 20 | LOGGER = logging.getLogger(__name__)
|
15 | 21 |
|
16 | 22 |
|
| 23 | +def convert_install_code(text: str) -> t.KeyData: |
| 24 | + code = _hex_string_to_bytes(text) |
| 25 | + key = zigpy_convert_install_code(code) |
| 26 | + |
| 27 | + if key is None: |
| 28 | + raise ValueError(f"Invalid install code: {text!r}") |
| 29 | + |
| 30 | + return key |
| 31 | + |
| 32 | + |
17 | 33 | @cli.group()
|
18 | 34 | def ota():
|
19 | 35 | pass
|
@@ -125,3 +141,152 @@ def generate_index(ctx, ota_url_root, output, files):
|
125 | 141 |
|
126 | 142 | json.dump(ota_metadata, output, indent=4)
|
127 | 143 | output.write("\n")
|
| 144 | + |
| 145 | + |
| 146 | +@ota.command() |
| 147 | +@click.pass_context |
| 148 | +@click.option( |
| 149 | + "--add-network-key", "network_keys", type=t.KeyData.convert, multiple=True |
| 150 | +) |
| 151 | +@click.option( |
| 152 | + "--add-install-code", "install_codes", type=convert_install_code, multiple=True |
| 153 | +) |
| 154 | +@click.option("--fill-byte", type=HEX_OR_DEC_INT, default=0xAB) |
| 155 | +@click.option( |
| 156 | + "--output-root", |
| 157 | + type=click.Path(file_okay=False, dir_okay=True, path_type=pathlib.Path), |
| 158 | + required=True, |
| 159 | +) |
| 160 | +@click.argument("files", nargs=-1, type=pathlib.Path) |
| 161 | +def reconstruct_from_pcaps( |
| 162 | + ctx, network_keys, install_codes, fill_byte, output_root, files |
| 163 | +): |
| 164 | + for code in install_codes: |
| 165 | + print(f"Using key derived from install code: {code}") |
| 166 | + |
| 167 | + network_keys = ( |
| 168 | + [ |
| 169 | + # ZigBeeAlliance09 |
| 170 | + t.KeyData.convert("5A:69:67:42:65:65:41:6C:6C:69:61:6E:63:65:30:39"), |
| 171 | + # Z2M default |
| 172 | + t.KeyData.convert("01:03:05:07:09:0B:0D:0F:00:02:04:06:08:0A:0C:0D"), |
| 173 | + ] |
| 174 | + + list(network_keys) |
| 175 | + + list(install_codes) |
| 176 | + ) |
| 177 | + |
| 178 | + keys = "\n".join( |
| 179 | + [f'"{k}","Normal","Network Key {i + 1}"' for i, k in enumerate(network_keys)] |
| 180 | + ) |
| 181 | + packets = [] |
| 182 | + |
| 183 | + for f in files: |
| 184 | + proc = subprocess.run( |
| 185 | + [ |
| 186 | + "tshark", |
| 187 | + "-o", |
| 188 | + f"uat:zigbee_pc_keys:{keys}", |
| 189 | + "-r", |
| 190 | + str(f), |
| 191 | + "-T", |
| 192 | + "json", |
| 193 | + ], |
| 194 | + capture_output=True, |
| 195 | + ) |
| 196 | + |
| 197 | + obj = json.loads(proc.stdout) |
| 198 | + packets.extend(p["_source"]["layers"] for p in obj) |
| 199 | + |
| 200 | + ota_sizes = {} |
| 201 | + ota_chunks = collections.defaultdict(set) |
| 202 | + |
| 203 | + for packet in packets: |
| 204 | + if "zbee_zcl" not in packet: |
| 205 | + continue |
| 206 | + |
| 207 | + # Ignore non-OTA packets |
| 208 | + if packet["zbee_aps"]["zbee_aps.cluster"] != "0x0019": |
| 209 | + continue |
| 210 | + |
| 211 | + if ( |
| 212 | + packet.get("zbee_zcl", {}) |
| 213 | + .get("Payload", {}) |
| 214 | + .get("zbee_zcl_general.ota.status") |
| 215 | + == "0x00" |
| 216 | + ): |
| 217 | + zcl = packet["zbee_zcl"]["Payload"] |
| 218 | + image_version = zcl["zbee_zcl_general.ota.file.version"] |
| 219 | + image_type = zcl["zbee_zcl_general.ota.image.type"] |
| 220 | + image_manuf_code = zcl["zbee_zcl_general.ota.manufacturer_code"] |
| 221 | + |
| 222 | + image_key = (image_version, image_type, image_manuf_code) |
| 223 | + |
| 224 | + if "zbee_zcl_general.ota.image.size" in zcl: |
| 225 | + image_size = int(zcl["zbee_zcl_general.ota.image.size"]) |
| 226 | + ota_sizes[image_key] = image_size |
| 227 | + elif "zbee_zcl_general.ota.image.data" in zcl: |
| 228 | + offset = int(zcl["zbee_zcl_general.ota.file.offset"]) |
| 229 | + data = bytes.fromhex( |
| 230 | + zcl["zbee_zcl_general.ota.image.data"].replace(":", "") |
| 231 | + ) |
| 232 | + |
| 233 | + ota_chunks[image_key].add((offset, data)) |
| 234 | + |
| 235 | + for image_key, image_size in ota_sizes.items(): |
| 236 | + image_version, image_type, image_manuf_code = image_key |
| 237 | + print( |
| 238 | + f"Constructing image type={image_type}, version={image_version}" |
| 239 | + f", manuf_code={image_manuf_code}: {image_size} bytes" |
| 240 | + ) |
| 241 | + |
| 242 | + buffer = [None] * image_size |
| 243 | + |
| 244 | + for offset, chunk in sorted(ota_chunks[image_key]): |
| 245 | + current_value = buffer[offset : offset + len(chunk)] |
| 246 | + |
| 247 | + if ( |
| 248 | + all(c is not None for c in buffer[offset : offset + len(chunk)]) |
| 249 | + and current_value != chunk |
| 250 | + ): |
| 251 | + LOGGER.error( |
| 252 | + f"Inconsistent {len(chunk)} bytes starting at offset" |
| 253 | + f" 0x{offset:08X}: was {current_value}, now {chunk}" |
| 254 | + ) |
| 255 | + |
| 256 | + buffer[offset : offset + len(chunk)] = chunk |
| 257 | + |
| 258 | + missing_indices = [o for o, v in enumerate(buffer) if v is None] |
| 259 | + missing_ranges = [] |
| 260 | + |
| 261 | + # For readability, combine the list of missing indices into a list of ranges |
| 262 | + if missing_indices: |
| 263 | + start = missing_indices[0] |
| 264 | + count = 0 |
| 265 | + |
| 266 | + for i in missing_indices[1:]: |
| 267 | + if i == start + count + 1: |
| 268 | + count += 1 |
| 269 | + else: |
| 270 | + missing_ranges.append((start, count + 1)) |
| 271 | + start = i |
| 272 | + count = 0 |
| 273 | + |
| 274 | + if count > 0: |
| 275 | + missing_ranges.append((start, count + 1)) |
| 276 | + |
| 277 | + for start, count in missing_ranges: |
| 278 | + LOGGER.error( |
| 279 | + f"Missing {count} bytes starting at offset 0x{start:08X}:" |
| 280 | + f" filling with 0x{fill_byte:02X}" |
| 281 | + ) |
| 282 | + buffer[start : start + count] = [fill_byte] * count |
| 283 | + |
| 284 | + filename = output_root / ( |
| 285 | + f"ota_t{image_type}_m{image_manuf_code}_v{image_version}" |
| 286 | + f"{'_partial' if missing_ranges else ''}.ota" |
| 287 | + ) |
| 288 | + |
| 289 | + output_root.mkdir(exist_ok=True) |
| 290 | + filename.write_bytes(bytes(buffer)) |
| 291 | + |
| 292 | + info.callback([filename]) |
0 commit comments