Compare commits

...

23 Commits

Author SHA1 Message Date
2d22fbdb92 Enhance AmazonJobScraper to support flexible location matching and extract posted dates; refine LLMJobRefiner prompts for better data extraction. 2025-12-09 12:00:57 +01:00
e216db35f9 Increase max pages to scrape and extend wait time between job title scrapes; add posted date to job data extraction 2025-12-09 09:30:44 +01:00
cbcffa8cd4 modify to queue failed jobs and also extract date of job posting 2025-12-09 09:12:35 +01:00
4782f174e2 Delete browser_sessions/job_scraping_12_session.json 2025-12-05 17:49:56 +00:00
10fa1ac633 Delete browser_sessions/job_scraping_123_session.json 2025-12-05 17:49:46 +00:00
ba783112f5 Delete spoof_config.json 2025-12-05 17:49:30 +00:00
9ed5641540 Delete tr.py 2025-12-05 16:50:52 +00:00
370fce0514 Merge branch 'amazon_agent' of https://gitea.thejobhub.xyz/Ofure/Web_scraping_project into amazon_agent 2025-12-05 17:50:10 +01:00
efa47d50ae amazon specific built engine 2025-12-05 17:49:31 +01:00
e49860faae Delete linkedin_main.py 2025-12-05 16:45:12 +00:00
0942339426 Delete job_scraper2.py 2025-12-05 16:44:52 +00:00
7e80801f89 Delete job_scraper.py 2025-12-05 16:44:23 +00:00
06f9820c38 Delete feedback_job_scraping_123.json 2025-12-05 16:44:08 +00:00
fbde4d03e1 Delete feedback_job_scraping_12.json 2025-12-05 16:43:42 +00:00
d0aabc5970 Delete .env 2025-12-05 16:43:25 +00:00
672c6a0333 scraper for amazon 2025-12-05 17:25:54 +01:00
224b9c3122 llm_agent now responsible for extraction. 2025-12-05 17:23:31 +01:00
160efadbfb modifications to work with postgre and use llm to extract and refine data 2025-12-05 17:00:43 +01:00
4f78a845ae refactor(llm_agent): switch from XAI to DeepSeek API and simplify job refinement
- Replace XAI/Grok integration with DeepSeek's OpenAI-compatible API
- Remove schema generation and caching logic
- Simplify prompt structure and response parsing
- Standardize database schema and markdown output format
- Update config to use DEEPSEEK_API_KEY instead of XAI_API_KEY
- Change default search keyword in linkedin_main.py
2025-12-01 10:25:37 +01:00
d7d92ba8bb fix(job_scraper): increase timeout values for page navigation
The previous timeout values were too short for slower network conditions, causing premature timeouts during job scraping. Increased wait_for_function timeout from 30s to 80s and load_state timeout from 30s to 60s to accommodate slower page loads.
2025-11-27 12:28:21 +01:00
d025828036 feat: update LLM model and increase content size limit
refactor: update timeout values in job scraper classes

feat: add spoof config for renderers and vendors

build: update pycache files for config and modules
2025-11-24 13:47:47 +01:00
fd4e8c9c05 feat(scraper): add LLM-powered job data refinement and new scraping logic
- Implement LLMJobRefiner class for processing job data with Gemini API
- Add new job_scraper2.py with enhanced scraping capabilities
- Remove search_keywords parameter from scraping engine
- Add environment variable loading in config.py
- Update main script to use new scraper and target field
2025-11-24 12:25:50 +01:00
7dca4c9159 refactor(job_scraper): improve page loading and typing in linkedin scraper
- Change page load strategy from 'load' to 'domcontentloaded' and 'networkidle' for better performance
- Make search_keywords parameter optional to handle empty searches
- Update type imports to include List for better type hints
- Set headless mode to true by default for production use
2025-11-23 09:27:05 +01:00
15 changed files with 1536 additions and 554 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

371
amazon_job_scraper.py Normal file
View File

@ -0,0 +1,371 @@
import asyncio
import random
from typing import Optional, Dict
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
from browserforge.injectors.playwright import AsyncNewContext
from llm_agent import LLMJobRefiner
import re
from fetcher import StealthyFetcher
from datetime import datetime, timedelta
import json
import redis
class AmazonJobScraper:
def __init__(
self,
engine,
db_path: str = "amazon_jobs.db",
human_speed: float = 1.0,
user_request: str = "Extract all standard job details"
):
self.engine = engine
self.db_path = db_path
self.human_speed = human_speed
self.user_request = user_request
self._init_db()
self.llm_agent = LLMJobRefiner()
self.redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# Country alias map for flexible location matching
self.country_aliases = {
"united states": ["united states", "usa", "u.s.a", "u.s.", "us", "america", ", us", ", usa"],
"united kingdom": ["united kingdom", "uk", "great britain", "england", "gb", ", uk", ", gb"],
"canada": ["canada", "ca", ", ca"],
"india": ["india", "in", ", in"],
"germany": ["germany", "de", ", de"],
"france": ["france", "fr", ", fr"],
"australia": ["australia", "au", ", au"],
# Add more as needed
}
def _init_db(self):
pass
async def _safe_inner_text(self, element):
if not element:
return "Unknown"
try:
return await element.text_content()
except:
return "Unknown"
async def _human_click(self, page, element, wait_after: bool = True):
if not element:
return False
await element.scroll_into_view_if_needed()
await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
try:
await element.click()
if wait_after:
await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
return True
except:
return False
async def _login(self, page, credentials: Dict) -> bool:
return True
async def _extract_page_content_for_llm(self, page) -> str:
await asyncio.sleep(2 * self.human_speed)
await self.engine._human_like_scroll(page)
await asyncio.sleep(2 * self.human_speed)
return await page.content()
def _extract_keywords_and_location(self, search_keywords: str):
location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
location = location_match.group(1).strip() if location_match else ""
clean_keywords = re.sub(r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE).strip()
return clean_keywords, location
def _normalize_text(self, text: str) -> str:
return re.sub(r'[^a-z0-9\s]', ' ', text.lower()).strip()
def _location_matches(self, job_location_text: str, target_location: str) -> bool:
if not target_location:
return True
target = target_location.lower().strip()
job_text = job_location_text.lower()
# Direct substring match (e.g., "Berlin" in "Berlin, Germany")
if target in job_text:
return True
# Check country aliases
for canonical, aliases in self.country_aliases.items():
if target in canonical or any(target == alias for alias in aliases if len(alias) <= 3):
return any(alias in job_text for alias in aliases)
return False
def _parse_posted_date_from_card_text(self, card_text: str) -> str:
date_match = re.search(r'Posted\s+([A-Za-z]+\s+\d{1,2},\s+\d{4})', card_text)
if date_match:
try:
dt = datetime.strptime(date_match.group(1), "%B %d, %Y")
return dt.strftime("%m/%d/%y")
except ValueError:
pass
days_match = re.search(r'Posted\s+(\d+)\s+day[s]?\s+ago', card_text, re.IGNORECASE)
if days_match:
days = int(days_match.group(1))
dt = datetime.now() - timedelta(days=days)
return dt.strftime("%m/%d/%y")
return datetime.now().strftime("%m/%d/%y")
async def _scrape_jobs_from_current_page(self, page, search_keywords: str, seen_job_ids, all_job_links):
await asyncio.sleep(1.5 * self.human_speed)
job_cards = await page.query_selector_all("div[data-job-id]")
new_jobs = 0
clean_kw, location_kw = self._extract_keywords_and_location(search_keywords)
keyword_terms = [term.lower().strip() for term in clean_kw.split() if term.strip()]
for card in job_cards:
job_id = await card.get_attribute("data-job-id")
if not job_id or not job_id.isdigit() or job_id in seen_job_ids:
continue
link = await card.query_selector("a[href*='/jobs/']")
if not link:
continue
href = await link.get_attribute("href")
if not href or any(x in href for x in ["search?", "locations", "teams", "page=", "my.", "/account/"]):
continue
card_text = await self._safe_inner_text(card)
normalized_card = self._normalize_text(card_text)
# ✅ Check: ALL keyword terms must appear in card
keywords_match = all(term in normalized_card for term in keyword_terms) if keyword_terms else True
# ✅ Check location separately with alias support
location_match = True
if location_kw:
loc_el = await card.query_selector(".location-and-id span")
job_loc = (await self._safe_inner_text(loc_el)).strip() if loc_el else ""
location_match = self._location_matches(job_loc, location_kw)
if keywords_match and location_match:
title_span = await card.query_selector("h2.job-title span, h2 span")
title = (await self._safe_inner_text(title_span)).strip() if title_span else "Unknown"
posted_date = self._parse_posted_date_from_card_text(card_text)
seen_job_ids.add(job_id)
all_job_links.append((href, title, posted_date))
new_jobs += 1
print(f" ✅ Accepted: {title} (posted: {posted_date})")
else:
reasons = []
if not keywords_match:
reasons.append("keyword mismatch")
if not location_match:
reasons.append("location mismatch")
print(f" ⚠️ Skipping: {'; '.join(reasons)}")
return new_jobs
async def _handle_pagination(self, page, search_keywords: str, seen_job_ids, all_job_links):
current_page_num = 1
max_pages = 400
while current_page_num <= max_pages:
print(f"📄 Processing page {current_page_num}")
await asyncio.sleep(1.5 * self.human_speed)
new_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
print(f" Found {new_jobs} new job(s) (total: {len(all_job_links)})")
# Scroll to bottom to trigger lazy-loaded pagination (if any)
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await asyncio.sleep(2 * self.human_speed)
# Look for ANY link containing 'page=N'
next_page_num = current_page_num + 1
next_selector = f"a[href*='page={next_page_num}']"
next_link = await page.query_selector(next_selector)
if next_link:
href = await next_link.get_attribute("href")
if href:
next_url = "https://www.amazon.jobs" + href if href.startswith("/") else href
print(f" ➡️ Going to page {next_page_num}: {next_url}")
await page.goto(next_url, timeout=120000)
try:
await page.wait_for_selector("div[data-job-id]", timeout=30000)
except PlaywrightTimeoutError:
print(" ⚠️ No jobs loaded on next page.")
break
current_page_num = next_page_num
else:
break
else:
print(" 🔚 No next page link found.")
break
print(f"✅ Finished pagination after {current_page_num} pages.")
async def _add_job_to_redis_cache(self, job_url: str, job_id: str, error_type: str):
try:
job_data = {
"job_url": job_url,
"job_id": job_id,
"error_type": error_type,
"timestamp": datetime.now().isoformat()
}
self.redis_client.hset("failed_jobs", job_id, json.dumps(job_data))
print(f" 📦 Added failed job to Redis cache: {job_id} (Error: {error_type})")
except Exception as e:
print(f" ❌ Failed to add to Redis: {str(e)}")
async def scrape_jobs(
self,
search_keywords: Optional[str],
max_pages: int = 10,
credentials: Optional[Dict] = None
):
clean_kw, location_kw = self._extract_keywords_and_location(search_keywords)
encoded_keywords = clean_kw.replace(" ", "+")
# ✅ FIXED: removed extra spaces
search_url = f"https://www.amazon.jobs/en/search?base_query={encoded_keywords}"
if location_kw:
search_url += f"&loc_query={location_kw.replace(' ', '+')}"
profile = self.engine._select_profile()
renderer = random.choice(self.engine.common_renderers[self.engine.os])
vendor = random.choice(self.engine.common_vendors)
spoof_script = self.engine._get_spoof_script(renderer, vendor)
async with async_playwright() as pw:
browser = await pw.chromium.launch(
headless=False,
args=['--disable-blink-features=AutomationControlled']
)
context = await AsyncNewContext(browser, fingerprint=profile)
await context.add_init_script(f"""
Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
""")
await context.add_init_script(spoof_script)
page = await context.new_page()
temp_fetcher = StealthyFetcher(self.engine, browser, context)
print("✅ Bypassing login (Amazon jobs are public)...")
await page.wait_for_load_state("load", timeout=120000)
protection_type = await temp_fetcher._detect_protection(page)
if protection_type:
print(f"🛡️ Protection detected: {protection_type}")
content_accessible = await temp_fetcher._is_content_accessible(page)
if not content_accessible:
handled = await self.engine._handle_cloudflare(page) if protection_type == "cloudflare" else False
if not handled:
await browser.close()
self.engine.report_outcome("protection_block")
return
else:
print("✅ Protection present but content accessible.")
print(f"🔍 Searching Amazon for: {search_keywords}")
await page.goto(search_url, timeout=120000)
try:
await page.wait_for_selector("div[data-job-id]", timeout=40000)
print("✅ Job listings detected.")
except PlaywrightTimeoutError:
print("❌ No job cards found.")
await browser.close()
return
all_job_links = []
seen_job_ids = set()
print("🔄 Collecting initial job links...")
initial_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
print(f" Found {initial_jobs} initial job(s) (total: {len(all_job_links)})")
await self._handle_pagination(page, search_keywords, seen_job_ids, all_job_links)
print(f"✅ Collected {len(all_job_links)} unique job listings.")
scraped_count = 0
for idx, (href, title, posted_date) in enumerate(all_job_links):
try:
# ✅ FIXED: removed extra spaces
full_url = href if href.startswith("http") else f"https://www.amazon.jobs{href}"
print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url} (posted: {posted_date})")
fetcher = StealthyFetcher(self.engine, browser, context)
job_page = await fetcher.fetch_url(full_url, wait_for_selector="h1[data-testid='job-title']")
if not job_page:
job_id = href.strip("/").split("/")[-1] if href else "unknown"
await self._add_job_to_redis_cache(full_url, job_id, "fetch_failure")
self.engine.report_outcome("fetch_failure", url=full_url)
continue
apply_btn = await job_page.query_selector("a:has-text('Apply now'), button:has-text('Apply now')")
final_url = full_url
page_content = await self._extract_page_content_for_llm(job_page)
job_id = href.strip("/").split("/")[-1] if href else "unknown"
raw_data = {
"page_content": page_content,
"url": final_url,
"job_id": job_id,
"search_keywords": search_keywords,
"posted_date": posted_date
}
refined_data = await self.llm_agent.refine_job_data(raw_data, self.user_request)
if refined_data and refined_data.get("title", "N/A") != "N/A":
compulsory_fields = ['company_name', 'job_id', 'url']
for field in compulsory_fields:
if not refined_data.get(field) or refined_data[field] in ["N/A", "", "Unknown"]:
if field == 'job_id':
refined_data[field] = job_id
elif field == 'url':
refined_data[field] = final_url
elif field == 'company_name':
refined_data[field] = "Amazon"
refined_data['scraped_at'] = datetime.now().isoformat()
refined_data['category'] = clean_kw
refined_data['posted_date'] = posted_date
await self.llm_agent.save_job_data(refined_data, search_keywords)
scraped_count += 1
print(f" ✅ Scraped: {refined_data['title'][:50]}...")
self.engine.report_outcome("success", url=raw_data["url"])
else:
print(f" 🟡 LLM failed to refine: {full_url}")
await self._add_job_to_redis_cache(full_url, job_id, "llm_failure")
self.engine.report_outcome("llm_failure", url=raw_data["url"])
await job_page.close()
except Exception as e:
error_msg = str(e)[:100]
print(f" ⚠️ Exception on job {idx+1}: {error_msg}")
job_id = (href.strip("/").split("/")[-1] if href else "unknown") if 'href' in locals() else "unknown"
job_url = full_url if 'full_url' in locals() else "unknown"
await self._add_job_to_redis_cache(job_url, job_id, f"exception: {error_msg}")
if 'job_page' in locals() and job_page:
await job_page.close()
continue
finally:
if not page.is_closed():
await page.goto(search_url, timeout=120000)
await asyncio.sleep(4 * self.human_speed)
await browser.close()
if scraped_count > 0:
self.engine.report_outcome("success")
print(f"✅ Completed! Processed {scraped_count} jobs.")
else:
self.engine.report_outcome("captcha")
print("⚠️ No jobs processed successfully.")

60
amazon_main.py Normal file
View File

@ -0,0 +1,60 @@
from scraping_engine import FingerprintScrapingEngine
from amazon_job_scraper import AmazonJobScraper # Updated class name
from dotenv import load_dotenv
import asyncio
import random
import time
load_dotenv()
async def main():
engine = FingerprintScrapingEngine(
seed="amazon_job_scraping_12",
target_os="windows",
db_path="amazon_jobs.db",
markdown_path="amazon_jobs.md"
)
scraper = AmazonJobScraper(
engine,
human_speed=1.4,
user_request="Extract title, company, location, description, basic qualifications, preferred qualifications, job ID, and job type (full-time, part-time, etc.)"
)
job_titles = [
"Software Development Engineer",
"Data Scientist",
"Product Manager",
"UX Designer",
"Solutions Architect",
"Machine Learning Engineer",
"Frontend Engineer",
"Backend Engineer",
"Full Stack Engineer",
"Data Engineer"
]
fixed_location = "United States" # Amazon uses country/region, not city
while True:
random.shuffle(job_titles)
for job_title in job_titles:
search_keywords = f"{job_title} location:{fixed_location}"
print(f"\n{'='*60}")
print(f"Starting Amazon scrape for: {search_keywords}")
print(f"{'='*60}")
await scraper.scrape_jobs(
search_keywords=search_keywords,
max_pages=400 # Amazon loads 10 per page; 3 pages = ~30 jobs
)
print(f"\n✅ Completed scraping for: {job_title}")
print(f"⏳ Waiting 90 seconds before next job title...")
time.sleep(120)
print(f"\n✅ Completed full cycle. Restarting...")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -2,6 +2,16 @@
import os
import json
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# LLM Agent Configuration
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")
if not DEEPSEEK_API_KEY:
raise ValueError("DEEPSEEK_API_KEY environment variable not set in .env file")
def load_spoof_config():
"""Load spoof data from JSON config file. Falls back to defaults if missing."""

125
fetcher.py Normal file
View File

@ -0,0 +1,125 @@
import asyncio
import random
import time
from playwright.async_api import Page, BrowserContext, Browser, TimeoutError as PlaywrightTimeoutError
from typing import Optional
from scraping_engine import FingerprintScrapingEngine
class StealthyFetcher:
def __init__(self, engine: FingerprintScrapingEngine, browser: Browser, context: BrowserContext):
self.engine = engine
self.browser = browser
self.context = context
self.max_retries = 5
self.base_delay = 5
async def fetch_url(self, url: str, wait_for_selector: Optional[str] = None) -> Optional[Page]:
"""
Fetch a URL using stealth techniques, handling Cloudflare and other protections intelligently.
"""
for attempt in range(self.max_retries):
try:
print(f"Attempt {attempt + 1} to fetch {url}")
page = await self.context.new_page()
await page.goto(url, wait_until='load', timeout=120000)
if wait_for_selector:
try:
await page.wait_for_selector(wait_for_selector, timeout=40000)
except PlaywrightTimeoutError:
print(f"Selector {wait_for_selector} not found immediately, continuing...")
await self._apply_human_behavior(page)
protection_type = await self._detect_protection(page)
if protection_type:
print(f"🛡️ Protection detected: {protection_type}")
content_accessible = await self._is_content_accessible(page, wait_for_selector)
if not content_accessible:
print("🔒 Content not accessible due to protection.")
handled = False
if protection_type == "cloudflare":
handled = await self._handle_cloudflare(page)
elif protection_type == "captcha":
handled = await self._handle_captcha(page)
if not handled:
print("❌ Failed to handle protection.")
await page.close()
await asyncio.sleep(self.base_delay * (2 ** attempt))
continue
else:
print("✅ Protection present but content is accessible — proceeding.")
print(f"✅ Successfully fetched {url}")
return page
except Exception as e:
print(f"Attempt {attempt + 1} failed for {url}: {str(e)}")
if 'page' in locals():
await page.close()
await asyncio.sleep(self.base_delay * (2 ** attempt))
print(f"❌ Failed to fetch {url} after {self.max_retries} attempts.")
return None
async def _apply_human_behavior(self, page: Page):
await self.engine._human_like_scroll(page)
await asyncio.sleep(random.uniform(1, 3))
await self.engine._simulate_human_interaction(page)
await asyncio.sleep(random.uniform(1, 2))
async def _detect_protection(self, page: Page) -> Optional[str]:
content = (await page.content()).lower()
if (
"#cf-chl" in content
or "checking your browser" in content
or "just a moment" in content
or "cloudflare" in content
or "ddos protection" in content
or "turnstile" in content
):
return "cloudflare"
elif "captcha" in content or "robot" in content or "verify you're human" in content:
return "captcha"
return None
async def _is_content_accessible(self, page: Page, wait_for_selector: Optional[str] = None) -> bool:
if wait_for_selector:
try:
await page.wait_for_selector(wait_for_selector, timeout=40000)
return True
except PlaywrightTimeoutError:
pass
try:
body_text = await page.eval_on_selector("body", "el => el.innerText.toLowerCase()")
return len(body_text.strip()) > 200
except:
return False
async def _handle_captcha(self, page: Page) -> bool:
print("🦾 Using 'avoid' strategy for captcha — skipping page.")
return False
async def _handle_cloudflare(self, page: Page) -> bool:
max_wait_time = 60
start_time = time.time()
while time.time() - start_time < max_wait_time:
if not await self._detect_protection(page):
print("☁️ Cloudflare challenge resolved.")
return True
print("☁️ Cloudflare active, waiting...")
await self._apply_human_behavior(page)
wait_time = min(10, 2 + random.uniform(1, 3) + (time.time() - start_time) * 0.1)
await asyncio.sleep(wait_time)
if (time.time() - start_time) > 15 and (time.time() - start_time) % 20 < 2:
print("🔄 Reloading page during Cloudflare wait...")
await page.reload(wait_until='load', timeout=120000)
print("⏰ Timeout waiting for Cloudflare resolution.")
return False

View File

@ -1,491 +0,0 @@
import asyncio
import random
import sqlite3
import os
from datetime import datetime
from typing import Optional, Dict
from playwright.async_api import async_playwright
from browserforge.injectors.playwright import AsyncNewContext
class LinkedInJobScraper:
def __init__(
self,
engine,
db_path: str = "linkedin_jobs.db",
human_speed: float = 1.0
):
self.engine = engine
self.db_path = db_path
self.human_speed = human_speed
self._init_db()
def _init_db(self):
os.makedirs(os.path.dirname(self.db_path) if os.path.dirname(self.db_path) else ".", exist_ok=True)
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
keyword TEXT,
title TEXT,
company TEXT,
location TEXT,
salary TEXT,
description TEXT,
url TEXT UNIQUE,
workplace_type TEXT,
scraped_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
async def _human_click(self, page, element, wait_after: bool = True):
if not element:
return False
await element.scroll_into_view_if_needed()
await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
try:
await element.click()
if wait_after:
await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
return True
except:
return False
async def _login(self, page, credentials: Dict) -> bool:
"""Human-realistic LinkedIn login"""
print("🔐 Navigating to LinkedIn login page...")
await page.goto("https://www.linkedin.com/login", timeout=60000)
await asyncio.sleep(random.uniform(2.0, 3.5) * self.human_speed)
email_field = await page.query_selector('input[name="session_key"]')
if not email_field:
print("❌ Email field not found.")
return False
print("✍️ Typing username...")
await email_field.click()
await asyncio.sleep(random.uniform(0.4, 0.9) * self.human_speed)
for char in credentials["email"]:
await page.keyboard.type(char)
await asyncio.sleep(random.uniform(0.06, 0.14) * self.human_speed)
await asyncio.sleep(random.uniform(1.0, 1.8) * self.human_speed)
password_field = await page.query_selector('input[name="session_password"]')
if not password_field:
print("❌ Password field not found.")
return False
print("🔒 Typing password...")
await password_field.click()
await asyncio.sleep(random.uniform(0.3, 0.7) * self.human_speed)
for char in credentials["password"]:
await page.keyboard.type(char)
await asyncio.sleep(random.uniform(0.08, 0.16) * self.human_speed)
await asyncio.sleep(random.uniform(0.8, 1.5) * self.human_speed)
print("✅ Submitting login form...")
await page.keyboard.press("Enter")
for _ in range(15):
current_url = page.url
if "/feed" in current_url or "/jobs" in current_url:
if "login" not in current_url:
print("✅ Login successful!")
await asyncio.sleep(random.uniform(2.0, 3.0) * self.human_speed)
return True
await asyncio.sleep(1)
print("❌ Login may have failed.")
return False
async def _extract_job_details(self, page) -> Dict:
"""Extract from ANY job page: LinkedIn Easy Apply OR external site"""
await asyncio.sleep(2 * self.human_speed)
async def get_text(selector: str) -> str:
try:
el = await page.query_selector(selector)
if el:
text = await el.inner_text()
return text.strip() if text else "N/A"
except:
pass
return "N/A"
title = await get_text("h1.t-24")
if title == "N/A":
title = await get_text("h1, h2")
company = await get_text("a.app-aware-link[href*='/company/']")
if company == "N/A":
company = await get_text("div.org, .company, [class*='company']")
location = await get_text("span[class*='location']")
if location == "N/A":
location = await get_text(".location, [class*='location']")
description = await get_text("div[class*='description__text']")
if description == "N/A":
description = await get_text(".job-desc, .description, main, body")
workplace = await get_text("span.job-workplace-type") or "N/A"
salary = await get_text("span.salary") or "N/A"
return {
"title": title,
"company": company,
"location": location,
"workplace_type": workplace,
"salary": salary,
"description": description,
"url": page.url
}
async def _save_to_markdown(self, job_data: Dict, keyword: str, verified: bool=True):
"""Save to appropriate folder using job ID to avoid duplication"""
folder = "linkedin_jobs" if verified else "linkedin_jobs_unverified"
os.makedirs(folder, exist_ok=True)
# Extract job ID from URL for LinkedIn jobs
url = job_data.get("url", "")
if "/jobs/view/" in url:
try:
job_id = url.split("/view/")[1].split("/")[0]
except:
job_id = "unknown"
else:
# For external jobs, use a hash of the URL (first 12 chars)
import hashlib
job_id = hashlib.md5(url.encode()).hexdigest()[:12]
clean_keyword = keyword.replace(" ", "_")
filename = f"linkedin_{clean_keyword}_job_{job_id}.md"
filepath = os.path.join(folder, filename)
# Only save if file doesn't already exist (idempotent)
if os.path.exists(filepath):
print(f" 📝 Skipping duplicate Markdown file: {filename}")
return
with open(filepath, "w", encoding="utf-8") as f:
f.write(f"# {job_data['title']}\n\n")
f.write(f"- **Company**: {job_data['company']}\n")
f.write(f"- **Location**: {job_data['location']}\n")
f.write(f"- **Workplace**: {job_data['workplace_type']}\n")
f.write(f"- **Salary**: {job_data['salary']}\n")
f.write(f"- **URL**: <{url}>\n\n")
f.write(f"## Description\n\n{job_data['description']}\n")
async def _save_to_db(self, job_data: Dict, keyword: str):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR IGNORE INTO jobs
(keyword, title, company, location, salary, description, url, workplace_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
keyword,
job_data["title"],
job_data["company"],
job_data["location"],
job_data["salary"],
job_data["description"],
job_data["url"],
job_data["workplace_type"]
))
conn.commit()
async def scrape_jobs(
self,
search_keywords: str,
max_pages: int = 1,
credentials: Optional[Dict] = None
):
encoded_keywords = search_keywords.replace(" ", "%20")
search_url = f"https://www.linkedin.com/jobs/search/?keywords={encoded_keywords}"
profile = self.engine._select_profile()
renderer = random.choice(self.engine.common_renderers[self.engine.os])
vendor = random.choice(self.engine.common_vendors)
spoof_script = self.engine._get_spoof_script(renderer, vendor)
async with async_playwright() as pw:
browser = await pw.chromium.launch(
headless=False,
args=['--disable-blink-features=AutomationControlled']
)
context = await AsyncNewContext(browser, fingerprint=profile)
await context.add_init_script(f"""
Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
""")
await context.add_init_script(spoof_script)
page = await context.new_page()
session_loaded = await self.engine.load_session(context)
login_successful = False
if session_loaded:
print("🔁 Using saved session — verifying login...")
await page.goto("https://www.linkedin.com/feed/", timeout=60000)
if "feed" in page.url and "login" not in page.url:
print("✅ Session still valid.")
login_successful = True
else:
print("⚠️ Saved session expired — re-authenticating.")
session_loaded = False
if not session_loaded and credentials:
print("🔐 Performing fresh login...")
login_successful = await self._login(page, credentials)
if login_successful:
await self.engine.save_session(context)
else:
print("❌ Login failed. Exiting.")
await browser.close()
self.engine.report_outcome("block")
return
elif not credentials:
print(" No credentials — proceeding as guest.")
login_successful = True
else:
pass
await page.wait_for_load_state("load", timeout=60000)
print("✅ Post-login page fully loaded. Starting search...")
if await self.engine._detect_cloudflare(page):
print("☁️ Cloudflare detected on initial load.")
if not await self.engine._handle_cloudflare(page):
print("❌ Cloudflare could not be resolved.")
await browser.close()
self.engine.report_outcome("cloudflare")
return
print(f"🔍 Searching for: {search_keywords}")
await page.goto(search_url, wait_until='load', timeout=60000)
await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
if await self.engine._detect_cloudflare(page):
print("☁️ Cloudflare detected on search page.")
if not await self.engine._handle_cloudflare(page):
await browser.close()
self.engine.report_outcome("cloudflare")
return
scraped_count = 0
all_job_links = []
seen_job_ids = set()
# ← NEW: Scroll once to reveal pagination (if any)
print("🔄 Scrolling to bottom to reveal pagination controls...")
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
# Check if pagination exists
pagination_exists = await page.query_selector("button[aria-label='Next']")
if pagination_exists:
print("⏭️ Pagination detected. Using page navigation.")
current_page = 1
while current_page <= max_pages:
print(f"📄 Processing page {current_page}/{max_pages}")
# Collect job links on current page
current_links = await page.query_selector_all("a[href*='/jobs/view/']")
new_jobs = 0
for link in current_links:
href = await link.get_attribute("href")
if href:
job_id = href.split("/view/")[-1].split("/")[0] if "/view/" in href else href
if job_id and job_id not in seen_job_ids:
seen_job_ids.add(job_id)
all_job_links.append(href)
new_jobs += 1
print(f" Found {new_jobs} new job(s) on page {current_page} (total: {len(all_job_links)})")
# Try to go to next page
if current_page < max_pages:
next_btn = await page.query_selector("button[aria-label='Next']")
if next_btn and await next_btn.is_enabled():
await self._human_click(page, next_btn)
await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
# Wait for URL to change or new content
try:
await page.wait_for_function("() => window.location.href.includes('start=')", timeout=30000)
except:
pass
else:
print("🔚 'Next' button not available — stopping pagination.")
break
current_page += 1
else:
print("🔄 No pagination found. Falling back to infinite scroll...")
last_height = await page.evaluate("document.body.scrollHeight")
no_new_jobs_count = 0
max_no_new = 3
while no_new_jobs_count < max_no_new:
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
current_links = await page.query_selector_all("a[href*='/jobs/view/']")
new_jobs_found = 0
for link in current_links:
href = await link.get_attribute("href")
if href:
job_id = href.split("/view/")[-1].split("/")[0] if "/view/" in href else href
if job_id and job_id not in seen_job_ids:
seen_job_ids.add(job_id)
all_job_links.append(href)
new_jobs_found += 1
print(f" Found {new_jobs_found} new job(s) (total: {len(all_job_links)})")
new_height = await page.evaluate("document.body.scrollHeight")
if new_height == last_height:
no_new_jobs_count += 1
else:
no_new_jobs_count = 0
last_height = new_height
if new_jobs_found == 0 and no_new_jobs_count >= 1:
print("🔚 No new jobs loaded. Stopping scroll.")
break
print(f"✅ Collected {len(all_job_links)} unique job links.")
# ← Rest of job processing loop unchanged
scraped_count = 0
for idx, href in enumerate(all_job_links):
try:
full_url = href if href.startswith("http") else f"https://www.linkedin.com{href}"
print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url}")
await page.goto(full_url, wait_until='load', timeout=60000)
await asyncio.sleep(3 * self.human_speed)
is_cloudflare = await self.engine._detect_cloudflare(page)
page_content = await page.content()
has_captcha_text = "captcha" in page_content.lower()
captcha_present = is_cloudflare or has_captcha_text
title_element = await page.query_selector("h1.t-24")
job_data_accessible = title_element is not None
if captcha_present:
if job_data_accessible:
print(" ⚠️ CAPTCHA detected, but job data is accessible. Proceeding in stealth mode...")
await self.engine._avoid_captcha(page)
else:
print(" ⚠️ CAPTCHA detected and job data blocked. Attempting recovery...")
if not await self.engine._solve_captcha_fallback(page):
print(" ❌ CAPTCHA recovery failed. Skipping job.")
continue
title_element = await page.query_selector("h1.t-24")
if not title_element:
print(" ❌ Job data still unavailable after CAPTCHA handling. Skipping.")
continue
if not captcha_present:
await self.engine._avoid_captcha(page)
apply_btn = None
apply_selectors = [
"button[aria-label*='Apply']",
"button:has-text('Apply')",
"a:has-text('Apply')",
"button:has-text('Easy Apply')"
]
for selector in apply_selectors:
apply_btn = await page.query_selector(selector)
if apply_btn:
break
job_data = None
final_url = full_url
if apply_btn:
print(" → Clicking 'Apply' / 'Easy Apply' button...")
page_waiter = asyncio.create_task(context.wait_for_event("page"))
await self._human_click(page, apply_btn, wait_after=False)
external_page = None
try:
external_page = await asyncio.wait_for(page_waiter, timeout=5.0)
print(" 🌐 External job site opened in new tab.")
await external_page.wait_for_load_state("load", timeout=30000)
await asyncio.sleep(2 * self.human_speed)
await self.engine._human_like_scroll(external_page)
await asyncio.sleep(2 * self.human_speed)
job_data = await self._extract_job_details(external_page)
final_url = external_page.url
if not external_page.is_closed():
await external_page.close()
except asyncio.TimeoutError:
print(" 🖥️ No external tab — scraping LinkedIn job page.")
await page.wait_for_timeout(2000)
try:
await page.wait_for_selector("div.jobs-apply-button--fixed, div.jobs-easy-apply-modal", timeout=8000)
except:
pass
await self.engine._human_like_scroll(page)
await asyncio.sleep(2 * self.human_speed)
job_data = await self._extract_job_details(page)
final_url = page.url
else:
print(" ⚠️ No 'Apply' button found — scraping job details directly.")
await self.engine._human_like_scroll(page)
await asyncio.sleep(2 * self.human_speed)
job_data = await self._extract_job_details(page)
final_url = page.url
job_data["url"] = final_url
if job_data["title"] == "N/A" and "linkedin.com" in final_url:
job_id = final_url.split("/")[-2] if "/jobs/view/" in final_url else "unknown"
job_data["title"] = f"Easy Apply Job - ID {job_id}"
is_meaningful = (
job_data["title"] != "N/A" or
job_data["company"] != "N/A" or
(job_data["description"] != "N/A" and len(job_data["description"]) > 20)
)
if is_meaningful:
await self._save_to_db(job_data, search_keywords)
await self._save_to_markdown(job_data, search_keywords, verified=True)
scraped_count += 1
print(f" ✅ Scraped (verified): {job_data['title'][:50]}...")
else:
await self._save_to_markdown(job_data, search_keywords, verified=False)
print(f" 🟡 Scraped (unverified): {final_url} — low-quality data")
except Exception as e:
print(f" ⚠️ Failed on job {idx+1}: {str(e)[:100]}")
continue
finally:
print(" ↩️ Returning to LinkedIn search results...")
await page.goto(search_url, timeout=60000)
await asyncio.sleep(4 * self.human_speed)
await browser.close()
if scraped_count > 0:
self.engine.report_outcome("success")
print(f"✅ Completed! Saved {scraped_count} verified + additional unverified jobs for '{search_keywords}'.")
else:
self.engine.report_outcome("captcha")
print("⚠️ No verified jobs scraped — check 'linkedin_jobs_unverified' for raw outputs.")

View File

@ -0,0 +1,504 @@
# LinkedIn Jobs - 2025-12-05 14:04:45
## Job: Machine Learning Engineer
- **Keyword**: Machine Learning Engineer location:New York
- **Company**: The Arena
- **Location**: Lagos, Lagos State, Nigeria
- **Nature of Work**: Not specified in the provided content. Could not infer from keywords like 'remote', 'onsite', or 'hybrid'.
- **Salary Range**: Not specified in the provided content.
- **Job ID**: 4325564279
- **Category**: Machine Learning Engineer
- **Scraped At**: 2025-12-05T14:04:43.210072
- **URL**: <https://www.linkedin.com/jobs/view/4325564279/?eBP=NOT_ELIGIBLE_FOR_CHARGING&refId=KZdFT%2FkXGUGDBr1Ru66VSg%3D%3D&trackingId=O4Oht8qideqoj%2FUHqgXQKg%3D%3D&trk=flagship3_search_srp_jobs>
### Description
The job posting is for a Machine Learning Engineer position. The content appears to be from a LinkedIn job application dialog, showing contact information collection for the applicant Ofure Ikheloa. The main job details beyond the title and company are not fully visible in the provided content snippet, which focuses on the application form's contact info section.
### Requirements
Specific requirements are not detailed in the provided content snippet. The visible section is part of the application form for collecting the candidate's contact information.
### Qualifications
Specific qualifications are not detailed in the provided content snippet. The visible section is part of the application form.
---
## Job: Junior Software Engineer (Fresh Graduates)
- **Keyword**: Machine Learning Engineer location:New York
- **Company**: Clarvos
- **Location**: Lagos, Lagos State, Nigeria
- **Nature of Work**: N/A
- **Salary Range**: N/A
- **Job ID**: 4348455050
- **Category**: Machine Learning Engineer
- **Scraped At**: 2025-12-05T14:06:23.354232
- **URL**: <https://www.linkedin.com/jobs/view/4348455050/?eBP=NOT_ELIGIBLE_FOR_CHARGING&refId=KZdFT%2FkXGUGDBr1Ru66VSg%3D%3D&trackingId=ztbzkgV%2BpdyOIs92zSWwNQ%3D%3D&trk=flagship3_search_srp_jobs>
### Description
The job posting is for a Junior Software Engineer position targeted at fresh graduates. The role appears to be with Clarvos, based in Lagos, Nigeria. The provided content shows an application form with contact information fields, indicating this is an active job application page on LinkedIn.
### Requirements
The specific requirements are not detailed in the provided content snippet. However, being a Junior Software Engineer role for fresh graduates typically requires foundational programming knowledge, problem-solving skills, and a willingness to learn.
### Qualifications
The role is explicitly for 'Fresh Graduates,' indicating that a recent bachelor's degree in Computer Science, Software Engineering, or a related field is the primary qualification. Specific educational requirements are not listed in the provided text.
---
## Job: Machine Learning Engineer - Search
- **Keyword**: Machine Learning Engineer location:New York
- **Company**: Shopify
- **Location**: Remote - Americas
- **Nature of Work**: Remote
- **Salary Range**: N/A
- **Job ID**: unknown
- **Category**: Machine Learning Engineer
- **Scraped At**: 2025-12-05T14:37:51.551952
- **URL**: <https://www.shopify.com/careers/machine-learning-engineer-search_c15b011d-bfe1-4eae-af45-9f3955ce408d?utm_source=linkedin>
### Description
About the role Every day, millions of people search for products across Shopify's ecosystem. That's not just queries—that's dreams, businesses, and livelihoods riding on whether someone finds the perfect vintage jacket or the exact drill bit they need. As a Machine Learning Engineer specializing in Search Recommendations, you'll be the one making that magic happen. With a search index unifying over a billion products, you're tackling one of the hardest search problems at unprecedented scale. We're building cutting-edge product search from the ground up using the latest LLM advances and vector matching technologies to create search experiences that truly understand what people are looking for.
### Requirements
Key Responsibilities: Design and implement AI-powered features to enhance search recommendations and personalization Collaborate with data scientists and engineers to productionize data products through rigorous experimentation and metrics analysis Build and maintain robust, scalable data pipelines for search and recommendation systems Develop comprehensive tools for evaluation and relevance engineering, following high-quality software engineering practices Mentor engineers and data scientists while fostering a culture of innovation and technical excellence
### Qualifications
Qualifications: Expertise in relevance engineering and recommendation systems, with hands-on experience in Elasticsearch, Solr, or vector databases Strong proficiency in Python with solid object-oriented programming skills Proven ability to write optimized, low-latency code for high-performance systems Experience deploying machine learning, NLP, or generative AI products at scale (strong plus) Familiarity with statistical methods and exposure to Ruby, Rails, or Rust (advantageous) Track record of shipping ML solutions that real users depend on
---
## Job: Data/ML Engineer
- **Keyword**: Machine Learning Engineer location:New York
- **Company**: Sailplan
- **Location**: Fort Lauderdale
- **Nature of Work**: This position may be located remotely or from our Headquarters in Miami / Fort Lauderdale, Florida, as determined on a case by case basis. Remote candidates must be US citizens located in the United States or Canada. Remote candidates are expected to travel to office periodically as necessary.
- **Salary Range**: N/A
- **Job ID**: unknown
- **Category**: Machine Learning Engineer
- **Scraped At**: 2025-12-05T14:40:02.469176
- **URL**: <https://www.wiraa.com/job-description/usAD0C4BA9641769424C946CED3DC727D9?source=Linkedin>
### Description
SailPlan is a cutting-edge technology company that is dedicated to transforming the future of maritime transportation. SailPlan offers a range of innovative solutions and services that enable its clients to optimize their operations and reduce their environmental impact. SailPlan works with some of the most important names in the shipping industry to deliver a cleaner future for the world. SailPlans team comprises of experts with a diverse range of skills and experience, including naval architects, data scientists, and software engineers. The companys collaborative and dynamic work culture fosters innovation and creativity, allowing the team to develop cutting-edge solutions that drive the industry forward. By combining state-of-the-art technology and a commitment to sustainability, SailPlan is leading the way towards a greener and more efficient maritime industry. At SailPlan, you will be part of a fast-growing team, will wear many hats and have ownership over building key aspects of our platform. You will work within a collaborative environment to build the next generation of technology for the maritime industry. If you think you have the right stuff, we are looking for YOU.
### Requirements
We are seeking an experienced Data Engineer to focus on productionalizing machine learning models within Google Cloud Platform (GCP) while collaborating on ETL design and planning. This role will be responsible for architecting, implementing, and maintaining the infrastructure needed for model training, retraining, and deployment, ensuring high-quality ML performance monitoring and external output serving. The ideal candidate has expertise in ML model operationalization, cloud architecture, and data pipeline orchestration, working closely with data scientists, cloud engineers, and analysts to bridge the gap between data engineering and MLOps, while also ensuring that model outputs are seamlessly integrated into analytics and decision-making systems.
### Qualifications
Core Requirements and Qualifications
Select and implement the appropriate GCP services for scalable ML workflows, including model training, retraining, and deployment
Develop automated monitoring and performance tracking for deployed models, surfacing quality metrics internally and ensuring external services receive high-quality outputs
Optimize model deployment pipelines to ensure efficient versioning, retraining triggers, and drift detection
Collaboration on ETL Development: Work alongside data engineers to design and optimize data pipelines that support machine learning models
Ensure seamless integration between data ingestion, transformations, and ML pipelines, leveraging BigQuery and DBT
Coordinate with sensor and instrumentation engineers to facilitate the ingestion of real-time sensor data for predictive modeling
Architect and implement CI/CD pipelines for ML models, enabling automated deployment, testing, and rollback strategies
Design cloud infrastructure that supports scalable and cost-efficient ML model training in production environments
Implement logging, alerting, and monitoring to proactively identify issues with models and data pipelines
Ensure ML model outputs are easily accessible and consumable by analytics, dashboards, and external services
Work closely with data analysts and cloud engineers to optimize Looker integrations and visualization pipelines for ML-driven insights
Maintain and document model lifecycle processes, ensuring clarity and reproducibility across the team
Required Skills
Bachelors or Masters degree in Computer Science, Software Engineering, or a related field
Strong experience in MLOps and machine learning model operationalization, particularly within GCP
Proficiency in SQL and Python, with experience in data manipulation, feature engineering, and ML model deployment
Hands-on experience with CI/CD pipelines, version control (Git), and infrastructure-as-code (Terraform, Cloud Build)
Experience working with data pipelines (specifically time-series data) and collaborating with data engineers to support ML workflows
Excellent problem-solving skills and a proactive, collaborative mindset
Preferred Qualifications
Familiarity with the maritime/shipping domain, including knowledge of sensor data and operational challenges
Experience with dbt and BigQuery for efficient data transformation
Knowledge of LookML and Looker dashboards, especially for surfacing ML insights
Experience working with real-time streaming and high-fidelity time series data
Understanding of data governance, security, and compliance best practices in cloud-based environments
---
## Job: AI Engineer (m/f/d)
- **Keyword**: Machine Learning Engineer location:New York
- **Company**: myneva
- **Location**: Portugal, remote
- **Nature of Work**: Permanent employee, Full-time · Portugal, remote
- **Salary Range**: N/A
- **Job ID**: unknown
- **Category**: Machine Learning Engineer
- **Scraped At**: 2025-12-05T14:42:45.137786
- **URL**: <https://myneva-group.jobs.personio.de/job/2443218?language=en&src=752617&_pc=752617#apply>
### Description
Your mission We are looking for a polyglot AI Engineer, located in the mainland Portugal, who is as comfortable optimizing backend concurrency in Go as they are building RAG pipelines in Python and designing interfaces in TypeScript. In this role, you will wear two hats. First, you will build the core infrastructure of our AI Platform, ensuring high availability and low latency for model inference. Second, you will design and deploy autonomous AI Agents, for our product landscape and services. If you are a builder who believes that AI is more than just prompts, its about systems, integration, and architecture then we want to hear from you. 1. AI Platform Development (Go & Python): Architect and build scalable microservices in Go (Golang) to handle high-throughput requests for our AI services. Design efficient APIs (gRPC/REST) that serve as the bridge between our core application and AI models. Optimize inference latency and manage model serving infrastructure. 2. AI Agent Engineering (Python & TypeScript): Develop autonomous agents using Python frameworks (e.g., LangChain, LlamaIndex, or custom solutions) to automate internal business processes (e.g., data entry, customer support triage, financial reporting). Implement "Tool Use" and "Function Calling" to allow LLMs to interact with third-party APIs and our internal databases. Build the integration layer and user interfaces in TypeScript (Node.js/Next.js) to allow non-technical staff to interact with these agents.
### Requirements
Your profile Polyglot Proficiency: You have production-level experience in Go, Python, and TypeScript (at least 2 of the 3 languages). You know which tool to use for which job. AI/LLM Experience: You have built applications utilizing OpenAI API, Anthropic, or open-source models (Llama 3, Mistral). You understand context windows, token limits, and prompt engineering. Systems Thinking: You understand distributed systems, concurrency, and how to deploy AI in a way that doesn't break production. Agentic Workflows: Experience building multi-step reasoning agents (e.g., "Plan-and-Execute" patterns). Database Skills: Proficiency with SQL (Postgres) and Vector Stores.
### Qualifications
Nice to Haves (Bonus Points) Experience fine-tuning open-source models on custom datasets. Knowledge of temporal/orchestration frameworks for managing long-running agent workflows. Experience with container orchestration (Kubernetes/Docker). Background in DevOps or MLOps (MLflow, weights & biases).
---
## Job: ML/AI Engineer
- **Keyword**: Machine Learning Engineer location:New York
- **Company**: Deltek, Inc
- **Location**: US Remote
- **Nature of Work**: Remote
- **Salary Range**: The U.S. salary range for this position is $57000.00 - $99750.00.
- **Job ID**: unknown
- **Category**: Machine Learning Engineer
- **Scraped At**: 2025-12-05T14:46:54.116677
- **URL**: <https://sjobs.brassring.com/TGnewUI/Search/home/HomeWithPreLoad?PageType=JobDetails&partnerid=25397&siteid=5259&jobId=621951#jobDetails=621951_5259>
### Description
Position Responsibilities Develop and deploy machine learning models for classification, regression, forecasting, and NLP tasks using production-grade code and best practices Build data pipelines for ML model training and inference; work with structured and unstructured data from multiple enterprise systems Implement model training workflows including data preprocessing, feature engineering, hyperparameter tuning, and model evaluation Create production-ready ML services with RESTful APIs that can be consumed by web and mobile applications; ensure proper error handling, logging, and monitoring Work with large-scale datasets from enterprise ERP systems; process time-series data, transactional data, and unstructured documents Collaborate with data scientists to productionize research models; optimize models for latency, throughput, and cost Participate in code reviews and contribute to team's ML engineering practices; document solutions and share knowledge with team members Support deployed models including troubleshooting, performance optimization, and implementing improvements based on production metrics
### Requirements
Qualifications 2-4 years of ML engineering experience with hands-on model development and production deployment Strong Python programming : Experience with scikit-learn, pandas, numpy; familiarity with PyTorch or TensorFlow ML fundamentals : Solid understanding of supervised/unsupervised learning, model evaluation, cross-validation, and feature engineering API development : Experience building RESTful APIs (Flask, FastAPI, or similar); understanding of microservices architecture Data processing : SQL proficiency; experience with data pipelines, ETL processes, and working with databases (PostgreSQL, MySQL, or similar) Cloud platforms : Working knowledge of AWS, Azure, or GCP; experience with cloud storage, compute, and managed ML services Version control and collaboration : Git workflows, agile methodologies, working in cross-functional teams Bonus : Exposure to NLP techniques, LLMs, embedding models, or vector databases; experience in B2B SaaS environments Education : BS in Computer Science, Data Science, Mathematics, or related technical field
### Qualifications
N/A
---
## Job: Machine Learning Engineer
- **Keyword**: Machine Learning Engineer location:New York
- **Company**: PhysicsX
- **Location**: New York, United States
- **Nature of Work**: Hybrid setup enjoy our Manhattan office while keeping remote flexibility.
- **Salary Range**: $120,000 - 240,000 depending on experience
- **Job ID**: unknown
- **Category**: Machine Learning Engineer
- **Scraped At**: 2025-12-05T14:49:29.919057
- **URL**: <https://job-boards.eu.greenhouse.io/physicsx/jobs/4644841101?gh_src=6d71ons2teu>
### Description
What you will do
Work closely with our simulation engineers, data scientists and customers to develop an understanding of the physics and engineering challenges we are solving
Design, build and test data pipelines for machine learning that are reliable, scalable and easily deployable
Explore and manipulate 3D point cloud & mesh data
Own the delivery of technical workstreams
Create analytics environments and resources in the cloud or on premise, spanning data engineering and science
Identify the best libraries, frameworks and tools for a given task, make product design decisions to set us up for success
Work at the intersection of data science and software engineering to translate the results of our R&D and projects into re-usable libraries, tooling and products
Continuously apply and improve engineering best practices and standards and coach your colleagues in their adoption
### Requirements
What you bring to the table
Experience applying Machine learning methods (including 3D graph/point cloud deep learning methods) to real-world engineering applications, with a focus on driving measurable impact in industry settings.
Experience in ML/Computational Statistics/Modelling use-cases in industrial settings (for example supply chain optimisation or manufacturing processes) is encouraged.
A track record of scoping and delivering projects in a customer facing role
2+ years experience in a data-driven role, with exposure to software engineering concepts and best practices (e.g., versioning, testing, CI/CD, API design, MLOps)
Building machine learning models and pipelines in Python, using common libraries and frameworks (e.g., TensorFlow, MLFlow)
Distributed computing frameworks (e.g., Spark, Dask)
Cloud platforms (e.g., AWS, Azure, GCP) and HP computing
Containerization and orchestration (Docker, Kubernetes)
Strong problem-solving skills and the ability to analyse issues, identify causes, and recommend solutions quickly
Excellent collaboration and communication skills - with teams and customers alike
A background in Physics, Engineering, or equivalent
### Qualifications
N/A
---
## Job: DevOps Engineer
- **Keyword**: DevOps Engineer location:New York
- **Company**: micro1
- **Location**: N/A
- **Nature of Work**: N/A
- **Salary Range**: N/A
- **Job ID**: 4342293908
- **Category**: DevOps Engineer
- **Scraped At**: 2025-12-05T15:46:55.076540
- **URL**: <https://www.linkedin.com/jobs/view/4342293908/?eBP=NOT_ELIGIBLE_FOR_CHARGING&refId=8EZEulwt3rTN7TqojhbDIQ%3D%3D&trackingId=8OTJNgCFiTsOg9ssPeenJQ%3D%3D&trk=flagship3_search_srp_jobs>
### Description
N/A
### Requirements
N/A
### Qualifications
N/A
---
## Job: DevOps Engineer
- **Keyword**: DevOps Engineer location:New York
- **Company**: Utah Tech Labs
- **Location**: North America + 1 more
- **Nature of Work**: Freelance
- **Salary Range**: $16 $20/hr
- **Job ID**: 4325775293
- **Category**: DevOps Engineer
- **Scraped At**: 2025-12-05T15:51:10.659992
- **URL**: <https://app.usebraintrust.com/jobs/16561/?gh_src=06bf3def4us&utm_channel=jobboard&utm_source=linkedin>
### Description
We are looking for a skilled DevOps Engineer to help build the foundational infrastructure, CI/CD pipelines, and operational standards for our engineering organization. This role is hands-on and ideal for someone experienced in supporting large-scale applications across modern frontend, backend, and cloud technologies.
Responsibilities
Build and maintain AWS infrastructure using Terraform (IaC).
Set up CI/CD pipelines for Angular, React, Flutter, Python (FastAPI/Django), and Node.js (Express/TypeScript) .
Implement automated unit, integration, and regression testing pipelines.
Establish logging, monitoring, and alerting (CloudWatch, ELK/OpenSearch, Datadog, etc.).
Define and enforce data security , IAM, secrets management, and encryption best practices.
Optimize SQL/NoSQL databases, performance tuning, backups, and restore workflows.
Create SOPs, reusable infrastructure templates, and DevOps best-practice documentation.
### Requirements
23+ years DevOps experience , including work on larger, production-grade projects.
Strong experience with AWS (EC2, ECS/EKS, RDS, S3, CloudFront, IAM, VPC).
Strong proficiency with Terraform and infrastructure-as-code workflows.
Hands-on experience building CI/CD pipelines for modern frontend and backend frameworks.
Strong understanding of data security , network configuration, and secure deployment.
Familiarity with application logging, monitoring, and distributed tracing.
Experience optimizing relational and non-relational databases.
Deliverables include SOPs, infrastructure templates, CI/CD pipelines, and automated testing frameworks.
### Qualifications
N/A
---
## Job: DevOps Engineer
- **Keyword**: DevOps Engineer location:New York
- **Company**: Core4ce
- **Location**: Remote (Worldwide)
- **Nature of Work**: Remote
- **Salary Range**: N/A
- **Job ID**: 4342211043
- **Category**: DevOps Engineer
- **Scraped At**: 2025-12-05T15:52:59.449387
- **URL**: <https://jobs.silkroad.com/Core4ce/Careers/jobs/1026?src=LinkedIn>
### Description
As a DevOps Engineer, you will play a pivotal role in designing, implementing, and maintaining the infrastructure and tools necessary to support continuous integration, continuous deployment, and automated operations. You will collaborate with cross-functional teams to streamline development processes, improve system reliability, and enhance overall productivity.
### Requirements
Responsibilities: Design and implement scalable, reliable, and secure DevOps solutions that meet the needs of the organization. Lead the development and implementation of CI/CD pipelines to automate software delivery processes. Architect and manage cloud infrastructure, ensuring optimal performance, cost efficiency, and scalability. Collaborate with development, operations, and quality assurance teams to integrate automated testing and monitoring into the CI/CD pipeline. Establish and enforce DevOps best practices, standards, and guidelines across the organization. Identify opportunities for process improvement and efficiency gains within the software development lifecycle. Provide technical guidance and mentorship to junior team members on DevOps tools, practices, and methodologies. Conduct research and evaluation of new tools, technologies, and methodologies to improve DevOps processes. Troubleshoot and resolve infrastructure and deployment issues in production and non-production environments. Ensure compliance with security, privacy, and regulatory requirements in all DevOps activities.
### Qualifications
Qualifications: 5+ years of experience in DevSecOps, specifically within DoD environments. Ability to obtain and maintain a Secret clearance. Ability to obtain and maintain a DoD 8570 IAT Level II certification. Proficient in Kubernetes, data pipeline management, and containerization technologies. Knowledge of continuous integration and continuous deployment (CI/CD) tools like Jenkins, GitLab CI, JIRA, or CircleCI. Familiarity with security scanning tools and vulnerability assessment tools (e.g., SonarQube, Fortify, Nessus). Experience with AWS cloud platforms and their native DevSecOps tools. Ability to analyze and improve existing DevSecOps processes for efficiency and security. Familiarity with regulatory compliance standards relevant to the DoD (e.g., NIST, FedRAMP). Proactive approach to identifying and mitigating security risks in the software development lifecycle. Certifications relevant to DevSecOps such as Certified Kubernetes Administrator (CKA), AWS Certified DevOps Engineer, or similar credentials Preferred Qualifications: Bachelor's degree in computer science, Cybersecurity, Information Technology, or related field, with an emphasis on security. Experience with Query Development and Optimization Perform defect fixes and minor feature extensions (adding new data elements to reports, adding or modifying current reporting filters, moving, adding or changing how data is displayed in a profile page for an individual service member, integrating new data sources into the database) Experience with Query Development and Optimization Code and update SSIS packages to support data extraction On-going enhancements to support policy changes such as updates to existing business logic that computes Platform, Administrative,
---
## Job: DevOps Engineer
- **Keyword**: DevOps Engineer location:New York
- **Company**: Verana Health
- **Location**: N/A
- **Nature of Work**: Remote
- **Salary Range**: National Pay Range $148,000 - $175,000 USD
- **Job ID**: 4256494004
- **Category**: DevOps Engineer
- **Scraped At**: 2025-12-05T15:55:17.680202
- **URL**: <https://job-boards.greenhouse.io/veranahealth/jobs/8054444002>
### Description
Accelerate Real-World Insights Through Cloud Innovation Verana Health is a digital health company that harnesses exclusive real-world data to deliver quality insights that accelerate drug development and improve patient care. Backed by leading investors such as Johnson & Johnson Innovation JJDC, Inc., Novo Growth, GV (formerly Google Ventures), and more, we are transforming how medical research is conducted. Our mission-driven team is committed to making a tangible difference in patients' lives through technology and data science. You will report to the Director of Engineering and work with engineering, data science, and IT teams. Your contributions will ensure the seamless operation of our cloud infrastructure, enabling faster, safer, and more reliable delivery of our data-driven solutions. This role is critical to Verana Health's ability to innovate and scale its impact on patient care. This is a remote position. Why This Role Matters As a DevOps Engineer at Verana Health, you will help ensure the reliability, security, and scalability of our cloud infrastructure. Your work will directly help deliver critical data analytics and research tools used by healthcare professionals and researchers worldwide. What You Get to Do Architect, deploy, and maintain cloud-based infrastructure using AWS, with a focus on automation, security, and scalability. Develop and optimize CI/CD pipelines to accelerate software delivery and improve operational efficiency. Collaborate with cross-functional teams (engineering, data science, QA) to support their DevOps needs and drive continuous improvement. Implement and enforce best practices for authorization, authentication, and compliance across AWS services. Monitor system performance, troubleshoot issues, and ensure high availability of critical applications and databases. Document and refine DevOps processes to foster knowledge sharing and operational excellence. Support database management, server administration (Linux/Windows), and infrastructure orchestration using tools like Docker, Kubernetes, and Terraform. Contribute to a culture of innovation, learning, and growth within the technology team.
### Requirements
Skills and Experience that Will Help You Succeed Essential Requirements: Bachelor's degree in computer science, software engineering, or a related scientific discipline. 5+ years of professional experience in DevOps, cloud engineering, or software development. Expertise in AWS services, including IAM, VPC, EC2, S3, and cloud security best practices. Hands-on experience with CI/CD tools, containerization (Docker, Kubernetes), and infrastructure as code (Terraform, CloudFormation). Proficiency in scripting (Bash, Python) and version control (GitLab, GitHub). Experience with Linux and Windows server administration, database management, and Databricks. Desirable Skills: Exposure to healthcare or clinical research environments. Experience mentoring or guiding junior team members. Continuous learning and process improvement. Must-Haves for the Role Expertise in AWS cloud infrastructure and security. Experience with CI/CD, containerization, and infrastructure as code. Strong scripting and automation skills.
### Qualifications
N/A
---
## Job: DevOps Engineer
- **Keyword**: DevOps Engineer location:New York
- **Company**: Machinify
- **Location**: California Office - Roseville, CA
- **Nature of Work**: Remote - Remote
- **Salary Range**: $150,000 - $180,000 USD
- **Job ID**: 4325826034
- **Category**: DevOps Engineer
- **Scraped At**: 2025-12-05T16:01:49.663716
- **URL**: <https://www.remotehunter.com/apply-with-ai/47671ed0-5c8f-4f9e-9958-ceef96a2bb13?utm_medium=job_posting&utm_source=linkedin&utm_campaign=devops_engineer_remote&utm_category=devops_engineer&utm_term=dev_ops_engineer>
### Description
Who We Are Machinify is a leading healthcare intelligence company with expertise across the payment continuum, delivering unmatched value, transparency, and efficiency to health plan clients across the country. Deployed by over 60 health plans, including many of the top 20, and representing more than 160 million lives, Machinify brings together a fully configurable and content-rich, AI-powered platform along with best-in-class expertise. Were constantly reimagining whats possible in our industry, creating disruptively simple, powerfully clear ways to maximize financial outcomes and drive down healthcare costs. Location: This role is fully remote About the Opportunity Machinify is seeking a DevOps Engineer to create, maintain and automate AI/ML cloud technologies with a focus on the continued migration from VMs into Kubernetes for all applicable technology solutions supporting the Machinify Cloud and to do so with an eye to the best future implementation while solving the problems of today. We do everything at big data scale, high uptime and with an eye to incredible customer experience. Machinifys healthcare customers are at the center of everything we do, so we employ innovative thinkers to solve issues our customers don't even have yet and do so with operational excellence. Those innovative thinkers, our people, are the core of what makes Machinify differentiated in Healthcare. Everyone has a voice. Our teams are diverse and thrive on trust. We are humans who understand our customer and work collaboratively to deliver value and make a difference. The DevOps Engineer will provide technical leadership in significant technical, automation, programming, system administration, operational, and software configuration management through partnering with DevOps and Engineering as a whole. This vital team member will have responsibility for design, engineering, development and integration within production and pre-production environments, and will also be responsible for the programs configuration management, including the planning, design, engineering, implementation and execution of successful build and deployment of code updates to each upstream environment along with planning and implementing the configuration management of all underlying technologies. What youll do: Facilitate the movement of VM technologies into Kubernetes through migration or replacement. Automate everything. Nothing should require manual intervention. We routinely redeploy from the ground up to ensure automation is up to date Architect solutions to achieve a high level of performance, reliability, scalability, and security Create, maintain and troubleshoot distributed compute AI/ML technologies running in the Cloud Collaborate with a great team and learn from each other Change the way healthcare companies manage their business Be challenged when faced with solving complex problems Bring a passion for improving the lives of others by making their jobs easier and more productive Be responsible and accountable for everything you build and support Communicate effectively with other engineers in the same team, with other teams and with various other stakeholders such as product managers Operate in an Agile development environment
### Requirements
N/A
### Qualifications
Experience in the migration of VM Technologies into a Kubernetes Environment 5+ years of production support preferably in a Cloud Environment (AWS, Azure or GCP) Degree in Computer Science or equivalent work experience Extremely logical with the ability to solve problems in creative and efficient ways Knowledge / Experience in the following areas Containerization with Kubernetes Scripting (python, shell etc) Crossplane / Terraform (Infrastructure as code) Linux (CentOS/RHEL) Spark / Machine Learning running in the Cloud Frameworks for distributed machine-learning / AI, such as Azure OpenAI, AWS Bedrock or things like Tensorflow and MxNet. Good understanding of Operations security practices Working in / creating compliant environments such as Hi-Trust / SOC2 etc Continuous Integration/Continuous Deployment frameworks. Citus (Distributed Postgres a plus) You are scrappy, fast, adaptable and ambitious Critical thinking and problem solving skills
---
## Job: DevOps Engineer
- **Keyword**: DevOps Engineer location:New York
- **Company**: NV5
- **Location**: N/A
- **Nature of Work**: N/A
- **Salary Range**: N/A
- **Job ID**: 4325301423
- **Category**: DevOps Engineer
- **Scraped At**: 2025-12-05T16:03:56.440663
- **URL**: <https://careers-nv5.icims.com/jobs/11535/devops-engineer/login?mobile=false&width=1369&height=500&bga=true&needsRedirect=false&jan1offset=60&jun1offset=60>
### Description
N/A
### Requirements
N/A
### Qualifications
N/A
---
## Job: Data Scientist (Remote - US)
- **Keyword**: Data Scientist location:New York
- **Company**: Jobgether
- **Location**: US
- **Nature of Work**: Remote
- **Salary Range**: N/A
- **Job ID**: 4342169293
- **Category**: Data Scientist
- **Scraped At**: 2025-12-05T16:48:07.597147
- **URL**: <https://jobs.lever.co/jobgether/1e254d1b-5d4f-4060-8eb7-705a9ae77646/apply?source=LinkedIn>
### Description
N/A
### Requirements
N/A
### Qualifications
N/A
---
## Job: Data Scientist (Kaggle-Grandmaster)
- **Keyword**: Data Scientist location:New York
- **Company**: Mercor
- **Location**: N/A
- **Nature of Work**: Remote
- **Salary Range**: $56-$77 / hr
- **Job ID**: 4342199585
- **Category**: Data Scientist
- **Scraped At**: 2025-12-05T16:50:50.900310
- **URL**: <https://work.mercor.com/jobs/list_AAABmuPnQVAFcCPPhAJMHJKY/data-scientist-kaggle-grandmaster?referralCode=d12bb6d7-56b2-4c5d-b2aa-751065941704&utm_source=referral&utm_medium=share&utm_campaign=job_referral>
### Description
Role Description Mercor is hiring on behalf of a leading AI research lab to bring on a highly skilled Data Scientist with a Kaggle Grandmaster profile. In this role, you will transform complex datasets into actionable insights, high-performing models, and scalable analytical workflows. You will work closely with researchers and engineers to design rigorous experiments, build advanced statistical and ML models, and develop data-driven frameworks to support product and research decisions. What Youll Do Analyze large, complex datasets to uncover patterns, develop insights, and inform modeling direction Build predictive models, statistical analyses, and machine learning pipelines across tabular, time-series, NLP, or multimodal data Design and implement robust validation strategies, experiment frameworks, and analytical methodologies Develop automated data workflows, feature pipelines, and reproducible research environments Conduct exploratory data analysis (EDA), hypothesis testing, and model-driven investigations to support research and product teams Translate modeling outcomes into clear recommendations for engineering, product, and leadership teams Collaborate with ML engineers to productionize models and ensure data workflows operate reliably at scale Present findings through well-structured dashboards, reports, and documentation
### Requirements
N/A
### Qualifications
Qualifications Kaggle Competitions Grandmaster or comparable achievement: top-tier rankings, multiple medals, or exceptional competition performance 35+ years of experience in data science or applied analytics Strong proficiency in Python and data tools (Pandas, NumPy, Polars, scikit-learn, etc.) Experience building ML models end-to-end: feature engineering, training, evaluation, and deployment Solid understanding of statistical methods, experiment design, and causal or quasi-experimental analysis Familiarity with modern data stacks: SQL, distributed datasets, dashboards, and experiment tracking tools Excellent communication skills with the ability to clearly present analytical insights Nice to Have Strong contributions across multiple Kaggle tracks (Notebooks, Datasets, Discussions, Code) Experience in an AI lab, fintech, product analytics, or ML-focused organization Knowledge of LLMs, embeddings, and modern ML techniques for text, images, and multimodal data Experience working with big data ecosystems (Spark, Ray, Snowflake, BigQuery, etc.) Familiarity with statistical modeling frameworks such as Bayesian methods or probabilistic programming
---
## Job: Data Engineer
- **Keyword**: Data Scientist location:New York
- **Company**: Tithe.ly
- **Location**: Lagos, Lagos State, Nigeria
- **Nature of Work**: N/A
- **Salary Range**: N/A
- **Job ID**: 4339262912
- **Category**: Data Scientist
- **Scraped At**: 2025-12-05T16:53:19.169174
- **URL**: <https://www.linkedin.com/jobs/view/4339262912/?eBP=NOT_ELIGIBLE_FOR_CHARGING&refId=90LYJbgaP%2FlZrP8CD4Vdzg%3D%3D&trackingId=rDpj810DVM4VmjwzqTCxcQ%3D%3D&trk=flagship3_search_srp_jobs>
### Description
N/A
### Requirements
N/A
### Qualifications
N/A
---
## Job: Data Engineering Scientist
- **Keyword**: Data Scientist location:New York
- **Company**: Birdy Grey
- **Location**: United States
- **Nature of Work**: Remote
- **Salary Range**: N/A
- **Job ID**: 4322321659
- **Category**: Data Scientist
- **Scraped At**: 2025-12-05T16:57:17.957749
- **URL**: <https://job-boards.greenhouse.io/birdygrey/jobs/4970284007?gh_src=daef57357us>
### Description
THE COMPANY: BIRDY GREY Birdy Grey is a direct-to-consumer brand whose mission is to celebrate friendships during one of the most important milestones in a persons life: their wedding. Founded in 2017 by best friends Grace Lee (Founder & Chief Creative Officer) and Monica Ashauer (Co-Founder & Chief Strategy Officer), Birdy Grey offers affordable bridesmaid dresses starting at just $89, groomsmen suits starting at $199, plus fun gifts and accessories for everyone in the wedding party. Since day one, we've dressed over 2 million bridesmaids and we're proud to be a trusted resource for brides and grooms on their most cherished day. POSITION: Data Engineering Scientist REPORTS TO: Director, Data & Analytics LOCATION: Remote Headquartered in Los Angeles, CA with an office in New York, NY, Birdy Grey supports remote work for eligible roles. We ask that all employees travel to either office once a quarter. This role is not eligible for visa sponsorship. #LI-Remote We're looking for a Data Engineering Scientist who thrives on variety. This isn't a role where you'll specialize in one narrow area, you'll be building pipelines, analyzing data, creating dashboards, and developing models. If you are energized by wearing multiple hats, statistically rigorous, hungry to learn new things, and eager to observe the direct impact of your work, this role is for you. SCOPE OF RESPONSIBILITIES Data Science (30-40%) Lead the application of statistical and machine learning methodologies (using Python and relevant frameworks) to solve core business problems, focusing on predictive and prescriptive outcomes Co-design rigorous A/B and multivariate experiments, ensuring statistical validity to accurately measure the impact of product and business changes Identify opportunities where machine learning or advanced analytics can add value Prototype data-driven solutions to business problems Data Engineering (20-30%) Architect and Deploy robust data pipelines to collect, transform, and load data from various sources Design and optimize data storage solutions and database schemas Ensure data quality, reliability, and accessibility across the organization Automate repetitive data processes and workflows Establish and enforce data governance and security standards within our Cloud environment Data Analytics (30-40%) Serve as the embedded strategic data partner for key business teams, translating complex challenges into measurable analytical projects Design and manage high-impact, self-service business intelligence assets (primarily in Looker) that accelerate organizational decision velocity Conduct ad-hoc analyses to uncover insights and opportunities Translate complex findings into clear, actionable recommendations
### Requirements
THE RIGHT CANDIDATE: QUALIFICATIONS & PERSONAL ATTRIBUTES EDUCATION: Bachelors Degree Required EXPERIENCE / REQUIREMENTS: 5+ years of hands-on experience in a data science, data engineering, data analyst, analytics engineering, or ML role Expert SQL skills. Must be adept at designing, optimizing, and tuning complex queries, stored procedures, and scripts, specifically utilizing analytic window functions, CTEs, and advanced join techniques Expertise with Python (preferably) or R for data analysis and automation Expertise with at least one data visualization tool, preferably Looker Experience with Cloud data platforms, DevOps, MLOps (AWS, GCP, Azure) Knowledge of data orchestration tools (Airflow, Prefect, dbt, Dagster, etc.) Experience with ML frameworks (scikit-learn, TensorFlow, PyTorch) Experience with version control (Git) and software engineering best practices Familiarity with marketing analytics, retail, and econometric principles Understanding of basic statistics and when to apply different analytical approaches Ability to communicate technical concepts to non-technical stakeholders Comfortable with ambiguity and figuring things out independently NICE TO HAVES: Start-up experience Interest in bridal and fashion Experience with ticketing systems and change management processes Interest in increasing productivity via Automation & AI Start-up or D2C/e-commerce experience
### Qualifications
N/A
---

View File

@ -1,28 +0,0 @@
from scraping_engine import FingerprintScrapingEngine
from job_scraper import LinkedInJobScraper
import os
import asyncio
async def main():
engine = FingerprintScrapingEngine(
seed="job_scraping_engine",
target_os="windows",
db_path="job_listings.db",
markdown_path="job_listings.md",
search_keywords="Data Anaylst"
)
scraper = LinkedInJobScraper(engine, human_speed=1.6)
await scraper.scrape_jobs(
search_keywords="Data Anaylst", # ← Your search terms
max_pages=3,
credentials={
"email": os.getenv("SCRAPING_USERNAME"),
"password": os.getenv("SCRAPING_PASSWORD")
}
)
if __name__ == "__main__":
asyncio.run(main())

310
llm_agent.py Normal file
View File

@ -0,0 +1,310 @@
from openai import OpenAI
from typing import Dict, Any
import asyncio
import psycopg2
import os
from datetime import datetime
import json
import re
from bs4 import BeautifulSoup
from dotenv import load_dotenv
# Load environment variables from .env
load_dotenv()
class LLMJobRefiner:
def __init__(self):
deepseek_api_key = os.getenv("DEEPSEEK_API_KEY")
if not deepseek_api_key:
raise ValueError("DEEPSEEK_API_KEY not found in .env file.")
# Database credentials from .env
self.db_url = os.getenv("DB_URL")
self.db_username = os.getenv("DB_USERNAME")
self.db_password = os.getenv("DB_PASSWORD")
self.db_host = os.getenv("DB_HOST")
self.db_port = os.getenv("DB_PORT")
if not self.db_url or not self.db_username or not self.db_password:
raise ValueError("Database credentials not found in .env file.")
# DeepSeek uses OpenAI-compatible API
self.client = OpenAI(
api_key=deepseek_api_key,
base_url="https://api.deepseek.com/v1"
)
self.model = "deepseek-chat"
self._init_db()
def _init_db(self):
"""Initialize PostgreSQL database connection and create table"""
try:
self.db_url = os.getenv("DB_URL")
if self.db_url and "supabase.com" in self.db_url:
conn = psycopg2.connect(
host=self.db_host,
port=self.db_port,
database="postgres",
user=self.db_username,
password=self.db_password
)
else:
conn = psycopg2.connect(
host=self.db_host,
port=self.db_port,
database="postgres",
user=self.db_username,
password=self.db_password
)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS jobs (
id SERIAL PRIMARY KEY,
title TEXT,
company_name TEXT,
location TEXT,
description TEXT,
requirements TEXT,
qualifications TEXT,
salary_range TEXT,
nature_of_work TEXT,
job_id TEXT UNIQUE,
url TEXT,
category TEXT,
scraped_at TIMESTAMP,
posted_date TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Ensure the uniqueness constraint exists
cursor.execute('''
ALTER TABLE jobs DROP CONSTRAINT IF EXISTS jobs_job_id_key;
ALTER TABLE jobs ADD CONSTRAINT jobs_job_id_key UNIQUE (job_id);
''')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_job_id ON jobs(job_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON jobs(category)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_posted_date ON jobs(posted_date)')
conn.commit()
cursor.close()
conn.close()
print("✅ PostgreSQL database initialized successfully")
except Exception as e:
print(f"❌ Database initialization error: {e}")
raise
def _clean_html_for_llm(self, html_content: str) -> str:
"""Clean HTML to make it more readable for LLM while preserving structure"""
try:
soup = BeautifulSoup(html_content, 'html.parser')
# Remove script and style elements
for script in soup(["script", "style", "nav", "footer", "header"]):
script.decompose()
# Extract text but keep some structure
text = soup.get_text(separator=' ', strip=True)
# Clean up whitespace
text = re.sub(r'\s+', ' ', text)
# Limit length for LLM context
if len(text) > 10000:
text = text[:10000] + "..."
return text
except Exception as e:
print(f"HTML cleaning error: {e}")
# Fallback to raw content if cleaning fails
return html_content[:100000] if len(html_content) > 100000 else html_content
def _generate_content_sync(self, prompt: str) -> str:
"""Synchronous call to DeepSeek API"""
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
temperature=0.2,
max_tokens=2048,
stream=False
)
return response.choices[0].message.content or ""
except Exception as e:
print(f"DeepSeek API error: {e}")
return ""
async def refine_job_data(self, raw_data: Dict[str, Any], target_field: str) -> Dict[str, Any]:
page_content = raw_data.get('page_content', '')
cleaned_content = self._clean_html_for_llm(page_content)
job_id = raw_data.get('job_id', 'unknown')
url = raw_data.get('url', 'N/A')
posted_date = raw_data.get('posted_date', datetime.now().strftime("%m/%d/%y"))
prompt = f"""
You are an expert job posting parser. Extract information EXACTLY as it appears in the text. DO NOT summarize, paraphrase, or invent.
CRITICAL INSTRUCTIONS:
- The job is from AMAZON. Look for these exact section headings:
- "## Basic Qualifications" extract as "qualifications"
- "## Preferred Qualifications" include this in "qualifications" too
- "## Description" or "About the Role" or "Key job responsibilities" extract as "description"
- "You Will:" or "Job responsibilities" include in "description"
- Requirements are often embedded in qualifications or description
FIELD RULES:
- description: MUST include ALL role details, responsibilities, and overview. Never "Not provided" if any job description exists.
- qualifications: MUST include ALL content from "Basic Qualifications" and "Preferred Qualifications" sections. Combine them.
- requirements: If no separate "requirements" section, extract required skills/experience from qualifications/description.
- For Amazon jobs, company_name = "Amazon".
REQUIRED FIELDS (must have valid values, never "N/A"):
- title, company_name, job_id, url
OPTIONAL FIELDS (can be "Not provided"):
- location, salary_range, nature_of_work
Page Content:
{cleaned_content}
Response format (ONLY return this JSON):
{{
"title": "...",
"company_name": "...",
"location": "...",
"description": "...",
"qualifications": "...",
"salary_range": "...",
"nature_of_work": "...",
"job_id": "{job_id}",
"url": "{url}"
}}
"""
try:
response_text = await asyncio.get_event_loop().run_in_executor(
None,
lambda: self._generate_content_sync(prompt)
)
refined_data = self._parse_llm_response(response_text)
if not refined_data:
return None
# Validate required fields
required_fields = ['title', 'company_name', 'job_id', 'url']
for field in required_fields:
if not refined_data.get(field) or refined_data[field].strip() in ["N/A", "", "Unknown", "Company", "Job"]:
return None
# CRITICAL: Validate content fields - check if they SHOULD exist
content_fields = ['description', 'qualifications']
cleaned_original = cleaned_content.lower()
# Simple heuristic: if page contains job-related keywords, content fields should NOT be "Not provided"
job_indicators = ['responsibilit', 'duties', 'require', 'qualifi', 'skill', 'experienc', 'educat', 'degree', 'bachelor', 'master']
has_job_content = any(indicator in cleaned_original for indicator in job_indicators)
if has_job_content:
for field in content_fields:
value = refined_data.get(field, "").strip()
if value in ["Not provided", "N/A", ""]:
# LLM failed to extract existing content
print(f" ⚠ LLM returned '{value}' for {field} but job content appears present")
return None
# Add the posted_date to the refined data
refined_data['posted_date'] = posted_date
return refined_data
except Exception as e:
print(f"LLM refinement failed: {str(e)}")
return None
def _parse_llm_response(self, response_text: str) -> Dict[str, Any]:
json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', response_text, re.DOTALL)
if not json_match:
json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
if not json_match:
return None
try:
return json.loads(json_match.group(1) if '```' in response_text else json_match.group(0))
except json.JSONDecodeError:
return None
async def save_job_data(self, job_data: Dict[str, Any], keyword: str):
await self._save_to_db(job_data)
await self._save_to_markdown(job_data, keyword)
async def _save_to_db(self, job_data: Dict[str, Any]):
"""Save job data to PostgreSQL database with job_id uniqueness"""
try:
conn = psycopg2.connect(
host=self.db_host,
port=self.db_port,
database="postgres",
user=self.db_username,
password=self.db_password
)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO jobs
(title, company_name, location, description, requirements,
qualifications, salary_range, nature_of_work, job_id, url, category, scraped_at, posted_date)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (job_id) DO NOTHING
''', (
job_data.get("title", "N/A"),
job_data.get("company_name", "N/A"),
job_data.get("location", "N/A"),
job_data.get("description", "N/A"),
job_data.get("requirements", "N/A"),
job_data.get("qualifications", "N/A"),
job_data.get("salary_range", "N/A"),
job_data.get("nature_of_work", "N/A"),
job_data.get("job_id", "N/A"),
job_data.get("url", "N/A"),
job_data.get("category", "N/A"),
job_data.get("scraped_at"),
job_data.get("posted_date", "N/A")
))
conn.commit()
cursor.close()
conn.close()
print(f" 💾 Saved job to category '{job_data.get('category', 'N/A')}' with job_id: {job_data.get('job_id', 'N/A')}")
except Exception as e:
print(f"❌ Database save error: {e}")
async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str):
os.makedirs("linkedin_jobs", exist_ok=True)
filepath = os.path.join("linkedin_jobs", "linkedin_jobs_scraped.md")
write_header = not os.path.exists(filepath) or os.path.getsize(filepath) == 0
with open(filepath, "a", encoding="utf-8") as f:
if write_header:
f.write(f"# LinkedIn Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write(f"## Job: {job_data.get('title', 'N/A')}\n\n")
f.write(f"- *Keyword*: {keyword}\n")
f.write(f"- *Company*: {job_data.get('company_name', 'N/A')}\n")
f.write(f"- *Location*: {job_data.get('location', 'N/A')}\n")
f.write(f"- *Nature of Work*: {job_data.get('nature_of_work', 'N/A')}\n")
f.write(f"- *Salary Range*: {job_data.get('salary_range', 'N/A')}\n")
f.write(f"- *Job ID*: {job_data.get('job_id', 'N/A')}\n")
f.write(f"- *Posted Date*: {job_data.get('posted_date', 'N/A')}\n")
f.write(f"- *Category*: {job_data.get('category', 'N/A')}\n")
f.write(f"- *Scraped At*: {job_data.get('scraped_at', 'N/A')}\n")
f.write(f"- *URL*: <{job_data.get('url', 'N/A')}>\n\n")
f.write(f"### Description\n\n{job_data.get('description', 'N/A')}\n\n")
f.write(f"### Requirements\n\n{job_data.get('requirements', 'N/A')}\n\n")
f.write(f"### Qualifications\n\n{job_data.get('qualifications', 'N/A')}\n\n")
f.write("---\n\n")

View File

@ -6,10 +6,12 @@ import hashlib
import random
import os
import json
from typing import List, Optional, Dict
from playwright.async_api import Page
from typing import List, Optional, Dict, Any
from browserforge.fingerprints import FingerprintGenerator
from dotenv import load_dotenv
from config import load_spoof_config
import time
# Load environment variables
load_dotenv()
@ -24,8 +26,7 @@ class FingerprintScrapingEngine:
db_path: str = "jobs.db",
markdown_path: str = "scraped_jobs.md",
proxies: List[str] = None,
login_credentials: Optional[Dict[str, str]] = None,
search_keywords: Optional[str] = None
login_credentials: Optional[Dict[str, str]] = None
):
if target_os not in ['windows', 'macos']:
raise ValueError("operating_system must be 'windows' or 'macos'")
@ -42,29 +43,40 @@ class FingerprintScrapingEngine:
self.markdown_path = markdown_path
self.proxies = proxies or []
self.login_credentials = login_credentials
self.search_keywords = search_keywords
self.fingerprint_generator = FingerprintGenerator(
browser=('chrome',),
os=(self.os,)
)
self.num_variations = num_variations
# Load spoof config
spoof_config = load_spoof_config()
self.common_renderers = spoof_config["renderers"]
self.common_vendors = spoof_config["vendors"]
# Feedback system
self.feedback_file = f"feedback_{seed}.json"
# Feedback system
self.feedback = self._load_feedback()
# ← NEW: Session persistence paths
self.session_dir = "browser_sessions"
os.makedirs(self.session_dir, exist_ok=True)
self.session_path = os.path.join(self.session_dir, f"{seed}_session.json")
self.session_path = os.path.join(
self.session_dir, f"{seed}_session.json")
def _load_feedback(self):
self.optimization_params = {
"base_delay": 2.0,
"max_concurrent_requests": 4,
"request_timeout": 120000,
"retry_attempts": 3,
"captcha_handling_strategy": "avoid", # or "solve_fallback"
"cloudflare_wait_strategy": "smart_wait", # or "aggressive_reload"
}
self._update_params_from_feedback()
def _load_feedback(self) -> Dict[str, Any]:
if os.path.exists(self.feedback_file):
try:
with open(self.feedback_file, "r") as f:
@ -72,6 +84,8 @@ class FingerprintScrapingEngine:
data.setdefault("success_rate", 1.0)
data.setdefault("captcha_count", 0)
data.setdefault("cloudflare_count", 0)
data.setdefault("avg_response_time", 10.0) # New metric
data.setdefault("failed_domains", {}) # New metrice
return data
except:
pass
@ -81,16 +95,69 @@ class FingerprintScrapingEngine:
with open(self.feedback_file, "w") as f:
json.dump(self.feedback, f)
def report_outcome(self, outcome: str):
def report_outcome(self, outcome: str, url: Optional[str] = None, response_time: Optional[float] = None):
if outcome == "success":
self.feedback["success_rate"] = min(1.0, self.feedback["success_rate"] + 0.1)
self.feedback["success_rate"] = min(
1.0, self.feedback["success_rate"] + 0.05) # Smaller increment
else:
self.feedback["success_rate"] = max(0.1, self.feedback["success_rate"] - 0.2)
if outcome == "captcha":
self.feedback["captcha_count"] += 1
elif outcome == "cloudflare":
self.feedback["cloudflare_count"] += 1
self.feedback["success_rate"] = max(
0.05, self.feedback["success_rate"] - 0.1) # Smaller decrement
if outcome == "captcha":
self.feedback["captcha_count"] += 1
# Adapt strategy if many captchas
self.optimization_params["captcha_handling_strategy"] = "solve_fallback"
elif outcome == "cloudflare":
self.feedback["cloudflare_count"] += 1
# Adjust wait strategy based on frequency
if self.feedback["cloudflare_count"] > 5:
self.optimization_params["cloudflare_wait_strategy"] = "aggressive_reload"
# Track domain-specific failures
if url and outcome != "success":
domain = url.split("//")[1].split("/")[0]
if domain not in self.feedback["failed_domains"]:
self.feedback["failed_domains"][domain] = 0
self.feedback["failed_domains"][domain] += 1
# Update average response time
if response_time:
prev_avg = self.feedback.get("avg_response_time", 10.0)
# Simple moving average
self.feedback["avg_response_time"] = (
prev_avg * 0.9) + (response_time * 0.1)
self.save_feedback()
self._update_params_from_feedback() # Update params based on new feedback
def _update_params_from_feedback(self):
"""Adjust optimization parameters based on feedback."""
sr = self.feedback["success_rate"]
cc = self.feedback["captcha_count"]
cf = self.feedback["cloudflare_count"]
avg_rt = self.feedback.get("avg_response_time", 10.0)
# Adjust base delay based on success rate and avg response time
if sr < 0.6:
self.optimization_params["base_delay"] = max(
5.0, self.optimization_params["base_delay"] * 1.2)
elif sr > 0.8:
self.optimization_params["base_delay"] = min(
3.0, self.optimization_params["base_delay"] * 0.9)
# Reduce concurrency if many captchas/cloudflares
if cc > 3 or cf > 3:
self.optimization_params["max_concurrent_requests"] = max(
2, self.optimization_params["max_concurrent_requests"] - 2)
else:
# Reset to default
self.optimization_params["max_concurrent_requests"] = 4
# Increase timeout if avg response time is high
if avg_rt > 20:
self.optimization_params["request_timeout"] = 150000 # 90 seconds
print(f"Optimization Params Updated: {self.optimization_params}")
# ← NEW: Save browser context (cookies + localStorage)
async def save_session(self, context):
@ -131,7 +198,8 @@ class FingerprintScrapingEngine:
if self.feedback["success_rate"] < 0.5:
concurrency_options = [8, 4]
memory_options = [8]
profile.navigator.hardwareConcurrency = random.choice(concurrency_options)
profile.navigator.hardwareConcurrency = random.choice(
concurrency_options)
profile.navigator.deviceMemory = random.choice(memory_options)
return profile
@ -246,23 +314,6 @@ class FingerprintScrapingEngine:
await asyncio.sleep(random.uniform(0.2, 1.0))
except:
pass
async def _detect_cloudflare(self, page) -> bool:
content = await page.content()
return (
"#cf-chl" in content or
"checking your browser" in content.lower() or
"just a moment" in content.lower()
)
async def _handle_cloudflare(self, page, max_retries: int = 3):
for i in range(max_retries):
if not await self._detect_cloudflare(page):
return True
print(f"☁️ Cloudflare detected - waiting... (attempt {i+1})")
await asyncio.sleep(8 + random.uniform(2, 5))
await page.wait_for_load_state("load", timeout=60000)
return False
async def _avoid_captcha(self, page) -> bool:
await asyncio.sleep(2 + random.random() * 3)
@ -270,7 +321,7 @@ class FingerprintScrapingEngine:
await self._simulate_human_interaction(page)
await asyncio.sleep(3 + random.random() * 2)
return True
async def _solve_captcha_fallback(self, page) -> bool:
await asyncio.sleep(15 + random.random() * 10)
captcha_content = await page.content()
@ -285,3 +336,42 @@ class FingerprintScrapingEngine:
return True
return False
async def _detect_cloudflare(self, page: Page) -> bool:
"""Detect Cloudflare challenges."""
content = await page.content()
return (
"#cf-chl" in content
or "checking your browser" in content.lower()
or "just a moment" in content.lower()
or "turnstile" in content.lower() # Check for Cloudflare Turnstile
)
async def _handle_cloudflare(self, page: Page) -> bool:
"""
Handle Cloudflare challenges, including Turnstile if present.
This is a simplified approach; real-world handling might require more sophisticated logic or external solvers.
"""
max_wait_time = 60 # Total time to wait for Cloudflare to resolve
start_time = time.time()
while time.time() - start_time < max_wait_time:
if not await self._detect_cloudflare(page):
print("Cloudflare challenge resolved.")
return True
print("Cloudflare active, waiting...")
# Simulate more human-like behavior while waiting
await self._simulate_human_interaction(page)
# Wait for a random period, increasing slightly each time
wait_time = min(10, 2 + random.uniform(1, 3) +
(time.time() - start_time) * 0.1)
await asyncio.sleep(wait_time)
# Reload occasionally to trigger potential client-side checks
if (time.time() - start_time) > 15 and (time.time() - start_time) % 20 < 2:
print("Reloading page during Cloudflare wait...")
await page.reload(wait_until='load', timeout=80000)
print("Timeout waiting for Cloudflare resolution.")
return False

31
trim.py Normal file
View File

@ -0,0 +1,31 @@
# Keep cycling through all job titles
while True:
# Shuffle job titles to randomize order
random.shuffle(job_titles)
for job_title in job_titles:
search_keywords = f"{job_title} location:{fixed_location}"
print(f"\n{'='*60}")
print(f"Starting scrape for: {search_keywords}")
print(f"{'='*60}")
await scraper.scrape_jobs(
search_keywords=search_keywords,
credentials={
"email": os.getenv("SCRAPING_USERNAME"),
"password": os.getenv("SCRAPING_PASSWORD")
}
)
print(f"\n✅ Completed scraping for: {job_title}")
print(f"⏳ Waiting 2 minutes before next job title...")
# Wait 2 minutes before next job title
time.sleep(120)
print(f"\n✅ Completed full cycle of all job titles")
print(f"🔄 Starting new cycle...")
if _name_ == "_main_":
asyncio.run(main())