gourami

[UNMAINTAINED] Activitypub server in Rust
Log | Files | Refs | README | LICENSE

commit abed74816269aa2dc5155c8e3046c35d1e09f9ec
parent 83b5b6eb7cb72fcf467c6c4ace736ac26c391dfc
Author: alex wennerberg <alex@alexwennerberg.com>
Date:   Sat, 16 May 2020 09:53:42 -0500

Add very basic task queue

Diffstat:
MCargo.toml | 2+-
Msrc/ap.rs | 39+++++++++++++++++----------------------
Msrc/lib.rs | 5+++--
Msrc/routes.rs | 18+++++++++++++-----
4 files changed, 34 insertions(+), 30 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml @@ -28,7 +28,7 @@ ring = "0.16.13" reqwest = {version="0.10.4",features=["json", "blocking"]} serde = { version = "1.0.106", features = ["derive"] } serde_json = "1.0.52" -tokio = { version = "0.2.18", features = ["macros"] } +tokio = { version = "0.2.18", features = ["macros", "time", "sync"] } uuid = { version = "0.8", features = ["v4"]} warp = {git = "https://github.com/seanmonstar/warp.git", rev = "497505a5", features = ["tls"]} zxcvbn = "2.0.1" diff --git a/src/ap.rs b/src/ap.rs @@ -270,7 +270,7 @@ pub async fn process_follow(v: Value) -> Result<(), Error> { "actor": SERVER.global_id, "object": &v, }); - send_ap_message(accept, vec![actor_inbox.to_string()]).await.unwrap(); + send_ap_message(&accept, actor_inbox.to_string()).await.unwrap(); } Ok(()) // generate accept @@ -286,36 +286,31 @@ pub fn get_destinations() -> Vec<String> { } pub async fn send_ap_message( - ap_message: Value, - destinations: Vec<String>, // really vec of URLs + ap_message: &Value, + destination: String, // really vec of URLs ) -> Result<(), Error> { - // Right now we have only once delivery - for destination in destinations { - // bad - let msg = Vec::from(ap_message.to_string().as_bytes()); - let client = reqwest::Client::new(); - let response = client - .post(&destination) - .header("date", Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string()) //HTTP time format - .body(msg) - .header("Content-Type", r#"application/ld+json; profile="https://www.w3.org/ns/activitystreams""#) - .http_sign_outgoing()? - .send() - .await?; - debug!("{:?}", response.text().await?); - } + debug!("Sending outgoing AP message to {}", destination); + let msg = Vec::from(ap_message.to_string().as_bytes()); + let client = reqwest::Client::new(); + let response = client + .post(&destination) + .header("date", Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string()) //HTTP time format + .body(msg) + .header("Content-Type", r#"application/ld+json; profile="https://www.w3.org/ns/activitystreams""#) + .http_sign_outgoing()? + .send() + .await?; + debug!("{:?}", response.text().await?); Ok(()) } pub async fn get_remote_actor(actor_id: &str) -> Result<Actor, Error> { + debug!("Fetching remote actor {}", actor_id); let client = reqwest::Client::new(); - println!("{:?}", actor_id); let res = client.get(actor_id) .header("Accept", r#"application/ld+json; profile="https://www.w3.org/ns/activitystreams""#) .send() .await?; - println!("{:?}", res); let res: Actor = res.json().await?; - println!("{:?}", res); Ok(res) } @@ -328,7 +323,7 @@ pub async fn follow_remote_server(remote_url: &str) -> Result<(), Error> { let inbox_url = &remote_actor.inbox; let actor_id = &remote_actor.id; let msg = generate_server_follow(actor_id, inbox_url)?; - send_ap_message(msg, vec![inbox_url.to_owned()]).await?; + send_ap_message(&msg, inbox_url.to_owned()).await?; Ok(()) } diff --git a/src/lib.rs b/src/lib.rs @@ -202,15 +202,16 @@ struct NewNoteRequest { redirect_url: String, neighborhood: Option<String>, // "on" TODO -- add a custom serialization here } +use tokio::sync::mpsc::UnboundedSender; -async fn handle_new_note_form(u: Option<User>, f: NewNoteRequest) -> Result<impl Reply, Rejection> { +async fn handle_new_note_form(u: Option<User>, f: NewNoteRequest, sender: UnboundedSender<(Value, Vec<String>)>) -> Result<impl Reply, Rejection> { match u { Some(u) => { let n = new_note(&u, &f.note_input, f.neighborhood.is_some()).unwrap(); if n.neighborhood { let nj = ap::new_note_to_ap_message(&n, &u); let destinations = ap::get_destinations(); - ap::send_ap_message(nj, destinations).await.unwrap(); // TODO error handling + sender.send((nj, destinations)).ok(); } let red_url: http::Uri = f.redirect_url.parse().unwrap(); Ok(redirect(red_url)) diff --git a/src/routes.rs b/src/routes.rs @@ -4,6 +4,7 @@ use env_logger; use warp::{header, body, body::form, body::json, filters::cookie, filters::query::query, path, reply}; use serde::de::DeserializeOwned; use warp::reject::{self, Rejection}; +use std::time::Duration; use warp::reply::Response; use http::header::{HeaderName, HeaderValue, CONTENT_TYPE}; @@ -14,17 +15,23 @@ pub async fn run_server() { let private_session_filter = move || session::create_session_filter(false).clone(); // Background worker for sending activitypub messages - let (snd, mut rcv) = tokio::sync::mpsc::unbounded_channel::<String>(); + // TODO -- Improve concurrency. each request is blocking. + let (snd, mut rcv) = tokio::sync::mpsc::unbounded_channel::<(Value, Vec<String>)>(); tokio::spawn(async move { - while let Some(request) = rcv.recv().await { - // Run request + while let Some((msg, destinations)) = rcv.recv().await { + for destination in destinations { + // no retries or anything yet + let res = ap::send_ap_message(&msg, destination).await.ok(); + if res.is_none() { + error!("AP message sending failed"); + } + } + // Run requst } }); let with_sender = warp::any().map(move || snd.clone()); - // we have to pass the full paths for redirect to work without javascript - // let webfinger = warp::path!(".well-known" / "webfinger") .and(query()) // TODO content type @@ -96,6 +103,7 @@ pub async fn run_server() { let create_note = path("create_note") .and(private_session_filter()) .and(form()) + .and(with_sender) .and_then(handle_new_note_form); let delete_note = path("delete_note").and(private_session_filter()).and(form()).map(