control

0.3.2-SNAPSHOT


A clojure DSL for system admin and deployment with many remote machines

dependencies

org.clojure/clojure
1.3.0
org.clojure/tools.cli
0.2.1

dev dependencies

lein-exec
0.1
lein-marginalia
0.7.0
codox
0.5.0



(this space intentionally left almost blank)
 

A set of DSL for ssh, inspired by Fabric

(ns #^{:doc 
       :author "Sun Ning <classicning@gmail.com>  Dennis Zhuang<killme2008@gmail.com>"}
  control.commands)

modify shell path

(def SEP " ; ")
(defmacro path
  [new-path & cmd]
  `(str "export PATH=" ~new-path ":$PATH"  SEP ~@cmd))

change current directory

(defmacro cd 
  [path & cmd]
  `(str "cd " ~path SEP ~@cmd))

execute a prefix command, for instance, activate shell profile

(defmacro prefix 
  [pcmd & cmd]  
  `(str ~pcmd " && " ~@cmd))

declare a env variable for next command

(defmacro env 
  [key val & cmd]
  `(str ~key "=" ~val " " ~@cmd))

simply run several commands

(defn run
  [ & cmds]
  (let [rt  (apply str cmds)]
    (if (.endsWith rt SEP)
      rt
      (str rt SEP))))

run a command with sudo

(defmacro sudo
  [cmd]
  `(if (.endsWith ~cmd SEP)
     (str "sudo " ~cmd)
     (str "sudo " ~cmd SEP)))

Append a line to a file

(defn append
  [file line & opts]
  (let [m (apply hash-map opts)
        escaple (:escaple m)
        sudo (:sudo m)]
    (if sudo
      (str "echo '" line "' | sudo tee -a " file SEP) 
      (str "echo '" line "' >> " file SEP))))
(defn sed-
  [file before after flags backup limit]
  (str "sed -i" backup " -r -e \ limit " s/"  before "/" after "/" flags "\" " file SEP))

Use sed to replace strings matched pattern with options.Valid options include: :sudo => true or false to use sudo,default is false. :flags => sed options,default is nil. :limit => sed limit,default is not limit. :backup => backup file posfix,default is ".bak" Equivalent to sed -i -r -e "// s///g ".

(defn sed
  [file before after & opts]
  (let [opts (apply hash-map opts)
        use-sudo (:sudo opts)
        flags (str (:flags opts) "g")
        backup (or (:backup opts) ".bak")
        limit (:limit opts)]
    (if use-sudo
      (sudo (sed- file before after flags backup limit))
      (sed- file before after flags backup limit))))

Comments a line in a file with special character,default :char is "#" It use sed function to replace the line matched pattern, :sudo is also valid

(defn  comm
  [file pat & opts]
  (let [m (apply hash-map opts)
        char  (or (:char m) "#")]
    (apply sed file pat (str char "&") opts)))

uncomment a line in a file

(defn  uncomm
  [file pat & opts]
  (let [m (apply hash-map opts)
        char  (or (:char m) "#")]
    (apply sed file (str "\\s*" char "+\\s*(" pat ")") "\\1" opts)))

cat a file

(defn cat
  [file]
  (str "cat " file))

chmod [mod] [file]

(defn chmod
  [mod file]
  (str "chmod " mod " " file SEP))
 
(ns control.core
  (:use [clojure.java.io :only [reader]]
        [clojure.java.shell :only [sh]]
        [clojure.string :only [join blank?]]
        [clojure.walk :only [walk postwalk]]))
(def ^:dynamic *enable-color* true)
(def ^{:dynamic true} *enable-logging* true)
(def ^:dynamic *debug* false)
(def ^:private bash-reset "\033[0m")
(def ^:private bash-bold "\033[1m")
(def ^:private bash-redbold "\033[1;31m")
(def ^:private bash-greenbold "\033[1;32m")
(defmacro cli-bash-bold [& content]
  `(if *enable-color*
     (str bash-bold ~@content bash-reset)
     (str ~@content)))
(defmacro cli-bash-redbold [& content]
  `(if *enable-color*
     (str bash-redbold ~@content bash-reset)
     (str ~@content)))
(defmacro cli-bash-greenbold [& content]
  `(if *enable-color*
     (str bash-greenbold ~@content bash-reset)
     (str ~@content)))
(defstruct ExecProcess  :stdout :stderr :status)
(defn gen-log [host tag content]
  (str (cli-bash-redbold host ":")
       (cli-bash-greenbold tag ": ")
       (join " " content)))
(defn log-with-tag [host tag & content]
  (if (and *enable-logging* (not (blank? (join " " content))))
    (println (gen-log host tag content))))
(defn- not-nil? [obj]
  (not (nil? obj)))
(defn  ^:dynamic  exec [host user cmdcol]
  (let [rt (apply sh (filter not-nil? cmdcol))
        status (:exit rt)
        stdout (:out rt)
        stderr (:err rt)
        execp (struct-map ExecProcess :stdout stdout :stderr stderr :status status)]
    (log-with-tag host "stdout" (:stdout execp))
    (log-with-tag host "stderr" (:stderr execp))
    (log-with-tag host "exit" status)
    execp))
(defn ssh-client [host user]
  (str user "@" host))
(defn- user-at-host? [host user]
  (fn [m]
    (and (= (:user m) user) (= (:host m) host))))
(defn- find-client-options [host user cluster sym]
  (let [m (first (filter (user-at-host? host user) (:clients cluster)))]
    (or (sym m) (sym cluster))))
(defn- make-cmd-array
  [cmd options others]
  (if (vector? options)
    (concat (cons cmd options) others)
    (cons cmd (cons options others))))

Execute commands via ssh: (ssh "date") (ssh "ps aux|grep java")

(defn ssh
  [host user cluster cmd & opts]
  (let [m (apply hash-map opts)
        sudo (:sudo m)
        cmd (if sudo
              (str "sudo " cmd)
              cmd)
        ssh-options (or (:ssh-options opts) (find-client-options host user cluster :ssh-options))]
	(log-with-tag host "ssh" ssh-options cmd)
	(exec host
          user
          (make-cmd-array "ssh"
                          ssh-options
                          [(ssh-client host user) cmd]))))
(defn rsync [host user cluster src dst]
  (let [rsync-options (find-client-options host user cluster :rsync-options)]
    (log-with-tag host "rsync" rsync-options (str src " ==>" dst))
    (exec host
          user
          (make-cmd-array "rsync"
                          rsync-options
                          [src (str (ssh-client host user) ":" dst)]))))
(def ^{:dynamic true} *tmp-dir* nil)

Copy local files to remote machines: (scp "test.txt" "remote.txt") (scp ["1.txt" "2.txt"] "/home/deploy/")

(defn scp
  [host user cluster local remote & opts]
  (let [files (if (coll? local)
                (vec local)
                [local])
        m (apply hash-map opts)
        scp-options (or (:scp-options m) (find-client-options host user cluster :scp-options))
        mode (:mode m)
        sudo (:sudo m)
        use-tmp (or sudo mode)
        tmp (if use-tmp
              (or *tmp-dir* (str "/tmp/control-" (System/currentTimeMillis) "/"))
              remote)]
    (log-with-tag host "scp" scp-options
      (join " " (concat files [ " ==> " tmp])))
    (if use-tmp
      (ssh host user cluster (str "mkdir -p " tmp)))
    (let [rt (exec host
                   user
                   (make-cmd-array "scp"
                                   scp-options
                                   (concat files [(str (ssh-client host user) ":" tmp)])))]
      (if mode
        (apply ssh host user cluster (str "chmod " mode  " " tmp "*") opts))
      (if use-tmp
        (apply ssh host user cluster (str "mv "  tmp "* " remote " ; rm -rf " tmp) opts)
        rt))))

All tasks defined in control file

(defonce tasks (atom (hash-map)))

All clusters defined in control file

(defonce clusters (atom (hash-map)))
(def ^:private system-functions
  #{(symbol "scp") (symbol "ssh") (symbol "rsync") (symbol "call") (symbol "exists?")})

Define a task for executing on remote machines: (deftask :date "Get date from remote machines" (ssh "date"))

(defmacro
  ^{:doc 
    :arglists '([name doc-string? [params*] body])
    :added "0.1"}
  deftask [name & decl ]
  (let [m (if (string? (first decl))
            (next decl)
            decl)
        arguments (first m)
        body (next m)
        new-body (postwalk (fn [item]
                             (if (list? item)
                               (let [cmd (first item)]
                                 (if (cmd system-functions)
                                   (concat (list cmd  'host 'user 'cluster) (rest item))
                                   item))
                               item))
                           body)]
    (if (not (vector? arguments))
      (throw (IllegalArgumentException. "Task must have arguments even if []")))
    (if *debug*
      (prn name "new-body:" new-body))
    `(swap! tasks
            assoc
            ~name
            ~(list 'fn
                   (vec (concat '[host user cluster] arguments))
                   (cons 'do new-body)))))

Call other tasks in deftask,for example: (call :ps "java")

(defn call
  [host user cluster task & args]
  (apply
   (task @tasks)
   host user cluster args))

Check if a file exists

(defn exists?
  [host user cluster file]
  (= (:status (ssh host user cluster (str "test -e " file))) 0))
(defn- unquote-cluster [args]
  (walk (fn [item]
          (cond (and (seq? item) (= `unquote (first item)))
                ,(second item)
                (or (seq? item) (symbol? item))
                ,(list 'quote item)
                :else
                ,(unquote-cluster item)))
        identity
        args))

Define a cluster including some remote machines

(defmacro
  ^{:doc 
    :arglists '([name & options])
    :added "0.1"}
  defcluster [name & args]
  `(let [m# (apply hash-map ~(cons 'list (unquote-cluster args)))]
     (swap! clusters assoc ~name (assoc m# :name ~name))))
(defmacro when-exit
  ([test error]
     `(when-exit ~test ~error nil))
  ([test error else]
     `(if ~test
        (do (println ~error) (throw (RuntimeException. ~error)))
        ~else)))
(defn- perform [host user cluster task taskName arguments]
  (do (if *enable-logging* (println (cli-bash-bold "Performing " (name taskName) " for " host)))
      (apply task host user cluster arguments)))
(defn- arg-count [f]
  (let [m (first (filter #(= (.getName %) "invoke") (.getDeclaredMethods (class f))))
        p (when m (.getParameterTypes m))]
    (if p
      (alength p)
      3)))
(defn do-begin [args]
  (when-exit (< (count args) 2)
             "Please offer cluster and task name"
             (let [cluster-name (keyword (first args))
                   task-name (keyword (second args))
                   task-args (next (next args))
                   cluster (cluster-name @clusters)
                   parallel (:parallel cluster)
                   user (:user cluster)
                   addresses (:addresses cluster)
                   clients (:clients cluster)
                   task (task-name @tasks)
                   includes (:includes cluster)
                   debug (:debug cluster)
                   log (:log cluster)]
               (when-exit (nil? task)
                          (str "No task named " (name task-name)))
               (when-exit (and (empty? addresses)
                               (empty? includes)
                               (empty? clients))
                          (str "Empty hosts for cluster "
                               (name cluster-name)))
               (let [task-arg-count (- (arg-count task) 3)]
                 (when-exit (> task-arg-count (count task-args))
                            (str "Task "
                                 (name task-name)
                                 " just needs "
                                 task-arg-count
                                 " arguments")))
               (binding [*enable-logging* (if (nil? log) true log)
                         *debug* debug]
                 (if *enable-logging*
                   (println  (str bash-bold
                                  "Performing "
                                  (name cluster-name)
                                  bash-reset
                                  (if parallel
                                    " in parallel"))))
                 (let [map-fn (if parallel pmap map)
                       a (doall (map-fn (fn [addr] [addr (perform addr user cluster task task-name task-args)])
                                        addresses))
                       c (doall (map-fn (fn [cli] [(:host cli) (perform (:host cli) (:user cli) cluster task task-name task-args)])
                                        clients))]
                   (merge (into {} (concat a c))
                          (when includes
                            (if (coll? includes)
                              (mapcat #(do-begin (cons % (next args))) includes)
                              (do-begin (cons (name includes) (next args)))))))))))
(defn begin []
  (do-begin *command-line-args*)
  (shutdown-agents))
 
(ns control.main
  (:use [control.core])
  (:use [control.commands]))
(defn- load-control-file [filename]
  (try 
    (binding [*ns* (the-ns 'control.main)]
      (load-file filename))
  (catch java.io.FileNotFoundException e (println "control file not found."))))
(defn -main
  [file & args]
  (do
    (load-control-file file)
    (do-begin args)))
 
(ns leiningen.control
  (:use [control.core :only [do-begin clusters]]
        [clojure.string :only [join]]
        [clojure.tools.cli :only [cli]]
        [leiningen.help :only [help-for]]
        [clojure.java.io :only [file reader writer]]))
(defn- get-config [project key]
  (get-in project [:control key]))
(defn- create-control-ns []
  (create-ns (gensym "user-control")))
(defn- load-control-file [project]
  (try 
    (binding [*ns* (create-control-ns)]
      (refer-clojure)
      (use '[control core commands])
      (load-file
       (or 
        (get-config project :control-file)
        "./control.clj")))
    (catch java.io.FileNotFoundException e (println "control file not found."))))
(defn- run-control [project args]
  (do
    (load-control-file project)
    (do-begin args)))
(defn- handle-conn
  [^java.net.Socket socket]
  (with-open [s socket
              rdr (reader socket)
              wtr (writer socket)]
    (try
      (let [line (.readLine rdr)
            args (seq (.split line " "))
            rt (with-out-str 
                 (do-begin (vec args)))]
        (spit wtr rt))
      (catch Throwable e
        (spit wtr (.getMessage e))))))

Start a control server for handling requests: -p [--port] port , listen on which port

(defn server
  [project & args]
  (load-control-file project)
  (let [[options _ banner]
        (cli args
             ["-p" "--port" "Which port to listen on." :default 8123
              :parse-fn #(Integer/parseInt %)])
        {:keys [port]} options
        ^java.net.ServerSocket ss (java.net.ServerSocket. port)
        server (agent ss)]
    (set-error-handler! server
                        (fn [_ e]
                          (.printStackTrace e)))
    (send-off server
              (fn [^java.net.ServerSocket ss]
                (let [s (.accept ss)]
                  (handle-conn s)
                  (recur ss))))
    (await server)))

Initialize clojure-control, create a sample control file in project home

(defn init
  [project & args]
  (let [control-file (file "." "control.clj")]
    (if-not (.exists control-file)
      (spit control-file 
            (str 
             "(defcluster :default-cluster\n"
             "  :clients [\n"
             "    {:host \"localhost\" :user \"root\"}\n"
             "  ])\n"
             "\n"
             "(deftask :date \"echo date on cluster\""
             "  []\n"
             "  (ssh \"date\"))\n")))))

Run user-defined clojure-control tasks against certain cluster, -r [--[no-]remote],running commands on remote control server,default is false. -p [--port] port, control server port, -h [--host] host, control server host.

(defn run
  [project & args]
  (let [[options extra-args]
        (cli args
             ["-p" "--port" "Which port to connect." :default 8123 :parse-fn #(Integer/parseInt %)]
             ["-r" "--[no-]remote" :default false]
             [ "-h" "--host" "Which host to connect." :default "localhost"])
        {:keys [port host remote]} options]
    (if-not remote
      (run-control project args)
      (let [^java.net.Socket sock (java.net.Socket. host port)]
        (.setTcpNoDelay sock true)
        (let [rdr (reader sock)
              wtr (writer sock)]
          (.write ^java.io.Writer wtr (str (join " " extra-args) "\n"))
          (.flush wtr)
          (println (slurp rdr))
          (.close wtr)
          (.close rdr)
          (.close sock))))))

Show cluster info

(defn show
  [project & args]
  (do 
    (load-control-file project)
    (if-let [cluster-name (first args)]
      (let [user (:user ((keyword cluster-name) @clusters))]
        (doseq
            [c (:clients ((keyword cluster-name) @clusters))]
          (println (str (:user c) "@" (:host c))))
        (doseq
            [a (:addresses ((keyword cluster-name) @clusters))]
          (println (str user "@" a)))))))

Leiningen plugin for Clojure-Control

(defn control
  {:help-arglists '([subtask [cluster task [args...]]])
   :subtasks [#'init #'run #'show #'server]}
  ([project]
     (println (help-for "control")))
  ([project subtask & args]
     (case subtask
           "init" (apply init project args)
           "run" (apply run project args)
           "show" (apply show project args)
           "server" (apply server project args)
           (println (help-for "control")))))