Compare commits

...

2 Commits

fd4e8c9c05 feat(scraper): add LLM-powered job data refinement and new scraping logic
- Implement LLMJobRefiner class for processing job data with Gemini API
- Add new job_scraper2.py with enhanced scraping capabilities
- Remove search_keywords parameter from scraping engine
- Add environment variable loading in config.py
- Update main script to use new scraper and target field
2025-11-24 12:25:50 +01:00
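
For orientation, a minimal sketch of the refinement flow this commit introduces, using the LLMJobRefiner interface shown in the llm_agent.py diff below; the page content and job URL here are placeholders, not scraped data:

import asyncio
from llm_agent import LLMJobRefiner  # new module added by this commit

async def demo():
    refiner = LLMJobRefiner()  # reads GEMINI_API_KEY via config.py
    # Shaped like the payload job_scraper2.py builds from a scraped job page.
    raw_data = {
        "page_content": "<html>placeholder job posting markup</html>",
        "url": "https://www.linkedin.com/jobs/view/1234567890/",
        "job_id": "1234567890",
    }
    refined = await refiner.refine_job_data(raw_data, target_field="Web designer")
    print(refined)  # dict of extracted fields, or None if the call or parsing failed

asyncio.run(demo())
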
7dca4c9159 refactor(job_scraper): improve page loading and typing in linkedin scraper
- Change page load strategy from 'load' to 'domcontentloaded' and 'networkidle' for better performance
- Make search_keywords parameter optional to handle empty searches
- Update type imports to include List for better type hints
- Set headless mode to true by default for production use
2025-11-23 09:27:05 +01:00
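
As a quick illustration of the load strategies named above (a sketch only; the hunks below show the exact lines that changed), Playwright's wait_until supports 'load', 'domcontentloaded', and 'networkidle', among others:

# Sketch of the three Playwright wait states referenced in this commit:
# 'domcontentloaded' resolves once the DOM is parsed (fastest),
# 'load' waits for the window load event,
# 'networkidle' waits until the network has been quiet for ~500 ms (slowest, most thorough).
import asyncio
from playwright.async_api import async_playwright

async def open_search(url: str) -> str:
    async with async_playwright() as pw:
        browser = await pw.chromium.launch(headless=True)
        page = await browser.new_page()
        await page.goto(url, wait_until="domcontentloaded", timeout=60000)
        # For result pages that keep loading via XHR, also wait for a quiet network:
        await page.wait_for_load_state("networkidle", timeout=60000)
        content = await page.content()
        await browser.close()
        return content

# asyncio.run(open_search("https://www.linkedin.com/jobs/search/?keywords=Web%20Designer"))
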
6 changed files with 701 additions and 14 deletions

config.py

@@ -2,6 +2,16 @@
import os
import json
from dotenv import load_dotenv
+# Load environment variables from .env file
+load_dotenv()
+# LLM Agent Configuration
+GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
+if not GEMINI_API_KEY:
+    raise ValueError("GEMINI_API_KEY environment variable not set in .env file")
def load_spoof_config():
"""Load spoof data from JSON config file. Falls back to defaults if missing."""

job_scraper.py

@@ -5,7 +5,7 @@ import random
import sqlite3
import os
from datetime import datetime
-from typing import Optional, Dict
+from typing import Optional, Dict, List
from playwright.async_api import async_playwright
from browserforge.injectors.playwright import AsyncNewContext
@@ -200,7 +200,7 @@ class LinkedInJobScraper:
async def scrape_jobs(
self,
-search_keywords: str,
+search_keywords: Optional[str],
max_pages: int = 1,
credentials: Optional[Dict] = None
):
@@ -214,7 +214,7 @@ class LinkedInJobScraper:
async with async_playwright() as pw:
browser = await pw.chromium.launch(
-headless=False,
+headless= False,
args=['--disable-blink-features=AutomationControlled']
)
context = await AsyncNewContext(browser, fingerprint=profile)
@@ -269,7 +269,7 @@ class LinkedInJobScraper:
return
print(f"🔍 Searching for: {search_keywords}")
-await page.goto(search_url, wait_until='load', timeout=60000)
+await page.goto(search_url, wait_until='networkidle', timeout=60000)
await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
if await self.engine._detect_cloudflare(page):

job_scraper2.py (new file, 510 lines)

@@ -0,0 +1,510 @@
import asyncio
import random
import sqlite3
import os
from datetime import datetime
from typing import Optional, Dict, List
from playwright.async_api import async_playwright
from browserforge.injectors.playwright import AsyncNewContext
from llm_agent import LLMJobRefiner
import re
class LinkedInJobScraper:
def __init__(
self,
engine,
db_path: str = "linkedin_jobs.db",
human_speed: float = 1.0,
target_field: str = "all"
):
self.engine = engine
self.db_path = db_path
self.human_speed = human_speed
self.target_field = target_field
self._init_db()
self.llm_agent = LLMJobRefiner()
def _init_db(self):
os.makedirs(os.path.dirname(self.db_path) if os.path.dirname(self.db_path) else ".", exist_ok=True)
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
company_name TEXT,
location TEXT,
description TEXT,
requirements TEXT,
qualifications TEXT,
salary_range TEXT,
nature_of_work TEXT,
job_id TEXT,
url TEXT UNIQUE
)
''')
conn.commit()
async def _human_click(self, page, element, wait_after: bool = True):
if not element:
return False
await element.scroll_into_view_if_needed()
await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
try:
await element.click()
if wait_after:
await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
return True
except:
return False
async def _login(self, page, credentials: Dict) -> bool:
"""Human-realistic LinkedIn login"""
print("🔐 Navigating to LinkedIn login page...")
await page.goto("https://www.linkedin.com/login", timeout=60000)
await asyncio.sleep(random.uniform(2.0, 3.5) * self.human_speed)
email_field = await page.query_selector('input[name="session_key"]')
if not email_field:
print("❌ Email field not found.")
return False
print("✍️ Typing username...")
await email_field.click()
await asyncio.sleep(random.uniform(0.4, 0.9) * self.human_speed)
for char in credentials["email"]:
await page.keyboard.type(char)
await asyncio.sleep(random.uniform(0.06, 0.14) * self.human_speed)
await asyncio.sleep(random.uniform(1.0, 1.8) * self.human_speed)
password_field = await page.query_selector('input[name="session_password"]')
if not password_field:
print("❌ Password field not found.")
return False
print("🔒 Typing password...")
await password_field.click()
await asyncio.sleep(random.uniform(0.3, 0.7) * self.human_speed)
for char in credentials["password"]:
await page.keyboard.type(char)
await asyncio.sleep(random.uniform(0.08, 0.16) * self.human_speed)
await asyncio.sleep(random.uniform(0.8, 1.5) * self.human_speed)
print("✅ Submitting login form...")
await page.keyboard.press("Enter")
for _ in range(15):
current_url = page.url
if "/feed" in current_url or "/jobs" in current_url:
if "login" not in current_url:
print("✅ Login successful!")
await asyncio.sleep(random.uniform(2.0, 3.0) * self.human_speed)
return True
await asyncio.sleep(1)
print("❌ Login may have failed.")
return False
async def _extract_all_page_content(self, page) -> str:
"""Extract all content from the job page"""
await asyncio.sleep(2 * self.human_speed)
# Human-like scrolling to load all content
await self.engine._human_like_scroll(page)
await asyncio.sleep(2 * self.human_speed)
# Get the full page content
page_content = await page.content()
return page_content
def _calculate_keyword_match(self, title: str, keywords: str) -> float:
"""Calculate percentage of keywords matched in title"""
if not title or not keywords:
return 0.0
title_lower = title.lower()
keyword_list = [kw.strip().lower() for kw in keywords.split()]
matches = 0
for keyword in keyword_list:
if keyword in title_lower:
matches += 1
return matches / len(keyword_list) if keyword_list else 0.0
def _extract_location_from_keywords(self, search_keywords: str) -> str:
"""Extract location from search keywords if present"""
location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
if location_match:
return location_match.group(1).strip().lower()
return ""
async def _scrape_jobs_from_current_page(self, page, search_keywords: str, seen_job_ids, all_job_links):
"""Scrape job links from the current page that match keywords and location"""
current_links = await page.query_selector_all("a[href*='/jobs/view/']")
new_jobs = 0
# Extract location from search keywords
location_from_keywords = self._extract_location_from_keywords(search_keywords)
for link in current_links:
href = await link.get_attribute("href")
if href:
full_url = href if href.startswith("http") else f"https://www.linkedin.com{href}"
job_id = href.split("/view/")[-1].split("/")[0] if "/view/" in href else href
if job_id and job_id not in seen_job_ids:
# Check if job title matches keywords (at least 70% match)
title_element = await link.query_selector("span.job-title, h3, .job-card-title")
if title_element:
title = await title_element.inner_text()
match_percentage = self._calculate_keyword_match(title, search_keywords)
# Check if location matches (if specified in keywords)
location_match = True
if location_from_keywords:
# Try to get location from the job card
location_element = await link.query_selector("span.job-location, .job-card-location, .location")
if location_element:
location_text = await location_element.inner_text()
location_match = location_from_keywords in location_text.lower()
if match_percentage >= 0.7 and location_match: # At least 70% match and location matches
seen_job_ids.add(job_id)
all_job_links.append((href, title))
new_jobs += 1
elif match_percentage < 0.7:
print(f" ⚠️ Skipping job due to low keyword match: {title[:50]}... (match: {match_percentage:.2%})")
elif not location_match:
print(f" ⚠️ Skipping job due to location mismatch: {title[:50]}... (expected: {location_from_keywords})")
else:
# If no title element, still add to check later
seen_job_ids.add(job_id)
all_job_links.append((href, "Unknown Title"))
new_jobs += 1
return new_jobs
async def _handle_pagination(self, page, search_keywords: str, seen_job_ids, all_job_links):
"""Handle pagination by going through pages"""
current_page = 1
while True:
print(f"📄 Processing page {current_page}")
# Collect job links on current page
new_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
print(f" Found {new_jobs} new job(s) on page {current_page} (total: {len(all_job_links)})")
# Try to go to next page
next_btn = await page.query_selector("button[aria-label='Next']")
if next_btn and await next_btn.is_enabled():
await self._human_click(page, next_btn)
await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
# Wait for URL to change or new content
try:
await page.wait_for_function("() => window.location.href.includes('start=')", timeout=30000)
except:
pass
current_page += 1
else:
print("🔚 'Next' button not available — stopping pagination.")
break
async def _handle_infinite_scroll(self, page, search_keywords: str, seen_job_ids, all_job_links):
"""Handle infinite scroll to load more jobs"""
last_height = await page.evaluate("document.body.scrollHeight")
no_new_jobs_count = 0
max_no_new = 3
while no_new_jobs_count < max_no_new:
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
new_jobs_found = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
print(f" Found {new_jobs_found} new job(s) (total: {len(all_job_links)})")
new_height = await page.evaluate("document.body.scrollHeight")
if new_height == last_height:
no_new_jobs_count += 1
else:
no_new_jobs_count = 0
last_height = new_height
if new_jobs_found == 0 and no_new_jobs_count >= 1:
print("🔚 No new jobs loaded. Stopping scroll.")
break
async def scrape_jobs(
self,
search_keywords: Optional[str],
max_pages: int = 1,
credentials: Optional[Dict] = None
):
# Parse location from keywords if present
location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
location = location_match.group(1).strip() if location_match else ""
# Remove location part from keywords for search
clean_keywords = re.sub(r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE).strip()
encoded_keywords = clean_keywords.replace(" ", "%20")
# Build search URL with location if specified
search_url = f"https://www.linkedin.com/jobs/search/?keywords={encoded_keywords}"
if location:
search_url += f"&location={location.replace(' ', '%20')}"
profile = self.engine._select_profile()
renderer = random.choice(self.engine.common_renderers[self.engine.os])
vendor = random.choice(self.engine.common_vendors)
spoof_script = self.engine._get_spoof_script(renderer, vendor)
async with async_playwright() as pw:
browser = await pw.chromium.launch(
headless= False,
args=['--disable-blink-features=AutomationControlled']
)
context = await AsyncNewContext(browser, fingerprint=profile)
await context.add_init_script(f"""
Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
""")
await context.add_init_script(spoof_script)
page = await context.new_page()
session_loaded = await self.engine.load_session(context)
login_successful = False
if session_loaded:
print("🔁 Using saved session — verifying login...")
await page.goto("https://www.linkedin.com/feed/", timeout=60000)
if "feed" in page.url and "login" not in page.url:
print("✅ Session still valid.")
login_successful = True
else:
print("⚠️ Saved session expired — re-authenticating.")
session_loaded = False
if not session_loaded and credentials:
print("🔐 Performing fresh login...")
login_successful = await self._login(page, credentials)
if login_successful:
await self.engine.save_session(context)
else:
print("❌ Login failed. Exiting.")
await browser.close()
self.engine.report_outcome("block")
return
elif not credentials:
print(" No credentials — proceeding as guest.")
login_successful = True
else:
pass
await page.wait_for_load_state("load", timeout=60000)
print("✅ Post-login page fully loaded. Starting search...")
if await self.engine._detect_cloudflare(page):
print("☁️ Cloudflare detected on initial load.")
if not await self.engine._handle_cloudflare(page):
print("❌ Cloudflare could not be resolved.")
await browser.close()
self.engine.report_outcome("cloudflare")
return
print(f"🔍 Searching for: {search_keywords}")
await page.goto(search_url, wait_until='load', timeout=60000)
await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
if await self.engine._detect_cloudflare(page):
print("☁️ Cloudflare detected on search page.")
if not await self.engine._handle_cloudflare(page):
await browser.close()
self.engine.report_outcome("cloudflare")
return
all_job_links = []
seen_job_ids = set()
# First, scrape the initial page
print("🔄 Collecting initial job links...")
initial_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
print(f" Found {initial_jobs} initial job(s) (total: {len(all_job_links)})")
# Loop until no new jobs are found
iteration = 1
while True:
print(f"🔄 Iteration {iteration}: Checking for new jobs...")
# First try infinite scroll
prev_job_count = len(all_job_links)
await self._handle_infinite_scroll(page, search_keywords, seen_job_ids, all_job_links)
new_jobs_count = len(all_job_links) - prev_job_count
if new_jobs_count > 0:
print(f" Found {new_jobs_count} new jobs via infinite scroll")
iteration += 1
continue # Continue with infinite scroll if new jobs found
# If no new jobs via scroll, check for pagination
pagination_exists = await page.query_selector("button[aria-label='Next']")
if pagination_exists:
print("⏭️ Pagination detected. Processing pages...")
await self._handle_pagination(page, search_keywords, seen_job_ids, all_job_links)
iteration += 1
continue # Continue with pagination if new jobs found
else:
# If no pagination and no new jobs from scroll, check by refreshing
print("🔄 Refreshing page to check for new results...")
await page.reload(wait_until='networkidle')
await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)
# Check for new jobs after refresh
new_jobs_after_refresh = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
if new_jobs_after_refresh > 0:
print(f" Found {new_jobs_after_refresh} new job(s) after refresh")
iteration += 1
continue # Continue if new jobs found after refresh
else:
print("🔚 No new jobs found after refresh. Stopping.")
break
# Limit iterations to prevent infinite loops
if iteration > 10:
print("🔄 Maximum iterations reached. Stopping.")
break
print(f"✅ Collected {len(all_job_links)} unique job links.")
# Process all collected job links
scraped_count = 0
for idx, (href, title) in enumerate(all_job_links):
try:
full_url = href if href.startswith("http") else f"https://www.linkedin.com{href}"
print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url}")
await page.goto(full_url, wait_until='load', timeout=60000)
await asyncio.sleep(3 * self.human_speed)
is_cloudflare = await self.engine._detect_cloudflare(page)
page_content = await page.content()
has_captcha_text = "captcha" in page_content.lower()
captcha_present = is_cloudflare or has_captcha_text
title_element = await page.query_selector("h1.t-24")
job_data_accessible = title_element is not None
if captcha_present:
if job_data_accessible:
print(" ⚠️ CAPTCHA detected, but job data is accessible. Proceeding in stealth mode...")
await self.engine._avoid_captcha(page)
else:
print(" ⚠️ CAPTCHA detected and job data blocked. Attempting recovery...")
if not await self.engine._solve_captcha_fallback(page):
print(" ❌ CAPTCHA recovery failed. Skipping job.")
continue
title_element = await page.query_selector("h1.t-24")
if not title_element:
print(" ❌ Job data still unavailable after CAPTCHA handling. Skipping.")
continue
if not captcha_present:
await self.engine._avoid_captcha(page)
apply_btn = None
apply_selectors = [
"button[aria-label*='Apply']",
"button:has-text('Apply')",
"a:has-text('Apply')",
"button:has-text('Easy Apply')"
]
for selector in apply_selectors:
apply_btn = await page.query_selector(selector)
if apply_btn:
break
page_data = None
final_url = full_url
if apply_btn:
print(" → Clicking 'Apply' / 'Easy Apply' button...")
page_waiter = asyncio.create_task(context.wait_for_event("page"))
await self._human_click(page, apply_btn, wait_after=False)
external_page = None
try:
external_page = await asyncio.wait_for(page_waiter, timeout=5.0)
print(" 🌐 External job site opened in new tab.")
await external_page.wait_for_load_state("load", timeout=30000)
await asyncio.sleep(2 * self.human_speed)
await self.engine._human_like_scroll(external_page)
await asyncio.sleep(2 * self.human_speed)
page_data = await self._extract_all_page_content(external_page)
final_url = external_page.url
if not external_page.is_closed():
await external_page.close()
except asyncio.TimeoutError:
print(" 🖥️ No external tab — scraping LinkedIn job page.")
await page.wait_for_timeout(2000)
try:
await page.wait_for_selector("div.jobs-apply-button--fixed, div.jobs-easy-apply-modal", timeout=8000)
except:
pass
await self.engine._human_like_scroll(page)
await asyncio.sleep(2 * self.human_speed)
page_data = await self._extract_all_page_content(page)
final_url = page.url
else:
print(" ⚠️ No 'Apply' button found — scraping job details directly.")
await self.engine._human_like_scroll(page)
await asyncio.sleep(2 * self.human_speed)
page_data = await self._extract_all_page_content(page)
final_url = page.url
# Extract job ID from URL
job_id = final_url.split("/")[-2] if "/jobs/view/" in final_url else "unknown"
# Prepare raw data for LLM processing
raw_data = {
"page_content": page_data,
"url": final_url,
"job_id": job_id
}
# Send raw data to LLM agent for refinement
refined_data = await self.llm_agent.refine_job_data(raw_data, search_keywords)
# Only save if LLM successfully extracted meaningful data
if refined_data and refined_data.get("title", "N/A") != "N/A":
# Save refined data to markdown and database through LLM agent
await self.llm_agent.save_job_data(refined_data, search_keywords)
scraped_count += 1
print(f" ✅ Scraped and refined: {refined_data['title'][:50]}...")
else:
print(f" 🟡 Could not extract meaningful data from: {final_url}")
except Exception as e:
print(f" ⚠️ Failed on job {idx+1}: {str(e)[:100]}")
continue
finally:
print(" ↩️ Returning to LinkedIn search results...")
await page.goto(search_url, timeout=60000)
await asyncio.sleep(4 * self.human_speed)
await browser.close()
if scraped_count > 0:
self.engine.report_outcome("success")
print(f"✅ Completed! Processed {scraped_count} jobs for '{search_keywords}'.")
else:
self.engine.report_outcome("captcha")
print("⚠️ No jobs processed successfully.")

Main script

@@ -1,23 +1,26 @@
from scraping_engine import FingerprintScrapingEngine
-from job_scraper import LinkedInJobScraper
+from job_scraper2 import LinkedInJobScraper
import os
from dotenv import load_dotenv
import asyncio
# Load environment variables
load_dotenv()
async def main():
engine = FingerprintScrapingEngine(
seed="job_scraping_engine",
target_os="windows",
db_path="job_listings.db",
markdown_path="job_listings.md",
search_keywords="Data Anaylst"
markdown_path="job_listings.md"
)
-scraper = LinkedInJobScraper(engine, human_speed=1.6)
+# Initialize scraper with target field
+scraper = LinkedInJobScraper(engine, human_speed=1.6, target_field="Web designer")
await scraper.scrape_jobs(
search_keywords="Data Anaylst", # ← Your search terms
max_pages=3,
search_keywords="Web Designer location:New York",
credentials={
"email": os.getenv("SCRAPING_USERNAME"),
"password": os.getenv("SCRAPING_PASSWORD")
@@ -25,4 +28,4 @@ async def main():
)
if __name__ == "__main__":
-asyncio.run(main())
+asyncio.run(main())
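
The new search_keywords value bundles the location into the query using a location: suffix, which scrape_jobs in job_scraper2.py splits apart with a regex. A standalone sketch of what that convention yields for the value used here (mirroring the regexes above):

import re

keywords = "Web Designer location:New York"
match = re.search(r'location:\s*([^,]+)', keywords, re.IGNORECASE)
location = match.group(1).strip() if match else ""
clean = re.sub(r'location:\s*[^,]+', '', keywords, flags=re.IGNORECASE).strip()
print(clean)     # Web Designer
print(location)  # New York
print(f"https://www.linkedin.com/jobs/search/?keywords={clean.replace(' ', '%20')}"
      f"&location={location.replace(' ', '%20')}")
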

llm_agent.py (new file, 166 lines)

@@ -0,0 +1,166 @@
import google.generativeai as genai
from typing import Dict, Any
import asyncio
import sqlite3
import os
from datetime import datetime
from config import GEMINI_API_KEY
class LLMJobRefiner:
def __init__(self):
genai.configure(api_key=GEMINI_API_KEY)
self.model = genai.GenerativeModel('gemini-pro')
async def refine_job_data(self, raw_data: Dict[str, Any], target_field: str) -> Dict[str, Any]:
"""
Refine raw job data using Gemini LLM based on target field
"""
prompt = f"""
You are a job data extraction assistant. Extract the following fields from the job posting:
- title
- company_name
- location
- description
- requirements
- qualifications
- salary_range
- nature_of_work (remote, onsite, or hybrid)
- job_id
Target Field: {target_field}
Raw Page Content:
{raw_data.get('page_content', '')[:3000]} # Limit content size
Instructions:
1. Extract only the information relevant to the target field: {target_field}
2. Clean up any formatting issues in the description
3. Standardize location format (city, state/country)
4. Extract salary range if mentioned in description
5. Determine nature of work (remote, onsite, or hybrid) from work arrangements
6. Ensure all fields are properly formatted
7. If a field cannot be found, use "N/A"
8. Return the refined data in JSON format
Response format (only return the JSON):
{{
"title": "...",
"company_name": "...",
"location": "...",
"description": "...",
"requirements": "...",
"qualifications": "...",
"salary_range": "...",
"nature_of_work": "...",
"job_id": "{raw_data.get('job_id', 'unknown')}",
"url": "{raw_data.get('url', 'N/A')}"
}}
"""
try:
response = await asyncio.get_event_loop().run_in_executor(
None,
lambda: self.model.generate_content(prompt)
)
# Parse the response and return refined data
refined_data = self._parse_llm_response(response.text)
# If parsing fails, return None
if not refined_data:
return None
return refined_data
except Exception as e:
print(f"LLM refinement failed: {str(e)}")
return None
def _parse_llm_response(self, response_text: str) -> Dict[str, Any]:
"""
Parse the LLM response to extract refined job data
"""
import json
import re
# Extract JSON from response (handle markdown code blocks)
json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', response_text, re.DOTALL)
if json_match:
json_str = json_match.group(1)
else:
# If no code block, try to find JSON directly
json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
if json_match:
json_str = json_match.group(0)
else:
return None
try:
return json.loads(json_str)
except json.JSONDecodeError:
return None
async def save_job_data(self, job_data: Dict[str, Any], keyword: str):
"""
Save job data to both markdown and database
"""
# Save to database
await self._save_to_db(job_data)
# Save to markdown
await self._save_to_markdown(job_data, keyword)
async def _save_to_db(self, job_data: Dict[str, Any]):
"""
Save job data to database
"""
db_path = "linkedin_jobs.db"
os.makedirs(os.path.dirname(db_path) if os.path.dirname(db_path) else ".", exist_ok=True)
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR IGNORE INTO jobs
(title, company_name, location, description, requirements,
qualifications, salary_range, nature_of_work, job_id, url)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
job_data.get("title", "N/A"),
job_data.get("company_name", "N/A"),
job_data.get("location", "N/A"),
job_data.get("description", "N/A"),
job_data.get("requirements", "N/A"),
job_data.get("qualifications", "N/A"),
job_data.get("salary_range", "N/A"),
job_data.get("nature_of_work", "N/A"),
job_data.get("job_id", "N/A"),
job_data.get("url", "N/A")
))
conn.commit()
async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str):
"""
Save job data to markdown file
"""
os.makedirs("linkedin_jobs", exist_ok=True)
# Create a single markdown file for all jobs
filename = "linkedin_jobs_scraped.md"
filepath = os.path.join("linkedin_jobs", filename)
with open(filepath, "a", encoding="utf-8") as f:
# Only write header if file is empty
if os.path.getsize(filepath) == 0:
f.write(f"# LinkedIn Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write(f"## Job: {job_data.get('title', 'N/A')}\n\n")
f.write(f"- **Keyword**: {keyword}\n")
f.write(f"- **Company**: {job_data.get('company_name', 'N/A')}\n")
f.write(f"- **Location**: {job_data.get('location', 'N/A')}\n")
f.write(f"- **Nature of Work**: {job_data.get('nature_of_work', 'N/A')}\n")
f.write(f"- **Salary Range**: {job_data.get('salary_range', 'N/A')}\n")
f.write(f"- **Job ID**: {job_data.get('job_id', 'N/A')}\n")
f.write(f"- **URL**: <{job_data.get('url', 'N/A')}>\n\n")
f.write(f"### Description\n\n{job_data.get('description', 'N/A')}\n\n")
f.write(f"### Requirements\n\n{job_data.get('requirements', 'N/A')}\n\n")
f.write(f"### Qualifications\n\n{job_data.get('qualifications', 'N/A')}\n\n")
f.write("---\n\n")

scraping_engine.py

@@ -24,8 +24,7 @@ class FingerprintScrapingEngine:
db_path: str = "jobs.db",
markdown_path: str = "scraped_jobs.md",
proxies: List[str] = None,
-login_credentials: Optional[Dict[str, str]] = None,
-search_keywords: Optional[str] = None
+login_credentials: Optional[Dict[str, str]] = None
):
if target_os not in ['windows', 'macos']:
raise ValueError("operating_system must be 'windows' or 'macos'")
@@ -42,7 +41,6 @@
self.markdown_path = markdown_path
self.proxies = proxies or []
self.login_credentials = login_credentials
-self.search_keywords = search_keywords
self.fingerprint_generator = FingerprintGenerator(
browser=('chrome',),
os=(self.os,)