Database Connections in Nexios: A Complete Guide with Tortoise ORM
Building modern web applications requires robust database integration. Nexios makes this seamless with its nexios-contrib-tortoise package, which integrates the powerful Tortoise ORM - an async-first ORM inspired by Django’s elegant API. In this comprehensive guide, we’ll explore everything you need to know about database connections in Nexios.
Before diving into implementation, let’s understand why Tortoise ORM is the perfect match for Nexios:
🚀 Async-First Design: Tortoise ORM is built from the ground up for async/await, perfectly matching Nexios’s async architecture for maximum performance.
🎯 Django-Inspired API: If you’ve used Django ORM, you’ll feel right at home. The familiar API makes it easy to define models and query data.
🔒 Type Safety: Full type hints support means better IDE autocomplete and fewer runtime errors.
⚡ High Performance: Built on asyncpg and aiomysql for blazing-fast database operations.
🛠️ Rich Features: Supports relations, transactions, migrations, and multiple database connections out of the box.
🔌 Seamless Integration: The nexios-contrib-tortoise package handles all the lifecycle management automatically.
Let’s start by installing the required packages:
# Install Nexios with Tortoise ORM supportpip install nexios nexios-contrib tortoise-orm
# For PostgreSQL (recommended for production)pip install asyncpg
# For MySQLpip install aiomysql
# For SQLite (development only)# No additional driver neededCreate your Nexios application with Tortoise ORM integration:
from nexios import NexiosAppfrom nexios_contrib.tortoise import init_tortoise
# Create Nexios applicationapp = NexiosApp( title="My API with Database", version="1.0.0")
# Initialize Tortoise ORMinit_tortoise( app, db_url="sqlite://db.sqlite3", modules={"models": ["app.models"]}, generate_schemas=True # Auto-create tables in development)
if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)That’s it! The init_tortoise function handles:
- Database connection on startup
- Schema generation (if enabled)
- Automatic cleanup on shutdown
- Exception handler registration
Tortoise ORM uses a Django-inspired model definition syntax. Let’s build a complete blog application:
from tortoise.models import Modelfrom tortoise import fieldsfrom typing import Optional
class User(Model): """User model for authentication and content ownership."""
id = fields.IntField(pk=True) username = fields.CharField(max_length=50, unique=True) email = fields.CharField(max_length=255, unique=True, index=True) password_hash = fields.CharField(max_length=255) full_name = fields.CharField(max_length=100, null=True) is_active = fields.BooleanField(default=True) is_admin = fields.BooleanField(default=False) created_at = fields.DatetimeField(auto_now_add=True) updated_at = fields.DatetimeField(auto_now=True)
class Meta: table = "users" ordering = ["-created_at"]
def __str__(self): return self.username
class Post(Model): """Blog post model."""
id = fields.IntField(pk=True) title = fields.CharField(max_length=200) slug = fields.CharField(max_length=200, unique=True, index=True) content = fields.TextField() excerpt = fields.CharField(max_length=500, null=True) published = fields.BooleanField(default=False) view_count = fields.IntField(default=0)
# Relationships author = fields.ForeignKeyField( "models.User", related_name="posts", on_delete=fields.CASCADE )
# Timestamps created_at = fields.DatetimeField(auto_now_add=True) updated_at = fields.DatetimeField(auto_now=True) published_at = fields.DatetimeField(null=True)
class Meta: table = "posts" ordering = ["-created_at"]
def __str__(self): return self.title
class Comment(Model): """Comment model for posts."""
id = fields.IntField(pk=True) content = fields.TextField()
# Relationships post = fields.ForeignKeyField( "models.Post", related_name="comments", on_delete=fields.CASCADE ) author = fields.ForeignKeyField( "models.User", related_name="comments", on_delete=fields.CASCADE ) parent = fields.ForeignKeyField( "models.Comment", related_name="replies", on_delete=fields.CASCADE, null=True )
# Timestamps created_at = fields.DatetimeField(auto_now_add=True) updated_at = fields.DatetimeField(auto_now=True)
class Meta: table = "comments" ordering = ["-created_at"]
class Tag(Model): """Tag model for categorizing posts."""
id = fields.IntField(pk=True) name = fields.CharField(max_length=50, unique=True) slug = fields.CharField(max_length=50, unique=True)
# Many-to-Many relationship posts = fields.ManyToManyField( "models.Post", related_name="tags", through="post_tags" )
class Meta: table = "tags"Now let’s implement API endpoints using our models:
from pydantic import BaseModel, EmailStr
class UserCreateSchema(BaseModel): username: str email: EmailStr password: str full_name: str | None = None
@app.post("/users")async def create_user(request, response): """Create a new user.""" data = await request.json()
# Validate with Pydantic user_data = UserCreateSchema(**data)
try: # Hash password before storing from passlib.hash import bcrypt password_hash = bcrypt.hash(user_data.password)
# Create user user = await User.create( username=user_data.username, email=user_data.email, password_hash=password_hash, full_name=user_data.full_name )
return response.json({ "id": user.id, "username": user.username, "email": user.email, "created_at": user.created_at.isoformat() }, status=201)
except Exception as e: # Tortoise IntegrityError automatically handled by nexios-contrib # Returns 400 with proper error message for duplicate username/email raise@app.get("/users")async def list_users(request, response): """List all users with pagination.""" # Get query parameters limit = int(request.query_params.get("limit", "10")) offset = int(request.query_params.get("offset", "0")) search = request.query_params.get("search", "")
# Build query query = User.filter(is_active=True)
if search: query = query.filter( username__icontains=search ) | query.filter( email__icontains=search )
# Get total count total = await query.count()
# Get paginated results users = await query.offset(offset).limit(limit).all()
return response.json({ "users": [ { "id": user.id, "username": user.username, "email": user.email, "full_name": user.full_name } for user in users ], "pagination": { "total": total, "limit": limit, "offset": offset, "has_next": offset + limit < total } })
@app.get("/users/{user_id:int}")async def get_user(request, response): """Get a single user by ID.""" user_id = request.path_params.user_id
try: # Fetch user with related posts user = await User.get(id=user_id).prefetch_related("posts")
return response.json({ "id": user.id, "username": user.username, "email": user.email, "full_name": user.full_name, "post_count": len(user.posts), "created_at": user.created_at.isoformat() })
except User.DoesNotExist: # Automatically returns 404 with proper error message raise@app.put("/users/{user_id:int}")async def update_user(request, response): """Update user information.""" user_id = request.path_params.user_id data = await request.json()
try: user = await User.get(id=user_id)
# Update fields if "full_name" in data: user.full_name = data["full_name"] if "email" in data: user.email = data["email"]
await user.save()
return response.json({ "id": user.id, "username": user.username, "email": user.email, "full_name": user.full_name, "updated_at": user.updated_at.isoformat() })
except User.DoesNotExist: raise@app.delete("/users/{user_id:int}")async def delete_user(request, response): """Soft delete a user.""" user_id = request.path_params.user_id
try: user = await User.get(id=user_id)
# Soft delete by marking inactive user.is_active = False await user.save()
return response.json({ "message": "User deactivated successfully" })
except User.DoesNotExist: raiseTortoise ORM makes working with relationships elegant and efficient:
@app.get("/posts/{post_id:int}")async def get_post_with_relations(request, response): """Get post with author, comments, and tags.""" post_id = request.path_params.post_id
try: # Prefetch all related data in one query post = await Post.get(id=post_id).prefetch_related( "author", "comments__author", # Nested prefetch "tags" )
return response.json({ "id": post.id, "title": post.title, "content": post.content, "author": { "id": post.author.id, "username": post.author.username }, "comments": [ { "id": comment.id, "content": comment.content, "author": { "id": comment.author.id, "username": comment.author.username } } for comment in post.comments ], "tags": [ {"id": tag.id, "name": tag.name} for tag in post.tags ] })
except Post.DoesNotExist: raise@app.post("/posts/{post_id:int}/comments")async def create_comment(request, response): """Add a comment to a post.""" post_id = request.path_params.post_id data = await request.json()
# Assume we have authenticated user user_id = request.state.user.id
try: post = await Post.get(id=post_id)
comment = await Comment.create( content=data["content"], post=post, author_id=user_id )
# Fetch author for response await comment.fetch_related("author")
return response.json({ "id": comment.id, "content": comment.content, "author": { "id": comment.author.id, "username": comment.author.username }, "created_at": comment.created_at.isoformat() }, status=201)
except Post.DoesNotExist: raise@app.post("/posts/{post_id:int}/tags")async def add_tags_to_post(request, response): """Add tags to a post.""" post_id = request.path_params.post_id data = await request.json() tag_names = data.get("tags", [])
try: post = await Post.get(id=post_id)
# Get or create tags tags = [] for tag_name in tag_names: tag, created = await Tag.get_or_create( name=tag_name, defaults={"slug": tag_name.lower().replace(" ", "-")} ) tags.append(tag)
# Add tags to post await post.tags.add(*tags)
# Fetch updated tags await post.fetch_related("tags")
return response.json({ "post_id": post.id, "tags": [ {"id": tag.id, "name": tag.name} for tag in post.tags ] })
except Post.DoesNotExist: raiseTortoise ORM provides powerful query capabilities:
from tortoise.expressions import Qfrom datetime import datetime, timedelta
@app.get("/posts/search")async def search_posts(request, response): """Advanced post search with multiple filters.""" # Get query parameters keyword = request.query_params.get("q", "") author_id = request.query_params.get("author_id") published_only = request.query_params.get("published", "true") == "true" days_ago = int(request.query_params.get("days", "30"))
# Build complex query query = Post.all()
# Keyword search in title or content if keyword: query = query.filter( Q(title__icontains=keyword) | Q(content__icontains=keyword) )
# Filter by author if author_id: query = query.filter(author_id=author_id)
# Published posts only if published_only: query = query.filter(published=True)
# Recent posts cutoff_date = datetime.now() - timedelta(days=days_ago) query = query.filter(created_at__gte=cutoff_date)
# Order by relevance and date query = query.order_by("-view_count", "-created_at")
# Execute with prefetch posts = await query.prefetch_related("author", "tags").limit(20)
return response.json({ "posts": [ { "id": post.id, "title": post.title, "excerpt": post.excerpt, "author": post.author.username, "tags": [tag.name for tag in post.tags], "view_count": post.view_count, "created_at": post.created_at.isoformat() } for post in posts ] })from tortoise.functions import Count, Avg, Sum
@app.get("/stats")async def get_statistics(request, response): """Get database statistics."""
# Count queries total_users = await User.filter(is_active=True).count() total_posts = await Post.filter(published=True).count()
# Aggregation queries user_stats = await User.annotate( post_count=Count("posts") ).filter(is_active=True).values( "id", "username", "post_count" )
# Top authors by post count top_authors = sorted( user_stats, key=lambda x: x["post_count"], reverse=True )[:5]
# Average views per post avg_views = await Post.filter(published=True).annotate( avg_views=Avg("view_count") ).values("avg_views")
return response.json({ "total_users": total_users, "total_posts": total_posts, "top_authors": top_authors, "average_views": avg_views[0]["avg_views"] if avg_views else 0 })Use transactions for operations that must succeed or fail together:
from tortoise.transactions import in_transaction
@app.post("/posts/{post_id:int}/publish")async def publish_post(request, response): """Publish a post with transaction.""" post_id = request.path_params.post_id
try: async with in_transaction(): # Get post post = await Post.get(id=post_id)
# Update post post.published = True post.published_at = datetime.now() await post.save()
# Update author stats author = await post.author author.published_posts_count = await Post.filter( author=author, published=True ).count() await author.save()
# If any operation fails, entire transaction rolls back
return response.json({ "message": "Post published successfully", "post_id": post.id })
except Post.DoesNotExist: raise@app.post("/transfer")async def complex_operation(request, response): """Complex operation with nested transactions.""" data = await request.json()
async with in_transaction() as connection: # Outer transaction user = await User.get(id=data["user_id"])
async with in_transaction(connection): # Inner transaction (savepoint) # Create post post = await Post.create( title=data["title"], content=data["content"], author=user )
# Add tags for tag_name in data.get("tags", []): tag, _ = await Tag.get_or_create(name=tag_name) await post.tags.add(tag)
# Update user stats user.total_posts += 1 await user.save()
return response.json({"post_id": post.id})import osfrom nexios_contrib.tortoise import init_tortoise
# Get database URL from environmentDATABASE_URL = os.getenv( "DATABASE_URL", "sqlite://db.sqlite3" # Default for development)
init_tortoise( app, db_url=DATABASE_URL, modules={"models": ["app.models"]}, generate_schemas=os.getenv("ENVIRONMENT") != "production")init_tortoise( app, config={ "connections": { "default": "sqlite://main.db", "users_db": "postgres://user:pass@localhost/users", "analytics_db": "postgres://user:pass@localhost/analytics" }, "apps": { "models": { "models": ["app.models"], "default_connection": "default" }, "users": { "models": ["app.user_models"], "default_connection": "users_db" }, "analytics": { "models": ["app.analytics_models"], "default_connection": "analytics_db" } } })# PostgreSQL with connection poolingDATABASE_URL = ( "postgres://user:password@localhost:5432/mydb" "?min_size=10&max_size=20" # Connection pool settings)
init_tortoise( app, db_url=DATABASE_URL, modules={"models": ["app.models"]})For production applications, use Aerich for database migrations:
# Install Aerichpip install aerich
# Initialize Aerichaerich init -t app.db_config.TORTOISE_ORM
# Initialize databaseaerich init-db
# Create migrationaerich migrate --name "add_user_table"
# Apply migrationsaerich upgradeTORTOISE_ORM = { "connections": { "default": "postgres://user:pass@localhost:5432/mydb" }, "apps": { "models": { "models": ["app.models", "aerich.models"], "default_connection": "default", }, },}# Always configure connection pools for productionDATABASE_URL = ( "postgres://user:password@host:5432/db" "?min_size=10" # Minimum connections "&max_size=50" # Maximum connections "&max_queries=50000" # Recycle connections "&max_inactive_connection_lifetime=300" # 5 minutes)from tortoise.exceptions import IntegrityError, DoesNotExist
@app.post("/users")async def create_user_safe(request, response): """Create user with proper error handling.""" data = await request.json()
try: user = await User.create(**data) return response.json({"id": user.id}, status=201)
except IntegrityError as e: # Duplicate username or email return response.json({ "error": "User already exists", "details": str(e) }, status=400)
except Exception as e: # Log error for debugging import logging logging.error(f"User creation failed: {e}") return response.json({ "error": "Internal server error" }, status=500)class Post(Model): id = fields.IntField(pk=True) slug = fields.CharField(max_length=200, unique=True, index=True) title = fields.CharField(max_length=200, index=True) # Frequently searched author = fields.ForeignKeyField("models.User", index=True) # Foreign keys created_at = fields.DatetimeField(auto_now_add=True, index=True) # Sorting
class Meta: table = "posts" indexes = [ ("author", "created_at"), # Composite index ("published", "created_at") # For published posts query ]# ❌ Bad: N+1 query problemposts = await Post.all()for post in posts: author = await post.author # Separate query for each post! print(author.username)
# ✅ Good: Use prefetch_relatedposts = await Post.all().prefetch_related("author")for post in posts: print(post.author.username) # No additional queries
# ✅ Even better: Select only needed fieldsposts = await Post.all().prefetch_related("author").values( "id", "title", "author__username")import timeimport logging
logger = logging.getLogger(__name__)
@app.middlewareasync def database_timing_middleware(request, response, call_next): """Log slow database queries.""" start_time = time.time()
result = await call_next()
duration = time.time() - start_time if duration > 1.0: # Log queries taking more than 1 second logger.warning( f"Slow request: {request.method} {request.url} " f"took {duration:.2f}s" )
return result# Configure read replicas for scalinginit_tortoise( app, config={ "connections": { "master": "postgres://user:pass@master:5432/db", "replica1": "postgres://user:pass@replica1:5432/db", "replica2": "postgres://user:pass@replica2:5432/db" }, "apps": { "models": { "models": ["app.models"], "default_connection": "master" } }, "routers": ["app.db_router.ReadWriteRouter"] })
# app/db_router.pyclass ReadWriteRouter: """Route reads to replicas, writes to master."""
def db_for_read(self, model): import random return random.choice(["replica1", "replica2"])
def db_for_write(self, model): return "master"import pytestfrom tortoise.contrib.test import initializer, finalizer
@pytest.fixture(scope="session", autouse=True)async def initialize_tests(): """Initialize test database.""" await initializer( ["app.models"], db_url="sqlite://:memory:", # In-memory database for tests app_label="models" ) yield await finalizer()
@pytest.fixtureasync def clean_db(): """Clean database before each test.""" from app.models import User, Post, Comment await User.all().delete() await Post.all().delete() await Comment.all().delete()import pytestfrom app.models import User, Post
@pytest.mark.asyncioasync def test_create_user(clean_db): """Test user creation.""" user = await User.create( username="testuser", email="test@example.com", password_hash="hashed" )
assert user.id is not None assert user.username == "testuser"
# Verify in database db_user = await User.get(id=user.id) assert db_user.username == "testuser"
@pytest.mark.asyncioasync def test_user_posts_relationship(clean_db): """Test user-posts relationship.""" user = await User.create( username="author", email="author@example.com", password_hash="hashed" )
post = await Post.create( title="Test Post", content="Content", author=user )
# Fetch with relationship await user.fetch_related("posts") assert len(user.posts) == 1 assert user.posts[0].title == "Test Post"Integrating databases with Nexios using Tortoise ORM gives you:
✅ Async Performance: Native async/await support for high-concurrency applications
✅ Developer Experience: Django-inspired API that’s intuitive and productive
✅ Type Safety: Full type hints for better IDE support and fewer errors
✅ Production Ready: Connection pooling, transactions, and migrations built-in
✅ Seamless Integration: The nexios-contrib-tortoise package handles all lifecycle management
✅ Scalability: Support for read replicas, connection pooling, and query optimization
Whether you’re building a simple API or a complex enterprise application, Nexios + Tortoise ORM provides the perfect foundation for robust database operations.
Ready to get started? Install the packages:
pip install nexios nexios-contrib tortoise-orm asyncpgCheck out the Nexios Tortoise ORM documentation (opens in a new window) for more details and examples!
Never use generate_schemas=True in production! Always use proper migrations with Aerich to manage schema changes safely.