From cbcffa8cd4eb2a69e2780b14f60e8f8e7dac0a9d Mon Sep 17 00:00:00 2001
From: Ofure Ikheloa
Date: Tue, 9 Dec 2025 09:12:35 +0100
Subject: [PATCH] Queue failed jobs in Redis and extract the job posting date

---
 amazon_job_scraper.py | 329 +++++++++++++++++++++++++++++-------------
 1 file changed, 232 insertions(+), 97 deletions(-)

diff --git a/amazon_job_scraper.py b/amazon_job_scraper.py
index 8e54bdd..c73f3dc 100644
--- a/amazon_job_scraper.py
+++ b/amazon_job_scraper.py
@@ -1,13 +1,15 @@
 "Specifically for scraping job postings from Amazon Jobs."
 import asyncio
 import random
-import re
 from typing import Optional, Dict
 from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
 from browserforge.injectors.playwright import AsyncNewContext
 from llm_agent import LLMJobRefiner
+import re
 from fetcher import StealthyFetcher
 from datetime import datetime
+import json
+import redis
 
 
 class AmazonJobScraper:
@@ -22,7 +24,12 @@ class AmazonJobScraper:
         self.db_path = db_path
         self.human_speed = human_speed
         self.user_request = user_request
+        self._init_db()
         self.llm_agent = LLMJobRefiner()
+        self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
+
+    def _init_db(self):
+        pass  # Handled by LLMJobRefiner
 
     async def _human_click(self, page, element, wait_after: bool = True):
         if not element:
@@ -32,30 +39,15 @@ class AmazonJobScraper:
         try:
             await element.click()
             if wait_after:
-                await asyncio.sleep(random.uniform(1.0, 2.0) * self.human_speed)
+                await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
             return True
         except:
             return False
 
-    def _extract_location_from_keywords(self, search_keywords: str) -> str:
-        location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
-        return location_match.group(1).strip() if location_match else ""
-
-    def _build_amazon_search_url(self, keywords: str) -> str:
-        clean_keywords = re.sub(r'location:\s*[^,]+', '', keywords, flags=re.IGNORECASE).strip()
-        location = self._extract_location_from_keywords(keywords)
-
-        base_url = "https://www.amazon.jobs/en/search?"
-        params = []
-
-        if clean_keywords:
-            params.append(f"base_query={clean_keywords.replace(' ', '+')}")
-        if location:
-            params.append(f"loc_query={location.replace(' ', '+')}")
-        params.append("offset=0")
-        params.append("result_limit=10")
-
-        return base_url + "&".join(params)
+    async def _login(self, page, credentials: Dict) -> bool:
+        # Amazon job pages do NOT require login.
+        # Skip login unless we're scraping internal dashboards (not needed here).
+        return True
 
     async def _extract_page_content_for_llm(self, page) -> str:
         await asyncio.sleep(2 * self.human_speed)
@@ -63,60 +55,131 @@ class AmazonJobScraper:
         await asyncio.sleep(2 * self.human_speed)
         return await page.content()
 
-    async def _scrape_job_links_from_page(self, page, seen_job_ids, all_job_links):
-        job_cards = await page.query_selector_all('div.job-tile a[href^="/en/jobs/"]')
+    def _calculate_keyword_match(self, title: str, keywords: str) -> float:
+        if not title or not keywords:
+            return 0.0
+        title_lower = title.lower()
+        keyword_list = [kw.strip().lower() for kw in keywords.split()]
+        matches = sum(1 for kw in keyword_list if kw in title_lower)
+        return matches / len(keyword_list) if keyword_list else 0.0
+
+    def _extract_location_from_keywords(self, search_keywords: str) -> str:
+        location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
+        return location_match.group(1).strip().lower() if location_match else ""
+
+    async def _scrape_jobs_from_current_page(self, page, search_keywords: str, seen_job_ids, all_job_links):
+        current_links = await page.query_selector_all("a[href*='/jobs/']")
         new_jobs = 0
-        for card in job_cards:
-            href = await card.get_attribute("href")
-            if not href:
+        location_from_keywords = self._extract_location_from_keywords(search_keywords)
+
+        for link in current_links:
+            href = await link.get_attribute("href")
+            if not href or "page=" in href or "search?" in href:
                 continue
-            full_url = f"https://www.amazon.jobs{href}" if href.startswith("/") else href
-            job_id = href.split("/")[-1] if href.split("/")[-1] else "unknown"
-
-            if job_id in seen_job_ids:
-                continue
-
-            title_element = await card.query_selector('h3')
-            title = await title_element.inner_text() if title_element else "Unknown Title"
-
-            seen_job_ids.add(job_id)
-            all_job_links.append((full_url, title))
-            new_jobs += 1
-
+
+            full_url = href if href.startswith("http") else f"https://www.amazon.jobs{href}"
+            job_id = href.strip("/").split("/")[-1] if href else "unknown"
+
+            if job_id and job_id not in seen_job_ids:
+                title_element = await link.query_selector("h3") or await link.query_selector(".job-title")
+                title = await title_element.inner_text() if title_element else "Unknown Title"
+
+                match_percentage = self._calculate_keyword_match(title, search_keywords)
+                location_match = True
+                if location_from_keywords:
+                    location_element = await link.query_selector(".location-and-id")
+                    if location_element:
+                        location_text = await location_element.inner_text()
+                        location_match = location_from_keywords in location_text.lower()
+
+                if match_percentage >= 0.7 and location_match:
+                    seen_job_ids.add(job_id)
+                    all_job_links.append((href, title))
+                    new_jobs += 1
+                elif match_percentage < 0.7:
+                    print(f"   âš ī¸ Skipping job due to low keyword match: {title[:50]}... (match: {match_percentage:.2%})")
+                elif not location_match:
+                    print(f"   âš ī¸ Skipping job due to location mismatch: {title[:50]}... (expected: {location_from_keywords})")
+                else:
+                    seen_job_ids.add(job_id)
+                    all_job_links.append((href, "Unknown Title"))
+                    new_jobs += 1
         return new_jobs
 
-    async def _scroll_and_collect_jobs(self, page, seen_job_ids, all_job_links, max_pages=5):
-        offset = 0
-        jobs_per_page = 10
-        for page_num in range(max_pages):
-            print(f"📄 Fetching Amazon job page {page_num + 1} (offset: {offset})")
-            current_url = page.url
-            if "offset=" in current_url:
-                base_url = current_url.split("offset=")[0]
-                new_url = base_url + f"offset={offset}&result_limit={jobs_per_page}"
+    async def _handle_pagination(self, page, search_keywords: str, seen_job_ids, all_job_links):
+        current_page = 1
+        while current_page <= 10:  # Amazon limits to ~10 pages publicly
+            print(f"📄 Processing page {current_page}")
+            new_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
+            print(f"   ➕ Found {new_jobs} new job(s) on page {current_page} (total: {len(all_job_links)})")
+
+            next_btn = await page.query_selector("a[aria-label='Next page']")
+            if next_btn:
+                next_url = await next_btn.get_attribute("href")
+                if next_url:
+                    full_next_url = next_url if next_url.startswith("http") else f"https://www.amazon.jobs{next_url}"
+                    print(f"   âžĄī¸ Navigating to next page: {full_next_url}")
+                    await page.goto(full_next_url, timeout=120000)
+                    await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
+                    current_page += 1
+                else:
+                    break
             else:
-                new_url = current_url + f"&offset={offset}&result_limit={jobs_per_page}"
-
-            await page.goto(new_url, wait_until='domcontentloaded', timeout=120000)
-            await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
-
-            new_jobs = await self._scrape_job_links_from_page(page, seen_job_ids, all_job_links)
-            print(f"   ➕ Found {new_jobs} new job(s) on page {page_num + 1} (total: {len(all_job_links)})")
-
-            if new_jobs == 0 and page_num > 0:
-                print("🔚 No new jobs — stopping pagination.")
+                print("🔚 No 'Next' button found — stopping pagination.")
                 break
 
-            offset += jobs_per_page
+    async def _extract_job_posted_date(self, page) -> str:
+        from datetime import timedelta  # needed for the "Posted X days ago" date math
+        try:
+            # Amazon often includes "Posted X days ago" in the job description
+            content = await page.content()
+            match = re.search(r'Posted\s+(\d+)\s+day[s]?\s+ago', content, re.IGNORECASE)
+            if match:
+                days_ago = int(match.group(1))
+                posted_date = datetime.now() - timedelta(days=days_ago)
+                return posted_date.strftime("%m/%d/%y")
+
+            # Fallback: check for an explicit date in the page (rare)
+            date_match = re.search(r'(\d{1,2})/(\d{1,2})/(\d{4})', content)
+            if date_match:
+                month, day, year = date_match.groups()
+                return f"{month.zfill(2)}/{day.zfill(2)}/{year[-2:]}"

+            # Default to today
+            return datetime.now().strftime("%m/%d/%y")
+        except Exception as e:
+            print(f"   âš ī¸ Error extracting Amazon posted date: {str(e)}")
+            return datetime.now().strftime("%m/%d/%y")
+
+    async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
+        try:
+            job_data = {
+                "job_url": job_url,
+                "job_id": job_id,
+                "error_type": error_type,
+                "timestamp": datetime.now().isoformat()
+            }
+            self.redis_client.hset("failed_jobs", job_id, json.dumps(job_data))
+            print(f"   đŸ“Ļ Added failed job to Redis cache: {job_id} (Error: {error_type})")
+        except Exception as e:
+            print(f"   ❌ Failed to add job to Redis cache: {str(e)}")
 
     async def scrape_jobs(
         self,
         search_keywords: Optional[str],
-        max_pages: int = 5,
-        credentials: Optional[Dict] = None  # Not used for Amazon
+        max_pages: int = 1,
+        credentials: Optional[Dict] = None
     ):
-        search_url = self._build_amazon_search_url(search_keywords)
-        print(f"🔍 Amazon search URL: {search_url}")
+        # Build the Amazon search URL from the keywords and the optional "location:" filter
+        location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
+        location = location_match.group(1).strip() if location_match else ""
+        clean_keywords = re.sub(r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE).strip()
+        encoded_keywords = clean_keywords.replace(" ", "+")  # Amazon uses + for spaces
+
+        search_url = f"https://www.amazon.jobs/en/search?base_query={encoded_keywords}"
+        if location:
+            # Amazon uses location filter via `loc_query`
+            search_url += f"&loc_query={location.replace(' ', '+')}"
 
         profile = self.engine._select_profile()
         renderer = random.choice(self.engine.common_renderers[self.engine.os])
@@ -140,15 +203,15 @@ class AmazonJobScraper:
             page = await context.new_page()
             temp_fetcher = StealthyFetcher(self.engine, browser, context)
 
-            # Amazon doesn't require login
-            print("🌐 Navigating to Amazon Jobs (no login required)...")
-            await page.goto(search_url, wait_until='domcontentloaded', timeout=120000)
-            await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
+            print("✅ Bypassing login (Amazon jobs are public)...")
+            login_successful = True
 
-            # Protection check
+            await page.wait_for_load_state("load", timeout=120000)
+
+            # Protection check (same as LinkedIn logic)
             protection_type = await temp_fetcher._detect_protection(page)
             if protection_type:
-                print(f"đŸ›Ąī¸ Protection detected: {protection_type}")
+                print(f"đŸ›Ąī¸ Protection detected on initial page: {protection_type}")
                 content_accessible = await temp_fetcher._is_content_accessible(page)
                 if not content_accessible:
                     handled = await self.engine._handle_cloudflare(page) if protection_type == "cloudflare" else False
@@ -157,79 +220,151 @@ class AmazonJobScraper:
                         self.engine.report_outcome("protection_block")
                         return
                 else:
-                    print("✅ Protection present but content accessible.")
+                    print("✅ Protection present but content accessible — proceeding.")
+
+            print(f"🔍 Searching Amazon for: {search_keywords}")
+            await page.goto(search_url, wait_until='load', timeout=120000)
+            await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
+
+            # Protection check on search page
+            protection_type = await temp_fetcher._detect_protection(page)
+            if protection_type:
+                print(f"đŸ›Ąī¸ Protection detected on search page: {protection_type}")
+                content_accessible = await temp_fetcher._is_content_accessible(page)
+                if not content_accessible:
+                    handled = await self.engine._handle_cloudflare(page) if protection_type == "cloudflare" else False
+                    if not handled:
+                        await browser.close()
+                        self.engine.report_outcome("protection_block")
+                        return
+                else:
+                    print("✅ Protection present but content accessible — proceeding.")
 
             all_job_links = []
             seen_job_ids = set()
 
-            print("🔄 Collecting job links via pagination...")
-            await self._scroll_and_collect_jobs(page, seen_job_ids, all_job_links, max_pages=max_pages)
+            print("🔄 Collecting initial job links...")
+            initial_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
+            print(f"   ➕ Found {initial_jobs} initial job(s) (total: {len(all_job_links)})")
 
-            print(f"✅ Collected {len(all_job_links)} unique Amazon job links.")
+            # Amazon uses pagination (not infinite scroll)
+            await self._handle_pagination(page, search_keywords, seen_job_ids, all_job_links)
+
+            print(f"✅ Collected {len(all_job_links)} unique job links.")
 
             scraped_count = 0
-            for idx, (job_url, title) in enumerate(all_job_links):
+            for idx, (href, title) in enumerate(all_job_links):
                 try:
-                    print(f"   → Opening job {idx+1}/{len(all_job_links)}: {job_url}")
-                    fetcher = StealthyFetcher(self.engine, browser, context)
-                    job_page = await fetcher.fetch_url(job_url, wait_for_selector="h1.job-title")
+                    full_url = href if href.startswith("http") else f"https://www.amazon.jobs{href}"
+                    print(f"   → Opening job {idx+1}/{len(all_job_links)}: {full_url}")
+                    fetcher = StealthyFetcher(self.engine, browser, context)
+                    job_page = await fetcher.fetch_url(full_url, wait_for_selector="h1[data-testid='job-title']")
 
                     if not job_page:
-                        print(f"   ❌ Failed to fetch job page: {job_url}")
-                        self.engine.report_outcome("fetch_failure", url=job_url)
+                        print(f"   ❌ Failed to fetch job page {full_url} after retries.")
+                        job_id = href.strip("/").split("/")[-1] if href else "unknown"
+                        await self._add_job_to_redis_cache(full_url, job_id, "fetch_failure")
+                        self.engine.report_outcome("fetch_failure", url=full_url)
                         continue
 
-                    # Extract raw HTML for LLM
-                    await self.engine._human_like_scroll(job_page)
-                    await asyncio.sleep(2 * self.human_speed)
-                    page_content = await self._extract_page_content_for_llm(job_page)
+                    posted_date = await self._extract_job_posted_date(job_page)
+                    print(f"   📅 Posted date extracted: {posted_date}")
 
-                    job_id = job_url.split("/")[-1] if job_url.split("/")[-1] else "unknown"
+                    apply_btn = await job_page.query_selector("a:has-text('Apply now'), button:has-text('Apply now')")
+
+                    final_url = full_url
+                    external_url = None
+                    page_content = None
+
+                    if apply_btn:
+                        apply_href = await apply_btn.get_attribute("href")
+                        if apply_href and apply_href.startswith("http"):
+                            print("   🌐 Detected external apply URL — capturing directly.")
+                            external_url = apply_href
+                            final_url = external_url
+                            # We won't navigate; just pass the Amazon job page to the LLM
+                            page_content = await self._extract_page_content_for_llm(job_page)
+                        else:
+                            print("   → Clicking 'Apply now' (may open new tab)...")
+                            page_waiter = asyncio.create_task(context.wait_for_event("page"))
+                            await self._human_click(job_page, apply_btn, wait_after=False)
+
+                            external_page = None
+                            try:
+                                external_page = await asyncio.wait_for(page_waiter, timeout=5.0)
+                                print("   🌐 External job site opened in new tab.")
+                                await external_page.wait_for_load_state("load", timeout=120000)
+                                await asyncio.sleep(2 * self.human_speed)
+                                await self.engine._human_like_scroll(external_page)
+                                external_url = external_page.url
+                                final_url = external_url
+                                page_content = await self._extract_page_content_for_llm(external_page)
+                                if not external_page.is_closed():
+                                    await external_page.close()
+                            except asyncio.TimeoutError:
+                                print("   đŸ–Ĩī¸ No external tab — using Amazon job page.")
+                                page_content = await self._extract_page_content_for_llm(job_page)
+                    else:
+                        print("   âš ī¸ No 'Apply now' button — scraping job page directly.")
+                        page_content = await self._extract_page_content_for_llm(job_page)
+
+                    job_id = href.strip("/").split("/")[-1] if href else "unknown"
                     raw_data = {
                         "page_content": page_content,
-                        "url": job_url,
+                        "url": final_url,
                         "job_id": job_id,
-                        "search_keywords": search_keywords
+                        "search_keywords": search_keywords,
+                        "posted_date": posted_date
                     }
 
                     refined_data = await self.llm_agent.refine_job_data(raw_data, self.user_request)
 
                     if refined_data and refined_data.get("title", "N/A") != "N/A":
-                        # Ensure compulsory fields
                         compulsory_fields = ['company_name', 'job_id', 'url']
                         for field in compulsory_fields:
                             if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
                                 if field == 'job_id':
                                     refined_data[field] = job_id
                                 elif field == 'url':
-                                    refined_data[field] = job_url
+                                    refined_data[field] = final_url
                                 elif field == 'company_name':
                                     refined_data[field] = "Amazon"
-
+
                         refined_data['scraped_at'] = datetime.now().isoformat()
-                        refined_data['category'] = re.sub(r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE).strip()
+                        refined_data['category'] = clean_keywords
+                        refined_data['posted_date'] = posted_date
 
                         await self.llm_agent.save_job_data(refined_data, search_keywords)
                         scraped_count += 1
                         print(f"   ✅ Scraped and refined: {refined_data['title'][:50]}...")
-                        self.engine.report_outcome("success", url=job_url)
+                        self.engine.report_outcome("success", url=raw_data["url"])
                     else:
-                        print(f"   🟡 LLM could not extract valid data from: {job_url}")
-                        self.engine.report_outcome("llm_failure", url=job_url)
+                        print(f"   🟡 Could not extract meaningful data from: {final_url}")
+                        await self._add_job_to_redis_cache(final_url, job_id, "llm_failure")
+                        self.engine.report_outcome("llm_failure", url=raw_data["url"])
 
                     await job_page.close()
 
                 except Exception as e:
-                    print(f"   âš ī¸ Failed on job {idx+1}: {str(e)[:100]}")
+                    error_msg = str(e)[:100]
+                    print(f"   âš ī¸ Failed on job {idx+1}: {error_msg}")
+                    job_id = (href.strip("/").split("/")[-1] if href else "unknown") if 'href' in locals() else "unknown"
+                    job_url = full_url if 'full_url' in locals() else "unknown"
+                    await self._add_job_to_redis_cache(job_url, job_id, f"exception: {error_msg}")
                     if 'job_page' in locals() and job_page:
                         await job_page.close()
                     continue
+                finally:
+                    print("   â†Šī¸ Returning to Amazon search results...")
+                    await page.goto(search_url, timeout=120000)
+                    await asyncio.sleep(4 * self.human_speed)
 
            await browser.close()
 
            if scraped_count > 0:
                self.engine.report_outcome("success")
-                print(f"✅ Completed! Processed {scraped_count} Amazon jobs for '{search_keywords}'.")
+                print(f"✅ Completed! Processed {scraped_count} jobs for '{search_keywords}' based on request '{self.user_request}'.")
             else:
-                self.engine.report_outcome("no_jobs")
-                print("âš ī¸ No Amazon jobs processed successfully.")
+                self.engine.report_outcome("captcha")
+                print("âš ī¸ No jobs processed successfully.")
\ No newline at end of file
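
A minimal companion sketch (not part of the patch): one way a separate retry worker could drain the "failed_jobs" Redis hash that _add_job_to_redis_cache writes, assuming the same local Redis instance (localhost:6379, db 0, decode_responses=True). The names requeue_failed_jobs, retry, and rescrape are illustrative only and do not exist in this repository.

import json
import redis

def requeue_failed_jobs(retry):
    """Hand every queued failure to retry(job_url, error_type) and clear the entries that succeed."""
    client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
    for job_id, payload in client.hgetall("failed_jobs").items():
        job = json.loads(payload)  # keys written by the scraper: job_url, job_id, error_type, timestamp
        if retry(job["job_url"], job["error_type"]):
            client.hdel("failed_jobs", job_id)  # drop only the entries that were re-scraped

# Example: retry fetch failures only; leave llm_failure entries queued for manual review.
# requeue_failed_jobs(lambda url, err: err == "fetch_failure" and rescrape(url))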