import asyncio
import random
from typing import Optional, Dict
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
from browserforge.injectors.playwright import AsyncNewContext
from llm_agent import LLMJobRefiner
import re
from fetcher import StealthyFetcher
from datetime import datetime
import json
import redis


class LinkedInJobScraper:
    def __init__(
        self,
        engine,
        db_path: str = "linkedin_jobs.db",
        human_speed: float = 1.0,
        user_request: str = "Extract all standard job details"
    ):
        self.engine = engine
        self.db_path = db_path
        self.human_speed = human_speed
        self.user_request = user_request
        self._init_db()
        self.llm_agent = LLMJobRefiner()
        # Initialize Redis connection
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

    def _init_db(self):
        # Kept for backward compatibility; LLMJobRefiner handles PostgreSQL persistence now
        pass

    async def _human_click(self, page, element, wait_after: bool = True):
        if not element:
            return False
        await element.scroll_into_view_if_needed()
        await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
        try:
            await element.click()
            if wait_after:
                await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
            return True
        except Exception:
            return False

    async def _login(self, page, credentials: Dict) -> bool:
        print("🔐 Navigating to LinkedIn login page...")
        await page.goto("https://www.linkedin.com/login", timeout=120000)
        await asyncio.sleep(random.uniform(2.0, 3.5) * self.human_speed)

        email_field = await page.query_selector('input[name="session_key"]')
        if not email_field:
            print("❌ Email field not found.")
            return False

        print("✍️ Typing username...")
        await email_field.click()
        await asyncio.sleep(random.uniform(0.4, 0.9) * self.human_speed)
        for char in credentials["email"]:
            await page.keyboard.type(char)
            await asyncio.sleep(random.uniform(0.06, 0.14) * self.human_speed)
        await asyncio.sleep(random.uniform(1.0, 1.8) * self.human_speed)

        password_field = await page.query_selector('input[name="session_password"]')
        if not password_field:
            print("❌ Password field not found.")
            return False

        print("🔒 Typing password...")
        await password_field.click()
        await asyncio.sleep(random.uniform(0.3, 0.7) * self.human_speed)
        for char in credentials["password"]:
            await page.keyboard.type(char)
            await asyncio.sleep(random.uniform(0.08, 0.16) * self.human_speed)
        await asyncio.sleep(random.uniform(0.8, 1.5) * self.human_speed)

        print("✅ Submitting login form...")
        await page.keyboard.press("Enter")

        for _ in range(15):
            current_url = page.url
            if "/feed" in current_url or "/jobs" in current_url:
                if "login" not in current_url:
                    print("✅ Login successful!")
                    await asyncio.sleep(random.uniform(2.0, 3.0) * self.human_speed)
                    return True
            await asyncio.sleep(1)

        print("❌ Login may have failed.")
        return False

    async def _extract_page_content_for_llm(self, page) -> str:
        """
        Extract raw page content as HTML/text for LLM processing.
        The LLM handles all extraction logic, not specific selectors.
        """
        await asyncio.sleep(2 * self.human_speed)
        await self.engine._human_like_scroll(page)
        await asyncio.sleep(2 * self.human_speed)
        page_content = await page.content()
        return page_content

    def _calculate_keyword_match(self, title: str, keywords: str) -> float:
        if not title or not keywords:
            return 0.0
        title_lower = title.lower()
        keyword_list = [kw.strip().lower() for kw in keywords.split()]
        matches = sum(1 for kw in keyword_list if kw in title_lower)
        return matches / len(keyword_list) if keyword_list else 0.0
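    # Illustrative note (not part of the original code): _calculate_keyword_match scores a
    # title as matched_keywords / total_keywords over the space-separated keywords, e.g.:
    #   _calculate_keyword_match("Senior Python Developer (Remote)", "python developer") -> 2/2 = 1.0
    #   _calculate_keyword_match("Data Analyst", "python developer")                     -> 0/2 = 0.0
    # The 0.7 threshold used later in _scrape_jobs_from_current_page therefore requires at
    # least 70% of the keywords to appear as substrings of the job title.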
    def _extract_location_from_keywords(self, search_keywords: str) -> str:
        location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
        return location_match.group(1).strip().lower() if location_match else ""

    async def _scrape_jobs_from_current_page(self, page, search_keywords: str, seen_job_ids, all_job_links):
        current_links = await page.query_selector_all("a[href*='/jobs/view/']")
        new_jobs = 0
        location_from_keywords = self._extract_location_from_keywords(search_keywords)

        for link in current_links:
            href = await link.get_attribute("href")
            if href:
                full_url = href if href.startswith("http") else f"https://www.linkedin.com{href}"
                job_id = href.split("/view/")[-1].split("/")[0] if "/view/" in href else href
                if job_id and job_id not in seen_job_ids:
                    title_element = await link.query_selector("span.job-title, h3, .job-card-title")
                    if title_element:
                        title = await title_element.inner_text()
                        match_percentage = self._calculate_keyword_match(title, search_keywords)

                        location_match = True
                        if location_from_keywords:
                            location_element = await link.query_selector("span.job-location, .job-card-location, .location")
                            if location_element:
                                location_text = await location_element.inner_text()
                                location_match = location_from_keywords in location_text.lower()

                        if match_percentage >= 0.7 and location_match:
                            seen_job_ids.add(job_id)
                            all_job_links.append((href, title))
                            new_jobs += 1
                        elif match_percentage < 0.7:
                            print(f" ⚠️ Skipping job due to low keyword match: {title[:50]}... (match: {match_percentage:.2%})")
                        elif not location_match:
                            print(f" ⚠️ Skipping job due to location mismatch: {title[:50]}... (expected: {location_from_keywords})")
                    else:
                        seen_job_ids.add(job_id)
                        all_job_links.append((href, "Unknown Title"))
                        new_jobs += 1
        return new_jobs

    async def _handle_pagination(self, page, search_keywords: str, seen_job_ids, all_job_links):
        current_page = 1
        while True:
            print(f"📄 Processing page {current_page}")
            new_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
            print(f" ➕ Found {new_jobs} new job(s) on page {current_page} (total: {len(all_job_links)})")

            next_btn = await page.query_selector("button[aria-label='Next']")
            if next_btn and await next_btn.is_enabled():
                await self._human_click(page, next_btn)
                await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
                try:
                    await page.wait_for_function("() => window.location.href.includes('start=')", timeout=120000)
                except PlaywrightTimeoutError:
                    pass
                current_page += 1
            else:
                print("🔚 'Next' button not available — stopping pagination.")
                break

    async def _handle_infinite_scroll(self, page, search_keywords: str, seen_job_ids, all_job_links):
        last_height = await page.evaluate("document.body.scrollHeight")
        no_new_jobs_count = 0
        max_no_new = 3

        while no_new_jobs_count < max_no_new:
            await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)

            new_jobs_found = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
            print(f" ➕ Found {new_jobs_found} new job(s) (total: {len(all_job_links)})")

            new_height = await page.evaluate("document.body.scrollHeight")
            if new_height == last_height:
                no_new_jobs_count += 1
            else:
                no_new_jobs_count = 0
                last_height = new_height

            if new_jobs_found == 0 and no_new_jobs_count >= 1:
                print("🔚 No new jobs loaded. Stopping scroll.")
                break
Stopping scroll.") break async def _extract_job_posted_date(self, page) -> str: """ Extract the job posted date from LinkedIn job page Returns date in MM/DD/YY format """ try: # Try multiple selectors for the posted date selectors = [ "span[class*='posted-date']", "span:has-text('ago')", "span:has-text('Posted')", "span.job-details-jobs-unified-top-card__job-insight-view-model-secondary" ] for selector in selectors: date_element = await page.query_selector(selector) if date_element: date_text = await date_element.inner_text() if date_text: # Clean the text date_text = date_text.strip() # Check if it contains "ago" (e.g., "2 hours ago", "1 day ago") if "ago" in date_text.lower(): # Use current date since it's relative current_date = datetime.now() return current_date.strftime("%m/%d/%y") elif "Posted" in date_text: # Extract date from "Posted X days ago" or similar current_date = datetime.now() return current_date.strftime("%m/%d/%y") else: # Try to parse actual date formats # Common LinkedIn format: "Mar 15, 2025" import re date_match = re.search(r'([A-Za-z]+)\s+(\d{1,2}),\s+(\d{4})', date_text) if date_match: month_name = date_match.group(1) day = date_match.group(2) year = date_match.group(3) # Convert month name to number months = { 'Jan': '01', 'Feb': '02', 'Mar': '03', 'Apr': '04', 'May': '05', 'Jun': '06', 'Jul': '07', 'Aug': '08', 'Sep': '09', 'Oct': '10', 'Nov': '11', 'Dec': '12' } month_num = months.get(month_name[:3], '01') return f"{month_num}/{day.zfill(2)}/{year[-2:]}" # If no date found, use current date current_date = datetime.now() return current_date.strftime("%m/%d/%y") except Exception as e: print(f" âš ī¸ Error extracting posted date: {str(e)}") # Return current date as fallback current_date = datetime.now() return current_date.strftime("%m/%d/%y") async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str): """Add failed job to Redis cache for later retry""" try: job_data = { "job_url": job_url, "job_id": job_id, "error_type": error_type, "timestamp": datetime.now().isoformat() } # Use job_id as the key to avoid duplicates self.redis_client.hset("failed_jobs", job_id, json.dumps(job_data)) print(f" đŸ“Ļ Added failed job to Redis cache: {job_id} (Error: {error_type})") except Exception as e: print(f" ❌ Failed to add job to Redis cache: {str(e)}") async def scrape_jobs( self, search_keywords: Optional[str], max_pages: int = 1, credentials: Optional[Dict] = None ): location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE) location = location_match.group(1).strip() if location_match else "" clean_keywords = re.sub(r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE).strip() encoded_keywords = clean_keywords.replace(" ", "%20") search_url = f"https://www.linkedin.com/jobs/search/?keywords={encoded_keywords}" if location: search_url += f"&location={location.replace(' ', '%20')}" profile = self.engine._select_profile() renderer = random.choice(self.engine.common_renderers[self.engine.os]) vendor = random.choice(self.engine.common_vendors) spoof_script = self.engine._get_spoof_script(renderer, vendor) async with async_playwright() as pw: browser = await pw.chromium.launch( headless=False, args=['--disable-blink-features=AutomationControlled'] ) context = await AsyncNewContext(browser, fingerprint=profile) await context.add_init_script(f""" Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }}); Object.defineProperty(navigator, 'deviceMemory', {{ get: () => 
    async def scrape_jobs(
        self,
        search_keywords: Optional[str],
        max_pages: int = 1,
        credentials: Optional[Dict] = None
    ):
        location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
        location = location_match.group(1).strip() if location_match else ""
        clean_keywords = re.sub(r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE).strip()
        encoded_keywords = clean_keywords.replace(" ", "%20")
        search_url = f"https://www.linkedin.com/jobs/search/?keywords={encoded_keywords}"
        if location:
            search_url += f"&location={location.replace(' ', '%20')}"

        profile = self.engine._select_profile()
        renderer = random.choice(self.engine.common_renderers[self.engine.os])
        vendor = random.choice(self.engine.common_vendors)
        spoof_script = self.engine._get_spoof_script(renderer, vendor)

        async with async_playwright() as pw:
            browser = await pw.chromium.launch(
                headless=False,
                args=['--disable-blink-features=AutomationControlled']
            )
            context = await AsyncNewContext(browser, fingerprint=profile)
            await context.add_init_script(f"""
                Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
                Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
                Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
            """)
            await context.add_init_script(spoof_script)
            page = await context.new_page()

            # Create a temporary fetcher for protection checks on the main page
            temp_fetcher = StealthyFetcher(self.engine, browser, context)

            session_loaded = await self.engine.load_session(context)
            login_successful = False

            if session_loaded:
                print("🔁 Using saved session — verifying login...")
                await page.goto("https://www.linkedin.com/feed/", timeout=120000)
                if "feed" in page.url and "login" not in page.url:
                    print("✅ Session still valid.")
                    login_successful = True
                else:
                    print("⚠️ Saved session expired — re-authenticating.")
                    session_loaded = False

            if not session_loaded and credentials:
                print("🔐 Performing fresh login...")
                login_successful = await self._login(page, credentials)
                if login_successful:
                    await self.engine.save_session(context)
                else:
                    print("❌ Login failed. Exiting.")
                    await browser.close()
                    self.engine.report_outcome("block")
                    return
            elif not credentials:
                print("ℹ️ No credentials — proceeding as guest.")
                login_successful = True

            await page.wait_for_load_state("load", timeout=120000)
            print("✅ Post-login page fully loaded. Starting search...")

            # >>> PROTECTION CHECK USING FETCHER LOGIC <<<
            protection_type = await temp_fetcher._detect_protection(page)
            if protection_type:
                print(f"🛡️ Protection detected on initial page: {protection_type}")
                content_accessible = await temp_fetcher._is_content_accessible(page)
                if not content_accessible:
                    print("🔒 Content not accessible.")
                    handled = False
                    if protection_type == "cloudflare":
                        handled = await self.engine._handle_cloudflare(page)
                    elif protection_type == "captcha":
                        handled = False
                    if not handled:
                        await browser.close()
                        self.engine.report_outcome("protection_block")
                        return
                else:
                    print("✅ Protection present but content accessible — proceeding.")

            print(f"🔍 Searching for: {search_keywords}")
            await page.goto(search_url, wait_until='load', timeout=120000)
            await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)

            # >>> PROTECTION CHECK ON SEARCH PAGE <<<
            protection_type = await temp_fetcher._detect_protection(page)
            if protection_type:
                print(f"🛡️ Protection detected on search page: {protection_type}")
                content_accessible = await temp_fetcher._is_content_accessible(page)
                if not content_accessible:
                    print("🔒 Content not accessible.")
                    handled = False
                    if protection_type == "cloudflare":
                        handled = await self.engine._handle_cloudflare(page)
                    elif protection_type == "captcha":
                        handled = False
                    if not handled:
                        await browser.close()
                        self.engine.report_outcome("protection_block")
                        return
                else:
                    print("✅ Protection present but content accessible — proceeding.")

            all_job_links = []
            seen_job_ids = set()

            print("🔄 Collecting initial job links...")
            initial_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
            print(f" ➕ Found {initial_jobs} initial job(s) (total: {len(all_job_links)})")

            iteration = 1
            while iteration <= 5:  # Fixed the condition - was "iteration >= 5" which never runs
                print(f"🔄 Iteration {iteration}: Checking for new jobs...")
                prev_job_count = len(all_job_links)

                await self._handle_infinite_scroll(page, search_keywords, seen_job_ids, all_job_links)
                new_jobs_count = len(all_job_links) - prev_job_count
                if new_jobs_count > 0:
                    print(f" ➕ Found {new_jobs_count} new jobs via infinite scroll")
                    iteration += 1
                    continue

                pagination_exists = await page.query_selector("button[aria-label='Next']")
                if pagination_exists:
                    print("⏭️ Pagination detected. Processing pages...")
                    await self._handle_pagination(page, search_keywords, seen_job_ids, all_job_links)
                    iteration += 1
                    continue
                else:
                    print("🔄 Refreshing page to check for new results...")
                    await page.reload(wait_until='load')
                    await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
                    new_jobs_after_refresh = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
                    if new_jobs_after_refresh > 0:
                        print(f" ➕ Found {new_jobs_after_refresh} new job(s) after refresh")
                        iteration += 1
                        continue
                    else:
                        print("🔚 No new jobs found after refresh. Stopping.")
                        break

            print(f"✅ Collected {len(all_job_links)} unique job links.")

            scraped_count = 0
            for idx, (href, title) in enumerate(all_job_links):
                try:
                    full_url = href if href.startswith("http") else f"https://www.linkedin.com{href}"
                    print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url}")

                    fetcher = StealthyFetcher(self.engine, browser, context)
                    job_page = await fetcher.fetch_url(full_url, wait_for_selector="h1.t-24")

                    if not job_page:
                        print(f" ❌ Failed to fetch job page {full_url} after retries.")
                        await self._add_job_to_redis_cache(
                            full_url,
                            full_url.split("/")[-2] if "/jobs/view/" in full_url else "unknown",
                            "fetch_failure"
                        )
                        self.engine.report_outcome("fetch_failure", url=full_url)
                        continue

                    # Extract the posted date from the job page
                    posted_date = await self._extract_job_posted_date(job_page)
                    print(f" 📅 Posted date extracted: {posted_date}")

                    apply_btn = None
                    apply_selectors = [
                        "button[aria-label*='Apply']",
                        "button:has-text('Apply')",
                        "a:has-text('Apply')",
                        "button:has-text('Easy Apply')"
                    ]
                    for selector in apply_selectors:
                        apply_btn = await job_page.query_selector(selector)
                        if apply_btn:
                            break

                    final_url = full_url
                    external_url = None
                    page_content = None

                    if apply_btn:
                        print(" → Clicking 'Apply' / 'Easy Apply' button...")
                        page_waiter = asyncio.create_task(context.wait_for_event("page"))
                        await self._human_click(job_page, apply_btn, wait_after=False)

                        external_page = None
                        try:
                            external_page = await asyncio.wait_for(page_waiter, timeout=5.0)
                            print(" 🌐 External job site opened in new tab.")
                            await external_page.wait_for_load_state("load", timeout=120000)
                            await asyncio.sleep(2 * self.human_speed)
                            await self.engine._human_like_scroll(external_page)
                            await asyncio.sleep(2 * self.human_speed)

                            # Extract raw content from the external page for LLM processing
                            external_url = external_page.url
                            final_url = external_url
                            page_content = await self._extract_page_content_for_llm(external_page)

                            if not external_page.is_closed():
                                await external_page.close()
                        except asyncio.TimeoutError:
                            print(" 🖥️ No external tab — scraping LinkedIn job page directly.")
                            await job_page.wait_for_timeout(60000)
                            try:
                                await job_page.wait_for_selector("div.jobs-apply-button--fixed, div.jobs-easy-apply-modal", timeout=80000)
                            except PlaywrightTimeoutError:
                                pass
                            await self.engine._human_like_scroll(job_page)
                            await asyncio.sleep(2 * self.human_speed)
                            page_content = await self._extract_page_content_for_llm(job_page)
                    else:
                        print(" ⚠️ No 'Apply' button found — scraping job details directly.")
                        await self.engine._human_like_scroll(job_page)
                        await asyncio.sleep(2 * self.human_speed)
                        page_content = await self._extract_page_content_for_llm(job_page)

                    job_id = full_url.split("/")[-2] if "/jobs/view/" in full_url else "unknown"
                    raw_data = {
                        "page_content": page_content,
                        "url": final_url,
                        "job_id": job_id,
                        "search_keywords": search_keywords,
                        "posted_date": posted_date  # Add the posted date to raw data
                    }
                    # The LLM agent is now fully responsible for extraction and validation
                    refined_data = await self.llm_agent.refine_job_data(raw_data, self.user_request)

                    if refined_data and refined_data.get("title", "N/A") != "N/A":
                        # Ensure compulsory fields are present (fallback if the LLM missed them)
                        compulsory_fields = ['company_name', 'job_id', 'url']
                        for field in compulsory_fields:
                            if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
                                if field == 'job_id':
                                    refined_data[field] = job_id
                                elif field == 'url':
                                    refined_data[field] = final_url
                                elif field == 'company_name':
                                    refined_data[field] = "Unknown Company"

                        refined_data['scraped_at'] = datetime.now().isoformat()
                        refined_data['category'] = clean_keywords
                        refined_data['posted_date'] = posted_date  # Add posted date to refined data

                        await self.llm_agent.save_job_data(refined_data, search_keywords)
                        scraped_count += 1
                        print(f" ✅ Scraped and refined: {refined_data['title'][:50]}...")
                        self.engine.report_outcome("success", url=raw_data["url"])
                    else:
                        print(f" 🟡 Could not extract meaningful data from: {final_url}")
                        await self._add_job_to_redis_cache(final_url, job_id, "llm_failure")
                        self.engine.report_outcome("llm_failure", url=raw_data["url"])

                    await job_page.close()

                except Exception as e:
                    error_msg = str(e)[:100]
                    print(f" ⚠️ Failed on job {idx+1}: {error_msg}")
                    # Guard against full_url not being bound yet when the failure happened early
                    if 'full_url' in locals():
                        job_url = full_url
                        job_id = full_url.split("/")[-2] if "/jobs/view/" in full_url else "unknown"
                    else:
                        job_url = "unknown"
                        job_id = "unknown"
                    await self._add_job_to_redis_cache(job_url, job_id, f"exception: {error_msg}")
                    if 'job_page' in locals() and job_page:
                        await job_page.close()
                    continue
                finally:
                    print(" ↩️ Returning to LinkedIn search results...")
                    await page.goto(search_url, timeout=120000)
                    await asyncio.sleep(4 * self.human_speed)

            await browser.close()

        if scraped_count > 0:
            self.engine.report_outcome("success")
            print(f"✅ Completed! Processed {scraped_count} jobs for '{search_keywords}' based on request '{self.user_request}'.")
        else:
            self.engine.report_outcome("captcha")
            print("⚠️ No jobs processed successfully.")
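
# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module). The `engine` object is whatever
# stealth/session helper the rest of this project provides (it must expose _select_profile,
# load_session, save_session, report_outcome, _human_like_scroll, _handle_cloudflare,
# common_renderers, common_vendors, os and _get_spoof_script as used above). The
# `StealthEngine` import below is a hypothetical placeholder for it, and the credentials
# and search string are examples only.
# ---------------------------------------------------------------------------
async def _example_run():
    from engine import StealthEngine  # hypothetical module/class name

    engine = StealthEngine()
    scraper = LinkedInJobScraper(
        engine,
        human_speed=1.0,
        user_request="Extract title, company, location, salary and description",
    )
    await scraper.scrape_jobs(
        search_keywords="python developer location: Berlin",
        credentials={"email": "you@example.com", "password": "change-me"},
    )


if __name__ == "__main__":
    asyncio.run(_example_run())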