Skip to content

Plugins - Extend Wappa's Capabilities

Modular extensions that add powerful features to your conversational applications

Wappa's plugin system lets you add functionality without modifying core code. Each plugin provides a specific capability and integrates seamlessly with the framework.


Overview

Plugins follow a simple pattern:

from wappa import Wappa
from wappa.core.plugins import PostgresDatabasePlugin, RedisPubSubPlugin

# Add plugins during app creation
app = Wappa(cache="redis")
app.add_plugin(PostgresDatabasePlugin("postgresql://localhost/mydb"))
app.add_plugin(RedisPubSubPlugin())
app.set_event_handler(MyEventHandler())
app.run()

Available Plugins:

Plugin Purpose Use Case
PostgresDatabasePlugin Async PostgreSQL integration User profiles, order history, analytics
RedisPubSubPlugin Real-time messaging Notifications, broadcasts, multi-instance sync
CORSPlugin Cross-origin resource sharing Web dashboards, admin panels
RateLimitPlugin API rate limiting Prevent abuse, protect resources
AuthPlugin Authentication middleware Secure admin endpoints

PostgresDatabasePlugin

Async PostgreSQL integration with connection pooling and session management

Inspired by 30x-community patterns, this plugin provides production-ready database access for conversational applications.

Quick Start

from wappa import Wappa
from wappa.core.plugins.database import PostgresDatabasePlugin

# Initialize with connection string
database_plugin = PostgresDatabasePlugin(
    connection_string="postgresql://user:pass@localhost:5432/wappa_db",
    pool_size=10,
    max_overflow=20
)

app = Wappa(cache="redis")
app.add_plugin(database_plugin)
app.set_event_handler(MyEventHandler())

Configuration Options

Parameter Type Default Description
connection_string str Required PostgreSQL connection URL
pool_size int 10 Number of permanent connections
max_overflow int 20 Additional connections when needed
pool_timeout int 30 Timeout waiting for connection (seconds)
pool_recycle int 3600 Connection lifetime before recycling (seconds)
echo_sql bool False Log all SQL statements (development)

Usage in Event Handlers

from wappa import WappaEventHandler
from pydantic import BaseModel
from datetime import datetime

class UserProfile(BaseModel):
    phone: str
    name: str
    language: str = "en"
    created_at: datetime = datetime.utcnow()
    message_count: int = 0

class MyEventHandler(WappaEventHandler):
    async def handle_message(self, webhook):
        # Access database through self.db
        async with self.db.session() as session:
            # Query user profile
            query = """
                SELECT * FROM user_profiles
                WHERE phone = $1
            """
            result = await session.fetchrow(query, webhook.sender_phone)

            if result:
                # Update existing user
                update_query = """
                    UPDATE user_profiles
                    SET message_count = message_count + 1,
                        last_seen = NOW()
                    WHERE phone = $1
                    RETURNING *
                """
                user = await session.fetchrow(update_query, webhook.sender_phone)
            else:
                # Create new user
                insert_query = """
                    INSERT INTO user_profiles (phone, name, language, message_count)
                    VALUES ($1, $2, $3, 1)
                    RETURNING *
                """
                user = await session.fetchrow(
                    insert_query,
                    webhook.sender_phone,
                    webhook.sender_name,
                    "en"
                )

            # Use the user data
            await self.messenger.send_text(
                f"Welcome back! Message #{user['message_count']}",
                webhook.sender_phone
            )

Session Management

The plugin uses async context managers for proper connection handling:

# Simple query
async with self.db.session() as session:
    result = await session.fetch("SELECT * FROM users LIMIT 10")

# Transaction with multiple operations
async with self.db.session() as session:
    # Start transaction
    async with session.transaction():
        # Multiple operations in one transaction
        await session.execute("UPDATE users SET active = true WHERE id = $1", user_id)
        await session.execute("INSERT INTO user_activity (user_id, action) VALUES ($1, $2)", user_id, "login")
    # Auto-commit if successful, auto-rollback if error

# Prepared statements for performance
async with self.db.session() as session:
    stmt = await session.prepare("SELECT * FROM users WHERE phone = $1")

    for phone in phone_numbers:
        user = await stmt.fetchrow(phone)
        # Process user...

Connection Pooling

The plugin manages connection pooling automatically:

  • Lazy initialization: Pool created on first use
  • Auto-reconnection: Handles connection drops gracefully
  • Health checks: Validates connections before use
  • Connection recycling: Prevents stale connections
# Check connection status
is_healthy = await database_plugin.health_check()

# Get pool statistics
stats = database_plugin.get_pool_stats()
# {
#     "size": 10,
#     "in_use": 3,
#     "available": 7,
#     "max": 30
# }

# Graceful shutdown
await database_plugin.close()  # Called automatically on app shutdown

Database Schema Example

-- User profiles table
CREATE TABLE user_profiles (
    id SERIAL PRIMARY KEY,
    phone VARCHAR(20) UNIQUE NOT NULL,
    name VARCHAR(100),
    language VARCHAR(10) DEFAULT 'en',
    created_at TIMESTAMP DEFAULT NOW(),
    last_seen TIMESTAMP DEFAULT NOW(),
    message_count INTEGER DEFAULT 0,
    preferences JSONB DEFAULT '{}'::jsonb,
    INDEX idx_phone (phone),
    INDEX idx_created_at (created_at)
);

-- Conversation history
CREATE TABLE conversation_history (
    id SERIAL PRIMARY KEY,
    user_phone VARCHAR(20) REFERENCES user_profiles(phone),
    message_text TEXT,
    direction VARCHAR(10), -- 'inbound' or 'outbound'
    created_at TIMESTAMP DEFAULT NOW(),
    metadata JSONB DEFAULT '{}'::jsonb,
    INDEX idx_user_phone (user_phone),
    INDEX idx_created_at (created_at)
);

-- Analytics events
CREATE TABLE analytics_events (
    id SERIAL PRIMARY KEY,
    event_type VARCHAR(50),
    user_phone VARCHAR(20),
    event_data JSONB,
    created_at TIMESTAMP DEFAULT NOW(),
    INDEX idx_event_type (event_type),
    INDEX idx_created_at (created_at)
);

Best Practices

1. Use Pydantic Models

from pydantic import BaseModel

class User(BaseModel):
    phone: str
    name: str
    message_count: int

# Map database row to model
async with self.db.session() as session:
    row = await session.fetchrow("SELECT * FROM users WHERE phone = $1", phone)
    user = User(**dict(row))

2. Connection Pooling

# ✅ Good: Proper session usage
async with self.db.session() as session:
    result = await session.fetch("SELECT * FROM users")

# ❌ Bad: Holding connection too long
session = await self.db.get_session()  # Don't do this
result = await session.fetch("SELECT * FROM users")
# Connection not released!

3. Error Handling

from asyncpg import UniqueViolationError, PostgresError

async with self.db.session() as session:
    try:
        await session.execute(
            "INSERT INTO users (phone) VALUES ($1)",
            phone
        )
    except UniqueViolationError:
        # Handle duplicate phone
        await self.messenger.send_text("Already registered!", phone)
    except PostgresError as e:
        self.logger.error(f"Database error: {e}")
        # Graceful fallback

4. Query Performance

# Use indexes
CREATE INDEX idx_user_phone ON users(phone);

# Use prepared statements for repeated queries
stmt = await session.prepare("SELECT * FROM users WHERE phone = $1")

# Batch operations
async with session.transaction():
    await session.executemany(
        "INSERT INTO events (type, data) VALUES ($1, $2)",
        [(type, data) for type, data in events]
    )

RedisPubSubPlugin

Multi-tenant real-time messaging with publish/subscribe patterns

Enable real-time communication between WhatsApp users, broadcast notifications, and coordinate multiple app instances.

Quick Start

from wappa import Wappa
from wappa.core.plugins.redis import RedisPubSubPlugin

# Initialize plugin
pubsub_plugin = RedisPubSubPlugin(
    redis_url="redis://localhost:6379/3",  # Separate DB for pub/sub
    tenant_id="hotel123"
)

app = Wappa(cache="redis")
app.add_plugin(pubsub_plugin)
app.set_event_handler(MyEventHandler())

Configuration Options

Parameter Type Default Description
redis_url str \| None From settings Redis connection URL
tenant_id str Required Tenant identifier for channel isolation
channel_prefix str "wappa:pubsub" Prefix for all channels
max_connections int 10 Max PubSub connections

Publishing Messages

class MyEventHandler(WappaEventHandler):
    async def handle_message(self, webhook):
        # Publish to specific channel
        await self.pubsub.publish(
            channel="notifications",
            message={
                "type": "new_message",
                "user": webhook.sender_phone,
                "text": webhook.get_message_text()
            }
        )

        # Broadcast to all users
        await self.pubsub.publish(
            channel="broadcasts",
            message={
                "type": "announcement",
                "text": "New product launch tomorrow!"
            }
        )

        # User-specific notification
        user_channel = f"user:{webhook.sender_phone}"
        await self.pubsub.publish(
            channel=user_channel,
            message={"type": "order_update", "status": "shipped"}
        )

Subscribing to Channels

async def setup_subscriptions():
    """Setup subscriptions during app startup"""

    # Subscribe to single channel
    await self.pubsub.subscribe(
        channel="notifications",
        callback=handle_notification
    )

    # Subscribe to multiple channels
    await self.pubsub.subscribe_many(
        channels=["broadcasts", "alerts", "announcements"],
        callback=handle_broadcast
    )

    # Pattern subscription (all user channels)
    await self.pubsub.psubscribe(
        pattern="user:*",
        callback=handle_user_message
    )

async def handle_notification(message: dict):
    """Process notification messages"""
    if message["type"] == "new_message":
        # Send WhatsApp notification
        await messenger.send_text(
            f"New message from {message['user']}",
            admin_phone
        )

async def handle_broadcast(message: dict):
    """Process broadcast messages"""
    if message["type"] == "announcement":
        # Send to all active users
        for user in active_users:
            await messenger.send_text(message["text"], user.phone)

Channel Patterns

1. User-Specific Channels

# Each user gets their own channel
user_channel = f"user:{user_phone}"

# Subscribe to user's channel
await self.pubsub.subscribe(user_channel, handle_user_notification)

# Publish to specific user
await self.pubsub.publish(user_channel, {"type": "order_ready"})

2. Broadcast Channels

# Global announcements
await self.pubsub.publish(
    "broadcasts",
    {"text": "System maintenance in 1 hour"}
)

# All subscribers receive the message
await self.pubsub.subscribe("broadcasts", handle_broadcast)

3. Topic Channels

# Topic-based messaging
topics = ["sports", "news", "weather"]

# User subscribes to topics
for topic in user_topics:
    await self.pubsub.subscribe(f"topic:{topic}", handle_topic_update)

# Publish to topic
await self.pubsub.publish("topic:sports", {"headline": "Team wins championship!"})

4. Multi-Instance Coordination

# Coordinate across multiple app instances
await self.pubsub.subscribe("coordination", handle_coordination)

# Instance 1 publishes
await self.pubsub.publish("coordination", {
    "action": "cache_invalidate",
    "key": "user_profiles"
})

# All instances receive and act
async def handle_coordination(message):
    if message["action"] == "cache_invalidate":
        await cache.delete(message["key"])

Multi-Tenant Isolation

The plugin automatically isolates channels by tenant:

# Tenant A
pubsub_a = RedisPubSubPlugin(tenant_id="hotel_a")
await pubsub_a.publish("notifications", msg)
# Actual channel: "wappa:pubsub:hotel_a:notifications"

# Tenant B
pubsub_b = RedisPubSubPlugin(tenant_id="hotel_b")
await pubsub_b.publish("notifications", msg)
# Actual channel: "wappa:pubsub:hotel_b:notifications"

# Completely isolated!

Complete Example: Notification System

from wappa import WappaEventHandler
from datetime import datetime

class NotificationHandler(WappaEventHandler):
    async def on_startup(self):
        """Setup subscriptions when app starts"""
        # Subscribe to notification channels
        await self.pubsub.subscribe("order_updates", self.handle_order_update)
        await self.pubsub.subscribe("promotions", self.handle_promotion)

        # Subscribe to all user-specific channels
        await self.pubsub.psubscribe("user:*", self.handle_user_notification)

    async def handle_message(self, webhook):
        """Process incoming WhatsApp messages"""
        text = webhook.get_message_text().lower()

        if "subscribe" in text:
            # User subscribes to promotions
            user_data = {"phone": webhook.sender_phone, "subscribed": True}
            await self.pubsub.publish("subscriptions", user_data)
            await self.messenger.send_text(
                "You're subscribed to promotions! 🎉",
                webhook.sender_phone
            )

        elif "order" in text:
            # Trigger order update notification
            order_data = {
                "order_id": "ORD123",
                "user_phone": webhook.sender_phone,
                "status": "preparing"
            }
            await self.pubsub.publish("order_updates", order_data)

    async def handle_order_update(self, message: dict):
        """Handle order status updates"""
        await self.messenger.send_text(
            f"Order {message['order_id']} is now {message['status']}! 📦",
            message['user_phone']
        )

    async def handle_promotion(self, message: dict):
        """Send promotions to subscribed users"""
        # Get all subscribed users from database
        async with self.db.session() as session:
            users = await session.fetch(
                "SELECT phone FROM users WHERE subscribed = true"
            )

        # Send promotion to all
        for user in users:
            await self.messenger.send_text(
                message["text"],
                user["phone"]
            )

    async def handle_user_notification(self, message: dict):
        """Handle user-specific notifications"""
        # Extract user phone from channel pattern (user:phone)
        user_phone = message["channel"].split(":")[1]

        await self.messenger.send_text(
            message["text"],
            user_phone
        )

Best Practices

1. Channel Naming Conventions

# ✅ Good: Clear, hierarchical names
"notifications"
"user:{phone}"
"topic:{topic_name}"
"events:{event_type}"

# ❌ Bad: Ambiguous or flat names
"channel1"
"msgs"
"data"

2. Message Structure

# ✅ Good: Structured message with type
{
    "type": "order_update",
    "data": {...},
    "timestamp": "2025-01-15T10:30:00Z"
}

# ❌ Bad: Unstructured message
"Order is ready"

3. Error Handling

try:
    await self.pubsub.publish("channel", message)
except Exception as e:
    self.logger.error(f"PubSub publish failed: {e}")
    # Fallback: store in database for retry
    await self.db.save_failed_message(message)

4. Cleanup

async def on_shutdown(self):
    """Cleanup subscriptions on shutdown"""
    await self.pubsub.unsubscribe_all()
    await self.pubsub.close()

Built-in Plugins

CORSPlugin

Enable Cross-Origin Resource Sharing for web dashboards:

from wappa.core.plugins import CORSPlugin

app.add_plugin(CORSPlugin(
    allow_origins=["https://dashboard.example.com"],
    allow_credentials=True,
    allow_methods=["GET", "POST"],
    allow_headers=["*"]
))

RateLimitPlugin

Protect your API from abuse:

from wappa.core.plugins import RateLimitPlugin

app.add_plugin(RateLimitPlugin(
    requests_per_minute=60,
    burst=10,
    key_func=lambda request: request.client.host
))

AuthPlugin

Secure admin endpoints:

from wappa.core.plugins import AuthPlugin

app.add_plugin(AuthPlugin(
    secret_key="your-secret-key",
    protected_paths=["/admin/*", "/api/internal/*"]
))

Creating Custom Plugins

Create your own plugins by extending WappaPlugin:

from wappa.core.plugins import WappaPlugin

class MyCustomPlugin(WappaPlugin):
    def __init__(self, config: dict):
        super().__init__()
        self.config = config

    async def on_startup(self, app):
        """Called when app starts"""
        self.logger.info("Plugin initializing...")
        # Setup resources

    async def on_shutdown(self, app):
        """Called when app stops"""
        self.logger.info("Plugin shutting down...")
        # Cleanup resources

    def register_routes(self, app):
        """Add custom API routes"""
        @app.get("/custom/endpoint")
        async def custom_endpoint():
            return {"status": "ok"}

# Use your plugin
app.add_plugin(MyCustomPlugin({"setting": "value"}))

Plugin Lifecycle

Plugins follow this lifecycle:

  1. Initialization: Plugin instance created
  2. Registration: app.add_plugin() called
  3. Startup: on_startup() called when app starts
  4. Runtime: Plugin provides services to event handlers
  5. Shutdown: on_shutdown() called when app stops

Next Steps


Plugins make Wappa infinitely extensible. Start with built-in plugins, then create your own! 🔌✨