Added tclchecknew.py and CheckNewRequest
This commit is contained in:
128
tclchecknew.py
Normal file
128
tclchecknew.py
Normal file
@@ -0,0 +1,128 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# pylint: disable=C0111,C0326,C0103
|
||||
|
||||
"""Checks for the latest FULL or OTA updates for specified PRD number."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from tcllib import argparser
|
||||
from tcllib.devices import Device
|
||||
from tcllib.dumpmgr import write_info_if_dumps_found
|
||||
from tcllib.requests import (CheckNewRequest, ChecksumRequest, DownloadRequest,
|
||||
EncryptHeaderRequest, RequestRunner,
|
||||
ServerSelector)
|
||||
|
||||
|
||||
dpdesc = """
|
||||
Checks for the latest FULL updates for the specified PRD number or for an OTA from the
|
||||
version specified as fvver.
|
||||
"""
|
||||
dp = argparser.DefaultParser(__file__, dpdesc)
|
||||
dp.add_argument("prd", nargs=1, help="CU Reference #, e.g. PRD-63117-011")
|
||||
dp.add_argument("fvver", nargs="?", help="Firmware version to check for OTA updates, e.g. AAM481 (omit to run FULL check)", default="AAA000")
|
||||
dp.add_argument("-i", "--imei", help="use specified IMEI instead of default", type=str)
|
||||
dp.add_argument("-m", "--mode", help="force type of update to check for", default="auto", type=str, choices=["full", "ota"])
|
||||
dp.add_argument("-t", "--type", help="force type of check to run", default="auto", type=str, choices=["desktop", "mobile"])
|
||||
dp.add_argument("--rawmode", help="override --mode with raw value (2=OTA, 4=FULL)", metavar="MODE")
|
||||
dp.add_argument("--rawcltp", help="override --type with raw value (10=MOBILE, 2010=DESKTOP)", metavar="CLTP")
|
||||
args = dp.parse_args(sys.argv[1:])
|
||||
|
||||
dev = Device(args.prd[0], args.fvver)
|
||||
dev.imei = "3531510"
|
||||
|
||||
|
||||
def sel_mode(txtmode, autoval, rawval):
|
||||
"""Handle custom mode."""
|
||||
if rawval:
|
||||
return rawval
|
||||
if txtmode == "auto":
|
||||
return autoval
|
||||
elif txtmode == "ota":
|
||||
return dev.MODE_STATES["OTA"]
|
||||
return dev.MODE_STATES["FULL"]
|
||||
|
||||
|
||||
def sel_cltp(txtmode, autoval, rawval):
|
||||
"""Handle custom CLTP."""
|
||||
if rawval:
|
||||
return rawval
|
||||
if txtmode == "auto":
|
||||
return autoval
|
||||
elif txtmode == "desktop":
|
||||
return dev.CLTP_STATES["DESKTOP"]
|
||||
return dev.CLTP_STATES["MOBILE"]
|
||||
|
||||
|
||||
if args.imei:
|
||||
print("Use specified IMEI: {}".format(args.imei))
|
||||
dev.imei = args.imei
|
||||
|
||||
if args.fvver == "AAA000":
|
||||
dev.mode = sel_mode(args.mode, dev.MODE_STATES["FULL"], args.rawmode)
|
||||
dev.cltp = sel_cltp(args.type, dev.CLTP_STATES["DESKTOP"], args.rawcltp)
|
||||
else:
|
||||
dev.mode = sel_mode(args.mode, dev.MODE_STATES["OTA"], args.rawmode)
|
||||
dev.cltp = sel_cltp(args.type, dev.CLTP_STATES["MOBILE"], args.rawcltp)
|
||||
|
||||
print("Mode: {}".format(dev.mode))
|
||||
print("CLTP: {}".format(dev.cltp))
|
||||
|
||||
runner = RequestRunner(ServerSelector())
|
||||
|
||||
# Check for update
|
||||
chk = CheckNewRequest(dev)
|
||||
runner.run(chk)
|
||||
if not chk.success:
|
||||
print("{}".format(chk.error))
|
||||
sys.exit(2)
|
||||
chkres = chk.get_result()
|
||||
print(chkres.pretty_xml())
|
||||
|
||||
# Request download
|
||||
dlr = DownloadRequest(dev, chkres.tvver, chkres.fw_id)
|
||||
runner.run(dlr)
|
||||
if not dlr.success:
|
||||
print("{}".format(dlr.error))
|
||||
sys.exit(3)
|
||||
dlrres = dlr.get_result()
|
||||
print(dlrres.pretty_xml())
|
||||
|
||||
if dlrres.encslaves:
|
||||
encrunner = RequestRunner(ServerSelector(dlrres.encslaves), https=False)
|
||||
cks = ChecksumRequest(dlrres.fileurl, dlrres.fileurl)
|
||||
encrunner.run(cks)
|
||||
if not cks.success:
|
||||
print("{}".format(cks.error))
|
||||
sys.exit(4)
|
||||
cksres = cks.get_result()
|
||||
print(cksres.pretty_xml())
|
||||
|
||||
for s in dlrres.slaves:
|
||||
print("http://{}{}".format(s, dlrres.fileurl))
|
||||
|
||||
for s in dlrres.s3_slaves:
|
||||
print("http://{}{}".format(s, dlrres.s3_fileurl))
|
||||
|
||||
if dev.mode == dev.MODE_STATES["FULL"]:
|
||||
hdr = EncryptHeaderRequest(dlrres.fileurl)
|
||||
encrunner.run(hdr)
|
||||
if not hdr.success:
|
||||
print("{}".format(hdr.error))
|
||||
sys.exit(5)
|
||||
hdrres = hdr.get_result()
|
||||
headname = "header_{}.bin".format(chkres.tvver)
|
||||
headdir = "headers"
|
||||
if not os.path.exists(headdir):
|
||||
os.makedirs(headdir)
|
||||
if len(hdrres.rawdata) == 4194320:
|
||||
# TODO: Check sha1sum
|
||||
print("Header length check passed. Writing to {}.".format(headname))
|
||||
with open(os.path.join(headdir, headname), "wb") as f:
|
||||
f.write(hdrres.rawdata)
|
||||
else:
|
||||
print("Header length invalid ({}).".format(len(hdrres.rawdata)))
|
||||
|
||||
write_info_if_dumps_found()
|
@@ -4,6 +4,7 @@
|
||||
"""Library for generic TCL API requests."""
|
||||
|
||||
from .checkrequest import CheckRequest
|
||||
from .checknewrequest import CheckNewRequest
|
||||
from .checksumrequest import ChecksumRequest
|
||||
from .downloadrequest import DownloadRequest
|
||||
from .encryptheaderrequest import EncryptHeaderRequest
|
||||
|
104
tcllib/requests/checknewrequest.py
Normal file
104
tcllib/requests/checknewrequest.py
Normal file
@@ -0,0 +1,104 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""Generic update check request."""
|
||||
|
||||
import binascii
|
||||
import hashlib
|
||||
import random
|
||||
import time
|
||||
import zlib
|
||||
from collections import OrderedDict
|
||||
from math import floor
|
||||
|
||||
from .. import devices
|
||||
from .tclrequest import TclRequest
|
||||
from .tclresult import CheckResult
|
||||
|
||||
VDKEY = "010010000110111101110111001000000110000101110010011001010010000001111001011011110111010100100000011001110110010101110100001000000111010001101000011010010111001100100000011010110110010101111001001000000111011101101111011100100110010000111111"
|
||||
VDKEY_B64Z = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n"
|
||||
|
||||
def get_salt():
|
||||
"""Generate cryptographic salt."""
|
||||
millis = floor(time.time() * 1000)
|
||||
tail = "{:06d}".format(random.randint(0, 999999))
|
||||
return "{}{}".format(millis, tail)
|
||||
|
||||
|
||||
def get_vk2(params_dict, cltp):
|
||||
"""Generate salted hash of API parameters."""
|
||||
#params_dict["cltp"] = cltp
|
||||
query = ""
|
||||
for key, val in params_dict.items():
|
||||
if query:
|
||||
query += "&"
|
||||
query += key + "=" + str(val)
|
||||
#vdk = zlib.decompress(binascii.a2b_base64(VDKEY_B64Z))
|
||||
#query += vdk.decode("utf-8")
|
||||
query += VDKEY
|
||||
engine = hashlib.sha1()
|
||||
engine.update(bytes(query, "utf-8"))
|
||||
hexhash = engine.hexdigest()
|
||||
return hexhash
|
||||
|
||||
class CheckNewRequest(TclRequest):
|
||||
"""Generic update check request."""
|
||||
|
||||
def __init__(self, device: devices.Device):
|
||||
"""Populate variables.."""
|
||||
super().__init__()
|
||||
self.uri = "/check_new.php"
|
||||
self.method = "GET"
|
||||
self.device = device
|
||||
|
||||
def get_headers(self):
|
||||
"""Return request headers."""
|
||||
return {"User-Agent": "GOTU Client v10.1.1"}
|
||||
|
||||
def get_params(self):
|
||||
"""Return request parameters."""
|
||||
params = OrderedDict()
|
||||
params["id"] = self.device.imei
|
||||
params["salt"] = get_salt()
|
||||
params["curef"] = self.device.curef
|
||||
params["fv"] = self.device.fwver
|
||||
params["type"] = self.device.type
|
||||
params["mode"] = self.device.mode
|
||||
params["cltp"] = self.device.cltp
|
||||
params["vk"] = get_vk2(params, self.device.cltp)
|
||||
#params["cktp"] = self.device.cktp
|
||||
#params["rtd"] = self.device.rtd
|
||||
#params["chnl"] = self.device.chnl
|
||||
#params["osvs"] = self.device.osvs
|
||||
#params["ckot"] = self.device.ckot
|
||||
print(repr(params))
|
||||
return params
|
||||
|
||||
def is_done(self, http_status: int, contents: str) -> bool:
|
||||
"""Handle request result."""
|
||||
ok_states = {
|
||||
204: "No update available.",
|
||||
404: "No data for requested CUREF/FV combination.",
|
||||
}
|
||||
if http_status == 200:
|
||||
self.response = contents
|
||||
self.result = CheckResult(contents)
|
||||
self.success = True
|
||||
return True
|
||||
elif http_status in ok_states:
|
||||
self.error = ok_states[http_status]
|
||||
self.success = False
|
||||
return True
|
||||
elif http_status not in [500, 502, 503]:
|
||||
# Errors OTHER than 500, 502 or 503 are probably
|
||||
# errors where we don't need to retry
|
||||
self.error = "HTTP {}.".format(http_status)
|
||||
self.success = False
|
||||
return True
|
||||
return False
|
||||
|
||||
# Check requests have 4 possible outcomes:
|
||||
# 1. HTTP 200 with XML data - our desired info
|
||||
# 2. HTTP 204 - means: no newer update available
|
||||
# 3. HTTP 404 - means: invalid device or firmware version
|
||||
# 4. anything else: server problem (esp. 500, 502, 503)
|
Reference in New Issue
Block a user