File Upload Handling Overview
Build secure and robust file upload systems with validation, sanitization, virus scanning, efficient storage management, CDN integration, and proper file serving mechanisms across different backend frameworks.
When to Use
- Implementing file upload features
- Managing user-uploaded documents
- Storing and serving media files
- Implementing profile picture uploads
- Building document management systems
- Handling bulk file imports

Instructions
1. Python/Flask File Upload
config.py
import os
class Config:
    """Upload settings read by the Flask application."""

    # Size cap for uploads, in bytes.
    MAX_CONTENT_LENGTH = 50 * 1024 * 1024  # 50 MB
    # Directory name (relative to this module) that holds uploaded files.
    UPLOAD_FOLDER = 'uploads'
    # Lower-case extensions, without the leading dot, that may be uploaded.
    ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'docx', 'doc'}
    # Absolute path of the upload folder. abspath() guards against
    # __file__ being a bare relative name, whose dirname would be ''.
    UPLOAD_DIRECTORY = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), UPLOAD_FOLDER
    )
file_service.py
import os import mimetypes import hashlib import secrets from werkzeug.utils import secure_filename from datetime import datetime import magic import aiofiles
class FileUploadService:
    """Validate, store, and delete user uploads on the local filesystem.

    Files are written to ``<upload_dir>/<user_id>/<random id>.<ext>``; the
    client-supplied name is only used to derive the extension.
    """

    # Extension -> MIME types accepted for it by ``_is_valid_mime``.
    # NOTE(review): confirm these values against what the installed libmagic
    # actually reports, especially for the Office formats.
    _MIME_BY_EXTENSION = {
        'txt': ['text/plain'],
        'pdf': ['application/pdf'],
        'png': ['image/png'],
        'jpg': ['image/jpeg'],
        'jpeg': ['image/jpeg'],
        'gif': ['image/gif'],
        'doc': ['application/msword'],
        'docx': ['application/vnd.openxmlformats-officedocument.wordprocessingml.document'],
    }

    def __init__(self, upload_dir, allowed_extensions, max_size=50 * 1024 * 1024):
        """Create the service.

        Args:
            upload_dir: Root directory under which per-user folders live.
            allowed_extensions: Lower-case extensions without the dot.
            max_size: Largest accepted file size in bytes (default 50 MB).
        """
        self.upload_dir = upload_dir
        self.allowed_extensions = allowed_extensions
        self.max_size = max_size
        self.mime = magic.Magic(mime=True)

    @staticmethod
    def _extension_of(filename):
        """Return the lower-cased text after the last dot, or '' if none."""
        return filename.rsplit('.', 1)[1].lower() if '.' in filename else ''

    def validate_file(self, file):
        """Validate an uploaded file.

        Args:
            file: Upload object exposing ``filename``, ``seek``, ``tell``
                and ``read``.

        Returns:
            A list of error messages; empty when the file is acceptable.
            The read position is left at the start of the file.
        """
        # Without a name there is no extension to check, and the filename
        # cannot be sanitized, so stop here.
        if not file.filename:
            return ['No filename provided']

        errors = []

        # Check file extension
        ext = self._extension_of(secure_filename(file.filename))
        if ext not in self.allowed_extensions:
            errors.append(f'File type not allowed. Allowed: {", ".join(self.allowed_extensions)}')

        # Check file size by seeking to the end instead of reading the data.
        file.seek(0, os.SEEK_END)
        file_size = file.tell()
        file.seek(0)
        if file_size > self.max_size:
            errors.append(f'File too large. Max size: {self.max_size / 1024 / 1024}MB')

        # Check MIME type from the leading bytes, not the client's claim.
        file_content = file.read(8192)
        file.seek(0)
        detected_mime = self.mime.from_buffer(file_content)
        if not self._is_valid_mime(detected_mime, ext):
            errors.append('File MIME type does not match extension')

        return errors

    def _is_valid_mime(self, mime_type, ext):
        """Return True when ``mime_type`` is one accepted for ``ext``."""
        return mime_type in self._MIME_BY_EXTENSION.get(ext, ())

    def save_file(self, file, user_id):
        """Validate and store an upload under the user's directory.

        Args:
            file: Upload object; must also provide ``save(path)``.
            user_id: Owner identifier; converted to ``str`` for the path.

        Returns:
            ``{'success': True, 'file': {...}}`` on success,
            ``{'success': False, 'errors': [...]}`` when validation fails, or
            ``{'success': False, 'error': '...'}`` when storing fails.
        """
        errors = self.validate_file(file)
        if errors:
            return {'success': False, 'errors': errors}

        # The stored name is random, so it reveals nothing about the upload
        # and cannot collide with or overwrite another file.
        file_hash = secrets.token_hex(32)
        filename = secure_filename(file.filename)
        safe_filename = f"{file_hash}.{self._extension_of(filename)}"

        # os.path.join() rejects non-string parts, and ids are often ints.
        user_upload_dir = os.path.join(self.upload_dir, str(user_id))
        filepath = os.path.join(user_upload_dir, safe_filename)

        try:
            os.makedirs(user_upload_dir, exist_ok=True)
            file.save(filepath)
            file_info = {
                'id': file_hash,
                'original_name': filename,
                'safe_name': safe_filename,
                'size': os.path.getsize(filepath),
                'user_id': user_id,
                'uploaded_at': datetime.utcnow().isoformat(),
                'mime_type': self.mime.from_file(filepath),
            }
        except Exception as e:
            return {'success': False, 'error': str(e)}
        return {'success': True, 'file': file_info}

    def delete_file(self, user_id, file_id):
        """Delete every stored file whose name is ``<file_id>.<anything>``.

        Args:
            user_id: Owner identifier; selects the per-user directory.
            file_id: Identifier returned as ``id`` by ``save_file``.

        Returns:
            ``{'success': True}`` when at least one file was removed,
            otherwise ``{'success': False, 'error': '...'}``.
        """
        import glob

        not_found = {'success': False, 'error': 'File not found'}
        file_id = str(file_id)
        # An id is a bare name; anything with a path component is rejected
        # so it cannot reach outside the user's directory.
        if not file_id or os.path.basename(file_id) != file_id:
            return not_found

        # Escape both parts so only the trailing ".*" acts as a wildcard.
        user_dir = glob.escape(os.path.join(self.upload_dir, str(user_id)))
        matches = glob.glob(os.path.join(user_dir, f"{glob.escape(file_id)}.*"))
        if not matches:
            return not_found

        try:
            for match in matches:
                os.remove(match)
        except OSError as e:
            return {'success': False, 'error': str(e)}
        return {'success': True}
routes.py
from flask import request, jsonify, send_file, safe_join from functools import wraps import os
# Single service instance used by the routes, configured from the app.
file_service = FileUploadService(
    upload_dir=app.config['UPLOAD_DIRECTORY'],
    allowed_extensions=app.config['ALLOWED_EXTENSIONS'],
)
@app.route('/api/upload', methods=['POST'])
@token_required
def upload_file():
    """Store the uploaded 'file' part for the current user.

    Responds 201 with the stored file's details, or 400 with an error
    payload when the part is missing or the service rejects the file.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400

    outcome = file_service.save_file(request.files['file'], current_user.id)
    if not outcome['success']:
        # The service reports a list under 'errors' or one message under 'error'.
        return jsonify({'errors': outcome.get('errors') or [outcome['error']]}), 400

    # Save metadata to database
    saved = outcome['file']
    db.session.add(
        FileRecord(
            file_id=saved['id'],
            original_name=saved['original_name'],
            user_id=current_user.id,
            size=saved['size'],
            mime_type=saved['mime_type'],
        )
    )
    db.session.commit()
    return jsonify(saved), 201
# NOTE(review): the route path, decorators, signature and record lookup were
# truncated in the original text. They are reconstructed here from the
# upload route's conventions -- confirm against the real model/query API.
@app.route('/api/files/<file_id>', methods=['GET'])
@token_required
def download_file(file_id):
    """Send one of the current user's files as an attachment.

    Responds 404 when no matching record exists for this user or the file
    is not present on disk.
    """
    file_record = FileRecord.query.filter_by(
        file_id=file_id, user_id=current_user.id
    ).first()
    if not file_record:
        return jsonify({'error': 'File not found'}), 404

    # The file is stored with a lower-cased extension, so rebuild the
    # on-disk name the same way (a name without a dot gets '').
    original_name = file_record.original_name
    ext = original_name.rsplit('.', 1)[1].lower() if '.' in original_name else ''

    # Construct safe file path; path parts must be strings.
    filepath = safe_join(
        app.config['UPLOAD_DIRECTORY'],
        str(current_user.id),
        f"{file_record.file_id}.{ext}"
    )
    if filepath and os.path.exists(filepath):
        return send_file(
            filepath,
            mimetype=file_record.mime_type,
            as_attachment=True,
            download_name=file_record.original_name
        )
    return jsonify({'error': 'File not found'}), 404
# NOTE(review): the route path, decorators, signature and record lookup were
# truncated in the original text. They are reconstructed here from the
# upload route's conventions -- confirm against the real model/query API.
@app.route('/api/files/<file_id>', methods=['DELETE'])
@token_required
def delete_file(file_id):
    """Delete one of the current user's files and its database record.

    Responds 204 on success, 404 when the user has no such record, and 500
    when the service fails to remove the stored file.
    """
    file_record = FileRecord.query.filter_by(
        file_id=file_id, user_id=current_user.id
    ).first()
    if not file_record:
        return jsonify({'error': 'File not found'}), 404

    result = file_service.delete_file(current_user.id, file_id)
    if not result['success']:
        return jsonify({'error': result['error']}), 500

    # Drop the metadata only after the stored file has been removed.
    db.session.delete(file_record)
    db.session.commit()
    return '', 204
2. Node.js Express File Upload with Multer
// config.js
const multer = require('multer'); const path = require('path'); const crypto = require('crypto'); const fs = require('fs');
// Disk storage: one directory per user under ./uploads, random file names.
const storage = multer.diskStorage({
  destination: (req, file, cb) => {
    // path.join() throws on non-string segments, so coerce numeric ids.
    const uploadDir = path.join(__dirname, 'uploads', String(req.user.id));
    // Create the directory without blocking; a failure goes to multer
    // through the callback instead of being thrown inside it.
    fs.mkdir(uploadDir, { recursive: true }, (err) => cb(err, uploadDir));
  },
  filename: (req, file, cb) => {
    // The client's name contributes only its extension, lower-cased to
    // match the case-insensitive check done by the file filter.
    const hash = crypto.randomBytes(16).toString('hex');
    const ext = path.extname(file.originalname).toLowerCase();
    cb(null, hash + ext);
  }
});
// Accept a file only when its extension is allowed AND the MIME type the
// client declared is one expected for that extension.
const fileFilter = (req, file, cb) => {
  // Every allowed extension needs an entry here, otherwise files with that
  // extension can never pass the filter.
  const mimesByExt = new Map([
    ['.txt', ['text/plain']],
    ['.pdf', ['application/pdf']],
    ['.png', ['image/png']],
    ['.jpg', ['image/jpeg']],
    ['.jpeg', ['image/jpeg']],
    ['.gif', ['image/gif']],
    ['.doc', ['application/msword']],
    ['.docx', ['application/vnd.openxmlformats-officedocument.wordprocessingml.document']]
  ]);

  const ext = path.extname(file.originalname).toLowerCase();
  const expectedMimes = mimesByExt.get(ext);

  if (!expectedMimes || !expectedMimes.includes(file.mimetype)) {
    return cb(new Error('Invalid file type'));
  }

  cb(null, true);
};
// Largest upload multer will accept, in bytes.
const MAX_UPLOAD_BYTES = 50 * 1024 * 1024; // 50 MB

// Multer instance combining the disk storage, the type filter and the cap.
const upload = multer({
  storage,
  fileFilter,
  limits: { fileSize: MAX_UPLOAD_BYTES }
});

module.exports = upload;
// file-service.js const fs = require('fs').promises; const path = require('path'); const FileRecord = require('../models/FileRecord');
// Persists upload metadata and resolves stored files for their owners.
class FileService {
  // uploadRoot: directory that contains the per-user upload folders.
  // NOTE(review): the default is relative to THIS module. routes.js loads
  // the multer config from ../config/multer, which resolves 'uploads'
  // relative to its own directory -- confirm both point at the same place.
  constructor(uploadRoot = path.join(__dirname, 'uploads')) {
    this.uploadRoot = uploadRoot;
  }

  // Record the file multer stored on `req.file`; throws if there is none.
  async uploadFile(req) {
    if (!req.file) {
      throw new Error('No file provided');
    }

    const fileInfo = {
      // The id is the stored file name without its extension.
      id: path.basename(req.file.filename, path.extname(req.file.filename)),
      originalName: req.file.originalname,
      safeName: req.file.filename,
      size: req.file.size,
      mimeType: req.file.mimetype,
      userId: req.user.id,
      uploadedAt: new Date()
    };

    // Save to database
    return FileRecord.create(fileInfo);
  }

  // Look up a record owned by `userId`; throws 'File not found' otherwise.
  async _findOwnedRecord(fileId, userId) {
    const record = await FileRecord.findOne({
      where: { id: fileId, userId }
    });
    if (!record) {
      throw new Error('File not found');
    }
    return record;
  }

  // Absolute path of a record's stored file. path.join() throws on
  // non-string segments, so numeric ids are coerced.
  _pathFor(userId, record) {
    return path.join(this.uploadRoot, String(userId), record.safeName);
  }

  // Resolve the record and on-disk path of one of the user's files.
  async downloadFile(fileId, userId) {
    const record = await this._findOwnedRecord(fileId, userId);
    return { record, filepath: this._pathFor(userId, record) };
  }

  // Remove the stored file and then its record.
  async deleteFile(fileId, userId) {
    const record = await this._findOwnedRecord(fileId, userId);

    try {
      await fs.unlink(this._pathFor(userId, record));
    } catch (err) {
      // A file that is already gone must not block removing its record.
      if (err.code !== 'ENOENT') {
        throw err;
      }
    }

    await record.destroy();
    return { success: true };
  }

  // Page through a user's files, newest first.
  async listUserFiles(userId, limit = 20, offset = 0) {
    const { rows, count } = await FileRecord.findAndCountAll({
      where: { userId },
      limit,
      offset,
      order: [['uploadedAt', 'DESC']]
    });

    return { files: rows, total: count };
  }
}

module.exports = new FileService();
// routes.js const express = require('express'); const upload = require('../config/multer'); const fileService = require('../services/file-service'); const { authenticate } = require('../middleware/auth');
const router = express.Router();

// Largest page size a client may request from GET /files.
const MAX_PAGE_SIZE = 100;

// Parse a query-string value as a positive integer, else use `fallback`.
// Rejects NaN, zero and negatives, which would produce a bad offset/limit.
const toPositiveInt = (value, fallback) => {
  const parsed = Number.parseInt(value, 10);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
};

// Map an error thrown by the file service to an HTTP status. Only the two
// messages the service raises on purpose are client errors; anything else
// (database, filesystem) is a server error.
const statusFor = (error) => {
  if (error.message === 'File not found') return 404;
  if (error.message === 'No file provided') return 400;
  return 500;
};

// Upload a single file sent in the 'file' form field.
router.post('/upload', authenticate, upload.single('file'), async (req, res, next) => {
  try {
    const file = await fileService.uploadFile(req);
    res.status(201).json(file);
  } catch (error) {
    res.status(statusFor(error)).json({ error: error.message });
  }
});

// Download one of the caller's files under its original name.
router.get('/files/:fileId', authenticate, async (req, res, next) => {
  try {
    const { record, filepath } = await fileService.downloadFile(
      req.params.fileId,
      req.user.id
    );
    res.download(filepath, record.originalName);
  } catch (error) {
    res.status(statusFor(error)).json({ error: error.message });
  }
});

// Delete one of the caller's files.
router.delete('/files/:fileId', authenticate, async (req, res, next) => {
  try {
    await fileService.deleteFile(req.params.fileId, req.user.id);
    res.status(204).send();
  } catch (error) {
    res.status(statusFor(error)).json({ error: error.message });
  }
});

// List the caller's files, paginated via ?page= and ?limit=.
router.get('/files', authenticate, async (req, res, next) => {
  try {
    const page = toPositiveInt(req.query.page, 1);
    const limit = Math.min(toPositiveInt(req.query.limit, 20), MAX_PAGE_SIZE);
    const offset = (page - 1) * limit;

    const { files, total } = await fileService.listUserFiles(
      req.user.id,
      limit,
      offset
    );

    res.json({
      data: files,
      pagination: { page, limit, total, pages: Math.ceil(total / limit) }
    });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

module.exports = router;
3. FastAPI File Upload
main.py
from fastapi import FastAPI, UploadFile, File, Depends, HTTPException from fastapi.responses import FileResponse import aiofiles import os import hashlib import secrets from pathlib import Path
app = FastAPI()

# Upload limits.
MAX_FILE_SIZE = 50 * 1024 * 1024
ALLOWED_EXTENSIONS = {'.txt', '.pdf', '.png', '.jpg', '.jpeg', '.gif', '.docx', '.doc'}

# Root directory for stored uploads; created at import time if absent.
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
class FileUploadService:
    """Validate uploads and store them under ``UPLOAD_DIR/<user_id>/``."""

    # Number of bytes copied per read while storing a file.
    _CHUNK_SIZE = 1024 * 1024

    async def validate_file(self, file: UploadFile) -> list:
        """Validate uploaded file.

        Returns a list of error messages; empty when the file is acceptable.
        The read position is left at the start of the file.
        """
        errors = []

        # Check extension; a missing filename is treated as "no extension".
        file_ext = Path(file.filename or '').suffix.lower()
        if file_ext not in ALLOWED_EXTENSIONS:
            errors.append('File type not allowed')

        # Check file size by seeking on the underlying file object, so the
        # whole upload is never loaded into memory just to be measured.
        file.file.seek(0, os.SEEK_END)
        size = file.file.tell()
        file.file.seek(0)
        if size > MAX_FILE_SIZE:
            errors.append('File too large')

        return errors

    async def save_file(self, file: UploadFile, user_id: str):
        """Save uploaded file.

        Returns ``{'success': True, 'file': {...}}`` on success,
        ``{'success': False, 'errors': [...]}`` when validation fails, or
        ``{'success': False, 'error': '...'}`` when writing fails.
        """
        errors = await self.validate_file(file)
        if errors:
            return {'success': False, 'errors': errors}

        # Generate secure filename: random id plus the extension in the same
        # lower-cased form that was validated.
        file_hash = secrets.token_hex(32)
        file_ext = Path(file.filename).suffix.lower()
        safe_filename = f"{file_hash}{file_ext}"

        # Create user directory
        user_dir = UPLOAD_DIR / str(user_id)
        filepath = user_dir / safe_filename

        size = 0
        try:
            user_dir.mkdir(parents=True, exist_ok=True)
            # Copy in chunks so memory use does not grow with file size.
            async with aiofiles.open(filepath, 'wb') as f:
                while True:
                    chunk = await file.read(self._CHUNK_SIZE)
                    if not chunk:
                        break
                    await f.write(chunk)
                    size += len(chunk)
        except Exception as e:
            return {'success': False, 'error': str(e)}

        return {
            'success': True,
            'file': {
                'id': file_hash,
                'original_name': file.filename,
                'safe_name': safe_filename,
                'size': size,
                # NOTE(review): this is the type declared by the client,
                # not one detected from the content.
                'mime_type': file.content_type
            }
        }


file_service = FileUploadService()
@app.post("/api/upload")
async def upload_file(
    file: UploadFile = File(...),
    current_user: dict = Depends(get_current_user)
):
    """Upload file.

    Stores the file for the current user, records its metadata and returns
    the stored file's details. Raises a 400 HTTPException carrying the
    service's error messages when the file is rejected or cannot be saved.
    """
    result = await file_service.save_file(file, current_user['id'])

    if not result['success']:
        # The service reports a list under 'errors' or one message under
        # 'error'; always hand the client a non-empty list.
        raise HTTPException(
            status_code=400,
            detail=result.get('errors') or [result['error']]
        )

    # Save to database
    saved = result['file']
    file_record = FileRecord(
        file_id=saved['id'],
        original_name=saved['original_name'],
        # The download and delete routes locate the file through safe_name.
        safe_name=saved['safe_name'],
        user_id=current_user['id'],
        size=saved['size'],
        mime_type=saved['mime_type']
    )
    db.add(file_record)
    await db.commit()
    return saved
@app.get("/api/files/{file_id}")
async def download_file(
    file_id: str,
    current_user: dict = Depends(get_current_user)
):
    """Download file.

    Raises a 404 HTTPException when the current user has no such record or
    the stored file is missing from disk.
    """
    file_record = await db.query(FileRecord).filter(
        FileRecord.file_id == file_id,
        FileRecord.user_id == current_user['id']
    ).first()

    if not file_record:
        raise HTTPException(status_code=404, detail="File not found")

    filepath = UPLOAD_DIR / str(current_user['id']) / file_record.safe_name
    # A record whose file has vanished is reported as 404 here rather than
    # failing later while the response is being sent.
    if not filepath.is_file():
        raise HTTPException(status_code=404, detail="File not found")

    return FileResponse(
        path=filepath,
        media_type=file_record.mime_type,
        filename=file_record.original_name
    )
@app.delete("/api/files/{file_id}")
async def delete_file(
    file_id: str,
    current_user: dict = Depends(get_current_user)
):
    """Delete file.

    Removes the stored file and its record. Raises a 404 HTTPException when
    the current user has no such record and a 500 when removal fails.
    """
    file_record = await db.query(FileRecord).filter(
        FileRecord.file_id == file_id,
        FileRecord.user_id == current_user['id']
    ).first()

    if not file_record:
        raise HTTPException(status_code=404, detail="File not found")

    filepath = UPLOAD_DIR / str(current_user['id']) / file_record.safe_name
    try:
        try:
            filepath.unlink()
        except FileNotFoundError:
            # Already gone from disk; the record must still be removed or
            # it could never be deleted.
            pass
        await db.delete(file_record)
        await db.commit()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {'success': True}
4. S3/Cloud Storage Integration
s3_service.py
import boto3 from datetime import timedelta import os
class S3FileService:
    """Upload, sign and delete objects in a single S3 bucket.

    Each method returns ``{'success': True, ...}`` or
    ``{'success': False, 'error': '<message>'}`` instead of raising.
    """

    def __init__(self):
        """Build the client and bucket name from environment variables."""
        self.s3_client = boto3.client(
            's3',
            aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
            aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
            region_name=os.getenv('AWS_REGION')
        )
        self.bucket_name = os.getenv('S3_BUCKET_NAME')

    def upload_file(self, file, user_id, file_key):
        """Upload file to S3.

        Args:
            file: Readable file object exposing ``content_type``.
            user_id: Owner identifier, stored as object metadata.
            file_key: Object key to write.
        """
        try:
            self.s3_client.upload_fileobj(
                file,
                self.bucket_name,
                file_key,
                ExtraArgs={
                    # Fall back to a generic type when none was supplied.
                    'ContentType': file.content_type or 'application/octet-stream',
                    # Metadata values must be strings; ids are often ints.
                    'Metadata': {'user_id': str(user_id)}
                }
            )
            return {'success': True, 'key': file_key}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def get_signed_url(self, file_key, expires_in=3600):
        """Generate signed URL for download.

        Args:
            file_key: Object key to expose.
            expires_in: Value passed as ``ExpiresIn`` (default 3600).
        """
        try:
            url = self.s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': self.bucket_name, 'Key': file_key},
                ExpiresIn=expires_in
            )
            return {'success': True, 'url': url}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def delete_file(self, file_key):
        """Delete file from S3."""
        try:
            self.s3_client.delete_object(Bucket=self.bucket_name, Key=file_key)
            return {'success': True}
        except Exception as e:
            return {'success': False, 'error': str(e)}
Best Practices

✅ DO
- Validate file extensions and MIME types
- Check file size before processing
- Use secure filenames to prevent directory traversal
- Store files outside web root
- Implement virus scanning
- Use CDN for file delivery
- Generate signed URLs for direct access
- Log file upload/download events
- Implement access control checks
- Clean up temporary files

❌ DON'T
- Trust user-provided filenames
- Store files in web-accessible directories
- Allow arbitrary file types
- Skip virus scanning for uploaded files
- Expose absolute file paths
- Allow unlimited file sizes
- Ignore access control
- Use predictable file paths
- Store sensitive metadata in filenames
- Forget to validate file content

Complete Example
@app.post("/upload") async def upload(file: UploadFile = File(...)): if file.size > 10 * 1024 * 1024: raise HTTPException(status_code=413, detail="File too large")
allowed = ['.pdf', '.txt', '.jpg']
ext = Path(file.filename).suffix
if ext not in allowed:
raise HTTPException(status_code=400, detail="Invalid file type")
filename = f"{uuid4()}{ext}"
async with aiofiles.open(f"uploads/{filename}", 'wb') as f:
await f.write(await file.read())
return {"filename": filename}