Ofure Ikheloa b13d14d26d Enhance job handling in scraper and sender modules:
- Update fetch timeout in StealthyFetcher for improved reliability.
- Refactor LLMJobRefiner to create and manage Quelah Jobs table in PostgreSQL.
- Modify RedisManager to track sent job counts for jobs.csv and adjust deduplication logic.
- Implement job URL-based deduplication across scraper and sender.
2025-12-12 21:14:37 +01:00
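# Consumer-side scraper: pulls job links from the RabbitMQ 'job_queue', scrapes
# Ashby/Lever/Greenhouse postings with a fingerprinted Playwright browser, refines the
# raw page content with LLMJobRefiner, saves results to the "quelah" jobs table, and
# deduplicates by job URL via Redis.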


import asyncio
import random
import os
import json
import time
from typing import Optional, Dict
from playwright.async_api import async_playwright
from browserforge.injectors.playwright import AsyncNewContext
from llm_agent import LLMJobRefiner
from fetcher import StealthyFetcher
from datetime import datetime
import pika
import logging
from tenacity import retry, stop_after_attempt, wait_exponential
from scraping_engine import FingerprintScrapingEngine
from dotenv import load_dotenv
from ssl_connection import create_ssl_connection_parameters  # SSL connection-parameter helper (ssl_connection.py)
import redis
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Environment variables
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", "5671"))
RABBITMQ_SSL_ENABLED = os.getenv("RABBITMQ_SSL_ENABLED", "false").lower() == "true"
# Redis configuration
REDIS_HOST = os.getenv('REDIS_HOST', 'redis-scrape.thejobhub.xyz')
REDIS_PORT = int(os.getenv('REDIS_PORT', '6380'))
REDIS_PASSWORD = os.getenv('REDIS_PASSWORD')
REDIS_SSL_ENABLED = os.getenv('REDIS_SSL_ENABLED', 'true').lower() == 'true'
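# Redis key layout used by RedisManager below:
#   job_seen:<job_url>   -> "1", 30-day TTL (URL-based deduplication)
#   llm_cache:<job_url>  -> JSON of refined job data, 7-day TTL
#   error_cache:<job_id> -> JSON error record, 1-hour TTL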
class RedisManager:
"""Manages Redis connection and operations for job tracking and caching."""
def __init__(self):
self.redis_client = None
self._connect()
def _connect(self):
"""Establish connection to Redis server."""
if not REDIS_PASSWORD:
logger.warning("Warning: REDIS_PASSWORD not found in environment.")
try:
self.redis_client = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
password=REDIS_PASSWORD,
ssl=REDIS_SSL_ENABLED,
ssl_cert_reqs=None,
socket_connect_timeout=10,
socket_timeout=30,
retry_on_timeout=True
)
response = self.redis_client.ping()
logger.info(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}! Response: {response}")
except Exception as e:
logger.error(f"Failed to connect to Redis: {e}")
self.redis_client = None
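    # If the connection fails, redis_client stays None and every method below degrades
    # to a no-op (jobs read as unseen, caching is skipped), so the scraper keeps running
    # without deduplication instead of crashing.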
def is_job_seen(self, job_url: str) -> bool:
"""✅ CHANGED: Check by job URL instead of job ID"""
if not self.redis_client:
return False
try:
return bool(self.redis_client.exists(f"job_seen:{job_url}"))
except Exception as e:
logger.error(f"Redis error checking job_seen: {e}")
return False
def mark_job_seen(self, job_url: str):
"""✅ CHANGED: Mark by job URL instead of job ID"""
if not self.redis_client:
return
try:
self.redis_client.setex(f"job_seen:{job_url}", 2592000, "1")
except Exception as e:
logger.error(f"Redis error marking job_seen: {e}")
def get_cached_llm_result(self, job_url: str) -> Optional[Dict]:
if not self.redis_client:
return None
try:
cached_data = self.redis_client.get(f"llm_cache:{job_url}")
if cached_data:
return json.loads(cached_data)
return None
except Exception as e:
logger.error(f"Redis error getting LLM cache: {e}")
return None
def cache_llm_result(self, job_url: str, result: Dict):
if not self.redis_client:
return
try:
self.redis_client.setex(f"llm_cache:{job_url}", 604800, json.dumps(result))
except Exception as e:
logger.error(f"Redis error caching LLM result: {e}")
def add_job_to_error_cache(self, job_url: str, job_id: str, error_type: str):
if not self.redis_client:
return
try:
error_data = {
"job_url": job_url,
"job_id": job_id,
"error_type": error_type,
"timestamp": datetime.now().isoformat()
}
self.redis_client.setex(f"error_cache:{job_id}", 3600, json.dumps(error_data))
except Exception as e:
logger.error(f"Redis error adding to error cache: {e}")
class MultiPlatformJobScraper:
def __init__(
self,
engine: FingerprintScrapingEngine,
user_request: str = "Extract all standard job details"
):
self.engine = engine
self.user_request = user_request
self.llm_agent = LLMJobRefiner()
self.browser = None
self.pw = None
self.redis_manager = RedisManager()
async def init_browser(self):
        if self.browser is not None:
            # Probe the existing browser; keep reusing it if it is still responsive,
            # otherwise tear it down so a fresh one is launched below.
            try:
                probe_page = await self.browser.new_page()
                await probe_page.close()
                return
            except Exception:
                await self.close_browser()
if self.browser is None:
try:
profile = self.engine._select_profile()
renderer = random.choice(self.engine.common_renderers[self.engine.os])
vendor = random.choice(self.engine.common_vendors)
spoof_script = self.engine._get_spoof_script(renderer, vendor)
self.pw = await async_playwright().start()
self.browser = await self.pw.chromium.launch(
headless=True,
args=[
'--disable-blink-features=AutomationControlled',
'--no-sandbox',
'--disable-dev-shm-usage',
'--disable-gpu'
]
)
logger.info("✅ Browser launched (will reuse for all jobs)")
except Exception as e:
logger.error(f"💥 Failed to launch browser: {e}")
raise
async def create_fresh_context(self):
if self.browser is None:
await self.init_browser()
        try:
            # Liveness probe: close the page immediately so it does not leak on the shared browser.
            probe_page = await self.browser.new_page()
            await probe_page.close()
except Exception:
logger.warning("Browser appears dead. Reinitializing...")
await self.close_browser()
await self.init_browser()
profile = self.engine._select_profile()
context = await AsyncNewContext(self.browser, fingerprint=profile)
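        # AsyncNewContext (browserforge) opens a Playwright context with the generated
        # fingerprint injected (user agent, headers, screen/viewport, navigator fields).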
await context.add_init_script(f"""
Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
""")
spoof_script = self.engine._get_spoof_script(
random.choice(self.engine.common_renderers[self.engine.os]),
random.choice(self.engine.common_vendors)
)
await context.add_init_script(spoof_script)
return context
async def close_browser(self):
if self.browser:
try:
await self.browser.close()
            except Exception:
pass
self.browser = None
if self.pw:
try:
await self.pw.stop()
            except Exception:
pass
self.pw = None
async def _safe_inner_text(self, element):
if not element:
return "Unknown"
try:
return await element.text_content()
        except Exception:
return "Unknown"
async def _human_click(self, page, element, wait_after: bool = True):
if not element:
return False
await element.scroll_into_view_if_needed()
speed = self.engine.optimization_params.get("base_delay", 2.0) / 2
await asyncio.sleep(random.uniform(0.3, 0.8) * (speed / 2))
try:
await element.click()
if wait_after:
await asyncio.sleep(random.uniform(2, 4) * (speed / 2))
return True
        except Exception:
return False
async def _extract_page_content_for_llm(self, page) -> str:
speed = self.engine.optimization_params.get("base_delay", 2.0)
await asyncio.sleep(2 * (speed / 2))
if "lever.co" not in page.url:
await self.engine._human_like_scroll(page)
await asyncio.sleep(2 * (speed / 2))
return await page.content()
async def _is_job_seen(self, job_url: str) -> bool:
"""✅ Use job URL for deduplication"""
return self.redis_manager.is_job_seen(job_url)
async def _mark_job_seen(self, job_url: str):
"""✅ Use job URL for marking"""
self.redis_manager.mark_job_seen(job_url)
async def _get_cached_llm_result(self, job_url: str) -> Optional[Dict]:
return self.redis_manager.get_cached_llm_result(job_url)
async def _cache_llm_result(self, job_url: str, result: Dict):
self.redis_manager.cache_llm_result(job_url, result)
async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
logger.info(f" 📦 Adding failed job to Redis cache: {job_id} (Error: {error_type})")
self.redis_manager.add_job_to_error_cache(job_url, job_id, error_type)
def _get_platform(self, url: str) -> str:
if "ashbyhq.com" in url:
return "ashby"
elif "lever.co" in url:
return "lever"
elif "greenhouse.io" in url:
return "greenhouse"
else:
return "unknown"
def _is_job_expired_or_invalid(self, page_content: str) -> bool:
"""Check if job is expired, removed, or has no description"""
content_lower = page_content.lower()
# Check for JavaScript-only pages
if "you need to enable javascript to run this app" in content_lower:
return True
invalid_phrases = [
"job no longer available",
"position has been filled",
"this job has expired",
"page not found",
"404 error",
"job has been closed",
"erweima.png", # Detect spam/ad content
"wocao03.com",
"github.com/wocao01"
]
for phrase in invalid_phrases:
if phrase in content_lower:
return True
# Check for meaningful description content
description_keywords = ['responsibilities', 'requirements', 'description', 'duties', 'role', 'about the']
has_description = any(kw in content_lower for kw in description_keywords)
return not has_description
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def scrape_job(
self,
job_url: str,
company_name: str,
message_id: str
):
platform = self._get_platform(job_url)
# ✅ ONLY extract job_id from URL
job_id = job_url.strip("/").split("/")[-1]
# ✅ Check if already processed BY URL (not job_id)
if await self._is_job_seen(job_url):
logger.info(f"⏭️ Skipping already processed job URL: {job_url}")
return True
cached_result = await self._get_cached_llm_result(job_url)
if cached_result:
logger.info(f"📦 Using cached LLM result for: {job_url}")
# Save to Quelah Jobs - company_name will be overridden by LLM if found
await self.llm_agent.save_job_data(cached_result, company_name, "quelah")
await self._mark_job_seen(job_url) # ✅ Mark by URL
return True
context = None
page = None
start_time = time.time()
try:
context = await self.create_fresh_context()
page = await context.new_page()
timeout_ms = self.engine.optimization_params.get("request_timeout", 120000)
temp_fetcher = StealthyFetcher(self.engine, self.browser, context)
fetch_timeout = 60000 if platform == "lever" else timeout_ms
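            # Timeouts handed to fetch_url appear to be milliseconds (Playwright convention);
            # the asyncio.wait_for wrappers below convert to seconds by dividing by 1000.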
# ✅ PLATFORM-SPECIFIC WAIT LOGIC WITH ASHBY FIX
if platform == "ashby":
# Ashby requires JS execution - wait for network idle + job content
job_page = await asyncio.wait_for(
temp_fetcher.fetch_url(job_url, wait_for_selector=None, timeout=fetch_timeout),
                    timeout=fetch_timeout / 1000.0  # ms -> seconds for asyncio.wait_for, matching the other branches
)
if job_page:
# Wait for React hydration (job content to appear)
try:
await job_page.wait_for_function(
"document.querySelector('h1') && document.querySelector('h1').innerText.length > 0",
timeout=120000
)
except Exception:
# Fallback: check if we got valid content
content = await job_page.content()
if "you need to enable javascript" in content.lower():
logger.warning(f"⚠️ Ashby page still shows JS error: {job_url}")
raise Exception("Ashby JS content not loaded")
elif platform == "greenhouse":
job_page = await asyncio.wait_for(
temp_fetcher.fetch_url(job_url, wait_for_selector="h1, div.job-desc", timeout=fetch_timeout),
timeout=fetch_timeout / 1000.0
)
else: # lever & others
job_page = await asyncio.wait_for(
temp_fetcher.fetch_url(job_url, wait_for_selector="h1", timeout=fetch_timeout),
timeout=fetch_timeout / 1000.0
)
if job_page is None:
logger.error(f"❌ Failed to load page for {job_url}")
await self._add_job_to_redis_cache(job_url, job_id, "page_load_failed")
await self._mark_job_seen(job_url)
return True
page_content = await job_page.content()
if self._is_job_expired_or_invalid(page_content):
logger.warning(f"🗑️ Discarding invalid job: {job_url}")
self.engine.report_outcome("job_discarded", url=job_url)
await self._mark_job_seen(job_url) # ✅ Mark by URL
return True
# Apply type logic
if platform in ["ashby", "lever", "greenhouse"]:
apply_type = 'AI'
else:
apply_btn = await job_page.query_selector("button:has-text('Apply for this job'), button:has-text('Apply now')")
apply_type = 'signup'
if apply_btn:
await self._human_click(job_page, apply_btn)
speed = self.engine.optimization_params.get("base_delay", 2.0)
await asyncio.sleep(2 * (speed / 2))
form = await job_page.query_selector("form, div[class*='application-form']")
if form:
                        # Playwright cannot mix a bare text= engine into a CSS selector list; use the :text() pseudo-class instead
                        login_indicators = await job_page.query_selector("input[type='email'], input[type='password'], :text('sign in'), :text('log in')")
if not login_indicators:
apply_type = 'AI'
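            # apply_type semantics as used downstream: 'AI' marks postings whose application
            # form looks directly fillable; 'signup' marks postings that appear to require an
            # account or login before applying.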
final_url = job_url
page_content = await self._extract_page_content_for_llm(job_page)
posted_date = "12/01/25" # Fixed date
raw_data = {
"page_content": page_content,
"url": final_url,
"job_id": job_id,
"search_keywords": company_name, # Only used if LLM can't find company
"posted_date": posted_date
}
llm_timeout = max(60, self.engine.feedback.get("avg_response_time", 10) * 2)
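            # LLM timeout heuristic: at least 60 s, or twice the engine's rolling average
            # response time, whichever is larger.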
refined_data = await asyncio.wait_for(
self.llm_agent.refine_job_data(raw_data, self.user_request),
timeout=llm_timeout
)
success = False
if refined_data and refined_data.get("title", "N/A") != "N/A":
# ✅ ONLY job_id, url are guaranteed - everything else from LLM
compulsory_fields = ['job_id', 'url']
for field in compulsory_fields:
if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
if field == 'job_id':
refined_data[field] = job_id
elif field == 'url':
refined_data[field] = final_url
# Company name: prefer LLM extraction, fallback to queue
if not refined_data.get('company_name') or refined_data['company_name'] in ["N/A", "", "Unknown"]:
refined_data['company_name'] = company_name
refined_data.update({
'apply_type': apply_type,
'scraped_at': datetime.now().isoformat(),
'category': company_name,
'posted_date': posted_date,
'message_id': message_id,
'platform': platform
})
await self.llm_agent.save_job_data(refined_data, company_name, "quelah")
await self._cache_llm_result(job_url, refined_data)
await self._mark_job_seen(job_url) # ✅ Mark by URL
response_time = time.time() - start_time
self.engine.report_outcome("success", url=final_url, response_time=response_time)
logger.info(f"✅ Saved to Quelah Jobs ({platform}): {refined_data['title'][:50]}...")
success = True
else:
logger.warning(f"🟡 LLM failed to refine: {final_url}")
await self._add_job_to_redis_cache(final_url, job_id, "llm_failure")
await self._mark_job_seen(job_url) # ✅ Mark by URL
self.engine.report_outcome("llm_failure", url=final_url)
return True
return success
except asyncio.TimeoutError:
logger.error(f"⏰ Timeout processing job ({platform}): {job_url}")
await self._add_job_to_redis_cache(job_url, job_id, "timeout")
await self._mark_job_seen(job_url) # ✅ Mark by URL
self.engine.report_outcome("timeout", url=job_url)
return True
except Exception as e:
error_msg = str(e)
if "NoneType" in error_msg or "disconnected" in error_msg or "Browser" in error_msg:
logger.warning("Browser connection lost. Forcing reinitialization.")
await self.close_browser()
error_type = "exception"
if "page.goto: net::ERR_ABORTED" in error_msg or "page.goto: net::ERR_FAILED" in error_msg:
error_type = "job_not_found"
else:
if "required" in error_msg.lower() or "missing" in error_msg.lower():
error_type = "missing_fields"
elif "captcha" in error_msg.lower() or "cloudflare" in error_msg.lower():
error_type = "anti_bot_protection"
logger.error(f"💥 Error processing job ({platform}) {job_url}: {error_msg}")
await self._add_job_to_redis_cache(job_url, job_id, error_type)
await self._mark_job_seen(job_url) # ✅ Mark by URL
self.engine.report_outcome(error_type, url=job_url)
return True
finally:
if context:
try:
await context.close()
except Exception:
pass
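# Note: scrape_job returns True on every path (success, skip, invalid job, or error); failed
# URLs are parked in the Redis error cache rather than re-queued, and the consumer below
# acknowledges the message in its finally block regardless of the outcome.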
# Global metrics
METRICS = {
"processed": 0,
"success": 0,
"failed": 0,
"skipped": 0,
"start_time": time.time()
}
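# METRICS is process-local bookkeeping; only "processed" and "failed" are incremented in
# this module, while "success" and "skipped" are reserved but currently unused here.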
async def process_message_async(scraper: MultiPlatformJobScraper, ch, method, properties, body):
try:
job_data = json.loads(body)
job_link = job_data['job_link']
company_name = job_data['company_name']
message_id = properties.message_id or f"msg_{int(time.time()*1000)}"
logger.info(f"📥 Processing job: {job_link} (ID: {message_id})")
_ = await scraper.scrape_job(job_link, company_name, message_id)
METRICS["processed"] += 1
    except json.JSONDecodeError:
        logger.error("❌ Invalid JSON in message")
        METRICS["failed"] += 1
        return  # the finally block below still acknowledges the message exactly once
except Exception as e:
logger.error(f"💥 Unexpected error: {str(e)}")
METRICS["failed"] += 1
finally:
# ✅ CRITICAL: Acknowledge ALL messages
ch.basic_ack(delivery_tag=method.delivery_tag)
def callback_wrapper(scraper: MultiPlatformJobScraper):
def callback(ch, method, properties, body):
asyncio.run(process_message_async(scraper, ch, method, properties, body))
return callback
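# Note: asyncio.run() starts a fresh event loop for every message, so Playwright objects
# created under a previous loop are generally unusable in the new one; create_fresh_context's
# liveness probe then fails and the browser is relaunched, meaning the "reuse for all jobs"
# optimisation effectively applies within a single message only.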
def start_consumer():
engine = FingerprintScrapingEngine(
seed="multiplatform_scraper",
target_os="windows",
num_variations=10
)
scraper = MultiPlatformJobScraper(engine)
connection = None
for attempt in range(5):
try:
parameters = create_ssl_connection_parameters()
if RABBITMQ_SSL_ENABLED:
logger.info(f"Connecting to RabbitMQ over SSL at {RABBITMQ_HOST}:{RABBITMQ_PORT}")
else:
logger.info(f"Connecting to RabbitMQ at {RABBITMQ_HOST}:{RABBITMQ_PORT}")
connection = pika.BlockingConnection(parameters)
break
except Exception as e:
logger.error(f"RabbitMQ connection attempt {attempt + 1} failed: {e}")
time.sleep(2 ** attempt)
if not connection:
logger.error("Failed to connect to RabbitMQ after retries")
return
channel = connection.channel()
channel.queue_declare(queue='job_queue', durable=True)
channel.basic_qos(prefetch_count=1)
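    # prefetch_count=1: RabbitMQ delivers at most one unacknowledged message to this consumer
    # at a time, so a slow scrape does not queue up extra deliveries on this worker.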
channel.basic_consume(queue='job_queue', on_message_callback=callback_wrapper(scraper))
logger.info('Waiting for messages (All platforms → Quelah Jobs). To exit press CTRL+C')
try:
channel.start_consuming()
except KeyboardInterrupt:
logger.info("Shutting down...")
channel.stop_consuming()
connection.close()
asyncio.run(scraper.close_browser())
if __name__ == "__main__":
start_consumer()