koa源码分析

本文koa版本是2.2.0

创建koa服务:

  1. 创建koa的app对象
  2. 为app添加中间件
  3. 监听端口,创建server

下面是一个简单的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
const Koa = require('koa');
const app = new Koa();

// x-response-time: times everything downstream of this middleware and
// reports the elapsed milliseconds in the X-Response-Time header.
app.use(async (ctx, next) => {
  console.log('x-response-time start');
  const startedAt = Date.now();
  await next();
  const ms = Date.now() - startedAt;
  ctx.set('X-Response-Time', `${ms}ms`);
  console.log('x-response-time end');
});

// logger: prints the method, the URL and the elapsed milliseconds once the
// downstream middleware has finished.
app.use(async (ctx, next) => {
  console.log('logger start');
  const startedAt = Date.now();
  await next();
  const ms = Date.now() - startedAt;
  console.log(`${ctx.method} ${ctx.url} - ${ms}`);
  console.log('logger end');
});

// response: last middleware in the chain; it sets the body and never calls next().
app.use((ctx) => {
  console.log('hello world');
  ctx.body = 'Hello World';
});

app.listen(3000);

当请求 http://localhost:3000/ 时,页面返回 'Hello World'

中间件执行顺序

命令行里会依次打印日志:'x-response-time start' -> 'logger start' -> 'hello world' -> 'logger end' -> 'x-response-time end'。也就是说,每个中间件在 await next() 之前的代码按添加顺序执行,await next() 之后的代码按相反的顺序执行。

async异步函数

从请求到响应类似下图

分析代码

app对象结构


创建Koa的app对象,Application继承Emitter对象,代码结构如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Application extends Emitter {
  /**
   * Initialise a new application with its default settings and with its own
   * context / request / response prototypes.
   */
  constructor() {
    super();
    const env = process.env.NODE_ENV || 'development';
    Object.assign(this, {
      proxy: false,
      // array that stores the middleware functions
      middleware: [],
      subdomainOffset: 2,
      env,
      // each of these inherits from the corresponding module-level object
      context: Object.create(context),
      request: Object.create(request),
      response: Object.create(response),
    });
  }
}

添加中间件

koa的中间件非常重要,使用app.use()添加中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
use(fn) {
//判断fn不是函数返回错误
if (typeof fn !== 'function') throw new TypeError('middleware must be a function!');
if (isGeneratorFunction(fn)) {
deprecate('Support for generators will be removed in v3. ' +
'See the documentation for examples of how to convert old middleware ' +
'https://github.com/koajs/koa/blob/master/docs/migration.md');
fn = convert(fn);
}
debug('use %s', fn._name || fn.name || '-');
//把中间件函数push进application的middleware数组内
this.middleware.push(fn);
return this;
}

创建http服务

app.listen()监听端口,listen是对http.createServer()和server.listen()的封装

1
2
3
4
5
listen() {
debug('listen');
const server = http.createServer(this.callback());
return server.listen.apply(server, arguments);
}

接收到请求时的回调函数

当服务接收到http请求时,会执行callback()返回的handleRequest函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
callback() {
// 执行中间件
const fn = compose(this.middleware);
if (!this.listeners('error').length) this.on('error', this.onerror);
const handleRequest = (req, res) => {
res.statusCode = 404;
const ctx = this.createContext(req, res);
const onerror = err => ctx.onerror(err);
const handleResponse = () => respond(ctx);
onFinished(res, onerror);
// fn()
return fn(ctx).then(handleResponse).catch(onerror);
};
return handleRequest;
}

中间件

compose返回一个用于执行中间件的函数。callback()返回的请求处理函数执行fn(ctx)时,从dispatch(0)开始执行第一个中间件函数;中间件内部调用next()时会递归执行dispatch(i + 1),也就是下一个中间件函数,直到所有中间件函数执行完毕

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * Combine an array of middleware functions into a single function.
 *
 * Calling the returned function runs middleware[0]; each middleware receives
 * `(context, next)` and calling `next()` runs the one after it. After the
 * last middleware, the optional `next` given to the composed function is run.
 *
 * @param {Function[]} middleware - middleware functions, in execution order
 * @returns {Function} function (context, next) returning a Promise
 * @throws {TypeError} when middleware is not an array or contains a non-function
 */
function compose (middleware) {
  // Argument validation.
  if (!Array.isArray(middleware)) throw new TypeError('Middleware stack must be an array!')
  for (let k = 0; k < middleware.length; k++) {
    if (typeof middleware[k] !== 'function') throw new TypeError('Middleware must be composed of functions!')
  }

  return function (context, next) {
    // Kept in the closure: position of the most recently dispatched middleware.
    let lastDispatched = -1

    const dispatch = (position) => {
      // A position at or before the last dispatched one means next() was called twice.
      if (position <= lastDispatched) {
        return Promise.reject(new Error('next() called multiple times'))
      }
      lastDispatched = position

      // One step past the end of the array, the caller-supplied `next` takes over.
      const handler = position === middleware.length ? next : middleware[position]
      if (!handler) return Promise.resolve()

      try {
        // Run the middleware; its `next` dispatches the following position.
        const result = handler(context, function next () {
          return dispatch(position + 1)
        })
        return Promise.resolve(result)
      } catch (err) {
        // A synchronous throw becomes a rejected promise.
        return Promise.reject(err)
      }
    }

    // Start with the first middleware.
    return dispatch(0)
  }
}

context保存请求,响应对象

context上下文用于管理请求,响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Build a new context object for one req/res pair and wire it to the
 * application, the given req/res objects and fresh request/response objects.
 *
 * @param req - request object handed in by the caller (callback() passes the handler's req)
 * @param res - response object handed in by the caller (callback() passes the handler's res)
 * @returns the newly created context
 */
createContext(req, res) {
// Each new object inherits from the application's own context/request/response.
const context = Object.create(this.context);
const request = context.request = Object.create(this.request);
const response = context.response = Object.create(this.response);
// app, req and res are reachable from the context, the request and the response
context.app = request.app = response.app = this;
context.req = request.req = response.req = req;
context.res = request.res = response.res = res;
// Cross-link the three objects so each can reach the others.
request.ctx = response.ctx = context;
request.response = response;
response.request = request;
// Copy req.url as it is at context-creation time.
context.originalUrl = request.originalUrl = req.url;
context.cookies = new Cookies(req, res, {
keys: this.keys,
secure: request.secure
});
// First entry of request.ips if there is one, else the socket's remote address, else ''.
request.ip = request.ips[0] || req.socket.remoteAddress || '';
context.accept = request.accept = accepts(req);
// Every context starts with its own empty state object.
context.state = {};
return context;
}

context将部分属性和方法委托(Delegator)给request和response对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// context object
const delegate = require('delegates');
const proto = module.exports = {
  // (the context's own members are not shown in this excerpt)
};

// Response delegation: ctx.set(), ctx.body and ctx.headerSent are forwarded
// to ctx.response. The chain must come after the object literal; delegate()
// is a call expression, not an object member.
delegate(proto, 'response')
  .method('set')
  .access('body')
  .getter('headerSent');

// Request delegation: ctx.get(), ctx.url and ctx.origin are forwarded to
// ctx.request.
delegate(proto, 'request')
  .method('get')
  .access('url')
  .getter('origin');
// delegate implementation

/**
 * Create a delegator that defines members on `proto` which forward to the
 * object stored under `this[target]` at call time. May be called with or
 * without `new`.
 *
 * @param {Object} proto - object that receives the forwarding members
 * @param {String} target - property name under which the real object is found
 * @returns {Delegator}
 */
function Delegator(proto, target) {
  if (!(this instanceof Delegator)) return new Delegator(proto, target);
  this.proto = proto;
  this.target = target;
  // Names registered so far, by kind.
  this.methods = [];
  this.getters = [];
  this.setters = [];
  this.fluents = [];
}

/**
 * Delegate both reading and writing of property `name`.
 *
 * @param {String} name
 * @returns {Delegator} this, for chaining
 */
Delegator.prototype.access = function(name){
  return this.getter(name).setter(name);
};

/**
 * Delegate reading of property `name`.
 *
 * @param {String} name
 * @returns {Delegator} this, for chaining
 */
Delegator.prototype.getter = function(name){
  const proto = this.proto;
  const target = this.target;
  this.getters.push(name);
  proto.__defineGetter__(name, function(){
    // `this` is the object the property is read on, not the Delegator.
    return this[target][name];
  });
  // returned for chaining
  return this;
};

/**
 * Delegate writing of property `name`. access() depends on this method;
 * without it `.access(name)` throws because `.setter` is undefined.
 *
 * @param {String} name
 * @returns {Delegator} this, for chaining
 */
Delegator.prototype.setter = function(name){
  const proto = this.proto;
  const target = this.target;
  this.setters.push(name);
  proto.__defineSetter__(name, function(val){
    // `this` is the object the property is assigned on, not the Delegator.
    return this[target][name] = val;
  });
  return this;
};

/**
 * Delegate calls of method `name`.
 *
 * @param {String} name
 * @returns {Delegator} this, for chaining
 */
Delegator.prototype.method = function(name){
  const proto = this.proto;
  const target = this.target;
  this.methods.push(name);
  proto[name] = function(){
    // Forward the call, using the target object as `this`.
    return this[target][name].apply(this[target], arguments);
  };
  return this;
};

response body 处理

根据body的值分几种情况处理:null/undefined(无内容)、string、buffer、stream,其余情况按json处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
{
/**
 * Return the current response body (the value stored in this._body).
 */
get body() {
return this._body;
},
/**
 * Store a new response body and adjust the status, Content-Type and
 * Content-Length to match the kind of value: null/undefined, string,
 * Buffer, stream (anything with a pipe function), or anything else,
 * which is treated as JSON.
 *
 * @param val - the new body
 */
set body(val) {
// Keep the previous body so the stream branch can tell whether it is being replaced.
const original = this._body;
this._body = val;
// Nothing below runs once the headers have been sent.
if (this.res.headersSent) return;
// no content (null or undefined)
if (null == val) {
// Switch to 204 unless the current status is listed in statuses.empty.
if (!statuses.empty[this.status]) this.status = 204;
this.remove('Content-Type');
this.remove('Content-Length');
this.remove('Transfer-Encoding');
return;
}
// set the status to 200 unless _explicitStatus is set
// NOTE(review): presumably _explicitStatus is set when a status is assigned explicitly; confirm
if (!this._explicitStatus) this.status = 200;
// set the content-type only if not yet set
const setType = !this.header['content-type'];
// string: 'html' when the text starts with optional whitespace followed by '<', otherwise 'text'
if ('string' == typeof val) {
if (setType) this.type = /^\s*</.test(val) ? 'html' : 'text';
this.length = Buffer.byteLength(val);
return;
}
// buffer
if (Buffer.isBuffer(val)) {
if (setType) this.type = 'bin';
this.length = val.length;
return;
}
// stream: any value that has a pipe function
if ('function' == typeof val.pipe) {
// NOTE(review): presumably destroys the stream once the response finishes; confirm against onFinish/destroy
onFinish(this.res, destroy.bind(null, val));
// NOTE(review): presumably ensures stream errors reach ctx.onerror; confirm against ensureErrorHandler
ensureErrorHandler(val, err => this.ctx.onerror(err));
// overwriting: drop Content-Length when a different previous body is being replaced
if (null != original && original != val) this.remove('Content-Length');
if (setType) this.type = 'bin';
return;
}
// json: everything else; the type is set regardless of setType
this.remove('Content-Length');
this.type = 'json';
},
}