import asyncio
import random
import os
import json
import time
from typing import Optional, Dict
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
from browserforge.injectors.playwright import AsyncNewContext
from llm_agent import LLMJobRefiner
from fetcher import StealthyFetcher
from datetime import datetime
import redis
import pika
from tenacity import retry, stop_after_attempt, wait_exponential
import logging

# Import your engine
from scraping_engine import FingerprintScrapingEngine

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Environment variables
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "rabbitq.thejobhub.xyz")
RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", "5672"))
RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest")
RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest")
REDIS_HOST = os.getenv("REDIS_HOST", "redis-scrape.thejobhub.xyz")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))


class AshbyJobScraper:
    def __init__(
        self,
        engine: FingerprintScrapingEngine,
        user_request: str = "Extract all standard job details"
    ):
        self.engine = engine
        self.user_request = user_request
        self.llm_agent = LLMJobRefiner()
        self.redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
        self._playwright = None
        self.browser = None
        self.context = None

    async def init_browser(self):
        """Initialize the browser once, using the engine's fingerprint profile."""
        if self.browser is None:
            profile = self.engine._select_profile()
            renderer = random.choice(self.engine.common_renderers[self.engine.os])
            vendor = random.choice(self.engine.common_vendors)
            spoof_script = self.engine._get_spoof_script(renderer, vendor)

            self._playwright = await async_playwright().start()
            self.browser = await self._playwright.chromium.launch(
                headless=True,
                args=['--disable-blink-features=AutomationControlled']
            )
            self.context = await AsyncNewContext(self.browser, fingerprint=profile)
            await self.context.add_init_script(f"""
                Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
                Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
                Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
            """)
            await self.context.add_init_script(spoof_script)

    async def close_browser(self):
        if self.browser:
            await self.browser.close()
            self.browser = None
        if self._playwright:
            await self._playwright.stop()
            self._playwright = None

    async def _safe_inner_text(self, element):
        if not element:
            return "Unknown"
        try:
            return await element.text_content()
        except Exception:
            return "Unknown"

    async def _human_click(self, page, element, wait_after: bool = True):
        if not element:
            return False
        await element.scroll_into_view_if_needed()
        speed = self.engine.optimization_params.get("base_delay", 2.0) / 2
        await asyncio.sleep(random.uniform(0.3, 0.8) * (speed / 2))
        try:
            await element.click()
            if wait_after:
                await asyncio.sleep(random.uniform(2, 4) * (speed / 2))
            return True
        except Exception:
            return False

    async def _extract_page_content_for_llm(self, page) -> str:
        speed = self.engine.optimization_params.get("base_delay", 2.0)
        await asyncio.sleep(2 * (speed / 2))
        await self.engine._human_like_scroll(page)
        await asyncio.sleep(2 * (speed / 2))
        return await page.content()

    async def _is_job_seen(self, job_id: str) -> bool:
        return self.redis_client.get(f"seen_job:{job_id}") is not None

    async def _mark_job_seen(self, job_id: str):
        # Remember the job for 7 days so duplicate queue messages are skipped.
        self.redis_client.setex(f"seen_job:{job_id}", 7 * 24 * 3600, "1")

    async def _get_cached_llm_result(self, job_url: str) -> Optional[Dict]:
        cached = self.redis_client.get(f"llm_cache:{job_url}")
        if cached:
            return json.loads(cached)
        return None

    async def _cache_llm_result(self, job_url: str, result: Dict):
        self.redis_client.setex(f"llm_cache:{job_url}", 7 * 24 * 3600, json.dumps(result))

    async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
        try:
            job_data = {
                "job_url": job_url,
                "job_id": job_id,
                "error_type": error_type,
                "timestamp": datetime.now().isoformat()
            }
            self.redis_client.hset("failed_jobs", job_id, json.dumps(job_data))
            logger.info(f"📦 Added failed job to Redis cache: {job_id} (Error: {error_type})")
        except Exception as e:
            logger.error(f"❌ Failed to add to Redis: {str(e)}")

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
    async def scrape_job(
        self,
        job_url: str,
        company_name: str,
        message_id: str
    ):
        job_id = job_url.strip("/").split("/")[-1]

        if await self._is_job_seen(job_id):
            logger.info(f"⏭️ Skipping already processed job: {job_id}")
            return True

        cached_result = await self._get_cached_llm_result(job_url)
        if cached_result:
            logger.info(f"📦 Using cached LLM result for: {job_url}")
            await self.llm_agent.save_job_data(cached_result, company_name)
            await self._mark_job_seen(job_id)
            return True

        page = None
        start_time = time.time()
        try:
            await self.init_browser()
            page = await self.context.new_page()

            # Fetch with timeout from engine config
            timeout_ms = self.engine.optimization_params.get("request_timeout", 120000)
            temp_fetcher = StealthyFetcher(self.engine, self.browser, self.context)
            job_page = await asyncio.wait_for(
                temp_fetcher.fetch_url(job_url, wait_for_selector="h1"),
                timeout=timeout_ms / 1000.0
            )

            if not job_page:
                await self._add_job_to_redis_cache(job_url, job_id, "fetch_failure")
                self.engine.report_outcome("fetch_failure", url=job_url)
                return False

            # Handle Cloudflare if detected
            if await self.engine._detect_cloudflare(job_page):
                success = await self.engine._handle_cloudflare(job_page)
                if not success:
                    await self._add_job_to_redis_cache(job_url, job_id, "cloudflare")
                    self.engine.report_outcome("cloudflare", url=job_url)
                    return False

            apply_btn = await job_page.query_selector(
                "button:has-text('Apply for this job'), button:has-text('Apply now')"
            )
            apply_type = 'signup'
            if apply_btn:
                await self._human_click(job_page, apply_btn)
                speed = self.engine.optimization_params.get("base_delay", 2.0)
                await asyncio.sleep(2 * (speed / 2))
                form = await job_page.query_selector("form, div[class*='application-form']")
                if form:
                    apply_type = 'AI'

            final_url = job_url
            page_content = await self._extract_page_content_for_llm(job_page)
            posted_date = datetime.now().strftime("%m/%d/%y")

            raw_data = {
                "page_content": page_content,
                "url": final_url,
                "job_id": job_id,
                "search_keywords": company_name,
                "posted_date": posted_date
            }

            # LLM call with timeout
            llm_timeout = max(30, self.engine.feedback.get("avg_response_time", 10) * 2)
            refined_data = await asyncio.wait_for(
                self.llm_agent.refine_job_data(raw_data, self.user_request),
                timeout=llm_timeout
            )

            success = False
            if refined_data and refined_data.get("title", "N/A") != "N/A":
                compulsory_fields = ['company_name', 'job_id', 'url']
                for field in compulsory_fields:
                    if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
                        if field == 'job_id':
                            refined_data[field] = job_id
                        elif field == 'url':
                            refined_data[field] = final_url
                        elif field == 'company_name':
                            refined_data[field] = company_name

                refined_data['apply_type'] = apply_type
                refined_data['scraped_at'] = datetime.now().isoformat()
                refined_data['category'] = company_name
                refined_data['posted_date'] = posted_date
                refined_data['message_id'] = message_id

                await self.llm_agent.save_job_data(refined_data, company_name)
                await self._cache_llm_result(job_url, refined_data)
                await self._mark_job_seen(job_id)

                response_time = time.time() - start_time
                self.engine.report_outcome("success", url=final_url, response_time=response_time)
                logger.info(f"✅ Scraped: {refined_data['title'][:50]}...")
                success = True
            else:
                logger.warning(f"🟡 LLM failed to refine: {final_url}")
                await self._add_job_to_redis_cache(final_url, job_id, "llm_failure")
                self.engine.report_outcome("llm_failure", url=final_url)

            return success

        except asyncio.TimeoutError:
            logger.error(f"⏰ Timeout processing job: {job_url}")
            await self._add_job_to_redis_cache(job_url, job_id, "timeout")
            self.engine.report_outcome("timeout", url=job_url)
            return False
        except Exception as e:
            logger.error(f"💥 Error processing job {job_url}: {str(e)}")
            await self._add_job_to_redis_cache(job_url, job_id, "exception")
            self.engine.report_outcome("exception", url=job_url)
            return False
        finally:
            if page:
                await page.close()


# Global metrics
METRICS = {
    "processed": 0,
    "success": 0,
    "failed": 0,
    "skipped": 0,
    "start_time": time.time()
}


async def process_message_async(scraper: AshbyJobScraper, ch, method, properties, body):
    try:
        job_data = json.loads(body)
        job_link = job_data['job_link']
        company_name = job_data['company_name']
        message_id = properties.message_id or f"msg_{int(time.time()*1000)}"

        logger.info(f"📥 Processing job: {job_link} (ID: {message_id})")
        success = await scraper.scrape_job(job_link, company_name, message_id)

        METRICS["processed"] += 1
        if success:
            METRICS["success"] += 1
        else:
            METRICS["failed"] += 1
    except json.JSONDecodeError:
        logger.error("❌ Invalid JSON in message")
        METRICS["failed"] += 1
    except Exception as e:
        logger.error(f"💥 Unexpected error: {str(e)}")
        METRICS["failed"] += 1
    finally:
        # Ack in every case so poison messages do not requeue forever;
        # failures are tracked via Redis and METRICS instead.
        ch.basic_ack(delivery_tag=method.delivery_tag)


def callback_wrapper(scraper: AshbyJobScraper, loop: asyncio.AbstractEventLoop):
    """Bridge pika's synchronous callback into a shared asyncio event loop.

    A single long-lived loop is needed because the Playwright browser/context
    created in init_browser() stays bound to the loop it was created on;
    calling asyncio.run() per message would close that loop between messages.
    """
    def callback(ch, method, properties, body):
        loop.run_until_complete(process_message_async(scraper, ch, method, properties, body))
    return callback


def start_consumer():
    # Initialize the fingerprint engine
    engine = FingerprintScrapingEngine(
        seed="ashby_scraper",
        target_os="windows",
        num_variations=10
    )
    scraper = AshbyJobScraper(engine)

    # One event loop shared by all message callbacks (see callback_wrapper).
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # RabbitMQ connection with retries
    connection = None
    for attempt in range(5):
        try:
            credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
            parameters = pika.ConnectionParameters(
                host=RABBITMQ_HOST,
                port=RABBITMQ_PORT,
                virtual_host='/',
                credentials=credentials,
                heartbeat=600,
                blocked_connection_timeout=300
            )
            connection = pika.BlockingConnection(parameters)
            break
        except Exception as e:
            logger.error(f"RabbitMQ connection attempt {attempt + 1} failed: {e}")
            time.sleep(2 ** attempt)

    if not connection:
        logger.error("Failed to connect to RabbitMQ after retries")
        return

    channel = connection.channel()
    channel.queue_declare(queue='job_queue', durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='job_queue', on_message_callback=callback_wrapper(scraper, loop))

    logger.info('Waiting for messages. To exit press CTRL+C')
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        logger.info("Shutting down...")
        channel.stop_consuming()
        connection.close()
        loop.run_until_complete(scraper.close_browser())
        loop.close()


if __name__ == "__main__":
    start_consumer()
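
# Example producer sketch (an assumption, not part of this module): based on
# process_message_async, each 'job_queue' message is a JSON object with
# 'job_link' and 'company_name' keys, optionally carrying a message_id
# property. The Ashby URL and company name below are placeholders.
#
#   import json, pika
#   connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
#   channel = connection.channel()
#   channel.queue_declare(queue='job_queue', durable=True)
#   channel.basic_publish(
#       exchange='',
#       routing_key='job_queue',
#       body=json.dumps({
#           "job_link": "https://jobs.ashbyhq.com/example-company/example-job-id",
#           "company_name": "ExampleCompany",
#       }),
#       properties=pika.BasicProperties(delivery_mode=2, message_id="example-123"),
#   )
#   connection.close()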