# Source path: Web_scraping_project/scraping_engine.py
# scraping_engine.py
import asyncio
import hashlib
import random
import os
import json
from playwright.async_api import Page
from typing import List, Optional, Dict, Any
from browserforge.fingerprints import FingerprintGenerator
from dotenv import load_dotenv
from config import load_spoof_config
import time
# Load environment variables
load_dotenv()
class FingerprintScrapingEngine:
def __init__(
    self,
    seed: str = "default_seed",
    num_variations: int = 10,
    target_os: str = "windows",
    db_path: str = "jobs.db",
    markdown_path: str = "scraped_jobs.md",
    proxies: Optional[List[str]] = None,
    login_credentials: Optional[Dict[str, str]] = None
):
    """Configure a seeded, fingerprint-spoofing scraping engine.

    Args:
        seed: Deterministic seed; names the feedback file and session file,
            and drives fingerprint/noise generation.
        num_variations: Number of fingerprint variations to generate.
        target_os: Fingerprint OS, 'windows' or 'macos'.
        db_path: Path to the jobs database file.
        markdown_path: Output path for the scraped-jobs markdown file.
        proxies: Optional list of proxy URLs.
        login_credentials: {'username': ..., 'password': ...}; when omitted,
            falls back to SCRAPING_USERNAME / SCRAPING_PASSWORD env vars.

    Raises:
        ValueError: If target_os is not 'windows' or 'macos'.
    """
    if target_os not in ('windows', 'macos'):
        raise ValueError("operating_system must be 'windows' or 'macos'")
    if login_credentials is None:
        # Fall back to environment-provided credentials (loaded via dotenv).
        username = os.getenv("SCRAPING_USERNAME")
        password = os.getenv("SCRAPING_PASSWORD")
        if username and password:
            login_credentials = {"username": username, "password": password}
    self.seed = seed
    self.os = target_os
    self.db_path = db_path  # BUG FIX: parameter was accepted but never stored
    self.markdown_path = markdown_path
    self.proxies = proxies or []
    self.login_credentials = login_credentials
    self.num_variations = num_variations
    self.fingerprint_generator = FingerprintGenerator(
        browser=('chrome',),
        os=(self.os,),
    )
    # WebGL renderer/vendor pools come from the external spoof config.
    spoof_config = load_spoof_config()
    self.common_renderers = spoof_config["renderers"]
    self.common_vendors = spoof_config["vendors"]
    # Per-seed adaptive feedback (success rate, captcha counts, ...).
    self.feedback_file = f"feedback_{seed}.json"
    self.feedback = self._load_feedback()
    # Session persistence paths (cookies), keyed by seed.
    self.session_dir = "browser_sessions"
    os.makedirs(self.session_dir, exist_ok=True)
    self.session_path = os.path.join(
        self.session_dir, f"{seed}_session.json")
    # Tunable knobs; adjusted over time by _update_params_from_feedback().
    self.optimization_params = {
        "base_delay": 2.0,
        "max_concurrent_requests": 4,
        "request_timeout": 120000,  # milliseconds
        "retry_attempts": 3,
        "captcha_handling_strategy": "avoid",  # or "solve_fallback"
        "cloudflare_wait_strategy": "smart_wait",  # or "aggressive_reload"
    }
    self._update_params_from_feedback()
def _load_feedback(self) -> Dict[str, Any]:
if os.path.exists(self.feedback_file):
try:
with open(self.feedback_file, "r") as f:
data = json.load(f)
data.setdefault("success_rate", 1.0)
data.setdefault("captcha_count", 0)
data.setdefault("cloudflare_count", 0)
data.setdefault("avg_response_time", 10.0) # New metric
data.setdefault("failed_domains", {}) # New metrice
return data
except:
pass
return {"success_rate": 1.0, "captcha_count": 0, "cloudflare_count": 0}
def save_feedback(self):
with open(self.feedback_file, "w") as f:
json.dump(self.feedback, f)
def report_outcome(self, outcome: str, url: Optional[str] = None, response_time: Optional[float] = None):
if outcome == "success":
self.feedback["success_rate"] = min(
1.0, self.feedback["success_rate"] + 0.05) # Smaller increment
else:
self.feedback["success_rate"] = max(
0.05, self.feedback["success_rate"] - 0.1) # Smaller decrement
if outcome == "captcha":
self.feedback["captcha_count"] += 1
# Adapt strategy if many captchas
self.optimization_params["captcha_handling_strategy"] = "solve_fallback"
elif outcome == "cloudflare":
self.feedback["cloudflare_count"] += 1
# Adjust wait strategy based on frequency
if self.feedback["cloudflare_count"] > 5:
self.optimization_params["cloudflare_wait_strategy"] = "aggressive_reload"
# Track domain-specific failures
if url and outcome != "success":
domain = url.split("//")[1].split("/")[0]
if domain not in self.feedback["failed_domains"]:
self.feedback["failed_domains"][domain] = 0
self.feedback["failed_domains"][domain] += 1
# Update average response time
if response_time:
prev_avg = self.feedback.get("avg_response_time", 10.0)
# Simple moving average
self.feedback["avg_response_time"] = (
prev_avg * 0.9) + (response_time * 0.1)
self.save_feedback()
self._update_params_from_feedback() # Update params based on new feedback
def _update_params_from_feedback(self):
"""Adjust optimization parameters based on feedback."""
sr = self.feedback["success_rate"]
cc = self.feedback["captcha_count"]
cf = self.feedback["cloudflare_count"]
avg_rt = self.feedback.get("avg_response_time", 10.0)
# Adjust base delay based on success rate and avg response time
if sr < 0.6:
self.optimization_params["base_delay"] = max(
5.0, self.optimization_params["base_delay"] * 1.2)
elif sr > 0.8:
self.optimization_params["base_delay"] = min(
3.0, self.optimization_params["base_delay"] * 0.9)
# Reduce concurrency if many captchas/cloudflares
if cc > 3 or cf > 3:
self.optimization_params["max_concurrent_requests"] = max(
2, self.optimization_params["max_concurrent_requests"] - 2)
else:
# Reset to default
self.optimization_params["max_concurrent_requests"] = 4
# Increase timeout if avg response time is high
if avg_rt > 20:
self.optimization_params["request_timeout"] = 150000 # 90 seconds
print(f"Optimization Params Updated: {self.optimization_params}")
# ← NEW: Save browser context (cookies + localStorage)
async def save_session(self, context):
"""Save authenticated session to disk tied to seed"""
try:
storage = await context.storage_state()
with open(self.session_path, "w", encoding="utf-8") as f:
json.dump(storage, f, indent=2)
print(f"💾 Session saved for seed '{self.seed}'")
except Exception as e:
print(f"⚠️ Failed to save session: {e}")
# ← NEW: Load session if exists
async def load_session(self, context):
"""Restore session if available"""
if os.path.exists(self.session_path):
try:
with open(self.session_path, "r", encoding="utf-8") as f:
storage = json.load(f)
await context.add_cookies(storage.get("cookies", []))
# Note: Playwright doesn't support localStorage restore via API directly,
# but cookies are the main auth carrier (e.g., li_at on LinkedIn)
print(f"🔁 Reusing session for seed '{self.seed}'")
return True
except Exception as e:
print(f"⚠️ Failed to load session: {e}")
# Optionally delete corrupted session
if os.path.exists(self.session_path):
os.remove(self.session_path)
return False
def _select_profile(self):
    """Generate a deterministic browser fingerprint profile for this seed.

    NOTE: reseeds the module-level `random` generator from the seed hash,
    so hardware choices are reproducible per seed (and the global RNG
    state is clobbered as a side effect).
    """
    digest = int(hashlib.sha256(self.seed.encode()).hexdigest(), 16)
    random.seed(digest)
    profile = self.fingerprint_generator.generate()
    # Under a low success rate, narrow the pool to fewer, beefier specs.
    if self.feedback["success_rate"] < 0.5:
        concurrency_pool, memory_pool = [8, 4], [8]
    else:
        concurrency_pool, memory_pool = [4, 8, 12, 16], [4, 8]
    profile.navigator.hardwareConcurrency = random.choice(concurrency_pool)
    profile.navigator.deviceMemory = random.choice(memory_pool)
    return profile
def _get_spoof_script(self, renderer: str, vendor: str) -> str:
    """Build the JavaScript snippet that spoofs fingerprint surfaces.

    The script overrides the WebGL vendor/renderer strings (constants
    0x9245/0x9246, i.e. the WEBGL_debug_renderer_info UNMASKED_* params),
    pins shader precision formats, adds deterministic per-seed noise to
    canvas pixel data and audio analyser output, stubs queryLocalFonts
    with three common fonts, and removes `navigator.webdriver`.

    Args:
        renderer: WebGL renderer string to report.
        vendor: WebGL vendor string to report.

    Returns:
        The spoofing JavaScript as a string (IIFE), to be injected into
        pages; the injection site is not shown in this file.
    """
    seed_hash = int(hashlib.sha256(self.seed.encode()).hexdigest(), 16)
    # After repeated captchas, shift the canvas noise band so the derived
    # fingerprint changes while remaining deterministic for this seed.
    if self.feedback["captcha_count"] > 2:
        noise_factor = seed_hash % 100000000 + 100000000
    else:
        noise_factor = seed_hash % 100000000
    return f"""
(function() {{
    const originalGetContext = HTMLCanvasElement.prototype.getContext;
    HTMLCanvasElement.prototype.getContext = function(type, attributes) {{
        if (type === 'webgl' || type === 'experimental-webgl') {{
            const ctx = originalGetContext.call(this, type, attributes);
            if (ctx) {{
                const originalGetParameter = ctx.getParameter.bind(ctx);
                ctx.getParameter = function(pname) {{
                    if (pname === 0x9245) {{ return '{vendor}'; }}
                    if (pname === 0x9246) {{ return '{renderer}'; }}
                    return originalGetParameter(pname);
                }};
                const originalGetShaderPrecisionFormat = ctx.getShaderPrecisionFormat.bind(ctx);
                ctx.getShaderPrecisionFormat = function(shadertype, precisiontype) {{
                    const format = originalGetShaderPrecisionFormat(shadertype, precisiontype);
                    if (precisiontype === ctx.HIGH_FLOAT) {{
                        format.rangeMin = 127; format.rangeMax = 127; format.precision = 23;
                    }}
                    if (precisiontype === ctx.MEDIUM_FLOAT) {{
                        format.rangeMin = 62; format.rangeMax = 62; format.precision = 14;
                    }}
                    return format;
                }};
            }}
            return ctx;
        }}
        return originalGetContext.call(this, type, attributes);
    }};
    const originalToDataURL = HTMLCanvasElement.prototype.toDataURL;
    HTMLCanvasElement.prototype.toDataURL = function(type, encoderOptions) {{
        const ctx = this.getContext('2d');
        if (ctx) {{
            const imageData = ctx.getImageData(0, 0, this.width, this.height);
            for (let i = 0; i < imageData.data.length; i += 4) {{
                const noise = (Math.sin({noise_factor} + i) * 0.5 + 0.5) * 2 - 1;
                imageData.data[i] = Math.min(255, Math.max(0, imageData.data[i] + noise));
                imageData.data[i+1] = Math.min(255, Math.max(0, imageData.data[i+1] + noise));
                imageData.data[i+2] = Math.min(255, Math.max(0, imageData.data[i+2] + noise));
            }}
            ctx.putImageData(imageData, 0, 0);
        }}
        return originalToDataURL.call(this, type, encoderOptions);
    }};
    const originalAudioContext = window.AudioContext || window.webkitAudioContext;
    if (originalAudioContext) {{
        const AudioContextOverride = function() {{
            const ctx = new originalAudioContext();
            const originalCreateAnalyser = ctx.createAnalyser;
            ctx.createAnalyser = function() {{
                const analyser = originalCreateAnalyser.call(ctx);
                analyser.getByteFrequencyData = function(array) {{
                    for (let i = 0; i < array.length; i++) {{
                        array[i] = Math.floor(Math.sin(i * 0.1 + {seed_hash % 1000}) * 128 + 128);
                    }}
                }};
                analyser.getFloatFrequencyData = function(array) {{
                    for (let i = 0; i < array.length; i++) {{
                        array[i] = Math.sin(i * 0.1 + {seed_hash % 1000}) * 100 - 100;
                    }}
                }};
                return analyser;
            }};
            return ctx;
        }};
        window.AudioContext = AudioContextOverride;
        window.webkitAudioContext = AudioContextOverride;
    }}
    const originalQueryFonts = window.queryLocalFonts;
    if (originalQueryFonts) {{
        window.queryLocalFonts = async function() {{
            return [
                {{family: "Arial", style: "normal", weight: "400"}},
                {{family: "Times New Roman", style: "normal", weight: "400"}},
                {{family: "Courier New", style: "normal", weight: "400"}}
            ];
        }};
    }}
    // Remove bot indicators
    delete navigator.__proto__.webdriver;
    window.chrome = {{ runtime: {{}} }};
}})();
"""
async def _human_like_scroll(self, page):
scroll_height = await page.evaluate("document.body.scrollHeight")
current_scroll = 0
while current_scroll < scroll_height:
scroll_step = random.randint(50, 300)
await page.evaluate(f"window.scrollBy(0, {scroll_step})")
await asyncio.sleep(random.uniform(0.1, 0.8))
current_scroll += scroll_step
if random.random() < 0.1:
await asyncio.sleep(random.uniform(1, 3))
async def _simulate_human_interaction(self, page):
try:
elements = await page.query_selector_all("a, button, input")
if elements and random.random() < 0.3:
target = random.choice(elements)
await target.hover()
await asyncio.sleep(random.uniform(0.2, 1.0))
except:
pass
async def _avoid_captcha(self, page) -> bool:
    """Mimic human browsing for a few seconds to avoid triggering a captcha.

    Idles 2-5 s, scrolls and hovers like a user, then idles another 3-5 s.
    Always returns True — this is best-effort; there is no success check here.
    """
    await asyncio.sleep(2 + random.random() * 3)
    await self._human_like_scroll(page)
    await self._simulate_human_interaction(page)
    await asyncio.sleep(3 + random.random() * 2)
    return True
async def _solve_captcha_fallback(self, page) -> bool:
    """Wait out a captcha page, reloading once; True when the challenge clears."""

    def cleared(html: str) -> bool:
        # Challenge markers gone from the rendered page?
        lowered = html.lower()
        return "captcha" not in lowered and "cloudflare" not in lowered

    # Give the challenge (or its cooldown) 15-25 s to expire on its own.
    await asyncio.sleep(15 + random.random() * 10)
    if cleared(await page.content()):
        return True
    print("🔄 Refreshing session to bypass CAPTCHA...")
    await page.reload()
    await self._avoid_captcha(page)
    return cleared(await page.content())
async def _detect_cloudflare(self, page: Page) -> bool:
"""Detect Cloudflare challenges."""
content = await page.content()
return (
"#cf-chl" in content
or "checking your browser" in content.lower()
or "just a moment" in content.lower()
or "turnstile" in content.lower() # Check for Cloudflare Turnstile
)
async def _handle_cloudflare(self, page: Page) -> bool:
"""
Handle Cloudflare challenges, including Turnstile if present.
This is a simplified approach; real-world handling might require more sophisticated logic or external solvers.
"""
max_wait_time = 60 # Total time to wait for Cloudflare to resolve
start_time = time.time()
while time.time() - start_time < max_wait_time:
if not await self._detect_cloudflare(page):
print("Cloudflare challenge resolved.")
return True
print("Cloudflare active, waiting...")
# Simulate more human-like behavior while waiting
await self._simulate_human_interaction(page)
# Wait for a random period, increasing slightly each time
wait_time = min(10, 2 + random.uniform(1, 3) +
(time.time() - start_time) * 0.1)
await asyncio.sleep(wait_time)
# Reload occasionally to trigger potential client-side checks
if (time.time() - start_time) > 15 and (time.time() - start_time) % 20 < 2:
print("Reloading page during Cloudflare wait...")
await page.reload(wait_until='load', timeout=80000)
print("Timeout waiting for Cloudflare resolution.")
return False