Skip to content

Building a Trading Bot Interface with Telegram and gRPC

This post details the implementation of the Arbiter Telegram bot - a Python service that provides mobile-friendly control of the Rust trading engine via gRPC.

Why Telegram?

We needed a mobile interface without building a native app. Telegram provides:

  • Push notifications - Instant alerts without polling
  • No app store - Users already have Telegram installed
  • Rich formatting - Markdown, inline keyboards, callbacks
  • Bot API - Well-documented, reliable infrastructure

The trade-off: dependency on Telegram's platform. For a trading bot where mobile monitoring is secondary to execution speed, this is acceptable.

Architecture Overview

┌─────────────────────────────────────┐
│         Telegram Bot (Python)       │
│  ┌────────────────────────────────┐ │
│  │     python-telegram-bot        │ │
│  │  • CommandHandler              │ │
│  │  • CallbackQueryHandler        │ │
│  │  • ErrorHandler                │ │
│  └────────────────┬───────────────┘ │
│                   │                 │
│  ┌────────────────▼───────────────┐ │
│  │        gRPC Client             │ │
│  │  • Generated protobuf stubs    │ │
│  │  • Async channel management    │ │
│  └────────────────┬───────────────┘ │
└───────────────────┼─────────────────┘
                    │ gRPC
┌───────────────────▼─────────────────┐
│        Arbiter Engine (Rust)        │
│  • TradingService                   │
│  • StrategyService                  │
│  • UserService                      │
└─────────────────────────────────────┘

Handler Pattern

Every command follows the same pattern:

async def positions_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the /positions command."""
    if not update.message:
        return

    # 1. Get gRPC client from context
    client: ArbiterClient | None = context.bot_data.get("arbiter_client")
    if not client:
        await update.message.reply_text("Error: Backend not connected.")
        return

    try:
        # 2. Call backend via gRPC
        positions = await client.get_positions()

        # 3. Format response
        if not positions:
            await update.message.reply_text("No open positions.", parse_mode="Markdown")
            return

        message = format_positions(positions)
        await update.message.reply_text(message, parse_mode="Markdown")

    except ArbiterClientError as e:
        # 4. Handle errors gracefully
        logger.error("Failed to fetch positions", error=str(e))
        await update.message.reply_text(f"Error: {e}")

Key aspects:

  1. Early return on missing message - Handles edge cases
  2. Client from context - Shared connection, initialized once
  3. Async gRPC calls - Non-blocking communication
  4. Error boundaries - Never crash the handler

gRPC Client Wrapper

The raw generated stubs are wrapped in a client class:

class ArbiterClient:
    """Async gRPC client for Arbiter trading engine."""

    def __init__(self, address: str):
        self.address = address
        self._channel: grpc.aio.Channel | None = None
        self._trading_stub: TradingServiceStub | None = None
        self._strategy_stub: StrategyServiceStub | None = None

    async def connect(self) -> None:
        """Establish gRPC channel."""
        self._channel = grpc.aio.insecure_channel(self.address)
        self._trading_stub = TradingServiceStub(self._channel)
        self._strategy_stub = StrategyServiceStub(self._channel)

    async def get_positions(self) -> list[Position]:
        """Fetch all open positions."""
        if not self._trading_stub:
            raise ArbiterClientError("Not connected")

        try:
            response = await self._trading_stub.GetPositions(PositionsRequest())
            return [self._convert_position(p) for p in response.positions]
        except grpc.aio.AioRpcError as e:
            raise ArbiterClientError(f"gRPC error: {e.code()}") from e

Benefits:

  • Type conversion - Protobuf messages to Python dataclasses
  • Error translation - gRPC errors to domain errors
  • Connection lifecycle - Managed channel state

Inline Keyboards

Interactive buttons provide quick actions without typing commands:

async def home_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Dashboard with quick action buttons."""
    # ... fetch data ...

    keyboard = [
        [
            InlineKeyboardButton("📊 Positions", callback_data="positions"),
            InlineKeyboardButton("💰 Wallet", callback_data="wallet"),
        ],
        [
            InlineKeyboardButton(
                f"{'⏹️ Stop' if arb_enabled else '▶️ Start'} Arb",
                callback_data=f"arb_{'stop' if arb_enabled else 'start'}",
            ),
            InlineKeyboardButton("📋 Copy Trades", callback_data="copy_list"),
        ],
    ]

    await update.message.reply_text(
        message,
        parse_mode="Markdown",
        reply_markup=InlineKeyboardMarkup(keyboard),
    )

Callback routing handles button presses:

async def callback_query_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Route inline keyboard callbacks."""
    query = update.callback_query
    await query.answer()  # Acknowledge the callback

    match query.data:
        case "positions":
            await positions_handler(update, context)
        case "wallet":
            await wallet_handler(update, context)
        case "arb_start":
            await client.set_arb_enabled(True)
            await query.message.reply_text("🟢 Arbitrage started!")
        case "arb_stop":
            await client.set_arb_enabled(False)
            await query.message.reply_text("🔴 Arbitrage stopped!")

Subcommand Parsing

Commands with subcommands (like /arb start) use argument parsing:

async def arb_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Route /arb subcommands."""
    args = context.args or []

    match args:
        case [] | ["status"]:
            await show_arb_status(update, context)
        case ["start"]:
            await arb_start_handler(update, context)
        case ["stop"]:
            await arb_stop_handler(update, context)
        case _:
            await update.message.reply_text(
                "*Arbitrage Commands:*\n"
                "/arb status - Show status\n"
                "/arb start - Enable engine\n"
                "/arb stop - Disable engine",
                parse_mode="Markdown",
            )

For commands with parameters:

async def copy_add_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /copy add <wallet> [allocation%] [max_position]."""
    args = context.args or []

    if len(args) < 1:
        await update.message.reply_text("Usage: /copy add <wallet> [alloc%] [max$]")
        return

    wallet_address = args[0]
    allocation = float(args[1]) if len(args) >= 2 else 10.0
    max_position = float(args[2]) if len(args) >= 3 else 100.0

    # Validate inputs
    if not 0 < allocation <= 100:
        await update.message.reply_text("Allocation must be 0-100%")
        return

    # Execute
    await client.add_copy_trade(wallet_address, allocation, max_position)

Application Lifecycle

The bot initializes the gRPC client during startup:

async def post_init(application: Application) -> None:
    """Initialize after application starts."""
    settings = get_settings()

    client = ArbiterClient(settings.grpc_address)
    await client.connect()

    application.bot_data["arbiter_client"] = client
    logger.info("Bot initialized", grpc_address=settings.grpc_address)


async def post_shutdown(application: Application) -> None:
    """Clean up on shutdown."""
    client = application.bot_data.get("arbiter_client")
    if client:
        await client.close()


def create_application() -> Application:
    """Build the Telegram application."""
    settings = get_settings()

    return (
        Application.builder()
        .token(settings.telegram_bot_token)
        .post_init(post_init)
        .post_shutdown(post_shutdown)
        .build()
    )

Configuration with Pydantic

Settings load from environment variables with validation:

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    telegram_bot_token: str
    telegram_allowed_users: list[int] = []  # Empty = allow all

    grpc_host: str = "localhost"
    grpc_port: int = 50051

    log_level: str = "INFO"

    model_config = {"env_file": ".env"}

    @property
    def grpc_address(self) -> str:
        return f"{self.grpc_host}:{self.grpc_port}"

Pydantic provides:

  • Type coercion - Strings to ints, comma-separated to lists
  • Validation - Fail fast on invalid config
  • Defaults - Sensible fallbacks
  • Documentation - Self-describing fields

Testing Strategy

Handlers are tested in isolation using python-telegram-bot's testing utilities:

import pytest
from unittest.mock import AsyncMock, MagicMock

@pytest.fixture
def mock_client():
    """Create mock gRPC client."""
    client = AsyncMock(spec=ArbiterClient)
    client.get_positions.return_value = [
        Position(market_id="BTC-50K", side="long", size=100, pnl=50.0)
    ]
    return client


@pytest.mark.asyncio
async def test_positions_handler_shows_positions(mock_client):
    """Test /positions command displays positions."""
    update = MagicMock()
    update.message.reply_text = AsyncMock()

    context = MagicMock()
    context.bot_data = {"arbiter_client": mock_client}

    await positions_handler(update, context)

    # Verify gRPC call
    mock_client.get_positions.assert_called_once()

    # Verify response
    call_args = update.message.reply_text.call_args
    assert "BTC-50K" in call_args[0][0]
    assert "$50.00" in call_args[0][0]

Test coverage includes:

Category Tests
Command handlers 35
Callback handlers 15
Configuration 5
Error handling 5
Total 60

Lessons Learned

  1. Context is your friend - Store shared resources in bot_data
  2. Async all the way - Don't block the event loop
  3. Error boundaries - Handle every gRPC failure gracefully
  4. Markdown escaping - User input can break formatting
  5. Callback data limits - 64 bytes max, use IDs not full data

Future Improvements

  • Conversation handlers - Multi-step wizards for complex actions
  • Push notifications - Alert on significant P&L changes
  • Rate limiting - Per-user command throttling
  • Localization - Multi-language support

References