- Introduction
- Core Framework Architecture
- ERP Integration System
- Workflow Engine Integration
- Use Cases
- Deployment Guidelines
- Appendix: Full Code
The CPU-Optimized LLM Framework is a comprehensive suite designed to run large language models efficiently on standard CPU hardware without requiring specialized GPU infrastructure. This framework enables organizations to leverage the power of LLMs for business intelligence, process automation, and decision support using their existing computational resources.
- CPU Optimization: Specialized techniques to run LLMs efficiently on CPU hardware
- ERP Integration: Direct connectors to extract and analyze data from popular ERP systems
- Workflow Integration: Seamless integration with workflow engines like N8N and Camunda
- Memory Efficiency: Advanced techniques to minimize memory usage
- Scalable Architecture: Design patterns for horizontal and vertical scaling
- Financial Analysis: Analyzing financial data from ERP systems, generating insights and forecasts
- Intelligent Document Processing: Extracting information from business documents
- Workflow Decision Points: Using LLMs to make or assist with complex decisions in business processes
- Customer Support Automation: Analyzing and routing customer communications
- Report Generation: Creating natural language reports from structured business data
The framework consists of several core components that work together to deliver optimized LLM capabilities:
OptimizedCPULLM is the central class that implements the optimized language model:
class OptimizedCPULLM:
"""
CPU-optimized implementation of a language model with multiple optimizations
for inference on CPU hardware.
"""
def __init__(self, config=None):
"""Initialize the CPU-optimized LLM model."""
# Default configuration
self.default_config = {
"vocab_size": 32000,
"hidden_size": 768,
"n_layers": 8,
"n_heads": 12,
"head_size": 64,
"intermediate_size": 2048,
"max_seq_length": 1024,
# Optimization settings
"quantize": True,
"quantize_type": "int8",
"per_channel_quantization": True,
"optimize_zero_points": True,
"sparse_attention": True,
"sparse_attention_pattern": "block_local_global",
"local_attention_window": 128,
"global_attention_stride": 32,
"use_memory_mapping": True,
"use_numba": True,
"use_onnx": False,
"use_kv_cache": True,
"use_threading": True,
"use_numa_aware": True,
"num_threads": None, # Auto-detect
"block_size": 64
}
# Update with user config if provided
if config:
self.config = {**self.default_config, **config}
else:
self.config = self.default_config
# Initialize optimization components
self._initialize_components()
def _initialize_components(self):
"""Initialize optimization components and model structure."""
# Code to initialize quantizer, sparse attention patterns, etc.
# ...
def initialize_model(self, seed=42):
"""Initialize model weights."""
# Code to initialize the model weights
# ...
def forward(self, input_ids, attention_mask=None):
"""
Forward pass through the model with optimizations.
Args:
input_ids: Token IDs [batch_size, seq_length]
attention_mask: Attention mask [batch_size, seq_length]
Returns:
Logits for next token prediction
"""
# Optimized forward pass implementation
# ...
def generate(self, prompt=None, input_ids=None, max_new_tokens=50,
temperature=1.0, top_k=50, top_p=0.9):
"""
Generate text from a prompt or input IDs.
Args:
prompt: Text prompt (will be tokenized)
input_ids: Pre-tokenized input IDs
max_new_tokens: Maximum number of tokens to generate
temperature: Temperature for sampling
top_k: Number of top tokens to consider for sampling
top_p: Probability threshold for nucleus sampling
Returns:
Dictionary with generated text and metadata
"""
# Generate text from the model
# ...
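A minimal usage sketch (assuming the package exposes OptimizedCPULLM under `cpu_optimized_llm`, as in the use-case examples later in this document):

```python
from cpu_optimized_llm import OptimizedCPULLM

# Small configuration for a quick smoke test; unspecified keys fall back to the defaults above
model = OptimizedCPULLM(config={"n_layers": 4, "quantize": True, "use_kv_cache": True})
model.initialize_model(seed=42)

# generate() accepts either a text prompt or pre-tokenized input_ids
result = model.generate(prompt="Summarize Q1 revenue performance:",
                        max_new_tokens=64, temperature=0.7, top_p=0.9)
print(result["generated_text"])
```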
The quantization system enables efficient model representation with reduced precision:
class AdvancedQuantization:
"""Implements advanced quantization techniques for model compression."""
def __init__(self, quantize_type='int8', per_channel=True, optimize_zero_points=True):
"""
Initialize quantization parameters.
Args:
quantize_type: Type of quantization ('int8', 'int4', 'mixed')
per_channel: Whether to use per-channel quantization scales
optimize_zero_points: Whether to optimize zero points
"""
# Initialization code
# ...
def quantize_weight(self, weight, weight_name, layer_type='attention'):
"""
Quantize a weight matrix with advanced techniques.
Args:
weight: Weight tensor to quantize
weight_name: Name identifier for the weight
layer_type: Type of layer ('attention', 'ffn', 'embedding')
Returns:
Quantized weight tensor
"""
# Quantization implementation
# ...
def dequantize_weight(self, quantized_weight, weight_name):
"""
Dequantize a weight matrix.
Args:
quantized_weight: Quantized weight tensor
weight_name: Name identifier for the weight
Returns:
Dequantized weight tensor (floating point)
"""
# Dequantization implementation
# ...
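The bodies of quantize_weight and dequantize_weight are elided above; the following self-contained NumPy sketch illustrates the underlying idea of per-channel symmetric int8 quantization (an illustration, not the framework's exact implementation):

```python
import numpy as np

def quantize_int8_per_channel(weight: np.ndarray):
    """Symmetric int8 quantization with one scale per output channel (axis 0)."""
    # One scale per row so channels with small magnitudes keep their precision
    max_abs = np.max(np.abs(weight), axis=1, keepdims=True)
    scales = np.where(max_abs == 0, 1.0, max_abs / 127.0)
    q = np.clip(np.round(weight / scales), -127, 127).astype(np.int8)
    return q, scales

def dequantize_int8_per_channel(q: np.ndarray, scales: np.ndarray) -> np.ndarray:
    """Recover an approximate float32 weight from the int8 tensor and its scales."""
    return q.astype(np.float32) * scales

w = np.random.randn(768, 768).astype(np.float32)
q, s = quantize_int8_per_channel(w)
print("max reconstruction error:", np.max(np.abs(w - dequantize_int8_per_channel(q, s))))
```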
The sparse attention system optimizes the attention mechanism computations:
class SparseAttentionPattern:
"""Implements efficient sparse attention patterns to reduce computation."""
def __init__(self, max_seq_length, pattern_type='block_local_global'):
"""
Initialize sparse attention pattern.
Args:
max_seq_length: Maximum sequence length
pattern_type: Type of sparse pattern to use
- 'block_local_global': Local window + strided global attention
- 'local_window': Simple local window attention
- 'strided': Strided attention pattern
- 'full': Full attention (no sparsity)
"""
# Initialization code
# ...
def get_sparse_mask(self, seq_length, local_window=128, global_stride=32):
"""
Generate a sparse attention mask.
Args:
seq_length: Current sequence length
local_window: Size of local attention window
global_stride: Stride for global attention tokens
Returns:
Binary mask of shape (seq_length, seq_length) where 1 indicates attention
"""
# Mask generation implementation
# ...
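For intuition, here is a standalone NumPy sketch of the kind of mask get_sparse_mask returns for the 'block_local_global' pattern (local window plus strided global tokens; illustrative only):

```python
import numpy as np

def block_local_global_mask(seq_length: int, local_window: int = 128,
                            global_stride: int = 32) -> np.ndarray:
    """Binary (seq_length x seq_length) mask: 1 where attention is allowed."""
    mask = np.zeros((seq_length, seq_length), dtype=np.uint8)
    for i in range(seq_length):
        # Local window: each token attends to a band of neighbours
        lo = max(0, i - local_window // 2)
        hi = min(seq_length, i + local_window // 2 + 1)
        mask[i, lo:hi] = 1
    # Global tokens: every global_stride-th position attends everywhere and is attended by everyone
    global_idx = np.arange(0, seq_length, global_stride)
    mask[global_idx, :] = 1
    mask[:, global_idx] = 1
    return mask

m = block_local_global_mask(512)
print("attention density:", m.mean())  # fraction of positions actually computed
```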
The framework uses Numba to JIT-compile critical operations:
@njit(fastmath=True, parallel=True)
def _matmul_numba_parallel(A, B, C):
"""
Optimized matrix multiplication with parallelism and blocking.
Args:
A: First matrix (M x K)
B: Second matrix (K x N)
C: Output matrix (M x N)
"""
M, K = A.shape
N = B.shape[1]
# Choose a block size so each working block fits comfortably in L1 cache
block_size = 64  # 64 also matches the typical cache-line size in bytes
# Compute in blocks for better cache utilization
for i in prange(0, M, block_size):
i_end = min(i + block_size, M)
for j in range(0, N, block_size):
j_end = min(j + block_size, N)
# Initialize block result to zero
for ii in range(i, i_end):
for jj in range(j, j_end):
C[ii, jj] = 0.0
# Compute block result
for k in range(0, K, block_size):
k_end = min(k + block_size, K)
for ii in range(i, i_end):
for kk in range(k, k_end):
a_val = A[ii, kk]
for jj in range(j, j_end):
C[ii, jj] += a_val * B[kk, jj]
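The kernel writes into a preallocated output buffer. A short usage sketch, assuming njit and prange are imported from Numba:

```python
import numpy as np
from numba import njit, prange  # required for the decorator above

A = np.random.rand(512, 256).astype(np.float64)
B = np.random.rand(256, 384).astype(np.float64)
C = np.empty((512, 384), dtype=np.float64)

_matmul_numba_parallel(A, B, C)          # first call triggers JIT compilation
assert np.allclose(C, A @ B, atol=1e-8)  # verify against NumPy's BLAS-backed matmul
```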
Memory-efficient key-value caching for faster generation:
class OptimizedKVCache:
"""Memory-efficient key-value cache implementation using memory mapping."""
def __init__(self, max_batch_size=1, max_seq_length=2048, num_layers=12,
num_heads=12, head_dim=64, use_memory_mapping=True):
"""
Initialize KV cache.
Args:
max_batch_size: Maximum batch size
max_seq_length: Maximum sequence length
num_layers: Number of transformer layers
num_heads: Number of attention heads
head_dim: Dimension of each attention head
use_memory_mapping: Whether to use memory-mapped files for very large caches
"""
# Initialization code
# ...
def update(self, layer_idx, new_keys, new_values):
"""
Update KV cache with new keys and values.
Args:
layer_idx: Layer index
new_keys: New key states to add [batch_size, num_heads, seq_len, head_dim]
new_values: New value states to add [batch_size, num_heads, seq_len, head_dim]
"""
# Update implementation
# ...
def get(self, layer_idx, batch_size=None):
"""
Get cached keys and values for a layer.
Args:
layer_idx: Layer index
batch_size: Batch size to retrieve (defaults to all batches)
Returns:
Tuple of (keys, values) for the specified layer
"""
# Cache retrieval implementation
# ...
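The cache internals are elided; the sketch below shows one way a memory-mapped, single-layer cache could be laid out and appended to (a toy illustration under assumed shapes, not the framework's implementation):

```python
import numpy as np
import os
import tempfile

class MiniKVCache:
    """Toy sketch of a memory-mapped KV cache for one layer (illustration only)."""
    def __init__(self, max_seq_length=2048, num_heads=12, head_dim=64, use_memory_mapping=True):
        shape = (2, 1, num_heads, max_seq_length, head_dim)  # [K/V, batch, heads, seq, dim]
        if use_memory_mapping:
            path = os.path.join(tempfile.gettempdir(), "kv_cache.dat")
            self.buf = np.memmap(path, dtype=np.float32, mode="w+", shape=shape)
        else:
            self.buf = np.zeros(shape, dtype=np.float32)
        self.cur_len = 0

    def update(self, new_keys, new_values):
        """Append keys/values of shape [1, num_heads, t, head_dim] at the current position."""
        t = new_keys.shape[2]
        self.buf[0, :, :, self.cur_len:self.cur_len + t, :] = new_keys
        self.buf[1, :, :, self.cur_len:self.cur_len + t, :] = new_values
        self.cur_len += t

    def get(self):
        """Return the filled portion of the cache as (keys, values)."""
        return (self.buf[0, :, :, :self.cur_len, :],
                self.buf[1, :, :, :self.cur_len, :])
```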
The framework employs several key optimization techniques:
- SIMD Acceleration:
  - Numba JIT compilation for critical paths
  - Vectorized operations for mathematical functions
  - Memory-aligned arrays for better CPU cache utilization
- Enhanced Sparse Attention:
  - Combined local window + strided global attention pattern
  - Sparse matrix multiplications
  - Attention pattern caching
- Advanced Quantization:
  - Channel-wise quantization scales
  - Zero-point optimization
  - Mixed-precision support
- Memory Optimization:
  - Memory-mapped key-value caching
  - Block-based processing
  - NUMA-aware allocations
- Parallelism:
  - Multi-threading for matrix operations
  - Cache-line aware blocking
  - Parallel token processing where possible
The framework can be installed via pip:
# Create and activate a virtual environment (recommended)
python -m venv cpu-llm-env
source cpu-llm-env/bin/activate # On Windows: cpu-llm-env\Scripts\activate
# Install the framework with all dependencies
pip install cpu-optimized-llm
Or manually from source:
# Clone the repository
git clone https://github.com/yourusername/cpu-optimized-llm.git
cd cpu-optimized-llm
# Create and activate a virtual environment (recommended)
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
# Install the package in development mode
pip install -e .
System requirements:
- Python 3.8+
- NumPy 1.20+
- Numba 0.53+
- SentencePiece 0.1.96+ (for tokenization)
- ONNX Runtime 1.8+ (optional, for model export/import)
The ERP Integration System is built on top of the core framework to connect directly with Enterprise Resource Planning systems for data extraction and analysis.
The framework includes a flexible database connector system that supports multiple database types:
class ERPDatabaseConnector:
"""Database connector for various ERP systems and databases."""
def __init__(self, config_path=None):
"""
Initialize the database connector.
Args:
config_path: Path to a config file or .env file with connection details
"""
# Load configuration if provided
if config_path and os.path.exists(config_path):
load_dotenv(config_path)
self.connections = {}
self.engines = {}
def connect_to_erp(self, connection_name, erp_type, connection_params):
"""Connect to various ERP systems."""
if erp_type == "sap":
return self.connect_to_mssql(connection_name, **connection_params)
elif erp_type == "oracle_ebs":
return self.connect_to_oracle(connection_name, **connection_params)
elif erp_type == "dynamics":
return self.connect_to_mssql(connection_name, **connection_params)
elif erp_type == "odoo":
return self.connect_to_postgresql(connection_name, **connection_params)
elif erp_type == "axelor":
return self.connect_to_postgresql(connection_name, **connection_params)
elif erp_type == "nexterp" or erp_type == "erpnext":
return self.connect_to_mysql(connection_name, **connection_params)
else:
print(f"Unsupported ERP type: {erp_type}")
return False
def connect_to_mssql(self, connection_name, server, database, username=None, password=None, use_windows_auth=False):
"""Connect to Microsoft SQL Server (common for Dynamics, SAP)."""
# Implementation
# ...
def connect_to_oracle(self, connection_name, host, port, service_name, username, password):
"""Connect to Oracle Database (common for Oracle ERP, PeopleSoft)."""
# Implementation
# ...
def connect_to_mysql(self, connection_name, host, database, username, password, port=3306):
"""Connect to MySQL (common for many open-source ERP systems)."""
# Implementation
# ...
def connect_to_postgresql(self, connection_name, host, database, username, password, port=5432):
"""Connect to PostgreSQL (used by Odoo, ERPNext)."""
# Implementation
# ...
def execute_query(self, connection_name, query, params=None):
"""Execute a raw SQL query and return results as a DataFrame."""
# Implementation
# ...
def execute_procedure(self, connection_name, procedure_name, params=None):
"""Execute a stored procedure and return results."""
# Implementation
# ...
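The connection and query methods are elided above. A hedged sketch of how connect_to_postgresql and execute_query might be implemented with SQLAlchemy and pandas (the driver name and connection-string details are assumptions):

```python
import pandas as pd
from sqlalchemy import create_engine, text

def connect_to_postgresql(self, connection_name, host, database, username, password, port=5432):
    """Create and cache a SQLAlchemy engine for a PostgreSQL-backed ERP (e.g. Odoo)."""
    try:
        url = f"postgresql+psycopg2://{username}:{password}@{host}:{port}/{database}"
        self.engines[connection_name] = create_engine(url, pool_pre_ping=True)
        return True
    except Exception as exc:
        print(f"Failed to connect '{connection_name}': {exc}")
        return False

def execute_query(self, connection_name, query, params=None):
    """Run a SQL query against a cached engine and return a pandas DataFrame."""
    engine = self.engines.get(connection_name)
    if engine is None:
        print(f"Unknown connection: {connection_name}")
        return None
    with engine.connect() as conn:
        return pd.read_sql(text(query), conn, params=params)
```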
The framework includes specialized data extractors for different ERP systems:
class ERPDataExtractor:
"""Extract and transform data from ERP systems for financial analysis."""
def __init__(self, db_connector):
"""
Initialize the ERP data extractor.
Args:
db_connector: An instance of ERPDatabaseConnector
"""
self.db_connector = db_connector
self.erp_mappings = {
'sap': {
'gl_accounts': "SELECT * FROM BSEG JOIN BKPF ON BSEG.BELNR = BKPF.BELNR WHERE BKPF.GJAHR = ?",
'accounts_receivable': "SELECT * FROM BSID WHERE GJAHR = ?",
'accounts_payable': "SELECT * FROM BSIK WHERE GJAHR = ?",
'cost_centers': "SELECT * FROM CSKS",
'profit_centers': "SELECT * FROM CEPC",
},
'oracle_ebs': {
'gl_accounts': "SELECT * FROM GL_BALANCES WHERE PERIOD_NAME LIKE ?",
'accounts_receivable': "SELECT * FROM AR_PAYMENT_SCHEDULES_ALL WHERE TRUNC(GL_DATE) BETWEEN ? AND ?",
'accounts_payable': "SELECT * FROM AP_INVOICE_DISTRIBUTIONS_ALL WHERE TRUNC(GL_DATE) BETWEEN ? AND ?",
'cost_centers': "SELECT * FROM HR_ORGANIZATION_UNITS",
},
# Mappings for other ERP systems...
}
def get_gl_data(self, connection_name, erp_type, fiscal_year=None, start_date=None, end_date=None):
"""
Extract general ledger data from ERP system.
Args:
connection_name: Connection name in the database connector
erp_type: ERP system type (e.g., 'sap', 'oracle_ebs', 'dynamics')
fiscal_year: Fiscal year for the data
start_date: Start date for data extraction
end_date: End date for data extraction
Returns:
Pandas DataFrame with GL data
"""
# Implementation
# ...
def get_accounts_receivable(self, connection_name, erp_type, fiscal_year=None, start_date=None, end_date=None):
"""Extract accounts receivable data from ERP system."""
# Implementation
# ...
def create_financial_dataset(self, connection_name, erp_type, fiscal_year=None,
start_date=None, end_date=None, include_dimensions=True):
"""
Create a comprehensive financial dataset from ERP data.
Args:
connection_name: Connection name in the database connector
erp_type: ERP system type
fiscal_year: Fiscal year for the data
start_date: Start date for data extraction
end_date: End date for data extraction
include_dimensions: Whether to include dimensional data (cost centers, etc.)
Returns:
Dictionary of DataFrames with financial data
"""
# Implementation
# ...
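A hedged sketch of how get_gl_data could tie the query mappings above to the database connector; real parameter handling differs per ERP and is simplified here:

```python
def get_gl_data(self, connection_name, erp_type, fiscal_year=None, start_date=None, end_date=None):
    """Look up the GL query mapped for this ERP type and execute it."""
    mapping = self.erp_mappings.get(erp_type, {})
    query = mapping.get('gl_accounts')
    if query is None:
        print(f"No GL query mapping for ERP type: {erp_type}")
        return None
    # SAP-style queries take a fiscal year; date-range queries take start/end dates
    params = (fiscal_year,) if fiscal_year is not None else (start_date, end_date)
    return self.db_connector.execute_query(connection_name, query, params=params)
```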
The FinancialDataTransformer class processes raw ERP data into standard financial statements:
class FinancialDataTransformer:
"""Transform and aggregate ERP financial data for analysis."""
def __init__(self):
"""Initialize the financial data transformer."""
pass
def create_income_statement(self, gl_data, account_mapping=None, period_column='POSTING_DATE',
amount_column='AMOUNT', account_column='ACCOUNT'):
"""
Create an income statement from general ledger data.
Args:
gl_data: DataFrame with general ledger data
account_mapping: Dictionary mapping account numbers to categories
period_column: Column name for the posting period
amount_column: Column name for the transaction amount
account_column: Column name for the account number
Returns:
DataFrame with income statement data
"""
# Implementation
# ...
def create_balance_sheet(self, gl_data, account_mapping=None, date_column='POSTING_DATE',
amount_column='AMOUNT', account_column='ACCOUNT'):
"""
Create a balance sheet from general ledger data.
Args:
gl_data: DataFrame with general ledger data
account_mapping: Dictionary mapping account numbers to categories
date_column: Column name for the posting date
amount_column: Column name for the transaction amount
account_column: Column name for the account number
Returns:
DataFrame with balance sheet data
"""
# Implementation
# ...
def create_cash_flow_statement(self, gl_data, income_statement, balance_sheet,
account_mapping=None, date_column='POSTING_DATE',
amount_column='AMOUNT', account_column='ACCOUNT'):
"""
Create a cash flow statement from financial data.
Args:
gl_data: DataFrame with general ledger data
income_statement: DataFrame with income statement data
balance_sheet: DataFrame with balance sheet data
account_mapping: Dictionary mapping account numbers to categories
date_column: Column name for the posting date
amount_column: Column name for the transaction amount
account_column: Column name for the account number
Returns:
DataFrame with cash flow statement data
"""
# Implementation
# ...
def calculate_financial_ratios(self, income_statement, balance_sheet):
"""
Calculate key financial ratios from financial statements.
Args:
income_statement: DataFrame with income statement data
balance_sheet: DataFrame with balance sheet data
Returns:
DataFrame with financial ratios
"""
# Implementation
# ...
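As an illustration of calculate_financial_ratios, the sketch below derives a few standard ratios; the column names (revenue, gross_profit, current_assets, and so on) are assumptions about the statement layout, not the framework's actual schema:

```python
import pandas as pd

def calculate_basic_ratios(income_statement: pd.DataFrame, balance_sheet: pd.DataFrame) -> pd.DataFrame:
    """Compute a few standard ratios per period; column names are illustrative."""
    ratios = pd.DataFrame(index=income_statement.index)
    ratios['gross_margin'] = income_statement['gross_profit'] / income_statement['revenue']
    ratios['operating_margin'] = income_statement['operating_income'] / income_statement['revenue']
    ratios['current_ratio'] = balance_sheet['current_assets'] / balance_sheet['current_liabilities']
    ratios['debt_to_equity'] = balance_sheet['total_liabilities'] / balance_sheet['total_equity']
    return ratios
```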
The FinancialPredictor class provides forecasting capabilities:
class FinancialPredictor:
"""Predict financial metrics based on historical data."""
def __init__(self):
"""Initialize the financial predictor."""
self.models = {}
self.scalers = {}
self.prophet_models = {}
self.arima_models = {}
self.prediction_results = {}
def prepare_time_series_data(self, data, target_column, feature_columns=None,
lags=3, rolling_means=None, trends=False, seasonality=False):
"""
Prepare time series data for prediction.
Args:
data: DataFrame with time series data
target_column: Column to predict
feature_columns: Additional feature columns to include
lags: Number of lag periods to include
rolling_means: List of rolling mean windows to include
trends: Whether to include trend features
seasonality: Whether to include seasonality features
Returns:
X: Feature DataFrame
y: Target Series
"""
# Implementation
# ...
def train_prediction_model(self, X, y, target_name, model_type='random_forest',
test_size=0.2, cv=5, tune_hyperparams=True):
"""
Train a prediction model for financial data.
Args:
X: Feature DataFrame
y: Target Series
target_name: Name of the target variable
model_type: Type of model to train
test_size: Proportion of data to use for testing
cv: Number of cross-validation folds
tune_hyperparams: Whether to tune hyperparameters
Returns:
Dictionary with model evaluation results
"""
# Implementation
# ...
def train_time_series_model(self, data, target_column, model_type='prophet',
forecast_periods=12, seasonality_mode='multiplicative'):
"""
Train a time series model for financial forecasting.
Args:
data: DataFrame with time series data
target_column: Column to predict
model_type: Type of time series model
forecast_periods: Number of periods to forecast
seasonality_mode: Seasonality mode for Prophet
Returns:
Forecast DataFrame
"""
# Implementation
# ...
def predict_financial_metrics(self, financial_data, target_metrics, periods=12,
model_type='prophet', feature_engineering=True):
"""
Predict multiple financial metrics.
Args:
financial_data: Dictionary of financial DataFrames
target_metrics: List of tuples (metric_name, dataframe_key, column_name)
periods: Number of periods to forecast
model_type: Type of model to use
feature_engineering: Whether to use feature engineering
Returns:
Dictionary of forecast DataFrames
"""
# Implementation
# ...
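The lag and rolling-mean feature construction inside prepare_time_series_data can be sketched with pandas (illustrative only):

```python
import pandas as pd

def make_lag_features(data: pd.DataFrame, target_column: str, lags: int = 3, rolling_means=(3, 6)):
    """Build lagged and rolling-mean features for one target column."""
    df = pd.DataFrame(index=data.index)
    for lag in range(1, lags + 1):
        df[f"{target_column}_lag{lag}"] = data[target_column].shift(lag)
    for window in rolling_means or ():
        df[f"{target_column}_ma{window}"] = data[target_column].rolling(window).mean()
    df = df.dropna()  # drop rows that lack a full history
    X = df
    y = data.loc[df.index, target_column]
    return X, y
```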
Finally, the BIAnalysisEngine class brings everything together:
class BIAnalysisEngine:
"""
Business Intelligence analysis engine powered by CPU-Optimized LLM.
"""
def __init__(self, model_config=None):
"""
Initialize the BI analysis engine.
Args:
model_config: Configuration for the LLM
"""
# Initialize database connector
self.db_connector = ERPDatabaseConnector()
# Initialize data extractors for different ERP systems
self.data_extractors = {
"sap": ERPDataExtractor(self.db_connector),
"oracle_ebs": ERPDataExtractor(self.db_connector),
"dynamics": ERPDataExtractor(self.db_connector),
"odoo": OdooDataExtractor(self.db_connector),
"axelor": AxelorDataExtractor(self.db_connector),
"nexterp": NextERPDataExtractor(self.db_connector),
"erpnext": NextERPDataExtractor(self.db_connector) # Alias for NextERP
}
# Initialize data transformer and predictor
self.data_transformer = FinancialDataTransformer()
self.financial_predictor = FinancialPredictor()
# Initialize LLM with optimized config
if model_config is None:
model_config = {
"hidden_size": 768,
"n_layers": 8,
"n_heads": 12,
"quantize": True,
"sparse_attention": True,
"use_numba": True,
"use_kv_cache": True,
"max_seq_length": 2048
}
self.model = OptimizedCPULLM(config=model_config)
self.model.initialize_model()
# Storage for financial data and analyses
self.financial_data = {}
self.financial_statements = {}
self.financial_ratios = {}
self.financial_forecasts = {}
def connect_to_erp(self, connection_name, erp_type, connection_params):
"""
Connect to an ERP system.
Args:
connection_name: Name for this connection
erp_type: Type of ERP system (e.g., 'sap', 'oracle_ebs')
connection_params: Dictionary with connection parameters
Returns:
True if connection successful, False otherwise
"""
return self.db_connector.connect_to_erp(
connection_name=connection_name,
erp_type=erp_type,
connection_params=connection_params
)
def extract_financial_data(self, connection_name, erp_type, fiscal_year=None,
start_date=None, end_date=None, company=None):
"""
Extract financial data from the specified ERP system.
Args:
connection_name: Connection name in the database connector
erp_type: Type of ERP system
fiscal_year: Fiscal year for data extraction
start_date: Start date for data extraction
end_date: End date for data extraction
company: Company code/ID (required for some ERP systems)
Returns:
Dictionary of financial data
"""
# Implementation
# ...
def create_financial_statements(self, gl_data=None):
"""
Create financial statements from general ledger data.
Args:
gl_data: General ledger data (if None, use stored data)
Returns:
Dictionary with financial statements
"""
# Implementation
# ...
def predict_financial_metrics(self, periods=12, model_type='prophet'):
"""
Generate financial forecasts.
Args:
periods: Number of periods to forecast
model_type: Type of model to use
Returns:
Dictionary of forecast DataFrames
"""
# Implementation
# ...
def analyze_financial_health(self):
"""
Analyze the financial health of the company.
Returns:
LLM-generated analysis of financial health
"""
# Implementation
# ...
def answer_financial_question(self, question):
"""
Answer a specific financial question using the LLM.
Args:
question: Financial question to answer
Returns:
LLM-generated answer to the question
"""
# Implementation
# ...
def generate_financial_report(self, report_type='comprehensive', period=None):
"""
Generate a financial report.
Args:
report_type: Type of report to generate
period: Specific period for the report
Returns:
LLM-generated financial report
"""
# Implementation
# ...
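The analysis methods above are elided; a plausible sketch of answer_financial_question is to assemble a grounded prompt from the stored statements and call the optimized model (an assumption about the internals, not the shipped code):

```python
def answer_financial_question(self, question):
    """Build a grounded prompt from stored statements and ask the LLM."""
    context_parts = []
    for name, df in self.financial_statements.items():
        # Keep the prompt small: only the most recent periods of each statement
        context_parts.append(f"{name}:\n{df.tail(4).to_string()}")
    prompt = (
        "You are a financial analyst. Using the data below, answer the question.\n\n"
        + "\n\n".join(context_parts)
        + f"\n\nQuestion: {question}\nAnswer:"
    )
    result = self.model.generate(prompt=prompt, max_new_tokens=300, temperature=0.3)
    return result['generated_text']
```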
The framework supports the following ERP systems with specialized adaptations:
- SAP
  - Connection: MSSQL/HANA
  - Tables: BSEG, BKPF, BSID, BSIK, etc.
  - Account Structure: Specialized SAP chart of accounts
- Oracle EBS
  - Connection: Oracle DB
  - Tables: GL_BALANCES, AR_PAYMENT_SCHEDULES_ALL, etc.
  - Account Structure: Oracle EBS chart of accounts
- Microsoft Dynamics 365 Finance
  - Connection: SQL Server
  - Tables: GeneralJournalAccountEntry, CustTrans, etc.
  - Account Structure: Dynamics chart of accounts
- Odoo
  - Connection: PostgreSQL
  - Tables: account_move_line, account_move, account_account, etc.
  - Account Structure: Odoo chart of accounts
- Axelor
  - Connection: PostgreSQL
  - Tables: account_move_line, account_move, account_account, etc.
  - Account Structure: Axelor chart of accounts
- NextERP (ERPNext)
  - Connection: MariaDB/MySQL
  - Tables: tabGL Entry, tabAccount, tabJournal Entry, etc.
  - Account Structure: ERPNext chart of accounts
The framework can be integrated with popular workflow engines to add LLM capabilities to automated business processes.
N8N is a flexible workflow automation tool with a node-based approach. The integration is packaged as a custom N8N node (TypeScript) that forwards requests to the CPU-Optimized LLM service:
import {
IExecuteFunctions,
INodeExecutionData,
INodeType,
INodeTypeDescription,
NodeOperationError,
} from 'n8n-workflow';
import axios from 'axios';
export class LlmNode implements INodeType {
description: INodeTypeDescription = {
displayName: 'CPU-Optimized LLM',
name: 'llmNode',
icon: 'file:llm.svg',
group: ['transform'],
version: 1,
description: 'Interact with CPU-Optimized LLM service',
defaults: {
name: 'LLM',
},
inputs: ['main'],
outputs: ['main'],
properties: [
{
displayName: 'Operation',
name: 'operation',
type: 'options',
options: [
{
name: 'Generate Text',
value: 'generate',
description: 'Generate text from a prompt',
},
{
name: 'Analyze Data',
value: 'analyze',
description: 'Analyze data and provide insights',
},
{
name: 'Financial Analysis',
value: 'financial',
description: 'Perform financial analysis',
},
],
default: 'generate',
noDataExpression: true,
required: true,
description: 'The operation to perform',
},
// Additional properties...
],
};
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const items = this.getInputData();
const returnData: INodeExecutionData[] = [];
for (let i = 0; i < items.length; i++) {
try {
const operation = this.getNodeParameter('operation', i) as string;
const serviceUrl = this.getNodeParameter('serviceUrl', i) as string;
// Process based on operation type
// ...
// Return the results
returnData.push(/* ... */);
} catch (error) {
if (this.continueOnFail()) {
returnData.push({ json: { error: error.message } });
continue;
}
throw new NodeOperationError(this.getNode(), error);
}
}
return [returnData];
}
}
- Financial Analysis Workflow
// N8N Workflow Definition (JSON)
{
"nodes": [
{
"parameters": {
"httpMethod": "POST",
"path": "analyze-financial-data",
"options": {}
},
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"typeVersion": 1,
"position": [250, 300]
},
{
"parameters": {
"operation": "executeQuery",
"query": "SELECT * FROM gl_entries WHERE posting_date BETWEEN $1 AND $2",
"additionalFields": {
"queryParams": "={{ $json.startDate }},={{ $json.endDate }}"
}
},
"name": "PostgreSQL",
"type": "n8n-nodes-base.postgres",
"typeVersion": 1,
"position": [450, 300]
},
// Additional nodes for data transformation, LLM analysis, etc.
// ...
],
"connections": {
// Connection definitions
// ...
}
}
- Document Processing Workflow
{
"nodes": [
{
"parameters": {
"triggerTimes": {
"item": [
{
"mode": "everyDay"
}
]
}
},
"name": "Schedule Trigger",
"type": "n8n-nodes-base.scheduleTrigger",
"typeVersion": 1,
"position": [250, 300]
},
{
"parameters": {
"path": "/shared/documents/incoming",
"options": {
"recursive": true,
"fileExtensions": [
"pdf",
"docx",
"txt"
]
}
},
"name": "Read Files",
"type": "n8n-nodes-base.readPDF",
"typeVersion": 1,
"position": [450, 300]
},
// Additional nodes for document analysis, ERP updates, etc.
// ...
],
"connections": {
// Connection definitions
// ...
}
}
Camunda is a business process management (BPM) platform that supports BPMN 2.0. The integration registers Spring Boot external task workers that call the LLM service for text generation, financial analysis, and data analysis tasks:
package com.example.llmintegration;
import org.camunda.bpm.client.ExternalTaskClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;
import jakarta.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
@SpringBootApplication
public class LlmIntegrationApplication {
@Value("${camunda.bpm.client.base-url}")
private String camundaBaseUrl;
@Value("${llm.service.url}")
private String llmServiceUrl;
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
public static void main(String[] args) {
SpringApplication.run(LlmIntegrationApplication.class, args);
}
@PostConstruct
private void initializeExternalTaskWorkers() {
ExternalTaskClient client = ExternalTaskClient.create()
.baseUrl(camundaBaseUrl)
.asyncResponseTimeout(10000)
.build();
RestTemplate restTemplate = restTemplate();
// Register worker for text generation
client.subscribe("llm-generate-text")
.lockDuration(10000)
.handler((externalTask, externalTaskService) -> {
// Task implementation
// ...
})
.open();
// Register worker for financial analysis
client.subscribe("llm-financial-analysis")
.lockDuration(10000)
.handler((externalTask, externalTaskService) -> {
// Task implementation
// ...
})
.open();
// Register worker for data analysis
client.subscribe("llm-data-analysis")
.lockDuration(10000)
.handler((externalTask, externalTaskService) -> {
// Task implementation
// ...
})
.open();
}
}
- Financial Review Process
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
xmlns:dc="http://www.omg.org/spec/DD/20100524/DC"
xmlns:camunda="http://camunda.org/schema/1.0/bpmn"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
id="Definitions_1"
targetNamespace="http://bpmn.io/schema/bpmn">
<bpmn:process id="financial-review-process" name="Financial Review Process" isExecutable="true">
<!-- Process definition with tasks, gateways, and flows -->
<!-- ... -->
</bpmn:process>
</bpmn:definitions>
- Automated Invoice Processing
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
xmlns:dc="http://www.omg.org/spec/DD/20100524/DC"
xmlns:camunda="http://camunda.org/schema/1.0/bpmn"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
id="Definitions_2"
targetNamespace="http://bpmn.io/schema/bpmn">
<bpmn:process id="invoice-processing" name="Automated Invoice Processing" isExecutable="true">
<!-- Process definition with tasks, gateways, and flows -->
<!-- ... -->
</bpmn:process>
</bpmn:definitions>
- Customer Onboarding with Risk Assessment
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
xmlns:dc="http://www.omg.org/spec/DD/20100524/DC"
xmlns:camunda="http://camunda.org/schema/1.0/bpmn"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
id="Definitions_3"
targetNamespace="http://bpmn.io/schema/bpmn">
<bpmn:process id="customer-onboarding" name="Customer Onboarding with Risk Assessment" isExecutable="true">
<!-- Process definition with tasks, gateways, and flows -->
<!-- ... -->
</bpmn:process>
</bpmn:definitions>
The framework can be used for comprehensive financial analysis of ERP data.
from cpu_optimized_llm import OptimizedCPULLM
from erp_integration import BIAnalysisEngine
import pandas as pd
# Initialize the BI engine
bi_engine = BIAnalysisEngine()
# Connect to the ERP system
bi_engine.connect_to_erp(
connection_name="sap_prod",
erp_type="sap",
connection_params={
"server": "sap-erp-server.company.com",
"database": "SAPDB",
"username": "finance_analyst",
"password": "your_password",
"use_windows_auth": False
}
)
# Extract financial data for the current fiscal year
financial_data = bi_engine.extract_financial_data(
connection_name="sap_prod",
erp_type="sap",
fiscal_year="2025"
)
# Generate financial statements
statements = bi_engine.create_financial_statements()
# Analyze financial health
analysis = bi_engine.analyze_financial_health()
print("Financial Health Analysis:")
print(analysis)
# Generate a comprehensive report for the latest period
if 'income_statement' in bi_engine.financial_statements:
latest_period = bi_engine.financial_statements['income_statement'].index[-1]
report = bi_engine.generate_financial_report(
report_type="comprehensive",
period=latest_period
)
print(f"\nComprehensive Financial Report for {latest_period}:")
print(report)
# Forecast financial metrics for the next 6 months
forecasts = bi_engine.predict_financial_metrics(
periods=6,
model_type="prophet"
)
# Print forecast summary
print("\nFinancial Forecasts for Next 6 Months:")
for metric_name, forecast in forecasts.items():
if isinstance(forecast, pd.DataFrame):
print(f"\n{metric_name.replace('_', ' ').title()}:")
if 'yhat' in forecast.columns:  # Prophet output: history plus the forecast horizon
future_values = forecast['yhat'].tail(6)  # the last rows are the forecast periods
for i, value in enumerate(future_values):
print(f" Month {i+1}: ${value:,.2f}")
A second example focuses on cash flow analysis against an Oracle E-Business Suite system:
# Initialize BI engine
bi_engine = BIAnalysisEngine()
# Connect to ERP
bi_engine.connect_to_erp(
connection_name="oracle_finance",
erp_type="oracle_ebs",
connection_params={
"host": "oracle-ebs.example.com",
"port": 1521,
"service_name": "FINPROD",
"username": "financial_analyst",
"password": "secure_password"
}
)
# Extract data for the past 2 years
from datetime import datetime, timedelta
end_date = datetime.now().strftime("%Y-%m-%d")
start_date = (datetime.now() - timedelta(days=730)).strftime("%Y-%m-%d")
financial_data = bi_engine.extract_financial_data(
connection_name="oracle_finance",
erp_type="oracle_ebs",
start_date=start_date,
end_date=end_date
)
# Generate financial statements
statements = bi_engine.create_financial_statements()
# Focus on cash flow analysis
if 'cash_flow' in bi_engine.financial_statements:
cash_flow = bi_engine.financial_statements['cash_flow']
print("Cash Flow Analysis:")
print(cash_flow.tail(12)) # Last 12 months
# Calculate cash flow metrics
operating_cf = cash_flow['operating_cash_flow'].mean()
investing_cf = cash_flow['investing_cash_flow'].mean()
financing_cf = cash_flow['financing_cash_flow'].mean()
print(f"\nAverage Monthly Cash Flows:")
print(f"Operating: ${operating_cf:,.2f}")
print(f"Investing: ${investing_cf:,.2f}")
print(f"Financing: ${financing_cf:,.2f}")
# Forecast cash flow for next 12 months
forecasts = bi_engine.predict_financial_metrics(
periods=12,
model_type="arima" # ARIMA works well for cash flow forecasting
)
# Ask specific cash flow questions
questions = [
"What are the main drivers of our operating cash flow?",
"How has our free cash flow trended over the past year?",
"What is our projected cash position for the next quarter?",
"Do we have sufficient cash flow to support planned capital expenditures?"
]
for question in questions:
answer = bi_engine.answer_financial_question(question)
print(f"\nQ: {question}")
print(f"A: {answer}")
A third example analyzes business segment profitability in Microsoft Dynamics:
# Initialize BI engine
bi_engine = BIAnalysisEngine()
# Connect to ERP
bi_engine.connect_to_erp(
connection_name="dynamics_erp",
erp_type="dynamics",
connection_params={
"server": "dynamics.corp.local",
"database": "DynamicsERP",
"username": "bi_analyst",
"password": "analysis_pwd"
}
)
# Extract financial data for current fiscal year
financial_data = bi_engine.extract_financial_data(
connection_name="dynamics_erp",
erp_type="dynamics",
fiscal_year="2025"
)
# Create a custom query to get segment data
segment_data = bi_engine.db_connector.execute_query(
connection_name="dynamics_erp",
query="""
SELECT
d.DimensionValue AS BusinessSegment,
SUM(gl.Amount) AS Revenue,
SUM(CASE WHEN gl.AccountNo LIKE '5%' THEN gl.Amount ELSE 0 END) AS COGS,
SUM(CASE WHEN gl.AccountNo LIKE '6%' THEN gl.Amount ELSE 0 END) AS OperatingExpenses
FROM
GeneralLedgerEntry gl
JOIN
DimensionValueCombination dvc ON gl.DimensionSetID = dvc.DimensionSetID
JOIN
DimensionValue d ON dvc.DimensionValue = d.DimensionValue
WHERE
d.DimensionCode = 'SEGMENT'
AND gl.PostingDate BETWEEN @start_date AND @end_date
GROUP BY
d.DimensionValue
ORDER BY
Revenue DESC
""",
params={
"start_date": "2025-01-01",
"end_date": "2025-12-31"
}
)
# Calculate segment profitability
if segment_data is not None and not segment_data.empty:
segment_data['GrossProfit'] = segment_data['Revenue'] - segment_data['COGS']
segment_data['OperatingProfit'] = segment_data['GrossProfit'] - segment_data['OperatingExpenses']
segment_data['GrossMargin'] = segment_data['GrossProfit'] / segment_data['Revenue']
segment_data['OperatingMargin'] = segment_data['OperatingProfit'] / segment_data['Revenue']
print("Business Segment Profitability Analysis:")
print(segment_data)
# Ask the LLM to analyze segment performance
segment_analysis_prompt = f"""
Analyze the profitability of our business segments based on this data:
{segment_data.to_string()}
Provide insights on:
1. Which segments are most and least profitable
2. Key factors affecting segment performance
3. Recommendations for improving overall profitability
"""
analysis = bi_engine.model.generate(
prompt=segment_analysis_prompt,
max_new_tokens=500,
temperature=0.3
)
print("\nSegment Performance Analysis:")
print(analysis['generated_text'])
The framework can be used for intelligent document processing in business workflows.
from cpu_optimized_llm import OptimizedCPULLM
import pytesseract
from PIL import Image
import pandas as pd
# Initialize the LLM
model = OptimizedCPULLM(config={
"hidden_size": 768,
"n_layers": 8,
"n_heads": 12,
"quantize": True,
"sparse_attention": True,
"use_numba": True,
})
model.initialize_model()
def extract_text_from_invoice(image_path):
"""Extract text from invoice image using OCR."""
image = Image.open(image_path)
text = pytesseract.image_to_string(image)
return text
def analyze_invoice(text):
"""Use LLM to analyze invoice text and extract structured data."""
prompt = f"""
Analyze this invoice text and extract the following information in a structured format:
- Vendor name
- Invoice number
- Invoice date
- Due date
- Total amount
- Line items (with description, quantity, unit price, and amount)
- Tax amount
- Payment terms
Invoice text:
{text}
Extract the data in this format:
Vendor: [vendor name]
Invoice #: [invoice number]
Date: [invoice date]
Due Date: [due date]
Total: [total amount]
Line Items:
1. [description] | [quantity] | [unit price] | [amount]
2. [description] | [quantity] | [unit price] | [amount]
...
Tax: [tax amount]
Terms: [payment terms]
"""
result = model.generate(
prompt=prompt,
max_new_tokens=500,
temperature=0.1 # Low temperature for factual extraction
)
return result['generated_text']
def parse_extracted_data(extracted_text):
"""Parse the extracted text into a structured dictionary."""
lines = extracted_text.strip().split('\n')
data = {}
# Parse header information
for line in lines:
if line.startswith('Vendor:'):
data['vendor'] = line.replace('Vendor:', '').strip()
elif line.startswith('Invoice #:'):
data['invoice_number'] = line.replace('Invoice #:', '').strip()
elif line.startswith('Date:'):
data['invoice_date'] = line.replace('Date:', '').strip()
elif line.startswith('Due Date:'):
data['due_date'] = line.replace('Due Date:', '').strip()
elif line.startswith('Total:'):
data['total_amount'] = line.replace('Total:', '').strip()
elif line.startswith('Tax:'):
data['tax'] = line.replace('Tax:', '').strip()
elif line.startswith('Terms:'):
data['terms'] = line.replace('Terms:', '').strip()
# Parse line items
line_items = []
in_line_items = False
for line in lines:
if line.strip() == 'Line Items:':
in_line_items = True
continue
if in_line_items and line.strip() and not line.startswith('Tax:'):
# Try to parse the line item
parts = line.split('|')
if len(parts) >= 4:
item = {
'description': parts[0].strip(),
'quantity': parts[1].strip(),
'unit_price': parts[2].strip(),
'amount': parts[3].strip()
}
line_items.append(item)
data['line_items'] = line_items
return data
# Example usage
invoice_path = "path/to/invoice.png"
invoice_text = extract_text_from_invoice(invoice_path)
extracted_data = analyze_invoice(invoice_text)
structured_data = parse_extracted_data(extracted_data)
# Convert to DataFrame for easier processing
line_items_df = pd.DataFrame(structured_data['line_items'])
# Print extracted information
print(f"Vendor: {structured_data['vendor']}")
print(f"Invoice #: {structured_data['invoice_number']}")
print(f"Date: {structured_data['invoice_date']}")
print(f"Total Amount: {structured_data['total_amount']}")
print("\nLine Items:")
print(line_items_df)
# Validation checks
if 'total_amount' in structured_data and 'line_items' in structured_data:
# Calculate the sum of line items
calculated_total = sum(float(item['amount'].replace('$', '').replace(',', ''))
for item in structured_data['line_items'])
reported_total = float(structured_data['total_amount'].replace('$', '').replace(',', ''))
# Check if totals match
if abs(calculated_total - reported_total) > 0.01:
print(f"\nWarning: Calculated total ({calculated_total:.2f}) does not match reported total ({reported_total:.2f})")
else:
print("\nValidation: Line item amounts sum correctly to the total amount")
The framework can be used to automate customer support processes.
from cpu_optimized_llm import OptimizedCPULLM
import imaplib
import email
import json
import re
import requests
# Initialize the LLM
model = OptimizedCPULLM(config={
"hidden_size": 768,
"n_layers": 8,
"n_heads": 12,
"quantize": True,
"sparse_attention": True,
})
model.initialize_model()
class EmailProcessor:
def __init__(self, model, imap_server, username, password):
self.model = model
self.imap_server = imap_server
self.username = username
self.password = password
def connect_to_inbox(self):
"""Connect to email inbox using IMAP."""
mail = imaplib.IMAP4_SSL(self.imap_server)
mail.login(self.username, self.password)
mail.select('inbox')
return mail
def fetch_unprocessed_emails(self, mail, max_emails=10):
"""Fetch unprocessed emails from inbox."""
status, messages = mail.search(None, '(UNSEEN)')
email_ids = messages[0].split()
if not email_ids:
print("No unread emails found")
return []
# Limit the number of emails processed
email_ids = email_ids[:max_emails]
emails = []
for e_id in email_ids:
status, data = mail.fetch(e_id, '(RFC822)')
raw_email = data[0][1]
# Parse email
msg = email.message_from_bytes(raw_email)
# Extract content
subject = msg.get('subject', '')
from_address = msg.get('from', '')
date = msg.get('date', '')
# Get body content
body = ""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
if content_type == "text/plain":
body = part.get_payload(decode=True).decode()
break
else:
body = msg.get_payload(decode=True).decode()
# Clean body text
body = re.sub(r'\r\n', '\n', body)
body = re.sub(r'\n\n+', '\n\n', body)
emails.append({
'id': e_id,
'subject': subject,
'from': from_address,
'date': date,
'body': body
})
return emails
def analyze_email(self, email_data):
"""Analyze email content with the LLM."""
prompt = f"""
Analyze this customer email and extract the following information:
Subject: {email_data['subject']}
From: {email_data['from']}
Email Body:
{email_data['body']}
Please provide the following information:
1. Customer sentiment (positive, neutral, negative)
2. Category of the issue (billing, technical, account, product, other)
3. Urgency level (low, medium, high)
4. Key points from the email
5. Required action
Format your response as JSON with the following fields:
{{
"sentiment": "positive/neutral/negative",
"category": "billing/technical/account/product/other",
"urgency": "low/medium/high",
"key_points": ["point 1", "point 2", ...],
"required_action": "description of what needs to be done",
"department": "department that should handle this"
}}
"""
result = self.model.generate(
prompt=prompt,
max_new_tokens=500,
temperature=0.3
)
# Extract JSON from the response
response_text = result['generated_text']
# Find JSON in the response
json_match = re.search(r'({.*})', response_text, re.DOTALL)
if json_match:
try:
analysis = json.loads(json_match.group(1))
return analysis
except json.JSONDecodeError:
print("Error parsing JSON response")
return None
else:
print("No JSON found in the response")
return None
def route_email(self, email_data, analysis):
"""Route the email to the appropriate department based on analysis."""
if not analysis:
return "default_team", "Failed to analyze email"
# Determine the priority
priority = 'P3' # Default
if analysis['urgency'] == 'high':
priority = 'P1'
elif analysis['urgency'] == 'medium':
priority = 'P2'
# Determine the team
department = analysis.get('department', 'customer_support')
# Create ticket in help desk system (example implementation)
ticket_data = {
'subject': email_data['subject'],
'description': email_data['body'],
'from_email': email_data['from'],
'priority': priority,
'department': department,
'analysis': analysis
}
# In a real implementation, you would call your help desk API
# response = requests.post('https://helpdesk.example.com/api/tickets', json=ticket_data)
# ticket_id = response.json()['ticket_id']
# For demonstration purposes, just return the routing info
return department, priority
def generate_auto_response(self, email_data, analysis):
"""Generate an automatic response based on the email analysis."""
if not analysis:
return None
# Extract first name from email address
from_name = "Customer"
name_match = re.search(r'^(.*?)\s*<', email_data['from'])
if name_match:
from_name = name_match.group(1).strip()
prompt = f"""
Write a professional and helpful response to the customer email below.
Customer: {from_name}
Subject: {email_data['subject']}
Email: {email_data['body']}
Based on our analysis:
- The customer's sentiment is {analysis['sentiment']}
- The issue category is {analysis['category']}
- The urgency level is {analysis['urgency']}
- Key points: {', '.join(analysis['key_points'])}
Write a personalized response that:
1. Acknowledges their email and shows empathy
2. Confirms we understand their issue
3. Provides next steps or a timeline for resolution
4. Ends with a professional closing
If the sentiment is negative, be extra empathetic. If the urgency is high, emphasize quick handling.
"""
result = self.model.generate(
prompt=prompt,
max_new_tokens=500,
temperature=0.7
)
return result['generated_text']
def process_emails(self, max_emails=10):
"""Process unread emails from inbox."""
mail = self.connect_to_inbox()
emails = self.fetch_unprocessed_emails(mail, max_emails)
results = []
for email_data in emails:
print(f"Processing email: {email_data['subject']}")
# Analyze the email
analysis = self.analyze_email(email_data)
if analysis:
# Route to appropriate department
department, priority = self.route_email(email_data, analysis)
# Generate auto-response
response = self.generate_auto_response(email_data, analysis)
results.append({
'email': email_data,
'analysis': analysis,
'routing': {
'department': department,
'priority': priority
},
'auto_response': response
})
print(f" - Routed to: {department} with priority {priority}")
else:
print(" - Failed to analyze email")
return results
# Example usage
processor = EmailProcessor(
model=model,
imap_server='imap.example.com',
username='[email protected]',
password='your_password'
)
results = processor.process_emails(max_emails=5)
# Print summary of processing results
print("\nEmail Processing Summary:")
for i, result in enumerate(results):
print(f"Email {i+1}: {result['email']['subject']}")
print(f" Sentiment: {result['analysis']['sentiment']}")
print(f" Category: {result['analysis']['category']}")
print(f" Urgency: {result['analysis']['urgency']}")
print(f" Routed to: {result['routing']['department']} ({result['routing']['priority']})")
print(f" Auto-response: {result['auto_response'][:50]}...")
print()
The framework can be deployed using Docker for easy orchestration and scaling.
FROM python:3.9-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
build-essential \
cmake \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Download model
RUN python -c "from cpu_optimized_llm import OptimizedCPULLM; model = OptimizedCPULLM(); model.initialize_model(); model.save_model('/app/model')"
# Expose the API port
EXPOSE 5000
# Set environment variables
ENV MODEL_PATH=/app/model
ENV NUM_THREADS=4
ENV MAX_SEQUENCE_LENGTH=1024
# Run the API with Gunicorn
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "4", "--timeout", "120", "app:app"]
version: '3'
services:
llm-service:
build: .
ports:
- "5000:5000"
environment:
- PORT=5000
- NUM_THREADS=4
volumes:
- ./models:/app/models
restart: unless-stopped
deploy:
resources:
limits:
cpus: '4'
memory: 8G
The framework supports several scaling strategies for handling increased load:
Increase resources on a single server:
version: '3'
services:
llm-service:
build: .
ports:
- "5000:5000"
environment:
- PORT=5000
- NUM_THREADS=16 # Increased from 4
volumes:
- ./models:/app/models
restart: unless-stopped
deploy:
resources:
limits:
cpus: '16' # Increased from 4
memory: 32G # Increased from 8G
Deploy multiple instances with a load balancer:
version: '3'
services:
llm-service-1:
build: .
environment:
- PORT=5000
- NUM_THREADS=4
volumes:
- ./models:/app/models
restart: unless-stopped
deploy:
resources:
limits:
cpus: '4'
memory: 8G
llm-service-2:
build: .
environment:
- PORT=5000
- NUM_THREADS=4
volumes:
- ./models:/app/models
restart: unless-stopped
deploy:
resources:
limits:
cpus: '4'
memory: 8G
llm-service-3:
build: .
environment:
- PORT=5000
- NUM_THREADS=4
volumes:
- ./models:/app/models
restart: unless-stopped
deploy:
resources:
limits:
cpus: '4'
memory: 8G
nginx-lb:
image: nginx:latest
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
depends_on:
- llm-service-1
- llm-service-2
- llm-service-3
With an NGINX configuration like:
events {
worker_connections 1024;
}
http {
upstream llm_backend {
server llm-service-1:5000;
server llm-service-2:5000;
server llm-service-3:5000;
}
server {
listen 80;
location / {
proxy_pass http://llm_backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_read_timeout 300s; # Extended timeout for long-running requests
}
}
}
Implement request batching to increase throughput:
import threading

class RequestBatcher:
"""Batch similar LLM requests to improve throughput."""
def __init__(self, max_batch_size=10, max_wait_time=5):
"""
Initialize request batcher.
Args:
max_batch_size: Maximum number of requests in a batch
max_wait_time: Maximum time to wait for batch to fill (seconds)
"""
self.max_batch_size = max_batch_size
self.max_wait_time = max_wait_time
self.current_batch = []
self.batch_lock = threading.Lock()
self.batch_event = threading.Event()
self.processing = False
# Start processing thread
self.processing_thread = threading.Thread(target=self._process_batches, daemon=True)
self.processing_thread.start()
def add_request(self, prompt, params, callback):
"""
Add a request to the batch.
Args:
prompt: The text prompt
params: Generation parameters
callback: Function to call with results
Returns:
Boolean indicating if the request was added
"""
# Implementation
# ...
def _process_batches(self):
"""Process batches in the background."""
# Implementation
# ...
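A hedged sketch of how add_request and _process_batches could work together; it assumes the batcher holds a reference to the LLM as self.model (not shown in the skeleton above) and, for simplicity, processes queued prompts one at a time:

```python
def add_request(self, prompt, params, callback):
    """Queue a request for the background thread; reject it if the batch is full."""
    with self.batch_lock:
        if len(self.current_batch) >= self.max_batch_size:
            return False
        self.current_batch.append((prompt, params, callback))
        self.batch_event.set()  # wake the processing thread
    return True

def _process_batches(self):
    """Drain the queue whenever it fills up or max_wait_time elapses."""
    while True:
        self.batch_event.wait(timeout=self.max_wait_time)
        with self.batch_lock:
            batch, self.current_batch = self.current_batch, []
            self.batch_event.clear()
        for prompt, params, callback in batch:
            # A real implementation would run these prompts through the model as one batch
            callback(self.model.generate(prompt=prompt, **params))
```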
Implement asynchronous processing to avoid blocking:
from flask import request, jsonify
import threading
import time
import uuid

# Assumes the Flask `app`, the shared `model`, and a `request_status` dict are defined elsewhere in the service
@app.route('/api/llm/generate-async', methods=['POST'])
def generate_text_async():
"""Generate text asynchronously with callback or polling."""
data = request.json
if not data or 'prompt' not in data:
return jsonify({'error': 'No prompt provided'}), 400
# Generate a unique ID for this request
request_id = str(uuid.uuid4())
# Store request status
request_status[request_id] = {
'status': 'pending',
'started_at': time.time(),
'result': None,
'error': None
}
# Get parameters
prompt = data['prompt']
params = {
'max_tokens': data.get('max_tokens', 500),
'temperature': data.get('temperature', 0.7),
'top_p': data.get('top_p', 0.9),
'top_k': data.get('top_k', 50)
}
# Optional callback URL
callback_url = data.get('callback_url')
# Submit task to worker pool
threading.Thread(
target=process_async_request,
args=(request_id, prompt, params, callback_url),
daemon=True
).start()
return jsonify({
'request_id': request_id,
'status': 'pending',
'status_url': f'/api/llm/status/{request_id}'
})
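The endpoint above refers to a process_async_request worker and a status route that are not shown; a hedged sketch of both follows, assuming the same global model and request_status dictionary:

```python
import requests

def process_async_request(request_id, prompt, params, callback_url=None):
    """Run generation in a worker thread and record the outcome."""
    try:
        result = model.generate(prompt=prompt, max_new_tokens=params['max_tokens'],
                                temperature=params['temperature'],
                                top_p=params['top_p'], top_k=params['top_k'])
        request_status[request_id].update(status='completed', result=result['generated_text'])
    except Exception as exc:
        request_status[request_id].update(status='failed', error=str(exc))
    if callback_url:
        # Best-effort notification; failures here should not crash the worker
        try:
            requests.post(callback_url, json=request_status[request_id], timeout=10)
        except requests.RequestException:
            pass

@app.route('/api/llm/status/<request_id>', methods=['GET'])
def get_request_status(request_id):
    """Polling endpoint for clients that did not supply a callback URL."""
    status = request_status.get(request_id)
    if status is None:
        return jsonify({'error': 'Unknown request ID'}), 404
    return jsonify({'request_id': request_id, **status})
```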