feat(scraping): enhance job scraping with session persistence and feedback system

- Add config module for spoof data management
- Implement session persistence to reuse authenticated sessions
- Add feedback system to track success rates and adjust fingerprinting
- Improve job link collection with pagination and scroll detection
- Separate verified/unverified job listings into different folders
- Enhance error handling for CAPTCHA and Cloudflare challenges
Ofure Ikheloa 2025-11-21 16:51:26 +01:00
parent 68495a0a54
commit 458e914d71
4 changed files with 377 additions and 93 deletions
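
Before the per-file diffs, a rough sketch (not part of the commit) of how the new session and feedback hooks are meant to be driven. It assumes a Playwright browser context; the demo() wrapper, launch options, and engine construction are illustrative only — scrape_jobs() in the scraper below does all of this internally.

import asyncio
from playwright.async_api import async_playwright
from scraping_engine import FingerprintScrapingEngine  # constructor arguments omitted here

async def demo(engine: FingerprintScrapingEngine):
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        context = await browser.new_context()
        # Reuse cookies from browser_sessions/<seed>_session.json if a previous run saved them.
        reused = await engine.load_session(context)
        if not reused:
            # ...log in here, then persist the authenticated cookies for the next run...
            await engine.save_session(context)
        # Report how the run went so later runs can adjust fingerprinting.
        engine.report_outcome("success")  # or "captcha", "cloudflare", "block"
        await browser.close()

# asyncio.run(demo(my_engine))  # 'my_engine' is a hypothetical, already-configured engine instance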

config.py Normal file
View File

@@ -0,0 +1,39 @@
import os
import json


def load_spoof_config():
    """Load spoof data from JSON config file. Falls back to defaults if missing."""
    config_path = os.path.join(os.path.dirname(__file__), "spoof_config.json")
    if os.path.exists(config_path):
        with open(config_path, "r", encoding="utf-8") as f:
            return json.load(f)
    else:
        # Generate default config file on first run
        default_config = {
            "renderers": {
                "windows": [
                    "ANGLE (Intel, Intel(R) UHD Graphics 630 (0x00003E9B) Direct3D11 vs_5_0 ps_5_0, D3D11)",
                    "ANGLE (Intel, Intel(R) UHD Graphics (0x00009A49) Direct3D11 vs_5_0 ps_5_0, D3D11)",
                    "ANGLE (Intel(R) Iris(TM) Graphics 540 Direct3D11 vs_5_0 ps_5_0)",
                    "ANGLE (Intel, Intel(R) UHD Graphics 620 (0x00005916) Direct3D11 vs_5_0 ps_5_0, D3D11)",
                    "ANGLE (Intel, Intel(R) HD Graphics 530 (0x0000191B) Direct3D11 vs_5_0 ps_5_0, D3D11)",
                    "ANGLE (Intel, Intel(R) UHD Graphics 600 (0x00003180) Direct3D11 vs_5_0 ps_5_0, D3D11)",
                    "ANGLE (Intel, Intel(R) Iris(R) Xe Graphics (0x00009A49) Direct3D11 vs_5_0 ps_5_0, D3D11)",
                ],
                "macos": [
                    "Intel HD Graphics 530 OpenGL Engine",
                    "Intel Iris Graphics 6100 OpenGL Engine",
                    "Intel UHD Graphics 630 OpenGL Engine",
                    "Intel HD Graphics 4000 OpenGL Engine",
                    "Intel Iris Pro OpenGL Engine",
                    "Intel UHD Graphics 617 OpenGL Engine",
                ]
            },
            "vendors": ["Intel Inc.", "Intel", "Intel Corporation"]
        }
        with open(config_path, "w", encoding="utf-8") as f:
            json.dump(default_config, f, indent=2)
        return default_config
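
For illustration only (not in the commit): once spoof_config.json has been generated it can be edited by hand, and the loader reused directly. The snippet below mirrors how the engine further down seeds its random choices from a SHA-256 hash of the seed; the seed string here is a hypothetical example.

import hashlib
import random
from config import load_spoof_config

cfg = load_spoof_config()  # creates spoof_config.json on first run
seed = "my-profile-seed"   # hypothetical seed value
rng = random.Random(int(hashlib.sha256(seed.encode()).hexdigest(), 16))

renderer = rng.choice(cfg["renderers"]["windows"])
vendor = rng.choice(cfg["vendors"])
print(renderer, vendor)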

View File

@@ -1,4 +1,5 @@
import asyncio
import random
import sqlite3
@@ -114,24 +115,22 @@ class LinkedInJobScraper:
                pass
            return "N/A"

        title = await get_text("h1.t-24")
        if title == "N/A":
            title = await get_text("h1, h2")

        company = await get_text("a.app-aware-link[href*='/company/']")
        if company == "N/A":
            company = await get_text("div.org, .company, [class*='company']")

        location = await get_text("span[class*='location']")
        if location == "N/A":
            location = await get_text(".location, [class*='location']")

        description = await get_text("div[class*='description__text']")
        if description == "N/A":
            description = await get_text(".job-desc, .description, main, body")

        workplace = await get_text("span.job-workplace-type") or "N/A"
        salary = await get_text("span.salary") or "N/A"
@@ -145,19 +144,39 @@ class LinkedInJobScraper:
            "url": page.url
        }

    async def _save_to_markdown(self, job_data: Dict, keyword: str, verified: bool = True):
        """Save to appropriate folder using job ID to avoid duplication"""
        folder = "linkedin_jobs" if verified else "linkedin_jobs_unverified"
        os.makedirs(folder, exist_ok=True)

        # Extract job ID from URL for LinkedIn jobs
        url = job_data.get("url", "")
        if "/jobs/view/" in url:
            try:
                job_id = url.split("/view/")[1].split("/")[0]
            except:
                job_id = "unknown"
        else:
            # For external jobs, use a hash of the URL (first 12 chars)
            import hashlib
            job_id = hashlib.md5(url.encode()).hexdigest()[:12]

        clean_keyword = keyword.replace(" ", "_")
        filename = f"linkedin_{clean_keyword}_job_{job_id}.md"
        filepath = os.path.join(folder, filename)

        # Only save if file doesn't already exist (idempotent)
        if os.path.exists(filepath):
            print(f" 📝 Skipping duplicate Markdown file: {filename}")
            return

        with open(filepath, "w", encoding="utf-8") as f:
            f.write(f"# {job_data['title']}\n\n")
            f.write(f"- **Company**: {job_data['company']}\n")
            f.write(f"- **Location**: {job_data['location']}\n")
            f.write(f"- **Workplace**: {job_data['workplace_type']}\n")
            f.write(f"- **Salary**: {job_data['salary']}\n")
            f.write(f"- **URL**: <{url}>\n\n")
            f.write(f"## Description\n\n{job_data['description']}\n")

    async def _save_to_db(self, job_data: Dict, keyword: str):
@@ -208,59 +227,176 @@ class LinkedInJobScraper:
        await context.add_init_script(spoof_script)
        page = await context.new_page()

        session_loaded = await self.engine.load_session(context)
        login_successful = False

        if session_loaded:
            print("🔁 Using saved session — verifying login...")
            await page.goto("https://www.linkedin.com/feed/", timeout=60000)
            if "feed" in page.url and "login" not in page.url:
                print("✅ Session still valid.")
                login_successful = True
            else:
                print("⚠️ Saved session expired — re-authenticating.")
                session_loaded = False

        if not session_loaded and credentials:
            print("🔐 Performing fresh login...")
            login_successful = await self._login(page, credentials)
            if login_successful:
                await self.engine.save_session(context)
            else:
                print("❌ Login failed. Exiting.")
                await browser.close()
                self.engine.report_outcome("block")
                return
        elif not credentials:
            print(" No credentials — proceeding as guest.")
            login_successful = True
        else:
            pass

        await page.wait_for_load_state("load", timeout=60000)
        print("✅ Post-login page fully loaded. Starting search...")

        if await self.engine._detect_cloudflare(page):
            print("☁️ Cloudflare detected on initial load.")
            if not await self.engine._handle_cloudflare(page):
                print("❌ Cloudflare could not be resolved.")
                await browser.close()
                self.engine.report_outcome("cloudflare")
                return

        print(f"🔍 Searching for: {search_keywords}")
        await page.goto(search_url, wait_until='load', timeout=60000)
        await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)

        if await self.engine._detect_cloudflare(page):
            print("☁️ Cloudflare detected on search page.")
            if not await self.engine._handle_cloudflare(page):
                await browser.close()
                self.engine.report_outcome("cloudflare")
                return

        scraped_count = 0
        all_job_links = []
        seen_job_ids = set()

        # ← NEW: Scroll once to reveal pagination (if any)
        print("🔄 Scrolling to bottom to reveal pagination controls...")
        await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
        await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)

        # Check if pagination exists
        pagination_exists = await page.query_selector("button[aria-label='Next']")
        if pagination_exists:
            print("⏭️ Pagination detected. Using page navigation.")
            current_page = 1
            while current_page <= max_pages:
                print(f"📄 Processing page {current_page}/{max_pages}")

                # Collect job links on current page
                current_links = await page.query_selector_all("a[href*='/jobs/view/']")
                new_jobs = 0
                for link in current_links:
                    href = await link.get_attribute("href")
                    if href:
                        job_id = href.split("/view/")[-1].split("/")[0] if "/view/" in href else href
                        if job_id and job_id not in seen_job_ids:
                            seen_job_ids.add(job_id)
                            all_job_links.append(href)
                            new_jobs += 1

                print(f" Found {new_jobs} new job(s) on page {current_page} (total: {len(all_job_links)})")

                # Try to go to next page
                if current_page < max_pages:
                    next_btn = await page.query_selector("button[aria-label='Next']")
                    if next_btn and await next_btn.is_enabled():
                        await self._human_click(page, next_btn)
                        await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
                        # Wait for URL to change or new content
                        try:
                            await page.wait_for_function("() => window.location.href.includes('start=')", timeout=30000)
                        except:
                            pass
                    else:
                        print("🔚 'Next' button not available — stopping pagination.")
                        break
                current_page += 1
        else:
            print("🔄 No pagination found. Falling back to infinite scroll...")
            last_height = await page.evaluate("document.body.scrollHeight")
            no_new_jobs_count = 0
            max_no_new = 3

            while no_new_jobs_count < max_no_new:
                await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)

                current_links = await page.query_selector_all("a[href*='/jobs/view/']")
                new_jobs_found = 0
                for link in current_links:
                    href = await link.get_attribute("href")
                    if href:
                        job_id = href.split("/view/")[-1].split("/")[0] if "/view/" in href else href
                        if job_id and job_id not in seen_job_ids:
                            seen_job_ids.add(job_id)
                            all_job_links.append(href)
                            new_jobs_found += 1

                print(f" Found {new_jobs_found} new job(s) (total: {len(all_job_links)})")

                new_height = await page.evaluate("document.body.scrollHeight")
                if new_height == last_height:
                    no_new_jobs_count += 1
                else:
                    no_new_jobs_count = 0
                    last_height = new_height

                if new_jobs_found == 0 and no_new_jobs_count >= 1:
                    print("🔚 No new jobs loaded. Stopping scroll.")
                    break

        print(f"✅ Collected {len(all_job_links)} unique job links.")

        # ← Rest of job processing loop unchanged
        scraped_count = 0
        for idx, href in enumerate(all_job_links):
            try:
                full_url = href if href.startswith("http") else f"https://www.linkedin.com{href}"
                print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url}")
                await page.goto(full_url, wait_until='load', timeout=60000)
                await asyncio.sleep(3 * self.human_speed)

                is_cloudflare = await self.engine._detect_cloudflare(page)
                page_content = await page.content()
                has_captcha_text = "captcha" in page_content.lower()
                captcha_present = is_cloudflare or has_captcha_text

                title_element = await page.query_selector("h1.t-24")
                job_data_accessible = title_element is not None

                if captcha_present:
                    if job_data_accessible:
                        print(" ⚠️ CAPTCHA detected, but job data is accessible. Proceeding in stealth mode...")
                        await self.engine._avoid_captcha(page)
                    else:
                        print(" ⚠️ CAPTCHA detected and job data blocked. Attempting recovery...")
                        if not await self.engine._solve_captcha_fallback(page):
                            print(" ❌ CAPTCHA recovery failed. Skipping job.")
                            continue
                        title_element = await page.query_selector("h1.t-24")
                        if not title_element:
                            print(" ❌ Job data still unavailable after CAPTCHA handling. Skipping.")
                            continue
                if not captcha_present:
                    await self.engine._avoid_captcha(page)

                # Find and click the main "Apply" button
                apply_btn = None
                apply_selectors = [
                    "button[aria-label*='Apply']",
@@ -273,36 +409,83 @@ class LinkedInJobScraper:
                    if apply_btn:
                        break

                job_data = None
                final_url = full_url

                if apply_btn:
                    print(" → Clicking 'Apply' / 'Easy Apply' button...")

                    page_waiter = asyncio.create_task(context.wait_for_event("page"))
                    await self._human_click(page, apply_btn, wait_after=False)

                    external_page = None
                    try:
                        external_page = await asyncio.wait_for(page_waiter, timeout=5.0)
                        print(" 🌐 External job site opened in new tab.")
                        await external_page.wait_for_load_state("load", timeout=30000)
                        await asyncio.sleep(2 * self.human_speed)
                        await self.engine._human_like_scroll(external_page)
                        await asyncio.sleep(2 * self.human_speed)
                        job_data = await self._extract_job_details(external_page)
                        final_url = external_page.url
                        if not external_page.is_closed():
                            await external_page.close()
                    except asyncio.TimeoutError:
                        print(" 🖥️ No external tab — scraping LinkedIn job page.")
                        await page.wait_for_timeout(2000)
                        try:
                            await page.wait_for_selector("div.jobs-apply-button--fixed, div.jobs-easy-apply-modal", timeout=8000)
                        except:
                            pass
                        await self.engine._human_like_scroll(page)
                        await asyncio.sleep(2 * self.human_speed)
                        job_data = await self._extract_job_details(page)
                        final_url = page.url
                else:
                    print(" ⚠️ No 'Apply' button found — scraping job details directly.")
                    await self.engine._human_like_scroll(page)
                    await asyncio.sleep(2 * self.human_speed)
                    job_data = await self._extract_job_details(page)
                    final_url = page.url

                job_data["url"] = final_url

                if job_data["title"] == "N/A" and "linkedin.com" in final_url:
                    job_id = final_url.split("/")[-2] if "/jobs/view/" in final_url else "unknown"
                    job_data["title"] = f"Easy Apply Job - ID {job_id}"

                is_meaningful = (
                    job_data["title"] != "N/A" or
                    job_data["company"] != "N/A" or
                    (job_data["description"] != "N/A" and len(job_data["description"]) > 20)
                )

                if is_meaningful:
                    await self._save_to_db(job_data, search_keywords)
                    await self._save_to_markdown(job_data, search_keywords, verified=True)
                    scraped_count += 1
                    print(f" ✅ Scraped (verified): {job_data['title'][:50]}...")
                else:
                    await self._save_to_markdown(job_data, search_keywords, verified=False)
                    print(f" 🟡 Scraped (unverified): {final_url} — low-quality data")

            except Exception as e:
                print(f" ⚠️ Failed on job {idx+1}: {str(e)[:100]}")
                continue
            finally:
                # Return to search results
                print(" ↩️ Returning to LinkedIn search results...")
                await page.goto(search_url, timeout=60000)
                await asyncio.sleep(4 * self.human_speed)

        await browser.close()

        if scraped_count > 0:
            self.engine.report_outcome("success")
            print(f"✅ Completed! Saved {scraped_count} verified + additional unverified jobs for '{search_keywords}'.")
        else:
            self.engine.report_outcome("captcha")
            print("⚠️ No verified jobs scraped — check 'linkedin_jobs_unverified' for raw outputs.")

View File

@@ -10,13 +10,13 @@ async def main():
        target_os="windows",
        db_path="job_listings.db",
        markdown_path="job_listings.md",
        search_keywords="Data Anaylst"
    )

    scraper = LinkedInJobScraper(engine, human_speed=1.6)
    await scraper.scrape_jobs(
        search_keywords="Data Anaylst",  # ← Your search terms
        max_pages=3,
        credentials={
            "email": os.getenv("SCRAPING_USERNAME"),

View File

@@ -1,12 +1,15 @@
# scraping_engine.py
import asyncio
import hashlib
import random
import os
import json
from typing import List, Optional, Dict
from browserforge.fingerprints import FingerprintGenerator
from dotenv import load_dotenv
from config import load_spoof_config

# Load environment variables
load_dotenv()
@@ -27,7 +30,6 @@ class FingerprintScrapingEngine:
        if target_os not in ['windows', 'macos']:
            raise ValueError("operating_system must be 'windows' or 'macos'")

        if login_credentials is None:
            username = os.getenv("SCRAPING_USERNAME")
            password = os.getenv("SCRAPING_PASSWORD")
@@ -47,37 +49,99 @@ class FingerprintScrapingEngine:
        )
        self.num_variations = num_variations

        # Load spoof config
        spoof_config = load_spoof_config()
        self.common_renderers = spoof_config["renderers"]
        self.common_vendors = spoof_config["vendors"]

        # Feedback system
        self.feedback_file = f"feedback_{seed}.json"
        self.feedback = self._load_feedback()

        # ← NEW: Session persistence paths
        self.session_dir = "browser_sessions"
        os.makedirs(self.session_dir, exist_ok=True)
        self.session_path = os.path.join(self.session_dir, f"{seed}_session.json")

    def _load_feedback(self):
        if os.path.exists(self.feedback_file):
            try:
                with open(self.feedback_file, "r") as f:
                    data = json.load(f)
                data.setdefault("success_rate", 1.0)
                data.setdefault("captcha_count", 0)
                data.setdefault("cloudflare_count", 0)
                return data
            except:
                pass
        return {"success_rate": 1.0, "captcha_count": 0, "cloudflare_count": 0}

    def save_feedback(self):
        with open(self.feedback_file, "w") as f:
            json.dump(self.feedback, f)

    def report_outcome(self, outcome: str):
        if outcome == "success":
            self.feedback["success_rate"] = min(1.0, self.feedback["success_rate"] + 0.1)
        else:
            self.feedback["success_rate"] = max(0.1, self.feedback["success_rate"] - 0.2)
        if outcome == "captcha":
            self.feedback["captcha_count"] += 1
        elif outcome == "cloudflare":
            self.feedback["cloudflare_count"] += 1
        self.save_feedback()

    # ← NEW: Save browser context (cookies + localStorage)
    async def save_session(self, context):
        """Save authenticated session to disk tied to seed"""
        try:
            storage = await context.storage_state()
            with open(self.session_path, "w", encoding="utf-8") as f:
                json.dump(storage, f, indent=2)
            print(f"💾 Session saved for seed '{self.seed}'")
        except Exception as e:
            print(f"⚠️ Failed to save session: {e}")

    # ← NEW: Load session if exists
    async def load_session(self, context):
        """Restore session if available"""
        if os.path.exists(self.session_path):
            try:
                with open(self.session_path, "r", encoding="utf-8") as f:
                    storage = json.load(f)
                await context.add_cookies(storage.get("cookies", []))
                # Note: Playwright doesn't support localStorage restore via API directly,
                # but cookies are the main auth carrier (e.g., li_at on LinkedIn)
                print(f"🔁 Reusing session for seed '{self.seed}'")
                return True
            except Exception as e:
                print(f"⚠️ Failed to load session: {e}")
                # Optionally delete corrupted session
                if os.path.exists(self.session_path):
                    os.remove(self.session_path)
        return False
    def _select_profile(self):
        seed_hash = int(hashlib.sha256(self.seed.encode()).hexdigest(), 16)
        random.seed(seed_hash)
        profile = self.fingerprint_generator.generate()

        concurrency_options = [4, 8, 12, 16]
        memory_options = [4, 8]
        if self.feedback["success_rate"] < 0.5:
            concurrency_options = [8, 4]
            memory_options = [8]
        profile.navigator.hardwareConcurrency = random.choice(concurrency_options)
        profile.navigator.deviceMemory = random.choice(memory_options)
        return profile

    def _get_spoof_script(self, renderer: str, vendor: str):
        seed_hash = int(hashlib.sha256(self.seed.encode()).hexdigest(), 16)
        if self.feedback["captcha_count"] > 2:
            noise_factor = seed_hash % 100000000 + 100000000
        else:
            noise_factor = seed_hash % 100000000
        return f"""
        (function() {{
            const originalGetContext = HTMLCanvasElement.prototype.getContext;
@@ -113,7 +177,7 @@ class FingerprintScrapingEngine:
            if (ctx) {{
                const imageData = ctx.getImageData(0, 0, this.width, this.height);
                for (let i = 0; i < imageData.data.length; i += 4) {{
                    const noise = (Math.sin({noise_factor} + i) * 0.5 + 0.5) * 2 - 1;
                    imageData.data[i] = Math.min(255, Math.max(0, imageData.data[i] + noise));
                    imageData.data[i+1] = Math.min(255, Math.max(0, imageData.data[i+1] + noise));
                    imageData.data[i+2] = Math.min(255, Math.max(0, imageData.data[i+2] + noise));
@@ -184,7 +248,6 @@ class FingerprintScrapingEngine:
            pass

    async def _detect_cloudflare(self, page) -> bool:
        content = await page.content()
        return (
            "#cf-chl" in content or

@@ -193,7 +256,6 @@ class FingerprintScrapingEngine:
        )

    async def _handle_cloudflare(self, page, max_retries: int = 3):
        for i in range(max_retries):
            if not await self._detect_cloudflare(page):
                return True