Переглянути джерело

Use 'lparallels'. Experimental

Innocenty Enikeew 10 роки тому
батько
коміт
87bf719032
8 змінених файлів з 77 додано та 78 видалено
  1. 1 0
      chatikbot.asd
  2. 42 19
      chatikbot.lisp
  3. 4 7
      forecast.lisp
  4. 5 8
      foursquare.lisp
  5. 4 9
      telegram.lisp
  6. 10 13
      tumblr.lisp
  7. 0 6
      utils.lisp
  8. 11 16
      vk.lisp

+ 1 - 0
chatikbot.asd

@@ -11,6 +11,7 @@
                #:drakma
                #:flexi-streams
                #:local-time
+               #:lparallel
                #:log4cl
                #:plump
                #:sqlite

+ 42 - 19
chatikbot.lisp

@@ -20,7 +20,7 @@
      do (setf *telegram-last-update*
               (max (or *telegram-last-update* 0)
                    (aget "update_id" update)))
-     do (handle-message (aget "message" update))))
+     do (lparallel:future (handle-message (aget "message" update)))))
 
 
 (defun send-response (chat-id response &optional reply-id)
@@ -152,10 +152,11 @@
 (defun send-akb (text)
   (log:info "send-akb: ~A" text)
   (dolist (chat-id *akb-send-to*)
-    (handler-case
-        (telegram-send-message chat-id text
-                               :disable-web-preview 1)
-      (error (e) (log:error e)))))
+    (lparallel:future
+      (handler-case
+          (telegram-send-message chat-id text
+                                 :disable-web-preview 1)
+        (error (e) (log:error e))))))
 
 (defun handle-cmd-akb (chat-id message-id args)
   (log:info "handle-cmd-akb" chat-id message-id args)
@@ -178,11 +179,16 @@
 ;; Finance
 (defun process-rates ()
   (handler-case
-      (let ((ts (local-time:timestamp-to-unix (local-time:now)))
-            (rates (get-rates))
-            (brent (get-brent))
-            (btc (get-btc-e)))
-        (db-add-finance ts (aget "USD/RUB" rates) (aget "EUR/RUB" rates) (aget "GBP/RUB" rates) brent btc))
+      (lparallel:plet ((ts (local-time:timestamp-to-unix (local-time:now)))
+                       (rates (get-rates))
+                       (brent (get-brent))
+                       (btc (get-btc-e)))
+        (db-add-finance ts
+                        (aget "USD/RUB" rates)
+                        (aget "EUR/RUB" rates)
+                        (aget "GBP/RUB" rates)
+                        brent
+                        btc))
     (error (e) (log:error e))))
 
 (defun handle-cmd-rates (chat-id message-id args)
@@ -331,8 +337,9 @@
                       (gethash chat-id checkins)))
               (db-fsq-add-seen id created-at))))
         (loop for chat-id being the hash-keys in checkins using (hash-value texts)
-           do (log:info "Sending checkins" chat-id texts)
-             (telegram-send-message chat-id (format nil "~{~A~^~%~}" texts))))
+           do (lparallel:future
+                (log:info "Sending checkins" chat-id texts)
+                (telegram-send-message chat-id (format nil "~{~A~^~%~}" texts)))))
     (error (e) (log:error e))))
 
 
@@ -421,16 +428,24 @@
       (log:error e)
       (telegram-send-message chat-id "Ошибочка вышла"))))
 
-(defun process-feeds ()
+(defun process-feed (feed)
   (handler-case
-      (dolist (feed (remove-if-not #'need-fetch-p (db-rss-get-active-feeds)))
+      (progn
         (log:info "Fetching new items" (feed-url feed))
         (dolist (item (%fetch-new-items feed))
-          (dolist (chat-id (db-rss-get-feed-chats feed))
-            (telegram-send-message chat-id
-                                   (format-feed-item item)
-                                   :disable-web-preview 1)))
-        (db-rss-update-feed feed)) ;; Update next fetch and period
+          (lparallel:pmapc #'(lambda (chat-id)
+                               (telegram-send-message chat-id
+                                                      (format-feed-item item)
+                                                      :disable-web-preview 1))
+                           (db-rss-get-feed-chats feed)))
+        ;; Update next fetch and period
+        (db-rss-update-feed feed))
+    (error (e) (log:error e))))
+
+(defun process-feeds ()
+  (handler-case
+      (lparallel:pmapc 'process-feed
+                       (remove-if-not #'need-fetch-p (db-rss-get-active-feeds)))
     (error (e) (log:error e))))
 
 
@@ -457,7 +472,15 @@
                       process-rates
                       process-feeds) "Enabled schedules")
 
+(defvar *pool-size* 10 "lparallel pool size")
+
 (defun start ()
+  ;; Stop lparallel kernel if any
+  (when lparallel:*kernel*
+    (lparallel:end-kernel))
+  ;; Start new kernel
+  (setf lparallel:*kernel* (lparallel:make-kernel *pool-size*))
+  
   ;; Clear prev threads
   (mapc #'trivial-timers:unschedule-timer (trivial-timers:list-all-timers))
   (let ((old-updates (find "process-updates"

+ 4 - 7
forecast.lisp

@@ -4,13 +4,10 @@
 (defparameter +forecast-api-url+ "https://api.forecast.io/forecast" "forecast.io API endpoint")
 
 (defun forecast (lat lon &key time (currently t) minutely hourly daily alerts)
-  (handler-case
-      (bordeaux-threads:with-timeout (5)
-        (json-request (format nil
-                              "~A/~A/~A,~A~@[,~A~]?units=si&exclude=~:[currently,~;~]~:[minutely,~;~]~:[hourly,~;~]~:[daily,~;~]~:[alerts,~;~]flags&lang=ru"
-                              +forecast-api-url+ *forecast-api-key* lat lon time
-                              currently minutely hourly daily alerts)))
-    (bordeaux-threads:timeout () (error "Timeout"))))
+  (json-request (format nil
+                        "~A/~A/~A,~A~@[,~A~]?units=si&exclude=~:[currently,~;~]~:[minutely,~;~]~:[hourly,~;~]~:[daily,~;~]~:[alerts,~;~]flags&lang=ru"
+                        +forecast-api-url+ *forecast-api-key* lat lon time
+                        currently minutely hourly daily alerts)))
 
 (defvar *forecast-point-formats*
   '((:current . (:year "-" (:month 2) "-" (:day 2) " " (:hour 2) ":" (:min 2)))

+ 5 - 8
foursquare.lisp

@@ -9,14 +9,11 @@
 
 (defun %fsq-api-call (method &optional params)
   (let* ((resp
-          (handler-case
-              (bordeaux-threads:with-timeout (5)
-                (json-request (format nil *fsq-api-url* method)
-                              :parameters (list*
-                                           (cons "oauth_token" *fsq-access-token*)
-                                           (cons "v" "20150811")
-                                           params)))
-            (bordeaux-threads:timeout () (error "Timeout"))))
+          (json-request (format nil *fsq-api-url* method)
+                        :parameters (list*
+                                     (cons "oauth_token" *fsq-access-token*)
+                                     (cons "v" "20150811")
+                                     params)))
          (meta (aget "meta" resp)))
     (when (not (= 200 (aget "code" meta)))
       (error (format nil "Foursquare API error, code ~A, errorType '~A', errorDetail '~A'"

+ 4 - 9
telegram.lisp

@@ -9,16 +9,11 @@
                                                     (princ-to-string k)
                                                     (if (pathnamep v) v
                                                         (princ-to-string v)))))
-         (timeout (+ 5 (or (cdr (assoc :timeout args))
-                           *telegram-timeout*)))
          (response
-          (handler-case
-              (bordeaux-threads:with-timeout (timeout)
-                (json-request (format nil +telegram-api-format+
-                                      *telegram-token* method)
-                              :method :post
-                              :parameters params))
-            (bordeaux-threads:timeout () (error "Timeout")))))
+          (json-request (format nil +telegram-api-format+
+                                *telegram-token* method)
+                        :method :post
+                        :parameters params)))
     (unless (aget "ok" response)
       (error (aget "description" response)))
     (aget "result" response)))

+ 10 - 13
tumblr.lisp

@@ -21,19 +21,16 @@
   (string-downcase (princ-to-string val)))
 
 (defun tumblr-read (domain &rest args)
-  (handler-case
-      (let ((params (loop for (k v) on args by #'cddr
-                       when v collect (cons (lo-string k) (lo-string v)))))
-        (yason:parse
-         (subseq (flexi-streams:octets-to-string
-                  (bordeaux-threads:with-timeout (*read-timeout*)
-                    (drakma:http-request
-                     (format nil "~A/api/read/json" domain)
-                     :parameters params
-                     :force-binary t))
-                  :external-format :utf-8) 22)
-         :object-as :alist))
-    (bordeaux-threads:timeout (e) (error e))))
+  (let ((params (loop for (k v) on args by #'cddr
+                   when v collect (cons (lo-string k) (lo-string v)))))
+    (yason:parse
+     (subseq (flexi-streams:octets-to-string
+              (drakma:http-request
+               (format nil "~A/api/read/json" domain)
+               :parameters params
+               :force-binary t)
+              :external-format :utf-8) 22)
+     :object-as :alist)))
 
 (defun tumblr-random-post (&key (roll *tumblr-roll*) type (num 1))
   (when roll

+ 0 - 6
utils.lisp

@@ -12,12 +12,6 @@
                (funcall func)
                (setf backoff *backoff-start*))
            (error (e)
-             (log:error e)
-             (log:info "Backing off for" backoff)
-             (sleep backoff)
-             (setf backoff (min *backoff-max*
-                                (* 2 backoff))))
-	   (bordeaux-threads:timeout (e)
              (log:error e)
              (log:info "Backing off for" backoff)
              (sleep backoff)

+ 11 - 16
vk.lisp

@@ -3,22 +3,17 @@
 (defparameter +vk-api-url+ "https://api.vk.com/method/~A?v=5.34" "VK.com API endpoint")
 
 (defun %vk-api-call (method &optional args)
-  (handler-case
-      (bordeaux-threads:with-timeout (5)
-        (let* ((params (loop for (k . v) in args
-                          when v
-                          collect (cons
-                                   (princ-to-string k)
-                                   (princ-to-string v))))
-               (response (json-request (format nil +vk-api-url+ method)
-                                       :method :post
-                                       :parameters params)))
-          (when (aget "error" response)
-            (error (aget "error_msg" (aget "error" response))))
-          (aget "response" response)))
-    (bordeaux-threads:timeout (e)
-      (declare (ignore e))
-      (error "Timeout"))))
+  (let* ((params (loop for (k . v) in args
+                    when v
+                    collect (cons
+                             (princ-to-string k)
+                             (princ-to-string v))))
+         (response (json-request (format nil +vk-api-url+ method)
+                                 :method :post
+                                 :parameters params)))
+    (when (aget "error" response)
+      (error (aget "error_msg" (aget "error" response))))
+    (aget "response" response)))
 
 (defun vk-wall-get (&key owner-id domain offset count filter extended)
   (%vk-api-call "wall.get"