Building a Trading Bot Interface with Telegram and gRPC
This post details the implementation of the Arbiter Telegram bot - a Python service that provides mobile-friendly control of the Rust trading engine via gRPC.
Why Telegram?
We needed a mobile interface without building a native app. Telegram provides:
- Push notifications - Instant alerts without polling
- No app store - Users already have Telegram installed
- Rich formatting - Markdown, inline keyboards, callbacks
- Bot API - Well-documented, reliable infrastructure
The trade-off: dependency on Telegram's platform. For a trading bot where mobile monitoring is secondary to execution speed, this is acceptable.
Architecture Overview
┌─────────────────────────────────────┐
│        Telegram Bot (Python)        │
│  ┌────────────────────────────────┐ │
│  │      python-telegram-bot       │ │
│  │  • CommandHandler              │ │
│  │  • CallbackQueryHandler        │ │
│  │  • ErrorHandler                │ │
│  └────────────────┬───────────────┘ │
│                   │                 │
│  ┌────────────────▼───────────────┐ │
│  │          gRPC Client           │ │
│  │  • Generated protobuf stubs    │ │
│  │  • Async channel management    │ │
│  └────────────────┬───────────────┘ │
└───────────────────┼─────────────────┘
                    │ gRPC
┌───────────────────▼─────────────────┐
│        Arbiter Engine (Rust)        │
│  • TradingService                   │
│  • StrategyService                  │
│  • UserService                      │
└─────────────────────────────────────┘
Handler Pattern
Every command follows the same pattern:
async def positions_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the /positions command."""
    if not update.message:
        return

    # 1. Get gRPC client from context
    client: ArbiterClient | None = context.bot_data.get("arbiter_client")
    if not client:
        await update.message.reply_text("Error: Backend not connected.")
        return

    try:
        # 2. Call backend via gRPC
        positions = await client.get_positions()

        # 3. Format response
        if not positions:
            await update.message.reply_text("No open positions.", parse_mode="Markdown")
            return
        message = format_positions(positions)
        await update.message.reply_text(message, parse_mode="Markdown")
    except ArbiterClientError as e:
        # 4. Handle errors gracefully
        logger.error("Failed to fetch positions", error=str(e))
        await update.message.reply_text(f"Error: {e}")
Key aspects:
- Early return on missing message - Skips updates that carry no new message, such as edited messages
- Client from context - Shared connection, initialized once
- Async gRPC calls - Non-blocking communication
- Error boundaries - Never crash the handler
gRPC Client Wrapper
The raw generated stubs are wrapped in a client class:
import grpc


class ArbiterClient:
    """Async gRPC client for Arbiter trading engine."""

    def __init__(self, address: str):
        self.address = address
        self._channel: grpc.aio.Channel | None = None
        self._trading_stub: TradingServiceStub | None = None
        self._strategy_stub: StrategyServiceStub | None = None

    async def connect(self) -> None:
        """Establish gRPC channel."""
        self._channel = grpc.aio.insecure_channel(self.address)
        self._trading_stub = TradingServiceStub(self._channel)
        self._strategy_stub = StrategyServiceStub(self._channel)

    async def close(self) -> None:
        """Close the channel; called from the shutdown hook."""
        if self._channel:
            await self._channel.close()
            self._channel = None

    async def get_positions(self) -> list[Position]:
        """Fetch all open positions."""
        if not self._trading_stub:
            raise ArbiterClientError("Not connected")
        try:
            response = await self._trading_stub.GetPositions(PositionsRequest())
            return [self._convert_position(p) for p in response.positions]
        except grpc.aio.AioRpcError as e:
            # Translate the transport error into a domain error
            raise ArbiterClientError(f"gRPC error: {e.code()}") from e
Benefits:
- Type conversion - Protobuf messages to Python dataclasses
- Error translation - gRPC errors to domain errors
- Connection lifecycle - Managed channel state
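The wrapper above calls a `_convert_position` helper that the post does not show. A minimal sketch of what such a conversion might look like follows, assuming the `Position` fields used in the test fixture later in this post (`market_id`, `side`, `size`, `pnl`). The free function `convert_position` and the use of `SimpleNamespace` as a stand-in for a generated protobuf message are illustrative assumptions, not the project's actual code.

```python
from dataclasses import dataclass
from types import SimpleNamespace


class ArbiterClientError(Exception):
    """Domain error raised instead of leaking transport-level exceptions."""


@dataclass(frozen=True)
class Position:
    # Field names are assumed from the test fixture shown in this post.
    market_id: str
    side: str
    size: float
    pnl: float


def convert_position(msg) -> Position:
    """Copy fields from a protobuf-like message into a plain dataclass."""
    try:
        return Position(
            market_id=str(msg.market_id),
            side=str(msg.side),
            size=float(msg.size),
            pnl=float(msg.pnl),
        )
    except (AttributeError, TypeError, ValueError) as exc:
        # A missing or non-numeric field becomes a domain error.
        raise ArbiterClientError(f"Malformed position message: {exc}") from exc


# SimpleNamespace stands in for a generated protobuf message here.
wire_msg = SimpleNamespace(market_id="BTC-50K", side="long", size=100, pnl=50.0)
position = convert_position(wire_msg)
print(position)
```

Keeping the conversion in one place means handlers only ever see plain dataclasses, so formatting code and tests do not need the generated stubs.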
Inline Keyboards
Interactive buttons provide quick actions without typing commands:
async def home_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Dashboard with quick action buttons."""
    # ... fetch data ...
    keyboard = [
        [
            InlineKeyboardButton("📊 Positions", callback_data="positions"),
            InlineKeyboardButton("💰 Wallet", callback_data="wallet"),
        ],
        [
            InlineKeyboardButton(
                f"{'⏹️ Stop' if arb_enabled else '▶️ Start'} Arb",
                callback_data=f"arb_{'stop' if arb_enabled else 'start'}",
            ),
            InlineKeyboardButton("📋 Copy Trades", callback_data="copy_list"),
        ],
    ]
    await update.message.reply_text(
        message,
        parse_mode="Markdown",
        reply_markup=InlineKeyboardMarkup(keyboard),
    )
Callback routing handles button presses:
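async def callback_query_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Route inline keyboard callbacks."""
    query = update.callback_query
    if not query:
        return
    await query.answer()  # Acknowledge the callback

    client: ArbiterClient | None = context.bot_data.get("arbiter_client")
    if not client:
        await query.message.reply_text("Error: Backend not connected.")
        return

    # update.message is None for callback updates, so handlers reused here
    # must reply via update.effective_message.
    match query.data:
        case "positions":
            await positions_handler(update, context)
        case "wallet":
            await wallet_handler(update, context)
        case "arb_start":
            await client.set_arb_enabled(True)
            await query.message.reply_text("🟢 Arbitrage started!")
        case "arb_stop":
            await client.set_arb_enabled(False)
            await query.message.reply_text("🔴 Arbitrage stopped!")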
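The buttons above use fixed strings, but callbacks that target a specific item need an identifier in `callback_data`, which Telegram caps at 64 bytes. The following is a minimal sketch of one way to build and parse compact payloads. The helper names, the `:` separator, and the `copy_remove` action are hypothetical and not taken from the Arbiter code.

```python
from __future__ import annotations

MAX_CALLBACK_BYTES = 64  # Telegram's documented limit for callback_data


def make_callback_data(action: str, item_id: int | None = None) -> str:
    """Build a compact payload such as 'copy_remove:42'."""
    data = action if item_id is None else f"{action}:{item_id}"
    size = len(data.encode("utf-8"))  # the limit is in bytes, not characters
    if not 1 <= size <= MAX_CALLBACK_BYTES:
        raise ValueError(f"callback_data must be 1-64 bytes, got {size}")
    return data


def parse_callback_data(data: str) -> tuple[str, int | None]:
    """Split a payload back into (action, optional numeric id)."""
    action, sep, raw_id = data.partition(":")
    return action, int(raw_id) if sep else None


payload = make_callback_data("copy_remove", 42)  # hypothetical action name
print(payload, parse_callback_data(payload))
```

Passing a short numeric ID and looking the full record up on the backend keeps payloads well under the limit, whereas embedding a wallet address plus parameters can exceed it.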
Subcommand Parsing
Commands with subcommands (like /arb start) use argument parsing:
async def arb_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Route /arb subcommands."""
    args = context.args or []
    match args:
        case [] | ["status"]:
            await show_arb_status(update, context)
        case ["start"]:
            await arb_start_handler(update, context)
        case ["stop"]:
            await arb_stop_handler(update, context)
        case _:
            await update.message.reply_text(
                "*Arbitrage Commands:*\n"
                "/arb status - Show status\n"
                "/arb start - Enable engine\n"
                "/arb stop - Disable engine",
                parse_mode="Markdown",
            )
For commands with parameters:
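async def copy_add_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /copy add <wallet> [allocation%] [max_position]."""
    if not update.message:
        return
    args = context.args or []
    if len(args) < 1:
        await update.message.reply_text("Usage: /copy add <wallet> [alloc%] [max$]")
        return
    wallet_address = args[0]
    try:
        allocation = float(args[1]) if len(args) >= 2 else 10.0
        max_position = float(args[2]) if len(args) >= 3 else 100.0
    except ValueError:
        # Non-numeric input would otherwise crash the handler
        await update.message.reply_text("Allocation and max position must be numbers.")
        return
    # Validate inputs
    if not 0 < allocation <= 100:
        await update.message.reply_text("Allocation must be greater than 0 and at most 100%.")
        return
    # Execute
    client: ArbiterClient | None = context.bot_data.get("arbiter_client")
    if not client:
        await update.message.reply_text("Error: Backend not connected.")
        return
    await client.add_copy_trade(wallet_address, allocation, max_position)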
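Parsing and validation like this can also be pulled out of the handler into a pure function, which makes it testable without any Telegram objects. Below is a minimal sketch under that assumption. `CopyAddArgs` and `parse_copy_add_args` are hypothetical names, the defaults (10% allocation, 100 max position) mirror the handler above, and the check on `max_position` is an addition that the handler does not perform.

```python
from __future__ import annotations

import math
from dataclasses import dataclass


@dataclass(frozen=True)
class CopyAddArgs:
    wallet_address: str
    allocation: float = 10.0    # percent of balance, default from the handler
    max_position: float = 100.0  # default from the handler


def parse_copy_add_args(args: list[str]) -> CopyAddArgs:
    """Parse '<wallet> [alloc%] [max$]' or raise ValueError with a user-facing message."""
    if not 1 <= len(args) <= 3:
        raise ValueError("Usage: /copy add <wallet> [alloc%] [max$]")
    try:
        allocation = float(args[1]) if len(args) >= 2 else 10.0
        max_position = float(args[2]) if len(args) >= 3 else 100.0
    except ValueError:
        raise ValueError("Allocation and max position must be numbers.") from None
    if not 0 < allocation <= 100:
        # Also rejects NaN, since comparisons with NaN are always False.
        raise ValueError("Allocation must be greater than 0 and at most 100%.")
    if not math.isfinite(max_position) or max_position <= 0:
        raise ValueError("Max position must be a positive number.")
    return CopyAddArgs(args[0], allocation, max_position)


print(parse_copy_add_args(["0xabc", "25", "500"]))
```

The handler would then reduce to one `try`/`except ValueError` that replies with the error text, followed by the gRPC call.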
Application Lifecycle
The bot initializes the gRPC client during startup:
async def post_init(application: Application) -> None:
    """Initialize after application starts."""
    settings = get_settings()
    client = ArbiterClient(settings.grpc_address)
    await client.connect()
    application.bot_data["arbiter_client"] = client
    logger.info("Bot initialized", grpc_address=settings.grpc_address)


async def post_shutdown(application: Application) -> None:
    """Clean up on shutdown."""
    client = application.bot_data.get("arbiter_client")
    if client:
        await client.close()


def create_application() -> Application:
    """Build the Telegram application."""
    settings = get_settings()
    return (
        Application.builder()
        .token(settings.telegram_bot_token)
        .post_init(post_init)
        .post_shutdown(post_shutdown)
        .build()
    )
Configuration with Pydantic
Settings load from environment variables with validation:
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    telegram_bot_token: str
    telegram_allowed_users: list[int] = []  # Empty = allow all
    grpc_host: str = "localhost"
    grpc_port: int = 50051
    log_level: str = "INFO"

    model_config = {"env_file": ".env"}

    @property
    def grpc_address(self) -> str:
        return f"{self.grpc_host}:{self.grpc_port}"
Pydantic provides:
- Type coercion - Strings to ints, JSON-encoded values to lists (a comma-separated value needs a custom validator)
- Validation - Fail fast on invalid config
- Defaults - Sensible fallbacks
- Documentation - Self-describing fields
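The `telegram_allowed_users` field implies an allow-list check before commands run, which the post does not show. Here is a minimal sketch of that check, together with a parser for comma-separated IDs. Both function names are hypothetical. By default pydantic-settings reads list fields from the environment as JSON (for example `[111,222]`), so a helper like `parse_allowed_users` would only apply if the value were stored as a plain string and split manually.

```python
from __future__ import annotations


def parse_allowed_users(raw: str) -> list[int]:
    """Turn '111, 222,333' into [111, 222, 333]; blank input gives an empty list."""
    return [int(part) for part in raw.split(",") if part.strip()]


def is_authorized(user_id: int, allowed_users: list[int]) -> bool:
    """An empty allow-list means every user is allowed, as in the Settings comment."""
    return not allowed_users or user_id in allowed_users


allowed = parse_allowed_users("111, 222,333")
print(allowed, is_authorized(222, allowed), is_authorized(999, allowed))
```

Because an empty list allows everyone, a deployment that forgets to set the variable is open to any Telegram user. Treating an empty list as "deny all" would be the stricter alternative.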
Testing Strategy
Handlers are tested in isolation, with unittest.mock objects standing in for the Telegram update, the context, and the gRPC client:
import pytest
from unittest.mock import AsyncMock, MagicMock


@pytest.fixture
def mock_client():
    """Create mock gRPC client."""
    client = AsyncMock(spec=ArbiterClient)
    client.get_positions.return_value = [
        Position(market_id="BTC-50K", side="long", size=100, pnl=50.0)
    ]
    return client


@pytest.mark.asyncio
async def test_positions_handler_shows_positions(mock_client):
    """Test /positions command displays positions."""
    update = MagicMock()
    update.message.reply_text = AsyncMock()
    context = MagicMock()
    context.bot_data = {"arbiter_client": mock_client}

    await positions_handler(update, context)

    # Verify gRPC call
    mock_client.get_positions.assert_called_once()

    # Verify response
    call_args = update.message.reply_text.call_args
    assert "BTC-50K" in call_args[0][0]
    assert "$50.00" in call_args[0][0]
Test coverage includes:
| Category | Tests |
|---|---|
| Command handlers | 35 |
| Callback handlers | 15 |
| Configuration | 5 |
| Error handling | 5 |
| Total | 60 |
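The error-handling row deserves an example too. What follows is a minimal, self-contained sketch of an error-path test that runs without pytest or the Telegram library. The handler here is a trimmed stand-in for the `positions_handler` shown earlier, and `ArbiterClientError` is redefined locally so the snippet runs on its own. The real tests presumably import both from the project.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


class ArbiterClientError(Exception):
    """Local stand-in for the project's domain error."""


async def positions_handler(update, context) -> None:
    """Trimmed stand-in: only the gRPC call and the error boundary."""
    client = context.bot_data.get("arbiter_client")
    try:
        positions = await client.get_positions()
        await update.message.reply_text(f"{len(positions)} open positions")
    except ArbiterClientError as e:
        await update.message.reply_text(f"Error: {e}")


async def run_error_path_test() -> str:
    # Make the mocked backend fail the way the client wrapper would.
    client = AsyncMock()
    client.get_positions.side_effect = ArbiterClientError("gRPC error: UNAVAILABLE")

    update = MagicMock()
    update.message.reply_text = AsyncMock()
    context = MagicMock()
    context.bot_data = {"arbiter_client": client}

    await positions_handler(update, context)

    # The handler must not raise, and must reply exactly once.
    client.get_positions.assert_awaited_once()
    update.message.reply_text.assert_awaited_once()
    return update.message.reply_text.call_args.args[0]


reply = asyncio.run(run_error_path_test())
assert reply == "Error: gRPC error: UNAVAILABLE"
```

Setting `side_effect` on the `AsyncMock` makes the awaited call raise, which exercises the `except` branch without a running backend.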
Lessons Learned
- Context is your friend - Store shared resources in bot_data
- Async all the way - Don't block the event loop
- Error boundaries - Handle every gRPC failure gracefully
- Markdown escaping - User input can break formatting
- Callback data limits - 64 bytes max, use IDs not full data
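The Markdown escaping lesson can be made concrete. The handlers in this post use `parse_mode="Markdown"`, Telegram's legacy mode, where `_`, `*`, a backtick, and `[` have special meaning and are escaped with a preceding backslash. A minimal sketch of such an escaper follows; the function name is hypothetical. python-telegram-bot also provides `telegram.helpers.escape_markdown`, which is likely the better choice in the real bot.

```python
import re

# Characters with special meaning in Telegram's legacy Markdown mode.
_LEGACY_MARKDOWN_SPECIALS = re.compile(r"([_*`\[])")


def escape_legacy_markdown(text: str) -> str:
    """Prefix each special character with a backslash."""
    return _LEGACY_MARKDOWN_SPECIALS.sub(r"\\\1", text)


# A user-supplied label containing formatting characters.
label = "my_wallet*[main]"
print(escape_legacy_markdown(label))
```

Escaping should be applied only to user-supplied fragments (wallet labels, market names) before they are interpolated into a formatted message, not to the whole message, or the intended formatting is lost as well.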
Future Improvements
- Conversation handlers - Multi-step wizards for complex actions
- Push notifications - Alert on significant P&L changes
- Rate limiting - Per-user command throttling
- Localization - Multi-language support
References
- ADR-008: Control Interface - Architecture decision
- Telegram Bot Guide - User documentation
- python-telegram-bot docs - Framework reference