Log Management Best Practices: From Collection to Compliance in 2026
Your logs are either solving problems or creating them. Learn how to build a log management pipeline that scales, from structured logging and shipping architectures to retention policies and compliance requirements.
The Log Problem Nobody Talks About
Every engineering team has experienced it: production goes down at 2 AM, and you need to understand what happened. You open your logging platform and find yourself staring at thousands of unstructured log lines, each formatted differently, timestamps in different time zones, and no way to correlate events across services. Thirty minutes later, you're still grep-ing through raw files because your "log management system" is really just expensive storage.
The gap between collecting logs and actually using them to solve problems is where most organizations fail. They invest in platforms, ship everything they generate, and then wonder why their monthly logging bill exceeds their compute costs while their mean time to resolution keeps climbing.
Effective log management isn't about capturing more data; it's about capturing the right data in a format that makes problems findable. This requires intentional decisions at every stage: what to log, how to structure it, where to ship it, how long to keep it, and who can access it. Get these decisions wrong, and logs become a liability rather than an asset.
Structured Logging: The Foundation Everything Else Depends On
Unstructured logs are text strings that humans might understand but machines cannot query. Structured logs are data that both humans and machines can work with. This distinction determines whether your logging investment pays dividends or just accumulates cost.
Consider the difference:
# Unstructured log - human readable but machine hostile
2025-01-07 14:23:45 INFO User john@example.com logged in from 192.168.1.100 after 2 failed attempts
# Structured log - queryable and analyzable
{
"timestamp": "2025-01-07T14:23:45.123Z",
"level": "info",
"service": "auth-service",
"event": "user_login",
"user_email": "john@example.com",
"source_ip": "192.168.1.100",
"failed_attempts": 2,
"session_id": "sess_abc123",
"trace_id": "trace_xyz789"
}
The structured version can be queried: "Show me all logins from IP addresses with more than 3 failed attempts in the last hour." The unstructured version requires regex parsing and hope.
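To make the difference concrete, here is a minimal sketch of that kind of query run directly against JSON log lines. The sample lines and the `suspicious_logins` helper are hypothetical; a real log backend would run the equivalent filter server-side.

```python
import json

# Hypothetical sample of structured log lines (one JSON object per line).
RAW_LINES = [
    '{"event": "user_login", "source_ip": "192.168.1.100", "failed_attempts": 2}',
    '{"event": "user_login", "source_ip": "10.0.0.7", "failed_attempts": 5}',
    '{"event": "user_logout", "source_ip": "10.0.0.7"}',
]


def suspicious_logins(lines, threshold=3):
    """Return login events whose failed_attempts exceeds the threshold."""
    matches = []
    for line in lines:
        entry = json.loads(line)
        # Field access replaces regex parsing: each condition is a plain comparison.
        if entry.get("event") == "user_login" and entry.get("failed_attempts", 0) > threshold:
            matches.append(entry)
    return matches


for entry in suspicious_logins(RAW_LINES):
    print(entry["source_ip"], entry["failed_attempts"])
```

Because every field has a name and a type, the filter is two comparisons. The same question against the unstructured line would need a regex that survives every wording change in the message.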
Implementing Structured Logging
Every major language has libraries that make structured logging straightforward. The key is standardizing on a schema across your organization—consistent field names, consistent timestamp formats, consistent severity levels.
Python with structlog:
# logging_config.py
import structlog
import logging
import sys


def configure_logging(service_name: str, environment: str):
    """Configure structured JSON logging for production use."""
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer()
        ],
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )
    # Add default context available in all log entries
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(
        service=service_name,
        environment=environment,
    )


# Usage in application code
# (execute_order and PaymentError are defined elsewhere in the application)
logger = structlog.get_logger()


def process_order(order_id: str, user_id: str):
    # Bind request-specific context
    log = logger.bind(order_id=order_id, user_id=user_id)
    log.info("processing_order_started")
    try:
        # Business logic here
        result = execute_order(order_id)
        log.info("processing_order_completed",
                 total_amount=result.amount,
                 items_count=result.items)
    except PaymentError as e:
        log.error("processing_order_failed",
                  error_type="payment_failure",
                  error_message=str(e))
        raise
Go with zerolog:
package main
import (
"os"
"time"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
func init() {
// Configure global logger
zerolog.TimeFieldFormat = time.RFC3339Nano
zerolog.SetGlobalLevel(zerolog.InfoLevel)
log.Logger = zerolog.New(os.Stdout).With().
Timestamp().
Str("service", "order-service").
Str("environment", os.Getenv("ENVIRONMENT")).
Logger()
}
func ProcessOrder(orderID string, userID string) error {
logger := log.With().
Str("order_id", orderID).
Str("user_id", userID).
Logger()
logger.Info().Msg("processing_order_started")
result, err := executeOrder(orderID)
if err != nil {
logger.Error().
Err(err).
Str("error_type", "order_execution_failed").
Msg("processing_order_failed")
return err
}
logger.Info().
Float64("total_amount", result.Amount).
Int("items_count", result.ItemsCount).
Msg("processing_order_completed")
return nil
}
Node.js with pino:
// logger.js
const pino = require('pino');
const logger = pino({
level: process.env.LOG_LEVEL || 'info',
formatters: {
level: (label) => ({ level: label }),
},
timestamp: pino.stdTimeFunctions.isoTime,
base: {
service: process.env.SERVICE_NAME,
environment: process.env.NODE_ENV,
},
});
module.exports = logger;
// Usage in application
const logger = require('./logger');
async function processOrder(orderId, userId) {
const log = logger.child({ order_id: orderId, user_id: userId });
log.info('processing_order_started');
try {
const result = await executeOrder(orderId);
log.info({
total_amount: result.amount,
items_count: result.items.length
}, 'processing_order_completed');
} catch (error) {
log.error({
error_type: error.name,
error_message: error.message,
stack: error.stack
}, 'processing_order_failed');
throw error;
}
}
Log Levels: Use Them Consistently
Log levels exist to enable filtering. Without consistent usage, they're meaningless:
Level   When to Use                                        Examples
─────────────────────────────────────────────────────────────────────────────────────────────
TRACE   Extremely detailed debugging (disabled in prod)    Function entry/exit, variable values
DEBUG   Diagnostic information for troubleshooting         SQL queries, API request details
INFO    Normal operation milestones                        Request completed, job finished
WARN    Unexpected but handled conditions                  Retry succeeded, deprecated API used
ERROR   Failures requiring attention                       Payment failed, external API timeout
FATAL   Unrecoverable errors causing shutdown              Database connection lost, out of memory
Configure log levels per environment:
# config/logging.yaml
development:
  level: debug
  format: pretty
staging:
  level: debug
  format: json
production:
  level: info
  format: json
  # Enable debug for specific services during incidents
  overrides:
    payment-service: debug
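How an application resolves its effective level from a file like this is left open above. One plausible reading, sketched below, is that a per-service override wins over the environment default. The `LOGGING_CONFIG` dictionary and the `effective_level` function are hypothetical names, and placing `overrides` under `production` is an assumption about the intended layout.

```python
# Hypothetical in-memory copy of the per-environment settings.
LOGGING_CONFIG = {
    "development": {"level": "debug", "format": "pretty"},
    "staging": {"level": "debug", "format": "json"},
    "production": {
        "level": "info",
        "format": "json",
        # Assumed to apply to production only.
        "overrides": {"payment-service": "debug"},
    },
}


def effective_level(environment: str, service: str) -> str:
    """Return the service override if one exists, else the environment default."""
    env = LOGGING_CONFIG[environment]
    return env.get("overrides", {}).get(service, env["level"])


print(effective_level("production", "payment-service"))
print(effective_level("production", "auth-service"))
```

Keeping the override map separate from the default level means an incident-time change touches one key and is easy to revert afterwards.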
Log Shipping: Getting Logs Where They Need to Go
Structured logs sitting on individual servers aren't useful. They need to flow to a centralized platform where they can be searched, correlated, and analyzed. This is the log shipping layer—agents that collect logs from various sources and forward them to your storage and analysis backend.
The Modern Log Shipping Stack
Three tools dominate production log shipping: Fluent Bit, Fluentd, and Vector. Each has different strengths.
Fluent Bit is lightweight and designed for edge collection. It runs on resource-constrained environments (containers, IoT devices) and forwards logs to aggregators or directly to backends. Most Kubernetes deployments use Fluent Bit as a DaemonSet.
Fluentd is the heavier sibling—more plugins, more processing capability, but higher resource usage. It often serves as an aggregation layer receiving logs from Fluent Bit collectors.
Vector is the newest of the three. Originally built by Timber.io and now maintained by Datadog, it is written in Rust with a focus on performance and on unifying the observability pipeline (logs, metrics, traces).
Vector Configuration for Multi-Destination Shipping
Vector excels at routing logs to multiple destinations with transformation:
# vector.toml
[sources.kubernetes_logs]
type = "kubernetes_logs"
auto_partial_merge = true
exclude_paths_glob_patterns = ["**/kube-system/**"]
[transforms.parse_json]
type = "remap"
inputs = ["kubernetes_logs"]
source = '''
# Parse JSON logs if present
if exists(.message) && is_string(.message) {
parsed, err = parse_json(.message)
if err == null {
. = merge(., parsed)
del(.message)
}
}
# Standardize timestamp (fall back to now() if parsing fails;
# the "!" variant would abort instead of falling back)
.timestamp = to_timestamp(.timestamp) ?? now()
# Add processing metadata
.vector_processed_at = now()
.cluster = get_env_var!("CLUSTER_NAME")
'''
[transforms.filter_noise]
type = "filter"
inputs = ["parse_json"]
condition = '''
# Drop health check logs
!contains(string(.path) ?? "", "/health") &&
!contains(string(.path) ?? "", "/ready") &&
# Drop debug logs in production
.level != "debug"
'''
[transforms.sample_high_volume]
type = "sample"
inputs = ["filter_noise"]
rate = 10
key_field = "trace_id"
exclude = '''
# Never sample errors or warnings
.level == "error" || .level == "warn"
'''
[sinks.elasticsearch]
type = "elasticsearch"
inputs = ["sample_high_volume"]
endpoints = ["https://elasticsearch.example.com:9200"]
bulk.index = "logs-%Y.%m.%d"
auth.strategy = "basic"
auth.user = "${ELASTICSEARCH_USER}"
auth.password = "${ELASTICSEARCH_PASSWORD}"
[sinks.s3_archive]
type = "aws_s3"
inputs = ["filter_noise"]
bucket = "company-logs-archive"
key_prefix = "logs/{{ kubernetes.namespace }}/{{ kubernetes.pod_name }}/%Y/%m/%d/"
compression = "gzip"
encoding.codec = "json"
[sinks.datadog]
type = "datadog_logs"
inputs = ["sample_high_volume"]
default_api_key = "${DATADOG_API_KEY}"
site = "datadoghq.com"
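The sampling step in this pipeline combines two ideas: keep roughly one in `rate` events, decided per `trace_id` so a trace is kept or dropped as a whole, and never drop errors or warnings. The sketch below illustrates that logic in Python. It is not Vector's implementation; the CRC32 hash and the `keep_event` name are assumptions chosen for illustration.

```python
import zlib


def keep_event(event: dict, rate: int = 10) -> bool:
    """Keep all error/warn events; keep about 1 in `rate` of the rest, decided per trace_id."""
    if event.get("level") in ("error", "warn"):
        return True
    # Hashing the key makes the decision deterministic: every event that
    # shares a trace_id gets the same verdict, so traces stay complete.
    # Events without a trace_id hash the empty string (CRC32 of b"" is 0)
    # and are therefore always kept in this sketch.
    key = str(event.get("trace_id", ""))
    return zlib.crc32(key.encode("utf-8")) % rate == 0


events = [{"level": "info", "trace_id": f"trace-{i}"} for i in range(1000)]
kept = [e for e in events if keep_event(e)]
print(f"kept {len(kept)} of {len(events)} info events")
print(keep_event({"level": "error", "trace_id": "trace-1"}))
```

Sampling on a key rather than at random matters for debugging: a randomly sampled trace with half its log lines missing is often worse than no trace at all.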
Verify Vector is processing logs:
# Check Vector internal metrics
curl -s http://localhost:8686/metrics | grep vector_
# View component topology
vector top
# Validate configuration
vector validate vector.toml
# Test with sample data
echo '{"level":"info","message":"test"}' | vector --config vector.toml
Choosing Your Log Backend
The log backend decision has long-term cost and operational implications. The three primary options serve different needs.
Elasticsearch (ELK Stack)
Elasticsearch remains the most deployed option, offering powerful full-text search and aggregation. The ELK stack (Elasticsearch, Logstash, Kibana) or its modern variant (Elasticsearch, Beats, Kibana) provides a complete solution.
# Deploy Elasticsearch on Kubernetes with ECK operator
kubectl create -f https://download.elastic.co/downloads/eck/2.10.0/crds.yaml
kubectl apply -f https://download.elastic.co/downloads/eck/2.10.0/operator.yaml
# Create Elasticsearch cluster
cat <<EOF | kubectl apply -f -
apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
  name: logs
  namespace: logging
spec:
  version: 8.11.0
  nodeSets:
  - name: default
    count: 3
    config:
      node.store.allow_mmap: false
    podTemplate:
      spec:
        containers:
        - name: elasticsearch
          resources:
            limits:
              memory: 4Gi
              cpu: 2
    volumeClaimTemplates:
    - metadata:
        name: elasticsearch-data
      spec:
        accessModes:
        - ReadWriteOnce
        resources:
          requests:
            storage: 100Gi
        storageClassName: fast-ssd
EOF
Query Elasticsearch for troubleshooting:
# Search for errors in the last hour
curl -X GET "elasticsearch:9200/logs-*/_search" \
-H "Content-Type: application/json" \
-d '{
"query": {
"bool": {
"must": [
{"term": {"level": "error"}},
{"range": {"@timestamp": {"gte": "now-1h"}}}
]
}
},
"sort": [{"@timestamp": "desc"}],
"size": 100
}'
# Aggregate errors by service
curl -X GET "elasticsearch:9200/logs-*/_search" \
-H "Content-Type: application/json" \
-d '{
"size": 0,
"query": {
"bool": {
"must": [
{"term": {"level": "error"}},
{"range": {"@timestamp": {"gte": "now-24h"}}}
]
}
},
"aggs": {
"by_service": {
"terms": {"field": "service.keyword", "size": 20}
}
}
}'
# Find logs correlated with a trace ID
curl -X GET "elasticsearch:9200/logs-*/_search" \
-H "Content-Type: application/json" \
-d '{
"query": {
"term": {"trace_id": "abc123xyz"}
},
"sort": [{"@timestamp": "asc"}]
}'
Grafana Loki
Loki takes a different approach—it indexes only labels (metadata), not log content. This dramatically reduces storage and operational costs while still enabling effective log exploration. Loki pairs with Grafana for visualization and uses LogQL for queries.
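The trade-off behind label-only indexing can be illustrated with a toy store: the label set selects which streams to read, and the log text itself is only scanned at query time. This is a conceptual sketch, not Loki's implementation; the `LabelIndexedStore` class is hypothetical.

```python
from collections import defaultdict


class LabelIndexedStore:
    """Toy store that indexes only labels and scans log lines at query time."""

    def __init__(self):
        # Maps a frozen label set to the raw lines of that stream.
        self._streams = defaultdict(list)

    def push(self, labels: dict, line: str) -> None:
        self._streams[frozenset(labels.items())].append(line)

    def query(self, selector: dict, contains: str = "") -> list:
        wanted = set(selector.items())
        results = []
        for stream_labels, lines in self._streams.items():
            # Stream selection uses labels only; no per-word index exists.
            if wanted <= stream_labels:
                # Content filtering is a brute-force scan of the selected streams.
                results.extend(line for line in lines if contains in line)
        return results


store = LabelIndexedStore()
store.push({"service": "payment-service", "level": "error"}, "gateway timeout after 30s")
store.push({"service": "payment-service", "level": "info"}, "payment captured")
store.push({"service": "checkout", "level": "error"}, "cart not found")

print(store.query({"level": "error"}, contains="timeout"))
```

The index stays small because it grows with the number of distinct label sets, not with the volume of log text. The cost is that a query with a broad label selector has to scan more raw data.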
# loki-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: loki-config
  namespace: logging
data:
  loki.yaml: |
    auth_enabled: false
    server:
      http_listen_port: 3100
      grpc_listen_port: 9096
    common:
      path_prefix: /loki
      storage:
        filesystem:
          chunks_directory: /loki/chunks
          rules_directory: /loki/rules
      replication_factor: 1
      ring:
        instance_addr: 127.0.0.1
        kvstore:
          store: inmemory
    schema_config:
      configs:
        - from: 2024-01-01
          store: tsdb
          object_store: filesystem
          schema: v12
          index:
            prefix: index_
            period: 24h
    storage_config:
      tsdb_shipper:
        active_index_directory: /loki/tsdb-index
        cache_location: /loki/tsdb-cache
        cache_ttl: 24h
    limits_config:
      retention_period: 720h
      max_query_parallelism: 32
      max_query_series: 10000
    chunk_store_config:
      max_look_back_period: 0s
    table_manager:
      retention_deletes_enabled: true
      retention_period: 720h
LogQL queries for common scenarios:
# View logs from a specific service
curl -G -s "http://loki:3100/loki/api/v1/query_range" \
--data-urlencode 'query={service="payment-service"}' \
--data-urlencode 'limit=100' | jq '.data.result[].values[][1]'
# Find error logs across all services
curl -G -s "http://loki:3100/loki/api/v1/query_range" \
--data-urlencode 'query={level="error"} |= "timeout"' \
--data-urlencode 'start=1704600000000000000' \
--data-urlencode 'end=1704686400000000000'
# Count errors per service over time
curl -G -s "http://loki:3100/loki/api/v1/query" \
--data-urlencode 'query=sum by (service) (count_over_time({level="error"}[1h]))'
# Parse JSON and filter by field
curl -G -s "http://loki:3100/loki/api/v1/query_range" \
--data-urlencode 'query={service="api-gateway"} | json | status_code >= 500'
# Calculate error rate
curl -G -s "http://loki:3100/loki/api/v1/query" \
--data-urlencode 'query=
sum(count_over_time({service="checkout"} | json | level="error"[5m]))
/
sum(count_over_time({service="checkout"}[5m]))
'
Cost Comparison
Log storage costs vary dramatically by approach:
Scenario: 100GB/day log ingestion, 30-day retention
Elasticsearch (self-managed):
├── Storage: 3TB (compressed) × $0.10/GB = $300/month
├── Compute: 6 nodes × $200/month = $1,200/month
├── Operations: ~0.5 FTE = $5,000/month
└── Total: ~$6,500/month
Grafana Loki (self-managed):
├── Storage: 1TB (label-only index) × $0.10/GB = $100/month
├── Compute: 3 nodes × $150/month = $450/month
├── Operations: ~0.25 FTE = $2,500/month
└── Total: ~$3,050/month
Datadog Logs:
├── Ingestion: 100GB × $0.10/GB × 30 = $300/month
├── Retention: Included for 15 days, archive for longer
├── Operations: Minimal
└── Total: ~$300/month + archive costs
Splunk Cloud:
├── Ingestion: 100GB × 30 × $150/GB/m
├── (Splunk pricing is volume-based and significantly higher)
└── Total: Varies significantly by contract
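The self-managed totals above are simple sums of storage, compute, and operations. The sketch below reproduces that arithmetic so the inputs can be swapped for your own numbers. The `monthly_cost` function is a hypothetical helper, and the per-node prices ($200 and $150 per month) are assumptions inferred from the stated totals.

```python
def monthly_cost(storage_gb, price_per_gb, nodes, price_per_node, ops_cost):
    """Sum storage, compute, and operations cost for one month (USD)."""
    storage = storage_gb * price_per_gb
    compute = nodes * price_per_node
    return storage + compute + ops_cost


# Inputs taken from the 100GB/day, 30-day retention scenario.
elasticsearch = monthly_cost(storage_gb=3000, price_per_gb=0.10,
                             nodes=6, price_per_node=200, ops_cost=5000)
loki = monthly_cost(storage_gb=1000, price_per_gb=0.10,
                    nodes=3, price_per_node=150, ops_cost=2500)

print(f"Elasticsearch (self-managed): ${elasticsearch:,.0f}/month")
print(f"Grafana Loki (self-managed):  ${loki:,.0f}/month")
```

In both self-managed cases the operations line dominates, so the staffing estimate deserves more scrutiny than the storage price.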
Retention Policies: Balancing Compliance and Cost
Log retention isn't just about storage costs—it's about compliance requirements, operational needs, and legal discovery obligations. Different log types have different retention requirements.
# retention-policies.yaml
# Define retention tiers based on log classification
policies:
  security_logs:
    description: "Authentication, authorization, access logs"
    retention_hot: 30d      # Searchable, indexed
    retention_warm: 90d     # Searchable, slower
    retention_cold: 365d    # Archived, restore on demand
    retention_total: 7y     # Compliance requirement (PCI DSS, SOX)
    compliance:
      - PCI-DSS-10.7        # 1 year minimum
      - SOX                 # 7 years
      - HIPAA               # 6 years
  application_logs:
    description: "Application debug, info, error logs"
    retention_hot: 7d
    retention_warm: 30d
    retention_cold: 90d
    retention_total: 90d
  infrastructure_logs:
    description: "System, network, container logs"
    retention_hot: 3d
    retention_warm: 14d
    retention_cold: 30d
    retention_total: 30d
  audit_logs:
    description: "Configuration changes, admin actions"
    retention_hot: 90d
    retention_warm: 365d
    retention_cold: 7y
    retention_total: 7y
    immutable: true         # Cannot be modified or deleted
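One way to read the hot, warm, and cold values is as cumulative age thresholds: a log stays hot until it reaches the hot limit, warm until the warm limit, and so on. The sketch below applies that reading to the `application_logs` policy. The `storage_tier` function and the dictionary encoding are hypothetical, and the cumulative interpretation is an assumption.

```python
from datetime import timedelta

# Hypothetical encoding of the application_logs policy, in days.
APPLICATION_LOGS = {"hot": 7, "warm": 30, "cold": 90}


def storage_tier(age: timedelta, policy: dict) -> str:
    """Return the tier a log of the given age belongs to, or 'delete' once expired."""
    for tier in ("hot", "warm", "cold"):
        # Thresholds are treated as exclusive upper bounds on age.
        if age < timedelta(days=policy[tier]):
            return tier
    return "delete"


for days in (3, 7, 45, 90):
    print(days, storage_tier(timedelta(days=days), APPLICATION_LOGS))
```

Expressing the policy as data rather than code makes it easy to audit against compliance requirements and to apply the same lookup to every log class.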
Implement tiered storage in Elasticsearch:
# Create ILM policy for log lifecycle management
curl -X PUT "elasticsearch:9200/_ilm/policy/logs-lifecycle" \
-H "Content-Type: application/json" \
-d '{
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_size": "50gb",
"max_age": "1d"
},
"set_priority": {
"priority": 100
}
}
},
"warm": {
"min_age": "7d",
"actions": {
"shrink": {
"number_of_shards": 1
},
"forcemerge": {
"max_num_segments": 1
},
"set_priority": {
"priority": 50
}
}
},
"cold": {
"min_age": "30d",
"actions": {
"searchable_snapshot": {
"snapshot_repository": "logs-archive"
}
}
},
"delete": {
"min_age": "90d",
"actions": {
"delete": {}
}
}
}
}
}'
# Apply policy to index template
curl -X PUT "elasticsearch:9200/_index_template/logs-template" \
-H "Content-Type: application/json" \
-d '{
"index_patterns": ["logs-*"],
"template": {
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"index.lifecycle.name": "logs-lifecycle",
"index.lifecycle.rollover_alias": "logs"
}
}
}'
Security and Access Control
Logs contain sensitive information—credentials (hopefully not, but often), PII, internal system details, and business data. Protecting them requires multiple layers.
Log Sanitization
Filter sensitive data before it reaches storage:
# log_sanitizer.py
import re
from typing import Dict, Any


class LogSanitizer:
    """Sanitize sensitive data from log entries before shipping."""

    PATTERNS = {
        'credit_card': r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
        'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
        'api_key': r'(api[_-]?key|apikey|api_secret)["\s:=]+["\']?[\w-]{20,}',
        'password': r'(password|passwd|pwd)["\s:=]+["\']?[^\s"\']+',
        'bearer_token': r'Bearer\s+[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+',
        # [A-Za-z] rather than [A-Z|a-z]: inside a character class "|" is a literal pipe
        'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
    }

    REDACTION_MAP = {
        'credit_card': '[REDACTED_CC]',
        'ssn': '[REDACTED_SSN]',
        'api_key': '[REDACTED_API_KEY]',
        'password': '[REDACTED_PASSWORD]',
        'bearer_token': '[REDACTED_TOKEN]',
        'email': '[REDACTED_EMAIL]',
    }

    def sanitize(self, log_entry: Dict[str, Any]) -> Dict[str, Any]:
        """Recursively sanitize all string values in a log entry."""
        return self._sanitize_value(log_entry)

    def _sanitize_value(self, value: Any) -> Any:
        if isinstance(value, str):
            return self._sanitize_string(value)
        elif isinstance(value, dict):
            return {k: self._sanitize_value(v) for k, v in value.items()}
        elif isinstance(value, list):
            return [self._sanitize_value(item) for item in value]
        return value

    def _sanitize_string(self, text: str) -> str:
        # Apply every pattern in turn; matches are replaced by a fixed marker
        for pattern_name, pattern in self.PATTERNS.items():
            text = re.sub(
                pattern,
                self.REDACTION_MAP[pattern_name],
                text,
                flags=re.IGNORECASE
            )
        return text
Role-Based Access Control
Configure access tiers for different roles:
# elasticsearch-roles.yaml
# Security analyst - full access to security logs, limited elsewhere
security_analyst:
  cluster:
    - monitor
  indices:
    - names: ["security-*", "audit-*"]
      privileges: ["read", "view_index_metadata"]
    - names: ["logs-*"]
      privileges: ["read"]
      field_security:
        grant: ["*"]
        except: ["user.email", "user.ip", "request.body"]
# Developer - access to application logs only
developer:
  cluster:
    - monitor
  indices:
    - names: ["logs-*"]
      privileges: ["read"]
      query: '{"term": {"environment": "development"}}'
    - names: ["logs-production-*"]
      privileges: ["read"]
      field_security:
        # Only the granted fields are visible, so user.* and request.headers.*
        # are already hidden. An "except" list must be a subset of "grant",
        # so it is omitted here.
        grant: ["timestamp", "level", "service", "message", "trace_id"]
# Compliance auditor - read-only audit logs
compliance_auditor:
  cluster:
    - monitor
  indices:
    - names: ["audit-*"]
      privileges: ["read", "view_index_metadata"]
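The grant and except semantics of field-level security can be sketched in a few lines: a field is visible when it matches a granted pattern and no excepted pattern. This is an illustration of the idea on flat, dotted field names, not Elasticsearch's implementation; the `visible_fields` function is hypothetical and uses shell-style wildcards from `fnmatch`.

```python
from fnmatch import fnmatch


def visible_fields(document: dict, grant, except_=()) -> dict:
    """Keep fields that match a grant pattern and match no except pattern."""

    def matches(field, patterns):
        return any(fnmatch(field, pattern) for pattern in patterns)

    return {
        field: value
        for field, value in document.items()
        if matches(field, grant) and not matches(field, except_)
    }


doc = {
    "level": "error",
    "service": "checkout",
    "user.email": "john@example.com",
    "request.body": "{...}",
}

# Analyst-style rule: everything except a few sensitive fields.
print(visible_fields(doc, grant=["*"], except_=["user.email", "user.ip", "request.body"]))
# Developer-style rule: an explicit allow list.
print(visible_fields(doc, grant=["level", "service", "message"]))
```

An explicit allow list is the safer default: new fields added to the log schema stay hidden until someone decides they should be visible.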
Encryption
Encrypt logs in transit and at rest:
# Generate certificates for log shipper to Elasticsearch TLS
openssl req -x509 -nodes -days 365 -newkey rsa:2048 \
-keyout /etc/fluent-bit/tls.key \
-out /etc/fluent-bit/tls.crt \
-subj "/CN=fluent-bit"
# Elasticsearch TLS configuration (encryption in transit)
# elasticsearch.yml
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.http.ssl.enabled: true
# Encryption at rest: Elasticsearch has no built-in setting for this.
# Encrypt the data volumes at the storage layer instead, for example
# dm-crypt/LUKS on self-managed nodes or encrypted cloud disks.
OpenTelemetry Logs: The Emerging Standard
OpenTelemetry Logs reached stability in 2024, providing a vendor-neutral standard for log collection that integrates with traces and metrics. While adoption is still growing, greenfield deployments should consider OTel as the instrumentation layer.
# opentelemetry_logging.py
from opentelemetry import trace
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.sdk.resources import Resource
import logging


def configure_otel_logging(service_name: str):
    """Configure OpenTelemetry logging with OTLP export."""
    resource = Resource.create({
        "service.name": service_name,
        "service.version": "1.0.0",
        "deployment.environment": "production"
    })
    logger_provider = LoggerProvider(resource=resource)
    # Export to OpenTelemetry Collector
    otlp_exporter = OTLPLogExporter(
        endpoint="otel-collector:4317",
        insecure=True
    )
    logger_provider.add_log_record_processor(
        BatchLogRecordProcessor(otlp_exporter)
    )
    # Bridge Python logging to OpenTelemetry
    handler = LoggingHandler(
        level=logging.INFO,
        logger_provider=logger_provider
    )
    logging.getLogger().addHandler(handler)
    return logger_provider
OpenTelemetry Collector configuration for logs:
# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318
  filelog:
    include: [/var/log/**/*.log]
    operators:
      - type: json_parser
        timestamp:
          parse_from: attributes.timestamp
          layout: '%Y-%m-%dT%H:%M:%S.%LZ'
processors:
  batch:
    timeout: 10s
    send_batch_size: 1024
  attributes:
    actions:
      - key: environment
        value: production
        action: insert
  resource:
    attributes:
      - key: cluster
        value: us-east-1
        action: insert
exporters:
  elasticsearch:
    endpoints: ["https://elasticsearch:9200"]
    logs_index: otel-logs
    sending_queue:
      enabled: true
      num_consumers: 10
      queue_size: 1000
  loki:
    endpoint: http://loki:3100/loki/api/v1/push
    labels:
      attributes:
        service.name: "service"
        level: "severity"
service:
  pipelines:
    logs:
      receivers: [otlp, filelog]
      processors: [batch, attributes, resource]
      exporters: [elasticsearch, loki]
Troubleshooting Common Issues
High Log Volume Causing Performance Issues
# Identify highest-volume log sources
curl -s "elasticsearch:9200/logs-*/_search" \
-H "Content-Type: application/json" \
-d '{
"size": 0,
"aggs": {
"by_source": {
"terms": {
"field": "kubernetes.pod_name.keyword",
"size": 20,
"order": {"_count": "desc"}
}
}
}
}' | jq '.aggregations.by_source.buckets'
# Find noisy log patterns
curl -s "elasticsearch:9200/logs-*/_search" \
-H "Content-Type: application/json" \
-d '{
"size": 0,
"aggs": {
"by_message": {
"terms": {
"field": "message.keyword",
"size": 50
}
}
}
}'
# Implement sampling for high-volume, low-value logs
# In Vector config:
# [transforms.sample_health_checks]
# type = "sample"
# inputs = ["kubernetes_logs"]
# rate = 100
# c", "health check")'
Missing Logs
# Check Fluent Bit buffer status
curl -s http://fluent-bit:2020/api/v1/metrics | grep -E "(input|output)_"
# Verify log shipping pipeline (Fluent Bit usually runs as a DaemonSet)
kubectl logs -n logging daemonset/fluent-bit --tail=100 | grep -i error
# Check for dropped logs due to rate limiting
kubectl logs -n logging daemonset/fluent-bit | grep -i "chunk"
# Verify Elasticsearch is accepting logs
curl -s "elasticsearch:9200/_cat/indices?v" | grep logs
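# Check for index write blocks (for example read_only_allow_delete after a disk watermark is hit)
curl -s "elasticsearch:9200/logs-*/_settings/index.blocks.*" | jq .
# Explain why a shard is unassigned, if any are
curl -s "elasticsearch:9200/_cluster/allocation/explain" | jq .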
Slow Queries
# Identify slow queries in Elasticsearch
curl -s "elasticsearch:9200/_nodes/stats/indices/search" | \
jq '.nodes | to_entries[] | {node: .key, query_time_ms: .value.indices.search.query_time_in_millis}'
# Check index health and shard distribution
curl -s "elasticsearch:9200/_cat/shards?v&s=store:desc" | head -20
# Optimize index for faster queries
curl -X POST "elasticsearch:9200/logs-2025.01.07/_forcemerge?max_num_segments=1"
Building a Log Management Strategy
Effective log management isn't about tools—it's about intentional decisions applied consistently. Start with these principles:
Log what matters, not everything. Every log line has storage cost, processing cost, and attention cost. The goal isn't maximum data; it's maximum signal.
Structure from the start. Retrofitting structured logging into a running system is painful. Establish logging standards before writing application code.
Plan for failure. Your logging pipeline will fail. Design for graceful degradation—local buffering, backpressure handling, and fallback destinations.
Measure the pipeline. You can't improve what you don't measure. Track ingestion rates, query latency, storage growth, and pipeline health as first-class metrics.
Iterate on retention. Start conservative (keep more), then tune based on actual query patterns. Most logs are never queried after 7 days—but the ones that are queried at day 30 during an incident are invaluable.
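The "plan for failure" principle can be sketched as a small shipper that buffers locally while the backend is down and spills the oldest entries to a fallback once the buffer is full. This is a minimal illustration under stated assumptions, not a production agent: `BufferedShipper`, `send`, and the list-based fallback are all hypothetical.

```python
from collections import deque


class BufferedShipper:
    """Buffer entries while the primary is down; spill the oldest to a fallback when full."""

    def __init__(self, send, fallback, max_buffer=1000):
        self._send = send            # callable that raises ConnectionError on failure
        self._fallback = fallback    # callable used when the buffer overflows
        self._buffer = deque()
        self._max_buffer = max_buffer

    def ship(self, entry):
        self._buffer.append(entry)
        self.flush()
        # Backpressure: bound memory use by moving the oldest entries
        # to the fallback destination instead of growing without limit.
        while len(self._buffer) > self._max_buffer:
            self._fallback(self._buffer.popleft())

    def flush(self):
        # Send in order; stop at the first failure and retry on the next call.
        while self._buffer:
            try:
                self._send(self._buffer[0])
            except ConnectionError:
                return
            self._buffer.popleft()


sent, spilled = [], []
backend = {"up": False}


def send(entry):
    if not backend["up"]:
        raise ConnectionError("backend unavailable")
    sent.append(entry)


shipper = BufferedShipper(send, spilled.append, max_buffer=2)
for i in range(3):
    shipper.ship(i)          # backend is down: entries accumulate, the oldest spills

backend["up"] = True
shipper.flush()              # backend is back: buffered entries drain in order
print("sent:", sent, "spilled:", spilled)
```

Real shippers such as Fluent Bit and Vector provide disk-backed buffers for the same purpose; the point of the sketch is that overflow behavior should be an explicit design decision, not an accident.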
The organizations that get the most value from their logs treat log management as a discipline, not an afterthought. The investment in proper tooling, consistent practices, and thoughtful retention pays dividends every time something goes wrong—which, in production systems, is always.