// Package logic parses sftp-server syslog lines, tracks per-process upload
// activity, and triggers a scan of files once an upload has finished.
package logic

import (
	"errors"
	"fmt"
	"log"
	"net"
	"os"
	"regexp"
	"slices"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/samber/mo"
	"github.com/shirou/gopsutil/v4/process"
)

var (
	// RegParsePid splits "[<pid>]: <message>" into the pid and the message.
	RegParsePid = regexp.MustCompile(`^\[([0-9]+)\]:\s*(.*)$`)
	// RegParseSession extracts the user and the bracketed source address from
	// a "session opened/closed for local user ... from [...]" message.
	RegParseSession = regexp.MustCompile(`(?i)^session\s+(opened|closed)\s+for\s+local\s+user\s+(.*)\s+from\s+\[(.*)\]`)
	// SftpLogMap is the global registry of sftp log sets, keyed by process id.
	SftpLogMap = make(map[int32]*SftpLogSet)
	// SftpLogLock guards SftpLogMap.
	SftpLogLock = sync.RWMutex{}
)

// GetSLA is implemented by every parsed sftp log action.
type GetSLA interface {
	GetFfileAction() SftpLogAction
	SetPid(pid int32)
	GetPid() int32
}

// SftpLogAction identifies the kind of an sftp log message. The values double
// as the message prefixes that parseSLA matches against.
type SftpLogAction string

const (
	SLAUnknown      SftpLogAction = ""
	SLAOpen         SftpLogAction = "open "
	SLAClose        SftpLogAction = "close "
	SLARemove       SftpLogAction = "remove name "
	SLARename       SftpLogAction = "rename old "
	SLAForceClose   SftpLogAction = "forced close "
	SLAOpenSession  SftpLogAction = "session opened"
	SLACloseSession SftpLogAction = "session closed"
)

// parseByteCounts reads `<path> bytes read <n> written <m>` from fields, where
// fields[0] is the (possibly quoted) path. A counter that is absent or not a
// valid unsigned decimal number is returned as None.
func parseByteCounts(fields []string) (string, mo.Option[uint64], mo.Option[uint64]) {
	path := ""
	read, written := mo.None[uint64](), mo.None[uint64]()
	prev := ""
	for i, v := range fields {
		if i == 0 {
			path = strings.Trim(v, `"`)
			continue
		}
		switch v {
		case "bytes", "read", "written":
			prev = v
			continue
		}
		n, err := strconv.ParseUint(v, 10, 64)
		switch prev {
		case "read":
			if err != nil {
				read = mo.None[uint64]()
			} else {
				read = mo.Some(n)
			}
		case "written":
			if err != nil {
				written = mo.None[uint64]()
			} else {
				written = mo.Some(n)
			}
		}
	}
	return path, read, written
}

// parseSession extracts user and source address from a session message.
// ok is false when the message does not match RegParseSession.
func parseSession(s string) (user, from string, ok bool) {
	items := RegParseSession.FindStringSubmatch(s)
	if items == nil {
		return "", "", false
	}
	return items[2], items[3], true
}

// parseSLA converts the message part of an sftp-server log line into a typed
// action stamped with t. It returns nil when the message is not one of the
// recognized actions or is too malformed to interpret.
//
// NOTE(review): messages are tokenized with strings.Fields, so a path that
// contains whitespace is truncated to a single token.
func parseSLA(s string, t time.Time) GetSLA {
	switch {
	case strings.HasPrefix(s, string(SLAOpen)):
		result := &SftpLogOpen{Time: t}
		prev := ""
		for i, v := range strings.Fields(s)[1:] {
			if i == 0 {
				result.Path = strings.Trim(v, `"`)
				continue
			}
			switch v {
			case "flags", "mode":
				prev = v
				continue
			}
			switch prev {
			case "flags":
				result.Flags = strings.Split(v, ",")
			case "mode":
				// The mode is logged in octal; keep it only if it parses.
				if p, err := strconv.ParseUint(v, 8, 32); err == nil {
					result.Mode = mo.Some(uint32(p))
				} else {
					result.Mode = mo.None[uint32]()
				}
			}
		}
		return result

	case strings.HasPrefix(s, string(SLAClose)):
		result := &SftpLogClose{Time: t}
		result.Path, result.Read, result.Write = parseByteCounts(strings.Fields(s)[1:])
		return result

	case strings.HasPrefix(s, string(SLARemove)):
		fields := strings.Fields(s)
		if len(fields) < 3 {
			// "remove name" with no path: nothing to record.
			return nil
		}
		// Strip the quotes so the path matches the keys used for OpenedFile.
		return &SftpLogRemove{Time: t, Path: strings.Trim(fields[2], `"`)}

	case strings.HasPrefix(s, string(SLARename)):
		result := &SftpLogRename{Time: t}
		prev := ""
		for _, v := range strings.Fields(s)[1:] {
			switch v {
			case "old", "new":
				prev = v
			default:
				switch prev {
				case "old":
					result.Old = strings.Trim(v, `"`)
				case "new":
					result.New = strings.Trim(v, `"`)
				}
			}
		}
		return result

	case strings.HasPrefix(s, string(SLAForceClose)):
		result := &SftpLogForceClose{Time: t}
		// Skip the two leading words "forced close".
		result.Path, result.Read, result.Write = parseByteCounts(strings.Fields(s)[2:])
		return result

	case strings.HasPrefix(s, string(SLAOpenSession)):
		user, from, ok := parseSession(s)
		if !ok {
			return nil
		}
		return &SftpLogOpenSession{Time: t, User: user, From: from}

	case strings.HasPrefix(s, string(SLACloseSession)):
		user, from, ok := parseSession(s)
		if !ok {
			return nil
		}
		return &SftpLogCloseSession{Time: t, User: user, From: from}

	default:
		return nil
	}
}

// SftpLogOpen is an "open" message: a file was opened with the given flags
// and, when present and valid, the given mode.
type SftpLogOpen struct {
	Path  string
	Flags []string
	Mode  mo.Option[uint32]
	Time  time.Time
	Pid   int32
}

// GetFfileAction returns SLAOpen.
func (s *SftpLogOpen) GetFfileAction() SftpLogAction { return SLAOpen }

// GetPid returns the pid of the process that logged the message.
func (s *SftpLogOpen) GetPid() int32 { return s.Pid }

// SetPid records the pid of the process that logged the message.
func (s *SftpLogOpen) SetPid(pid int32) { s.Pid = pid }

// SftpLogClose is a "close" message with the byte counters it reported.
type SftpLogClose struct {
	Path        string
	Read, Write mo.Option[uint64]
	Time        time.Time
	Pid         int32
}

// GetFfileAction returns SLAClose.
func (s *SftpLogClose) GetFfileAction() SftpLogAction { return SLAClose }

// GetPid returns the pid of the process that logged the message.
func (s *SftpLogClose) GetPid() int32 { return s.Pid }

// SetPid records the pid of the process that logged the message.
func (s *SftpLogClose) SetPid(pid int32) { s.Pid = pid }

// SftpLogRemove is a "remove name" message.
type SftpLogRemove struct {
	Path string
	Time time.Time
	Pid  int32
}

// GetFfileAction returns SLARemove.
func (s *SftpLogRemove) GetFfileAction() SftpLogAction { return SLARemove }

// GetPid returns the pid of the process that logged the message.
func (s *SftpLogRemove) GetPid() int32 { return s.Pid }

// SetPid records the pid of the process that logged the message.
func (s *SftpLogRemove) SetPid(pid int32) { s.Pid = pid }

// SftpLogRename is a "rename old ... new ..." message.
type SftpLogRename struct {
	Old, New string
	Time     time.Time
	Pid      int32
}

// GetFfileAction returns SLARename.
func (s *SftpLogRename) GetFfileAction() SftpLogAction { return SLARename }

// GetPid returns the pid of the process that logged the message.
func (s *SftpLogRename) GetPid() int32 { return s.Pid }

// SetPid records the pid of the process that logged the message.
func (s *SftpLogRename) SetPid(pid int32) { s.Pid = pid }

// SftpLogForceClose is a "forced close" message with the byte counters it
// reported.
type SftpLogForceClose struct {
	Path        string
	Read, Write mo.Option[uint64]
	Time        time.Time
	Pid         int32
}

// GetFfileAction returns SLAForceClose.
func (s *SftpLogForceClose) GetFfileAction() SftpLogAction { return SLAForceClose }

// GetPid returns the pid of the process that logged the message.
func (s *SftpLogForceClose) GetPid() int32 { return s.Pid }

// SetPid records the pid of the process that logged the message.
func (s *SftpLogForceClose) SetPid(pid int32) { s.Pid = pid }

// SftpLogOpenSession is a "session opened" message.
type SftpLogOpenSession struct {
	Time time.Time
	User string
	From string
	Pid  int32
}

// GetFfileAction returns SLAOpenSession.
func (s *SftpLogOpenSession) GetFfileAction() SftpLogAction { return SLAOpenSession }

// GetPid returns the pid of the process that logged the message.
func (s *SftpLogOpenSession) GetPid() int32 { return s.Pid }

// SetPid records the pid of the process that logged the message.
func (s *SftpLogOpenSession) SetPid(pid int32) { s.Pid = pid }

// SftpLogCloseSession is a "session closed" message.
type SftpLogCloseSession struct {
	Time time.Time
	User string
	From string
	Pid  int32
}

// GetFfileAction returns SLACloseSession.
func (s *SftpLogCloseSession) GetFfileAction() SftpLogAction { return SLACloseSession }

// GetPid returns the pid of the process that logged the message.
func (s *SftpLogCloseSession) GetPid() int32 { return s.Pid }

// SetPid records the pid of the process that logged the message.
func (s *SftpLogCloseSession) SetPid(pid int32) { s.Pid = pid }

// SftpLogSet stores the log information collected for one sftp process.
type SftpLogSet struct {
	Pid          int32                // process id
	User         string               // user name, or "UID: <n>" as a fallback
	From         string               // connection source address
	SessionStart mo.Option[time.Time] // time the session started
	SessionClose mo.Option[time.Time] // time the session was closed
	OpenedFile   map[string]*FileInfo // per-file logs, keyed by path
	IsTabby      mo.Option[bool]      // whether the client is tabby
	IsAlive      bool                 // whether the process is alive
	Lock         sync.RWMutex         // guards OpenedFile and IsTabby
}

// NewSftpLogSet creates the log set for pid. When user is nil the user name is
// looked up from the running process, falling back to "UID: <n>" built from
// the first uid the process reports; if that fails too, User stays empty.
// startTime, when non-nil, becomes SessionStart.
func NewSftpLogSet(pid int32, user *string, startTime *time.Time) *SftpLogSet {
	result := &SftpLogSet{
		Pid:          pid,
		User:         "",
		SessionStart: mo.None[time.Time](),
		SessionClose: mo.None[time.Time](),
		OpenedFile:   make(map[string]*FileInfo),
		IsTabby:      mo.None[bool](),
		IsAlive:      true,
	}
	if user != nil {
		result.User = *user
	} else if p, err := process.NewProcess(pid); err == nil {
		if name, err := p.Username(); err == nil {
			result.User = name
		} else if uids, err := p.Uids(); err == nil && len(uids) > 0 {
			result.User = fmt.Sprintf("UID: %d", uids[0])
		}
	}
	if startTime != nil {
		result.SessionStart = mo.Some(*startTime)
	}
	return result
}

// CheckAlive reports whether the process is still alive, i.e. a process with
// this pid exists and its command line contains "sftp-server". The result is
// also stored in IsAlive.
func (sls *SftpLogSet) CheckAlive() (bool, error) {
	p, err := process.NewProcess(sls.Pid)
	if err != nil {
		sls.IsAlive = false
		return false, err
	}
	if p == nil {
		sls.IsAlive = false
		return false, nil
	}
	cmd, err := p.Cmdline()
	if err != nil {
		sls.IsAlive = false
		return false, err
	}
	alive := strings.Contains(cmd, "sftp-server")
	sls.IsAlive = alive
	return alive, nil
}

// forgetFiles drops the given paths from OpenedFile while holding Lock.
func (sls *SftpLogSet) forgetFiles(paths ...string) {
	sls.Lock.Lock()
	defer sls.Lock.Unlock()
	for _, p := range paths {
		delete(sls.OpenedFile, p)
	}
}

// FileInfo holds the actions logged for one file of one sftp process.
type FileInfo struct {
	Path    string       // file path; for clients such as tabby that rename mid-upload this is the suffixed name
	Log     []GetSLA     // actions logged for the file
	LogLock sync.RWMutex // guards Log
	LogSet  *SftpLogSet  // owning log set, used to obtain user name and pid
}

// NewFileInfo creates an empty FileInfo for path that belongs to ls.
func NewFileInfo(path string, ls *SftpLogSet) *FileInfo {
	return &FileInfo{
		Path:   path,
		Log:    make([]GetSLA, 0, 4),
		LogSet: ls,
	}
}

// scanAndReport scans path and logs the outcome on behalf of userName.
func scanAndReport(userName, path string) {
	log.Printf("user %s upload file: %s, scanning...\n", userName, path)
	// NOTE(review): the second result of ScanFile is discarded, so a failed
	// scan is reported as "not find virus" — confirm whether it should be logged.
	have, _ := ScanFile(path)
	if have {
		log.Printf("user %s upload file %s containing viruses\n", userName, path)
	} else {
		log.Printf("user %s upload file %s not find virus\n", userName, path)
	}
}

// CheckNeedScan inspects the most recent action logged for the file and, when
// it marks the end of an upload, scans the file and removes it from the owning
// log set's OpenedFile map.
func (fi *FileInfo) CheckNeedScan() {
	fi.LogLock.RLock()
	if len(fi.Log) == 0 {
		// Release the read lock on this path too; otherwise the next writer
		// on LogLock would block forever.
		fi.LogLock.RUnlock()
		return
	}
	last := fi.Log[len(fi.Log)-1]
	fi.LogLock.RUnlock()

	userName := fi.LogSet.User

	switch act := last.(type) {
	case *SftpLogClose:
		// tabby uploads to "<name>.tabby-upload" and renames afterwards; the
		// scan happens on the rename instead.
		if strings.HasSuffix(act.Path, ".tabby-upload") {
			return
		}
		fi.LogSet.forgetFiles(act.Path)
		scanAndReport(userName, act.Path)
	case *SftpLogRename:
		if strings.HasSuffix(act.Old, ".tabby-upload") {
			scanAndReport(userName, act.New)
			fi.LogSet.forgetFiles(act.Old, act.New)
		}
	case *SftpLogForceClose:
		scanAndReport(userName, act.Path)
		fi.LogSet.forgetFiles(act.Path)
	}
}

// logSetFor returns the log set registered for pid, creating and registering
// one with session start time t when none exists yet.
func logSetFor(pid int32, t time.Time) *SftpLogSet {
	SftpLogLock.Lock()
	defer SftpLogLock.Unlock()
	ls, ok := SftpLogMap[pid]
	if !ok {
		ls = NewSftpLogSet(pid, nil, &t)
		SftpLogMap[pid] = ls
	}
	return ls
}

// recordAndScan appends action to the log of path within ls (creating the file
// entry if needed), optionally marks the session as tabby, and then starts
// CheckNeedScan for that file in a new goroutine.
func recordAndScan(ls *SftpLogSet, path string, action GetSLA, markTabby bool) {
	ls.Lock.Lock()
	if markTabby {
		ls.IsTabby = mo.Some(true)
	}
	finfo, ok := ls.OpenedFile[path]
	if !ok {
		finfo = NewFileInfo(path, ls)
		ls.OpenedFile[path] = finfo
	}
	ls.Lock.Unlock()

	finfo.LogLock.Lock()
	finfo.Log = append(finfo.Log, action)
	finfo.LogLock.Unlock()

	go finfo.CheckNeedScan()
}

// InsertAction feeds one parsed action into the global state: sessions create
// or remove log sets, file actions are appended to the per-file log, and
// actions that may end an upload trigger CheckNeedScan. A nil action is
// ignored.
func InsertAction(action GetSLA) {
	if action == nil {
		return
	}
	pid := action.GetPid()

	switch act := action.(type) {
	case *SftpLogOpenSession:
		// New session: always start from a fresh log set.
		sls := NewSftpLogSet(pid, &act.User, &act.Time)
		sls.From = act.From
		SftpLogLock.Lock()
		SftpLogMap[pid] = sls
		SftpLogLock.Unlock()

	case *SftpLogCloseSession:
		SftpLogLock.Lock()
		delete(SftpLogMap, pid)
		SftpLogLock.Unlock()

	case *SftpLogOpen:
		if slices.Contains(act.Flags, "READ") {
			// The file is opened for reading; not an upload, ignore it.
			return
		}
		ls := logSetFor(pid, act.Time)
		ls.Lock.Lock()
		ls.IsTabby = mo.Some(strings.HasSuffix(act.Path, ".tabby-upload"))
		if _, dup := ls.OpenedFile[act.Path]; !dup {
			// Fill the log before publishing the entry so no other goroutine
			// can observe it half-initialized.
			finfo := NewFileInfo(act.Path, ls)
			finfo.Log = append(finfo.Log, act)
			ls.OpenedFile[act.Path] = finfo
		}
		// A second open of a path that is already tracked is ignored.
		ls.Lock.Unlock()

	case *SftpLogClose:
		if num, have := act.Write.Get(); have && num == 0 {
			// Nothing was written; not an upload.
			return
		}
		ls := logSetFor(pid, act.Time)
		recordAndScan(ls, act.Path, act, strings.HasSuffix(act.Path, ".tabby-upload"))

	case *SftpLogRemove:
		// Only tabby sessions are of interest here: a removed path is dropped
		// from the tracked uploads.
		// NOTE(review): the original comment says no log set should be created
		// when none exists, but the code has always created one; kept as-is.
		ls := logSetFor(pid, act.Time)
		ls.Lock.Lock()
		if tabby, known := ls.IsTabby.Get(); known && tabby {
			delete(ls.OpenedFile, act.Path)
		}
		ls.Lock.Unlock()

	case *SftpLogRename:
		// A rename away from "<name>.tabby-upload" is how a tabby upload
		// finishes; other renames are not recorded.
		if !strings.HasSuffix(act.Old, ".tabby-upload") {
			return
		}
		ls := logSetFor(pid, act.Time)
		recordAndScan(ls, act.Old, act, true)

	case *SftpLogForceClose:
		if num, have := act.Write.Get(); have && num == 0 {
			return
		}
		// The transfer was interrupted; whatever was written still gets checked.
		ls := logSetFor(pid, act.Time)
		recordAndScan(ls, act.Path, act, false)
	}
}

// ParseSftpLog parses one syslog line of the form
// "<Mon> <day> <hh:mm:ss> <host> sftp-server[<pid>]: <message>".
//
// It returns (nil, nil) when the line is not an sftp-server line or carries a
// message parseSLA does not recognize, and an error when the header, the
// timestamp or the pid cannot be parsed. The timestamp has no year in the log,
// so the current year is assumed.
func ParseSftpLog(s string) (GetSLA, error) {
	header, rest, found := strings.Cut(s, "sftp-server")
	if !found {
		// Not an sftp log line.
		return nil, nil
	}

	// strings.Fields rather than splitting on single spaces: the syslog stamp
	// pads single-digit days with a space ("Jan  2 15:04:05").
	fs := strings.Fields(header)
	if len(fs) != 4 {
		return nil, errors.New("parse error")
	}
	t, err := time.Parse(time.Stamp, strings.Join(fs[:3], " "))
	if err != nil {
		return nil, err
	}
	t = t.AddDate(time.Now().Year(), 0, 0)

	m := RegParsePid.FindStringSubmatch(rest)
	if len(m) != 3 {
		return nil, errors.New("parse error")
	}
	pid, err := strconv.ParseInt(m[1], 10, 32)
	if err != nil {
		return nil, err
	}

	fta := parseSLA(m[2], t)
	if fta == nil {
		return nil, nil
	}
	fta.SetPid(int32(pid))
	return fta, nil
}

// StartSftpMonitor listens on the unixgram socket /tmp/rsyslog.sock, strips
// the leading "<priority>" tag from every datagram and feeds sftp-server lines
// into InsertAction. It blocks until reading from the socket fails and exits
// the program if the socket cannot be created.
func StartSftpMonitor() {
	const sockPath = "/tmp/rsyslog.sock"
	regRemove := regexp.MustCompile(`^<\d+>(.*)$`)

	// Remove a stale socket left over from a previous run; it is fine if
	// there is none.
	_ = os.Remove(sockPath)
	conn, err := net.ListenPacket("unixgram", sockPath)
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove(sockPath)
	defer conn.Close()

	buffer := make([]byte, 16384)
	for {
		n, _, err := conn.ReadFrom(buffer)
		if err != nil {
			log.Printf("sftp monitor: read from %s: %v", sockPath, err)
			return
		}
		// "." does not match a newline, so a trailing line break would make
		// both regular expressions reject the datagram.
		content := strings.TrimRight(string(buffer[:n]), "\r\n")
		items := regRemove.FindStringSubmatch(content)
		if len(items) == 0 {
			continue
		}
		if !strings.Contains(items[1], "sftp-server") {
			continue
		}
		l, err := ParseSftpLog(items[1])
		if err != nil {
			log.Println(err)
			continue
		}
		if l != nil {
			InsertAction(l)
		}
	}
}