# Listing metadata carried over from the code viewer: 596 lines, 25 KiB, Python

import asyncio
import random
import os
import json
import time
from typing import Optional, Dict
from playwright.async_api import async_playwright
from browserforge.injectors.playwright import AsyncNewContext
from llm_agent import LLMJobRefiner
from fetcher import StealthyFetcher
from datetime import datetime
import pika
import logging
from tenacity import retry, stop_after_attempt, wait_exponential
from scraping_engine import FingerprintScrapingEngine
from dotenv import load_dotenv
from ssl_connection import create_ssl_connection_parameters # Import from ssl.py
import redis
# Load settings from a .env file (if any) before the environment is read below.
load_dotenv()

# Logging: timestamped, level-tagged messages at INFO and above.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# RabbitMQ connection settings (port defaults to 5671, SSL defaults to off).
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST")
RABBITMQ_PORT = int(os.environ.get("RABBITMQ_PORT", "5671"))
RABBITMQ_SSL_ENABLED = "true" == os.environ.get("RABBITMQ_SSL_ENABLED", "false").lower()

# Redis connection settings (port defaults to 6380, SSL defaults to on).
REDIS_HOST = os.environ.get('REDIS_HOST')
REDIS_PORT = int(os.environ.get('REDIS_PORT', '6380'))
REDIS_PASSWORD = os.environ.get('REDIS_PASSWORD')
REDIS_SSL_ENABLED = 'true' == os.environ.get('REDIS_SSL_ENABLED', 'true').lower()

# How long a "job seen" marker lives in Redis, in seconds (30 days).
JOB_SEEN_TTL = 30 * 24 * 60 * 60
class RedisManager:
    """Best-effort Redis wrapper for job tracking and caching.

    Keys written by this class:
      * ``job_seen:<job_id>``    - processed marker, expires after JOB_SEEN_TTL
      * ``llm_cache:<job_url>``  - JSON-encoded LLM result, no expiry
      * ``error_cache:<job_id>`` - JSON-encoded failure record, no expiry
      * ``sent_job:<job_id>``    - "in flight" marker, no expiry

    No method raises on a Redis failure: errors are logged and the call
    degrades to a no-op (writers) or a "not found" answer (readers). The same
    happens when the initial connection could not be established.
    """

    def __init__(self):
        self.redis_client = None
        self._connect()

    def _connect(self):
        """Create the client and verify it with a PING; leave it None on failure."""
        if not REDIS_PASSWORD:
            logger.warning("Warning: REDIS_PASSWORD not found in environment.")
        try:
            # NOTE(review): ssl_cert_reqs=None presumably turns off server
            # certificate verification - confirm this is intended.
            client = redis.Redis(
                host=REDIS_HOST,
                port=REDIS_PORT,
                password=REDIS_PASSWORD,
                ssl=REDIS_SSL_ENABLED,
                ssl_cert_reqs=None,
                socket_connect_timeout=10,
                socket_timeout=30,
                retry_on_timeout=True,
            )
            self.redis_client = client
            pong = client.ping()
            logger.info(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}! Response: {pong}")
        except Exception as exc:
            logger.error(f"Failed to connect to Redis: {exc}")
            self.redis_client = None

    def is_job_seen(self, job_id: str) -> bool:
        """Return True if a ``job_seen`` marker exists for *job_id*."""
        client = self.redis_client
        if not client:
            return False
        try:
            found = client.exists(f"job_seen:{job_id}")
        except Exception as exc:
            logger.error(f"Redis error checking job_seen: {exc}")
            return False
        return bool(found)

    def get_cached_llm_result(self, job_url: str) -> Optional[Dict]:
        """Return the decoded LLM result cached for *job_url*, or None."""
        client = self.redis_client
        if not client:
            return None
        try:
            raw = client.get(f"llm_cache:{job_url}")
            return json.loads(raw) if raw else None
        except Exception as exc:
            logger.error(f"Redis error getting LLM cache: {exc}")
            return None

    def mark_job_seen(self, job_id: str):
        """Write the ``job_seen`` marker for *job_id* with the JOB_SEEN_TTL expiry."""
        client = self.redis_client
        if not client:
            return
        try:
            client.setex(f"job_seen:{job_id}", JOB_SEEN_TTL, "1")
        except Exception as exc:
            logger.error(f"Redis error marking job_seen: {exc}")

    def cache_llm_result(self, job_url: str, result: Dict):
        """Store *result* as JSON under the LLM cache key for *job_url* (no expiry)."""
        client = self.redis_client
        if not client:
            return
        try:
            client.set(f"llm_cache:{job_url}", json.dumps(result))
        except Exception as exc:
            logger.error(f"Redis error caching LLM result: {exc}")

    def add_job_to_error_cache(self, job_url: str, job_id: str, error_type: str):
        """Record a failed job (URL, id, error type, local timestamp); no expiry."""
        client = self.redis_client
        if not client:
            return
        try:
            record = json.dumps({
                "job_url": job_url,
                "job_id": job_id,
                "error_type": error_type,
                "timestamp": datetime.now().isoformat(),
            })
            client.set(f"error_cache:{job_id}", record)
        except Exception as exc:
            logger.error(f"Redis error adding to error cache: {exc}")

    def remove_job_from_error_cache(self, job_id: str):
        """Delete the failure record for *job_id*; logs only if one existed."""
        client = self.redis_client
        if not client:
            return
        try:
            if client.delete(f"error_cache:{job_id}"):
                logger.info(f"✅ Removed job {job_id} from error cache after successful processing")
        except Exception as exc:
            logger.error(f"Redis error removing from error cache: {exc}")

    def add_job_to_sent_cache(self, job_id: str):
        """Flag *job_id* as currently being processed (no expiry)."""
        client = self.redis_client
        if not client:
            return
        try:
            client.set(f"sent_job:{job_id}", "1")
        except Exception as exc:
            logger.error(f"Redis error adding to sent cache: {exc}")

    def remove_job_from_sent_cache(self, job_id: str):
        """Clear the in-flight flag for *job_id*; logs only if one existed."""
        client = self.redis_client
        if not client:
            return
        try:
            if client.delete(f"sent_job:{job_id}"):
                logger.debug(f"🧹 Removed job {job_id} from sent cache")
        except Exception as exc:
            logger.error(f"Redis error removing from sent cache: {exc}")

    def is_job_in_sent_cache(self, job_id: str) -> bool:
        """Return True if *job_id* is currently flagged as in flight."""
        client = self.redis_client
        if not client:
            return False
        try:
            found = client.exists(f"sent_job:{job_id}")
        except Exception as exc:
            logger.error(f"Redis error checking sent cache: {exc}")
            return False
        return bool(found)
class MultiPlatformJobScraper:
    """Scrape Ashby, Lever and Greenhouse job postings and refine them with an LLM.

    One Chromium instance is kept on the scraper and a fresh, fingerprinted
    browser context is created per job. Redis (through ``RedisManager``) tracks
    which jobs were already processed, caches LLM results, records failures and
    flags jobs that are currently in flight.
    """

    # Phrases (lower-case) whose presence in the page HTML marks a posting as closed.
    _JOB_UNAVAILABLE_INDICATORS = (
        "job no longer available",
        "position has been filled",
        "this job has expired",
        "job posting has expired",
        "no longer accepting applications",
        "position is closed",
        "job is no longer active",
        "this position is no longer open",
    )
    # Fields the LLM result must contain with a real (non-placeholder) value.
    _COMPULSORY_FIELDS = ('title', 'company_name', 'description', 'job_id', 'url')
    # Values treated as "the LLM did not find this field".
    _PLACEHOLDER_VALUES = ("", "N/A", "Unknown", "Not provided", "Not available", "Company", "Job")

    def __init__(
        self,
        engine: FingerprintScrapingEngine,
        user_request: str = "Extract all standard job details"
    ):
        self.engine = engine
        self.user_request = user_request
        self.llm_agent = LLMJobRefiner()
        self.browser = None
        self.pw = None
        self.redis_manager = RedisManager()

    async def init_browser(self):
        """Launch a fresh headless Chromium, discarding any existing browser.

        Raises:
            Exception: whatever Playwright raised while starting or launching;
                the partially started driver is stopped before re-raising.
        """
        # Always start from a clean slate (also stops a driver left over from
        # an earlier failed launch). No-op when nothing is running.
        await self.close_browser()
        try:
            self.pw = await async_playwright().start()
            self.browser = await self.pw.chromium.launch(
                headless=True,
                args=[
                    '--disable-blink-features=AutomationControlled',
                    '--no-sandbox',
                    '--disable-dev-shm-usage',
                    '--disable-gpu'
                ]
            )
            logger.info("✅ Browser launched (will reuse for all jobs)")
        except Exception as e:
            logger.error(f"💥 Failed to launch browser: {e}")
            # Do not leave a started Playwright driver behind without a browser.
            await self.close_browser()
            raise

    async def _browser_is_alive(self) -> bool:
        """Return True if the current browser can still open (and close) a page."""
        if self.browser is None:
            return False
        try:
            probe = await self.browser.new_page()
            # Close the probe so health checks do not accumulate open pages.
            await probe.close()
            return True
        except Exception:
            return False

    async def create_fresh_context(self):
        """Return a new fingerprinted browser context, relaunching the browser if needed."""
        if not await self._browser_is_alive():
            if self.browser is not None:
                logger.warning("Browser appears dead. Reinitializing...")
            await self.init_browser()
        profile = self.engine._select_profile()
        context = await AsyncNewContext(self.browser, fingerprint=profile)
        # Make the navigator properties agree with the selected fingerprint.
        await context.add_init_script(f"""
            Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
            Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
            Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
        """)
        spoof_script = self.engine._get_spoof_script(
            random.choice(self.engine.common_renderers[self.engine.os]),
            random.choice(self.engine.common_vendors)
        )
        await context.add_init_script(spoof_script)
        return context

    async def close_browser(self):
        """Close the browser and stop Playwright; errors are logged, never raised."""
        if self.browser:
            try:
                await self.browser.close()
            except Exception as e:
                logger.debug(f"Ignoring error while closing browser: {e}")
            finally:
                self.browser = None
        if self.pw:
            try:
                await self.pw.stop()
            except Exception as e:
                logger.debug(f"Ignoring error while stopping Playwright: {e}")
            finally:
                self.pw = None

    async def _safe_inner_text(self, element):
        """Return the element's text content, or "Unknown" if missing or unreadable."""
        if not element:
            return "Unknown"
        try:
            return await element.text_content()
        except Exception:
            return "Unknown"

    async def _human_click(self, page, element, wait_after: bool = True):
        """Scroll to *element*, pause briefly, click it; return True on success."""
        if not element:
            return False
        await element.scroll_into_view_if_needed()
        speed = self.engine.optimization_params.get("base_delay", 2.0) / 2
        await asyncio.sleep(random.uniform(0.3, 0.8) * (speed / 2))
        try:
            await element.click()
            if wait_after:
                await asyncio.sleep(random.uniform(2, 4) * (speed / 2))
            return True
        except Exception:
            return False

    async def _extract_page_content_for_llm(self, page) -> str:
        """Let the page settle (scrolling it unless it is a Lever page) and return its HTML."""
        speed = self.engine.optimization_params.get("base_delay", 2.0)
        await asyncio.sleep(2 * (speed / 2))
        if "lever.co" not in page.url:
            await self.engine._human_like_scroll(page)
            await asyncio.sleep(2 * (speed / 2))
        return await page.content()

    async def _is_job_seen(self, job_id: str) -> bool:
        return self.redis_manager.is_job_seen(job_id)

    async def _mark_job_seen(self, job_id: str):
        self.redis_manager.mark_job_seen(job_id)

    async def _get_cached_llm_result(self, job_url: str) -> Optional[Dict]:
        return self.redis_manager.get_cached_llm_result(job_url)

    async def _cache_llm_result(self, job_url: str, result: Dict):
        self.redis_manager.cache_llm_result(job_url, result)

    async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
        """Record a failed job in the Redis error cache."""
        logger.info(f" 📦 Adding failed job to Redis cache: {job_id} (Error: {error_type})")
        self.redis_manager.add_job_to_error_cache(job_url, job_id, error_type)

    async def _remove_job_from_error_cache(self, job_id: str):
        """Remove job from error cache when successfully processed."""
        self.redis_manager.remove_job_from_error_cache(job_id)

    async def _add_job_to_sent_cache(self, job_id: str):
        """Add job to sent cache when processing begins."""
        self.redis_manager.add_job_to_sent_cache(job_id)

    async def _remove_job_from_sent_cache(self, job_id: str):
        """Remove job from sent cache when processing completes."""
        self.redis_manager.remove_job_from_sent_cache(job_id)

    def _get_platform(self, url: str) -> str:
        """Map a job URL to "ashby", "lever", "greenhouse" or "unknown" by substring."""
        if "ashbyhq.com" in url:
            return "ashby"
        elif "lever.co" in url:
            return "lever"
        elif "greenhouse.io" in url:
            return "greenhouse"
        else:
            return "unknown"

    async def _record_failure(self, job_url: str, job_id: str, error_type: str):
        """Store the failure in the error cache and report it to the engine."""
        await self._add_job_to_redis_cache(job_url, job_id, error_type)
        self.engine.report_outcome(error_type, url=job_url)

    @classmethod
    def _missing_compulsory_fields(cls, refined_data: Dict) -> list:
        """Return the compulsory fields that are absent, empty or placeholders."""
        missing = []
        for field in cls._COMPULSORY_FIELDS:
            field_value = refined_data.get(field, "")
            if not field_value or str(field_value).strip() in cls._PLACEHOLDER_VALUES:
                missing.append(field)
        return missing

    async def _determine_apply_type(self, platform: str, job_page) -> str:
        """Return 'AI' or 'signup' for the posting.

        The three supported platforms are always 'AI'. The generic probing
        below is only reached for other platforms, which ``scrape_job``
        currently filters out before getting here.
        """
        if platform in ["ashby", "lever", "greenhouse"]:
            return 'AI'
        # For other platforms: check if the form is accessible without login.
        apply_type = 'signup'
        apply_btn = await job_page.query_selector("button:has-text('Apply for this job'), button:has-text('Apply now')")
        if apply_btn:
            await self._human_click(job_page, apply_btn)
            speed = self.engine.optimization_params.get("base_delay", 2.0)
            await asyncio.sleep(2 * (speed / 2))
            form = await job_page.query_selector("form, div[class*='application-form']")
            if form:
                login_indicators = await job_page.query_selector("input[type='email'], input[type='password'], text='sign in', text='log in'")
                if not login_indicators:
                    apply_type = 'AI'
        return apply_type

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def scrape_job(
        self,
        job_url: str,
        company_name: str,
        message_id: str
    ):
        """Scrape one job posting, refine it via the LLM and persist it.

        Args:
            job_url: URL of the posting; its last path segment is used as job id.
            company_name: Passed to the LLM as search keywords and stored as category.
            message_id: Id of the queue message, stored with the job.

        Returns:
            True when the job was saved, skipped as already processed, served
            from the LLM cache, or is on an unsupported platform; False on any
            scraping/LLM failure (which is also written to the error cache).

        Scraping failures are caught and turned into ``False``, so the retry
        decorator only re-runs the call when something outside the scraping
        step raises (for example saving a cached result).
        """
        platform = self._get_platform(job_url)
        if platform == "unknown":
            logger.info(f"⏭️ Skipping unsupported platform: {job_url}")
            return True
        job_id = job_url.strip("/").split("/")[-1]
        # Flag the job as in flight; the finally below clears the flag on every
        # exit path, including cancellation and unexpected errors.
        await self._add_job_to_sent_cache(job_id)
        try:
            if await self._is_job_seen(job_id):
                logger.info(f"⏭️ Skipping already processed job: {job_id}")
                return True
            cached_result = await self._get_cached_llm_result(job_url)
            if cached_result:
                logger.info(f"📦 Using cached LLM result for: {job_url}")
                await self.llm_agent.save_job_data(cached_result, company_name)
                await self._mark_job_seen(job_id)
                return True
            return await self._scrape_and_refine(job_url, company_name, message_id, platform, job_id)
        finally:
            await self._remove_job_from_sent_cache(job_id)

    async def _scrape_and_refine(
        self,
        job_url: str,
        company_name: str,
        message_id: str,
        platform: str,
        job_id: str
    ) -> bool:
        """Fetch the page, validate it, run the LLM and save; never raises Exception."""
        context = None
        start_time = time.time()
        try:
            context = await self.create_fresh_context()
            # NOTE(review): this page is never used directly here; kept in case
            # StealthyFetcher relies on the context already having a page - confirm.
            await context.new_page()
            timeout_ms = self.engine.optimization_params.get("request_timeout", 120000)
            fetcher = StealthyFetcher(self.engine, self.browser, context)
            fetch_timeout = 60000 if platform == "lever" else timeout_ms
            job_page = await asyncio.wait_for(
                fetcher.fetch_url(job_url, wait_for_selector="h1", timeout=fetch_timeout),
                timeout=fetch_timeout / 1000.0
            )

            # Minimal "page exists" validation: near-empty HTML means the job is gone.
            page_content = await job_page.content()
            if len(page_content.strip()) < 500:  # Arbitrary threshold for "page exists"
                logger.error(f"❌ Job no longer exists (empty/deleted): {job_url}")
                await self._record_failure(job_url, job_id, "job_not_found")
                return False

            if platform == "ashby":
                try:
                    await job_page.wait_for_selector("div[class*='job-posting'], article, main", timeout=60000)
                except Exception:
                    logger.warning(f"⚠️ Ashby page didn't load properly: {job_url}")
                    return False
            elif platform == "greenhouse":
                try:
                    await job_page.wait_for_selector("div.job-desc, section", timeout=60000)
                except Exception as e:
                    # Best effort: carry on with whatever content did load.
                    logger.debug(f"Greenhouse content selector not found for {job_url}: {e}")
            # Lever pages need no extra wait.

            page_content = await self._extract_page_content_for_llm(job_page)

            # Check for job expiration or unavailability indicators.
            page_text_lower = page_content.lower()
            if any(indicator in page_text_lower for indicator in self._JOB_UNAVAILABLE_INDICATORS):
                logger.error(f"❌ Job no longer available/expired: {job_url}")
                await self._record_failure(job_url, job_id, "job_not_found")
                return False

            apply_type = await self._determine_apply_type(platform, job_page)

            # NOTE(review): posted_date is hardcoded to Dec 1st 2025 - confirm
            # whether the real posting date should be extracted instead.
            posted_date = "12/01/25"
            raw_data = {
                "page_content": page_content,
                "url": job_url,
                "job_id": job_id,
                "search_keywords": company_name,
                "posted_date": posted_date
            }
            llm_timeout = max(60, self.engine.feedback.get("avg_response_time", 10) * 2)
            refined_data = await asyncio.wait_for(
                self.llm_agent.refine_job_data(raw_data, self.user_request),
                timeout=llm_timeout
            )
            if not refined_data:
                logger.warning(f"🟡 LLM failed to refine: {job_url}")
                await self._record_failure(job_url, job_id, "llm_failure")
                return False

            # Discard the job before any further processing if a compulsory
            # field is missing or only holds a placeholder.
            missing_fields = self._missing_compulsory_fields(refined_data)
            if missing_fields:
                logger.error(f"❌ Job discarded - missing compulsory fields: {', '.join(missing_fields)} : {job_url}")
                await self._record_failure(job_url, job_id, "missing_compulsory_fields")
                return False

            refined_data.update({
                'apply_type': apply_type,
                'scraped_at': datetime.now().isoformat(),
                'category': company_name,
                'posted_date': posted_date,
                'message_id': message_id,
                'platform': platform
            })
            await self.llm_agent.save_job_data(refined_data, company_name)
            await self._cache_llm_result(job_url, refined_data)
            await self._mark_job_seen(job_id)
            # Remove from error cache if it was previously failed.
            await self._remove_job_from_error_cache(job_id)
            response_time = time.time() - start_time
            self.engine.report_outcome("success", url=job_url, response_time=response_time)
            # str() so a non-string title cannot fail the job after it was saved.
            logger.info(f"✅ Scraped ({platform}): {str(refined_data['title'])[:50]}... (Apply Type: {apply_type})")
            return True
        except asyncio.TimeoutError:
            logger.error(f"⏰ Timeout processing job ({platform}): {job_url}")
            await self._record_failure(job_url, job_id, "timeout")
            return False
        except Exception as e:
            error_msg = str(e)
            if "NoneType" in error_msg or "disconnected" in error_msg or "Browser" in error_msg:
                logger.warning("Browser connection lost. Forcing reinitialization.")
                await self.close_browser()
            # Distinguish job-not-found from other errors.
            if "page.goto: net::ERR_ABORTED" in error_msg or "page.goto: net::ERR_FAILED" in error_msg:
                logger.error(f"❌ Job no longer exists (404/network error): {job_url}")
                await self._record_failure(job_url, job_id, "job_not_found")
            else:
                lowered = error_msg.lower()
                if "timeout" in lowered:
                    error_type = "timeout"
                elif "llm" in lowered or "refine" in lowered:
                    error_type = "llm_failure"
                else:
                    error_type = "scraping_error"
                logger.error(f"💥 Error processing job ({platform}) {job_url}: {error_msg}")
                await self._record_failure(job_url, job_id, error_type)
            return False
        finally:
            if context:
                try:
                    await context.close()
                except Exception as e:
                    # Expected when the browser was just torn down above.
                    logger.debug(f"Ignoring error while closing context: {e}")
# Process-wide counters updated by the message handler. "start_time" is the
# epoch timestamp at which this module was loaded.
METRICS = dict(
    processed=0,
    success=0,
    failed=0,
    skipped=0,
    start_time=time.time(),
)
async def process_message_async(scraper: MultiPlatformJobScraper, ch, method, properties, body):
    """Handle one queue message: decode it, scrape the job, update METRICS, ack.

    The message body must be JSON with ``job_link`` and ``company_name``.
    The message is acknowledged in every case, including failures, so it is
    never redelivered by the broker.
    """
    try:
        payload = json.loads(body)
        job_link, company_name = payload['job_link'], payload['company_name']
        # Fall back to a millisecond timestamp when the publisher set no id.
        message_id = properties.message_id or f"msg_{int(time.time()*1000)}"
        logger.info(f"📥 Processing job: {job_link} (ID: {message_id})")
        succeeded = await scraper.scrape_job(job_link, company_name, message_id)
        METRICS["processed"] += 1
        METRICS["success" if succeeded else "failed"] += 1
    except json.JSONDecodeError:
        logger.error("❌ Invalid JSON in message")
        METRICS["failed"] += 1
    except Exception as exc:
        logger.error(f"💥 Unexpected error: {exc}")
        METRICS["failed"] += 1
    finally:
        ch.basic_ack(delivery_tag=method.delivery_tag)
def callback_wrapper(
    scraper: MultiPlatformJobScraper,
    loop: Optional[asyncio.AbstractEventLoop] = None
):
    """Build the blocking pika callback that runs the async message handler.

    Every message is executed on the same event loop. The scraper's Playwright
    browser is bound to the loop it was created on, so running each message in
    a new loop (as ``asyncio.run`` does) would leave the stored browser
    unusable for the next message.

    Args:
        scraper: Scraper shared by all messages.
        loop: Event loop to run the handler on; a new one is created if omitted.

    Returns:
        A ``callback(ch, method, properties, body)`` function for ``basic_consume``.
    """
    if loop is None:
        loop = asyncio.new_event_loop()

    def callback(ch, method, properties, body):
        loop.run_until_complete(process_message_async(scraper, ch, method, properties, body))

    return callback


def start_consumer():
    """Connect to RabbitMQ and consume ``job_queue`` until interrupted.

    Tries to connect up to five times with exponential back-off and returns
    without consuming if all attempts fail. On exit (CTRL+C or an error) the
    browser is closed on the loop that owns it and the connection is closed.
    """
    engine = FingerprintScrapingEngine(
        # Other env vars...
        seed=os.getenv("SEED_NAME", "multiplatform_scraper"),
        target_os="windows",
        num_variations=10
    )
    scraper = MultiPlatformJobScraper(engine)

    connection = None
    max_attempts = 5
    for attempt in range(max_attempts):
        try:
            parameters = create_ssl_connection_parameters()
            if RABBITMQ_SSL_ENABLED:
                logger.info(f"Connecting to RabbitMQ over SSL at {RABBITMQ_HOST}:{RABBITMQ_PORT}")
            else:
                logger.info(f"Connecting to RabbitMQ at {RABBITMQ_HOST}:{RABBITMQ_PORT}")
            connection = pika.BlockingConnection(parameters)
            break
        except Exception as e:
            logger.error(f"RabbitMQ connection attempt {attempt + 1} failed: {e}")
            # No point in waiting after the final attempt.
            if attempt < max_attempts - 1:
                time.sleep(2 ** attempt)
    if not connection:
        logger.error("Failed to connect to RabbitMQ after retries")
        return

    # One loop for the whole consumer lifetime so the browser stays usable
    # across messages and can be closed on the loop that created it.
    loop = asyncio.new_event_loop()
    try:
        channel = connection.channel()
        channel.queue_declare(queue='job_queue', durable=True)
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(queue='job_queue', on_message_callback=callback_wrapper(scraper, loop))
        logger.info('Waiting for messages (Ashby, Lever, Greenhouse). To exit press CTRL+C')
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            logger.info("Shutting down...")
            channel.stop_consuming()
    finally:
        try:
            loop.run_until_complete(scraper.close_browser())
        finally:
            loop.close()
            if connection.is_open:
                connection.close()
# Start the consumer only when this file is run as a script, not on import.
if __name__ == "__main__":
    start_consumer()