+ - Look for headings like "Role overview", "What you'll do", "What you bring"
+ - Location is often shown near the top of the posting, under the job title
+ """
+
prompt = f"""
You are an expert job posting parser. Extract information EXACTLY as it appears in the text. DO NOT summarize, paraphrase, or invent.
CRITICAL INSTRUCTIONS:
-- The job is from AMAZON. Look for these exact section headings:
- - "## Basic Qualifications" → extract as "qualifications"
- - "## Preferred Qualifications" → include this in "qualifications" too
- - "## Description" or "About the Role" or "Key job responsibilities" → extract as "description"
- - "You Will:" or "Job responsibilities" → include in "description"
- - Requirements are often embedded in qualifications or description
+{platform_instructions}
FIELD RULES:
- description: MUST include ALL role details, responsibilities, and overview. Never "Not provided" if any job description exists.
-- qualifications: MUST include ALL content from "Basic Qualifications" and "Preferred Qualifications" sections. Combine them.
+- qualifications: MUST include ALL required skills, experience, education, and preferred qualifications. Combine them.
- requirements: If no separate "requirements" section, extract required skills/experience from qualifications/description.
-- For Amazon jobs, company_name = "Amazon".
+- location: Extract city, state, or remote status if available.
+- salary_range: Extract if explicitly mentioned (e.g., "$70,000–$85,000").
+- nature_of_work: Extract if mentioned (e.g., "Part-time", "Remote", "On-site").
- REQUIRED FIELDS (must have valid values, never "N/A"):
- - title, company_name, job_id, url
-
- OPTIONAL FIELDS (can be "Not provided"):
- - location, salary_range, nature_of_work
-
- Page Content:
- {cleaned_content}
-
- Response format (ONLY return this JSON):
- {{
- "title": "...",
- "company_name": "...",
- "location": "...",
- "description": "...",
- "qualifications": "...",
- "salary_range": "...",
- "nature_of_work": "...",
- "job_id": "{job_id}",
- "url": "{url}"
- }}
- """
+REQUIRED FIELDS (must have valid values, never "N/A"):
+- title, company_name, job_id, url
+
+OPTIONAL FIELDS (can be "Not provided"):
+- location, salary_range, nature_of_work
+
+Page Content:
+{cleaned_content}
+
+Response format (ONLY return this JSON):
+{{
+ "title": "...",
+ "company_name": "...",
+ "location": "...",
+ "description": "...",
+ "qualifications": "...",
+ "salary_range": "...",
+ "nature_of_work": "...",
+ "job_id": "{job_id}",
+ "url": "{url}"
+}}
+"""
try:
response_text = await asyncio.get_event_loop().run_in_executor(
None,
lambda: self._generate_content_sync(prompt)
- )
+ )
refined_data = self._parse_llm_response(response_text)
-
+
if not refined_data:
return None
-
- # Validate required fields
+
+ # Validate required fields
required_fields = ['title', 'company_name', 'job_id', 'url']
for field in required_fields:
if not refined_data.get(field) or refined_data[field].strip() in ["N/A", "", "Unknown", "Company", "Job"]:
return None
-
- # CRITICAL: Validate content fields - check if they SHOULD exist
- content_fields = ['description', 'qualifications']
- cleaned_original = cleaned_content.lower()
-
- # Simple heuristic: if page contains job-related keywords, content fields should NOT be "Not provided"
- job_indicators = ['responsibilit', 'duties', 'require', 'qualifi', 'skill', 'experienc', 'educat', 'degree', 'bachelor', 'master']
- has_job_content = any(indicator in cleaned_original for indicator in job_indicators)
-
- if has_job_content:
- for field in content_fields:
- value = refined_data.get(field, "").strip()
- if value in ["Not provided", "N/A", ""]:
- # LLM failed to extract existing content
- print(f" ⚠ LLM returned '{value}' for {field} but job content appears present")
- return None
-
- # Add the posted_date to the refined data
+
+ # Add the posted_date to the refined data
refined_data['posted_date'] = posted_date
-
+
return refined_data
-
+
except Exception as e:
print(f"LLM refinement failed: {str(e)}")
return None
-
+
def _parse_llm_response(self, response_text: str) -> Dict[str, Any]:
json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', response_text, re.DOTALL)
if not json_match:
@@ -254,11 +307,15 @@ FIELD RULES:
)
cursor = conn.cursor()
+ # Add apply_type to job_data if not present (default to 'signup')
+ if 'apply_type' not in job_data:
+ job_data['apply_type'] = 'signup'
+
cursor.execute('''
INSERT INTO jobs
(title, company_name, location, description, requirements,
- qualifications, salary_range, nature_of_work, job_id, url, category, scraped_at, posted_date)
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+ qualifications, salary_range, nature_of_work, apply_type, job_id, url, category, scraped_at, posted_date)
+ VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (job_id) DO NOTHING
''', (
job_data.get("title", "N/A"),
@@ -269,6 +326,7 @@ FIELD RULES:
job_data.get("qualifications", "N/A"),
job_data.get("salary_range", "N/A"),
job_data.get("nature_of_work", "N/A"),
+ job_data.get("apply_type", "signup"), # Default to signup if not provided
job_data.get("job_id", "N/A"),
job_data.get("url", "N/A"),
job_data.get("category", "N/A"),
@@ -299,6 +357,7 @@ FIELD RULES:
f.write(f"- *Location*: {job_data.get('location', 'N/A')}\n")
f.write(f"- *Nature of Work*: {job_data.get('nature_of_work', 'N/A')}\n")
f.write(f"- *Salary Range*: {job_data.get('salary_range', 'N/A')}\n")
+ f.write(f"- *Apply Type*: {job_data.get('apply_type', 'signup')}\n") # Add apply type to markdown
f.write(f"- *Job ID*: {job_data.get('job_id', 'N/A')}\n")
f.write(f"- *Posted Date*: {job_data.get('posted_date', 'N/A')}\n")
f.write(f"- *Category*: {job_data.get('category', 'N/A')}\n")
diff --git a/scraper.py b/scraper.py
index 756c2d9..db1ca57 100644
--- a/scraper.py
+++ b/scraper.py
@@ -5,33 +5,127 @@ import os
import json
import time
from typing import Optional, Dict
-from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
+from playwright.async_api import async_playwright
from browserforge.injectors.playwright import AsyncNewContext
from llm_agent import LLMJobRefiner
from fetcher import StealthyFetcher
from datetime import datetime
-import redis
import pika
-from tenacity import retry, stop_after_attempt, wait_exponential
import logging
-
-# Import your engine
+from tenacity import retry, stop_after_attempt, wait_exponential
from scraping_engine import FingerprintScrapingEngine
+from dotenv import load_dotenv
+from ssl_connection import create_ssl_connection_parameters  # Shared RabbitMQ SSL connection helper (ssl_connection.py)
+import redis
+
+load_dotenv()
# Configure logging
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logging.basicConfig(level=logging.INFO,
+ format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Environment variables
-RABBITMQ_HOST = os.getenv("RABBITMQ_HOST")
-RABBITMQ_PORT = os.getenv("RABBITMQ_PORT")
-RABBITMQ_USER = os.getenv("RABBITMQ_USER")
-RABBITMQ_PASS = os.getenv("RABBITMQ_PASS")
-REDIS_HOST = os.getenv("REDIS_HOST")
-REDIS_PORT = os.getenv("REDIS_PORT")
+RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
+RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", "5671"))
+RABBITMQ_SSL_ENABLED = os.getenv("RABBITMQ_SSL_ENABLED", "false").lower() == "true"
+
+# Redis configuration
+REDIS_HOST = os.getenv('REDIS_HOST', 'redis-scrape.thejobhub.xyz')
+REDIS_PORT = int(os.getenv('REDIS_PORT', '6380'))
+REDIS_PASSWORD = os.getenv('REDIS_PASSWORD')
+REDIS_SSL_ENABLED = os.getenv('REDIS_SSL_ENABLED', 'true').lower() == 'true'
-class AshbyJobScraper:
+class RedisManager:
+ """Manages Redis connection and operations for job tracking and caching."""
+
+ def __init__(self):
+ self.redis_client = None
+ self._connect()
+
+ def _connect(self):
+ """Establish connection to Redis server."""
+ if not REDIS_PASSWORD:
+ logger.warning("Warning: REDIS_PASSWORD not found in environment.")
+
+ try:
+ self.redis_client = redis.Redis(
+ host=REDIS_HOST,
+ port=REDIS_PORT,
+ password=REDIS_PASSWORD,
+ ssl=REDIS_SSL_ENABLED,
+ ssl_cert_reqs=None,
+ socket_connect_timeout=10,
+ socket_timeout=30,
+ retry_on_timeout=True
+ )
+
+ response = self.redis_client.ping()
+ logger.info(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}! Response: {response}")
+
+ except Exception as e:
+ logger.error(f"Failed to connect to Redis: {e}")
+ self.redis_client = None
+
+ def is_job_seen(self, job_id: str) -> bool:
+ if not self.redis_client:
+ return False
+
+ try:
+ return bool(self.redis_client.exists(f"job_seen:{job_id}"))
+ except Exception as e:
+ logger.error(f"Redis error checking job_seen: {e}")
+ return False
+
+ def mark_job_seen(self, job_id: str):
+ if not self.redis_client:
+ return
+
+ try:
+ self.redis_client.setex(f"job_seen:{job_id}", 2592000, "1")
+ except Exception as e:
+ logger.error(f"Redis error marking job_seen: {e}")
+
+ def get_cached_llm_result(self, job_url: str) -> Optional[Dict]:
+ if not self.redis_client:
+ return None
+
+ try:
+ cached_data = self.redis_client.get(f"llm_cache:{job_url}")
+ if cached_data:
+ return json.loads(cached_data)
+ return None
+ except Exception as e:
+ logger.error(f"Redis error getting LLM cache: {e}")
+ return None
+
+ def cache_llm_result(self, job_url: str, result: Dict):
+ if not self.redis_client:
+ return
+
+ try:
+ self.redis_client.setex(f"llm_cache:{job_url}", 604800, json.dumps(result))
+ except Exception as e:
+ logger.error(f"Redis error caching LLM result: {e}")
+
+ def add_job_to_error_cache(self, job_url: str, job_id: str, error_type: str):
+ if not self.redis_client:
+ return
+
+ try:
+ error_data = {
+ "job_url": job_url,
+ "job_id": job_id,
+ "error_type": error_type,
+ "timestamp": datetime.now().isoformat()
+ }
+ self.redis_client.setex(f"error_cache:{job_id}", 3600, json.dumps(error_data))
+ except Exception as e:
+ logger.error(f"Redis error adding to error cache: {e}")
+
+
+class MultiPlatformJobScraper:
def __init__(
self,
engine: FingerprintScrapingEngine,
@@ -40,35 +134,78 @@ class AshbyJobScraper:
self.engine = engine
self.user_request = user_request
self.llm_agent = LLMJobRefiner()
- self.redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
self.browser = None
- self.context = None
+ self.pw = None
+ self.redis_manager = RedisManager()
async def init_browser(self):
- """Initialize browser once using engine's fingerprint"""
+ if self.browser is not None:
+ # A browser left over from a previous call may be stale; close it so we relaunch cleanly
+ await self.close_browser()
+
if self.browser is None:
- profile = self.engine._select_profile()
- renderer = random.choice(self.engine.common_renderers[self.engine.os])
- vendor = random.choice(self.engine.common_vendors)
- spoof_script = self.engine._get_spoof_script(renderer, vendor)
-
- pw = await async_playwright().start()
- self.browser = await pw.chromium.launch(
- headless=True,
- args=['--disable-blink-features=AutomationControlled']
- )
- self.context = await AsyncNewContext(self.browser, fingerprint=profile)
- await self.context.add_init_script(f"""
- Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
- Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
- Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
- """)
- await self.context.add_init_script(spoof_script)
+ try:
+ profile = self.engine._select_profile()
+ renderer = random.choice(self.engine.common_renderers[self.engine.os])
+ vendor = random.choice(self.engine.common_vendors)
+ spoof_script = self.engine._get_spoof_script(renderer, vendor)
+
+ self.pw = await async_playwright().start()
+ self.browser = await self.pw.chromium.launch(
+ headless=True,
+ args=[
+ '--disable-blink-features=AutomationControlled',
+ '--no-sandbox',
+ '--disable-dev-shm-usage',
+ '--disable-gpu'
+ ]
+ )
+ logger.info("✅ Browser launched (will reuse for all jobs)")
+ except Exception as e:
+ logger.error(f"💥 Failed to launch browser: {e}")
+ raise
+
+ async def create_fresh_context(self):
+ if self.browser is None:
+ await self.init_browser()
+
+ try:
+ # Probe liveness with a throwaway page; if the browser is dead this raises
+ probe_page = await self.browser.new_page()
+ await probe_page.close()
+ except Exception:
+ logger.warning("Browser appears dead. Reinitializing...")
+ await self.close_browser()
+ await self.init_browser()
+
+ profile = self.engine._select_profile()
+ context = await AsyncNewContext(self.browser, fingerprint=profile)
+ await context.add_init_script(f"""
+ Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
+ Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
+ Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
+ """)
+ spoof_script = self.engine._get_spoof_script(
+ random.choice(self.engine.common_renderers[self.engine.os]),
+ random.choice(self.engine.common_vendors)
+ )
+ await context.add_init_script(spoof_script)
+ return context
async def close_browser(self):
if self.browser:
- await self.browser.close()
+ try:
+ await self.browser.close()
+ except Exception:
+ pass
self.browser = None
+ if self.pw:
+ try:
+ await self.pw.stop()
+ except Exception:
+ pass
+ self.pw = None
async def _safe_inner_text(self, element):
if not element:
@@ -95,47 +232,50 @@ class AshbyJobScraper:
async def _extract_page_content_for_llm(self, page) -> str:
speed = self.engine.optimization_params.get("base_delay", 2.0)
await asyncio.sleep(2 * (speed / 2))
- await self.engine._human_like_scroll(page)
+ if "lever.co" not in page.url:
+ await self.engine._human_like_scroll(page)
await asyncio.sleep(2 * (speed / 2))
return await page.content()
async def _is_job_seen(self, job_id: str) -> bool:
- return self.redis_client.get(f"seen_job:{job_id}") is not None
+ return self.redis_manager.is_job_seen(job_id)
async def _mark_job_seen(self, job_id: str):
- self.redis_client.setex(f"seen_job:{job_id}", 7 * 24 * 3600, "1")
+ self.redis_manager.mark_job_seen(job_id)
async def _get_cached_llm_result(self, job_url: str) -> Optional[Dict]:
- cached = self.redis_client.get(f"llm_cache:{job_url}")
- if cached:
- return json.loads(cached)
- return None
+ return self.redis_manager.get_cached_llm_result(job_url)
async def _cache_llm_result(self, job_url: str, result: Dict):
- self.redis_client.setex(f"llm_cache:{job_url}", 7 * 24 * 3600, json.dumps(result))
+ self.redis_manager.cache_llm_result(job_url, result)
async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
- try:
- job_data = {
- "job_url": job_url,
- "job_id": job_id,
- "error_type": error_type,
- "timestamp": datetime.now().isoformat()
- }
- self.redis_client.hset("failed_jobs", job_id, json.dumps(job_data))
- logger.info(f"📦 Added failed job to Redis cache: {job_id} (Error: {error_type})")
- except Exception as e:
- logger.error(f"❌ Failed to add to Redis: {str(e)}")
+ logger.info(f" 📦 Adding failed job to Redis cache: {job_id} (Error: {error_type})")
+ self.redis_manager.add_job_to_error_cache(job_url, job_id, error_type)
- @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
+ def _get_platform(self, url: str) -> str:
+ if "ashbyhq.com" in url:
+ return "ashby"
+ elif "lever.co" in url:
+ return "lever"
+ elif "greenhouse.io" in url:
+ return "greenhouse"
+ else:
+ return "unknown"
+
+ @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def scrape_job(
self,
job_url: str,
company_name: str,
message_id: str
):
+ platform = self._get_platform(job_url)
+ if platform == "unknown":
+ logger.info(f"⏭️ Skipping unsupported platform: {job_url}")
+ return True
+
job_id = job_url.strip("/").split("/")[-1]
-
if await self._is_job_seen(job_id):
logger.info(f"⏭️ Skipping already processed job: {job_id}")
return True
@@ -147,47 +287,70 @@ class AshbyJobScraper:
await self._mark_job_seen(job_id)
return True
+ context = None
page = None
start_time = time.time()
try:
- await self.init_browser()
- page = await self.context.new_page()
-
- # Fetch with timeout from engine config
+ context = await self.create_fresh_context()
+ page = await context.new_page()
+
timeout_ms = self.engine.optimization_params.get("request_timeout", 120000)
- temp_fetcher = StealthyFetcher(self.engine, self.browser, self.context)
+ temp_fetcher = StealthyFetcher(self.engine, self.browser, context)
+
+ fetch_timeout = 60000 if platform == "lever" else timeout_ms
job_page = await asyncio.wait_for(
- temp_fetcher.fetch_url(job_url, wait_for_selector="h1"),
- timeout=timeout_ms / 1000.0
+ temp_fetcher.fetch_url(job_url, wait_for_selector="h1", timeout=fetch_timeout),
+ timeout=fetch_timeout / 1000.0
)
-
- if not job_page:
- await self._add_job_to_redis_cache(job_url, job_id, "fetch_failure")
- self.engine.report_outcome("fetch_failure", url=job_url)
+
+ # Check the fetch succeeded and the job still exists (minimal content validation)
+ page_content = await job_page.content() if job_page else ""
+ if not job_page or len(page_content.strip()) < 500: # Arbitrary threshold for "page exists"
+ logger.error(f"❌ Fetch failed or job no longer exists (empty/deleted): {job_url}")
+ await self._add_job_to_redis_cache(job_url, job_id, "job_not_found")
+ self.engine.report_outcome("job_not_found", url=job_url)
+ return False
- # Handle Cloudflare if detected
- if await self.engine._detect_cloudflare(job_page):
- success = await self.engine._handle_cloudflare(job_page)
- if not success:
- await self._add_job_to_redis_cache(job_url, job_id, "cloudflare")
- self.engine.report_outcome("cloudflare", url=job_url)
+ if platform == "ashby":
+ try:
+ await job_page.wait_for_selector("div[class*='job-posting'], article, main", timeout=60000)
+ except Exception:
+ logger.warning(f"⚠️ Ashby page didn't load properly: {job_url}")
return False
+ elif platform == "lever":
+ pass
+ elif platform == "greenhouse":
+ try:
+ await job_page.wait_for_selector("div.job-desc, section", timeout=60000)
+ except Exception:
+ pass
- apply_btn = await job_page.query_selector("button:has-text('Apply for this job'), button:has-text('Apply now')")
- apply_type = 'signup'
- if apply_btn:
- await self._human_click(job_page, apply_btn)
- speed = self.engine.optimization_params.get("base_delay", 2.0)
- await asyncio.sleep(2 * (speed / 2))
- form = await job_page.query_selector("form, div[class*='application-form']")
- if form:
- apply_type = 'AI'
+ # 🔑 APPLY TYPE LOGIC
+ if platform in ["ashby", "lever", "greenhouse"]:
+ apply_type = 'AI' # Always AI for these platforms
+ else:
+ # For other platforms: check if form is accessible without login
+ apply_btn = await job_page.query_selector("button:has-text('Apply for this job'), button:has-text('Apply now')")
+ apply_type = 'signup' # default
+ if apply_btn:
+ await self._human_click(job_page, apply_btn)
+ speed = self.engine.optimization_params.get("base_delay", 2.0)
+ await asyncio.sleep(2 * (speed / 2))
+ form = await job_page.query_selector("form, div[class*='application-form']")
+ if form:
+ # Check for login prompts in form
+ login_indicators = await job_page.query_selector("input[type='email'], input[type='password'], :text('sign in'), :text('log in')")
+ if not login_indicators:
+ apply_type = 'AI'
+ else:
+ apply_type = 'signup'
+ else:
+ apply_type = 'signup'
final_url = job_url
page_content = await self._extract_page_content_for_llm(job_page)
posted_date = datetime.now().strftime("%m/%d/%y")
-
+
raw_data = {
"page_content": page_content,
"url": final_url,
@@ -196,8 +359,7 @@ class AshbyJobScraper:
"posted_date": posted_date
}
- # LLM call with timeout
- llm_timeout = max(30, self.engine.feedback.get("avg_response_time", 10) * 2)
+ llm_timeout = max(60, self.engine.feedback.get("avg_response_time", 10) * 2)
refined_data = await asyncio.wait_for(
self.llm_agent.refine_job_data(raw_data, self.user_request),
timeout=llm_timeout
@@ -214,20 +376,23 @@ class AshbyJobScraper:
refined_data[field] = final_url
elif field == 'company_name':
refined_data[field] = company_name
-
- refined_data['apply_type'] = apply_type
- refined_data['scraped_at'] = datetime.now().isoformat()
- refined_data['category'] = company_name
- refined_data['posted_date'] = posted_date
- refined_data['message_id'] = message_id
-
+
+ refined_data.update({
+ 'apply_type': apply_type,
+ 'scraped_at': datetime.now().isoformat(),
+ 'category': company_name,
+ 'posted_date': posted_date,
+ 'message_id': message_id,
+ 'platform': platform
+ })
+
await self.llm_agent.save_job_data(refined_data, company_name)
await self._cache_llm_result(job_url, refined_data)
await self._mark_job_seen(job_id)
-
+
response_time = time.time() - start_time
self.engine.report_outcome("success", url=final_url, response_time=response_time)
- logger.info(f"✅ Scraped: {refined_data['title'][:50]}...")
+ logger.info(f"✅ Scraped ({platform}): {refined_data['title'][:50]}... (Apply Type: {apply_type})")
success = True
else:
logger.warning(f"🟡 LLM failed to refine: {final_url}")
@@ -237,18 +402,32 @@ class AshbyJobScraper:
return success
except asyncio.TimeoutError:
- logger.error(f"⏰ Timeout processing job: {job_url}")
+ logger.error(f"⏰ Timeout processing job ({platform}): {job_url}")
await self._add_job_to_redis_cache(job_url, job_id, "timeout")
self.engine.report_outcome("timeout", url=job_url)
return False
except Exception as e:
- logger.error(f"💥 Error processing job {job_url}: {str(e)}")
- await self._add_job_to_redis_cache(job_url, job_id, "exception")
- self.engine.report_outcome("exception", url=job_url)
+ error_msg = str(e)
+ if "NoneType" in error_msg or "disconnected" in error_msg or "Browser" in error_msg:
+ logger.warning("Browser connection lost. Forcing reinitialization.")
+ await self.close_browser()
+
+ # 🔍 Distinguish job-not-found vs other errors
+ if "page.goto: net::ERR_ABORTED" in error_msg or "page.goto: net::ERR_FAILED" in error_msg:
+ logger.error(f"❌ Job no longer exists (404/network error): {job_url}")
+ await self._add_job_to_redis_cache(job_url, job_id, "job_not_found")
+ self.engine.report_outcome("job_not_found", url=job_url)
+ else:
+ logger.error(f"💥 Error processing job ({platform}) {job_url}: {error_msg}")
+ await self._add_job_to_redis_cache(job_url, job_id, "exception")
+ self.engine.report_outcome("exception", url=job_url)
return False
finally:
- if page:
- await page.close()
+ if context:
+ try:
+ await context.close()
+ except Exception:
+ pass
# Global metrics
METRICS = {
@@ -259,23 +438,22 @@ METRICS = {
"start_time": time.time()
}
-async def process_message_async(scraper: AshbyJobScraper, ch, method, properties, body):
+
+async def process_message_async(scraper: MultiPlatformJobScraper, ch, method, properties, body):
try:
job_data = json.loads(body)
job_link = job_data['job_link']
company_name = job_data['company_name']
message_id = properties.message_id or f"msg_{int(time.time()*1000)}"
-
+
logger.info(f"📥 Processing job: {job_link} (ID: {message_id})")
-
success = await scraper.scrape_job(job_link, company_name, message_id)
-
+
METRICS["processed"] += 1
if success:
METRICS["success"] += 1
else:
METRICS["failed"] += 1
-
except json.JSONDecodeError:
logger.error("❌ Invalid JSON in message")
METRICS["failed"] += 1
@@ -285,33 +463,31 @@ async def process_message_async(scraper: AshbyJobScraper, ch, method, properties
finally:
ch.basic_ack(delivery_tag=method.delivery_tag)
-def callback_wrapper(scraper: AshbyJobScraper):
+
+def callback_wrapper(scraper: MultiPlatformJobScraper):
def callback(ch, method, properties, body):
asyncio.run(process_message_async(scraper, ch, method, properties, body))
return callback
+
def start_consumer():
- # Initialize REAL engine
engine = FingerprintScrapingEngine(
- seed="ashby_scraper",
+ seed="multiplatform_scraper",
target_os="windows",
num_variations=10
)
- scraper = AshbyJobScraper(engine)
-
- # RabbitMQ connection with retries
+ scraper = MultiPlatformJobScraper(engine)
+
connection = None
for attempt in range(5):
try:
- credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
- parameters = pika.ConnectionParameters(
- host=RABBITMQ_HOST,
- port=RABBITMQ_PORT,
- virtual_host='/',
- credentials=credentials,
- heartbeat=600,
- blocked_connection_timeout=300
- )
+ parameters = create_ssl_connection_parameters()
+
+ if RABBITMQ_SSL_ENABLED:
+ logger.info(f"Connecting to RabbitMQ over SSL at {RABBITMQ_HOST}:{RABBITMQ_PORT}")
+ else:
+ logger.info(f"Connecting to RabbitMQ at {RABBITMQ_HOST}:{RABBITMQ_PORT}")
+
connection = pika.BlockingConnection(parameters)
break
except Exception as e:
@@ -326,8 +502,8 @@ def start_consumer():
channel.queue_declare(queue='job_queue', durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='job_queue', on_message_callback=callback_wrapper(scraper))
-
- logger.info('Waiting for messages. To exit press CTRL+C')
+
+ logger.info('Waiting for messages (Ashby, Lever, Greenhouse). To exit press CTRL+C')
try:
channel.start_consuming()
except KeyboardInterrupt:
@@ -336,5 +512,6 @@ def start_consumer():
connection.close()
asyncio.run(scraper.close_browser())
+
if __name__ == "__main__":
start_consumer()
\ No newline at end of file
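
As a consolidated reference for the Redis usage in this file and in sender.py below, here is a minimal sketch of the key prefixes and TTLs the two RedisManager classes rely on. The localhost connection values are placeholders; the real host, port, password and SSL settings come from the REDIS_* environment variables shown above.

```python
import json
from datetime import datetime

import redis  # redis-py, as used by both RedisManager classes

# Placeholder connection; real settings come from REDIS_HOST/PORT/PASSWORD/SSL_ENABLED
r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# Key scheme (TTLs as in the diff):
#   job_seen:<job_id>      30 days - scraper-side dedup of processed jobs
#   llm_cache:<job_url>     7 days - cached LLM refinement result
#   error_cache:<job_id>    1 hour - recent failure details
#   sent_job:<job_url>      7 days - sender-side dedup of queued jobs

def mark_job_seen(job_id: str) -> None:
    r.setex(f"job_seen:{job_id}", 30 * 24 * 3600, "1")

def is_job_seen(job_id: str) -> bool:
    return bool(r.exists(f"job_seen:{job_id}"))

def cache_llm_result(job_url: str, result: dict) -> None:
    r.setex(f"llm_cache:{job_url}", 7 * 24 * 3600, json.dumps(result))

def record_error(job_id: str, job_url: str, error_type: str) -> None:
    payload = {"job_url": job_url, "job_id": job_id,
               "error_type": error_type, "timestamp": datetime.now().isoformat()}
    r.setex(f"error_cache:{job_id}", 3600, json.dumps(payload))
```
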
diff --git a/scraping_engine.py b/scraping_engine.py
index 31451b2..b17f500 100644
--- a/scraping_engine.py
+++ b/scraping_engine.py
@@ -371,7 +371,7 @@ class FingerprintScrapingEngine:
# Reload occasionally to trigger potential client-side checks
if (time.time() - start_time) > 15 and (time.time() - start_time) % 20 < 2:
print("Reloading page during Cloudflare wait...")
- await page.reload(wait_until='load', timeout=80000)
+ await page.reload(wait_until='domcontentloaded', timeout=120000)
print("Timeout waiting for Cloudflare resolution.")
return False
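
The reload tweak above lives inside the engine's Cloudflare wait loop. Below is a minimal sketch of that wait-and-reload pattern; the "just a moment" title check is an assumed challenge marker, not the engine's actual detection logic. The point of `wait_until='domcontentloaded'` plus the longer timeout is that the reload resolves as soon as the DOM is parsed instead of waiting for every challenge asset to finish loading.

```python
import asyncio
import time

from playwright.async_api import Page

async def wait_out_challenge(page: Page, max_wait: float = 60.0) -> bool:
    """Poll until a challenge page clears, reloading it occasionally (sketch only)."""
    start = time.time()
    last_reload = start
    while time.time() - start < max_wait:
        title = (await page.title()).lower()
        if "just a moment" not in title:  # assumed challenge marker
            return True
        if time.time() - last_reload > 20:
            # 'domcontentloaded' resolves once the DOM is parsed, so slow
            # challenge assets cannot hold the reload open until the timeout
            await page.reload(wait_until="domcontentloaded", timeout=120000)
            last_reload = time.time()
        await asyncio.sleep(2)
    return False
```
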
diff --git a/sender.py b/sender.py
index 02b51ab..905939f 100644
--- a/sender.py
+++ b/sender.py
@@ -10,42 +10,109 @@ import uuid
from configparser import ConfigParser
import pika
import redis
+import ssl
+from dotenv import load_dotenv
+from datetime import datetime
+
+load_dotenv()
+
+
+class RedisManager:
+ """Manages Redis connection and operations for job deduplication."""
+
+ def __init__(self):
+ self.redis_host = os.getenv('REDIS_HOST', 'redis-scrape.thejobhub.xyz')
+ self.redis_port = int(os.getenv('REDIS_PORT', '6380'))
+ self.redis_password = os.getenv('REDIS_PASSWORD')
+ self.redis_ssl_enabled = os.getenv('REDIS_SSL_ENABLED', 'true').lower() == 'true'
+ self.redis_client = None
+ self._connect()
+
+ def _connect(self):
+ if not self.redis_password:
+ print("Warning: REDIS_PASSWORD not found in environment.")
+
+ try:
+ self.redis_client = redis.Redis(
+ host=self.redis_host,
+ port=self.redis_port,
+ password=self.redis_password,
+ ssl=self.redis_ssl_enabled,
+ ssl_cert_reqs=None,
+ socket_connect_timeout=10,
+ socket_timeout=30,
+ decode_responses=True
+ )
+ response = self.redis_client.ping()
+ print(f"Connected to Redis at {self.redis_host}:{self.redis_port}! Response: {response}")
+ except Exception as e:
+ print(f"Failed to connect to Redis: {e}")
+ self.redis_client = None
+
+ def is_job_seen(self, job_url):
+ if not self.redis_client:
+ return False
+ try:
+ return bool(self.redis_client.exists(f"sent_job:{job_url}"))
+ except Exception:
+ return False
+
+ def mark_job_sent(self, job_url):
+ if not self.redis_client:
+ return
+ try:
+ self.redis_client.setex(f"sent_job:{job_url}", 7 * 24 * 3600, "1")
+ except Exception:
+ pass
+
class Sender:
def __init__(self, config_file='config.ini'):
self.config = ConfigParser()
self.config.read(config_file)
-
- # RabbitMQ from env vars with fallbacks
+
self.rabbitmq_host = os.getenv("RABBITMQ_HOST")
- self.rabbitmq_port = os.getenv("RABBITMQ_PORT")
+ self.rabbitmq_port = os.getenv("RABBITMQ_PORT")  # converted to int below, once the SSL flag is known
self.username = os.getenv("RABBITMQ_USER")
self.password = os.getenv("RABBITMQ_PASS")
self.queue_name = self.config.get('rabbitmq', 'queue_name', fallback='job_queue')
self.directory = self.config.get('files', 'directory', fallback=os.path.join(os.path.expanduser("~"), "jobs", "csv"))
-
- # Cross-platform log path: use user's home directory
+
default_log_dir = os.path.join(os.path.expanduser("~"), ".web_scraping_project", "logs")
os.makedirs(default_log_dir, exist_ok=True)
default_log_file = os.path.join(default_log_dir, "sender.log")
self.log_file = self.config.get('logging', 'log_file', fallback=default_log_file)
-
+
self.virtual_host = self.config.get('rabbitmq', 'virtual_hash', fallback='/')
self.batch_size = 500
- self.retry_attempts = 5 # Increased for robustness
+ self.retry_attempts = 5
self.retry_sleep = 2
- self.check_interval = 30 # More frequent polling
+ self.check_interval = 30
- # Redis for deduplication
- redis_host = os.getenv("REDIS_HOST")
- redis_port = os.getenv("REDIS_PORT")
- self.redis_client = redis.Redis(host=redis_host, port=redis_port, db=1, decode_responses=True)
+ self.use_ssl = os.getenv("RABBITMQ_SSL_ENABLED", "false").lower() == "true"
+ if self.rabbitmq_port is None:
+ self.rabbitmq_port = "5671" if self.use_ssl else "5672"
+ else:
+ self.rabbitmq_port = int(self.rabbitmq_port)
+
+ self.redis_manager = RedisManager()
- # Ensure log directory exists before configuring logging
log_dir = os.path.dirname(self.log_file)
os.makedirs(log_dir, exist_ok=True)
- logging.basicConfig(filename=self.log_file, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
- self.logger = logging.getLogger(__name__)
+
+ self.logger = logging.getLogger('sender')
+ self.logger.setLevel(logging.INFO)
+ self.logger.handlers.clear()
+
+ file_handler = logging.FileHandler(self.log_file, encoding='utf-8')
+ file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
+ file_handler.setFormatter(file_formatter)
+ self.logger.addHandler(file_handler)
+
+ console_handler = logging.StreamHandler(sys.stdout)
+ console_formatter = logging.Formatter('%(levelname)s: %(message)s')
+ console_handler.setFormatter(console_formatter)
+ self.logger.addHandler(console_handler)
self.connection = None
self.channel = None
@@ -54,21 +121,54 @@ class Sender:
signal.signal(signal.SIGTERM, self.graceful_shutdown)
signal.signal(signal.SIGINT, self.graceful_shutdown)
+ def _create_ssl_options(self):
+ if not self.use_ssl:
+ return None
+ context = ssl.create_default_context()
+ verify_ssl = os.getenv("RABBITMQ_SSL_VERIFY", "false").lower() == "true"
+ if verify_ssl:
+ context.check_hostname = True
+ context.verify_mode = ssl.CERT_REQUIRED
+ else:
+ context.check_hostname = False
+ context.verify_mode = ssl.CERT_NONE
+ return pika.SSLOptions(context, self.rabbitmq_host)
+
def connect(self):
try:
+ if not self.rabbitmq_host:
+ self.logger.error("RABBITMQ_HOST environment variable is not set")
+ return False
+ if not self.username:
+ self.logger.error("RABBITMQ_USER environment variable is not set")
+ return False
+ if not self.password:
+ self.logger.error("RABBITMQ_PASS environment variable is not set")
+ return False
+
+ self.logger.info(f"Attempting to connect with host={self.rabbitmq_host}, port={self.rabbitmq_port}, user={self.username}")
+
credentials = pika.PlainCredentials(self.username, self.password)
- parameters = pika.ConnectionParameters(
- host=self.rabbitmq_host,
- port=self.rabbitmq_port,
- virtual_host=self.virtual_host,
- credentials=credentials,
- heartbeat=600,
- blocked_connection_timeout=300
- )
+ params = {
+ 'host': self.rabbitmq_host,
+ 'port': self.rabbitmq_port,
+ 'virtual_host': self.virtual_host,
+ 'credentials': credentials,
+ 'heartbeat': 600,
+ 'blocked_connection_timeout': 300
+ }
+
+ if self.use_ssl:
+ params['ssl_options'] = self._create_ssl_options()
+ self.logger.info(f"Connecting to RabbitMQ over SSL at {self.rabbitmq_host}:{self.rabbitmq_port}")
+ else:
+ self.logger.info(f"Connecting to RabbitMQ at {self.rabbitmq_host}:{self.rabbitmq_port}")
+
+ parameters = pika.ConnectionParameters(**params)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
self.channel.queue_declare(queue=self.queue_name, durable=True)
- self.logger.info("Connected to RabbitMQ")
+ self.logger.info("Connected to RabbitMQ successfully")
return True
except Exception as e:
self.logger.error(f"Failed to connect to RabbitMQ: {str(e)}")
@@ -96,95 +196,154 @@ class Sender:
except Exception as e:
self.logger.error(f"Failed to send message (attempt {attempt+1}): {str(e)}")
if attempt < self.retry_attempts - 1:
- time.sleep(self.retry_sleep * (2 ** attempt)) # Exponential backoff
+ time.sleep(self.retry_sleep * (2 ** attempt))
if not self.reconnect():
return False
return False
def is_job_seen(self, job_url):
- """Check if job was sent recently (7 days)"""
- return self.redis_client.get(f"sent_job:{job_url}") is not None
+ return self.redis_manager.is_job_seen(job_url)
def mark_job_sent(self, job_url):
- """Mark job as sent with 7-day TTL"""
- self.redis_client.setex(f"sent_job:{job_url}", 7 * 24 * 3600, "1")
+ self.redis_manager.mark_job_sent(job_url)
def process_csv(self, file_path):
try:
- with open(file_path, 'r') as csvfile:
+ with open(file_path, 'r', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
- batch = []
sent_count = 0
skipped_count = 0
-
- for row in reader:
- # Validate required fields
- if 'job_link' not in row or 'company_name' not in row:
- self.logger.warning(f"Skipping invalid row in {file_path}: {row}")
- continue
-
- job_link = row['job_link'].strip()
- company_name = row['company_name'].strip()
-
- if not job_link or not company_name:
- self.logger.warning(f"Skipping empty row in {file_path}")
- continue
-
- # Deduplication
- if self.is_job_seen(job_link):
+
+ self.logger.info(f"CSV headers found: {reader.fieldnames}")
+
+ for row_num, row in enumerate(reader, start=1):
+ # ✅ IMMEDIATE EXIT CHECK
+ if not self.running:
+ self.logger.info("Shutdown requested during CSV processing. Exiting...")
+ return sent_count
+
+ if 'url' not in row or 'company' not in row:
+ self.logger.warning(f"Skipping row {row_num}: missing 'url' or 'company' field. Row: {row}")
skipped_count += 1
continue
-
- job_data = {
- 'job_link': job_link,
- 'company_name': company_name
- }
-
+
+ url = row['url'].strip()
+ company = row['company'].strip()
+
+ if not url:
+ self.logger.warning(f"Skipping row {row_num}: empty URL. Company: '{company}'")
+ skipped_count += 1
+ continue
+
+ if not company:
+ self.logger.warning(f"Skipping row {row_num}: empty company. URL: {url}")
+ skipped_count += 1
+ continue
+
+ if not url.startswith(('http://', 'https://')):
+ self.logger.warning(f"Skipping row {row_num}: invalid URL format. URL: {url}")
+ skipped_count += 1
+ continue
+
+ if self.is_job_seen(url):
+ self.logger.info(f"Skipping row {row_num}: job already sent (deduplicated). URL: {url}")
+ skipped_count += 1
+ continue
+
+ job_data = {'job_link': url, 'company_name': company}
message_id = str(uuid.uuid4())
message = json.dumps(job_data)
-
+
if self.send_message(message, message_id):
sent_count += 1
- self.mark_job_sent(job_link)
+ self.mark_job_sent(url)
else:
- self.logger.error(f"Failed to send job: {job_link}")
-
+ self.logger.error(f"Failed to send job (row {row_num}): {url}")
+ skipped_count += 1
+
if (sent_count + skipped_count) % 100 == 0:
self.logger.info(f"Progress: {sent_count} sent, {skipped_count} skipped from {file_path}")
-
+
self.logger.info(f"Completed {file_path}: {sent_count} sent, {skipped_count} skipped")
- os.rename(file_path, file_path + '.processed')
- self.logger.info(f"Processed and renamed {file_path} to {file_path}.processed")
+
+ try:
+ os.rename(file_path, file_path + '.processed')
+ self.logger.info(f"Processed and renamed {file_path} to {file_path}.processed")
+ except Exception as rename_error:
+ self.logger.error(f"Failed to rename {file_path}: {str(rename_error)}")
+ marker_file = file_path + '.processed_marker'
+ with open(marker_file, 'w') as f:
+ f.write(f"Processed at {datetime.now().isoformat()}")
+ self.logger.info(f"Created marker file: {marker_file}")
+
return sent_count
except Exception as e:
self.logger.error(f"Error processing {file_path}: {str(e)}")
return 0
def find_new_csvs(self):
+ if not self.running: # ✅ IMMEDIATE EXIT CHECK
+ return []
+ if not os.path.exists(self.directory):
+ return []
files = [f for f in os.listdir(self.directory) if f.endswith('.csv') and not f.endswith('.processed')]
files.sort()
return [os.path.join(self.directory, f) for f in files]
def run(self):
if not self.connect():
+ self.logger.error("RabbitMQ connection failed, exiting")
sys.exit(1)
- while self.running:
- new_files = self.find_new_csvs()
- if new_files:
- for file_path in new_files:
- self.logger.info(f"Processing {file_path}")
- sent = self.process_csv(file_path)
- self.logger.info(f"Sent {sent} jobs from {file_path}")
- else:
- self.logger.info("No new CSV files found")
- time.sleep(self.check_interval)
- if self.connection and self.connection.is_open:
- self.connection.close()
+
+ try:
+ while self.running:
+ new_files = self.find_new_csvs()
+ if new_files:
+ for file_path in new_files:
+ if not self.running: # ✅ IMMEDIATE EXIT CHECK
+ break
+ self.logger.info(f"Processing {file_path}")
+ sent = self.process_csv(file_path)
+ self.logger.info(f"Sent {sent} jobs from {file_path}")
+ else:
+ self.logger.info("No new CSV files found")
+
+ # Replace blocking sleep with interruptible sleep
+ for _ in range(self.check_interval):
+ if not self.running:
+ break
+ time.sleep(1)
+ except KeyboardInterrupt:
+ # This should not normally be reached due to signal handler, but added for safety
+ pass
+ finally:
+ if self.connection and self.connection.is_open:
+ self.logger.info("Closing RabbitMQ connection...")
+ self.connection.close()
def graceful_shutdown(self, signum, frame):
- self.logger.info("Received shutdown signal")
- self.running = False
+ self.logger.info("Received shutdown signal. Initiating graceful shutdown...")
+ self.running = False # This will break all loops
+
if __name__ == '__main__':
+ required_vars = ['RABBITMQ_HOST', 'RABBITMQ_PORT', 'RABBITMQ_USER', 'RABBITMQ_PASS']
+ missing_vars = [var for var in required_vars if not os.getenv(var)]
+
+ if missing_vars:
+ print(f"Missing environment variables: {missing_vars}")
+ print("Check your .env file and ensure load_dotenv() is working")
+ sys.exit(1)
+
sender = Sender()
- sender.run()
\ No newline at end of file
+ print(f"Using directory: {sender.directory}")
+ print(f"Directory exists: {os.path.exists(sender.directory)}")
+ if os.path.exists(sender.directory):
+ print(f"Files: {os.listdir(sender.directory)}")
+
+ try:
+ sender.run()
+ except KeyboardInterrupt:
+ # Fallback in case signal handler doesn't catch it
+ sender.logger.info("KeyboardInterrupt caught in main. Exiting.")
+ sys.exit(0)
\ No newline at end of file
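
The shutdown handling added to sender.py reduces to one pattern: a `running` flag flipped by the signal handler, checked by every loop, plus a sliced sleep so the poll loop notices the flag within about a second. A minimal sketch, with `do_work` as a stand-in for the CSV polling:

```python
import signal
import time

class Worker:
    """Minimal sketch of sender.py's graceful-shutdown pattern."""

    def __init__(self, check_interval: int = 30):
        self.check_interval = check_interval
        self.running = True
        signal.signal(signal.SIGTERM, self.stop)
        signal.signal(signal.SIGINT, self.stop)

    def stop(self, signum, frame):
        self.running = False  # every loop checks this flag and exits promptly

    def do_work(self):
        print("polling for new CSV files...")  # stand-in for find_new_csvs/process_csv

    def run(self):
        while self.running:
            self.do_work()
            # Sleep in 1-second slices instead of one blocking sleep,
            # so SIGTERM/SIGINT is honoured without waiting out the interval
            for _ in range(self.check_interval):
                if not self.running:
                    break
                time.sleep(1)

if __name__ == "__main__":
    Worker(check_interval=5).run()
```
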
diff --git a/ssl_connection.py b/ssl_connection.py
new file mode 100644
index 0000000..2394e3b
--- /dev/null
+++ b/ssl_connection.py
@@ -0,0 +1,80 @@
+
+import pika
+import ssl
+import os
+from dotenv import load_dotenv
+
+# Load environment variables
+load_dotenv()
+
+
+def create_ssl_connection_parameters():
+ """
+ Create and return RabbitMQ connection parameters with SSL configuration.
+ This function handles both SSL and non-SSL connections based on environment variables.
+ """
+ # Load environment variables with fallbacks
+ rabbitmq_host = os.getenv('RABBITMQ_HOST')
+ rabbitmq_port = int(os.getenv('RABBITMQ_PORT', '5671'))
+ rabbitmq_user = os.getenv('RABBITMQ_USER')
+ rabbitmq_pass = os.getenv('RABBITMQ_PASS', 'ofure-scrape')
+ rabbitmq_ssl_enabled = os.getenv('RABBITMQ_SSL_ENABLED', 'true').lower() == 'true'
+ rabbitmq_ssl_verify = os.getenv('RABBITMQ_SSL_VERIFY', 'false').lower() == 'true'
+
+ # Validate credentials
+ if not rabbitmq_pass or rabbitmq_pass == 'YOUR_STRONG_PASSWORD':
+ print("Warning: Using placeholder or empty password. Please check .env file.")
+
+ credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_pass)
+
+ if rabbitmq_ssl_enabled:
+ # SSL Context
+ context = ssl.create_default_context()
+ context.check_hostname = rabbitmq_ssl_verify
+ context.verify_mode = ssl.CERT_REQUIRED if rabbitmq_ssl_verify else ssl.CERT_NONE
+
+ ssl_options = pika.SSLOptions(context, rabbitmq_host)
+ params = pika.ConnectionParameters(
+ host=rabbitmq_host,
+ port=rabbitmq_port,
+ credentials=credentials,
+ ssl_options=ssl_options,
+ heartbeat=600,
+ blocked_connection_timeout=300,
+ virtual_host='/'
+ )
+ else:
+ # Non-SSL connection
+ params = pika.ConnectionParameters(
+ host=rabbitmq_host,
+ port=rabbitmq_port if rabbitmq_port != 5671 else 5672, # Default non-SSL port
+ credentials=credentials,
+ heartbeat=600,
+ blocked_connection_timeout=300,
+ virtual_host='/'
+ )
+
+ return params
+
+
+def test_connection():
+ """
+ Test function to verify RabbitMQ connection (original functionality preserved).
+ """
+ try:
+ params = create_ssl_connection_parameters()
+ connection = pika.BlockingConnection(params)
+ channel = connection.channel()
+ print("Connected to Secure RabbitMQ!")
+ connection.close()
+ return True
+ except Exception as e:
+ import traceback
+ print(f"Failed to connect: {e!r}")
+ traceback.print_exc()
+ return False
+
+
+# Keep the original test functionality when run directly
+if __name__ == "__main__":
+ test_connection()
\ No newline at end of file