Event Handlers

The event handler class is the heart of your conversational application. The WappaEventHandler abstract base class provides the foundation for handling all WhatsApp webhook events.

Overview

The WappaEventHandler follows the Template Method pattern to provide a structured approach to webhook processing:

  1. Framework Pre-processing: Automatic logging and validation
  2. Custom Business Logic: Your implementation (required for messages, optional for status/errors)
  3. Framework Post-processing: Cleanup and metrics collection

A minimal handler implements process_message() and, optionally, the status and error hooks:

from wappa import WappaEventHandler
from wappa.webhooks import IncomingMessageWebhook, StatusWebhook, ErrorWebhook

class MyHandler(WappaEventHandler):

    async def process_message(self, webhook: IncomingMessageWebhook):
        # Required: Your message processing logic
        await self.messenger.send_text(text="Hello!", recipient=webhook.user.user_id)

    async def process_status(self, webhook: StatusWebhook):
        # Optional: Custom status handling
        pass

    async def process_error(self, webhook: ErrorWebhook):
        # Optional: Custom error handling  
        pass
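
The three-phase flow described in the overview can be pictured as a Template Method. The sketch below is not Wappa's actual implementation: `SketchHandler`, `handle_message`, the dict-based webhook, and the `processed` counter are hypothetical names, used only to illustrate how a base class can wrap a subclass hook with pre- and post-processing.

```python
import asyncio
import logging
from abc import ABC, abstractmethod
from typing import List


class SketchHandler(ABC):
    """Hypothetical base class; illustrates the pattern, not Wappa internals."""

    def __init__(self) -> None:
        self.logger = logging.getLogger("sketch")
        self.processed = 0  # assumed metric, for illustration only

    async def handle_message(self, webhook: dict) -> None:
        # 1. Pre-processing: log and validate before user code runs.
        self.logger.info("incoming webhook: %s", webhook.get("id"))
        if "text" not in webhook:
            raise ValueError("webhook has no text")
        # 2. Custom business logic supplied by the subclass.
        await self.process_message(webhook)
        # 3. Post-processing: update metrics after user code returns.
        self.processed += 1

    @abstractmethod
    async def process_message(self, webhook: dict) -> None:
        """Subclass hook, analogous to WappaEventHandler.process_message()."""


class Shout(SketchHandler):
    """Example subclass: only the business logic lives here."""

    def __init__(self) -> None:
        super().__init__()
        self.replies: List[str] = []

    async def process_message(self, webhook: dict) -> None:
        self.replies.append(webhook["text"].upper())
```

The subclass never calls the pre- or post-processing steps itself; the base class owns the ordering, which is the point of the pattern.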

Base Class API Reference

Core Methods

| Method | Required | Description | Parameters |
| --- | --- | --- | --- |
| process_message() | Yes | Handle incoming messages | webhook: IncomingMessageWebhook |
| process_status() | No | Handle delivery status updates | webhook: StatusWebhook |
| process_error() | No | Handle platform errors | webhook: ErrorWebhook |

Dependency Access

The framework injects these dependencies per-request:

| Property | Type | Description | Always Available |
| --- | --- | --- | --- |
| self.messenger | IMessenger | Send messages, mark as read, manage typing | Yes |
| self.cache_factory | CacheFactory | Create cache instances (Memory/JSON/Redis) | Yes |
| self.logger | Logger | Structured logging with module context | Yes |

Utility Methods

| Method | Returns | Description |
| --- | --- | --- |
| validate_dependencies() | bool | Check if required dependencies are injected |
| get_dependency_status() | dict | Get detailed dependency injection status |
| get_message_stats() | dict | Message processing statistics |
| get_status_stats() | dict | Status webhook statistics |
| get_error_stats() | dict | Error webhook statistics |
| get_all_stats() | dict | Combined webhook processing statistics |

Implementation Patterns

Basic Message Handler

A simple conversational application that echoes messages:

class SimpleEchoHandler(WappaEventHandler):

    async def process_message(self, webhook: IncomingMessageWebhook):
        # Validate dependencies (recommended)
        if not self.validate_dependencies():
            self.logger.error("Dependencies not available")
            return

        # Extract message data
        user_id = webhook.user.user_id
        message_text = webhook.get_message_text()
        message_type = webhook.get_message_type_name()

        # Mark as read with typing indicator
        await self.messenger.mark_as_read(
            message_id=webhook.message.message_id, 
            typing=True
        )

        # Send echo response
        echo_text = f"🔄 Echo: {message_text}"
        await self.messenger.send_text(
            recipient=user_id,
            text=echo_text,
            reply_to_message_id=webhook.message.message_id
        )
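
One way to exercise this logic without a live WhatsApp connection is to drive it with stand-ins. Everything in the sketch below (`FakeMessenger`, `handle_echo`, `make_webhook`, and the `text` attribute on the fake webhook) is hypothetical scaffolding; only the `mark_as_read` and `send_text` keyword arguments mirror the calls shown above.

```python
import asyncio
from types import SimpleNamespace
from typing import List, Optional, Tuple


class FakeMessenger:
    """Test double: records calls instead of contacting WhatsApp."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, dict]] = []

    async def mark_as_read(self, message_id: str, typing: bool = False) -> None:
        self.calls.append(("mark_as_read", {"message_id": message_id, "typing": typing}))

    async def send_text(self, text: str, recipient: str,
                        reply_to_message_id: Optional[str] = None) -> None:
        self.calls.append(("send_text", {
            "text": text,
            "recipient": recipient,
            "reply_to_message_id": reply_to_message_id,
        }))


async def handle_echo(messenger: FakeMessenger, webhook: SimpleNamespace) -> None:
    """Same steps as SimpleEchoHandler.process_message, minus the framework."""
    await messenger.mark_as_read(message_id=webhook.message.message_id, typing=True)
    await messenger.send_text(
        recipient=webhook.user.user_id,
        text=f"Echo: {webhook.text}",
        reply_to_message_id=webhook.message.message_id,
    )


def make_webhook(user_id: str, message_id: str, text: str) -> SimpleNamespace:
    """Build a minimal object with only the attributes the handler reads."""
    return SimpleNamespace(
        user=SimpleNamespace(user_id=user_id),
        message=SimpleNamespace(message_id=message_id),
        text=text,
    )
```

Running `asyncio.run(handle_echo(FakeMessenger(), make_webhook("5551234", "wamid.1", "hello")))` leaves the recorded calls on the messenger, where a test can inspect their order and arguments.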

Advanced Handler with State Management

Complex conversational application with user tracking and interactive features:

class AdvancedHandler(WappaEventHandler):

    def __init__(self):
        super().__init__()
        self._message_count = 0
        self._user_cache = None

    async def process_message(self, webhook: IncomingMessageWebhook):
        self._message_count += 1

        # Set up the cache on first use (always available - Memory by default)
        if self._user_cache is None:
            self._user_cache = self.cache_factory.create_user_cache()

        user_id = webhook.user.user_id
        message_text = webhook.get_message_text()

        # Track user activity (cache always available)
        user_data = await self._user_cache.get(user_id) or {}
        user_data['last_message'] = message_text
        user_data['message_count'] = user_data.get('message_count', 0) + 1
        await self._user_cache.set(user_id, user_data, ttl=86400)  # 24h

        # Process based on message content
        if message_text and message_text.startswith('/'):
            await self._handle_command(webhook, message_text)
        else:
            await self._handle_regular_message(webhook)

    async def process_status(self, webhook: StatusWebhook):
        # Track delivery statistics
        status = webhook.status.value
        self.logger.info(f"Message {status} for {webhook.recipient_id}")

    async def process_error(self, webhook: ErrorWebhook):
        # Handle platform errors with custom logic
        error_count = webhook.get_error_count()
        primary_error = webhook.get_primary_error()

        self.logger.error(
            f"WhatsApp error ({error_count} total): "
            f"{primary_error.error_code} - {primary_error.error_title}"
        )

        # Implement custom error recovery logic here
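
The process_status hook in AdvancedHandler only logs each update. If aggregate delivery numbers are useful, a small tally object is one option. The sketch below is an assumption-laden illustration: `StatusTally` is a hypothetical helper, and it assumes status values arrive as the strings WhatsApp uses for delivery updates ("sent", "delivered", "read", "failed").

```python
from collections import Counter
from typing import Dict


class StatusTally:
    """Hypothetical helper that counts delivery statuses overall and per recipient."""

    def __init__(self) -> None:
        self.totals: Counter = Counter()
        self.by_recipient: Dict[str, Counter] = {}

    def record(self, recipient_id: str, status: str) -> None:
        self.totals[status] += 1
        self.by_recipient.setdefault(recipient_id, Counter())[status] += 1

    def failure_rate(self) -> float:
        """Fraction of recorded statuses that were 'failed' (0.0 if none recorded)."""
        total = sum(self.totals.values())
        return self.totals["failed"] / total if total else 0.0
```

Inside process_status, the call would look like `self._tally.record(webhook.recipient_id, webhook.status.value)`, where `_tally` is an attribute you would create in `__init__`.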

Message Processing Workflow

1. Dependency Injection

The framework injects dependencies per-request to ensure proper multi-tenant support:

async def process_message(self, webhook: IncomingMessageWebhook):
    # Always validate dependencies first
    if not self.validate_dependencies():
        self.logger.error("Cannot process - dependencies missing")
        return

    # Dependencies are now available:
    # self.messenger - WhatsApp messaging client
    # self.cache_factory - Cache backend factory (always available; Memory by default)
    # self.logger - Configured logger instance

2. Webhook Data Extraction

Extract information from the webhook using built-in methods:

# User information
user_id = webhook.user.user_id          # Phone number
user_name = webhook.user.profile_name   # Display name

# Message information  
message_id = webhook.message.message_id
message_text = webhook.get_message_text()      # Text content or None
message_type = webhook.get_message_type_name() # "text", "image", "audio", etc.

# Message metadata
timestamp = webhook.message.timestamp
reply_to = webhook.message.context.message_id if webhook.message.context else None
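
Because get_message_text() can return None for non-text messages, it helps to branch on the type name before touching the text. The following is a minimal sketch of such a dispatch table; `route`, `ROUTES`, and the three `on_*` coroutines are hypothetical names, and the type strings are the ones listed in the comment above.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Optional

Handler = Callable[[Optional[str]], Awaitable[str]]


async def on_text(text: Optional[str]) -> str:
    return f"text: {text or ''}"


async def on_media(text: Optional[str]) -> str:
    # Media messages may carry no text at all, so the argument is ignored here.
    return "media received"


async def on_unknown(text: Optional[str]) -> str:
    return "unsupported message type"


# Map type names to coroutines; several types can share one handler.
ROUTES: Dict[str, Handler] = {
    "text": on_text,
    "image": on_media,
    "audio": on_media,
}


async def route(message_type: str, text: Optional[str]) -> str:
    """Pick a coroutine by type name, falling back to on_unknown."""
    handler = ROUTES.get(message_type, on_unknown)
    return await handler(text)
```

In a handler, the call would be `await route(webhook.get_message_type_name(), webhook.get_message_text())`.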

3. Message Acknowledgment

Mark messages as read to provide user feedback:

# Basic read acknowledgment
await self.messenger.mark_as_read(message_id=webhook.message.message_id)

# With typing indicator (recommended)
await self.messenger.mark_as_read(
    message_id=webhook.message.message_id,
    typing=True  # Shows typing indicator while processing
)

4. Response Handling

Send responses using the messenger interface:

# Simple text response
result = await self.messenger.send_text(
    recipient=user_id,
    text="Your message here",
    reply_to_message_id=webhook.message.message_id  # Creates reply context
)

# Check if send was successful
if result.success:
    self.logger.info(f"Message sent: {result.message_id}")
else:
    self.logger.error(f"Send failed: {result.error}")
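
When a send fails for a transient reason, retrying with a growing delay is a common follow-up to the success check above. This is a sketch under assumptions: `SendResult` is a stand-in that models only the three fields used above, and `send_with_retry` is a hypothetical helper, not part of the messenger interface.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class SendResult:
    """Stand-in for the messenger's result object; only the fields used above."""
    success: bool
    message_id: Optional[str] = None
    error: Optional[str] = None


async def send_with_retry(
    send: Callable[[], Awaitable[SendResult]],
    attempts: int = 3,
    base_delay: float = 0.5,
) -> SendResult:
    """Call send() until it succeeds or attempts run out, doubling the delay each time."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    result = await send()
    for attempt in range(1, attempts):
        if result.success:
            break
        # Delays grow as base_delay, 2 * base_delay, 4 * base_delay, ...
        await asyncio.sleep(base_delay * 2 ** (attempt - 1))
        result = await send()
    return result
```

A handler could wrap its call as `await send_with_retry(lambda: self.messenger.send_text(recipient=user_id, text="..."))`. Retrying is only safe when a failed result means the message was not delivered; otherwise the user may receive duplicates.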

Cache Integration

The cache factory is always available (defaults to Memory cache, or JSON/Redis if configured):

import time

async def process_message(self, webhook: IncomingMessageWebhook):
    # Create cache instances (always available)
    user_cache = self.cache_factory.create_user_cache()
    state_cache = self.cache_factory.create_state_cache()

    # Load any existing record so the counter can be incremented
    user_id = webhook.user.user_id
    existing = await user_cache.get(user_id) or {}

    # Store user data
    user_data = {
        'name': webhook.user.profile_name,
        'last_seen': time.time(),
        'message_count': existing.get('message_count', 0) + 1
    }
    await user_cache.set(user_id, user_data, ttl=86400)  # 24h

    # Manage conversation state
    await state_cache.set(
        f"conversation_{webhook.user.user_id}", 
        {'step': 'greeting', 'data': {}},
        ttl=3600
    )
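
To make the `ttl` argument concrete, here is a minimal sketch of an in-memory cache with the same async `get` / `set(..., ttl=...)` shape used above. `TTLMemoryCache` is hypothetical and is not Wappa's Memory backend; the injectable `clock` parameter is an assumption added so expiry can be tested without sleeping.

```python
import asyncio
import time
from typing import Any, Callable, Dict, Optional, Tuple


class TTLMemoryCache:
    """Hypothetical in-memory cache; entries expire ttl seconds after set()."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._items: Dict[str, Tuple[float, Any]] = {}

    async def set(self, key: str, value: Any, ttl: float) -> None:
        # Store the absolute expiry time next to the value.
        self._items[key] = (self._clock() + ttl, value)

    async def get(self, key: str) -> Optional[Any]:
        entry = self._items.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._items[key]  # lazily evict expired entries on read
            return None
        return value
```

With this shape, a conversation state stored with `ttl=3600` simply reads back as None an hour later, which is why handlers should treat a missing state as the start of a new conversation.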

Error Handling Best Practices

Graceful Error Recovery

async def process_message(self, webhook: IncomingMessageWebhook):
    try:
        # Your main processing logic
        await self._process_business_logic(webhook)

    except Exception as e:
        self.logger.error(f"Processing failed: {e}", exc_info=True)

        # Send user-friendly error response
        await self._send_error_response(webhook, str(e))

async def _send_error_response(self, webhook: IncomingMessageWebhook, error_details: str):
    """Send helpful error message to user."""
    try:
        error_message = (
            "❌ Sorry, something went wrong processing your message.\n"
            "Please try again in a moment."
        )

        await self.messenger.send_text(
            recipient=webhook.user.user_id,
            text=error_message,
            reply_to_message_id=webhook.message.message_id
        )
    except Exception as send_error:
        self.logger.error(f"Failed to send error response: {send_error}")
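
Note that the error response above never sends the exception text to the user. If different failures deserve different wording, one option is a lookup from exception type to a safe message. The sketch below is illustrative only: `USER_MESSAGES`, `DEFAULT_MESSAGE`, and `user_message_for` are hypothetical names, and the exception types chosen are examples.

```python
import asyncio

# Map exception types to wording that is safe to show to end users.
USER_MESSAGES = {
    asyncio.TimeoutError: "The request took too long. Please try again.",
    ValueError: "I could not understand that input.",
}
DEFAULT_MESSAGE = "Sorry, something went wrong. Please try again in a moment."


def user_message_for(exc: BaseException) -> str:
    """Return a user-facing message; never echo str(exc) to the user."""
    for exc_type, message in USER_MESSAGES.items():
        if isinstance(exc, exc_type):
            return message
    return DEFAULT_MESSAGE
```

The full exception still belongs in the log (as with `exc_info=True` above); only the mapped message goes to the chat.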

Dependency Validation

async def process_message(self, webhook: IncomingMessageWebhook):
    # Validate required dependencies
    if not self.validate_dependencies():
        self.logger.error("Cannot process message - missing dependencies")
        return

    # Cache is always available (Memory by default)
    user_cache = self.cache_factory.create_user_cache()
    # Proceed with full functionality

Performance and Statistics

Message Processing Metrics

Track performance and usage statistics:

import time

class MetricsHandler(WappaEventHandler):

    def __init__(self):
        super().__init__()
        self._start_time = time.time()
        self._message_count = 0
        self._processing_times = []

    async def process_message(self, webhook: IncomingMessageWebhook):
        start = time.time()
        self._message_count += 1

        try:
            # Your processing logic
            await self._handle_message(webhook)

            # Record successful processing time
            processing_time = time.time() - start
            self._processing_times.append(processing_time)

            self.logger.info(
                f"Message processed in {processing_time:.2f}s "
                f"(total: {self._message_count})"
            )

        except Exception as e:
            self.logger.error(f"Processing failed after {time.time() - start:.2f}s: {e}")

    def get_performance_stats(self):
        """Get handler performance metrics."""
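        if not self._processing_times:
            # Return the same keys as the populated case so callers need no special handling
            return {
                "message_count": self._message_count,
                "avg_processing_time": 0,
                "total_uptime": time.time() - self._start_time
            }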

        avg_time = sum(self._processing_times) / len(self._processing_times)
        return {
            "message_count": self._message_count,
            "avg_processing_time": avg_time,
            "total_uptime": time.time() - self._start_time
        }
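
The `_processing_times` list in MetricsHandler grows for as long as the handler lives. A bounded window is one way to cap memory in a long-running process. The sketch below uses a hypothetical `RollingTimings` helper built on `collections.deque`; the default window size of 1000 is an arbitrary assumption.

```python
from collections import deque
from statistics import mean


class RollingTimings:
    """Hypothetical helper: keeps only the most recent `maxlen` durations."""

    def __init__(self, maxlen: int = 1000) -> None:
        # A deque with maxlen silently drops the oldest sample when full.
        self._samples: deque = deque(maxlen=maxlen)

    def add(self, seconds: float) -> None:
        self._samples.append(seconds)

    def summary(self) -> dict:
        """Return count, average, and maximum over the current window."""
        if not self._samples:
            return {"count": 0, "avg": 0.0, "max": 0.0}
        return {
            "count": len(self._samples),
            "avg": mean(self._samples),
            "max": max(self._samples),
        }
```

The trade-off is that the average then describes recent traffic rather than the whole uptime, which is usually what a health check wants.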

Common Patterns

Command Pattern Implementation

Handle slash commands with clean separation:

async def process_message(self, webhook: IncomingMessageWebhook):
    message_text = webhook.get_message_text()

    # Check for commands
    if message_text and message_text.startswith('/'):
        await self._handle_command(webhook, message_text)
    else:
        await self._handle_regular_message(webhook)

async def _handle_command(self, webhook: IncomingMessageWebhook, command_text: str):
    """Route commands to specific handlers."""
    command = command_text.split()[0].lower()

    command_map = {
        '/help': self._handle_help_command,
        '/start': self._handle_start_command,
        '/settings': self._handle_settings_command,
    }

    handler = command_map.get(command)
    if handler:
        await handler(webhook)
    else:
        await self.messenger.send_text(
            recipient=webhook.user.user_id,
            text=f"Unknown command: {command}"
        )
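
The router above passes only the webhook to each command handler, so arguments such as `/settings lang en` would need to be parsed separately. A minimal sketch of that parsing, using the standard library's `shlex` so quoted arguments stay together; `parse_command` is a hypothetical helper, not part of the framework.

```python
import shlex
from typing import List, Tuple


def parse_command(text: str) -> Tuple[str, List[str]]:
    """Split '/cmd arg1 "arg two"' into ('/cmd', ['arg1', 'arg two'])."""
    try:
        parts = shlex.split(text)
    except ValueError:
        # Unbalanced quotes or a stray apostrophe: fall back to whitespace splitting.
        parts = text.split()
    if not parts or not parts[0].startswith("/"):
        raise ValueError("not a command")
    # Command names are case-insensitive; arguments are left untouched.
    return parts[0].lower(), parts[1:]
```

The fallback matters for chat input, where users routinely type apostrophes that `shlex` would otherwise treat as an unclosed quote.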

State Management Pattern

Implement multi-step conversations with state persistence:

async def process_message(self, webhook: IncomingMessageWebhook):
    user_id = webhook.user.user_id

    # Check for active conversation state (the cache factory is always available)
    state_cache = self.cache_factory.create_state_cache()
    current_state = await state_cache.get(f"conversation_{user_id}")

    if current_state:
        await self._handle_state_message(webhook, current_state)
    else:
        await self._handle_new_conversation(webhook)

async def _handle_state_message(self, webhook: IncomingMessageWebhook, state: dict):
    """Handle message based on conversation state."""
    step = state.get('step')

    if step == 'collecting_name':
        await self._collect_name(webhook, state)
    elif step == 'collecting_email':
        await self._collect_email(webhook, state)
    # ... handle other steps
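
The step handlers themselves are left open above. One way to keep them testable is to express the conversation as a pure transition function that takes the current state and the user's text and returns the next state plus a reply. The sketch below is an assumption throughout: `advance`, the reply wording, and the "@" check are illustrative, and only the `step` / `data` state shape and the step names come from the examples above.

```python
from typing import Any, Dict, Optional, Tuple

State = Dict[str, Any]


def advance(state: Optional[State], text: str) -> Tuple[Optional[State], str]:
    """Return (next_state, reply). A next_state of None ends the conversation."""
    if state is None:
        return {"step": "collecting_name", "data": {}}, "What is your name?"
    data = dict(state["data"])  # copy so the caller's state is not mutated
    if state["step"] == "collecting_name":
        data["name"] = text.strip()
        return {"step": "collecting_email", "data": data}, "What is your email?"
    if state["step"] == "collecting_email":
        if "@" not in text:
            # Stay on the same step until the input looks like an email.
            return state, "That does not look like an email. Try again."
        data["email"] = text.strip()
        return None, f"Thanks, {data['name']}! We will write to {data['email']}."
    # Unknown step: drop the state so the next message starts fresh.
    return None, "Let's start over."
```

A handler would load the state from the state cache, call `advance`, send the reply, and store the returned state with a TTL. How to remove a finished conversation (for example a delete method on the cache) is not shown in this page and would need to be checked against the cache API.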

Next Steps

The event handler is where your conversational application logic lives; everything else is framework infrastructure that supports your implementation.