libobs_wrapper/runtime.rs
1//! Runtime management for safe OBS API access across threads
2//!
3//! This module provides the core thread management functionality for the libobs-wrapper.
4//! It ensures that OBS API calls are always executed on the same thread, as required by
5//! the OBS API, while still allowing application code to interact with OBS from any thread.
6//!
7//! # Thread Safety
8//!
9//! The OBS C API is not thread-safe and requires that all operations occur on the same thread.
10//! The `ObsRuntime` struct creates a dedicated thread for all OBS operations and manages
11//! message passing between application threads and the OBS thread.
12//!
13//! # Blocking APIs
14//!
15//! The runtime locking APIs:
16//! - By default all operations are synchronous
17//!
18//! # Example
19//!
20//! ```no_run
21//! use libobs_wrapper::runtime::ObsRuntime;
22//! use libobs_wrapper::utils::StartupInfo;
23//!
24//! fn example() {
25//! // Assuming that the OBS context is already initialized
26//!
27//! // Run an operation on the OBS thread
28//! let runtime = context.runtime();
29
30//! runtime.run_with_obs(|| {
31//! // This code runs on the OBS thread
32//! println!("Running on OBS thread");
33//! }).unwrap();
34//! }
35//! ```
36
37use std::ffi::CStr;
38use std::sync::Arc;
39use std::{ptr, thread};
40
41use crate::context::ObsContext;
42use crate::crash_handler::main_crash_handler;
43use crate::enums::{ObsLogLevel, ObsResetVideoStatus};
44use crate::logger::{extern_log_callback, internal_log_global, LOGGER};
45use crate::utils::initialization::{platform_specific_setup, PlatformSpecificGuard};
46use crate::utils::{ObsError, ObsModules, ObsString};
47use crate::{context::OBS_THREAD_ID, utils::StartupInfo};
48
49#[cfg(feature = "enable_runtime")]
50use crate::unsafe_send::Sendable;
51use std::fmt::Debug;
52#[cfg(feature = "enable_runtime")]
53use std::sync::atomic::{AtomicUsize, Ordering};
54#[cfg(feature = "enable_runtime")]
55use std::sync::mpsc::{channel, Sender};
56#[cfg(feature = "enable_runtime")]
57use std::sync::Mutex;
58#[cfg(feature = "enable_runtime")]
59use std::thread::JoinHandle;
60
61/// Command type for operations to perform on the OBS thread
62#[cfg(feature = "enable_runtime")]
63enum ObsCommand {
64 /// Execute a function on the OBS thread and send result back
65 Execute(
66 Box<dyn FnOnce() -> Box<dyn std::any::Any + Send> + Send>,
67 oneshot::Sender<Box<dyn std::any::Any + Send>>,
68 ),
69 /// Signal the OBS thread to terminate
70 Terminate,
71}
72
73/// Core runtime that manages the OBS thread
74///
75/// This struct represents the runtime environment for OBS operations.
76/// It creates and manages a dedicated thread for OBS API calls to
77/// ensure thread safety while allowing interaction from any thread.
78///
79/// # Thread Safety
80///
81/// `ObsRuntime` can be safely cloned and shared across threads. All operations
82/// are automatically dispatched to the dedicated OBS thread.
83///
84/// # Lifecycle Management
85///
86/// When the last `ObsRuntime` instance is dropped, the OBS thread is automatically
87/// shut down and all OBS resources are properly released.
88#[derive(Debug, Clone)]
89pub struct ObsRuntime {
90 #[cfg(feature = "enable_runtime")]
91 command_sender: Arc<Sender<ObsCommand>>,
92 #[cfg(feature = "enable_runtime")]
93 queued_commands: Arc<AtomicUsize>,
94 _guard: Arc<_ObsRuntimeGuard>,
95
96 #[cfg(not(feature = "enable_runtime"))]
97 _platform_specific: Option<Arc<PlatformSpecificGuard>>,
98}
99
100impl ObsRuntime {
101 /// Initializes the OBS runtime.
102 ///
103 /// This function starts up OBS on a dedicated thread and prepares it for use.
104 /// It handles bootstrapping (if configured), OBS initialization, module loading,
105 /// and setup of audio/video subsystems.
106 ///
107 /// # Parameters
108 ///
109 /// * `options` - The startup configuration for OBS
110 ///
111 /// # Returns
112 ///
113 /// A `Result` containing:
114 /// - `(ObsRuntime, ObsModules, StartupInfo)`: The initialized runtime, loaded modules, and startup info.
115 /// - `ObsError`: If initialization fails.
116 ///
117 /// # Examples
118 ///
119 /// ```
120 /// use libobs_wrapper::runtime::{ObsRuntime, ObsRuntimeReturn};
121 /// use libobs_wrapper::utils::StartupInfo;
122 ///
123 /// fn initialize() {
124 /// let startup_info = StartupInfo::default();
125 /// match ObsRuntime::startup(startup_info) {
126 /// Ok((runtime, modules, info)) => {
127 /// // Use the initialized runtime
128 /// },
129 /// Err(e) => {
130 /// // Handle initialization error
131 /// }
132 /// }
133 /// }
134 /// ```
135 #[allow(unused_mut)]
136 pub(crate) fn startup(
137 mut options: StartupInfo,
138 ) -> Result<(ObsRuntime, ObsModules, StartupInfo), ObsError> {
139 // Check if OBS is already running on another thread
140 let obs_id = OBS_THREAD_ID.lock().map_err(|_e| ObsError::MutexFailure)?;
141 if obs_id.is_some() {
142 return Err(ObsError::ThreadFailure);
143 }
144
145 drop(obs_id);
146
147 log::trace!("Initializing OBS context");
148 ObsRuntime::init(options)
149 .map_err(|e| ObsError::Unexpected(format!("Failed to initialize OBS runtime: {:?}", e)))
150 }
151
152 /// Internal initialization method
153 ///
154 /// Creates the OBS thread and performs core initialization.
155 #[cfg(not(feature = "enable_runtime"))]
156 fn init(info: StartupInfo) -> anyhow::Result<(ObsRuntime, ObsModules, StartupInfo)> {
157 let (startup, mut modules, platform_specific) = Self::initialize_inner(info)?;
158
159 let runtime = Self {
160 _guard: Arc::new(_ObsRuntimeGuard {}),
161 _platform_specific: platform_specific,
162 };
163
164 modules.runtime = Some(runtime.clone());
165 Ok((runtime, modules, startup))
166 }
167
168 /// Internal initialization method
169 ///
170 /// Creates the OBS thread and performs core initialization.
171 #[cfg(feature = "enable_runtime")]
172 fn init(info: StartupInfo) -> anyhow::Result<(ObsRuntime, ObsModules, StartupInfo)> {
173 let (command_sender, command_receiver) = channel();
174 let (init_tx, init_rx) = oneshot::channel();
175 let queued_commands = Arc::new(AtomicUsize::new(0));
176
177 let queued_commands_clone = queued_commands.clone();
178 let handle = std::thread::spawn(move || {
179 log::trace!("Starting OBS thread");
180
181 let res = Self::initialize_inner(info);
182
183 match res {
184 Ok((info, modules, _platform_specific)) => {
185 log::trace!("OBS context initialized successfully");
186 let e = init_tx.send(Ok((Sendable(modules), info)));
187 if let Err(err) = e {
188 log::error!("Failed to send initialization signal: {:?}", err);
189 }
190
191 // Process commands until termination
192 while let Ok(command) = command_receiver.recv() {
193 match command {
194 ObsCommand::Execute(func, result_sender) => {
195 let result = func();
196 let _ = result_sender.send(result);
197 queued_commands_clone.fetch_sub(1, Ordering::SeqCst);
198 }
199 ObsCommand::Terminate => break,
200 }
201 }
202
203 let r = Self::shutdown_inner();
204 if let Err(err) = r {
205 log::error!("Failed to shut down OBS context: {:?}", err);
206 }
207 }
208 Err(err) => {
209 log::error!("Failed to initialize OBS context: {:?}", err);
210 let _ = init_tx.send(Err(err));
211 }
212 }
213 });
214
215 log::trace!("Waiting for OBS thread to initialize");
216 // Wait for initialization to complete
217 let (mut m, info) = init_rx.recv()??;
218
219 let handle = Arc::new(Mutex::new(Some(handle)));
220 let command_sender = Arc::new(command_sender);
221 let runtime = Self {
222 command_sender: command_sender.clone(),
223 queued_commands,
224 _guard: Arc::new(_ObsRuntimeGuard {
225 handle,
226 command_sender,
227 }),
228 };
229
230 m.0.runtime = Some(runtime.clone());
231 Ok((runtime, m.0, info))
232 }
233
234 /// Executes an operation on the OBS thread without returning a value
235 ///
236 /// This is a convenience wrapper around `run_with_obs_result` for operations
237 /// that don't need to return a value.
238 ///
239 /// # Parameters
240 ///
241 /// * `operation` - A function to execute on the OBS thread
242 ///
243 /// # Returns
244 ///
245 /// A `Result` indicating success or failure
246 ///
247 /// # Examples
248 ///
249 /// ```
250 /// use libobs_wrapper::runtime::ObsRuntime;
251 ///
252 /// async fn example(runtime: &ObsRuntime) {
253 /// runtime.run_with_obs(|| {
254 /// // This code runs on the OBS thread
255 /// println!("Hello from the OBS thread!");
256 /// }).await.unwrap();
257 /// }
258 /// ```
259 pub fn run_with_obs<F>(&self, operation: F) -> anyhow::Result<()>
260 where
261 F: FnOnce() + Send + 'static,
262 {
263 self.run_with_obs_result(move || {
264 operation();
265 Result::<(), anyhow::Error>::Ok(())
266 })??;
267
268 Ok(())
269 }
270
271 /// Executes an operation on the OBS thread and returns a result
272 ///
273 /// This method dispatches a task to the OBS thread and blocks and waits for the result.
274 ///
275 /// # Parameters
276 ///
277 /// * `operation` - A function to execute on the OBS thread
278 ///
279 /// # Returns
280 ///
281 /// A `Result` containing the value returned by the operation
282 ///
283 /// # Examples
284 ///
285 /// ```
286 /// use libobs_wrapper::runtime::ObsRuntime;
287 ///
288 /// async fn example(runtime: &ObsRuntime) {
289 /// let version = runtime.run_with_obs_result(|| {
290 /// // This code runs on the OBS thread
291 /// unsafe { libobs::obs_get_version_string() }
292 /// }).await.unwrap();
293 ///
294 /// println!("OBS Version: {:?}", version);
295 /// }
296 /// ```
297 pub fn run_with_obs_result<F, T>(&self, operation: F) -> anyhow::Result<T>
298 where
299 F: FnOnce() -> T + Send + 'static,
300 T: Send + 'static,
301 {
302 #[cfg(feature = "enable_runtime")]
303 {
304 let (tx, rx) = oneshot::channel();
305
306 // Create a wrapper closure that boxes the result as Any
307 let wrapper = move || -> Box<dyn std::any::Any + Send> {
308 let result = operation();
309 Box::new(result)
310 };
311
312 let val = self.queued_commands.fetch_add(1, Ordering::SeqCst);
313 if val > 50 {
314 log::warn!("More than 50 queued commands. Try to batch them together.");
315 }
316
317 self.command_sender
318 .send(ObsCommand::Execute(Box::new(wrapper), tx))
319 .map_err(|_| anyhow::anyhow!("Failed to send command to OBS thread"))?;
320
321 let result = rx
322 .recv()
323 .map_err(|_| anyhow::anyhow!("OBS thread dropped the response channel"))?;
324
325 // Downcast the Any type back to T
326 let res = result
327 .downcast::<T>()
328 .map(|boxed| *boxed)
329 .map_err(|_| anyhow::anyhow!("Failed to downcast result to the expected type"))?;
330
331 Ok(res)
332 }
333
334 #[cfg(not(feature = "enable_runtime"))]
335 {
336 let result = operation();
337 Ok(result)
338 }
339 }
340
341 /// Initializes the libobs context and prepares it for recording.
342 ///
343 /// This method handles core OBS initialization including:
344 /// - Starting up the OBS core (`obs_startup`)
345 /// - Resetting video and audio subsystems
346 /// - Loading OBS modules
347 ///
348 /// # Parameters
349 ///
350 /// * `info` - The startup configuration for OBS
351 ///
352 /// # Returns
353 ///
354 /// A `Result` containing the updated startup info and loaded modules, or an error
355 fn initialize_inner(
356 mut info: StartupInfo,
357 ) -> Result<(StartupInfo, ObsModules, Option<Arc<PlatformSpecificGuard>>), ObsError> {
358 // Checks that there are no other threads
359 // using libobs using a static Mutex.
360 //
361 // Fun fact: this code caused a huge debate
362 // about whether AtomicBool is UB or whatever
363 // in the Rust Programming Discord server.
364 // I didn't read too closely into it because
365 // they were talking about what architecture
366 // fridges have or something.
367 //
368 // Since this function is not meant to be
369 // high-performance or called a thousand times,
370 // a Mutex is fine here.#
371 let mut mutex_value = OBS_THREAD_ID.lock().map_err(|_e| ObsError::MutexFailure)?;
372
373 // Directly checks if the value of the
374 // Mutex is false. If true, then error.
375 // We've checked already but keeping this
376 if (*mutex_value).is_some() {
377 return Err(ObsError::ThreadFailure);
378 }
379
380 // If the Mutex is None, then change
381 // it to current thread ID so that no
382 // other thread can use libobs while
383 // the current thread is using it.
384 *mutex_value = Some(thread::current().id());
385
386 // Install DLL blocklist hook here
387
388 #[cfg(windows)]
389 unsafe {
390 libobs::obs_init_win32_crash_handler();
391 }
392
393 // Set logger, load debug privileges and crash handler
394 unsafe {
395 libobs::base_set_crash_handler(Some(main_crash_handler), std::ptr::null_mut());
396 }
397
398 let native = platform_specific_setup(info.nix_display.clone())?;
399 unsafe {
400 libobs::base_set_log_handler(Some(extern_log_callback), std::ptr::null_mut());
401 }
402
403 let mut log_callback = LOGGER.lock().map_err(|_e| ObsError::MutexFailure)?;
404
405 *log_callback = info.logger.take().expect("Logger can never be null");
406 drop(log_callback);
407
408 // Locale will only be used internally by
409 // libobs for logging purposes, making it
410 // unnecessary to support other languages.
411 let locale_str = ObsString::new("en-US");
412 let startup_status =
413 unsafe { libobs::obs_startup(locale_str.as_ptr().0, ptr::null(), ptr::null_mut()) };
414
415 let version = unsafe { libobs::obs_get_version_string() };
416 let version_cstr = unsafe { CStr::from_ptr(version) };
417 let version_str = version_cstr.to_string_lossy().into_owned();
418
419 internal_log_global(ObsLogLevel::Info, format!("OBS {}", version_str));
420
421 // Check version compatibility
422 if !ObsContext::check_version_compatibility() {
423 internal_log_global(
424 ObsLogLevel::Warning,
425 "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!".to_string(),
426 );
427 internal_log_global(
428 ObsLogLevel::Warning,
429 format!(
430 "OBS major version mismatch: installed version is {}, but expected major version {}. Expect crashes or bugs!!",
431 version_str,
432 libobs::LIBOBS_API_MAJOR_VER
433 ),
434 );
435 internal_log_global(
436 ObsLogLevel::Warning,
437 "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!".to_string(),
438 );
439 }
440
441 internal_log_global(
442 ObsLogLevel::Info,
443 "---------------------------------".to_string(),
444 );
445
446 if !startup_status {
447 return Err(ObsError::Failure);
448 }
449
450 let mut obs_modules = ObsModules::add_paths(&info.startup_paths);
451
452 // Note that audio is meant to only be reset
453 // once. See the link below for information.
454 //
455 // https://docs.obsproject.com/frontends
456 unsafe {
457 libobs::obs_reset_audio2(info.obs_audio_info.as_ptr().0);
458 }
459
460 // Resets the video context. Note that this
461 // is similar to Self::reset_video, but it
462 // does not call that function because the
463 // ObsContext struct is not created yet,
464 // and also because there is no need to free
465 // anything tied to the OBS context.
466 let reset_video_status = num_traits::FromPrimitive::from_i32(unsafe {
467 libobs::obs_reset_video(info.obs_video_info.as_ptr())
468 });
469
470 let reset_video_status = match reset_video_status {
471 Some(x) => x,
472 None => ObsResetVideoStatus::Failure,
473 };
474
475 if reset_video_status != ObsResetVideoStatus::Success {
476 return Err(ObsError::ResetVideoFailure(reset_video_status));
477 }
478
479 let sdr_info = info.obs_video_info.get_sdr_info();
480 unsafe {
481 libobs::obs_set_video_levels(sdr_info.sdr_white_level, sdr_info.hdr_nominal_peak_level);
482 }
483
484 obs_modules.load_modules();
485
486 internal_log_global(
487 ObsLogLevel::Info,
488 "==== Startup complete ===============================================".to_string(),
489 );
490
491 Ok((info, obs_modules, native))
492 }
493
494 /// Shuts down the OBS context and cleans up resources
495 ///
496 /// This method performs a clean shutdown of OBS, including:
497 /// - Removing sources from output channels
498 /// - Calling `obs_shutdown` to clean up OBS resources
499 /// - Removing log and crash handlers
500 /// - Checking for memory leaks
501 fn shutdown_inner() -> Result<(), ObsError> {
502 // Clean up sources
503 for i in 0..libobs::MAX_CHANNELS {
504 unsafe { libobs::obs_set_output_source(i, ptr::null_mut()) };
505 }
506
507 unsafe { libobs::obs_shutdown() }
508
509 let r = LOGGER.lock();
510 match r {
511 Ok(mut logger) => {
512 logger.log(ObsLogLevel::Info, "OBS context shutdown.".to_string());
513 let allocs = unsafe { libobs::bnum_allocs() };
514
515 // Increasing this to 1 because of whats described below
516 let mut notice = "";
517 let level = if allocs > 1 {
518 ObsLogLevel::Error
519 } else {
520 notice = " (this is an issue in the OBS source code that cannot be fixed)";
521 ObsLogLevel::Info
522 };
523 // One memory leak is expected here because OBS does not free array elements of the obs_data_path when calling obs_add_data_path
524 // even when obs_remove_data_path is called. This is a bug in OBS.
525 logger.log(
526 level,
527 format!("Number of memory leaks: {}{}", allocs, notice),
528 );
529
530 #[cfg(any(feature = "__test_environment", test))]
531 {
532 assert_eq!(allocs, 1, "Memory leaks detected: {}", allocs);
533 }
534 }
535 Err(_) => {
536 println!("OBS context shutdown. (but couldn't lock logger)");
537 }
538 }
539
540 unsafe {
541 // Clean up log and crash handler
542 libobs::base_set_crash_handler(None, std::ptr::null_mut());
543 libobs::base_set_log_handler(None, std::ptr::null_mut());
544 }
545
546 let mut mutex_value = OBS_THREAD_ID.lock().map_err(|_e| ObsError::MutexFailure)?;
547
548 *mutex_value = None;
549 Ok(())
550 }
551}
552
553/// Guard object to ensure proper cleanup when the runtime is dropped
554///
555/// This guard ensures that when the last reference to the runtime is dropped,
556/// the OBS thread is properly terminated and all resources are cleaned up.
557#[derive(Debug)]
558pub struct _ObsRuntimeGuard {
559 /// Thread handle for the OBS thread
560 #[cfg(feature = "enable_runtime")]
561 #[cfg_attr(
562 all(
563 feature = "no_blocking_drops",
564 not(feature = "__test_environment"),
565 not(test)
566 ),
567 allow(dead_code)
568 )]
569 handle: Arc<Mutex<Option<JoinHandle<()>>>>,
570 /// Sender channel for the OBS thread
571 #[cfg(feature = "enable_runtime")]
572 command_sender: Arc<Sender<ObsCommand>>,
573}
574
575#[cfg(feature = "enable_runtime")]
576impl Drop for _ObsRuntimeGuard {
577 /// Ensures the OBS thread is properly shut down when the runtime is dropped
578 fn drop(&mut self) {
579 log::trace!("Dropping ObsRuntime and shutting down OBS thread");
580 // Theoretically the queued_commands is zero and should be increased but because
581 // we are shutting down, we don't care about that.
582 let r = self
583 .command_sender
584 .send(ObsCommand::Terminate)
585 .map_err(|_| anyhow::anyhow!("Failed to send termination command to OBS thread"));
586
587 if thread::panicking() {
588 return;
589 }
590
591 r.unwrap();
592 #[cfg(any(
593 not(feature = "no_blocking_drops"),
594 test,
595 feature = "__test_environment"
596 ))]
597 {
598 if cfg!(feature = "enable_runtime") {
599 // Wait for the thread to finish
600 let handle = self.handle.lock();
601 if handle.is_err() {
602 log::error!("Failed to lock OBS thread handle for shutdown");
603 return;
604 }
605
606 let mut handle = handle.unwrap();
607 let handle = handle.take().expect("Handle can not be empty");
608
609 handle.join().expect("Failed to join OBS thread");
610 }
611 }
612 }
613}
614
615#[cfg(not(feature = "enable_runtime"))]
616impl Drop for _ObsRuntimeGuard {
617 /// Ensures the OBS thread is properly shut down when the runtime is dropped
618 fn drop(&mut self) {
619 log::trace!("Dropping ObsRuntime and shutting down OBS thread");
620 let r = ObsRuntime::shutdown_inner();
621
622 if thread::panicking() {
623 return;
624 }
625
626 r.unwrap();
627 }
628}