Web_scraping_project/job_scraper.py
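
"""LinkedIn job scraper built on Playwright, with browser-fingerprint
spoofing injected via browserforge. Collects job links from search
results, opens each posting, follows its Apply button, and stores the
scraped details in SQLite and per-job Markdown files."""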

import asyncio
import os
import random
import sqlite3
from datetime import datetime
from typing import Dict, Optional
from urllib.parse import quote

from playwright.async_api import async_playwright
from browserforge.injectors.playwright import AsyncNewContext


class LinkedInJobScraper:
    def __init__(
        self,
        engine,
        db_path: str = "linkedin_jobs.db",
        human_speed: float = 1.0
    ):
        self.engine = engine
        self.db_path = db_path
        self.human_speed = human_speed
        self._init_db()

    def _init_db(self):
        # Create the parent directory (if any) and the jobs table.
        db_dir = os.path.dirname(self.db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS jobs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    keyword TEXT,
                    title TEXT,
                    company TEXT,
                    location TEXT,
                    salary TEXT,
                    description TEXT,
                    url TEXT UNIQUE,
                    workplace_type TEXT,
                    scraped_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.commit()

    async def _human_click(self, page, element, wait_after: bool = True):
        """Scroll an element into view and click it with human-like pauses."""
        if not element:
            return False
        await element.scroll_into_view_if_needed()
        await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
        try:
            await element.click()
            if wait_after:
                await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
            return True
        except Exception:
            return False

    async def _login(self, page, credentials: Dict) -> bool:
        """Human-realistic LinkedIn login."""
        print("🔐 Navigating to LinkedIn login page...")
        await page.goto("https://www.linkedin.com/login", timeout=60000)
        await asyncio.sleep(random.uniform(2.0, 3.5) * self.human_speed)

        email_field = await page.query_selector('input[name="session_key"]')
        if not email_field:
            print("❌ Email field not found.")
            return False
        print("✍️ Typing username...")
        await email_field.click()
        await asyncio.sleep(random.uniform(0.4, 0.9) * self.human_speed)
        # Type character by character with randomized delays to mimic a human.
        for char in credentials["email"]:
            await page.keyboard.type(char)
            await asyncio.sleep(random.uniform(0.06, 0.14) * self.human_speed)
        await asyncio.sleep(random.uniform(1.0, 1.8) * self.human_speed)

        password_field = await page.query_selector('input[name="session_password"]')
        if not password_field:
            print("❌ Password field not found.")
            return False
        print("🔒 Typing password...")
        await password_field.click()
        await asyncio.sleep(random.uniform(0.3, 0.7) * self.human_speed)
        for char in credentials["password"]:
            await page.keyboard.type(char)
            await asyncio.sleep(random.uniform(0.08, 0.16) * self.human_speed)
        await asyncio.sleep(random.uniform(0.8, 1.5) * self.human_speed)

        print("✅ Submitting login form...")
        await page.keyboard.press("Enter")
        # Poll for up to ~15 seconds for a redirect away from the login page.
        for _ in range(15):
            current_url = page.url
            if ("/feed" in current_url or "/jobs" in current_url) and "login" not in current_url:
                print("✅ Login successful!")
                await asyncio.sleep(random.uniform(2.0, 3.0) * self.human_speed)
                return True
            await asyncio.sleep(1)
        print("❌ Login may have failed.")
        return False

    async def _extract_job_details(self, page) -> Dict:
        """Extract fields from any job page: LinkedIn Easy Apply or an external site."""
        await asyncio.sleep(2 * self.human_speed)

        async def get_text(selector: str) -> str:
            try:
                el = await page.query_selector(selector)
                if el:
                    text = await el.inner_text()
                    return text.strip() if text else "N/A"
            except Exception:
                pass
            return "N/A"

        # Try a LinkedIn-specific selector first, then fall back to generic ones.
        title = await get_text("h1.t-24")  # LinkedIn
        if title == "N/A":
            title = await get_text("h1, h2")  # external site
        company = await get_text("a.app-aware-link[href*='/company/']")  # LinkedIn
        if company == "N/A":
            company = await get_text("div.org, .company, [class*='company']")  # external site
        location = await get_text("span[class*='location']")  # LinkedIn
        if location == "N/A":
            location = await get_text(".location, [class*='location']")
        description = await get_text("div[class*='description__text']")  # LinkedIn
        if description == "N/A":
            description = await get_text(".job-desc, .description, main, body")
        # Workplace type and salary are LinkedIn-only; external sites may not expose them.
        workplace = await get_text("span.job-workplace-type")
        salary = await get_text("span.salary")

        return {
            "title": title,
            "company": company,
            "location": location,
            "workplace_type": workplace,
            "salary": salary,
            "description": description,
            "url": page.url
        }

    async def _save_to_markdown(self, job_data: Dict, keyword: str):
        os.makedirs("linkedin_jobs", exist_ok=True)
        clean_keyword = keyword.replace(" ", "_")
        filename = f"linkedin_{clean_keyword}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
        filepath = os.path.join("linkedin_jobs", filename)
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(f"# {job_data['title']}\n\n")
            f.write(f"- **Company**: {job_data['company']}\n")
            f.write(f"- **Location**: {job_data['location']}\n")
            f.write(f"- **Workplace**: {job_data['workplace_type']}\n")
            f.write(f"- **Salary**: {job_data['salary']}\n")
            f.write(f"- **URL**: <{job_data['url']}>\n\n")
            f.write(f"## Description\n\n{job_data['description']}\n")

    async def _save_to_db(self, job_data: Dict, keyword: str):
        # INSERT OR IGNORE relies on the UNIQUE constraint on url to skip duplicates.
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR IGNORE INTO jobs
                (keyword, title, company, location, salary, description, url, workplace_type)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                keyword,
                job_data["title"],
                job_data["company"],
                job_data["location"],
                job_data["salary"],
                job_data["description"],
                job_data["url"],
                job_data["workplace_type"]
            ))
            conn.commit()

    async def scrape_jobs(
        self,
        search_keywords: str,
        max_pages: int = 1,
        credentials: Optional[Dict] = None
    ):
        search_url = f"https://www.linkedin.com/jobs/search/?keywords={quote(search_keywords)}"

        # Pick a spoofed fingerprint profile from the engine.
        profile = self.engine._select_profile()
        renderer = random.choice(self.engine.common_renderers[self.engine.os])
        vendor = random.choice(self.engine.common_vendors)
        spoof_script = self.engine._get_spoof_script(renderer, vendor)

        async with async_playwright() as pw:
            browser = await pw.chromium.launch(
                headless=False,
                args=['--disable-blink-features=AutomationControlled']
            )
            context = await AsyncNewContext(browser, fingerprint=profile)
            # Override navigator properties so they match the injected fingerprint.
            await context.add_init_script(f"""
                Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
                Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
                Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
            """)
            await context.add_init_script(spoof_script)
            page = await context.new_page()

            if credentials:
                print("🔐 Attempting LinkedIn login...")
                if not await self._login(page, credentials):
                    print("❌ Login failed. Exiting.")
                    await browser.close()
                    return
            else:
                print("No credentials; proceeding as guest.")

            print(f"🔍 Searching for: {search_keywords}")
            await page.goto(search_url, wait_until='load', timeout=60000)
            await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
            scraped_count = 0
            all_job_links = []

            # Collect job links from each results page.
            for page_num in range(1, max_pages + 1):
                print(f"📄 Collecting job links from page {page_num}/{max_pages}")
                new_links = 0
                # Poll up to 50 times (~50 s) for job cards to appear.
                for _ in range(50):
                    links = await page.query_selector_all("a[href*='/jobs/view/']")
                    if links:
                        for link in links:
                            href = await link.get_attribute("href")
                            if href and href not in all_job_links:
                                all_job_links.append(href)
                                new_links += 1
                        break
                    await asyncio.sleep(1)
                print(f"   Found {new_links} new job links.")
                if page_num < max_pages:
                    next_btn = await page.query_selector("button[aria-label='Next']")
                    if next_btn and await next_btn.is_enabled():
                        await self._human_click(page, next_btn)
                        await asyncio.sleep(4 * self.human_speed)
                    else:
                        print("🔚 No next page.")
                        break
            # Process each job.
            for idx, href in enumerate(all_job_links):
                try:
                    full_url = href if href.startswith("http") else f"https://www.linkedin.com{href}"
                    print(f" → Opening job {idx + 1}: {full_url}")
                    await page.goto(full_url, wait_until='load', timeout=60000)
                    await asyncio.sleep(3 * self.human_speed)
                    if not await page.query_selector("h1.t-24"):
                        print("   ⚠️ Invalid job page, skipping.")
                        continue

                    # Find the main "Apply" button, trying several selectors.
                    apply_btn = None
                    apply_selectors = [
                        "button[aria-label*='Apply']",
                        "button:has-text('Apply')",
                        "a:has-text('Apply')",
                        "button:has-text('Easy Apply')"
                    ]
                    for selector in apply_selectors:
                        apply_btn = await page.query_selector(selector)
                        if apply_btn:
                            break
                    if not apply_btn:
                        print("   ⚠️ No 'Apply' button found, skipping.")
                        continue

                    print("   → Clicking 'Apply' / 'Easy Apply' button...")
                    await self._human_click(page, apply_btn, wait_after=False)
                    await asyncio.sleep(4 * self.human_speed)  # wait for the next page/form to load

                    # Scrape whatever page is now displayed: an Easy Apply form or an external site.
                    job_data = await self._extract_job_details(page)
                    if job_data["title"] == "N/A" and "linkedin.com" in page.url:
                        # Still on LinkedIn with no title: likely an Easy Apply form, so fall back to the job ID.
                        if "/jobs/view/" in full_url:
                            job_id = full_url.split("?")[0].rstrip("/").split("/")[-1]
                        else:
                            job_id = "unknown"
                        job_data["title"] = f"Easy Apply Job - ID {job_id}"

                    await self._save_to_db(job_data, search_keywords)
                    await self._save_to_markdown(job_data, search_keywords)
                    scraped_count += 1
                    domain = "LinkedIn (Easy Apply)" if "linkedin.com" in page.url else "External Site"
                    print(f"   ✅ Scraped ({domain}): {job_data['title'][:50]}...")
                except Exception as e:
                    print(f"   ⚠️ Failed on job {idx + 1}: {str(e)[:100]}")
                finally:
                    # Return to the search results before the next job.
                    print("   ↩️ Returning to LinkedIn search results...")
                    await page.goto(search_url, timeout=60000)
                    await asyncio.sleep(4 * self.human_speed)

            await browser.close()
            print(f"✅ Completed! Scraped {scraped_count} job pages (internal + external) for '{search_keywords}'.")