Description
So I think we can bypass ever putting things in the ring buffer we use for intercage pipes when the reader has already blocked and write is called.
I'm sorry, but I don't know Rust yet. Below is some pseudo-code I hacked into the C circular buffer code I wrote before. I put **
before and after all the code I added...
Note, I didn't check my pseudo-code to make sure it doesn't have a typo, etc. I did spend a little bit of time thinking about race conditions and think the races should be benign. Please double check both of these.
This will only work in a single reader, single writer situation, but this is the majority of pipe operations and should be fine for many cases including the AMP stack. I can think more about a multi-reader / writer solution. There likely is one in the literature we could borrow.
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <memory.h>
// The usable bytes are this value, -1...
#define CIRCULAR_BUFFER_SIZE 1024*64
// Note, throughout, I'm assuming 1 char = 1 byte...
struct cbuffer {
char *buffer;
int buffer_size;
int readpos;
int writepos;
**// This is where we store things to avoid the ring buffer copy
char *pendingread;
int pendingreadamount;**
};
int min(int a, int b) {
if (a<b) return a; else return b;
}
struct cbuffer *cbuffer_factory(int buff_size) {
struct cbuffer *ret_buf;
if ((ret_buf = malloc(sizeof(struct cbuffer)))==NULL) {
printf("Memory error allocating cbuff struct\n");
exit(1);
}
assert(buff_size >0);
// Set buffer
if ((ret_buf->buffer = malloc(buff_size))==NULL) {
printf("Memory error allocating buffer\n");
exit(1);
}
ret_buf->buffer_size = buff_size;
ret_buf->readpos = 0;
ret_buf->writepos = 0;
**// Initialize these to nothing
ret_buf->pendingread=NULL;
ret_buf->pendingreadamount=0;**
return ret_buf;
}
int write_to_cbuffer(char *data, int length, struct cbuffer *cbuff_ptr) {
assert(cbuff_ptr != NULL);
assert(cbuff_ptr->buffer != NULL);
assert(data != NULL);
**int prewriteamount;
// handle the pending read
if (ret_buf->pendingread!=NULL) {
// figure out what to write
prewriteamount = min(length,ret_buf->pendingreadamount);
// copy it over
memcpy(pendingread,data,prewriteamount)
// move the pointer and fall through
length= length-prewriteamount;
data +=prewriteamount;
}**
// First figure out what we will write...
// The available space is the distance between the write pos and read pos (-1)
int bytes_to_write;
bytes_to_write = min(length,(cbuff_ptr->buffer_size+cbuff_ptr->readpos-(cbuff_ptr->writepos+1))%cbuff_ptr->buffer_size);
// Then write it (wrapping as is needed)
// two cases: either wrap or not...
if (cbuff_ptr->writepos+bytes_to_write > cbuff_ptr->buffer_size) {
int pre_wrap_amount = cbuff_ptr->buffer_size - cbuff_ptr->writepos;
// write before the wrap...
memcpy(cbuff_ptr->buffer+cbuff_ptr->writepos,data,pre_wrap_amount);
// ...and after
memcpy(cbuff_ptr->buffer,data+pre_wrap_amount,bytes_to_write-pre_wrap_amount);
}
else {
// otherwise, without wrapping, this is easy.
memcpy(cbuff_ptr->buffer+cbuff_ptr->writepos,data,bytes_to_write);
}
// Then update the writepos
cbuff_ptr->writepos = (cbuff_ptr->writepos + bytes_to_write)%cbuff_ptr->buffer_size;
**// Prevent data race when read updates within write
ret_buf->pendingread = NULL;
ret_buf->pendingreadamount = 0;**
// Return number of bytes written **will need to unblock reader when amount >0**
return **prewriteamount**+bytes_to_write;
}
int read_from_cbuffer(char *output, int length, struct cbuffer *cbuff_ptr) {
assert(cbuff_ptr != NULL);
assert(cbuff_ptr->buffer != NULL);
assert(output != NULL);
assert(length >= 0);
// First figure out how much we can read and how much we will.
int bytes_to_read;
bytes_to_read = min(length,(cbuff_ptr->buffer_size+cbuff_ptr->writepos-cbuff_ptr->readpos)%cbuff_ptr->buffer_size);
// Then read it (wrapping as is needed)
// two cases: either need to wrap or not...
if (cbuff_ptr->readpos+bytes_to_read > cbuff_ptr->buffer_size) {
int pre_wrap_amount = cbuff_ptr->buffer_size - cbuff_ptr->readpos;
// read before the wrap...
memcpy(output,cbuff_ptr->buffer+cbuff_ptr->readpos,pre_wrap_amount);
// ...and after
memcpy(output+pre_wrap_amount,cbuff_ptr->buffer,bytes_to_read-pre_wrap_amount);
}
else {
// otherwise, without wrapping, this is easy.
memcpy(output,cbuff_ptr->buffer+cbuff_ptr->readpos,bytes_to_read);
}
// Then update the readpos
cbuff_ptr->readpos = (cbuff_ptr->readpos + bytes_to_read)%cbuff_ptr->buffer_size;
**if (bytes_to_read == 0) {
ret_buf->pendingreadamount = length; // must come before next statement to prevent race
ret_buf->pendingread= NULL;
}**
// Return number of bytes read
return bytes_to_read;
}