From f668faa8940ba02e796e016ff27f2e4620b49a29 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Sun, 19 Apr 2020 15:57:49 +0300 Subject: [PATCH] Process different rooms in sync responses in goroutines --- config/config.go | 8 ++++ matrix/rooms/room.go | 7 ---- matrix/rooms/roomcache.go | 16 ++++++++ matrix/sync.go | 80 +++++++++++++++++++++++++++------------ 4 files changed, 80 insertions(+), 31 deletions(-) diff --git a/config/config.go b/config/config.go index d85e243..37058cd 100644 --- a/config/config.go +++ b/config/config.go @@ -277,3 +277,11 @@ func (config *Config) LoadRoom(_ id.RoomID) *mautrix.Room { func (config *Config) GetRoom(roomID id.RoomID) *rooms.Room { return config.Rooms.GetOrCreate(roomID) } + +func (config *Config) DisableUnloading() { + config.Rooms.DisableUnloading() +} + +func (config *Config) EnableUnloading() { + config.Rooms.EnableUnloading() +} diff --git a/matrix/rooms/room.go b/matrix/rooms/room.go index 87e63f0..df7160c 100644 --- a/matrix/rooms/room.go +++ b/matrix/rooms/room.go @@ -213,13 +213,6 @@ func (room *Room) Unload() bool { debug.Print("Unloading", room.ID) room.Save() room.state = nil - room.topicCache = "" - room.CanonicalAliasCache = "" - room.firstMemberCache = nil - room.secondMemberCache = nil - room.memberCache = nil - room.exMemberCache = nil - room.replacedByCache = nil if room.postUnload != nil { room.postUnload() } diff --git a/matrix/rooms/roomcache.go b/matrix/rooms/roomcache.go index d442734..9da1922 100644 --- a/matrix/rooms/roomcache.go +++ b/matrix/rooms/roomcache.go @@ -39,6 +39,7 @@ type RoomCache struct { maxSize int maxAge int64 getOwner func() id.UserID + noUnload bool Map map[id.RoomID]*Room head *Room @@ -58,6 +59,14 @@ func NewRoomCache(listPath, directory string, maxSize int, maxAge int64, getOwne } } +func (cache *RoomCache) DisableUnloading() { + cache.noUnload = true +} + +func (cache *RoomCache) EnableUnloading() { + cache.noUnload = false +} + func (cache *RoomCache) LoadList() error { cache.Lock() defer cache.Unlock() @@ -160,6 +169,9 @@ func (cache *RoomCache) Touch(roomID id.RoomID) { } func (cache *RoomCache) TouchNode(node *Room) { + if cache.noUnload || node.touch + 2 > time.Now().Unix() { + return + } cache.Lock() cache.touch(node) cache.Unlock() @@ -200,6 +212,7 @@ func (cache *RoomCache) get(roomID id.RoomID) *Room { } return nil } + func (cache *RoomCache) Put(room *Room) { cache.Lock() node := cache.get(room.ID) @@ -283,6 +296,9 @@ func (cache *RoomCache) ForceClean() { } func (cache *RoomCache) clean(force bool) { + if cache.noUnload && !force { + return + } origSize := cache.size maxTS := time.Now().Unix() - cache.maxAge for cache.size > cache.maxSize { diff --git a/matrix/sync.go b/matrix/sync.go index cd6334f..ed9d137 100644 --- a/matrix/sync.go +++ b/matrix/sync.go @@ -20,6 +20,7 @@ package matrix import ( "fmt" + "sync" "time" "maunium.net/go/mautrix" @@ -33,6 +34,8 @@ import ( type SyncerSession interface { GetRoom(id id.RoomID) *rooms.Room GetUserID() id.UserID + DisableUnloading() + EnableUnloading() } type EventSource int @@ -108,51 +111,80 @@ func NewGomuksSyncer(session SyncerSession) *GomuksSyncer { // ProcessResponse processes a Matrix sync response. func (s *GomuksSyncer) ProcessResponse(res *mautrix.RespSync, since string) (err error) { + if since == "" { + s.Session.DisableUnloading() + } debug.Print("Received sync response") s.processSyncEvents(nil, res.Presence.Events, EventSourcePresence) s.processSyncEvents(nil, res.AccountData.Events, EventSourceAccountData) + wait := &sync.WaitGroup{} + + wait.Add(len(res.Rooms.Join)) for roomID, roomData := range res.Rooms.Join { - room := s.Session.GetRoom(roomID) - room.UpdateSummary(roomData.Summary) - s.processSyncEvents(room, roomData.State.Events, EventSourceJoin|EventSourceState) - s.processSyncEvents(room, roomData.Timeline.Events, EventSourceJoin|EventSourceTimeline) - s.processSyncEvents(room, roomData.Ephemeral.Events, EventSourceJoin|EventSourceEphemeral) - s.processSyncEvents(room, roomData.AccountData.Events, EventSourceJoin|EventSourceAccountData) - - if len(room.PrevBatch) == 0 { - room.PrevBatch = roomData.Timeline.PrevBatch - } - room.LastPrevBatch = roomData.Timeline.PrevBatch + go s.processJoinedRoom(roomID, roomData, wait) } + wait.Add(len(res.Rooms.Invite)) for roomID, roomData := range res.Rooms.Invite { - room := s.Session.GetRoom(roomID) - room.UpdateSummary(roomData.Summary) - s.processSyncEvents(room, roomData.State.Events, EventSourceInvite|EventSourceState) + go s.processInvitedRoom(roomID, roomData, wait) } + wait.Add(len(res.Rooms.Leave)) for roomID, roomData := range res.Rooms.Leave { - room := s.Session.GetRoom(roomID) - room.HasLeft = true - room.UpdateSummary(roomData.Summary) - s.processSyncEvents(room, roomData.State.Events, EventSourceLeave|EventSourceState) - s.processSyncEvents(room, roomData.Timeline.Events, EventSourceLeave|EventSourceTimeline) - - if len(room.PrevBatch) == 0 { - room.PrevBatch = roomData.Timeline.PrevBatch - } - room.LastPrevBatch = roomData.Timeline.PrevBatch + go s.processLeftRoom(roomID, roomData, wait) } + wait.Wait() + if since == "" && s.InitDoneCallback != nil { s.InitDoneCallback() + s.Session.EnableUnloading() } s.FirstSyncDone = true return } +func (s *GomuksSyncer) processJoinedRoom(roomID id.RoomID, roomData mautrix.SyncJoinedRoom, wait *sync.WaitGroup) { + defer debug.Recover() + room := s.Session.GetRoom(roomID) + room.UpdateSummary(roomData.Summary) + s.processSyncEvents(room, roomData.State.Events, EventSourceJoin|EventSourceState) + s.processSyncEvents(room, roomData.Timeline.Events, EventSourceJoin|EventSourceTimeline) + s.processSyncEvents(room, roomData.Ephemeral.Events, EventSourceJoin|EventSourceEphemeral) + s.processSyncEvents(room, roomData.AccountData.Events, EventSourceJoin|EventSourceAccountData) + + if len(room.PrevBatch) == 0 { + room.PrevBatch = roomData.Timeline.PrevBatch + } + room.LastPrevBatch = roomData.Timeline.PrevBatch + wait.Done() +} + +func (s *GomuksSyncer) processInvitedRoom(roomID id.RoomID, roomData mautrix.SyncInvitedRoom, wait *sync.WaitGroup) { + defer debug.Recover() + room := s.Session.GetRoom(roomID) + room.UpdateSummary(roomData.Summary) + s.processSyncEvents(room, roomData.State.Events, EventSourceInvite|EventSourceState) + wait.Done() +} + +func (s *GomuksSyncer) processLeftRoom(roomID id.RoomID, roomData mautrix.SyncLeftRoom, wait *sync.WaitGroup) { + defer debug.Recover() + room := s.Session.GetRoom(roomID) + room.HasLeft = true + room.UpdateSummary(roomData.Summary) + s.processSyncEvents(room, roomData.State.Events, EventSourceLeave|EventSourceState) + s.processSyncEvents(room, roomData.Timeline.Events, EventSourceLeave|EventSourceTimeline) + + if len(room.PrevBatch) == 0 { + room.PrevBatch = roomData.Timeline.PrevBatch + } + room.LastPrevBatch = roomData.Timeline.PrevBatch + wait.Done() +} + func (s *GomuksSyncer) processSyncEvents(room *rooms.Room, events []*event.Event, source EventSource) { for _, evt := range events { s.processSyncEvent(room, evt, source)