Process different rooms in sync responses in goroutines

This commit is contained in:
Tulir Asokan 2020-04-19 15:57:49 +03:00
parent 5ee6aa72db
commit f668faa894
4 changed files with 80 additions and 31 deletions

View File

@ -277,3 +277,11 @@ func (config *Config) LoadRoom(_ id.RoomID) *mautrix.Room {
func (config *Config) GetRoom(roomID id.RoomID) *rooms.Room { func (config *Config) GetRoom(roomID id.RoomID) *rooms.Room {
return config.Rooms.GetOrCreate(roomID) return config.Rooms.GetOrCreate(roomID)
} }
func (config *Config) DisableUnloading() {
config.Rooms.DisableUnloading()
}
func (config *Config) EnableUnloading() {
config.Rooms.EnableUnloading()
}

View File

@ -213,13 +213,6 @@ func (room *Room) Unload() bool {
debug.Print("Unloading", room.ID) debug.Print("Unloading", room.ID)
room.Save() room.Save()
room.state = nil room.state = nil
room.topicCache = ""
room.CanonicalAliasCache = ""
room.firstMemberCache = nil
room.secondMemberCache = nil
room.memberCache = nil
room.exMemberCache = nil
room.replacedByCache = nil
if room.postUnload != nil { if room.postUnload != nil {
room.postUnload() room.postUnload()
} }

View File

@ -39,6 +39,7 @@ type RoomCache struct {
maxSize int maxSize int
maxAge int64 maxAge int64
getOwner func() id.UserID getOwner func() id.UserID
noUnload bool
Map map[id.RoomID]*Room Map map[id.RoomID]*Room
head *Room head *Room
@ -58,6 +59,14 @@ func NewRoomCache(listPath, directory string, maxSize int, maxAge int64, getOwne
} }
} }
func (cache *RoomCache) DisableUnloading() {
cache.noUnload = true
}
func (cache *RoomCache) EnableUnloading() {
cache.noUnload = false
}
func (cache *RoomCache) LoadList() error { func (cache *RoomCache) LoadList() error {
cache.Lock() cache.Lock()
defer cache.Unlock() defer cache.Unlock()
@ -160,6 +169,9 @@ func (cache *RoomCache) Touch(roomID id.RoomID) {
} }
func (cache *RoomCache) TouchNode(node *Room) { func (cache *RoomCache) TouchNode(node *Room) {
if cache.noUnload || node.touch + 2 > time.Now().Unix() {
return
}
cache.Lock() cache.Lock()
cache.touch(node) cache.touch(node)
cache.Unlock() cache.Unlock()
@ -200,6 +212,7 @@ func (cache *RoomCache) get(roomID id.RoomID) *Room {
} }
return nil return nil
} }
func (cache *RoomCache) Put(room *Room) { func (cache *RoomCache) Put(room *Room) {
cache.Lock() cache.Lock()
node := cache.get(room.ID) node := cache.get(room.ID)
@ -283,6 +296,9 @@ func (cache *RoomCache) ForceClean() {
} }
func (cache *RoomCache) clean(force bool) { func (cache *RoomCache) clean(force bool) {
if cache.noUnload && !force {
return
}
origSize := cache.size origSize := cache.size
maxTS := time.Now().Unix() - cache.maxAge maxTS := time.Now().Unix() - cache.maxAge
for cache.size > cache.maxSize { for cache.size > cache.maxSize {

View File

@ -20,6 +20,7 @@ package matrix
import ( import (
"fmt" "fmt"
"sync"
"time" "time"
"maunium.net/go/mautrix" "maunium.net/go/mautrix"
@ -33,6 +34,8 @@ import (
type SyncerSession interface { type SyncerSession interface {
GetRoom(id id.RoomID) *rooms.Room GetRoom(id id.RoomID) *rooms.Room
GetUserID() id.UserID GetUserID() id.UserID
DisableUnloading()
EnableUnloading()
} }
type EventSource int type EventSource int
@ -108,11 +111,43 @@ func NewGomuksSyncer(session SyncerSession) *GomuksSyncer {
// ProcessResponse processes a Matrix sync response. // ProcessResponse processes a Matrix sync response.
func (s *GomuksSyncer) ProcessResponse(res *mautrix.RespSync, since string) (err error) { func (s *GomuksSyncer) ProcessResponse(res *mautrix.RespSync, since string) (err error) {
if since == "" {
s.Session.DisableUnloading()
}
debug.Print("Received sync response") debug.Print("Received sync response")
s.processSyncEvents(nil, res.Presence.Events, EventSourcePresence) s.processSyncEvents(nil, res.Presence.Events, EventSourcePresence)
s.processSyncEvents(nil, res.AccountData.Events, EventSourceAccountData) s.processSyncEvents(nil, res.AccountData.Events, EventSourceAccountData)
wait := &sync.WaitGroup{}
wait.Add(len(res.Rooms.Join))
for roomID, roomData := range res.Rooms.Join { for roomID, roomData := range res.Rooms.Join {
go s.processJoinedRoom(roomID, roomData, wait)
}
wait.Add(len(res.Rooms.Invite))
for roomID, roomData := range res.Rooms.Invite {
go s.processInvitedRoom(roomID, roomData, wait)
}
wait.Add(len(res.Rooms.Leave))
for roomID, roomData := range res.Rooms.Leave {
go s.processLeftRoom(roomID, roomData, wait)
}
wait.Wait()
if since == "" && s.InitDoneCallback != nil {
s.InitDoneCallback()
s.Session.EnableUnloading()
}
s.FirstSyncDone = true
return
}
func (s *GomuksSyncer) processJoinedRoom(roomID id.RoomID, roomData mautrix.SyncJoinedRoom, wait *sync.WaitGroup) {
defer debug.Recover()
room := s.Session.GetRoom(roomID) room := s.Session.GetRoom(roomID)
room.UpdateSummary(roomData.Summary) room.UpdateSummary(roomData.Summary)
s.processSyncEvents(room, roomData.State.Events, EventSourceJoin|EventSourceState) s.processSyncEvents(room, roomData.State.Events, EventSourceJoin|EventSourceState)
@ -124,15 +159,19 @@ func (s *GomuksSyncer) ProcessResponse(res *mautrix.RespSync, since string) (err
room.PrevBatch = roomData.Timeline.PrevBatch room.PrevBatch = roomData.Timeline.PrevBatch
} }
room.LastPrevBatch = roomData.Timeline.PrevBatch room.LastPrevBatch = roomData.Timeline.PrevBatch
wait.Done()
} }
for roomID, roomData := range res.Rooms.Invite { func (s *GomuksSyncer) processInvitedRoom(roomID id.RoomID, roomData mautrix.SyncInvitedRoom, wait *sync.WaitGroup) {
defer debug.Recover()
room := s.Session.GetRoom(roomID) room := s.Session.GetRoom(roomID)
room.UpdateSummary(roomData.Summary) room.UpdateSummary(roomData.Summary)
s.processSyncEvents(room, roomData.State.Events, EventSourceInvite|EventSourceState) s.processSyncEvents(room, roomData.State.Events, EventSourceInvite|EventSourceState)
wait.Done()
} }
for roomID, roomData := range res.Rooms.Leave { func (s *GomuksSyncer) processLeftRoom(roomID id.RoomID, roomData mautrix.SyncLeftRoom, wait *sync.WaitGroup) {
defer debug.Recover()
room := s.Session.GetRoom(roomID) room := s.Session.GetRoom(roomID)
room.HasLeft = true room.HasLeft = true
room.UpdateSummary(roomData.Summary) room.UpdateSummary(roomData.Summary)
@ -143,14 +182,7 @@ func (s *GomuksSyncer) ProcessResponse(res *mautrix.RespSync, since string) (err
room.PrevBatch = roomData.Timeline.PrevBatch room.PrevBatch = roomData.Timeline.PrevBatch
} }
room.LastPrevBatch = roomData.Timeline.PrevBatch room.LastPrevBatch = roomData.Timeline.PrevBatch
} wait.Done()
if since == "" && s.InitDoneCallback != nil {
s.InitDoneCallback()
}
s.FirstSyncDone = true
return
} }
func (s *GomuksSyncer) processSyncEvents(room *rooms.Room, events []*event.Event, source EventSource) { func (s *GomuksSyncer) processSyncEvents(room *rooms.Room, events []*event.Event, source EventSource) {