在C++中 如何实现java中的Stream

文章目录

关于Steam

举个例子

List<Integer> list =Arrays.asList(1,2,3);// map将int的流,转换为string 的流, 然后对流进行过滤,最后收集到一个listList<String> list2=list.stream().map(x -> x +"123").filter(x -> x.startsWith("11")).collect(Collectors.toList());

Java 8中的Stream API是一种全新的处理集合数据的方式,它提供了一种非常便捷的方法来对集合中的元素进行筛选、排序、聚合等操作。下面是一些Stream API的特点和应用场景,以及其关键特性:

特点:

  • Stream是一种延迟计算的集合,它只有在真正需要使用时才会执行计算操作,可以极大地提高效率。
  • Stream可以处理大量数据,其内部使用了多线程的技术,可以自动并行化处理。
  • Stream可以实现非常复杂的操作,比如filter、map、reduce等等。

应用场景:

  • 数据处理:Stream API可以用于处理大量的数据,比如从数据库或者文件中读取数据,并对其进行处理。
  • 并发编程:Stream API内部使用了多线程技术,可以实现并发编程,提高程序的执行效率。
  • 功能扩展:Stream API提供了大量的中间操作和终止操作,可以方便地对集合中的元素进行筛选、排序、聚合等操作。

关键特性:

  • 流(Stream):Stream是一个数据流,可以看做是一种集合,但是它并不会存储数据,而是通过函数式编程的方式来对数据进行处理。
  • 中间操作:Stream提供了大量的中间操作,比如filter、map、distinct、sorted、limit等等,这些中间操作会返回一个新的Stream。
  • 终止操作:Stream提供了一些终止操作,比如forEach、count、reduce、collect等等,这些操作会触发Stream的执行,返回一个结果。
  • 并行流:Stream提供了并行流的功能,可以利用多线程的技术来提高程序的执行效率。可以通过parallelStream()方法获取一个并行流。
  • 支持函数式编程:Stream API使用函数式编程的方式来处理数据,可以大大简化程序的编写过程。

总之,Stream API是Java 8中非常重要的一个新特性,它可以让我们以一种更加简洁、高效的方式来处理集合中的数据,应用场景非常广泛。

对于java转C++的小伙伴来说,这个api实在太好用了,完全离不开它,能否在c++中也实现一套类似的api呢

实现思路

java的实现分析, 详细参考:Java中牛X的Stream流水线操作是怎样实现的 这里简单说明一下:

如下java代码:

list.stream().map(x->x+“123”).filter(x->x.startsWith(“test”)).collect(Collectors.toList());

抽象来看 创建了一些列的StatelessOp/StateFullOp ,通过类似链表的方式串联了起来,最终执行的时候,遍历链表将 StatelessOp/StateFullOp 中用户定义操作,通过Sink这个类,一层层wapper,最终在收集的时候执行了这个wrapper。

类似如下过程:

在这里插入图片描述

本质来说就是通过一层一层的回调函数的方式将多个操作串联了起来。 那么C++也可以通过回调函数的方式直接实现一个简易的类似java 中Stream的API。

也可以通过生产消费者模型来解释,本质都是一样的:
有AB两个模块,A负责生产数据,B负责消费数据,B不关心A怎么生产,A不关心B怎么消费,可能需要先过滤,转换,或者聚合什么的,这种情况下,传统的做法就是在A提供一个接口,注册一个回调函数,B负责调用,当A生产出数据的时候,调用B注册的回调函数进行消费。

思路打开,如果我们 有ABCD。。。 N 个模块呢,一级级注册回调函数,将这些回调函数级联起来,是不是就很像Stream的api呢,而且由于都是回调函数天生是懒加载的或者说天生支持反应式编程中的背压机制,就是说会根据消费者的消费速度去生产数据。

stream API 的本质就是注册回调函数,并且在合适的时候触发这个回调函数的调用

接口定义

template<typename T> class Flow { protected: /** * 关键函数定义: * 此函数的实现定义了何时调用入参的回调函数(数据的生产),当流生产出数据时就调用入参的回调函数, * 例如:对于一个vector的流就是循环调用回调函数 * 对于一个无限流,那么就是死循环调用回调函数 */ std::function<void(std::function<void(T)>)> consume; public: explicit Flow(const std::function<void(std::function<void(T)>)> c) : consume(c) {} } 

关键在于 这个成员变量函数,consume: 此函数的实现定义了何时调用入参的回调函数(数据的生产),当流生产出数据时就调用入参的回调函数

将一个Vector转换成流:

static Flow<T> of(std::vector<T> vector) { return Flow<std::string>([=](const std::function<void(std::string)> &c) { for (const auto &item: vector) { c(item); } }); } 

map

对于map其实就是消费一个流,并且 产生一个新的流。

template<typename R> Flow<R> map(std::function<R(T)> dataMapFun) { Flow<T> mapFLow = Flow([=](std::function<void(R)> c) { //定义了上一个流消费消费逻辑,消费的同时转换数据,产生下一个流 this->consume([=](T data) { c(dataMapFun(data)); //生产数据 }); }); return mapFLow; } 

记住我们对一个conume函数的定义,调用入参的回调函数即是生产数据

flatmap

template<typename R> Flow<R> flatMap(std::function<Flow<R>(T)> function) { return Flow([=](std::function<void(R)> c) { consume([=](T data) { Flow<R> flow = function(data); flow.consume(c); }); }); } 

是不是很简单,只要牢记我们对一个conume函数的定义,调用入参的回调函数即是生产数据

takeWhile

Flow<T> takeWhile(std::function<bool(T)> predicate) { return Flow( [=](std::function<void(T)> c) { consumeTillStop([=](T data) { if (predicate(data)) { stop(); } c(data); }); } ); } void stop() { throw StopException{}; } void consumeTillStop(std::function<void(T)> consumer) { try { consume(consumer); } catch (StopException exception) { } } 

因为都是回调函数,我们只能通过异常通知生产者停止生产

filter

Flow<T> filter(std::function<bool(T)> predicate) { return Flow( [=](std::function<void(T)> c) { consume([=](T data) { if (predicate(data)) { c(data); } }); } ); } 

sorted

Flow<T> sorted(std::function<bool(T, T)> comparator) { auto list = this->toVector(); std::sort(list.begin(), list.end(), comparator); return Flow::of(list); } 

toVector

类似java中toList,这里是最终的操作,不要产生一个新的流了。

std::vector<T> toVector() { std::vector<T> list; this->consume([&list](T data) { list.push_back(data); }); return list; } 

扩展

上面只给出了部分api的实现,按照这个思路stream中所有api都是可以实现的,上面还给出了java9中才支持的takeWhile。

上面也只给出了Vector转换为流,只要牢记我们那个关键函数的定义 consume:当流生产出数据时就调用入参的回调函数那么就可以实现所有集合的流式操作

那么它仅限于此么?当然不是

并行流

并发多线程调用回调函数,那么这个流就变成了并发的了

文件流

当读出文件时,就调用回调函数,那么它就变成了文件流,而且是基于回调函数的,不必读取所有的文件内容,读一部分,处理一部分,占用内存十分的小

二元流

看看下面这个函数的定义,它又变成了二元流了,当然N元的也不是不可以,是吧

template<typename T1,typename T2> class Flow { protected: /** * 关键函数定义: * 此函数的实现定义了何时调用入参的回调函数(数据的生产),当流生产出数据时就调用入参的回调函数, * 例如:对于一个vector的流就是循环调用回调函数 * 对于一个无限流,那么就是死循环调用回调函数 */ std::function<void(std::function<void(T1,T2)>)> consume; } 

业务流

最后回到生成者消费者模型,仔细想想也满足上述模型,而且是背压的,少了一个存放数据队列

总结

我们通过级联回调函数的方式实现了java中stream相关api,比起java jdk的实现要简单很多,但是该有的功能都有了。

这些代码的实现我觉得正是 业务逻辑与控制逻辑的分离的体现,api的调用者只关心逻辑,不关系怎么去控制。比如filter你只需要告诉我过滤的的条件是什么,怎么样过滤不需要关心。

也正是封装易变点思想的体现,通过上述api的封装,对于map ,filter等等操作,不用重复去写for循环,if语句,只需要告诉api的逻辑是什么,也就是传入的func

也是函数式编程思想的体现,无状态的,函数作为参数传递,惰性求值和并性处理等

代码demo

#include <vector> #include <list> #include <set> #include <map> #include <functional> /** * c++ 版本流的实现,通过回调函数的方式实现类似java中stream 的api,目前是demo版本,支持部分功能,看需要可能会支持并发流,二元流等功能 * * 由于是回调函数实现,天然是懒加载,被压的方式 * * @tparam T 流的类型 */ template<typename T> class Flow { private: void stop() { throw StopException{}; } void consumeTillStop(std::function<void(T)> consumer) { try { consume(consumer); } catch (StopException exception) { } } protected: /** * 关键函数定义: * 此函数的实现定义了何时调用入参的回调函数(数据的生产),当流生产出数据时就调用入参的回调函数, * 例如:对于一个vector的流就是循环调用回调函数 * 对于一个无限流,那么就是死循环调用回调函数 * * */ std::function<void(std::function<void(T)>)> consume; public: explicit Flow(const std::function<void(std::function<void(T)>)> c) : consume(c) {} using StopException = std::exception; void forEach(std::function<void(T)> consumer) { this->consume(consumer); } /** * 流的转换,消费一个流,产生一个新的流, * * 定义了上一个流的消费逻辑,以及本流的产生逻辑 * @tparam R 新的流的类型 * @param dataMapFun * @return */ template<typename R> Flow<R> map(std::function<R(T)> dataMapFun) { Flow<T> mapFLow = Flow([=](std::function<void(R)> c) { consume([=](T data) { c(dataMapFun(data)); }); }); return mapFLow; } template<typename R> Flow<R> flatMap(std::function<Flow<R>(T)> function) { return Flow([=](std::function<void(R)> c) { consume([=](T data) { Flow<R> flow = function(data); flow.consume(c); }); }); } Flow<T> takeWhile(std::function<bool(T)> predicate) { return Flow( [=](std::function<void(T)> c) { consumeTillStop([=](T data) { if (predicate(data)) { stop(); } c(data); }); } ); } Flow<T> filter(std::function<bool(T)> predicate) { return Flow( [=](std::function<void(T)> c) { consume([=](T data) { if (predicate(data)) { c(data); } }); } ); } Flow<T> dropWhile(std::function<bool(T)> predicate) { return Flow( [=](std::function<void(T)> c) { bool drop = false; consume([=, &drop](T data) { if (drop) { c(data); } if (!drop) { drop = predicate(data); } }); } ); } Flow<T> skip(int n) { return Flow( [=](std::function<void(T)> c) { int count = n; consume([=, &count](T data) { if (count <= 0) { c(data); } else { count--; } }); } ); } Flow<T> peek(std::function<void(T)> consumer) { return Flow( [=](std::function<void(T)> c) { consume([=](T data) { consumer(data); c(data); }); } ); } Flow<T> sorted(std::function<bool(T, T)> comparator) { auto list = this->toVector(); std::sort(list.begin(), list.end(), comparator); return Flow::of(list); } Flow<T> sorted() { auto list = this->toVector(); std::sort(list.begin(), list.end()); return Flow::of(list); } std::vector<T> toVector() { std::vector<T> list; this->consume([&list](T data) { list.push_back(data); }); return list; } template<typename K> std::unordered_map<K, std::vector<T>> groupBy(std::function<K(T)> keyFun) { std::unordered_map<K, std::vector<T>> map; consume([=, &map](T data) { K key = keyFun(data); auto iter = map.find(key); if (iter != map.end()) { iter->second.push_back(data); } else { std::vector<T> vector; vector.push_back(data); map.insert(std::make_pair(key, vector)); } }); return map; } std::set<T> toSet() { std::set<T> set; this->consume([&set](T data) { set.insert(data); }); return set; } static Flow<T> of(std::vector<T> vector) { return Flow<std::string>([=](const std::function<void(std::string)> &c) { for (const auto &item: vector) { c(item); } }); } }; 
std::vector<std::string> vec; vec.push_back("12_3"); vec.push_back("12"); vec.push_back("11"); vec.push_back("a"); vec.push_back("b"); Flow<std::string> strFlow = Flow<std::string>::of(vec); std::cout << "demo1<<<<<<<<<<<<<<<<<<:" << std::endl; std::vector<std::string> mapVec = strFlow .map<std::string>([=](const std::string &data) { return data + "123"; }) .peek([=](const std::string &data) { std::cout << "peek:" + data << std::endl; }) .skip(3) .toVector(); 

tips

给上面的那些函数起个好名字,可能会让java的同学感到更有亲切感

using Consumer = std::function<void(T)>; using Function = std::function<T(T)>; using Predicate = std::function<bool(T)>; using Comparator = std::function<bool(T,T)>; 

Read more

8个降AI率工具推荐!本科生高效降AIGC神器合集

8个降AI率工具推荐!本科生高效降AIGC神器合集

8个降AI率工具推荐!本科生高效降AIGC神器合集 AI降重工具:让论文更自然,让学术更安心 在当前高校学术规范日益严格的背景下,越来越多的本科生开始关注“论文降AIGC率”和“去AI痕迹”的问题。随着AI写作工具的广泛应用,许多学生在使用这些工具完成初稿后,发现论文的AIGC检测率偏高,影响了最终成绩。这时候,一款高效的AI降重工具就显得尤为重要。 优秀的AI降重工具不仅能够有效降低论文的AIGC率,还能在不改变原意的前提下,优化语言表达,使论文更加符合学术规范。同时,这些工具往往具备强大的查重功能,能帮助学生提前发现潜在重复内容,从而进行针对性修改。无论是面对学校要求的查重系统,还是国际通用的检测平台,这些工具都能提供可靠的支持。 工具名称主要功能适用场景千笔强力去除AI痕迹、保语义降重AI率过高急需降重云笔AI多模式降重初稿快速处理锐智 AI综合查重与降重定稿前自查文途AI操作简单片段修改降重鸟同义词替换小幅度修改笔杆在线写作辅助辅助润色维普官方查重最终检测万方数据库查重数据对比 千笔AI(官网直达入口) :https://www.qianbixiezuo.c

By Ne0inhk

对于VScode中Copilot插件使用卡顿问题的解决办法

copilot卡顿主要是网络和内存占用原因。 VScode内存优化解决办法: 结合链接和我补充的基本都可以解决。 解决VSCode无缘无故卡顿的问题_vscode卡顿-ZEEKLOG博客 在VScode中打开setting.json文件,打开方法ctrl+shift+p,输入Preferences: Open User Settings (JSON), 然后添加如下代码: { "search.followSymlinks": false, "git.autorefresh": false, "editor.formatOnSave": false } 结合链接和我补充的基本都可以解决。 VScode代理问题: vscode copilot长时间没反应_vscode中copilot总是卡住-ZEEKLOG博客 配置代理的话两种方法,上面是一种,推荐两种结合起来用(不冲突) 还是在setting.json文件中,添加如下代码: { "http.proxy": "http://127.

By Ne0inhk

使用LLama.cpp本地部署大模型

摘要         llama.cpp是一个基于C/C++开发的高效大语言模型推理工具,支持跨平台部署和Docker快速启动,核心功能是在有限的计算资源情况下本地部署使用大模型。本文介绍了通过Docker方式部署llama.cpp的步骤,包括如何下载模型、CPU/GPU配置及启动参数说明。llama.cpp提供Web UI界面和OpenAI兼容API,支持文本和多模态对话,对电脑配置要求不高,完全免费且私密,让普通用户也能轻松在本地运行大语言模型。 LLama.cpp简介        1. llama.cpp 是一个在 C/C++ 中实现大型语言模型(LLM)推理的工具         2.支持跨平台部署,也支持使用 Docker 快速启动         3.可以运行多种量化模型,对电脑要求不高,CPU/GPU设备均可流畅运行。         支持模型包含:llama系列,qwen系列,gemma系列,Falcon、Alpaca、GPT4All、Chinese LLaMA、Vigogne、

By Ne0inhk
PaperZZ 降重 / 降 AIGC 功能:如何把 “AI 痕迹 + 高重复率” 拧成 “可通过知网 / 维普的原创文本”?——2026 届毕业生的学术合规指南

PaperZZ 降重 / 降 AIGC 功能:如何把 “AI 痕迹 + 高重复率” 拧成 “可通过知网 / 维普的原创文本”?——2026 届毕业生的学术合规指南

Paperzz-AI官网免费论文查重复率AIGC检测/开题报告/文献综述/论文初稿 (注:本文聚焦工具辅助学术写作的合规优化,所有内容需结合研究者原创思考使用,严格遵守学术诚信与院校规范) 一、论文提交前的 “双重焦虑”:你在为 “AI 痕迹 + 高重复率” 彻夜难眠? 论文提交前的崩溃,往往不是 “研究没做完”,而是卡在 “AI 生成痕迹被检测” 和 “重复率超标” 的双重困境里: * 用 AI 写的初稿,维普检测显示 AIGC 相似度 99.8%,被导师打回 “必须消除 AI 痕迹”; * 手动改了 3 遍,重复率从 35% 降到 28%,还是过不了学校 “≤20%” 的红线; * 改到最后,句子变得不通顺、专业术语全丢,

By Ne0inhk