Move loaded history pointer to message view

When it was in the history manager, it wouldn't get unloaded when the room was unloaded.

Hopefully fixes #136
This commit is contained in:
Tulir Asokan 2020-05-10 02:28:32 +03:00
parent 4e2cbf1e4f
commit 2cebe3b5dc
5 changed files with 34 additions and 28 deletions

View File

@ -55,7 +55,7 @@ type MatrixContainer interface {
CreateRoom(req *mautrix.ReqCreateRoom) (*rooms.Room, error) CreateRoom(req *mautrix.ReqCreateRoom) (*rooms.Room, error)
FetchMembers(room *rooms.Room) error FetchMembers(room *rooms.Room) error
GetHistory(room *rooms.Room, limit int) ([]*muksevt.Event, error) GetHistory(room *rooms.Room, limit int, dbPointer uint64) ([]*muksevt.Event, uint64, error)
GetEvent(room *rooms.Room, eventID id.EventID) (*muksevt.Event, error) GetEvent(room *rooms.Room, eventID id.EventID) (*muksevt.Event, error)
GetRoom(roomID id.RoomID) *rooms.Room GetRoom(roomID id.RoomID) *rooms.Room
GetOrCreateRoom(roomID id.RoomID) *rooms.Room GetOrCreateRoom(roomID id.RoomID) *rooms.Room

View File

@ -26,10 +26,11 @@ import (
sync "github.com/sasha-s/go-deadlock" sync "github.com/sasha-s/go-deadlock"
bolt "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt"
"maunium.net/go/gomuks/matrix/muksevt"
"maunium.net/go/gomuks/matrix/rooms"
"maunium.net/go/mautrix/event" "maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id" "maunium.net/go/mautrix/id"
"maunium.net/go/gomuks/matrix/muksevt"
"maunium.net/go/gomuks/matrix/rooms"
) )
type HistoryManager struct { type HistoryManager struct {
@ -38,7 +39,6 @@ type HistoryManager struct {
db *bolt.DB db *bolt.DB
historyEndPtr map[*rooms.Room]uint64 historyEndPtr map[*rooms.Room]uint64
historyLoadPtr map[*rooms.Room]uint64
} }
var bucketRoomStreams = []byte("room_streams") var bucketRoomStreams = []byte("room_streams")
@ -50,7 +50,6 @@ const halfUint64 = ^uint64(0) >> 1
func NewHistoryManager(dbPath string) (*HistoryManager, error) { func NewHistoryManager(dbPath string) (*HistoryManager, error) {
hm := &HistoryManager{ hm := &HistoryManager{
historyEndPtr: make(map[*rooms.Room]uint64), historyEndPtr: make(map[*rooms.Room]uint64),
historyLoadPtr: make(map[*rooms.Room]uint64),
} }
db, err := bolt.Open(dbPath, 0600, &bolt.Options{ db, err := bolt.Open(dbPath, 0600, &bolt.Options{
Timeout: 1, Timeout: 1,
@ -142,18 +141,19 @@ func (hm *HistoryManager) Update(room *rooms.Room, eventID id.EventID, update fu
} }
func (hm *HistoryManager) Append(room *rooms.Room, events []*event.Event) ([]*muksevt.Event, error) { func (hm *HistoryManager) Append(room *rooms.Room, events []*event.Event) ([]*muksevt.Event, error) {
return hm.store(room, events, true) muksEvts, _, err := hm.store(room, events, true)
return muksEvts, err
} }
func (hm *HistoryManager) Prepend(room *rooms.Room, events []*event.Event) ([]*muksevt.Event, error) { func (hm *HistoryManager) Prepend(room *rooms.Room, events []*event.Event) ([]*muksevt.Event, uint64, error) {
return hm.store(room, events, false) return hm.store(room, events, false)
} }
func (hm *HistoryManager) store(room *rooms.Room, events []*event.Event, append bool) ([]*muksevt.Event, error) { func (hm *HistoryManager) store(room *rooms.Room, events []*event.Event, append bool) (newEvents []*muksevt.Event, newPtrStart uint64, err error) {
hm.Lock() hm.Lock()
defer hm.Unlock() defer hm.Unlock()
newEvents := make([]*muksevt.Event, len(events)) newEvents = make([]*muksevt.Event, len(events))
err := hm.db.Update(func(tx *bolt.Tx) error { err = hm.db.Update(func(tx *bolt.Tx) error {
streamPointers := tx.Bucket(bucketStreamPointers) streamPointers := tx.Bucket(bucketStreamPointers)
rid := []byte(room.ID) rid := []byte(room.ID)
stream, err := tx.Bucket(bucketRoomStreams).CreateBucketIfNotExists(rid) stream, err := tx.Bucket(bucketRoomStreams).CreateBucketIfNotExists(rid)
@ -205,6 +205,8 @@ func (hm *HistoryManager) store(room *rooms.Room, events []*event.Event, append
} }
} }
hm.historyEndPtr[room] = ptrStart + eventCount hm.historyEndPtr[room] = ptrStart + eventCount
// TODO this is not the correct value for newPtrStart, figure out what the f*ck is going on here
newPtrStart = ptrStart + eventCount
err := streamPointers.Put(rid, itob(ptrStart+eventCount)) err := streamPointers.Put(rid, itob(ptrStart+eventCount))
if err != nil { if err != nil {
return err return err
@ -213,10 +215,10 @@ func (hm *HistoryManager) store(room *rooms.Room, events []*event.Event, append
return nil return nil
}) })
return newEvents, err return
} }
func (hm *HistoryManager) Load(room *rooms.Room, num int) (events []*muksevt.Event, err error) { func (hm *HistoryManager) Load(room *rooms.Room, num int, ptrStart uint64) (events []*muksevt.Event, newPtrStart uint64, err error) {
hm.Lock() hm.Lock()
defer hm.Unlock() defer hm.Unlock()
err = hm.db.View(func(tx *bolt.Tx) error { err = hm.db.View(func(tx *bolt.Tx) error {
@ -224,8 +226,7 @@ func (hm *HistoryManager) Load(room *rooms.Room, num int) (events []*muksevt.Eve
if stream == nil { if stream == nil {
return nil return nil
} }
ptrStart, ok := hm.historyLoadPtr[room] if ptrStart == 0 {
if !ok {
ptrStart = stream.Sequence() + 1 ptrStart = stream.Sequence() + 1
} }
c := stream.Cursor() c := stream.Cursor()
@ -234,7 +235,7 @@ func (hm *HistoryManager) Load(room *rooms.Room, num int) (events []*muksevt.Eve
if k == nil || ptrStartFound >= ptrStart { if k == nil || ptrStartFound >= ptrStart {
return nil return nil
} }
hm.historyLoadPtr[room] = ptrStartFound - 1 newPtrStart = ptrStartFound
for ; k != nil && btoi(k) < ptrStart; k, v = c.Next() { for ; k != nil && btoi(k) < ptrStart; k, v = c.Next() {
evt, parseError := unmarshalEvent(v) evt, parseError := unmarshalEvent(v)
if parseError != nil { if parseError != nil {

View File

@ -966,18 +966,18 @@ func (c *Container) FetchMembers(room *rooms.Room) error {
} }
// GetHistory fetches room history. // GetHistory fetches room history.
func (c *Container) GetHistory(room *rooms.Room, limit int) ([]*muksevt.Event, error) { func (c *Container) GetHistory(room *rooms.Room, limit int, dbPointer uint64) ([]*muksevt.Event, uint64, error) {
events, err := c.history.Load(room, limit) events, newDBPointer, err := c.history.Load(room, limit, dbPointer)
if err != nil { if err != nil {
return nil, err return nil, dbPointer, err
} }
if len(events) > 0 { if len(events) > 0 {
debug.Printf("Loaded %d events for %s from local cache", len(events), room.ID) debug.Printf("Loaded %d events for %s from local cache", len(events), room.ID)
return events, nil return events, newDBPointer, nil
} }
resp, err := c.client.Messages(room.ID, room.PrevBatch, "", 'b', limit) resp, err := c.client.Messages(room.ID, room.PrevBatch, "", 'b', limit)
if err != nil { if err != nil {
return nil, err return nil, dbPointer, err
} }
debug.Printf("Loaded %d events for %s from server from %s to %s", len(resp.Chunk), room.ID, resp.Start, resp.End) debug.Printf("Loaded %d events for %s from server from %s to %s", len(resp.Chunk), room.ID, resp.Start, resp.End)
for i, evt := range resp.Chunk { for i, evt := range resp.Chunk {
@ -1002,13 +1002,14 @@ func (c *Container) GetHistory(room *rooms.Room, limit int) ([]*muksevt.Event, e
room.PrevBatch = resp.End room.PrevBatch = resp.End
c.config.Rooms.Put(room) c.config.Rooms.Put(room)
if len(resp.Chunk) == 0 { if len(resp.Chunk) == 0 {
return []*muksevt.Event{}, nil return []*muksevt.Event{}, dbPointer, nil
} }
events, err = c.history.Prepend(room, resp.Chunk) // TODO newDBPointer isn't accurate in this case yet, fix later
events, newDBPointer, err = c.history.Prepend(room, resp.Chunk)
if err != nil { if err != nil {
return nil, err return nil, dbPointer, err
} }
return events, nil return events, dbPointer, nil
} }
func (c *Container) GetEvent(room *rooms.Room, eventID id.EventID) (*muksevt.Event, error) { func (c *Container) GetEvent(room *rooms.Room, eventID id.EventID) (*muksevt.Event, error) {

View File

@ -51,6 +51,7 @@ type MessageView struct {
// Used for locking // Used for locking
loadingMessages int32 loadingMessages int32
historyLoadPtr uint64
_widestSender uint32 _widestSender uint32
_prevWidestSender uint32 _prevWidestSender uint32
@ -109,6 +110,7 @@ func (view *MessageView) Unload() {
view.ScrollOffset = 0 view.ScrollOffset = 0
view._widestSender = 5 view._widestSender = 5
view.prevMsgCount = -1 view.prevMsgCount = -1
view.historyLoadPtr = 0
view.messagesLock.Unlock() view.messagesLock.Unlock()
view.msgBufferLock.Unlock() view.msgBufferLock.Unlock()
view.messageIDLock.Unlock() view.messageIDLock.Unlock()

View File

@ -461,13 +461,15 @@ func (view *MainView) LoadHistory(roomID id.RoomID) {
// Update the "Loading more messages..." text // Update the "Loading more messages..." text
view.parent.Render() view.parent.Render()
history, err := view.matrix.GetHistory(roomView.Room, 50) history, newLoadPtr, err := view.matrix.GetHistory(roomView.Room, 50, msgView.historyLoadPtr)
if err != nil { if err != nil {
roomView.AddServiceMessage("Failed to fetch history") roomView.AddServiceMessage("Failed to fetch history")
debug.Print("Failed to fetch history for", roomView.Room.ID, err) debug.Print("Failed to fetch history for", roomView.Room.ID, err)
view.parent.Render() view.parent.Render()
return return
} }
//debug.Printf("Load pointer %d -> %d", msgView.historyLoadPtr, newLoadPtr)
msgView.historyLoadPtr = newLoadPtr
for _, evt := range history { for _, evt := range history {
roomView.AddHistoryEvent(evt) roomView.AddHistoryEvent(evt)
} }