diff --git a/tclchecknew.py b/tclchecknew.py new file mode 100644 index 0000000..c835031 --- /dev/null +++ b/tclchecknew.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# pylint: disable=C0111,C0326,C0103 + +"""Checks for the latest FULL or OTA updates for specified PRD number.""" + +import os +import sys + +from tcllib import argparser +from tcllib.devices import Device +from tcllib.dumpmgr import write_info_if_dumps_found +from tcllib.requests import (CheckNewRequest, ChecksumRequest, DownloadRequest, + EncryptHeaderRequest, RequestRunner, + ServerSelector) + + +dpdesc = """ + Checks for the latest FULL updates for the specified PRD number or for an OTA from the + version specified as fvver. + """ +dp = argparser.DefaultParser(__file__, dpdesc) +dp.add_argument("prd", nargs=1, help="CU Reference #, e.g. PRD-63117-011") +dp.add_argument("fvver", nargs="?", help="Firmware version to check for OTA updates, e.g. AAM481 (omit to run FULL check)", default="AAA000") +dp.add_argument("-i", "--imei", help="use specified IMEI instead of default", type=str) +dp.add_argument("-m", "--mode", help="force type of update to check for", default="auto", type=str, choices=["full", "ota"]) +dp.add_argument("-t", "--type", help="force type of check to run", default="auto", type=str, choices=["desktop", "mobile"]) +dp.add_argument("--rawmode", help="override --mode with raw value (2=OTA, 4=FULL)", metavar="MODE") +dp.add_argument("--rawcltp", help="override --type with raw value (10=MOBILE, 2010=DESKTOP)", metavar="CLTP") +args = dp.parse_args(sys.argv[1:]) + +dev = Device(args.prd[0], args.fvver) +dev.imei = "3531510" + + +def sel_mode(txtmode, autoval, rawval): + """Handle custom mode.""" + if rawval: + return rawval + if txtmode == "auto": + return autoval + elif txtmode == "ota": + return dev.MODE_STATES["OTA"] + return dev.MODE_STATES["FULL"] + + +def sel_cltp(txtmode, autoval, rawval): + """Handle custom CLTP.""" + if rawval: + return rawval + if txtmode == "auto": + return autoval + elif txtmode == "desktop": + return dev.CLTP_STATES["DESKTOP"] + return dev.CLTP_STATES["MOBILE"] + + +if args.imei: + print("Use specified IMEI: {}".format(args.imei)) + dev.imei = args.imei + +if args.fvver == "AAA000": + dev.mode = sel_mode(args.mode, dev.MODE_STATES["FULL"], args.rawmode) + dev.cltp = sel_cltp(args.type, dev.CLTP_STATES["DESKTOP"], args.rawcltp) +else: + dev.mode = sel_mode(args.mode, dev.MODE_STATES["OTA"], args.rawmode) + dev.cltp = sel_cltp(args.type, dev.CLTP_STATES["MOBILE"], args.rawcltp) + +print("Mode: {}".format(dev.mode)) +print("CLTP: {}".format(dev.cltp)) + +runner = RequestRunner(ServerSelector()) + +# Check for update +chk = CheckNewRequest(dev) +runner.run(chk) +if not chk.success: + print("{}".format(chk.error)) + sys.exit(2) +chkres = chk.get_result() +print(chkres.pretty_xml()) + +# Request download +dlr = DownloadRequest(dev, chkres.tvver, chkres.fw_id) +runner.run(dlr) +if not dlr.success: + print("{}".format(dlr.error)) + sys.exit(3) +dlrres = dlr.get_result() +print(dlrres.pretty_xml()) + +if dlrres.encslaves: + encrunner = RequestRunner(ServerSelector(dlrres.encslaves), https=False) + cks = ChecksumRequest(dlrres.fileurl, dlrres.fileurl) + encrunner.run(cks) + if not cks.success: + print("{}".format(cks.error)) + sys.exit(4) + cksres = cks.get_result() + print(cksres.pretty_xml()) + +for s in dlrres.slaves: + print("http://{}{}".format(s, dlrres.fileurl)) + +for s in dlrres.s3_slaves: + print("http://{}{}".format(s, dlrres.s3_fileurl)) + +if dev.mode == dev.MODE_STATES["FULL"]: + hdr = EncryptHeaderRequest(dlrres.fileurl) + encrunner.run(hdr) + if not hdr.success: + print("{}".format(hdr.error)) + sys.exit(5) + hdrres = hdr.get_result() + headname = "header_{}.bin".format(chkres.tvver) + headdir = "headers" + if not os.path.exists(headdir): + os.makedirs(headdir) + if len(hdrres.rawdata) == 4194320: + # TODO: Check sha1sum + print("Header length check passed. Writing to {}.".format(headname)) + with open(os.path.join(headdir, headname), "wb") as f: + f.write(hdrres.rawdata) + else: + print("Header length invalid ({}).".format(len(hdrres.rawdata))) + +write_info_if_dumps_found() diff --git a/tcllib/requests/__init__.py b/tcllib/requests/__init__.py index afe17ce..47f8bbc 100644 --- a/tcllib/requests/__init__.py +++ b/tcllib/requests/__init__.py @@ -4,6 +4,7 @@ """Library for generic TCL API requests.""" from .checkrequest import CheckRequest +from .checknewrequest import CheckNewRequest from .checksumrequest import ChecksumRequest from .downloadrequest import DownloadRequest from .encryptheaderrequest import EncryptHeaderRequest diff --git a/tcllib/requests/checknewrequest.py b/tcllib/requests/checknewrequest.py new file mode 100644 index 0000000..1ef47dd --- /dev/null +++ b/tcllib/requests/checknewrequest.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +"""Generic update check request.""" + +import binascii +import hashlib +import random +import time +import zlib +from collections import OrderedDict +from math import floor + +from .. import devices +from .tclrequest import TclRequest +from .tclresult import CheckResult + +VDKEY = "010010000110111101110111001000000110000101110010011001010010000001111001011011110111010100100000011001110110010101110100001000000111010001101000011010010111001100100000011010110110010101111001001000000111011101101111011100100110010000111111" +VDKEY_B64Z = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n" + +def get_salt(): + """Generate cryptographic salt.""" + millis = floor(time.time() * 1000) + tail = "{:06d}".format(random.randint(0, 999999)) + return "{}{}".format(millis, tail) + + +def get_vk2(params_dict, cltp): + """Generate salted hash of API parameters.""" + #params_dict["cltp"] = cltp + query = "" + for key, val in params_dict.items(): + if query: + query += "&" + query += key + "=" + str(val) + #vdk = zlib.decompress(binascii.a2b_base64(VDKEY_B64Z)) + #query += vdk.decode("utf-8") + query += VDKEY + engine = hashlib.sha1() + engine.update(bytes(query, "utf-8")) + hexhash = engine.hexdigest() + return hexhash + +class CheckNewRequest(TclRequest): + """Generic update check request.""" + + def __init__(self, device: devices.Device): + """Populate variables..""" + super().__init__() + self.uri = "/check_new.php" + self.method = "GET" + self.device = device + + def get_headers(self): + """Return request headers.""" + return {"User-Agent": "GOTU Client v10.1.1"} + + def get_params(self): + """Return request parameters.""" + params = OrderedDict() + params["id"] = self.device.imei + params["salt"] = get_salt() + params["curef"] = self.device.curef + params["fv"] = self.device.fwver + params["type"] = self.device.type + params["mode"] = self.device.mode + params["cltp"] = self.device.cltp + params["vk"] = get_vk2(params, self.device.cltp) + #params["cktp"] = self.device.cktp + #params["rtd"] = self.device.rtd + #params["chnl"] = self.device.chnl + #params["osvs"] = self.device.osvs + #params["ckot"] = self.device.ckot + print(repr(params)) + return params + + def is_done(self, http_status: int, contents: str) -> bool: + """Handle request result.""" + ok_states = { + 204: "No update available.", + 404: "No data for requested CUREF/FV combination.", + } + if http_status == 200: + self.response = contents + self.result = CheckResult(contents) + self.success = True + return True + elif http_status in ok_states: + self.error = ok_states[http_status] + self.success = False + return True + elif http_status not in [500, 502, 503]: + # Errors OTHER than 500, 502 or 503 are probably + # errors where we don't need to retry + self.error = "HTTP {}.".format(http_status) + self.success = False + return True + return False + +# Check requests have 4 possible outcomes: +# 1. HTTP 200 with XML data - our desired info +# 2. HTTP 204 - means: no newer update available +# 3. HTTP 404 - means: invalid device or firmware version +# 4. anything else: server problem (esp. 500, 502, 503)