// Commit 688b6eac authored by SWHL: Update files
#include "io.hh"
#include "../file.hh"
#include "chain.hh"
#include <cstddef>
namespace util {
namespace stream {
// Out-of-line definitions so the exception's vtable and typeinfo are emitted
// in this translation unit.
ReadSizeException::ReadSizeException() throw() {}
ReadSizeException::~ReadSizeException() throw() {}
void Read::Run(const ChainPosition &position) {
const std::size_t block_size = position.GetChain().BlockSize();
const std::size_t entry_size = position.GetChain().EntrySize();
for (Link link(position); link; ++link) {
std::size_t got = util::ReadOrEOF(file_, link->Get(), block_size);
UTIL_THROW_IF(got % entry_size, ReadSizeException, "File ended with " << got << " bytes, not a multiple of " << entry_size << ".");
if (got == 0) {
link.Poison();
return;
} else {
link->SetValidSize(got);
}
}
}
// Read the whole file into the chain using pread, so several PRead workers can
// share one descriptor.  Optionally closes the fd on completion (own_).
void PRead::Run(const ChainPosition &position) {
  // RAII: if we own the fd, close it when this worker finishes, even on throw.
  scoped_fd owner;
  if (own_) owner.reset(file_);
  const uint64_t size = SizeOrThrow(file_);
  // NOTE(review): the message prints the fd number (file_), not a file name.
  UTIL_THROW_IF(size % static_cast<uint64_t>(position.GetChain().EntrySize()), ReadSizeException, "File size " << file_ << " size is " << size << " not a multiple of " << position.GetChain().EntrySize());
  const std::size_t block_size = position.GetChain().BlockSize();
  const uint64_t block_size64 = static_cast<uint64_t>(block_size);
  Link link(position);
  uint64_t offset = 0;
  // Full blocks first.  The strict < leaves 1..block_size bytes for the tail
  // below, even when size is an exact multiple of block_size.
  for (; offset + block_size64 < size; offset += block_size64, ++link) {
    ErsatzPRead(file_, link->Get(), block_size, offset);
    link->SetValidSize(block_size);
  }
  // size - offset is <= block_size, so it casts to 32-bit fine.
  if (size - offset) {
    ErsatzPRead(file_, link->Get(), size - offset, offset);
    link->SetValidSize(size - offset);
    ++link;
  }
  link.Poison();
}
void Write::Run(const ChainPosition &position) {
for (Link link(position); link; ++link) {
WriteOrThrow(file_, link->Get(), link->ValidSize());
}
}
void WriteAndRecycle::Run(const ChainPosition &position) {
const std::size_t block_size = position.GetChain().BlockSize();
for (Link link(position); link; ++link) {
WriteOrThrow(file_, link->Get(), link->ValidSize());
link->SetValidSize(block_size);
}
}
void PWrite::Run(const ChainPosition &position) {
uint64_t offset = 0;
for (Link link(position); link; ++link) {
ErsatzPWrite(file_, link->Get(), link->ValidSize(), offset);
offset += link->ValidSize();
}
// Trim file to size.
util::ResizeOrThrow(file_, offset);
}
} // namespace stream
} // namespace util
#ifndef UTIL_STREAM_IO_H
#define UTIL_STREAM_IO_H
#include "../exception.hh"
#include "../file.hh"
namespace util {
namespace stream {
class ChainPosition;
// Thrown when a file's byte count is not a multiple of the chain's entry size.
class ReadSizeException : public util::Exception {
  public:
    ReadSizeException() throw();
    ~ReadSizeException() throw();
};
// Worker that sequentially reads a file descriptor into a chain.
// Does not take ownership of the fd.
class Read {
  public:
    explicit Read(int fd) : file_(fd) {}

    void Run(const ChainPosition &position);

  private:
    int file_;
};
// Like read but uses pread so that the file can be accessed from multiple threads.
// Like read but uses pread so that the file can be accessed from multiple threads.
class PRead {
  public:
    // take_own: close the fd when Run completes.
    explicit PRead(int fd, bool take_own = false) : file_(fd), own_(take_own) {}

    void Run(const ChainPosition &position);

  private:
    int file_;
    bool own_;
};
// Worker that writes each block of a chain to a file descriptor.
// Does not take ownership of the fd.
class Write {
  public:
    explicit Write(int fd) : file_(fd) {}

    void Run(const ChainPosition &position);

  private:
    int file_;
};
// It's a common case that stuff is written and then recycled. So rather than
// spawn another thread to Recycle, this combines the two roles.
// It's a common case that stuff is written and then recycled.  So rather than
// spawn another thread to Recycle, this combines the two roles.
class WriteAndRecycle {
  public:
    explicit WriteAndRecycle(int fd) : file_(fd) {}

    void Run(const ChainPosition &position);

  private:
    int file_;
};
// Worker that writes blocks with pwrite at explicit offsets, then truncates
// the file to the number of bytes written.
class PWrite {
  public:
    explicit PWrite(int fd) : file_(fd) {}

    void Run(const ChainPosition &position);

  private:
    int file_;
};
// Reuse the same file over and over again to buffer output.
// Reuse the same file over and over again to buffer output.
// Takes ownership of the fd.
class FileBuffer {
  public:
    explicit FileBuffer(int fd) : file_(fd) {}

    // Rewind and return a writer that overwrites the buffer from the start.
    PWrite Sink() const {
      util::SeekOrThrow(file_.get(), 0);
      return PWrite(file_.get());
    }

    // Reader over the buffered data.  With discard == true, fd ownership is
    // transferred to the returned PRead and this FileBuffer releases it.
    PRead Source(bool discard = false) {
      return PRead(discard ? file_.release() : file_.get(), discard);
    }

    uint64_t Size() const {
      return SizeOrThrow(file_.get());
    }

  private:
    scoped_fd file_;
};
} // namespace stream
} // namespace util
#endif // UTIL_STREAM_IO_H
#include "io.hh"
#include "chain.hh"
#include "../file.hh"
#define BOOST_TEST_MODULE IOTest
#include <boost/test/unit_test.hpp>
#include <unistd.h>
namespace util { namespace stream { namespace {
// Round-trip test: copy 100000 uint64_t records through a chain using a
// threaded PRead producer and Write consumer, then verify the copy.
BOOST_AUTO_TEST_CASE(CopyFile) {
  std::string temps("io_test_temp");

  // Source file: consecutive uint64_t values.
  scoped_fd in(MakeTemp(temps));
  for (uint64_t i = 0; i < 100000; ++i) {
    WriteOrThrow(in.get(), &i, sizeof(uint64_t));
  }
  SeekOrThrow(in.get(), 0);

  scoped_fd out(MakeTemp(temps));

  // Small blocks (1024 / 10) force many refills through the chain.
  ChainConfig config;
  config.entry_size = 8;
  config.total_memory = 1024;
  config.block_count = 10;

  // The temporary Chain's destructor waits for both workers to finish.
  Chain(config) >> PRead(in.get()) >> Write(out.get());

  SeekOrThrow(out.get(), 0);
  for (uint64_t i = 0; i < 100000; ++i) {
    uint64_t got;
    ReadOrThrow(out.get(), &got, sizeof(uint64_t));
    BOOST_CHECK_EQUAL(i, got);
  }
}
}}} // namespaces
#include "line_input.hh"
#include "../exception.hh"
#include "../file.hh"
#include "../read_compressed.hh"
#include "chain.hh"
#include <algorithm>
#include <vector>
namespace util { namespace stream {
// Fill chain blocks with whole lines from (possibly compressed) fd_.  A line
// that straddles a block boundary is carried over into the next block, so no
// block ever ends mid-line.  Throws if a single line exceeds the block size.
void LineInput::Run(const ChainPosition &position) {
  ReadCompressed reader(fd_);
  // Holding area for beginning of line to be placed in next block.
  std::vector<char> carry;

  for (Link block(position); ; ++block) {
    char *to = static_cast<char*>(block->Get());
    char *begin = to;
    char *end = to + position.GetChain().BlockSize();
    // Prepend the partial line carried over from the previous block.
    std::copy(carry.begin(), carry.end(), to);
    to += carry.size();
    // Fill the remainder of the block from the reader.
    while (to != end) {
      std::size_t got = reader.Read(to, end - to);
      if (!got) {
        // EOF
        block->SetValidSize(to - begin);
        ++block;
        block.Poison();
        return;
      }
      to += got;
    }
    // Find the last newline.
    char *newline;
    for (newline = to - 1; ; --newline) {
      UTIL_THROW_IF(newline < begin, Exception, "Did not find a newline in " << position.GetChain().BlockSize() << " bytes of input of " << NameFromFD(fd_) << ". Is this a text file?");
      if (*newline == '\n') break;
    }
    // Copy everything after the last newline to the carry.
    carry.clear();
    carry.resize(to - (newline + 1));
    std::copy(newline + 1, to, &*carry.begin());

    block->SetValidSize(newline + 1 - begin);
  }
}
}} // namespaces
#ifndef UTIL_STREAM_LINE_INPUT_H
#define UTIL_STREAM_LINE_INPUT_H
namespace util {namespace stream {
class ChainPosition;
/* Worker that reads input into blocks, ensuring that blocks contain whole
* lines. Assumes that the maximum size of a line is less than the block size
*/
// Worker that produces blocks of whole lines; compressed input is handled
// transparently via ReadCompressed (see line_input.cc).
class LineInput {
  public:
    // Takes ownership upon thread execution.
    explicit LineInput(int fd);

    void Run(const ChainPosition &position);

  private:
    int fd_;
};
}} // namespaces
#endif // UTIL_STREAM_LINE_INPUT_H
#include "multi_progress.hh"
// TODO: merge some functionality with the simple progress bar?
#include "../ersatz_progress.hh"
#include <iostream>
#include <limits>
#include <cstring>
#if !defined(_WIN32) && !defined(_WIN64)
#include <unistd.h>
#endif
namespace util { namespace stream {
namespace {
const char kDisplayCharacters[] = "-+*#0123456789";
// Entry count at which display column stone+1 is earned: the ceiling of
// (stone + 1) * complete / kWidth.
uint64_t Next(unsigned char stone, uint64_t complete) {
  const uint64_t scaled = static_cast<uint64_t>(stone + 1) * complete;
  return (scaled + MultiProgress::kWidth - 1) / MultiProgress::kWidth;
}
} // namespace
// Inactive by default; complete_ == max() marks "no target set yet".
MultiProgress::MultiProgress() : active_(false), complete_(std::numeric_limits<uint64_t>::max()), character_handout_(0) {}
MultiProgress::~MultiProgress() {
  // Close the bar's line only if a target was set and Finished() never ran.
  if (active_ && complete_ != std::numeric_limits<uint64_t>::max())
    std::cerr << '\n';
}
// Enable display; on POSIX only when stderr is a terminal, on Windows always.
void MultiProgress::Activate() {
  active_ =
#if !defined(_WIN32) && !defined(_WIN64)
    // Is stderr a terminal?
    (isatty(2) == 1)
#else
    true
#endif
    ;
}
// Set the total entry count that corresponds to a full bar and reset state.
void MultiProgress::SetTarget(uint64_t complete) {
  if (!active_) return;
  complete_ = complete;
  // Guard against division by zero in the milestone arithmetic.
  if (!complete) complete_ = 1;
  memset(display_, 0, sizeof(display_));
  character_handout_ = 0;
  std::cerr << kProgressBanner;
}
// Hand out a WorkerProgress with the next display character.  Thread-safe.
WorkerProgress MultiProgress::Add() {
  if (!active_)
    // Inactive: next_ == max() means the worker never reports a milestone.
    return WorkerProgress(std::numeric_limits<uint64_t>::max(), *this, '\0');
  std::size_t character_index;
  {
    boost::unique_lock<boost::mutex> lock(mutex_);
    character_index = character_handout_++;
    // Wrap around the character set (excluding its trailing '\0').
    if (character_handout_ == sizeof(kDisplayCharacters) - 1)
      character_handout_ = 0;
  }
  return WorkerProgress(Next(0, complete_), *this, kDisplayCharacters[character_index]);
}
// Terminate the bar's line and mark "no target" so the destructor does not
// print a second newline.
void MultiProgress::Finished() {
  if (!active_ || complete_ == std::numeric_limits<uint64_t>::max()) return;
  std::cerr << '\n';
  complete_ = std::numeric_limits<uint64_t>::max();
}
// Called by a WorkerProgress when it crosses its next_ threshold: paint the
// worker's character up to its new column and redraw the bar.
void MultiProgress::Milestone(WorkerProgress &worker) {
  if (!active_ || complete_ == std::numeric_limits<uint64_t>::max()) return;
  unsigned char stone = std::min(static_cast<uint64_t>(kWidth), worker.current_ * kWidth / complete_);
  // NOTE(review): display_ is mutated outside mutex_; concurrent milestones
  // from different workers can interleave writes.  Appears tolerated for a
  // best-effort progress bar — confirm if exact output matters.
  for (char *i = &display_[worker.stone_]; i < &display_[stone]; ++i) {
    *i = worker.character_;
  }
  worker.next_ = Next(stone, complete_);
  worker.stone_ = stone;
  {
    // Only the stderr redraw is serialized.
    boost::unique_lock<boost::mutex> lock(mutex_);
    std::cerr << '\r' << display_ << std::flush;
  }
}
}} // namespaces
/* Progress bar suitable for chains of workers */
#ifndef UTIL_STREAM_MULTI_PROGRESS_H
#define UTIL_STREAM_MULTI_PROGRESS_H
#include <boost/thread/mutex.hpp>
#include <cstddef>
#include <stdint.h>
namespace util { namespace stream {
class WorkerProgress;
// Progress bar shared by several workers, each painting its own character.
class MultiProgress {
  public:
    // Displayed width of the bar in characters.
    static const unsigned char kWidth = 100;

    MultiProgress();

    ~MultiProgress();

    // Turns on showing (requires SetTarget too).
    void Activate();

    void SetTarget(uint64_t complete);

    WorkerProgress Add();

    void Finished();

  private:
    friend class WorkerProgress;
    void Milestone(WorkerProgress &worker);

    bool active_;

    uint64_t complete_;

    // Serializes character handout (Add) and stderr redraws (Milestone).
    boost::mutex mutex_;

    // \0 at the end.
    char display_[kWidth + 1];

    std::size_t character_handout_;

    // Non-copyable.
    MultiProgress(const MultiProgress &);
    MultiProgress &operator=(const MultiProgress &);
};
// Per-worker handle into a MultiProgress bar.  Obtain via MultiProgress::Add().
class WorkerProgress {
  public:
    // Default constructor: must be assigned from MultiProgress::Add() via
    // operator= before use.  All members are value-initialized (the original
    // left current_/next_/stone_/character_ uninitialized) so that misuse
    // fails deterministically instead of reading indeterminate memory.
    WorkerProgress() : current_(0), next_(0), parent_(NULL), stone_(0), character_('\0') {}

    // Not threadsafe for the same worker by default.
    WorkerProgress &operator++() {
      if (++current_ >= next_) {
        parent_->Milestone(*this);
      }
      return *this;
    }

    WorkerProgress &operator+=(uint64_t amount) {
      current_ += amount;
      if (current_ >= next_) {
        parent_->Milestone(*this);
      }
      return *this;
    }

  private:
    friend class MultiProgress;
    WorkerProgress(uint64_t next, MultiProgress &parent, char character)
      : current_(0), next_(next), parent_(&parent), stone_(0), character_(character) {}

    // Entries processed so far, and the count at which to report a milestone.
    uint64_t current_, next_;

    MultiProgress *parent_;

    // Previous milestone reached.
    unsigned char stone_;

    // Character to display in bar.
    char character_;
};
}} // namespaces
#endif // UTIL_STREAM_MULTI_PROGRESS_H
#ifndef UTIL_STREAM_MULTI_STREAM_H
#define UTIL_STREAM_MULTI_STREAM_H
#include "../fixed_array.hh"
#include "../scoped.hh"
#include "chain.hh"
#include "stream.hh"
#include <cstddef>
#include <new>
#include <cassert>
#include <cstdlib>
namespace util { namespace stream {
class Chains;
// Fixed array holding one ChainPosition per chain; filled by Init(Chains &).
class ChainPositions : public util::FixedArray<util::stream::ChainPosition> {
  public:
    ChainPositions() {}

    explicit ChainPositions(std::size_t bound) :
      util::FixedArray<util::stream::ChainPosition>(bound) {}

    void Init(Chains &chains);

    explicit ChainPositions(Chains &chains) {
      Init(chains);
    }
};
// A collection of Chains with operator>> overloads that launch one thread
// running a worker across all chain positions at once.
class Chains : public util::FixedArray<util::stream::Chain> {
  private:
    // SFINAE gate: only enabled for workers declaring
    // void T::Run(const ChainPositions &).
    template <class T, void (T::*ptr)(const ChainPositions &) = &T::Run> struct CheckForRun {
      typedef Chains type;
    };

  public:
    // Must call Init.
    Chains() {}

    explicit Chains(std::size_t limit) : util::FixedArray<util::stream::Chain>(limit) {}

    // Launch a thread that runs worker over positions from all chains.
    template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) {
      threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker));
      return *this;
    }

    template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) {
      threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker));
      return *this;
    }

    // Recyclers attach to each chain individually, not across chains.
    Chains &operator>>(const util::stream::Recycler &recycler) {
      for (util::stream::Chain *i = begin(); i != end(); ++i)
        *i >> recycler;
      return *this;
    }

    // Destroy the worker threads (expected to join them — see
    // util::stream::Thread), then wait on each chain.
    void Wait(bool release_memory = true) {
      threads_.clear();
      for (util::stream::Chain *i = begin(); i != end(); ++i) {
        i->Wait(release_memory);
      }
    }

  private:
    boost::ptr_vector<util::stream::Thread> threads_;

    // Non-copyable.
    Chains(const Chains &);
    void operator=(const Chains &);
};
// Populate this array with one position added from each chain.
inline void ChainPositions::Init(Chains &chains) {
  util::FixedArray<util::stream::ChainPosition>::Init(chains.size());
  for (util::stream::Chain *i = chains.begin(); i != chains.end(); ++i) {
    // use "placement new" syntax to initalize ChainPosition in an already-allocated memory location
    new (end()) util::stream::ChainPosition(i->Add()); Constructed();
  }
}
// Convenience: chains >> positions initializes positions from every chain.
inline Chains &operator>>(Chains &chains, ChainPositions &positions) {
  positions.Init(chains);
  return chains;
}
// Fixed array of stream readers (one per chain position).
template <class T> class GenericStreams : public util::FixedArray<T> {
  private:
    typedef util::FixedArray<T> P;

  public:
    GenericStreams() {}

    // Limit restricts to positions[0,limit)
    void Init(const ChainPositions &positions, std::size_t limit) {
      P::Init(limit);
      for (const util::stream::ChainPosition *i = positions.begin(); i != positions.begin() + limit; ++i) {
        P::push_back(*i);
      }
    }

    void Init(const ChainPositions &positions) {
      Init(positions, positions.size());
    }

    GenericStreams(const ChainPositions &positions) {
      Init(positions);
    }

    // Allocate capacity only; elements pushed later.
    void Init(std::size_t amount) {
      P::Init(amount);
    }
};
// Convenience: attach a stream reader to every chain in one shot.
template <class T> inline Chains &operator>>(Chains &chains, GenericStreams<T> &streams) {
  ChainPositions positions;
  chains >> positions;
  streams.Init(positions);
  return chains;
}
typedef GenericStreams<Stream> Streams;
}} // namespaces
#endif // UTIL_STREAM_MULTI_STREAM_H
#include "rewindable_stream.hh"
#include "../pcqueue.hh"
#include <iostream>
namespace util {
namespace stream {
// Uninitialized stream: poisoned_ is true so Get()/operator++ assert, and
// in_ == NULL lets Init() detect double initialization.  All pointer members
// and hit_poison_ are explicitly initialized (the original left marked_,
// block_end_ and hit_poison_ indeterminate until Init()).
RewindableStream::RewindableStream()
    : marked_(NULL), current_(NULL), block_end_(NULL), in_(NULL), out_(NULL),
      hit_poison_(false), poisoned_(true) {
  // nothing
}
// Attach to a chain position and fetch the first block so Get()/operator++
// are immediately usable.  May only be called once.
void RewindableStream::Init(const ChainPosition &position) {
  UTIL_THROW_IF2(in_, "RewindableStream::Init twice");
  in_ = position.in_;
  out_ = position.out_;
  hit_poison_ = false;
  poisoned_ = false;
  progress_ = position.progress_;
  entry_size_ = position.GetChain().EntrySize();
  block_size_ = position.GetChain().BlockSize();
  block_count_ = position.GetChain().BlockCount();
  blocks_it_ = 0;
  marked_ = NULL;
  // Rewinding buffers past blocks, so a single block cannot work (see the
  // header: rewind distance is limited to 2 * block_size_ - 1).
  UTIL_THROW_IF2(block_count_ < 2, "RewindableStream needs block_count at least two");
  AppendBlock();
}
// Advance one entry.  At a block boundary, move to the next buffered block or
// pull a fresh one from upstream; without an active mark, already-consumed
// blocks are flushed downstream first.
RewindableStream &RewindableStream::operator++() {
  assert(*this);
  assert(current_ < block_end_);
  assert(current_);
  assert(blocks_it_ < blocks_.size());
  current_ += entry_size_;
  if (UTIL_UNLIKELY(current_ == block_end_)) {
    // Fetch another block if necessary.
    if (++blocks_it_ == blocks_.size()) {
      if (!marked_) {
        // No mark to rewind to: the buffered blocks can be sent downstream.
        Flush(blocks_.begin() + blocks_it_);
        blocks_it_ = 0;
      }
      AppendBlock();
      assert(poisoned_ || (blocks_it_ == blocks_.size() - 1));
      // Upstream ended: leave the stream poisoned rather than dereference.
      if (poisoned_) return *this;
    }
    Block &cur_block = blocks_[blocks_it_];
    current_ = static_cast<uint8_t*>(cur_block.Get());
    block_end_ = current_ + cur_block.ValidSize();
  }
  assert(current_);
  assert(current_ >= static_cast<uint8_t*>(blocks_[blocks_it_].Get()));
  assert(current_ < block_end_);
  assert(block_end_ == blocks_[blocks_it_].ValidEnd());
  return *this;
}
// Remember the current position as the rewind point.  Blocks before the
// current one can no longer be rewound to, so send them downstream now.
void RewindableStream::Mark() {
  marked_ = current_;
  Flush(blocks_.begin() + blocks_it_);
  blocks_it_ = 0;
}
// Jump back to the marked position.  After Mark() flushed earlier blocks, the
// mark lives in blocks_[0].
void RewindableStream::Rewind() {
  if (current_ != marked_) {
    // Moving back to an earlier record makes the stream readable again even
    // if the end (poison) had been reached.
    poisoned_ = false;
  }
  blocks_it_ = 0;
  current_ = marked_;
  block_end_ = static_cast<const uint8_t*>(blocks_[blocks_it_].ValidEnd());

  assert(current_);
  assert(current_ >= static_cast<uint8_t*>(blocks_[blocks_it_].Get()));
  assert(current_ < block_end_);
  assert(block_end_ == blocks_[blocks_it_].ValidEnd());
}
// Flush all buffered output and send a poison block downstream.  Called by
// the destructor too; a no-op when nothing is buffered.
void RewindableStream::Poison() {
  if (blocks_.empty()) return;
  assert(*this);
  assert(blocks_it_ == blocks_.size() - 1);

  // Produce all buffered blocks.
  blocks_.back().SetValidSize(current_ - static_cast<uint8_t*>(blocks_.back().Get()));
  Flush(blocks_.end());
  blocks_it_ = 0;

  Block poison;
  if (!hit_poison_) {
    // Consume upstream's poison so the producer is not left blocked on it.
    in_->Consume(poison);
  }
  poison.SetToPoison();
  out_->Produce(poison);
  hit_poison_ = true;
  poisoned_ = true;
}
// Pull the next non-empty block from upstream into the rewind buffer and
// point current_/block_end_ at it.  Sets poisoned_ on end of input.
void RewindableStream::AppendBlock() {
  // Hard failure rather than deadlock: the chain has a fixed block budget and
  // buffering more would starve the rest of the pipeline.
  if (UTIL_UNLIKELY(blocks_.size() >= block_count_)) {
    std::cerr << "RewindableStream trying to use more blocks than available" << std::endl;
    abort();
  }
  if (UTIL_UNLIKELY(hit_poison_)) {
    poisoned_ = true;
    return;
  }
  Block get;
  // The loop is needed since it is *feasible* that we're given 0 sized but
  // valid blocks
  do {
    in_->Consume(get);
    if (UTIL_LIKELY(get)) {
      // Zero-sized blocks are buffered too, so they are eventually returned
      // downstream by Flush.
      blocks_.push_back(get);
    } else {
      hit_poison_ = true;
      poisoned_ = true;
      return;
    }
  } while (UTIL_UNLIKELY(get.ValidSize() == 0));
  current_ = static_cast<uint8_t*>(blocks_.back().Get());
  block_end_ = static_cast<const uint8_t*>(blocks_.back().ValidEnd());
  blocks_it_ = blocks_.size() - 1;
}
void RewindableStream::Flush(std::deque<Block>::iterator to) {
for (std::deque<Block>::iterator i = blocks_.begin(); i != to; ++i) {
out_->Produce(*i);
progress_ += i->ValidSize();
}
blocks_.erase(blocks_.begin(), to);
}
}
}
#ifndef UTIL_STREAM_REWINDABLE_STREAM_H
#define UTIL_STREAM_REWINDABLE_STREAM_H
#include "chain.hh"
#include <boost/noncopyable.hpp>
#include <deque>
namespace util {
namespace stream {
/**
* A RewindableStream is like a Stream (but one that is only used for
* creating input at the start of a chain) except that it can be rewound to
* be able to re-write a part of the stream before it is sent. Rewinding
* has a limit of 2 * block_size_ - 1 in distance (it does *not* buffer an
* entire stream into memory, only a maximum of 2 * block_size_).
*/
class RewindableStream : boost::noncopyable {
  public:
    /**
     * Creates an uninitialized RewindableStream.  You **must** call Init()
     * on it later!
     */
    RewindableStream();

    // Flushes buffered output and poisons the chain if still attached.
    ~RewindableStream() {
      Poison();
    }

    /**
     * Initializes an existing RewindableStream at a specific position in
     * a Chain.
     *
     * @param position The position in the chain to get input from and
     *  produce output on
     */
    void Init(const ChainPosition &position);

    /**
     * Constructs a RewindableStream at a specific position in a Chain all
     * in one step.
     *
     * Equivalent to RewindableStream a(); a.Init(....);
     */
    explicit RewindableStream(const ChainPosition &position)
        : in_(NULL) {
      Init(position);
    }

    /**
     * Gets the record at the current stream position.  Const version.
     */
    const void *Get() const {
      assert(!poisoned_);
      assert(current_);
      return current_;
    }

    /**
     * Gets the record at the current stream position.
     */
    void *Get() {
      assert(!poisoned_);
      assert(current_);
      return current_;
    }

    // True while there are records left to read.
    operator bool() const { return !poisoned_; }

    bool operator!() const { return poisoned_; }

    /**
     * Marks the current position in the stream to be rewound to later.
     * Note that you can only rewind back as far as 2 * block_size_ - 1!
     */
    void Mark();

    /**
     * Rewinds the stream back to the marked position.  This will throw an
     * exception if the marked position is too far away.
     */
    void Rewind();

    /**
     * Moves the stream forward to the next record.  This internally may
     * buffer a block for the purposes of rewinding.
     */
    RewindableStream& operator++();

    /**
     * Poisons the stream.  This sends any buffered blocks down the chain
     * and sends a poison block as well (sending at most 2 non-poison and 1
     * poison block).
     */
    void Poison();

  private:
    void AppendBlock();

    void Flush(std::deque<Block>::iterator to);

    // Blocks buffered for rewinding, oldest first.
    std::deque<Block> blocks_;
    // current_ is in blocks_[blocks_it_] unless poisoned_.
    std::size_t blocks_it_;

    std::size_t entry_size_;
    std::size_t block_size_;
    std::size_t block_count_;

    // Rewind target and read cursor.
    uint8_t *marked_, *current_;
    const uint8_t *block_end_;

    // Upstream and downstream block queues of the chain.
    PCQueue<Block> *in_, *out_;

    // Have we hit poison at the end of the stream, even if rewinding?
    bool hit_poison_;
    // Is the current position poison?
    bool poisoned_;

    WorkerProgress progress_;
};
// Convenience: chain >> stream initializes the stream at a new chain position.
inline Chain &operator>>(Chain &chain, RewindableStream &stream) {
  stream.Init(chain.Add());
  return chain;
}
}
}
#endif
#include "io.hh"
#include "rewindable_stream.hh"
#include "../file.hh"
#define BOOST_TEST_MODULE RewindableStreamTest
#include <boost/test/unit_test.hpp>
namespace util {
namespace stream {
namespace {
// Stream 100000 uint64_t records through a RewindableStream, mark near the
// end, and verify Rewind() returns to the marked record.
BOOST_AUTO_TEST_CASE(RewindableStreamTest) {
  scoped_fd in(MakeTemp("io_test_temp"));
  for (uint64_t i = 0; i < 100000; ++i) {
    WriteOrThrow(in.get(), &i, sizeof(uint64_t));
  }
  SeekOrThrow(in.get(), 0);

  // Tiny blocks (100 bytes / 6 blocks) exercise block-boundary handling.
  ChainConfig config;
  config.entry_size = 8;
  config.total_memory = 100;
  config.block_count = 6;

  Chain chain(config);
  RewindableStream s;
  chain >> Read(in.get()) >> s >> kRecycle;
  uint64_t i = 0;
  for (; s; ++s, ++i) {
    BOOST_CHECK_EQUAL(i, *static_cast<const uint64_t*>(s.Get()));
    // Mark the second-to-last record so we can rewind to it afterwards.
    if (100000UL - i == 2)
      s.Mark();
  }
  BOOST_CHECK_EQUAL(100000ULL, i);
  s.Rewind();
  BOOST_CHECK_EQUAL(100000ULL - 2, *static_cast<const uint64_t*>(s.Get()));
}
}
}
}
/* Usage:
* Sort<Compare> sorter(temp, compare);
* Chain(config) >> Read(file) >> sorter.Unsorted();
* Stream stream;
* Chain chain(config) >> sorter.Sorted(internal_config, lazy_config) >> stream;
*
* Note that sorter must outlive any threads that use Unsorted or Sorted.
*
* Combiners take the form:
* bool operator()(void *into, const void *option, const Compare &compare) const
* which returns true iff a combination happened. The sorting algorithm
* guarantees compare(into, option). But it does not guarantee
* compare(option, into).
* Currently, combining is only done in merge steps, not during on-the-fly
* sort. Use a hash table for that.
*/
#ifndef UTIL_STREAM_SORT_H
#define UTIL_STREAM_SORT_H
#include "chain.hh"
#include "config.hh"
#include "io.hh"
#include "stream.hh"
#include "../file.hh"
#include "../fixed_array.hh"
#include "../scoped.hh"
#include "../sized_iterator.hh"
#include <algorithm>
#include <iostream>
#include <queue>
#include <string>
namespace util {
namespace stream {
// Default Combine policy: never merges two entries, so every record is kept.
struct NeverCombine {
  template <class Compare> bool operator()(const void *, const void *, const Compare &) const {
    return false;
  }
};
// Manage the offsets of sorted blocks in a file.
// Manage the offsets of sorted blocks in a file.
// Block lengths are run-length encoded as (length, run) Entry records in the
// log_ file; the first Append flushes the initial (0,0) dummy record, which
// FinishedAppending later skips.
class Offsets {
  public:
    explicit Offsets(int fd) : log_(fd) {
      Reset();
    }

    int File() const { return log_; }

    // Record one block of the given byte length; consecutive equal lengths
    // are coalesced into a single run.
    void Append(uint64_t length) {
      if (!length) return;
      ++block_count_;
      if (length == cur_.length) {
        ++cur_.run;
        return;
      }
      // Length changed: persist the previous run, start a new one.
      WriteOrThrow(log_, &cur_, sizeof(Entry));
      cur_.length = length;
      cur_.run = 1;
    }

    // Flush the final run and switch from writing to reading.
    void FinishedAppending() {
      WriteOrThrow(log_, &cur_, sizeof(Entry));
      SeekOrThrow(log_, sizeof(Entry)); // Skip 0,0 at beginning.
      cur_.run = 0;
      if (block_count_) {
        ReadOrThrow(log_, &cur_, sizeof(Entry));
        assert(cur_.length);
        assert(cur_.run);
      }
    }

    uint64_t RemainingBlocks() const { return block_count_; }

    // Total bytes already handed out by NextSize().
    uint64_t TotalOffset() const { return output_sum_; }

    uint64_t PeekSize() const {
      return cur_.length;
    }

    // Pop the next block length, loading the next run from disk when the
    // current one is exhausted.
    uint64_t NextSize() {
      assert(block_count_);
      uint64_t ret = cur_.length;
      output_sum_ += ret;

      --cur_.run;
      --block_count_;
      if (!cur_.run && block_count_) {
        ReadOrThrow(log_, &cur_, sizeof(Entry));
        assert(cur_.length);
        assert(cur_.run);
      }
      return ret;
    }

    // Truncate the log file and return to the appending state.
    void Reset() {
      SeekOrThrow(log_, 0);
      ResizeOrThrow(log_, 0);
      cur_.length = 0;
      cur_.run = 0;
      block_count_ = 0;
      output_sum_ = 0;
    }

  private:
    int log_;

    // One run-length-encoded record: `run` consecutive blocks of `length` bytes.
    struct Entry {
      uint64_t length;
      uint64_t run;
    };
    Entry cur_;

    uint64_t block_count_;
    uint64_t output_sum_;
};
// A priority queue of entries backed by file buffers
template <class Compare> class MergeQueue {
public:
MergeQueue(int fd, std::size_t buffer_size, std::size_t entry_size, const Compare &compare)
: queue_(Greater(compare)), in_(fd), buffer_size_(buffer_size), entry_size_(entry_size) {}
void Push(void *base, uint64_t offset, uint64_t amount) {
queue_.push(Entry(base, in_, offset, amount, buffer_size_));
}
const void *Top() const {
return queue_.top().Current();
}
void Pop() {
Entry top(queue_.top());
queue_.pop();
if (top.Increment(in_, buffer_size_, entry_size_))
queue_.push(top);
}
std::size_t Size() const {
return queue_.size();
}
bool Empty() const {
return queue_.empty();
}
private:
// Priority queue contains these entries.
class Entry {
public:
Entry() {}
Entry(void *base, int fd, uint64_t offset, uint64_t amount, std::size_t buf_size) {
offset_ = offset;
remaining_ = amount;
buffer_end_ = static_cast<uint8_t*>(base) + buf_size;
Read(fd, buf_size);
}
bool Increment(int fd, std::size_t buf_size, std::size_t entry_size) {
current_ += entry_size;
if (current_ != buffer_end_) return true;
return Read(fd, buf_size);
}
const void *Current() const { return current_; }
private:
bool Read(int fd, std::size_t buf_size) {
current_ = buffer_end_ - buf_size;
std::size_t amount;
if (static_cast<uint64_t>(buf_size) < remaining_) {
amount = buf_size;
} else if (!remaining_) {
return false;
} else {
amount = remaining_;
buffer_end_ = current_ + remaining_;
}
ErsatzPRead(fd, current_, amount, offset_);
// Try to free the space, but don't be disappointed if we can't.
try {
HolePunch(fd, offset_, amount);
} catch (const util::Exception &) {}
offset_ += amount;
assert(current_ <= buffer_end_);
remaining_ -= amount;
return true;
}
// Buffer
uint8_t *current_, *buffer_end_;
// File
uint64_t remaining_, offset_;
};
// Wrapper comparison function for queue entries.
class Greater : public std::binary_function<const Entry &, const Entry &, bool> {
public:
explicit Greater(const Compare &compare) : compare_(compare) {}
bool operator()(const Entry &first, const Entry &second) const {
return compare_(second.Current(), first.Current());
}
private:
const Compare compare_;
};
typedef std::priority_queue<Entry, std::vector<Entry>, Greater> Queue;
Queue queue_;
const int in_;
const std::size_t buffer_size_;
const std::size_t entry_size_;
};
/* A worker object that merges.  If the number of pieces to merge exceeds the
 * arity, it outputs multiple sorted blocks, recording to out_offsets.
 * However, users will only ever see a single sorted block of output because
 * Sort::Sorted ensures the arity is higher than the number of pieces before
 * returning this.
 */
template <class Compare, class Combine> class MergingReader {
  public:
    MergingReader(int in, Offsets *in_offsets, Offsets *out_offsets, std::size_t buffer_size, std::size_t total_memory, const Compare &compare, const Combine &combine) :
        compare_(compare), combine_(combine),
        in_(in),
        in_offsets_(in_offsets), out_offsets_(out_offsets),
        buffer_size_(buffer_size), total_memory_(total_memory) {}

    void Run(const ChainPosition &position) {
      Run(position, false);
    }

    // assert_one: abort if more than one merge group would be needed (used by
    // the lazy path, which promises a single merge pass).
    void Run(const ChainPosition &position, bool assert_one) {
      // Special case: nothing to read.
      if (!in_offsets_->RemainingBlocks()) {
        Link l(position);
        l.Poison();
        return;
      }
      // If there's just one entry, just read.
      if (in_offsets_->RemainingBlocks() == 1) {
        // Sequencing is important.
        uint64_t offset = in_offsets_->TotalOffset();
        uint64_t amount = in_offsets_->NextSize();
        ReadSingle(offset, amount, position);
        if (out_offsets_) out_offsets_->Append(amount);
        return;
      }

      Stream str(position);
      scoped_malloc buffer(MallocOrThrow(total_memory_));
      uint8_t *const buffer_end = static_cast<uint8_t*>(buffer.get()) + total_memory_;

      const std::size_t entry_size = position.GetChain().EntrySize();

      while (in_offsets_->RemainingBlocks()) {
        // Use bigger buffers if there's less remaining.
        uint64_t per_buffer = static_cast<uint64_t>(std::max<std::size_t>(
            buffer_size_,
            static_cast<std::size_t>((static_cast<uint64_t>(total_memory_) / in_offsets_->RemainingBlocks()))));
        // Round down to a whole number of entries per buffer.
        per_buffer -= per_buffer % entry_size;
        assert(per_buffer);

        // Populate queue.
        MergeQueue<Compare> queue(in_, per_buffer, entry_size, compare_);
        // Pack as many stripes as fit in the buffer; small stripes take only
        // min(per_buffer, stripe size) bytes.
        for (uint8_t *buf = static_cast<uint8_t*>(buffer.get());
            in_offsets_->RemainingBlocks() && (buf + std::min(per_buffer, in_offsets_->PeekSize()) <= buffer_end);) {
          uint64_t offset = in_offsets_->TotalOffset();
          uint64_t size = in_offsets_->NextSize();
          queue.Push(buf, offset, size);
          buf += static_cast<std::size_t>(std::min<uint64_t>(size, per_buffer));
        }

        // This shouldn't happen but it's probably better to die than loop indefinitely.
        if (queue.Size() < 2 && in_offsets_->RemainingBlocks()) {
          std::cerr << "Bug in sort implementation: not merging at least two stripes." << std::endl;
          abort();
        }
        if (assert_one && in_offsets_->RemainingBlocks()) {
          std::cerr << "Bug in sort implementation: should only be one merge group for lazy sort" << std::endl;
          abort();
        }

        uint64_t written = 0;
        // Merge including combiner support.
        memcpy(str.Get(), queue.Top(), entry_size);
        for (queue.Pop(); !queue.Empty(); queue.Pop()) {
          // combine_ returning true means the queue's top was folded into the
          // current output record; otherwise emit it and start a new record.
          if (!combine_(str.Get(), queue.Top(), compare_)) {
            ++written; ++str;
            memcpy(str.Get(), queue.Top(), entry_size);
          }
        }
        ++written; ++str;
        if (out_offsets_)
          out_offsets_->Append(written * entry_size);
      }
      str.Poison();
    }

  private:
    // Stream the single sorted stripe [offset, offset + size) straight into
    // the chain without any merging.
    void ReadSingle(uint64_t offset, const uint64_t size, const ChainPosition &position) {
      // Special case: only one to read.
      const uint64_t end = offset + size;
      const uint64_t block_size = position.GetChain().BlockSize();
      Link l(position);
      for (; offset + block_size < end; ++l, offset += block_size) {
        ErsatzPRead(in_, l->Get(), block_size, offset);
        l->SetValidSize(block_size);
      }
      ErsatzPRead(in_, l->Get(), end - offset, offset);
      l->SetValidSize(end - offset);
      (++l).Poison();
      return;
    }

    Compare compare_;
    Combine combine_;

    int in_;

  protected:
    Offsets *in_offsets_;

  private:
    Offsets *out_offsets_;

    std::size_t buffer_size_;
    std::size_t total_memory_;
};
// The lazy step owns the remaining files. This keeps track of them.
// The lazy step owns the remaining files.  This keeps track of them.
template <class Compare, class Combine> class OwningMergingReader : public MergingReader<Compare, Combine> {
  private:
    typedef MergingReader<Compare, Combine> P;

  public:
    OwningMergingReader(int data, const Offsets &offsets, std::size_t buffer, std::size_t lazy, const Compare &compare, const Combine &combine)
      : P(data, NULL, NULL, buffer, lazy, compare, combine),
        data_(data),
        offsets_(offsets) {}

    void Run(const ChainPosition &position) {
      // Point the base class at our copy of the offsets; NULL was passed to
      // the base constructor because offsets_ was not yet constructed then.
      P::in_offsets_ = &offsets_;
      // Close both fds when the merge finishes, even on exception.
      scoped_fd data(data_);
      scoped_fd offsets_file(offsets_.File());
      P::Run(position, true);
    }

  private:
    int data_;
    Offsets offsets_;
};
// Don't use this directly. Worker that sorts blocks.
// Don't use this directly.  Worker that sorts blocks.
template <class Compare> class BlockSorter {
  public:
    BlockSorter(Offsets &offsets, const Compare &compare) :
      offsets_(&offsets), compare_(compare) {}

    void Run(const ChainPosition &position) {
      const std::size_t entry_size = position.GetChain().EntrySize();
      for (Link link(position); link; ++link) {
        // Record the size of each block in a separate file.
        offsets_->Append(link->ValidSize());
        void *end = static_cast<uint8_t*>(link->Get()) + link->ValidSize();
        // Sort the fixed-size records of this block in place.
        SizedSort(link->Get(), end, entry_size, compare_);
      }
      offsets_->FinishedAppending();
    }

  private:
    Offsets *offsets_;
    Compare compare_;
};
// Thrown by Sort for unusable SortConfig values (e.g. zero entry size or a
// buffer/memory budget that is too small).
class BadSortConfig : public Exception {
  public:
    BadSortConfig() throw() {}
    ~BadSortConfig() throw() {}
};
/** Sort
 *
 * Accepts records from a Chain, sorts each arriving block in memory, and
 * spills the sorted blocks to a temporary file.  Merge() then performs
 * on-disk merge passes until a lazy (streaming) merge fits in the given
 * memory budget; Output() attaches that lazy merge to a chain.  Compare
 * orders records; Combine optionally merges adjacent equal records
 * (NeverCombine keeps all of them).
 */
template <class Compare, class Combine = NeverCombine> class Sort {
  public:
    /** Constructs an object capable of sorting */
    Sort(Chain &in, const SortConfig &config, const Compare &compare = Compare(), const Combine &combine = Combine())
      : config_(config),
        data_(MakeTemp(config.temp_prefix)),
        offsets_file_(MakeTemp(config.temp_prefix)), offsets_(offsets_file_.get()),
        compare_(compare), combine_(combine),
        entry_size_(in.EntrySize()) {
      UTIL_THROW_IF(!entry_size_, BadSortConfig, "Sorting entries of size 0");
      // Make buffer_size a multiple of the entry_size.
      config_.buffer_size -= config_.buffer_size % entry_size_;
      UTIL_THROW_IF(!config_.buffer_size, BadSortConfig, "Sort buffer too small");
      UTIL_THROW_IF(config_.total_memory < config_.buffer_size * 4, BadSortConfig, "Sorting memory " << config_.total_memory << " is too small for four buffers (two read and two write).");
      // Sort each incoming block in memory, then append it (recording each
      // block's extent in offsets_) to the temporary data file.
      in >> BlockSorter<Compare>(offsets_, compare_) >> WriteAndRecycle(data_.get());
    }

    // Bytes currently held in the temporary data file.
    uint64_t Size() const {
      return SizeOrThrow(data_.get());
    }

    // Do merge sort, terminating when lazy merge could be done with the
    // specified memory. Return the minimum memory necessary to do lazy merge.
    std::size_t Merge(std::size_t lazy_memory) {
      if (offsets_.RemainingBlocks() <= 1) return 0;
      // Number of sorted blocks a lazy merge could handle with lazy_memory:
      // one read buffer per block.
      const uint64_t lazy_arity = std::max<uint64_t>(1, lazy_memory / config_.buffer_size);
      uint64_t size = Size();
      /* No overflow because
       * offsets_.RemainingBlocks() * config_.buffer_size <= lazy_memory ||
       * size < lazy_memory
       */
      if (offsets_.RemainingBlocks() <= lazy_arity || size <= static_cast<uint64_t>(lazy_memory))
        return std::min<std::size_t>(size, offsets_.RemainingBlocks() * config_.buffer_size);

      // Ping-pong between (data_, offsets_) and a second temporary pair.
      scoped_fd data2(MakeTemp(config_.temp_prefix));
      int fd_in = data_.get(), fd_out = data2.get();
      scoped_fd offsets2_file(MakeTemp(config_.temp_prefix));
      Offsets offsets2(offsets2_file.get());
      Offsets *offsets_in = &offsets_, *offsets_out = &offsets2;

      // Double buffered writing.
      ChainConfig chain_config;
      chain_config.entry_size = entry_size_;
      chain_config.block_count = 2;
      chain_config.total_memory = config_.buffer_size * 2;
      Chain chain(chain_config);

      while (offsets_in->RemainingBlocks() > lazy_arity) {
        if (size <= static_cast<uint64_t>(lazy_memory)) break;
        // Memory left for reading after reserving the two write buffers.
        std::size_t reading_memory = config_.total_memory - 2 * config_.buffer_size;
        if (size < static_cast<uint64_t>(reading_memory)) {
          reading_memory = static_cast<std::size_t>(size);
        }
        SeekOrThrow(fd_in, 0);
        chain >>
          MergingReader<Compare, Combine>(
              fd_in,
              offsets_in, offsets_out,
              config_.buffer_size,
              reading_memory,
              compare_, combine_) >>
          WriteAndRecycle(fd_out);
        chain.Wait();
        offsets_out->FinishedAppending();
        // Truncate the consumed input file to reclaim disk space, then swap
        // the roles of the two file/offset pairs for the next pass.
        ResizeOrThrow(fd_in, 0);
        offsets_in->Reset();
        std::swap(fd_in, fd_out);
        std::swap(offsets_in, offsets_out);
        size = SizeOrThrow(fd_in);
      }

      SeekOrThrow(fd_in, 0);
      // If the final merged data landed in the second file pair, adopt it as
      // the primary pair so later calls (and Output) see the right data.
      if (fd_in == data2.get()) {
        data_.reset(data2.release());
        offsets_file_.reset(offsets2_file.release());
        offsets_ = offsets2;
      }
      if (offsets_.RemainingBlocks() <= 1) return 0;
      // No overflow because the while loop exited.
      return std::min(size, offsets_.RemainingBlocks() * static_cast<uint64_t>(config_.buffer_size));
    }

    // Output to chain, using this amount of memory, maximum, for lazy merge
    // sort.
    void Output(Chain &out, std::size_t lazy_memory) {
      Merge(lazy_memory);
      out.SetProgressTarget(Size());
      out >> OwningMergingReader<Compare, Combine>(data_.get(), offsets_, config_.buffer_size, lazy_memory, compare_, combine_);
      // The OwningMergingReader now owns both descriptors; release them so
      // our destructor does not close them a second time.
      data_.release();
      offsets_file_.release();
    }

    /* If a pipeline step is reading sorted input and writing to a different
     * sort order, then there's a trade-off between using RAM to read lazily
     * (avoiding copying the file) and using RAM to increase block size and,
     * therefore, decrease the number of merge sort passes in the next
     * iteration.
     *
     * Merge sort takes log_{arity}(pieces) passes.  Thus, each time the chain
     * block size is multiplied by arity, the number of output passes decreases
     * by one.  Up to a constant, then, log_{arity}(chain) is the number of
     * passes saved.  Chain simply divides the memory evenly over all blocks.
     *
     * Lazy sort saves this many passes (up to a constant)
     *   log_{arity}((memory-lazy)/block_count) + 1
     * Non-lazy sort saves this many passes (up to the same constant):
     *   log_{arity}(memory/block_count)
     * Add log_{arity}(block_count) to both:
     *   log_{arity}(memory-lazy) + 1 versus log_{arity}(memory)
     * Take arity to the power of both sizes (arity > 1)
     *   (memory - lazy)*arity versus memory
     * Solve for lazy
     *   lazy = memory * (arity - 1) / arity
     */
    std::size_t DefaultLazy() {
      float arity = static_cast<float>(config_.total_memory / config_.buffer_size);
      return static_cast<std::size_t>(static_cast<float>(config_.total_memory) * (arity - 1.0) / arity);
    }

    // Same as Output with default lazy memory setting.
    void Output(Chain &out) {
      Output(out, DefaultLazy());
    }

    // Completely merge sort and transfer ownership to the caller.
    int StealCompleted() {
      // Merge all the way.
      Merge(0);
      SeekOrThrow(data_.get(), 0);
      offsets_file_.reset();
      return data_.release();
    }

  private:
    SortConfig config_;

    // Temporary file holding the (partially) sorted records.
    scoped_fd data_;

    // Temporary file recording where each sorted block begins in data_.
    scoped_fd offsets_file_;
    Offsets offsets_;

    const Compare compare_;
    const Combine combine_;
    const std::size_t entry_size_;
};
// Sorts the chain's contents to completion, blocking until the producer has
// finished, then reattaches the sorted stream to the same chain.
// Returns the number of bytes that will be read back on demand.
template <class Compare, class Combine> uint64_t BlockingSort(Chain &chain, const SortConfig &config, const Compare &compare = Compare(), const Combine &combine = NeverCombine()) {
  Sort<Compare, Combine> sorting(chain, config, compare, combine);
  chain.Wait(true);
  const uint64_t bytes = sorting.Size();
  sorting.Output(chain);
  return bytes;
}
/**
 * Represents an @ref util::FixedArray "array" capable of storing @ref util::stream::Sort "Sort" objects.
 *
 * In the anticipated use case, an instance of this class will maintain one @ref util::stream::Sort "Sort" object
 * for each n-gram order (ranging from 1 up to the maximum n-gram order being processed).
 * Use in this manner would enable the n-grams of each n-gram order to be sorted, in parallel.
 *
 * @tparam Compare An @ref Comparator "ngram comparator" to use during sorting.
 */
template <class Compare, class Combine = NeverCombine> class Sorts : public FixedArray<Sort<Compare, Combine> > {
  private:
    typedef Sort<Compare, Combine> S;
    typedef FixedArray<S> P;

  public:
    /**
     * Constructs, but does not initialize.
     *
     * @ref util::FixedArray::Init() "Init" must be called before use.
     *
     * @see util::FixedArray::Init()
     */
    Sorts() {}

    /**
     * Constructs an @ref util::FixedArray "array" capable of storing a fixed number of @ref util::stream::Sort "Sort" objects.
     *
     * @param number The maximum number of @ref util::stream::Sort "sorters" that can be held by this @ref util::FixedArray "array"
     * @see util::FixedArray::FixedArray()
     */
    explicit Sorts(std::size_t number) : FixedArray<Sort<Compare, Combine> >(number) {}

    /**
     * Constructs a new @ref util::stream::Sort "Sort" object which is stored in this @ref util::FixedArray "array".
     *
     * The new @ref util::stream::Sort "Sort" object is constructed using the provided @ref util::stream::SortConfig "SortConfig" and @ref Comparator "ngram comparator";
     * once constructed, a new worker @ref util::stream::Thread "thread" (owned by the @ref util::stream::Chain "chain") will sort the n-gram data stored
     * in the @ref util::stream::Block "blocks" of the provided @ref util::stream::Chain "chain".
     *
     * @see util::stream::Sort::Sort()
     * @see util::stream::Chain::operator>>()
     */
    void push_back(util::stream::Chain &chain, const util::stream::SortConfig &config, const Compare &compare = Compare(), const Combine &combine = Combine()) {
      new (P::end()) S(chain, config, compare, combine); // use "placement new" syntax to initialize S in an already-allocated memory location
      P::Constructed();
    }
};
} // namespace stream
} // namespace util
#endif // UTIL_STREAM_SORT_H
#include "sort.hh"

#define BOOST_TEST_MODULE SortTest
#include <boost/test/unit_test.hpp>

#include <algorithm>
#include <random>

#include <unistd.h>
namespace util { namespace stream { namespace {
// Orders records by the uint64_t stored at the start of each record.
// Formerly derived from std::binary_function, which was deprecated in C++11
// and removed in C++17; a plain functor works everywhere this is used.
// Both casts are static_cast: converting from const void* needs nothing
// stronger, and mixing in reinterpret_cast (as the old code did for the
// second operand) only obscured that.
struct CompareUInt64 {
  bool operator()(const void *first, const void *second) const {
    return *static_cast<const uint64_t*>(first) < *static_cast<const uint64_t*>(second);
  }
};
// Number of uint64_t values pushed through the sort in the test below.
const uint64_t kSize = 100000;
struct Putter {
Putter(std::vector<uint64_t> &shuffled) : shuffled_(shuffled) {}
void Run(const ChainPosition &position) {
Stream put_shuffled(position);
for (uint64_t i = 0; i < shuffled_.size(); ++i, ++put_shuffled) {
*static_cast<uint64_t*>(put_shuffled.Get()) = shuffled_[i];
}
put_shuffled.Poison();
}
std::vector<uint64_t> &shuffled_;
};
// End-to-end check: push a shuffled permutation of [0, kSize) through
// BlockingSort and verify that the values come back in ascending order.
BOOST_AUTO_TEST_CASE(FromShuffled) {
  std::vector<uint64_t> shuffled;
  shuffled.reserve(kSize);
  for (uint64_t i = 0; i < kSize; ++i) {
    shuffled.push_back(i);
  }
  // std::random_shuffle was deprecated in C++14 and removed in C++17; use
  // std::shuffle with an explicitly seeded engine, which also makes the test
  // deterministic across runs and platforms.
  std::mt19937 generator(2787569);
  std::shuffle(shuffled.begin(), shuffled.end(), generator);

  ChainConfig config;
  config.entry_size = 8;
  // Deliberately tiny memory/block settings force multiple on-disk merge
  // passes, exercising Sort::Merge.
  config.total_memory = 800;
  config.block_count = 3;

  SortConfig merge_config;
  merge_config.temp_prefix = "sort_test_temp";
  merge_config.buffer_size = 800;
  merge_config.total_memory = 3300;

  Chain chain(config);
  chain >> Putter(shuffled);
  BlockingSort(chain, merge_config, CompareUInt64(), NeverCombine());
  Stream sorted;
  chain >> sorted >> kRecycle;
  for (uint64_t i = 0; i < kSize; ++i, ++sorted) {
    BOOST_CHECK_EQUAL(i, *static_cast<const uint64_t*>(sorted.Get()));
  }
  BOOST_CHECK(!sorted);
}
}}} // namespaces
#ifndef UTIL_STREAM_STREAM_H
#define UTIL_STREAM_STREAM_H
#include "chain.hh"
#include <boost/noncopyable.hpp>
#include <cassert>
#include <stdint.h>
namespace util {
namespace stream {
/**
 * Sequential cursor over the records flowing through a Chain.
 *
 * Wraps a Link (block iterator) and exposes records one at a time: Get()
 * returns the current record, operator++ advances, and the stream converts
 * to false once the chain is exhausted.
 */
class Stream : boost::noncopyable {
  public:
    // Constructs an unattached stream; Init() must be called before use.
    // Note entry_size_ and block_size_ remain uninitialized until Init().
    Stream() : current_(NULL), end_(NULL) {}

    // Attaches to a position in the chain and seats the cursor on the first
    // record of the first non-empty block.
    void Init(const ChainPosition &position) {
      entry_size_ = position.GetChain().EntrySize();
      block_size_ = position.GetChain().BlockSize();
      block_it_.Init(position);
      StartBlock();
    }

    explicit Stream(const ChainPosition &position) {
      Init(position);
    }

    // True while a record is available.
    operator bool() const { return current_ != NULL; }
    bool operator!() const { return current_ == NULL; }

    // Raw pointer to the current record (entry_size_ bytes).
    const void *Get() const { return current_; }
    void *Get() { return current_; }

    // For writers: marks everything before the cursor as the valid portion
    // of the current block, then poisons the chain to signal completion.
    void Poison() {
      block_it_->SetValidSize(current_ - static_cast<uint8_t*>(block_it_->Get()));
      ++block_it_;
      block_it_.Poison();
    }

    // Advances to the next record, moving on to the next block once the
    // current one is exhausted.
    Stream &operator++() {
      assert(*this);
      assert(current_ < end_);
      current_ += entry_size_;
      if (current_ == end_) {
        ++block_it_;
        StartBlock();
      }
      return *this;
    }

  private:
    // Skips empty (zero ValidSize) blocks and positions the cursor at the
    // start of the next block.  NOTE(review): once block_it_ is poisoned,
    // Get() presumably yields NULL so operator bool() reports end-of-stream
    // -- confirm against Link in chain.hh.
    void StartBlock() {
      for (; block_it_ && !block_it_->ValidSize(); ++block_it_) {}
      current_ = static_cast<uint8_t*>(block_it_->Get());
      end_ = current_ + block_it_->ValidSize();
    }

    // The following are pointers to raw memory
    // current_ is the current record
    // end_ is the end of the block (so we know when to move to the next block)
    uint8_t *current_, *end_;

    std::size_t entry_size_;
    std::size_t block_size_;

    Link block_it_;
};
// Adds a new position to the chain and attaches the stream to it, enabling
// `chain >> stream` composition alongside other workers.
inline Chain &operator>>(Chain &chain, Stream &stream) {
  stream.Init(chain.Add());
  return chain;
}
} // namespace stream
} // namespace util
#endif // UTIL_STREAM_STREAM_H
#include "io.hh"
#include "stream.hh"
#include "../file.hh"
#define BOOST_TEST_MODULE StreamTest
#include <boost/test/unit_test.hpp>
#include <unistd.h>
namespace util { namespace stream { namespace {
// Writes 100000 consecutive uint64_t values to a temporary file, reads them
// back through a chain, and verifies that the stream yields every value in
// order before terminating.
BOOST_AUTO_TEST_CASE(StreamTest) {
  scoped_fd in(MakeTemp("io_test_temp"));
  for (uint64_t value = 0; value < 100000; ++value) {
    WriteOrThrow(in.get(), &value, sizeof(uint64_t));
  }
  SeekOrThrow(in.get(), 0);

  ChainConfig config;
  config.entry_size = 8;
  config.total_memory = 100;
  config.block_count = 12;

  Stream stream;
  Chain chain(config);
  chain >> Read(in.get()) >> stream >> kRecycle;

  uint64_t expected = 0;
  for (; stream; ++stream, ++expected) {
    BOOST_CHECK_EQUAL(expected, *static_cast<const uint64_t*>(stream.Get()));
  }
  BOOST_CHECK_EQUAL(100000ULL, expected);
}
}}} // namespaces
#ifndef UTIL_STREAM_TYPED_STREAM_H
#define UTIL_STREAM_TYPED_STREAM_H
// A typed wrapper to Stream for POD types.
#include "stream.hh"
namespace util { namespace stream {
/**
 * Typed view over Stream: reinterprets each raw record as a T.
 * T should be a POD type, since records are raw bytes moved through the
 * chain (see the file comment above).
 */
template <class T> class TypedStream : public Stream {
  public:
    // After using the default constructor, call Init (in the parent class)
    TypedStream() {}
    explicit TypedStream(const ChainPosition &position) : Stream(position) {}

    // Pointer and reference access to the current record as a T.
    const T *operator->() const { return static_cast<const T*>(Get()); }
    T *operator->() { return static_cast<T*>(Get()); }
    const T &operator*() const { return *static_cast<const T*>(Get()); }
    T &operator*() { return *static_cast<T*>(Get()); }
};
}} // namespaces
#endif // UTIL_STREAM_TYPED_STREAM_H
// Copyright 2004 The RE2 Authors. All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in string_piece.hh.
#include "string_piece.hh"
#include <algorithm>
#include <climits>
#ifndef HAVE_ICU
// Local shorthand used throughout this translation unit.
typedef StringPiece::size_type size_type;

// Replaces target's contents with a copy of the bytes this piece views.
void StringPiece::CopyToString(std::string* target) const {
  target->assign(ptr_, length_);
}
size_type StringPiece::find(const StringPiece& s, size_type pos) const {
  // length_ is unsigned (std::size_t), so only an out-of-range start
  // position needs rejecting up front.
  if (pos > static_cast<size_type>(length_))
    return npos;
  const char* match = std::search(ptr_ + pos, ptr_ + length_,
                                  s.ptr_, s.ptr_ + s.length_);
  const size_type where = match - ptr_;
  // std::search returns the end of the searched range on failure; the bound
  // check converts that into npos.
  return where + s.length_ <= length_ ? where : npos;
}
size_type StringPiece::find(char c, size_type pos) const {
  if (length_ <= 0 || pos >= static_cast<size_type>(length_)) {
    return npos;
  }
  const char* stop = ptr_ + length_;
  const char* hit = std::find(ptr_ + pos, stop, c);
  return hit == stop ? npos : static_cast<size_type>(hit - ptr_);
}
size_type StringPiece::rfind(const StringPiece& s, size_type pos) const {
  if (length_ < s.length_) return npos;
  const size_t haystack_len = length_;
  // An empty needle matches at the rightmost permitted position.
  if (s.length_ == 0) return std::min(haystack_len, pos);
  // Limit the search so a match never starts after pos.
  const char* stop = ptr_ + std::min(haystack_len - s.length_, pos) + s.length_;
  const char* hit = std::find_end(ptr_, stop, s.ptr_, s.ptr_ + s.length_);
  // std::find_end returns the end of the searched range on failure.
  return hit == stop ? npos : static_cast<size_type>(hit - ptr_);
}
// Rightmost occurrence of c at or before pos; npos if absent.
size_type StringPiece::rfind(char c, size_type pos) const {
  if (length_ <= 0) return npos;
  // Iterate with size_type rather than the old int index, which silently
  // truncated for strings longer than INT_MAX.  The decrement-then-test-zero
  // shape matches find_last_not_of below and avoids unsigned underflow.
  for (size_type i = std::min(pos, static_cast<size_type>(length_ - 1)); ; --i) {
    if (ptr_[i] == c) return i;
    if (i == 0) break;
  }
  return npos;
}
// For each character in characters_wanted, sets the index corresponding
// to the ASCII code of that character to 1 in table. This is used by
// the find_.*_of methods below to tell whether or not a character is in
// the lookup table in constant time.
// The argument `table' must be an array that is large enough to hold all
// the possible values of an unsigned char. Thus it should be be declared
// as follows:
// bool table[UCHAR_MAX + 1]
static inline void BuildLookupTable(const StringPiece& characters_wanted,
bool* table) {
const size_type length = characters_wanted.length();
const char* const data = characters_wanted.data();
for (size_type i = 0; i < length; ++i) {
table[static_cast<unsigned char>(data[i])] = true;
}
}
size_type StringPiece::find_first_of(const StringPiece& s,
                                     size_type pos) const {
  if (length_ == 0 || s.length_ == 0)
    return npos;
  // A one-character set degenerates to a plain character search; skip the
  // cost of building the lookup table.
  if (s.length_ == 1)
    return find_first_of(s.ptr_[0], pos);

  bool wanted[UCHAR_MAX + 1] = { false };
  BuildLookupTable(s, wanted);
  const char* const stop = ptr_ + length_;
  for (const char* p = ptr_ + std::min(pos, length_); p != stop; ++p) {
    if (wanted[static_cast<unsigned char>(*p)]) {
      return static_cast<size_type>(p - ptr_);
    }
  }
  return npos;
}
size_type StringPiece::find_first_not_of(const StringPiece& s,
                                         size_type pos) const {
  if (length_ == 0)
    return npos;
  // Nothing is excluded, so the first position qualifies.  NOTE(review):
  // this ignores pos; preserved as-is for compatibility.
  if (s.length_ == 0)
    return 0;
  // A one-character set degenerates to a plain character search; skip the
  // cost of building the lookup table.
  if (s.length_ == 1)
    return find_first_not_of(s.ptr_[0], pos);

  bool excluded[UCHAR_MAX + 1] = { false };
  BuildLookupTable(s, excluded);
  const char* const stop = ptr_ + length_;
  for (const char* p = ptr_ + std::min(pos, length_); p != stop; ++p) {
    if (!excluded[static_cast<unsigned char>(*p)]) {
      return static_cast<size_type>(p - ptr_);
    }
  }
  return npos;
}
size_type StringPiece::find_first_not_of(char c, size_type pos) const {
  if (length_ == 0)
    return npos;
  const char* const stop = ptr_ + length_;
  for (const char* p = ptr_ + std::min(pos, length_); p != stop; ++p) {
    if (*p != c) {
      return static_cast<size_type>(p - ptr_);
    }
  }
  return npos;
}
size_type StringPiece::find_last_of(const StringPiece& s, size_type pos) const {
  if (length_ == 0 || s.length_ == 0)
    return npos;
  // Avoid the cost of BuildLookupTable() for a single-character search.
  if (s.length_ == 1)
    return find_last_of(s.ptr_[0], pos);
  bool lookup[UCHAR_MAX + 1] = { false };
  BuildLookupTable(s, lookup);
  // Scan backwards.  The index is unsigned, so test i == 0 before the
  // decrement instead of using i >= 0 (which would always be true).
  for (size_type i = std::min(pos, length_ - 1); ; --i) {
    if (lookup[static_cast<unsigned char>(ptr_[i])])
      return i;
    if (i == 0)
      break;
  }
  return npos;
}
size_type StringPiece::find_last_not_of(const StringPiece& s,
                                        size_type pos) const {
  if (length_ == 0)
    return npos;

  // Rightmost in-range index; with an empty exclusion set it qualifies
  // immediately.
  size_type i = std::min(pos, length_ - 1);
  if (s.length_ == 0)
    return i;

  // Avoid the cost of BuildLookupTable() for a single-character search.
  if (s.length_ == 1)
    return find_last_not_of(s.ptr_[0], pos);

  bool lookup[UCHAR_MAX + 1] = { false };
  BuildLookupTable(s, lookup);
  // Scan backwards; test i == 0 before the decrement since i is unsigned.
  for (; ; --i) {
    if (!lookup[static_cast<unsigned char>(ptr_[i])])
      return i;
    if (i == 0)
      break;
  }
  return npos;
}
size_type StringPiece::find_last_not_of(char c, size_type pos) const {
  if (length_ == 0)
    return npos;
  // Walk backwards from the rightmost in-range index; i is unsigned, so
  // check for zero before decrementing.
  size_type i = std::min(pos, length_ - 1);
  while (true) {
    if (ptr_[i] != c) return i;
    if (i == 0) return npos;
    --i;
  }
}
// Sub-view clamped to the bounds of this piece, like std::string::substr
// but without throwing on an out-of-range pos.
StringPiece StringPiece::substr(size_type pos, size_type n) const {
  const size_type start = std::min(pos, length_);
  const size_type count = std::min(n, length_ - start);
  return StringPiece(ptr_ + start, count);
}

// Largest size_type value; doubles as the "not found" sentinel.
const size_type StringPiece::npos = size_type(-1);
#endif // !HAVE_ICU
/* If you use ICU in your program, then compile with -DHAVE_ICU -licui18n. If
* you don't use ICU, then this will use the Google implementation from Chrome.
* This has been modified from the original version to let you choose.
*/
// Copyright 2008, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// Copied from strings/stringpiece.h with modifications
//
// A string-like object that points to a sized piece of memory.
//
// Functions or methods may use const StringPiece& parameters to accept either
// a "const char*" or a "string" value that will be implicitly converted to
// a StringPiece. The implicit conversion means that it is often appropriate
// to include this .h file in other files rather than forward-declaring
// StringPiece as would be appropriate for most other Google classes.
//
// Systematic usage of StringPiece is encouraged as it will reduce unnecessary
// conversions from "const char*" to "string" and back again.
//
#ifndef UTIL_STRING_PIECE_H
#define UTIL_STRING_PIECE_H
#include "have.hh"
#include <cstring>
#include <iosfwd>
#include <ostream>
#ifdef HAVE_ICU
#include <unicode/stringpiece.h>
#include <unicode/uversion.h>
// Old versions of ICU don't define operator== and operator!=.
#if (U_ICU_VERSION_MAJOR_NUM < 4) || ((U_ICU_VERSION_MAJOR_NUM == 4) && (U_ICU_VERSION_MINOR_NUM < 4))
#warning You are using an old version of ICU. Consider upgrading to ICU >= 4.6.
// Fallback comparison operators for old ICU whose StringPiece lacks them.
inline bool operator==(const StringPiece& x, const StringPiece& y) {
  if (x.size() != y.size())
    return false;
  return std::memcmp(x.data(), y.data(), x.size()) == 0;
}
inline bool operator!=(const StringPiece& x, const StringPiece& y) {
  return !(x == y);
}
#endif // old version of ICU

U_NAMESPACE_BEGIN

// True when longer begins with the bytes of prefix.
inline bool starts_with(const StringPiece& longer, const StringPiece& prefix) {
  int longersize = longer.size(), prefixsize = prefix.size();
  return longersize >= prefixsize && std::memcmp(longer.data(), prefix.data(), prefixsize) == 0;
}
#else
#include <algorithm>
#include <cstddef>
#include <string>
#include <cstring>
#ifdef WIN32
#undef max
#undef min
#endif
// A pointer + length view over read-only character data.  StringPiece never
// owns the bytes it points at; the caller must keep the underlying buffer
// alive for the lifetime of the piece.
class StringPiece {
 public:
  typedef size_t size_type;

 private:
  const char* ptr_;
  size_type length_;

 public:
  // We provide non-explicit singleton constructors so users can pass
  // in a "const char*" or a "string" wherever a "StringPiece" is
  // expected.
  StringPiece() : ptr_(NULL), length_(0) { }
  StringPiece(const char* str)
    : ptr_(str), length_((str == NULL) ? 0 : strlen(str)) { }
  StringPiece(const std::string& str)
    : ptr_(str.data()), length_(str.size()) { }
  StringPiece(const char* offset, size_type len)
    : ptr_(offset), length_(len) { }

  // data() may return a pointer to a buffer with embedded NULs, and the
  // returned buffer may or may not be null terminated.  Therefore it is
  // typically a mistake to pass data() to a routine that expects a NUL
  // terminated string.
  const char* data() const { return ptr_; }
  size_type size() const { return length_; }
  size_type length() const { return length_; }
  bool empty() const { return length_ == 0; }

  void clear() { ptr_ = NULL; length_ = 0; }
  void set(const char* data, size_type len) { ptr_ = data; length_ = len; }
  void set(const char* str) {
    ptr_ = str;
    length_ = str ? strlen(str) : 0;
  }
  void set(const void* data, size_type len) {
    // static_cast suffices (and is more intention-revealing than the
    // reinterpret_cast previously used here) for void* -> char*.
    ptr_ = static_cast<const char*>(data);
    length_ = len;
  }

  // Unchecked element access, like std::string::operator[].
  char operator[](size_type i) const { return ptr_[i]; }

  // Narrow the view in place; n must not exceed size().
  void remove_prefix(size_type n) {
    ptr_ += n;
    length_ -= n;
  }

  void remove_suffix(size_type n) {
    length_ -= n;
  }

  // Three-way lexicographic comparison: <0, 0, or >0 like memcmp, with the
  // shorter piece ordering first when one is a prefix of the other.
  int compare(const StringPiece& x) const {
    int r = wordmemcmp(ptr_, x.ptr_, std::min(length_, x.length_));
    if (r == 0) {
      if (length_ < x.length_) r = -1;
      else if (length_ > x.length_) r = +1;
    }
    return r;
  }

  std::string as_string() const {
    // std::string doesn't like to take a NULL pointer even with a 0 size.
    return std::string(!empty() ? data() : "", size());
  }

  void CopyToString(std::string* target) const;
  void AppendToString(std::string* target) const;

  // Does "this" start with "x"
  bool starts_with(const StringPiece& x) const {
    return ((length_ >= x.length_) &&
            (wordmemcmp(ptr_, x.ptr_, x.length_) == 0));
  }

  // Does "this" end with "x"
  bool ends_with(const StringPiece& x) const {
    return ((length_ >= x.length_) &&
            (wordmemcmp(ptr_ + (length_-x.length_), x.ptr_, x.length_) == 0));
  }

  // standard STL container boilerplate
  typedef char value_type;
  typedef const char* pointer;
  typedef const char& reference;
  typedef const char& const_reference;
  typedef ptrdiff_t difference_type;
  static const size_type npos;
  typedef const char* const_iterator;
  typedef const char* iterator;
  typedef std::reverse_iterator<const_iterator> const_reverse_iterator;
  typedef std::reverse_iterator<iterator> reverse_iterator;
  iterator begin() const { return ptr_; }
  iterator end() const { return ptr_ + length_; }
  const_reverse_iterator rbegin() const {
    return const_reverse_iterator(ptr_ + length_);
  }
  const_reverse_iterator rend() const {
    return const_reverse_iterator(ptr_);
  }

  size_type max_size() const { return length_; }
  size_type capacity() const { return length_; }

  size_type copy(char* buf, size_type n, size_type pos = 0) const;

  // The search methods below mirror the std::string API; they are defined
  // out of line in the accompanying .cc file (non-ICU builds only).
  size_type find(const StringPiece& s, size_type pos = 0) const;
  size_type find(char c, size_type pos = 0) const;
  size_type rfind(const StringPiece& s, size_type pos = npos) const;
  size_type rfind(char c, size_type pos = npos) const;

  size_type find_first_of(const StringPiece& s, size_type pos = 0) const;
  size_type find_first_of(char c, size_type pos = 0) const {
    return find(c, pos);
  }
  size_type find_first_not_of(const StringPiece& s, size_type pos = 0) const;
  size_type find_first_not_of(char c, size_type pos = 0) const;
  size_type find_last_of(const StringPiece& s, size_type pos = npos) const;
  size_type find_last_of(char c, size_type pos = npos) const {
    return rfind(c, pos);
  }
  size_type find_last_not_of(const StringPiece& s, size_type pos = npos) const;
  size_type find_last_not_of(char c, size_type pos = npos) const;

  StringPiece substr(size_type pos, size_type n = npos) const;

  static int wordmemcmp(const char* p, const char* p2, size_type N) {
    return std::memcmp(p, p2, N);
  }
};
// Equal iff same length and same bytes.
inline bool operator==(const StringPiece& x, const StringPiece& y) {
  return x.size() == y.size() &&
         std::memcmp(x.data(), y.data(), x.size()) == 0;
}
// Defined in terms of operator== so the two always agree.
inline bool operator!=(const StringPiece& x, const StringPiece& y) {
  return !(x == y);
}
// Free-function form matching the signature used in the ICU branch above;
// delegates to the member function.
inline bool starts_with(const StringPiece& longer, const StringPiece& prefix) {
  return longer.starts_with(prefix);
}
#endif // HAVE_ICU undefined
// Lexicographic byte-wise ordering; a proper prefix sorts before the
// longer string.
inline bool operator<(const StringPiece& x, const StringPiece& y) {
  const int cmp = std::memcmp(x.data(), y.data(),
                              std::min(x.size(), y.size()));
  if (cmp != 0) return cmp < 0;
  return x.size() < y.size();
}
// The remaining orderings are derived from operator< for consistency.
inline bool operator>(const StringPiece& x, const StringPiece& y) {
  return y < x;
}
inline bool operator<=(const StringPiece& x, const StringPiece& y) {
  return !(x > y);
}
inline bool operator>=(const StringPiece& x, const StringPiece& y) {
  return !(x < y);
}
// allow StringPiece to be logged (needed for unit testing).
// Uses write() rather than operator<<(const char*) because the data may
// contain embedded NULs and is not NUL terminated.
inline std::ostream& operator<<(std::ostream& o, const StringPiece& piece) {
  return o.write(piece.data(), static_cast<std::streamsize>(piece.size()));
}
#ifdef HAVE_ICU
U_NAMESPACE_END
using U_NAMESPACE_QUALIFIER StringPiece;
#endif
#endif // UTIL_STRING_PIECE_H
#ifndef UTIL_STRING_PIECE_HASH_H
#define UTIL_STRING_PIECE_HASH_H
#include "have.hh"
#include "string_piece.hh"
#include <boost/functional/hash.hpp>
#include <boost/version.hpp>
#ifdef HAVE_ICU
U_NAMESPACE_BEGIN
#endif
// Hashes the viewed bytes so StringPiece can key Boost hash containers.
inline size_t hash_value(const StringPiece &str) {
  const char *first = str.data();
  return boost::hash_range(first, first + str.length());
}
#ifdef HAVE_ICU
U_NAMESPACE_END
#endif
/* Support for lookup of StringPiece in boost::unordered_map<std::string> */
struct StringPieceCompatibleHash : public std::unary_function<const StringPiece &, size_t> {
size_t operator()(const StringPiece &str) const {
return hash_value(str);
}
};
// Note: std::binary_function was deprecated in C++11 and removed in C++17,
// so the adapter typedefs are declared directly instead of inherited.
struct StringPieceCompatibleEquals {
  typedef bool result_type;
  typedef const StringPiece &first_argument_type;
  typedef const std::string &second_argument_type;

  // Both std::string and StringPiece convert to StringPiece, enabling
  // heterogeneous comparison in FindStringPiece below.
  bool operator()(const StringPiece &first, const StringPiece &second) const {
    return first == second;
  }
};
// Looks up a StringPiece key in a container keyed by std::string.  Old
// Boost lacks heterogeneous find, so materialize a temporary std::string
// there; newer Boost hashes/compares the StringPiece directly, avoiding
// the allocation.
template <class T> typename T::const_iterator FindStringPiece(const T &t, const StringPiece &key) {
#if BOOST_VERSION < 104200
  std::string temp(key.data(), key.size());
  return t.find(temp);
#else
  return t.find(key, StringPieceCompatibleHash(), StringPieceCompatibleEquals());
#endif
}
// Mutable-container overload of FindStringPiece; same Boost-version
// dispatch as the const overload above.
template <class T> typename T::iterator FindStringPiece(T &t, const StringPiece &key) {
#if BOOST_VERSION < 104200
  std::string temp(key.data(), key.size());
  return t.find(temp);
#else
  return t.find(key, StringPieceCompatibleHash(), StringPieceCompatibleEquals());
#endif
}
#endif // UTIL_STRING_PIECE_HASH_H
#ifndef UTIL_STRING_STREAM_H
#define UTIL_STRING_STREAM_H
#include "fake_ostream.hh"
#include <cassert>
#include <string>
namespace util {
// An in-memory FakeOStream that accumulates everything written to it in a
// std::string.
class StringStream : public FakeOStream<StringStream> {
  public:
    StringStream() {}

    // Nothing is buffered outside out_, so flushing is a no-op.
    StringStream &flush() { return *this; }

    // Appends length raw bytes to the accumulated string.
    StringStream &write(const void *data, std::size_t length) {
      out_.append(static_cast<const char*>(data), length);
      return *this;
    }

    // Access, replace, or exchange the accumulated contents.
    const std::string &str() const { return out_; }
    void str(const std::string &val) { out_ = val; }
    void swap(std::string &str) { std::swap(out_, str); }

  protected:
    friend class FakeOStream<StringStream>;

    // Grows the buffer by amount bytes and returns a pointer to the start
    // of the freshly added region for FakeOStream to fill.
    char *Ensure(std::size_t amount) {
      const std::size_t previous = out_.size();
      out_.resize(previous + amount);
      return &out_[previous];
    }

    // Trims the buffer so `to` becomes its end; `to` must lie within the
    // region Ensure() handed out.
    void AdvanceTo(char *to) {
      assert(to <= &*out_.end());
      assert(to >= &*out_.begin());
      out_.resize(to - &*out_.begin());
    }

  private:
    std::string out_;
};
} // namespace
#endif // UTIL_STRING_STREAM_H
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment