556 lines
21 KiB
Python

import asyncio
import random
import os
import json
import time
from typing import Optional, Dict
from playwright.async_api import async_playwright
from browserforge.injectors.playwright import AsyncNewContext
from llm_agent import LLMJobRefiner
from fetcher import StealthyFetcher
from datetime import datetime
import pika
import logging
from tenacity import retry, stop_after_attempt, wait_exponential
from scraping_engine import FingerprintScrapingEngine
from dotenv import load_dotenv
from ssl_connection import create_ssl_connection_parameters # Import from ssl.py
import redis
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Environment variables
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", "5671"))
RABBITMQ_SSL_ENABLED = os.getenv("RABBITMQ_SSL_ENABLED", "false").lower() == "true"
# Redis configuration
REDIS_HOST = os.getenv('REDIS_HOST', 'redis-scrape.thejobhub.xyz')
REDIS_PORT = int(os.getenv('REDIS_PORT', '6380'))
REDIS_PASSWORD = os.getenv('REDIS_PASSWORD')
REDIS_SSL_ENABLED = os.getenv('REDIS_SSL_ENABLED', 'true').lower() == 'true'
class RedisManager:
"""Manages Redis connection and operations for job tracking and caching."""
def __init__(self):
self.redis_client = None
self._connect()
def _connect(self):
"""Establish connection to Redis server."""
if not REDIS_PASSWORD:
logger.warning("Warning: REDIS_PASSWORD not found in environment.")
try:
self.redis_client = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
password=REDIS_PASSWORD,
ssl=REDIS_SSL_ENABLED,
ssl_cert_reqs=None,
socket_connect_timeout=10,
socket_timeout=30,
retry_on_timeout=True
)
response = self.redis_client.ping()
logger.info(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}! Response: {response}")
except Exception as e:
logger.error(f"Failed to connect to Redis: {e}")
self.redis_client = None
def is_job_seen(self, job_id: str) -> bool:
if not self.redis_client:
return False
try:
return bool(self.redis_client.exists(f"job_seen:{job_id}"))
except Exception as e:
logger.error(f"Redis error checking job_seen: {e}")
return False
def mark_job_seen(self, job_id: str):
if not self.redis_client:
return
try:
self.redis_client.setex(f"job_seen:{job_id}", 2592000, "1")
except Exception as e:
logger.error(f"Redis error marking job_seen: {e}")
def get_cached_llm_result(self, job_url: str) -> Optional[Dict]:
if not self.redis_client:
return None
try:
cached_data = self.redis_client.get(f"llm_cache:{job_url}")
if cached_data:
return json.loads(cached_data)
return None
except Exception as e:
logger.error(f"Redis error getting LLM cache: {e}")
return None
def cache_llm_result(self, job_url: str, result: Dict):
if not self.redis_client:
return
try:
self.redis_client.setex(f"llm_cache:{job_url}", 604800, json.dumps(result))
except Exception as e:
logger.error(f"Redis error caching LLM result: {e}")
def add_job_to_error_cache(self, job_url: str, job_id: str, error_type: str):
if not self.redis_client:
return
try:
error_data = {
"job_url": job_url,
"job_id": job_id,
"error_type": error_type,
"timestamp": datetime.now().isoformat()
}
self.redis_client.setex(f"error_cache:{job_id}", 3600, json.dumps(error_data))
except Exception as e:
logger.error(f"Redis error adding to error cache: {e}")
class MultiPlatformJobScraper:
def __init__(
self,
engine: FingerprintScrapingEngine,
user_request: str = "Extract all standard job details"
):
self.engine = engine
self.user_request = user_request
self.llm_agent = LLMJobRefiner()
self.browser = None
self.pw = None
self.redis_manager = RedisManager()
async def init_browser(self):
if self.browser is not None:
try:
await self.browser.new_page()
await self.close_browser()
except:
await self.close_browser()
if self.browser is None:
try:
profile = self.engine._select_profile()
renderer = random.choice(self.engine.common_renderers[self.engine.os])
vendor = random.choice(self.engine.common_vendors)
spoof_script = self.engine._get_spoof_script(renderer, vendor)
self.pw = await async_playwright().start()
self.browser = await self.pw.chromium.launch(
headless=True,
args=[
'--disable-blink-features=AutomationControlled',
'--no-sandbox',
'--disable-dev-shm-usage',
'--disable-gpu'
]
)
logger.info("✅ Browser launched (will reuse for all jobs)")
except Exception as e:
logger.error(f"💥 Failed to launch browser: {e}")
raise
async def create_fresh_context(self):
if self.browser is None:
await self.init_browser()
try:
await self.browser.new_page()
except Exception:
logger.warning("Browser appears dead. Reinitializing...")
await self.close_browser()
await self.init_browser()
profile = self.engine._select_profile()
context = await AsyncNewContext(self.browser, fingerprint=profile)
await context.add_init_script(f"""
Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
""")
spoof_script = self.engine._get_spoof_script(
random.choice(self.engine.common_renderers[self.engine.os]),
random.choice(self.engine.common_vendors)
)
await context.add_init_script(spoof_script)
return context
async def close_browser(self):
if self.browser:
try:
await self.browser.close()
except:
pass
self.browser = None
if self.pw:
try:
await self.pw.stop()
except:
pass
self.pw = None
async def _safe_inner_text(self, element):
if not element:
return "Unknown"
try:
return await element.text_content()
except:
return "Unknown"
async def _human_click(self, page, element, wait_after: bool = True):
if not element:
return False
await element.scroll_into_view_if_needed()
speed = self.engine.optimization_params.get("base_delay", 2.0) / 2
await asyncio.sleep(random.uniform(0.3, 0.8) * (speed / 2))
try:
await element.click()
if wait_after:
await asyncio.sleep(random.uniform(2, 4) * (speed / 2))
return True
except:
return False
async def _extract_page_content_for_llm(self, page) -> str:
speed = self.engine.optimization_params.get("base_delay", 2.0)
await asyncio.sleep(2 * (speed / 2))
if "lever.co" not in page.url:
await self.engine._human_like_scroll(page)
await asyncio.sleep(2 * (speed / 2))
return await page.content()
async def _is_job_seen(self, job_id: str) -> bool:
return self.redis_manager.is_job_seen(job_id)
async def _mark_job_seen(self, job_id: str):
self.redis_manager.mark_job_seen(job_id)
async def _get_cached_llm_result(self, job_url: str) -> Optional[Dict]:
return self.redis_manager.get_cached_llm_result(job_url)
async def _cache_llm_result(self, job_url: str, result: Dict):
self.redis_manager.cache_llm_result(job_url, result)
async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
logger.info(f" 📦 Adding failed job to Redis cache: {job_id} (Error: {error_type})")
self.redis_manager.add_job_to_error_cache(job_url, job_id, error_type)
def _get_platform(self, url: str) -> str:
if "ashbyhq.com" in url:
return "ashby"
elif "lever.co" in url:
return "lever"
elif "greenhouse.io" in url:
return "greenhouse"
else:
return "unknown"
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def scrape_job(
self,
job_url: str,
company_name: str,
message_id: str
):
platform = self._get_platform(job_url)
if platform == "unknown":
logger.info(f"⏭️ Skipping unsupported platform: {job_url}")
return True
job_id = job_url.strip("/").split("/")[-1]
if await self._is_job_seen(job_id):
logger.info(f"⏭️ Skipping already processed job: {job_id}")
return True
cached_result = await self._get_cached_llm_result(job_url)
if cached_result:
logger.info(f"📦 Using cached LLM result for: {job_url}")
await self.llm_agent.save_job_data(cached_result, company_name)
await self._mark_job_seen(job_id)
return True
context = None
page = None
start_time = time.time()
try:
context = await self.create_fresh_context()
page = await context.new_page()
timeout_ms = self.engine.optimization_params.get("request_timeout", 120000)
temp_fetcher = StealthyFetcher(self.engine, self.browser, context)
fetch_timeout = 60000 if platform == "lever" else timeout_ms
job_page = await asyncio.wait_for(
temp_fetcher.fetch_url(job_url, wait_for_selector="h1", timeout=fetch_timeout),
timeout=fetch_timeout / 1000.0
)
# Check if job still exists (minimal content validation)
page_content = await job_page.content()
if len(page_content.strip()) < 500: # Arbitrary threshold for "page exists"
logger.error(f"❌ Job no longer exists (empty/deleted): {job_url}")
await self._add_job_to_redis_cache(job_url, job_id, "job_not_found")
self.engine.report_outcome("job_not_found", url=job_url)
return False
if platform == "ashby":
try:
await job_page.wait_for_selector("div[class*='job-posting'], article, main", timeout=60000)
except Exception:
logger.warning(f"⚠️ Ashby page didn't load properly: {job_url}")
return False
elif platform == "lever":
pass
elif platform == "greenhouse":
try:
await job_page.wait_for_selector("div.job-desc, section", timeout=60000)
except Exception:
pass
# Extract page content for initial validation
page_content = await self._extract_page_content_for_llm(job_page)
# Check for job expiration or unavailability indicators
page_text_lower = page_content.lower()
job_unavailable_indicators = [
"job no longer available",
"position has been filled",
"this job has expired",
"job posting has expired",
"no longer accepting applications",
"position is closed",
"job is no longer active",
"this position is no longer open"
]
if any(indicator in page_text_lower for indicator in job_unavailable_indicators):
logger.error(f"❌ Job no longer available/expired: {job_url}")
await self._add_job_to_redis_cache(job_url, job_id, "job_not_found")
self.engine.report_outcome("job_not_found", url=job_url)
return False
# 🔑 APPLY TYPE LOGIC
if platform in ["ashby", "lever", "greenhouse"]:
apply_type = 'AI' # Always AI for these platforms
else:
# For other platforms: check if form is accessible without login
apply_btn = await job_page.query_selector("button:has-text('Apply for this job'), button:has-text('Apply now')")
apply_type = 'signup' # default
if apply_btn:
await self._human_click(job_page, apply_btn)
speed = self.engine.optimization_params.get("base_delay", 2.0)
await asyncio.sleep(2 * (speed / 2))
form = await job_page.query_selector("form, div[class*='application-form']")
if form:
# Check for login prompts in form
login_indicators = await job_page.query_selector("input[type='email'], input[type='password'], text='sign in', text='log in'")
if not login_indicators:
apply_type = 'AI'
else:
apply_type = 'signup'
else:
apply_type = 'signup'
final_url = job_url
# Hardcode posted_date to Dec 1st 2025
posted_date = "12/01/25"
raw_data = {
"page_content": page_content,
"url": final_url,
"job_id": job_id,
"search_keywords": company_name,
"posted_date": posted_date
}
llm_timeout = max(60, self.engine.feedback.get("avg_response_time", 10) * 2)
refined_data = await asyncio.wait_for(
self.llm_agent.refine_job_data(raw_data, self.user_request),
timeout=llm_timeout
)
success = False
if refined_data and refined_data.get("title", "N/A") != "N/A":
# Check if description is missing or empty
description = refined_data.get("description", "").strip()
if not description or description in ["N/A", "Unknown", ""]:
logger.error(f"❌ Job discarded - missing description: {final_url}")
await self._add_job_to_redis_cache(final_url, job_id, "job_not_found")
self.engine.report_outcome("job_not_found", url=final_url)
return False
compulsory_fields = ['company_name', 'job_id', 'url']
for field in compulsory_fields:
if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
if field == 'job_id':
refined_data[field] = job_id
elif field == 'url':
refined_data[field] = final_url
elif field == 'company_name':
refined_data[field] = company_name
refined_data.update({
'apply_type': apply_type,
'scraped_at': datetime.now().isoformat(),
'category': company_name,
'posted_date': posted_date,
'message_id': message_id,
'platform': platform
})
await self.llm_agent.save_job_data(refined_data, company_name)
await self._cache_llm_result(job_url, refined_data)
await self._mark_job_seen(job_id)
response_time = time.time() - start_time
self.engine.report_outcome("success", url=final_url, response_time=response_time)
logger.info(f"✅ Scraped ({platform}): {refined_data['title'][:50]}... (Apply Type: {apply_type})")
success = True
else:
logger.warning(f"🟡 LLM failed to refine: {final_url}")
await self._add_job_to_redis_cache(final_url, job_id, "llm_failure")
self.engine.report_outcome("llm_failure", url=final_url)
return success
except asyncio.TimeoutError:
logger.error(f"⏰ Timeout processing job ({platform}): {job_url}")
await self._add_job_to_redis_cache(job_url, job_id, "timeout")
self.engine.report_outcome("timeout", url=job_url)
return False
except Exception as e:
error_msg = str(e)
if "NoneType" in error_msg or "disconnected" in error_msg or "Browser" in error_msg:
logger.warning("Browser connection lost. Forcing reinitialization.")
await self.close_browser()
# 🔍 Distinguish job-not-found vs other errors
if "page.goto: net::ERR_ABORTED" in error_msg or "page.goto: net::ERR_FAILED" in error_msg:
logger.error(f"❌ Job no longer exists (404/network error): {job_url}")
await self._add_job_to_redis_cache(job_url, job_id, "job_not_found")
self.engine.report_outcome("job_not_found", url=job_url)
else:
# Categorize other errors
error_type = "exception"
if "timeout" in error_msg.lower():
error_type = "timeout"
elif "llm" in error_msg.lower() or "refine" in error_msg.lower():
error_type = "llm_failure"
else:
error_type = "scraping_error"
logger.error(f"💥 Error processing job ({platform}) {job_url}: {error_msg}")
await self._add_job_to_redis_cache(job_url, job_id, error_type)
self.engine.report_outcome(error_type, url=job_url)
return False
finally:
if context:
try:
await context.close()
except Exception:
pass
# Global metrics
METRICS = {
"processed": 0,
"success": 0,
"failed": 0,
"skipped": 0,
"start_time": time.time()
}
async def process_message_async(scraper: MultiPlatformJobScraper, ch, method, properties, body):
try:
job_data = json.loads(body)
job_link = job_data['job_link']
company_name = job_data['company_name']
message_id = properties.message_id or f"msg_{int(time.time()*1000)}"
logger.info(f"📥 Processing job: {job_link} (ID: {message_id})")
success = await scraper.scrape_job(job_link, company_name, message_id)
METRICS["processed"] += 1
if success:
METRICS["success"] += 1
else:
METRICS["failed"] += 1
except json.JSONDecodeError:
logger.error("❌ Invalid JSON in message")
METRICS["failed"] += 1
except Exception as e:
logger.error(f"💥 Unexpected error: {str(e)}")
METRICS["failed"] += 1
finally:
ch.basic_ack(delivery_tag=method.delivery_tag)
def callback_wrapper(scraper: MultiPlatformJobScraper):
def callback(ch, method, properties, body):
asyncio.run(process_message_async(scraper, ch, method, properties, body))
return callback
def start_consumer():
engine = FingerprintScrapingEngine(
seed="multiplatform_scraper",
target_os="windows",
num_variations=10
)
scraper = MultiPlatformJobScraper(engine)
connection = None
for attempt in range(5):
try:
parameters = create_ssl_connection_parameters()
if RABBITMQ_SSL_ENABLED:
logger.info(f"Connecting to RabbitMQ over SSL at {RABBITMQ_HOST}:{RABBITMQ_PORT}")
else:
logger.info(f"Connecting to RabbitMQ at {RABBITMQ_HOST}:{RABBITMQ_PORT}")
connection = pika.BlockingConnection(parameters)
break
except Exception as e:
logger.error(f"RabbitMQ connection attempt {attempt + 1} failed: {e}")
time.sleep(2 ** attempt)
if not connection:
logger.error("Failed to connect to RabbitMQ after retries")
return
channel = connection.channel()
channel.queue_declare(queue='job_queue', durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='job_queue', on_message_callback=callback_wrapper(scraper))
logger.info('Waiting for messages (Ashby, Lever, Greenhouse). To exit press CTRL+C')
try:
channel.start_consuming()
except KeyboardInterrupt:
logger.info("Shutting down...")
channel.stop_consuming()
connection.close()
asyncio.run(scraper.close_browser())
if __name__ == "__main__":
start_consumer()