There are three pillars of observability: metrics, logging and tracing. We are only interested in the first two.
== Metrics ==
All of our machines are, or at least should be, running the Prometheus node exporter. This collects and sends machine metrics (e.g. RAM used, disk space) to the Prometheus server running at https://prometheus.csclub.uwaterloo.ca (currently a VM on phosphoric-acid). There are a few specialized exporters running on several other machines; a Postfix exporter is running on mail, an Apache exporter is running on caffeine, and an NGINX exporter is running on potassium-benzoate. There is also a custom exporter written by syscom running on potassium-benzoate for mirror stats.
Most of the exporters use mutual TLS authentication with the Prometheus server. I set the expiration date for the TLS certs to 10 years. If you are reading this and it is 2031 or later, then go update the certs.
I highly suggest becoming familiar with PromQL, the query language for Prometheus. You can run and visualize some queries at https://prometheus.csclub.uwaterloo.ca/prometheus. For example, here is a query to determine which machines are up or down:
<pre>
up{job="node_exporter"}
</pre>
Here's how we determine if a machine has NFS mounted. This will return 1 for machines which have NFS mounted, but will not return any records for machines which do not have NFS mounted. (We ignore the actual value of node_filesystem_device_error because it returns 1 for machines using Kerberized NFS.)
<pre>
count by (instance) (node_filesystem_device_error{mountpoint="/users", fstype="nfs"})
</pre>
Now this is a rather complicated expression which can return one of three values:
* 0: the machine is down
* 1: the machine is up, but NFS is not mounted
* 2: the machine is up and NFS is mounted
The <code>or</code> operator in PromQL is key here.
<pre>
sum by (instance) (
  (count by (instance) (node_filesystem_device_error{mountpoint="/users", fstype="nfs"}))
  or
  up{job="node_exporter"}
)
</pre>
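If you want to evaluate a query from a script rather than the web UI, the standard Prometheus HTTP API should work; a minimal sketch, assuming the API is served under the same /prometheus path as the UI and requires no extra authentication:
<pre>
curl -fsS 'https://prometheus.csclub.uwaterloo.ca/prometheus/api/v1/query' \
  --data-urlencode 'query=up{job="node_exporter"}'
</pre>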
== Alerts ==
We also use [https://prometheus.io/docs/alerting/latest/alertmanager/ AlertManager] to send email alerts from Prometheus metrics. We should figure out how to also send messages to IRC or similar.
We also use the Blackbox prober exporter to check if some of our web-based services are up.
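To test one of these probes by hand, you can query the exporter's /probe endpoint directly. A hedged example, assuming the exporter listens on its default port 9115 and that a module named http_2xx exists in its config (check the actual blackbox config for the real module names):
<pre>
curl -fsS 'http://localhost:9115/probe?target=https://csclub.uwaterloo.ca&module=http_2xx'
</pre>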
We make some pretty charts on Grafana (https://prometheus.csclub.uwaterloo.ca) from PromQL queries. Grafana also has an 'Explore' page where you can test out some queries before making chart panels from them.
You can also create fine-grained alerts on Grafana (https://prometheus.csclub.uwaterloo.ca/alerting/list).
== Logging ==
We now use [https://vector.dev/ Vector] for collecting and transforming logs, and [https://clickhouse.com/ ClickHouse] for storing log data.
=== ClickHouse ===
ClickHouse is a very fast OLAP database which has great documentation for storing and analyzing [https://clickhouse.com/use-cases/logging-and-metrics logging and metrics]. Unfortunately, the CPU on phosphoric-acid (which hosts the prometheus VM) is so old that when we try to install the official deb package, the following error occurs:
<pre>
Instruction check fail. The CPU does not support SSSE3 instruction set.
</pre>
So we're going to download the "compat" version instead:
<pre>
cd /root
wget https://s3.amazonaws.com/clickhouse-builds/master/amd64compat/clickhouse
chmod +x clickhouse
./clickhouse install
rm clickhouse
wget -O /etc/systemd/system/clickhouse-server.service https://github.com/ClickHouse/ClickHouse/raw/master/packages/clickhouse-server.service
systemctl daemon-reload
systemctl enable clickhouse-server
</pre>
By default, systemd limits the number of threads which a service can create, so we'll want to disable that. Run <code>systemctl edit clickhouse-server</code> and paste the following:
<pre>
[Service]
TasksMax=infinity
</pre>
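To confirm the override took effect, you can print the effective value (it should read infinity):
<pre>
systemctl show clickhouse-server -p TasksMax
</pre>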
Next, paste the following into /etc/clickhouse-server/users.d/csclub-users.xml:
<pre>
<clickhouse>
    <profiles>
        <default>
            <!-- disable logs (using too much disk space) -->
            <log_queries replace="replace">0</log_queries>
            <log_query_threads replace="replace">0</log_query_threads>
        </default>
        <readonly>
            <!-- Grafana needs to change settings queries -->
            <readonly>2</readonly>
        </readonly>
    </profiles>
    <users>
        <default>
            <!-- The default user should only be allowed to connect from localhost -->
            <networks>
                <ip>::1</ip>
                <ip>127.0.0.1</ip>
            </networks>
            <!-- Allow the default user to create new users -->
            <access_management>1</access_management>
            <named_collection_control>1</named_collection_control>
            <show_named_collections>1</show_named_collections>
            <show_named_collections_secrets>1</show_named_collections_secrets>
        </default>
    </users>
</clickhouse>
</pre>
Then paste the following into /etc/clickhouse-server/config.d/zzz-csclub.xml (we need the zzz prefix because the configuration files are merged in alphabetical order, and we want ours to be applied last):
<pre>
<clickhouse>
    <listen_host>127.0.0.1</listen_host>
    <listen_host>::1</listen_host>
    <logger>
        <level>information</level>
        <size>100M</size>
        <count>10</count>
    </logger>
    <mysql_port></mysql_port>
    <postgresql_port></postgresql_port>
    <!-- disable logs (using too much disk space) -->
    <asynchronous_metric_log remove="1"/>
    <metric_log remove="1"/>
    <query_thread_log remove="1"/>
    <query_log remove="1"/>
    <query_views_log remove="1"/>
    <part_log remove="1"/>
    <session_log remove="1"/>
    <text_log remove="1"/>
    <trace_log remove="1"/>
</clickhouse>
</pre>
Then run <code>systemctl restart clickhouse-server</code> and make sure that it's running.
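A quick smoke test; the install step above should have put <code>clickhouse-client</code> on the PATH:
<pre>
systemctl is-active clickhouse-server
clickhouse-client --query "SELECT version()"
</pre>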
==== Schema ====
Run <code>clickhouse-client</code> to get a SQL shell. First we need to create a new database and some users:
<pre>
CREATE DATABASE vector;
CREATE USER vector IDENTIFIED BY 'REPLACE_ME';
GRANT ALL ON vector.* TO vector;
CREATE USER grafana IDENTIFIED BY 'REPLACE_ME' SETTINGS PROFILE 'readonly';
GRANT SHOW DATABASES, SHOW TABLES, SELECT, DICTGET ON *.* TO grafana;
</pre>
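It's worth sanity-checking the new accounts before moving on; for example (assuming you substituted real passwords for the placeholders):
<pre>
clickhouse-client --query "SHOW GRANTS FOR grafana"
clickhouse-client --user grafana --password 'REPLACE_ME' --query "SELECT 1"
</pre>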
In some of our tables, we'll store the two-letter country code instead of a country's full name to save space. So we'll create a [https://clickhouse.com/docs/en/sql-reference/dictionaries dictionary] so that we can look up a country's full name. Exit the SQL shell, then download the CSV file:
<pre>
wget -O /var/lib/clickhouse/user_files/country_codes.csv 'https://datahub.io/core/country-list/r/data.csv'
</pre>
Then run <code>clickhouse-client</code> and create the dictionary:
<pre>
CREATE DICTIONARY vector.country_codes_dictionary
(
    Name String,
    Code String
)
PRIMARY KEY Code
SOURCE(FILE(path '/var/lib/clickhouse/user_files/country_codes.csv' FORMAT 'CSVWithNames'))
LIFETIME(MIN 0 MAX 0)
LAYOUT(HASHED_ARRAY());
</pre>
Perform a SELECT to fill the table:
<pre>
SELECT * FROM country_codes_dictionary;
</pre>
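Once the dictionary is loaded, individual lookups can be done with <code>dictGet</code>; for example, this should return 'Canada':
<pre>
clickhouse-client --query "SELECT dictGet('vector.country_codes_dictionary', 'Name', 'CA')"
</pre>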
Now we need to create the tables for storing our actual log data (after they are transformed by Vector). Create a table for failed SSH logins:
<pre>
CREATE TABLE vector.failed_ssh_logins
(
    host LowCardinality(String),
    timestamp DateTime,
    ip_address IPv6,
    username String,
    country_code LowCardinality(String)
)
ENGINE = MergeTree()
PRIMARY KEY (host, timestamp)
TTL timestamp + INTERVAL 1 MONTH DELETE;
</pre>
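As an example of the kind of query this table supports, here's a sketch counting the last day's failed logins per host (column names as in the schema above):
<pre>
clickhouse-client --query "
  SELECT host, count() AS attempts
  FROM vector.failed_ssh_logins
  WHERE timestamp >= now() - INTERVAL 1 DAY
  GROUP BY host
  ORDER BY attempts DESC"
</pre>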
Create a table for storing mirror requests:
<pre>
CREATE TABLE vector.mirror_requests
(
    distro LowCardinality(String),
    timestamp DateTime CODEC(Delta, ZSTD),
    ip_address IPv6,
    bytes_sent UInt64,
    user_agent String,
    country_code LowCardinality(String),
    region_name String,
    city String
)
ENGINE = MergeTree()
PRIMARY KEY (distro, timestamp, country_code, region_name, city)
TTL timestamp + INTERVAL 1 WEEK DELETE;
</pre>
One of ClickHouse's great features is [https://clickhouse.com/docs/en/guides/developer/cascading-materialized-views Materialized Views]. These allow us to automatically "forward" data from one table to another, and the second table can use a different storage engine to aggregate data and save space.
We want to calculate the total number of requests and bytes sent for each distro, so let's create a table and view for that:
<pre>
CREATE TABLE vector.mirror_requests_agg_by_distro
(
    distro LowCardinality(String),
    date Date CODEC(Delta, ZSTD),
    country_code LowCardinality(String),
    num_requests UInt64,
    bytes_sent UInt64
)
ENGINE = SummingMergeTree((num_requests, bytes_sent))
PRIMARY KEY (distro, toStartOfMonth(date), date, country_code)
TTL date + INTERVAL 1 MONTH
    GROUP BY distro, toStartOfMonth(date)
    SET num_requests = sum(num_requests),
        bytes_sent = sum(bytes_sent),
    date + INTERVAL 2 YEAR DELETE;

CREATE MATERIALIZED VIEW vector.mirror_requests_agg_by_distro_mv
TO vector.mirror_requests_agg_by_distro
AS
SELECT
    distro,
    toDate(timestamp) AS date,
    country_code,
    sum(1) AS num_requests,
    sum(bytes_sent) AS bytes_sent
FROM vector.mirror_requests
GROUP BY distro, date, country_code;
</pre>
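To convince yourself that the view is forwarding rows, you can compare the aggregate against the raw table; a rough check (the totals only line up within the window where the raw table still has data, given its shorter TTL):
<pre>
clickhouse-client --query "SELECT sum(num_requests) FROM vector.mirror_requests_agg_by_distro WHERE date = today()"
clickhouse-client --query "SELECT count() FROM vector.mirror_requests WHERE toDate(timestamp) = today()"
</pre>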
We also want some stats for Canada specifically:
<pre>
CREATE TABLE vector.mirror_requests_agg_canada
(
    distro LowCardinality(String),
    date Date CODEC(Delta, ZSTD),
    region_name LowCardinality(String),
    city String,
    bytes_sent UInt64,
    num_requests UInt64
)
ENGINE = SummingMergeTree((bytes_sent, num_requests))
PRIMARY KEY (distro, toStartOfMonth(date), date, region_name, city)
TTL date + INTERVAL 1 MONTH
    GROUP BY distro, toStartOfMonth(date)
    SET num_requests = sum(num_requests),
        bytes_sent = sum(bytes_sent),
    date + INTERVAL 2 YEAR DELETE;

CREATE MATERIALIZED VIEW vector.mirror_requests_agg_canada_mv
TO vector.mirror_requests_agg_canada
AS
SELECT
    distro,
    toDate(timestamp) AS date,
    region_name,
    city,
    sum(bytes_sent) AS bytes_sent,
    sum(1) AS num_requests
FROM vector.mirror_requests
WHERE country_code = 'CA'
GROUP BY distro, date, region_name, city;
</pre>
We also want to keep stats just for the university:
<pre>
CREATE TABLE vector.mirror_requests_agg_uw
(
    distro LowCardinality(String),
    date Date CODEC(Delta, ZSTD),
    bytes_sent UInt64,
    num_requests UInt64
)
ENGINE = SummingMergeTree((bytes_sent, num_requests))
PRIMARY KEY (distro, toStartOfMonth(date), date)
TTL date + INTERVAL 1 MONTH
    GROUP BY distro, toStartOfMonth(date)
    SET num_requests = sum(num_requests),
        bytes_sent = sum(bytes_sent),
    date + INTERVAL 2 YEAR DELETE;

CREATE MATERIALIZED VIEW vector.mirror_requests_agg_uw_mv
TO vector.mirror_requests_agg_uw
AS
SELECT
    distro,
    toDate(timestamp) AS date,
    sum(bytes_sent) AS bytes_sent,
    sum(1) AS num_requests
FROM vector.mirror_requests
WHERE isIPAddressInRange(IPv6NumToString(ip_address), '::ffff:129.97.0.0/112')
   OR isIPAddressInRange(IPv6NumToString(ip_address), '::ffff:10.0.0.0/104')
   OR isIPAddressInRange(IPv6NumToString(ip_address), '::ffff:172.16.0.0/108')
   OR isIPAddressInRange(IPv6NumToString(ip_address), '::ffff:192.168.0.0/112')
   OR isIPAddressInRange(IPv6NumToString(ip_address), '2620:101:f000::/47')
   OR isIPAddressInRange(IPv6NumToString(ip_address), 'fd74:6b6a:8eca::/47')
GROUP BY distro, date;
</pre>
Finally, we'll store some stats for IP subnets:
<pre>
CREATE TABLE vector.mirror_requests_agg_ip
(
    timestamp DateTime CODEC(Delta, ZSTD),
    cidr_start IPv6,
    country_code LowCardinality(String),
    num_requests UInt64,
    bytes_sent UInt64
)
ENGINE = SummingMergeTree((num_requests, bytes_sent))
PRIMARY KEY (timestamp, cidr_start, country_code)
TTL timestamp + toIntervalWeek(2);

CREATE MATERIALIZED VIEW vector.mirror_requests_agg_ip_mv TO vector.mirror_requests_agg_ip AS
SELECT
    toStartOfFiveMinutes(timestamp) AS timestamp,
    IPv6CIDRToRange(ip_address, 120).1 AS cidr_start,
    country_code,
    sum(1) AS num_requests,
    sum(bytes_sent) AS bytes_sent
FROM vector.mirror_requests
GROUP BY
    timestamp,
    cidr_start,
    country_code;
</pre>
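For example, a sketch of a query for the top traffic sources by /120 subnet over the last day:
<pre>
clickhouse-client --query "
  SELECT cidr_start, sum(bytes_sent) AS bytes
  FROM vector.mirror_requests_agg_ip
  WHERE timestamp >= now() - INTERVAL 1 DAY
  GROUP BY cidr_start
  ORDER BY bytes DESC
  LIMIT 10"
</pre>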
=== GeoIP database ===
We'll want to look up geographic information for the IP addresses in our data. To do this, we'll use the [https://dev.maxmind.com/geoip/geolite2-free-geolocation-data MaxMind GeoLite2 databases]. Syscom already has a MaxMind account; the password is stored in the usual place. Install the latest geoipupdate package from [https://github.com/maxmind/geoipupdate/releases here], then edit /etc/GeoIP.conf as necessary (use the syscom account ID and license key). Set <code>EditionIDs</code> to <code>GeoLite2-City</code> only.
We'll use a systemd timer to run the geoipupdate script periodically. Paste the following into /etc/systemd/system/geoipupdate.service:
<pre>
[Unit]
Description=GeoIP Update
Documentation=https://dev.maxmind.com/geoip/updating-databases
After=network-online.target

[Service]
Type=oneshot
ExecStart=/usr/bin/geoipupdate
Nice=19
IOSchedulingClass=idle
IOSchedulingPriority=7
ProtectSystem=strict
ReadWritePaths=/usr/share/GeoIP
ProtectHome=true
PrivateTmp=true
PrivateDevices=true
ProtectHostname=true
ProtectClock=true
ProtectKernelTunables=true
ProtectKernelModules=true
ProtectKernelLogs=true
ProtectControlGroups=true
LockPersonality=true
RestrictRealtime=true
</pre>
Run <code>systemctl daemon-reload</code> and then <code>systemctl start geoipupdate</code> to download the database for the first time.
Now paste the following into /etc/systemd/system/geoipupdate.timer:
<pre>
[Unit]
Description=Automatic GeoIP database update
Documentation=https://dev.maxmind.com/geoip/updating-databases

[Timer]
OnCalendar=monthly
RandomizedDelaySec=12h
Persistent=true

[Install]
WantedBy=timers.target
</pre>
Then run:
<pre>
systemctl daemon-reload
systemctl enable geoipupdate.timer
systemctl start geoipupdate.timer
</pre>
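You can verify that the timer is scheduled with:
<pre>
systemctl list-timers geoipupdate.timer
</pre>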
=== Vector ===
Vector allows you to create directed acyclic graphs (DAGs) for collecting and processing logs, which gives us a lot of flexibility. It also has a built-in scripting language, [https://vector.dev/docs/reference/vrl/ Vector Remap Language (VRL)] for slicing and dicing data. This allows us to remove fields which we don't need, add new fields which we do need, enrich an event with extra data, etc.
Our data pipeline looks like this: Vector agents -> Vector aggregator -> ClickHouse. We use Grafana for visualization.
We use mutual TLS between the agents and the aggregator to make sure that random people can't send us garbage data:
<pre>
openssl req -newkey rsa:2048 -nodes -keyout aggregator.key -x509 -out aggregator.crt -days 36500
openssl req -newkey rsa:2048 -nodes -keyout agent.key -x509 -out agent.crt -days 36500
chown vector:vector *.crt *.key
</pre>
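These are self-signed, so openssl can read them directly; a quick way to check their expiry later (assuming they were moved to /etc/vector, where the configs below expect them):
<pre>
openssl x509 -in /etc/vector/aggregator.crt -noout -subject -enddate
</pre>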
Here is what our vector.toml looks like on the general-use machines; currently, we only use it for collecting failed SSH login attempts.
<pre>
[sources.source_sshd]
type = "journald"
include_units = ["ssh.service"]

[transforms.remap_sshd]
type = "remap"
inputs = ["source_sshd"]
source = """
parsed, err = parse_regex(
  .message, r'^(?:Connection (?:closed|reset)|Disconnected) (?:by|from) (?:invalid|authenticating) user (?P<user>[^ ]+) (?P<ip>[0-9.a-f:]+)'
)
if is_null(err) {
  . = {
    "username": parsed.user,
    "ip_address": parsed.ip,
    "host": .host,
    "timestamp": .timestamp,
    "job": "vector-sshd"
  }
}
"""

[transforms.filter_sshd]
type = "filter"
inputs = ["remap_sshd"]
condition = '.job == "vector-sshd"'

[sinks.aggregator]
type = "vector"
inputs = ["filter_sshd"]
address = "prometheus:5045"

[sinks.aggregator.tls]
enabled = true
ca_file = "/etc/vector/aggregator.crt"
crt_file = "/etc/vector/agent.crt"
key_file = "/etc/vector/agent.key"
verify_hostname = false
</pre>
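After editing the config, it's worth validating it before restarting the service; Vector ships a <code>validate</code> subcommand for this:
<pre>
vector validate /etc/vector/vector.toml
systemctl restart vector
</pre>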
The agent on potassium-benzoate collects NGINX logs:
<pre>
[sources.source_nginx]
type = "file"
include = ["/var/log/nginx/access.log"]
max_read_bytes = 65536

[transforms.remap_nginx]
type = "remap"
inputs = ["source_nginx"]
source = """
parsed_log, err = parse_nginx_log(.message, "combined")
status = parsed_log.status
request = string!(parsed_log.request || "")
if is_null(err) && status == 200 {
  parsed_path, err = parse_regex(request, r'^GET /+(?P<distro>[^/? ]+)')
  distro = parsed_path.distro
  ignore = [
    "server-status", "stats", "robots.txt",
    "include", "pub", "news", "index.html", "sync.json", "ups",
    "pool", "dists", "csclub.asc", "csclub.gpg"
  ]
  if (
    is_null(err) && !includes(ignore, distro) && !contains(request, "..") &&
    !starts_with(request, "#") && !starts_with(request, "%") && !starts_with(request, ".")
  ) {
    . = {
      "distro": distro,
      "user_agent": parsed_log.agent,
      "ip_address": parsed_log.client,
      "bytes_sent": parsed_log.size,
      "timestamp": parsed_log.timestamp,
      "job": "vector-mirror"
    }
  }
}
"""

...
</pre>
Finally, here's the aggregator config, which collects data from each agent and then inserts it into ClickHouse:
<pre>
[enrichment_tables.enrich_geoip]
type = "geoip"
path = "/usr/share/GeoIP/GeoLite2-City.mmdb"

[sources.source_agents]
type = "vector"
address = "[::]:5045"

[sources.source_agents.tls]
enabled = true
ca_file = "/etc/vector/agent.crt"
crt_file = "/etc/vector/aggregator.crt"
key_file = "/etc/vector/aggregator.key"
verify_hostname = false

[transforms.transform_route]
type = "route"
inputs = ["source_agents"]
route.sshd = '.job == "vector-sshd"'
route.mirror = '.job == "vector-mirror"'

[transforms.transform_sshd]
type = "remap"
inputs = ["transform_route.sshd"]
source = """
ipinfo = get_enrichment_table_record("enrich_geoip", {"ip": .ip_address}) ?? {}
if is_ipv4!(.ip_address) && (ip_cidr_contains!("10.0.0.0/8", .ip_address) \
    || ip_cidr_contains!("172.16.0.0/12", .ip_address) \
    || ip_cidr_contains!("192.168.0.0/16", .ip_address)) {
  ipinfo.country_code = "CA";
  ipinfo.region_name = "Ontario";
  ipinfo.city_name = "Waterloo";
}
. = {
  "host": .host,
  "timestamp": .timestamp,
  "username": .username,
  "ip_address": ip_to_ipv6!(.ip_address),
  "country_code": ipinfo.country_code || ""
}
"""

[transforms.transform_mirror]
type = "remap"
inputs = ["transform_route.mirror"]
source = """
ipinfo = get_enrichment_table_record("enrich_geoip", {"ip": .ip_address}) ?? {}
if is_ipv4!(.ip_address) && (ip_cidr_contains!("10.0.0.0/8", .ip_address) \
    || ip_cidr_contains!("172.16.0.0/12", .ip_address) \
    || ip_cidr_contains!("192.168.0.0/16", .ip_address)) {
  ipinfo.country_code = "CA";
  ipinfo.region_name = "Ontario";
  ipinfo.city_name = "Waterloo";
}
. = {
  "distro": .distro,
  "timestamp": .timestamp,
  "ip_address": ip_to_ipv6!(.ip_address),
  "bytes_sent": .bytes_sent,
  "user_agent": .user_agent,
  "country_code": ipinfo.country_code || "",
  "region_name": ipinfo.region_name || "",
  "city": ipinfo.city_name || ""
}
"""

[transforms.transform_unmatched]
type = "remap"
inputs = ["transform_route._unmatched"]
source = """
log("unrecognized job: " + string!(.job || "null"), level: "warn")
"""

[sinks.sink_unmatched]
type = "blackhole"
inputs = ["transform_unmatched"]
print_interval_secs = 0

[sinks.clickhouse_sshd]
type = "clickhouse"
inputs = ["transform_sshd"]
encoding.timestamp_format = "unix"
endpoint = "$CLICKHOUSE_ENDPOINT"
database = "$CLICKHOUSE_DATABASE"
table = "failed_ssh_logins"

[sinks.clickhouse_sshd.auth]
strategy = "basic"
user = "$CLICKHOUSE_USER"
password = "$CLICKHOUSE_PASSWORD"

...
</pre>
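Once everything is running, <code>vector top</code> gives a live view of events flowing through each component (hedged: it needs the Vector API enabled in the config to work):
<pre>
vector top
</pre>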
=== Beats, Logstash and Loki (old) ===
We previously used Elastic Beats, Logstash and Grafana Loki for collecting and storing logs. One day I tried to upgrade Logstash and it exploded so badly that I figured it would be easier to just switch to Vector instead.
<b>The sections below are kept for historical purposes only and are no longer accurate.</b>
We use a combination of [https://www.elastic.co/beats/ Elastic Beats], [https://www.elastic.co/logstash/ Logstash] and [https://grafana.com/oss/loki/ Loki] for collecting, storing and querying our logs; for visualization, we use Grafana. Logstash and Loki are currently both running in the prometheus VM.
The reason why I chose Loki over Elasticsearch is because Loki is <i>very</i> space efficient with regards to storage. It also consumes way less RAM and CPU. This means that we can collect a lot of logs without worrying too much about resource usage.
We have Journalbeat and/or Filebeat running on some of our machines to collect logs from sshd, Apache and NGINX. The Beats send these logs to Logstash, which does some pre-processing. The most useful contribution by Logstash is its GeoIP plugin, which allows us to enrich the logs with some geographical information from IP addresses (e.g. add city and country). Logstash sends these logs to Loki, and we can then view these from Grafana.
'''Note''': Sometimes the Loki output plugin for Logstash disappears after a reboot or an upgrade. If you see Logstash complaining about this in the journald logs, run this:
<pre>
cd /usr/share/logstash
bin/logstash-plugin install logstash-output-loki
systemctl restart logstash
</pre>
See [https://grafana.com/docs/loki/latest/clients/logstash/ here] for details.
The language for querying logs in Loki is [https://grafana.com/docs/loki/latest/logql/ LogQL], which, syntactically, is very similar to PromQL. If you have already learned PromQL, then you should be able to pick up LogQL very easily. You can try out some LogQL queries from the 'Explore' page on Grafana; make sure you toggle the data source to 'Loki' in the top left corner. For the 'topk' queries, you will also want to toggle 'Query type' to 'Instant' rather than 'Range'.
==== LogQL examples ====
Here is the number of failed SSH login attempts for each host for a given time range:
<pre>
sum by (hostname) (
  count_over_time(
    {job="logstash-sshd"}
    [$__range]
  )
)
</pre>
Note that <code>$__range</code> is a special global variable in Grafana which is equal to the time range in the top right corner of a chart.
Here are the top 10 IP addresses from which failed SSH login attempts arrived, for a given host and time range:
<pre>
topk(10,
  sum by (ip_address) (
    count_over_time(
      {job="logstash-sshd",hostname="$hostname"} | json | __error__ = ""
      [$__range]
    )
  )
)
</pre>
$hostname is a chart variable, which can be configured from a chart's settings.
I configured Logstash to send logs to Loki as JSON, but it's a rather hacky solution, so occasionally invalid JSON is sent.
Here is the number of HTTP requests for the top 15 distros on our mirror from the last hour:
<pre>
topk(15,
  sum by (distro) (
    count_over_time(
      {job="logstash-nginx"} | json | __error__ = "" | distro != "server-status"
      [1h]
    )
  )
)
</pre>
Here is the total number of bytes sent over HTTP for the top 15 distros from the last hour. Note the use of the <code>unwrap</code> operator.
<pre>
topk(15,
  sum by (distro) (
    sum_over_time(
      {job="logstash-nginx"} | json | __error__ = "" | distro != "server-status"
      | unwrap bytes
      [1h]
    )
  )
)
</pre>
You can see more examples on the Mirror Requests dashboard on Grafana.
==== Avoid high cardinality ====
For both Prometheus and Loki, you must [https://prometheus.io/docs/practices/naming/#labels avoid high cardinality] labels at all costs. By high cardinality, I mean labels which can take on a very large number of values; for example, using a label to store IP addresses would be a very bad idea. This is because Prometheus and Loki use labels to store metrics/logs efficiently with compression; when two metrics have two different sets of labels, they cannot be stored together, which increases the storage space usage.
With Loki, you can extract labels from your logs inside your query dynamically. One way to do this is with the <code>json</code> operator; there are other ways to do this as well (see the LogQL docs). This basically means that we get infinite cardinality from our logs, the tradeoff being that queries may take longer to execute.
Also, be very careful about what you send to Loki from Logstash: every field in a Logstash message becomes a Loki label. Usage of the <code>prune</code> filter in Logstash is highly recommended.