import asyncio
import json
import random
import re
from datetime import datetime, timedelta
from typing import Optional, Dict

import redis
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
from browserforge.injectors.playwright import AsyncNewContext

from fetcher import StealthyFetcher
from llm_agent import LLMJobRefiner
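# Third-party dependencies assumed available: playwright, browserforge and redis;
# llm_agent (LLMJobRefiner) and fetcher (StealthyFetcher) are local modules of this project.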


class CryptoJobScraper:
    def __init__(
        self,
        engine,
        db_path: str = "crypto_jobs.db",
        human_speed: float = 1.0,
        user_request: str = "Extract all standard job details"
    ):
        self.engine = engine
        self.db_path = db_path
        self.human_speed = human_speed
        self.user_request = user_request
        self.llm_agent = LLMJobRefiner()
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

    async def _human_click(self, page, element, wait_after: bool = True):
        if not element:
            return False
        await element.scroll_into_view_if_needed()
        await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
        try:
            await element.click()
            if wait_after:
                await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
            return True
        except Exception:
            return False

    async def _extract_page_content_for_llm(self, page) -> str:
        await asyncio.sleep(2 * self.human_speed)
        await self.engine._human_like_scroll(page)
        await asyncio.sleep(2 * self.human_speed)
        page_content = await page.content()
        return page_content

    def _calculate_keyword_match(self, title: str, keywords: str) -> float:
        if not title or not keywords:
            return 0.0
        title_lower = title.lower()
        keyword_list = [kw.strip().lower() for kw in keywords.split()]
        matches = sum(1 for kw in keyword_list if kw in title_lower)
        return matches / len(keyword_list) if keyword_list else 0.0
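
    # Worked example (hypothetical values): for the title "Senior Rust Engineer (Remote)"
    # and keywords "rust engineer", both tokens appear in the lowercased title, so the
    # score is 2 / 2 = 1.0; "solidity engineer" would score 1 / 2 = 0.5, which is exactly
    # the acceptance threshold used in _scrape_jobs_from_current_page below.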

    async def _scrape_jobs_from_current_page(self, page, search_keywords: str, seen_job_ids, all_job_links):
        current_links = await page.query_selector_all("a[href*='/job/']")
        new_jobs = 0
        for link in current_links:
            href = await link.get_attribute("href")
            if not href:
                continue
            if not href.startswith("http"):
                href = "https://cryptocurrencyjobs.co" + href
            job_id = href.rstrip("/").split("/")[-1]
            if job_id and job_id not in seen_job_ids:
                title_element = await link.query_selector("h3, .job-title")
                title = (await title_element.inner_text()) if title_element else "Unknown Title"
                match_percentage = self._calculate_keyword_match(title, search_keywords)
                if match_percentage >= 0.5:  # Lower threshold than LinkedIn
                    seen_job_ids.add(job_id)
                    all_job_links.append((href, title))
                    new_jobs += 1
                else:
                    print(f" ⚠️ Skipping job due to low keyword match: {title[:50]}... (match: {match_percentage:.2%})")
        return new_jobs

    async def _handle_pagination(self, page, search_keywords: str, seen_job_ids, all_job_links, max_pages: int = 1):
        current_page = 1
        while True:
            print(f"📄 Processing page {current_page}")
            new_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
            print(f" Found {new_jobs} new job(s) (total: {len(all_job_links)})")
            if current_page >= max_pages:
                print(f"🔚 Reached max_pages={max_pages} — stopping pagination.")
                break
            next_btn = await page.query_selector('a[rel="next"]')
            if next_btn:
                next_url = await next_btn.get_attribute("href")
                if not next_url:
                    print("🔚 'Next' link has no href — stopping pagination.")
                    break
                if not next_url.startswith("http"):
                    next_url = "https://cryptocurrencyjobs.co" + next_url
                await page.goto(next_url, timeout=120000)
                await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
                current_page += 1
            else:
                print("🔚 No 'Next' page — stopping pagination.")
                break

    async def _extract_job_posted_date(self, page) -> str:
        try:
            date_element = await page.query_selector(".job-posted-date, .job-date, time")
            if date_element:
                date_text = await date_element.inner_text()
                if "Today" in date_text:
                    return datetime.now().strftime("%m/%d/%y")
                elif "Yesterday" in date_text:
                    yesterday = datetime.now() - timedelta(days=1)
                    return yesterday.strftime("%m/%d/%y")
                else:
                    return datetime.now().strftime("%m/%d/%y")
        except Exception:
            pass
        return datetime.now().strftime("%m/%d/%y")
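
    # The posted-date heuristic above only distinguishes "Today" and "Yesterday" in the
    # listing's date text; anything else (e.g. "3 days ago") falls back to the current
    # date, so posted_date should be treated as approximate.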

    async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
        try:
            job_data = {
                "job_url": job_url,
                "job_id": job_id,
                "error_type": error_type,
                "timestamp": datetime.now().isoformat()
            }
            self.redis_client.hset("failed_crypto_jobs", job_id, json.dumps(job_data))
            print(f" 📦 Added failed job to Redis cache: {job_id} (Error: {error_type})")
        except Exception as e:
            print(f" ❌ Failed to add job to Redis cache: {str(e)}")

    async def scrape_jobs(
        self,
        search_keywords: Optional[str],
        max_pages: int = 1,
        credentials: Optional[Dict] = None
    ):
        # cryptocurrencyjobs.co uses URL params differently
        encoded_keywords = (search_keywords or "").replace(" ", "%20")
        search_url = f"https://cryptocurrencyjobs.co/?q={encoded_keywords}"
        profile = self.engine._select_profile()
        renderer = random.choice(self.engine.common_renderers[self.engine.os])
        vendor = random.choice(self.engine.common_vendors)
        spoof_script = self.engine._get_spoof_script(renderer, vendor)
        async with async_playwright() as pw:
            browser = await pw.chromium.launch(
                headless=False,
                args=['--disable-blink-features=AutomationControlled']
            )
            context = await AsyncNewContext(browser, fingerprint=profile)
            await context.add_init_script(f"""
                Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
                Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
                Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
            """)
            await context.add_init_script(spoof_script)
            page = await context.new_page()

            # Fetch main search page
            print(f"🔍 Searching for: {search_keywords}")
            await page.goto(search_url, wait_until='load', timeout=120000)
            await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)

            all_job_links = []
            seen_job_ids = set()
            print("🔄 Collecting job links from search results...")
            await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
            await self._handle_pagination(page, search_keywords, seen_job_ids, all_job_links, max_pages)
            print(f"✅ Collected {len(all_job_links)} unique job links.")

            scraped_count = 0
            for idx, (href, title) in enumerate(all_job_links):
                try:
                    full_url = href
                    print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url}")
                    fetcher = StealthyFetcher(self.engine, browser, context)
                    job_page = await fetcher.fetch_url(full_url, wait_for_selector="h1")
                    if not job_page:
                        print(f" ❌ Failed to fetch job page {full_url}")
                        await self._add_job_to_redis_cache(full_url, full_url.rstrip("/").split("/")[-1], "fetch_failure")
                        self.engine.report_outcome("fetch_failure", url=full_url)
                        continue
                    posted_date = await self._extract_job_posted_date(job_page)
                    await self.engine._human_like_scroll(job_page)
                    await asyncio.sleep(2 * self.human_speed)
                    page_content = await self._extract_page_content_for_llm(job_page)
                    job_id = full_url.rstrip("/").split("/")[-1] or "unknown"
                    raw_data = {
                        "page_content": page_content,
                        "url": full_url,
                        "job_id": job_id,
                        "search_keywords": search_keywords,
                        "posted_date": posted_date
                    }
                    refined_data = await self.llm_agent.refine_job_data(raw_data, self.user_request)
                    if refined_data and refined_data.get("title", "N/A") != "N/A":
                        compulsory_fields = ['company_name', 'job_id', 'url']
                        for field in compulsory_fields:
                            if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
                                if field == 'job_id':
                                    refined_data[field] = job_id
                                elif field == 'url':
                                    refined_data[field] = full_url
                                elif field == 'company_name':
                                    refined_data[field] = "Unknown Company"
                        refined_data['scraped_at'] = datetime.now().isoformat()
                        refined_data['category'] = search_keywords
                        refined_data['posted_date'] = posted_date
                        await self.llm_agent.save_job_data(refined_data, search_keywords)
                        scraped_count += 1
                        print(f" ✅ Scraped and refined: {refined_data['title'][:50]}...")
                        self.engine.report_outcome("success", url=raw_data["url"])
                    else:
                        print(f" 🟡 Could not extract meaningful data from: {full_url}")
                        await self._add_job_to_redis_cache(full_url, job_id, "llm_failure")
                        self.engine.report_outcome("llm_failure", url=raw_data["url"])
                    await job_page.close()
                except Exception as e:
                    error_msg = str(e)[:100]
                    print(f" ⚠️ Failed on job {idx+1}: {error_msg}")
                    job_id = full_url.rstrip("/").split("/")[-1] if 'full_url' in locals() else "unknown"
                    job_url = full_url if 'full_url' in locals() else "unknown"
                    await self._add_job_to_redis_cache(job_url, job_id, f"exception: {error_msg}")
                    if 'job_page' in locals() and job_page:
                        await job_page.close()
                    continue
                finally:
                    print(" ↩️ Returning to search results...")
                    await page.goto(search_url, timeout=120000)
                    await asyncio.sleep(4 * self.human_speed)
            await browser.close()

        if scraped_count > 0:
            self.engine.report_outcome("success")
            print(f"✅ Completed! Processed {scraped_count} jobs for '{search_keywords}' based on request '{self.user_request}'.")
        else:
            self.engine.report_outcome("scraping_error")
            print("⚠️ No jobs processed successfully.")