Commit 3e2b067f authored by kfox's avatar kfox

Create frame from fragment slice

parent 167f3a9b
......@@ -177,17 +177,6 @@ athenaFragmenter_ReceiveFragment(AthenaFragmenter *athenaFragmenter, PARCBuffer
return wireFormatBuffer;
}
// CCNxCodecEncodingBuffer *encodingBuffer = ccnxCodecEncodingBuffer_Create();
// ccnxCodecEncodingBuffer_AppendBuffer(encodingBuffer, PARCBuffer *bufferToAppend);
// NEW ccnxCodecEncodingBuffer_PrependBuffer(encodingBuffer, PARCBuffer *bufferToPrepend);
// NEW CCNxCodecEncodingBuffer *iov = ccnxCodecEncodingBuffer_Slice(encodingBuffer, offset, length);
// Optional ccnxCodecEncodingBuffer_AppendIOVec(encodingBuffer, struct iovec *vectorToAppend);
// Optional ccnxCodecEncodingBuffer_PrependIOVec(encodingBuffer, struct iovec *vectorToPrepend);
// CCNxCodecEncodingBufferIOVec *iov = ccnxCodecEncodingBuffer_CreateIOVec(encodingBuffer);
// void ccnxCodecEncodingBufferIOVec_Release(CCNxCodecEncodingBufferIOVec **iovecPtr);
// writev(STDOUT_FILENO, iov->iov, iov->iovcnt);
// ccnxCodecEncodingBufferIOVec_Release(&iov);
CCNxCodecEncodingBufferIOVec *
athenaFragmenter_CreateFragment(AthenaFragmenter *athenaFragmenter, PARCBuffer *message, size_t mtu, int fragmentNumber)
{
......
......@@ -191,16 +191,16 @@ _sendIoVector(AthenaTransportLink *athenaTransportLink, struct ether_header *hea
// Prepend the header
iov[0].iov_len = sizeof(struct ether_header);
iov[0].iov_base = &header;
iovcnt++; // increment for the header
iov[0].iov_base = header;
// Append message content
size_t messageLength = 0;
size_t messageLength = sizeof(struct ether_header);
for (int i = 0; i < iovcnt; i++) {
iov[i + 1].iov_len = iovec[i].iov_len;
iov[i + 1].iov_base = iovec[i].iov_base;
messageLength += iov[i + 1].iov_len;
}
iovcnt++; // increment for the header
parcLog_Debug(athenaTransportLink_GetLogger(athenaTransportLink),
"sending message (size=%d)", messageLength);
......@@ -225,7 +225,7 @@ _sendIoVector(AthenaTransportLink *athenaTransportLink, struct ether_header *hea
// Short write
if (writeCount != messageLength) {
linkData->_stats.send_ShortWrite++;
parcLog_Debug(athenaTransportLink_GetLogger(athenaTransportLink), "short write");
parcLog_Debug(athenaTransportLink_GetLogger(athenaTransportLink), "short write (%u < %u)", writeCount, messageLength);
errno = EIO;
return -1;
}
......@@ -263,8 +263,9 @@ _ETHSend(AthenaTransportLink *athenaTransportLink, CCNxMetaMessage *ccnxMetaMess
iovec = ccnxCodecNetworkBufferIoVec_GetArray(messageIoVector);
iovcnt = ccnxCodecNetworkBufferIoVec_GetCount(messageIoVector);
} else {
if (linkData->fragmenter == NULL) {
// Right now we work with the message in a contiguous buffer, we could optimize the copy out by working directly with the IO Vector.
if (linkData->fragmenter != NULL) {
// Right now we work with the message in a contiguous buffer.
// We can optimize the copy out by working directly with the IO Vector.
message = athenaTransportLinkModule_GetMessageBuffer(ccnxMetaMessage);
ioFragment = athenaFragmenter_CreateFragment(linkData->fragmenter, message,
linkData->link.mtu, fragmentNumber);
......@@ -304,7 +305,9 @@ _ETHSend(AthenaTransportLink *athenaTransportLink, CCNxMetaMessage *ccnxMetaMess
}
ccnxCodecNetworkBufferIoVec_Release(&messageIoVector);
parcBuffer_Release(&message);
if (message) {
parcBuffer_Release(&message);
}
return sendResult;
}
......@@ -600,6 +603,7 @@ _ETHOpenConnection(AthenaTransportLinkModule *athenaTransportLinkModule, const c
// Copy the peer destination address into our link data
memcpy(&(linkData->link.peerAddress), destination, sizeof(struct ether_addr));
linkData->link.peerAddressLength = ETHER_ADDR_LEN;
derivedLinkName = _createNameFromLinkData(&linkData->link, false);
......
......@@ -204,49 +204,56 @@ _BEFS_ReceiveAndReassemble(AthenaFragmenter *athenaFragmenter, PARCBuffer *wireF
static CCNxCodecEncodingBufferIOVec *
_BEFS_CreateFragment(AthenaFragmenter *athenaFragmenter, PARCBuffer *message, size_t mtu, int fragmentNumber)
{
CCNxCodecEncodingBufferIOVec *fragmentIoVec = NULL;
_BEFS_fragmenterData *fragmenterData = _BEFS_GetFragmenterData(athenaFragmenter);
_HopByHopHeader fragmentHeader = {0};
const size_t maxPayload = mtu - sizeof(_HopByHopHeader);
size_t payloadLength = maxPayload;
if (fragmentNumber == 0) {
_hopByHopHeader_SetBFlag(&fragmentHeader);
}
size_t length = parcBuffer_Remaining(message);
size_t offset = maxPayload * fragmentNumber;
size_t remaining = length - offset;
ssize_t remaining = length - offset;
if (remaining <= 0) {
return NULL;
}
CCNxCodecEncodingBuffer *encodingBuffer = ccnxCodecEncodingBuffer_Create();
// Create fragmentation header
ccnxCodecEncodingBuffer_AppendBuffer(encodingBuffer, message);
PARCBuffer *fragmentHeaderBuffer = parcBuffer_Allocate(sizeof(_HopByHopHeader));
_HopByHopHeader *fragmentHeader = parcBuffer_Overlay(fragmentHeaderBuffer, 0);
if (fragmentNumber == 0) {
_hopByHopHeader_SetBFlag(fragmentHeader);
}
_hopByHopHeader_SetSequenceNumber(&fragmentHeader, fragmenterData->sendSequenceNumber++);
_hopByHopHeader_SetSequenceNumber(fragmentHeader, fragmenterData->sendSequenceNumber++);
if (remaining < maxPayload) {
payloadLength = remaining;
_hopByHopHeader_SetEFlag(&fragmentHeader);
_hopByHopHeader_SetEFlag(fragmentHeader);
}
_hopByHopHeader_SetPayloadLength(&fragmentHeader, payloadLength);
_hopByHopHeader_SetPayloadLength(fragmentHeader, payloadLength);
// Create slice of buffer to send
CCNxCodecEncodingBuffer *encodingBuffer = ccnxCodecEncodingBuffer_Create();
ccnxCodecEncodingBuffer_AppendBuffer(encodingBuffer, message);
CCNxCodecEncodingBuffer *encodingBufferSlice = NULL;
// NEW encodingBufferSlice = ccnxCodecEncodingBuffer_Slice(encodingBuffer, offset, length);
// If NULL there's no fragment that matches the offset/length we asked for
encodingBufferSlice = ccnxCodecEncodingBuffer_Slice(encodingBuffer, offset, maxPayload);
ccnxCodecEncodingBuffer_Release(&encodingBuffer);
if (encodingBufferSlice) {
PARCBuffer *fragmentHeaderBuffer = parcBuffer_Wrap(&fragmentHeader, sizeof(_HopByHopHeader), 0, sizeof(_HopByHopHeader));
// NEW
ccnxCodecEncodingBuffer_PrependBuffer(encodingBufferSlice, PARCBuffer *fragmentHeaderBuffer);
parcBuffer_Release(&fragmentHeaderBuffer);
// Prepend our hop by hop header to the slice
ccnxCodecEncodingBuffer_PrependBuffer(encodingBufferSlice, fragmentHeaderBuffer);
return ccnxCodecEncodingBuffer_CreateIOVec(encodingBufferSlice);
fragmentIoVec = ccnxCodecEncodingBuffer_CreateIOVec(encodingBufferSlice);
ccnxCodecEncodingBuffer_Release(&encodingBufferSlice);
}
return NULL;
parcBuffer_Release(&fragmentHeaderBuffer);
return fragmentIoVec;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment