Unbreak things

This commit is contained in:
Tulir Asokan
2019-06-15 01:11:51 +03:00
parent a4ac699c93
commit a55ea42d7f
26 changed files with 985 additions and 568 deletions

View File

@ -17,6 +17,7 @@
package rooms
import (
"compress/gzip"
"encoding/gob"
"fmt"
"os"
@ -31,17 +32,20 @@ import (
)
func init() {
gob.Register([]interface{}{})
gob.Register(map[string]interface{}{})
gob.Register([]interface{}{})
gob.Register(&Room{})
gob.Register(0)
}
type RoomNameSource int
const (
ExplicitRoomName RoomNameSource = iota
CanonicalAliasRoomName
AliasRoomName
UnknownRoomName RoomNameSource = iota
MemberRoomName
AliasRoomName
CanonicalAliasRoomName
ExplicitRoomName
)
// RoomTag is a tag given to a specific room.
@ -60,7 +64,8 @@ type UnreadMessage struct {
// Room represents a single Matrix room.
type Room struct {
*mautrix.Room
// The room ID.
ID string
// Whether or not the user has left the room.
HasLeft bool
@ -79,19 +84,22 @@ type Room struct {
// Whether or not this room is marked as a direct chat.
IsDirect bool
// List of tags given to this room
// List of tags given to this room.
RawTags []RoomTag
// Timestamp of previously received actual message.
LastReceivedMessage time.Time
// Room state cache.
state map[mautrix.EventType]map[string]*mautrix.Event
// MXID -> Member cache calculated from membership events.
memberCache map[string]*mautrix.Member
// The first non-SessionUserID member in the room. Calculated at
// The first two non-SessionUserID members in the room. Calculated at
// the same time as memberCache.
firstMemberCache *mautrix.Member
firstMemberCache *mautrix.Member
secondMemberCache *mautrix.Member
// The name of the room. Calculated from the state event name,
// canonical_alias or alias or the member cache.
nameCache string
NameCache string
// The event type from which the name cache was calculated from.
nameCacheSource RoomNameSource
// The topic of the room. Directly fetched from the m.room.topic state event.
@ -101,31 +109,98 @@ type Room struct {
// The list of aliases. Directly fetched from the m.room.aliases state event.
aliasesCache []string
// Path for state store file.
path string
// Room cache object
cache *RoomCache
// Lock for state and other room stuff.
lock sync.RWMutex
// Room state cache linked list.
prev *Room
next *Room
touch int64
}
func (room *Room) Load(path string) error {
file, err := os.OpenFile(path, os.O_RDONLY, 0600)
if err != nil {
return err
func debugPrintError(fn func() error, message string) {
if err := fn(); err != nil {
debug.Printf("%s: %v", message, err)
}
defer file.Close()
dec := gob.NewDecoder(file)
}
func (room *Room) Loaded() bool {
return room.state != nil
}
func (room *Room) Load() {
if room.Loaded() {
return
}
room.cache.TouchNode(room)
room.lock.Lock()
defer room.lock.Unlock()
return dec.Decode(room)
room.load()
room.lock.Unlock()
}
func (room *Room) Save(path string) error {
file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return err
func (room *Room) load() {
if room.Loaded() {
return
}
defer file.Close()
enc := gob.NewEncoder(file)
debug.Print("Loading state for room", room.ID)
room.state = make(map[mautrix.EventType]map[string]*mautrix.Event)
file, err := os.OpenFile(room.path, os.O_RDONLY, 0600)
if err != nil {
if !os.IsNotExist(err) {
debug.Print("Failed to open room state file for reading:", err)
} else {
debug.Print("Room state file for", room.ID, "does not exist")
}
return
}
defer debugPrintError(file.Close, "Failed to close room state file after reading")
cmpReader, err := gzip.NewReader(file)
if err != nil {
debug.Print("Failed to open room state gzip reader:", err)
return
}
defer debugPrintError(cmpReader.Close, "Failed to close room state gzip reader")
dec := gob.NewDecoder(cmpReader)
if err = dec.Decode(&room.state); err != nil {
debug.Print("Failed to decode room state:", err)
}
}
func (room *Room) Unload() {
debug.Print("Unloading", room.ID)
room.Save()
room.state = nil
room.aliasesCache = nil
room.topicCache = ""
room.canonicalAliasCache = ""
room.firstMemberCache = nil
room.secondMemberCache = nil
}
func (room *Room) Save() {
if !room.Loaded() {
debug.Print("Failed to save room state: room not loaded")
return
}
debug.Print("Saving state for room", room.ID)
file, err := os.OpenFile(room.path, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
debug.Print("Failed to open room state file for writing:", err)
return
}
defer debugPrintError(file.Close, "Failed to close room state file after writing")
cmpWriter := gzip.NewWriter(file)
defer debugPrintError(cmpWriter.Close, "Failed to close room state gzip writer")
enc := gob.NewEncoder(cmpWriter)
room.lock.RLock()
defer room.lock.RUnlock()
return enc.Encode(room)
if err := enc.Encode(&room.state); err != nil {
debug.Print("Failed to encode room state:", err)
}
}
// MarkRead clears the new message statuses on this room.
@ -220,30 +295,32 @@ func (room *Room) Tags() []RoomTag {
// UpdateState updates the room's current state with the given Event. This will clobber events based
// on the type/state_key combination.
func (room *Room) UpdateState(event *mautrix.Event) {
room.Load()
room.lock.Lock()
defer room.lock.Unlock()
_, exists := room.State[event.Type]
_, exists := room.state[event.Type]
if !exists {
room.State[event.Type] = make(map[string]*mautrix.Event)
room.state[event.Type] = make(map[string]*mautrix.Event)
}
switch event.Type {
case mautrix.StateRoomName:
room.nameCache = ""
room.NameCache = ""
case mautrix.StateCanonicalAlias:
if room.nameCacheSource >= CanonicalAliasRoomName {
room.nameCache = ""
if room.nameCacheSource <= CanonicalAliasRoomName {
room.NameCache = ""
}
room.canonicalAliasCache = ""
case mautrix.StateAliases:
if room.nameCacheSource >= AliasRoomName {
room.nameCache = ""
if room.nameCacheSource <= AliasRoomName {
room.NameCache = ""
}
room.aliasesCache = nil
case mautrix.StateMember:
room.memberCache = nil
room.firstMemberCache = nil
if room.nameCacheSource >= MemberRoomName {
room.nameCache = ""
room.secondMemberCache = nil
if room.nameCacheSource <= MemberRoomName {
room.NameCache = ""
}
case mautrix.StateTopic:
room.topicCache = ""
@ -258,24 +335,25 @@ func (room *Room) UpdateState(event *mautrix.Event) {
}
if event.StateKey == nil {
room.State[event.Type][""] = event
room.state[event.Type][""] = event
} else {
room.State[event.Type][*event.StateKey] = event
room.state[event.Type][*event.StateKey] = event
}
}
// GetStateEvent returns the state event for the given type/state_key combo, or nil.
func (room *Room) GetStateEvent(eventType mautrix.EventType, stateKey string) *mautrix.Event {
room.Load()
room.lock.RLock()
defer room.lock.RUnlock()
stateEventMap, _ := room.State[eventType]
stateEventMap, _ := room.state[eventType]
event, _ := stateEventMap[stateKey]
return event
}
// getStateEvents returns the state events for the given type.
func (room *Room) getStateEvents(eventType mautrix.EventType) map[string]*mautrix.Event {
stateEventMap, _ := room.State[eventType]
stateEventMap, _ := room.state[eventType]
return stateEventMap
}
@ -323,7 +401,7 @@ func (room *Room) GetAliases() []string {
func (room *Room) updateNameFromNameEvent() {
nameEvt := room.GetStateEvent(mautrix.StateRoomName, "")
if nameEvt != nil {
room.nameCache = nameEvt.Content.Name
room.NameCache = nameEvt.Content.Name
}
}
@ -336,7 +414,7 @@ func (room *Room) updateNameFromAliases() {
aliases := room.GetAliases()
if len(aliases) > 0 {
sort.Sort(sort.StringSlice(aliases))
room.nameCache = aliases[0]
room.NameCache = aliases[0]
}
}
@ -351,33 +429,40 @@ func (room *Room) updateNameFromAliases() {
func (room *Room) updateNameFromMembers() {
members := room.GetMembers()
if len(members) <= 1 {
room.nameCache = "Empty room"
room.NameCache = "Empty room"
} else if room.firstMemberCache == nil {
room.nameCache = "Room"
room.NameCache = "Room"
} else if len(members) == 2 {
room.nameCache = room.firstMemberCache.Displayname
room.NameCache = room.firstMemberCache.Displayname
} else if len(members) == 3 && room.secondMemberCache != nil {
room.NameCache = fmt.Sprintf("%s and %s", room.firstMemberCache.Displayname, room.secondMemberCache.Displayname)
} else {
firstMember := room.firstMemberCache.Displayname
room.nameCache = fmt.Sprintf("%s and %d others", firstMember, len(members)-2)
members := room.firstMemberCache.Displayname
count := len(members) - 2
if room.secondMemberCache != nil {
members += ", " + room.secondMemberCache.Displayname
count--
}
room.NameCache = fmt.Sprintf("%s and %d others", members, count)
}
}
// updateNameCache updates the room display name based on the room state in the order
// specified in spec section 11.2.2.5.
func (room *Room) updateNameCache() {
if len(room.nameCache) == 0 {
if len(room.NameCache) == 0 {
room.updateNameFromNameEvent()
room.nameCacheSource = ExplicitRoomName
}
if len(room.nameCache) == 0 {
room.nameCache = room.GetCanonicalAlias()
if len(room.NameCache) == 0 {
room.NameCache = room.GetCanonicalAlias()
room.nameCacheSource = CanonicalAliasRoomName
}
if len(room.nameCache) == 0 {
if len(room.NameCache) == 0 {
room.updateNameFromAliases()
room.nameCacheSource = AliasRoomName
}
if len(room.nameCache) == 0 {
if len(room.NameCache) == 0 {
room.updateNameFromMembers()
room.nameCacheSource = MemberRoomName
}
@ -389,15 +474,19 @@ func (room *Room) updateNameCache() {
// If the cache is empty, it is updated first.
func (room *Room) GetTitle() string {
room.updateNameCache()
return room.nameCache
return room.NameCache
}
// createMemberCache caches all member events into a easily processable MXID -> *Member map.
func (room *Room) createMemberCache() map[string]*mautrix.Member {
if len(room.memberCache) > 0 {
return room.memberCache
}
cache := make(map[string]*mautrix.Member)
room.lock.RLock()
events := room.getStateEvents(mautrix.StateMember)
room.firstMemberCache = nil
room.secondMemberCache = nil
if events != nil {
for userID, event := range events {
member := &event.Content.Member
@ -405,8 +494,12 @@ func (room *Room) createMemberCache() map[string]*mautrix.Member {
if len(member.Displayname) == 0 {
member.Displayname = userID
}
if room.firstMemberCache == nil && userID != room.SessionUserID {
room.firstMemberCache = member
if userID != room.SessionUserID {
if room.firstMemberCache == nil {
room.firstMemberCache = member
} else if room.secondMemberCache == nil {
room.secondMemberCache = member
}
}
if member.Membership == mautrix.MembershipJoin || member.Membership == mautrix.MembershipInvite {
cache[userID] = member
@ -425,18 +518,16 @@ func (room *Room) createMemberCache() map[string]*mautrix.Member {
// The members are returned from the cache.
// If the cache is empty, it is updated first.
func (room *Room) GetMembers() map[string]*mautrix.Member {
if len(room.memberCache) == 0 || room.firstMemberCache == nil {
room.createMemberCache()
}
room.Load()
room.createMemberCache()
return room.memberCache
}
// GetMember returns the member with the given MXID.
// If the member doesn't exist, nil is returned.
func (room *Room) GetMember(userID string) *mautrix.Member {
if len(room.memberCache) == 0 {
room.createMemberCache()
}
room.Load()
room.createMemberCache()
room.lock.RLock()
member, _ := room.memberCache[userID]
room.lock.RUnlock()
@ -449,9 +540,13 @@ func (room *Room) GetSessionOwner() string {
}
// NewRoom creates a new Room with the given ID
func NewRoom(roomID, owner string) *Room {
func NewRoom(roomID string, cache *RoomCache) *Room {
return &Room{
Room: mautrix.NewRoom(roomID),
SessionUserID: owner,
ID: roomID,
state: make(map[mautrix.EventType]map[string]*mautrix.Event),
path: cache.roomPath(roomID),
cache: cache,
SessionUserID: cache.getOwner(),
}
}

305
matrix/rooms/roomcache.go Normal file
View File

@ -0,0 +1,305 @@
// gomuks - A terminal Matrix client written in Go.
// Copyright (C) 2019 Tulir Asokan
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package rooms
import (
"compress/gzip"
"encoding/gob"
"os"
"path/filepath"
"time"
"github.com/pkg/errors"
sync "github.com/sasha-s/go-deadlock"
"maunium.net/go/gomuks/debug"
)
// RoomCache contains room state info in a hashmap and linked list.
type RoomCache struct {
sync.Mutex
listPath string
directory string
maxSize int
maxAge int64
getOwner func() string
Map map[string]*Room
head *Room
tail *Room
size int
}
func NewRoomCache(listPath, directory string, maxSize int, maxAge int64, getOwner func() string) *RoomCache {
return &RoomCache{
listPath: listPath,
directory: directory,
maxSize: maxSize,
maxAge: maxAge,
getOwner: getOwner,
Map: make(map[string]*Room),
}
}
func (cache *RoomCache) LoadList() error {
cache.Lock()
defer cache.Unlock()
// Open room list file
file, err := os.OpenFile(cache.listPath, os.O_RDONLY, 0600)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return errors.Wrap(err, "failed to open room list file for reading")
}
defer debugPrintError(file.Close, "Failed to close room list file after reading")
// Open gzip reader for room list file
cmpReader, err := gzip.NewReader(file)
if err != nil {
return errors.Wrap(err, "failed to read gzip room list")
}
defer debugPrintError(cmpReader.Close, "Failed to close room list gzip reader")
// Open gob decoder for gzip reader
dec := gob.NewDecoder(cmpReader)
// Read number of items in list
var size int
err = dec.Decode(&size)
if err != nil {
return errors.Wrap(err, "failed to read size of room list")
}
// Read list
cache.Map = make(map[string]*Room, size)
for i := 0; i < size; i++ {
room := &Room{}
err = dec.Decode(room)
if err != nil {
debug.Printf("Failed to decode %dth room list entry: %v", i+1, err)
continue
}
room.path = cache.roomPath(room.ID)
room.cache = cache
cache.Map[room.ID] = room
}
return nil
}
func (cache *RoomCache) SaveLoadedRooms() {
cache.Lock()
defer cache.Unlock()
cache.clean()
for node := cache.head; node != nil; node = node.prev {
node.Save()
}
}
func (cache *RoomCache) SaveList() error {
cache.Lock()
defer cache.Unlock()
debug.Print("Saving room list...")
// Open room list file
file, err := os.OpenFile(cache.listPath, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return errors.Wrap(err, "failed to open room list file for writing")
}
defer debugPrintError(file.Close, "Failed to close room list file after writing")
// Open gzip writer for room list file
cmpWriter := gzip.NewWriter(file)
defer debugPrintError(cmpWriter.Close, "Failed to close room list gzip writer")
// Open gob encoder for gzip writer
enc := gob.NewEncoder(cmpWriter)
// Write number of items in list
err = enc.Encode(len(cache.Map))
if err != nil {
return errors.Wrap(err, "failed to write size of room list")
}
// Write list
for _, node := range cache.Map {
err = enc.Encode(node)
if err != nil {
debug.Printf("Failed to encode room list entry of %s: %v", node.ID, err)
}
}
debug.Print("Room list saved to", cache.listPath, len(cache.Map), cache.size)
return nil
}
func (cache *RoomCache) Touch(roomID string) {
cache.Lock()
node, ok := cache.Map[roomID]
if !ok || node == nil {
cache.Unlock()
return
}
cache.touch(node)
cache.Unlock()
}
func (cache *RoomCache) TouchNode(node *Room) {
cache.Lock()
cache.touch(node)
cache.Unlock()
}
func (cache *RoomCache) touch(node *Room) {
if node == cache.head {
return
}
debug.Print("Touching", node.ID)
cache.llPop(node)
cache.llPush(node)
node.touch = time.Now().Unix()
}
func (cache *RoomCache) Get(roomID string) *Room {
cache.Lock()
node := cache.get(roomID)
cache.Unlock()
return node
}
func (cache *RoomCache) GetOrCreate(roomID string) *Room {
cache.Lock()
node := cache.get(roomID)
if node == nil {
node = cache.newRoom(roomID)
cache.llPush(node)
}
cache.Unlock()
return node
}
func (cache *RoomCache) get(roomID string) *Room {
node, ok := cache.Map[roomID]
if ok && node != nil && node.Loaded() {
cache.touch(node)
return node
}
return nil
}
func (cache *RoomCache) Put(room *Room) {
cache.Lock()
node := cache.get(room.ID)
if node != nil {
cache.touch(node)
} else {
cache.Map[room.ID] = room
if room.Loaded() {
cache.llPush(room)
}
node = room
}
cache.Unlock()
node.Save()
}
func (cache *RoomCache) roomPath(roomID string) string {
return filepath.Join(cache.directory, roomID+".gob.gz")
}
func (cache *RoomCache) Load(roomID string) *Room {
cache.Lock()
defer cache.Unlock()
node, ok := cache.Map[roomID]
if ok {
return node
}
node = NewRoom(roomID, cache)
node.Load()
return node
}
func (cache *RoomCache) llPop(node *Room) {
if node.prev == nil && node.next == nil {
return
}
if node.prev != nil {
node.prev.next = node.next
}
if node.next != nil {
node.next.prev = node.prev
}
if node == cache.tail {
cache.tail = node.next
}
if node == cache.head {
cache.head = node.prev
}
node.next = nil
node.prev = nil
cache.size--
}
func (cache *RoomCache) llPush(node *Room) {
if node.next != nil || node.prev != nil {
debug.PrintStack()
debug.Print("Tried to llPush node that is already in stack")
return
}
if node == cache.head {
return
}
if cache.head != nil {
cache.head.next = node
}
node.prev = cache.head
node.next = nil
cache.head = node
if cache.tail == nil {
cache.tail = node
}
cache.size++
cache.clean()
}
func (cache *RoomCache) clean() {
origSize := cache.size
maxTS := time.Now().Unix() - cache.maxAge
for cache.size > cache.maxSize {
if cache.tail.touch > maxTS {
break
}
cache.tail.Unload()
cache.llPop(cache.tail)
}
if cleaned := origSize - cache.size; cleaned > 0 {
debug.Print("Cleaned", cleaned, "rooms")
}
}
func (cache *RoomCache) Unload(node *Room) {
cache.Lock()
defer cache.Unlock()
cache.llPop(node)
node.Unload()
}
func (cache *RoomCache) newRoom(roomID string) *Room {
node := NewRoom(roomID, cache)
cache.Map[node.ID] = node
return node
}