Compare commits

...

35 Commits

Author SHA1 Message Date
5901e9c1a1 Refactor scraper.py to improve code readability by removing unnecessary blank lines and ensuring consistent formatting. 2025-12-15 14:18:52 +01:00
5939f2bd04 Refactor RedisManager methods to enhance job caching and error handling; implement sent and error cache management for improved job processing flow. 2025-12-15 14:14:16 +01:00
e2e1bc442e Refactor RedisManager methods for improved error handling and logging; streamline job validation process by ensuring all compulsory fields are checked before processing. 2025-12-15 10:34:41 +01:00
87c67265f8 Refactor environment variable handling in scraper; remove default values for RabbitMQ and Redis configurations. Enhance job validation by checking for all compulsory fields before processing. 2025-12-15 09:37:52 +01:00
2c5b42b7bd Refactor job tracking to use job ID instead of job URL in RedisManager methods 2025-12-15 09:08:27 +01:00
b13d14d26d Enhance job handling in scraper and sender modules:
- Update fetch timeout in StealthyFetcher for improved reliability.
- Refactor LLMJobRefiner to create and manage Quelah Jobs table in PostgreSQL.
- Modify RedisManager to track sent job counts for jobs.csv and adjust deduplication logic.
- Implement job URL-based deduplication across scraper and sender.
2025-12-12 21:14:37 +01:00
c370de83d5 Refactor scraper and sender modules for improved Redis management and SSL connection handling
- Introduced RedisManager class in scraper.py for centralized Redis operations including job tracking and caching.
- Enhanced job scraping logic in MultiPlatformJobScraper to support multiple platforms (Ashby, Lever, Greenhouse).
- Updated browser initialization and context management to ensure better resource handling.
- Improved error handling and logging throughout the scraping process.
- Added SSL connection parameters management in a new ssl_connection.py module for RabbitMQ connections.
- Refactored sender.py to utilize RedisManager for job deduplication and improved logging mechanisms.
- Enhanced CSV processing logic in sender.py with better validation and error handling.
- Updated connection parameters for RabbitMQ to support SSL configurations based on environment variables.
2025-12-12 13:48:26 +01:00
0c447d0f77 Merge branch 'Ashby_agent' of https://gitea.thejobhub.xyz/Ofure/Web_scraping_project into Ashby_agent 2025-12-10 13:26:54 +01:00
94d87943de Refactor environment variable handling in AshbyJobScraper and Sender classes; remove fallback values for RabbitMQ and Redis configurations. 2025-12-10 13:26:47 +01:00
c0c7925be3 Delete amazon_main.py 2025-12-10 11:07:39 +00:00
20408dd5a6 Delete amazon_job_scraper.py 2025-12-10 11:07:15 +00:00
762846cb4a Add AshbyJobScraper and Sender classes for job scraping and message sending; implement Redis caching and RabbitMQ integration. 2025-12-10 12:02:43 +01:00
2d22fbdb92 Enhance AmazonJobScraper to support flexible location matching and extract posted dates; refine LLMJobRefiner prompts for better data extraction. 2025-12-09 12:00:57 +01:00
e216db35f9 Increase max pages to scrape and extend wait time between job title scrapes; add posted date to job data extraction 2025-12-09 09:30:44 +01:00
cbcffa8cd4 modify to queue failed jobs and also extract date of job posting 2025-12-09 09:12:35 +01:00
4782f174e2 Delete browser_sessions/job_scraping_12_session.json 2025-12-05 17:49:56 +00:00
10fa1ac633 Delete browser_sessions/job_scraping_123_session.json 2025-12-05 17:49:46 +00:00
ba783112f5 Delete spoof_config.json 2025-12-05 17:49:30 +00:00
9ed5641540 Delete tr.py 2025-12-05 16:50:52 +00:00
370fce0514 Merge branch 'amazon_agent' of https://gitea.thejobhub.xyz/Ofure/Web_scraping_project into amazon_agent 2025-12-05 17:50:10 +01:00
efa47d50ae amazon specific built engine 2025-12-05 17:49:31 +01:00
e49860faae Delete linkedin_main.py 2025-12-05 16:45:12 +00:00
0942339426 Delete job_scraper2.py 2025-12-05 16:44:52 +00:00
7e80801f89 Delete job_scraper.py 2025-12-05 16:44:23 +00:00
06f9820c38 Delete feedback_job_scraping_123.json 2025-12-05 16:44:08 +00:00
fbde4d03e1 Delete feedback_job_scraping_12.json 2025-12-05 16:43:42 +00:00
d0aabc5970 Delete .env 2025-12-05 16:43:25 +00:00
672c6a0333 scraper for amazon 2025-12-05 17:25:54 +01:00
224b9c3122 llm_agent now responsible for extraction. 2025-12-05 17:23:31 +01:00
160efadbfb modifications to work with postgre and use llm to extract and refine data 2025-12-05 17:00:43 +01:00
4f78a845ae refactor(llm_agent): switch from XAI to DeepSeek API and simplify job refinement
- Replace XAI/Grok integration with DeepSeek's OpenAI-compatible API
- Remove schema generation and caching logic
- Simplify prompt structure and response parsing
- Standardize database schema and markdown output format
- Update config to use DEEPSEEK_API_KEY instead of XAI_API_KEY
- Change default search keyword in linkedin_main.py
2025-12-01 10:25:37 +01:00
d7d92ba8bb fix(job_scraper): increase timeout values for page navigation
The previous timeout values were too short for slower network conditions, causing premature timeouts during job scraping. Increased wait_for_function timeout from 30s to 80s and load_state timeout from 30s to 60s to accommodate slower page loads.
2025-11-27 12:28:21 +01:00
d025828036 feat: update LLM model and increase content size limit
refactor: update timeout values in job scraper classes

feat: add spoof config for renderers and vendors

build: update pycache files for config and modules
2025-11-24 13:47:47 +01:00
fd4e8c9c05 feat(scraper): add LLM-powered job data refinement and new scraping logic
- Implement LLMJobRefiner class for processing job data with Gemini API
- Add new job_scraper2.py with enhanced scraping capabilities
- Remove search_keywords parameter from scraping engine
- Add environment variable loading in config.py
- Update main script to use new scraper and target field
2025-11-24 12:25:50 +01:00
7dca4c9159 refactor(job_scraper): improve page loading and typing in linkedin scraper
- Change page load strategy from 'load' to 'domcontentloaded' and 'networkidle' for better performance
- Make search_keywords parameter optional to handle empty searches
- Update type imports to include List for better type hints
- Set headless mode to true by default for production use
2025-11-23 09:27:05 +01:00
17 changed files with 2183 additions and 554 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

9
config.ini Normal file
View File

@ -0,0 +1,9 @@
[rabbitmq]
queue_name = job_queue
[files]
directory = C:\Users\OfuRich\jobs\csv
[logging]
log_file = C:\Users\OfuRich\Documents\ai jobhub\Web_scraping_project\logs\sender.log

View File

@ -2,6 +2,17 @@
import os
import json
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
directory = "C:/Users/OfuRich/Downloads"
# LLM Agent Configuration
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")
if not DEEPSEEK_API_KEY:
raise ValueError("DEEPSEEK_API_KEY environment variable not set in .env file")
def load_spoof_config():
"""Load spoof data from JSON config file. Falls back to defaults if missing."""

97
fetcher.py Normal file
View File

@ -0,0 +1,97 @@
import asyncio
import random
import time
from playwright.async_api import Page, BrowserContext, Browser
from typing import Optional
from scraping_engine import FingerprintScrapingEngine
class StealthyFetcher:
def __init__(self, engine: FingerprintScrapingEngine, browser: Browser, context: BrowserContext):
self.engine = engine
self.browser = browser
self.context = context
self.max_retries = 5
self.base_delay = 5
async def fetch_url(self, url: str, wait_for_selector: Optional[str] = None, timeout=300000) -> Optional[Page]:
"""
Fetch URL using the provided context (caller handles page creation)
"""
page = None
try:
page = await self.context.new_page()
# Use networkidle for all platforms - works reliably for Ashby, Lever, and Greenhouse
await page.goto(url, wait_until='domcontentloaded', timeout=min(timeout, 120000))
# Skip human behavior for Lever (already loads fully without it)
if "lever.co" not in url:
await self._apply_human_behavior(page)
protection_type = await self._detect_protection(page)
if protection_type:
content_accessible = await self._is_content_accessible(page)
if not content_accessible:
handled = False
if protection_type == "cloudflare":
handled = await self._handle_cloudflare(page)
elif protection_type == "captcha":
handled = await self._handle_captcha(page)
if not handled:
return None
return page
except Exception as e:
try:
if page:
await page.close()
except Exception:
pass
raise
async def _apply_human_behavior(self, page: Page):
await self.engine._human_like_scroll(page)
await asyncio.sleep(random.uniform(1, 3))
await self.engine._simulate_human_interaction(page)
await asyncio.sleep(random.uniform(1, 2))
async def _detect_protection(self, page: Page) -> Optional[str]:
content = (await page.content()).lower()
if ("#cf-chl" in content or "checking your browser" in content or
"just a moment" in content or "cloudflare" in content or
"ddos protection" in content or "turnstile" in content):
return "cloudflare"
elif "captcha" in content or "robot" in content or "verify you're human" in content:
return "captcha"
return None
async def _is_content_accessible(self, page: Page, wait_for_selector: Optional[str] = None) -> bool:
try:
await page.wait_for_selector("body", timeout=120000)
body_text = await page.eval_on_selector("body", "el => el.innerText.toLowerCase()")
if len(body_text.strip()) < 100:
return False
job_keywords = ['job', 'role', 'apply', 'responsibilities', 'requirements', 'qualifications']
return any(word in body_text for word in job_keywords)
except:
return False
async def _handle_cloudflare(self, page: Page) -> bool:
max_wait_time = 60
start_time = time.time()
while time.time() - start_time < max_wait_time:
if not await self._detect_protection(page):
return True
await self._apply_human_behavior(page)
wait_time = min(10, 2 + random.uniform(1, 3) + (time.time() - start_time) * 0.1)
await asyncio.sleep(wait_time)
if (time.time() - start_time) > 15 and (time.time() - start_time) % 20 < 2:
try:
await page.reload(wait_until='domcontentloaded', timeout=120000)
except Exception:
pass
return False
async def _handle_captcha(self, page: Page) -> bool:
return False # Avoid strategy

View File

@ -1,491 +0,0 @@
import asyncio
import random
import sqlite3
import os
from datetime import datetime
from typing import Optional, Dict
from playwright.async_api import async_playwright
from browserforge.injectors.playwright import AsyncNewContext
class LinkedInJobScraper:
def __init__(
self,
engine,
db_path: str = "linkedin_jobs.db",
human_speed: float = 1.0
):
self.engine = engine
self.db_path = db_path
self.human_speed = human_speed
self._init_db()
def _init_db(self):
os.makedirs(os.path.dirname(self.db_path) if os.path.dirname(self.db_path) else ".", exist_ok=True)
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
keyword TEXT,
title TEXT,
company TEXT,
location TEXT,
salary TEXT,
description TEXT,
url TEXT UNIQUE,
workplace_type TEXT,
scraped_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
async def _human_click(self, page, element, wait_after: bool = True):
if not element:
return False
await element.scroll_into_view_if_needed()
await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
try:
await element.click()
if wait_after:
await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
return True
except:
return False
async def _login(self, page, credentials: Dict) -> bool:
"""Human-realistic LinkedIn login"""
print("🔐 Navigating to LinkedIn login page...")
await page.goto("https://www.linkedin.com/login", timeout=60000)
await asyncio.sleep(random.uniform(2.0, 3.5) * self.human_speed)
email_field = await page.query_selector('input[name="session_key"]')
if not email_field:
print("❌ Email field not found.")
return False
print("✍️ Typing username...")
await email_field.click()
await asyncio.sleep(random.uniform(0.4, 0.9) * self.human_speed)
for char in credentials["email"]:
await page.keyboard.type(char)
await asyncio.sleep(random.uniform(0.06, 0.14) * self.human_speed)
await asyncio.sleep(random.uniform(1.0, 1.8) * self.human_speed)
password_field = await page.query_selector('input[name="session_password"]')
if not password_field:
print("❌ Password field not found.")
return False
print("🔒 Typing password...")
await password_field.click()
await asyncio.sleep(random.uniform(0.3, 0.7) * self.human_speed)
for char in credentials["password"]:
await page.keyboard.type(char)
await asyncio.sleep(random.uniform(0.08, 0.16) * self.human_speed)
await asyncio.sleep(random.uniform(0.8, 1.5) * self.human_speed)
print("✅ Submitting login form...")
await page.keyboard.press("Enter")
for _ in range(15):
current_url = page.url
if "/feed" in current_url or "/jobs" in current_url:
if "login" not in current_url:
print("✅ Login successful!")
await asyncio.sleep(random.uniform(2.0, 3.0) * self.human_speed)
return True
await asyncio.sleep(1)
print("❌ Login may have failed.")
return False
async def _extract_job_details(self, page) -> Dict:
"""Extract from ANY job page: LinkedIn Easy Apply OR external site"""
await asyncio.sleep(2 * self.human_speed)
async def get_text(selector: str) -> str:
try:
el = await page.query_selector(selector)
if el:
text = await el.inner_text()
return text.strip() if text else "N/A"
except:
pass
return "N/A"
title = await get_text("h1.t-24")
if title == "N/A":
title = await get_text("h1, h2")
company = await get_text("a.app-aware-link[href*='/company/']")
if company == "N/A":
company = await get_text("div.org, .company, [class*='company']")
location = await get_text("span[class*='location']")
if location == "N/A":
location = await get_text(".location, [class*='location']")
description = await get_text("div[class*='description__text']")
if description == "N/A":
description = await get_text(".job-desc, .description, main, body")
workplace = await get_text("span.job-workplace-type") or "N/A"
salary = await get_text("span.salary") or "N/A"
return {
"title": title,
"company": company,
"location": location,
"workplace_type": workplace,
"salary": salary,
"description": description,
"url": page.url
}
async def _save_to_markdown(self, job_data: Dict, keyword: str, verified: bool=True):
"""Save to appropriate folder using job ID to avoid duplication"""
folder = "linkedin_jobs" if verified else "linkedin_jobs_unverified"
os.makedirs(folder, exist_ok=True)
# Extract job ID from URL for LinkedIn jobs
url = job_data.get("url", "")
if "/jobs/view/" in url:
try:
job_id = url.split("/view/")[1].split("/")[0]
except:
job_id = "unknown"
else:
# For external jobs, use a hash of the URL (first 12 chars)
import hashlib
job_id = hashlib.md5(url.encode()).hexdigest()[:12]
clean_keyword = keyword.replace(" ", "_")
filename = f"linkedin_{clean_keyword}_job_{job_id}.md"
filepath = os.path.join(folder, filename)
# Only save if file doesn't already exist (idempotent)
if os.path.exists(filepath):
print(f" 📝 Skipping duplicate Markdown file: {filename}")
return
with open(filepath, "w", encoding="utf-8") as f:
f.write(f"# {job_data['title']}\n\n")
f.write(f"- **Company**: {job_data['company']}\n")
f.write(f"- **Location**: {job_data['location']}\n")
f.write(f"- **Workplace**: {job_data['workplace_type']}\n")
f.write(f"- **Salary**: {job_data['salary']}\n")
f.write(f"- **URL**: <{url}>\n\n")
f.write(f"## Description\n\n{job_data['description']}\n")
async def _save_to_db(self, job_data: Dict, keyword: str):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR IGNORE INTO jobs
(keyword, title, company, location, salary, description, url, workplace_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
keyword,
job_data["title"],
job_data["company"],
job_data["location"],
job_data["salary"],
job_data["description"],
job_data["url"],
job_data["workplace_type"]
))
conn.commit()
async def scrape_jobs(
self,
search_keywords: str,
max_pages: int = 1,
credentials: Optional[Dict] = None
):
encoded_keywords = search_keywords.replace(" ", "%20")
search_url = f"https://www.linkedin.com/jobs/search/?keywords={encoded_keywords}"
profile = self.engine._select_profile()
renderer = random.choice(self.engine.common_renderers[self.engine.os])
vendor = random.choice(self.engine.common_vendors)
spoof_script = self.engine._get_spoof_script(renderer, vendor)
async with async_playwright() as pw:
browser = await pw.chromium.launch(
headless=False,
args=['--disable-blink-features=AutomationControlled']
)
context = await AsyncNewContext(browser, fingerprint=profile)
await context.add_init_script(f"""
Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
""")
await context.add_init_script(spoof_script)
page = await context.new_page()
session_loaded = await self.engine.load_session(context)
login_successful = False
if session_loaded:
print("🔁 Using saved session — verifying login...")
await page.goto("https://www.linkedin.com/feed/", timeout=60000)
if "feed" in page.url and "login" not in page.url:
print("✅ Session still valid.")
login_successful = True
else:
print("⚠️ Saved session expired — re-authenticating.")
session_loaded = False
if not session_loaded and credentials:
print("🔐 Performing fresh login...")
login_successful = await self._login(page, credentials)
if login_successful:
await self.engine.save_session(context)
else:
print("❌ Login failed. Exiting.")
await browser.close()
self.engine.report_outcome("block")
return
elif not credentials:
print(" No credentials — proceeding as guest.")
login_successful = True
else:
pass
await page.wait_for_load_state("load", timeout=60000)
print("✅ Post-login page fully loaded. Starting search...")
if await self.engine._detect_cloudflare(page):
print("☁️ Cloudflare detected on initial load.")
if not await self.engine._handle_cloudflare(page):
print("❌ Cloudflare could not be resolved.")
await browser.close()
self.engine.report_outcome("cloudflare")
return
print(f"🔍 Searching for: {search_keywords}")
await page.goto(search_url, wait_until='load', timeout=60000)
await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
if await self.engine._detect_cloudflare(page):
print("☁️ Cloudflare detected on search page.")
if not await self.engine._handle_cloudflare(page):
await browser.close()
self.engine.report_outcome("cloudflare")
return
scraped_count = 0
all_job_links = []
seen_job_ids = set()
# ← NEW: Scroll once to reveal pagination (if any)
print("🔄 Scrolling to bottom to reveal pagination controls...")
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
# Check if pagination exists
pagination_exists = await page.query_selector("button[aria-label='Next']")
if pagination_exists:
print("⏭️ Pagination detected. Using page navigation.")
current_page = 1
while current_page <= max_pages:
print(f"📄 Processing page {current_page}/{max_pages}")
# Collect job links on current page
current_links = await page.query_selector_all("a[href*='/jobs/view/']")
new_jobs = 0
for link in current_links:
href = await link.get_attribute("href")
if href:
job_id = href.split("/view/")[-1].split("/")[0] if "/view/" in href else href
if job_id and job_id not in seen_job_ids:
seen_job_ids.add(job_id)
all_job_links.append(href)
new_jobs += 1
print(f" Found {new_jobs} new job(s) on page {current_page} (total: {len(all_job_links)})")
# Try to go to next page
if current_page < max_pages:
next_btn = await page.query_selector("button[aria-label='Next']")
if next_btn and await next_btn.is_enabled():
await self._human_click(page, next_btn)
await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
# Wait for URL to change or new content
try:
await page.wait_for_function("() => window.location.href.includes('start=')", timeout=30000)
except:
pass
else:
print("🔚 'Next' button not available — stopping pagination.")
break
current_page += 1
else:
print("🔄 No pagination found. Falling back to infinite scroll...")
last_height = await page.evaluate("document.body.scrollHeight")
no_new_jobs_count = 0
max_no_new = 3
while no_new_jobs_count < max_no_new:
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
current_links = await page.query_selector_all("a[href*='/jobs/view/']")
new_jobs_found = 0
for link in current_links:
href = await link.get_attribute("href")
if href:
job_id = href.split("/view/")[-1].split("/")[0] if "/view/" in href else href
if job_id and job_id not in seen_job_ids:
seen_job_ids.add(job_id)
all_job_links.append(href)
new_jobs_found += 1
print(f" Found {new_jobs_found} new job(s) (total: {len(all_job_links)})")
new_height = await page.evaluate("document.body.scrollHeight")
if new_height == last_height:
no_new_jobs_count += 1
else:
no_new_jobs_count = 0
last_height = new_height
if new_jobs_found == 0 and no_new_jobs_count >= 1:
print("🔚 No new jobs loaded. Stopping scroll.")
break
print(f"✅ Collected {len(all_job_links)} unique job links.")
# ← Rest of job processing loop unchanged
scraped_count = 0
for idx, href in enumerate(all_job_links):
try:
full_url = href if href.startswith("http") else f"https://www.linkedin.com{href}"
print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url}")
await page.goto(full_url, wait_until='load', timeout=60000)
await asyncio.sleep(3 * self.human_speed)
is_cloudflare = await self.engine._detect_cloudflare(page)
page_content = await page.content()
has_captcha_text = "captcha" in page_content.lower()
captcha_present = is_cloudflare or has_captcha_text
title_element = await page.query_selector("h1.t-24")
job_data_accessible = title_element is not None
if captcha_present:
if job_data_accessible:
print(" ⚠️ CAPTCHA detected, but job data is accessible. Proceeding in stealth mode...")
await self.engine._avoid_captcha(page)
else:
print(" ⚠️ CAPTCHA detected and job data blocked. Attempting recovery...")
if not await self.engine._solve_captcha_fallback(page):
print(" ❌ CAPTCHA recovery failed. Skipping job.")
continue
title_element = await page.query_selector("h1.t-24")
if not title_element:
print(" ❌ Job data still unavailable after CAPTCHA handling. Skipping.")
continue
if not captcha_present:
await self.engine._avoid_captcha(page)
apply_btn = None
apply_selectors = [
"button[aria-label*='Apply']",
"button:has-text('Apply')",
"a:has-text('Apply')",
"button:has-text('Easy Apply')"
]
for selector in apply_selectors:
apply_btn = await page.query_selector(selector)
if apply_btn:
break
job_data = None
final_url = full_url
if apply_btn:
print(" → Clicking 'Apply' / 'Easy Apply' button...")
page_waiter = asyncio.create_task(context.wait_for_event("page"))
await self._human_click(page, apply_btn, wait_after=False)
external_page = None
try:
external_page = await asyncio.wait_for(page_waiter, timeout=5.0)
print(" 🌐 External job site opened in new tab.")
await external_page.wait_for_load_state("load", timeout=30000)
await asyncio.sleep(2 * self.human_speed)
await self.engine._human_like_scroll(external_page)
await asyncio.sleep(2 * self.human_speed)
job_data = await self._extract_job_details(external_page)
final_url = external_page.url
if not external_page.is_closed():
await external_page.close()
except asyncio.TimeoutError:
print(" 🖥️ No external tab — scraping LinkedIn job page.")
await page.wait_for_timeout(2000)
try:
await page.wait_for_selector("div.jobs-apply-button--fixed, div.jobs-easy-apply-modal", timeout=8000)
except:
pass
await self.engine._human_like_scroll(page)
await asyncio.sleep(2 * self.human_speed)
job_data = await self._extract_job_details(page)
final_url = page.url
else:
print(" ⚠️ No 'Apply' button found — scraping job details directly.")
await self.engine._human_like_scroll(page)
await asyncio.sleep(2 * self.human_speed)
job_data = await self._extract_job_details(page)
final_url = page.url
job_data["url"] = final_url
if job_data["title"] == "N/A" and "linkedin.com" in final_url:
job_id = final_url.split("/")[-2] if "/jobs/view/" in final_url else "unknown"
job_data["title"] = f"Easy Apply Job - ID {job_id}"
is_meaningful = (
job_data["title"] != "N/A" or
job_data["company"] != "N/A" or
(job_data["description"] != "N/A" and len(job_data["description"]) > 20)
)
if is_meaningful:
await self._save_to_db(job_data, search_keywords)
await self._save_to_markdown(job_data, search_keywords, verified=True)
scraped_count += 1
print(f" ✅ Scraped (verified): {job_data['title'][:50]}...")
else:
await self._save_to_markdown(job_data, search_keywords, verified=False)
print(f" 🟡 Scraped (unverified): {final_url} — low-quality data")
except Exception as e:
print(f" ⚠️ Failed on job {idx+1}: {str(e)[:100]}")
continue
finally:
print(" ↩️ Returning to LinkedIn search results...")
await page.goto(search_url, timeout=60000)
await asyncio.sleep(4 * self.human_speed)
await browser.close()
if scraped_count > 0:
self.engine.report_outcome("success")
print(f"✅ Completed! Saved {scraped_count} verified + additional unverified jobs for '{search_keywords}'.")
else:
self.engine.report_outcome("captcha")
print("⚠️ No verified jobs scraped — check 'linkedin_jobs_unverified' for raw outputs.")

View File

@ -0,0 +1,504 @@
# LinkedIn Jobs - 2025-12-05 14:04:45
## Job: Machine Learning Engineer
- **Keyword**: Machine Learning Engineer location:New York
- **Company**: The Arena
- **Location**: Lagos, Lagos State, Nigeria
- **Nature of Work**: Not specified in the provided content. Could not infer from keywords like 'remote', 'onsite', or 'hybrid'.
- **Salary Range**: Not specified in the provided content.
- **Job ID**: 4325564279
- **Category**: Machine Learning Engineer
- **Scraped At**: 2025-12-05T14:04:43.210072
- **URL**: <https://www.linkedin.com/jobs/view/4325564279/?eBP=NOT_ELIGIBLE_FOR_CHARGING&refId=KZdFT%2FkXGUGDBr1Ru66VSg%3D%3D&trackingId=O4Oht8qideqoj%2FUHqgXQKg%3D%3D&trk=flagship3_search_srp_jobs>
### Description
The job posting is for a Machine Learning Engineer position. The content appears to be from a LinkedIn job application dialog, showing contact information collection for the applicant Ofure Ikheloa. The main job details beyond the title and company are not fully visible in the provided content snippet, which focuses on the application form's contact info section.
### Requirements
Specific requirements are not detailed in the provided content snippet. The visible section is part of the application form for collecting the candidate's contact information.
### Qualifications
Specific qualifications are not detailed in the provided content snippet. The visible section is part of the application form.
---
## Job: Junior Software Engineer (Fresh Graduates)
- **Keyword**: Machine Learning Engineer location:New York
- **Company**: Clarvos
- **Location**: Lagos, Lagos State, Nigeria
- **Nature of Work**: N/A
- **Salary Range**: N/A
- **Job ID**: 4348455050
- **Category**: Machine Learning Engineer
- **Scraped At**: 2025-12-05T14:06:23.354232
- **URL**: <https://www.linkedin.com/jobs/view/4348455050/?eBP=NOT_ELIGIBLE_FOR_CHARGING&refId=KZdFT%2FkXGUGDBr1Ru66VSg%3D%3D&trackingId=ztbzkgV%2BpdyOIs92zSWwNQ%3D%3D&trk=flagship3_search_srp_jobs>
### Description
The job posting is for a Junior Software Engineer position targeted at fresh graduates. The role appears to be with Clarvos, based in Lagos, Nigeria. The provided content shows an application form with contact information fields, indicating this is an active job application page on LinkedIn.
### Requirements
The specific requirements are not detailed in the provided content snippet. However, being a Junior Software Engineer role for fresh graduates typically requires foundational programming knowledge, problem-solving skills, and a willingness to learn.
### Qualifications
The role is explicitly for 'Fresh Graduates,' indicating that a recent bachelor's degree in Computer Science, Software Engineering, or a related field is the primary qualification. Specific educational requirements are not listed in the provided text.
---
## Job: Machine Learning Engineer - Search
- **Keyword**: Machine Learning Engineer location:New York
- **Company**: Shopify
- **Location**: Remote - Americas
- **Nature of Work**: Remote
- **Salary Range**: N/A
- **Job ID**: unknown
- **Category**: Machine Learning Engineer
- **Scraped At**: 2025-12-05T14:37:51.551952
- **URL**: <https://www.shopify.com/careers/machine-learning-engineer-search_c15b011d-bfe1-4eae-af45-9f3955ce408d?utm_source=linkedin>
### Description
About the role Every day, millions of people search for products across Shopify's ecosystem. That's not just queries—that's dreams, businesses, and livelihoods riding on whether someone finds the perfect vintage jacket or the exact drill bit they need. As a Machine Learning Engineer specializing in Search Recommendations, you'll be the one making that magic happen. With a search index unifying over a billion products, you're tackling one of the hardest search problems at unprecedented scale. We're building cutting-edge product search from the ground up using the latest LLM advances and vector matching technologies to create search experiences that truly understand what people are looking for.
### Requirements
Key Responsibilities: Design and implement AI-powered features to enhance search recommendations and personalization Collaborate with data scientists and engineers to productionize data products through rigorous experimentation and metrics analysis Build and maintain robust, scalable data pipelines for search and recommendation systems Develop comprehensive tools for evaluation and relevance engineering, following high-quality software engineering practices Mentor engineers and data scientists while fostering a culture of innovation and technical excellence
### Qualifications
Qualifications: Expertise in relevance engineering and recommendation systems, with hands-on experience in Elasticsearch, Solr, or vector databases Strong proficiency in Python with solid object-oriented programming skills Proven ability to write optimized, low-latency code for high-performance systems Experience deploying machine learning, NLP, or generative AI products at scale (strong plus) Familiarity with statistical methods and exposure to Ruby, Rails, or Rust (advantageous) Track record of shipping ML solutions that real users depend on
---
## Job: Data/ML Engineer
- **Keyword**: Machine Learning Engineer location:New York
- **Company**: Sailplan
- **Location**: Fort Lauderdale
- **Nature of Work**: This position may be located remotely or from our Headquarters in Miami / Fort Lauderdale, Florida, as determined on a case by case basis. Remote candidates must be US citizens located in the United States or Canada. Remote candidates are expected to travel to office periodically as necessary.
- **Salary Range**: N/A
- **Job ID**: unknown
- **Category**: Machine Learning Engineer
- **Scraped At**: 2025-12-05T14:40:02.469176
- **URL**: <https://www.wiraa.com/job-description/usAD0C4BA9641769424C946CED3DC727D9?source=Linkedin>
### Description
SailPlan is a cutting-edge technology company that is dedicated to transforming the future of maritime transportation. SailPlan offers a range of innovative solutions and services that enable its clients to optimize their operations and reduce their environmental impact. SailPlan works with some of the most important names in the shipping industry to deliver a cleaner future for the world. SailPlans team comprises of experts with a diverse range of skills and experience, including naval architects, data scientists, and software engineers. The companys collaborative and dynamic work culture fosters innovation and creativity, allowing the team to develop cutting-edge solutions that drive the industry forward. By combining state-of-the-art technology and a commitment to sustainability, SailPlan is leading the way towards a greener and more efficient maritime industry. At SailPlan, you will be part of a fast-growing team, will wear many hats and have ownership over building key aspects of our platform. You will work within a collaborative environment to build the next generation of technology for the maritime industry. If you think you have the right stuff, we are looking for YOU.
### Requirements
We are seeking an experienced Data Engineer to focus on productionalizing machine learning models within Google Cloud Platform (GCP) while collaborating on ETL design and planning. This role will be responsible for architecting, implementing, and maintaining the infrastructure needed for model training, retraining, and deployment, ensuring high-quality ML performance monitoring and external output serving. The ideal candidate has expertise in ML model operationalization, cloud architecture, and data pipeline orchestration, working closely with data scientists, cloud engineers, and analysts to bridge the gap between data engineering and MLOps, while also ensuring that model outputs are seamlessly integrated into analytics and decision-making systems.
### Qualifications
Core Requirements and Qualifications
Select and implement the appropriate GCP services for scalable ML workflows, including model training, retraining, and deployment
Develop automated monitoring and performance tracking for deployed models, surfacing quality metrics internally and ensuring external services receive high-quality outputs
Optimize model deployment pipelines to ensure efficient versioning, retraining triggers, and drift detection
Collaboration on ETL Development: Work alongside data engineers to design and optimize data pipelines that support machine learning models
Ensure seamless integration between data ingestion, transformations, and ML pipelines, leveraging BigQuery and DBT
Coordinate with sensor and instrumentation engineers to facilitate the ingestion of real-time sensor data for predictive modeling
Architect and implement CI/CD pipelines for ML models, enabling automated deployment, testing, and rollback strategies
Design cloud infrastructure that supports scalable and cost-efficient ML model training in production environments
Implement logging, alerting, and monitoring to proactively identify issues with models and data pipelines
Ensure ML model outputs are easily accessible and consumable by analytics, dashboards, and external services
Work closely with data analysts and cloud engineers to optimize Looker integrations and visualization pipelines for ML-driven insights
Maintain and document model lifecycle processes, ensuring clarity and reproducibility across the team
Required Skills
Bachelors or Masters degree in Computer Science, Software Engineering, or a related field
Strong experience in MLOps and machine learning model operationalization, particularly within GCP
Proficiency in SQL and Python, with experience in data manipulation, feature engineering, and ML model deployment
Hands-on experience with CI/CD pipelines, version control (Git), and infrastructure-as-code (Terraform, Cloud Build)
Experience working with data pipelines (specifically time-series data) and collaborating with data engineers to support ML workflows
Excellent problem-solving skills and a proactive, collaborative mindset
Preferred Qualifications
Familiarity with the maritime/shipping domain, including knowledge of sensor data and operational challenges
Experience with dbt and BigQuery for efficient data transformation
Knowledge of LookML and Looker dashboards, especially for surfacing ML insights
Experience working with real-time streaming and high-fidelity time series data
Understanding of data governance, security, and compliance best practices in cloud-based environments
---
## Job: AI Engineer (m/f/d)
- **Keyword**: Machine Learning Engineer location:New York
- **Company**: myneva
- **Location**: Portugal, remote
- **Nature of Work**: Permanent employee, Full-time · Portugal, remote
- **Salary Range**: N/A
- **Job ID**: unknown
- **Category**: Machine Learning Engineer
- **Scraped At**: 2025-12-05T14:42:45.137786
- **URL**: <https://myneva-group.jobs.personio.de/job/2443218?language=en&src=752617&_pc=752617#apply>
### Description
Your mission We are looking for a polyglot AI Engineer, located in the mainland Portugal, who is as comfortable optimizing backend concurrency in Go as they are building RAG pipelines in Python and designing interfaces in TypeScript. In this role, you will wear two hats. First, you will build the core infrastructure of our AI Platform, ensuring high availability and low latency for model inference. Second, you will design and deploy autonomous AI Agents, for our product landscape and services. If you are a builder who believes that AI is more than just prompts, its about systems, integration, and architecture then we want to hear from you. 1. AI Platform Development (Go & Python): Architect and build scalable microservices in Go (Golang) to handle high-throughput requests for our AI services. Design efficient APIs (gRPC/REST) that serve as the bridge between our core application and AI models. Optimize inference latency and manage model serving infrastructure. 2. AI Agent Engineering (Python & TypeScript): Develop autonomous agents using Python frameworks (e.g., LangChain, LlamaIndex, or custom solutions) to automate internal business processes (e.g., data entry, customer support triage, financial reporting). Implement "Tool Use" and "Function Calling" to allow LLMs to interact with third-party APIs and our internal databases. Build the integration layer and user interfaces in TypeScript (Node.js/Next.js) to allow non-technical staff to interact with these agents.
### Requirements
Your profile Polyglot Proficiency: You have production-level experience in Go, Python, and TypeScript (at least 2 of the 3 languages). You know which tool to use for which job. AI/LLM Experience: You have built applications utilizing OpenAI API, Anthropic, or open-source models (Llama 3, Mistral). You understand context windows, token limits, and prompt engineering. Systems Thinking: You understand distributed systems, concurrency, and how to deploy AI in a way that doesn't break production. Agentic Workflows: Experience building multi-step reasoning agents (e.g., "Plan-and-Execute" patterns). Database Skills: Proficiency with SQL (Postgres) and Vector Stores.
### Qualifications
Nice to Haves (Bonus Points) Experience fine-tuning open-source models on custom datasets. Knowledge of temporal/orchestration frameworks for managing long-running agent workflows. Experience with container orchestration (Kubernetes/Docker). Background in DevOps or MLOps (MLflow, weights & biases).
---
## Job: ML/AI Engineer
- **Keyword**: Machine Learning Engineer location:New York
- **Company**: Deltek, Inc
- **Location**: US Remote
- **Nature of Work**: Remote
- **Salary Range**: The U.S. salary range for this position is $57000.00 - $99750.00.
- **Job ID**: unknown
- **Category**: Machine Learning Engineer
- **Scraped At**: 2025-12-05T14:46:54.116677
- **URL**: <https://sjobs.brassring.com/TGnewUI/Search/home/HomeWithPreLoad?PageType=JobDetails&partnerid=25397&siteid=5259&jobId=621951#jobDetails=621951_5259>
### Description
Position Responsibilities Develop and deploy machine learning models for classification, regression, forecasting, and NLP tasks using production-grade code and best practices Build data pipelines for ML model training and inference; work with structured and unstructured data from multiple enterprise systems Implement model training workflows including data preprocessing, feature engineering, hyperparameter tuning, and model evaluation Create production-ready ML services with RESTful APIs that can be consumed by web and mobile applications; ensure proper error handling, logging, and monitoring Work with large-scale datasets from enterprise ERP systems; process time-series data, transactional data, and unstructured documents Collaborate with data scientists to productionize research models; optimize models for latency, throughput, and cost Participate in code reviews and contribute to team's ML engineering practices; document solutions and share knowledge with team members Support deployed models including troubleshooting, performance optimization, and implementing improvements based on production metrics
### Requirements
Qualifications 2-4 years of ML engineering experience with hands-on model development and production deployment Strong Python programming : Experience with scikit-learn, pandas, numpy; familiarity with PyTorch or TensorFlow ML fundamentals : Solid understanding of supervised/unsupervised learning, model evaluation, cross-validation, and feature engineering API development : Experience building RESTful APIs (Flask, FastAPI, or similar); understanding of microservices architecture Data processing : SQL proficiency; experience with data pipelines, ETL processes, and working with databases (PostgreSQL, MySQL, or similar) Cloud platforms : Working knowledge of AWS, Azure, or GCP; experience with cloud storage, compute, and managed ML services Version control and collaboration : Git workflows, agile methodologies, working in cross-functional teams Bonus : Exposure to NLP techniques, LLMs, embedding models, or vector databases; experience in B2B SaaS environments Education : BS in Computer Science, Data Science, Mathematics, or related technical field
### Qualifications
N/A
---
## Job: Machine Learning Engineer
- **Keyword**: Machine Learning Engineer location:New York
- **Company**: PhysicsX
- **Location**: New York, United States
- **Nature of Work**: Hybrid setup enjoy our Manhattan office while keeping remote flexibility.
- **Salary Range**: $120,000 - 240,000 depending on experience
- **Job ID**: unknown
- **Category**: Machine Learning Engineer
- **Scraped At**: 2025-12-05T14:49:29.919057
- **URL**: <https://job-boards.eu.greenhouse.io/physicsx/jobs/4644841101?gh_src=6d71ons2teu>
### Description
What you will do
Work closely with our simulation engineers, data scientists and customers to develop an understanding of the physics and engineering challenges we are solving
Design, build and test data pipelines for machine learning that are reliable, scalable and easily deployable
Explore and manipulate 3D point cloud & mesh data
Own the delivery of technical workstreams
Create analytics environments and resources in the cloud or on premise, spanning data engineering and science
Identify the best libraries, frameworks and tools for a given task, make product design decisions to set us up for success
Work at the intersection of data science and software engineering to translate the results of our R&D and projects into re-usable libraries, tooling and products
Continuously apply and improve engineering best practices and standards and coach your colleagues in their adoption
### Requirements
What you bring to the table
Experience applying Machine learning methods (including 3D graph/point cloud deep learning methods) to real-world engineering applications, with a focus on driving measurable impact in industry settings.
Experience in ML/Computational Statistics/Modelling use-cases in industrial settings (for example supply chain optimisation or manufacturing processes) is encouraged.
A track record of scoping and delivering projects in a customer facing role
2+ years experience in a data-driven role, with exposure to software engineering concepts and best practices (e.g., versioning, testing, CI/CD, API design, MLOps)
Building machine learning models and pipelines in Python, using common libraries and frameworks (e.g., TensorFlow, MLFlow)
Distributed computing frameworks (e.g., Spark, Dask)
Cloud platforms (e.g., AWS, Azure, GCP) and HP computing
Containerization and orchestration (Docker, Kubernetes)
Strong problem-solving skills and the ability to analyse issues, identify causes, and recommend solutions quickly
Excellent collaboration and communication skills - with teams and customers alike
A background in Physics, Engineering, or equivalent
### Qualifications
N/A
---
## Job: DevOps Engineer
- **Keyword**: DevOps Engineer location:New York
- **Company**: micro1
- **Location**: N/A
- **Nature of Work**: N/A
- **Salary Range**: N/A
- **Job ID**: 4342293908
- **Category**: DevOps Engineer
- **Scraped At**: 2025-12-05T15:46:55.076540
- **URL**: <https://www.linkedin.com/jobs/view/4342293908/?eBP=NOT_ELIGIBLE_FOR_CHARGING&refId=8EZEulwt3rTN7TqojhbDIQ%3D%3D&trackingId=8OTJNgCFiTsOg9ssPeenJQ%3D%3D&trk=flagship3_search_srp_jobs>
### Description
N/A
### Requirements
N/A
### Qualifications
N/A
---
## Job: DevOps Engineer
- **Keyword**: DevOps Engineer location:New York
- **Company**: Utah Tech Labs
- **Location**: North America + 1 more
- **Nature of Work**: Freelance
- **Salary Range**: $16 $20/hr
- **Job ID**: 4325775293
- **Category**: DevOps Engineer
- **Scraped At**: 2025-12-05T15:51:10.659992
- **URL**: <https://app.usebraintrust.com/jobs/16561/?gh_src=06bf3def4us&utm_channel=jobboard&utm_source=linkedin>
### Description
We are looking for a skilled DevOps Engineer to help build the foundational infrastructure, CI/CD pipelines, and operational standards for our engineering organization. This role is hands-on and ideal for someone experienced in supporting large-scale applications across modern frontend, backend, and cloud technologies.
Responsibilities
Build and maintain AWS infrastructure using Terraform (IaC).
Set up CI/CD pipelines for Angular, React, Flutter, Python (FastAPI/Django), and Node.js (Express/TypeScript) .
Implement automated unit, integration, and regression testing pipelines.
Establish logging, monitoring, and alerting (CloudWatch, ELK/OpenSearch, Datadog, etc.).
Define and enforce data security , IAM, secrets management, and encryption best practices.
Optimize SQL/NoSQL databases, performance tuning, backups, and restore workflows.
Create SOPs, reusable infrastructure templates, and DevOps best-practice documentation.
### Requirements
23+ years DevOps experience , including work on larger, production-grade projects.
Strong experience with AWS (EC2, ECS/EKS, RDS, S3, CloudFront, IAM, VPC).
Strong proficiency with Terraform and infrastructure-as-code workflows.
Hands-on experience building CI/CD pipelines for modern frontend and backend frameworks.
Strong understanding of data security , network configuration, and secure deployment.
Familiarity with application logging, monitoring, and distributed tracing.
Experience optimizing relational and non-relational databases.
Deliverables include SOPs, infrastructure templates, CI/CD pipelines, and automated testing frameworks.
### Qualifications
N/A
---
## Job: DevOps Engineer
- **Keyword**: DevOps Engineer location:New York
- **Company**: Core4ce
- **Location**: Remote (Worldwide)
- **Nature of Work**: Remote
- **Salary Range**: N/A
- **Job ID**: 4342211043
- **Category**: DevOps Engineer
- **Scraped At**: 2025-12-05T15:52:59.449387
- **URL**: <https://jobs.silkroad.com/Core4ce/Careers/jobs/1026?src=LinkedIn>
### Description
As a DevOps Engineer, you will play a pivotal role in designing, implementing, and maintaining the infrastructure and tools necessary to support continuous integration, continuous deployment, and automated operations. You will collaborate with cross-functional teams to streamline development processes, improve system reliability, and enhance overall productivity.
### Requirements
Responsibilities: Design and implement scalable, reliable, and secure DevOps solutions that meet the needs of the organization. Lead the development and implementation of CI/CD pipelines to automate software delivery processes. Architect and manage cloud infrastructure, ensuring optimal performance, cost efficiency, and scalability. Collaborate with development, operations, and quality assurance teams to integrate automated testing and monitoring into the CI/CD pipeline. Establish and enforce DevOps best practices, standards, and guidelines across the organization. Identify opportunities for process improvement and efficiency gains within the software development lifecycle. Provide technical guidance and mentorship to junior team members on DevOps tools, practices, and methodologies. Conduct research and evaluation of new tools, technologies, and methodologies to improve DevOps processes. Troubleshoot and resolve infrastructure and deployment issues in production and non-production environments. Ensure compliance with security, privacy, and regulatory requirements in all DevOps activities.
### Qualifications
Qualifications: 5+ years of experience in DevSecOps, specifically within DoD environments. Ability to obtain and maintain a Secret clearance. Ability to obtain and maintain a DoD 8570 IAT Level II certification. Proficient in Kubernetes, data pipeline management, and containerization technologies. Knowledge of continuous integration and continuous deployment (CI/CD) tools like Jenkins, GitLab CI, JIRA, or CircleCI. Familiarity with security scanning tools and vulnerability assessment tools (e.g., SonarQube, Fortify, Nessus). Experience with AWS cloud platforms and their native DevSecOps tools. Ability to analyze and improve existing DevSecOps processes for efficiency and security. Familiarity with regulatory compliance standards relevant to the DoD (e.g., NIST, FedRAMP). Proactive approach to identifying and mitigating security risks in the software development lifecycle. Certifications relevant to DevSecOps such as Certified Kubernetes Administrator (CKA), AWS Certified DevOps Engineer, or similar credentials Preferred Qualifications: Bachelor's degree in computer science, Cybersecurity, Information Technology, or related field, with an emphasis on security. Experience with Query Development and Optimization Perform defect fixes and minor feature extensions (adding new data elements to reports, adding or modifying current reporting filters, moving, adding or changing how data is displayed in a profile page for an individual service member, integrating new data sources into the database) Experience with Query Development and Optimization Code and update SSIS packages to support data extraction On-going enhancements to support policy changes such as updates to existing business logic that computes Platform, Administrative,
---
## Job: DevOps Engineer
- **Keyword**: DevOps Engineer location:New York
- **Company**: Verana Health
- **Location**: N/A
- **Nature of Work**: Remote
- **Salary Range**: National Pay Range $148,000 - $175,000 USD
- **Job ID**: 4256494004
- **Category**: DevOps Engineer
- **Scraped At**: 2025-12-05T15:55:17.680202
- **URL**: <https://job-boards.greenhouse.io/veranahealth/jobs/8054444002>
### Description
Accelerate Real-World Insights Through Cloud Innovation Verana Health is a digital health company that harnesses exclusive real-world data to deliver quality insights that accelerate drug development and improve patient care. Backed by leading investors such as Johnson & Johnson Innovation JJDC, Inc., Novo Growth, GV (formerly Google Ventures), and more, we are transforming how medical research is conducted. Our mission-driven team is committed to making a tangible difference in patients' lives through technology and data science. You will report to the Director of Engineering and work with engineering, data science, and IT teams. Your contributions will ensure the seamless operation of our cloud infrastructure, enabling faster, safer, and more reliable delivery of our data-driven solutions. This role is critical to Verana Health's ability to innovate and scale its impact on patient care. This is a remote position. Why This Role Matters As a DevOps Engineer at Verana Health, you will help ensure the reliability, security, and scalability of our cloud infrastructure. Your work will directly help deliver critical data analytics and research tools used by healthcare professionals and researchers worldwide. What You Get to Do Architect, deploy, and maintain cloud-based infrastructure using AWS, with a focus on automation, security, and scalability. Develop and optimize CI/CD pipelines to accelerate software delivery and improve operational efficiency. Collaborate with cross-functional teams (engineering, data science, QA) to support their DevOps needs and drive continuous improvement. Implement and enforce best practices for authorization, authentication, and compliance across AWS services. Monitor system performance, troubleshoot issues, and ensure high availability of critical applications and databases. Document and refine DevOps processes to foster knowledge sharing and operational excellence. Support database management, server administration (Linux/Windows), and infrastructure orchestration using tools like Docker, Kubernetes, and Terraform. Contribute to a culture of innovation, learning, and growth within the technology team.
### Requirements
Skills and Experience that Will Help You Succeed Essential Requirements: Bachelor's degree in computer science, software engineering, or a related scientific discipline. 5+ years of professional experience in DevOps, cloud engineering, or software development. Expertise in AWS services, including IAM, VPC, EC2, S3, and cloud security best practices. Hands-on experience with CI/CD tools, containerization (Docker, Kubernetes), and infrastructure as code (Terraform, CloudFormation). Proficiency in scripting (Bash, Python) and version control (GitLab, GitHub). Experience with Linux and Windows server administration, database management, and Databricks. Desirable Skills: Exposure to healthcare or clinical research environments. Experience mentoring or guiding junior team members. Continuous learning and process improvement. Must-Haves for the Role Expertise in AWS cloud infrastructure and security. Experience with CI/CD, containerization, and infrastructure as code. Strong scripting and automation skills.
### Qualifications
N/A
---
## Job: DevOps Engineer
- **Keyword**: DevOps Engineer location:New York
- **Company**: Machinify
- **Location**: California Office - Roseville, CA
- **Nature of Work**: Remote - Remote
- **Salary Range**: $150,000 - $180,000 USD
- **Job ID**: 4325826034
- **Category**: DevOps Engineer
- **Scraped At**: 2025-12-05T16:01:49.663716
- **URL**: <https://www.remotehunter.com/apply-with-ai/47671ed0-5c8f-4f9e-9958-ceef96a2bb13?utm_medium=job_posting&utm_source=linkedin&utm_campaign=devops_engineer_remote&utm_category=devops_engineer&utm_term=dev_ops_engineer>
### Description
Who We Are Machinify is a leading healthcare intelligence company with expertise across the payment continuum, delivering unmatched value, transparency, and efficiency to health plan clients across the country. Deployed by over 60 health plans, including many of the top 20, and representing more than 160 million lives, Machinify brings together a fully configurable and content-rich, AI-powered platform along with best-in-class expertise. Were constantly reimagining whats possible in our industry, creating disruptively simple, powerfully clear ways to maximize financial outcomes and drive down healthcare costs. Location: This role is fully remote About the Opportunity Machinify is seeking a DevOps Engineer to create, maintain and automate AI/ML cloud technologies with a focus on the continued migration from VMs into Kubernetes for all applicable technology solutions supporting the Machinify Cloud and to do so with an eye to the best future implementation while solving the problems of today. We do everything at big data scale, high uptime and with an eye to incredible customer experience. Machinifys healthcare customers are at the center of everything we do, so we employ innovative thinkers to solve issues our customers don't even have yet and do so with operational excellence. Those innovative thinkers, our people, are the core of what makes Machinify differentiated in Healthcare. Everyone has a voice. Our teams are diverse and thrive on trust. We are humans who understand our customer and work collaboratively to deliver value and make a difference. The DevOps Engineer will provide technical leadership in significant technical, automation, programming, system administration, operational, and software configuration management through partnering with DevOps and Engineering as a whole. This vital team member will have responsibility for design, engineering, development and integration within production and pre-production environments, and will also be responsible for the programs configuration management, including the planning, design, engineering, implementation and execution of successful build and deployment of code updates to each upstream environment along with planning and implementing the configuration management of all underlying technologies. What youll do: Facilitate the movement of VM technologies into Kubernetes through migration or replacement. Automate everything. Nothing should require manual intervention. We routinely redeploy from the ground up to ensure automation is up to date Architect solutions to achieve a high level of performance, reliability, scalability, and security Create, maintain and troubleshoot distributed compute AI/ML technologies running in the Cloud Collaborate with a great team and learn from each other Change the way healthcare companies manage their business Be challenged when faced with solving complex problems Bring a passion for improving the lives of others by making their jobs easier and more productive Be responsible and accountable for everything you build and support Communicate effectively with other engineers in the same team, with other teams and with various other stakeholders such as product managers Operate in an Agile development environment
### Requirements
N/A
### Qualifications
Experience in the migration of VM Technologies into a Kubernetes Environment 5+ years of production support preferably in a Cloud Environment (AWS, Azure or GCP) Degree in Computer Science or equivalent work experience Extremely logical with the ability to solve problems in creative and efficient ways Knowledge / Experience in the following areas Containerization with Kubernetes Scripting (python, shell etc) Crossplane / Terraform (Infrastructure as code) Linux (CentOS/RHEL) Spark / Machine Learning running in the Cloud Frameworks for distributed machine-learning / AI, such as Azure OpenAI, AWS Bedrock or things like Tensorflow and MxNet. Good understanding of Operations security practices Working in / creating compliant environments such as Hi-Trust / SOC2 etc Continuous Integration/Continuous Deployment frameworks. Citus (Distributed Postgres a plus) You are scrappy, fast, adaptable and ambitious Critical thinking and problem solving skills
---
## Job: DevOps Engineer
- **Keyword**: DevOps Engineer location:New York
- **Company**: NV5
- **Location**: N/A
- **Nature of Work**: N/A
- **Salary Range**: N/A
- **Job ID**: 4325301423
- **Category**: DevOps Engineer
- **Scraped At**: 2025-12-05T16:03:56.440663
- **URL**: <https://careers-nv5.icims.com/jobs/11535/devops-engineer/login?mobile=false&width=1369&height=500&bga=true&needsRedirect=false&jan1offset=60&jun1offset=60>
### Description
N/A
### Requirements
N/A
### Qualifications
N/A
---
## Job: Data Scientist (Remote - US)
- **Keyword**: Data Scientist location:New York
- **Company**: Jobgether
- **Location**: US
- **Nature of Work**: Remote
- **Salary Range**: N/A
- **Job ID**: 4342169293
- **Category**: Data Scientist
- **Scraped At**: 2025-12-05T16:48:07.597147
- **URL**: <https://jobs.lever.co/jobgether/1e254d1b-5d4f-4060-8eb7-705a9ae77646/apply?source=LinkedIn>
### Description
N/A
### Requirements
N/A
### Qualifications
N/A
---
## Job: Data Scientist (Kaggle-Grandmaster)
- **Keyword**: Data Scientist location:New York
- **Company**: Mercor
- **Location**: N/A
- **Nature of Work**: Remote
- **Salary Range**: $56-$77 / hr
- **Job ID**: 4342199585
- **Category**: Data Scientist
- **Scraped At**: 2025-12-05T16:50:50.900310
- **URL**: <https://work.mercor.com/jobs/list_AAABmuPnQVAFcCPPhAJMHJKY/data-scientist-kaggle-grandmaster?referralCode=d12bb6d7-56b2-4c5d-b2aa-751065941704&utm_source=referral&utm_medium=share&utm_campaign=job_referral>
### Description
Role Description Mercor is hiring on behalf of a leading AI research lab to bring on a highly skilled Data Scientist with a Kaggle Grandmaster profile. In this role, you will transform complex datasets into actionable insights, high-performing models, and scalable analytical workflows. You will work closely with researchers and engineers to design rigorous experiments, build advanced statistical and ML models, and develop data-driven frameworks to support product and research decisions. What Youll Do Analyze large, complex datasets to uncover patterns, develop insights, and inform modeling direction Build predictive models, statistical analyses, and machine learning pipelines across tabular, time-series, NLP, or multimodal data Design and implement robust validation strategies, experiment frameworks, and analytical methodologies Develop automated data workflows, feature pipelines, and reproducible research environments Conduct exploratory data analysis (EDA), hypothesis testing, and model-driven investigations to support research and product teams Translate modeling outcomes into clear recommendations for engineering, product, and leadership teams Collaborate with ML engineers to productionize models and ensure data workflows operate reliably at scale Present findings through well-structured dashboards, reports, and documentation
### Requirements
N/A
### Qualifications
Qualifications Kaggle Competitions Grandmaster or comparable achievement: top-tier rankings, multiple medals, or exceptional competition performance 35+ years of experience in data science or applied analytics Strong proficiency in Python and data tools (Pandas, NumPy, Polars, scikit-learn, etc.) Experience building ML models end-to-end: feature engineering, training, evaluation, and deployment Solid understanding of statistical methods, experiment design, and causal or quasi-experimental analysis Familiarity with modern data stacks: SQL, distributed datasets, dashboards, and experiment tracking tools Excellent communication skills with the ability to clearly present analytical insights Nice to Have Strong contributions across multiple Kaggle tracks (Notebooks, Datasets, Discussions, Code) Experience in an AI lab, fintech, product analytics, or ML-focused organization Knowledge of LLMs, embeddings, and modern ML techniques for text, images, and multimodal data Experience working with big data ecosystems (Spark, Ray, Snowflake, BigQuery, etc.) Familiarity with statistical modeling frameworks such as Bayesian methods or probabilistic programming
---
## Job: Data Engineer
- **Keyword**: Data Scientist location:New York
- **Company**: Tithe.ly
- **Location**: Lagos, Lagos State, Nigeria
- **Nature of Work**: N/A
- **Salary Range**: N/A
- **Job ID**: 4339262912
- **Category**: Data Scientist
- **Scraped At**: 2025-12-05T16:53:19.169174
- **URL**: <https://www.linkedin.com/jobs/view/4339262912/?eBP=NOT_ELIGIBLE_FOR_CHARGING&refId=90LYJbgaP%2FlZrP8CD4Vdzg%3D%3D&trackingId=rDpj810DVM4VmjwzqTCxcQ%3D%3D&trk=flagship3_search_srp_jobs>
### Description
N/A
### Requirements
N/A
### Qualifications
N/A
---
## Job: Data Engineering Scientist
- **Keyword**: Data Scientist location:New York
- **Company**: Birdy Grey
- **Location**: United States
- **Nature of Work**: Remote
- **Salary Range**: N/A
- **Job ID**: 4322321659
- **Category**: Data Scientist
- **Scraped At**: 2025-12-05T16:57:17.957749
- **URL**: <https://job-boards.greenhouse.io/birdygrey/jobs/4970284007?gh_src=daef57357us>
### Description
THE COMPANY: BIRDY GREY Birdy Grey is a direct-to-consumer brand whose mission is to celebrate friendships during one of the most important milestones in a persons life: their wedding. Founded in 2017 by best friends Grace Lee (Founder & Chief Creative Officer) and Monica Ashauer (Co-Founder & Chief Strategy Officer), Birdy Grey offers affordable bridesmaid dresses starting at just $89, groomsmen suits starting at $199, plus fun gifts and accessories for everyone in the wedding party. Since day one, we've dressed over 2 million bridesmaids and we're proud to be a trusted resource for brides and grooms on their most cherished day. POSITION: Data Engineering Scientist REPORTS TO: Director, Data & Analytics LOCATION: Remote Headquartered in Los Angeles, CA with an office in New York, NY, Birdy Grey supports remote work for eligible roles. We ask that all employees travel to either office once a quarter. This role is not eligible for visa sponsorship. #LI-Remote We're looking for a Data Engineering Scientist who thrives on variety. This isn't a role where you'll specialize in one narrow area, you'll be building pipelines, analyzing data, creating dashboards, and developing models. If you are energized by wearing multiple hats, statistically rigorous, hungry to learn new things, and eager to observe the direct impact of your work, this role is for you. SCOPE OF RESPONSIBILITIES Data Science (30-40%) Lead the application of statistical and machine learning methodologies (using Python and relevant frameworks) to solve core business problems, focusing on predictive and prescriptive outcomes Co-design rigorous A/B and multivariate experiments, ensuring statistical validity to accurately measure the impact of product and business changes Identify opportunities where machine learning or advanced analytics can add value Prototype data-driven solutions to business problems Data Engineering (20-30%) Architect and Deploy robust data pipelines to collect, transform, and load data from various sources Design and optimize data storage solutions and database schemas Ensure data quality, reliability, and accessibility across the organization Automate repetitive data processes and workflows Establish and enforce data governance and security standards within our Cloud environment Data Analytics (30-40%) Serve as the embedded strategic data partner for key business teams, translating complex challenges into measurable analytical projects Design and manage high-impact, self-service business intelligence assets (primarily in Looker) that accelerate organizational decision velocity Conduct ad-hoc analyses to uncover insights and opportunities Translate complex findings into clear, actionable recommendations
### Requirements
THE RIGHT CANDIDATE: QUALIFICATIONS & PERSONAL ATTRIBUTES EDUCATION: Bachelors Degree Required EXPERIENCE / REQUIREMENTS: 5+ years of hands-on experience in a data science, data engineering, data analyst, analytics engineering, or ML role Expert SQL skills. Must be adept at designing, optimizing, and tuning complex queries, stored procedures, and scripts, specifically utilizing analytic window functions, CTEs, and advanced join techniques Expertise with Python (preferably) or R for data analysis and automation Expertise with at least one data visualization tool, preferably Looker Experience with Cloud data platforms, DevOps, MLOps (AWS, GCP, Azure) Knowledge of data orchestration tools (Airflow, Prefect, dbt, Dagster, etc.) Experience with ML frameworks (scikit-learn, TensorFlow, PyTorch) Experience with version control (Git) and software engineering best practices Familiarity with marketing analytics, retail, and econometric principles Understanding of basic statistics and when to apply different analytical approaches Ability to communicate technical concepts to non-technical stakeholders Comfortable with ambiguity and figuring things out independently NICE TO HAVES: Start-up experience Interest in bridal and fashion Experience with ticketing systems and change management processes Interest in increasing productivity via Automation & AI Start-up or D2C/e-commerce experience
### Qualifications
N/A
---

View File

@ -1,28 +0,0 @@
from scraping_engine import FingerprintScrapingEngine
from job_scraper import LinkedInJobScraper
import os
import asyncio
async def main():
engine = FingerprintScrapingEngine(
seed="job_scraping_engine",
target_os="windows",
db_path="job_listings.db",
markdown_path="job_listings.md",
search_keywords="Data Anaylst"
)
scraper = LinkedInJobScraper(engine, human_speed=1.6)
await scraper.scrape_jobs(
search_keywords="Data Anaylst", # ← Your search terms
max_pages=3,
credentials={
"email": os.getenv("SCRAPING_USERNAME"),
"password": os.getenv("SCRAPING_PASSWORD")
}
)
if __name__ == "__main__":
asyncio.run(main())

354
llm_agent.py Normal file
View File

@ -0,0 +1,354 @@
from openai import OpenAI
from typing import Dict, Any
import asyncio
import psycopg2
import os
from datetime import datetime
import json
import re
from bs4 import BeautifulSoup
from dotenv import load_dotenv
# Load environment variables from .env
load_dotenv()
class LLMJobRefiner:
def __init__(self):
deepseek_api_key = os.getenv("DEEPSEEK_API_KEY")
if not deepseek_api_key:
raise ValueError("DEEPSEEK_API_KEY not found in .env file.")
# Database credentials from .env
self.db_url = os.getenv("DB_URL")
self.db_username = os.getenv("DB_USERNAME")
self.db_password = os.getenv("DB_PASSWORD")
self.db_host = os.getenv("DB_HOST")
self.db_port = os.getenv("DB_PORT")
if not self.db_url or not self.db_username or not self.db_password:
raise ValueError("Database credentials not found in .env file.")
# DeepSeek uses OpenAI-compatible API
self.client = OpenAI(
api_key=deepseek_api_key,
base_url="https://api.deepseek.com/v1"
)
self.model = "deepseek-chat"
self._init_db()
def _init_db(self):
"""Initialize PostgreSQL database connection and create Quelah Jobs table"""
try:
conn = psycopg2.connect(
host=self.db_host,
port=self.db_port,
database="postgres",
user=self.db_username,
password=self.db_password
)
cursor = conn.cursor()
# ✅ CREATE NEW TABLE: quelah_jobs (no requirements field)
cursor.execute('''
CREATE TABLE IF NOT EXISTS quelah_jobs (
id SERIAL PRIMARY KEY,
title TEXT,
company_name TEXT,
location TEXT,
description TEXT,
qualifications TEXT,
salary_range TEXT,
nature_of_work TEXT,
apply_type TEXT DEFAULT 'signup',
job_id TEXT UNIQUE,
url TEXT,
category TEXT,
scraped_at TIMESTAMP,
posted_date TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Ensure uniqueness constraint
cursor.execute('''
ALTER TABLE quelah_jobs DROP CONSTRAINT IF EXISTS quelah_jobs_job_id_key;
ALTER TABLE quelah_jobs ADD CONSTRAINT quelah_jobs_job_id_key UNIQUE (job_id);
''')
# Create indexes
cursor.execute('CREATE INDEX IF NOT EXISTS idx_quelah_job_id ON quelah_jobs(job_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_quelah_category ON quelah_jobs(category)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_quelah_posted_date ON quelah_jobs(posted_date)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_quelah_apply_type ON quelah_jobs(apply_type)')
conn.commit()
cursor.close()
conn.close()
print("✅ Quelah Jobs table initialized successfully")
except Exception as e:
print(f"❌ Database initialization error: {e}")
raise
def _clean_html_for_llm(self, html_content: str) -> str:
"""Clean HTML to make it more readable for LLM while preserving key job structure"""
try:
soup = BeautifulSoup(html_content, 'html.parser')
# Remove unwanted elements
for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'noscript']):
element.decompose()
# Keep only main content containers
main_content = None
candidates = [
soup.find('main'),
soup.find('div', class_=re.compile(r'job|posting|content')),
soup.find('article'),
soup.body
]
for candidate in candidates:
if candidate:
main_content = candidate
break
if not main_content:
main_content = soup.body or soup
# Extract text with some structure
lines = []
for elem in main_content.descendants:
if isinstance(elem, str):
text = elem.strip()
if text and len(text) > 5: # Skip short fragments
lines.append(text)
elif elem.name in ['h1', 'h2', 'h3', 'h4', 'p', 'li', 'strong', 'b']:
text = elem.get_text().strip()
if text:
lines.append(text)
# Join with newlines for better LLM parsing
cleaned = '\n'.join(lines)
# Limit length for LLM context
if len(cleaned) > 10000:
cleaned = cleaned[:10000] + "..."
return cleaned
except Exception as e:
print(f"HTML cleaning error: {e}")
return html_content[:100000] if len(html_content) > 100000 else html_content
def _generate_content_sync(self, prompt: str) -> str:
"""Synchronous call to DeepSeek API"""
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
temperature=0.2,
max_tokens=2048,
stream=False
)
return response.choices[0].message.content or ""
except Exception as e:
print(f"DeepSeek API error: {e}")
return ""
async def refine_job_data(self, raw_data: Dict[str, Any], target_field: str) -> Dict[str, Any]:
page_content = raw_data.get('page_content', '')
cleaned_content = self._clean_html_for_llm(page_content)
job_id = raw_data.get('job_id', 'unknown')
url = raw_data.get('url', 'N/A')
posted_date = raw_data.get('posted_date', "12/01/25") # ✅ Fixed date
# Detect platform from URL (for prompt only)
platform = "unknown"
if "ashbyhq.com" in url:
platform = "ashby"
elif "lever.co" in url:
platform = "lever"
elif "greenhouse.io" in url:
platform = "greenhouse"
# Platform-specific instructions
platform_instructions = ""
if platform == "ashby":
platform_instructions = """
For Ashby jobs:
- Title is usually in <h1> or <h2>
- Company name is often in <meta> or header
- Description is in <div class="job-posting"> or <article>
- Look for sections like "About Us", "What you'll do", "Qualifications", "Benefits"
- Location may be in <span> near job title or in metadata
"""
elif platform == "lever":
platform_instructions = """
For Lever jobs:
- Title is in <h1> or <h2>
- Company name is in <title> or header
- Description is in <div class="job-description"> or <section>
- Look for headings like "What you'll do", "What you'll need", "Why join us"
- Location is often in <div class="location">
"""
elif platform == "greenhouse":
platform_instructions = """
For Greenhouse jobs:
- Title is in <h1> or <h2>
- Company name is in <meta> or header
- Description is in <div class="job-desc"> or <section>
- Look for headings like "Role overview", "What you'll do", "What you bring"
- Location is often in <div class="location">
"""
prompt = f"""
You are an expert job posting parser. Extract information EXACTLY as it appears in the text. DO NOT summarize, paraphrase, or invent.
CRITICAL INSTRUCTIONS:
{platform_instructions}
FIELD RULES:
- description: MUST include ALL role details, responsibilities, and overview. Never "Not provided" if any job description exists.
- qualifications: MUST include ALL required skills, experience, education, and preferred qualifications. Combine them.
- location: Extract city, state, or remote status if available.
- salary_range: Extract if explicitly mentioned (e.g., "$70,000$85,000").
- nature_of_work: Extract if mentioned (e.g., "Part-time", "Remote", "On-site").
REQUIRED FIELDS (must have valid values, never "N/A"):
- title, company_name, job_id, url, description
OPTIONAL FIELDS (can be "Not provided" if the information is actually not provided):
- location, salary_range, nature_of_work
IMPORTANT: Do NOT include or extract a "requirements" field. Focus only on description and qualifications.
Page Content:
{cleaned_content}
Response format (ONLY return this JSON):
{{
"title": "...",
"company_name": "...",
"location": "...",
"description": "...",
"qualifications": "...",
"salary_range": "...",
"nature_of_work": "...",
"job_id": "{job_id}",
"url": "{url}"
}}
"""
try:
response_text = await asyncio.get_event_loop().run_in_executor(
None,
lambda: self._generate_content_sync(prompt)
)
refined_data = self._parse_llm_response(response_text)
if not refined_data:
return None
# Validate required fields
required_fields = ['title', 'company_name', 'job_id', 'url', 'description']
for field in required_fields:
if not refined_data.get(field) or refined_data[field].strip() in ["N/A", "", "Unknown", "Company", "Job"]:
return None
# Add the fixed posted_date
refined_data['posted_date'] = posted_date
return refined_data
except Exception as e:
print(f"LLM refinement failed: {str(e)}")
return None
def _parse_llm_response(self, response_text: str) -> Dict[str, Any]:
json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', response_text, re.DOTALL)
if not json_match:
json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
if not json_match:
return None
try:
return json.loads(json_match.group(1) if '```' in response_text else json_match.group(0))
except json.JSONDecodeError:
return None
async def save_job_data(self, job_data: Dict[str, Any], keyword: str, platform: str = "quelah"):
"""Save ALL jobs to Quelah Jobs table and markdown"""
await self._save_to_db_quelah(job_data)
await self._save_to_markdown_quelah(job_data, keyword)
async def _save_to_db_quelah(self, job_data: Dict[str, Any]):
"""Save job data to Quelah Jobs table"""
try:
conn = psycopg2.connect(
host=self.db_host,
port=self.db_port,
database="postgres",
user=self.db_username,
password=self.db_password
)
cursor = conn.cursor()
# Set apply_type if not present
apply_type = job_data.get("apply_type", "signup")
cursor.execute('''
INSERT INTO quelah_jobs
(title, company_name, location, description, qualifications,
salary_range, nature_of_work, apply_type, job_id, url, category, scraped_at, posted_date)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (job_id) DO NOTHING
''', (
job_data.get("title", "N/A"),
job_data.get("company_name", "N/A"),
job_data.get("location", "N/A"),
job_data.get("description", "N/A"),
job_data.get("qualifications", "N/A"),
job_data.get("salary_range", "N/A"),
job_data.get("nature_of_work", "N/A"),
apply_type,
job_data.get("job_id", "N/A"),
job_data.get("url", "N/A"),
job_data.get("category", "N/A"),
job_data.get("scraped_at"),
job_data.get("posted_date", "12/01/25") # Fixed date
))
conn.commit()
cursor.close()
conn.close()
print(f" 💾 Saved to Quelah Jobs | Job ID: {job_data.get('job_id', 'N/A')}")
except Exception as e:
print(f"❌ Database save error: {e}")
async def _save_to_markdown_quelah(self, job_data: Dict[str, Any], keyword: str):
os.makedirs("quelah_jobs", exist_ok=True)
filepath = os.path.join("quelah_jobs", "quelah_jobs.md")
write_header = not os.path.exists(filepath) or os.path.getsize(filepath) == 0
with open(filepath, "a", encoding="utf-8") as f:
if write_header:
f.write(f"# Quelah Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write(f"## Job: {job_data.get('title', 'N/A')}\n\n")
f.write(f"- *Keyword*: {keyword}\n")
f.write(f"- *Company*: {job_data.get('company_name', 'N/A')}\n")
f.write(f"- *Location*: {job_data.get('location', 'N/A')}\n")
f.write(f"- *Nature of Work*: {job_data.get('nature_of_work', 'N/A')}\n")
f.write(f"- *Salary Range*: {job_data.get('salary_range', 'N/A')}\n")
f.write(f"- *Apply Type*: {job_data.get('apply_type', 'signup')}\n")
f.write(f"- *Job ID*: {job_data.get('job_id', 'N/A')}\n")
f.write(f"- *Posted Date*: {job_data.get('posted_date', '12/01/25')}\n") # Fixed date
f.write(f"- *Category*: {job_data.get('category', 'N/A')}\n")
f.write(f"- *Scraped At*: {job_data.get('scraped_at', 'N/A')}\n")
f.write(f"- *URL*: <{job_data.get('url', 'N/A')}>\n\n")
f.write(f"### Description\n\n{job_data.get('description', 'N/A')}\n\n")
# ✅ REMOVED requirements section
f.write(f"### Qualifications\n\n{job_data.get('qualifications', 'N/A')}\n\n")
f.write("---\n\n")

596
scraper.py Normal file
View File

@ -0,0 +1,596 @@
import asyncio
import random
import os
import json
import time
from typing import Optional, Dict
from playwright.async_api import async_playwright
from browserforge.injectors.playwright import AsyncNewContext
from llm_agent import LLMJobRefiner
from fetcher import StealthyFetcher
from datetime import datetime
import pika
import logging
from tenacity import retry, stop_after_attempt, wait_exponential
from scraping_engine import FingerprintScrapingEngine
from dotenv import load_dotenv
from ssl_connection import create_ssl_connection_parameters # Import from ssl.py
import redis
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Environment variables
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST")
RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", "5671"))
RABBITMQ_SSL_ENABLED = os.getenv("RABBITMQ_SSL_ENABLED", "false").lower() == "true"
# Redis configuration
REDIS_HOST = os.getenv('REDIS_HOST')
REDIS_PORT = int(os.getenv('REDIS_PORT', '6380'))
REDIS_PASSWORD = os.getenv('REDIS_PASSWORD')
REDIS_SSL_ENABLED = os.getenv('REDIS_SSL_ENABLED', 'true').lower() == 'true'
# TTL constants (in seconds)
JOB_SEEN_TTL = 2592000 # 30 days (1 month)
class RedisManager:
"""Manages Redis connection and operations for job tracking and caching."""
def __init__(self):
self.redis_client = None
self._connect()
def _connect(self):
"""Establish connection to Redis server."""
if not REDIS_PASSWORD:
logger.warning("Warning: REDIS_PASSWORD not found in environment.")
try:
self.redis_client = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
password=REDIS_PASSWORD,
ssl=REDIS_SSL_ENABLED,
ssl_cert_reqs=None,
socket_connect_timeout=10,
socket_timeout=30,
retry_on_timeout=True
)
response = self.redis_client.ping()
logger.info(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}! Response: {response}")
except Exception as e:
logger.error(f"Failed to connect to Redis: {e}")
self.redis_client = None
def is_job_seen(self, job_id: str) -> bool:
if not self.redis_client:
return False
try:
return bool(self.redis_client.exists(f"job_seen:{job_id}"))
except Exception as e:
logger.error(f"Redis error checking job_seen: {e}")
return False
def get_cached_llm_result(self, job_url: str) -> Optional[Dict]:
if not self.redis_client:
return None
try:
cached_data = self.redis_client.get(f"llm_cache:{job_url}")
if cached_data:
return json.loads(cached_data)
return None
except Exception as e:
logger.error(f"Redis error getting LLM cache: {e}")
return None
def mark_job_seen(self, job_id: str):
if not self.redis_client:
return
try:
# Set with TTL of 1 month
self.redis_client.setex(f"job_seen:{job_id}", JOB_SEEN_TTL, "1")
except Exception as e:
logger.error(f"Redis error marking job_seen: {e}")
def cache_llm_result(self, job_url: str, result: Dict):
if not self.redis_client:
return
try:
self.redis_client.set(f"llm_cache:{job_url}", json.dumps(result)) # No TTL for LLM cache
except Exception as e:
logger.error(f"Redis error caching LLM result: {e}")
def add_job_to_error_cache(self, job_url: str, job_id: str, error_type: str):
if not self.redis_client:
return
try:
error_data = {
"job_url": job_url,
"job_id": job_id,
"error_type": error_type,
"timestamp": datetime.now().isoformat()
}
self.redis_client.set(f"error_cache:{job_id}", json.dumps(error_data)) # No TTL for error cache
except Exception as e:
logger.error(f"Redis error adding to error cache: {e}")
def remove_job_from_error_cache(self, job_id: str):
"""Remove job from error cache when successfully processed later."""
if not self.redis_client:
return
try:
deleted = self.redis_client.delete(f"error_cache:{job_id}")
if deleted:
logger.info(f"✅ Removed job {job_id} from error cache after successful processing")
except Exception as e:
logger.error(f"Redis error removing from error cache: {e}")
def add_job_to_sent_cache(self, job_id: str):
"""Mark job as sent for processing."""
if not self.redis_client:
return
try:
self.redis_client.set(f"sent_job:{job_id}", "1") # No TTL
except Exception as e:
logger.error(f"Redis error adding to sent cache: {e}")
def remove_job_from_sent_cache(self, job_id: str):
"""Remove job from sent cache upon successful or failed completion."""
if not self.redis_client:
return
try:
deleted = self.redis_client.delete(f"sent_job:{job_id}")
if deleted:
logger.debug(f"🧹 Removed job {job_id} from sent cache")
except Exception as e:
logger.error(f"Redis error removing from sent cache: {e}")
def is_job_in_sent_cache(self, job_id: str) -> bool:
"""Check if job is already in sent cache."""
if not self.redis_client:
return False
try:
return bool(self.redis_client.exists(f"sent_job:{job_id}"))
except Exception as e:
logger.error(f"Redis error checking sent cache: {e}")
return False
class MultiPlatformJobScraper:
def __init__(
self,
engine: FingerprintScrapingEngine,
user_request: str = "Extract all standard job details"
):
self.engine = engine
self.user_request = user_request
self.llm_agent = LLMJobRefiner()
self.browser = None
self.pw = None
self.redis_manager = RedisManager()
async def init_browser(self):
if self.browser is not None:
try:
await self.browser.new_page()
await self.close_browser()
except:
await self.close_browser()
if self.browser is None:
try:
profile = self.engine._select_profile()
renderer = random.choice(self.engine.common_renderers[self.engine.os])
vendor = random.choice(self.engine.common_vendors)
spoof_script = self.engine._get_spoof_script(renderer, vendor)
self.pw = await async_playwright().start()
self.browser = await self.pw.chromium.launch(
headless=True,
args=[
'--disable-blink-features=AutomationControlled',
'--no-sandbox',
'--disable-dev-shm-usage',
'--disable-gpu'
]
)
logger.info("✅ Browser launched (will reuse for all jobs)")
except Exception as e:
logger.error(f"💥 Failed to launch browser: {e}")
raise
async def create_fresh_context(self):
if self.browser is None:
await self.init_browser()
try:
await self.browser.new_page()
except Exception:
logger.warning("Browser appears dead. Reinitializing...")
await self.close_browser()
await self.init_browser()
profile = self.engine._select_profile()
context = await AsyncNewContext(self.browser, fingerprint=profile)
await context.add_init_script(f"""
Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
""")
spoof_script = self.engine._get_spoof_script(
random.choice(self.engine.common_renderers[self.engine.os]),
random.choice(self.engine.common_vendors)
)
await context.add_init_script(spoof_script)
return context
async def close_browser(self):
if self.browser:
try:
await self.browser.close()
except:
pass
self.browser = None
if self.pw:
try:
await self.pw.stop()
except:
pass
self.pw = None
async def _safe_inner_text(self, element):
if not element:
return "Unknown"
try:
return await element.text_content()
except:
return "Unknown"
async def _human_click(self, page, element, wait_after: bool = True):
if not element:
return False
await element.scroll_into_view_if_needed()
speed = self.engine.optimization_params.get("base_delay", 2.0) / 2
await asyncio.sleep(random.uniform(0.3, 0.8) * (speed / 2))
try:
await element.click()
if wait_after:
await asyncio.sleep(random.uniform(2, 4) * (speed / 2))
return True
except:
return False
async def _extract_page_content_for_llm(self, page) -> str:
speed = self.engine.optimization_params.get("base_delay", 2.0)
await asyncio.sleep(2 * (speed / 2))
if "lever.co" not in page.url:
await self.engine._human_like_scroll(page)
await asyncio.sleep(2 * (speed / 2))
return await page.content()
async def _is_job_seen(self, job_id: str) -> bool:
return self.redis_manager.is_job_seen(job_id)
async def _mark_job_seen(self, job_id: str):
self.redis_manager.mark_job_seen(job_id)
async def _get_cached_llm_result(self, job_url: str) -> Optional[Dict]:
return self.redis_manager.get_cached_llm_result(job_url)
async def _cache_llm_result(self, job_url: str, result: Dict):
self.redis_manager.cache_llm_result(job_url, result)
async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
logger.info(f" 📦 Adding failed job to Redis cache: {job_id} (Error: {error_type})")
self.redis_manager.add_job_to_error_cache(job_url, job_id, error_type)
async def _remove_job_from_error_cache(self, job_id: str):
"""Remove job from error cache when successfully processed."""
self.redis_manager.remove_job_from_error_cache(job_id)
async def _add_job_to_sent_cache(self, job_id: str):
"""Add job to sent cache when processing begins."""
self.redis_manager.add_job_to_sent_cache(job_id)
async def _remove_job_from_sent_cache(self, job_id: str):
"""Remove job from sent cache when processing completes."""
self.redis_manager.remove_job_from_sent_cache(job_id)
def _get_platform(self, url: str) -> str:
if "ashbyhq.com" in url:
return "ashby"
elif "lever.co" in url:
return "lever"
elif "greenhouse.io" in url:
return "greenhouse"
else:
return "unknown"
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def scrape_job(
self,
job_url: str,
company_name: str,
message_id: str
):
platform = self._get_platform(job_url)
if platform == "unknown":
logger.info(f"⏭️ Skipping unsupported platform: {job_url}")
return True
job_id = job_url.strip("/").split("/")[-1]
# Add job to sent cache at the beginning of processing
await self._add_job_to_sent_cache(job_id)
if await self._is_job_seen(job_id):
logger.info(f"⏭️ Skipping already processed job: {job_id}")
await self._remove_job_from_sent_cache(job_id)
return True
cached_result = await self._get_cached_llm_result(job_url)
if cached_result:
logger.info(f"📦 Using cached LLM result for: {job_url}")
await self.llm_agent.save_job_data(cached_result, company_name)
await self._mark_job_seen(job_id)
await self._remove_job_from_sent_cache(job_id)
return True
context = None
page = None
start_time = time.time()
try:
context = await self.create_fresh_context()
page = await context.new_page()
timeout_ms = self.engine.optimization_params.get("request_timeout", 120000)
temp_fetcher = StealthyFetcher(self.engine, self.browser, context)
fetch_timeout = 60000 if platform == "lever" else timeout_ms
job_page = await asyncio.wait_for(
temp_fetcher.fetch_url(job_url, wait_for_selector="h1", timeout=fetch_timeout),
timeout=fetch_timeout / 1000.0
)
# Check if job still exists (minimal content validation)
page_content = await job_page.content()
if len(page_content.strip()) < 500: # Arbitrary threshold for "page exists"
logger.error(f"❌ Job no longer exists (empty/deleted): {job_url}")
await self._add_job_to_redis_cache(job_url, job_id, "job_not_found")
self.engine.report_outcome("job_not_found", url=job_url)
await self._remove_job_from_sent_cache(job_id)
return False
if platform == "ashby":
try:
await job_page.wait_for_selector("div[class*='job-posting'], article, main", timeout=60000)
except Exception:
logger.warning(f"⚠️ Ashby page didn't load properly: {job_url}")
await self._remove_job_from_sent_cache(job_id)
return False
elif platform == "lever":
pass
elif platform == "greenhouse":
try:
await job_page.wait_for_selector("div.job-desc, section", timeout=60000)
except Exception:
pass
# Extract page content for initial validation
page_content = await self._extract_page_content_for_llm(job_page)
# Check for job expiration or unavailability indicators
page_text_lower = page_content.lower()
job_unavailable_indicators = [
"job no longer available",
"position has been filled",
"this job has expired",
"job posting has expired",
"no longer accepting applications",
"position is closed",
"job is no longer active",
"this position is no longer open"
]
if any(indicator in page_text_lower for indicator in job_unavailable_indicators):
logger.error(f"❌ Job no longer available/expired: {job_url}")
await self._add_job_to_redis_cache(job_url, job_id, "job_not_found")
self.engine.report_outcome("job_not_found", url=job_url)
await self._remove_job_from_sent_cache(job_id)
return False
# 🔑 APPLY TYPE LOGIC
if platform in ["ashby", "lever", "greenhouse"]:
apply_type = 'AI' # Always AI for these platforms
else:
# For other platforms: check if form is accessible without login
apply_btn = await job_page.query_selector("button:has-text('Apply for this job'), button:has-text('Apply now')")
apply_type = 'signup' # default
if apply_btn:
await self._human_click(job_page, apply_btn)
speed = self.engine.optimization_params.get("base_delay", 2.0)
await asyncio.sleep(2 * (speed / 2))
form = await job_page.query_selector("form, div[class*='application-form']")
if form:
# Check for login prompts in form
login_indicators = await job_page.query_selector("input[type='email'], input[type='password'], text='sign in', text='log in'")
if not login_indicators:
apply_type = 'AI'
else:
apply_type = 'signup'
else:
apply_type = 'signup'
final_url = job_url
# Hardcode posted_date to Dec 1st 2025
posted_date = "12/01/25"
raw_data = {
"page_content": page_content,
"url": final_url,
"job_id": job_id,
"search_keywords": company_name,
"posted_date": posted_date
}
llm_timeout = max(60, self.engine.feedback.get("avg_response_time", 10) * 2)
refined_data = await asyncio.wait_for(
self.llm_agent.refine_job_data(raw_data, self.user_request),
timeout=llm_timeout
)
success = False
if refined_data:
# Define compulsory fields that must be present and valid
compulsory_fields = ['title', 'company_name', 'description', 'job_id', 'url']
# Check if ALL compulsory fields are present and valid BEFORE any processing
missing_fields = []
for field in compulsory_fields:
field_value = refined_data.get(field, "")
if not field_value or str(field_value).strip() in ["", "N/A", "Unknown", "Not provided", "Not available", "Company", "Job"]:
missing_fields.append(field)
# If any compulsory field is missing, discard the job immediately
if missing_fields:
logger.error(f"❌ Job discarded - missing compulsory fields: {', '.join(missing_fields)} : {final_url}")
error_type = "missing_compulsory_fields"
await self._add_job_to_redis_cache(final_url, job_id, error_type)
self.engine.report_outcome(error_type, url=final_url)
await self._remove_job_from_sent_cache(job_id)
return False
# If we get here, all compulsory fields are valid - now add additional metadata
refined_data.update({
'apply_type': apply_type,
'scraped_at': datetime.now().isoformat(),
'category': company_name,
'posted_date': posted_date,
'message_id': message_id,
'platform': platform
})
# Save to database and markdown
await self.llm_agent.save_job_data(refined_data, company_name)
await self._cache_llm_result(job_url, refined_data)
await self._mark_job_seen(job_id)
# Remove from error cache if it was previously failed
await self._remove_job_from_error_cache(job_id)
response_time = time.time() - start_time
self.engine.report_outcome("success", url=final_url, response_time=response_time)
logger.info(f"✅ Scraped ({platform}): {refined_data['title'][:50]}... (Apply Type: {apply_type})")
success = True
await self._remove_job_from_sent_cache(job_id)
else:
logger.warning(f"🟡 LLM failed to refine: {final_url}")
await self._add_job_to_redis_cache(final_url, job_id, "llm_failure")
self.engine.report_outcome("llm_failure", url=final_url)
await self._remove_job_from_sent_cache(job_id)
return success
except asyncio.TimeoutError:
logger.error(f"⏰ Timeout processing job ({platform}): {job_url}")
await self._add_job_to_redis_cache(job_url, job_id, "timeout")
self.engine.report_outcome("timeout", url=job_url)
await self._remove_job_from_sent_cache(job_id)
return False
except Exception as e:
error_msg = str(e)
if "NoneType" in error_msg or "disconnected" in error_msg or "Browser" in error_msg:
logger.warning("Browser connection lost. Forcing reinitialization.")
await self.close_browser()
# 🔍 Distinguish job-not-found vs other errors
if "page.goto: net::ERR_ABORTED" in error_msg or "page.goto: net::ERR_FAILED" in error_msg:
logger.error(f"❌ Job no longer exists (404/network error): {job_url}")
await self._add_job_to_redis_cache(job_url, job_id, "job_not_found")
self.engine.report_outcome("job_not_found", url=job_url)
else:
# Categorize other errors
error_type = "exception"
if "timeout" in error_msg.lower():
error_type = "timeout"
elif "llm" in error_msg.lower() or "refine" in error_msg.lower():
error_type = "llm_failure"
else:
error_type = "scraping_error"
logger.error(f"💥 Error processing job ({platform}) {job_url}: {error_msg}")
await self._add_job_to_redis_cache(job_url, job_id, error_type)
self.engine.report_outcome(error_type, url=job_url)
await self._remove_job_from_sent_cache(job_id)
return False
finally:
if context:
try:
await context.close()
except Exception:
pass
# Global metrics
METRICS = {
"processed": 0,
"success": 0,
"failed": 0,
"skipped": 0,
"start_time": time.time()
}
async def process_message_async(scraper: MultiPlatformJobScraper, ch, method, properties, body):
try:
job_data = json.loads(body)
job_link = job_data['job_link']
company_name = job_data['company_name']
message_id = properties.message_id or f"msg_{int(time.time()*1000)}"
logger.info(f"📥 Processing job: {job_link} (ID: {message_id})")
success = await scraper.scrape_job(job_link, company_name, message_id)
METRICS["processed"] += 1
if success:
METRICS["success"] += 1
else:
METRICS["failed"] += 1
except json.JSONDecodeError:
logger.error("❌ Invalid JSON in message")
METRICS["failed"] += 1
except Exception as e:
logger.error(f"💥 Unexpected error: {str(e)}")
METRICS["failed"] += 1
finally:
ch.basic_ack(delivery_tag=method.delivery_tag)
def callback_wrapper(scraper: MultiPlatformJobScraper):
def callback(ch, method, properties, body):
asyncio.run(process_message_async(scraper, ch, method, properties, body))
return callback
def start_consumer():
engine = FingerprintScrapingEngine(
# Other env vars...
seed=os.getenv("SEED_NAME", "multiplatform_scraper"),
target_os="windows",
num_variations=10
)
scraper = MultiPlatformJobScraper(engine)
connection = None
for attempt in range(5):
try:
parameters = create_ssl_connection_parameters()
if RABBITMQ_SSL_ENABLED:
logger.info(f"Connecting to RabbitMQ over SSL at {RABBITMQ_HOST}:{RABBITMQ_PORT}")
else:
logger.info(f"Connecting to RabbitMQ at {RABBITMQ_HOST}:{RABBITMQ_PORT}")
connection = pika.BlockingConnection(parameters)
break
except Exception as e:
logger.error(f"RabbitMQ connection attempt {attempt + 1} failed: {e}")
time.sleep(2 ** attempt)
if not connection:
logger.error("Failed to connect to RabbitMQ after retries")
return
channel = connection.channel()
channel.queue_declare(queue='job_queue', durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='job_queue', on_message_callback=callback_wrapper(scraper))
logger.info('Waiting for messages (Ashby, Lever, Greenhouse). To exit press CTRL+C')
try:
channel.start_consuming()
except KeyboardInterrupt:
logger.info("Shutting down...")
channel.stop_consuming()
connection.close()
asyncio.run(scraper.close_browser())
if __name__ == "__main__":
start_consumer()

View File

@ -6,10 +6,12 @@ import hashlib
import random
import os
import json
from typing import List, Optional, Dict
from playwright.async_api import Page
from typing import List, Optional, Dict, Any
from browserforge.fingerprints import FingerprintGenerator
from dotenv import load_dotenv
from config import load_spoof_config
import time
# Load environment variables
load_dotenv()
@ -24,8 +26,7 @@ class FingerprintScrapingEngine:
db_path: str = "jobs.db",
markdown_path: str = "scraped_jobs.md",
proxies: List[str] = None,
login_credentials: Optional[Dict[str, str]] = None,
search_keywords: Optional[str] = None
login_credentials: Optional[Dict[str, str]] = None
):
if target_os not in ['windows', 'macos']:
raise ValueError("operating_system must be 'windows' or 'macos'")
@ -42,7 +43,6 @@ class FingerprintScrapingEngine:
self.markdown_path = markdown_path
self.proxies = proxies or []
self.login_credentials = login_credentials
self.search_keywords = search_keywords
self.fingerprint_generator = FingerprintGenerator(
browser=('chrome',),
os=(self.os,)
@ -55,16 +55,28 @@ class FingerprintScrapingEngine:
self.common_renderers = spoof_config["renderers"]
self.common_vendors = spoof_config["vendors"]
# Feedback system
self.feedback_file = f"feedback_{seed}.json"
# Feedback system
self.feedback = self._load_feedback()
# ← NEW: Session persistence paths
self.session_dir = "browser_sessions"
os.makedirs(self.session_dir, exist_ok=True)
self.session_path = os.path.join(self.session_dir, f"{seed}_session.json")
self.session_path = os.path.join(
self.session_dir, f"{seed}_session.json")
def _load_feedback(self):
self.optimization_params = {
"base_delay": 2.0,
"max_concurrent_requests": 4,
"request_timeout": 120000,
"retry_attempts": 3,
"captcha_handling_strategy": "avoid", # or "solve_fallback"
"cloudflare_wait_strategy": "smart_wait", # or "aggressive_reload"
}
self._update_params_from_feedback()
def _load_feedback(self) -> Dict[str, Any]:
if os.path.exists(self.feedback_file):
try:
with open(self.feedback_file, "r") as f:
@ -72,6 +84,8 @@ class FingerprintScrapingEngine:
data.setdefault("success_rate", 1.0)
data.setdefault("captcha_count", 0)
data.setdefault("cloudflare_count", 0)
data.setdefault("avg_response_time", 10.0) # New metric
data.setdefault("failed_domains", {}) # New metrice
return data
except:
pass
@ -81,16 +95,69 @@ class FingerprintScrapingEngine:
with open(self.feedback_file, "w") as f:
json.dump(self.feedback, f)
def report_outcome(self, outcome: str):
def report_outcome(self, outcome: str, url: Optional[str] = None, response_time: Optional[float] = None):
if outcome == "success":
self.feedback["success_rate"] = min(1.0, self.feedback["success_rate"] + 0.1)
self.feedback["success_rate"] = min(
1.0, self.feedback["success_rate"] + 0.05) # Smaller increment
else:
self.feedback["success_rate"] = max(0.1, self.feedback["success_rate"] - 0.2)
self.feedback["success_rate"] = max(
0.05, self.feedback["success_rate"] - 0.1) # Smaller decrement
if outcome == "captcha":
self.feedback["captcha_count"] += 1
# Adapt strategy if many captchas
self.optimization_params["captcha_handling_strategy"] = "solve_fallback"
elif outcome == "cloudflare":
self.feedback["cloudflare_count"] += 1
# Adjust wait strategy based on frequency
if self.feedback["cloudflare_count"] > 5:
self.optimization_params["cloudflare_wait_strategy"] = "aggressive_reload"
# Track domain-specific failures
if url and outcome != "success":
domain = url.split("//")[1].split("/")[0]
if domain not in self.feedback["failed_domains"]:
self.feedback["failed_domains"][domain] = 0
self.feedback["failed_domains"][domain] += 1
# Update average response time
if response_time:
prev_avg = self.feedback.get("avg_response_time", 10.0)
# Simple moving average
self.feedback["avg_response_time"] = (
prev_avg * 0.9) + (response_time * 0.1)
self.save_feedback()
self._update_params_from_feedback() # Update params based on new feedback
def _update_params_from_feedback(self):
"""Adjust optimization parameters based on feedback."""
sr = self.feedback["success_rate"]
cc = self.feedback["captcha_count"]
cf = self.feedback["cloudflare_count"]
avg_rt = self.feedback.get("avg_response_time", 10.0)
# Adjust base delay based on success rate and avg response time
if sr < 0.6:
self.optimization_params["base_delay"] = max(
5.0, self.optimization_params["base_delay"] * 1.2)
elif sr > 0.8:
self.optimization_params["base_delay"] = min(
3.0, self.optimization_params["base_delay"] * 0.9)
# Reduce concurrency if many captchas/cloudflares
if cc > 3 or cf > 3:
self.optimization_params["max_concurrent_requests"] = max(
2, self.optimization_params["max_concurrent_requests"] - 2)
else:
# Reset to default
self.optimization_params["max_concurrent_requests"] = 4
# Increase timeout if avg response time is high
if avg_rt > 20:
self.optimization_params["request_timeout"] = 150000 # 90 seconds
print(f"Optimization Params Updated: {self.optimization_params}")
# ← NEW: Save browser context (cookies + localStorage)
async def save_session(self, context):
@ -131,7 +198,8 @@ class FingerprintScrapingEngine:
if self.feedback["success_rate"] < 0.5:
concurrency_options = [8, 4]
memory_options = [8]
profile.navigator.hardwareConcurrency = random.choice(concurrency_options)
profile.navigator.hardwareConcurrency = random.choice(
concurrency_options)
profile.navigator.deviceMemory = random.choice(memory_options)
return profile
@ -247,23 +315,6 @@ class FingerprintScrapingEngine:
except:
pass
async def _detect_cloudflare(self, page) -> bool:
content = await page.content()
return (
"#cf-chl" in content or
"checking your browser" in content.lower() or
"just a moment" in content.lower()
)
async def _handle_cloudflare(self, page, max_retries: int = 3):
for i in range(max_retries):
if not await self._detect_cloudflare(page):
return True
print(f"☁️ Cloudflare detected - waiting... (attempt {i+1})")
await asyncio.sleep(8 + random.uniform(2, 5))
await page.wait_for_load_state("load", timeout=60000)
return False
async def _avoid_captcha(self, page) -> bool:
await asyncio.sleep(2 + random.random() * 3)
await self._human_like_scroll(page)
@ -285,3 +336,42 @@ class FingerprintScrapingEngine:
return True
return False
async def _detect_cloudflare(self, page: Page) -> bool:
"""Detect Cloudflare challenges."""
content = await page.content()
return (
"#cf-chl" in content
or "checking your browser" in content.lower()
or "just a moment" in content.lower()
or "turnstile" in content.lower() # Check for Cloudflare Turnstile
)
async def _handle_cloudflare(self, page: Page) -> bool:
"""
Handle Cloudflare challenges, including Turnstile if present.
This is a simplified approach; real-world handling might require more sophisticated logic or external solvers.
"""
max_wait_time = 60 # Total time to wait for Cloudflare to resolve
start_time = time.time()
while time.time() - start_time < max_wait_time:
if not await self._detect_cloudflare(page):
print("Cloudflare challenge resolved.")
return True
print("Cloudflare active, waiting...")
# Simulate more human-like behavior while waiting
await self._simulate_human_interaction(page)
# Wait for a random period, increasing slightly each time
wait_time = min(10, 2 + random.uniform(1, 3) +
(time.time() - start_time) * 0.1)
await asyncio.sleep(wait_time)
# Reload occasionally to trigger potential client-side checks
if (time.time() - start_time) > 15 and (time.time() - start_time) % 20 < 2:
print("Reloading page during Cloudflare wait...")
await page.reload(wait_until='domcontentloaded', timeout=120000)
print("Timeout waiting for Cloudflare resolution.")
return False

376
sender.py Normal file
View File

@ -0,0 +1,376 @@
import csv
import json
import logging
import os
import sys
import time
import signal
import uuid
from configparser import ConfigParser
import pika
import redis
import ssl
from dotenv import load_dotenv
from datetime import datetime
load_dotenv()
class RedisManager:
"""Manages Redis connection and operations for job deduplication."""
def __init__(self):
self.redis_host = os.getenv('REDIS_HOST')
self.redis_port = int(os.getenv('REDIS_PORT', '6380'))
self.redis_password = os.getenv('REDIS_PASSWORD')
self.redis_ssl_enabled = os.getenv('REDIS_SSL_ENABLED', 'true').lower() == 'true'
self.redis_client = None
self._connect()
def _connect(self):
if not self.redis_password:
print("Warning: REDIS_PASSWORD not found in environment.")
try:
self.redis_client = redis.Redis(
host=self.redis_host,
port=self.redis_port,
password=self.redis_password,
ssl=self.redis_ssl_enabled,
ssl_cert_reqs=None,
socket_connect_timeout=10,
socket_timeout=30,
decode_responses=True
)
response = self.redis_client.ping()
print(f"Connected to Redis at {self.redis_host}:{self.redis_port}! Response: {response}")
except Exception as e:
print(f"Failed to connect to Redis: {e}")
self.redis_client = None
def is_job_seen(self, job_url):
if not self.redis_client:
return False
try:
return bool(self.redis_client.exists(f"sent_job:{job_url}"))
except Exception:
return False
def mark_job_sent(self, job_url):
if not self.redis_client:
return
try:
self.redis_client.setex(f"sent_job:{job_url}", 7 * 24 * 3600, "1")
except Exception:
pass
# NEW: Track total sent jobs for jobs.csv
def get_jobs_csv_sent_count(self):
if not self.redis_client:
return 0
try:
count = self.redis_client.get("jobs_csv_sent_count")
return int(count) if count else 0
except Exception:
return 0
def increment_jobs_csv_sent_count(self):
if not self.redis_client:
return
try:
self.redis_client.incr("jobs_csv_sent_count")
# Set 30-day expiry to avoid stale data
self.redis_client.expire("jobs_csv_sent_count", 2592000)
except Exception:
pass
class Sender:
def __init__(self, config_file='config.ini'):
self.config = ConfigParser()
self.config.read(config_file)
self.rabbitmq_host = os.getenv("RABBITMQ_HOST")
self.rabbitmq_port = int(os.getenv("RABBITMQ_PORT") or 5672)
self.username = os.getenv("RABBITMQ_USER")
self.password = os.getenv("RABBITMQ_PASS")
self.queue_name = self.config.get('rabbitmq', 'queue_name', fallback='job_queue')
self.directory = self.config.get('files', 'directory', fallback=os.path.join(os.path.expanduser("~"), "jobs", "csv"))
default_log_dir = os.path.join(os.path.expanduser("~"), ".web_scraping_project", "logs")
os.makedirs(default_log_dir, exist_ok=True)
default_log_file = os.path.join(default_log_dir, "sender.log")
self.log_file = self.config.get('logging', 'log_file', fallback=default_log_file)
self.virtual_host = self.config.get('rabbitmq', 'virtual_hash', fallback='/')
self.batch_size = 500
self.retry_attempts = 5
self.retry_sleep = 2
self.check_interval = 30
self.use_ssl = os.getenv("RABBITMQ_SSL_ENABLED", "false").lower() == "true"
if self.rabbitmq_port is None:
self.rabbitmq_port = "5671" if self.use_ssl else "5672"
else:
self.rabbitmq_port = int(self.rabbitmq_port)
self.redis_manager = RedisManager()
log_dir = os.path.dirname(self.log_file)
os.makedirs(log_dir, exist_ok=True)
self.logger = logging.getLogger('sender')
self.logger.setLevel(logging.INFO)
self.logger.handlers.clear()
file_handler = logging.FileHandler(self.log_file, encoding='utf-8')
file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_formatter)
self.logger.addHandler(file_handler)
console_handler = logging.StreamHandler(sys.stdout)
console_formatter = logging.Formatter('%(levelname)s: %(message)s')
console_handler.setFormatter(console_formatter)
self.logger.addHandler(console_handler)
self.connection = None
self.channel = None
self.running = True
signal.signal(signal.SIGTERM, self.graceful_shutdown)
signal.signal(signal.SIGINT, self.graceful_shutdown)
def _create_ssl_options(self):
if not self.use_ssl:
return None
context = ssl.create_default_context()
verify_ssl = os.getenv("RABBITMQ_SSL_VERIFY", "false").lower() == "true"
if verify_ssl:
context.check_hostname = True
context.verify_mode = ssl.CERT_REQUIRED
else:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
return pika.SSLOptions(context, self.rabbitmq_host)
def connect(self):
try:
if not self.rabbitmq_host:
self.logger.error("RABBITMQ_HOST environment variable is not set")
return False
if not self.username:
self.logger.error("RABBITMQ_USER environment variable is not set")
return False
if not self.password:
self.logger.error("RABBITMQ_PASS environment variable is not set")
return False
self.logger.info(f"Attempting to connect with host={self.rabbitmq_host}, port={self.rabbitmq_port}, user={self.username}")
credentials = pika.PlainCredentials(self.username, self.password)
params = {
'host': self.rabbitmq_host,
'port': self.rabbitmq_port,
'virtual_host': self.virtual_host,
'credentials': credentials,
'heartbeat': 600,
'blocked_connection_timeout': 300
}
if self.use_ssl:
params['ssl_options'] = self._create_ssl_options()
self.logger.info(f"Connecting to RabbitMQ over SSL at {self.rabbitmq_host}:{self.rabbitmq_port}")
else:
self.logger.info(f"Connecting to RabbitMQ at {self.rabbitmq_host}:{self.rabbitmq_port}")
parameters = pika.ConnectionParameters(**params)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
self.channel.queue_declare(queue=self.queue_name, durable=True)
self.logger.info("Connected to RabbitMQ successfully")
return True
except Exception as e:
self.logger.error(f"Failed to connect to RabbitMQ: {str(e)}")
return False
def reconnect(self):
if self.connection and self.connection.is_open:
self.connection.close()
time.sleep(self.retry_sleep)
return self.connect()
def send_message(self, message, message_id):
for attempt in range(self.retry_attempts):
try:
self.channel.basic_publish(
exchange='',
routing_key=self.queue_name,
body=message,
properties=pika.BasicProperties(
delivery_mode=2,
message_id=message_id
)
)
return True
except Exception as e:
self.logger.error(f"Failed to send message (attempt {attempt+1}): {str(e)}")
if attempt < self.retry_attempts - 1:
time.sleep(self.retry_sleep * (2 ** attempt))
if not self.reconnect():
return False
return False
def is_job_seen(self, job_url, filename):
"""Custom dedup logic: disable for jobs.csv until 6000 sent"""
if filename == "jobs.csv":
sent_count = self.redis_manager.get_jobs_csv_sent_count()
if sent_count < 6000:
return False # Always resend
return self.redis_manager.is_job_seen(job_url)
def mark_job_sent(self, job_url, filename):
self.redis_manager.mark_job_sent(job_url)
if filename == "jobs.csv":
self.redis_manager.increment_jobs_csv_sent_count()
def process_csv(self, file_path):
filename = os.path.basename(file_path)
try:
with open(file_path, 'r', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
sent_count = 0
skipped_count = 0
self.logger.info(f"CSV headers found: {reader.fieldnames}")
for row_num, row in enumerate(reader, start=1):
if not self.running:
self.logger.info("Shutdown requested during CSV processing. Exiting...")
return sent_count
if 'url' not in row or 'company' not in row:
self.logger.warning(f"Skipping row {row_num}: missing 'url' or 'company' field. Row: {row}")
skipped_count += 1
continue
url = row['url'].strip()
company = row['company'].strip()
if not url:
self.logger.warning(f"Skipping row {row_num}: empty URL. Company: '{company}'")
skipped_count += 1
continue
if not company:
self.logger.warning(f"Skipping row {row_num}: empty company. URL: {url}")
skipped_count += 1
continue
if not url.startswith(('http://', 'https://')):
self.logger.warning(f"Skipping row {row_num}: invalid URL format. URL: {url}")
skipped_count += 1
continue
# ✅ Modified: Pass filename to is_job_seen
if self.is_job_seen(url, filename):
self.logger.info(f"Skipping row {row_num}: job already sent (deduplicated). URL: {url}")
skipped_count += 1
continue
job_data = {'job_link': url, 'company_name': company}
message_id = str(uuid.uuid4())
message = json.dumps(job_data)
if self.send_message(message, message_id):
sent_count += 1
# ✅ Modified: Pass filename to mark_job_sent
self.mark_job_sent(url, filename)
else:
self.logger.error(f"Failed to send job (row {row_num}): {url}")
skipped_count += 1
if (sent_count + skipped_count) % 100 == 0:
current_total = self.redis_manager.get_jobs_csv_sent_count() if filename == "jobs.csv" else "N/A"
self.logger.info(f"Progress: {sent_count} sent, {skipped_count} skipped from {file_path} (jobs.csv total: {current_total})")
self.logger.info(f"Completed {file_path}: {sent_count} sent, {skipped_count} skipped")
try:
os.rename(file_path, file_path + '.processed')
self.logger.info(f"Processed and renamed {file_path} to {file_path}.processed")
except Exception as rename_error:
self.logger.error(f"Failed to rename {file_path}: {str(rename_error)}")
marker_file = file_path + '.processed_marker'
with open(marker_file, 'w') as f:
f.write(f"Processed at {datetime.now().isoformat()}")
self.logger.info(f"Created marker file: {marker_file}")
return sent_count
except Exception as e:
self.logger.error(f"Error processing {file_path}: {str(e)}")
return 0
def find_new_csvs(self):
if not self.running:
return []
if not os.path.exists(self.directory):
return []
files = [f for f in os.listdir(self.directory) if f.endswith('.csv') and not f.endswith('.processed')]
files.sort()
return [os.path.join(self.directory, f) for f in files]
def run(self):
if not self.connect():
self.logger.error("RabbitMQ connection failed, exiting")
sys.exit(1)
try:
while self.running:
new_files = self.find_new_csvs()
if new_files:
for file_path in new_files:
if not self.running:
break
self.logger.info(f"Processing {file_path}")
sent = self.process_csv(file_path)
self.logger.info(f"Sent {sent} jobs from {file_path}")
else:
self.logger.info("No new CSV files found")
for _ in range(self.check_interval):
if not self.running:
break
time.sleep(1)
except KeyboardInterrupt:
pass
finally:
if self.connection and self.connection.is_open:
self.logger.info("Closing RabbitMQ connection...")
self.connection.close()
def graceful_shutdown(self, signum, frame):
self.logger.info("Received shutdown signal. Initiating graceful shutdown...")
self.running = False
if __name__ == '__main__':
required_vars = ['RABBITMQ_HOST', 'RABBITMQ_PORT', 'RABBITMQ_USER', 'RABBITMQ_PASS']
missing_vars = [var for var in required_vars if not os.getenv(var)]
if missing_vars:
print(f"Missing environment variables: {missing_vars}")
print("Check your .env file and ensure load_dotenv() is working")
sys.exit(1)
sender = Sender()
print(f"Using directory: {sender.directory}")
print(f"Directory exists: {os.path.exists(sender.directory)}")
if os.path.exists(sender.directory):
print(f"Files: {os.listdir(sender.directory)}")
try:
sender.run()
except KeyboardInterrupt:
sender.logger.info("KeyboardInterrupt caught in main. Exiting.")
sys.exit(0)

80
ssl_connection.py Normal file
View File

@ -0,0 +1,80 @@
import pika
import ssl
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
def create_ssl_connection_parameters():
"""
Create and return RabbitMQ connection parameters with SSL configuration.
This function handles both SSL and non-SSL connections based on environment variables.
"""
# Load environment variables with fallbacks
rabbitmq_host = os.getenv('RABBITMQ_HOST')
rabbitmq_port = int(os.getenv('RABBITMQ_PORT', '5671'))
rabbitmq_user = os.getenv('RABBITMQ_USER')
rabbitmq_pass = os.getenv('RABBITMQ_PASS', 'ofure-scrape')
rabbitmq_ssl_enabled = os.getenv('RABBITMQ_SSL_ENABLED', 'true').lower() == 'true'
rabbitmq_ssl_verify = os.getenv('RABBITMQ_SSL_VERIFY', 'false').lower() == 'true'
# Validate credentials
if not rabbitmq_pass or rabbitmq_pass == 'YOUR_STRONG_PASSWORD':
print("Warning: Using placeholder or empty password. Please check .env file.")
credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_pass)
if rabbitmq_ssl_enabled:
# SSL Context
context = ssl.create_default_context()
context.check_hostname = rabbitmq_ssl_verify
context.verify_mode = ssl.CERT_REQUIRED if rabbitmq_ssl_verify else ssl.CERT_NONE
ssl_options = pika.SSLOptions(context, rabbitmq_host)
params = pika.ConnectionParameters(
host=rabbitmq_host,
port=rabbitmq_port,
credentials=credentials,
ssl_options=ssl_options,
heartbeat=600,
blocked_connection_timeout=300,
virtual_host='/'
)
else:
# Non-SSL connection
params = pika.ConnectionParameters(
host=rabbitmq_host,
port=rabbitmq_port if rabbitmq_port != 5671 else 5672, # Default non-SSL port
credentials=credentials,
heartbeat=600,
blocked_connection_timeout=300,
virtual_host='/'
)
return params
def test_connection():
"""
Test function to verify RabbitMQ connection (original functionality preserved).
"""
try:
params = create_ssl_connection_parameters()
connection = pika.BlockingConnection(params)
channel = connection.channel()
print("Connected to Secure RabbitMQ!")
connection.close()
return True
except Exception as e:
import traceback
print(f"Failed to connect: {e!r}")
traceback.print_exc()
return False
# Keep the original test functionality when run directly
if __name__ == "__main__":
test_connection()

31
trim.py Normal file
View File

@ -0,0 +1,31 @@
# Keep cycling through all job titles
while True:
# Shuffle job titles to randomize order
random.shuffle(job_titles)
for job_title in job_titles:
search_keywords = f"{job_title} location:{fixed_location}"
print(f"\n{'='*60}")
print(f"Starting scrape for: {search_keywords}")
print(f"{'='*60}")
await scraper.scrape_jobs(
search_keywords=search_keywords,
credentials={
"email": os.getenv("SCRAPING_USERNAME"),
"password": os.getenv("SCRAPING_PASSWORD")
}
)
print(f"\n✅ Completed scraping for: {job_title}")
print(f"⏳ Waiting 2 minutes before next job title...")
# Wait 2 minutes before next job title
time.sleep(120)
print(f"\n✅ Completed full cycle of all job titles")
print(f"🔄 Starting new cycle...")
if _name_ == "_main_":
asyncio.run(main())