> ## Documentation Index
> Fetch the complete documentation index at: https://docs.flexprice.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Flexprice Collector

> Stream usage events from Kafka and other sources into Flexprice with reliable delivery

The Flexprice Collector makes it easy to send usage data into Flexprice. It takes care of reading, transforming, and delivering events so you don’t have to worry about scaling or building custom pipelines.

Just plug in your data source, and the collector handles the rest.

## How It Works

The Flexprice Collector is built on [Bento](https://warpstreamlabs.github.io/bento/docs/about), a robust stream processing tool. Bento's strong delivery guarantees and retry capabilities make it ideal for reliable event ingestion.

The collector connects your data sources (Kafka, webhooks, databases) to Flexprice through a processing pipeline with three components:

* **Inputs** read data from various sources (Kafka, webhooks, databases, files)
* **Processors** validate, transform, and filter data to match Flexprice format
* **Outputs** send data to Flexprice API with bulk batching and retry logic

## Key Features

The Flexprice Collector is optimized for high-performance usage metering:

* **Automatic Batching**: Groups events for bulk ingestion (10 events or 2s by default)
* **Smart Routing**: Uses `/events` for single events, `/events/bulk` for batches
* **Retry Logic**: Exponential backoff ensures no data loss on network failures
* **High Throughput**: Handles millions of events per day with minimal latency

## Ingesting Data into Flexprice

The collector always outputs to the Flexprice API:

```yaml theme={null}
output:
  flexprice:
    api_host: https://api.flexprice.io
    api_key: ${FLEXPRICE_API_KEY}
    scheme: https
    batching:
      count: 10
      period: 2s
```

Transform your data to match Flexprice event format:

```yaml theme={null}
pipeline:
  processors:
    - mapping: |
        root.event_name = this.event_name
        root.external_customer_id = this.customer_id
        root.properties = this.properties.map_each(p -> p.value.string())
        root.timestamp = this.timestamp.or(now().format_timestamp("2006-01-02T15:04:05Z07:00"))
```

Configure your input source. Example with Kafka:

```yaml theme={null}
input:
  kafka:
    addresses: [${KAFKA_BROKERS}]
    topics: [usage-events]
    consumer_group: flexprice-collector
```

The collector supports 100+ input sources. Learn more in the [Bento documentation](https://warpstreamlabs.github.io/bento/docs/components/inputs/about).

## Installation

The Flexprice Collector is available via:

* **Binaries** from GitHub Releases
* **Container images**
* **Kubernetes** deployment (see Get Started guide)

## Performance & Reliability

The collector provides:

* **Bulk Ingestion**: Batches up to 100 events per API call
* **Automatic Retries**: Failed events retry with exponential backoff
* **At-Least-Once Delivery**: Kafka consumer groups ensure no data loss
* **Observability**: Prometheus metrics and structured logging

Contact [support@flexprice.io](mailto:support@flexprice.io) for performance tuning assistance.
