From 2a09bdddd26bd2fa5aefb72bbb99539e36215d3b Mon Sep 17 00:00:00 2001 From: Andreas Dinauer Date: Sun, 28 Dec 2025 20:51:52 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9A=A1=EF=B8=8F=20Code=20Improvements?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/dev/dinauer/DeploymentResource.java | 15 ++-- src/main/java/dev/dinauer/NodeResource.java | 2 +- src/main/java/dev/dinauer/PodResource.java | 8 +- .../env/EnvironmentVariableService.java | 12 +-- .../inspect/log/DeploymentLogWebsocket.java | 79 ++++++++----------- .../dinauer/inspect/log/PodLogWebsocket.java | 12 ++- .../log/{ => model}/KubernetesLog.java | 2 +- .../inspect/utils/InputStreamWatcher.java | 4 +- .../monitoring/MonitoringJobRunner.java | 4 + .../dinauer/monitoring/TopNodesService.java | 2 +- .../memory/MemoryMonitoringJobRunner.java | 2 +- .../volume/VolumeMonitoringJobRunner.java | 2 +- .../monitoring/volume/VolumeUsageRepo.java | 1 - .../java/dev/dinauer/service/LogParser.java | 13 +-- .../java/dev/dinauer/service/PodService.java | 19 +++-- 15 files changed, 86 insertions(+), 91 deletions(-) rename src/main/java/dev/dinauer/inspect/log/{ => model}/KubernetesLog.java (89%) diff --git a/src/main/java/dev/dinauer/DeploymentResource.java b/src/main/java/dev/dinauer/DeploymentResource.java index c42dbbc..f9aa759 100644 --- a/src/main/java/dev/dinauer/DeploymentResource.java +++ b/src/main/java/dev/dinauer/DeploymentResource.java @@ -1,17 +1,16 @@ package dev.dinauer; -import dev.dinauer.service.DeploymentService; -import dev.dinauer.service.NodeService; -import dev.dinauer.service.PodService; -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.apps.Deployment; -import io.quarkus.security.Authenticated; -import jakarta.annotation.security.RolesAllowed; +import java.util.List; + import jakarta.inject.Inject; import jakarta.ws.rs.*; import jakarta.ws.rs.core.MediaType; -import java.util.List; +import io.fabric8.kubernetes.api.model.Pod; +import io.quarkus.security.Authenticated; + +import dev.dinauer.service.DeploymentService; +import dev.dinauer.service.PodService; @Path("/resources/deployments") @Authenticated diff --git a/src/main/java/dev/dinauer/NodeResource.java b/src/main/java/dev/dinauer/NodeResource.java index 44f4a11..a326495 100644 --- a/src/main/java/dev/dinauer/NodeResource.java +++ b/src/main/java/dev/dinauer/NodeResource.java @@ -2,12 +2,12 @@ package dev.dinauer; import java.util.List; -import dev.dinauer.inspect.websocket.ResourceType; import jakarta.annotation.security.RolesAllowed; import jakarta.inject.Inject; import jakarta.ws.rs.*; import jakarta.ws.rs.core.MediaType; +import dev.dinauer.inspect.websocket.ResourceType; import dev.dinauer.service.*; @Path("/resources/nodes") diff --git a/src/main/java/dev/dinauer/PodResource.java b/src/main/java/dev/dinauer/PodResource.java index e741eb5..1c0b5f1 100644 --- a/src/main/java/dev/dinauer/PodResource.java +++ b/src/main/java/dev/dinauer/PodResource.java @@ -2,19 +2,17 @@ package dev.dinauer; import java.util.List; -import dev.dinauer.inspect.env.EnvVar; -import dev.dinauer.inspect.env.EnvironmentVariableService; -import dev.dinauer.utils.ProcessRunner; -import io.fabric8.kubernetes.api.model.Pod; -import io.quarkus.security.Authenticated; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import jakarta.ws.rs.*; import jakarta.ws.rs.core.MediaType; import io.quarkus.runtime.Startup; +import io.quarkus.security.Authenticated; import io.smallrye.common.annotation.Blocking; +import dev.dinauer.inspect.env.EnvVar; +import dev.dinauer.inspect.env.EnvironmentVariableService; import dev.dinauer.service.PodService; @Path("/pods") diff --git a/src/main/java/dev/dinauer/inspect/env/EnvironmentVariableService.java b/src/main/java/dev/dinauer/inspect/env/EnvironmentVariableService.java index e5980bf..22d5ac9 100644 --- a/src/main/java/dev/dinauer/inspect/env/EnvironmentVariableService.java +++ b/src/main/java/dev/dinauer/inspect/env/EnvironmentVariableService.java @@ -1,13 +1,15 @@ package dev.dinauer.inspect.env; -import dev.dinauer.utils.ProcessRunner; -import io.fabric8.kubernetes.api.model.Pod; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; - import java.util.ArrayList; import java.util.List; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import io.fabric8.kubernetes.api.model.Pod; + +import dev.dinauer.utils.ProcessRunner; + @ApplicationScoped public class EnvironmentVariableService { diff --git a/src/main/java/dev/dinauer/inspect/log/DeploymentLogWebsocket.java b/src/main/java/dev/dinauer/inspect/log/DeploymentLogWebsocket.java index d171480..577ccd5 100644 --- a/src/main/java/dev/dinauer/inspect/log/DeploymentLogWebsocket.java +++ b/src/main/java/dev/dinauer/inspect/log/DeploymentLogWebsocket.java @@ -1,13 +1,10 @@ package dev.dinauer.inspect.log; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import dev.dinauer.service.PodService; -import dev.dinauer.utils.ClientProvider; -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.client.dsl.LogWatch; -import io.fabric8.kubernetes.client.dsl.PodResource; +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; + import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import jakarta.websocket.OnClose; @@ -15,26 +12,29 @@ import jakarta.websocket.OnOpen; import jakarta.websocket.Session; import jakarta.websocket.server.PathParam; import jakarta.websocket.server.ServerEndpoint; -import org.eclipse.microprofile.context.ManagedExecutor; + import org.jboss.logging.Logger; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.client.dsl.LogWatch; +import io.fabric8.kubernetes.client.dsl.PodResource; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.infrastructure.Infrastructure; + +import dev.dinauer.inspect.log.model.KubernetesLog; +import dev.dinauer.service.PodService; +import dev.dinauer.utils.ClientProvider; @ServerEndpoint("/logs/deployments/{namespace}/{name}") @ApplicationScoped public class DeploymentLogWebsocket { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new JavaTimeModule()).disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - private final Map sessions = new HashMap<>(); + private final Map> sessions = new HashMap<>(); @Inject Logger LOG; @@ -42,16 +42,13 @@ public class DeploymentLogWebsocket @Inject ClientProvider clientProvider; - @Inject - ManagedExecutor executor; - @Inject PodService podService; @OnOpen public void onOpen(Session session, @PathParam("namespace") String namespace, @PathParam("name") String name) { - executor.submit(() -> { + Uni.createFrom().voidItem().invoke(() -> { List pods = podService.findByDeployment(namespace, name); List existingLogs = new ArrayList<>(); for (Pod pod : pods) @@ -61,37 +58,27 @@ public class DeploymentLogWebsocket } send(session, existingLogs.stream().sorted(KubernetesLog::orderByTimestamp).toList()); + sessions.put(session, new LinkedList<>()); for (Pod pod : pods) { - CompletableFuture.runAsync(() -> { - LogWatch watch = clientProvider.getClient().pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).usingTimestamps().tailingLines(0).watchLog(); - sessions.put(session, watch); - - try (BufferedReader reader = new BufferedReader(new InputStreamReader(watch.getOutput()))) - { - String line; - while ((line = reader.readLine()) != null && !Thread.currentThread().isInterrupted()) - { - send(session, toLog(List.of(line), pod.getMetadata().getName())); - } - LOG.info("Ended"); - } - catch (Exception e) - { - LOG.errorf("Error reading output of log watch: %s", e.getMessage()); - } - }); + PodResource resource = clientProvider.getClient().pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()); + sessions.get(session).add(podService.watchLogs(resource, (log) -> { + send(session, log); + })); } - }); + }).runSubscriptionOn(Infrastructure.getDefaultWorkerPool()).subscribe().with(result -> {}, error -> {}); } @OnClose public void onClose(Session session) throws IOException { - LogWatch watch = sessions.remove(session); - if (watch != null) + List watches = sessions.remove(session); + if (watches != null) { - watch.close(); + for (LogWatch watch : watches) + { + watch.close(); + } } session.close(); } diff --git a/src/main/java/dev/dinauer/inspect/log/PodLogWebsocket.java b/src/main/java/dev/dinauer/inspect/log/PodLogWebsocket.java index 2097747..f161966 100644 --- a/src/main/java/dev/dinauer/inspect/log/PodLogWebsocket.java +++ b/src/main/java/dev/dinauer/inspect/log/PodLogWebsocket.java @@ -1,17 +1,10 @@ package dev.dinauer.inspect.log; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStreamReader; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; -import dev.dinauer.inspect.utils.InputStreamWatcher; -import dev.dinauer.service.PodService; -import io.fabric8.kubernetes.client.dsl.PodResource; -import io.smallrye.mutiny.Uni; -import io.smallrye.mutiny.infrastructure.Infrastructure; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import jakarta.websocket.OnClose; @@ -27,7 +20,12 @@ import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import io.fabric8.kubernetes.client.dsl.LogWatch; +import io.fabric8.kubernetes.client.dsl.PodResource; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import dev.dinauer.inspect.log.model.KubernetesLog; +import dev.dinauer.service.PodService; import dev.dinauer.utils.ClientProvider; @ServerEndpoint("/logs/pods/{namespace}/{name}") diff --git a/src/main/java/dev/dinauer/inspect/log/KubernetesLog.java b/src/main/java/dev/dinauer/inspect/log/model/KubernetesLog.java similarity index 89% rename from src/main/java/dev/dinauer/inspect/log/KubernetesLog.java rename to src/main/java/dev/dinauer/inspect/log/model/KubernetesLog.java index d2f0f93..58a2ba4 100644 --- a/src/main/java/dev/dinauer/inspect/log/KubernetesLog.java +++ b/src/main/java/dev/dinauer/inspect/log/model/KubernetesLog.java @@ -1,4 +1,4 @@ -package dev.dinauer.inspect.log; +package dev.dinauer.inspect.log.model; import java.time.LocalDateTime; import java.time.ZoneOffset; diff --git a/src/main/java/dev/dinauer/inspect/utils/InputStreamWatcher.java b/src/main/java/dev/dinauer/inspect/utils/InputStreamWatcher.java index 4fdbc5b..223c0fc 100644 --- a/src/main/java/dev/dinauer/inspect/utils/InputStreamWatcher.java +++ b/src/main/java/dev/dinauer/inspect/utils/InputStreamWatcher.java @@ -1,12 +1,12 @@ package dev.dinauer.inspect.utils; -import org.jboss.logging.Logger; - import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.util.function.Consumer; +import org.jboss.logging.Logger; + public class InputStreamWatcher { private static final Logger LOG = Logger.getLogger(InputStreamWatcher.class); diff --git a/src/main/java/dev/dinauer/monitoring/MonitoringJobRunner.java b/src/main/java/dev/dinauer/monitoring/MonitoringJobRunner.java index 1be029c..89f506c 100644 --- a/src/main/java/dev/dinauer/monitoring/MonitoringJobRunner.java +++ b/src/main/java/dev/dinauer/monitoring/MonitoringJobRunner.java @@ -48,6 +48,10 @@ public class MonitoringJobRunner @PostConstruct public void run() { + if (true) + { + return; + } List configs = monitoringRepo.listAll(); for (MonitoringConfig config : configs) { diff --git a/src/main/java/dev/dinauer/monitoring/TopNodesService.java b/src/main/java/dev/dinauer/monitoring/TopNodesService.java index c978335..54f0855 100644 --- a/src/main/java/dev/dinauer/monitoring/TopNodesService.java +++ b/src/main/java/dev/dinauer/monitoring/TopNodesService.java @@ -8,13 +8,13 @@ import jakarta.inject.Inject; import io.fabric8.kubernetes.api.model.Node; import io.fabric8.kubernetes.api.model.Pod; -import dev.dinauer.utils.ProcessRunner; import dev.dinauer.monitoring.nodes.MonitoredNode; import dev.dinauer.monitoring.nodes.NodeMetrics; import dev.dinauer.monitoring.nodes.client.NodeDiskMetrics; import dev.dinauer.monitoring.nodes.client.NodeDiskService; import dev.dinauer.service.PodService; import dev.dinauer.utils.ClientProvider; +import dev.dinauer.utils.ProcessRunner; @ApplicationScoped public class TopNodesService diff --git a/src/main/java/dev/dinauer/monitoring/memory/MemoryMonitoringJobRunner.java b/src/main/java/dev/dinauer/monitoring/memory/MemoryMonitoringJobRunner.java index 60b2639..4dc07ac 100644 --- a/src/main/java/dev/dinauer/monitoring/memory/MemoryMonitoringJobRunner.java +++ b/src/main/java/dev/dinauer/monitoring/memory/MemoryMonitoringJobRunner.java @@ -9,11 +9,11 @@ import jakarta.inject.Inject; import io.fabric8.kubernetes.api.model.Pod; -import dev.dinauer.utils.ProcessRunner; import dev.dinauer.monitoring.MonitoringService; import dev.dinauer.monitoring.entity.MonitoringConfig; import dev.dinauer.monitoring.entity.MonitoringType; import dev.dinauer.monitoring.indexing.BigBucketService; +import dev.dinauer.utils.ProcessRunner; @ApplicationScoped public class MemoryMonitoringJobRunner diff --git a/src/main/java/dev/dinauer/monitoring/volume/VolumeMonitoringJobRunner.java b/src/main/java/dev/dinauer/monitoring/volume/VolumeMonitoringJobRunner.java index 549d839..855c20b 100644 --- a/src/main/java/dev/dinauer/monitoring/volume/VolumeMonitoringJobRunner.java +++ b/src/main/java/dev/dinauer/monitoring/volume/VolumeMonitoringJobRunner.java @@ -15,11 +15,11 @@ import org.jboss.logging.Logger; import io.fabric8.kubernetes.api.model.Pod; -import dev.dinauer.utils.ProcessRunner; import dev.dinauer.monitoring.MonitoringService; import dev.dinauer.monitoring.entity.MonitoringConfig; import dev.dinauer.monitoring.entity.MonitoringType; import dev.dinauer.monitoring.indexing.BigBucketService; +import dev.dinauer.utils.ProcessRunner; @ApplicationScoped public class VolumeMonitoringJobRunner diff --git a/src/main/java/dev/dinauer/monitoring/volume/VolumeUsageRepo.java b/src/main/java/dev/dinauer/monitoring/volume/VolumeUsageRepo.java index 9b4c50d..30976bc 100644 --- a/src/main/java/dev/dinauer/monitoring/volume/VolumeUsageRepo.java +++ b/src/main/java/dev/dinauer/monitoring/volume/VolumeUsageRepo.java @@ -1,7 +1,6 @@ package dev.dinauer.monitoring.volume; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; diff --git a/src/main/java/dev/dinauer/service/LogParser.java b/src/main/java/dev/dinauer/service/LogParser.java index 4130145..6890ee2 100644 --- a/src/main/java/dev/dinauer/service/LogParser.java +++ b/src/main/java/dev/dinauer/service/LogParser.java @@ -1,16 +1,17 @@ package dev.dinauer.service; -import dev.dinauer.inspect.log.KubernetesLog; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; -import org.jboss.logging.Logger; -import org.slf4j.LoggerFactory; - import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.jboss.logging.Logger; + +import dev.dinauer.inspect.log.model.KubernetesLog; + @ApplicationScoped public class LogParser { diff --git a/src/main/java/dev/dinauer/service/PodService.java b/src/main/java/dev/dinauer/service/PodService.java index 6651ba9..837973e 100644 --- a/src/main/java/dev/dinauer/service/PodService.java +++ b/src/main/java/dev/dinauer/service/PodService.java @@ -6,20 +6,21 @@ import java.util.Map; import java.util.Optional; import java.util.function.Consumer; -import dev.dinauer.inspect.log.KubernetesLog; -import dev.dinauer.inspect.utils.InputStreamWatcher; -import io.fabric8.kubernetes.api.model.apps.Deployment; -import io.fabric8.kubernetes.client.dsl.LogWatch; -import io.fabric8.kubernetes.client.dsl.PodResource; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.StatefulSet; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.dsl.AppsAPIGroupDSL; +import io.fabric8.kubernetes.client.dsl.LogWatch; +import io.fabric8.kubernetes.client.dsl.PodResource; +import io.smallrye.mutiny.Uni; +import dev.dinauer.inspect.log.model.KubernetesLog; +import dev.dinauer.inspect.utils.InputStreamWatcher; import dev.dinauer.utils.ClientProvider; @ApplicationScoped @@ -139,7 +140,13 @@ public class PodService implements ResourceService public LogWatch watchLogs(PodResource resource, Consumer> consume) { LogWatch watch = resource.usingTimestamps().tailingLines(0).watchLog(); - InputStreamWatcher.watch(watch.getOutput(), line -> consume.accept(logParser.toLog(line, resource.get().getMetadata().getName()))); + Uni.createFrom() + .voidItem() + .invoke(() -> { + InputStreamWatcher.watch(watch.getOutput(), line -> consume.accept(logParser.toLog(line, resource.get().getMetadata().getName()))); + }) + .subscribe() + .with(result -> {}, error -> {}); return watch; } }