Commit 67317204 authored by kfox's avatar kfox

Include ether_header in recv length

parent c291fca3
......@@ -257,9 +257,10 @@ _ETHSend(AthenaTransportLink *athenaTransportLink, CCNxMetaMessage *ccnxMetaMess
CCNxCodecEncodingBufferIOVec *ioFragment = NULL;
const struct iovec *iovec = NULL;
size_t iovcnt = 0;
size_t maxPayloadSize = linkData->link.mtu;
// Get initial IO vector message or message fragment if we need, and have, fragmentation support.
if (messageLength <= linkData->link.mtu) {
if (messageLength <= maxPayloadSize) {
iovec = ccnxCodecNetworkBufferIoVec_GetArray(messageIoVector);
iovcnt = ccnxCodecNetworkBufferIoVec_GetCount(messageIoVector);
} else {
......@@ -268,7 +269,7 @@ _ETHSend(AthenaTransportLink *athenaTransportLink, CCNxMetaMessage *ccnxMetaMess
// We can optimize the copy out by working directly with the IO Vector.
message = athenaTransportLinkModule_GetMessageBuffer(ccnxMetaMessage);
ioFragment = athenaFragmenter_CreateFragment(linkData->fragmenter, message,
linkData->link.mtu, fragmentNumber);
maxPayloadSize, fragmentNumber);
if (ioFragment) {
iovec = ioFragment->iov;
iovcnt = ioFragment->iovcnt;
......@@ -293,9 +294,9 @@ _ETHSend(AthenaTransportLink *athenaTransportLink, CCNxMetaMessage *ccnxMetaMess
if (sendResult == -1) {
break;
}
if (messageLength > linkData->link.mtu) {
if (messageLength > maxPayloadSize) {
ioFragment = athenaFragmenter_CreateFragment(linkData->fragmenter, message,
linkData->link.mtu, fragmentNumber);
maxPayloadSize, fragmentNumber);
if (ioFragment) {
iovec = ioFragment->iov;
iovcnt = ioFragment->iovcnt;
......
......@@ -242,6 +242,7 @@ _BEFS_ReceiveAndReassemble(AthenaFragmenter *athenaFragmenter, PARCBuffer *wireF
parcBuffer_PutArray(reassembledBuffer, arrayLength, array);
parcBuffer_Release(&wireFormatBuffer);
}
parcBuffer_SetPosition(reassembledBuffer, 0);
_BEFS_ClearFragmenterData(athenaFragmenter);
return reassembledBuffer;
}
......@@ -260,11 +261,11 @@ _BEFS_CreateFragment(AthenaFragmenter *athenaFragmenter, PARCBuffer *message, si
CCNxCodecEncodingBufferIOVec *fragmentIoVec = NULL;
_BEFS_fragmenterData *fragmenterData = _BEFS_GetFragmenterData(athenaFragmenter);
const size_t maxPayload = mtu - sizeof(_HopByHopHeader);
size_t payloadLength = maxPayload;
const size_t maxPayloadSize = mtu - sizeof(_HopByHopHeader);
size_t payloadLength = maxPayloadSize;
size_t length = parcBuffer_Remaining(message);
size_t offset = maxPayload * fragmentNumber;
size_t offset = maxPayloadSize * fragmentNumber;
ssize_t remaining = length - offset;
if (remaining <= 0) {
......@@ -281,7 +282,7 @@ _BEFS_CreateFragment(AthenaFragmenter *athenaFragmenter, PARCBuffer *message, si
_hopByHopHeader_SetSendSequenceNumber(fragmenterData, fragmentHeader);
if (remaining < maxPayload) {
if (remaining < maxPayloadSize) {
payloadLength = remaining;
_hopByHopHeader_SetEFlag(fragmentHeader);
}
......@@ -293,8 +294,8 @@ _BEFS_CreateFragment(AthenaFragmenter *athenaFragmenter, PARCBuffer *message, si
ccnxCodecEncodingBuffer_AppendBuffer(encodingBuffer, message);
CCNxCodecEncodingBuffer *encodingBufferSlice = NULL;
// If NULL there's no fragment that matches the offset/length we asked for
encodingBufferSlice = ccnxCodecEncodingBuffer_Slice(encodingBuffer, offset, maxPayload);
// Returns NULL if there's no fragment that matches the offset/length we ask for
encodingBufferSlice = ccnxCodecEncodingBuffer_Slice(encodingBuffer, offset, maxPayloadSize);
ccnxCodecEncodingBuffer_Release(&encodingBuffer);
if (encodingBufferSlice) {
......
......@@ -173,10 +173,11 @@ athenaEthernet_GetEtherType(AthenaEthernet *athenaEthernet)
PARCBuffer *
athenaEthernet_Receive(AthenaEthernet *athenaEthernet, int timeout, AthenaTransportLinkEvent *events)
{
PARCBuffer *wireFormatBuffer = parcBuffer_Allocate(athenaEthernet->mtu);
size_t readLength = athenaEthernet->mtu + sizeof(struct ether_header);
PARCBuffer *wireFormatBuffer = parcBuffer_Allocate(readLength);
uint8_t *buffer = parcBuffer_Overlay(wireFormatBuffer, 0);
ssize_t readCount = recv(athenaEthernet->fd, buffer, athenaEthernet->mtu, 0);
ssize_t readCount = recv(athenaEthernet->fd, buffer, readLength, 0);
if (readCount == -1) {
if ((errno == EAGAIN) || (errno == EINTR)) {
parcLog_Info(athenaEthernet->log, "Ethernet recv retry");
......
......@@ -93,6 +93,8 @@ LONGBOW_TEST_CASE(Global, athenaTransportLinkModuleUDP_OpenClose)
AthenaTransportLinkAdapter *athenaTransportLinkAdapter = athenaTransportLinkAdapter_Create(_removeLink, NULL);
assertNotNull(athenaTransportLinkAdapter, "athenaTransportLinkAdapter_Create returned NULL");
athenaTransportLinkAdapter_SetLogLevel(athenaTransportLinkAdapter, PARCLogLevel_Debug);
connectionURI = parcURI_Parse("udp://127.0.0.1:40000/name=");
result = athenaTransportLinkAdapter_Open(athenaTransportLinkAdapter, connectionURI);
assertTrue(result == NULL, "athenaTransportLinkAdapter_Open failed to detect bad name argument");
......@@ -151,6 +153,8 @@ LONGBOW_TEST_CASE(Global, athenaTransportLinkModuleUDP_SendReceive)
AthenaTransportLinkAdapter *athenaTransportLinkAdapter = athenaTransportLinkAdapter_Create(_removeLink, NULL);
assertNotNull(athenaTransportLinkAdapter, "athenaTransportLinkAdapter_Create returned NULL");
athenaTransportLinkAdapter_SetLogLevel(athenaTransportLinkAdapter, PARCLogLevel_Debug);
connectionURI = parcURI_Parse("udp://127.0.0.1:40000/Listener/name=UDPListener");
result = athenaTransportLinkAdapter_Open(athenaTransportLinkAdapter, connectionURI);
assertTrue(result != NULL, "athenaTransportLinkAdapter_Open failed (%s)", strerror(errno));
......@@ -230,6 +234,8 @@ LONGBOW_TEST_CASE(Global, athenaTransportLinkModuleUDP_SendReceiveFragments)
AthenaTransportLinkAdapter *athenaTransportLinkAdapter = athenaTransportLinkAdapter_Create(_removeLink, NULL);
assertNotNull(athenaTransportLinkAdapter, "athenaTransportLinkAdapter_Create returned NULL");
athenaTransportLinkAdapter_SetLogLevel(athenaTransportLinkAdapter, PARCLogLevel_Debug);
sprintf(linkSpecificationURI, "udp://127.0.0.1:40000/Listener/name=UDPListener/mtu=%zu/fragmenter=BEFS", mtu);
connectionURI = parcURI_Parse(linkSpecificationURI);
result = athenaTransportLinkAdapter_Open(athenaTransportLinkAdapter, connectionURI);
......@@ -247,8 +253,11 @@ LONGBOW_TEST_CASE(Global, athenaTransportLinkModuleUDP_SendReceiveFragments)
CCNxName *name = ccnxName_CreateFromCString("lci:/foo/bar");
CCNxMetaMessage *ccnxMetaMessage = ccnxInterest_CreateSimple(name);
ccnxName_Release(&name);
size_t largePayloadSize = 3 * mtu;
ssize_t numberOfFragments = (64 * 1024) / mtu;
size_t largePayloadSize = numberOfFragments * mtu;
char largePayload[largePayloadSize];
PARCBuffer *payload = parcBuffer_Wrap((void *)largePayload, largePayloadSize, 0, largePayloadSize);
ccnxInterest_SetPayload(ccnxMetaMessage, payload);
parcBuffer_Release(&payload);
......@@ -270,9 +279,11 @@ LONGBOW_TEST_CASE(Global, athenaTransportLinkModuleUDP_SendReceiveFragments)
usleep(1000);
size_t iterations = (largePayloadSize / mtu) + 5; // Extra reads to allow some delivery latency
do {
ccnxMetaMessage = athenaTransportLinkAdapter_Receive(athenaTransportLinkAdapter, &resultVector, 0);
} while (ccnxMetaMessage == NULL);
} while ((ccnxMetaMessage == NULL) && (iterations--));
assertNotNull(resultVector, "athenaTransportLinkAdapter_Receive failed");
assertTrue(parcBitVector_NumberOfBitsSet(resultVector) == 1, "athenaTransportLinkAdapter_Receive return message with more than one ingress link");
assertNotNull(ccnxMetaMessage, "athenaTransportLinkAdapter_Receive failed to provide message");
......@@ -299,6 +310,8 @@ LONGBOW_TEST_CASE(Global, athenaTransportLinkModuleUDP_MTU)
AthenaTransportLinkAdapter *athenaTransportLinkAdapter = athenaTransportLinkAdapter_Create(_removeLink, NULL);
assertNotNull(athenaTransportLinkAdapter, "athenaTransportLinkAdapter_Create returned NULL");
athenaTransportLinkAdapter_SetLogLevel(athenaTransportLinkAdapter, PARCLogLevel_Debug);
connectionURI = parcURI_Parse("udp://127.0.0.1:40000/Listener/name=UDPListener/mtu=");
result = athenaTransportLinkAdapter_Open(athenaTransportLinkAdapter, connectionURI);
assertTrue(result == NULL, "athenaTransportLinkAdapter_Open failed to detect improper MTU (%s)", strerror(errno));
......@@ -355,6 +368,8 @@ LONGBOW_TEST_CASE(Global, athenaTransportLinkModuleUDP_P2P)
AthenaTransportLinkAdapter *athenaTransportLinkAdapter = athenaTransportLinkAdapter_Create(_removeLink, NULL);
assertNotNull(athenaTransportLinkAdapter, "athenaTransportLinkAdapter_Create returned NULL");
athenaTransportLinkAdapter_SetLogLevel(athenaTransportLinkAdapter, PARCLogLevel_Debug);
connectionURI = parcURI_Parse("udp://localhost:40001/src=localhost:40002/name=UDP_0");
result = athenaTransportLinkAdapter_Open(athenaTransportLinkAdapter, connectionURI);
assertTrue(result != NULL, "athenaTransportLinkAdapter_Open failed (%s)", strerror(errno));
......@@ -401,6 +416,8 @@ LONGBOW_TEST_CASE(Global, athenaTransportLinkModuleUDP_Local)
AthenaTransportLinkAdapter *athenaTransportLinkAdapter = athenaTransportLinkAdapter_Create(_removeLink, NULL);
assertNotNull(athenaTransportLinkAdapter, "athenaTransportLinkAdapter_Create returned NULL");
athenaTransportLinkAdapter_SetLogLevel(athenaTransportLinkAdapter, PARCLogLevel_Debug);
connectionURI = parcURI_Parse("udp://127.0.0.1:40000/Listener/name=UDP_0");
result = athenaTransportLinkAdapter_Open(athenaTransportLinkAdapter, connectionURI);
assertTrue(result != NULL, "athenaTransportLinkAdapter_Open failed (%s)", strerror(errno));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment