← All guides

Use FFmpeg API in Python

Process video files in Python without installing FFmpeg locally. Submit transcoding jobs, generate thumbnails, extract audio, and handle results with simple HTTP requests using the requests library.

Quick start: FFmpeg Micro provides a REST API for video processing. Send a request with your video URL and FFmpeg options, receive a job ID, then poll for completion or handle webhook callbacks. Free tier includes 100 minutes.

Why use an API instead of installing FFmpeg in Python

Installing and managing FFmpeg in Python projects creates several problems:

  • System dependencies vary across environments (development vs production)
  • FFmpeg binaries need version management and security updates
  • Video processing blocks your application thread unless you build a queue system
  • Scaling requires worker management and infrastructure complexity
  • Error handling for corrupted files and edge cases requires custom logic

With FFmpeg Micro API, you skip the installation entirely. Submit jobs via HTTP, receive results asynchronously, and scale processing automatically without managing infrastructure.

Setup

Install dependencies

pip install requests python-dotenv

Get your API key

  1. Sign up at FFmpeg Micro (free tier: 100 minutes)
  2. Copy your API key from the dashboard
  3. Add it to your .env file:
FFMPEG_API_KEY=your_api_key_here

Submit a processing job

Use the requests library to submit a video processing job. This example transcodes a video to 720p:

import os
import requests
from dotenv import load_dotenv

# Load variables from the local .env file (see "Get your API key") into the environment
load_dotenv()

# os.getenv returns None when FFMPEG_API_KEY is unset; requests would then send "Bearer None"
API_KEY = os.getenv('FFMPEG_API_KEY')
# Root URL that every request in this guide is built on
BASE_URL = 'https://api.ffmpeg-micro.com/v1'

def submit_transcode_job(input_url, output_format='mp4', timeout=30):
    """Submit a video transcoding job to FFmpeg Micro API.

    The job asks for a 1280x720 output encoded with libx264
    (preset "fast", CRF 23).

    Args:
        input_url: URL of the source video, sent as the single input.
        output_format: Value sent as ``outputFormat``.
        timeout: Seconds to wait for the API before ``requests`` gives up.
            Without a timeout a stalled connection would block forever.

    Returns:
        The decoded JSON response body (the examples read ``id`` and
        ``status`` from it).

    Raises:
        requests.exceptions.HTTPError: The API answered with a 4xx/5xx status.
        requests.exceptions.Timeout: No response arrived within ``timeout``.
    """
    headers = {
        'Authorization': f'Bearer {API_KEY}',
        'Content-Type': 'application/json'
    }

    payload = {
        'inputs': [{'url': input_url}],
        'outputFormat': output_format,
        'options': [
            {'option': '-s', 'argument': '1280x720'},
            {'option': '-c:v', 'argument': 'libx264'},
            {'option': '-preset', 'argument': 'fast'},
            {'option': '-crf', 'argument': '23'}
        ]
    }

    response = requests.post(
        f'{BASE_URL}/transcodes',
        headers=headers,
        json=payload,
        timeout=timeout
    )
    # Turn 4xx/5xx answers into HTTPError so callers never parse an error body as a job
    response.raise_for_status()

    return response.json()

# Example usage: submit one job, then print the id and status fields of the response
job = submit_transcode_job('https://example.com/video.mp4')
print(f"Job submitted: {job['id']}")
print(f"Status: {job['status']}")

The API returns a job ID and initial status. Processing happens asynchronously.

Retrieve output

Option 1: Poll for completion

Check job status until processing completes:

import time

def wait_for_completion(job_id, max_wait=300, interval=5, request_timeout=30):
    """Poll job status until completed or max_wait seconds elapsed.

    Args:
        job_id: Identifier of the job to poll.
        max_wait: Maximum number of seconds to keep polling.
        interval: Seconds to sleep between two status requests.
        request_timeout: Seconds to wait for each individual status request,
            so a stalled connection cannot outlive ``max_wait`` indefinitely.

    Returns:
        The decoded job JSON once its ``status`` is ``'completed'``.

    Raises:
        RuntimeError: The job reported the status ``'failed'``.
        TimeoutError: The job did not complete within ``max_wait`` seconds.
        requests.exceptions.HTTPError: A status request returned 4xx/5xx.
    """
    headers = {'Authorization': f'Bearer {API_KEY}'}
    # monotonic() is unaffected by wall-clock adjustments, unlike time.time()
    deadline = time.monotonic() + max_wait

    while time.monotonic() < deadline:
        response = requests.get(
            f'{BASE_URL}/transcodes/{job_id}',
            headers=headers,
            timeout=request_timeout
        )
        response.raise_for_status()
        job = response.json()

        status = job['status']

        if status == 'completed':
            return job
        if status == 'failed':
            raise RuntimeError(f"Job failed: {job.get('error', 'Unknown error')}")

        print(f"Status: {status}... waiting {interval}s")
        time.sleep(interval)

    raise TimeoutError(f"Job did not complete within {max_wait} seconds")

# Example usage: submit a job and block until it has finished
job = submit_transcode_job('https://example.com/video.mp4')
completed_job = wait_for_completion(job['id'])

# Get download URL
download_response = requests.get(
    f"{BASE_URL}/transcodes/{completed_job['id']}/download",
    headers={'Authorization': f'Bearer {API_KEY}'},
    timeout=30
)
# Surface 4xx/5xx as HTTPError instead of a KeyError on the error body below
download_response.raise_for_status()
download_url = download_response.json()['url']
print(f"Download URL: {download_url}")

Option 2: Use webhooks (recommended)

Instead of polling, configure a webhook to receive completion notifications:

def submit_with_webhook(input_url, webhook_url, timeout=30):
    """Submit job with webhook callback on completion.

    Args:
        input_url: URL of the source video, sent as the single input.
        webhook_url: Endpoint sent as ``webhookUrl`` for the callback.
        timeout: Seconds to wait for the API before ``requests`` gives up.
            Without a timeout a stalled connection would block forever.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.exceptions.HTTPError: The API answered with a 4xx/5xx status.
        requests.exceptions.Timeout: No response arrived within ``timeout``.
    """
    headers = {
        'Authorization': f'Bearer {API_KEY}',
        'Content-Type': 'application/json'
    }

    payload = {
        'inputs': [{'url': input_url}],
        'outputFormat': 'mp4',
        'webhookUrl': webhook_url,  # Your endpoint
        'options': [
            {'option': '-s', 'argument': '1280x720'},
            {'option': '-c:v', 'argument': 'libx264'}
        ]
    }

    response = requests.post(
        f'{BASE_URL}/transcodes',
        headers=headers,
        json=payload,
        timeout=timeout
    )
    # Turn 4xx/5xx answers into HTTPError before the body is parsed
    response.raise_for_status()

    return response.json()

# Example usage: the second argument is the endpoint sent as 'webhookUrl'
job = submit_with_webhook(
    'https://example.com/video.mp4',
    'https://yourapp.com/webhooks/ffmpeg'
)

Flask webhook handler

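Receive and process webhook callbacks in Flask. The snippet reuses the os import from the setup code and reads the signing secret from FFMPEG_WEBHOOK_SECRET, so add that variable to your .env file: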

from flask import Flask, request, jsonify
import hmac
import hashlib

app = Flask(__name__)

@app.route('/webhooks/ffmpeg', methods=['POST'])
def handle_ffmpeg_webhook():
    """Handle completion webhook from FFmpeg Micro.

    The raw body is authenticated before it is parsed or acted on.

    Returns:
        A ``(json, status)`` pair: 401 when the signature is missing or
        invalid, 400 when the body is not a JSON object carrying ``id`` and
        ``status``, otherwise 200 with ``{'received': True}``.
    """
    # Verify webhook signature (recommended) before trusting any of the payload.
    # A missing header is rejected here so it never reaches the HMAC comparison.
    signature = request.headers.get('X-FFmpeg-Signature')
    if not signature or not verify_signature(request.data, signature):
        return jsonify({'error': 'Invalid signature'}), 401

    # silent=True yields None for a malformed body instead of aborting the request
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict) or 'id' not in payload or 'status' not in payload:
        return jsonify({'error': 'Invalid payload'}), 400

    job_id = payload['id']
    status = payload['status']

    if status == 'completed':
        output_url = payload.get('outputUrl')
        print(f"Job {job_id} completed: {output_url}")

        # Process the result (upload to S3, update database, etc.)
        process_completed_video(job_id, output_url)

    elif status == 'failed':
        error = payload.get('error', 'Unknown error')
        print(f"Job {job_id} failed: {error}")
        handle_failure(job_id, error)

    # Any other status is acknowledged without further action
    return jsonify({'received': True}), 200

def verify_signature(payload_bytes, signature):
    """Verify webhook signature using your webhook secret.

    Args:
        payload_bytes: Raw request body exactly as received.
        signature: Value of the signature header, or None when it is absent.

    Returns:
        True when ``signature`` equals the hex HMAC-SHA256 of
        ``payload_bytes`` keyed with FFMPEG_WEBHOOK_SECRET. False otherwise,
        including when the secret is not configured or no signature was sent.
    """
    webhook_secret = os.getenv('FFMPEG_WEBHOOK_SECRET')
    # Fail closed: without a secret or a signature nothing can be authenticated
    if not webhook_secret or not signature:
        return False

    expected = hmac.new(
        webhook_secret.encode(),
        payload_bytes,
        hashlib.sha256
    ).hexdigest()
    # Compare as bytes: compare_digest raises TypeError for non-ASCII str input
    return hmac.compare_digest(expected.encode(), signature.encode())

def process_completed_video(job_id, output_url):
    """Placeholder hook for finished jobs; replace the body with your own logic."""
    return None

def handle_failure(job_id, error):
    """Placeholder hook for failed jobs; replace the body with your own logic."""
    return None

# Only start the app when this file is executed directly, not when it is imported
if __name__ == '__main__':
    app.run(port=5000)

FastAPI webhook handler

Or use FastAPI for async webhook processing:

from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
import hmac
import hashlib

app = FastAPI()

class WebhookPayload(BaseModel):
    """Request body model for the FFmpeg webhook endpoint.

    ``id`` and ``status`` are required; ``outputUrl`` and ``error`` are
    optional and default to None.
    """
    # Job identifier
    id: str
    # Job state; the handler acts on 'completed' and 'failed'
    status: str
    # Read by the handler when status is 'completed'
    outputUrl: str | None = None
    # Read by the handler when status is 'failed'
    error: str | None = None

@app.post('/webhooks/ffmpeg')
async def handle_ffmpeg_webhook(
    request: Request,
    payload: WebhookPayload
):
    """Receive an FFmpeg job webhook, authenticate it and dispatch on its status.

    Raises HTTPException(401) when the signature check fails. Completed jobs
    go to ``process_completed_video`` and failed jobs to ``handle_failure``;
    every accepted call is answered with ``{'received': True}``.
    """
    # The signature is computed over the raw body, so read the bytes as received
    header_value = request.headers.get('x-ffmpeg-signature')
    raw_body = await request.body()

    signature_ok = verify_signature(raw_body, header_value)
    if not signature_ok:
        raise HTTPException(status_code=401, detail='Invalid signature')

    job_status = payload.status

    if job_status == 'completed':
        print(f"Job {payload.id} completed: {payload.outputUrl}")
        # Hand the finished job to the async processing hook
        await process_completed_video(payload.id, payload.outputUrl)
    elif job_status == 'failed':
        print(f"Job {payload.id} failed: {payload.error}")
        await handle_failure(payload.id, payload.error)

    acknowledgement = {'received': True}
    return acknowledgement

def verify_signature(payload_bytes: bytes, signature: str) -> bool:
    """Verify webhook signature."""
    webhook_secret = os.getenv('FFMPEG_WEBHOOK_SECRET')
    expected = hmac.new(
        webhook_secret.encode(),
        payload_bytes,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature or '')

async def process_completed_video(job_id: str, output_url: str):
    """Placeholder coroutine for finished jobs; replace the body with your own logic."""
    return None

async def handle_failure(job_id: str, error: str):
    """Placeholder coroutine for failed jobs; replace the body with your own logic."""
    return None

Batch processing example

Submit multiple transcoding jobs in parallel:

import concurrent.futures

def process_video_batch(video_urls, max_workers=5):
    """Submit one transcoding job per URL using a pool of worker threads.

    Returns a list with one dict per input URL, in input order. A successful
    submission yields ``url``, ``job_id`` and ``status='submitted'``; any
    exception during submission yields ``url``, ``error`` and
    ``status='failed'`` instead of propagating.
    """

    def _submit_one(source_url):
        # A failing URL is recorded rather than allowed to abort the whole batch
        try:
            job_id = submit_transcode_job(source_url)['id']
        except Exception as exc:
            return {'url': source_url, 'error': str(exc), 'status': 'failed'}
        return {'url': source_url, 'job_id': job_id, 'status': 'submitted'}

    pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
    with pool:
        # map() yields results in the order of video_urls
        return list(pool.map(_submit_one, video_urls))

# Example: submit three sample videos and report the outcome of each
video_urls = [
    'https://example.com/video1.mp4',
    'https://example.com/video2.mp4',
    'https://example.com/video3.mp4',
]

results = process_video_batch(video_urls)

for result in results:
    # Report failures first; everything else was submitted
    if result['status'] != 'submitted':
        print(f"Failed {result['url']}: {result['error']}")
        continue
    print(f"Submitted {result['url']}: Job ID {result['job_id']}")

Use webhooks for batch jobs instead of polling each job individually. This scales better for large batches.

Error handling

Handle common errors and edge cases:

import requests
from requests.exceptions import RequestException, HTTPError, Timeout

def submit_with_retry(input_url, max_retries=3):
    """Submit job with automatic retry on transient errors.

    Timeouts, other network errors and 5xx responses are retried, sleeping
    ``2 ** attempt`` seconds between attempts. 401, 402 and 422 responses are
    translated into descriptive exceptions and never retried.

    Args:
        input_url: URL of the source video.
        max_retries: Total number of attempts to make; must be at least 1.

    Returns:
        The job returned by ``submit_transcode_job``.

    Raises:
        ValueError: ``max_retries`` is smaller than 1.
        Exception: The API answered 401, 402 or 422; the original HTTPError
            is chained as ``__cause__``.
        requests.exceptions.RequestException: The last attempt failed with a
            timeout, network error or 5xx response, or the API answered with
            another non-retryable status.
    """
    # Zero attempts used to fall through the loop and return None silently
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            return submit_transcode_job(input_url)

        except Timeout:
            print(f"Attempt {attempt + 1}: Request timeout")
            if is_last_attempt:
                raise

        except HTTPError as e:
            status_code = e.response.status_code
            if status_code == 401:
                raise Exception("Invalid API key") from e
            if status_code == 402:
                raise Exception("Insufficient credits") from e
            if status_code == 422:
                try:
                    error_detail = e.response.json()
                except ValueError:
                    # The error body is not JSON; report the raw text instead
                    error_detail = e.response.text
                raise Exception(f"Invalid request: {error_detail}") from e
            if status_code < 500:
                # Other client errors will not succeed on a retry
                raise
            print(f"Attempt {attempt + 1}: Server error")
            if is_last_attempt:
                raise

        except RequestException as e:
            print(f"Attempt {attempt + 1}: Network error: {e}")
            if is_last_attempt:
                raise

        # Only reached after a retryable failure
        time.sleep(2 ** attempt)  # Exponential backoff

# Example with error handling: any exception raised while submitting or while
# reading job['id'] is caught and reported instead of crashing the script
try:
    job = submit_with_retry('https://example.com/video.mp4')
    print(f"Job submitted successfully: {job['id']}")
except Exception as e:
    print(f"Failed to submit job: {e}")

Common errors:

  • 401 Unauthorized: Invalid or missing API key
  • 402 Payment Required: No remaining credits
  • 422 Unprocessable Entity: Invalid FFmpeg options or input URL
  • 500 Server Error (and other 5xx responses): Temporary server issue (retry with backoff)