1
0
فهرست منبع

RSS in sqlite, lastrss

Innocenty Enikeew 10 سال پیش
والد
کامیت
daed24c287
4 فایل‌های تغییر یافته به همراه 194 افزوده شده و 116 حذف شده
  1. 65 83
      chatikbot.lisp
  2. 98 6
      db.lisp
  3. 30 26
      rss.lisp
  4. 1 1
      utils.lisp

+ 65 - 83
chatikbot.lisp

@@ -75,6 +75,7 @@
               (:checkins (handle-cmd-checkins chat-id id args))
               (:rss (handle-cmd-rss chat-id id args))
               (:feeds (handle-cmd-feeds chat-id id args))
+              (:lastrss (handle-cmd-last-rss chat-id id args))
               (otherwise (handle-admin-cmd chat-id text cmd args))))
           (send-dont-understand chat-id (preprocess-input text))))
     (when location
@@ -336,8 +337,19 @@
 
 
 ;; RSS
-(defvar *rss-feeds* nil "All aggragated RSS feeds")
-(defvar *rss-chat-feeds* (make-hash-table) "Chat->Feeds mapping")
+(defun handle-cmd-feeds (chat-id message-id args)
+  "Handle the :feeds command: look for RSS links at the URL given as the first
+element of ARGS and send the result to CHAT-ID.
+Without arguments the reply asks for a URL. Any error is logged and answered
+with a generic error message."
+  (log:info "handle-cmd-feeds" chat-id message-id args)
+  (handler-case
+      (let ((reply
+             (cond
+               ((null args) "URL давай")
+               ;; The directive prints a fallback text when FIND-RSS-LINKS
+               ;; returns NIL, otherwise one "A - B" line per returned entry.
+               (t (format nil "~:[Не нашел RSS там~;~:*~{~{~A - ~A~}~^~%~}~]"
+                          (find-rss-links (first args)))))))
+        (telegram-send-message chat-id reply :disable-web-preview 1))
+    (error (e)
+      (log:error e)
+      (telegram-send-message chat-id "Ошибочка вышла"))))
 
 (defun %send-feeds (chat-id feeds)
   (telegram-send-message
@@ -350,42 +362,26 @@
                   append (list index (feed-title feed) (feed-url feed)))))
    :disable-web-preview 1))
 
+(defun %fetch-new-items (feed)
+  "Refresh FEED, store every fetched item that DB-RSS-ITEM-EXISTS does not
+report as known, save the feed row and return the list of those new items."
+  (let ((new-items (refresh-feed feed #'db-rss-item-exists)))
+    (mapc #'db-rss-add-item new-items)
+    ;; Reached only when every insert above returned normally.
+    (db-rss-update-feed feed)
+    new-items))
+
 (defun %get-feed (url)
   (when url
-    (or (find url *rss-feeds* :key #'feed-url :test #'equal)
+    (or (db-rss-get-feed-by-url url)
         (alexandria:when-let (feed (build-feed url))
           (log:info "Added feed" feed)
-          (fetch-new-items feed)
-          (push feed *rss-feeds*)
+          (db-rss-add-feed feed)
+          (%fetch-new-items feed)
           feed))))
 
-(defun %used-feed-p (feed)
-  (loop for feeds being the hash-values in *rss-chat-feeds*
-     when (member feed feeds)
-     do (return t)))
-
-(defun %refresh-feeds ()
-  (setf *rss-feeds*
-        (remove-if-not #'%used-feed-p *rss-feeds*)))
-
-(defun handle-cmd-feeds (chat-id message-id args)
-  (log:info "handle-cmd-feeds" chat-id message-id args)
-  (handler-case
-      (telegram-send-message
-       chat-id
-       (if (null args)
-           "URL давай"
-           (format nil "~:[Не нашел RSS там~;~:*~{~{~A - ~A~}~^~%~}~]"
-                   (find-rss-links (car args))))
-       :disable-web-preview 1)
-    (error (e)
-      (log:error e)
-      (telegram-send-message chat-id "Ошибочка вышла"))))
-
 (defun handle-cmd-rss (chat-id message-id args)
   (log:info "handle-cmd-rss" chat-id message-id args)
   (handler-case
-      (let ((feeds (gethash chat-id *rss-chat-feeds*)))
+      (let ((feeds (db-rss-get-chat-feeds chat-id)))
         (if (null args)
             (%send-feeds chat-id feeds)
             (progn
@@ -398,43 +394,46 @@
                     (alexandria:when-let (feed (%get-feed
                                                 (or (cadar (find-rss-links url))
                                                     url)))
-                      (if (member feed feeds)
-                          (setf feeds (remove feed feeds))
-                          (push feed feeds))))
-                  (error (e) (log:error e))))                
-              (setf (gethash chat-id *rss-chat-feeds*) feeds)
-              (%refresh-feeds)
-              (save-settings)
+                      (let ((existing (find (feed-url feed) feeds :key #'feed-url :test #'equal)))
+                        (if existing
+                            (setf feeds (remove existing feeds))
+                            (push feed feeds)))))
+                  (error (e) (log:error e))))
+              (log:info feeds)
+              (db-rss-set-chat-feeds chat-id feeds)
               (%send-feeds chat-id feeds))))
     (error (e)
       (log:error e)
       (telegram-send-message chat-id "Ошибочка вышла"))))
 
-(defun %feed-send-to (feed)
-  (loop for chat-id being the hash-keys in *rss-chat-feeds* using (hash-value feeds)
-     when (member feed feeds)
-     collect chat-id))
+(defun handle-cmd-last-rss (chat-id message-id args)
+  "Handle the :lastrss command: send the latest stored items of one chat feed.
+ARGS is the list of command arguments:
+  - no arguments: reply with the chat's feed list (via %SEND-FEEDS);
+  - first argument: 1-based position of the feed in the chat's feed list;
+  - optional second argument: number of items to send (10 when omitted,
+    never fewer than 1).
+A position that does not point at one of the chat's feeds is also answered
+with the feed list. Any error (for example an argument PARSE-INTEGER rejects)
+is logged and answered with a generic error message."
+  (log:info "handle-cmd-last-rss" chat-id message-id args)
+  (handler-case
+      (let* ((feeds (db-rss-get-chat-feeds chat-id))
+             (idx (when args (1- (parse-integer (car args)))))
+             ;; NTH signals an error for a negative index and returns NIL past
+             ;; the end of the list, so check the range before the lookup.
+             (feed (when (and idx (<= 0 idx))
+                     (nth idx feeds))))
+        (if (null feed)
+            (%send-feeds chat-id feeds)
+            ;; SQLite treats a negative LIMIT as "no upper bound" and LIMIT 0
+            ;; returns no rows, so never ask for fewer than one item.
+            (let* ((limit (if (> (length args) 1)
+                              (max 1 (parse-integer (second args)))
+                              10))
+                   (items (db-rss-last-feed-items feed limit)))
+              (telegram-send-message chat-id
+                                     (format nil "~{~A~^~%~%~}"
+                                             (mapcar #'format-feed-item items))
+                                     :disable-web-preview 1))))
+    (error (e)
+      (log:error e)
+      (telegram-send-message chat-id "Ошибочка вышла"))))
 
 (defun process-feeds ()
   (handler-case
-      (dolist (feed (remove-if-not #'need-fetch-p *rss-feeds*))
+      (dolist (feed (remove-if-not #'need-fetch-p (db-rss-get-active-feeds)))
         (log:info "Fetching new items" (feed-url feed))
-        (dolist (item (fetch-new-items feed))
-          (dolist (chat-id (%feed-send-to feed))
+        (dolist (item (%fetch-new-items feed))
+          (dolist (chat-id (db-rss-get-feed-chats feed))
             (telegram-send-message chat-id
-                                   (format-feed-item feed item)
+                                   (format-feed-item item)
                                    :disable-web-preview 1))))
     (error (e) (log:error e))))
 
-(defun %load-rss-feeds (alist)
-  (alexandria:alist-hash-table
-   (loop for (chat-id . urls) in alist
-      collect (cons chat-id (mapcar #'%get-feed urls)))))
-
-(defun %save-rss-feeds ()
-  (loop for chat-id being the hash-keys in *rss-chat-feeds* using (hash-value feeds)
-     collect (cons chat-id (mapcar #'feed-url feeds))))
-
 
 (defvar *save-settings-lock* (bordeaux-threads:make-lock "save-settings-lock")
   "Lock for multithreading access to write settings file")
@@ -450,11 +449,15 @@
       (write
        `(setf *chat-locations* ',*chat-locations*
               *akb-send-to* ',*akb-send-to*
-              *akb-last-id* ,*akb-last-id*
-              *rss-chat-feeds* (%load-rss-feeds ',(%save-rss-feeds)))
+              *akb-last-id* ,*akb-last-id*)
        :stream s)
       (values))))
 
+(defvar *schedules*
+  (list 'process-latest-akb
+        'process-latest-checkins
+        'process-rates
+        'process-feeds)
+  "Names of the functions that START registers with the cron scheduler.")
+
 (defun start ()
   ;; Clear prev threads
   (mapc #'trivial-timers:unschedule-timer (trivial-timers:list-all-timers))
@@ -471,34 +474,13 @@
                                                 (asdf:find-system '#:chatikbot)))))
     (load file))
   ;; Start timers
-  (clon:schedule-function
-   (lambda () (process-latest-akb))
-   (clon:make-scheduler
-    (clon:make-typed-cron-schedule :minute '* :hour '*)
-    :allow-now-p t)
-   :name 'process-latest-akb
-   :thread t)
-  (clon:schedule-function
-   (lambda () (process-latest-checkins))
-   (clon:make-scheduler
-    (clon:make-typed-cron-schedule :minute '* :hour '*)
-    :allow-now-p t)
-   :name 'process-latest-checkins
-   :thread t)
-  (clon:schedule-function
-   (lambda () (process-rates))
-   (clon:make-scheduler
-    (clon:make-typed-cron-schedule :minute '* :hour '*)
-    :allow-now-p t)
-   :name 'process-rates
-   :thread t)
-  (clon:schedule-function
-   (lambda () (process-feeds))
-   (clon:make-scheduler
-    (clon:make-typed-cron-schedule :minute '* :hour '*)
-    :allow-now-p t)
-   :name 'process-feeds
-   :thread t)
+  (dolist (func *schedules*)
+    (clon:schedule-function func
+                            (clon:make-scheduler
+                             (clon:make-typed-cron-schedule :minute '* :hour '*)
+                             :allow-now-p t)
+                            :name func
+                            :thread t))
   ;; Start getUpdates thread
   (bordeaux-threads:make-thread
    (lambda ()

+ 98 - 6
db.lisp

@@ -9,10 +9,11 @@
 
 (defmacro with-db ((db) &body body)
   `(sqlite:with-open-database (,db (db-path) :busy-timeout 10)
+     (sqlite:execute-non-query ,db "PRAGMA foreign_keys = ON")
      ,@body))
 
 (defun db-init ()
-  (sqlite:with-open-database (db (db-path))
+  (with-db (db)
     ;; Finance
     (sqlite:execute-non-query db "create table if not exists finance (ts, usd, eur, gbp, brent)")
     (sqlite:execute-non-query db "create index if not exists fin_ts_ids on finance (ts)")
@@ -21,18 +22,27 @@
     (sqlite:execute-non-query db "create table if not exists fsq_chat_users (chat_id, user_id)")
     (sqlite:execute-non-query db "create index if not exists fsq_chat_users_chat_idx on fsq_chat_users (chat_id)")
     (sqlite:execute-non-query db "create index if not exists fsq_chat_users_user_idx on fsq_chat_users (user_id)")
-
     (sqlite:execute-non-query db "create table if not exists fsq_seen (id, created_at)")
-    (sqlite:execute-non-query db "create index if not exists fsq_seen_idx on fsq_seen (id)")))
+    (sqlite:execute-non-query db "create index if not exists fsq_seen_idx on fsq_seen (id)")
+
+    ;; RSS
+    (sqlite:execute-non-query db "create table if not exists rss_feeds (id INTEGER PRIMARY KEY, url, title, next_fetch, period)")
+    (sqlite:execute-non-query db "create unique index if not exists rss_feeds_url_idx on rss_feeds (url)")
+    (sqlite:execute-non-query db "create table if not exists rss_items (id INTEGER PRIMARY KEY, feed_id REFERENCES rss_feeds, guid, link, title, published)")
+    (sqlite:execute-non-query db "create index if not exists rss_items_idx on rss_items (feed_id, guid)")
+
+    (sqlite:execute-non-query db "create table if not exists rss_chat_feeds (chat_id, feed_id REFERENCES rss_feeds)")
+    (sqlite:execute-non-query db "create index if not exists rss_chat_feeds_chat_idx on rss_chat_feeds (chat_id)")
+    (sqlite:execute-non-query db "create index if not exists rss_chat_feeds_feed_idx on rss_chat_feeds (feed_id)")))
 
 ;; Finance
 (defun db-add-finance (ts usd eur gbp brent)
-  (sqlite:with-open-database (db (db-path) :busy-timeout 10)
+  (with-db (db)
     (sqlite:execute-non-query db "insert into finance (ts, usd, eur, gbp, brent) values (?, ?, ?, ?, ?)"
                               ts usd eur gbp brent)))
 
 (defun db-get-last-finance ()
-  (sqlite:with-open-database (db (db-path) :busy-timeout 10)
+  (with-db (db)
     (sqlite:execute-one-row-m-v db "select ts, usd, eur, gbp, brent from finance order by ts desc limit 1")))
 
 (defun %finance-alist (statement)
@@ -48,7 +58,7 @@
   (let ((sql (format nil
                      "select ts/~a*~a~:[~;,avg(usd) as usd~]~:[~;,avg(eur) as eur~]~:[~;,avg(gbp) as gbp~]~:[~;,avg(brent) as brent~] from finance where ts >= ? group by ts/~a order by ts"
                      avg avg usd eur gbp brent avg)))
-    (sqlite:with-open-database (db (db-path) :busy-timeout 10)
+    (with-db (db)
       (loop
          with statement = (sqlite:prepare-statement db sql)
          initially (sqlite:bind-parameter statement 1 (local-time:timestamp-to-unix after-ts))
@@ -83,3 +93,85 @@
 (defun db-fsq-last-created ()
   (with-db (db)
     (sqlite:execute-single db "select created_at from fsq_seen order by created_at desc limit 1")))
+
+
+;; RSS
+(defun %make-feed (row)
+  "Build a FEED struct from ROW, a list (id url title next_fetch period).
+Returns NIL when ROW is NIL. A non-NIL next_fetch is converted from unix time
+to a local-time timestamp."
+  (unless (null row)
+    (let ((next-fetch (fourth row)))
+      (make-feed :id (first row)
+                 :url (second row)
+                 :title (third row)
+                 :next-fetch (and next-fetch
+                                  (local-time:unix-to-timestamp next-fetch))
+                 :period (fifth row)))))
+
+(defun db-rss-get-feed-by-url (url)
+  "Return the stored feed whose url equals URL as a FEED struct, or NIL when
+no such row exists."
+  (with-db (db)
+    (let ((rows (sqlite:execute-to-list
+                 db
+                 "select id, url, title, next_fetch, period from rss_feeds where url = ?"
+                 url)))
+      ;; Only the first matching row is used.
+      (%make-feed (first rows)))))
+
+(defun db-rss-add-feed (feed)
+  "Insert FEED as a new row of rss_feeds, store the generated row id in FEED's
+id slot and return FEED."
+  (with-db (db)
+    (let ((next-fetch (feed-next-fetch-unix feed)))
+      (sqlite:execute-non-query
+       db
+       "insert into rss_feeds (url, title, next_fetch, period) values (?, ?, ?, ?)"
+       (feed-url feed) (feed-title feed) next-fetch (feed-period feed)))
+    ;; Read on the same connection that performed the insert.
+    (setf (feed-id feed) (sqlite:last-insert-rowid db)))
+  feed)
+
+(defun db-rss-update-feed (feed)
+  "Write FEED's title, next fetch time and period to the rss_feeds row whose
+id is FEED's id."
+  (with-db (db)
+    (let ((next-fetch (feed-next-fetch-unix feed)))
+      (sqlite:execute-non-query
+       db
+       "update rss_feeds set title = ?, next_fetch = ?, period = ? where id = ?"
+       (feed-title feed) next-fetch (feed-period feed) (feed-id feed)))))
+
+(defun db-rss-get-active-feeds ()
+  "Return a FEED struct for every rss_feeds row that at least one
+rss_chat_feeds row refers to."
+  (with-db (db)
+    (loop for row in (sqlite:execute-to-list
+                      db
+                      "select id, url, title, next_fetch, period from rss_feeds where exists (select 1 from rss_chat_feeds where feed_id=id)")
+          collect (%make-feed row))))
+
+(defun db-rss-get-chat-feeds (chat-id)
+  "Return a FEED struct for every feed that rss_chat_feeds links to CHAT-ID."
+  (with-db (db)
+    (loop for row in (sqlite:execute-to-list
+                      db
+                      "select id, url, title, next_fetch, period from rss_feeds where id in (select feed_id from rss_chat_feeds where chat_id  = ?)"
+                      chat-id)
+          collect (%make-feed row))))
+
+(defun db-rss-get-feed-chats (feed)
+  "Return the chat ids that rss_chat_feeds links to FEED, passed through FLATTEN."
+  (let ((rows (with-db (db)
+                (sqlite:execute-to-list
+                 db
+                 "select chat_id from rss_chat_feeds where feed_id = ?"
+                 (feed-id feed)))))
+    ;; The query selects a single column; FLATTEN is applied after the
+    ;; database has been closed again.
+    (flatten rows)))
+
+(defun db-rss-set-chat-feeds (chat-id feeds)
+  "Replace the feed subscriptions of CHAT-ID with FEEDS.
+Inside one transaction all rss_chat_feeds rows of the chat are deleted and
+one row per element of FEEDS is inserted."
+  (with-db (db)
+    (sqlite:with-transaction db
+      (sqlite:execute-non-query db "delete from rss_chat_feeds where chat_id = ?" chat-id)
+      (loop for feed in feeds
+            for feed-id = (feed-id feed)
+            do (sqlite:execute-non-query
+                db
+                "insert into rss_chat_feeds (chat_id, feed_id) values (?, ?)"
+                chat-id feed-id)))))
+
+(defun db-rss-item-exists (item)
+  "Return the id of a stored rss_items row that has ITEM's feed id and guid,
+or NIL when there is none."
+  (with-db (db)
+    (let ((feed-id (feed-id (feed-item-feed item)))
+          (guid (feed-item-guid item)))
+      (sqlite:execute-single
+       db
+       "select id from rss_items where feed_id = ? and guid = ? limit 1"
+       feed-id guid))))
+
+(defun db-rss-add-item (item)
+  "Insert ITEM into rss_items: its feed's id, guid, link, title and the
+published time as unix time (NIL when ITEM has no published time)."
+  (with-db (db)
+    (let ((feed-id (feed-id (feed-item-feed item)))
+          (guid (feed-item-guid item))
+          (link (feed-item-link item))
+          (title (feed-item-title item))
+          (published (feed-item-published-unix item)))
+      (sqlite:execute-non-query
+       db
+       "insert into rss_items (feed_id, guid, link, title, published) values (?, ?, ?, ?, ?)"
+       feed-id guid link title published))))
+
+(defun %make-feed-item (feed row)
+  "Build a FEED-ITEM belonging to FEED from ROW, a list (guid link title published).
+Returns NIL when ROW is NIL. A non-NIL published value is converted from unix
+time to a local-time timestamp."
+  (unless (null row)
+    (let ((published (fourth row)))
+      (make-feed-item :feed feed
+                      :guid (first row)
+                      :link (second row)
+                      :title (third row)
+                      :published (and published
+                                      (local-time:unix-to-timestamp published))))))
+
+(defun db-rss-last-feed-items (feed &optional (limit 10))
+  "Return stored items of FEED as FEED-ITEM structs, ordered by published
+time descending and then id descending; LIMIT (default 10) is passed to the
+query's LIMIT clause."
+  (with-db (db)
+    (loop for row in (sqlite:execute-to-list
+                      db
+                      "select guid, link, title, published from rss_items where feed_id = ? order by published desc, id desc limit ?"
+                      (feed-id feed) limit)
+          collect (%make-feed-item feed row))))

+ 30 - 26
rss.lisp

@@ -1,11 +1,11 @@
 (in-package #:chatikbot)
 
-(defstruct feed url title seen-guids next-fetch (period 300))
-(defstruct feed-item guid link title description published)
+(defstruct feed id url title seen-guids next-fetch (period 300))
+(defstruct feed-item feed guid link title description published)
 
 (defparameter *rss-min-period* 60 "Min rss refresh period in seconds")
 (defparameter *rss-max-period* 1800 "Max rss refresh period in seconds")
-(defparameter *rss-change-rate* 0.1 "Refresh period adjustment rata")
+(defparameter *rss-change-rate* 0.1 "Refresh period adjustment rate")
 
 (defun find-rss-links (url)
   (handler-case
@@ -24,30 +24,25 @@
     (alexandria:when-let (rss (car (get-by-tag root "rss")))
       (make-feed :url url :title (child-text rss "title")))))
 
-(defun adjust-period (feed coeff)
+(defun adjust-period (period new-items)
   "Adjust the period of feed based on whenever there were new items. With clamping"
-  (let* ((p (feed-period feed))
-         (diff (round (* p *rss-change-rate*))))
-    (setf (feed-period feed)
-          (min *rss-max-period*
-               (max *rss-min-period*
-                    (- p (* coeff diff)))))))
+  (let ((diff (round (* period *rss-change-rate*))))
+    (min *rss-max-period*
+         (max *rss-min-period*
+              (- period (* diff (if (zerop new-items) -1 new-items)))))))
 
 (defun need-fetch-p (feed)
   (or (null (feed-next-fetch feed))
       (local-time:timestamp> (local-time:now) (feed-next-fetch feed))))
 
-(defun fetch-new-items (feed)
-  (let ((items
-         (loop for item in (parse-rss (feed-url feed))
-            unless (member (feed-item-guid item) (feed-seen-guids feed) :test #'equal)
-            do (pushnew (feed-item-guid item) (feed-seen-guids feed) :test #'equal)
-            and collect item)))
-    (adjust-period feed (if (consp items) (length items) -1))
-    (setf (feed-next-fetch feed)
-          (local-time:timestamp+ (local-time:now)
-                                 (feed-period feed)
-                                 :sec))
+(defun refresh-feed (feed &optional (skip-p #'not))
+  (let* ((items
+          (loop for item in (fetch-feed-items feed)
+             unless (funcall skip-p item)
+             collect item))
+         (new-period (adjust-period (feed-period feed) (length items))))
+    (setf (feed-period feed) new-period
+          (feed-next-fetch feed) (local-time:timestamp+ (local-time:now) new-period :sec))
     items))
 
 (defun trim-nil (text)
@@ -74,10 +69,11 @@
 (defun clean-text (text)
   (when text (trim-nil (plump:text (plump:parse text)))))
 
-(defun parse-rss (url)
+(defun fetch-feed-items (feed)
   (let ((plump:*tag-dispatchers* plump:*xml-tags*))
-    (loop for item in (get-by-tag (xml-request url) "item")
-       collect (make-feed-item :guid (or (child-text item "guid") (child-text item "link"))
+    (loop for item in (get-by-tag (xml-request (feed-url feed)) "item")
+       collect (make-feed-item :feed feed
+                               :guid (or (child-text item "guid") (child-text item "link"))
                                :link (child-text item "link")
                                :title (clean-text (child-text item "title"))
                                :description (clean-text (child-text item "description"))
@@ -89,9 +85,9 @@
                                    (local-time:universal-to-timestamp
                                     (date-time-parser:parse-date-time pub))))))))
 
-(defun format-feed-item (feed item)
+(defun format-feed-item (item)
   (format nil "~A~@[ @ ~A~]~%~A~%~A"
-          (feed-title feed)
+          (feed-title (feed-item-feed item))
           (alexandria:when-let (ts (feed-item-published item))
             (local-time:format-timestring
              nil ts
@@ -99,3 +95,11 @@
                        " " (:hour 2) ":" (:min 2))))
           (feed-item-title item)
           (feed-item-link item)))
+
+(defun feed-next-fetch-unix (feed)
+  "Return FEED's next fetch time as unix time, or NIL when the slot is NIL."
+  (let ((ts (feed-next-fetch feed)))
+    (and ts (local-time:timestamp-to-unix ts))))
+
+(defun feed-item-published-unix (item)
+  "Return ITEM's published time as unix time, or NIL when the slot is NIL."
+  (let ((ts (feed-item-published item)))
+    (and ts (local-time:timestamp-to-unix ts))))

+ 1 - 1
utils.lisp

@@ -114,7 +114,7 @@ is replaced with replacement."
        uri))))
 
 (defun get-by-tag (node tag)
-  (nreverse (plump::get-elements-by-tag-name node tag)))
+  (nreverse (org.shirakumo.plump.dom::get-elements-by-tag-name node tag)))
 
 ;; JSON processing
 (defun json-request (url &key (method :get) parameters (object-as :alist))