Reenable sending messages

This commit is contained in:
Nicolas Werner 2020-07-18 17:43:49 +02:00
parent 9ae7d0dce3
commit 56ea89aa11
8 changed files with 340 additions and 347 deletions

View File

@ -2086,6 +2086,77 @@ Cache::isRoomMember(const std::string &user_id, const std::string &room_id)
return res;
}
void
Cache::savePendingMessage(const std::string &room_id,
const mtx::events::collections::TimelineEvent &message)
{
auto txn = lmdb::txn::begin(env_);
mtx::responses::Timeline timeline;
timeline.events.push_back(message.data);
saveTimelineMessages(txn, room_id, timeline);
auto pending = getPendingMessagesDb(txn, room_id);
int64_t now = QDateTime::currentMSecsSinceEpoch();
lmdb::dbi_put(txn,
pending,
lmdb::val(&now, sizeof(now)),
lmdb::val(mtx::accessors::event_id(message.data)));
txn.commit();
}
std::optional<mtx::events::collections::TimelineEvent>
Cache::firstPendingMessage(const std::string &room_id)
{
auto txn = lmdb::txn::begin(env_);
auto pending = getPendingMessagesDb(txn, room_id);
auto pendingCursor = lmdb::cursor::open(txn, pending);
lmdb::val tsIgnored, pendingTxn;
while (pendingCursor.get(tsIgnored, pendingTxn, MDB_NEXT)) {
auto eventsDb = getEventsDb(txn, room_id);
lmdb::val event;
if (!lmdb::dbi_get(txn, eventsDb, pendingTxn, event)) {
lmdb::dbi_del(txn, pending, tsIgnored, pendingTxn);
continue;
}
try {
mtx::events::collections::TimelineEvent te;
mtx::events::collections::from_json(
json::parse(std::string_view(event.data(), event.size())), te);
txn.commit();
return te;
} catch (std::exception &e) {
nhlog::db()->error("Failed to parse message from cache {}", e.what());
lmdb::dbi_del(txn, pending, tsIgnored, pendingTxn);
continue;
}
}
txn.commit();
return std::nullopt;
}
void
Cache::removePendingStatus(const std::string &room_id, const std::string &txn_id)
{
auto txn = lmdb::txn::begin(env_);
auto pending = getPendingMessagesDb(txn, room_id);
auto pendingCursor = lmdb::cursor::open(txn, pending);
lmdb::val tsIgnored, pendingTxn;
while (pendingCursor.get(tsIgnored, pendingTxn, MDB_NEXT)) {
if (std::string_view(pendingTxn.data(), pendingTxn.size()) == txn_id)
lmdb::cursor_del(pendingCursor);
}
txn.commit();
}
void
Cache::saveTimelineMessages(lmdb::txn &txn,
const std::string &room_id,
@ -2098,12 +2169,17 @@ Cache::saveTimelineMessages(lmdb::txn &txn,
auto relationsDb = getRelationsDb(txn, room_id);
auto orderDb = getEventOrderDb(txn, room_id);
auto evToOrderDb = getEventToOrderDb(txn, room_id);
auto msg2orderDb = getMessageToOrderDb(txn, room_id);
auto order2msgDb = getOrderToMessageDb(txn, room_id);
auto pending = getPendingMessagesDb(txn, room_id);
if (res.limited) {
lmdb::dbi_drop(txn, orderDb, false);
lmdb::dbi_drop(txn, evToOrderDb, false);
lmdb::dbi_drop(txn, msg2orderDb, false);
lmdb::dbi_drop(txn, order2msgDb, false);
lmdb::dbi_drop(txn, pending, true);
}
using namespace mtx::events;
@ -2124,9 +2200,55 @@ Cache::saveTimelineMessages(lmdb::txn &txn,
bool first = true;
for (const auto &e : res.events) {
auto event = mtx::accessors::serialize_event(e);
if (auto redaction =
std::get_if<mtx::events::RedactionEvent<mtx::events::msg::Redaction>>(&e)) {
auto event = mtx::accessors::serialize_event(e);
auto txn_id = mtx::accessors::transaction_id(e);
lmdb::val txn_order;
if (!txn_id.empty() &&
lmdb::dbi_get(txn, evToOrderDb, lmdb::val(txn_id), txn_order)) {
std::string event_id_val = event["event_id"].get<std::string>();
lmdb::val event_id = event_id_val;
lmdb::dbi_put(txn, eventsDb, event_id, lmdb::val(event.dump()));
lmdb::dbi_del(txn, eventsDb, lmdb::val(txn_id));
lmdb::val msg_txn_order;
if (lmdb::dbi_get(txn, msg2orderDb, lmdb::val(txn_id), msg_txn_order)) {
lmdb::dbi_put(txn, order2msgDb, msg_txn_order, event_id);
lmdb::dbi_put(txn, msg2orderDb, event_id, msg_txn_order);
lmdb::dbi_del(txn, msg2orderDb, lmdb::val(txn_id));
}
lmdb::dbi_put(txn, orderDb, txn_order, event_id);
lmdb::dbi_put(txn, evToOrderDb, event_id, txn_order);
lmdb::dbi_del(txn, evToOrderDb, lmdb::val(txn_id));
if (event.contains("content") &&
event["content"].contains("m.relates_to")) {
auto temp = event["content"]["m.relates_to"];
std::string relates_to = temp.contains("m.in_reply_to")
? temp["m.in_reply_to"]["event_id"]
: temp["event_id"];
if (!relates_to.empty()) {
lmdb::dbi_del(txn,
relationsDb,
lmdb::val(relates_to),
lmdb::val(txn_id));
lmdb::dbi_put(
txn, relationsDb, lmdb::val(relates_to), event_id);
}
}
auto pendingCursor = lmdb::cursor::open(txn, pending);
lmdb::val tsIgnored, pendingTxn;
while (pendingCursor.get(tsIgnored, pendingTxn, MDB_NEXT)) {
if (std::string_view(pendingTxn.data(), pendingTxn.size()) ==
txn_id)
lmdb::cursor_del(pendingCursor);
}
} else if (auto redaction =
std::get_if<mtx::events::RedactionEvent<mtx::events::msg::Redaction>>(
&e)) {
if (redaction->redacts.empty())
continue;
@ -2145,15 +2267,20 @@ Cache::saveTimelineMessages(lmdb::txn &txn,
txn, msg2orderDb, lmdb::val(redaction->event_id), oldIndex);
}
} else {
std::string event_id_val = event["event_id"].get<std::string>();
lmdb::val event_id = event_id_val;
std::string event_id_val = event.value("event_id", "");
if (event_id_val.empty()) {
nhlog::db()->error("Event without id!");
continue;
}
lmdb::val event_id = event_id_val;
lmdb::dbi_put(txn, eventsDb, event_id, lmdb::val(event.dump()));
++index;
json orderEntry = json::object();
orderEntry["event_id"] = event_id_val;
if (first)
if (first && !res.prev_batch.empty())
orderEntry["prev_batch"] = res.prev_batch;
first = false;
@ -2163,6 +2290,7 @@ Cache::saveTimelineMessages(lmdb::txn &txn,
lmdb::val(&index, sizeof(index)),
lmdb::val(orderEntry.dump()),
MDB_APPEND);
lmdb::dbi_put(txn, evToOrderDb, event_id, lmdb::val(&index, sizeof(index)));
// TODO(Nico): Allow blacklisting more event types in UI
if (event["type"] != "m.reaction" && event["type"] != "m.dummy") {

View File

@ -199,6 +199,11 @@ public:
std::string previousBatchToken(const std::string &room_id);
uint64_t saveOldMessages(const std::string &room_id, const mtx::responses::Messages &res);
void savePendingMessage(const std::string &room_id,
const mtx::events::collections::TimelineEvent &message);
std::optional<mtx::events::collections::TimelineEvent> firstPendingMessage(
const std::string &room_id);
void removePendingStatus(const std::string &room_id, const std::string &txn_id);
//! Remove old unused data.
void deleteOldMessages();
@ -439,6 +444,13 @@ private:
txn, std::string(room_id + "/event_order").c_str(), MDB_CREATE | MDB_INTEGERKEY);
}
// inverse of EventOrderDb
lmdb::dbi getEventToOrderDb(lmdb::txn &txn, const std::string &room_id)
{
return lmdb::dbi::open(
txn, std::string(room_id + "/event2order").c_str(), MDB_CREATE);
}
lmdb::dbi getMessageToOrderDb(lmdb::txn &txn, const std::string &room_id)
{
return lmdb::dbi::open(
@ -451,6 +463,12 @@ private:
txn, std::string(room_id + "/order2msg").c_str(), MDB_CREATE | MDB_INTEGERKEY);
}
lmdb::dbi getPendingMessagesDb(lmdb::txn &txn, const std::string &room_id)
{
return lmdb::dbi::open(
txn, std::string(room_id + "/pending").c_str(), MDB_CREATE | MDB_INTEGERKEY);
}
lmdb::dbi getRelationsDb(lmdb::txn &txn, const std::string &room_id)
{
return lmdb::dbi::open(

View File

@ -313,17 +313,15 @@ ChatPage::ChatPage(QSharedPointer<UserSettings> userSettings, QWidget *parent)
.toStdString();
member.membership = mtx::events::state::Membership::Join;
http::client()
->send_state_event<mtx::events::state::Member,
mtx::events::EventType::RoomMember>(
currentRoom().toStdString(),
http::client()->user_id().to_string(),
member,
[](mtx::responses::EventId, mtx::http::RequestErr err) {
if (err)
nhlog::net()->error("Failed to set room displayname: {}",
err->matrix_error.error);
});
http::client()->send_state_event(
currentRoom().toStdString(),
http::client()->user_id().to_string(),
member,
[](mtx::responses::EventId, mtx::http::RequestErr err) {
if (err)
nhlog::net()->error("Failed to set room displayname: {}",
err->matrix_error.error);
});
});
connect(

View File

@ -151,7 +151,7 @@ EditModal::applyClicked()
state::Name body;
body.name = newName.toStdString();
http::client()->send_state_event<state::Name, EventType::RoomName>(
http::client()->send_state_event(
roomId_.toStdString(),
body,
[proxy, newName](const mtx::responses::EventId &, mtx::http::RequestErr err) {
@ -169,7 +169,7 @@ EditModal::applyClicked()
state::Topic body;
body.topic = newTopic.toStdString();
http::client()->send_state_event<state::Topic, EventType::RoomTopic>(
http::client()->send_state_event(
roomId_.toStdString(),
body,
[proxy](const mtx::responses::EventId &, mtx::http::RequestErr err) {
@ -694,7 +694,7 @@ RoomSettings::updateAccessRules(const std::string &room_id,
startLoadingSpinner();
resetErrorLabel();
http::client()->send_state_event<state::JoinRules, EventType::RoomJoinRules>(
http::client()->send_state_event(
room_id,
join_rule,
[this, room_id, guest_access](const mtx::responses::EventId &,
@ -708,7 +708,7 @@ RoomSettings::updateAccessRules(const std::string &room_id,
return;
}
http::client()->send_state_event<state::GuestAccess, EventType::RoomGuestAccess>(
http::client()->send_state_event(
room_id,
guest_access,
[this](const mtx::responses::EventId &, mtx::http::RequestErr err) {
@ -843,7 +843,7 @@ RoomSettings::updateAvatar()
avatar_event.image_info.size = size;
avatar_event.url = res.content_uri;
http::client()->send_state_event<state::Avatar, EventType::RoomAvatar>(
http::client()->send_state_event(
room_id,
avatar_event,
[content = std::move(content), proxy = std::move(proxy)](

View File

@ -1,6 +1,7 @@
#include "EventStore.h"
#include <QThread>
#include <QTimer>
#include "Cache_p.h"
#include "EventAccessors.h"
@ -59,6 +60,104 @@ EventStore::EventStore(std::string room_id, QObject *)
}
},
Qt::QueuedConnection);
connect(this, &EventStore::processPending, this, [this]() {
if (!current_txn.empty()) {
nhlog::ui()->debug("Already processing {}", current_txn);
return;
}
auto event = cache::client()->firstPendingMessage(room_id_);
if (!event) {
nhlog::ui()->debug("No event to send");
return;
}
std::visit(
[this](auto e) {
auto txn_id = e.event_id;
this->current_txn = txn_id;
if (txn_id.empty() || txn_id[0] != 'm') {
nhlog::ui()->debug("Invalid txn id '{}'", txn_id);
cache::client()->removePendingStatus(room_id_, txn_id);
return;
}
if constexpr (mtx::events::message_content_to_type<decltype(e.content)> !=
mtx::events::EventType::Unsupported)
http::client()->send_room_message(
room_id_,
txn_id,
e.content,
[this, txn_id](const mtx::responses::EventId &,
mtx::http::RequestErr err) {
if (err) {
const int status_code =
static_cast<int>(err->status_code);
nhlog::net()->warn(
"[{}] failed to send message: {} {}",
txn_id,
err->matrix_error.error,
status_code);
emit messageFailed(txn_id);
return;
}
emit messageSent(txn_id);
});
},
event->data);
});
connect(
this,
&EventStore::messageFailed,
this,
[this](std::string txn_id) {
if (current_txn == txn_id) {
current_txn_error_count++;
if (current_txn_error_count > 10) {
nhlog::ui()->debug("failing txn id '{}'", txn_id);
cache::client()->removePendingStatus(room_id_, txn_id);
current_txn_error_count = 0;
}
}
QTimer::singleShot(1000, this, [this]() {
nhlog::ui()->debug("timeout");
this->current_txn = "";
emit processPending();
});
},
Qt::QueuedConnection);
connect(
this,
&EventStore::messageSent,
this,
[this](std::string txn_id) {
nhlog::ui()->debug("sent {}", txn_id);
cache::client()->removePendingStatus(room_id_, txn_id);
this->current_txn = "";
this->current_txn_error_count = 0;
emit processPending();
},
Qt::QueuedConnection);
}
void
EventStore::addPending(mtx::events::collections::TimelineEvents event)
{
if (this->thread() != QThread::currentThread())
nhlog::db()->warn("{} called from a different thread!", __func__);
cache::client()->savePendingMessage(this->room_id_, {event});
mtx::responses::Timeline events;
events.limited = false;
events.events.emplace_back(event);
handleSync(events);
emit processPending();
}
void
@ -102,6 +201,16 @@ EventStore::handleSync(const mtx::responses::Timeline &events)
if (idx)
emit dataChanged(toExternalIdx(*idx), toExternalIdx(*idx));
}
if (auto txn_id = mtx::accessors::transaction_id(event); !txn_id.empty()) {
auto idx = cache::client()->getTimelineIndex(
room_id_, mtx::accessors::event_id(event));
if (idx) {
Index index{room_id_, *idx};
events_.remove(index);
emit dataChanged(toExternalIdx(*idx), toExternalIdx(*idx));
}
}
}
}

View File

@ -90,6 +90,13 @@ signals:
void oldMessagesRetrieved(const mtx::responses::Messages &);
void fetchedMore();
void processPending();
void messageSent(std::string txn_id);
void messageFailed(std::string txn_id);
public slots:
void addPending(mtx::events::collections::TimelineEvents event);
private:
mtx::events::collections::TimelineEvents *decryptEvent(
const IdIndex &idx,
@ -103,4 +110,7 @@ private:
static QCache<IdIndex, mtx::events::collections::TimelineEvents> decryptedEvents_;
static QCache<Index, mtx::events::collections::TimelineEvents> events_;
static QCache<IdIndex, mtx::events::collections::TimelineEvents> events_by_id_;
std::string current_txn;
int current_txn_error_count = 0;
};

View File

@ -145,67 +145,6 @@ TimelineModel::TimelineModel(TimelineViewManager *manager, QString room_id, QObj
, room_id_(room_id)
, manager_(manager)
{
connect(this,
&TimelineModel::oldMessagesRetrieved,
this,
&TimelineModel::addBackwardsEvents,
Qt::QueuedConnection);
connect(
this,
&TimelineModel::messageFailed,
this,
[this](QString txn_id) {
nhlog::ui()->error("Failed to send {}, retrying", txn_id.toStdString());
QTimer::singleShot(5000, this, [this]() { emit nextPendingMessage(); });
},
Qt::QueuedConnection);
connect(
this,
&TimelineModel::messageSent,
this,
[this](QString txn_id, QString event_id) {
pending.removeOne(txn_id);
(void)event_id;
// auto ev = events.value(txn_id);
// if (auto reaction =
// std::get_if<mtx::events::RoomEvent<mtx::events::msg::Reaction>>(&ev)) {
// QString reactedTo =
// QString::fromStdString(reaction->content.relates_to.event_id);
// auto &rModel = reactions[reactedTo];
// rModel.removeReaction(*reaction);
// auto rCopy = *reaction;
// rCopy.event_id = event_id.toStdString();
// rModel.addReaction(room_id_.toStdString(), rCopy);
//}
// int idx = idToIndex(txn_id);
// if (idx < 0) {
// // transaction already received via sync
// return;
//}
// eventOrder[idx] = event_id;
// ev = std::visit(
// [event_id](const auto &e) -> mtx::events::collections::TimelineEvents {
// auto eventCopy = e;
// eventCopy.event_id = event_id.toStdString();
// return eventCopy;
// },
// ev);
// events.remove(txn_id);
// events.insert(event_id, ev);
//// mark our messages as read
// readEvent(event_id.toStdString());
// emit dataChanged(index(idx, 0), index(idx, 0));
if (pending.size() > 0)
emit nextPendingMessage();
},
Qt::QueuedConnection);
connect(
this,
&TimelineModel::redactionFailed,
@ -213,16 +152,12 @@ TimelineModel::TimelineModel(TimelineViewManager *manager, QString room_id, QObj
[](const QString &msg) { emit ChatPage::instance()->showNotification(msg); },
Qt::QueuedConnection);
connect(this,
&TimelineModel::nextPendingMessage,
this,
&TimelineModel::processOnePendingMessage,
Qt::QueuedConnection);
connect(this,
&TimelineModel::newMessageToSend,
this,
&TimelineModel::addPendingMessage,
Qt::QueuedConnection);
connect(this, &TimelineModel::addPendingMessageToStore, &events, &EventStore::addPending);
connect(
&events,
@ -296,7 +231,7 @@ int
TimelineModel::rowCount(const QModelIndex &parent) const
{
Q_UNUSED(parent);
return this->events.size() + static_cast<int>(pending.size());
return this->events.size();
}
QVariantMap
@ -410,7 +345,7 @@ TimelineModel::data(const mtx::events::collections::TimelineEvents &event, int r
// only show read receipts for messages not from us
if (acc::sender(event) != http::client()->user_id().to_string())
return qml_mtx_events::Empty;
else if (pending.contains(id))
else if (!id.isEmpty() && id[0] == "m")
return qml_mtx_events::Sent;
else if (read.contains(id) || containsOthers(cache::readReceipts(id, room_id_)))
return qml_mtx_events::Read;
@ -428,11 +363,7 @@ TimelineModel::data(const mtx::events::collections::TimelineEvents &event, int r
case ReplyTo:
return QVariant(QString::fromStdString(in_reply_to_event(event)));
case Reactions: {
auto id = QString::fromStdString(event_id(event));
if (reactions.count(id))
return QVariant::fromValue((QObject *)&reactions.at(id));
else
return {};
return {};
}
case RoomId:
return QVariant(room_id_);
@ -561,16 +492,9 @@ TimelineModel::fetchMore(const QModelIndex &)
void
TimelineModel::addEvents(const mtx::responses::Timeline &timeline)
{
if (isInitialSync) {
prev_batch_token_ = QString::fromStdString(timeline.prev_batch);
isInitialSync = false;
}
if (timeline.events.empty())
return;
internalAddEvents(timeline.events);
events.handleSync(timeline);
if (!timeline.events.empty())
@ -644,56 +568,6 @@ TimelineModel::updateLastMessage()
}
}
void
TimelineModel::internalAddEvents(
const std::vector<mtx::events::collections::TimelineEvents> &timeline)
{
for (auto e : timeline) {
QString id = QString::fromStdString(mtx::accessors::event_id(e));
if (auto redaction =
std::get_if<mtx::events::RedactionEvent<mtx::events::msg::Redaction>>(&e)) {
QString redacts = QString::fromStdString(redaction->redacts);
auto event = events.event(redaction->redacts, redaction->event_id);
if (!event)
continue;
if (auto reaction =
std::get_if<mtx::events::RoomEvent<mtx::events::msg::Reaction>>(
event)) {
QString reactedTo =
QString::fromStdString(reaction->content.relates_to.event_id);
reactions[reactedTo].removeReaction(*reaction);
int idx = idToIndex(reactedTo);
if (idx >= 0)
emit dataChanged(index(idx, 0), index(idx, 0));
}
continue; // don't insert redaction into timeline
}
if (auto reaction =
std::get_if<mtx::events::RoomEvent<mtx::events::msg::Reaction>>(&e)) {
QString reactedTo =
QString::fromStdString(reaction->content.relates_to.event_id);
// // remove local echo
// if (!txid.isEmpty()) {
// auto rCopy = *reaction;
// rCopy.event_id = txid.toStdString();
// reactions[reactedTo].removeReaction(rCopy);
// }
reactions[reactedTo].addReaction(room_id_.toStdString(), *reaction);
int idx = idToIndex(reactedTo);
if (idx >= 0)
emit dataChanged(index(idx, 0), index(idx, 0));
continue; // don't insert reaction into timeline
}
}
}
void
TimelineModel::setCurrentIndex(int index)
{
@ -701,7 +575,7 @@ TimelineModel::setCurrentIndex(int index)
currentId = indexToId(index);
emit currentIndexChanged(index);
if ((oldIndex > index || oldIndex == -1) && !pending.contains(currentId) &&
if ((oldIndex > index || oldIndex == -1) && !currentId.startsWith("m") &&
ChatPage::instance()->isActiveWindow()) {
readEvent(currentId.toStdString());
}
@ -719,28 +593,6 @@ TimelineModel::readEvent(const std::string &id)
});
}
void
TimelineModel::addBackwardsEvents(const mtx::responses::Messages &msgs)
{
(void)msgs;
// std::vector<QString> ids = internalAddEvents(msgs.chunk);
// if (!ids.empty()) {
// beginInsertRows(QModelIndex(),
// static_cast<int>(this->eventOrder.size()),
// static_cast<int>(this->eventOrder.size() + ids.size() - 1));
// this->eventOrder.insert(this->eventOrder.end(), ids.begin(), ids.end());
// endInsertRows();
//}
// prev_batch_token_ = QString::fromStdString(msgs.end);
// if (ids.empty() && !msgs.chunk.empty()) {
// // no visible events fetched, prevent loading from stopping
// fetchMore(QModelIndex());
//}
}
QString
TimelineModel::displayName(QString id) const
{
@ -902,7 +754,7 @@ TimelineModel::markEventsAsRead(const std::vector<QString> &event_ids)
}
void
TimelineModel::sendEncryptedMessage(const std::string &txn_id, nlohmann::json content)
TimelineModel::sendEncryptedMessage(const std::string txn_id, nlohmann::json content)
{
const auto room_id = room_id_.toStdString();
@ -914,28 +766,15 @@ TimelineModel::sendEncryptedMessage(const std::string &txn_id, nlohmann::json co
try {
// Check if we have already an outbound megolm session then we can use.
if (cache::outboundMegolmSessionExists(room_id)) {
auto data =
mtx::events::EncryptedEvent<mtx::events::msg::Encrypted> event;
event.content =
olm::encrypt_group_message(room_id, http::client()->device_id(), doc);
event.event_id = txn_id;
event.room_id = room_id;
event.sender = http::client()->user_id().to_string();
event.type = mtx::events::EventType::RoomEncrypted;
http::client()->send_room_message<msg::Encrypted, EventType::RoomEncrypted>(
room_id,
txn_id,
data,
[this, txn_id](const mtx::responses::EventId &res,
mtx::http::RequestErr err) {
if (err) {
const int status_code =
static_cast<int>(err->status_code);
nhlog::net()->warn("[{}] failed to send message: {} {}",
txn_id,
err->matrix_error.error,
status_code);
emit messageFailed(QString::fromStdString(txn_id));
}
emit messageSent(
QString::fromStdString(txn_id),
QString::fromStdString(res.event_id.to_string()));
});
emit this->addPendingMessageToStore(event);
return;
}
@ -964,40 +803,24 @@ TimelineModel::sendEncryptedMessage(const std::string &txn_id, nlohmann::json co
const auto members = cache::roomMembers(room_id);
nhlog::ui()->info("retrieved {} members for {}", members.size(), room_id);
auto keeper =
std::make_shared<StateKeeper>([megolm_payload, room_id, doc, txn_id, this]() {
try {
auto data = olm::encrypt_group_message(
room_id, http::client()->device_id(), doc);
auto keeper = std::make_shared<StateKeeper>([room_id, doc, txn_id, this]() {
try {
mtx::events::EncryptedEvent<mtx::events::msg::Encrypted> event;
event.content = olm::encrypt_group_message(
room_id, http::client()->device_id(), doc);
event.event_id = txn_id;
event.room_id = room_id;
event.sender = http::client()->user_id().to_string();
event.type = mtx::events::EventType::RoomEncrypted;
http::client()
->send_room_message<msg::Encrypted, EventType::RoomEncrypted>(
room_id,
txn_id,
data,
[this, txn_id](const mtx::responses::EventId &res,
mtx::http::RequestErr err) {
if (err) {
const int status_code =
static_cast<int>(err->status_code);
nhlog::net()->warn(
"[{}] failed to send message: {} {}",
txn_id,
err->matrix_error.error,
status_code);
emit messageFailed(
QString::fromStdString(txn_id));
}
emit messageSent(
QString::fromStdString(txn_id),
QString::fromStdString(res.event_id.to_string()));
});
} catch (const lmdb::error &e) {
nhlog::db()->critical(
"failed to save megolm outbound session: {}", e.what());
emit messageFailed(QString::fromStdString(txn_id));
}
});
emit this->addPendingMessageToStore(event);
} catch (const lmdb::error &e) {
nhlog::db()->critical("failed to save megolm outbound session: {}",
e.what());
emit ChatPage::instance()->showNotification(
tr("Failed to encrypt event, sending aborted!"));
}
});
mtx::requests::QueryKeys req;
for (const auto &member : members)
@ -1011,8 +834,8 @@ TimelineModel::sendEncryptedMessage(const std::string &txn_id, nlohmann::json co
nhlog::net()->warn("failed to query device keys: {} {}",
err->matrix_error.error,
static_cast<int>(err->status_code));
// TODO: Mark the event as failed. Communicate with the UI.
emit messageFailed(QString::fromStdString(txn_id));
emit ChatPage::instance()->showNotification(
tr("Failed to encrypt event, sending aborted!"));
return;
}
@ -1112,11 +935,13 @@ TimelineModel::sendEncryptedMessage(const std::string &txn_id, nlohmann::json co
} catch (const lmdb::error &e) {
nhlog::db()->critical(
"failed to open outbound megolm session ({}): {}", room_id, e.what());
emit messageFailed(QString::fromStdString(txn_id));
emit ChatPage::instance()->showNotification(
tr("Failed to encrypt event, sending aborted!"));
} catch (const mtx::crypto::olm_exception &e) {
nhlog::crypto()->critical(
"failed to open outbound megolm session ({}): {}", room_id, e.what());
emit messageFailed(QString::fromStdString(txn_id));
emit ChatPage::instance()->showNotification(
tr("Failed to encrypt event, sending aborted!"));
}
}
@ -1208,9 +1033,8 @@ TimelineModel::handleClaimedKeys(std::shared_ptr<StateKeeper> keeper,
struct SendMessageVisitor
{
SendMessageVisitor(const QString &txn_id, TimelineModel *model)
: txn_id_qstr_(txn_id)
, model_(model)
explicit SendMessageVisitor(TimelineModel *model)
: model_(model)
{}
// Do-nothing operator for all unhandled events
@ -1228,29 +1052,9 @@ struct SendMessageVisitor
if (encInfo)
emit model_->newEncryptedImage(encInfo.value());
model_->sendEncryptedMessage(txn_id_qstr_.toStdString(),
nlohmann::json(msg.content));
model_->sendEncryptedMessage(msg.event_id, nlohmann::json(msg.content));
} else {
QString txn_id_qstr = txn_id_qstr_;
TimelineModel *model = model_;
http::client()->send_room_message<T, mtx::events::EventType::RoomMessage>(
model->room_id_.toStdString(),
txn_id_qstr.toStdString(),
msg.content,
[txn_id_qstr, model](const mtx::responses::EventId &res,
mtx::http::RequestErr err) {
if (err) {
const int status_code =
static_cast<int>(err->status_code);
nhlog::net()->warn("[{}] failed to send message: {} {}",
txn_id_qstr.toStdString(),
err->matrix_error.error,
status_code);
emit model->messageFailed(txn_id_qstr);
}
emit model->messageSent(
txn_id_qstr, QString::fromStdString(res.event_id.to_string()));
});
emit model_->addPendingMessageToStore(msg);
}
}
@ -1260,71 +1064,26 @@ struct SendMessageVisitor
// cannot handle it correctly. See the MSC for more details:
// https://github.com/matrix-org/matrix-doc/blob/matthew/msc1849/proposals/1849-aggregations.md#end-to-end-encryption
void operator()(const mtx::events::RoomEvent<mtx::events::msg::Reaction> &msg)
{
QString txn_id_qstr = txn_id_qstr_;
TimelineModel *model = model_;
http::client()
->send_room_message<mtx::events::msg::Reaction, mtx::events::EventType::Reaction>(
model->room_id_.toStdString(),
txn_id_qstr.toStdString(),
msg.content,
[txn_id_qstr, model](const mtx::responses::EventId &res,
mtx::http::RequestErr err) {
if (err) {
const int status_code = static_cast<int>(err->status_code);
nhlog::net()->warn("[{}] failed to send message: {} {}",
txn_id_qstr.toStdString(),
err->matrix_error.error,
status_code);
emit model->messageFailed(txn_id_qstr);
}
emit model->messageSent(
txn_id_qstr, QString::fromStdString(res.event_id.to_string()));
});
emit model_->addPendingMessageToStore(msg);
}
QString txn_id_qstr_;
TimelineModel *model_;
};
void
TimelineModel::processOnePendingMessage()
{
// if (pending.isEmpty())
// return;
// QString txn_id_qstr = pending.first();
// auto event = events.value(txn_id_qstr);
// std::visit(SendMessageVisitor{txn_id_qstr, this}, event);
}
void
TimelineModel::addPendingMessage(mtx::events::collections::TimelineEvents event)
{
(void)event;
// std::visit(
// [](auto &msg) {
// msg.type = mtx::events::EventType::RoomMessage;
// msg.event_id = http::client()->generate_txn_id();
// msg.sender = http::client()->user_id().to_string();
// msg.origin_server_ts = QDateTime::currentMSecsSinceEpoch();
// },
// event);
std::visit(
[](auto &msg) {
msg.type = mtx::events::EventType::RoomMessage;
msg.event_id = "m" + http::client()->generate_txn_id();
msg.sender = http::client()->user_id().to_string();
msg.origin_server_ts = QDateTime::currentMSecsSinceEpoch();
},
event);
// internalAddEvents({event});
// QString txn_id_qstr = QString::fromStdString(mtx::accessors::event_id(event));
// pending.push_back(txn_id_qstr);
// if (!std::get_if<mtx::events::RoomEvent<mtx::events::msg::Reaction>>(&event)) {
// beginInsertRows(QModelIndex(), 0, 0);
// this->eventOrder.insert(this->eventOrder.begin(), txn_id_qstr);
// endInsertRows();
//}
// updateLastMessage();
// emit nextPendingMessage();
std::visit(SendMessageVisitor{this}, event);
}
bool
@ -1647,24 +1406,7 @@ TimelineModel::formatMemberEvent(QString id)
if (!event->unsigned_data.replaces_state.empty()) {
auto tempPrevEvent =
events.event(event->unsigned_data.replaces_state, event->event_id);
if (!tempPrevEvent) {
http::client()->get_event(
this->room_id_.toStdString(),
event->unsigned_data.replaces_state,
[this, id, prevEventId = event->unsigned_data.replaces_state](
const mtx::events::collections::TimelineEvents &timeline,
mtx::http::RequestErr err) {
if (err) {
nhlog::net()->error(
"Failed to retrieve event with id {}, which was "
"requested to show the membership for event {}",
prevEventId,
id.toStdString());
return;
}
emit eventFetched(id, timeline);
});
} else {
if (tempPrevEvent) {
prevEvent =
std::get_if<mtx::events::StateEvent<mtx::events::state::Member>>(
tempPrevEvent);

View File

@ -236,31 +236,23 @@ public slots:
void setDecryptDescription(bool decrypt) { decryptDescription = decrypt; }
private slots:
// Add old events at the top of the timeline.
void addBackwardsEvents(const mtx::responses::Messages &msgs);
void processOnePendingMessage();
void addPendingMessage(mtx::events::collections::TimelineEvents event);
signals:
void oldMessagesRetrieved(const mtx::responses::Messages &res);
void messageFailed(QString txn_id);
void messageSent(QString txn_id, QString event_id);
void currentIndexChanged(int index);
void redactionFailed(QString id);
void eventRedacted(QString id);
void nextPendingMessage();
void newMessageToSend(mtx::events::collections::TimelineEvents event);
void mediaCached(QString mxcUrl, QString cacheUrl);
void newEncryptedImage(mtx::crypto::EncryptedFile encryptionInfo);
void eventFetched(QString requestingEvent, mtx::events::collections::TimelineEvents event);
void typingUsersChanged(std::vector<QString> users);
void replyChanged(QString reply);
void paginationInProgressChanged(const bool);
void newMessageToSend(mtx::events::collections::TimelineEvents event);
void addPendingMessageToStore(mtx::events::collections::TimelineEvents event);
private:
void internalAddEvents(
const std::vector<mtx::events::collections::TimelineEvents> &timeline);
void sendEncryptedMessage(const std::string &txn_id, nlohmann::json content);
void sendEncryptedMessage(const std::string txn_id, nlohmann::json content);
void handleClaimedKeys(std::shared_ptr<StateKeeper> keeper,
const std::map<std::string, std::string> &room_key,
const std::map<std::string, DevicePublicKeys> &pks,
@ -272,15 +264,11 @@ private:
void setPaginationInProgress(const bool paginationInProgress);
QSet<QString> read;
QList<QString> pending;
std::map<QString, ReactionsModel> reactions;
mutable EventStore events;
QString room_id_;
QString prev_batch_token_;
bool isInitialSync = true;
bool decryptDescription = true;
bool m_paginationInProgress = false;