🚑️ Ensure proper reader closing in websocket

This commit is contained in:
andreas.dinauer 2025-11-10 20:39:44 +01:00
parent 5483847304
commit 62d939e267
24 changed files with 286 additions and 120 deletions

View File

@ -2,6 +2,7 @@ package dev.dinauer;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import java.io.BufferedReader; import java.io.BufferedReader;
@ -17,6 +18,9 @@ public class ProcessRunner
@Inject @Inject
Logger LOG; Logger LOG;
@Inject
ManagedExecutor executor;
public String runToText(String command) public String runToText(String command)
{ {
return String.join("\n", runToLines(command)); return String.join("\n", runToLines(command));
@ -27,42 +31,51 @@ public class ProcessRunner
LOG.infof("Running command: %s", command); LOG.infof("Running command: %s", command);
ProcessBuilder pb = new ProcessBuilder(command.split("\\s+")); ProcessBuilder pb = new ProcessBuilder(command.split("\\s+"));
pb.redirectErrorStream(true); pb.redirectErrorStream(true);
try try
{ {
Process p = pb.start(); return runAndCollectLogs(pb);
return runAndCollectLogs(p);
} }
catch (IOException | InterruptedException e) catch (InterruptedException | IOException e)
{
throw new RuntimeException(String.format("Failed to run command '%s': %s", command, e.getMessage()));
}
}
private List<String> runAndCollectLogs(Process p) throws InterruptedException
{
List<String> text = new ArrayList<>();
try(BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream())))
{
String line;
while((line = br.readLine()) != null)
{
text.add(line);
}
}
catch (IOException e)
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
boolean endedInTime = p.waitFor(10, TimeUnit.SECONDS); }
private List<String> runAndCollectLogs(ProcessBuilder processBuilder) throws InterruptedException, IOException
{
List<String> text = new ArrayList<>();
Process process = processBuilder.start();
executor.submit(() -> {
try(BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream())))
{
String line;
while((line = br.readLine()) != null)
{
text.add(line);
}
}
catch (IOException e)
{
throw new RuntimeException(e);
}
});
boolean endedInTime = process.waitFor(10, TimeUnit.SECONDS);
int exitCode = process.exitValue();
process.destroyForcibly();
if (endedInTime) if (endedInTime)
{ {
int exitCode = p.exitValue();
if(exitCode == 0) if(exitCode == 0)
{ {
return text; return text;
} }
else
{
throw new RuntimeException("Process finished with code " + exitCode);
}
}
else
{
throw new RuntimeException("Process exceeded wait time of 10 sec.");
} }
throw new InterruptedException();
} }
} }

View File

@ -0,0 +1,50 @@
package dev.dinauer.inspect;
import io.smallrye.jwt.auth.principal.JWTParser;
import io.smallrye.jwt.auth.principal.ParseException;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.jwt.JsonWebToken;
import java.util.Optional;
@ApplicationScoped
public class TokenService
{
@Inject
JWTParser parser;
public boolean validateTokenByQueryString(String queryString)
{
JsonWebToken token = getToken(queryString);
Optional<String> purpose = token.claim("purpose");
if (purpose.isPresent())
{
return purpose.get().equals("ws:connect");
}
return false;
}
private JsonWebToken getToken(String query)
{
for (String param : query.split("&"))
{
String[] sections = param.split("=", 2);
if (sections.length == 2)
{
if (sections[0].equals("token"))
{
try
{
return parser.parse(sections[1]);
}
catch (ParseException e)
{
throw new RuntimeException(e);
}
}
}
}
throw new RuntimeException("Token cannot be null.");
}
}

View File

@ -21,6 +21,7 @@ import java.io.InputStreamReader;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
import java.util.concurrent.Future;
@ServerEndpoint("/logs/{namespace}/{name}") @ServerEndpoint("/logs/{namespace}/{name}")
@ApplicationScoped @ApplicationScoped
@ -28,7 +29,6 @@ public class LogWebsocket
{ {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new JavaTimeModule()).disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new JavaTimeModule()).disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
private final Map<Session, LogWatch> sessions = new HashMap<>(); private final Map<Session, LogWatch> sessions = new HashMap<>();
private final Map<Session, BufferedReader> readers = new HashMap<>();
@Inject @Inject
Logger LOG; Logger LOG;
@ -42,23 +42,21 @@ public class LogWebsocket
@OnOpen @OnOpen
public void onOpen(Session session, @PathParam("namespace") String namespace, @PathParam("name") String name) public void onOpen(Session session, @PathParam("namespace") String namespace, @PathParam("name") String name)
{ {
executor.runAsync(() -> { executor.submit(() -> {
List<String> existingLogs = Arrays.stream(clientProvider.getClient().pods().inNamespace(namespace).withName(name).usingTimestamps().tailingLines(200).getLog().split("\n")).toList(); List<String> existingLogs = Arrays.stream(clientProvider.getClient().pods().inNamespace(namespace).withName(name).usingTimestamps().tailingLines(200).getLog().split("\n")).toList();
send(session, toLog(existingLogs)); send(session, toLog(existingLogs));
LogWatch watch = clientProvider.getClient().pods().inNamespace(namespace).withName(name).usingTimestamps().tailingLines(0).watchLog(); LogWatch watch = clientProvider.getClient().pods().inNamespace(namespace).withName(name).usingTimestamps().tailingLines(0).watchLog();
sessions.put(session, watch); sessions.put(session, watch);
BufferedReader reader = new BufferedReader(new InputStreamReader(watch.getOutput())); try(BufferedReader reader = new BufferedReader(new InputStreamReader(watch.getOutput())))
readers.put(session, reader);
try
{ {
String line; String line;
while ((line = reader.readLine()) != null) while ((line = reader.readLine()) != null && !Thread.currentThread().isInterrupted())
{ {
send(session, toLog(List.of(line))); send(session, toLog(List.of(line)));
} }
LOG.info("Ended");
} }
catch (Exception e) catch (Exception e)
{ {
@ -75,11 +73,6 @@ public class LogWebsocket
{ {
watch.close(); watch.close();
} }
BufferedReader reader = readers.remove(session);
if (reader != null)
{
reader.close();
}
session.close(); session.close();
} }

View File

@ -3,6 +3,7 @@ package dev.dinauer.inspect.websocket;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import dev.dinauer.ResourceType; import dev.dinauer.ResourceType;
import dev.dinauer.inspect.TokenService;
import dev.dinauer.service.ResourceService; import dev.dinauer.service.ResourceService;
import dev.dinauer.utils.ClientProvider; import dev.dinauer.utils.ClientProvider;
import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watch;
@ -33,17 +34,11 @@ public class ResourceWebsocket
{ {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Inject
Logger LOG;
@Inject
ClientProvider clientProvider;
@Inject @Inject
ManagedExecutor executor; ManagedExecutor executor;
@Inject @Inject
JWTParser parser; TokenService tokenService;
@Inject @Inject
ServiceFactory serviceFactory; ServiceFactory serviceFactory;
@ -51,13 +46,12 @@ public class ResourceWebsocket
private final Map<Session, Watch> sessions = new HashMap<>(); private final Map<Session, Watch> sessions = new HashMap<>();
@OnOpen @OnOpen
public void onOpen(Session session, @PathParam("resource-type") String rawResourceType, @PathParam("namespace") String namespace) throws ParseException public void onOpen(Session session, @PathParam("resource-type") String rawResourceType, @PathParam("namespace") String namespace)
{ {
ResourceType resourceType = ResourceType.fromString(rawResourceType); ResourceType resourceType = ResourceType.fromString(rawResourceType);
JsonWebToken token = getToken(session.getQueryString()); if (tokenService.validateTokenByQueryString(session.getQueryString()))
if (isValid(token))
{ {
executor.runAsync(() -> executor.submit(() ->
{ {
ResourceService<?> service = serviceFactory.getService(resourceType); ResourceService<?> service = serviceFactory.getService(resourceType);
send(session, EventType.INIT, service.findByNamespace(namespace)); send(session, EventType.INIT, service.findByNamespace(namespace));
@ -67,21 +61,14 @@ public class ResourceWebsocket
} }
@OnClose @OnClose
public void onClose(Session session) public void onClose(Session session) throws IOException
{ {
Watch watch = sessions.remove(session); Watch watch = sessions.remove(session);
if (watch != null) if (watch != null)
{ {
watch.close(); watch.close();
} }
try session.close();
{
session.close();
}
catch (IOException e)
{
throw new RuntimeException(e);
}
} }
private <T> Watcher<T> getWatcher(Session session) private <T> Watcher<T> getWatcher(Session session)
@ -109,11 +96,6 @@ public class ResourceWebsocket
}; };
} }
private boolean isGlobal(String namespace)
{
return namespace == null || namespace.isBlank() || namespace.equals("_all");
}
private void send(Session session, EventType type, List<?> objects) private void send(Session session, EventType type, List<?> objects)
{ {
try try
@ -125,31 +107,4 @@ public class ResourceWebsocket
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
private JsonWebToken getToken(String query) throws ParseException
{
for (String param : query.split("&"))
{
String[] sections = param.split("=", 2);
if (sections.length == 2)
{
if (sections[0].equals("token"))
{
return parser.parse(sections[1]);
}
}
}
LOG.error("Token cannot be null.");
throw new RuntimeException("Token cannot be null.");
}
private boolean isValid(JsonWebToken token)
{
Optional<String> purpose = token.claim("purpose");
if (purpose.isPresent())
{
return purpose.get().equals("ws:connect");
}
return false;
}
} }

View File

@ -6,10 +6,10 @@ import dev.dinauer.monitoring.indexing.IndexCollection;
import dev.dinauer.monitoring.indexing.IndexMetricsRepo; import dev.dinauer.monitoring.indexing.IndexMetricsRepo;
import dev.dinauer.monitoring.indexing.TimeUnit; import dev.dinauer.monitoring.indexing.TimeUnit;
import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.Pod;
import io.smallrye.common.constraint.NotNull;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.ws.rs.GET; import jakarta.ws.rs.*;
import jakarta.ws.rs.Path; import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.PathParam;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -28,14 +28,18 @@ public class MonitoringJobResource
IndexMetricsRepo indexMetricsRepo; IndexMetricsRepo indexMetricsRepo;
@GET @GET
public List<MonitoredResource<Pod, IndexCollection>> get(@PathParam("monitoring-id") String monitoringId) throws IOException public List<MonitoredResource<Pod, IndexCollection>> get(@PathParam("monitoring-id") String monitoringId, @QueryParam("from") Long from, @QueryParam("to") Long to) throws IOException
{ {
if (from == null || to == null)
{
throw new BadRequestException();
}
List<MonitoredResource<Pod, IndexCollection>> result = new ArrayList<>(); List<MonitoredResource<Pod, IndexCollection>> result = new ArrayList<>();
MonitoringConfig config = monitoringRepo.findById(monitoringId); MonitoringConfig config = monitoringRepo.findById(monitoringId);
List<Pod> pods = monitoringService.findRunningPodsByMonitoringConfig(config); List<Pod> pods = monitoringService.findRunningPodsByMonitoringConfig(config);
for (Pod pod : pods) for (Pod pod : pods)
{ {
result.add(new MonitoredResource<>(pod, indexMetricsRepo.findByResourceAndMetricAndTimeUnit(String.format("POD-%s", pod.getMetadata().getUid()), config.getType().toString(), TimeUnit.RAW))); result.add(new MonitoredResource<>(pod, indexMetricsRepo.findByResourceAndMetricAndTimeUnitAndPeriod(String.format("POD-%s", pod.getMetadata().getUid()), config.getType().toString(), TimeUnit.RAW, from, to)));
} }
return result; return result;
} }

View File

@ -84,7 +84,7 @@ public class MonitoringJobRunner
scheduler.scheduleAtFixedRate(task, 0, Duration.parse(config.getInterval()), TimeUnit.SECONDS); scheduler.scheduleAtFixedRate(task, 0, Duration.parse(config.getInterval()), TimeUnit.SECONDS);
} }
private void run(MonitoringConfig config) throws IOException, InterruptedException public void run(MonitoringConfig config) throws IOException, InterruptedException
{ {
LOG.infof("Running %s %s monitoring.", config.getConfigName(), config.getType().toString().toLowerCase()); LOG.infof("Running %s %s monitoring.", config.getConfigName(), config.getType().toString().toLowerCase());
switch (config.getType()) switch (config.getType())
@ -93,7 +93,7 @@ public class MonitoringJobRunner
{ {
volumeMonitoringJobRunner.run(config); volumeMonitoringJobRunner.run(config);
} }
case MEMORY -> case WORKLOAD ->
{ {
memoryMonitoringJobRunner.run(config); memoryMonitoringJobRunner.run(config);
} }

View File

@ -4,12 +4,21 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import dev.dinauer.monitoring.entity.MonitoringConfig; import dev.dinauer.monitoring.entity.MonitoringConfig;
import dev.dinauer.monitoring.entity.MonitoringType; import dev.dinauer.monitoring.entity.MonitoringType;
import dev.dinauer.monitoring.entity.TargetConfig;
import dev.dinauer.monitoring.entity.VolumeConfig;
import dev.dinauer.monitoring.entity.creation.MonitoringConfigCreation;
import dev.dinauer.monitoring.entity.creation.VolumeConfigCreation;
import dev.dinauer.monitoring.entity.repo.MonitoringRepo; import dev.dinauer.monitoring.entity.repo.MonitoringRepo;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import jakarta.ws.rs.GET; import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path; import jakarta.ws.rs.Path;
import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.UUID;
@Path("/monitorings") @Path("/monitorings")
public class MonitoringResource public class MonitoringResource
@ -17,9 +26,53 @@ public class MonitoringResource
@Inject @Inject
MonitoringRepo monitoringRepo; MonitoringRepo monitoringRepo;
@Inject
MonitoringJobRunner jobRunner;
@GET @GET
public List<MonitoringConfig> get() throws JsonProcessingException public List<MonitoringConfig> get() throws JsonProcessingException
{ {
return monitoringRepo.listAll(); return monitoringRepo.listAll();
} }
@POST
@Transactional
public void create(MonitoringConfigCreation configCreation)
{
MonitoringConfig config = new MonitoringConfig();
config.setId(UUID.randomUUID().toString());
config.setConfigName(configCreation.configName());
config.setInterval(configCreation.interval());
config.setType(configCreation.type());
TargetConfig targetConfig = new TargetConfig();
targetConfig.setId(UUID.randomUUID().toString());
targetConfig.setNamespace(configCreation.targetConfig().namespace());
targetConfig.setType(configCreation.targetConfig().type());
targetConfig.setLabels(Map.ofEntries(Map.entry(configCreation.targetConfig().labelKey(), configCreation.targetConfig().labelValue())));
targetConfig.setConfig(config);
config.setTargetConfig(targetConfig);
switch (configCreation.type())
{
case MonitoringType.VOLUME ->
{
VolumeConfig volumeConfig = new VolumeConfig();
volumeConfig.setId(UUID.randomUUID().toString());
volumeConfig.setMountPath(configCreation.volumeConfig().mountPath());
volumeConfig.setContainerName(configCreation.volumeConfig().containerName());
volumeConfig.setConfig(config);
config.setVolumeConfig(volumeConfig);
monitoringRepo.persist(config);
try
{
jobRunner.run(config);
}
catch (IOException | InterruptedException e)
{
throw new RuntimeException(e);
}
}
}
}
} }

View File

@ -21,7 +21,7 @@ public class MonitoringService
TargetConfig targetConfig = config.getTargetConfig(); TargetConfig targetConfig = config.getTargetConfig();
switch (targetConfig.getType()) switch (targetConfig.getType())
{ {
case LABELS -> case LABEL ->
{ {
return podService.findByLabels(targetConfig.getNamespace(), targetConfig.getLabels()).stream().filter(pod -> pod.getStatus().getPhase().equals("Running")).toList(); return podService.findByLabels(targetConfig.getNamespace(), targetConfig.getLabels()).stream().filter(pod -> pod.getStatus().getPhase().equals("Running")).toList();
} }

View File

@ -19,6 +19,7 @@ public class TopNodesService
@Inject @Inject
ProcessRunner processRunner; ProcessRunner processRunner;
@Inject @Inject
PodService podService; PodService podService;
@ -59,8 +60,7 @@ public class TopNodesService
private List<String> runTopNodesCommand() private List<String> runTopNodesCommand()
{ {
String command = String.format("kubectl top nodes --no-headers"); return processRunner.runToLines("kubectl top nodes --no-headers");
return processRunner.runToLines(command);
} }
private Integer extractInteger(String input) private Integer extractInteger(String input)

View File

@ -1,10 +1,7 @@
package dev.dinauer.monitoring; package dev.dinauer.monitoring.collection;
import dev.dinauer.monitoring.entity.MonitoringConfig; import dev.dinauer.monitoring.entity.MonitoringConfig;
import jakarta.persistence.Entity; import jakarta.persistence.*;
import jakarta.persistence.Id;
import jakarta.persistence.OneToMany;
import jakarta.persistence.Table;
import java.util.List; import java.util.List;
@ -15,6 +12,7 @@ public class MonitoringCollection
@Id @Id
private String id; private String id;
@Column(name = "monitoring_collection_name")
private String name; private String name;
@OneToMany(mappedBy = "monitoringCollection") @OneToMany(mappedBy = "monitoringCollection")

View File

@ -0,0 +1,9 @@
package dev.dinauer.monitoring.collection;
import io.quarkus.hibernate.orm.panache.PanacheRepositoryBase;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class MonitoringCollectionRepo implements PanacheRepositoryBase<MonitoringCollection, String>
{
}

View File

@ -0,0 +1,28 @@
package dev.dinauer.monitoring.collection;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import java.util.List;
@Path("/monitoring-collections")
public class MonitoringCollectionResource
{
@Inject
MonitoringCollectionRepo repo;
@GET
public List<MonitoringCollection> get()
{
return repo.listAll();
}
@GET
@Path("/{id}")
public MonitoringCollection getById(@PathParam("id") String id)
{
return repo.findById(id);
}
}

View File

@ -1,6 +1,7 @@
package dev.dinauer.monitoring.entity; package dev.dinauer.monitoring.entity;
import dev.dinauer.monitoring.MonitoringCollection; import com.fasterxml.jackson.annotation.JsonIgnore;
import dev.dinauer.monitoring.collection.MonitoringCollection;
import jakarta.persistence.*; import jakarta.persistence.*;
@Entity @Entity
@ -18,20 +19,21 @@ public class MonitoringConfig
private String interval; private String interval;
@OneToOne(mappedBy = "config") @OneToOne(mappedBy = "config", cascade = CascadeType.ALL)
private TargetConfig targetConfig; private TargetConfig targetConfig;
@OneToOne(mappedBy = "config") @OneToOne(mappedBy = "config", cascade = CascadeType.ALL)
private CpuConfig cpuConfig; private CpuConfig cpuConfig;
@OneToOne(mappedBy = "config") @OneToOne(mappedBy = "config", cascade = CascadeType.ALL)
private HealthcheckConfig healthcheckConfig; private HealthcheckConfig healthcheckConfig;
@OneToOne(mappedBy = "config") @OneToOne(mappedBy = "config", cascade = CascadeType.ALL)
private VolumeConfig volumeConfig; private VolumeConfig volumeConfig;
@ManyToOne @ManyToOne
@JoinColumn(name = "monitoring_collection_id") @JoinColumn(name = "monitoring_collection_id")
@JsonIgnore
private MonitoringCollection monitoringCollection; private MonitoringCollection monitoringCollection;
public String getId() public String getId()

View File

@ -4,5 +4,5 @@ public enum MonitoringTargetType
{ {
DEPLOYMENT, DEPLOYMENT,
STATEFUL_SET, STATEFUL_SET,
LABELS LABEL
} }

View File

@ -2,5 +2,5 @@ package dev.dinauer.monitoring.entity;
public enum MonitoringType public enum MonitoringType
{ {
VOLUME, CPU, MEMORY, HEALTHCHECK VOLUME, WORKLOAD, HEALTHCHECK
} }

View File

@ -0,0 +1,11 @@
package dev.dinauer.monitoring.entity.creation;
import dev.dinauer.monitoring.entity.MonitoringType;
public record MonitoringConfigCreation(
String configName,
MonitoringType type,
String interval,
VolumeConfigCreation volumeConfig,
MonitoringTargetConfigCreation targetConfig
) {}

View File

@ -0,0 +1,12 @@
package dev.dinauer.monitoring.entity.creation;
import dev.dinauer.monitoring.entity.MonitoringTargetType;
public record MonitoringTargetConfigCreation(
MonitoringTargetType type,
String namespace,
String deploymentName,
String statefulSetName,
String labelKey,
String labelValue
) {}

View File

@ -0,0 +1,6 @@
package dev.dinauer.monitoring.entity.creation;
public record VolumeConfigCreation(
String mountPath,
String containerName
) {}

View File

@ -24,6 +24,8 @@ public class IndexCollection
private String timestamp; private String timestamp;
private long unix;
@Enumerated(EnumType.STRING) @Enumerated(EnumType.STRING)
private TimeUnit unit; private TimeUnit unit;
@ -34,11 +36,12 @@ public class IndexCollection
{ {
} }
public IndexCollection(String resource, String metric, String timestamp, TimeUnit unit) public IndexCollection(String resource, String metric, String timestamp, long unix, TimeUnit unit)
{ {
this.id = UUID.randomUUID().toString(); this.id = UUID.randomUUID().toString();
this.resource = resource; this.resource = resource;
this.timestamp = timestamp; this.timestamp = timestamp;
this.unix = unix;
this.unit = unit; this.unit = unit;
this.metric = metric; this.metric = metric;
this.metrics = "{}"; this.metrics = "{}";
@ -113,4 +116,9 @@ public class IndexCollection
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
public long getUnix()
{
return unix;
}
} }

View File

@ -19,4 +19,9 @@ public class IndexMetricsRepo implements PanacheRepositoryBase<IndexCollection,
{ {
return list("resource = :resource AND metric = :metric AND unit = :unit ORDER BY timestamp ASC", Parameters.with("resource", resource).and("metric", metric).and("unit", unit)); return list("resource = :resource AND metric = :metric AND unit = :unit ORDER BY timestamp ASC", Parameters.with("resource", resource).and("metric", metric).and("unit", unit));
} }
public List<IndexCollection> findByResourceAndMetricAndTimeUnitAndPeriod(String resource, String metric, TimeUnit unit, long from, long to)
{
return list("resource = :resource AND metric = :metric AND unit = :unit AND (unix >= :from AND unix <= :to) ORDER BY timestamp ASC", Parameters.with("resource", resource).and("metric", metric).and("unit", unit).and("from", from).and("to", to));
}
} }

View File

@ -22,8 +22,9 @@ public class IndexingService
{ {
for (TimeUnit unit : TimeUnit.values()) for (TimeUnit unit : TimeUnit.values())
{ {
String timestamp = TimestampGenerator.generateTimestamp(ZonedDateTime.now(Clock.systemUTC()), unit); ZonedDateTime now = ZonedDateTime.now(Clock.systemUTC());
IndexCollection metrics = indexMetricsRepo.findByProperties(resource, metric, timestamp, unit).orElse(new IndexCollection(resource, metric, timestamp, unit)); String timestamp = TimestampGenerator.generateTimestamp(now, unit);
IndexCollection metrics = indexMetricsRepo.findByProperties(resource, metric, timestamp, unit).orElse(new IndexCollection(resource, metric, timestamp, now.toEpochSecond(), unit));
for (Map.Entry<String, Long> entry : values.entrySet()) for (Map.Entry<String, Long> entry : values.entrySet())
{ {
metrics.add(entry.getKey().toUpperCase(), entry.getValue()); metrics.add(entry.getKey().toUpperCase(), entry.getValue());

View File

@ -22,6 +22,21 @@ public class ByteExtractor
return 0; return 0;
} }
public static long extractCpu(String input)
{
String[] sections = input.split("\n");
if (sections.length == 2)
{
String metricsLine = sections[1];
List<String> metrics = Arrays.stream(metricsLine.split("\\s+")).filter(text -> !text.isBlank()).toList();
if (metrics.size() == 3)
{
return Long.parseLong(metrics.get(1).replace("m", ""));
}
}
return 0;
}
private static long convertToBytes(String input) private static long convertToBytes(String input)
{ {
if (input.endsWith("Ki")) if (input.endsWith("Ki"))

View File

@ -34,7 +34,7 @@ public class MemoryMonitoringJobRunner
String podId = pod.getMetadata().getUid(); String podId = pod.getMetadata().getUid();
String podName = pod.getMetadata().getName(); String podName = pod.getMetadata().getName();
String result = processRunner.runToText(String.format("kubectl top pod %s -n %s", podName, config.getTargetConfig().getNamespace())); String result = processRunner.runToText(String.format("kubectl top pod %s -n %s", podName, config.getTargetConfig().getNamespace()));
indexingService.index(String.format("POD-%s", podId), MonitoringType.MEMORY.toString(), Map.ofEntries(Map.entry("CPU", ByteExtractor.extractBytes(result)))); indexingService.index(String.format("POD-%s", podId), MonitoringType.WORKLOAD.toString(), Map.ofEntries(Map.entry("MEMORY", ByteExtractor.extractBytes(result)), Map.entry("CPU", ByteExtractor.extractCpu(result))));
} }
} }
} }

View File

@ -1,11 +1,14 @@
INSERT INTO monitoring_config (id, config_name, type, interval) INSERT INTO monitoring_collection (id, monitoring_collection_name)
VALUES ('5da234f4-3a34-4b1c-b72a-7330ca3b1dcf', 'Postgres Cluster', 'VOLUME', '10m'); VALUES ('67a234f4-3a34-4b1c-b72a-7330ca3b1dcf', 'Tavolio');
INSERT INTO monitoring_config (id, config_name, type, interval, monitoring_collection_id)
VALUES ('5da234f4-3a34-4b1c-b72a-7330ca3b1dcf', 'Postgres Cluster', 'VOLUME', '30s', '67a234f4-3a34-4b1c-b72a-7330ca3b1dcf');
INSERT INTO target_config (id, type, namespace, labels, deployment_name, stateful_set_name, config_id) INSERT INTO target_config (id, type, namespace, labels, deployment_name, stateful_set_name, config_id)
VALUES ('4bd2f449-ed11-4f3f-830a-e4dc39cb21f5', 'LABELS', 'tavolio-prod', '{"cnpg.io/cluster":"postgres-cluster"}', null, null, '5da234f4-3a34-4b1c-b72a-7330ca3b1dcf'); VALUES ('4bd2f449-ed11-4f3f-830a-e4dc39cb21f5', 'LABEL', 'tavolio-prod', '{"cnpg.io/cluster":"postgres-cluster"}', null, null, '5da234f4-3a34-4b1c-b72a-7330ca3b1dcf');
INSERT INTO volume_config (id, container_name, mount_path, config_id) INSERT INTO volume_config (id, container_name, mount_path, config_id)
VALUES ('4090a60c-4517-4d76-b460-a0454014f30d', 'postgres', '/var/lib/postgresql/data', '5da234f4-3a34-4b1c-b72a-7330ca3b1dcf'); VALUES ('4090a60c-4517-4d76-b460-a0454014f30d', 'postgres', '/var/lib/postgresql/data', '5da234f4-3a34-4b1c-b72a-7330ca3b1dcf');
INSERT INTO monitoring_config (id, config_name, type, interval) INSERT INTO monitoring_config (id, config_name, type, interval, monitoring_collection_id)
VALUES ('2da234f4-3a34-4b1c-b72a-7330ca3b1dcf', 'Postgres Cluster', 'MEMORY', '10m'); VALUES ('2da234f4-3a34-4b1c-b72a-7330ca3b1dcf', 'Postgres Cluster', 'WORKLOAD', '30s', '67a234f4-3a34-4b1c-b72a-7330ca3b1dcf');
INSERT INTO target_config (id, type, namespace, labels, deployment_name, stateful_set_name, config_id) INSERT INTO target_config (id, type, namespace, labels, deployment_name, stateful_set_name, config_id)
VALUES ('2bd2f449-ed11-4f3f-830a-e4dc39cb21f5', 'LABELS', 'tavolio-prod', '{"cnpg.io/cluster":"postgres-cluster"}', null, null, '2da234f4-3a34-4b1c-b72a-7330ca3b1dcf'); VALUES ('2bd2f449-ed11-4f3f-830a-e4dc39cb21f5', 'LABEL', 'tavolio-prod', '{"cnpg.io/cluster":"postgres-cluster"}', null, null, '2da234f4-3a34-4b1c-b72a-7330ca3b1dcf');