Skip to main content
This cookbook walks you through the complete workflow of extracting structured data from documents using the Raydocs API. By the end, you’ll have working code that:
  1. Creates a workspace
  2. Defines an extraction template
  3. Uploads documents with automatic extraction
  4. Polls for completion and retrieves results

Prerequisites

  • A Raydocs account with API access
  • An API token with workspaces-write, templates-write, and sessions-write abilities
  • Documents to process (PDF, PNG, JPG supported)
Generate your API token from the Raydocs Dashboard under Settings → API Keys.

Complete Example

Choose your language to see the full implementation:
import requests
import time
import os

class RaydocsClient:
    """Simple Raydocs API client for document extraction.

    Every API call is sent with a bearer token and JSON ``Content-Type`` /
    ``Accept`` headers. HTTP error statuses are raised as
    ``requests.HTTPError`` by :meth:`_request`.

    Args:
        api_key: API token placed in the ``Authorization`` header.
        base_url: Root URL of the API; a trailing slash is tolerated.
        timeout: Default timeout in seconds handed to ``requests`` for every
            call, so a stalled connection cannot block forever.
    """

    # File extension → MIME type used when requesting an upload URL.
    _CONTENT_TYPES = {
        'pdf': 'application/pdf',
        'png': 'image/png',
        'jpg': 'image/jpeg',
        'jpeg': 'image/jpeg',
    }
    _DEFAULT_CONTENT_TYPE = 'application/octet-stream'

    def __init__(self, api_key: str, base_url: str = "https://api.raydocs.com",
                 timeout: float = 30.0):
        # Endpoints all start with "/", so drop any trailing slash here to
        # avoid building "https://host//path" URLs.
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }

    def _request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Make an API request and return the decoded JSON body.

        Args:
            method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
            endpoint: Path appended to ``base_url`` (must start with ``/``).
            **kwargs: Passed through to ``requests.request``; an explicit
                ``timeout`` overrides the client default.

        Returns:
            The parsed JSON response, or ``{}`` when the body is empty.

        Raises:
            requests.HTTPError: If the response has an error status code.
        """
        kwargs.setdefault("timeout", self.timeout)
        url = f"{self.base_url}{endpoint}"
        response = requests.request(method, url, headers=self.headers, **kwargs)
        response.raise_for_status()
        return response.json() if response.content else {}

    # ─────────────────────────────────────────────────────────────
    # Workspaces
    # ─────────────────────────────────────────────────────────────

    def create_workspace(self, name: str, icon: str = "📊") -> dict:
        """Create a new workspace with the given name and icon."""
        return self._request("POST", "/workspaces/create", json={
            "name": name,
            "icon": icon
        })

    def list_workspaces(self) -> list:
        """List all accessible workspaces (the ``data`` array of the response)."""
        return self._request("GET", "/workspaces")["data"]

    # ─────────────────────────────────────────────────────────────
    # Templates
    # ─────────────────────────────────────────────────────────────

    def create_template(self, workspace_id: int, name: str, schema: dict,
                        description: str = None) -> dict:
        """Create an extraction template.

        Args:
            workspace_id: Workspace the template belongs to.
            name: Template name.
            schema: Extraction schema, sent as ``schema_json``.
            description: Optional description; omitted from the payload when
                empty or ``None``.
        """
        payload = {
            "workspace_id": workspace_id,
            "name": name,
            "schema_json": schema
        }
        if description:
            payload["description"] = description
        return self._request("POST", "/extractions/templates", json=payload)

    def list_templates(self, workspace_id: int) -> list:
        """List templates in a workspace (the ``data`` array of the response)."""
        return self._request("GET", f"/workspaces/{workspace_id}/extractions/templates")["data"]

    # ─────────────────────────────────────────────────────────────
    # File Upload
    # ─────────────────────────────────────────────────────────────

    @classmethod
    def _content_type_for(cls, filename: str) -> str:
        """Map a filename's extension to a MIME type.

        Only a real extension counts: a file literally named ``pdf`` has no
        extension and falls back to ``application/octet-stream``.
        """
        ext = os.path.splitext(filename)[1].lower().lstrip('.')
        return cls._CONTENT_TYPES.get(ext, cls._DEFAULT_CONTENT_TYPE)

    def get_upload_url(self, filename: str, content_type: str = "application/pdf") -> dict:
        """Get a signed URL for file upload.

        ``filename`` is accepted for call-site compatibility but is not part
        of the request body; only ``content_type`` and a fixed ``private``
        visibility are sent.
        """
        return self._request("POST", "/vapor/signed-storage-url", json={
            "content_type": content_type,
            "visibility": "private"
        })

    def upload_file(self, file_path: str) -> str:
        """Upload a file and return the storage key.

        Requests a signed URL, then PUTs the file body straight to that URL
        (without the API ``Authorization`` header).

        Raises:
            OSError: If ``file_path`` cannot be opened.
            requests.HTTPError: If either the signing call or the upload fails.
        """
        filename = os.path.basename(file_path)
        content_type = self._content_type_for(filename)

        # Get signed upload URL
        upload_data = self.get_upload_url(filename, content_type)

        # Tolerate a missing, null, or empty ``headers`` value in the signing
        # response; headers it does supply take precedence over ours.
        signed_headers = upload_data.get('headers') or {}
        put_headers = {'Content-Type': content_type, **signed_headers}

        # Upload file directly to the signed URL, streaming from disk
        with open(file_path, 'rb') as f:
            upload_response = requests.put(
                upload_data['url'],
                data=f,
                headers=put_headers,
                timeout=self.timeout,
            )
            upload_response.raise_for_status()

        # Return the key to reference the uploaded file
        return upload_data['key']

    # ─────────────────────────────────────────────────────────────
    # Batch Operations
    # ─────────────────────────────────────────────────────────────

    def batch_create_sessions(self, template_id: str, file_keys: list,
                               auto_extract: bool = True) -> list:
        """
        Create sessions from uploaded files with optional auto-extraction.

        When auto_extract=True, extraction starts immediately after document parsing.

        Returns:
            The created sessions. If the response is an object carrying a
            ``data`` key, that value is returned; otherwise the response is
            returned unchanged.
        """
        response = self._request(
            "POST",
            f"/extractions/templates/{template_id}/sessions/batch",
            json={
                "files": file_keys,
                "settings": {
                    "auto_extract": auto_extract
                }
            }
        )
        # NOTE(review): the response envelope for this endpoint is not shown
        # here — confirm whether it is a bare list or wrapped in "data".
        if isinstance(response, dict) and "data" in response:
            return response["data"]
        return response

    # ─────────────────────────────────────────────────────────────
    # Sessions & Results
    # ─────────────────────────────────────────────────────────────

    def get_session(self, session_id: str) -> dict:
        """Get session details including status."""
        return self._request("GET", f"/extractions/sessions/{session_id}")

    def get_results(self, session_id: str) -> list:
        """Get extraction results for a session (the ``data`` array)."""
        return self._request("GET", f"/extractions/sessions/{session_id}/results")["data"]

    def get_result(self, result_id: str) -> dict:
        """Get a specific extraction result."""
        return self._request("GET", f"/extractions/results/{result_id}")

    def wait_for_completion(self, session_id: str, timeout: int = 300,
                            poll_interval: int = 5) -> dict:
        """
        Poll session results until extraction completes or times out.

        Only the first result of the session is inspected. Each poll costs a
        single API request.

        Args:
            session_id: Session to watch.
            timeout: Maximum time to wait, in seconds.
            poll_interval: Pause between polls, in seconds.

        Returns:
            The full result fetched via ``get_result`` once its status is
            ``completed``.

        Raises:
            RuntimeError: If the result's status is ``failed``.
            TimeoutError: If no completed result appears within ``timeout``.
        """
        # Monotonic clock: immune to wall-clock adjustments while waiting.
        start = time.monotonic()

        while time.monotonic() - start < timeout:
            results = self.get_results(session_id)

            if results:
                result = results[0]
                status = result.get('status')

                if status == 'completed':
                    return self.get_result(result['id'])
                if status == 'failed':
                    raise RuntimeError(f"Extraction failed: {result.get('error')}")

            elapsed = time.monotonic() - start
            print(f"Status: Processing... ({int(elapsed)}s)")
            # Never sleep beyond the deadline.
            time.sleep(max(0.0, min(poll_interval, timeout - elapsed)))

        raise TimeoutError(f"Extraction did not complete within {timeout}s")


# ═══════════════════════════════════════════════════════════════════
# Example Usage
# ═══════════════════════════════════════════════════════════════════

def _build_invoice_schema() -> dict:
    """Return the invoice extraction schema used by the example template."""
    return {
        "config": {
            "reasoning_enabled": True,
            "system_message": "Extract invoice data accurately. Return null for missing fields."
        },
        "groups": {
            "invoice_header": {
                "search_query": "invoice number, date, due date, total amount",
                "fields": {
                    "invoice_number": {
                        "type": "string",
                        "extraction_prompt": "Extract the invoice number or ID"
                    },
                    "invoice_date": {
                        "type": "string",
                        "extraction_prompt": "Extract the invoice date in YYYY-MM-DD format"
                    },
                    "due_date": {
                        "type": "string",
                        "extraction_prompt": "Extract the payment due date in YYYY-MM-DD format"
                    },
                    "total_amount": {
                        "type": "number",
                        "extraction_prompt": "Extract the total amount due as a number"
                    },
                    "currency": {
                        "type": "string",
                        "extraction_prompt": "Extract the currency code (USD, EUR, etc.)"
                    }
                }
            },
            "vendor_info": {
                "search_query": "vendor, supplier, company name, address, contact",
                "fields": {
                    "vendor_name": {
                        "type": "string",
                        "extraction_prompt": "Extract the vendor/supplier company name"
                    },
                    "vendor_address": {
                        "type": "string",
                        "extraction_prompt": "Extract the vendor's full address"
                    }
                }
            },
            "line_items": {
                "search_query": "items, products, services, quantity, price",
                "fields": {
                    "items": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "description": { "type": "string" },
                                "quantity": { "type": "number" },
                                "unit_price": { "type": "number" },
                                "total": { "type": "number" }
                            }
                        },
                        "extraction_prompt": "Extract all line items with description, quantity, unit price, and line total"
                    }
                }
            }
        }
    }


def _print_result(result: dict) -> None:
    """Print the headline fields of one completed extraction result.

    The schema asks the extractor to return null for missing values, so each
    group (and the line-item list) is treated as empty when it is null.
    """
    data = result['data']
    header = data.get('invoice_header') or {}
    vendor = data.get('vendor_info') or {}
    items = (data.get('line_items') or {}).get('items') or []

    print("✅ Extraction complete!")
    print(f"   Invoice #: {header.get('invoice_number')}")
    print(f"   Total: {header.get('total_amount')}")
    print(f"   Vendor: {vendor.get('vendor_name')}")
    print(f"   Line items: {len(items)}")


def main():
    """Run the end-to-end example: workspace → template → upload → results.

    The API token is read from the ``RAYDOCS_API_KEY`` environment variable
    when it is set; otherwise the placeholder string is used, so the token
    does not have to be pasted into the source file.
    """
    # Initialize client
    client = RaydocsClient(
        api_key=os.environ.get("RAYDOCS_API_KEY", "your_api_token_here")
    )

    # ─────────────────────────────────────────────────────────────
    # Step 1: Create or get a workspace
    # ─────────────────────────────────────────────────────────────
    workspaces = client.list_workspaces()

    if workspaces:
        workspace = workspaces[0]
        print(f"Using existing workspace: {workspace['name']}")
    else:
        workspace = client.create_workspace("My Extraction Project", "📄")
        print(f"Created workspace: {workspace['name']}")

    workspace_id = workspace['id']

    # ─────────────────────────────────────────────────────────────
    # Step 2: Create an extraction template
    # ─────────────────────────────────────────────────────────────
    template = client.create_template(
        workspace_id=workspace_id,
        name="Invoice Extractor",
        description="Extracts key data from invoices",
        schema=_build_invoice_schema()
    )
    print(f"Created template: {template['name']} ({template['id']})")

    # ─────────────────────────────────────────────────────────────
    # Step 3: Upload documents and create sessions with auto-extract
    # ─────────────────────────────────────────────────────────────
    documents = ["invoice1.pdf", "invoice2.pdf"]
    file_keys = []

    for doc_path in documents:
        print(f"Uploading: {doc_path}")
        key = client.upload_file(doc_path)
        file_keys.append(key)
        print(f"  → Uploaded as: {key}")

    # Batch create sessions with auto_extract enabled
    sessions = client.batch_create_sessions(
        template_id=template['id'],
        file_keys=file_keys,
        auto_extract=True  # Extraction starts automatically!
    )
    print(f"Created {len(sessions)} sessions with auto-extraction enabled")

    # ─────────────────────────────────────────────────────────────
    # Step 4: Wait for results and retrieve extracted data
    # ─────────────────────────────────────────────────────────────
    for session in sessions:
        print(f"\nWaiting for session {session['id']}...")

        try:
            result = client.wait_for_completion(session['id'], timeout=300)
            _print_result(result)
        except TimeoutError:
            print(f"⏱️ Timeout waiting for session {session['id']}")
        except Exception as e:
            # Example-script boundary: report the failure and move on to the
            # next session instead of aborting the whole run.
            print(f"❌ Error: {e}")


# Run the example workflow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

Step-by-Step Breakdown

1. Create a Workspace

1

Check existing workspaces

First, list your existing workspaces to see if you already have one to use.
GET /workspaces HTTP/1.1
Host: api.raydocs.com
Authorization: Bearer <token>
2

Create new workspace (if needed)

If you need a new workspace, create one:
POST /workspaces/create HTTP/1.1
Host: api.raydocs.com
Authorization: Bearer <token>
Content-Type: application/json

{
  "name": "Invoice Processing",
  "icon": "📄"
}

2. Create an Extraction Template

Define what data you want to extract using a JSON schema:
POST /extractions/templates HTTP/1.1
Host: api.raydocs.com
Authorization: Bearer <token>
Content-Type: application/json

{
  "workspace_id": 1,
  "name": "Invoice Extractor",
  "schema_json": {
    "config": {
      "reasoning_enabled": true
    },
    "groups": {
      "invoice_header": {
        "search_query": "invoice number, date, total",
        "fields": {
          "invoice_number": { "type": "string" },
          "total_amount": { "type": "number" }
        }
      }
    }
  }
}
See the Extraction Schema Guide for detailed schema documentation.

3. Upload Documents with Auto-Extract

The upload process uses signed URLs for secure, direct-to-storage uploads:
1

Get a signed upload URL

POST /vapor/signed-storage-url HTTP/1.1
Host: api.raydocs.com
Authorization: Bearer <token>
Content-Type: application/json

{
  "content_type": "application/pdf",
  "visibility": "private"
}
Response includes the upload URL and file key:
{
  "url": "https://s3.amazonaws.com/bucket/tmp/abc123...",
  "key": "tmp/abc123-invoice.pdf",
  "headers": {}
}
2

Upload file to signed URL

Upload your file directly to the S3 URL (no Authorization header needed):
PUT https://s3.amazonaws.com/bucket/tmp/abc123... HTTP/1.1
Content-Type: application/pdf

<binary file data>
3

Batch create sessions with auto-extract

Use the file keys to create sessions. Enable auto_extract to start extraction immediately after document parsing:
POST /extractions/templates/{templateId}/sessions/batch HTTP/1.1
Host: api.raydocs.com
Authorization: Bearer <token>
Content-Type: application/json

{
  "files": ["tmp/abc123-invoice.pdf", "tmp/def456-invoice.pdf"],
  "settings": {
    "auto_extract": true
  }
}
When auto_extract is true, extraction begins automatically once document parsing completes. You don’t need to call the execute endpoint separately.

4. Poll for Results

Monitor extraction progress and retrieve results:
1

Check extraction status

Poll the session results endpoint:
GET /extractions/sessions/{sessionId}/results HTTP/1.1
Host: api.raydocs.com
Authorization: Bearer <token>
Response shows the current status:
{
  "data": [{
    "id": "result-uuid",
    "status": "processing",
    "created_at": "2024-01-15T10:30:00Z"
  }]
}
Status values: created → processing → completed (or failed)
2

Retrieve completed result

Once status is completed, fetch the full result:
GET /extractions/results/{resultId} HTTP/1.1
Host: api.raydocs.com
Authorization: Bearer <token>
{
  "id": "result-uuid",
  "status": "completed",
  "data": {
    "invoice_header": {
      "invoice_number": "INV-2024-001",
      "total_amount": 1250.00
    }
  },
  "reasoning": {
    "invoice_header": {
      "invoice_number": {
        "reasoning": "Found 'Invoice #: INV-2024-001' on page 1...",
        "confidence": 0.95
      }
    }
  }
}

Best Practices

Always implement proper error handling:
  • 401 Unauthorized: Invalid or expired API token
  • 403 Forbidden: Token lacks required abilities
  • 422 Unprocessable: Validation errors (check errors field)
  • 429 Too Many Requests: Rate limit hit (100 req/min)
# Fetch a result, retrying when the rate limit (HTTP 429) is hit.
# `result` stays None if the token lacks permissions (HTTP 403); every other
# HTTP error, and a 429 on the final attempt, is re-raised.
result = None
max_attempts = 3
for attempt in range(max_attempts):
    try:
        result = client.get_result(result_id)
        break
    except requests.HTTPError as e:
        status_code = e.response.status_code
        if status_code == 429 and attempt < max_attempts - 1:
            time.sleep(60)  # Wait out the rate-limit window, then retry
        elif status_code == 403:
            print("Token lacks required permissions")
            break
        else:
            raise
Use exponential backoff for polling:
def poll_with_backoff(session_id, max_attempts=20):
    """Poll a session's results with a growing delay between attempts.

    The delay starts at 2 seconds and is multiplied by 1.5 after each
    attempt, capped at 30 seconds. Only the first result is inspected.

    Args:
        session_id: Session whose results are polled.
        max_attempts: Maximum number of polls before giving up.

    Returns:
        The first result entry once its status is ``completed``.

    Raises:
        RuntimeError: If the result's status is ``failed`` — further polling
            cannot succeed, so stop immediately.
        TimeoutError: If no completed result is seen within ``max_attempts``.
    """
    delay = 2
    for attempt in range(max_attempts):
        results = client.get_results(session_id)
        if results:
            first = results[0]
            if first['status'] == 'completed':
                return first
            if first['status'] == 'failed':
                raise RuntimeError(f"Extraction failed: {first.get('error')}")

        # No point sleeping after the last attempt; we are about to give up.
        if attempt < max_attempts - 1:
            time.sleep(delay)
            delay = min(delay * 1.5, 30)  # Cap at 30 seconds

    raise TimeoutError("Polling timed out")
For many documents, process in batches of 50 (API limit):
def process_documents(file_paths, template_id, batch_size=50):
    """Upload files and create extraction sessions in fixed-size batches.

    Each batch is uploaded file by file, then turned into sessions with a
    single ``batch_create_sessions`` call.

    Args:
        file_paths: Paths of the documents to process.
        template_id: Template the sessions are created under.
        batch_size: Number of files per batch request; must be at least 1.

    Returns:
        A list with the sessions from every batch, in order.

    Raises:
        ValueError: If ``batch_size`` is less than 1. Without this check a
            negative value would silently process nothing.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")

    all_sessions = []

    for start in range(0, len(file_paths), batch_size):
        batch = file_paths[start:start + batch_size]
        keys = [client.upload_file(path) for path in batch]
        all_sessions.extend(client.batch_create_sessions(template_id, keys))

    return all_sessions

Next Steps