diff --git a/config/config.go b/config/config.go index 0ddb4a3..a3127c4 100644 --- a/config/config.go +++ b/config/config.go @@ -53,11 +53,11 @@ type Config struct { AccessToken string `yaml:"access_token"` HS string `yaml:"homeserver"` - Dir string `yaml:"-"` - CacheDir string `yaml:"cache_dir"` - HistoryDir string `yaml:"history_dir"` - MediaDir string `yaml:"media_dir"` - StateDir string `yaml:"state_dir"` + Dir string `yaml:"-"` + CacheDir string `yaml:"cache_dir"` + HistoryPath string `yaml:"history_path"` + MediaDir string `yaml:"media_dir"` + StateDir string `yaml:"state_dir"` Preferences UserPreferences `yaml:"-"` AuthCache AuthCache `yaml:"-"` @@ -70,11 +70,11 @@ type Config struct { // NewConfig creates a config that loads data from the given directory. func NewConfig(configDir, cacheDir string) *Config { return &Config{ - Dir: configDir, - CacheDir: cacheDir, - HistoryDir: filepath.Join(cacheDir, "history"), - StateDir: filepath.Join(cacheDir, "state"), - MediaDir: filepath.Join(cacheDir, "media"), + Dir: configDir, + CacheDir: cacheDir, + HistoryPath: filepath.Join(cacheDir, "history.db"), + StateDir: filepath.Join(cacheDir, "state"), + MediaDir: filepath.Join(cacheDir, "media"), Rooms: make(map[string]*rooms.Room), } @@ -82,7 +82,7 @@ func NewConfig(configDir, cacheDir string) *Config { // Clear clears the session cache and removes all history. func (config *Config) Clear() { - os.RemoveAll(config.HistoryDir) + os.Remove(config.HistoryPath) os.RemoveAll(config.StateDir) os.RemoveAll(config.MediaDir) os.RemoveAll(config.CacheDir) @@ -91,7 +91,6 @@ func (config *Config) Clear() { func (config *Config) CreateCacheDirs() { os.MkdirAll(config.CacheDir, 0700) - os.MkdirAll(config.HistoryDir, 0700) os.MkdirAll(config.StateDir, 0700) os.MkdirAll(config.MediaDir, 0700) } diff --git a/go.mod b/go.mod index ebfca9d..1122fea 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d // indirect github.com/russross/blackfriday/v2 v2.0.1 github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect + go.etcd.io/bbolt v1.3.2 golang.org/x/image v0.0.0-20190321063152-3fc05d484e9f golang.org/x/net v0.0.0-20190327214358-63eda1eb0650 gopkg.in/toast.v1 v1.0.0-20180812000517-0a84660828b2 diff --git a/go.sum b/go.sum index 54dc719..483338f 100644 --- a/go.sum +++ b/go.sum @@ -21,6 +21,8 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5I github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/zyedidia/clipboard v0.0.0-20180718195219-bd31d747117d h1:Lhqt2eo+rgM8aswvM7nTtAMVm8ARPWzkE9n6eZDOccY= github.com/zyedidia/clipboard v0.0.0-20180718195219-bd31d747117d/go.mod h1:WDk3p8GiZV9+xFWlSo8qreeoLhW6Ik692rqXk+cNeRY= +go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk= +go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= diff --git a/gomuks.go b/gomuks.go index 3e30035..8d5ac76 100644 --- a/gomuks.go +++ b/gomuks.go @@ -61,8 +61,8 @@ func NewGomuks(uiProvider ifc.UIProvider) *Gomuks { // Save saves the active session and message history. func (gmx *Gomuks) Save() { gmx.config.SaveAll() - debug.Print("Saving history...") - gmx.ui.MainView().SaveAllHistory() + //debug.Print("Saving history...") + //gmx.ui.MainView().SaveAllHistory() } // StartAutosave calls Save() every minute until it receives a stop signal diff --git a/interface/matrix.go b/interface/matrix.go index 2f457bf..bfce8b5 100644 --- a/interface/matrix.go +++ b/interface/matrix.go @@ -41,7 +41,8 @@ type MatrixContainer interface { JoinRoom(roomID, server string) (*rooms.Room, error) LeaveRoom(roomID string) error - GetHistory(roomID, prevBatch string, limit int) ([]*mautrix.Event, string, error) + GetHistory(room *rooms.Room, limit int) ([]*mautrix.Event, error) + GetEvent(room *rooms.Room, eventID string) (*mautrix.Event, error) GetRoom(roomID string) *rooms.Room Download(mxcURL string) ([]byte, string, string, error) diff --git a/interface/ui.go b/interface/ui.go index 9091e36..df9d959 100644 --- a/interface/ui.go +++ b/interface/ui.go @@ -46,7 +46,6 @@ type MainView interface { AddRoom(room *rooms.Room) RemoveRoom(room *rooms.Room) SetRooms(rooms map[string]*rooms.Room) - SaveAllHistory() UpdateTags(room *rooms.Room) @@ -67,8 +66,6 @@ const ( type RoomView interface { MxRoom() *rooms.Room - SaveHistory(dir string) error - LoadHistory(matrix MatrixContainer, dir string) (int, error) SetCompletions(completions []string) SetTyping(users []string) diff --git a/matrix/history.go b/matrix/history.go new file mode 100644 index 0000000..1b99125 --- /dev/null +++ b/matrix/history.go @@ -0,0 +1,247 @@ +// gomuks - A terminal Matrix client written in Go. +// Copyright (C) 2019 Tulir Asokan +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package matrix + +import ( + "bytes" + "encoding/binary" + "encoding/gob" + "sync" + + bolt "go.etcd.io/bbolt" + + "maunium.net/go/gomuks/matrix/rooms" + "maunium.net/go/mautrix" +) + +type HistoryManager struct { + sync.Mutex + + db *bolt.DB + + historyEndPtr map[*rooms.Room]uint64 + historyLoadPtr map[*rooms.Room]uint64 +} + +var bucketRoomStreams = []byte("room_streams") +var bucketRoomEventIDs = []byte("room_event_ids") +var bucketStreamPointers = []byte("room_stream_pointers") + +const halfUint64 = ^uint64(0) >> 1 + +func NewHistoryManager(dbPath string) (*HistoryManager, error) { + hm := &HistoryManager{ + historyEndPtr: make(map[*rooms.Room]uint64), + historyLoadPtr: make(map[*rooms.Room]uint64), + } + db, err := bolt.Open(dbPath, 0600, nil) + if err != nil { + return nil, err + } + err = db.Update(func(tx *bolt.Tx) error { + _, err = tx.CreateBucketIfNotExists(bucketRoomStreams) + if err != nil { + return err + } + _, err = tx.CreateBucketIfNotExists(bucketRoomEventIDs) + if err != nil { + return err + } + _, err = tx.CreateBucketIfNotExists(bucketStreamPointers) + if err != nil { + return err + } + return nil + }) + if err != nil { + return nil, err + } + hm.db = db + return hm, nil +} + +func (hm *HistoryManager) Close() error { + return hm.db.Close() +} + +func (hm *HistoryManager) Get(room *rooms.Room, eventID string) (event *mautrix.Event, err error) { + err = hm.db.View(func(tx *bolt.Tx) error { + rid := []byte(room.ID) + eventIDs := tx.Bucket(bucketRoomEventIDs).Bucket(rid) + if eventIDs == nil { + return nil + } + streamIndex := eventIDs.Get([]byte(eventID)) + if streamIndex == nil { + return nil + } + stream := tx.Bucket(bucketRoomStreams).Bucket(rid) + eventData := stream.Get(streamIndex) + var umErr error + event, umErr = unmarshalEvent(eventData) + return umErr + }) + return +} + +func (hm *HistoryManager) Append(room *rooms.Room, events []*mautrix.Event) error { + return hm.store(room, events, true) +} + +func (hm *HistoryManager) Prepend(room *rooms.Room, events []*mautrix.Event) error { + return hm.store(room, events, false) +} + +func (hm *HistoryManager) store(room *rooms.Room, events []*mautrix.Event, append bool) error { + hm.Lock() + defer hm.Unlock() + err := hm.db.Update(func(tx *bolt.Tx) error { + streamPointers := tx.Bucket(bucketStreamPointers) + rid := []byte(room.ID) + stream, err := tx.Bucket(bucketRoomStreams).CreateBucketIfNotExists(rid) + if err != nil { + return err + } + eventIDs, err := tx.Bucket(bucketRoomEventIDs).CreateBucketIfNotExists(rid) + if err != nil { + return err + } + if stream.Sequence() < halfUint64 { + // The sequence counter (i.e. the future) the part after 2^63, i.e. the second half of uint64 + // We set it to -1 because NextSequence will increment it by one. + err = stream.SetSequence(halfUint64 - 1) + if err != nil { + return err + } + } + if append { + ptrStart, err := stream.NextSequence() + if err != nil { + return err + } + for i, event := range events { + if err := put(stream, eventIDs, event, ptrStart+uint64(i)); err != nil { + return err + } + } + err = stream.SetSequence(ptrStart + uint64(len(events)) - 1) + if err != nil { + return err + } + } else { + ptrStart, ok := hm.historyEndPtr[room] + if !ok { + ptrStartRaw := streamPointers.Get(rid) + if ptrStartRaw != nil { + ptrStart = btoi(ptrStartRaw) + } else { + ptrStart = halfUint64 - 1 + } + } + eventCount := uint64(len(events)) + for i, event := range events { + if err := put(stream, eventIDs, event, -ptrStart-uint64(i)); err != nil { + return err + } + } + hm.historyEndPtr[room] = ptrStart + eventCount + err := streamPointers.Put(rid, itob(ptrStart+eventCount)) + if err != nil { + return err + } + } + + return nil + }) + return err +} + +func (hm *HistoryManager) Load(room *rooms.Room, num int) (events []*mautrix.Event, err error) { + hm.Lock() + defer hm.Unlock() + err = hm.db.View(func(tx *bolt.Tx) error { + rid := []byte(room.ID) + stream := tx.Bucket(bucketRoomStreams).Bucket(rid) + if stream == nil { + return nil + } + ptrStart, ok := hm.historyLoadPtr[room] + if !ok { + ptrStart = stream.Sequence() + } + c := stream.Cursor() + k, v := c.Seek(itob(ptrStart - uint64(num))) + ptrStartFound := btoi(k) + if k == nil || ptrStartFound >= ptrStart { + return nil + } + hm.historyLoadPtr[room] = ptrStartFound - 1 + for ; k != nil && btoi(k) < ptrStart; k, v = c.Next() { + event, parseError := unmarshalEvent(v) + if parseError != nil { + return parseError + } + events = append(events, event) + } + return nil + }) + // Reverse array because we read/append the history in reverse order. + i := 0 + j := len(events) - 1 + for i < j { + events[i], events[j] = events[j], events[i] + i++ + j-- + } + return +} + +func itob(v uint64) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, v) + return b +} + +func btoi(b []byte) uint64 { + return binary.BigEndian.Uint64(b) +} + +func marshalEvent(event *mautrix.Event) ([]byte, error) { + var buf bytes.Buffer + err := gob.NewEncoder(&buf).Encode(event) + return buf.Bytes(), err +} + +func unmarshalEvent(data []byte) (*mautrix.Event, error) { + event := &mautrix.Event{} + return event, gob.NewDecoder(bytes.NewReader(data)).Decode(event) +} + +func put(streams, eventIDs *bolt.Bucket, event *mautrix.Event, key uint64) error { + data, err := marshalEvent(event) + if err != nil { + return err + } + keyBytes := itob(key) + if err = streams.Put(keyBytes, data); err != nil { + return err + } + if err = eventIDs.Put([]byte(event.ID), keyBytes); err != nil { + return err + } + return nil +} diff --git a/matrix/matrix.go b/matrix/matrix.go index 9527451..6985659 100644 --- a/matrix/matrix.go +++ b/matrix/matrix.go @@ -50,6 +50,7 @@ type Container struct { gmx ifc.Gomuks ui ifc.GomuksUI config *config.Config + history *HistoryManager running bool stop chan bool @@ -102,6 +103,11 @@ func (c *Container) InitClient() error { } c.client.Logger = mxLogger{} + c.history, err = NewHistoryManager(c.config.HistoryPath) + if err != nil { + return err + } + allowInsecure := len(os.Getenv("GOMUKS_ALLOW_INSECURE_CONNECTIONS")) > 0 if allowInsecure { c.client.Client = &http.Client{ @@ -158,6 +164,11 @@ func (c *Container) Stop() { debug.Print("Stopping Matrix container...") c.stop <- true c.client.StopSync() + debug.Print("Closing history manager...") + err := c.history.Close() + if err != nil { + debug.Print("Error closing history manager:", err) + } } } @@ -281,6 +292,11 @@ func (c *Container) HandleMessage(source EventSource, evt *mautrix.Event) { return } + err := c.history.Append(roomView.MxRoom(), []*mautrix.Event{evt}) + if err != nil { + debug.Printf("Failed to add event %s to history: %v", evt.ID, err) + } + message := mainView.ParseEvent(roomView, evt) if message != nil { roomView.AddMessage(message, ifc.AppendMessage) @@ -537,12 +553,42 @@ func (c *Container) LeaveRoom(roomID string) error { } // GetHistory fetches room history. -func (c *Container) GetHistory(roomID, prevBatch string, limit int) ([]*mautrix.Event, string, error) { - resp, err := c.client.Messages(roomID, prevBatch, "", 'b', limit) +func (c *Container) GetHistory(room *rooms.Room, limit int) ([]*mautrix.Event, error) { + events, err := c.history.Load(room, limit) if err != nil { - return nil, "", err + return nil, err } - return resp.Chunk, resp.End, nil + if len(events) > 0 { + debug.Printf("Loaded %d events for %s from local cache", len(events), room.ID) + return events, nil + } + resp, err := c.client.Messages(room.ID, room.PrevBatch, "", 'b', limit) + if err != nil { + return nil, err + } + if len(resp.Chunk) > 0 { + err = c.history.Prepend(room, resp.Chunk) + if err != nil { + return nil, err + } + } + room.PrevBatch = resp.End + debug.Printf("Loaded %d events for %s from server from %s to %s", len(resp.Chunk), room.ID, resp.Start, resp.End) + return resp.Chunk, nil +} + +func (c *Container) GetEvent(room *rooms.Room, eventID string) (*mautrix.Event, error) { + event, err := c.history.Get(room, eventID) + if event != nil || err != nil { + debug.Printf("Found event %s in local cache", eventID) + return event, err + } + event, err = c.client.GetEvent(room.ID, eventID) + if err != nil { + return nil, err + } + debug.Printf("Loaded event %s from server", eventID) + return event, nil } // GetRoom gets the room instance stored in the session. diff --git a/matrix/rooms/room.go b/matrix/rooms/room.go index 16d4a9e..47f5602 100644 --- a/matrix/rooms/room.go +++ b/matrix/rooms/room.go @@ -57,6 +57,7 @@ type UnreadMessage struct { Highlight bool } + // Room represents a single Matrix room. type Room struct { *mautrix.Room @@ -74,6 +75,7 @@ type Room struct { UnreadMessages []UnreadMessage unreadCountCache *int highlightCache *bool + lastMarkedRead string // Whether or not this room is marked as a direct chat. IsDirect bool @@ -142,7 +144,11 @@ func (room *Room) Save(path string) error { } // MarkRead clears the new message statuses on this room. -func (room *Room) MarkRead(eventID string) { +func (room *Room) MarkRead(eventID string) bool { + if room.lastMarkedRead == eventID { + return false + } + room.lastMarkedRead = eventID readToIndex := -1 for index, unreadMessage := range room.UnreadMessages { if unreadMessage.EventID == eventID { @@ -154,6 +160,7 @@ func (room *Room) MarkRead(eventID string) { room.highlightCache = nil room.unreadCountCache = nil } + return true } func (room *Room) UnreadCount() int { diff --git a/ui/commands.go b/ui/commands.go index bc07779..a8b1faa 100644 --- a/ui/commands.go +++ b/ui/commands.go @@ -113,10 +113,10 @@ func cmdHelp(cmd *Command) { /join - Join a room. /leave - Leave the current room. -/invite - Invite a user. -/kick [reason] - Kick a user. -/ban [reason] - Ban a user. -/unban - Unban a user. +/invite - Invite a user. +/kick [reason] - Kick a user. +/ban [reason] - Ban a user. +/unban - Unban a user. /send - Send a custom event to the given room. /msend - Send a custom event to the current room. diff --git a/ui/message-view.go b/ui/message-view.go index d90a3a7..91f4621 100644 --- a/ui/message-view.go +++ b/ui/message-view.go @@ -17,10 +17,8 @@ package ui import ( - "encoding/gob" "fmt" "math" - "os" "strings" "github.com/mattn/go-runewidth" @@ -84,55 +82,6 @@ func NewMessageView(parent *RoomView) *MessageView { } } -func (view *MessageView) SaveHistory(path string) error { - file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600) - if err != nil { - return err - } - defer file.Close() - - enc := gob.NewEncoder(file) - err = enc.Encode(view.messages) - if err != nil { - return err - } - - return nil -} - -func (view *MessageView) LoadHistory(matrix ifc.MatrixContainer, path string) (int, error) { - file, err := os.OpenFile(path, os.O_RDONLY, 0600) - if err != nil { - if os.IsNotExist(err) { - return 0, nil - } - return -1, err - } - defer file.Close() - - var msgs []messages.UIMessage - - dec := gob.NewDecoder(file) - err = dec.Decode(&msgs) - if err != nil { - return -1, err - } - - view.messages = make([]messages.UIMessage, len(msgs)) - indexOffset := 0 - for index, message := range msgs { - if message != nil { - view.messages[index-indexOffset] = message - view.updateWidestSender(message.Sender()) - message.RegisterMatrix(matrix) - } else { - indexOffset++ - } - } - - return len(view.messages), nil -} - func (view *MessageView) updateWidestSender(sender string) { if len(sender) > view.widestSender { view.widestSender = len(sender) diff --git a/ui/messages/parser/parser.go b/ui/messages/parser/parser.go index b36fde0..79e628a 100644 --- a/ui/messages/parser/parser.go +++ b/ui/messages/parser/parser.go @@ -106,7 +106,7 @@ func ParseMessage(matrix ifc.MatrixContainer, room *rooms.Room, evt *mautrix.Eve if len(roomID) == 0 { roomID = room.ID } - replyToEvt, _ := matrix.Client().GetEvent(roomID, evt.Content.GetReplyTo()) + replyToEvt, _ := matrix.GetEvent(room, evt.Content.GetReplyTo()) if replyToEvt != nil { replyToEvt.Content.RemoveReplyFallback() if len(replyToEvt.Content.FormattedBody) == 0 { diff --git a/ui/room-view.go b/ui/room-view.go index bc9c0cc..1a1ae78 100644 --- a/ui/room-view.go +++ b/ui/room-view.go @@ -119,14 +119,6 @@ func (view *RoomView) logPath(dir string) string { return filepath.Join(dir, fmt.Sprintf("%s.gmxlog", view.Room.ID)) } -func (view *RoomView) SaveHistory(dir string) error { - return view.MessageView().SaveHistory(view.logPath(dir)) -} - -func (view *RoomView) LoadHistory(matrix ifc.MatrixContainer, dir string) (int, error) { - return view.MessageView().LoadHistory(matrix, view.logPath(dir)) -} - func (view *RoomView) SetInputSubmitFunc(fn func(room *RoomView, text string)) *RoomView { view.inputSubmitFunc = fn return view diff --git a/ui/view-main.go b/ui/view-main.go index f726af5..382d9c8 100644 --- a/ui/view-main.go +++ b/ui/view-main.go @@ -122,9 +122,12 @@ func (view *MainView) BumpFocus(roomView *RoomView) { func (view *MainView) MarkRead(roomView *RoomView) { if roomView != nil && roomView.Room.HasNewMessages() && roomView.MessageView().ScrollOffset == 0 { msgList := roomView.MessageView().messages - msg := msgList[len(msgList)-1] - roomView.Room.MarkRead(msg.ID()) - view.matrix.MarkRead(roomView.Room.ID, msg.ID()) + if len(msgList) > 0 { + msg := msgList[len(msgList)-1] + if roomView.Room.MarkRead(msg.ID()) { + view.matrix.MarkRead(roomView.Room.ID, msg.ID()) + } + } } } @@ -301,14 +304,8 @@ func (view *MainView) SwitchRoom(tag string, room *rooms.Room) { view.MarkRead(roomView) view.roomList.SetSelected(tag, room) view.parent.Render() -} - -func (view *MainView) SaveAllHistory() { - for _, room := range view.rooms { - err := room.SaveHistory(view.config.HistoryDir) - if err != nil { - debug.Printf("Failed to save history of %s: %v", room.Room.GetTitle(), err) - } + if len(roomView.MessageView().messages) == 0 { + go view.LoadHistory(room.ID) } } @@ -320,10 +317,11 @@ func (view *MainView) addRoomPage(room *rooms.Room) { view.rooms[room.ID] = roomView roomView.UpdateUserList() - _, err := roomView.LoadHistory(view.matrix, view.config.HistoryDir) + // FIXME + /*_, err := roomView.LoadHistory(view.matrix, view.config.HistoryDir) if err != nil { debug.Printf("Failed to load history of %s: %v", roomView.Room.GetTitle(), err) - } + }*/ } } @@ -465,24 +463,23 @@ func (view *MainView) LoadHistory(room string) { return } - debug.Print("Fetching history for", room, "starting from", batch) - history, prevBatch, err := view.matrix.GetHistory(roomView.Room.ID, batch, 50) + history, err := view.matrix.GetHistory(roomView.Room, 50) if err != nil { roomView.AddServiceMessage("Failed to fetch history") debug.Print("Failed to fetch history for", roomView.Room.ID, err) return } - roomView.Room.PrevBatch = prevBatch for _, evt := range history { message := view.ParseEvent(roomView, evt) if message != nil { roomView.AddMessage(message, ifc.PrependMessage) } } - err = roomView.SaveHistory(view.config.HistoryDir) + // TODO? + /*err = roomView.SaveHistory(view.config.HistoryDir) if err != nil { debug.Printf("Failed to save history of %s: %v", roomView.Room.GetTitle(), err) - } + }*/ view.config.PutRoom(roomView.Room) view.parent.Render() }