#!/usr/bin/env node /** * MQTT → Clawdbot Bridge * * Subscribes to MQTT topics and forwards messages to Clawdbot via webhook. * * Configuration via environment variables: * MQTT_URL - MQTT broker URL (default: mqtt://localhost:1883) * MQTT_USERNAME - MQTT username (optional) * MQTT_PASSWORD - MQTT password (optional) * MQTT_TOPICS - Comma-separated list of topics to subscribe (default: clawd/#) * CLAWDBOT_URL - Clawdbot gateway URL (default: http://127.0.0.1:18789) * CLAWDBOT_TOKEN - Clawdbot webhook token (required) * CLAWDBOT_MODE - Webhook mode: 'wake' or 'agent' (default: wake) * LOG_LEVEL - Logging level: debug, info, warn, error (default: info) */ import mqtt from 'mqtt'; // Configuration const config = { mqtt: { url: process.env.MQTT_URL || 'mqtt://localhost:1883', username: process.env.MQTT_USERNAME || undefined, password: process.env.MQTT_PASSWORD || undefined, topics: (process.env.MQTT_TOPICS || 'clawd/#').split(',').map(t => t.trim()), clientId: process.env.MQTT_CLIENT_ID || `clawdbot-bridge-${Math.random().toString(16).slice(2, 8)}`, }, clawdbot: { url: process.env.CLAWDBOT_URL || 'http://127.0.0.1:18789', token: process.env.CLAWDBOT_TOKEN, mode: process.env.CLAWDBOT_MODE || 'wake', // 'wake' or 'agent' }, logLevel: process.env.LOG_LEVEL || 'info', }; // Logging const LOG_LEVELS = { debug: 0, info: 1, warn: 2, error: 3 }; const currentLogLevel = LOG_LEVELS[config.logLevel] ?? 1; const log = { debug: (...args) => currentLogLevel <= 0 && console.log('[DEBUG]', new Date().toISOString(), ...args), info: (...args) => currentLogLevel <= 1 && console.log('[INFO]', new Date().toISOString(), ...args), warn: (...args) => currentLogLevel <= 2 && console.warn('[WARN]', new Date().toISOString(), ...args), error: (...args) => currentLogLevel <= 3 && console.error('[ERROR]', new Date().toISOString(), ...args), }; // Validate config if (!config.clawdbot.token) { log.error('CLAWDBOT_TOKEN is required. Set it in your environment.'); process.exit(1); } /** * Send message to Clawdbot webhook */ async function sendToClawdbot(topic, payload) { const endpoint = config.clawdbot.mode === 'agent' ? `${config.clawdbot.url}/hooks/agent` : `${config.clawdbot.url}/hooks/wake`; const body = config.clawdbot.mode === 'agent' ? { message: `[MQTT ${topic}] ${payload}`, name: 'MQTT' } : { text: `[MQTT ${topic}] ${payload}`, mode: 'now' }; try { const response = await fetch(endpoint, { method: 'POST', headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${config.clawdbot.token}`, }, body: JSON.stringify(body), }); if (!response.ok) { const text = await response.text(); throw new Error(`HTTP ${response.status}: ${text}`); } log.info(`Forwarded to Clawdbot: ${topic} → ${payload.substring(0, 100)}`); return true; } catch (err) { log.error(`Failed to forward to Clawdbot: ${err.message}`); return false; } } /** * Parse MQTT payload (handles JSON and plain text) */ function parsePayload(payload) { const str = payload.toString(); try { const json = JSON.parse(str); // If JSON has a 'message' or 'text' field, use that return json.message || json.text || json.payload || str; } catch { return str; } } /** * Main entry point */ async function main() { log.info('Starting MQTT → Clawdbot Bridge'); log.info(`MQTT broker: ${config.mqtt.url}`); log.info(`Topics: ${config.mqtt.topics.join(', ')}`); log.info(`Clawdbot: ${config.clawdbot.url} (mode: ${config.clawdbot.mode})`); // Connect to MQTT const mqttOptions = { clientId: config.mqtt.clientId, clean: true, reconnectPeriod: 5000, }; if (config.mqtt.username) { mqttOptions.username = config.mqtt.username; mqttOptions.password = config.mqtt.password; } const client = mqtt.connect(config.mqtt.url, mqttOptions); client.on('connect', () => { log.info('Connected to MQTT broker'); // Subscribe to topics for (const topic of config.mqtt.topics) { client.subscribe(topic, (err) => { if (err) { log.error(`Failed to subscribe to ${topic}: ${err.message}`); } else { log.info(`Subscribed to: ${topic}`); } }); } }); client.on('message', async (topic, payload) => { const message = parsePayload(payload); log.debug(`Received: ${topic} → ${message}`); await sendToClawdbot(topic, message); }); client.on('error', (err) => { log.error(`MQTT error: ${err.message}`); }); client.on('reconnect', () => { log.warn('Reconnecting to MQTT broker...'); }); client.on('close', () => { log.warn('MQTT connection closed'); }); // Graceful shutdown const shutdown = () => { log.info('Shutting down...'); client.end(true, () => { process.exit(0); }); }; process.on('SIGINT', shutdown); process.on('SIGTERM', shutdown); } main().catch((err) => { log.error(`Fatal error: ${err.message}`); process.exit(1); });