diff --git a/pom.xml b/pom.xml index 4013765..43911ef 100644 --- a/pom.xml +++ b/pom.xml @@ -69,6 +69,10 @@ io.quarkus quarkus-scheduler + + io.quarkus + quarkus-websockets + diff --git a/src/main/java/dev/dinauer/LogResource.java b/src/main/java/dev/dinauer/LogResource.java index ce82bd0..49b492e 100644 --- a/src/main/java/dev/dinauer/LogResource.java +++ b/src/main/java/dev/dinauer/LogResource.java @@ -51,7 +51,7 @@ public class LogResource public List getLogs(Pod pod, LocalDateTime from) { - String command = String.format("kubectl --kubeconfig=%s logs %s -n %s --timestamps", clientProvider.pathToKubeconfig(), pod.getMetadata().getName(), pod.getMetadata().getNamespace()); + String command = String.format("kubectl --kubeconfig=%s logs %s -n %s --timestamps --tail=1000", clientProvider.pathToKubeconfig(), pod.getMetadata().getName(), pod.getMetadata().getNamespace()); List result = new ArrayList<>(); List logs = processRunner.runToLines(command); for (String log : logs) diff --git a/src/main/java/dev/dinauer/LogWebsocket.java b/src/main/java/dev/dinauer/LogWebsocket.java new file mode 100644 index 0000000..d7c9199 --- /dev/null +++ b/src/main/java/dev/dinauer/LogWebsocket.java @@ -0,0 +1,121 @@ +package dev.dinauer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import dev.dinauer.utils.ClientProvider; +import io.fabric8.kubernetes.client.dsl.LogWatch; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.websocket.OnClose; +import jakarta.websocket.OnOpen; +import jakarta.websocket.Session; +import jakarta.websocket.server.PathParam; +import jakarta.websocket.server.ServerEndpoint; +import org.eclipse.microprofile.context.ManagedExecutor; +import org.jboss.logging.Logger; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; + +@ServerEndpoint("/logs/{namespace}/{name}") +@ApplicationScoped +public class LogWebsocket +{ + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new JavaTimeModule()).disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + private final Map sessions = new HashMap<>(); + private final Map readers = new HashMap<>(); + + @Inject + Logger LOG; + + @Inject + ClientProvider clientProvider; + + @Inject + ManagedExecutor executor; + + @OnOpen + public void onOpen(Session session, @PathParam("namespace") String namespace, @PathParam("name") String name) + { + executor.runAsync(() -> { + List existingLogs = Arrays.stream(clientProvider.getClient().pods().inNamespace(namespace).withName(name).usingTimestamps().tailingLines(200).getLog().split("\n")).toList(); + send(session, toLog(existingLogs)); + + LogWatch watch = clientProvider.getClient().pods().inNamespace(namespace).withName(name).usingTimestamps().tailingLines(0).watchLog(); + sessions.put(session, watch); + + BufferedReader reader = new BufferedReader(new InputStreamReader(watch.getOutput())); + readers.put(session, reader); + + try + { + String line; + while ((line = reader.readLine()) != null) + { + send(session, toLog(List.of(line))); + } + } + catch (Exception e) + { + LOG.errorf("Error reading output of log watch: %s", e.getMessage()); + } + }); + } + + @OnClose + public void onClose(Session session) throws IOException + { + LogWatch watch = sessions.remove(session); + if (watch != null) + { + watch.close(); + } + BufferedReader reader = readers.remove(session); + if (reader != null) + { + reader.close(); + } + session.close(); + } + + private void send(Session session, List logs) + { + try + { + session.getAsyncRemote().sendText(OBJECT_MAPPER.writeValueAsString(logs)); + } + catch (Exception e) + { + LOG.errorf("Error sending logs to frontend via websocket: %s", e.getMessage()); + } + } + + private List toLog(List logs) + { + List result = new ArrayList<>(); + for (String log : logs) + { + int indexFirstSpace = log.indexOf(" "); + if (indexFirstSpace != -1) + { + String timestampRaw = log.substring(0, indexFirstSpace); + String message = log.substring(indexFirstSpace).trim(); + try + { + result.add(new KubernetesLog(LocalDateTime.parse(timestampRaw, DateTimeFormatter.ISO_DATE_TIME), message)); + } + catch (Exception e) + { + LOG.errorf("Error parsing log: %s", e.getMessage()); + } + } + } + return result; + } +} diff --git a/src/main/java/dev/dinauer/PodResource.java b/src/main/java/dev/dinauer/PodResource.java index fa733d5..3c192dd 100644 --- a/src/main/java/dev/dinauer/PodResource.java +++ b/src/main/java/dev/dinauer/PodResource.java @@ -55,4 +55,11 @@ public class PodResource } return result; } + + @DELETE + @Path("/{namespace}/{name}") + public void delete(@PathParam("namespace") String namespace, @PathParam("name") String name) + { + podService.delete(name, namespace); + } } diff --git a/src/main/java/dev/dinauer/ResourceEvent.java b/src/main/java/dev/dinauer/ResourceEvent.java new file mode 100644 index 0000000..13c250b --- /dev/null +++ b/src/main/java/dev/dinauer/ResourceEvent.java @@ -0,0 +1,9 @@ +package dev.dinauer; + +import dev.dinauer.inspect.websocket.EventType; + +import java.util.List; + +public record ResourceEvent(EventType type, List resources) +{ +} diff --git a/src/main/java/dev/dinauer/ResourceResource.java b/src/main/java/dev/dinauer/ResourceResource.java index a79e0a9..8e229a0 100644 --- a/src/main/java/dev/dinauer/ResourceResource.java +++ b/src/main/java/dev/dinauer/ResourceResource.java @@ -1,6 +1,7 @@ package dev.dinauer; import dev.dinauer.service.*; +import io.fabric8.kubernetes.api.model.ConfigMap; import jakarta.annotation.security.RolesAllowed; import jakarta.inject.Inject; import jakarta.ws.rs.*; @@ -40,6 +41,9 @@ public class ResourceResource @Inject SecretService secretService; + @Inject + ConfigMapService configMapService; + @GET public List get(@PathParam("resource") String resourceType) { @@ -128,6 +132,10 @@ public class ResourceResource { return secretService; } + case ResourceType.CONFIG_MAP -> + { + return configMapService; + } default -> { LOG.errorf("Invalid resource type %s.", resourceType); diff --git a/src/main/java/dev/dinauer/ResourceType.java b/src/main/java/dev/dinauer/ResourceType.java index a3e3f3d..0896e70 100644 --- a/src/main/java/dev/dinauer/ResourceType.java +++ b/src/main/java/dev/dinauer/ResourceType.java @@ -10,4 +10,5 @@ public class ResourceType public static final String CUSTOM_RESOURCE_DEFINITION = "custom-resource-definitions"; public static final String NODE = "nodes"; public static final String SECRET = "secrets"; -} + public static final String CONFIG_MAP = "config-maps"; +} \ No newline at end of file diff --git a/src/main/java/dev/dinauer/inspect/websocket/EventType.java b/src/main/java/dev/dinauer/inspect/websocket/EventType.java new file mode 100644 index 0000000..17f2b97 --- /dev/null +++ b/src/main/java/dev/dinauer/inspect/websocket/EventType.java @@ -0,0 +1,6 @@ +package dev.dinauer.inspect.websocket; + +public enum EventType +{ + INIT, ADDED, MODIFIED, DELETED, ERROR, BOOKMARK +} diff --git a/src/main/java/dev/dinauer/inspect/websocket/ResourceWebsocket.java b/src/main/java/dev/dinauer/inspect/websocket/ResourceWebsocket.java new file mode 100644 index 0000000..6632914 --- /dev/null +++ b/src/main/java/dev/dinauer/inspect/websocket/ResourceWebsocket.java @@ -0,0 +1,122 @@ +package dev.dinauer.inspect.websocket; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import dev.dinauer.ResourceEvent; +import dev.dinauer.ResourceType; +import dev.dinauer.utils.ClientProvider; +import io.fabric8.kubernetes.client.Watch; +import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.websocket.OnClose; +import jakarta.websocket.OnOpen; +import jakarta.websocket.Session; +import jakarta.websocket.server.PathParam; +import jakarta.websocket.server.ServerEndpoint; +import org.eclipse.microprofile.context.ManagedExecutor; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@ServerEndpoint("/watch/{resource-type}/{namespace}") +@ApplicationScoped +public class ResourceWebsocket +{ + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Inject + ClientProvider clientProvider; + + @Inject + ManagedExecutor executor; + + private final Map sessions = new HashMap<>(); + + @OnOpen + public void onOpen(Session session, @PathParam("resource-type") String resourceType, @PathParam("namespace") String namespace) + { + executor.runAsync(() -> + { + if (ResourceType.POD.equals(resourceType)) + { + String version = clientProvider.getClient().pods().inAnyNamespace().list().getMetadata().getResourceVersion(); + if (isGlobal(namespace)) + { + send(session, EventType.INIT, clientProvider.getClient().pods().inAnyNamespace().list().getItems()); + sessions.put(session, clientProvider.getClient().pods().inAnyNamespace().withResourceVersion(version).watch(getWatcher(session))); + } + else + { + sessions.put(session, clientProvider.getClient().pods().inNamespace(namespace).watch(getWatcher(session))); + } + } + if (ResourceType.CONFIG_MAP.equals(resourceType)) + { + if (isGlobal(namespace)) + { + sessions.put(session, clientProvider.getClient().configMaps().inAnyNamespace().watch(getWatcher(session))); + } + else + { + sessions.put(session, clientProvider.getClient().configMaps().inNamespace(namespace).watch(getWatcher(session))); + } + } + }); + } + + @OnClose + public void onClose(Session session) + { + Watch watch = sessions.remove(session); + if (watch != null) + { + watch.close(); + } + } + + private Watcher getWatcher(Session session) + { + return new Watcher() + { + @Override + public void eventReceived(Action action, T t) + { + send(session, EventType.valueOf(action.name()), List.of(t)); + } + + @Override + public void onClose(WatcherException e) + { + try + { + session.close(); + } + catch (IOException ex) + { + throw new RuntimeException("Cannot close session", ex); + } + } + }; + } + + private boolean isGlobal(String namespace) + { + return namespace == null || namespace.isBlank() || namespace.equals("_all"); + } + + private void send(Session session, EventType type, List objects) + { + try + { + session.getAsyncRemote().sendText(OBJECT_MAPPER.writeValueAsString(new ResourceEvent(type, objects))); + } + catch (JsonProcessingException e) + { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/dev/dinauer/monitoring/TopNodesService.java b/src/main/java/dev/dinauer/monitoring/TopNodesService.java index d59c924..c98c8b4 100644 --- a/src/main/java/dev/dinauer/monitoring/TopNodesService.java +++ b/src/main/java/dev/dinauer/monitoring/TopNodesService.java @@ -1,7 +1,7 @@ package dev.dinauer.monitoring; import dev.dinauer.ProcessRunner; -import dev.dinauer.monitoring.nodes.NodeStats; +import dev.dinauer.monitoring.nodes.MonitoredNode; import dev.dinauer.service.PodService; import dev.dinauer.utils.ClientProvider; import io.fabric8.kubernetes.api.model.Node; @@ -9,8 +9,7 @@ import io.fabric8.kubernetes.api.model.Pod; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; -import java.util.ArrayList; -import java.util.List; +import java.util.*; @ApplicationScoped public class TopNodesService @@ -23,13 +22,12 @@ public class TopNodesService @Inject PodService podService; - public List findAll() + public List findAll() { - List result = new ArrayList<>(); + List result = new ArrayList<>(); List stats = runTopNodesCommand(); - - + Map podsOnNodes = countPods(); for(String nodeName : stats) { String[] parts = nodeName.split("\\s+"); @@ -41,24 +39,22 @@ public class TopNodesService Integer relativeCpu = extractInteger(parts[2]); Integer absoluteMemory = extractMemory(parts[3]); Integer relativeMemory = extractInteger(parts[4]); - result.add(new NodeStats(node, absoluteCpu, relativeCpu, Integer.parseInt(node.getStatus().getAllocatable().get("cpu").getAmount()) * 1000, absoluteMemory, relativeMemory, extractMemory(node.getStatus().getAllocatable().get("memory").getAmount()), countPods(node.getMetadata().getName()))); + result.add(new MonitoredNode(node, absoluteCpu, relativeCpu, Integer.parseInt(node.getStatus().getAllocatable().get("cpu").getAmount()) * 1000, absoluteMemory, relativeMemory, extractMemory(node.getStatus().getAllocatable().get("memory").getAmount()), podsOnNodes.get(node.getMetadata().getName()))); } } return result; } - private int countPods(String nodeName) + private Map countPods() { List pods = podService.findAll(); - int count = 0; + Map result = new HashMap<>(); for (Pod pod : pods) { - if (pod.getSpec().getNodeName().equals(nodeName)) - { - count++; - } + String nodeName = pod.getSpec().getNodeName(); + result.put(nodeName, Optional.ofNullable(result.get(nodeName)).orElse(0) + 1); } - return count; + return result; } private List runTopNodesCommand() diff --git a/src/main/java/dev/dinauer/monitoring/indexing/IndexCollection.java b/src/main/java/dev/dinauer/monitoring/indexing/IndexCollection.java index 34dc075..ea4426c 100644 --- a/src/main/java/dev/dinauer/monitoring/indexing/IndexCollection.java +++ b/src/main/java/dev/dinauer/monitoring/indexing/IndexCollection.java @@ -27,6 +27,7 @@ public class IndexCollection @Enumerated(EnumType.STRING) private TimeUnit unit; + @Column(columnDefinition = "text") private String metrics; public IndexCollection() diff --git a/src/main/java/dev/dinauer/monitoring/indexing/IndexingService.java b/src/main/java/dev/dinauer/monitoring/indexing/IndexingService.java index 4b46dce..6246ea2 100644 --- a/src/main/java/dev/dinauer/monitoring/indexing/IndexingService.java +++ b/src/main/java/dev/dinauer/monitoring/indexing/IndexingService.java @@ -1,5 +1,6 @@ package dev.dinauer.monitoring.indexing; +import io.quarkus.logging.Log; import io.quarkus.runtime.Startup; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; diff --git a/src/main/java/dev/dinauer/monitoring/nodes/MonitoredNode.java b/src/main/java/dev/dinauer/monitoring/nodes/MonitoredNode.java new file mode 100644 index 0000000..2a58c16 --- /dev/null +++ b/src/main/java/dev/dinauer/monitoring/nodes/MonitoredNode.java @@ -0,0 +1,61 @@ +package dev.dinauer.monitoring.nodes; + +import io.fabric8.kubernetes.api.model.Node; + +public class MonitoredNode extends Node +{ + public MonitoredNode(Node node, Integer absoluteCpuUsage, Integer relativeCpuUsage, Integer totalCpu, Integer absoluteMemory, Integer relativeMemory, Integer totalMemory, Integer runningPods) + { + super(node.getApiVersion(), node.getKind(), node.getMetadata(), node.getSpec(), node.getStatus()); + this.absoluteCpuUsage = absoluteCpuUsage; + this.relativeCpuUsage = relativeCpuUsage; + this.totalCpu = totalCpu; + this.absoluteMemory = absoluteMemory; + this.relativeMemory = relativeMemory; + this.totalMemory = totalMemory; + this.runningPods = runningPods; + } + + private final Integer absoluteCpuUsage; + private final Integer relativeCpuUsage; + private final Integer totalCpu; + private final Integer absoluteMemory; + private final Integer relativeMemory; + private final Integer totalMemory; + private final Integer runningPods; + + public Integer getAbsoluteCpuUsage() + { + return absoluteCpuUsage; + } + + public Integer getRelativeCpuUsage() + { + return relativeCpuUsage; + } + + public Integer getTotalCpu() + { + return totalCpu; + } + + public Integer getAbsoluteMemory() + { + return absoluteMemory; + } + + public Integer getRelativeMemory() + { + return relativeMemory; + } + + public Integer getTotalMemory() + { + return totalMemory; + } + + public Integer getRunningPods() + { + return runningPods; + } +} \ No newline at end of file diff --git a/src/main/java/dev/dinauer/monitoring/nodes/NodeMonitoringService.java b/src/main/java/dev/dinauer/monitoring/nodes/NodeMonitoringService.java index 426c774..92469f7 100644 --- a/src/main/java/dev/dinauer/monitoring/nodes/NodeMonitoringService.java +++ b/src/main/java/dev/dinauer/monitoring/nodes/NodeMonitoringService.java @@ -1,15 +1,10 @@ package dev.dinauer.monitoring.nodes; -import dev.dinauer.ProcessRunner; import dev.dinauer.monitoring.TopNodesService; import dev.dinauer.monitoring.indexing.IndexingService; -import dev.dinauer.service.NodeService; -import dev.dinauer.utils.ClientProvider; -import io.smallrye.common.annotation.Blocking; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -24,15 +19,15 @@ public class NodeMonitoringService public void run() throws IOException, InterruptedException { - List nodes = topNodesService.findAll(); - for (NodeStats node : nodes) + List nodes = topNodesService.findAll(); + for (MonitoredNode node : nodes) { Map metrics = Map.ofEntries( - Map.entry("RELATIVE_CPU", (long) node.relativeCpuUsage()), - Map.entry("RELATIVE_MEMORY", (long) node.relativeMemory()), - Map.entry("ABSOLUTE_MEMORY", (long) node.absoluteMemory()), - Map.entry("ABSOLUTE_CPU", (long) node.absoluteCpuUsage())); - indexingService.index(String.format("NODE-%s", node), "NODE_METRICS", metrics); + Map.entry("RELATIVE_CPU", (long) node.getRelativeCpuUsage()), + Map.entry("RELATIVE_MEMORY", (long) node.getRelativeMemory()), + Map.entry("ABSOLUTE_MEMORY", (long) node.getAbsoluteMemory()), + Map.entry("ABSOLUTE_CPU", (long) node.getAbsoluteCpuUsage())); + indexingService.index(String.format("NODE-%s", node.getMetadata().getUid()), "NODE_METRICS", metrics); } } } diff --git a/src/main/java/dev/dinauer/monitoring/nodes/NodeStats.java b/src/main/java/dev/dinauer/monitoring/nodes/NodeStats.java deleted file mode 100644 index d494d96..0000000 --- a/src/main/java/dev/dinauer/monitoring/nodes/NodeStats.java +++ /dev/null @@ -1,7 +0,0 @@ -package dev.dinauer.monitoring.nodes; - -import io.fabric8.kubernetes.api.model.Node; - -public record NodeStats(Node node, Integer absoluteCpuUsage, Integer relativeCpuUsage, Integer totalCpu, Integer absoluteMemory, Integer relativeMemory, Integer totalMemory, Integer runningPods) -{ -} \ No newline at end of file diff --git a/src/main/java/dev/dinauer/service/ConfigMapService.java b/src/main/java/dev/dinauer/service/ConfigMapService.java new file mode 100644 index 0000000..6fca852 --- /dev/null +++ b/src/main/java/dev/dinauer/service/ConfigMapService.java @@ -0,0 +1,47 @@ +package dev.dinauer.service; + +import dev.dinauer.utils.ClientProvider; +import io.fabric8.kubernetes.api.model.ConfigMap; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.jboss.resteasy.reactive.common.NotImplementedYet; + +import java.util.List; +import java.util.Optional; + +@ApplicationScoped +public class ConfigMapService implements ResourceService +{ + @Inject + ClientProvider clientProvider; + + @Override + public void delete(String name, String namespace) + { + throw new NotImplementedYet(); + } + + @Override + public ConfigMap findByNameAndNamespace(String name, String namespace) + { + return clientProvider.getClient().configMaps().inNamespace(namespace).withName(name).get(); + } + + @Override + public List findByNamespace(String namespace) + { + return clientProvider.getClient().configMaps().inNamespace(namespace).list().getItems(); + } + + @Override + public List findAll() + { + return clientProvider.getClient().configMaps().inAnyNamespace().list().getItems(); + } + + @Override + public Optional findOptionalByNameAndNamespace(String name, String namespace) + { + throw new NotImplementedYet(); + } +} diff --git a/src/main/java/dev/dinauer/service/NodeService.java b/src/main/java/dev/dinauer/service/NodeService.java index f5c6b7d..ad881a6 100644 --- a/src/main/java/dev/dinauer/service/NodeService.java +++ b/src/main/java/dev/dinauer/service/NodeService.java @@ -1,7 +1,7 @@ package dev.dinauer.service; import dev.dinauer.monitoring.TopNodesService; -import dev.dinauer.monitoring.nodes.NodeStats; +import dev.dinauer.monitoring.nodes.MonitoredNode; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.jboss.resteasy.reactive.common.NotImplementedYet; @@ -9,12 +9,12 @@ import java.util.List; import java.util.Optional; @ApplicationScoped -public class NodeService implements ResourceService +public class NodeService implements ResourceService { @Inject TopNodesService topNodesService; - public List findAll() + public List findAll() { return topNodesService.findAll(); } @@ -26,19 +26,19 @@ public class NodeService implements ResourceService } @Override - public NodeStats findByNameAndNamespace(String name, String namespace) + public MonitoredNode findByNameAndNamespace(String name, String namespace) { throw new NotImplementedYet(); } @Override - public List findByNamespace(String namespace) + public List findByNamespace(String namespace) { return findAll(); } @Override - public Optional findOptionalByNameAndNamespace(String name, String namespace) + public Optional findOptionalByNameAndNamespace(String name, String namespace) { throw new NotImplementedYet(); } diff --git a/src/main/java/dev/dinauer/utils/StartupService.java b/src/main/java/dev/dinauer/utils/StartupService.java index 578cc79..2d764a1 100644 --- a/src/main/java/dev/dinauer/utils/StartupService.java +++ b/src/main/java/dev/dinauer/utils/StartupService.java @@ -45,7 +45,7 @@ public class StartupService UserEntity initialUser = UserEntity.init(); initialUser.setUsername(INITIAL_USERNAME); initialUser.setPassword(BcryptUtil.bcryptHash(INITIAL_PASSWORD)); - initialUser.setRoles(Set.of("admin")); + initialUser.setRoles(Set.of("ADMIN")); return initialUser; } }