# scraping_engine.py
import asyncio
import hashlib
import json
import os
import random
import time
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

from dotenv import load_dotenv
from playwright.async_api import Page
from browserforge.fingerprints import FingerprintGenerator

from config import load_spoof_config

# Load environment variables
load_dotenv()


class FingerprintScrapingEngine:
    def __init__(
        self,
        seed: str = "default_seed",
        num_variations: int = 10,
        target_os: str = "windows",
        db_path: str = "jobs.db",
        markdown_path: str = "scraped_jobs.md",
        proxies: Optional[List[str]] = None,
        login_credentials: Optional[Dict[str, str]] = None,
    ):
        if target_os not in ("windows", "macos"):
            raise ValueError("target_os must be 'windows' or 'macos'")

        # Fall back to .env credentials when none are passed explicitly
        if login_credentials is None:
            username = os.getenv("SCRAPING_USERNAME")
            password = os.getenv("SCRAPING_PASSWORD")
            if username and password:
                login_credentials = {"username": username, "password": password}

        self.seed = seed
        self.os = target_os
        self.markdown_path = markdown_path
        self.proxies = proxies or []
        self.login_credentials = login_credentials
        self.fingerprint_generator = FingerprintGenerator(
            browser=('chrome',),
            os=(self.os,),
        )
        self.num_variations = num_variations

        # Load spoof config
        spoof_config = load_spoof_config()
        self.common_renderers = spoof_config["renderers"]
        self.common_vendors = spoof_config["vendors"]

        # Feedback system
        self.feedback_file = f"feedback_{seed}.json"
        self.feedback = self._load_feedback()

        # Session persistence paths
        self.session_dir = "browser_sessions"
        os.makedirs(self.session_dir, exist_ok=True)
        self.session_path = os.path.join(
            self.session_dir, f"{seed}_session.json")

        self.optimization_params = {
            "base_delay": 2.0,
            "max_concurrent_requests": 4,
            "request_timeout": 120000,
            "retry_attempts": 3,
            "captcha_handling_strategy": "avoid",  # or "solve_fallback"
            "cloudflare_wait_strategy": "smart_wait",  # or "aggressive_reload"
        }
        self._update_params_from_feedback()
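    # Feedback persistence: one JSON file per seed (feedback_<seed>.json).
    # Example shape (keys mirror the setdefault calls below):
    #   {"success_rate": 0.85, "captcha_count": 2, "cloudflare_count": 1,
    #    "avg_response_time": 12.4, "failed_domains": {"example.com": 3}}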
    def _load_feedback(self) -> Dict[str, Any]:
        if os.path.exists(self.feedback_file):
            try:
                with open(self.feedback_file, "r") as f:
                    data = json.load(f)
                data.setdefault("success_rate", 1.0)
                data.setdefault("captcha_count", 0)
                data.setdefault("cloudflare_count", 0)
                data.setdefault("avg_response_time", 10.0)  # New metric
                data.setdefault("failed_domains", {})  # New metric
                return data
            except (json.JSONDecodeError, OSError):
                pass
        # Defaults must include every key report_outcome() touches
        return {
            "success_rate": 1.0,
            "captcha_count": 0,
            "cloudflare_count": 0,
            "avg_response_time": 10.0,
            "failed_domains": {},
        }

    def save_feedback(self):
        with open(self.feedback_file, "w") as f:
            json.dump(self.feedback, f)

    def report_outcome(self, outcome: str, url: Optional[str] = None,
                       response_time: Optional[float] = None):
        if outcome == "success":
            self.feedback["success_rate"] = min(
                1.0, self.feedback["success_rate"] + 0.05)  # Small increment
        else:
            self.feedback["success_rate"] = max(
                0.05, self.feedback["success_rate"] - 0.1)  # Small decrement

        if outcome == "captcha":
            self.feedback["captcha_count"] += 1
            # Switch to the fallback solver once captchas appear
            self.optimization_params["captcha_handling_strategy"] = "solve_fallback"
        elif outcome == "cloudflare":
            self.feedback["cloudflare_count"] += 1
            # Adjust wait strategy based on frequency
            if self.feedback["cloudflare_count"] > 5:
                self.optimization_params["cloudflare_wait_strategy"] = "aggressive_reload"

        # Track domain-specific failures (urlparse handles URLs without "//")
        if url and outcome != "success":
            domain = urlparse(url).netloc
            if domain not in self.feedback["failed_domains"]:
                self.feedback["failed_domains"][domain] = 0
            self.feedback["failed_domains"][domain] += 1

        # Update average response time (exponential moving average)
        if response_time:
            prev_avg = self.feedback.get("avg_response_time", 10.0)
            self.feedback["avg_response_time"] = (
                prev_avg * 0.9) + (response_time * 0.1)

        self.save_feedback()
        self._update_params_from_feedback()  # Re-tune params from new feedback

    def _update_params_from_feedback(self):
        """Adjust optimization parameters based on feedback."""
        sr = self.feedback["success_rate"]
        cc = self.feedback["captcha_count"]
        cf = self.feedback["cloudflare_count"]
        avg_rt = self.feedback.get("avg_response_time", 10.0)

        # Adjust base delay based on success rate
        if sr < 0.6:
            self.optimization_params["base_delay"] = max(
                5.0, self.optimization_params["base_delay"] * 1.2)
        elif sr > 0.8:
            self.optimization_params["base_delay"] = min(
                3.0, self.optimization_params["base_delay"] * 0.9)

        # Reduce concurrency if many captchas / Cloudflare challenges
        if cc > 3 or cf > 3:
            self.optimization_params["max_concurrent_requests"] = max(
                2, self.optimization_params["max_concurrent_requests"] - 2)
        else:
            # Reset to default
            self.optimization_params["max_concurrent_requests"] = 4

        # Increase timeout if avg response time is high
        if avg_rt > 20:
            self.optimization_params["request_timeout"] = 150000  # 150 seconds

        print(f"Optimization Params Updated: {self.optimization_params}")

    # Save browser context state (cookies + localStorage)
    async def save_session(self, context):
        """Save the authenticated session to disk, tied to this seed."""
        try:
            storage = await context.storage_state()
            with open(self.session_path, "w", encoding="utf-8") as f:
                json.dump(storage, f, indent=2)
            print(f"💾 Session saved for seed '{self.seed}'")
        except Exception as e:
            print(f"⚠️ Failed to save session: {e}")

    # Load a saved session if one exists
    async def load_session(self, context) -> bool:
        """Restore a saved session if available."""
        if os.path.exists(self.session_path):
            try:
                with open(self.session_path, "r", encoding="utf-8") as f:
                    storage = json.load(f)
                await context.add_cookies(storage.get("cookies", []))
                # Note: Playwright doesn't restore localStorage into an existing
                # context via the API, but cookies are the main auth carrier
                # (e.g., li_at on LinkedIn)
                print(f"🔁 Reusing session for seed '{self.seed}'")
                return True
            except Exception as e:
                print(f"⚠️ Failed to load session: {e}")
                # Delete the corrupted session file
                if os.path.exists(self.session_path):
                    os.remove(self.session_path)
        return False
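    # Illustrative sketch (not in the original file): when creating a *new*
    # context, Playwright's Browser.new_context(storage_state=...) restores
    # both cookies and localStorage from the saved file, which add_cookies()
    # alone cannot do. The method name `new_context_with_session` is
    # hypothetical; `browser` is a connected Playwright Browser instance.
    async def new_context_with_session(self, browser):
        if os.path.exists(self.session_path):
            return await browser.new_context(storage_state=self.session_path)
        return await browser.new_context()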
    def _select_profile(self):
        # Seed the RNG from the engine seed so the same seed always yields
        # the same hardware profile
        seed_hash = int(hashlib.sha256(self.seed.encode()).hexdigest(), 16)
        random.seed(seed_hash)
        profile = self.fingerprint_generator.generate()
        concurrency_options = [4, 8, 12, 16]
        memory_options = [4, 8]
        if self.feedback["success_rate"] < 0.5:
            concurrency_options = [8, 4]
            memory_options = [8]
        profile.navigator.hardwareConcurrency = random.choice(concurrency_options)
        profile.navigator.deviceMemory = random.choice(memory_options)
        return profile

    def _get_spoof_script(self, renderer: str, vendor: str) -> str:
        seed_hash = int(hashlib.sha256(self.seed.encode()).hexdigest(), 16)
        # Shift the noise range once captchas start appearing
        if self.feedback["captcha_count"] > 2:
            noise_factor = (seed_hash % 100000000) + 100000000
        else:
            noise_factor = seed_hash % 100000000
        return f"""
        (function() {{
            // WebGL: report the spoofed vendor/renderer pair
            const originalGetContext = HTMLCanvasElement.prototype.getContext;
            HTMLCanvasElement.prototype.getContext = function(type, attributes) {{
                if (type === 'webgl' || type === 'experimental-webgl') {{
                    const ctx = originalGetContext.call(this, type, attributes);
                    if (ctx) {{
                        const originalGetParameter = ctx.getParameter.bind(ctx);
                        ctx.getParameter = function(pname) {{
                            if (pname === 0x9245) {{ return '{vendor}'; }}   // UNMASKED_VENDOR_WEBGL
                            if (pname === 0x9246) {{ return '{renderer}'; }} // UNMASKED_RENDERER_WEBGL
                            return originalGetParameter(pname);
                        }};
                        const originalGetShaderPrecisionFormat = ctx.getShaderPrecisionFormat.bind(ctx);
                        ctx.getShaderPrecisionFormat = function(shadertype, precisiontype) {{
                            const format = originalGetShaderPrecisionFormat(shadertype, precisiontype);
                            if (precisiontype === ctx.HIGH_FLOAT) {{
                                format.rangeMin = 127; format.rangeMax = 127; format.precision = 23;
                            }}
                            if (precisiontype === ctx.MEDIUM_FLOAT) {{
                                format.rangeMin = 62; format.rangeMax = 62; format.precision = 14;
                            }}
                            return format;
                        }};
                    }}
                    return ctx;
                }}
                return originalGetContext.call(this, type, attributes);
            }};

            // Canvas: add deterministic, seed-derived noise to toDataURL output
            const originalToDataURL = HTMLCanvasElement.prototype.toDataURL;
            HTMLCanvasElement.prototype.toDataURL = function(type, encoderOptions) {{
                const ctx = this.getContext('2d');
                if (ctx) {{
                    const imageData = ctx.getImageData(0, 0, this.width, this.height);
                    for (let i = 0; i < imageData.data.length; i += 4) {{
                        const noise = (Math.sin({noise_factor} + i) * 0.5 + 0.5) * 2 - 1;
                        imageData.data[i] = Math.min(255, Math.max(0, imageData.data[i] + noise));
                        imageData.data[i+1] = Math.min(255, Math.max(0, imageData.data[i+1] + noise));
                        imageData.data[i+2] = Math.min(255, Math.max(0, imageData.data[i+2] + noise));
                    }}
                    ctx.putImageData(imageData, 0, 0);
                }}
                return originalToDataURL.call(this, type, encoderOptions);
            }};

            // Audio: return deterministic analyser data
            const originalAudioContext = window.AudioContext || window.webkitAudioContext;
            if (originalAudioContext) {{
                const AudioContextOverride = function() {{
                    const ctx = new originalAudioContext();
                    const originalCreateAnalyser = ctx.createAnalyser;
                    ctx.createAnalyser = function() {{
                        const analyser = originalCreateAnalyser.call(ctx);
                        analyser.getByteFrequencyData = function(array) {{
                            for (let i = 0; i < array.length; i++) {{
                                array[i] = Math.floor(Math.sin(i * 0.1 + {seed_hash % 1000}) * 128 + 128);
                            }}
                        }};
                        analyser.getFloatFrequencyData = function(array) {{
                            for (let i = 0; i < array.length; i++) {{
                                array[i] = Math.sin(i * 0.1 + {seed_hash % 1000}) * 100 - 100;
                            }}
                        }};
                        return analyser;
                    }};
                    return ctx;
                }};
                window.AudioContext = AudioContextOverride;
                window.webkitAudioContext = AudioContextOverride;
            }}

            // Fonts: present a fixed, common font list
            const originalQueryFonts = window.queryLocalFonts;
            if (originalQueryFonts) {{
                window.queryLocalFonts = async function() {{
                    return [
                        {{family: "Arial", style: "normal", weight: "400"}},
                        {{family: "Times New Roman", style: "normal", weight: "400"}},
                        {{family: "Courier New", style: "normal", weight: "400"}}
                    ];
                }};
            }}

            // Remove bot indicators
            delete navigator.__proto__.webdriver;
            window.chrome = {{ runtime: {{}} }};
        }})();
        """

    async def _human_like_scroll(self, page):
        scroll_height = await page.evaluate("document.body.scrollHeight")
        current_scroll = 0
        while current_scroll < scroll_height:
            scroll_step = random.randint(50, 300)
            await page.evaluate(f"window.scrollBy(0, {scroll_step})")
            await asyncio.sleep(random.uniform(0.1, 0.8))
            current_scroll += scroll_step
            # Occasionally pause, as a human reader would
            if random.random() < 0.1:
                await asyncio.sleep(random.uniform(1, 3))

    async def _simulate_human_interaction(self, page):
        try:
            elements = await page.query_selector_all("a, button, input")
            if elements and random.random() < 0.3:
                target = random.choice(elements)
                await target.hover()
                await asyncio.sleep(random.uniform(0.2, 1.0))
        except Exception:
            pass

    async def _avoid_captcha(self, page) -> bool:
        await asyncio.sleep(2 + random.random() * 3)
        await self._human_like_scroll(page)
        await self._simulate_human_interaction(page)
        await asyncio.sleep(3 + random.random() * 2)
        return True
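    # Illustrative sketch (not in the original file): a dispatcher that picks
    # between the two CAPTCHA strategies based on the adaptive
    # "captcha_handling_strategy" optimization parameter. The method name
    # `_handle_captcha` is hypothetical.
    async def _handle_captcha(self, page) -> bool:
        if self.optimization_params["captcha_handling_strategy"] == "solve_fallback":
            return await self._solve_captcha_fallback(page)
        return await self._avoid_captcha(page)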
    async def _solve_captcha_fallback(self, page) -> bool:
        await asyncio.sleep(15 + random.random() * 10)
        captcha_content = await page.content()
        if "captcha" not in captcha_content.lower() and "cloudflare" not in captcha_content.lower():
            return True
        print("🔄 Refreshing session to bypass CAPTCHA...")
        await page.reload()
        await self._avoid_captcha(page)
        captcha_content = await page.content()
        if "captcha" not in captcha_content.lower() and "cloudflare" not in captcha_content.lower():
            return True
        return False

    async def _detect_cloudflare(self, page: Page) -> bool:
        """Detect Cloudflare challenges."""
        content = await page.content()
        return (
            "cf-chl" in content  # challenge markup (e.g., cf-chl-widget ids)
            or "checking your browser" in content.lower()
            or "just a moment" in content.lower()
            or "turnstile" in content.lower()  # Cloudflare Turnstile widget
        )

    async def _handle_cloudflare(self, page: Page) -> bool:
        """
        Handle Cloudflare challenges, including Turnstile if present.
        This is a simplified approach; real-world handling might require
        more sophisticated logic or external solvers.
        """
        max_wait_time = 60  # Total seconds to wait for Cloudflare to resolve
        start_time = time.time()
        while time.time() - start_time < max_wait_time:
            if not await self._detect_cloudflare(page):
                print("Cloudflare challenge resolved.")
                return True
            print("Cloudflare active, waiting...")
            # Simulate more human-like behavior while waiting
            await self._simulate_human_interaction(page)
            # Wait a random period, growing slightly the longer we have waited
            wait_time = min(10, 2 + random.uniform(1, 3) + (time.time() - start_time) * 0.1)
            await asyncio.sleep(wait_time)
            # Reload occasionally to trigger potential client-side checks
            elapsed = time.time() - start_time
            if elapsed > 15 and elapsed % 20 < 2:
                print("Reloading page during Cloudflare wait...")
                await page.reload(wait_until='domcontentloaded', timeout=120000)
        print("Timeout waiting for Cloudflare resolution.")
        return False
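
# Minimal usage sketch (assumptions: `config.load_spoof_config()` returns the
# "renderers"/"vendors" lists the constructor expects, and browserforge /
# python-dotenv are installed). Outcome reports feed the adaptive loop and
# re-tune the optimization parameters.
if __name__ == "__main__":
    engine = FingerprintScrapingEngine(seed="demo_seed", target_os="windows")
    engine.report_outcome("success", url="https://example.com/jobs", response_time=8.2)
    engine.report_outcome("captcha", url="https://example.com/jobs")
    print(engine.optimization_params)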