
import asyncio
import random
from typing import Optional, Dict
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
from browserforge.injectors.playwright import AsyncNewContext
from llm_agent import LLMJobRefiner
import re
from datetime import datetime, timedelta
import json
import redis
from urllib.parse import urlparse
import hashlib
import csv
import os


class CryptoJobScraper:
    """Scrapes job listings from cryptocurrencyjobs.co and refines them with an LLM agent."""

    def __init__(
        self,
        engine,
        db_path: str = "crypto_jobs.db",
        human_speed: float = 1.0,
        user_request: str = "Extract all standard job details"
    ):
        self.engine = engine
        self.db_path = db_path
        self.human_speed = human_speed
        self.user_request = user_request
        self.llm_agent = LLMJobRefiner()
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
        self.FORBIDDEN_ATS_DOMAINS = [
            'ashby', 'ashbyhq',
            'greenhouse', 'boards.greenhouse.io',
            'gem', 'gem.com',
            'rippling',
            'myworkday', 'myworkdayjobs',
            'smartrecruiters',
            'workable',
            'lever', 'jobs.lever.co',
            'linkedin.com'
        ]
        self.INVALID_CONTENT_PHRASES = [
            "invalid job url",
            "cookie consent",
            "privacy policy",
            "not a valid job",
            "job not found",
            "page not found",
            "The requested job post could not be found. It may have been removed.",
            "this page does not contain a job description"
        ]

    async def _human_click(self, page, element, wait_after: bool = True):
        if not element:
            return False
        await element.scroll_into_view_if_needed()
        await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
        try:
            await element.click()
            if wait_after:
                await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
            return True
        except Exception:
            return False

    async def _extract_page_content_for_llm(self, page) -> str:
        await asyncio.sleep(2 * self.human_speed)
        await self.engine._human_like_scroll(page)
        await asyncio.sleep(2 * self.human_speed)
        page_content = await page.content()
        return page_content

    def _calculate_keyword_match(self, title: str, keywords: str) -> float:
        if not title or not keywords:
            return 0.0
        title_lower = title.lower()
        keyword_list = [kw.strip().lower() for kw in keywords.split()]
        matches = sum(1 for kw in keyword_list if kw in title_lower)
        return matches / len(keyword_list) if keyword_list else 0.0
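
    # Example of the scoring above (illustrative): for the title
    # "Senior Solidity Engineer" and keywords "solidity engineer", both
    # keywords appear in the lowered title, so the match is 2/2 = 1.0;
    # for keywords "rust engineer" only one of two matches, giving 0.5.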

    async def _extract_job_title_from_card(self, card) -> str:
        try:
            title_selectors = [
                'h3', 'h2', 'h4',
                'strong', 'span'
            ]
            for selector in title_selectors:
                title_element = await card.query_selector(selector)
                if title_element:
                    title_text = await title_element.inner_text()
                    if title_text and len(title_text.strip()) > 3:
                        return title_text.strip()
            card_text = await card.inner_text()
            lines = [line.strip() for line in card_text.split('\n') if line.strip()]
            if lines:
                for line in lines:
                    if len(line) > 5 and not any(skip in line.lower() for skip in ['today', 'featured', 'yesterday', 'company', 'location']):
                        return line
            return "Unknown Title"
        except Exception:
            return "Unknown Title"

    async def _collect_job_elements_from_page(self, page, search_keywords: str, seen_slugs):
        job_cards = []
        job_found = False
        await asyncio.sleep(3 * self.human_speed)
        try:
            await page.wait_for_selector('a[href^="/"][href*="-"]', timeout=60000)
            candidates = await page.query_selector_all('a[href^="/"][href*="-"]')
            for link in candidates:
                href = await link.get_attribute("href") or ""
                href = href.rstrip('/')
                if not href or len(href.split('/')) != 3:
                    continue
                if '-' not in href.split('/')[-1]:
                    continue
                slug = href.split('/')[-1]
                if len(slug) < 8 or slug.startswith(('login', 'signup', 'about', 'terms')):
                    continue
                full_url = "https://cryptocurrencyjobs.co" + href if not href.startswith('http') else href
                if slug in seen_slugs:
                    continue
                title = await self._extract_job_title_from_card(link)
                if not title or title == "Unknown Title":
                    title = slug.replace('-', ' ').title()
                match_percentage = self._calculate_keyword_match(title, search_keywords)
                if match_percentage >= 0.4 or not (search_keywords or "").strip():
                    seen_slugs.add(slug)
                    job_cards.append((full_url, title, link))
                    job_found = True
            print(f" ✅ Collected {len(job_cards)} valid job links after filtering ({len(candidates)} raw candidates).")
        except Exception as e:
            print(f" ⚠️ Error collecting job cards: {e}")
        if not job_found:
            print(" ❌ No valid job listings passed filters.")
        return job_cards
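
    # The filter above keeps listing hrefs shaped like "/<category>/<job-slug>/".
    # A hypothetical example: "/engineering/senior-solidity-engineer-acme/" →
    # rstrip('/') leaves three '/'-separated parts and a hyphenated slug, so it
    # passes; short or navigational paths such as "/about" are skipped.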

    async def _handle_pagination_and_collect_all(self, page, search_keywords: str, seen_slugs):
        all_job_elements = []
        scroll_attempt = 0
        max_scrolls = 40
        prev_count = 0
        while scroll_attempt < max_scrolls:
            print(f" Scroll attempt {scroll_attempt + 1} | Current total jobs: {len(all_job_elements)}")
            page_elements = await self._collect_job_elements_from_page(page, search_keywords, seen_slugs)
            all_job_elements.extend(page_elements)
            current_count = len(all_job_elements)
            if current_count == prev_count and scroll_attempt > 3:
                print(" 🔚 No new jobs after several scrolls → assuming end of list.")
                break
            prev_count = current_count
            await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            await asyncio.sleep(random.uniform(2.5, 5.5) * self.human_speed)
            try:
                load_more = await page.query_selector(
                    'button:has-text("Load more"), button:has-text("More"), div[role="button"]:has-text("Load"), a:has-text("Load more")'
                )
                if load_more:
                    print(" Found 'Load more' button → clicking...")
                    await self._human_click(page, load_more)
                    await asyncio.sleep(random.uniform(3.0, 6.0) * self.human_speed)
            except Exception:
                pass
            scroll_attempt += 1
        print(f" Finished scrolling → collected {len(all_job_elements)} unique job links.")
        return all_job_elements

    async def _extract_job_posted_date_from_card(self, card) -> str:
        try:
            card_text = await card.inner_text()
            if "Today" in card_text:
                return datetime.now().strftime("%m/%d/%y")
            elif "Yesterday" in card_text:
                return (datetime.now() - timedelta(days=1)).strftime("%m/%d/%y")
            else:
                match = re.search(r'(\d+)d', card_text)
                if match:
                    days = int(match.group(1))
                    return (datetime.now() - timedelta(days=days)).strftime("%m/%d/%y")
        except Exception:
            pass
        return datetime.now().strftime("%m/%d/%y")
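
    # Example of the relative-date handling above (illustrative): a card whose
    # text contains "5d" is recorded as today minus five days, "Yesterday" as
    # today minus one day, and anything unparseable falls back to today's date.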

    async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
        try:
            job_data = {
                "job_url": job_url,
                "job_id": job_id,
                "error_type": error_type,
                "timestamp": datetime.now().isoformat()
            }
            self.redis_client.hset("failed_crypto_jobs", job_id, json.dumps(job_data))
            print(f" 📦 Added failed job to Redis cache: {job_id} (Error: {error_type})")
        except Exception as e:
            print(f" ❌ Failed to add job to Redis cache: {str(e)}")

    async def _is_forbidden_ats_url(self, url: str) -> bool:
        url_lower = url.lower()
        return any(domain in url_lower for domain in self.FORBIDDEN_ATS_DOMAINS)

    def _get_ats_platform_name(self, url: str) -> str:
        """Return canonical ATS name based on URL (e.g., 'ashby', 'greenhouse')"""
        url_lower = url.lower()
        # Order matters: more specific first
        if 'boards.greenhouse.io' in url_lower:
            return 'greenhouse'
        elif 'jobs.lever.co' in url_lower:
            return 'lever'
        elif 'myworkdayjobs' in url_lower or 'myworkday' in url_lower:
            return 'workday'
        elif 'linkedin.com' in url_lower:
            return 'linkedin'
        elif 'ashbyhq.com' in url_lower or 'ashby' in url_lower:
            return 'ashby'
        elif 'gem.com' in url_lower or 'gem' in url_lower:
            return 'gem'
        elif 'rippling' in url_lower:
            return 'rippling'
        elif 'smartrecruiters' in url_lower:
            return 'smartrecruiters'
        elif 'workable' in url_lower:
            return 'workable'
        else:
            # Fallback: extract domain part
            try:
                parsed = urlparse(url)
                domain = parsed.netloc.lower()
                for forbidden in self.FORBIDDEN_ATS_DOMAINS:
                    if forbidden in domain:
                        return forbidden.split('.')[0] if '.' in forbidden else forbidden
            except Exception:
                pass
            return 'forbidden_ats'
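
    # Example mappings from the rules above (illustrative URLs):
    #   https://boards.greenhouse.io/acme/jobs/123 → 'greenhouse'
    #   https://jobs.lever.co/acme/abc-def         → 'lever'
    #   https://acme.ashbyhq.com/acme/role         → 'ashby'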

    def _log_forbidden_ats_url(self, url: str, platform: str):
        """Append forbidden URL to {platform}.csv"""
        filename = f"{platform}.csv"
        file_exists = os.path.isfile(filename)
        with open(filename, 'a', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            if not file_exists:
                writer.writerow(['url', 'timestamp'])
            writer.writerow([url, datetime.now().isoformat()])
        print(f" 📥 Logged forbidden ATS URL to {filename}: {url}")

    async def _is_invalid_job_page(self, page_content: str) -> bool:
        # Lowercase both sides so mixed-case entries in INVALID_CONTENT_PHRASES still match.
        content_lower = page_content.lower()
        return any(phrase.lower() in content_lower for phrase in self.INVALID_CONTENT_PHRASES)

    def _extract_job_id_from_url(self, url: str) -> Optional[str]:
        try:
            parsed = urlparse(url)
            path_parts = [p for p in parsed.path.split('/') if p]
            if not path_parts:
                return None
            candidate = path_parts[-1]
            candidate = re.split(r'[?#]', candidate)[0]
            candidate = re.sub(r'\.html?$', '', candidate)
            if not candidate or not any(c.isdigit() for c in candidate):
                return None
            if re.search(r'[A-Za-z]{6,}\s', candidate):
                return None
            return candidate
        except Exception:
            return None
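
    # Illustrative behaviour of the extraction above: a URL ending in
    # "/jobs/4123456" yields "4123456", while a purely alphabetic slug such as
    # "/senior-engineer" contains no digits and yields None, so the caller
    # falls back to an MD5-based "job_..." identifier.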

    async def scrape_jobs(
        self,
        search_keywords: Optional[str],
        max_pages: int = 1,
        credentials: Optional[Dict] = None
    ):
        query = ""
        location = ""
        if search_keywords and search_keywords.strip():
            parts = search_keywords.split(',', 1)
            query = parts[0].strip()
            if len(parts) > 1:
                location = parts[1].strip()
        clean_query = query.replace(' ', '+')
        clean_location = location.replace(' ', '+')
        # Build the search URL; join parameters with '?' then '&' so a
        # location-only search still produces a valid query string.
        search_url = "https://cryptocurrencyjobs.co/"
        params = []
        if clean_query:
            params.append(f"query={clean_query}")
        if clean_location:
            params.append(f"location={clean_location}")
        if params:
            search_url += "?" + "&".join(params)
        profile = self.engine._select_profile()
        renderer = random.choice(self.engine.common_renderers[self.engine.os])
        vendor = random.choice(self.engine.common_vendors)
        spoof_script = self.engine._get_spoof_script(renderer, vendor)
        async with async_playwright() as pw:
            browser = await pw.chromium.launch(
                headless=False,
                args=['--disable-blink-features=AutomationControlled']
            )
            context = await AsyncNewContext(browser, fingerprint=profile)
            await context.add_init_script(f"""
                Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
                Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
                Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
            """)
            await context.add_init_script(spoof_script)
            page = await context.new_page()
            print(f"🔍 Searching for: {search_keywords or 'all jobs'}")
            print(f" 🔗 URL: {search_url}")
            await page.goto(search_url, wait_until='networkidle', timeout=120000)
            await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
            try:
                await page.wait_for_selector("a[href^='/'][href*='-']", timeout=10000)
            except PlaywrightTimeoutError:
                print(" ⚠️ No job links found initially, waiting longer...")
                await asyncio.sleep(5 * self.human_speed)
            seen_slugs = set()
            all_job_elements = await self._handle_pagination_and_collect_all(page, search_keywords, seen_slugs)
            print(f"✅ Collected {len(all_job_elements)} unique job links.")
            scraped_count = 0
            for idx, (href, title, job_element) in enumerate(all_job_elements):
                job_detail_page = None
                apply_page = None
                skip_job = False
                final_scrape_url = None
                try:
                    print(f" → Processing job {idx+1}/{len(all_job_elements)}: {title}")
                    posted_date = await self._extract_job_posted_date_from_card(job_element)
                    job_detail_page = await context.new_page()
                    await job_detail_page.goto(href, wait_until='networkidle', timeout=60000)
                    await asyncio.sleep(2 * self.human_speed)
                    page_content = await job_detail_page.content()
                    if await self._is_invalid_job_page(page_content):
                        print(" 🚫 Page contains invalid content → skipping.")
                        await job_detail_page.close()
                        continue
                    apply_clicked = False
                    apply_selectors = [
                        'a[href*="apply"], a:text("Apply"), a:text("Apply Now"), a:text("Apply here")',
                        'button:text("Apply"), button:has-text("Apply")',
                        '[data-testid="apply-button"], [aria-label*="apply"], [role="button"]:has-text("Apply")',
                        'a.btn-apply, .apply-button, .apply-link, a:has-text("Apply")',
                        'a[rel="noopener"]:has-text("Apply")',
                    ]
                    for sel in apply_selectors:
                        apply_elem = await job_detail_page.query_selector(sel)
                        if apply_elem:
                            print(f" 🔗 Found Apply element with selector: {sel}")
                            await self._human_click(job_detail_page, apply_elem, wait_after=True)
                            apply_clicked = True
                            break
                    apply_page = job_detail_page
                    if apply_clicked:
                        await asyncio.sleep(random.uniform(3.0, 6.0) * self.human_speed)
                        pages = context.pages
                        # Exclude both the job detail tab and the main listing tab so a
                        # same-tab navigation is not mistaken for a newly opened tab.
                        new_pages = [p for p in pages if p not in (job_detail_page, page) and p.url != "about:blank"]
                        if new_pages:
                            candidate_page = new_pages[-1]
                            new_url = candidate_page.url.strip()
                            print(f" New tab opened: {new_url}")
                            if new_url and await self._is_forbidden_ats_url(new_url):
                                platform = self._get_ats_platform_name(new_url)
                                self._log_forbidden_ats_url(new_url, platform)
                                if candidate_page != job_detail_page:
                                    await candidate_page.close()
                                await job_detail_page.close()
                                skip_job = True
                            else:
                                apply_page = candidate_page
                        else:
                            print(" No new tab → using original page.")
                    if skip_job:
                        continue
                    final_scrape_url = apply_page.url
                    page_content = await self._extract_page_content_for_llm(apply_page)
                    if await self._is_invalid_job_page(page_content):
                        print(" 🚫 Final page contains invalid content → skipping.")
                        if apply_page != job_detail_page:
                            await apply_page.close()
                        await job_detail_page.close()
                        continue
                    job_id = self._extract_job_id_from_url(final_scrape_url)
                    if not job_id:
                        job_id = "job_" + hashlib.md5(final_scrape_url.encode()).hexdigest()[:12]
                    raw_data = {
                        "page_content": page_content,
                        "url": final_scrape_url,
                        "job_id": job_id,
                        "search_keywords": search_keywords,
                        "posted_date": posted_date
                    }
                    refined_data = await self.llm_agent.refine_job_data(raw_data, self.user_request)
                    if refined_data and refined_data.get("title", "N/A") != "N/A":
                        compulsory_fields = ['company_name', 'job_id', 'url']
                        for field in compulsory_fields:
                            if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
                                if field == 'job_id':
                                    refined_data[field] = job_id
                                elif field == 'url':
                                    refined_data[field] = final_scrape_url
                                elif field == 'company_name':
                                    refined_data[field] = "Unknown Company"
                        refined_data['scraped_at'] = datetime.now().isoformat()
                        refined_data['category'] = search_keywords or "all"
                        refined_data['posted_date'] = posted_date
                        await self.llm_agent.save_job_data(refined_data, search_keywords or "all")
                        scraped_count += 1
                        print(f" ✅ Scraped: {refined_data['title'][:50]}... (Job ID: {job_id})")
                        self.engine.report_outcome("success", url=final_scrape_url)
                    else:
                        print(f" 🟡 Could not extract meaningful data from: {final_scrape_url}")
                        await self._add_job_to_redis_cache(final_scrape_url, job_id, "llm_failure")
                        self.engine.report_outcome("llm_failure", url=final_scrape_url)
                    if apply_page != job_detail_page and not apply_page.is_closed():
                        await apply_page.close()
                    if job_detail_page and not job_detail_page.is_closed():
                        await job_detail_page.close()
                except Exception as e:
                    error_msg = str(e)[:100]
                    print(f" ⚠️ Failed on job {idx+1}: {error_msg}")
                    # final_scrape_url and apply_page are initialised before the try block,
                    # so plain truthiness checks are enough here.
                    job_id_for_log = "unknown"
                    if final_scrape_url:
                        job_id_for_log = "job_" + hashlib.md5(final_scrape_url.encode()).hexdigest()[:12]
                    await self._add_job_to_redis_cache(href, job_id_for_log, f"exception: {error_msg}")
                    if job_detail_page and not job_detail_page.is_closed():
                        await job_detail_page.close()
                    if apply_page and apply_page != job_detail_page and not apply_page.is_closed():
                        await apply_page.close()
                    continue
            await browser.close()
        if scraped_count > 0:
            self.engine.report_outcome("success")
            print(f"✅ Completed! Processed {scraped_count} jobs for '{search_keywords or 'all jobs'}'.")
        else:
            self.engine.report_outcome("scraping_error")
            print("⚠️ No jobs processed successfully.")