Go to the documentation of this file. 1 #ifndef _IO_PROCESSOR_HPP_
2 #define _IO_PROCESSOR_HPP_ "$Id: io_processor.hpp 455 2020-07-23 20:23:59Z geoff $"
37 #ifndef OS_SOCKET_TYPE
39 #define OS_SOCKET_TYPE SOCKET
41 #define OS_SOCKET_TYPE int
44 #ifndef OS_HANDLE_TYPE
46 #define OS_HANDLE_TYPE HANDLE
48 #define OS_HANDLE_TYPE int
51 #ifndef INVALID_SOCKET
53 #define INVALID_SOCKET ((OS_SOCKET_TYPE)(~0))
55 #define INVALID_SOCKET (-1)
148 #pragma GCC diagnostic push
149 #pragma GCC diagnostic ignored "-Wsuggest-final-types"
262 #pragma GCC diagnostic push
263 #pragma GCC diagnostic ignored "-Wsuggest-final-methods"
265 #pragma GCC diagnostic pop
290 if (headerLen !=
nullptr) {
398 #pragma GCC diagnostic pop
419 PARSER_TYPE *parser = (PARSER_TYPE *) (controller->
getExtraData());
420 int rc = parser->processPacket(bfr, len);
444 PARSER_TYPE *parser = (PARSER_TYPE *) (controller->
getExtraData());
445 int rc = parser->processPacketFromSource(bfr, len, metaHdr, hdrLen);
@ PROCESS_THREAD
Definition: io_processor.hpp:160
#define safe_strcpy(d, s, l)
Safe strcpy() routine that will not copy more than l bytes and always ensures that a null is present ...
Definition: compiler_hints.h:696
IO_processFP processRoutine
Definition: io_processor.hpp:198
SMV_StandaloneNumeric< uint32_t > packetsProcessed
Definition: io_processor.hpp:127
const char * getName(uint_fast32_t *retNameLen=nullptr) const OME_ALWAYS_INLINE
Get variable name.
Definition: shared_variable.hpp:135
IO_Processor_Statistics * statistics
statistics
Definition: io_processor.hpp:195
@ _PACKET_HAS_SOURCE
bit flag indicating source information
Definition: io_processor.hpp:175
Intermediary I/O processing object for performing multi-threaded receive-and-process operations on a ...
Definition: io_processor.hpp:154
Time acquisition routines.
IO_Processor(SharedMemoryVariableNode *parentNode, BufferRegion *mgr, IO_processBlockFP b_func, OS_HANDLE_TYPE h, ThreadMode tMode=NONE, void *userData=nullptr, BlockMode bMode=PACKET, IO_receiveBlockFP r_func=recvConsume, IO_consumeFP c_func=doConsumeLoop, IO_processFP p_func=doProcessLoop)
Constructor for IO_Processor.
Definition: io_processor.cpp:486
unsigned char currentThreadState
Definition: io_processor.hpp:217
uint32_t getReadTimeout() const OME_ALWAYS_INLINE
Get limit on read attempts before blocking.
Definition: io_processor.hpp:324
#define OS_SOCKET_TYPE
Definition: io_processor.hpp:41
FARGOS I/O Processing classes.
Allocation record for chains in a 32-bit shared memory buffer.
Definition: circular_bfr.hpp:103
const char * getLabel() const OME_ALWAYS_INLINE NONNULL_RETURN
Definition: io_processor.hpp:296
void * getExtraData() const OME_ALWAYS_INLINE
Retrieve extra information value.
Definition: io_processor.hpp:334
virtual unsigned char * blockAddress(const SharedBufferAllocRecord *record)
Definition: circular_bfr.hpp:282
int64_t unlock()
Release a previously locked mutex.
Definition: timed_mutex.cpp:156
virtual SharedBufferAllocRecord * getActiveListHead()
Return first active allocation record.
Definition: circular_bfr.cpp:196
IO_consumeFP consumeRoutine
Definition: io_processor.hpp:200
virtual ~IO_Processor()
Definition: io_processor.cpp:548
unsigned int lastReadTimeout
Definition: io_processor.hpp:215
size_t dataOffset() const OME_ALWAYS_INLINE
Return number of bytes reserved for meta data on each data element.
Definition: io_processor.hpp:268
int(* IO_blockAllocFP)(uint32_t blockLen, class IO_Processor *controller)
Definition: io_processor.hpp:96
SharedMemoryVariableNode * getParentNode() const
Definition: shared_variable.hpp:113
void * extraData
arbitrary extra data
Definition: io_processor.hpp:202
SharedMemoryVariableNode namingNode
Definition: io_processor.hpp:120
ssize_t(* IO_receiveBlockFP)(SharedBufferAllocRecord *rec, class IO_Processor *controller)
The receiveBlock routine should retrieve data and store it into the indicated block of memory,...
Definition: io_processor.hpp:79
unsigned char * bufferAddress(SharedBufferAllocRecord *rec, size_t *bufferLen=nullptr) const OME_ALWAYS_INLINE
Return physical address of a buffer within the context of the local process' address space.
Definition: io_processor.hpp:275
#define EAGAIN
Definition: tmp.o.cpp:64
const struct TimeWithNanoseconds * getCurrentTime(struct TimeWithNanoseconds *saveTo, bool forceUpdate)
Get the current time.
Definition: get_time.cpp:20
@ _NOT_SOCKET
force use of read vs. recv().
Definition: io_processor.hpp:181
~IO_Processor_Statistics()
Definition: io_processor.hpp:145
@ SEPARATE_READ_AND_PROCESS_THREADS
Definition: io_processor.hpp:165
void setLabel(const char *name) OME_ALWAYS_INLINE NONNULL_CLASS_PARAMETERS(2)
Definition: io_processor.hpp:300
SMV_StandaloneNumeric< uint64_t > bytesProcessed
Definition: io_processor.hpp:123
Holds collected statistics for an IO_Processor object.
Definition: io_processor.hpp:118
Intermediate naming node for supporting variable naming hierarchies.
Definition: shared_variable.hpp:318
SMV_StandaloneNumeric< uint64_t > bytesRead
Definition: io_processor.hpp:122
#define OME_YIELD_THREAD()
Macro for platform-specific yield of thread's time slice.
Definition: compiler_hints.h:547
#define INVALID_HANDLE_VALUE
Definition: poll_monitor.hpp:19
uint32_t getThreadMode() const OME_ALWAYS_INLINE
Get requested thread modes.
Definition: io_processor.hpp:364
pthread_t processThreadID
Definition: io_processor.hpp:208
#define SIGIO
Definition: tmp.o.cpp:526
#define NONNULL_CLASS_PARAMETERS(...)
Mark a function as never returning a null pointer.
Definition: compiler_hints.h:337
unsigned char threadStartedState
Definition: io_processor.hpp:216
Structure for representing time as relative seconds and nanoseconds.
Definition: get_time.h:24
IO_receiveBlockFP recvRoutine
Definition: io_processor.hpp:201
int postCondition()
Notify any sleeping threads that the condition has occurred.
Definition: timed_mutex.cpp:358
int(* IO_processFP)(class IO_Processor *controller)
Long running routine which processes all input made available by the corresponding consume routine.
Definition: io_processor.hpp:94
ThreadMode
Mask to select threading modes.
Definition: io_processor.hpp:157
void setConsumeRoutine(IO_receiveBlockFP func) OME_ALWAYS_INLINE
Set consume routine.
Definition: io_processor.hpp:359
int waitForDataToProcess(bool alreadyLocked=false)
Wait for data to arrive.
Definition: io_processor.cpp:755
int processPacketFromSourceUsingClass(SharedBufferAllocRecord *rec, class IO_Processor *controller)
Implements a processing routine that invokes a processPacketFromSource() routine upon each incoming p...
Definition: io_processor.hpp:432
void setReadAttempts(uint32_t count) OME_ALWAYS_INLINE
Set read attempts before blocking.
Definition: io_processor.hpp:309
SMV_StandaloneNumeric< uint32_t > readAttemptsBeforeBlocking
max unproductive read attempts before thread blocks
Definition: io_processor.hpp:131
IO_Processor_Statistics(SharedMemoryVariableNode *parentNode)
Definition: io_processor.hpp:133
int setThreadMode(ThreadMode mode)
Set threading mode.
Definition: io_processor.cpp:564
#define SOCKET_CAST(x)
Definition: io_processor.cpp:47
BlockMode
Blocking mode.
Definition: io_processor.hpp:171
uint64_t time_nanosec
Definition: get_time.h:26
const char srcID[]
Definition: catSym.c:17
void waitForBufferAllocRecordToBeReady(SharedBufferAllocRecord *rec)
Verify record is prepared and, if needed, wait until it is prepared.
Definition: circular_wait.hpp:16
void setMaxPacketSize(size_t bytes) OME_ALWAYS_INLINE
Set MTU.
Definition: io_processor.hpp:339
@ READ_AND_PROCESS_ON_SAME_THREAD
a single spawned read thread will do both read-and-processing
Definition: io_processor.hpp:167
OS_HANDLE_TYPE descriptor
Definition: io_processor.hpp:210
int waitForDataToProcessOrUntil(const struct timespec *maxWaitUntil, bool alreadyLocked=false)
Wait for data to arrive or until a point in time.
Definition: io_processor.cpp:775
@ CONTIGUOUS_FILE_STREAM
contiguous byte stream from file
Definition: io_processor.hpp:185
int32_t packetsProcessedIncrement
Definition: io_processor.hpp:212
static int doConsumeLoop(IO_Processor *controller)
Standard consume loop to receive incoming data.
Definition: io_processor.cpp:172
#define OME_EXPECT_TRUE(expr)
Annotation macro for conditional expression expected to be true.
Definition: compiler_hints.h:541
int processPacketUsingClass(SharedBufferAllocRecord *rec, class IO_Processor *controller)
Implements a processing routine that invokes a processPacket() routine upon each incoming packet.
Definition: io_processor.hpp:414
int waitForThreadExit(uint32_t modes)
Wait for threads to terminate.
Definition: io_processor.cpp:728
SMV_StandaloneNumeric< uint32_t > packetsRead
Definition: io_processor.hpp:125
virtual SharedBufferAllocRecord * allocateBlock(size_t len)=0
int stopThread(uint32_t modes)
Request stop.
Definition: io_processor.cpp:684
void setPacketsProcessedIncrement(int32_t incVal)
Definition: io_processor.hpp:304
virtual void returnBlock(SharedBufferAllocRecord *record)=0
void setExtraData(void *data) OME_ALWAYS_INLINE
Set extra information value.
Definition: io_processor.hpp:329
@ PROCESS_DURING_READ
Processing will be performed by the read thread.
Definition: io_processor.hpp:162
Generic mutex implementation that supports timing statistics.
Definition: timed_mutex.hpp:51
TimedCondition * condition
Definition: io_processor.hpp:197
void setBlockingMode(BlockMode mode)
Set block delivery mode.
Definition: io_processor.cpp:641
size_t getBlockSize() const OME_ALWAYS_INLINE
Return the block size set for the region.
Definition: circular_bfr.hpp:248
#define PTHREAD_CREATE_JOINABLE
Definition: tmp.o.cpp:425
bool isSleeping() const OME_ALWAYS_INLINE
Return a Boolean indication of whether or not a thread is sleeping.
Definition: timed_mutex.hpp:338
#define NULL
Definition: tmp.o.cpp:327
int submitOrProcessBlock(SharedBufferAllocRecord *rec)
Submission routine that will either directly invoke the processing routine or notify the processing t...
Definition: io_processor.cpp:144
size_t maxPacketSize
max to receive in one go
Definition: io_processor.hpp:214
#define htons(x)
Definition: tmp.o.cpp:3100
Generic condition variable for use with the TimedMutex class.
Definition: timed_mutex.hpp:216
@ _PACKET_HAS_RECEIVE_TIME
bit flag indicating arrival time
Definition: io_processor.hpp:177
@ NONE
No threads will be spawned.
Definition: io_processor.hpp:158
FARGOS Shared Memory Variable routines.
const char srcID[] OME_USED
Definition: tick_time.cpp:24
@ PACKET_WITH_SOURCE
packet message unit including source address
Definition: io_processor.hpp:187
@ PACKET
packet message unit, like a datagram
Definition: io_processor.hpp:173
@ READ_THREAD
A read thread will be spawned.
Definition: io_processor.hpp:159
int interruptThread(uint32_t modes, bool force=false)
Interrupt blocked thread.
Definition: io_processor.cpp:646
TimedMutex * mutex
Definition: io_processor.hpp:196
uint64_t time_sec
Definition: get_time.h:25
@ PACKET_WITH_TIME
packet message with arrival time
Definition: io_processor.hpp:189
@ _PACKET_HAS_META_DATA
bit flags indicating meta data present
Definition: io_processor.hpp:179
SMV_StandaloneNumeric< uint32_t > productiveReadSpins
Definition: io_processor.hpp:128
int errno
Definition: ethers.c:41
Compiler-specific macros to provide performance-related hints.
#define OME_EXPECT_FALSE(expr)
Annotation macro for conditional expression expected to be false.
Definition: compiler_hints.h:540
uint32_t usedLen
Definition: circular_bfr.hpp:107
#define OME_ALWAYS_INLINE
Tell the compiler to alway inline a function, regardless of optimization level.
Definition: compiler_hints.h:364
@ PACKET_WITH_SOURCE_AND_TIME
packet message with source address and arrival time
Definition: io_processor.hpp:191
void setProcessLoopRoutine(IO_processFP func) OME_ALWAYS_INLINE
Set processing loop routine.
Definition: io_processor.hpp:349
int(* IO_consumeFP)(class IO_Processor *controller)
Long running routine which receives all input from input source.
Definition: io_processor.hpp:87
int waitForThreadStart(uint32_t mode)
Wait for threads to start.
Definition: io_processor.cpp:712
uint32_t descriptorFlags
Definition: io_processor.hpp:211
IO_processBlockFP processBlockRoutine
Definition: io_processor.hpp:199
int noteDataToProcess(bool alreadyLocked=false)
Note new data has arrived.
Definition: io_processor.cpp:803
unsigned char blockingMode
Definition: io_processor.hpp:220
void setConsumeLoopRoutine(IO_consumeFP func) OME_ALWAYS_INLINE
Set consume loop routine.
Definition: io_processor.hpp:354
BufferRegion * bfrManager
buffer region
Definition: io_processor.hpp:194
@ CONTIGUOUS_BYTE_STREAM
contiguous byte stream from socket
Definition: io_processor.hpp:183
unsigned char desiredThreadState
Definition: io_processor.hpp:218
void setProcessRoutine(IO_processBlockFP func) OME_ALWAYS_INLINE
Set processing routine.
Definition: io_processor.hpp:344
#define OS_HANDLE_TYPE
Definition: io_processor.hpp:48
pthread_t consumeThreadID
Definition: io_processor.hpp:207
Interface to a buffer region. This is an abstract class.
Definition: circular_bfr.hpp:177
uint32_t getReadAttempts() const OME_ALWAYS_INLINE
Get limit on read attempts before blocking.
Definition: io_processor.hpp:314
char label[24]
Definition: io_processor.hpp:213
void setReadTimeout(uint32_t count) OME_ALWAYS_INLINE
Set read timeout.
Definition: io_processor.hpp:319
static int doProcessLoop(IO_Processor *controller)
Standard processing loop to process data on separate thread; works in conjunction with doConsumeLoop(...
Definition: io_processor.cpp:219
IO_metaBlock_header * bufferHeaderAddress(SharedBufferAllocRecord *rec, size_t *headerLen=nullptr) const OME_ALWAYS_INLINE
Return physical address of meta block header within the context of the local process' address space.
Definition: io_processor.hpp:288
@ BOTH_THREADS
Both a read and a processing thread will be spawned.
Definition: io_processor.hpp:164
void setLabelForThread(pthread_t threadId, const char *label)
Definition: io_processor.cpp:51
unsigned char stopRequested
Definition: io_processor.hpp:219
#define SA_SIGINFO
Definition: tmp.o.cpp:494
int(* IO_blockFreeFP)(unsigned char *block, class IO_Processor *controller)
Definition: io_processor.hpp:99
char NONNULL_RETURN
Definition: compiler_hints.h:745
SMV_StandaloneNumeric< uint32_t > maxReadTimeout
Definition: io_processor.hpp:129
static ssize_t recvConsume(SharedBufferAllocRecord *rec, class IO_Processor *controller)
Definition: io_processor.cpp:279
int(* IO_processBlockFP)(SharedBufferAllocRecord *rec, class IO_Processor *controller)
Process a block of data received from an input source.
Definition: io_processor.hpp:69
#define EINTR
Definition: tmp.o.cpp:93