Add dynamic logs and pods

This commit is contained in:
andreas.dinauer 2025-11-06 14:36:58 +01:00
parent 4d3fe1953b
commit a42768ca49
18 changed files with 415 additions and 43 deletions

View File

@ -69,6 +69,10 @@
<groupId>io.quarkus</groupId> <groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler</artifactId> <artifactId>quarkus-scheduler</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-websockets</artifactId>
</dependency>
<!-- Hibernate ORM specific dependencies --> <!-- Hibernate ORM specific dependencies -->
<dependency> <dependency>

View File

@ -51,7 +51,7 @@ public class LogResource
public List<KubernetesLog> getLogs(Pod pod, LocalDateTime from) public List<KubernetesLog> getLogs(Pod pod, LocalDateTime from)
{ {
String command = String.format("kubectl --kubeconfig=%s logs %s -n %s --timestamps", clientProvider.pathToKubeconfig(), pod.getMetadata().getName(), pod.getMetadata().getNamespace()); String command = String.format("kubectl --kubeconfig=%s logs %s -n %s --timestamps --tail=1000", clientProvider.pathToKubeconfig(), pod.getMetadata().getName(), pod.getMetadata().getNamespace());
List<KubernetesLog> result = new ArrayList<>(); List<KubernetesLog> result = new ArrayList<>();
List<String> logs = processRunner.runToLines(command); List<String> logs = processRunner.runToLines(command);
for (String log : logs) for (String log : logs)

View File

@ -0,0 +1,121 @@
package dev.dinauer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import dev.dinauer.utils.ClientProvider;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.websocket.OnClose;
import jakarta.websocket.OnOpen;
import jakarta.websocket.Session;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.jboss.logging.Logger;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
@ServerEndpoint("/logs/{namespace}/{name}")
@ApplicationScoped
public class LogWebsocket
{
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new JavaTimeModule()).disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
private final Map<Session, LogWatch> sessions = new HashMap<>();
private final Map<Session, BufferedReader> readers = new HashMap<>();
@Inject
Logger LOG;
@Inject
ClientProvider clientProvider;
@Inject
ManagedExecutor executor;
@OnOpen
public void onOpen(Session session, @PathParam("namespace") String namespace, @PathParam("name") String name)
{
executor.runAsync(() -> {
List<String> existingLogs = Arrays.stream(clientProvider.getClient().pods().inNamespace(namespace).withName(name).usingTimestamps().tailingLines(200).getLog().split("\n")).toList();
send(session, toLog(existingLogs));
LogWatch watch = clientProvider.getClient().pods().inNamespace(namespace).withName(name).usingTimestamps().tailingLines(0).watchLog();
sessions.put(session, watch);
BufferedReader reader = new BufferedReader(new InputStreamReader(watch.getOutput()));
readers.put(session, reader);
try
{
String line;
while ((line = reader.readLine()) != null)
{
send(session, toLog(List.of(line)));
}
}
catch (Exception e)
{
LOG.errorf("Error reading output of log watch: %s", e.getMessage());
}
});
}
@OnClose
public void onClose(Session session) throws IOException
{
LogWatch watch = sessions.remove(session);
if (watch != null)
{
watch.close();
}
BufferedReader reader = readers.remove(session);
if (reader != null)
{
reader.close();
}
session.close();
}
private void send(Session session, List<KubernetesLog> logs)
{
try
{
session.getAsyncRemote().sendText(OBJECT_MAPPER.writeValueAsString(logs));
}
catch (Exception e)
{
LOG.errorf("Error sending logs to frontend via websocket: %s", e.getMessage());
}
}
private List<KubernetesLog> toLog(List<String> logs)
{
List<KubernetesLog> result = new ArrayList<>();
for (String log : logs)
{
int indexFirstSpace = log.indexOf(" ");
if (indexFirstSpace != -1)
{
String timestampRaw = log.substring(0, indexFirstSpace);
String message = log.substring(indexFirstSpace).trim();
try
{
result.add(new KubernetesLog(LocalDateTime.parse(timestampRaw, DateTimeFormatter.ISO_DATE_TIME), message));
}
catch (Exception e)
{
LOG.errorf("Error parsing log: %s", e.getMessage());
}
}
}
return result;
}
}

View File

@ -55,4 +55,11 @@ public class PodResource
} }
return result; return result;
} }
@DELETE
@Path("/{namespace}/{name}")
public void delete(@PathParam("namespace") String namespace, @PathParam("name") String name)
{
podService.delete(name, namespace);
}
} }

View File

@ -0,0 +1,9 @@
package dev.dinauer;
import dev.dinauer.inspect.websocket.EventType;
import java.util.List;
public record ResourceEvent(EventType type, List<?> resources)
{
}

View File

@ -1,6 +1,7 @@
package dev.dinauer; package dev.dinauer;
import dev.dinauer.service.*; import dev.dinauer.service.*;
import io.fabric8.kubernetes.api.model.ConfigMap;
import jakarta.annotation.security.RolesAllowed; import jakarta.annotation.security.RolesAllowed;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.ws.rs.*; import jakarta.ws.rs.*;
@ -40,6 +41,9 @@ public class ResourceResource
@Inject @Inject
SecretService secretService; SecretService secretService;
@Inject
ConfigMapService configMapService;
@GET @GET
public List<?> get(@PathParam("resource") String resourceType) public List<?> get(@PathParam("resource") String resourceType)
{ {
@ -128,6 +132,10 @@ public class ResourceResource
{ {
return secretService; return secretService;
} }
case ResourceType.CONFIG_MAP ->
{
return configMapService;
}
default -> default ->
{ {
LOG.errorf("Invalid resource type %s.", resourceType); LOG.errorf("Invalid resource type %s.", resourceType);

View File

@ -10,4 +10,5 @@ public class ResourceType
public static final String CUSTOM_RESOURCE_DEFINITION = "custom-resource-definitions"; public static final String CUSTOM_RESOURCE_DEFINITION = "custom-resource-definitions";
public static final String NODE = "nodes"; public static final String NODE = "nodes";
public static final String SECRET = "secrets"; public static final String SECRET = "secrets";
public static final String CONFIG_MAP = "config-maps";
} }

View File

@ -0,0 +1,6 @@
package dev.dinauer.inspect.websocket;
public enum EventType
{
INIT, ADDED, MODIFIED, DELETED, ERROR, BOOKMARK
}

View File

@ -0,0 +1,122 @@
package dev.dinauer.inspect.websocket;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import dev.dinauer.ResourceEvent;
import dev.dinauer.ResourceType;
import dev.dinauer.utils.ClientProvider;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.websocket.OnClose;
import jakarta.websocket.OnOpen;
import jakarta.websocket.Session;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import org.eclipse.microprofile.context.ManagedExecutor;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ServerEndpoint("/watch/{resource-type}/{namespace}")
@ApplicationScoped
public class ResourceWebsocket
{
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Inject
ClientProvider clientProvider;
@Inject
ManagedExecutor executor;
private final Map<Session, Watch> sessions = new HashMap<>();
@OnOpen
public void onOpen(Session session, @PathParam("resource-type") String resourceType, @PathParam("namespace") String namespace)
{
executor.runAsync(() ->
{
if (ResourceType.POD.equals(resourceType))
{
String version = clientProvider.getClient().pods().inAnyNamespace().list().getMetadata().getResourceVersion();
if (isGlobal(namespace))
{
send(session, EventType.INIT, clientProvider.getClient().pods().inAnyNamespace().list().getItems());
sessions.put(session, clientProvider.getClient().pods().inAnyNamespace().withResourceVersion(version).watch(getWatcher(session)));
}
else
{
sessions.put(session, clientProvider.getClient().pods().inNamespace(namespace).watch(getWatcher(session)));
}
}
if (ResourceType.CONFIG_MAP.equals(resourceType))
{
if (isGlobal(namespace))
{
sessions.put(session, clientProvider.getClient().configMaps().inAnyNamespace().watch(getWatcher(session)));
}
else
{
sessions.put(session, clientProvider.getClient().configMaps().inNamespace(namespace).watch(getWatcher(session)));
}
}
});
}
@OnClose
public void onClose(Session session)
{
Watch watch = sessions.remove(session);
if (watch != null)
{
watch.close();
}
}
private <T> Watcher<T> getWatcher(Session session)
{
return new Watcher<T>()
{
@Override
public void eventReceived(Action action, T t)
{
send(session, EventType.valueOf(action.name()), List.of(t));
}
@Override
public void onClose(WatcherException e)
{
try
{
session.close();
}
catch (IOException ex)
{
throw new RuntimeException("Cannot close session", ex);
}
}
};
}
private boolean isGlobal(String namespace)
{
return namespace == null || namespace.isBlank() || namespace.equals("_all");
}
private void send(Session session, EventType type, List<?> objects)
{
try
{
session.getAsyncRemote().sendText(OBJECT_MAPPER.writeValueAsString(new ResourceEvent(type, objects)));
}
catch (JsonProcessingException e)
{
throw new RuntimeException(e);
}
}
}

View File

@ -1,7 +1,7 @@
package dev.dinauer.monitoring; package dev.dinauer.monitoring;
import dev.dinauer.ProcessRunner; import dev.dinauer.ProcessRunner;
import dev.dinauer.monitoring.nodes.NodeStats; import dev.dinauer.monitoring.nodes.MonitoredNode;
import dev.dinauer.service.PodService; import dev.dinauer.service.PodService;
import dev.dinauer.utils.ClientProvider; import dev.dinauer.utils.ClientProvider;
import io.fabric8.kubernetes.api.model.Node; import io.fabric8.kubernetes.api.model.Node;
@ -9,8 +9,7 @@ import io.fabric8.kubernetes.api.model.Pod;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import java.util.ArrayList; import java.util.*;
import java.util.List;
@ApplicationScoped @ApplicationScoped
public class TopNodesService public class TopNodesService
@ -23,13 +22,12 @@ public class TopNodesService
@Inject @Inject
PodService podService; PodService podService;
public List<NodeStats> findAll() public List<MonitoredNode> findAll()
{ {
List<NodeStats> result = new ArrayList<>(); List<MonitoredNode> result = new ArrayList<>();
List<String> stats = runTopNodesCommand(); List<String> stats = runTopNodesCommand();
Map<String, Integer> podsOnNodes = countPods();
for(String nodeName : stats) for(String nodeName : stats)
{ {
String[] parts = nodeName.split("\\s+"); String[] parts = nodeName.split("\\s+");
@ -41,24 +39,22 @@ public class TopNodesService
Integer relativeCpu = extractInteger(parts[2]); Integer relativeCpu = extractInteger(parts[2]);
Integer absoluteMemory = extractMemory(parts[3]); Integer absoluteMemory = extractMemory(parts[3]);
Integer relativeMemory = extractInteger(parts[4]); Integer relativeMemory = extractInteger(parts[4]);
result.add(new NodeStats(node, absoluteCpu, relativeCpu, Integer.parseInt(node.getStatus().getAllocatable().get("cpu").getAmount()) * 1000, absoluteMemory, relativeMemory, extractMemory(node.getStatus().getAllocatable().get("memory").getAmount()), countPods(node.getMetadata().getName()))); result.add(new MonitoredNode(node, absoluteCpu, relativeCpu, Integer.parseInt(node.getStatus().getAllocatable().get("cpu").getAmount()) * 1000, absoluteMemory, relativeMemory, extractMemory(node.getStatus().getAllocatable().get("memory").getAmount()), podsOnNodes.get(node.getMetadata().getName())));
} }
} }
return result; return result;
} }
private int countPods(String nodeName) private Map<String, Integer> countPods()
{ {
List<Pod> pods = podService.findAll(); List<Pod> pods = podService.findAll();
int count = 0; Map<String, Integer> result = new HashMap<>();
for (Pod pod : pods) for (Pod pod : pods)
{ {
if (pod.getSpec().getNodeName().equals(nodeName)) String nodeName = pod.getSpec().getNodeName();
{ result.put(nodeName, Optional.ofNullable(result.get(nodeName)).orElse(0) + 1);
count++;
} }
} return result;
return count;
} }
private List<String> runTopNodesCommand() private List<String> runTopNodesCommand()

View File

@ -27,6 +27,7 @@ public class IndexCollection
@Enumerated(EnumType.STRING) @Enumerated(EnumType.STRING)
private TimeUnit unit; private TimeUnit unit;
@Column(columnDefinition = "text")
private String metrics; private String metrics;
public IndexCollection() public IndexCollection()

View File

@ -1,5 +1,6 @@
package dev.dinauer.monitoring.indexing; package dev.dinauer.monitoring.indexing;
import io.quarkus.logging.Log;
import io.quarkus.runtime.Startup; import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;

View File

@ -0,0 +1,61 @@
package dev.dinauer.monitoring.nodes;
import io.fabric8.kubernetes.api.model.Node;
public class MonitoredNode extends Node
{
public MonitoredNode(Node node, Integer absoluteCpuUsage, Integer relativeCpuUsage, Integer totalCpu, Integer absoluteMemory, Integer relativeMemory, Integer totalMemory, Integer runningPods)
{
super(node.getApiVersion(), node.getKind(), node.getMetadata(), node.getSpec(), node.getStatus());
this.absoluteCpuUsage = absoluteCpuUsage;
this.relativeCpuUsage = relativeCpuUsage;
this.totalCpu = totalCpu;
this.absoluteMemory = absoluteMemory;
this.relativeMemory = relativeMemory;
this.totalMemory = totalMemory;
this.runningPods = runningPods;
}
private final Integer absoluteCpuUsage;
private final Integer relativeCpuUsage;
private final Integer totalCpu;
private final Integer absoluteMemory;
private final Integer relativeMemory;
private final Integer totalMemory;
private final Integer runningPods;
public Integer getAbsoluteCpuUsage()
{
return absoluteCpuUsage;
}
public Integer getRelativeCpuUsage()
{
return relativeCpuUsage;
}
public Integer getTotalCpu()
{
return totalCpu;
}
public Integer getAbsoluteMemory()
{
return absoluteMemory;
}
public Integer getRelativeMemory()
{
return relativeMemory;
}
public Integer getTotalMemory()
{
return totalMemory;
}
public Integer getRunningPods()
{
return runningPods;
}
}

View File

@ -1,15 +1,10 @@
package dev.dinauer.monitoring.nodes; package dev.dinauer.monitoring.nodes;
import dev.dinauer.ProcessRunner;
import dev.dinauer.monitoring.TopNodesService; import dev.dinauer.monitoring.TopNodesService;
import dev.dinauer.monitoring.indexing.IndexingService; import dev.dinauer.monitoring.indexing.IndexingService;
import dev.dinauer.service.NodeService;
import dev.dinauer.utils.ClientProvider;
import io.smallrye.common.annotation.Blocking;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -24,15 +19,15 @@ public class NodeMonitoringService
public void run() throws IOException, InterruptedException public void run() throws IOException, InterruptedException
{ {
List<NodeStats> nodes = topNodesService.findAll(); List<MonitoredNode> nodes = topNodesService.findAll();
for (NodeStats node : nodes) for (MonitoredNode node : nodes)
{ {
Map<String, Long> metrics = Map.ofEntries( Map<String, Long> metrics = Map.ofEntries(
Map.entry("RELATIVE_CPU", (long) node.relativeCpuUsage()), Map.entry("RELATIVE_CPU", (long) node.getRelativeCpuUsage()),
Map.entry("RELATIVE_MEMORY", (long) node.relativeMemory()), Map.entry("RELATIVE_MEMORY", (long) node.getRelativeMemory()),
Map.entry("ABSOLUTE_MEMORY", (long) node.absoluteMemory()), Map.entry("ABSOLUTE_MEMORY", (long) node.getAbsoluteMemory()),
Map.entry("ABSOLUTE_CPU", (long) node.absoluteCpuUsage())); Map.entry("ABSOLUTE_CPU", (long) node.getAbsoluteCpuUsage()));
indexingService.index(String.format("NODE-%s", node), "NODE_METRICS", metrics); indexingService.index(String.format("NODE-%s", node.getMetadata().getUid()), "NODE_METRICS", metrics);
} }
} }
} }

View File

@ -1,7 +0,0 @@
package dev.dinauer.monitoring.nodes;
import io.fabric8.kubernetes.api.model.Node;
public record NodeStats(Node node, Integer absoluteCpuUsage, Integer relativeCpuUsage, Integer totalCpu, Integer absoluteMemory, Integer relativeMemory, Integer totalMemory, Integer runningPods)
{
}

View File

@ -0,0 +1,47 @@
package dev.dinauer.service;
import dev.dinauer.utils.ClientProvider;
import io.fabric8.kubernetes.api.model.ConfigMap;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.jboss.resteasy.reactive.common.NotImplementedYet;
import java.util.List;
import java.util.Optional;
@ApplicationScoped
public class ConfigMapService implements ResourceService<ConfigMap>
{
@Inject
ClientProvider clientProvider;
@Override
public void delete(String name, String namespace)
{
throw new NotImplementedYet();
}
@Override
public ConfigMap findByNameAndNamespace(String name, String namespace)
{
return clientProvider.getClient().configMaps().inNamespace(namespace).withName(name).get();
}
@Override
public List<ConfigMap> findByNamespace(String namespace)
{
return clientProvider.getClient().configMaps().inNamespace(namespace).list().getItems();
}
@Override
public List<ConfigMap> findAll()
{
return clientProvider.getClient().configMaps().inAnyNamespace().list().getItems();
}
@Override
public Optional<ConfigMap> findOptionalByNameAndNamespace(String name, String namespace)
{
throw new NotImplementedYet();
}
}

View File

@ -1,7 +1,7 @@
package dev.dinauer.service; package dev.dinauer.service;
import dev.dinauer.monitoring.TopNodesService; import dev.dinauer.monitoring.TopNodesService;
import dev.dinauer.monitoring.nodes.NodeStats; import dev.dinauer.monitoring.nodes.MonitoredNode;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import org.jboss.resteasy.reactive.common.NotImplementedYet; import org.jboss.resteasy.reactive.common.NotImplementedYet;
@ -9,12 +9,12 @@ import java.util.List;
import java.util.Optional; import java.util.Optional;
@ApplicationScoped @ApplicationScoped
public class NodeService implements ResourceService<NodeStats> public class NodeService implements ResourceService<MonitoredNode>
{ {
@Inject @Inject
TopNodesService topNodesService; TopNodesService topNodesService;
public List<NodeStats> findAll() public List<MonitoredNode> findAll()
{ {
return topNodesService.findAll(); return topNodesService.findAll();
} }
@ -26,19 +26,19 @@ public class NodeService implements ResourceService<NodeStats>
} }
@Override @Override
public NodeStats findByNameAndNamespace(String name, String namespace) public MonitoredNode findByNameAndNamespace(String name, String namespace)
{ {
throw new NotImplementedYet(); throw new NotImplementedYet();
} }
@Override @Override
public List<NodeStats> findByNamespace(String namespace) public List<MonitoredNode> findByNamespace(String namespace)
{ {
return findAll(); return findAll();
} }
@Override @Override
public Optional<NodeStats> findOptionalByNameAndNamespace(String name, String namespace) public Optional<MonitoredNode> findOptionalByNameAndNamespace(String name, String namespace)
{ {
throw new NotImplementedYet(); throw new NotImplementedYet();
} }

View File

@ -45,7 +45,7 @@ public class StartupService
UserEntity initialUser = UserEntity.init(); UserEntity initialUser = UserEntity.init();
initialUser.setUsername(INITIAL_USERNAME); initialUser.setUsername(INITIAL_USERNAME);
initialUser.setPassword(BcryptUtil.bcryptHash(INITIAL_PASSWORD)); initialUser.setPassword(BcryptUtil.bcryptHash(INITIAL_PASSWORD));
initialUser.setRoles(Set.of("admin")); initialUser.setRoles(Set.of("ADMIN"));
return initialUser; return initialUser;
} }
} }