# scraping_engine.py
import asyncio
import hashlib
import json
import logging
import os
import random
from typing import Dict, List, Optional

from browserforge.fingerprints import FingerprintGenerator
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

logger = logging.getLogger(__name__)


class FingerprintScrapingEngine:
    """Seeded browser-fingerprint helper with human-like page interaction.

    The class generates a browser fingerprint through ``browserforge``, builds
    a JavaScript snippet that overrides WebGL, canvas, audio and font APIs,
    and offers coroutines that scroll, hover, and wait out Cloudflare/CAPTCHA
    interstitials on a page object.

    NOTE(review): the ``page`` arguments are presumably Playwright async
    ``Page`` objects (``evaluate``, ``content``, ``query_selector_all``,
    ``wait_for_load_state`` and ``reload`` are awaited) -- confirm with caller.
    """

    def __init__(
        self,
        seed: str = "default_seed",
        num_variations: int = 10,
        target_os: str = "windows",
        db_path: str = "jobs.db",
        markdown_path: str = "scraped_jobs.md",
        proxies: Optional[List[str]] = None,
        login_credentials: Optional[Dict[str, str]] = None,
        search_keywords: Optional[str] = None,
    ):
        """Configure the engine.

        Args:
            seed: String hashed with SHA-256 to derive deterministic values.
            num_variations: Stored on the instance as ``num_variations``.
            target_os: ``'windows'`` or ``'macos'``; selects the fingerprint
                OS and the renderer list.
            db_path: Stored on the instance as ``db_path``.
            markdown_path: Stored on the instance as ``markdown_path``.
            proxies: Optional list of proxy strings; ``None`` becomes ``[]``.
            login_credentials: Optional dict with ``"username"`` and
                ``"password"`` keys. When ``None``, the environment variables
                ``SCRAPING_USERNAME`` and ``SCRAPING_PASSWORD`` are used if
                both are set.
            search_keywords: Stored on the instance as ``search_keywords``.

        Raises:
            ValueError: If ``target_os`` is not ``'windows'`` or ``'macos'``.
        """
        if target_os not in ('windows', 'macos'):
            # The message names the actual parameter so callers can act on it.
            raise ValueError("target_os must be 'windows' or 'macos'")

        # Load credentials from .env if not provided
        if login_credentials is None:
            username = os.getenv("SCRAPING_USERNAME")
            password = os.getenv("SCRAPING_PASSWORD")
            if username and password:
                login_credentials = {
                    "username": username,
                    "password": password,
                }

        self.seed = seed
        self.os = target_os
        # Previously accepted but dropped; keep it so the argument is usable.
        self.db_path = db_path
        self.markdown_path = markdown_path
        self.proxies = proxies or []
        self.login_credentials = login_credentials
        self.search_keywords = search_keywords
        self.fingerprint_generator = FingerprintGenerator(
            browser=('chrome',),
            os=(self.os,),
        )
        self.num_variations = num_variations

        # WebGL renderer strings, keyed by target OS.
        self.common_renderers = {
            'windows': [
                "ANGLE (Intel, Intel(R) UHD Graphics 630 (0x00003E9B) Direct3D11 vs_5_0 ps_5_0, D3D11)",
                "ANGLE (Intel, Intel(R) UHD Graphics (0x00009A49) Direct3D11 vs_5_0 ps_5_0, D3D11)",
                "ANGLE (Intel(R) Iris(TM) Graphics 540 Direct3D11 vs_5_0 ps_5_0)",
                "ANGLE (Intel, Intel(R) UHD Graphics 620 (0x00005916) Direct3D11 vs_5_0 ps_5_0, D3D11)",
                "ANGLE (Intel, Intel(R) HD Graphics 530 (0x0000191B) Direct3D11 vs_5_0 ps_5_0, D3D11)",
                "ANGLE (Intel, Intel(R) UHD Graphics 600 (0x00003180) Direct3D11 vs_5_0 ps_5_0, D3D11)",
                "ANGLE (Intel, Intel(R) Iris(R) Xe Graphics (0x00009A49) Direct3D11 vs_5_0 ps_5_0, D3D11)",
            ],
            'macos': [
                "Intel HD Graphics 530 OpenGL Engine",
                "Intel Iris Graphics 6100 OpenGL Engine",
                "Intel UHD Graphics 630 OpenGL Engine",
                "Intel HD Graphics 4000 OpenGL Engine",
                "Intel Iris Pro OpenGL Engine",
                "Intel UHD Graphics 617 OpenGL Engine",
            ],
        }
        self.common_vendors = ["Intel Inc.", "Intel", "Intel Corporation"]

    def _seed_hash(self) -> int:
        """Return the SHA-256 digest of ``self.seed`` as an integer."""
        return int(hashlib.sha256(self.seed.encode()).hexdigest(), 16)

    def _select_profile(self):
        """Generate a fingerprint and override two navigator fields.

        NOTE: this reseeds the *global* ``random`` module from the seed hash,
        so every later ``random.*`` call in the process follows a sequence
        determined by ``self.seed``. That is kept as-is because code outside
        this view may rely on it.

        Returns:
            The object returned by ``FingerprintGenerator.generate()`` with
            ``navigator.hardwareConcurrency`` and ``navigator.deviceMemory``
            replaced by seeded random choices.
        """
        random.seed(self._seed_hash())
        profile = self.fingerprint_generator.generate()
        profile.navigator.hardwareConcurrency = random.choice([4, 8, 12, 16])
        profile.navigator.deviceMemory = random.choice([4, 8])
        return profile

    def _get_spoof_script(self, renderer: str, vendor: str):
        """Build the JavaScript snippet that overrides fingerprinting APIs.

        Args:
            renderer: Value returned for WebGL parameter ``0x9246``.
            vendor: Value returned for WebGL parameter ``0x9245``.

        Returns:
            JavaScript source (a self-invoking function) as a string. Numeric
            noise terms are derived from the SHA-256 hash of ``self.seed``.
        """
        seed_hash = self._seed_hash()
        canvas_seed = seed_hash % 100000000
        audio_seed = seed_hash % 1000
        # json.dumps yields a correctly escaped, double-quoted JS string
        # literal, so quotes or backslashes in the inputs cannot break the
        # generated script.
        vendor_js = json.dumps(vendor)
        renderer_js = json.dumps(renderer)
        return f"""
        (function() {{
            const originalGetContext = HTMLCanvasElement.prototype.getContext;
            HTMLCanvasElement.prototype.getContext = function(type, attributes) {{
                if (type === 'webgl' || type === 'experimental-webgl') {{
                    const ctx = originalGetContext.call(this, type, attributes);
                    if (ctx) {{
                        const originalGetParameter = ctx.getParameter.bind(ctx);
                        ctx.getParameter = function(pname) {{
                            if (pname === 0x9245) {{
                                return {vendor_js};
                            }}
                            if (pname === 0x9246) {{
                                return {renderer_js};
                            }}
                            return originalGetParameter(pname);
                        }};
                        const originalGetShaderPrecisionFormat = ctx.getShaderPrecisionFormat.bind(ctx);
                        ctx.getShaderPrecisionFormat = function(shadertype, precisiontype) {{
                            const format = originalGetShaderPrecisionFormat(shadertype, precisiontype);
                            if (precisiontype === ctx.HIGH_FLOAT) {{
                                format.rangeMin = 127;
                                format.rangeMax = 127;
                                format.precision = 23;
                            }}
                            if (precisiontype === ctx.MEDIUM_FLOAT) {{
                                format.rangeMin = 62;
                                format.rangeMax = 62;
                                format.precision = 14;
                            }}
                            return format;
                        }};
                    }}
                    return ctx;
                }}
                return originalGetContext.call(this, type, attributes);
            }};

            const originalToDataURL = HTMLCanvasElement.prototype.toDataURL;
            HTMLCanvasElement.prototype.toDataURL = function(type, encoderOptions) {{
                const ctx = this.getContext('2d');
                if (ctx) {{
                    const imageData = ctx.getImageData(0, 0, this.width, this.height);
                    for (let i = 0; i < imageData.data.length; i += 4) {{
                        const noise = (Math.sin({canvas_seed} + i) * 0.5 + 0.5) * 2 - 1;
                        imageData.data[i] = Math.min(255, Math.max(0, imageData.data[i] + noise));
                        imageData.data[i+1] = Math.min(255, Math.max(0, imageData.data[i+1] + noise));
                        imageData.data[i+2] = Math.min(255, Math.max(0, imageData.data[i+2] + noise));
                    }}
                    ctx.putImageData(imageData, 0, 0);
                }}
                return originalToDataURL.call(this, type, encoderOptions);
            }};

            const originalAudioContext = window.AudioContext || window.webkitAudioContext;
            if (originalAudioContext) {{
                const AudioContextOverride = function() {{
                    const ctx = new originalAudioContext();
                    const originalCreateAnalyser = ctx.createAnalyser;
                    ctx.createAnalyser = function() {{
                        const analyser = originalCreateAnalyser.call(ctx);
                        analyser.getByteFrequencyData = function(array) {{
                            for (let i = 0; i < array.length; i++) {{
                                array[i] = Math.floor(Math.sin(i * 0.1 + {audio_seed}) * 128 + 128);
                            }}
                        }};
                        analyser.getFloatFrequencyData = function(array) {{
                            for (let i = 0; i < array.length; i++) {{
                                array[i] = Math.sin(i * 0.1 + {audio_seed}) * 100 - 100;
                            }}
                        }};
                        return analyser;
                    }};
                    return ctx;
                }};
                window.AudioContext = AudioContextOverride;
                window.webkitAudioContext = AudioContextOverride;
            }}

            const originalQueryFonts = window.queryLocalFonts;
            if (originalQueryFonts) {{
                window.queryLocalFonts = async function() {{
                    return [
                        {{family: "Arial", style: "normal", weight: "400"}},
                        {{family: "Times New Roman", style: "normal", weight: "400"}},
                        {{family: "Courier New", style: "normal", weight: "400"}}
                    ];
                }};
            }}

            // Remove bot indicators
            delete navigator.__proto__.webdriver;
            window.chrome = {{ runtime: {{}} }};
        }})();
        """

    async def _human_like_scroll(self, page):
        """Scroll down the page in random steps with random pauses.

        The document height is read once up front; scrolling stops when the
        accumulated step distance reaches that height.
        """
        scroll_height = await page.evaluate("document.body.scrollHeight")
        current_scroll = 0
        while current_scroll < scroll_height:
            scroll_step = random.randint(50, 300)
            await page.evaluate(f"window.scrollBy(0, {scroll_step})")
            await asyncio.sleep(random.uniform(0.1, 0.8))
            current_scroll += scroll_step
            # Roughly one step in ten is followed by a longer pause.
            if random.random() < 0.1:
                await asyncio.sleep(random.uniform(1, 3))

    async def _simulate_human_interaction(self, page):
        """Occasionally hover over a random link, button or input.

        Best-effort: ordinary errors are logged at debug level and ignored.
        Cancellation and interpreter-exit signals are no longer swallowed,
        because only ``Exception`` subclasses are caught.
        """
        try:
            elements = await page.query_selector_all("a, button, input")
            if elements and random.random() < 0.3:
                target = random.choice(elements)
                await target.hover()
                await asyncio.sleep(random.uniform(0.2, 1.0))
        except Exception as exc:
            logger.debug("Skipping simulated hover: %s", exc)

    async def _detect_cloudflare(self, page) -> bool:
        """Detect Cloudflare challenge pages"""
        content = await page.content()
        lowered = content.lower()
        return (
            "#cf-chl" in content
            or "checking your browser" in lowered
            or "just a moment" in lowered
        )

    async def _handle_cloudflare(self, page, max_retries: int = 3):
        """Wait for Cloudflare to resolve

        Args:
            page: Page object to poll.
            max_retries: Maximum number of wait cycles.

        Returns:
            ``True`` once no challenge marker is found, ``False`` if markers
            are still present after the final wait. Exceptions raised by
            ``page.wait_for_load_state`` are not caught here.
        """
        for attempt in range(max_retries):
            if not await self._detect_cloudflare(page):
                return True
            print(f"☁️ Cloudflare detected - waiting... (attempt {attempt+1})")
            await asyncio.sleep(8 + random.uniform(2, 5))
            await page.wait_for_load_state("load", timeout=60000)
        # Re-check after the last wait so a challenge that cleared during the
        # final cycle is reported as success rather than failure.
        return not await self._detect_cloudflare(page)

    async def _avoid_captcha(self, page) -> bool:
        """Pause, scroll and hover to mimic a human visitor.

        Returns:
            Always ``True``.
        """
        await asyncio.sleep(2 + random.random() * 3)
        await self._human_like_scroll(page)
        await self._simulate_human_interaction(page)
        await asyncio.sleep(3 + random.random() * 2)
        return True

    @staticmethod
    def _has_captcha_markers(html: str) -> bool:
        """Return True if *html* mentions "captcha" or "cloudflare" (any case)."""
        lowered = html.lower()
        return "captcha" in lowered or "cloudflare" in lowered

    async def _solve_captcha_fallback(self, page) -> bool:
        """Wait, then reload once, to get past a CAPTCHA page.

        Returns:
            ``True`` if the page content no longer mentions "captcha" or
            "cloudflare" after the initial wait or after one reload,
            otherwise ``False``.
        """
        await asyncio.sleep(15 + random.random() * 10)
        if not self._has_captcha_markers(await page.content()):
            return True

        print("🔄 Refreshing session to bypass CAPTCHA...")
        await page.reload()
        await self._avoid_captcha(page)
        return not self._has_captcha_markers(await page.content())