diff --git a/src/Pulsar.Client/Common/DTO.fs b/src/Pulsar.Client/Common/DTO.fs index 95c986fe..41931301 100644 --- a/src/Pulsar.Client/Common/DTO.fs +++ b/src/Pulsar.Client/Common/DTO.fs @@ -213,6 +213,7 @@ type internal Metadata = EncryptionAlgo: string OrderingKey: byte[] ReplicatedFrom: string + NullValue: bool } type MessageKey = diff --git a/src/Pulsar.Client/Internal/BatchMessageContainer.fs b/src/Pulsar.Client/Internal/BatchMessageContainer.fs index 74487e8f..80a6088c 100644 --- a/src/Pulsar.Client/Internal/BatchMessageContainer.fs +++ b/src/Pulsar.Client/Internal/BatchMessageContainer.fs @@ -46,6 +46,7 @@ module internal BatchHelpers = if message.Properties.Count > 0 then for property in message.Properties do smm.Properties.Add(KeyValue(Key = property.Key, Value = property.Value)) + smm.NullValue <- box message.Value |> isNull Serializer.SerializeWithLengthPrefix(messageStream, smm, PrefixStyle.Fixed32BigEndian) messageWriter.Write(message.Payload) struct(BatchDetails(%index, BatchMessageAcker.NullAcker), message, batchItem.Tcs) diff --git a/src/Pulsar.Client/Internal/ClientCnx.fs b/src/Pulsar.Client/Internal/ClientCnx.fs index 792a3535..b646721e 100644 --- a/src/Pulsar.Client/Internal/ClientCnx.fs +++ b/src/Pulsar.Client/Internal/ClientCnx.fs @@ -530,6 +530,7 @@ and internal ClientCnx (config: PulsarClientConfiguration, EncryptionAlgo = messageMetadata.EncryptionAlgo OrderingKey = messageMetadata.OrderingKey ReplicatedFrom = messageMetadata.ReplicatedFrom + NullValue = messageMetadata.NullValue } { diff --git a/src/Pulsar.Client/Internal/ConsumerImpl.fs b/src/Pulsar.Client/Internal/ConsumerImpl.fs index 04da282d..f8b903ad 100644 --- a/src/Pulsar.Client/Internal/ConsumerImpl.fs +++ b/src/Pulsar.Client/Internal/ConsumerImpl.fs @@ -668,9 +668,12 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien else let msgKey = rawMessage.MessageKey let getValue () = - keyValueProcessor - |> Option.map (fun kvp -> kvp.DecodeKeyValue(msgKey, payload) :?> 'T) - |> Option.defaultWith (fun () -> schemaDecodeFunction payload) + if rawMessage.Metadata.NullValue then + Unchecked.defaultof<'T> + else + keyValueProcessor + |> Option.map (fun kvp -> kvp.DecodeKeyValue(msgKey, payload) :?> 'T) + |> Option.defaultWith (fun () -> schemaDecodeFunction payload) let message = Message( msgId, payload, @@ -1330,9 +1333,12 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien } let msgKey = singleMessageMetadata.PartitionKey let getValue () = - keyValueProcessor - |> Option.map (fun kvp -> kvp.DecodeKeyValue(msgKey, singleMessagePayload) :?> 'T) - |> Option.defaultWith (fun() -> schemaDecodeFunction singleMessagePayload) + if singleMessageMetadata.NullValue then + Unchecked.defaultof<'T> + else + keyValueProcessor + |> Option.map (fun kvp -> kvp.DecodeKeyValue(msgKey, singleMessagePayload) :?> 'T) + |> Option.defaultWith (fun () -> schemaDecodeFunction singleMessagePayload) let properties = if singleMessageMetadata.Properties.Count > 0 then singleMessageMetadata.Properties diff --git a/src/Pulsar.Client/Internal/ProducerImpl.fs b/src/Pulsar.Client/Internal/ProducerImpl.fs index cf8ad250..2cd1d586 100644 --- a/src/Pulsar.Client/Internal/ProducerImpl.fs +++ b/src/Pulsar.Client/Internal/ProducerImpl.fs @@ -857,7 +857,8 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c Some { PartitionKey = %key; IsBase64Encoded = false } else Some { PartitionKey = %Convert.ToBase64String(keyBytes); IsBase64Encoded = true } - MessageBuilder(value, schema.Encode(value), keyObj, + let payloay = if box value |> isNull then Array.empty else schema.Encode(value) + MessageBuilder(value, payloay, keyObj, ?properties0 = (properties |> Option.ofObj), ?deliverAt = (deliverAt |> Option.ofNullable), ?sequenceId = (sequenceId |> Option.ofNullable), diff --git a/src/Pulsar.Client/Internal/TableViewImpl.fs b/src/Pulsar.Client/Internal/TableViewImpl.fs index 9837f287..bceae49b 100644 --- a/src/Pulsar.Client/Internal/TableViewImpl.fs +++ b/src/Pulsar.Client/Internal/TableViewImpl.fs @@ -13,7 +13,11 @@ type internal TableViewImpl<'T> private (reader: IReader<'T>) = member private this.HandleMessage(msg: Message<'T>) = if not (String.IsNullOrEmpty(%msg.Key)) then - data.AddOrUpdate(%msg.Key, msg.GetValue(), (fun _ _ -> msg.GetValue())) |> ignore + let value = msg.GetValue() + if box value |> isNull then + data.TryRemove(%msg.Key) |> ignore + else + data.AddOrUpdate(%msg.Key, value, (fun _ _ -> value)) |> ignore member private this.ReadTailMessages(reader: IReader<'T>) = backgroundTask { diff --git a/tests/IntegrationTests/Batching.fs b/tests/IntegrationTests/Batching.fs index 866d3ceb..17e1b6d5 100644 --- a/tests/IntegrationTests/Batching.fs +++ b/tests/IntegrationTests/Batching.fs @@ -317,4 +317,45 @@ let tests = Log.Debug("Finished 'Second batch is formed well after the first one'") } + + testTask "Null message with batch get sent if batch size exceeds" { + + Log.Debug("Started 'Null message with batch get sent if batch size exceeds'") + + let client = getClient() + let topicName = "public/default/topic-" + Guid.NewGuid().ToString("N") + let messagesNumber = 5 + + let! (consumer: IConsumer) = + client.NewConsumer() + .Topic(topicName) + .ConsumerName("batch consumer") + .SubscriptionName("batch-subscription") + .SubscribeAsync() + + let! (producer: IProducer) = + client.NewProducer() + .Topic(topicName) + .ProducerName("batch producer") + .EnableBatching(true) + .BatchingMaxMessages(messagesNumber / 2) + .BatchingMaxBytes(100) + .MaxPendingMessages(1) + .BlockIfQueueFull(true) + .CreateAsync() + + for i in 0 .. messagesNumber-1 do + producer.SendAsync(producer.NewMessage(null)) |> ignore + + for i in 0 .. messagesNumber-1 do + let! (message: Message) = consumer.ReceiveAsync() + match message.MessageId.Type with + | Batch (index, _) -> + Expect.equal $"Run {i} failed" (i % 2) %index + | _ -> + failwith "Expected batch message" + + Log.Debug("Finished 'Null message with batch get sent if batch size exceeds'") + + } ] diff --git a/tests/IntegrationTests/TableView.fs b/tests/IntegrationTests/TableView.fs index 172fa353..cffa8c50 100644 --- a/tests/IntegrationTests/TableView.fs +++ b/tests/IntegrationTests/TableView.fs @@ -2,6 +2,7 @@ module Pulsar.Client.IntegrationTests.TableView open System open System.Linq +open System.Threading.Tasks open Expecto open Expecto.Flip open Pulsar.Client.Api @@ -55,6 +56,14 @@ let tests = let value3 = tableView["key2"] Expect.sequenceEqual "" [| 2uy |] value2 Expect.sequenceEqual "" [| 3uy |] value3 + + do! producer.SendAsync(producer.NewMessage(null, "key1")) + + do! Task.Delay 2000 + + Expect.equal "" 1 tableView.Count + let key1NotFound = tableView.ContainsKey("key1") + Expect.equal "" false key1NotFound Log.Debug("Finished testTableView") } diff --git a/tests/UnitTests/Internal/ChunkedMessageTrackerTests.fs b/tests/UnitTests/Internal/ChunkedMessageTrackerTests.fs index a637ed1d..68022406 100644 --- a/tests/UnitTests/Internal/ChunkedMessageTrackerTests.fs +++ b/tests/UnitTests/Internal/ChunkedMessageTrackerTests.fs @@ -31,6 +31,7 @@ let tests = EventTime = Nullable() OrderingKey = [||] ReplicatedFrom = "" + NullValue = false } let testRawMessage =