webrtc 的TaskQueue() 任务队列

chatgpt/2023/10/4 7:12:32

TaskQueue 定义

见文件:rtc_base\task_queue.h

具体实现

class RTC_LOCKABLE RTC_EXPORT TaskQueue {public:// TaskQueue priority levels. On some platforms these will map to thread// priorities, on others such as Mac and iOS, GCD queue priorities.using Priority = ::webrtc::TaskQueueFactory::Priority;// 注意这个构造函数,以TaskQueueBase智能指针作为参数// TaskQueue 的真正实现,其实是这个TaskQueueBase explicit TaskQueue(std::unique_ptr<webrtc::TaskQueueBase,webrtc::TaskQueueDeleter> task_queue);~TaskQueue();}

创建一个 TaskQueue

  //首先声明相应对象:一个任务队列工厂、一个任务队列对象std::shared_ptr<webrtc::TaskQueueFactory> video_encoder_task_queue_factory_;rtc::TaskQueue video_encoder_queue_;//在指定类构造函数列表中创建任务队列//假如我们的调用类是VideoHandler//具体的实现如下:VideoHandbler::VideoHandbler():  video_encoder_task_queue_factory_(webrtc::CreateDefaultTaskQueueFactory()),video_encoder_queue_(video_encoder_task_queue_factory_->CreateTaskQueue("VideoEncoderQueue",TaskQueueFactory::Priority::NORMAL)){}
//具体过程:
//首先创建一个默认的任务队列工厂
//然后基于任务队列工厂创建一个任务队列TaskQueueBase,
//这个新创建的任务队列工厂TaskQueueBase,作为构造函数参数传给了 TaskQueue(),从而创建了我们需要的video_encoder_queue_//TaskQueue的构造函数,以TaskQueueBase的智能指针作为参数
//通过成员变量impl_ 接收该指针
//后面会发现,任务最终都传给了TaskQueueBase()TaskQueue::TaskQueue(std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter> task_queue): impl_(task_queue.release()) {}//向TaskQueue投递任务时,最终还是通过 imple_抛给了TaskQueueBase()
void TaskQueue::PostTask(std::unique_ptr<webrtc::QueuedTask> task) {return impl_->PostTask(std::move(task));
}

TaskQueueLibevent

//每个平台都有各自的实现
//我们主要看TaskQueueLibevent()的实现

//TaskQueueLibevent 继承自TaskQueueBase(),真正处理Task的函数
class TaskQueueLibevent final : public TaskQueueBase {
public:TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);void Delete() override;void PostTask(std::unique_ptr<QueuedTask> task) override;void PostDelayedTask(std::unique_ptr<QueuedTask> task,uint32_t milliseconds) override;
private:bool is_active_ = true;//输入、输出管道用来唤起线程int wakeup_pipe_in_ = -1;int wakeup_pipe_out_ = -1;event_base* event_base_;event wakeup_event_;//任务队列线程//一个任务队列对象一个线程//对于windows平台内部会执行 CreateThread()创建线程//android调用 pthread_create()创建线程rtc::PlatformThread thread_;Mutex pending_lock_;absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> pending_RTC_GUARDED_BY(pending_lock_);// Holds a list of events pending timers for cleanup when the loop exits.std::list<TimerEvent*> pending_timers_;//具体的实现部分TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,rtc::ThreadPriority priority): event_base_(event_base_new()),thread_(&TaskQueueLibevent::ThreadMain, this, queue_name, priority) {int fds[2];//创建一个管道用于线程之间通信RTC_CHECK(pipe(fds) == 0);//把管道设置诶非阻塞的SetNonBlocking(fds[0]);SetNonBlocking(fds[1]);wakeup_pipe_out_ = fds[0];wakeup_pipe_in_ = fds[1];//绑定管道可读事件,当管道可读时会自动调用 OnWakeup//这块是借用libevent库来实现的EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,EV_READ | EV_PERSIST, OnWakeup, this);event_add(&wakeup_event_, 0);thread_.Start();
}
//调用 fcntl把管道设置诶非阻塞
bool SetNonBlocking(int fd) {const int flags = fcntl(fd, F_GETFL);RTC_CHECK(flags != -1);return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
}//线程函数一直轮训相关事件
void TaskQueueLibevent::ThreadMain(void* context) {TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);{CurrentTaskQueueSetter set_current(me);while (me->is_active_)event_base_loop(me->event_base_, 0);}for (TimerEvent* timer : me->pending_timers_)delete timer;
}

向任务队列中抛入任务

void TaskQueueLibevent::PostTask(std::unique_ptr<QueuedTask> task) {{//向队列投递任务时,先加锁MutexLock lock(&pending_lock_);bool had_pending_tasks = !pending_.empty();//把任务插入到队列中pending_.push_back(std::move(task));// Only write to the pipe if there were no pending tasks before this one// since the thread could be sleeping. If there were already pending tasks// then we know there's either a pending write in the pipe or the thread has// not yet processed the pending tasks. In either case, the thread will// eventually wake up and process all pending tasks including this one.//如果当前线程有未处理的阻塞任务,就暂时不唤醒if (had_pending_tasks) {return;}}// Note: This behvior outlined above ensures we never fill up the pipe write// buffer since there will only ever be 1 byte pending.//写入的内容是KRunTasks,表明是要处理的Task任务char message = kRunTasks;//向管道中写入数据,唤起线程RTC_CHECK_EQ(write(wakeup_pipe_in_, &message, sizeof(message)),sizeof(message));
}

管道中写入数据后,会触发OnWakeup()函数进行处理

void TaskQueueLibevent::OnWakeup(int socket,short flags,  // NOLINTvoid* context) {TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);RTC_DCHECK(me->wakeup_pipe_out_ == socket);char buf;// 调用 read 函数从管道中读入数据//读到数据后基于相应的数据类型进行处理RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));switch (buf) {case kQuit:me->is_active_ = false;event_base_loopbreak(me->event_base_);break;case kRunTasks: {//要处理的任务absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> tasks;{//加锁//并取出要处理的任务MutexLock lock(&me->pending_lock_);tasks.swap(me->pending_);}RTC_DCHECK(!tasks.empty());//遍历所有的task,并调用各自的Run()函数进行处理//最终就是回调传进来任务函数for (auto& task : tasks) {if (task->Run()) {task.reset();} else {// |false| means the task should *not* be deleted.task.release();}}break;}default:RTC_NOTREACHED();break;}
}

使用例子,向任务队列投入任务

向队列中投入一个lambda 任务,
当任务线程接收到该任务后,就会回调该lambda函数

 video_encoder_queue_.PostTask([this,&video_data]{....encoder(video_data);....});

视频编码队列 encoder_queue_

encoder_queue_ 就是利用前面讲到TaskQueue进行视频编码的

具体实现:

//声明编码队列
rtc::TaskQueue encoder_queue_;
//构造函数列表中创建该编码队列
VideoStreamEncoder::VideoStreamEncoder()://创建编码队列encoder_queue_(task_queue_factory->CreateTaskQueue("EncoderQueue",TaskQueueFactory::Priority::NORMAL)){}//向编码队列中投入视频数据进行编码
//所以真正的编码线程,就是encoder_queue所拥有的线程了
void VideoStreamEncoder::OnFrame(const VideoFrame& video_frame) {encoder_queue_.PostTask([this, incoming_frame, post_time_us, log_stats, post_interval_us]() {if (posted_frames_waiting_for_encode == 1 && !cwnd_frame_drop) {MaybeEncodeVideoFrame(incoming_frame, post_time_us);}}
}

析构函数会主动关闭任务队列

析构函数中会自动关闭该任务队列,并停止对应任务线程,不需要我们干预

TaskQueue::~TaskQueue() {// There might running task that tries to rescheduler itself to the TaskQueue// and not yet aware TaskQueue destructor is called.// Calling back to TaskQueue::PostTask need impl_ pointer still be valid, so// do not invalidate impl_ pointer until Delete returns.impl_->Delete();
}void TaskQueueLibevent::Delete() {RTC_DCHECK(!IsCurrent());struct timespec ts;char message = kQuit;//向管道中写入关闭消息while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {// The queue is full, so we have no choice but to wait and retry.RTC_CHECK_EQ(EAGAIN, errno);ts.tv_sec = 0;ts.tv_nsec = 1000000;nanosleep(&ts, nullptr);}//停止线程thread_.Stop();event_del(&wakeup_event_);IgnoreSigPipeSignalOnCurrentThread();//关闭管道close(wakeup_pipe_in_);close(wakeup_pipe_out_);wakeup_pipe_in_ = -1;wakeup_pipe_out_ = -1;event_base_free(event_base_);delete this;
}

以上就是webrtc的 TaskQueue() 任务队列的实现过程了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.exyb.cn/news/show-5314568.html

如若内容造成侵权/违法违规/事实不符,请联系郑州代理记账网进行投诉反馈,一经查实,立即删除!

相关文章

PHP8的常量-PHP8知识详解

常量和变量是构成PHP程序的基础&#xff0c;在PHP8中常量的这一节中&#xff0c;主要讲到了定义常量和预定义常量两大知识点。 一、定义常量 定义常量也叫声明常量。在PHP8中&#xff0c;常量就是一个标识符&#xff08;名字&#xff09;&#xff0c;一旦定义&#xff08;声明&…

笨办法学python3进阶篇pdf,笨办法学python3pdf完整版

大家好&#xff0c;小编来为大家解答以下问题&#xff0c;笨办法学python 3电子书下载&#xff0c;笨办法学python3pdf完整版&#xff0c;今天让我们一起来看看吧&#xff01; 1、笨方法学python习题43 按照你说的 Map是一个类&#xff0c;scene_map是一老胡镇个类实例 scene_…

Keepalived 在CentOS 7安装并配置监听MySQL双主

keepalived安装 MySQL双主配置请看这里&#xff1a;https://tongyao.blog.csdn.net/article/details/132016200?spm1001.2014.3001.5502 128、129两台服务器安装步骤相同&#xff0c;配置文件不同&#xff0c;下面有介绍。 1.安装相关依赖包&#xff0c;并下载keepalived安…

C++STL库中queue

文章目录 queue的介绍 queue的常用接口 queue的模拟实现 priority_queue的介绍 priority_queue的常用接口 priority_queue的模拟实现 容器适配器 deque的介绍 仿函数 一、queue的介绍 1. 队列是一种容器适配器&#xff0c;专门用于在FIFO上下文(先进先出)中操作&#xff0c;其…

回归预测 | MATLAB实现SO-CNN-LSTM蛇群算法优化卷积长短期记忆神经网络多输入单输出回归预测

回归预测 | MATLAB实现SO-CNN-LSTM蛇群算法优化卷积长短期记忆神经网络多输入单输出回归预测 目录 回归预测 | MATLAB实现SO-CNN-LSTM蛇群算法优化卷积长短期记忆神经网络多输入单输出回归预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 MATLAB实现SO-CNN-LS…

【数据可视化】(一)数据可视化概述

目录 0.本章节概述 一、数据可视化 1、什么是数据可视化? 2、数据可视化的好处 3、数据可视化的用途 二、数据探索 1、数据相关工具的使用情景: 2、探索性查询 三、数据挑战 1、什么是数据挑战?

【C++】模板进阶(模板的特化,非类型模板参数,模板的分离编译)

文章目录 一、模板使用时一定要加typename的情况二、 非类型模板参数三、模板的特化1.函数模板特化2.类模板特化1.全特化&#xff1a;2. 偏特化&#xff1a;1. 部分特化2.参数更一步限制 四、模板的分离编译1.Stack.h2.Stack.cpp(定义)3.test.cpp 一、模板使用时一定要加typena…

rust format!如何转义{},输出{}?

在Rust中&#xff0c;如果你想要在字符串中包含花括号 {} &#xff0c;你需要使用双花括号 {{}} 来进行转义。这是因为单个花括号 {} 在字符串中表示占位符&#xff0c;用于格式化字符串。 以下是一个示例&#xff1a; fn main() {let text "这是一个示例&#xff1a; {…
推荐文章