ChatGPT Browser Automation
In [ ]:
%%bash
cd ~ && git clone https://gitlab.com/awesomeai/awesome-chatgpt-zh.git
In [ ]:
%%writefile ~/go-chatgpt-api.yaml
services:
go-chatgpt-api:
container_name: go-chatgpt-api
image: linweiyuan/go-chatgpt-api
ports:
- 8080:8080
environment:
- GIN_MODE=release
- GO_CHATGPT_API_PROXY=socks5://chatgpt-proxy-server-warp:65535
depends_on:
- chatgpt-proxy-server-warp
restart: unless-stopped
chatgpt-proxy-server-warp:
container_name: chatgpt-proxy-server-warp
image: linweiyuan/chatgpt-proxy-server-warp
environment:
- LOG_LEVEL=OFF
restart: unless-stopped
In [ ]:
%%bash
docker-compose -f ~/go-chatgpt-api.yaml pull
In [ ]:
%%bash
docker-compose -f ~/go-chatgpt-api.yaml up -d
In [ ]:
%%bash
docker-compose -f ~/go-chatgpt-api.yaml stop
In [ ]:
%%bash
docker-compose -f ~/go-chatgpt-api.yaml start
In [ ]:
%%bash
docker images
In [ ]:
%%bash
docker ps
In [ ]:
%%bash
docker logs -f go-chatgpt-api
💡 get access_token => accessToken
%%bash
curl -H 'Authorization: <access_token>' http://127.0.0.1:8080/conversations\?offset=0\&limit=1
{"items":[{"id":"...","title":"Jupyter User Requests Help.","create_time":"2023-04-17T10:22:31.256627+00:00","update_time":"2023-04-17T10:22:33+00:00"}],"total":11,"limit":1,"offset":0,"has_missing_conversations":false}
In [ ]:
class Common:
chat_gpt_model = 'gpt-3.5-turbo'
role_user = 'user'
role_assistant = 'assistant'
question_answer_map = {}
message_channel = []
exit_for_loop_channel = []
response_text_channel = []
conversation_done_channel = []
parent_message_id = ''
conversation_id = ''
reload_conversations_channel = []
current_node = None
In [ ]:
import json, os, requests, uuid
chat_gpt_base_url = 'http://127.0.0.1:8080'
# open the JSON file and read the access_token
with open(os.path.expanduser('~/.config/revChatGPT/config.json'), 'r') as f:
access_token = json.load(f).get('access_token', None)
common = Common()
def get_conversations():
response = requests.get(f'{chat_gpt_base_url}/conversations?offset=0&limit=100', headers = {'Authorization': access_token})
return response.json()
def get_conversation(conversation_id):
response = requests.get(f'{chat_gpt_base_url}/conversation/{conversation_id}', headers = {'Authorization': access_token})
conversation = response.json()
current_node = conversation['current_node']
common.parent_message_id = current_node
handle_conversation_detail(current_node, conversation['mapping'])
common.exit_for_loop_channel.append(True)
def handle_conversation_detail(current_node, mapping):
conversation_detail = mapping[current_node]
parent_id = conversation_detail.get('parent', '')
if parent_id != '':
common.question_answer_map[parent_id] = conversation_detail['message']['content']['parts'][0].strip()
handle_conversation_detail(parent_id, mapping)
if 'message' not in conversation_detail:
return
message = conversation_detail['message']
parts = message['content']['parts']
if len(parts) > 0 and parts[0] != '':
if message['author']['role'] == common.role_user:
common.message_channel.append(message)
temp_conversation_id = ''
def start_conversation(content):
parent_message_id = common.parent_message_id
if parent_message_id == '' or common.conversation_id == '':
common.conversation_id = ''
parent_message_id = str(uuid.uuid4())
response = requests.post(
f'{chat_gpt_base_url}/conversation',
headers = {
'Authorization': access_token,
'Content-Type': 'application/json',
'Accept': 'text/event-stream'
},
data = json.dumps({
'action': 'next',
'messages': [{
'id': uuid.uuid4().hex,
'author': {
'role': common.role_user
},
'role': common.role_user,
'content': {
'content_type': 'text',
'parts': [content]
}
}],
'parent_message_id': parent_message_id,
'model': common.chat_gpt_model,
'conversation_id': common.conversation_id,
'continue_text': 'continue'
}),
stream=True
)
# get it again from response
common.parent_message_id = ''
for line in response.iter_lines():
if not line.startswith(b'data: '):
continue
if line.endswith(b'[DONE]'):
common.conversation_done_channel.append(True)
break
make_conversation_response = json.loads(line.decode('utf-8')[len('data: '):])
if common.parent_message_id == '':
common.parent_message_id = make_conversation_response['message']['id']
global temp_conversation_id
if common.conversation_id == '' and temp_conversation_id == '':
temp_conversation_id = make_conversation_response['conversation_id']
if make_conversation_response is not None:
parts = make_conversation_response['message']['content']['parts']
if len(parts) > 0:
common.response_text_channel.append(parts[0])
yield parts[0]
if make_conversation_response['message']['end_turn'] == True:
common.conversation_done_channel.append(True)
break
if common.conversation_id == '':
generate_title(temp_conversation_id)
else:
common.reload_conversations_channel.append(True)
def generate_title(conversation_id):
requests.post(
f'{chat_gpt_base_url}/conversation/gen_title/{conversation_id}',
headers = {
'Authorization': access_token,
'Content-Type': 'application/json'
},
data = json.dumps({
'message_id': common.parent_message_id,
'model': common.chat_gpt_model
})
)
def rename_title(conversation_id, title):
requests.patch(
f'{chat_gpt_base_url}/conversation/{conversation_id}',
headers={
'Authorization': access_token,
'Content-Type': 'application/json'
},
data = json.dumps({
'title': title
})
)
def delete_conversation(conversation_id):
requests.patch(
f'{chat_gpt_base_url}/conversation/{conversation_id}',
headers={
'Authorization': access_token,
'Content-Type': 'application/json'
},
data=json.dumps({
'is_visible': False
})
)
def recover_conversation(conversation_id):
requests.patch(
f'{chat_gpt_base_url}/conversation/{conversation_id}',
headers={
'Authorization': access_token,
'Content-Type': 'application/json'
},
data=json.dumps({
'is_visible': True
})
)
def clear_conversations():
requests.patch(f'{chat_gpt_base_url}/conversations', headers = {'Authorization': access_token}, data = {'is_visible': False})
common.conversation_id = ''
common.current_node = None
common.reload_conversations_channel.append(True)
In [ ]:
get_conversations()
In [ ]:
# open the JSON file and read the conversation_id
with open(os.path.expanduser('~/.config/revChatGPT/config.json'), 'r') as f:
conversation_id = json.load(f).get('conversation_id', None)
In [ ]:
try:
common.conversation_id = conversation_id
get_conversation(common.conversation_id)
except RecursionError as errr:
print('Error Recursion:', errr)
In [ ]:
generate_title(common.conversation_id)
In [ ]:
import IPython
In [ ]:
for response in start_conversation('小 G ,我们又见面了。'):
IPython.display.display(IPython.core.display.Markdown(response))
IPython.display.clear_output(wait=True)
In [ ]:
%%bash
pip install --upgrade revChatGPT
In [ ]:
%%bash
mkdir -p ~/.config/revChatGPT
💡 get <access_token> => accessToken
In [ ]:
%%writefile ~/.config/revChatGPT/config.json
{
"access_token": "<access_token>",
"conversation_id": "<conversation_id>"
}
In [ ]:
import json, os
In [ ]:
from revChatGPT.V1 import Chatbot, configure
In [ ]:
# open the JSON file and read the conversation_id
with open(os.path.expanduser('~/.config/revChatGPT/config.json'), 'r') as f:
conversation_id = json.load(f).get('conversation_id', None)
In [ ]:
import IPython
In [ ]:
bot = Chatbot(
config = configure(),
conversation_id = conversation_id,
lazy_loading = True,
base_url = 'http://127.0.0.1:8080/'
)
def ask(prompt):
for response in bot.ask(prompt):
IPython.display.display(IPython.core.display.Markdown(response['message']))
IPython.display.clear_output(wait=True)
In [ ]:
ask('''
小 G ,我们又见面了。
''')
In [ ]:
def gpt_after(parent_id=None):
return Chatbot(
config = configure(),
conversation_id = conversation_id,
parent_id = parent_id
)
In [ ]:
for response in gpt_after().ask(
'女孩子月经期间该怎么护理?'
):
IPython.display.display(IPython.core.display.Markdown(response['message']))
IPython.display.clear_output(wait=True)
In [ ]:
for response in gpt_after(response['parent_id']).ask(
'女孩子月经期间该怎么陪伴?'
):
IPython.display.display(IPython.core.display.Markdown(response['message']))
IPython.display.clear_output(wait=True)
In [ ]:
print(f"parent_id: {response['parent_id']}")
In [ ]:
%%bash
npm install -g playwright
In [ ]:
%%bash
npx playwright install webkit
In [ ]:
%%bash
pip install playwright
In [ ]:
%%bash
npx playwright codegen --browser webkit twitter.com
selenium safari
In [ ]:
from selenium import webdriver
driver = webdriver.Safari()
selenium undetected chrome
In [ ]:
import undetected_chromedriver as uc
options = uc.ChromeOptions()
# options.add_argument('--window-size=1024,768')
options.add_argument('--headless')
driver = uc.Chrome(options = options)
driver.get('https://chat.openai.com/auth/login')
In [ ]:
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.ui import WebDriverWait
In [ ]:
WebDriverWait(driver, 5).until(
expected_conditions.presence_of_element_located((By.XPATH, '//*[text()="Log in"]'))
)
driver.execute_script('''
document.evaluate(
'//*[text()="Log in"]',
document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null
).snapshotItem(0).dispatchEvent(
new MouseEvent('click', {
view: window,
bubbles: true,
cancelable: true
})
);
''')
In [ ]:
WebDriverWait(driver, 5).until(
expected_conditions.presence_of_element_located((By.XPATH, '//button[@data-provider="google"]'))
)
driver.execute_script('''
document.evaluate(
'//button[@data-provider="google"]',
document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null
).snapshotItem(0).dispatchEvent(
new MouseEvent('click', {
view: window,
bubbles: true,
cancelable: true
})
);
''')
In [ ]:
import json, os
with open(os.path.expanduser('~/.config/pyChatGPT/secret_settings.json'), 'r') as f:
secret_settings = json.load(f)
In [ ]:
WebDriverWait(driver, 5).until(
expected_conditions.presence_of_element_located((By.XPATH, '//input[@type="email"]'))
)
driver.execute_script(f'''
const google_email_input = document.evaluate('//input[@type="email"]', document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null).snapshotItem(0);
google_email_input.value = '{secret_settings['email']}';
google_email_input.dispatchEvent(
new Event('input', {{
view: window,
bubbles: true,
cancelable: true
}})
);
''')
In [ ]:
WebDriverWait(driver, 5).until(
expected_conditions.presence_of_element_located((By.XPATH, '//*[@id="identifierNext"]'))
)
driver.execute_script('''
document.evaluate(
'//*[@id="identifierNext"]',
document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null
).snapshotItem(0).dispatchEvent(
new MouseEvent('click', {
view: window,
bubbles: true,
cancelable: true
})
);
''')
In [ ]:
WebDriverWait(driver, 5).until(
expected_conditions.element_to_be_clickable((By.XPATH, '//input[@type="password"]'))
).click()
from selenium.webdriver.common.action_chains import ActionChains
ActionChains(driver).send_keys(secret_settings['password']).send_keys(Keys.ENTER).perform()
access token
In [ ]:
driver.execute_async_script('''
var callback = arguments[arguments.length - 1];
var xhr = new XMLHttpRequest();
xhr.open('GET', 'https://chat.openai.com/api/auth/session', true);
xhr.setRequestHeader('Content-Type', 'application/json');
xhr.onload = function() {
if (xhr.readyState === xhr.DONE && xhr.status === 200) {
var data = JSON.parse(xhr.responseText);
callback(data.accessToken);
}
};
xhr.send();
''')
In [ ]:
driver.get('https://chat.openai.com/api/auth/session')
In [ ]:
import json
access_token = json.loads(driver.find_element(By.TAG_NAME, 'pre').text)['accessToken']
In [ ]:
driver.execute_async_script(f'''
var callback = arguments[arguments.length - 1];
var xhr = new XMLHttpRequest();
xhr.open("GET", "https://chat.openai.com/backend-api/conversation/{secret_settings['conversation_id']}", true);
xhr.setRequestHeader('Content-Type', 'application/json');
xhr.setRequestHeader('Authorization', 'Bearer {access_token}');
xhr.onload = function() {{
if (xhr.readyState === xhr.DONE && xhr.status === 200) {{
var data = JSON.parse(xhr.responseText);
console.log(data);
callback(data);
}}
}};
xhr.send();
''')
chat page
In [ ]:
driver.get(f'https://chat.openai.com/c/{secret_settings["conversation_id"]}')
In [ ]:
WebDriverWait(driver, 5).until(
expected_conditions.presence_of_element_located((By.ID, 'headlessui-portal-root'))
)
driver.execute_script('''
document.getElementById('headlessui-portal-root').remove();
''')
In [ ]:
chatgpt_textbox = (By.TAG_NAME, 'textarea')
chatgpt_streaming = (By.CLASS_NAME, 'result-streaming')
chatgpt_big_response = (By.XPATH, '//div[@class="flex-1 overflow-hidden"]//div[p]')
chatgpt_small_response = (By.XPATH, '//div[starts-with(@class, "markdown prose w-full break-words")]')
def request(prompt: str) -> None:
textbox = WebDriverWait(driver, 5).until(
expected_conditions.element_to_be_clickable(chatgpt_textbox)
)
textbox.click()
driver.execute_script('''
var element = arguments[0], txt = arguments[1];
element.value += txt;
element.dispatchEvent(new Event("change"));
''',
textbox,
prompt,
)
textbox.send_keys(Keys.ENTER)
def get_last_response():
responses = driver.find_elements(*chatgpt_big_response)
if responses:
response = responses[-1]
if 'text-red' in response.get_attribute('class'):
raise ValueError(response.text)
return response
return driver.find_elements(*chatgpt_small_response)[-1]
def get_response() -> str:
result_streaming = WebDriverWait(driver, 3).until(
expected_conditions.presence_of_element_located(chatgpt_streaming)
)
while result_streaming:
response = get_last_response()
response = response.get_attribute('innerHTML')
yield response
result_streaming = driver.find_elements(*chatgpt_streaming)
response = get_last_response()
yield response.get_attribute('innerHTML')
def ask(prompt: str) -> str:
request(prompt)
return get_response()
In [ ]:
import io
from IPython.display import display
from PIL import Image
display(Image.open(io.BytesIO(driver.get_screenshot_as_png())))
In [ ]:
import IPython
In [ ]:
for response in ask('我累了。带着我祷告吧。'):
IPython.display.display(IPython.core.display.Markdown(response))
IPython.display.clear_output(wait = True)
In [ ]:
%%bash
pip install --upgrade ipymock
In [ ]:
%%bash
pip install --upgrade undetected_chromedriver==3.* selenium_profiles==2.*
In [ ]:
%%bash
sudo apt upgrade --yes chromium-driver
In [ ]:
%%bash
apt show chromium-driver
In [ ]:
import ipymock.browser
ipymock.browser.init()
In [ ]:
import IPython
In [ ]:
for response in ipymock.browser.ask('我累了。带着我祷告吧。'):
IPython.display.display(IPython.core.display.Markdown(response))
IPython.display.clear_output(wait = True)
In [ ]:
%%bash
# install dependencies on headless linux server
apt install chromium-browser xvfb
In [ ]:
%%bash
pip install --upgrade selenium_profiles pyChatGPT
In [ ]:
from selenium.webdriver.support import expected_conditions as EC
from selenium.common import exceptions as SeleniumExceptions
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
import time
google_oauth_btn = (By.XPATH, '//button[@data-provider="google"]')
microsoft_oauth_btn = (By.XPATH, '//button[@data-provider="windowslive"]')
google_email_input = (By.XPATH, '//input[@type="email"]')
google_next_btn = (By.XPATH, '//*[@id="identifierNext"]')
google_try_again_btn = (By.XPATH, '//*[@id="next"]/div/button')
google_pwd_input = (By.XPATH, '//input[@type="password"]')
google_pwd_next_btn = (By.XPATH, '//*[@id="passwordNext"]')
google_code_samp = (By.TAG_NAME, 'samp')
microsoft_email_input = (By.XPATH, '//input[@type="email"]')
microsoft_pwd_input = (By.XPATH, '//input[@type="password"]')
microsoft_next_btn = (By.XPATH, '//input[@type="submit"]')
openai_email_input = (By.XPATH, '//input[@name="username"]')
openai_pwd_input = (By.XPATH, '//input[@type="password"]')
openai_continue_btn = (By.XPATH, '//button[text()="Continue"]')
openai_captcha_input = (By.XPATH, '//input[@name="captcha"]')
openai_captcha_sitekey = (
By.XPATH, '//div[@data-recaptcha-provider="recaptcha_enterprise"]',
)
def login(self) -> None:
'''
Login to ChatGPT
'''
if self._ChatGPT__auth_type == 'google':
__google_login(self)
elif self._ChatGPT__auth_type == 'microsoft':
__microsoft_login(self)
elif self._ChatGPT__auth_type == 'openai':
__openai_login(self)
def __google_login(self) -> None:
'''
Login to ChatGPT using Google
'''
self.logger.debug('Clicking Google button...')
# import pdb; pdb.set_trace()
# ActionChains(self.driver).key_down(Keys.ALT).key_down(Keys.COMMAND).key_down('i').key_up('i').key_up(Keys.COMMAND).key_up(Keys.ALT).pause(1).perform()
# ActionChains(self.driver).move_to_element_with_offset(self.driver.find_element(*google_oauth_btn), 2, -2).pause(1).perform()
# ActionChains(self.driver).click(self.driver.find_element(*google_oauth_btn)).pause(1).perform()
# ActionChains(self.driver).send_keys(Keys.TAB).pause(1).send_keys(Keys.TAB).pause(1).send_keys(Keys.TAB).pause(1).send_keys(Keys.TAB).pause(1).send_keys(Keys.ENTER).perform()
# self.driver.find_element(*google_oauth_btn).click()
self.driver.execute_script('''
document.evaluate(
'//button[@data-provider="google"]',
document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null
).snapshotItem(0).dispatchEvent(
new MouseEvent('click', {
view: window,
bubbles: true,
cancelable: true
})
);
''')
import time
time.sleep(1)
# google_email_entry = (By.XPATH, f'//div[@data-identifier="{self._ChatGPT__email}"]')
# try:
# self.logger.debug('Checking if Google remembers email...')
# WebDriverWait(self.driver, 3).until(
# EC.element_to_be_clickable(google_email_entry)
# ).click()
# self.logger.debug('Google remembers email')
# except SeleniumExceptions.TimeoutException:
self.logger.debug('Google does not remember email')
# self.driver.find_element(by=By.TAG_NAME, value='body').click()
# ActionChains(self.driver).move_to_element(self.driver.find_element(by=By.XPATH, value='//*[@id="headingText"]/span')).pause(1).perform()
# ActionChains(self.driver).click(self.driver.find_element(by=By.XPATH, value='//*[@id="headingText"]/span')).pause(1).perform()
self.logger.debug('Entering email...')
# import pdb; pdb.set_trace()
# WebDriverWait(self.driver, 3).until(
# EC.element_to_be_clickable(google_email_input)
# ).click()
# self.driver.find_element(*google_email_input).send_keys(self._ChatGPT__email)
# self.driver.find_element(*google_email_input).send_keys(Keys.ENTER)
# actions = ActionChains(self.driver)
# for l in self._ChatGPT__email:
# actions.key_down(l).key_up(l).pause(1)
# actions.send_keys(Keys.ENTER).perform()
# ActionChains(self.driver).move_to_element_with_offset(self.driver.find_element(*google_email_input), 2, -2).pause(1).send_keys(self._ChatGPT__email).send_keys(Keys.ENTER).perform()
self.driver.execute_script(f'''
const google_email_input = document.evaluate('//input[@type="email"]', document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null).snapshotItem(0);
google_email_input.value = '{self._ChatGPT__email}';
google_email_input.dispatchEvent(
new Event('input', {{
view: window,
bubbles: true,
cancelable: true
}})
);
''')
import time
time.sleep(1)
self.logger.debug('Clicking Next...')
# import pdb; pdb.set_trace()
# ActionChains(self.driver).move_to_element(self.driver.find_element(*google_next_btn)).pause(1).perform()
# self.driver.find_element(*google_next_btn).send_keys(Keys.ENTER)
# WebDriverWait(self.driver, 1).until(
# EC.element_to_be_clickable(google_next_btn)
# ).click()
self.driver.execute_script('''
document.evaluate(
'//*[@id="identifierNext"]',
document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null
).snapshotItem(0).dispatchEvent(
new MouseEvent('click', {
view: window,
bubbles: true,
cancelable: true
})
);
''')
import time
time.sleep(1)
while True:
try:
self.logger.debug('Clicking Try Again...')
import pdb; pdb.set_trace()
WebDriverWait(self.driver, 3).until(
EC.element_to_be_clickable(google_try_again_btn)
).click()
except SeleniumExceptions.TimeoutException:
self.logger.debug('Try Again button not found')
break
self.logger.debug('Entering email...')
WebDriverWait(self.driver, 3).until(
EC.element_to_be_clickable(google_email_input)
).click()
self.driver.find_element(*google_email_input).send_keys(self._ChatGPT__email)
self.logger.debug('Clicking Next...')
import pdb; pdb.set_trace()
WebDriverWait(self.driver, 1).until(
EC.element_to_be_clickable(google_next_btn)
).click()
self.logger.debug('Entering password...')
WebDriverWait(self.driver, 3).until(
EC.element_to_be_clickable(google_pwd_input)
).click()
self.driver.find_element(*google_pwd_input).send_keys(self._ChatGPT__password)
self.logger.debug('Clicking Next...')
WebDriverWait(self.driver, 1).until(
EC.element_to_be_clickable(google_pwd_next_btn)
).click()
try:
self.logger.debug('Checking if verification code is required...')
WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located(google_code_samp)
)
self.logger.debug('Code is required')
prev_code = self.driver.find_elements(By.TAG_NAME, 'samp')[0].text
print('Verification code:', prev_code)
while True:
code = self.driver.find_elements(*google_code_samp)
if not code:
break
if code[0].text != prev_code:
print('Verification code:', code[0].text)
prev_code = code[0].text
time.sleep(1)
except SeleniumExceptions.TimeoutException:
self.logger.debug('Code is not required')
def __microsoft_login(self) -> None:
self.logger.debug('Clicking Microsoft button...')
self.driver.find_element(*microsoft_oauth_btn).click()
self.logger.debug('Entering email...')
WebDriverWait(self.driver, 5).until(
EC.element_to_be_clickable(microsoft_email_input)
).send_keys(self._ChatGPT__email)
self.logger.debug('Clicking Next...')
self.driver.find_element(*microsoft_next_btn).click()
self.logger.debug('Entering password...')
WebDriverWait(self.driver, 5).until(
EC.element_to_be_clickable(microsoft_pwd_input)
).send_keys(self._ChatGPT__password)
self.logger.debug('Clicking Next...')
self.driver.find_element(*microsoft_next_btn).click()
self.logger.debug('Clicking allow...')
WebDriverWait(self.driver, 5).until(
EC.element_to_be_clickable(microsoft_next_btn)
).click()
def __have_recaptcha_value(self) -> bool:
'''
Check if the recaptcha input has a value
:return: Boolean indicating if the recaptcha input has a value
'''
try:
recaptcha_result = self.driver.find_element(*openai_captcha_input)
return recaptcha_result.get_attribute('value') != ''
except SeleniumExceptions.NoSuchElementException:
return False
def __pypasser_solve(self, retry: int) -> None:
'''
Solve the recaptcha using PyPasser
:param retry: Number of times to retry solving the recaptcha
'''
try:
from pypasser import reCaptchaV2
except ModuleNotFoundError:
raise ModuleNotFoundError(
'Please install ffmpeg_downloader, PyPasser, and pocketsphinx by running `pip install ffmpeg_downloader PyPasser pocketsphinx`'
)
self.logger.debug(f'Trying pypasser solver, max retry = {retry}')
try:
reCaptchaV2(self.driver, False, retry)
except Exception as e:
self.logger.debug(f'pypasser solver error: {str(e)}')
def __twocaptcha_solve(self, retry: int) -> None:
'''
Solve the recaptcha using 2captcha
:param retry: Number of times to retry solving the recaptcha
'''
try:
from twocaptcha import TwoCaptcha
except ModuleNotFoundError:
raise ModuleNotFoundError(
'Please install twocaptcha by running `pip install 2captcha-python`'
)
self.logger.debug(f'Trying 2captcha solver, max retry = {retry}')
solver = TwoCaptcha(self._ChatGPT__solver_apikey, pollingInterval=5)
sitekey = self.driver.find_element(*openai_captcha_sitekey).get_attribute(
'data-recaptcha-sitekey'
)
result = None
for _ in range(retry):
try:
result = solver.recaptcha(
sitekey=sitekey,
url=self.driver.current_url,
invisible=1,
enterprise=1,
)
if result:
captcha_input = self.driver.find_element(*openai_captcha_input)
self.driver.execute_script(
'arguments[0].setAttribute("value", arguments[1])',
captcha_input,
result['code'],
)
break
except Exception as e:
self.logger.debug(f'2captcha solver error: {str(e)}')
def __openai_login(self, retry: int = 3) -> None:
'''
Login to ChatGPT using OpenAI
:param retry: Number of times to retry solving the recaptcha
'''
self.logger.debug('Entering email...')
self.driver.find_element(*openai_email_input).send_keys(self._ChatGPT__email)
self.driver.find_element(*openai_continue_btn).click()
have_recaptcha = False
try:
WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located(
(By.CSS_SELECTOR, 'iframe[title="reCAPTCHA"]')
)
)
have_recaptcha = True
self.logger.debug('Captcha detected')
except SeleniumExceptions.TimeoutException:
self.logger.debug('No captcha detected')
try:
WebDriverWait(self.driver, 3).until(
EC.text_to_be_present_in_element_attribute(
openai_captcha_input, 'value', '_'
)
)
except SeleniumExceptions.TimeoutException:
if self._ChatGPT__captcha_solver == 'pypasser':
__pypasser_solve(self, retry)
elif self._ChatGPT__captcha_solver == '2captcha':
__twocaptcha_solve(self, retry)
if have_recaptcha:
if __have_recaptcha_value(self):
self.logger.debug('Congrat! reCAPTCHA is solved')
else:
self.logger.debug('Oops! you need to solve reCAPTCHA manually')
self.driver.get(self.driver.current_url)
while not __have_recaptcha_value(self):
time.sleep(1)
self.logger.debug('Clicking Continue...')
self.driver.find_element(*openai_continue_btn).click()
self.logger.debug('Entering password...')
self.driver.find_element(*openai_pwd_input).send_keys(self._ChatGPT__password)
self.driver.find_element(*openai_continue_btn).click()
In [ ]:
from selenium.webdriver.support import expected_conditions as EC
from selenium.common import exceptions as SeleniumExceptions
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
import undetected_chromedriver as uc
from markdownify import markdownify
from threading import Thread
import platform
import logging
import weakref
import json
import time
import re
import os
cf_challenge_form = (By.ID, 'challenge-form')
chatgpt_textbox = (By.TAG_NAME, 'textarea')
chatgpt_streaming = (By.CLASS_NAME, 'result-streaming')
chatgpt_big_response = (By.XPATH, '//div[@class="flex-1 overflow-hidden"]//div[p]')
chatgpt_small_response = (
By.XPATH, '//div[starts-with(@class, "markdown prose w-full break-words")]',
)
chatgpt_alert = (By.XPATH, '//div[@role="alert"]')
chatgpt_intro = (By.ID, 'headlessui-portal-root')
chatgpt_login_btn = (By.XPATH, '//*[text()="Log in"]')
chatgpt_login_h1 = (By.XPATH, '//h1[text()="Welcome back"]')
chatgpt_logged_h1 = (By.XPATH, '//h1[text()="ChatGPT"]')
chatgpt_new_chat = (By.LINK_TEXT, 'New chat')
chatgpt_clear_convo = (By.LINK_TEXT, 'Clear conversations')
chatgpt_confirm_clear_convo = (By.LINK_TEXT, 'Confirm clear conversations')
chatgpt_chats_list_first_node = (
By.XPATH, '//div[substring(@class, string-length(@class) - string-length("text-sm") + 1) = "text-sm"]//a',
)
chatgpt_chat_url = 'https://chat.openai.com/c'
class ChatGPT:
'''
An unofficial Python wrapper for OpenAI's ChatGPT API
'''
def __init__(
self,
session_token: str = None,
conversation_id: str = '',
auth_type: str = None,
email: str = None,
password: str = None,
login_cookies_path: str = '',
captcha_solver: str = 'pypasser',
solver_apikey: str = '',
proxy: str = None,
chrome_args: list = [],
moderation: bool = True,
verbose: bool = False,
):
'''
Initialize the ChatGPT object\n
:param session_token: The session token to use for authentication
:param conversation_id: The conversation ID to use for the chat session
:param auth_type: The authentication type to use (`google`, `microsoft`, `openai`)
:param email: The email to use for authentication
:param password: The password to use for authentication
:param login_cookies_path: The path to the cookies file to use for authentication
:param captcha_solver: The captcha solver to use (`pypasser`, `2captcha`)
:param solver_apikey: The apikey of the captcha solver to use (if any)
:param proxy: The proxy to use for the browser (`https://ip:port`)
:param chrome_args: The arguments to pass to the browser
:param moderation: Whether to enable message moderation
:param verbose: Whether to enable verbose logging
'''
self.__init_logger(verbose)
self.__session_token = session_token
self.__conversation_id = conversation_id
self.__auth_type = auth_type
self.__email = email
self.__password = password
self.__login_cookies_path = login_cookies_path
self.__captcha_solver = captcha_solver
self.__solver_apikey = solver_apikey
self.__proxy = proxy
self.__chrome_args = chrome_args
self.__moderation = moderation
if not self.__session_token and (
not self.__email or not self.__password or not self.__auth_type
):
raise ValueError(
'Please provide either a session token or login credentials'
)
if self.__auth_type not in [None, 'google', 'microsoft', 'openai']:
raise ValueError('Invalid authentication type')
if self.__captcha_solver not in [None, 'pypasser', '2captcha']:
raise ValueError('Invalid captcha solver')
if self.__captcha_solver == '2captcha' and not self.__solver_apikey:
raise ValueError('Please provide a 2captcha apikey')
if self.__proxy and not re.findall(
r'(https?|socks(4|5)?):\/\/.+:\d{1,5}', self.__proxy
):
raise ValueError('Invalid proxy format')
if self.__auth_type == 'openai' and self.__captcha_solver == 'pypasser':
try:
import ffmpeg_downloader as ffdl
except ModuleNotFoundError:
raise ValueError(
'Please install ffmpeg_downloader, PyPasser, and pocketsphinx by running `pip install ffmpeg_downloader PyPasser pocketsphinx`'
)
ffmpeg_installed = bool(ffdl.ffmpeg_version)
self.logger.debug(f'ffmpeg installed: {ffmpeg_installed}')
if not ffmpeg_installed:
import subprocess
subprocess.run(['ffdl', 'install'])
os.environ['PATH'] += os.pathsep + ffdl.ffmpeg_dir
self.__init_browser()
weakref.finalize(self, self.__del__)
def __del__(self):
'''
Close the browser and display
'''
self.__is_active = False
if hasattr(self, 'driver'):
self.logger.debug('Closing browser...')
self.driver.quit()
if hasattr(self, 'display'):
self.logger.debug('Closing display...')
self.display.stop()
def __init_logger(self, verbose: bool) -> None:
'''
Initialize the logger\n
:param verbose: Whether to enable verbose logging
'''
self.logger = logging.getLogger('pyChatGPT')
self.logger.setLevel(logging.DEBUG)
if verbose:
formatter = logging.Formatter('[%(funcName)s] %(message)s')
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
self.logger.addHandler(stream_handler)
def __init_browser(self) -> None:
'''
Initialize the browser
'''
if platform.system() == 'Linux' and 'DISPLAY' not in os.environ:
self.logger.debug('Starting virtual display...')
try:
from pyvirtualdisplay import Display
self.display = Display()
except ModuleNotFoundError:
raise ValueError(
'Please install PyVirtualDisplay to start a virtual display by running `pip install PyVirtualDisplay`'
)
except FileNotFoundError as e:
if 'No such file or directory: \'Xvfb\'' in str(e):
raise ValueError(
'Please install Xvfb to start a virtual display by running `sudo apt install xvfb`'
)
raise e
self.display.start()
self.logger.debug('Initializing browser...')
options = uc.ChromeOptions()
options.add_argument('--window-size=1024,768')
if self.__proxy:
options.add_argument(f'--proxy-server={self.__proxy}')
for arg in self.__chrome_args:
options.add_argument(arg)
try:
self.driver = uc.Chrome(options=options)
except TypeError as e:
if str(e) == 'expected str, bytes or os.PathLike object, not NoneType':
raise ValueError('Chrome installation not found')
raise e
if self.__login_cookies_path and os.path.exists(self.__login_cookies_path):
self.logger.debug('Restoring cookies...')
try:
with open(self.__login_cookies_path, 'r', encoding='utf-8') as f:
cookies = json.load(f)
for cookie in cookies:
if cookie['name'] == '__Secure-next-auth.session-token':
self.__session_token = cookie['value']
except json.decoder.JSONDecodeError:
self.logger.debug(f'Invalid cookies file: {self.__login_cookies_path}')
if self.__session_token:
self.logger.debug('Restoring session_token...')
self.driver.execute_cdp_cmd(
'Network.setCookie',
{
'domain': 'chat.openai.com',
'path': '/',
'name': '__Secure-next-auth.session-token',
'value': self.__session_token,
'httpOnly': True,
'secure': True,
},
)
if not self.__moderation:
self.logger.debug('Blocking moderation...')
self.driver.execute_cdp_cmd(
'Network.setBlockedURLs',
{'urls': ['https://chat.openai.com/backend-api/moderations']},
)
self.logger.debug('Ensuring Cloudflare cookies...')
self.__ensure_cf()
self.logger.debug('Opening chat page...')
self.driver.get(f'{chatgpt_chat_url}/{self.__conversation_id}')
self.__check_blocking_elements()
self.__is_active = True
Thread(target=self.__keep_alive, daemon=True).start()
def __ensure_cf(self, retry: int = 3) -> None:
'''
Ensure Cloudflare cookies are set\n
:param retry: Number of retries
'''
self.logger.debug('Opening new tab...')
original_window = self.driver.current_window_handle
self.driver.switch_to.new_window('tab')
self.logger.debug('Getting Cloudflare challenge...')
self.driver.get('https://chat.openai.com/api/auth/session')
try:
WebDriverWait(self.driver, 10).until_not(
EC.presence_of_element_located(cf_challenge_form)
)
except SeleniumExceptions.TimeoutException:
self.logger.debug(f'Cloudflare challenge failed, retrying {retry}...')
self.driver.save_screenshot(f'cf_failed_{retry}.png')
if retry > 0:
self.logger.debug('Closing tab...')
self.driver.close()
self.driver.switch_to.window(original_window)
return self.__ensure_cf(retry - 1)
raise ValueError('Cloudflare challenge failed')
self.logger.debug('Cloudflare challenge passed')
self.logger.debug('Validating authorization...')
response = self.driver.page_source
if response[0] != '{':
response = self.driver.find_element(By.TAG_NAME, 'pre').text
response = json.loads(response)
if (not response) or (
'error' in response and response['error'] == 'RefreshAccessTokenError'
):
self.logger.debug('Authorization is invalid')
if not self.__auth_type:
raise ValueError('Invalid session token')
self.__login()
self.logger.debug('Authorization is valid')
self.logger.debug('Closing tab...')
self.driver.close()
self.driver.switch_to.window(original_window)
def __check_capacity(self, target_url: str):
'''
Check if ChatGPT is at capacity\n
:param target_url: URL to retry if ChatGPT is at capacity
'''
while True:
try:
self.logger.debug('Checking if ChatGPT is at capacity...')
WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located(
(By.XPATH, '//div[text()="ChatGPT is at capacity right now"]')
)
)
self.logger.debug('ChatGPT is at capacity, retrying...')
self.driver.get(target_url)
except SeleniumExceptions.TimeoutException:
self.logger.debug('ChatGPT is not at capacity')
break
def __login(self) -> None:
'''
Login to ChatGPT
'''
self.logger.debug('Opening new tab...')
original_window = self.driver.current_window_handle
self.driver.switch_to.new_window('tab')
self.logger.debug('Opening login page...')
self.driver.get('https://chat.openai.com/auth/login')
self.__check_capacity('https://chat.openai.com/auth/login')
self.logger.debug('Clicking login button...')
WebDriverWait(self.driver, 5).until(
EC.element_to_be_clickable(chatgpt_login_btn)
).click()
WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located(chatgpt_login_h1)
)
login(self)
self.logger.debug('Checking if login was successful')
try:
WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located(chatgpt_logged_h1)
)
if self.__login_cookies_path:
self.logger.debug('Saving cookies...')
with open(self.__login_cookies_path, 'w', encoding='utf-8') as f:
json.dump(
[
i
for i in self.driver.get_cookies()
if i['name'] == '__Secure-next-auth.session-token'
],
f,
indent = 4,
)
except SeleniumExceptions.TimeoutException as e:
self.driver.save_screenshot('login_failed.png')
raise e
self.logger.debug('Closing tab...')
self.driver.close()
self.driver.switch_to.window(original_window)
def __keep_alive(self) -> None:
'''
Keep the session alive by updating the local storage\n
Credit to Rawa#8132 in the ChatGPT Hacking Discord server
'''
while self.__is_active:
self.logger.debug('Updating session...')
payload = (
'{"event":"session","data":{"trigger":"getSession"},"timestamp":%d}'
% int(time.time())
)
try:
self.driver.execute_script(
'window.localStorage.setItem("nextauth.message", arguments[0])',
payload,
)
except Exception as e:
self.logger.debug(f'Failed to update session: {str(e)}')
time.sleep(60)
def __check_blocking_elements(self) -> None:
'''
Check for blocking elements and dismiss them
'''
self.logger.debug('Looking for blocking elements...')
try:
intro = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located(chatgpt_intro)
)
self.logger.debug('Dismissing intro...')
self.driver.execute_script('arguments[0].remove()', intro)
except SeleniumExceptions.TimeoutException:
pass
alerts = self.driver.find_elements(*chatgpt_alert)
if alerts:
self.logger.debug('Dismissing alert...')
self.driver.execute_script('arguments[0].remove()', alerts[0])
def __request(self, prompt: str) -> None:
self.logger.debug('Ensuring Cloudflare cookies...')
self.__ensure_cf()
self.logger.debug('Sending message...')
textbox = WebDriverWait(self.driver, 5).until(
EC.element_to_be_clickable(chatgpt_textbox)
)
textbox.click()
self.driver.execute_script(
'''
var element = arguments[0], txt = arguments[1];
element.value += txt;
element.dispatchEvent(new Event("change"));
''',
textbox,
prompt,
)
textbox.send_keys(Keys.ENTER)
def __get_last_response(self):
self.logger.debug('Getting response...')
responses = self.driver.find_elements(*chatgpt_big_response)
if responses:
response = responses[-1]
if 'text-red' in response.get_attribute('class'):
self.logger.debug('Response is an error')
raise ValueError(response.text)
return response
return self.driver.find_elements(*chatgpt_small_response)[-1]
def __response(self):
result_streaming = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located(chatgpt_streaming)
)
while result_streaming:
response = self.__get_last_response()
response = response.get_attribute('innerHTML')
# response = markdownify(response).replace('Copy code`', '`')
yield response
result_streaming = self.driver.find_elements(*chatgpt_streaming)
response = self.__get_last_response()
yield response.get_attribute('innerHTML')
def ask(self, prompt: str):
self.__request(prompt)
return self.__response()
def __get_conversation_id(self) -> str:
pattern = re.compile(
r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}'
)
matches = pattern.search(self.driver.current_url)
if not matches:
self.reset_conversation()
WebDriverWait(self.driver, 5).until(
EC.element_to_be_clickable(chatgpt_chats_list_first_node)
).click()
time.sleep(0.5)
matches = pattern.search(self.driver.current_url)
return matches.group()
def wait(self, prompt: str, stream: bool = False) -> dict:
'''
Wait an answer from ChatGPT\n
:param prompt: Prompt to send
:return: Dictionary with keys `content` and `conversation_id`
'''
self.__request(prompt)
if stream:
prev_text = ''
for content in self.__response():
from bs4 import BeautifulSoup
text = BeautifulSoup(content, 'html.parser').get_text()
if text != prev_text:
print(text[len(prev_text):], end = '')
prev_text = text
time.sleep(0.1)
print()
return {'content': content, 'conversation_id': self.__get_conversation_id()}
self.logger.debug('Waiting for completion...')
WebDriverWait(self.driver, 120).until_not(
EC.presence_of_element_located(chatgpt_streaming)
)
response = self.__get_last_response()
content = markdownify(
response.get_attribute('innerHTML')
).replace('Copy code`', '`')
return {'content': content, 'conversation_id': self.__get_conversation_id()}
def reset_conversation(self) -> None:
'''
Reset the conversation
'''
if not self.driver.current_url.startswith(chatgpt_chat_url):
return self.logger.debug('Current URL is not chat page, skipping reset')
self.logger.debug('Resetting conversation...')
try:
self.driver.find_element(*chatgpt_new_chat).click()
except SeleniumExceptions.NoSuchElementException:
self.logger.debug('New chat button not found')
self.driver.save_screenshot('reset_conversation_failed.png')
def clear_conversations(self) -> None:
'''
Clear all conversations
'''
if not self.driver.current_url.startswith(chatgpt_chat_url):
return self.logger.debug('Current URL is not chat page, skipping clear')
self.logger.debug('Clearing conversations...')
try:
self.driver.find_element(*chatgpt_clear_convo).click()
except SeleniumExceptions.NoSuchElementException:
self.logger.debug('Clear conversations button not found')
try:
self.driver.find_element(*chatgpt_confirm_clear_convo).click()
except SeleniumExceptions.NoSuchElementException:
return self.logger.debug('Confirm clear conversations button not found')
try:
WebDriverWait(self.driver, 20).until_not(
EC.presence_of_element_located(chatgpt_chats_list_first_node)
)
self.logger.debug('Cleared conversations')
except SeleniumExceptions.TimeoutException:
self.logger.debug('Clear conversations failed')
def refresh_chat_page(self) -> None:
'''
Refresh the chat page
'''
if not self.driver.current_url.startswith(chatgpt_chat_url):
return self.logger.debug('Current URL is not chat page, skipping refresh')
self.driver.get(chatgpt_chat_url)
self.__check_capacity(chatgpt_chat_url)
self.__check_blocking_elements()
In [ ]:
import os
# from pyChatGPT import ChatGPT
# open the JSON file and read the conversation_id
with open(os.path.expanduser('~/.config/pyChatGPT/secret_settings.json'), 'r') as f:
secret_settings = json.load(f)
# auth with google login
# reuse cookies generated by successful login before login,
# if `login_cookies_path` does not exist, it will process logining with `auth_type`, and save cookies to `login_cookies_path`
# only works when `auth_type` is `openai` or `google`
bot = ChatGPT(
**secret_settings, moderation = False, verbose = False, chrome_args = ['--headless'],
login_cookies_path = os.path.expanduser('~/.config/pyChatGPT/cookies.json'),
)
In [ ]:
import IPython
In [ ]:
for response in bot.ask('我累了。带着我祷告吧。'):
IPython.display.display(IPython.core.display.Markdown(response))
IPython.display.clear_output(wait = True)
In [ ]:
bot.wait('我累了。带着我祷告吧。')
Out[ ]:
In [ ]:
bot.wait('我累了。带着我祷告吧。', stream = True)
Out[ ]:
In [ ]:
# bot.reset_conversation() # reset the conversation
# bot.clear_conversations() # clear all conversations
# bot.refresh_chat_page() # refresh the chat page
In [ ]:
%%bash
mkdir -p ~/.config/pyChatGPT
%%writefile ~/.config/pyChatGPT/secret_settings.json
{
"auth_type": "google",
"email": "<email_address>",
"password": "<password>",
"conversation_id": "<conversation_id>"
}
%%writefile ~/.config/pyChatGPT/cookies.json
[
{
"domain": "chat.openai.com",
"expiry": 1684309055,
"httpOnly": true,
"name": "__Secure-next-auth.session-token",
"path": "/",
"sameSite": "Lax",
"secure": true,
"value": "<cookie_value>"
}
]
In [ ]:
import json, os
with open(os.path.expanduser('~/.config/pyChatGPT/cookies.json')) as f:
data = json.load(f)
print(json.dumps(data, indent=4))