neon/event/
task.rs

1use std::{panic::resume_unwind, thread};
2
3use crate::{
4    context::{internal::Env, Context, Cx},
5    handle::Handle,
6    result::{JsResult, NeonResult},
7    sys::{async_work, raw},
8    types::{Deferred, JsPromise, Value},
9};
10
/// Node asynchronous task builder
///
/// A `TaskBuilder` pairs a mutable borrow of a context with an `execute`
/// callback. Constructing the builder only stores these two values; the task
/// is scheduled when one of the builder's consuming methods (`and_then` or
/// `promise`) is called.
///
/// ```
/// # use neon::prelude::*;
/// fn greet(mut cx: FunctionContext) -> JsResult<JsPromise> {
///     let name = cx.argument::<JsString>(0)?.value(&mut cx);
///
///     let promise = cx
///         .task(move || format!("Hello, {}!", name))
///         .promise(move |mut cx, greeting| Ok(cx.string(greeting)));
///
///     Ok(promise)
/// }
/// ```
#[must_use = "a task builder does nothing until `and_then` or `promise` is called"]
pub struct TaskBuilder<'cx, C, E> {
    /// Context borrowed for the lifetime of the builder.
    cx: &'cx mut C,
    /// Callback holding the work to be scheduled.
    execute: E,
}
29
30impl<'a: 'cx, 'cx, C, O, E> TaskBuilder<'cx, C, E>
31where
32    C: Context<'a>,
33    O: Send + 'static,
34    E: FnOnce() -> O + Send + 'static,
35{
36    /// Construct a new task builder from an `execute` callback that can be
37    /// scheduled to execute on the Node worker pool
38    pub fn new(cx: &'cx mut C, execute: E) -> Self {
39        Self { cx, execute }
40    }
41
42    /// Schedules a task to execute on the Node worker pool, executing the
43    /// `complete` callback on the JavaScript main thread with the result
44    /// of the `execute` callback
45    pub fn and_then<F>(self, complete: F)
46    where
47        F: FnOnce(Cx, O) -> NeonResult<()> + 'static,
48    {
49        let env = self.cx.env();
50        let execute = self.execute;
51
52        schedule(env, execute, complete);
53    }
54
55    /// Schedules a task to execute on the Node worker pool and returns a
56    /// promise that is resolved with the value from the `complete` callback.
57    ///
58    /// The `complete` callback will execute on the JavaScript main thread and
59    /// is passed the return value from `execute`. If the `complete` callback
60    /// throws, the promise will be rejected with the exception
61    pub fn promise<V, F>(self, complete: F) -> Handle<'a, JsPromise>
62    where
63        V: Value,
64        F: FnOnce(Cx, O) -> JsResult<V> + 'static,
65    {
66        let env = self.cx.env();
67        let (deferred, promise) = JsPromise::new(self.cx);
68        let execute = self.execute;
69
70        schedule_promise(env, execute, complete, deferred);
71
72        promise
73    }
74}
75
76// Schedule a task to execute on the Node worker pool
77fn schedule<I, O, D>(env: Env, input: I, data: D)
78where
79    I: FnOnce() -> O + Send + 'static,
80    O: Send + 'static,
81    D: FnOnce(Cx, O) -> NeonResult<()> + 'static,
82{
83    unsafe {
84        async_work::schedule(env.to_raw(), input, execute::<I, O>, complete::<O, D>, data);
85    }
86}
87
/// Trampoline handed to the scheduler: invokes the work closure `input` and
/// returns whatever it produces.
fn execute<I: FnOnce() -> O + Send + 'static, O: Send + 'static>(input: I) -> O {
    input()
}
95
96fn complete<O, D>(env: raw::Env, output: thread::Result<O>, callback: D)
97where
98    O: Send + 'static,
99    D: FnOnce(Cx, O) -> NeonResult<()> + 'static,
100{
101    let output = output.unwrap_or_else(|panic| {
102        // If a panic was caught while executing the task on the Node Worker
103        // pool, resume panicking on the main JavaScript thread
104        resume_unwind(panic)
105    });
106
107    Cx::with_context(env.into(), move |cx| {
108        let _ = callback(cx, output);
109    });
110}
111
112// Schedule a task to execute on the Node worker pool and settle a `Promise` with the result
113fn schedule_promise<I, O, D, V>(env: Env, input: I, complete: D, deferred: Deferred)
114where
115    I: FnOnce() -> O + Send + 'static,
116    O: Send + 'static,
117    D: FnOnce(Cx, O) -> JsResult<V> + 'static,
118    V: Value,
119{
120    unsafe {
121        async_work::schedule(
122            env.to_raw(),
123            input,
124            execute::<I, O>,
125            complete_promise::<O, D, V>,
126            (complete, deferred),
127        );
128    }
129}
130
131fn complete_promise<O, D, V>(
132    env: raw::Env,
133    output: thread::Result<O>,
134    (complete, deferred): (D, Deferred),
135) where
136    O: Send + 'static,
137    D: FnOnce(Cx, O) -> JsResult<V> + 'static,
138    V: Value,
139{
140    let env = env.into();
141
142    Cx::with_context(env, move |cx| {
143        deferred.try_catch_settle(cx, move |cx| {
144            let output = output.unwrap_or_else(|panic| resume_unwind(panic));
145
146            complete(cx, output)
147        })
148    });
149}