Use new timeline cache structure

Nicolas Werner 2020-07-05 05:29:07 +02:00
parent 79a29953dd
commit c79205c26a
2 changed files with 156 additions and 118 deletions
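For context: the diff below replaces the single timestamp-keyed "<room_id>/messages" database with two per-room databases: "<room_id>/events", which maps an event id to the full event JSON, and an event-order database, which maps a monotonically increasing int64 index to a small JSON entry holding the event id (plus the prev_batch token on the first event of a batch). A minimal sketch of that layout, reusing the lmdb++ and nlohmann::json helpers already present in this file; the function name putTimelineEvent is illustrative only and not part of this commit:

// Illustrative sketch only: how one event lands in the new per-room layout.
// eventsDb: event_id -> full event JSON
// orderDb:  int64 index (raw bytes) -> {"event_id": ..., "prev_batch": ...}
static void
putTimelineEvent(lmdb::txn &txn,
                 lmdb::dbi &eventsDb,
                 lmdb::dbi &orderDb,
                 int64_t index,
                 const json &event,
                 const std::string &prev_batch = "")
{
        // Store the full event body, addressable by its event id.
        lmdb::dbi_put(txn,
                      eventsDb,
                      lmdb::val(event.at("event_id").get<std::string>()),
                      lmdb::val(event.dump()));

        // Store the ordering entry; only the first event of a batch carries the token.
        json orderEntry        = json::object();
        orderEntry["event_id"] = event.at("event_id");
        if (!prev_batch.empty())
                orderEntry["prev_batch"] = prev_batch;

        lmdb::dbi_put(txn,
                      orderDb,
                      lmdb::val(&index, sizeof(index)),
                      lmdb::val(orderEntry.dump()));
}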

View File

@@ -37,7 +37,7 @@
//! Should be changed when a breaking change occurs in the cache format.
//! This will reset client's data.
static const std::string CURRENT_CACHE_FORMAT_VERSION("2020.05.01");
static const std::string CURRENT_CACHE_FORMAT_VERSION("2020.07.05");
static const std::string SECRET("secret");
static lmdb::val NEXT_BATCH_KEY("next_batch");
@@ -46,8 +46,9 @@ static lmdb::val CACHE_FORMAT_VERSION_KEY("cache_format_version");
constexpr size_t MAX_RESTORED_MESSAGES = 30'000;
constexpr auto DB_SIZE = 32ULL * 1024ULL * 1024ULL * 1024ULL; // 32 GB
constexpr auto MAX_DBS = 8092UL;
constexpr auto DB_SIZE = 32ULL * 1024ULL * 1024ULL * 1024ULL; // 32 GB
constexpr auto MAX_DBS = 8092UL;
constexpr auto BATCH_SIZE = 100;
//! Cache databases and their format.
//!
@@ -63,7 +64,6 @@ constexpr auto SYNC_STATE_DB("sync_state");
//! Read receipts per room/event.
constexpr auto READ_RECEIPTS_DB("read_receipts");
constexpr auto NOTIFICATIONS_DB("sent_notifications");
//! TODO: delete pending_receipts database on old cache versions
//! Encryption related databases.
@@ -93,20 +93,6 @@ namespace {
std::unique_ptr<Cache> instance_ = nullptr;
}
int
numeric_key_comparison(const MDB_val *a, const MDB_val *b)
{
auto lhs = std::stoull(std::string((char *)a->mv_data, a->mv_size));
auto rhs = std::stoull(std::string((char *)b->mv_data, b->mv_size));
if (lhs < rhs)
return 1;
else if (lhs == rhs)
return 0;
return -1;
}
Cache::Cache(const QString &userId, QObject *parent)
: QObject{parent}
, env_{nullptr}
@@ -697,6 +683,27 @@ Cache::runMigrations()
return false;
}
nhlog::db()->info("Successfully deleted pending receipts database.");
return true;
}},
{"2020.07.05",
[this]() {
try {
auto txn = lmdb::txn::begin(env_, nullptr);
auto room_ids = getRoomIds(txn);
for (const auto &room_id : room_ids) {
auto messagesDb = lmdb::dbi::open(
txn, std::string(room_id + "/messages").c_str(), MDB_CREATE);
lmdb::dbi_drop(txn, messagesDb, true);
}
txn.commit();
} catch (const lmdb::error &) {
nhlog::db()->critical(
"Failed to delete messages database in migration!");
return false;
}
nhlog::db()->info("Successfully deleted pending receipts database.");
return true;
}},
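The new "2020.07.05" entry above simply drops every room's old "<room_id>/messages" database; the next sync then refills the timeline in the new format. As a hedged sketch (the exact container inside Cache::runMigrations() is not shown in this diff), the entry is assumed to plug into a version-ordered migration table roughly like this:

// Assumed shape only: {cache format version, migration step} pairs, run in order
// whenever the stored cache format version is older than the entry's version.
// Requires <functional>, <utility> and <vector>, which Cache.cpp is assumed to include.
std::vector<std::pair<std::string, std::function<bool()>>> migrations{
        {"2020.05.01", [this]() { /* drop the pending_receipts database */ return true; }},
        {"2020.07.05", [this]() { /* drop the <room_id>/messages databases, as above */ return true; }},
};
// A step returning false aborts the run and is reported as a failed migration.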
@@ -1232,38 +1239,64 @@ Cache::getTimelineMentions()
return notifs;
}
mtx::responses::Timeline
Cache::getTimelineMessages(lmdb::txn &txn, const std::string &room_id)
Cache::Messages
Cache::getTimelineMessages(lmdb::txn &txn, const std::string &room_id, int64_t index, bool forward)
{
// TODO(nico): Limit the messages returned by this maybe?
auto db = getMessagesDb(txn, room_id);
auto orderDb = getEventOrderDb(txn, room_id);
auto eventsDb = getEventsDb(txn, room_id);
mtx::responses::Timeline timeline;
std::string timestamp, msg;
Messages messages{};
auto cursor = lmdb::cursor::open(txn, db);
lmdb::val indexVal, val;
size_t index = 0;
auto cursor = lmdb::cursor::open(txn, orderDb);
if (index == std::numeric_limits<int64_t>::max()) {
if (cursor.get(indexVal, val, forward ? MDB_FIRST : MDB_LAST)) {
index = *indexVal.data<int64_t>();
} else {
messages.end_of_cache = true;
return messages;
}
} else {
if (cursor.get(indexVal, val, MDB_SET)) {
index = *indexVal.data<int64_t>();
} else {
messages.end_of_cache = true;
return messages;
}
}
while (cursor.get(timestamp, msg, MDB_NEXT) && index < MAX_RESTORED_MESSAGES) {
auto obj = json::parse(msg);
int counter = 0;
if (obj.count("event") == 0 || obj.count("token") == 0)
bool ret;
while ((ret = cursor.get(indexVal, val, forward ? MDB_NEXT : MDB_LAST)) &&
counter++ < BATCH_SIZE) {
auto obj = json::parse(std::string(val.data(), val.size()));
if (obj.count("event_id") == 0)
break;
lmdb::val event;
bool success = lmdb::dbi_get(
txn, eventsDb, lmdb::val(obj["event_id"].get<std::string>()), event);
if (!success)
continue;
mtx::events::collections::TimelineEvent event;
mtx::events::collections::from_json(obj.at("event"), event);
mtx::events::collections::TimelineEvent te;
mtx::events::collections::from_json(
json::parse(std::string(event.data(), event.size())), te);
index += 1;
timeline.events.push_back(event.data);
timeline.prev_batch = obj.at("token").get<std::string>();
messages.timeline.events.push_back(std::move(te.data));
// timeline.prev_batch = obj.at("token").get<std::string>();
}
cursor.close();
std::reverse(timeline.events.begin(), timeline.events.end());
// std::reverse(timeline.events.begin(), timeline.events.end());
messages.next_index = *indexVal.data<int64_t>();
messages.end_of_cache = !ret;
return timeline;
return messages;
}
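Usage-wise, the new signature pages through a room's cached timeline in BATCH_SIZE chunks: it starts either from the newest entry (the default index of int64 max) or from a caller-supplied index, returns the fetched events in messages.timeline, and reports where to continue via next_index and whether the cache is exhausted via end_of_cache. A hypothetical caller (not part of this commit), assuming the Messages struct added to the header further down and running inside a Cache member where env_ is available:

// Hypothetical caller: walk a room's cached timeline backwards in batches.
// room_id is the std::string id of the room being paged; handleEvent is an
// illustrative consumer of the fetched events.
auto txn      = lmdb::txn::begin(env_, nullptr, MDB_RDONLY);
int64_t index = std::numeric_limits<int64_t>::max(); // start at the newest cached event
bool done     = false;
while (!done) {
        auto batch = getTimelineMessages(txn, room_id, index, false);
        for (const auto &e : batch.timeline.events)
                handleEvent(e);
        index = batch.next_index;  // continue from where this batch stopped
        done  = batch.end_of_cache; // no older events left in the cache
}
txn.commit();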
QMap<QString, RoomInfo>
@@ -1306,55 +1339,59 @@ Cache::roomInfo(bool withInvites)
std::string
Cache::getLastEventId(lmdb::txn &txn, const std::string &room_id)
{
auto db = getMessagesDb(txn, room_id);
auto orderDb = getEventOrderDb(txn, room_id);
if (db.size(txn) == 0)
lmdb::val indexVal, val;
auto cursor = lmdb::cursor::open(txn, orderDb);
if (!cursor.get(indexVal, val, MDB_LAST)) {
return {};
std::string timestamp, msg;
auto cursor = lmdb::cursor::open(txn, db);
while (cursor.get(timestamp, msg, MDB_NEXT)) {
auto obj = json::parse(msg);
if (obj.count("event") == 0)
continue;
cursor.close();
return obj["event"]["event_id"];
}
cursor.close();
return {};
auto obj = json::parse(std::string(val.data(), val.size()));
if (obj.count("event_id") == 0)
return {};
else
return obj["event_id"];
}
DescInfo
Cache::getLastMessageInfo(lmdb::txn &txn, const std::string &room_id)
{
auto db = getMessagesDb(txn, room_id);
if (db.size(txn) == 0)
auto orderDb = getEventOrderDb(txn, room_id);
auto eventsDb = getEventsDb(txn, room_id);
if (orderDb.size(txn) == 0)
return DescInfo{};
std::string timestamp, msg;
const auto local_user = utils::localUser();
DescInfo fallbackDesc{};
auto cursor = lmdb::cursor::open(txn, db);
while (cursor.get(timestamp, msg, MDB_NEXT)) {
auto obj = json::parse(msg);
lmdb::val indexVal, val;
if (obj.count("event") == 0)
auto cursor = lmdb::cursor::open(txn, orderDb);
cursor.get(indexVal, val, MDB_LAST);
while (cursor.get(indexVal, val, MDB_PREV)) {
auto temp = json::parse(std::string(val.data(), val.size()));
if (temp.count("event_id") == 0)
break;
lmdb::val event;
bool success = lmdb::dbi_get(
txn, eventsDb, lmdb::val(temp["event_id"].get<std::string>()), event);
if (!success)
continue;
if (fallbackDesc.event_id.isEmpty() && obj["event"]["type"] == "m.room.member" &&
obj["event"]["state_key"] == local_user.toStdString() &&
obj["event"]["content"]["membership"] == "join") {
uint64_t ts = obj["event"]["origin_server_ts"];
auto obj = json::parse(std::string(event.data(), event.size()));
if (fallbackDesc.event_id.isEmpty() && obj["type"] == "m.room.member" &&
obj["state_key"] == local_user.toStdString() &&
obj["content"]["membership"] == "join") {
uint64_t ts = obj["origin_server_ts"];
auto time = QDateTime::fromMSecsSinceEpoch(ts);
fallbackDesc = DescInfo{QString::fromStdString(obj["event"]["event_id"]),
fallbackDesc = DescInfo{QString::fromStdString(obj["event_id"]),
local_user,
tr("You joined this room."),
utils::descriptiveTime(time),
@@ -1362,17 +1399,16 @@ Cache::getLastMessageInfo(lmdb::txn &txn, const std::string &room_id)
time};
}
if (!(obj["event"]["type"] == "m.room.message" ||
obj["event"]["type"] == "m.sticker" ||
obj["event"]["type"] == "m.room.encrypted"))
if (!(obj["type"] == "m.room.message" || obj["type"] == "m.sticker" ||
obj["type"] == "m.room.encrypted"))
continue;
mtx::events::collections::TimelineEvent event;
mtx::events::collections::from_json(obj.at("event"), event);
mtx::events::collections::TimelineEvent te;
mtx::events::collections::from_json(obj, te);
cursor.close();
return utils::getMessageDescription(
event.data, local_user, QString::fromStdString(room_id));
te.data, local_user, QString::fromStdString(room_id));
}
cursor.close();
@@ -1954,7 +1990,6 @@ Cache::saveTimelineMessages(lmdb::txn &txn,
const std::string &room_id,
const mtx::responses::Timeline &res)
{
auto db = getMessagesDb(txn, room_id);
auto eventsDb = getEventsDb(txn, room_id);
auto orderDb = getEventOrderDb(txn, room_id);
@@ -1966,7 +2001,7 @@ Cache::saveTimelineMessages(lmdb::txn &txn,
lmdb::val indexVal, val;
int64_t index = 0;
auto cursor = lmdb::cursor::open(txn, orderDb);
auto cursor = lmdb::cursor::open(txn, orderDb);
if (cursor.get(indexVal, val, MDB_LAST)) {
index = *indexVal.data<int64_t>();
}
@@ -1979,17 +2014,6 @@ Cache::saveTimelineMessages(lmdb::txn &txn,
lmdb::dbi_put(
txn, eventsDb, lmdb::val(redaction->redacts), lmdb::val(event.dump()));
} else {
json obj = json::object();
obj["event"] = event;
obj["token"] = res.prev_batch;
lmdb::dbi_put(
txn,
db,
lmdb::val(std::to_string(event["origin_server_ts"].get<uint64_t>())),
lmdb::val(obj.dump()));
lmdb::dbi_put(txn,
eventsDb,
lmdb::val(event["event_id"].get<std::string>()),
@@ -1997,9 +2021,16 @@ Cache::saveTimelineMessages(lmdb::txn &txn,
++index;
json orderEntry = json::object();
orderEntry["event_id"] = event["event_id"];
if (first)
orderEntry["prev_batch"] = res.prev_batch;
nhlog::db()->debug("saving '{}'", orderEntry.dump());
lmdb::cursor_put(cursor.handle(),
lmdb::val(&index, sizeof(index)),
lmdb::val(first ? res.prev_batch : ""),
lmdb::val(orderEntry.dump()),
MDB_APPEND);
first = false;
}
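One note on the write path above: the order database is written with MDB_APPEND, which assumes each new key sorts after the current last key; that relies on index being seeded from the existing last entry (MDB_LAST) and on the order database comparing its int64 keys numerically (e.g. via MDB_INTEGERKEY, which this diff does not show). Storing prev_batch only on the first event of each saved batch keeps one pagination token per gap in the cached history. A hypothetical helper (not part of this commit) that reads that token back when older history needs to be requested from the server could look like:

// Illustrative only: fetch the pagination token stored with the oldest cached
// entry, i.e. the token to pass to /messages when requesting older history.
std::string
getOldestPrevBatchToken(lmdb::txn &txn, lmdb::dbi &orderDb)
{
        lmdb::val indexVal, val;
        auto cursor = lmdb::cursor::open(txn, orderDb);
        if (!cursor.get(indexVal, val, MDB_FIRST)) {
                cursor.close();
                return {};
        }
        auto entry = json::parse(std::string(val.data(), val.size()));
        cursor.close();

        if (entry.count("prev_batch") == 0)
                return {};
        return entry["prev_batch"].get<std::string>();
}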
@@ -2138,34 +2169,43 @@ Cache::getRoomIds(lmdb::txn &txn)
void
Cache::deleteOldMessages()
{
lmdb::val indexVal, val;
auto txn = lmdb::txn::begin(env_);
auto room_ids = getRoomIds(txn);
for (const auto &id : room_ids) {
auto msg_db = getMessagesDb(txn, id);
for (const auto &room_id : room_ids) {
auto orderDb = getEventOrderDb(txn, room_id);
auto eventsDb = getEventsDb(txn, room_id);
auto cursor = lmdb::cursor::open(txn, orderDb);
std::string ts, event;
uint64_t idx = 0;
const auto db_size = msg_db.size(txn);
if (db_size <= 3 * MAX_RESTORED_MESSAGES)
int64_t first, last;
if (cursor.get(indexVal, val, MDB_LAST)) {
last = *indexVal.data<int64_t>();
} else {
continue;
}
if (cursor.get(indexVal, val, MDB_FIRST)) {
first = *indexVal.data<int64_t>();
} else {
continue;
nhlog::db()->info("[{}] message count: {}", id, db_size);
auto cursor = lmdb::cursor::open(txn, msg_db);
while (cursor.get(ts, event, MDB_NEXT)) {
idx += 1;
if (idx > MAX_RESTORED_MESSAGES)
lmdb::cursor_del(cursor);
}
size_t message_count = static_cast<size_t>(last - first);
if (message_count < MAX_RESTORED_MESSAGES)
continue;
while (cursor.get(indexVal, val, MDB_NEXT) &&
message_count-- < MAX_RESTORED_MESSAGES) {
auto obj = json::parse(std::string(val.data(), val.size()));
if (obj.count("event_id") != 0)
lmdb::dbi_del(
txn, eventsDb, lmdb::val(obj["event_id"].get<std::string>()));
lmdb::cursor_del(cursor);
}
cursor.close();
nhlog::db()->info("[{}] updated message count: {}", id, msg_db.size(txn));
}
txn.commit();
}

View File

@@ -18,6 +18,7 @@
#pragma once
#include <limits>
#include <optional>
#include <QDateTime>
@@ -38,9 +39,6 @@
#include "CacheCryptoStructs.h"
#include "CacheStructs.h"
int
numeric_key_comparison(const MDB_val *a, const MDB_val *b);
class Cache : public QObject
{
Q_OBJECT
@@ -250,7 +248,16 @@ private:
const std::string &room_id,
const mtx::responses::Timeline &res);
mtx::responses::Timeline getTimelineMessages(lmdb::txn &txn, const std::string &room_id);
struct Messages
{
mtx::responses::Timeline timeline;
uint64_t next_index;
bool end_of_cache = false;
};
Messages getTimelineMessages(lmdb::txn &txn,
const std::string &room_id,
int64_t index = std::numeric_limits<int64_t>::max(),
bool forward = false);
//! Remove a room from the cache.
// void removeLeftRoom(lmdb::txn &txn, const std::string &room_id);
@@ -402,15 +409,6 @@ private:
return lmdb::dbi::open(txn, "pending_receipts", MDB_CREATE);
}
lmdb::dbi getMessagesDb(lmdb::txn &txn, const std::string &room_id)
{
auto db =
lmdb::dbi::open(txn, std::string(room_id + "/messages").c_str(), MDB_CREATE);
lmdb::dbi_set_compare(txn, db, numeric_key_comparison);
return db;
}
lmdb::dbi getEventsDb(lmdb::txn &txn, const std::string &room_id)
{
return lmdb::dbi::open(txn, std::string(room_id + "/events").c_str(), MDB_CREATE);