Skip to content

Add buffer bypass to our in memory pipe implementation... #218

Open
@JustinCappos

Description

@JustinCappos

I think we can skip the ring buffer we use for intercage pipes entirely in one case: when write is called and the reader has already blocked waiting for data. In that case the writer can copy directly into the reader's buffer.

I'm sorry, but I don't know Rust yet. Below is some pseudo-code I hacked into the C circular buffer code I wrote before. I put ** before and after all the code I added...

Note: I haven't checked this pseudo-code for typos or tried to compile it. I did spend a little time thinking about race conditions and I believe the races are benign. Please double-check both of these.

This will only work in a single reader, single writer situation, but this is the majority of pipe operations and should be fine for many cases including the AMP stack. I can think more about a multi-reader / writer solution. There likely is one in the literature we could borrow.

#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <memory.h>


// Default ring size in bytes. The usable bytes are this value, -1: the
// read/write code keeps one slot empty so readpos == writepos means "empty".
// The expansion is parenthesized so it stays a single value inside larger
// expressions such as `x % CIRCULAR_BUFFER_SIZE`.
#define CIRCULAR_BUFFER_SIZE (1024 * 64)

// Note, throughout, I'm assuming 1 char = 1 byte...
// State of one circular (ring) buffer plus the buffer-bypass bookkeeping.
struct cbuffer {
	char *buffer;      // backing storage of buffer_size bytes
	int buffer_size;   // total slots; one is kept empty, so usable = size - 1
	int readpos;       // index of the next byte to read
	int writepos;      // index of the next byte to write
	// ** bypass addition **
	// This is where we store things to avoid the ring buffer copy.
	// pendingread is the destination of a read that found the ring empty
	// (NULL when no read is pending); pendingreadamount is how many bytes
	// that read asked for.
	char *pendingread;
	int pendingreadamount;
	// ** end bypass addition **
};

// Returns the smaller of the two ints (b when they compare equal).
int min(int a, int b) {
	return (a < b) ? a : b;
}


// Allocates and initializes an empty circular buffer with buff_size slots
// (buff_size - 1 usable bytes, since one slot is kept empty).
//
// buff_size must be > 0 (asserted). If either allocation fails, a message is
// printed and the process exits with status 1, so the return value is never
// NULL.
// NOTE(review): no matching free routine is shown alongside this code;
// presumably the caller releases ->buffer and the struct — confirm.
struct cbuffer *cbuffer_factory(int buff_size) {
	// Validate the request before allocating anything.
	assert(buff_size > 0);

	struct cbuffer *ret_buf = malloc(sizeof *ret_buf);
	if (ret_buf == NULL) {
		printf("Memory error allocating cbuff struct\n");
		exit(1);
	}
	// Set buffer
	ret_buf->buffer = malloc((size_t)buff_size);
	if (ret_buf->buffer == NULL) {
		printf("Memory error allocating buffer\n");
		exit(1);
	}
	ret_buf->buffer_size = buff_size;
	// readpos == writepos means the ring starts out empty.
	ret_buf->readpos = 0;
	ret_buf->writepos = 0;
	// ** bypass addition **
	// Initialize these to nothing: no read is pending yet.
	ret_buf->pendingread = NULL;
	ret_buf->pendingreadamount = 0;
	// ** end bypass addition **
	return ret_buf;
}
// Writes up to `length` bytes from `data` and returns how many were accepted.
// The count can be less than `length` when the ring runs out of free space.
//
// Buffer bypass: if a reader has left its destination in
// cbuff_ptr->pendingread, up to pendingreadamount bytes are copied straight
// into that destination; whatever is left goes through the ring as usual.
// The return value is the sum of both parts.
//
// NOTE(review): the pending-read fields are plain (non-atomic) variables; the
// accompanying discussion says this is meant for one reader and one writer
// only. How the pending reader learns the bypassed byte count is not shown
// here — confirm with the blocking/unblocking code.
int write_to_cbuffer(char *data, int length, struct cbuffer *cbuff_ptr) {
	assert(cbuff_ptr != NULL);
	assert(cbuff_ptr->buffer != NULL);
	assert(data != NULL);
	// Same precondition the read side enforces; a negative length would
	// become a huge size when passed to memcpy.
	assert(length >= 0);

	// ** bypass addition **
	// Must start at 0: it is part of the return value even when no read is
	// pending.
	int prewriteamount = 0;
	// handle the pending read
	if (cbuff_ptr->pendingread != NULL) {
		// figure out what to write
		prewriteamount = min(length, cbuff_ptr->pendingreadamount);
		// copy it over, directly into the reader's destination
		memcpy(cbuff_ptr->pendingread, data, (size_t)prewriteamount);
		// move the pointer and fall through so the remainder uses the ring
		length -= prewriteamount;
		data += prewriteamount;
	}
	// ** end bypass addition **

	// First figure out what we will write...
	// The available space is the distance between the write pos and read pos (-1)
	int bytes_to_write = min(length,
		(cbuff_ptr->buffer_size + cbuff_ptr->readpos - (cbuff_ptr->writepos + 1))
			% cbuff_ptr->buffer_size);

	// Then write it (wrapping as is needed)
	//  two cases: either wrap or not...
	if (cbuff_ptr->writepos + bytes_to_write > cbuff_ptr->buffer_size) {
		int pre_wrap_amount = cbuff_ptr->buffer_size - cbuff_ptr->writepos;
		// write before the wrap...
		memcpy(cbuff_ptr->buffer + cbuff_ptr->writepos, data, (size_t)pre_wrap_amount);
		// ...and after
		memcpy(cbuff_ptr->buffer, data + pre_wrap_amount,
		       (size_t)(bytes_to_write - pre_wrap_amount));
	}
	else {
		// otherwise, without wrapping, this is easy.
		memcpy(cbuff_ptr->buffer + cbuff_ptr->writepos, data, (size_t)bytes_to_write);
	}
	// Then update the writepos
	cbuff_ptr->writepos = (cbuff_ptr->writepos + bytes_to_write) % cbuff_ptr->buffer_size;

	// ** bypass addition **
	// Prevent data race when read updates within write: the pending read is
	// always cleared here, after the ring has been updated.
	cbuff_ptr->pendingread = NULL;
	cbuff_ptr->pendingreadamount = 0;
	// ** end bypass addition **

	// Return number of bytes written.
	// TODO: will need to unblock reader when amount > 0
	return prewriteamount + bytes_to_write;
}
// Reads up to `length` bytes from the ring into `output` and returns how many
// bytes were copied (0 when nothing could be read).
//
// Buffer bypass: when a non-empty request finds nothing to read, `output` and
// `length` are recorded in cbuff_ptr->pendingread / pendingreadamount so that
// the next write can copy straight into `output`.
//
// NOTE(review): after a 0 return with a recorded pending read, `output` must
// stay valid until the writer clears pendingread. Presumably the caller blocks
// at that point — confirm against the blocking code, which is not shown here.
int read_from_cbuffer(char *output, int length, struct cbuffer *cbuff_ptr) {
	assert(cbuff_ptr != NULL);
	assert(cbuff_ptr->buffer != NULL);
	assert(output != NULL);
	assert(length >= 0);

	// First figure out how much we can read and how much we will.
	int bytes_to_read = min(length,
		(cbuff_ptr->buffer_size + cbuff_ptr->writepos - cbuff_ptr->readpos)
			% cbuff_ptr->buffer_size);

	// Then read it (wrapping as is needed)
	//  two cases: either need to wrap or not...
	if (cbuff_ptr->readpos + bytes_to_read > cbuff_ptr->buffer_size) {
		int pre_wrap_amount = cbuff_ptr->buffer_size - cbuff_ptr->readpos;
		// read before the wrap...
		memcpy(output, cbuff_ptr->buffer + cbuff_ptr->readpos, (size_t)pre_wrap_amount);
		// ...and after
		memcpy(output + pre_wrap_amount, cbuff_ptr->buffer,
		       (size_t)(bytes_to_read - pre_wrap_amount));
	}
	else {
		// otherwise, without wrapping, this is easy.
		memcpy(output, cbuff_ptr->buffer + cbuff_ptr->readpos, (size_t)bytes_to_read);
	}
	// Then update the readpos
	cbuff_ptr->readpos = (cbuff_ptr->readpos + bytes_to_read) % cbuff_ptr->buffer_size;

	// ** bypass addition **
	// Record a pending read only when a real request (length > 0) found
	// nothing to read. A zero-length read also yields bytes_to_read == 0 but
	// says nothing about whether the ring is empty.
	if (bytes_to_read == 0 && length > 0) {
		// The amount must be stored before the pointer: the writer treats a
		// non-NULL pendingread as "ready" and then reads the amount.
		cbuff_ptr->pendingreadamount = length;
		cbuff_ptr->pendingread = output;
	}
	// ** end bypass addition **

	// Return number of bytes read
	return bytes_to_read;
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions