Observability: Difference between revisions
No edit summary |
m (→Logging) |
||
Line 38: | Line 38: | ||
=== Vector === |
=== Vector === |
||
Vector allows you to create directed acyclic |
Vector allows you to create directed acyclic graphs (DAGs) for collecting and processing logs, which gives us a lot of flexibility. It also has a built-in scripting language, [https://vector.dev/docs/reference/vrl/ Vector Remap Language (VRL)] for slicing and dicing data. This allows us to remove fields which we don't need, add new fields which we do need, enrich an event with extra data, etc. |
||
Our data pipeline looks like this: Vector agents -> Vector aggregator -> ClickHouse. We use Grafana for visualization. |
Our data pipeline looks like this: Vector agents -> Vector aggregator -> ClickHouse. We use Grafana for visualization. |
||
Line 71: | Line 71: | ||
} |
} |
||
} |
} |
||
""" |
|||
[transforms.filter_sshd] |
[transforms.filter_sshd] |
||
Line 87: | Line 88: | ||
key_file = "/etc/vector/agent.key" |
key_file = "/etc/vector/agent.key" |
||
verify_hostname = false |
verify_hostname = false |
||
</pre> |
|||
The agent on potassium-benzoate collects NGINX logs: |
|||
<pre> |
|||
[sources.source_nginx] |
|||
type = "file" |
|||
include = ["/var/log/nginx/access.log"] |
|||
max_read_bytes = 65536 |
|||
[transforms.remap_nginx] |
|||
type = "remap" |
|||
inputs = ["source_nginx"] |
|||
source = """ |
|||
parsed_log, err = parse_nginx_log(.message, "combined") |
|||
status = parsed_log.status |
|||
request = string!(parsed_log.request || "") |
|||
if is_null(err) && status == 200 && !contains(request, "/..") { |
|||
parsed_path, err = parse_regex(request, r'^GET /+(?P<distro>[^/? ]+)') |
|||
distro = parsed_path.distro |
|||
ignore = [ |
|||
"server-status", "stats", "robots.txt", |
|||
"include", "pub", "news", "index.html", "sync.json", "ups", |
|||
"pool", "dists", "csclub.asc", "csclub.gpg" |
|||
] |
|||
if is_null(err) && !includes(ignore, distro) { |
|||
. = { |
|||
"distro": distro, |
|||
"user_agent": parsed_log.agent, |
|||
"ip_address": parsed_log.client, |
|||
"bytes_sent": parsed_log.size, |
|||
"timestamp": parsed_log.timestamp, |
|||
"job": "vector-mirror" |
|||
} |
|||
} |
|||
} |
|||
""" |
""" |
||
... |
|||
</pre> |
|||
Finally, here's the aggregator config, which collects data from each agent and then inserts it into ClickHouse: |
|||
<pre> |
|||
[enrichment_tables.enrich_geoip] |
|||
type = "geoip" |
|||
path = "/usr/share/GeoIP/GeoLite2-City.mmdb" |
|||
[sources.source_agents] |
|||
type = "vector" |
|||
address = "[::]:5045" |
|||
[sources.source_agents.tls] |
|||
enabled = true |
|||
ca_file = "/etc/vector/agent.crt" |
|||
crt_file = "/etc/vector/aggregator.crt" |
|||
key_file = "/etc/vector/aggregator.key" |
|||
verify_hostname = false |
|||
[transforms.transform_route] |
|||
type = "route" |
|||
inputs = ["source_agents"] |
|||
route.sshd = '.job == "vector-sshd"' |
|||
route.mirror = '.job == "vector-mirror"' |
|||
[transforms.transform_sshd] |
|||
type = "remap" |
|||
inputs = ["transform_route.sshd"] |
|||
source = """ |
|||
ipinfo = get_enrichment_table_record("enrich_geoip", {"ip": .ip_address}) ?? {} |
|||
if is_ipv4!(.ip_address) && (ip_cidr_contains!("10.0.0.0/8", .ip_address) \ |
|||
|| ip_cidr_contains!("172.16.0.0/12", .ip_address) \ |
|||
|| ip_cidr_contains!("192.168.0.0/16", .ip_address)) { |
|||
ipinfo.country_code = "CA"; |
|||
ipinfo.region_name = "Ontario"; |
|||
ipinfo.city_name = "Waterloo"; |
|||
} |
|||
. = { |
|||
"host": .host, |
|||
"timestamp": .timestamp, |
|||
"username": .username, |
|||
"ip_address": ip_to_ipv6!(.ip_address), |
|||
"country_code": ipinfo.country_code || "" |
|||
} |
|||
""" |
|||
[transforms.transform_mirror] |
|||
type = "remap" |
|||
inputs = ["transform_route.mirror"] |
|||
source = """ |
|||
ipinfo = get_enrichment_table_record("enrich_geoip", {"ip": .ip_address}) ?? {} |
|||
if is_ipv4!(.ip_address) && (ip_cidr_contains!("10.0.0.0/8", .ip_address) \ |
|||
|| ip_cidr_contains!("172.16.0.0/12", .ip_address) \ |
|||
|| ip_cidr_contains!("192.168.0.0/16", .ip_address)) { |
|||
ipinfo.country_code = "CA"; |
|||
ipinfo.region_name = "Ontario"; |
|||
ipinfo.city_name = "Waterloo"; |
|||
} |
|||
. = { |
|||
"distro": .distro, |
|||
"timestamp": .timestamp, |
|||
"ip_address": ip_to_ipv6!(.ip_address), |
|||
"bytes_sent": .bytes_sent, |
|||
"user_agent": .user_agent, |
|||
"country_code": ipinfo.country_code || "", |
|||
"region_name": ipinfo.region_name || "", |
|||
"city": ipinfo.city_name || "" |
|||
} |
|||
""" |
|||
[transforms.transform_unmatched] |
|||
type = "remap" |
|||
inputs = ["transform_route._unmatched"] |
|||
source = """ |
|||
log("unrecognized job: " + string!(.job || "null"), level: "warn") |
|||
""" |
|||
[sinks.sink_unmatched] |
|||
type = "blackhole" |
|||
inputs = ["transform_unmatched"] |
|||
print_interval_secs = 0 |
|||
[sinks.clickhouse_sshd] |
|||
type = "clickhouse" |
|||
inputs = ["transform_sshd"] |
|||
encoding.timestamp_format = "unix" |
|||
endpoint = "$CLICKHOUSE_ENDPOINT" |
|||
database = "$CLICKHOUSE_DATABASE" |
|||
table = "failed_ssh_logins" |
|||
[sinks.clickhouse_sshd.auth] |
|||
strategy = "basic" |
|||
user = "$CLICKHOUSE_USER" |
|||
password = "$CLICKHOUSE_PASSWORD" |
|||
... |
|||
</pre> |
</pre> |
||
Revision as of 19:09, 28 September 2023
There are three pillars of observability: metrics, logging and tracing. We are only interested in the first two.
Metrics
All of our machines are, or at least should be, running the Prometheus node exporter. This collects and sends machine metrics (e.g. RAM used, disk space) to the Prometheus server running at https://prometheus.csclub.uwaterloo.ca (currently a VM on phosphoric-acid). There are a few specialized exporters running on several other machines; a Postfix exporter is running on mail, an Apache exporter is running on caffeine, and an NGINX expoter is running on potassium-benzoate. There is also a custom exporter written by syscom running on potassium-benzoate for mirror stats.
Most of the exporters use mutual TLS authentication with the Prometheus server. I set the expiration date for the TLS certs to 10 years. If you are reading this and it is 2031 or later, then go update the certs.
I highly suggest becoming familiar with PromQL, the query language for Prometheus. You can run and visualize some queries at https://prometheus.csclub.uwaterloo.ca/prometheus. For example, here is a query to determine which machines are up or down:
up{job="node_exporter"}
Here's how we determine if a machine has NFS mounted. This will return 1 for machines which have NFS mounted, but will not return any records for machines which do not have NFS mounted. (We ignore the actual value of node_filesystem_device_error because it returns 1 for machines using Kerberized NFS.)
count by (instance) (node_filesystem_device_error{mountpoint="/users", fstype="nfs"})
Now this is a rather complicated expression which can return one of three values:
- 0: the machine is down
- 1: the machine is up, but NFS is not mounted
- 2: the machine is up and NFS is mounted
The or operator in PromQL is key here.
sum by (instance) ( (count by (instance) (node_filesystem_device_error{mountpoint="/users", fstype="nfs"})) or up{job="node_exporter"} )
We also use AlertManager to send email alerts from Prometheus metrics. We should figure out how to also send messages to IRC or similar.
We also use the Blackbox prober exporter to check if some of our web-based services are up.
We make some pretty charts on Grafana (https://prometheus.csclub.uwaterloo.ca) from PromQL queries. Grafana also has an 'Explorer' page where you can test out some queries before making chart panels from them.
Logging
We now use Vector for collecting and transforming logs, and ClickHouse for storing log data.
Vector
Vector allows you to create directed acyclic graphs (DAGs) for collecting and processing logs, which gives us a lot of flexibility. It also has a built-in scripting language, Vector Remap Language (VRL) for slicing and dicing data. This allows us to remove fields which we don't need, add new fields which we do need, enrich an event with extra data, etc.
Our data pipeline looks like this: Vector agents -> Vector aggregator -> ClickHouse. We use Grafana for visualization.
We use mutual TLS between the agents and the aggregator to make sure that random people can't send us garbage data:
openssl req -newkey rsa:2048 -nodes -keyout aggregator.key -x509 -out aggregator.crt -days 36500 openssl req -newkey rsa:2048 -nodes -keyout agent.key -x509 -out agent.crt -days 36500 chown vector:vector *.crt *.key
Here is what our vector.toml looks like on the general-use machines; currently, we only use it for collecting failed SSH login attempts.
[sources.source_sshd] type = "journald" include_units = ["ssh.service"] [transforms.remap_sshd] type = "remap" inputs = ["source_sshd"] source = """ parsed, err = parse_regex( .message, r'^(?:Connection (?:closed|reset)|Disconnected) (?:by|from) (?:invalid|authenticating) user (?P<user>[^ ]+) (?P<ip>[0-9.a-f:]+)' ) if is_null(err) { . = { "username": parsed.user, "ip_address": parsed.ip, "host": .host, "timestamp": .timestamp, "job": "vector-sshd" } } """ [transforms.filter_sshd] type = "filter" inputs = ["remap_sshd"] condition = '.job == "vector-sshd"' [sinks.aggregator] type = "vector" inputs = ["filter_sshd"] address = "prometheus:5045" [sinks.aggregator.tls] enabled = true ca_file = "/etc/vector/aggregator.crt" crt_file = "/etc/vector/agent.crt" key_file = "/etc/vector/agent.key" verify_hostname = false
The agent on potassium-benzoate collects NGINX logs:
[sources.source_nginx] type = "file" include = ["/var/log/nginx/access.log"] max_read_bytes = 65536 [transforms.remap_nginx] type = "remap" inputs = ["source_nginx"] source = """ parsed_log, err = parse_nginx_log(.message, "combined") status = parsed_log.status request = string!(parsed_log.request || "") if is_null(err) && status == 200 && !contains(request, "/..") { parsed_path, err = parse_regex(request, r'^GET /+(?P<distro>[^/? ]+)') distro = parsed_path.distro ignore = [ "server-status", "stats", "robots.txt", "include", "pub", "news", "index.html", "sync.json", "ups", "pool", "dists", "csclub.asc", "csclub.gpg" ] if is_null(err) && !includes(ignore, distro) { . = { "distro": distro, "user_agent": parsed_log.agent, "ip_address": parsed_log.client, "bytes_sent": parsed_log.size, "timestamp": parsed_log.timestamp, "job": "vector-mirror" } } } """ ...
Finally, here's the aggregator config, which collects data from each agent and then inserts it into ClickHouse:
[enrichment_tables.enrich_geoip] type = "geoip" path = "/usr/share/GeoIP/GeoLite2-City.mmdb" [sources.source_agents] type = "vector" address = "[::]:5045" [sources.source_agents.tls] enabled = true ca_file = "/etc/vector/agent.crt" crt_file = "/etc/vector/aggregator.crt" key_file = "/etc/vector/aggregator.key" verify_hostname = false [transforms.transform_route] type = "route" inputs = ["source_agents"] route.sshd = '.job == "vector-sshd"' route.mirror = '.job == "vector-mirror"' [transforms.transform_sshd] type = "remap" inputs = ["transform_route.sshd"] source = """ ipinfo = get_enrichment_table_record("enrich_geoip", {"ip": .ip_address}) ?? {} if is_ipv4!(.ip_address) && (ip_cidr_contains!("10.0.0.0/8", .ip_address) \ || ip_cidr_contains!("172.16.0.0/12", .ip_address) \ || ip_cidr_contains!("192.168.0.0/16", .ip_address)) { ipinfo.country_code = "CA"; ipinfo.region_name = "Ontario"; ipinfo.city_name = "Waterloo"; } . = { "host": .host, "timestamp": .timestamp, "username": .username, "ip_address": ip_to_ipv6!(.ip_address), "country_code": ipinfo.country_code || "" } """ [transforms.transform_mirror] type = "remap" inputs = ["transform_route.mirror"] source = """ ipinfo = get_enrichment_table_record("enrich_geoip", {"ip": .ip_address}) ?? {} if is_ipv4!(.ip_address) && (ip_cidr_contains!("10.0.0.0/8", .ip_address) \ || ip_cidr_contains!("172.16.0.0/12", .ip_address) \ || ip_cidr_contains!("192.168.0.0/16", .ip_address)) { ipinfo.country_code = "CA"; ipinfo.region_name = "Ontario"; ipinfo.city_name = "Waterloo"; } . = { "distro": .distro, "timestamp": .timestamp, "ip_address": ip_to_ipv6!(.ip_address), "bytes_sent": .bytes_sent, "user_agent": .user_agent, "country_code": ipinfo.country_code || "", "region_name": ipinfo.region_name || "", "city": ipinfo.city_name || "" } """ [transforms.transform_unmatched] type = "remap" inputs = ["transform_route._unmatched"] source = """ log("unrecognized job: " + string!(.job || "null"), level: "warn") """ [sinks.sink_unmatched] type = "blackhole" inputs = ["transform_unmatched"] print_interval_secs = 0 [sinks.clickhouse_sshd] type = "clickhouse" inputs = ["transform_sshd"] encoding.timestamp_format = "unix" endpoint = "$CLICKHOUSE_ENDPOINT" database = "$CLICKHOUSE_DATABASE" table = "failed_ssh_logins" [sinks.clickhouse_sshd.auth] strategy = "basic" user = "$CLICKHOUSE_USER" password = "$CLICKHOUSE_PASSWORD" ...
Beats, Logstash and Loki (old)
We previously used Elastic Beats, Logstash and Grafana Loki for collecting and storing logs. One day I tried to upgrade Logstash and it exploded so badly that I figured it would be easier to just switch to Vector instead. Also, Loki's performance on a monolithic setup was abysmal. It would choke when we tried to aggregate, like, more than a single day's worth of data. ClickHouse allows us to calculate aggregate statistics on several months worth of data in a few milliseconds.
The sections below are kept for historical purposes only and are no longer accurate.
We use a combination of Elastic Beats, Logstash and Loki for collecting, storing and querying our logs; for visualization, we use Grafana. Logstash and Loki are currently both running in the prometheus VM.
The reason why I chose Loki over Elasticsearch is because Loki is very space efficient with regards to storage. It also consumes way less RAM and CPU. This means that we can collect a lot of logs without worrying too much about resource usage.
We have Journalbeat and/or Filebeat running on some of our machines to collect logs from sshd, Apache and NGINX. The Beats send these logs to Logstash, which does some pre-processing. The most useful contribution by Logstash is its GeoIP plugin, which allows us to enrich the logs with some geographical information from IP addresses (e.g. add city and country). Logstash sends these logs to Loki, and we can then view these from Grafana.
Note: Sometimes the Loki output plugin for Logstash disappears after a reboot or an upgrade. If you see Logstash complaining about this in the journald logs, run this:
cd /usr/share/logstash bin/logstash-plugin install logstash-output-loki systemctl restart logstash
See here for details.
The language for querying logs in Loki is LogQL, which, syntactically, is very similar to PromQL. If you have already learned PromQL, then you should be able to pick up LogQL very easily. You can try out some LogQL queries from the 'Explore' page on Grafana; make sure you toggle the data source to 'Loki' in the top left corner. For the 'topk' queries, you will also want to toggle 'Query type' to 'Instant' rather than 'Range'.
LogQL examples
Here are the number of failed SSH login attempts for each host for a given time range:
sum by (hostname) ( count_over_time( {job="logstash-sshd"} [$__range] ) )
Note that $__range
is a special global variable in Grafana which is equal to the time range in the top right corner of a chart.
Here are the top 10 IP addresses from which failed SSH login attempts arrived, for a given host and time range:
topk(10, sum by (ip_address) ( count_over_time( {job="logstash-sshd",hostname="$hostname"} | json | __error__ = "" [$__range] ) ) )
$hostname is a chart variable, which can be configured from a chart's settings.
I configured Logstash to send logs to Loki as JSON, but it's a rather hacky solution, so occasionally invalid JSON is sent.
Here are the number of HTTP requests for the 15 distros on our mirror from the last hour:
topk(15, sum by (distro) ( count_over_time( {job="logstash-nginx"} | json | __error__ = "" | distro != "server-status" [1h] ) ) )
Here are the number of total bytes sent over HTTP for the top 15 distros from the last hour. Note the use of the unwrap
operator.
topk(15, sum by (distro) ( sum_over_time( {job="logstash-nginx"} | json | __error__ = "" | distro != "server-status" | unwrap bytes [1h] ) ) )
You can see more examples on the Mirror Requests dashboard on Grafana.
Avoid high cardinality
For both Prometheus and Loki, you must avoid high cardinality labels at all costs. By high cardinality, I mean labels which can take on a very large number of values; for example, using a label to store IP addresses would be a very bad idea. This is because Prometheus and Loki use labels to store metrics/logs efficiently with compression; when two metrics have two different sets of labels, they cannot be stored together, which increases the storage space usage.
With Loki, you can extract labels from your logs inside your query dynamically. One way to do this is with the json
operator; there are other ways to do this as well (see the LogQL docs). This basically means that we get infinite cardinality from our logs, the tradeoff being that queries may take longer to execute.
Also, be very careful about what you send to Loki from Logstash - every field in a Logstash message becomes a Loki label. Usage of the prune
command in Logstash is highly recommended.