diff --git a/amazon_job_scraper.py b/amazon_job_scraper.py
deleted file mode 100644
index cd48a7e..0000000
--- a/amazon_job_scraper.py
+++ /dev/null
@@ -1,371 +0,0 @@
-import asyncio
-import random
-from typing import Optional, Dict
-from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
-from browserforge.injectors.playwright import AsyncNewContext
-from llm_agent import LLMJobRefiner
-import re
-from fetcher import StealthyFetcher
-from datetime import datetime, timedelta
-import json
-import redis
-
-
-class AmazonJobScraper:
-    def __init__(
-        self,
-        engine,
-        db_path: str = "amazon_jobs.db",
-        human_speed: float = 1.0,
-        user_request: str = "Extract all standard job details"
-    ):
-        self.engine = engine
-        self.db_path = db_path
-        self.human_speed = human_speed
-        self.user_request = user_request
-        self._init_db()
-        self.llm_agent = LLMJobRefiner()
-        self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
-
-        # Country alias map for flexible location matching
-        self.country_aliases = {
-            "united states": ["united states", "usa", "u.s.a", "u.s.", "us", "america", ", us", ", usa"],
-            "united kingdom": ["united kingdom", "uk", "great britain", "england", "gb", ", uk", ", gb"],
-            "canada": ["canada", "ca", ", ca"],
-            "india": ["india", "in", ", in"],
-            "germany": ["germany", "de", ", de"],
-            "france": ["france", "fr", ", fr"],
-            "australia": ["australia", "au", ", au"],
-            # Add more as needed
-        }
-
-    def _init_db(self):
-        pass
-
-    async def _safe_inner_text(self, element):
-        if not element:
-            return "Unknown"
-        try:
-            return await element.text_content()
-        except Exception:
-            return "Unknown"
-
-    async def _human_click(self, page, element, wait_after: bool = True):
-        if not element:
-            return False
-        await element.scroll_into_view_if_needed()
-        await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
-        try:
-            await element.click()
-            if wait_after:
-                await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
-            return True
-        except Exception:
-            return False
-
-    async def _login(self, page, credentials: Dict) -> bool:
-        return True
-
-    async def _extract_page_content_for_llm(self, page) -> str:
-        await asyncio.sleep(2 * self.human_speed)
-        await self.engine._human_like_scroll(page)
-        await asyncio.sleep(2 * self.human_speed)
-        return await page.content()
-
-    def _extract_keywords_and_location(self, search_keywords: str):
-        location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
-        location = location_match.group(1).strip() if location_match else ""
-        clean_keywords = re.sub(r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE).strip()
-        return clean_keywords, location
-
-    def _normalize_text(self, text: str) -> str:
-        return re.sub(r'[^a-z0-9\s]', ' ', text.lower()).strip()
-
-    def _location_matches(self, job_location_text: str, target_location: str) -> bool:
-        if not target_location:
-            return True
-        target = target_location.lower().strip()
-        job_text = job_location_text.lower()
-
-        # Direct substring match (e.g., "Berlin" in "Berlin, Germany")
-        if target in job_text:
-            return True
-
-        # Check country aliases
-        for canonical, aliases in self.country_aliases.items():
-            if target in canonical or any(target == alias for alias in aliases if len(alias) <= 3):
-                return any(alias in job_text for alias in aliases)
-
-        return False
-
-    def _parse_posted_date_from_card_text(self, card_text: str) -> str:
-        date_match = re.search(r'Posted\s+([A-Za-z]+\s+\d{1,2},\s+\d{4})', card_text)
-        if date_match:
-            try:
-                dt = datetime.strptime(date_match.group(1), "%B %d, %Y")
-                return dt.strftime("%m/%d/%y")
-            except ValueError:
-                pass
-        days_match = re.search(r'Posted\s+(\d+)\s+day[s]?\s+ago', card_text, re.IGNORECASE)
-        if days_match:
-            days = int(days_match.group(1))
-            dt = datetime.now() - timedelta(days=days)
-            return dt.strftime("%m/%d/%y")
-        return datetime.now().strftime("%m/%d/%y")
-
-    async def _scrape_jobs_from_current_page(self, page, search_keywords: str, seen_job_ids, all_job_links):
-        await asyncio.sleep(1.5 * self.human_speed)
-        job_cards = await page.query_selector_all("div[data-job-id]")
-        new_jobs = 0
-
-        clean_kw, location_kw = self._extract_keywords_and_location(search_keywords)
-        keyword_terms = [term.lower().strip() for term in clean_kw.split() if term.strip()]
-
-        for card in job_cards:
-            job_id = await card.get_attribute("data-job-id")
-            if not job_id or not job_id.isdigit() or job_id in seen_job_ids:
-                continue
-
-            link = await card.query_selector("a[href*='/jobs/']")
-            if not link:
-                continue
-
-            href = await link.get_attribute("href")
-            if not href or any(x in href for x in ["search?", "locations", "teams", "page=", "my.", "/account/"]):
-                continue
-
-            card_text = await self._safe_inner_text(card)
-            normalized_card = self._normalize_text(card_text)
-
-            # ✅ Check: ALL keyword terms must appear in card
-            keywords_match = all(term in normalized_card for term in keyword_terms) if keyword_terms else True
-
-            # ✅ Check location separately with alias support
-            location_match = True
-            if location_kw:
-                loc_el = await card.query_selector(".location-and-id span")
-                job_loc = (await self._safe_inner_text(loc_el)).strip() if loc_el else ""
-                location_match = self._location_matches(job_loc, location_kw)
-
-            if keywords_match and location_match:
-                title_span = await card.query_selector("h2.job-title span, h2 span")
-                title = (await self._safe_inner_text(title_span)).strip() if title_span else "Unknown"
-                posted_date = self._parse_posted_date_from_card_text(card_text)
-                seen_job_ids.add(job_id)
-                all_job_links.append((href, title, posted_date))
-                new_jobs += 1
-                print(f" ✅ Accepted: {title} (posted: {posted_date})")
-            else:
-                reasons = []
-                if not keywords_match:
-                    reasons.append("keyword mismatch")
-                if not location_match:
-                    reasons.append("location mismatch")
-                print(f" ⚠️ Skipping: {'; '.join(reasons)}")
-
-        return new_jobs
-
-    async def _handle_pagination(self, page, search_keywords: str, seen_job_ids, all_job_links):
-        current_page_num = 1
-        max_pages = 400
-
-        while current_page_num <= max_pages:
-            print(f"📄 Processing page {current_page_num}")
-            await asyncio.sleep(1.5 * self.human_speed)
-
-            new_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
-            print(f" ➕ Found {new_jobs} new job(s) (total: {len(all_job_links)})")
-
-            # Scroll to bottom to trigger lazy-loaded pagination (if any)
-            await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
-            await asyncio.sleep(2 * self.human_speed)
-
-            # Look for ANY link containing 'page=N'
-            next_page_num = current_page_num + 1
-            next_selector = f"a[href*='page={next_page_num}']"
-            next_link = await page.query_selector(next_selector)
-
-            if next_link:
-                href = await next_link.get_attribute("href")
-                if href:
-                    next_url = "https://www.amazon.jobs" + href if href.startswith("/") else href
-                    print(f" ➡️ Going to page {next_page_num}: {next_url}")
-                    await page.goto(next_url, timeout=120000)
-                    try:
-                        await page.wait_for_selector("div[data-job-id]", timeout=30000)
-                    except PlaywrightTimeoutError:
- print(" ⚠️ No jobs loaded on next page.") - break - current_page_num = next_page_num - else: - break - else: - print(" 🔚 No next page link found.") - break - - print(f"✅ Finished pagination after {current_page_num} pages.") - - async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str): - try: - job_data = { - "job_url": job_url, - "job_id": job_id, - "error_type": error_type, - "timestamp": datetime.now().isoformat() - } - self.redis_client.hset("failed_jobs", job_id, json.dumps(job_data)) - print(f" 📦 Added failed job to Redis cache: {job_id} (Error: {error_type})") - except Exception as e: - print(f" ❌ Failed to add to Redis: {str(e)}") - - async def scrape_jobs( - self, - search_keywords: Optional[str], - max_pages: int = 10, - credentials: Optional[Dict] = None - ): - clean_kw, location_kw = self._extract_keywords_and_location(search_keywords) - encoded_keywords = clean_kw.replace(" ", "+") - # ✅ FIXED: removed extra spaces - search_url = f"https://www.amazon.jobs/en/search?base_query={encoded_keywords}" - if location_kw: - search_url += f"&loc_query={location_kw.replace(' ', '+')}" - - profile = self.engine._select_profile() - renderer = random.choice(self.engine.common_renderers[self.engine.os]) - vendor = random.choice(self.engine.common_vendors) - spoof_script = self.engine._get_spoof_script(renderer, vendor) - - async with async_playwright() as pw: - browser = await pw.chromium.launch( - headless=False, - args=['--disable-blink-features=AutomationControlled'] - ) - context = await AsyncNewContext(browser, fingerprint=profile) - - await context.add_init_script(f""" - Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }}); - Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }}); - Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }}); - """) - await context.add_init_script(spoof_script) - - page = await context.new_page() - temp_fetcher = StealthyFetcher(self.engine, browser, context) - - print("✅ Bypassing login (Amazon jobs are public)...") - await page.wait_for_load_state("load", timeout=120000) - - protection_type = await temp_fetcher._detect_protection(page) - if protection_type: - print(f"🛡️ Protection detected: {protection_type}") - content_accessible = await temp_fetcher._is_content_accessible(page) - if not content_accessible: - handled = await self.engine._handle_cloudflare(page) if protection_type == "cloudflare" else False - if not handled: - await browser.close() - self.engine.report_outcome("protection_block") - return - else: - print("✅ Protection present but content accessible.") - - print(f"🔍 Searching Amazon for: {search_keywords}") - await page.goto(search_url, timeout=120000) - - try: - await page.wait_for_selector("div[data-job-id]", timeout=40000) - print("✅ Job listings detected.") - except PlaywrightTimeoutError: - print("❌ No job cards found.") - await browser.close() - return - - all_job_links = [] - seen_job_ids = set() - - print("🔄 Collecting initial job links...") - initial_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links) - print(f" ➕ Found {initial_jobs} initial job(s) (total: {len(all_job_links)})") - - await self._handle_pagination(page, search_keywords, seen_job_ids, all_job_links) - - print(f"✅ Collected {len(all_job_links)} unique job listings.") - - scraped_count = 0 - for idx, (href, title, posted_date) in 
-            for idx, (href, title, posted_date) in enumerate(all_job_links):
-                try:
-                    # ✅ FIXED: removed extra spaces
-                    full_url = href if href.startswith("http") else f"https://www.amazon.jobs{href}"
-                    print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url} (posted: {posted_date})")
-
-                    fetcher = StealthyFetcher(self.engine, browser, context)
-                    job_page = await fetcher.fetch_url(full_url, wait_for_selector="h1[data-testid='job-title']")
-                    if not job_page:
-                        job_id = href.strip("/").split("/")[-1] if href else "unknown"
-                        await self._add_job_to_redis_cache(full_url, job_id, "fetch_failure")
-                        self.engine.report_outcome("fetch_failure", url=full_url)
-                        continue
-
-                    apply_btn = await job_page.query_selector("a:has-text('Apply now'), button:has-text('Apply now')")
-                    final_url = full_url
-                    page_content = await self._extract_page_content_for_llm(job_page)
-                    job_id = href.strip("/").split("/")[-1] if href else "unknown"
-
-                    raw_data = {
-                        "page_content": page_content,
-                        "url": final_url,
-                        "job_id": job_id,
-                        "search_keywords": search_keywords,
-                        "posted_date": posted_date
-                    }
-
-                    refined_data = await self.llm_agent.refine_job_data(raw_data, self.user_request)
-
-                    if refined_data and refined_data.get("title", "N/A") != "N/A":
-                        compulsory_fields = ['company_name', 'job_id', 'url']
-                        for field in compulsory_fields:
-                            if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
-                                if field == 'job_id':
-                                    refined_data[field] = job_id
-                                elif field == 'url':
-                                    refined_data[field] = final_url
-                                elif field == 'company_name':
-                                    refined_data[field] = "Amazon"
-
-                        refined_data['scraped_at'] = datetime.now().isoformat()
-                        refined_data['category'] = clean_kw
-                        refined_data['posted_date'] = posted_date
-                        await self.llm_agent.save_job_data(refined_data, search_keywords)
-                        scraped_count += 1
-                        print(f" ✅ Scraped: {refined_data['title'][:50]}...")
-                        self.engine.report_outcome("success", url=raw_data["url"])
-                    else:
-                        print(f" 🟡 LLM failed to refine: {full_url}")
-                        await self._add_job_to_redis_cache(full_url, job_id, "llm_failure")
-                        self.engine.report_outcome("llm_failure", url=raw_data["url"])
-
-                    await job_page.close()
-
-                except Exception as e:
-                    error_msg = str(e)[:100]
-                    print(f" ⚠️ Exception on job {idx+1}: {error_msg}")
-                    job_id = (href.strip("/").split("/")[-1] if href else "unknown") if 'href' in locals() else "unknown"
-                    job_url = full_url if 'full_url' in locals() else "unknown"
-                    await self._add_job_to_redis_cache(job_url, job_id, f"exception: {error_msg}")
-                    if 'job_page' in locals() and job_page:
-                        await job_page.close()
-                    continue
-
-                finally:
-                    if not page.is_closed():
-                        await page.goto(search_url, timeout=120000)
-                        await asyncio.sleep(4 * self.human_speed)
-
-            await browser.close()
-
-            if scraped_count > 0:
-                self.engine.report_outcome("success")
-                print(f"✅ Completed! Processed {scraped_count} jobs.")
-            else:
-                self.engine.report_outcome("captcha")
-                print("⚠️ No jobs processed successfully.")
\ No newline at end of file
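For reviewers who want to see what behavior this deletion removes, here is a minimal, hypothetical driver for the removed class. The `ScraperEngine` import is an assumed stand-in for whatever engine object the project actually passes in (it is not part of this diff), and the keyword string format follows `_extract_keywords_and_location` above (free-text terms plus an optional `location: <country>` suffix).

```python
# Hypothetical usage sketch for the deleted AmazonJobScraper (illustrative only).
import asyncio

from amazon_job_scraper import AmazonJobScraper
from engine import ScraperEngine  # assumed module and class name, not shown in this diff


async def main() -> None:
    engine = ScraperEngine()  # placeholder for the project's real engine object
    scraper = AmazonJobScraper(
        engine,
        human_speed=1.0,
        user_request="Extract all standard job details",
    )
    # Keyword terms plus an optional "location: <country>" suffix,
    # as parsed by _extract_keywords_and_location.
    await scraper.scrape_jobs("machine learning engineer, location: Germany")


if __name__ == "__main__":
    asyncio.run(main())
```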