🗃️ Move to big bucket indexing
This commit is contained in:
parent
eacb11d6dd
commit
ee3e91f092
30
docker-compose-dev.yaml
Normal file
30
docker-compose-dev.yaml
Normal file
@ -0,0 +1,30 @@
|
||||
services:
|
||||
db:
|
||||
image: postgres
|
||||
restart: always
|
||||
shm_size: 128mb
|
||||
environment:
|
||||
POSTGRES_PASSWORD: postgres
|
||||
ports:
|
||||
- "6666:5432"
|
||||
db-big-bucket:
|
||||
image: postgres
|
||||
restart: always
|
||||
shm_size: 128mb
|
||||
environment:
|
||||
POSTGRES_PASSWORD: postgres
|
||||
ports:
|
||||
- "6667:5432"
|
||||
big-bucket:
|
||||
image: harbor.dinauer.dev/big-bucket/core:13
|
||||
ports:
|
||||
- "8090:8080"
|
||||
environment:
|
||||
BIG_BUCKET_CLIENT_KUBOOBOO_RW: password
|
||||
BIG_BUCKET_UNITS: RAW,HOURLY,DAILY
|
||||
DB_USER: postgres
|
||||
DB_PASSWORD: postgres
|
||||
DB_HOST: db-big-bucket
|
||||
DB_PORT: 5432
|
||||
DB_DATABASE: postgres
|
||||
DB_SCHEMA: public
|
||||
@ -1,9 +0,0 @@
|
||||
services:
|
||||
db:
|
||||
image: postgres
|
||||
restart: always
|
||||
shm_size: 128mb
|
||||
environment:
|
||||
POSTGRES_PASSWORD: postgres
|
||||
ports:
|
||||
- "6666:5432"
|
||||
@ -1,10 +0,0 @@
|
||||
package dev.dinauer.monitoring;
|
||||
|
||||
import dev.dinauer.monitoring.indexing.IndexCollection;
|
||||
import io.fabric8.kubernetes.api.model.Node;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public record MonitoredNode(Node node, List<IndexCollection> jobs)
|
||||
{
|
||||
}
|
||||
@ -1,9 +1,10 @@
|
||||
package dev.dinauer.monitoring;
|
||||
|
||||
import dev.dinauer.monitoring.indexing.Bucket;
|
||||
import io.fabric8.kubernetes.api.model.Pod;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public record MonitoredResource<E, T>(E resource, List<T> jobs)
|
||||
public record MonitoredResource<E>(E resource, List<Bucket> jobs)
|
||||
{
|
||||
}
|
||||
|
||||
@ -2,14 +2,10 @@ package dev.dinauer.monitoring;
|
||||
|
||||
import dev.dinauer.monitoring.entity.MonitoringConfig;
|
||||
import dev.dinauer.monitoring.entity.repo.MonitoringRepo;
|
||||
import dev.dinauer.monitoring.indexing.IndexCollection;
|
||||
import dev.dinauer.monitoring.indexing.IndexMetricsRepo;
|
||||
import dev.dinauer.monitoring.indexing.TimeUnit;
|
||||
import dev.dinauer.monitoring.indexing.Bucket;
|
||||
import io.fabric8.kubernetes.api.model.Pod;
|
||||
import io.smallrye.common.constraint.NotNull;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.ws.rs.*;
|
||||
import jakarta.ws.rs.core.Response;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
@ -24,22 +20,19 @@ public class MonitoringJobResource
|
||||
@Inject
|
||||
MonitoringService monitoringService;
|
||||
|
||||
@Inject
|
||||
IndexMetricsRepo indexMetricsRepo;
|
||||
|
||||
@GET
|
||||
public List<MonitoredResource<Pod, IndexCollection>> get(@PathParam("monitoring-id") String monitoringId, @QueryParam("from") Long from, @QueryParam("to") Long to) throws IOException
|
||||
public List<MonitoredResource<Pod>> get(@PathParam("monitoring-id") String monitoringId, @QueryParam("from") Long from, @QueryParam("to") Long to) throws IOException
|
||||
{
|
||||
if (from == null || to == null)
|
||||
{
|
||||
throw new BadRequestException();
|
||||
}
|
||||
List<MonitoredResource<Pod, IndexCollection>> result = new ArrayList<>();
|
||||
List<MonitoredResource<Pod>> result = new ArrayList<>();
|
||||
MonitoringConfig config = monitoringRepo.findById(monitoringId);
|
||||
List<Pod> pods = monitoringService.findRunningPodsByMonitoringConfig(config);
|
||||
for (Pod pod : pods)
|
||||
{
|
||||
result.add(new MonitoredResource<>(pod, indexMetricsRepo.findByResourceAndMetricAndTimeUnitAndPeriod(String.format("POD-%s", pod.getMetadata().getUid()), config.getType().toString(), TimeUnit.RAW, from, to)));
|
||||
result.add(new MonitoredResource<>(pod, null));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -56,6 +56,7 @@ public class MonitoringJobRunner
|
||||
Runnable task = () -> {
|
||||
try
|
||||
{
|
||||
System.out.println("x");
|
||||
nodeMonitoringService.run();
|
||||
}
|
||||
catch (Exception e)
|
||||
|
||||
@ -1,16 +1,16 @@
|
||||
package dev.dinauer.monitoring;
|
||||
|
||||
import dev.dinauer.monitoring.indexing.IndexCollection;
|
||||
import dev.dinauer.monitoring.indexing.IndexMetric;
|
||||
import dev.dinauer.monitoring.indexing.IndexMetricsRepo;
|
||||
import dev.dinauer.monitoring.indexing.TimeUnit;
|
||||
import dev.dinauer.monitoring.indexing.BigBucketService;
|
||||
import dev.dinauer.monitoring.indexing.Bucket;
|
||||
import dev.dinauer.monitoring.indexing.BucketUnit;
|
||||
import dev.dinauer.service.NodeService;
|
||||
import dev.dinauer.utils.ClientProvider;
|
||||
import io.fabric8.kubernetes.api.model.Node;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.ws.rs.GET;
|
||||
import jakarta.ws.rs.Path;
|
||||
import jakarta.ws.rs.PathParam;
|
||||
import jakarta.ws.rs.QueryParam;
|
||||
import org.jboss.resteasy.reactive.common.NotImplementedYet;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.ZonedDateTime;
|
||||
@ -21,25 +21,23 @@ import java.util.List;
|
||||
public class NodeMonitoringJobResource
|
||||
{
|
||||
@Inject
|
||||
IndexMetricsRepo indexMetricsRepo;
|
||||
NodeService nodeService;
|
||||
|
||||
@Inject
|
||||
ClientProvider clientProvider;
|
||||
BigBucketService bigBucketService;
|
||||
|
||||
@GET
|
||||
public List<MonitoredResource<Node, IndexCollection>> get(@QueryParam("from") ZonedDateTime from, @QueryParam("to") ZonedDateTime to) throws IOException
|
||||
public List<MonitoredResource<Node>> get(@QueryParam("from") ZonedDateTime from, @QueryParam("to") ZonedDateTime to) throws IOException
|
||||
{
|
||||
List<MonitoredResource<Node, IndexCollection>> result = new ArrayList<>();
|
||||
List<Node> nodes = clientProvider.getClient().nodes().list().getItems();
|
||||
for (Node node : nodes)
|
||||
List<MonitoredResource<Node>> result = new ArrayList<>();
|
||||
for (Node node : nodeService.findAll())
|
||||
{
|
||||
String resource = String.format("NODE-%s", node.getMetadata().getUid());
|
||||
result.add(new MonitoredResource<>(node, indexMetricsRepo.findByResourceAndMetricAndTimeUnit(resource, "NODE_METRICS", determineTimeUnit(from, to))));
|
||||
result.add(new MonitoredResource<>(node, bigBucketService.get(String.format("NODE-%s", node.getMetadata().getUid()), "NODE_METRICS", determineTimeUnit(from, to))));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private TimeUnit determineTimeUnit(ZonedDateTime from, ZonedDateTime to)
|
||||
private BucketUnit determineTimeUnit(ZonedDateTime from, ZonedDateTime to)
|
||||
{
|
||||
long day = 60 * 60 * 24;
|
||||
long twoDays = day * 2;
|
||||
@ -48,12 +46,12 @@ public class NodeMonitoringJobResource
|
||||
long dif = to.toEpochSecond() - from.toEpochSecond();
|
||||
if (dif < twoDays)
|
||||
{
|
||||
return TimeUnit.RAW;
|
||||
return BucketUnit.RAW;
|
||||
}
|
||||
if (dif < fifteenDays)
|
||||
{
|
||||
return TimeUnit.HOUR;
|
||||
return BucketUnit.HOURLY;
|
||||
}
|
||||
return TimeUnit.DAY;
|
||||
return BucketUnit.DAILY;
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,19 @@
|
||||
package dev.dinauer.monitoring.indexing;
|
||||
|
||||
import io.quarkus.rest.client.reactive.ClientBasicAuth;
|
||||
import jakarta.ws.rs.*;
|
||||
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Path("/api/metrics/{resource}/{metric}")
|
||||
@RegisterRestClient(configKey = "big-bucket")
|
||||
@ClientBasicAuth(username = "${big.bucket.username}", password = "${big.bucket.password}")
|
||||
public interface BigBucketClient
|
||||
{
|
||||
@POST
|
||||
void index(@PathParam("resource") String resource, @PathParam("metric") String metric, Insert body);
|
||||
|
||||
@GET
|
||||
List<Bucket> get(@PathParam("resource") String resource, @PathParam("metric") String metric, @QueryParam("bucket-unit") BucketUnit bucketUnit);
|
||||
}
|
||||
@ -0,0 +1,47 @@
|
||||
package dev.dinauer.monitoring.indexing;
|
||||
|
||||
import io.quarkus.runtime.Startup;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.ws.rs.InternalServerErrorException;
|
||||
import org.eclipse.microprofile.rest.client.inject.RestClient;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Startup
|
||||
@ApplicationScoped
|
||||
public class BigBucketService
|
||||
{
|
||||
@Inject
|
||||
Logger LOG;
|
||||
|
||||
@RestClient
|
||||
BigBucketClient client;
|
||||
|
||||
public void index(String resource, String metric, Map<String, Number> values)
|
||||
{
|
||||
try
|
||||
{
|
||||
client.index(resource, metric, new Insert(values));
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
LOG.errorf("Failed to index to BigBucket: %s", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public List<Bucket> get(String resource, String metric, BucketUnit bucketUnit)
|
||||
{
|
||||
try
|
||||
{
|
||||
return client.get(resource, metric, bucketUnit);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
LOG.errorf("Failed to retrieve from BigBucket: %s", e.getMessage());
|
||||
throw new InternalServerErrorException();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,7 @@
|
||||
package dev.dinauer.monitoring.indexing;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public record Bucket(String resource, String timestamp, long unixTimestamp, BucketUnit bucketUnit, Map<String, Metric> metrics, String owner)
|
||||
{
|
||||
}
|
||||
@ -0,0 +1,6 @@
|
||||
package dev.dinauer.monitoring.indexing;
|
||||
|
||||
public enum BucketUnit
|
||||
{
|
||||
RAW, HOURLY, DAILY, WEEKLY, MONTHLY, YEARLY, TOTAL
|
||||
}
|
||||
@ -1,124 +0,0 @@
|
||||
package dev.dinauer.monitoring.indexing;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import jakarta.persistence.*;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
@Entity
|
||||
@Table(name = "metrics_index")
|
||||
public class IndexCollection
|
||||
{
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
@Id
|
||||
private String id;
|
||||
|
||||
private String resource;
|
||||
|
||||
private String metric;
|
||||
|
||||
private String timestamp;
|
||||
|
||||
private long unix;
|
||||
|
||||
@Enumerated(EnumType.STRING)
|
||||
private TimeUnit unit;
|
||||
|
||||
@Column(columnDefinition = "text")
|
||||
private String metrics;
|
||||
|
||||
public IndexCollection()
|
||||
{
|
||||
}
|
||||
|
||||
public IndexCollection(String resource, String metric, String timestamp, long unix, TimeUnit unit)
|
||||
{
|
||||
this.id = UUID.randomUUID().toString();
|
||||
this.resource = resource;
|
||||
this.timestamp = timestamp;
|
||||
this.unix = unix;
|
||||
this.unit = unit;
|
||||
this.metric = metric;
|
||||
this.metrics = "{}";
|
||||
}
|
||||
|
||||
public String getId()
|
||||
{
|
||||
return id;
|
||||
}
|
||||
|
||||
public void add(String key, long value)
|
||||
{
|
||||
Map<String, IndexMetric> metrics = getMetrics();
|
||||
IndexMetric metric = metrics.get(key);
|
||||
if (metric != null)
|
||||
{
|
||||
metric.add(value);
|
||||
}
|
||||
else
|
||||
{
|
||||
IndexMetric newMetric = new IndexMetric();
|
||||
newMetric.add(value);
|
||||
metrics.put(key, newMetric);
|
||||
}
|
||||
try
|
||||
{
|
||||
this.metrics = OBJECT_MAPPER.writeValueAsString(metrics);
|
||||
}
|
||||
catch (JsonProcessingException e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private double calculateAverage(long sum, int count)
|
||||
{
|
||||
if (count == 0)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
return sum * 1.0 / count;
|
||||
}
|
||||
|
||||
public String getTimestamp()
|
||||
{
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
public String getMetric()
|
||||
{
|
||||
return metric;
|
||||
}
|
||||
|
||||
public TimeUnit getUnit()
|
||||
{
|
||||
return unit;
|
||||
}
|
||||
|
||||
public String getResource()
|
||||
{
|
||||
return resource;
|
||||
}
|
||||
|
||||
public Map<String, IndexMetric> getMetrics()
|
||||
{
|
||||
try
|
||||
{
|
||||
return OBJECT_MAPPER.readValue(metrics, new TypeReference<Map<String, IndexMetric>>() {});
|
||||
}
|
||||
catch (JsonProcessingException e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public long getUnix()
|
||||
{
|
||||
return unix;
|
||||
}
|
||||
}
|
||||
@ -1,46 +0,0 @@
|
||||
package dev.dinauer.monitoring.indexing;
|
||||
|
||||
public class IndexMetric
|
||||
{
|
||||
private int count;
|
||||
private long sum;
|
||||
private double average;
|
||||
|
||||
public IndexMetric()
|
||||
{
|
||||
this.count = 0;
|
||||
this.sum = 0;
|
||||
this.average = 0.0F;
|
||||
}
|
||||
|
||||
public void add(long value)
|
||||
{
|
||||
count = count + 1;
|
||||
sum = sum + value;
|
||||
average = calculateAverage(sum, count);
|
||||
}
|
||||
|
||||
private double calculateAverage(long sum, int count)
|
||||
{
|
||||
if (count == 0)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
return sum * 1.0 / count;
|
||||
}
|
||||
|
||||
public int getCount()
|
||||
{
|
||||
return count;
|
||||
}
|
||||
|
||||
public long getSum()
|
||||
{
|
||||
return sum;
|
||||
}
|
||||
|
||||
public double getAverage()
|
||||
{
|
||||
return average;
|
||||
}
|
||||
}
|
||||
@ -1,27 +0,0 @@
|
||||
package dev.dinauer.monitoring.indexing;
|
||||
|
||||
import io.quarkus.hibernate.orm.panache.PanacheRepositoryBase;
|
||||
import io.quarkus.panache.common.Parameters;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@ApplicationScoped
|
||||
public class IndexMetricsRepo implements PanacheRepositoryBase<IndexCollection, String>
|
||||
{
|
||||
public Optional<IndexCollection> findByProperties(String resource, String metric, String timestamp, TimeUnit unit)
|
||||
{
|
||||
return find("resource = :resource AND metric = :metric AND timestamp = :timestamp AND unit = :unit", Parameters.with("resource", resource).and("metric", metric).and("timestamp", timestamp).and("unit", unit)).firstResultOptional();
|
||||
}
|
||||
|
||||
public List<IndexCollection> findByResourceAndMetricAndTimeUnit(String resource, String metric, TimeUnit unit)
|
||||
{
|
||||
return list("resource = :resource AND metric = :metric AND unit = :unit ORDER BY timestamp ASC", Parameters.with("resource", resource).and("metric", metric).and("unit", unit));
|
||||
}
|
||||
|
||||
public List<IndexCollection> findByResourceAndMetricAndTimeUnitAndPeriod(String resource, String metric, TimeUnit unit, long from, long to)
|
||||
{
|
||||
return list("resource = :resource AND metric = :metric AND unit = :unit AND (unix >= :from AND unix <= :to) ORDER BY timestamp ASC", Parameters.with("resource", resource).and("metric", metric).and("unit", unit).and("from", from).and("to", to));
|
||||
}
|
||||
}
|
||||
@ -1,35 +0,0 @@
|
||||
package dev.dinauer.monitoring.indexing;
|
||||
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.Startup;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.transaction.Transactional;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.Map;
|
||||
|
||||
@Startup
|
||||
@ApplicationScoped
|
||||
public class IndexingService
|
||||
{
|
||||
@Inject
|
||||
IndexMetricsRepo indexMetricsRepo;
|
||||
|
||||
@Transactional
|
||||
public void index(String resource, String metric, Map<String, Long> values)
|
||||
{
|
||||
for (TimeUnit unit : TimeUnit.values())
|
||||
{
|
||||
ZonedDateTime now = ZonedDateTime.now(Clock.systemUTC());
|
||||
String timestamp = TimestampGenerator.generateTimestamp(now, unit);
|
||||
IndexCollection metrics = indexMetricsRepo.findByProperties(resource, metric, timestamp, unit).orElse(new IndexCollection(resource, metric, timestamp, now.toEpochSecond(), unit));
|
||||
for (Map.Entry<String, Long> entry : values.entrySet())
|
||||
{
|
||||
metrics.add(entry.getKey().toUpperCase(), entry.getValue());
|
||||
}
|
||||
indexMetricsRepo.persist(metrics);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,7 @@
|
||||
package dev.dinauer.monitoring.indexing;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public record Insert(Map<String, Number> values)
|
||||
{
|
||||
}
|
||||
@ -0,0 +1,5 @@
|
||||
package dev.dinauer.monitoring.indexing;
|
||||
|
||||
public record Metric(int count, int sum, double average, Double min, Double max)
|
||||
{
|
||||
}
|
||||
@ -1,6 +0,0 @@
|
||||
package dev.dinauer.monitoring.indexing;
|
||||
|
||||
public enum TimeUnit
|
||||
{
|
||||
RAW, HOUR, DAY
|
||||
}
|
||||
@ -1,28 +0,0 @@
|
||||
package dev.dinauer.monitoring.indexing;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
|
||||
public class TimestampGenerator
|
||||
{
|
||||
public static String generateTimestamp(ZonedDateTime timestamp, TimeUnit unit)
|
||||
{
|
||||
switch (unit)
|
||||
{
|
||||
case TimeUnit.RAW ->
|
||||
{
|
||||
return timestamp.format(DateTimeFormatter.ISO_DATE_TIME).substring(0, 19);
|
||||
}
|
||||
case TimeUnit.HOUR ->
|
||||
{
|
||||
return timestamp.format(DateTimeFormatter.ISO_DATE_TIME).substring(0, 13);
|
||||
}
|
||||
case TimeUnit.DAY ->
|
||||
{
|
||||
return timestamp.format(DateTimeFormatter.ISO_DATE_TIME).substring(0, 10);
|
||||
}
|
||||
}
|
||||
throw new RuntimeException("Cannot index by chono unit " + unit);
|
||||
}
|
||||
}
|
||||
@ -4,7 +4,7 @@ import dev.dinauer.ProcessRunner;
|
||||
import dev.dinauer.monitoring.MonitoringService;
|
||||
import dev.dinauer.monitoring.entity.MonitoringConfig;
|
||||
import dev.dinauer.monitoring.entity.MonitoringType;
|
||||
import dev.dinauer.monitoring.indexing.IndexingService;
|
||||
import dev.dinauer.monitoring.indexing.BigBucketService;
|
||||
import io.fabric8.kubernetes.api.model.Pod;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
@ -23,7 +23,7 @@ public class MemoryMonitoringJobRunner
|
||||
MonitoringService monitoringService;
|
||||
|
||||
@Inject
|
||||
IndexingService indexingService;
|
||||
BigBucketService bigBucketService;
|
||||
|
||||
public void run(MonitoringConfig config) throws IOException, InterruptedException
|
||||
{
|
||||
@ -34,7 +34,7 @@ public class MemoryMonitoringJobRunner
|
||||
String podId = pod.getMetadata().getUid();
|
||||
String podName = pod.getMetadata().getName();
|
||||
String result = processRunner.runToText(String.format("kubectl top pod %s -n %s", podName, config.getTargetConfig().getNamespace()));
|
||||
indexingService.index(String.format("POD-%s", podId), MonitoringType.WORKLOAD.toString(), Map.ofEntries(Map.entry("MEMORY", ByteExtractor.extractBytes(result)), Map.entry("CPU", ByteExtractor.extractCpu(result))));
|
||||
bigBucketService.index(String.format("POD-%s", podId), MonitoringType.WORKLOAD.toString(), Map.ofEntries(Map.entry("MEMORY", ByteExtractor.extractBytes(result)), Map.entry("CPU", ByteExtractor.extractCpu(result))));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,12 +1,15 @@
|
||||
package dev.dinauer.monitoring.nodes;
|
||||
|
||||
import dev.dinauer.monitoring.TopNodesService;
|
||||
import dev.dinauer.monitoring.indexing.IndexingService;
|
||||
import dev.dinauer.monitoring.indexing.BigBucketService;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import java.io.IOException;
|
||||
import java.util.AbstractMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ApplicationScoped
|
||||
public class NodeMonitoringService
|
||||
@ -15,7 +18,7 @@ public class NodeMonitoringService
|
||||
TopNodesService topNodesService;
|
||||
|
||||
@Inject
|
||||
IndexingService indexingService;
|
||||
BigBucketService bigBucketService;
|
||||
|
||||
public void run() throws IOException, InterruptedException
|
||||
{
|
||||
@ -23,23 +26,17 @@ public class NodeMonitoringService
|
||||
for (MonitoredNode node : nodes)
|
||||
{
|
||||
NodeMetrics nodeMetrics = node.getMetrics();
|
||||
Map<String, Long> metrics = Map.ofEntries(
|
||||
Map.entry("RELATIVE_CPU", toLong(nodeMetrics.relativeCpuUsage())),
|
||||
Map.entry("RELATIVE_MEMORY", toLong(nodeMetrics.relativeMemory())),
|
||||
Map.entry("ABSOLUTE_MEMORY", toLong(nodeMetrics.absoluteMemory())),
|
||||
Map.entry("ABSOLUTE_CPU", toLong(nodeMetrics.absoluteCpuUsage())),
|
||||
Map.entry("RELATIVE_DISK_SPACE", toLong(nodeMetrics.relativeDiskUsage())),
|
||||
Map.entry("TOTAL_DISK_SPACE", nodeMetrics.totalDiskSpace()));
|
||||
indexingService.index(String.format("NODE-%s", node.getMetadata().getUid()), "NODE_METRICS", metrics);
|
||||
}
|
||||
}
|
||||
Map<String, Number> metrics = Stream.of(
|
||||
new AbstractMap.SimpleEntry<>("RELATIVE_CPU", nodeMetrics.relativeCpuUsage()),
|
||||
new AbstractMap.SimpleEntry<>("RELATIVE_MEMORY", nodeMetrics.relativeMemory()),
|
||||
new AbstractMap.SimpleEntry<>("ABSOLUTE_MEMORY", nodeMetrics.absoluteMemory()),
|
||||
new AbstractMap.SimpleEntry<>("ABSOLUTE_CPU", nodeMetrics.absoluteCpuUsage()),
|
||||
new AbstractMap.SimpleEntry<>("RELATIVE_DISK_SPACE", nodeMetrics.relativeDiskUsage()),
|
||||
new AbstractMap.SimpleEntry<>("TOTAL_DISK_SPACE", nodeMetrics.totalDiskSpace()))
|
||||
.filter(entry -> entry.getValue() != null)
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
|
||||
private Long toLong(Integer input)
|
||||
{
|
||||
if (input != null)
|
||||
{
|
||||
return Long.valueOf(input);
|
||||
bigBucketService.index(String.format("NODE-%s", node.getMetadata().getUid()), "NODE_METRICS", metrics);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@ -52,7 +52,6 @@ public class NodeDiskService
|
||||
String port = "8080";
|
||||
try (HttpClient client = HttpClient.newBuilder().build())
|
||||
{
|
||||
LOG.infof("Collect disk monitoring for node %s", nodeName);
|
||||
HttpRequest request = HttpRequest.newBuilder().uri(getURI(ip, port)).GET().build();
|
||||
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
|
||||
NodeDiskMetrics metrics = parse(response.body());
|
||||
|
||||
19
src/main/java/dev/dinauer/monitoring/pods/MonitoredPod.java
Normal file
19
src/main/java/dev/dinauer/monitoring/pods/MonitoredPod.java
Normal file
@ -0,0 +1,19 @@
|
||||
package dev.dinauer.monitoring.pods;
|
||||
|
||||
import io.fabric8.kubernetes.api.model.Pod;
|
||||
|
||||
public class MonitoredPod extends Pod
|
||||
{
|
||||
private final PodMetrics metrics;
|
||||
|
||||
public MonitoredPod(Pod pod, PodMetrics metrics)
|
||||
{
|
||||
super(pod.getApiVersion(), pod.getKind(), pod.getMetadata(), pod.getSpec(), pod.getStatus());
|
||||
this.metrics = metrics;
|
||||
}
|
||||
|
||||
public PodMetrics getMetrics()
|
||||
{
|
||||
return metrics;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,5 @@
|
||||
package dev.dinauer.monitoring.pods;
|
||||
|
||||
public record PodMetrics(String cpu, String memory)
|
||||
{
|
||||
}
|
||||
@ -0,0 +1,45 @@
|
||||
package dev.dinauer.monitoring.pods;
|
||||
|
||||
import dev.dinauer.ProcessRunner;
|
||||
import io.fabric8.kubernetes.api.model.Pod;
|
||||
import io.quarkus.scheduler.Scheduled;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@ApplicationScoped
|
||||
public class PodMetricsService
|
||||
{
|
||||
@Inject
|
||||
ProcessRunner processRunner;
|
||||
|
||||
Map<String, PodMetrics> metrics;
|
||||
|
||||
@Scheduled(every = "1m")
|
||||
public void find()
|
||||
{
|
||||
Map<String, PodMetrics> result = new HashMap<>();
|
||||
List<String> podMetrics = processRunner.runToLines("kubectl top pods --all-namespaces --no-headers");
|
||||
for (String line : podMetrics)
|
||||
{
|
||||
String[] sections = line.split("\\s+");
|
||||
if (sections.length == 4)
|
||||
{
|
||||
String namespace = sections[0].trim();
|
||||
String name = sections[1].trim();
|
||||
String cpu = sections[2].trim();
|
||||
String memory = sections[3].trim();
|
||||
result.put(String.format("%s/%s", namespace, name), new PodMetrics(cpu, memory));
|
||||
}
|
||||
}
|
||||
this.metrics = result;
|
||||
}
|
||||
|
||||
public PodMetrics getForPod(Pod pod)
|
||||
{
|
||||
return this.metrics.get(String.format("%s/%s", pod.getMetadata().getNamespace(), pod.getMetadata().getName()));
|
||||
}
|
||||
}
|
||||
@ -4,7 +4,7 @@ import dev.dinauer.ProcessRunner;
|
||||
import dev.dinauer.monitoring.MonitoringService;
|
||||
import dev.dinauer.monitoring.entity.MonitoringConfig;
|
||||
import dev.dinauer.monitoring.entity.MonitoringType;
|
||||
import dev.dinauer.monitoring.indexing.IndexingService;
|
||||
import dev.dinauer.monitoring.indexing.BigBucketService;
|
||||
import io.fabric8.kubernetes.api.model.Pod;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
@ -34,7 +34,7 @@ public class VolumeMonitoringJobRunner
|
||||
VolumeUsageRepo volumeUsageRepo;
|
||||
|
||||
@Inject
|
||||
IndexingService indexingService;
|
||||
BigBucketService bigBucketService;
|
||||
|
||||
public void run(MonitoringConfig monitoring) throws IOException, InterruptedException
|
||||
{
|
||||
@ -49,7 +49,7 @@ public class VolumeMonitoringJobRunner
|
||||
String result = processRunner.runToText(String.format("kubectl exec -i %s -c %s -n %s -- df -B1 --output=size,used,avail,pcent -h %s -B1", podName, monitoring.getVolumeConfig().getContainerName(), monitoring.getTargetConfig().getNamespace(), monitoring.getVolumeConfig().getMountPath()));
|
||||
UsageMetrics usage = extractUsage(result);
|
||||
volumeUsageRepo.persist(new VolumeUsage(UUID.randomUUID().toString(), monitoring.getId(), podId, podName, monitoring.getTargetConfig().getNamespace(), ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS), usage));
|
||||
indexingService.index(String.format("POD-%s", podId), MonitoringType.VOLUME.toString(), Map.ofEntries(Map.entry("VOLUME", (long) usage.percentage())));
|
||||
bigBucketService.index(String.format("POD-%s", podId), MonitoringType.VOLUME.toString(), Map.ofEntries(Map.entry("VOLUME", (long) usage.percentage())));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -27,4 +27,11 @@ quarkus.datasource.db-kind = postgresql
|
||||
%prod.quarkus.hibernate-orm.schema-management.strategy=drop-and-create
|
||||
|
||||
# Banner
|
||||
quarkus.banner.path=banner.txt
|
||||
quarkus.banner.path=banner.txt
|
||||
|
||||
# Big Bucket
|
||||
%dev.quarkus.rest-client.big-bucket.url=http://localhost:8090
|
||||
%dev.big.bucket.username=kubooboo
|
||||
%dev.big.bucket.password=password
|
||||
|
||||
%prod.quarkus.rest-client.big-bucket.url=${BIG_BUCKET_URL}
|
||||
Loading…
x
Reference in New Issue
Block a user