Deep dive into the Bento Collector’s internals and optimization.

Architecture

The collector pipeline has three components:
Input (Kafka) → Processors (Transform) → Output (Flexprice)
  1. Input: Reads data with offset management and backpressure handling
  2. Processors: Transforms data using Bloblang mapping language
  3. Output: Batches and sends to Flexprice API with retries

How Batching Works

Events accumulate until either condition is met:
  • Count threshold: Batch reaches configured size (e.g., 10 events)
  • Time window: Time limit reached (e.g., 2 seconds)
Example timeline:
Time:  0s    0.5s    1s    1.5s    2s
Event: 1─────2──────3─────4──────5
                                  └─▶ Batch sent (5 events, 2s elapsed)
The collector automatically routes each batch to the appropriate endpoint based on its size:
  • 1 event: POST /events
  • multiple events: POST /events/bulk
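
The flush conditions and endpoint routing above can be modelled in a few lines of Python. This is a toy sketch, not the collector's implementation: `Batcher`, `endpoint_for`, and `replay` are hypothetical names, the time window is assumed to be measured from the last flush, and the model only checks the window when an event arrives (a real batcher would also flush on a timer).

```python
from dataclasses import dataclass, field


@dataclass
class Batcher:
    """Toy model of count-or-period batching (not the collector's real code)."""
    count: int                 # flush once this many events are buffered
    period: float              # flush once this many seconds have passed since the last flush
    last_flush: float = 0.0    # assumption: the window starts at the previous flush
    buffer: list = field(default_factory=list)

    def add(self, event, now: float):
        """Buffer an event; return the flushed batch if either condition is met, else None."""
        self.buffer.append(event)
        if len(self.buffer) >= self.count or now - self.last_flush >= self.period:
            return self.flush(now)
        return None

    def flush(self, now: float):
        """Hand back the buffered events and start a new window."""
        batch, self.buffer = self.buffer, []
        self.last_flush = now
        return batch


def endpoint_for(batch) -> str:
    """Pick the endpoint by batch size, as described above."""
    return "/events" if len(batch) == 1 else "/events/bulk"


def replay(timeline, count, period):
    """Feed (time, event) pairs through a Batcher and collect (time, endpoint, batch)."""
    batcher = Batcher(count=count, period=period)
    sent = []
    for now, event in timeline:
        batch = batcher.add(event, now)
        if batch:
            sent.append((now, endpoint_for(batch), batch))
    return sent
```

Replaying the example timeline with `replay([(0.0, 1), (0.5, 2), (1.0, 3), (1.5, 4), (2.0, 5)], count=10, period=2.0)` produces a single flush at the 2-second mark containing all five events, routed to `/events/bulk`.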

Data Transformation

Use Bloblang to transform your data into the format Flexprice requires.

Type Conversion

All property values must be strings:
pipeline:
  processors:
    - mapping: |
        root.properties = this.properties.map_each(p -> p.value.string())
Before: {"duration": 245, "status": 200}
After: {"duration": "245", "status": "200"}
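
For testing the expected output outside the collector, the same conversion can be approximated in Python. This is a sketch under assumptions: `stringify_properties` is a hypothetical helper, and non-string values are rendered as their JSON text, which matches the Before/After example for integers but is not guaranteed to match Bloblang's `string()` for every type.

```python
import json


def stringify_properties(event: dict) -> dict:
    """Return a copy of the event whose property values are all strings."""
    props = event.get("properties", {})
    converted = {
        # Strings pass through unchanged; other values use their JSON text,
        # so 245 becomes "245" and True becomes "true".
        key: value if isinstance(value, str) else json.dumps(value)
        for key, value in props.items()
    }
    return {**event, "properties": converted}
```

The input event is not modified; the function builds a new dictionary, which makes it easy to compare the event before and after conversion in a test.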

Conditional Processing

pipeline:
  processors:
    - mapping: |
        # Only process successful requests
        root = if this.status >= 200 && this.status < 300 {
          this
        } else {
          deleted()
        }
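
The effect of this mapping on a stream of events can be mirrored in Python when preparing test fixtures. A minimal sketch, assuming every event carries a numeric `status` field; `keep_successful` is a hypothetical helper name.

```python
def keep_successful(events):
    """Keep only events whose status is in the 2xx range, dropping the rest."""
    return [event for event in events if 200 <= event["status"] < 300]
```

Events with status 200 to 299 pass through unchanged; everything else is removed, which corresponds to `deleted()` in the mapping.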

Performance Optimization

Kafka Input

input:
  kafka:
    fetch_buffer_cap: 256        # Max unprocessed messages fetched at a time
    checkpoint_limit: 1024       # Max in-flight messages per topic partition
    commit_period: 1s            # How often partition offsets are committed

Output Concurrency

output:
  flexprice:
    max_in_flight: 10  # Parallel API requests
    batching:
      count: 50
      period: 5s
Guidelines:
  • Low volume: max_in_flight: 5
  • Medium volume: max_in_flight: 10 (default)
  • High volume: max_in_flight: 20-50
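
To choose between these tiers, it can help to estimate the throughput ceiling a given setting allows. The sketch below is back-of-the-envelope arithmetic, not a documented formula: it assumes every request carries a full batch and that all `max_in_flight` slots stay busy, and `request_latency_s` is a value you would measure yourself.

```python
def max_events_per_second(max_in_flight: int, batch_count: int, request_latency_s: float) -> float:
    """Rough upper bound on throughput: parallel requests x events per request / latency."""
    if request_latency_s <= 0:
        raise ValueError("request_latency_s must be positive")
    return max_in_flight * batch_count / request_latency_s
```

For example, `max_in_flight: 10` with `count: 50` and a measured latency of 0.25 seconds per request gives a ceiling of 2000 events per second. If the incoming rate is well above that, raising `max_in_flight` or the batch size is the first thing to try.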

Error Handling

The collector automatically retries failed requests:
output:
  flexprice:
    retry_max_attempts: 3
    retry_backoff:
      initial_interval: 1s
      max_interval: 60s
Retry sequence with the settings above (the wait doubles after each failure, up to max_interval):
  1. Fail → Wait 1s → Retry
  2. Fail → Wait 2s → Retry
  3. Fail → Wait 4s → Retry
  4. Drop and log error
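
The waits in this sequence can be reproduced with a short calculation. This is a sketch, assuming the interval doubles after each failure (inferred from the 1s, 2s, 4s sequence) and is capped at `max_interval`; `backoff_schedule` and its `multiplier` parameter are hypothetical names, not collector settings.

```python
def backoff_schedule(initial_interval: float, max_interval: float,
                     retries: int, multiplier: float = 2.0) -> list:
    """Return the wait (in seconds) before each retry, capped at max_interval."""
    waits = []
    wait = initial_interval
    for _ in range(retries):
        waits.append(min(wait, max_interval))
        wait *= multiplier  # assumed doubling between attempts
    return waits
```

With `initial_interval: 1s` and `max_interval: 60s`, three retries wait 1, 2, and 4 seconds. The cap only matters for longer sequences: from the seventh retry onward the wait stays at 60 seconds.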

Observability

Metrics

Key Prometheus metrics:
Metric                  Description
bento_input_received    Messages received
bento_output_sent       Messages sent successfully
bento_output_error      Output errors
bento_batch_created     Batches created

Logging

Configure log level and format:
logger:
  level: INFO           # DEBUG, INFO, WARN, ERROR
  format: logfmt        # or json
  add_timestamp: true

Additional Resources

Need help? [email protected]