Use FFmpeg API in Python
Process video files in Python without installing FFmpeg locally. Submit transcoding jobs, generate thumbnails, extract audio, and handle results with simple HTTP requests using the requests library.
Quick start: FFmpeg Micro provides a REST API for video processing. Send a request with your video URL and FFmpeg options, receive a job ID, then poll for completion or handle webhook callbacks. Free tier includes 100 minutes.
Why use an API instead of installing FFmpeg in Python
Installing and managing FFmpeg in Python projects creates several problems:
- System dependencies vary across environments (development vs production)
- FFmpeg binaries need version management and security updates
- Video processing blocks your application thread unless you build a queue system
- Scaling requires worker management and infrastructure complexity
- Error handling for corrupted files and edge cases requires custom logic
With FFmpeg Micro API, you skip the installation entirely. Submit jobs via HTTP, receive results asynchronously, and scale processing automatically without managing infrastructure.
Setup
Install dependencies
pip install requests python-dotenv

Get your API key
- Sign up at FFmpeg Micro (free tier: 100 minutes)
- Copy your API key from the dashboard
- Add it to your .env file:

FFMPEG_API_KEY=your_api_key_here

Submit a processing job
Use the requests library to submit a video processing job. This example transcodes a video to 720p:
import os
import requests
from dotenv import load_dotenv
load_dotenv()
API_KEY = os.getenv('FFMPEG_API_KEY')
BASE_URL = 'https://api.ffmpeg-micro.com/v1'
def submit_transcode_job(input_url, output_format='mp4'):
    """Submit a video transcoding job to FFmpeg Micro API.

    POSTs to the /transcodes endpoint requesting a 720p H.264 encode of
    the video at ``input_url`` and returns the decoded JSON job record
    (it contains at least ``id`` and ``status``). Raises
    ``requests.HTTPError`` on a non-2xx response.
    """
    # FFmpeg flags passed through to the service: scale to 1280x720,
    # encode H.264 with the 'fast' preset at CRF 23.
    ffmpeg_options = [
        {'option': '-s', 'argument': '1280x720'},
        {'option': '-c:v', 'argument': 'libx264'},
        {'option': '-preset', 'argument': 'fast'},
        {'option': '-crf', 'argument': '23'},
    ]
    body = {
        'inputs': [{'url': input_url}],
        'outputFormat': output_format,
        'options': ffmpeg_options,
    }
    auth_headers = {
        'Authorization': f'Bearer {API_KEY}',
        'Content-Type': 'application/json',
    }
    response = requests.post(
        f'{BASE_URL}/transcodes',
        headers=auth_headers,
        json=body,
    )
    response.raise_for_status()
    return response.json()
# Example usage
job = submit_transcode_job('https://example.com/video.mp4')
print(f"Job submitted: {job['id']}")
print(f"Status: {job['status']}")

The API returns a job ID and initial status. Processing happens asynchronously.
Retrieve output
Option 1: Poll for completion
Check job status until processing completes:
import time
def wait_for_completion(job_id, max_wait=300, interval=5):
    """Poll job status until completed or max_wait seconds elapsed.

    Returns the completed job record. Raises ``Exception`` when the job
    reports 'failed', ``TimeoutError`` when ``max_wait`` elapses, and
    ``requests.HTTPError`` on a non-2xx status response.
    """
    auth = {'Authorization': f'Bearer {API_KEY}'}
    deadline = time.time() + max_wait
    while time.time() < deadline:
        resp = requests.get(
            f'{BASE_URL}/transcodes/{job_id}',
            headers=auth,
        )
        resp.raise_for_status()
        job = resp.json()
        state = job['status']
        if state == 'completed':
            return job
        if state == 'failed':
            raise Exception(f"Job failed: {job.get('error', 'Unknown error')}")
        # Still in progress: report and back off before the next poll.
        print(f"Status: {state}... waiting {interval}s")
        time.sleep(interval)
    raise TimeoutError(f"Job did not complete within {max_wait} seconds")
# Example usage
job = submit_transcode_job('https://example.com/video.mp4')
completed_job = wait_for_completion(job['id'])
# Get download URL
download_response = requests.get(
f"{BASE_URL}/transcodes/{completed_job['id']}/download",
headers={'Authorization': f'Bearer {API_KEY}'}
)
download_url = download_response.json()['url']
print(f"Download URL: {download_url}")

Option 2: Use webhooks (recommended)
Instead of polling, configure a webhook to receive completion notifications:
def submit_with_webhook(input_url, webhook_url):
    """Submit job with webhook callback on completion.

    Same request as ``submit_transcode_job`` (720p H.264 MP4), but also
    registers ``webhook_url`` so the service POSTs the final job status
    to it instead of requiring the client to poll. Returns the decoded
    JSON job record; raises ``requests.HTTPError`` on non-2xx.
    """
    request_body = {
        'inputs': [{'url': input_url}],
        'outputFormat': 'mp4',
        'webhookUrl': webhook_url,  # Your endpoint
        'options': [
            {'option': '-s', 'argument': '1280x720'},
            {'option': '-c:v', 'argument': 'libx264'},
        ],
    }
    resp = requests.post(
        f'{BASE_URL}/transcodes',
        json=request_body,
        headers={
            'Authorization': f'Bearer {API_KEY}',
            'Content-Type': 'application/json',
        },
    )
    resp.raise_for_status()
    return resp.json()
job = submit_with_webhook(
'https://example.com/video.mp4',
'https://yourapp.com/webhooks/ffmpeg'
)

Flask webhook handler
Receive and process webhook callbacks in Flask:
from flask import Flask, request, jsonify
import hmac
import hashlib
app = Flask(__name__)
@app.route('/webhooks/ffmpeg', methods=['POST'])
def handle_ffmpeg_webhook():
    """Handle completion webhook from FFmpeg Micro.

    Verifies the HMAC signature header against the raw request body,
    then dispatches on the job status: 'completed' hands the output URL
    to process_completed_video, 'failed' hands the error to
    handle_failure. Returns 200 {'received': True} once handled, or
    401 when the signature check fails.
    """
    payload = request.json
    # Verify webhook signature (recommended)
    # NOTE(review): verification uses request.data (the raw bytes), not
    # the parsed JSON — re-serializing would not match the HMAC. The
    # header may be absent, in which case `signature` is None; confirm
    # verify_signature tolerates that.
    signature = request.headers.get('X-FFmpeg-Signature')
    if not verify_signature(request.data, signature):
        return jsonify({'error': 'Invalid signature'}), 401
    job_id = payload['id']
    status = payload['status']
    if status == 'completed':
        output_url = payload.get('outputUrl')
        print(f"Job {job_id} completed: {output_url}")
        # Process the result (upload to S3, update database, etc.)
        process_completed_video(job_id, output_url)
    elif status == 'failed':
        error = payload.get('error', 'Unknown error')
        print(f"Job {job_id} failed: {error}")
        handle_failure(job_id, error)
    # Acknowledge receipt so the service does not retry delivery.
    return jsonify({'received': True}), 200
def verify_signature(payload_bytes, signature):
    """Verify webhook signature using your webhook secret.

    Recomputes the HMAC-SHA256 hex digest of the raw request body with
    FFMPEG_WEBHOOK_SECRET and compares it to the signature header in
    constant time.

    Fix: the signature header may be missing, making ``signature`` None;
    ``hmac.compare_digest`` raises TypeError on None, which would turn a
    missing header into a 500 instead of a 401. Coerce None to '' so the
    check simply fails (this also matches the FastAPI handler's guard).
    """
    webhook_secret = os.getenv('FFMPEG_WEBHOOK_SECRET')
    expected = hmac.new(
        webhook_secret.encode(),
        payload_bytes,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature or '')
def process_completed_video(job_id, output_url):
    """Your custom logic for completed videos.

    Placeholder hook: e.g. download output_url, upload to storage,
    or mark the job done in your database.
    """
    pass

def handle_failure(job_id, error):
    """Your custom logic for failed jobs.

    Placeholder hook: e.g. log the error, alert an operator, or
    re-queue the source video.
    """
    pass
if __name__ == '__main__':
app.run(port=5000)

FastAPI webhook handler
Or use FastAPI for async webhook processing:
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
import hmac
import hashlib
app = FastAPI()
class WebhookPayload(BaseModel):
    """Schema of the webhook POST body delivered by FFmpeg Micro."""
    id: str                        # job identifier
    status: str                    # handler branches on 'completed' / 'failed'
    outputUrl: str | None = None   # set when status == 'completed'
    error: str | None = None       # set when status == 'failed'
@app.post('/webhooks/ffmpeg')
async def handle_ffmpeg_webhook(
    request: Request,
    payload: WebhookPayload
):
    """Handle FFmpeg job completion webhook.

    FastAPI validates the body into WebhookPayload; the raw bytes are
    read separately for HMAC verification (the digest is computed over
    the exact bytes sent, not re-serialized JSON). Returns
    {'received': True} on success, 401 when the signature is invalid.
    """
    # Verify signature. Header lookup is lowercase; HTTP header names
    # are case-insensitive, and Starlette normalizes them.
    signature = request.headers.get('x-ffmpeg-signature')
    body = await request.body()
    if not verify_signature(body, signature):
        raise HTTPException(status_code=401, detail='Invalid signature')
    if payload.status == 'completed':
        print(f"Job {payload.id} completed: {payload.outputUrl}")
        # Process result asynchronously
        await process_completed_video(payload.id, payload.outputUrl)
    elif payload.status == 'failed':
        print(f"Job {payload.id} failed: {payload.error}")
        await handle_failure(payload.id, payload.error)
    # Acknowledge receipt so the service does not retry delivery.
    return {'received': True}
def verify_signature(payload_bytes: bytes, signature: str) -> bool:
    """Verify webhook signature.

    Recomputes the HMAC-SHA256 hex digest of the raw body using
    FFMPEG_WEBHOOK_SECRET and compares it to the header value in
    constant time. A missing header (None) simply fails the check.
    """
    secret = os.getenv('FFMPEG_WEBHOOK_SECRET')
    digest = hmac.new(secret.encode(), payload_bytes, hashlib.sha256)
    computed = digest.hexdigest()
    return hmac.compare_digest(computed, signature or '')
async def process_completed_video(job_id: str, output_url: str):
    """Your async processing logic.

    Placeholder hook: e.g. persist output_url or notify downstream services.
    """
    pass

async def handle_failure(job_id: str, error: str):
    """Your async error handling logic.

    Placeholder hook: e.g. alerting or retry bookkeeping.
    """
    pass

Batch processing example
Process multiple videos in parallel:
import concurrent.futures
def process_video_batch(video_urls, max_workers=5):
    """Submit multiple transcoding jobs in parallel.

    Fans submit_transcode_job out across a thread pool and returns one
    result dict per URL, in input order: {'url', 'job_id', 'status':
    'submitted'} on success, {'url', 'error', 'status': 'failed'}
    otherwise.
    """
    def _submit(video_url):
        # Capture per-URL failures so one bad URL doesn't abort the batch.
        try:
            created = submit_transcode_job(video_url)
        except Exception as exc:
            return {'url': video_url, 'error': str(exc), 'status': 'failed'}
        return {'url': video_url, 'job_id': created['id'], 'status': 'submitted'}

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(_submit, video_urls))
# Example: process a batch of videos
video_urls = [
'https://example.com/video1.mp4',
'https://example.com/video2.mp4',
'https://example.com/video3.mp4',
]
results = process_video_batch(video_urls)
for result in results:
if result['status'] == 'submitted':
print(f"Submitted {result['url']}: Job ID {result['job_id']}")
else:
print(f"Failed {result['url']}: {result['error']}")

Use webhooks for batch jobs instead of polling each job individually. This scales better for large batches.
Error handling
Handle common errors and edge cases:
import requests
from requests.exceptions import RequestException, HTTPError, Timeout
def submit_with_retry(input_url, max_retries=3):
    """Submit job with automatic retry on network errors.

    Retries timeouts, 5xx responses, and generic network failures with
    exponential backoff (2**attempt seconds). Client errors fail fast:
    401 -> invalid key, 402 -> no credits, 422 -> bad request; any other
    4xx is re-raised as-is. Returns the job record on success.
    """
    for attempt in range(max_retries):
        last_attempt = attempt == max_retries - 1
        try:
            return submit_transcode_job(input_url)
        except Timeout:
            print(f"Attempt {attempt + 1}: Request timeout")
            if last_attempt:
                raise
            time.sleep(2 ** attempt)  # Exponential backoff
        except HTTPError as e:
            code = e.response.status_code
            if code == 401:
                raise Exception("Invalid API key")
            if code == 402:
                raise Exception("Insufficient credits")
            if code == 422:
                error_detail = e.response.json()
                raise Exception(f"Invalid request: {error_detail}")
            if code >= 500:
                # Transient server error: retry with backoff.
                print(f"Attempt {attempt + 1}: Server error")
                if last_attempt:
                    raise
                time.sleep(2 ** attempt)
            else:
                # Any other client error is not retryable.
                raise
        except RequestException as e:
            print(f"Attempt {attempt + 1}: Network error: {e}")
            if last_attempt:
                raise
            time.sleep(2 ** attempt)
# Example with error handling
try:
job = submit_with_retry('https://example.com/video.mp4')
print(f"Job submitted successfully: {job['id']}")
except Exception as e:
print(f"Failed to submit job: {e}")

Common errors:
- 401 Unauthorized: Invalid or missing API key
- 402 Payment Required: No remaining credits
- 422 Unprocessable Entity: Invalid FFmpeg options or input URL
- 500 Server Error: Temporary server issue (retry with backoff)