Observability: Difference between revisions

From CSCWiki
Jump to navigation Jump to search
(Add Prometheus section)
 
 
(22 intermediate revisions by the same user not shown)
Line 1: Line 1:
There are three pillars of observability: metrics, logging and tracing. We are only interested in the first two.
There are [https://www.oreilly.com/library/view/distributed-systems-observability/9781492033431/ch04.html three pillars of observability]: metrics, logging and tracing. We are only interested in the first two.


== Metrics ==
== Metrics ==
All of our machines are, or at least should be, running the Prometheus node exporter. This collects and sends machine metrics (e.g. RAM used, disk space) to the Prometheus server running at https://prometheus.csclub.uwaterloo.ca (currently a VM on phosphoric-acid). There are a few specialized exporters running on several other machines; a Postfix exporter is running on mail, an Apache exporter is running on caffeine, and an NGINX expoter is running on potassium-benzoate. There is also a custom exporter written on syscom running on potassium-benzoate for mirror stats.
All of our machines are, or at least should be, running the Prometheus node exporter. This collects and sends machine metrics (e.g. RAM used, disk space) to the Prometheus server running at https://prometheus.csclub.uwaterloo.ca (currently a VM on phosphoric-acid). There are a few specialized exporters running on several other machines; a Postfix exporter is running on mail, an Apache exporter is running on caffeine, and an NGINX expoter is running on potassium-benzoate. There is also a custom exporter written by syscom running on potassium-benzoate for mirror stats.


Most of the exporters use mutual TLS authentication with the Prometheus server. I set the expiration date for the TLS certs to 10 years. If you are reading this and it is 2031 or later, then go update the certs.
Most of the exporters use mutual TLS authentication with the Prometheus server. I set the expiration date for the TLS certs to 10 years. If you are reading this and it is 2031 or later, then go update the certs.
Line 27: Line 27:
)
)
</pre>
</pre>
<br>

We also use [https://prometheus.io/docs/alerting/latest/alertmanager/ AlertManager] to send email alerts from Prometheus metrics. We should figure out how to also send messages to IRC or similar.
We also use [https://prometheus.io/docs/alerting/latest/alertmanager/ AlertManager] to send email alerts from Prometheus metrics. We should figure out how to also send messages to IRC or similar.


Line 33: Line 33:


We make some pretty charts on Grafana (https://prometheus.csclub.uwaterloo.ca) from PromQL queries. Grafana also has an 'Explorer' page where you can test out some queries before making chart panels from them.
We make some pretty charts on Grafana (https://prometheus.csclub.uwaterloo.ca) from PromQL queries. Grafana also has an 'Explorer' page where you can test out some queries before making chart panels from them.

== Logging ==
We now use [https://vector.dev/ Vector] for collecting and transforming logs, and [https://clickhouse.com/ ClickHouse] for storing log data.

=== ClickHouse ===
ClickHouse is a very fast OLAP database which has great documentation for storing and analyzing [https://clickhouse.com/use-cases/logging-and-metrics logging and metrics]. Unfortunately, the CPU on phosphoric-acid (which hosts the prometheus VM) is so old that when we try to install the official deb package, the following error occurs:
<pre>
Instruction check fail. The CPU does not support SSSE3 instruction set.
</pre>
So we're going to download the "compat" version instead:
<pre>
cd /root
wget https://s3.amazonaws.com/clickhouse-builds/master/amd64compat/clickhouse
chmod +x clickhouse
./clickhouse install
rm clickhouse
wget -O /etc/systemd/system/clickhouse-server.service https://github.com/ClickHouse/ClickHouse/raw/master/packages/clickhouse-server.service
systemctl daemon-reload
systemctl enable clickhouse-server
</pre>
By default, systemd limits the number of threads which a service can create, so we'll want to disable that. Run <code>systemctl edit clickhouse-server</code> and paste the following:
<pre>
[Service]
TasksMax=infinity
</pre>
Next, paste the following into /etc/clickhouse-server/users.d/csclub-users.xml:
<pre>
<clickhouse>
<profiles>
<default>
<!-- disable logs (using too much disk space) -->
<log_queries replace="replace">0</log_queries>
<log_query_threads replace="replace">0</log_query_threads>
</default>
<readonly>
<!-- Grafana needs to change settings queries -->
<readonly>2</readonly>
</readonly>
</profiles>
<users>
<default>
<!-- The default user should only be allowed to connect from localhost -->
<networks>
<ip>::1</ip>
<ip>127.0.0.1</ip>
</networks>
<!-- Allow the default user to create new users -->
<access_management>1</access_management>
<named_collection_control>1</named_collection_control>
<show_named_collections>1</show_named_collections>
<show_named_collections_secrets>1</show_named_collections_secrets>
</default>
</users>
</clickhouse>
</pre>
Then paste the following into /etc/clickhouse-server/config/zzz-csclub.xml (we need the zzz prefix because the configuration files are merged in alphabetical order, and we want ours to be applied last):
<pre>
<clickhouse>
<listen_host>127.0.0.1</listen_host>
<listen_host>::1</listen_host>
<logger>
<level>information</level>
<size>100M</size>
<count>10</count>
</logger>
<mysql_port></mysql_port>
<postgresql_port></postgresql_port>

<!-- disable logs (using too much disk space) -->
<asynchronous_metric_log remove="1"/>
<metric_log remove="1"/>
<query_thread_log remove="1" />
<query_log remove="1" />
<query_views_log remove="1" />
<part_log remove="1"/>
<session_log remove="1"/>
<text_log remove="1" />
<trace_log remove="1"/>
</clickhouse>
</pre>
Then run <code>systemctl restart clickhouse-server</code> and make sure that it's running.

==== Schema ====
Run <code>clickhouse-client</code> to get a SQL shell. First we need to create a new database and some users:
<pre>
CREATE DATABASE vector;
CREATE USER vector IDENTIFIED BY 'REPLACE_ME';
GRANT ALL ON vector.* TO vector;
CREATE USER grafana IDENTIFIED BY 'REPLACE_ME' SETTINGS PROFILE 'readonly';
GRANT SHOW DATABASES, SHOW TABLES, SELECT, DICTGET ON *.* TO grafana;
</pre>

In some of our tables, we'll store the two-letter country code instead of a country's full name to save space. So we'll create a [https://clickhouse.com/docs/en/sql-reference/dictionaries dictionary] so that we can look up a country's full name. Exit the SQL shell, then, download the CSV file:
<pre>
wget -O /var/lib/clickhouse/user_files/country_codes.csv 'https://datahub.io/core/country-list/r/data.csv'
</pre>
Then run <code>clickhouse-client</code> and create the dictionary:
<pre>
CREATE DICTIONARY vector.country_codes_dictionary
(
Name String,
Code String
)
PRIMARY KEY Code
SOURCE(FILE(path '/var/lib/clickhouse/user_files/country_codes.csv' FORMAT 'CSVWithNames'))
LIFETIME(MIN 0 MAX 0)
LAYOUT(HASHED_ARRAY());
</pre>
Perform a SELECT to fill the table:
<pre>
SELECT * FROM country_codes_dictionary;
</pre>

Now we need to create the tables for storing our actual log data (after they are transformed by Vector).
Create a table for failed SSH logins:
<pre>
CREATE TABLE vector.failed_ssh_logins
(
host LowCardinality(String),
timestamp DateTime,
ip_address IPv6,
username String,
country_code LowCardinality(String)
)
ENGINE = MergeTree()
PRIMARY KEY (host, timestamp)
TTL timestamp + INTERVAL 1 MONTH DELETE;
</pre>

Create a table for storing mirror requests:
<pre>
CREATE TABLE vector.mirror_requests
(
distro LowCardinality(String),
timestamp DateTime CODEC(Delta, ZSTD),
ip_address IPv6,
bytes_sent UInt64,
user_agent String,
country_code LowCardinality(String),
region_name String,
city String
)
ENGINE = MergeTree()
PRIMARY KEY (distro, timestamp, country_code, region_name, city)
TTL timestamp + INTERVAL 1 WEEK DELETE;
</pre>

One of ClickHouse's great features is [https://clickhouse.com/docs/en/guides/developer/cascading-materialized-views Materialized Views]. These allow us to automatically "forward" data from one table to another, and the second table can use a different storage engine to aggregate data and save space.

We want to calculate the total number of requests and bytes sent for each distro, so let's create a table and view for that:
<pre>
CREATE TABLE vector.mirror_requests_agg_by_distro
(
distro LowCardinality(String),
date Date CODEC(Delta, ZSTD),
country_code LowCardinality(String),
num_requests UInt64,
bytes_sent UInt64
)
ENGINE = SummingMergeTree((num_requests, bytes_sent))
PRIMARY KEY (distro, toStartOfMonth(date), date, country_code)
TTL date + INTERVAL 1 MONTH
GROUP BY distro, toStartOfMonth(date)
SET num_requests = sum(num_requests),
bytes_sent = sum(bytes_sent),
date + INTERVAL 2 YEAR DELETE;

CREATE MATERIALIZED VIEW vector.mirror_requests_agg_by_distro_mv
TO vector.mirror_requests_agg_by_distro
AS
SELECT
distro,
toDate(timestamp) AS date,
country_code,
sum(1) AS num_requests,
sum(bytes_sent) AS bytes_sent
FROM vector.mirror_requests
GROUP BY distro, date, country_code;
</pre>

We also wants some stats for Canada specifically:
<pre>
CREATE TABLE vector.mirror_requests_agg_canada
(
distro LowCardinality(String),
date Date CODEC(Delta, ZSTD),
region_name LowCardinality(String),
city String,
bytes_sent UInt64,
num_requests UInt64
)
ENGINE = SummingMergeTree((bytes_sent, num_requests))
PRIMARY KEY (distro, toStartOfMonth(date), date, region_name, city)
TTL date + INTERVAL 1 MONTH
GROUP BY distro, toStartOfMonth(date)
SET num_requests = sum(num_requests),
bytes_sent = sum(bytes_sent),
date + INTERVAL 2 YEAR DELETE;

CREATE MATERIALIZED VIEW vector.mirror_requests_agg_canada_mv
TO vector.mirror_requests_agg_canada
AS
SELECT
distro,
toDate(timestamp) as date,
region_name,
city,
sum(bytes_sent) AS bytes_sent,
sum(1) AS num_requests
FROM vector.mirror_requests
WHERE country_code = 'CA'
GROUP BY distro, date, region_name, city;
</pre>

We also want to keep stats just for the university:
<pre>
CREATE TABLE vector.mirror_requests_agg_uw
(
distro LowCardinality(String),
date Date CODEC(Delta, ZSTD),
bytes_sent UInt64,
num_requests UInt64
)
ENGINE = SummingMergeTree((bytes_sent, num_requests))
PRIMARY KEY (distro, toStartOfMonth(date), date)
TTL date + INTERVAL 1 MONTH
GROUP BY distro, toStartOfMonth(date)
SET num_requests = sum(num_requests),
bytes_sent = sum(bytes_sent),
date + INTERVAL 2 YEAR DELETE;

CREATE MATERIALIZED VIEW vector.mirror_requests_agg_uw_mv
TO vector.mirror_requests_agg_uw
AS
SELECT
distro,
toDate(timestamp) as date,
sum(bytes_sent) AS bytes_sent,
sum(1) AS num_requests
FROM vector.mirror_requests
WHERE isIPAddressInRange(IPv6NumToString(ip_address), '::ffff:129.97.0.0/112')
OR isIPAddressInRange(IPv6NumToString(ip_address), '::ffff:10.0.0.0/104')
OR isIPAddressInRange(IPv6NumToString(ip_address), '::ffff:172.16.0.0/108')
OR isIPAddressInRange(IPv6NumToString(ip_address), '::ffff:192.168.0.0/112')
OR isIPAddressInRange(IPv6NumToString(ip_address), '2620:101:f000::/47')
OR isIPAddressInRange(IPv6NumToString(ip_address), 'fd74:6b6a:8eca::/47')
GROUP BY distro, date, region_name, city;
</pre>

Finally, we'll store some stats for IP subnets:
<pre>
CREATE TABLE vector.mirror_requests_agg_ip
(
timestamp DateTime CODEC(Delta, ZSTD),
cidr_start IPv6,
country_code LowCardinality(String),
num_requests UInt64,
bytes_sent UInt64
)
ENGINE = SummingMergeTree((num_requests, bytes_sent))
PRIMARY KEY (timestamp, cidr_start, country_code)
TTL timestamp + toIntervalWeek(2);

CREATE MATERIALIZED VIEW vector.mirror_requests_agg_ip_mv TO vector.mirror_requests_agg_ip AS
SELECT
toStartOfFiveMinutes(timestamp) AS timestamp,
IPv6CIDRToRange(ip_address, 120).1 AS cidr_start,
country_code,
sum(1) AS num_requests,
sum(bytes_sent) AS bytes_sent
FROM vector.mirror_requests
GROUP BY
timestamp,
cidr_start,
country_code;
</pre>

=== GeoIP database ===
We'll want to look up geographic information for the IP addresses in our data. To do this, we'll use the [https://dev.maxmind.com/geoip/geolite2-free-geolocation-data MaxMind GeoLite2 databases]. Syscom already has a MaxMind account; the password is stored in the usual place. Install the latest geoipupdate package from [https://github.com/maxmind/geoipupdate/releases here], then edit /etc/GeoIP.conf as necessary (use the syscom account ID and license key). Set <code>EditionIDs</code> to <code>GeoLite2-City</code> only.

We'll use a systemd timer to run the geoipupdate script periodically. Paste the following into /etc/systemd/system/geoipupdate.service:
<pre>
[Unit]
Description=GeoIP Update
Documentation=https://dev.maxmind.com/geoip/updating-databases
After=network-online.target

[Service]
Type=oneshot
ExecStart=/usr/bin/geoipupdate
Nice=19
IOSchedulingClass=idle
IOSchedulingPriority=7
ProtectSystem=strict
ReadWritePaths=/usr/share/GeoIP
ProtectHome=true
PrivateTmp=true
PrivateDevices=true
ProtectHostname=true
ProtectClock=true
ProtectKernelTunables=true
ProtectKernelModules=true
ProtectKernelLogs=true
ProtectControlGroups=true
LockPersonality=true
RestrictRealtime=true
</pre>
Run <code>systemctl daemon-reload</code> and then <code>systemctl start geoipupdate</code> to download the database for the first time.

Now paste the following into /etc/systemd/system/geoipupdate.timer:
<pre>
[Unit]
Description=Automatic GeoIP database update
Documentation=https://dev.maxmind.com/geoip/updating-databases

[Timer]
OnCalendar=monthly
RandomizedDelaySec=12h
Persistent=true

[Install]
WantedBy=timers.target
</pre>
Then run:
<pre>
systemctl daemon-reload
systemctl enable geoipupdate.timer
systemctl start geoipupdate.timer
</pre>

=== Vector ===
Vector allows you to create directed acyclic graphs (DAGs) for collecting and processing logs, which gives us a lot of flexibility. It also has a built-in scripting language, [https://vector.dev/docs/reference/vrl/ Vector Remap Language (VRL)] for slicing and dicing data. This allows us to remove fields which we don't need, add new fields which we do need, enrich an event with extra data, etc.

Our data pipeline looks like this: Vector agents -> Vector aggregator -> ClickHouse. We use Grafana for visualization.

We use mutual TLS between the agents and the aggregator to make sure that random people can't send us garbage data:
<pre>
openssl req -newkey rsa:2048 -nodes -keyout aggregator.key -x509 -out aggregator.crt -days 36500
openssl req -newkey rsa:2048 -nodes -keyout agent.key -x509 -out agent.crt -days 36500
chown vector:vector *.crt *.key
</pre>

Here is what our vector.toml looks like on the general-use machines; currently, we only use it for collecting failed SSH login attempts.
<pre>
[sources.source_sshd]
type = "journald"
include_units = ["ssh.service"]

[transforms.remap_sshd]
type = "remap"
inputs = ["source_sshd"]
source = """
parsed, err = parse_regex(
.message, r'^(?:Connection (?:closed|reset)|Disconnected) (?:by|from) (?:invalid|authenticating) user (?P<user>[^ ]+) (?P<ip>[0-9.a-f:]+)'
)
if is_null(err) {
. = {
"username": parsed.user,
"ip_address": parsed.ip,
"host": .host,
"timestamp": .timestamp,
"job": "vector-sshd"
}
}
"""

[transforms.filter_sshd]
type = "filter"
inputs = ["remap_sshd"]
condition = '.job == "vector-sshd"'

[sinks.aggregator]
type = "vector"
inputs = ["filter_sshd"]
address = "prometheus:5045"
[sinks.aggregator.tls]
enabled = true
ca_file = "/etc/vector/aggregator.crt"
crt_file = "/etc/vector/agent.crt"
key_file = "/etc/vector/agent.key"
verify_hostname = false
</pre>

The agent on potassium-benzoate collects NGINX logs:
<pre>
[sources.source_nginx]
type = "file"
include = ["/var/log/nginx/access.log"]
max_read_bytes = 65536

[transforms.remap_nginx]
type = "remap"
inputs = ["source_nginx"]
source = """
parsed_log, err = parse_nginx_log(.message, "combined")
status = parsed_log.status
request = string!(parsed_log.request || "")
if is_null(err) && status == 200 {
parsed_path, err = parse_regex(request, r'^GET /+(?P<distro>[^/? ]+)')
distro = parsed_path.distro
ignore = [
"server-status", "stats", "robots.txt",
"include", "pub", "news", "index.html", "sync.json", "ups",
"pool", "dists", "csclub.asc", "csclub.gpg"
]
if (
is_null(err) && !includes(ignore, distro) && !contains(request, "..") &&
!starts_with(request, "#") && !starts_with(request, "%") && !starts_with(request, ".")
) {
. = {
"distro": distro,
"user_agent": parsed_log.agent,
"ip_address": parsed_log.client,
"bytes_sent": parsed_log.size,
"timestamp": parsed_log.timestamp,
"job": "vector-mirror"
}
}
}
"""

...
</pre>

Finally, here's the aggregator config, which collects data from each agent and then inserts it into ClickHouse:
<pre>
[enrichment_tables.enrich_geoip]
type = "geoip"
path = "/usr/share/GeoIP/GeoLite2-City.mmdb"

[sources.source_agents]
type = "vector"
address = "[::]:5045"
[sources.source_agents.tls]
enabled = true
ca_file = "/etc/vector/agent.crt"
crt_file = "/etc/vector/aggregator.crt"
key_file = "/etc/vector/aggregator.key"
verify_hostname = false

[transforms.transform_route]
type = "route"
inputs = ["source_agents"]
route.sshd = '.job == "vector-sshd"'
route.mirror = '.job == "vector-mirror"'

[transforms.transform_sshd]
type = "remap"
inputs = ["transform_route.sshd"]
source = """
ipinfo = get_enrichment_table_record("enrich_geoip", {"ip": .ip_address}) ?? {}
if is_ipv4!(.ip_address) && (ip_cidr_contains!("10.0.0.0/8", .ip_address) \
|| ip_cidr_contains!("172.16.0.0/12", .ip_address) \
|| ip_cidr_contains!("192.168.0.0/16", .ip_address)) {
ipinfo.country_code = "CA";
ipinfo.region_name = "Ontario";
ipinfo.city_name = "Waterloo";
}
. = {
"host": .host,
"timestamp": .timestamp,
"username": .username,
"ip_address": ip_to_ipv6!(.ip_address),
"country_code": ipinfo.country_code || ""
}
"""

[transforms.transform_mirror]
type = "remap"
inputs = ["transform_route.mirror"]
source = """
ipinfo = get_enrichment_table_record("enrich_geoip", {"ip": .ip_address}) ?? {}
if is_ipv4!(.ip_address) && (ip_cidr_contains!("10.0.0.0/8", .ip_address) \
|| ip_cidr_contains!("172.16.0.0/12", .ip_address) \
|| ip_cidr_contains!("192.168.0.0/16", .ip_address)) {
ipinfo.country_code = "CA";
ipinfo.region_name = "Ontario";
ipinfo.city_name = "Waterloo";
}
. = {
"distro": .distro,
"timestamp": .timestamp,
"ip_address": ip_to_ipv6!(.ip_address),
"bytes_sent": .bytes_sent,
"user_agent": .user_agent,
"country_code": ipinfo.country_code || "",
"region_name": ipinfo.region_name || "",
"city": ipinfo.city_name || ""
}
"""

[transforms.transform_unmatched]
type = "remap"
inputs = ["transform_route._unmatched"]
source = """
log("unrecognized job: " + string!(.job || "null"), level: "warn")
"""

[sinks.sink_unmatched]
type = "blackhole"
inputs = ["transform_unmatched"]
print_interval_secs = 0

[sinks.clickhouse_sshd]
type = "clickhouse"
inputs = ["transform_sshd"]
encoding.timestamp_format = "unix"
endpoint = "$CLICKHOUSE_ENDPOINT"
database = "$CLICKHOUSE_DATABASE"
table = "failed_ssh_logins"
[sinks.clickhouse_sshd.auth]
strategy = "basic"
user = "$CLICKHOUSE_USER"
password = "$CLICKHOUSE_PASSWORD"

...
</pre>

=== Beats, Logstash and Loki (old) ===
We previously used Elastic Beats, Logstash and Grafana Loki for collecting and storing logs. One day I tried to upgrade Logstash and it exploded so badly that I figured it would be easier to just switch to Vector instead.

<b>The sections below are kept for historical purposes only and are no longer accurate.</b>

We use a combination of [https://www.elastic.co/beats/ Elastic Beats], [https://www.elastic.co/logstash/ Logstash] and [https://grafana.com/oss/loki/ Loki] for collecting, storing and querying our logs; for visualization, we use Grafana. Logstash and Loki are currently both running in the prometheus VM.

The reason why I chose Loki over Elasticsearch is because Loki is <i>very</i> space efficient with regards to storage. It also consumes way less RAM and CPU. This means that we can collect a lot of logs without worrying too much about resource usage.

We have Journalbeat and/or Filebeat running on some of our machines to collect logs from sshd, Apache and NGINX. The Beats send these logs to Logstash, which does some pre-processing. The most useful contribution by Logstash is its GeoIP plugin, which allows us to enrich the logs with some geographical information from IP addresses (e.g. add city and country). Logstash sends these logs to Loki, and we can then view these from Grafana.

'''Note''': Sometimes the Loki output plugin for Logstash disappears after a reboot or an upgrade. If you see Logstash complaining about this in the journald logs, run this:
<pre>
cd /usr/share/logstash
bin/logstash-plugin install logstash-output-loki
systemctl restart logstash
</pre>
See [https://grafana.com/docs/loki/latest/clients/logstash/ here] for details.

The language for querying logs in Loki is [https://grafana.com/docs/loki/latest/logql/ LogQL], which, syntactically, is very similar to PromQL. If you have already learned PromQL, then you should be able to pick up LogQL very easily. You can try out some LogQL queries from the 'Explore' page on Grafana; make sure you toggle the data source to 'Loki' in the top left corner. For the 'topk' queries, you will also want to toggle 'Query type' to 'Instant' rather than 'Range'.

==== LogQL examples ====
Here are the number of failed SSH login attempts for each host for a given time range:
<pre>
sum by (hostname) (
count_over_time(
{job="logstash-sshd"} [$__range]
)
)
</pre>
Note that <code>$__range</code> is a special [https://grafana.com/docs/grafana/latest/variables/variable-types/global-variables/ global variable] in Grafana which is equal to the time range in the top right corner of a chart.
<br><br>
Here are the top 10 IP addresses from which failed SSH login attempts arrived, for a given host and time range:
<pre>
topk(10,
sum by (ip_address) (
count_over_time(
{job="logstash-sshd",hostname="$hostname"} | json | __error__ = ""
[$__range]
)
)
)
</pre>
$hostname is a chart variable, which can be configured from a chart's settings.

I configured Logstash to send logs to Loki as JSON, but it's a rather hacky solution, so occasionally invalid JSON is sent.
<br><br>
Here are the number of HTTP requests for the 15 distros on our mirror from the last hour:
<pre>
topk(15,
sum by (distro) (
count_over_time(
{job="logstash-nginx"} | json | __error__ = "" | distro != "server-status"
[1h]
)
)
)
</pre>
<br><br>
Here are the number of total bytes sent over HTTP for the top 15 distros from the last hour. Note the use of the <code>unwrap</code> operator.
<pre>
topk(15,
sum by (distro) (
sum_over_time(
{job="logstash-nginx"} | json | __error__ = "" | distro != "server-status" | unwrap bytes
[1h]
)
)
)
</pre>
You can see more examples on the Mirror Requests dashboard on Grafana.

==== Avoid high cardinality ====
For both Prometheus and Loki, you must [https://prometheus.io/docs/practices/naming/#labels avoid high cardinality] labels at all costs. By high cardinality, I mean labels which can take on a very large number of values; for example, using a label to store IP addresses would be a very bad idea. This is because Prometheus and Loki use labels to store metrics/logs efficiently with compression; when two metrics have two different sets of labels, they cannot be stored together, which increases the storage space usage.

With Loki, you can extract labels from your logs inside your query dynamically. One way to do this is with the <code>json</code> operator; there are other ways to do this as well (see the LogQL docs). This basically means that we get infinite cardinality from our logs, the tradeoff being that queries may take longer to execute.

Also, be very careful about what you send to Loki from Logstash - [https://grafana.com/docs/loki/latest/clients/logstash/#usage-and-configuration every field in a Logstash message becomes a Loki label]. Usage of the <code>prune</code> command in Logstash is highly recommended.

Latest revision as of 21:06, 15 December 2023

There are three pillars of observability: metrics, logging and tracing. We are only interested in the first two.

Metrics

All of our machines are, or at least should be, running the Prometheus node exporter. This collects and sends machine metrics (e.g. RAM used, disk space) to the Prometheus server running at https://prometheus.csclub.uwaterloo.ca (currently a VM on phosphoric-acid). There are a few specialized exporters running on several other machines; a Postfix exporter is running on mail, an Apache exporter is running on caffeine, and an NGINX expoter is running on potassium-benzoate. There is also a custom exporter written by syscom running on potassium-benzoate for mirror stats.

Most of the exporters use mutual TLS authentication with the Prometheus server. I set the expiration date for the TLS certs to 10 years. If you are reading this and it is 2031 or later, then go update the certs.

I highly suggest becoming familiar with PromQL, the query language for Prometheus. You can run and visualize some queries at https://prometheus.csclub.uwaterloo.ca/prometheus. For example, here is a query to determine which machines are up or down:

up{job="node_exporter"}


Here's how we determine if a machine has NFS mounted. This will return 1 for machines which have NFS mounted, but will not return any records for machines which do not have NFS mounted. (We ignore the actual value of node_filesystem_device_error because it returns 1 for machines using Kerberized NFS.)

count by (instance) (node_filesystem_device_error{mountpoint="/users", fstype="nfs"})


Now this is a rather complicated expression which can return one of three values:

  • 0: the machine is down
  • 1: the machine is up, but NFS is not mounted
  • 2: the machine is up and NFS is mounted

The or operator in PromQL is key here.

sum by (instance) (
  (count by (instance) (node_filesystem_device_error{mountpoint="/users", fstype="nfs"}))
  or up{job="node_exporter"}
)


We also use AlertManager to send email alerts from Prometheus metrics. We should figure out how to also send messages to IRC or similar.

We also use the Blackbox prober exporter to check if some of our web-based services are up.

We make some pretty charts on Grafana (https://prometheus.csclub.uwaterloo.ca) from PromQL queries. Grafana also has an 'Explorer' page where you can test out some queries before making chart panels from them.

Logging

We now use Vector for collecting and transforming logs, and ClickHouse for storing log data.

ClickHouse

ClickHouse is a very fast OLAP database which has great documentation for storing and analyzing logging and metrics. Unfortunately, the CPU on phosphoric-acid (which hosts the prometheus VM) is so old that when we try to install the official deb package, the following error occurs:

Instruction check fail. The CPU does not support SSSE3 instruction set.

So we're going to download the "compat" version instead:

cd /root
wget https://s3.amazonaws.com/clickhouse-builds/master/amd64compat/clickhouse
chmod +x clickhouse
./clickhouse install
rm clickhouse
wget -O /etc/systemd/system/clickhouse-server.service https://github.com/ClickHouse/ClickHouse/raw/master/packages/clickhouse-server.service
systemctl daemon-reload
systemctl enable clickhouse-server

By default, systemd limits the number of threads which a service can create, so we'll want to disable that. Run systemctl edit clickhouse-server and paste the following:

[Service]
TasksMax=infinity

Next, paste the following into /etc/clickhouse-server/users.d/csclub-users.xml:

<clickhouse>
  <profiles>
    <default>
      <!-- disable logs (using too much disk space) -->
      <log_queries replace="replace">0</log_queries>
      <log_query_threads replace="replace">0</log_query_threads>
    </default>
    <readonly>
      <!-- Grafana needs to change settings queries -->
      <readonly>2</readonly>
    </readonly>
  </profiles>
  <users>
    <default>
      <!-- The default user should only be allowed to connect from localhost -->
      <networks>
        <ip>::1</ip>
        <ip>127.0.0.1</ip>
      </networks>
      <!-- Allow the default user to create new users -->
      <access_management>1</access_management>
      <named_collection_control>1</named_collection_control>
      <show_named_collections>1</show_named_collections>
      <show_named_collections_secrets>1</show_named_collections_secrets>
    </default>
  </users>
</clickhouse>

Then paste the following into /etc/clickhouse-server/config/zzz-csclub.xml (we need the zzz prefix because the configuration files are merged in alphabetical order, and we want ours to be applied last):

<clickhouse>
  <listen_host>127.0.0.1</listen_host>
  <listen_host>::1</listen_host>
  <logger>
    <level>information</level>
    <size>100M</size>
    <count>10</count>
  </logger>
  <mysql_port></mysql_port>
  <postgresql_port></postgresql_port>

  <!-- disable logs (using too much disk space) -->
  <asynchronous_metric_log remove="1"/>
  <metric_log remove="1"/>
  <query_thread_log remove="1" />
  <query_log remove="1" />
  <query_views_log remove="1" />
  <part_log remove="1"/>
  <session_log remove="1"/>
  <text_log remove="1" />
  <trace_log remove="1"/>
</clickhouse>

Then run systemctl restart clickhouse-server and make sure that it's running.

Schema

Run clickhouse-client to get a SQL shell. First we need to create a new database and some users:

CREATE DATABASE vector;
CREATE USER vector IDENTIFIED BY 'REPLACE_ME';
GRANT ALL ON vector.* TO vector;
CREATE USER grafana IDENTIFIED BY 'REPLACE_ME' SETTINGS PROFILE 'readonly';
GRANT SHOW DATABASES, SHOW TABLES, SELECT, DICTGET ON *.* TO grafana;

In some of our tables, we'll store the two-letter country code instead of a country's full name to save space. So we'll create a dictionary so that we can look up a country's full name. Exit the SQL shell, then, download the CSV file:

wget -O /var/lib/clickhouse/user_files/country_codes.csv 'https://datahub.io/core/country-list/r/data.csv'

Then run clickhouse-client and create the dictionary:

CREATE DICTIONARY vector.country_codes_dictionary
(
    Name String,
    Code String
)
PRIMARY KEY Code
SOURCE(FILE(path '/var/lib/clickhouse/user_files/country_codes.csv' FORMAT 'CSVWithNames'))
LIFETIME(MIN 0 MAX 0)
LAYOUT(HASHED_ARRAY());

Perform a SELECT to fill the table:

SELECT * FROM country_codes_dictionary;

Now we need to create the tables for storing our actual log data (after they are transformed by Vector). Create a table for failed SSH logins:

CREATE TABLE vector.failed_ssh_logins
(
    host LowCardinality(String),
    timestamp DateTime,
    ip_address IPv6,
    username String,
    country_code LowCardinality(String)
)
ENGINE = MergeTree()
PRIMARY KEY (host, timestamp)
TTL timestamp + INTERVAL 1 MONTH DELETE;

Create a table for storing mirror requests:

CREATE TABLE vector.mirror_requests
(
    distro LowCardinality(String),
    timestamp DateTime CODEC(Delta, ZSTD),
    ip_address IPv6,
    bytes_sent UInt64,
    user_agent String,
    country_code LowCardinality(String),
    region_name String,
    city String
)
ENGINE = MergeTree()
PRIMARY KEY (distro, timestamp, country_code, region_name, city)
TTL timestamp + INTERVAL 1 WEEK DELETE;

One of ClickHouse's great features is Materialized Views. These allow us to automatically "forward" data from one table to another, and the second table can use a different storage engine to aggregate data and save space.

We want to calculate the total number of requests and bytes sent for each distro, so let's create a table and view for that:

CREATE TABLE vector.mirror_requests_agg_by_distro
(
    distro LowCardinality(String),
    date Date CODEC(Delta, ZSTD),
    country_code LowCardinality(String),
    num_requests UInt64,
    bytes_sent UInt64
)
ENGINE = SummingMergeTree((num_requests, bytes_sent))
PRIMARY KEY (distro, toStartOfMonth(date), date, country_code)
TTL date + INTERVAL 1 MONTH
        GROUP BY distro, toStartOfMonth(date)
        SET num_requests = sum(num_requests),
            bytes_sent = sum(bytes_sent),
    date + INTERVAL 2 YEAR DELETE;

CREATE MATERIALIZED VIEW vector.mirror_requests_agg_by_distro_mv
TO vector.mirror_requests_agg_by_distro
AS
SELECT
    distro,
    toDate(timestamp) AS date,
    country_code,
    sum(1) AS num_requests,
    sum(bytes_sent) AS bytes_sent
FROM vector.mirror_requests
GROUP BY distro, date, country_code;

We also wants some stats for Canada specifically:

CREATE TABLE vector.mirror_requests_agg_canada
(
    distro LowCardinality(String),
    date Date CODEC(Delta, ZSTD),
    region_name LowCardinality(String),
    city String,
    bytes_sent UInt64,
    num_requests UInt64
)
ENGINE = SummingMergeTree((bytes_sent, num_requests))
PRIMARY KEY (distro, toStartOfMonth(date), date, region_name, city)
TTL date + INTERVAL 1 MONTH
        GROUP BY distro, toStartOfMonth(date)
        SET num_requests = sum(num_requests),
            bytes_sent = sum(bytes_sent),
    date + INTERVAL 2 YEAR DELETE;

CREATE MATERIALIZED VIEW vector.mirror_requests_agg_canada_mv
TO vector.mirror_requests_agg_canada
AS
SELECT
    distro,
    toDate(timestamp) as date,
    region_name,
    city,
    sum(bytes_sent) AS bytes_sent,
    sum(1) AS num_requests
FROM vector.mirror_requests
WHERE country_code = 'CA'
GROUP BY distro, date, region_name, city;

We also want to keep stats just for the university:

CREATE TABLE vector.mirror_requests_agg_uw
(
    distro LowCardinality(String),
    date Date CODEC(Delta, ZSTD),
    bytes_sent UInt64,
    num_requests UInt64
)
ENGINE = SummingMergeTree((bytes_sent, num_requests))
PRIMARY KEY (distro, toStartOfMonth(date), date)
TTL date + INTERVAL 1 MONTH
        GROUP BY distro, toStartOfMonth(date)
        SET num_requests = sum(num_requests),
            bytes_sent = sum(bytes_sent),
    date + INTERVAL 2 YEAR DELETE;

CREATE MATERIALIZED VIEW vector.mirror_requests_agg_uw_mv
TO vector.mirror_requests_agg_uw
AS
SELECT
    distro,
    toDate(timestamp) as date,
    sum(bytes_sent) AS bytes_sent,
    sum(1) AS num_requests
FROM vector.mirror_requests
WHERE isIPAddressInRange(IPv6NumToString(ip_address), '::ffff:129.97.0.0/112')
   OR isIPAddressInRange(IPv6NumToString(ip_address), '::ffff:10.0.0.0/104')
   OR isIPAddressInRange(IPv6NumToString(ip_address), '::ffff:172.16.0.0/108')
   OR isIPAddressInRange(IPv6NumToString(ip_address), '::ffff:192.168.0.0/112')
   OR isIPAddressInRange(IPv6NumToString(ip_address), '2620:101:f000::/47')
   OR isIPAddressInRange(IPv6NumToString(ip_address), 'fd74:6b6a:8eca::/47')
GROUP BY distro, date, region_name, city;

Finally, we'll store some stats for IP subnets:

CREATE TABLE vector.mirror_requests_agg_ip
(
    timestamp DateTime CODEC(Delta, ZSTD),
    cidr_start IPv6,
    country_code LowCardinality(String),
    num_requests UInt64,
    bytes_sent UInt64
)
ENGINE = SummingMergeTree((num_requests, bytes_sent))
PRIMARY KEY (timestamp, cidr_start, country_code)
TTL timestamp + toIntervalWeek(2);

CREATE MATERIALIZED VIEW vector.mirror_requests_agg_ip_mv TO vector.mirror_requests_agg_ip AS
SELECT
    toStartOfFiveMinutes(timestamp) AS timestamp,
    IPv6CIDRToRange(ip_address, 120).1 AS cidr_start,
    country_code,
    sum(1) AS num_requests,
    sum(bytes_sent) AS bytes_sent
FROM vector.mirror_requests
GROUP BY
    timestamp,
    cidr_start,
    country_code;

GeoIP database

We'll want to look up geographic information for the IP addresses in our data. To do this, we'll use the MaxMind GeoLite2 databases. Syscom already has a MaxMind account; the password is stored in the usual place. Install the latest geoipupdate package from here, then edit /etc/GeoIP.conf as necessary (use the syscom account ID and license key). Set EditionIDs to GeoLite2-City only.

We'll use a systemd timer to run the geoipupdate script periodically. Paste the following into /etc/systemd/system/geoipupdate.service:

[Unit]
Description=GeoIP Update
Documentation=https://dev.maxmind.com/geoip/updating-databases
After=network-online.target

[Service]
Type=oneshot
ExecStart=/usr/bin/geoipupdate
Nice=19
IOSchedulingClass=idle
IOSchedulingPriority=7
ProtectSystem=strict
ReadWritePaths=/usr/share/GeoIP
ProtectHome=true
PrivateTmp=true
PrivateDevices=true
ProtectHostname=true
ProtectClock=true
ProtectKernelTunables=true
ProtectKernelModules=true
ProtectKernelLogs=true
ProtectControlGroups=true
LockPersonality=true
RestrictRealtime=true

Run systemctl daemon-reload and then systemctl start geoipupdate to download the database for the first time.

Now paste the following into /etc/systemd/system/geoipupdate.timer:

[Unit]
Description=Automatic GeoIP database update
Documentation=https://dev.maxmind.com/geoip/updating-databases

[Timer]
OnCalendar=monthly
RandomizedDelaySec=12h
Persistent=true

[Install]
WantedBy=timers.target

Then run:

systemctl daemon-reload
systemctl enable geoipupdate.timer
systemctl start geoipupdate.timer

Vector

Vector allows you to create directed acyclic graphs (DAGs) for collecting and processing logs, which gives us a lot of flexibility. It also has a built-in scripting language, Vector Remap Language (VRL) for slicing and dicing data. This allows us to remove fields which we don't need, add new fields which we do need, enrich an event with extra data, etc.

Our data pipeline looks like this: Vector agents -> Vector aggregator -> ClickHouse. We use Grafana for visualization.

We use mutual TLS between the agents and the aggregator to make sure that random people can't send us garbage data:

openssl req -newkey rsa:2048 -nodes -keyout aggregator.key -x509 -out aggregator.crt -days 36500
openssl req -newkey rsa:2048 -nodes -keyout agent.key -x509 -out agent.crt -days 36500
chown vector:vector *.crt *.key

Here is what our vector.toml looks like on the general-use machines; currently, we only use it for collecting failed SSH login attempts.

[sources.source_sshd]
type = "journald"
include_units = ["ssh.service"]

[transforms.remap_sshd]
type = "remap"
inputs = ["source_sshd"]
source = """
  parsed, err = parse_regex(
    .message, r'^(?:Connection (?:closed|reset)|Disconnected) (?:by|from) (?:invalid|authenticating) user (?P<user>[^ ]+) (?P<ip>[0-9.a-f:]+)'
  )
  if is_null(err) {
    . = {
      "username": parsed.user,
      "ip_address": parsed.ip,
      "host": .host,
      "timestamp": .timestamp,
      "job": "vector-sshd"
    }
  }
"""

[transforms.filter_sshd]
type = "filter"
inputs = ["remap_sshd"]
condition = '.job == "vector-sshd"'

[sinks.aggregator]
type = "vector"
inputs = ["filter_sshd"]
address = "prometheus:5045"
  [sinks.aggregator.tls]
  enabled = true
  ca_file = "/etc/vector/aggregator.crt"
  crt_file = "/etc/vector/agent.crt"
  key_file = "/etc/vector/agent.key"
  verify_hostname = false

The agent on potassium-benzoate collects NGINX logs:

[sources.source_nginx]
type = "file"
include = ["/var/log/nginx/access.log"]
max_read_bytes = 65536

[transforms.remap_nginx]
type = "remap"
inputs = ["source_nginx"]
source = """ 
  parsed_log, err = parse_nginx_log(.message, "combined")
  status = parsed_log.status
  request = string!(parsed_log.request || "")
  if is_null(err) && status == 200 {
    parsed_path, err = parse_regex(request, r'^GET /+(?P<distro>[^/? ]+)')
    distro = parsed_path.distro
    ignore = [
      "server-status", "stats", "robots.txt",
      "include", "pub", "news", "index.html", "sync.json", "ups",
      "pool", "dists", "csclub.asc", "csclub.gpg"
    ]
    if (
      is_null(err) && !includes(ignore, distro) && !contains(request, "..") &&
      !starts_with(request, "#") && !starts_with(request, "%") && !starts_with(request, ".")
    ) {
      . = {
        "distro": distro,
        "user_agent": parsed_log.agent,
        "ip_address": parsed_log.client,
        "bytes_sent": parsed_log.size,
        "timestamp": parsed_log.timestamp,
        "job": "vector-mirror"
      }
    }
  }
"""

...

Finally, here's the aggregator config, which collects data from each agent and then inserts it into ClickHouse:

[enrichment_tables.enrich_geoip]                                                    
type = "geoip"                                                                      
path = "/usr/share/GeoIP/GeoLite2-City.mmdb"

[sources.source_agents]      
type = "vector"
address = "[::]:5045"
  [sources.source_agents.tls]
  enabled = true
  ca_file = "/etc/vector/agent.crt"
  crt_file = "/etc/vector/aggregator.crt"
  key_file = "/etc/vector/aggregator.key"
  verify_hostname = false

[transforms.transform_route]
type = "route"
inputs = ["source_agents"]
route.sshd = '.job == "vector-sshd"'
route.mirror = '.job == "vector-mirror"'

[transforms.transform_sshd]
type = "remap"
inputs = ["transform_route.sshd"]
source = """ 
  ipinfo = get_enrichment_table_record("enrich_geoip", {"ip": .ip_address}) ?? {}
  if is_ipv4!(.ip_address) && (ip_cidr_contains!("10.0.0.0/8", .ip_address) \
                            || ip_cidr_contains!("172.16.0.0/12", .ip_address) \
                            || ip_cidr_contains!("192.168.0.0/16", .ip_address)) {
    ipinfo.country_code = "CA";
    ipinfo.region_name = "Ontario";
    ipinfo.city_name = "Waterloo";
  }
  . = {
    "host": .host,
    "timestamp": .timestamp,
    "username": .username,
    "ip_address": ip_to_ipv6!(.ip_address),
    "country_code": ipinfo.country_code || ""
  }
"""

[transforms.transform_mirror]
type = "remap"
inputs = ["transform_route.mirror"]
source = """
  ipinfo = get_enrichment_table_record("enrich_geoip", {"ip": .ip_address}) ?? {}
  if is_ipv4!(.ip_address) && (ip_cidr_contains!("10.0.0.0/8", .ip_address) \
                            || ip_cidr_contains!("172.16.0.0/12", .ip_address) \
                            || ip_cidr_contains!("192.168.0.0/16", .ip_address)) {
    ipinfo.country_code = "CA";
    ipinfo.region_name = "Ontario";
    ipinfo.city_name = "Waterloo";
  }
  . = {
    "distro": .distro,
    "timestamp": .timestamp,
    "ip_address": ip_to_ipv6!(.ip_address),
    "bytes_sent": .bytes_sent,
    "user_agent": .user_agent,
    "country_code": ipinfo.country_code || "",
    "region_name": ipinfo.region_name || "",
    "city": ipinfo.city_name || ""
  }
"""

[transforms.transform_unmatched]
type = "remap"
inputs = ["transform_route._unmatched"]
source = """
  log("unrecognized job: " + string!(.job || "null"), level: "warn")
"""

[sinks.sink_unmatched]
type = "blackhole"
inputs = ["transform_unmatched"]
print_interval_secs = 0

[sinks.clickhouse_sshd]
type = "clickhouse"
inputs = ["transform_sshd"]
encoding.timestamp_format = "unix"
endpoint = "$CLICKHOUSE_ENDPOINT"
database = "$CLICKHOUSE_DATABASE"
table = "failed_ssh_logins"
  [sinks.clickhouse_sshd.auth]
  strategy = "basic"
  user = "$CLICKHOUSE_USER"
  password = "$CLICKHOUSE_PASSWORD"

...

Beats, Logstash and Loki (old)

We previously used Elastic Beats, Logstash and Grafana Loki for collecting and storing logs. One day I tried to upgrade Logstash and it exploded so badly that I figured it would be easier to just switch to Vector instead.

The sections below are kept for historical purposes only and are no longer accurate.

We use a combination of Elastic Beats, Logstash and Loki for collecting, storing and querying our logs; for visualization, we use Grafana. Logstash and Loki are currently both running in the prometheus VM.

The reason why I chose Loki over Elasticsearch is because Loki is very space efficient with regards to storage. It also consumes way less RAM and CPU. This means that we can collect a lot of logs without worrying too much about resource usage.

We have Journalbeat and/or Filebeat running on some of our machines to collect logs from sshd, Apache and NGINX. The Beats send these logs to Logstash, which does some pre-processing. The most useful contribution by Logstash is its GeoIP plugin, which allows us to enrich the logs with some geographical information from IP addresses (e.g. add city and country). Logstash sends these logs to Loki, and we can then view these from Grafana.

Note: Sometimes the Loki output plugin for Logstash disappears after a reboot or an upgrade. If you see Logstash complaining about this in the journald logs, run this:

cd /usr/share/logstash
bin/logstash-plugin install logstash-output-loki
systemctl restart logstash

See here for details.

The language for querying logs in Loki is LogQL, which, syntactically, is very similar to PromQL. If you have already learned PromQL, then you should be able to pick up LogQL very easily. You can try out some LogQL queries from the 'Explore' page on Grafana; make sure you toggle the data source to 'Loki' in the top left corner. For the 'topk' queries, you will also want to toggle 'Query type' to 'Instant' rather than 'Range'.

LogQL examples

Here are the number of failed SSH login attempts for each host for a given time range:

sum by (hostname) (
  count_over_time(
    {job="logstash-sshd"} [$__range]
  )
)

Note that $__range is a special global variable in Grafana which is equal to the time range in the top right corner of a chart.

Here are the top 10 IP addresses from which failed SSH login attempts arrived, for a given host and time range:

topk(10,
  sum by (ip_address) (
    count_over_time(
      {job="logstash-sshd",hostname="$hostname"} | json | __error__ = ""
      [$__range]
    )
  )
)

$hostname is a chart variable, which can be configured from a chart's settings.

I configured Logstash to send logs to Loki as JSON, but it's a rather hacky solution, so occasionally invalid JSON is sent.

Here are the number of HTTP requests for the 15 distros on our mirror from the last hour:

topk(15,
  sum by (distro) (
    count_over_time(
      {job="logstash-nginx"} | json | __error__ = "" | distro != "server-status"
      [1h]
    )
  )
)



Here are the number of total bytes sent over HTTP for the top 15 distros from the last hour. Note the use of the unwrap operator.

topk(15,
  sum by (distro) (
    sum_over_time(
      {job="logstash-nginx"} | json | __error__ = "" | distro != "server-status" | unwrap bytes
      [1h]
    )
  )
)

You can see more examples on the Mirror Requests dashboard on Grafana.

Avoid high cardinality

For both Prometheus and Loki, you must avoid high cardinality labels at all costs. By high cardinality, I mean labels which can take on a very large number of values; for example, using a label to store IP addresses would be a very bad idea. This is because Prometheus and Loki use labels to store metrics/logs efficiently with compression; when two metrics have two different sets of labels, they cannot be stored together, which increases the storage space usage.

With Loki, you can extract labels from your logs inside your query dynamically. One way to do this is with the json operator; there are other ways to do this as well (see the LogQL docs). This basically means that we get infinite cardinality from our logs, the tradeoff being that queries may take longer to execute.

Also, be very careful about what you send to Loki from Logstash - every field in a Logstash message becomes a Loki label. Usage of the prune command in Logstash is highly recommended.