Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,13 @@

import com.google.auto.service.AutoService;

import java.io.IOException;

@AutoService(Telegram.class)
public class LocalTelegram extends Telegram implements ContainerDeployable<TelegramClientContainer> {
private final TelegramClientContainer container = new TelegramClientContainer(image(), getEnv());

@Override
public String execInContainer(String... commands) {
try {
return container.execInContainer(commands).getStdout();
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
protected String getHttpEndpoint() {
return container.getHttpEndpoint();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
package software.tnb.telegram.resource.local;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;

import java.util.Map;

public class TelegramClientContainer extends GenericContainer<TelegramClientContainer> {

private static final int HTTP_PORT = 8080;

public TelegramClientContainer(String image, Map<String, String> env) {
super(image);
withEnv(env);
withExposedPorts(HTTP_PORT);
waitingFor(Wait.forHttp("/health").forPort(HTTP_PORT));
}

public String getHttpEndpoint() {
return "http://" + getHost() + ":" + getMappedPort(HTTP_PORT);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package software.tnb.telegram.resource.openshift;

import software.tnb.common.config.OpenshiftConfiguration;
import software.tnb.common.deployment.OpenshiftDeployable;
import software.tnb.common.deployment.WithName;
import software.tnb.common.openshift.OpenshiftClient;
Expand All @@ -11,20 +12,39 @@

import com.google.auto.service.AutoService;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import io.fabric8.kubernetes.api.model.ContainerPort;
import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
import io.fabric8.kubernetes.api.model.HTTPGetActionBuilder;
import io.fabric8.kubernetes.api.model.IntOrString;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.Probe;
import io.fabric8.kubernetes.api.model.ProbeBuilder;
import io.fabric8.kubernetes.api.model.ServiceBuilder;
import io.fabric8.kubernetes.api.model.ServicePortBuilder;
import io.fabric8.openshift.api.model.RouteBuilder;
import io.fabric8.openshift.api.model.RoutePortBuilder;
import io.fabric8.openshift.api.model.RouteSpecBuilder;
import io.fabric8.openshift.api.model.RouteTargetReferenceBuilder;

@AutoService(Telegram.class)
public class OpenshiftTelegram extends Telegram implements OpenshiftDeployable, WithName {

private static final Logger LOG = LoggerFactory.getLogger(OpenshiftTelegram.class);
private static final int HTTP_PORT = 8080;

@Override
public void undeploy() {
LOG.info("Undeploying Telegram client");
LOG.debug("Deleting route");
OpenshiftClient.get().routes().withLabel(OpenshiftConfiguration.openshiftDeploymentLabel(), name()).delete();
LOG.debug("Deleting service");
OpenshiftClient.get().services().withLabel(OpenshiftConfiguration.openshiftDeploymentLabel(), name()).delete();
LOG.debug("Deleting deployment");
OpenshiftClient.get().apps().deployments().withName(name()).delete();
WaitUtils.waitFor(() -> servicePod() == null, "Waiting until the pod is removed");
}
Expand All @@ -40,11 +60,64 @@ public void closeResources() {
@Override
public void create() {
LOG.info("Deploying Telegram client");

Map<String, Integer> ports = Map.of(name(), HTTP_PORT);
List<ContainerPort> containerPorts = ports.entrySet().stream()
.map(e -> new ContainerPortBuilder().withName(e.getKey()).withContainerPort(e.getValue()).withProtocol("TCP").build())
.collect(Collectors.toList());

final Probe probe = new ProbeBuilder()
.withHttpGet(new HTTPGetActionBuilder().withPath("/health").withPort(new IntOrString(HTTP_PORT)).build())
.withTimeoutSeconds(10)
.build();

OpenshiftClient.get().createDeployment(Map.of(
"name", name(),
"image", image(),
"env", getEnv()
"env", getEnv(),
"ports", containerPorts,
"readinessProbe", probe,
"replicas", 1
));

LOG.debug("Creating service {}", name());
OpenshiftClient.get().services().resource(
new ServiceBuilder()
.editOrNewMetadata()
.withName(name())
.addToLabels(OpenshiftConfiguration.openshiftDeploymentLabel(), name())
.endMetadata()
.editOrNewSpec()
.addToSelector(OpenshiftConfiguration.openshiftDeploymentLabel(), name())
.addToPorts(new ServicePortBuilder()
.withName(name())
.withPort(HTTP_PORT)
.withTargetPort(new IntOrString(HTTP_PORT))
.build()
)
.endSpec()
.build()
).serverSideApply();

LOG.debug("Creating route {}", name());
OpenshiftClient.get().routes().resource(
new RouteBuilder()
.editOrNewMetadata()
.withName(name())
.addToLabels(OpenshiftConfiguration.openshiftDeploymentLabel(), name())
.endMetadata()
.editOrNewSpecLike(new RouteSpecBuilder()
.withTo(new RouteTargetReferenceBuilder()
.withName(name())
.withKind("Service")
.build())
.withPort(new RoutePortBuilder()
.withTargetPort(new IntOrString(HTTP_PORT))
.build())
.build())
.endSpec()
.build()
).serverSideApply();
}

@Override
Expand All @@ -58,12 +131,8 @@ public Predicate<Pod> podSelector() {
}

@Override
public String execInContainer(String... commands) {
try {
return new String(servicePod().redirectingOutput().exec(commands).getOutput().readAllBytes());
} catch (IOException e) {
throw new RuntimeException("Unable to read command output: " + e);
}
protected String getHttpEndpoint() {
return "http://" + OpenshiftClient.get().routes().withName(name()).get().getSpec().getHost();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public void create() {
"ports", containerPorts,
"args", Arrays.stream(startupParams()).toList(),
"livenessProbe", probe,
"readinessProbe", probe
"readinessProbe", probe,
"replicas", 1
));

// @formatter:off
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ public String defaultImage() {
return "quay.io/fuse_qe/telegram-client:latest";
}

public abstract String execInContainer(String... commands);
protected abstract String getHttpEndpoint();

public TelegramValidation validation() {
if (validation == null) {
validation = new TelegramValidation(this);
validation = new TelegramValidation(getHttpEndpoint());
}
return validation;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package software.tnb.telegram.validation;

import software.tnb.common.utils.HTTPUtils;
import software.tnb.common.validation.Validation;
import software.tnb.telegram.service.Telegram;
import software.tnb.telegram.validation.model.Message;

import org.slf4j.Logger;
Expand All @@ -12,31 +12,59 @@

import java.util.List;

import okhttp3.MediaType;
import okhttp3.RequestBody;

public class TelegramValidation implements Validation {
private final Telegram client;
private final String httpEndpoint;
private final ObjectMapper objectMapper = new ObjectMapper();
private final HTTPUtils httpClient;

private static final Logger LOG = LoggerFactory.getLogger(TelegramValidation.class);
private static final MediaType JSON = MediaType.get("application/json; charset=utf-8");

public TelegramValidation(Telegram client) {
this.client = client;
public TelegramValidation(String httpEndpoint) {
this.httpEndpoint = httpEndpoint;
HTTPUtils.OkHttpClientBuilder okHttpClientBuilder = new HTTPUtils.OkHttpClientBuilder();
okHttpClientBuilder.trustAllSslClient();
this.httpClient = HTTPUtils.getInstance(okHttpClientBuilder.build());
}

public String sendMessage(String text) {
LOG.debug("Sending message {} from telegram-client", text);
try {
return client.execInContainer("python3", "/app/send_message.py", text);
String endpoint = httpEndpoint + "/messages";
String requestJson = objectMapper.writeValueAsString(
objectMapper.createObjectNode().put("text", text)
);

HTTPUtils.Response response = httpClient.post(endpoint, RequestBody.create(requestJson, JSON));

if (!response.isSuccessful()) {
throw new RuntimeException("HTTP request failed with status: " + response.getResponseCode()
+ ", body: " + response.getBody());
}

return response.getBody();
} catch (Exception e) {
throw new RuntimeException("Failed to send message", e);
}
}

public List<Message> getLastNMessages(int n) {
LOG.debug("Get last " + n + " messages: ");
LOG.debug("Get last " + n + " messages");
try {
String response = client.execInContainer("python3", "/app/get_messages.py", n + "");
LOG.debug("Received messages: {}", response);
return objectMapper.readValue(response, new TypeReference<>() {
String endpoint = httpEndpoint + "/messages?limit=" + n;

HTTPUtils.Response response = httpClient.get(endpoint);

if (!response.isSuccessful()) {
throw new RuntimeException("HTTP request failed with status: " + response.getResponseCode()
+ ", body: " + response.getBody());
}

LOG.debug("Received messages: {}", response.getBody());
return objectMapper.readValue(response.getBody(), new TypeReference<>() {
});
} catch (Exception e) {
throw new RuntimeException("Failed to get messages", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
FROM python:3.9.16-slim-bullseye

ADD *.py /app/
RUN python3 -m pip install telethon && \
RUN python3 -m pip install telethon flask && \
chgrp -R 0 /app && \
chmod -R g=u /app

USER 1001

ENTRYPOINT ["tail", "-f", "/dev/null"]
EXPOSE 8080

# Run HTTP server instead of tail -f /dev/null with execInContainer calls
# Reason: execInContainer has buffering/broken pipe issues on Podman
# HTTP provides reliable, consistent communication across Docker/Podman/Kubernetes
ENTRYPOINT ["python3", "/app/main.py"]
103 changes: 103 additions & 0 deletions system-x/services/telegram/src/main/resources/docker/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import os
import asyncio
import threading
from concurrent.futures import Future
from flask import Flask, request, jsonify

from telethon import TelegramClient
from telethon.sessions import StringSession

app = Flask(__name__)

# Environment variables
API_ID = int(os.environ["TELEGRAM_API_ID"])
API_HASH = os.environ["TELEGRAM_API_HASH"]
SESSION_STR = os.environ["TELEGRAM_SESSION"]
USERNAME = os.environ["TELEGRAM_USERNAME"]

# Global event loop and thread
loop = None
loop_thread = None


def start_event_loop():
"""Start the event loop in a background thread"""
global loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_forever()


def run_async(coro):
"""Run a coroutine in the background event loop and wait for result"""
future = asyncio.run_coroutine_threadsafe(coro, loop)
return future.result()


async def send_message_async(text):
"""Send a message to the configured Telegram chat"""
client = TelegramClient(StringSession(SESSION_STR), API_ID, API_HASH)
async with client:
msg_object = await client.send_message(USERNAME, text)
return f'Message Sent: "{msg_object.message}"'


def transform_message(message):
"""Transform a Telegram message to JSON-serializable dict"""
return {
'sender_id': message.sender_id,
'text': message.text,
'chat_id': message.chat.id
}


async def get_messages_async(limit):
"""Get last N messages from the configured Telegram chat"""
client = TelegramClient(StringSession(SESSION_STR), API_ID, API_HASH)
async with client:
messages = await client.get_messages(USERNAME, limit)
return list(map(transform_message, messages))


@app.route('/health', methods=['GET'])
def health():
"""Health check endpoint"""
return jsonify({'status': 'healthy'}), 200


@app.route('/messages', methods=['POST'])
def send_message():
"""Send a message to Telegram"""
try:
data = request.get_json()
if not data or 'text' not in data:
return jsonify({'error': 'Missing "text" in request body'}), 400

text = data['text']
result = run_async(send_message_async(text))
return jsonify({'result': result}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500


@app.route('/messages', methods=['GET'])
def get_messages():
"""Get last N messages from Telegram"""
try:
limit = request.args.get('limit', default=1, type=int)
messages = run_async(get_messages_async(limit))
return jsonify(messages), 200
except Exception as e:
return jsonify({'error': str(e)}), 500


if __name__ == '__main__':
# Start the event loop in a background thread
loop_thread = threading.Thread(target=start_event_loop, daemon=True)
loop_thread.start()

# Wait a bit for the loop to start
import time
time.sleep(0.1)

app.run(host='0.0.0.0', port=8080)
Loading