Modify the scraper to also capture each job's posted date, and queue failed jobs in Redis so a back-up scraper can retry them later.
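The back-up scraper itself is not included in this commit. As a rough sketch of its intake side, assuming the same local Redis settings configured below and a hypothetical retry_scrape callable standing in for whatever that scraper actually runs, it could drain the failed_jobs hash like this:

import json
import redis

def drain_failed_jobs(retry_scrape):
    # Same connection settings as the scraper's redis_client (assumed, see the diff below).
    r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
    # _add_job_to_redis_cache stores one JSON blob per failed job, keyed by job_id.
    for job_id, payload in r.hgetall("failed_jobs").items():
        entry = json.loads(payload)  # keys: job_url, job_id, error_type, timestamp
        if retry_scrape(entry["job_url"]):
            r.hdel("failed_jobs", job_id)  # drop the entry once the retry succeeds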
This commit is contained in: commit 2b1387b3e6

job_scraper.py (491 changed lines)

@@ -1,491 +0,0 @@
import asyncio
import random
import sqlite3
import os
from datetime import datetime
from typing import Optional, Dict, List
from playwright.async_api import async_playwright
from browserforge.injectors.playwright import AsyncNewContext


class LinkedInJobScraper:
    def __init__(
        self,
        engine,
        db_path: str = "linkedin_jobs.db",
        human_speed: float = 1.0
    ):
        self.engine = engine
        self.db_path = db_path
        self.human_speed = human_speed
        self._init_db()

    def _init_db(self):
        os.makedirs(os.path.dirname(self.db_path) if os.path.dirname(self.db_path) else ".", exist_ok=True)
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS jobs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    keyword TEXT,
                    title TEXT,
                    company TEXT,
                    location TEXT,
                    salary TEXT,
                    description TEXT,
                    url TEXT UNIQUE,
                    workplace_type TEXT,
                    scraped_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.commit()

    async def _human_click(self, page, element, wait_after: bool = True):
        if not element:
            return False
        await element.scroll_into_view_if_needed()
        await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
        try:
            await element.click()
            if wait_after:
                await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
            return True
        except:
            return False

    async def _login(self, page, credentials: Dict) -> bool:
        """Human-realistic LinkedIn login"""
        print("🔐 Navigating to LinkedIn login page...")
        await page.goto("https://www.linkedin.com/login", timeout=60000)
        await asyncio.sleep(random.uniform(2.0, 3.5) * self.human_speed)

        email_field = await page.query_selector('input[name="session_key"]')
        if not email_field:
            print("❌ Email field not found.")
            return False

        print("✍️ Typing username...")
        await email_field.click()
        await asyncio.sleep(random.uniform(0.4, 0.9) * self.human_speed)
        for char in credentials["email"]:
            await page.keyboard.type(char)
            await asyncio.sleep(random.uniform(0.06, 0.14) * self.human_speed)
        await asyncio.sleep(random.uniform(1.0, 1.8) * self.human_speed)

        password_field = await page.query_selector('input[name="session_password"]')
        if not password_field:
            print("❌ Password field not found.")
            return False

        print("🔒 Typing password...")
        await password_field.click()
        await asyncio.sleep(random.uniform(0.3, 0.7) * self.human_speed)
        for char in credentials["password"]:
            await page.keyboard.type(char)
            await asyncio.sleep(random.uniform(0.08, 0.16) * self.human_speed)
        await asyncio.sleep(random.uniform(0.8, 1.5) * self.human_speed)

        print("✅ Submitting login form...")
        await page.keyboard.press("Enter")

        for _ in range(15):
            current_url = page.url
            if "/feed" in current_url or "/jobs" in current_url:
                if "login" not in current_url:
                    print("✅ Login successful!")
                    await asyncio.sleep(random.uniform(2.0, 3.0) * self.human_speed)
                    return True
            await asyncio.sleep(1)
        print("❌ Login may have failed.")
        return False

    async def _extract_job_details(self, page) -> Dict:
        """Extract from ANY job page: LinkedIn Easy Apply OR external site"""
        await asyncio.sleep(2 * self.human_speed)

        async def get_text(selector: str) -> str:
            try:
                el = await page.query_selector(selector)
                if el:
                    text = await el.inner_text()
                    return text.strip() if text else "N/A"
            except:
                pass
            return "N/A"

        title = await get_text("h1.t-24")
        if title == "N/A":
            title = await get_text("h1, h2")

        company = await get_text("a.app-aware-link[href*='/company/']")
        if company == "N/A":
            company = await get_text("div.org, .company, [class*='company']")

        location = await get_text("span[class*='location']")
        if location == "N/A":
            location = await get_text(".location, [class*='location']")

        description = await get_text("div[class*='description__text']")
        if description == "N/A":
            description = await get_text(".job-desc, .description, main, body")

        workplace = await get_text("span.job-workplace-type") or "N/A"
        salary = await get_text("span.salary") or "N/A"

        return {
            "title": title,
            "company": company,
            "location": location,
            "workplace_type": workplace,
            "salary": salary,
            "description": description,
            "url": page.url
        }

    async def _save_to_markdown(self, job_data: Dict, keyword: str, verified: bool = True):
        """Save to appropriate folder using job ID to avoid duplication"""
        folder = "linkedin_jobs" if verified else "linkedin_jobs_unverified"
        os.makedirs(folder, exist_ok=True)

        # Extract job ID from URL for LinkedIn jobs
        url = job_data.get("url", "")
        if "/jobs/view/" in url:
            try:
                job_id = url.split("/view/")[1].split("/")[0]
            except:
                job_id = "unknown"
        else:
            # For external jobs, use a hash of the URL (first 12 chars)
            import hashlib
            job_id = hashlib.md5(url.encode()).hexdigest()[:12]

        clean_keyword = keyword.replace(" ", "_")
        filename = f"linkedin_{clean_keyword}_job_{job_id}.md"
        filepath = os.path.join(folder, filename)

        # Only save if file doesn't already exist (idempotent)
        if os.path.exists(filepath):
            print(f" 📝 Skipping duplicate Markdown file: {filename}")
            return

        with open(filepath, "w", encoding="utf-8") as f:
            f.write(f"# {job_data['title']}\n\n")
            f.write(f"- **Company**: {job_data['company']}\n")
            f.write(f"- **Location**: {job_data['location']}\n")
            f.write(f"- **Workplace**: {job_data['workplace_type']}\n")
            f.write(f"- **Salary**: {job_data['salary']}\n")
            f.write(f"- **URL**: <{url}>\n\n")
            f.write(f"## Description\n\n{job_data['description']}\n")

    async def _save_to_db(self, job_data: Dict, keyword: str):
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR IGNORE INTO jobs
                (keyword, title, company, location, salary, description, url, workplace_type)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                keyword,
                job_data["title"],
                job_data["company"],
                job_data["location"],
                job_data["salary"],
                job_data["description"],
                job_data["url"],
                job_data["workplace_type"]
            ))
            conn.commit()

    async def scrape_jobs(
        self,
        search_keywords: Optional[str],
        max_pages: int = 1,
        credentials: Optional[Dict] = None
    ):
        encoded_keywords = search_keywords.replace(" ", "%20")
        search_url = f"https://www.linkedin.com/jobs/search/?keywords={encoded_keywords}"

        profile = self.engine._select_profile()
        renderer = random.choice(self.engine.common_renderers[self.engine.os])
        vendor = random.choice(self.engine.common_vendors)
        spoof_script = self.engine._get_spoof_script(renderer, vendor)

        async with async_playwright() as pw:
            browser = await pw.chromium.launch(
                headless=False,
                args=['--disable-blink-features=AutomationControlled']
            )
            context = await AsyncNewContext(browser, fingerprint=profile)

            await context.add_init_script(f"""
                Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
                Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
                Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
            """)
            await context.add_init_script(spoof_script)

            page = await context.new_page()

            session_loaded = await self.engine.load_session(context)
            login_successful = False

            if session_loaded:
                print("🔁 Using saved session — verifying login...")
                await page.goto("https://www.linkedin.com/feed/", timeout=80000)
                if "feed" in page.url and "login" not in page.url:
                    print("✅ Session still valid.")
                    login_successful = True
                else:
                    print("⚠️ Saved session expired — re-authenticating.")
                    session_loaded = False

            if not session_loaded and credentials:
                print("🔐 Performing fresh login...")
                login_successful = await self._login(page, credentials)
                if login_successful:
                    await self.engine.save_session(context)
                else:
                    print("❌ Login failed. Exiting.")
                    await browser.close()
                    self.engine.report_outcome("block")
                    return
            elif not credentials:
                print("ℹ️ No credentials — proceeding as guest.")
                login_successful = True
            else:
                pass

            await page.wait_for_load_state("load", timeout=60000)
            print("✅ Post-login page fully loaded. Starting search...")

            if await self.engine._detect_cloudflare(page):
                print("☁️ Cloudflare detected on initial load.")
                if not await self.engine._handle_cloudflare(page):
                    print("❌ Cloudflare could not be resolved.")
                    await browser.close()
                    self.engine.report_outcome("cloudflare")
                    return

            print(f"🔍 Searching for: {search_keywords}")
            await page.goto(search_url, wait_until='load', timeout=80000)
            await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)

            if await self.engine._detect_cloudflare(page):
                print("☁️ Cloudflare detected on search page.")
                if not await self.engine._handle_cloudflare(page):
                    await browser.close()
                    self.engine.report_outcome("cloudflare")
                    return

            scraped_count = 0
            all_job_links = []
            seen_job_ids = set()

            # ← NEW: Scroll once to reveal pagination (if any)
            print("🔄 Scrolling to bottom to reveal pagination controls...")
            await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)

            # Check if pagination exists
            pagination_exists = await page.query_selector("button[aria-label='Next']")
            if pagination_exists:
                print("⏭️ Pagination detected. Using page navigation.")
                current_page = 1
                while current_page <= max_pages:
                    print(f"📄 Processing page {current_page}/{max_pages}")

                    # Collect job links on current page
                    current_links = await page.query_selector_all("a[href*='/jobs/view/']")
                    new_jobs = 0
                    for link in current_links:
                        href = await link.get_attribute("href")
                        if href:
                            job_id = href.split("/view/")[-1].split("/")[0] if "/view/" in href else href
                            if job_id and job_id not in seen_job_ids:
                                seen_job_ids.add(job_id)
                                all_job_links.append(href)
                                new_jobs += 1

                    print(f" ➕ Found {new_jobs} new job(s) on page {current_page} (total: {len(all_job_links)})")

                    # Try to go to next page
                    if current_page < max_pages:
                        next_btn = await page.query_selector("button[aria-label='Next']")
                        if next_btn and await next_btn.is_enabled():
                            await self._human_click(page, next_btn)
                            await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
                            # Wait for URL to change or new content
                            try:
                                await page.wait_for_function("() => window.location.href.includes('start=')", timeout=60000)
                            except:
                                pass
                        else:
                            print("🔚 'Next' button not available — stopping pagination.")
                            break
                    current_page += 1

            else:
                print("🔄 No pagination found. Falling back to infinite scroll...")
                last_height = await page.evaluate("document.body.scrollHeight")
                no_new_jobs_count = 0
                max_no_new = 3

                while no_new_jobs_count < max_no_new:
                    await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                    await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)

                    current_links = await page.query_selector_all("a[href*='/jobs/view/']")
                    new_jobs_found = 0

                    for link in current_links:
                        href = await link.get_attribute("href")
                        if href:
                            job_id = href.split("/view/")[-1].split("/")[0] if "/view/" in href else href
                            if job_id and job_id not in seen_job_ids:
                                seen_job_ids.add(job_id)
                                all_job_links.append(href)
                                new_jobs_found += 1

                    print(f" ➕ Found {new_jobs_found} new job(s) (total: {len(all_job_links)})")

                    new_height = await page.evaluate("document.body.scrollHeight")
                    if new_height == last_height:
                        no_new_jobs_count += 1
                    else:
                        no_new_jobs_count = 0
                        last_height = new_height

                    if new_jobs_found == 0 and no_new_jobs_count >= 1:
                        print("🔚 No new jobs loaded. Stopping scroll.")
                        break

            print(f"✅ Collected {len(all_job_links)} unique job links.")

            # ← Rest of job processing loop unchanged
            scraped_count = 0
            for idx, href in enumerate(all_job_links):
                try:
                    full_url = href if href.startswith("http") else f"https://www.linkedin.com{href}"
                    print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url}")
                    await page.goto(full_url, wait_until='load', timeout=60000)
                    await asyncio.sleep(3 * self.human_speed)

                    is_cloudflare = await self.engine._detect_cloudflare(page)
                    page_content = await page.content()
                    has_captcha_text = "captcha" in page_content.lower()
                    captcha_present = is_cloudflare or has_captcha_text

                    title_element = await page.query_selector("h1.t-24")
                    job_data_accessible = title_element is not None

                    if captcha_present:
                        if job_data_accessible:
                            print(" ⚠️ CAPTCHA detected, but job data is accessible. Proceeding in stealth mode...")
                            await self.engine._avoid_captcha(page)
                        else:
                            print(" ⚠️ CAPTCHA detected and job data blocked. Attempting recovery...")
                            if not await self.engine._solve_captcha_fallback(page):
                                print(" ❌ CAPTCHA recovery failed. Skipping job.")
                                continue
                            title_element = await page.query_selector("h1.t-24")
                            if not title_element:
                                print(" ❌ Job data still unavailable after CAPTCHA handling. Skipping.")
                                continue

                    if not captcha_present:
                        await self.engine._avoid_captcha(page)

                    apply_btn = None
                    apply_selectors = [
                        "button[aria-label*='Apply']",
                        "button:has-text('Apply')",
                        "a:has-text('Apply')",
                        "button:has-text('Easy Apply')"
                    ]
                    for selector in apply_selectors:
                        apply_btn = await page.query_selector(selector)
                        if apply_btn:
                            break

                    job_data = None
                    final_url = full_url

                    if apply_btn:
                        print(" → Clicking 'Apply' / 'Easy Apply' button...")

                        page_waiter = asyncio.create_task(context.wait_for_event("page"))
                        await self._human_click(page, apply_btn, wait_after=False)

                        external_page = None
                        try:
                            external_page = await asyncio.wait_for(page_waiter, timeout=5.0)
                            print(" 🌐 External job site opened in new tab.")
                            await external_page.wait_for_load_state("load", timeout=60000)
                            await asyncio.sleep(2 * self.human_speed)
                            await self.engine._human_like_scroll(external_page)
                            await asyncio.sleep(2 * self.human_speed)

                            job_data = await self._extract_job_details(external_page)
                            final_url = external_page.url

                            if not external_page.is_closed():
                                await external_page.close()

                        except asyncio.TimeoutError:
                            print(" 🖥️ No external tab — scraping LinkedIn job page.")
                            await page.wait_for_timeout(2000)
                            try:
                                await page.wait_for_selector("div.jobs-apply-button--fixed, div.jobs-easy-apply-modal", timeout=8000)
                            except:
                                pass
                            await self.engine._human_like_scroll(page)
                            await asyncio.sleep(2 * self.human_speed)
                            job_data = await self._extract_job_details(page)
                            final_url = page.url
                    else:
                        print(" ⚠️ No 'Apply' button found — scraping job details directly.")
                        await self.engine._human_like_scroll(page)
                        await asyncio.sleep(2 * self.human_speed)
                        job_data = await self._extract_job_details(page)
                        final_url = page.url

                    job_data["url"] = final_url

                    if job_data["title"] == "N/A" and "linkedin.com" in final_url:
                        job_id = final_url.split("/")[-2] if "/jobs/view/" in final_url else "unknown"
                        job_data["title"] = f"Easy Apply Job - ID {job_id}"

                    is_meaningful = (
                        job_data["title"] != "N/A" or
                        job_data["company"] != "N/A" or
                        (job_data["description"] != "N/A" and len(job_data["description"]) > 20)
                    )

                    if is_meaningful:
                        await self._save_to_db(job_data, search_keywords)
                        await self._save_to_markdown(job_data, search_keywords, verified=True)
                        scraped_count += 1
                        print(f" ✅ Scraped (verified): {job_data['title'][:50]}...")
                    else:
                        await self._save_to_markdown(job_data, search_keywords, verified=False)
                        print(f" 🟡 Scraped (unverified): {final_url} — low-quality data")

                except Exception as e:
                    print(f" ⚠️ Failed on job {idx+1}: {str(e)[:100]}")
                    continue

                finally:
                    print(" ↩️ Returning to LinkedIn search results...")
                    await page.goto(search_url, timeout=60000)
                    await asyncio.sleep(4 * self.human_speed)

            await browser.close()

            if scraped_count > 0:
                self.engine.report_outcome("success")
                print(f"✅ Completed! Saved {scraped_count} verified + additional unverified jobs for '{search_keywords}'.")
            else:
                self.engine.report_outcome("captcha")
                print("⚠️ No verified jobs scraped — check 'linkedin_jobs_unverified' for raw outputs.")
@@ -1,3 +1,4 @@
 import asyncio
 import random
 from typing import Optional, Dict
@@ -7,6 +8,8 @@ from llm_agent import LLMJobRefiner
 import re
 from fetcher import StealthyFetcher
 from datetime import datetime
+import json
+import redis


 class LinkedInJobScraper:
@@ -23,6 +26,8 @@ class LinkedInJobScraper:
         self.user_request = user_request
         self._init_db()
         self.llm_agent = LLMJobRefiner()
+        # Initialize Redis connection
+        self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

     def _init_db(self):
         # This method is kept for backward compatibility but LLMJobRefiner handles PostgreSQL now
@@ -189,6 +194,82 @@ class LinkedInJobScraper:
                     print("🔚 No new jobs loaded. Stopping scroll.")
                     break

+    async def _extract_job_posted_date(self, page) -> str:
+        """
+        Extract the job posted date from LinkedIn job page
+        Returns date in MM/DD/YY format
+        """
+        try:
+            # Try multiple selectors for the posted date
+            selectors = [
+                "span[class*='posted-date']",
+                "span:has-text('ago')",
+                "span:has-text('Posted')",
+                "span.job-details-jobs-unified-top-card__job-insight-view-model-secondary"
+            ]
+
+            for selector in selectors:
+                date_element = await page.query_selector(selector)
+                if date_element:
+                    date_text = await date_element.inner_text()
+                    if date_text:
+                        # Clean the text
+                        date_text = date_text.strip()
+
+                        # Check if it contains "ago" (e.g., "2 hours ago", "1 day ago")
+                        if "ago" in date_text.lower():
+                            # Use current date since it's relative
+                            current_date = datetime.now()
+                            return current_date.strftime("%m/%d/%y")
+                        elif "Posted" in date_text:
+                            # Extract date from "Posted X days ago" or similar
+                            current_date = datetime.now()
+                            return current_date.strftime("%m/%d/%y")
+                        else:
+                            # Try to parse actual date formats
+                            # Common LinkedIn format: "Mar 15, 2025"
+                            import re
+                            date_match = re.search(r'([A-Za-z]+)\s+(\d{1,2}),\s+(\d{4})', date_text)
+                            if date_match:
+                                month_name = date_match.group(1)
+                                day = date_match.group(2)
+                                year = date_match.group(3)
+
+                                # Convert month name to number
+                                months = {
+                                    'Jan': '01', 'Feb': '02', 'Mar': '03', 'Apr': '04',
+                                    'May': '05', 'Jun': '06', 'Jul': '07', 'Aug': '08',
+                                    'Sep': '09', 'Oct': '10', 'Nov': '11', 'Dec': '12'
+                                }
+
+                                month_num = months.get(month_name[:3], '01')
+                                return f"{month_num}/{day.zfill(2)}/{year[-2:]}"
+
+            # If no date found, use current date
+            current_date = datetime.now()
+            return current_date.strftime("%m/%d/%y")
+
+        except Exception as e:
+            print(f" ⚠️ Error extracting posted date: {str(e)}")
+            # Return current date as fallback
+            current_date = datetime.now()
+            return current_date.strftime("%m/%d/%y")
+
+    async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
+        """Add failed job to Redis cache for later retry"""
+        try:
+            job_data = {
+                "job_url": job_url,
+                "job_id": job_id,
+                "error_type": error_type,
+                "timestamp": datetime.now().isoformat()
+            }
+            # Use job_id as the key to avoid duplicates
+            self.redis_client.hset("failed_jobs", job_id, json.dumps(job_data))
+            print(f" 📦 Added failed job to Redis cache: {job_id} (Error: {error_type})")
+        except Exception as e:
+            print(f" ❌ Failed to add job to Redis cache: {str(e)}")
+
     async def scrape_jobs(
         self,
         search_keywords: Optional[str],
@@ -308,7 +389,7 @@ class LinkedInJobScraper:
         print(f" ➕ Found {initial_jobs} initial job(s) (total: {len(all_job_links)})")

         iteration = 1
-        while True and iteration >= 5:
+        while iteration <= 5:  # Fixed the condition - was "iteration >= 5" which never runs
             print(f"🔄 Iteration {iteration}: Checking for new jobs...")

             prev_job_count = len(all_job_links)
@@ -353,9 +434,14 @@ class LinkedInJobScraper:
                 job_page = await fetcher.fetch_url(full_url, wait_for_selector="h1.t-24")
                 if not job_page:
                     print(f" ❌ Failed to fetch job page {full_url} after retries.")
+                    await self._add_job_to_redis_cache(full_url, full_url.split("/")[-2] if "/jobs/view/" in full_url else "unknown", "fetch_failure")
                     self.engine.report_outcome("fetch_failure", url=full_url)
                     continue

+                # Extract posted date from the job page
+                posted_date = await self._extract_job_posted_date(job_page)
+                print(f" 📅 Posted date extracted: {posted_date}")
+
                 apply_btn = None
                 apply_selectors = [
                     "button[aria-label*='Apply']",
@@ -417,7 +503,8 @@ class LinkedInJobScraper:
                     "page_content": page_content,
                     "url": final_url,
                     "job_id": job_id,
-                    "search_keywords": search_keywords
+                    "search_keywords": search_keywords,
+                    "posted_date": posted_date  # Add the posted date to raw data
                 }

                 # LLM agent is now fully responsible for extraction and validation
@@ -437,18 +524,24 @@ class LinkedInJobScraper:

                     refined_data['scraped_at'] = datetime.now().isoformat()
                     refined_data['category'] = clean_keywords
+                    refined_data['posted_date'] = posted_date  # Add posted date to refined data
                     await self.llm_agent.save_job_data(refined_data, search_keywords)
                     scraped_count += 1
                     print(f" ✅ Scraped and refined: {refined_data['title'][:50]}...")
                     self.engine.report_outcome("success", url=raw_data["url"])
                 else:
                     print(f" 🟡 Could not extract meaningful data from: {final_url}")
+                    await self._add_job_to_redis_cache(final_url, job_id, "llm_failure")
                     self.engine.report_outcome("llm_failure", url=raw_data["url"])

                 await job_page.close()

             except Exception as e:
-                print(f" ⚠️ Failed on job {idx+1}: {str(e)[:100]}")
+                error_msg = str(e)[:100]
+                print(f" ⚠️ Failed on job {idx+1}: {error_msg}")
+                job_id = full_url.split("/")[-2] if "/jobs/view/" in full_url else "unknown" if 'full_url' in locals() else "unknown"
+                job_url = full_url if 'full_url' in locals() else "unknown"
+                await self._add_job_to_redis_cache(job_url, job_id, f"exception: {error_msg}")
                 if 'job_page' in locals() and job_page:
                     await job_page.close()
                 continue
llm_agent.py (14 changed lines)

@@ -75,6 +75,7 @@ class LLMJobRefiner:
                 url TEXT,
                 category TEXT,
                 scraped_at TIMESTAMP,
+                posted_date TEXT,
                 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
             )
         ''')
@@ -87,6 +88,7 @@ class LLMJobRefiner:

         cursor.execute('CREATE INDEX IF NOT EXISTS idx_job_id ON jobs(job_id)')
         cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON jobs(category)')
+        cursor.execute('CREATE INDEX IF NOT EXISTS idx_posted_date ON jobs(posted_date)')

         conn.commit()
         cursor.close()
@@ -141,6 +143,7 @@ class LLMJobRefiner:
         cleaned_content = self._clean_html_for_llm(page_content)
         job_id = raw_data.get('job_id', 'unknown')
         url = raw_data.get('url', 'N/A')
+        posted_date = raw_data.get('posted_date', datetime.now().strftime("%m/%d/%y"))

         prompt = f"""
         You are a job posting data extractor.
@@ -208,6 +211,9 @@ class LLMJobRefiner:
                     print(f" ⚠️ LLM returned '{value}' for {field} but job content appears present")
                     return None

+            # Add the posted_date to the refined data
+            refined_data['posted_date'] = posted_date
+
             return refined_data

         except Exception as e:
@@ -245,8 +251,8 @@ class LLMJobRefiner:
            cursor.execute('''
                INSERT INTO jobs
                (title, company_name, location, description, requirements,
-               qualifications, salary_range, nature_of_work, job_id, url, category, scraped_at)
-               VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+               qualifications, salary_range, nature_of_work, job_id, url, category, scraped_at, posted_date)
+               VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (job_id) DO NOTHING
            ''', (
                job_data.get("title", "N/A"),
@@ -260,7 +266,8 @@ class LLMJobRefiner:
                job_data.get("job_id", "N/A"),
                job_data.get("url", "N/A"),
                job_data.get("category", "N/A"),
-               job_data.get("scraped_at")
+               job_data.get("scraped_at"),
+               job_data.get("posted_date", "N/A")
            ))

            conn.commit()
@@ -287,6 +294,7 @@ class LLMJobRefiner:
            f.write(f"- **Nature of Work**: {job_data.get('nature_of_work', 'N/A')}\n")
            f.write(f"- **Salary Range**: {job_data.get('salary_range', 'N/A')}\n")
            f.write(f"- **Job ID**: {job_data.get('job_id', 'N/A')}\n")
+           f.write(f"- **Posted Date**: {job_data.get('posted_date', 'N/A')}\n")
            f.write(f"- **Category**: {job_data.get('category', 'N/A')}\n")
            f.write(f"- **Scraped At**: {job_data.get('scraped_at', 'N/A')}\n")
            f.write(f"- **URL**: <{job_data.get('url', 'N/A')}>\n\n")
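With posted_date stored as MM/DD/YY text and indexed by idx_posted_date, equality lookups for a given day stay cheap. A minimal sketch of such a query, assuming psycopg2 and a caller-supplied connection string (the real connection settings live inside LLMJobRefiner and are not shown in this diff; the helper below is hypothetical, not part of the commit):

import psycopg2

def jobs_posted_on(dsn: str, posted_date: str):
    # List jobs stored with a given posted_date (MM/DD/YY), using the "jobs"
    # table created by LLMJobRefiner; idx_posted_date covers this equality filter.
    with psycopg2.connect(dsn) as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT job_id, title, company_name, url FROM jobs WHERE posted_date = %s",
                (posted_date,),
            )
            return cur.fetchall()

# e.g. jobs_posted_on("dbname=jobs user=scraper", "03/15/25")

Because the column is plain text in MM/DD/YY form, ORDER BY or range filters on it will not sort chronologically across months or years; equality filters like the one above are the safe use.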