Go to the documentation of this file. 1 #ifndef _ORDERED_INPUT_HPP_
2 #define _ORDERED_INPUT_HPP_ "$Id: OrderedInput.hpp 455 2020-07-23 20:23:59Z geoff $"
57 #ifndef FORCE_ORDERED_INPUT_BLOCK_AS_FINAL
64 #define FORCE_ORDERED_INPUT_BLOCK_AS_FINAL 0
67 #if FORCE_ORDERED_INPUT_BLOCK_AS_FINAL == 1
68 #define ORDERED_INPUT_BLOCK_FINAL final
69 #define ORDERED_INPUT_BLOCK_VIRTUAL
71 #define ORDERED_INPUT_BLOCK_FINAL
73 #define ORDERED_INPUT_BLOCK_VIRTUAL virtual
91 typedef int64_t ssize_t;
119 if (makeCopy ==
false) {
127 unsigned char *newBlock =
new unsigned char[len];
130 memcpy(newBlock, block, len);
149 if (makeCopy ==
false) {
170 if (len <= (8192 + 4096)) {
173 int b = __builtin_clz(len);
174 b = ((
sizeof(len) * 8) - 1) - b;
177 _BitScanReverse(&b, len);
188 unsigned char *newBlock =
new unsigned char[
blockLen];
189 memcpy(newBlock, block, len);
215 return (~
static_cast<uint64_t
>(0));
254 uint64_t a2 = (uint64_t) arg.inputFile;
272 class OrderedInputSource {
274 static const struct timespec WAIT_YEAR_MAX;
275 static const struct timespec WAIT_NEVER;
277 typedef int64_t ssize_t;
291 virtual ~OrderedInputSource() {}
364 virtual const char *sourceLabel(
char *outputBfr, uint_fast32_t bfrLen)
const {
365 return (
"AnonInputSource");
370 virtual void noteEOFread() {}
374 virtual void noteEOFprocessed() {}
412 return (*arg1 < *arg2);
416 std::priority_queue<OrderedInputBlock *,pQueueContainer_t,ltOrderedInputBlockPtr>
pQueue;
425 while (pQueue.empty() ==
false) {
427 OrderedInputSource *source = block->
inputFile;
428 source->recoverInputBlock(block);
452 pQueueSize -= decrementBy;
468 pQueueSize -= decrementBy;
492 pQueueSize += incrementBy;
555 int rc =
proxyFor->forwardInputBlock(block);
560 return (
proxyFor->sourceLabel(outputBfr, bfrLen));
573 #pragma GCC diagnostic push
574 #pragma GCC diagnostic ignored "-Wsuggest-final-types"
616 for(
size_t i=0;i<=maxReorder;i+=1) {
617 InputBlockReturn result = interfaceFor->getNextInputBlock(
true, &OrderedInputSource::WAIT_NEVER);
618 if (result.
block ==
nullptr) {
621 interfaceFor->noteEOFread();
630 #pragma GCC diagnostic push
631 #pragma GCC diagnostic ignored "-Wsuggest-final-methods"
633 #pragma GCC diagnostic pop
660 result =
proxyFor->getNextInputBlock(forceCopy, maxWaitTime);
671 result.
block =
nullptr;
677 result.
block = retBlock;
684 #pragma GCC diagnostic pop
717 typedef int64_t ssize_t;
768 #pragma GCC diagnostic push
769 #pragma GCC diagnostic ignored "-Wsuggest-final-methods"
782 virtual void noteNextKey(uint64_t key, int64_t blockCount, uint64_t nextKeyFromAlternateSource) {}
783 #pragma GCC diagnostic pop
808 source->noteEOFread();
843 uint_fast32_t count = 0;
847 OrderedInputSource *source = block->
inputFile;
848 source->recoverInputBlock(block);
864 if (preloadAmt > 0) {
873 delete reorderSource;
893 const uint_fast32_t n = (uint_fast32_t)
sourceFiles.size();
894 int blocksLoaded = 0;
895 for(uint_fast32_t i=0;i<n;i+=1) {
911 return (blocksLoaded);
956 uint32_t fakeStopFlag = 0;
957 uint32_t *flagPtr = (stopFlagPtr !=
nullptr) ? stopFlagPtr : &fakeStopFlag;
960 int64_t totalBlocks = 0;
963 uint64_t nextSortKey = block->
sortKey;
966 maxTimeNS = ~static_cast<uint64_t>(0U);
968 maxTimeNS = (
static_cast<uint64_t
>(maxWaitTime->tv_sec) * 1000000000U) +
969 maxWaitTime->tv_nsec;
970 if (maxTimeNS < (
static_cast<uint64_t
>(24 * 60 * 60) * 1000000000U)) {
972 maxTimeNS += nextSortKey;
974 if (nextSortKey >= maxTimeNS) {
982 #define _TRACK_RANGE_PROCESSED 1
983 #if _TRACK_RANGE_PROCESSED == 1
984 uint64_t firstSortKey = nextSortKey;
985 uint64_t lastSortKey = nextSortKey;
988 const bool drainEverything =
false;
989 uint32_t queueModificationCount = 0;
999 OrderedInputSource *source = block->
inputFile;
1003 uint64_t secondSortKey;
1006 secondSortKey = secondBlock->
sortKey;
1008 secondBlock =
nullptr;
1009 secondSortKey = ~static_cast<uint64_t>(0U);
1015 noteNextKey(nextSortKey, totalBlocks, secondSortKey);
1018 #if _TRACK_RANGE_PROCESSED == 1
1019 lastSortKey = nextSortKey;
1022 source->recoverInputBlock(block);
1032 block = retrievedBlock.
block;
1040 queueModificationCount += 1;
1043 "Second block would be past end of interval " << secondSortKey <<
1044 " queueSkips=" << totalBlocks - queueModificationCount <<
1045 " totalBlocks=" << totalBlocks <<
1046 #if _TRACK_RANGE_PROCESSED == 1
1047 " nsFromFirst=" << secondSortKey - firstSortKey <<
1048 " nsFromPrior=" << secondSortKey - lastSortKey <<
1050 " nsToNext=" << secondSortKey - maxTimeNS <<
1054 goto cleanup_and_exit;
1064 block = secondBlock;
1065 nextSortKey = secondSortKey;
1076 secondSortKey = secondBlock->
sortKey;
1079 queueModificationCount += 1;
1081 "Reached end of interval " << nextSortKey <<
1082 " queueSkips=" << totalBlocks - queueModificationCount <<
1083 " totalBlocks=" << totalBlocks <<
1084 #if _TRACK_RANGE_PROCESSED == 1
1085 " nsFromFirst=" << nextSortKey - firstSortKey <<
1086 " nsFromPrior=" << nextSortKey - lastSortKey <<
1088 " nsToNext=" << nextSortKey - maxTimeNS <<
1092 goto cleanup_and_exit;
1101 source->sourceLabel(label,
sizeof(label)) <<
1106 source->sourceLabel(label,
sizeof(label)) <<
1112 "Ceasing all input processing from source=\"" <<
1113 source->sourceLabel(label,
sizeof(label)) <<
"\"" <<
LOG_ENDLINE;
1119 if (maxWaitTime !=
nullptr) {
1120 if (secondSortKey >= maxTimeNS) {
1122 "End of interval and reached EOF on source=\"" <<
1123 source->sourceLabel(label,
sizeof(label)) <<
"\"" <<
1124 " totalBlocks=" << totalBlocks <<
1125 " queueSkips=" << totalBlocks - queueModificationCount <<
1126 #if _TRACK_RANGE_PROCESSED == 1
1127 " nsFromFirst=" << secondSortKey - firstSortKey <<
1128 " nsFromPrior=" << secondSortKey - lastSortKey <<
1130 " nsToNext=" << secondSortKey - maxTimeNS <<
1138 source->sourceLabel(label,
sizeof(label)) <<
"\"" <<
1141 block = secondBlock;
1142 nextSortKey = secondSortKey;
1148 secondSortKey = secondBlock->
sortKey;
1150 secondBlock =
nullptr;
1151 secondSortKey = ~static_cast<uint64_t>(0U);
1159 "Queue skipCount=" << totalBlocks - queueModificationCount <<
1160 " totalBlocks=" << totalBlocks <<
1162 #if _TRACK_RANGE_PROCESSED == 1
1163 " duration=" << lastSortKey - firstSortKey <<
1166 return (totalBlocks);
1167 #undef _TRACK_RANGE_PROCESSED
1194 int64_t
processInputFiles(uint32_t *stopFlag,
const struct timespec *maxWaitTime=&OrderedInputSource::WAIT_YEAR_MAX)
1216 virtual int64_t
processWorkInProgress(uint32_t *stopFlagPtr,
const struct timespec *maxWaitTime=&OrderedInputSource::WAIT_YEAR_MAX) {
void OMEcleanupGlobalData()
Definition: vista.cpp:36
#define OME_EXT_TYPE(member)
Definition: OMEbaseType.h:19
@ OME_NLM
Definition: OMEmanifests.h:90
Ïúíþ ð Ø ˜ ˜ __text __TEXT € __apple_names __DWARF __apple_objc __DWARF __apple_namespac__DWARF H X __apple_types __DWARF l
Definition: tmp3.o.cpp:1
bool operator!=(const OMEtype &lArg, const OMEtype &rArg)
Definition: OMEtype_operators.cpp:5508
@ OME_FLOAT
Definition: OMEmanifests.h:82
void initializeAsType(const enum OMEtypes_t t)
The fundamental tagged data type used through the FARGOS/VISTA infrastructure.
Definition: OMEtype.cpp:95
FARGOS I/O Processing classes.
int OMEmainLoop(uint_fast32_t methodLimit)
FARGOS/VISTA Object Mangement Environment main loop. Invokes OMEdoWork(). Upon return,...
Definition: OMEevent.cpp:243
@ OME_POINTER
Definition: OMEmanifests.h:92
void OMEstartCriticalSection(eOMEcriticalSectionLabel regionID)
Definition: OMEmutex.cpp:217
OMEtype operator/(const OMEtype &lArg, const OMEtype &rArg)
Definition: OMEtype_operators.cpp:40694
@ OME_UINT16
Definition: OMEmanifests.h:98
int OMEinitSystem(const char *rcFileName, int mainArgc, const char *mainArgv[], const char *envp[])
Definition: OMEinit.cpp:142
Implements associative array of OMEtype elements.
Definition: OMEassoc.h:112
Implements text and binary string storage.
Definition: OMEstring.h:305
#define stderr
Definition: tmp.o.cpp:3115
OMEtype & operator/=(OMEtype &lArg, const OMEtype &rArg)
Definition: OMEtype_operators.cpp:30582
Fundamental ANY type for FARGOS/VISTA Object Management Environment.
Definition: OMEbaseType.h:250
@ OME_DOUBLE
Definition: OMEmanifests.h:83
#define VIRTUAL_OVERRIDE
Generates override if the compiler supports it.
Definition: compiler_hints.h:435
#define OME_ALWAYS_OPTIMIZE(level)
Mark a function to be compiled with a specific level of optimization.
Definition: compiler_hints.h:406
LogMaskType_t COMPONENT_LOG_MASK() io("io_logMask", &DEFAULT_sharedMemoryVariableManager, COMPONENT_LEVEL(io, warn)|COMPONENT_LEVEL(io, error)|COMPONENT_LEVEL(io, fatal))
@ OME_UINT32
Definition: OMEmanifests.h:96
#define ORDERED_INPUT_BLOCK_VIRTUAL
Set to virtual if ORDERED_INPUT_BLOCK_AS_FINAL is set to 0.
Definition: OrderedInput.hpp:74
void OMEbadType(const char *opName, const char *leftTypeName, const char *rightTypeName, uint32_t fromLine)
Definition: OMEtype_operators.cpp:11
OMEtype operator|(const OMEtype &lArg, const OMEtype &rArg)
Definition: OMEtype_operators.cpp:43769
OMEtype operator&(const OMEtype &lArg, const OMEtype &rArg)
Definition: OMEtype_operators.cpp:42937
OMEtype & operator-=(OMEtype &lArg, const OMEtype &rArg)
Definition: OMEtype_operators.cpp:24618
#define OME_PREFETCH(addr, rw, locality)
Macro to request prefetch.
Definition: compiler_hints.h:362
int makeUnique(OMEthread *thread, OMEtype &result, const OMEtype &arg)
Definition: OILdebug.cpp:357
Public interface to an OME Native Language Message.
Definition: OMEnlm.h:98
#define OME_MAX_CPUS_PERMITTED
Definition: OMEmutex.h:79
const char srcID[]
Definition: catSym.c:17
@ OME_STRING
Definition: OMEmanifests.h:85
#define CACHE_LINE_LENGTH
Definition for target system's cache line length.
Definition: compiler_hints.h:576
OMEtype * pointer
Definition: OMEbaseType.h:302
@ OME_NIL
Definition: OMEmanifests.h:78
#define OME_EXPECT_TRUE(expr)
Annotation macro for conditional expression expected to be true.
Definition: compiler_hints.h:541
OMEtype & operator%=(OMEtype &lArg, const OMEtype &rArg)
Definition: OMEtype_operators.cpp:31998
#define ORDERED_INPUT_BLOCK_FINAL
Set to final if ORDERED_INPUT_BLOCK_AS_FINAL is set to 1.
Definition: OrderedInput.hpp:72
OMEtype & operator*=(OMEtype &lArg, const OMEtype &rArg)
Definition: OMEtype_operators.cpp:29166
@ OME_UINT64
Definition: OMEmanifests.h:97
OMEassocStorage::ASSOC_HASH_KEY_t ASSOC_HASH_KEY_t
Definition: OMEassoc.h:125
OME fundamental type implementation.
#define __cplusplus
Definition: tmp.o.cpp:2998
OMEtype & operator&=(OMEtype &lArg, const OMEtype &rArg)
Definition: OMEtype_operators.cpp:27148
const char srcID[] OME_USED
Definition: tick_time.cpp:24
#define MAX_TEMPORARIES
Definition: OMEtypeOps.cpp:22
OMEtype operator+(const OMEtype &lArg, const OMEtype &rArg)
Definition: OMEtype_operators.cpp:33691
OMEtype operator-(const OMEtype &lArg, const OMEtype &rArg)
Definition: OMEtype_operators.cpp:36699
#define OME_CONST_FUNCTION
Mark as an idempotent function that only accesses arguments – no global data.
Definition: compiler_hints.h:390
@ OME_SET
Definition: OMEmanifests.h:89
@ OME_CRITICAL_SECTION_OMETYPE_OP
Definition: OMEmutex.h:45
void OMEendCriticalSection(eOMEcriticalSectionLabel regionID)
Definition: OMEmutex.cpp:236
OMEtype & operator^=(OMEtype &lArg, const OMEtype &rArg)
Definition: OMEtype_operators.cpp:32844
#define OME_EXPECT_FALSE(expr)
Annotation macro for conditional expression expected to be false.
Definition: compiler_hints.h:540
#define LOG_COMPONENT_CERR(component, lvl)
Convenience macro that uses LOG_COMPONENT_INTO to conditionally log a message to standard error.
Definition: logging_api.hpp:3030
#define OME_ALWAYS_INLINE
Tell the compiler to alway inline a function, regardless of optimization level.
Definition: compiler_hints.h:364
OMEtype & operator+=(OMEtype &lArg, const OMEtype &rArg)
Definition: OMEtype_operators.cpp:21683
@ OME_UINT8
Definition: OMEmanifests.h:99
@ OME_FIXED
Definition: OMEmanifests.h:91
#define PROCESS_COMMANDLINE_LOG_FLAGS(argc, argv)
Standard mechanism to process logging-related command line arguments.
Definition: logging_api.hpp:2595
OME debug and profiling interfaces.
Implements sparse array of OMEtype elements.
Definition: OMEarray.h:75
#define LOG_ENDLINE
Closing clause for text line output using << operators.
Definition: logging_api.hpp:2956
int VISTAOMEmain(int argc, const char *argv[], const char *envp[])
Definition: OMEuserMain.cpp:16
OMEtype operator*(const OMEtype &lArg, const OMEtype &rArg)
Definition: OMEtype_operators.cpp:39283
const class OMEtype & READ_ONLY_OMEtype
A convenience typedef for performing read-only access to sparse and associative arrays....
Definition: OMEtype.h:58
OMEtype & operator|=(OMEtype &lArg, const OMEtype &rArg)
Definition: OMEtype_operators.cpp:28157
@ OME_ARRAY
Definition: OMEmanifests.h:86
OMEarrayStorage::ARRAY_SUBSCRIPT_t ARRAY_SUBSCRIPT_t
Definition: OMEarray.h:90
OMEtype operator%(const OMEtype &lArg, const OMEtype &rArg)
Definition: OMEtype_operators.cpp:42105
OMEtype operator^(const OMEtype &lArg, const OMEtype &rArg)
Definition: OMEtype_operators.cpp:44601
int OMEabortOnError
If non-zero, abort() called on type error.
Definition: OMEtype_operators.cpp:9
#define CONSTEXPR
Generates constexpr if the compiler supports it.
Definition: compiler_hints.h:432
char NONNULL_RETURN
Definition: compiler_hints.h:745
POSIXtimeInUnits< 1000000000U > POSIXtimeInNanoseconds
Convenience typedef for nanosecond-resolution time.
Definition: time_point.hpp:580
@ OME_ASSOC
Definition: OMEmanifests.h:87
bool operator==(const OMEtype &lArg, const OMEtype &rArg)
Definition: OMEtype_operators.cpp:33
@ OME_INT32
Definition: OMEmanifests.h:79
@ OME_INT64
Definition: OMEmanifests.h:81