My Project
Public Member Functions | Static Public Member Functions | Data Fields | Private Attributes | Friends
LibThread::Scheduler Class Reference

Public Member Functions

 Scheduler (int n)
 
void set_maxconcurrency (int n)
 
int get_maxconcurrency ()
 
int threadpool_size (ThreadPool *pool)
 
virtual ~Scheduler ()
 
ThreadStategetThread (int i)
 
void shutdown (bool wait)
 
void addThread (ThreadPool *owner, ThreadState *thread)
 
void attachJob (ThreadPool *pool, Job *job)
 
void detachJob (Job *job)
 
void queueJob (Job *job)
 
void broadcastJob (ThreadPool *pool, Job *job)
 
void cancelDeps (Job *job)
 
void cancelJob (Job *job)
 
void waitJob (Job *job)
 
void clearThreadState ()
 
- Public Member Functions inherited from LibThread::SharedObject
 SharedObject ()
 
virtual ~SharedObject ()
 
void set_type (int type_init)
 
int get_type ()
 
void set_name (std::string &name_init)
 
void set_name (const char *s)
 
std::stringget_name ()
 
void incref (int by=1)
 
long decref ()
 
long getref ()
 
virtual BOOLEAN op2 (int op, leftv res, leftv a1, leftv a2)
 
virtual BOOLEAN op3 (int op, leftv res, leftv a1, leftv a2, leftv a3)
 

Static Public Member Functions

static void notifyDeps (Scheduler *scheduler, Job *job)
 
static void * main (ThreadState *ts, void *arg)
 

Data Fields

Lock lock
 

Private Attributes

bool single_threaded
 
size_t jobid
 
int nthreads
 
int maxconcurrency
 
int running
 
bool shutting_down
 
int shutdown_counter
 
vector< ThreadState * > threads
 
vector< ThreadPool * > thread_owners
 
priority_queue< Job *, vector< Job * >, JobCompareglobal_queue
 
vector< JobQueue * > thread_queues
 
vector< Job * > pending
 
ConditionVariable cond
 
ConditionVariable response
 

Friends

class Job
 

Detailed Description

Definition at line 1654 of file shared.cc.

Constructor & Destructor Documentation

◆ Scheduler()

LibThread::Scheduler::Scheduler ( int  n)
inline

Definition at line 1673 of file shared.cc.

1673 :
1675 single_threaded(n==0), nthreads(n == 0 ? 1 : n),
1676 lock(true), cond(&lock), response(&lock),
1677 shutting_down(false), shutdown_counter(0), jobid(0),
1678 maxconcurrency(n), running(0)
1679 {
1680 thread_queues.push_back(new JobQueue());
1681 }
vector< ThreadPool * > thread_owners
Definition: shared.cc:1664
vector< JobQueue * > thread_queues
Definition: shared.cc:1666
ConditionVariable response
Definition: shared.cc:1669
priority_queue< Job *, vector< Job * >, JobCompare > global_queue
Definition: shared.cc:1665
ConditionVariable cond
Definition: shared.cc:1668
vector< ThreadState * > threads
Definition: shared.cc:1663
queue< Job * > JobQueue
Definition: shared.cc:1620

◆ ~Scheduler()

virtual LibThread::Scheduler::~Scheduler ( )
inlinevirtual

Definition at line 1696 of file shared.cc.

1696 {
1697 for (unsigned i = 0; i < thread_queues.size(); i++) {
1698 JobQueue *q = thread_queues[i];
1699 while (!q->empty()) {
1700 Job *job = q->front();
1701 q->pop();
1702 releaseShared(job);
1703 }
1704 }
1705 thread_queues.clear();
1706 threads.clear();
1707 }
int i
Definition: cfEzgcd.cc:132
void releaseShared(SharedObject *obj)
Definition: shared.cc:197

Member Function Documentation

◆ addThread()

void LibThread::Scheduler::addThread ( ThreadPool owner,
ThreadState thread 
)
inline

Definition at line 1735 of file shared.cc.

1735 {
1736 lock.lock();
1737 thread_owners.push_back(owner);
1738 threads.push_back(thread);
1739 thread_queues.push_back(new JobQueue());
1740 lock.unlock();
1741 }
void lock()
Definition: thread.h:46
void unlock()
Definition: thread.h:57

◆ attachJob()

void LibThread::Scheduler::attachJob ( ThreadPool pool,
Job job 
)
inline

Definition at line 1742 of file shared.cc.

1742 {
1743 lock.lock();
1744 job->pool = pool;
1745 job->id = jobid++;
1746 acquireShared(job);
1747 if (job->ready()) {
1748 global_queue.push(job);
1749 cond.signal();
1750 }
1751 else if (job->pending_index < 0) {
1752 job->pool = pool;
1753 job->pending_index = pending.size();
1754 pending.push_back(job);
1755 }
1756 lock.unlock();
1757 }
void signal()
Definition: thread.h:97
ThreadPool * pool
Definition: shared.cc:1551
long pending_index
Definition: shared.cc:1554
virtual bool ready()
Definition: shared.cc:1605
vector< Job * > pending
Definition: shared.cc:1667
void acquireShared(SharedObject *obj)
Definition: shared.cc:193

◆ broadcastJob()

void LibThread::Scheduler::broadcastJob ( ThreadPool pool,
Job job 
)
inline

Definition at line 1776 of file shared.cc.

1776 {
1777 lock.lock();
1778 for (unsigned i = 0; i <thread_queues.size(); i++) {
1779 if (thread_owners[i] == pool) {
1780 acquireShared(job);
1781 thread_queues[i]->push(job);
1782 }
1783 }
1784 lock.unlock();
1785 }

◆ cancelDeps()

void LibThread::Scheduler::cancelDeps ( Job job)
inline

Definition at line 1786 of file shared.cc.

1786 {
1787 vector<Job *> &notify = job->notify;
1788 for (unsigned i = 0; i <notify.size(); i++) {
1789 Job *next = notify[i];
1790 if (!next->cancelled) {
1791 cancelJob(next);
1792 }
1793 }
1794 }
vector< Job * > notify
Definition: shared.cc:1556
void cancelJob(Job *job)
Definition: shared.cc:1795
ListNode * next
Definition: janet.h:31

◆ cancelJob()

void LibThread::Scheduler::cancelJob ( Job job)
inline

Definition at line 1795 of file shared.cc.

1795 {
1796 lock.lock();
1797 if (!job->cancelled) {
1798 job->cancelled = true;
1799 if (!job->running && !job->done) {
1800 job->done = true;
1801 cancelDeps(job);
1802 }
1803 }
1804 lock.unlock();
1805 }
bool cancelled
Definition: shared.cc:1565
void cancelDeps(Job *job)
Definition: shared.cc:1786

◆ clearThreadState()

void LibThread::Scheduler::clearThreadState ( )
inline

Definition at line 1826 of file shared.cc.

1826 {
1827 threads.clear();
1828 }

◆ detachJob()

void LibThread::Scheduler::detachJob ( Job job)
inline

Definition at line 1758 of file shared.cc.

1758 {
1759 lock.lock();
1760 long i = job->pending_index;
1761 job->pending_index = -1;
1762 if (i >= 0) {
1763 job = pending.back();
1764 pending.resize(pending.size()-1);
1765 pending[i] = job;
1766 job->pending_index = i;
1767 }
1768 lock.unlock();
1769 }

◆ get_maxconcurrency()

int LibThread::Scheduler::get_maxconcurrency ( )
inline

Definition at line 1685 of file shared.cc.

1685 {
1686 return maxconcurrency;
1687 }

◆ getThread()

ThreadState * LibThread::Scheduler::getThread ( int  i)
inline

Definition at line 1708 of file shared.cc.

1708{ return threads[i]; }

◆ main()

static void * LibThread::Scheduler::main ( ThreadState ts,
void *  arg 
)
inlinestatic

Definition at line 1856 of file shared.cc.

1856 {
1857 SchedInfo *info = (SchedInfo *) arg;
1858 Scheduler *scheduler = info->scheduler;
1859 ThreadPool *oldThreadPool = currentThreadPoolRef;
1860 // TODO: set current thread pool
1861 // currentThreadPoolRef = pool;
1862 Lock &lock = scheduler->lock;
1863 ConditionVariable &cond = scheduler->cond;
1864 ConditionVariable &response = scheduler->response;
1865 JobQueue *my_queue = scheduler->thread_queues[info->num];
1866 if (!scheduler->single_threaded)
1867 thread_init();
1868 lock.lock();
1869 for (;;) {
1870 if (info->job && info->job->done)
1871 break;
1872 if (scheduler->shutting_down) {
1873 scheduler->shutdown_counter++;
1874 scheduler->response.signal();
1875 break;
1876 }
1877 if (!my_queue->empty()) {
1878 Job *job = my_queue->front();
1879 my_queue->pop();
1880 if (!scheduler->global_queue.empty())
1881 cond.signal();
1882 currentJobRef = job;
1883 job->run();
1885 notifyDeps(scheduler, job);
1886 releaseShared(job);
1887 scheduler->response.signal();
1888 continue;
1889 } else if (!scheduler->global_queue.empty()) {
1890 Job *job = scheduler->global_queue.top();
1891 scheduler->global_queue.pop();
1892 if (!scheduler->global_queue.empty())
1893 cond.signal();
1894 currentJobRef = job;
1895 job->run();
1897 notifyDeps(scheduler, job);
1898 releaseShared(job);
1899 scheduler->response.signal();
1900 continue;
1901 } else {
1902 if (scheduler->single_threaded) {
1903 break;
1904 }
1905 cond.wait();
1906 }
1907 }
1908 // TODO: correct current thread pool
1909 // releaseShared(currentThreadPoolRef);
1910 currentThreadPoolRef = oldThreadPool;
1911 scheduler->lock.unlock();
1912 delete info;
1913 return NULL;
1914 }
void wait()
Definition: thread.h:88
void run()
Definition: shared.cc:1982
static void notifyDeps(Scheduler *scheduler, Job *job)
Definition: shared.cc:1829
Definition: thread.h:17
const ExtensionInfo & info
< [in] sqrfree poly
STATIC_VAR Job * currentJobRef
Definition: shared.cc:1631
STATIC_VAR ThreadPool * currentThreadPoolRef
Definition: shared.cc:1630
void thread_init()
Definition: shared.cc:1373
#define NULL
Definition: omList.c:12

◆ notifyDeps()

static void LibThread::Scheduler::notifyDeps ( Scheduler scheduler,
Job job 
)
inlinestatic

Definition at line 1829 of file shared.cc.

1829 {
1830 vector<Job *> &notify = job->notify;
1831 job->incref(notify.size());
1832 for (unsigned i = 0; i <notify.size(); i++) {
1833 Job *next = notify[i];
1834 if (!next->queued && next->ready() && !next->cancelled) {
1835 next->queued = true;
1836 scheduler->queueJob(next);
1837 }
1838 }
1839 vector<Trigger *> &triggers = job->triggers;
1840 leftv arg = NULL;
1841 if (triggers.size() > 0 && job->result.size() > 0)
1842 arg = LinTree::from_string(job->result);
1843 for (unsigned i = 0; i < triggers.size(); i++) {
1844 Trigger *trigger = triggers[i];
1845 if (trigger->accept(arg)) {
1846 trigger->activate(arg);
1847 if (trigger->ready())
1848 scheduler->queueJob(trigger);
1849 }
1850 }
1851 if (arg) {
1852 arg->CleanUp();
1853 omFreeBin(arg, sleftv_bin);
1854 }
1855 }
string result
Definition: shared.cc:1559
vector< Trigger * > triggers
Definition: shared.cc:1557
void queueJob(Job *job)
Definition: shared.cc:1770
void incref(int by=1)
Definition: shared.cc:170
virtual void activate(leftv arg)=0
virtual bool accept(leftv arg)=0
Class used for (list of) interpreter objects.
Definition: subexpr.h:83
void CleanUp(ring r=currRing)
Definition: subexpr.cc:348
EXTERN_VAR omBin sleftv_bin
Definition: ipid.h:145
leftv from_string(std::string &str)
Definition: lintree.cc:854
#define omFreeBin(addr, bin)
Definition: omAllocDecl.h:259

◆ queueJob()

void LibThread::Scheduler::queueJob ( Job job)
inline

Definition at line 1770 of file shared.cc.

1770 {
1771 lock.lock();
1772 global_queue.push(job);
1773 cond.signal();
1774 lock.unlock();
1775 }

◆ set_maxconcurrency()

void LibThread::Scheduler::set_maxconcurrency ( int  n)
inline

Definition at line 1682 of file shared.cc.

1682 {
1683 maxconcurrency = n;
1684 }

◆ shutdown()

void LibThread::Scheduler::shutdown ( bool  wait)
inline

Definition at line 1709 of file shared.cc.

1709 {
1710 if (single_threaded) {
1711 SchedInfo *info = new SchedInfo();
1712 info->num = 0;
1713 info->scheduler = this;
1714 acquireShared(this);
1715 info->job = NULL;
1717 return;
1718 }
1719 lock.lock();
1720 if (wait) {
1721 while (!global_queue.empty()) {
1722 response.wait();
1723 }
1724 }
1725 shutting_down = true;
1726 while (shutdown_counter < nthreads) {
1727 cond.broadcast();
1728 response.wait();
1729 }
1730 lock.unlock();
1731 for (unsigned i = 0; i <threads.size(); i++) {
1733 }
1734 }
void broadcast()
Definition: thread.h:103
static void * main(ThreadState *ts, void *arg)
Definition: shared.cc:1856
void * joinThread(ThreadState *ts)
Definition: shared.cc:1474
wait
Definition: si_signals.h:51

◆ threadpool_size()

int LibThread::Scheduler::threadpool_size ( ThreadPool pool)
inline

Definition at line 1688 of file shared.cc.

1688 {
1689 int n;
1690 for (unsigned i = 0; i <thread_owners.size(); i++) {
1691 if (thread_owners[i] == pool)
1692 n++;
1693 }
1694 return n;
1695 }

◆ waitJob()

void LibThread::Scheduler::waitJob ( Job job)
inline

Definition at line 1806 of file shared.cc.

1806 {
1807 if (single_threaded) {
1808 SchedInfo *info = new SchedInfo();
1809 info->num = 0;
1810 info->scheduler = this;
1811 acquireShared(this);
1812 info->job = job;
1814 } else {
1815 lock.lock();
1816 for (;;) {
1817 if (job->done || job->cancelled) {
1818 break;
1819 }
1820 response.wait();
1821 }
1822 response.signal(); // forward signal
1823 lock.unlock();
1824 }
1825 }

Friends And Related Function Documentation

◆ Job

friend class Job
friend

Definition at line 1670 of file shared.cc.

Field Documentation

◆ cond

ConditionVariable LibThread::Scheduler::cond
private

Definition at line 1668 of file shared.cc.

◆ global_queue

priority_queue<Job *, vector<Job *>, JobCompare> LibThread::Scheduler::global_queue
private

Definition at line 1665 of file shared.cc.

◆ jobid

size_t LibThread::Scheduler::jobid
private

Definition at line 1657 of file shared.cc.

◆ lock

Lock LibThread::Scheduler::lock

Definition at line 1672 of file shared.cc.

◆ maxconcurrency

int LibThread::Scheduler::maxconcurrency
private

Definition at line 1659 of file shared.cc.

◆ nthreads

int LibThread::Scheduler::nthreads
private

Definition at line 1658 of file shared.cc.

◆ pending

vector<Job *> LibThread::Scheduler::pending
private

Definition at line 1667 of file shared.cc.

◆ response

ConditionVariable LibThread::Scheduler::response
private

Definition at line 1669 of file shared.cc.

◆ running

int LibThread::Scheduler::running
private

Definition at line 1660 of file shared.cc.

◆ shutdown_counter

int LibThread::Scheduler::shutdown_counter
private

Definition at line 1662 of file shared.cc.

◆ shutting_down

bool LibThread::Scheduler::shutting_down
private

Definition at line 1661 of file shared.cc.

◆ single_threaded

bool LibThread::Scheduler::single_threaded
private

Definition at line 1656 of file shared.cc.

◆ thread_owners

vector<ThreadPool *> LibThread::Scheduler::thread_owners
private

Definition at line 1664 of file shared.cc.

◆ thread_queues

vector<JobQueue *> LibThread::Scheduler::thread_queues
private

Definition at line 1666 of file shared.cc.

◆ threads

vector<ThreadState *> LibThread::Scheduler::threads
private

Definition at line 1663 of file shared.cc.


The documentation for this class was generated from the following file: