# How It Works

> Architecture, batching, and performance optimization for the Bento Collector

This page covers how the Bento Collector processes events internally and how to tune it for performance.

## Architecture

The collector pipeline has three components:

```
Input (Kafka) → Processors (Transform) → Output (Flexprice)
```

1. **Input**: Reads messages from Kafka with offset management and backpressure handling
2. **Processors**: Transform each message using the Bloblang mapping language
3. **Output**: Batches events and sends them to the Flexprice API with retries

## How Batching Works

Events accumulate in a batch until either of the following conditions is met, whichever comes first:

* **Count threshold**: Batch reaches configured size (e.g., 10 events)
* **Time window**: Time limit reached (e.g., 2 seconds)

**Example timeline:**

```
Time:  0s    0.5s    1s    1.5s    2s
Event: 1─────2──────3─────4──────5
                                  └─▶ Batch sent (5 events, 2s elapsed)
```
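The flush rule can be modeled in a few lines of Python. This is an illustrative sketch, not the collector's implementation: the `simulate_batches` function is hypothetical, and it assumes the time window starts when the first event of a batch arrives.

```python
from typing import List, Tuple


def simulate_batches(
    arrivals: List[float], count: int, period: float
) -> List[Tuple[float, int]]:
    """Return (flush_time, batch_size) pairs for events arriving at the given times.

    Model: a batch is flushed when it holds `count` events, or when `period`
    seconds have passed since its first event, whichever comes first.
    """
    flushes: List[Tuple[float, int]] = []
    batch_start = 0.0  # arrival time of the first event in the open batch
    size = 0           # number of events in the open batch

    for t in sorted(arrivals):
        # The time window closed before this event arrived: flush first.
        if size > 0 and t - batch_start > period:
            flushes.append((batch_start + period, size))
            size = 0
        if size == 0:
            batch_start = t
        size += 1
        # Count threshold reached: flush immediately.
        if size == count:
            flushes.append((t, size))
            size = 0

    # Any leftover events are flushed when their time window closes.
    if size > 0:
        flushes.append((batch_start + period, size))
    return flushes
```

With the timeline above (count threshold of 10, 2-second window), `simulate_batches([0.0, 0.5, 1.0, 1.5, 2.0], count=10, period=2.0)` yields a single flush of 5 events at the 2-second mark.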

The collector automatically routes each batch to the appropriate endpoint based on its size:

* **1 event**: `POST /events`
* **Multiple events**: `POST /events/bulk`
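In Python terms, the routing decision amounts to the following. The `endpoint_for` helper is hypothetical and only mirrors the rule stated above; it is not part of the collector's API.

```python
from typing import Any, Dict, List


def endpoint_for(batch: List[Dict[str, Any]]) -> str:
    """Pick the Flexprice endpoint path for a batch of events."""
    if not batch:
        raise ValueError("cannot route an empty batch")
    # A single event goes to the single-event endpoint,
    # anything larger goes to the bulk endpoint.
    return "/events" if len(batch) == 1 else "/events/bulk"
```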

## Data Transformation

Use Bloblang to transform your data into the format Flexprice requires:

### Type Conversion

All property values must be strings. The mapping below converts every value in `properties` to its string form:

```yaml
pipeline:
  processors:
    - mapping: |
        root.properties = this.properties.map_each(p -> p.value.string())
```

**Before:** `{"duration": 245, "status": 200}`\
**After:** `{"duration": "245", "status": "200"}`
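
To check a mapping's output outside the collector, for example in a unit test, the same conversion can be mirrored in Python. The helper names are hypothetical, and `json.dumps` is used on the assumption that numbers and booleans should keep their JSON spelling (`245`, `true`); the result may differ from Bloblang's `.string()` for nested values.

```python
import json
from typing import Any, Dict, List


def stringify_properties(properties: Dict[str, Any]) -> Dict[str, str]:
    """Convert every property value to a string, leaving existing strings untouched."""
    return {
        key: value if isinstance(value, str) else json.dumps(value)
        for key, value in properties.items()
    }


def non_string_keys(properties: Dict[str, Any]) -> List[str]:
    """Return the keys whose values are not strings (an empty list means valid)."""
    return [key for key, value in properties.items() if not isinstance(value, str)]
```

Running `non_string_keys` on a transformed event is a quick way to confirm that nothing slipped through unconverted.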

### Conditional Processing

```yaml
pipeline:
  processors:
    - mapping: |
        # Only process successful requests
        root = if this.status >= 200 && this.status < 300 {
          this
        } else {
          deleted()
        }
```

## Performance Optimization

### Kafka Input

```yaml
input:
  kafka:
    fetch_buffer_cap: 256        # Max unprocessed messages fetched at a time
    checkpoint_limit: 1024       # Max in-flight messages per topic partition
    commit_period: 1s            # How often partition offsets are committed
```

### Output Concurrency

```yaml
output:
  flexprice:
    max_in_flight: 10  # Parallel API requests
    batching:
      count: 50        # Flush once 50 events are buffered
      period: 5s       # ...or after 5s, whichever comes first
```

**Guidelines:**

* Low volume: `max_in_flight: 5`
* Medium volume: `max_in_flight: 10` (default)
* High volume: `max_in_flight` between `20` and `50`
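
A rough way to choose between these settings is to estimate the throughput ceiling they allow. The sketch below is a back-of-the-envelope model, not a measurement: `max_events_per_second` is a hypothetical helper, it assumes every request carries a full batch, and the 200 ms request latency in the usage note is a made-up figure that you should replace with your own observed value.

```python
def max_events_per_second(
    max_in_flight: int, batch_count: int, request_latency_ms: float
) -> float:
    """Upper bound on throughput when every request carries a full batch."""
    if request_latency_ms <= 0:
        raise ValueError("request_latency_ms must be positive")
    # Each of the `max_in_flight` slots completes one request per latency interval.
    requests_per_second = max_in_flight * 1000 / request_latency_ms
    return requests_per_second * batch_count
```

For instance, `max_events_per_second(10, 50, 200)` gives 2500.0, so under those assumptions 10 parallel requests of 50 events each could not deliver more than about 2,500 events per second.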

## Error Handling

The collector automatically retries failed requests with exponential backoff:

```yaml
output:
  flexprice:
    retry_max_attempts: 3
    retry_backoff:
      initial_interval: 1s
      max_interval: 60s
```

**Retry sequence** (with the settings above):

1. Fail → Wait 1s → Retry
2. Fail → Wait 2s → Retry
3. Fail → Wait 4s → Retry
4. Fail → Drop the request and log the error
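
The wait times in this sequence follow a doubling schedule capped at `max_interval`. A minimal sketch of that calculation, assuming a multiplier of 2 (inferred from the 1s, 2s, 4s sequence rather than from a documented setting) and using the hypothetical helper name `backoff_schedule`:

```python
from typing import List


def backoff_schedule(
    max_attempts: int,
    initial_interval: float,
    max_interval: float,
    multiplier: float = 2.0,  # assumption: the wait doubles after each failure
) -> List[float]:
    """Return the wait (in seconds) before each retry attempt."""
    waits: List[float] = []
    wait = initial_interval
    for _ in range(max_attempts):
        # Never wait longer than the configured ceiling.
        waits.append(min(wait, max_interval))
        wait *= multiplier
    return waits
```

With the settings above, `backoff_schedule(3, 1.0, 60.0)` returns `[1.0, 2.0, 4.0]`. The 60-second cap only matters for larger attempt counts: with 8 attempts, the last two waits are both 60 seconds.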

## Observability

### Metrics

Key Prometheus metrics:

| Metric                 | Description                |
| ---------------------- | -------------------------- |
| `bento_input_received` | Messages received          |
| `bento_output_sent`    | Messages sent successfully |
| `bento_output_error`   | Output errors              |
| `bento_batch_created`  | Batches created            |

### Logging

Configure log level and format:

```yaml
logger:
  level: INFO           # DEBUG, INFO, WARN, ERROR
  format: logfmt        # or json
  add_timestamp: true
```

## Additional Resources

* **Bento Docs**: [warpstreamlabs.github.io/bento](https://warpstreamlabs.github.io/bento/docs/about)
* **Bloblang Guide**: [Mapping language](https://warpstreamlabs.github.io/bento/docs/guides/bloblang/about)
* **Flexprice Events API**: [Events reference](/docs/event-ingestion/overview)

Need help? **[support@flexprice.io](mailto:support@flexprice.io)**
