# Web_scraping_project/amazon_job_scraper.py

"Specifically for scraping job postings from Amazon Jobs."
import asyncio
import json
import random
import re
from datetime import datetime, timedelta
from typing import Optional, Dict

import redis
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
from browserforge.injectors.playwright import AsyncNewContext

from fetcher import StealthyFetcher
from llm_agent import LLMJobRefiner


class AmazonJobScraper:
    def __init__(
        self,
        engine,
        db_path: str = "amazon_jobs.db",
        human_speed: float = 1.0,
        user_request: str = "Extract all standard job details"
    ):
        self.engine = engine
        self.db_path = db_path
        self.human_speed = human_speed
        self.user_request = user_request
        self._init_db()
        self.llm_agent = LLMJobRefiner()
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

    def _init_db(self):
        pass  # Handled by LLMJobRefiner

    async def _human_click(self, page, element, wait_after: bool = True):
        if not element:
            return False
        await element.scroll_into_view_if_needed()
        await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
        try:
            await element.click()
            if wait_after:
                await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
            return True
        except Exception:
            return False

    async def _login(self, page, credentials: Dict) -> bool:
        # Amazon job pages do NOT require login.
        # Skip login unless we're scraping internal dashboards (not needed here).
        return True

    async def _extract_page_content_for_llm(self, page) -> str:
        await asyncio.sleep(2 * self.human_speed)
        await self.engine._human_like_scroll(page)
        await asyncio.sleep(2 * self.human_speed)
        return await page.content()

    def _calculate_keyword_match(self, title: str, keywords: str) -> float:
        if not title or not keywords:
            return 0.0
        title_lower = title.lower()
        keyword_list = [kw.strip().lower() for kw in keywords.split()]
        matches = sum(1 for kw in keyword_list if kw in title_lower)
        return matches / len(keyword_list) if keyword_list else 0.0
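    # Illustrative example (not from the original file): a title of
    # "Senior Data Engineer" scored against keywords "data engineer" matches
    # 2 of 2 keywords -> 1.0, while "data engineer remote" matches 2 of 3
    # -> ~0.67, which falls below the 0.7 threshold applied in
    # _scrape_jobs_from_current_page.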

    def _extract_location_from_keywords(self, search_keywords: str) -> str:
        location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
        return location_match.group(1).strip().lower() if location_match else ""
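    # Illustrative example (not from the original file): a query like
    # "software engineer location: Seattle, full time" yields "seattle";
    # a query without a "location:" marker yields "".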

    async def _scrape_jobs_from_current_page(self, page, search_keywords: str, seen_job_ids, all_job_links):
        current_links = await page.query_selector_all("a[href*='/jobs/']")
        new_jobs = 0
        location_from_keywords = self._extract_location_from_keywords(search_keywords)
        for link in current_links:
            href = await link.get_attribute("href")
            if not href or "page=" in href or "search?" in href:
                continue
            full_url = href if href.startswith("http") else f"https://www.amazon.jobs{href}"
            job_id = href.strip("/").split("/")[-1] if href else "unknown"
            if job_id and job_id not in seen_job_ids:
                title_element = await link.query_selector("h3") or await link.query_selector(".job-title")
                title = await title_element.inner_text() if title_element else "Unknown Title"
                match_percentage = self._calculate_keyword_match(title, search_keywords)
                location_match = True
                if location_from_keywords:
                    location_element = await link.query_selector(".location-and-id")
                    if location_element:
                        location_text = await location_element.inner_text()
                        location_match = location_from_keywords in location_text.lower()
                if match_percentage >= 0.7 and location_match:
                    seen_job_ids.add(job_id)
                    all_job_links.append((href, title))
                    new_jobs += 1
                elif match_percentage < 0.7:
                    print(f" ⚠️ Skipping job due to low keyword match: {title[:50]}... (match: {match_percentage:.2%})")
                elif not location_match:
                    print(f" ⚠️ Skipping job due to location mismatch: {title[:50]}... (expected: {location_from_keywords})")
                else:
                    seen_job_ids.add(job_id)
                    all_job_links.append((href, "Unknown Title"))
                    new_jobs += 1
        return new_jobs

    async def _handle_pagination(self, page, search_keywords: str, seen_job_ids, all_job_links):
        current_page = 1
        while current_page <= 10:  # Amazon limits to ~10 pages publicly
            print(f"📄 Processing page {current_page}")
            new_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
            print(f" Found {new_jobs} new job(s) on page {current_page} (total: {len(all_job_links)})")
            next_btn = await page.query_selector("a[aria-label='Next page']")
            if next_btn:
                next_url = await next_btn.get_attribute("href")
                if next_url:
                    full_next_url = next_url if next_url.startswith("http") else f"https://www.amazon.jobs{next_url}"
                    print(f" ➡️ Navigating to next page: {full_next_url}")
                    await page.goto(full_next_url, timeout=120000)
                    await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
                    current_page += 1
                else:
                    break
            else:
                print("🔚 No 'Next' button found — stopping pagination.")
                break

    async def _extract_job_posted_date(self, page) -> str:
        try:
            # Amazon often includes "Posted X days ago" in job description
            content = await page.content()
            match = re.search(r'Posted\s+(\d+)\s+day[s]?\s+ago', content, re.IGNORECASE)
            if match:
                days_ago = int(match.group(1))
                posted_date = datetime.now() - timedelta(days=days_ago)
                return posted_date.strftime("%m/%d/%y")
            # Fallback: check for explicit date in page (rare)
            date_match = re.search(r'(\d{1,2})/(\d{1,2})/(\d{4})', content)
            if date_match:
                month, day, year = date_match.groups()
                return f"{month.zfill(2)}/{day.zfill(2)}/{year[-2:]}"
            # Default to today
            return datetime.now().strftime("%m/%d/%y")
        except Exception as e:
            print(f" ⚠️ Error extracting Amazon posted date: {str(e)}")
            return datetime.now().strftime("%m/%d/%y")
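    # Illustrative example (not from the original file): if the page says
    # "Posted 5 days ago" and today is 06/10/25, this returns "06/05/25";
    # if neither pattern matches, it falls back to today's date.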

    async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
        try:
            job_data = {
                "job_url": job_url,
                "job_id": job_id,
                "error_type": error_type,
                "timestamp": datetime.now().isoformat()
            }
            self.redis_client.hset("failed_jobs", job_id, json.dumps(job_data))
            print(f" 📦 Added failed job to Redis cache: {job_id} (Error: {error_type})")
        except Exception as e:
            print(f" ❌ Failed to add job to Redis cache: {str(e)}")

    async def scrape_jobs(
        self,
        search_keywords: Optional[str],
        max_pages: int = 1,  # currently unused: pagination is capped at 10 pages in _handle_pagination
        credentials: Optional[Dict] = None  # currently unused: Amazon job pages are public (see _login)
    ):
        location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
        location = location_match.group(1).strip() if location_match else ""
        clean_keywords = re.sub(r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE).strip()
        encoded_keywords = clean_keywords.replace(" ", "+")  # Amazon uses + for spaces
        search_url = f"https://www.amazon.jobs/en/search?base_query={encoded_keywords}"
        if location:
            # Amazon uses location filter via `loc_query`
            search_url += f"&loc_query={location.replace(' ', '+')}"
        profile = self.engine._select_profile()
        renderer = random.choice(self.engine.common_renderers[self.engine.os])
        vendor = random.choice(self.engine.common_vendors)
        spoof_script = self.engine._get_spoof_script(renderer, vendor)
        async with async_playwright() as pw:
            browser = await pw.chromium.launch(
                headless=False,
                args=['--disable-blink-features=AutomationControlled']
            )
            context = await AsyncNewContext(browser, fingerprint=profile)
            await context.add_init_script(f"""
                Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
                Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
                Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
            """)
            await context.add_init_script(spoof_script)
            page = await context.new_page()
            temp_fetcher = StealthyFetcher(self.engine, browser, context)
            print("✅ Bypassing login (Amazon jobs are public)...")
            login_successful = True
            await page.wait_for_load_state("load", timeout=120000)
            # Protection check (same as LinkedIn logic)
            protection_type = await temp_fetcher._detect_protection(page)
            if protection_type:
                print(f"🛡️ Protection detected on initial page: {protection_type}")
                content_accessible = await temp_fetcher._is_content_accessible(page)
                if not content_accessible:
                    handled = await self.engine._handle_cloudflare(page) if protection_type == "cloudflare" else False
                    if not handled:
                        await browser.close()
                        self.engine.report_outcome("protection_block")
                        return
                else:
                    print("✅ Protection present but content accessible — proceeding.")
            print(f"🔍 Searching Amazon for: {search_keywords}")
            await page.goto(search_url, wait_until='load', timeout=120000)
            await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
            # Protection check on search page
            protection_type = await temp_fetcher._detect_protection(page)
            if protection_type:
                print(f"🛡️ Protection detected on search page: {protection_type}")
                content_accessible = await temp_fetcher._is_content_accessible(page)
                if not content_accessible:
                    handled = await self.engine._handle_cloudflare(page) if protection_type == "cloudflare" else False
                    if not handled:
                        await browser.close()
                        self.engine.report_outcome("protection_block")
                        return
                else:
                    print("✅ Protection present but content accessible — proceeding.")
            all_job_links = []
            seen_job_ids = set()
            print("🔄 Collecting initial job links...")
            initial_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
            print(f" Found {initial_jobs} initial job(s) (total: {len(all_job_links)})")
            # Amazon uses pagination (not infinite scroll)
            await self._handle_pagination(page, search_keywords, seen_job_ids, all_job_links)
            print(f"✅ Collected {len(all_job_links)} unique job links.")
            scraped_count = 0
            for idx, (href, title) in enumerate(all_job_links):
                try:
                    full_url = href if href.startswith("http") else f"https://www.amazon.jobs{href}"
                    print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url}")
                    fetcher = StealthyFetcher(self.engine, browser, context)
                    job_page = await fetcher.fetch_url(full_url, wait_for_selector="h1[data-testid='job-title']")
                    if not job_page:
                        print(f" ❌ Failed to fetch job page {full_url} after retries.")
                        job_id = href.strip("/").split("/")[-1] if href else "unknown"
                        await self._add_job_to_redis_cache(full_url, job_id, "fetch_failure")
                        self.engine.report_outcome("fetch_failure", url=full_url)
                        continue
                    posted_date = await self._extract_job_posted_date(job_page)
                    print(f" 📅 Posted date extracted: {posted_date}")
                    apply_btn = await job_page.query_selector("a:has-text('Apply now'), button:has-text('Apply now')")
                    final_url = full_url
                    external_url = None
                    page_content = None
                    if apply_btn:
                        apply_href = await apply_btn.get_attribute("href")
                        if apply_href and apply_href.startswith("http"):
                            print(" 🌐 Detected external apply URL — capturing directly.")
                            external_url = apply_href
                            final_url = external_url
                            # We won't navigate; just pass Amazon job page to LLM
                            page_content = await self._extract_page_content_for_llm(job_page)
                        else:
                            print(" → Clicking 'Apply now' (may open new tab)...")
                            page_waiter = asyncio.create_task(context.wait_for_event("page"))
                            await self._human_click(job_page, apply_btn, wait_after=False)
                            external_page = None
                            try:
                                external_page = await asyncio.wait_for(page_waiter, timeout=5.0)
                                print(" 🌐 External job site opened in new tab.")
                                await external_page.wait_for_load_state("load", timeout=120000)
                                await asyncio.sleep(2 * self.human_speed)
                                await self.engine._human_like_scroll(external_page)
                                external_url = external_page.url
                                final_url = external_url
                                page_content = await self._extract_page_content_for_llm(external_page)
                                if not external_page.is_closed():
                                    await external_page.close()
                            except asyncio.TimeoutError:
                                print(" 🖥️ No external tab — using Amazon job page.")
                                page_content = await self._extract_page_content_for_llm(job_page)
                    else:
                        print(" ⚠️ No 'Apply now' button — scraping job page directly.")
                        page_content = await self._extract_page_content_for_llm(job_page)
                    job_id = href.strip("/").split("/")[-1] if href else "unknown"
                    raw_data = {
                        "page_content": page_content,
                        "url": final_url,
                        "job_id": job_id,
                        "search_keywords": search_keywords,
                        "posted_date": posted_date
                    }
                    refined_data = await self.llm_agent.refine_job_data(raw_data, self.user_request)
                    if refined_data and refined_data.get("title", "N/A") != "N/A":
                        compulsory_fields = ['company_name', 'job_id', 'url']
                        for field in compulsory_fields:
                            if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
                                if field == 'job_id':
                                    refined_data[field] = job_id
                                elif field == 'url':
                                    refined_data[field] = final_url
                                elif field == 'company_name':
                                    refined_data[field] = "Amazon"
                        refined_data['scraped_at'] = datetime.now().isoformat()
                        refined_data['category'] = clean_keywords
                        refined_data['posted_date'] = posted_date
                        await self.llm_agent.save_job_data(refined_data, search_keywords)
                        scraped_count += 1
                        print(f" ✅ Scraped and refined: {refined_data['title'][:50]}...")
                        self.engine.report_outcome("success", url=raw_data["url"])
                    else:
                        print(f" 🟡 Could not extract meaningful data from: {final_url}")
                        await self._add_job_to_redis_cache(final_url, job_id, "llm_failure")
                        self.engine.report_outcome("llm_failure", url=raw_data["url"])
                    await job_page.close()
                except Exception as e:
                    error_msg = str(e)[:100]
                    print(f" ⚠️ Failed on job {idx+1}: {error_msg}")
                    job_id = (href.strip("/").split("/")[-1] if href else "unknown") if 'href' in locals() else "unknown"
                    job_url = full_url if 'full_url' in locals() else "unknown"
                    await self._add_job_to_redis_cache(job_url, job_id, f"exception: {error_msg}")
                    if 'job_page' in locals() and job_page:
                        await job_page.close()
                    continue
                finally:
                    print(" ↩️ Returning to Amazon search results...")
                    await page.goto(search_url, timeout=120000)
                    await asyncio.sleep(4 * self.human_speed)
            await browser.close()
        if scraped_count > 0:
            self.engine.report_outcome("success")
            print(f"✅ Completed! Processed {scraped_count} jobs for '{search_keywords}' based on request '{self.user_request}'.")
        else:
            self.engine.report_outcome("captcha")
            print("⚠️ No jobs processed successfully.")