Commit 2c555cbe authored by kfox's avatar kfox

renamed athenaTransportLinkModule_GetMessageBuffer to athenaTransportLinkModule_CreateMessageBuffer

more details in framing messages
parent 3ee5dd70
......@@ -31,7 +31,7 @@
/*
* Provide support for loadable fragmentation modules. The fragmenter library must be named
* libathena_ETHFragmenter_<name>, and must contain an initialization routine that is named
* libathena_Fragmenter_<name>, and must contain an initialization routine that is named
* athenaFragmenter_<name>_Init. The init routine is provided an AthenaFragmenter
* object instance that is used to maintain private instance state for the fragmentation module.
*/
......
......@@ -31,7 +31,6 @@
#ifndef libathena_Fragmenter
#define libathena_Fragmenter
#include <parc/algol/parc_Deque.h>
#include <ccnx/common/codec/ccnxCodec_EncodingBuffer.h>
/**
......
......@@ -249,7 +249,7 @@ athenaTransportLinkModule_SetLogLevel(AthenaTransportLinkModule *athenaTransport
}
PARCBuffer *
athenaTransportLinkModule_GetMessageBuffer(CCNxMetaMessage *message)
athenaTransportLinkModule_CreateMessageBuffer(CCNxMetaMessage *message)
{
PARCBuffer *buffer = ccnxWireFormatMessage_GetWireFormatBuffer(message);
......
......@@ -223,7 +223,7 @@ void athenaTransportLinkModule_SetAddLinkCallback(AthenaTransportLinkModule *ath
*/
void athenaTransportLinkModule_SetLogLevel(AthenaTransportLinkModule *athenaTransportLinkModule, const PARCLogLevel level);
PARCBuffer *athenaTransportLinkModule_GetMessageBuffer(CCNxMetaMessage *message);
PARCBuffer *athenaTransportLinkModule_CreateMessageBuffer(CCNxMetaMessage *message);
CCNxCodecNetworkBufferIoVec *athenaTransportLinkModule_GetMessageIoVector(CCNxMetaMessage *message);
......
......@@ -267,7 +267,7 @@ _ETHSend(AthenaTransportLink *athenaTransportLink, CCNxMetaMessage *ccnxMetaMess
if (linkData->fragmenter != NULL) {
// Right now we work with the message in a contiguous buffer.
// We can optimize the copy out by working directly with the IO Vector.
message = athenaTransportLinkModule_GetMessageBuffer(ccnxMetaMessage);
message = athenaTransportLinkModule_CreateMessageBuffer(ccnxMetaMessage);
ioFragment = athenaFragmenter_CreateFragment(linkData->fragmenter, message,
maxPayloadSize, fragmentNumber);
if (ioFragment) {
......
......@@ -184,7 +184,7 @@ _TCPSend(AthenaTransportLink *athenaTransportLink, CCNxMetaMessage *ccnxMetaMess
}
// Get wire format and write it out.
PARCBuffer *wireFormatBuffer = athenaTransportLinkModule_GetMessageBuffer(ccnxMetaMessage);
PARCBuffer *wireFormatBuffer = athenaTransportLinkModule_CreateMessageBuffer(ccnxMetaMessage);
parcBuffer_SetPosition(wireFormatBuffer, 0);
size_t length = parcBuffer_Limit(wireFormatBuffer);
......@@ -280,7 +280,7 @@ _TCPReceive(AthenaTransportLink *athenaTransportLink)
struct _TCPLinkData *linkData = athenaTransportLink_GetPrivateData(athenaTransportLink);
CCNxMetaMessage *ccnxMetaMessage = NULL;
// Peek at our message header to determine the total length of buffer we need to allocate.
// Read our message header to determine the total buffer length we need to allocate.
size_t fixedHeaderLength = ccnxCodecTlvPacket_MinimalHeaderLength();
PARCBuffer *wireFormatBuffer = parcBuffer_Allocate(fixedHeaderLength);
const uint8_t *messageHeader = parcBuffer_Overlay(wireFormatBuffer, 0);
......@@ -326,7 +326,9 @@ _TCPReceive(AthenaTransportLink *athenaTransportLink)
// Could do more to check the integrity of the message and framing.
if (messageLength < fixedHeaderLength) {
linkData->_stats.receive_BadMessageLength++;
parcLog_Error(athenaTransportLink_GetLogger(athenaTransportLink), "Framing error, flushing link.");
parcLog_Error(athenaTransportLink_GetLogger(athenaTransportLink),
"Framing error, length less than required header (%zu < %zu), flushing.",
messageLength, fixedHeaderLength);
_flushLink(athenaTransportLink);
parcBuffer_Release(&wireFormatBuffer);
return NULL;
......@@ -375,12 +377,13 @@ _TCPReceive(AthenaTransportLink *athenaTransportLink)
// Construct, and return a ccnxMetaMessage from the wire format buffer.
ccnxMetaMessage = ccnxMetaMessage_CreateFromWireFormatBuffer(wireFormatBuffer);
if (ccnxTlvDictionary_GetSchemaVersion(ccnxMetaMessage) == CCNxTlvDictionary_SchemaVersion_V0) {
parcLog_Warning(athenaTransportLink_GetLogger(athenaTransportLink),
"received deprecated version %d message\n", ccnxTlvDictionary_GetSchemaVersion(ccnxMetaMessage));
}
if (ccnxMetaMessage == NULL) {
linkData->_stats.receive_DecodeFailed++;
parcLog_Error(athenaTransportLink_GetLogger(athenaTransportLink), "Failed to decode message from received packet.");
} else if (ccnxTlvDictionary_GetSchemaVersion(ccnxMetaMessage) == CCNxTlvDictionary_SchemaVersion_V0) {
parcLog_Warning(athenaTransportLink_GetLogger(athenaTransportLink),
"received deprecated version %d message\n", ccnxTlvDictionary_GetSchemaVersion(ccnxMetaMessage));
}
parcBuffer_Release(&wireFormatBuffer);
......
......@@ -121,7 +121,7 @@ _TemplateSend(AthenaTransportLink *athenaTransportLink, CCNxMetaMessage *ccnxMet
{
struct _TemplateLinkData *linkData = athenaTransportLink_GetPrivateData(athenaTransportLink);
PARCBuffer *wireFormatBuffer = athenaTransportLinkModule_GetMessageBuffer(ccnxMetaMessage);
PARCBuffer *wireFormatBuffer = athenaTransportLinkModule_CreateMessageBuffer(ccnxMetaMessage);
int result = _internalSEND(linkData, wireFormatBuffer);
......
......@@ -241,7 +241,7 @@ _UDPSend(AthenaTransportLink *athenaTransportLink, CCNxMetaMessage *ccnxMetaMess
}
// Get a wire format buffer and write it out.
PARCBuffer *wireFormatBuffer = athenaTransportLinkModule_GetMessageBuffer(ccnxMetaMessage);
PARCBuffer *wireFormatBuffer = athenaTransportLinkModule_CreateMessageBuffer(ccnxMetaMessage);
parcBuffer_SetPosition(wireFormatBuffer, 0);
size_t messageLength = parcBuffer_Limit(wireFormatBuffer);
PARCBuffer *buffer = NULL;
......@@ -431,12 +431,26 @@ _demuxDelivery(AthenaTransportLink *athenaTransportLink, CCNxMetaMessage *ccnxMe
_queueMessage(demuxLink, ccnxMetaMessage);
}
static void
_flushLink(AthenaTransportLink *athenaTransportLink)
{
struct _UDPLinkData *linkData = athenaTransportLink_GetPrivateData(athenaTransportLink);
char trash[MAXPATHLEN];
// Flush link to attempt to resync our framing
while (read(linkData->fd, trash, sizeof(trash)) == sizeof(trash)) {
parcLog_Error(athenaTransportLink_GetLogger(athenaTransportLink), "... flushing link.");
}
}
//
// Peek at the header and derive our total message length
//
static size_t
_messageLengthFromHeader(AthenaTransportLink *athenaTransportLink, _UDPLinkData *linkData)
_messageLengthFromHeader(AthenaTransportLink *athenaTransportLink)
{
struct _UDPLinkData *linkData = athenaTransportLink_GetPrivateData(athenaTransportLink);
// Peek at our message header to determine the total length of buffer we need to allocate.
size_t fixedHeaderLength = ccnxCodecTlvPacket_MinimalHeaderLength();
PARCBuffer *wireFormatBuffer = parcBuffer_Allocate(fixedHeaderLength);
......@@ -478,12 +492,10 @@ _messageLengthFromHeader(AthenaTransportLink *athenaTransportLink, _UDPLinkData
// If length is greater than our MTU we will find out in the read.
if (messageLength < fixedHeaderLength) {
linkData->_stats.receive_BadMessageLength++;
parcLog_Error(athenaTransportLink_GetLogger(athenaTransportLink), "Framing error, flushing link.");
char trash[MAXPATHLEN];
// Flush link to attempt to resync our framing
while (read(linkData->fd, trash, sizeof(trash)) == sizeof(trash)) {
parcLog_Error(athenaTransportLink_GetLogger(athenaTransportLink), "... flushing link.");
}
parcLog_Error(athenaTransportLink_GetLogger(athenaTransportLink),
"Framing error, length less than required header (%zu < %zu), flushing.",
messageLength, fixedHeaderLength);
_flushLink(athenaTransportLink);
return -1;
}
......@@ -506,7 +518,7 @@ _UDPReceiveMessage(AthenaTransportLink *athenaTransportLink, struct sockaddr_in
if (linkData->link.mtu != 0) {
messageLength = linkData->link.mtu;
} else {
messageLength = _messageLengthFromHeader(athenaTransportLink, linkData);
messageLength = _messageLengthFromHeader(athenaTransportLink);
if (messageLength <= 0) {
return NULL;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment