From 2d22fbdb920463be169f3c848eb78fcd99330789 Mon Sep 17 00:00:00 2001
From: Ofure Ikheloa
Date: Tue, 9 Dec 2025 12:00:57 +0100
Subject: [PATCH] Enhance AmazonJobScraper to support flexible location
 matching and extract posted dates; refine LLMJobRefiner prompts for better
 data extraction.

---
 amazon_job_scraper.py | 337 +++++++++++++++++++++---------------------
 llm_agent.py          |  28 ++--
 2 files changed, 186 insertions(+), 179 deletions(-)

diff --git a/amazon_job_scraper.py b/amazon_job_scraper.py
index 442975c..cd48a7e 100644
--- a/amazon_job_scraper.py
+++ b/amazon_job_scraper.py
@@ -1,4 +1,3 @@
-"Specifically for scraping job postings from Amazon Jobs."
 import asyncio
 import random
 from typing import Optional, Dict
@@ -7,7 +6,7 @@ from browserforge.injectors.playwright import AsyncNewContext
 from llm_agent import LLMJobRefiner
 import re
 from fetcher import StealthyFetcher
-from datetime import datetime
+from datetime import datetime, timedelta
 import json
 import redis
 
@@ -28,8 +27,28 @@ class AmazonJobScraper:
         self.llm_agent = LLMJobRefiner()
         self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
 
+        # Country alias map for flexible location matching
+        self.country_aliases = {
+            "united states": ["united states", "usa", "u.s.a", "u.s.", "us", "america", ", us", ", usa"],
+            "united kingdom": ["united kingdom", "uk", "great britain", "england", "gb", ", uk", ", gb"],
+            "canada": ["canada", "ca", ", ca"],
+            "india": ["india", "in", ", in"],
+            "germany": ["germany", "de", ", de"],
+            "france": ["france", "fr", ", fr"],
+            "australia": ["australia", "au", ", au"],
+            # Add more as needed
+        }
+
     def _init_db(self):
-        pass # Handled by LLMJobRefiner
+        pass
+
+    async def _safe_inner_text(self, element):
+        if not element:
+            return "Unknown"
+        try:
+            return await element.text_content()
+        except Exception:
+            return "Unknown"
 
     async def _human_click(self, page, element, wait_after: bool = True):
         if not element:
@@ -45,8 +64,6 @@ class AmazonJobScraper:
         return False
 
     async def _login(self, page, credentials: Dict) -> bool:
-        # Amazon job pages do NOT require login.
-        # Skip login unless we're scraping internal dashboards (not needed here).
         return True
 
     async def _extract_page_content_for_llm(self, page) -> str:
@@ -55,100 +72,138 @@ class AmazonJobScraper:
         await asyncio.sleep(2 * self.human_speed)
         return await page.content()
 
-    def _calculate_keyword_match(self, title: str, keywords: str) -> float:
-        if not title or not keywords:
-            return 0.0
-        title_lower = title.lower()
-        keyword_list = [kw.strip().lower() for kw in keywords.split()]
-        matches = sum(1 for kw in keyword_list if kw in title_lower)
-        return matches / len(keyword_list) if keyword_list else 0.0
-
-    def _extract_location_from_keywords(self, search_keywords: str) -> str:
+    def _extract_keywords_and_location(self, search_keywords: str):
         location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
-        return location_match.group(1).strip().lower() if location_match else ""
+        location = location_match.group(1).strip() if location_match else ""
+        clean_keywords = re.sub(r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE).strip()
+        return clean_keywords, location
+
+    def _normalize_text(self, text: str) -> str:
+        return re.sub(r'[^a-z0-9\s]', ' ', text.lower()).strip()
+
+    def _location_matches(self, job_location_text: str, target_location: str) -> bool:
+        if not target_location:
+            return True
+        target = target_location.lower().strip()
+        job_text = job_location_text.lower()
+
+        # Direct substring match (e.g., "Berlin" in "Berlin, Germany")
+        if target in job_text:
+            return True
+
+        # Check country aliases
+        for canonical, aliases in self.country_aliases.items():
+            if target in canonical or any(target == alias for alias in aliases if len(alias) <= 3):
+                return any(alias in job_text for alias in aliases)
+
+        return False
+
+    def _parse_posted_date_from_card_text(self, card_text: str) -> str:
+        date_match = re.search(r'Posted\s+([A-Za-z]+\s+\d{1,2},\s+\d{4})', card_text)
+        if date_match:
+            try:
+                dt = datetime.strptime(date_match.group(1), "%B %d, %Y")
+                return dt.strftime("%m/%d/%y")
+            except ValueError:
+                pass
+        days_match = re.search(r'Posted\s+(\d+)\s+day[s]?\s+ago', card_text, re.IGNORECASE)
+        if days_match:
+            days = int(days_match.group(1))
+            dt = datetime.now() - timedelta(days=days)
+            return dt.strftime("%m/%d/%y")
+        return datetime.now().strftime("%m/%d/%y")
 
     async def _scrape_jobs_from_current_page(self, page, search_keywords: str, seen_job_ids, all_job_links):
-        current_links = await page.query_selector_all("a[href*='/jobs/']")
+        await asyncio.sleep(1.5 * self.human_speed)
+        job_cards = await page.query_selector_all("div[data-job-id]")
         new_jobs = 0
-        location_from_keywords = self._extract_location_from_keywords(search_keywords)
-
-        for link in current_links:
-            href = await link.get_attribute("href")
-            if not href or "page=" in href or "search?" in href:
+
+        clean_kw, location_kw = self._extract_keywords_and_location(search_keywords)
+        keyword_terms = [term.lower().strip() for term in clean_kw.split() if term.strip()]
+
+        for card in job_cards:
+            job_id = await card.get_attribute("data-job-id")
+            if not job_id or not job_id.isdigit() or job_id in seen_job_ids:
                 continue
-
-            full_url = href if href.startswith("http") else f"https://www.amazon.jobs{href}"
-            job_id = href.strip("/").split("/")[-1] if href else "unknown"
-
-            if job_id and job_id not in seen_job_ids:
-                title_element = await link.query_selector("h3") or await link.query_selector(".job-title")
-                title = await title_element.inner_text() if title_element else "Unknown Title"
-
-                match_percentage = self._calculate_keyword_match(title, search_keywords)
-                location_match = True
-                if location_from_keywords:
-                    location_element = await link.query_selector(".location-and-id")
-                    if location_element:
-                        location_text = await location_element.inner_text()
-                        location_match = location_from_keywords in location_text.lower()
-
-                if match_percentage >= 0.7 and location_match:
-                    seen_job_ids.add(job_id)
-                    all_job_links.append((href, title))
-                    new_jobs += 1
-                elif match_percentage < 0.7:
-                    print(f" ⚠️ Skipping job due to low keyword match: {title[:50]}... (match: {match_percentage:.2%})")
-                elif not location_match:
-                    print(f" ⚠️ Skipping job due to location mismatch: {title[:50]}... (expected: {location_from_keywords})")
-            else:
+
+            link = await card.query_selector("a[href*='/jobs/']")
+            if not link:
+                continue
+
+            href = await link.get_attribute("href")
+            if not href or any(x in href for x in ["search?", "locations", "teams", "page=", "my.", "/account/"]):
+                continue
+
+            card_text = await self._safe_inner_text(card)
+            normalized_card = self._normalize_text(card_text)
+
+            # ✅ Check: ALL keyword terms must appear in card
+            keywords_match = all(term in normalized_card for term in keyword_terms) if keyword_terms else True
+
+            # ✅ Check location separately with alias support
+            location_match = True
+            if location_kw:
+                loc_el = await card.query_selector(".location-and-id span")
+                job_loc = (await self._safe_inner_text(loc_el)).strip() if loc_el else ""
+                location_match = self._location_matches(job_loc, location_kw)
+
+            if keywords_match and location_match:
+                title_span = await card.query_selector("h2.job-title span, h2 span")
+                title = (await self._safe_inner_text(title_span)).strip() if title_span else "Unknown"
+                posted_date = self._parse_posted_date_from_card_text(card_text)
                 seen_job_ids.add(job_id)
-                all_job_links.append((href, "Unknown Title"))
+                all_job_links.append((href, title, posted_date))
                 new_jobs += 1
+                print(f" ✅ Accepted: {title} (posted: {posted_date})")
+            else:
+                reasons = []
+                if not keywords_match:
+                    reasons.append("keyword mismatch")
+                if not location_match:
+                    reasons.append("location mismatch")
+                print(f" ⚠️ Skipping: {'; '.join(reasons)}")
+
         return new_jobs
 
     async def _handle_pagination(self, page, search_keywords: str, seen_job_ids, all_job_links):
-        current_page = 1
-        while current_page <= 10: # Amazon limits to ~10 pages publicly
-            print(f"📄 Processing page {current_page}")
-            new_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
-            print(f" ➕ Found {new_jobs} new job(s) on page {current_page} (total: {len(all_job_links)})")
+        current_page_num = 1
+        max_pages = 400  # hard cap; the max_pages argument of scrape_jobs is not passed down here
 
-            next_btn = await page.query_selector("a[aria-label='Next page']")
-            if next_btn:
-                next_url = await next_btn.get_attribute("href")
-                if next_url:
-                    full_next_url = next_url if next_url.startswith("http") else f"https://www.amazon.jobs{next_url}"
-                    print(f" ➡️ Navigating to next page: {full_next_url}")
-                    await page.goto(full_next_url, timeout=120000)
-                    await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
-                    current_page += 1
+        while current_page_num <= max_pages:
+            print(f"📄 Processing page {current_page_num}")
+            await asyncio.sleep(1.5 * self.human_speed)
+
+            new_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
+            print(f" ➕ Found {new_jobs} new job(s) (total: {len(all_job_links)})")
+
+            # Scroll to bottom to trigger lazy-loaded pagination (if any)
+            await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
+            await asyncio.sleep(2 * self.human_speed)
+
+            # Look for ANY link containing 'page=N'
+            next_page_num = current_page_num + 1
+            next_selector = f"a[href*='page={next_page_num}']"
+            next_link = await page.query_selector(next_selector)
+
+            if next_link:
+                href = await next_link.get_attribute("href")
+                if href:
+                    next_url = "https://www.amazon.jobs" + href if href.startswith("/") else href
+                    print(f" ➡️ Going to page {next_page_num}: {next_url}")
+                    await page.goto(next_url, timeout=120000)
+                    try:
+                        await page.wait_for_selector("div[data-job-id]", timeout=30000)
+                    except PlaywrightTimeoutError:
+                        print(" ⚠️ No jobs loaded on next page.")
+                        break
+                    current_page_num = next_page_num
                 else:
                     break
             else:
-                print("🔚 No 'Next' button found — stopping pagination.")
+                print(" 🔚 No next page link found.")
                 break
 
-    async def _extract_job_posted_date(self, page) -> str:
-        try:
-            # Amazon often includes "Posted X days ago" in job description
-            content = await page.content()
-            match = re.search(r'Posted\s+(\d+)\s+day[s]?\s+ago', content, re.IGNORECASE)
-            if match:
-                days_ago = int(match.group(1))
-                posted_date = datetime.now() - timedelta(days=days_ago)
-                return posted_date.strftime("%m/%d/%y")
-
-            # Fallback: check for explicit date in page (rare)
-            date_match = re.search(r'(\d{1,2})/(\d{1,2})/(\d{4})', content)
-            if date_match:
-                month, day, year = date_match.groups()
-                return f"{month.zfill(2)}/{day.zfill(2)}/{year[-2:]}"
-
-            # Default to today
-            return datetime.now().strftime("%m/%d/%y")
-        except Exception as e:
-            print(f" ⚠️ Error extracting Amazon posted date: {str(e)}")
-            return datetime.now().strftime("%m/%d/%y")
+        print(f"✅ Finished pagination after {current_page_num} pages.")
 
     async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
         try:
@@ -161,25 +216,20 @@ class AmazonJobScraper:
             self.redis_client.hset("failed_jobs", job_id, json.dumps(job_data))
             print(f" 📦 Added failed job to Redis cache: {job_id} (Error: {error_type})")
         except Exception as e:
-            print(f" ❌ Failed to add job to Redis cache: {str(e)}")
+            print(f" ❌ Failed to add to Redis: {str(e)}")
 
     async def scrape_jobs(
         self,
         search_keywords: Optional[str],
-        max_pages: int = 400,
+        max_pages: int = 10,
         credentials: Optional[Dict] = None
     ):
-        from datetime import timedelta # needed for date math
-
-        location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
-        location = location_match.group(1).strip() if location_match else ""
-        clean_keywords = re.sub(r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE).strip()
-        encoded_keywords = clean_keywords.replace(" ", "+") # Amazon uses + for spaces
-
+        clean_kw, location_kw = self._extract_keywords_and_location(search_keywords)
+        encoded_keywords = clean_kw.replace(" ", "+")
+        # Amazon encodes spaces in query parameters as '+'
         search_url = f"https://www.amazon.jobs/en/search?base_query={encoded_keywords}"
f"https://www.amazon.jobs/en/search?base_query={encoded_keywords}" - if location: - # Amazon uses location filter via `loc_query` - search_url += f"&loc_query={location.replace(' ', '+')}" + if location_kw: + search_url += f"&loc_query={location_kw.replace(' ', '+')}" profile = self.engine._select_profile() renderer = random.choice(self.engine.common_renderers[self.engine.os]) @@ -204,14 +254,11 @@ class AmazonJobScraper: temp_fetcher = StealthyFetcher(self.engine, browser, context) print("✅ Bypassing login (Amazon jobs are public)...") - login_successful = True - await page.wait_for_load_state("load", timeout=120000) - # Protection check (same as LinkedIn logic) protection_type = await temp_fetcher._detect_protection(page) if protection_type: - print(f"🛡️ Protection detected on initial page: {protection_type}") + print(f"🛡️ Protection detected: {protection_type}") content_accessible = await temp_fetcher._is_content_accessible(page) if not content_accessible: handled = await self.engine._handle_cloudflare(page) if protection_type == "cloudflare" else False @@ -220,25 +267,18 @@ class AmazonJobScraper: self.engine.report_outcome("protection_block") return else: - print("✅ Protection present but content accessible — proceeding.") + print("✅ Protection present but content accessible.") print(f"🔍 Searching Amazon for: {search_keywords}") - await page.goto(search_url, wait_until='load', timeout=120000) - await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed) + await page.goto(search_url, timeout=120000) - # Protection check on search page - protection_type = await temp_fetcher._detect_protection(page) - if protection_type: - print(f"🛡️ Protection detected on search page: {protection_type}") - content_accessible = await temp_fetcher._is_content_accessible(page) - if not content_accessible: - handled = await self.engine._handle_cloudflare(page) if protection_type == "cloudflare" else False - if not handled: - await browser.close() - self.engine.report_outcome("protection_block") - return - else: - print("✅ Protection present but content accessible — proceeding.") + try: + await page.wait_for_selector("div[data-job-id]", timeout=40000) + print("✅ Job listings detected.") + except PlaywrightTimeoutError: + print("❌ No job cards found.") + await browser.close() + return all_job_links = [] seen_job_ids = set() @@ -247,67 +287,28 @@ class AmazonJobScraper: initial_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links) print(f" ➕ Found {initial_jobs} initial job(s) (total: {len(all_job_links)})") - # Amazon uses pagination (not infinite scroll) await self._handle_pagination(page, search_keywords, seen_job_ids, all_job_links) - print(f"✅ Collected {len(all_job_links)} unique job links.") + print(f"✅ Collected {len(all_job_links)} unique job listings.") scraped_count = 0 - for idx, (href, title) in enumerate(all_job_links): + for idx, (href, title, posted_date) in enumerate(all_job_links): try: + # ✅ FIXED: removed extra spaces full_url = href if href.startswith("http") else f"https://www.amazon.jobs{href}" - print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url}") + print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url} (posted: {posted_date})") fetcher = StealthyFetcher(self.engine, browser, context) job_page = await fetcher.fetch_url(full_url, wait_for_selector="h1[data-testid='job-title']") if not job_page: - print(f" ❌ Failed to fetch job page {full_url} after retries.") job_id = href.strip("/").split("/")[-1] if href else 
"unknown" await self._add_job_to_redis_cache(full_url, job_id, "fetch_failure") self.engine.report_outcome("fetch_failure", url=full_url) continue - posted_date = await self._extract_job_posted_date(job_page) - print(f" 📅 Posted date extracted: {posted_date}") - apply_btn = await job_page.query_selector("a:has-text('Apply now'), button:has-text('Apply now')") - final_url = full_url - external_url = None - page_content = None - - if apply_btn: - apply_href = await apply_btn.get_attribute("href") - if apply_href and apply_href.startswith("http"): - print(" 🌐 Detected external apply URL — capturing directly.") - external_url = apply_href - final_url = external_url - # We won't navigate; just pass Amazon job page to LLM - page_content = await self._extract_page_content_for_llm(job_page) - else: - print(" → Clicking 'Apply now' (may open new tab)...") - page_waiter = asyncio.create_task(context.wait_for_event("page")) - await self._human_click(job_page, apply_btn, wait_after=False) - - external_page = None - try: - external_page = await asyncio.wait_for(page_waiter, timeout=5.0) - print(" 🌐 External job site opened in new tab.") - await external_page.wait_for_load_state("load", timeout=120000) - await asyncio.sleep(2 * self.human_speed) - await self.engine._human_like_scroll(external_page) - external_url = external_page.url - final_url = external_url - page_content = await self._extract_page_content_for_llm(external_page) - if not external_page.is_closed(): - await external_page.close() - except asyncio.TimeoutError: - print(" 🖥️ No external tab — using Amazon job page.") - page_content = await self._extract_page_content_for_llm(job_page) - else: - print(" ⚠️ No 'Apply now' button — scraping job page directly.") - page_content = await self._extract_page_content_for_llm(job_page) - + page_content = await self._extract_page_content_for_llm(job_page) job_id = href.strip("/").split("/")[-1] if href else "unknown" raw_data = { @@ -332,22 +333,22 @@ class AmazonJobScraper: refined_data[field] = "Amazon" refined_data['scraped_at'] = datetime.now().isoformat() - refined_data['category'] = clean_keywords + refined_data['category'] = clean_kw refined_data['posted_date'] = posted_date await self.llm_agent.save_job_data(refined_data, search_keywords) scraped_count += 1 - print(f" ✅ Scraped and refined: {refined_data['title'][:50]}...") + print(f" ✅ Scraped: {refined_data['title'][:50]}...") self.engine.report_outcome("success", url=raw_data["url"]) else: - print(f" 🟡 Could not extract meaningful data from: {final_url}") - await self._add_job_to_redis_cache(final_url, job_id, "llm_failure") + print(f" 🟡 LLM failed to refine: {full_url}") + await self._add_job_to_redis_cache(full_url, job_id, "llm_failure") self.engine.report_outcome("llm_failure", url=raw_data["url"]) await job_page.close() except Exception as e: error_msg = str(e)[:100] - print(f" ⚠️ Failed on job {idx+1}: {error_msg}") + print(f" ⚠️ Exception on job {idx+1}: {error_msg}") job_id = (href.strip("/").split("/")[-1] if href else "unknown") if 'href' in locals() else "unknown" job_url = full_url if 'full_url' in locals() else "unknown" await self._add_job_to_redis_cache(job_url, job_id, f"exception: {error_msg}") @@ -356,15 +357,15 @@ class AmazonJobScraper: continue finally: - print(" ↩️ Returning to Amazon search results...") - await page.goto(search_url, timeout=120000) - await asyncio.sleep(4 * self.human_speed) + if not page.is_closed(): + await page.goto(search_url, timeout=120000) + await asyncio.sleep(4 * self.human_speed) await 
 
         if scraped_count > 0:
             self.engine.report_outcome("success")
-            print(f"✅ Completed! Processed {scraped_count} jobs for '{search_keywords}' based on request '{self.user_request}'.")
+            print(f"✅ Completed! Processed {scraped_count} jobs.")
         else:
             self.engine.report_outcome("captcha")
             print("⚠️ No jobs processed successfully.")
\ No newline at end of file
diff --git a/llm_agent.py b/llm_agent.py
index 2f23313..8241ae2 100644
--- a/llm_agent.py
+++ b/llm_agent.py
@@ -146,15 +146,22 @@ class LLMJobRefiner:
         posted_date = raw_data.get('posted_date', datetime.now().strftime("%m/%d/%y"))
 
         prompt = f"""
-        You are a job posting data extractor.
-
-        EXTRACT EXACT TEXT - DO NOT SUMMARIZE, PARAPHRASE, OR INVENT.
-
-        For these critical fields, follow these rules:
-        - description: Extract ALL job description text. If ANY job details exist (duties, responsibilities, overview), include them. Only use "Not provided" if absolutely no description exists.
-        - requirements: Extract ALL requirements text. If ANY requirements exist (skills, experience, education needed), include them. Only use "Not provided" if none exist.
-        - qualifications: Extract ALL qualifications text. If ANY qualifications exist, include them. Only use "Not provided" if none exist.
-
+You are an expert job posting parser. Extract information EXACTLY as it appears in the text. DO NOT summarize, paraphrase, or invent.
+
+CRITICAL INSTRUCTIONS:
+- The job is from AMAZON. Look for these exact section headings:
+  - "## Basic Qualifications" → extract as "qualifications"
+  - "## Preferred Qualifications" → include this in "qualifications" too
+  - "## Description" or "About the Role" or "Key job responsibilities" → extract as "description"
+  - "You Will:" or "Job responsibilities" → include in "description"
+  - Requirements are often embedded in qualifications or description
+
+FIELD RULES:
+- description: MUST include ALL role details, responsibilities, and overview. Never "Not provided" if any job description exists.
+- qualifications: MUST include ALL content from "Basic Qualifications" and "Preferred Qualifications" sections. Combine them.
+- requirements: there is no separate "requirements" output field; fold required skills/experience into "qualifications" or "description".
+- For Amazon jobs, company_name = "Amazon".
+
         REQUIRED FIELDS (must have valid values, never "N/A"):
         - title, company_name, job_id, url
 
@@ -170,7 +177,6 @@ class LLMJobRefiner:
             "company_name": "...",
             "location": "...",
             "description": "...",
-            "requirements": "...",
             "qualifications": "...",
             "salary_range": "...",
             "nature_of_work": "...",
@@ -196,7 +202,7 @@ class LLMJobRefiner:
             return None
 
         # CRITICAL: Validate content fields - check if they SHOULD exist
-        content_fields = ['description', 'requirements', 'qualifications']
+        content_fields = ['description', 'qualifications']
         cleaned_original = cleaned_content.lower()
 
         # Simple heuristic: if page contains job-related keywords, content fields should NOT be "Not provided"
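
For reviewers: below is a minimal, standalone sketch of the two behaviors this
patch introduces (alias-based location matching and posted-date parsing),
trimmed to pure functions so they can be sanity-checked without Playwright or
Redis. The alias map is an abbreviated copy of self.country_aliases, and the
sample inputs are made up.

import re
from datetime import datetime, timedelta

# Abbreviated copy of the patch's country alias map.
COUNTRY_ALIASES = {
    "united states": ["united states", "usa", "us", ", us"],
    "germany": ["germany", "de", ", de"],
}

def location_matches(job_location_text: str, target_location: str) -> bool:
    # Mirrors AmazonJobScraper._location_matches.
    if not target_location:
        return True  # no location filter requested
    target = target_location.lower().strip()
    job_text = job_location_text.lower()
    if target in job_text:  # direct substring, e.g. "berlin" in "berlin, germany"
        return True
    for canonical, aliases in COUNTRY_ALIASES.items():
        # The target names this country if it is part of the canonical name
        # or exactly equals a short alias such as "de" or "us".
        if target in canonical or any(target == a for a in aliases if len(a) <= 3):
            return any(a in job_text for a in aliases)
    return False

def parse_posted_date(card_text: str) -> str:
    # Mirrors _parse_posted_date_from_card_text: absolute date first, then
    # "Posted N days ago", else today's date as a last-resort fallback.
    m = re.search(r'Posted\s+([A-Za-z]+\s+\d{1,2},\s+\d{4})', card_text)
    if m:
        try:
            return datetime.strptime(m.group(1), "%B %d, %Y").strftime("%m/%d/%y")
        except ValueError:
            pass
    m = re.search(r'Posted\s+(\d+)\s+day[s]?\s+ago', card_text, re.IGNORECASE)
    if m:
        return (datetime.now() - timedelta(days=int(m.group(1)))).strftime("%m/%d/%y")
    return datetime.now().strftime("%m/%d/%y")

if __name__ == "__main__":
    assert location_matches("Berlin, DE", "germany")            # alias "de" hit
    assert location_matches("Seattle, WA, USA", "us")           # substring hit
    assert not location_matches("Toronto, ON, Canada", "germany")
    print(parse_posted_date("Posted December 1, 2025"))         # -> 12/01/25
    print(parse_posted_date("Posted 3 days ago"))               # relative form

Note the substring check runs before the alias table, so city- or region-level
targets ("berlin", "bavaria") short-circuit without needing alias entries; the
table is only consulted for country-level queries.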