Rust 学习指南 - Rust web 编程

3,862 阅读8分钟
原文链接: www.codemore.top
使用HTTP

首先需要确定如何在Rust 的web服务中使用HTTP。意思是我们的应用服务器必须能够解析HTTP请求,返回响应。其他语言例如python,又Flash,Django这样的框架可以直接使用,对于Rust来说可以使用一个相对底层的框架hyper,来处理HTTP请求,hyper是基于tokio和future的,可以方便的创建一个异步的服务器,同时对于日志的支持使用log和env_log crate进行处理。

首先创建项目,并添加依赖。

[package]
name = "microservice_rs"
version = "0.1.0"
authors = ["you <your@email>"]

[dependencies]
env_logger = "0.5.3"
futures = "0.1.17"
hyper = "0.11.13"
log = "0.4.1"

现在编写处理http请求的代码。hyper有一个Service的概念,实现Service trait 有一个call的方法,接受Request对象,处理HTTP请求。由于Hyper是异步的,所以必须返回一个Future,代码如下:

extern crate hyper;
extern crate futures;
#[macro_use]
extern crate log;
extern crate env_logger;

use hyper::server::{Request, Response, Service}
use futures::future::Future;

struct Microservice;
impl Service for Microservice {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
    
    fn call(&self, request: Request) -> Self::Future {
        info!("Microserivce received a request: {:?}", request);
        Box::new(futures::future::ok(Response::new()))
    }
}

注意,在Microservice中同时定义了一些类型,future返回的是Box类型,因为futures::fugure::Future是一个trait,我们无法知道其大小。

下面编写启动server的代码。

fn main() {
    env_logger::init();
    let address = "127.0.0.1:8080".parse().unwrap();
    let server = hyper::server::Http::new()
    	.bind(&address, ||Ok(Microservice{}))
    	.unwrap();
    info!("Running microservice at {}", address);
    server.run().unwrap();
}

运行:

RUST_LOG="microservice=debug" cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
    Running `target/debug/microservice`
INFO 2018-01-21T23:35:05Z: microservice: Running microservice at 127.0.0.1:8080

访问服务:

curl 'localhost:8080'
INFO 2018-01-21T23:35:05Z: microservice: Running microservice at 127.0.0.1:8080
INFO 2018-01-21T23:35:06Z: microservice: Microservice received a request: Request { method: Get, uri: "/", version: Http11, remote_addr: Some(V4(127.0.0.1:61667)), headers: {"Host": "localhost:8080", "User-Agent": "curl/7.54.0", "Accept": "*/*"} }

RUST_LOG="microservice=debug"是env_log的参数,可以控制日志的级别。

处理POST请求

下面来编写处理POST请求的逻辑,路径/接受一个POST请求,请求体包含了usernamemessage两个内容。现在重新编写上面的call方法。

fn call(&self, request: Request) -> Self::Future {
    match (request.method(), request.path()) {
        (&Post, "/") => {
            let future = request
            	.body()
            	.concat2()
            	.and_then(parse_form)
            	.and_then(write_to_db)
            	.then(make_post_response);
            Box::new(future)
        }
        _ => Box::new(futures::future::ok(Response::new().with_status(StatusCode:::NotFound))),
    }
}

我们通过match来处理不同的请求。在这个例子中,请求方法要么是Get,要么是Post。目前唯一有效的路径是/。如果请求方法是&Post并且路径是/,那么就会调用一些函数,parse_form等进行处理。and_then组合器会将各个方法处理完毕,返回的结果传递给下一个函数继续处理,最终到then返回结果。

下面来简单编写上面的例子用到的函数

struct NewMessage {
    username: String,
    message: String,
}
fn parse_form(form_chunk: Chunk) -> FutureResult<NewMessage, hyper::Error> {
    futures::future::ok(NewMessage {
        username: String::new(),
        message: String::new(),
    })
}

fn write_to_db(entry: NewMessage) -> FutureResult<i64, hyper::Error> {
    futures::future::ok(0)
}
fn make_post_response (result: Result<i64, hyper::Error>) -> FutureResult<hyper::Response, hyper::Error> {
    futures::future::ok(Response::new().with_status(StatusCode::NotFound))
}

上面代码运行需要导入:

use hyper::{Chunk, StatusCode};
use hyper::Method::{Get, Post};
use hyper::server::{Request, Response, Service};

use futures::Stream;
use futures::future::{Future, FutureResult};

下面让我们来完善parse_form函数,这个函数接受一个Chunk的类型(即消息体) ,同时解析消息体获取username和message。

use std::collections::HashMap;
use std::io;

fn parse_form(form_chunk: Chunk) -> FutureResult<NewMessage, hyper::Error> {
    let mut form = url::form_urlencoded::parse(form_chunk.as_ref())
    	.into_owned()
    	.collect::<HashMap<String, String>>();
    if let Some(message) = form.remove("message") {
        let username = form.remove("username").unwrap_or(String::from("anonymous"));
        futures::future::ok(NewMessage {
            username: username,
            message: message,
        })
    }else {
        futures::future::err(hyper::Error::from(io::Error::new(
        	io::ErrorKind::InvalidInput,
            "Missing field message",
        )))
    }
}

首先,解析消息体,将其解析道hashmap中,然后通过hashmap获取消息体中的信息。

暂时先不介绍write_to_db 函数,这个函数的作用就是将信息写入到数据库,将会在下一章节进行介绍。

现在来编写 make_post_response函数。

#[macro_use]
extern crate serde_json;

fn make_post_response(result: Result<i64, hyper::Error>) -> FutureResult<hyper::Response, hyper::Error> {
    match result {
        Ok(timestamp) => {
            let payload = json!({"timestamp": timestamp}).to_string();
            let response = Response::new()
            	.with_header(ContentLength(payload.len() as u64))
            	.with_header(ContentType::json())
            	.with_body(payload);
            debug!("{:?}", response);
            futures::future::ok(response)
        }
        Err(error) => make_error_response(error.description()),
    }
}

通过match来检查处理是否成功,如果成功了,就返回时间信息,如果不成功返回错误信息,这里用到了serde_json, 记得在Cargo.toml 中添加。

下面编写make_error_response方法

fn make_error_response(error_message: &str) -> FutureResult<hyper::Response, hyper::Error>{
    let payload = json!({"error": error_message}).to_string();
    let response = Response::new()
    	.with_status(StatusCode::InternalServerError)
    	.with_header(ContentLength(payload.len() as u64))
    	.with_header(ContentType::json())
    	.with_body(payload);
    debug!("{:?}", response);
    futures::future::ok(response)
}
处理GET请求

接下来我们处理GET请求,通过发送GET请求到server获取消息。GET请求接受两个参数,before和after,server返回这个两个时间戳之间的消息。

(&Get, "/") => {
    let time_range = match request.query() {
        Some(query) => parse_query(query),
        None => Ok(TimeRange{
           before: None,
           after: None,
        }),
    };
    let response = match time_range {
        Ok(time_range) => make_get_response(query_db(time_range)),
        Err(error) => make_error_response(&error),
    };
    Box::new(response)
}

通过调用request.query()获取Option<&str> ,通过调用parse_query处理url参数,TimeRange定义如下:

struct TimeRange {
    before: Option<i64>,
    after: Option<i64>,
}

query_db的作用是从数据库中获取消息信息。下面来实现parse_query 函数

fn parse_query(query: &str) -> Result<TimeRange, String> {
    let args = url::form_urlencoded::parse(&query.as_bytes())
    	.into_owned()
    	.collect::<HashMap<String,String>>();
    let before = args.get("before").map(|value| value.parse::<i64>());
    if let Some(ref result) = before {
        if let Err(ref error) = *result {
            return Err(format!("Error parsing 'before: {}", error));
        }
    }
    
    let after = args.get("after").map(|value| value.parse::<i64>());
    if let Some(ref result) = after {
        if let Err(error) = *result {
            return Err(format!("Error parsing 'after': {}", error));
        }
    }
    Ok(TimeRange{
        before: before.map(|b| b.unwrap()),
        after: after.map(|b| b.unwrap()),
    })
}

下面来编写make_get_response方法

fn make_get_response(messages: Option<Vec<Message>>) -> FutureResult<hyper::Response, hyper::Error> {
    let response = match messages {
        Some(messages) => {
            let body = render_page(messages);
            Response::new()
            	.with_header(ContentLength(body.len() as u64))
            	.with_body(body)
        }
        None => Response::new().with_status(StatusCode::InternalServerError),
    };
    debug!("{:?}", response);
    futures::future::ok(response)
}
添加数据库支持

Rust的diesel ORM 是目前Rust最好ORM,所以我们将会直接使用diesel来做数据库操作,我们的数据库选择postgresql。将下面的代码添加到Cargo.toml中

diesel = {version = "1.0.0", features = ["postgres"]}

同时安装diesel_cli

cargo install diesel_cli

首先为我们需要创建数据库表的语句,将其放在schemas/message.sql中

CREATE TABLE messages (
  id SERIAL PRIMARY KEY,
  username VARCHAR(128) NOT NULL,
  message TEXT NOT NULL,
  timestamp BIGINT NOT NULL DEFAULT EXTRACT('epoch' FROM CURRENT_TIMESTAMP)
)

下面我们使用diesel创建schema

export DATABASE_URL=postgres://<user>:<password>@localhost
diesel print-schema | tee src/schema.rs
table! {
    messages (id) {
        id -> Int4,
        username -> Varchar,
        message -> Text,
        timestamp -> Int8,
    }
}

table! 是diesel定义的宏,用来表示数据的字段对应,其保存在schema.rs文件中,同时需要创建src/models.rs 文件

#[derive(Queryable, Serialize, Debug)]
pub struct Message {
    pub id: i32,
    pub username: String,
    pub message: String,
    pub timestamp: i64,
}

model 的struct就是上面代码使用的Message。现在向main中添加一些需要使用的模块

#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate diesel;

mod schema;
mod models;

实现write_to_db 方法

use diesel::prelude::*;
use diesel::pg::PgConnection;

fn write_to_db(new_message: NewMessage, db_connection: &PgConnection) -> FutureResult<i64, hyper::Error> {
    use schema::messages;
    let timestamp = diesel::insert_into(messages::table)
    	.values(&new_message)
    .returning(messages::timestamp)
    .get_result(db_connection);
    match timestamp {
        Ok(timestamp) => futures::future::ok(timestamp),
        Err(error) => {
            error!("Error writing to database: {}", error.description());
            futures::future::err(hyper::Error::from(io::Error::new(io::ErrorKind::Other, "service error"),))
        }
    }
}

Diesel 暴露了非常直观并且类型安全的API。

  • 指定需要插入的表
  • 指定需要插入的数据
  • 指定想要返回的数据
  • 调用get_result 执行sql并获取结果

同样的,对于NewMessage,也需要在src/models.rs中进行定义

use schema::messages;

#[derive(Queryable, Serialize, Debug)]
pub struct Message {
    pub id: i32,
    pub username: String,
    pub message: String,
    pub timestamp: i64,
}

#[derive(Insertable, Debug)]
#[table_name = "messages"]
pub struct NewMessage {
    pub username: String,
    pub message: String,
}

现在我们需要修改一下call方法,其内部需要获取db的链接

use std::env;
const DEFAULT_DATABASE_URL: &'static str = "postgresql://postgres@localhost:5432";

fn connect_to_db() -> Option<PgConnection> {
    let database_url = env::var("DATABASE_URL").unwrap_or(String::from(DEFAULT_DATABASE_URL));
    match PgConnection::establish(&database_url) {
        Ok(connection) => Some(connection),
        Err(error) => {
            error!("Error connection to database {}", error.description());
            None
        }
    }
}
fn call(&self, request: Request) -> Self::Response {
    let db_connection = match connect_to_db() {
        Some(connection) => connection,
        None => {
            return Box::new(futures::future::ok(
            	Response::new().with_status(StatusCode::IntervalServerError),
            ));
        }
    };
}

下面需要修改一下处理Post和Get请求的match

(&Post, "/") => {
    let future = request
    	.body()
    	.concat2()
    	.and_then(parse_form)
    	.and_then(move |new_message| => write_to_db(new_message, &db_connection))
    	.then(make_post_response);
    Box::new(future)
}
(&Get, "/") => {
    (&Post, "/") => {
    let future = request
        .body()
        .concat2()
        .and_then(parse_form)
        .and_then(move |new_message| write_to_db(new_message, &db_connection))
        .then(make_post_response);
    Box::new(future)
}
(&Get, "/") => {
    let time_range = match request.query() {
        Some(query) => parse_query(query),
        None => Ok(TimeRange {
            before: None,
            after: None,
        }),
    };
    let response = match time_range {
        Ok(time_range) => make_get_response(query_db(time_range, &db_connection)),
        Err(error) => make_error_response(&error),
    };
    Box::new(response)
}

实现query_db 方法

fn query_db(time_range: TimeRange, db_connection: &PgConnection) -> Option<Vec<Message>> {
    use schema::messages;
    let TimeRange {before, after} = time_range;
    let query_result = match (before, after) {
        (Some(before), Some(after)) => {
            messages::table
            	.filter(messages::timestamp.lt(before as u64))
            	.filter(messages::timestamp.gt(after as u64))
            	.load::<Message>(db_connection)
        }
        (Some(before), _) => {
            message::table
            	.filter(messages::timestamp.lt(before as u64))
            	.load::<Message>(db_connection)
        }
        (_, Some(after)) => {
            messages::table
                .filter(messages::timestamp.gt(after as i64))
                .load::<Message>(db_connection)
        }
        _ => {
            messages::table.load::<Message>(db_connection)
        };
        match query_result {
            Ok(result) => Some(result),
            Err(error) => {
                error!("Error query Db: {}", error);
                None
        }
    }
}
渲染html

我们使用maud作为html的渲染引擎。在Cargo.toml 中添加

maud = "0.17.2"

然后在main.rs 中声明

#![feature(proc_macro)]
extern crate maud;

下面来实现render_html方法

fn render_html(messages: Vec<Message>) -> String {
    (html!{
        head {
            title "microservice"
            style "body { font-family: monospace }"
        }
        body {
            ul {
                @for message in &messages {
                    li {
                        (message.username) " (" (message.timestamp) "): " (message.message)
                    }
                }
            }
        }
    }).into_string()
}

运行整个工程

DATABASE_URL="postgresql://goldsborough@localhost" RUST_LOG="microservice=debug" cargo run
Compiling microservice v0.1.0 (file:///Users/goldsborough/Documents/Rust/microservice)
 Finished dev [unoptimized + debuginfo] target(s) in 12.30 secs
  Running `target/debug/microservice`
INFO 2018-01-22T01:22:16Z: microservice: Running microservice at 127.0.0.1:8080

访问接口

curl 'localhost:8080'
<head><title>microservice</title><style>body { font-family: monospace }</style></head><body><ul><li>peter (1516584255): hi</li><li>mike (1516584282): hi2</li></ul></body>
使用docker进行打包

docker-compose.yaml 文件如下:

version: '2'
services:
  server:
    build:
      context: .
      dockerfile: docker/Dockerfile
    networks:
      - network
    ports:
       - "8080:80"
    environment:
      DATABASE_URL: postgresql://postgres:secret@db:5432
      RUST_BACKTRACE: 1
      RUST_LOG: microservice=debug
  db:
    build:
      context: .
      dockerfile: docker/Dockerfile-db
    restart: always
    networks:
      - network
    environment:
      POSTGRES_PASSWORD: secret

networks:
  network: