diff --git a/scraper.py b/scraper.py
index 89800c2..9a001c6 100644
--- a/scraper.py
+++ b/scraper.py
@@ -15,7 +15,7 @@ import logging
 from tenacity import retry, stop_after_attempt, wait_exponential
 from scraping_engine import FingerprintScrapingEngine
 from dotenv import load_dotenv
-from ssl_connection import create_ssl_connection_parameters # Import from ssl.py
+from ssl_connection import create_ssl_connection_parameters  # Import from ssl_connection.py
 import redis
 load_dotenv()
 # Configure logging
@@ -31,6 +31,11 @@ REDIS_HOST = os.getenv('REDIS_HOST')
 REDIS_PORT = int(os.getenv('REDIS_PORT', '6380'))
 REDIS_PASSWORD = os.getenv('REDIS_PASSWORD')
 REDIS_SSL_ENABLED = os.getenv('REDIS_SSL_ENABLED', 'true').lower() == 'true'
+
+# TTL constants (in seconds)
+JOB_SEEN_TTL = 2592000  # 30 days (1 month)
+
+
 class RedisManager:
     """Manages Redis connection and operations for job tracking and caching."""
@@ -72,15 +77,6 @@ class RedisManager:
             logger.error(f"Redis error checking job_seen: {e}")
             return False
 
-    def mark_job_seen(self, job_id: str):
-        if not self.redis_client:
-            return
-
-        try:
-            self.redis_client.setex(f"job_seen:{job_id}", 2592000, "1")
-        except Exception as e:
-            logger.error(f"Redis error marking job_seen: {e}")
-
     def get_cached_llm_result(self, job_url: str) -> Optional[Dict]:
         if not self.redis_client:
             return None
@@ -94,19 +90,26 @@ class RedisManager:
             logger.error(f"Redis error getting LLM cache: {e}")
             return None
 
+    def mark_job_seen(self, job_id: str):
+        if not self.redis_client:
+            return
+        try:
+            # Set with TTL of 1 month
+            self.redis_client.setex(f"job_seen:{job_id}", JOB_SEEN_TTL, "1")
+        except Exception as e:
+            logger.error(f"Redis error marking job_seen: {e}")
+
     def cache_llm_result(self, job_url: str, result: Dict):
         if not self.redis_client:
             return
-
         try:
-            self.redis_client.setex(f"llm_cache:{job_url}", 604800, json.dumps(result))
+            self.redis_client.set(f"llm_cache:{job_url}", json.dumps(result))  # No TTL for LLM cache
         except Exception as e:
             logger.error(f"Redis error caching LLM result: {e}")
-
+
     def add_job_to_error_cache(self, job_url: str, job_id: str, error_type: str):
         if not self.redis_client:
             return
-
         try:
             error_data = {
                 "job_url": job_url,
@@ -114,10 +117,52 @@ class RedisManager:
                 "error_type": error_type,
                 "timestamp": datetime.now().isoformat()
             }
-            self.redis_client.setex(f"error_cache:{job_id}", 3600, json.dumps(error_data))
+            self.redis_client.set(f"error_cache:{job_id}", json.dumps(error_data))  # No TTL for error cache
         except Exception as e:
             logger.error(f"Redis error adding to error cache: {e}")
 
+    def remove_job_from_error_cache(self, job_id: str):
+        """Remove job from error cache when successfully processed later."""
+        if not self.redis_client:
+            return
+        try:
+            deleted = self.redis_client.delete(f"error_cache:{job_id}")
+            if deleted:
+                logger.info(f"✅ Removed job {job_id} from error cache after successful processing")
+        except Exception as e:
+            logger.error(f"Redis error removing from error cache: {e}")
+
+
+    def add_job_to_sent_cache(self, job_id: str):
+        """Mark job as sent for processing."""
+        if not self.redis_client:
+            return
+        try:
+            self.redis_client.set(f"sent_job:{job_id}", "1")  # No TTL
+        except Exception as e:
+            logger.error(f"Redis error adding to sent cache: {e}")
+
+    def remove_job_from_sent_cache(self, job_id: str):
+        """Remove job from sent cache upon successful or failed completion."""
+        if not self.redis_client:
+            return
+        try:
+            deleted = self.redis_client.delete(f"sent_job:{job_id}")
+            if deleted:
+                logger.debug(f"🧹 Removed job {job_id} from sent cache")
+        except Exception as e:
+            logger.error(f"Redis error removing from sent cache: {e}")
+
+    def is_job_in_sent_cache(self, job_id: str) -> bool:
+        """Check if job is already in sent cache."""
+        if not self.redis_client:
+            return False
+        try:
+            return bool(self.redis_client.exists(f"sent_job:{job_id}"))
+        except Exception as e:
+            logger.error(f"Redis error checking sent cache: {e}")
+            return False
+
 class MultiPlatformJobScraper:
     def __init__(
@@ -244,6 +289,18 @@
             logger.info(f" 📦 Adding failed job to Redis cache: {job_id} (Error: {error_type})")
             self.redis_manager.add_job_to_error_cache(job_url, job_id, error_type)
 
+    async def _remove_job_from_error_cache(self, job_id: str):
+        """Remove job from error cache when successfully processed."""
+        self.redis_manager.remove_job_from_error_cache(job_id)
+
+    async def _add_job_to_sent_cache(self, job_id: str):
+        """Add job to sent cache when processing begins."""
+        self.redis_manager.add_job_to_sent_cache(job_id)
+
+    async def _remove_job_from_sent_cache(self, job_id: str):
+        """Remove job from sent cache when processing completes."""
+        self.redis_manager.remove_job_from_sent_cache(job_id)
+
     def _get_platform(self, url: str) -> str:
         if "ashbyhq.com" in url:
             return "ashby"
@@ -266,14 +323,20 @@
             logger.info(f"⏭️ Skipping unsupported platform: {job_url}")
             return True
         job_id = job_url.strip("/").split("/")[-1]
+
+        # Add job to sent cache at the beginning of processing
+        await self._add_job_to_sent_cache(job_id)
+
         if await self._is_job_seen(job_id):
             logger.info(f"⏭️ Skipping already processed job: {job_id}")
+            await self._remove_job_from_sent_cache(job_id)
             return True
         cached_result = await self._get_cached_llm_result(job_url)
         if cached_result:
             logger.info(f"📦 Using cached LLM result for: {job_url}")
             await self.llm_agent.save_job_data(cached_result, company_name)
             await self._mark_job_seen(job_id)
+            await self._remove_job_from_sent_cache(job_id)
             return True
         context = None
         page = None
@@ -294,12 +357,14 @@
                 logger.error(f"❌ Job no longer exists (empty/deleted): {job_url}")
                 await self._add_job_to_redis_cache(job_url, job_id, "job_not_found")
                 self.engine.report_outcome("job_not_found", url=job_url)
+                await self._remove_job_from_sent_cache(job_id)
                 return False
             if platform == "ashby":
                 try:
                     await job_page.wait_for_selector("div[class*='job-posting'], article, main", timeout=60000)
                 except Exception:
                     logger.warning(f"⚠️ Ashby page didn't load properly: {job_url}")
+                    await self._remove_job_from_sent_cache(job_id)
                     return False
             elif platform == "lever":
                 pass
@@ -328,6 +393,7 @@
                 logger.error(f"❌ Job no longer available/expired: {job_url}")
                 await self._add_job_to_redis_cache(job_url, job_id, "job_not_found")
                 self.engine.report_outcome("job_not_found", url=job_url)
+                await self._remove_job_from_sent_cache(job_id)
                 return False
             # 🔑 APPLY TYPE LOGIC
             if platform in ["ashby", "lever", "greenhouse"]:
@@ -383,6 +449,7 @@
                 error_type = "missing_compulsory_fields"
                 await self._add_job_to_redis_cache(final_url, job_id, error_type)
                 self.engine.report_outcome(error_type, url=final_url)
+                await self._remove_job_from_sent_cache(job_id)
                 return False
 
             # If we get here, all compulsory fields are valid - now add additional metadata
@@ -399,20 +466,25 @@
                 await self.llm_agent.save_job_data(refined_data, company_name)
                 await self._cache_llm_result(job_url, refined_data)
                 await self._mark_job_seen(job_id)
+                # Remove from error cache if it was previously failed
+                await self._remove_job_from_error_cache(job_id)
                 response_time = time.time() - start_time
                 self.engine.report_outcome("success", url=final_url, response_time=response_time)
                 logger.info(f"✅ Scraped ({platform}): {refined_data['title'][:50]}... (Apply Type: {apply_type})")
                 success = True
+                await self._remove_job_from_sent_cache(job_id)
             else:
                 logger.warning(f"🟡 LLM failed to refine: {final_url}")
                 await self._add_job_to_redis_cache(final_url, job_id, "llm_failure")
                 self.engine.report_outcome("llm_failure", url=final_url)
+                await self._remove_job_from_sent_cache(job_id)
             return success
         except asyncio.TimeoutError:
             logger.error(f"⏰ Timeout processing job ({platform}): {job_url}")
             await self._add_job_to_redis_cache(job_url, job_id, "timeout")
             self.engine.report_outcome("timeout", url=job_url)
+            await self._remove_job_from_sent_cache(job_id)
             return False
         except Exception as e:
             error_msg = str(e)
@@ -438,6 +510,7 @@
             logger.error(f"💥 Error processing job ({platform}) {job_url}: {error_msg}")
             await self._add_job_to_redis_cache(job_url, job_id, error_type)
             self.engine.report_outcome(error_type, url=job_url)
+            await self._remove_job_from_sent_cache(job_id)
             return False
         finally:
             if context:
@@ -478,9 +551,12 @@ def callback_wrapper(scraper: MultiPlatformJobScraper):
     def callback(ch, method, properties, body):
         asyncio.run(process_message_async(scraper, ch, method, properties, body))
     return callback
+
+
 def start_consumer():
     engine = FingerprintScrapingEngine(
-        seed="multiplatform_scraper",
+        # Other env vars...
+        seed=os.getenv("SEED_NAME", "multiplatform_scraper"),
         target_os="windows",
         num_variations=10
     )
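
Note on the pattern above: every exit path (seen-skip, cache-hit, job-not-found, load failure, missing fields, LLM failure, timeout, generic error, success) must remember to call `_remove_job_from_sent_cache`, which is easy to miss when a new early return is added. A minimal sketch of a try/finally wrapper that would centralize this, assuming only the `RedisManager` methods added in this diff (`run_with_sent_cache` and `process` are hypothetical names, not part of the patch):

    async def run_with_sent_cache(redis_manager, job_id, process):
        """Run `process()` while the sent-cache entry for `job_id` is held."""
        redis_manager.add_job_to_sent_cache(job_id)
        try:
            # Early returns and exceptions all fall through to finally.
            return await process()
        finally:
            redis_manager.remove_job_from_sent_cache(job_id)

Called as, e.g., `await run_with_sent_cache(self.redis_manager, job_id, lambda: self._process_job_body(job_url, company_name))`, where `_process_job_body` is a hypothetical stand-in for the body of the try block above; the per-branch removal calls then collapse into the single `finally`.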