## Architecture

The collector pipeline has three components:

- Input: Reads data with offset management and backpressure handling
- Processors: Transform data using the Bloblang mapping language
- Output: Batches and sends to the Flexprice API with retries
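A minimal sketch of how these three stages fit together in a Bento config, assuming a Kafka source; the broker, topic, endpoint, and auth header below are placeholders, not values from this guide:

```yaml
# Sketch: the three pipeline stages in one Bento config.
input:
  kafka:
    addresses: ["localhost:9092"]        # placeholder broker
    topics: ["usage-events"]             # placeholder topic
    consumer_group: "flexprice-collector"

pipeline:
  processors:
    - mapping: |
        # Bloblang mapping; reshape events for Flexprice here.
        root = this

output:
  http_client:
    url: "https://api.flexprice.example/v1/events"  # placeholder endpoint
    verb: POST
    headers:
      x-api-key: "${FLEXPRICE_API_KEY}"  # placeholder auth header name
```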
## How Batching Works

Events accumulate until either condition is met:

- Count threshold: The batch reaches the configured size (e.g., 10 events)
- Time window: The time limit is reached (e.g., 2 seconds)

The collector then picks the endpoint based on batch size:

- 1 event: `POST /events`
- Multiple events: `POST /events/bulk`
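In Bento terms this is a batching policy on the output; a sketch using the thresholds from the example above (the endpoint is a placeholder):

```yaml
output:
  http_client:
    url: "https://api.flexprice.example/v1/events/bulk"  # placeholder
    verb: POST
    batching:
      count: 10   # flush once 10 events have accumulated...
      period: 2s  # ...or after 2 seconds, whichever comes first
```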
## Data Transformation

Use Bloblang to transform your data format to Flexprice requirements.

### Type Conversion
All property values must be strings. Before:

```json
{"duration": 245, "status": 200}
```

After:

```json
{"duration": "245", "status": "200"}
```
### Conditional Processing
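Bloblang `if` expressions let you keep, reshape, or drop events based on their content. A sketch that filters out health-check traffic; the `endpoint` field and `/health` value are assumptions for illustration:

```yaml
pipeline:
  processors:
    - mapping: |
        # deleted() removes the message from the stream entirely.
        root = if this.endpoint == "/health" {
          deleted()
        } else {
          this
        }
```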
## Performance Optimization
### Kafka Input
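Raising the `checkpoint_limit` on the `kafka` input lets more messages be processed before their offsets are committed, which improves throughput on busy topics at the cost of more redelivery after a restart. A sketch (broker and topic are placeholders):

```yaml
input:
  kafka:
    addresses: ["localhost:9092"]   # placeholder
    topics: ["usage-events"]        # placeholder
    consumer_group: "flexprice-collector"
    checkpoint_limit: 4096          # more uncommitted messages in flight
```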
### Output Concurrency
Tune `max_in_flight` (parallel requests to the API) to match your event volume, as in the sketch after this list:

- Low volume: `max_in_flight: 5`
- Medium volume: `max_in_flight: 10` (default)
- High volume: `max_in_flight: 20` to `50`
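Set it on the output, for example (the endpoint is a placeholder):

```yaml
output:
  http_client:
    url: "https://api.flexprice.example/v1/events/bulk"  # placeholder
    verb: POST
    max_in_flight: 20  # parallel in-flight requests to the API
```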
## Error Handling

The collector automatically retries failed requests with exponential backoff:

- Fail → wait 1s → retry
- Fail → wait 2s → retry
- Fail → wait 4s → retry
- After the final attempt: drop the message and log the error
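This schedule maps onto the `http_client` output's retry fields; a sketch matching the timings above (the endpoint is a placeholder):

```yaml
output:
  http_client:
    url: "https://api.flexprice.example/v1/events"  # placeholder
    verb: POST
    retries: 3             # attempts before the message is dropped and logged
    retry_period: 1s       # base wait between attempts
    max_retry_backoff: 4s  # upper bound as the backoff grows
```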
## Observability

### Metrics
Key Prometheus metrics:

| Metric | Description |
|---|---|
| `bento_input_received` | Messages received |
| `bento_output_sent` | Messages sent successfully |
| `bento_output_error` | Output errors |
| `bento_batch_created` | Batches created |
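To expose these, enable the Prometheus exporter; Bento then serves them from its HTTP server's `/metrics` path:

```yaml
http:
  address: 0.0.0.0:4195  # Bento's HTTP server, which serves /metrics

metrics:
  prometheus: {}  # export metrics in Prometheus exposition format
```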
### Logging
Configure log level and format:
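A sketch using Bento's `logger` section, with JSON output at INFO level:

```yaml
logger:
  level: INFO   # TRACE, DEBUG, INFO, WARN, or ERROR
  format: json  # or logfmt for plain key=value lines
```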
## Additional Resources

- Bento Docs: [warpstreamlabs.github.io/bento](https://warpstreamlabs.github.io/bento)
- Bloblang Guide: Mapping language
- Flexprice Events API: Events reference

