Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix view cache singleflight #63

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,6 @@ src/config/config.local*
src/config/sequelize.json
lib/*
.nyc_output
nohup.out
yarn.lock
package-lock.json
108 changes: 81 additions & 27 deletions src/middlewares/view_cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,49 +74,103 @@ function ViewCacheMiddleware(cache, logger) {
ttl,
headersFilter,
namespace,
lockNamespace,
hashStrategy
} = Object.assign({
ttl: 60,
headersFilter: defaultHeadersFilter,
hashStrategy: defaultHashStrategy,
namespace: 'view'
namespace: 'view',
lockNamespace: 'view:lock'
}, options);

const getCache = async (ns, key) => await cache.namespace(ns).get(key) || {
headers: [],
body: null
};

return wrapper(async (req, res, next) => {
const cacheKey = requestToCacheKey(req, hashStrategy);
const {
headers: cachedHeaders = [],
body: cachedBody
} = await cache.namespace(namespace).get(cacheKey)
|| {
headers: [],
body: null
};
if (req.query.flush !== 'true' && cachedBody) {

//加锁避免缓存失效风暴
const lockTTL = ttl - 1;

const sendHit = (headers, body) => {
logger.debug('View cache hit by key %s', cacheKey);
if (cachedHeaders.length > 0) {
cachedHeaders.forEach(([key, value]) => {
if (headers.length > 0) {
headers.forEach(([key, value]) => {
res.setHeader(key, value);
});
}
res.setHeader('X-View-Cache-Hit', cacheKey);
res.send(cachedBody);
res.send(body);
};

//重设send方法
const resetSend = () => {
res.realSend = res.realSend || res.send; //eslint-disable-line no-param-reassign
res.send = (body) => { //eslint-disable-line no-param-reassign
logger.debug('View cache missed by key %s, creating...', cacheKey);
res.setHeader('X-View-Cache-Miss', cacheKey);
res.setHeader('X-View-Cache-Expire-At', moment().add(ttl, 'minute').format('YYYY-MM-DD HH:mm:ss Z'));
res.setHeader('X-View-Cache-Created-At', moment().format('YYYY-MM-DD HH:mm:ss Z'));
res.realSend(body);
const headers = headersFilter && util.isFunction(headersFilter) ?
headersFilter(res) : defaultHeadersFilter(res);
if (res.statusCode <= 500) {
cache.namespace(namespace).set(cacheKey, { headers, body }, ttl).then((ret) => {
if (ret === null) {
logger.error('View cache set failed for return %s', ret);
}
if (lockTTL > 0) {
cache.namespace(lockNamespace).del(cacheKey);
}
}).catch((e) => {
logger.error('View cache set failed for %s', cacheKey, e);
});
} else if (lockTTL > 0) {
cache.namespace(lockNamespace).del(cacheKey);
}
};
};

const spinCache = async () => {
const retry = await getCache(namespace, cacheKey);
if (retry && retry.body) {
return sendHit(retry.headers, retry.body);
}
//这里再次判断锁是否存在, 如果锁已经被释放, 代表缓存可能已经过期,则重新设置
const lockRet = await cache.namespace(lockNamespace).set(cacheKey, 1, lockTTL, 'nx');
if (lockRet) {
resetSend();
return next();
}

//自旋
setTimeout(spinCache, 0);
logger.info('View cache is spinning...');
return true;
};

const {
headers: cachedHeaders,
body: cachedBody
} = await getCache(namespace, cacheKey);
if (req.query.flush !== 'true' && cachedBody) {
sendHit(cachedHeaders, cachedBody);
return;
}
res.realSend = res.send; //eslint-disable-line no-param-reassign
res.send = (body) => { //eslint-disable-line no-param-reassign
logger.debug('View cache missed by key %s, creating...', cacheKey);
res.setHeader('X-View-Cache-Miss', cacheKey);
res.setHeader('X-View-Cache-Expire-At', moment().add(ttl, 'minute').format('YYYY-MM-DD HH:mm:ss Z'));
res.setHeader('X-View-Cache-Created-At', moment().format('YYYY-MM-DD HH:mm:ss Z'));
res.realSend(body);
const headers = headersFilter && util.isFunction(headersFilter) ?
headersFilter(res) : defaultHeadersFilter(res);
if (res.statusCode <= 500) {
cache.namespace(namespace).set(cacheKey, { headers, body }, ttl).catch((e) => {
logger.error('View cache set failed for %s', cacheKey, e);
});

if (lockTTL > 0) {
const lockRet = await cache.namespace(lockNamespace).set(cacheKey, 1, lockTTL, 'nx');
if (lockRet === null) {
//get cache again
await spinCache();
return;
}
};
}

resetSend();
next();
});
};
Expand Down
76 changes: 57 additions & 19 deletions test/middlewares/view_cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,23 +61,61 @@ test('Request hash', (t) => {
);
});

//FIXME: this assert not work!!
// test('View cache', (t) => {
// t.plan(1);
// const req = mockRequest({
// method: 'GET', url: '/'
// });
// req.route = {};
// const res = mockResponse();
// const middleware = DI.get('view_cache')(60);
// res.on('end', () => {
// cache.namespace('view').has('get/unknown:043fe182887af19ba0be0cb494b75c9c').then((v) => {
// t.true(v);
// });
// });
//
// middleware(req, res, () => {
// res.send('something');
// });
// });
test('View cache', async (t) => {
const req = mockRequest({
method: 'GET', url: '/'
});
req.route = {};
const res = mockResponse();
const middleware = DI.get('view_cache')(60);

await middleware(req, res, () => {
res.send('something');
});

//走缓存, 所以next中断言不执行
await middleware(req, res, () => {
t.true(false);
});

const v = await cache.namespace('view').has('get/unknown/:5bb7eb919a5d177089d48a9ab171fc4e');
t.true(v);

t.is(await cache.namespace('view').flush(), 1);
t.is(await cache.namespace('view:lock').flush(), 0);
});

test('View lock', async (t) => {
const req = mockRequest({
method: 'GET', url: '/'
});
req.route = {};
const res = mockResponse();
const middleware = DI.get('view_cache')(60);

await middleware(req, res, () => {
// 不执行send
});

const locked = await cache.namespace('view:lock').has('get/unknown/:5bb7eb919a5d177089d48a9ab171fc4e');
t.true(locked);

//此时资源被锁定, 无法缓存
await middleware(req, res, (v) => {
res.send('something');
});
let v = await cache.namespace('view').has('get/unknown/:5bb7eb919a5d177089d48a9ab171fc4e');
t.false(v);

t.is(await cache.namespace('view:lock').flush(), 1);

//延迟500ms, 确保当lock 被释放后, 缓存被设置成功
const s = new Promise(resolve => setTimeout(resolve, 500));
await s;

v = await cache.namespace('view').has('get/unknown/:5bb7eb919a5d177089d48a9ab171fc4e');
t.true(v);

t.is(await cache.namespace('view').flush(), 1);

});