Observability: Difference between revisions
m (→Vector) |
m (→ClickHouse) |
||
Line 81: | Line 81: | ||
</clickhouse> |
</clickhouse> |
||
</pre> |
</pre> |
||
Then paste the following into /etc/clickhouse-server/config/ |
Then paste the following into /etc/clickhouse-server/config/zzz-csclub.xml (we need the zzz prefix because the configuration files are merged in alphabetical order, and we want ours to be applied last): |
||
<pre> |
<pre> |
||
<clickhouse> |
<clickhouse> |
||
<listen_host>127.0.0.1</listen_host> |
|||
<listen_host>::1</listen_host> |
|||
<logger> |
<logger> |
||
<level>information</level> |
<level>information</level> |
Revision as of 13:27, 14 October 2023
There are three pillars of observability: metrics, logging and tracing. We are only interested in the first two.
Metrics
All of our machines are, or at least should be, running the Prometheus node exporter. This collects and sends machine metrics (e.g. RAM used, disk space) to the Prometheus server running at https://prometheus.csclub.uwaterloo.ca (currently a VM on phosphoric-acid). There are a few specialized exporters running on several other machines; a Postfix exporter is running on mail, an Apache exporter is running on caffeine, and an NGINX expoter is running on potassium-benzoate. There is also a custom exporter written by syscom running on potassium-benzoate for mirror stats.
Most of the exporters use mutual TLS authentication with the Prometheus server. I set the expiration date for the TLS certs to 10 years. If you are reading this and it is 2031 or later, then go update the certs.
I highly suggest becoming familiar with PromQL, the query language for Prometheus. You can run and visualize some queries at https://prometheus.csclub.uwaterloo.ca/prometheus. For example, here is a query to determine which machines are up or down:
up{job="node_exporter"}
Here's how we determine if a machine has NFS mounted. This will return 1 for machines which have NFS mounted, but will not return any records for machines which do not have NFS mounted. (We ignore the actual value of node_filesystem_device_error because it returns 1 for machines using Kerberized NFS.)
count by (instance) (node_filesystem_device_error{mountpoint="/users", fstype="nfs"})
Now this is a rather complicated expression which can return one of three values:
- 0: the machine is down
- 1: the machine is up, but NFS is not mounted
- 2: the machine is up and NFS is mounted
The or operator in PromQL is key here.
sum by (instance) ( (count by (instance) (node_filesystem_device_error{mountpoint="/users", fstype="nfs"})) or up{job="node_exporter"} )
We also use AlertManager to send email alerts from Prometheus metrics. We should figure out how to also send messages to IRC or similar.
We also use the Blackbox prober exporter to check if some of our web-based services are up.
We make some pretty charts on Grafana (https://prometheus.csclub.uwaterloo.ca) from PromQL queries. Grafana also has an 'Explorer' page where you can test out some queries before making chart panels from them.
Logging
We now use Vector for collecting and transforming logs, and ClickHouse for storing log data.
ClickHouse
ClickHouse is a very fast OLAP database which has great documentation for storing and analyzing logging and metrics. Unfortunately, the CPU on phosphoric-acid (which hosts the prometheus VM) is so old that when we try to install the official deb package, the following error occurs:
Instruction check fail. The CPU does not support SSSE3 instruction set.
So we're going to download the "compat" version instead:
cd /root wget https://s3.amazonaws.com/clickhouse-builds/master/amd64compat/clickhouse chmod +x clickhouse ./clickhouse install rm clickhouse wget -O /etc/systemd/system/clickhouse-server.service https://github.com/ClickHouse/ClickHouse/raw/master/packages/clickhouse-server.service systemctl daemon-reload systemctl enable clickhouse-server
By default, systemd limits the number of threads which a service can create, so we'll want to disable that. Run systemctl edit clickhouse-server
and paste the following:
[Service] TasksMax=infinity
Next, paste the following into /etc/clickhouse-server/users.d/csclub-users.xml:
<clickhouse> <readonly> <!-- Grafana needs to change settings queries --> <readonly>2</readonly> </readonly> <users> <default> <!-- The default user should only be allowed to connect from localhost --> <networks> <ip>::1</ip> <ip>127.0.0.1</ip> </networks> <!-- Allow the default user to create new users --> <access_management>1</access_management> <named_collection_control>1</named_collection_control> <show_named_collections>1</show_named_collections> <show_named_collections_secrets>1</show_named_collections_secrets> </default> </users> </clickhouse>
Then paste the following into /etc/clickhouse-server/config/zzz-csclub.xml (we need the zzz prefix because the configuration files are merged in alphabetical order, and we want ours to be applied last):
<clickhouse> <listen_host>127.0.0.1</listen_host> <listen_host>::1</listen_host> <logger> <level>information</level> <size>100M</size> <count>10</count> </logger> <mysql_port></mysql_port> <postgresql_port></postgresql_port> </clickhouse>
Then run systemctl restart clickhouse-server
and make sure that it's running.
Schema
Run clickhouse-client
to get a SQL shell. First we need to create a new database and some users:
CREATE DATABASE vector; CREATE USER vector IDENTIFIED BY 'REPLACE_ME'; GRANT ALL ON vector.* TO vector; CREATE USER grafana IDENTIFIED BY 'REPLACE_ME' SETTINGS PROFILE 'readonly'; GRANT SHOW DATABASES, SHOW TABLES, SELECT ON *.* TO grafana;
In some of our tables, we'll store the two-letter country code instead of a country's full name to save space. So we'll create a dictionary so that we can look up a country's full name. Exit the SQL shell, then, download the CSV file:
wget -O /var/lib/clickhouse/user_files/country_codes.csv 'https://datahub.io/core/country-list/r/data.csv'
Then run clickhouse-client
and create the dictionary:
CREATE DICTIONARY country_codes_dictionary ( Name String, Code String ) PRIMARY KEY Code SOURCE(FILE(path '/var/lib/clickhouse/user_files/country_codes.csv' FORMAT 'CSVWithNames')) LIFETIME(MIN 0 MAX 0) LAYOUT(HASHED_ARRAY());
Perform a SELECT to fill the table:
SELECT * FROM country_codes_dictionary;
Now exit the SQL shell and delete the CSV file since we don't need it anymore:
rm /var/lib/clickhouse/user_files/country_codes.csv
Now we need to create the tables for storing our actual log data (after they are transformed by Vector). Create a table for failed SSH logins:
CREATE TABLE vector.failed_ssh_logins ( host LowCardinality(String), timestamp DateTime, ip_address IPv6, username String, country_code LowCardinality(String) ) ENGINE = MergeTree() PRIMARY KEY (host, timestamp) TTL timestamp + INTERVAL 1 MONTH DELETE;
Create a table for storing mirror requests:
CREATE TABLE vector.mirror_requests ( distro LowCardinality(String), timestamp DateTime CODEC(Delta, ZSTD), ip_address IPv6, bytes_sent UInt64, user_agent String, country_code LowCardinality(String), region_name String, city String ) ENGINE = MergeTree() PRIMARY KEY (distro, timestamp, country_code, region_name, city) TTL timestamp + INTERVAL 1 WEEK DELETE;
One of ClickHouse's great features is Materialized Views. These allow us to automatically "forward" data from one table to another, and the second table can use a different storage engine to aggregate data and save space.
We want to calculate the total number of requests and bytes sent for each distro, so let's create a table and view for that:
CREATE TABLE vector.mirror_requests_agg_by_distro ( distro LowCardinality(String), date Date CODEC(Delta, ZSTD), country_code LowCardinality(String), num_requests UInt64, bytes_sent UInt64 ) ENGINE = SummingMergeTree((num_requests, bytes_sent)) PRIMARY KEY (distro, toStartOfMonth(date), date, country_code) TTL date + INTERVAL 1 MONTH GROUP BY distro, toStartOfMonth(date) SET num_requests = sum(num_requests), bytes_sent = sum(bytes_sent), date + INTERVAL 2 YEAR DELETE; CREATE MATERIALIZED VIEW vector.mirror_requests_agg_by_distro_mv TO vector.mirror_requests_agg_by_distro AS SELECT distro, toDate(timestamp) AS date, country_code, sum(1) AS num_requests, sum(bytes_sent) AS bytes_sent FROM vector.mirror_requests GROUP BY distro, date, country_code;
We also wants some stats for Canada specifically:
CREATE TABLE vector.mirror_requests_agg_canada ( distro LowCardinality(String), date Date CODEC(Delta, ZSTD), region_name LowCardinality(String), city String, bytes_sent UInt64, num_requests UInt64 ) ENGINE = SummingMergeTree((bytes_sent, num_requests)) PRIMARY KEY (distro, toStartOfMonth(date), date, region_name, city) TTL date + INTERVAL 1 MONTH GROUP BY distro, toStartOfMonth(date) SET num_requests = sum(num_requests), bytes_sent = sum(bytes_sent), date + INTERVAL 2 YEAR DELETE; CREATE MATERIALIZED VIEW vector.mirror_requests_agg_canada_mv TO vector.mirror_requests_agg_canada AS SELECT distro, toDate(timestamp) as date, region_name, city, sum(bytes_sent) AS bytes_sent, sum(1) AS num_requests FROM vector.mirror_requests WHERE country_code = 'CA' GROUP BY distro, date, region_name, city;
Finally, we'll also keep stats just for the university:
CREATE TABLE vector.mirror_requests_agg_uw ( distro LowCardinality(String), date Date CODEC(Delta, ZSTD), bytes_sent UInt64, num_requests UInt64 ) ENGINE = SummingMergeTree((bytes_sent, num_requests)) PRIMARY KEY (distro, toStartOfMonth(date), date) TTL date + INTERVAL 1 MONTH GROUP BY distro, toStartOfMonth(date) SET num_requests = sum(num_requests), bytes_sent = sum(bytes_sent), date + INTERVAL 2 YEAR DELETE; CREATE MATERIALIZED VIEW vector.mirror_requests_agg_uw_mv TO vector.mirror_requests_agg_uw AS SELECT distro, toDate(timestamp) as date, sum(bytes_sent) AS bytes_sent, sum(1) AS num_requests FROM vector.mirror_requests WHERE isIPAddressInRange(IPv6NumToString(ip_address), '::ffff:129.97.0.0/112') OR isIPAddressInRange(IPv6NumToString(ip_address), '::ffff:10.0.0.0/104') OR isIPAddressInRange(IPv6NumToString(ip_address), '::ffff:172.16.0.0/108') OR isIPAddressInRange(IPv6NumToString(ip_address), '::ffff:192.168.0.0/112') OR isIPAddressInRange(IPv6NumToString(ip_address), '2620:101:f000::/47') OR isIPAddressInRange(IPv6NumToString(ip_address), 'fd74:6b6a:8eca::/47') GROUP BY distro, date, region_name, city;
GeoIP database
We'll want to look up geographic information for the IP addresses in our data. To do this, we'll use the MaxMind GeoLite2 databases. Syscom already has a MaxMind account; the password is stored in the usual place. Install the latest geoipupdate package from here, then edit /etc/GeoIP.conf as necessary (use the syscom account ID and license key). Set EditionIDs
to GeoLite2-City
only.
We'll use a systemd timer to run the geoipupdate script periodically. Paste the following into /etc/systemd/system/geoipupdate.service:
[Unit] Description=GeoIP Update Documentation=https://dev.maxmind.com/geoip/updating-databases After=network-online.target [Service] Type=oneshot ExecStart=/usr/bin/geoipupdate Nice=19 IOSchedulingClass=idle IOSchedulingPriority=7 ProtectSystem=strict ReadWritePaths=/usr/share/GeoIP ProtectHome=true PrivateTmp=true PrivateDevices=true ProtectHostname=true ProtectClock=true ProtectKernelTunables=true ProtectKernelModules=true ProtectKernelLogs=true ProtectControlGroups=true LockPersonality=true RestrictRealtime=true
Run systemctl daemon-reload
and then systemctl start geoipupdate
to download the database for the first time.
Now paste the following into /etc/systemd/system/geoipupdate.timer:
[Unit] Description=Automatic GeoIP database update Documentation=https://dev.maxmind.com/geoip/updating-databases [Timer] OnCalendar=monthly RandomizedDelaySec=12h Persistent=true [Install] WantedBy=timers.target
Then run:
systemctl daemon-reload systemctl enable geoipupdate.timer systemctl start geoipupdate.timer
Vector
Vector allows you to create directed acyclic graphs (DAGs) for collecting and processing logs, which gives us a lot of flexibility. It also has a built-in scripting language, Vector Remap Language (VRL) for slicing and dicing data. This allows us to remove fields which we don't need, add new fields which we do need, enrich an event with extra data, etc.
Our data pipeline looks like this: Vector agents -> Vector aggregator -> ClickHouse. We use Grafana for visualization.
We use mutual TLS between the agents and the aggregator to make sure that random people can't send us garbage data:
openssl req -newkey rsa:2048 -nodes -keyout aggregator.key -x509 -out aggregator.crt -days 36500 openssl req -newkey rsa:2048 -nodes -keyout agent.key -x509 -out agent.crt -days 36500 chown vector:vector *.crt *.key
Here is what our vector.toml looks like on the general-use machines; currently, we only use it for collecting failed SSH login attempts.
[sources.source_sshd] type = "journald" include_units = ["ssh.service"] [transforms.remap_sshd] type = "remap" inputs = ["source_sshd"] source = """ parsed, err = parse_regex( .message, r'^(?:Connection (?:closed|reset)|Disconnected) (?:by|from) (?:invalid|authenticating) user (?P<user>[^ ]+) (?P<ip>[0-9.a-f:]+)' ) if is_null(err) { . = { "username": parsed.user, "ip_address": parsed.ip, "host": .host, "timestamp": .timestamp, "job": "vector-sshd" } } """ [transforms.filter_sshd] type = "filter" inputs = ["remap_sshd"] condition = '.job == "vector-sshd"' [sinks.aggregator] type = "vector" inputs = ["filter_sshd"] address = "prometheus:5045" [sinks.aggregator.tls] enabled = true ca_file = "/etc/vector/aggregator.crt" crt_file = "/etc/vector/agent.crt" key_file = "/etc/vector/agent.key" verify_hostname = false
The agent on potassium-benzoate collects NGINX logs:
[sources.source_nginx] type = "file" include = ["/var/log/nginx/access.log"] max_read_bytes = 65536 [transforms.remap_nginx] type = "remap" inputs = ["source_nginx"] source = """ parsed_log, err = parse_nginx_log(.message, "combined") status = parsed_log.status request = string!(parsed_log.request || "") if is_null(err) && status == 200 { parsed_path, err = parse_regex(request, r'^GET /+(?P<distro>[^/? ]+)') distro = parsed_path.distro ignore = [ "server-status", "stats", "robots.txt", "include", "pub", "news", "index.html", "sync.json", "ups", "pool", "dists", "csclub.asc", "csclub.gpg" ] if ( is_null(err) && !includes(ignore, distro) && !contains(request, "..") && !starts_with(request, "#") && !starts_with(request, "%") && !starts_with(request, ".") ) { . = { "distro": distro, "user_agent": parsed_log.agent, "ip_address": parsed_log.client, "bytes_sent": parsed_log.size, "timestamp": parsed_log.timestamp, "job": "vector-mirror" } } } """ ...
Finally, here's the aggregator config, which collects data from each agent and then inserts it into ClickHouse:
[enrichment_tables.enrich_geoip] type = "geoip" path = "/usr/share/GeoIP/GeoLite2-City.mmdb" [sources.source_agents] type = "vector" address = "[::]:5045" [sources.source_agents.tls] enabled = true ca_file = "/etc/vector/agent.crt" crt_file = "/etc/vector/aggregator.crt" key_file = "/etc/vector/aggregator.key" verify_hostname = false [transforms.transform_route] type = "route" inputs = ["source_agents"] route.sshd = '.job == "vector-sshd"' route.mirror = '.job == "vector-mirror"' [transforms.transform_sshd] type = "remap" inputs = ["transform_route.sshd"] source = """ ipinfo = get_enrichment_table_record("enrich_geoip", {"ip": .ip_address}) ?? {} if is_ipv4!(.ip_address) && (ip_cidr_contains!("10.0.0.0/8", .ip_address) \ || ip_cidr_contains!("172.16.0.0/12", .ip_address) \ || ip_cidr_contains!("192.168.0.0/16", .ip_address)) { ipinfo.country_code = "CA"; ipinfo.region_name = "Ontario"; ipinfo.city_name = "Waterloo"; } . = { "host": .host, "timestamp": .timestamp, "username": .username, "ip_address": ip_to_ipv6!(.ip_address), "country_code": ipinfo.country_code || "" } """ [transforms.transform_mirror] type = "remap" inputs = ["transform_route.mirror"] source = """ ipinfo = get_enrichment_table_record("enrich_geoip", {"ip": .ip_address}) ?? {} if is_ipv4!(.ip_address) && (ip_cidr_contains!("10.0.0.0/8", .ip_address) \ || ip_cidr_contains!("172.16.0.0/12", .ip_address) \ || ip_cidr_contains!("192.168.0.0/16", .ip_address)) { ipinfo.country_code = "CA"; ipinfo.region_name = "Ontario"; ipinfo.city_name = "Waterloo"; } . = { "distro": .distro, "timestamp": .timestamp, "ip_address": ip_to_ipv6!(.ip_address), "bytes_sent": .bytes_sent, "user_agent": .user_agent, "country_code": ipinfo.country_code || "", "region_name": ipinfo.region_name || "", "city": ipinfo.city_name || "" } """ [transforms.transform_unmatched] type = "remap" inputs = ["transform_route._unmatched"] source = """ log("unrecognized job: " + string!(.job || "null"), level: "warn") """ [sinks.sink_unmatched] type = "blackhole" inputs = ["transform_unmatched"] print_interval_secs = 0 [sinks.clickhouse_sshd] type = "clickhouse" inputs = ["transform_sshd"] encoding.timestamp_format = "unix" endpoint = "$CLICKHOUSE_ENDPOINT" database = "$CLICKHOUSE_DATABASE" table = "failed_ssh_logins" [sinks.clickhouse_sshd.auth] strategy = "basic" user = "$CLICKHOUSE_USER" password = "$CLICKHOUSE_PASSWORD" ...
Beats, Logstash and Loki (old)
We previously used Elastic Beats, Logstash and Grafana Loki for collecting and storing logs. One day I tried to upgrade Logstash and it exploded so badly that I figured it would be easier to just switch to Vector instead.
The sections below are kept for historical purposes only and are no longer accurate.
We use a combination of Elastic Beats, Logstash and Loki for collecting, storing and querying our logs; for visualization, we use Grafana. Logstash and Loki are currently both running in the prometheus VM.
The reason why I chose Loki over Elasticsearch is because Loki is very space efficient with regards to storage. It also consumes way less RAM and CPU. This means that we can collect a lot of logs without worrying too much about resource usage.
We have Journalbeat and/or Filebeat running on some of our machines to collect logs from sshd, Apache and NGINX. The Beats send these logs to Logstash, which does some pre-processing. The most useful contribution by Logstash is its GeoIP plugin, which allows us to enrich the logs with some geographical information from IP addresses (e.g. add city and country). Logstash sends these logs to Loki, and we can then view these from Grafana.
Note: Sometimes the Loki output plugin for Logstash disappears after a reboot or an upgrade. If you see Logstash complaining about this in the journald logs, run this:
cd /usr/share/logstash bin/logstash-plugin install logstash-output-loki systemctl restart logstash
See here for details.
The language for querying logs in Loki is LogQL, which, syntactically, is very similar to PromQL. If you have already learned PromQL, then you should be able to pick up LogQL very easily. You can try out some LogQL queries from the 'Explore' page on Grafana; make sure you toggle the data source to 'Loki' in the top left corner. For the 'topk' queries, you will also want to toggle 'Query type' to 'Instant' rather than 'Range'.
LogQL examples
Here are the number of failed SSH login attempts for each host for a given time range:
sum by (hostname) ( count_over_time( {job="logstash-sshd"} [$__range] ) )
Note that $__range
is a special global variable in Grafana which is equal to the time range in the top right corner of a chart.
Here are the top 10 IP addresses from which failed SSH login attempts arrived, for a given host and time range:
topk(10, sum by (ip_address) ( count_over_time( {job="logstash-sshd",hostname="$hostname"} | json | __error__ = "" [$__range] ) ) )
$hostname is a chart variable, which can be configured from a chart's settings.
I configured Logstash to send logs to Loki as JSON, but it's a rather hacky solution, so occasionally invalid JSON is sent.
Here are the number of HTTP requests for the 15 distros on our mirror from the last hour:
topk(15, sum by (distro) ( count_over_time( {job="logstash-nginx"} | json | __error__ = "" | distro != "server-status" [1h] ) ) )
Here are the number of total bytes sent over HTTP for the top 15 distros from the last hour. Note the use of the unwrap
operator.
topk(15, sum by (distro) ( sum_over_time( {job="logstash-nginx"} | json | __error__ = "" | distro != "server-status" | unwrap bytes [1h] ) ) )
You can see more examples on the Mirror Requests dashboard on Grafana.
Avoid high cardinality
For both Prometheus and Loki, you must avoid high cardinality labels at all costs. By high cardinality, I mean labels which can take on a very large number of values; for example, using a label to store IP addresses would be a very bad idea. This is because Prometheus and Loki use labels to store metrics/logs efficiently with compression; when two metrics have two different sets of labels, they cannot be stored together, which increases the storage space usage.
With Loki, you can extract labels from your logs inside your query dynamically. One way to do this is with the json
operator; there are other ways to do this as well (see the LogQL docs). This basically means that we get infinite cardinality from our logs, the tradeoff being that queries may take longer to execute.
Also, be very careful about what you send to Loki from Logstash - every field in a Logstash message becomes a Loki label. Usage of the prune
command in Logstash is highly recommended.