"Specifically for scraping job postings from Amazon Jobs." import asyncio import random import re from typing import Optional, Dict from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError from browserforge.injectors.playwright import AsyncNewContext from llm_agent import LLMJobRefiner from fetcher import StealthyFetcher from datetime import datetime class AmazonJobScraper: def __init__( self, engine, db_path: str = "amazon_jobs.db", human_speed: float = 1.0, user_request: str = "Extract all standard job details" ): self.engine = engine self.db_path = db_path self.human_speed = human_speed self.user_request = user_request self.llm_agent = LLMJobRefiner() async def _human_click(self, page, element, wait_after: bool = True): if not element: return False await element.scroll_into_view_if_needed() await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed) try: await element.click() if wait_after: await asyncio.sleep(random.uniform(1.0, 2.0) * self.human_speed) return True except: return False def _extract_location_from_keywords(self, search_keywords: str) -> str: location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE) return location_match.group(1).strip() if location_match else "" def _build_amazon_search_url(self, keywords: str) -> str: clean_keywords = re.sub(r'location:\s*[^,]+', '', keywords, flags=re.IGNORECASE).strip() location = self._extract_location_from_keywords(keywords) base_url = "https://www.amazon.jobs/en/search?" params = [] if clean_keywords: params.append(f"base_query={clean_keywords.replace(' ', '+')}") if location: params.append(f"loc_query={location.replace(' ', '+')}") params.append("offset=0") params.append("result_limit=10") return base_url + "&".join(params) async def _extract_page_content_for_llm(self, page) -> str: await asyncio.sleep(2 * self.human_speed) await self.engine._human_like_scroll(page) await asyncio.sleep(2 * self.human_speed) return await page.content() async def _scrape_job_links_from_page(self, page, seen_job_ids, all_job_links): job_cards = await page.query_selector_all('div.job-tile a[href^="/en/jobs/"]') new_jobs = 0 for card in job_cards: href = await card.get_attribute("href") if not href: continue full_url = f"https://www.amazon.jobs{href}" if href.startswith("/") else href job_id = href.split("/")[-1] if href.split("/")[-1] else "unknown" if job_id in seen_job_ids: continue title_element = await card.query_selector('h3') title = await title_element.inner_text() if title_element else "Unknown Title" seen_job_ids.add(job_id) all_job_links.append((full_url, title)) new_jobs += 1 return new_jobs async def _scroll_and_collect_jobs(self, page, seen_job_ids, all_job_links, max_pages=5): offset = 0 jobs_per_page = 10 for page_num in range(max_pages): print(f"📄 Fetching Amazon job page {page_num + 1} (offset: {offset})") current_url = page.url if "offset=" in current_url: base_url = current_url.split("offset=")[0] new_url = base_url + f"offset={offset}&result_limit={jobs_per_page}" else: new_url = current_url + f"&offset={offset}&result_limit={jobs_per_page}" await page.goto(new_url, wait_until='domcontentloaded', timeout=120000) await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed) new_jobs = await self._scrape_job_links_from_page(page, seen_job_ids, all_job_links) print(f" ➕ Found {new_jobs} new job(s) on page {page_num + 1} (total: {len(all_job_links)})") if new_jobs == 0 and page_num > 0: print("🔚 No new jobs — stopping pagination.") break offset += jobs_per_page async def 

    async def scrape_jobs(
        self,
        search_keywords: Optional[str],
        max_pages: int = 5,
        credentials: Optional[Dict] = None  # Not used for Amazon
    ):
        search_url = self._build_amazon_search_url(search_keywords)
        print(f"🔍 Amazon search URL: {search_url}")

        profile = self.engine._select_profile()
        renderer = random.choice(self.engine.common_renderers[self.engine.os])
        vendor = random.choice(self.engine.common_vendors)
        spoof_script = self.engine._get_spoof_script(renderer, vendor)

        async with async_playwright() as pw:
            browser = await pw.chromium.launch(
                headless=False,
                args=['--disable-blink-features=AutomationControlled']
            )
            context = await AsyncNewContext(browser, fingerprint=profile)

            await context.add_init_script(f"""
                Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
                Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
                Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
            """)
            await context.add_init_script(spoof_script)

            page = await context.new_page()
            temp_fetcher = StealthyFetcher(self.engine, browser, context)

            # Amazon doesn't require login
            print("🌐 Navigating to Amazon Jobs (no login required)...")
            await page.goto(search_url, wait_until='domcontentloaded', timeout=120000)
            await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)

            # Protection check
            protection_type = await temp_fetcher._detect_protection(page)
            if protection_type:
                print(f"🛡️ Protection detected: {protection_type}")
                content_accessible = await temp_fetcher._is_content_accessible(page)
                if not content_accessible:
                    handled = await self.engine._handle_cloudflare(page) if protection_type == "cloudflare" else False
                    if not handled:
                        await browser.close()
                        self.engine.report_outcome("protection_block")
                        return
                else:
                    print("✅ Protection present but content accessible.")

            all_job_links = []
            seen_job_ids = set()

            print("🔄 Collecting job links via pagination...")
            await self._scroll_and_collect_jobs(page, seen_job_ids, all_job_links, max_pages=max_pages)
            print(f"✅ Collected {len(all_job_links)} unique Amazon job links.")

            scraped_count = 0
            for idx, (job_url, title) in enumerate(all_job_links):
                try:
                    print(f"  → Opening job {idx+1}/{len(all_job_links)}: {job_url}")
                    fetcher = StealthyFetcher(self.engine, browser, context)
                    job_page = await fetcher.fetch_url(job_url, wait_for_selector="h1.job-title")
                    if not job_page:
                        print(f"    ❌ Failed to fetch job page: {job_url}")
                        self.engine.report_outcome("fetch_failure", url=job_url)
                        continue

                    # Extract raw HTML for LLM
                    await self.engine._human_like_scroll(job_page)
                    await asyncio.sleep(2 * self.human_speed)
                    page_content = await self._extract_page_content_for_llm(job_page)
                    job_id = job_url.split("/")[-1] or "unknown"

                    raw_data = {
                        "page_content": page_content,
                        "url": job_url,
                        "job_id": job_id,
                        "search_keywords": search_keywords
                    }

                    refined_data = await self.llm_agent.refine_job_data(raw_data, self.user_request)

                    if refined_data and refined_data.get("title", "N/A") != "N/A":
                        # Ensure compulsory fields
                        compulsory_fields = ['company_name', 'job_id', 'url']
                        for field in compulsory_fields:
                            if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
                                if field == 'job_id':
                                    refined_data[field] = job_id
                                elif field == 'url':
                                    refined_data[field] = job_url
                                elif field == 'company_name':
                                    refined_data[field] = "Amazon"

                        refined_data['scraped_at'] = datetime.now().isoformat()
                        refined_data['category'] = re.sub(
                            r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE
                        ).strip()

                        await self.llm_agent.save_job_data(refined_data, search_keywords)
                        scraped_count += 1
                        print(f"    ✅ Scraped and refined: {refined_data['title'][:50]}...")
                        self.engine.report_outcome("success", url=job_url)
                    else:
                        print(f"    🟡 LLM could not extract valid data from: {job_url}")
                        self.engine.report_outcome("llm_failure", url=job_url)

                    await job_page.close()

                except Exception as e:
                    print(f"    ⚠️ Failed on job {idx+1}: {str(e)[:100]}")
                    if 'job_page' in locals() and job_page:
                        await job_page.close()
                    continue

            await browser.close()

            if scraped_count > 0:
                self.engine.report_outcome("success")
                print(f"✅ Completed! Processed {scraped_count} Amazon jobs for '{search_keywords}'.")
            else:
                self.engine.report_outcome("no_jobs")
                print("⚠️ No Amazon jobs processed successfully.")
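
# Usage sketch (hedged): a minimal example of driving the scraper end to end.
# The `ScrapingEngine` import and constructor below are assumptions; this
# module only receives an already-built engine exposing _select_profile(),
# _human_like_scroll(), _handle_cloudflare(), report_outcome(), etc., so
# substitute whatever engine class the surrounding project actually provides.
#
# import asyncio
# from engine import ScrapingEngine  # hypothetical module and class name
#
# async def main():
#     engine = ScrapingEngine()  # assumed constructor
#     scraper = AmazonJobScraper(engine, human_speed=1.2)
#     await scraper.scrape_jobs("software engineer, location: Seattle", max_pages=3)
#
# if __name__ == "__main__":
#     asyncio.run(main())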