libobs_wrapper/
runtime.rs

1//! Runtime management for safe OBS API access across threads
2//!
3//! This module provides the core thread management functionality for the libobs-wrapper.
4//! It ensures that OBS API calls are always executed on the same thread, as required by
5//! the OBS API, while still allowing application code to interact with OBS from any thread.
6//!
7//! # Thread Safety
8//!
9//! The OBS C API is not thread-safe and requires that all operations occur on the same thread.
10//! The `ObsRuntime` struct creates a dedicated thread for all OBS operations and manages
11//! message passing between application threads and the OBS thread.
12//!
13//! # Blocking APIs
14//!
15//! The runtime locking APIs:
16//! - By default all operations are synchronous
17//!
18//! # Example
19//!
20//! ```no_run
21//! use libobs_wrapper::runtime::ObsRuntime;
22//! use libobs_wrapper::utils::StartupInfo;
23//!
24//! fn example() {
25//!     // Assuming that the OBS context is already initialized
26//!
27//!     // Run an operation on the OBS thread
28//!     let runtime = context.runtime();
29
30//!     runtime.run_with_obs(|| {
31//!         // This code runs on the OBS thread
32//!         println!("Running on OBS thread");
33//!     }).unwrap();
34//! }
35//! ```
36
37use std::ffi::CStr;
38use std::sync::Arc;
39use std::{ptr, thread};
40
41use crate::context::ObsContext;
42use crate::crash_handler::main_crash_handler;
43use crate::enums::{ObsLogLevel, ObsResetVideoStatus};
44use crate::logger::{extern_log_callback, internal_log_global, LOGGER};
45use crate::utils::initialization::{platform_specific_setup, PlatformSpecificGuard};
46use crate::utils::{ObsError, ObsModules, ObsString};
47use crate::{context::OBS_THREAD_ID, utils::StartupInfo};
48
49#[cfg(feature = "enable_runtime")]
50use crate::unsafe_send::Sendable;
51use std::fmt::Debug;
52#[cfg(feature = "enable_runtime")]
53use std::sync::atomic::{AtomicUsize, Ordering};
54#[cfg(feature = "enable_runtime")]
55use std::sync::mpsc::{channel, Sender};
56#[cfg(feature = "enable_runtime")]
57use std::sync::Mutex;
58#[cfg(feature = "enable_runtime")]
59use std::thread::JoinHandle;
60
61/// Command type for operations to perform on the OBS thread
62#[cfg(feature = "enable_runtime")]
63enum ObsCommand {
64    /// Execute a function on the OBS thread and send result back
65    Execute(
66        Box<dyn FnOnce() -> Box<dyn std::any::Any + Send> + Send>,
67        oneshot::Sender<Box<dyn std::any::Any + Send>>,
68    ),
69    /// Signal the OBS thread to terminate
70    Terminate,
71}
72
73/// Core runtime that manages the OBS thread
74///
75/// This struct represents the runtime environment for OBS operations.
76/// It creates and manages a dedicated thread for OBS API calls to
77/// ensure thread safety while allowing interaction from any thread.
78///
79/// # Thread Safety
80///
81/// `ObsRuntime` can be safely cloned and shared across threads. All operations
82/// are automatically dispatched to the dedicated OBS thread.
83///
84/// # Lifecycle Management
85///
86/// When the last `ObsRuntime` instance is dropped, the OBS thread is automatically
87/// shut down and all OBS resources are properly released.
88#[derive(Debug, Clone)]
89pub struct ObsRuntime {
90    #[cfg(feature = "enable_runtime")]
91    command_sender: Arc<Sender<ObsCommand>>,
92    #[cfg(feature = "enable_runtime")]
93    queued_commands: Arc<AtomicUsize>,
94    _guard: Arc<_ObsRuntimeGuard>,
95
96    #[cfg(not(feature = "enable_runtime"))]
97    _platform_specific: Option<Arc<PlatformSpecificGuard>>,
98}
99
100impl ObsRuntime {
101    /// Initializes the OBS runtime.
102    ///
103    /// This function starts up OBS on a dedicated thread and prepares it for use.
104    /// It handles bootstrapping (if configured), OBS initialization, module loading,
105    /// and setup of audio/video subsystems.
106    ///
107    /// # Parameters
108    ///
109    /// * `options` - The startup configuration for OBS
110    ///
111    /// # Returns
112    ///
113    /// A `Result` containing:
114    /// - `(ObsRuntime, ObsModules, StartupInfo)`: The initialized runtime, loaded modules, and startup info.
115    /// - `ObsError`: If initialization fails.
116    ///
117    /// # Examples
118    ///
119    /// ```
120    /// use libobs_wrapper::runtime::{ObsRuntime, ObsRuntimeReturn};
121    /// use libobs_wrapper::utils::StartupInfo;
122    ///
123    /// fn initialize() {
124    ///     let startup_info = StartupInfo::default();
125    ///     match ObsRuntime::startup(startup_info) {
126    ///         Ok((runtime, modules, info)) => {
127    ///             // Use the initialized runtime
128    ///         },
129    ///         Err(e) => {
130    ///             // Handle initialization error
131    ///         }
132    ///     }
133    /// }
134    /// ```
135    #[allow(unused_mut)]
136    pub(crate) fn startup(
137        mut options: StartupInfo,
138    ) -> Result<(ObsRuntime, ObsModules, StartupInfo), ObsError> {
139        // Check if OBS is already running on another thread
140        let obs_id = OBS_THREAD_ID.lock().map_err(|_e| ObsError::MutexFailure)?;
141        if obs_id.is_some() {
142            return Err(ObsError::ThreadFailure);
143        }
144
145        drop(obs_id);
146
147        log::trace!("Initializing OBS context");
148        ObsRuntime::init(options)
149            .map_err(|e| ObsError::Unexpected(format!("Failed to initialize OBS runtime: {:?}", e)))
150    }
151
152    /// Internal initialization method
153    ///
154    /// Creates the OBS thread and performs core initialization.
155    #[cfg(not(feature = "enable_runtime"))]
156    fn init(info: StartupInfo) -> anyhow::Result<(ObsRuntime, ObsModules, StartupInfo)> {
157        let (startup, mut modules, platform_specific) = Self::initialize_inner(info)?;
158
159        let runtime = Self {
160            _guard: Arc::new(_ObsRuntimeGuard {}),
161            _platform_specific: platform_specific,
162        };
163
164        modules.runtime = Some(runtime.clone());
165        Ok((runtime, modules, startup))
166    }
167
168    /// Internal initialization method
169    ///
170    /// Creates the OBS thread and performs core initialization.
171    #[cfg(feature = "enable_runtime")]
172    fn init(info: StartupInfo) -> anyhow::Result<(ObsRuntime, ObsModules, StartupInfo)> {
173        let (command_sender, command_receiver) = channel();
174        let (init_tx, init_rx) = oneshot::channel();
175        let queued_commands = Arc::new(AtomicUsize::new(0));
176
177        let queued_commands_clone = queued_commands.clone();
178        let handle = std::thread::spawn(move || {
179            log::trace!("Starting OBS thread");
180
181            let res = Self::initialize_inner(info);
182
183            match res {
184                Ok((info, modules, _platform_specific)) => {
185                    log::trace!("OBS context initialized successfully");
186                    let e = init_tx.send(Ok((Sendable(modules), info)));
187                    if let Err(err) = e {
188                        log::error!("Failed to send initialization signal: {:?}", err);
189                    }
190
191                    // Process commands until termination
192                    while let Ok(command) = command_receiver.recv() {
193                        match command {
194                            ObsCommand::Execute(func, result_sender) => {
195                                let result = func();
196                                let _ = result_sender.send(result);
197                                queued_commands_clone.fetch_sub(1, Ordering::SeqCst);
198                            }
199                            ObsCommand::Terminate => break,
200                        }
201                    }
202
203                    let r = Self::shutdown_inner();
204                    if let Err(err) = r {
205                        log::error!("Failed to shut down OBS context: {:?}", err);
206                    }
207                }
208                Err(err) => {
209                    log::error!("Failed to initialize OBS context: {:?}", err);
210                    let _ = init_tx.send(Err(err));
211                }
212            }
213        });
214
215        log::trace!("Waiting for OBS thread to initialize");
216        // Wait for initialization to complete
217        let (mut m, info) = init_rx.recv()??;
218
219        let handle = Arc::new(Mutex::new(Some(handle)));
220        let command_sender = Arc::new(command_sender);
221        let runtime = Self {
222            command_sender: command_sender.clone(),
223            queued_commands,
224            _guard: Arc::new(_ObsRuntimeGuard {
225                handle,
226                command_sender,
227            }),
228        };
229
230        m.0.runtime = Some(runtime.clone());
231        Ok((runtime, m.0, info))
232    }
233
234    /// Executes an operation on the OBS thread without returning a value
235    ///
236    /// This is a convenience wrapper around `run_with_obs_result` for operations
237    /// that don't need to return a value.
238    ///
239    /// # Parameters
240    ///
241    /// * `operation` - A function to execute on the OBS thread
242    ///
243    /// # Returns
244    ///
245    /// A `Result` indicating success or failure
246    ///
247    /// # Examples
248    ///
249    /// ```
250    /// use libobs_wrapper::runtime::ObsRuntime;
251    ///
252    /// async fn example(runtime: &ObsRuntime) {
253    ///     runtime.run_with_obs(|| {
254    ///         // This code runs on the OBS thread
255    ///         println!("Hello from the OBS thread!");
256    ///     }).await.unwrap();
257    /// }
258    /// ```
259    pub fn run_with_obs<F>(&self, operation: F) -> anyhow::Result<()>
260    where
261        F: FnOnce() + Send + 'static,
262    {
263        self.run_with_obs_result(move || {
264            operation();
265            Result::<(), anyhow::Error>::Ok(())
266        })??;
267
268        Ok(())
269    }
270
271    /// Executes an operation on the OBS thread and returns a result
272    ///
273    /// This method dispatches a task to the OBS thread and blocks and waits for the result.
274    ///
275    /// # Parameters
276    ///
277    /// * `operation` - A function to execute on the OBS thread
278    ///
279    /// # Returns
280    ///
281    /// A `Result` containing the value returned by the operation
282    ///
283    /// # Examples
284    ///
285    /// ```
286    /// use libobs_wrapper::runtime::ObsRuntime;
287    ///
288    /// async fn example(runtime: &ObsRuntime) {
289    ///     let version = runtime.run_with_obs_result(|| {
290    ///         // This code runs on the OBS thread
291    ///         unsafe { libobs::obs_get_version_string() }
292    ///     }).await.unwrap();
293    ///     
294    ///     println!("OBS Version: {:?}", version);
295    /// }
296    /// ```
297    pub fn run_with_obs_result<F, T>(&self, operation: F) -> anyhow::Result<T>
298    where
299        F: FnOnce() -> T + Send + 'static,
300        T: Send + 'static,
301    {
302        #[cfg(feature = "enable_runtime")]
303        {
304            let (tx, rx) = oneshot::channel();
305
306            // Create a wrapper closure that boxes the result as Any
307            let wrapper = move || -> Box<dyn std::any::Any + Send> {
308                let result = operation();
309                Box::new(result)
310            };
311
312            let val = self.queued_commands.fetch_add(1, Ordering::SeqCst);
313            if val > 50 {
314                log::warn!("More than 50 queued commands. Try to batch them together.");
315            }
316
317            self.command_sender
318                .send(ObsCommand::Execute(Box::new(wrapper), tx))
319                .map_err(|_| anyhow::anyhow!("Failed to send command to OBS thread"))?;
320
321            let result = rx
322                .recv()
323                .map_err(|_| anyhow::anyhow!("OBS thread dropped the response channel"))?;
324
325            // Downcast the Any type back to T
326            let res = result
327                .downcast::<T>()
328                .map(|boxed| *boxed)
329                .map_err(|_| anyhow::anyhow!("Failed to downcast result to the expected type"))?;
330
331            Ok(res)
332        }
333
334        #[cfg(not(feature = "enable_runtime"))]
335        {
336            let result = operation();
337            Ok(result)
338        }
339    }
340
341    /// Initializes the libobs context and prepares it for recording.
342    ///
343    /// This method handles core OBS initialization including:
344    /// - Starting up the OBS core (`obs_startup`)
345    /// - Resetting video and audio subsystems
346    /// - Loading OBS modules
347    ///
348    /// # Parameters
349    ///
350    /// * `info` - The startup configuration for OBS
351    ///
352    /// # Returns
353    ///
354    /// A `Result` containing the updated startup info and loaded modules, or an error
355    fn initialize_inner(
356        mut info: StartupInfo,
357    ) -> Result<(StartupInfo, ObsModules, Option<Arc<PlatformSpecificGuard>>), ObsError> {
358        // Checks that there are no other threads
359        // using libobs using a static Mutex.
360        //
361        // Fun fact: this code caused a huge debate
362        // about whether AtomicBool is UB or whatever
363        // in the Rust Programming Discord server.
364        // I didn't read too closely into it because
365        // they were talking about what architecture
366        // fridges have or something.
367        //
368        // Since this function is not meant to be
369        // high-performance or called a thousand times,
370        // a Mutex is fine here.#
371        let mut mutex_value = OBS_THREAD_ID.lock().map_err(|_e| ObsError::MutexFailure)?;
372
373        // Directly checks if the value of the
374        // Mutex is false. If true, then error.
375        // We've checked already but keeping this
376        if (*mutex_value).is_some() {
377            return Err(ObsError::ThreadFailure);
378        }
379
380        // If the Mutex is None, then change
381        // it to current thread ID so that no
382        // other thread can use libobs while
383        // the current thread is using it.
384        *mutex_value = Some(thread::current().id());
385
386        // Install DLL blocklist hook here
387
388        #[cfg(windows)]
389        unsafe {
390            libobs::obs_init_win32_crash_handler();
391        }
392
393        // Set logger, load debug privileges and crash handler
394        unsafe {
395            libobs::base_set_crash_handler(Some(main_crash_handler), std::ptr::null_mut());
396        }
397
398        let native = platform_specific_setup(info.nix_display.clone())?;
399        unsafe {
400            libobs::base_set_log_handler(Some(extern_log_callback), std::ptr::null_mut());
401        }
402
403        let mut log_callback = LOGGER.lock().map_err(|_e| ObsError::MutexFailure)?;
404
405        *log_callback = info.logger.take().expect("Logger can never be null");
406        drop(log_callback);
407
408        // Locale will only be used internally by
409        // libobs for logging purposes, making it
410        // unnecessary to support other languages.
411        let locale_str = ObsString::new("en-US");
412        let startup_status =
413            unsafe { libobs::obs_startup(locale_str.as_ptr().0, ptr::null(), ptr::null_mut()) };
414
415        let version = unsafe { libobs::obs_get_version_string() };
416        let version_cstr = unsafe { CStr::from_ptr(version) };
417        let version_str = version_cstr.to_string_lossy().into_owned();
418
419        internal_log_global(ObsLogLevel::Info, format!("OBS {}", version_str));
420
421        // Check version compatibility
422        if !ObsContext::check_version_compatibility() {
423            internal_log_global(
424                ObsLogLevel::Warning,
425                "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!".to_string(),
426            );
427            internal_log_global(
428                ObsLogLevel::Warning,
429                format!(
430                    "OBS major version mismatch: installed version is {}, but expected major version {}. Expect crashes or bugs!!",
431                    version_str,
432                    libobs::LIBOBS_API_MAJOR_VER
433                ),
434            );
435            internal_log_global(
436                ObsLogLevel::Warning,
437                "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!".to_string(),
438            );
439        }
440
441        internal_log_global(
442            ObsLogLevel::Info,
443            "---------------------------------".to_string(),
444        );
445
446        if !startup_status {
447            return Err(ObsError::Failure);
448        }
449
450        let mut obs_modules = ObsModules::add_paths(&info.startup_paths);
451
452        // Note that audio is meant to only be reset
453        // once. See the link below for information.
454        //
455        // https://docs.obsproject.com/frontends
456        unsafe {
457            libobs::obs_reset_audio2(info.obs_audio_info.as_ptr().0);
458        }
459
460        // Resets the video context. Note that this
461        // is similar to Self::reset_video, but it
462        // does not call that function because the
463        // ObsContext struct is not created yet,
464        // and also because there is no need to free
465        // anything tied to the OBS context.
466        let reset_video_status = num_traits::FromPrimitive::from_i32(unsafe {
467            libobs::obs_reset_video(info.obs_video_info.as_ptr())
468        });
469
470        let reset_video_status = match reset_video_status {
471            Some(x) => x,
472            None => ObsResetVideoStatus::Failure,
473        };
474
475        if reset_video_status != ObsResetVideoStatus::Success {
476            return Err(ObsError::ResetVideoFailure(reset_video_status));
477        }
478
479        let sdr_info = info.obs_video_info.get_sdr_info();
480        unsafe {
481            libobs::obs_set_video_levels(sdr_info.sdr_white_level, sdr_info.hdr_nominal_peak_level);
482        }
483
484        obs_modules.load_modules();
485
486        internal_log_global(
487            ObsLogLevel::Info,
488            "==== Startup complete ===============================================".to_string(),
489        );
490
491        Ok((info, obs_modules, native))
492    }
493
494    /// Shuts down the OBS context and cleans up resources
495    ///
496    /// This method performs a clean shutdown of OBS, including:
497    /// - Removing sources from output channels
498    /// - Calling `obs_shutdown` to clean up OBS resources
499    /// - Removing log and crash handlers
500    /// - Checking for memory leaks
501    fn shutdown_inner() -> Result<(), ObsError> {
502        // Clean up sources
503        for i in 0..libobs::MAX_CHANNELS {
504            unsafe { libobs::obs_set_output_source(i, ptr::null_mut()) };
505        }
506
507        unsafe { libobs::obs_shutdown() }
508
509        let r = LOGGER.lock();
510        match r {
511            Ok(mut logger) => {
512                logger.log(ObsLogLevel::Info, "OBS context shutdown.".to_string());
513                let allocs = unsafe { libobs::bnum_allocs() };
514
515                // Increasing this to 1 because of whats described below
516                let mut notice = "";
517                let level = if allocs > 1 {
518                    ObsLogLevel::Error
519                } else {
520                    notice = " (this is an issue in the OBS source code that cannot be fixed)";
521                    ObsLogLevel::Info
522                };
523                // One memory leak is expected here because OBS does not free array elements of the obs_data_path when calling obs_add_data_path
524                // even when obs_remove_data_path is called. This is a bug in OBS.
525                logger.log(
526                    level,
527                    format!("Number of memory leaks: {}{}", allocs, notice),
528                );
529
530                #[cfg(any(feature = "__test_environment", test))]
531                {
532                    assert_eq!(allocs, 1, "Memory leaks detected: {}", allocs);
533                }
534            }
535            Err(_) => {
536                println!("OBS context shutdown. (but couldn't lock logger)");
537            }
538        }
539
540        unsafe {
541            // Clean up log and crash handler
542            libobs::base_set_crash_handler(None, std::ptr::null_mut());
543            libobs::base_set_log_handler(None, std::ptr::null_mut());
544        }
545
546        let mut mutex_value = OBS_THREAD_ID.lock().map_err(|_e| ObsError::MutexFailure)?;
547
548        *mutex_value = None;
549        Ok(())
550    }
551}
552
553/// Guard object to ensure proper cleanup when the runtime is dropped
554///
555/// This guard ensures that when the last reference to the runtime is dropped,
556/// the OBS thread is properly terminated and all resources are cleaned up.
557#[derive(Debug)]
558pub struct _ObsRuntimeGuard {
559    /// Thread handle for the OBS thread
560    #[cfg(feature = "enable_runtime")]
561    #[cfg_attr(
562        all(
563            feature = "no_blocking_drops",
564            not(feature = "__test_environment"),
565            not(test)
566        ),
567        allow(dead_code)
568    )]
569    handle: Arc<Mutex<Option<JoinHandle<()>>>>,
570    /// Sender channel for the OBS thread
571    #[cfg(feature = "enable_runtime")]
572    command_sender: Arc<Sender<ObsCommand>>,
573}
574
575#[cfg(feature = "enable_runtime")]
576impl Drop for _ObsRuntimeGuard {
577    /// Ensures the OBS thread is properly shut down when the runtime is dropped
578    fn drop(&mut self) {
579        log::trace!("Dropping ObsRuntime and shutting down OBS thread");
580        // Theoretically the queued_commands is zero and should be increased but because
581        // we are shutting down, we don't care about that.
582        let r = self
583            .command_sender
584            .send(ObsCommand::Terminate)
585            .map_err(|_| anyhow::anyhow!("Failed to send termination command to OBS thread"));
586
587        if thread::panicking() {
588            return;
589        }
590
591        r.unwrap();
592        #[cfg(any(
593            not(feature = "no_blocking_drops"),
594            test,
595            feature = "__test_environment"
596        ))]
597        {
598            if cfg!(feature = "enable_runtime") {
599                // Wait for the thread to finish
600                let handle = self.handle.lock();
601                if handle.is_err() {
602                    log::error!("Failed to lock OBS thread handle for shutdown");
603                    return;
604                }
605
606                let mut handle = handle.unwrap();
607                let handle = handle.take().expect("Handle can not be empty");
608
609                handle.join().expect("Failed to join OBS thread");
610            }
611        }
612    }
613}
614
615#[cfg(not(feature = "enable_runtime"))]
616impl Drop for _ObsRuntimeGuard {
617    /// Ensures the OBS thread is properly shut down when the runtime is dropped
618    fn drop(&mut self) {
619        log::trace!("Dropping ObsRuntime and shutting down OBS thread");
620        let r = ObsRuntime::shutdown_inner();
621
622        if thread::panicking() {
623            return;
624        }
625
626        r.unwrap();
627    }
628}