from requests import get, post, put, packages
import requests
from re import findall
from os.path import exists
import json
import os
import sys
import re
import random
import time
import base64
import hashlib
import urllib.parse
import uuid
from urllib.parse import unquote

packages.urllib3.disable_warnings()

# NOTE(review): this bare string is not a docstring (it follows the imports);
# it looks like task metadata (display name + cron schedule) read by the
# scheduler that runs this script - keep its text unchanged.
"""
new Env('wskey本地转换');
cron 57 21,9 * * *
"""

# True while a sendNotify.send callable is believed to be available.
hadsend = True
# Rebuilt by randomuserAgent() before each account is processed.
UserAgent = ""

# Base URL of the panel's env API used by this script.
_QL_ENVS_URL = 'http://127.0.0.1:5600/api/envs'
# Seconds to wait for the remote JD endpoints before giving up.
_HTTP_TIMEOUT = 15

# Translation between the service's base64 alphabet and the standard one.
# The two alphabets differ only by swapping equal-length runs (A-J <-> K-T,
# e-n <-> o-x), so the mapping is its own inverse and one table serves both
# encoding and decoding.
_B64_SWAP = str.maketrans(
    "KLMNOPQRSTABCDEFGHIJUVWXYZabcdopqrstuvwxefghijklmnyz0123456789+/",
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/",
)


def printf(text):
    """Print *text* and flush stdout so log output appears immediately."""
    print(text)
    sys.stdout.flush()


def randomuserAgent():
    """Randomise the module-level device fields and rebuild ``UserAgent``.

    Sets the globals ``struuid``, ``addressid``, ``iosVer``, ``iosV``,
    ``clientVersion``, ``iPhone``, ``area``, ``ADID``, ``lng``, ``lat`` and
    ``UserAgent``.
    """
    global struuid, addressid, iosVer, iosV, clientVersion, iPhone, area, ADID, lng, lat
    global UserAgent

    # Sampling all 40 characters yields a random permutation of this pool.
    struuid = ''.join(random.sample(list('abcdefghijklmnopqrstuvwxyz0123456789abcz'), 40))
    addressid = ''.join(random.sample('1234567898647', 10))
    iosVer = random.choice(["15.1.1", "14.5.1", "14.4", "14.3", "14.2", "14.1", "14.0.1"])
    iosV = iosVer.replace('.', '_')
    clientVersion = random.choice(["10.3.0", "10.2.7", "10.2.4"])
    iPhone = random.choice(["8", "9", "10", "11", "12", "13"])
    area = '_'.join(''.join(random.sample('0123456789', size)) for size in (2, 4, 5, 5))
    ADID = '-'.join(
        ''.join(random.sample('0987654321ABCDEF', size)) for size in (8, 4, 4, 4, 12)
    )
    lng = '119.31991256596' + str(random.randint(100, 999))
    lat = '26.1187118976' + str(random.randint(100, 999))
    # Fix: the original interpolated ``{uuid}`` (the imported module object)
    # here instead of the device id generated above.
    UserAgent = f'jdapp;iPhone;10.0.4;{iosVer};{struuid};network/wifi;ADID/{ADID};model/iPhone{iPhone},1;addressid/{addressid};appBuild/167707;jdSupportDarkMode/0;Mozilla/5.0 (iPhone; CPU iPhone OS {iosV} like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/null;supportJDSHWK/1'


def load_send():
    """Import ``send`` from a ``sendNotify.py`` next to this script.

    On success the global ``send`` is bound and ``hadsend`` is True; if the
    file is missing or fails to import, ``hadsend`` is set to False.
    """
    global send
    global hadsend
    cur_path = os.path.abspath(os.path.dirname(__file__))
    sys.path.append(cur_path)
    if os.path.exists(cur_path + "/sendNotify.py"):
        try:
            from sendNotify import send
            hadsend = True
        except Exception:
            # Any failure while importing the helper only disables notifications.
            printf("加载sendNotify.py的通知服务失败,请检查~")
            hadsend = False
    else:
        printf("加载通知服务失败,缺少sendNotify.py文件")
        hadsend = False


load_send()


def send_notification(title, content, summary):
    """Send a WxPusher message.

    Args:
        title: Heading placed in the message body and first line of the summary.
        content: Message text; line breaks are converted for HTML display.
        summary: Short text appended to the title in the summary field.

    Raises:
        KeyError: if ``WP_APP_TOKEN_ONE`` or ``WP_APP_MAIN_UID`` is not set.
    """
    api_key = os.environ["WP_APP_TOKEN_ONE"]
    uids = os.environ["WP_APP_MAIN_UID"]
    # NOTE(review): this template appears to have lost its markup; only the
    # placeholders and the separator survive - confirm against upstream.
    desp = '''

texttext

📢

despdesp

'''
    desp = desp.replace("texttext", title)
    # Fix: the replacement literal was a broken (unterminated) string. The
    # payload is sent with contentType 2, so line breaks are emitted as <br/>.
    desp = desp.replace("despdesp", content.replace("\n", '<br/>'))
    payload = {
        "appToken": api_key,
        "content": desp,
        "summary": title + "\n" + summary,
        "contentType": 2,
        "uids": [uids],
    }
    res = requests.post(
        'http://wxpusher.zjiecode.com/api/send/message', json=payload, timeout=15
    ).json()
    if res.get("code") == 1000:
        printf("WxPusher 发送通知消息成功!")
    else:
        # Fix: ``res`` is the decoded JSON dict, which has no ``.text``.
        printf(res)


def randomstr(num):
    """Return a random UUID4 hex string with its first *num* characters dropped."""
    return ''.join(str(uuid.uuid4()).split('-'))[num:]


def randomstr1(num):
    """Return *num* random characters drawn from lowercase letters and digits."""
    return ''.join(
        random.choice("abcdefghijklmnopqrstuvwxyz0123456789") for _ in range(num)
    )


def sign_core(inarg):
    """Transform the bytes *inarg* with the fixed key/mask and return bytes.

    Each input byte is combined with ``mask[i & 0xf]`` and ``key[i & 7]``
    (so only the first eight key bytes are ever used) via XOR and one
    addition, then truncated to 8 bits.
    """
    key = b'80306f4370b39fd5630ad0529f77adb6'
    mask = [0x37, 0x92, 0x44, 0x68, 0xA5, 0x3D, 0xCC, 0x7F,
            0xBB, 0xF, 0xD9, 0x88, 0xEE, 0x9A, 0xE9, 0x5A]
    out = bytearray(len(inarg))
    for i, value in enumerate(inarg):
        m = mask[i & 0xf]
        k = int(key[i & 7])
        mixed = ((m ^ int(value)) ^ k) + m
        out[i] = ((m ^ mixed) ^ k) & 0xff
    return bytes(out)


def base64Encode(string):
    """Base64-encode the text *string* and map it to the service's alphabet."""
    return base64.b64encode(string.encode("utf-8")).decode('utf-8').translate(_B64_SWAP)


def base64Decode(string):
    """Inverse of :func:`base64Encode`; returns the decoded UTF-8 text."""
    return base64.b64decode(string.translate(_B64_SWAP)).decode('utf-8')


def randomeid():
    """Return an ``eid`` value built from a fixed template plus 20 random characters."""
    return 'eidAaf8081218as20a2GM%s7FnfQYOecyDYLcd0rfzm3Fy2ePY4UJJOeV0Ub840kG8C7lmIqt3DTlc11fB/s4qsAP8gtPTSoxu' % randomstr1(20)


def get_ep(jduuid: str = ''):
    """Build the ``ep`` JSON payload for a request.

    Args:
        jduuid: Device uuid to embed; a random one is generated when empty.

    Returns:
        A tuple ``(ep_json, jduuid, ts)`` where ``ts`` is the current time in
        milliseconds as a string.
    """
    if not jduuid:
        jduuid = randomstr(16)
    ts = str(int(time.time() * 1000))
    bsjduuid = base64Encode(jduuid)
    area = base64Encode('%s_%s_%s_%s' % (
        random.randint(1, 10000), random.randint(1, 10000),
        random.randint(1, 10000), random.randint(1, 10000)))
    d_model = base64Encode(random.choice(['Mi11Ultra', 'Mi11', 'Mi10']))
    ep = '{"hdid":"JM9F1ywUPwflvMIpYPok0tt5k9kW4ArJEU3lfLhxBqw=","ts":%s,"ridx":-1,"cipher":{"area":"%s","d_model":"%s","wifiBssid":"dW5hbw93bq==","osVersion":"CJS=","d_brand":"WQvrb21f","screen":"CtS1DIenCNqm","uuid":"%s","aid":"%s","openudid":"%s"},"ciphertype":5,"version":"1.2.0","appname":"com.jingdong.app.mall"}' % (
        int(ts) - random.randint(100, 1000), area, d_model, bsjduuid, bsjduuid, bsjduuid)
    return ep, jduuid, ts


def get_sign(functionId, body, client: str = "android", clientVersion: str = '11.2.8', jduuid: str = '') -> str:
    """Return the signed query string for *functionId*.

    Args:
        functionId: API function name.
        body: Request body as a dict or a JSON string; an ``eid`` key in it is
            reused, otherwise a random eid is generated.
        client: Client platform name.
        clientVersion: Client version string.
        jduuid: Optional device uuid forwarded to :func:`get_ep`.

    Returns:
        The query string (the original annotation said ``dict``, but a string
        has always been returned).
    """
    if isinstance(body, dict):
        parsed = body
        body = json.dumps(body)
    else:
        parsed = json.loads(body)
    eid = parsed["eid"] if "eid" in parsed else randomeid()

    ep, suid, st = get_ep(jduuid)
    sv = random.choice(["102", "111", "120"])
    all_arg = "functionId=%s&body=%s&uuid=%s&client=%s&clientVersion=%s&st=%s&sv=%s" % (
        functionId, body, suid, client, clientVersion, st, sv)
    back_bytes = sign_core(str.encode(all_arg))
    sign = hashlib.md5(base64.b64encode(back_bytes)).hexdigest()
    convertUrl = 'functionId=%s&body=%s&clientVersion=%s&client=%s&sdkVersion=31&lang=zh_CN&harmonyOs=0&networkType=wifi&oaid=%s&eid=%s&ef=1&ep=%s&st=%s&sign=%s&sv=%s' % (
        functionId, body, clientVersion, client, suid, eid, urllib.parse.quote(ep), st, sign, sv)
    return convertUrl


def getcookie_wskey(key):
    """Exchange a wskey cookie string for a ``pt_key``/``pt_pin`` cookie.

    Args:
        key: The stored wskey value; it must contain ``pin=...;``.

    Returns:
        ``"pt_key=...;pt_pin=...;"`` on success. On failure a string starting
        with ``"Error"``; when cookies were received but ``pt_key`` is not an
        ``app_open`` key the received cookies are appended
        (``"Error:{...}"``) so the caller can inspect them.
    """
    body = "body=%7B%22to%22%3A%22https%3A//plogin.m.jd.com/jd-mlogin/static/html/appjmp_blank.html%22%7D"
    pins = findall("pin=([^;]*);", key)
    if not pins:
        # Previously an IndexError; report and let the caller carry on.
        printf("【错误】wskey中缺少pin字段,跳过")
        return "Error"
    pin = pins[0]

    token = "xxx"
    for _ in range(0, 20):
        sign = get_sign(
            "genToken",
            {"url": "https://plogin.m.jd.com/jd-mlogin/static/html/appjmp_blank.html"},
            "android", "11.2.8")
        url = f"https://api.m.jd.com/client.action?functionId=genToken&{sign}"
        headers = {
            "cookie": key,
            'user-agent': UserAgent,
            'accept-language': 'zh-Hans-CN;q=1, en-CN;q=0.9',
            'content-type': 'application/x-www-form-urlencoded;'
        }
        try:
            reply = post(url=url, headers=headers, data=body, verify=False,
                         timeout=_HTTP_TIMEOUT).json()
            token = reply['tokenKey']
        except Exception as error:
            printf(f"【错误】{unquote(pin)}在获取token时:\n{error}")
            # Fix: this path used to return the tuple (pin, "False") while
            # every other path returns a string.
            return "Error"
        if token != "xxx":
            break
        printf(f"【警告】{unquote(pin)}在获取token时失败,等待5秒后重试")
        time.sleep(5)

    if token == "xxx":
        printf(f"【错误】{unquote(pin)}在获取token时失败,跳过")
        return "Error"

    url = 'https://un.m.jd.com/cgi-bin/app/appjmp'
    params = {
        'tokenKey': token,
        'to': 'https://plogin.m.jd.com/cgi-bin/m/thirdapp_auth_page',
        'client_type': 'android',
        'appid': 879,
        'appup_type': 1,
    }
    try:
        res = get(url=url, params=params, verify=False, allow_redirects=False,
                  timeout=_HTTP_TIMEOUT).cookies.get_dict()
    except Exception as error:
        printf(f"【错误】{unquote(pin)}在获取cookie时:\n{error}")
        return "Error"

    pt_key = res.get('pt_key')
    if pt_key is None:
        printf(f"【错误】{unquote(pin)}在获取cookie时:\n{str(res)}")
        return "Error"
    if "app_open" not in pt_key:
        return "Error:" + str(res)
    if 'pt_pin' not in res:
        printf(f"【错误】{unquote(pin)}在获取cookie时:\n{str(res)}")
        return "Error"
    return f"pt_key={pt_key};pt_pin={res['pt_pin']};"


def _query_envs(search_value, token):
    """Return the panel's env entries matching *search_value*."""
    headers = {'Authorization': f'Bearer {token}'}
    params = {
        'searchValue': search_value,
        'Authorization': f'Bearer {token}'
    }
    return get(_QL_ENVS_URL, params=params, headers=headers).json()['data']


def _env_id_field(entry):
    """Return ``(field_name, value)`` of the entry id, trying ``_id`` then ``id``."""
    if '_id' in entry:
        return '_id', entry['_id']
    return 'id', entry['id']


def _display_pin(pt_pin):
    """Return *pt_pin* percent-decoded when it contains ``%``."""
    if re.search('%', pt_pin):
        return unquote(pt_pin, 'utf-8')
    return pt_pin


def _with_remark(name, remark):
    """Append ``(remark)`` - the part before any ``@@`` - to *name* when set."""
    if remark:
        return name + "(" + remark.split("@@")[0] + ")"
    return name


def subcookie(pt_pin, cookie, token):
    """Store *cookie* as ``JD_COOKIE`` in the panel, updating or creating it.

    The panel is searched for *pt_pin*; if an entry whose value contains
    ``pt_key`` is found (the last one when several match) it is overwritten
    and enabled, otherwise a new entry is created. Does nothing when *token*
    is empty.
    """
    if token == "":
        return
    shown = _display_pin(pt_pin)
    headers = {'Authorization': f'Bearer {token}'}

    target = None
    for entry in _query_envs(pt_pin, token):
        if "pt_key" in entry['value']:
            target = entry

    if target is None:
        post(_QL_ENVS_URL, json=[{"value": cookie, "name": "JD_COOKIE"}], headers=headers)
        printf(f"新增成功:{shown}")
        return

    # The id field and the remark both come from the entry actually updated;
    # the original tracked them separately across matches, which could raise
    # KeyError and repeat the remark suffix.
    id_field, env_id = _env_id_field(target)
    shown = _with_remark(shown, target.get('remarks'))
    put(_QL_ENVS_URL, json={"name": "JD_COOKIE", "value": cookie, id_field: env_id},
        headers=headers)
    put(_QL_ENVS_URL + '/enable', json=[env_id], headers=headers)
    printf(f"更新成功:{shown}")


def getRemark(pt_pin, token):
    """Return a display name for *pt_pin*, with the panel remark if available.

    The remark is taken from the first matching entry whose value contains
    ``pt_key`` and which has a ``remarks`` field. With an empty *token* only
    the decoded pin is returned.
    """
    shown = _display_pin(pt_pin)
    if token == "":
        return shown
    remark = ""
    for entry in _query_envs(pt_pin, token):
        if "pt_key" in entry['value'] and 'remarks' in entry:
            remark = entry['remarks']
            break
    return _with_remark(shown, remark)


def main():
    """Convert every enabled ``JD_WSCK`` entry and report the outcome.

    Reads the panel token from ``auth.json``, converts each enabled wskey,
    stores the resulting cookie, disables wskeys whose result contains
    ``fake_``, and sends a notification only when at least one conversion
    failed. The result text is always printed.
    """
    printf("版本: 20230602")
    printf("说明: 如果用Wxpusher通知需配置WP_APP_TOKEN_ONE和WP_APP_MAIN_UID,其中WP_APP_MAIN_UID是你的Wxpusher UID")
    printf("====================================")

    config = ""
    # Later candidates take precedence when both files exist.
    for candidate in ("/ql/config/auth.json", "/ql/data/config/auth.json"):
        if os.path.exists(candidate):
            config = candidate
    if config == "":
        printf(f"无法判断使用环境,退出脚本!")
        return

    iswxpusher = bool(os.environ.get("WP_APP_TOKEN_ONE")) and bool(os.environ.get("WP_APP_MAIN_UID"))
    if iswxpusher:
        printf('检测到已配置Wxpusher相关变量,将使用Wxpusher发送通知')
    else:
        printf('没有配置Wxpusher相关变量,将调用sendNotify.py发送通知')

    with open(config, "r", encoding="utf-8") as f1:
        token = json.load(f1)['token']
    headers = {'Authorization': f'Bearer {token}'}

    datas = _query_envs('JD_WSCK', token)
    if len(datas) > 0:
        printf("\n===============开始转换JD_WSCK==============")
    else:
        printf("\n错误:没有需要转换的JD_WSCK,退出脚本!")
        return

    converted = []
    failed = []
    for data in datas:
        randomuserAgent()
        if data['status'] != 0:
            continue
        key = data['value']
        found = re.search(r'pin=([^; ]+)', key)
        if found is None:
            # Previously an IndexError that aborted the whole run.
            printf("【错误】wskey中缺少pin字段,跳过")
            failed.append("转换失败:wskey缺少pin\n")
            continue
        pin = found.group(1)

        cookie = getcookie_wskey(key)
        if "app_open" in cookie:
            orgpin = cookie.split(";")[1].split("=")[1]
            subcookie(orgpin, cookie, token)
            newpin = getRemark(orgpin, token)
            converted.append(f"转换成功:{newpin}\n")
            continue

        newpin = getRemark(pin, token)
        if "fake_" in cookie:
            printf(f"pin为{newpin}的wskey过期了!")
            _, env_id = _env_id_field(data)
            put(_QL_ENVS_URL + '/disable', json=[env_id], headers=headers)
            printf(f"禁用成功:{newpin}")
            failed.append(f"wskey已禁用:{newpin}\n")
        else:
            failed.append(f"转换失败:{newpin}\n")

    resurt1 = "".join(converted)
    resurt2 = "".join(failed)
    resurt = ""
    if resurt2 != "":
        resurt = "👇👇👇👇👇转换异常👇👇👇👇👇\n" + resurt2 + "\n"
        summary = "部分CK转换异常"
        if resurt1 != "":
            resurt = resurt + "👇👇👇👇👇转换成功👇👇👇👇👇\n" + resurt1
        if iswxpusher:
            send_notification("JD_WSCK转换结果", resurt, summary)
        elif hadsend:
            send("JD_WSCK转换结果", resurt)
        else:
            printf("没有启用通知!")
    elif resurt1 != "":
        resurt = resurt + "👇👇👇👇👇转换成功👇👇👇👇👇\n" + resurt1

    printf("\n\n===============转换结果==============\n")
    printf(resurt)


if __name__ == '__main__':
    main()