From fd4e8c9c0587baea408f010d64e528725c4b8205 Mon Sep 17 00:00:00 2001
From: Ofure Ikheloa
Date: Mon, 24 Nov 2025 12:25:50 +0100
Subject: [PATCH] feat(scraper): add LLM-powered job data refinement and new
 scraping logic

- Implement LLMJobRefiner class for processing job data with Gemini API
- Add new job_scraper2.py with enhanced scraping capabilities
- Remove search_keywords parameter from scraping engine
- Add environment variable loading in config.py
- Update main script to use new scraper and target field
---
 config.py          |  10 +
 job_scraper2.py    | 510 +++++++++++++++++++++++++++++++++++++++++++++
 linkedin_main.py   |  17 +-
 llm_agent.py       | 166 +++++++++++++++
 scraping_engine.py |   4 +-
 5 files changed, 697 insertions(+), 10 deletions(-)
 create mode 100644 job_scraper2.py
 create mode 100644 llm_agent.py

diff --git a/config.py b/config.py
index eac08fa..19f18ad 100644
--- a/config.py
+++ b/config.py
@@ -2,6 +2,16 @@
 import os
 import json
+from dotenv import load_dotenv
+
+# Load environment variables from .env file
+load_dotenv()
+
+# LLM Agent Configuration
+GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
+if not GEMINI_API_KEY:
+    raise ValueError("GEMINI_API_KEY environment variable not set in .env file")
+
 
 def load_spoof_config():
     """Load spoof data from JSON config file. Falls back to defaults if missing."""
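
Note: load_dotenv() reads a .env file from the working directory. For reference, a
minimal .env covering every variable this patch consumes: GEMINI_API_KEY here, plus
SCRAPING_USERNAME and SCRAPING_PASSWORD read in linkedin_main.py (all values are
placeholders):

    GEMINI_API_KEY=your-gemini-api-key
    SCRAPING_USERNAME=you@example.com
    SCRAPING_PASSWORD=your-linkedin-password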

diff --git a/job_scraper2.py b/job_scraper2.py
new file mode 100644
index 0000000..d1c7a6f
--- /dev/null
+++ b/job_scraper2.py
@@ -0,0 +1,510 @@
import asyncio
import random
import sqlite3
import os
from datetime import datetime
from typing import Optional, Dict, List
from playwright.async_api import async_playwright
from browserforge.injectors.playwright import AsyncNewContext
from llm_agent import LLMJobRefiner
import re


class LinkedInJobScraper:
    def __init__(
        self,
        engine,
        db_path: str = "linkedin_jobs.db",
        human_speed: float = 1.0,
        target_field: str = "all"
    ):
        self.engine = engine
        self.db_path = db_path
        self.human_speed = human_speed
        self.target_field = target_field
        self._init_db()
        self.llm_agent = LLMJobRefiner()

    def _init_db(self):
        os.makedirs(os.path.dirname(self.db_path) if os.path.dirname(self.db_path) else ".", exist_ok=True)
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS jobs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    title TEXT,
                    company_name TEXT,
                    location TEXT,
                    description TEXT,
                    requirements TEXT,
                    qualifications TEXT,
                    salary_range TEXT,
                    nature_of_work TEXT,
                    job_id TEXT,
                    url TEXT UNIQUE
                )
            ''')
            conn.commit()

    async def _human_click(self, page, element, wait_after: bool = True):
        if not element:
            return False
        await element.scroll_into_view_if_needed()
        await asyncio.sleep(random.uniform(0.3, 0.8) * self.human_speed)
        try:
            await element.click()
            if wait_after:
                await asyncio.sleep(random.uniform(2, 4) * self.human_speed)
            return True
        except Exception:
            return False

    async def _login(self, page, credentials: Dict) -> bool:
        """Human-realistic LinkedIn login"""
        print("🔐 Navigating to LinkedIn login page...")
        await page.goto("https://www.linkedin.com/login", timeout=60000)
        await asyncio.sleep(random.uniform(2.0, 3.5) * self.human_speed)

        email_field = await page.query_selector('input[name="session_key"]')
        if not email_field:
            print("❌ Email field not found.")
            return False

        print("✍️ Typing username...")
        await email_field.click()
        await asyncio.sleep(random.uniform(0.4, 0.9) * self.human_speed)
        for char in credentials["email"]:
            await page.keyboard.type(char)
            await asyncio.sleep(random.uniform(0.06, 0.14) * self.human_speed)
        await asyncio.sleep(random.uniform(1.0, 1.8) * self.human_speed)

        password_field = await page.query_selector('input[name="session_password"]')
        if not password_field:
            print("❌ Password field not found.")
            return False

        print("🔒 Typing password...")
        await password_field.click()
        await asyncio.sleep(random.uniform(0.3, 0.7) * self.human_speed)
        for char in credentials["password"]:
            await page.keyboard.type(char)
            await asyncio.sleep(random.uniform(0.08, 0.16) * self.human_speed)
        await asyncio.sleep(random.uniform(0.8, 1.5) * self.human_speed)

        print("✅ Submitting login form...")
        await page.keyboard.press("Enter")

        for _ in range(15):
            current_url = page.url
            if "/feed" in current_url or "/jobs" in current_url:
                if "login" not in current_url:
                    print("✅ Login successful!")
                    await asyncio.sleep(random.uniform(2.0, 3.0) * self.human_speed)
                    return True
            await asyncio.sleep(1)
        print("❌ Login may have failed.")
        return False

    async def _extract_all_page_content(self, page) -> str:
        """Extract all content from the job page"""
        await asyncio.sleep(2 * self.human_speed)

        # Human-like scrolling to load all content
        await self.engine._human_like_scroll(page)
        await asyncio.sleep(2 * self.human_speed)

        # Get the full page content
        page_content = await page.content()
        return page_content

    def _calculate_keyword_match(self, title: str, keywords: str) -> float:
        """Calculate the fraction of keywords matched in the title"""
        if not title or not keywords:
            return 0.0

        title_lower = title.lower()
        keyword_list = [kw.strip().lower() for kw in keywords.split()]

        matches = 0
        for keyword in keyword_list:
            if keyword in title_lower:
                matches += 1

        return matches / len(keyword_list) if keyword_list else 0.0

    def _extract_location_from_keywords(self, search_keywords: str) -> str:
        """Extract location from search keywords if present"""
        location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
        if location_match:
            return location_match.group(1).strip().lower()
        return ""
    async def _scrape_jobs_from_current_page(self, page, search_keywords: str, seen_job_ids, all_job_links):
        """Scrape job links from the current page that match keywords and location"""
        current_links = await page.query_selector_all("a[href*='/jobs/view/']")
        new_jobs = 0

        # Extract location from search keywords
        location_from_keywords = self._extract_location_from_keywords(search_keywords)
        # Match titles against the keywords only, with any "location:..." clause
        # stripped out so its tokens do not drag the match percentage down
        clean_keywords = re.sub(r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE).strip()

        for link in current_links:
            href = await link.get_attribute("href")
            if href:
                full_url = href if href.startswith("http") else f"https://www.linkedin.com{href}"
                job_id = href.split("/view/")[-1].split("/")[0] if "/view/" in href else href

                if job_id and job_id not in seen_job_ids:
                    # Check if job title matches keywords (at least 70% match)
                    title_element = await link.query_selector("span.job-title, h3, .job-card-title")
                    if title_element:
                        title = await title_element.inner_text()
                        match_percentage = self._calculate_keyword_match(title, clean_keywords)

                        # Check if location matches (if specified in keywords)
                        location_match = True
                        if location_from_keywords:
                            # Try to get location from the job card
                            location_element = await link.query_selector("span.job-location, .job-card-location, .location")
                            if location_element:
                                location_text = await location_element.inner_text()
                                location_match = location_from_keywords in location_text.lower()

                        if match_percentage >= 0.7 and location_match:  # At least 70% match and location matches
                            seen_job_ids.add(job_id)
                            all_job_links.append((href, title))
                            new_jobs += 1
                        elif match_percentage < 0.7:
                            print(f"   ⚠️ Skipping job due to low keyword match: {title[:50]}... (match: {match_percentage:.2%})")
                        elif not location_match:
                            print(f"   ⚠️ Skipping job due to location mismatch: {title[:50]}... (expected: {location_from_keywords})")
                    else:
                        # If no title element, still add to check later
                        seen_job_ids.add(job_id)
                        all_job_links.append((href, "Unknown Title"))
                        new_jobs += 1
        return new_jobs

    async def _handle_pagination(self, page, search_keywords: str, seen_job_ids, all_job_links):
        """Handle pagination by going through pages"""
        current_page = 1
        while True:
            print(f"📄 Processing page {current_page}")

            # Collect job links on current page
            new_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
            print(f"   ➕ Found {new_jobs} new job(s) on page {current_page} (total: {len(all_job_links)})")

            # Try to go to next page
            next_btn = await page.query_selector("button[aria-label='Next']")
            if next_btn and await next_btn.is_enabled():
                await self._human_click(page, next_btn)
                await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)
                # Wait for URL to change or new content
                try:
                    await page.wait_for_function("() => window.location.href.includes('start=')", timeout=30000)
                except Exception:
                    pass
                current_page += 1
            else:
                print("🔚 'Next' button not available — stopping pagination.")
                break

    async def _handle_infinite_scroll(self, page, search_keywords: str, seen_job_ids, all_job_links):
        """Handle infinite scroll to load more jobs"""
        last_height = await page.evaluate("document.body.scrollHeight")
        no_new_jobs_count = 0
        max_no_new = 3

        while no_new_jobs_count < max_no_new:
            await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)

            new_jobs_found = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)

            print(f"   ➕ Found {new_jobs_found} new job(s) (total: {len(all_job_links)})")

            new_height = await page.evaluate("document.body.scrollHeight")
            if new_height == last_height:
                no_new_jobs_count += 1
            else:
                no_new_jobs_count = 0
                last_height = new_height

            if new_jobs_found == 0 and no_new_jobs_count >= 1:
                print("🔚 No new jobs loaded. Stopping scroll.")
                break
    async def scrape_jobs(
        self,
        search_keywords: str,
        max_pages: int = 1,
        credentials: Optional[Dict] = None
    ):
        # Parse location from keywords if present
        location_match = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE)
        location = location_match.group(1).strip() if location_match else ""

        # Remove location part from keywords for search
        clean_keywords = re.sub(r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE).strip()
        encoded_keywords = clean_keywords.replace(" ", "%20")

        # Build search URL with location if specified
        search_url = f"https://www.linkedin.com/jobs/search/?keywords={encoded_keywords}"
        if location:
            search_url += f"&location={location.replace(' ', '%20')}"

        profile = self.engine._select_profile()
        renderer = random.choice(self.engine.common_renderers[self.engine.os])
        vendor = random.choice(self.engine.common_vendors)
        spoof_script = self.engine._get_spoof_script(renderer, vendor)

        async with async_playwright() as pw:
            browser = await pw.chromium.launch(
                headless=False,
                args=['--disable-blink-features=AutomationControlled']
            )
            context = await AsyncNewContext(browser, fingerprint=profile)

            await context.add_init_script(f"""
                Object.defineProperty(navigator, 'hardwareConcurrency', {{ get: () => {profile.navigator.hardwareConcurrency} }});
                Object.defineProperty(navigator, 'deviceMemory', {{ get: () => {profile.navigator.deviceMemory} }});
                Object.defineProperty(navigator, 'platform', {{ get: () => '{profile.navigator.platform}' }});
            """)
            await context.add_init_script(spoof_script)

            page = await context.new_page()

            session_loaded = await self.engine.load_session(context)
            login_successful = False

            if session_loaded:
                print("🔁 Using saved session — verifying login...")
                await page.goto("https://www.linkedin.com/feed/", timeout=60000)
                if "feed" in page.url and "login" not in page.url:
                    print("✅ Session still valid.")
                    login_successful = True
                else:
                    print("⚠️ Saved session expired — re-authenticating.")
                    session_loaded = False

            if not session_loaded and credentials:
                print("🔐 Performing fresh login...")
                login_successful = await self._login(page, credentials)
                if login_successful:
                    await self.engine.save_session(context)
                else:
                    print("❌ Login failed. Exiting.")
                    await browser.close()
                    self.engine.report_outcome("block")
                    return
            elif not credentials:
                print("ℹ️ No credentials — proceeding as guest.")
                login_successful = True

            await page.wait_for_load_state("load", timeout=60000)
            print("✅ Post-login page fully loaded. Starting search...")
            if await self.engine._detect_cloudflare(page):
                print("☁️ Cloudflare detected on initial load.")
                if not await self.engine._handle_cloudflare(page):
                    print("❌ Cloudflare could not be resolved.")
                    await browser.close()
                    self.engine.report_outcome("cloudflare")
                    return

            print(f"🔍 Searching for: {search_keywords}")
            await page.goto(search_url, wait_until='load', timeout=60000)
            await asyncio.sleep(random.uniform(4.0, 6.0) * self.human_speed)

            if await self.engine._detect_cloudflare(page):
                print("☁️ Cloudflare detected on search page.")
                if not await self.engine._handle_cloudflare(page):
                    await browser.close()
                    self.engine.report_outcome("cloudflare")
                    return

            all_job_links = []
            seen_job_ids = set()

            # First, scrape the initial page
            print("🔄 Collecting initial job links...")
            initial_jobs = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
            print(f"   ➕ Found {initial_jobs} initial job(s) (total: {len(all_job_links)})")

            # Loop until no new jobs are found
            iteration = 1
            while True:
                # Limit iterations to prevent infinite loops; checked at the top of the
                # loop because every branch below either continues or breaks
                if iteration > 10:
                    print("🔄 Maximum iterations reached. Stopping.")
                    break

                print(f"🔄 Iteration {iteration}: Checking for new jobs...")

                # First try infinite scroll
                prev_job_count = len(all_job_links)
                await self._handle_infinite_scroll(page, search_keywords, seen_job_ids, all_job_links)
                new_jobs_count = len(all_job_links) - prev_job_count

                if new_jobs_count > 0:
                    print(f"   ➕ Found {new_jobs_count} new jobs via infinite scroll")
                    iteration += 1
                    continue  # Continue with infinite scroll if new jobs found

                # If no new jobs via scroll, check for pagination
                pagination_exists = await page.query_selector("button[aria-label='Next']")

                if pagination_exists:
                    print("⏭️ Pagination detected. Processing pages...")
                    await self._handle_pagination(page, search_keywords, seen_job_ids, all_job_links)
                    iteration += 1
                    continue  # Continue with pagination if new jobs found
                else:
                    # If no pagination and no new jobs from scroll, check by refreshing
                    print("🔄 Refreshing page to check for new results...")
                    await page.reload(wait_until='networkidle')
                    await asyncio.sleep(random.uniform(3.0, 5.0) * self.human_speed)

                    # Check for new jobs after refresh
                    new_jobs_after_refresh = await self._scrape_jobs_from_current_page(page, search_keywords, seen_job_ids, all_job_links)
                    if new_jobs_after_refresh > 0:
                        print(f"   ➕ Found {new_jobs_after_refresh} new job(s) after refresh")
                        iteration += 1
                        continue  # Continue if new jobs found after refresh
                    else:
                        print("🔚 No new jobs found after refresh. Stopping.")
                        break
Stopping.") + break + + print(f"✅ Collected {len(all_job_links)} unique job links.") + + # Process all collected job links + scraped_count = 0 + for idx, (href, title) in enumerate(all_job_links): + try: + full_url = href if href.startswith("http") else f"https://www.linkedin.com{href}" + print(f" → Opening job {idx+1}/{len(all_job_links)}: {full_url}") + await page.goto(full_url, wait_until='load', timeout=60000) + await asyncio.sleep(3 * self.human_speed) + + is_cloudflare = await self.engine._detect_cloudflare(page) + page_content = await page.content() + has_captcha_text = "captcha" in page_content.lower() + captcha_present = is_cloudflare or has_captcha_text + + title_element = await page.query_selector("h1.t-24") + job_data_accessible = title_element is not None + + if captcha_present: + if job_data_accessible: + print(" âš ī¸ CAPTCHA detected, but job data is accessible. Proceeding in stealth mode...") + await self.engine._avoid_captcha(page) + else: + print(" âš ī¸ CAPTCHA detected and job data blocked. Attempting recovery...") + if not await self.engine._solve_captcha_fallback(page): + print(" ❌ CAPTCHA recovery failed. Skipping job.") + continue + title_element = await page.query_selector("h1.t-24") + if not title_element: + print(" ❌ Job data still unavailable after CAPTCHA handling. Skipping.") + continue + + if not captcha_present: + await self.engine._avoid_captcha(page) + + apply_btn = None + apply_selectors = [ + "button[aria-label*='Apply']", + "button:has-text('Apply')", + "a:has-text('Apply')", + "button:has-text('Easy Apply')" + ] + for selector in apply_selectors: + apply_btn = await page.query_selector(selector) + if apply_btn: + break + + page_data = None + final_url = full_url + + if apply_btn: + print(" → Clicking 'Apply' / 'Easy Apply' button...") + + page_waiter = asyncio.create_task(context.wait_for_event("page")) + await self._human_click(page, apply_btn, wait_after=False) + + external_page = None + try: + external_page = await asyncio.wait_for(page_waiter, timeout=5.0) + print(" 🌐 External job site opened in new tab.") + await external_page.wait_for_load_state("load", timeout=30000) + await asyncio.sleep(2 * self.human_speed) + await self.engine._human_like_scroll(external_page) + await asyncio.sleep(2 * self.human_speed) + + page_data = await self._extract_all_page_content(external_page) + final_url = external_page.url + + if not external_page.is_closed(): + await external_page.close() + + except asyncio.TimeoutError: + print(" đŸ–Ĩī¸ No external tab — scraping LinkedIn job page.") + await page.wait_for_timeout(2000) + try: + await page.wait_for_selector("div.jobs-apply-button--fixed, div.jobs-easy-apply-modal", timeout=8000) + except: + pass + await self.engine._human_like_scroll(page) + await asyncio.sleep(2 * self.human_speed) + page_data = await self._extract_all_page_content(page) + final_url = page.url + else: + print(" âš ī¸ No 'Apply' button found — scraping job details directly.") + await self.engine._human_like_scroll(page) + await asyncio.sleep(2 * self.human_speed) + page_data = await self._extract_all_page_content(page) + final_url = page.url + + # Extract job ID from URL + job_id = final_url.split("/")[-2] if "/jobs/view/" in final_url else "unknown" + + # Prepare raw data for LLM processing + raw_data = { + "page_content": page_data, + "url": final_url, + "job_id": job_id + } + + # Send raw data to LLM agent for refinement + refined_data = await self.llm_agent.refine_job_data(raw_data, search_keywords) + + # Only save if LLM successfully 
                    # Only save if LLM successfully extracted meaningful data
                    if refined_data and refined_data.get("title", "N/A") != "N/A":
                        # Save refined data to markdown and database through LLM agent
                        await self.llm_agent.save_job_data(refined_data, search_keywords)

                        scraped_count += 1
                        print(f"   ✅ Scraped and refined: {refined_data['title'][:50]}...")
                    else:
                        print(f"   🟡 Could not extract meaningful data from: {final_url}")

                except Exception as e:
                    print(f"   ⚠️ Failed on job {idx+1}: {str(e)[:100]}")
                    continue

                finally:
                    print("   ↩️ Returning to LinkedIn search results...")
                    await page.goto(search_url, timeout=60000)
                    await asyncio.sleep(4 * self.human_speed)

            await browser.close()

            if scraped_count > 0:
                self.engine.report_outcome("success")
                print(f"✅ Completed! Processed {scraped_count} jobs for '{search_keywords}'.")
            else:
                self.engine.report_outcome("captcha")
                print("⚠️ No jobs processed successfully.")
\ No newline at end of file
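
For reference, the filtering above works out as follows for the search string used in
linkedin_main.py below. This standalone sketch replicates the scoring rule of
_calculate_keyword_match and the location split from _scrape_jobs_from_current_page
(illustration only, not part of the patch):

    import re

    search_keywords = "Web Designer location:New York"

    # Split off the "location:..." clause, as _scrape_jobs_from_current_page does
    location = re.search(r'location:\s*([^,]+)', search_keywords, re.IGNORECASE).group(1).strip().lower()
    clean_keywords = re.sub(r'location:\s*[^,]+', '', search_keywords, flags=re.IGNORECASE).strip()

    def match(title: str, keywords: str) -> float:
        # Same rule as _calculate_keyword_match: fraction of keyword tokens in the title
        tokens = [kw.strip().lower() for kw in keywords.split()]
        return sum(kw in title.lower() for kw in tokens) / len(tokens) if tokens else 0.0

    print(location)                                       # new york
    print(match("Senior Web Designer", clean_keywords))   # 1.0 -> kept (>= 0.7 threshold)
    print(match("Web Developer", clean_keywords))         # 0.5 -> skipped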

diff --git a/linkedin_main.py b/linkedin_main.py
index 5d3e277..04e5b02 100644
--- a/linkedin_main.py
+++ b/linkedin_main.py
@@ -1,23 +1,26 @@
+
 from scraping_engine import FingerprintScrapingEngine
-from job_scraper import LinkedInJobScraper
+from job_scraper2 import LinkedInJobScraper
 import os
+from dotenv import load_dotenv
 import asyncio
 
+# Load environment variables
+load_dotenv()
 
 async def main():
     engine = FingerprintScrapingEngine(
         seed="job_scraping_engine",
         target_os="windows",
         db_path="job_listings.db",
-        markdown_path="job_listings.md",
-        search_keywords="Data Anaylst"
+        markdown_path="job_listings.md"
     )
 
-    scraper = LinkedInJobScraper(engine, human_speed=1.6)
+    # Initialize scraper with target field
+    scraper = LinkedInJobScraper(engine, human_speed=1.6, target_field="Web designer")
 
     await scraper.scrape_jobs(
-        search_keywords="Data Anaylst",  # ← Your search terms
-        max_pages=3,
+        search_keywords="Web Designer location:New York",
         credentials={
             "email": os.getenv("SCRAPING_USERNAME"),
             "password": os.getenv("SCRAPING_PASSWORD")
@@ -25,4 +28,4 @@ async def main():
     )
 
 if __name__ == "__main__":
-    asyncio.run(main())
+    asyncio.run(main())
\ No newline at end of file

diff --git a/llm_agent.py b/llm_agent.py
new file mode 100644
index 0000000..e409762
--- /dev/null
+++ b/llm_agent.py
@@ -0,0 +1,166 @@
import google.generativeai as genai
from typing import Dict, Any, Optional
import asyncio
import sqlite3
import os
from datetime import datetime
from config import GEMINI_API_KEY

class LLMJobRefiner:
    def __init__(self):
        genai.configure(api_key=GEMINI_API_KEY)
        self.model = genai.GenerativeModel('gemini-pro')

    async def refine_job_data(self, raw_data: Dict[str, Any], target_field: str) -> Optional[Dict[str, Any]]:
        """
        Refine raw job data using Gemini LLM based on target field
        """
        # Limit content size so the prompt stays manageable
        page_content = raw_data.get('page_content', '')[:3000]

        prompt = f"""
        You are a job data extraction assistant. Extract the following fields from the job posting:
        - title
        - company_name
        - location
        - description
        - requirements
        - qualifications
        - salary_range
        - nature_of_work (remote, onsite, or hybrid)
        - job_id

        Target Field: {target_field}
        Raw Page Content:
        {page_content}

        Instructions:
        1. Extract only the information relevant to the target field: {target_field}
        2. Clean up any formatting issues in the description
        3. Standardize location format (city, state/country)
        4. Extract salary range if mentioned in description
        5. Determine nature of work (remote, onsite, or hybrid) from work arrangements
        6. Ensure all fields are properly formatted
        7. If a field cannot be found, use "N/A"
        8. Return the refined data in JSON format

        Response format (only return the JSON):
        {{
            "title": "...",
            "company_name": "...",
            "location": "...",
            "description": "...",
            "requirements": "...",
            "qualifications": "...",
            "salary_range": "...",
            "nature_of_work": "...",
            "job_id": "{raw_data.get('job_id', 'unknown')}",
            "url": "{raw_data.get('url', 'N/A')}"
        }}
        """

        try:
            response = await asyncio.get_event_loop().run_in_executor(
                None,
                lambda: self.model.generate_content(prompt)
            )

            # Parse the response and return refined data
            refined_data = self._parse_llm_response(response.text)

            # If parsing fails, return None
            if not refined_data:
                return None

            return refined_data

        except Exception as e:
            print(f"LLM refinement failed: {str(e)}")
            return None

    def _parse_llm_response(self, response_text: str) -> Optional[Dict[str, Any]]:
        """
        Parse the LLM response to extract refined job data
        """
        import json
        import re

        # Extract JSON from response (handle markdown code blocks)
        json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', response_text, re.DOTALL)
        if json_match:
            json_str = json_match.group(1)
        else:
            # If no code block, try to find JSON directly
            json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
            if json_match:
                json_str = json_match.group(0)
            else:
                return None

        try:
            return json.loads(json_str)
        except json.JSONDecodeError:
            return None

    async def save_job_data(self, job_data: Dict[str, Any], keyword: str):
        """
        Save job data to both markdown and database
        """
        # Save to database
        await self._save_to_db(job_data)

        # Save to markdown
        await self._save_to_markdown(job_data, keyword)

    async def _save_to_db(self, job_data: Dict[str, Any]):
        """
        Save job data to database
        """
        db_path = "linkedin_jobs.db"
        os.makedirs(os.path.dirname(db_path) if os.path.dirname(db_path) else ".", exist_ok=True)

        with sqlite3.connect(db_path) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR IGNORE INTO jobs
                (title, company_name, location, description, requirements,
                 qualifications, salary_range, nature_of_work, job_id, url)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                job_data.get("title", "N/A"),
                job_data.get("company_name", "N/A"),
                job_data.get("location", "N/A"),
                job_data.get("description", "N/A"),
                job_data.get("requirements", "N/A"),
                job_data.get("qualifications", "N/A"),
                job_data.get("salary_range", "N/A"),
                job_data.get("nature_of_work", "N/A"),
                job_data.get("job_id", "N/A"),
                job_data.get("url", "N/A")
            ))
            conn.commit()

    async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str):
        """
        Save job data to markdown file
        """
        os.makedirs("linkedin_jobs", exist_ok=True)

        # Create a single markdown file for all jobs
        filename = "linkedin_jobs_scraped.md"
        filepath = os.path.join("linkedin_jobs", filename)

        with open(filepath, "a", encoding="utf-8") as f:
            # Only write header if file is empty
            if os.path.getsize(filepath) == 0:
                f.write(f"# LinkedIn Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")

            f.write(f"## Job: {job_data.get('title', 'N/A')}\n\n")
            f.write(f"- **Keyword**: {keyword}\n")
            f.write(f"- **Company**: {job_data.get('company_name', 'N/A')}\n")
            f.write(f"- **Location**: {job_data.get('location', 'N/A')}\n")
            f.write(f"- **Nature of Work**: {job_data.get('nature_of_work', 'N/A')}\n")
            f.write(f"- **Salary Range**: {job_data.get('salary_range', 'N/A')}\n")
            f.write(f"- **Job ID**: {job_data.get('job_id', 'N/A')}\n")
            f.write(f"- **URL**: <{job_data.get('url', 'N/A')}>\n\n")
            f.write(f"### Description\n\n{job_data.get('description', 'N/A')}\n\n")
            f.write(f"### Requirements\n\n{job_data.get('requirements', 'N/A')}\n\n")
            f.write(f"### Qualifications\n\n{job_data.get('qualifications', 'N/A')}\n\n")
            f.write("---\n\n")
\ No newline at end of file
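
A quick sanity check of _parse_llm_response against the two response shapes it handles,
fenced and bare JSON (the model outputs below are made up for illustration; constructing
LLMJobRefiner requires GEMINI_API_KEY to be set):

    refiner = LLMJobRefiner()

    fenced = '```json\n{"title": "Web Designer", "company_name": "Acme"}\n```'
    bare = 'Here is the data: {"title": "Web Designer", "company_name": "Acme"}'

    assert refiner._parse_llm_response(fenced)["title"] == "Web Designer"
    assert refiner._parse_llm_response(bare)["company_name"] == "Acme"
    assert refiner._parse_llm_response("no json here") is None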
+ ''', ( + job_data.get("title", "N/A"), + job_data.get("company_name", "N/A"), + job_data.get("location", "N/A"), + job_data.get("description", "N/A"), + job_data.get("requirements", "N/A"), + job_data.get("qualifications", "N/A"), + job_data.get("salary_range", "N/A"), + job_data.get("nature_of_work", "N/A"), + job_data.get("job_id", "N/A"), + job_data.get("url", "N/A") + )) + conn.commit() + + async def _save_to_markdown(self, job_data: Dict[str, Any], keyword: str): + """ + Save job data to markdown file + """ + os.makedirs("linkedin_jobs", exist_ok=True) + + # Create a single markdown file for all jobs + filename = "linkedin_jobs_scraped.md" + filepath = os.path.join("linkedin_jobs", filename) + + with open(filepath, "a", encoding="utf-8") as f: + # Only write header if file is empty + if os.path.getsize(filepath) == 0: + f.write(f"# LinkedIn Jobs - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") + + f.write(f"## Job: {job_data.get('title', 'N/A')}\n\n") + f.write(f"- **Keyword**: {keyword}\n") + f.write(f"- **Company**: {job_data.get('company_name', 'N/A')}\n") + f.write(f"- **Location**: {job_data.get('location', 'N/A')}\n") + f.write(f"- **Nature of Work**: {job_data.get('nature_of_work', 'N/A')}\n") + f.write(f"- **Salary Range**: {job_data.get('salary_range', 'N/A')}\n") + f.write(f"- **Job ID**: {job_data.get('job_id', 'N/A')}\n") + f.write(f"- **URL**: <{job_data.get('url', 'N/A')}>\n\n") + f.write(f"### Description\n\n{job_data.get('description', 'N/A')}\n\n") + f.write(f"### Requirements\n\n{job_data.get('requirements', 'N/A')}\n\n") + f.write(f"### Qualifications\n\n{job_data.get('qualifications', 'N/A')}\n\n") + f.write("---\n\n") \ No newline at end of file diff --git a/scraping_engine.py b/scraping_engine.py index 336d773..3ace021 100644 --- a/scraping_engine.py +++ b/scraping_engine.py @@ -24,8 +24,7 @@ class FingerprintScrapingEngine: db_path: str = "jobs.db", markdown_path: str = "scraped_jobs.md", proxies: List[str] = None, - login_credentials: Optional[Dict[str, str]] = None, - search_keywords: Optional[str] = None + login_credentials: Optional[Dict[str, str]] = None ): if target_os not in ['windows', 'macos']: raise ValueError("operating_system must be 'windows' or 'macos'") @@ -42,7 +41,6 @@ class FingerprintScrapingEngine: self.markdown_path = markdown_path self.proxies = proxies or [] self.login_credentials = login_credentials - self.search_keywords = search_keywords self.fingerprint_generator = FingerprintGenerator( browser=('chrome',), os=(self.os,)