# Web_scraping_project/amazon_job_scraper.py

import asyncio
import random
import re
from typing import Optional, Dict
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
from browserforge.injectors.playwright import AsyncNewContext
from llm_agent import LLMJobRefiner
from fetcher import StealthyFetcher
from datetime import datetime

class AmazonJobScraper:
    """Scrapes Amazon job listings with Playwright and refines the raw pages via an LLM agent."""

    def __init__(
        self,
        engine,
        db_path: str = "amazon_jobs.db",
        human_speed: float = 1.0,
        user_request: str = "Extract all standard job details"
    ):
        self.engine = engine
        self.db_path = db_path
        self.human_speed = human_speed
        self.user_request = user_request
        self.llm_agent = LLMJobRefiner()

    async def _human_click(self, page, element, wait_after: bool = True):
        """Scroll an element into view and click it with human-like pauses."""
        if not element:
            return False
        await element.scroll_into_view_if_needed()
        await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
        try:
            await element.click()
            if wait_after:
                await asyncio.sleep(random.uniform(1.0, 2.0) * self.human_speed)
            return True
        except Exception:
            return False

    def _extract_location_from_keywords(self, search_keywords: str) -> str:
        location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
        return location_match.group(1).strip() if location_match else ""

    def _build_amazon_search_url(self, keywords: str) -> str:
        clean_keywords = re.sub(r'location:\s*[^,]+', '', keywords, flags=re.IGNORECASE).strip()
        location = self._extract_location_from_keywords(keywords)
        base_url = "https://www.amazon.jobs/en/search?"
        params = []
        if clean_keywords:
            params.append(f"base_query={clean_keywords.replace(' ', '+')}")
        if location:
            params.append(f"loc_query={location.replace(' ', '+')}")
        params.append("offset=0")
        params.append("result_limit=10")
        return base_url + "&".join(params)
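
    # Illustrative example (derived from the builder above, not part of the original file):
    # the keywords "software engineer location: Seattle" produce
    # https://www.amazon.jobs/en/search?base_query=software+engineer&loc_query=Seattle&offset=0&result_limit=10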

    async def _extract_page_content_for_llm(self, page) -> str:
        await asyncio.sleep(2 * self.human_speed)
        await self.engine._human_like_scroll(page)
        await asyncio.sleep(2 * self.human_speed)
        return await page.content()

    async def _scrape_job_links_from_page(self, page, seen_job_ids, all_job_links):
        job_cards = await page.query_selector_all('div.job-tile a[href^="/en/jobs/"]')
        new_jobs = 0
        for card in job_cards:
            href = await card.get_attribute("href")
            if not href:
                continue
            full_url = f"https://www.amazon.jobs{href}" if href.startswith("/") else href
            job_id = href.split("/")[-1] if href.split("/")[-1] else "unknown"
            if job_id in seen_job_ids:
                continue
            title_element = await card.query_selector('h3')
            title = await title_element.inner_text() if title_element else "Unknown Title"
            seen_job_ids.add(job_id)
            all_job_links.append((full_url, title))
            new_jobs += 1
        return new_jobs

    async def _scroll_and_collect_jobs(self, page, seen_job_ids, all_job_links, max_pages=5):
        offset = 0
        jobs_per_page = 10
        for page_num in range(max_pages):
            print(f"📄 Fetching Amazon job page {page_num + 1} (offset: {offset})")
            current_url = page.url
            if "offset=" in current_url:
                base_url = current_url.split("offset=")[0]
                new_url = base_url + f"offset={offset}&result_limit={jobs_per_page}"
            else:
                new_url = current_url + f"&offset={offset}&result_limit={jobs_per_page}"
            await page.goto(new_url, wait_until='domcontentloaded', timeout=120000)
            await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
            new_jobs = await self._scrape_job_links_from_page(page, seen_job_ids, all_job_links)
            print(f"   Found {new_jobs} new job(s) on page {page_num + 1} (total: {len(all_job_links)})")
            if new_jobs == 0 and page_num > 0:
                print("🔚 No new jobs — stopping pagination.")
                break
            offset += jobs_per_page

    async def scrape_jobs(
        self,
        search_keywords: Optional[str],
        max_pages: int = 5,
        credentials: Optional[Dict] = None  # Not used for Amazon
    ):
        search_url = self._build_amazon_search_url(search_keywords)
        print(f"🔍 Amazon search URL: {search_url}")
        profile = self.engine._select_profile()
        renderer = random.choice(self.engine.common_renderers[self.engine.os])
        vendor = random.choice(self.engine.common_vendors)
        spoof_script = self.engine._get_spoof_script(renderer, vendor)
        async with async_playwright() as pw:
            browser = await pw.chromium.launch(
                headless=False,
                args=['--disable-blink-features=AutomationControlled']
            )
            context = await AsyncNewContext(browser, fingerprint=profile)
            await context.add_init_script(f"""
                Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
                Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
                Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
            """)
            await context.add_init_script(spoof_script)
            page = await context.new_page()
            temp_fetcher = StealthyFetcher(self.engine, browser, context)

            # Amazon doesn't require login
            print("🌐 Navigating to Amazon Jobs (no login required)...")
            await page.goto(search_url, wait_until='domcontentloaded', timeout=120000)
            await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)

            # Protection check
            protection_type = await temp_fetcher._detect_protection(page)
            if protection_type:
                print(f"🛡️ Protection detected: {protection_type}")
                content_accessible = await temp_fetcher._is_content_accessible(page)
                if not content_accessible:
                    handled = await self.engine._handle_cloudflare(page) if protection_type == "cloudflare" else False
                    if not handled:
                        await browser.close()
                        self.engine.report_outcome("protection_block")
                        return
                else:
                    print("✅ Protection present but content accessible.")

            all_job_links = []
            seen_job_ids = set()
            print("🔄 Collecting job links via pagination...")
            await self._scroll_and_collect_jobs(page, seen_job_ids, all_job_links, max_pages=max_pages)
            print(f"✅ Collected {len(all_job_links)} unique Amazon job links.")

            scraped_count = 0
            for idx, (job_url, title) in enumerate(all_job_links):
                try:
                    print(f" → Opening job {idx+1}/{len(all_job_links)}: {job_url}")
                    fetcher = StealthyFetcher(self.engine, browser, context)
                    job_page = await fetcher.fetch_url(job_url, wait_for_selector="h1.job-title")
                    if not job_page:
                        print(f" ❌ Failed to fetch job page: {job_url}")
                        self.engine.report_outcome("fetch_failure", url=job_url)
                        continue

                    # Extract raw HTML for LLM
                    await self.engine._human_like_scroll(job_page)
                    await asyncio.sleep(2 * self.human_speed)
                    page_content = await self._extract_page_content_for_llm(job_page)

                    job_id = job_url.split("/")[-1] if job_url.split("/")[-1] else "unknown"
                    raw_data = {
                        "page_content": page_content,
                        "url": job_url,
                        "job_id": job_id,
                        "search_keywords": search_keywords
                    }
                    refined_data = await self.llm_agent.refine_job_data(raw_data, self.user_request)

                    if refined_data and refined_data.get("title", "N/A") != "N/A":
                        # Ensure compulsory fields
                        compulsory_fields = ['company_name', 'job_id', 'url']
                        for field in compulsory_fields:
                            if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
                                if field == 'job_id':
                                    refined_data[field] = job_id
                                elif field == 'url':
                                    refined_data[field] = job_url
                                elif field == 'company_name':
                                    refined_data[field] = "Amazon"
                        refined_data['scraped_at'] = datetime.now().isoformat()
                        refined_data['category'] = re.sub(r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE).strip()
                        await self.llm_agent.save_job_data(refined_data, search_keywords)
                        scraped_count += 1
                        print(f" ✅ Scraped and refined: {refined_data['title'][:50]}...")
                        self.engine.report_outcome("success", url=job_url)
                    else:
                        print(f" 🟡 LLM could not extract valid data from: {job_url}")
                        self.engine.report_outcome("llm_failure", url=job_url)
                    await job_page.close()
                except Exception as e:
                    print(f" ⚠️ Failed on job {idx+1}: {str(e)[:100]}")
                    if 'job_page' in locals() and job_page:
                        await job_page.close()
                    continue

            await browser.close()

        if scraped_count > 0:
            self.engine.report_outcome("success")
            print(f"✅ Completed! Processed {scraped_count} Amazon jobs for '{search_keywords}'.")
        else:
            self.engine.report_outcome("no_jobs")
            print("⚠️ No Amazon jobs processed successfully.")