Plugins - Extend Wappa's Capabilities¶
Modular extensions that add powerful features to your conversational applications
Wappa's plugin system lets you add functionality without modifying core code. Each plugin provides a specific capability and integrates seamlessly with the framework.
Overview¶
Plugins follow a simple pattern:
from wappa import Wappa
from wappa.core.plugins import PostgresDatabasePlugin, RedisPubSubPlugin
# Add plugins during app creation
app = Wappa(cache="redis")
app.add_plugin(PostgresDatabasePlugin("postgresql://localhost/mydb"))
app.add_plugin(RedisPubSubPlugin())
app.set_event_handler(MyEventHandler())
app.run()
Available Plugins:
| Plugin | Purpose | Use Case |
|---|---|---|
| PostgresDatabasePlugin | Async PostgreSQL integration | User profiles, order history, analytics |
| RedisPubSubPlugin | Real-time messaging | Notifications, broadcasts, multi-instance sync |
| CORSPlugin | Cross-origin resource sharing | Web dashboards, admin panels |
| RateLimitPlugin | API rate limiting | Prevent abuse, protect resources |
| AuthPlugin | Authentication middleware | Secure admin endpoints |
PostgresDatabasePlugin¶
Async PostgreSQL integration with connection pooling and session management
Inspired by 30x-community patterns, this plugin provides production-ready database access for conversational applications.
Quick Start¶
from wappa import Wappa
from wappa.core.plugins.database import PostgresDatabasePlugin
# Initialize with connection string
database_plugin = PostgresDatabasePlugin(
connection_string="postgresql://user:pass@localhost:5432/wappa_db",
pool_size=10,
max_overflow=20
)
app = Wappa(cache="redis")
app.add_plugin(database_plugin)
app.set_event_handler(MyEventHandler())
Configuration Options¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| connection_string | str | Required | PostgreSQL connection URL |
| pool_size | int | 10 | Number of permanent connections |
| max_overflow | int | 20 | Additional connections when needed |
| pool_timeout | int | 30 | Timeout waiting for connection (seconds) |
| pool_recycle | int | 3600 | Connection lifetime before recycling (seconds) |
| echo_sql | bool | False | Log all SQL statements (development) |
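The pool parameters combine into a hard ceiling on open connections. The sketch below assumes SQLAlchemy-style semantics, where `pool_size` connections stay open and up to `max_overflow` more can be opened under load. `PoolSettings` and `fits_server_limit` are hypothetical helpers for illustration, not part of the plugin.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PoolSettings:
    """Hypothetical container mirroring the documented pool parameters."""

    pool_size: int = 10
    max_overflow: int = 20
    pool_timeout: int = 30    # seconds
    pool_recycle: int = 3600  # seconds

    def __post_init__(self) -> None:
        if self.pool_size < 1:
            raise ValueError("pool_size must be at least 1")
        if self.max_overflow < 0:
            raise ValueError("max_overflow cannot be negative")

    @property
    def max_connections(self) -> int:
        # Permanent connections plus temporary overflow connections.
        return self.pool_size + self.max_overflow


def fits_server_limit(settings: PoolSettings, instances: int, server_max: int) -> bool:
    """Check that all app instances together stay under the server's limit."""
    return settings.max_connections * instances <= server_max
```

With the defaults each instance can open up to 30 connections, so three instances fit under a PostgreSQL server limit of 100 connections while four do not.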
Usage in Event Handlers¶
from datetime import datetime, timezone
from pydantic import BaseModel, Field
from wappa import WappaEventHandler
class UserProfile(BaseModel):
    phone: str
    name: str
    language: str = "en"
    # default_factory runs per instance; a plain datetime default is evaluated only once, at import time
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    message_count: int = 0
class MyEventHandler(WappaEventHandler):
    async def handle_message(self, webhook):
        # Access database through self.db
        async with self.db.session() as session:
            # Query user profile
            query = """
                SELECT * FROM user_profiles
                WHERE phone = $1
            """
            result = await session.fetchrow(query, webhook.sender_phone)
            if result:
                # Update existing user
                update_query = """
                    UPDATE user_profiles
                    SET message_count = message_count + 1,
                        last_seen = NOW()
                    WHERE phone = $1
                    RETURNING *
                """
                user = await session.fetchrow(update_query, webhook.sender_phone)
            else:
                # Create new user
                insert_query = """
                    INSERT INTO user_profiles (phone, name, language, message_count)
                    VALUES ($1, $2, $3, 1)
                    RETURNING *
                """
                user = await session.fetchrow(
                    insert_query,
                    webhook.sender_phone,
                    webhook.sender_name,
                    "en"
                )
        # Use the user data
        await self.messenger.send_text(
            f"Welcome back! Message #{user['message_count']}",
            webhook.sender_phone
        )
Session Management¶
The plugin uses async context managers for proper connection handling:
# Simple query
async with self.db.session() as session:
    result = await session.fetch("SELECT * FROM users LIMIT 10")
# Transaction with multiple operations
async with self.db.session() as session:
    # Start transaction
    async with session.transaction():
        # Multiple operations in one transaction
        await session.execute("UPDATE users SET active = true WHERE id = $1", user_id)
        await session.execute("INSERT INTO user_activity (user_id, action) VALUES ($1, $2)", user_id, "login")
    # Auto-commit if successful, auto-rollback if error
# Prepared statements for performance
async with self.db.session() as session:
    stmt = await session.prepare("SELECT * FROM users WHERE phone = $1")
    for phone in phone_numbers:
        user = await stmt.fetchrow(phone)
        # Process user...
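The commit-or-rollback behaviour of a transaction block can be illustrated without a database. The following is a minimal sketch using an in-memory stand-in; `FakeSession` is hypothetical and only mimics the semantics described above, it is not how the plugin is implemented.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """In-memory stand-in for a database session (illustration only)."""

    def __init__(self) -> None:
        self.committed = []  # statements that survived a commit
        self._pending = []   # statements queued inside the open transaction

    async def execute(self, statement: str) -> None:
        self._pending.append(statement)

    @asynccontextmanager
    async def transaction(self):
        self._pending = []
        try:
            yield self
        except Exception:
            # Roll back: discard everything queued inside the block.
            self._pending = []
            raise
        else:
            # Commit: keep the queued statements.
            self.committed.extend(self._pending)
            self._pending = []


async def demo() -> list:
    session = FakeSession()
    async with session.transaction():
        await session.execute("UPDATE users SET active = true")
    try:
        async with session.transaction():
            await session.execute("INSERT INTO user_activity ...")
            raise RuntimeError("simulated failure")
    except RuntimeError:
        pass  # the second block was rolled back
    return session.committed
```

Only the statement from the block that finished without an exception ends up in `committed`; the failed block leaves no trace.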
Connection Pooling¶
The plugin manages connection pooling automatically:
- Lazy initialization: Pool created on first use
- Auto-reconnection: Handles connection drops gracefully
- Health checks: Validates connections before use
- Connection recycling: Prevents stale connections
# Check connection status
is_healthy = await database_plugin.health_check()
# Get pool statistics
stats = database_plugin.get_pool_stats()
# {
# "size": 10,
# "in_use": 3,
# "available": 7,
# "max": 30
# }
# Graceful shutdown
await database_plugin.close() # Called automatically on app shutdown
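Lazy initialization and the pool statistics can be pictured with a toy pool. This sketch is an assumption about the general technique, not the plugin's code: `LazyPool` is a hypothetical class that hands out string placeholders instead of real connections.

```python
import asyncio
from contextlib import asynccontextmanager


class LazyPool:
    """Toy pool that is created on first use (not the plugin's implementation)."""

    def __init__(self, size: int) -> None:
        self._size = size
        self._free = None  # asyncio.Queue, created on first session()
        self.in_use = 0

    def _ensure_started(self) -> asyncio.Queue:
        if self._free is None:
            self._free = asyncio.Queue()
            for n in range(self._size):
                self._free.put_nowait(f"conn-{n}")
        return self._free

    @asynccontextmanager
    async def session(self):
        free = self._ensure_started()
        conn = await free.get()  # waits if every connection is busy
        self.in_use += 1
        try:
            yield conn
        finally:
            self.in_use -= 1
            free.put_nowait(conn)  # always hand the connection back

    def stats(self) -> dict:
        started = self._free is not None
        return {
            "size": self._size if started else 0,
            "in_use": self.in_use,
            "available": self._free.qsize() if started else 0,
        }


async def demo() -> tuple:
    pool = LazyPool(size=2)
    before = pool.stats()
    async with pool.session():
        during = pool.stats()
    after = pool.stats()
    return before, during, after
```

Because the connection is returned in a `finally` clause, leaving the `async with` block always releases it, which is why the context-manager form is preferred over holding a session manually.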
Database Schema Example¶
-- User profiles table
CREATE TABLE user_profiles (
    id SERIAL PRIMARY KEY,
    phone VARCHAR(20) UNIQUE NOT NULL,  -- UNIQUE already creates an index on phone
    name VARCHAR(100),
    language VARCHAR(10) DEFAULT 'en',
    created_at TIMESTAMP DEFAULT NOW(),
    last_seen TIMESTAMP DEFAULT NOW(),
    message_count INTEGER DEFAULT 0,
    preferences JSONB DEFAULT '{}'::jsonb
);
-- PostgreSQL has no inline INDEX clause; index names must also be unique within a schema
CREATE INDEX idx_user_profiles_created_at ON user_profiles (created_at);
-- Conversation history
CREATE TABLE conversation_history (
    id SERIAL PRIMARY KEY,
    user_phone VARCHAR(20) REFERENCES user_profiles(phone),
    message_text TEXT,
    direction VARCHAR(10),  -- 'inbound' or 'outbound'
    created_at TIMESTAMP DEFAULT NOW(),
    metadata JSONB DEFAULT '{}'::jsonb
);
CREATE INDEX idx_conversation_history_user_phone ON conversation_history (user_phone);
CREATE INDEX idx_conversation_history_created_at ON conversation_history (created_at);
-- Analytics events
CREATE TABLE analytics_events (
    id SERIAL PRIMARY KEY,
    event_type VARCHAR(50),
    user_phone VARCHAR(20),
    event_data JSONB,
    created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_analytics_events_event_type ON analytics_events (event_type);
CREATE INDEX idx_analytics_events_created_at ON analytics_events (created_at);
Best Practices¶
1. Use Pydantic Models¶
from pydantic import BaseModel
class User(BaseModel):
    phone: str
    name: str
    message_count: int
# Map database row to model
async with self.db.session() as session:
    row = await session.fetchrow("SELECT * FROM users WHERE phone = $1", phone)
    # Guard against a missing row before unpacking it
    user = User(**dict(row)) if row else None
2. Connection Pooling¶
# ✅ Good: Proper session usage
async with self.db.session() as session:
    result = await session.fetch("SELECT * FROM users")
# ❌ Bad: Holding connection too long
session = await self.db.get_session()  # Don't do this
result = await session.fetch("SELECT * FROM users")
# Connection not released!
3. Error Handling¶
from asyncpg import UniqueViolationError, PostgresError
async with self.db.session() as session:
    try:
        await session.execute(
            "INSERT INTO users (phone) VALUES ($1)",
            phone
        )
    except UniqueViolationError:
        # Handle duplicate phone
        await self.messenger.send_text("Already registered!", phone)
    except PostgresError as e:
        self.logger.error(f"Database error: {e}")
        # Graceful fallback
4. Query Performance¶
# Use indexes (run once as SQL, not from Python):
#   CREATE INDEX idx_user_phone ON users(phone);
# Use prepared statements for repeated queries
stmt = await session.prepare("SELECT * FROM users WHERE phone = $1")
# Batch operations
async with session.transaction():
    await session.executemany(
        "INSERT INTO events (type, data) VALUES ($1, $2)",
        [(event_type, data) for event_type, data in events]  # avoid shadowing the built-in type
    )
RedisPubSubPlugin¶
Multi-tenant real-time messaging with publish/subscribe patterns
Enable real-time communication between WhatsApp users, broadcast notifications, and coordinate multiple app instances.
Quick Start¶
from wappa import Wappa
from wappa.core.plugins.redis import RedisPubSubPlugin
# Initialize plugin
pubsub_plugin = RedisPubSubPlugin(
redis_url="redis://localhost:6379/3", # Separate DB for pub/sub
tenant_id="hotel123"
)
app = Wappa(cache="redis")
app.add_plugin(pubsub_plugin)
app.set_event_handler(MyEventHandler())
Configuration Options¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| redis_url | str \| None | From settings | Redis connection URL |
| tenant_id | str | Required | Tenant identifier for channel isolation |
| channel_prefix | str | "wappa:pubsub" | Prefix for all channels |
| max_connections | int | 10 | Max PubSub connections |
Publishing Messages¶
class MyEventHandler(WappaEventHandler):
    async def handle_message(self, webhook):
        # Publish to specific channel
        await self.pubsub.publish(
            channel="notifications",
            message={
                "type": "new_message",
                "user": webhook.sender_phone,
                "text": webhook.get_message_text()
            }
        )
        # Broadcast to all users
        await self.pubsub.publish(
            channel="broadcasts",
            message={
                "type": "announcement",
                "text": "New product launch tomorrow!"
            }
        )
        # User-specific notification
        user_channel = f"user:{webhook.sender_phone}"
        await self.pubsub.publish(
            channel=user_channel,
            message={"type": "order_update", "status": "shipped"}
        )
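Every message published above is a dictionary with a `type` field. One way to keep that shape consistent is a small pair of helpers that build and validate an envelope with `type`, `data`, and `timestamp`, the structure recommended in the Message Structure best practice on this page. `build_message` and `parse_message` are hypothetical names; how the plugin itself serializes messages is not specified here.

```python
import json
from datetime import datetime, timezone


def build_message(message_type: str, data: dict, now=None) -> str:
    """Serialize a structured envelope to JSON with a UTC timestamp."""
    moment = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    envelope = {
        "type": message_type,
        "data": data,
        "timestamp": moment.strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
    return json.dumps(envelope)


def parse_message(raw: str) -> dict:
    """Decode an envelope and reject anything that lacks the expected fields."""
    message = json.loads(raw)
    if not isinstance(message, dict):
        raise ValueError("message must be a JSON object")
    missing = {"type", "data", "timestamp"} - message.keys()
    if missing:
        raise ValueError(f"message is missing fields: {sorted(missing)}")
    return message
```

A subscriber that calls `parse_message` first can dispatch on `message["type"]` without checking for missing keys in every callback.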
Subscribing to Channels¶
async def setup_subscriptions(self):
    """Setup subscriptions during app startup (defined as a method on your event handler)"""
    # Subscribe to single channel
    await self.pubsub.subscribe(
        channel="notifications",
        callback=handle_notification
    )
    # Subscribe to multiple channels
    await self.pubsub.subscribe_many(
        channels=["broadcasts", "alerts", "announcements"],
        callback=handle_broadcast
    )
    # Pattern subscription (all user channels)
    await self.pubsub.psubscribe(
        pattern="user:*",
        callback=handle_user_message
    )
# The callbacks below assume messenger, admin_phone and active_users are available in their scope
async def handle_notification(message: dict):
    """Process notification messages"""
    if message["type"] == "new_message":
        # Send WhatsApp notification
        await messenger.send_text(
            f"New message from {message['user']}",
            admin_phone
        )
async def handle_broadcast(message: dict):
    """Process broadcast messages"""
    if message["type"] == "announcement":
        # Send to all active users
        for user in active_users:
            await messenger.send_text(message["text"], user.phone)
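To see how exact and pattern subscriptions route messages, here is a single-process sketch. `InMemoryPubSub` is a hypothetical toy broker, not `RedisPubSubPlugin`, and it uses Python's `fnmatchcase` as an approximation of Redis glob-style patterns (the two agree for simple patterns such as `user:*`).

```python
import asyncio
from fnmatch import fnmatchcase


class InMemoryPubSub:
    """Toy single-process broker (illustration only)."""

    def __init__(self) -> None:
        self._exact = {}     # channel -> list of callbacks
        self._patterns = {}  # glob pattern -> list of callbacks

    def subscribe(self, channel, callback) -> None:
        self._exact.setdefault(channel, []).append(callback)

    def psubscribe(self, pattern, callback) -> None:
        self._patterns.setdefault(pattern, []).append(callback)

    async def publish(self, channel, message) -> int:
        callbacks = list(self._exact.get(channel, []))
        for pattern, subscribers in self._patterns.items():
            if fnmatchcase(channel, pattern):
                callbacks.extend(subscribers)
        for callback in callbacks:
            await callback(message)
        return len(callbacks)  # number of deliveries


async def demo() -> list:
    received = []

    async def on_notification(message):
        received.append(("notifications", message["type"]))

    async def on_user_message(message):
        received.append(("user:*", message["type"]))

    broker = InMemoryPubSub()
    broker.subscribe("notifications", on_notification)
    broker.psubscribe("user:*", on_user_message)

    await broker.publish("notifications", {"type": "new_message"})
    await broker.publish("user:5551234", {"type": "order_update"})
    await broker.publish("unrelated", {"type": "ignored"})
    return received
```

The message on `unrelated` reaches nobody, while the pattern subscriber receives messages for any channel that starts with `user:`.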
Channel Patterns¶
1. User-Specific Channels¶
# Each user gets their own channel
user_channel = f"user:{user_phone}"
# Subscribe to user's channel
await self.pubsub.subscribe(user_channel, handle_user_notification)
# Publish to specific user
await self.pubsub.publish(user_channel, {"type": "order_ready"})
2. Broadcast Channels¶
# Global announcements
await self.pubsub.publish(
"broadcasts",
{"text": "System maintenance in 1 hour"}
)
# All subscribers receive the message
await self.pubsub.subscribe("broadcasts", handle_broadcast)
3. Topic Channels¶
# Topic-based messaging
topics = ["sports", "news", "weather"]
# User subscribes to topics
for topic in user_topics:
await self.pubsub.subscribe(f"topic:{topic}", handle_topic_update)
# Publish to topic
await self.pubsub.publish("topic:sports", {"headline": "Team wins championship!"})
4. Multi-Instance Coordination¶
# Coordinate across multiple app instances
await self.pubsub.subscribe("coordination", handle_coordination)
# Instance 1 publishes
await self.pubsub.publish("coordination", {
    "action": "cache_invalidate",
    "key": "user_profiles"
})
# All instances receive and act
async def handle_coordination(message):
    if message["action"] == "cache_invalidate":
        await cache.delete(message["key"])
Multi-Tenant Isolation¶
The plugin automatically isolates channels by tenant:
# Tenant A
pubsub_a = RedisPubSubPlugin(tenant_id="hotel_a")
await pubsub_a.publish("notifications", msg)
# Actual channel: "wappa:pubsub:hotel_a:notifications"
# Tenant B
pubsub_b = RedisPubSubPlugin(tenant_id="hotel_b")
await pubsub_b.publish("notifications", msg)
# Actual channel: "wappa:pubsub:hotel_b:notifications"
# Completely isolated!
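The channel names in the comments above follow a `prefix:tenant:channel` layout. A minimal sketch of that naming scheme, assuming the default prefix from the configuration table; `full_channel` and `logical_channel` are hypothetical helpers, not plugin methods.

```python
def full_channel(tenant_id: str, channel: str, prefix: str = "wappa:pubsub") -> str:
    """Build the namespaced channel name for one tenant."""
    if not tenant_id:
        raise ValueError("tenant_id is required for isolation")
    return f"{prefix}:{tenant_id}:{channel}"


def logical_channel(full_name: str, tenant_id: str, prefix: str = "wappa:pubsub") -> str:
    """Strip prefix and tenant to recover the name the handler used."""
    head = f"{prefix}:{tenant_id}:"
    if not full_name.startswith(head):
        raise ValueError("channel belongs to another tenant or prefix")
    return full_name[len(head):]
```

Because the tenant is part of the channel name, two tenants publishing to `notifications` never share a channel, and a name from another tenant is rejected rather than silently accepted.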
Complete Example: Notification System¶
from wappa import WappaEventHandler
class NotificationHandler(WappaEventHandler):
    async def on_startup(self):
        """Setup subscriptions when app starts"""
        # Subscribe to notification channels
        await self.pubsub.subscribe("order_updates", self.handle_order_update)
        await self.pubsub.subscribe("promotions", self.handle_promotion)
        # Subscribe to all user-specific channels
        await self.pubsub.psubscribe("user:*", self.handle_user_notification)
    async def handle_message(self, webhook):
        """Process incoming WhatsApp messages"""
        text = webhook.get_message_text().lower()
        if "subscribe" in text:
            # User subscribes to promotions
            user_data = {"phone": webhook.sender_phone, "subscribed": True}
            await self.pubsub.publish("subscriptions", user_data)
            await self.messenger.send_text(
                "You're subscribed to promotions! 🎉",
                webhook.sender_phone
            )
        elif "order" in text:
            # Trigger order update notification
            order_data = {
                "order_id": "ORD123",
                "user_phone": webhook.sender_phone,
                "status": "preparing"
            }
            await self.pubsub.publish("order_updates", order_data)
    async def handle_order_update(self, message: dict):
        """Handle order status updates"""
        await self.messenger.send_text(
            f"Order {message['order_id']} is now {message['status']}! 📦",
            message['user_phone']
        )
    async def handle_promotion(self, message: dict):
        """Send promotions to subscribed users"""
        # Get all subscribed users from database
        async with self.db.session() as session:
            users = await session.fetch(
                "SELECT phone FROM users WHERE subscribed = true"
            )
        # Send promotion to all
        for user in users:
            await self.messenger.send_text(
                message["text"],
                user["phone"]
            )
    async def handle_user_notification(self, message: dict):
        """Handle user-specific notifications"""
        # Extract user phone from channel pattern (user:phone); taking the last
        # segment also works if the channel carries a prefix and tenant
        user_phone = message["channel"].rsplit(":", 1)[-1]
        await self.messenger.send_text(
            message["text"],
            user_phone
        )
Best Practices¶
1. Channel Naming Conventions¶
# ✅ Good: Clear, hierarchical names
"notifications"
"user:{phone}"
"topic:{topic_name}"
"events:{event_type}"
# ❌ Bad: Ambiguous or flat names
"channel1"
"msgs"
"data"
2. Message Structure¶
# ✅ Good: Structured message with type
{
"type": "order_update",
"data": {...},
"timestamp": "2025-01-15T10:30:00Z"
}
# ❌ Bad: Unstructured message
"Order is ready"
3. Error Handling¶
try:
    await self.pubsub.publish("channel", message)
except Exception as e:
    self.logger.error(f"PubSub publish failed: {e}")
    # Fallback: store in database for retry
    await self.db.save_failed_message(message)
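The store-and-retry fallback can be sketched with an in-memory outbox. This is an illustration of the pattern only: `Outbox` is a hypothetical class, and a real deployment would persist failed messages (for example in the database, as the snippet above suggests) rather than keep them in memory.

```python
import asyncio
from collections import deque


class Outbox:
    """Keeps messages that failed to publish so they can be retried later."""

    def __init__(self, publish) -> None:
        self._publish = publish  # any async callable(channel, message)
        self.pending = deque()

    async def send(self, channel, message) -> bool:
        try:
            await self._publish(channel, message)
            return True
        except Exception:
            self.pending.append((channel, message))
            return False

    async def retry_pending(self) -> int:
        delivered = 0
        # Try each stored message once; failures go back to the end of the queue.
        for _ in range(len(self.pending)):
            channel, message = self.pending.popleft()
            try:
                await self._publish(channel, message)
                delivered += 1
            except Exception:
                self.pending.append((channel, message))
        return delivered


async def demo() -> tuple:
    sent = []
    state = {"down": True}

    async def flaky_publish(channel, message):
        if state["down"]:
            raise ConnectionError("broker unavailable")
        sent.append((channel, message))

    outbox = Outbox(flaky_publish)
    first = await outbox.send("order_updates", {"type": "order_update"})
    state["down"] = False  # the broker comes back
    retried = await outbox.retry_pending()
    return first, retried, sent, len(outbox.pending)
```

The first send fails and is queued; once the broker recovers, a single retry pass delivers it and empties the queue.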
4. Cleanup¶
async def on_shutdown(self):
    """Cleanup subscriptions on shutdown"""
    await self.pubsub.unsubscribe_all()
    await self.pubsub.close()
Built-in Plugins¶
CORSPlugin¶
Enable Cross-Origin Resource Sharing for web dashboards:
from wappa.core.plugins import CORSPlugin
app.add_plugin(CORSPlugin(
allow_origins=["https://dashboard.example.com"],
allow_credentials=True,
allow_methods=["GET", "POST"],
allow_headers=["*"]
))
RateLimitPlugin¶
Protect your API from abuse:
from wappa.core.plugins import RateLimitPlugin
app.add_plugin(RateLimitPlugin(
requests_per_minute=60,
burst=10,
key_func=lambda request: request.client.host
))
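How `requests_per_minute` and `burst` interact inside the plugin is not documented here. A common way to combine a steady rate with a burst allowance is a token bucket, sketched below as an assumption rather than a description of `RateLimitPlugin`. `TokenBucket` and `RateLimiter` are hypothetical names, and the clock is injectable so the behaviour can be tested without sleeping.

```python
import time


class TokenBucket:
    """Allows short bursts while enforcing an average request rate."""

    def __init__(self, requests_per_minute: int, burst: int, clock=time.monotonic) -> None:
        self.rate = requests_per_minute / 60.0  # tokens added per second
        self.capacity = burst                    # most requests allowed at once
        self.tokens = float(burst)
        self._clock = clock
        self._last = clock()

    def allow(self) -> bool:
        now = self._clock()
        elapsed = now - self._last
        self._last = now
        # Refill for the time that passed, never beyond the burst capacity.
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


class RateLimiter:
    """One bucket per key, for example per client host."""

    def __init__(self, requests_per_minute: int, burst: int, clock=time.monotonic) -> None:
        self._args = (requests_per_minute, burst, clock)
        self._buckets = {}

    def allow(self, key: str) -> bool:
        if key not in self._buckets:
            self._buckets[key] = TokenBucket(*self._args)
        return self._buckets[key].allow()
```

With 60 requests per minute and a burst of 10, a client can send 10 requests back to back, after which it regains one request per second; other clients are unaffected because each key has its own bucket.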
AuthPlugin¶
Secure admin endpoints:
from wappa.core.plugins import AuthPlugin
app.add_plugin(AuthPlugin(
secret_key="your-secret-key",
protected_paths=["/admin/*", "/api/internal/*"]
))
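The `protected_paths` entries look like glob patterns. Assuming they are matched glob-style (the plugin's actual matching rules are not stated here), the following sketch shows which request paths would be covered; `is_protected` is a hypothetical helper.

```python
from fnmatch import fnmatchcase

PROTECTED_PATHS = ["/admin/*", "/api/internal/*"]


def is_protected(path: str, patterns=PROTECTED_PATHS) -> bool:
    """Return True when the path matches any protected glob pattern."""
    return any(fnmatchcase(path, pattern) for pattern in patterns)
```

Under this interpretation `/admin/users` and `/api/internal/jobs/42` are protected, but the bare `/admin` path is not, because `/admin/*` requires the trailing slash. If the bare path should also be covered, list it explicitly.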
Creating Custom Plugins¶
Create your own plugins by extending WappaPlugin:
from wappa.core.plugins import WappaPlugin
class MyCustomPlugin(WappaPlugin):
    def __init__(self, config: dict):
        super().__init__()
        self.config = config
    async def on_startup(self, app):
        """Called when app starts"""
        self.logger.info("Plugin initializing...")
        # Setup resources
    async def on_shutdown(self, app):
        """Called when app stops"""
        self.logger.info("Plugin shutting down...")
        # Cleanup resources
    def register_routes(self, app):
        """Add custom API routes"""
        @app.get("/custom/endpoint")
        async def custom_endpoint():
            return {"status": "ok"}
# Use your plugin
app.add_plugin(MyCustomPlugin({"setting": "value"}))
Plugin Lifecycle¶
Plugins follow this lifecycle:
- Initialization: Plugin instance created
- Registration: app.add_plugin() called
- Startup: on_startup() called when app starts
- Runtime: Plugin provides services to event handlers
- Shutdown: on_shutdown() called when app stops
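The lifecycle stages above can be traced with a toy host that records each call. `MiniApp` and `RecordingPlugin` are hypothetical stand-ins, not Wappa classes, and shutting plugins down in reverse registration order is an assumption of this sketch rather than documented Wappa behaviour.

```python
import asyncio


class RecordingPlugin:
    """Hypothetical plugin that records each lifecycle call."""

    def __init__(self, name: str, log: list) -> None:
        self.name = name
        self.log = log
        self.log.append(f"init:{name}")

    async def on_startup(self, app) -> None:
        self.log.append(f"startup:{self.name}")

    async def on_shutdown(self, app) -> None:
        self.log.append(f"shutdown:{self.name}")


class MiniApp:
    """Toy host that drives the plugin lifecycle."""

    def __init__(self, log: list) -> None:
        self.plugins = []
        self.log = log

    def add_plugin(self, plugin) -> None:
        self.plugins.append(plugin)
        self.log.append(f"register:{plugin.name}")

    async def run_once(self) -> None:
        for plugin in self.plugins:
            await plugin.on_startup(self)
        try:
            self.log.append("runtime")  # event handlers would run here
        finally:
            # Assumption: shut down in reverse registration order.
            for plugin in reversed(self.plugins):
                await plugin.on_shutdown(self)


async def demo() -> list:
    log = []
    app = MiniApp(log)
    app.add_plugin(RecordingPlugin("db", log))
    app.add_plugin(RecordingPlugin("pubsub", log))
    await app.run_once()
    return log
```

Running `demo()` yields the stages in order for both plugins: initialization and registration first, then startup, the runtime phase, and finally shutdown.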
Next Steps¶
- Database Example: See PostgresDatabasePlugin in action
- PubSub Example: Learn real-time messaging patterns
- Plugin Architecture: Understand plugin design
- Example Applications: Explore working plugin implementations
Plugins let you extend Wappa without touching its core code. Start with the built-in plugins, then create your own! 🔌✨