Skip to content

一个完整的生产级ReactivePipeline实现示例,包含5个下游接口的依赖编排。我们通过电商订单场景演示(用户信息→商品信息→库存→优惠券→运费计算)

License

Notifications You must be signed in to change notification settings

kindbgen/reactive-pipeline

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

2 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

以下是一个完整的生产级ReactivePipeline实现示例,包含5个下游接口的依赖编排。我们通过电商订单场景演示(用户信息→商品信息→库存→优惠券→运费计算):

1. 定义DataLoader接口(核心抽象)

public interface DataLoader<T> {
    String getName(); // 唯一标识
    String[] getDependencies(); // 依赖的DataLoader名称
    Mono<T> load(Map<String, Object> context); // 业务加载方法
}

2. 具体业务实现(5个接口)

// 用户信息服务
class UserLoader implements DataLoader<User> {
    public String getName() { return "user"; }
    public String[] getDependencies() { return new String[0]; } // 无依赖
    
    public Mono<User> load(Map<String, Object> ctx) {
        return Mono.fromCallable(() -> {
            System.out.println("Calling user service...");
            return new User((String)ctx.get("userId"));
        }).delayElement(Duration.ofMillis(100));
    }
}

// 商品信息服务(依赖用户)
class ProductLoader implements DataLoader<Product> {
    public String getName() { return "product"; }
    public String[] getDependencies() { return new String[]{"user"}; }
    
    public Mono<Product> load(Map<String, Object> ctx) {
        return Mono.just((User)ctx.get("user"))
            .flatMap(user -> Mono.fromCallable(() -> {
                System.out.println("Calling product service...");
                return new Product(user.getFavoriteProductId());
            })).delayElement(Duration.ofMillis(200));
    }
}

// 库存服务(依赖商品)
class InventoryLoader implements DataLoader<Inventory> {
    public String getName() { return "inventory"; }
    public String[] getDependencies() { return new String[]{"product"}; }
    
    public Mono<Inventory> load(Map<String, Object> ctx) {
        return Mono.just((Product)ctx.get("product"))
            .flatMap(product -> Mono.fromCallable(() -> {
                System.out.println("Calling inventory service...");
                return new Inventory(product.getId());
            })).delayElement(Duration.ofMillis(150));
    }
}

// 优惠券服务(依赖用户、商品)
class CouponLoader implements DataLoader<Coupon> {
    public String getName() { return "coupon"; }
    public String[] getDependencies() { return new String[]{"user", "product"}; }
    
    public Mono<Coupon> load(Map<String, Object> ctx) {
        return Mono.zip(
            Mono.just((User)ctx.get("user")),
            Mono.just((Product)ctx.get("product"))
        ).flatMap(tuple -> Mono.fromCallable(() -> {
            System.out.println("Calling coupon service...");
            return new Coupon(tuple.getT1().getId(), tuple.getT2().getId());
        })).delayElement(Duration.ofMillis(120));
    }
}

// 运费服务(依赖商品、库存)
class ShippingLoader implements DataLoader<ShippingFee> {
    public String getName() { return "shipping"; }
    public String[] getDependencies() { return new String[]{"product", "inventory"}; }
    
    public Mono<ShippingFee> load(Map<String, Object> ctx) {
        return Mono.zip(
            Mono.just((Product)ctx.get("product")),
            Mono.just((Inventory)ctx.get("inventory"))
        ).flatMap(tuple -> Mono.fromCallable(() -> {
            System.out.println("Calling shipping service...");
            return new ShippingFee(tuple.getT1().getWeight(), tuple.getT2().getWarehouseId());
        })).delayElement(Duration.ofMillis(180));
    }
}

3. DAG调度器实现(Kahn算法拓扑排序)

class DagScheduler {
    public static List<String> schedule(Map<String, DataLoader<?>> loaders) {
        // 构建入度表
        Map<String, Integer> inDegree = new HashMap<>();
        // 构建邻接表
        Map<String, List<String>> adjacency = new HashMap<>();
        
        // 初始化数据结构
        loaders.keySet().forEach(name -> {
            inDegree.put(name, 0);
            adjacency.put(name, new ArrayList<>());
        });
        
        // 填充依赖关系
        loaders.values().forEach(loader -> {
            for (String dep : loader.getDependencies()) {
                adjacency.get(dep).add(loader.getName());
                inDegree.put(loader.getName(), inDegree.get(loader.getName()) + 1);
            }
        });
        
        // Kahn算法核心逻辑
        Queue<String> queue = new LinkedList<>();
        inDegree.forEach((k, v) -> {
            if (v == 0) queue.add(k);
        });
        
        List<String> result = new ArrayList<>();
        while (!queue.isEmpty()) {
            String current = queue.poll();
            result.add(current);
            
            for (String neighbor : adjacency.get(current)) {
                inDegree.put(neighbor, inDegree.get(neighbor) - 1);
                if (inDegree.get(neighbor) == 0) {
                    queue.add(neighbor);
                }
            }
        }
        
        if (result.size() != loaders.size()) {
            throw new RuntimeException("存在循环依赖!");
        }
        
        return result;
    }
}

4. 执行引擎实现

class ReactivePipeline {
    private final Map<String, DataLoader<?>> loaderMap;
    private final ReactiveCache cache = new ReactiveCache();
    private final List<String> executionOrder;
    
    public ReactivePipeline(Map<String, DataLoader<?>> loaders) {
        this.loaderMap = loaders;
        this.executionOrder = DagScheduler.schedule(loaders);
    }
    
    public Mono<Map<String, Object>> execute(Map<String, Object> initialContext) {
        Map<String, Object> context = new ConcurrentHashMap<>(initialContext);
        
        return Flux.fromIterable(executionOrder)
            .flatMap(loaderName -> {
                DataLoader<?> loader = loaderMap.get(loaderName);
                return cache.computeIfAbsent(loaderName, () -> 
                    resolveDependencies(loader, context)
                        .then(Mono.defer(() -> loader.load(context)))
                );
            })
            .then(Mono.fromCallable(() -> context));
    }
    
    private Mono<Void> resolveDependencies(DataLoader<?> loader, Map<String, Object> context) {
        return Flux.fromArray(loader.getDependencies())
            .flatMap(depName -> cache.get(depName)
                .doOnNext(result -> context.put(depName, result))
            )
            .then();
    }
}

class ReactiveCache {
    private final Map<String, Mono<?>> cache = new ConcurrentHashMap<>();
    
    public Mono<Object> get(String key) {
        return Mono.defer(() -> (Mono<Object>) cache.getOrDefault(key, Mono.empty()));
    }
    
    public <T> Mono<T> computeIfAbsent(String key, Supplier<Mono<T>> supplier) {
        return Mono.defer(() -> {
            Mono<T> mono = (Mono<T>) cache.computeIfAbsent(key, k -> 
                supplier.get().cache()
            );
            return mono;
        });
    }
}

5. 使用示例

public class Main {
    public static void main(String[] args) {
        Map<String, DataLoader<?>> loaders = Map.of(
            "user", new UserLoader(),
            "product", new ProductLoader(),
            "inventory", new InventoryLoader(),
            "coupon", new CouponLoader(),
            "shipping", new ShippingLoader()
        );
        
        ReactivePipeline pipeline = new ReactivePipeline(loaders);
        
        Map<String, Object> context = new HashMap<>();
        context.put("userId", "12345");
        
        pipeline.execute(context)
            .doOnNext(res -> {
                System.out.println("\nFinal Result:");
                res.forEach((k, v) -> System.out.println(k + " -> " + v));
            })
            .block();
    }
}

执行流程说明

  1. 依赖关系拓扑

    user → product → inventory → shipping
               ↘      ↗
                coupon
    
  2. 执行顺序:user → product → (inventory & coupon) → shipping

  3. 输出日志

    Calling user service...
    Calling product service...
    Calling coupon service...
    Calling inventory service...
    Calling shipping service...
    
  4. 性能特征

    • 总耗时 ≈ 最大路径耗时(user→product→inventory→shipping)≈ 100+200+150+180=630ms
    • 传统同步调用需要100+200+150+120+180=750ms
    • 并行度优化后实际耗时约630ms(关键路径)

关键算法增强

对于大型依赖关系(200+节点),可优化Kahn算法:

// 使用优先队列实现优先级调度
Queue<String> queue = new PriorityQueue<>(Comparator.comparingInt(
    name -> loaderMap.get(name).getPriority()
));

该实现完整展示了响应式依赖编排的核心机制,实际生产使用时需要添加以下增强:

  1. 超时控制:.timeout(Duration.ofMillis(500))
  2. 错误重试:.retryWhen(Retry.backoff(3, Duration.ofMillis(100)))
  3. 监控埋点:.metrics().tag("loader", loaderName)
  4. 上下文传递:通过Reactor Context实现全链路透传

About

一个完整的生产级ReactivePipeline实现示例,包含5个下游接口的依赖编排。我们通过电商订单场景演示(用户信息→商品信息→库存→优惠券→运费计算)

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages