Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,17 @@ telegram folder-add "Work" "ProjectChat" # Add chat to folder
telegram folder-remove "Work" "ProjectChat" # Remove chat from folder
```

### Streaming (Firehose)

```bash
telegram firehose # Stream all incoming messages as NDJSON
telegram firehose --chat "MetaDAO" # Stream only one chat
telegram firehose --include-outgoing # Include your own messages
telegram firehose | jq .text # Pipe to jq for processing
```

Runs until interrupted (Ctrl-C). Each line is a JSON object with `id`, `date`, `chatId`, `chatTitle`, `sender`, `senderId`, `text`, `replyToMsgId`, and `isOutgoing`.

### Utilities

```bash
Expand Down
15 changes: 14 additions & 1 deletion SKILL.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
name: telegram
description: Telegram CLI for reading, searching, sending messages, managing groups, and syncing chat history. Use when the user asks about Telegram messages, wants to check inbox, search chats, send messages, mute/unmute chats, kick users, export history, or look up contacts and groups.
description: Telegram CLI for reading, searching, sending messages, streaming (firehose), managing groups, and syncing chat history. Use when the user asks about Telegram messages, wants to check inbox, search chats, send messages, stream real-time messages, mute/unmute chats, kick users, export history, or look up contacts and groups.
---

# 📬 Telegram CLI
Expand All @@ -18,6 +18,7 @@ Use this skill when the user:
- Needs to look up group members or admins
- Wants to mute/unmute a noisy chat or group
- Needs to kick/remove a user from a group
- Wants to stream real-time messages (firehose)
- Wants to export or sync chat history to files
- Asks to organize chats into folders
- Wants to check their logged-in account or session status
Expand Down Expand Up @@ -97,6 +98,13 @@ telegram folder-add "Work" "ProjectChat" # Add chat to folder
telegram folder-remove "Work" "ProjectChat" # Remove chat from folder
```

### Streaming (Firehose)
```bash
telegram firehose # Stream all incoming messages as NDJSON
telegram firehose --chat "ChatName" # Stream only one chat
telegram firehose --include-outgoing # Include your own outgoing messages
```

### Sync / Export
```bash
telegram sync # Sync last 7 days to ./telegram-sync
Expand Down Expand Up @@ -185,6 +193,11 @@ Kick a user from a group:
telegram kick "My Group" @spammer
```

Stream real-time messages from a group:
```bash
telegram firehose --chat "Project Chat"
```

## 📝 Notes

- Chat names can be partial matches (e.g., "MetaDAO" matches "MetaDAO Community")
Expand Down
6 changes: 3 additions & 3 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,9 @@ export async function getAdminGroups(client: TelegramClient): Promise<ChatInfo[]
return adminGroups;
}

type ResolvedEntity = Api.User | Api.Chat | Api.Channel;
export type ResolvedEntity = Api.User | Api.Chat | Api.Channel;

async function resolveChat(client: TelegramClient, identifier: string): Promise<ResolvedEntity> {
export async function resolveChat(client: TelegramClient, identifier: string): Promise<ResolvedEntity> {
// Check if it's a username (starts with @)
if (identifier.startsWith('@')) {
const entity = await client.getEntity(identifier);
Expand Down Expand Up @@ -517,7 +517,7 @@ async function resolveChat(client: TelegramClient, identifier: string): Promise<
}
}

function getChatTitle(entity: ResolvedEntity): string {
export function getChatTitle(entity: ResolvedEntity): string {
if (entity instanceof Api.User) {
return entity.firstName || entity.username || 'Unknown';
}
Expand Down
89 changes: 89 additions & 0 deletions src/commands/firehose.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { Command } from 'commander';
import { Api } from 'telegram';
import { NewMessage, type NewMessageEvent } from 'telegram/events/NewMessage.js';
import { getClient, resolveChat, getChatTitle, disconnectClient } from '../client.js';

export const firehoseCommand = new Command('firehose')
.description('Stream incoming messages as NDJSON until interrupted')
.option('--chat <identifier>', 'Filter to a specific chat (name, @username, or ID)')
.option('--include-outgoing', 'Include your own outgoing messages')
.action(async (options) => {
const client = await getClient();

let chatId: number | undefined;
let chatLabel: string | undefined;

if (options.chat) {
const entity = await resolveChat(client, options.chat);
chatId = entity.id.toJSNumber();
chatLabel = getChatTitle(entity);
process.stderr.write(`Streaming messages from "${chatLabel}"…\n`);
} else {
process.stderr.write('Streaming all incoming messages…\n');
}

const handler = async (event: NewMessageEvent) => {
const msg = event.message;
if (!msg || !(msg instanceof Api.Message)) return;
if (!options.includeOutgoing && msg.out) return;

let sender = 'Unknown';
let senderId: string | undefined;
try {
if (msg.fromId) {
const e = await client.getEntity(msg.fromId);
if (e instanceof Api.User) {
sender = e.firstName || e.username || 'Unknown';
senderId = e.id.toString();
} else if (e instanceof Api.Channel || e instanceof Api.Chat) {
sender = (e as Api.Channel | Api.Chat).title || 'Unknown';
senderId = e.id.toString();
}
}
} catch { /* ignore */ }

let peerChatId: string | undefined;
let peerChatTitle: string | undefined;
try {
if (msg.peerId) {
const peer = await client.getEntity(msg.peerId);
peerChatId = peer.id.toString();
if (peer instanceof Api.User) {
peerChatTitle = peer.firstName || peer.username || undefined;
} else if (peer instanceof Api.Chat || peer instanceof Api.Channel) {
peerChatTitle = (peer as Api.Chat | Api.Channel).title || undefined;
}
}
} catch { /* ignore */ }

const line = JSON.stringify({
id: msg.id,
date: new Date(msg.date * 1000).toISOString(),
chatId: peerChatId,
chatTitle: peerChatTitle,
sender,
senderId,
text: msg.message || '',
replyToMsgId: msg.replyTo?.replyToMsgId ?? null,
isOutgoing: msg.out ?? false,
});

process.stdout.write(line + '\n');
};

const eventParams: ConstructorParameters<typeof NewMessage>[0] = {};
if (chatId) eventParams.chats = [chatId];
if (!options.includeOutgoing) eventParams.outgoing = false;

client.addEventHandler(handler, new NewMessage(eventParams));

const shutdown = async () => {
process.stderr.write('\nStopping firehose…\n');
client.removeEventHandler(handler, new NewMessage(eventParams));
await disconnectClient();
process.exit(0);
};

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
});
1 change: 1 addition & 0 deletions src/commands/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ export { folderCommand } from './folder.js';
export { folderAddCommand } from './folder-add.js';
export { folderRemoveCommand } from './folder-remove.js';
export { writeAccessCommand } from './write-access.js';
export { firehoseCommand } from './firehose.js';
131 changes: 126 additions & 5 deletions src/commands/read.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Command } from 'commander';
import { Api } from 'telegram';
import { getClient, getMessages, disconnectClient } from '../client.js';
import { formatJson } from '../formatters/json.js';
import { formatMessages } from '../formatters/plain.js';
Expand Down Expand Up @@ -29,15 +30,135 @@ function parseTimeOffset(offset: string): Date {
}
}

const sleep = (ms: number) => new Promise(r => setTimeout(r, ms));

async function readAllChats(options: { limit: number; minDate?: Date; maxDate?: Date; format: string }) {
const client = await getClient();
const dialogs = await client.getDialogs({ limit: 500 });
const allResults: { chatTitle: string; messages: any[] }[] = [];
let fetched = 0;

for (const dialog of dialogs) {
const entity = dialog.entity;
if (!entity) continue;

if (options.minDate && dialog.message?.date) {
const lastDate = new Date(dialog.message.date * 1000);
if (lastDate < options.minDate) continue;
}

const title = dialog.title || 'Unknown';
const chatId = entity.id.toString();

try {
const msgs = await client.getMessages(entity, { limit: options.limit });
fetched++;
const filtered = [];
for (const msg of msgs) {
if (!(msg instanceof Api.Message)) continue;
const msgDate = new Date(msg.date * 1000);
if (options.minDate && msgDate < options.minDate) break;
if (options.maxDate && msgDate > options.maxDate) continue;

let sender = 'Unknown';
let senderId: string | undefined;
try {
if (msg.fromId) {
const e = await client.getEntity(msg.fromId);
if (e instanceof Api.User) {
sender = e.firstName || e.username || 'Unknown';
senderId = e.id.toString();
} else if (e instanceof Api.Channel || e instanceof Api.Chat) {
sender = (e as Api.Channel | Api.Chat).title || 'Unknown';
senderId = e.id.toString();
}
}
} catch { /* ignore */ }

filtered.push({
id: msg.id,
date: msgDate,
chatId,
chatTitle: title,
sender,
senderId,
text: msg.message || '',
replyToMsgId: msg.replyTo?.replyToMsgId ?? null,
isOutgoing: msg.out ?? false,
});
}

if (filtered.length > 0) {
allResults.push({ chatTitle: title, messages: filtered });
}

if (fetched % 5 === 0) await sleep(1000);
} catch (err: any) {
if (err?.seconds) {
process.stderr.write(`Rate limited, waiting ${err.seconds}s…\n`);
await sleep(err.seconds * 1000);
}
}
}

return allResults;
}

export const readCommand = new Command('read')
.description('Read messages from a chat')
.argument('<chat>', 'Chat name, username (@user), or ID')
.description('Read messages from a chat (or all chats with --since)')
.argument('[chat]', 'Chat name, username (@user), or ID')
.option('-n, --limit <number>', 'Number of messages to fetch', '50')
.option('--since <time>', 'Get messages since (e.g., "1h", "30m", "7d")')
.option('--until <time>', 'Get messages until (e.g., "1h", "30m", "7d")')
.option('--json', 'Output as JSON')
.option('--markdown', 'Output as Markdown')
.action(async (chat, options) => {
if (!chat && !options.since) {
console.error('Provide a chat name, or use --since to read across all chats.');
process.exit(1);
}

const format = getOutputFormat(options);

if (!chat) {
const spinner = ora('Fetching messages from all active chats...').start();
try {
const minDate = options.since ? parseTimeOffset(options.since) : undefined;
const maxDate = options.until ? parseTimeOffset(options.until) : undefined;
const results = await readAllChats({
limit: parseInt(options.limit),
minDate,
maxDate,
format,
});

spinner.stop();

if (format === 'json') {
for (const { messages } of results) {
for (const msg of messages) {
process.stdout.write(JSON.stringify(msg) + '\n');
}
}
} else {
for (const { chatTitle, messages } of results) {
if (format === 'markdown') {
console.log(formatMessagesMarkdown(messages, chatTitle));
} else {
console.log(formatMessages(messages, chatTitle));
}
}
}

await disconnectClient();
} catch (error) {
spinner.fail('Failed to fetch messages');
console.error(error instanceof Error ? error.message : error);
process.exit(1);
}
return;
}

const spinner = ora(`Fetching messages from "${chat}"...`).start();

try {
Expand All @@ -59,11 +180,11 @@ export const readCommand = new Command('read')

spinner.stop();

const format = getOutputFormat(options);

switch (format) {
case 'json':
console.log(formatJson({ chatTitle, messages }));
for (const msg of messages) {
process.stdout.write(JSON.stringify({ ...msg, chatTitle }) + '\n');
}
break;
case 'markdown':
console.log(formatMessagesMarkdown(messages, chatTitle));
Expand Down
4 changes: 4 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
folderAddCommand,
folderRemoveCommand,
writeAccessCommand,
firehoseCommand,
} from './commands/index.js';

const require = createRequire(import.meta.url);
Expand Down Expand Up @@ -72,6 +73,9 @@ program.addCommand(replyCommand);
// Configuration
program.addCommand(writeAccessCommand);

// Streaming
program.addCommand(firehoseCommand);

// Utilities
program.addCommand(syncCommand);

Expand Down