Tổng quan cấu hình

Forwarder sử dụng file cấu hình YAML để định nghĩa các nguồn dữ liệu, đích đến, và các thiết lập khác.

Cấu hình chi tiết

Logging Configuration

Cấu hình logging cho Forwarder:
logging:
  level: "info" # Mức độ log: debug, info, warn, error
  dir: "./logs" # Thư mục chứa log files
  file: "default.log" # File log chính
  error_file: "default-error.log" # File log lỗi (tùy chọn)
  max_size_mb: 10 # Kích thước tối đa toàn bộ file log (MB)
Chi tiết các tham số:
  • level (bắt buộc): Mức độ logging
    • debug: Hiển thị tất cả thông tin chi tiết, bao gồm debug messages
    • info: Hiển thị thông tin chung và các sự kiện quan trọng (mặc định)
    • warn: Chỉ hiển thị cảnh báo và lỗi
    • error: Chỉ hiển thị lỗi
  • dir (bắt buộc): Đường dẫn thư mục lưu trữ log files (mặc định: ./logs)
  • file (bắt buộc): Tên file log chính (mặc định: default.log)
  • error_file (tùy chọn): Tên file log riêng cho lỗi (mặc định: default-error.log)
  • max_size_mb (bắt buộc): Kích thước tối đa của tất cả log files tính bằng MB (mặc định: 10MB)

Channel Buffers Configuration

Cấu hình buffer cho performance tuning và memory management:
channel_buffers:
  orchestrator_capacity: 10000000 # Buffer capacity cho orchestrator collector channel
  shipper_capacity: 10000000 # Buffer capacity cho shipper channels
  router_max_in_flight_routes: 4 # Maximum concurrent routing tasks
Chi tiết các tham số:
  • orchestrator_capacity (bắt buộc): Buffer capacity cho orchestrator collector channel (mặc định: 10000000)
    • Controls how many events can be queued between sources and orchestrator
    • Higher values improve burst handling but use more memory
  • shipper_capacity (bắt buộc): Buffer capacity cho shipper channels (mặc định: 10000000)
    • Controls how many events can be queued between orchestrator and sinks
    • Higher values improve throughput but use more memory
  • router_max_in_flight_routes (bắt buộc): Maximum concurrent routing tasks in orchestrator (mặc định: 4)
    • Controls parallelism when routing events to multiple sinks
    • Higher values improve throughput but increase CPU usage

Registry Configuration

Registry là điểm điều phối trung tâm (tùy chọn) để xác thực và đồng bộ cấu hình:
registry:
  api_url: "https://blackhole.glabs.one" # API endpoint của Registry
  api_key: "your-api-key-here" # API key để xác thực với Registry
  config_update_interval: "15s" # Chu kỳ pull cấu hình từ Registry (mặc định: 15 giây)
  proxy: # Cấu hình proxy (tùy chọn)
    enable: true # Bật/tắt proxy
    http: "http://proxy.company.com:8080" # HTTP proxy URL
    https: "https://proxy.company.com:8080" # HTTPS proxy URL
  tls: # TLS settings cho kết nối tới `api_url`
    ca_cert_path: null # Optional: custom CA bundle (PEM)
    insecure_skip_verify_https: false # Optional: skip HTTPS certificate verification (NOT recommended)
  http_server: # REST forwarder server (tùy chọn)
    enabled: true # Bật/tắt HTTP server embedded
    listen_addr: "0.0.0.0:18080" # Địa chỉ lắng nghe
    tls: # TLS cho embedded REST forwarder (HTTPS)
      enabled: false # Bật/tắt HTTPS (mặc định: false)
      cert_path: null # Optional: path tới PEM certificate
      key_path: null # Optional: path tới PEM private key
    cache_ttl: "30s" # TTL cache cho request GET
    cache_max_entries: 10000 # Số entry cache tối đa
    cache_max_body_bytes: 262144 # Kích thước body tối đa được cache (bytes)
    request_timeout_sec: 30 # Timeout gọi upstream (giây)
    retry_on_statuses: [401, 404] # Status sẽ auto-join và retry 1 lần
Chi tiết các tham số:
  • api_url (bắt buộc): URL của API endpoint Registry (mặc định: https://blackhole.glabs.one)
  • api_key (tùy chọn): API key để xác thực với Registry (mặc định: null)
  • config_update_interval (bắt buộc): Chu kỳ đồng bộ cấu hình từ Registry (mặc định: 15s)
  • proxy (tùy chọn): Cấu hình proxy
    • enable (bắt buộc): Bật/tắt proxy (mặc định: true)
    • http (tùy chọn): URL của HTTP proxy
    • https (tùy chọn): URL của HTTPS proxy
  • tls (tùy chọn): TLS settings cho kết nối tới api_url (chỉ áp dụng khi api_url dùng https://)
    • ca_cert_path (tùy chọn): Đường dẫn file CA bundle (PEM) để validate certificate của Registry
    • insecure_skip_verify_https (tùy chọn): Skip HTTPS certificate verification (không khuyến nghị)
  • http_server (tùy chọn): Embedded REST forwarder server cho downstream devices
    • enabled (bắt buộc): Bật/tắt server (mặc định: true)
    • listen_addr (bắt buộc): Địa chỉ lắng nghe (mặc định: "0.0.0.0:18080")
    • tls (tùy chọn): TLS cho embedded REST forwarder (HTTPS)
      • enabled (tùy chọn): Bật HTTPS cho embedded REST forwarder
      • cert_path (tùy chọn): Path tới server certificate file (PEM)
      • key_path (tùy chọn): Path tới server private key file (PEM)
    • cache_ttl (bắt buộc): TTL cache cho GET response (mặc định: "30s")
    • cache_max_entries (bắt buộc): Số cache entry tối đa (mặc định: 10000)
    • cache_max_body_bytes (bắt buộc): Kích thước body tối đa được cache (mặc định: 262144)
    • request_timeout_sec (bắt buộc): Timeout gọi upstream (mặc định: 30)
    • retry_on_statuses (bắt buộc): Danh sách HTTP status sẽ auto-join và retry 1 lần (mặc định: [401, 404])

Sources Configuration

MQTT Source

Thu thập dữ liệu từ MQTT topics:
sources:
  mqtt_agents:
    type: "mqtt"
    topics: # Danh sách topics để subscribe
      - "agents/+/data"
      - "devices/+/logs"
      - "sensors/+/metrics"
    index: "forwarder" # Index cho dữ liệu
    sourcetype: "mqtt_data" # Loại dữ liệu
    broker: # Cấu hình broker (tùy chọn, sử dụng local broker nếu không có)
      url: "mqtt://localhost:1883"
      auth:
        type: "basic"
        username: "subscriber"
        password: "password"
      proxy:
        enable: false
      lwt: # Last Will and Testament (tùy chọn)
        topic: "clients/forwarder/status"
        payload: "offline"
        qos: "at_most_once"
        retain: false
    source_override: false # Không override index/sourcetype từ source
    request_queue_capacity: 1000 # Dung lượng queue nội bộ (tùy chọn)
    keep_alive_seconds: 60 # Khoảng thời gian keep-alive (giây, tùy chọn)
    max_incoming_packet_size: 1048576 # Kích thước packet tối đa nhận vào (bytes, tùy chọn)
    max_outgoing_packet_size: 1048576 # Kích thước packet tối đa gửi đi (bytes, tùy chọn)
    subscription_qos: "at_most_once" # QoS level cho subscription (at_most_once, at_least_once, exactly_once)
    network: # Tùy chọn cấu hình mạng
      connection_timeout_seconds: 30 # Timeout kết nối TCP (giây, tùy chọn)
      nodelay: true # Bật TCP_NODELAY (tùy chọn)
    clean_session: true # Bật clean session (tùy chọn, mặc định: true)
    inflight: 100 # Số message QoS>0 tối đa đang xử lý (tùy chọn)
    pending_throttle_micros: 0 # Delay giữa các packet gửi đi (microseconds, tùy chọn)
    manual_acks: false # Bật manual acknowledgements (tùy chọn, mặc định: false)
Chi tiết các tham số:
  • type (bắt buộc): Loại source, phải là "mqtt"
  • topics (bắt buộc): Danh sách MQTT topics để subscribe (mặc định: [])
    • Hỗ trợ wildcard patterns: + (single level), # (multi-level)
    • Ví dụ: "agents/+/data", "devices/#"
  • index (tùy chọn): Index để lưu trữ dữ liệu (mặc định: null)
  • sourcetype (tùy chọn): Loại dữ liệu để phân loại (mặc định: null)
  • broker (tùy chọn): Cấu hình MQTT broker (mặc định: sử dụng local broker)
    • url (bắt buộc): URL của MQTT broker (mặc định: mqtt://localhost:1883)
    • auth (bắt buộc): Cấu hình xác thực
      • type (bắt buộc): Loại xác thực
        • "none": Không xác thực (mặc định)
        • "basic": Username/password authentication
      • username (bắt buộc khi type=“basic”): Tên người dùng
      • password (bắt buộc khi type=“basic”): Mật khẩu
    • proxy (tùy chọn): Cấu hình proxy
      • enable (bắt buộc): Bật/tắt proxy (mặc định: true)
      • http (tùy chọn): URL của HTTP proxy
      • https (tùy chọn): URL của HTTPS proxy
  • source_override (bắt buộc): Có override index/sourcetype từ source không (mặc định: false)
    • true: Override index/sourcetype từ source
    • false: Không override index/sourcetype từ source
  • request_queue_capacity (tùy chọn): Dung lượng queue nội bộ cho MQTT client (mặc định: null - sử dụng giá trị mặc định của rumqttc)
    • Điều khiển kích thước queue message nội bộ
    • Giá trị cao hơn cải thiện throughput nhưng sử dụng nhiều bộ nhớ hơn
    • Ví dụ: 1000, 5000
  • keep_alive_seconds (tùy chọn): Khoảng thời gian keep-alive tính bằng giây (mặc định: null - sử dụng giá trị mặc định của rumqttc)
    • Khoảng thời gian giữa các keep-alive packets để duy trì kết nối
    • Ví dụ: 60, 120
  • max_incoming_packet_size (tùy chọn): Kích thước packet tối đa nhận vào tính bằng bytes (mặc định: null - sử dụng giá trị mặc định của rumqttc)
    • Giới hạn kích thước packet lớn nhất có thể nhận
    • Ví dụ: 1048576 (1MB), 5242880 (5MB)
  • max_outgoing_packet_size (tùy chọn): Kích thước packet tối đa gửi đi tính bằng bytes (mặc định: null - sử dụng giá trị mặc định của rumqttc)
    • Giới hạn kích thước packet lớn nhất có thể gửi
    • Ví dụ: 1048576 (1MB), 5242880 (5MB)
  • subscription_qos (bắt buộc): QoS level cho subscription (mặc định: "at_most_once")
    • "at_most_once": QoS 0 - Message có thể bị mất, không đảm bảo delivery
    • "at_least_once": QoS 1 - Message được đảm bảo delivery ít nhất một lần, có thể duplicate
    • "exactly_once": QoS 2 - Message được đảm bảo delivery đúng một lần, không duplicate
  • network (tùy chọn): Cấu hình tùy chọn mạng cho kết nối MQTT
    • connection_timeout_seconds (tùy chọn): Timeout kết nối TCP tính bằng giây (mặc định: null)
      • Áp dụng cho cả kết nối ban đầu và reconnect
      • Ví dụ: 30, 60
    • nodelay (tùy chọn): Bật TCP_NODELAY option (mặc định: null)
      • true: Tắt Nagle algorithm, giảm latency
      • false: Bật Nagle algorithm, tối ưu bandwidth
  • clean_session (tùy chọn): Bật/tắt clean session (mặc định: true)
    • true: Session sẽ bị xóa khi disconnect, không lưu subscription state
    • false: Session được duy trì, subscription state được lưu trên broker
  • inflight (tùy chọn): Số message QoS>0 tối đa đang xử lý đồng thời (mặc định: null - sử dụng giá trị mặc định của rumqttc)
    • Giới hạn số message đang chờ acknowledgment
    • Ví dụ: 100, 200
  • pending_throttle_micros (tùy chọn): Delay giữa các packet gửi đi tính bằng microseconds (mặc định: null - không throttle)
    • Điều khiển tốc độ gửi packet để tránh quá tải
    • Ví dụ: 1000 (1ms delay), 10000 (10ms delay)
  • manual_acks (tùy chọn): Bật manual acknowledgements cho incoming publishes (mặc định: false)
    • true: Phải gọi thủ công để acknowledge message (hữu ích cho xử lý async)
    • false: Tự động acknowledge message
  • broker.lwt (tùy chọn): Cấu hình Last Will and Testament (LWT)
    • Khi client disconnect bất thường, broker sẽ publish message này
    • topic (bắt buộc): Topic để publish LWT message
    • payload (bắt buộc): Nội dung LWT message
    • qos (bắt buộc): QoS level cho LWT message (mặc định: "at_most_once")
      • "at_most_once", "at_least_once", "exactly_once"
    • retain (bắt buộc): Retain flag cho LWT message (mặc định: false)
      • true: Broker giữ lại LWT message cho subscriber mới
      • false: LWT message không được retain
Tính năng MQTT Source:
  • Hỗ trợ wildcard topics (+, #)
  • Tự động kết nối và reconnect
  • QoS support
  • Authentication support
Ví dụ cấu hình MQTT Source:
sources:
  # Thu thập từ local MQTT broker
  mqtt_local:
    type: "mqtt"
    topics:
      - "agents/+/data"
      - "devices/+/logs"
    index: "forwarder"
    sourcetype: "mqtt_data"

  # Thu thập từ remote MQTT broker
  mqtt_remote:
    type: "mqtt"
    topics:
      - "sensors/+/metrics"
      - "iot/#"
    index: "iot"
    sourcetype: "sensor_data"
    broker:
      url: "mqtts://broker.company.com:8883"
      auth:
        type: "basic"
        username: "subscriber"
        password: "secure-password"
      proxy:
        enable: true
        https: "https://proxy.company.com:8080"
      lwt:
        topic: "forwarders/status"
        payload: '{"status": "offline", "timestamp": "{{ timestamp }}"}'
        qos: "at_least_once"
        retain: true
    subscription_qos: "at_least_once"
    keep_alive_seconds: 120
    network:
      connection_timeout_seconds: 30
      nodelay: true
    clean_session: false
    inflight: 200

  # Cấu hình nâng cao với manual acks
  mqtt_advanced:
    type: "mqtt"
    topics:
      - "critical/alerts/#"
    index: "alerts"
    sourcetype: "critical"
    broker:
      url: "mqtts://secure-broker.company.com:8883"
      auth:
        type: "basic"
        username: "monitoring"
        password: "secure-password"
    subscription_qos: "exactly_once"
    request_queue_capacity: 5000
    max_incoming_packet_size: 5242880 # 5MB
    max_outgoing_packet_size: 5242880 # 5MB
    network:
      connection_timeout_seconds: 60
      nodelay: true
    clean_session: true
    inflight: 100
    pending_throttle_micros: 1000 # 1ms delay
    manual_acks: true

Rsyslog Source

Thu thập syslog từ các thiết bị mạng:
sources:
  rsyslog_network:
    type: "rsyslog"
    includes: # Danh sách IP addresses hoặc subnets
      - "10.0.0.0/8"
      - "192.168.0.0/16"
      - "172.16.0.0/12"
    index: "syslog" # Index cho dữ liệu
    sourcetype: "rsyslog" # Loại dữ liệu
Chi tiết các tham số:
  • type (bắt buộc): Loại source, phải là "rsyslog"
  • includes (bắt buộc): Danh sách IP addresses hoặc subnets để chấp nhận (mặc định: [])
    • Hỗ trợ CIDR notation (từ phiên bản 0.4.1 trở đi): "192.168.1.0/24", "10.0.0.0/8"
    • Hỗ trợ single IP: "192.168.1.100"
  • index (tùy chọn): Index để lưu trữ dữ liệu (mặc định: null)
  • sourcetype (tùy chọn): Loại dữ liệu để phân loại (mặc định: null)
Tính năng Rsyslog Source:
  • Hỗ trợ UDP và TCP
  • IP filtering và routing
  • Automatic log parsing
  • High-performance processing
Ví dụ cấu hình Rsyslog Source:
sources:
  # Thu thập từ mạng nội bộ
  rsyslog_internal:
    type: "rsyslog"
    includes:
      - "192.168.0.0/16"
      - "10.0.0.0/8"
    index: "internal_syslog"
    sourcetype: "rsyslog"

  # Thu thập từ DMZ
  rsyslog_dmz:
    type: "rsyslog"
    includes:
      - "172.16.0.0/12"
    index: "dmz_syslog"
    sourcetype: "rsyslog"

Kafka Source (Từ phiên bản 0.5.1 trở đi)

Thu thập dữ liệu từ Apache Kafka topics:
sources:
  kafka_logs:
    type: "kafka"
    topics:
      - "logs.ingest"
      - "metrics.raw"
    index: "kafka"
    sourcetype: "kafka_json"
    bootstrap_servers:
      - "kafka1.company.com:9092"
      - "kafka2.company.com:9092"
    group_id: "forwarder-logs"
    client_id: "forwarder-consumer"
    auto_commit_enabled: true
    auto_commit_interval_ms: 5000
    auto_offset_reset: "latest"
    fetch_min_bytes: 1
    max_partition_fetch_bytes: 1048576
    session_timeout_ms: 60000
    heartbeat_interval_ms: 10000
    max_poll_interval_ms: 300000
    security:
      type: "sasl"
      sasl:
        mechanism: "SCRAM-SHA-512"
        username: "forwarder"
        password: "secure-password"
    source_override: false
    extra:
      enable.partition.eof: "true"
Chi tiết các tham số:
  • type (bắt buộc): Luôn là "kafka".
  • topics (bắt buộc): Danh sách topic cần subscribe; hỗ trợ wildcard của Kafka (logs.* với pattern regex trong broker).
  • index (tùy chọn): Index mặc định cho sự kiện thu thập (mặc định: null).
  • sourcetype (tùy chọn): Loại dữ liệu để phục vụ routing/parsing downstream (mặc định: null).
  • bootstrap_servers (tùy chọn): Danh sách Kafka brokers, dùng cho kết nối cluster (mặc định: ["localhost:9092"]).
  • group_id (bắt buộc): Consumer group ID để điều phối offset (mặc định: "siem-agent-consumer-group").
  • client_id (tùy chọn): Client ID để định danh consumer (mặc định: tự sinh).
  • auto_commit_enabled (tùy chọn): Bật/tắt auto-commit offset (true mặc định).
  • auto_commit_interval_ms (tùy chọn): Chu kỳ commit tự động tính bằng ms (mặc định: 5000).
  • auto_offset_reset (tùy chọn): Chính sách offset khi không tìm thấy offset trước đó. Giá trị hợp lệ: "earliest", "latest" (mặc định), "none".
  • fetch_min_bytes (tùy chọn): Số byte tối thiểu mỗi lần fetch (mặc định: 1).
  • max_partition_fetch_bytes (tùy chọn): Giới hạn dữ liệu lấy trên mỗi partition (mặc định: 1048576 = 1MB).
  • session_timeout_ms (tùy chọn): Thời gian session timeout cho broker (mặc định: 60000).
  • heartbeat_interval_ms (tùy chọn): Chu kỳ gửi heartbeat (mặc định: 10000).
  • max_poll_interval_ms (tùy chọn): Thời gian tối đa giữa các lần poll trước khi consumer bị coi là chết (mặc định: 300000).
  • security (tùy chọn): Cấu hình bảo mật cho Kafka (mặc định: "none"). Hỗ trợ:
    • type: "none": Kết nối PLAINTEXT.
    • type: "tls": TLS client với các trường tls.ca_location, tls.certificate_location, tls.key_location, tls.key_password, tls.verify_certificate.
    • type: "sasl": Xác thực SASL (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER) kèm tùy chọn TLS.
  • source_override (tùy chọn): Khi true, forwarder sẽ ghi đè indexsourcetype của sự kiện downstream bằng giá trị từ source (mặc định: false).
  • proxy (tùy chọn): Cấu hình proxy HTTP CONNECT khi cần đi qua proxy (lưu ý: rdkafka chỉ hỗ trợ giới hạn).
  • extra (tùy chọn): Bản đồ {key: value} bổ sung trực tiếp vào cấu hình librdkafka, ví dụ {"fetch.message.max.bytes": "2097152"}.
Tính năng Kafka Source:
  • Tự động subscribe từng topic trong danh sách topics.
  • Hỗ trợ giải mã payload JSON thành LogEvent (1 hoặc nhiều sự kiện). Nếu không phải JSON, Forwarder vẫn chuyển nguyên bản tin qua pipeline.
  • Gắn metadata kafka_topic, kafka_partition, kafka_offset vào LogEvent.extra.
  • Cho phép cấu hình override index/sourcetype thông qua source_override.
  • Chia sẻ logic bảo mật với Kafka Sink, giúp tái sử dụng chứng chỉ và thông tin SASL/TLS.
  • Có thể tinh chỉnh toàn bộ tham số librdkafka thông qua trường extra.
Tham khảo thêm:

gRPC Source

Nhận dữ liệu qua embedded gRPC server của Forwarder và route vào pipeline như một source.
Quan trọng: Để Forwarder nhận được dữ liệu gRPC, bạn cần:
  • Bật grpc_server (listener) ở mức root config
  • Khai báo ít nhất 1 gRPC source (để cài routing table)
sources:
  grpc_ingest:
    type: "grpc"
    # includes là regex match theo trường "topic" trong request gRPC
    includes:
      - "logs.*"
      - "metrics.*"
    index: "grpc" # Optional
    sourcetype: "grpc_json" # Optional
    source_override: false
    extra: {}
Chi tiết các tham số:
  • identifier: Được sinh tự động theo key trong sources (ví dụ grpc_ingest). Không cần cấu hình trực tiếp trong YAML.
  • type (bắt buộc): Loại source, phải là "grpc".
  • includes (bắt buộc): Danh sách pattern (regex) để match theo topic của message (mặc định: ["*"]).
    • "*": match tất cả topics.
    • Nếu regex không hợp lệ, Forwarder sẽ bỏ qua pattern đó.
  • index (tùy chọn): Index mặc định cho sự kiện tạo ra từ gRPC payload (mặc định: null).
  • sourcetype (tùy chọn): Sourcetype mặc định cho sự kiện tạo ra từ gRPC payload (mặc định: null).
  • source_override (tùy chọn): Khi true, Forwarder sẽ override index/sourcetype downstream bằng giá trị cấu hình ở source này (mặc định: false).
  • extra (tùy chọn): Map {key: value} cho tuning/flags nâng cao (mặc định: {}).
Định dạng payload gRPC (hành vi hiện tại):
  • Forwarder cố gắng parse log (bytes) thành JSON LogEvent hoặc Vec<LogEvent>.
  • Nếu parse thất bại, payload sẽ được coi là raw string và đưa vào trường _raw.
  • grpc_topicgrpc_source_identifier được gắn vào LogEvent.extra.

SNMP Source (Từ phiên bản 0.5.2 trở đi)

Thu thập dữ liệu từ các thiết bị mạng thông qua SNMP (Simple Network Management Protocol), hỗ trợ cả polling (GET requests) và trap reception:
sources:
  snmp_network:
    type: "snmp"
    includes: # Danh sách IP addresses, hostnames, hoặc CIDR
      - target: "192.168.1.100"
      - target: "router.company.com:161"
      - target: "10.0.0.0/24" # CIDR cho trap routing
    port: 161 # Port mặc định cho polling (có thể override per include)
    version: v2c # v1, v2c, hoặc v3
    credentials:
      community: "public" # Cho v1/v2c
      # Hoặc cho v3:
      # v3:
      #   username: "monitoring"
      #   auth:
      #     protocol: sha256
      #     key: "auth-password"
      #   privacy:
      #     protocol: aes128
      #     key: "priv-password"
    poll:
      interval_secs: 60 # Chu kỳ polling (giây)
      timeout_ms: 2000 # Timeout mỗi request (ms)
      retries: 2 # Số lần retry
      max_oids_per_request: 20 # Số OID tối đa mỗi batch
      jitter_pct: 10 # Jitter để tránh thundering herd (%)
    poll_enabled: true # Bật polling
    trap_enabled: true # Bật trap reception
    oids: # Danh sách OID cần poll
      # Định dạng đơn giản (backward compatible)
      - "1.3.6.1.2.1.1.3.0" # sysUpTime
      # Định dạng có tên (khuyến nghị)
      - name: "sysDescr"
        oid: "1.3.6.1.2.1.1.1.0"
      - name: "cpu_load_core1"
        oid: "1.3.6.1.2.1.25.3.3.1.2.1"
      - name: "total_memory"
        oid: "1.3.6.1.4.1.2021.4.5.0"
      - name: "free_memory"
        oid: "1.3.6.1.4.1.2021.4.6.0"
    index: "snmp" # Index cho dữ liệu
    sourcetype: "snmp" # Loại dữ liệu
Chi tiết các tham số:
  • type (bắt buộc): Loại source, phải là "snmp"
  • includes (bắt buộc): Danh sách địa chỉ thiết bị để thu thập dữ liệu
    • Hỗ trợ IP: "192.168.1.100"
    • Hỗ trợ hostname: "router.company.com"
    • Hỗ trợ port inline: "192.168.1.100:1161" hoặc "router:161"
    • Hỗ trợ CIDR: "10.0.0.0/24" (chủ yếu cho trap routing)
    • Mỗi include có thể override port, index, sourcetype riêng
  • port (bắt buộc): Port UDP mặc định cho polling (mặc định: 161)
  • version (bắt buộc): Phiên bản SNMP protocol (mặc định: v3)
    • v1: SNMP version 1 (community string)
    • v2c: SNMP version 2c (community string)
    • v3: SNMP version 3 (hỗ trợ auth/privacy)
  • credentials (bắt buộc): Thông tin xác thực
    • Cho v1/v2c: community: "public"
    • Cho v3: v3 block với username, auth, privacy
  • poll (bắt buộc): Cấu hình polling
    • interval_secs (bắt buộc): Chu kỳ polling tính bằng giây (mặc định: 60)
    • timeout_ms (bắt buộc): Timeout mỗi request tính bằng ms (mặc định: 2000)
    • retries (bắt buộc): Số lần retry khi thất bại (mặc định: 2)
    • max_oids_per_request (bắt buộc): Số OID tối đa mỗi batch request (mặc định: 20)
    • jitter_pct (tùy chọn): Phần trăm jitter để tránh thundering herd (0-100, mặc định: null)
  • poll_enabled (bắt buộc): Bật/tắt polling (mặc định: true)
  • trap_enabled (bắt buộc): Bật/tắt trap reception (mặc định: true)
  • oids (bắt buộc): Danh sách OID cần poll
    • Định dạng đơn giản: "1.3.6.1.2.1.1.3.0" (tên mặc định là OID)
    • Định dạng có tên: { name: "cpu_load", oid: "1.3.6.1.2.1.25.3.3.1.2.1" }
    • Tên sẽ xuất hiện trong output JSON để dễ truy vấn
  • index (tùy chọn): Index để lưu trữ dữ liệu (mặc định: null)
  • sourcetype (tùy chọn): Loại dữ liệu để phân loại (mặc định: null)
Tính năng SNMP Source:
  • Hỗ trợ SNMP v1, v2c, và v3 (với authentication/privacy)
  • Polling định kỳ với jitter để tránh thundering herd
  • Trap reception từ các thiết bị mạng
  • Named OIDs để dễ truy vấn và phân tích
  • Multi-device support: một source có thể quản lý nhiều thiết bị
  • IP-based routing: tự động route trap dựa trên IP nguồn
  • Default source fallback: trap từ thiết bị chưa cấu hình sẽ được route đến default source
Ví dụ cấu hình SNMP Source:
sources:
  # Polling-only: Thu thập metrics từ router
  snmp_routers:
    type: "snmp"
    includes:
      - target: "192.168.1.1"
      - target: "192.168.1.2"
    version: v2c
    credentials:
      community: "monitoring"
    poll:
      interval_secs: 30
      timeout_ms: 2000
    poll_enabled: true
    trap_enabled: false
    oids:
      - name: "sysUpTime"
        oid: "1.3.6.1.2.1.1.3.0"
      - name: "ifInOctets"
        oid: "1.3.6.1.2.1.2.2.1.10.1"
    index: "network_metrics"
    sourcetype: "snmp_poll"

  # Trap-only: Chỉ nhận trap từ switches
  snmp_switches:
    type: "snmp"
    includes:
      - target: "10.0.0.0/24" # CIDR cho trap routing
    version: v2c
    credentials:
      community: "public"
    poll_enabled: false
    trap_enabled: true
    index: "network_traps"
    sourcetype: "snmp_trap"

  # SNMPv3 với authentication và privacy
  snmp_secure:
    type: "snmp"
    includes:
      - target: "critical-device.company.com"
    version: v3
    credentials:
      v3:
        username: "monitoring"
        auth:
          protocol: sha256
          key: "auth-password-here"
        privacy:
          protocol: aes128
          key: "privacy-password-here"
    poll:
      interval_secs: 60
    poll_enabled: true
    trap_enabled: true
    oids:
      - name: "cpu_load"
        oid: "1.3.6.1.2.1.25.3.3.1.2.1"
      - name: "total_memory"
        oid: "1.3.6.1.4.1.2021.4.5.0"
      - name: "free_memory"
        oid: "1.3.6.1.4.1.2021.4.6.0"
    index: "secure_snmp"
    sourcetype: "snmp_v3"
Định dạng Output: SNMP events được tạo dưới dạng JSON với cấu trúc:
{
  "event_type": "poll", // hoặc "trap"
  "endpoint": "192.168.1.100:161",
  "timestamp": "2024-01-15T10:30:00Z",
  "varbinds": [
    {
      "oid": "1.3.6.1.2.1.25.3.3.1.2.1",
      "name": "cpu_load_core1", // Tên nếu được cấu hình
      "type": "integer",
      "value": 45
    },
    {
      "oid": "1.3.6.1.4.1.2021.4.6.0",
      "name": "free_memory",
      "type": "integer",
      "value": 2048000
    }
  ],
  "snmp_version": "V2c",
  "message_type": "GetResponse"
}
OID phổ biến cho monitoring:
  • System Info:
    • 1.3.6.1.2.1.1.3.0 - sysUpTime (thời gian hoạt động)
    • 1.3.6.1.2.1.1.1.0 - sysDescr (mô tả hệ thống)
    • 1.3.6.1.2.1.1.5.0 - sysName (tên hệ thống)
  • CPU:
    • 1.3.6.1.2.1.25.3.3.1.2.1 - hrProcessorLoad (CPU load core 1)
    • 1.3.6.1.2.1.25.3.3.1.2.2 - hrProcessorLoad (CPU load core 2)
  • Memory (UCD-SNMP MIB):
    • 1.3.6.1.4.1.2021.4.5.0 - memTotalReal (tổng RAM)
    • 1.3.6.1.4.1.2021.4.6.0 - memAvailReal (RAM còn trống)
  • Storage (Host Resources MIB):
    • 1.3.6.1.2.1.25.2.3.1.3.<index> - hrStorageDescr (mô tả storage)
    • 1.3.6.1.2.1.25.2.3.1.5.<index> - hrStorageSize (kích thước)
    • 1.3.6.1.2.1.25.2.3.1.6.<index> - hrStorageUsed (đã sử dụng)
Lưu ý: Sử dụng snmpwalk để khám phá OID có sẵn trên thiết bị của bạn: snmpwalk -v2c -c public <device-ip> 1.3.6.1.2.1

Sinks Configuration

MQTT Sink

sinks:
  mqtt_production:
    type: "mqtt"
    inputs: ["*"] # Optional: specific source identifiers
    url: "mqtts://broker.company.com:8883?client_id=forwarder-001"
    auth:
      type: "basic"
      username: "forwarder"
      password: "secure-password"
    proxy: # Proxy configuration (optional)
      enable: true
      http: "http://proxy.company.com:8080"
    rate_limit:
      batch_size: 100
      batch_interval: 500
      batch_max_bytes: 1048576
Chi tiết các tham số:
  • type (bắt buộc): Loại sink, phải là "mqtt"
  • inputs (tùy chọn): Danh sách source identifiers để consume (mặc định: ["*"])
    • ["*"]: Consume tất cả sources
    • ["mqtt_data", "rsyslog_data"]: Chỉ consume từ specific sources
  • url (bắt buộc): URL của MQTT broker (mặc định: mqtt://localhost:1883?client_id=siem-agent)
    • Hỗ trợ protocols: mqtt://, mqtts://, ws://, wss://
    • Có thể thêm query parameters: ?client_id=forwarder-001
  • auth (bắt buộc): Cấu hình xác thực (mặc định: "none")
    • type (bắt buộc): Loại xác thực
      • "none": Không xác thực (mặc định)
      • "basic": Username/password authentication
    • username (bắt buộc khi type=“basic”): Tên người dùng
    • password (bắt buộc khi type=“basic”): Mật khẩu
  • proxy (tùy chọn): Cấu hình proxy
    • enable (bắt buộc): Bật/tắt proxy (mặc định: true)
    • http (tùy chọn): URL của HTTP proxy
    • https (tùy chọn): URL của HTTPS proxy
  • rate_limit (bắt buộc): Cấu hình giới hạn tốc độ
    • batch_size (bắt buộc): Số message trong mỗi batch (mặc định: 500)
    • batch_interval (bắt buộc): Khoảng thời gian giữa các batch tính bằng ms (mặc định: 1000ms)
    • batch_max_bytes (bắt buộc): Kích thước tối đa mỗi batch tính bằng bytes (mặc định: 10485760 bytes = 10MB)

Kafka Sink

sinks:
  kafka_production:
    type: "kafka"
    inputs: ["*"] # Optional: specific source identifiers
    bootstrap_servers:
      - "kafka1.company.com:9092"
      - "kafka2.company.com:9092"
    client_id: "blackhole-forwarder" # Optional: auto-generated if not specified
    security:
      type: "sasl"
      sasl:
        mechanism: "PLAIN"
        username: "forwarder"
        password: "secure-password"
        tls:
          verify_certificate: true
    acks: "leader" # none, leader, all
    compression: "gzip" # none, gzip, snappy, lz4, zstd
    enable_idempotence: true
    transactional_id: null # Optional: for exactly-once semantics
    batching:
      linger_ms: 100
      batch_num_messages: 1000
      batch_kbytes: 1024
    retries:
      max_retries: 10
      backoff_ms: 100
    timeouts:
      socket_timeout_ms: 60000
      request_timeout_ms: 30000
      message_timeout_ms: 300000
      connections_max_idle_ms: 300000
    producer_pool_size: 4 # Number of producer instances (1-64)
    extra: {} # Additional librdkafka properties
Chi tiết các tham số:
  • type (bắt buộc): Loại sink, phải là "kafka"
  • inputs (tùy chọn): Danh sách source identifiers để consume (mặc định: ["*"])
    • ["*"]: Consume tất cả sources
    • ["mqtt_data", "rsyslog_data"]: Chỉ consume từ specific sources
  • bootstrap_servers (bắt buộc): Danh sách Kafka brokers (mặc định: ["localhost:9092"])
  • client_id (tùy chọn): Client ID để nhận diện producer (mặc định: null - auto-generated)
  • security (bắt buộc): Cấu hình bảo mật (mặc định: "none")
    • type (bắt buộc): Loại bảo mật
      • "none": Không bảo mật (mặc định)
      • "tls": Chỉ TLS
      • "sasl": SASL authentication
    • sasl (bắt buộc khi type=“sasl”): Cấu hình SASL
      • mechanism (bắt buộc): Cơ chế SASL (mặc định: "PLAIN")
      • username (tùy chọn): Tên người dùng
      • password (tùy chọn): Mật khẩu
      • tls (tùy chọn): Cấu hình TLS cho SASL
        • verify_certificate (bắt buộc): Xác minh chứng chỉ (mặc định: true)
  • acks (bắt buộc): Mức độ xác nhận (mặc định: "leader")
    • "none": Không yêu cầu xác nhận (acks=0)
    • "leader": Chỉ yêu cầu xác nhận từ leader (acks=1)
    • "all": Yêu cầu xác nhận từ tất cả replicas (acks=all)
  • compression (bắt buộc): Loại nén dữ liệu (mặc định: "none")
    • "none", "gzip", "snappy", "lz4", "zstd"
  • enable_idempotence (bắt buộc): Bật idempotent producer (mặc định: false)
  • transactional_id (tùy chọn): Transactional ID cho exactly-once semantics (mặc định: null)
  • batching (bắt buộc): Cấu hình batching
    • linger_ms (bắt buộc): Thời gian chờ để tạo batch tính bằng ms (mặc định: 0)
    • batch_num_messages (bắt buộc): Số message tối đa trong batch (mặc định: 100000)
    • batch_kbytes (bắt buộc): Kích thước tối đa batch tính bằng KB (mặc định: 1048576 KB = 1GB)
  • retries (bắt buộc): Cấu hình retry
    • max_retries (bắt buộc): Số lần retry tối đa (mặc định: 10)
    • backoff_ms (bắt buộc): Thời gian chờ giữa các retry tính bằng ms (mặc định: 100)
  • timeouts (bắt buộc): Cấu hình timeout
    • socket_timeout_ms (bắt buộc): Socket timeout tính bằng ms (mặc định: 60000)
    • request_timeout_ms (bắt buộc): Request timeout tính bằng ms (mặc định: 30000)
    • message_timeout_ms (bắt buộc): Message timeout tính bằng ms (mặc định: 300000)
    • connections_max_idle_ms (bắt buộc): Connection idle timeout tính bằng ms (mặc định: 300000)
  • producer_pool_size (bắt buộc): Số producer instances trong pool (mặc định: 4, range: 1-64)
  • extra (bắt buộc): Additional librdkafka configuration properties (mặc định: {})

gRPC Sink

Gửi sự kiện tới gRPC endpoint (ví dụ embedded gRPC server của Forwarder khác hoặc một dịch vụ gRPC bên ngoài). gRPC sink sẽ serialize mỗi LogEvent thành JSON và gửi theo PublishStream (mặc định) hoặc Publish. Trường topic của gRPC request mặc định là index của event.
sinks:
  grpc_forwarder:
    type: "grpc"
    url: "http://forwarder.company.com:50051"
    inputs: ["*"]
    headers:
      x-api-key: "forwarder-secret"
    use_streaming: true
    max_concurrent_requests: 4
    message_limits:
      max_encoding_message_size: 4194304
      max_decoding_message_size: 4194304
    rate_limit:
      batch_size: 500
      batch_interval: 1000
      batch_max_bytes: 10485760
Chi tiết các tham số:
  • identifier: Được sinh tự động theo key trong sinks (ví dụ grpc_forwarder). Không cần cấu hình trực tiếp trong YAML.
  • type (bắt buộc): Loại sink, phải là "grpc".
  • url (bắt buộc): URL đầy đủ của gRPC endpoint (mặc định: http://localhost:50051).
    • Hỗ trợ http://https://.
    • Alias: endpoint.
  • inputs (tùy chọn): Danh sách source identifiers để consume (mặc định: ["*"]).
  • tls (tùy chọn): TLS client cho kết nối gRPC.
    • ca_cert_path (tùy chọn).
    • client_cert_path (tùy chọn).
    • client_key_path (tùy chọn).
    • domain_name (tùy chọn).
    • insecure_skip_verify (tùy chọn, mặc định false).
    • Lưu ý runtime hiện tại: shipper mới chỉ áp dụng domain_name; các trường còn lại hiện chưa được áp dụng.
  • connect_timeout_secs (tùy chọn): Timeout TCP connect (seconds).
  • request_timeout_secs (tùy chọn, alias timeout_secs): Timeout per-RPC (seconds).
    • Lưu ý runtime hiện tại: shipper chưa áp dụng timeout này.
  • tcp_keepalive_secs (tùy chọn): TCP keepalive (seconds).
  • tcp_nodelay (tùy chọn): Bật/tắt TCP_NODELAY.
  • http2_keep_alive_interval_secs (tùy chọn): HTTP/2 keepalive interval (seconds).
  • http2_keep_alive_timeout_secs (tùy chọn): HTTP/2 keepalive timeout (seconds).
  • http2_keep_alive_while_idle (tùy chọn): Gửi keepalive khi idle.
  • initial_stream_window_size (tùy chọn): HTTP/2 initial stream window (bytes).
  • initial_connection_window_size (tùy chọn): HTTP/2 initial connection window (bytes).
  • http2_adaptive_window (tùy chọn): Bật adaptive flow control.
  • concurrency_limit (tùy chọn): Giới hạn concurrency ở mức tonic::transport::Endpoint.
  • max_concurrent_requests (tùy chọn): Số request gRPC đồng thời (mặc định: 1).
  • buffer_size (tùy chọn): Buffer nội bộ cho client service.
  • user_agent (tùy chọn): User-Agent header cho outbound requests.
  • message_limits (tùy chọn): Giới hạn kích thước message gRPC (bytes).
    • max_decoding_message_size (tùy chọn).
    • max_encoding_message_size (tùy chọn).
  • headers (tùy chọn): Map {header: value} được gửi kèm mọi request dưới dạng gRPC metadata (mặc định: {}).
    • Dùng để gửi API key (x-api-key) khi server bật grpc_server.auth.
  • use_streaming (tùy chọn): Chọn kiểu RPC gửi dữ liệu (mặc định: true).
    • true: dùng PublishStream.
    • false: dùng Publish (unary).
  • rate_limit (bắt buộc): Batching/throughput control.
  • proxy (tùy chọn): Cấu hình proxy.
    • Lưu ý runtime hiện tại: gRPC shipper chưa hỗ trợ proxy cho Endpoint.
  • reflection (tùy chọn): Client-side reflection (forward-compatible).
    • enabled (tùy chọn, mặc định false).
    • timeout_secs (tùy chọn).
    • Lưu ý runtime hiện tại: có thể chưa được sử dụng.
  • proto (tùy chọn): Cấu hình .proto/descriptor (forward-compatible).
    • path (bắt buộc nếu bật proto).
    • include_paths (tùy chọn, mặc định []).
    • service (tùy chọn).
    • method (tùy chọn).
    • Lưu ý runtime hiện tại: có thể chưa được sử dụng.

BlackHole Sink

sinks:
  blackhole_production:
    type: "blackhole"
    inputs: ["*"] # Optional: specific source identifiers
    url: "https://blackhole.company.com:9200"
    healthcheck: false # Enable health check requests
    request: null # Optional: compatibility request config
    auth:
      type: "basic"
      username: "forwarder"
      password: "secure-password"
    tls:
      ca_cert_path: null # Optional: custom CA bundle
      insecure_skip_verify: false # Skip certificate verification
    headers: # Custom headers
      X-Custom-Header: "value"
    timeouts:
      request_timeout_secs: 30
      connect_timeout_secs: 10
    rate_limit:
      batch_size: 200
      batch_interval: 2000
      batch_max_bytes: 2097152
Chi tiết các tham số:
  • type (bắt buộc): Loại sink, phải là "blackhole"
  • inputs (tùy chọn): Danh sách source identifiers để consume (mặc định: ["*"])
    • ["*"]: Consume tất cả sources
    • ["mqtt_data", "rsyslog_data"]: Chỉ consume từ specific sources
  • url (bắt buộc): URL của BlackHole endpoint (mặc định: http://localhost:9200)
  • healthcheck (bắt buộc): Enable health check requests (mặc định: false)
  • request (tùy chọn): Compatibility request configuration (mặc định: null)
    • retry_attempts (tùy chọn): Số lần retry attempts
    • timeout_secs (tùy chọn): Request timeout in seconds
  • auth (bắt buộc): Cấu hình xác thực (mặc định: "none")
    • type (bắt buộc): Loại xác thực
      • "none": Không xác thực (mặc định)
      • "basic": Username/password authentication
      • "client_cert": Client certificate (mTLS)
      • "jwt": JWT/OIDC bearer token
      • "aws_sigv4": AWS SigV4 signing
    • username (bắt buộc khi type=“basic”): Tên người dùng
    • password (bắt buộc khi type=“basic”): Mật khẩu
    • pkcs12_path (bắt buộc khi type=“client_cert”): Đường dẫn file PKCS#12
    • pkcs12_password (bắt buộc khi type=“client_cert”): Mật khẩu PKCS#12
    • token (bắt buộc khi type=“jwt”): JWT token
    • header_name (bắt buộc khi type=“jwt”): Tên header (mặc định: “Authorization”)
    • region (bắt buộc khi type=“aws_sigv4”): AWS region
    • profile (tùy chọn khi type=“aws_sigv4”): AWS profile
    • role_arn (tùy chọn khi type=“aws_sigv4”): Role ARN để assume
    • service (bắt buộc khi type=“aws_sigv4”): AWS service name
      • "es": Amazon BlackHole Service
      • "aoss": BlackHole Serverless
  • tls (bắt buộc): TLS configuration (mặc định: {ca_cert_path: null, insecure_skip_verify: false})
    • ca_cert_path (tùy chọn): Custom CA bundle file path
    • insecure_skip_verify (bắt buộc): Skip certificate verification (mặc định: false)
  • headers (bắt buộc): Custom headers (mặc định: {})
  • timeouts (bắt buộc): Cấu hình timeout
    • request_timeout_secs (bắt buộc): Request timeout tính bằng giây (mặc định: 30)
    • connect_timeout_secs (bắt buộc): Connect timeout tính bằng giây (mặc định: 10)
  • rate_limit (bắt buộc): Cấu hình giới hạn tốc độ
    • batch_size (bắt buộc): Số message trong mỗi batch (mặc định: 500)
    • batch_interval (bắt buộc): Khoảng thời gian giữa các batch tính bằng ms (mặc định: 1000ms)
    • batch_max_bytes (bắt buộc): Kích thước tối đa mỗi batch tính bằng bytes (mặc định: 10485760 bytes = 10MB)

Transforms Configuration

Các transform cho phép xây dựng pipeline xử lý dữ liệu giữa sourcessinks. Mỗi transform được định nghĩa dưới khóa transforms với định danh duy nhất và thuộc tính type. Transform có thể nhận dữ liệu từ một hoặc nhiều inputs, bao gồm nguồn (sources), transform khác, hoặc các kênh được tạo bởi Route Transform.
transforms:
  drop_noise:
    type: "filter"
    inputs: ["rsyslog_network"]
    condition: 'match!(.severity, "error|critical")'

  normalize_logs:
    type: "remap"
    inputs: ["drop_noise"]
    source: |
      .host = to_string!(.host)
      .severity = downcase!(.severity)
Các thuộc tính chung:
  • inputs (tùy chọn): Danh sách định danh nguồn hoặc transform cung cấp dữ liệu đầu vào. Có thể tham chiếu các route channel được tạo từ Route Transform.
  • identifier: Được sinh tự động bởi Forwarder; không cần cấu hình trong YAML.
Lưu ý: Không sử dụng định danh "*" cho transform vì được Forwarder dành riêng cho hệ thống.

Filter Transform

Lọc sự kiện bằng biểu thức VRL, chỉ giữ lại các sự kiện thỏa điều kiện.
transforms:
  error_only:
    type: "filter"
    inputs:
      - rsyslog_network
    condition: '.severity == "error"'
  • condition (bắt buộc): Biểu thức VRL trả về boolean. Sự kiện cho kết quả true sẽ tiếp tục pipeline.

Route Transform

Định tuyến sự kiện vào nhiều kênh dựa trên điều kiện VRL. Mỗi route tạo ra một kênh mới mang tên route.
transforms:
  split_by_type:
    type: "route"
    inputs:
      - mqtt_agents
    routes:
      security: r#'contains!(.sourcetype, "security")'#
      audit: r#'.category == "audit"'#
  • routes (bắt buộc): Map {route_name: vrl_condition}. Sự kiện thỏa điều kiện sẽ được gửi đến kênh tương ứng (security, audit, …). Sự kiện có thể khớp nhiều route.

Throttle Transform

Giới hạn tốc độ sự kiện dựa trên ngưỡng và cửa sổ thời gian.
transforms:
  rate_limit_auth:
    type: "throttle"
    inputs:
      - security
    threshold: 100
    window_ms: 60000
    key_field: "{{ ip }}"
    exclude: ".is_trusted == true"
  • threshold (bắt buộc): Số sự kiện tối đa cho mỗi bucket trong một cửa sổ.
  • window_ms (bắt buộc): Kích thước cửa sổ thời gian tính bằng millisecond.
  • key_field (tùy chọn): Template VRL để nhóm sự kiện theo khóa (ví dụ địa chỉ IP). Nếu không đặt, mọi sự kiện dùng chung một bucket.
  • exclude (tùy chọn): Biểu thức VRL; sự kiện trả về true sẽ bỏ qua throttling.

Reduce Transform

Gộp nhiều sự kiện thành một sự kiện tổng hợp dựa trên điều kiện kết thúc và khóa nhóm.
transforms:
  session_reduce:
    type: "reduce"
    inputs:
      - mqtt_agents
    condition: ".final == true"
    group_by: "{{ session_id }}"
    time: 120000
    merge_strategies:
      logs: "array"
      response_time: "max"
  • condition (bắt buộc): Biểu thức VRL xác định sự kiện kết thúc chuỗi (flush group).
  • group_by (tùy chọn): Chuỗi template VRL để xác định khóa nhóm. Mặc định sử dụng source_identifier kết hợp địa chỉ IP.
  • time (tùy chọn): Thời gian (ms) sau đó group sẽ tự động flush nếu chưa hoàn thành.
  • merge_strategies (tùy chọn): Map {field_name: strategy} điều khiển cách gộp giá trị. Chiến lược hỗ trợ: array, concat, concat_newline, concat_raw, discard, flat_unique, longest_array, max, min, retain, shortest_array, sum, first, last.

Dedupe Transform

Loại bỏ sự kiện trùng lặp trong cửa sổ thời gian dựa trên các trường được chỉ định.
transforms:
  dedupe_raw:
    type: "dedupe"
    inputs:
      - rsyslog_network
    fields:
      match: ["_raw"]
    window:
      window_ms: 60000
      refresh_on_drop: false
  • fields.match (tùy chọn): Danh sách các trường cần khớp để xem là trùng lặp. Mặc định: ["_raw"].
  • fields.ignore (tùy chọn): Danh sách trường bỏ qua khi so sánh (không được cấu hình đồng thời với match).
  • window.window_ms (tùy chọn): Kích thước cửa sổ thời gian (ms) để theo dõi bản ghi trùng (mặc định 60000).
  • window.refresh_on_drop (tùy chọn): Khi true, thời gian sống của bản ghi được gia hạn khi có bản trùng bị loại.

Remap Transform

Biến đổi dữ liệu bằng mã VRL inline hoặc từ file.
transforms:
  enrich_event:
    type: "remap"
    inputs:
      - normalize_logs
    source: |
      .timestamp = to_timestamp!(.timestamp)
      .tags = (.tags ?? []) + ["forwarder"]
      if .status == "failed" { abort }
    drop_on_abort: true
    drop_on_error: false
  • source (tùy chọn): Đoạn mã VRL inline. Có thể dùng nhiều dòng với syntax literal.
  • file (tùy chọn): Đường dẫn đến một file VRL duy nhất.
  • files (tùy chọn): Danh sách file VRL sẽ được chạy tuần tự.
  • drop_on_abort (tùy chọn, mặc định true): Loại bỏ sự kiện khi script gọi abort.
  • drop_on_error (tùy chọn, mặc định false): Loại bỏ sự kiện khi gặp lỗi xử lý.
Gợi ý: Kết hợp route, filterremap để xây dựng pipeline phức tạp trước khi gửi dữ liệu đến các sinks.

MQTT Server Configuration

Cấu hình embedded MQTT broker:
mqtt_server:
  router:
    max_connections: 10000 # Số kết nối tối đa
    max_outgoing_packet_count: 200 # Số packet tối đa trong queue
    max_segment_size: 104857600 # Kích thước segment tối đa (100MB)
    max_segment_count: 10 # Số segment tối đa
    shared_subscriptions_strategy: "random" # sticky, roundrobin, random
    initialized_filters: [] # Filters khởi tạo sẵn
    custom_segments: [] # Custom segment configs
  server:
    name: "v4-1" # Tên server instance
    listen: "0.0.0.0:1883" # Địa chỉ listen
    next_connection_delay_ms: 1 # Delay giữa các kết nối
    tls: # TLS configuration (tùy chọn)
      capath: "/path/to/ca.pem"
      certpath: "/path/to/server.pem"
      keypath: "/path/to/server.key"
    connections:
      connection_timeout_ms: 60000 # Timeout kết nối
      max_payload_size: 104857600 # Kích thước payload tối đa (100MB)
      max_inflight_count: 100 # Số message inflight tối đa
      dynamic_filters: true # Cho phép dynamic filters
  console: # Debug console (tùy chọn)
    listen: "127.0.0.1:8080"
  auth: # Authentication configs
    - type: "basic"
      username: "agent"
      password: "password"
    - type: "basic"
      username: "forwarder"
      password: "secure-password"
Chi tiết các tham số:

Router Configuration

  • max_connections (bắt buộc): Số kết nối tối đa (mặc định: 10000)
  • max_outgoing_packet_count (bắt buộc): Số packet tối đa trong queue (mặc định: 200)
  • max_segment_size (bắt buộc): Kích thước segment tối đa tính bằng bytes (mặc định: 104857600 bytes = 100MB)
  • max_segment_count (bắt buộc): Số segment tối đa (mặc định: 10)
  • shared_subscriptions_strategy (bắt buộc): Chiến lược cho shared subscriptions (mặc định: "random")
    • "sticky": Sticky sessions - client luôn kết nối đến cùng broker
    • "roundrobin": Round-robin - phân phối đều giữa các brokers
    • "random": Random - phân phối ngẫu nhiên
  • initialized_filters (bắt buộc): Filters khởi tạo sẵn (mặc định: [])
  • custom_segments (bắt buộc): Custom segment configs (mặc định: [])

Server Configuration

  • name (bắt buộc): Tên server instance (mặc định: "v4-1")
  • listen (bắt buộc): Địa chỉ listen (mặc định: "0.0.0.0:1883")
  • next_connection_delay_ms (bắt buộc): Delay giữa các kết nối tính bằng ms (mặc định: 1)
  • tls (tùy chọn): Cấu hình TLS
    • capath (tùy chọn): Đường dẫn CA certificate
    • certpath (bắt buộc): Đường dẫn server certificate
    • keypath (bắt buộc): Đường dẫn server private key
  • connections (bắt buộc): Cấu hình kết nối
    • connection_timeout_ms (bắt buộc): Timeout kết nối tính bằng ms (mặc định: 60000)
    • max_payload_size (bắt buộc): Kích thước payload tối đa tính bằng bytes (mặc định: 104857600 bytes = 100MB)
    • max_inflight_count (bắt buộc): Số message inflight tối đa (mặc định: 100)
    • dynamic_filters (bắt buộc): Cho phép dynamic filters (mặc định: true)

Console Configuration (tùy chọn)

  • listen (bắt buộc): Địa chỉ listen cho debug console

Authentication Configuration

  • auth (bắt buộc): Danh sách cấu hình xác thực
    • type (bắt buộc): Loại xác thực
      • "none": Không xác thực
      • "basic": Username/password authentication
    • username (bắt buộc khi type=“basic”): Tên người dùng
    • password (bắt buộc khi type=“basic”): Mật khẩu
Ví dụ cấu hình MQTT Server:
mqtt_server:
  # Cấu hình cho môi trường development
  router:
    max_connections: 1000
    max_outgoing_packet_count: 100
    max_segment_size: 52428800 # 50MB
    max_segment_count: 5
    shared_subscriptions_strategy: "roundrobin"
    initialized_filters: []
    custom_segments: []
  server:
    name: "dev-mqtt"
    listen: "127.0.0.1:1883"
    next_connection_delay_ms: 1
    connections:
      connection_timeout_ms: 30000
      max_payload_size: 52428800 # 50MB
      max_inflight_count: 50
      dynamic_filters: true
  auth:
    - type: "basic"
      username: "dev"
      password: "dev123"

  # Cấu hình cho môi trường production
  router:
    max_connections: 50000
    max_outgoing_packet_count: 500
    max_segment_size: 209715200 # 200MB
    max_segment_count: 20
    shared_subscriptions_strategy: "random"
    initialized_filters: []
    custom_segments: []
  server:
    name: "prod-mqtt"
    listen: "0.0.0.0:1883"
    next_connection_delay_ms: 1
    tls:
      capath: "/etc/ssl/mqtt/ca-cert.pem"
      certpath: "/etc/ssl/mqtt/server-cert.pem"
      keypath: "/etc/ssl/mqtt/server-key.pem"
    connections:
      connection_timeout_ms: 60000
      max_payload_size: 104857600 # 100MB
      max_inflight_count: 100
      dynamic_filters: true
  console:
    listen: "127.0.0.1:8080"
  auth:
    - type: "basic"
      username: "agent"
      password: "secure-password"
    - type: "basic"
      username: "forwarder"
      password: "secure-password"

gRPC Server Configuration

Cấu hình embedded gRPC server để nhận log events từ client (Agent/Forwarder khác). Server này cung cấp service LogIngestService với 2 RPC: PublishPublishStream.
grpc_server:
  listen_addr: "0.0.0.0:50051"
  # Bật API-key authentication (tùy chọn)
  auth:
    header_name: "x-api-key"
    api_keys:
      - "forwarder-secret"
  # Bật TLS (tùy chọn)
  tls:
    cert_path: "/etc/ssl/grpc/server-cert.pem"
    key_path: "/etc/ssl/grpc/server-key.pem"
    ca_cert_path: "/etc/ssl/grpc/ca-cert.pem" # Optional: dùng cho client cert verify
  max_decoding_message_size: 4194304
  max_encoding_message_size: 4194304
  http2_keep_alive_interval_secs: 30
  http2_keep_alive_timeout_secs: 10
  reflection:
    enabled: true
    include_health_service: true
Chi tiết các tham số:
  • listen_addr (bắt buộc): Địa chỉ bind dạng "ip:port" (mặc định: "0.0.0.0:50051").
  • auth (tùy chọn): API-key authentication cho request gRPC.
    • header_name (tùy chọn): Tên header/metadata để mang API key (mặc định: "x-api-key").
    • api_keys (tùy chọn): Danh sách API keys hợp lệ. Nếu danh sách rỗng, auth sẽ không được bật.
  • tls (tùy chọn): TLS cho gRPC server.
    • cert_path (bắt buộc khi bật TLS): Đường dẫn server certificate (PEM).
    • key_path (bắt buộc khi bật TLS): Đường dẫn server private key (PEM).
    • ca_cert_path (tùy chọn): CA bundle (PEM). Khi cấu hình, server sẽ cấu hình trust roots cho client CA.
    • require_client_cert (tùy chọn, mặc định false): Yêu cầu client certificate (mTLS).
    • insecure_skip_verify (tùy chọn, mặc định false).
    • Lưu ý runtime hiện tại: TLS runtime hiện cấu hình identity + ca_cert_path (nếu có). require_client_certinsecure_skip_verify hiện chưa được áp dụng.
  • tcp_nodelay (tùy chọn): Bật/tắt TCP_NODELAY cho listener.
  • tcp_keepalive_seconds (tùy chọn): TCP keepalive (seconds) cho listener.
  • max_decoding_message_size, max_encoding_message_size (tùy chọn): Giới hạn kích thước message gRPC (bytes).
  • http2_keep_alive_interval_secs, http2_keep_alive_timeout_secs (tùy chọn): HTTP/2 keepalive.
    • http2_keep_alive_while_idle (tùy chọn): Có trong schema nhưng hiện chưa được áp dụng.
  • initial_stream_window_size (tùy chọn): HTTP/2 initial stream window (bytes).
  • initial_connection_window_size (tùy chọn): HTTP/2 initial connection window (bytes).
  • max_concurrent_streams (tùy chọn): Max concurrent HTTP/2 streams per connection.
  • max_frame_size (tùy chọn): Max HTTP/2 frame size (bytes).
  • http2_adaptive_window (tùy chọn): Bật adaptive flow control.
  • concurrency_limit_per_connection (tùy chọn): Giới hạn concurrent requests per connection.
  • max_connection_age_secs (tùy chọn): Đóng connection sau (N) giây.
  • max_connection_age_grace_secs (tùy chọn): Grace period sau max age.
    • Lưu ý runtime hiện tại: có trong schema nhưng hiện chưa được áp dụng.
  • timeout_secs (tùy chọn): Soft per-request timeout ở layer service.
    • Lưu ý runtime hiện tại: có trong schema nhưng hiện chưa được áp dụng.
  • extra (tùy chọn): Map {key: value} cho tuning/flags nâng cao (mặc định: {}).
  • reflection (tùy chọn): Server-side reflection.
    • enabled (tùy chọn): Bật reflection (mặc định: true).
    • include_health_service (tùy chọn): Expose gRPC health service cùng reflection.
    • Lưu ý runtime hiện tại: runtime hiện chỉ bật reflection service; health service chưa được expose.
Kết nối từ client (gợi ý):
  • Nếu bật grpc_server.auth, hãy thêm headers.x-api-key trong gRPC sink của client.

Rsyslog Server Configuration

Cấu hình rsyslog server:
rsyslog_server:
  udp_addr: "0.0.0.0:514" # UDP listen address
  tcp_addr: "0.0.0.0:514" # TCP listen address
  default_source: "rsyslog" # Default source cho những ip không được liệt kê trong includes

SNMP Server Configuration

Cấu hình SNMP server cho trap listener:
snmp_server:
  trap_addr: "0.0.0.0:162" # UDP listen address cho trap reception
  max_datagram_size: 8192 # Kích thước tối đa trap datagram (bytes)
  trap_channel_capacity: 1024 # Buffer capacity cho trap channel
  max_inflight_traps: 2048 # Số trap parsing tasks tối đa đồng thời
  max_poll_concurrency: 64 # Số poll requests tối đa đồng thời
  default_source: "snmp_default" # Default source cho trap từ IP chưa cấu hình
  default_index: null # Index mặc định cho default source (tùy chọn)
  default_sourcetype: null # Sourcetype mặc định cho default source (tùy chọn)
  require_v3: false # Yêu cầu SNMPv3 (true = từ chối v1/v2c)
  warn_insecure_versions: true # Cảnh báo khi sử dụng v1/v2c
Chi tiết các tham số:
  • trap_addr (bắt buộc): Địa chỉ UDP để lắng nghe SNMP traps (mặc định: "0.0.0.0:162")
    • "0.0.0.0:162": Lắng nghe trên tất cả interfaces
    • "192.168.1.10:162": Chỉ lắng nghe trên interface cụ thể
  • max_datagram_size (bắt buộc): Kích thước tối đa trap datagram tính bằng bytes (mặc định: 8192)
    • Datagram lớn hơn sẽ bị cắt để tránh buffer exhaustion attacks
  • trap_channel_capacity (bắt buộc): Buffer capacity cho trap channel (mặc định: 1024)
    • Điều chỉnh dựa trên burst rate; backpressure được áp dụng khi đạt giới hạn
  • max_inflight_traps (bắt buộc): Số trap parsing tasks tối đa đồng thời (mặc định: 2048)
    • Bảo vệ runtime khỏi burst traffic hoặc malicious traffic
  • max_poll_concurrency (bắt buộc): Số poll requests tối đa đồng thời (mặc định: 64)
    • Ngăn pollers làm quá tải thiết bị mạng khi có nhiều sources
  • default_source (bắt buộc): Source identifier cho trap từ IP chưa được cấu hình (mặc định: "snmp_default")
    • Tương tự rsyslog server, trap từ thiết bị chưa có trong includes sẽ được route đến source này
  • default_index (tùy chọn): Index mặc định cho default source events (mặc định: null)
  • default_sourcetype (tùy chọn): Sourcetype mặc định cho default source events (mặc định: null)
  • require_v3 (bắt buộc): Yêu cầu SNMPv3 cho tất cả sources (mặc định: true)
    • true: Từ chối khởi động poller với v1/v2c
    • false: Cho phép v1/v2c (hữu ích cho testing hoặc legacy devices)
  • warn_insecure_versions (bắt buộc): Cảnh báo khi sử dụng v1/v2c (mặc định: true)
    • Không chặn collector nhưng cung cấp visibility về insecure protocol usage
Ví dụ cấu hình SNMP Server:
# Cấu hình cho môi trường development
snmp_server:
  trap_addr: "0.0.0.0:162"
  max_datagram_size: 4096
  trap_channel_capacity: 512
  max_inflight_traps: 1024
  max_poll_concurrency: 32
  default_source: "snmp_default"
  require_v3: false  # Cho phép v1/v2c để test
  warn_insecure_versions: true

# Cấu hình cho môi trường production
snmp_server:
  trap_addr: "0.0.0.0:162"
  max_datagram_size: 8192
  trap_channel_capacity: 2048
  max_inflight_traps: 4096
  max_poll_concurrency: 128
  default_source: "snmp_unregistered"
  default_index: "snmp_fallback"
  default_sourcetype: "snmp_trap"
  require_v3: true  # Chỉ cho phép SNMPv3
  warn_insecure_versions: true

Proxy Server Configuration

Cấu hình HTTP CONNECT proxy:
proxy_server:
  addr: "127.0.0.1:8080" # Proxy listen address
  username: null # Username cho authentication (optional)
  password: null # Password cho authentication (optional)
  max_connections: 1000 # Maximum concurrent connections (0 = unlimited)
  connect_timeout_sec: 10 # Connect timeout in seconds
  idle_timeout_sec: 300 # Idle timeout per connection in seconds (0 = disabled)
  auth_timeout_sec: 30 # Authentication timeout in seconds
  keepalive_sec: 60 # TCP keepalive interval in seconds (0 = disabled)
  nodelay: true # Enable TCP_NODELAY on sockets
  realm: "Proxy Authentication Required" # Authentication realm for HTTP Basic auth
  buffer_size: 8192 # Buffer size for I/O operations in bytes

Inventory Configuration

Cấu hình queue persistent cho dữ liệu:
inventory:
  max_messages: 100000 # Số message tối đa
  max_bytes: 268435456 # Dung lượng tối đa (256MB)
  backpressure_on_limit: false # Áp dụng backpressure khi đạt giới hạn
  per_shipper: # Cấu hình cho từng shipper
    mqtt:
      ttl: 86400 # Time-to-live (giây)
      max_retries: 20 # Số lần retry tối đa
      flush_interval_secs: 5 # Chu kỳ flush
    kafka:
      ttl: 3600
      max_retries: 10
      flush_interval_secs: 10
    blackhole:
      ttl: 7200
      max_retries: 15
      flush_interval_secs: 5

Hướng dẫn tạo chứng chỉ SSL

Tạo chứng chỉ SSL tự ký cho MQTT Broker

Để cấu hình MQTT broker với TLS, bạn cần tạo chứng chỉ SSL. Dưới đây là hướng dẫn tạo chứng chỉ tự ký:

Bước 1: Tạo thư mục lưu trữ chứng chỉ

sudo mkdir -p /etc/ssl/mqtt
sudo chmod 700 /etc/ssl/mqtt

Bước 2: Tạo CA (Certificate Authority) tự ký

# Tạo private key cho CA
sudo openssl genrsa -out /etc/ssl/mqtt/ca-key.pem 4096

# Tạo certificate cho CA
sudo openssl req -new -x509 -days 365 -key /etc/ssl/mqtt/ca-key.pem -out /etc/ssl/mqtt/ca-cert.pem
Trong quá trình tạo CA certificate, bạn sẽ được yêu cầu nhập thông tin:
  • Country Name: VN
  • State: Ho Chi Minh
  • City: Ho Chi Minh City
  • Organization: Your Company
  • Organizational Unit: IT Department
  • Common Name: MQTT CA
  • Email: admin@yourcompany.com

Bước 3: Tạo server certificate

# Tạo private key cho server
sudo openssl genrsa -out /etc/ssl/mqtt/server-key.pem 4096

# Tạo certificate signing request (CSR)
sudo openssl req -new -key /etc/ssl/mqtt/server-key.pem -out /etc/ssl/mqtt/server.csr
Nhập thông tin tương tự như CA, nhưng Common Name phải là hostname hoặc IP của MQTT broker.

Bước 4: Tạo server certificate từ CSR

# Tạo server certificate từ CSR
sudo openssl x509 -req -in /etc/ssl/mqtt/server.csr -CA /etc/ssl/mqtt/ca-cert.pem -CAkey /etc/ssl/mqtt/ca-key.pem -CAcreateserial -out /etc/ssl/mqtt/server-cert.pem -days 365

Bước 5: Tạo client certificate (tùy chọn)

# Tạo private key cho client
sudo openssl genrsa -out /etc/ssl/mqtt/client-key.pem 4096

# Tạo CSR cho client
sudo openssl req -new -key /etc/ssl/mqtt/client-key.pem -out /etc/ssl/mqtt/client.csr

# Tạo client certificate
sudo openssl x509 -req -in /etc/ssl/mqtt/client.csr -CA /etc/ssl/mqtt/ca-cert.pem -CAkey /etc/ssl/mqtt/ca-key.pem -CAcreateserial -out /etc/ssl/mqtt/client-cert.pem -days 365

Bước 6: Cấu hình MQTT Server với TLS

mqtt_server:
  server:
    name: "secure-mqtt"
    listen: "0.0.0.0:8883"
    tls:
      capath: "/etc/ssl/mqtt/ca-cert.pem"
      certpath: "/etc/ssl/mqtt/server-cert.pem"
      keypath: "/etc/ssl/mqtt/server-key.pem"
    connections:
      connection_timeout_ms: 60000
      max_payload_size: 104857600
      max_inflight_count: 100
      dynamic_filters: true

Cấu hình TLS cho MQTT Client (Source và Sink)

Sau khi đã tạo chứng chỉ SSL cho MQTT broker, bạn cần cấu hình client (agent/forwarder) để kết nối với broker sử dụng TLS. Điều này đặc biệt quan trọng khi sử dụng chứng chỉ tự ký (self-signed certificates).

Cấu hình MQTT Source với TLS

Khi cấu hình MQTT source để subscribe từ broker có TLS, bạn cần chỉ định đường dẫn đến CA certificate:
sources:
  mqtt_secure_source:
    type: "mqtt"
    broker:
      url: "mqtts://broker.company.com:8883"
      auth:
        type: "basic"
        username: "myuser"
        password: "mypass"
      tls:
        ca_cert_path: "/etc/ssl/mqtt/ca-cert.pem"
    topics:
      - "sensors/+/temperature"
      - "devices/#"
    index: "mqtt_data"

Cấu hình MQTT Sink với TLS

Tương tự, khi cấu hình MQTT sink để publish đến broker có TLS:
sinks:
  mqtt_secure_sink:
    type: "mqtt"
    url: "mqtts://broker.company.com:8883"
    auth:
      type: "basic"
      username: "myuser"
      password: "mypass"
    tls:
      ca_cert_path: "/etc/ssl/mqtt/ca-cert.pem"
    rate_limit:
      batch_size: 100
      batch_interval: 1000
      batch_max_bytes: 1048576

Cấu hình Mutual TLS (mTLS)

Nếu broker yêu cầu client certificate authentication (mutual TLS), bạn cần cung cấp cả client certificate và key:
sources:
  mqtt_mtls_source:
    type: "mqtt"
    broker:
      url: "mqtts://broker.company.com:8883"
      auth:
        type: "none"
      tls:
        ca_cert_path: "/etc/ssl/mqtt/ca-cert.pem"
        client_cert_path: "/etc/ssl/mqtt/client-cert.pem"
        client_key_path: "/etc/ssl/mqtt/client-key.pem"
    topics:
      - "secure/data/#"
Lưu ý: Khi sử dụng mTLS, bạn thường không cần username/password vì client certificate đã đóng vai trò xác thực.

Các tùy chọn TLS

  • ca_cert_path: Đường dẫn đến CA certificate file (PEM format). Bắt buộc khi sử dụng chứng chỉ tự ký hoặc custom CA.
  • client_cert_path: Đường dẫn đến client certificate file (PEM format). Chỉ cần khi broker yêu cầu client authentication.
  • client_key_path: Đường dẫn đến client private key file (PEM format). Phải được cung cấp cùng với client_cert_path.
  • insecure: Bỏ qua xác thực certificate (mặc định: false). Không khuyến nghị sử dụng trong môi trường production vì làm giảm bảo mật.

Xử lý lỗi TLS

Nếu bạn gặp lỗi TLS: I/O: tls handshake eof hoặc các lỗi TLS khác, hãy kiểm tra:
  1. CA certificate path đúng chưa? Đảm bảo đường dẫn đến file CA certificate là chính xác và file có thể đọc được.
  2. Certificate format đúng chưa? Các certificate phải ở định dạng PEM (text format, bắt đầu với -----BEGIN CERTIFICATE-----).
  3. Quyền truy cập file: Đảm bảo agent/forwarder có quyền đọc các file certificate.
  4. URL protocol: Sử dụng mqtts:// thay vì mqtt:// cho kết nối TLS.
  5. Port đúng chưa? Port mặc định cho MQTT over TLS là 8883, không phải 1883.
Ví dụ cấu hình đầy đủ với TLS:
sources:
  mqtt_production:
    type: "mqtt"
    broker:
      url: "mqtts://mqtt.company.com:8883"
      auth:
        type: "basic"
        username: "agent_user"
        password: "secure_password"
      tls:
        ca_cert_path: "/etc/ssl/mqtt/ca-cert.pem"
    topics:
      - "logs/#"
      - "metrics/#"
    index: "production_data"

sinks:
  mqtt_production:
    type: "mqtt"
    url: "mqtts://mqtt.company.com:8883"
    auth:
      type: "basic"
      username: "agent_user"
      password: "secure_password"
    tls:
      ca_cert_path: "/etc/ssl/mqtt/ca-cert.pem"
    rate_limit:
      batch_size: 500
      batch_interval: 2000
      batch_max_bytes: 5242880

Tạo chứng chỉ SSL cho Kafka

Bước 1: Tạo keystore cho Kafka broker

# Tạo keystore
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -keysize 2048 -storepass password -keypass password -dname "CN=localhost, OU=IT, O=YourCompany, L=HCMC, ST=HCMC, C=VN"

# Tạo certificate signing request
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file -storepass password

# Tạo certificate từ CSR (cần CA)
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:password

# Import CA certificate vào keystore
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert -storepass password

# Import signed certificate vào keystore
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed -storepass password

Bước 2: Tạo truststore cho client

# Tạo truststore và import CA certificate
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert -storepass password

Bước 3: Cấu hình Kafka Sink với TLS

sinks:
  kafka_secure:
    type: "kafka"
    bootstrap_servers:
      - "kafka1.company.com:9093"
    security:
      type: "tls"
      tls:
        ca_location: "/path/to/ca-cert"
        certificate_location: "/path/to/client-cert"
        key_location: "/path/to/client-key"
        verify_certificate: true

Cấu hình nâng cao

High Availability Configuration

sinks:
  kafka_ha:
    type: "kafka"
    bootstrap_servers:
      - "kafka1.company.com:9092"
      - "kafka2.company.com:9092"
      - "kafka3.company.com:9092"
    acks: "all" # Đảm bảo replication
    enable_idempotence: true # Tránh duplicate
    retries:
      max_retries: 2147483647 # Retry vô hạn
      backoff_ms: 100

Performance Tuning

# Tối ưu cho high-throughput
inventory:
  max_messages: 1000000
  max_bytes: 1073741824 # 1GB
  backpressure_on_limit: true

mqtt_server:
  router:
    max_connections: 50000
    max_segment_size: 209715200 # 200MB
    max_segment_count: 20

sinks:
  high_throughput_kafka:
    type: "kafka"
    batching:
      linger_ms: 0 # Không delay
      batch_num_messages: 10000 # Batch lớn
      batch_kbytes: 10240 # 10MB batches
    rate_limit:
      batch_size: 1000
      batch_interval: 100
      batch_max_bytes: 10485760