Skip to content

Batch Processing

PMCGrab provides efficient batch processing capabilities for handling large collections of PMC articles.

The primary way to process multiple articles is using the process_single_pmc function in a loop:

# ─── examples/run_three_pmcs.py ──────────────────────────────────────────────
import json
from pathlib import Path

from pmcgrab.application.processing import process_single_pmc
from pmcgrab.infrastructure.settings import next_email

# The PMC IDs we want to process
PMC_IDS = ["7114487", "3084273", "7690653", "5707528", "7979870"]

OUT_DIR = Path("pmc_output")
OUT_DIR.mkdir(exist_ok=True)

for pmcid in PMC_IDS:
    email = next_email()
    print(f"• Fetching PMC{pmcid} using email {email} …")
    data = process_single_pmc(pmcid)
    if data is None:
        print(f"  ↳ FAILED to parse PMC{pmcid}")
        continue

    # Pretty-print a few key fields
    print(
        f"  Title   : {data['title'][:80]}{'…' if len(data['title']) > 80 else ''}\n"
        f"  Abstract: {data['abstract'][:120]}{'…' if len(data['abstract']) > 120 else ''}\n"
        f"  Authors : {len(data['authors']) if data['authors'] else 0}"
    )

    # Persist full JSON
    dest = OUT_DIR / f"PMC{pmcid}.json"
    with dest.open("w", encoding="utf-8") as fh:
        json.dump(data, fh, indent=2, ensure_ascii=False)
    print(f"  ↳ JSON saved to {dest}\n")

Advanced Batch Processing Patterns

With Error Tracking

import json
from pathlib import Path
from pmcgrab.application.processing import process_single_pmc
from pmcgrab.infrastructure.settings import next_email

def process_with_error_tracking(pmcids, output_dir="results"):
    """Process PMC IDs with comprehensive error tracking."""
    output_path = Path(output_dir)
    output_path.mkdir(exist_ok=True)

    successful = []
    failed = []

    for pmcid in pmcids:
        email = next_email()
        print(f"Processing PMC{pmcid}...")

        try:
            data = process_single_pmc(pmcid)
            if data is None:
                failed.append(pmcid)
                print(f"  Failed: No data returned")
                continue

            # Save successful result
            output_file = output_path / f"PMC{pmcid}.json"
            with output_file.open('w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)

            successful.append(pmcid)
            print(f"  Success: {data['title'][:50]}...")

        except Exception as e:
            failed.append(pmcid)
            print(f"  Error: {str(e)}")

    # Save processing summary
    summary = {
        'total': len(pmcids),
        'successful': len(successful),
        'failed': len(failed),
        'failed_ids': failed
    }

    summary_file = output_path / "processing_summary.json"
    with summary_file.open('w', encoding='utf-8') as f:
        json.dump(summary, f, indent=2)

    print(f"\nProcessing complete: {len(successful)}/{len(pmcids)} successful")
    return successful, failed

# Usage
pmcids = ["7114487", "3084273", "7690653", "invalid_id", "5707528"]
successful, failed = process_with_error_tracking(pmcids)

With Progress Tracking

from tqdm import tqdm
import json
from pathlib import Path
from pmcgrab.application.processing import process_single_pmc
from pmcgrab.infrastructure.settings import next_email

def process_with_progress(pmcids, output_dir="results"):
    """Process PMC IDs with progress bar."""
    output_path = Path(output_dir)
    output_path.mkdir(exist_ok=True)

    successful = 0

    for pmcid in tqdm(pmcids, desc="Processing papers"):
        email = next_email()
        data = process_single_pmc(pmcid)

        if data is not None:
            output_file = output_path / f"PMC{pmcid}.json"
            with output_file.open('w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            successful += 1
            tqdm.write(f"Success PMC{pmcid}: {data['title'][:40]}...")
        else:
            tqdm.write(f"Error PMC{pmcid}: Failed to process")

    print(f"\nCompleted: {successful}/{len(pmcids)} papers processed successfully")

# Usage
large_pmcid_list = ["7114487", "3084273", "7690653", "5707528", "7979870"]
process_with_progress(large_pmcid_list)

Reading PMC IDs from Files

From Text File

import json
from pathlib import Path
from pmcgrab.application.processing import process_single_pmc
from pmcgrab.infrastructure.settings import next_email

def process_from_file(filename, output_dir="results"):
    """Process PMC IDs from a text file."""
    # Read PMC IDs from file (one per line)
    with open(filename, 'r') as f:
        pmcids = [line.strip() for line in f if line.strip()]

    print(f"Read {len(pmcids)} PMC IDs from {filename}")

    output_path = Path(output_dir)
    output_path.mkdir(exist_ok=True)

    for pmcid in pmcids:
        email = next_email()
        print(f"Processing PMC{pmcid}...")

        data = process_single_pmc(pmcid)
        if data is not None:
            output_file = output_path / f"PMC{pmcid}.json"
            with output_file.open('w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            print(f"  Saved PMC{pmcid}")
        else:
            print(f"  Failed PMC{pmcid}")

# Create example file
with open('pmc_ids.txt', 'w') as f:
    f.write("7114487\n3084273\n7690653\n5707528\n7979870\n")

# Process from file
process_from_file('pmc_ids.txt')

From CSV File

import pandas as pd
import json
from pathlib import Path
from pmcgrab.application.processing import process_single_pmc
from pmcgrab.infrastructure.settings import next_email

def process_from_csv(csv_file, pmcid_column='pmcid', output_dir="results"):
    """Process PMC IDs from a CSV file."""
    df = pd.read_csv(csv_file)
    pmcids = df[pmcid_column].astype(str).tolist()

    print(f"Read {len(pmcids)} PMC IDs from {csv_file}")

    output_path = Path(output_dir)
    output_path.mkdir(exist_ok=True)

    results = []

    for i, pmcid in enumerate(pmcids):
        email = next_email()
        print(f"Processing {i+1}/{len(pmcids)}: PMC{pmcid}...")

        data = process_single_pmc(pmcid)
        if data is not None:
            # Add CSV metadata to the result
            csv_row = df.iloc[i].to_dict()
            data['csv_metadata'] = csv_row

            output_file = output_path / f"PMC{pmcid}.json"
            with output_file.open('w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)

            results.append(data)
            print(f"  Success: {data['title'][:40]}...")
        else:
            print(f"  Failed to process PMC{pmcid}")

    return results

# Create example CSV
csv_data = {
    'pmcid': [7114487, 3084273, 7690653],
    'category': ['cancer', 'ML', 'genomics'],
    'priority': ['high', 'medium', 'high']
}
pd.DataFrame(csv_data).to_csv('articles.csv', index=False)

# Process from CSV
results = process_from_csv('articles.csv')

Large Dataset Processing

Chunked Processing

import json
from pathlib import Path
from pmcgrab.application.processing import process_single_pmc
from pmcgrab.infrastructure.settings import next_email

def process_large_dataset(pmcids, output_dir="large_dataset", chunk_size=100):
    """Process very large datasets in chunks."""
    total_chunks = len(pmcids) // chunk_size + (1 if len(pmcids) % chunk_size else 0)

    base_path = Path(output_dir)
    base_path.mkdir(exist_ok=True)

    overall_stats = {'successful': 0, 'failed': 0}

    for i in range(0, len(pmcids), chunk_size):
        chunk = pmcids[i:i + chunk_size]
        chunk_num = i // chunk_size + 1

        print(f"\n=== Processing chunk {chunk_num}/{total_chunks} ({len(chunk)} articles) ===")

        # Create chunk-specific output directory
        chunk_dir = base_path / f"chunk_{chunk_num:03d}"
        chunk_dir.mkdir(exist_ok=True)

        chunk_stats = {'successful': 0, 'failed': 0, 'failed_ids': []}

        for pmcid in chunk:
            email = next_email()
            data = process_single_pmc(pmcid)

            if data is not None:
                output_file = chunk_dir / f"PMC{pmcid}.json"
                with output_file.open('w', encoding='utf-8') as f:
                    json.dump(data, f, indent=2, ensure_ascii=False)
                chunk_stats['successful'] += 1
                overall_stats['successful'] += 1
            else:
                chunk_stats['failed'] += 1
                chunk_stats['failed_ids'].append(pmcid)
                overall_stats['failed'] += 1

        # Save chunk summary
        summary_file = chunk_dir / "chunk_summary.json"
        with summary_file.open('w', encoding='utf-8') as f:
            json.dump(chunk_stats, f, indent=2)

        print(f"Chunk {chunk_num} complete: {chunk_stats['successful']} successful, {chunk_stats['failed']} failed")

    # Save overall summary
    overall_summary = {
        'total_articles': len(pmcids),
        'total_chunks': total_chunks,
        'chunk_size': chunk_size,
        **overall_stats,
        'success_rate': overall_stats['successful'] / len(pmcids) * 100
    }

    summary_file = base_path / "overall_summary.json"
    with summary_file.open('w', encoding='utf-8') as f:
        json.dump(overall_summary, f, indent=2)

    print(f"\n=== Processing Complete ===")
    print(f"Total: {len(pmcids)} articles")
    print(f"Successful: {overall_stats['successful']}")
    print(f"Failed: {overall_stats['failed']}")
    print(f"Success rate: {overall_summary['success_rate']:.1f}%")

# Example: process 500 articles in chunks of 50
large_pmcid_list = [str(7000000 + i) for i in range(500)]  # Example IDs
process_large_dataset(large_pmcid_list, chunk_size=50)

Memory-Efficient Processing

import json
import gc
from pathlib import Path
from pmcgrab.application.processing import process_single_pmc
from pmcgrab.infrastructure.settings import next_email

def memory_efficient_processing(pmcids, output_dir="memory_efficient", batch_size=10):
    """Process articles with memory management."""
    output_path = Path(output_dir)
    output_path.mkdir(exist_ok=True)

    for i in range(0, len(pmcids), batch_size):
        batch = pmcids[i:i + batch_size]
        batch_num = i // batch_size + 1

        print(f"Processing batch {batch_num}: {len(batch)} articles")

        for pmcid in batch:
            email = next_email()
            data = process_single_pmc(pmcid)

            if data is not None:
                # Save immediately and clear from memory
                output_file = output_path / f"PMC{pmcid}.json"
                with output_file.open('w', encoding='utf-8') as f:
                    json.dump(data, f, indent=2, ensure_ascii=False)
                print(f"  Saved PMC{pmcid}")

                # Clear data from memory
                del data
            else:
                print(f"  Failed PMC{pmcid}")

        # Force garbage collection after each batch
        gc.collect()
        print(f"Batch {batch_num} complete, memory cleared")

# Usage
large_list = [str(7000000 + i) for i in range(100)]
memory_efficient_processing(large_list, batch_size=10)

Resumable Processing

Resume from Previous Run

import json
from pathlib import Path
from pmcgrab.application.processing import process_single_pmc
from pmcgrab.infrastructure.settings import next_email

def resumable_processing(pmcids, output_dir="resumable_output"):
    """Resume processing from where it left off."""
    output_path = Path(output_dir)
    output_path.mkdir(exist_ok=True)

    # Check what's already been processed
    processed_files = list(output_path.glob("PMC*.json"))
    processed_ids = [f.stem.replace('PMC', '') for f in processed_files]

    # Filter out already processed IDs
    remaining_ids = [pmcid for pmcid in pmcids if pmcid not in processed_ids]

    print(f"Found {len(processed_ids)} already processed articles")
    print(f"Remaining to process: {len(remaining_ids)}")

    if not remaining_ids:
        print("All articles already processed!")
        return

    # Process remaining articles
    for pmcid in remaining_ids:
        email = next_email()
        print(f"Processing PMC{pmcid}...")

        data = process_single_pmc(pmcid)
        if data is not None:
            output_file = output_path / f"PMC{pmcid}.json"
            with output_file.open('w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            print(f"  Saved PMC{pmcid}")
        else:
            print(f"  Failed PMC{pmcid}")

    print("Processing complete!")

# Usage - can be run multiple times safely
all_pmcids = ["7114487", "3084273", "7690653", "5707528", "7979870"]
resumable_processing(all_pmcids)

Command Line Batch Processing

For command-line batch processing, use the built-in CLI:

# From individual IDs
uv run python -m pmcgrab PMC7114487 PMC3084273 PMC7690653

# From file
echo -e "7114487\n3084273\n7690653" > pmcids.txt
uv run python -m pmcgrab --input-file pmcids.txt --output-dir batch_results/

# With custom settings
uv run python -m pmcgrab \
    --input-file pmcids.txt \
    --output-dir ./results \
    --workers 4 \
    --batch-size 10 \
    --max-retries 2 \
    --email researcher@university.edu

Best Practices

Production-Ready Processing

import json
import logging
from datetime import datetime
from pathlib import Path
from pmcgrab.application.processing import process_single_pmc
from pmcgrab.infrastructure.settings import next_email

def production_batch_processing(pmcids, output_dir="production_output"):
    """Production-ready batch processing with logging."""

    # Set up logging
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = f"pmcgrab_batch_{timestamp}.log"

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )

    logger = logging.getLogger(__name__)
    logger.info(f"Starting batch processing of {len(pmcids)} articles")

    # Create timestamped output directory
    output_path = Path(output_dir) / f"batch_{timestamp}"
    output_path.mkdir(parents=True, exist_ok=True)

    stats = {'successful': 0, 'failed': 0, 'failed_ids': []}

    try:
        for i, pmcid in enumerate(pmcids, 1):
            logger.info(f"Processing {i}/{len(pmcids)}: PMC{pmcid}")

            email = next_email()
            data = process_single_pmc(pmcid)

            if data is not None:
                output_file = output_path / f"PMC{pmcid}.json"
                with output_file.open('w', encoding='utf-8') as f:
                    json.dump(data, f, indent=2, ensure_ascii=False)

                stats['successful'] += 1
                logger.info(f"Success: {data['title'][:50]}...")
            else:
                stats['failed'] += 1
                stats['failed_ids'].append(pmcid)
                logger.warning(f"Failed to process PMC{pmcid}")

        # Save final summary
        summary_file = output_path / "processing_summary.json"
        with summary_file.open('w', encoding='utf-8') as f:
            json.dump(stats, f, indent=2)

        success_rate = stats['successful'] / len(pmcids) * 100
        logger.info(f"Batch processing completed: {stats['successful']}/{len(pmcids)} successful ({success_rate:.1f}%)")
        logger.info(f"Output directory: {output_path}")
        logger.info(f"Log file: {log_file}")

    except Exception as e:
        logger.error(f"Batch processing failed: {e}")
        raise

    return stats

# Run production processing
pmcids = ["7114487", "3084273", "7690653", "5707528", "7979870"]
results = production_batch_processing(pmcids)

This approach provides robust, scalable batch processing while maintaining simplicity and reliability.