找回密码
 立即注册
首页 业界区 业界 LaPluma : 一个轻盈的 Go 数据流处理库

LaPluma : 一个轻盈的 Go 数据流处理库

育局糊 2025-7-30 19:35:34
最近在学习Go, 打算写点小项目来练手,实现的过程中发现需要在slice上执行Filter操作,但是标准库没有提供,像go-stream这些库提供的又是比较高级的抽象,所以就有了Lapluma这个库
仓库地址:lapluma
核心设计理念

Lapluma旨在提供一套简洁、可组合且易于理解的数据处理工具,通过提供一组正交的基础操作,开发者将这些模块进行组合,构建出满足需求的数据处理流水线
Lapluma提供了两个核心组件:Iterator和Pipe
1. Iterator - 串行数据流

Iterator 是一个前向迭代器接口,它定义了对数据序列的逐一访问。
主要操作:

  • FromSlice(data []E) Iterator[E]: 从切片创建迭代器。
  • FromMap(data map[K]V) Iterator[Pair[K,V]]: 从 map 创建迭代器。
  • Map[E, R](it Iterator[E], handler func(E) R) Iterator[R]: 对每个元素应用一个无错误的转换。
  • Filter[E](it Iterator[E], filter func(E) bool) Iterator[E]: 过滤不符合条件的元素。
  • Reduce[E, R](it Iterator[E], handler func(R, E) R, initial R) R: 将序列聚合为单个值。
  • Collect[E](it Iterator[E]) []E: 将迭代器中的所有元素收集到切片中。
示例:
  1. // 创建迭代器
  2. data := []int{1, 2, 3, 4, 5}
  3. it := iterator.FromSlice(data)
  4. // 链式操作
  5. result := iterator.Collect(
  6.     iterator.Filter(
  7.         iterator.Map(it, func(x int) int { return x * 2 }),
  8.         func(x int) bool { return x > 5 }
  9.     )
  10. ) // [6, 8, 10]
复制代码
2. Pipe - 并发数据流

Pipe 基于 Go 的 channel 构建,每个操作(如 Map, Filter)都在一个独立的 goroutine 中运行,形成一条处理流水线。
所有的 Pipe 操作都与 context.Context 集成,可以轻松实现超时控制和优雅退出。
主要操作:

  • FromSlice(data []E, ctx context.Context) *Pipe[E]: 从切片创建并发管道。
  • FromIterator(it iterator.Iterator[E], ctx context.Context) *Pipe[E]: 从迭代器创建并发管道。
  • Map, Filter, Reduce 等函数与 Iterator 版本功能相同,但以并发方式执行。
Pipe 提供的 Map、Filter、Reduce 等函数与 Iterator 版本功能类似,但它们在内部会启动 Goroutine 进行并发处理。可以为 Map 和 Filter 操作指定并行度和缓冲区大小,从而精细控制并发资源的利用。
PS: 现在还每想好具体的并行控制参数,后续打算将并行控制参数用一个struct表示,现在的方案为临时方案
示例:
  1. ctx := context.Background()
  2. // 创建并发管道
  3. p := pipe.FromSlice([]int{1, 2, 3, 4, 5}, ctx)
  4. // 并行处理(3个工作协程)
  5. result := pipe.Collect(
  6.     pipe.Filter(
  7.         pipe.Map(p, cpuIntensiveTask, 3), // 并行度3
  8.         func(x int) bool { return x > 10 },
  9.         2, // 并行度2
  10.     )
  11. )
复制代码
标准迭代器集成

Pipe也实现了Iterator的接口,所以也算是一种迭代器,兼容 Go 1.23+ 的标准 iter 包, 可以直接通过 for-range 语法遍历
  1. import "iter"
  2. // 兼容 Go 1.23+ 的 for-range 语法
  3. itForRange := iterator.Filter(
  4.         iterator.Map(iterator.FromSlice([]string{"1", "2", "3", "4"}), func(s string) int {
  5.                 val, _ := strconv.Atoi(s)
  6.                 return val * 3
  7.         }),
  8.         func(x int) bool { return x < 10 },
  9. )
  10. fmt.Print("for-range 遍历结果: ")
  11. for data := range iterator.Iter(itForRange) {
  12.         fmt.Printf("%d ", data) // 输出: 3 6 9
  13. }
  14. fmt.Println()
复制代码
错误处理

LaPluma 在设计上有意简化了核心转换函数的签名,例如 Map 的 handler 是 func(T) R 而不是 func(T) (R, error)。这并非忽略错误,而是一种设计选择:将错误视为数据流的一部分来处理
推荐以下两种模式来处理可能失败的操作:
模式一:前置过滤 (Pre-filtering)

如果某些数据从一开始就是非法的,或者不符合处理条件,应该在进入核心处理逻辑前,使用 Filter 将其剔除。
  1. // 示例:只处理正数
  2. pipe := FromSlice([]int{1, -2, 3, -4}, ctx)
  3. positivePipe := Filter(pipe, func(n int) bool {
  4.     return n > 0
  5. })
  6. // ... 后续操作只会看到 {1, 3}
复制代码
模式二:使用 TryMap 处理可失败的转换

当数据转换过程本身可能失败时(例如,解析字符串、调用外部 API),使用 TryMap 函数。它的 handler 签名为 func(T) (R, error)。当 handler 返回一个非 nil 的 error 时,TryMap 会自动跳过(丢弃) 这个元素,并继续处理下一个。这使得流水线可以在遭遇“数据级”错误时保持运行,而不会被中断。
  1. import (
  2.     "strconv"
  3.     "errors"
  4. )
  5. // 示例:将字符串转换为整数,失败则跳过
  6. stringPipe := FromSlice([]string{"1", "two", "3", "four"}, ctx)
  7. // 使用 TryMap,handler 返回 (int, error)
  8. intPipe := TryMap(stringPipe, func(s string) (int, error) {
  9.     i, err := strconv.Atoi(s)
  10.     if err != nil {
  11.         // 返回错误,这个元素将被丢弃
  12.         return 0, errors.New("not a number")
  13.     }
  14.     return i, nil
  15. })
  16. // 最终 Reduce 只会处理成功转换的 {1, 3}
  17. sum := Reduce(intPipe, func(acc, n int) int { return acc + n }, 0)
  18. // sum 的结果是 4
复制代码
PS:若需要收集Map过程中的错误,可以考虑使用在util.go中Result[T]作为返回值,要如何设计此场景的错误处理机制还没想好:通过在调用时添加一个onError参数来处理错误;或者返回两个Pipe,用其中一个来处理错误信息;或者其他方案
运行测试
  1. go test ./...
复制代码
后续计划


  • 提供更丰富的转换操作, 如Distinct, Zip, Peek
  • 完善错误处理机制
  • 规范Pipe的并发控制参数
个人博客:https://muzhy.github.io/
微信公众号:午夜游鱼

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册