Documentation Index
Fetch the complete documentation index at: https://docs.flexprice.io/llms.txt
Use this file to discover all available pages before exploring further.
Overview
Flexprice is a comprehensive usage-based billing and subscription management platform built in Go. The system handles the complete billing lifecycle from event ingestion through invoice generation and payment processing, designed for B2B SaaS companies requiring flexible pricing models.
High-Level Architecture
Flexprice implements a three-tier layered architecture with clear separation of concerns, bootstrapped using Uber FX dependency injection framework:
Architecture Layers
Technology Stack
Core Technologies
Flexprice uses a modern Go stack with specialized components:
Key Dependencies:
- Gin - HTTP routing and middleware
- Ent - Type-safe ORM for PostgreSQL
- Uber FX - Dependency injection framework
- ClickHouse - Columnar database for analytics
- Kafka (Sarama) - Event streaming platform
- Temporal - Workflow orchestration engine
- Stripe - Payment processing integration
Dual-Database Strategy
Flexprice employs a specialized database strategy for different workloads:
PostgreSQL (Transactional Data)
- Purpose: ACID transactions, relational data
- Data: Subscriptions, invoices, customers, plans, prices, wallets
- Access Pattern: Read-write, complex joins, referential integrity
- ORM: Ent for type-safe queries
ClickHouse (Analytical Data)
- Purpose: High-volume inserts, aggregation queries
- Data: Events (raw), events_lazy, processed_events, system_events
- Access Pattern: Append-only writes, time-series queries
- Tables: Optimized for usage metering and analytics
Event Processing Pipeline
The core of Flexprice’s usage-based billing is an asynchronous event processing pipeline powered by Kafka:
Pipeline Stages
Deployment Modes
Flexprice supports multiple deployment configurations to separate concerns and scale independently:
Available Modes
| Mode | Components | Purpose |
|---|
local | API Server + Router (with event consumption handlers) + Temporal Worker | Full development environment with all services enabled |
api | API Server + Router (event consumption disabled) | Handle HTTP requests with routing but no event processing |
consumer | Router only (with event consumption handlers) | Dedicated event processing without HTTP API |
temporal_worker | Temporal workers only | Background workflows and scheduled tasks execution |
Mode Implementation Details
local: Starts API server, registers all router handlers with event consumption enabled (true), starts router, and starts Temporal worker
api: Starts API server, registers router handlers with event consumption disabled (false), starts router only
consumer: Requires Kafka consumer, registers all router handlers with event consumption enabled (true), starts router without API server
temporal_worker: Starts only Temporal worker service, no API server or router
Service Layer Architecture
The service layer implements business logic through 30+ domain services organized by domain:
Core Services
- Billing Service: Handles invoice generation, usage calculations, and billing cycles
- Event Service: Processes incoming usage events and manages event lifecycle
- Subscription Service: Manages customer subscriptions, upgrades, and downgrades
- Customer Service: Customer management and tenant operations
- Payment Service: Payment processing and gateway integrations
- Plan Service: Pricing plan management and feature configuration
- Wallet Service: Prepaid credits and wallet balance management
Service Factory Pattern
All services are instantiated through factory functions that receive a centralized ServiceParams struct containing shared dependencies:
type ServiceParams struct {
// Individual Repository Interfaces
AuthRepo auth.Repository
UserRepo user.Repository
EventRepo events.Repository
ProcessedEventRepo events.ProcessedEventRepository
// ... 40+ more domain specific repositories
// Publishers
EventPublisher publisher.EventPublisher
WebhookPublisher webhookPublisher.WebhookPublisher
// Services
Logger *logger.Logger
Config *config.Configuration
DB postgres.IClient
PDFGenerator pdf.Generator
S3 s3.Service
// http client
Client httpclient.Client
// Proration
ProrationCalculator proration.Calculator
// Integration Factory
IntegrationFactory *integration.Factory
}
API Layer & Routing
The API layer uses Gin framework with a comprehensive middleware stack for security, logging, and request processing:
Middleware Stack
- Recovery Middleware: Panic recovery and graceful error handling
- CORS Middleware: Cross-origin resource sharing configuration
- Rate Limiting: Request throttling per API key/tenant
- Authentication: JWT and API key validation
- Tenant Isolation: Multi-tenant context enforcement
- Request Logging: Structured logging for all requests
- Metrics Collection: Prometheus metrics for observability
Authentication & Authorization
Flexprice supports multiple authentication methods:
- API Keys: Tenant-scoped keys for programmatic access
- JWT Tokens: User session tokens with role-based permissions
- Webhook Signatures: HMAC verification for incoming webhooks
API Endpoints Structure
/v1
├── /auth # Authentication endpoints
├── /customers # Customer management
├── /plans # Pricing plans and features
├── /subscriptions # Subscription lifecycle
├── /events # Event ingestion
├── /invoices # Invoice generation and retrieval
├── /usage # Usage queries and aggregations
├── /payments # Payment processing
└── /webhooks # Webhook management
Multi-Tenancy Architecture
Flexprice implements row-level multi-tenancy with environment isolation enforced at multiple architectural layers:
Tenant Isolation Strategy
- Database Level: All tables include
tenant_id and environment columns
- Middleware Level: Request context includes tenant/environment validation
- Service Level: All business logic operations are tenant-scoped
- Repository Level: Automatic filtering on all database queries
Middleware Enforcement
Tenant and environment isolation is enforced through the AuthenticateMiddleware, which handles both authentication and tenant context extraction:
func AuthenticateMiddleware(cfg *config.Configuration, secretService service.SecretService, logger *logger.Logger) gin.HandlerFunc {
return func(c *gin.Context) {
// Check for API key in X-API-Key header
// Check for JWT token in Authorization header
c.Next()
}
}
Authentication Methods
API keys are configured with tenant and user association:
- Tenant API Keys: Scoped to specific tenant and environment
- User Tokens: Associated with user permissions within tenant
- System Keys: Cross-tenant access for administrative operations
Repository Layer
The repository layer provides data access abstraction with automatic tenant/environment filtering:
Repository Interface
type Repository interface {
// All methods automatically filter by tenant/environment from context
GetCustomer(ctx context.Context, customerID string) (*Customer, error)
CreateInvoice(ctx context.Context, invoice *Invoice) error
GetUsageEvents(ctx context.Context, filters EventFilters) ([]Event, error)
}
Automatic Tenant Filtering
All repositories automatically append tenant and environment filters to queries using typed context constants:
func (r *PostgresRepository) GetCustomer(ctx context.Context, customerID string) (*Customer, error) {
tenantID := ctx.Value(types.CtxTenantID).(string)
environment := ctx.Value(types.CtxEnvironmentID).(string)
return r.db.Customer.
Query().
Where(customer.ID(customerID)).
Where(customer.TenantID(tenantID)).
Where(customer.Environment(environment)).
Only(ctx)
}
All repositories are created through factory functions that receive shared dependencies like database connections, caching layers, and configuration.
Message Processing with PubSub Router
Flexprice uses Watermill for message routing with built-in retry mechanisms and rate limiting:
Kafka Integration
router := message.NewRouter(message.RouterConfig{}, logger)
// Event processing pipeline
router.AddHandler(
"events_processor",
"events",
subscriber,
"events_post_processing",
publisher,
eventProcessingHandler,
)
Message Processing Features
- Dead Letter Queues: Failed messages are routed to error topics
- Retry Logic: Exponential backoff for transient failures
- Rate Limiting: Configurable processing rates per topic
- Metrics: Built-in monitoring for message processing
Temporal Workflow Orchestration
Background jobs and scheduled tasks use Temporal for reliable execution and workflow management:
Workflow Types
- Billing Workflows: Monthly/annual invoice generation
- Usage Aggregation: Periodic usage calculations
- Payment Processing: Async payment collection workflows
- Data Exports: Large dataset exports to S3
- Webhook Delivery: Reliable event notifications
Workflow Registration
func RegisterWorkflows(worker worker.Worker) {
// Billing workflows
worker.RegisterWorkflow(billing.GenerateInvoiceWorkflow)
worker.RegisterWorkflow(billing.ProcessPaymentWorkflow)
// Usage workflows
worker.RegisterWorkflow(usage.AggregateUsageWorkflow)
// Export workflows
worker.RegisterWorkflow(export.ExportDataWorkflow)
}
Worker Lifecycle
Workers are deployed separately and can be scaled independently:
func StartTemporalWorker(config *config.Config) {
client := temporal.NewClient()
worker := worker.New(client, "billing-queue", worker.Options{})
RegisterWorkflows(worker)
RegisterActivities(worker)
worker.Run(worker.InterruptCh())
}
External Integrations
Flexprice uses a factory pattern for external service integrations, supporting multiple providers through specific integration implementations:
// Integration Factory creates specific integration instances
type Factory struct {
config *config.Configuration
logger *logger.Logger
}
// Specific integration getters with proper error handling
func (f *Factory) GetStripeIntegration(ctx context.Context) (*StripeIntegration, error) {
// Initialize and return Stripe integration with configuration
return NewStripeIntegration(f.config, f.logger), nil
}
func (f *Factory) GetHubSpotIntegration(ctx context.Context) (*HubSpotIntegration, error) {
// Initialize and return HubSpot integration with configuration
return NewHubSpotIntegration(f.config, f.logger), nil
}
func (f *Factory) GetRazorpayIntegration(ctx context.Context) (*RazorpayIntegration, error) {
// Initialize and return Razorpay integration with configuration
return NewRazorpayIntegration(f.config, f.logger), nil
}
Webhook Delivery
Flexprice has a flexible webhook delivery system with optional Svix integration. By default, it uses an internal webhook system with in-memory or Kafka pub/sub:
type WebhookService struct {
config *config.Configuration
publisher publisher.WebhookPublisher
handler handler.Handler
factory payload.PayloadBuilderFactory
client httpclient.Client
logger *logger.Logger
}
func (w *WebhookService) DeliverEvent(event *Event, endpoints []WebhookEndpoint) error {
// Route based on configuration
if w.config.Webhook.SvixConfig.Enabled {
w.logger.Info("Using Svix for webhook delivery")
return w.deliverViaSvix(event)
}
w.logger.Info("Using internal webhook system")
return w.deliverViaInternalSystem(event, endpoints)
}
// Configuration controls webhook delivery method
// svix_config:
// enabled: false # Default - uses internal system
// enabled: true # Optional - uses Svix for delivery
Infrastructure Services
Monitoring & Observability
Sentry for error tracking and performance monitoring:
sentry.Init(sentry.ClientOptions{
Dsn: config.SentryDSN,
Environment: config.Environment,
TracesSampleRate: 0.1,
})
// Error tracking in handlers
if err != nil {
sentry.CaptureException(err)
return gin.Error{Err: err, Type: gin.ErrorTypePublic}
}
Pyroscope for continuous profiling and performance analysis:
pyroscope.Start(pyroscope.Config{
ApplicationName: "flexprice-api",
ServerAddress: config.PyroscopeURL,
ProfileTypes: []pyroscope.ProfileType{
pyroscope.ProfileCPU,
pyroscope.ProfileAllocObjects,
pyroscope.ProfileAllocSpace,
},
})
Document Storage
Invoice PDFs are stored in S3 with presigned URL generation for secure access:
func (s *S3Service) StoreInvoice(invoiceID string, pdfData []byte) (string, error) {
key := fmt.Sprintf("invoices/%s/%s.pdf", s.tenantID, invoiceID)
_, err := s.client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
Body: bytes.NewReader(pdfData),
})
// Return presigned URL for download
return s.GeneratePresignedURL(key, 24*time.Hour)
}
Caching Layer
Flexprice uses in-memory caching only for performance optimization. Caching can be enabled/disabled via configuration:
// Cache interface in internal/cache/
type Cache interface {
// Get retrieves a value from the cache
// Returns the value and a boolean indicating whether the key was found
Get(ctx context.Context, key string) (interface{}, bool)
// Set adds a value to the cache with the specified expiration
// If expiration is 0, the item never expires (but may be evicted)
Set(ctx context.Context, key string, value interface{}, expiration time.Duration)
// Delete removes a key from the cache
Delete(ctx context.Context, key string)
// DeleteByPrefix removes all keys with the given prefix
DeleteByPrefix(ctx context.Context, prefix string)
// Flush removes all items from the cache
Flush(ctx context.Context)
}
// Usage example
func (s *PlanService) GetPlan(ctx context.Context, planID string) (*Plan, error) {
// Check if caching is enabled
if !s.config.Cache.Enabled {
return s.repo.GetPlan(ctx, planID)
}
// Try cache first
if cached, found := s.cache.Get(ctx, planID); found {
return cached.(*Plan), nil
}
// Cache miss - fetch from database
plan, err := s.repo.GetPlan(ctx, planID)
if err == nil {
s.cache.Set(ctx, planID, plan, time.Hour)
}
return plan, err
}
// Configuration
// cache:
// enabled: true/false # Simple boolean to enable/disable caching
Configuration Management
Configuration is loaded from YAML files with environment variable overrides for deployment flexibility:
# config.yaml
server:
port: 8080
host: "0.0.0.0"
database:
postgres:
host: ${DB_HOST:localhost}
port: ${DB_PORT:5432}
database: ${DB_NAME:flexprice}
clickhouse:
host: ${CLICKHOUSE_HOST:localhost}
port: ${CLICKHOUSE_PORT:9000}
kafka:
brokers: ${KAFKA_BROKERS:localhost:29092}
topics:
events: "events"
events_lazy: "events_lazy"
processed_events: "events_post_processing"
system_events: "system_events"
temporal:
host: ${TEMPORAL_HOST:localhost:7233}
namespace: ${TEMPORAL_NAMESPACE:default}
Configuration Structure
type Configuration struct {
Deployment DeploymentConfig
Server ServerConfig
Auth AuthConfig
Kafka KafkaConfig
ClickHouse ClickHouseConfig
Logging LoggingConfig
Postgres PostgresConfig
Sentry SentryConfig
Pyroscope PyroscopeConfig
Event EventConfig
DynamoDB DynamoDBConfig
Temporal TemporalConfig
Webhook Webhook
Secrets SecretsConfig
Billing BillingConfig
S3 S3Config
Cache CacheConfig
EventProcessing EventProcessingConfig
EventProcessingLazy EventProcessingLazyConfig
EventPostProcessing EventPostProcessingConfig
FeatureUsageTracking FeatureUsageTrackingConfig
FeatureUsageTrackingLazy FeatureUsageTrackingLazyConfig
EnvAccess EnvAccessConfig
FeatureFlag FeatureFlagConfig
Email EmailConfig
}
Application Lifecycle
The application lifecycle is managed by Uber FX with hooks for startup and shutdown:
func main() {
fx.New(
// Configuration
fx.Provide(config.Load),
fx.Provide(logger.New),
// Infrastructure
fx.Provide(database.NewPostgres),
fx.Provide(database.NewClickHouse),
fx.Provide(kafka.NewProducer),
fx.Provide(temporal.NewClient),
// Services
fx.Provide(service.NewBillingService),
fx.Provide(service.NewEventService),
// HTTP Server
fx.Provide(server.New),
fx.Invoke(server.RegisterRoutes),
// Lifecycle hooks
fx.Invoke(func(lc fx.Lifecycle, server *server.Server) {
lc.Append(fx.Hook{
OnStart: server.Start,
OnStop: server.Stop,
})
}),
).Run()
}
Notes
Key Architectural Decisions:
- Dual-Database Strategy: PostgreSQL for transactional consistency, ClickHouse for high-volume analytics
- Event-Driven Architecture: Kafka enables asynchronous processing, scalability, and fault tolerance
- Microservices-Ready: Deployment modes allow splitting components into separate services
- Strong Multi-Tenancy: Enforced at middleware, service, and repository layers
- Workflow Orchestration: Temporal provides reliable background job execution
- Open Integration: Factory pattern allows multiple payment gateway implementations
The architecture is designed for high scalability, reliability, and flexibility to support diverse pricing models while maintaining clean separation of concerns across all layers.