import bs4 import time import requests import ipaddress BASE_URL = "https://api.agicto.cn/v1" API_KEYS = [] REVOKE_KEYS = [] class TempMail: base_api = "https://www.1secmail.com/api/v1/" def __init__(self): self.mail: str = self.random_mail() def __str__(self): return self.mail def random_mail(self): return requests.get( self.base_api, params={"action": "genRandomMailbox"} ).json()[0] def check_inbox(self): mail, domain = self.mail.split("@") mail = {"login": mail, "domain": domain} inbox = requests.get( self.base_api, params={"action": "getMessages", **mail}, ).json() if inbox: inbox = requests.get( self.base_api, params={"action": "readMessage", "id": inbox[0]["id"], **mail}, ) return inbox.json()["body"] def generate_api_key() -> str: mail = TempMail() sess = requests.Session() sess.headers.update( { "user-agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/89.0.142.86 Safari/537.36" ), "x-forwarded-for": str( ipaddress.IPv4Address(random.randint(0, (1 << 32) - 1)) ), } ) sess.post( BASE_URL + "/sendVerifyCode", json={"email": str(mail), "channel": ""}, ) step = 0 while step < 20: inbox = mail.check_inbox() if inbox: break time.sleep(1) step += 1 if not inbox: raise Exception("Failed to get inbox") soup = bs4.BeautifulSoup(inbox, "html.parser") code = soup.find("div", {"class": "code-box"}).text token = sess.post( BASE_URL + "/loginByCode", json={ "email": str(mail), "verify_code": code, "invite_code": "", }, ).json()["data"]["access_token"] api_key = sess.post( BASE_URL + "/service/keyList", headers={ "authorization": f"Bearer {token}", }, ).json()["data"]["recordList"][0]["openKey"] return api_key import threading def increase_api_keys(max=6, per_thread=3): if len(API_KEYS) < max: threads = [ threading.Thread(target=lambda: API_KEYS.append(generate_api_key())) for _ in range(per_thread) ] [t.start() for t in threads] import re import random import json from werkzeug import exceptions # non-stream def chat_completion(**kwargs): sess = requests.Session() for i in range(max_retries := 3): try: api_key = random.choice(API_KEYS) sess.headers.update( { "authorization": "Bearer " + api_key, "x-forwarded-for": str( ipaddress.IPv4Address(random.randint(0, (1 << 32) - 1)) ), }, ) r = sess.post(BASE_URL + "/chat/completions", json=kwargs, timeout=30) r.raise_for_status() if r.json().get("error"): raise requests.exceptions.HTTPError(response=r) break except IndexError: continue except requests.exceptions.Timeout: continue except requests.exceptions.HTTPError as e: try: data = e.response.json() if data.get("error") or data.get("code") == 1: msg = data.get("error", {}).get("message") or data.get("message") if re.search(r"agicto", msg, re.IGNORECASE): if api_key in API_KEYS: API_KEYS.remove(api_key) REVOKE_KEYS.append(api_key) continue raise exceptions.BadRequest(msg) except json.JSONDecodeError: pass if i == max_retries - 1: raise exceptions.ServiceUnavailable try: content: str = r.json()["choices"][0]["message"]["content"] return bytes(content, "utf-8").decode("unicode_escape") except: raise exceptions.ServiceUnavailable # stream def chat_completion_stream(**kwargs): kwargs["stream"] = True sess = requests.Session() for i in range(max_retries := 3): try: api_key = random.choice(API_KEYS) sess.headers.update( { "authorization": "Bearer " + api_key, "x-forwarded-for": str( ipaddress.IPv4Address(random.randint(0, (1 << 32) - 1)) ), }, ) r = sess.post( BASE_URL + "/chat/completions", json=kwargs, stream=True, timeout=30 ) r.raise_for_status() if "text/event-stream" not in r.headers.get("Content-Type"): raise requests.exceptions.HTTPError(response=r) break except IndexError: continue except requests.exceptions.Timeout: continue except requests.exceptions.HTTPError as e: try: data = e.response.json() if data.get("error") or data.get("code") == 1: msg = data.get("error", {}).get("message") or data.get("message") if re.search(r"agicto", msg, re.IGNORECASE): if api_key in API_KEYS: API_KEYS.remove(api_key) REVOKE_KEYS.append(api_key) continue raise exceptions.BadRequest(msg) except json.JSONDecodeError: pass if i == max_retries - 1: raise exceptions.ServiceUnavailable init = True for line in r.iter_lines(): line: bytes if line: try: data: dict = json.loads(line.decode().removeprefix("data: ")) except json.JSONDecodeError: continue if choices := data.get("choices"): delta: dict = choices[0]["delta"] if delta := delta.get("content"): if init: init = False yield "" yield delta if init: raise exceptions.ServiceUnavailable if __name__ == "__main__": print(generate_api_key()) exit(0) increase_api_keys(per_thread=1) while True: if len(API_KEYS) == 0: print("waiting for api key...") time.sleep(1) else: print(f"{len(API_KEYS)} api keys available") break data = { "model": "claude-3-opus-20240229", "messages": [ {"role": "user", "content": "who are you?"}, ], } print("generating...") for i in chat_completion_stream(**data): print(i, end="")