gourami

[UNMAINTAINED] Activitypub server in Rust
Log | Files | Refs | README | LICENSE

commit c88cfea3f4b1d4c7eb7eabc5f291655dd7793ce6
parent f34e28d4e654a0d021bcbe6954b6a1e94fc58f3a
Author: alex wennerberg <alex@alexwennerberg.com>
Date:   Thu, 14 May 2020 22:38:21 -0500

Big commit with a lot of ap stuff

Diffstat:
MCargo.lock | 10++++++++++
MCargo.toml | 1+
Mdocs/ADMIN_GUIDE.md | 10+++++++++-
Msrc/ap.rs | 253++++++++++++++++++++++++++++++++++++++++++-------------------------------------
Msrc/db/note.rs | 23++++++++++++++++-------
Msrc/db/schema.rs | 1-
Msrc/db/user.rs | 1-
Asrc/error.rs | 47+++++++++++++++++++++++++++++++++++++++++++++++
Msrc/lib.rs | 32++++++++++++++++++++------------
Msrc/main.rs | 1+
Msrc/routes.rs | 39++++++++++++++++++++++++++-------------
11 files changed, 266 insertions(+), 152 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -659,6 +659,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "uuid", "warp", "zxcvbn", ] @@ -2209,6 +2210,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7" [[package]] +name = "uuid" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11" +dependencies = [ + "rand 0.7.3", +] + +[[package]] name = "vcpkg" version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/Cargo.toml b/Cargo.toml @@ -29,6 +29,7 @@ reqwest = {version="0.10.4",features=["json", "blocking"]} serde = { version = "1.0.106", features = ["derive"] } serde_json = "1.0.52" tokio = { version = "0.2.18", features = ["macros"] } +uuid = { version = "0.8", features = ["v4"]} warp = {git = "https://github.com/seanmonstar/warp.git", rev = "497505a5", features = ["tls"]} zxcvbn = "2.0.1" diff --git a/docs/ADMIN_GUIDE.md b/docs/ADMIN_GUIDE.md @@ -28,7 +28,15 @@ I would recommend following basic Linux syadmin best practices: disable password ## Gourami's ActivityPub implementation -(maybe this should be somewhere else -- like a dev guide) +Gourami's ActivityPub implementation is somewhat opinionated and a little esoteric. Gourami is not Mastodon or Twitter and is not trying to be. Using it in that way may cause some frustration -- so just be aware that Gourami does things a little differently. I'm considering adding more 'traditional' activitypub functionality. + +The server has a server actor. This is an ActivityPub actor of type "Organization" and is the only ActivityPub actor on the server. All requests go through this actor. This forces you to think of your server as a cohesive whole -- users or other servers can only follow an entire server, not individual users. I encourage you to think about how this would change the way you structure your community. + +Currently, deletes are not supported. Deletes can be misleading in federation, and I think the simplest solution is just not to implement them. + +The only audience supported for ingoing and outgoing messages is [public]. This both simplifies the AP implementation and, in my view, more accurately specifies how ActivityPub works in practice -- once I send my message to a remote server, there isn't really any guarantee as to where it will go. + +Most of these decisions were informed by simplicity ## Federation -- the "neighborhood" diff --git a/src/ap.rs b/src/ap.rs @@ -1,6 +1,6 @@ use crate::error::Error; use crate::db::conn::POOL; -use crate::db::note::{NoteInput, RemoteNoteInput}; +use crate::db::note::{Note, NoteInput, RemoteNoteInput}; use crate::db::user::{NewRemoteUser, User}; use crate::db::server_mutuals::{NewServerMutual, ServerMutual}; use base64; @@ -27,18 +27,20 @@ fn domain_url() -> String { return format!("http://{}", &env::var("GOURAMI_DOMAIN").unwrap()); } -struct ServerApData { - global_id: String, - key_id: String, - inbox: String, - public_key: String +pub struct ServerApData { + pub global_id: String, + pub key_id: String, + pub domain: String, + pub inbox: String, + pub public_key: String } lazy_static! { // TODO -- learn this a little better so it isnt so redundant - static ref SERVER: ServerApData = ServerApData { - global_id: format!("{}/", domain_url()), - key_id: format!("{}/actor#key", domain_url()), + pub static ref SERVER: ServerApData = ServerApData { + global_id: format!("{}", domain_url()), + domain: env::var("GOURAMI_DOMAIN").unwrap(), + key_id: format!("{}#key", domain_url()), inbox: format!("{}/inbox", domain_url()), public_key: std::fs::read_to_string(env::var("SIGNATURE_PUBKEY_PEM").unwrap()).unwrap() }; @@ -46,15 +48,27 @@ lazy_static! { // TODO figure out how to get static working +use uuid::Uuid; + +fn generate_activity_id() -> String { + let my_uuid = Uuid::new_v4(); + format!("{}/activity/{}", domain_url(), my_uuid) +} #[derive(Deserialize, Serialize)] pub struct CreateNote { // Maybe use AP crate + id: String, + note: ApNote, + actor: Actor, } #[derive(Debug, Deserialize, Serialize)] pub struct Actor { - context: Vec<String>, + #[serde(rename = "@context")] + context: Value, id: String, + name: Option<String>, + summary: Option<String>, #[serde(rename = "type")] _type: String, #[serde(rename = "preferredUsername")] @@ -66,6 +80,26 @@ pub struct Actor { #[derive(Debug, Deserialize, Serialize)] pub struct ApNote { + content: String, + #[serde(rename = "attributedTo")] + attributed_to: String, + url: String, + summary: Option<String>, + id: String, + #[serde(rename = "inReplyTo")] + in_reply_to: Option<String> +} + +use regex::Regex; + +impl ApNote { + fn get_remote_user_name(&self) -> Option<String> { + let re = Regex::new(r"^(.+?)(💬)").unwrap(); + match re.captures(&self.content) { + Some(t) => t.get(1).unwrap().as_str().parse().ok(), + None => None, + } + } } #[derive(Debug, Deserialize, Serialize)] @@ -76,10 +110,6 @@ pub struct PublicKey { public_key_pem: String, } -#[derive(Deserialize, Serialize)] -pub struct Note { -} - /// Users don't follow users in Gourami. Instead the server does hte following /// There are a number of reasons for this: /// Gives it a more 'community' feel -- everyone shares the same timeline @@ -94,18 +124,28 @@ fn send_to_outbox(activity: bool) { // activitystreams object fetch/store from db. db objects need to serialize/deserialize this object if get -> fetch from db if post -> put to db, send to inbox of followers send to inbox of followers } -// build something like this -struct ActivityPubMessage { -} - -fn verify_incoming_message() {} -enum Action { - CreateNote, - DoNothing, - // DeleteNote +#[derive(Deserialize)] +pub struct WebFingerQuery { + resource: String } +pub fn webfinger_json(query: WebFingerQuery) -> Value { + // global -- single user + json!({ + "aliases": [ + SERVER.global_id + ], + "links": [ + { + "href": SERVER.global_id, + "rel": "self", + "type": "application/activity+json" + } + ], + "subject": format!("acct:server@{}", SERVER.domain), + }) +} /// get the server user json pub fn server_actor_json() -> Actor { @@ -116,22 +156,20 @@ pub fn server_actor_json() -> Actor { "https://www.w3.org/ns/activitystreams", "https://w3id.org/security/v1" ], - "id": SERVER.global_id, "type": "Organization", // application? "preferredUsername": domain_url(), // think about it "inbox": SERVER.inbox, + "name": "server", + "summary": "server", "publicKey": { "id": SERVER.key_id, "owner": SERVER.global_id, "publicKeyPem": SERVER.public_key + // TODO -- list server admin contact somewhere. summary or attachment }})).unwrap() } -fn categorize_input_message(v: Value) -> Action { - Action::DoNothing -} - pub fn process_create_note( v: Value, ) -> Result<(), Box<dyn std::error::Error>> { @@ -145,58 +183,34 @@ pub fn process_create_note( let object = v.get("object").ok_or("No AP object found")?; let _type = object.get("type").ok_or("No object type found")?; // match type == note - let content = object - .get("content") - .ok_or("No content found")? - .as_str() - .ok_or("Not a string")?; - // clean content - // let in_reply_to = match object.get("inReplyTo") { - // Some(v) => Some(v.as_str().ok_or("Not a string")?), // TODO -- get reply from database - // None => None - // }; - let remote_creator = object - .get("attributedTo") - .ok_or("No attributedTo found")? - .as_str() - .ok_or("Not a string")?; - let remote_url = object - .get("url") - .ok_or("No url Found")? - .as_str() - .ok_or("Not a string")?; - let remote_id = object - .get("id") - .ok_or("No ID found")? - .as_str() - .ok_or("Not a string")?; + let ap_note: ApNote = serde_json::from_value(object.to_owned())?; use crate::db::schema::notes::dsl as n; use crate::db::schema::users::dsl as u; // if user not in db, insert // + let remote_username = ap_note.get_remote_user_name().unwrap_or(ap_note.attributed_to); // TODO -- prevent usernames iwth colons let new_user = NewRemoteUser { - username: String::from(remote_creator), - remote_url: Some(String::from(remote_creator)), + username: remote_username.clone() }; + let new_user_id: i32 = conn.transaction(|| { insert_into(u::users).values(&new_user).execute(conn).ok(); // TODO only check unique constraint error - // last insert id - let new_user_id: i32 = u::users + u::users .select(u::id) - .filter(u::remote_url.eq(remote_creator)) - .first(conn)?; + .filter(u::username.eq(&remote_username)) + .first(conn) + })?; let new_remote_note = RemoteNoteInput { - content: String::from(content), + content: ap_note.content, in_reply_to: None, // TODO neighborhood: true, is_remote: true, - user_id: new_user_id, // for remote. placeholder. not sure what to do with this ultimately - remote_creator: String::from(remote_creator), - remote_id: String::from(remote_id), - remote_url: String::from(remote_url), + user_id: new_user_id, + remote_id: ap_note.id, + remote_url: ap_note.url, }; println!("{:?}", new_remote_note); insert_into(n::notes) @@ -222,20 +236,21 @@ fn set_mutual_accepted (the_actor_id: &str) -> Result<(), Error>{ } // TODO clean this up -fn set_mutual_followed_back (actor_inbox: &str) -> Result<(), Error> { +fn set_mutual_followed_back (the_actor_id: &str) -> Result<(), Error> { use crate::db::schema::server_mutuals::dsl::*; let conn = &POOL.get()?; diesel::update(server_mutuals) - .filter(inbox_url.eq(actor_inbox)) + .filter(actor_id.eq(the_actor_id)) .set(followed_back.eq(true)) .execute(conn)?; Ok(()) } -fn should_accept(actor_inbox: &str) -> bool { - use crate::db::schema::server_mutuals::dsl::*; +fn should_accept(actor_id: &str) -> bool { + use crate::db::schema::server_mutuals::dsl as s; let conn = &POOL.get().unwrap(); - let sent_req: bool = server_mutuals.select(inbox_url).filter(inbox_url.eq(actor_inbox)).first::<String>(conn).is_ok(); + let sent_req: bool = s::server_mutuals.select(s::actor_id) + .filter(s::actor_id.eq(actor_id)).first::<String>(conn).is_ok(); sent_req } @@ -243,19 +258,18 @@ pub async fn process_follow(v: Value) -> Result<(), Error> { let actor: &str = v.get("actor").unwrap().as_str().unwrap(); let remote_actor: Actor = get_remote_actor(actor).await?; // not strictly necessary can use db instead let actor_inbox = &remote_actor.inbox; - let sent_req = should_accept(actor); - debug!("Should server accept the request? {}", sent_req); + let sent_req = true; // should_accept(actor); if sent_req { set_mutual_followed_back(actor)?; // send accept follow let accept = json!({ "@context": "https://www.w3.org/ns/activitystreams", - "id": "https://my-example.com/my-first-accept", + "id": generate_activity_id(), "type": "Accept", "actor": SERVER.global_id, "object": &v, }); - send_ap_message(&accept, vec![actor_inbox.to_string()]).await.unwrap(); + send_ap_message(accept, vec![actor_inbox.to_string()]).await.unwrap(); } Ok(()) // generate accept @@ -271,19 +285,23 @@ pub fn get_destinations() -> Vec<String> { } pub async fn send_ap_message( - ap_message: &Value, + ap_message: Value, destinations: Vec<String>, // really vec of URLs -) -> Result<(), reqwest::Error> { +) -> Result<(), Error> { // Right now we have only once delivery for destination in destinations { + // bad + let msg = Vec::from(ap_message.to_string().as_bytes()); let client = reqwest::Client::new(); - client + let response = client .post(&destination) - .header("date", Utc::now().to_rfc2822()) - .json(&ap_message) + .header("date", Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string()) //HTTP time format + .body(msg) .header("Content-Type", r#"application/ld+json; profile="https://www.w3.org/ns/activitystreams""#) + .http_sign_outgoing()? .send() .await?; + debug!("{:?}", response.text().await?); } Ok(()) } @@ -291,7 +309,7 @@ pub async fn get_remote_actor(actor_id: &str) -> Result<Actor, Error> { let client = reqwest::Client::new(); println!("{:?}", actor_id); let res = client.get(actor_id) - .header("Accept", r#"application/ld+json; profile="https://www.w3.org/ns/activitystreams"#) + .header("Accept", r#"application/ld+json; profile="https://www.w3.org/ns/activitystreams""#) .send() .await?; println!("{:?}", res); @@ -305,7 +323,7 @@ pub async fn follow_remote_server(remote_url: &str) -> Result<(), Error> { let inbox_url = &remote_actor.inbox; let actor_id = &remote_actor.id; let msg = generate_server_follow(actor_id, inbox_url)?; - send_ap_message(&msg, vec![inbox_url.to_owned()]).await?; + send_ap_message(msg, vec![inbox_url.to_owned()]).await?; Ok(()) } @@ -325,26 +343,32 @@ fn generate_server_follow(remote_actor: &str, my_inbox_url: &str) -> Result<Valu } + /// Generate an AP create message from a new note -pub fn new_note_to_ap_message(note: &NoteInput, user: &User) -> Value { +pub fn new_note_to_ap_message(note: &Note, user: &User) -> Value { // we need note, user. note noteinput but note obj // Do a bunch of db queries to get the info I need + // + // prepend the username to the content + // strip it out on receipt + // use a field separator + let content = note.get_content_for_outgoing(&user.username); json!({ "@context": "https://www.w3.org/ns/activitystreams", - "id": "someid", + "id": generate_activity_id(), "type": "Create", "actor": SERVER.global_id, - "published": "now", + "published": note.created_time, // doesnt match "to": [ - "destination.server" - ], + "https://www.w3.org/ns/activitystreams#Public" + ], // todo audience "object": { - "id": "unique id", - "type": "note", - "url": "abc", - "inReplyTo": "none", - "attributedTo": "a remote user", - "content": note.content + "id": note.get_url(), // TODO generate + "type": "Note", + "summary": "", // unused + "url": note.get_url(), + "attributedTo": SERVER.global_id, + "content": content } }) } @@ -353,7 +377,7 @@ pub fn new_note_to_ap_message(note: &NoteInput, user: &User) -> Value { // fn generate_ap(activity: Activity) { // } pub trait HttpSignature { - fn http_sign_outgoing(self) -> Result<reqwest::RequestBuilder, Box<dyn std::error::Error>>; + fn http_sign_outgoing(self) -> Result<reqwest::RequestBuilder, Error>; } // fn read_file(path: &std::path::Path) -> Vec<u8> { @@ -366,23 +390,23 @@ pub trait HttpSignature { // } impl HttpSignature for reqwest::RequestBuilder { - fn http_sign_outgoing(self) -> Result<reqwest::RequestBuilder, Box<dyn std::error::Error>> { + fn http_sign_outgoing(self) -> Result<reqwest::RequestBuilder, Error> { // try and remove clone here let req = self.try_clone().unwrap().build().unwrap(); let config = Config::default() - .set_expiration(Duration::seconds(3600)) + .set_expiration(Duration::seconds(10)) .dont_use_created_field(); let server_key_id = SERVER.key_id.clone(); let mut bt = std::collections::BTreeMap::new(); for (k, v) in req.headers().iter() { - bt.insert(k.as_str().to_owned(), v.to_str()?.to_owned()); + bt.insert(k.as_str().to_owned(), v.to_str().unwrap().to_owned()); } let path_and_query = if let Some(query) = req.url().query() { format!("{}?{}", req.url().path(), query) } else { req.url().path().to_string() }; - let unsigned = config.begin_sign(req.method().as_str(), &path_and_query, bt)?; + let unsigned = config.begin_sign(req.method().as_str(), &path_and_query, bt).unwrap(); println!("{:?}", &unsigned); let sig_header = unsigned .sign(server_key_id,|signing_string| { @@ -402,7 +426,7 @@ impl HttpSignature for reqwest::RequestBuilder { // let digest = digest::digest(&digest::SHA256, &signing_string.as_bytes()); println!("{:?}", &signing_string); let hexencode = base64::encode(&signature); - Ok(hexencode) as Result<_, Box<dyn std::error::Error>> + Ok(hexencode) as Result<_, Error> })? .signature_header(); // this SHOULD be OK @@ -462,29 +486,20 @@ fn read_file(path: &std::path::Path) -> Result<Vec<u8>, MyError> { Ok(contents) } -fn verify_ap_message(method: &str, path_and_query: &str, headers: BTreeMap<String, String>) { +use warp::http; + +pub async fn verify_ap_message(method: &str, path_and_query: &str, headers: BTreeMap<String, String>) -> Result<(), Error> { // TODO -- case insensitivity? // mastodon doesnt use created filed let config = Config::default() .set_expiration(Duration::seconds(3600)) .dont_use_created_field(); + println!("{:?}", headers); let unverified = config - .begin_verify(method, path_and_query, headers) - .unwrap(); + .begin_verify(method, path_and_query, headers)?; + let actor: Actor = get_remote_actor(unverified.key_id()).await?; let res = unverified.verify(|signature, signing_string| { - let res: Value = reqwest::blocking::get(unverified.key_id()) - .unwrap() - .json() - .unwrap(); - let public_key: &[u8] = res - .get("publicKey") - .unwrap() - .get("publicKeyPem") - .unwrap() - .as_str() - .unwrap() - .as_bytes(); - // let public_key = &read_file(Path::new(&env::var("SIGNATURE_PUBKEY").unwrap())).unwrap(); + let public_key: &[u8] = actor.public_key.public_key_pem.as_bytes(); let r = Rsa::public_key_from_pem(public_key).unwrap(); let public_key = r.public_key_to_der_pkcs1().unwrap(); let key = UnparsedPublicKey::new(&ring::signature::RSA_PKCS1_2048_8192_SHA256, &public_key); @@ -493,14 +508,15 @@ fn verify_ap_message(method: &str, path_and_query: &str, headers: BTreeMap<Strin true }); println!("{:?}", unverified); + Ok(()) } #[cfg(test)] mod tests { use super::*; - fn prepare_headers() -> BTreeMap<String, String> { - let mut headers = BTreeMap::new(); + fn prepare_headers() -> HashMap<String, String> { + let mut headers = HashMap::new(); headers.insert( "Content-Type".to_owned(), "application/activity+json".to_owned(), @@ -519,7 +535,7 @@ mod tests { #[test] fn test_verify_ap_message() { - let mut headers = BTreeMap::new(); + let mut headers = HashMap::new(); headers.insert( "Content-Type".to_owned(), "application/activity+json".to_owned(), @@ -541,7 +557,10 @@ mod tests { // for mastodon config -- newer versions of httsig dont use this .header("date", Utc::now().to_rfc2822()) .json(&body) - .header("Accept", r#"application/ld+json; profile="https://www.w3.org/ns/activitystreams"#) + .header( + "Content-Type", + "application/activity+json", + ) .http_sign_outgoing() .unwrap(); } diff --git a/src/db/note.rs b/src/db/note.rs @@ -5,6 +5,8 @@ use maplit::hashset; use regex::Regex; use serde::{Deserialize, Serialize}; +use crate::ap::SERVER; + /// This isn't queryable directly, /// It only works when joined with the users table /// @@ -21,15 +23,23 @@ pub struct Note { pub neighborhood: bool, pub is_remote: bool, pub remote_url: Option<String>, - pub remote_creator: Option<String>, pub remote_id: Option<String>, } -// impl Note { -// fn get_url(&self) -> String { -// format!("{}/note/{}", env::var("GOURAMI_DOMAIN").unwrap(), self.id) -// } -// } +use std::env; + +impl Note { + pub fn get_url(&self) -> String { + // TODO move domain url function + format!("{}/note/{}", SERVER.global_id, self.id) + } + // we make some modifications for outgoing notes + pub fn get_content_for_outgoing(&self, username: &str) -> String { + // remove first reply string + // username not user id + format!("{}:{}💬 {}", SERVER.domain, username, self.content) + } +} /// Content in the DB is stored in plaintext (WILL BE) /// We want to render it so that it is rendered in HTML @@ -70,7 +80,6 @@ pub struct RemoteNoteInput { pub in_reply_to: Option<i32>, pub neighborhood: bool, pub is_remote: bool, - pub remote_creator: String, pub remote_url: String, pub remote_id: String, } diff --git a/src/db/schema.rs b/src/db/schema.rs @@ -8,7 +8,6 @@ table! { neighborhood -> Bool, is_remote -> Bool, remote_url -> Nullable<Varchar>, - remote_creator -> Nullable<Varchar>, remote_id -> Nullable<Varchar>, } } diff --git a/src/db/user.rs b/src/db/user.rs @@ -102,7 +102,6 @@ pub struct NewUser<'a> { #[table_name = "users"] pub struct NewRemoteUser { pub username: String, - pub remote_url: Option<String>, } // impl NewUser { diff --git a/src/error.rs b/src/error.rs @@ -0,0 +1,47 @@ +use r2d2; + +#[derive(Debug)] + +pub enum Error { + ReqwestError(reqwest::Error), + JsonError(serde_json::Error), + DatabaseError(diesel::result::Error), + PoolError(r2d2::Error), + MiscError(String), // Just a temp + HttpSigError(http_signature_normalization::PrepareVerifyError) +} + +impl From<&str> for Error { + fn from(err: &str) -> Error { + Error::MiscError(err.to_owned()) + } +} + +impl From<reqwest::Error> for Error { + fn from(err: reqwest::Error) -> Error { + Error::ReqwestError(err) + } +} + +impl From<r2d2::Error> for Error { + fn from(err: r2d2::Error) -> Error { + Error::PoolError(err) + } +} + +impl From<diesel::result::Error> for Error { + fn from(err: diesel::result::Error) -> Error { + Error::DatabaseError(err) + } +} + +impl From<serde_json::Error> for Error { + fn from(err: serde_json::Error) -> Error { + Error::JsonError(err) + } +} +impl From<http_signature_normalization::PrepareVerifyError> for Error { + fn from(err: http_signature_normalization::PrepareVerifyError) -> Error { + Error::HttpSigError(err) + } +} diff --git a/src/lib.rs b/src/lib.rs @@ -12,6 +12,7 @@ extern crate lazy_static; use serde_json::Value; use std::convert::Infallible; use zxcvbn::zxcvbn; +use std::collections::BTreeMap; use warp::filters::path::FullPath; use warp::http; @@ -209,7 +210,7 @@ async fn handle_new_note_form(u: Option<User>, f: NewNoteRequest) -> Result<impl if n.neighborhood { let nj = ap::new_note_to_ap_message(&n, &u); let destinations = ap::get_destinations(); - ap::send_ap_message(&nj, destinations).await.unwrap(); // TODO error handling + ap::send_ap_message(nj, destinations).await.unwrap(); // TODO error handling } let red_url: http::Uri = f.redirect_url.parse().unwrap(); Ok(redirect(red_url)) @@ -222,7 +223,7 @@ pub fn new_note( auth_user: &User, note_input: &str, neighborhood: bool, -) -> Result<NoteInput, Box<dyn std::error::Error>> { +) -> Result<Note, Box<dyn std::error::Error>> { use db::schema::notes::dsl as notes; use db::schema::notification_viewers::dsl as nv; use db::schema::notifications::dsl as notifs; @@ -240,18 +241,17 @@ pub fn new_note( content: parsed_note_text, neighborhood: neighborhood, }; - // TODO fix potential multithreading issue - insert_into(notes::notes).values(&new_note).execute(conn)?; - let note_id: i32 = notes::notes + let inserted_note: Note = conn.transaction(|| { + insert_into(notes::notes).values(&new_note).execute(conn)?; + notes::notes .order(notes::id.desc()) - .select(notes::id) .first(conn) - .unwrap(); + })?; // notify person u reply to if mentions.len() > 0 { let message = format!( "@{} mentioned you in a note 📝{}", - auth_user.username, note_id + auth_user.username, inserted_note.id ); let new_notification = NewNotification { // reusing the same parser for now. rename maybe @@ -291,7 +291,7 @@ pub fn new_note( // send to outbox // add notification // if request made from web form - Ok(new_note) + Ok(inserted_note) } #[derive(Deserialize)] @@ -703,10 +703,18 @@ pub fn get_outbox() {} // pub fn user_following(user_name: String) {} -pub async fn post_inbox(message: Value) -> Result<impl Reply, Infallible> { +use warp::Buf; + +pub async fn post_inbox(buf: impl Buf, headers: http::header::HeaderMap) -> Result<impl Reply, Infallible> { // TODO check if it is a create note message - // - // ap::verify_ap_message("POST","/index") + let message: Value = serde_json::from_slice(buf.bytes()).unwrap(); // TODO error handling + debug!("received request {:?}", message); + let mut headersbtree: BTreeMap<String, String> = BTreeMap::new(); + // convert to btree + for (k,v) in headers.iter() { + headersbtree.insert(k.as_str().to_owned(), v.to_str().unwrap().to_owned()); + } + ap::verify_ap_message("POST","/inbox", headersbtree).await.unwrap(); // slash or empty string? let msg_type = message.get("type").unwrap().as_str().unwrap(); debug!("Received ActivityPub message of type {}", msg_type); // TODO improve logging match msg_type { diff --git a/src/main.rs b/src/main.rs @@ -6,6 +6,7 @@ use dotenv; #[tokio::main] async fn main() { dotenv::dotenv().ok(); + env_logger::init(); let matches = App::new("Gourami") .version("0.1.0") .author("Alex Wennerberg <alex@alexwennerberg.com>") diff --git a/src/routes.rs b/src/routes.rs @@ -1,9 +1,12 @@ use crate::session; use crate::*; use env_logger; -use warp::{header, body::form, body::json, filters::cookie, filters::query::query, path, reply}; +use warp::{header, body, body::form, body::json, filters::cookie, filters::query::query, path, reply}; +use serde::de::DeserializeOwned; +use warp::reject::{self, Rejection}; +use warp::reply::Response; +use http::header::{HeaderName, HeaderValue, CONTENT_TYPE}; -// I had trouble decoupling routes from server -- couldnt figure out the return type pub async fn run_server() { // NOT TESTED YET let public = false; // std::env::var("PUBLIC").unwrap_or("false"); @@ -13,15 +16,21 @@ pub async fn run_server() { // we have to pass the full paths for redirect to work without javascript // + let webfinger = warp::path!(".well-known" / "webfinger") + .and(query()) + // TODO content type + .map(|q| reply::json(&ap::webfinger_json(q))); + let actor_json = warp::path::end() // In practice, the headers may not follow the spec // https://www.w3.org/TR/activitypub/#retrieving-objects - .and(header::exact_ignore_case("accept", r#"application/ld+json; profile="https://www.w3.org/ns/activitystreams""#) - .or(header::exact_ignore_case("accept", r#"application/ld+json"#)) - .or(header::exact_ignore_case("accept", r#"profile="https://www.w3.org/ns/activitystreams""#) - ) - ) - .map(|_| reply::json(&ap::server_actor_json()) // how do async work + // .and(header::exact_ignore_case("accept", r#"application/ld+json; profile="https://www.w3.org/ns/activitystreams""#) + // .or(header::exact_ignore_case("accept", r#"application/ld+json"#)) + // .or(header::exact_ignore_case("accept", r#"profile="https://www.w3.org/ns/activitystreams""#) + // ) + // ) + // TODO content type + .map(|| reply::json(&ap::server_actor_json()) // how do async work ); let home = warp::path::end() @@ -103,8 +112,13 @@ pub async fn run_server() { // force content type to be application/ld+json; profile="https://www.w3.org/ns/activitystreams let post_server_inbox = path!("inbox") - .and(json()) - .and_then(post_inbox); + .and(body::aggregate()) + .and(header::exact_ignore_case("content-type", r#"application/ld+json; profile="https://www.w3.org/ns/activitystreams""#) + .or(header::exact_ignore_case("content-type", r#"application/ld+json"#)) + .or(header::exact_ignore_case("content-type", r#"profile="https://www.w3.org/ns/activitystreams""#))) + .and(header::headers_cloned()) + .and_then(|buf,_, headers| async move { + post_inbox(buf, headers).await}); let get_server_outbox = path!("outbox").map(get_outbox); @@ -114,7 +128,7 @@ pub async fn run_server() { // TODO secure against xss // used for api based authentication // let api_filter = session::create_session_filter(&POOL.get()); - let static_json = actor_json; // rename html renders + let static_json = actor_json.or(webfinger); // rename html renders let html_renders = home .or(login_page) .or(register_page) @@ -140,13 +154,12 @@ pub async fn run_server() { .and(warp::body::content_length_limit(1024 * 32)) .and(forms)) .or(warp::post() - .and(warp::body::content_length_limit(1024 * 64)) + .and(warp::body::content_length_limit(1024 * 1024)) .and(api_post)) .or(static_files) .with(warp::log("server")) .recover(handle_rejection) .boxed(); - env_logger::init(); match std::env::var("SSL_ENABLED").unwrap().as_str() { "1" => { warp::serve(routes)