From 213576f4ab0d087101c24c82366aa0870a2c4742 Mon Sep 17 00:00:00 2001
From: Hyeontaek Lim
Date: Thu, 12 Mar 2015 02:58:57 -0400
Subject: [PATCH 01/14] preliminary support for python scripting

---
 Makefile                         |  14 ++-
 src/app/python/main.cc           |  35 ++++++
 src/app/python/mnist_io.py       |  37 ++++++
 src/app/python/mnist_mlp.py      | 159 ++++++++++++++++++++++++
 src/app/python/python_bindings.h |  88 +++++++++++++
 src/app/python/python_env.h      |  73 +++++++++++
 src/app/python/python_server.h   |  35 ++++++
 src/app/python/python_updater.h  |  64 ++++++++++
 src/app/python/shared_model.h    | 127 +++++++++++++++++++
 src/app/python/test.py           |  41 ++++++
 src/app/python/updater.h         |  16 ++++
 11 files changed, 688 insertions(+), 1 deletion(-)
 create mode 100644 src/app/python/main.cc
 create mode 100644 src/app/python/mnist_io.py
 create mode 100644 src/app/python/mnist_mlp.py
 create mode 100644 src/app/python/python_bindings.h
 create mode 100644 src/app/python/python_env.h
 create mode 100644 src/app/python/python_server.h
 create mode 100644 src/app/python/python_updater.h
 create mode 100644 src/app/python/shared_model.h
 create mode 100644 src/app/python/test.py
 create mode 100644 src/app/python/updater.h

diff --git a/Makefile b/Makefile
index 8db84a0..2ed49f4 100644
--- a/Makefile
+++ b/Makefile
@@ -4,6 +4,7 @@ CC = g++
 OPT = -O3 -ggdb
 
 THIRD_PATH=$(shell pwd)/third_party
+THIRD_INC=-I$(shell pwd)/third_party/include
 STATIC_THIRD_LIB=0
 ifeq ($(STATIC_THIRD_LIB), 1)
 THIRD_LIB=$(addprefix $(THIRD_PATH)/lib/, libgflags.a libzmq.a libprotobuf.a libglog.a libz.a libsnappy.a)
@@ -12,8 +13,16 @@ THIRD_LIB=-L$(THIRD_PATH)/lib -lgflags -lzmq -lprotobuf -lglog -lz -lsnappy
 endif
 # THIRD_LIB+=-ltcmalloc_and_profiler
 
+# TODO: detect the Python version
+THIRD_INC+=-I/usr/include/python2.7
+THIRD_LIB+=-lpython2.7
+# borrow boost-python and boost-numpy from Minerva
+THIRD_INC+=-I../minerva/deps/include
+THIRD_LIB+=-L../minerva/deps/lib
+THIRD_LIB+=-lboost_python -lboost_numpy
+
 WARN = -Wall -Wno-unused-function -finline-functions -Wno-sign-compare #-Wconversion
-INCPATH = -I./src -I$(THIRD_PATH)/include
+INCPATH = -I./src $(THIRD_INC)
 CFLAGS = -std=c++0x $(WARN) $(OPT) $(INCPATH)
 LDFLAGS += $(THIRD_LIB) -lpthread -lrt
 
@@ -30,6 +39,9 @@ app: build/ps
 build/hello: build/app/hello_world/main.o $(PS_LIB) $(PS_MAIN)
 	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
 
+build/ps_python: build/app/python/main.o $(PS_LIB) $(PS_MAIN)
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@
+
 sys_srcs = $(wildcard src/util/*.cc) $(wildcard src/data/*.cc) \
   $(wildcard src/system/*.cc) $(wildcard src/filter/*.cc)
 sys_protos = $(wildcard src/*/proto/*.proto)
diff --git a/src/app/python/main.cc b/src/app/python/main.cc
new file mode 100644
index 0000000..b6faee3
--- /dev/null
+++ b/src/app/python/main.cc
@@ -0,0 +1,35 @@
+#include "ps.h"
+#include "python_env.h"
+#include "python_server.h"
+
+namespace PS {
+
+DEFINE_string(script, "", "the Python script path");
+
+App* CreateServerNode(const std::string& conf) {
+  static int argc = 2;
+  static std::string s_conf = conf;
+  static char* argv[2] = { const_cast<char*>(""), const_cast<char*>(s_conf.c_str()) };
+
+  PS::PythonEnv* py_env = new PS::PythonEnv();
+  py_env->load_file(PS::FLAGS_script.c_str(), argc, argv);
+
+  return new PythonServer(py_env);
+}
+
+} // namespace PS
+
+int WorkerNodeMain(int argc, char *argv[]) {
+  PS::PythonEnv py_env;
+  py_env.load_file(PS::FLAGS_script.c_str(), argc, argv);
+
+  try {
+    py_env.globals().get("worker_node_main")();
+  } catch (boost::python::error_already_set) {
+    PyErr_Print();
+    throw;
+  }
+ return 0; +} + diff --git a/src/app/python/mnist_io.py b/src/app/python/mnist_io.py new file mode 100644 index 0000000..87441a1 --- /dev/null +++ b/src/app/python/mnist_io.py @@ -0,0 +1,37 @@ +import numpy as np +import scipy.io as si + +def _extract(prefix, md, max_dig): + ret = [] + for dig in range(max_dig): + samples = md[prefix + str(dig)] + labels = np.empty([samples.shape[0], 1], dtype=np.float32) + labels.fill(dig) + ret.append(np.hstack((samples.astype(np.float32) / 256, labels))) + return ret + +def _split_sample_and_label(merged_mb): + [s, l] = np.hsplit(merged_mb, [merged_mb.shape[1] - 1]) + # change label to sparse representation + n = merged_mb.shape[0] + ll = np.zeros([n, 10], dtype=np.float32) + ll[np.arange(n), l.astype(int).flat] = 1 + return (s, ll) + +def load_mb_from_mat(mat_file, mb_size): + # load from mat + md = si.loadmat(mat_file) + # merge all data + train_all = np.concatenate(_extract('train', md, 10)) + test_all = np.concatenate(_extract('test', md, 10)) + # shuffle + np.random.shuffle(train_all) + # make minibatch + train_mb = np.vsplit(train_all, range(mb_size, train_all.shape[0], mb_size)) + train_data = map(_split_sample_and_label, train_mb) + test_data = _split_sample_and_label(test_all) + print 'Training data: %d mini-batches' % len(train_mb) + print 'Test data: %d samples' % test_all.shape[0] + print train_data[0][1].shape + return (train_data, test_data) + diff --git a/src/app/python/mnist_mlp.py b/src/app/python/mnist_mlp.py new file mode 100644 index 0000000..eb36179 --- /dev/null +++ b/src/app/python/mnist_mlp.py @@ -0,0 +1,159 @@ +import sys,os +import math +import owl +import owl.elewise as ele +import owl.conv as co +import numpy as np +import mnist_io + +# PS +import ps + +class MnistTrainer: + def __init__(self, data_file='mnist_all.mat', num_epochs=100, mb_size=256, eps_w=0.01, eps_b=0.01): + self.cpu = owl.create_cpu_device() + self.gpu = owl.create_gpu_device(0) + self.data_file = data_file + self.num_epochs=num_epochs + self.mb_size=mb_size + self.eps_w=eps_w + self.eps_b=eps_b + # init weight + l1 = 784; l2 = 256; l3 = 10 + # PS: do not initialize weights on workers + # self.l1 = l1; self.l2 = l2; self.l3 = l3 + # self.w1 = owl.randn([l2, l1], 0.0, math.sqrt(4.0 / (l1 + l2))) + # self.w2 = owl.randn([l3, l2], 0.0, math.sqrt(4.0 / (l2 + l3))) + # self.b1 = owl.zeros([l2, 1]) + # self.b2 = owl.zeros([l3, 1]) + # PS: instead, pull weights from servers + t_w1 = owl.zeros([l2, l1]).to_numpy() + t_w2 = owl.zeros([l3, l2]).to_numpy() + t_b1 = owl.zeros([l2, 1]).to_numpy() + t_b2 = owl.zeros([l3, 1]).to_numpy() + ps.PullWeight(t_w1, 'w1') + ps.PullWeight(t_w2, 'w2') + ps.PullWeight(t_b1, 'b1') + ps.PullWeight(t_b2, 'b2') + self.w1 = owl.from_numpy(t_w1) + self.w2 = owl.from_numpy(t_w2) + self.b1 = owl.from_numpy(t_b1) + self.b2 = owl.from_numpy(t_b2) + + def run(self): + (train_data, test_data) = mnist_io.load_mb_from_mat(self.data_file, self.mb_size) + np.set_printoptions(linewidth=200) + num_test_samples = test_data[0].shape[0] + (test_samples, test_labels) = map(lambda npdata : owl.from_numpy(npdata), test_data) + count = 1 + owl.set_device(self.gpu) + for epoch in range(self.num_epochs): + print '---Start epoch #%d' % epoch + # train + for (mb_samples, mb_labels) in train_data: + num_samples = mb_samples.shape[0] + + a1 = owl.from_numpy(mb_samples) + target = owl.from_numpy(mb_labels) + + # ff + a2 = ele.relu(self.w1 * a1 + self.b1) + a3 = self.w2 * a2 + self.b2 + # softmax & error + out = co.softmax(a3) + s3 = out - target + # bp + s2 = 
self.w2.trans() * s3
+                s2 = ele.relu_back(s2, a2)
+                # grad
+                gw1 = s2 * a1.trans() / num_samples
+                gb1 = s2.sum(1) / num_samples
+                gw2 = s3 * a2.trans() / num_samples
+                gb2 = s3.sum(1) / num_samples
+                # update
+                # PS: do not update weights locally
+                #self.w1 -= self.eps_w * gw1
+                #self.w2 -= self.eps_w * gw2
+                #self.b1 -= self.eps_b * gb1
+                #self.b2 -= self.eps_b * gb2
+                # PS: instead, push gradients and pull weights from servers
+                t_w1 = self.w1.to_numpy()
+                t_w2 = self.w2.to_numpy()
+                t_b1 = self.b1.to_numpy()
+                t_b2 = self.b2.to_numpy()
+                ps.PushGradAndPullWeight(gw1.to_numpy(), t_w1, 'w1')
+                ps.PushGradAndPullWeight(gw2.to_numpy(), t_w2, 'w2')
+                ps.PushGradAndPullWeight(gb1.to_numpy(), t_b1, 'b1')
+                ps.PushGradAndPullWeight(gb2.to_numpy(), t_b2, 'b2')
+                self.w1 = owl.from_numpy(t_w1)
+                self.w2 = owl.from_numpy(t_w2)
+                self.b1 = owl.from_numpy(t_b1)
+                self.b2 = owl.from_numpy(t_b2)
+
+                if (count % 40 == 0):
+                    correct = out.argmax(0) - target.argmax(0)
+                    val = correct.to_numpy()
+                    print 'Training error:', float(np.count_nonzero(val)) / num_samples
+                count = count + 1
+
+            # test
+            a1 = test_samples
+            a2 = ele.relu(self.w1 * a1 + self.b1)
+            a3 = self.w2 * a2 + self.b2
+            correct = a3.argmax(0) - test_labels.argmax(0)
+            val = correct.to_numpy()
+            #print val
+            print 'Testing error:', float(np.count_nonzero(val)) / num_test_samples
+            print '---Finish epoch #%d' % epoch
+
+# PS - server
+g_server_cpu = None
+
+def init_layer(name, weight):
+    # weight initialization
+
+    # we must use the CPU on the server to run Minerva
+    global g_server_cpu
+    if g_server_cpu is None:
+        g_server_cpu = owl.create_cpu_device()
+
+    print(ps.myNodeID(), ", this is server ", ps.myRank())
+
+    l1 = 784; l2 = 256; l3 = 10
+
+    w1 = owl.randn([l2, l1], 0.0, math.sqrt(4.0 / (l1 + l2))).to_numpy()
+    w2 = owl.randn([l3, l2], 0.0, math.sqrt(4.0 / (l2 + l3))).to_numpy()
+    b1 = owl.zeros([l2, 1]).to_numpy()
+    b2 = owl.zeros([l3, 1]).to_numpy()
+
+    if name == 'w1':
+        np.copyto(weight, w1.flatten())
+    elif name == 'w2':
+        np.copyto(weight, w2.flatten())
+    elif name == 'b1':
+        np.copyto(weight, b1.flatten())
+    elif name == 'b2':
+        np.copyto(weight, b2.flatten())
+    else:
+        assert False
+    print('init_layer done')
+
+def update_layer(name, weight, gradient):
+    eps_w = 0.01
+    eps_b = 0.01
+
+    if name[0] == 'w':
+        weight -= eps_w * gradient
+    elif name[0] == 'b':
+        weight -= eps_b * gradient
+    else:
+        assert False
+
+# PS - worker
+def worker_node_main():
+    trainer = MnistTrainer(num_epochs = 10)
+    trainer.run()
+
+if __name__ == '__main__':
+    owl.initialize(sys.argv)
+
diff --git a/src/app/python/python_bindings.h b/src/app/python/python_bindings.h
new file mode 100644
index 0000000..ef070fd
--- /dev/null
+++ b/src/app/python/python_bindings.h
@@ -0,0 +1,88 @@
+#pragma once
+#include <mutex>
+#include <string>
+#include <boost/python.hpp>
+#include <boost/numpy.hpp>
+#include "ps.h"
+#include "shared_model.h"
+
+namespace PS {
+
+static SharedModel<float> *shared_model = nullptr;
+
+static void PushGradAndPullWeight(boost::numpy::ndarray grad, boost::numpy::ndarray weight, std::string name) {
+  static std::mutex mu;
+
+  if (!shared_model) {
+    std::lock_guard<std::mutex> lg(mu);
+    if (!shared_model)
+      shared_model = new SharedModel<float>();
+  }
+
+  std::size_t size = 1;
+  for (int i = 0; i < weight.get_nd(); i++)
+    size *= weight.shape(i);
+  if (size == 0)
+    return;
+
+  // push
+  float* grad_data = reinterpret_cast<float*>(grad.get_data());
+  SArray<float> val; val.copyFrom(grad_data, size);
+  MessagePtr push_msg(new Message(kServerGroup));
+  push_msg->addValue(val);
+  // LL << val;
+  push_msg->task.set_key_channel_str(name);
+  Range<Key>(0, size).to(push_msg->task.mutable_key_range());
+  int push_time = CHECK_NOTNULL(shared_model)->push(push_msg);
+
+  // pull
+  float* weight_data = reinterpret_cast<float*>(weight.get_data());
+  shared_model->setLayer(name, weight_data, size);
+  MessagePtr pull_msg(new Message(kServerGroup, -1, push_time));
+  pull_msg->task.set_key_channel_str(name);
+  Range<Key>(0, size).to(pull_msg->task.mutable_key_range());
+  pull_msg->wait = true;
+  shared_model->pull(pull_msg);
+}
+
+static void PullWeight(boost::numpy::ndarray weight, std::string name) {
+  static std::mutex mu;
+
+  if (!shared_model) {
+    std::lock_guard<std::mutex> lg(mu);
+    if (!shared_model)
+      shared_model = new SharedModel<float>();
+  }
+
+  std::size_t size = 1;
+  for (int i = 0; i < weight.get_nd(); i++)
+    size *= weight.shape(i);
+  if (size == 0)
+    return;
+
+  // pull
+  int push_time = -1;
+  float* weight_data = reinterpret_cast<float*>(weight.get_data());
+  shared_model->setLayer(name, weight_data, size);
+  MessagePtr pull_msg(new Message(kServerGroup, -1, push_time));
+  pull_msg->task.set_key_channel_str(name);
+  Range<Key>(0, size).to(pull_msg->task.mutable_key_range());
+  pull_msg->wait = true;
+  shared_model->pull(pull_msg);
+}
+
+} // namespace PS
+
+
+BOOST_PYTHON_MODULE(ps) {
+  using namespace boost::python;
+  using namespace PS;
+
+  def("myNodeID", MyNodeID);
+  def("myRank", MyRank);
+  def("rankSize", RankSize);
+
+  def("PullWeight", PullWeight);
+  def("PushGradAndPullWeight", PushGradAndPullWeight);
+}
+
diff --git a/src/app/python/python_env.h b/src/app/python/python_env.h
new file mode 100644
index 0000000..610e03d
--- /dev/null
+++ b/src/app/python/python_env.h
@@ -0,0 +1,73 @@
+#pragma once
+#include <Python.h>
+#include <boost/python.hpp>
+#include <boost/numpy.hpp>
+#include "python_bindings.h"
+
+namespace PS {
+
+class PythonEnv {
+public:
+  PythonEnv() : active_(false) {}
+
+  boost::python::dict& globals() { return globals_; }
+
+  void load_file(const char* path, int argc, char** argv) {
+    reset();
+
+    Py_Initialize();
+
+    char* new_argv[argc];
+    new_argv[0] = const_cast<char*>(path);
+    for (int i = 1; i < argc; i++)
+      new_argv[i] = argv[i];
+
+    PySys_SetArgv(argc, new_argv);
+
+    boost::python::object main_module = boost::python::import("__main__");
+    globals_ = boost::python::dict(main_module.attr("__dict__"));
+
+    boost::python::exec("import os, sys; sys.path = [''] + sys.path", globals_);
+    //boost::python::exec("print(sys.path)");
+
+    boost::numpy::initialize();
+    initps();
+
+    active_ = true;
+
+    try {
+      boost::python::exec_file(path, globals_);
+    } catch (boost::python::error_already_set) {
+      PyErr_Print();
+      throw;
+    }
+  }
+
+  void eval(const char* script) {
+    try {
+      boost::python::exec(script, globals_);
+    } catch (boost::python::error_already_set) {
+      PyErr_Print();
+      throw;
+    }
+  }
+
+  ~PythonEnv() {
+    reset();
+  }
+
+protected:
+  void reset() {
+    if (active_) {
+      globals_ = boost::python::dict();
+      Py_Finalize();
+    }
+  }
+
+private:
+  bool active_;
+  boost::python::dict globals_;
+};
+
+} // namespace PS
+
diff --git a/src/app/python/python_server.h b/src/app/python/python_server.h
new file mode 100644
index 0000000..932ccd6
--- /dev/null
+++ b/src/app/python/python_server.h
@@ -0,0 +1,35 @@
+#pragma once
+#include "python_env.h"
+#include "python_updater.h"
+#include "shared_model.h"
+
+namespace PS {
+
+class PythonServer : public App {
+public:
+  PythonServer(PythonEnv* py_env) : App(), py_env_(py_env) {
+    // This instance takes the ownership of py_env
+
+    updater_ = new PythonUpdater<float>(py_env_);
+    shared_model_ = new SharedModel<float>();
+    shared_model_->setUpdater(updater_);
+  }
+
+  virtual void init() {
+    //LOG(ERROR) << "this is server " << myRank();
+  }
+
+  virtual ~PythonServer() {
+    delete updater_;
+    delete shared_model_;
+    delete py_env_;
+  }
+
+private:
+  PythonEnv* py_env_;
+  Updater<float> *updater_;
+  SharedModel<float> *shared_model_;
+};
+
+} // namespace PS
+
diff --git a/src/app/python/python_updater.h b/src/app/python/python_updater.h
new file mode 100644
index 0000000..afe8ced
--- /dev/null
+++ b/src/app/python/python_updater.h
@@ -0,0 +1,64 @@
+#pragma once
+#include <boost/python.hpp>
+#include <boost/numpy.hpp>
+#include "updater.h"
+#include "python_env.h"
+
+namespace PS {
+
+template <typename V>
+class PythonUpdater : public Updater<V> {
+public:
+  PythonUpdater(PythonEnv* py_env) : py_env_(py_env) { }
+  virtual ~PythonUpdater() { }
+
+  virtual void InitLayer(const std::string &name, V* weight, size_t size) {
+    //LOG(ERROR) << "InitLayer size = " << size;
+    try {
+
+      Py_intptr_t shape[1] = { static_cast<Py_intptr_t>(size) };
+
+      boost::numpy::ndarray py_weight = boost::numpy::zeros(1, shape, boost::numpy::dtype::get_builtin<V>());
+
+      py_env_->globals().get("init_layer")(name, py_weight);
+
+      memcpy(weight, py_weight.get_data(), sizeof(V) * size);
+
+    } catch (boost::python::error_already_set) {
+      //LOG(ERROR) << "InitLayer failed";
+      PyErr_Print();
+      throw;
+    }
+    //LOG(ERROR) << "InitLayer done";
+  }
+
+  virtual void Update(const std::string &name, V* weight, V* gradient, size_t size) {
+    //LOG(ERROR) << "Update size = " << size;
+    try {
+
+      Py_intptr_t shape[1] = { static_cast<Py_intptr_t>(size) };
+
+      boost::numpy::ndarray py_weight = boost::numpy::zeros(1, shape, boost::numpy::dtype::get_builtin<V>());
+      memcpy(py_weight.get_data(), weight, sizeof(V) * size);
+
+      boost::numpy::ndarray py_gradient = boost::numpy::zeros(1, shape, boost::numpy::dtype::get_builtin<V>());
+      memcpy(py_gradient.get_data(), gradient, sizeof(V) * size);
+
+      py_env_->globals().get("update_layer")(name, py_weight, py_gradient);
+
+      memcpy(weight, py_weight.get_data(), sizeof(V) * size);
+
+    } catch (boost::python::error_already_set) {
+      //LOG(ERROR) << "Update failed";
+      PyErr_Print();
+      throw;
+    }
+    //LOG(ERROR) << "Update done";
+  }
+
+private:
+  PythonEnv* py_env_;
+};
+
+} // namespace PS
+
diff --git a/src/app/python/shared_model.h b/src/app/python/shared_model.h
new file mode 100644
index 0000000..2221e83
--- /dev/null
+++ b/src/app/python/shared_model.h
@@ -0,0 +1,127 @@
+#pragma once
+#include "parameter/shared_parameter.h"
+#include "updater.h"
+
+namespace PS {
+
+DECLARE_string(app_name);
+
+template <typename V>
+class SharedModel : public SharedParameter<V> {
+  typedef Updater<V> UpdaterT;
+ public:
+  SharedModel(const string& my_name = FLAGS_app_name + "_model",
+              const string& parent_name = FLAGS_app_name) :
+      SharedParameter<V>(my_name, parent_name) { }
+  virtual ~SharedModel() { }
+
+  void setLayer(string name, V* data, size_t size) {
+    val_[name] = SArray<V>(data, size, false);
+  }
+  void setUpdater(UpdaterT * updater) {
+    updater_ = updater;
+  }
+
+  // funcs will be called by the system
+  MessagePtrList slice(const MessagePtr& msg, const KeyRangeList& krs);
+  void getValue(const MessagePtr& msg);
+  void setValue(const MessagePtr& msg);
+ protected:
+  std::unordered_map<string, SArray<V>> val_;
+  // an array is placed into multiple servers only if its length > min_slice_size
+  size_t min_slice_size_ = 1000;
+
+  UpdaterT * updater_ = nullptr;
+};
+
+
+template <typename V>
+void SharedModel<V>::setValue(const MessagePtr& msg) {
+  CHECK_EQ(msg->value.size(), 1);
+  SArray<V> recv_data(msg->value[0]);
+  Range<Key> kr(msg->task.key_range());
+  CHECK_EQ(kr.size(), recv_data.size());
+  string key = msg->task.key_channel_str();
+  auto& my_val = val_[key];
+
+  if (isWorker()) {
+    if (my_val.empty()) my_val.resize(kr.size(), 0);
+    CHECK_GE(my_val.size(), kr.end());
+    my_val.segment(kr).copyFrom(recv_data);
+  } else if (isServer()) {
+    // TODO this server can do flexible consistency control here
+
+    if (my_val.empty()) {
+      // initialize weight
+      my_val.resize(kr.size(), 0);
+      CHECK_NOTNULL(updater_)->InitLayer(key, my_val.data(), my_val.size());
+    }
+
+    // update weight
+    CHECK_GE(my_val.size(), kr.size());
+    CHECK_NOTNULL(updater_)->Update(
+        key, my_val.data(), recv_data.data(), recv_data.size());
+  }
+}
+
+// only called at servers, i.e., when a worker pulls data from this server
+template <typename V>
+void SharedModel<V>::getValue(const MessagePtr& msg) {
+  auto& my_val = val_[msg->task.key_channel_str()];
+  Range<Key> kr(msg->task.key_range());
+  if (my_val.empty()) {
+    // initialize weight
+    my_val.resize(kr.size(), 0);
+    CHECK_NOTNULL(updater_)->InitLayer(msg->task.key_channel_str(), my_val.data(), my_val.size());
+  }
+
+  // TODO store the kr in memory
+  CHECK_EQ(my_val.size(), kr.size());
+  SArray<V> send_data(kr.size());
+  send_data.copyFrom(my_val);
+  msg->addValue(send_data);
+}
+
+// divide a message into n parts, where part i goes to server i. it's a zero-copy
+// implementation
+template <typename V>
+MessagePtrList SharedModel<V>::slice(const MessagePtr& msg, const KeyRangeList& krs) {
+  // divide the key range
+  size_t n = krs.size();
+  MessagePtrList ret(n);
+  Range<Key> kr(msg->task.key_range());
+  for (size_t i = 0; i < n; ++i) {
+    ret[i] = MessagePtr(new Message());
+    ret[i]->miniCopyFrom(*msg);
+    ret[i]->valid = true;
+    auto mut_kr = ret[i]->task.mutable_key_range();
+    if (kr.size() < min_slice_size_) {
+      if (i == 0) {
+        // server 0 gets all the data
+        kr.to(mut_kr);
+      } else {
+        Range<Key>(0,0).to(mut_kr);
+        // do not send to servers 1 - n
+        ret[i]->valid = false;
+      }
+    } else {
+      kr.evenDivide(n, i).to(mut_kr);
+    }
+  }
+
+  // divide the data
+  for (size_t i = 0; i < msg->value.size(); ++i) {
+    SArray<V> data(msg->value[i]);
+    CHECK_EQ(data.size(), kr.size());
+    for (size_t j = 0; j < n; ++j) {
+      if (ret[j]->valid) {
+        Range<Key> kr(ret[j]->task.key_range());
+        ret[j]->addValue(data.segment(kr));
+      }
+    }
+  }
+  return ret;
+}
+
+
+} // namespace PS
diff --git a/src/app/python/test.py b/src/app/python/test.py
new file mode 100644
index 0000000..311d526
--- /dev/null
+++ b/src/app/python/test.py
@@ -0,0 +1,41 @@
+import ps
+
+#kServerGroup = 'all_servers'
+
+def init_layer(name, weight):
+    print(ps.myNodeID(), ", this is server ", ps.myRank())
+    pass
+
+def update_layer(name, weight, gradient):
+    pass
+
+def worker_node_main():
+    print(ps.myNodeID(), ", this is worker ", ps.myRank())
+
+    print(1)
+    import owl
+    print(2)
+    import sys
+    print(3)
+    owl.initialize(sys.argv)
+    print(4)
+    cpu = owl.create_cpu_device()
+    print(5)
+    gpu = [owl.create_gpu_device(i) for i in range(owl.get_gpu_device_count())]
+    print(6)
+    print '''
+     __   __  _  __            _   _____  ____   _   _   ___
+    /  | /  | | | | \\ | |  | ___| | _ \\ | |  / / /  |
+    /  |/  | | | | \\| |  | |__  | |_| | | | / / / /| |
+    / /|  /| | | | | | |  | __|  | /  | |/ / / /_| |
+    / / | / | | | | | |\\ | | |___  | |\\ \\ | / / ___  |
+    /_/ |_/ |_| |_| |_| \\__| |_____| |_| \\_\\ |__/ /_/ |_|
+    '''
+    print '[INFO] You have %d GPU devices' % len(gpu)
+    print '[INFO] Set device to gpu[0]'
+    print(7)
+
+
+if __name__ == '__main__':
+    pass
+
diff --git a/src/app/python/updater.h b/src/app/python/updater.h
new file mode 100644
index 0000000..5a56fbf
--- /dev/null
+++ b/src/app/python/updater.h
@@ -0,0 +1,16 @@
+#pragma once
+
+namespace PS {
+
+template <typename V>
+class Updater { +public: + Updater() { } + virtual ~Updater() { } + + virtual void InitLayer(const std::string &name, V* weight, size_t size) { } + + virtual void Update(const std::string &name, V* weight, V* gradient, size_t size) { } +}; + +} // namespace PS From 983f0c3c9b68e2c8b36d8cd6b0d3ce0e4e6f532c Mon Sep 17 00:00:00 2001 From: Hyeontaek Lim Date: Thu, 12 Mar 2015 15:36:46 -0400 Subject: [PATCH 02/14] avoid copy from/to ndarray if possible; code cleanups --- Makefile | 2 +- src/app/python/main.cc | 3 + src/app/python/mnist_mlp.py | 103 +++++++++++++++++------------- src/app/python/python_bindings.cc | 98 ++++++++++++++++++++++++++++ src/app/python/python_bindings.h | 82 +----------------------- src/app/python/python_env.cc | 59 +++++++++++++++++ src/app/python/python_env.h | 50 +-------------- src/app/python/python_server.h | 8 ++- src/app/python/python_updater.h | 25 ++++---- 9 files changed, 242 insertions(+), 188 deletions(-) create mode 100644 src/app/python/python_bindings.cc create mode 100644 src/app/python/python_env.cc diff --git a/Makefile b/Makefile index 2ed49f4..8e26052 100644 --- a/Makefile +++ b/Makefile @@ -39,7 +39,7 @@ app: build/ps build/hello: build/app/hello_world/main.o $(PS_LIB) $(PS_MAIN) $(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@ -build/ps_python: build/app/python/main.o $(PS_LIB) $(PS_MAIN) +build/ps_python: build/app/python/main.o build/app/python/python_env.o build/app/python/python_bindings.o $(PS_LIB) $(PS_MAIN) $(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@ sys_srcs = $(wildcard src/util/*.cc) $(wildcard src/data/*.cc) \ diff --git a/src/app/python/main.cc b/src/app/python/main.cc index b6faee3..2e80967 100644 --- a/src/app/python/main.cc +++ b/src/app/python/main.cc @@ -24,6 +24,9 @@ int WorkerNodeMain(int argc, char *argv[]) { py_env.load_file(PS::FLAGS_script.c_str(), argc, argv); try { + if (py_env.globals().has_key("worker_node_init")) + py_env.globals().get("worker_node_init")(); + py_env.globals().get("worker_node_main")(); } catch (boost::python::error_already_set) { PyErr_Print(); diff --git a/src/app/python/mnist_mlp.py b/src/app/python/mnist_mlp.py index eb36179..72e90a6 100644 --- a/src/app/python/mnist_mlp.py +++ b/src/app/python/mnist_mlp.py @@ -11,6 +11,8 @@ class MnistTrainer: def __init__(self, data_file='mnist_all.mat', num_epochs=100, mb_size=256, eps_w=0.01, eps_b=0.01): + print('Worker; NodeID: %s, Rank: %d, RankSize: %d' % (ps.myNodeID(), ps.myRank(), ps.rankSize())) + self.cpu = owl.create_cpu_device() self.gpu = owl.create_gpu_device(0) self.data_file = data_file @@ -106,54 +108,67 @@ def run(self): print 'Testing error:', float(np.count_nonzero(val)) / num_test_samples print '---Finish epoch #%d' % epoch + + +class MnistServer: + def __init__(self): + print('Server; NodeID: %s, Rank: %d, RankSize: %d' % (ps.myNodeID(), ps.myRank(), ps.rankSize())) + self.cpu = owl.create_cpu_device() + + def init_layer(self, name, weight): + l1 = 784; l2 = 256; l3 = 10 + + w1 = owl.randn([l2, l1], 0.0, math.sqrt(4.0 / (l1 + l2))).to_numpy() + w2 = owl.randn([l3, l2], 0.0, math.sqrt(4.0 / (l2 + l3))).to_numpy() + b1 = owl.zeros([l2, 1]).to_numpy() + b2 = owl.zeros([l3, 1]).to_numpy() + + if name == 'w1': + np.copyto(weight, w1.flatten()) + elif name == 'w2': + np.copyto(weight, w2.flatten()) + elif name == 'b1': + np.copyto(weight, b1.flatten()) + elif name == 'b2': + np.copyto(weight, b2.flatten()) + else: + assert False + + def update_layer(self, name, weight, gradient): + eps_w = 0.01 + eps_b = 0.01 + + if name[0] == 'w': + weight -= eps_w * gradient + elif 
name[0] == 'b':
+            weight -= eps_b * gradient
+        else:
+            assert False
+
+
 # PS - server
-g_server_cpu = None
-
-def init_layer(name, weight):
-    # weight initialization
-
-    # we must use the CPU on the server to run Minerva
-    global g_server_cpu
-    if g_server_cpu is None:
-        g_server_cpu = owl.create_cpu_device()
-
-    print(ps.myNodeID(), ", this is server ", ps.myRank())
-
-    l1 = 784; l2 = 256; l3 = 10
-
-    w1 = owl.randn([l2, l1], 0.0, math.sqrt(4.0 / (l1 + l2))).to_numpy()
-    w2 = owl.randn([l3, l2], 0.0, math.sqrt(4.0 / (l2 + l3))).to_numpy()
-    b1 = owl.zeros([l2, 1]).to_numpy()
-    b2 = owl.zeros([l3, 1]).to_numpy()
-
-    if name == 'w1':
-        np.copyto(weight, w1.flatten())
-    elif name == 'w2':
-        np.copyto(weight, w2.flatten())
-    elif name == 'b1':
-        np.copyto(weight, b1.flatten())
-    elif name == 'b2':
-        np.copyto(weight, b2.flatten())
-    else:
-        assert False
-    print('init_layer done')
-
-def update_layer(name, weight, gradient):
-    eps_w = 0.01
-    eps_b = 0.01
-
-    if name[0] == 'w':
-        weight -= eps_w * gradient
-    elif name[0] == 'b':
-        weight -= eps_b * gradient
-    else:
-        assert False
+server = None
+def server_node_init():
+    global server
+    owl.initialize(sys.argv)
+    server = MnistServer()
+
+def server_init_layer(name, weight):
+    server.init_layer(name, weight)
+
+def server_update_layer(name, weight, gradient):
+    server.update_layer(name, weight, gradient)
 
 # PS - worker
+worker = None
+def worker_node_init():
+    global worker
+    owl.initialize(sys.argv)
+    worker = MnistTrainer(num_epochs = 10)
+
 def worker_node_main():
-    trainer = MnistTrainer(num_epochs = 10)
-    trainer.run()
+    worker.run()
 
 if __name__ == '__main__':
-    owl.initialize(sys.argv)
+    pass
diff --git a/src/app/python/python_bindings.cc b/src/app/python/python_bindings.cc
new file mode 100644
index 0000000..5851936
--- /dev/null
+++ b/src/app/python/python_bindings.cc
@@ -0,0 +1,98 @@
+#include <mutex>
+#include <boost/python.hpp>
+#include <boost/numpy.hpp>
+#include <string>
+#include "ps.h"
+#include "python_bindings.h"
+#include "shared_model.h"
+
+namespace PS {
+
+static SharedModel<float> *g_shared_model = nullptr;
+
+static void prepare_shared_model() {
+  static std::mutex mu;
+
+  if (!g_shared_model) {
+    std::lock_guard<std::mutex> lg(mu);
+    if (!g_shared_model)
+      g_shared_model = new SharedModel<float>();
+  }
+}
+
+static std::size_t ndarray_num_elements(const boost::numpy::ndarray& arr) {
+  std::size_t size = 1;
+  for (int i = 0; i < arr.get_nd(); i++)
+    size *= arr.shape(i);
+  return size;
+}
+
+static void PullWeight(boost::numpy::ndarray weight, std::string name) {
+  prepare_shared_model();
+
+  std::size_t size = ndarray_num_elements(weight);
+  if (size == 0)
+    return;
+
+  // pull
+  int push_time = -1;
+  float* weight_data = reinterpret_cast<float*>(weight.get_data());
+  g_shared_model->setLayer(name, weight_data, size);  // in-place write to ndarray
+  MessagePtr pull_msg(new Message(kServerGroup, -1, push_time));
+  pull_msg->task.set_key_channel_str(name);
+  Range<Key>(0, size).to(pull_msg->task.mutable_key_range());
+  pull_msg->wait = true;
+  g_shared_model->pull(pull_msg);
+}
+
+static void PushGradAndPullWeight(boost::numpy::ndarray grad, boost::numpy::ndarray weight, std::string name) {
+  prepare_shared_model();
+
+  std::size_t size = ndarray_num_elements(weight);
+  if (size == 0)
+    return;
+
+  assert(size == ndarray_num_elements(grad));
+
+  // push
+  float* grad_data = reinterpret_cast<float*>(grad.get_data());
+  SArray<float> val(grad_data, size, false);  // zero-copy read from ndarray
+  MessagePtr push_msg(new Message(kServerGroup));
+  push_msg->addValue(val);
+  // LL << val;
+  push_msg->task.set_key_channel_str(name);
+  Range<Key>(0, size).to(push_msg->task.mutable_key_range());
+  int push_time = CHECK_NOTNULL(g_shared_model)->push(push_msg);
+
+  // pull
+  float* weight_data = reinterpret_cast<float*>(weight.get_data());
+  g_shared_model->setLayer(name, weight_data, size);  // in-place write to ndarray
+  MessagePtr pull_msg(new Message(kServerGroup, -1, push_time));
+  pull_msg->task.set_key_channel_str(name);
+  Range<Key>(0, size).to(pull_msg->task.mutable_key_range());
+  pull_msg->wait = true;
+  g_shared_model->pull(pull_msg);
+}
+
+} // namespace PS
+
+BOOST_PYTHON_MODULE(ps) {
+  using namespace boost::python;
+  using namespace PS;
+
+  def("myNodeID", MyNodeID);
+  def("myRank", MyRank);
+  def("rankSize", RankSize);
+
+  def("PullWeight", PullWeight);
+  def("PushGradAndPullWeight", PushGradAndPullWeight);
+}
+
+namespace PS {
+
+void init_bindings() {
+  initps();
+}
+
+} // namespace PS
+
diff --git a/src/app/python/python_bindings.h b/src/app/python/python_bindings.h
index ef070fd..290c657 100644
--- a/src/app/python/python_bindings.h
+++ b/src/app/python/python_bindings.h
@@ -1,88 +1,8 @@
 #pragma once
-#include <mutex>
-#include <string>
-#include <boost/python.hpp>
-#include <boost/numpy.hpp>
-#include "ps.h"
-#include "shared_model.h"
 
 namespace PS {
 
-static SharedModel<float> *shared_model = nullptr;
-
-static void PushGradAndPullWeight(boost::numpy::ndarray grad, boost::numpy::ndarray weight, std::string name) {
-  static std::mutex mu;
-
-  if (!shared_model) {
-    std::lock_guard<std::mutex> lg(mu);
-    if (!shared_model)
-      shared_model = new SharedModel<float>();
-  }
-
-  std::size_t size = 1;
-  for (int i = 0; i < weight.get_nd(); i++)
-    size *= weight.shape(i);
-  if (size == 0)
-    return;
-
-  // push
-  float* grad_data = reinterpret_cast<float*>(grad.get_data());
-  SArray<float> val; val.copyFrom(grad_data, size);
-  MessagePtr push_msg(new Message(kServerGroup));
-  push_msg->addValue(val);
-  // LL << val;
-  push_msg->task.set_key_channel_str(name);
-  Range<Key>(0, size).to(push_msg->task.mutable_key_range());
-  int push_time = CHECK_NOTNULL(shared_model)->push(push_msg);
-
-  // pull
-  float* weight_data = reinterpret_cast<float*>(weight.get_data());
-  shared_model->setLayer(name, weight_data, size);
-  MessagePtr pull_msg(new Message(kServerGroup, -1, push_time));
-  pull_msg->task.set_key_channel_str(name);
-  Range<Key>(0, size).to(pull_msg->task.mutable_key_range());
-  pull_msg->wait = true;
-  shared_model->pull(pull_msg);
-}
-
-static void PullWeight(boost::numpy::ndarray weight, std::string name) {
-  static std::mutex mu;
-
-  if (!shared_model) {
-    std::lock_guard<std::mutex> lg(mu);
-    if (!shared_model)
-      shared_model = new SharedModel<float>();
-  }
-
-  std::size_t size = 1;
-  for (int i = 0; i < weight.get_nd(); i++)
-    size *= weight.shape(i);
-  if (size == 0)
-    return;
-
-  // pull
-  int push_time = -1;
-  float* weight_data = reinterpret_cast<float*>(weight.get_data());
-  shared_model->setLayer(name, weight_data, size);
-  MessagePtr pull_msg(new Message(kServerGroup, -1, push_time));
-  pull_msg->task.set_key_channel_str(name);
-  Range<Key>(0, size).to(pull_msg->task.mutable_key_range());
-  pull_msg->wait = true;
-  shared_model->pull(pull_msg);
-}
+void init_bindings();
 
 } // namespace PS
-
-
-BOOST_PYTHON_MODULE(ps) {
-  using namespace boost::python;
-  using namespace PS;
-
-  def("myNodeID", MyNodeID);
-  def("myRank", MyRank);
-  def("rankSize", RankSize);
-
-  def("PullWeight", PullWeight);
-  def("PushGradAndPullWeight", PushGradAndPullWeight);
-}
-
diff --git a/src/app/python/python_env.cc b/src/app/python/python_env.cc
new file mode 100644
index 0000000..fa3c646
--- /dev/null
+++ b/src/app/python/python_env.cc
@@ -0,0 +1,59 @@
+#include "python_env.h"
+#include <Python.h>
+#include <boost/numpy.hpp>
+#include "python_bindings.h"
+
+namespace PS {
+
+void PythonEnv::load_file(const char* path, int argc, char** argv) {
+  reset();
+
+  // initialize the Python interpreter
+  Py_Initialize();
+
+  // obtain globals
+  boost::python::object main_module = boost::python::import("__main__");
+  globals_ = boost::python::dict(main_module.attr("__dict__"));
+
+  // construct a new argument list, with the first one replaced by the script path to enable module imports in the same directory
+  char* new_argv[argc];
+  new_argv[0] = const_cast<char*>(path);
+  for (int i = 1; i < argc; i++)
+    new_argv[i] = argv[i];
+
+  PySys_SetArgv(argc, new_argv);
+
+  // add '' to sys.path as well
+  boost::python::exec("import os, sys; sys.path = [''] + sys.path", globals_);
+  //boost::python::exec("print(sys.path)");
+
+  // initialize boost::numpy and our own bindings
+  boost::numpy::initialize();
+  init_bindings();
+
+  active_ = true;
+
+  try {
+    boost::python::exec_file(path, globals_);
+  } catch (boost::python::error_already_set) {
+    PyErr_Print();
+    throw;
+  }
+}
+
+void PythonEnv::eval(const char* script) {
+  try {
+    boost::python::exec(script, globals_);
+  } catch (boost::python::error_already_set) {
+    PyErr_Print();
+    throw;
+  }
+}
+
+void PythonEnv::reset() {
+  if (active_)
+    Py_Finalize();
+}
+
+} // namespace PS
+
diff --git a/src/app/python/python_env.h b/src/app/python/python_env.h
index 610e03d..b21f5ec 100644
--- a/src/app/python/python_env.h
+++ b/src/app/python/python_env.h
@@ -1,8 +1,5 @@
 #pragma once
-#include <Python.h>
 #include <boost/python.hpp>
-#include <boost/numpy.hpp>
-#include "python_bindings.h"
 
 namespace PS {
 
@@ -12,57 +9,16 @@ class PythonEnv {
 
   boost::python::dict& globals() { return globals_; }
 
-  void load_file(const char* path, int argc, char** argv) {
-    reset();
-
-    Py_Initialize();
-
-    char* new_argv[argc];
-    new_argv[0] = const_cast<char*>(path);
-    for (int i = 1; i < argc; i++)
-      new_argv[i] = argv[i];
-
-    PySys_SetArgv(argc, new_argv);
-
-    boost::python::object main_module = boost::python::import("__main__");
-    globals_ = boost::python::dict(main_module.attr("__dict__"));
-
-    boost::python::exec("import os, sys; sys.path = [''] + sys.path", globals_);
-    //boost::python::exec("print(sys.path)");
+  void load_file(const char* path, int argc, char** argv);
 
-    boost::numpy::initialize();
-    initps();
-
-    active_ = true;
-
-    try {
-      boost::python::exec_file(path, globals_);
-    } catch (boost::python::error_already_set) {
-      PyErr_Print();
-      throw;
-    }
-  }
-
-  void eval(const char* script) {
-    try {
-      boost::python::exec(script, globals_);
-    } catch (boost::python::error_already_set) {
-      PyErr_Print();
-      throw;
-    }
-  }
+  void eval(const char* script);
 
   ~PythonEnv() {
     reset();
   }
 
 protected:
-  void reset() {
-    if (active_) {
-      globals_ = boost::python::dict();
-      Py_Finalize();
-    }
-  }
+  void reset();
 
 private:
   bool active_;
diff --git a/src/app/python/python_server.h b/src/app/python/python_server.h
index 932ccd6..c76b24d 100644
--- a/src/app/python/python_server.h
+++ b/src/app/python/python_server.h
@@ -16,7 +16,13 @@ class PythonServer : public App {
   }
 
   virtual void init() {
-    //LOG(ERROR) << "this is server " << myRank();
+    try {
+      if (py_env_->globals().has_key("server_node_init"))
+        py_env_->globals().get("server_node_init")();
+    } catch (boost::python::error_already_set) {
+      PyErr_Print();
+      throw;
+    }
   }
 
   virtual ~PythonServer() {
diff --git a/src/app/python/python_updater.h b/src/app/python/python_updater.h
index afe8ced..92d0641 100644
--- a/src/app/python/python_updater.h
+++ b/src/app/python/python_updater.h
@@ -16,13 +16,13 @@ class PythonUpdater : public Updater<V> {
     //LOG(ERROR) << "InitLayer size = " << size;
     try {
 
-      Py_intptr_t shape[1] = { static_cast<Py_intptr_t>(size) };
+      auto shape = boost::python::make_tuple(size);
+      auto stride = boost::python::make_tuple(sizeof(V));
 
-      boost::numpy::ndarray py_weight = boost::numpy::zeros(1, shape, boost::numpy::dtype::get_builtin<V>());
+      // construct new ndarrays using the existing C arrays without copying
+      auto py_weight = boost::numpy::from_data(weight, boost::numpy::dtype::get_builtin<V>(), shape, stride, boost::python::object());
 
-      py_env_->globals().get("init_layer")(name, py_weight);
-
-      memcpy(weight, py_weight.get_data(), sizeof(V) * size);
+      py_env_->globals().get("server_init_layer")(name, py_weight);
 
     } catch (boost::python::error_already_set) {
       //LOG(ERROR) << "InitLayer failed";
       PyErr_Print();
@@ -36,17 +36,14 @@ class PythonUpdater : public Updater<V> {
     //LOG(ERROR) << "Update size = " << size;
     try {
 
-      Py_intptr_t shape[1] = { static_cast<Py_intptr_t>(size) };
-
-      boost::numpy::ndarray py_weight = boost::numpy::zeros(1, shape, boost::numpy::dtype::get_builtin<V>());
-      memcpy(py_weight.get_data(), weight, sizeof(V) * size);
-
-      boost::numpy::ndarray py_gradient = boost::numpy::zeros(1, shape, boost::numpy::dtype::get_builtin<V>());
-      memcpy(py_gradient.get_data(), gradient, sizeof(V) * size);
+      auto shape = boost::python::make_tuple(size);
+      auto stride = boost::python::make_tuple(sizeof(V));
 
-      py_env_->globals().get("update_layer")(name, py_weight, py_gradient);
+      // construct new ndarrays using the existing C arrays without copying
+      auto py_weight = boost::numpy::from_data(weight, boost::numpy::dtype::get_builtin<V>(), shape, stride, boost::python::object());
+      auto py_gradient = boost::numpy::from_data(gradient, boost::numpy::dtype::get_builtin<V>(), shape, stride, boost::python::object());
 
-      memcpy(weight, py_weight.get_data(), sizeof(V) * size);
+      py_env_->globals().get("server_update_layer")(name, py_weight, py_gradient);
 
     } catch (boost::python::error_already_set) {
       //LOG(ERROR) << "Update failed";

From 1e475ce045c0e0a09a03b1bbc10badc548c88c19 Mon Sep 17 00:00:00 2001
From: Hyeontaek Lim
Date: Thu, 12 Mar 2015 15:48:10 -0400
Subject: [PATCH 03/14] prepare for script hotswaps; suppress compiler warnings in 3rd party libs

---
 src/app/python/main.cc            | 13 +++++--------
 src/app/python/python_bindings.cc |  3 +++
 src/app/python/python_env.h       |  3 +++
 src/app/python/python_server.h    | 31 +++++++++++++++++++++++--------
 4 files changed, 34 insertions(+), 16 deletions(-)

diff --git a/src/app/python/main.cc b/src/app/python/main.cc
index 2e80967..98aebfd 100644
--- a/src/app/python/main.cc
+++ b/src/app/python/main.cc
@@ -7,21 +7,18 @@ namespace PS {
 DEFINE_string(script, "", "the Python script path");
 
 App* CreateServerNode(const std::string& conf) {
-  static int argc = 2;
-  static std::string s_conf = conf;
-  static char* argv[2] = { const_cast<char*>(""), const_cast<char*>(s_conf.c_str()) };
+  std::string script = FLAGS_script;
 
-  PS::PythonEnv* py_env = new PS::PythonEnv();
-  py_env->load_file(PS::FLAGS_script.c_str(), argc, argv);
-
-  return new PythonServer(py_env);
+  return new PythonServer(script, conf);
 }
 
 } // namespace PS
 
 int WorkerNodeMain(int argc, char *argv[]) {
+  std::string script = PS::FLAGS_script;
+
   PS::PythonEnv py_env;
-  py_env.load_file(PS::FLAGS_script.c_str(), argc, argv);
+  py_env.load_file(script.c_str(), argc, argv);
 
   try {
     if (py_env.globals().has_key("worker_node_init"))
diff --git a/src/app/python/python_bindings.cc b/src/app/python/python_bindings.cc
index 5851936..7eb550b 100644
--- a/src/app/python/python_bindings.cc
+++ b/src/app/python/python_bindings.cc
@@ -1,6 +1,9 @@
 #include <mutex>
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wunused-local-typedefs"
 #include <boost/python.hpp>
 #include <boost/numpy.hpp>
+#pragma GCC diagnostic pop
 #include <string>
 #include "ps.h"
 #include "python_bindings.h"
diff --git a/src/app/python/python_env.h b/src/app/python/python_env.h
index b21f5ec..91eb8ce 100644
--- a/src/app/python/python_env.h
+++ b/src/app/python/python_env.h
@@ -1,5 +1,8 @@
 #pragma once
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wunused-local-typedefs"
 #include <boost/python.hpp>
+#pragma GCC diagnostic pop
 
 namespace PS {
 
diff --git a/src/app/python/python_server.h b/src/app/python/python_server.h
index c76b24d..ac7f2b5 100644
--- a/src/app/python/python_server.h
+++ b/src/app/python/python_server.h
@@ -7,18 +7,29 @@ namespace PS {
 
 class PythonServer : public App {
 public:
-  PythonServer(PythonEnv* py_env) : App(), py_env_(py_env) {
-    // This instance takes the ownership of py_env
-
-    updater_ = new PythonUpdater<float>(py_env_);
+  PythonServer(const std::string& script, const std::string& conf) : App(), script_(script), conf_(conf) {
+    updater_ = new PythonUpdater<float>(&py_env_);
     shared_model_ = new SharedModel<float>();
     shared_model_->setUpdater(updater_);
   }
 
   virtual void init() {
+    reset(script_, conf_);
+  }
+
+  void reset(const std::string& script, const std::string& conf) {
+    script_ = script;
+    conf_ = conf;
+
+    int argc = 2;
+    argv_[0] = const_cast<char*>(script_.c_str());
+    argv_[1] = const_cast<char*>(conf_.c_str());
+
+    py_env_.load_file(script_.c_str(), argc, argv_);
+
     try {
-      if (py_env_->globals().has_key("server_node_init"))
-        py_env_->globals().get("server_node_init")();
+      if (py_env_.globals().has_key("server_node_init"))
+        py_env_.globals().get("server_node_init")();
     } catch (boost::python::error_already_set) {
       PyErr_Print();
       throw;
     }
@@ -28,11 +39,15 @@ class PythonServer : public App {
   virtual ~PythonServer() {
     delete updater_;
     delete shared_model_;
-    delete py_env_;
   }
 
 private:
-  PythonEnv* py_env_;
+  std::string script_;
+  std::string conf_;
+
+  char* argv_[2];
+
+  PythonEnv py_env_;
   Updater<float> *updater_;
   SharedModel<float> *shared_model_;
 };

From 7e8e971b0dc3e3a3a72aa3c29b84b379f7dfb47b Mon Sep 17 00:00:00 2001
From: Hyeontaek Lim
Date: Thu, 12 Mar 2015 15:49:59 -0400
Subject: [PATCH 04/14] minor indentation fix

---
 src/app/python/mnist_mlp.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/app/python/mnist_mlp.py b/src/app/python/mnist_mlp.py
index 72e90a6..52242be 100644
--- a/src/app/python/mnist_mlp.py
+++ b/src/app/python/mnist_mlp.py
@@ -170,5 +170,5 @@ def worker_node_main():
     worker.run()
 
 if __name__ == '__main__':
-  pass
+    pass

From c7230d18fef7644b2ea938dba8d8c6c38126688e Mon Sep 17 00:00:00 2001
From: Hyeontaek Lim
Date: Fri, 13 Mar 2015 04:06:54 -0400
Subject: [PATCH 05/14] try less frequent pushes/pulls

---
 src/app/python/mnist_mlp.py | 69 ++++++++++++++++++++++++++++++-------
 1 file changed, 57 insertions(+), 12 deletions(-)

diff --git a/src/app/python/mnist_mlp.py b/src/app/python/mnist_mlp.py
index 52242be..9ab7d6c 100644
--- a/src/app/python/mnist_mlp.py
+++ b/src/app/python/mnist_mlp.py
@@ -9,6 +9,8 @@
 # PS
 import ps
 
+bulk = True
+
 class MnistTrainer:
     def __init__(self, data_file='mnist_all.mat', num_epochs=100, mb_size=256, eps_w=0.01, eps_b=0.01):
         print('Worker; NodeID: %s, Rank: %d, RankSize: %d' % (ps.myNodeID(), ps.myRank(), ps.rankSize()))
@@ -42,6 +44,12 @@ def __init__(self, data_file='mnist_all.mat', num_epochs=100, mb_size=256, eps_w
         self.b1 = owl.from_numpy(t_b1)
self.b2 = owl.from_numpy(t_b2) + if bulk: + self.gw1 = owl.zeros([l2, l1]) + self.gw2 = owl.zeros([l3, l2]) + self.gb1 = owl.zeros([l2, 1]) + self.gb2 = owl.zeros([l3, 1]) + def run(self): (train_data, test_data) = mnist_io.load_mb_from_mat(self.data_file, self.mb_size) np.set_printoptions(linewidth=200) @@ -79,25 +87,62 @@ def run(self): #self.b1 -= self.eps_b * gb1 #self.b2 -= self.eps_b * gb2 # PS: instead, push gradients and pull weights from servers + if not bulk: + t_w1 = self.w1.to_numpy() + t_w2 = self.w2.to_numpy() + t_b1 = self.b1.to_numpy() + t_b2 = self.b2.to_numpy() + ps.PushGradAndPullWeight(gw1.to_numpy(), t_w1, 'w1') + ps.PushGradAndPullWeight(gw2.to_numpy(), t_w2, 'w2') + ps.PushGradAndPullWeight(gb1.to_numpy(), t_b1, 'b1') + ps.PushGradAndPullWeight(gb2.to_numpy(), t_b2, 'b2') + self.w1 = owl.from_numpy(t_w1) + self.w2 = owl.from_numpy(t_w2) + self.b1 = owl.from_numpy(t_b1) + self.b2 = owl.from_numpy(t_b2) + else: + self.gw1 += gw1 + self.gw2 += gw2 + self.gb1 += gb1 + self.gb2 += gb2 + if count % 10 == 0: + t_w1 = self.w1.to_numpy() + t_w2 = self.w2.to_numpy() + t_b1 = self.b1.to_numpy() + t_b2 = self.b2.to_numpy() + ps.PushGradAndPullWeight(self.gw1.to_numpy(), t_w1, 'w1') + ps.PushGradAndPullWeight(self.gw2.to_numpy(), t_w2, 'w2') + ps.PushGradAndPullWeight(self.gb1.to_numpy(), t_b1, 'b1') + ps.PushGradAndPullWeight(self.gb2.to_numpy(), t_b2, 'b2') + self.w1 = owl.from_numpy(t_w1) + self.w2 = owl.from_numpy(t_w2) + self.b1 = owl.from_numpy(t_b1) + self.b2 = owl.from_numpy(t_b2) + self.gw1 -= self.gw1 + self.gw2 -= self.gw2 + self.gb1 -= self.gb1 + self.gb2 -= self.gb2 + + if (count % 40 == 0): + correct = out.argmax(0) - target.argmax(0) + val = correct.to_numpy() + print 'Training error:', float(np.count_nonzero(val)) / num_samples + count = count + 1 + + if bulk: t_w1 = self.w1.to_numpy() t_w2 = self.w2.to_numpy() t_b1 = self.b1.to_numpy() t_b2 = self.b2.to_numpy() - ps.PushGradAndPullWeight(gw1.to_numpy(), t_w1, 'w1') - ps.PushGradAndPullWeight(gw2.to_numpy(), t_w2, 'w2') - ps.PushGradAndPullWeight(gb1.to_numpy(), t_b1, 'b1') - ps.PushGradAndPullWeight(gb2.to_numpy(), t_b2, 'b2') + ps.PushGradAndPullWeight(self.gw1.to_numpy(), t_w1, 'w1') + ps.PushGradAndPullWeight(self.gw2.to_numpy(), t_w2, 'w2') + ps.PushGradAndPullWeight(self.gb1.to_numpy(), t_b1, 'b1') + ps.PushGradAndPullWeight(self.gb2.to_numpy(), t_b2, 'b2') self.w1 = owl.from_numpy(t_w1) self.w2 = owl.from_numpy(t_w2) self.b1 = owl.from_numpy(t_b1) self.b2 = owl.from_numpy(t_b2) - if (count % 40 == 0): - correct = out.argmax(0) - target.argmax(0) - val = correct.to_numpy() - print 'Training error:', float(np.count_nonzero(val)) / num_samples - count = count + 1 - # test a1 = test_samples a2 = ele.relu(self.w1 * a1 + self.b1) @@ -150,7 +195,7 @@ def update_layer(self, name, weight, gradient): server = None def server_node_init(): global server - owl.initialize(sys.argv) + owl.initialize(sys.argv + ['-skip_glog_initialization']) server = MnistServer() def server_init_layer(name, weight): @@ -163,7 +208,7 @@ def server_update_layer(name, weight, gradient): worker = None def worker_node_init(): global worker - owl.initialize(sys.argv) + owl.initialize(sys.argv + ['-skip_glog_initialization']) worker = MnistTrainer(num_epochs = 10) def worker_node_main(): From 04664bd43a8004899508ad3b18424b598cb82ab8 Mon Sep 17 00:00:00 2001 From: Hyeontaek Lim Date: Fri, 13 Mar 2015 19:58:06 -0400 Subject: [PATCH 06/14] build ps_python by default --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/Makefile b/Makefile index 8e26052..de3b738 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ LDFLAGS += $(THIRD_LIB) -lpthread -lrt PS_LIB = build/libps.a PS_MAIN = build/libpsmain.a -all: ps app +all: ps build/ps_python clean: rm -rf build From b01395865c6c792f8fc22879e0a4729b5aceeed8 Mon Sep 17 00:00:00 2001 From: Hyeontaek Lim Date: Fri, 13 Mar 2015 19:59:18 -0400 Subject: [PATCH 07/14] move python scripts to a new location --- {src/app/python => example/python/mnist}/mnist_io.py | 0 {src/app/python => example/python/mnist}/mnist_mlp.py | 5 ++--- 2 files changed, 2 insertions(+), 3 deletions(-) rename {src/app/python => example/python/mnist}/mnist_io.py (100%) rename {src/app/python => example/python/mnist}/mnist_mlp.py (98%) diff --git a/src/app/python/mnist_io.py b/example/python/mnist/mnist_io.py similarity index 100% rename from src/app/python/mnist_io.py rename to example/python/mnist/mnist_io.py diff --git a/src/app/python/mnist_mlp.py b/example/python/mnist/mnist_mlp.py similarity index 98% rename from src/app/python/mnist_mlp.py rename to example/python/mnist/mnist_mlp.py index 9ab7d6c..3002825 100644 --- a/src/app/python/mnist_mlp.py +++ b/example/python/mnist/mnist_mlp.py @@ -8,7 +8,6 @@ # PS import ps - bulk = True class MnistTrainer: @@ -195,7 +194,7 @@ def update_layer(self, name, weight, gradient): server = None def server_node_init(): global server - owl.initialize(sys.argv + ['-skip_glog_initialization']) + owl.initialize(sys.argv + ['-no_init_glog']) server = MnistServer() def server_init_layer(name, weight): @@ -208,7 +207,7 @@ def server_update_layer(name, weight, gradient): worker = None def worker_node_init(): global worker - owl.initialize(sys.argv + ['-skip_glog_initialization']) + owl.initialize(sys.argv + ['-no_init_glog']) worker = MnistTrainer(num_epochs = 10) def worker_node_main(): From da67843e8b22806c479f05b22283b21906297f8b Mon Sep 17 00:00:00 2001 From: Hyeontaek Lim Date: Fri, 13 Mar 2015 21:41:01 -0400 Subject: [PATCH 08/14] add a layer to convert from/to owl.NArray automatically; add a PS-python version of mnist_cnn from Minerva --- example/python/mnist/mnist_cnn.py | 242 ++++++++++++++++++++++++++++++ example/python/mnist/mnist_mlp.py | 84 ++++------- src/app/python/ps.py | 52 +++++++ src/app/python/python_bindings.cc | 4 +- 4 files changed, 322 insertions(+), 60 deletions(-) create mode 100644 example/python/mnist/mnist_cnn.py create mode 100644 src/app/python/ps.py diff --git a/example/python/mnist/mnist_cnn.py b/example/python/mnist/mnist_cnn.py new file mode 100644 index 0000000..88ba622 --- /dev/null +++ b/example/python/mnist/mnist_cnn.py @@ -0,0 +1,242 @@ +import sys +import time +import argparse +import numpy as np +import mnist_io +import owl +import owl.elewise as ele +import owl.conv as conv + +# PS +import ps +#bulk = True + +lazy_cycle = 4 + +class MNISTCNNModel: + def __init__(self): + self.convs = [ + conv.Convolver(0, 0, 1, 1), + conv.Convolver(2, 2, 1, 1), + ]; + self.poolings = [ + conv.Pooler(2, 2, 2, 2, 0, 0, conv.pool_op.max), + conv.Pooler(3, 3, 3, 3, 0, 0, conv.pool_op.max) + ]; + + def init_random(self): + #self.weights = [ + # owl.randn([5, 5, 1, 16], 0.0, 0.1), + # owl.randn([5, 5, 16, 32], 0.0, 0.1), + # owl.randn([10, 512], 0.0, 0.1) + #]; + #self.weightdelta = [ + # owl.zeros([5, 5, 1, 16]), + # owl.zeros([5, 5, 16, 32]), + # owl.zeros([10, 512]) + #]; + #self.bias = [ + # owl.zeros([16]), + # owl.zeros([32]), + # owl.zeros([10, 1]) + #]; + #self.biasdelta = [ + # owl.zeros([16]), + # owl.zeros([32]), + # 
owl.zeros([10, 1]) + #]; + self.weights = [ + ps.pull_weight(owl.zeros([5, 5, 1, 16]), 'w_0'), + ps.pull_weight(owl.zeros([5, 5, 16, 32]), 'w_1'), + ps.pull_weight(owl.zeros([10, 512]), 'w_2'), + ] + self.weightdelta = [ + ps.pull_weight(owl.zeros([5, 5, 1, 16]), 'wd_0'), + ps.pull_weight(owl.zeros([5, 5, 16, 32]), 'wd_1'), + ps.pull_weight(owl.zeros([10, 512]), 'wd_2'), + ] + self.bias = [ + ps.pull_weight(owl.zeros([16]), 'b_0'), + ps.pull_weight(owl.zeros([32]), 'b_1'), + ps.pull_weight(owl.zeros([10, 1]), 'b_2'), + ] + self.biasdelta = [ + ps.pull_weight(owl.zeros([16]), 'bd_0'), + ps.pull_weight(owl.zeros([32]), 'bd_1'), + ps.pull_weight(owl.zeros([10, 1]), 'bd_2'), + ] + +def print_training_accuracy(o, t, mbsize, prefix): + predict = o.reshape([10, mbsize]).argmax(0) + ground_truth = t.reshape([10, mbsize]).argmax(0) + correct = (predict - ground_truth).count_zero() + print prefix, 'error: {}'.format((mbsize - correct) * 1.0 / mbsize) + +def bpprop(model, samples, label): + num_layers = 6 + num_samples = samples.shape[-1] + fc_shape = [512, num_samples] + + acts = [None] * num_layers + errs = [None] * num_layers + weightgrad = [None] * len(model.weights) + biasgrad = [None] * len(model.bias) + + acts[0] = samples + acts[1] = ele.relu(model.convs[0].ff(acts[0], model.weights[0], model.bias[0])) + acts[2] = model.poolings[0].ff(acts[1]) + acts[3] = ele.relu(model.convs[1].ff(acts[2], model.weights[1], model.bias[1])) + acts[4] = model.poolings[1].ff(acts[3]) + acts[5] = model.weights[2] * acts[4].reshape(fc_shape) + model.bias[2] + + out = conv.softmax(acts[5], conv.soft_op.instance) + + errs[5] = out - label + errs[4] = (model.weights[2].trans() * errs[5]).reshape(acts[4].shape) + errs[3] = ele.relu_back(model.poolings[1].bp(errs[4], acts[4], acts[3]), acts[3]) + errs[2] = model.convs[1].bp(errs[3], acts[2], model.weights[1]) + errs[1] = ele.relu_back(model.poolings[0].bp(errs[2], acts[2], acts[1]), acts[1]) + + weightgrad[2] = errs[5] * acts[4].reshape(fc_shape).trans() + biasgrad[2] = errs[5].sum(1) + weightgrad[1] = model.convs[1].weight_grad(errs[3], acts[2], model.weights[1]) + biasgrad[1] = model.convs[1].bias_grad(errs[3]) + weightgrad[0] = model.convs[0].weight_grad(errs[1], acts[0], model.weights[0]) + biasgrad[0] = model.convs[0].bias_grad(errs[1]) + return (out, weightgrad, biasgrad) + +#def train_network(model, num_epochs=100, minibatch_size=256, lr=0.01, mom=0.75, wd=5e-4): +def train_network(model, num_epochs=100, minibatch_size=256, lr=0.01, wd=5e-4): + # load data + (train_data, test_data) = mnist_io.load_mb_from_mat('mnist_all.mat', minibatch_size / len(gpu)) + num_test_samples = test_data[0].shape[0] + test_samples = owl.from_numpy(test_data[0]).reshape([28, 28, 1, num_test_samples]) + test_labels = owl.from_numpy(test_data[1]) + for i in xrange(num_epochs): + print "---Epoch #", i + last = time.time() + count = 0 + weightgrads = [None] * len(gpu) + biasgrads = [None] * len(gpu) + for (mb_samples, mb_labels) in train_data: + count += 1 + current_gpu = count % len(gpu) + owl.set_device(gpu[current_gpu]) + num_samples = mb_samples.shape[0] + data = owl.from_numpy(mb_samples).reshape([28, 28, 1, num_samples]) + label = owl.from_numpy(mb_labels) + out, weightgrads[current_gpu], biasgrads[current_gpu] = bpprop(model, data, label) + #out.start_eval() + if current_gpu == 0: + for k in range(len(model.weights)): + #model.weightdelta[k] = mom * model.weightdelta[k] - lr / num_samples / len(gpu) * multi_gpu_merge(weightgrads, 0, k) - lr * wd * model.weights[k] + 
#model.biasdelta[k] = mom * model.biasdelta[k] - lr / num_samples / len(gpu) * multi_gpu_merge(biasgrads, 0, k) + #model.weights[k] += model.weightdelta[k] + #model.bias[k] += model.biasdelta[k] + model.weightdelta[k] = ps.push_grad_and_pull_weight(lr / num_samples / len(gpu) * multi_gpu_merge(weightgrads, 0, k) + lr * wd * model.weights[k], model.weightdelta[k], "wd_%d" % k) + model.biasdelta[k] = ps.push_grad_and_pull_weight(lr / num_samples / len(gpu) * multi_gpu_merge(biasgrads, 0, k), model.biasdelta[k], "bd_%d" % k) + model.weights[k] = ps.push_grad_and_pull_weight(model.weightdelta[k], model.weights[k], "w_%d" % k) + model.bias[k] = ps.push_grad_and_pull_weight(model.biasdelta[k], model.bias[k], "b_%d" % k) + if count % (len(gpu) * lazy_cycle) == 0: + print_training_accuracy(out, label, num_samples, 'Training') + print '---End of Epoch #', i, 'time:', time.time() - last + # do test + out, _, _ = bpprop(model, test_samples, test_labels) + print_training_accuracy(out, test_labels, num_test_samples, 'Testing') + +def multi_gpu_merge(l, base, layer): + if len(l) == 1: + return l[0][layer] + left = multi_gpu_merge(l[:len(l) / 2], base, layer) + right = multi_gpu_merge(l[len(l) / 2:], base + len(l) / 2, layer) + owl.set_device(base) + return left + right + +class MnistServer: + def __init__(self, mom=0.75): + print('Server; NodeID: %s, Rank: %d, RankSize: %d' % (ps.my_node_id, ps.my_rank, ps.rank_size)) + self.cpu = owl.create_cpu_device() + self.mom = mom + + def init_layer(self, name, weight): + type_, _, i = name.partition('_') + i = int(i) + + weights = [ + owl.randn([5, 5, 1, 16], 0.0, 0.1), + owl.randn([5, 5, 16, 32], 0.0, 0.1), + owl.randn([10, 512], 0.0, 0.1) + ]; + weightdelta = [ + owl.zeros([5, 5, 1, 16]), + owl.zeros([5, 5, 16, 32]), + owl.zeros([10, 512]) + ]; + bias = [ + owl.zeros([16]), + owl.zeros([32]), + owl.zeros([10, 1]) + ]; + biasdelta = [ + owl.zeros([16]), + owl.zeros([32]), + owl.zeros([10, 1]) + ]; + + if type_ == 'w': + np.copyto(weight, weights[i].to_numpy().flatten()) + elif type_ == 'wd': + np.copyto(weight, weightdelta[i].to_numpy().flatten()) + elif type_ == 'b': + np.copyto(weight, bias[i].to_numpy().flatten()) + elif type_ == 'bd': + np.copyto(weight, biasdelta[i].to_numpy().flatten()) + else: + assert False + + def update_layer(self, name, weight, gradient): + type_, _, _ = name.partition('_') + if type_ == 'wd' or type_ == 'bd': + # weight must be updated in place + weight *= self.mom + weight -= gradient + elif type_ == 'w' or type_ == 'b': + weight += gradient + else: + assert False + + +# PS - server +server = None +def server_node_init(): + global server + owl.initialize(sys.argv + ['-no_init_glog']) + server = MnistServer() + +def server_init_layer(name, weight): + server.init_layer(name, weight) + +def server_update_layer(name, weight, gradient): + server.update_layer(name, weight, gradient) + +# PS - worker +worker = None +def worker_node_init(): + global gpu + owl.initialize(sys.argv + ['-no_init_glog']) + parser = argparse.ArgumentParser(description='MNIST CNN') + parser.add_argument('-n', '--num', help='number of GPUs to use', action='store', type=int, default=1) + (args, remain) = parser.parse_known_args() + assert(1 <= args.num) + print 'Using %d GPU(s)' % args.num + gpu = [owl.create_gpu_device(i) for i in range(args.num)] + owl.set_device(gpu[0]) + +def worker_node_main(): + model = MNISTCNNModel() + model.init_random() + train_network(model) + +if __name__ == '__main__': + pass + diff --git a/example/python/mnist/mnist_mlp.py 
b/example/python/mnist/mnist_mlp.py index 3002825..c0cdf8e 100644 --- a/example/python/mnist/mnist_mlp.py +++ b/example/python/mnist/mnist_mlp.py @@ -12,7 +12,7 @@ class MnistTrainer: def __init__(self, data_file='mnist_all.mat', num_epochs=100, mb_size=256, eps_w=0.01, eps_b=0.01): - print('Worker; NodeID: %s, Rank: %d, RankSize: %d' % (ps.myNodeID(), ps.myRank(), ps.rankSize())) + print('Worker; NodeID: %s, Rank: %d, RankSize: %d' % (ps.my_node_id, ps.my_rank, ps.rank_size)) self.cpu = owl.create_cpu_device() self.gpu = owl.create_gpu_device(0) @@ -30,18 +30,10 @@ def __init__(self, data_file='mnist_all.mat', num_epochs=100, mb_size=256, eps_w # self.b1 = owl.zeros([l2, 1]) # self.b2 = owl.zeros([l3, 1]) # PS: instead, pull weights from servers - t_w1 = owl.zeros([l2, l1]).to_numpy() - t_w2 = owl.zeros([l3, l2]).to_numpy() - t_b1 = owl.zeros([l2, 1]).to_numpy() - t_b2 = owl.zeros([l3, 1]).to_numpy() - ps.PullWeight(t_w1, 'w1') - ps.PullWeight(t_w2, 'w2') - ps.PullWeight(t_b1, 'b1') - ps.PullWeight(t_b2, 'b2') - self.w1 = owl.from_numpy(t_w1) - self.w2 = owl.from_numpy(t_w2) - self.b1 = owl.from_numpy(t_b1) - self.b2 = owl.from_numpy(t_b2) + self.w1 = ps.pull_weight(owl.zeros([l2, l1]), 'w1') + self.w2 = ps.pull_weight(owl.zeros([l3, l2]), 'w2') + self.b1 = ps.pull_weight(owl.zeros([l2, 1]), 'b1') + self.b2 = ps.pull_weight(owl.zeros([l3, 1]), 'b2') if bulk: self.gw1 = owl.zeros([l2, l1]) @@ -87,36 +79,20 @@ def run(self): #self.b2 -= self.eps_b * gb2 # PS: instead, push gradients and pull weights from servers if not bulk: - t_w1 = self.w1.to_numpy() - t_w2 = self.w2.to_numpy() - t_b1 = self.b1.to_numpy() - t_b2 = self.b2.to_numpy() - ps.PushGradAndPullWeight(gw1.to_numpy(), t_w1, 'w1') - ps.PushGradAndPullWeight(gw2.to_numpy(), t_w2, 'w2') - ps.PushGradAndPullWeight(gb1.to_numpy(), t_b1, 'b1') - ps.PushGradAndPullWeight(gb2.to_numpy(), t_b2, 'b2') - self.w1 = owl.from_numpy(t_w1) - self.w2 = owl.from_numpy(t_w2) - self.b1 = owl.from_numpy(t_b1) - self.b2 = owl.from_numpy(t_b2) + self.w1 = ps.push_grad_and_pull_weight(gw1, self.w1, 'w1') + self.w2 = ps.push_grad_and_pull_weight(gw2, self.w2, 'w2') + self.b1 = ps.push_grad_and_pull_weight(gb1, self.b1, 'b1') + self.b2 = ps.push_grad_and_pull_weight(gb2, self.b2, 'b2') else: self.gw1 += gw1 self.gw2 += gw2 self.gb1 += gb1 self.gb2 += gb2 if count % 10 == 0: - t_w1 = self.w1.to_numpy() - t_w2 = self.w2.to_numpy() - t_b1 = self.b1.to_numpy() - t_b2 = self.b2.to_numpy() - ps.PushGradAndPullWeight(self.gw1.to_numpy(), t_w1, 'w1') - ps.PushGradAndPullWeight(self.gw2.to_numpy(), t_w2, 'w2') - ps.PushGradAndPullWeight(self.gb1.to_numpy(), t_b1, 'b1') - ps.PushGradAndPullWeight(self.gb2.to_numpy(), t_b2, 'b2') - self.w1 = owl.from_numpy(t_w1) - self.w2 = owl.from_numpy(t_w2) - self.b1 = owl.from_numpy(t_b1) - self.b2 = owl.from_numpy(t_b2) + self.w1 = ps.push_grad_and_pull_weight(self.gw1, self.w1, 'w1') + self.w2 = ps.push_grad_and_pull_weight(self.gw2, self.w2, 'w2') + self.b1 = ps.push_grad_and_pull_weight(self.gb1, self.b1, 'b1') + self.b2 = ps.push_grad_and_pull_weight(self.gb2, self.b2, 'b2') self.gw1 -= self.gw1 self.gw2 -= self.gw2 self.gb1 -= self.gb1 @@ -129,18 +105,10 @@ def run(self): count = count + 1 if bulk: - t_w1 = self.w1.to_numpy() - t_w2 = self.w2.to_numpy() - t_b1 = self.b1.to_numpy() - t_b2 = self.b2.to_numpy() - ps.PushGradAndPullWeight(self.gw1.to_numpy(), t_w1, 'w1') - ps.PushGradAndPullWeight(self.gw2.to_numpy(), t_w2, 'w2') - ps.PushGradAndPullWeight(self.gb1.to_numpy(), t_b1, 'b1') - 
ps.PushGradAndPullWeight(self.gb2.to_numpy(), t_b2, 'b2') - self.w1 = owl.from_numpy(t_w1) - self.w2 = owl.from_numpy(t_w2) - self.b1 = owl.from_numpy(t_b1) - self.b2 = owl.from_numpy(t_b2) + self.w1 = ps.push_grad_and_pull_weight(self.gw1, self.w1, 'w1') + self.w2 = ps.push_grad_and_pull_weight(self.gw2, self.w2, 'w2') + self.b1 = ps.push_grad_and_pull_weight(self.gb1, self.b1, 'b1') + self.b2 = ps.push_grad_and_pull_weight(self.gb2, self.b2, 'b2') # test a1 = test_samples @@ -156,25 +124,25 @@ def run(self): class MnistServer: def __init__(self): - print('Server; NodeID: %s, Rank: %d, RankSize: %d' % (ps.myNodeID(), ps.myRank(), ps.rankSize())) + print('Server; NodeID: %s, Rank: %d, RankSize: %d' % (ps.my_node_id, ps.my_rank, ps.rank_size)) self.cpu = owl.create_cpu_device() def init_layer(self, name, weight): l1 = 784; l2 = 256; l3 = 10 - w1 = owl.randn([l2, l1], 0.0, math.sqrt(4.0 / (l1 + l2))).to_numpy() - w2 = owl.randn([l3, l2], 0.0, math.sqrt(4.0 / (l2 + l3))).to_numpy() - b1 = owl.zeros([l2, 1]).to_numpy() - b2 = owl.zeros([l3, 1]).to_numpy() + w1 = owl.randn([l2, l1], 0.0, math.sqrt(4.0 / (l1 + l2))) + w2 = owl.randn([l3, l2], 0.0, math.sqrt(4.0 / (l2 + l3))) + b1 = owl.zeros([l2, 1]) + b2 = owl.zeros([l3, 1]) if name == 'w1': - np.copyto(weight, w1.flatten()) + np.copyto(weight, w1.to_numpy().flatten()) elif name == 'w2': - np.copyto(weight, w2.flatten()) + np.copyto(weight, w2.to_numpy().flatten()) elif name == 'b1': - np.copyto(weight, b1.flatten()) + np.copyto(weight, b1.to_numpy().flatten()) elif name == 'b2': - np.copyto(weight, b2.flatten()) + np.copyto(weight, b2.to_numpy().flatten()) else: assert False diff --git a/src/app/python/ps.py b/src/app/python/ps.py new file mode 100644 index 0000000..85ce3d5 --- /dev/null +++ b/src/app/python/ps.py @@ -0,0 +1,52 @@ +import _ps + +try: + import owl + _owl_loaded = True +except ImportError: + class owl(object): + NArray = None + from_numpy = staticmethod(lambda x: None) + _owl_loaded = False + + +__all__ = ['my_node_id', 'my_rank', 'rank_size', 'pull_weight', 'push_grad_and_pull_weight'] + + +my_node_id = _ps.myNodeID() + +my_rank = _ps.myRank() + +rank_size = _ps.rankSize() + + +def pull_weight(weight, name): + is_weight_narray = _owl_loaded and isinstance(weight, owl.NArray) + + if is_weight_narray: + weight = weight.to_numpy() + + _ps.PullWeight(weight, name) + + if is_weight_narray: + return owl.from_numpy(weight) + else: + return weight + + +def push_grad_and_pull_weight(grad, weight, name): + is_weight_narray = _owl_loaded and isinstance(weight, owl.NArray) + + if is_weight_narray: + weight = weight.to_numpy() + + if _owl_loaded and isinstance(grad, owl.NArray): + grad = grad.to_numpy() + + _ps.PushGradAndPullWeight(grad, weight, name) + + if is_weight_narray: + return owl.from_numpy(weight) + else: + return weight + diff --git a/src/app/python/python_bindings.cc b/src/app/python/python_bindings.cc index 7eb550b..69a381e 100644 --- a/src/app/python/python_bindings.cc +++ b/src/app/python/python_bindings.cc @@ -79,7 +79,7 @@ static void PushGradAndPullWeight(boost::numpy::ndarray grad, boost::numpy::ndar } // namespace PS -BOOST_PYTHON_MODULE(ps) { +BOOST_PYTHON_MODULE(_ps) { using namespace boost::python; using namespace PS; @@ -94,7 +94,7 @@ BOOST_PYTHON_MODULE(ps) { namespace PS { void init_bindings() { - initps(); + init_ps(); } } // namespace PS From a79bd66cc54ef7eb9d2117aca06e77a66d8c719f Mon Sep 17 00:00:00 2001 From: Hyeontaek Lim Date: Fri, 13 Mar 2015 21:51:26 -0400 Subject: [PATCH 09/14] add support for multiple workers
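Both examples shard work the same way: each worker walks the full minibatch stream but only processes the batches whose index matches its rank, and each worker maps itself onto a distinct local GPU modulo the device count. A minimal sketch of the sharding idiom, using the ps.my_rank and ps.rank_size attributes exported by ps.py (train_one_step is a hypothetical placeholder for the per-minibatch work):

    for idx, (mb_samples, mb_labels) in enumerate(train_data):
        if idx % ps.rank_size != ps.my_rank:
            continue  # this minibatch belongs to another worker
        train_one_step(mb_samples, mb_labels)

Worker r therefore processes batches r, r + rank_size, r + 2 * rank_size, and so on; the workers need no coordination beyond the rank numbering they already share, though each still pays the cost of loading and enumerating the whole input stream.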
--- example/python/mnist/mnist_cnn.py | 17 +++++++++++++---- example/python/mnist/mnist_mlp.py | 18 ++++++++++-------- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/example/python/mnist/mnist_cnn.py b/example/python/mnist/mnist_cnn.py index 88ba622..bf0a9d2 100644 --- a/example/python/mnist/mnist_cnn.py +++ b/example/python/mnist/mnist_cnn.py @@ -25,6 +25,7 @@ def __init__(self): ]; def init_random(self): + # PS: pull from the server #self.weights = [ # owl.randn([5, 5, 1, 16], 0.0, 0.1), # owl.randn([5, 5, 16, 32], 0.0, 0.1), @@ -105,6 +106,7 @@ def bpprop(model, samples, label): biasgrad[0] = model.convs[0].bias_grad(errs[1]) return (out, weightgrad, biasgrad) +# PS: "mom" is used by the server #def train_network(model, num_epochs=100, minibatch_size=256, lr=0.01, mom=0.75, wd=5e-4): def train_network(model, num_epochs=100, minibatch_size=256, lr=0.01, wd=5e-4): # load data @@ -118,7 +120,9 @@ def train_network(model, num_epochs=100, minibatch_size=256, lr=0.01, wd=5e-4): count = 0 weightgrads = [None] * len(gpu) biasgrads = [None] * len(gpu) - for (mb_samples, mb_labels) in train_data: + for idx, (mb_samples, mb_labels) in enumerate(train_data): + if idx % ps.rank_size != ps.my_rank: continue + count += 1 current_gpu = count % len(gpu) owl.set_device(gpu[current_gpu]) @@ -126,9 +130,11 @@ def train_network(model, num_epochs=100, minibatch_size=256, lr=0.01, wd=5e-4): data = owl.from_numpy(mb_samples).reshape([28, 28, 1, num_samples]) label = owl.from_numpy(mb_labels) out, weightgrads[current_gpu], biasgrads[current_gpu] = bpprop(model, data, label) + # PS: XXX: where is start_eval()? #out.start_eval() if current_gpu == 0: for k in range(len(model.weights)): + # PS: use the server for updates #model.weightdelta[k] = mom * model.weightdelta[k] - lr / num_samples / len(gpu) * multi_gpu_merge(weightgrads, 0, k) - lr * wd * model.weights[k] #model.biasdelta[k] = mom * model.biasdelta[k] - lr / num_samples / len(gpu) * multi_gpu_merge(biasgrads, 0, k) #model.weights[k] += model.weightdelta[k] @@ -152,6 +158,7 @@ def multi_gpu_merge(l, base, layer): owl.set_device(base) return left + right + class MnistServer: def __init__(self, mom=0.75): print('Server; NodeID: %s, Rank: %d, RankSize: %d' % (ps.my_node_id, ps.my_rank, ps.rank_size)) @@ -206,7 +213,7 @@ def update_layer(self, name, weight, gradient): assert False -# PS - server +# PS: server server = None def server_node_init(): global server @@ -219,7 +226,7 @@ def server_init_layer(name, weight): def server_update_layer(name, weight, gradient): server.update_layer(name, weight, gradient) -# PS - worker +# PS: worker worker = None def worker_node_init(): global gpu @@ -229,7 +236,9 @@ def worker_node_init(): (args, remain) = parser.parse_known_args() assert(1 <= args.num) print 'Using %d GPU(s)' % args.num - gpu = [owl.create_gpu_device(i) for i in range(args.num)] + # PS: for local test + #gpu = [owl.create_gpu_device(i) for i in range(args.num)] + gpu = [owl.create_gpu_device((i + ps.my_rank * args.num) % owl.get_gpu_device_count()) for i in range(args.num)] owl.set_device(gpu[0]) def worker_node_main(): diff --git a/example/python/mnist/mnist_mlp.py b/example/python/mnist/mnist_mlp.py index c0cdf8e..1f7b9dd 100644 --- a/example/python/mnist/mnist_mlp.py +++ b/example/python/mnist/mnist_mlp.py @@ -15,7 +15,9 @@ def __init__(self, data_file='mnist_all.mat', num_epochs=100, mb_size=256, eps_w print('Worker; NodeID: %s, Rank: %d, RankSize: %d' % (ps.my_node_id, ps.my_rank, ps.rank_size)) self.cpu = owl.create_cpu_device() - 
self.gpu = owl.create_gpu_device(0) + # PS: for local test + #self.gpu = owl.create_gpu_device(0) + self.gpu = owl.create_gpu_device(ps.my_rank % owl.get_gpu_device_count()) self.data_file = data_file self.num_epochs=num_epochs self.mb_size=mb_size @@ -23,13 +25,12 @@ def __init__(self, data_file='mnist_all.mat', num_epochs=100, mb_size=256, eps_w self.eps_b=eps_b # init weight l1 = 784; l2 = 256; l3 = 10 - # PS: do not initialize weights on workers + # PS: pull weights from servers # self.l1 = l1; self.l2 = l2; self.l3 = l3 # self.w1 = owl.randn([l2, l1], 0.0, math.sqrt(4.0 / (l1 + l2))) # self.w2 = owl.randn([l3, l2], 0.0, math.sqrt(4.0 / (l2 + l3))) # self.b1 = owl.zeros([l2, 1]) # self.b2 = owl.zeros([l3, 1]) - # PS: instead, pull weights from servers self.w1 = ps.pull_weight(owl.zeros([l2, l1]), 'w1') self.w2 = ps.pull_weight(owl.zeros([l3, l2]), 'w2') self.b1 = ps.pull_weight(owl.zeros([l2, 1]), 'b1') self.b2 = ps.pull_weight(owl.zeros([l3, 1]), 'b2') @@ -51,7 +52,9 @@ def run(self): for epoch in range(self.num_epochs): print '---Start epoch #%d' % epoch # train - for (mb_samples, mb_labels) in train_data: + for idx, (mb_samples, mb_labels) in enumerate(train_data): + if idx % ps.rank_size != ps.my_rank: continue + num_samples = mb_samples.shape[0] a1 = owl.from_numpy(mb_samples) @@ -72,12 +75,11 @@ def run(self): gw2 = s3 * a2.trans() / num_samples gb2 = s3.sum(1) / num_samples # update - # PS: do not update weights locally + # PS: update weights on the server #self.w1 -= self.eps_w * gw1 #self.w2 -= self.eps_w * gw2 #self.b1 -= self.eps_b * gb1 #self.b2 -= self.eps_b * gb2 - # PS: instead, push gradients and pull weights from servers if not bulk: self.w1 = ps.push_grad_and_pull_weight(gw1, self.w1, 'w1') self.w2 = ps.push_grad_and_pull_weight(gw2, self.w2, 'w2') self.b1 = ps.push_grad_and_pull_weight(gb1, self.b1, 'b1') self.b2 = ps.push_grad_and_pull_weight(gb2, self.b2, 'b2') @@ -158,7 +160,7 @@ def update_layer(self, name, weight, gradient): assert False -# PS - server +# PS: server server = None def server_node_init(): global server @@ -171,7 +173,7 @@ def server_init_layer(name, weight): def server_update_layer(name, weight, gradient): server.update_layer(name, weight, gradient) -# PS - worker +# PS: worker worker = None def worker_node_init(): global worker From af98e72d3a48bf8487093966285983ac050d6053 Mon Sep 17 00:00:00 2001 From: Hyeontaek Lim Date: Mon, 16 Mar 2015 00:46:04 -0400 Subject: [PATCH 10/14] remove test python script --- src/app/python/test.py | 41 ----------------------------------------- 1 file changed, 41 deletions(-) delete mode 100644 src/app/python/test.py diff --git a/src/app/python/test.py b/src/app/python/test.py deleted file mode 100644 index 311d526..0000000 --- a/src/app/python/test.py +++ /dev/null @@ -1,41 +0,0 @@ -import ps - -#kServerGroup = 'all_servers' - -def init_layer(name, weight): - print(ps.myNodeID(), ", this is server ", ps.myRank()) - pass - -def update_layer(name, weight, gradient): - pass - -def worker_node_main(): - print(ps.myNodeID(), ", this is worker ", ps.myRank()) - - print(1) - import owl - print(2) - import sys - print(3) - owl.initialize(sys.argv) - print(4) - cpu = owl.create_cpu_device() - print(5) - gpu = [owl.create_gpu_device(i) for i in range(owl.get_gpu_device_count())] - print(6) - print ''' - __ __ _ __ _ _____ ____ _ _ ___ - / | / | | | | \\ | | | ___| | _ \\ | | / / / | - / |/ | | | | \\| | | |__ | |_| | | | / / / /| | - / /| /| | | | | | | __| | / | |/ / / /_| | - / / | / | | | | | |\\ | | |___ | |\\ \\ | / / ___ | - /_/ |_/ |_| |_| |_| \\__| |_____| |_| \\_\\ |__/ /_/ |_| - ''' - print '[INFO] You have %d GPU devices' % len(gpu) - print '[INFO] Set device to gpu[0]' - print(7)
- - -if __name__ == '__main__': - pass - From f63b5fb38e846067fa8b7c8cf083eb82cf31ae39 Mon Sep 17 00:00:00 2001 From: Hyeontaek Lim Date: Mon, 16 Mar 2015 00:46:16 -0400 Subject: [PATCH 11/14] add .pyc to .gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index daa4ce1..ee45edf 100644 --- a/.gitignore +++ b/.gitignore @@ -9,5 +9,6 @@ *.d *.o *.pb.cc +*.pyc .* /script/van* From e3d4869e193d0d8402aa0ecfa99a969ef94a0010 Mon Sep 17 00:00:00 2001 From: Hyeontaek Lim Date: Tue, 17 Mar 2015 14:18:29 -0400 Subject: [PATCH 12/14] make argc at least 1 for safety --- src/app/python/python_env.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/app/python/python_env.cc b/src/app/python/python_env.cc index fa3c646..6c448f1 100644 --- a/src/app/python/python_env.cc +++ b/src/app/python/python_env.cc @@ -16,6 +16,8 @@ void PythonEnv::load_file(const char* path, int argc, char** argv) { globals_ = boost::python::dict(main_module.attr("__dict__")); // construct a new argument list, with the first one replaced by the script path to enable module imports in the same directory + if (argc == 0) + argc = 1; char* new_argv[argc]; new_argv[0] = const_cast(path); for (int i = 1; i < argc; i++) From 1055996ccd2a1dc0122e52d30091bf6682186b7f Mon Sep 17 00:00:00 2001 From: Hyeontaek Lim Date: Tue, 17 Mar 2015 14:18:49 -0400 Subject: [PATCH 13/14] make -n work (a contributed patch) --- example/python/mnist/mnist_cnn.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/example/python/mnist/mnist_cnn.py b/example/python/mnist/mnist_cnn.py index bf0a9d2..451ad9d 100644 --- a/example/python/mnist/mnist_cnn.py +++ b/example/python/mnist/mnist_cnn.py @@ -230,10 +230,10 @@ def server_update_layer(name, weight, gradient): worker = None def worker_node_init(): global gpu - owl.initialize(sys.argv + ['-no_init_glog']) parser = argparse.ArgumentParser(description='MNIST CNN') - parser.add_argument('-n', '--num', help='number of GPUs to use', action='store', type=int, default=1) + parser.add_argument('-n', '--num', dest='num', help='number of GPUs to use', action='store', type=int, default=1) (args, remain) = parser.parse_known_args() + owl.initialize([sys.argv[0]] + remain + ['-no_init_glog']) assert(1 <= args.num) print 'Using %d GPU(s)' % args.num # PS: for local test From d6be933162dac57671c98e2cc5cb43c472af83ba Mon Sep 17 00:00:00 2001 From: Hyeontaek Lim Date: Tue, 17 Mar 2015 14:25:05 -0400 Subject: [PATCH 14/14] add local test scripts --- example/python/mnist/run_1_1.sh | 25 +++++++++++++++++++++++++ example/python/mnist/run_1_2.sh | 26 ++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) create mode 100755 example/python/mnist/run_1_1.sh create mode 100755 example/python/mnist/run_1_2.sh diff --git a/example/python/mnist/run_1_1.sh b/example/python/mnist/run_1_1.sh new file mode 100755 index 0000000..5582cb3 --- /dev/null +++ b/example/python/mnist/run_1_1.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +BASEDIR=$(dirname $0)/../../.. 
+ +export LD_LIBRARY_PATH=${BASEDIR}/../minerva/deps/lib:${BASEDIR}/third_party/lib:${LD_LIBRARY_PATH} +export PYTHONPATH=${BASEDIR}/../minerva/release/owl:${BASEDIR}/../minerva/owl:$PYTHONPATH +export PYTHONPATH=${BASEDIR}/src/app/python/:$PYTHONPATH + +scheduler="role:SCHEDULER,hostname:'127.0.0.1',port:8000,id:'H'" +W0="role:WORKER,hostname:'127.0.0.1',port:8001,id:'W0'" +S0="role:SERVER,hostname:'127.0.0.1',port:8010,id:'S0'" +arg="-num_servers 1 -num_workers 1 -num_threads 1" +bin=$BASEDIR/build/ps_python +script="$1" +shift + +for PID in $(ps aux | grep build/ps_python | grep -v grep | awk '{ print $2; }'); do + kill -9 $PID +done + + +$bin $arg -scheduler $scheduler -my_node $scheduler & +$bin $arg -scheduler $scheduler -my_node $S0 -script "$script" -- "$@" & +$bin $arg -scheduler $scheduler -my_node $W0 -script "$script" -- "$@" + diff --git a/example/python/mnist/run_1_2.sh b/example/python/mnist/run_1_2.sh new file mode 100755 index 0000000..e869c86 --- /dev/null +++ b/example/python/mnist/run_1_2.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +BASEDIR=$(dirname $0)/../../.. + +export LD_LIBRARY_PATH=${BASEDIR}/../minerva/deps/lib:${BASEDIR}/third_party/lib:${LD_LIBRARY_PATH} +export PYTHONPATH=${BASEDIR}/../minerva/release/owl:${BASEDIR}/../minerva/owl:$PYTHONPATH +export PYTHONPATH=${BASEDIR}/src/app/python/:$PYTHONPATH + +scheduler="role:SCHEDULER,hostname:'127.0.0.1',port:8000,id:'H'" +W0="role:WORKER,hostname:'127.0.0.1',port:8001,id:'W0'" +W1="role:WORKER,hostname:'127.0.0.1',port:8002,id:'W1'" +S0="role:SERVER,hostname:'127.0.0.1',port:8010,id:'S0'" +arg="-num_servers 1 -num_workers 2 -num_threads 1" +bin=$BASEDIR/build/ps_python +script="$1" +shift + +for PID in $(ps aux | grep build/ps_python | grep -v grep | awk '{ print $2; }'); do + kill -9 $PID +done + +$bin $arg -scheduler $scheduler -my_node $scheduler & +$bin $arg -scheduler $scheduler -my_node $S0 -script "$script" -- "$@" & +$bin $arg -scheduler $scheduler -my_node $W0 -script "$script" -- "$@" & +$bin $arg -scheduler $scheduler -my_node $W1 -script "$script" -- "$@" +
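Both run scripts take the example script as their first argument and forward the remaining arguments to it after --, so a one-server/one-worker smoke test looks like run_1_1.sh mnist_mlp.py (paths relative to example/python/mnist; where mnist_all.mat must live depends on mnist_io.py).

The CNN example splits the classic momentum-SGD update into two push/pull round trips: the worker first pushes the scaled raw gradient under a wd_k/bd_k key, which the server folds into a momentum buffer (delta = mom * delta - grad), then pushes the returned buffer under w_k/b_k, which the server simply adds to the weights. A self-contained sketch of that exchange, with a toy in-process stand-in for the server (ToyServer and its store dict are illustrative, not part of the real bindings):

    import numpy as np

    class ToyServer(object):
        """Mimics update_layer() above, keyed by parameter name."""
        def __init__(self, mom=0.75):
            self.mom = mom
            self.store = {}  # name -> ndarray held on the "server"

        def push_grad_and_pull_weight(self, grad, weight, name):
            w = self.store.setdefault(name, weight.copy())
            if name.startswith('wd') or name.startswith('bd'):
                w *= self.mom   # momentum buffer: delta = mom * delta - grad
                w -= grad
            else:
                w += grad       # weights: w += delta
            return w.copy()

    ps = ToyServer()
    w, delta = np.zeros(4), np.zeros(4)
    lr, grad = 0.01, np.ones(4)
    delta = ps.push_grad_and_pull_weight(lr * grad, delta, 'wd_0')  # phase 1
    w = ps.push_grad_and_pull_weight(delta, w, 'w_0')               # phase 2
    print(w)  # first step from zero momentum: w == -lr * grad

Splitting the update this way keeps the server generic: it only scales and accumulates, while the learning rate, weight decay, and multi-GPU gradient averaging all stay on the worker side, as in the pushes issued from train_network().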