// Copyright (C) 2013  Davis E. King (davis@dlib.net)
// License: Boost Software License   See LICENSE.txt for the full license.

#include "opaque_types.h"
#include <dlib/python.h>
#include <dlib/matrix.h>
#include <dlib/data_io.h>
#include <dlib/sparse_vector.h>
#include <dlib/optimization.h>
#include <dlib/statistics/running_gradient.h>
#include <dlib/filtering.h>

using namespace dlib;
using namespace std;

namespace py = pybind11;

typedef std::vector<std::pair<unsigned long,double> > sparse_vect;

void _make_sparse_vector (
    sparse_vect& v
)
{
    make_sparse_vector_inplace(v);
}

void _make_sparse_vector2 (
    std::vector<sparse_vect>& v
)
{
    for (unsigned long i = 0; i < v.size(); ++i)
        make_sparse_vector_inplace(v[i]);
}

py::tuple _load_libsvm_formatted_data (
    const std::string& file_name
)
{
    std::vector<sparse_vect> samples;
    std::vector<double> labels;
    load_libsvm_formatted_data(file_name, samples, labels);
    return py::make_tuple(samples, labels);
}

void _save_libsvm_formatted_data (
    const std::string& file_name,
    const std::vector<sparse_vect>& samples,
    const std::vector<double>& labels
)
{
    pyassert(samples.size() == labels.size(), "Invalid inputs");
    save_libsvm_formatted_data(file_name, samples, labels);
}

// ----------------------------------------------------------------------------------------

py::list _max_cost_assignment (
    const matrix<double>& cost
)
{
    if (cost.nr() != cost.nc())
        throw dlib::error("The input matrix must be square.");

    // max_cost_assignment() only works with integer matrices, so convert from
    // double to integer.
    const double scale = (std::numeric_limits<dlib::int64>::max()/1000)/max(abs(cost));
    matrix<dlib::int64> int_cost = matrix_cast<dlib::int64>(round(cost*scale));
    return vector_to_python_list(max_cost_assignment(int_cost));
}

double _assignment_cost (
    const matrix<double>& cost,
    const py::list& assignment
)
{
    return assignment_cost(cost, python_list_to_vector<long>(assignment));
}
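// ----------------------------------------------------------------------------------------

// A minimal usage sketch, not part of the exported bindings: it shows how the dlib
// routines wrapped above are called directly from C++.  The cost values are made up
// and the name example_max_cost_assignment is ours, not a dlib identifier.
inline void example_max_cost_assignment()
{
    matrix<dlib::int64> cost(2,2);
    cost = 1, 2,
           5, 3;
    // max_cost_assignment() returns, for each row, the column it is assigned to.
    // Here the best assignment is row 0 -> column 1 and row 1 -> column 0.
    const std::vector<long> best = max_cost_assignment(cost);
    // assignment_cost() sums the selected entries: cost(0,1) + cost(1,0) == 2 + 5 == 7.
    std::cout << "total cost: " << assignment_cost(cost, best) << std::endl;
}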
// ----------------------------------------------------------------------------------------

size_t py_count_steps_without_decrease (
    py::object arr,
    double probability_of_decrease
)
{
    DLIB_CASSERT(0.5 < probability_of_decrease && probability_of_decrease < 1);
    return count_steps_without_decrease(python_list_to_vector<double>(arr), probability_of_decrease);
}

// ----------------------------------------------------------------------------------------

size_t py_count_steps_without_decrease_robust (
    py::object arr,
    double probability_of_decrease,
    double quantile_discard
)
{
    DLIB_CASSERT(0.5 < probability_of_decrease && probability_of_decrease < 1);
    DLIB_CASSERT(0 <= quantile_discard && quantile_discard <= 1);
    return count_steps_without_decrease_robust(python_list_to_vector<double>(arr), probability_of_decrease, quantile_discard);
}

// ----------------------------------------------------------------------------------------

double probability_that_sequence_is_increasing (
    py::object arr
)
{
    DLIB_CASSERT(len(arr) > 2);
    return probability_gradient_greater_than(python_list_to_vector<double>(arr), 0);
}
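// ----------------------------------------------------------------------------------------

// A minimal usage sketch, not part of the exported bindings: the loss values are
// made up and the name example_count_steps is ours, not a dlib identifier.
inline void example_count_steps()
{
    // A noisy but generally decreasing "training loss" trace.
    const std::vector<double> losses = {1.00, 0.82, 0.85, 0.70, 0.72, 0.60, 0.61, 0.55};
    // Scanning from the end, count how many steps we can look back before we are 51%
    // confident the series is decreasing.  This is the routine the wrapper above calls.
    std::cout << count_steps_without_decrease(losses, 0.51) << std::endl;
    // The robust variant first discards the top 10% of values so that a few upward
    // outliers do not inflate the count.
    std::cout << count_steps_without_decrease_robust(losses, 0.51, 0.10) << std::endl;
    // Estimate the probability that the series is trending upward (small for this data).
    std::cout << probability_gradient_greater_than(losses, 0) << std::endl;
}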
// ----------------------------------------------------------------------------------------

void hit_enter_to_continue()
{
    std::cout << "Hit enter to continue";
    std::cin.get();
}

// ----------------------------------------------------------------------------------------

string print_momentum_filter(const momentum_filter& r)
{
    std::ostringstream sout;
    sout << "momentum_filter(";
    sout << "measurement_noise=" << r.get_measurement_noise();
    sout << ", typical_acceleration=" << r.get_typical_acceleration();
    sout << ", max_measurement_deviation=" << r.get_max_measurement_deviation();
    sout << ")";
    return sout.str();
}

// ----------------------------------------------------------------------------------------

void bind_other(py::module& m)
{
    m.def("max_cost_assignment", _max_cost_assignment, py::arg("cost"),
"requires \n\
    - cost.nr() == cost.nc() \n\
      (i.e. the input must be a square matrix) \n\
ensures \n\
    - Finds and returns the solution to the following optimization problem: \n\
 \n\
        Maximize: f(A) == assignment_cost(cost, A) \n\
        Subject to the following constraints: \n\
            - The elements of A are unique. That is, there aren't any elements \n\
              of A which are equal. \n\
            - len(A) == cost.nr() \n\
 \n\
    - Note that this function converts the input cost matrix into a 64bit fixed \n\
      point representation.  Therefore, you should make sure that the values in \n\
      your cost matrix can be accurately represented by 64bit fixed point values. \n\
      If this is not the case then the solution may become inaccurate due to \n\
      rounding error."
        );

    m.def("assignment_cost", _assignment_cost, py::arg("cost"), py::arg("assignment"),
"requires \n\
    - cost.nr() == cost.nc() \n\
      (i.e. the input must be a square matrix) \n\
    - for all valid i: \n\
        - 0 <= assignment[i] < cost.nr() \n\
ensures \n\
    - Interprets cost as a cost assignment matrix. That is, cost[i][j] \n\
      represents the cost of assigning i to j. \n\
    - Interprets assignment as a particular set of assignments. That is, \n\
      i is assigned to assignment[i]. \n\
    - returns the cost of making the assignments given in assignment."
        );

    m.def("make_sparse_vector", _make_sparse_vector,
"This function modifies its argument so that it is a properly sorted sparse vector. \n\
This means that the elements of the sparse vector will be ordered so that pairs \n\
with smaller indices come first.  Additionally, there won't be any pairs with \n\
identical indices.  If such pairs were present in the input sparse vector then \n\
their values will be added together and only one pair with their index will be \n\
present in the output."
        );
    m.def("make_sparse_vector", _make_sparse_vector2,
        "This function modifies a sparse_vectors object so that all elements it contains are properly sorted sparse vectors.");

    m.def("load_libsvm_formatted_data", _load_libsvm_formatted_data, py::arg("file_name"),
"ensures \n\
    - Attempts to read a file of the given name that should contain libsvm \n\
      formatted data.  The data is returned as a tuple where the first tuple \n\
      element is an array of sparse vectors and the second element is an array of \n\
      labels."
        );

    m.def("save_libsvm_formatted_data", _save_libsvm_formatted_data, py::arg("file_name"), py::arg("samples"), py::arg("labels"),
"requires \n\
    - len(samples) == len(labels) \n\
ensures \n\
    - saves the data to the given file in libsvm format"
        );

    m.def("hit_enter_to_continue", hit_enter_to_continue,
        "Asks the user to hit enter to continue and pauses until they do so.");

    m.def("count_steps_without_decrease", py_count_steps_without_decrease, py::arg("time_series"), py::arg("probability_of_decrease")=0.51,
"requires \n\
    - time_series must be a one dimensional array of real numbers. \n\
    - 0.5 < probability_of_decrease < 1 \n\
ensures \n\
    - If you think of the contents of time_series as a potentially noisy time \n\
      series, then this function returns a count of how long the time series has \n\
      gone without noticeably decreasing in value. It does this by scanning along \n\
      the elements, starting from the end (i.e. time_series[-1]) to the beginning, \n\
      and checking how many elements you need to examine before you are confident \n\
      that the series has been decreasing in value. Here, \"confident of decrease\" \n\
      means the probability of decrease is >= probability_of_decrease. \n\
    - Setting probability_of_decrease to 0.51 means we count until we see even a \n\
      small hint of decrease, whereas a larger value of 0.99 would return a larger \n\
      count since it keeps going until it is nearly certain the time series is \n\
      decreasing. \n\
    - The max possible output from this function is len(time_series). \n\
    - The implementation of this function is done using the dlib::running_gradient \n\
      object, which is a tool that finds the least squares fit of a line to the \n\
      time series and the confidence interval around the slope of that line. That \n\
      can then be used in a simple statistical test to determine if the slope is \n\
      positive or negative."
    /*!
        requires
            - time_series must be a one dimensional array of real numbers.
            - 0.5 < probability_of_decrease < 1
        ensures
            - If you think of the contents of time_series as a potentially noisy time
              series, then this function returns a count of how long the time series has
              gone without noticeably decreasing in value. It does this by scanning along
              the elements, starting from the end (i.e. time_series[-1]) to the beginning,
              and checking how many elements you need to examine before you are confident
              that the series has been decreasing in value. Here, "confident of decrease"
              means the probability of decrease is >= probability_of_decrease.
            - Setting probability_of_decrease to 0.51 means we count until we see even a
              small hint of decrease, whereas a larger value of 0.99 would return a larger
              count since it keeps going until it is nearly certain the time series is
              decreasing.
            - The max possible output from this function is len(time_series).
            - The implementation of this function is done using the dlib::running_gradient
              object, which is a tool that finds the least squares fit of a line to the
              time series and the confidence interval around the slope of that line. That
              can then be used in a simple statistical test to determine if the slope is
              positive or negative.
    !*/
        );

    m.def("count_steps_without_decrease_robust", py_count_steps_without_decrease_robust, py::arg("time_series"), py::arg("probability_of_decrease")=0.51, py::arg("quantile_discard")=0.1,
"requires \n\
    - time_series must be a one dimensional array of real numbers. \n\
    - 0.5 < probability_of_decrease < 1 \n\
    - 0 <= quantile_discard <= 1 \n\
ensures \n\
    - This function behaves just like \n\
      count_steps_without_decrease(time_series,probability_of_decrease) except that \n\
      it ignores values in the time series that are in the upper quantile_discard \n\
      quantile. So for example, if the quantile discard is 0.1 then the 10% \n\
      largest values in the time series are ignored."
    /*!
        requires
            - time_series must be a one dimensional array of real numbers.
            - 0.5 < probability_of_decrease < 1
            - 0 <= quantile_discard <= 1
        ensures
            - This function behaves just like
              count_steps_without_decrease(time_series,probability_of_decrease) except that
              it ignores values in the time series that are in the upper quantile_discard
              quantile. So for example, if the quantile discard is 0.1 then the 10%
              largest values in the time series are ignored.
    !*/
        );

    m.def("probability_that_sequence_is_increasing", probability_that_sequence_is_increasing, py::arg("time_series"),
        "returns the probability that the given sequence of real numbers is increasing in value over time.");

    {
    typedef momentum_filter type;
    py::class_<type>(m, "momentum_filter",
R"asdf(
This object is a simple tool for filtering a single scalar value that
measures the location of a moving object that has some non-trivial
momentum.  Importantly, the measurements are noisy and the object can
experience sudden unpredictable accelerations.  To accomplish this
filtering we use a simple Kalman filter with a state transition model of:

    position_{i+1} = position_{i} + velocity_{i}
    velocity_{i+1} = velocity_{i} + some_unpredictable_acceleration

and a measurement model of:

    measured_position_{i} = position_{i} + measurement_noise

Where some_unpredictable_acceleration and measurement_noise are 0 mean Gaussian
noise sources with standard deviations of get_typical_acceleration() and
get_measurement_noise() respectively.

To allow for really sudden and large but infrequent accelerations, at each
step we check if the current measured position deviates from the predicted
filtered position by more than get_max_measurement_deviation()*get_measurement_noise()
and if so we adjust the filter's state to keep it within these bounds.
This allows the moving object to undergo large unmodeled accelerations, far
in excess of what would be suggested by get_typical_acceleration(), without
then experiencing a long lag time where the Kalman filter has to "catch
up" to the new position.
)asdf"
        )
        .def(py::init<double,double,double>(), py::arg("measurement_noise"), py::arg("typical_acceleration"), py::arg("max_measurement_deviation"))
        .def("measurement_noise", [](const momentum_filter& a){return a.get_measurement_noise();})
        .def("typical_acceleration", [](const momentum_filter& a){return a.get_typical_acceleration();})
        .def("max_measurement_deviation", [](const momentum_filter& a){return a.get_max_measurement_deviation();})
        .def("__call__", [](momentum_filter& f, const double r){return f(r); })
        .def("__repr__", print_momentum_filter)
        .def(py::pickle(&getstate<type>, &setstate<type>));
    }

    m.def("find_optimal_momentum_filter", [](const py::object sequence, const double smoothness) {
            return find_optimal_momentum_filter(python_list_to_vector<double>(sequence), smoothness);
        },
        py::arg("sequence"), py::arg("smoothness")=1,
R"asdf(requires
    - len(sequence) > 4
    - smoothness >= 0
ensures
    - This function finds the "optimal" settings of a momentum_filter based on
      recorded measurement data stored in sequence.  Here we assume that sequence
      is a complete track history of some object's measured positions.  What we do
      is find the momentum_filter that minimizes the following objective function:
         sum of abs(predicted_location[i] - measured_location[i]) +
             smoothness*abs(filtered_location[i] - filtered_location[i-1])
         Where i is a time index.
      The sum runs over all the data in sequence.  So what we do is find the
      filter settings that produce smooth filtered trajectories but also produce
      filtered outputs that are as close to the measured positions as possible.
      The larger the value of smoothness the less jittery the filter outputs will
      be, but they might become biased or laggy if smoothness is set really
      high.)asdf"
        );
}
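// ----------------------------------------------------------------------------------------

// A minimal usage sketch, not part of the exported bindings: it runs the momentum_filter
// bound above on a made-up 1-D track.  The noise parameters and measurements are invented
// for illustration, and the name example_momentum_filter is ours, not a dlib identifier.
inline void example_momentum_filter()
{
    // measurement_noise=2, typical_acceleration=0.1, max_measurement_deviation=3.
    // A measurement more than 3*2 = 6 units from the predicted position clamps the
    // filter state instead of letting the filter lag behind a sudden jump.
    momentum_filter filt(2, 0.1, 3);
    for (const double measured : {10.1, 10.9, 12.2, 12.8, 30.0, 14.1})
        std::cout << "filtered position: " << filt(measured) << std::endl;
}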