package dev.dinauer.inspect.log; import java.io.IOException; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import jakarta.websocket.OnClose; import jakarta.websocket.OnOpen; import jakarta.websocket.Session; import jakarta.websocket.server.PathParam; import jakarta.websocket.server.ServerEndpoint; import org.jboss.logging.Logger; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.dsl.PodResource; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.infrastructure.Infrastructure; import dev.dinauer.inspect.log.model.KubernetesLog; import dev.dinauer.service.PodService; import dev.dinauer.utils.ClientProvider; @ServerEndpoint("/logs/deployments/{namespace}/{name}") @ApplicationScoped public class DeploymentLogWebsocket { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new JavaTimeModule()).disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); private final Map> sessions = new HashMap<>(); @Inject Logger LOG; @Inject ClientProvider clientProvider; @Inject PodService podService; @OnOpen public void onOpen(Session session, @PathParam("namespace") String namespace, @PathParam("name") String name) { Uni.createFrom().voidItem().invoke(() -> { List pods = podService.findByDeployment(namespace, name); List existingLogs = new ArrayList<>(); for (Pod pod : pods) { PodResource resource = clientProvider.getClient().pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()); existingLogs.addAll(podService.getLogs(resource)); } send(session, existingLogs.stream().sorted(KubernetesLog::orderByTimestamp).toList()); sessions.put(session, new LinkedList<>()); for (Pod pod : pods) { PodResource resource = clientProvider.getClient().pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()); sessions.get(session).add(podService.watchLogs(resource, (log) -> { send(session, log); })); } }).runSubscriptionOn(Infrastructure.getDefaultWorkerPool()).subscribe().with(result -> {}, error -> {}); } @OnClose public void onClose(Session session) throws IOException { List watches = sessions.remove(session); if (watches != null) { for (LogWatch watch : watches) { watch.close(); } } session.close(); } private void send(Session session, List logs) { try { session.getAsyncRemote().sendText(OBJECT_MAPPER.writeValueAsString(logs)); } catch (Exception e) { LOG.errorf("Error sending logs to frontend via websocket: %s", e.getMessage()); } } private List toLog(List logs, String podName) { List result = new ArrayList<>(); for (String log : logs) { int indexFirstSpace = log.indexOf(" "); if (indexFirstSpace != -1) { String timestampRaw = log.substring(0, indexFirstSpace); try { String message = log.substring(indexFirstSpace + 1); result.add(new KubernetesLog(LocalDateTime.parse(timestampRaw, DateTimeFormatter.ISO_DATE_TIME), message, podName)); } catch (Exception e) { LOG.errorf("Error parsing log: %s", e.getMessage()); } } } return result; } }