Enhance AmazonJobScraper to support flexible location matching and extract posted dates; refine LLMJobRefiner prompts for better data extraction.

Ofure Ikheloa 2025-12-09 12:00:57 +01:00
parent e216db35f9
commit 2d22fbdb92
2 changed files with 186 additions and 179 deletions
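The search string drives both keyword and location filtering: an optional "location:" marker is split out before the Amazon search URL is built. Below is a minimal illustrative sketch (not part of the diff) of the split performed by _extract_keywords_and_location further down:

import re

def split_keywords_and_location(search_keywords: str):
    # Mirrors the regex used in the scraper: pull out "location: <value>"
    # and strip it from the keyword portion.
    match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
    location = match.group(1).strip() if match else ""
    keywords = re.sub(r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE).strip()
    return keywords, location

print(split_keywords_and_location("data engineer location: Germany"))
# -> ('data engineer', 'Germany')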

View File

@@ -1,4 +1,3 @@
"Specifically for scraping job postings from Amazon Jobs."
import asyncio
import random
from typing import Optional, Dict
@@ -7,7 +6,7 @@ from browserforge.injectors.playwright import AsyncNewContext
from llm_agent import LLMJobRefiner
import re
from fetcher import StealthyFetcher
from datetime import datetime
from datetime import datetime, timedelta
import json
import redis
@@ -28,8 +27,28 @@ class AmazonJobScraper:
self.llm_agent = LLMJobRefiner()
self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# Country alias map for flexible location matching
self.country_aliases = {
"united states": ["united states", "usa", "u.s.a", "u.s.", "us", "america", ", us", ", usa"],
"united kingdom": ["united kingdom", "uk", "great britain", "england", "gb", ", uk", ", gb"],
"canada": ["canada", "ca", ", ca"],
"india": ["india", "in", ", in"],
"germany": ["germany", "de", ", de"],
"france": ["france", "fr", ", fr"],
"australia": ["australia", "au", ", au"],
# Add more as needed
}
def _init_db(self):
pass # Handled by LLMJobRefiner
pass
async def _safe_inner_text(self, element):
if not element:
return "Unknown"
try:
return await element.text_content()
except Exception:
return "Unknown"
async def _human_click(self, page, element, wait_after: bool = True):
if not element:
@@ -45,8 +64,6 @@ class AmazonJobScraper:
return False
async def _login(self, page, credentials: Dict) -> bool:
# Amazon job pages do NOT require login.
# Skip login unless we're scraping internal dashboards (not needed here).
return True
async def _extract_page_content_for_llm(self, page) -> str:
@@ -55,100 +72,138 @@ class AmazonJobScraper:
await asyncio.sleep(2 * self.human_speed)
return await page.content()
def _calculate_keyword_match(self, title: str, keywords: str) -> float:
if not title or not keywords:
return 0.0
title_lower = title.lower()
keyword_list = [kw.strip().lower() for kw in keywords.split()]
matches = sum(1 for kw in keyword_list if kw in title_lower)
return matches / len(keyword_list) if keyword_list else 0.0
def _extract_location_from_keywords(self, search_keywords: str) -> str:
def _extract_keywords_and_location(self, search_keywords: str):
location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
return location_match.group(1).strip().lower() if location_match else ""
location = location_match.group(1).strip() if location_match else ""
clean_keywords = re.sub(r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE).strip()
return clean_keywords, location
def _normalize_text(self, text: str) -> str:
return re.sub(r'[^a-z0-9\s]', ' ', text.lower()).strip()
def _location_matches(self, job_location_text: str, target_location: str) -> bool:
if not target_location:
return True
target = target_location.lower().strip()
job_text = job_location_text.lower()
# Direct substring match (e.g., "Berlin" in "Berlin, Germany")
if target in job_text:
return True
# Check country aliases
for canonical, aliases in self.country_aliases.items():
if target in canonical or any(target == alias for alias in aliases if len(alias) <= 3):
return any(alias in job_text for alias in aliases)
return False
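A quick worked example of the alias check: the target either substring-matches the job location directly, hits a canonical country name, or equals one of that country's short aliases; in either of the latter cases, any alias appearing in the job text counts as a match. A simplified standalone sketch (illustrative only, reduced to two countries):

COUNTRY_ALIASES = {
    "united kingdom": ["united kingdom", "uk", "great britain", "england", "gb", ", uk", ", gb"],
    "germany": ["germany", "de", ", de"],
}

def location_matches(job_location_text: str, target_location: str) -> bool:
    if not target_location:
        return True
    target = target_location.lower().strip()
    job_text = job_location_text.lower()
    if target in job_text:  # direct substring, e.g. "berlin" in "berlin, germany"
        return True
    for canonical, aliases in COUNTRY_ALIASES.items():
        if target in canonical or any(target == a for a in aliases if len(a) <= 3):
            return any(a in job_text for a in aliases)
    return False

print(location_matches("Berlin, DE", "Germany"))          # True (via "de" alias)
print(location_matches("London, United Kingdom", "uk"))   # True (short alias "uk")
print(location_matches("Seattle, WA, USA", "Germany"))    # False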
def _parse_posted_date_from_card_text(self, card_text: str) -> str:
date_match = re.search(r'Posted\s+([A-Za-z]+\s+\d{1,2},\s+\d{4})', card_text)
if date_match:
try:
dt = datetime.strptime(date_match.group(1), "%B %d, %Y")
return dt.strftime("%m/%d/%y")
except ValueError:
pass
days_match = re.search(r'Posted\s+(\d+)\s+day[s]?\s+ago', card_text, re.IGNORECASE)
if days_match:
days = int(days_match.group(1))
dt = datetime.now() - timedelta(days=days)
return dt.strftime("%m/%d/%y")
return datetime.now().strftime("%m/%d/%y")
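The posted date is now taken from the result card text rather than the job detail page, handling both an absolute "Posted <Month> <day>, <year>" and a relative "Posted N days ago", normalised to MM/DD/YY. A small self-contained check of the same patterns (illustrative sketch):

import re
from datetime import datetime, timedelta

def parse_posted(card_text: str) -> str:
    # Absolute form: "Posted January 5, 2025"
    m = re.search(r'Posted\s+([A-Za-z]+\s+\d{1,2},\s+\d{4})', card_text)
    if m:
        return datetime.strptime(m.group(1), "%B %d, %Y").strftime("%m/%d/%y")
    # Relative form: "Posted 3 days ago"
    m = re.search(r'Posted\s+(\d+)\s+day[s]?\s+ago', card_text, re.IGNORECASE)
    if m:
        return (datetime.now() - timedelta(days=int(m.group(1)))).strftime("%m/%d/%y")
    return datetime.now().strftime("%m/%d/%y")  # fallback: today

print(parse_posted("Posted January 5, 2025"))   # 01/05/25
print(parse_posted("Posted 3 days ago"))        # today minus 3 days
print(parse_posted("no date on this card"))     # today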
async def _scrape_jobs_from_current_page(self, page, search_keywords: str, seen_job_ids, all_job_links):
current_links = await page.query_selector_all("a[href*='/jobs/']")
await asyncio.sleep(1.5 * self.human_speed)
job_cards = await page.query_selector_all("div[data-job-id]")
new_jobs = 0
location_from_keywords = self._extract_location_from_keywords(search_keywords)
for link in current_links:
href = await link.get_attribute("href")
if not href or "page=" in href or "search?" in href:
clean_kw, location_kw = self._extract_keywords_and_location(search_keywords)
keyword_terms = [term.lower().strip() for term in clean_kw.split() if term.strip()]
for card in job_cards:
job_id = await card.get_attribute("data-job-id")
if not job_id or not job_id.isdigit() or job_id in seen_job_ids:
continue
full_url = href if href.startswith("http") else f"https://www.amazon.jobs{href}"
job_id = href.strip("/").split("/")[-1] if href else "unknown"
if job_id and job_id not in seen_job_ids:
title_element = await link.query_selector("h3") or await link.query_selector(".job-title")
title = await title_element.inner_text() if title_element else "Unknown Title"
match_percentage = self._calculate_keyword_match(title, search_keywords)
location_match = True
if location_from_keywords:
location_element = await link.query_selector(".location-and-id")
if location_element:
location_text = await location_element.inner_text()
location_match = location_from_keywords in location_text.lower()
if match_percentage >= 0.7 and location_match:
seen_job_ids.add(job_id)
all_job_links.append((href, title))
new_jobs += 1
elif match_percentage < 0.7:
print(f" ⚠️ Skipping job due to low keyword match: {title[:50]}... (match: {match_percentage:.2%})")
elif not location_match:
print(f" ⚠️ Skipping job due to location mismatch: {title[:50]}... (expected: {location_from_keywords})")
else:
link = await card.query_selector("a[href*='/jobs/']")
if not link:
continue
href = await link.get_attribute("href")
if not href or any(x in href for x in ["search?", "locations", "teams", "page=", "my.", "/account/"]):
continue
card_text = await self._safe_inner_text(card)
normalized_card = self._normalize_text(card_text)
# ✅ Check: ALL keyword terms must appear in card
keywords_match = all(term in normalized_card for term in keyword_terms) if keyword_terms else True
# ✅ Check location separately with alias support
location_match = True
if location_kw:
loc_el = await card.query_selector(".location-and-id span")
job_loc = (await self._safe_inner_text(loc_el)).strip() if loc_el else ""
location_match = self._location_matches(job_loc, location_kw)
if keywords_match and location_match:
title_span = await card.query_selector("h2.job-title span, h2 span")
title = (await self._safe_inner_text(title_span)).strip() if title_span else "Unknown"
posted_date = self._parse_posted_date_from_card_text(card_text)
seen_job_ids.add(job_id)
all_job_links.append((href, "Unknown Title"))
all_job_links.append((href, title, posted_date))
new_jobs += 1
print(f" ✅ Accepted: {title} (posted: {posted_date})")
else:
reasons = []
if not keywords_match:
reasons.append("keyword mismatch")
if not location_match:
reasons.append("location mismatch")
print(f" ⚠️ Skipping: {'; '.join(reasons)}")
return new_jobs
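Card filtering now requires every keyword term to appear in the normalised card text (punctuation stripped, lower-cased), replacing the old 70% title-match threshold. A tiny illustration of that check, using a made-up card string:

import re

def normalize(text: str) -> str:
    return re.sub(r'[^a-z0-9\s]', ' ', text.lower()).strip()

card = normalize("Senior Data Engineer, AWS Analytics | Berlin, DE | Job ID: 2901234")
print(all(term in card for term in ["data", "engineer"]))      # True
print(all(term in card for term in ["data", "scientist"]))     # False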
async def _handle_pagination(self, page, search_keywords: str, seen_job_ids, all_job_links):
current_page = 1
while current_page <= 10: # Amazon limits to ~10 pages publicly
print(f"📄 Processing page {current_page}")
new_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
print(f" Found {new_jobs} new job(s) on page {current_page} (total: {len(all_job_links)})")
current_page_num = 1
max_pages = 400
next_btn = await page.query_selector("a[aria-label='Next page']")
if next_btn:
next_url = await next_btn.get_attribute("href")
if next_url:
full_next_url = next_url if next_url.startswith("http") else f"https://www.amazon.jobs{next_url}"
print(f" ➡️ Navigating to next page: {full_next_url}")
await page.goto(full_next_url, timeout=120000)
await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
current_page += 1
while current_page_num <= max_pages:
print(f"📄 Processing page {current_page_num}")
await asyncio.sleep(1.5 * self.human_speed)
new_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
print(f" Found {new_jobs} new job(s) (total: {len(all_job_links)})")
# Scroll to bottom to trigger lazy-loaded pagination (if any)
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await asyncio.sleep(2 * self.human_speed)
# Look for ANY link containing 'page=N'
next_page_num = current_page_num + 1
next_selector = f"a[href*='page={next_page_num}']"
next_link = await page.query_selector(next_selector)
if next_link:
href = await next_link.get_attribute("href")
if href:
next_url = "https://www.amazon.jobs" + href if href.startswith("/") else href
print(f" ➡️ Going to page {next_page_num}: {next_url}")
await page.goto(next_url, timeout=120000)
try:
await page.wait_for_selector("div[data-job-id]", timeout=30000)
except PlaywrightTimeoutError:
print(" ⚠️ No jobs loaded on next page.")
break
current_page_num = next_page_num
else:
break
else:
print("🔚 No 'Next' button found — stopping pagination.")
print(" 🔚 No next page link found.")
break
async def _extract_job_posted_date(self, page) -> str:
try:
# Amazon often includes "Posted X days ago" in job description
content = await page.content()
match = re.search(r'Posted\s+(\d+)\s+day[s]?\s+ago', content, re.IGNORECASE)
if match:
days_ago = int(match.group(1))
posted_date = datetime.now() - timedelta(days=days_ago)
return posted_date.strftime("%m/%d/%y")
# Fallback: check for explicit date in page (rare)
date_match = re.search(r'(\d{1,2})/(\d{1,2})/(\d{4})', content)
if date_match:
month, day, year = date_match.groups()
return f"{month.zfill(2)}/{day.zfill(2)}/{year[-2:]}"
# Default to today
return datetime.now().strftime("%m/%d/%y")
except Exception as e:
print(f" ⚠️ Error extracting Amazon posted date: {str(e)}")
return datetime.now().strftime("%m/%d/%y")
print(f"✅ Finished pagination after {current_page_num} pages.")
async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
try:
@@ -161,25 +216,20 @@ class AmazonJobScraper:
self.redis_client.hset("failed_jobs", job_id, json.dumps(job_data))
print(f" 📦 Added failed job to Redis cache: {job_id} (Error: {error_type})")
except Exception as e:
print(f" ❌ Failed to add job to Redis cache: {str(e)}")
print(f" ❌ Failed to add to Redis: {str(e)}")
async def scrape_jobs(
self,
search_keywords: Optional[str],
max_pages: int = 400,
max_pages: int = 10,
credentials: Optional[Dict] = None
):
from datetime import timedelta # needed for date math
location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
location = location_match.group(1).strip() if location_match else ""
clean_keywords = re.sub(r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE).strip()
encoded_keywords = clean_keywords.replace(" ", "+") # Amazon uses + for spaces
clean_kw, location_kw = self._extract_keywords_and_location(search_keywords)
encoded_keywords = clean_kw.replace(" ", "+")
# ✅ FIXED: removed extra spaces
search_url = f"https://www.amazon.jobs/en/search?base_query={encoded_keywords}"
if location:
# Amazon uses location filter via `loc_query`
search_url += f"&loc_query={location.replace(' ', '+')}"
if location_kw:
search_url += f"&loc_query={location_kw.replace(' ', '+')}"
profile = self.engine._select_profile()
renderer = random.choice(self.engine.common_renderers[self.engine.os])
@@ -204,14 +254,11 @@ class AmazonJobScraper:
temp_fetcher = StealthyFetcher(self.engine, browser, context)
print("✅ Bypassing login (Amazon jobs are public)...")
login_successful = True
await page.wait_for_load_state("load", timeout=120000)
# Protection check (same as LinkedIn logic)
protection_type = await temp_fetcher._detect_protection(page)
if protection_type:
print(f"🛡️ Protection detected on initial page: {protection_type}")
print(f"🛡️ Protection detected: {protection_type}")
content_accessible = await temp_fetcher._is_content_accessible(page)
if not content_accessible:
handled = await self.engine._handle_cloudflare(page) if protection_type == "cloudflare" else False
@@ -220,25 +267,18 @@ class AmazonJobScraper:
self.engine.report_outcome("protection_block")
return
else:
print("✅ Protection present but content accessible — proceeding.")
print("✅ Protection present but content accessible.")
print(f"🔍 Searching Amazon for: {search_keywords}")
await page.goto(search_url, wait_until='load', timeout=120000)
await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
await page.goto(search_url, timeout=120000)
# Protection check on search page
protection_type = await temp_fetcher._detect_protection(page)
if protection_type:
print(f"🛡️ Protection detected on search page: {protection_type}")
content_accessible = await temp_fetcher._is_content_accessible(page)
if not content_accessible:
handled = await self.engine._handle_cloudflare(page) if protection_type == "cloudflare" else False
if not handled:
await browser.close()
self.engine.report_outcome("protection_block")
return
else:
print("✅ Protection present but content accessible — proceeding.")
try:
await page.wait_for_selector("div[data-job-id]", timeout=40000)
print("✅ Job listings detected.")
except PlaywrightTimeoutError:
print("❌ No job cards found.")
await browser.close()
return
all_job_links = []
seen_job_ids = set()
@@ -247,67 +287,28 @@ class AmazonJobScraper:
initial_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
print(f" Found {initial_jobs} initial job(s) (total: {len(all_job_links)})")
# Amazon uses pagination (not infinite scroll)
await self._handle_pagination(page, search_keywords, seen_job_ids, all_job_links)
print(f"✅ Collected {len(all_job_links)} unique job links.")
print(f"✅ Collected {len(all_job_links)} unique job listings.")
scraped_count = 0
for idx, (href, title) in enumerate(all_job_links):
for idx, (href, title, posted_date) in enumerate(all_job_links):
try:
# ✅ FIXED: removed extra spaces
full_url = href if href.startswith("http") else f"https://www.amazon.jobs{href}"
print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url}")
print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url} (posted: {posted_date})")
fetcher = StealthyFetcher(self.engine, browser, context)
job_page = await fetcher.fetch_url(full_url, wait_for_selector="h1[data-testid='job-title']")
if not job_page:
print(f" ❌ Failed to fetch job page {full_url} after retries.")
job_id = href.strip("/").split("/")[-1] if href else "unknown"
await self._add_job_to_redis_cache(full_url, job_id, "fetch_failure")
self.engine.report_outcome("fetch_failure", url=full_url)
continue
posted_date = await self._extract_job_posted_date(job_page)
print(f" 📅 Posted date extracted: {posted_date}")
apply_btn = await job_page.query_selector("a:has-text('Apply now'), button:has-text('Apply now')")
final_url = full_url
external_url = None
page_content = None
if apply_btn:
apply_href = await apply_btn.get_attribute("href")
if apply_href and apply_href.startswith("http"):
print(" 🌐 Detected external apply URL — capturing directly.")
external_url = apply_href
final_url = external_url
# We won't navigate; just pass Amazon job page to LLM
page_content = await self._extract_page_content_for_llm(job_page)
else:
print(" → Clicking 'Apply now' (may open new tab)...")
page_waiter = asyncio.create_task(context.wait_for_event("page"))
await self._human_click(job_page, apply_btn, wait_after=False)
external_page = None
try:
external_page = await asyncio.wait_for(page_waiter, timeout=5.0)
print(" 🌐 External job site opened in new tab.")
await external_page.wait_for_load_state("load", timeout=120000)
await asyncio.sleep(2 * self.human_speed)
await self.engine._human_like_scroll(external_page)
external_url = external_page.url
final_url = external_url
page_content = await self._extract_page_content_for_llm(external_page)
if not external_page.is_closed():
await external_page.close()
except asyncio.TimeoutError:
print(" 🖥️ No external tab — using Amazon job page.")
page_content = await self._extract_page_content_for_llm(job_page)
else:
print(" ⚠️ No 'Apply now' button — scraping job page directly.")
page_content = await self._extract_page_content_for_llm(job_page)
page_content = await self._extract_page_content_for_llm(job_page)
job_id = href.strip("/").split("/")[-1] if href else "unknown"
raw_data = {
@@ -332,22 +333,22 @@ class AmazonJobScraper:
refined_data[field] = "Amazon"
refined_data['scraped_at'] = datetime.now().isoformat()
refined_data['category'] = clean_keywords
refined_data['category'] = clean_kw
refined_data['posted_date'] = posted_date
await self.llm_agent.save_job_data(refined_data, search_keywords)
scraped_count += 1
print(f" ✅ Scraped and refined: {refined_data['title'][:50]}...")
print(f" ✅ Scraped: {refined_data['title'][:50]}...")
self.engine.report_outcome("success", url=raw_data["url"])
else:
print(f" 🟡 Could not extract meaningful data from: {final_url}")
await self._add_job_to_redis_cache(final_url, job_id, "llm_failure")
print(f" 🟡 LLM failed to refine: {full_url}")
await self._add_job_to_redis_cache(full_url, job_id, "llm_failure")
self.engine.report_outcome("llm_failure", url=raw_data["url"])
await job_page.close()
except Exception as e:
error_msg = str(e)[:100]
print(f" ⚠️ Failed on job {idx+1}: {error_msg}")
print(f" ⚠️ Exception on job {idx+1}: {error_msg}")
job_id = (href.strip("/").split("/")[-1] if href else "unknown") if 'href' in locals() else "unknown"
job_url = full_url if 'full_url' in locals() else "unknown"
await self._add_job_to_redis_cache(job_url, job_id, f"exception: {error_msg}")
@@ -356,15 +357,15 @@ class AmazonJobScraper:
continue
finally:
print(" ↩️ Returning to Amazon search results...")
await page.goto(search_url, timeout=120000)
await asyncio.sleep(4 * self.human_speed)
if not page.is_closed():
await page.goto(search_url, timeout=120000)
await asyncio.sleep(4 * self.human_speed)
await browser.close()
if scraped_count > 0:
self.engine.report_outcome("success")
print(f"✅ Completed! Processed {scraped_count} jobs for '{search_keywords}' based on request '{self.user_request}'.")
print(f"✅ Completed! Processed {scraped_count} jobs.")
else:
self.engine.report_outcome("captcha")
print("⚠️ No jobs processed successfully.")

View File

@@ -146,15 +146,22 @@ class LLMJobRefiner:
posted_date = raw_data.get('posted_date', datetime.now().strftime("%m/%d/%y"))
prompt = f"""
You are a job posting data extractor.
EXTRACT EXACT TEXT - DO NOT SUMMARIZE, PARAPHRASE, OR INVENT.
For these critical fields, follow these rules:
- description: Extract ALL job description text. If ANY job details exist (duties, responsibilities, overview), include them. Only use "Not provided" if absolutely no description exists.
- requirements: Extract ALL requirements text. If ANY requirements exist (skills, experience, education needed), include them. Only use "Not provided" if none exist.
- qualifications: Extract ALL qualifications text. If ANY qualifications exist, include them. Only use "Not provided" if none exist.
You are an expert job posting parser. Extract information EXACTLY as it appears in the text. DO NOT summarize, paraphrase, or invent.
CRITICAL INSTRUCTIONS:
- The job is from AMAZON. Look for these exact section headings:
- "## Basic Qualifications" extract as "qualifications"
- "## Preferred Qualifications" include this in "qualifications" too
- "## Description" or "About the Role" or "Key job responsibilities" extract as "description"
- "You Will:" or "Job responsibilities" include in "description"
- Requirements are often embedded in qualifications or description
FIELD RULES:
- description: MUST include ALL role details, responsibilities, and overview. Never "Not provided" if any job description exists.
- qualifications: MUST include ALL content from "Basic Qualifications" and "Preferred Qualifications" sections. Combine them.
- requirements: If no separate "requirements" section, extract required skills/experience from qualifications/description.
- For Amazon jobs, company_name = "Amazon".
REQUIRED FIELDS (must have valid values, never "N/A"):
- title, company_name, job_id, url
@@ -170,7 +177,6 @@ class LLMJobRefiner:
"company_name": "...",
"location": "...",
"description": "...",
"requirements": "...",
"qualifications": "...",
"salary_range": "...",
"nature_of_work": "...",
@@ -196,7 +202,7 @@ class LLMJobRefiner:
return None
# CRITICAL: Validate content fields - check if they SHOULD exist
content_fields = ['description', 'requirements', 'qualifications']
content_fields = ['description', 'qualifications']
cleaned_original = cleaned_content.lower()
# Simple heuristic: if page contains job-related keywords, content fields should NOT be "Not provided"