Skip to content

Commit 54a2b9f

Browse files
committed
support qod downgrade
1 parent e3b6dc1 commit 54a2b9f

File tree

6 files changed

+102
-31
lines changed

6 files changed

+102
-31
lines changed

encoding/mqtt/mqtt.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,39 @@ func NewConnackMessage() *ConnackMessage {
163163
return message
164164
}
165165

166+
func CopyPublishMessage(msg *PublishMessage) (*PublishMessage, error) {
167+
v, err := CopyMessage(msg)
168+
if err != nil {
169+
return nil, err
170+
}
171+
172+
return v.(*PublishMessage), nil
173+
}
174+
175+
func CopyMessage(msg Message) (Message, error) {
176+
var result Message
177+
178+
switch (msg.GetType()) {
179+
case PACKET_TYPE_PUBLISH:
180+
t := msg.(*PublishMessage)
181+
c := NewPublishMessage()
182+
c.Payload = t.Payload
183+
c.TopicName = t.TopicName
184+
c.PacketIdentifier = t.PacketIdentifier
185+
c.Opaque = t.Opaque
186+
c.FixedHeader.Type = t.FixedHeader.Type
187+
c.FixedHeader.Dupe = t.FixedHeader.Dupe
188+
c.FixedHeader.QosLevel = t.FixedHeader.QosLevel
189+
c.FixedHeader.Retain = t.FixedHeader.Retain
190+
c.FixedHeader.RemainingLength = t.FixedHeader.RemainingLength
191+
result = c
192+
break
193+
default:
194+
return nil, errors.New("hoge")
195+
}
196+
return result, nil
197+
}
198+
166199
// TODO: このアホっぽい感じどうにかしたいなー
167200
// TODO: 読み込んだサイズ返す
168201
// TODO: サイズ超えてたらエラーなげるの

encoding/mqtt/subscribe.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"bytes"
55
"encoding/binary"
66
"io"
7+
8+
"fmt"
79
)
810

911
type SubscribePayload struct {
@@ -51,7 +53,9 @@ func (self *SubscribeMessage) decode(reader io.Reader) error {
5153

5254
_, _ = io.CopyN(buffer, reader, int64(length))
5355
m.TopicPath = string(buffer.Bytes())
56+
// TODO: これ実装まちがってね?
5457
binary.Read(reader, binary.BigEndian, &m.RequestedQos)
58+
fmt.Printf(" === requested qos %d\n", m.RequestedQos)
5559
self.Payload = append(self.Payload, m)
5660

5761
buffer.Reset()

server/connection.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,9 @@ type Connection interface {
5454
GetWillMessage() *mqtt.WillMessage
5555
HasWillMessage() bool
5656
GetOutGoingTable() *util.MessageTable
57-
GetSubscribedTopics() []string
58-
AppendSubscribedTopic(string)
57+
GetSubscribedTopicQos(string) int
58+
GetSubscribedTopics() map[string]int
59+
AppendSubscribedTopic(string, int)
5960
RemoveSubscribedTopic(string)
6061
SetKeepaliveInterval(int)
6162
GetId() string

server/engine.go

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -215,20 +215,32 @@ func (self *Pidgey) Run() {
215215
// で、Pubackが帰ってきたらrefcountを下げて0になったらMessageを消す
216216
log.Debug("TopicName: %s", m.TopicName)
217217
targets := self.Qlobber.Match(m.TopicName)
218-
if m.QosLevel > 0 {
219-
// TODO: ClientごとにInflightTableを持つ
220-
// engineのOutGoingTableなのはとりあえず、ということ
221-
id := self.OutGoingTable.NewId()
222-
m.PacketIdentifier = id
223-
224-
if sender, ok := m.Opaque.(Connection); ok {
225-
self.OutGoingTable.Register2(m.PacketIdentifier, m, len(targets), sender)
218+
for i := range targets {
219+
cn := targets[i].(Connection)
220+
x, err := codec.CopyPublishMessage(m)
221+
if err != nil {
222+
continue
226223
}
227-
}
228224

229-
for i := range targets {
230-
log.Debug("sending publish message to %s [%s %s %d %d]", targets[i].(Connection).GetId(), m.TopicName, m.Payload, m.PacketIdentifier, m.QosLevel)
231-
targets[i].(Connection).WriteMessageQueue(m)
225+
subscriberQos := cn.GetSubscribedTopicQos(m.TopicName)
226+
// Downgrade QoS
227+
if subscriberQos < x.QosLevel {
228+
log.Debug("===Downgrade QoS %d > %d", x.QosLevel, subscriberQos)
229+
x.QosLevel = subscriberQos
230+
// client側はtopic filterで持ってるから単純にはqosわかんねーんだよな
231+
}
232+
233+
if x.QosLevel > 0 {
234+
// TODO: ClientごとにInflightTableを持つ
235+
// engineのOutGoingTableなのはとりあえず、ということ
236+
id := self.OutGoingTable.NewId()
237+
x.PacketIdentifier = id
238+
if sender, ok := x.Opaque.(Connection); ok {
239+
self.OutGoingTable.Register2(x.PacketIdentifier, x, len(targets), sender)
240+
}
241+
}
242+
log.Debug("sending publish message to %s [%s %s %d %d]", targets[i].(Connection).GetId(), x.TopicName, x.Payload, x.PacketIdentifier, x.QosLevel)
243+
cn.WriteMessageQueue(x)
232244
}
233245
break
234246
default:
@@ -243,7 +255,7 @@ func (self *Pidgey) Run() {
243255
}
244256

245257
func (self *Pidgey) CleanHoge(conn Connection) {
246-
for _, t := range conn.GetSubscribedTopics() {
258+
for t, _ := range conn.GetSubscribedTopics() {
247259
if conn.ShouldClearSession() {
248260
self.Qlobber.Remove(t, conn)
249261
}
@@ -388,7 +400,7 @@ func (self *Pidgey) handle(conn Connection) error {
388400
self.SubscribeMap[conn.GetId()] = payload.TopicPath
389401

390402
self.Qlobber.Add(payload.TopicPath, conn)
391-
conn.AppendSubscribedTopic(payload.TopicPath)
403+
conn.AppendSubscribedTopic(payload.TopicPath, int(payload.RequestedQos))
392404

393405
// TODO: これはAtomicにさせたいなー、とおもったり。
394406
// というかTopicは実装上もうつかってないので消していいや

server/mmux_connection.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,20 +204,29 @@ func (self *MmuxConnection) GetOutGoingTable() *util.MessageTable {
204204
// return self.PrimaryConnection.GetOutGoingTable()
205205
}
206206

207-
func (self *MmuxConnection) GetSubscribedTopics() []string {
207+
func (self *MmuxConnection) GetSubscribedTopicQos(topic string) int {
208+
if self.PrimaryConnection == nil {
209+
fmt.Printf("this is dummy connection\n")
210+
return 0
211+
}
212+
213+
return self.PrimaryConnection.GetSubscribedTopicQos(topic)
214+
}
215+
216+
func (self *MmuxConnection) GetSubscribedTopics() map[string]int {
208217
if self.PrimaryConnection == nil {
209218
return nil
210219
}
211220

212221
return self.PrimaryConnection.GetSubscribedTopics()
213222
}
214223

215-
func (self *MmuxConnection) AppendSubscribedTopic(topic string) {
224+
func (self *MmuxConnection) AppendSubscribedTopic(topic string, qos int) {
216225
if self.PrimaryConnection == nil {
217226
return
218227
}
219228

220-
self.PrimaryConnection.AppendSubscribedTopic(topic)
229+
self.PrimaryConnection.AppendSubscribedTopic(topic, qos)
221230
}
222231

223232
func (self *MmuxConnection) RemoveSubscribedTopic(topic string) {

server/tcp_connection.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@ type TcpConnection struct {
2121
yield func(conn Connection, time time.Time)
2222
WillMessage *mqtt.WillMessage
2323
OutGoingTable *util.MessageTable
24-
SubscribedTopics []string
24+
SubscribedTopics map[string]int
2525
WriteQueue chan mqtt.Message
2626
WriteQueueFlag chan bool
2727
Last time.Time
2828
KeepaliveInterval int
2929
ClearSession bool
30+
Qlobber *util.Qlobber
3031
}
3132

3233
func (self *TcpConnection) SetWillMessage(will mqtt.WillMessage) {
@@ -77,6 +78,8 @@ func NewTcpConnection(socket net.Conn, server Server, retry chan *Retryable, yie
7778
KeepaliveInterval: 0,
7879
Last: time.Now(),
7980
ClearSession: true,
81+
SubscribedTopics: make(map[string]int),
82+
Qlobber: util.NewQlobber(),
8083
}
8184

8285
readMessage := make([]byte, 0, MAX_REQUEST_SIZE)
@@ -126,24 +129,33 @@ func NewTcpConnection(socket net.Conn, server Server, retry chan *Retryable, yie
126129
return conn
127130
}
128131

129-
func (self *TcpConnection) GetSubscribedTopics() []string {
132+
func (self *TcpConnection) GetSubscribedTopicQos(topic string) int {
133+
v := self.Qlobber.Match(topic)
134+
if r, ok := v[0].(int); ok {
135+
return r;
136+
}
137+
return -1
138+
// if qos, ok := self.SubscribedTopics[topic]; ok {
139+
// return qos
140+
// }
141+
// return -1
142+
}
143+
144+
func (self *TcpConnection) GetSubscribedTopics() map[string]int {
130145
return self.SubscribedTopics
131146
}
132147

133-
func (self *TcpConnection) AppendSubscribedTopic(topic string) {
134-
self.SubscribedTopics = append(self.SubscribedTopics, topic)
148+
func (self *TcpConnection) AppendSubscribedTopic(topic string, qos int) {
149+
self.SubscribedTopics[topic] = qos
150+
self.Qlobber.Add(topic, qos)
135151
}
136152

137153
func (self *TcpConnection) RemoveSubscribedTopic(topic string) {
138-
offset := -1
139-
for i, v := range self.SubscribedTopics {
140-
if v == topic {
141-
offset = i
142-
break
143-
}
144-
}
154+
self.Qlobber.Remove(topic, nil)
145155

146-
self.SubscribedTopics = append(self.SubscribedTopics[:offset], self.SubscribedTopics[:offset+1]...)
156+
if _, ok := self.SubscribedTopics[topic]; ok {
157+
delete(self.SubscribedTopics, topic)
158+
}
147159
}
148160

149161
func (self *TcpConnection) GetSocket() net.Conn {

0 commit comments

Comments
 (0)