Web_scraping_project/scraping_engine.py
2025-11-20 18:58:26 +00:00

226 lines
10 KiB
Python

# scraping_engine.py
import asyncio
import hashlib
import random
import os
from typing import List, Optional, Dict
from browserforge.fingerprints import FingerprintGenerator
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
class FingerprintScrapingEngine:
def __init__(
    self,
    seed: str = "default_seed",
    num_variations: int = 10,
    target_os: str = "windows",
    db_path: str = "jobs.db",
    markdown_path: str = "scraped_jobs.md",
    proxies: Optional[List[str]] = None,
    login_credentials: Optional[Dict[str, str]] = None,
    search_keywords: Optional[str] = None
):
    """Configure the engine and build its fingerprint generator.

    Args:
        seed: Text hashed by other methods of this class to derive
            repeatable pseudo-random values.
        num_variations: Stored on the instance as ``num_variations``.
        target_os: Operating system passed to the fingerprint generator;
            must be ``'windows'`` or ``'macos'``.
        db_path: Stored on the instance as ``db_path``.
        markdown_path: Stored on the instance as ``markdown_path``.
        proxies: Optional list of proxy strings; ``None`` (or an empty
            list) becomes ``[]``.
        login_credentials: Optional ``{"username": ..., "password": ...}``
            mapping. When omitted, the ``SCRAPING_USERNAME`` and
            ``SCRAPING_PASSWORD`` environment variables are used, but only
            if both are set; otherwise credentials stay ``None``.
        search_keywords: Stored on the instance as ``search_keywords``.

    Raises:
        ValueError: If ``target_os`` is not ``'windows'`` or ``'macos'``.
    """
    if target_os not in ('windows', 'macos'):
        # The message names the real parameter so callers can act on it.
        raise ValueError("target_os must be 'windows' or 'macos'")

    # Load credentials from the environment (.env) if not provided.
    if login_credentials is None:
        username = os.getenv("SCRAPING_USERNAME")
        password = os.getenv("SCRAPING_PASSWORD")
        if username and password:
            login_credentials = {
                "username": username, "password": password}

    # Plain configuration is assigned before the generator is built so the
    # instance carries every constructor argument, including db_path,
    # which was previously accepted and then dropped.
    self.seed = seed
    self.os = target_os
    self.db_path = db_path
    self.markdown_path = markdown_path
    self.proxies = proxies or []
    self.login_credentials = login_credentials
    self.search_keywords = search_keywords
    self.num_variations = num_variations

    self.fingerprint_generator = FingerprintGenerator(
        browser=('chrome',),
        os=(self.os,)
    )

    # Renderer strings per supported OS, plus matching vendor strings.
    self.common_renderers = {
        'windows': [
            "ANGLE (Intel, Intel(R) UHD Graphics 630 (0x00003E9B) Direct3D11 vs_5_0 ps_5_0, D3D11)",
            "ANGLE (Intel, Intel(R) UHD Graphics (0x00009A49) Direct3D11 vs_5_0 ps_5_0, D3D11)",
            "ANGLE (Intel(R) Iris(TM) Graphics 540 Direct3D11 vs_5_0 ps_5_0)",
            "ANGLE (Intel, Intel(R) UHD Graphics 620 (0x00005916) Direct3D11 vs_5_0 ps_5_0, D3D11)",
            "ANGLE (Intel, Intel(R) HD Graphics 530 (0x0000191B) Direct3D11 vs_5_0 ps_5_0, D3D11)",
            "ANGLE (Intel, Intel(R) UHD Graphics 600 (0x00003180) Direct3D11 vs_5_0 ps_5_0, D3D11)",
            "ANGLE (Intel, Intel(R) Iris(R) Xe Graphics (0x00009A49) Direct3D11 vs_5_0 ps_5_0, D3D11)",
        ],
        'macos': [
            "Intel HD Graphics 530 OpenGL Engine",
            "Intel Iris Graphics 6100 OpenGL Engine",
            "Intel UHD Graphics 630 OpenGL Engine",
            "Intel HD Graphics 4000 OpenGL Engine",
            "Intel Iris Pro OpenGL Engine",
            "Intel UHD Graphics 617 OpenGL Engine",
        ]
    }
    self.common_vendors = ["Intel Inc.", "Intel", "Intel Corporation"]
def _select_profile(self):
    """Generate a fingerprint profile after seeding ``random`` from ``self.seed``.

    The SHA-256 digest of ``self.seed`` is turned into an integer and used
    to seed the module-level ``random`` generator, so the two navigator
    values chosen below are the same for the same seed. Note that this
    reseeds the global RNG as a side effect.

    Returns:
        The object returned by ``self.fingerprint_generator.generate()``,
        with ``navigator.hardwareConcurrency`` and
        ``navigator.deviceMemory`` overwritten.
    """
    digest_hex = hashlib.sha256(self.seed.encode()).hexdigest()
    random.seed(int(digest_hex, 16))

    fingerprint = self.fingerprint_generator.generate()
    # Order matters: cores are drawn first, then memory.
    core_count = random.choice([4, 8, 12, 16])
    fingerprint.navigator.hardwareConcurrency = core_count
    memory_gb = random.choice([4, 8])
    fingerprint.navigator.deviceMemory = memory_gb
    return fingerprint
def _get_spoof_script(self, renderer: str, vendor: str):
    """Build the JavaScript snippet that overrides fingerprinting surfaces.

    The returned script, when run in a page:
      * wraps ``HTMLCanvasElement.prototype.getContext`` so that WebGL
        contexts answer ``getParameter(0x9245)`` / ``getParameter(0x9246)``
        (the unmasked vendor / renderer enums of
        ``WEBGL_debug_renderer_info``) with ``vendor`` / ``renderer``, and
        patches ``getShaderPrecisionFormat`` for HIGH_FLOAT / MEDIUM_FLOAT;
      * wraps ``toDataURL`` to add seed-derived noise to 2D canvas pixels;
      * replaces ``AudioContext`` so analysers return seed-derived data;
      * replaces ``queryLocalFonts`` (when present) with a fixed font list;
      * deletes ``navigator.__proto__.webdriver`` and sets ``window.chrome``.

    Args:
        renderer: Renderer string to report.
        vendor: Vendor string to report.

    Returns:
        The script source as a string.
    """
    import json  # local import: only this method needs it

    seed_hash = int(hashlib.sha256(self.seed.encode()).hexdigest(), 16)
    # Encode the values as JS string literals instead of pasting them
    # between quotes, so a quote or backslash in either value cannot break
    # (or inject into) the generated script.
    vendor_js = json.dumps(vendor)
    renderer_js = json.dumps(renderer)
    return f"""
(function() {{
const originalGetContext = HTMLCanvasElement.prototype.getContext;
HTMLCanvasElement.prototype.getContext = function(type, attributes) {{
if (type === 'webgl' || type === 'experimental-webgl') {{
const ctx = originalGetContext.call(this, type, attributes);
if (ctx) {{
const originalGetParameter = ctx.getParameter.bind(ctx);
ctx.getParameter = function(pname) {{
if (pname === 0x9245) {{ return {vendor_js}; }}
if (pname === 0x9246) {{ return {renderer_js}; }}
return originalGetParameter(pname);
}};
const originalGetShaderPrecisionFormat = ctx.getShaderPrecisionFormat.bind(ctx);
ctx.getShaderPrecisionFormat = function(shadertype, precisiontype) {{
const format = originalGetShaderPrecisionFormat(shadertype, precisiontype);
if (precisiontype === ctx.HIGH_FLOAT) {{
format.rangeMin = 127; format.rangeMax = 127; format.precision = 23;
}}
if (precisiontype === ctx.MEDIUM_FLOAT) {{
format.rangeMin = 62; format.rangeMax = 62; format.precision = 14;
}}
return format;
}};
}}
return ctx;
}}
return originalGetContext.call(this, type, attributes);
}};
const originalToDataURL = HTMLCanvasElement.prototype.toDataURL;
HTMLCanvasElement.prototype.toDataURL = function(type, encoderOptions) {{
const ctx = this.getContext('2d');
if (ctx) {{
const imageData = ctx.getImageData(0, 0, this.width, this.height);
for (let i = 0; i < imageData.data.length; i += 4) {{
const noise = (Math.sin({seed_hash % 100000000} + i) * 0.5 + 0.5) * 2 - 1;
imageData.data[i] = Math.min(255, Math.max(0, imageData.data[i] + noise));
imageData.data[i+1] = Math.min(255, Math.max(0, imageData.data[i+1] + noise));
imageData.data[i+2] = Math.min(255, Math.max(0, imageData.data[i+2] + noise));
}}
ctx.putImageData(imageData, 0, 0);
}}
return originalToDataURL.call(this, type, encoderOptions);
}};
const originalAudioContext = window.AudioContext || window.webkitAudioContext;
if (originalAudioContext) {{
const AudioContextOverride = function() {{
const ctx = new originalAudioContext();
const originalCreateAnalyser = ctx.createAnalyser;
ctx.createAnalyser = function() {{
const analyser = originalCreateAnalyser.call(ctx);
analyser.getByteFrequencyData = function(array) {{
for (let i = 0; i < array.length; i++) {{
array[i] = Math.floor(Math.sin(i * 0.1 + {seed_hash % 1000}) * 128 + 128);
}}
}};
analyser.getFloatFrequencyData = function(array) {{
for (let i = 0; i < array.length; i++) {{
array[i] = Math.sin(i * 0.1 + {seed_hash % 1000}) * 100 - 100;
}}
}};
return analyser;
}};
return ctx;
}};
window.AudioContext = AudioContextOverride;
window.webkitAudioContext = AudioContextOverride;
}}
const originalQueryFonts = window.queryLocalFonts;
if (originalQueryFonts) {{
window.queryLocalFonts = async function() {{
return [
{{family: "Arial", style: "normal", weight: "400"}},
{{family: "Times New Roman", style: "normal", weight: "400"}},
{{family: "Courier New", style: "normal", weight: "400"}}
];
}};
}}
// Remove bot indicators
delete navigator.__proto__.webdriver;
window.chrome = {{ runtime: {{}} }};
}})();
"""
async def _human_like_scroll(self, page):
    """Scroll ``page`` downward in random steps with random pauses.

    The page height (``document.body.scrollHeight``) is read once up
    front. The method then scrolls by 50-300 px at a time, sleeping
    0.1-0.8 s after each step and, about one time in ten, an extra 1-3 s,
    until the accumulated distance reaches that height.
    """
    page_height = await page.evaluate("document.body.scrollHeight")
    scrolled = 0
    while scrolled < page_height:
        step = random.randint(50, 300)
        await page.evaluate(f"window.scrollBy(0, {step})")
        await asyncio.sleep(random.uniform(0.1, 0.8))
        scrolled += step
        if random.random() >= 0.1:
            continue
        # Occasional longer pause between scroll steps.
        await asyncio.sleep(random.uniform(1, 3))
async def _simulate_human_interaction(self, page):
    """Occasionally hover a random link, button or input on ``page``.

    With probability 0.3 (and only if the page has matching elements) one
    element is picked at random, hovered, and followed by a 0.2-1.0 s
    pause. This is best-effort: ordinary errors from the page are ignored.
    """
    try:
        elements = await page.query_selector_all("a, button, input")
        if elements and random.random() < 0.3:
            target = random.choice(elements)
            await target.hover()
            await asyncio.sleep(random.uniform(0.2, 1.0))
    except Exception:
        # Deliberately best-effort, but only for ordinary errors. A bare
        # ``except:`` would also swallow asyncio.CancelledError and
        # KeyboardInterrupt, which must propagate so that task
        # cancellation and Ctrl-C keep working.
        pass
async def _detect_cloudflare(self, page) -> bool:
    """Return True if the page HTML contains a Cloudflare challenge marker.

    The literal ``#cf-chl`` is matched case-sensitively; the phrases
    "checking your browser" and "just a moment" are matched
    case-insensitively.
    """
    html = await page.content()
    if "#cf-chl" in html:
        return True
    lowered = html.lower()
    phrases = ("checking your browser", "just a moment")
    return any(phrase in lowered for phrase in phrases)
async def _handle_cloudflare(self, page, max_retries: int = 3):
    """Wait for a Cloudflare challenge on ``page`` to clear.

    Each attempt checks the page; if a challenge is still detected, the
    method sleeps 10-13 s and then waits (up to 60 s) for the page's
    "load" state before checking again.

    Args:
        page: Page object exposing ``content()`` and
            ``wait_for_load_state()``.
        max_retries: Maximum number of wait cycles.

    Returns:
        True if no challenge is detected, either immediately or after
        waiting; False if it is still present after ``max_retries`` waits.
    """
    for attempt in range(max_retries):
        if not await self._detect_cloudflare(page):
            return True
        print(f"☁️ Cloudflare detected - waiting... (attempt {attempt+1})")
        await asyncio.sleep(8 + random.uniform(2, 5))
        await page.wait_for_load_state("load", timeout=60000)
    # Re-check after the final wait: otherwise the last wait cycle is
    # wasted and a page that just cleared (or a clean page with
    # max_retries=0) would be reported as still blocked.
    return not await self._detect_cloudflare(page)
async def _avoid_captcha(self, page) -> bool:
    """Run the scroll and hover routines on ``page`` between random pauses.

    Sleeps 2-5 s, scrolls the page, possibly hovers an element, then
    sleeps 3-5 s. Always returns True.
    """
    # random.uniform(a, b) computes a + (b - a) * random.random(), the
    # same value as the explicit arithmetic it replaces.
    await asyncio.sleep(random.uniform(2, 5))
    for routine in (self._human_like_scroll, self._simulate_human_interaction):
        await routine(page)
    await asyncio.sleep(random.uniform(3, 5))
    return True
async def _solve_captcha_fallback(self, page) -> bool:
    """Wait out a CAPTCHA page, reloading once if it persists.

    Sleeps 15-25 s and checks whether the page HTML still mentions
    "captcha" or "cloudflare" (case-insensitive). If it does, the page is
    reloaded, ``_avoid_captcha`` is run, and the check is repeated.

    Returns:
        True if neither word appears in the page HTML after the first
        wait or after the reload; False otherwise.
    """
    async def page_is_clear() -> bool:
        # Clear means neither marker word appears in the current HTML.
        html = (await page.content()).lower()
        return not ("captcha" in html or "cloudflare" in html)

    await asyncio.sleep(15 + random.random() * 10)
    if await page_is_clear():
        return True

    print("🔄 Refreshing session to bypass CAPTCHA...")
    await page.reload()
    await self._avoid_captcha(page)
    return await page_is_clear()