# Web_scraping_project/job_scraper2.py

import asyncio
import json
import random
import re
from datetime import datetime
from typing import Dict, Optional

import redis
from browserforge.injectors.playwright import AsyncNewContext
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError

from fetcher import StealthyFetcher
from llm_agent import LLMJobRefiner


class LinkedInJobScraper:
    """
    Scrapes LinkedIn job search results with human-like interaction, hands the raw
    page content to an LLM agent (LLMJobRefiner) for extraction, and caches failed
    jobs in Redis for later retry.
    """

    def __init__(
        self,
        engine,
        db_path: str = "linkedin_jobs.db",
        human_speed: float = 1.0,
        user_request: str = "Extract all standard job details"
    ):
        self.engine = engine
        self.db_path = db_path
        self.human_speed = human_speed
        self.user_request = user_request
        self._init_db()
        self.llm_agent = LLMJobRefiner()
        # Initialize Redis connection
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

    def _init_db(self):
        # Kept for backward compatibility; LLMJobRefiner handles PostgreSQL now
        pass

    async def _human_click(self, page, element, wait_after: bool = True):
        if not element:
            return False
        await element.scroll_into_view_if_needed()
        await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
        try:
            await element.click()
            if wait_after:
                await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
            return True
        except Exception:
            return False

    async def _login(self, page, credentials: Dict) -> bool:
        print("🔐 Navigating to LinkedIn login page...")
        await page.goto("https://www.linkedin.com/login", timeout=120000)
        await asyncio.sleep(random.uniform(2.0, 3.5) * self.human_speed)

        email_field = await page.query_selector('input[name="session_key"]')
        if not email_field:
            print("❌ Email field not found.")
            return False

        print("✍️ Typing username...")
        await email_field.click()
        await asyncio.sleep(random.uniform(0.4, 0.9) * self.human_speed)
        for char in credentials["email"]:
            await page.keyboard.type(char)
            await asyncio.sleep(random.uniform(0.06, 0.14) * self.human_speed)
        await asyncio.sleep(random.uniform(1.0, 1.8) * self.human_speed)

        password_field = await page.query_selector('input[name="session_password"]')
        if not password_field:
            print("❌ Password field not found.")
            return False

        print("🔒 Typing password...")
        await password_field.click()
        await asyncio.sleep(random.uniform(0.3, 0.7) * self.human_speed)
        for char in credentials["password"]:
            await page.keyboard.type(char)
            await asyncio.sleep(random.uniform(0.08, 0.16) * self.human_speed)
        await asyncio.sleep(random.uniform(0.8, 1.5) * self.human_speed)

        print("✅ Submitting login form...")
        await page.keyboard.press("Enter")

        for _ in range(15):
            current_url = page.url
            if "/feed" in current_url or "/jobs" in current_url:
                if "login" not in current_url:
                    print("✅ Login successful!")
                    await asyncio.sleep(random.uniform(2.0, 3.0) * self.human_speed)
                    return True
            await asyncio.sleep(1)

        print("❌ Login may have failed.")
        return False

    async def _extract_page_content_for_llm(self, page) -> str:
        """
        Extract the raw page content as HTML/text for LLM processing.
        The LLM handles all extraction logic, so no site-specific selectors are needed here.
        """
        await asyncio.sleep(2 * self.human_speed)
        await self.engine._human_like_scroll(page)
        await asyncio.sleep(2 * self.human_speed)
        page_content = await page.content()
        return page_content

    def _calculate_keyword_match(self, title: str, keywords: str) -> float:
        """Return the fraction of search keywords that appear in the job title (0.0–1.0)."""
        if not title or not keywords:
            return 0.0
        title_lower = title.lower()
        keyword_list = [kw.strip().lower() for kw in keywords.split()]
        matches = sum(1 for kw in keyword_list if kw in title_lower)
        return matches / len(keyword_list) if keyword_list else 0.0

    def _extract_location_from_keywords(self, search_keywords: str) -> str:
        """Pull an optional 'location: <value>' hint out of the search keywords string."""
        location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
        return location_match.group(1).strip().lower() if location_match else ""

    async def _scrape_jobs_from_current_page(self, page, search_keywords: str, seen_job_ids, all_job_links):
        current_links = await page.query_selector_all("a[href*='/jobs/view/']")
        new_jobs = 0
        location_from_keywords = self._extract_location_from_keywords(search_keywords)
        for link in current_links:
            href = await link.get_attribute("href")
            if href:
                full_url = href if href.startswith("http") else f"https://www.linkedin.com{href}"
                job_id = href.split("/view/")[-1].split("/")[0] if "/view/" in href else href
                if job_id and job_id not in seen_job_ids:
                    title_element = await link.query_selector("span.job-title, h3, .job-card-title")
                    if title_element:
                        title = await title_element.inner_text()
                        match_percentage = self._calculate_keyword_match(title, search_keywords)
                        location_match = True
                        if location_from_keywords:
                            location_element = await link.query_selector("span.job-location, .job-card-location, .location")
                            if location_element:
                                location_text = await location_element.inner_text()
                                location_match = location_from_keywords in location_text.lower()
                        if match_percentage >= 0.7 and location_match:
                            seen_job_ids.add(job_id)
                            all_job_links.append((href, title))
                            new_jobs += 1
                        elif match_percentage < 0.7:
                            print(f" ⚠️ Skipping job due to low keyword match: {title[:50]}... (match: {match_percentage:.2%})")
                        elif not location_match:
                            print(f" ⚠️ Skipping job due to location mismatch: {title[:50]}... (expected: {location_from_keywords})")
                    else:
                        seen_job_ids.add(job_id)
                        all_job_links.append((href, "Unknown Title"))
                        new_jobs += 1
        return new_jobs

    async def _handle_pagination(self, page, search_keywords: str, seen_job_ids, all_job_links):
        current_page = 1
        while True:
            print(f"📄 Processing page {current_page}")
            new_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
            print(f" Found {new_jobs} new job(s) on page {current_page} (total: {len(all_job_links)})")
            next_btn = await page.query_selector("button[aria-label='Next']")
            if next_btn and await next_btn.is_enabled():
                await self._human_click(page, next_btn)
                await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
                try:
                    await page.wait_for_function("() => window.location.href.includes('start=')", timeout=120000)
                except Exception:
                    pass
                current_page += 1
            else:
                print("🔚 'Next' button not available — stopping pagination.")
                break

    async def _handle_infinite_scroll(self, page, search_keywords: str, seen_job_ids, all_job_links):
        last_height = await page.evaluate("document.body.scrollHeight")
        no_new_jobs_count = 0
        max_no_new = 3
        while no_new_jobs_count < max_no_new:
            await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
            new_jobs_found = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
            print(f" Found {new_jobs_found} new job(s) (total: {len(all_job_links)})")
            new_height = await page.evaluate("document.body.scrollHeight")
            if new_height == last_height:
                no_new_jobs_count += 1
            else:
                no_new_jobs_count = 0
                last_height = new_height
            if new_jobs_found == 0 and no_new_jobs_count >= 1:
                print("🔚 No new jobs loaded. Stopping scroll.")
                break

    async def _extract_job_posted_date(self, page) -> str:
        """
        Extract the job posted date from a LinkedIn job page.
        Returns the date in MM/DD/YY format.
        """
        try:
            # Try multiple selectors for the posted date
            selectors = [
                "span[class*='posted-date']",
                "span:has-text('ago')",
                "span:has-text('Posted')",
                "span.job-details-jobs-unified-top-card__job-insight-view-model-secondary"
            ]
            for selector in selectors:
                date_element = await page.query_selector(selector)
                if date_element:
                    date_text = await date_element.inner_text()
                    if date_text:
                        date_text = date_text.strip()
                        if "ago" in date_text.lower():
                            # Relative date (e.g., "2 hours ago", "1 day ago") — use the current date
                            return datetime.now().strftime("%m/%d/%y")
                        elif "Posted" in date_text:
                            # "Posted X days ago" or similar — use the current date
                            return datetime.now().strftime("%m/%d/%y")
                        else:
                            # Try to parse an absolute date (common LinkedIn format: "Mar 15, 2025")
                            date_match = re.search(r'([A-Za-z]+)\s+(\d{1,2}),\s+(\d{4})', date_text)
                            if date_match:
                                month_name = date_match.group(1)
                                day = date_match.group(2)
                                year = date_match.group(3)
                                # Convert month name to number
                                months = {
                                    'Jan': '01', 'Feb': '02', 'Mar': '03', 'Apr': '04',
                                    'May': '05', 'Jun': '06', 'Jul': '07', 'Aug': '08',
                                    'Sep': '09', 'Oct': '10', 'Nov': '11', 'Dec': '12'
                                }
                                month_num = months.get(month_name[:3], '01')
                                return f"{month_num}/{day.zfill(2)}/{year[-2:]}"
            # If no date was found, fall back to the current date
            return datetime.now().strftime("%m/%d/%y")
        except Exception as e:
            print(f" ⚠️ Error extracting posted date: {str(e)}")
            # Fall back to the current date
            return datetime.now().strftime("%m/%d/%y")

    async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
        """Add a failed job to the Redis cache for later retry."""
        try:
            job_data = {
                "job_url": job_url,
                "job_id": job_id,
                "error_type": error_type,
                "timestamp": datetime.now().isoformat()
            }
            # Use job_id as the hash field to avoid duplicates
            self.redis_client.hset("failed_jobs", job_id, json.dumps(job_data))
            print(f" 📦 Added failed job to Redis cache: {job_id} (Error: {error_type})")
        except Exception as e:
            print(f" ❌ Failed to add job to Redis cache: {str(e)}")

    async def scrape_jobs(
        self,
        search_keywords: Optional[str],
        max_pages: int = 1,
        credentials: Optional[Dict] = None
    ):
        location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
        location = location_match.group(1).strip() if location_match else ""
        clean_keywords = re.sub(r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE).strip()
        encoded_keywords = clean_keywords.replace(" ", "%20")
        search_url = f"https://www.linkedin.com/jobs/search/?keywords={encoded_keywords}"
        if location:
            search_url += f"&location={location.replace(' ', '%20')}"

        profile = self.engine._select_profile()
        renderer = random.choice(self.engine.common_renderers[self.engine.os])
        vendor = random.choice(self.engine.common_vendors)
        spoof_script = self.engine._get_spoof_script(renderer, vendor)

        async with async_playwright() as pw:
            browser = await pw.chromium.launch(
                headless=False,
                args=['--disable-blink-features=AutomationControlled']
            )
            context = await AsyncNewContext(browser, fingerprint=profile)
            await context.add_init_script(f"""
                Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
                Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
                Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
            """)
            await context.add_init_script(spoof_script)
            page = await context.new_page()

            # Temporary fetcher used for protection checks on the main page
            temp_fetcher = StealthyFetcher(self.engine, browser, context)

            session_loaded = await self.engine.load_session(context)
            login_successful = False
            if session_loaded:
                print("🔁 Using saved session — verifying login...")
                await page.goto("https://www.linkedin.com/feed/", timeout=120000)
                if "feed" in page.url and "login" not in page.url:
                    print("✅ Session still valid.")
                    login_successful = True
                else:
                    print("⚠️ Saved session expired — re-authenticating.")
                    session_loaded = False

            if not session_loaded and credentials:
                print("🔐 Performing fresh login...")
                login_successful = await self._login(page, credentials)
                if login_successful:
                    await self.engine.save_session(context)
                else:
                    print("❌ Login failed. Exiting.")
                    await browser.close()
                    self.engine.report_outcome("block")
                    return
            elif not credentials:
                print("No credentials — proceeding as guest.")
                login_successful = True

            await page.wait_for_load_state("load", timeout=120000)
            print("✅ Post-login page fully loaded. Starting search...")

            # >>> PROTECTION CHECK USING FETCHER LOGIC <<<
            protection_type = await temp_fetcher._detect_protection(page)
            if protection_type:
                print(f"🛡️ Protection detected on initial page: {protection_type}")
                content_accessible = await temp_fetcher._is_content_accessible(page)
                if not content_accessible:
                    print("🔒 Content not accessible.")
                    handled = False
                    if protection_type == "cloudflare":
                        handled = await self.engine._handle_cloudflare(page)
                    elif protection_type == "captcha":
                        handled = False
                    if not handled:
                        await browser.close()
                        self.engine.report_outcome("protection_block")
                        return
                else:
                    print("✅ Protection present but content accessible — proceeding.")

            print(f"🔍 Searching for: {search_keywords}")
            await page.goto(search_url, wait_until='load', timeout=120000)
            await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)

            # >>> PROTECTION CHECK ON SEARCH PAGE <<<
            protection_type = await temp_fetcher._detect_protection(page)
            if protection_type:
                print(f"🛡️ Protection detected on search page: {protection_type}")
                content_accessible = await temp_fetcher._is_content_accessible(page)
                if not content_accessible:
                    print("🔒 Content not accessible.")
                    handled = False
                    if protection_type == "cloudflare":
                        handled = await self.engine._handle_cloudflare(page)
                    elif protection_type == "captcha":
                        handled = False
                    if not handled:
                        await browser.close()
                        self.engine.report_outcome("protection_block")
                        return
                else:
                    print("✅ Protection present but content accessible — proceeding.")

            all_job_links = []
            seen_job_ids = set()

            print("🔄 Collecting initial job links...")
            initial_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
            print(f" Found {initial_jobs} initial job(s) (total: {len(all_job_links)})")

            iteration = 1
            while iteration <= 5:  # Fixed condition — was "iteration >= 5", which never ran
                print(f"🔄 Iteration {iteration}: Checking for new jobs...")
                prev_job_count = len(all_job_links)
                await self._handle_infinite_scroll(page, search_keywords, seen_job_ids, all_job_links)
                new_jobs_count = len(all_job_links) - prev_job_count
                if new_jobs_count > 0:
                    print(f" Found {new_jobs_count} new jobs via infinite scroll")
                    iteration += 1
                    continue

                pagination_exists = await page.query_selector("button[aria-label='Next']")
                if pagination_exists:
                    print("⏭️ Pagination detected. Processing pages...")
                    await self._handle_pagination(page, search_keywords, seen_job_ids, all_job_links)
                    iteration += 1
                    continue
                else:
                    print("🔄 Refreshing page to check for new results...")
                    await page.reload(wait_until='load')
                    await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
                    new_jobs_after_refresh = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
                    if new_jobs_after_refresh > 0:
                        print(f" Found {new_jobs_after_refresh} new job(s) after refresh")
                        iteration += 1
                        continue
                    else:
                        print("🔚 No new jobs found after refresh. Stopping.")
                        break

            print(f"✅ Collected {len(all_job_links)} unique job links.")

            scraped_count = 0
            for idx, (href, title) in enumerate(all_job_links):
                try:
                    full_url = href if href.startswith("http") else f"https://www.linkedin.com{href}"
                    print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url}")
                    fetcher = StealthyFetcher(self.engine, browser, context)
                    job_page = await fetcher.fetch_url(full_url, wait_for_selector="h1.t-24")
                    if not job_page:
                        print(f" ❌ Failed to fetch job page {full_url} after retries.")
                        await self._add_job_to_redis_cache(
                            full_url,
                            full_url.split("/")[-2] if "/jobs/view/" in full_url else "unknown",
                            "fetch_failure"
                        )
                        self.engine.report_outcome("fetch_failure", url=full_url)
                        continue

                    # Extract the posted date from the job page
                    posted_date = await self._extract_job_posted_date(job_page)
                    print(f" 📅 Posted date extracted: {posted_date}")

                    apply_btn = None
                    apply_selectors = [
                        "button[aria-label*='Apply']",
                        "button:has-text('Apply')",
                        "a:has-text('Apply')",
                        "button:has-text('Easy Apply')"
                    ]
                    for selector in apply_selectors:
                        apply_btn = await job_page.query_selector(selector)
                        if apply_btn:
                            break

                    final_url = full_url
                    external_url = None
                    page_content = None

                    if apply_btn:
                        print(" → Clicking 'Apply' / 'Easy Apply' button...")
                        page_waiter = asyncio.create_task(context.wait_for_event("page"))
                        await self._human_click(job_page, apply_btn, wait_after=False)
                        external_page = None
                        try:
                            external_page = await asyncio.wait_for(page_waiter, timeout=5.0)
                            print(" 🌐 External job site opened in new tab.")
                            await external_page.wait_for_load_state("load", timeout=120000)
                            await asyncio.sleep(2 * self.human_speed)
                            await self.engine._human_like_scroll(external_page)
                            await asyncio.sleep(2 * self.human_speed)
                            # Extract raw content from the external page for LLM processing
                            external_url = external_page.url
                            final_url = external_url
                            page_content = await self._extract_page_content_for_llm(external_page)
                            if not external_page.is_closed():
                                await external_page.close()
                        except asyncio.TimeoutError:
                            print(" 🖥️ No external tab — scraping LinkedIn job page directly.")
                            await job_page.wait_for_timeout(60000)
                            try:
                                await job_page.wait_for_selector("div.jobs-apply-button--fixed, div.jobs-easy-apply-modal", timeout=80000)
                            except PlaywrightTimeoutError:
                                pass
                            await self.engine._human_like_scroll(job_page)
                            await asyncio.sleep(2 * self.human_speed)
                            page_content = await self._extract_page_content_for_llm(job_page)
                    else:
                        print(" ⚠️ No 'Apply' button found — scraping job details directly.")
                        await self.engine._human_like_scroll(job_page)
                        await asyncio.sleep(2 * self.human_speed)
                        page_content = await self._extract_page_content_for_llm(job_page)

                    job_id = full_url.split("/")[-2] if "/jobs/view/" in full_url else "unknown"
                    raw_data = {
                        "page_content": page_content,
                        "url": final_url,
                        "job_id": job_id,
                        "search_keywords": search_keywords,
                        "posted_date": posted_date  # Include the posted date in the raw data
                    }

                    # The LLM agent is fully responsible for extraction and validation
                    refined_data = await self.llm_agent.refine_job_data(raw_data, self.user_request)

                    if refined_data and refined_data.get("title", "N/A") != "N/A":
                        # Ensure compulsory fields are present (fallback if the LLM missed them)
                        compulsory_fields = ['company_name', 'job_id', 'url']
                        for field in compulsory_fields:
                            if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
                                if field == 'job_id':
                                    refined_data[field] = job_id
                                elif field == 'url':
                                    refined_data[field] = final_url
                                elif field == 'company_name':
                                    refined_data[field] = "Unknown Company"
                        refined_data['scraped_at'] = datetime.now().isoformat()
                        refined_data['category'] = clean_keywords
                        refined_data['posted_date'] = posted_date  # Add the posted date to the refined data
                        await self.llm_agent.save_job_data(refined_data, search_keywords)
                        scraped_count += 1
                        print(f" ✅ Scraped and refined: {refined_data['title'][:50]}...")
                        self.engine.report_outcome("success", url=raw_data["url"])
                    else:
                        print(f" 🟡 Could not extract meaningful data from: {final_url}")
                        await self._add_job_to_redis_cache(final_url, job_id, "llm_failure")
                        self.engine.report_outcome("llm_failure", url=raw_data["url"])

                    await job_page.close()
                except Exception as e:
                    error_msg = str(e)[:100]
                    print(f" ⚠️ Failed on job {idx+1}: {error_msg}")
                    # Guard against full_url not being set if the failure happened very early
                    job_url = full_url if 'full_url' in locals() else "unknown"
                    job_id = (
                        full_url.split("/")[-2]
                        if 'full_url' in locals() and "/jobs/view/" in full_url
                        else "unknown"
                    )
                    await self._add_job_to_redis_cache(job_url, job_id, f"exception: {error_msg}")
                    if 'job_page' in locals() and job_page:
                        await job_page.close()
                    continue
                finally:
                    print(" ↩️ Returning to LinkedIn search results...")
                    await page.goto(search_url, timeout=120000)
                    await asyncio.sleep(4 * self.human_speed)

            await browser.close()

            if scraped_count > 0:
                self.engine.report_outcome("success")
                print(f"✅ Completed! Processed {scraped_count} jobs for '{search_keywords}' based on request '{self.user_request}'.")
            else:
                self.engine.report_outcome("captcha")
                print("⚠️ No jobs processed successfully.")